From 2fc692557f2cacf87f17dae09ac26d39b1238bf6 Mon Sep 17 00:00:00 2001 From: DaKheera47 Date: Sun, 15 Feb 2026 21:38:33 +0000 Subject: [PATCH] race condition --- orchestrator/src/server/db/migrate.ts | 19 ++++++ .../src/server/services/ghostwriter.test.ts | 19 ++++++ .../src/server/services/ghostwriter.ts | 59 ++++++++++++++----- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/orchestrator/src/server/db/migrate.ts b/orchestrator/src/server/db/migrate.ts index 32ac22a..a0edadd 100644 --- a/orchestrator/src/server/db/migrate.ts +++ b/orchestrator/src/server/db/migrate.ts @@ -451,6 +451,25 @@ const migrations = [ `CREATE INDEX IF NOT EXISTS idx_job_chat_threads_job_updated ON job_chat_threads(job_id, updated_at)`, `CREATE INDEX IF NOT EXISTS idx_job_chat_messages_thread_created ON job_chat_messages(thread_id, created_at)`, `CREATE INDEX IF NOT EXISTS idx_job_chat_runs_thread_status ON job_chat_runs(thread_id, status)`, + // Ensure only one running run per thread; backfill any duplicates first. + `WITH ranked AS ( + SELECT + id, + ROW_NUMBER() OVER (PARTITION BY thread_id ORDER BY started_at DESC, id DESC) AS rank_in_thread + FROM job_chat_runs + WHERE status = 'running' + ) + UPDATE job_chat_runs + SET + status = 'failed', + error_code = COALESCE(error_code, 'CONFLICT'), + error_message = COALESCE(error_message, 'Recovered duplicate running run during migration'), + completed_at = COALESCE(completed_at, CAST(strftime('%s', 'now') AS INTEGER)), + updated_at = datetime('now') + WHERE id IN (SELECT id FROM ranked WHERE rank_in_thread > 1)`, + `CREATE UNIQUE INDEX IF NOT EXISTS idx_job_chat_runs_thread_running_unique + ON job_chat_runs(thread_id) + WHERE status = 'running'`, // Backfill: Create "Applied" events for legacy jobs that have applied_at set but no event entry `INSERT INTO stage_events (id, application_id, title, from_stage, to_stage, occurred_at, metadata) diff --git a/orchestrator/src/server/services/ghostwriter.test.ts b/orchestrator/src/server/services/ghostwriter.test.ts index 8fc823c..069881f 100644 --- a/orchestrator/src/server/services/ghostwriter.test.ts +++ b/orchestrator/src/server/services/ghostwriter.test.ts @@ -356,4 +356,23 @@ describe("ghostwriter service", () => { expect(result).toEqual({ cancelled: false, alreadyFinished: true }); expect(mocks.repo.completeRun).not.toHaveBeenCalled(); }); + + it("maps createRun unique constraint races to conflict", async () => { + mocks.repo.createMessage.mockResolvedValue(baseUserMessage); + mocks.repo.createRun.mockRejectedValue( + new Error( + "UNIQUE constraint failed: job_chat_runs.thread_id (idx_job_chat_runs_thread_running_unique)", + ), + ); + + await expect( + sendMessageForJob({ + jobId: "job-1", + content: "hello", + }), + ).rejects.toMatchObject({ + code: "CONFLICT", + status: 409, + }); + }); }); diff --git a/orchestrator/src/server/services/ghostwriter.ts b/orchestrator/src/server/services/ghostwriter.ts index de78714..908b0c6 100644 --- a/orchestrator/src/server/services/ghostwriter.ts +++ b/orchestrator/src/server/services/ghostwriter.ts @@ -1,5 +1,6 @@ import { logger } from "@infra/logger"; import { getRequestId } from "@infra/request-context"; +import type { JobChatMessage, JobChatRun } from "@shared/types"; import { badRequest, conflict, @@ -52,6 +53,14 @@ function chunkText(value: string, maxChunk = 60): string[] { return chunks; } +function isRunningRunUniqueConstraintError(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error); + return ( + message.includes("idx_job_chat_runs_thread_running_unique") || + message.includes("UNIQUE constraint failed: job_chat_runs.thread_id") + ); +} + async function resolveLlmRuntimeSettings(): Promise { const overrides = await settingsRepo.getAllSettings(); @@ -202,23 +211,41 @@ async function runAssistantReply( const requestId = getRequestId() ?? "unknown"; - const assistantMessage = await jobChatRepo.createMessage({ - threadId: options.threadId, - jobId: options.jobId, - role: "assistant", - content: "", - status: "partial", - version: options.version ?? 1, - replacesMessageId: options.replaceMessageId ?? null, - }); + let run: JobChatRun; + try { + run = await jobChatRepo.createRun({ + threadId: options.threadId, + jobId: options.jobId, + model: llmConfig.model, + provider: llmConfig.provider, + requestId, + }); + } catch (error) { + if (isRunningRunUniqueConstraintError(error)) { + throw conflict("A chat generation is already running for this thread"); + } + throw error; + } - const run = await jobChatRepo.createRun({ - threadId: options.threadId, - jobId: options.jobId, - model: llmConfig.model, - provider: llmConfig.provider, - requestId, - }); + let assistantMessage: JobChatMessage; + try { + assistantMessage = await jobChatRepo.createMessage({ + threadId: options.threadId, + jobId: options.jobId, + role: "assistant", + content: "", + status: "partial", + version: options.version ?? 1, + replacesMessageId: options.replaceMessageId ?? null, + }); + } catch (error) { + await jobChatRepo.completeRun(run.id, { + status: "failed", + errorCode: "INTERNAL_ERROR", + errorMessage: "Failed to create assistant message", + }); + throw error; + } const controller = new AbortController(); abortControllers.set(run.id, controller);