-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathagent_processor.py
More file actions
196 lines (169 loc) · 7.04 KB
/
agent_processor.py
File metadata and controls
196 lines (169 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Agent processor for handling interactions with Microsoft Foundry agents.
Includes MCP (Model Context Protocol) integration for tool calling.
"""
import os
import json
from typing import List, Dict, Any
try:
from azure.ai.projects import AIProjectClient # type: ignore
from azure.identity import DefaultAzureCredential # type: ignore
from services.azure_auth import get_default_credential, get_inference_credential # type: ignore
_REMOTE_AVAILABLE = True
except Exception:
_REMOTE_AVAILABLE = False
def create_function_tool_for_agent(agent_name: str) -> List[Dict[str, Any]]:
"""
Create function tools for a specific agent using MCP.
Args:
agent_name: Name of the agent (e.g., 'interior_designer', 'inventory_agent')
Returns:
List of function tool definitions
"""
# Placeholder for MCP tool integration
# In production, this would connect to MCP servers to get available tools
tools = []
# Define tools based on agent type
if agent_name == "interior_designer":
tools.append({
"type": "function",
"function": {
"name": "create_image",
"description": "Create or modify images based on user requirements",
"parameters": {
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "Image generation prompt"},
"path": {"type": "string", "description": "Path to existing image (optional)"}
},
"required": ["prompt"]
}
}
})
elif agent_name == "inventory_agent":
tools.append({
"type": "function",
"function": {
"name": "inventory_check",
"description": "Check inventory levels for products",
"parameters": {
"type": "object",
"properties": {
"product_dict": {
"type": "object",
"description": "Dictionary mapping product names to product IDs"
}
},
"required": ["product_dict"]
}
}
})
elif agent_name == "customer_loyalty":
tools.append({
"type": "function",
"function": {
"name": "customer_loyalty_check",
"description": "Check customer loyalty status and calculate discount",
"parameters": {
"type": "object",
"properties": {
"customer_id": {"type": "string", "description": "Customer ID"}
},
"required": ["customer_id"]
}
}
})
elif agent_name == "cora":
# Cora (shopper agent) might have general query tools
tools.append({
"type": "function",
"function": {
"name": "search_products",
"description": "Search for products in catalog",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
}
})
return tools
class AgentProcessor:
"""Handles communication with Microsoft Foundry agents"""
def __init__(self, agent_id: str, project_endpoint: str = None):
"""
Initialize agent processor.
Args:
agent_id: The agent ID from Microsoft Foundry
project_endpoint: Optional project endpoint (reads from env if not provided)
"""
self.agent_id = agent_id
raw_endpoint = (
project_endpoint
or os.environ.get("AZURE_AI_AGENT_ENDPOINT")
or os.environ.get("AZURE_AI_PROJECT_ENDPOINT")
or os.environ.get("AZURE_AI_FOUNDRY_ENDPOINT")
)
if not raw_endpoint or not _REMOTE_AVAILABLE:
raise ValueError("Remote agent support unavailable (endpoint or SDK missing)")
# The Azure AI Projects SDK expects: https://<hub>.services.ai.azure.com/api/projects/<project>
project_name = os.environ.get("AZURE_AI_PROJECT_NAME")
normalized = raw_endpoint.replace("cognitiveservices.azure.com", "services.ai.azure.com")
if "/api/projects/" in normalized:
# Already a full project endpoint
full_project_endpoint = normalized.rstrip("/")
elif project_name:
base_endpoint = normalized.split("/api/")[0].rstrip("/")
full_project_endpoint = f"{base_endpoint}/api/projects/{project_name}"
else:
# Best-effort fallback (may still work if the caller provided a full endpoint)
full_project_endpoint = normalized.rstrip("/")
self.project_endpoint = full_project_endpoint
self.client = AIProjectClient(endpoint=self.project_endpoint, credential=get_default_credential())
def run_conversation_with_text_stream(
self,
user_message: str,
conversation_history: List[Dict[str, str]] = None,
additional_context: Dict[str, Any] = None
):
"""
Run a conversation with the agent and stream the response.
Args:
user_message: The user's message
conversation_history: Optional conversation history
additional_context: Additional context to provide to the agent
Yields:
Chunks of the agent's response
"""
try:
# Create a thread for this conversation
thread = self.client.agents.create_thread()
# Build the message content
message_content = user_message
if additional_context:
message_content = f"Context: {json.dumps(additional_context)}\n\nUser: {user_message}"
# Add message to thread
self.client.agents.create_message(
thread_id=thread.id,
role="user",
content=message_content
)
# Run the agent
run = self.client.agents.create_and_process_run(
thread_id=thread.id,
assistant_id=self.agent_id
)
# Get messages
messages = self.client.agents.list_messages(thread_id=thread.id)
# Find the assistant's response
for message in messages:
if message.role == "assistant":
for content in message.content:
if hasattr(content, 'text'):
yield content.text.value
# Clean up
self.client.agents.delete_thread(thread.id)
except Exception as e:
yield f"Error communicating with agent: {str(e)}"