Files
openclaw/src/gateway/server-cron.ts
Josh Avant 806803b7ef feat(secrets): expand SecretRef coverage across user-supplied credentials (#29580)
* feat(secrets): expand secret target coverage and gateway tooling

* docs(secrets): align gateway and CLI secret docs

* chore(protocol): regenerate swift gateway models for secrets methods

* fix(config): restore talk apiKey fallback and stabilize runner test

* ci(windows): reduce test worker count for shard stability

* ci(windows): raise node heap for test shard stability

* test(feishu): make proxy env precedence assertion windows-safe

* fix(gateway): resolve auth password SecretInput refs for clients

* fix(gateway): resolve remote SecretInput credentials for clients

* fix(secrets): skip inactive refs in command snapshot assignments

* fix(secrets): scope gateway.remote refs to effective auth surfaces

* fix(secrets): ignore memory defaults when enabled agents disable search

* fix(secrets): honor Google Chat serviceAccountRef inheritance

* fix(secrets): address tsgo errors in command and gateway collectors

* fix(secrets): avoid auth-store load in providers-only configure

* fix(gateway): defer local password ref resolution by precedence

* fix(secrets): gate telegram webhook secret refs by webhook mode

* fix(secrets): gate slack signing secret refs to http mode

* fix(secrets): skip telegram botToken refs when tokenFile is set

* fix(secrets): gate discord pluralkit refs by enabled flag

* fix(secrets): gate discord voice tts refs by voice enabled

* test(secrets): make runtime fixture modes explicit

* fix(cli): resolve local qr password secret refs

* fix(cli): fail when gateway leaves command refs unresolved

* fix(gateway): fail when local password SecretRef is unresolved

* fix(gateway): fail when required remote SecretRefs are unresolved

* fix(gateway): resolve local password refs only when password can win

* fix(cli): skip local password SecretRef resolution on qr token override

* test(gateway): cast SecretRef fixtures to OpenClawConfig

* test(secrets): activate mode-gated targets in runtime coverage fixture

* fix(cron): support SecretInput webhook tokens safely

* fix(bluebubbles): support SecretInput passwords across config paths

* fix(msteams): make appPassword SecretInput-safe in onboarding/token paths

* fix(bluebubbles): align SecretInput schema helper typing

* fix(cli): clarify secrets.resolve version-skew errors

* refactor(secrets): return structured inactive paths from secrets.resolve

* refactor(gateway): type onboarding secret writes as SecretInput

* chore(protocol): regenerate swift models for secrets.resolve

* feat(secrets): expand extension credential secretref support

* fix(secrets): gate web-search refs by active provider

* fix(onboarding): detect SecretRef credentials in extension status

* fix(onboarding): allow keeping existing ref in secret prompt

* fix(onboarding): resolve gateway password SecretRefs for probe and tui

* fix(onboarding): honor secret-input-mode for local gateway auth

* fix(acp): resolve gateway SecretInput credentials

* fix(secrets): gate gateway.remote refs to remote surfaces

* test(secrets): cover pattern matching and inactive array refs

* docs(secrets): clarify secrets.resolve and remote active surfaces

* fix(bluebubbles): keep existing SecretRef during onboarding

* fix(tests): resolve CI type errors in new SecretRef coverage

* fix(extensions): replace raw fetch with SSRF-guarded fetch

* test(secrets): mark gateway remote targets active in runtime coverage

* test(infra): normalize home-prefix expectation across platforms

* fix(cli): only resolve local qr password refs in password mode

* test(cli): cover local qr token mode with unresolved password ref

* docs(cli): clarify local qr password ref resolution behavior

* refactor(extensions): reuse sdk SecretInput helpers

* fix(wizard): resolve onboarding env-template secrets before plaintext

* fix(cli): surface secrets.resolve diagnostics in memory and qr

* test(secrets): repair post-rebase runtime and fixtures

* fix(gateway): skip remote password ref resolution when token wins

* fix(secrets): treat tailscale remote gateway refs as active

* fix(gateway): allow remote password fallback when token ref is unresolved

* fix(gateway): ignore stale local password refs for none and trusted-proxy

* fix(gateway): skip remote secret ref resolution on local call paths

* test(cli): cover qr remote tailscale secret ref resolution

* fix(secrets): align gateway password active-surface with auth inference

* fix(cli): resolve inferred local gateway password refs in qr

* fix(gateway): prefer resolvable remote password over token ref pre-resolution

* test(gateway): cover none and trusted-proxy stale password refs

* docs(secrets): sync qr and gateway active-surface behavior

* fix: restore stability blockers from pre-release audit

* Secrets: fix collector/runtime precedence contradictions

* docs: align secrets and web credential docs

* fix(rebase): resolve integration regressions after main rebase

* fix(node-host): resolve gateway secret refs for auth

* fix(secrets): harden secretinput runtime readers

* gateway: skip inactive auth secretref resolution

* cli: avoid gateway preflight for inactive secret refs

* extensions: allow unresolved refs in onboarding status

* tests: fix qr-cli module mock hoist ordering

* Security: align audit checks with SecretInput resolution

* Gateway: resolve local-mode remote fallback secret refs

* Node host: avoid resolving inactive password secret refs

* Secrets runtime: mark Slack appToken inactive for HTTP mode

* secrets: keep inactive gateway remote refs non-blocking

* cli: include agent memory secret targets in runtime resolution

* docs(secrets): sync docs with active-surface and web search behavior

* fix(secrets): keep telegram top-level token refs active for blank account tokens

* fix(daemon): resolve gateway password secret refs for probe auth

* fix(secrets): skip IRC NickServ ref resolution when NickServ is disabled

* fix(secrets): align token inheritance and exec timeout defaults

* docs(secrets): clarify active-surface notes in cli docs

* cli: require secrets.resolve gateway capability

* gateway: log auth secret surface diagnostics

* secrets: remove dead provider resolver module

* fix(secrets): restore gateway auth precedence and fallback resolution

* fix(tests): align plugin runtime mock typings

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
2026-03-03 02:58:20 +00:00

506 lines
17 KiB
TypeScript

import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import type { CliDeps } from "../cli/deps.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import { loadConfig } from "../config/config.js";
import {
canonicalizeMainSessionAlias,
resolveAgentIdFromSessionKey,
resolveAgentMainSessionKey,
} from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { resolveFailureDestination, sendFailureNotificationAnnounce } from "../cron/delivery.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js";
import {
appendCronRunLog,
resolveCronRunLogPath,
resolveCronRunLogPruneOptions,
} from "../cron/run-log.js";
import { CronService } from "../cron/service.js";
import { resolveCronStorePath } from "../cron/store.js";
import { normalizeHttpWebhookUrl } from "../cron/webhook-url.js";
import { formatErrorMessage } from "../infra/errors.js";
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js";
import { SsrFBlockedError } from "../infra/net/ssrf.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
/** Result of `buildGatewayCronService`: the constructed cron service plus the settings it was built with. */
export type GatewayCronState = {
/** Cron service instance wired to the gateway callbacks. */
cron: CronService;
/** Cron store path, as resolved by `resolveCronStorePath` from `cfg.cron.store`. */
storePath: string;
/** False when `OPENCLAW_SKIP_CRON` is "1" or `cfg.cron.enabled` is explicitly `false`. */
cronEnabled: boolean;
};
/** Time budget, in milliseconds, for one cron webhook POST; `postCronWebhook` aborts the request once it elapses. */
const CRON_WEBHOOK_TIMEOUT_MS = 10_000;
/**
 * Normalizes an arbitrary value into a trimmed, non-empty string.
 *
 * @param value - Any value; only strings are considered.
 * @returns The trimmed string, or `undefined` when `value` is not a string or
 *   is empty/whitespace-only.
 */
function trimToOptionalString(value: unknown): string | undefined {
  // Non-strings collapse to "" so a single emptiness check covers both cases.
  const text = typeof value === "string" ? value.trim() : "";
  return text === "" ? undefined : text;
}
/**
 * Produces a log-safe form of a webhook URL.
 *
 * Only the origin and pathname are kept, so userinfo, query string and
 * fragment never reach the logs.
 *
 * @param url - Raw webhook URL.
 * @returns `origin + pathname`, or the literal `<invalid-webhook-url>` when
 *   `url` cannot be parsed by the `URL` constructor.
 */
function redactWebhookUrl(url: string): string {
  let parsed: URL;
  try {
    parsed = new URL(url);
  } catch {
    return "<invalid-webhook-url>";
  }
  return parsed.origin + parsed.pathname;
}
/** Webhook destination chosen for a finished cron job, together with where it came from. */
type CronWebhookTarget = {
/** Webhook URL as returned by `normalizeHttpWebhookUrl`. */
url: string;
/**
 * "delivery" when the URL came from the job's `delivery.to` in webhook mode;
 * "legacy" when it came from the notify + `cron.webhook` fallback.
 */
source: "delivery" | "legacy";
};
/**
 * Picks the webhook a finished cron job should be posted to, if any.
 *
 * An explicit `delivery.mode` of "webhook" (compared after trimming and
 * lower-casing) is authoritative: its `delivery.to` is used when
 * `normalizeHttpWebhookUrl` accepts it, and otherwise there is no target at
 * all (the legacy fallback is not consulted). For any other mode, the legacy
 * webhook is used only when `legacyNotify` is set and the URL normalizes.
 *
 * @param params.delivery - The job's delivery settings, when present.
 * @param params.legacyNotify - Whether the job carries the legacy notify flag.
 * @param params.legacyWebhook - The legacy global webhook URL, when configured.
 * @returns The chosen target, or `null` when nothing should be posted.
 */
function resolveCronWebhookTarget(params: {
  delivery?: { mode?: string; to?: string };
  legacyNotify?: boolean;
  legacyWebhook?: string;
}): CronWebhookTarget | null {
  const { delivery, legacyNotify, legacyWebhook } = params;
  const deliveryMode = delivery?.mode?.trim().toLowerCase();
  if (deliveryMode === "webhook") {
    const deliveryUrl = normalizeHttpWebhookUrl(delivery?.to);
    if (!deliveryUrl) {
      return null;
    }
    return { url: deliveryUrl, source: "delivery" };
  }
  if (!legacyNotify) {
    return null;
  }
  const fallbackUrl = normalizeHttpWebhookUrl(legacyWebhook);
  return fallbackUrl ? { url: fallbackUrl, source: "legacy" } : null;
}
/**
 * Builds the HTTP headers for a cron webhook POST.
 *
 * @param webhookToken - Optional bearer token; omitted or empty means no
 *   `Authorization` header is sent.
 * @returns A JSON `Content-Type` header plus, when a token is given,
 *   `Authorization: Bearer <token>`.
 */
function buildCronWebhookHeaders(webhookToken?: string): Record<string, string> {
  return {
    "Content-Type": "application/json",
    ...(webhookToken ? { Authorization: `Bearer ${webhookToken}` } : {}),
  };
}
/**
 * POSTs a JSON payload to a cron webhook through the SSRF-guarded fetch.
 *
 * The request is aborted after `CRON_WEBHOOK_TIMEOUT_MS`. This function never
 * rejects because of a delivery problem: any error thrown by the fetch or by
 * `release()` is logged at warn level (with the URL redacted) and swallowed.
 * The HTTP response itself is not inspected here; the result is only released.
 *
 * @param params.webhookUrl - Destination URL.
 * @param params.webhookToken - Optional bearer token for the `Authorization` header.
 * @param params.payload - Value serialized with `JSON.stringify` as the request body.
 * @param params.logContext - Extra fields merged into every warning logged here.
 * @param params.blockedLog - Message used when the SSRF guard rejects the request.
 * @param params.failedLog - Message used for every other failure.
 * @param params.logger - Logger that receives the warnings.
 */
async function postCronWebhook(params: {
  webhookUrl: string;
  webhookToken?: string;
  payload: unknown;
  logContext: Record<string, unknown>;
  blockedLog: string;
  failedLog: string;
  logger: ReturnType<typeof getChildLogger>;
}): Promise<void> {
  const { webhookUrl, webhookToken, payload, logContext, blockedLog, failedLog, logger } = params;
  const abortController = new AbortController();
  const timer = setTimeout(() => abortController.abort(), CRON_WEBHOOK_TIMEOUT_MS);
  try {
    const guarded = await fetchWithSsrFGuard({
      url: webhookUrl,
      init: {
        method: "POST",
        headers: buildCronWebhookHeaders(webhookToken),
        body: JSON.stringify(payload),
        signal: abortController.signal,
      },
    });
    // NOTE(review): presumably frees resources held by the guarded fetch — confirm.
    await guarded.release();
  } catch (err) {
    const detail = formatErrorMessage(err);
    const redactedUrl = redactWebhookUrl(webhookUrl);
    if (err instanceof SsrFBlockedError) {
      // Guard rejections use `reason` and their own message so they stand out in logs.
      logger.warn({ ...logContext, reason: detail, webhookUrl: redactedUrl }, blockedLog);
      return;
    }
    logger.warn({ ...logContext, err: detail, webhookUrl: redactedUrl }, failedLog);
  } finally {
    // Always cancel the abort timer, on success and on failure alike.
    clearTimeout(timer);
  }
}
/**
 * Builds the gateway's cron service and wires its callbacks to the rest of the
 * gateway: system events, heartbeats, isolated agent turns, failure alerts,
 * webhook delivery, event broadcast and the per-job run log.
 *
 * `params.cfg` is the config snapshot the gateway started with. It supplies the
 * store path, the enabled flag, run-log pruning, the session store path, the
 * webhook token / legacy webhook and the global failure destination. Agent and
 * session resolution inside the callbacks instead calls `loadConfig()` at the
 * time of each call.
 *
 * @param params.cfg - Startup config snapshot.
 * @param params.deps - CLI dependencies forwarded to agent runs, heartbeats and
 *   outbound delivery.
 * @param params.broadcast - Called with `("cron", evt, { dropIfSlow: true })`
 *   for every cron event.
 * @returns The service, the resolved cron store path, and whether cron is
 *   enabled (disabled by `OPENCLAW_SKIP_CRON=1` or `cron.enabled: false`).
 */
export function buildGatewayCronService(params: {
  cfg: ReturnType<typeof loadConfig>;
  deps: CliDeps;
  broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
}): GatewayCronState {
  const cronLogger = getChildLogger({ module: "cron" });
  const storePath = resolveCronStorePath(params.cfg.cron?.store);
  const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;

  // Resolves the agent a cron action should run as. The requested id is honored
  // only when it matches an entry in `agents.list` of the freshly loaded config;
  // otherwise the default agent is used.
  const resolveCronAgent = (requested?: string | null) => {
    const runtimeConfig = loadConfig();
    const normalized =
      typeof requested === "string" && requested.trim() ? normalizeAgentId(requested) : undefined;
    const hasAgent =
      normalized !== undefined &&
      Array.isArray(runtimeConfig.agents?.list) &&
      runtimeConfig.agents.list.some(
        (entry) =>
          entry && typeof entry.id === "string" && normalizeAgentId(entry.id) === normalized,
      );
    const agentId = hasAgent ? normalized : resolveDefaultAgentId(runtimeConfig);
    return { agentId, cfg: runtimeConfig };
  };

  // Resolves the session key for a cron action. A blank request maps to the
  // agent's main session. A requested key is canonicalized, and any non-"global"
  // key that resolves to a different agent is replaced by the agent's main session.
  const resolveCronSessionKey = (input: {
    runtimeConfig: ReturnType<typeof loadConfig>;
    agentId: string;
    requestedSessionKey?: string | null;
  }) => {
    const requested = input.requestedSessionKey?.trim();
    if (!requested) {
      return resolveAgentMainSessionKey({
        cfg: input.runtimeConfig,
        agentId: input.agentId,
      });
    }
    const candidate = toAgentStoreSessionKey({
      agentId: input.agentId,
      requestKey: requested,
      mainKey: input.runtimeConfig.session?.mainKey,
    });
    const canonical = canonicalizeMainSessionAlias({
      cfg: input.runtimeConfig,
      agentId: input.agentId,
      sessionKey: candidate,
    });
    if (canonical !== "global") {
      const sessionAgentId = resolveAgentIdFromSessionKey(canonical);
      if (normalizeAgentId(sessionAgentId) !== normalizeAgentId(input.agentId)) {
        return resolveAgentMainSessionKey({
          cfg: input.runtimeConfig,
          agentId: input.agentId,
        });
      }
    }
    return canonical;
  };

  // Resolves the heartbeat wake target. The agent comes from the explicit
  // `agentId` when given, else it is derived from the session key. The session
  // key is only resolved when both a key and an agent are available.
  const resolveCronWakeTarget = (opts?: { agentId?: string; sessionKey?: string | null }) => {
    // Load the config once so the agent and the session key are resolved
    // against the same snapshot.
    const requestedAgent = opts?.agentId ? resolveCronAgent(opts.agentId) : undefined;
    const runtimeConfig = requestedAgent?.cfg ?? loadConfig();
    const derivedAgentId =
      requestedAgent?.agentId ??
      (opts?.sessionKey
        ? normalizeAgentId(resolveAgentIdFromSessionKey(opts.sessionKey))
        : undefined);
    const agentId = derivedAgentId || undefined;
    const sessionKey =
      opts?.sessionKey && agentId
        ? resolveCronSessionKey({
            runtimeConfig,
            agentId,
            requestedSessionKey: opts.sessionKey,
          })
        : undefined;
    return { runtimeConfig, agentId, sessionKey };
  };

  const defaultAgentId = resolveDefaultAgentId(params.cfg);
  const runLogPrune = resolveCronRunLogPruneOptions(params.cfg.cron?.runLog);
  const resolveSessionStorePath = (agentId?: string) =>
    resolveStorePath(params.cfg.session?.store, {
      agentId: agentId ?? defaultAgentId,
    });
  const sessionStorePath = resolveSessionStorePath(defaultAgentId);
  // Job ids that already triggered the legacy-webhook deprecation warning, so
  // the warning is logged once per job for the lifetime of this service.
  const warnedLegacyWebhookJobs = new Set<string>();

  const cron = new CronService({
    storePath,
    cronEnabled,
    cronConfig: params.cfg.cron,
    defaultAgentId,
    resolveSessionStorePath,
    sessionStorePath,
    enqueueSystemEvent: (text, opts) => {
      const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId);
      const sessionKey = resolveCronSessionKey({
        runtimeConfig,
        agentId,
        requestedSessionKey: opts?.sessionKey,
      });
      enqueueSystemEvent(text, { sessionKey, contextKey: opts?.contextKey });
    },
    requestHeartbeatNow: (opts) => {
      const { agentId, sessionKey } = resolveCronWakeTarget(opts);
      requestHeartbeatNow({
        reason: opts?.reason,
        agentId,
        sessionKey,
      });
    },
    runHeartbeatOnce: async (opts) => {
      const { runtimeConfig, agentId, sessionKey } = resolveCronWakeTarget(opts);
      // Merge cron-supplied heartbeat overrides (e.g. target: "last") with the
      // fully resolved agent heartbeat config so cron-triggered heartbeats
      // respect agent-specific overrides (agents.list[].heartbeat) before
      // falling back to agents.defaults.heartbeat.
      const agentEntry =
        Array.isArray(runtimeConfig.agents?.list) &&
        runtimeConfig.agents.list.find(
          (entry) =>
            entry && typeof entry.id === "string" && normalizeAgentId(entry.id) === agentId,
        );
      const agentHeartbeat =
        agentEntry && typeof agentEntry === "object" ? agentEntry.heartbeat : undefined;
      const baseHeartbeat = {
        ...runtimeConfig.agents?.defaults?.heartbeat,
        ...agentHeartbeat,
      };
      // No override is passed at all unless cron supplied heartbeat settings.
      const heartbeatOverride = opts?.heartbeat
        ? { ...baseHeartbeat, ...opts.heartbeat }
        : undefined;
      return await runHeartbeatOnce({
        cfg: runtimeConfig,
        reason: opts?.reason,
        agentId,
        sessionKey,
        heartbeat: heartbeatOverride,
        deps: { ...params.deps, runtime: defaultRuntime },
      });
    },
    runIsolatedAgentJob: async ({ job, message, abortSignal }) => {
      const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
      return await runCronIsolatedAgentTurn({
        cfg: runtimeConfig,
        deps: params.deps,
        job,
        message,
        abortSignal,
        agentId,
        sessionKey: `cron:${job.id}`,
        lane: "cron",
      });
    },
    sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => {
      if (mode === "webhook") {
        // Webhook mode requires a URL - fail closed if missing
        if (!to) {
          cronLogger.warn(
            { jobId: job.id },
            "cron: failure alert webhook mode requires URL, skipping",
          );
          return;
        }
        const webhookUrl = normalizeHttpWebhookUrl(to);
        if (!webhookUrl) {
          cronLogger.warn(
            {
              jobId: job.id,
              webhookUrl: redactWebhookUrl(to),
            },
            "cron: failure alert webhook URL is invalid, skipping",
          );
          return;
        }
        await postCronWebhook({
          webhookUrl,
          webhookToken: trimToOptionalString(params.cfg.cron?.webhookToken),
          payload: {
            jobId: job.id,
            jobName: job.name,
            message: text,
          },
          logContext: { jobId: job.id },
          blockedLog: "cron: failure alert webhook blocked by SSRF guard",
          failedLog: "cron: failure alert webhook failed",
          logger: cronLogger,
        });
        return;
      }
      // Only channel delivery needs the agent and a fresh config, so they are
      // resolved after the webhook branch has returned.
      const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
      const target = await resolveDeliveryTarget(runtimeConfig, agentId, {
        channel,
        to,
        accountId,
      });
      if (!target.ok) {
        throw target.error;
      }
      await deliverOutboundPayloads({
        cfg: runtimeConfig,
        channel: target.channel,
        to: target.to,
        accountId: target.accountId,
        threadId: target.threadId,
        payloads: [{ text }],
        deps: createOutboundSendDeps(params.deps),
      });
    },
    log: getChildLogger({ module: "cron", storePath }),
    onEvent: (evt) => {
      params.broadcast("cron", evt, { dropIfSlow: true });
      // Everything below (webhooks, failure notifications, run log) applies to
      // finished runs only.
      if (evt.action !== "finished") {
        return;
      }
      const webhookToken = trimToOptionalString(params.cfg.cron?.webhookToken);
      const legacyWebhook = trimToOptionalString(params.cfg.cron?.webhook);
      const job = cron.getJob(evt.jobId);
      const legacyNotify = (job as { notify?: unknown } | undefined)?.notify === true;
      const webhookTarget = resolveCronWebhookTarget({
        delivery:
          job?.delivery && typeof job.delivery.mode === "string"
            ? { mode: job.delivery.mode, to: job.delivery.to }
            : undefined,
        legacyNotify,
        legacyWebhook,
      });
      if (!webhookTarget && job?.delivery?.mode === "webhook") {
        // Redact like every other webhook log line: the rejected value may
        // still carry credentials or query-string secrets.
        const rawDeliveryTo = job.delivery.to;
        cronLogger.warn(
          {
            jobId: evt.jobId,
            deliveryTo:
              typeof rawDeliveryTo === "string" ? redactWebhookUrl(rawDeliveryTo) : rawDeliveryTo,
          },
          "cron: skipped webhook delivery, delivery.to must be a valid http(s) URL",
        );
      }
      if (webhookTarget?.source === "legacy" && !warnedLegacyWebhookJobs.has(evt.jobId)) {
        warnedLegacyWebhookJobs.add(evt.jobId);
        cronLogger.warn(
          {
            jobId: evt.jobId,
            legacyWebhook: redactWebhookUrl(webhookTarget.url),
          },
          "cron: deprecated notify+cron.webhook fallback in use, migrate to delivery.mode=webhook with delivery.to",
        );
      }
      // The finished event is only posted when the run produced a summary.
      if (webhookTarget && evt.summary) {
        // Fire-and-forget: postCronWebhook logs its own failures and does not reject.
        void postCronWebhook({
          webhookUrl: webhookTarget.url,
          webhookToken,
          payload: evt,
          logContext: { jobId: evt.jobId },
          blockedLog: "cron: webhook delivery blocked by SSRF guard",
          failedLog: "cron: webhook delivery failed",
          logger: cronLogger,
        });
      }
      if (evt.status === "error" && job) {
        const failureDest = resolveFailureDestination(job, params.cfg.cron?.failureDestination);
        if (failureDest) {
          // Best-effort jobs opt out of failure notifications entirely.
          const isBestEffort =
            job.delivery?.bestEffort === true ||
            (job.payload.kind === "agentTurn" && job.payload.bestEffortDeliver === true);
          if (!isBestEffort) {
            const failureMessage = `Cron job "${job.name}" failed: ${evt.error ?? "unknown error"}`;
            const failurePayload = {
              jobId: job.id,
              jobName: job.name,
              message: failureMessage,
              status: evt.status,
              error: evt.error,
              runAtMs: evt.runAtMs,
              durationMs: evt.durationMs,
              nextRunAtMs: evt.nextRunAtMs,
            };
            if (failureDest.mode === "webhook" && failureDest.to) {
              const webhookUrl = normalizeHttpWebhookUrl(failureDest.to);
              if (webhookUrl) {
                void postCronWebhook({
                  webhookUrl,
                  webhookToken,
                  payload: failurePayload,
                  logContext: { jobId: evt.jobId },
                  blockedLog: "cron: failure destination webhook blocked by SSRF guard",
                  failedLog: "cron: failure destination webhook failed",
                  logger: cronLogger,
                });
              } else {
                cronLogger.warn(
                  {
                    jobId: evt.jobId,
                    webhookUrl: redactWebhookUrl(failureDest.to),
                  },
                  "cron: failure destination webhook URL is invalid, skipping",
                );
              }
            } else if (failureDest.mode === "announce") {
              const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
              const failedJobId = job.id;
              const announceTarget = {
                channel: failureDest.channel,
                to: failureDest.to,
                accountId: failureDest.accountId,
              };
              // Fire-and-forget, but a failed announce must be logged rather
              // than surface as an unhandled rejection.
              void (async () => {
                try {
                  await sendFailureNotificationAnnounce(
                    params.deps,
                    runtimeConfig,
                    agentId,
                    failedJobId,
                    announceTarget,
                    `[Cron Failure] ${failureMessage}`,
                  );
                } catch (err) {
                  cronLogger.warn(
                    { jobId: failedJobId, err: formatErrorMessage(err) },
                    "cron: failure destination announce failed",
                  );
                }
              })();
            }
          }
        }
      }
      // Append the finished run to the per-job run log. A failed append is
      // logged and never propagates to the caller.
      const logPath = resolveCronRunLogPath({
        storePath,
        jobId: evt.jobId,
      });
      void appendCronRunLog(
        logPath,
        {
          ts: Date.now(),
          jobId: evt.jobId,
          action: "finished",
          status: evt.status,
          error: evt.error,
          summary: evt.summary,
          delivered: evt.delivered,
          deliveryStatus: evt.deliveryStatus,
          deliveryError: evt.deliveryError,
          sessionId: evt.sessionId,
          sessionKey: evt.sessionKey,
          runAtMs: evt.runAtMs,
          durationMs: evt.durationMs,
          nextRunAtMs: evt.nextRunAtMs,
          model: evt.model,
          provider: evt.provider,
          usage: evt.usage,
        },
        runLogPrune,
      ).catch((err) => {
        cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed");
      });
    },
  });
  return { cron, storePath, cronEnabled };
}