Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2b941ccc93 | |||
| ed080ae988 | |||
| f31e89d5af | |||
| 52c311e47f | |||
| 5b54d4de7a | |||
| 96152f6577 | |||
| e881b3c5de | |||
| e86b507da7 | |||
| 2fc3a822c8 | |||
| 1b0e1edb08 | |||
| d107b79c63 | |||
| c5ab442f46 | |||
| c5677df56e | |||
| 21ba0fb8a4 | |||
| 69319a0569 | |||
| 37d8e55991 | |||
| 8d20edb028 | |||
| 7564c4e7f4 | |||
| 26e02a9b8b | |||
| 25ec133574 | |||
| d88ede92b9 |
@@ -36,3 +36,19 @@
|
||||
## Agent-Specific Notes
|
||||
- If the relay is running in tmux (`warelay-relay`), restart it after code changes: kill pane/session and run `pnpm warelay relay --verbose` inside tmux. Check tmux before editing; keep the watcher healthy if you start it.
|
||||
- Also read the shared guardrails at `~/Projects/oracle/AGENTS.md` and `~/Projects/agent-scripts/AGENTS.MD` before making changes; align with any cross-repo rules noted there.
|
||||
|
||||
## Exclamation Mark Escaping Workaround
|
||||
The Claude Code Bash tool escapes `!` to `\!` in command arguments. When using `warelay send` with messages containing exclamation marks, use heredoc syntax:
|
||||
|
||||
```bash
|
||||
# WRONG - will send "Hello\!" with backslash
|
||||
warelay send --provider web --to "+1234" --message 'Hello!'
|
||||
|
||||
# CORRECT - use heredoc to avoid escaping
|
||||
warelay send --provider web --to "+1234" --message "$(cat <<'EOF'
|
||||
Hello!
|
||||
EOF
|
||||
)"
|
||||
```
|
||||
|
||||
This is a Claude Code quirk, not a warelay bug.
|
||||
|
||||
@@ -1,10 +1,40 @@
|
||||
# Changelog
|
||||
|
||||
## 1.3.0 — Unreleased
|
||||
|
||||
### Highlights
|
||||
- **Pluggable agents (Claude, Pi, Codex, Opencode):** New `inbound.reply.agent` block chooses the CLI and parser per command reply; per-agent argv builders inject the right flags/identity/prompt handling and parse NDJSON streams, enabling Pi/Codex swaps without changing templates.
|
||||
|
||||
### Bug Fixes
|
||||
- **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`.
|
||||
- **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`.
|
||||
- **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses.
|
||||
- **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files.
|
||||
- **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently.
|
||||
|
||||
### Changes
|
||||
- **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise.
|
||||
- **Batched inbound messaging with timestamps:** When multiple WhatsApp messages queue up, they’re sent to the agent in one combined batch, each line timestamped consistently to preserve ordering and context.
|
||||
- **Typing indicator after IPC send:** After sending a message via IPC (e.g., `warelay send`), the relay now automatically shows the typing indicator ("composing") to signal that more messages may be coming.
|
||||
- **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives.
|
||||
- **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter.
|
||||
- **Test isolation improvements:** Mock `loadConfig()` in all test files to prevent loading real user config (with emojis/prefixes) during tests. Default test config now has no prefixes/timestamps for cleaner assertions.
|
||||
- **Same-phone mode (self-messaging):** warelay now supports running on the same phone number you message from. This enables setups where you chat with yourself to control an AI assistant. Same-phone mode (`from === to`) is always allowed, even without configuring `allowFrom`. Echo detection prevents infinite loops by tracking recently sent message text and skipping auto-replies when incoming messages match.
|
||||
- **Echo detection:** The `fromMe` filter in `inbound.ts` is deliberately removed for same-phone setups; instead, text-based echo detection in `auto-reply.ts` tracks sent messages in a bounded Set (max 100 entries) and skips processing when a match is found.
|
||||
- **Same-phone detection logging:** Verbose mode now logs `📱 Same-phone mode detected` when `from === to`.
|
||||
- **Configurable same-phone marker:** New `inbound.samePhoneMarker` config option to customize the prefix added to messages in same-phone mode (default: `[same-phone]`). Set it to something cute like `[🦞 same-phone]` to help distinguish bot replies.
|
||||
|
||||
## 1.2.2 — 2025-11-28
|
||||
|
||||
### Changes
|
||||
- **Manual heartbeat sends:** `warelay heartbeat` accepts `--message/--body` with `--provider web|twilio` to push real outbound messages through the same plumbing; `--dry-run` previews payloads without sending.
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Changes
|
||||
- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests.
|
||||
- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs.
|
||||
|
||||
## 1.2.1 — 2025-11-28
|
||||
|
||||
### Changes
|
||||
|
||||
@@ -93,6 +93,16 @@ Install from npm (global): `npm install -g warelay` (Node 22+). Then choose **on
|
||||
|
||||
Best practice: use a dedicated WhatsApp account (separate SIM/eSIM or business account) for automation instead of your primary personal account to avoid unexpected logouts or rate limits.
|
||||
|
||||
### Same-phone mode (self-messaging)
|
||||
warelay supports running on the same phone number you message from—you chat with yourself and an AI assistant replies in the same bubble. This requires:
|
||||
- Adding your own number to `allowFrom` in `warelay.json`
|
||||
- The `fromMe` filter is disabled; echo detection in `auto-reply.ts` prevents loops
|
||||
|
||||
**Gotchas:**
|
||||
- Messages appear in the same chat bubble (WhatsApp "Note to self")
|
||||
- Echo detection relies on exact text matching; if the reply is identical to your input, it may be skipped
|
||||
- Works best with a dedicated WhatsApp account
|
||||
|
||||
## Configuration
|
||||
|
||||
### Environment (.env)
|
||||
@@ -157,6 +167,9 @@ Best practice: use a dedicated WhatsApp account (separate SIM/eSIM or business a
|
||||
| Key | Type & default | Notes |
|
||||
| --- | --- | --- |
|
||||
| `inbound.allowFrom` | `string[]` (default: empty) | E.164 numbers allowed to trigger auto-reply (no `whatsapp:`); `"*"` allows any sender. |
|
||||
| `inbound.messagePrefix` | `string` (default: `"[warelay]"` if no allowFrom, else `""`) | Prefix added to all inbound messages before passing to command. |
|
||||
| `inbound.responsePrefix` | `string` (default: —) | Prefix auto-added to all outbound replies (e.g., `"🦞"`). |
|
||||
| `inbound.timestampPrefix` | `boolean \| string` (default: `true`) | Timestamp prefix: `true` (UTC), `false` (disabled), or IANA timezone like `"Europe/Vienna"`. |
|
||||
| `inbound.reply.mode` | `"text"` \| `"command"` (default: —) | Reply style. |
|
||||
| `inbound.reply.text` | `string` (default: —) | Used when `mode=text`; templating supported. |
|
||||
| `inbound.reply.command` | `string[]` (default: —) | Argv for `mode=command`; each element templated. Stdout (trimmed) is sent. |
|
||||
|
||||
Regular → Executable
@@ -0,0 +1,77 @@
|
||||
# Agent Abstraction Refactor Plan
|
||||
|
||||
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
|
||||
|
||||
## Overview
|
||||
- Introduce a pluggable agent layer (`src/agents/*`), selected by config.
|
||||
- Normalize config (`agent` block) and remove `claudeOutputFormat` legacy knobs.
|
||||
- Provide per-agent argv builders and output parsers (including NDJSON streams).
|
||||
- Preserve MEDIA-token handling and shared queue/heartbeat behavior.
|
||||
|
||||
## Configuration
|
||||
- New shape (no backward compat):
|
||||
```json5
|
||||
inbound: {
|
||||
reply: {
|
||||
mode: "command",
|
||||
agent: {
|
||||
kind: "claude" | "opencode" | "pi" | "codex",
|
||||
format?: "text" | "json",
|
||||
identityPrefix?: string
|
||||
},
|
||||
command: ["claude", "{{Body}}"],
|
||||
cwd?: string,
|
||||
session?: { ... },
|
||||
timeoutSeconds?: number,
|
||||
bodyPrefix?: string,
|
||||
mediaUrl?: string,
|
||||
mediaMaxMb?: number,
|
||||
typingIntervalSeconds?: number,
|
||||
heartbeatMinutes?: number
|
||||
}
|
||||
}
|
||||
```
|
||||
- Validation moves to `config.ts` (new `AgentKind`/`AgentConfig` types).
|
||||
- If `agent` is missing → config error.
|
||||
|
||||
## Agent modules
|
||||
- `src/agents/types.ts` – `AgentKind`, `AgentSpec`:
|
||||
- `buildArgs(argv: string[], body: string, ctx: { sessionId?, isNewSession?, sendSystemOnce?, systemSent?, identityPrefix? }): string[]`
|
||||
- `parse(stdout: string): { text?: string; mediaUrls?: string[]; meta?: AgentMeta }`
|
||||
- `src/agents/claude.ts` – current flag injection (`--output-format`, `-p`), identity prepend.
|
||||
- `src/agents/opencode.ts` – reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix.
|
||||
- `src/agents/pi.ts` – parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags.
|
||||
- `src/agents/codex.ts` – parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only.
|
||||
- Shared MEDIA extraction stays in `media/parse.ts`.
|
||||
|
||||
## Command runner changes
|
||||
- `runCommandReply`:
|
||||
- Resolve agent spec from config.
|
||||
- Apply `buildArgs` (handles identity prepend and session args per agent).
|
||||
- Run command; send stdout to `spec.parse` → `text`, `mediaUrls`, `meta` (stored as `agentMeta`).
|
||||
- Remove `claudeMeta` naming; tests updated to `agentMeta`.
|
||||
|
||||
## Sessions
|
||||
- Session arg defaults become agent-specific (Claude: `--resume/--session-id`; Opencode/Pi/Codex: `--session`).
|
||||
- Still overridable via `sessionArgNew/sessionArgResume` in config.
|
||||
|
||||
## Tests
|
||||
- Update existing tests to new config (no `claudeOutputFormat`).
|
||||
- Add fixtures:
|
||||
- Opencode NDJSON sample (from PR #5) → parsed text + meta.
|
||||
- Codex NDJSON sample (captured: thread/turn/item/usage) → parsed text.
|
||||
- Pi NDJSON sample (AssistantMessageEvent) → parsed text.
|
||||
- Ensure MEDIA token parsing works on agent text output.
|
||||
|
||||
## Docs
|
||||
- README: rename “Claude-aware” → “Multi-agent (Claude, Codex, Pi, Opencode)”.
|
||||
- New short guide per agent (Opencode doc from PR #5; add Codex/Pi snippets).
|
||||
- Mention identityPrefix override and session arg differences.
|
||||
|
||||
## Migration
|
||||
- Breaking change: configs must specify `agent`. Remove old `claudeOutputFormat` keys.
|
||||
- Provide migration note in CHANGELOG 1.3.x.
|
||||
|
||||
## Out of scope
|
||||
- No media binary support; still relies on MEDIA tokens in text.
|
||||
- No UI changes; WhatsApp/Twilio plumbing unchanged.
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "warelay",
|
||||
"version": "1.2.2",
|
||||
"version": "1.3.0",
|
||||
"description": "WhatsApp relay CLI (send, monitor, webhook, auto-reply) using Twilio",
|
||||
"type": "module",
|
||||
"main": "dist/index.js",
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js";
|
||||
import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js";
|
||||
import { claudeSpec } from "./claude.js";
|
||||
import { codexSpec } from "./codex.js";
|
||||
import { opencodeSpec } from "./opencode.js";
|
||||
import { piSpec } from "./pi.js";
|
||||
|
||||
describe("agent buildArgs + parseOutput helpers", () => {
|
||||
it("claudeSpec injects flags and identity once", () => {
|
||||
const argv = ["claude", "hi"];
|
||||
const built = claudeSpec.buildArgs({
|
||||
argv,
|
||||
bodyIndex: 1,
|
||||
isNewSession: true,
|
||||
sessionId: "sess",
|
||||
sendSystemOnce: false,
|
||||
systemSent: false,
|
||||
identityPrefix: undefined,
|
||||
format: "json",
|
||||
});
|
||||
expect(built).toContain("--output-format");
|
||||
expect(built).toContain("json");
|
||||
expect(built).toContain("-p");
|
||||
expect(built.at(-1)).toContain(CLAUDE_IDENTITY_PREFIX);
|
||||
|
||||
const builtNoIdentity = claudeSpec.buildArgs({
|
||||
argv,
|
||||
bodyIndex: 1,
|
||||
isNewSession: false,
|
||||
sessionId: "sess",
|
||||
sendSystemOnce: true,
|
||||
systemSent: true,
|
||||
identityPrefix: undefined,
|
||||
format: "json",
|
||||
});
|
||||
expect(builtNoIdentity.at(-1)).not.toContain(CLAUDE_IDENTITY_PREFIX);
|
||||
});
|
||||
|
||||
it("opencodeSpec adds format flag and identity prefix when needed", () => {
|
||||
const argv = ["opencode", "body"];
|
||||
const built = opencodeSpec.buildArgs({
|
||||
argv,
|
||||
bodyIndex: 1,
|
||||
isNewSession: true,
|
||||
sessionId: "sess",
|
||||
sendSystemOnce: false,
|
||||
systemSent: false,
|
||||
identityPrefix: undefined,
|
||||
format: "json",
|
||||
});
|
||||
expect(built).toContain("--format");
|
||||
expect(built).toContain("json");
|
||||
expect(built.at(-1)).toContain(OPENCODE_IDENTITY_PREFIX);
|
||||
});
|
||||
|
||||
it("piSpec parses final assistant message and preserves usage meta", () => {
|
||||
const stdout = [
|
||||
'{"type":"message_start","message":{"role":"assistant"}}',
|
||||
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}',
|
||||
].join("\n");
|
||||
const parsed = piSpec.parseOutput(stdout);
|
||||
expect(parsed.text).toBe("hello world");
|
||||
expect(parsed.meta?.provider).toBe("inflection");
|
||||
expect((parsed.meta?.usage as { output?: number })?.output).toBe(5);
|
||||
});
|
||||
|
||||
it("codexSpec parses agent_message and aggregates usage", () => {
|
||||
const stdout = [
|
||||
'{"type":"item.completed","item":{"type":"agent_message","text":"hi there"}}',
|
||||
'{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}',
|
||||
].join("\n");
|
||||
const parsed = codexSpec.parseOutput(stdout);
|
||||
expect(parsed.text).toBe("hi there");
|
||||
const usage = parsed.meta?.usage as {
|
||||
input?: number;
|
||||
output?: number;
|
||||
cacheRead?: number;
|
||||
total?: number;
|
||||
};
|
||||
expect(usage?.input).toBe(50);
|
||||
expect(usage?.output).toBe(10);
|
||||
expect(usage?.cacheRead).toBe(5);
|
||||
expect(usage?.total).toBe(65);
|
||||
});
|
||||
|
||||
it("opencodeSpec parses streamed events and summarizes meta", () => {
|
||||
const stdout = [
|
||||
'{"type":"step_start","timestamp":0}',
|
||||
'{"type":"text","part":{"text":"hi"}}',
|
||||
'{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}',
|
||||
].join("\n");
|
||||
const parsed = opencodeSpec.parseOutput(stdout);
|
||||
expect(parsed.text).toBe("hi");
|
||||
expect(parsed.meta?.extra?.summary).toContain("duration=1200ms");
|
||||
expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020");
|
||||
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20");
|
||||
});
|
||||
|
||||
it("codexSpec buildArgs enforces exec/json/sandbox defaults", () => {
|
||||
const argv = ["codex", "hello world"];
|
||||
const built = codexSpec.buildArgs({
|
||||
argv,
|
||||
bodyIndex: 1,
|
||||
isNewSession: true,
|
||||
sessionId: "sess",
|
||||
sendSystemOnce: false,
|
||||
systemSent: false,
|
||||
identityPrefix: undefined,
|
||||
format: "json",
|
||||
});
|
||||
expect(built[1]).toBe("exec");
|
||||
expect(built).toContain("--json");
|
||||
expect(built).toContain("--skip-git-repo-check");
|
||||
expect(built).toContain("read-only");
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,67 @@
|
||||
import path from "node:path";
|
||||
|
||||
import {
|
||||
CLAUDE_BIN,
|
||||
CLAUDE_IDENTITY_PREFIX,
|
||||
type ClaudeJsonParseResult,
|
||||
parseClaudeJson,
|
||||
summarizeClaudeMetadata,
|
||||
} from "../auto-reply/claude.js";
|
||||
import type { AgentMeta, AgentSpec } from "./types.js";
|
||||
|
||||
function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined {
|
||||
if (!parsed?.parsed) return undefined;
|
||||
const summary = summarizeClaudeMetadata(parsed.parsed);
|
||||
return summary ? { extra: { summary } } : undefined;
|
||||
}
|
||||
|
||||
export const claudeSpec: AgentSpec = {
|
||||
kind: "claude",
|
||||
isInvocation: (argv) =>
|
||||
argv.length > 0 && path.basename(argv[0]) === CLAUDE_BIN,
|
||||
buildArgs: (ctx) => {
|
||||
// Split around the body so we can inject flags without losing the body
|
||||
// position. This keeps templated prompts intact even when we add flags.
|
||||
const argv = [...ctx.argv];
|
||||
const body = argv[ctx.bodyIndex] ?? "";
|
||||
const beforeBody = argv.slice(0, ctx.bodyIndex);
|
||||
const afterBody = argv.slice(ctx.bodyIndex + 1);
|
||||
|
||||
const wantsOutputFormat = typeof ctx.format === "string";
|
||||
if (wantsOutputFormat) {
|
||||
const hasOutputFormat = argv.some(
|
||||
(part) =>
|
||||
part === "--output-format" || part.startsWith("--output-format="),
|
||||
);
|
||||
if (!hasOutputFormat) {
|
||||
const outputFormat = ctx.format ?? "json";
|
||||
beforeBody.push("--output-format", outputFormat);
|
||||
}
|
||||
}
|
||||
|
||||
const hasPrintFlag = argv.some(
|
||||
(part) => part === "-p" || part === "--print",
|
||||
);
|
||||
if (!hasPrintFlag) {
|
||||
beforeBody.push("-p");
|
||||
}
|
||||
|
||||
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
|
||||
const bodyWithIdentity =
|
||||
shouldPrependIdentity && body
|
||||
? [ctx.identityPrefix ?? CLAUDE_IDENTITY_PREFIX, body]
|
||||
.filter(Boolean)
|
||||
.join("\n\n")
|
||||
: body;
|
||||
|
||||
return [...beforeBody, bodyWithIdentity, ...afterBody];
|
||||
},
|
||||
parseOutput: (rawStdout) => {
|
||||
const parsed = parseClaudeJson(rawStdout);
|
||||
const text = parsed?.text ?? rawStdout.trim();
|
||||
return {
|
||||
text: text?.trim(),
|
||||
meta: toMeta(parsed),
|
||||
};
|
||||
},
|
||||
};
|
||||
@@ -0,0 +1,79 @@
|
||||
import path from "node:path";
|
||||
|
||||
import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
|
||||
|
||||
function parseCodexJson(raw: string): AgentParseResult {
|
||||
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
|
||||
let text: string | undefined;
|
||||
let meta: AgentMeta | undefined;
|
||||
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const ev = JSON.parse(line) as {
|
||||
type?: string;
|
||||
item?: { type?: string; text?: string };
|
||||
usage?: unknown;
|
||||
};
|
||||
// Codex streams multiple events; capture the last agent_message text and
|
||||
// the final turn usage for cost/telemetry.
|
||||
if (
|
||||
ev.type === "item.completed" &&
|
||||
ev.item?.type === "agent_message" &&
|
||||
typeof ev.item.text === "string"
|
||||
) {
|
||||
text = ev.item.text;
|
||||
}
|
||||
if (
|
||||
ev.type === "turn.completed" &&
|
||||
ev.usage &&
|
||||
typeof ev.usage === "object"
|
||||
) {
|
||||
const u = ev.usage as {
|
||||
input_tokens?: number;
|
||||
cached_input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
};
|
||||
meta = {
|
||||
usage: {
|
||||
input: u.input_tokens,
|
||||
output: u.output_tokens,
|
||||
cacheRead: u.cached_input_tokens,
|
||||
total:
|
||||
(u.input_tokens ?? 0) +
|
||||
(u.output_tokens ?? 0) +
|
||||
(u.cached_input_tokens ?? 0),
|
||||
},
|
||||
};
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
return { text: text?.trim(), meta };
|
||||
}
|
||||
|
||||
export const codexSpec: AgentSpec = {
|
||||
kind: "codex",
|
||||
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "codex",
|
||||
buildArgs: (ctx) => {
|
||||
const argv = [...ctx.argv];
|
||||
const hasExec = argv.length > 0 && argv[1] === "exec";
|
||||
if (!hasExec) {
|
||||
argv.splice(1, 0, "exec");
|
||||
}
|
||||
// Ensure JSON output
|
||||
if (!argv.includes("--json")) {
|
||||
argv.splice(argv.length - 1, 0, "--json");
|
||||
}
|
||||
// Safety defaults
|
||||
if (!argv.includes("--skip-git-repo-check")) {
|
||||
argv.splice(argv.length - 1, 0, "--skip-git-repo-check");
|
||||
}
|
||||
if (!argv.some((p) => p === "--sandbox" || p.startsWith("--sandbox="))) {
|
||||
argv.splice(argv.length - 1, 0, "--sandbox", "read-only");
|
||||
}
|
||||
return argv;
|
||||
},
|
||||
parseOutput: parseCodexJson,
|
||||
};
|
||||
@@ -0,0 +1,18 @@
|
||||
import { claudeSpec } from "./claude.js";
|
||||
import { codexSpec } from "./codex.js";
|
||||
import { opencodeSpec } from "./opencode.js";
|
||||
import { piSpec } from "./pi.js";
|
||||
import type { AgentKind, AgentSpec } from "./types.js";
|
||||
|
||||
const specs: Record<AgentKind, AgentSpec> = {
|
||||
claude: claudeSpec,
|
||||
codex: codexSpec,
|
||||
opencode: opencodeSpec,
|
||||
pi: piSpec,
|
||||
};
|
||||
|
||||
export function getAgentSpec(kind: AgentKind): AgentSpec {
|
||||
return specs[kind];
|
||||
}
|
||||
|
||||
export { AgentKind, AgentMeta, AgentParseResult } from "./types.js";
|
||||
@@ -0,0 +1,62 @@
|
||||
import path from "node:path";
|
||||
|
||||
import {
|
||||
OPENCODE_BIN,
|
||||
OPENCODE_IDENTITY_PREFIX,
|
||||
parseOpencodeJson,
|
||||
summarizeOpencodeMetadata,
|
||||
} from "../auto-reply/opencode.js";
|
||||
import type { AgentMeta, AgentSpec } from "./types.js";
|
||||
|
||||
function toMeta(
|
||||
parsed: ReturnType<typeof parseOpencodeJson>,
|
||||
): AgentMeta | undefined {
|
||||
const summary = summarizeOpencodeMetadata(parsed.meta);
|
||||
return summary ? { extra: { summary } } : undefined;
|
||||
}
|
||||
|
||||
export const opencodeSpec: AgentSpec = {
|
||||
kind: "opencode",
|
||||
isInvocation: (argv) =>
|
||||
argv.length > 0 && path.basename(argv[0]) === OPENCODE_BIN,
|
||||
buildArgs: (ctx) => {
|
||||
// Split around the body so we can insert flags without losing the prompt.
|
||||
const argv = [...ctx.argv];
|
||||
const body = argv[ctx.bodyIndex] ?? "";
|
||||
const beforeBody = argv.slice(0, ctx.bodyIndex);
|
||||
const afterBody = argv.slice(ctx.bodyIndex + 1);
|
||||
const wantsJson = ctx.format === "json";
|
||||
|
||||
// Ensure format json for parsing
|
||||
if (wantsJson) {
|
||||
const hasFormat = [...beforeBody, body, ...afterBody].some(
|
||||
(part) => part === "--format" || part.startsWith("--format="),
|
||||
);
|
||||
if (!hasFormat) {
|
||||
beforeBody.push("--format", "json");
|
||||
}
|
||||
}
|
||||
|
||||
// Session args default to --session
|
||||
// Identity prefix
|
||||
// Opencode streams text tokens; we still seed an identity so the agent
|
||||
// keeps context on first turn.
|
||||
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
|
||||
const bodyWithIdentity =
|
||||
shouldPrependIdentity && body
|
||||
? [ctx.identityPrefix ?? OPENCODE_IDENTITY_PREFIX, body]
|
||||
.filter(Boolean)
|
||||
.join("\n\n")
|
||||
: body;
|
||||
|
||||
return [...beforeBody, bodyWithIdentity, ...afterBody];
|
||||
},
|
||||
parseOutput: (rawStdout) => {
|
||||
const parsed = parseOpencodeJson(rawStdout);
|
||||
const text = parsed.text ?? rawStdout.trim();
|
||||
return {
|
||||
text: text?.trim(),
|
||||
meta: toMeta(parsed),
|
||||
};
|
||||
},
|
||||
};
|
||||
@@ -0,0 +1,75 @@
|
||||
import path from "node:path";
|
||||
|
||||
import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
|
||||
|
||||
type PiAssistantMessage = {
|
||||
role?: string;
|
||||
content?: Array<{ type?: string; text?: string }>;
|
||||
usage?: { input?: number; output?: number };
|
||||
model?: string;
|
||||
provider?: string;
|
||||
stopReason?: string;
|
||||
};
|
||||
|
||||
function parsePiJson(raw: string): AgentParseResult {
|
||||
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
|
||||
let lastMessage: PiAssistantMessage | undefined;
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const ev = JSON.parse(line) as {
|
||||
type?: string;
|
||||
message?: PiAssistantMessage;
|
||||
};
|
||||
// Pi emits a stream; we only care about the terminal assistant message_end.
|
||||
if (ev.type === "message_end" && ev.message?.role === "assistant") {
|
||||
lastMessage = ev.message;
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
const text =
|
||||
lastMessage?.content
|
||||
?.filter((c) => c?.type === "text" && typeof c.text === "string")
|
||||
.map((c) => c.text)
|
||||
.join("\n")
|
||||
?.trim() ?? undefined;
|
||||
const meta: AgentMeta | undefined = lastMessage
|
||||
? {
|
||||
model: lastMessage.model,
|
||||
provider: lastMessage.provider,
|
||||
stopReason: lastMessage.stopReason,
|
||||
usage: lastMessage.usage,
|
||||
}
|
||||
: undefined;
|
||||
return { text, meta };
|
||||
}
|
||||
|
||||
export const piSpec: AgentSpec = {
|
||||
kind: "pi",
|
||||
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi",
|
||||
buildArgs: (ctx) => {
|
||||
const argv = [...ctx.argv];
|
||||
// Non-interactive print + JSON
|
||||
if (!argv.includes("-p") && !argv.includes("--print")) {
|
||||
argv.splice(argv.length - 1, 0, "-p");
|
||||
}
|
||||
if (
|
||||
ctx.format === "json" &&
|
||||
!argv.includes("--mode") &&
|
||||
!argv.some((a) => a === "--mode")
|
||||
) {
|
||||
argv.splice(argv.length - 1, 0, "--mode", "json");
|
||||
}
|
||||
// Session defaults
|
||||
// Identity prefix optional; Pi usually doesn't need it, but allow injection
|
||||
if (!(ctx.sendSystemOnce && ctx.systemSent) && argv[ctx.bodyIndex]) {
|
||||
const existingBody = argv[ctx.bodyIndex];
|
||||
argv[ctx.bodyIndex] = [ctx.identityPrefix, existingBody]
|
||||
.filter(Boolean)
|
||||
.join("\n\n");
|
||||
}
|
||||
return argv;
|
||||
},
|
||||
parseOutput: parsePiJson,
|
||||
};
|
||||
@@ -0,0 +1,41 @@
|
||||
export type AgentKind = "claude" | "opencode" | "pi" | "codex";
|
||||
|
||||
export type AgentMeta = {
|
||||
model?: string;
|
||||
provider?: string;
|
||||
stopReason?: string;
|
||||
usage?: {
|
||||
input?: number;
|
||||
output?: number;
|
||||
cacheRead?: number;
|
||||
cacheWrite?: number;
|
||||
total?: number;
|
||||
};
|
||||
extra?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
export type AgentParseResult = {
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
meta?: AgentMeta;
|
||||
};
|
||||
|
||||
export type BuildArgsContext = {
|
||||
argv: string[];
|
||||
bodyIndex: number; // index of prompt/body argument in argv
|
||||
isNewSession: boolean;
|
||||
sessionId?: string;
|
||||
sendSystemOnce: boolean;
|
||||
systemSent: boolean;
|
||||
identityPrefix?: string;
|
||||
format?: "text" | "json";
|
||||
sessionArgNew?: string[];
|
||||
sessionArgResume?: string[];
|
||||
};
|
||||
|
||||
export interface AgentSpec {
|
||||
kind: AgentKind;
|
||||
isInvocation: (argv: string[]) => boolean;
|
||||
buildArgs: (ctx: BuildArgsContext) => string[];
|
||||
parseOutput: (rawStdout: string) => AgentParseResult;
|
||||
}
|
||||
@@ -160,3 +160,6 @@ export function parseClaudeJsonText(raw: string): string | undefined {
|
||||
const parsed = parseClaudeJson(raw);
|
||||
return parsed?.text;
|
||||
}
|
||||
|
||||
// Re-export from command-reply for backwards compatibility
|
||||
export { summarizeClaudeMetadata } from "./command-reply.js";
|
||||
|
||||
@@ -70,7 +70,7 @@ describe("runCommandReply", () => {
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "json",
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
@@ -98,7 +98,7 @@ describe("runCommandReply", () => {
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "json",
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: true,
|
||||
@@ -121,7 +121,7 @@ describe("runCommandReply", () => {
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "json",
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: true,
|
||||
@@ -144,7 +144,7 @@ describe("runCommandReply", () => {
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "json",
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: true,
|
||||
@@ -167,6 +167,7 @@ describe("runCommandReply", () => {
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["cli", "{{Body}}"],
|
||||
agent: { kind: "claude" },
|
||||
session: {
|
||||
sessionArgNew: ["--new", "{{SessionId}}"],
|
||||
sessionArgResume: ["--resume", "{{SessionId}}"],
|
||||
@@ -192,7 +193,11 @@ describe("runCommandReply", () => {
|
||||
throw { stdout: "partial output here", killed: true, signal: "SIGKILL" };
|
||||
});
|
||||
const { payload, meta } = await runCommandReply({
|
||||
reply: { mode: "command", command: ["echo", "hi"] },
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["echo", "hi"],
|
||||
agent: { kind: "claude" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
@@ -213,7 +218,12 @@ describe("runCommandReply", () => {
|
||||
throw { stdout: "", killed: true, signal: "SIGKILL" };
|
||||
});
|
||||
const { payload } = await runCommandReply({
|
||||
reply: { mode: "command", command: ["echo", "hi"], cwd: "/tmp/work" },
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["echo", "hi"],
|
||||
cwd: "/tmp/work",
|
||||
agent: { kind: "claude" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
@@ -235,7 +245,12 @@ describe("runCommandReply", () => {
|
||||
stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`,
|
||||
});
|
||||
const { payload } = await runCommandReply({
|
||||
reply: { mode: "command", command: ["echo", "hi"], mediaMaxMb: 1 },
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["echo", "hi"],
|
||||
mediaMaxMb: 1,
|
||||
agent: { kind: "claude" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
@@ -259,7 +274,7 @@ describe("runCommandReply", () => {
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "json",
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
@@ -271,14 +286,18 @@ describe("runCommandReply", () => {
|
||||
commandRunner: runner,
|
||||
enqueue: enqueueImmediate,
|
||||
});
|
||||
expect(meta.claudeMeta).toContain("duration=50ms");
|
||||
expect(meta.claudeMeta).toContain("tool_calls=1");
|
||||
expect(meta.agentMeta?.extra?.summary).toContain("duration=50ms");
|
||||
expect(meta.agentMeta?.extra?.summary).toContain("tool_calls=1");
|
||||
});
|
||||
|
||||
it("captures queue wait metrics in meta", async () => {
|
||||
const runner = makeRunner({ stdout: "ok" });
|
||||
const { meta } = await runCommandReply({
|
||||
reply: { mode: "command", command: ["echo", "{{Body}}"] },
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["echo", "{{Body}}"],
|
||||
agent: { kind: "claude" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
@@ -292,4 +311,79 @@ describe("runCommandReply", () => {
|
||||
expect(meta.queuedMs).toBe(25);
|
||||
expect(meta.queuedAhead).toBe(2);
|
||||
});
|
||||
|
||||
it("handles empty result string without dumping raw JSON", async () => {
|
||||
// Bug fix: Claude CLI returning {"result": ""} should not send raw JSON to WhatsApp
|
||||
// The fix changed from truthy check to explicit typeof check
|
||||
const runner = makeRunner({
|
||||
stdout: '{"result":"","duration_ms":50,"total_cost_usd":0.001}',
|
||||
});
|
||||
const { payload } = await runCommandReply({
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
isFirstTurnInSession: true,
|
||||
systemSent: false,
|
||||
timeoutMs: 1000,
|
||||
timeoutSeconds: 1,
|
||||
commandRunner: runner,
|
||||
enqueue: enqueueImmediate,
|
||||
});
|
||||
// Should NOT contain raw JSON - empty result should produce fallback message
|
||||
expect(payload?.text).not.toContain('{"result"');
|
||||
expect(payload?.text).toContain("command produced no output");
|
||||
});
|
||||
|
||||
it("handles empty text string in Claude JSON", async () => {
|
||||
const runner = makeRunner({
|
||||
stdout: '{"text":"","duration_ms":50}',
|
||||
});
|
||||
const { payload } = await runCommandReply({
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
isFirstTurnInSession: true,
|
||||
systemSent: false,
|
||||
timeoutMs: 1000,
|
||||
timeoutSeconds: 1,
|
||||
commandRunner: runner,
|
||||
enqueue: enqueueImmediate,
|
||||
});
|
||||
// Empty text should produce fallback message, not raw JSON
|
||||
expect(payload?.text).not.toContain('{"text"');
|
||||
expect(payload?.text).toContain("command produced no output");
|
||||
});
|
||||
|
||||
it("returns actual text when result is non-empty", async () => {
|
||||
const runner = makeRunner({
|
||||
stdout: '{"result":"hello world","duration_ms":50}',
|
||||
});
|
||||
const { payload } = await runCommandReply({
|
||||
reply: {
|
||||
mode: "command",
|
||||
command: ["claude", "{{Body}}"],
|
||||
agent: { kind: "claude", format: "json" },
|
||||
},
|
||||
templatingCtx: noopTemplateCtx,
|
||||
sendSystemOnce: false,
|
||||
isNewSession: true,
|
||||
isFirstTurnInSession: true,
|
||||
systemSent: false,
|
||||
timeoutMs: 1000,
|
||||
timeoutSeconds: 1,
|
||||
commandRunner: runner,
|
||||
enqueue: enqueueImmediate,
|
||||
});
|
||||
expect(payload?.text).toBe("hello world");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,18 +1,14 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
import { type AgentKind, getAgentSpec } from "../agents/index.js";
|
||||
import type { AgentMeta } from "../agents/types.js";
|
||||
import type { WarelayConfig } from "../config/config.js";
|
||||
import { isVerbose, logVerbose } from "../globals.js";
|
||||
import { logError } from "../logger.js";
|
||||
import { splitMediaFromOutput } from "../media/parse.js";
|
||||
import { enqueueCommand } from "../process/command-queue.js";
|
||||
import type { runCommandWithTimeout } from "../process/exec.js";
|
||||
import {
|
||||
CLAUDE_BIN,
|
||||
CLAUDE_IDENTITY_PREFIX,
|
||||
type ClaudeJsonParseResult,
|
||||
parseClaudeJson,
|
||||
} from "./claude.js";
|
||||
import { applyTemplate, type TemplateContext } from "./templating.js";
|
||||
import type { ReplyPayload } from "./types.js";
|
||||
|
||||
@@ -42,7 +38,7 @@ export type CommandReplyMeta = {
|
||||
exitCode?: number | null;
|
||||
signal?: string | null;
|
||||
killed?: boolean;
|
||||
claudeMeta?: string;
|
||||
agentMeta?: AgentMeta;
|
||||
};
|
||||
|
||||
export type CommandReplyResult = {
|
||||
@@ -119,6 +115,9 @@ export async function runCommandReply(
|
||||
if (!reply.command?.length) {
|
||||
throw new Error("reply.command is required for mode=command");
|
||||
}
|
||||
const agentCfg = reply.agent ?? { kind: "claude" };
|
||||
const agentKind: AgentKind = agentCfg.kind ?? "claude";
|
||||
const agent = getAgentSpec(agentKind);
|
||||
|
||||
let argv = reply.command.map((part) => applyTemplate(part, templatingCtx));
|
||||
const templatePrefix =
|
||||
@@ -129,41 +128,24 @@ export async function runCommandReply(
|
||||
argv = [argv[0], templatePrefix, ...argv.slice(1)];
|
||||
}
|
||||
|
||||
// Ensure Claude commands can emit plain text by forcing --output-format when configured.
|
||||
if (
|
||||
reply.claudeOutputFormat &&
|
||||
argv.length > 0 &&
|
||||
path.basename(argv[0]) === CLAUDE_BIN
|
||||
) {
|
||||
const hasOutputFormat = argv.some(
|
||||
(part) =>
|
||||
part === "--output-format" || part.startsWith("--output-format="),
|
||||
);
|
||||
const insertBeforeBody = Math.max(argv.length - 1, 0);
|
||||
if (!hasOutputFormat) {
|
||||
argv = [
|
||||
...argv.slice(0, insertBeforeBody),
|
||||
"--output-format",
|
||||
reply.claudeOutputFormat,
|
||||
...argv.slice(insertBeforeBody),
|
||||
];
|
||||
}
|
||||
const hasPrintFlag = argv.some(
|
||||
(part) => part === "-p" || part === "--print",
|
||||
);
|
||||
if (!hasPrintFlag) {
|
||||
const insertIdx = Math.max(argv.length - 1, 0);
|
||||
argv = [...argv.slice(0, insertIdx), "-p", ...argv.slice(insertIdx)];
|
||||
}
|
||||
}
|
||||
// Default body index is last arg
|
||||
let bodyIndex = Math.max(argv.length - 1, 0);
|
||||
|
||||
// Inject session args if configured (use resume for existing, session-id for new)
|
||||
// Session args prepared (templated) and injected generically
|
||||
if (reply.session) {
|
||||
const defaultNew =
|
||||
agentCfg.kind === "claude"
|
||||
? ["--session-id", "{{SessionId}}"]
|
||||
: ["--session", "{{SessionId}}"];
|
||||
const defaultResume =
|
||||
agentCfg.kind === "claude"
|
||||
? ["--resume", "{{SessionId}}"]
|
||||
: ["--session", "{{SessionId}}"];
|
||||
const sessionArgList = (
|
||||
isNewSession
|
||||
? (reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"])
|
||||
: (reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"])
|
||||
).map((part) => applyTemplate(part, templatingCtx));
|
||||
? (reply.session.sessionArgNew ?? defaultNew)
|
||||
: (reply.session.sessionArgResume ?? defaultResume)
|
||||
).map((p) => applyTemplate(p, templatingCtx));
|
||||
if (sessionArgList.length) {
|
||||
const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true;
|
||||
const insertAt =
|
||||
@@ -173,22 +155,24 @@ export async function runCommandReply(
|
||||
...sessionArgList,
|
||||
...argv.slice(insertAt),
|
||||
];
|
||||
bodyIndex = Math.max(argv.length - 1, 0);
|
||||
}
|
||||
}
|
||||
|
||||
let finalArgv = argv;
|
||||
const isClaudeInvocation =
|
||||
finalArgv.length > 0 && path.basename(finalArgv[0]) === CLAUDE_BIN;
|
||||
const shouldPrependIdentity =
|
||||
isClaudeInvocation && !(sendSystemOnce && systemSent);
|
||||
if (shouldPrependIdentity && finalArgv.length > 0) {
|
||||
const bodyIdx = finalArgv.length - 1;
|
||||
const existingBody = finalArgv[bodyIdx] ?? "";
|
||||
finalArgv = [
|
||||
...finalArgv.slice(0, bodyIdx),
|
||||
[CLAUDE_IDENTITY_PREFIX, existingBody].filter(Boolean).join("\n\n"),
|
||||
];
|
||||
}
|
||||
const shouldApplyAgent = agent.isInvocation(argv);
|
||||
const finalArgv = shouldApplyAgent
|
||||
? agent.buildArgs({
|
||||
argv,
|
||||
bodyIndex,
|
||||
isNewSession,
|
||||
sessionId: templatingCtx.SessionId,
|
||||
sendSystemOnce,
|
||||
systemSent,
|
||||
identityPrefix: agentCfg.identityPrefix,
|
||||
format: agentCfg.format,
|
||||
})
|
||||
: argv;
|
||||
|
||||
logVerbose(
|
||||
`Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`,
|
||||
);
|
||||
@@ -217,28 +201,14 @@ export async function runCommandReply(
|
||||
if (stderr?.trim()) {
|
||||
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
|
||||
}
|
||||
let parsed: ClaudeJsonParseResult | undefined;
|
||||
if (
|
||||
trimmed &&
|
||||
(reply.claudeOutputFormat === "json" || isClaudeInvocation)
|
||||
) {
|
||||
parsed = parseClaudeJson(trimmed);
|
||||
if (parsed?.parsed && isVerbose()) {
|
||||
const summary = summarizeClaudeMetadata(parsed.parsed);
|
||||
if (summary) logVerbose(`Claude JSON meta: ${summary}`);
|
||||
logVerbose(
|
||||
`Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`,
|
||||
);
|
||||
}
|
||||
if (parsed?.text) {
|
||||
logVerbose(
|
||||
`Claude JSON parsed -> ${parsed.text.slice(0, 120)}${parsed.text.length > 120 ? "…" : ""}`,
|
||||
);
|
||||
trimmed = parsed.text.trim();
|
||||
} else {
|
||||
logVerbose("Claude JSON parse failed; returning raw stdout");
|
||||
}
|
||||
|
||||
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
|
||||
// Treat empty string as "no content" so we can fall back to the friendly
|
||||
// "(command produced no output)" message instead of echoing raw JSON.
|
||||
if (parsed && parsed.text !== undefined) {
|
||||
trimmed = parsed.text.trim();
|
||||
}
|
||||
|
||||
const { text: cleanedText, mediaUrls: mediaFound } =
|
||||
splitMediaFromOutput(trimmed);
|
||||
trimmed = cleanedText;
|
||||
@@ -249,7 +219,7 @@ export async function runCommandReply(
|
||||
logVerbose("No MEDIA token extracted from final text");
|
||||
}
|
||||
if (!trimmed && !mediaFromCommand) {
|
||||
const meta = parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined;
|
||||
const meta = parsed?.meta?.extra?.summary ?? undefined;
|
||||
trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`;
|
||||
logVerbose("No text/media produced; injecting fallback notice to user");
|
||||
}
|
||||
@@ -259,8 +229,13 @@ export async function runCommandReply(
|
||||
console.error(
|
||||
`Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`,
|
||||
);
|
||||
// Include any partial output or stderr in error message
|
||||
const partialOut = trimmed
|
||||
? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}`
|
||||
: "";
|
||||
const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`;
|
||||
return {
|
||||
payload: undefined,
|
||||
payload: { text: errorText },
|
||||
meta: {
|
||||
durationMs: Date.now() - started,
|
||||
queuedMs,
|
||||
@@ -268,9 +243,7 @@ export async function runCommandReply(
|
||||
exitCode: code,
|
||||
signal,
|
||||
killed,
|
||||
claudeMeta: parsed
|
||||
? summarizeClaudeMetadata(parsed.parsed)
|
||||
: undefined,
|
||||
agentMeta: parsed?.meta,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -278,8 +251,9 @@ export async function runCommandReply(
|
||||
console.error(
|
||||
`Command auto-reply process killed before completion (exit code ${code ?? "unknown"})`,
|
||||
);
|
||||
const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`;
|
||||
return {
|
||||
payload: undefined,
|
||||
payload: { text: errorText },
|
||||
meta: {
|
||||
durationMs: Date.now() - started,
|
||||
queuedMs,
|
||||
@@ -287,9 +261,7 @@ export async function runCommandReply(
|
||||
exitCode: code,
|
||||
signal,
|
||||
killed,
|
||||
claudeMeta: parsed
|
||||
? summarizeClaudeMetadata(parsed.parsed)
|
||||
: undefined,
|
||||
agentMeta: parsed?.meta,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -337,7 +309,7 @@ export async function runCommandReply(
|
||||
exitCode: code,
|
||||
signal,
|
||||
killed,
|
||||
claudeMeta: parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined,
|
||||
agentMeta: parsed?.meta,
|
||||
};
|
||||
if (isVerbose()) {
|
||||
logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`);
|
||||
@@ -379,8 +351,11 @@ export async function runCommandReply(
|
||||
};
|
||||
}
|
||||
logError(`Command auto-reply failed after ${elapsed}ms: ${String(err)}`);
|
||||
// Send error message to user so they know the command failed
|
||||
const errMsg = err instanceof Error ? err.message : String(err);
|
||||
const errorText = `⚠️ Command failed: ${errMsg}`;
|
||||
return {
|
||||
payload: undefined,
|
||||
payload: { text: errorText },
|
||||
meta: {
|
||||
durationMs: elapsed,
|
||||
queuedMs,
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
// Helpers specific to Opencode CLI output/argv handling.
|
||||
|
||||
// Preferred binary name for Opencode CLI invocations.
|
||||
export const OPENCODE_BIN = "opencode";
|
||||
|
||||
export const OPENCODE_IDENTITY_PREFIX =
|
||||
"You are Openclawd running on the user's Mac via warelay. Your scratchpad is /Users/steipete/openclawd; this is your folder and you can add what you like in markdown files and/or images. You don't need to be concise, but WhatsApp replies must stay under ~1500 characters. Media you can send: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK.";
|
||||
|
||||
export type OpencodeJsonParseResult = {
|
||||
text?: string;
|
||||
parsed: unknown[];
|
||||
valid: boolean;
|
||||
meta?: {
|
||||
durationMs?: number;
|
||||
cost?: number;
|
||||
tokens?: {
|
||||
input?: number;
|
||||
output?: number;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
export function parseOpencodeJson(raw: string): OpencodeJsonParseResult {
|
||||
const lines = raw.split(/\n+/).filter((s) => s.trim());
|
||||
const parsed: unknown[] = [];
|
||||
let text = "";
|
||||
let valid = false;
|
||||
let startTime: number | undefined;
|
||||
let endTime: number | undefined;
|
||||
let cost = 0;
|
||||
let inputTokens = 0;
|
||||
let outputTokens = 0;
|
||||
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const event = JSON.parse(line);
|
||||
parsed.push(event);
|
||||
if (event && typeof event === "object") {
|
||||
// Opencode emits a stream of events.
|
||||
if (event.type === "step_start") {
|
||||
valid = true;
|
||||
if (typeof event.timestamp === "number") {
|
||||
if (startTime === undefined || event.timestamp < startTime) {
|
||||
startTime = event.timestamp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (event.type === "text" && event.part?.text) {
|
||||
text += event.part.text;
|
||||
valid = true;
|
||||
}
|
||||
|
||||
if (event.type === "step_finish") {
|
||||
valid = true;
|
||||
if (typeof event.timestamp === "number") {
|
||||
endTime = event.timestamp;
|
||||
}
|
||||
if (event.part) {
|
||||
if (typeof event.part.cost === "number") {
|
||||
cost += event.part.cost;
|
||||
}
|
||||
if (event.part.tokens) {
|
||||
inputTokens += event.part.tokens.input || 0;
|
||||
outputTokens += event.part.tokens.output || 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// ignore non-JSON lines
|
||||
}
|
||||
}
|
||||
|
||||
const meta: OpencodeJsonParseResult["meta"] = {};
|
||||
if (startTime !== undefined && endTime !== undefined) {
|
||||
meta.durationMs = endTime - startTime;
|
||||
}
|
||||
if (cost > 0) meta.cost = cost;
|
||||
if (inputTokens > 0 || outputTokens > 0) {
|
||||
meta.tokens = { input: inputTokens, output: outputTokens };
|
||||
}
|
||||
|
||||
return {
|
||||
text: text || undefined,
|
||||
parsed,
|
||||
valid: valid && parsed.length > 0,
|
||||
meta: Object.keys(meta).length > 0 ? meta : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
export function summarizeOpencodeMetadata(
|
||||
meta: OpencodeJsonParseResult["meta"],
|
||||
): string | undefined {
|
||||
if (!meta) return undefined;
|
||||
const parts: string[] = [];
|
||||
if (meta.durationMs !== undefined)
|
||||
parts.push(`duration=${meta.durationMs}ms`);
|
||||
if (meta.cost !== undefined) parts.push(`cost=$${meta.cost.toFixed(4)}`);
|
||||
if (meta.tokens) {
|
||||
parts.push(`tokens=${meta.tokens.input}+${meta.tokens.output}`);
|
||||
}
|
||||
return parts.length ? parts.join(", ") : undefined;
|
||||
}
|
||||
+10
-4
@@ -146,8 +146,14 @@ export async function getReplyFromConfig(
|
||||
|
||||
// Optional allowlist by origin number (E.164 without whatsapp: prefix)
|
||||
const allowFrom = cfg.inbound?.allowFrom;
|
||||
if (Array.isArray(allowFrom) && allowFrom.length > 0) {
|
||||
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
|
||||
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
|
||||
const to = (ctx.To ?? "").replace(/^whatsapp:/, "");
|
||||
const isSamePhone = from && to && from === to;
|
||||
|
||||
// Same-phone mode (self-messaging) is always allowed
|
||||
if (isSamePhone) {
|
||||
logVerbose(`Allowing same-phone mode: from === to (${from})`);
|
||||
} else if (Array.isArray(allowFrom) && allowFrom.length > 0) {
|
||||
// Support "*" as wildcard to allow all senders
|
||||
if (!allowFrom.includes("*") && !allowFrom.includes(from)) {
|
||||
logVerbose(
|
||||
@@ -259,8 +265,8 @@ export async function getReplyFromConfig(
|
||||
timeoutSeconds,
|
||||
commandRunner,
|
||||
});
|
||||
if (meta.claudeMeta && isVerbose()) {
|
||||
logVerbose(`Claude JSON meta: ${meta.claudeMeta}`);
|
||||
if (meta.agentMeta && isVerbose()) {
|
||||
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
|
||||
}
|
||||
return payload;
|
||||
} finally {
|
||||
|
||||
@@ -4,6 +4,10 @@ import type { CliDeps } from "../cli/deps.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { sendCommand } from "./send.js";
|
||||
|
||||
vi.mock("../web/ipc.js", () => ({
|
||||
sendViaIpc: vi.fn().mockResolvedValue(null),
|
||||
}));
|
||||
|
||||
const runtime: RuntimeEnv = {
|
||||
log: vi.fn(),
|
||||
error: vi.fn(),
|
||||
|
||||
+37
-1
@@ -1,7 +1,8 @@
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import { info } from "../globals.js";
|
||||
import { info, success } from "../globals.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { Provider } from "../utils.js";
|
||||
import { sendViaIpc } from "../web/ipc.js";
|
||||
|
||||
export async function sendCommand(
|
||||
opts: {
|
||||
@@ -39,6 +40,40 @@ export async function sendCommand(
|
||||
if (waitSeconds !== 0) {
|
||||
runtime.log(info("Wait/poll are Twilio-only; ignored for provider=web."));
|
||||
}
|
||||
|
||||
// Try to send via IPC to running relay first (avoids Signal session corruption)
|
||||
const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media);
|
||||
if (ipcResult) {
|
||||
if (ipcResult.success) {
|
||||
runtime.log(
|
||||
success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`),
|
||||
);
|
||||
if (opts.json) {
|
||||
runtime.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
provider: "web",
|
||||
via: "ipc",
|
||||
to: opts.to,
|
||||
messageId: ipcResult.messageId,
|
||||
mediaUrl: opts.media ?? null,
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
// IPC failed but relay is running - warn and fall back
|
||||
runtime.log(
|
||||
info(
|
||||
`IPC send failed (${ipcResult.error}), falling back to direct connection`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
// Fall back to direct connection (creates new Baileys socket)
|
||||
const res = await deps
|
||||
.sendMessageWeb(opts.to, opts.message, {
|
||||
verbose: false,
|
||||
@@ -53,6 +88,7 @@ export async function sendCommand(
|
||||
JSON.stringify(
|
||||
{
|
||||
provider: "web",
|
||||
via: "direct",
|
||||
to: opts.to,
|
||||
messageId: res.messageId,
|
||||
mediaUrl: opts.media ?? null,
|
||||
|
||||
+34
-19
@@ -5,8 +5,9 @@ import path from "node:path";
|
||||
import JSON5 from "json5";
|
||||
import { z } from "zod";
|
||||
|
||||
import type { AgentKind } from "../agents/index.js";
|
||||
|
||||
export type ReplyMode = "text" | "command";
|
||||
export type ClaudeOutputFormat = "text" | "json" | "stream-json";
|
||||
export type SessionScope = "per-sender" | "global";
|
||||
|
||||
export type SessionConfig = {
|
||||
@@ -46,6 +47,9 @@ export type WarelayConfig = {
|
||||
logging?: LoggingConfig;
|
||||
inbound?: {
|
||||
allowFrom?: string[]; // E.164 numbers allowed to trigger auto-reply (without whatsapp:)
|
||||
messagePrefix?: string; // Prefix added to all inbound messages (default: "[warelay]" if no allowFrom, else "")
|
||||
responsePrefix?: string; // Prefix auto-added to all outbound replies (e.g., "🦞")
|
||||
timestampPrefix?: boolean | string; // true/false or IANA timezone string (default: true with UTC)
|
||||
transcribeAudio?: {
|
||||
// Optional CLI to turn inbound audio into text; templated args, must output transcript to stdout.
|
||||
command: string[];
|
||||
@@ -53,18 +57,22 @@ export type WarelayConfig = {
|
||||
};
|
||||
reply?: {
|
||||
mode: ReplyMode;
|
||||
text?: string; // for mode=text, can contain {{Body}}
|
||||
command?: string[]; // for mode=command, argv with templates
|
||||
cwd?: string; // working directory for command execution
|
||||
template?: string; // prepend template string when building command/prompt
|
||||
timeoutSeconds?: number; // optional command timeout; defaults to 600s
|
||||
bodyPrefix?: string; // optional string prepended to Body before templating
|
||||
mediaUrl?: string; // optional media attachment (path or URL)
|
||||
text?: string;
|
||||
command?: string[];
|
||||
cwd?: string;
|
||||
template?: string;
|
||||
timeoutSeconds?: number;
|
||||
bodyPrefix?: string;
|
||||
mediaUrl?: string;
|
||||
session?: SessionConfig;
|
||||
claudeOutputFormat?: ClaudeOutputFormat; // when command starts with `claude`, force an output format
|
||||
mediaMaxMb?: number; // optional cap for outbound media (default 5MB)
|
||||
typingIntervalSeconds?: number; // how often to refresh typing indicator while command runs
|
||||
heartbeatMinutes?: number; // auto-ping cadence for command mode
|
||||
mediaMaxMb?: number;
|
||||
typingIntervalSeconds?: number;
|
||||
heartbeatMinutes?: number;
|
||||
agent?: {
|
||||
kind: AgentKind;
|
||||
format?: "text" | "json";
|
||||
identityPrefix?: string;
|
||||
};
|
||||
};
|
||||
};
|
||||
web?: WebConfig;
|
||||
@@ -102,13 +110,17 @@ const ReplySchema = z
|
||||
})
|
||||
.optional(),
|
||||
heartbeatMinutes: z.number().int().nonnegative().optional(),
|
||||
claudeOutputFormat: z
|
||||
.union([
|
||||
z.literal("text"),
|
||||
z.literal("json"),
|
||||
z.literal("stream-json"),
|
||||
z.undefined(),
|
||||
])
|
||||
agent: z
|
||||
.object({
|
||||
kind: z.union([
|
||||
z.literal("claude"),
|
||||
z.literal("opencode"),
|
||||
z.literal("pi"),
|
||||
z.literal("codex"),
|
||||
]),
|
||||
format: z.union([z.literal("text"), z.literal("json")]).optional(),
|
||||
identityPrefix: z.string().optional(),
|
||||
})
|
||||
.optional(),
|
||||
})
|
||||
.refine(
|
||||
@@ -139,6 +151,9 @@ const WarelaySchema = z.object({
|
||||
inbound: z
|
||||
.object({
|
||||
allowFrom: z.array(z.string()).optional(),
|
||||
messagePrefix: z.string().optional(),
|
||||
responsePrefix: z.string().optional(),
|
||||
timestampPrefix: z.union([z.boolean(), z.string()]).optional(),
|
||||
transcribeAudio: z
|
||||
.object({
|
||||
command: z.array(z.string()),
|
||||
|
||||
+73
-3
@@ -9,6 +9,18 @@ import { createMockTwilio } from "../test/mocks/twilio.js";
|
||||
import * as exec from "./process/exec.js";
|
||||
import { withWhatsAppPrefix } from "./utils.js";
|
||||
|
||||
// Mock config to avoid loading real user config
|
||||
vi.mock("../src/config/config.js", () => ({
|
||||
loadConfig: vi.fn().mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
// Twilio mock factory shared across tests
|
||||
vi.mock("twilio", () => {
|
||||
const { factory } = createMockTwilio();
|
||||
@@ -93,6 +105,64 @@ describe("config and templating", () => {
|
||||
expect(onReplyStart).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("getReplyFromConfig allows same-phone mode (from === to) without allowFrom", async () => {
|
||||
const cfg = {
|
||||
inbound: {
|
||||
// No allowFrom configured
|
||||
reply: {
|
||||
mode: "text" as const,
|
||||
text: "Echo: {{Body}}",
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = await index.getReplyFromConfig(
|
||||
{ Body: "hello", From: "+1555", To: "+1555" },
|
||||
undefined,
|
||||
cfg,
|
||||
);
|
||||
expect(result?.text).toBe("Echo: hello");
|
||||
});
|
||||
|
||||
it("getReplyFromConfig allows same-phone mode even when not in allowFrom list", async () => {
|
||||
const cfg = {
|
||||
inbound: {
|
||||
allowFrom: ["+9999"], // Different number
|
||||
reply: {
|
||||
mode: "text" as const,
|
||||
text: "Reply: {{Body}}",
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Same-phone mode should bypass allowFrom check
|
||||
const result = await index.getReplyFromConfig(
|
||||
{ Body: "test", From: "+1555", To: "+1555" },
|
||||
undefined,
|
||||
cfg,
|
||||
);
|
||||
expect(result?.text).toBe("Reply: test");
|
||||
});
|
||||
|
||||
it("getReplyFromConfig rejects non-same-phone when not in allowFrom", async () => {
|
||||
const cfg = {
|
||||
inbound: {
|
||||
allowFrom: ["+9999"],
|
||||
reply: {
|
||||
mode: "text" as const,
|
||||
text: "Should not see this",
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = await index.getReplyFromConfig(
|
||||
{ Body: "test", From: "+1555", To: "+2666" },
|
||||
undefined,
|
||||
cfg,
|
||||
);
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it("getReplyFromConfig templating includes media fields", async () => {
|
||||
const cfg = {
|
||||
inbound: {
|
||||
@@ -692,7 +762,7 @@ describe("config and templating", () => {
|
||||
reply: {
|
||||
mode: "command" as const,
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "text" as const,
|
||||
agent: { kind: "claude", format: "text" as const },
|
||||
},
|
||||
},
|
||||
};
|
||||
@@ -732,7 +802,7 @@ describe("config and templating", () => {
|
||||
reply: {
|
||||
mode: "command" as const,
|
||||
command: ["claude", "{{Body}}"],
|
||||
claudeOutputFormat: "json" as const,
|
||||
agent: { kind: "claude", format: "json" as const },
|
||||
},
|
||||
},
|
||||
};
|
||||
@@ -760,7 +830,7 @@ describe("config and templating", () => {
|
||||
reply: {
|
||||
mode: "command" as const,
|
||||
command: ["claude", "{{Body}}"],
|
||||
// No claudeOutputFormat set on purpose
|
||||
agent: { kind: "claude" },
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
+387
-40
@@ -1,3 +1,4 @@
|
||||
import "./test-helpers.js";
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
@@ -6,8 +7,8 @@ import sharp from "sharp";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { WarelayConfig } from "../config/config.js";
|
||||
import { resolveStorePath } from "../config/sessions.js";
|
||||
import { resetLogger, setLoggerOverride } from "../logging.js";
|
||||
import * as commandQueue from "../process/command-queue.js";
|
||||
import {
|
||||
HEARTBEAT_PROMPT,
|
||||
HEARTBEAT_TOKEN,
|
||||
@@ -24,6 +25,18 @@ import {
|
||||
setLoadConfigMock,
|
||||
} from "./test-helpers.js";
|
||||
|
||||
const makeSessionStore = async (
|
||||
entries: Record<string, unknown> = {},
|
||||
): Promise<{ storePath: string; cleanup: () => Promise<void> }> => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-session-"));
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
await fs.writeFile(storePath, JSON.stringify(entries));
|
||||
return {
|
||||
storePath,
|
||||
cleanup: () => fs.rm(dir, { recursive: true, force: true }),
|
||||
};
|
||||
};
|
||||
|
||||
describe("heartbeat helpers", () => {
|
||||
it("strips heartbeat token and skips when only token", () => {
|
||||
expect(stripHeartbeatToken(undefined)).toEqual({
|
||||
@@ -78,19 +91,9 @@ describe("heartbeat helpers", () => {
|
||||
});
|
||||
|
||||
describe("resolveHeartbeatRecipients", () => {
|
||||
const makeStore = async (entries: Record<string, { updatedAt: number }>) => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-"));
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
await fs.writeFile(storePath, JSON.stringify(entries));
|
||||
return {
|
||||
storePath,
|
||||
cleanup: async () => fs.rm(dir, { recursive: true, force: true }),
|
||||
};
|
||||
};
|
||||
|
||||
it("returns the sole session recipient", async () => {
|
||||
const now = Date.now();
|
||||
const store = await makeStore({ "+1000": { updatedAt: now } });
|
||||
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
@@ -105,7 +108,7 @@ describe("resolveHeartbeatRecipients", () => {
|
||||
|
||||
it("surfaces ambiguity when multiple sessions exist", async () => {
|
||||
const now = Date.now();
|
||||
const store = await makeStore({
|
||||
const store = await makeSessionStore({
|
||||
"+1000": { updatedAt: now },
|
||||
"+2000": { updatedAt: now - 10 },
|
||||
});
|
||||
@@ -122,7 +125,7 @@ describe("resolveHeartbeatRecipients", () => {
|
||||
});
|
||||
|
||||
it("filters wildcard allowFrom when no sessions exist", async () => {
|
||||
const store = await makeStore({});
|
||||
const store = await makeSessionStore({});
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
@@ -137,7 +140,7 @@ describe("resolveHeartbeatRecipients", () => {
|
||||
|
||||
it("merges sessions and allowFrom when --all is set", async () => {
|
||||
const now = Date.now();
|
||||
const store = await makeStore({ "+1000": { updatedAt: now } });
|
||||
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
|
||||
const cfg: WarelayConfig = {
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
@@ -153,12 +156,16 @@ describe("resolveHeartbeatRecipients", () => {
|
||||
|
||||
describe("runWebHeartbeatOnce", () => {
|
||||
it("skips when heartbeat token returned", async () => {
|
||||
const store = await makeSessionStore();
|
||||
const sender: typeof sendMessageWeb = vi.fn();
|
||||
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
|
||||
setLoadConfigMock({
|
||||
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
|
||||
});
|
||||
await runWebHeartbeatOnce({
|
||||
cfg: {
|
||||
inbound: {
|
||||
allowFrom: ["+1555"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
},
|
||||
to: "+1555",
|
||||
verbose: false,
|
||||
sender,
|
||||
@@ -166,54 +173,58 @@ describe("runWebHeartbeatOnce", () => {
|
||||
});
|
||||
expect(resolver).toHaveBeenCalled();
|
||||
expect(sender).not.toHaveBeenCalled();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("sends when alert text present", async () => {
|
||||
const store = await makeSessionStore();
|
||||
const sender: typeof sendMessageWeb = vi
|
||||
.fn()
|
||||
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
|
||||
const resolver = vi.fn(async () => ({ text: "ALERT" }));
|
||||
setLoadConfigMock({
|
||||
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
|
||||
});
|
||||
await runWebHeartbeatOnce({
|
||||
cfg: {
|
||||
inbound: {
|
||||
allowFrom: ["+1555"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
},
|
||||
to: "+1555",
|
||||
verbose: false,
|
||||
sender,
|
||||
replyResolver: resolver,
|
||||
});
|
||||
expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false });
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("falls back to most recent session when no to is provided", async () => {
|
||||
const store = await makeSessionStore();
|
||||
const storePath = store.storePath;
|
||||
const sender: typeof sendMessageWeb = vi
|
||||
.fn()
|
||||
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
|
||||
const resolver = vi.fn(async () => ({ text: "ALERT" }));
|
||||
// Seed session store
|
||||
const now = Date.now();
|
||||
const store = {
|
||||
const sessionEntries = {
|
||||
"+1222": { sessionId: "s1", updatedAt: now - 1000 },
|
||||
"+1333": { sessionId: "s2", updatedAt: now },
|
||||
};
|
||||
const storePath = resolveStorePath();
|
||||
await fs.mkdir(resolveStorePath().replace("sessions.json", ""), {
|
||||
recursive: true,
|
||||
});
|
||||
await fs.writeFile(storePath, JSON.stringify(store));
|
||||
setLoadConfigMock({
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
reply: { mode: "command", session: {} },
|
||||
},
|
||||
});
|
||||
await fs.writeFile(storePath, JSON.stringify(sessionEntries));
|
||||
await runWebHeartbeatOnce({
|
||||
cfg: {
|
||||
inbound: {
|
||||
allowFrom: ["+1999"],
|
||||
reply: { mode: "command", session: { store: storePath } },
|
||||
},
|
||||
},
|
||||
to: "+1999",
|
||||
verbose: false,
|
||||
sender,
|
||||
replyResolver: resolver,
|
||||
});
|
||||
expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false });
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("does not refresh updatedAt when heartbeat is skipped", async () => {
|
||||
@@ -353,14 +364,18 @@ describe("runWebHeartbeatOnce", () => {
|
||||
});
|
||||
|
||||
it("sends overrideBody directly and skips resolver", async () => {
|
||||
const store = await makeSessionStore();
|
||||
const sender: typeof sendMessageWeb = vi
|
||||
.fn()
|
||||
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
|
||||
const resolver = vi.fn();
|
||||
setLoadConfigMock({
|
||||
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
|
||||
});
|
||||
await runWebHeartbeatOnce({
|
||||
cfg: {
|
||||
inbound: {
|
||||
allowFrom: ["+1555"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
},
|
||||
to: "+1555",
|
||||
verbose: false,
|
||||
sender,
|
||||
@@ -371,15 +386,20 @@ describe("runWebHeartbeatOnce", () => {
|
||||
verbose: false,
|
||||
});
|
||||
expect(resolver).not.toHaveBeenCalled();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("dry-run overrideBody prints and skips send", async () => {
|
||||
const store = await makeSessionStore();
|
||||
const sender: typeof sendMessageWeb = vi.fn();
|
||||
const resolver = vi.fn();
|
||||
setLoadConfigMock({
|
||||
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
|
||||
});
|
||||
await runWebHeartbeatOnce({
|
||||
cfg: {
|
||||
inbound: {
|
||||
allowFrom: ["+1555"],
|
||||
reply: { mode: "command", session: { store: store.storePath } },
|
||||
},
|
||||
},
|
||||
to: "+1555",
|
||||
verbose: false,
|
||||
sender,
|
||||
@@ -389,6 +409,7 @@ describe("runWebHeartbeatOnce", () => {
|
||||
});
|
||||
expect(sender).not.toHaveBeenCalled();
|
||||
expect(resolver).not.toHaveBeenCalled();
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -504,6 +525,123 @@ describe("web auto-reply", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("skips reply heartbeat when requests are running", async () => {
|
||||
const tmpDir = await fs.mkdtemp(
|
||||
path.join(os.tmpdir(), "warelay-heartbeat-queue-"),
|
||||
);
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
await fs.writeFile(storePath, JSON.stringify({}));
|
||||
|
||||
const queueSpy = vi.spyOn(commandQueue, "getQueueSize").mockReturnValue(2);
|
||||
const replyResolver = vi.fn();
|
||||
const listenerFactory = vi.fn(async () => {
|
||||
const onClose = new Promise<void>(() => {
|
||||
// stay open until aborted
|
||||
});
|
||||
return { close: vi.fn(), onClose };
|
||||
});
|
||||
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
|
||||
|
||||
setLoadConfigMock(() => ({
|
||||
inbound: {
|
||||
allowFrom: ["+1555"],
|
||||
reply: { mode: "command", session: { store: storePath } },
|
||||
},
|
||||
}));
|
||||
|
||||
const controller = new AbortController();
|
||||
const run = monitorWebProvider(
|
||||
false,
|
||||
listenerFactory,
|
||||
true,
|
||||
replyResolver,
|
||||
runtime,
|
||||
controller.signal,
|
||||
{ replyHeartbeatMinutes: 1, replyHeartbeatNow: true },
|
||||
);
|
||||
|
||||
try {
|
||||
await Promise.resolve();
|
||||
controller.abort();
|
||||
await run;
|
||||
expect(replyResolver).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
queueSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("batches inbound messages while queue is busy and preserves timestamps", async () => {
|
||||
vi.useFakeTimers();
|
||||
const originalMax = process.getMaxListeners();
|
||||
process.setMaxListeners?.(1); // force low to confirm bump
|
||||
|
||||
const sendMedia = vi.fn();
|
||||
const reply = vi.fn().mockResolvedValue(undefined);
|
||||
const sendComposing = vi.fn();
|
||||
const resolver = vi.fn().mockResolvedValue({ text: "batched" });
|
||||
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const listenerFactory = async (opts: {
|
||||
onMessage: (
|
||||
msg: import("./inbound.js").WebInboundMessage,
|
||||
) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
return { close: vi.fn() };
|
||||
};
|
||||
|
||||
// Queue starts busy, then frees after one polling tick.
|
||||
let queueBusy = true;
|
||||
const queueSpy = vi
|
||||
.spyOn(commandQueue, "getQueueSize")
|
||||
.mockImplementation(() => (queueBusy ? 1 : 0));
|
||||
|
||||
setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC" } }));
|
||||
|
||||
await monitorWebProvider(false, listenerFactory, false, resolver);
|
||||
expect(capturedOnMessage).toBeDefined();
|
||||
|
||||
// Two messages from the same sender with fixed timestamps
|
||||
await capturedOnMessage?.({
|
||||
body: "first",
|
||||
from: "+1",
|
||||
to: "+2",
|
||||
id: "m1",
|
||||
timestamp: 1735689600000, // Jan 1 2025 00:00:00 UTC
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
});
|
||||
await capturedOnMessage?.({
|
||||
body: "second",
|
||||
from: "+1",
|
||||
to: "+2",
|
||||
id: "m2",
|
||||
timestamp: 1735693200000, // Jan 1 2025 01:00:00 UTC
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
});
|
||||
|
||||
// Let the queued batch flush once the queue is free
|
||||
queueBusy = false;
|
||||
vi.advanceTimersByTime(200);
|
||||
|
||||
expect(resolver).toHaveBeenCalledTimes(1);
|
||||
const args = resolver.mock.calls[0][0];
|
||||
expect(args.Body).toContain("[Jan 1 00:00] [warelay] first");
|
||||
expect(args.Body).toContain("[Jan 1 01:00] [warelay] second");
|
||||
|
||||
// Max listeners bumped to avoid warnings in multi-instance test runs
|
||||
expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50);
|
||||
|
||||
queueSpy.mockRestore();
|
||||
process.setMaxListeners?.(originalMax);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("falls back to text when media send fails", async () => {
|
||||
const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
|
||||
const reply = vi.fn().mockResolvedValue(undefined);
|
||||
@@ -945,4 +1083,213 @@ describe("web auto-reply", () => {
|
||||
expect(content).toContain('"module":"web-auto-reply"');
|
||||
expect(content).toContain('"text":"auto"');
|
||||
});
|
||||
|
||||
it("prefixes body with same-phone marker when from === to", async () => {
|
||||
// Enable messagePrefix for same-phone mode testing
|
||||
setLoadConfigMock(() => ({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: "[same-phone]",
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
}));
|
||||
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const listenerFactory = async (opts: {
|
||||
onMessage: (
|
||||
msg: import("./inbound.js").WebInboundMessage,
|
||||
) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
return { close: vi.fn() };
|
||||
};
|
||||
|
||||
const resolver = vi.fn().mockResolvedValue({ text: "reply" });
|
||||
|
||||
await monitorWebProvider(false, listenerFactory, false, resolver);
|
||||
expect(capturedOnMessage).toBeDefined();
|
||||
|
||||
await capturedOnMessage?.({
|
||||
body: "hello",
|
||||
from: "+1555",
|
||||
to: "+1555", // Same phone!
|
||||
id: "msg1",
|
||||
sendComposing: vi.fn(),
|
||||
reply: vi.fn(),
|
||||
sendMedia: vi.fn(),
|
||||
});
|
||||
|
||||
// The resolver should receive a prefixed body with the configured marker
|
||||
const callArg = resolver.mock.calls[0]?.[0] as { Body?: string };
|
||||
expect(callArg?.Body).toBeDefined();
|
||||
expect(callArg?.Body).toBe("[same-phone] hello");
|
||||
resetLoadConfigMock();
|
||||
});
|
||||
|
||||
it("does not prefix body when from !== to", async () => {
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const listenerFactory = async (opts: {
|
||||
onMessage: (
|
||||
msg: import("./inbound.js").WebInboundMessage,
|
||||
) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
return { close: vi.fn() };
|
||||
};
|
||||
|
||||
const resolver = vi.fn().mockResolvedValue({ text: "reply" });
|
||||
|
||||
await monitorWebProvider(false, listenerFactory, false, resolver);
|
||||
expect(capturedOnMessage).toBeDefined();
|
||||
|
||||
await capturedOnMessage?.({
|
||||
body: "hello",
|
||||
from: "+1555",
|
||||
to: "+2666", // Different phones
|
||||
id: "msg1",
|
||||
sendComposing: vi.fn(),
|
||||
reply: vi.fn(),
|
||||
sendMedia: vi.fn(),
|
||||
});
|
||||
|
||||
// Body should NOT be prefixed
|
||||
const callArg = resolver.mock.calls[0]?.[0] as { Body?: string };
|
||||
expect(callArg?.Body).toBe("hello");
|
||||
});
|
||||
|
||||
it("applies responsePrefix to regular replies", async () => {
|
||||
setLoadConfigMock(() => ({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: "🦞",
|
||||
timestampPrefix: false,
|
||||
},
|
||||
}));
|
||||
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const reply = vi.fn();
|
||||
const listenerFactory = async (opts: {
|
||||
onMessage: (
|
||||
msg: import("./inbound.js").WebInboundMessage,
|
||||
) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
return { close: vi.fn() };
|
||||
};
|
||||
|
||||
const resolver = vi.fn().mockResolvedValue({ text: "hello there" });
|
||||
|
||||
await monitorWebProvider(false, listenerFactory, false, resolver);
|
||||
expect(capturedOnMessage).toBeDefined();
|
||||
|
||||
await capturedOnMessage?.({
|
||||
body: "hi",
|
||||
from: "+1555",
|
||||
to: "+2666",
|
||||
id: "msg1",
|
||||
sendComposing: vi.fn(),
|
||||
reply,
|
||||
sendMedia: vi.fn(),
|
||||
});
|
||||
|
||||
// Reply should have responsePrefix prepended
|
||||
expect(reply).toHaveBeenCalledWith("🦞 hello there");
|
||||
resetLoadConfigMock();
|
||||
});
|
||||
|
||||
it("skips responsePrefix for HEARTBEAT_OK responses", async () => {
|
||||
setLoadConfigMock(() => ({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: "🦞",
|
||||
timestampPrefix: false,
|
||||
},
|
||||
}));
|
||||
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const reply = vi.fn();
|
||||
const listenerFactory = async (opts: {
|
||||
onMessage: (
|
||||
msg: import("./inbound.js").WebInboundMessage,
|
||||
) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
return { close: vi.fn() };
|
||||
};
|
||||
|
||||
// Resolver returns exact HEARTBEAT_OK
|
||||
const resolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN });
|
||||
|
||||
await monitorWebProvider(false, listenerFactory, false, resolver);
|
||||
expect(capturedOnMessage).toBeDefined();
|
||||
|
||||
await capturedOnMessage?.({
|
||||
body: "test",
|
||||
from: "+1555",
|
||||
to: "+2666",
|
||||
id: "msg1",
|
||||
sendComposing: vi.fn(),
|
||||
reply,
|
||||
sendMedia: vi.fn(),
|
||||
});
|
||||
|
||||
// HEARTBEAT_OK should NOT have prefix - warelay needs exact match
|
||||
expect(reply).toHaveBeenCalledWith(HEARTBEAT_TOKEN);
|
||||
resetLoadConfigMock();
|
||||
});
|
||||
|
||||
it("does not double-prefix if responsePrefix already present", async () => {
|
||||
setLoadConfigMock(() => ({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: "🦞",
|
||||
timestampPrefix: false,
|
||||
},
|
||||
}));
|
||||
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const reply = vi.fn();
|
||||
const listenerFactory = async (opts: {
|
||||
onMessage: (
|
||||
msg: import("./inbound.js").WebInboundMessage,
|
||||
) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
return { close: vi.fn() };
|
||||
};
|
||||
|
||||
// Resolver returns text that already has prefix
|
||||
const resolver = vi.fn().mockResolvedValue({ text: "🦞 already prefixed" });
|
||||
|
||||
await monitorWebProvider(false, listenerFactory, false, resolver);
|
||||
expect(capturedOnMessage).toBeDefined();
|
||||
|
||||
await capturedOnMessage?.({
|
||||
body: "test",
|
||||
from: "+1555",
|
||||
to: "+2666",
|
||||
id: "msg1",
|
||||
sendComposing: vi.fn(),
|
||||
reply,
|
||||
sendMedia: vi.fn(),
|
||||
});
|
||||
|
||||
// Should not double-prefix
|
||||
expect(reply).toHaveBeenCalledWith("🦞 already prefixed");
|
||||
resetLoadConfigMock();
|
||||
});
|
||||
});
|
||||
|
||||
+348
-96
@@ -9,12 +9,14 @@ import {
|
||||
resolveStorePath,
|
||||
saveSessionStore,
|
||||
} from "../config/sessions.js";
|
||||
import { danger, isVerbose, logVerbose, success } from "../globals.js";
|
||||
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
|
||||
import { logInfo } from "../logger.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { enqueueCommand, getQueueSize } from "../process/command-queue.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { normalizeE164 } from "../utils.js";
|
||||
import { monitorWebInbox } from "./inbound.js";
|
||||
import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js";
|
||||
import { loadWebMedia } from "./media.js";
|
||||
import { sendMessageWeb } from "./outbound.js";
|
||||
import {
|
||||
@@ -27,6 +29,26 @@ import {
|
||||
} from "./reconnect.js";
|
||||
import { getWebAuthAgeMs } from "./session.js";
|
||||
|
||||
/**
|
||||
* Send a message via IPC if relay is running, otherwise fall back to direct.
|
||||
* This avoids Signal session corruption from multiple Baileys connections.
|
||||
*/
|
||||
async function sendWithIpcFallback(
|
||||
to: string,
|
||||
message: string,
|
||||
opts: { verbose: boolean; mediaUrl?: string },
|
||||
): Promise<{ messageId: string; toJid: string }> {
|
||||
const ipcResult = await sendViaIpc(to, message, opts.mediaUrl);
|
||||
if (ipcResult?.success && ipcResult.messageId) {
|
||||
if (opts.verbose) {
|
||||
console.log(info(`Sent via relay IPC (avoiding session corruption)`));
|
||||
}
|
||||
return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` };
|
||||
}
|
||||
// Fall back to direct send
|
||||
return sendMessageWeb(to, message, opts);
|
||||
}
|
||||
|
||||
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
|
||||
type WebInboundMsg = Parameters<
|
||||
typeof monitorWebInbox
|
||||
@@ -94,7 +116,7 @@ export async function runWebHeartbeatOnce(opts: {
|
||||
} = opts;
|
||||
const _runtime = opts.runtime ?? defaultRuntime;
|
||||
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
|
||||
const sender = opts.sender ?? sendMessageWeb;
|
||||
const sender = opts.sender ?? sendWithIpcFallback;
|
||||
const runId = newConnectionId();
|
||||
const heartbeatLogger = getChildLogger({
|
||||
module: "web-heartbeat",
|
||||
@@ -493,6 +515,13 @@ export async function monitorWebProvider(
|
||||
}),
|
||||
);
|
||||
|
||||
// Avoid noisy MaxListenersExceeded warnings in test environments where
|
||||
// multiple relay instances may be constructed.
|
||||
const currentMaxListeners = process.getMaxListeners?.() ?? 10;
|
||||
if (process.setMaxListeners && currentMaxListeners < 50) {
|
||||
process.setMaxListeners(50);
|
||||
}
|
||||
|
||||
let sigintStop = false;
|
||||
const handleSigint = () => {
|
||||
sigintStop = true;
|
||||
@@ -501,6 +530,10 @@ export async function monitorWebProvider(
|
||||
|
||||
let reconnectAttempts = 0;
|
||||
|
||||
// Track recently sent messages to prevent echo loops
|
||||
const recentlySent = new Set<string>();
|
||||
const MAX_RECENT_MESSAGES = 100;
|
||||
|
||||
while (true) {
|
||||
if (stopRequested()) break;
|
||||
|
||||
@@ -508,96 +541,251 @@ export async function monitorWebProvider(
|
||||
const startedAt = Date.now();
|
||||
let heartbeat: NodeJS.Timeout | null = null;
|
||||
let replyHeartbeatTimer: NodeJS.Timeout | null = null;
|
||||
let watchdogTimer: NodeJS.Timeout | null = null;
|
||||
let lastMessageAt: number | null = null;
|
||||
let handledMessages = 0;
|
||||
let lastInboundMsg: WebInboundMsg | null = null;
|
||||
|
||||
// Watchdog to detect stuck message processing (e.g., event emitter died)
|
||||
// Should be significantly longer than heartbeatMinutes to avoid false positives
|
||||
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
|
||||
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
|
||||
|
||||
// Batch inbound messages while command queue is busy, then send one
|
||||
// combined prompt with per-message timestamps (inbound-only behavior).
|
||||
type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout };
|
||||
const pendingBatches = new Map<string, PendingBatch>();
|
||||
|
||||
const formatTimestamp = (ts?: number) => {
|
||||
const tsCfg = cfg.inbound?.timestampPrefix;
|
||||
const tsEnabled = tsCfg !== false; // default true
|
||||
if (!tsEnabled) return "";
|
||||
const tz = typeof tsCfg === "string" ? tsCfg : "UTC";
|
||||
const date = ts ? new Date(ts) : new Date();
|
||||
try {
|
||||
return `[${date.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${date.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
|
||||
} catch {
|
||||
return `[${date.toISOString().slice(5, 16).replace("T", " ")}] `;
|
||||
}
|
||||
};
|
||||
|
||||
const buildLine = (msg: WebInboundMsg) => {
|
||||
// Build message prefix: explicit config > default based on allowFrom
|
||||
let messagePrefix = cfg.inbound?.messagePrefix;
|
||||
if (messagePrefix === undefined) {
|
||||
const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0;
|
||||
messagePrefix = hasAllowFrom ? "" : "[warelay]";
|
||||
}
|
||||
const prefixStr = messagePrefix ? `${messagePrefix} ` : "";
|
||||
return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`;
|
||||
};
|
||||
|
||||
const processBatch = async (from: string) => {
|
||||
const batch = pendingBatches.get(from);
|
||||
if (!batch || batch.messages.length === 0) return;
|
||||
if (getQueueSize() > 0) {
|
||||
// Wait until command queue is free to run the combined prompt.
|
||||
batch.timer = setTimeout(() => void processBatch(from), 150);
|
||||
return;
|
||||
}
|
||||
pendingBatches.delete(from);
|
||||
|
||||
const messages = batch.messages;
|
||||
const latest = messages[messages.length - 1];
|
||||
const combinedBody = messages.map(buildLine).join("\n");
|
||||
|
||||
// Echo detection uses combined body so we don't respond twice.
|
||||
if (recentlySent.has(combinedBody)) {
|
||||
logVerbose(`Skipping auto-reply: detected echo for combined batch`);
|
||||
recentlySent.delete(combinedBody);
|
||||
return;
|
||||
}
|
||||
|
||||
const correlationId = latest.id ?? newConnectionId();
|
||||
replyLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
correlationId,
|
||||
from,
|
||||
to: latest.to,
|
||||
body: combinedBody,
|
||||
mediaType: latest.mediaType ?? null,
|
||||
mediaPath: latest.mediaPath ?? null,
|
||||
batchSize: messages.length,
|
||||
},
|
||||
"inbound web message (batched)",
|
||||
);
|
||||
|
||||
const tsDisplay = latest.timestamp
|
||||
? new Date(latest.timestamp).toISOString()
|
||||
: new Date().toISOString();
|
||||
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
|
||||
|
||||
const replyResult = await enqueueCommand(() =>
|
||||
(replyResolver ?? getReplyFromConfig)(
|
||||
{
|
||||
Body: combinedBody,
|
||||
From: latest.from,
|
||||
To: latest.to,
|
||||
MessageSid: latest.id,
|
||||
MediaPath: latest.mediaPath,
|
||||
MediaUrl: latest.mediaUrl,
|
||||
MediaType: latest.mediaType,
|
||||
},
|
||||
{
|
||||
onReplyStart: latest.sendComposing,
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
if (
|
||||
!replyResult ||
|
||||
(!replyResult.text &&
|
||||
!replyResult.mediaUrl &&
|
||||
!replyResult.mediaUrls?.length)
|
||||
) {
|
||||
logVerbose("Skipping auto-reply: no text/media returned from resolver");
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match)
|
||||
const responsePrefix = cfg.inbound?.responsePrefix;
|
||||
if (
|
||||
responsePrefix &&
|
||||
replyResult.text &&
|
||||
replyResult.text.trim() !== HEARTBEAT_TOKEN
|
||||
) {
|
||||
if (!replyResult.text.startsWith(responsePrefix)) {
|
||||
replyResult.text = `${responsePrefix} ${replyResult.text}`;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await deliverWebReply({
|
||||
replyResult,
|
||||
msg: latest,
|
||||
maxMediaBytes,
|
||||
replyLogger,
|
||||
runtime,
|
||||
connectionId,
|
||||
});
|
||||
|
||||
if (replyResult.text) {
|
||||
recentlySent.add(replyResult.text);
|
||||
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
|
||||
logVerbose(
|
||||
`Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`,
|
||||
);
|
||||
if (recentlySent.size > MAX_RECENT_MESSAGES) {
|
||||
const firstKey = recentlySent.values().next().value;
|
||||
if (firstKey) recentlySent.delete(firstKey);
|
||||
}
|
||||
}
|
||||
|
||||
if (isVerbose()) {
|
||||
console.log(
|
||||
success(
|
||||
`↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
|
||||
),
|
||||
);
|
||||
} else {
|
||||
console.log(
|
||||
success(
|
||||
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
danger(`Failed sending web auto-reply to ${from}: ${String(err)}`),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const enqueueBatch = async (msg: WebInboundMsg) => {
|
||||
const bucket = pendingBatches.get(msg.from) ?? { messages: [] };
|
||||
bucket.messages.push(msg);
|
||||
pendingBatches.set(msg.from, bucket);
|
||||
|
||||
// Process immediately when queue is free; otherwise wait until it drains.
|
||||
if (getQueueSize() === 0) {
|
||||
await processBatch(msg.from);
|
||||
} else {
|
||||
bucket.timer =
|
||||
bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150);
|
||||
}
|
||||
};
|
||||
|
||||
const listener = await (listenerFactory ?? monitorWebInbox)({
|
||||
verbose,
|
||||
onMessage: async (msg) => {
|
||||
handledMessages += 1;
|
||||
lastMessageAt = Date.now();
|
||||
const ts = msg.timestamp
|
||||
? new Date(msg.timestamp).toISOString()
|
||||
: new Date().toISOString();
|
||||
const correlationId = msg.id ?? newConnectionId();
|
||||
replyLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
correlationId,
|
||||
from: msg.from,
|
||||
to: msg.to,
|
||||
body: msg.body,
|
||||
mediaType: msg.mediaType ?? null,
|
||||
mediaPath: msg.mediaPath ?? null,
|
||||
},
|
||||
"inbound web message",
|
||||
);
|
||||
|
||||
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
|
||||
|
||||
lastInboundMsg = msg;
|
||||
|
||||
const replyResult = await (replyResolver ?? getReplyFromConfig)(
|
||||
{
|
||||
Body: msg.body,
|
||||
From: msg.from,
|
||||
To: msg.to,
|
||||
MessageSid: msg.id,
|
||||
MediaPath: msg.mediaPath,
|
||||
MediaUrl: msg.mediaUrl,
|
||||
MediaType: msg.mediaType,
|
||||
},
|
||||
{
|
||||
onReplyStart: msg.sendComposing,
|
||||
},
|
||||
);
|
||||
if (
|
||||
!replyResult ||
|
||||
(!replyResult.text &&
|
||||
!replyResult.mediaUrl &&
|
||||
!replyResult.mediaUrls?.length)
|
||||
) {
|
||||
// Same-phone mode logging retained
|
||||
if (msg.from === msg.to) {
|
||||
logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
|
||||
}
|
||||
|
||||
// Skip if this is a message we just sent (echo detection)
|
||||
if (recentlySent.has(msg.body)) {
|
||||
console.log(`⏭️ Skipping echo: detected recently sent message`);
|
||||
logVerbose(
|
||||
"Skipping auto-reply: no text/media returned from resolver",
|
||||
`Skipping auto-reply: detected echo (message matches recently sent text)`,
|
||||
);
|
||||
recentlySent.delete(msg.body);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await deliverWebReply({
|
||||
replyResult,
|
||||
msg,
|
||||
maxMediaBytes,
|
||||
replyLogger,
|
||||
runtime,
|
||||
connectionId,
|
||||
});
|
||||
if (isVerbose()) {
|
||||
console.log(
|
||||
success(
|
||||
`↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`,
|
||||
),
|
||||
);
|
||||
} else {
|
||||
console.log(
|
||||
success(
|
||||
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
danger(
|
||||
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return enqueueBatch(msg);
|
||||
},
|
||||
});
|
||||
|
||||
// Start IPC server so `warelay send` can use this connection
|
||||
// instead of creating a new one (which would corrupt Signal session)
|
||||
if ("sendMessage" in listener && "sendComposingTo" in listener) {
|
||||
startIpcServer(async (to, message, mediaUrl) => {
|
||||
let mediaBuffer: Buffer | undefined;
|
||||
let mediaType: string | undefined;
|
||||
if (mediaUrl) {
|
||||
const media = await loadWebMedia(mediaUrl);
|
||||
mediaBuffer = media.buffer;
|
||||
mediaType = media.contentType;
|
||||
}
|
||||
const result = await listener.sendMessage(
|
||||
to,
|
||||
message,
|
||||
mediaBuffer,
|
||||
mediaType,
|
||||
);
|
||||
// Add to echo detection so we don't process our own message
|
||||
if (message) {
|
||||
recentlySent.add(message);
|
||||
if (recentlySent.size > MAX_RECENT_MESSAGES) {
|
||||
const firstKey = recentlySent.values().next().value;
|
||||
if (firstKey) recentlySent.delete(firstKey);
|
||||
}
|
||||
}
|
||||
logInfo(
|
||||
`📤 IPC send to ${to}: ${message.substring(0, 50)}...`,
|
||||
runtime,
|
||||
);
|
||||
// Show typing indicator after send so user knows more may be coming
|
||||
try {
|
||||
await listener.sendComposingTo(to);
|
||||
} catch {
|
||||
// Ignore typing indicator errors - not critical
|
||||
}
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
const closeListener = async () => {
|
||||
stopIpcServer();
|
||||
if (heartbeat) clearInterval(heartbeat);
|
||||
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
|
||||
if (watchdogTimer) clearInterval(watchdogTimer);
|
||||
try {
|
||||
await listener.close();
|
||||
} catch (err) {
|
||||
@@ -608,21 +796,69 @@ export async function monitorWebProvider(
|
||||
if (keepAlive) {
|
||||
heartbeat = setInterval(() => {
|
||||
const authAgeMs = getWebAuthAgeMs();
|
||||
heartbeatLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
reconnectAttempts,
|
||||
messagesHandled: handledMessages,
|
||||
lastMessageAt,
|
||||
authAgeMs,
|
||||
uptimeMs: Date.now() - startedAt,
|
||||
},
|
||||
"web relay heartbeat",
|
||||
);
|
||||
const minutesSinceLastMessage = lastMessageAt
|
||||
? Math.floor((Date.now() - lastMessageAt) / 60000)
|
||||
: null;
|
||||
|
||||
const logData = {
|
||||
connectionId,
|
||||
reconnectAttempts,
|
||||
messagesHandled: handledMessages,
|
||||
lastMessageAt,
|
||||
authAgeMs,
|
||||
uptimeMs: Date.now() - startedAt,
|
||||
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
|
||||
? { minutesSinceLastMessage }
|
||||
: {}),
|
||||
};
|
||||
|
||||
// Warn if no messages in 30+ minutes
|
||||
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
|
||||
heartbeatLogger.warn(
|
||||
logData,
|
||||
"⚠️ web relay heartbeat - no messages in 30+ minutes",
|
||||
);
|
||||
} else {
|
||||
heartbeatLogger.info(logData, "web relay heartbeat");
|
||||
}
|
||||
}, heartbeatSeconds * 1000);
|
||||
|
||||
// Watchdog: Auto-restart if no messages received for MESSAGE_TIMEOUT_MS
|
||||
watchdogTimer = setInterval(() => {
|
||||
if (lastMessageAt) {
|
||||
const timeSinceLastMessage = Date.now() - lastMessageAt;
|
||||
if (timeSinceLastMessage > MESSAGE_TIMEOUT_MS) {
|
||||
const minutesSinceLastMessage = Math.floor(
|
||||
timeSinceLastMessage / 60000,
|
||||
);
|
||||
heartbeatLogger.warn(
|
||||
{
|
||||
connectionId,
|
||||
minutesSinceLastMessage,
|
||||
lastMessageAt: new Date(lastMessageAt),
|
||||
messagesHandled: handledMessages,
|
||||
},
|
||||
"Message timeout detected - forcing reconnect",
|
||||
);
|
||||
console.error(
|
||||
`⚠️ No messages received in ${minutesSinceLastMessage}m - restarting connection`,
|
||||
);
|
||||
closeListener(); // Trigger reconnect
|
||||
}
|
||||
}
|
||||
}, WATCHDOG_CHECK_MS);
|
||||
}
|
||||
|
||||
const runReplyHeartbeat = async () => {
|
||||
const queued = getQueueSize();
|
||||
if (queued > 0) {
|
||||
heartbeatLogger.info(
|
||||
{ connectionId, reason: "requests-in-flight", queued },
|
||||
"reply heartbeat skipped",
|
||||
);
|
||||
console.log(success("heartbeat: skipped (requests in flight)"));
|
||||
return;
|
||||
}
|
||||
if (!replyHeartbeatMinutes) return;
|
||||
const tickStart = Date.now();
|
||||
if (!lastInboundMsg) {
|
||||
@@ -695,19 +931,24 @@ export async function monitorWebProvider(
|
||||
"reply heartbeat start",
|
||||
);
|
||||
}
|
||||
const replyResult = await (replyResolver ?? getReplyFromConfig)(
|
||||
{
|
||||
Body: HEARTBEAT_PROMPT,
|
||||
From: lastInboundMsg.from,
|
||||
To: lastInboundMsg.to,
|
||||
MessageSid: snapshot.entry?.sessionId,
|
||||
MediaPath: undefined,
|
||||
MediaUrl: undefined,
|
||||
MediaType: undefined,
|
||||
},
|
||||
{
|
||||
onReplyStart: lastInboundMsg.sendComposing,
|
||||
},
|
||||
const hbFrom = lastInboundMsg.from;
|
||||
const hbTo = lastInboundMsg.to;
|
||||
const hbComposing = lastInboundMsg.sendComposing;
|
||||
const replyResult = await enqueueCommand(() =>
|
||||
(replyResolver ?? getReplyFromConfig)(
|
||||
{
|
||||
Body: HEARTBEAT_PROMPT,
|
||||
From: hbFrom,
|
||||
To: hbTo,
|
||||
MessageSid: snapshot.entry?.sessionId,
|
||||
MediaPath: undefined,
|
||||
MediaUrl: undefined,
|
||||
MediaType: undefined,
|
||||
},
|
||||
{
|
||||
onReplyStart: hbComposing,
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
if (
|
||||
@@ -746,9 +987,20 @@ export async function monitorWebProvider(
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply response prefix if configured (same as regular messages)
|
||||
let finalText = stripped.text;
|
||||
const responsePrefix = cfg.inbound?.responsePrefix;
|
||||
if (
|
||||
responsePrefix &&
|
||||
finalText &&
|
||||
!finalText.startsWith(responsePrefix)
|
||||
) {
|
||||
finalText = `${responsePrefix} ${finalText}`;
|
||||
}
|
||||
|
||||
const cleanedReply: ReplyPayload = {
|
||||
...replyResult,
|
||||
text: stripped.text,
|
||||
text: finalText,
|
||||
};
|
||||
|
||||
await deliverWebReply({
|
||||
|
||||
@@ -5,6 +5,17 @@ import path from "node:path";
|
||||
|
||||
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: vi.fn().mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"], // Allow all in tests
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
const HOME = path.join(
|
||||
os.tmpdir(),
|
||||
`warelay-inbound-media-${crypto.randomUUID()}`,
|
||||
|
||||
+75
-2
@@ -8,10 +8,11 @@ import {
|
||||
downloadMediaMessage,
|
||||
} from "@whiskeysockets/baileys";
|
||||
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { isVerbose, logVerbose } from "../globals.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { saveMediaBuffer } from "../media/store.js";
|
||||
import { jidToE164 } from "../utils.js";
|
||||
import { jidToE164, normalizeE164 } from "../utils.js";
|
||||
import {
|
||||
createWaSocket,
|
||||
getStatusCode,
|
||||
@@ -70,7 +71,7 @@ export async function monitorWebInbox(options: {
|
||||
// De-dupe on message id; Baileys can emit retries.
|
||||
if (id && seen.has(id)) continue;
|
||||
if (id) seen.add(id);
|
||||
if (msg.key?.fromMe) continue;
|
||||
// Note: not filtering fromMe here - echo detection happens in auto-reply layer
|
||||
const remoteJid = msg.key?.remoteJid;
|
||||
if (!remoteJid) continue;
|
||||
// Ignore status/broadcast traffic; we only care about direct chats.
|
||||
@@ -94,6 +95,25 @@ export async function monitorWebInbox(options: {
|
||||
}
|
||||
const from = jidToE164(remoteJid);
|
||||
if (!from) continue;
|
||||
|
||||
// Filter unauthorized senders early to prevent wasted processing
|
||||
// and potential session corruption from Bad MAC errors
|
||||
const cfg = loadConfig();
|
||||
const allowFrom = cfg.inbound?.allowFrom;
|
||||
const isSamePhone = from === selfE164;
|
||||
|
||||
if (!isSamePhone && Array.isArray(allowFrom) && allowFrom.length > 0) {
|
||||
if (
|
||||
!allowFrom.includes("*") &&
|
||||
!allowFrom.map(normalizeE164).includes(from)
|
||||
) {
|
||||
logVerbose(
|
||||
`Blocked unauthorized sender ${from} (not in allowFrom list)`,
|
||||
);
|
||||
continue; // Skip processing entirely
|
||||
}
|
||||
}
|
||||
|
||||
let body = extractText(msg.message ?? undefined);
|
||||
if (!body) {
|
||||
body = extractMediaPlaceholder(msg.message ?? undefined);
|
||||
@@ -197,6 +217,59 @@ export async function monitorWebInbox(options: {
|
||||
}
|
||||
},
|
||||
onClose,
|
||||
/**
|
||||
* Send a message through this connection's socket.
|
||||
* Used by IPC to avoid creating new connections.
|
||||
*/
|
||||
sendMessage: async (
|
||||
to: string,
|
||||
text: string,
|
||||
mediaBuffer?: Buffer,
|
||||
mediaType?: string,
|
||||
): Promise<{ messageId: string }> => {
|
||||
const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`;
|
||||
let payload: AnyMessageContent;
|
||||
if (mediaBuffer && mediaType) {
|
||||
if (mediaType.startsWith("image/")) {
|
||||
payload = {
|
||||
image: mediaBuffer,
|
||||
caption: text || undefined,
|
||||
mimetype: mediaType,
|
||||
};
|
||||
} else if (mediaType.startsWith("audio/")) {
|
||||
payload = {
|
||||
audio: mediaBuffer,
|
||||
ptt: true,
|
||||
mimetype: mediaType,
|
||||
};
|
||||
} else if (mediaType.startsWith("video/")) {
|
||||
payload = {
|
||||
video: mediaBuffer,
|
||||
caption: text || undefined,
|
||||
mimetype: mediaType,
|
||||
};
|
||||
} else {
|
||||
payload = {
|
||||
document: mediaBuffer,
|
||||
fileName: "file",
|
||||
caption: text || undefined,
|
||||
mimetype: mediaType,
|
||||
};
|
||||
}
|
||||
} else {
|
||||
payload = { text };
|
||||
}
|
||||
const result = await sock.sendMessage(jid, payload);
|
||||
return { messageId: result?.key?.id ?? "unknown" };
|
||||
},
|
||||
/**
|
||||
* Send typing indicator ("composing") to a chat.
|
||||
* Used after IPC send to show more messages are coming.
|
||||
*/
|
||||
sendComposingTo: async (to: string): Promise<void> => {
|
||||
const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`;
|
||||
await sock.sendPresenceUpdate("composing", jid);
|
||||
},
|
||||
} as const;
|
||||
}
|
||||
|
||||
|
||||
+225
@@ -0,0 +1,225 @@
|
||||
/**
|
||||
* IPC server for warelay relay.
|
||||
*
|
||||
* When the relay is running, it starts a Unix socket server that allows
|
||||
* `warelay send` and `warelay heartbeat` to send messages through the
|
||||
* existing WhatsApp connection instead of creating new ones.
|
||||
*
|
||||
* This prevents Signal session ratchet corruption from multiple connections.
|
||||
*/
|
||||
|
||||
import fs from "node:fs";
|
||||
import net from "node:net";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { getChildLogger } from "../logging.js";
|
||||
|
||||
const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock");
|
||||
|
||||
export interface IpcSendRequest {
|
||||
type: "send";
|
||||
to: string;
|
||||
message: string;
|
||||
mediaUrl?: string;
|
||||
}
|
||||
|
||||
export interface IpcSendResponse {
|
||||
success: boolean;
|
||||
messageId?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
type SendHandler = (
|
||||
to: string,
|
||||
message: string,
|
||||
mediaUrl?: string,
|
||||
) => Promise<{ messageId: string }>;
|
||||
|
||||
let server: net.Server | null = null;
|
||||
|
||||
/**
|
||||
* Start the IPC server. Called by the relay when it starts.
|
||||
*/
|
||||
export function startIpcServer(sendHandler: SendHandler): void {
|
||||
const logger = getChildLogger({ module: "ipc-server" });
|
||||
|
||||
// Clean up stale socket file
|
||||
try {
|
||||
fs.unlinkSync(SOCKET_PATH);
|
||||
} catch {
|
||||
// Ignore if doesn't exist
|
||||
}
|
||||
|
||||
server = net.createServer((conn) => {
|
||||
let buffer = "";
|
||||
|
||||
conn.on("data", async (data) => {
|
||||
buffer += data.toString();
|
||||
|
||||
// Try to parse complete JSON messages (newline-delimited)
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() ?? ""; // Keep incomplete line in buffer
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
|
||||
try {
|
||||
const request = JSON.parse(line) as IpcSendRequest;
|
||||
|
||||
if (request.type === "send") {
|
||||
try {
|
||||
const result = await sendHandler(
|
||||
request.to,
|
||||
request.message,
|
||||
request.mediaUrl,
|
||||
);
|
||||
const response: IpcSendResponse = {
|
||||
success: true,
|
||||
messageId: result.messageId,
|
||||
};
|
||||
conn.write(`${JSON.stringify(response)}\n`);
|
||||
} catch (err) {
|
||||
const response: IpcSendResponse = {
|
||||
success: false,
|
||||
error: String(err),
|
||||
};
|
||||
conn.write(`${JSON.stringify(response)}\n`);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn({ error: String(err) }, "failed to parse IPC request");
|
||||
const response: IpcSendResponse = {
|
||||
success: false,
|
||||
error: "Invalid request format",
|
||||
};
|
||||
conn.write(`${JSON.stringify(response)}\n`);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
conn.on("error", (err) => {
|
||||
logger.debug({ error: String(err) }, "IPC connection error");
|
||||
});
|
||||
});
|
||||
|
||||
server.listen(SOCKET_PATH, () => {
|
||||
logger.info({ socketPath: SOCKET_PATH }, "IPC server started");
|
||||
// Make socket accessible
|
||||
fs.chmodSync(SOCKET_PATH, 0o600);
|
||||
});
|
||||
|
||||
server.on("error", (err) => {
|
||||
logger.error({ error: String(err) }, "IPC server error");
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the IPC server. Called when relay shuts down.
|
||||
*/
|
||||
export function stopIpcServer(): void {
|
||||
if (server) {
|
||||
server.close();
|
||||
server = null;
|
||||
}
|
||||
try {
|
||||
fs.unlinkSync(SOCKET_PATH);
|
||||
} catch {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the relay IPC server is running.
|
||||
*/
|
||||
export function isRelayRunning(): boolean {
|
||||
try {
|
||||
fs.accessSync(SOCKET_PATH);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message through the running relay's IPC.
|
||||
* Returns null if relay is not running.
|
||||
*/
|
||||
export async function sendViaIpc(
|
||||
to: string,
|
||||
message: string,
|
||||
mediaUrl?: string,
|
||||
): Promise<IpcSendResponse | null> {
|
||||
if (!isRelayRunning()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const client = net.createConnection(SOCKET_PATH);
|
||||
let buffer = "";
|
||||
let resolved = false;
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
client.destroy();
|
||||
resolve({ success: false, error: "IPC timeout" });
|
||||
}
|
||||
}, 30000); // 30 second timeout
|
||||
|
||||
client.on("connect", () => {
|
||||
const request: IpcSendRequest = {
|
||||
type: "send",
|
||||
to,
|
||||
message,
|
||||
mediaUrl,
|
||||
};
|
||||
client.write(`${JSON.stringify(request)}\n`);
|
||||
});
|
||||
|
||||
client.on("data", (data) => {
|
||||
buffer += data.toString();
|
||||
const lines = buffer.split("\n");
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
try {
|
||||
const response = JSON.parse(line) as IpcSendResponse;
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
clearTimeout(timeout);
|
||||
client.end();
|
||||
resolve(response);
|
||||
}
|
||||
return;
|
||||
} catch {
|
||||
// Keep reading
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
client.on("error", (_err) => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
clearTimeout(timeout);
|
||||
// Socket exists but can't connect - relay might have crashed
|
||||
resolve(null);
|
||||
}
|
||||
});
|
||||
|
||||
client.on("close", () => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
clearTimeout(timeout);
|
||||
resolve({ success: false, error: "Connection closed" });
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the IPC socket path for debugging/status.
|
||||
*/
|
||||
export function getSocketPath(): string {
|
||||
return SOCKET_PATH;
|
||||
}
|
||||
@@ -9,6 +9,19 @@ vi.mock("../media/store.js", () => ({
|
||||
}),
|
||||
}));
|
||||
|
||||
const mockLoadConfig = vi.fn().mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"], // Allow all in tests
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: () => mockLoadConfig(),
|
||||
}));
|
||||
|
||||
vi.mock("./session.js", () => {
|
||||
const { EventEmitter } = require("node:events");
|
||||
const ev = new EventEmitter();
|
||||
@@ -216,4 +229,150 @@ describe("web monitor inbox", () => {
|
||||
]);
|
||||
await listener.close();
|
||||
});
|
||||
|
||||
it("blocks messages from unauthorized senders not in allowFrom", async () => {
|
||||
// Test for auto-recovery fix: early allowFrom filtering prevents Bad MAC errors
|
||||
// from unauthorized senders corrupting sessions
|
||||
mockLoadConfig.mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["+111"], // Only allow +111
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
const onMessage = vi.fn();
|
||||
const listener = await monitorWebInbox({ verbose: false, onMessage });
|
||||
const sock = await createWaSocket();
|
||||
|
||||
// Message from unauthorized sender +999 (not in allowFrom)
|
||||
const upsert = {
|
||||
type: "notify",
|
||||
messages: [
|
||||
{
|
||||
key: {
|
||||
id: "unauth1",
|
||||
fromMe: false,
|
||||
remoteJid: "999@s.whatsapp.net",
|
||||
},
|
||||
message: { conversation: "unauthorized message" },
|
||||
messageTimestamp: 1_700_000_000,
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
sock.ev.emit("messages.upsert", upsert);
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
// Should NOT call onMessage for unauthorized senders
|
||||
expect(onMessage).not.toHaveBeenCalled();
|
||||
|
||||
// Reset mock for other tests
|
||||
mockLoadConfig.mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
await listener.close();
|
||||
});
|
||||
|
||||
it("allows messages from senders in allowFrom list", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["+111", "+999"], // Allow +999
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
const onMessage = vi.fn();
|
||||
const listener = await monitorWebInbox({ verbose: false, onMessage });
|
||||
const sock = await createWaSocket();
|
||||
|
||||
const upsert = {
|
||||
type: "notify",
|
||||
messages: [
|
||||
{
|
||||
key: { id: "auth1", fromMe: false, remoteJid: "999@s.whatsapp.net" },
|
||||
message: { conversation: "authorized message" },
|
||||
messageTimestamp: 1_700_000_000,
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
sock.ev.emit("messages.upsert", upsert);
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
// Should call onMessage for authorized senders
|
||||
expect(onMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ body: "authorized message", from: "+999" }),
|
||||
);
|
||||
|
||||
// Reset mock for other tests
|
||||
mockLoadConfig.mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
await listener.close();
|
||||
});
|
||||
|
||||
it("allows same-phone messages even if not in allowFrom", async () => {
|
||||
// Same-phone mode: when from === selfJid, should always be allowed
|
||||
// This allows users to message themselves even with restrictive allowFrom
|
||||
mockLoadConfig.mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["+111"], // Only allow +111, but self is +123
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
const onMessage = vi.fn();
|
||||
const listener = await monitorWebInbox({ verbose: false, onMessage });
|
||||
const sock = await createWaSocket();
|
||||
|
||||
// Message from self (sock.user.id is "123@s.whatsapp.net" in mock)
|
||||
const upsert = {
|
||||
type: "notify",
|
||||
messages: [
|
||||
{
|
||||
key: { id: "self1", fromMe: false, remoteJid: "123@s.whatsapp.net" },
|
||||
message: { conversation: "self message" },
|
||||
messageTimestamp: 1_700_000_000,
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
sock.ev.emit("messages.upsert", upsert);
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
// Should allow self-messages even if not in allowFrom
|
||||
expect(onMessage).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ body: "self message", from: "+123" }),
|
||||
);
|
||||
|
||||
// Reset mock for other tests
|
||||
mockLoadConfig.mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"],
|
||||
messagePrefix: undefined,
|
||||
responsePrefix: undefined,
|
||||
timestampPrefix: false,
|
||||
},
|
||||
});
|
||||
|
||||
await listener.close();
|
||||
});
|
||||
});
|
||||
|
||||
+24
-5
@@ -3,18 +3,37 @@ import { vi } from "vitest";
|
||||
import type { MockBaileysSocket } from "../../test/mocks/baileys.js";
|
||||
import { createMockBaileys } from "../../test/mocks/baileys.js";
|
||||
|
||||
let loadConfigMock: () => unknown = () => ({});
|
||||
// Use globalThis to store the mock config so it survives vi.mock hoisting
|
||||
const CONFIG_KEY = Symbol.for("warelay:testConfigMock");
|
||||
const DEFAULT_CONFIG = {
|
||||
inbound: {
|
||||
allowFrom: ["*"], // Allow all in tests by default
|
||||
messagePrefix: undefined, // No message prefix in tests
|
||||
responsePrefix: undefined, // No response prefix in tests
|
||||
timestampPrefix: false, // No timestamp in tests
|
||||
},
|
||||
};
|
||||
|
||||
export function setLoadConfigMock(fn: () => unknown) {
|
||||
loadConfigMock = fn;
|
||||
// Initialize default if not set
|
||||
if (!(globalThis as Record<symbol, unknown>)[CONFIG_KEY]) {
|
||||
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] = () => DEFAULT_CONFIG;
|
||||
}
|
||||
|
||||
export function setLoadConfigMock(fn: (() => unknown) | unknown) {
|
||||
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] =
|
||||
typeof fn === "function" ? fn : () => fn;
|
||||
}
|
||||
|
||||
export function resetLoadConfigMock() {
|
||||
loadConfigMock = () => ({});
|
||||
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] = () => DEFAULT_CONFIG;
|
||||
}
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: () => loadConfigMock(),
|
||||
loadConfig: () => {
|
||||
const getter = (globalThis as Record<symbol, unknown>)[CONFIG_KEY];
|
||||
if (typeof getter === "function") return getter();
|
||||
return DEFAULT_CONFIG;
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("../media/store.js", () => ({
|
||||
|
||||
Reference in New Issue
Block a user