-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathagent_processor.py
More file actions
354 lines (316 loc) · 14.4 KB
/
agent_processor.py
File metadata and controls
354 lines (316 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
"""
Agent processor for handling interactions with Microsoft Foundry agents.
Includes MCP (Model Context Protocol) integration for tool calling.
"""
import os
import json
from typing import List, Dict, Any
try:
from azure.ai.projects import AIProjectClient # type: ignore
from azure.identity import DefaultAzureCredential # type: ignore
from services.azure_auth import get_default_credential, get_inference_credential # type: ignore
try:
# Preferred runtime client for threads/messages/runs
from azure.ai.agents import AgentsClient # type: ignore
except Exception:
AgentsClient = None # type: ignore
_REMOTE_AVAILABLE = True
except Exception:
_REMOTE_AVAILABLE = False
def create_function_tool_for_agent(agent_name: str) -> List[Dict[str, Any]]:
    """
    Build the function-tool definitions offered to a given agent.

    This is a static placeholder for MCP tool integration: the tool list is
    hard-coded per agent name rather than discovered from MCP servers.

    Args:
        agent_name: Name of the agent. Recognized names are
            'interior_designer', 'inventory_agent', 'customer_loyalty' and 'cora'.
    Returns:
        A list holding the single function-tool definition for the agent,
        or an empty list when the agent name is not recognized.
    """

    def build_tool(name, description, properties, required):
        # Every tool shares the same envelope; only these four parts vary.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    # (agent name, tool name, description, parameter properties, required parameters)
    catalog = (
        (
            "interior_designer",
            "create_image",
            "Create or modify images based on user requirements",
            {
                "prompt": {"type": "string", "description": "Image generation prompt"},
                "path": {"type": "string", "description": "Path to existing image (optional)"},
            },
            ["prompt"],
        ),
        (
            "inventory_agent",
            "inventory_check",
            "Check inventory levels for products",
            {
                "product_dict": {
                    "type": "object",
                    "description": "Dictionary mapping product names to product IDs",
                },
            },
            ["product_dict"],
        ),
        (
            "customer_loyalty",
            "customer_loyalty_check",
            "Check customer loyalty status and calculate discount",
            {
                "customer_id": {"type": "string", "description": "Customer ID"},
            },
            ["customer_id"],
        ),
        (
            # Cora (shopper agent) gets a general catalog query tool.
            "cora",
            "search_products",
            "Search for products in catalog",
            {
                "query": {"type": "string", "description": "Search query"},
            },
            ["query"],
        ),
    )

    for known_agent, tool_name, description, properties, required in catalog:
        if agent_name == known_agent:
            return [build_tool(tool_name, description, properties, required)]
    return []
class AgentProcessor:
"""Handles communication with Microsoft Foundry agents"""
def __init__(self, agent_id: str, project_endpoint: str = None):
"""
Initialize agent processor.
Args:
agent_id: The agent ID from Microsoft Foundry
project_endpoint: Optional project endpoint (reads from env if not provided)
"""
self.agent_id = agent_id
self._runtime_agent_id = agent_id
raw_endpoint = (
project_endpoint
or os.environ.get("AZURE_AI_AGENT_ENDPOINT")
or os.environ.get("AZURE_AI_PROJECT_ENDPOINT")
or os.environ.get("AZURE_AI_FOUNDRY_ENDPOINT")
)
if not raw_endpoint or not _REMOTE_AVAILABLE:
raise ValueError("Remote agent support unavailable (endpoint or SDK missing)")
# The Azure AI Projects SDK expects: https://<hub>.services.ai.azure.com/api/projects/<project>
project_name = os.environ.get("AZURE_AI_PROJECT_NAME")
normalized = raw_endpoint.replace("cognitiveservices.azure.com", "services.ai.azure.com")
if "/api/projects/" in normalized:
# Already a full project endpoint
full_project_endpoint = normalized.rstrip("/")
elif project_name:
base_endpoint = normalized.split("/api/")[0].rstrip("/")
full_project_endpoint = f"{base_endpoint}/api/projects/{project_name}"
else:
# Best-effort fallback (may still work if the caller provided a full endpoint)
full_project_endpoint = normalized.rstrip("/")
self.project_endpoint = full_project_endpoint
self.client = AIProjectClient(endpoint=self.project_endpoint, credential=get_default_credential())
# Best-effort: resolve the underlying OpenAI-style assistant id (asst_...)
# when the configured id is a friendly/name-based id.
self._runtime_agent_id = self._maybe_resolve_assistant_id(self._runtime_agent_id)
# Some azure-ai-projects builds expose only agent-management operations on .agents.
# In that case, use azure-ai-agents AgentsClient for thread/message/run operations.
self._agents_api = None
try:
if (
hasattr(self.client, "agents")
and hasattr(self.client.agents, "threads")
and hasattr(self.client.agents.threads, "create")
and hasattr(self.client.agents, "messages")
and hasattr(self.client.agents.messages, "create")
and hasattr(self.client.agents, "runs")
and hasattr(self.client.agents.runs, "create_and_process")
):
self._agents_api = self.client.agents
except Exception:
self._agents_api = None
if self._agents_api is None:
if AgentsClient is None:
raise ValueError(
"Remote agent support unavailable: this SDK build doesn't expose threads on AIProjectClient.agents "
"and azure-ai-agents is not installed."
)
# AgentsClient expects the project endpoint (per Microsoft docs snippets).
self._agents_api = AgentsClient(endpoint=self.project_endpoint, credential=get_default_credential())
def _maybe_resolve_assistant_id(self, configured_id: str) -> str:
if not configured_id:
return configured_id
# Local simulation stays untouched.
if configured_id.startswith("asst_local_"):
return configured_id
# Already an assistant id.
if configured_id.startswith("asst"):
return configured_id
try:
agents = getattr(self.client, "agents", None)
if not agents or not hasattr(agents, "list"):
return configured_id
for agent in agents.list():
agent_id = getattr(agent, "id", None)
agent_name = getattr(agent, "name", None)
if configured_id not in {agent_id, agent_name}:
continue
for attr in (
"assistant_id",
"assistantId",
"openai_assistant_id",
"openaiAssistantId",
"assistantID",
):
value = getattr(agent, attr, None)
if isinstance(value, str) and value.startswith("asst"):
return value
# Some SDKs only populate `id` with an assistant id.
if isinstance(agent_id, str) and agent_id.startswith("asst"):
return agent_id
except Exception:
# Best-effort only; keep configured value.
return configured_id
return configured_id
@staticmethod
def _get_obj_id(obj: Any) -> str | None:
if obj is None:
return None
# SDK models can be rich objects or MutableMapping
if hasattr(obj, "id"):
return getattr(obj, "id")
if isinstance(obj, dict):
return obj.get("id")
return None
def _create_thread(self):
agents = self._agents_api
if hasattr(agents, "threads") and hasattr(agents.threads, "create"):
return agents.threads.create()
if hasattr(agents, "create_thread"):
return agents.create_thread()
raise AttributeError("No supported thread creation method on agents client")
def _delete_thread(self, thread_id: str) -> None:
agents = self._agents_api
if hasattr(agents, "threads") and hasattr(agents.threads, "delete"):
agents.threads.delete(thread_id)
return
if hasattr(agents, "delete_thread"):
agents.delete_thread(thread_id)
return
def _create_message(self, thread_id: str, role: str, content: str) -> None:
agents = self._agents_api
if hasattr(agents, "messages") and hasattr(agents.messages, "create"):
agents.messages.create(thread_id=thread_id, role=role, content=content)
return
if hasattr(agents, "create_message"):
agents.create_message(thread_id=thread_id, role=role, content=content)
return
raise AttributeError("No supported message creation method on agents client")
def _run_and_process(self, thread_id: str):
agents = self._agents_api
runtime_id = self._runtime_agent_id
# Preferred (azure-ai-agents style)
if hasattr(agents, "runs") and hasattr(agents.runs, "create_and_process"):
# Different SDK builds use either `agent_id` or `assistant_id`.
try:
return agents.runs.create_and_process(thread_id=thread_id, agent_id=runtime_id)
except TypeError:
return agents.runs.create_and_process(thread_id=thread_id, assistant_id=runtime_id)
# Older helper naming
if hasattr(agents, "create_and_process_run"):
# This helper is typically OpenAI-assistants shaped.
return agents.create_and_process_run(thread_id=thread_id, assistant_id=runtime_id)
# Some clients expose a one-shot convenience
if hasattr(agents, "create_thread_and_process_run"):
try:
return agents.create_thread_and_process_run(agent_id=runtime_id)
except TypeError:
return agents.create_thread_and_process_run(assistant_id=runtime_id)
raise AttributeError("No supported run method on agents client")
def _list_messages(self, thread_id: str):
agents = self._agents_api
if hasattr(agents, "messages") and hasattr(agents.messages, "list"):
return agents.messages.list(thread_id=thread_id)
if hasattr(agents, "list_messages"):
return agents.list_messages(thread_id=thread_id)
raise AttributeError("No supported message listing method on agents client")
def run_conversation_with_text_stream(
self,
user_message: str,
conversation_history: List[Dict[str, str]] = None,
additional_context: Dict[str, Any] = None
):
"""
Run a conversation with the agent and stream the response.
Args:
user_message: The user's message
conversation_history: Optional conversation history
additional_context: Additional context to provide to the agent
Yields:
Chunks of the agent's response
"""
thread_id: str | None = None
try:
# Create a thread for this conversation
thread = self._create_thread()
thread_id = self._get_obj_id(thread)
if not thread_id:
raise RuntimeError("Agent thread creation returned no id")
# Build the message content
message_content = user_message
if additional_context:
message_content = f"Context: {json.dumps(additional_context)}\n\nUser: {user_message}"
# Add message to thread
self._create_message(thread_id=thread_id, role="user", content=message_content)
# Run the agent
self._run_and_process(thread_id=thread_id)
# Get messages
messages = self._list_messages(thread_id=thread_id)
# Find the assistant's response
for message in messages:
if message.role == "assistant":
# Message content can be a list of blocks or a mapping
contents = getattr(message, "content", None)
if isinstance(message, dict) and contents is None:
contents = message.get("content")
if not contents:
continue
for content in contents:
# SDK content blocks commonly expose .text.value
if hasattr(content, "text") and hasattr(content.text, "value"):
yield content.text.value
elif isinstance(content, dict):
text = content.get("text")
if isinstance(text, dict) and isinstance(text.get("value"), str):
yield text["value"]
elif isinstance(text, str):
yield text
elif isinstance(content, str):
yield content
except Exception as e:
yield f"Error communicating with agent: {str(e)}"
finally:
if thread_id:
try:
self._delete_thread(thread_id)
except Exception:
# Best-effort cleanup; ignore failures
pass