Task Sandbox v2 — 技术选型与解耦架构
双向通信 · 任务可扩展 · 长任务可靠 — 把当前单向 SSE + stdout JSONL 的事件链路,升级到 WebSocket + sandbank Relay + Claude SDK streamInput 的双向架构。一次性解锁 用户运行中追加指令 / 主 API 重启不丢任务 / 多 task type 插件化 / 多 task 并发隔离。本文不替代 POC 阶段沉淀的真知识(network quirks · 凭据 scope · WSL2 SOP)— 它在那之上重新画”如果今天从零开始,task 沙箱该长什么样”。
technical selection post-POC · 2026-05-22 draft- 作者: 探索会话沉淀
- 状态: 待 review,未进 OpenSpec change
- 前置: 2026-05-19 task-runner POC / 2026-05-22 microsandbox fallback
- 替代: 本文若通过,
specs/task-runner/spec.md多条 Requirement 会被改写
§1 背景与触发点
这份文档是哪些质问的回答?
POC v2 跑通了 6 个 sandbox backend(Fly / BoxLite / E2B / Cube / AIO / Microsandbox),主 API 通过 adapter.streamLogs() 订阅 sandbox 的 stdout JSON Lines 事件,broker 模式工作正常。但越深入”接下来要做什么”,越浮现三类需求:
任务类型可扩展
OpenSpec 4 阶段是当前唯一形态。但 spec-kit / TDD 循环 / bug fix from issue 都是合理 task type 候选,目前 run.js hardcode 不可扩。
用户运行中追加指令
30 min 任务中段,用户想干预:“这里加个测试” / “跳过这步” / “换个方向”。当前架构是单向 SSE,没有 client → agent 的通路。
主 API 重启不丢事件流
每次 deploy → in-process streamLogs loop 死 → 任务被 janitor 60min 后误判 failed。sandbox 可能早就跑完了,只是没人接事件。
本文核心论点: 把通信形态从”单向 SSE + stdout JSONL”换成”WebSocket + Relay”,三个需求会被同一套机制同时解决,并且 不需要 引入 BullMQ / Trigger.dev / 任何 durable execution 引擎。
§2 现状盘点
通信链路真相 + 责任耦合点 + 失败模式
2.1 真实通信形态(不是 WebSocket,是 SSE + ReadableStream)
┌──────────────────────────────────────────────────────────────────────────┐ │ web (browser) 主 API (NestJS, Fly) sandbox │ │ │ │ │ │ │ │ EventSource (SSE) │ │ │ │ │ ◄────── 单向 ─────────────│ │ │ │ │ │ │ │ │ │ │ adapter.streamLogs() │ │ │ │ │ ReadableStream<Uint8> │ │ │ │ │ ◄──── 单向 ───────────────│ │ │ │ │ (sandbox stdout JSONL) │ │ └──────────────────────────────────────────────────────────────────────────┘ • web→主 API: HTTP POST /tasks/:id/dispatch (一次性,非通道) • 主 API→sandbox: 仅在 dispatch 时 provider.create(env, …) 注入,之后无通路
- web ↔ 主 API
@Ssedecorator,apps/api/src/task-runner/task-runner.controller.ts:92- 主 API ↔ sandbox
handle.streamLogs(),apps/api/src/task-runner/task-runner.dispatcher.ts:169- web 自动重连
- 浏览器 EventSource 内建(3s retry + Last-Event-ID)
- 主 API ↔ sandbox 重连
- 不存在 — in-memory loop 死即断
2.2 责任耦合点
| 职责 | 当前归属 | 类型 |
|---|---|---|
| 启停 sandbox | sandbank adapter | 硬约束 |
| 网络方向(outbound-only) | broker 架构整体 | 硬约束 |
| 容器超时(双层) | run.js 自杀 + janitor cron | 硬约束 |
| 凭据 scope(最小权限) | InstallationTokenService | 硬约束 |
| 事件传输(JSONL via stdout) | run.js + adapter.streamLogs | 实现选择 |
| 事件签名(HMAC) | run.js + dispatcher | 实现选择 |
| 行级解析 + 状态机 | dispatcher in-memory loop | 实现选择 |
| Stage 推进(4 阶段) | PostToolUse hook 内 | 实现选择 |
| Agent runtime(Claude SDK + skill) | run.js | 实现选择 |
| Repo 操作(clone/push/PR) | run.js + token helper | 实现选择 |
| Token 续期 | dispatcher /internal endpoint | 实现选择 |
结论:硬约束 vs 实现选择 ≈ 3 : 7。后者全部是 v2 可以重画的对象。
2.3 失败模式
| 场景 | 当前行为 | 影响 |
|---|---|---|
主 API deploy(kill_timeout=10s) | streamLogs loop 断 → 重启后不知 sandbox 归属 | 每次 deploy 期间 running 的 task 失去事件流,janitor 60min 后误判 failed |
| 主 API 进程 crash | 同上 | 同上(生产 crash 罕见,但发生即批量丢) |
| web 标签页切走再回 | SSE 自动重连 | ✓ 不丢(但缺 Last-Event-ID 实现,重连期间 server 发的事件丢失) |
| 用户想中途追加指令 | 无通路 | 只能 destroy 整个 sandbox 重新派发 |
| 用户想接受 / 拒绝 permission | 容器内 bypassPermissions | N/A(设计上跳过了这个能力) |
§3 解耦目标
v2 想动什么,不想动什么
3.1 想解耦的(按产品价值排序)
双向通道:用户能在任务运行中追加指令
“这里加测试” / “换个方向” / “取消” / “回答 permission prompt” 都要能从 web 实时发到 agent,agent 实时收到并影响后续行为。
任务类型可扩展(不止 OpenSpec)
spec-kit、TDD red-green-refactor 循环、bug fix from issue —— 不能 hardcode 在 run.js 里。换 task type 不用改 image,也不用改 sandbox adapter。
主 API 重启不丢正在跑的任务
deploy 期间,sandbox 内 agent 不知道、不关心 host 死了;事件继续累积;主 API 重启后接续订阅,前端无感。
多 task 并发隔离(不是多 agent)
能并发的是”不同 task”,不是单 task 内的多个 agent。单 task 内 agent 仍是序列的(一次只在 explore/propose/apply/archive 其一)。Relay 的 channel = task_id 让多 task 事件流天然隔离、可独立 catch-up。
3.2 不动的(POC 真知识)
| 沉淀 | 保留理由 |
|---|---|
| sandbank fork(6 backend) | 给用户的”产品选择”,不是抽象冗余 |
| 主 API outbound-only 网络约束 | Fly / TCC mac / VPN 部署的网络真相 |
| install token 最小权限 + 不持久化 | 已 hardened,无理由动 |
| 容器内三轨鉴权(subapi / 直连 / OAuth 禁用) | S7 实测验证 |
| 双层 timeout(容器自杀 + 主 API 兜底) | 主 API 兜底由 janitor 改 ResubscribeBootstrap,但仍是双层 |
| WSL2 quirk SOP(s13) | microsandbox shim 部署沉淀 |
| scoped install token 续期 | 但续期”通道”会变(容器拉 → 主 API 推) |
§4 关键决策 D1–D6
每条对应 §3 的一个目标
决定
web ↔ 主 API ↔ sandbox-agent 三段全部走 WebSocket,由中间的 sandbank Relay 桥接。退役 SSE / stdout JSONL 作为主路径(保留 stdout 作 crash diagnostic)。
理由
- G1(用户追加指令)需要 client → server 双向,SSE 做不到
- Claude Agent SDK 原生支持
query({ prompt: AsyncIterable<SDKUserMessage> })和Query.streamInput()—— 边跑边接收用户消息是它内建能力 - Fly.io 官方:“WebSocket implementation is very straightforward; no third-party tools necessary”
- NestJS 有原生
@WebSocketGateway,跟现有 module / guard / DI 无缝
代价
- web 端 EventSource 改 socket.io-client(一次性迁移)
- run.js 改为 streamInput + AsyncIterable 模式
- 退 SSE 期间会有 dual-protocol 阶段(迁移期 1–2 周)
决定
引入 @douglas-agent/sandbank-relay(WebSocket + JSON-RPC 2.0),作为独立 fly app 部署。主 API 和 sandbox-agent 都 outbound 连 Relay,channel key = task_id。
理由
- 主 API outbound-only 网络约束完美保留:主 API 不再暴露
/ingest这种 inbound - Relay 是上游 chekusu/sandbank 一等公民组件,我们的
@douglas-agent/sandbank-*fork 跟随上游;协议、缓冲、重连机制不需要我们造 - channel = task_id 让 G4(多 task 隔离)天然成立,无需自写分片
- 每个 backend 都要”消息注入通路”的方式 B 工作量 ≈ ×6(每 backend 写一次),不可取
代价
- 新增 1 个 fly app(
shared-cpu-1x, 512MB ≈ $5-10/mo) - 跟着 sandbank 上游版本(已经在跟,本期复用)
- Relay 单点故障域:Relay 死 → 所有 task 失去事件流(但仍可 belt-and-suspenders 走 stdout JSONL fallback)
决定
容器入口 run.js 把当前的单次 query("/opsx:propose ...") 改为:
const session = await connect() // @douglas-agent/sandbank-agent const userQueue = new AsyncQueue() session.on(‘message’, m => userQueue.push(m)) async function* input() { yield { role: ‘user’, content: PROMPT } // 初始 prompt while (running) { yield await userQueue.shift() // 用户追加 } } for await (msg of query({ prompt: input(), … })) { session.broadcast(‘task-event’, msg) // 替代 stdout JSONL }
理由
- Claude Agent SDK 文档明确:“streaming input mode is the preferred way”
- permission prompt / interruption 都是 SDK 内建语义,可以转发给用户
- 事件流回报从 stdout 切到
session.broadcast()—— 不再依赖 adapter.streamLogs
决定
Prisma Task model 加两列:sandboxId + sandboxProvider。主 API OnModuleInit 阶段跑一个轻量重订阅例程。
// apps/api/src/task-runner/task-runner.bootstrap.ts
async onModuleInit() {
const active = await prisma.task.findMany({
where: { status: { in: [‘running’, ‘awaiting_review’] }, sandboxId: { not: null } }
})
for (const t of active) {
const relayChannel = `task/${t.id}`
relayClient.subscribe(relayChannel, msg => this.routeEvent(t.id, msg))
}
}理由
- 这就是 G3(主 API 重启韧性)的最小可行方案
- 不需要 ReconcileWorker / BullMQ / Trigger.dev / durable execution(前面探讨过的方向是过度工程)
- 30 行代码 + 一条 migration 解决 90%+ 的失败模式
决定
容器启动时挂载一个 /task-spec.json 描述跑什么模块;run.js 不再 hardcode OpenSpec 4 阶段,改为 dispatch 到对应 runner 模块。
{
“version”: 1,
“type”: “openspec-4stage” | “spec-kit” | “tdd-cycle” | “bug-fix-from-issue”,
“params”: {
“prompt”: ”…”,
“change_id”: ”…”,
// task-type 特定参数
},
“limits”: { “max_turns”: 100, “max_minutes”: 60 },
“skills”: [“.claude/skills/openspec-explore”, “.claude/skills/openspec-propose”, …]
}每个 task-type 对应一个 npm 子包
@douglas-agent/task-runner-openspec· 现有 4 阶段@douglas-agent/task-runner-spec-kit· GitHub spec-kit phase gates@douglas-agent/task-runner-tdd· red-green-refactor 循环(cyclic state)@douglas-agent/task-runner-bugfix· reproduce → localize → patch → verify
建议先做的三个 task-type
spec-kit(验证抽象通用性)+ TDD(验证 cyclic state)+ OpenSpec explore-only(验证子集形态)。其它后续。
决定
本期不引入任何上述工具。任务状态机仍住在 Postgres + NestJS 内,重启韧性由 D4 解决。
理由
- 当前任务并发个位数 ~ 十位数,远未到工具门槛
- BullMQ 解决的是”分发短任务”,错配于”长任务可恢复”
- Trigger.dev / Inngest 是真正契合的,但要付 1 个新 fly app + Redis ≈ $20-30/mo + 学一套 DSL
- D4 的 30 行 bootstrap 已经覆盖了 90% 的失败模式
- 未来真到瓶颈,reconcile 函数搬到 Trigger.dev 是平移式迁移,不破坏现有代码
触发重新评估的条件
- 同时活跃 task 经常
>100 - 主 API 需要多副本横向扩
- 商业化 SLA 承诺需要”任意 step 幂等可重试”
§5 目标架构
四层组件 + 数据流
┌────────────────────────────────────────────────────────────────────────────┐ │ web (Next.js · browser) │ │ socket.io-client / native WS │ │ ─ send: 用户追加指令 / 暂停 / 取消 / permission 回应 │ │ ─ recv: stage_change / log / completed / failed / agent_question │ │ ─ 浏览器自动重连(WS Last-Event-ID 模拟) │ └──────────────────────────────────┬─────────────────────────────────────────┘ │ WSS (持久连接) ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ 主 API (NestJS · Fly · min=1) │ │ │ │ @WebSocketGateway(‘/tasks/:id/stream’) [NEW] │ │ ─ on(‘message’) from web → forward 到 Relay ‘task/:id’ channel │ │ ─ subscribe Relay ‘task/:id’ → forward 到 web client │ │ │ │ TaskRunnerDispatcher [修订] │ │ ─ provider.create() 启动 sandbox │ │ ─ store sandbox_id 进 Task 表 │ │ ─ subscribe Relay ‘task/:id’ │ │ ─ 不再 维护 in-process streamLogs loop │ │ │ │ ResubscribeBootstrap [NEW · 30 lines] │ │ ─ OnModuleInit: SELECT running tasks → 对每个 subscribe Relay channel │ │ │ │ TaskRunnerJanitor [保留 · 兜底] │ │ ─ 仅当 ResubscribeBootstrap 发现 sandbox 已死时清理 │ └──────────────────────────────────┬─────────────────────────────────────────┘ │ WSS outbound (主 API → Relay) ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ sandbank Relay (Fly · 独立 app · ~$5-10/mo) [NEW] │ │ @douglas-agent/sandbank-relay (WebSocket + JSON-RPC 2.0) │ │ ─ channel = task_id (多 task 天然隔离) │ │ ─ 物理上接受 inbound 但主 API 和 sandbox 都是 outbound 进 │ │ ─ 可选缓冲最近 N 条消息(供主 API 重连 catch-up) │ │ ─ session.context 能力不本期使用(单 task 内 agent 序列, 不需共享 context) │ └──────────────────────────────────┬─────────────────────────────────────────┘ │ WSS outbound (sandbox → Relay) ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ sandbox: task-runner image · run.js [修订] │ │ │ │ 1. 读 /task-spec.json (替代 hardcoded 4 阶段) │ │ 2. connect() to Relay via @douglas-agent/sandbank-agent │ │ 3. session.on(‘message’) push to userQueue │ │ 4. query({ prompt: AsyncIterable(initial + userQueue) }) [streamInput] │ │ 5. for await event of query() → session.broadcast(event) │ │ 6. 不再写 stdout JSONL (除非 dual-protocol 迁移期) │ └────────────────────────────────────────────────────────────────────────────┘
5.1 主 API 在新架构里只剩”协议翻译 + 状态机持久化”
| 组件 | 修订前 | 修订后 |
|---|---|---|
TaskRunnerDispatcher | 启 sandbox + 维护 streamLogs in-memory loop(30min 长寿) | 启 sandbox + 存 sandbox_id + 订阅 Relay(瞬时操作) |
TaskRunnerEventBroker | 行级解析 + HMAC + 写 task_events + SSE 广播 | 退役(验签搬到 Relay 层;分发由 WebSocketGateway 直接做) |
TaskRunnerJanitor | 主路径:60min stale → 标 failed | 兜底路径:ResubscribeBootstrap 发现 sandbox 死了才标 failed |
TaskRunnerController | @Sse + POST /dispatch | @WebSocketGateway + POST /dispatch(dispatch 保留为短 HTTP) |
/internal/tasks/:id/token | 容器 inbound 拉续期 token | 退役 — 主 API 主动通过 Relay ‘task/:id’ 推新 token |
§6 与现有架构的差异
保留 / 修改 / 删除 / 新增
- sandbank fork + 6 backend(产品选择)
- install token 最小权限 + 不持久化
- 容器内三轨鉴权
- 双层 timeout 思想(实现细节变)
- WSL2 quirk SOP(microsandbox shim 部署)
- 主 API outbound-only 网络约束
- scoped install token 续期(但通道变)
- 事件回传协议:stdout JSONL → Relay broadcast
- 用户面通道:SSE → WebSocket
- 容器入口:单次 query(string) → streamInput AsyncIterable
- task type:hardcoded 4 阶段 → /task-spec.json plugin
- 主 API 重启:丢任务 → 自动接续
- Token 续期:容器拉 → 主 API 推
- 双层 timeout 主路径:cron → bootstrap reconcile
adapter.streamLogs强制约束(变 optional)- dispatcher in-memory streamLogs loop
HMAC inline 签名(连接层 WSS / mTLS 取代)POST /internal/tasks/:id/token路由- cron 60min 巡检主路径
- Relay 作为独立 fly app 部署
Task.sandboxId / sandboxProvider两列 + migrationResubscribeBootstrapmodule@WebSocketGateway('/tasks/:id/stream')- run.js streamInput +
@douglas-agent/sandbank-agent集成 /task-spec.json协议 + 至少 1 个 plugin runner(OpenSpec)
6.1 现有 spec.md Requirement 影响
| Requirement(当前 spec) | v2 状态 |
|---|---|
| Sandbank 沙箱中间层抽象 | 保留(adapter 接口可瘦:streamLogs 变 optional) |
| Sandbank 包来源切换到自有 fork | 保留 |
| 新增 adapter 的 capability 集 | 保留 |
| Microsandbox shim 服务契约 | 保留 |
| WSL2 quirk 合规约束 | 保留 |
| Event broker · 主 API outbound-only 订阅 adapter 日志 | 改写 — broker 由 Relay 替代 |
| task-runner stdout JSON Lines 事件协议 | 改写 — 主路径 Relay broadcast,stdout 留 fallback |
| task-runner 容器入口契约 | 改写 — 增加 task-spec.json + streamInput 章节 |
| HMAC callback 协议 | 改写 — 主路径退役,连接层 WSS 取代 |
| installation token 续期 | 改写 — 通道改为主动推 |
| 双层 timeout 防护 | 改写 — 巡检主路径换 ResubscribeBootstrap |
| 容器内鉴权策略(三轨) | 保留 |
| 主 API 并发上限 | 保留 |
| 容器 install token 最小权限 | 保留 |
| 容器内 git credential 不持久化 | 保留 |
| 容器 outbound 网络层 partial 防护 | 保留 |
§7 任务类型插件协议
支撑 G2(task type 可扩展)· runner = npm 包,host 提供能力
7.1 设计原则
runner 不感知基础设施
runner 模块不 import sandbank / sandbox-agent / Anthropic SDK / git。所有这些由 host context 注入。换 sandbox backend、换 LLM provider 都不改 runner。
单 task 内 agent 仍是序列
runner 是 stage 调度器,不是并行编排器。单 task 内同一时刻只有一个 query() 在跑(4 阶段是串行 stage 切换,cyclic 是同一 query() 多轮循环)。
声明式优先于代码
能用 /task-spec.json 声明的(stage 检测 rule、skill 列表、limit)就别写 TS。runner 代码只承载真业务逻辑(stage 间过渡、cycle 终止条件、completion verifier)。
host 是稳定 ABI
runner 依赖 HostContext 接口而非具体实现。host 升级不破坏 runner,runner 升级不破坏 host。
7.2 容器启动加载流程
┌─────────────────────────────────────────────────────────────────────────┐
│ task-runner image 入口 (/app/entry.js) │
│ │
│ 1. parse /task-spec.json │
│ 2. resolve runner module: require(‘@douglas-agent/task-runner-’ + │
│ spec.type) │
│ 3. const runner = module.create() // implements TaskRunner │
│ 4. const host = await initHostContext(spec) │
│ │ │
│ ├─ Claude SDK client (subapi / direct base url 已配) │
│ ├─ Sandbox session via @douglas-agent/sandbank-agent.connect() │
│ ├─ Git workspace (clone + credential helper) │
│ ├─ Token store (auto-refresh via Relay 主推) │
│ └─ Stage state writer │
│ 5. await runner.run(host, spec) │
│ 6. host.complete(result) 或 host.fail(reason) │
└─────────────────────────────────────────────────────────────────────────┘7.3 host 给 runner 的能力(HostContext 接口)
// 由 task-runner image 提供, runner 模块通过依赖注入接收 interface HostContext { // Claude Agent SDK 已就绪的 client (env、auth 已注入) ai: { query(opts: QueryOptions): AsyncIterable<SDKMessage> streamInput(messages: AsyncIterable<SDKUserMessage>): void interrupt(): Promise<void> } // 双向通道 (D3: 替代 stdout JSONL) session: { broadcast(event: TaskEvent): void // agent → web on(type: ‘user_message’, handler: (msg) => void): Unsubscribe on(type: ‘interrupt’, handler: () => void): Unsubscribe on(type: ‘token_renewed’, handler: (token) => void): Unsubscribe } // Git 操作 (token 由 host 透明管理) git: { clone(repo: string, branch: string): Promise<Workspace> commit(ws: Workspace, message: string): Promise<void> push(ws: Workspace, branch: string): Promise<void> openPr(opts: PrOptions): Promise<{ url: string }> } // stage 状态 (持久化到 task 行) stage: { transition(to: string, status?: ‘running’ | ‘awaiting_review’): void increment_cycle(): void // cyclic 用 current(): string } // 日志 (会被 broadcast 出去, 不要 console.log) log(level: ‘info’ | ‘warn’ | ‘error’, msg: string, data?: object): void // 退出 complete(result: { prUrl?: string; metrics?: object }): never fail(reason: string, stage?: string): never }
7.4 Runner 模块接口契约
// 每个 task type = 一个 npm 包, 默认 export 一个 factory export interface TaskRunner { readonly type: string // ‘openspec-4stage’ readonly stateModel: ‘dag’ | ‘cyclic’ describe(): RunnerManifest // 给主 API 探测用 run(host: HostContext, spec: TaskSpec): Promise<void> } interface RunnerManifest { type: string version: string // semver, 跟 npm 包版对齐 required_skills: string[] // 默认 skill 列表 required_capabilities: string[] // 如 ‘git’, ‘pr’ state_model: ‘dag’ | ‘cyclic’ stages: string[] // DAG: 全部 stage; Cyclic: 单轮内的 stage param_schema: JsonSchema // spec.params 校验 default_limits: TaskLimits }
7.5 task-spec.json schema (v1, 完整)
{
“version”: 1, // schema 版本, 不是 task spec 版本
“type”: “openspec-4stage”, // runner module type
“runner_pkg”: “@douglas-agent/task-runner-openspec”,
“runner_version”: “^1.0.0”, // semver range
“params”: { … }, // runner 特定参数, 由 param_schema 校验
“limits”: {
“max_turns”: 100, // 单 query() 最大 turn
“max_minutes”: 60, // 总时长, 触发 hard timeout
“max_input_tokens”: 200000,
“max_cycles”: 10 // 仅 cyclic
},
“skills”: […], // 覆盖 manifest.required_skills
“stage_detection”: { // 声明式 stage 切换
“rules”: [ Rule, Rule, … ]
},
“completion”: {
“git_push”: true,
“open_pr”: true,
“verifier”: { “type”: “typecheck” } // 可选, completion 前跑
},
“user_controls”: { // 允许用户从 web 发的指令
“interrupt”: true,
“append_message”: true,
“stop_loop”: false // 仅 cyclic 有意义
}
}7.6 State Model: DAG vs Cyclic
explore ─▶ propose ─▶ apply ─▶ archive ─▶ done ↘ failed
- 有限 stage, 每个 stage 进入恰好 1 次(或失败)
- stage_detection rule 决定 “什么时候进入下一 stage”
- 用户追加指令影响当前 stage 内 agent 行为, 不改变 DAG
┌─▶ red ─▶ green ─▶ refactor ─▶ evaluate ─┐ │ │ └──────── repeat (cycle++) ◄───────────────┤ │ done ◄───┤ (用户 stop / max_cycles failed │ / verifier 通过) ▼
- 单轮内是 DAG, 多轮循环
- 每轮结束跑 evaluate predicate 决定 done / repeat / fail
- 用户的
stop_loop消息触发本轮完成后退出
7.7 stage_detection rule DSL
// Rule 由 PostToolUse hook 在 sandbox 内本地评估 interface Rule { match: ToolPattern // 匹配条件 action: Action // 触发动作 priority?: number // 默认 0, 高优先级先匹配 } interface ToolPattern { tool: ‘Write’ | ‘Bash’ | ‘Read’ | ‘Edit’ | ’*’ path?: string // glob: ‘src/**/*.ts’ command_prefix?: string // Bash: ‘npm test’ 等 result?: ‘success’ | ‘failure’ // Bash 退出码 } type Action = | { kind: ‘transition’; to: string } // DAG: 切到新 stage | { kind: ‘increment_cycle’ } // Cyclic: cycle++ | { kind: ‘evaluate’; check: ‘tests_pass’ | ‘verifier’ } | { kind: ‘complete’; data?: object } | { kind: ‘fail’; reason: string }
规则评估:每次 Claude tool 调用后,runner 按 priority 顺序匹配 rules,第一个命中的 action 触发。未命中保持当前 stage。
7.8 三个 task-type 完整设计
7.8.1 openspec-4stage (DAG, 现有 task)
{
“version”: 1,
“type”: “openspec-4stage”,
“runner_pkg”: “@douglas-agent/task-runner-openspec”,
“runner_version”: “^1.0.0”,
“params”: {
“prompt”: “Add dark mode toggle”,
“change_id”: “add-dark-mode”,
“openspec_change_path”: “openspec/changes/add-dark-mode”
},
“limits”: { “max_turns”: 100, “max_minutes”: 60 },
“skills”: [
“.claude/skills/openspec-explore/SKILL.md”,
“.claude/skills/openspec-propose/SKILL.md”,
“.claude/skills/openspec-apply/SKILL.md”,
“.claude/skills/openspec-archive/SKILL.md”
],
“stage_detection”: {
“rules”: [
{ “match”: { “tool”: “Write”, “path”: “openspec/changes/*/proposal.md” },
“action”: { “kind”: “transition”, “to”: “propose” } },
{ “match”: { “tool”: “Write”, “path”: “src/**/*” },
“action”: { “kind”: “transition”, “to”: “apply” } },
{ “match”: { “tool”: “Bash”, “command_prefix”: “openspec archive” },
“action”: { “kind”: “transition”, “to”: “archive” } }
]
},
“completion”: {
“git_push”: true, “open_pr”: true,
“verifier”: { “type”: “typecheck” }
},
“user_controls”: { “interrupt”: true, “append_message”: true }
}7.8.2 spec-kit (DAG, 更重 phase gate)
{
“version”: 1,
“type”: “spec-kit”,
“runner_pkg”: “@douglas-agent/task-runner-spec-kit”,
“params”: {
“prompt”: ”…”,
“constitution_path”: “.github/spec-kit/constitution.md”
},
“stage_detection”: {
“rules”: [
{ “match”: { “tool”: “Write”, “path”: “specs/*/spec.md” },
“action”: { “kind”: “transition”, “to”: “spec” } },
{ “match”: { “tool”: “Write”, “path”: “plans/*/plan.md” },
“action”: { “kind”: “transition”, “to”: “plan” } },
{ “match”: { “tool”: “Write”, “path”: “plans/*/tasks.md” },
“action”: { “kind”: “transition”, “to”: “tasks” } },
{ “match”: { “tool”: “Bash”, “command_prefix”: “git checkout -b implement/” },
“action”: { “kind”: “transition”, “to”: “implement” } }
]
},
“completion”: {
“verifier”: { “type”: “checklist”, “path”: “plans/*/tasks.md” },
“git_push”: true, “open_pr”: true
}
}7.8.3 tdd-cycle (Cyclic, 暴露循环协议)
{
“version”: 1,
“type”: “tdd-cycle”,
“runner_pkg”: “@douglas-agent/task-runner-tdd”,
“params”: {
“feature_prompt”: “Add user.email validation”,
“test_command”: “pnpm vitest run —reporter=json”,
“target_module”: “packages/auth/src/user.ts”
},
“limits”: { “max_minutes”: 60, “max_cycles”: 10 },
“stage_detection”: {
“rules”: [
{ “match”: { “tool”: “Write”, “path”: ”**/*.test.ts” },
“action”: { “kind”: “transition”, “to”: “red” } },
{ “match”: { “tool”: “Bash”, “command_prefix”: “pnpm vitest”, “result”: “failure” },
“action”: { “kind”: “transition”, “to”: “green” } },
{ “match”: { “tool”: “Bash”, “command_prefix”: “pnpm vitest”, “result”: “success” },
“action”: { “kind”: “evaluate”, “check”: “tests_pass” } }
]
},
“completion”: {
“verifier”: { “type”: “tests_green_n_cycles”, “min_cycles”: 3 },
“git_push”: true, “open_pr”: true
},
“user_controls”: {
“interrupt”: true,
“append_message”: true,
“stop_loop”: true // cyclic 专有
}
}7.9 生命周期 hooks(host 调用 runner)
| Hook | 触发 | runner 可做 |
|---|---|---|
runner.run(host, spec) | 容器入口调用 1 次 | 主循环 — 调 host.ai.query、串 stage、复用 host.git |
runner.onUserMessage?(msg, host) | web 用户从 Relay 发消息 | 注入到 streamInput / 改 stage / 终止 cycle |
runner.onInterrupt?(host) | 用户主动中断 / max_minutes 到 | 清理 → host.fail(reason) |
runner.evaluateCycle?(host, cycle) | cyclic 模式每轮结束 | 返 'continue' | 'done' | 'fail' |
runner.onCompletion?(host, result) | 主循环正常退出前 | 跑 verifier / 改 PR 标题 / 写 release note |
7.10 加载与发现机制
┌─────────────────────────────────────────────────────────────────────────┐ │ monorepo: packages/task-runners/ │ │ ├─ openspec/ package.json: “@douglas-agent/task-runner-openspec” │ │ ├─ spec-kit/ package.json: “@douglas-agent/task-runner-spec-kit” │ │ └─ tdd/ package.json: “@douglas-agent/task-runner-tdd” │ │ │ │ 每个 runner 包结构: │ │ ├─ src/ │ │ │ ├─ index.ts // export default factory() │ │ │ ├─ manifest.ts // RunnerManifest 静态对象 │ │ │ ├─ stages/ // 每 stage 逻辑(可选) │ │ │ └─ verifier.ts // completion verifier │ │ ├─ skills/ // 包内 SKILL.md, copied into image │ │ └─ schema/task-spec.schema.json │ └─────────────────────────────────────────────────────────────────────────┘ 主 API dispatcher 派发流程: 1. 客户端 POST /tasks/:id/dispatch body: { task_type, params } 2. 主 API 查 runner registry (Postgres 表或硬编码) SELECT image, runner_version FROM runner_registry WHERE type = $1 3. 校验 params 是否符合 manifest.param_schema 4. 生成 task-spec.json 并放进容器 env / volume 5. provider.create({ image, env: { TASK_SPEC_JSON: ’…’ }, … })
7.11 版本治理与扩展点
| 问题 | 策略 |
|---|---|
| task-spec.json schema v1 → v2 怎么兼容? | 顶部 "version": 1 字段;host 内置 v1 → v2 migrator;runner 声明支持的 schema 版本范围 |
| runner 包升级会破坏 in-flight task 吗? | task 在 sandbox 里跑,image 已经 freeze 当时的 runner_version;主 API 升级 registry 仅影响新 task |
| 怎么新增 task type? | ① 写新 runner npm 包 ② 加 manifest ③ 加 SKILL.md ④ 写 integration test ⑤ 注册到 runner_registry ⑥ 更新 web UI 的 task_type 选择 |
| 能否同一 image 多 runner 共存? | 能。image 内 npm i 多个 runner 包,entry.js 按 spec.type 动态 require |
| runner 出错怎么暴露? | runner 函数抛错 → host 捕获 → broadcast failed event + 退出码 1。stack trace 进 task_events.data |
| 不同 task type 用不同 image 还是单一 image? | 单一 image + 多 runner 包是默认(共享 base layer 节省 pull);若某 runner 依赖巨型工具链(如 Python ML),独立 image,主 API 用 spec.type 路由 |
7.12 优先开发的三个 task-type(落地建议)
| 顺序 | type | 价值 / 验证目标 |
|---|---|---|
| 1 | openspec-4stage | 把现有 run.js 逻辑搬到 runner 包 —— 验证 HostContext 接口足够支撑现有能力,不丢功能 |
| 2 | spec-kit | 形态与 openspec 同构(DAG, 4 阶段, 写 spec / plan / tasks / implement)—— 验证”换 task type 不改协议”是否真成立 |
| 3 | tdd-cycle | 第一个 Cyclic —— 验证 cycle / evaluate / stop_loop 三个扩展点真能 work;逼出协议未考虑的边界 |
TDD cyclic 暴露了协议关键扩展点: state 不再是有限 DAG,需要支持”用户中途结束循环”信号。这与 G1(用户追加指令通路)天然契合 — 用户从 web 发 stop_loop 消息,runner.onUserMessage 接到后在下个 evaluateCycle 返 'done'。
当下不做: bug-fix-from-issue / refactor / docs-sync / dep-upgrade。前三个 task type 跑完一个完整 release 周期、验证协议稳定后再加。
§8 三阶段迁移路径
不需要 big bang,每阶段可独立验证
主 API 重启韧性(不动协议,只补 reattach)
目标:解决 G3。不依赖 Relay,不改 web,不改 sandbox。
Taskmodel 加sandboxId+sandboxProvider字段(migration 一条)dispatcher.dispatch()在provider.create()后 update task- 新
ResubscribeBootstrapmodule(OnModuleInit,30 行):遍历 running tasks →provider.get(id).streamLogs()重订阅 - 测试:起 task → 重启 NestJS → 验证事件流接续
fly.io 变更:无。新依赖:无。预估:1-2 天工。
引入 Relay + WebSocket 双向通道(不动 task-type)
目标:解决 G1(用户追加指令)+ G4(多 task channel 隔离自然达成)。
- 新 fly app:
deploy/relay/fly.toml部署@douglas-agent/sandbank-relay - 主 API:
@WebSocketGateway('/tasks/:id/stream'),转发 web ↔ Relay - 主 API: 接 Relay 客户端,把 P1 的 streamLogs 订阅切到 Relay 频道订阅
- run.js: 引入
@douglas-agent/sandbank-agent的 connect(),session.on(‘message’) 喂 userQueue - run.js: query() 改 streamInput AsyncIterable
- run.js: 事件回传从 stdout 切到
session.broadcast()(stdout 双写作 fallback) - web: 替换 EventSource → socket.io-client,加”追加指令”UI
- 测试:起 task → 中途发指令 → 验证 agent 接到并响应
fly.io 变更:+1 app(Relay)。新依赖:@douglas-agent/sandbank-relay + @douglas-agent/sandbank-agent + @nestjs/websockets。预估:4-5 天工。
退 SSE + 上线任务类型插件协议
目标:解决 G2(task type 可扩展),收尾整套架构。
- 退役
@Sse端点(保留兼容期 ≥ 1 release) - run.js: 退役 stdout JSONL 主路径(仅留 crash diagnostic)
- spec:
/task-spec.jsonv1 协议落地 - 新 npm 包:
@douglas-agent/task-runner-openspec(搬现有 4 阶段逻辑过去) - 新 npm 包:
@douglas-agent/task-runner-spec-kit或tdd(验证插件协议通用性) - 主 API:
POST /tasks/:id/dispatch接受task_type参数 - web: task 创建表单加 task_type 选择
fly.io 变更:无。新依赖:内部 monorepo 包。预估:5-7 天工。
每阶段都可独立上线 + 回滚。 P1 单独上线就解决了”deploy 丢任务”的真实痛点;P2 解锁用户体验;P3 解锁产品横向扩展。
§9 风险与未决问题
已知 unknown 显式列出
| 风险 | 评估 | 缓解 |
|---|---|---|
| Relay 上游版本不稳 | 中 chekusu/sandbank 尚是 beta,上游可能 break | 主 API 依赖 @douglas-agent/sandbank-relay fork(不直依上游),由 sync-upstream.yml 周一 fast-forward;如有 break MUST 走 PR 人工 review。Relay 本期锁版本,升级走 follow-up change(跟 microsandbox SOP 一致) |
| Relay 单点故障 | 中 Relay 死 → 所有 task 事件流断 | P1 沉淀的 ResubscribeBootstrap 仍是 belt-and-suspenders;保留 stdout JSONL 作 audit fallback |
| 6 backend 对 outbound WSS 的支持 | 中 microsandbox / boxlite / aio / fly / e2b / cube 各自的网络限制 | P2 起步前 spike 每个 backend 验证 outbound TCP 通到 Relay 域名 |
| WebSocket on Fly proxy | 低 Fly 原生支持 | — |
| 多副本主 API(未来) | 低(当前 min=1) | 未来加 fly-replay sticky session header |
| streamInput 在 SDK 中的成熟度 | 中 2026 Q1 之前部分场景被报”silent abort” | P2 起步前 spike 验证;准备 fallback 到当前 single-query 模式 |
| Relay 重启时事件缓冲是否丢 | 未决 Relay 单点死掉重启期间,channel 缓冲消息是否保留?需读源码 | P2 起步前 spike 验证。退路:sandbox 内 stdout JSONL 作 belt-and-suspenders(D2 已保留) |
| 用户追加指令的 UX | 产品未决 当前 agent 正在跑工具时收到追加指令怎么 UX 化?打断 vs 排队? | Claude SDK interrupt 语义内建,但 UX 决策(弹窗 / 输入框 / 队列)由产品阶段决定 |
§10 数据模型与持久化
现有 schema 盘点 + v2 改/新增 + ER 图 + 显式不引入的表
10.1 领域分层(4 个 bounded context)
Identity
UserSession
谁是用户 / 谁登录了 / 谁有 GitHub 安装。v2 不动。
Resources
GithubInstallationGithubRepoGithubBranch
用户能动哪些 GitHub 资源。v2 不动。
Tasks
Task改TaskEvent改
用户在 branch 上发起的 AI 工作 + 事件流。v2 加运行时绑定与保序字段。
Runtime
RunnerRegistry新
每种 task_type 对应的 image / runner npm 包 / 版本。v2 新增。
10.2 现有 entity 速览(v2 影响标记)
| 实体 | 域 | 角色 | 关键字段 | v2 |
|---|---|---|---|---|
User | 身份 | 平台用户 | github_user_id, login, email | 不动 |
Session | 身份 | 会话凭据 | id, user_id, expires_at, revoked_at | 不动 |
GithubInstallation | 资源 | GitHub App 安装 | installation_id, account_login | 不动 |
GithubRepo | 资源 | 已连接仓库镜像 | github_repo_id, owner, name, default_branch | 不动 |
GithubBranch | 资源 | 分支镜像 | name, head_sha, last_commit_at | 不动 |
Task | 任务 | 用户发起的 AI 工作 | branch_id, prompt, status, stage | 改 加 sandbox 绑定 + task_type |
TaskEvent | 任务 | 事件流 | task_id, type, stage, status, data | 改 加 sequence + 来源审计 |
RunnerRegistry | 运行时 | task_type 注册表 | type, runner_pkg, version, image, manifest | 新增 |
10.3 v2 改字段 · Task 表
model Task {
id String @id @default(uuid())
branchId String @map(“branch_id”)
creatorId String? @map(“creator_id”)
title String
prompt String
changeId String @map(“change_id”)
openspecChangePath String @map(“openspec_change_path”)
status TaskStatus @default(proposing)
stage TaskStage @default(explore) // enum 写死 4 值
stage String @default(“explore”) @db.VarChar(32)
// 不同 task_type 有不同 stage 名,故改 varchar
sandboxId String? @map(“sandbox_id”) @db.VarChar(128)
// sandbank 返回的 handle id, ResubscribeBootstrap 用
sandboxProvider SandboxProvider? @map(“sandbox_provider”)
// ‘microsandbox’ | ‘fly’ | ‘boxlite’ | ‘aio’ | ‘e2b’ | ‘cube’
taskType String @default(“openspec-4stage”) @map(“task_type”) @db.VarChar(64)
runnerPkg String @map(“runner_pkg”) @db.VarChar(128)
runnerVersion String @map(“runner_version”) @db.VarChar(32)
taskSpec Json @map(“task_spec”)
// 完整 /task-spec.json 副本, 主 API 重启后
// 不调容器即可知道这个 task 是哪种 type / 哪个 stage
cycleCount Int @default(0) @map(“cycle_count”)
// cyclic 类 task 进度持久化 (tdd-cycle 等)
additions, deletions, filesChanged, prUrl, failedReason,
createdAt, updatedAt, deletedAt
…
@@index([branchId, deletedAt])
@@index([creatorId, deletedAt])
@@index([createdAt])
@@index([status, sandboxId]) // ResubscribeBootstrap WHERE status IN (‘running’…) 用
}字段动机一览
| 新字段 | 动机 | 所属决策 |
|---|---|---|
sandbox_id | 主 API 重启后通过 provider.get(id) 重订阅事件流 | D4 |
sandbox_provider | 重订阅时知道用哪个 adapter(6 个 backend 之一) | D4 |
task_type | 插件协议——决定加载哪个 runner npm 包 | D5 |
runner_pkg / runner_version | image 内 require 哪个版本的 runner 模块(in-flight task 不被新 release 破坏) | D5 |
task_spec (jsonb) | 主 API 仅看 Task 表就知道 stage 集合、cycle 状态、completion 配置——不必去问 sandbox | D5 |
cycle_count | cyclic task(TDD 等)当前是第几轮,跨 Relay 重连可恢复 | D5 |
stage 改 varchar | TaskStage enum 写死 explore/propose/apply/archive,spec-kit / tdd 用不上;改 varchar 容纳 | D5 |
10.4 v2 改字段 · TaskEvent 表
model TaskEvent {
id String @id @default(uuid())
taskId String @map(“task_id”)
type String // 值域扩 (见下)
stage String?
status String?
data Json
sequenceNumber BigInt @map(“sequence_number”)
// per task 单调自增, 行级保序; web 用 last_seq
// 做 catch-up (避免依赖 created_at 精度)
ingestMethod String @default(“relay”) @map(“ingest_method”) @db.VarChar(24)
// ‘relay’ | ‘stdout’ | ‘bootstrap_resubscribe’ | ‘janitor’
// 审计 + 调试用
cycleIndex Int? @map(“cycle_index”)
// cyclic 任务专用,标记是第几轮的事件
createdAt DateTime @default(now())
task Task @relation(fields: [taskId], references: [id], onDelete: Cascade)
@@index([taskId, createdAt])
@@unique([taskId, sequenceNumber]) // 保序 + catch-up 唯一索引
}
// type 字段值域扩
现: ‘stage_change’ | ‘log’ | ‘completed’ | ‘failed’
加: ‘user_message_injected’ | ‘token_renewed’ | ‘cycle_evaluated’ | ‘stage_evaluated’10.5 v2 新增 · RunnerRegistry 表
model RunnerRegistry { id String @id @default(uuid()) @db.Uuid type String @unique @db.VarChar(64) // ‘openspec-4stage’ | ‘spec-kit’ | ‘tdd-cycle’ | … runnerPkg String @map(“runner_pkg”) @db.VarChar(128) // ‘@douglas-agent/task-runner-openspec’ runnerVersion String @map(“runner_version”) @db.VarChar(32) // semver range 或锁定版本 image String @db.VarChar(256) // ‘ghcr.io/openspec/task-runner:v2.1.0’ imageDigest String? @map(“image_digest”) @db.VarChar(80) // ‘sha256:…’ 用于 supply chain 锁定 manifest Json // RunnerManifest: required_skills, param_schema, // state_model, stages, default_limits active Boolean @default(true) // false = 软停用, 不能新派发但 in-flight 不影响 createdAt DateTime @default(now()) @map(“created_at”) @db.Timestamptz(6) updatedAt DateTime @updatedAt @map(“updated_at”) @db.Timestamptz(6) @@index([type, active]) @@map(“runner_registry”) } // dispatcher 派发路径: // 1. POST /tasks/:id/dispatch body: { task_type, params } // 2. SELECT * FROM runner_registry WHERE type = $1 AND active = true // 3. validate(params, registry.manifest.param_schema) // 4. 写 Task.{taskType, runnerPkg, runnerVersion, taskSpec} // 5. provider.create({ image: registry.image, env: { TASK_SPEC_JSON: … })}
10.6 ER 图(跨域关系)
┌─────────────────────────────────────────────────────────────────────────────┐ │ 身份域 (Identity) │ │ │ │ ┌──────────────┐ 1 ──── N ┌───────────────┐ │ │ │ User │ ──────────▶│ Session │ │ │ │ │ │ (cookie 凭据) │ │ │ └──────┬───────┘ └───────────────┘ │ │ │ 1 │ │ │ │ └──────────┼──────────────────────────────────────────────────────────────────┘ │ │ N (installedBy / creator) │ ┌──────────┴──────────────────────────────────────────────────────────────────┐ │ 资源域 (Resources) │ │ │ │ ┌──────────────────┐ 1 ── N ┌───────────────┐ 1 ── N ┌──────────────┐ │ │ │ GithubInstall │ ──────▶│ GithubRepo │ ──────▶│ GithubBranch │ │ │ │ ation │ │ (镜像) │ │ (镜像) │ │ │ └──────────────────┘ └───────────────┘ └──────┬───────┘ │ │ │ 1 │ └────────────────────────────────────────────────────────────────┼────────────┘ │ │ N │ ┌────────────────────────────────────────────────────────────────┴────────────┐ │ 任务域 (Tasks) │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Task │ 1 ── N ┌─────────────┐ │ │ │ ─ branch_id (FK 硬约束 → GithubBranch) │ ────────▶│ TaskEvent │ │ │ │ ─ creator_id (FK 软约束 → User, SetNull) │ │ ─ task_id │ │ │ │ ─ sandbox_id (外部 sandbank id,非 FK) │ │ ─ seq_no │ │ │ │ ─ sandbox_provider (enum) │ │ ─ cycle_idx│ │ │ │ ─ task_type (软引用 RunnerRegistry.type) │ │ ─ type │ │ │ │ ─ runner_pkg / runner_version │ │ ─ data │ │ │ │ ─ task_spec (jsonb) │ └─────────────┘ │ │ │ ─ cycle_count │ │ │ │ ─ status, stage, prompt, … │ │ │ └────────┬────────────────────────────────────┘ │ │ │ 软引用 (字符串 task_type, 不约束) │ └────────────┼────────────────────────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────────────────┐ │ 运行时域 (Runtime) [v2 NEW] │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ RunnerRegistry │ │ │ │ ─ type (unique, 软引用目标) │ │ │ │ ─ runner_pkg / runner_version │ │ │ │ ─ image / image_digest │ │ │ │ ─ manifest (jsonb) │ │ │ │ ─ active │ │ │ └─────────────────────────────────────────┘ │ │ 无 FK 引用外部 (独立资源, 主 API 启动时 seed) │ └────────────────────────────────────────────────────────────────────────────┘ 外键约束策略: 硬约束 (CASCADE): Task.branch_id, TaskEvent.task_id 软约束 (SetNull): Task.creator_id (用户删了 task 留着) 无 FK (软引用): Task.task_type → RunnerRegistry.type Task.sandbox_id → 外部 sandbank handle
10.7 完整 Prisma diff
+ enum SandboxProvider { + microsandbox + fly + boxlite + aio + e2b + cube + @@map(“sandbox_provider”) + - enum TaskStage { // 删除: 不同 task_type 有不同 stage 名 - explore - propose - apply - archive - model Task { … (现有字段) - stage TaskStage @default(explore) + stage String @default(“explore”) @db.VarChar(32) + sandboxId String? @map(“sandbox_id”) @db.VarChar(128) + sandboxProvider SandboxProvider? @map(“sandbox_provider”) + taskType String @default(“openspec-4stage”) @map(“task_type”) + runnerPkg String @map(“runner_pkg”) @db.VarChar(128) + runnerVersion String @map(“runner_version”) @db.VarChar(32) + taskSpec Json @map(“task_spec”) + cycleCount Int @default(0) @map(“cycle_count”) + @@index([status, sandboxId]) } model TaskEvent { … (现有字段) + sequenceNumber BigInt @map(“sequence_number”) + ingestMethod String @default(“relay”) @map(“ingest_method”) @db.VarChar(24) + cycleIndex Int? @map(“cycle_index”) + @@unique([taskId, sequenceNumber]) } + model RunnerRegistry { … } // 见 10.5
10.8 显式不引入的表(决策附录)
| 未引入 | 形态 | 不引入的理由 | 未来何时再考虑 |
|---|---|---|---|
SandboxRun | Task 1:N 的 sandbox 历史,支持任务重试 | 本期 1:1 假设(一个 task 单次 sandbox 启动)。提前引入违反 D4 快速落地原则 | 用户真要”task 失败可重试”产品功能时 extract |
CredentialLease | scoped install token 发放 / 续期审计 | spec.md 明确要求 token 不持久化到磁盘(“容器内 git credential 不持久化”),仅活在内存。审计走 task_events.type='token_renewed' | 商业化要审计追溯单一 token 谁发的时 |
ReconcileJob | 调度子域,BullMQ / Trigger.dev 风格 | D6 决定不上 durable execution;ResubscribeBootstrap 是 OnModuleInit 一次性扫,不是持续队列 | 同时活跃 task > 100 或多副本主 API 时 |
TaskQuota | 用户 / repo / task_type 维度配额 | 现有”并发上限”是实时 SELECT COUNT,无配额表 | 商业化收费 / 限流分租户时 |
RelaySessionRecord | Relay 频道状态镜像 | Relay 是无状态总线,task.sandbox_id + task.id 已足够 reattach | 不会引入(Relay 不应该作为 source of truth) |
UserMessageInjection | 用户中途追加指令的独立持久化 | 已经通过 task_events.type='user_message_injected' 走事件表,不必再独立 | 不会引入 |
10.9 Migration 顺序(对齐 P1 / P2 / P3)
0042_task_sandbox_binding.sql · 对应 P1 重启韧性
CREATE TYPE sandbox_provider AS ENUM (...)ALTER TABLE tasks ADD COLUMN sandbox_id VARCHAR(128)ALTER TABLE tasks ADD COLUMN sandbox_provider sandbox_providerCREATE INDEX idx_tasks_active ON tasks(status, sandbox_id) WHERE status IN ('running','awaiting_review')
Breaking: 否。Backfill: 不需要(新字段 nullable)。Rollback: 删字段即可,无依赖。
0043_task_events_sequence.sql · 对应 P2 Relay + WS
ALTER TABLE task_events ADD COLUMN sequence_number BIGINTALTER TABLE task_events ADD COLUMN ingest_method VARCHAR(24) DEFAULT 'stdout'-- backfill: UPDATE task_events SET sequence_number = row_number() OVER (PARTITION BY task_id ORDER BY created_at)ALTER TABLE task_events ALTER COLUMN sequence_number SET NOT NULLCREATE UNIQUE INDEX uq_task_events_seq ON task_events(task_id, sequence_number)
Breaking: 否。Backfill: 历史事件 sequence_number 按 created_at 升序填。Rollback: 删字段 + 删索引。
0044_task_type_pluggable.sql · 对应 P3 插件协议
ALTER TABLE tasks ADD COLUMN task_type VARCHAR(64) DEFAULT 'openspec-4stage'ALTER TABLE tasks ADD COLUMN runner_pkg VARCHAR(128) DEFAULT '@douglas-agent/task-runner-openspec'ALTER TABLE tasks ADD COLUMN runner_version VARCHAR(32) DEFAULT '^1.0.0'ALTER TABLE tasks ADD COLUMN task_spec JSONBALTER TABLE tasks ADD COLUMN cycle_count INT DEFAULT 0ALTER TABLE tasks ALTER COLUMN stage TYPE VARCHAR(32) USING stage::textALTER TABLE task_events ADD COLUMN cycle_index INTDROP TYPE task_stageCREATE TABLE runner_registry (...)-- seed: INSERT INTO runner_registry VALUES ('openspec-4stage', ...)
Breaking: 是(TaskStage enum 退役)。需先升级所有读 stage 的代码到容忍 string。Backfill: 历史 task 默认 ‘openspec-4stage’。Rollback: 复杂——需要恢复 enum + 反向同步 stage 字段。建议 forward-only。
关键设计原则: v2 数据模型遵循 D4 + D6 决策——该持久化的才落表。token / Relay channel 状态 / reconcile 队列都不进数据库;只有”未来重启后还需要的”信息(sandbox_id, task_spec, sequence_number)才入表。
§11 收口建议
一句话决策 + 下一步
建议落 OpenSpec change 提案: add-task-sandbox-websocket-broker(或类似名)。范围限 P1 + P2,把 P3 的插件协议拆为独立 follow-up change。
11.1 一句话决策
| 方向 | 判断 |
|---|---|
| web ↔ 主 API:WebSocket 替代 SSE? | ✓ 是 — 用户追加指令是真实需求,SSE 单向不够 |
| 主 API ↔ sandbox:引入 sandbank Relay? | ✓ 是 — 自写或直连每个 backend 都更贵 |
| agent runtime:切 streamInput? | ✓ 是 — Claude SDK 原生能力,门槛低 |
| 主 API 重启韧性:上 ReconcileWorker / BullMQ / Trigger.dev? | ✗ 否(当下)— 30 行 ResubscribeBootstrap 已经够,未来再说 |
| 领域模型重构:4 个聚合? | ✗ 否(当下)— Task 加 2 列即可,等任务重试 / 多 lease 真有需求再拆 |
11.2 下一步
- 本文档 review → 用户决定是否进 OpenSpec propose
- P2 起步前的两个 spike:(a) 6 backend 的 outbound WSS 联通性;(b) streamInput 在 v2 SDK 的稳定性
- 如果通过:先做 P1(独立闭环 + 上线),再启动 P2 设计 review
引用源码
apps/api/src/task-runner/task-runner.controller.ts:92— 当前 SSE 端点apps/api/src/task-runner/task-runner.dispatcher.ts:169-199— streamLogs in-memory loopapps/api/src/task-runner/task-runner.janitor.ts— 60min cron 兜底apps/api/prisma/schema.prisma · model Task— 缺 sandbox_idapps/api/fly.toml— kill_signal=SIGINT, kill_timeout=10sopenspec/specs/task-runner/spec.md— POC v2 现状 spec
外部参考
- Claude Agent SDK · Streaming Input mode
- Claude Agent SDK TypeScript reference · Query.streamInput()
- chekusu/sandbank · 上游
- sandbank CHANGELOG · provider.get / list / Relay / agent
- NestJS WebSockets · Gateways
- Fly.io · WebSockets and Fly
- Fly.io · Graceful VM exits
- Fly.io · Sticky Sessions
历史 sandbank 探索沉淀
docs/spikes/s1-claude-agent-sdk-skills.md— Agent SDK + skill 加载docs/spikes/s5-four-stages-pipeline.md— 四阶段单 query() 验证docs/spikes/s7-microvm-long-task.md— 长任务 cache hit 实测docs/spikes/s12-cube-on-wsl.md+docs/spikes/s13-microsandbox-on-wsl.md— WSL2 quirk SOPdocs/task-runner-poc.html— POC v1/v2 技术选型docs/task-runner-sandbank-integration.md— sandbank 集成笔记