we0agent

Guides

Ports

使用 We0AgentPorts 接入事件发布、状态跟踪、上下文管理和会话回滚,并演示每个端口接入后的操作。

portsWe0Agent 的可选增强入口。stream() 本身已经会返回事件;ports 负责把同一轮运行接到外部运行时,例如事件 hub、状态查询、动态上下文、快照和回滚。

from we0agent.domain.models.agent import We0AgentPorts

ports = We0AgentPorts(
    event_publishers=[event_hub],
    status_tracker=status_manager,
    context_manager=context_manager,
    session_revert=session_revert,
)

agent = We0Agent(
    name="assistant",
    model=model,
    system_prompt=prompt,
    tools=tools,
    persistence=persistence,
    ports=ports,
)

端口总览

字段用途常用实现
event_publishers将 engine 产生的事件复制发布到外部订阅者。We0EventHub
status_tracker保存当前 session 的运行状态,便于外部轮询。SessionStatusManager
context_manager构造动态 user meta,并在模型请求前插入 system reminder。业务自定义 We0ContextManager
session_revert接入 snapshot、summary、revert、unrevert 和 cleanup。SessionRevertService

事件发布端口

event_publishers 接收 engine 的所有 We0Event。它适合把事件转发到 WebSocket、消息队列、日志系统或另一个异步消费者。We0EventHub 是内建发布订阅实现。

import asyncio

from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.domain.models.agent import We0AgentPorts

session_id = "ses_001"
event_hub = We0EventHub()
subscription = event_hub.subscribe({"text-delta", "finish"}, session_id=session_id)

agent = We0Agent(
    name="assistant",
    model=model,
    system_prompt=prompt,
    ports=We0AgentPorts(event_publishers=[event_hub]),
)


async def print_events() -> None:
    async for stream_event in subscription:
        event = stream_event.event
        if event["type"] == "text-delta":
            print(event["text"], end="", flush=True)
        elif event["type"] == "finish":
            print(f"\n[finish] {event.get('reason')}")


consumer = asyncio.create_task(print_events())
try:
    await agent.invoke(
        abort=asyncio.Event(),
        session_id=session_id,
        messages=[user_message("用两句话说明当前任务。")],
    )
finally:
    await event_hub.close_subscription(subscription)
    await consumer

这里使用 invoke() 运行 agent,同时由 event_hub 把事件推给订阅者。这个模式适合后端接口:接口层等待最终结果,UI 层通过 hub 订阅实时事件。

状态跟踪端口

status_tracker 保存 session 当前状态。engine 会写入 busyretry 等状态,并在回到 idle 时清理记录。

import asyncio

from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.domain.models.agent import We0AgentPorts

session_id = "ses_001"
status_manager = SessionStatusManager()

agent = We0Agent(
    name="assistant",
    model=model,
    system_prompt=prompt,
    ports=We0AgentPorts(status_tracker=status_manager),
)

run_task = asyncio.create_task(
    agent.invoke(
        abort=asyncio.Event(),
        session_id=session_id,
        messages=[user_message("执行一个会调用工具的任务。")],
    )
)

while not run_task.done():
    status = status_manager.get(session_id)
    print(status["type"])
    await asyncio.sleep(0.2)

result = await run_task
print(result.result)

status_manager.get(session_id) 无记录时返回 {"type": "idle"}status_manager.list() 只列出非 idle 的会话,适合后台管理页展示活跃 session。

上下文管理端口

context_manager 用于把业务上下文放入模型输入。它有两个方法:build_user_meta() 生成用户元数据消息,insert_system_reminder() 在每次模型请求前改写消息视图。

from we0agent.protocols.context import We0ContextManager
from we0agent.utils.time import current_time_ms
from we0agent.domain.models.context import (
    We0ContextUserMetaRequest,
    We0ContextSystemReminderRequest,
)
from we0agent.domain.session.part import TextPart
from we0agent.domain.types.session import We0Messages
from we0agent.domain.session.message import (
    MessageWithParts,
    We0UserMessage,
    We0UserMessageTime,
    We0SystemMessage,
    We0SystemMessageTime,
)


class TenantContextManager(We0ContextManager):
    async def build_user_meta(self, request: We0ContextUserMetaRequest) -> MessageWithParts | None:
        tenant_id = request.metadata.get("tenant_id")
        if tenant_id is None:
            return None
        return MessageWithParts(
            info=We0UserMessage(
                time=We0UserMessageTime(created=current_time_ms()),
                user_meta=True,
                metadata={"tenant_id": tenant_id},
            ),
            parts=[
                TextPart(
                    text=f"当前租户: {tenant_id}",
                    synthetic=True,
                    metadata={"kind": "user_meta"},
                )
            ],
        )

    async def insert_system_reminder(self, request: We0ContextSystemReminderRequest) -> We0Messages:
        reminder = MessageWithParts(
            info=We0SystemMessage(time=We0SystemMessageTime(created=current_time_ms())),
            parts=[
                TextPart(
                    text="<system-reminder>回复前检查租户权限。</system-reminder>",
                    synthetic=True,
                    metadata={"kind": "system_reminder"},
                )
            ],
        )
        return [*request.messages, reminder]


agent = We0Agent(
    name="assistant",
    model=model,
    system_prompt=prompt,
    ports=We0AgentPorts(context_manager=TenantContextManager()),
)

await agent.invoke(
    abort=asyncio.Event(),
    session_id="ses_tenant",
    messages=[user_message("列出当前项目状态。")],
    metadata={"tenant_id": "tenant_a"},
)

build_user_meta() 的返回值会参与持久化和模型输入;insert_system_reminder() 只改写发送给模型的消息视图,适合每 step 动态注入短提醒。

会话回滚端口

session_revert 接入会话回滚能力。它在 agent 运行时提供 snapshot tracker 和 summary service,并在运行后通过 revert_runtime 属性取得回滚执行器。

from pathlib import Path

from we0agent.builtins.service.session_revert import SessionRevertService
from we0agent.domain.models.agent import We0AgentPorts
from we0agent.snapshot.runtime import LocalFileSystemRuntime

session_id = "ses_code"
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")

session_revert = SessionRevertService(
    session_id=session_id,
    file_system=file_system,
    worktree=Path("/abs/project"),
)

agent = We0Agent(
    name="coder",
    model=model,
    system_prompt=prompt,
    tools=tools,
    persistence=persistence,
    ports=We0AgentPorts(session_revert=session_revert),
)

await agent.invoke(
    abort=asyncio.Event(),
    session_id=session_id,
    messages=[user_message("修改 src/app.py,并说明改动。")],
)

运行结束后,通过同一个 session_revert 取得回滚运行时,再按消息或 part 边界执行操作。

from we0agent.domain.session.revert import SessionRevertRequest

messages = await persistence.messages(session_id)
target_message = next(message for message in messages if message.info.role == "user")
target_message_id = target_message.info.id

session = await session_revert.revert_runtime.revert(
    SessionRevertRequest(session_id=session_id, message_id=target_message_id),
    persistence,
)

# 撤销本次回滚
restored = await session_revert.revert_runtime.unrevert(session_id=session_id, persistence=persistence)

# 再次回滚,并确认删除边界之后的消息和 part
session = await session_revert.revert_runtime.revert(
    SessionRevertRequest(session_id=session_id, message_id=target_message_id),
    persistence,
)
await session_revert.revert_runtime.cleanup(session=session, persistence=persistence)

更多回滚形态,包括对话回滚、part 级回滚、代码和对话一起回滚,见 Revert。内建组件完整参数见 Built-ins

On this page