第27章:多智能体协作与通信
"单丝不成线,独木不成林。在AI的世界里,智能体的真正力量在于协作。"
🎯 本章学习目标
📚 知识目标
- 理解多智能体系统架构:掌握分布式AI系统的设计原理
- 掌握协作机制:学习任务分解、协调算法、冲突解决
- 熟悉通信协议:理解Agent间的消息传递和数据交换
- 了解性能优化:学习多Agent系统的监控和调优
🛠️ 技能目标
- 设计多Agent架构:能够设计复杂的多智能体协作系统
- 实现通信机制:开发标准化的Agent通信接口
- 开发协调算法:实现任务分配和冲突解决机制
- 构建监控系统:建立多Agent系统的性能监控
🌟 素养目标
- 分布式思维:培养分布式系统设计的全局观
- 协作理念:理解团队协作在AI系统中的重要性
- 系统优化:具备复杂系统性能调优的能力
🏢 欢迎来到智能体协作中心
经过前面章节的学习,我们已经在智能体总部成功设计了单个智能体系统。现在,让我们将视野扩展到更广阔的天地——智能体协作中心!
🌆 协作中心全景图
想象一下,你正站在一座现代化智能大厦的顶层,俯瞰整个智能体协作中心:
🎭 从独角戏到交响乐
如果说单个智能体是一位独奏演员,那么多智能体系统就是一支完整的交响乐团:
- 🎺 各司其职:每个Agent都有自己的专长领域
- 🎼 协调统一:通过指挥家(协调器)统一行动
- 🎵 和谐共奏:不同Agent的输出组合成美妙乐章
- 📻 实时沟通:演奏者之间需要默契的交流
27.1 多智能体系统概述
🧭 什么是多智能体系统
**多智能体系统(Multi-Agent System, MAS)**是由多个自主智能体组成的分布式系统,这些智能体通过协作完成单个智能体无法胜任的复杂任务。
🏗️ 系统架构模式
让我们通过一个企业组织架构来理解多Agent系统的设计模式:
# 多智能体系统基础框架import asyncioimport jsonimport uuidfrom datetime import datetimefrom typing import Dict, List, Any, Optionalfrom abc import ABC, abstractmethodfrom enum import Enumclass AgentRole(Enum):"""智能体角色枚举"""COORDINATOR = "coordinator" # 协调者WORKER = "worker" # 工作者MONITOR = "monitor" # 监控者SPECIALIST = "specialist" # 专家class MessageType(Enum):"""消息类型枚举"""TASK_ASSIGNMENT = "task_assignment"STATUS_UPDATE = "status_update"RESULT_REPORT = "result_report"COORDINATION_REQUEST = "coordination_request"ERROR_ALERT = "error_alert"class Message:"""智能体间的消息格式"""def __init__(self,sender_id: str,receiver_id: str,message_type: MessageType,content: Any,priority: int = 1):self.id = str(uuid.uuid4())self.sender_id = sender_idself.receiver_id = receiver_idself.message_type = message_typeself.content = contentself.priority = priorityself.timestamp = datetime.now()def to_dict(self) -> Dict:"""转换为字典格式"""return {"id": self.id,"sender_id": self.sender_id,"receiver_id": self.receiver_id,"message_type": self.message_type.value,"content": self.content,"priority": self.priority,"timestamp": self.timestamp.isoformat()}print("✅ 多智能体系统基础框架定义完成")
🎯 多Agent系统的优势
通过一个新闻编辑室的例子来展示多Agent协作的威力:
27.2 智能体间通信机制
📡 通信架构设计
智能体之间的通信就像企业内部的沟通体系,需要标准化的协议和高效的传输机制:
class CommunicationManager:"""智能体通信管理器"""def __init__(self):self.agents: Dict[str, 'BaseAgent'] = {}self.message_queue: List[Message] = []self.routing_table: Dict[str, str] = {}self.communication_log: List[Dict] = []def register_agent(self, agent: 'BaseAgent'):"""注册智能体到通信网络"""self.agents[agent.agent_id] = agentprint(f"📡 Agent {agent.name} 已注册到通信网络")def send_message(self, message: Message) -> bool:"""发送消息"""try:# 消息路由if message.receiver_id not in self.agents:print(f"❌ 目标Agent {message.receiver_id} 不存在")return False# 添加到消息队列self.message_queue.append(message)# 记录通信日志log_entry = {"timestamp": datetime.now().isoformat(),"sender": message.sender_id,"receiver": message.receiver_id,"type": message.message_type.value,"status": "sent"}self.communication_log.append(log_entry)print(f"📤 消息已发送: {message.sender_id} → {message.receiver_id}")return Trueexcept Exception as e:print(f"❌ 发送 消息失败: {str(e)}")return Falsedef deliver_messages(self):"""分发消息给目标Agent"""while self.message_queue:message = self.message_queue.pop(0)target_agent = self.agents.get(message.receiver_id)if target_agent:target_agent.receive_message(message)print(f"📥 消息已送达: {message.receiver_id}")def broadcast_message(self, sender_id: str, content: Any,message_type: MessageType = MessageType.STATUS_UPDATE):"""广播消息给所有Agent"""for agent_id in self.agents:if agent_id != sender_id: # 不发送给自己message = Message(sender_id, agent_id, message_type, content)self.send_message(message)print(f"📢 {sender_id} 已广播消息给所有Agent")# 创建全局通信管理器comm_manager = CommunicationManager()print("✅ 通信管理器初始化完成")
🔄 异步通信模式
为了提高系统效率,我们实现异步通信机制:
class AsyncCommunicationManager(CommunicationManager):"""异步通信管理器"""def __init__(self):super().__init__()self.message_handlers: Dict[str, asyncio.Queue] = {}async def async_send_message(self, message: Message) -> bool:"""异步发送消息"""try:if message.receiver_id not in self.agents:return False# 获取或创建接收者的消息队列if message.receiver_id not in self.message_handlers:self.message_handlers[message.receiver_id] = asyncio.Queue()# 将消息放入队列await self.message_handlers[message.receiver_id].put(message)# 记录日志log_entry = {"timestamp": datetime.now().isoformat(),"sender": message.sender_id,"receiver": message.receiver_id,"type": message.message_type.value,"status": "queued"}self.communication_log.append(log_entry)return Trueexcept Exception as e:print(f"❌ 异步发送失败: {str(e)}")return Falseasync def start_message_processing(self):"""启动消息处理循环"""while True:# 处理所有Agent的消息队列for agent_id, queue in self.message_handlers.items():if not queue.empty():message = await queue.get()target_agent = self.agents.get(agent_id)if target_agent:await target_agent.async_receive_message(message)await asyncio.sleep(0.1) # 避免CPU占用过高# 创建异步通信管理器async_comm_manager = AsyncCommunicationManager()print("✅ 异步通信管理器初始化完成")
27.3 任务分解与协调
🧩 智能任务分解
在多Agent系统中,复杂任务需要被智能地分解为多个子任务,然后分配给不同的专业Agent:
class TaskDecomposer:"""任务分解器"""def __init__(self):self.task_templates = {"news_writing": {"subtasks": [{"name": "research", "agent_type": "researcher", "priority": 1},{"name": "writing", "agent_type": "writer", "priority": 2},{"name": "editing", "agent_type": "editor", "priority": 3},{"name": "publishing", "agent_type": "publisher", "priority": 4}],"dependencies": {"writing": ["research"],"editing": ["writing"],"publishing": ["editing"]}},"data_analysis": {"subtasks": [{"name": "data_collection", "agent_type": "collector", "priority": 1},{"name": "data_cleaning", "agent_type": "cleaner", "priority": 2},{"name": "analysis", "agent_type": "analyst", "priority": 3},{"name": "visualization", "agent_type": "visualizer", "priority": 3},{"name": "reporting", "agent_type": "reporter", "priority": 4}],"dependencies": {"data_cleaning": ["data_collection"],"analysis": ["data_cleaning"],"visualization": ["data_cleaning"],"reporting": ["analysis", "visualization"]}}}def decompose_task(self, task_type: str, task_details: Dict) -> List[Dict]:"""分解任务为子任务"""if task_type not in self.task_templates:raise ValueError(f"未知任务类型: {task_type}")template = self.task_templates[task_type]subtasks = []for subtask_info in template["subtasks"]:subtask = {"id": str(uuid.uuid4()),"name": subtask_info["name"],"agent_type": subtask_info["agent_type"],"priority": subtask_info["priority"],"status": "pending","details": task_details,"dependencies": template["dependencies"].get(subtask_info["name"], []),"created_at": datetime.now().isoformat()}subtasks.append(subtask)print(f"🧩 任务已分解为 {len(subtasks)} 个子任务")return subtasksdef check_dependencies(self, subtasks: List[Dict], target_task: str) -> bool:"""检查任务依赖是否满足"""target_info = next((task for task in subtasks if task["name"] == target_task), None)if not target_info:return Falsedependencies = target_info.get("dependencies", [])for dep in dependencies:dep_task = next((task for task in subtasks if task["name"] == dep), None)if not dep_task or dep_task["status"] != "completed":return Falsereturn True# 创建任务分解器task_decomposer = TaskDecomposer()# 示例:分解新闻写作任务task_details = {"topic": "人工智能在教育中的应用","target_audience": "技术专业人士","word_count": 2000,"deadline": "2025-02-05"}subtasks = task_decomposer.decompose_task("news_writing", task_details)for task in subtasks:print(f"📋 子任务: {task['name']} (优先级: {task['priority']})")