*Flow心智模型-通信协议层flow[自主flow,内置命令,运行时上下文,合并策略,apply_command方法,agent_settings]
核心理念:主要为自主flow服务,多个自主flow通过通信层用事件发布与订阅逻辑在同层协作
要点:
1.通信层Flow的shared_context作为信息交换媒介
2.自主flow之间通过通信层Flow的Shared_Context交换信息
3.接入该通信层的自主flow一定要有一个PublishEventAgent来将自主flow的工作结果通过源上下文(source_context)用EventHandlerAgent上传到通信层Flow的shared_context中
4.EventHandlerAgent同时还会接受source_context中的publish和subscribe特殊键值用于调度下一批次的自主Flow和订阅新的事件
通信层实现example:
# flows/orchestration_flow.py:
import asyncio
from agent_os2 import BaseAgent,Flow
from typing import Any

class EventHandlerAgent(BaseAgent):
    @staticmethod
    def class_name_to_registered_name(class_name:str)->str:
        name_list = []
        i=0
        p = 0
        while i<len(class_name):
            if class_name[i].isupper():
                if p != i:
                    name_list.append(class_name[p:i])
                p=i
            i+=1
        if class_name[p:] != "Agent":
            name_list.append(class_name[p:])
        return "_".join([name.lower() for name in name_list])
                
    def setup(self):
        self.prompts = "" #不需要模型
        self.user_info = "开始处理订阅和发布的事件"
    async def post_process(self,source_context:Any,model_result:Any|None,shared_context:dict[str,Any],extra_contexts:dict[str,Any],observer:list[tuple[asyncio.Task[tuple[Any,bool]],"BaseAgent"]],batch_id:int|None=None)->tuple[Any,dict[str,dict|list]]:
        command = {}
        if "events" not in shared_context:
            shared_context["events"] = {}
        if not isinstance(source_context,dict):
            raise ValueError("EventHandler必须接受的上游输入必须是一个字典")
        if "subscribe" in source_context and isinstance(source_context["subscribe"],dict):
            for class_name,events_list in source_context["subscribe"].items():
                if not isinstance(events_list,list):
                    raise ValueError("订阅事件必须是一个列表")
                for event_name in events_list:
                    self.subscribe(self.class_name_to_registered_name(class_name),event_name,shared_context["events"])
        if "unsubscribe" in source_context and isinstance(source_context["unsubscribe"],dict):
            for class_name,events_list in source_context["unsubscribe"].items():
                if not isinstance(events_list,list):
                    raise ValueError("取消订阅事件必须是一个列表")
                for event_name in events_list:
                    self.unsubscribe(self.class_name_to_registered_name(class_name),event_name,shared_context["events"])
        if "publish" in source_context and isinstance(source_context["publish"],list):
            parallel_actions = self.publish(source_context.pop("publish"),shared_context["events"])
            command["actions"] = [{"add_branch":[
                parallel_actions,
                {"name":"event_handler"}
            ]}]
        command["memory_append"] = source_context.copy()
        return source_context,command
    def subscribe(self,agent_name:str,event_name:str,events_context:dict):
        if event_name not in events_context:
            events_context[event_name] = []
        if agent_name not in events_context[event_name]:
            events_context[event_name].append(agent_name)
    def unsubscribe(self,agent_name:str,event_name:str,event_context:dict):
        if event_name in event_context and agent_name in event_context[event_name]:
            event_context[event_name].remove(agent_name)
    def publish(self,published_events:list,events_context:dict)->list[dict] :
        parallel_actions = []
        for published_event in published_events:
            for agent_name in events_context.get(published_event,[]):
                parallel_actions.append({"name":agent_name})
        return parallel_actions
class OrchestratorFlow(Flow):
    def __init__(self,name:str="orchestrator",parent:BaseAgent|None=None,*,agents_key:str|None=None,guider_agent_name:str=None,**settings):
        super().__init__(name,parent,agents_key=agents_key,expected_shared_context_keys={"events"},**settings)
        self.add_custom_agent_class("event_handler",EventHandlerAgent)
        self.add_agent(guider_agent_name,alias="guider",user_input_prompt="请描述你的问题:")
        self.add_agent("event_handler")
        self.add_edge("guider","event_handler")
实现通信层后要给这些会用到的自主flow写一个统一的agents_key:
agent_settings:
{
    "automatic_flows":["automation/flows"]
}
