*模型交互系统-自定义模型处理器[提示词模板系统,内置模型配置,agent_settings]
在AgentOS2之中,你可以通过继承BaseProcessor实现自定义模型处理器
相关文件:agent_os2/agent_os/base_model/model_processor.py
example:
#my_processor.py
from agent_os2 import register,BaseProcessor,ModelConfig,ModelError
from typing import Any
@register("my_processor") #对应model_settings下的键名
class MyProcessor(BaseProcessor):
    async def async_generator(
        self,
        messages: Any,
        model_config: ModelConfig,
        proxy: str,
        api_key: str,
        base_url: str
    ) -> AsyncGenerator[Any, None]:
        ...

    def get_usage(self, last_chunk_data: dict[str,Any], messages: Any, final_output: Any, model_config: ModelConfig) -> dict[str,Any]:
        ...

    def process_chunk(self, raw_chunk: Any,model_config:ModelConfig) -> Any:
        ...

    def process_complete(self, chunks: list[Any],model_config:ModelConfig) -> Any:
        ...

    def process_error(self, error: Exception, model_config: ModelConfig) -> dict[str, Any]:
        super().process_error(error,model_config)
!!在业务流程被调用前应该import my_processor来让python程序自动注册(可以交给__init__.py自动注册或者手动import my_processor)!!
流程说明：
BaseProcessor内部的inteact方法定义了如下流程:
通过异步模型开启流式对话携程,流式输出的chunk交给process_chunk处理如果有返回值且is_stream配置为True则返回StreamDataStatus.GENERATING的DataPackage
退出流式输出块后,将累积的raw_chunk交给process_complete处理,如果有返回值则返回StreamDataStatus.COMPLETED的DataPackage
async_generator遇到网络问题等服务端发来的请求错误要使用ModelError包装记录错误信息和错误码,这样BaseAgent会进行自动重试
Processor遇到代码逻辑上错误应当直接抛出而非使用try吞掉错误,以让开发者更轻松地debug
get_usage方法根据给定的参数计算本次模型调用的成本,用户Agent累计资源消耗统计量
为自己的processor添加对应的模型,在执行目录/aos_config/model_settings.json中添加对应的模型配置
example:
{
    "my_processor":{
        "api_key":"sk-",
        "base_url":"",
        "proxy":"",
        "models":[
            "my_model"
        ]
    }
}
测试你的自定义Processor:
from agent_os2 import interact_with_model,ModelConfig,StreamDataStatus,ModelError
async def test():
    async for chunk in interact_with_model("Hello,my processor",model_config=ModelConfig("my_model",is_stream=True)):
        if chunk.get_status() == StreamDataStatus.GENERATING:
            print(chunk.read_data()) # 流失输出信息
        elif chunk.get_status() == StreamDataStatus.COMPLETED:
            print(chunk.read_data()) # 最终累积的输出信息
            print(chunk.get_usage()) # Completed的DataPackage可以获得本次调用的成本
        elif chunk.get_status() == StreamDataStatus.ERROR:
            code = chunk.read_data().get("code")
            message = chunk.read_data().get("message")
            detail = chunk.read_data().get("detail")
            raise ModelError(message,code,detail) # 或者自定义对服务端错误码进行处理
if __name__ == "__main__":
    asyncio.run(test())