From 1e0767a4ed04c75af9c3c98ae89e4f576c166313 Mon Sep 17 00:00:00 2001 From: Shaheer Sarfaraz <53654735+DaKheera47@users.noreply.github.com> Date: Fri, 20 Feb 2026 17:20:06 +0000 Subject: [PATCH] Avoid reprocessing previously ingested Gmail messages (#213) * Avoid reprocessing previously ingested Gmail messages * Avoid duplicate message lookup in Gmail sync upsert path --- .../repositories/post-application-messages.ts | 15 +++-- .../ingestion/gmail-sync.idempotency.test.ts | 63 +++++++++++++++---- .../post-application/ingestion/gmail-sync.ts | 59 ++++++++++++++++- 3 files changed, 119 insertions(+), 18 deletions(-) diff --git a/orchestrator/src/server/repositories/post-application-messages.ts b/orchestrator/src/server/repositories/post-application-messages.ts index 8fafffc..18bc403 100644 --- a/orchestrator/src/server/repositories/post-application-messages.ts +++ b/orchestrator/src/server/repositories/post-application-messages.ts @@ -44,6 +44,7 @@ type UpsertPostApplicationMessageInput = { decidedBy?: string | null; errorCode?: string | null; errorMessage?: string | null; + existingMessage?: PostApplicationMessage | null; }; type UpdatePostApplicationMessageSuggestionInput = { @@ -123,7 +124,7 @@ function mapRowToPostApplicationMessage( }; } -async function getPostApplicationMessageByExternalId( +export async function getPostApplicationMessageByExternalId( provider: PostApplicationProvider, accountKey: string, externalMessageId: string, @@ -163,11 +164,13 @@ export async function upsertPostApplicationMessage( suggestedStageTarget: stageTarget, }; const nowIso = new Date().toISOString(); - const existing = await getPostApplicationMessageByExternalId( - input.provider, - input.accountKey, - input.externalMessageId, - ); + const existing = + input.existingMessage ?? + (await getPostApplicationMessageByExternalId( + input.provider, + input.accountKey, + input.externalMessageId, + )); if (existing) { const nextProcessingStatus = isTerminalProcessingStatus( diff --git a/orchestrator/src/server/services/post-application/ingestion/gmail-sync.idempotency.test.ts b/orchestrator/src/server/services/post-application/ingestion/gmail-sync.idempotency.test.ts index 12fd424..d81b820 100644 --- a/orchestrator/src/server/services/post-application/ingestion/gmail-sync.idempotency.test.ts +++ b/orchestrator/src/server/services/post-application/ingestion/gmail-sync.idempotency.test.ts @@ -40,8 +40,10 @@ vi.mock("@server/repositories/jobs", () => ({ ]), })); +const getPostApplicationMessageByExternalId = vi.fn(); const upsertPostApplicationMessage = vi.fn(); vi.mock("@server/repositories/post-application-messages", () => ({ + getPostApplicationMessageByExternalId, upsertPostApplicationMessage, })); @@ -54,20 +56,22 @@ vi.mock("@server/repositories/settings", () => ({ getSetting: vi.fn().mockResolvedValue(null), })); +const llmCallJson = vi.fn().mockResolvedValue({ + success: true, + data: { + bestMatchIndex: 1, + confidence: 99, + stageTarget: "assessment", + isRelevant: true, + stageEventPayload: null, + reason: "matches", + }, +}); + vi.mock("@server/services/llm-service", () => ({ LlmService: class { callJson() { - return Promise.resolve({ - success: true, - data: { - bestMatchIndex: 1, - confidence: 99, - stageTarget: "assessment", - isRelevant: true, - stageEventPayload: null, - reason: "matches", - }, - }); + return llmCallJson(); } }, })); @@ -83,6 +87,7 @@ function makeJsonResponse(body: unknown): Response { describe("gmail sync auto-log idempotency", () => { beforeEach(() => { vi.clearAllMocks(); + llmCallJson.mockClear(); vi.stubGlobal( "fetch", @@ -129,6 +134,41 @@ describe("gmail sync auto-log idempotency", () => { it("creates auto stage event only on first auto_linked transition", async () => { const { runGmailIngestionSync } = await import("./gmail-sync"); + getPostApplicationMessageByExternalId + .mockResolvedValueOnce(null) + .mockResolvedValueOnce({ + id: "post-msg-1", + provider: "gmail", + accountKey: "default", + integrationId: "integration-1", + syncRunId: "sync-run-1", + externalMessageId: "message-1", + externalThreadId: "thread-1", + fromAddress: "jobs@example.com", + fromDomain: "example.com", + senderName: "Recruiter", + subject: "Interview update", + receivedAt: Date.now(), + snippet: "snippet", + classificationLabel: "assessment", + classificationConfidence: 0.99, + classificationPayload: { method: "smart_router", reason: "matches" }, + relevanceLlmScore: 99, + relevanceDecision: "relevant", + matchedJobId: "job-1", + matchConfidence: 99, + stageTarget: "assessment", + messageType: "interview", + stageEventPayload: null, + processingStatus: "auto_linked", + decidedAt: null, + decidedBy: null, + errorCode: null, + errorMessage: null, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + upsertPostApplicationMessage .mockResolvedValueOnce({ message: { @@ -160,5 +200,6 @@ describe("gmail sync auto-log idempotency", () => { expect(upsertPostApplicationMessage).toHaveBeenCalledTimes(2); expect(transitionStage).toHaveBeenCalledTimes(1); + expect(llmCallJson).toHaveBeenCalledTimes(1); }); }); diff --git a/orchestrator/src/server/services/post-application/ingestion/gmail-sync.ts b/orchestrator/src/server/services/post-application/ingestion/gmail-sync.ts index 03b2ec5..7d9e716 100644 --- a/orchestrator/src/server/services/post-application/ingestion/gmail-sync.ts +++ b/orchestrator/src/server/services/post-application/ingestion/gmail-sync.ts @@ -6,7 +6,10 @@ import { updatePostApplicationIntegrationSyncState, upsertConnectedPostApplicationIntegration, } from "@server/repositories/post-application-integrations"; -import { upsertPostApplicationMessage } from "@server/repositories/post-application-messages"; +import { + getPostApplicationMessageByExternalId, + upsertPostApplicationMessage, +} from "@server/repositories/post-application-messages"; import { completePostApplicationSyncRun, startPostApplicationSyncRun, @@ -857,6 +860,60 @@ export async function runGmailIngestionSync(args: { const date = headerValue(metadata.headers, "Date"); const { fromAddress, fromDomain, senderName } = parseFromHeader(from); const receivedAt = parseReceivedAt(date); + const existingMessage = await getPostApplicationMessageByExternalId( + "gmail", + args.accountKey, + metadata.id, + ); + + if (existingMessage) { + const { message: savedMessage, autoLinkTransitioned } = + await upsertPostApplicationMessage({ + provider: "gmail", + accountKey: args.accountKey, + integrationId: integration.id, + syncRunId: syncRun.id, + externalMessageId: metadata.id, + externalThreadId: metadata.threadId, + fromAddress, + fromDomain, + senderName, + subject, + receivedAt, + snippet: metadata.snippet, + classificationLabel: existingMessage.classificationLabel, + classificationConfidence: + existingMessage.classificationConfidence, + classificationPayload: existingMessage.classificationPayload, + relevanceLlmScore: existingMessage.relevanceLlmScore, + relevanceDecision: existingMessage.relevanceDecision, + matchedJobId: existingMessage.matchedJobId, + matchConfidence: existingMessage.matchConfidence, + stageTarget: existingMessage.stageTarget, + messageType: existingMessage.messageType, + stageEventPayload: existingMessage.stageEventPayload, + processingStatus: existingMessage.processingStatus, + existingMessage, + }); + + if (savedMessage.processingStatus !== "ignored") { + relevant += 1; + } + classified += 1; + if (savedMessage.matchedJobId) { + matched += 1; + } + + if (autoLinkTransitioned && savedMessage.matchedJobId) { + await createAutoStageEvent({ + jobId: savedMessage.matchedJobId, + stageTarget: savedMessage.stageTarget ?? "no_change", + receivedAt: savedMessage.receivedAt, + note: "Auto-created from Smart Router.", + }); + } + return; + } const fullMessage = await getMessageFull(accessToken, message.id); const body = extractBodyText(fullMessage.payload);