we0agent

API reference

Built-ins

内建服务参考:用公开类装配 event hub、状态、快照、回滚等运行时能力,并组合进 We0AgentPorts。

We0Agent 本身只关心「一次 turn 怎么跑」,而事件分发、状态跟踪、快照、会话回滚与变更摘要这些进程级运行时能力都由外部端口注入(参见 We0Agentports 参数)。SDK 在 we0agent.builtins 下提供这些端口的开箱即用实现。

Mermaid
Rendering diagram...

内建组件总览

组件import形态对应端口 / 用途
We0EventHubfrom we0agent.builtins.manager.we0_event_hub import We0EventHub进程级发布订阅 hub实现 We0EventPublisher,可放进 ports.event_publishers
SessionStatusManagerfrom we0agent.builtins.manager.session_status_manager import SessionStatusManager按 session 的状态表实现 We0StatusTracker,可作为 ports.status_tracker
AbortEventManagerfrom we0agent.builtins.manager.abort_event_manager import AbortEventManager活跃 session 的中断事件注册表配合 runner,供外部按 session_id 触发中断
SessionRunnerManagerfrom we0agent.builtins.manager.session_runner_manager import SessionRunnerManager按 session 串行化的运行器编排单会话 run / cancel / busy 判定
SessionSnapshotServicefrom we0agent.builtins.service.session_snapshot import SessionSnapshotService按 session 管理 SnapshotStateSessionRevertService 提供快照能力,参见 Snapshots
BuiltinSessionSummaryServicefrom we0agent.builtins.service.session_summary import BuiltinSessionSummaryService变更摘要服务SessionRevertService 写入 diff summary
SessionRevertServicefrom we0agent.builtins.service.session_revert import SessionRevertService会话回滚增强入口实现 SessionRevertCapability,可作为 ports.session_revert
BuiltinSessionRevertRuntimefrom we0agent.builtins.service.session_revert import BuiltinSessionRevertRuntime会话回滚执行器实现 SessionRevertRuntime,执行 revertunrevertcleanup

区分:manager 下的组件偏向进程级运行时基础设施(事件、状态、中断、运行器),service 下的组件偏向与会话数据交互的高层能力(快照、摘要、回滚)。两者都是普通类,构造即用,状态保存在进程内存中。

装配进 We0AgentPorts

最快的路径是直接构造 SessionRevertService,再放进 We0AgentPorts.session_revert。服务内部完成快照服务创建、会话状态绑定、摘要服务和回滚执行器组合。

from we0agent.agent.agent import We0Agent
from we0agent.domain.models.agent import We0AgentPorts
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.service.session_revert import SessionRevertService

file_system = LocalFileSystemRuntime(root="/abs/app-runtime")

ports = We0AgentPorts(
    session_revert=SessionRevertService(
        session_id="demo-session",
        file_system=file_system,
        worktree="/abs/project",
    )
)

agent = We0Agent(
    name="demo",
    model=model,             # 见 We0Model
    system_prompt=prompt,    # 见 Prompts
    ports=ports,
)

SessionRevertService 的低成本参数:

参数类型必填默认说明
session_idstr要绑定快照状态的会话标识。
file_systemFileSystemRuntime当前文件系统和 Git 执行环境;本地实现用 LocalFileSystemRuntime(root=...) 显式指定运行时根目录。
worktreestr | Path"."git --work-tree 根目录,会被 resolve() 成绝对路径。
event_publisherWe0EventPublisher | NoneNone回滚时发布 session.diff 事件;省略时使用内部 We0EventHub
runner_managerSessionRunnerManager | NoneNone回滚前执行 busy 判定;省略时使用空 observer 的内建 runner manager。

若还需要事件与状态能力,按下文手动补齐 event_publishersstatus_tracker 即可。

We0EventHub

进程级发布订阅 hub,是 We0EventPublisher 抽象基类(publish(*, session_id, event))的实现,可直接放进 ports.event_publishers。engine 产生的每个 We0Event 都会被包装成 We0StreamEvent 后分发给所有匹配的订阅者。

from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.domain.models.agent import We0AgentPorts

hub = We0EventHub()
ports = We0AgentPorts(event_publishers=[hub])

# 订阅指定类型事件(仅本 session)
subscription = hub.subscribe({"text-delta", "finish"}, session_id="demo-session")
async for stream_event in subscription:           # We0StreamEvent,frozen
    print(stream_event.type, stream_event.event)  # type 与原始 We0Event

订阅方式分两类:

方法返回说明
subscribe(event_types, *, session_id=None)We0EventSubscription订阅指定类型集合,可 async for 迭代;session_id 为空表示全部会话。
subscribe_all(*, session_id=None)We0EventSubscription订阅全部类型。
subscribe_callback(config)Callable[[], None]注册回调(同步或异步),返回退订函数。
subscribe_all_callback(callback, *, session_id=None)Callable[[], None]全类型回调订阅的便捷封装。
close_subscription(subscription)None(async)关闭并移除一个迭代式订阅。
reset()None关闭并清空全部订阅。

We0StreamEvent 是 frozen 模型,字段为 id(hub 内事件标识)、session_idtypeWe0EventType)、event(原始 We0Event)。订阅只接收注册之后发布的事件,不会回放历史;回调订阅中抛出的异常会被 hub 捕获并记录日志,不影响其他订阅者。事件结构参见 EventsStreaming

SessionStatusManager

session_id 保存当前运行状态,是 We0StatusTracker 抽象基类(set(session_id, status))的实现。

from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.domain.models.agent import We0AgentPorts

status_manager = SessionStatusManager()
ports = We0AgentPorts(status_tracker=status_manager)
方法返回说明
get(session_id)We0Status取某会话状态;未记录时返回 We0Statuses.idle()
list()dict[str, We0Status]返回全部非 idle 会话的状态副本。
set(session_id, status)None写入状态;传入 idle删除该条记录而非保存。
reset()None清空全部状态。

易错点:set 收到 type == "idle" 的状态时不会保留记录,而是直接移除该会话条目,因此 list() 只列出处于 busy 等非空闲态的会话。所有读取(get / list)返回的都是 We0Statuses.clone(...) 的副本,外部修改不会影响内部状态。

AbortEventManager

活跃 managed-session 的中断事件注册表。它本身不实现 We0AgentPorts 端口,而是配合 SessionRunnerManager:运行器启动时把该 turn 的 asyncio.Event 注册进来,外部服务即可按 session_id 取到事件并 set() 触发中断(参见 Abort)。

from we0agent.builtins.manager.abort_event_manager import AbortEventManager

abort_events = AbortEventManager()

# 外部按 session_id 触发中断
event = abort_events.get("demo-session")
if event is not None:
    event.set()
方法返回说明
register(session_id, event)None登记某会话的中断事件(由 runner observer 调用)。
unregister(session_id)None会话进入 idle 时注销。
get(session_id)asyncio.Event | None取某会话当前的中断事件;无活跃 run 时为 None
reset()None清空全部登记。

SessionRunnerManager

session_id 串行化会话运行的运行器集合。它不直接进 We0AgentPorts,而是会话回滚执行器的依赖:BuiltinSessionRevertRuntime 通过它的 assert_not_busy 判断会话是否正在运行,从而避免在 run 进行中执行回滚。

需要连接中断和状态时,直接构造 observer 列表:

from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.builtins.manager.abort_event_manager import AbortEventManager
from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager
from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.builtins.manager.session_runner_lifecycle import SessionRunnerLifecycle
from we0agent.builtins.manager.session_runner_observers import (
    AbortEventRunnerObserver,
    SessionStatusRunnerObserver,
)

event_hub = We0EventHub()
status_manager = SessionStatusManager()
abort_events = AbortEventManager()

runner_manager = SessionRunnerManager(
    lifecycle=SessionRunnerLifecycle(
        observers=[
            AbortEventRunnerObserver(abort_events),
            SessionStatusRunnerObserver(
                status_manager=status_manager,
                event_publisher=event_hub,
            ),
        ]
    )
)

AbortEventRunnerObserver 会在 run 启动和结束时登记或注销中断事件。SessionStatusRunnerObserver 会在 run 启动时写入 busy,结束时写回 idle,并可通过事件发布器广播状态变化。

SessionRunnerManager 自身的关键方法:

方法返回说明
is_busy(session_id)bool(async)该会话是否有正在运行的 runner。
assert_not_busy(session_id)None(async)忙时抛 We0BusyError
cancel(session_id)None(async)触发中断并在宽限期后强制取消。
ensure_running(session_id, abort_event, on_interrupt, work)ResultT(async)启动或复用运行器执行 work,已运行则等待同一结果。

约束:同一 session_id 同时只跑一个 run。ensure_running 在已有运行时不会重复启动,而是 await 同一个 result future;cancelset 中断事件,超过 CANCEL_GRACE_SECONDS(1.0 秒)仍未收口才 task.cancel()We0BusyError 位于 we0agent.builtins.manager.session_runner_manager,携带 session_id 字段。

SessionSnapshotService

session_id 管理 SnapshotState 的快照服务,是 SessionRevertService 的内部依赖。它包装底层无状态的 SnapshotService,把「每次都要传 state」收敛成「按会话绑定一次」。完整的快照模型与底层服务参见 Snapshots

from we0agent.snapshot.service import SnapshotService
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.service.session_snapshot import SessionSnapshotService

file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
session_snapshot = SessionSnapshotService(snapshot_service=SnapshotService(file_system))

# 绑定会话状态;SessionRevertService 会按 session_id 取得 SnapshotTracker
session_snapshot.bind_state(
    session_id="demo-session",
    state=file_system.snapshot_state(worktree="/abs/project"),
)
方法返回说明
bind_state(*, session_id, state)None登记会话与 state 的映射(深拷贝保存)。
bind(*, session_id, state)SnapshotInstance登记 state 并返回绑定后的快照实例(即 SnapshotTracker)。
tracker(session_id)SnapshotInstance | None取已绑定会话的快照实例;未绑定时返回 None
get_state(session_id) / require_state(session_id)SnapshotState | None / SnapshotState取状态副本;require_* 缺失时抛 ValueError
track(session_id)str | None(async)记录当前快照 hash;未绑定 state 时返回 None
restore / revert / diff_snapshot / diff_fullSnapshots按会话转调底层服务的对应操作。
reset()None清空全部会话状态映射。

易错点:bind_state 对同一 session_id 重复绑定到不同 state 会抛 ValueError,绑定到相同 state 则幂等。track 在会话未 bind_state 时静默返回 None(即快照能力对该会话关闭),不会报错。

BuiltinSessionSummaryService

变更摘要服务,是 SessionSummaryService 协议的实现,通常作为 SessionRevertService 的内部依赖。它在一轮用户消息及其后续 assistant 消息之间,借助 SnapshotTracker 计算逐文件 diff,并把结果写回该用户消息的 summary.diffs

from we0agent.builtins.service.session_summary import BuiltinSessionSummaryService

summary_service = BuiltinSessionSummaryService()
方法签名说明
summarize(*, session_id, message_id, context) -> None计算指定用户消息这一轮的 diff,并写回其 summarycontextSessionSummaryContext,包含 persistence 与可选 snapshot_tracker
diff(*, session_id, persistence, message_id=None) -> list[FileDiff]读取某用户消息已存的摘要 diff;message_id 为空时返回 []
compute_diff(*, messages, snapshot) -> list[FileDiff]在一段消息范围内,取首个 step 起始快照与末个 step 结束快照之间的 diff_full

约束与默认行为:

  • summarize 总会先把会话级 summary 重置为 additions=0, deletions=0, files=0;随后若 context.snapshot_tracker is None 直接返回(即未启用快照时只清零、不计算)。
  • 当 summary 由 We0TurnRunner 触发时,runner 会通过 ports.event_publishers 发出一条空 session.diff 事件作为初始化通知;summary 服务本身不持有事件发布端口。
  • compute_diff 依赖消息 parts 中的 StepStartPart.snapshotStepFinishPart.snapshot 定位起止快照;任一缺失则返回 []
  • FileDiff 字段(file / patch / additions / deletions / status)参见 Snapshots

SessionRevertService

会话回滚增强入口,是 SessionRevertCapability 抽象基类的内建实现,可作为 ports.session_revert。它采用 capability facade 形态,对 engine 暴露快照、摘要和回滚执行三个部件。回滚的整体语义参见 Revert

低成本入口只需要会话、文件系统和工作区参数:

from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.service.session_revert import SessionRevertService

file_system = LocalFileSystemRuntime(root="/abs/app-runtime")

session_revert = SessionRevertService(
    session_id="demo-session",
    file_system=file_system,
    worktree="/abs/project",
)

共享已有 manager 或事件发布器时直接传入可选参数:

from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager
from we0agent.builtins.manager.session_runner_lifecycle import SessionRunnerLifecycle
from we0agent.builtins.service.session_revert import SessionRevertService

event_hub = We0EventHub()
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")
runner_manager = SessionRunnerManager(lifecycle=SessionRunnerLifecycle(observers=[]))

session_revert = SessionRevertService(
    session_id="demo-session",
    file_system=file_system,
    worktree="/abs/project",
    event_publisher=event_hub,
    runner_manager=runner_manager,
)
参数类型必填说明
session_idstr当前会话标识。
file_systemFileSystemRuntime当前文件系统与 Git 执行环境。
worktreestr | Path当前项目工作区。
event_publisherWe0EventPublisher | None发出回滚相关的 session.diff 事件。
runner_managerSessionRunnerManager | None回滚前用 assert_not_busy 确保会话不在运行中。

SessionRevertService 的方法:

方法返回说明
snapshot_tracker(session_id)SnapshotTracker | None取该会话绑定的快照端口;未绑定时返回 None
summary_serviceSessionSummaryService取摘要策略。
revert_runtimeSessionRevertRuntime取回滚执行器。

BuiltinSessionRevertRuntime

BuiltinSessionRevertRuntime 实现 SessionRevertRuntime,负责真正执行 revertunrevertcleanup。它依赖 SessionRunnerManager 做 busy 判定,依赖 SessionSnapshotService 恢复文件状态,依赖 SessionSummaryService 计算回滚后的 diff,并通过 We0EventPublisher 发布 session.diff 事件。

方法返回说明
revert(request, persistence)We0SessionInfoSessionRevertRequest 回滚会话与文件,写入 revert 状态与摘要。
unrevert(*, session_id, persistence)We0SessionInfo撤销回滚,恢复快照并清除 revert 状态。
cleanup(*, session, persistence)None落实回滚:物理删除回滚边界之后的消息/part 并清除 revert 状态。
assert_not_busy(session_id)None转调 runner manager 的忙判定。

SessionRevertRequestfrom we0agent.domain.session.revert import SessionRevertRequest)字段:

字段类型必填默认说明
session_idstr目标会话。
message_idstr回滚边界所在消息。
part_idstr | NoneNone精确到 part 的回滚边界;为空时回滚到消息粒度。

易错点:revertcleanup 是两步。revert标记回滚边界并还原文件、记录 session.revert,会话消息仍保留,可被 unrevert 整体撤销;只有 cleanup 才会按已记录的边界物理删除后续消息与 part。会话正在运行(busy)时回滚会抛 We0BusyError

端到端装配

把上述对象组合成一套完整的运行时端口,可以采用下面的装配方式:

from we0agent.domain.models.agent import We0AgentPorts
from we0agent.snapshot.runtime import LocalFileSystemRuntime
from we0agent.builtins.manager.we0_event_hub import We0EventHub
from we0agent.builtins.manager.abort_event_manager import AbortEventManager
from we0agent.builtins.manager.session_runner_manager import SessionRunnerManager
from we0agent.builtins.manager.session_status_manager import SessionStatusManager
from we0agent.builtins.manager.session_runner_lifecycle import SessionRunnerLifecycle
from we0agent.builtins.manager.session_runner_observers import (
    AbortEventRunnerObserver,
    SessionStatusRunnerObserver,
)
from we0agent.builtins.service.session_revert import SessionRevertService

# 1) 进程级单例:先各构造一次,再共享引用
event_hub = We0EventHub()
status_manager = SessionStatusManager()
abort_events = AbortEventManager()
file_system = LocalFileSystemRuntime(root="/abs/app-runtime")

# 2) 运行器:把中断与状态 observer 接到同一个 hub 上
runner_manager = SessionRunnerManager(
    lifecycle=SessionRunnerLifecycle(
        observers=[
            AbortEventRunnerObserver(abort_events),
            SessionStatusRunnerObserver(
                status_manager=status_manager,
                event_publisher=event_hub,
            ),
        ]
    )
)

# 3) 回滚增强入口内部绑定快照状态,并组合摘要与回滚执行器
session_id = "demo-session"
session_revert = SessionRevertService(
    session_id=session_id,
    file_system=file_system,
    worktree="/abs/project",
    event_publisher=event_hub,
    runner_manager=runner_manager,
)

# 4) 组合进 We0AgentPorts,交给 We0Agent(ports=...)
ports = We0AgentPorts(
    event_publishers=[event_hub],
    status_tracker=status_manager,
    session_revert=session_revert,
)

装配顺序的关键点:event_hubstatus_managerabort_eventsrunner_manager 先构造一次再共享。SessionRevertService 内部持有快照、摘要和回滚运行时,用户侧只把它作为一个可选增强放入 ports.session_revert

On this page