we0agent

Guides

Abort

用 asyncio.Event 打断正在运行的 agent,并理解中断后的收口与 AbortError 结果。

概述

we0agent 用一个 asyncio.Event 表示「中断信号」。每次运行(stream()invoke()build_request())都必须传入一个 abort 事件,调用方在任意时刻 abort.set(),engine 就会在当前运行阶段执行中断收口:取消正在进行的模型流和工具执行,把已经产生的状态落库,并以一个带 AbortError 的 assistant message 收尾。

abort 不是「立刻杀进程」,而是一次协作式中断。engine 监听这个事件,在安全的边界上停止后续工作,因此被打断的那一轮仍然会留下一段可读、可恢复的会话历史。被 abort 打断后,常见的下一步是用同一个 session_idresume 模式继续,参见 Resume

Mermaid
Rendering diagram...

触发中断

最直接的用法:自己持有那个 asyncio.Event,需要时调用 set()

import asyncio

abort = asyncio.Event()

async def run() -> None:
    async for event in agent.stream(
        abort=abort,
        mode="prompt",
        session_id="ses_demo",
        messages=[user_message("整理当前目录并生成报告。")],
    ):
        if event["type"] == "text-delta":
            print(event["text"], end="", flush=True)

async def main() -> None:
    task = asyncio.create_task(run())
    await asyncio.sleep(5)
    abort.set()          # 5 秒后请求中断
    await task           # 等待 engine 完成收口

abort 是一个普通的 asyncio.Event,没有内置超时。「运行 N 秒后中断」「按下按钮后中断」这类策略由调用方实现,engine 只负责在事件被 set 之后收口。

中断的传播

abort 事件被 set 之后,engine 内部按下面的链路逐层收口:

阶段行为
abort watcher后台 watcher 监听 abort.wait(),事件触发后把当前 step 标记为 aborted
模型流若此刻没有正在执行的工具,直接取消正在消费的模型流任务(producer task)。
工具执行若有正在执行的工具,先等工具收到取消并收口,再继续。读取类工具按取消处理,已落库的副作用不会回滚。
收口当前 attempt 调用 halt_abort:标记 aborted=True、取消工具执行、给 assistant message 写入 AbortError、按中断语义关闭仍打开的 part。
落库已产生的 parts 和这条带错误的 assistant message 一并持久化,供后续 resume 使用。

也就是说,正在跑工具时触发 abort,engine 会优先让工具收到取消并完成清理,而不是粗暴地中断模型流,避免留下半开的工具调用状态。

AbortError 结果

中断收口时,engine 给当前 assistant message 写入一个结构化错误。它由 MessageErrorBuilder.build(error, aborted=True) 构造,是 We0AssistantMessageError 的一个实例:

from we0agent.domain.session.message import We0AssistantMessageError

We0AssistantMessageError(reason="AbortError", message="Aborted")

落在 We0AssistantMessage.error 上的字段如下:

字段类型说明
reasonstr"AbortError"标识本条消息因中断收口。
messagestr | None"Aborted"人类可读的简短说明。
codestr | NoneNoneabort 不携带 provider 错误码。
status_codeint | NoneNoneabort 不携带 HTTP 状态码。
retryablebool | NoneNoneabort 不参与可重试判定。

AbortError 是 engine 主动收口的结果,不同于 provider 返回的 APIErrorContextOverflowError 等真实错误:只要 aborted=TrueMessageErrorBuilder 会直接返回 AbortError,不再去解析底层异常。消费方可以通过 error.reason == "AbortError" 判断这一轮是被中断的,而非失败的。assistant message 的结构参见 Events

托管会话的中断(AbortEventManager)

如果由一个长期存在的管理层来托管多个会话(而不是调用方各自持有 abort 事件),可以用 AbortEventManagersession_id 维护这些事件的注册表。它是一个进程内的字典,把每个活跃会话的 abort 事件登记在一起,便于从运行循环之外触发中断。

from we0agent.builtins.manager.abort_event_manager import AbortEventManager

abort_events = AbortEventManager()

AbortEventManager 的接口:

方法签名说明
registerregister(session_id: str, event: asyncio.Event) -> None登记某个会话当前正在使用的 abort 事件。
unregisterunregister(session_id: str) -> None移除登记,会话进入 idle 时调用。
getget(session_id: str) -> asyncio.Event | None取出某会话的 abort 事件,未登记时返回 None
resetreset() -> None清空全部登记。

注册和注销由 AbortEventRunnerObserver 在会话运行的生命周期上自动完成:会话 startedregister,进入 idleunregister。因此外部只需按 session_id 取出事件并 set 即可触发中断。

abort = abort_events.get(session_id)
if abort is not None:
    abort.set()

get 返回 None 表示该会话当前没有正在运行的轮次(没有登记 abort 事件),此时无需中断。这种「先取再判空」的写法是必要的,直接对 None 调用 set() 会抛 AttributeError

默认行为与易错点

  • 每次运行都必须传 abort,它是 stream()invoke()build_request() 的必填参数;engine 不会替你创建。
  • abort 是一次性信号。被 set 过的事件若复用到下一轮,会被立即判定为已中断,所以每一轮应当用一个全新的 asyncio.Event
  • abort 是协作式中断,不保证瞬时停止。正在执行的工具会先收到取消并收口,已经发生的副作用(写文件、执行命令等)不会自动回滚。
  • 被打断的那一轮会留下一条带 AbortError 的、未完成的 assistant message。要从断点继续,用同一个 session_id 配一个新的 abort 事件以 mode="resume" 再跑一次,参见 Resume
  • AbortEventManager 仅在单进程内有效,它不跨进程、不持久化,进程退出后登记即丢失。
  • 通过 AbortEventManager.get(session_id) 取事件时务必判空,会话处于 idle 时返回 None

On this page