Track ACP sessions_spawn runs and emit ACP lifecycle events (#40885)

* Fix ACP sessions_spawn lifecycle tracking

* fix(tests): resolve leftover merge markers in sessions spawn lifecycle test

* fix(agents): clarify acp spawn cleanup semantics

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
助爪
2026-03-29 05:20:10 -04:00
committed by GitHub
parent a7a89fb680
commit 443295448c
6 changed files with 367 additions and 0 deletions
+1
View File
@@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- ACP/sessions_spawn: register ACP child runs for completion tracking and lifecycle cleanup, and make registration-failure cleanup explicitly best-effort so callers do not assume an already-started ACP turn was fully aborted. (#40885) Thanks @xaeon2026 and @vincentkoc.
- ACPX/runtime: derive the bundled ACPX expected version from the extension package metadata instead of hardcoding a separate literal, so plugin-local ACPX installs stop drifting out of health-check parity after version bumps. (#49089) Thanks @jiejiesks and @vincentkoc.
- Gateway/auth: make local-direct `trusted-proxy` fallback require the configured shared token instead of silently authenticating same-host callers, while keeping same-host reverse proxy identity-header flows on the normal trusted-proxy path. Thanks @zhangning-agent and @vincentkoc.
- Agents/sandbox: honor `tools.sandbox.tools.alsoAllow`, let explicit sandbox re-allows remove matching built-in default-deny tools, and keep sandbox explain/error guidance aligned with the effective sandbox tool policy. (#54492) Thanks @ngutman.
@@ -16,6 +16,10 @@ const fastModeEnv = vi.hoisted(() => {
return { previous };
});
const acpSpawnMocks = vi.hoisted(() => ({
spawnAcpDirect: vi.fn(),
}));
vi.mock("./pi-embedded.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./pi-embedded.js")>();
return {
@@ -27,6 +31,12 @@ vi.mock("./pi-embedded.js", async (importOriginal) => {
};
});
vi.mock("./acp-spawn.js", () => ({
ACP_SPAWN_MODES: ["run", "session"],
ACP_SPAWN_STREAM_TARGETS: ["parent"],
spawnAcpDirect: (...args: unknown[]) => acpSpawnMocks.spawnAcpDirect(...args),
}));
vi.mock("./tools/agent-step.js", () => ({
readLatestAssistantReply: async () => "done",
}));
@@ -122,6 +132,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
});
resetSubagentRegistryForTests();
callGatewayMock.mockClear();
acpSpawnMocks.spawnAcpDirect.mockReset();
});
afterAll(() => {
@@ -311,6 +322,92 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);
});
it("tracks ACP run-mode spawns for auto-announce via agent.wait", async () => {
let deletedKey: string | undefined;
acpSpawnMocks.spawnAcpDirect.mockResolvedValue({
status: "accepted",
childSessionKey: "agent:codex:acp:child-1",
runId: "run-acp-1",
mode: "run",
});
const ctx = setupSessionsSpawnGatewayMock({
includeChatHistory: true,
...buildDiscordCleanupHooks((key) => {
deletedKey = key;
}),
agentWaitResult: { status: "ok", startedAt: 3000, endedAt: 4000 },
});
const tool = await getDiscordGroupSpawnTool();
const result = await tool.execute("call-acp", {
runtime: "acp",
task: "do thing",
agentId: "codex",
runTimeoutSeconds: RUN_TIMEOUT_SECONDS,
cleanup: "delete",
});
expect(result.details).toMatchObject({
status: "accepted",
childSessionKey: "agent:codex:acp:child-1",
runId: "run-acp-1",
});
await waitFor(
() =>
ctx.waitCalls.some((call) => call.runId === "run-acp-1") &&
Boolean(deletedKey) &&
ctx.calls.some((call) => call.method === "agent"),
);
expect(acpSpawnMocks.spawnAcpDirect).toHaveBeenCalledWith(
expect.objectContaining({
task: "do thing",
agentId: "codex",
}),
expect.objectContaining({
agentSessionKey: "discord:group:req",
}),
);
const announceCall = ctx.calls.find((call) => call.method === "agent");
const announceParams = announceCall?.params as
| { sessionKey?: string; deliver?: boolean; message?: string }
| undefined;
expect(announceParams?.sessionKey).toBe("agent:main:discord:group:req");
expect(announceParams?.deliver).toBe(false);
expect(announceParams?.message).toContain("do thing");
expect(deletedKey).toBe("agent:codex:acp:child-1");
});
it('does not track ACP spawns through auto-announce when streamTo="parent"', async () => {
acpSpawnMocks.spawnAcpDirect.mockResolvedValue({
status: "accepted",
childSessionKey: "agent:codex:acp:child-2",
runId: "run-acp-2",
mode: "run",
});
const ctx = setupSessionsSpawnGatewayMock({
includeChatHistory: true,
agentWaitResult: { status: "ok", startedAt: 5000, endedAt: 6000 },
});
const tool = await getDiscordGroupSpawnTool();
const result = await tool.execute("call-acp-parent", {
runtime: "acp",
task: "stream progress",
agentId: "codex",
runTimeoutSeconds: RUN_TIMEOUT_SECONDS,
streamTo: "parent",
});
expect(result.details).toMatchObject({
status: "accepted",
childSessionKey: "agent:codex:acp:child-2",
runId: "run-acp-2",
});
expect(ctx.waitCalls).toHaveLength(0);
expect(ctx.calls.filter((call) => call.method === "agent")).toHaveLength(0);
});
it("sessions_spawn reports timed out when agent.wait returns timeout", async () => {
const ctx = setupSessionsSpawnGatewayMock({
includeChatHistory: true,
+108
View File
@@ -1,11 +1,20 @@
import { Type } from "@sinclair/typebox";
import { loadConfig } from "../../config/config.js";
import { callGateway } from "../../gateway/call.js";
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { ACP_SPAWN_MODES, ACP_SPAWN_STREAM_TARGETS, spawnAcpDirect } from "../acp-spawn.js";
import { optionalStringEnum } from "../schema/typebox.js";
import type { SpawnedToolContext } from "../spawned-context.js";
import { registerSubagentRun } from "../subagent-registry.js";
import { SUBAGENT_SPAWN_MODES, spawnSubagentDirect } from "../subagent-spawn.js";
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readStringParam, ToolInputError } from "./common.js";
import {
resolveDisplaySessionKey,
resolveInternalSessionKey,
resolveMainSessionAlias,
} from "./sessions-helpers.js";
const SESSIONS_SPAWN_RUNTIMES = ["subagent", "acp"] as const;
const SESSIONS_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const;
@@ -20,6 +29,46 @@ const UNSUPPORTED_SESSIONS_SPAWN_PARAM_KEYS = [
"reply_to",
] as const;
function summarizeError(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
if (typeof err === "string") {
return err;
}
return "error";
}
function resolveTrackedSpawnMode(params: {
requestedMode?: "run" | "session";
threadRequested: boolean;
}): "run" | "session" {
if (params.requestedMode === "run" || params.requestedMode === "session") {
return params.requestedMode;
}
return params.threadRequested ? "session" : "run";
}
async function cleanupUntrackedAcpSession(sessionKey: string): Promise<void> {
const key = sessionKey.trim();
if (!key) {
return;
}
try {
await callGateway({
method: "sessions.delete",
params: {
key,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch {
// Best-effort cleanup only.
}
}
const SessionsSpawnToolSchema = Type.Object({
task: Type.String(),
label: Type.Optional(Type.String()),
@@ -170,6 +219,65 @@ export function createSessionsSpawnTool(
sandboxed: opts?.sandboxed,
},
);
const childSessionKey = result.childSessionKey?.trim();
const childRunId = result.runId?.trim();
const shouldTrackViaRegistry =
result.status === "accepted" &&
Boolean(childSessionKey) &&
Boolean(childRunId) &&
streamTo !== "parent";
if (shouldTrackViaRegistry && childSessionKey && childRunId) {
const cfg = loadConfig();
const trackedSpawnMode = resolveTrackedSpawnMode({
requestedMode: result.mode,
threadRequested: thread,
});
const trackedCleanup = trackedSpawnMode === "session" ? "keep" : cleanup;
const { mainKey, alias } = resolveMainSessionAlias(cfg);
const requesterInternalKey = opts?.agentSessionKey
? resolveInternalSessionKey({
key: opts.agentSessionKey,
alias,
mainKey,
})
: alias;
const requesterDisplayKey = resolveDisplaySessionKey({
key: requesterInternalKey,
alias,
mainKey,
});
const requesterOrigin = normalizeDeliveryContext({
channel: opts?.agentChannel,
accountId: opts?.agentAccountId,
to: opts?.agentTo,
threadId: opts?.agentThreadId,
});
try {
registerSubagentRun({
runId: childRunId,
childSessionKey,
requesterSessionKey: requesterInternalKey,
requesterOrigin,
requesterDisplayKey,
task,
cleanup: trackedCleanup,
label: label || undefined,
runTimeoutSeconds,
expectsCompletionMessage: true,
spawnMode: trackedSpawnMode,
});
} catch (err) {
// Best-effort only: the ACP turn was already started above, so deleting the
// child session record here does not guarantee the in-flight run was aborted.
await cleanupUntrackedAcpSession(childSessionKey);
return jsonResult({
status: "error",
error: `Failed to register ACP run: ${summarizeError(err)}. Cleanup was attempted, but the already-started ACP run may still finish in the background.`,
childSessionKey,
runId: childRunId,
});
}
}
return jsonResult(result);
}
+27
View File
@@ -13,6 +13,7 @@ import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
@@ -308,6 +309,7 @@ export async function tryDispatchAcpReply(params: {
ctx: FinalizedMsgContext;
cfg: OpenClawConfig;
dispatcher: ReplyDispatcher;
runId?: string;
sessionKey?: string;
abortSignal?: AbortSignal;
inboundAudio: boolean;
@@ -467,6 +469,18 @@ export async function tryDispatchAcpReply(params: {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
if (params.runId?.trim()) {
emitAgentEvent({
runId: params.runId.trim(),
sessionKey,
stream: "lifecycle",
data: {
phase: "end",
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
@@ -492,6 +506,19 @@ export async function tryDispatchAcpReply(params: {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
if (params.runId?.trim()) {
emitAgentEvent({
runId: params.runId.trim(),
sessionKey,
stream: "lifecycle",
data: {
phase: "error",
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
error: acpError.message,
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
@@ -78,6 +78,9 @@ const sessionStoreMocks = vi.hoisted(() => ({
resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"),
resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })),
}));
const agentEventMocks = vi.hoisted(() => ({
emitAgentEvent: vi.fn(),
}));
const ttsMocks = vi.hoisted(() => {
const state = {
synthesizeFinalAudio: false,
@@ -207,6 +210,13 @@ vi.mock("../../infra/outbound/session-binding-service.js", async (importOriginal
}),
};
});
vi.mock("../../infra/agent-events.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../infra/agent-events.js")>();
return {
...actual,
emitAgentEvent: (params: unknown) => agentEventMocks.emitAgentEvent(params),
};
});
vi.mock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value),
@@ -349,6 +359,7 @@ describe("dispatchReplyFromConfig", () => {
acpMocks.upsertAcpSessionMeta.mockReset();
acpMocks.upsertAcpSessionMeta.mockResolvedValue(null);
acpMocks.requireAcpRuntimeBackend.mockReset();
agentEventMocks.emitAgentEvent.mockReset();
sessionBindingMocks.listBySession.mockReset();
sessionBindingMocks.listBySession.mockReturnValue([]);
pluginBindingTesting.reset();
@@ -1126,6 +1137,127 @@ describe("dispatchReplyFromConfig", () => {
expect(outcome).toBe("settled");
});
it("emits lifecycle end for ACP turns using the current run id", async () => {
setNoAbort();
const runtime = createAcpRuntime([{ type: "text_delta", text: "done" }, { type: "done" }]);
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime,
});
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "write a test",
});
await dispatchReplyFromConfig({
ctx,
cfg: {
acp: {
enabled: true,
dispatch: { enabled: true },
stream: { coalesceIdleMs: 0, maxChunkChars: 128 },
},
} as OpenClawConfig,
dispatcher,
replyOptions: {
runId: "run-acp-lifecycle-end",
},
});
expect(agentEventMocks.emitAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-acp-lifecycle-end",
sessionKey: "agent:codex-acp:session-1",
stream: "lifecycle",
data: expect.objectContaining({
phase: "end",
}),
}),
);
});
it("emits lifecycle error for ACP turn failures using the current run id", async () => {
setNoAbort();
const runtime = createAcpRuntime([]);
runtime.runTurn.mockImplementation(async function* () {
yield { type: "status", tag: "usage_update", text: "warming up" };
throw new Error("ACP exploded");
});
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime,
});
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "write a test",
});
await dispatchReplyFromConfig({
ctx,
cfg: {
acp: {
enabled: true,
dispatch: { enabled: true },
stream: { coalesceIdleMs: 0, maxChunkChars: 128 },
},
} as OpenClawConfig,
dispatcher,
replyOptions: {
runId: "run-acp-lifecycle-error",
},
});
expect(agentEventMocks.emitAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-acp-lifecycle-error",
sessionKey: "agent:codex-acp:session-1",
stream: "lifecycle",
data: expect.objectContaining({
phase: "error",
error: expect.stringContaining("ACP exploded"),
}),
}),
);
});
it("posts a one-time resolved-session-id notice in thread after the first ACP turn", async () => {
setNoAbort();
const runtime = createAcpRuntime([{ type: "text_delta", text: "hello" }, { type: "done" }]);
@@ -595,6 +595,7 @@ export async function dispatchReplyFromConfig(params: {
ctx,
cfg,
dispatcher,
runId: params.replyOptions?.runId,
sessionKey: acpDispatchSessionKey,
abortSignal: params.replyOptions?.abortSignal,
inboundAudio,
@@ -733,6 +734,7 @@ export async function dispatchReplyFromConfig(params: {
ctx,
cfg,
dispatcher,
runId: params.replyOptions?.runId,
sessionKey: acpDispatchSessionKey,
abortSignal: params.replyOptions?.abortSignal,
inboundAudio,