Skip to Content
架构Task Sandbox v2 (技术选型)

Task Sandbox v2 — 技术选型与解耦架构

双向通信 · 任务可扩展 · 长任务可靠 — 把当前单向 SSE + stdout JSONL 的事件链路,升级到 WebSocket + sandbank Relay + Claude SDK streamInput 的双向架构。一次性解锁 用户运行中追加指令 / 主 API 重启不丢任务 / 多 task type 插件化 / 多 task 并发隔离。本文不替代 POC 阶段沉淀的真知识(network quirks · 凭据 scope · WSL2 SOP)— 它在那之上重新画”如果今天从零开始,task 沙箱该长什么样”。

technical selection post-POC · 2026-05-22 draft
  • 作者: 探索会话沉淀
  • 状态: 待 review,未进 OpenSpec change
  • 前置: 2026-05-19 task-runner POC / 2026-05-22 microsandbox fallback
  • 替代: 本文若通过,specs/task-runner/spec.md 多条 Requirement 会被改写

§1 背景与触发点

这份文档是哪些质问的回答?

POC v2 跑通了 6 个 sandbox backend(Fly / BoxLite / E2B / Cube / AIO / Microsandbox),主 API 通过 adapter.streamLogs() 订阅 sandbox 的 stdout JSON Lines 事件,broker 模式工作正常。但越深入”接下来要做什么”,越浮现三类需求:

需求 A

任务类型可扩展

OpenSpec 4 阶段是当前唯一形态。但 spec-kit / TDD 循环 / bug fix from issue 都是合理 task type 候选,目前 run.js hardcode 不可扩。

需求 B

用户运行中追加指令

30 min 任务中段,用户想干预:“这里加个测试” / “跳过这步” / “换个方向”。当前架构是单向 SSE,没有 client → agent 的通路。

需求 C

主 API 重启不丢事件流

每次 deploy → in-process streamLogs loop 死 → 任务被 janitor 60min 后误判 failed。sandbox 可能早就跑完了,只是没人接事件。

本文核心论点: 把通信形态从”单向 SSE + stdout JSONL”换成”WebSocket + Relay”,三个需求会被同一套机制同时解决,并且 不需要 引入 BullMQ / Trigger.dev / 任何 durable execution 引擎。

§2 现状盘点

通信链路真相 + 责任耦合点 + 失败模式

2.1 真实通信形态(不是 WebSocket,是 SSE + ReadableStream)

┌──────────────────────────────────────────────────────────────────────────┐ │ web (browser) 主 API (NestJS, Fly) sandbox │ │ │ │ │ │ │ │ EventSource (SSE) │ │ │ │ │ ◄────── 单向 ─────────────│ │ │ │ │ │ │ │ │ │ │ adapter.streamLogs() │ │ │ │ │ ReadableStream<Uint8> │ │ │ │ │ ◄──── 单向 ───────────────│ │ │ │ │ (sandbox stdout JSONL) │ │ └──────────────────────────────────────────────────────────────────────────┘ • web→主 API: HTTP POST /tasks/:id/dispatch (一次性,非通道) • 主 API→sandbox: 仅在 dispatch 时 provider.create(env, …) 注入,之后无通路
web ↔ 主 API
@Sse decorator, apps/api/src/task-runner/task-runner.controller.ts:92
主 API ↔ sandbox
handle.streamLogs(), apps/api/src/task-runner/task-runner.dispatcher.ts:169
web 自动重连
浏览器 EventSource 内建(3s retry + Last-Event-ID)
主 API ↔ sandbox 重连
不存在 — in-memory loop 死即断

2.2 责任耦合点

职责当前归属类型
启停 sandboxsandbank adapter硬约束
网络方向(outbound-only)broker 架构整体硬约束
容器超时(双层)run.js 自杀 + janitor cron硬约束
凭据 scope(最小权限)InstallationTokenService硬约束
事件传输(JSONL via stdout)run.js + adapter.streamLogs实现选择
事件签名(HMAC)run.js + dispatcher实现选择
行级解析 + 状态机dispatcher in-memory loop实现选择
Stage 推进(4 阶段)PostToolUse hook 内实现选择
Agent runtime(Claude SDK + skill)run.js实现选择
Repo 操作(clone/push/PR)run.js + token helper实现选择
Token 续期dispatcher /internal endpoint实现选择

结论:硬约束 vs 实现选择 ≈ 3 : 7。后者全部是 v2 可以重画的对象。

2.3 失败模式

场景当前行为影响
主 API deploy(kill_timeout=10sstreamLogs loop 断 → 重启后不知 sandbox 归属每次 deploy 期间 running 的 task 失去事件流,janitor 60min 后误判 failed
主 API 进程 crash同上同上(生产 crash 罕见,但发生即批量丢)
web 标签页切走再回SSE 自动重连✓ 不丢(但缺 Last-Event-ID 实现,重连期间 server 发的事件丢失)
用户想中途追加指令无通路只能 destroy 整个 sandbox 重新派发
用户想接受 / 拒绝 permission容器内 bypassPermissionsN/A(设计上跳过了这个能力)

§3 解耦目标

v2 想动什么,不想动什么

3.1 想解耦的(按产品价值排序)

G1 · 产品需求

双向通道:用户能在任务运行中追加指令

“这里加测试” / “换个方向” / “取消” / “回答 permission prompt” 都要能从 web 实时发到 agent,agent 实时收到并影响后续行为。

G2 · 产品需求

任务类型可扩展(不止 OpenSpec)

spec-kit、TDD red-green-refactor 循环、bug fix from issue —— 不能 hardcode 在 run.js 里。换 task type 不用改 image,也不用改 sandbox adapter。

G3 · 系统韧性

主 API 重启不丢正在跑的任务

deploy 期间,sandbox 内 agent 不知道、不关心 host 死了;事件继续累积;主 API 重启后接续订阅,前端无感。

G4 · 系统能力

多 task 并发隔离(不是多 agent)

能并发的是”不同 task”,不是单 task 内的多个 agent。单 task 内 agent 仍是序列的(一次只在 explore/propose/apply/archive 其一)。Relay 的 channel = task_id 让多 task 事件流天然隔离、可独立 catch-up。

3.2 不动的(POC 真知识)

沉淀保留理由
sandbank fork(6 backend)给用户的”产品选择”,不是抽象冗余
主 API outbound-only 网络约束Fly / TCC mac / VPN 部署的网络真相
install token 最小权限 + 不持久化已 hardened,无理由动
容器内三轨鉴权(subapi / 直连 / OAuth 禁用)S7 实测验证
双层 timeout(容器自杀 + 主 API 兜底)主 API 兜底由 janitor 改 ResubscribeBootstrap,但仍是双层
WSL2 quirk SOP(s13)microsandbox shim 部署沉淀
scoped install token 续期但续期”通道”会变(容器拉 → 主 API 推)

§4 关键决策 D1–D6

每条对应 §3 的一个目标

D1通信形态 = WebSocket 双向(替代 SSE + stdout JSONL)

决定

web ↔ 主 API ↔ sandbox-agent 三段全部走 WebSocket,由中间的 sandbank Relay 桥接。退役 SSE / stdout JSONL 作为主路径(保留 stdout 作 crash diagnostic)。

理由

  • G1(用户追加指令)需要 client → server 双向,SSE 做不到
  • Claude Agent SDK 原生支持 query({ prompt: AsyncIterable<SDKUserMessage> })Query.streamInput() —— 边跑边接收用户消息是它内建能力
  • Fly.io 官方:“WebSocket implementation is very straightforward; no third-party tools necessary”
  • NestJS 有原生 @WebSocketGateway,跟现有 module / guard / DI 无缝

代价

  • web 端 EventSource 改 socket.io-client(一次性迁移)
  • run.js 改为 streamInput + AsyncIterable 模式
  • 退 SSE 期间会有 dual-protocol 阶段(迁移期 1–2 周)
D2中间层 = sandbank Relay(不自写 broker,不让主 API 直接接 inbound)

决定

引入 @douglas-agent/sandbank-relay(WebSocket + JSON-RPC 2.0),作为独立 fly app 部署。主 API 和 sandbox-agent 都 outbound 连 Relay,channel key = task_id

理由

  • 主 API outbound-only 网络约束完美保留:主 API 不再暴露 /ingest 这种 inbound
  • Relay 是上游 chekusu/sandbank 一等公民组件,我们的 @douglas-agent/sandbank-* fork 跟随上游;协议、缓冲、重连机制不需要我们造
  • channel = task_id 让 G4(多 task 隔离)天然成立,无需自写分片
  • 每个 backend 都要”消息注入通路”的方式 B 工作量 ≈ ×6(每 backend 写一次),不可取

代价

  • 新增 1 个 fly app(shared-cpu-1x, 512MB ≈ $5-10/mo
  • 跟着 sandbank 上游版本(已经在跟,本期复用)
  • Relay 单点故障域:Relay 死 → 所有 task 失去事件流(但仍可 belt-and-suspenders 走 stdout JSONL fallback)
D3agent runtime = Claude SDK streamInput(替代 query("string prompt"))

决定

容器入口 run.js 把当前的单次 query("/opsx:propose ...") 改为:

const session = await connect() // @douglas-agent/sandbank-agent const userQueue = new AsyncQueue() session.on(‘message’, m => userQueue.push(m)) async function* input() { yield { role: ‘user’, content: PROMPT } // 初始 prompt while (running) { yield await userQueue.shift() // 用户追加 } } for await (msg of query({ prompt: input(), … })) { session.broadcast(‘task-event’, msg) // 替代 stdout JSONL }

理由

  • Claude Agent SDK 文档明确:“streaming input mode is the preferred way”
  • permission prompt / interruption 都是 SDK 内建语义,可以转发给用户
  • 事件流回报从 stdout 切到 session.broadcast() —— 不再依赖 adapter.streamLogs
D4主 API 重启接续 = Task 表加 sandboxId + ResubscribeBootstrap

决定

Prisma Task model 加两列:sandboxId + sandboxProvider。主 API OnModuleInit 阶段跑一个轻量重订阅例程。

// apps/api/src/task-runner/task-runner.bootstrap.ts async onModuleInit() { const active = await prisma.task.findMany({ where: { status: { in: [‘running’, ‘awaiting_review’] }, sandboxId: { not: null } } }) for (const t of active) { const relayChannel = `task/${t.id}` relayClient.subscribe(relayChannel, msg => this.routeEvent(t.id, msg)) } }

理由

  • 这就是 G3(主 API 重启韧性)的最小可行方案
  • 不需要 ReconcileWorker / BullMQ / Trigger.dev / durable execution(前面探讨过的方向是过度工程)
  • 30 行代码 + 一条 migration 解决 90%+ 的失败模式
D5任务类型插件化 = /task-spec.json 协议

决定

容器启动时挂载一个 /task-spec.json 描述跑什么模块;run.js 不再 hardcode OpenSpec 4 阶段,改为 dispatch 到对应 runner 模块。

{ “version”: 1, “type”: “openspec-4stage” | “spec-kit” | “tdd-cycle” | “bug-fix-from-issue”, “params”: { “prompt”: ”…”, “change_id”: ”…”, // task-type 特定参数 }, “limits”: { “max_turns”: 100, “max_minutes”: 60 }, “skills”: [“.claude/skills/openspec-explore”, “.claude/skills/openspec-propose”, …] }

每个 task-type 对应一个 npm 子包

  • @douglas-agent/task-runner-openspec · 现有 4 阶段
  • @douglas-agent/task-runner-spec-kit · GitHub spec-kit phase gates
  • @douglas-agent/task-runner-tdd · red-green-refactor 循环(cyclic state)
  • @douglas-agent/task-runner-bugfix · reproduce → localize → patch → verify

建议先做的三个 task-type

spec-kit(验证抽象通用性)+ TDD(验证 cyclic state)+ OpenSpec explore-only(验证子集形态)。其它后续。

D6不引入 durable execution 引擎(BullMQ / Inngest / Trigger.dev / Temporal)

决定

本期不引入任何上述工具。任务状态机仍住在 Postgres + NestJS 内,重启韧性由 D4 解决。

理由

  • 当前任务并发个位数 ~ 十位数,远未到工具门槛
  • BullMQ 解决的是”分发短任务”,错配于”长任务可恢复”
  • Trigger.dev / Inngest 是真正契合的,但要付 1 个新 fly app + Redis ≈ $20-30/mo + 学一套 DSL
  • D4 的 30 行 bootstrap 已经覆盖了 90% 的失败模式
  • 未来真到瓶颈,reconcile 函数搬到 Trigger.dev 是平移式迁移,不破坏现有代码

触发重新评估的条件

  • 同时活跃 task 经常 > 100
  • 主 API 需要多副本横向扩
  • 商业化 SLA 承诺需要”任意 step 幂等可重试”

§5 目标架构

四层组件 + 数据流

┌────────────────────────────────────────────────────────────────────────────┐ │ web (Next.js · browser) │ │ socket.io-client / native WS │ │ ─ send: 用户追加指令 / 暂停 / 取消 / permission 回应 │ │ ─ recv: stage_change / log / completed / failed / agent_question │ │ ─ 浏览器自动重连(WS Last-Event-ID 模拟) │ └──────────────────────────────────┬─────────────────────────────────────────┘ │ WSS (持久连接) ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ 主 API (NestJS · Fly · min=1) │ │ │ │ @WebSocketGateway(‘/tasks/:id/stream’) [NEW] │ │ ─ on(‘message’) from web → forward 到 Relay ‘task/:id’ channel │ │ ─ subscribe Relay ‘task/:id’ → forward 到 web client │ │ │ │ TaskRunnerDispatcher [修订] │ │ ─ provider.create() 启动 sandbox │ │ ─ store sandbox_id 进 Task 表 │ │ ─ subscribe Relay ‘task/:id’ │ │ ─ 不再 维护 in-process streamLogs loop │ │ │ │ ResubscribeBootstrap [NEW · 30 lines] │ │ ─ OnModuleInit: SELECT running tasks → 对每个 subscribe Relay channel │ │ │ │ TaskRunnerJanitor [保留 · 兜底] │ │ ─ 仅当 ResubscribeBootstrap 发现 sandbox 已死时清理 │ └──────────────────────────────────┬─────────────────────────────────────────┘ │ WSS outbound (主 API → Relay) ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ sandbank Relay (Fly · 独立 app · ~$5-10/mo) [NEW] │ │ @douglas-agent/sandbank-relay (WebSocket + JSON-RPC 2.0) │ │ ─ channel = task_id (多 task 天然隔离) │ │ ─ 物理上接受 inbound 但主 API 和 sandbox 都是 outbound 进 │ │ ─ 可选缓冲最近 N 条消息(供主 API 重连 catch-up) │ │ ─ session.context 能力不本期使用(单 task 内 agent 序列, 不需共享 context) │ └──────────────────────────────────┬─────────────────────────────────────────┘ │ WSS outbound (sandbox → Relay) ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ sandbox: task-runner image · run.js [修订] │ │ │ │ 1. 读 /task-spec.json (替代 hardcoded 4 阶段) │ │ 2. connect() to Relay via @douglas-agent/sandbank-agent │ │ 3. session.on(‘message’) push to userQueue │ │ 4. query({ prompt: AsyncIterable(initial + userQueue) }) [streamInput] │ │ 5. for await event of query() → session.broadcast(event) │ │ 6. 不再写 stdout JSONL (除非 dual-protocol 迁移期) │ └────────────────────────────────────────────────────────────────────────────┘

5.1 主 API 在新架构里只剩”协议翻译 + 状态机持久化”

组件修订前修订后
TaskRunnerDispatcher启 sandbox + 维护 streamLogs in-memory loop(30min 长寿)启 sandbox + 存 sandbox_id + 订阅 Relay(瞬时操作)
TaskRunnerEventBroker行级解析 + HMAC + 写 task_events + SSE 广播退役(验签搬到 Relay 层;分发由 WebSocketGateway 直接做)
TaskRunnerJanitor主路径:60min stale → 标 failed兜底路径:ResubscribeBootstrap 发现 sandbox 死了才标 failed
TaskRunnerController@Sse + POST /dispatch@WebSocketGateway + POST /dispatch(dispatch 保留为短 HTTP)
/internal/tasks/:id/token容器 inbound 拉续期 token退役 — 主 API 主动通过 Relay ‘task/:id’ 推新 token

§6 与现有架构的差异

保留 / 修改 / 删除 / 新增

保留
  • sandbank fork + 6 backend(产品选择)
  • install token 最小权限 + 不持久化
  • 容器内三轨鉴权
  • 双层 timeout 思想(实现细节变)
  • WSL2 quirk SOP(microsandbox shim 部署)
  • 主 API outbound-only 网络约束
  • scoped install token 续期(但通道变)
修改
  • 事件回传协议:stdout JSONL → Relay broadcast
  • 用户面通道:SSE → WebSocket
  • 容器入口:单次 query(string) → streamInput AsyncIterable
  • task type:hardcoded 4 阶段 → /task-spec.json plugin
  • 主 API 重启:丢任务 → 自动接续
  • Token 续期:容器拉 → 主 API 推
  • 双层 timeout 主路径:cron → bootstrap reconcile
删除
  • adapter.streamLogs 强制约束(变 optional)
  • dispatcher in-memory streamLogs loop
  • HMAC inline 签名(连接层 WSS / mTLS 取代)
  • POST /internal/tasks/:id/token 路由
  • cron 60min 巡检主路径
新增
  • Relay 作为独立 fly app 部署
  • Task.sandboxId / sandboxProvider 两列 + migration
  • ResubscribeBootstrap module
  • @WebSocketGateway('/tasks/:id/stream')
  • run.js streamInput + @douglas-agent/sandbank-agent 集成
  • /task-spec.json 协议 + 至少 1 个 plugin runner(OpenSpec)

6.1 现有 spec.md Requirement 影响

Requirement(当前 spec)v2 状态
Sandbank 沙箱中间层抽象保留(adapter 接口可瘦:streamLogs 变 optional)
Sandbank 包来源切换到自有 fork保留
新增 adapter 的 capability 集保留
Microsandbox shim 服务契约保留
WSL2 quirk 合规约束保留
Event broker · 主 API outbound-only 订阅 adapter 日志改写 — broker 由 Relay 替代
task-runner stdout JSON Lines 事件协议改写 — 主路径 Relay broadcast,stdout 留 fallback
task-runner 容器入口契约改写 — 增加 task-spec.json + streamInput 章节
HMAC callback 协议改写 — 主路径退役,连接层 WSS 取代
installation token 续期改写 — 通道改为主动推
双层 timeout 防护改写 — 巡检主路径换 ResubscribeBootstrap
容器内鉴权策略(三轨)保留
主 API 并发上限保留
容器 install token 最小权限保留
容器内 git credential 不持久化保留
容器 outbound 网络层 partial 防护保留

§7 任务类型插件协议

支撑 G2(task type 可扩展)· runner = npm 包,host 提供能力

7.1 设计原则

原则 1

runner 不感知基础设施

runner 模块不 import sandbank / sandbox-agent / Anthropic SDK / git。所有这些由 host context 注入。换 sandbox backend、换 LLM provider 都不改 runner。

原则 2

单 task 内 agent 仍是序列

runner 是 stage 调度器,不是并行编排器。单 task 内同一时刻只有一个 query() 在跑(4 阶段是串行 stage 切换,cyclic 是同一 query() 多轮循环)。

原则 3

声明式优先于代码

能用 /task-spec.json 声明的(stage 检测 rule、skill 列表、limit)就别写 TS。runner 代码只承载真业务逻辑(stage 间过渡、cycle 终止条件、completion verifier)。

原则 4

host 是稳定 ABI

runner 依赖 HostContext 接口而非具体实现。host 升级不破坏 runner,runner 升级不破坏 host。

7.2 容器启动加载流程

┌─────────────────────────────────────────────────────────────────────────┐ │ task-runner image 入口 (/app/entry.js) │ │ │ │ 1. parse /task-spec.json │ │ 2. resolve runner module: require(‘@douglas-agent/task-runner-’ + │ │ spec.type) │ │ 3. const runner = module.create() // implements TaskRunner │ │ 4. const host = await initHostContext(spec) │ │ │ │ │ ├─ Claude SDK client (subapi / direct base url 已配) │ │ ├─ Sandbox session via @douglas-agent/sandbank-agent.connect() │ │ ├─ Git workspace (clone + credential helper) │ │ ├─ Token store (auto-refresh via Relay 主推) │ │ └─ Stage state writer │ │ 5. await runner.run(host, spec) │ │ 6. host.complete(result) 或 host.fail(reason) │ └─────────────────────────────────────────────────────────────────────────┘

7.3 host 给 runner 的能力(HostContext 接口)

// 由 task-runner image 提供, runner 模块通过依赖注入接收 interface HostContext { // Claude Agent SDK 已就绪的 client (env、auth 已注入) ai: { query(opts: QueryOptions): AsyncIterable<SDKMessage> streamInput(messages: AsyncIterable<SDKUserMessage>): void interrupt(): Promise<void> } // 双向通道 (D3: 替代 stdout JSONL) session: { broadcast(event: TaskEvent): void // agent → web on(type: ‘user_message’, handler: (msg) => void): Unsubscribe on(type: ‘interrupt’, handler: () => void): Unsubscribe on(type: ‘token_renewed’, handler: (token) => void): Unsubscribe } // Git 操作 (token 由 host 透明管理) git: { clone(repo: string, branch: string): Promise<Workspace> commit(ws: Workspace, message: string): Promise<void> push(ws: Workspace, branch: string): Promise<void> openPr(opts: PrOptions): Promise<{ url: string }> } // stage 状态 (持久化到 task 行) stage: { transition(to: string, status?: ‘running’ | ‘awaiting_review’): void increment_cycle(): void // cyclic 用 current(): string } // 日志 (会被 broadcast 出去, 不要 console.log) log(level: ‘info’ | ‘warn’ | ‘error’, msg: string, data?: object): void // 退出 complete(result: { prUrl?: string; metrics?: object }): never fail(reason: string, stage?: string): never }

7.4 Runner 模块接口契约

// 每个 task type = 一个 npm 包, 默认 export 一个 factory export interface TaskRunner { readonly type: string // ‘openspec-4stage’ readonly stateModel: ‘dag’ | ‘cyclic’ describe(): RunnerManifest // 给主 API 探测用 run(host: HostContext, spec: TaskSpec): Promise<void> } interface RunnerManifest { type: string version: string // semver, 跟 npm 包版对齐 required_skills: string[] // 默认 skill 列表 required_capabilities: string[] // 如 ‘git’, ‘pr’ state_model: ‘dag’ | ‘cyclic’ stages: string[] // DAG: 全部 stage; Cyclic: 单轮内的 stage param_schema: JsonSchema // spec.params 校验 default_limits: TaskLimits }

7.5 task-spec.json schema (v1, 完整)

{ “version”: 1, // schema 版本, 不是 task spec 版本 “type”: “openspec-4stage”, // runner module type “runner_pkg”: “@douglas-agent/task-runner-openspec”, “runner_version”: “^1.0.0”, // semver range “params”: { … }, // runner 特定参数, 由 param_schema 校验 “limits”: { “max_turns”: 100, // 单 query() 最大 turn “max_minutes”: 60, // 总时长, 触发 hard timeout “max_input_tokens”: 200000, “max_cycles”: 10 // 仅 cyclic }, “skills”: […], // 覆盖 manifest.required_skills “stage_detection”: { // 声明式 stage 切换 “rules”: [ Rule, Rule, … ] }, “completion”: { “git_push”: true, “open_pr”: true, “verifier”: { “type”: “typecheck” } // 可选, completion 前跑 }, “user_controls”: { // 允许用户从 web 发的指令 “interrupt”: true, “append_message”: true, “stop_loop”: false // 仅 cyclic 有意义 } }

7.6 State Model: DAG vs Cyclic

DAG (openspec / spec-kit / refactor / docs-sync)
explore ─▶ propose ─▶ apply ─▶ archive ─▶ done ↘ failed
  • 有限 stage, 每个 stage 进入恰好 1 次(或失败)
  • stage_detection rule 决定 “什么时候进入下一 stage”
  • 用户追加指令影响当前 stage 内 agent 行为, 不改变 DAG
Cyclic (tdd / bug-fix / dep-upgrade)
┌─▶ red ─▶ green ─▶ refactor ─▶ evaluate ─┐ │ │ └──────── repeat (cycle++) ◄───────────────┤ │ done ◄───┤ (用户 stop / max_cycles failed │ / verifier 通过) ▼
  • 单轮内是 DAG, 多轮循环
  • 每轮结束跑 evaluate predicate 决定 done / repeat / fail
  • 用户的 stop_loop 消息触发本轮完成后退出

7.7 stage_detection rule DSL

// Rule 由 PostToolUse hook 在 sandbox 内本地评估 interface Rule { match: ToolPattern // 匹配条件 action: Action // 触发动作 priority?: number // 默认 0, 高优先级先匹配 } interface ToolPattern { tool: ‘Write’ | ‘Bash’ | ‘Read’ | ‘Edit’ | ’*’ path?: string // glob: ‘src/**/*.ts’ command_prefix?: string // Bash: ‘npm test’ 等 result?: ‘success’ | ‘failure’ // Bash 退出码 } type Action = | { kind: ‘transition’; to: string } // DAG: 切到新 stage | { kind: ‘increment_cycle’ } // Cyclic: cycle++ | { kind: ‘evaluate’; check: ‘tests_pass’ | ‘verifier’ } | { kind: ‘complete’; data?: object } | { kind: ‘fail’; reason: string }

规则评估:每次 Claude tool 调用后,runner 按 priority 顺序匹配 rules,第一个命中的 action 触发。未命中保持当前 stage。

7.8 三个 task-type 完整设计

7.8.1 openspec-4stage (DAG, 现有 task)

{ “version”: 1, “type”: “openspec-4stage”, “runner_pkg”: “@douglas-agent/task-runner-openspec”, “runner_version”: “^1.0.0”, “params”: { “prompt”: “Add dark mode toggle”, “change_id”: “add-dark-mode”, “openspec_change_path”: “openspec/changes/add-dark-mode” }, “limits”: { “max_turns”: 100, “max_minutes”: 60 }, “skills”: [ “.claude/skills/openspec-explore/SKILL.md”, “.claude/skills/openspec-propose/SKILL.md”, “.claude/skills/openspec-apply/SKILL.md”, “.claude/skills/openspec-archive/SKILL.md” ], “stage_detection”: { “rules”: [ { “match”: { “tool”: “Write”, “path”: “openspec/changes/*/proposal.md” }, “action”: { “kind”: “transition”, “to”: “propose” } }, { “match”: { “tool”: “Write”, “path”: “src/**/*” }, “action”: { “kind”: “transition”, “to”: “apply” } }, { “match”: { “tool”: “Bash”, “command_prefix”: “openspec archive” }, “action”: { “kind”: “transition”, “to”: “archive” } } ] }, “completion”: { “git_push”: true, “open_pr”: true, “verifier”: { “type”: “typecheck” } }, “user_controls”: { “interrupt”: true, “append_message”: true } }

7.8.2 spec-kit (DAG, 更重 phase gate)

{ “version”: 1, “type”: “spec-kit”, “runner_pkg”: “@douglas-agent/task-runner-spec-kit”, “params”: { “prompt”: ”…”, “constitution_path”: “.github/spec-kit/constitution.md” }, “stage_detection”: { “rules”: [ { “match”: { “tool”: “Write”, “path”: “specs/*/spec.md” }, “action”: { “kind”: “transition”, “to”: “spec” } }, { “match”: { “tool”: “Write”, “path”: “plans/*/plan.md” }, “action”: { “kind”: “transition”, “to”: “plan” } }, { “match”: { “tool”: “Write”, “path”: “plans/*/tasks.md” }, “action”: { “kind”: “transition”, “to”: “tasks” } }, { “match”: { “tool”: “Bash”, “command_prefix”: “git checkout -b implement/” }, “action”: { “kind”: “transition”, “to”: “implement” } } ] }, “completion”: { “verifier”: { “type”: “checklist”, “path”: “plans/*/tasks.md” }, “git_push”: true, “open_pr”: true } }

7.8.3 tdd-cycle (Cyclic, 暴露循环协议)

{ “version”: 1, “type”: “tdd-cycle”, “runner_pkg”: “@douglas-agent/task-runner-tdd”, “params”: { “feature_prompt”: “Add user.email validation”, “test_command”: “pnpm vitest run —reporter=json”, “target_module”: “packages/auth/src/user.ts” }, “limits”: { “max_minutes”: 60, “max_cycles”: 10 }, “stage_detection”: { “rules”: [ { “match”: { “tool”: “Write”, “path”: ”**/*.test.ts” }, “action”: { “kind”: “transition”, “to”: “red” } }, { “match”: { “tool”: “Bash”, “command_prefix”: “pnpm vitest”, “result”: “failure” }, “action”: { “kind”: “transition”, “to”: “green” } }, { “match”: { “tool”: “Bash”, “command_prefix”: “pnpm vitest”, “result”: “success” }, “action”: { “kind”: “evaluate”, “check”: “tests_pass” } } ] }, “completion”: { “verifier”: { “type”: “tests_green_n_cycles”, “min_cycles”: 3 }, “git_push”: true, “open_pr”: true }, “user_controls”: { “interrupt”: true, “append_message”: true, “stop_loop”: true // cyclic 专有 } }

7.9 生命周期 hooks(host 调用 runner)

Hook触发runner 可做
runner.run(host, spec)容器入口调用 1 次主循环 — 调 host.ai.query、串 stage、复用 host.git
runner.onUserMessage?(msg, host)web 用户从 Relay 发消息注入到 streamInput / 改 stage / 终止 cycle
runner.onInterrupt?(host)用户主动中断 / max_minutes 到清理 → host.fail(reason)
runner.evaluateCycle?(host, cycle)cyclic 模式每轮结束'continue' | 'done' | 'fail'
runner.onCompletion?(host, result)主循环正常退出前跑 verifier / 改 PR 标题 / 写 release note

7.10 加载与发现机制

┌─────────────────────────────────────────────────────────────────────────┐ │ monorepo: packages/task-runners/ │ │ ├─ openspec/ package.json: “@douglas-agent/task-runner-openspec” │ │ ├─ spec-kit/ package.json: “@douglas-agent/task-runner-spec-kit” │ │ └─ tdd/ package.json: “@douglas-agent/task-runner-tdd” │ │ │ │ 每个 runner 包结构: │ │ ├─ src/ │ │ │ ├─ index.ts // export default factory() │ │ │ ├─ manifest.ts // RunnerManifest 静态对象 │ │ │ ├─ stages/ // 每 stage 逻辑(可选) │ │ │ └─ verifier.ts // completion verifier │ │ ├─ skills/ // 包内 SKILL.md, copied into image │ │ └─ schema/task-spec.schema.json │ └─────────────────────────────────────────────────────────────────────────┘ 主 API dispatcher 派发流程: 1. 客户端 POST /tasks/:id/dispatch body: { task_type, params } 2. 主 API 查 runner registry (Postgres 表或硬编码) SELECT image, runner_version FROM runner_registry WHERE type = $1 3. 校验 params 是否符合 manifest.param_schema 4. 生成 task-spec.json 并放进容器 env / volume 5. provider.create({ image, env: { TASK_SPEC_JSON: ’…’ }, … })

7.11 版本治理与扩展点

问题策略
task-spec.json schema v1 → v2 怎么兼容?顶部 "version": 1 字段;host 内置 v1 → v2 migrator;runner 声明支持的 schema 版本范围
runner 包升级会破坏 in-flight task 吗?task 在 sandbox 里跑,image 已经 freeze 当时的 runner_version;主 API 升级 registry 仅影响新 task
怎么新增 task type?① 写新 runner npm 包 ② 加 manifest ③ 加 SKILL.md ④ 写 integration test ⑤ 注册到 runner_registry ⑥ 更新 web UI 的 task_type 选择
能否同一 image 多 runner 共存?能。image 内 npm i 多个 runner 包,entry.js 按 spec.type 动态 require
runner 出错怎么暴露?runner 函数抛错 → host 捕获 → broadcast failed event + 退出码 1。stack trace 进 task_events.data
不同 task type 用不同 image 还是单一 image?单一 image + 多 runner 包是默认(共享 base layer 节省 pull);若某 runner 依赖巨型工具链(如 Python ML),独立 image,主 API 用 spec.type 路由

7.12 优先开发的三个 task-type(落地建议)

顺序type价值 / 验证目标
1openspec-4stage把现有 run.js 逻辑搬到 runner 包 —— 验证 HostContext 接口足够支撑现有能力,不丢功能
2spec-kit形态与 openspec 同构(DAG, 4 阶段, 写 spec / plan / tasks / implement)—— 验证”换 task type 不改协议”是否真成立
3tdd-cycle第一个 Cyclic —— 验证 cycle / evaluate / stop_loop 三个扩展点真能 work;逼出协议未考虑的边界

TDD cyclic 暴露了协议关键扩展点: state 不再是有限 DAG,需要支持”用户中途结束循环”信号。这与 G1(用户追加指令通路)天然契合 — 用户从 web 发 stop_loop 消息,runner.onUserMessage 接到后在下个 evaluateCycle 返 'done'

当下不做: bug-fix-from-issue / refactor / docs-sync / dep-upgrade。前三个 task type 跑完一个完整 release 周期、验证协议稳定后再加。

§8 三阶段迁移路径

不需要 big bang,每阶段可独立验证

P1

主 API 重启韧性(不动协议,只补 reattach)

目标:解决 G3。不依赖 Relay,不改 web,不改 sandbox。

  • Task model 加 sandboxId + sandboxProvider 字段(migration 一条)
  • dispatcher.dispatch()provider.create() 后 update task
  • ResubscribeBootstrap module(OnModuleInit,30 行):遍历 running tasks → provider.get(id).streamLogs() 重订阅
  • 测试:起 task → 重启 NestJS → 验证事件流接续

fly.io 变更:无。新依赖:无。预估:1-2 天工。

P2

引入 Relay + WebSocket 双向通道(不动 task-type)

目标:解决 G1(用户追加指令)+ G4(多 task channel 隔离自然达成)。

  • 新 fly app: deploy/relay/fly.toml 部署 @douglas-agent/sandbank-relay
  • 主 API: @WebSocketGateway('/tasks/:id/stream'),转发 web ↔ Relay
  • 主 API: 接 Relay 客户端,把 P1 的 streamLogs 订阅切到 Relay 频道订阅
  • run.js: 引入 @douglas-agent/sandbank-agent 的 connect(),session.on(‘message’) 喂 userQueue
  • run.js: query() 改 streamInput AsyncIterable
  • run.js: 事件回传从 stdout 切到 session.broadcast()(stdout 双写作 fallback)
  • web: 替换 EventSource → socket.io-client,加”追加指令”UI
  • 测试:起 task → 中途发指令 → 验证 agent 接到并响应

fly.io 变更:+1 app(Relay)。新依赖@douglas-agent/sandbank-relay + @douglas-agent/sandbank-agent + @nestjs/websockets预估:4-5 天工。

P3

退 SSE + 上线任务类型插件协议

目标:解决 G2(task type 可扩展),收尾整套架构。

  • 退役 @Sse 端点(保留兼容期 ≥ 1 release)
  • run.js: 退役 stdout JSONL 主路径(仅留 crash diagnostic)
  • spec:/task-spec.json v1 协议落地
  • 新 npm 包:@douglas-agent/task-runner-openspec(搬现有 4 阶段逻辑过去)
  • 新 npm 包:@douglas-agent/task-runner-spec-kittdd(验证插件协议通用性)
  • 主 API: POST /tasks/:id/dispatch 接受 task_type 参数
  • web: task 创建表单加 task_type 选择

fly.io 变更:无。新依赖:内部 monorepo 包。预估:5-7 天工。

每阶段都可独立上线 + 回滚。 P1 单独上线就解决了”deploy 丢任务”的真实痛点;P2 解锁用户体验;P3 解锁产品横向扩展。

§9 风险与未决问题

已知 unknown 显式列出

风险评估缓解
Relay 上游版本不稳 chekusu/sandbank 尚是 beta,上游可能 break主 API 依赖 @douglas-agent/sandbank-relay fork(不直依上游),由 sync-upstream.yml 周一 fast-forward;如有 break MUST 走 PR 人工 review。Relay 本期锁版本,升级走 follow-up change(跟 microsandbox SOP 一致)
Relay 单点故障 Relay 死 → 所有 task 事件流断P1 沉淀的 ResubscribeBootstrap 仍是 belt-and-suspenders;保留 stdout JSONL 作 audit fallback
6 backend 对 outbound WSS 的支持 microsandbox / boxlite / aio / fly / e2b / cube 各自的网络限制P2 起步前 spike 每个 backend 验证 outbound TCP 通到 Relay 域名
WebSocket on Fly proxy Fly 原生支持
多副本主 API(未来)低(当前 min=1)未来加 fly-replay sticky session header
streamInput 在 SDK 中的成熟度 2026 Q1 之前部分场景被报”silent abort”P2 起步前 spike 验证;准备 fallback 到当前 single-query 模式
Relay 重启时事件缓冲是否丢未决 Relay 单点死掉重启期间,channel 缓冲消息是否保留?需读源码P2 起步前 spike 验证。退路:sandbox 内 stdout JSONL 作 belt-and-suspenders(D2 已保留)
用户追加指令的 UX产品未决 当前 agent 正在跑工具时收到追加指令怎么 UX 化?打断 vs 排队?Claude SDK interrupt 语义内建,但 UX 决策(弹窗 / 输入框 / 队列)由产品阶段决定

§10 数据模型与持久化

现有 schema 盘点 + v2 改/新增 + ER 图 + 显式不引入的表

10.1 领域分层(4 个 bounded context)

域 1 · 身份

Identity

  • User
  • Session

谁是用户 / 谁登录了 / 谁有 GitHub 安装。v2 不动

域 2 · 资源

Resources

  • GithubInstallation
  • GithubRepo
  • GithubBranch

用户能动哪些 GitHub 资源。v2 不动

域 3 · 任务

Tasks

  • Task
  • TaskEvent

用户在 branch 上发起的 AI 工作 + 事件流。v2 加运行时绑定与保序字段。

域 4 · 运行时 NEW

Runtime

  • RunnerRegistry

每种 task_type 对应的 image / runner npm 包 / 版本。v2 新增。

10.2 现有 entity 速览(v2 影响标记)

实体角色关键字段v2
User身份平台用户github_user_id, login, email不动
Session身份会话凭据id, user_id, expires_at, revoked_at不动
GithubInstallation资源GitHub App 安装installation_id, account_login不动
GithubRepo资源已连接仓库镜像github_repo_id, owner, name, default_branch不动
GithubBranch资源分支镜像name, head_sha, last_commit_at不动
Task任务用户发起的 AI 工作branch_id, prompt, status, stage 加 sandbox 绑定 + task_type
TaskEvent任务事件流task_id, type, stage, status, data 加 sequence + 来源审计
RunnerRegistry运行时task_type 注册表type, runner_pkg, version, image, manifest新增

10.3 v2 改字段 · Task

model Task { id String @id @default(uuid()) branchId String @map(“branch_id”) creatorId String? @map(“creator_id”) title String prompt String changeId String @map(“change_id”) openspecChangePath String @map(“openspec_change_path”) status TaskStatus @default(proposing) stage TaskStage @default(explore) // enum 写死 4 值 stage String @default(“explore”) @db.VarChar(32) // 不同 task_type 有不同 stage 名,故改 varchar sandboxId String? @map(“sandbox_id”) @db.VarChar(128) // sandbank 返回的 handle id, ResubscribeBootstrap 用 sandboxProvider SandboxProvider? @map(“sandbox_provider”) // ‘microsandbox’ | ‘fly’ | ‘boxlite’ | ‘aio’ | ‘e2b’ | ‘cube’ taskType String @default(“openspec-4stage”) @map(“task_type”) @db.VarChar(64) runnerPkg String @map(“runner_pkg”) @db.VarChar(128) runnerVersion String @map(“runner_version”) @db.VarChar(32) taskSpec Json @map(“task_spec”) // 完整 /task-spec.json 副本, 主 API 重启后 // 不调容器即可知道这个 task 是哪种 type / 哪个 stage cycleCount Int @default(0) @map(“cycle_count”) // cyclic 类 task 进度持久化 (tdd-cycle 等) additions, deletions, filesChanged, prUrl, failedReason, createdAt, updatedAt, deletedAt … @@index([branchId, deletedAt]) @@index([creatorId, deletedAt]) @@index([createdAt]) @@index([status, sandboxId]) // ResubscribeBootstrap WHERE status IN (‘running’…) 用 }

字段动机一览

新字段动机所属决策
sandbox_id主 API 重启后通过 provider.get(id) 重订阅事件流D4
sandbox_provider重订阅时知道用哪个 adapter(6 个 backend 之一)D4
task_type插件协议——决定加载哪个 runner npm 包D5
runner_pkg / runner_versionimage 内 require 哪个版本的 runner 模块(in-flight task 不被新 release 破坏)D5
task_spec (jsonb)主 API 仅看 Task 表就知道 stage 集合、cycle 状态、completion 配置——不必去问 sandboxD5
cycle_countcyclic task(TDD 等)当前是第几轮,跨 Relay 重连可恢复D5
stage 改 varcharTaskStage enum 写死 explore/propose/apply/archive,spec-kit / tdd 用不上;改 varchar 容纳D5

10.4 v2 改字段 · TaskEvent

model TaskEvent { id String @id @default(uuid()) taskId String @map(“task_id”) type String // 值域扩 (见下) stage String? status String? data Json sequenceNumber BigInt @map(“sequence_number”) // per task 单调自增, 行级保序; web 用 last_seq // 做 catch-up (避免依赖 created_at 精度) ingestMethod String @default(“relay”) @map(“ingest_method”) @db.VarChar(24) // ‘relay’ | ‘stdout’ | ‘bootstrap_resubscribe’ | ‘janitor’ // 审计 + 调试用 cycleIndex Int? @map(“cycle_index”) // cyclic 任务专用,标记是第几轮的事件 createdAt DateTime @default(now()) task Task @relation(fields: [taskId], references: [id], onDelete: Cascade) @@index([taskId, createdAt]) @@unique([taskId, sequenceNumber]) // 保序 + catch-up 唯一索引 } // type 字段值域扩 现: ‘stage_change’ | ‘log’ | ‘completed’ | ‘failed’ 加: ‘user_message_injected’ | ‘token_renewed’ | ‘cycle_evaluated’ | ‘stage_evaluated’

10.5 v2 新增 · RunnerRegistry

model RunnerRegistry { id String @id @default(uuid()) @db.Uuid type String @unique @db.VarChar(64) // ‘openspec-4stage’ | ‘spec-kit’ | ‘tdd-cycle’ | … runnerPkg String @map(“runner_pkg”) @db.VarChar(128) // ‘@douglas-agent/task-runner-openspec’ runnerVersion String @map(“runner_version”) @db.VarChar(32) // semver range 或锁定版本 image String @db.VarChar(256) // ‘ghcr.io/openspec/task-runner:v2.1.0’ imageDigest String? @map(“image_digest”) @db.VarChar(80) // ‘sha256:…’ 用于 supply chain 锁定 manifest Json // RunnerManifest: required_skills, param_schema, // state_model, stages, default_limits active Boolean @default(true) // false = 软停用, 不能新派发但 in-flight 不影响 createdAt DateTime @default(now()) @map(“created_at”) @db.Timestamptz(6) updatedAt DateTime @updatedAt @map(“updated_at”) @db.Timestamptz(6) @@index([type, active]) @@map(“runner_registry”) } // dispatcher 派发路径: // 1. POST /tasks/:id/dispatch body: { task_type, params } // 2. SELECT * FROM runner_registry WHERE type = $1 AND active = true // 3. validate(params, registry.manifest.param_schema) // 4. 写 Task.{taskType, runnerPkg, runnerVersion, taskSpec} // 5. provider.create({ image: registry.image, env: { TASK_SPEC_JSON: … })}

10.6 ER 图(跨域关系)

┌─────────────────────────────────────────────────────────────────────────────┐ │ 身份域 (Identity) │ │ │ │ ┌──────────────┐ 1 ──── N ┌───────────────┐ │ │ │ User │ ──────────▶│ Session │ │ │ │ │ │ (cookie 凭据) │ │ │ └──────┬───────┘ └───────────────┘ │ │ │ 1 │ │ │ │ └──────────┼──────────────────────────────────────────────────────────────────┘ │ │ N (installedBy / creator) │ ┌──────────┴──────────────────────────────────────────────────────────────────┐ │ 资源域 (Resources) │ │ │ │ ┌──────────────────┐ 1 ── N ┌───────────────┐ 1 ── N ┌──────────────┐ │ │ │ GithubInstall │ ──────▶│ GithubRepo │ ──────▶│ GithubBranch │ │ │ │ ation │ │ (镜像) │ │ (镜像) │ │ │ └──────────────────┘ └───────────────┘ └──────┬───────┘ │ │ │ 1 │ └────────────────────────────────────────────────────────────────┼────────────┘ │ │ N │ ┌────────────────────────────────────────────────────────────────┴────────────┐ │ 任务域 (Tasks) │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Task │ 1 ── N ┌─────────────┐ │ │ │ ─ branch_id (FK 硬约束 → GithubBranch) │ ────────▶│ TaskEvent │ │ │ │ ─ creator_id (FK 软约束 → User, SetNull) │ │ ─ task_id │ │ │ │ ─ sandbox_id (外部 sandbank id,非 FK) │ │ ─ seq_no │ │ │ │ ─ sandbox_provider (enum) │ │ ─ cycle_idx│ │ │ │ ─ task_type (软引用 RunnerRegistry.type) │ │ ─ type │ │ │ │ ─ runner_pkg / runner_version │ │ ─ data │ │ │ │ ─ task_spec (jsonb) │ └─────────────┘ │ │ │ ─ cycle_count │ │ │ │ ─ status, stage, prompt, … │ │ │ └────────┬────────────────────────────────────┘ │ │ │ 软引用 (字符串 task_type, 不约束) │ └────────────┼────────────────────────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ 运行时域 (Runtime) [v2 NEW] │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ RunnerRegistry │ │ │ │ ─ type (unique, 软引用目标) │ │ │ │ ─ runner_pkg / runner_version │ │ │ │ ─ image / image_digest │ │ │ │ ─ manifest (jsonb) │ │ │ │ ─ active │ │ │ └─────────────────────────────────────────┘ │ │ 无 FK 引用外部 (独立资源, 主 API 启动时 seed) │ └────────────────────────────────────────────────────────────────────────────┘ 外键约束策略: 硬约束 (CASCADE): Task.branch_id, TaskEvent.task_id 软约束 (SetNull): Task.creator_id (用户删了 task 留着) 无 FK (软引用): Task.task_type → RunnerRegistry.type Task.sandbox_id → 外部 sandbank handle

10.7 完整 Prisma diff

+ enum SandboxProvider { + microsandbox + fly + boxlite + aio + e2b + cube + @@map(“sandbox_provider”) + - enum TaskStage { // 删除: 不同 task_type 有不同 stage 名 - explore - propose - apply - archive - model Task { … (现有字段) - stage TaskStage @default(explore) + stage String @default(“explore”) @db.VarChar(32) + sandboxId String? @map(“sandbox_id”) @db.VarChar(128) + sandboxProvider SandboxProvider? @map(“sandbox_provider”) + taskType String @default(“openspec-4stage”) @map(“task_type”) + runnerPkg String @map(“runner_pkg”) @db.VarChar(128) + runnerVersion String @map(“runner_version”) @db.VarChar(32) + taskSpec Json @map(“task_spec”) + cycleCount Int @default(0) @map(“cycle_count”) + @@index([status, sandboxId]) } model TaskEvent { … (现有字段) + sequenceNumber BigInt @map(“sequence_number”) + ingestMethod String @default(“relay”) @map(“ingest_method”) @db.VarChar(24) + cycleIndex Int? @map(“cycle_index”) + @@unique([taskId, sequenceNumber]) } + model RunnerRegistry { … } // 见 10.5

10.8 显式不引入的表(决策附录)

未引入形态不引入的理由未来何时再考虑
SandboxRunTask 1:N 的 sandbox 历史,支持任务重试本期 1:1 假设(一个 task 单次 sandbox 启动)。提前引入违反 D4 快速落地原则用户真要”task 失败可重试”产品功能时 extract
CredentialLeasescoped install token 发放 / 续期审计spec.md 明确要求 token 不持久化到磁盘(“容器内 git credential 不持久化”),仅活在内存。审计走 task_events.type='token_renewed'商业化要审计追溯单一 token 谁发的时
ReconcileJob调度子域,BullMQ / Trigger.dev 风格D6 决定不上 durable execution;ResubscribeBootstrap 是 OnModuleInit 一次性扫,不是持续队列同时活跃 task > 100 或多副本主 API 时
TaskQuota用户 / repo / task_type 维度配额现有”并发上限”是实时 SELECT COUNT,无配额表商业化收费 / 限流分租户时
RelaySessionRecordRelay 频道状态镜像Relay 是无状态总线,task.sandbox_id + task.id 已足够 reattach不会引入(Relay 不应该作为 source of truth)
UserMessageInjection用户中途追加指令的独立持久化已经通过 task_events.type='user_message_injected' 走事件表,不必再独立不会引入

10.9 Migration 顺序(对齐 P1 / P2 / P3)

P1

0042_task_sandbox_binding.sql · 对应 P1 重启韧性

  • CREATE TYPE sandbox_provider AS ENUM (...)
  • ALTER TABLE tasks ADD COLUMN sandbox_id VARCHAR(128)
  • ALTER TABLE tasks ADD COLUMN sandbox_provider sandbox_provider
  • CREATE INDEX idx_tasks_active ON tasks(status, sandbox_id) WHERE status IN ('running','awaiting_review')

Breaking: 否。Backfill: 不需要(新字段 nullable)。Rollback: 删字段即可,无依赖。

P2

0043_task_events_sequence.sql · 对应 P2 Relay + WS

  • ALTER TABLE task_events ADD COLUMN sequence_number BIGINT
  • ALTER TABLE task_events ADD COLUMN ingest_method VARCHAR(24) DEFAULT 'stdout'
  • -- backfill: UPDATE task_events SET sequence_number = row_number() OVER (PARTITION BY task_id ORDER BY created_at)
  • ALTER TABLE task_events ALTER COLUMN sequence_number SET NOT NULL
  • CREATE UNIQUE INDEX uq_task_events_seq ON task_events(task_id, sequence_number)

Breaking: 否。Backfill: 历史事件 sequence_number 按 created_at 升序填。Rollback: 删字段 + 删索引。

P3

0044_task_type_pluggable.sql · 对应 P3 插件协议

  • ALTER TABLE tasks ADD COLUMN task_type VARCHAR(64) DEFAULT 'openspec-4stage'
  • ALTER TABLE tasks ADD COLUMN runner_pkg VARCHAR(128) DEFAULT '@douglas-agent/task-runner-openspec'
  • ALTER TABLE tasks ADD COLUMN runner_version VARCHAR(32) DEFAULT '^1.0.0'
  • ALTER TABLE tasks ADD COLUMN task_spec JSONB
  • ALTER TABLE tasks ADD COLUMN cycle_count INT DEFAULT 0
  • ALTER TABLE tasks ALTER COLUMN stage TYPE VARCHAR(32) USING stage::text
  • ALTER TABLE task_events ADD COLUMN cycle_index INT
  • DROP TYPE task_stage
  • CREATE TABLE runner_registry (...)
  • -- seed: INSERT INTO runner_registry VALUES ('openspec-4stage', ...)

Breaking: 是(TaskStage enum 退役)。需先升级所有读 stage 的代码到容忍 string。Backfill: 历史 task 默认 ‘openspec-4stage’。Rollback: 复杂——需要恢复 enum + 反向同步 stage 字段。建议 forward-only。

关键设计原则: v2 数据模型遵循 D4 + D6 决策——该持久化的才落表。token / Relay channel 状态 / reconcile 队列都不进数据库;只有”未来重启后还需要的”信息(sandbox_id, task_spec, sequence_number)才入表。

§11 收口建议

一句话决策 + 下一步

建议落 OpenSpec change 提案: add-task-sandbox-websocket-broker(或类似名)。范围限 P1 + P2,把 P3 的插件协议拆为独立 follow-up change。

11.1 一句话决策

方向判断
web ↔ 主 API:WebSocket 替代 SSE?✓ 是 — 用户追加指令是真实需求,SSE 单向不够
主 API ↔ sandbox:引入 sandbank Relay?✓ 是 — 自写或直连每个 backend 都更贵
agent runtime:切 streamInput?✓ 是 — Claude SDK 原生能力,门槛低
主 API 重启韧性:上 ReconcileWorker / BullMQ / Trigger.dev?✗ 否(当下)— 30 行 ResubscribeBootstrap 已经够,未来再说
领域模型重构:4 个聚合?✗ 否(当下)— Task 加 2 列即可,等任务重试 / 多 lease 真有需求再拆

11.2 下一步

  • 本文档 review → 用户决定是否进 OpenSpec propose
  • P2 起步前的两个 spike:(a) 6 backend 的 outbound WSS 联通性;(b) streamInput 在 v2 SDK 的稳定性
  • 如果通过:先做 P1(独立闭环 + 上线),再启动 P2 设计 review

引用源码

  • apps/api/src/task-runner/task-runner.controller.ts:92 — 当前 SSE 端点
  • apps/api/src/task-runner/task-runner.dispatcher.ts:169-199 — streamLogs in-memory loop
  • apps/api/src/task-runner/task-runner.janitor.ts — 60min cron 兜底
  • apps/api/prisma/schema.prisma · model Task — 缺 sandbox_id
  • apps/api/fly.toml — kill_signal=SIGINT, kill_timeout=10s
  • openspec/specs/task-runner/spec.md — POC v2 现状 spec

外部参考

历史 sandbank 探索沉淀

  • docs/spikes/s1-claude-agent-sdk-skills.md — Agent SDK + skill 加载
  • docs/spikes/s5-four-stages-pipeline.md — 四阶段单 query() 验证
  • docs/spikes/s7-microvm-long-task.md — 长任务 cache hit 实测
  • docs/spikes/s12-cube-on-wsl.md + docs/spikes/s13-microsandbox-on-wsl.md — WSL2 quirk SOP
  • docs/task-runner-poc.html — POC v1/v2 技术选型
  • docs/task-runner-sandbank-integration.md — sandbank 集成笔记