From 2c8de6c92e5e0accaf82614c5c67304cb3693450 Mon Sep 17 00:00:00 2001 From: Shaheer Sarfaraz <53654735+DaKheera47@users.noreply.github.com> Date: Mon, 9 Feb 2026 21:38:04 +0000 Subject: [PATCH] pipeline completed successfully toast fires on every reload (#120) --- .../orchestrator/useOrchestratorData.test.ts | 182 ++++++++++++++++++ .../pages/orchestrator/useOrchestratorData.ts | 154 ++++++++++++--- 2 files changed, 307 insertions(+), 29 deletions(-) diff --git a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts index 5574535..6245519 100644 --- a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts +++ b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts @@ -327,6 +327,188 @@ describe("useOrchestratorData", () => { expect(api.getJobsRevision).toHaveBeenCalledTimes(2); }); + it("does not publish terminal on reload when status and SSE report the same completed run", async () => { + vi.mocked(api.getPipelineStatus).mockResolvedValue({ + isRunning: false, + lastRun: { + id: "run-1", + status: "completed", + startedAt: "2026-01-01T00:00:00.000Z", + completedAt: "2026-01-01T00:05:00.000Z", + errorMessage: null, + }, + } as any); + + const { result } = renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + expect(result.current.pipelineTerminalEvent).toBeNull(); + + const sse = MockEventSource.instances[0]; + act(() => { + sse.emitOpen(); + sse.emitMessage({ + step: "completed", + startedAt: "2026-01-01T00:00:00.000Z", + completedAt: "2026-01-01T00:05:00.000Z", + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(result.current.pipelineTerminalEvent).toBeNull(); + }); + + it("publishes one terminal event when active SSE transitions to completed", async () => { + const { result } = renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + expect(result.current.pipelineTerminalEvent).toBeNull(); + + const sse = MockEventSource.instances[0]; + act(() => { + sse.emitOpen(); + sse.emitMessage({ step: "crawling" }); + sse.emitMessage({ + step: "completed", + startedAt: "2026-02-01T10:00:00.000Z", + completedAt: "2026-02-01T10:05:00.000Z", + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(result.current.pipelineTerminalEvent).toEqual({ + status: "completed", + errorMessage: null, + token: 1, + }); + + act(() => { + sse.emitMessage({ + step: "completed", + startedAt: "2026-02-01T10:00:00.000Z", + completedAt: "2026-02-01T10:05:00.000Z", + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(result.current.pipelineTerminalEvent).toEqual({ + status: "completed", + errorMessage: null, + token: 1, + }); + }); + + it("publishes one terminal event when polling observes running then completed", async () => { + vi.useFakeTimers(); + vi.mocked(api.getPipelineStatus) + .mockResolvedValueOnce({ + isRunning: true, + lastRun: null, + } as any) + .mockResolvedValueOnce({ + isRunning: false, + lastRun: { + id: "run-polling", + status: "completed", + startedAt: "2026-02-01T11:00:00.000Z", + completedAt: "2026-02-01T11:05:00.000Z", + errorMessage: null, + }, + } as any); + + const { result } = renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + expect(result.current.pipelineTerminalEvent).toBeNull(); + + const sse = MockEventSource.instances[0]; + act(() => { + sse.emitOpen(); + sse.emitError(); + }); + + await act(async () => { + vi.advanceTimersByTime(30000); + await Promise.resolve(); + }); + + expect(result.current.pipelineTerminalEvent).toEqual({ + status: "completed", + errorMessage: null, + token: 1, + }); + }); + + it("dedupes the same terminal run reported by status and SSE", async () => { + vi.useFakeTimers(); + vi.mocked(api.getPipelineStatus) + .mockResolvedValueOnce({ + isRunning: true, + lastRun: null, + } as any) + .mockResolvedValueOnce({ + isRunning: false, + lastRun: { + id: "run-dedupe", + status: "completed", + startedAt: "2026-02-01T12:00:00.000Z", + completedAt: "2026-02-01T12:05:00.000Z", + errorMessage: null, + }, + } as any); + + const { result } = renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + + const sse = MockEventSource.instances[0]; + act(() => { + sse.emitOpen(); + sse.emitError(); + }); + + await act(async () => { + vi.advanceTimersByTime(30000); + await Promise.resolve(); + }); + expect(result.current.pipelineTerminalEvent).toEqual({ + status: "completed", + errorMessage: null, + token: 1, + }); + + act(() => { + sse.emitMessage({ + step: "completed", + startedAt: "2026-02-01T12:00:00.000Z", + completedAt: "2026-02-01T12:05:00.000Z", + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(result.current.pipelineTerminalEvent).toEqual({ + status: "completed", + errorMessage: null, + token: 1, + }); + }); + it("forces a jobs reload on terminal pipeline SSE step", async () => { vi.useFakeTimers(); vi.mocked(api.getJobs) diff --git a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts index bba8e9c..522654c 100644 --- a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts +++ b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts @@ -40,6 +40,12 @@ type PipelineTerminalEvent = { token: number; }; +type PipelineTerminalSnapshot = { + status: PipelineTerminalStatus; + errorMessage: string | null; + signature: string; +}; + const ACTIVE_PIPELINE_STEPS: ReadonlySet = new Set([ "crawling", "importing", @@ -53,6 +59,23 @@ const TERMINAL_PIPELINE_STEPS: ReadonlySet = new Set([ "failed", ]); +const buildTerminalSignature = ({ + status, + startedAt, + completedAt, + runId, +}: { + status: PipelineTerminalStatus; + startedAt?: string | null; + completedAt?: string | null; + runId?: string | null; +}) => { + if (startedAt || completedAt) { + return `${status}:${startedAt ?? ""}:${completedAt ?? ""}`; + } + return `${status}:run:${runId ?? "unknown"}`; +}; + export const useOrchestratorData = (selectedJobId: string | null) => { const [jobListItems, setJobListItems] = useState([]); const [selectedJob, setSelectedJob] = useState(null); @@ -70,18 +93,14 @@ export const useOrchestratorData = (selectedJobId: string | null) => { const selectedJobCacheRef = useRef>(new Map()); const lastRevisionRef = useRef(null); const lastSseRefreshAtRef = useRef(0); + const hasHydratedPipelineStateRef = useRef(false); + const seenRunningThisSessionRef = useRef(false); + const baselineTerminalSignatureRef = useRef(null); const lastTerminalSignatureRef = useRef(null); - const lastTerminalNotificationKeyRef = useRef(null); const terminalEventTokenRef = useRef(0); const publishPipelineTerminal = useCallback( - ( - status: PipelineTerminalStatus, - errorMessage: string | null, - dedupeKey: string, - ) => { - if (dedupeKey === lastTerminalNotificationKeyRef.current) return; - lastTerminalNotificationKeyRef.current = dedupeKey; + (status: PipelineTerminalStatus, errorMessage: string | null) => { terminalEventTokenRef.current += 1; setPipelineTerminalEvent({ status, @@ -92,6 +111,55 @@ export const useOrchestratorData = (selectedJobId: string | null) => { [], ); + const observePipelineState = useCallback( + (snapshot: { + isRunning: boolean; + terminal: PipelineTerminalSnapshot | null; + }) => { + setIsPipelineRunning(snapshot.isRunning); + if (snapshot.isRunning) { + seenRunningThisSessionRef.current = true; + } + + if (!snapshot.terminal) { + if (!hasHydratedPipelineStateRef.current) { + hasHydratedPipelineStateRef.current = true; + } + return; + } + + const signature = snapshot.terminal.signature; + const isFirstPipelineObservation = !hasHydratedPipelineStateRef.current; + + if (isFirstPipelineObservation) { + hasHydratedPipelineStateRef.current = true; + baselineTerminalSignatureRef.current = signature; + lastTerminalSignatureRef.current = signature; + return; + } + + if (signature === lastTerminalSignatureRef.current) { + return; + } + + lastTerminalSignatureRef.current = signature; + if (!seenRunningThisSessionRef.current) { + return; + } + + if (signature === baselineTerminalSignatureRef.current) { + return; + } + + seenRunningThisSessionRef.current = false; + publishPipelineTerminal( + snapshot.terminal.status, + snapshot.terminal.errorMessage, + ); + }, + [publishPipelineTerminal], + ); + const loadSelectedJob = useCallback( async (jobId: string) => { const seq = ++selectedJobRequestSeqRef.current; @@ -145,24 +213,39 @@ export const useOrchestratorData = (selectedJobId: string | null) => { const checkPipelineStatus = useCallback(async () => { try { const status = await api.getPipelineStatus(); - setIsPipelineRunning(status.isRunning); const terminalStatus = status.lastRun?.status; + + if (status.isRunning) { + observePipelineState({ isRunning: true, terminal: null }); + return; + } + if ( - status.isRunning || !terminalStatus || !TERMINAL_PIPELINE_STEPS.has(terminalStatus as PipelineProgressStep) ) { + observePipelineState({ isRunning: false, terminal: null }); return; } - publishPipelineTerminal( - terminalStatus as PipelineTerminalStatus, - status.lastRun?.errorMessage ?? null, - `status:${status.lastRun?.id ?? "unknown"}:${terminalStatus}:${status.lastRun?.completedAt ?? ""}`, - ); + + const terminal = terminalStatus as PipelineTerminalStatus; + observePipelineState({ + isRunning: false, + terminal: { + status: terminal, + errorMessage: status.lastRun?.errorMessage ?? null, + signature: buildTerminalSignature({ + status: terminal, + startedAt: status.lastRun?.startedAt ?? null, + completedAt: status.lastRun?.completedAt ?? null, + runId: status.lastRun?.id ?? null, + }), + }, + }); } catch { // Ignore errors } - }, [publishPipelineTerminal]); + }, [observePipelineState]); const checkForJobChanges = useCallback(async () => { if (isRefreshPaused || !isDocumentVisible()) return; @@ -186,6 +269,11 @@ export const useOrchestratorData = (selectedJobId: string | null) => { void checkPipelineStatus(); }, [checkPipelineStatus, loadJobs]); + useEffect(() => { + if (!isPipelineRunning) return; + seenRunningThisSessionRef.current = true; + }, [isPipelineRunning]); + useEffect(() => { const interval = setInterval(() => { if (!isDocumentVisible() || isRefreshPaused) return; @@ -257,9 +345,14 @@ export const useOrchestratorData = (selectedJobId: string | null) => { } const typedStep = step as PipelineProgressStep; - setIsPipelineRunning(ACTIVE_PIPELINE_STEPS.has(typedStep)); + const isActiveStep = ACTIVE_PIPELINE_STEPS.has(typedStep); + if (isActiveStep) { + observePipelineState({ isRunning: true, terminal: null }); + } else if (typedStep === "idle") { + observePipelineState({ isRunning: false, terminal: null }); + } - if (ACTIVE_PIPELINE_STEPS.has(typedStep)) { + if (isActiveStep) { const now = Date.now(); if (now - lastSseRefreshAtRef.current >= 2500) { lastSseRefreshAtRef.current = now; @@ -270,16 +363,19 @@ export const useOrchestratorData = (selectedJobId: string | null) => { if (TERMINAL_PIPELINE_STEPS.has(typedStep)) { const eventPayload = payload as PipelineProgressEvent; - const terminalSignature = `${typedStep}:${eventPayload.startedAt ?? ""}:${ - eventPayload.completedAt ?? "" - }`; - if (terminalSignature === lastTerminalSignatureRef.current) return; - lastTerminalSignatureRef.current = terminalSignature; - publishPipelineTerminal( - typedStep as PipelineTerminalStatus, - eventPayload.error ?? null, - `sse:${terminalSignature}`, - ); + const terminal = typedStep as PipelineTerminalStatus; + observePipelineState({ + isRunning: false, + terminal: { + status: terminal, + errorMessage: eventPayload.error ?? null, + signature: buildTerminalSignature({ + status: terminal, + startedAt: eventPayload.startedAt, + completedAt: eventPayload.completedAt, + }), + }, + }); void loadJobs(); } }; @@ -291,7 +387,7 @@ export const useOrchestratorData = (selectedJobId: string | null) => { return () => { eventSource.close(); }; - }, [checkForJobChanges, loadJobs, publishPipelineTerminal]); + }, [checkForJobChanges, loadJobs, observePipelineState]); useEffect(() => { if (isPipelineSseConnected) return;