-
Notifications
You must be signed in to change notification settings - Fork 16
adding notifications to python agent #190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,9 @@ | |
| from local_authentication_options import LocalAuthenticationOptions | ||
| from microsoft_agents.hosting.core import Authorization, TurnContext | ||
|
|
||
| # Notifications | ||
| from microsoft_agents_a365.notifications.agent_notification import NotificationTypes | ||
|
|
||
| # Observability Components | ||
| from microsoft_agents_a365.observability.core.config import configure | ||
| from microsoft_agents_a365.observability.extensions.openai import OpenAIAgentsTraceInstrumentor | ||
|
|
@@ -326,6 +329,82 @@ async def process_user_message( | |
|
|
||
| # </MessageProcessing> | ||
|
|
||
| # ========================================================================= | ||
| # NOTIFICATION HANDLING | ||
| # ========================================================================= | ||
| # <NotificationHandling> | ||
|
|
||
| async def handle_agent_notification_activity( | ||
| self, notification_activity, auth: Authorization, auth_handler_name: str, context: TurnContext | ||
| ) -> str: | ||
| """Handle agent notification activities (email, Word mentions, etc.)""" | ||
| try: | ||
| notification_type = notification_activity.notification_type | ||
| logger.info(f"📬 Processing notification: {notification_type}") | ||
|
|
||
| # Setup MCP servers on first call | ||
| await self.setup_mcp_servers(auth, auth_handler_name, context) | ||
|
|
||
| # Handle Email Notifications | ||
| if notification_type == NotificationTypes.EMAIL_NOTIFICATION: | ||
| if not hasattr(notification_activity, "email") or not notification_activity.email: | ||
| return "I could not find the email notification details." | ||
|
|
||
| email = notification_activity.email | ||
| email_body = getattr(email, "html_body", "") or getattr(email, "body", "") | ||
| message = f"You have received the following email. Please follow any instructions in it. {email_body}" | ||
|
|
||
| result = await Runner.run(starting_agent=self.agent, input=message, context=context) | ||
| return self._extract_result(result) or "Email notification processed." | ||
|
|
||
| # Handle Word Comment Notifications | ||
| elif notification_type == NotificationTypes.WPX_COMMENT: | ||
| if not hasattr(notification_activity, "wpx_comment") or not notification_activity.wpx_comment: | ||
| return "I could not find the Word notification details." | ||
|
|
||
| wpx = notification_activity.wpx_comment | ||
| doc_id = getattr(wpx, "document_id", "") | ||
| comment_id = getattr(wpx, "initiating_comment_id", "") | ||
| drive_id = "default" | ||
|
|
||
| # Get Word document content | ||
| doc_message = f"You have a new comment on the Word document with id '{doc_id}', comment id '{comment_id}', drive id '{drive_id}'. Please retrieve the Word document as well as the comments and return it in text format." | ||
| doc_result = await Runner.run(starting_agent=self.agent, input=doc_message, context=context) | ||
| word_content = self._extract_result(doc_result) | ||
|
|
||
| # Process the comment with document context | ||
| comment_text = notification_activity.text or "" | ||
| response_message = f"You have received the following Word document content and comments. Please refer to these when responding to comment '{comment_text}'. {word_content}" | ||
| result = await Runner.run(starting_agent=self.agent, input=response_message, context=context) | ||
| return self._extract_result(result) or "Word notification processed." | ||
|
|
||
| # Generic notification handling | ||
| else: | ||
| notification_message = notification_activity.text or f"Notification received: {notification_type}" | ||
| result = await Runner.run(starting_agent=self.agent, input=notification_message, context=context) | ||
| return self._extract_result(result) or "Notification processed successfully." | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error processing notification: {e}") | ||
| return f"Sorry, I encountered an error processing the notification: {str(e)}" | ||
|
|
||
| def _extract_result(self, result) -> str: | ||
| """Extract text content from agent result""" | ||
| if not result: | ||
| return "" | ||
| if hasattr(result, "final_output") and result.final_output: | ||
| return str(result.final_output) | ||
| elif hasattr(result, "contents"): | ||
| return str(result.contents) | ||
| elif hasattr(result, "text"): | ||
| return str(result.text) | ||
| elif hasattr(result, "content"): | ||
| return str(result.content) | ||
| else: | ||
| return str(result) | ||
|
|
||
| # </NotificationHandling> | ||
|
Comment on lines
+337
to
+406
|
||
|
|
||
| # ========================================================================= | ||
| # CLEANUP | ||
| # ========================================================================= | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,14 @@ | |
| TurnContext, | ||
| TurnState, | ||
| ) | ||
| from microsoft_agents_a365.notifications.agent_notification import ( | ||
| AgentNotification, | ||
| NotificationTypes, | ||
| AgentNotificationActivity, | ||
| ChannelId, | ||
| ) | ||
| from microsoft_agents_a365.notifications import EmailResponse | ||
| from microsoft_agents_a365.observability.core.config import configure | ||
| from microsoft_agents_a365.observability.core.middleware.baggage_builder import BaggageBuilder | ||
| from microsoft_agents_a365.runtime.environment_utils import ( | ||
| get_observability_authentication_scope, | ||
|
|
@@ -94,9 +102,11 @@ def __init__(self, agent_class: type[AgentInterface], *agent_args, **agent_kwarg | |
| authorization=self.authorization, | ||
| **agents_sdk_config, | ||
| ) | ||
| self.agent_notification = AgentNotification(self.agent_app) | ||
|
|
||
| # Setup message handlers | ||
| self._setup_handlers() | ||
| logger.info("✅ Notification handlers registered successfully") | ||
|
||
|
|
||
| def _setup_handlers(self): | ||
| """Setup the Microsoft Agents SDK message handlers""" | ||
|
|
@@ -177,6 +187,62 @@ async def on_message(context: TurnContext, _: TurnState): | |
| logger.error(f"❌ Error processing message: {e}") | ||
| await context.send_activity(error_msg) | ||
|
|
||
| # Configure auth handlers for notifications | ||
| @self.agent_notification.on_agent_notification( | ||
| channel_id=ChannelId(channel="agents", sub_channel="*"), | ||
| auth_handlers=[self.auth_handler_name] if self.auth_handler_name else [], | ||
| ) | ||
| async def on_notification( | ||
| context: TurnContext, | ||
| state: TurnState, | ||
| notification_activity: AgentNotificationActivity, | ||
| ): | ||
| try: | ||
| tenant_id = context.activity.recipient.tenant_id | ||
| agent_id = context.activity.recipient.agentic_app_id | ||
|
|
||
| with BaggageBuilder().tenant_id(tenant_id).agent_id(agent_id).build(): | ||
| # Ensure the agent is available | ||
| if not self.agent_instance: | ||
| logger.error("Agent not available") | ||
| await context.send_activity("❌ Sorry, the agent is not available.") | ||
| return | ||
|
|
||
| # Exchange token for observability if auth handler is configured | ||
| if self.auth_handler_name: | ||
| exaau_token = await self.agent_app.auth.exchange_token( | ||
| context, | ||
| scopes=get_observability_authentication_scope(), | ||
| auth_handler_id=self.auth_handler_name, | ||
| ) | ||
| cache_agentic_token(tenant_id, agent_id, exaau_token.token) | ||
|
|
||
| logger.info(f"📬 Processing notification: {notification_activity.notification_type}") | ||
|
|
||
| if not hasattr(self.agent_instance, "handle_agent_notification_activity"): | ||
| logger.warning("⚠️ Agent doesn't support notifications") | ||
| await context.send_activity( | ||
| "This agent doesn't support notification handling yet." | ||
| ) | ||
| return | ||
|
|
||
| response = await self.agent_instance.handle_agent_notification_activity( | ||
| notification_activity, self.agent_app.auth, self.auth_handler_name, context | ||
| ) | ||
|
|
||
| if notification_activity.notification_type == NotificationTypes.EMAIL_NOTIFICATION: | ||
| response_activity = EmailResponse.create_email_response_activity(response) | ||
| await context.send_activity(response_activity) | ||
| return | ||
|
|
||
| await context.send_activity(response) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"❌ Notification error: {e}") | ||
| await context.send_activity( | ||
| f"Sorry, I encountered an error processing the notification: {str(e)}" | ||
| ) | ||
|
|
||
| async def initialize_agent(self): | ||
| """Initialize the hosted agent instance""" | ||
| if self.agent_instance is None: | ||
|
|
@@ -343,6 +409,11 @@ def create_and_run_host(agent_class: type[AgentInterface], *agent_args, **agent_ | |
| if not check_agent_inheritance(agent_class): | ||
| raise TypeError(f"Agent class {agent_class.__name__} must inherit from AgentInterface") | ||
|
|
||
| configure( | ||
| service_name="OpenAIAgentTracingWithAzureOpenAI", | ||
| service_namespace="OpenAIAgentTesting", | ||
| ) | ||
|
Comment on lines
+412
to
+415
|
||
|
|
||
| # Create the host | ||
| host = GenericAgentHost(agent_class, *agent_args, **agent_kwargs) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The notification_activity parameter lacks type hints. According to the import at line 40, this should be typed as AgentNotificationActivity. This would improve code clarity and enable better IDE support and type checking.