import { logger } from "@infra/logger"; import { getRequestId } from "@infra/request-context"; import type { JobChatMessage, JobChatRun } from "@shared/types"; import { badRequest, conflict, notFound, requestTimeout, upstreamError, } from "../infra/errors"; import * as jobChatRepo from "../repositories/ghostwriter"; import * as settingsRepo from "../repositories/settings"; import { buildJobChatPromptContext } from "./ghostwriter-context"; import { LlmService } from "./llm/service"; import type { JsonSchemaDefinition } from "./llm/types"; type LlmRuntimeSettings = { model: string; provider: string | null; baseUrl: string | null; apiKey: string | null; }; const abortControllers = new Map(); const CHAT_RESPONSE_SCHEMA: JsonSchemaDefinition = { name: "job_chat_response", schema: { type: "object", properties: { response: { type: "string", }, }, required: ["response"], additionalProperties: false, }, }; function estimateTokenCount(value: string): number { if (!value) return 0; return Math.ceil(value.length / 4); } function chunkText(value: string, maxChunk = 60): string[] { if (!value) return []; const chunks: string[] = []; let cursor = 0; while (cursor < value.length) { chunks.push(value.slice(cursor, cursor + maxChunk)); cursor += maxChunk; } return chunks; } function isRunningRunUniqueConstraintError(error: unknown): boolean { const message = error instanceof Error ? error.message : String(error); return ( message.includes("idx_job_chat_runs_thread_running_unique") || message.includes("UNIQUE constraint failed: job_chat_runs.thread_id") ); } async function resolveLlmRuntimeSettings(): Promise { const overrides = await settingsRepo.getAllSettings(); const model = overrides.modelTailoring || overrides.model || process.env.MODEL || "google/gemini-3-flash-preview"; const provider = overrides.llmProvider || process.env.LLM_PROVIDER || "openrouter"; const baseUrl = overrides.llmBaseUrl || process.env.LLM_BASE_URL || null; const apiKey = overrides.llmApiKey || process.env.LLM_API_KEY || null; return { model, provider, baseUrl, apiKey, }; } async function buildConversationMessages( threadId: string, ): Promise> { const messages = await jobChatRepo.listMessagesForThread(threadId, { limit: 40, }); return messages .filter( (message): message is typeof message & { role: "user" | "assistant" } => message.role === "user" || message.role === "assistant", ) .filter((message) => message.status !== "failed") .map((message) => ({ role: message.role, content: message.content, })); } type GenerateReplyOptions = { jobId: string; threadId: string; prompt: string; replaceMessageId?: string; version?: number; stream?: { onReady: (payload: { runId: string; threadId: string; messageId: string; requestId: string; }) => void; onDelta: (payload: { runId: string; messageId: string; delta: string; }) => void; onCompleted: (payload: { runId: string; message: Awaited>; }) => void; onCancelled: (payload: { runId: string; message: Awaited>; }) => void; onError: (payload: { runId: string; code: string; message: string; requestId: string; }) => void; }; }; async function ensureJobThread(jobId: string, title?: string | null) { return jobChatRepo.getOrCreateThreadForJob({ jobId, title: title ?? null, }); } export async function createThread(input: { jobId: string; title?: string | null; }) { return ensureJobThread(input.jobId, input.title); } export async function listThreads(jobId: string) { const thread = await ensureJobThread(jobId); return [thread]; } export async function listMessages(input: { jobId: string; threadId: string; limit?: number; offset?: number; }) { const thread = await jobChatRepo.getThreadForJob(input.jobId, input.threadId); if (!thread) { throw notFound("Thread not found for this job"); } return jobChatRepo.listMessagesForThread(input.threadId, { limit: input.limit, offset: input.offset, }); } export async function listMessagesForJob(input: { jobId: string; limit?: number; offset?: number; }) { const thread = await ensureJobThread(input.jobId); return jobChatRepo.listMessagesForThread(thread.id, { limit: input.limit, offset: input.offset, }); } async function runAssistantReply( options: GenerateReplyOptions, ): Promise<{ runId: string; messageId: string; message: string }> { const thread = await jobChatRepo.getThreadForJob( options.jobId, options.threadId, ); if (!thread) { throw notFound("Thread not found for this job"); } const activeRun = await jobChatRepo.getActiveRunForThread(options.threadId); if (activeRun) { throw conflict("A chat generation is already running for this thread"); } const [context, llmConfig, history] = await Promise.all([ buildJobChatPromptContext(options.jobId), resolveLlmRuntimeSettings(), buildConversationMessages(options.threadId), ]); const requestId = getRequestId() ?? "unknown"; let run: JobChatRun; try { run = await jobChatRepo.createRun({ threadId: options.threadId, jobId: options.jobId, model: llmConfig.model, provider: llmConfig.provider, requestId, }); } catch (error) { if (isRunningRunUniqueConstraintError(error)) { throw conflict("A chat generation is already running for this thread"); } throw error; } let assistantMessage: JobChatMessage; try { assistantMessage = await jobChatRepo.createMessage({ threadId: options.threadId, jobId: options.jobId, role: "assistant", content: "", status: "partial", version: options.version ?? 1, replacesMessageId: options.replaceMessageId ?? null, }); } catch (error) { await jobChatRepo.completeRun(run.id, { status: "failed", errorCode: "INTERNAL_ERROR", errorMessage: "Failed to create assistant message", }); throw error; } const controller = new AbortController(); abortControllers.set(run.id, controller); options.stream?.onReady({ runId: run.id, threadId: options.threadId, messageId: assistantMessage.id, requestId, }); let accumulated = ""; try { const llm = new LlmService({ provider: llmConfig.provider, baseUrl: llmConfig.baseUrl, apiKey: llmConfig.apiKey, }); const llmResult = await llm.callJson<{ response: string }>({ model: llmConfig.model, messages: [ { role: "system", content: context.systemPrompt, }, { role: "system", content: `Job Context (JSON):\n${context.jobSnapshot}`, }, { role: "system", content: `Profile Context:\n${context.profileSnapshot || "No profile context available."}`, }, ...history, { role: "user", content: options.prompt, }, ], jsonSchema: CHAT_RESPONSE_SCHEMA, maxRetries: 1, retryDelayMs: 300, jobId: options.jobId, signal: controller.signal, }); if (!llmResult.success) { if (controller.signal.aborted) { throw requestTimeout("Chat generation was cancelled"); } throw upstreamError("LLM generation failed", { reason: llmResult.error, }); } const finalText = (llmResult.data.response || "").trim(); const chunks = chunkText(finalText); for (const chunk of chunks) { if (controller.signal.aborted) { const cancelled = await jobChatRepo.updateMessage(assistantMessage.id, { content: accumulated, status: "cancelled", tokensIn: estimateTokenCount(options.prompt), tokensOut: estimateTokenCount(accumulated), }); await jobChatRepo.completeRun(run.id, { status: "cancelled", errorCode: "REQUEST_TIMEOUT", errorMessage: "Generation cancelled by user", }); options.stream?.onCancelled({ runId: run.id, message: cancelled }); return { runId: run.id, messageId: assistantMessage.id, message: accumulated, }; } accumulated += chunk; options.stream?.onDelta({ runId: run.id, messageId: assistantMessage.id, delta: chunk, }); } const completedMessage = await jobChatRepo.updateMessage( assistantMessage.id, { content: accumulated, status: "complete", tokensIn: estimateTokenCount(options.prompt), tokensOut: estimateTokenCount(accumulated), }, ); const runAfterComplete = await jobChatRepo.completeRunIfRunning(run.id, { status: "completed", }); if (!runAfterComplete || runAfterComplete.status !== "completed") { if (runAfterComplete?.status === "cancelled") { const cancelledMessage = await jobChatRepo.updateMessage( assistantMessage.id, { content: accumulated, status: "cancelled", tokensIn: estimateTokenCount(options.prompt), tokensOut: estimateTokenCount(accumulated), }, ); options.stream?.onCancelled({ runId: run.id, message: cancelledMessage, }); } return { runId: run.id, messageId: assistantMessage.id, message: accumulated, }; } options.stream?.onCompleted({ runId: run.id, message: completedMessage, }); return { runId: run.id, messageId: assistantMessage.id, message: accumulated, }; } catch (error) { const appError = error instanceof Error ? error : new Error(String(error)); const isCancelled = controller.signal.aborted || appError.name === "AbortError"; const status = isCancelled ? "cancelled" : "failed"; const code = isCancelled ? "REQUEST_TIMEOUT" : "UPSTREAM_ERROR"; const message = isCancelled ? "Generation cancelled by user" : appError.message || "Generation failed"; const failedMessage = await jobChatRepo.updateMessage(assistantMessage.id, { content: accumulated, status: isCancelled ? "cancelled" : "failed", tokensIn: estimateTokenCount(options.prompt), tokensOut: estimateTokenCount(accumulated), }); await jobChatRepo.completeRun(run.id, { status, errorCode: code, errorMessage: message, }); if (isCancelled) { options.stream?.onCancelled({ runId: run.id, message: failedMessage }); return { runId: run.id, messageId: assistantMessage.id, message: accumulated, }; } options.stream?.onError({ runId: run.id, code, message, requestId, }); throw upstreamError(message, { runId: run.id }); } finally { abortControllers.delete(run.id); logger.info("Job chat run finished", { jobId: options.jobId, threadId: options.threadId, runId: run.id, }); } } export async function sendMessage(input: { jobId: string; threadId: string; content: string; stream?: GenerateReplyOptions["stream"]; }) { const content = input.content.trim(); if (!content) { throw badRequest("Message content is required"); } const thread = await jobChatRepo.getThreadForJob(input.jobId, input.threadId); if (!thread) { throw notFound("Thread not found for this job"); } const userMessage = await jobChatRepo.createMessage({ threadId: input.threadId, jobId: input.jobId, role: "user", content, status: "complete", tokensIn: estimateTokenCount(content), tokensOut: null, }); const result = await runAssistantReply({ jobId: input.jobId, threadId: input.threadId, prompt: content, stream: input.stream, }); const assistantMessage = await jobChatRepo.getMessageById(result.messageId); return { userMessage, assistantMessage, runId: result.runId, }; } export async function sendMessageForJob(input: { jobId: string; content: string; stream?: GenerateReplyOptions["stream"]; }) { const thread = await ensureJobThread(input.jobId); return sendMessage({ jobId: input.jobId, threadId: thread.id, content: input.content, stream: input.stream, }); } export async function regenerateMessage(input: { jobId: string; threadId: string; assistantMessageId: string; stream?: GenerateReplyOptions["stream"]; }) { const thread = await jobChatRepo.getThreadForJob(input.jobId, input.threadId); if (!thread) { throw notFound("Thread not found for this job"); } const target = await jobChatRepo.getMessageById(input.assistantMessageId); if ( !target || target.threadId !== input.threadId || target.jobId !== input.jobId ) { throw notFound("Assistant message not found for this thread"); } if (target.role !== "assistant") { throw badRequest("Only assistant messages can be regenerated"); } const latestAssistant = await jobChatRepo.getLatestAssistantMessage( input.threadId, ); if (!latestAssistant || latestAssistant.id !== target.id) { throw badRequest("Only the latest assistant message can be regenerated"); } const messages = await jobChatRepo.listMessagesForThread(input.threadId, { limit: 200, }); const targetIndex = messages.findIndex((message) => message.id === target.id); const priorUser = targetIndex > 0 ? [...messages.slice(0, targetIndex)] .reverse() .find((message) => message.role === "user") : null; if (!priorUser) { throw badRequest("Could not find a user message to regenerate from"); } const result = await runAssistantReply({ jobId: input.jobId, threadId: input.threadId, prompt: priorUser.content, replaceMessageId: target.id, version: (target.version || 1) + 1, stream: input.stream, }); const assistantMessage = await jobChatRepo.getMessageById(result.messageId); return { runId: result.runId, assistantMessage, }; } export async function regenerateMessageForJob(input: { jobId: string; assistantMessageId: string; stream?: GenerateReplyOptions["stream"]; }) { const thread = await ensureJobThread(input.jobId); return regenerateMessage({ jobId: input.jobId, threadId: thread.id, assistantMessageId: input.assistantMessageId, stream: input.stream, }); } export async function cancelRun(input: { jobId: string; threadId: string; runId: string; }): Promise<{ cancelled: boolean; alreadyFinished: boolean }> { const run = await jobChatRepo.getRunById(input.runId); if (!run || run.threadId !== input.threadId || run.jobId !== input.jobId) { throw notFound("Run not found for this thread"); } if (run.status !== "running") { return { cancelled: false, alreadyFinished: true, }; } const controller = abortControllers.get(input.runId); if (controller) { controller.abort(); } const runAfterCancel = await jobChatRepo.completeRunIfRunning(input.runId, { status: "cancelled", errorCode: "REQUEST_TIMEOUT", errorMessage: "Generation cancelled by user", }); if (!runAfterCancel || runAfterCancel.status !== "cancelled") { return { cancelled: false, alreadyFinished: true, }; } return { cancelled: true, alreadyFinished: false, }; } export async function cancelRunForJob(input: { jobId: string; runId: string; }): Promise<{ cancelled: boolean; alreadyFinished: boolean }> { const thread = await ensureJobThread(input.jobId); return cancelRun({ jobId: input.jobId, threadId: thread.id, runId: input.runId, }); }