Compare commits

...

21 Commits

Author SHA1 Message Date
Peter Steinberger 2b941ccc93 Changelog: note multi-agent and batching
CI / build (push) Failing after 27s
Co-authored-by: RealSid08 <RealSid08@users.noreply.github.com>
2025-12-02 11:11:50 +00:00
Peter Steinberger ed080ae988 Tests: cover agents and fix web defaults
Co-authored-by: RealSid08 <RealSid08@users.noreply.github.com>
2025-12-02 11:08:00 +00:00
Peter Steinberger f31e89d5af Agents: add pluggable CLIs
Co-authored-by: RealSid08 <RealSid08@users.noreply.github.com>
2025-12-02 11:07:46 +00:00
Peter Steinberger 52c311e47f chore: bump version to 1.3.0 2025-12-02 07:54:49 +00:00
Peter Steinberger 5b54d4de7a feat(web): batch inbound messages 2025-12-02 07:54:13 +00:00
Peter Steinberger 96152f6577 Add typing indicator after IPC send
After sending via IPC, automatically show "composing" indicator so
user knows more messages may be coming from the running session.
2025-12-02 06:58:17 +00:00
Peter Steinberger e881b3c5de Document exclamation mark escaping workaround for Claude Code
Add symlink CLAUDE.md -> AGENTS.md for Claude Code compatibility.
2025-12-02 06:52:56 +00:00
Peter Steinberger e86b507da7 Add IPC to prevent Signal session corruption from concurrent connections
When the relay is running, `warelay send` and `warelay heartbeat` now
communicate via Unix socket IPC (~/.warelay/relay.sock) to send messages
through the relay's existing WhatsApp connection.

Previously, these commands created new Baileys sockets that wrote to the
same auth state files, corrupting the Signal session ratchet and causing
the relay's subsequent sends to fail silently.

Changes:
- Add src/web/ipc.ts with Unix socket server/client
- Relay starts IPC server after connecting
- send command tries IPC first, falls back to direct
- heartbeat uses sendWithIpcFallback helper
- inbound.ts exposes sendMessage on listener object
- Messages sent via IPC are added to echo detection set
2025-12-02 06:31:07 +00:00
Peter Steinberger 2fc3a822c8 web: isolate session fixtures and skip heartbeat when busy 2025-12-02 06:17:16 +00:00
Peter Steinberger 1b0e1edb08 Update changelog with error message and test isolation fixes 2025-12-02 05:59:31 +00:00
Peter Steinberger d107b79c63 Fix test corrupting production sessions.json
The test 'falls back to most recent session when no to is provided' was
using resolveStorePath() which returns the real ~/.warelay/sessions.json.
This overwrote production session data with test values, causing session
fragmentation issues.

Changed to use a temp directory like other tests.
2025-12-02 05:54:31 +00:00
Peter Steinberger c5ab442f46 Fix empty result JSON dump and missing heartbeat prefix
Bug fixes:
- Empty result field handling: Changed truthy check to explicit type
  check (`typeof parsed?.text === "string"`) in command-reply.ts.
  Previously, Claude CLI returning `result: ""` would cause raw JSON
  to be sent to WhatsApp.
- Response prefix on heartbeat: Apply `responsePrefix` to heartbeat
  alert messages in runReplyHeartbeat, matching behavior of regular
  message handler.
2025-12-02 04:29:17 +00:00
Peter Steinberger c5677df56e Increase watchdog timeout to 30 minutes
Changed from 10 to 30 minutes to avoid false positives when
heartbeatMinutes is set to 10. The watchdog should be significantly
longer than the heartbeat interval to account for:
- Network latency
- Slow command responses
- Brief connection hiccups

With heartbeatMinutes=10, a 30-minute watchdog gives 3x buffer before
triggering auto-restart.
2025-11-30 18:03:19 +00:00
Peter Steinberger 21ba0fb8a4 Fix test isolation to prevent loading real user config
Tests were picking up real ~/.warelay/warelay.json with emojis and
prefixes (like "🦞"), causing test assertions to fail. Added proper
config mocks to all test files.

Changes:
- Mock loadConfig() in index.core.test.ts, inbound.media.test.ts,
  monitor-inbox.test.ts
- Update test-helpers.ts default mock to disable all prefixes
- Tests now use clean config: no messagePrefix, no responsePrefix,
  no timestamp, allowFrom=["*"]

This ensures tests validate core behavior without user-specific config.
The responsePrefix feature itself is already fully config-driven - this
only fixes test isolation.
2025-11-30 18:00:57 +00:00
Peter Steinberger 69319a0569 Add auto-recovery from stuck WhatsApp sessions
Fixes issue where unauthorized messages from +212652169245 (5elements spa)
triggered Bad MAC errors and silently killed the event emitter, preventing
all future message processing.

Changes:
1. Early allowFrom filtering in inbound.ts - blocks unauthorized senders
   before they trigger encryption errors
2. Message timeout watchdog - auto-restarts connection if no messages
   received for 10 minutes
3. Health monitoring in heartbeat - warns if >30 min without messages
4. Mock loadConfig in tests to handle new dependency

Root cause: Event emitter stopped firing after Bad MAC errors from
decryption attempts on messages from unauthorized senders. Connection
stayed alive but all subsequent messages.upsert events silently failed.
2025-11-30 17:53:32 +00:00
Peter Steinberger 37d8e55991 Skip responsePrefix for HEARTBEAT_OK responses
Preserve exact match so warelay recognizes heartbeat responses
and doesn't send them as messages.
2025-11-29 06:02:21 +00:00
Peter Steinberger 8d20edb028 Simplify timestampPrefix: bool or timezone string, default true
- timestampPrefix: true (UTC), false (off), or 'America/New_York'
- Removed separate timestampTimezone option
- Default is now enabled (true/UTC) unless explicitly false
2025-11-29 05:29:29 +00:00
Peter Steinberger 7564c4e7f4 Generalize prefix config: messagePrefix + responsePrefix
Replaces samePhoneMarker/samePhoneResponsePrefix with:
- messagePrefix: prefix for all inbound messages
  - Default: '[warelay]' if no allowFrom, else ''
- responsePrefix: prefix for all outbound replies

Also adds timestamp options:
- timestampPrefix: boolean to enable [Nov 29 06:30] format
- timestampTimezone: IANA timezone (default UTC)

Updated README with new config table entries.
2025-11-29 05:27:58 +00:00
Peter Steinberger 26e02a9b8b Add timestampPrefix config for datetime awareness
New config options:
- timestampPrefix: boolean - prepend timestamp to messages
- timestampTimezone: string - IANA timezone (default: UTC)

Format: [Nov 29 06:30] - compact but informative
Helps AI assistants stay aware of current date/time.
2025-11-29 05:25:53 +00:00
Peter Steinberger 25ec133574 Add samePhoneResponsePrefix config option
Automatically prefixes responses with a configurable string when in
same-phone mode. This helps distinguish bot replies from user messages
in the same chat bubble.

Example config:
  "samePhoneResponsePrefix": "🦞"

Will prefix all same-phone replies with the lobster emoji.
2025-11-29 05:24:01 +00:00
Peter Steinberger d88ede92b9 feat: same-phone mode with echo detection and configurable marker
Adds full support for self-messaging setups where you chat with yourself
and an AI assistant replies in the same WhatsApp bubble.

Changes:
- Same-phone mode (from === to) always allowed, bypasses allowFrom check
- Echo detection via bounded Set (max 100) prevents infinite loops
- Configurable samePhoneMarker in config (default: "[same-phone]")
- Messages prefixed with marker so assistants know the context
- fromMe filter removed from inbound.ts (echo detection in auto-reply)
- Verbose logging for same-phone detection and echo skips

Tests:
- Same-phone allowed without/despite allowFrom configuration
- Body prefixed only when from === to
- Non-same-phone rejected when not in allowFrom
2025-11-29 04:52:21 +00:00
30 changed files with 2255 additions and 265 deletions
+16
View File
@@ -36,3 +36,19 @@
## Agent-Specific Notes
- If the relay is running in tmux (`warelay-relay`), restart it after code changes: kill pane/session and run `pnpm warelay relay --verbose` inside tmux. Check tmux before editing; keep the watcher healthy if you start it.
- Also read the shared guardrails at `~/Projects/oracle/AGENTS.md` and `~/Projects/agent-scripts/AGENTS.MD` before making changes; align with any cross-repo rules noted there.
## Exclamation Mark Escaping Workaround
The Claude Code Bash tool escapes `!` to `\!` in command arguments. When using `warelay send` with messages containing exclamation marks, use heredoc syntax:
```bash
# WRONG - will send "Hello\!" with backslash
warelay send --provider web --to "+1234" --message 'Hello!'
# CORRECT - use heredoc to avoid escaping
warelay send --provider web --to "+1234" --message "$(cat <<'EOF'
Hello!
EOF
)"
```
This is a Claude Code quirk, not a warelay bug.
+30
View File
@@ -1,10 +1,40 @@
# Changelog
## 1.3.0 — Unreleased
### Highlights
- **Pluggable agents (Claude, Pi, Codex, Opencode):** New `inbound.reply.agent` block chooses the CLI and parser per command reply; per-agent argv builders inject the right flags/identity/prompt handling and parse NDJSON streams, enabling Pi/Codex swaps without changing templates.
### Bug Fixes
- **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`.
- **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`.
- **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses.
- **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files.
- **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently.
### Changes
- **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise.
- **Batched inbound messaging with timestamps:** When multiple WhatsApp messages queue up, theyre sent to the agent in one combined batch, each line timestamped consistently to preserve ordering and context.
- **Typing indicator after IPC send:** After sending a message via IPC (e.g., `warelay send`), the relay now automatically shows the typing indicator ("composing") to signal that more messages may be coming.
- **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives.
- **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter.
- **Test isolation improvements:** Mock `loadConfig()` in all test files to prevent loading real user config (with emojis/prefixes) during tests. Default test config now has no prefixes/timestamps for cleaner assertions.
- **Same-phone mode (self-messaging):** warelay now supports running on the same phone number you message from. This enables setups where you chat with yourself to control an AI assistant. Same-phone mode (`from === to`) is always allowed, even without configuring `allowFrom`. Echo detection prevents infinite loops by tracking recently sent message text and skipping auto-replies when incoming messages match.
- **Echo detection:** The `fromMe` filter in `inbound.ts` is deliberately removed for same-phone setups; instead, text-based echo detection in `auto-reply.ts` tracks sent messages in a bounded Set (max 100 entries) and skips processing when a match is found.
- **Same-phone detection logging:** Verbose mode now logs `📱 Same-phone mode detected` when `from === to`.
- **Configurable same-phone marker:** New `inbound.samePhoneMarker` config option to customize the prefix added to messages in same-phone mode (default: `[same-phone]`). Set it to something cute like `[🦞 same-phone]` to help distinguish bot replies.
## 1.2.2 — 2025-11-28
### Changes
- **Manual heartbeat sends:** `warelay heartbeat` accepts `--message/--body` with `--provider web|twilio` to push real outbound messages through the same plumbing; `--dry-run` previews payloads without sending.
## Unreleased
### Changes
- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests.
- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs.
## 1.2.1 — 2025-11-28
### Changes
Symlink
+1
View File
@@ -0,0 +1 @@
AGENTS.md
+13
View File
@@ -93,6 +93,16 @@ Install from npm (global): `npm install -g warelay` (Node 22+). Then choose **on
Best practice: use a dedicated WhatsApp account (separate SIM/eSIM or business account) for automation instead of your primary personal account to avoid unexpected logouts or rate limits.
### Same-phone mode (self-messaging)
warelay supports running on the same phone number you message from—you chat with yourself and an AI assistant replies in the same bubble. This requires:
- Adding your own number to `allowFrom` in `warelay.json`
- The `fromMe` filter is disabled; echo detection in `auto-reply.ts` prevents loops
**Gotchas:**
- Messages appear in the same chat bubble (WhatsApp "Note to self")
- Echo detection relies on exact text matching; if the reply is identical to your input, it may be skipped
- Works best with a dedicated WhatsApp account
## Configuration
### Environment (.env)
@@ -157,6 +167,9 @@ Best practice: use a dedicated WhatsApp account (separate SIM/eSIM or business a
| Key | Type & default | Notes |
| --- | --- | --- |
| `inbound.allowFrom` | `string[]` (default: empty) | E.164 numbers allowed to trigger auto-reply (no `whatsapp:`); `"*"` allows any sender. |
| `inbound.messagePrefix` | `string` (default: `"[warelay]"` if no allowFrom, else `""`) | Prefix added to all inbound messages before passing to command. |
| `inbound.responsePrefix` | `string` (default: —) | Prefix auto-added to all outbound replies (e.g., `"🦞"`). |
| `inbound.timestampPrefix` | `boolean \| string` (default: `true`) | Timestamp prefix: `true` (UTC), `false` (disabled), or IANA timezone like `"Europe/Vienna"`. |
| `inbound.reply.mode` | `"text"` \| `"command"` (default: —) | Reply style. |
| `inbound.reply.text` | `string` (default: —) | Used when `mode=text`; templating supported. |
| `inbound.reply.command` | `string[]` (default: —) | Argv for `mode=command`; each element templated. Stdout (trimmed) is sent. |
Regular → Executable
View File
+77
View File
@@ -0,0 +1,77 @@
# Agent Abstraction Refactor Plan
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
## Overview
- Introduce a pluggable agent layer (`src/agents/*`), selected by config.
- Normalize config (`agent` block) and remove `claudeOutputFormat` legacy knobs.
- Provide per-agent argv builders and output parsers (including NDJSON streams).
- Preserve MEDIA-token handling and shared queue/heartbeat behavior.
## Configuration
- New shape (no backward compat):
```json5
inbound: {
reply: {
mode: "command",
agent: {
kind: "claude" | "opencode" | "pi" | "codex",
format?: "text" | "json",
identityPrefix?: string
},
command: ["claude", "{{Body}}"],
cwd?: string,
session?: { ... },
timeoutSeconds?: number,
bodyPrefix?: string,
mediaUrl?: string,
mediaMaxMb?: number,
typingIntervalSeconds?: number,
heartbeatMinutes?: number
}
}
```
- Validation moves to `config.ts` (new `AgentKind`/`AgentConfig` types).
- If `agent` is missing → config error.
## Agent modules
- `src/agents/types.ts` `AgentKind`, `AgentSpec`:
- `buildArgs(argv: string[], body: string, ctx: { sessionId?, isNewSession?, sendSystemOnce?, systemSent?, identityPrefix? }): string[]`
- `parse(stdout: string): { text?: string; mediaUrls?: string[]; meta?: AgentMeta }`
- `src/agents/claude.ts` current flag injection (`--output-format`, `-p`), identity prepend.
- `src/agents/opencode.ts` reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix.
- `src/agents/pi.ts` parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags.
- `src/agents/codex.ts` parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only.
- Shared MEDIA extraction stays in `media/parse.ts`.
## Command runner changes
- `runCommandReply`:
- Resolve agent spec from config.
- Apply `buildArgs` (handles identity prepend and session args per agent).
- Run command; send stdout to `spec.parse` → `text`, `mediaUrls`, `meta` (stored as `agentMeta`).
- Remove `claudeMeta` naming; tests updated to `agentMeta`.
## Sessions
- Session arg defaults become agent-specific (Claude: `--resume/--session-id`; Opencode/Pi/Codex: `--session`).
- Still overridable via `sessionArgNew/sessionArgResume` in config.
## Tests
- Update existing tests to new config (no `claudeOutputFormat`).
- Add fixtures:
- Opencode NDJSON sample (from PR #5) → parsed text + meta.
- Codex NDJSON sample (captured: thread/turn/item/usage) → parsed text.
- Pi NDJSON sample (AssistantMessageEvent) → parsed text.
- Ensure MEDIA token parsing works on agent text output.
## Docs
- README: rename “Claude-aware” → “Multi-agent (Claude, Codex, Pi, Opencode)”.
- New short guide per agent (Opencode doc from PR #5; add Codex/Pi snippets).
- Mention identityPrefix override and session arg differences.
## Migration
- Breaking change: configs must specify `agent`. Remove old `claudeOutputFormat` keys.
- Provide migration note in CHANGELOG 1.3.x.
## Out of scope
- No media binary support; still relies on MEDIA tokens in text.
- No UI changes; WhatsApp/Twilio plumbing unchanged.
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "warelay",
"version": "1.2.2",
"version": "1.3.0",
"description": "WhatsApp relay CLI (send, monitor, webhook, auto-reply) using Twilio",
"type": "module",
"main": "dist/index.js",
+118
View File
@@ -0,0 +1,118 @@
import { describe, expect, it } from "vitest";
import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js";
import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js";
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
describe("agent buildArgs + parseOutput helpers", () => {
it("claudeSpec injects flags and identity once", () => {
const argv = ["claude", "hi"];
const built = claudeSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built).toContain("--output-format");
expect(built).toContain("json");
expect(built).toContain("-p");
expect(built.at(-1)).toContain(CLAUDE_IDENTITY_PREFIX);
const builtNoIdentity = claudeSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: false,
sessionId: "sess",
sendSystemOnce: true,
systemSent: true,
identityPrefix: undefined,
format: "json",
});
expect(builtNoIdentity.at(-1)).not.toContain(CLAUDE_IDENTITY_PREFIX);
});
it("opencodeSpec adds format flag and identity prefix when needed", () => {
const argv = ["opencode", "body"];
const built = opencodeSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built).toContain("--format");
expect(built).toContain("json");
expect(built.at(-1)).toContain(OPENCODE_IDENTITY_PREFIX);
});
it("piSpec parses final assistant message and preserves usage meta", () => {
const stdout = [
'{"type":"message_start","message":{"role":"assistant"}}',
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}',
].join("\n");
const parsed = piSpec.parseOutput(stdout);
expect(parsed.text).toBe("hello world");
expect(parsed.meta?.provider).toBe("inflection");
expect((parsed.meta?.usage as { output?: number })?.output).toBe(5);
});
it("codexSpec parses agent_message and aggregates usage", () => {
const stdout = [
'{"type":"item.completed","item":{"type":"agent_message","text":"hi there"}}',
'{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}',
].join("\n");
const parsed = codexSpec.parseOutput(stdout);
expect(parsed.text).toBe("hi there");
const usage = parsed.meta?.usage as {
input?: number;
output?: number;
cacheRead?: number;
total?: number;
};
expect(usage?.input).toBe(50);
expect(usage?.output).toBe(10);
expect(usage?.cacheRead).toBe(5);
expect(usage?.total).toBe(65);
});
it("opencodeSpec parses streamed events and summarizes meta", () => {
const stdout = [
'{"type":"step_start","timestamp":0}',
'{"type":"text","part":{"text":"hi"}}',
'{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}',
].join("\n");
const parsed = opencodeSpec.parseOutput(stdout);
expect(parsed.text).toBe("hi");
expect(parsed.meta?.extra?.summary).toContain("duration=1200ms");
expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020");
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20");
});
it("codexSpec buildArgs enforces exec/json/sandbox defaults", () => {
const argv = ["codex", "hello world"];
const built = codexSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built[1]).toBe("exec");
expect(built).toContain("--json");
expect(built).toContain("--skip-git-repo-check");
expect(built).toContain("read-only");
});
});
+67
View File
@@ -0,0 +1,67 @@
import path from "node:path";
import {
CLAUDE_BIN,
CLAUDE_IDENTITY_PREFIX,
type ClaudeJsonParseResult,
parseClaudeJson,
summarizeClaudeMetadata,
} from "../auto-reply/claude.js";
import type { AgentMeta, AgentSpec } from "./types.js";
function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined {
if (!parsed?.parsed) return undefined;
const summary = summarizeClaudeMetadata(parsed.parsed);
return summary ? { extra: { summary } } : undefined;
}
export const claudeSpec: AgentSpec = {
kind: "claude",
isInvocation: (argv) =>
argv.length > 0 && path.basename(argv[0]) === CLAUDE_BIN,
buildArgs: (ctx) => {
// Split around the body so we can inject flags without losing the body
// position. This keeps templated prompts intact even when we add flags.
const argv = [...ctx.argv];
const body = argv[ctx.bodyIndex] ?? "";
const beforeBody = argv.slice(0, ctx.bodyIndex);
const afterBody = argv.slice(ctx.bodyIndex + 1);
const wantsOutputFormat = typeof ctx.format === "string";
if (wantsOutputFormat) {
const hasOutputFormat = argv.some(
(part) =>
part === "--output-format" || part.startsWith("--output-format="),
);
if (!hasOutputFormat) {
const outputFormat = ctx.format ?? "json";
beforeBody.push("--output-format", outputFormat);
}
}
const hasPrintFlag = argv.some(
(part) => part === "-p" || part === "--print",
);
if (!hasPrintFlag) {
beforeBody.push("-p");
}
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
const bodyWithIdentity =
shouldPrependIdentity && body
? [ctx.identityPrefix ?? CLAUDE_IDENTITY_PREFIX, body]
.filter(Boolean)
.join("\n\n")
: body;
return [...beforeBody, bodyWithIdentity, ...afterBody];
},
parseOutput: (rawStdout) => {
const parsed = parseClaudeJson(rawStdout);
const text = parsed?.text ?? rawStdout.trim();
return {
text: text?.trim(),
meta: toMeta(parsed),
};
},
};
+79
View File
@@ -0,0 +1,79 @@
import path from "node:path";
import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
function parseCodexJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let text: string | undefined;
let meta: AgentMeta | undefined;
for (const line of lines) {
try {
const ev = JSON.parse(line) as {
type?: string;
item?: { type?: string; text?: string };
usage?: unknown;
};
// Codex streams multiple events; capture the last agent_message text and
// the final turn usage for cost/telemetry.
if (
ev.type === "item.completed" &&
ev.item?.type === "agent_message" &&
typeof ev.item.text === "string"
) {
text = ev.item.text;
}
if (
ev.type === "turn.completed" &&
ev.usage &&
typeof ev.usage === "object"
) {
const u = ev.usage as {
input_tokens?: number;
cached_input_tokens?: number;
output_tokens?: number;
};
meta = {
usage: {
input: u.input_tokens,
output: u.output_tokens,
cacheRead: u.cached_input_tokens,
total:
(u.input_tokens ?? 0) +
(u.output_tokens ?? 0) +
(u.cached_input_tokens ?? 0),
},
};
}
} catch {
// ignore
}
}
return { text: text?.trim(), meta };
}
export const codexSpec: AgentSpec = {
kind: "codex",
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "codex",
buildArgs: (ctx) => {
const argv = [...ctx.argv];
const hasExec = argv.length > 0 && argv[1] === "exec";
if (!hasExec) {
argv.splice(1, 0, "exec");
}
// Ensure JSON output
if (!argv.includes("--json")) {
argv.splice(argv.length - 1, 0, "--json");
}
// Safety defaults
if (!argv.includes("--skip-git-repo-check")) {
argv.splice(argv.length - 1, 0, "--skip-git-repo-check");
}
if (!argv.some((p) => p === "--sandbox" || p.startsWith("--sandbox="))) {
argv.splice(argv.length - 1, 0, "--sandbox", "read-only");
}
return argv;
},
parseOutput: parseCodexJson,
};
+18
View File
@@ -0,0 +1,18 @@
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
import type { AgentKind, AgentSpec } from "./types.js";
const specs: Record<AgentKind, AgentSpec> = {
claude: claudeSpec,
codex: codexSpec,
opencode: opencodeSpec,
pi: piSpec,
};
export function getAgentSpec(kind: AgentKind): AgentSpec {
return specs[kind];
}
export { AgentKind, AgentMeta, AgentParseResult } from "./types.js";
+62
View File
@@ -0,0 +1,62 @@
import path from "node:path";
import {
OPENCODE_BIN,
OPENCODE_IDENTITY_PREFIX,
parseOpencodeJson,
summarizeOpencodeMetadata,
} from "../auto-reply/opencode.js";
import type { AgentMeta, AgentSpec } from "./types.js";
function toMeta(
parsed: ReturnType<typeof parseOpencodeJson>,
): AgentMeta | undefined {
const summary = summarizeOpencodeMetadata(parsed.meta);
return summary ? { extra: { summary } } : undefined;
}
export const opencodeSpec: AgentSpec = {
kind: "opencode",
isInvocation: (argv) =>
argv.length > 0 && path.basename(argv[0]) === OPENCODE_BIN,
buildArgs: (ctx) => {
// Split around the body so we can insert flags without losing the prompt.
const argv = [...ctx.argv];
const body = argv[ctx.bodyIndex] ?? "";
const beforeBody = argv.slice(0, ctx.bodyIndex);
const afterBody = argv.slice(ctx.bodyIndex + 1);
const wantsJson = ctx.format === "json";
// Ensure format json for parsing
if (wantsJson) {
const hasFormat = [...beforeBody, body, ...afterBody].some(
(part) => part === "--format" || part.startsWith("--format="),
);
if (!hasFormat) {
beforeBody.push("--format", "json");
}
}
// Session args default to --session
// Identity prefix
// Opencode streams text tokens; we still seed an identity so the agent
// keeps context on first turn.
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
const bodyWithIdentity =
shouldPrependIdentity && body
? [ctx.identityPrefix ?? OPENCODE_IDENTITY_PREFIX, body]
.filter(Boolean)
.join("\n\n")
: body;
return [...beforeBody, bodyWithIdentity, ...afterBody];
},
parseOutput: (rawStdout) => {
const parsed = parseOpencodeJson(rawStdout);
const text = parsed.text ?? rawStdout.trim();
return {
text: text?.trim(),
meta: toMeta(parsed),
};
},
};
+75
View File
@@ -0,0 +1,75 @@
import path from "node:path";
import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
type PiAssistantMessage = {
role?: string;
content?: Array<{ type?: string; text?: string }>;
usage?: { input?: number; output?: number };
model?: string;
provider?: string;
stopReason?: string;
};
function parsePiJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let lastMessage: PiAssistantMessage | undefined;
for (const line of lines) {
try {
const ev = JSON.parse(line) as {
type?: string;
message?: PiAssistantMessage;
};
// Pi emits a stream; we only care about the terminal assistant message_end.
if (ev.type === "message_end" && ev.message?.role === "assistant") {
lastMessage = ev.message;
}
} catch {
// ignore
}
}
const text =
lastMessage?.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
?.trim() ?? undefined;
const meta: AgentMeta | undefined = lastMessage
? {
model: lastMessage.model,
provider: lastMessage.provider,
stopReason: lastMessage.stopReason,
usage: lastMessage.usage,
}
: undefined;
return { text, meta };
}
export const piSpec: AgentSpec = {
kind: "pi",
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi",
buildArgs: (ctx) => {
const argv = [...ctx.argv];
// Non-interactive print + JSON
if (!argv.includes("-p") && !argv.includes("--print")) {
argv.splice(argv.length - 1, 0, "-p");
}
if (
ctx.format === "json" &&
!argv.includes("--mode") &&
!argv.some((a) => a === "--mode")
) {
argv.splice(argv.length - 1, 0, "--mode", "json");
}
// Session defaults
// Identity prefix optional; Pi usually doesn't need it, but allow injection
if (!(ctx.sendSystemOnce && ctx.systemSent) && argv[ctx.bodyIndex]) {
const existingBody = argv[ctx.bodyIndex];
argv[ctx.bodyIndex] = [ctx.identityPrefix, existingBody]
.filter(Boolean)
.join("\n\n");
}
return argv;
},
parseOutput: parsePiJson,
};
+41
View File
@@ -0,0 +1,41 @@
export type AgentKind = "claude" | "opencode" | "pi" | "codex";
export type AgentMeta = {
model?: string;
provider?: string;
stopReason?: string;
usage?: {
input?: number;
output?: number;
cacheRead?: number;
cacheWrite?: number;
total?: number;
};
extra?: Record<string, unknown>;
};
export type AgentParseResult = {
text?: string;
mediaUrls?: string[];
meta?: AgentMeta;
};
export type BuildArgsContext = {
argv: string[];
bodyIndex: number; // index of prompt/body argument in argv
isNewSession: boolean;
sessionId?: string;
sendSystemOnce: boolean;
systemSent: boolean;
identityPrefix?: string;
format?: "text" | "json";
sessionArgNew?: string[];
sessionArgResume?: string[];
};
export interface AgentSpec {
kind: AgentKind;
isInvocation: (argv: string[]) => boolean;
buildArgs: (ctx: BuildArgsContext) => string[];
parseOutput: (rawStdout: string) => AgentParseResult;
}
+3
View File
@@ -160,3 +160,6 @@ export function parseClaudeJsonText(raw: string): string | undefined {
const parsed = parseClaudeJson(raw);
return parsed?.text;
}
// Re-export from command-reply for backwards compatibility
export { summarizeClaudeMetadata } from "./command-reply.js";
+105 -11
View File
@@ -70,7 +70,7 @@ describe("runCommandReply", () => {
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
@@ -98,7 +98,7 @@ describe("runCommandReply", () => {
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: true,
@@ -121,7 +121,7 @@ describe("runCommandReply", () => {
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: true,
@@ -144,7 +144,7 @@ describe("runCommandReply", () => {
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: true,
@@ -167,6 +167,7 @@ describe("runCommandReply", () => {
reply: {
mode: "command",
command: ["cli", "{{Body}}"],
agent: { kind: "claude" },
session: {
sessionArgNew: ["--new", "{{SessionId}}"],
sessionArgResume: ["--resume", "{{SessionId}}"],
@@ -192,7 +193,11 @@ describe("runCommandReply", () => {
throw { stdout: "partial output here", killed: true, signal: "SIGKILL" };
});
const { payload, meta } = await runCommandReply({
reply: { mode: "command", command: ["echo", "hi"] },
reply: {
mode: "command",
command: ["echo", "hi"],
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
@@ -213,7 +218,12 @@ describe("runCommandReply", () => {
throw { stdout: "", killed: true, signal: "SIGKILL" };
});
const { payload } = await runCommandReply({
reply: { mode: "command", command: ["echo", "hi"], cwd: "/tmp/work" },
reply: {
mode: "command",
command: ["echo", "hi"],
cwd: "/tmp/work",
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
@@ -235,7 +245,12 @@ describe("runCommandReply", () => {
stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`,
});
const { payload } = await runCommandReply({
reply: { mode: "command", command: ["echo", "hi"], mediaMaxMb: 1 },
reply: {
mode: "command",
command: ["echo", "hi"],
mediaMaxMb: 1,
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
@@ -259,7 +274,7 @@ describe("runCommandReply", () => {
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
@@ -271,14 +286,18 @@ describe("runCommandReply", () => {
commandRunner: runner,
enqueue: enqueueImmediate,
});
expect(meta.claudeMeta).toContain("duration=50ms");
expect(meta.claudeMeta).toContain("tool_calls=1");
expect(meta.agentMeta?.extra?.summary).toContain("duration=50ms");
expect(meta.agentMeta?.extra?.summary).toContain("tool_calls=1");
});
it("captures queue wait metrics in meta", async () => {
const runner = makeRunner({ stdout: "ok" });
const { meta } = await runCommandReply({
reply: { mode: "command", command: ["echo", "{{Body}}"] },
reply: {
mode: "command",
command: ["echo", "{{Body}}"],
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
@@ -292,4 +311,79 @@ describe("runCommandReply", () => {
expect(meta.queuedMs).toBe(25);
expect(meta.queuedAhead).toBe(2);
});
it("handles empty result string without dumping raw JSON", async () => {
// Bug fix: Claude CLI returning {"result": ""} should not send raw JSON to WhatsApp
// The fix changed from truthy check to explicit typeof check
const runner = makeRunner({
stdout: '{"result":"","duration_ms":50,"total_cost_usd":0.001}',
});
const { payload } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: runner,
enqueue: enqueueImmediate,
});
// Should NOT contain raw JSON - empty result should produce fallback message
expect(payload?.text).not.toContain('{"result"');
expect(payload?.text).toContain("command produced no output");
});
it("handles empty text string in Claude JSON", async () => {
const runner = makeRunner({
stdout: '{"text":"","duration_ms":50}',
});
const { payload } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: runner,
enqueue: enqueueImmediate,
});
// Empty text should produce fallback message, not raw JSON
expect(payload?.text).not.toContain('{"text"');
expect(payload?.text).toContain("command produced no output");
});
it("returns actual text when result is non-empty", async () => {
const runner = makeRunner({
stdout: '{"result":"hello world","duration_ms":50}',
});
const { payload } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
agent: { kind: "claude", format: "json" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: runner,
enqueue: enqueueImmediate,
});
expect(payload?.text).toBe("hello world");
});
});
+58 -83
View File
@@ -1,18 +1,14 @@
import fs from "node:fs/promises";
import path from "node:path";
import { type AgentKind, getAgentSpec } from "../agents/index.js";
import type { AgentMeta } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js";
import { logError } from "../logger.js";
import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js";
import {
CLAUDE_BIN,
CLAUDE_IDENTITY_PREFIX,
type ClaudeJsonParseResult,
parseClaudeJson,
} from "./claude.js";
import { applyTemplate, type TemplateContext } from "./templating.js";
import type { ReplyPayload } from "./types.js";
@@ -42,7 +38,7 @@ export type CommandReplyMeta = {
exitCode?: number | null;
signal?: string | null;
killed?: boolean;
claudeMeta?: string;
agentMeta?: AgentMeta;
};
export type CommandReplyResult = {
@@ -119,6 +115,9 @@ export async function runCommandReply(
if (!reply.command?.length) {
throw new Error("reply.command is required for mode=command");
}
const agentCfg = reply.agent ?? { kind: "claude" };
const agentKind: AgentKind = agentCfg.kind ?? "claude";
const agent = getAgentSpec(agentKind);
let argv = reply.command.map((part) => applyTemplate(part, templatingCtx));
const templatePrefix =
@@ -129,41 +128,24 @@ export async function runCommandReply(
argv = [argv[0], templatePrefix, ...argv.slice(1)];
}
// Ensure Claude commands can emit plain text by forcing --output-format when configured.
if (
reply.claudeOutputFormat &&
argv.length > 0 &&
path.basename(argv[0]) === CLAUDE_BIN
) {
const hasOutputFormat = argv.some(
(part) =>
part === "--output-format" || part.startsWith("--output-format="),
);
const insertBeforeBody = Math.max(argv.length - 1, 0);
if (!hasOutputFormat) {
argv = [
...argv.slice(0, insertBeforeBody),
"--output-format",
reply.claudeOutputFormat,
...argv.slice(insertBeforeBody),
];
}
const hasPrintFlag = argv.some(
(part) => part === "-p" || part === "--print",
);
if (!hasPrintFlag) {
const insertIdx = Math.max(argv.length - 1, 0);
argv = [...argv.slice(0, insertIdx), "-p", ...argv.slice(insertIdx)];
}
}
// Default body index is last arg
let bodyIndex = Math.max(argv.length - 1, 0);
// Inject session args if configured (use resume for existing, session-id for new)
// Session args prepared (templated) and injected generically
if (reply.session) {
const defaultNew =
agentCfg.kind === "claude"
? ["--session-id", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const defaultResume =
agentCfg.kind === "claude"
? ["--resume", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const sessionArgList = (
isNewSession
? (reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"])
: (reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"])
).map((part) => applyTemplate(part, templatingCtx));
? (reply.session.sessionArgNew ?? defaultNew)
: (reply.session.sessionArgResume ?? defaultResume)
).map((p) => applyTemplate(p, templatingCtx));
if (sessionArgList.length) {
const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true;
const insertAt =
@@ -173,22 +155,24 @@ export async function runCommandReply(
...sessionArgList,
...argv.slice(insertAt),
];
bodyIndex = Math.max(argv.length - 1, 0);
}
}
let finalArgv = argv;
const isClaudeInvocation =
finalArgv.length > 0 && path.basename(finalArgv[0]) === CLAUDE_BIN;
const shouldPrependIdentity =
isClaudeInvocation && !(sendSystemOnce && systemSent);
if (shouldPrependIdentity && finalArgv.length > 0) {
const bodyIdx = finalArgv.length - 1;
const existingBody = finalArgv[bodyIdx] ?? "";
finalArgv = [
...finalArgv.slice(0, bodyIdx),
[CLAUDE_IDENTITY_PREFIX, existingBody].filter(Boolean).join("\n\n"),
];
}
const shouldApplyAgent = agent.isInvocation(argv);
const finalArgv = shouldApplyAgent
? agent.buildArgs({
argv,
bodyIndex,
isNewSession,
sessionId: templatingCtx.SessionId,
sendSystemOnce,
systemSent,
identityPrefix: agentCfg.identityPrefix,
format: agentCfg.format,
})
: argv;
logVerbose(
`Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`,
);
@@ -217,28 +201,14 @@ export async function runCommandReply(
if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
}
let parsed: ClaudeJsonParseResult | undefined;
if (
trimmed &&
(reply.claudeOutputFormat === "json" || isClaudeInvocation)
) {
parsed = parseClaudeJson(trimmed);
if (parsed?.parsed && isVerbose()) {
const summary = summarizeClaudeMetadata(parsed.parsed);
if (summary) logVerbose(`Claude JSON meta: ${summary}`);
logVerbose(
`Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`,
);
}
if (parsed?.text) {
logVerbose(
`Claude JSON parsed -> ${parsed.text.slice(0, 120)}${parsed.text.length > 120 ? "…" : ""}`,
);
trimmed = parsed.text.trim();
} else {
logVerbose("Claude JSON parse failed; returning raw stdout");
}
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
// Treat empty string as "no content" so we can fall back to the friendly
// "(command produced no output)" message instead of echoing raw JSON.
if (parsed && parsed.text !== undefined) {
trimmed = parsed.text.trim();
}
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(trimmed);
trimmed = cleanedText;
@@ -249,7 +219,7 @@ export async function runCommandReply(
logVerbose("No MEDIA token extracted from final text");
}
if (!trimmed && !mediaFromCommand) {
const meta = parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined;
const meta = parsed?.meta?.extra?.summary ?? undefined;
trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`;
logVerbose("No text/media produced; injecting fallback notice to user");
}
@@ -259,8 +229,13 @@ export async function runCommandReply(
console.error(
`Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`,
);
// Include any partial output or stderr in error message
const partialOut = trimmed
? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}`
: "";
const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`;
return {
payload: undefined,
payload: { text: errorText },
meta: {
durationMs: Date.now() - started,
queuedMs,
@@ -268,9 +243,7 @@ export async function runCommandReply(
exitCode: code,
signal,
killed,
claudeMeta: parsed
? summarizeClaudeMetadata(parsed.parsed)
: undefined,
agentMeta: parsed?.meta,
},
};
}
@@ -278,8 +251,9 @@ export async function runCommandReply(
console.error(
`Command auto-reply process killed before completion (exit code ${code ?? "unknown"})`,
);
const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`;
return {
payload: undefined,
payload: { text: errorText },
meta: {
durationMs: Date.now() - started,
queuedMs,
@@ -287,9 +261,7 @@ export async function runCommandReply(
exitCode: code,
signal,
killed,
claudeMeta: parsed
? summarizeClaudeMetadata(parsed.parsed)
: undefined,
agentMeta: parsed?.meta,
},
};
}
@@ -337,7 +309,7 @@ export async function runCommandReply(
exitCode: code,
signal,
killed,
claudeMeta: parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined,
agentMeta: parsed?.meta,
};
if (isVerbose()) {
logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`);
@@ -379,8 +351,11 @@ export async function runCommandReply(
};
}
logError(`Command auto-reply failed after ${elapsed}ms: ${String(err)}`);
// Send error message to user so they know the command failed
const errMsg = err instanceof Error ? err.message : String(err);
const errorText = `⚠️ Command failed: ${errMsg}`;
return {
payload: undefined,
payload: { text: errorText },
meta: {
durationMs: elapsed,
queuedMs,
+104
View File
@@ -0,0 +1,104 @@
// Helpers specific to Opencode CLI output/argv handling.
// Preferred binary name for Opencode CLI invocations.
export const OPENCODE_BIN = "opencode";
export const OPENCODE_IDENTITY_PREFIX =
"You are Openclawd running on the user's Mac via warelay. Your scratchpad is /Users/steipete/openclawd; this is your folder and you can add what you like in markdown files and/or images. You don't need to be concise, but WhatsApp replies must stay under ~1500 characters. Media you can send: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK.";
export type OpencodeJsonParseResult = {
text?: string;
parsed: unknown[];
valid: boolean;
meta?: {
durationMs?: number;
cost?: number;
tokens?: {
input?: number;
output?: number;
};
};
};
export function parseOpencodeJson(raw: string): OpencodeJsonParseResult {
const lines = raw.split(/\n+/).filter((s) => s.trim());
const parsed: unknown[] = [];
let text = "";
let valid = false;
let startTime: number | undefined;
let endTime: number | undefined;
let cost = 0;
let inputTokens = 0;
let outputTokens = 0;
for (const line of lines) {
try {
const event = JSON.parse(line);
parsed.push(event);
if (event && typeof event === "object") {
// Opencode emits a stream of events.
if (event.type === "step_start") {
valid = true;
if (typeof event.timestamp === "number") {
if (startTime === undefined || event.timestamp < startTime) {
startTime = event.timestamp;
}
}
}
if (event.type === "text" && event.part?.text) {
text += event.part.text;
valid = true;
}
if (event.type === "step_finish") {
valid = true;
if (typeof event.timestamp === "number") {
endTime = event.timestamp;
}
if (event.part) {
if (typeof event.part.cost === "number") {
cost += event.part.cost;
}
if (event.part.tokens) {
inputTokens += event.part.tokens.input || 0;
outputTokens += event.part.tokens.output || 0;
}
}
}
}
} catch {
// ignore non-JSON lines
}
}
const meta: OpencodeJsonParseResult["meta"] = {};
if (startTime !== undefined && endTime !== undefined) {
meta.durationMs = endTime - startTime;
}
if (cost > 0) meta.cost = cost;
if (inputTokens > 0 || outputTokens > 0) {
meta.tokens = { input: inputTokens, output: outputTokens };
}
return {
text: text || undefined,
parsed,
valid: valid && parsed.length > 0,
meta: Object.keys(meta).length > 0 ? meta : undefined,
};
}
export function summarizeOpencodeMetadata(
meta: OpencodeJsonParseResult["meta"],
): string | undefined {
if (!meta) return undefined;
const parts: string[] = [];
if (meta.durationMs !== undefined)
parts.push(`duration=${meta.durationMs}ms`);
if (meta.cost !== undefined) parts.push(`cost=$${meta.cost.toFixed(4)}`);
if (meta.tokens) {
parts.push(`tokens=${meta.tokens.input}+${meta.tokens.output}`);
}
return parts.length ? parts.join(", ") : undefined;
}
+10 -4
View File
@@ -146,8 +146,14 @@ export async function getReplyFromConfig(
// Optional allowlist by origin number (E.164 without whatsapp: prefix)
const allowFrom = cfg.inbound?.allowFrom;
if (Array.isArray(allowFrom) && allowFrom.length > 0) {
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
const to = (ctx.To ?? "").replace(/^whatsapp:/, "");
const isSamePhone = from && to && from === to;
// Same-phone mode (self-messaging) is always allowed
if (isSamePhone) {
logVerbose(`Allowing same-phone mode: from === to (${from})`);
} else if (Array.isArray(allowFrom) && allowFrom.length > 0) {
// Support "*" as wildcard to allow all senders
if (!allowFrom.includes("*") && !allowFrom.includes(from)) {
logVerbose(
@@ -259,8 +265,8 @@ export async function getReplyFromConfig(
timeoutSeconds,
commandRunner,
});
if (meta.claudeMeta && isVerbose()) {
logVerbose(`Claude JSON meta: ${meta.claudeMeta}`);
if (meta.agentMeta && isVerbose()) {
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
}
return payload;
} finally {
+4
View File
@@ -4,6 +4,10 @@ import type { CliDeps } from "../cli/deps.js";
import type { RuntimeEnv } from "../runtime.js";
import { sendCommand } from "./send.js";
vi.mock("../web/ipc.js", () => ({
sendViaIpc: vi.fn().mockResolvedValue(null),
}));
const runtime: RuntimeEnv = {
log: vi.fn(),
error: vi.fn(),
+37 -1
View File
@@ -1,7 +1,8 @@
import type { CliDeps } from "../cli/deps.js";
import { info } from "../globals.js";
import { info, success } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import type { Provider } from "../utils.js";
import { sendViaIpc } from "../web/ipc.js";
export async function sendCommand(
opts: {
@@ -39,6 +40,40 @@ export async function sendCommand(
if (waitSeconds !== 0) {
runtime.log(info("Wait/poll are Twilio-only; ignored for provider=web."));
}
// Try to send via IPC to running relay first (avoids Signal session corruption)
const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media);
if (ipcResult) {
if (ipcResult.success) {
runtime.log(
success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`),
);
if (opts.json) {
runtime.log(
JSON.stringify(
{
provider: "web",
via: "ipc",
to: opts.to,
messageId: ipcResult.messageId,
mediaUrl: opts.media ?? null,
},
null,
2,
),
);
}
return;
}
// IPC failed but relay is running - warn and fall back
runtime.log(
info(
`IPC send failed (${ipcResult.error}), falling back to direct connection`,
),
);
}
// Fall back to direct connection (creates new Baileys socket)
const res = await deps
.sendMessageWeb(opts.to, opts.message, {
verbose: false,
@@ -53,6 +88,7 @@ export async function sendCommand(
JSON.stringify(
{
provider: "web",
via: "direct",
to: opts.to,
messageId: res.messageId,
mediaUrl: opts.media ?? null,
+34 -19
View File
@@ -5,8 +5,9 @@ import path from "node:path";
import JSON5 from "json5";
import { z } from "zod";
import type { AgentKind } from "../agents/index.js";
export type ReplyMode = "text" | "command";
export type ClaudeOutputFormat = "text" | "json" | "stream-json";
export type SessionScope = "per-sender" | "global";
export type SessionConfig = {
@@ -46,6 +47,9 @@ export type WarelayConfig = {
logging?: LoggingConfig;
inbound?: {
allowFrom?: string[]; // E.164 numbers allowed to trigger auto-reply (without whatsapp:)
messagePrefix?: string; // Prefix added to all inbound messages (default: "[warelay]" if no allowFrom, else "")
responsePrefix?: string; // Prefix auto-added to all outbound replies (e.g., "🦞")
timestampPrefix?: boolean | string; // true/false or IANA timezone string (default: true with UTC)
transcribeAudio?: {
// Optional CLI to turn inbound audio into text; templated args, must output transcript to stdout.
command: string[];
@@ -53,18 +57,22 @@ export type WarelayConfig = {
};
reply?: {
mode: ReplyMode;
text?: string; // for mode=text, can contain {{Body}}
command?: string[]; // for mode=command, argv with templates
cwd?: string; // working directory for command execution
template?: string; // prepend template string when building command/prompt
timeoutSeconds?: number; // optional command timeout; defaults to 600s
bodyPrefix?: string; // optional string prepended to Body before templating
mediaUrl?: string; // optional media attachment (path or URL)
text?: string;
command?: string[];
cwd?: string;
template?: string;
timeoutSeconds?: number;
bodyPrefix?: string;
mediaUrl?: string;
session?: SessionConfig;
claudeOutputFormat?: ClaudeOutputFormat; // when command starts with `claude`, force an output format
mediaMaxMb?: number; // optional cap for outbound media (default 5MB)
typingIntervalSeconds?: number; // how often to refresh typing indicator while command runs
heartbeatMinutes?: number; // auto-ping cadence for command mode
mediaMaxMb?: number;
typingIntervalSeconds?: number;
heartbeatMinutes?: number;
agent?: {
kind: AgentKind;
format?: "text" | "json";
identityPrefix?: string;
};
};
};
web?: WebConfig;
@@ -102,13 +110,17 @@ const ReplySchema = z
})
.optional(),
heartbeatMinutes: z.number().int().nonnegative().optional(),
claudeOutputFormat: z
.union([
z.literal("text"),
z.literal("json"),
z.literal("stream-json"),
z.undefined(),
])
agent: z
.object({
kind: z.union([
z.literal("claude"),
z.literal("opencode"),
z.literal("pi"),
z.literal("codex"),
]),
format: z.union([z.literal("text"), z.literal("json")]).optional(),
identityPrefix: z.string().optional(),
})
.optional(),
})
.refine(
@@ -139,6 +151,9 @@ const WarelaySchema = z.object({
inbound: z
.object({
allowFrom: z.array(z.string()).optional(),
messagePrefix: z.string().optional(),
responsePrefix: z.string().optional(),
timestampPrefix: z.union([z.boolean(), z.string()]).optional(),
transcribeAudio: z
.object({
command: z.array(z.string()),
+73 -3
View File
@@ -9,6 +9,18 @@ import { createMockTwilio } from "../test/mocks/twilio.js";
import * as exec from "./process/exec.js";
import { withWhatsAppPrefix } from "./utils.js";
// Mock config to avoid loading real user config
vi.mock("../src/config/config.js", () => ({
loadConfig: vi.fn().mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
}),
}));
// Twilio mock factory shared across tests
vi.mock("twilio", () => {
const { factory } = createMockTwilio();
@@ -93,6 +105,64 @@ describe("config and templating", () => {
expect(onReplyStart).toHaveBeenCalled();
});
it("getReplyFromConfig allows same-phone mode (from === to) without allowFrom", async () => {
const cfg = {
inbound: {
// No allowFrom configured
reply: {
mode: "text" as const,
text: "Echo: {{Body}}",
},
},
};
const result = await index.getReplyFromConfig(
{ Body: "hello", From: "+1555", To: "+1555" },
undefined,
cfg,
);
expect(result?.text).toBe("Echo: hello");
});
it("getReplyFromConfig allows same-phone mode even when not in allowFrom list", async () => {
const cfg = {
inbound: {
allowFrom: ["+9999"], // Different number
reply: {
mode: "text" as const,
text: "Reply: {{Body}}",
},
},
};
// Same-phone mode should bypass allowFrom check
const result = await index.getReplyFromConfig(
{ Body: "test", From: "+1555", To: "+1555" },
undefined,
cfg,
);
expect(result?.text).toBe("Reply: test");
});
it("getReplyFromConfig rejects non-same-phone when not in allowFrom", async () => {
const cfg = {
inbound: {
allowFrom: ["+9999"],
reply: {
mode: "text" as const,
text: "Should not see this",
},
},
};
const result = await index.getReplyFromConfig(
{ Body: "test", From: "+1555", To: "+2666" },
undefined,
cfg,
);
expect(result).toBeUndefined();
});
it("getReplyFromConfig templating includes media fields", async () => {
const cfg = {
inbound: {
@@ -692,7 +762,7 @@ describe("config and templating", () => {
reply: {
mode: "command" as const,
command: ["claude", "{{Body}}"],
claudeOutputFormat: "text" as const,
agent: { kind: "claude", format: "text" as const },
},
},
};
@@ -732,7 +802,7 @@ describe("config and templating", () => {
reply: {
mode: "command" as const,
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json" as const,
agent: { kind: "claude", format: "json" as const },
},
},
};
@@ -760,7 +830,7 @@ describe("config and templating", () => {
reply: {
mode: "command" as const,
command: ["claude", "{{Body}}"],
// No claudeOutputFormat set on purpose
agent: { kind: "claude" },
},
},
};
+387 -40
View File
@@ -1,3 +1,4 @@
import "./test-helpers.js";
import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
@@ -6,8 +7,8 @@ import sharp from "sharp";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { WarelayConfig } from "../config/config.js";
import { resolveStorePath } from "../config/sessions.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
import * as commandQueue from "../process/command-queue.js";
import {
HEARTBEAT_PROMPT,
HEARTBEAT_TOKEN,
@@ -24,6 +25,18 @@ import {
setLoadConfigMock,
} from "./test-helpers.js";
const makeSessionStore = async (
entries: Record<string, unknown> = {},
): Promise<{ storePath: string; cleanup: () => Promise<void> }> => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-session-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify(entries));
return {
storePath,
cleanup: () => fs.rm(dir, { recursive: true, force: true }),
};
};
describe("heartbeat helpers", () => {
it("strips heartbeat token and skips when only token", () => {
expect(stripHeartbeatToken(undefined)).toEqual({
@@ -78,19 +91,9 @@ describe("heartbeat helpers", () => {
});
describe("resolveHeartbeatRecipients", () => {
const makeStore = async (entries: Record<string, { updatedAt: number }>) => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify(entries));
return {
storePath,
cleanup: async () => fs.rm(dir, { recursive: true, force: true }),
};
};
it("returns the sole session recipient", async () => {
const now = Date.now();
const store = await makeStore({ "+1000": { updatedAt: now } });
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["+1999"],
@@ -105,7 +108,7 @@ describe("resolveHeartbeatRecipients", () => {
it("surfaces ambiguity when multiple sessions exist", async () => {
const now = Date.now();
const store = await makeStore({
const store = await makeSessionStore({
"+1000": { updatedAt: now },
"+2000": { updatedAt: now - 10 },
});
@@ -122,7 +125,7 @@ describe("resolveHeartbeatRecipients", () => {
});
it("filters wildcard allowFrom when no sessions exist", async () => {
const store = await makeStore({});
const store = await makeSessionStore({});
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["*"],
@@ -137,7 +140,7 @@ describe("resolveHeartbeatRecipients", () => {
it("merges sessions and allowFrom when --all is set", async () => {
const now = Date.now();
const store = await makeStore({ "+1000": { updatedAt: now } });
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["+1999"],
@@ -153,12 +156,16 @@ describe("resolveHeartbeatRecipients", () => {
describe("runWebHeartbeatOnce", () => {
it("skips when heartbeat token returned", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@@ -166,54 +173,58 @@ describe("runWebHeartbeatOnce", () => {
});
expect(resolver).toHaveBeenCalled();
expect(sender).not.toHaveBeenCalled();
await store.cleanup();
});
it("sends when alert text present", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false });
await store.cleanup();
});
it("falls back to most recent session when no to is provided", async () => {
const store = await makeSessionStore();
const storePath = store.storePath;
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
// Seed session store
const now = Date.now();
const store = {
const sessionEntries = {
"+1222": { sessionId: "s1", updatedAt: now - 1000 },
"+1333": { sessionId: "s2", updatedAt: now },
};
const storePath = resolveStorePath();
await fs.mkdir(resolveStorePath().replace("sessions.json", ""), {
recursive: true,
});
await fs.writeFile(storePath, JSON.stringify(store));
setLoadConfigMock({
inbound: {
allowFrom: ["+1999"],
reply: { mode: "command", session: {} },
},
});
await fs.writeFile(storePath, JSON.stringify(sessionEntries));
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1999"],
reply: { mode: "command", session: { store: storePath } },
},
},
to: "+1999",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false });
await store.cleanup();
});
it("does not refresh updatedAt when heartbeat is skipped", async () => {
@@ -353,14 +364,18 @@ describe("runWebHeartbeatOnce", () => {
});
it("sends overrideBody directly and skips resolver", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn();
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@@ -371,15 +386,20 @@ describe("runWebHeartbeatOnce", () => {
verbose: false,
});
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
it("dry-run overrideBody prints and skips send", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn();
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@@ -389,6 +409,7 @@ describe("runWebHeartbeatOnce", () => {
});
expect(sender).not.toHaveBeenCalled();
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
});
@@ -504,6 +525,123 @@ describe("web auto-reply", () => {
);
});
it("skips reply heartbeat when requests are running", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "warelay-heartbeat-queue-"),
);
const storePath = path.join(tmpDir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify({}));
const queueSpy = vi.spyOn(commandQueue, "getQueueSize").mockReturnValue(2);
const replyResolver = vi.fn();
const listenerFactory = vi.fn(async () => {
const onClose = new Promise<void>(() => {
// stay open until aborted
});
return { close: vi.fn(), onClose };
});
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: storePath } },
},
}));
const controller = new AbortController();
const run = monitorWebProvider(
false,
listenerFactory,
true,
replyResolver,
runtime,
controller.signal,
{ replyHeartbeatMinutes: 1, replyHeartbeatNow: true },
);
try {
await Promise.resolve();
controller.abort();
await run;
expect(replyResolver).not.toHaveBeenCalled();
} finally {
queueSpy.mockRestore();
}
});
it("batches inbound messages while queue is busy and preserves timestamps", async () => {
vi.useFakeTimers();
const originalMax = process.getMaxListeners();
process.setMaxListeners?.(1); // force low to confirm bump
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const resolver = vi.fn().mockResolvedValue({ text: "batched" });
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
// Queue starts busy, then frees after one polling tick.
let queueBusy = true;
const queueSpy = vi
.spyOn(commandQueue, "getQueueSize")
.mockImplementation(() => (queueBusy ? 1 : 0));
setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC" } }));
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
// Two messages from the same sender with fixed timestamps
await capturedOnMessage?.({
body: "first",
from: "+1",
to: "+2",
id: "m1",
timestamp: 1735689600000, // Jan 1 2025 00:00:00 UTC
sendComposing,
reply,
sendMedia,
});
await capturedOnMessage?.({
body: "second",
from: "+1",
to: "+2",
id: "m2",
timestamp: 1735693200000, // Jan 1 2025 01:00:00 UTC
sendComposing,
reply,
sendMedia,
});
// Let the queued batch flush once the queue is free
queueBusy = false;
vi.advanceTimersByTime(200);
expect(resolver).toHaveBeenCalledTimes(1);
const args = resolver.mock.calls[0][0];
expect(args.Body).toContain("[Jan 1 00:00] [warelay] first");
expect(args.Body).toContain("[Jan 1 01:00] [warelay] second");
// Max listeners bumped to avoid warnings in multi-instance test runs
expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50);
queueSpy.mockRestore();
process.setMaxListeners?.(originalMax);
vi.useRealTimers();
});
it("falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined);
@@ -945,4 +1083,213 @@ describe("web auto-reply", () => {
expect(content).toContain('"module":"web-auto-reply"');
expect(content).toContain('"text":"auto"');
});
it("prefixes body with same-phone marker when from === to", async () => {
// Enable messagePrefix for same-phone mode testing
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: "[same-phone]",
responsePrefix: undefined,
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
const resolver = vi.fn().mockResolvedValue({ text: "reply" });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hello",
from: "+1555",
to: "+1555", // Same phone!
id: "msg1",
sendComposing: vi.fn(),
reply: vi.fn(),
sendMedia: vi.fn(),
});
// The resolver should receive a prefixed body with the configured marker
const callArg = resolver.mock.calls[0]?.[0] as { Body?: string };
expect(callArg?.Body).toBeDefined();
expect(callArg?.Body).toBe("[same-phone] hello");
resetLoadConfigMock();
});
it("does not prefix body when from !== to", async () => {
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
const resolver = vi.fn().mockResolvedValue({ text: "reply" });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hello",
from: "+1555",
to: "+2666", // Different phones
id: "msg1",
sendComposing: vi.fn(),
reply: vi.fn(),
sendMedia: vi.fn(),
});
// Body should NOT be prefixed
const callArg = resolver.mock.calls[0]?.[0] as { Body?: string };
expect(callArg?.Body).toBe("hello");
});
it("applies responsePrefix to regular replies", async () => {
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: "🦞",
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const reply = vi.fn();
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
const resolver = vi.fn().mockResolvedValue({ text: "hello there" });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hi",
from: "+1555",
to: "+2666",
id: "msg1",
sendComposing: vi.fn(),
reply,
sendMedia: vi.fn(),
});
// Reply should have responsePrefix prepended
expect(reply).toHaveBeenCalledWith("🦞 hello there");
resetLoadConfigMock();
});
it("skips responsePrefix for HEARTBEAT_OK responses", async () => {
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: "🦞",
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const reply = vi.fn();
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
// Resolver returns exact HEARTBEAT_OK
const resolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "test",
from: "+1555",
to: "+2666",
id: "msg1",
sendComposing: vi.fn(),
reply,
sendMedia: vi.fn(),
});
// HEARTBEAT_OK should NOT have prefix - warelay needs exact match
expect(reply).toHaveBeenCalledWith(HEARTBEAT_TOKEN);
resetLoadConfigMock();
});
it("does not double-prefix if responsePrefix already present", async () => {
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: "🦞",
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const reply = vi.fn();
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
// Resolver returns text that already has prefix
const resolver = vi.fn().mockResolvedValue({ text: "🦞 already prefixed" });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "test",
from: "+1555",
to: "+2666",
id: "msg1",
sendComposing: vi.fn(),
reply,
sendMedia: vi.fn(),
});
// Should not double-prefix
expect(reply).toHaveBeenCalledWith("🦞 already prefixed");
resetLoadConfigMock();
});
});
+348 -96
View File
@@ -9,12 +9,14 @@ import {
resolveStorePath,
saveSessionStore,
} from "../config/sessions.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { enqueueCommand, getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWeb } from "./outbound.js";
import {
@@ -27,6 +29,26 @@ import {
} from "./reconnect.js";
import { getWebAuthAgeMs } from "./session.js";
/**
* Send a message via IPC if relay is running, otherwise fall back to direct.
* This avoids Signal session corruption from multiple Baileys connections.
*/
async function sendWithIpcFallback(
to: string,
message: string,
opts: { verbose: boolean; mediaUrl?: string },
): Promise<{ messageId: string; toJid: string }> {
const ipcResult = await sendViaIpc(to, message, opts.mediaUrl);
if (ipcResult?.success && ipcResult.messageId) {
if (opts.verbose) {
console.log(info(`Sent via relay IPC (avoiding session corruption)`));
}
return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` };
}
// Fall back to direct send
return sendMessageWeb(to, message, opts);
}
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
type WebInboundMsg = Parameters<
typeof monitorWebInbox
@@ -94,7 +116,7 @@ export async function runWebHeartbeatOnce(opts: {
} = opts;
const _runtime = opts.runtime ?? defaultRuntime;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
const sender = opts.sender ?? sendMessageWeb;
const sender = opts.sender ?? sendWithIpcFallback;
const runId = newConnectionId();
const heartbeatLogger = getChildLogger({
module: "web-heartbeat",
@@ -493,6 +515,13 @@ export async function monitorWebProvider(
}),
);
// Avoid noisy MaxListenersExceeded warnings in test environments where
// multiple relay instances may be constructed.
const currentMaxListeners = process.getMaxListeners?.() ?? 10;
if (process.setMaxListeners && currentMaxListeners < 50) {
process.setMaxListeners(50);
}
let sigintStop = false;
const handleSigint = () => {
sigintStop = true;
@@ -501,6 +530,10 @@ export async function monitorWebProvider(
let reconnectAttempts = 0;
// Track recently sent messages to prevent echo loops
const recentlySent = new Set<string>();
const MAX_RECENT_MESSAGES = 100;
while (true) {
if (stopRequested()) break;
@@ -508,96 +541,251 @@ export async function monitorWebProvider(
const startedAt = Date.now();
let heartbeat: NodeJS.Timeout | null = null;
let replyHeartbeatTimer: NodeJS.Timeout | null = null;
let watchdogTimer: NodeJS.Timeout | null = null;
let lastMessageAt: number | null = null;
let handledMessages = 0;
let lastInboundMsg: WebInboundMsg | null = null;
// Watchdog to detect stuck message processing (e.g., event emitter died)
// Should be significantly longer than heartbeatMinutes to avoid false positives
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
// Batch inbound messages while command queue is busy, then send one
// combined prompt with per-message timestamps (inbound-only behavior).
type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout };
const pendingBatches = new Map<string, PendingBatch>();
const formatTimestamp = (ts?: number) => {
const tsCfg = cfg.inbound?.timestampPrefix;
const tsEnabled = tsCfg !== false; // default true
if (!tsEnabled) return "";
const tz = typeof tsCfg === "string" ? tsCfg : "UTC";
const date = ts ? new Date(ts) : new Date();
try {
return `[${date.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${date.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
} catch {
return `[${date.toISOString().slice(5, 16).replace("T", " ")}] `;
}
};
const buildLine = (msg: WebInboundMsg) => {
// Build message prefix: explicit config > default based on allowFrom
let messagePrefix = cfg.inbound?.messagePrefix;
if (messagePrefix === undefined) {
const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0;
messagePrefix = hasAllowFrom ? "" : "[warelay]";
}
const prefixStr = messagePrefix ? `${messagePrefix} ` : "";
return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`;
};
const processBatch = async (from: string) => {
const batch = pendingBatches.get(from);
if (!batch || batch.messages.length === 0) return;
if (getQueueSize() > 0) {
// Wait until command queue is free to run the combined prompt.
batch.timer = setTimeout(() => void processBatch(from), 150);
return;
}
pendingBatches.delete(from);
const messages = batch.messages;
const latest = messages[messages.length - 1];
const combinedBody = messages.map(buildLine).join("\n");
// Echo detection uses combined body so we don't respond twice.
if (recentlySent.has(combinedBody)) {
logVerbose(`Skipping auto-reply: detected echo for combined batch`);
recentlySent.delete(combinedBody);
return;
}
const correlationId = latest.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from,
to: latest.to,
body: combinedBody,
mediaType: latest.mediaType ?? null,
mediaPath: latest.mediaPath ?? null,
batchSize: messages.length,
},
"inbound web message (batched)",
);
const tsDisplay = latest.timestamp
? new Date(latest.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
},
{
onReplyStart: latest.sendComposing,
},
),
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
logVerbose("Skipping auto-reply: no text/media returned from resolver");
return;
}
// Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match)
const responsePrefix = cfg.inbound?.responsePrefix;
if (
responsePrefix &&
replyResult.text &&
replyResult.text.trim() !== HEARTBEAT_TOKEN
) {
if (!replyResult.text.startsWith(responsePrefix)) {
replyResult.text = `${responsePrefix} ${replyResult.text}`;
}
}
try {
await deliverWebReply({
replyResult,
msg: latest,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
if (replyResult.text) {
recentlySent.add(replyResult.text);
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`,
);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
),
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
),
);
}
} catch (err) {
console.error(
danger(`Failed sending web auto-reply to ${from}: ${String(err)}`),
);
}
};
const enqueueBatch = async (msg: WebInboundMsg) => {
const bucket = pendingBatches.get(msg.from) ?? { messages: [] };
bucket.messages.push(msg);
pendingBatches.set(msg.from, bucket);
// Process immediately when queue is free; otherwise wait until it drains.
if (getQueueSize() === 0) {
await processBatch(msg.from);
} else {
bucket.timer =
bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150);
}
};
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
const correlationId = msg.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from: msg.from,
to: msg.to,
body: msg.body,
mediaType: msg.mediaType ?? null,
mediaPath: msg.mediaPath ?? null,
},
"inbound web message",
);
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
lastInboundMsg = msg;
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: msg.body,
From: msg.from,
To: msg.to,
MessageSid: msg.id,
MediaPath: msg.mediaPath,
MediaUrl: msg.mediaUrl,
MediaType: msg.mediaType,
},
{
onReplyStart: msg.sendComposing,
},
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
// Same-phone mode logging retained
if (msg.from === msg.to) {
logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
}
// Skip if this is a message we just sent (echo detection)
if (recentlySent.has(msg.body)) {
console.log(`⏭️ Skipping echo: detected recently sent message`);
logVerbose(
"Skipping auto-reply: no text/media returned from resolver",
`Skipping auto-reply: detected echo (message matches recently sent text)`,
);
recentlySent.delete(msg.body);
return;
}
try {
await deliverWebReply({
replyResult,
msg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`,
),
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
),
);
}
} catch (err) {
console.error(
danger(
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`,
),
);
}
return enqueueBatch(msg);
},
});
// Start IPC server so `warelay send` can use this connection
// instead of creating a new one (which would corrupt Signal session)
if ("sendMessage" in listener && "sendComposingTo" in listener) {
startIpcServer(async (to, message, mediaUrl) => {
let mediaBuffer: Buffer | undefined;
let mediaType: string | undefined;
if (mediaUrl) {
const media = await loadWebMedia(mediaUrl);
mediaBuffer = media.buffer;
mediaType = media.contentType;
}
const result = await listener.sendMessage(
to,
message,
mediaBuffer,
mediaType,
);
// Add to echo detection so we don't process our own message
if (message) {
recentlySent.add(message);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
logInfo(
`📤 IPC send to ${to}: ${message.substring(0, 50)}...`,
runtime,
);
// Show typing indicator after send so user knows more may be coming
try {
await listener.sendComposingTo(to);
} catch {
// Ignore typing indicator errors - not critical
}
return result;
});
}
const closeListener = async () => {
stopIpcServer();
if (heartbeat) clearInterval(heartbeat);
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
if (watchdogTimer) clearInterval(watchdogTimer);
try {
await listener.close();
} catch (err) {
@@ -608,21 +796,69 @@ export async function monitorWebProvider(
if (keepAlive) {
heartbeat = setInterval(() => {
const authAgeMs = getWebAuthAgeMs();
heartbeatLogger.info(
{
connectionId,
reconnectAttempts,
messagesHandled: handledMessages,
lastMessageAt,
authAgeMs,
uptimeMs: Date.now() - startedAt,
},
"web relay heartbeat",
);
const minutesSinceLastMessage = lastMessageAt
? Math.floor((Date.now() - lastMessageAt) / 60000)
: null;
const logData = {
connectionId,
reconnectAttempts,
messagesHandled: handledMessages,
lastMessageAt,
authAgeMs,
uptimeMs: Date.now() - startedAt,
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
? { minutesSinceLastMessage }
: {}),
};
// Warn if no messages in 30+ minutes
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
heartbeatLogger.warn(
logData,
"⚠️ web relay heartbeat - no messages in 30+ minutes",
);
} else {
heartbeatLogger.info(logData, "web relay heartbeat");
}
}, heartbeatSeconds * 1000);
// Watchdog: Auto-restart if no messages received for MESSAGE_TIMEOUT_MS
watchdogTimer = setInterval(() => {
if (lastMessageAt) {
const timeSinceLastMessage = Date.now() - lastMessageAt;
if (timeSinceLastMessage > MESSAGE_TIMEOUT_MS) {
const minutesSinceLastMessage = Math.floor(
timeSinceLastMessage / 60000,
);
heartbeatLogger.warn(
{
connectionId,
minutesSinceLastMessage,
lastMessageAt: new Date(lastMessageAt),
messagesHandled: handledMessages,
},
"Message timeout detected - forcing reconnect",
);
console.error(
`⚠️ No messages received in ${minutesSinceLastMessage}m - restarting connection`,
);
closeListener(); // Trigger reconnect
}
}
}, WATCHDOG_CHECK_MS);
}
const runReplyHeartbeat = async () => {
const queued = getQueueSize();
if (queued > 0) {
heartbeatLogger.info(
{ connectionId, reason: "requests-in-flight", queued },
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (requests in flight)"));
return;
}
if (!replyHeartbeatMinutes) return;
const tickStart = Date.now();
if (!lastInboundMsg) {
@@ -695,19 +931,24 @@ export async function monitorWebProvider(
"reply heartbeat start",
);
}
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: lastInboundMsg.from,
To: lastInboundMsg.to,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: lastInboundMsg.sendComposing,
},
const hbFrom = lastInboundMsg.from;
const hbTo = lastInboundMsg.to;
const hbComposing = lastInboundMsg.sendComposing;
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: hbFrom,
To: hbTo,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: hbComposing,
},
),
);
if (
@@ -746,9 +987,20 @@ export async function monitorWebProvider(
return;
}
// Apply response prefix if configured (same as regular messages)
let finalText = stripped.text;
const responsePrefix = cfg.inbound?.responsePrefix;
if (
responsePrefix &&
finalText &&
!finalText.startsWith(responsePrefix)
) {
finalText = `${responsePrefix} ${finalText}`;
}
const cleanedReply: ReplyPayload = {
...replyResult,
text: stripped.text,
text: finalText,
};
await deliverWebReply({
+11
View File
@@ -5,6 +5,17 @@ import path from "node:path";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
vi.mock("../config/config.js", () => ({
loadConfig: vi.fn().mockReturnValue({
inbound: {
allowFrom: ["*"], // Allow all in tests
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
}),
}));
const HOME = path.join(
os.tmpdir(),
`warelay-inbound-media-${crypto.randomUUID()}`,
+75 -2
View File
@@ -8,10 +8,11 @@ import {
downloadMediaMessage,
} from "@whiskeysockets/baileys";
import { loadConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js";
import { getChildLogger } from "../logging.js";
import { saveMediaBuffer } from "../media/store.js";
import { jidToE164 } from "../utils.js";
import { jidToE164, normalizeE164 } from "../utils.js";
import {
createWaSocket,
getStatusCode,
@@ -70,7 +71,7 @@ export async function monitorWebInbox(options: {
// De-dupe on message id; Baileys can emit retries.
if (id && seen.has(id)) continue;
if (id) seen.add(id);
if (msg.key?.fromMe) continue;
// Note: not filtering fromMe here - echo detection happens in auto-reply layer
const remoteJid = msg.key?.remoteJid;
if (!remoteJid) continue;
// Ignore status/broadcast traffic; we only care about direct chats.
@@ -94,6 +95,25 @@ export async function monitorWebInbox(options: {
}
const from = jidToE164(remoteJid);
if (!from) continue;
// Filter unauthorized senders early to prevent wasted processing
// and potential session corruption from Bad MAC errors
const cfg = loadConfig();
const allowFrom = cfg.inbound?.allowFrom;
const isSamePhone = from === selfE164;
if (!isSamePhone && Array.isArray(allowFrom) && allowFrom.length > 0) {
if (
!allowFrom.includes("*") &&
!allowFrom.map(normalizeE164).includes(from)
) {
logVerbose(
`Blocked unauthorized sender ${from} (not in allowFrom list)`,
);
continue; // Skip processing entirely
}
}
let body = extractText(msg.message ?? undefined);
if (!body) {
body = extractMediaPlaceholder(msg.message ?? undefined);
@@ -197,6 +217,59 @@ export async function monitorWebInbox(options: {
}
},
onClose,
/**
* Send a message through this connection's socket.
* Used by IPC to avoid creating new connections.
*/
sendMessage: async (
to: string,
text: string,
mediaBuffer?: Buffer,
mediaType?: string,
): Promise<{ messageId: string }> => {
const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`;
let payload: AnyMessageContent;
if (mediaBuffer && mediaType) {
if (mediaType.startsWith("image/")) {
payload = {
image: mediaBuffer,
caption: text || undefined,
mimetype: mediaType,
};
} else if (mediaType.startsWith("audio/")) {
payload = {
audio: mediaBuffer,
ptt: true,
mimetype: mediaType,
};
} else if (mediaType.startsWith("video/")) {
payload = {
video: mediaBuffer,
caption: text || undefined,
mimetype: mediaType,
};
} else {
payload = {
document: mediaBuffer,
fileName: "file",
caption: text || undefined,
mimetype: mediaType,
};
}
} else {
payload = { text };
}
const result = await sock.sendMessage(jid, payload);
return { messageId: result?.key?.id ?? "unknown" };
},
/**
* Send typing indicator ("composing") to a chat.
* Used after IPC send to show more messages are coming.
*/
sendComposingTo: async (to: string): Promise<void> => {
const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`;
await sock.sendPresenceUpdate("composing", jid);
},
} as const;
}
+225
View File
@@ -0,0 +1,225 @@
/**
* IPC server for warelay relay.
*
* When the relay is running, it starts a Unix socket server that allows
* `warelay send` and `warelay heartbeat` to send messages through the
* existing WhatsApp connection instead of creating new ones.
*
* This prevents Signal session ratchet corruption from multiple connections.
*/
import fs from "node:fs";
import net from "node:net";
import os from "node:os";
import path from "node:path";
import { getChildLogger } from "../logging.js";
const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock");
export interface IpcSendRequest {
type: "send";
to: string;
message: string;
mediaUrl?: string;
}
export interface IpcSendResponse {
success: boolean;
messageId?: string;
error?: string;
}
type SendHandler = (
to: string,
message: string,
mediaUrl?: string,
) => Promise<{ messageId: string }>;
let server: net.Server | null = null;
/**
* Start the IPC server. Called by the relay when it starts.
*/
export function startIpcServer(sendHandler: SendHandler): void {
const logger = getChildLogger({ module: "ipc-server" });
// Clean up stale socket file
try {
fs.unlinkSync(SOCKET_PATH);
} catch {
// Ignore if doesn't exist
}
server = net.createServer((conn) => {
let buffer = "";
conn.on("data", async (data) => {
buffer += data.toString();
// Try to parse complete JSON messages (newline-delimited)
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // Keep incomplete line in buffer
for (const line of lines) {
if (!line.trim()) continue;
try {
const request = JSON.parse(line) as IpcSendRequest;
if (request.type === "send") {
try {
const result = await sendHandler(
request.to,
request.message,
request.mediaUrl,
);
const response: IpcSendResponse = {
success: true,
messageId: result.messageId,
};
conn.write(`${JSON.stringify(response)}\n`);
} catch (err) {
const response: IpcSendResponse = {
success: false,
error: String(err),
};
conn.write(`${JSON.stringify(response)}\n`);
}
}
} catch (err) {
logger.warn({ error: String(err) }, "failed to parse IPC request");
const response: IpcSendResponse = {
success: false,
error: "Invalid request format",
};
conn.write(`${JSON.stringify(response)}\n`);
}
}
});
conn.on("error", (err) => {
logger.debug({ error: String(err) }, "IPC connection error");
});
});
server.listen(SOCKET_PATH, () => {
logger.info({ socketPath: SOCKET_PATH }, "IPC server started");
// Make socket accessible
fs.chmodSync(SOCKET_PATH, 0o600);
});
server.on("error", (err) => {
logger.error({ error: String(err) }, "IPC server error");
});
}
/**
* Stop the IPC server. Called when relay shuts down.
*/
export function stopIpcServer(): void {
if (server) {
server.close();
server = null;
}
try {
fs.unlinkSync(SOCKET_PATH);
} catch {
// Ignore
}
}
/**
* Check if the relay IPC server is running.
*/
export function isRelayRunning(): boolean {
try {
fs.accessSync(SOCKET_PATH);
return true;
} catch {
return false;
}
}
/**
* Send a message through the running relay's IPC.
* Returns null if relay is not running.
*/
export async function sendViaIpc(
to: string,
message: string,
mediaUrl?: string,
): Promise<IpcSendResponse | null> {
if (!isRelayRunning()) {
return null;
}
return new Promise((resolve) => {
const client = net.createConnection(SOCKET_PATH);
let buffer = "";
let resolved = false;
const timeout = setTimeout(() => {
if (!resolved) {
resolved = true;
client.destroy();
resolve({ success: false, error: "IPC timeout" });
}
}, 30000); // 30 second timeout
client.on("connect", () => {
const request: IpcSendRequest = {
type: "send",
to,
message,
mediaUrl,
};
client.write(`${JSON.stringify(request)}\n`);
});
client.on("data", (data) => {
buffer += data.toString();
const lines = buffer.split("\n");
for (const line of lines) {
if (!line.trim()) continue;
try {
const response = JSON.parse(line) as IpcSendResponse;
if (!resolved) {
resolved = true;
clearTimeout(timeout);
client.end();
resolve(response);
}
return;
} catch {
// Keep reading
}
}
});
client.on("error", (_err) => {
if (!resolved) {
resolved = true;
clearTimeout(timeout);
// Socket exists but can't connect - relay might have crashed
resolve(null);
}
});
client.on("close", () => {
if (!resolved) {
resolved = true;
clearTimeout(timeout);
resolve({ success: false, error: "Connection closed" });
}
});
});
}
/**
* Get the IPC socket path for debugging/status.
*/
export function getSocketPath(): string {
return SOCKET_PATH;
}
+159
View File
@@ -9,6 +9,19 @@ vi.mock("../media/store.js", () => ({
}),
}));
const mockLoadConfig = vi.fn().mockReturnValue({
inbound: {
allowFrom: ["*"], // Allow all in tests
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
vi.mock("../config/config.js", () => ({
loadConfig: () => mockLoadConfig(),
}));
vi.mock("./session.js", () => {
const { EventEmitter } = require("node:events");
const ev = new EventEmitter();
@@ -216,4 +229,150 @@ describe("web monitor inbox", () => {
]);
await listener.close();
});
it("blocks messages from unauthorized senders not in allowFrom", async () => {
// Test for auto-recovery fix: early allowFrom filtering prevents Bad MAC errors
// from unauthorized senders corrupting sessions
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+111"], // Only allow +111
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
// Message from unauthorized sender +999 (not in allowFrom)
const upsert = {
type: "notify",
messages: [
{
key: {
id: "unauth1",
fromMe: false,
remoteJid: "999@s.whatsapp.net",
},
message: { conversation: "unauthorized message" },
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
// Should NOT call onMessage for unauthorized senders
expect(onMessage).not.toHaveBeenCalled();
// Reset mock for other tests
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
it("allows messages from senders in allowFrom list", async () => {
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+111", "+999"], // Allow +999
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
const upsert = {
type: "notify",
messages: [
{
key: { id: "auth1", fromMe: false, remoteJid: "999@s.whatsapp.net" },
message: { conversation: "authorized message" },
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
// Should call onMessage for authorized senders
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({ body: "authorized message", from: "+999" }),
);
// Reset mock for other tests
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
it("allows same-phone messages even if not in allowFrom", async () => {
// Same-phone mode: when from === selfJid, should always be allowed
// This allows users to message themselves even with restrictive allowFrom
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+111"], // Only allow +111, but self is +123
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
// Message from self (sock.user.id is "123@s.whatsapp.net" in mock)
const upsert = {
type: "notify",
messages: [
{
key: { id: "self1", fromMe: false, remoteJid: "123@s.whatsapp.net" },
message: { conversation: "self message" },
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
// Should allow self-messages even if not in allowFrom
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({ body: "self message", from: "+123" }),
);
// Reset mock for other tests
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
});
+24 -5
View File
@@ -3,18 +3,37 @@ import { vi } from "vitest";
import type { MockBaileysSocket } from "../../test/mocks/baileys.js";
import { createMockBaileys } from "../../test/mocks/baileys.js";
let loadConfigMock: () => unknown = () => ({});
// Use globalThis to store the mock config so it survives vi.mock hoisting
const CONFIG_KEY = Symbol.for("warelay:testConfigMock");
const DEFAULT_CONFIG = {
inbound: {
allowFrom: ["*"], // Allow all in tests by default
messagePrefix: undefined, // No message prefix in tests
responsePrefix: undefined, // No response prefix in tests
timestampPrefix: false, // No timestamp in tests
},
};
export function setLoadConfigMock(fn: () => unknown) {
loadConfigMock = fn;
// Initialize default if not set
if (!(globalThis as Record<symbol, unknown>)[CONFIG_KEY]) {
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] = () => DEFAULT_CONFIG;
}
export function setLoadConfigMock(fn: (() => unknown) | unknown) {
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] =
typeof fn === "function" ? fn : () => fn;
}
export function resetLoadConfigMock() {
loadConfigMock = () => ({});
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] = () => DEFAULT_CONFIG;
}
vi.mock("../config/config.js", () => ({
loadConfig: () => loadConfigMock(),
loadConfig: () => {
const getter = (globalThis as Record<symbol, unknown>)[CONFIG_KEY];
if (typeof getter === "function") return getter();
return DEFAULT_CONFIG;
},
}));
vi.mock("../media/store.js", () => ({