2025-11-25 02:16:54 +01:00
import crypto from "node:crypto" ;
2025-12-09 02:25:37 +01:00
2025-12-05 22:33:09 +01:00
import { lookupContextTokens } from "../agents/context.js" ;
2025-12-14 04:21:27 +00:00
import {
DEFAULT_CONTEXT_TOKENS ,
DEFAULT_MODEL ,
DEFAULT_PROVIDER ,
} from "../agents/defaults.js" ;
2025-12-23 23:45:20 +00:00
import { loadModelCatalog } from "../agents/model-catalog.js" ;
import {
buildAllowedModelSet ,
2025-12-26 23:26:14 +00:00
buildModelAliasIndex ,
2025-12-23 23:45:20 +00:00
modelKey ,
2025-12-26 00:16:29 +01:00
resolveConfiguredModelRef ,
2025-12-26 23:54:30 +00:00
resolveModelRefFromString ,
2026-01-03 12:18:50 +00:00
resolveThinkingDefault ,
2025-12-23 23:45:20 +00:00
} from "../agents/model-selection.js" ;
2025-12-20 16:10:46 +01:00
import {
2025-12-26 13:35:44 +01:00
abortEmbeddedPiRun ,
2026-01-03 04:26:36 +01:00
isEmbeddedPiRunActive ,
isEmbeddedPiRunStreaming ,
2025-12-20 16:10:46 +01:00
queueEmbeddedPiMessage ,
2025-12-26 13:35:44 +01:00
resolveEmbeddedSessionLane ,
2025-12-20 16:10:46 +01:00
runEmbeddedPiAgent ,
} from "../agents/pi-embedded.js" ;
2026-01-03 04:26:36 +01:00
import {
buildWorkspaceSkillSnapshot ,
type SkillSnapshot ,
} from "../agents/skills.js" ;
2025-12-14 03:14:51 +00:00
import {
DEFAULT_AGENT_WORKSPACE_DIR ,
ensureAgentWorkspace ,
} from "../agents/workspace.js" ;
2026-01-03 05:10:09 +01:00
import { parseDurationMs } from "../cli/parse-duration.js" ;
2025-12-09 23:16:57 +01:00
import { type ClawdisConfig , loadConfig } from "../config/config.js" ;
2025-11-25 02:16:54 +01:00
import {
2026-01-02 12:20:38 +01:00
buildGroupDisplayName ,
2025-11-26 00:53:53 +01:00
DEFAULT_IDLE_MINUTES ,
2025-12-22 20:36:29 +01:00
DEFAULT_RESET_TRIGGERS ,
2025-11-26 00:53:53 +01:00
loadSessionStore ,
2026-01-02 10:14:58 +01:00
resolveGroupSessionKey ,
2025-12-06 23:16:23 +01:00
resolveSessionKey ,
2025-12-17 11:29:04 +01:00
resolveSessionTranscriptPath ,
2025-11-26 00:53:53 +01:00
resolveStorePath ,
2025-12-02 20:09:51 +00:00
type SessionEntry ,
2025-11-26 00:53:53 +01:00
saveSessionStore ,
2025-11-25 02:16:54 +01:00
} from "../config/sessions.js" ;
2025-12-17 11:29:04 +01:00
import { logVerbose } from "../globals.js" ;
2026-01-02 01:04:59 +01:00
import { registerAgentRunContext } from "../infra/agent-events.js" ;
2025-12-09 02:25:37 +01:00
import { buildProviderSummary } from "../infra/provider-summary.js" ;
2025-12-09 18:00:01 +00:00
import { triggerClawdisRestart } from "../infra/restart.js" ;
2025-12-27 04:02:13 +01:00
import {
drainSystemEvents ,
enqueueSystemEvent ,
} from "../infra/system-events.js" ;
2025-12-26 13:35:44 +01:00
import { clearCommandLane , getQueueSize } from "../process/command-queue.js" ;
2025-12-05 19:03:59 +00:00
import { defaultRuntime } from "../runtime.js" ;
2026-01-03 23:44:38 +01:00
import { resolveSendPolicy } from "../sessions/send-policy.js" ;
2025-12-22 20:36:29 +01:00
import { normalizeE164 } from "../utils.js" ;
2025-12-07 18:49:55 +01:00
import { resolveHeartbeatSeconds } from "../web/reconnect.js" ;
import { getWebAuthAgeMs , webAuthExists } from "../web/session.js" ;
2026-01-03 12:35:16 -06:00
import { resolveTextChunkLimit , type TextChunkSurface } from "./chunk.js" ;
2025-12-22 20:36:29 +01:00
import {
normalizeGroupActivation ,
2025-12-22 20:45:22 +00:00
parseActivationCommand ,
2025-12-22 20:36:29 +01:00
} from "./group-activation.js" ;
2026-01-02 01:42:27 +01:00
import { stripHeartbeatToken } from "./heartbeat.js" ;
2025-12-24 00:33:35 +00:00
import { extractModelDirective } from "./model.js" ;
2026-01-04 00:06:02 +01:00
import { parseSendPolicyCommand } from "./send-policy.js" ;
2025-12-07 18:49:55 +01:00
import { buildStatusMessage } from "./status.js" ;
2025-12-17 11:29:04 +01:00
import type { MsgContext , TemplateContext } from "./templating.js" ;
2025-12-04 17:53:37 +00:00
import {
normalizeThinkLevel ,
normalizeVerboseLevel ,
type ThinkLevel ,
type VerboseLevel ,
} from "./thinking.js" ;
2026-01-02 01:42:27 +01:00
import { SILENT_REPLY_TOKEN } from "./tokens.js" ;
2025-12-04 18:02:51 +00:00
import { isAudio , transcribeInboundAudio } from "./transcription.js" ;
import type { GetReplyOptions , ReplyPayload } from "./types.js" ;
2025-11-26 02:34:43 +01:00
export type { GetReplyOptions , ReplyPayload } from "./types.js" ;
2025-11-25 04:58:31 +01:00
2025-12-02 20:09:51 +00:00
/** Message bodies that `isAbortTrigger` accepts as an abort request (compared after trim + lowercase). */
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
// Boolean flag per string key.
// NOTE(review): presumably keyed by session; readers/writers are outside this section — confirm.
const ABORT_MEMORY = new Map<string, boolean>();
// NOTE(review): looks like a prefix marking system-generated text; usage is not visible here — confirm.
const SYSTEM_MARK = "⚙️";
/** Fallback lower/upper chunk sizes (characters) used by `resolveBlockStreamingChunking`. */
const DEFAULT_BLOCK_STREAM_MIN = 800;
const DEFAULT_BLOCK_STREAM_MAX = 1200;
/** Surface names that `normalizeChunkSurface` recognises. */
const BLOCK_CHUNK_SURFACES = new Set<TextChunkSurface>([
  "whatsapp",
  "telegram",
  "discord",
  "signal",
  "imessage",
  "webchat",
]);

/**
 * Maps a free-form surface name onto a known `TextChunkSurface`.
 *
 * @param surface Raw surface label; case and surrounding whitespace are ignored.
 * @returns The matching surface, or `undefined` when the label is empty or unknown.
 */
function normalizeChunkSurface(surface?: string): TextChunkSurface | undefined {
  const candidate = surface ? surface.trim().toLowerCase() : "";
  // Walk the known set so the result is already typed; no cast needed.
  for (const known of BLOCK_CHUNK_SURFACES) {
    if (known === candidate) return known;
  }
  return undefined;
}
/**
 * Resolves the chunk bounds and break preference for block-streamed replies.
 *
 * `maxChars` is the configured (or default) maximum, floored, clamped to at
 * least 1 and to at most the surface's text limit. `minChars` is resolved the
 * same way and then capped at `maxChars`, so `minChars <= maxChars` always holds.
 *
 * @param cfg Loaded config; `cfg.agent.blockStreamingChunk` supplies overrides.
 * @param surface Raw surface name; unknown names are passed on as `undefined`.
 * @returns Chunk bounds plus where to break ("paragraph" unless config asks
 *   for "newline" or "sentence").
 */
function resolveBlockStreamingChunking(
  cfg: ClawdisConfig | undefined,
  surface?: string,
): {
  minChars: number;
  maxChars: number;
  breakPreference: "paragraph" | "newline" | "sentence";
} {
  const chunkCfg = cfg?.agent?.blockStreamingChunk;
  const textLimit = resolveTextChunkLimit(cfg, normalizeChunkSurface(surface));
  const atLeastOne = (value: number): number => Math.max(1, Math.floor(value));

  const maxChars = Math.max(
    1,
    Math.min(atLeastOne(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX), textLimit),
  );
  const minChars = Math.min(
    atLeastOne(chunkCfg?.minChars ?? DEFAULT_BLOCK_STREAM_MIN),
    maxChars,
  );

  const preferred = chunkCfg?.breakPreference;
  const breakPreference =
    preferred === "newline" || preferred === "sentence" ? preferred : "paragraph";

  return { minChars, maxChars, breakPreference };
}
2025-12-02 20:09:51 +00:00
2026-01-03 04:26:36 +01:00
/**
 * Queue handling mode for a message.
 * Note: `normalizeQueueMode` maps the user-facing word "queue" to "steer",
 * so it never produces the "queue" member itself.
 */
type QueueMode = "steer" | "followup" | "collect" | "steer-backlog" | "interrupt" | "queue";

/** What to do once a follow-up queue is at its cap (see `enqueueFollowupRun`). */
type QueueDropPolicy = "old" | "new" | "summarize";

/** Resolved queue configuration; optional fields fall back to the queue defaults. */
type QueueSettings = {
  mode: QueueMode;
  /** Quiet period in milliseconds before the queue is drained. */
  debounceMs?: number;
  /** Maximum number of queued runs. */
  cap?: number;
  dropPolicy?: QueueDropPolicy;
};
/**
 * One queued agent invocation: the prompt text plus the run parameters that
 * are handed to the `runFollowup` callback when the queue drains.
 */
type FollowupRun = {
  prompt: string;
  /** Preferred over `prompt` when building an overflow summary line. */
  summaryLine?: string;
  /** Epoch milliseconds (`Date.now()`) at which the run was created. */
  enqueuedAt: number;
  run: {
    sessionId: string;
    sessionKey?: string;
    surface?: string;
    sessionFile: string;
    workspaceDir: string;
    config: ClawdisConfig;
    skillsSnapshot?: SkillSnapshot;
    provider: string;
    model: string;
    thinkLevel?: ThinkLevel;
    verboseLevel?: VerboseLevel;
    timeoutMs: number;
    blockReplyBreak: "text_end" | "message_end";
    ownerNumbers?: string[];
    extraSystemPrompt?: string;
    enforceFinalTag?: boolean;
  };
};
/** Mutable per-key state of a follow-up queue stored in `FOLLOWUP_QUEUES`. */
type FollowupQueueState = {
  /** Runs waiting to be drained, oldest first. */
  items: FollowupRun[];
  /** True while a drain loop started by `scheduleFollowupDrain` is active. */
  draining: boolean;
  /** `Date.now()` of the latest enqueue attempt; drives the debounce wait. */
  lastEnqueuedAt: number;
  mode: QueueMode;
  debounceMs: number;
  cap: number;
  dropPolicy: QueueDropPolicy;
  /** Number of runs dropped under the "summarize" policy and not yet reported. */
  droppedCount: number;
  /** One elided line per dropped run, reported by `buildSummaryPrompt`. */
  summaryLines: string[];
  /** Run parameters of the latest enqueue attempt; used for summary-only prompts. */
  lastRun?: FollowupRun["run"];
};

/** Defaults applied when neither inline options, session nor config set a value. */
const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
const DEFAULT_QUEUE_CAP = 20;
const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";

/** Live follow-up queues by key; an entry is removed once its drain finishes empty. */
const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
2025-12-20 13:04:55 +00:00
// Agent prompt for a freshly reset session.
// NOTE(review): presumably used when a reset trigger arrives with no other text — the call site is outside this section.
const BARE_SESSION_RESET_PROMPT =
  "A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning.";
2025-12-20 13:04:55 +00:00
2025-12-05 22:28:36 +00:00
/**
 * Finds an inline thinking directive (`/thinking`, `/think` or `/t`, optionally
 * followed by `:`) and removes it from the message.
 *
 * @param body Raw message text.
 * @returns `cleaned` – the text without the directive, whitespace collapsed;
 *   `rawLevel` – the word that followed the keyword; `thinkLevel` – that word
 *   normalised by `normalizeThinkLevel` (undefined when not recognised);
 *   `hasDirective` – whether a directive was found at all.
 */
export function extractThinkDirective(body?: string): {
  cleaned: string;
  thinkLevel?: ThinkLevel;
  rawLevel?: string;
  hasDirective: boolean;
} {
  if (!body) return { cleaned: "", hasDirective: false };
  // Match the longest keyword first to avoid partial captures (e.g. "/think:high").
  // The lookahead requires the keyword to end at whitespace, ":" or end of input;
  // without it "/test" or "/tmp/file" would be read as "/t" plus a level and
  // silently stripped from the message.
  const match = body.match(
    /(?:^|\s)\/(?:thinking|think|t)(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i,
  );
  const rawLevel = match?.[1];
  const cleaned = match
    ? body.replace(match[0], "").replace(/\s+/g, " ").trim()
    : body.trim();
  return {
    cleaned,
    thinkLevel: normalizeThinkLevel(rawLevel),
    rawLevel,
    hasDirective: !!match,
  };
}
2025-12-05 22:28:36 +00:00
/**
 * Finds an inline verbosity directive (`/verbose` or `/v`, optionally followed
 * by `:`) and removes it from the message.
 *
 * @param body Raw message text.
 * @returns `cleaned` – the text without the directive, whitespace collapsed;
 *   `rawLevel` – the word that followed the keyword; `verboseLevel` – that word
 *   normalised by `normalizeVerboseLevel` (undefined when not recognised);
 *   `hasDirective` – whether a directive was found at all.
 */
export function extractVerboseDirective(body?: string): {
  cleaned: string;
  verboseLevel?: VerboseLevel;
  rawLevel?: string;
  hasDirective: boolean;
} {
  if (!body) return { cleaned: "", hasDirective: false };
  // The keyword must end at whitespace, ":" or end of input so that words such
  // as "/version" are not mistaken for "/v".
  const match = body.match(
    /(?:^|\s)\/(?:verbose|v)(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i,
  );
  const rawLevel = match?.[1];
  const cleaned = match
    ? body.replace(match[0], "").replace(/\s+/g, " ").trim()
    : body.trim();
  return {
    cleaned,
    verboseLevel: normalizeVerboseLevel(rawLevel),
    rawLevel,
    hasDirective: !!match,
  };
}
2025-12-26 13:35:44 +01:00
/**
 * Maps a user- or config-supplied word onto a `QueueMode`.
 *
 * Matching ignores case and surrounding whitespace. "queue"/"queued" are
 * treated as aliases of "steer", and "abort" as an alias of "interrupt".
 *
 * @param raw Word to interpret.
 * @returns The canonical mode, or `undefined` for empty or unknown input.
 */
function normalizeQueueMode(raw?: string): QueueMode | undefined {
  if (!raw) return undefined;
  switch (raw.trim().toLowerCase()) {
    case "queue":
    case "queued":
    case "steer":
    case "steering":
      return "steer";
    case "interrupt":
    case "interrupts":
    case "abort":
      return "interrupt";
    case "followup":
    case "followups":
    // Singular hyphenated form accepted alongside the plural "follow-ups".
    case "follow-up":
    case "follow-ups":
      return "followup";
    case "collect":
    case "coalesce":
      return "collect";
    case "steer+backlog":
    case "steer-backlog":
    case "steer_backlog":
      return "steer-backlog";
    default:
      return undefined;
  }
}
/**
 * Maps a user- or config-supplied word onto a `QueueDropPolicy`.
 * Matching ignores case and surrounding whitespace.
 *
 * @param raw Word to interpret ("old"/"oldest", "new"/"newest", "summarize"/"summary").
 * @returns The canonical policy, or `undefined` for empty or unknown input.
 */
function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined {
  if (!raw) return undefined;
  switch (raw.trim().toLowerCase()) {
    case "old":
    case "oldest":
      return "old";
    case "new":
    case "newest":
      return "new";
    case "summarize":
    case "summary":
      return "summarize";
    default:
      return undefined;
  }
}
2026-01-03 04:26:36 +01:00
/**
 * Parses a debounce value such as "1500", "1500ms" or "2s" into whole
 * milliseconds (bare numbers are read as milliseconds).
 *
 * Zero is accepted: the rest of the queue code clamps with `Math.max(0, …)`
 * and treats a debounce of 0 as "do not wait", so an explicit 0 must not be
 * rejected as invalid.
 *
 * @param raw Text after `debounce:` / `debounce=`.
 * @returns Rounded milliseconds, or `undefined` when the value is empty,
 *   negative, non-finite, or rejected by `parseDurationMs`.
 */
function parseQueueDebounce(raw?: string): number | undefined {
  if (!raw) return undefined;
  try {
    const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
    if (typeof parsed !== "number" || !Number.isFinite(parsed) || parsed < 0) {
      return undefined;
    }
    return Math.round(parsed);
  } catch {
    // parseDurationMs signals malformed input by throwing; report it as "no value".
    return undefined;
  }
}
/**
 * Parses a queue cap from text.
 *
 * @param raw Text after `cap:` / `cap=`.
 * @returns The value rounded down to an integer, or `undefined` when the text
 *   is empty, not a finite number, or rounds down to less than 1.
 */
function parseQueueCap(raw?: string): number | undefined {
  if (!raw) return undefined;
  const floored = Math.floor(Number(raw));
  // NaN and ±Infinity survive Math.floor unchanged, so one finiteness check suffices.
  return Number.isFinite(floored) && floored >= 1 ? floored : undefined;
}
/**
 * Parses the text that follows a `/queue` keyword.
 *
 * Tokens are whitespace-separated and read left to right. Recognised tokens:
 * a reset word (`default`, `reset`, `clear` – parsing stops there), an option
 * (`debounce`, `cap`, `drop` followed by `:` or `=` and a value), or a queue
 * mode word. Parsing stops at the first token that is none of these; that
 * token and everything after it are left for the caller.
 *
 * @param raw Message text starting right after the `/queue` keyword.
 * @returns The parsed settings plus `consumed`, the number of leading
 *   characters of `raw` that belong to the directive (including the optional
 *   `:` separator and whitespace after each accepted token). The `raw*` fields
 *   hold the literal option values so callers can tell "missing" from "invalid".
 */
function parseQueueDirectiveArgs(raw: string): {
  consumed: number;
  queueMode?: QueueMode;
  queueReset: boolean;
  rawMode?: string;
  debounceMs?: number;
  cap?: number;
  dropPolicy?: QueueDropPolicy;
  rawDebounce?: string;
  rawCap?: string;
  rawDrop?: string;
  hasOptions: boolean;
} {
  const len = raw.length;
  let i = 0;
  const skipWhitespace = (): void => {
    while (i < len && /\s/.test(raw.charAt(i))) i += 1;
  };

  // Optional ":" between the keyword and its arguments ("/queue: collect").
  skipWhitespace();
  if (raw.charAt(i) === ":") {
    i += 1;
    skipWhitespace();
  }

  let consumed = i;
  let queueMode: QueueMode | undefined;
  let queueReset = false;
  let rawMode: string | undefined;
  let debounceMs: number | undefined;
  let cap: number | undefined;
  let dropPolicy: QueueDropPolicy | undefined;
  let rawDebounce: string | undefined;
  let rawCap: string | undefined;
  let rawDrop: string | undefined;
  let hasOptions = false;

  // Reads the next run of non-whitespace and also steps over the whitespace after it.
  const takeToken = (): string | null => {
    if (i >= len) return null;
    const start = i;
    while (i < len && !/\s/.test(raw.charAt(i))) i += 1;
    if (start === i) return null;
    const token = raw.slice(start, i);
    skipWhitespace();
    return token;
  };

  while (i < len) {
    const token = takeToken();
    if (!token) break;
    const lowered = token.trim().toLowerCase();

    if (lowered === "default" || lowered === "reset" || lowered === "clear") {
      queueReset = true;
      consumed = i;
      break;
    }

    const option = /^(debounce|cap|drop)[:=]/.exec(lowered);
    if (option) {
      // Only the text up to the next ":" or "=" counts as the value.
      const value = token.split(/[:=]/)[1] ?? "";
      if (option[1] === "debounce") {
        rawDebounce = value;
        debounceMs = parseQueueDebounce(value);
      } else if (option[1] === "cap") {
        rawCap = value;
        cap = parseQueueCap(value);
      } else {
        rawDrop = value;
        dropPolicy = normalizeQueueDropPolicy(value);
      }
      hasOptions = true;
      consumed = i;
      continue;
    }

    const mode = normalizeQueueMode(token);
    // Stop at the first unrecognised token; `consumed` still points before it.
    if (!mode) break;
    queueMode = mode;
    rawMode = token;
    consumed = i;
  }

  return {
    consumed,
    queueMode,
    queueReset,
    rawMode,
    debounceMs,
    cap,
    dropPolicy,
    rawDebounce,
    rawCap,
    rawDrop,
    hasOptions,
  };
}
2025-12-26 13:35:44 +01:00
/**
 * Finds a `/queue` directive in a message, parses its arguments and removes
 * the directive (keyword plus recognised arguments) from the text.
 *
 * @param body Raw message text.
 * @returns `cleaned` – the text without the directive, whitespace collapsed;
 *   `hasDirective` – whether `/queue` was present; the remaining fields are
 *   the result of `parseQueueDirectiveArgs` (without `consumed`).
 */
export function extractQueueDirective(body?: string): {
  cleaned: string;
  queueMode?: QueueMode;
  queueReset: boolean;
  rawMode?: string;
  hasDirective: boolean;
  debounceMs?: number;
  cap?: number;
  dropPolicy?: QueueDropPolicy;
  rawDebounce?: string;
  rawCap?: string;
  rawDrop?: string;
  hasOptions: boolean;
} {
  const withoutDirective = (cleaned: string) => ({
    cleaned,
    hasDirective: false,
    queueReset: false,
    hasOptions: false,
  });
  if (!body) return withoutDirective("");

  const keyword = "/queue";
  const match = /(?:^|\s)\/queue(?=$|\s|:)/i.exec(body);
  if (!match) return withoutDirective(body.trim());

  // The match ends exactly where the keyword ends (the lookahead consumes
  // nothing), so the keyword is located from the tail of the match. Searching
  // for the lowercase literal instead would miss "/Queue" – the regex is
  // case-insensitive – and yield a negative offset that corrupts `cleaned`.
  const argsStart = match.index + match[0].length;
  const start = argsStart - keyword.length;

  const { consumed, ...parsed } = parseQueueDirectiveArgs(body.slice(argsStart));
  const cleaned = (body.slice(0, start) + body.slice(argsStart + consumed))
    .replace(/\s+/g, " ")
    .trim();

  return { ...parsed, cleaned, hasDirective: true };
}
2026-01-02 23:18:41 +01:00
/**
 * Strips reply-target tags from reply text and reports which message to reply to.
 *
 * Two tags are understood (case-insensitive): `[[reply_to_current]]`, which
 * resolves to `currentMessageId`, and `[[reply_to:<id>]]`. All occurrences of
 * both are removed; when several explicit ids appear the first one is used,
 * and an explicit id takes precedence over `reply_to_current`.
 *
 * @param text Reply text that may contain tags.
 * @param currentMessageId Id of the message being answered, if known.
 * @returns `cleaned` – text without tags (runs of spaces/tabs collapsed,
 *   spaces around newlines removed); `replyToId` – resolved target id, if any;
 *   `hasTag` – whether any tag was present.
 */
export function extractReplyToTag(
  text?: string,
  currentMessageId?: string,
): {
  cleaned: string;
  replyToId?: string;
  hasTag: boolean;
} {
  if (!text) return { cleaned: "", hasTag: false };

  let working = text;
  let replyToId: string | undefined;
  let hasTag = false;

  if (/\[\[reply_to_current\]\]/i.test(working)) {
    working = working.replace(/\[\[reply_to_current\]\]/gi, " ");
    hasTag = true;
    const current = currentMessageId?.trim();
    if (current) replyToId = current;
  }

  const idMatch = working.match(/\[\[reply_to:([^\]\n]+)\]\]/i);
  if (idMatch?.[1]) {
    working = working.replace(/\[\[reply_to:[^\]\n]+\]\]/gi, " ");
    hasTag = true;
    // A whitespace-only id carries no target; keep whatever was resolved
    // before instead of overwriting it with an empty string.
    const explicitId = idMatch[1].trim();
    if (explicitId) replyToId = explicitId;
  }

  const cleaned = working
    .replace(/[ \t]+/g, " ")
    .replace(/[ \t]*\n[ \t]*/g, "\n")
    .trim();
  return { cleaned, replyToId, hasTag };
}
2026-01-03 04:26:36 +01:00
/**
 * Shortens text to at most `limit` characters, marking the cut with "…".
 *
 * @param text Text to shorten.
 * @param limit Maximum length of the result (default 140).
 * @returns `text` unchanged when it fits; otherwise its first `limit - 1`
 *   characters with trailing whitespace removed, followed by "…".
 */
function elideText(text: string, limit = 140): string {
  if (text.length <= limit) return text;
  const head = text.slice(0, Math.max(0, limit - 1)).trimEnd();
  // No padding around the pieces: the result must stay within `limit`.
  return `${head}…`;
}
/**
 * Builds the one-line description of a dropped run for the overflow summary.
 *
 * @param run The run that was dropped from the queue.
 * @returns The run's `summaryLine` (or its `prompt` when the summary line is
 *   missing or blank), with whitespace collapsed and elided to 160 characters.
 */
function buildQueueSummaryLine(run: FollowupRun): string {
  // `||` on purpose: an empty or whitespace-only summary line falls back to the prompt.
  const source = run.summaryLine?.trim() || run.prompt.trim();
  return elideText(source.replace(/\s+/g, " ").trim(), 160);
}
/**
 * Returns the follow-up queue for `key`, creating it on first use.
 *
 * An existing queue is updated in place: `mode` is always replaced, while
 * `debounceMs`, `cap` and `dropPolicy` are replaced only when `settings`
 * carries a usable value. A new queue takes the module defaults for anything
 * `settings` leaves out.
 *
 * @param key Queue identifier.
 * @param settings Settings to apply.
 * @returns The (possibly updated) queue state stored in `FOLLOWUP_QUEUES`.
 */
function getFollowupQueue(
  key: string,
  settings: QueueSettings,
): FollowupQueueState {
  // Normalise once; `undefined` means "no usable value supplied".
  const debounceMs =
    typeof settings.debounceMs === "number"
      ? Math.max(0, settings.debounceMs)
      : undefined;
  const cap =
    typeof settings.cap === "number" && settings.cap > 0
      ? Math.floor(settings.cap)
      : undefined;

  const existing = FOLLOWUP_QUEUES.get(key);
  if (existing) {
    existing.mode = settings.mode;
    existing.debounceMs = debounceMs ?? existing.debounceMs;
    existing.cap = cap ?? existing.cap;
    existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
    return existing;
  }

  const created: FollowupQueueState = {
    items: [],
    draining: false,
    lastEnqueuedAt: 0,
    mode: settings.mode,
    debounceMs: debounceMs ?? DEFAULT_QUEUE_DEBOUNCE_MS,
    cap: cap ?? DEFAULT_QUEUE_CAP,
    dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP,
    droppedCount: 0,
    summaryLines: [],
  };
  FOLLOWUP_QUEUES.set(key, created);
  return created;
}
/**
 * Adds a run to the follow-up queue for `key`, applying the cap and drop policy.
 *
 * When the queue is already at its cap: policy "new" rejects the incoming run;
 * otherwise the oldest entries are evicted to make room and, under
 * "summarize", recorded in `droppedCount` / `summaryLines` (the latter kept to
 * at most `cap` lines).
 *
 * @param key Queue identifier.
 * @param run Run to enqueue.
 * @param settings Settings applied to the queue before enqueueing.
 * @returns `true` when the run was queued, `false` when it was rejected.
 */
function enqueueFollowupRun(
  key: string,
  run: FollowupRun,
  settings: QueueSettings,
): boolean {
  const queue = getFollowupQueue(key, settings);
  // Bookkeeping is refreshed even if the run ends up rejected below.
  queue.lastEnqueuedAt = Date.now();
  queue.lastRun = run.run;

  const { cap, items } = queue;
  const atCapacity = cap > 0 && items.length >= cap;
  if (atCapacity && queue.dropPolicy === "new") return false;

  if (atCapacity) {
    const evicted = items.splice(0, items.length - cap + 1);
    if (queue.dropPolicy === "summarize") {
      for (const item of evicted) {
        queue.droppedCount += 1;
        queue.summaryLines.push(buildQueueSummaryLine(item));
      }
      const excess = queue.summaryLines.length - cap;
      if (excess > 0) queue.summaryLines.splice(0, excess);
    }
  }

  items.push(run);
  return true;
}
/**
 * Waits until the queue has been quiet for its debounce interval.
 *
 * `queue.lastEnqueuedAt` is re-read after every sleep, so enqueues that arrive
 * while waiting extend the wait.
 *
 * @param queue Queue whose `debounceMs` / `lastEnqueuedAt` drive the wait.
 * @returns Resolves once `debounceMs` has elapsed since the last enqueue, or
 *   immediately when the debounce is zero, negative or not a number.
 */
async function waitForQueueDebounce(queue: FollowupQueueState): Promise<void> {
  const debounceMs = Math.max(0, queue.debounceMs);
  // Written as a negated ">" so NaN also returns here; a NaN interval would
  // otherwise never satisfy the elapsed check and spin on near-zero timers.
  if (!(debounceMs > 0)) return;
  for (;;) {
    const remaining = debounceMs - (Date.now() - queue.lastEnqueuedAt);
    if (remaining <= 0) return;
    await new Promise<void>((resolve) => setTimeout(resolve, remaining));
  }
}
/**
 * Builds the "[Queue overflow]" notice for runs dropped under the
 * "summarize" policy and resets the queue's overflow bookkeeping.
 *
 * @param queue Queue whose `droppedCount` / `summaryLines` are reported; both
 *   are cleared when a notice is produced.
 * @returns The notice text, or `undefined` when the policy is not "summarize"
 *   or nothing was dropped (the queue is left untouched in that case).
 */
function buildSummaryPrompt(queue: FollowupQueueState): string | undefined {
  if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) {
    return undefined;
  }
  const plural = queue.droppedCount === 1 ? "" : "s";
  const lines = [
    `[Queue overflow] Dropped ${queue.droppedCount} message${plural} due to cap.`,
  ];
  if (queue.summaryLines.length > 0) {
    lines.push("Summary:", ...queue.summaryLines.map((line) => `- ${line}`));
  }
  queue.droppedCount = 0;
  queue.summaryLines = [];
  return lines.join("\n");
}
2026-01-03 05:10:09 +01:00
/**
 * Combines all queued runs into one prompt for "collect" mode.
 *
 * @param items Queued runs in arrival order.
 * @param summary Optional overflow notice placed before the queued messages.
 * @returns A header, the summary (if any) and one numbered `---` section per
 *   run, separated by blank lines.
 */
function buildCollectPrompt(items: FollowupRun[], summary?: string): string {
  const header = "[Queued messages while agent was busy]";
  const sections = items.map((item, idx) =>
    `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
  );
  const blocks = summary ? [header, summary, ...sections] : [header, ...sections];
  return blocks.join("\n\n");
}
/**
 * Starts a background drain of the follow-up queue for `key` unless one is
 * already running.
 *
 * Each round first waits for the debounce interval. In "collect" mode all
 * queued runs (plus any overflow notice) are merged into one prompt; in every
 * other mode a pending overflow notice is sent first, then runs are executed
 * one at a time, oldest first. Errors from `runFollowup` end the current loop
 * and are logged; remaining work is picked up by a fresh drain.
 *
 * @param key Queue identifier.
 * @param runFollowup Callback that executes one run.
 */
function scheduleFollowupDrain(
  key: string,
  runFollowup: (run: FollowupRun) => Promise<void>,
): void {
  const queue = FOLLOWUP_QUEUES.get(key);
  if (!queue || queue.draining) return;
  queue.draining = true;

  // A dropped-count only turns into a prompt under "summarize". If the policy
  // was switched after drops were recorded, the stale count must not be treated
  // as pending work: nothing would ever clear it and the drain would
  // reschedule itself forever.
  const hasPendingWork = (): boolean =>
    queue.items.length > 0 ||
    (queue.dropPolicy === "summarize" && queue.droppedCount > 0);

  void (async () => {
    try {
      while (hasPendingWork()) {
        await waitForQueueDebounce(queue);

        if (queue.mode === "collect") {
          const items = queue.items.splice(0, queue.items.length);
          const summary = buildSummaryPrompt(queue);
          // Settings may have changed during the debounce wait; never run an
          // empty collect prompt.
          if (items.length === 0 && !summary) break;
          const run = items.at(-1)?.run ?? queue.lastRun;
          if (!run) break;
          await runFollowup({
            prompt: buildCollectPrompt(items, summary),
            run,
            enqueuedAt: Date.now(),
          });
          continue;
        }

        const summaryPrompt = buildSummaryPrompt(queue);
        if (summaryPrompt) {
          const run = queue.lastRun;
          if (!run) break;
          await runFollowup({
            prompt: summaryPrompt,
            run,
            enqueuedAt: Date.now(),
          });
          continue;
        }

        const next = queue.items.shift();
        if (!next) break;
        await runFollowup(next);
      }
    } catch (err) {
      defaultRuntime.error?.(
        `followup queue drain failed for ${key}: ${String(err)}`,
      );
    } finally {
      queue.draining = false;
      if (hasPendingWork()) {
        scheduleFollowupDrain(key, runFollowup);
      } else {
        FOLLOWUP_QUEUES.delete(key);
      }
    }
  })();
}
2025-12-02 20:09:51 +00:00
/**
 * Tells whether a message consists solely of an abort word.
 *
 * @param text Message text; case and surrounding whitespace are ignored.
 * @returns `true` when the whole text is one of `ABORT_TRIGGERS`.
 */
function isAbortTrigger(text?: string): boolean {
  return text ? ABORT_TRIGGERS.has(text.trim().toLowerCase()) : false;
}
2025-12-03 14:32:58 +00:00
/**
 * Removes wrapper labels, bracketed tags (such as timestamps) and sender
 * prefixes so that directive-only detection still works on group batches that
 * include history/context.
 *
 * @param text Message body, possibly a batch containing earlier messages.
 * @returns Only the part after "[Current message - respond to this]" (when
 *   that marker is present), with every `[...]` tag and every line-leading
 *   `name:` prefix removed and whitespace collapsed.
 */
function stripStructuralPrefixes(text: string): string {
  const marker = "[Current message - respond to this]";
  const markerAt = text.indexOf(marker);
  const relevant = markerAt >= 0 ? text.slice(markerAt + marker.length) : text;

  const withoutTags = relevant.replace(/\[[^\]]+\]\s*/g, "");
  // Per-line sender prefix: letters, digits and phone-number punctuation up to a colon.
  const withoutSenders = withoutTags.replace(
    /^[ \t]*[A-Za-z0-9+()\-_. ]+:\s*/gm,
    "",
  );
  return withoutSenders.replace(/\s+/g, " ").trim();
}
2025-12-04 02:29:32 +00:00
/**
 * Removes bot mentions from a message so that commands can be matched on the
 * remaining text.
 *
 * Removed, in order: every configured `routing.groupChat.mentionPatterns`
 * regex (case-insensitive; invalid patterns are skipped), the bot's own number
 * taken from `ctx.To` (with or without a leading "@"), generic `@<digits>`
 * mentions, and Discord-style `<@id>` mentions.
 *
 * @param text Message text.
 * @param ctx Message context; `To` supplies the bot's own address.
 * @param cfg Loaded config, if any.
 * @returns The text without mentions, whitespace collapsed and trimmed.
 */
function stripMentions(
  text: string,
  ctx: MsgContext,
  cfg: ClawdisConfig | undefined,
): string {
  let result = text;

  const patterns = cfg?.routing?.groupChat?.mentionPatterns ?? [];
  for (const pattern of patterns) {
    try {
      result = result.replace(new RegExp(pattern, "gi"), " ");
    } catch {
      // ignore invalid regex
    }
  }

  const selfE164 = (ctx.To ?? "").replace(/^whatsapp:/, "");
  if (selfE164) {
    const esc = selfE164.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
    // Strip the optional "@" together with the number in one pass. Removing
    // the bare number first leaves a dangling "@" that a later "@number"
    // replacement can no longer match.
    result = result.replace(new RegExp(`@?${esc}`, "gi"), " ");
  }

  // Generic mentions: "@" followed by at least five digits or "+" characters.
  result = result.replace(/@[0-9+]{5,}/g, " ");
  // Discord-style mentions (<@123> or <@!123>)
  result = result.replace(/<@!?\d+>/g, " ");

  return result.replace(/\s+/g, " ").trim();
}
2025-12-26 13:35:44 +01:00
/**
 * Returns the built-in queue mode for a surface when nothing else selects one.
 *
 * Every surface currently resolves to "collect"; the known surfaces are listed
 * explicitly so a per-surface default can be changed in one place.
 *
 * @param surface Surface name; case and surrounding whitespace are ignored.
 * @returns The default queue mode.
 */
function defaultQueueModeForSurface(surface?: string): QueueMode {
  switch (surface?.trim().toLowerCase()) {
    case "discord":
    case "webchat":
    case "whatsapp":
    case "telegram":
    case "imessage":
    case "signal":
      return "collect";
    default:
      return "collect";
  }
}
2026-01-03 04:26:36 +01:00
/**
 * Resolves the effective queue settings for a message.
 *
 * Precedence, highest first:
 * - mode: inline directive, session entry, per-surface config
 *   (`routing.queue.bySurface`), global config (`routing.queue.mode`),
 *   built-in surface default;
 * - debounce / cap / drop policy: inline options, session entry, global
 *   config, module default.
 *
 * @param params.cfg Loaded config.
 * @param params.surface Surface name used for the per-surface lookup.
 * @param params.sessionEntry Stored session state, if any.
 * @param params.inlineMode Mode given inline with the message.
 * @param params.inlineOptions Options given inline with the message.
 * @returns Settings with `debounceMs` clamped to >= 0 and `cap` floored and
 *   clamped to >= 1.
 */
function resolveQueueSettings(params: {
  cfg: ClawdisConfig;
  surface?: string;
  sessionEntry?: SessionEntry;
  inlineMode?: QueueMode;
  inlineOptions?: Partial<QueueSettings>;
}): QueueSettings {
  const { cfg, sessionEntry, inlineMode, inlineOptions } = params;
  const surfaceKey = params.surface?.trim().toLowerCase();
  const queueCfg = cfg.routing?.queue;

  const bySurface = queueCfg?.bySurface as
    | Record<string, string | undefined>
    | undefined;
  const surfaceModeRaw =
    surfaceKey && bySurface ? bySurface[surfaceKey] : undefined;

  const mode =
    inlineMode ??
    normalizeQueueMode(sessionEntry?.queueMode) ??
    normalizeQueueMode(surfaceModeRaw) ??
    normalizeQueueMode(queueCfg?.mode) ??
    defaultQueueModeForSurface(surfaceKey);

  const debounceRaw =
    inlineOptions?.debounceMs ??
    sessionEntry?.queueDebounceMs ??
    queueCfg?.debounceMs ??
    DEFAULT_QUEUE_DEBOUNCE_MS;
  const capRaw =
    inlineOptions?.cap ??
    sessionEntry?.queueCap ??
    queueCfg?.cap ??
    DEFAULT_QUEUE_CAP;
  const dropPolicy =
    inlineOptions?.dropPolicy ??
    sessionEntry?.queueDrop ??
    normalizeQueueDropPolicy(queueCfg?.drop) ??
    DEFAULT_QUEUE_DROP;

  return {
    mode,
    debounceMs:
      typeof debounceRaw === "number" ? Math.max(0, debounceRaw) : undefined,
    cap:
      typeof capRaw === "number" ? Math.max(1, Math.floor(capRaw)) : undefined,
    dropPolicy,
  };
}
2025-11-25 02:16:54 +01:00
export async function getReplyFromConfig (
2025-11-26 00:53:53 +01:00
ctx : MsgContext ,
opts? : GetReplyOptions ,
2025-12-09 18:00:01 +00:00
configOverride? : ClawdisConfig ,
2025-12-03 00:35:57 +00:00
) : Promise < ReplyPayload | ReplyPayload [ ] | undefined > {
2025-11-26 00:53:53 +01:00
const cfg = configOverride ? ? loadConfig ( ) ;
2025-12-23 23:45:20 +00:00
const workspaceDirRaw = cfg . agent ? . workspace ? ? DEFAULT_AGENT_WORKSPACE_DIR ;
const agentCfg = cfg . agent ;
2025-12-24 00:22:52 +00:00
const sessionCfg = cfg . session ;
2025-12-17 11:29:04 +01:00
2025-12-26 01:13:13 +01:00
const mainModel = resolveConfiguredModelRef ( {
cfg ,
defaultProvider : DEFAULT_PROVIDER ,
defaultModel : DEFAULT_MODEL ,
} ) ;
const defaultProvider = mainModel . provider ;
const defaultModel = mainModel . model ;
2025-12-26 23:26:14 +00:00
const aliasIndex = buildModelAliasIndex ( { cfg , defaultProvider } ) ;
2025-12-23 23:45:20 +00:00
let provider = defaultProvider ;
let model = defaultModel ;
2025-12-26 01:13:13 +01:00
if ( opts ? . isHeartbeat ) {
const heartbeatRaw = agentCfg ? . heartbeat ? . model ? . trim ( ) ? ? "" ;
const heartbeatRef = heartbeatRaw
2025-12-26 23:26:14 +00:00
? resolveModelRefFromString ( {
raw : heartbeatRaw ,
defaultProvider ,
aliasIndex ,
} )
2025-12-26 01:13:13 +01:00
: null ;
if ( heartbeatRef ) {
2025-12-26 23:26:14 +00:00
provider = heartbeatRef . ref . provider ;
model = heartbeatRef . ref . model ;
2025-12-26 01:13:13 +01:00
}
}
2025-12-23 23:45:20 +00:00
let contextTokens =
2025-12-17 11:29:04 +01:00
agentCfg ? . contextTokens ? ?
lookupContextTokens ( model ) ? ?
DEFAULT_CONTEXT_TOKENS ;
// Bootstrap the workspace and the required files (AGENTS.md, SOUL.md, TOOLS.md).
const workspace = await ensureAgentWorkspace ( {
dir : workspaceDirRaw ,
ensureBootstrapFiles : true ,
} ) ;
const workspaceDir = workspace . dir ;
const timeoutSeconds = Math . max ( agentCfg ? . timeoutSeconds ? ? 600 , 1 ) ;
2025-11-26 00:53:53 +01:00
const timeoutMs = timeoutSeconds * 1000 ;
let started = false ;
const triggerTyping = async ( ) = > {
await opts ? . onReplyStart ? . ( ) ;
} ;
const onReplyStart = async ( ) = > {
if ( started ) return ;
started = true ;
await triggerTyping ( ) ;
} ;
let typingTimer : NodeJS.Timeout | undefined ;
2025-12-23 15:03:05 +00:00
const configuredTypingSeconds =
agentCfg ? . typingIntervalSeconds ? ? sessionCfg ? . typingIntervalSeconds ;
const typingIntervalSeconds =
2025-12-24 00:33:35 +00:00
typeof configuredTypingSeconds === "number" ? configuredTypingSeconds : 6 ;
2025-12-23 15:03:05 +00:00
const typingIntervalMs = typingIntervalSeconds * 1000 ;
2026-01-04 00:26:29 +00:00
const typingTtlMs = 2 * 60 _000 ;
const formatTypingTtl = ( ms : number ) = > {
if ( ms % 60 _000 === 0 ) return ` ${ ms / 60 _000 } m ` ;
return ` ${ Math . round ( ms / 1000 ) } s ` ;
} ;
2025-11-26 00:53:53 +01:00
const cleanupTyping = ( ) = > {
2026-01-03 14:09:19 +00:00
if ( typingTtlTimer ) {
clearTimeout ( typingTtlTimer ) ;
typingTtlTimer = undefined ;
}
2025-11-26 00:53:53 +01:00
if ( typingTimer ) {
clearInterval ( typingTimer ) ;
typingTimer = undefined ;
}
} ;
2026-01-03 14:09:19 +00:00
let typingTtlTimer : NodeJS.Timeout | undefined ;
const refreshTypingTtl = ( ) = > {
if ( ! typingIntervalMs || typingIntervalMs <= 0 ) return ;
if ( typingTtlMs <= 0 ) return ;
if ( typingTtlTimer ) {
clearTimeout ( typingTtlTimer ) ;
}
typingTtlTimer = setTimeout ( ( ) = > {
if ( ! typingTimer ) return ;
2026-01-03 15:01:38 +00:00
defaultRuntime . log (
2026-01-04 00:26:29 +00:00
` typing TTL reached ( ${ formatTypingTtl ( typingTtlMs ) } ); stopping typing indicator ` ,
2026-01-03 14:09:19 +00:00
) ;
cleanupTyping ( ) ;
} , typingTtlMs ) ;
} ;
2025-11-26 00:53:53 +01:00
const startTypingLoop = async ( ) = > {
if ( ! opts ? . onReplyStart ) return ;
if ( typingIntervalMs <= 0 ) return ;
if ( typingTimer ) return ;
2025-12-23 13:55:01 +00:00
await onReplyStart ( ) ;
2026-01-03 14:09:19 +00:00
refreshTypingTtl ( ) ;
2025-11-26 00:53:53 +01:00
typingTimer = setInterval ( ( ) = > {
void triggerTyping ( ) ;
} , typingIntervalMs ) ;
} ;
2025-12-23 13:55:01 +00:00
const startTypingOnText = async ( text? : string ) = > {
const trimmed = text ? . trim ( ) ;
if ( ! trimmed ) return ;
if ( trimmed === SILENT_REPLY_TOKEN ) return ;
2026-01-03 14:09:19 +00:00
refreshTypingTtl ( ) ;
2025-12-23 13:55:01 +00:00
await startTypingLoop ( ) ;
} ;
2025-11-26 00:53:53 +01:00
let transcribedText : string | undefined ;
// Optional audio transcription before templating/session handling.
2025-12-24 00:22:52 +00:00
if ( cfg . routing ? . transcribeAudio && isAudio ( ctx . MediaType ) ) {
2025-11-26 00:53:53 +01:00
const transcribed = await transcribeInboundAudio ( cfg , ctx , defaultRuntime ) ;
if ( transcribed ? . text ) {
transcribedText = transcribed . text ;
ctx . Body = transcribed . text ;
ctx . Transcript = transcribed . text ;
logVerbose ( "Replaced Body with audio transcript for reply flow" ) ;
}
}
// Optional session handling (conversation reuse + /new resets)
2025-12-06 23:16:23 +01:00
const mainKey = sessionCfg ? . mainKey ? ? "main" ;
2025-11-26 00:53:53 +01:00
const resetTriggers = sessionCfg ? . resetTriggers ? . length
? sessionCfg . resetTriggers
2025-12-22 20:36:29 +01:00
: DEFAULT_RESET_TRIGGERS ;
2025-11-26 00:53:53 +01:00
const idleMinutes = Math . max (
sessionCfg ? . idleMinutes ? ? DEFAULT_IDLE_MINUTES ,
1 ,
) ;
const sessionScope = sessionCfg ? . scope ? ? "per-sender" ;
2025-12-06 00:49:46 +01:00
const storePath = resolveStorePath ( sessionCfg ? . store ) ;
2025-11-26 00:53:53 +01:00
let sessionStore : ReturnType < typeof loadSessionStore > | undefined ;
let sessionKey : string | undefined ;
2025-12-02 20:09:51 +00:00
let sessionEntry : SessionEntry | undefined ;
2025-11-26 00:53:53 +01:00
let sessionId : string | undefined ;
let isNewSession = false ;
let bodyStripped : string | undefined ;
let systemSent = false ;
2025-12-02 20:09:51 +00:00
let abortedLastRun = false ;
2025-11-26 00:53:53 +01:00
2025-12-03 08:45:23 +00:00
let persistedThinking : string | undefined ;
2025-12-03 09:04:37 +00:00
let persistedVerbose : string | undefined ;
2025-12-23 23:45:20 +00:00
let persistedModelOverride : string | undefined ;
let persistedProviderOverride : string | undefined ;
2025-12-03 08:45:23 +00:00
2026-01-02 10:14:58 +01:00
const groupResolution = resolveGroupSessionKey ( ctx ) ;
2025-12-23 13:20:11 +00:00
const isGroup =
2026-01-02 10:14:58 +01:00
ctx . ChatType ? . trim ( ) . toLowerCase ( ) === "group" || Boolean ( groupResolution ) ;
2025-12-06 00:49:46 +01:00
const triggerBodyNormalized = stripStructuralPrefixes ( ctx . Body ? ? "" )
2025-12-05 21:29:41 +00:00
. trim ( )
. toLowerCase ( ) ;
2025-12-17 11:29:04 +01:00
const rawBody = ctx . Body ? ? "" ;
const trimmedBody = rawBody . trim ( ) ;
// Timestamp/message prefixes (e.g. "[Dec 4 17:35] ") are added by the
// web inbox before we get here. They prevented reset triggers like "/new"
// from matching, so strip structural wrappers when checking for resets.
2025-12-23 13:20:11 +00:00
const strippedForReset = isGroup
? stripMentions ( triggerBodyNormalized , ctx , cfg )
: triggerBodyNormalized ;
2025-12-17 11:29:04 +01:00
// Scan the configured reset triggers (e.g. "/new"). A message that is exactly
// a trigger starts a fresh session with an empty body; a trigger followed by
// text starts a fresh session and keeps the remaining text as the body.
for (const trigger of resetTriggers) {
  if (!trigger) continue;
  if (trimmedBody === trigger || strippedForReset === trigger) {
    isNewSession = true;
    bodyStripped = "";
    break;
  }
  // The prefix is the trigger plus one space, so a longer word that merely
  // begins with the trigger text is not treated as a reset.
  const triggerPrefix = `${trigger} `;
  if (
    trimmedBody.startsWith(triggerPrefix) ||
    strippedForReset.startsWith(triggerPrefix)
  ) {
    isNewSession = true;
    // strippedForReset is lower-cased for matching. Cutting the remainder out
    // of it would lower-case the user's message (paths, names, code), so
    // rebuild the same string without lower-casing and slice from that.
    const casePreservedBody = stripStructuralPrefixes(ctx.Body ?? "").trim();
    const casePreserved = isGroup
      ? stripMentions(casePreservedBody, ctx, cfg)
      : casePreservedBody;
    // Only trust the case-preserving copy when it lines up with the trigger;
    // otherwise keep the previous (lower-cased) source so offsets stay valid.
    const alignsWithTrigger =
      casePreserved.slice(0, triggerPrefix.length).toLowerCase() ===
      triggerPrefix.toLowerCase();
    const remainderSource = alignsWithTrigger
      ? casePreserved
      : strippedForReset;
    bodyStripped = remainderSource.slice(trigger.length).trimStart();
    break;
  }
}
2025-11-26 00:53:53 +01:00
2025-12-17 11:29:04 +01:00
sessionKey = resolveSessionKey ( sessionScope , ctx , mainKey ) ;
sessionStore = loadSessionStore ( storePath ) ;
2026-01-02 12:20:38 +01:00
if ( groupResolution ? . legacyKey && groupResolution . legacyKey !== sessionKey ) {
2026-01-02 10:14:58 +01:00
const legacyEntry = sessionStore [ groupResolution . legacyKey ] ;
if ( legacyEntry && ! sessionStore [ sessionKey ] ) {
sessionStore [ sessionKey ] = legacyEntry ;
delete sessionStore [ groupResolution . legacyKey ] ;
}
}
2025-12-17 11:29:04 +01:00
const entry = sessionStore [ sessionKey ] ;
const idleMs = idleMinutes * 60 _000 ;
const freshEntry = entry && Date . now ( ) - entry . updatedAt <= idleMs ;
if ( ! isNewSession && freshEntry ) {
sessionId = entry . sessionId ;
systemSent = entry . systemSent ? ? false ;
abortedLastRun = entry . abortedLastRun ? ? false ;
persistedThinking = entry . thinkingLevel ;
persistedVerbose = entry . verboseLevel ;
2025-12-23 23:45:20 +00:00
persistedModelOverride = entry . modelOverride ;
persistedProviderOverride = entry . providerOverride ;
2025-12-17 11:29:04 +01:00
} else {
sessionId = crypto . randomUUID ( ) ;
isNewSession = true ;
systemSent = false ;
abortedLastRun = false ;
2025-11-26 00:53:53 +01:00
}
2025-12-17 11:29:04 +01:00
const baseEntry = ! isNewSession && freshEntry ? entry : undefined ;
sessionEntry = {
. . . baseEntry ,
sessionId ,
updatedAt : Date.now ( ) ,
systemSent ,
abortedLastRun ,
// Persist previously stored thinking/verbose levels when present.
thinkingLevel : persistedThinking ? ? baseEntry ? . thinkingLevel ,
verboseLevel : persistedVerbose ? ? baseEntry ? . verboseLevel ,
2025-12-23 23:45:20 +00:00
modelOverride : persistedModelOverride ? ? baseEntry ? . modelOverride ,
providerOverride : persistedProviderOverride ? ? baseEntry ? . providerOverride ,
2026-01-03 23:44:38 +01:00
sendPolicy : baseEntry?.sendPolicy ,
2025-12-26 13:35:44 +01:00
queueMode : baseEntry?.queueMode ,
2026-01-03 04:26:36 +01:00
queueDebounceMs : baseEntry?.queueDebounceMs ,
queueCap : baseEntry?.queueCap ,
queueDrop : baseEntry?.queueDrop ,
2026-01-02 10:14:58 +01:00
displayName : baseEntry?.displayName ,
chatType : baseEntry?.chatType ,
surface : baseEntry?.surface ,
subject : baseEntry?.subject ,
room : baseEntry?.room ,
space : baseEntry?.space ,
2025-12-17 11:29:04 +01:00
} ;
2026-01-02 10:14:58 +01:00
// Record group metadata on the session entry and derive its display name.
// When no group surface was resolved, an entry without a chatType is marked
// as "direct".
if (groupResolution?.surface) {
  const surface = groupResolution.surface;
  const subject = ctx.GroupSubject?.trim();
  const space = ctx.GroupSpace?.trim();
  const explicitRoom = ctx.GroupRoom?.trim();
  const isRoomSurface = surface === "discord" || surface === "slack";
  // On discord/slack a subject starting with "#" is stored as the room.
  const inferredRoom =
    isRoomSurface && subject && subject.startsWith("#") ? subject : undefined;
  // `||` instead of `??`: a blank GroupRoom trims to "", which must not block
  // the subject-based fallback the way a real room name does.
  const nextRoom = explicitRoom || inferredRoom;
  // A value is stored either as room or as subject, never both.
  const nextSubject = nextRoom ? undefined : subject;
  sessionEntry.chatType = groupResolution.chatType ?? "group";
  sessionEntry.surface = surface;
  if (nextSubject) sessionEntry.subject = nextSubject;
  if (nextRoom) sessionEntry.room = nextRoom;
  if (space) sessionEntry.space = space;
  sessionEntry.displayName = buildGroupDisplayName({
    surface: sessionEntry.surface,
    subject: sessionEntry.subject,
    room: sessionEntry.room,
    space: sessionEntry.space,
    id: groupResolution.id,
    key: sessionKey,
  });
} else if (!sessionEntry.chatType) {
  sessionEntry.chatType = "direct";
}
2025-12-17 11:29:04 +01:00
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
2025-11-26 00:53:53 +01:00
const sessionCtx : TemplateContext = {
. . . ctx ,
BodyStripped : bodyStripped ? ? ctx . Body ,
SessionId : sessionId ,
IsNewSession : isNewSession ? "true" : "false" ,
} ;
2025-12-03 09:09:34 +00:00
const {
2025-12-06 00:49:46 +01:00
cleaned : thinkCleaned ,
2025-12-03 09:09:34 +00:00
thinkLevel : inlineThink ,
rawLevel : rawThinkLevel ,
hasDirective : hasThinkDirective ,
2025-12-06 00:49:46 +01:00
} = extractThinkDirective ( sessionCtx . BodyStripped ? ? sessionCtx . Body ? ? "" ) ;
2025-12-03 09:09:34 +00:00
const {
2025-12-06 00:49:46 +01:00
cleaned : verboseCleaned ,
2025-12-03 09:09:34 +00:00
verboseLevel : inlineVerbose ,
rawLevel : rawVerboseLevel ,
hasDirective : hasVerboseDirective ,
2025-12-06 00:49:46 +01:00
} = extractVerboseDirective ( thinkCleaned ) ;
2025-12-23 23:45:20 +00:00
const {
cleaned : modelCleaned ,
rawModel : rawModelDirective ,
hasDirective : hasModelDirective ,
} = extractModelDirective ( verboseCleaned ) ;
2025-12-26 13:35:44 +01:00
const {
cleaned : queueCleaned ,
queueMode : inlineQueueMode ,
2025-12-26 14:24:53 +01:00
queueReset : inlineQueueReset ,
2025-12-26 13:35:44 +01:00
rawMode : rawQueueMode ,
2026-01-03 04:26:36 +01:00
debounceMs : inlineQueueDebounceMs ,
cap : inlineQueueCap ,
dropPolicy : inlineQueueDrop ,
rawDebounce : rawQueueDebounce ,
rawCap : rawQueueCap ,
rawDrop : rawQueueDrop ,
2025-12-26 13:35:44 +01:00
hasDirective : hasQueueDirective ,
} = extractQueueDirective ( modelCleaned ) ;
sessionCtx . Body = queueCleaned ;
sessionCtx . BodyStripped = queueCleaned ;
2025-12-03 09:09:34 +00:00
2026-01-02 22:23:00 +01:00
// Resolve whether replies in the current group require an explicit mention.
// For telegram / whatsapp / imessage the lookup order is: the group's own
// entry in cfg.<surface>.groups, then the "*" entry, then true. Every other
// surface yields true.
const resolveGroupRequireMention = () => {
  const surfaceName =
    groupResolution?.surface ?? ctx.Surface?.trim().toLowerCase();
  const targetGroupId =
    groupResolution?.id ?? ctx.From?.replace(/^group:/, "");

  // Pick the per-surface group table; unknown surfaces have none.
  const groups =
    surfaceName === "telegram"
      ? cfg.telegram?.groups
      : surfaceName === "whatsapp"
        ? cfg.whatsapp?.groups
        : surfaceName === "imessage"
          ? cfg.imessage?.groups
          : undefined;

  // A group-specific boolean wins over the wildcard entry.
  const ownSetting = targetGroupId
    ? groups?.[targetGroupId]?.requireMention
    : undefined;
  if (typeof ownSetting === "boolean") return ownSetting;

  const wildcardSetting = groups?.["*"]?.requireMention;
  if (typeof wildcardSetting === "boolean") return wildcardSetting;

  return true;
};
2025-12-23 12:53:30 +00:00
// Default activation mode for a group: "always" only when the configuration
// explicitly sets requireMention to false, otherwise "mention".
const defaultGroupActivation = () => {
  if (resolveGroupRequireMention() === false) return "always";
  return "mention";
};
2025-12-03 09:09:34 +00:00
let resolvedThinkLevel =
inlineThink ? ?
( sessionEntry ? . thinkingLevel as ThinkLevel | undefined ) ? ?
2025-12-17 11:29:04 +01:00
( agentCfg ? . thinkingDefault as ThinkLevel | undefined ) ;
2025-12-03 09:09:34 +00:00
const resolvedVerboseLevel =
inlineVerbose ? ?
( sessionEntry ? . verboseLevel as VerboseLevel | undefined ) ? ?
2025-12-17 11:29:04 +01:00
( agentCfg ? . verboseDefault as VerboseLevel | undefined ) ;
2026-01-03 00:28:33 +01:00
const resolvedBlockStreaming =
agentCfg ? . blockStreamingDefault === "off" ? "off" : "on" ;
2026-01-03 00:52:02 +01:00
const resolvedBlockStreamingBreak =
2026-01-03 12:35:16 -06:00
agentCfg ? . blockStreamingBreak === "message_end"
? "message_end"
: "text_end" ;
2026-01-03 00:28:33 +01:00
const blockStreamingEnabled = resolvedBlockStreaming === "on" ;
2026-01-03 16:45:53 +01:00
const blockReplyChunking = blockStreamingEnabled
? resolveBlockStreamingChunking ( cfg , sessionCtx . Surface )
: undefined ;
2026-01-03 00:28:33 +01:00
const streamedPayloadKeys = new Set < string > ( ) ;
2026-01-03 17:10:47 +01:00
const pendingStreamedPayloadKeys = new Set < string > ( ) ;
2026-01-03 00:28:33 +01:00
const pendingBlockTasks = new Set < Promise < void > > ( ) ;
2026-01-03 17:14:01 +01:00
let didStreamBlockReply = false ;
2026-01-03 00:28:33 +01:00
// Build a string key for a reply payload from its trimmed text, its media
// URLs and its reply target, serialized as JSON in that field order.
const buildPayloadKey = (payload: ReplyPayload) => {
  // A non-empty mediaUrls list takes precedence over the single mediaUrl.
  const collectMedia = () => {
    if (payload.mediaUrls?.length) return payload.mediaUrls;
    return payload.mediaUrl ? [payload.mediaUrl] : [];
  };
  return JSON.stringify({
    text: payload.text?.trim() ?? "",
    mediaList: collectMedia(),
    replyToId: payload.replyToId ?? null,
  });
};
2025-12-20 13:52:04 +00:00
// Decide whether tool results should be emitted. The session store is re-read
// on every call and a recognised stored verbose level takes precedence;
// otherwise the verbose level resolved for this message is used.
const shouldEmitToolResult = () => {
  const fallback = resolvedVerboseLevel === "on";
  if (!(sessionKey && storePath)) return fallback;
  try {
    const stored = loadSessionStore(storePath)[sessionKey];
    const storedLevel = normalizeVerboseLevel(stored?.verboseLevel);
    if (storedLevel) return storedLevel === "on";
  } catch {
    // Best effort: a store read failure falls through to the resolved level.
  }
  return fallback;
};
2025-12-03 09:09:34 +00:00
2025-12-23 23:45:20 +00:00
const hasAllowlist = ( agentCfg ? . allowedModels ? . length ? ? 0 ) > 0 ;
const hasStoredOverride = Boolean (
sessionEntry ? . modelOverride || sessionEntry ? . providerOverride ,
) ;
2025-12-24 00:33:35 +00:00
const needsModelCatalog =
hasModelDirective || hasAllowlist || hasStoredOverride ;
2025-12-23 23:45:20 +00:00
let allowedModelKeys = new Set < string > ( ) ;
let allowedModelCatalog : Awaited < ReturnType < typeof loadModelCatalog > > = [ ] ;
2026-01-03 12:18:50 +00:00
let modelCatalog : Awaited < ReturnType < typeof loadModelCatalog > > | null = null ;
2025-12-23 23:45:20 +00:00
let resetModelOverride = false ;
if ( needsModelCatalog ) {
2026-01-03 12:18:50 +00:00
modelCatalog = await loadModelCatalog ( { config : cfg } ) ;
2025-12-23 23:45:20 +00:00
const allowed = buildAllowedModelSet ( {
cfg ,
2026-01-03 12:18:50 +00:00
catalog : modelCatalog ,
2025-12-23 23:45:20 +00:00
defaultProvider ,
} ) ;
allowedModelCatalog = allowed . allowedCatalog ;
allowedModelKeys = allowed . allowedKeys ;
}
if ( sessionEntry && sessionStore && sessionKey && hasStoredOverride ) {
const overrideProvider =
sessionEntry . providerOverride ? . trim ( ) || defaultProvider ;
const overrideModel = sessionEntry . modelOverride ? . trim ( ) ;
if ( overrideModel ) {
const key = modelKey ( overrideProvider , overrideModel ) ;
if ( allowedModelKeys . size > 0 && ! allowedModelKeys . has ( key ) ) {
delete sessionEntry . providerOverride ;
delete sessionEntry . modelOverride ;
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
resetModelOverride = true ;
}
}
}
const storedProviderOverride = sessionEntry ? . providerOverride ? . trim ( ) ;
const storedModelOverride = sessionEntry ? . modelOverride ? . trim ( ) ;
if ( storedModelOverride ) {
const candidateProvider = storedProviderOverride || defaultProvider ;
const key = modelKey ( candidateProvider , storedModelOverride ) ;
if ( allowedModelKeys . size === 0 || allowedModelKeys . has ( key ) ) {
provider = candidateProvider ;
model = storedModelOverride ;
}
}
2026-01-03 12:18:50 +00:00
let defaultThinkingLevel : ThinkLevel | undefined ;
// Lazily resolve the default thinking level for the current provider/model and
// cache it in defaultThinkingLevel. Reuses an already loaded catalog when one
// is available and loads (and caches) the model catalog otherwise.
const resolveDefaultThinkingLevel = async () => {
  if (defaultThinkingLevel) return defaultThinkingLevel;

  let catalog = modelCatalog ?? allowedModelCatalog;
  const mustLoadCatalog = !catalog || catalog.length === 0;
  if (mustLoadCatalog) {
    const loaded = await loadModelCatalog({ config: cfg });
    modelCatalog = loaded;
    catalog = loaded;
  }

  const resolved = resolveThinkingDefault({
    cfg,
    provider,
    model,
    catalog,
  });
  defaultThinkingLevel = resolved;
  return resolved;
};
2025-12-23 23:45:20 +00:00
contextTokens =
agentCfg ? . contextTokens ? ?
lookupContextTokens ( model ) ? ?
DEFAULT_CONTEXT_TOKENS ;
2025-12-04 02:29:32 +00:00
2025-12-27 01:17:03 +00:00
const initialModelLabel = ` ${ provider } / ${ model } ` ;
// Builds the text announcing a model switch (passed to enqueueSystemEvent by
// the callers below). With a truthy alias the message names the alias followed
// by the label in parentheses; otherwise it names only the label.
const formatModelSwitchEvent = ( label : string , alias? : string ) = >
2025-12-27 04:02:13 +01:00
alias
? ` Model switched to ${ alias } ( ${ label } ). `
: ` Model switched to ${ label } . ` ;
2025-12-27 12:10:44 +00:00
const isModelListAlias =
hasModelDirective && rawModelDirective ? . trim ( ) . toLowerCase ( ) === "status" ;
const effectiveModelDirective = isModelListAlias
? undefined
: rawModelDirective ;
2025-12-27 01:17:03 +00:00
2025-12-03 09:09:34 +00:00
// True when the message carried at least one think/verbose/model/queue
// directive and nothing is left once directives, structural prefixes and (in
// groups) mentions have been removed.
const directiveOnly = (() => {
  const sawDirective =
    hasThinkDirective ||
    hasVerboseDirective ||
    hasModelDirective ||
    hasQueueDirective;
  if (!sawDirective) return false;

  const withoutPrefixes = stripStructuralPrefixes(queueCleaned ?? "");
  const remainder = isGroup
    ? stripMentions(withoutPrefixes, ctx, cfg)
    : withoutPrefixes;
  return remainder.length === 0;
})();
2025-12-23 23:45:20 +00:00
if ( directiveOnly ) {
2025-12-27 12:10:44 +00:00
if ( hasModelDirective && ( ! rawModelDirective || isModelListAlias ) ) {
2025-12-23 23:45:20 +00:00
if ( allowedModelCatalog . length === 0 ) {
cleanupTyping ( ) ;
return { text : "No models available." } ;
}
const current = ` ${ provider } / ${ model } ` ;
const defaultLabel = ` ${ defaultProvider } / ${ defaultModel } ` ;
const header =
current === defaultLabel
? ` Models (current: ${ current } ): `
: ` Models (current: ${ current } , default: ${ defaultLabel } ): ` ;
const lines = [ header ] ;
if ( resetModelOverride ) {
lines . push ( ` (previous selection reset to default) ` ) ;
}
for ( const entry of allowedModelCatalog ) {
const label = ` ${ entry . provider } / ${ entry . id } ` ;
2025-12-26 23:26:14 +00:00
const aliases = aliasIndex . byKey . get ( label ) ;
const aliasSuffix =
2025-12-26 23:54:30 +00:00
aliases && aliases . length > 0
? ` (alias: ${ aliases . join ( ", " ) } ) `
: "" ;
2025-12-24 00:33:35 +00:00
const suffix =
entry . name && entry . name !== entry . id ? ` — ${ entry . name } ` : "" ;
2025-12-26 23:26:14 +00:00
lines . push ( ` - ${ label } ${ aliasSuffix } ${ suffix } ` ) ;
2025-12-23 23:45:20 +00:00
}
cleanupTyping ( ) ;
return { text : lines.join ( "\n" ) } ;
}
if ( hasThinkDirective && ! inlineThink ) {
2025-12-03 09:09:34 +00:00
cleanupTyping ( ) ;
return {
2025-12-06 00:49:46 +01:00
text : ` Unrecognized thinking level " ${ rawThinkLevel ? ? "" } ". Valid levels: off, minimal, low, medium, high. ` ,
2025-12-03 09:09:34 +00:00
} ;
}
2025-12-23 23:45:20 +00:00
if ( hasVerboseDirective && ! inlineVerbose ) {
2025-12-03 09:09:34 +00:00
cleanupTyping ( ) ;
return {
2025-12-06 00:49:46 +01:00
text : ` Unrecognized verbose level " ${ rawVerboseLevel ? ? "" } ". Valid levels: off, on. ` ,
2025-12-03 09:09:34 +00:00
} ;
}
2026-01-03 04:26:36 +01:00
const queueModeInvalid =
hasQueueDirective &&
! inlineQueueMode &&
! inlineQueueReset &&
Boolean ( rawQueueMode ) ;
const queueDebounceInvalid =
hasQueueDirective &&
rawQueueDebounce !== undefined &&
typeof inlineQueueDebounceMs !== "number" ;
const queueCapInvalid =
hasQueueDirective &&
rawQueueCap !== undefined &&
typeof inlineQueueCap !== "number" ;
const queueDropInvalid =
hasQueueDirective && rawQueueDrop !== undefined && ! inlineQueueDrop ;
if (
queueModeInvalid ||
queueDebounceInvalid ||
queueCapInvalid ||
queueDropInvalid
) {
const errors : string [ ] = [ ] ;
if ( queueModeInvalid ) {
errors . push (
` Unrecognized queue mode " ${ rawQueueMode ? ? "" } ". Valid modes: steer, followup, collect, steer+backlog, interrupt. ` ,
) ;
}
if ( queueDebounceInvalid ) {
errors . push (
` Invalid debounce " ${ rawQueueDebounce ? ? "" } ". Use ms/s/m (e.g. debounce:1500ms, debounce:2s). ` ,
) ;
}
if ( queueCapInvalid ) {
errors . push (
` Invalid cap " ${ rawQueueCap ? ? "" } ". Use a positive integer (e.g. cap:10). ` ,
) ;
}
if ( queueDropInvalid ) {
errors . push (
` Invalid drop policy " ${ rawQueueDrop ? ? "" } ". Use drop:old, drop:new, or drop:summarize. ` ,
) ;
}
2025-12-26 13:35:44 +01:00
cleanupTyping ( ) ;
2026-01-03 04:26:36 +01:00
return { text : errors.join ( " " ) } ;
2025-12-26 13:35:44 +01:00
}
2025-12-23 23:45:20 +00:00
let modelSelection :
2025-12-26 23:26:14 +00:00
| { provider : string ; model : string ; isDefault : boolean ; alias? : string }
2025-12-23 23:45:20 +00:00
| undefined ;
2025-12-27 12:10:44 +00:00
if ( hasModelDirective && effectiveModelDirective ) {
2025-12-26 23:26:14 +00:00
const resolved = resolveModelRefFromString ( {
2025-12-27 12:10:44 +00:00
raw : effectiveModelDirective ,
2025-12-26 23:26:14 +00:00
defaultProvider ,
aliasIndex ,
} ) ;
if ( ! resolved ) {
2025-12-23 23:45:20 +00:00
cleanupTyping ( ) ;
return {
2025-12-27 12:10:44 +00:00
text : ` Unrecognized model " ${ effectiveModelDirective } ". Use /model to list available models. ` ,
2025-12-23 23:45:20 +00:00
} ;
}
2025-12-26 23:26:14 +00:00
const key = modelKey ( resolved . ref . provider , resolved . ref . model ) ;
2025-12-23 23:45:20 +00:00
if ( allowedModelKeys . size > 0 && ! allowedModelKeys . has ( key ) ) {
cleanupTyping ( ) ;
return {
2025-12-26 23:26:14 +00:00
text : ` Model " ${ resolved . ref . provider } / ${ resolved . ref . model } " is not allowed. Use /model to list available models. ` ,
2025-12-23 23:45:20 +00:00
} ;
}
const isDefault =
2025-12-26 23:26:14 +00:00
resolved . ref . provider === defaultProvider &&
resolved . ref . model === defaultModel ;
modelSelection = {
provider : resolved.ref.provider ,
model : resolved.ref.model ,
isDefault ,
alias : resolved.alias ,
} ;
2025-12-27 01:17:03 +00:00
const nextLabel = ` ${ modelSelection . provider } / ${ modelSelection . model } ` ;
if ( nextLabel !== initialModelLabel ) {
2025-12-27 04:02:13 +01:00
enqueueSystemEvent (
formatModelSwitchEvent ( nextLabel , modelSelection . alias ) ,
{
contextKey : ` model: ${ nextLabel } ` ,
} ,
) ;
2025-12-27 01:17:03 +00:00
}
2025-12-23 23:45:20 +00:00
}
2025-12-03 09:09:34 +00:00
if ( sessionEntry && sessionStore && sessionKey ) {
2025-12-23 23:45:20 +00:00
if ( hasThinkDirective && inlineThink ) {
if ( inlineThink === "off" ) delete sessionEntry . thinkingLevel ;
else sessionEntry . thinkingLevel = inlineThink ;
}
if ( hasVerboseDirective && inlineVerbose ) {
if ( inlineVerbose === "off" ) delete sessionEntry . verboseLevel ;
else sessionEntry . verboseLevel = inlineVerbose ;
}
if ( modelSelection ) {
if ( modelSelection . isDefault ) {
delete sessionEntry . providerOverride ;
delete sessionEntry . modelOverride ;
} else {
sessionEntry . providerOverride = modelSelection . provider ;
sessionEntry . modelOverride = modelSelection . model ;
}
2025-12-03 09:09:34 +00:00
}
2025-12-26 14:24:53 +01:00
if ( hasQueueDirective && inlineQueueReset ) {
delete sessionEntry . queueMode ;
2026-01-03 04:26:36 +01:00
delete sessionEntry . queueDebounceMs ;
delete sessionEntry . queueCap ;
delete sessionEntry . queueDrop ;
} else if ( hasQueueDirective ) {
if ( inlineQueueMode ) sessionEntry . queueMode = inlineQueueMode ;
if ( typeof inlineQueueDebounceMs === "number" ) {
sessionEntry . queueDebounceMs = inlineQueueDebounceMs ;
}
if ( typeof inlineQueueCap === "number" ) {
sessionEntry . queueCap = inlineQueueCap ;
}
if ( inlineQueueDrop ) {
sessionEntry . queueDrop = inlineQueueDrop ;
}
2025-12-26 13:35:44 +01:00
}
2025-12-03 09:09:34 +00:00
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
2025-12-23 23:45:20 +00:00
const parts : string [ ] = [ ] ;
if ( hasThinkDirective && inlineThink ) {
parts . push (
inlineThink === "off"
? "Thinking disabled."
: ` Thinking level set to ${ inlineThink } . ` ,
) ;
}
if ( hasVerboseDirective && inlineVerbose ) {
parts . push (
inlineVerbose === "off"
? ` ${ SYSTEM_MARK } Verbose logging disabled. `
: ` ${ SYSTEM_MARK } Verbose logging enabled. ` ,
) ;
}
if ( modelSelection ) {
const label = ` ${ modelSelection . provider } / ${ modelSelection . model } ` ;
2025-12-26 23:26:14 +00:00
const labelWithAlias = modelSelection . alias
? ` ${ modelSelection . alias } ( ${ label } ) `
: label ;
2025-12-23 23:45:20 +00:00
parts . push (
modelSelection . isDefault
2025-12-26 23:26:14 +00:00
? ` Model reset to default ( ${ labelWithAlias } ). `
: ` Model set to ${ labelWithAlias } . ` ,
2025-12-23 23:45:20 +00:00
) ;
}
2025-12-26 13:35:44 +01:00
if ( hasQueueDirective && inlineQueueMode ) {
parts . push ( ` ${ SYSTEM_MARK } Queue mode set to ${ inlineQueueMode } . ` ) ;
2025-12-26 14:24:53 +01:00
} else if ( hasQueueDirective && inlineQueueReset ) {
parts . push ( ` ${ SYSTEM_MARK } Queue mode reset to default. ` ) ;
2025-12-26 13:35:44 +01:00
}
2026-01-03 04:26:36 +01:00
if ( hasQueueDirective && typeof inlineQueueDebounceMs === "number" ) {
parts . push (
` ${ SYSTEM_MARK } Queue debounce set to ${ inlineQueueDebounceMs } ms. ` ,
) ;
}
if ( hasQueueDirective && typeof inlineQueueCap === "number" ) {
parts . push ( ` ${ SYSTEM_MARK } Queue cap set to ${ inlineQueueCap } . ` ) ;
}
if ( hasQueueDirective && inlineQueueDrop ) {
parts . push ( ` ${ SYSTEM_MARK } Queue drop set to ${ inlineQueueDrop } . ` ) ;
}
2025-12-23 23:45:20 +00:00
const ack = parts . join ( " " ) . trim ( ) ;
2025-12-03 09:09:34 +00:00
cleanupTyping ( ) ;
2025-12-23 23:45:20 +00:00
return { text : ack || "OK." } ;
2025-12-03 09:09:34 +00:00
}
2025-12-03 09:04:37 +00:00
2025-12-23 23:45:20 +00:00
// Persist inline think/verbose/model settings (and queue resets) even when additional content follows.
2025-12-06 00:49:46 +01:00
if ( sessionEntry && sessionStore && sessionKey ) {
let updated = false ;
if ( hasThinkDirective && inlineThink ) {
if ( inlineThink === "off" ) {
delete sessionEntry . thinkingLevel ;
} else {
sessionEntry . thinkingLevel = inlineThink ;
2025-12-05 21:13:17 +00:00
}
2025-12-06 00:49:46 +01:00
updated = true ;
}
if ( hasVerboseDirective && inlineVerbose ) {
if ( inlineVerbose === "off" ) {
delete sessionEntry . verboseLevel ;
} else {
sessionEntry . verboseLevel = inlineVerbose ;
2025-12-05 21:13:17 +00:00
}
2025-12-06 00:49:46 +01:00
updated = true ;
}
2025-12-27 12:10:44 +00:00
if ( hasModelDirective && effectiveModelDirective ) {
2025-12-26 23:26:14 +00:00
const resolved = resolveModelRefFromString ( {
2025-12-27 12:10:44 +00:00
raw : effectiveModelDirective ,
2025-12-26 23:26:14 +00:00
defaultProvider ,
aliasIndex ,
} ) ;
if ( resolved ) {
const key = modelKey ( resolved . ref . provider , resolved . ref . model ) ;
2025-12-23 23:45:20 +00:00
if ( allowedModelKeys . size === 0 || allowedModelKeys . has ( key ) ) {
const isDefault =
2025-12-26 23:26:14 +00:00
resolved . ref . provider === defaultProvider &&
resolved . ref . model === defaultModel ;
2025-12-23 23:45:20 +00:00
if ( isDefault ) {
delete sessionEntry . providerOverride ;
delete sessionEntry . modelOverride ;
} else {
2025-12-26 23:26:14 +00:00
sessionEntry . providerOverride = resolved . ref . provider ;
sessionEntry . modelOverride = resolved . ref . model ;
2025-12-23 23:45:20 +00:00
}
2025-12-26 23:26:14 +00:00
provider = resolved . ref . provider ;
model = resolved . ref . model ;
2025-12-27 01:17:03 +00:00
const nextLabel = ` ${ provider } / ${ model } ` ;
if ( nextLabel !== initialModelLabel ) {
enqueueSystemEvent (
formatModelSwitchEvent ( nextLabel , resolved . alias ) ,
{ contextKey : ` model: ${ nextLabel } ` } ,
) ;
}
2025-12-23 23:45:20 +00:00
contextTokens =
agentCfg ? . contextTokens ? ?
lookupContextTokens ( model ) ? ?
DEFAULT_CONTEXT_TOKENS ;
updated = true ;
}
}
}
2025-12-26 14:24:53 +01:00
if ( hasQueueDirective && inlineQueueReset ) {
delete sessionEntry . queueMode ;
2026-01-03 04:26:36 +01:00
delete sessionEntry . queueDebounceMs ;
delete sessionEntry . queueCap ;
delete sessionEntry . queueDrop ;
2025-12-26 14:24:53 +01:00
updated = true ;
}
2025-12-06 00:49:46 +01:00
if ( updated ) {
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
2025-12-05 21:13:17 +00:00
}
}
2025-12-26 14:24:53 +01:00
const perMessageQueueMode =
hasQueueDirective && ! inlineQueueReset ? inlineQueueMode : undefined ;
2026-01-03 04:26:36 +01:00
const perMessageQueueOptions =
hasQueueDirective && ! inlineQueueReset
? {
debounceMs : inlineQueueDebounceMs ,
cap : inlineQueueCap ,
dropPolicy : inlineQueueDrop ,
}
: undefined ;
2025-12-05 21:13:17 +00:00
2026-01-02 12:59:47 +01:00
const surface = ( ctx . Surface ? ? "" ) . trim ( ) . toLowerCase ( ) ;
const isWhatsAppSurface =
surface === "whatsapp" ||
( ctx . From ? ? "" ) . startsWith ( "whatsapp:" ) ||
( ctx . To ? ? "" ) . startsWith ( "whatsapp:" ) ;
// WhatsApp owner allowlist (E.164 without whatsapp: prefix); used for group activation only.
const configuredAllowFrom = isWhatsAppSurface
? cfg . whatsapp ? . allowFrom
: undefined ;
2025-11-29 04:50:56 +00:00
const from = ( ctx . From ? ? "" ) . replace ( /^whatsapp:/ , "" ) ;
const to = ( ctx . To ? ? "" ) . replace ( /^whatsapp:/ , "" ) ;
2026-01-02 17:15:12 +01:00
const isEmptyConfig = Object . keys ( cfg ) . length === 0 ;
if ( isWhatsAppSurface && isEmptyConfig && from && to && from !== to ) {
cleanupTyping ( ) ;
return undefined ;
}
2025-12-12 00:50:40 +00:00
const defaultAllowFrom =
2026-01-02 16:56:27 +01:00
isWhatsAppSurface &&
( ! configuredAllowFrom || configuredAllowFrom . length === 0 ) &&
to
2025-12-12 00:50:40 +00:00
? [ to ]
: undefined ;
const allowFrom =
configuredAllowFrom && configuredAllowFrom . length > 0
? configuredAllowFrom
: defaultAllowFrom ;
2025-12-02 20:09:51 +00:00
const abortKey = sessionKey ? ? ( from || undefined ) ? ? ( to || undefined ) ;
2025-12-05 21:29:41 +00:00
const rawBodyNormalized = triggerBodyNormalized ;
2025-12-22 20:36:29 +01:00
const commandBodyNormalized = isGroup
? stripMentions ( rawBodyNormalized , ctx , cfg )
: rawBodyNormalized ;
const activationCommand = parseActivationCommand ( commandBodyNormalized ) ;
2026-01-03 23:44:38 +01:00
const sendPolicyCommand = parseSendPolicyCommand ( commandBodyNormalized ) ;
2025-12-22 20:36:29 +01:00
const senderE164 = normalizeE164 ( ctx . SenderE164 ? ? "" ) ;
2026-01-02 12:59:47 +01:00
const ownerCandidates = isWhatsAppSurface
? ( allowFrom ? ? [ ] ) . filter ( ( entry ) = > entry && entry !== "*" )
: [ ] ;
if ( isWhatsAppSurface && ownerCandidates . length === 0 && to ) {
ownerCandidates . push ( to ) ;
}
2025-12-22 20:36:29 +01:00
const ownerList = ownerCandidates
. map ( ( entry ) = > normalizeE164 ( entry ) )
. filter ( ( entry ) : entry is string = > Boolean ( entry ) ) ;
const isOwnerSender =
Boolean ( senderE164 ) && ownerList . includes ( senderE164 ? ? "" ) ;
2025-12-02 20:09:51 +00:00
if ( ! sessionEntry && abortKey ) {
abortedLastRun = ABORT_MEMORY . get ( abortKey ) ? ? false ;
}
2025-11-29 04:50:56 +00:00
2025-12-22 20:36:29 +01:00
if ( activationCommand . hasCommand ) {
if ( ! isGroup ) {
cleanupTyping ( ) ;
return { text : "⚙️ Group activation only applies to group chats." } ;
}
if ( ! isOwnerSender ) {
logVerbose (
` Ignoring /activation from non-owner in group: ${ senderE164 || "<unknown>" } ` ,
) ;
cleanupTyping ( ) ;
return undefined ;
}
if ( ! activationCommand . mode ) {
cleanupTyping ( ) ;
return { text : "⚙️ Usage: /activation mention|always" } ;
}
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . groupActivation = activationCommand . mode ;
2025-12-23 13:32:07 +00:00
sessionEntry . groupActivationNeedsSystemIntro = true ;
2025-12-22 20:36:29 +01:00
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
cleanupTyping ( ) ;
return {
text : ` ⚙️ Group activation set to ${ activationCommand . mode } . ` ,
} ;
}
2026-01-03 23:44:38 +01:00
if ( sendPolicyCommand . hasCommand ) {
if ( ! isOwnerSender ) {
2026-01-04 00:06:02 +01:00
logVerbose ( ` Ignoring /send from non-owner: ${ senderE164 || "<unknown>" } ` ) ;
2026-01-03 23:44:38 +01:00
cleanupTyping ( ) ;
return undefined ;
}
if ( ! sendPolicyCommand . mode ) {
cleanupTyping ( ) ;
return { text : "⚙️ Usage: /send on|off|inherit" } ;
}
if ( sessionEntry && sessionStore && sessionKey ) {
if ( sendPolicyCommand . mode === "inherit" ) {
delete sessionEntry . sendPolicy ;
} else {
sessionEntry . sendPolicy = sendPolicyCommand . mode ;
}
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
cleanupTyping ( ) ;
const label =
sendPolicyCommand . mode === "inherit"
? "inherit"
: sendPolicyCommand . mode === "allow"
? "on"
: "off" ;
return { text : ` ⚙️ Send policy set to ${ label } . ` } ;
}
2025-12-03 12:14:32 +00:00
if (
2025-12-22 20:36:29 +01:00
commandBodyNormalized === "/restart" ||
commandBodyNormalized === "restart" ||
commandBodyNormalized . startsWith ( "/restart " )
2025-12-03 12:14:32 +00:00
) {
2025-12-22 20:36:29 +01:00
if ( isGroup && ! isOwnerSender ) {
logVerbose (
` Ignoring /restart from non-owner in group: ${ senderE164 || "<unknown>" } ` ,
) ;
cleanupTyping ( ) ;
return undefined ;
}
2026-01-01 17:48:17 +01:00
const restartMethod = triggerClawdisRestart ( ) ;
2025-12-03 12:14:32 +00:00
cleanupTyping ( ) ;
return {
2026-01-01 17:48:17 +01:00
text : ` ⚙️ Restarting clawdis via ${ restartMethod } ; give me a few seconds to come back online. ` ,
2025-12-03 12:14:32 +00:00
} ;
}
2025-12-07 16:53:19 +00:00
if (
2025-12-22 20:36:29 +01:00
commandBodyNormalized === "/status" ||
commandBodyNormalized === "status" ||
commandBodyNormalized . startsWith ( "/status " )
2025-12-07 16:53:19 +00:00
) {
2025-12-22 20:36:29 +01:00
if ( isGroup && ! isOwnerSender ) {
logVerbose (
` Ignoring /status from non-owner in group: ${ senderE164 || "<unknown>" } ` ,
) ;
cleanupTyping ( ) ;
return undefined ;
}
2025-12-07 16:53:19 +00:00
const webLinked = await webAuthExists ( ) ;
const webAuthAgeMs = getWebAuthAgeMs ( ) ;
const heartbeatSeconds = resolveHeartbeatSeconds ( cfg , undefined ) ;
2026-01-02 22:23:00 +01:00
const groupActivation = isGroup
2026-01-02 22:50:51 +01:00
? ( normalizeGroupActivation ( sessionEntry ? . groupActivation ) ? ?
defaultGroupActivation ( ) )
2026-01-02 22:23:00 +01:00
: undefined ;
2025-12-07 16:53:19 +00:00
const statusText = buildStatusMessage ( {
2025-12-17 11:29:04 +01:00
agent : {
model ,
contextTokens ,
thinkingDefault : agentCfg?.thinkingDefault ,
verboseDefault : agentCfg?.verboseDefault ,
} ,
workspaceDir ,
2025-12-07 16:53:19 +00:00
sessionEntry ,
sessionKey ,
sessionScope ,
storePath ,
2026-01-02 22:23:00 +01:00
groupActivation ,
2026-01-03 12:18:50 +00:00
resolvedThink :
resolvedThinkLevel ? ? ( await resolveDefaultThinkingLevel ( ) ) ,
2025-12-07 16:53:19 +00:00
resolvedVerbose : resolvedVerboseLevel ,
webLinked ,
webAuthAgeMs ,
heartbeatSeconds ,
} ) ;
cleanupTyping ( ) ;
return { text : statusText } ;
}
2025-12-17 11:29:04 +01:00
const abortRequested = isAbortTrigger ( rawBodyNormalized ) ;
2025-12-02 20:09:51 +00:00
if ( abortRequested ) {
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . abortedLastRun = true ;
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
} else if ( abortKey ) {
ABORT_MEMORY . set ( abortKey , true ) ;
}
cleanupTyping ( ) ;
2025-12-05 21:37:11 +00:00
return { text : "⚙️ Agent was aborted." } ;
2025-12-02 20:09:51 +00:00
}
2026-01-03 23:44:38 +01:00
const sendPolicy = resolveSendPolicy ( {
cfg ,
entry : sessionEntry ,
sessionKey ,
surface : sessionEntry?.surface ? ? surface ,
chatType : sessionEntry?.chatType ,
} ) ;
if ( sendPolicy === "deny" ) {
2026-01-04 00:06:02 +01:00
logVerbose ( ` Send blocked by policy for session ${ sessionKey ? ? "unknown" } ` ) ;
2026-01-03 23:44:38 +01:00
cleanupTyping ( ) ;
return undefined ;
}
2025-11-26 00:53:53 +01:00
const isFirstTurnInSession = isNewSession || ! systemSent ;
2025-12-23 14:17:18 +00:00
const isGroupChat = sessionCtx . ChatType === "group" ;
const wasMentioned = ctx . WasMentioned === true ;
const shouldEagerType = ! isGroupChat || wasMentioned ;
2025-12-23 13:32:07 +00:00
const shouldInjectGroupIntro =
2025-12-23 14:17:18 +00:00
isGroupChat &&
2025-12-23 13:32:07 +00:00
( isFirstTurnInSession || sessionEntry ? . groupActivationNeedsSystemIntro ) ;
2025-12-24 00:33:35 +00:00
const groupIntro = shouldInjectGroupIntro
? ( ( ) = > {
const activation =
normalizeGroupActivation ( sessionEntry ? . groupActivation ) ? ?
defaultGroupActivation ( ) ;
const subject = sessionCtx . GroupSubject ? . trim ( ) ;
const members = sessionCtx . GroupMembers ? . trim ( ) ;
2025-12-15 10:11:18 -06:00
const surface = sessionCtx . Surface ? . trim ( ) . toLowerCase ( ) ;
const surfaceLabel = ( ( ) = > {
if ( ! surface ) return "chat" ;
if ( surface === "whatsapp" ) return "WhatsApp" ;
if ( surface === "telegram" ) return "Telegram" ;
if ( surface === "discord" ) return "Discord" ;
if ( surface === "webchat" ) return "WebChat" ;
return ` ${ surface . at ( 0 ) ? . toUpperCase ( ) ? ? "" } ${ surface . slice ( 1 ) } ` ;
} ) ( ) ;
2025-12-24 00:33:35 +00:00
const subjectLine = subject
2025-12-15 10:11:18 -06:00
? ` You are replying inside the ${ surfaceLabel } group " ${ subject } ". `
: ` You are replying inside a ${ surfaceLabel } group chat. ` ;
2025-12-24 00:33:35 +00:00
const membersLine = members ? ` Group members: ${ members } . ` : undefined ;
const activationLine =
activation === "always"
? "Activation: always-on (you receive every group message)."
: "Activation: trigger-only (you are invoked only when explicitly mentioned; recent context may be included)." ;
const silenceLine =
activation === "always"
? ` If no response is needed, reply with exactly " ${ SILENT_REPLY_TOKEN } " (no other text) so Clawdis stays silent. `
: undefined ;
const cautionLine =
activation === "always"
? "Be extremely selective: reply only when you are directly addressed, asked a question, or can add clear value. Otherwise stay silent."
2025-12-03 13:33:32 +00:00
: undefined ;
2026-01-03 02:02:55 +00:00
const lurkLine =
2026-01-03 02:28:40 +00:00
"Be a good group participant: lurk and follow the conversation, but only chime in when you have something genuinely helpful or relevant to add. Don't feel obligated to respond to every message — quality over quantity. Even when lurking silently, you can use emoji reactions to acknowledge messages, show support, or react to humor — reactions are always appreciated and don't clutter the chat." ;
2025-12-24 00:33:35 +00:00
return [
subjectLine ,
membersLine ,
activationLine ,
silenceLine ,
cautionLine ,
2026-01-03 02:02:55 +00:00
lurkLine ,
2025-12-24 00:33:35 +00:00
]
. filter ( Boolean )
. join ( " " )
. concat ( " Address the specific sender noted in the message context." ) ;
} ) ( )
: "" ;
2025-11-26 00:53:53 +01:00
const baseBody = sessionCtx . BodyStripped ? ? sessionCtx . Body ? ? "" ;
2025-12-10 15:55:20 +00:00
const rawBodyTrimmed = ( ctx . Body ? ? "" ) . trim ( ) ;
2025-12-20 13:04:55 +00:00
const baseBodyTrimmedRaw = baseBody . trim ( ) ;
2025-12-10 15:55:20 +00:00
const isBareSessionReset =
2025-12-20 13:04:55 +00:00
isNewSession &&
baseBodyTrimmedRaw . length === 0 &&
rawBodyTrimmed . length > 0 ;
2025-12-20 13:31:28 +00:00
const baseBodyFinal = isBareSessionReset
? BARE_SESSION_RESET_PROMPT
: baseBody ;
2025-12-20 13:04:55 +00:00
const baseBodyTrimmed = baseBodyFinal . trim ( ) ;
2025-12-10 13:51:06 +00:00
// Bail early if the cleaned body is empty to avoid sending blank prompts to the agent.
// This can happen if an inbound platform delivers an empty text message or we strip everything out.
2025-12-10 15:55:20 +00:00
if ( ! baseBodyTrimmed ) {
2025-12-10 13:51:06 +00:00
await onReplyStart ( ) ;
logVerbose ( "Inbound body empty after normalization; skipping agent run" ) ;
cleanupTyping ( ) ;
return {
2025-12-10 15:55:20 +00:00
text : "I didn't receive any text in your message. Please resend or add a caption." ,
2025-12-10 13:51:06 +00:00
} ;
}
2025-12-17 11:29:04 +01:00
const abortedHint = abortedLastRun
? "Note: The previous agent run was aborted by the user. Resume carefully or ask for clarification."
: "" ;
2025-12-20 13:04:55 +00:00
let prefixedBodyBase = baseBodyFinal ;
2025-12-02 20:09:51 +00:00
if ( abortedHint ) {
prefixedBodyBase = ` ${ abortedHint } \ n \ n ${ prefixedBodyBase } ` ;
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . abortedLastRun = false ;
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
} else if ( abortKey ) {
ABORT_MEMORY . set ( abortKey , false ) ;
2025-11-26 00:53:53 +01:00
}
2025-12-02 20:09:51 +00:00
}
2026-01-02 23:18:41 +01:00
const messageIdHint = sessionCtx . MessageSid ? . trim ( )
? ` [message_id: ${ sessionCtx . MessageSid . trim ( ) } ] `
: "" ;
if ( messageIdHint ) {
prefixedBodyBase = ` ${ prefixedBodyBase } \ n ${ messageIdHint } ` ;
}
2025-12-09 02:25:37 +01:00
2025-12-17 08:31:23 +01:00
// Prepend queued system events (transitions only) and (for new main sessions) a provider snapshot.
// Token efficiency: we filter out periodic/heartbeat noise and keep the lines compact.
2025-12-09 02:25:37 +01:00
const isGroupSession =
2026-01-02 10:14:58 +01:00
sessionEntry ? . chatType === "group" || sessionEntry ? . chatType === "room" ;
2025-12-09 02:25:37 +01:00
const isMainSession =
! isGroupSession && sessionKey === ( sessionCfg ? . mainKey ? ? "main" ) ;
if ( isMainSession ) {
2025-12-17 08:31:23 +01:00
/**
 * Normalizes one queued system-event line before it is prepended to the prompt.
 *
 * @param line - Raw system-event text.
 * @returns The trimmed (and, for `Node:` lines, shortened) text, or `null`
 *   when the line is blank or is periodic/heartbeat noise that should be dropped.
 */
const compactSystemEvent = (line: string): string | null => {
  const trimmed = line.trim();
  if (!trimmed) return null;
  const lower = trimmed.toLowerCase();
  if (lower.includes("reason periodic")) return null;
  if (lower.includes("heartbeat")) return null;
  if (trimmed.startsWith("Node:")) {
    // Drop the chatty "last input … ago" segment; keep connect/disconnect/launch reasons.
    // The lazy match plus lookahead stops before the whitespace that precedes the
    // next "·" separator, so the following segment keeps its " · " spacing
    // (a greedy [^·]+ would swallow that space and yield "foo· reason bar").
    return trimmed.replace(/ · last input [^·]+?(?=\s*·|$)/i, "").trim();
  }
  return trimmed;
};
const systemLines : string [ ] = [ ] ;
const queued = drainSystemEvents ( ) ;
systemLines . push (
. . . queued . map ( compactSystemEvent ) . filter ( ( v ) : v is string = > Boolean ( v ) ) ,
) ;
2025-12-09 02:25:37 +01:00
if ( isNewSession ) {
const summary = await buildProviderSummary ( cfg ) ;
2025-12-17 08:31:23 +01:00
if ( summary . length > 0 ) systemLines . unshift ( . . . summary ) ;
}
if ( systemLines . length > 0 ) {
const block = systemLines . map ( ( l ) = > ` System: ${ l } ` ) . join ( "\n" ) ;
prefixedBodyBase = ` ${ block } \ n \ n ${ prefixedBodyBase } ` ;
2025-12-09 02:25:37 +01:00
}
}
2025-12-17 11:29:04 +01:00
if ( isFirstTurnInSession && sessionStore && sessionKey ) {
2025-12-02 20:09:51 +00:00
const current = sessionEntry ? ?
sessionStore [ sessionKey ] ? ? {
sessionId : sessionId ? ? crypto . randomUUID ( ) ,
updatedAt : Date.now ( ) ,
} ;
2025-12-20 12:22:15 +01:00
const skillSnapshot =
isFirstTurnInSession || ! current . skillsSnapshot
? buildWorkspaceSkillSnapshot ( workspaceDir , { config : cfg } )
: current . skillsSnapshot ;
2025-12-02 20:09:51 +00:00
sessionEntry = {
. . . current ,
sessionId : sessionId ? ? current . sessionId ? ? crypto . randomUUID ( ) ,
2025-11-26 00:53:53 +01:00
updatedAt : Date.now ( ) ,
systemSent : true ,
2025-12-20 12:22:15 +01:00
skillsSnapshot : skillSnapshot ,
2025-11-26 00:53:53 +01:00
} ;
2025-12-02 20:09:51 +00:00
sessionStore [ sessionKey ] = sessionEntry ;
2025-11-26 00:53:53 +01:00
await saveSessionStore ( storePath , sessionStore ) ;
2025-12-02 21:23:56 +00:00
systemSent = true ;
2025-11-26 00:53:53 +01:00
}
2025-12-20 12:22:15 +01:00
const skillsSnapshot =
sessionEntry ? . skillsSnapshot ? ?
( isFirstTurnInSession
? undefined
: buildWorkspaceSkillSnapshot ( workspaceDir , { config : cfg } ) ) ;
if (
skillsSnapshot &&
sessionStore &&
sessionKey &&
! isFirstTurnInSession &&
! sessionEntry ? . skillsSnapshot
) {
const current = sessionEntry ? ? {
sessionId : sessionId ? ? crypto . randomUUID ( ) ,
updatedAt : Date.now ( ) ,
} ;
sessionEntry = {
. . . current ,
sessionId : sessionId ? ? current . sessionId ? ? crypto . randomUUID ( ) ,
updatedAt : Date.now ( ) ,
skillsSnapshot ,
} ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
2025-12-17 11:29:04 +01:00
const prefixedBody = transcribedText
? [ prefixedBodyBase , ` Transcript: \ n ${ transcribedText } ` ]
. filter ( Boolean )
. join ( "\n\n" )
: prefixedBodyBase ;
2025-11-26 00:53:53 +01:00
const mediaNote = ctx . MediaPath ? . length
? ` [media attached: ${ ctx . MediaPath } ${ ctx . MediaType ? ` ( ${ ctx . MediaType } ) ` : "" } ${ ctx . MediaUrl ? ` | ${ ctx . MediaUrl } ` : "" } ] `
: undefined ;
2025-12-17 11:29:04 +01:00
const mediaReplyHint = mediaNote
? "To send an image back, add a line like: MEDIA:https://example.com/image.jpg (no spaces). Keep caption in the text body."
: undefined ;
2025-12-03 08:45:23 +00:00
let commandBody = mediaNote
2025-11-26 00:53:53 +01:00
? [ mediaNote , mediaReplyHint , prefixedBody ? ? "" ]
. filter ( Boolean )
. join ( "\n" )
. trim ( )
: prefixedBody ;
2025-12-03 08:45:23 +00:00
// Fallback: if a stray leading level token remains, consume it
if ( ! resolvedThinkLevel && commandBody ) {
const parts = commandBody . split ( /\s+/ ) ;
const maybeLevel = normalizeThinkLevel ( parts [ 0 ] ) ;
if ( maybeLevel ) {
resolvedThinkLevel = maybeLevel ;
commandBody = parts . slice ( 1 ) . join ( " " ) . trim ( ) ;
}
}
2026-01-03 12:18:50 +00:00
if ( ! resolvedThinkLevel ) {
resolvedThinkLevel = await resolveDefaultThinkingLevel ( ) ;
}
2025-11-26 00:53:53 +01:00
2025-12-17 11:29:04 +01:00
const sessionIdFinal = sessionId ? ? crypto . randomUUID ( ) ;
const sessionFile = resolveSessionTranscriptPath ( sessionIdFinal ) ;
2025-12-20 16:10:46 +01:00
const queueBodyBase = transcribedText
? [ baseBodyFinal , ` Transcript: \ n ${ transcribedText } ` ]
. filter ( Boolean )
. join ( "\n\n" )
: baseBodyFinal ;
const queuedBody = mediaNote
2025-12-20 17:50:45 +01:00
? [ mediaNote , mediaReplyHint , queueBodyBase ]
. filter ( Boolean )
. join ( "\n" )
. trim ( )
2025-12-20 16:10:46 +01:00
: queueBodyBase ;
2026-01-03 04:26:36 +01:00
const resolvedQueue = resolveQueueSettings ( {
2025-12-26 13:35:44 +01:00
cfg ,
surface : sessionCtx.Surface ,
sessionEntry ,
inlineMode : perMessageQueueMode ,
2026-01-03 04:26:36 +01:00
inlineOptions : perMessageQueueOptions ,
2025-12-26 13:35:44 +01:00
} ) ;
const sessionLaneKey = resolveEmbeddedSessionLane (
sessionKey ? ? sessionIdFinal ,
) ;
const laneSize = getQueueSize ( sessionLaneKey ) ;
2026-01-03 04:26:36 +01:00
if ( resolvedQueue . mode === "interrupt" && laneSize > 0 ) {
2025-12-26 13:35:44 +01:00
const cleared = clearCommandLane ( sessionLaneKey ) ;
const aborted = abortEmbeddedPiRun ( sessionIdFinal ) ;
logVerbose (
` Interrupting ${ sessionLaneKey } (cleared ${ cleared } , aborted= ${ aborted } ) ` ,
) ;
}
2026-01-03 04:26:36 +01:00
const queueKey = sessionKey ? ? sessionIdFinal ;
const isActive = isEmbeddedPiRunActive ( sessionIdFinal ) ;
const isStreaming = isEmbeddedPiRunStreaming ( sessionIdFinal ) ;
const shouldSteer =
resolvedQueue . mode === "steer" || resolvedQueue . mode === "steer-backlog" ;
const shouldFollowup =
resolvedQueue . mode === "followup" ||
resolvedQueue . mode === "collect" ||
resolvedQueue . mode === "steer-backlog" ;
const followupRun : FollowupRun = {
prompt : queuedBody ,
summaryLine : baseBodyTrimmedRaw ,
enqueuedAt : Date.now ( ) ,
run : {
sessionId : sessionIdFinal ,
sessionKey ,
2026-01-03 12:33:42 +01:00
surface : sessionCtx.Surface?.trim ( ) . toLowerCase ( ) || undefined ,
2026-01-03 04:26:36 +01:00
sessionFile ,
workspaceDir ,
config : cfg ,
skillsSnapshot ,
provider ,
model ,
thinkLevel : resolvedThinkLevel ,
verboseLevel : resolvedVerboseLevel ,
timeoutMs ,
blockReplyBreak : resolvedBlockStreamingBreak ,
ownerNumbers : ownerList.length > 0 ? ownerList : undefined ,
extraSystemPrompt : groupIntro || undefined ,
enforceFinalTag : provider === "ollama" ? true : undefined ,
} ,
} ;
if ( shouldSteer && isStreaming ) {
const steered = queueEmbeddedPiMessage ( sessionIdFinal , queuedBody ) ;
if ( steered && ! shouldFollowup ) {
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
cleanupTyping ( ) ;
return undefined ;
}
}
if ( isActive && ( shouldFollowup || resolvedQueue . mode === "steer" ) ) {
enqueueFollowupRun ( queueKey , followupRun , resolvedQueue ) ;
2025-12-20 16:10:46 +01:00
if ( sessionEntry && sessionStore && sessionKey ) {
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
cleanupTyping ( ) ;
return undefined ;
}
2026-01-03 04:26:36 +01:00
/**
 * Delivers the payloads produced by a follow-up turn through `opts.onBlockReply`.
 *
 * When no `onBlockReply` handler was supplied, the payloads are dropped and a
 * verbose log line is written. Payloads with neither text nor media are
 * skipped, as are text-only payloads whose trimmed text equals
 * `SILENT_REPLY_TOKEN`.
 *
 * @param payloads - Reply payloads to deliver, in order.
 */
const sendFollowupPayloads = async (payloads: ReplyPayload[]) => {
  if (!opts?.onBlockReply) {
    logVerbose("followup queue: no onBlockReply handler; dropping payloads");
    return;
  }
  for (const payload of payloads) {
    // Nothing to send: no text and no media of either kind.
    if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) {
      continue;
    }
    // Text-only payload that is exactly the silent-reply token: skip it.
    if (
      payload.text?.trim() === SILENT_REPLY_TOKEN &&
      !payload.mediaUrl &&
      !payload.mediaUrls?.length
    ) {
      continue;
    }
    // Deliveries are awaited one at a time so they go out in payload order.
    await startTypingOnText(payload.text);
    await opts.onBlockReply(payload);
  }
};
/**
 * Executes one queued follow-up run and delivers its reply payloads.
 *
 * Steps, in order:
 *  1. Run the embedded agent with the parameters captured on `queued.run`.
 *  2. Strip stray `HEARTBEAT_OK` tokens and extract reply-to tags from the text.
 *  3. Update the stored session entry with token usage / model information.
 *  4. Hand the remaining payloads to `sendFollowupPayloads`.
 *
 * A failure of the agent run is reported via `defaultRuntime.error` and the
 * turn ends without sending anything; the error is not rethrown.
 *
 * @param queued - The follow-up run taken from the queue.
 */
const runFollowupTurn = async (queued: FollowupRun) => {
  const runId = crypto.randomUUID();
  if (queued.run.sessionKey) {
    registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
  }
  let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
  try {
    runResult = await runEmbeddedPiAgent({
      sessionId: queued.run.sessionId,
      sessionKey: queued.run.sessionKey,
      surface: queued.run.surface,
      sessionFile: queued.run.sessionFile,
      workspaceDir: queued.run.workspaceDir,
      config: queued.run.config,
      skillsSnapshot: queued.run.skillsSnapshot,
      prompt: queued.prompt,
      extraSystemPrompt: queued.run.extraSystemPrompt,
      ownerNumbers: queued.run.ownerNumbers,
      enforceFinalTag: queued.run.enforceFinalTag,
      provider: queued.run.provider,
      model: queued.run.model,
      thinkLevel: queued.run.thinkLevel,
      verboseLevel: queued.run.verboseLevel,
      timeoutMs: queued.run.timeoutMs,
      runId,
      blockReplyBreak: queued.run.blockReplyBreak,
    });
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    defaultRuntime.error?.(`Followup agent failed before reply: ${message}`);
    return;
  }

  const payloadArray = runResult.payloads ?? [];
  if (payloadArray.length === 0) return;

  // Remove stray HEARTBEAT_OK tokens; a payload that becomes skippable is kept
  // only when it still carries media.
  const sanitizedPayloads = payloadArray.flatMap((payload) => {
    const text = payload.text;
    if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
    const stripped = stripHeartbeatToken(text, { mode: "message" });
    const hasMedia =
      Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
    if (stripped.shouldSkip && !hasMedia) return [];
    return [{ ...payload, text: stripped.text }];
  });

  // Pull the reply-to tag out of the text, then discard payloads left with
  // neither text nor media.
  const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
    .map((payload) => {
      const { cleaned, replyToId } = extractReplyToTag(payload.text);
      return {
        ...payload,
        text: cleaned ? cleaned : undefined,
        replyToId: replyToId ?? payload.replyToId,
      };
    })
    .filter(
      (payload) =>
        payload.text ||
        payload.mediaUrl ||
        (payload.mediaUrls && payload.mediaUrls.length > 0),
    );
  if (replyTaggedPayloads.length === 0) return;

  // NOTE(review): this uses the enclosing call's sessionStore / sessionKey /
  // sessionEntry rather than values from `queued.run` — confirm that is intended.
  if (sessionStore && sessionKey) {
    const usage = runResult.meta.agentMeta?.usage;
    const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel;
    const contextTokensUsed =
      agentCfg?.contextTokens ??
      lookupContextTokens(modelUsed) ??
      sessionEntry?.contextTokens ??
      DEFAULT_CONTEXT_TOKENS;
    if (usage) {
      const entry = sessionStore[sessionKey];
      if (entry) {
        const input = usage.input ?? 0;
        const output = usage.output ?? 0;
        // Prompt size counts cache reads/writes in addition to fresh input.
        const promptTokens =
          input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
        sessionStore[sessionKey] = {
          ...entry,
          inputTokens: input,
          outputTokens: output,
          totalTokens:
            promptTokens > 0 ? promptTokens : (usage.total ?? input),
          model: modelUsed,
          contextTokens: contextTokensUsed ?? entry.contextTokens,
          updatedAt: Date.now(),
        };
        if (storePath) {
          await saveSessionStore(storePath, sessionStore);
        }
      }
    } else if (modelUsed || contextTokensUsed) {
      // No usage reported: still record the model / context window in use.
      const entry = sessionStore[sessionKey];
      if (entry) {
        sessionStore[sessionKey] = {
          ...entry,
          model: modelUsed ?? entry.model,
          contextTokens: contextTokensUsed ?? entry.contextTokens,
        };
        if (storePath) {
          await saveSessionStore(storePath, sessionStore);
        }
      }
    }
  }

  await sendFollowupPayloads(replyTaggedPayloads);
};
/**
 * Calls `scheduleFollowupDrain` for this session's queue key, passing
 * `runFollowupTurn` as the runner, and then returns `value` unchanged so it
 * can wrap a `return` expression.
 *
 * @param value - The result to pass through to the caller.
 * @returns The same `value` that was passed in.
 */
const finalizeWithFollowup = <T>(value: T): T => {
  scheduleFollowupDrain(queueKey, runFollowupTurn);
  return value;
};
2026-01-02 01:42:27 +01:00
let didLogHeartbeatStrip = false ;
2025-12-17 11:29:04 +01:00
try {
2025-12-23 15:03:05 +00:00
if ( shouldEagerType ) {
await startTypingLoop ( ) ;
}
2025-12-17 11:29:04 +01:00
const runId = crypto . randomUUID ( ) ;
2026-01-02 01:04:59 +01:00
if ( sessionKey ) {
registerAgentRunContext ( runId , { sessionKey } ) ;
}
2025-12-26 10:16:50 +01:00
let runResult : Awaited < ReturnType < typeof runEmbeddedPiAgent > > ;
try {
runResult = await runEmbeddedPiAgent ( {
sessionId : sessionIdFinal ,
sessionKey ,
2026-01-03 12:33:42 +01:00
surface : sessionCtx.Surface?.trim ( ) . toLowerCase ( ) || undefined ,
2025-12-26 10:16:50 +01:00
sessionFile ,
workspaceDir ,
config : cfg ,
skillsSnapshot ,
prompt : commandBody ,
extraSystemPrompt : groupIntro || undefined ,
ownerNumbers : ownerList.length > 0 ? ownerList : undefined ,
2025-12-27 00:28:52 +00:00
enforceFinalTag : provider === "ollama" ? true : undefined ,
2025-12-26 10:16:50 +01:00
provider ,
model ,
thinkLevel : resolvedThinkLevel ,
verboseLevel : resolvedVerboseLevel ,
timeoutMs ,
runId ,
2026-01-03 00:52:02 +01:00
blockReplyBreak : resolvedBlockStreamingBreak ,
2026-01-03 16:45:53 +01:00
blockReplyChunking ,
2025-12-26 10:16:50 +01:00
onPartialReply : opts?.onPartialReply
? async ( payload ) = > {
2026-01-02 01:42:27 +01:00
let text = payload . text ;
if ( ! opts ? . isHeartbeat && text ? . includes ( "HEARTBEAT_OK" ) ) {
const stripped = stripHeartbeatToken ( text , { mode : "message" } ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose ( "Stripped stray HEARTBEAT_OK token from reply" ) ;
}
if (
stripped . shouldSkip &&
( payload . mediaUrls ? . length ? ? 0 ) === 0
) {
return ;
}
text = stripped . text ;
2026-01-01 23:53:29 +01:00
}
2026-01-02 01:42:27 +01:00
await startTypingOnText ( text ) ;
2025-12-26 10:16:50 +01:00
await opts . onPartialReply ? . ( {
2026-01-02 01:42:27 +01:00
text ,
2025-12-26 10:16:50 +01:00
mediaUrls : payload.mediaUrls ,
} ) ;
}
: undefined ,
2026-01-03 00:28:33 +01:00
onBlockReply :
blockStreamingEnabled && opts ? . onBlockReply
? async ( payload ) = > {
let text = payload . text ;
if ( ! opts ? . isHeartbeat && text ? . includes ( "HEARTBEAT_OK" ) ) {
const stripped = stripHeartbeatToken ( text , {
mode : "message" ,
} ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose ( "Stripped stray HEARTBEAT_OK token from reply" ) ;
}
const hasMedia = ( payload . mediaUrls ? . length ? ? 0 ) > 0 ;
if ( stripped . shouldSkip && ! hasMedia ) return ;
text = stripped . text ;
}
const tagResult = extractReplyToTag (
text ,
sessionCtx . MessageSid ,
) ;
const cleaned = tagResult . cleaned || undefined ;
const hasMedia = ( payload . mediaUrls ? . length ? ? 0 ) > 0 ;
if ( ! cleaned && ! hasMedia ) return ;
if ( cleaned ? . trim ( ) === SILENT_REPLY_TOKEN && ! hasMedia ) return ;
const blockPayload : ReplyPayload = {
text : cleaned ,
mediaUrls : payload.mediaUrls ,
mediaUrl : payload.mediaUrls?. [ 0 ] ,
replyToId : tagResult.replyToId ,
} ;
2026-01-03 02:16:01 +01:00
const payloadKey = buildPayloadKey ( blockPayload ) ;
2026-01-03 17:10:47 +01:00
if (
streamedPayloadKeys . has ( payloadKey ) ||
pendingStreamedPayloadKeys . has ( payloadKey )
) {
return ;
}
pendingStreamedPayloadKeys . add ( payloadKey ) ;
2026-01-03 02:16:01 +01:00
const task = ( async ( ) = > {
await startTypingOnText ( cleaned ) ;
await opts . onBlockReply ? . ( blockPayload ) ;
} ) ( )
2026-01-03 00:28:33 +01:00
. then ( ( ) = > {
2026-01-03 02:16:01 +01:00
streamedPayloadKeys . add ( payloadKey ) ;
2026-01-03 16:50:14 +00:00
didStreamBlockReply = true ;
2026-01-03 00:28:33 +01:00
} )
. catch ( ( err ) = > {
logVerbose ( ` block reply delivery failed: ${ String ( err ) } ` ) ;
2026-01-03 17:10:47 +01:00
} )
. finally ( ( ) = > {
pendingStreamedPayloadKeys . delete ( payloadKey ) ;
2026-01-03 00:28:33 +01:00
} ) ;
pendingBlockTasks . add ( task ) ;
void task . finally ( ( ) = > pendingBlockTasks . delete ( task ) ) ;
}
: undefined ,
2025-12-26 10:16:50 +01:00
shouldEmitToolResult ,
onToolResult : opts?.onToolResult
? async ( payload ) = > {
2026-01-02 01:42:27 +01:00
let text = payload . text ;
if ( ! opts ? . isHeartbeat && text ? . includes ( "HEARTBEAT_OK" ) ) {
const stripped = stripHeartbeatToken ( text , { mode : "message" } ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose ( "Stripped stray HEARTBEAT_OK token from reply" ) ;
}
if (
stripped . shouldSkip &&
( payload . mediaUrls ? . length ? ? 0 ) === 0
) {
return ;
}
text = stripped . text ;
2026-01-01 23:53:29 +01:00
}
2026-01-02 01:42:27 +01:00
await startTypingOnText ( text ) ;
await opts . onToolResult ? . ( { text , mediaUrls : payload.mediaUrls } ) ;
2025-12-26 10:16:50 +01:00
}
: undefined ,
} ) ;
} catch ( err ) {
const message = err instanceof Error ? err.message : String ( err ) ;
const isContextOverflow =
/context.*overflow|too large|context window/i . test ( message ) ;
defaultRuntime . error ( ` Embedded agent failed before reply: ${ message } ` ) ;
2026-01-03 04:26:36 +01:00
return finalizeWithFollowup ( {
2025-12-26 10:16:50 +01:00
text : isContextOverflow
? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
2026-01-03 21:30:43 +00:00
: ` ⚠️ Agent failed before reply: ${ message } . Check gateway logs for details. ` ,
2026-01-03 04:26:36 +01:00
} ) ;
2025-12-26 10:16:50 +01:00
}
2025-12-03 00:40:19 +00:00
2025-12-23 13:32:07 +00:00
if (
shouldInjectGroupIntro &&
sessionEntry &&
sessionStore &&
sessionKey &&
sessionEntry . groupActivationNeedsSystemIntro
) {
sessionEntry . groupActivationNeedsSystemIntro = false ;
sessionEntry . updatedAt = Date . now ( ) ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
2025-12-17 11:29:04 +01:00
const payloadArray = runResult . payloads ? ? [ ] ;
2026-01-03 04:26:36 +01:00
if ( payloadArray . length === 0 ) return finalizeWithFollowup ( undefined ) ;
2026-01-03 00:28:33 +01:00
if ( pendingBlockTasks . size > 0 ) {
await Promise . allSettled ( pendingBlockTasks ) ;
}
2026-01-02 01:42:27 +01:00
const sanitizedPayloads = opts ? . isHeartbeat
? payloadArray
: payloadArray . flatMap ( ( payload ) = > {
const text = payload . text ;
if ( ! text || ! text . includes ( "HEARTBEAT_OK" ) ) return [ payload ] ;
const stripped = stripHeartbeatToken ( text , { mode : "message" } ) ;
if ( stripped . didStrip && ! didLogHeartbeatStrip ) {
didLogHeartbeatStrip = true ;
logVerbose ( "Stripped stray HEARTBEAT_OK token from reply" ) ;
}
const hasMedia =
Boolean ( payload . mediaUrl ) || ( payload . mediaUrls ? . length ? ? 0 ) > 0 ;
if ( stripped . shouldSkip && ! hasMedia ) return [ ] ;
return [ { . . . payload , text : stripped.text } ] ;
} ) ;
2026-01-02 23:18:41 +01:00
const replyTaggedPayloads : ReplyPayload [ ] = sanitizedPayloads
. map ( ( payload ) = > {
const { cleaned , replyToId } = extractReplyToTag (
payload . text ,
sessionCtx . MessageSid ,
) ;
return {
. . . payload ,
text : cleaned ? cleaned : undefined ,
replyToId : replyToId ? ? payload . replyToId ,
} ;
} )
. filter (
( payload ) = >
payload . text ||
payload . mediaUrl ||
( payload . mediaUrls && payload . mediaUrls . length > 0 ) ,
) ;
2026-01-03 17:14:01 +01:00
const shouldDropFinalPayloads =
2026-01-03 16:50:14 +00:00
blockStreamingEnabled && didStreamBlockReply ;
2026-01-03 17:14:01 +01:00
const filteredPayloads = shouldDropFinalPayloads
? [ ]
: blockStreamingEnabled
? replyTaggedPayloads . filter (
( payload ) = > ! streamedPayloadKeys . has ( buildPayloadKey ( payload ) ) ,
)
: replyTaggedPayloads ;
2026-01-03 00:28:33 +01:00
2026-01-03 04:26:36 +01:00
if ( filteredPayloads . length === 0 ) return finalizeWithFollowup ( undefined ) ;
2026-01-02 01:42:27 +01:00
2026-01-03 00:28:33 +01:00
const shouldSignalTyping = filteredPayloads . some ( ( payload ) = > {
2025-12-23 13:55:01 +00:00
const trimmed = payload . text ? . trim ( ) ;
2026-01-02 01:42:27 +01:00
if ( trimmed && trimmed !== SILENT_REPLY_TOKEN ) return true ;
2025-12-23 13:55:01 +00:00
if ( payload . mediaUrl ) return true ;
if ( payload . mediaUrls && payload . mediaUrls . length > 0 ) return true ;
return false ;
} ) ;
if ( shouldSignalTyping ) {
2025-12-23 15:03:05 +00:00
await startTypingLoop ( ) ;
2025-12-23 13:55:01 +00:00
}
2025-12-17 11:29:04 +01:00
if ( sessionStore && sessionKey ) {
const usage = runResult . meta . agentMeta ? . usage ;
2025-12-26 00:50:46 +00:00
const modelUsed = runResult . meta . agentMeta ? . model ? ? defaultModel ;
2025-12-17 11:29:04 +01:00
const contextTokensUsed =
agentCfg ? . contextTokens ? ?
lookupContextTokens ( modelUsed ) ? ?
sessionEntry ? . contextTokens ? ?
DEFAULT_CONTEXT_TOKENS ;
if ( usage ) {
const entry = sessionEntry ? ? sessionStore [ sessionKey ] ;
if ( entry ) {
const input = usage . input ? ? 0 ;
const output = usage . output ? ? 0 ;
const promptTokens =
input + ( usage . cacheRead ? ? 0 ) + ( usage . cacheWrite ? ? 0 ) ;
2025-12-02 20:09:51 +00:00
sessionEntry = {
. . . entry ,
2025-12-17 11:29:04 +01:00
inputTokens : input ,
outputTokens : output ,
totalTokens :
promptTokens > 0 ? promptTokens : ( usage . total ? ? input ) ,
model : modelUsed ,
contextTokens : contextTokensUsed ? ? entry . contextTokens ,
2025-12-02 20:09:51 +00:00
updatedAt : Date.now ( ) ,
} ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
}
2025-12-17 11:29:04 +01:00
} else if ( modelUsed || contextTokensUsed ) {
const entry = sessionEntry ? ? sessionStore [ sessionKey ] ;
if ( entry ) {
sessionEntry = {
. . . entry ,
model : modelUsed ? ? entry . model ,
contextTokens : contextTokensUsed ? ? entry . contextTokens ,
} ;
sessionStore [ sessionKey ] = sessionEntry ;
await saveSessionStore ( storePath , sessionStore ) ;
2025-12-05 22:33:09 +01:00
}
2025-12-02 20:09:51 +00:00
}
2025-12-03 09:09:34 +00:00
}
2025-11-26 00:53:53 +01:00
2025-12-17 11:29:04 +01:00
// If verbose is enabled and this is a new session, prepend a session hint.
2026-01-03 00:28:33 +01:00
let finalPayloads = filteredPayloads ;
2025-12-17 11:29:04 +01:00
if ( resolvedVerboseLevel === "on" && isNewSession ) {
finalPayloads = [
{ text : ` 🧭 New session: ${ sessionIdFinal } ` } ,
2026-01-02 01:42:27 +01:00
. . . finalPayloads ,
2025-12-17 11:29:04 +01:00
] ;
}
2026-01-03 04:26:36 +01:00
return finalizeWithFollowup (
finalPayloads . length === 1 ? finalPayloads [ 0 ] : finalPayloads ,
) ;
2025-12-17 11:29:04 +01:00
} finally {
cleanupTyping ( ) ;
}
2025-11-25 02:16:54 +01:00
}