Guides
Ports
使用 We0AgentPorts 接入事件发布、状态跟踪、上下文管理和会话回滚,并演示每个端口接入后的操作。
ports 是 We0Agent 的可选增强入口。stream() 本身已经会返回事件;ports 负责把同一轮运行接到外部运行时,例如事件 hub、状态查询、动态上下文、快照和回滚。
from we0agent.domain.models.agent import We0AgentPorts
ports = We0AgentPorts(
event_publishers=[event_hub],
status_tracker=status_manager,
context_manager=context_manager,
session_revert=session_revert,
)
agent = We0Agent(
name="assistant",
model=model,
system_prompt=prompt,
tools=tools,
persistence=persistence,
ports=ports,
)端口总览
| 字段 | 用途 | 常用实现 |
|---|---|---|
event_publishers | 将 engine 产生的事件复制发布到外部订阅者。 | We0EventHub |
status_tracker | 保存当前 session 的运行状态,便于外部轮询。 | SessionStatusManager |
context_manager | 构造动态 user meta,并在模型请求前插入 system reminder。 | 业务自定义 We0ContextManager |
session_revert | 接入 snapshot、summary、revert、unrevert 和 cleanup。 | SessionRevertService |
事件发布端口
event_publishers 接收 engine 的所有 We0Event。它适合把事件转发到 WebSocket、消息队列、日志系统或另一个异步消费者。We0EventHub 是内建发布订阅实现。
import asyncio
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.domain.models.agent import We0AgentPorts
session_id = "ses_001"
event_hub = We0EventHub()
subscription = event_hub.subscribe({"text-delta", "finish"}, session_id=session_id)
agent = We0Agent(
name="assistant",
model=model,
system_prompt=prompt,
ports=We0AgentPorts(event_publishers=[event_hub]),
)
async def print_events() -> None:
async for stream_event in subscription:
event = stream_event.event
if event["type"] == "text-delta":
print(event["text"], end="", flush=True)
elif event["type"] == "finish":
print(f"\n[finish] {event.get('reason')}")
consumer = asyncio.create_task(print_events())
try:
await agent.invoke(
abort=asyncio.Event(),
session_id=session_id,
messages=[user_message("用两句话说明当前任务。")],
)
finally:
await event_hub.close_subscription(subscription)
await consumer这里使用 invoke() 运行 agent,同时由 event_hub 把事件推给订阅者。这个模式适合后端接口:接口层等待最终结果,UI 层通过 hub 订阅实时事件。
状态跟踪端口
status_tracker 保存 session 当前状态。engine 会写入 busy、retry 等状态,并在回到 idle 时清理记录。
import asyncio
from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.domain.models.agent import We0AgentPorts
session_id = "ses_001"
status_manager = SessionStatusManager()
agent = We0Agent(
name="assistant",
model=model,
system_prompt=prompt,
ports=We0AgentPorts(status_tracker=status_manager),
)
run_task = asyncio.create_task(
agent.invoke(
abort=asyncio.Event(),
session_id=session_id,
messages=[user_message("执行一个会调用工具的任务。")],
)
)
while not run_task.done():
status = status_manager.get(session_id)
print(status["type"])
await asyncio.sleep(0.2)
result = await run_task
print(result.result)status_manager.get(session_id) 无记录时返回 {"type": "idle"}。status_manager.list() 只列出非 idle 的会话,适合后台管理页展示活跃 session。
上下文管理端口
context_manager 用于把业务上下文放入模型输入。它有两个方法:build_user_meta() 生成用户元数据消息,insert_system_reminder() 在每次模型请求前改写消息视图。
from we0agent.protocols.context import We0ContextManager
from we0agent.utils.time import current_time_ms
from we0agent.domain.models.context import (
We0ContextUserMetaRequest,
We0ContextSystemReminderRequest,
)
from we0agent.domain.session.part import TextPart
from we0agent.domain.types.session import We0Messages
from we0agent.domain.session.message import (
MessageWithParts,
We0UserMessage,
We0UserMessageTime,
We0SystemMessage,
We0SystemMessageTime,
)
class TenantContextManager(We0ContextManager):
async def build_user_meta(self, request: We0ContextUserMetaRequest) -> MessageWithParts | None:
tenant_id = request.metadata.get("tenant_id")
if tenant_id is None:
return None
return MessageWithParts(
info=We0UserMessage(
time=We0UserMessageTime(created=current_time_ms()),
user_meta=True,
metadata={"tenant_id": tenant_id},
),
parts=[
TextPart(
text=f"当前租户: {tenant_id}",
synthetic=True,
metadata={"kind": "user_meta"},
)
],
)
async def insert_system_reminder(self, request: We0ContextSystemReminderRequest) -> We0Messages:
reminder = MessageWithParts(
info=We0SystemMessage(time=We0SystemMessageTime(created=current_time_ms())),
parts=[
TextPart(
text="<system-reminder>回复前检查租户权限。</system-reminder>",
synthetic=True,
metadata={"kind": "system_reminder"},
)
],
)
return [*request.messages, reminder]
agent = We0Agent(
name="assistant",
model=model,
system_prompt=prompt,
ports=We0AgentPorts(context_manager=TenantContextManager()),
)
await agent.invoke(
abort=asyncio.Event(),
session_id="ses_tenant",
messages=[user_message("列出当前项目状态。")],
metadata={"tenant_id": "tenant_a"},
)build_user_meta() 的返回值会参与持久化和模型输入;insert_system_reminder() 只改写发送给模型的消息视图,适合每 step 动态注入短提醒。
会话回滚端口
session_revert 接入会话回滚能力。它在 agent 运行时提供 snapshot tracker 和 summary service,并在运行后通过 revert_runtime 属性取得回滚执行器。
from pathlib import Path
from we0agent.builtins.service.session_revert import SessionRevertService
from we0agent.domain.models.agent import We0AgentPorts
from we0agent.snapshot.runtime import LocalFileSystemRuntime
session_id = "ses_code"
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
session_revert = SessionRevertService(
session_id=session_id,
file_system=file_system,
worktree=Path("/abs/project"),
)
agent = We0Agent(
name="coder",
model=model,
system_prompt=prompt,
tools=tools,
persistence=persistence,
ports=We0AgentPorts(session_revert=session_revert),
)
await agent.invoke(
abort=asyncio.Event(),
session_id=session_id,
messages=[user_message("修改 src/app.py,并说明改动。")],
)运行结束后,通过同一个 session_revert 取得回滚运行时,再按消息或 part 边界执行操作。
from we0agent.domain.session.revert import SessionRevertRequest
messages = await persistence.messages(session_id)
target_message = next(message for message in messages if message.info.role == "user")
target_message_id = target_message.info.id
session = await session_revert.revert_runtime.revert(
SessionRevertRequest(session_id=session_id, message_id=target_message_id),
persistence,
)
# 撤销本次回滚
restored = await session_revert.revert_runtime.unrevert(session_id=session_id, persistence=persistence)
# 再次回滚,并确认删除边界之后的消息和 part
session = await session_revert.revert_runtime.revert(
SessionRevertRequest(session_id=session_id, message_id=target_message_id),
persistence,
)
await session_revert.revert_runtime.cleanup(session=session, persistence=persistence)更多回滚形态,包括对话回滚、part 级回滚、代码和对话一起回滚,见 Revert。内建组件完整参数见 Built-ins。