race condition

This commit is contained in:
DaKheera47 2026-02-15 21:38:33 +00:00
parent 2536e184a7
commit 2fc692557f
3 changed files with 81 additions and 16 deletions

View File

@ -451,6 +451,25 @@ const migrations = [
`CREATE INDEX IF NOT EXISTS idx_job_chat_threads_job_updated ON job_chat_threads(job_id, updated_at)`,
`CREATE INDEX IF NOT EXISTS idx_job_chat_messages_thread_created ON job_chat_messages(thread_id, created_at)`,
`CREATE INDEX IF NOT EXISTS idx_job_chat_runs_thread_status ON job_chat_runs(thread_id, status)`,
// Ensure only one running run per thread; backfill any duplicates first.
`WITH ranked AS (
SELECT
id,
ROW_NUMBER() OVER (PARTITION BY thread_id ORDER BY started_at DESC, id DESC) AS rank_in_thread
FROM job_chat_runs
WHERE status = 'running'
)
UPDATE job_chat_runs
SET
status = 'failed',
error_code = COALESCE(error_code, 'CONFLICT'),
error_message = COALESCE(error_message, 'Recovered duplicate running run during migration'),
completed_at = COALESCE(completed_at, CAST(strftime('%s', 'now') AS INTEGER)),
updated_at = datetime('now')
WHERE id IN (SELECT id FROM ranked WHERE rank_in_thread > 1)`,
`CREATE UNIQUE INDEX IF NOT EXISTS idx_job_chat_runs_thread_running_unique
ON job_chat_runs(thread_id)
WHERE status = 'running'`,
// Backfill: Create "Applied" events for legacy jobs that have applied_at set but no event entry
`INSERT INTO stage_events (id, application_id, title, from_stage, to_stage, occurred_at, metadata)

View File

@ -356,4 +356,23 @@ describe("ghostwriter service", () => {
expect(result).toEqual({ cancelled: false, alreadyFinished: true });
expect(mocks.repo.completeRun).not.toHaveBeenCalled();
});
it("maps createRun unique constraint races to conflict", async () => {
mocks.repo.createMessage.mockResolvedValue(baseUserMessage);
mocks.repo.createRun.mockRejectedValue(
new Error(
"UNIQUE constraint failed: job_chat_runs.thread_id (idx_job_chat_runs_thread_running_unique)",
),
);
await expect(
sendMessageForJob({
jobId: "job-1",
content: "hello",
}),
).rejects.toMatchObject({
code: "CONFLICT",
status: 409,
});
});
});

View File

@ -1,5 +1,6 @@
import { logger } from "@infra/logger";
import { getRequestId } from "@infra/request-context";
import type { JobChatMessage, JobChatRun } from "@shared/types";
import {
badRequest,
conflict,
@ -52,6 +53,14 @@ function chunkText(value: string, maxChunk = 60): string[] {
return chunks;
}
function isRunningRunUniqueConstraintError(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error);
return (
message.includes("idx_job_chat_runs_thread_running_unique") ||
message.includes("UNIQUE constraint failed: job_chat_runs.thread_id")
);
}
async function resolveLlmRuntimeSettings(): Promise<LlmRuntimeSettings> {
const overrides = await settingsRepo.getAllSettings();
@ -202,23 +211,41 @@ async function runAssistantReply(
const requestId = getRequestId() ?? "unknown";
const assistantMessage = await jobChatRepo.createMessage({
threadId: options.threadId,
jobId: options.jobId,
role: "assistant",
content: "",
status: "partial",
version: options.version ?? 1,
replacesMessageId: options.replaceMessageId ?? null,
});
let run: JobChatRun;
try {
run = await jobChatRepo.createRun({
threadId: options.threadId,
jobId: options.jobId,
model: llmConfig.model,
provider: llmConfig.provider,
requestId,
});
} catch (error) {
if (isRunningRunUniqueConstraintError(error)) {
throw conflict("A chat generation is already running for this thread");
}
throw error;
}
const run = await jobChatRepo.createRun({
threadId: options.threadId,
jobId: options.jobId,
model: llmConfig.model,
provider: llmConfig.provider,
requestId,
});
let assistantMessage: JobChatMessage;
try {
assistantMessage = await jobChatRepo.createMessage({
threadId: options.threadId,
jobId: options.jobId,
role: "assistant",
content: "",
status: "partial",
version: options.version ?? 1,
replacesMessageId: options.replaceMessageId ?? null,
});
} catch (error) {
await jobChatRepo.completeRun(run.id, {
status: "failed",
errorCode: "INTERNAL_ERROR",
errorMessage: "Failed to create assistant message",
});
throw error;
}
const controller = new AbortController();
abortControllers.set(run.id, controller);