Files
openclaw/scripts/claw-broker/broker.mjs
Fedor 2cbe4e2808
Some checks failed
Stale / stale (push) Has been cancelled
Stale / lock-closed-issues (push) Has been cancelled
feat: add claw approval MVP with privileged broker
Implement Postgres-backed claw approval flow and integrate gateway methods for create/list/get/approve/reject/execute/audit. Add a minimal systemd-run privileged broker with bearer auth, strict scope and exact-command validation, dangerous-shell blocking, atomic once-grant consumption, and execution audit updates.
2026-03-13 12:41:23 +00:00

438 lines
12 KiB
JavaScript

#!/usr/bin/env node
import { spawn } from "node:child_process";
import { randomUUID } from "node:crypto";
import http from "node:http";
import pg from "pg";
const { Pool } = pg;
const MAX_SUMMARY_CHARS = Number(process.env.CLAW_BROKER_MAX_SUMMARY_CHARS ?? "2000");
const CMD_TIMEOUT_MS = Number(process.env.CLAW_BROKER_CMD_TIMEOUT_MS ?? "120000");
const BIND_HOST = process.env.CLAW_BROKER_BIND ?? "127.0.0.1";
const BIND_PORT = Number(process.env.CLAW_BROKER_PORT ?? "8787");
const REQUIRED_TOKEN = (process.env.CLAW_BROKER_TOKEN ?? "").trim();
function env(name, fallback = undefined) {
return process.env[`CLAW_${name}`] ?? process.env[name] ?? fallback;
}
function requiredEnv(name) {
const value = env(name, "");
if (!value || !String(value).trim()) {
throw new Error(`missing env: ${name} (or CLAW_${name})`);
}
return String(value);
}
const pool = new Pool({
host: requiredEnv("PGHOST"),
port: Number(env("PGPORT", "5432")),
user: requiredEnv("PGUSER"),
password: requiredEnv("PGPASSWORD"),
database: requiredEnv("PGDATABASE"),
max: 10,
});
if (!REQUIRED_TOKEN) {
throw new Error("missing CLAW_BROKER_TOKEN");
}
function json(res, code, body) {
const payload = JSON.stringify(body);
res.writeHead(code, {
"content-type": "application/json; charset=utf-8",
"content-length": Buffer.byteLength(payload),
});
res.end(payload);
}
function normalizeCommand(input) {
return String(input).trim().replace(/\s+/g, " ");
}
function hasDangerousShellConstruct(command) {
const source = String(command).toLowerCase();
const checks = [
/\bbash\s+-c\b/,
/\bsh\s+-c\b/,
/\bsudo\s+su\b/,
/\bsudo\s+-i\b/,
/&&/,
/\|\|/,
/;/,
/\|/,
/>/,
/</,
/\$\(/,
/`/,
/<<[-\w]*/,
];
return checks.some((r) => r.test(source));
}
function summarize(text) {
const value = String(text ?? "");
return value.length <= MAX_SUMMARY_CHARS ? value : `${value.slice(0, MAX_SUMMARY_CHARS)}`;
}
async function insertAudit(client, args) {
await client.query(
`INSERT INTO claw_audit_events (
event_type, request_id, grant_id, execution_id,
actor_type, actor_id, target_host, target_user,
command_snapshot, status, exit_code, stdout_summary, stderr_summary, metadata
) VALUES (
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14::jsonb
)`,
[
args.eventType,
args.requestId ?? null,
args.grantId ?? null,
args.executionId ?? null,
args.actorType,
args.actorId,
args.targetHost ?? null,
args.targetUser ?? null,
args.commandSnapshot ?? null,
args.status ?? null,
args.exitCode ?? null,
args.stdoutSummary ?? null,
args.stderrSummary ?? null,
JSON.stringify(args.metadata ?? {}),
],
);
}
function requireString(body, key) {
const value = body?.[key];
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`${key} is required`);
}
return value.trim();
}
async function verifyAndMarkStarted(body) {
const executionId = requireString(body, "executionId");
const approvalRequestId = requireString(body, "approvalRequestId");
const approvalGrantId = requireString(body, "approvalGrantId");
const exactCommand = requireString(body, "exactCommand");
const targetHost = requireString(body, "targetHost");
const targetUser = requireString(body, "targetUser");
const requestedBy = requireString(body, "requestedBy");
const channel = requireString(body, "channel");
const chatId = requireString(body, "chatId");
const humanUserId = requireString(body, "humanUserId");
const sessionId = requireString(body, "sessionId");
if (hasDangerousShellConstruct(exactCommand)) {
throw new Error("dangerous shell policy violation");
}
const client = await pool.connect();
try {
await client.query("BEGIN");
const reqRes = await client.query(
`SELECT * FROM claw_approval_requests WHERE id = $1 FOR UPDATE`,
[approvalRequestId],
);
if (reqRes.rowCount === 0) {
throw new Error("approval request not found");
}
const request = reqRes.rows[0];
if (!["approved_once", "approved_always"].includes(String(request.status))) {
throw new Error(`request status does not allow execution: ${request.status}`);
}
const grantRes = await client.query(
`SELECT * FROM claw_approval_grants WHERE id = $1 AND request_id = $2 FOR UPDATE`,
[approvalGrantId, approvalRequestId],
);
if (grantRes.rowCount === 0) {
throw new Error("approval grant not found");
}
const grant = grantRes.rows[0];
const dbExact = String(request.exact_command);
if (normalizeCommand(dbExact) !== normalizeCommand(exactCommand)) {
throw new Error("exact command mismatch");
}
if (normalizeCommand(String(grant.exact_command)) !== normalizeCommand(exactCommand)) {
throw new Error("grant command mismatch");
}
const scopeChecks = [
[String(request.target_host), targetHost, "targetHost"],
[String(request.target_user), targetUser, "targetUser"],
[String(request.channel), channel, "channel"],
[String(request.chat_id), chatId, "chatId"],
[String(request.human_user_id), humanUserId, "humanUserId"],
[String(request.session_id), sessionId, "sessionId"],
[String(grant.target_host), targetHost, "grant.targetHost"],
[String(grant.target_user), targetUser, "grant.targetUser"],
[String(grant.channel), channel, "grant.channel"],
[String(grant.chat_id), chatId, "grant.chatId"],
[String(grant.human_user_id), humanUserId, "grant.humanUserId"],
[String(grant.session_id), sessionId, "grant.sessionId"],
];
for (const [db, incoming, label] of scopeChecks) {
if (db !== incoming) {
throw new Error(`scope mismatch: ${label}`);
}
}
if (hasDangerousShellConstruct(String(request.exact_command))) {
throw new Error("dangerous shell policy violation (request)");
}
if (String(grant.grant_type) === "once") {
const consumeRes = await client.query(
`UPDATE claw_approval_grants
SET used_at = now()
WHERE id = $1
AND grant_type = 'once'
AND used_at IS NULL
AND revoked_at IS NULL
AND expires_at > now()
RETURNING id`,
[approvalGrantId],
);
if (consumeRes.rowCount === 0) {
throw new Error("once grant expired/revoked/already used");
}
await insertAudit(client, {
eventType: "grant_consumed",
actorType: "broker",
actorId: requestedBy,
requestId: approvalRequestId,
grantId: approvalGrantId,
executionId,
targetHost,
targetUser,
commandSnapshot: exactCommand,
status: "grant_consumed",
});
}
await client.query(
`UPDATE claw_approval_requests SET execution_id = $2, updated_at = now() WHERE id = $1`,
[approvalRequestId, executionId],
);
await insertAudit(client, {
eventType: "execution_started",
actorType: "broker",
actorId: requestedBy,
requestId: approvalRequestId,
grantId: approvalGrantId,
executionId,
targetHost,
targetUser,
commandSnapshot: exactCommand,
status: "execution_started",
});
await client.query("COMMIT");
return {
executionId,
approvalRequestId,
approvalGrantId,
exactCommand,
targetHost,
targetUser,
requestedBy,
cwd: request.cwd ? String(request.cwd) : undefined,
};
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
}
async function runCommand(command, cwd) {
return await new Promise((resolve) => {
const child = spawn("bash", ["-lc", command], {
cwd: cwd || undefined,
env: process.env,
stdio: ["ignore", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
let timedOut = false;
const timer = setTimeout(() => {
timedOut = true;
child.kill("SIGKILL");
}, CMD_TIMEOUT_MS);
child.stdout.on("data", (chunk) => {
stdout += chunk.toString("utf8");
});
child.stderr.on("data", (chunk) => {
stderr += chunk.toString("utf8");
});
child.on("close", (code) => {
clearTimeout(timer);
resolve({
exitCode: timedOut ? 124 : Number(code ?? 1),
stdout,
stderr: timedOut ? `${stderr}\nCommand timed out.` : stderr,
});
});
});
}
async function finalizeExecution({
executionId,
approvalRequestId,
approvalGrantId,
exactCommand,
targetHost,
targetUser,
requestedBy,
ok,
exitCode,
stdoutSummary,
stderrSummary,
}) {
const client = await pool.connect();
try {
await client.query("BEGIN");
const finalStatus = ok ? "executed" : "execution_failed";
const lastError = ok ? null : stderrSummary;
await client.query(
`UPDATE claw_approval_requests
SET status = $2::claw_approval_status,
executed_at = now(),
updated_at = now(),
last_error = $3
WHERE id = $1`,
[approvalRequestId, finalStatus, lastError],
);
await insertAudit(client, {
eventType: ok ? "execution_succeeded" : "execution_failed",
actorType: "broker",
actorId: requestedBy,
requestId: approvalRequestId,
grantId: approvalGrantId,
executionId,
targetHost,
targetUser,
commandSnapshot: exactCommand,
status: ok ? "executed" : "execution_failed",
exitCode,
stdoutSummary,
stderrSummary,
});
await client.query("COMMIT");
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
}
async function handleExecute(body) {
const startedAt = new Date().toISOString();
const validated = await verifyAndMarkStarted(body);
const run = await runCommand(validated.exactCommand, validated.cwd);
const ok = run.exitCode === 0;
const stdoutSummary = summarize(run.stdout);
const stderrSummary = summarize(run.stderr);
await finalizeExecution({
executionId: validated.executionId,
approvalRequestId: validated.approvalRequestId,
approvalGrantId: validated.approvalGrantId,
exactCommand: validated.exactCommand,
targetHost: validated.targetHost,
targetUser: validated.targetUser,
requestedBy: validated.requestedBy,
ok,
exitCode: run.exitCode,
stdoutSummary,
stderrSummary,
});
const finishedAt = new Date().toISOString();
return {
ok,
executionId: validated.executionId,
status: ok ? "executed" : "execution_failed",
exitCode: run.exitCode,
stdoutSummary,
stderrSummary,
startedAt,
finishedAt,
};
}
function getBearerToken(req) {
const raw = String(req.headers.authorization ?? "");
if (!raw.toLowerCase().startsWith("bearer ")) {
return "";
}
return raw.slice(7).trim();
}
const server = http.createServer((req, res) => {
const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "127.0.0.1"}`);
if (req.method === "GET" && url.pathname === "/healthz") {
json(res, 200, { ok: true, service: "claw-broker" });
return;
}
if (req.method !== "POST" || url.pathname !== "/v1/execute") {
json(res, 404, { ok: false, error: "not_found" });
return;
}
const token = getBearerToken(req);
if (!token || token !== REQUIRED_TOKEN) {
json(res, 401, { ok: false, error: "unauthorized" });
return;
}
let raw = "";
req.on("data", (chunk) => {
raw += chunk.toString("utf8");
if (raw.length > 1_000_000) {
req.destroy();
}
});
req.on("end", async () => {
const fallbackExecutionId = randomUUID();
try {
const body = raw.length ? JSON.parse(raw) : {};
if (!body.executionId) {
body.executionId = fallbackExecutionId;
}
const result = await handleExecute(body);
json(res, 200, result);
} catch (err) {
console.error("[claw-broker] execute error:", err);
const nowIso = new Date().toISOString();
json(res, 400, {
ok: false,
executionId: fallbackExecutionId,
status: "execution_failed",
exitCode: 1,
stdoutSummary: "",
stderrSummary: String(err),
startedAt: nowIso,
finishedAt: nowIso,
});
}
});
});
server.listen(BIND_PORT, BIND_HOST, () => {
process.stdout.write(`claw-broker listening on http://${BIND_HOST}:${BIND_PORT}\n`);
});