API reference
Built-ins
内建服务参考:用公开类装配 event hub、状态、快照、回滚等运行时能力,并组合进 We0AgentPorts。
We0Agent 本身只关心「一次 turn 怎么跑」,而事件分发、状态跟踪、快照、会话回滚与变更摘要这些进程级运行时能力都由外部端口注入(参见 We0Agent 的 ports 参数)。SDK 在 we0agent.builtins 下提供这些端口的开箱即用实现。
内建组件总览
| 组件 | import | 形态 | 对应端口 / 用途 |
|---|---|---|---|
We0EventHub | from we0agent.builtins.manager.we0_event_hub import We0EventHub | 进程级发布订阅 hub | 实现 We0EventPublisher,可放进 ports.event_publishers |
SessionStatusManager | from we0agent.builtins.manager.session_status_manager import SessionStatusManager | 按 session 的状态表 | 实现 We0StatusTracker,可作为 ports.status_tracker |
AbortEventManager | from we0agent.builtins.manager.abort_event_manager import AbortEventManager | 活跃 session 的中断事件注册表 | 配合 runner,供外部按 session_id 触发中断 |
SessionRunnerManager | from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager | 按 session 串行化的运行器 | 编排单会话 run / cancel / busy 判定 |
SessionSnapshotService | from we0agent.builtins.service.session_snapshot import SessionSnapshotService | 按 session 管理 SnapshotState | 供 SessionRevertService 提供快照能力,参见 Snapshots |
BuiltinSessionSummaryService | from we0agent.builtins.service.session_summary import BuiltinSessionSummaryService | 变更摘要服务 | 供 SessionRevertService 写入 diff summary |
SessionRevertService | from we0agent.builtins.service.session_revert import SessionRevertService | 会话回滚增强入口 | 实现 SessionRevertCapability,可作为 ports.session_revert |
BuiltinSessionRevertRuntime | from we0agent.builtins.service.session_revert import BuiltinSessionRevertRuntime | 会话回滚执行器 | 实现 SessionRevertRuntime,执行 revert、unrevert 和 cleanup |
区分:
manager下的组件偏向进程级运行时基础设施(事件、状态、中断、运行器),service下的组件偏向与会话数据交互的高层能力(快照、摘要、回滚)。两者都是普通类,构造即用,状态保存在进程内存中。
装配进 We0AgentPorts
最快的路径是直接构造 SessionRevertService,再放进 We0AgentPorts.session_revert。服务内部完成快照服务创建、会话状态绑定、摘要服务和回滚执行器组合。
from we0agent.agent.agent import We0Agent
from we0agent.domain.models.agent import We0AgentPorts
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.service.session_revert import SessionRevertService
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
ports = We0AgentPorts(
session_revert=SessionRevertService(
session_id="demo-session",
file_system=file_system,
worktree="/abs/project",
)
)
agent = We0Agent(
name="demo",
model=model, # 见 We0Model
system_prompt=prompt, # 见 Prompts
ports=ports,
)SessionRevertService 的低成本参数:
| 参数 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
session_id | str | 是 | — | 要绑定快照状态的会话标识。 |
file_system | FileSystemRuntime | 是 | — | 当前文件系统和 Git 执行环境;本地实现用 LocalFileSystemRuntime(root=...) 显式指定运行时根目录。 |
worktree | str | Path | 否 | "." | git --work-tree 根目录,会被 resolve() 成绝对路径。 |
event_publisher | We0EventPublisher | None | 否 | None | 回滚时发布 session.diff 事件;省略时使用内部 We0EventHub。 |
runner_manager | SessionRunnerManager | None | 否 | None | 回滚前执行 busy 判定;省略时使用空 observer 的内建 runner manager。 |
若还需要事件与状态能力,按下文手动补齐 event_publishers 与 status_tracker 即可。
We0EventHub
进程级发布订阅 hub,是 We0EventPublisher 抽象基类(publish(*, session_id, event))的实现,可直接放进 ports.event_publishers。engine 产生的每个 We0Event 都会被包装成 We0StreamEvent 后分发给所有匹配的订阅者。
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.domain.models.agent import We0AgentPorts
hub = We0EventHub()
ports = We0AgentPorts(event_publishers=[hub])
# 订阅指定类型事件(仅本 session)
subscription = hub.subscribe({"text-delta", "finish"}, session_id="demo-session")
async for stream_event in subscription: # We0StreamEvent,frozen
print(stream_event.type, stream_event.event) # type 与原始 We0Event订阅方式分两类:
| 方法 | 返回 | 说明 |
|---|---|---|
subscribe(event_types, *, session_id=None) | We0EventSubscription | 订阅指定类型集合,可 async for 迭代;session_id 为空表示全部会话。 |
subscribe_all(*, session_id=None) | We0EventSubscription | 订阅全部类型。 |
subscribe_callback(config) | Callable[[], None] | 注册回调(同步或异步),返回退订函数。 |
subscribe_all_callback(callback, *, session_id=None) | Callable[[], None] | 全类型回调订阅的便捷封装。 |
close_subscription(subscription) | None(async) | 关闭并移除一个迭代式订阅。 |
reset() | None | 关闭并清空全部订阅。 |
We0StreamEvent 是 frozen 模型,字段为 id(hub 内事件标识)、session_id、type(We0EventType)、event(原始 We0Event)。订阅只接收注册之后发布的事件,不会回放历史;回调订阅中抛出的异常会被 hub 捕获并记录日志,不影响其他订阅者。事件结构参见 Events 与 Streaming。
SessionStatusManager
按 session_id 保存当前运行状态,是 We0StatusTracker 抽象基类(set(session_id, status))的实现。
from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.domain.models.agent import We0AgentPorts
status_manager = SessionStatusManager()
ports = We0AgentPorts(status_tracker=status_manager)| 方法 | 返回 | 说明 |
|---|---|---|
get(session_id) | We0Status | 取某会话状态;未记录时返回 We0Statuses.idle()。 |
list() | dict[str, We0Status] | 返回全部非 idle 会话的状态副本。 |
set(session_id, status) | None | 写入状态;传入 idle 会删除该条记录而非保存。 |
reset() | None | 清空全部状态。 |
易错点:
set收到type == "idle"的状态时不会保留记录,而是直接移除该会话条目,因此list()只列出处于 busy 等非空闲态的会话。所有读取(get/list)返回的都是We0Statuses.clone(...)的副本,外部修改不会影响内部状态。
AbortEventManager
活跃 managed-session 的中断事件注册表。它本身不实现 We0AgentPorts 端口,而是配合 SessionRunnerManager:运行器启动时把该 turn 的 asyncio.Event 注册进来,外部服务即可按 session_id 取到事件并 set() 触发中断(参见 Abort)。
from we0agent.builtins.manager.abort_event_manager import AbortEventManager
abort_events = AbortEventManager()
# 外部按 session_id 触发中断
event = abort_events.get("demo-session")
if event is not None:
event.set()| 方法 | 返回 | 说明 |
|---|---|---|
register(session_id, event) | None | 登记某会话的中断事件(由 runner observer 调用)。 |
unregister(session_id) | None | 会话进入 idle 时注销。 |
get(session_id) | asyncio.Event | None | 取某会话当前的中断事件;无活跃 run 时为 None。 |
reset() | None | 清空全部登记。 |
SessionRunnerManager
按 session_id 串行化会话运行的运行器集合。它不直接进 We0AgentPorts,而是会话回滚执行器的依赖:BuiltinSessionRevertRuntime 通过它的 assert_not_busy 判断会话是否正在运行,从而避免在 run 进行中执行回滚。
需要连接中断和状态时,直接构造 observer 列表:
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.builtins.manager.abort_event_manager import AbortEventManager
from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager
from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.builtins.manager.session_runner_lifecycle import SessionRunnerLifecycle
from we0agent.builtins.manager.session_runner_observers import (
AbortEventRunnerObserver,
SessionStatusRunnerObserver,
)
event_hub = We0EventHub()
status_manager = SessionStatusManager()
abort_events = AbortEventManager()
runner_manager = SessionRunnerManager(
lifecycle=SessionRunnerLifecycle(
observers=[
AbortEventRunnerObserver(abort_events),
SessionStatusRunnerObserver(
status_manager=status_manager,
event_publisher=event_hub,
),
]
)
)AbortEventRunnerObserver 会在 run 启动和结束时登记或注销中断事件。SessionStatusRunnerObserver 会在 run 启动时写入 busy,结束时写回 idle,并可通过事件发布器广播状态变化。
SessionRunnerManager 自身的关键方法:
| 方法 | 返回 | 说明 |
|---|---|---|
is_busy(session_id) | bool(async) | 该会话是否有正在运行的 runner。 |
assert_not_busy(session_id) | None(async) | 忙时抛 We0BusyError。 |
cancel(session_id) | None(async) | 触发中断并在宽限期后强制取消。 |
ensure_running(session_id, abort_event, on_interrupt, work) | ResultT(async) | 启动或复用运行器执行 work,已运行则等待同一结果。 |
约束:同一
session_id同时只跑一个 run。ensure_running在已有运行时不会重复启动,而是await同一个 result future;cancel先set中断事件,超过CANCEL_GRACE_SECONDS(1.0 秒)仍未收口才task.cancel()。We0BusyError位于we0agent.builtins.manager.session_runner_manager,携带session_id字段。
SessionSnapshotService
按 session_id 管理 SnapshotState 的快照服务,是 SessionRevertService 的内部依赖。它包装底层无状态的 SnapshotService,把「每次都要传 state」收敛成「按会话绑定一次」。完整的快照模型与底层服务参见 Snapshots。
from we0agent.snapshot.service import SnapshotService
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.service.session_snapshot import SessionSnapshotService
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
session_snapshot = SessionSnapshotService(snapshot_service=SnapshotService(file_system))
# 绑定会话状态;SessionRevertService 会按 session_id 取得 SnapshotTracker
session_snapshot.bind_state(
session_id="demo-session",
state=file_system.snapshot_state(worktree="/abs/project"),
)| 方法 | 返回 | 说明 |
|---|---|---|
bind_state(*, session_id, state) | None | 登记会话与 state 的映射(深拷贝保存)。 |
bind(*, session_id, state) | SnapshotInstance | 登记 state 并返回绑定后的快照实例(即 SnapshotTracker)。 |
tracker(session_id) | SnapshotInstance | None | 取已绑定会话的快照实例;未绑定时返回 None。 |
get_state(session_id) / require_state(session_id) | SnapshotState | None / SnapshotState | 取状态副本;require_* 缺失时抛 ValueError。 |
track(session_id) | str | None(async) | 记录当前快照 hash;未绑定 state 时返回 None。 |
restore / revert / diff_snapshot / diff_full | 见 Snapshots | 按会话转调底层服务的对应操作。 |
reset() | None | 清空全部会话状态映射。 |
易错点:
bind_state对同一session_id重复绑定到不同 state 会抛ValueError,绑定到相同 state 则幂等。track在会话未bind_state时静默返回None(即快照能力对该会话关闭),不会报错。
BuiltinSessionSummaryService
变更摘要服务,是 SessionSummaryService 协议的实现,通常作为 SessionRevertService 的内部依赖。它在一轮用户消息及其后续 assistant 消息之间,借助 SnapshotTracker 计算逐文件 diff,并把结果写回该用户消息的 summary.diffs。
from we0agent.builtins.service.session_summary import BuiltinSessionSummaryService
summary_service = BuiltinSessionSummaryService()| 方法 | 签名 | 说明 |
|---|---|---|
summarize | (*, session_id, message_id, context) -> None | 计算指定用户消息这一轮的 diff,并写回其 summary。context 为 SessionSummaryContext,包含 persistence 与可选 snapshot_tracker。 |
diff | (*, session_id, persistence, message_id=None) -> list[FileDiff] | 读取某用户消息已存的摘要 diff;message_id 为空时返回 []。 |
compute_diff | (*, messages, snapshot) -> list[FileDiff] | 在一段消息范围内,取首个 step 起始快照与末个 step 结束快照之间的 diff_full。 |
约束与默认行为:
summarize总会先把会话级 summary 重置为additions=0, deletions=0, files=0;随后若context.snapshot_tracker is None直接返回(即未启用快照时只清零、不计算)。- 当 summary 由
We0TurnRunner触发时,runner 会通过ports.event_publishers发出一条空session.diff事件作为初始化通知;summary 服务本身不持有事件发布端口。 compute_diff依赖消息 parts 中的StepStartPart.snapshot与StepFinishPart.snapshot定位起止快照;任一缺失则返回[]。FileDiff字段(file/patch/additions/deletions/status)参见 Snapshots。
SessionRevertService
会话回滚增强入口,是 SessionRevertCapability 抽象基类的内建实现,可作为 ports.session_revert。它采用 capability facade 形态,对 engine 暴露快照、摘要和回滚执行三个部件。回滚的整体语义参见 Revert。
低成本入口只需要会话、文件系统和工作区参数:
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.service.session_revert import SessionRevertService
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
session_revert = SessionRevertService(
session_id="demo-session",
file_system=file_system,
worktree="/abs/project",
)共享已有 manager 或事件发布器时直接传入可选参数:
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager
from we0agent.builtins.manager.session_runner_lifecycle import SessionRunnerLifecycle
from we0agent.builtins.service.session_revert import SessionRevertService
event_hub = We0EventHub()
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
runner_manager = SessionRunnerManager(lifecycle=SessionRunnerLifecycle(observers=[]))
session_revert = SessionRevertService(
session_id="demo-session",
file_system=file_system,
worktree="/abs/project",
event_publisher=event_hub,
runner_manager=runner_manager,
)| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
session_id | str | 是 | 当前会话标识。 |
file_system | FileSystemRuntime | 是 | 当前文件系统与 Git 执行环境。 |
worktree | str | Path | 否 | 当前项目工作区。 |
event_publisher | We0EventPublisher | None | 否 | 发出回滚相关的 session.diff 事件。 |
runner_manager | SessionRunnerManager | None | 否 | 回滚前用 assert_not_busy 确保会话不在运行中。 |
SessionRevertService 的方法:
| 方法 | 返回 | 说明 |
|---|---|---|
snapshot_tracker(session_id) | SnapshotTracker | None | 取该会话绑定的快照端口;未绑定时返回 None。 |
summary_service | SessionSummaryService | 取摘要策略。 |
revert_runtime | SessionRevertRuntime | 取回滚执行器。 |
BuiltinSessionRevertRuntime
BuiltinSessionRevertRuntime 实现 SessionRevertRuntime,负责真正执行 revert、unrevert 和 cleanup。它依赖 SessionRunnerManager 做 busy 判定,依赖 SessionSnapshotService 恢复文件状态,依赖 SessionSummaryService 计算回滚后的 diff,并通过 We0EventPublisher 发布 session.diff 事件。
| 方法 | 返回 | 说明 |
|---|---|---|
revert(request, persistence) | We0SessionInfo | 按 SessionRevertRequest 回滚会话与文件,写入 revert 状态与摘要。 |
unrevert(*, session_id, persistence) | We0SessionInfo | 撤销回滚,恢复快照并清除 revert 状态。 |
cleanup(*, session, persistence) | None | 落实回滚:物理删除回滚边界之后的消息/part 并清除 revert 状态。 |
assert_not_busy(session_id) | None | 转调 runner manager 的忙判定。 |
SessionRevertRequest(from we0agent.domain.session.revert import SessionRevertRequest)字段:
| 字段 | 类型 | 必填 | 默认 | 说明 |
|---|---|---|---|---|
session_id | str | 是 | — | 目标会话。 |
message_id | str | 是 | — | 回滚边界所在消息。 |
part_id | str | None | 否 | None | 精确到 part 的回滚边界;为空时回滚到消息粒度。 |
易错点:
revert与cleanup是两步。revert只标记回滚边界并还原文件、记录session.revert,会话消息仍保留,可被unrevert整体撤销;只有cleanup才会按已记录的边界物理删除后续消息与 part。会话正在运行(busy)时回滚会抛We0BusyError。
端到端装配
把上述对象组合成一套完整的运行时端口,可以采用下面的装配方式:
from we0agent.domain.models.agent import We0AgentPorts
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.builtins.manager.abort_event_manager import AbortEventManager
from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager
from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.builtins.manager.session_runner_lifecycle import SessionRunnerLifecycle
from we0agent.builtins.manager.session_runner_observers import (
AbortEventRunnerObserver,
SessionStatusRunnerObserver,
)
from we0agent.builtins.service.session_revert import SessionRevertService
# 1) 进程级单例:先各构造一次,再共享引用
event_hub = We0EventHub()
status_manager = SessionStatusManager()
abort_events = AbortEventManager()
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
# 2) 运行器:把中断与状态 observer 接到同一个 hub 上
runner_manager = SessionRunnerManager(
lifecycle=SessionRunnerLifecycle(
observers=[
AbortEventRunnerObserver(abort_events),
SessionStatusRunnerObserver(
status_manager=status_manager,
event_publisher=event_hub,
),
]
)
)
# 3) 回滚增强入口内部绑定快照状态,并组合摘要与回滚执行器
session_id = "demo-session"
session_revert = SessionRevertService(
session_id=session_id,
file_system=file_system,
worktree="/abs/project",
event_publisher=event_hub,
runner_manager=runner_manager,
)
# 4) 组合进 We0AgentPorts,交给 We0Agent(ports=...)
ports = We0AgentPorts(
event_publishers=[event_hub],
status_tracker=status_manager,
session_revert=session_revert,
)装配顺序的关键点:
event_hub、status_manager、abort_events和runner_manager先构造一次再共享。SessionRevertService内部持有快照、摘要和回滚运行时,用户侧只把它作为一个可选增强放入ports.session_revert。