pipeline completed successfully toast fires on every reload (#120)

This commit is contained in:
Shaheer Sarfaraz 2026-02-09 21:38:04 +00:00 committed by GitHub
parent d82c69b4b0
commit 2c8de6c92e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 307 additions and 29 deletions

View File

@ -327,6 +327,188 @@ describe("useOrchestratorData", () => {
expect(api.getJobsRevision).toHaveBeenCalledTimes(2);
});
it("does not publish terminal on reload when status and SSE report the same completed run", async () => {
vi.mocked(api.getPipelineStatus).mockResolvedValue({
isRunning: false,
lastRun: {
id: "run-1",
status: "completed",
startedAt: "2026-01-01T00:00:00.000Z",
completedAt: "2026-01-01T00:05:00.000Z",
errorMessage: null,
},
} as any);
const { result } = renderHook(() => useOrchestratorData(null));
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toBeNull();
const sse = MockEventSource.instances[0];
act(() => {
sse.emitOpen();
sse.emitMessage({
step: "completed",
startedAt: "2026-01-01T00:00:00.000Z",
completedAt: "2026-01-01T00:05:00.000Z",
});
});
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toBeNull();
});
it("publishes one terminal event when active SSE transitions to completed", async () => {
const { result } = renderHook(() => useOrchestratorData(null));
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toBeNull();
const sse = MockEventSource.instances[0];
act(() => {
sse.emitOpen();
sse.emitMessage({ step: "crawling" });
sse.emitMessage({
step: "completed",
startedAt: "2026-02-01T10:00:00.000Z",
completedAt: "2026-02-01T10:05:00.000Z",
});
});
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toEqual({
status: "completed",
errorMessage: null,
token: 1,
});
act(() => {
sse.emitMessage({
step: "completed",
startedAt: "2026-02-01T10:00:00.000Z",
completedAt: "2026-02-01T10:05:00.000Z",
});
});
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toEqual({
status: "completed",
errorMessage: null,
token: 1,
});
});
it("publishes one terminal event when polling observes running then completed", async () => {
vi.useFakeTimers();
vi.mocked(api.getPipelineStatus)
.mockResolvedValueOnce({
isRunning: true,
lastRun: null,
} as any)
.mockResolvedValueOnce({
isRunning: false,
lastRun: {
id: "run-polling",
status: "completed",
startedAt: "2026-02-01T11:00:00.000Z",
completedAt: "2026-02-01T11:05:00.000Z",
errorMessage: null,
},
} as any);
const { result } = renderHook(() => useOrchestratorData(null));
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toBeNull();
const sse = MockEventSource.instances[0];
act(() => {
sse.emitOpen();
sse.emitError();
});
await act(async () => {
vi.advanceTimersByTime(30000);
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toEqual({
status: "completed",
errorMessage: null,
token: 1,
});
});
it("dedupes the same terminal run reported by status and SSE", async () => {
vi.useFakeTimers();
vi.mocked(api.getPipelineStatus)
.mockResolvedValueOnce({
isRunning: true,
lastRun: null,
} as any)
.mockResolvedValueOnce({
isRunning: false,
lastRun: {
id: "run-dedupe",
status: "completed",
startedAt: "2026-02-01T12:00:00.000Z",
completedAt: "2026-02-01T12:05:00.000Z",
errorMessage: null,
},
} as any);
const { result } = renderHook(() => useOrchestratorData(null));
await act(async () => {
await Promise.resolve();
});
const sse = MockEventSource.instances[0];
act(() => {
sse.emitOpen();
sse.emitError();
});
await act(async () => {
vi.advanceTimersByTime(30000);
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toEqual({
status: "completed",
errorMessage: null,
token: 1,
});
act(() => {
sse.emitMessage({
step: "completed",
startedAt: "2026-02-01T12:00:00.000Z",
completedAt: "2026-02-01T12:05:00.000Z",
});
});
await act(async () => {
await Promise.resolve();
});
expect(result.current.pipelineTerminalEvent).toEqual({
status: "completed",
errorMessage: null,
token: 1,
});
});
it("forces a jobs reload on terminal pipeline SSE step", async () => {
vi.useFakeTimers();
vi.mocked(api.getJobs)

View File

@ -40,6 +40,12 @@ type PipelineTerminalEvent = {
token: number;
};
type PipelineTerminalSnapshot = {
status: PipelineTerminalStatus;
errorMessage: string | null;
signature: string;
};
const ACTIVE_PIPELINE_STEPS: ReadonlySet<PipelineProgressStep> = new Set([
"crawling",
"importing",
@ -53,6 +59,23 @@ const TERMINAL_PIPELINE_STEPS: ReadonlySet<PipelineProgressStep> = new Set([
"failed",
]);
const buildTerminalSignature = ({
status,
startedAt,
completedAt,
runId,
}: {
status: PipelineTerminalStatus;
startedAt?: string | null;
completedAt?: string | null;
runId?: string | null;
}) => {
if (startedAt || completedAt) {
return `${status}:${startedAt ?? ""}:${completedAt ?? ""}`;
}
return `${status}:run:${runId ?? "unknown"}`;
};
export const useOrchestratorData = (selectedJobId: string | null) => {
const [jobListItems, setJobListItems] = useState<JobListItem[]>([]);
const [selectedJob, setSelectedJob] = useState<Job | null>(null);
@ -70,18 +93,14 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
const selectedJobCacheRef = useRef<Map<string, Job>>(new Map());
const lastRevisionRef = useRef<string | null>(null);
const lastSseRefreshAtRef = useRef(0);
const hasHydratedPipelineStateRef = useRef(false);
const seenRunningThisSessionRef = useRef(false);
const baselineTerminalSignatureRef = useRef<string | null>(null);
const lastTerminalSignatureRef = useRef<string | null>(null);
const lastTerminalNotificationKeyRef = useRef<string | null>(null);
const terminalEventTokenRef = useRef(0);
const publishPipelineTerminal = useCallback(
(
status: PipelineTerminalStatus,
errorMessage: string | null,
dedupeKey: string,
) => {
if (dedupeKey === lastTerminalNotificationKeyRef.current) return;
lastTerminalNotificationKeyRef.current = dedupeKey;
(status: PipelineTerminalStatus, errorMessage: string | null) => {
terminalEventTokenRef.current += 1;
setPipelineTerminalEvent({
status,
@ -92,6 +111,55 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
[],
);
const observePipelineState = useCallback(
(snapshot: {
isRunning: boolean;
terminal: PipelineTerminalSnapshot | null;
}) => {
setIsPipelineRunning(snapshot.isRunning);
if (snapshot.isRunning) {
seenRunningThisSessionRef.current = true;
}
if (!snapshot.terminal) {
if (!hasHydratedPipelineStateRef.current) {
hasHydratedPipelineStateRef.current = true;
}
return;
}
const signature = snapshot.terminal.signature;
const isFirstPipelineObservation = !hasHydratedPipelineStateRef.current;
if (isFirstPipelineObservation) {
hasHydratedPipelineStateRef.current = true;
baselineTerminalSignatureRef.current = signature;
lastTerminalSignatureRef.current = signature;
return;
}
if (signature === lastTerminalSignatureRef.current) {
return;
}
lastTerminalSignatureRef.current = signature;
if (!seenRunningThisSessionRef.current) {
return;
}
if (signature === baselineTerminalSignatureRef.current) {
return;
}
seenRunningThisSessionRef.current = false;
publishPipelineTerminal(
snapshot.terminal.status,
snapshot.terminal.errorMessage,
);
},
[publishPipelineTerminal],
);
const loadSelectedJob = useCallback(
async (jobId: string) => {
const seq = ++selectedJobRequestSeqRef.current;
@ -145,24 +213,39 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
const checkPipelineStatus = useCallback(async () => {
try {
const status = await api.getPipelineStatus();
setIsPipelineRunning(status.isRunning);
const terminalStatus = status.lastRun?.status;
if (status.isRunning) {
observePipelineState({ isRunning: true, terminal: null });
return;
}
if (
status.isRunning ||
!terminalStatus ||
!TERMINAL_PIPELINE_STEPS.has(terminalStatus as PipelineProgressStep)
) {
observePipelineState({ isRunning: false, terminal: null });
return;
}
publishPipelineTerminal(
terminalStatus as PipelineTerminalStatus,
status.lastRun?.errorMessage ?? null,
`status:${status.lastRun?.id ?? "unknown"}:${terminalStatus}:${status.lastRun?.completedAt ?? ""}`,
);
const terminal = terminalStatus as PipelineTerminalStatus;
observePipelineState({
isRunning: false,
terminal: {
status: terminal,
errorMessage: status.lastRun?.errorMessage ?? null,
signature: buildTerminalSignature({
status: terminal,
startedAt: status.lastRun?.startedAt ?? null,
completedAt: status.lastRun?.completedAt ?? null,
runId: status.lastRun?.id ?? null,
}),
},
});
} catch {
// Ignore errors
}
}, [publishPipelineTerminal]);
}, [observePipelineState]);
const checkForJobChanges = useCallback(async () => {
if (isRefreshPaused || !isDocumentVisible()) return;
@ -186,6 +269,11 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
void checkPipelineStatus();
}, [checkPipelineStatus, loadJobs]);
useEffect(() => {
if (!isPipelineRunning) return;
seenRunningThisSessionRef.current = true;
}, [isPipelineRunning]);
useEffect(() => {
const interval = setInterval(() => {
if (!isDocumentVisible() || isRefreshPaused) return;
@ -257,9 +345,14 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
}
const typedStep = step as PipelineProgressStep;
setIsPipelineRunning(ACTIVE_PIPELINE_STEPS.has(typedStep));
const isActiveStep = ACTIVE_PIPELINE_STEPS.has(typedStep);
if (isActiveStep) {
observePipelineState({ isRunning: true, terminal: null });
} else if (typedStep === "idle") {
observePipelineState({ isRunning: false, terminal: null });
}
if (ACTIVE_PIPELINE_STEPS.has(typedStep)) {
if (isActiveStep) {
const now = Date.now();
if (now - lastSseRefreshAtRef.current >= 2500) {
lastSseRefreshAtRef.current = now;
@ -270,16 +363,19 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
if (TERMINAL_PIPELINE_STEPS.has(typedStep)) {
const eventPayload = payload as PipelineProgressEvent;
const terminalSignature = `${typedStep}:${eventPayload.startedAt ?? ""}:${
eventPayload.completedAt ?? ""
}`;
if (terminalSignature === lastTerminalSignatureRef.current) return;
lastTerminalSignatureRef.current = terminalSignature;
publishPipelineTerminal(
typedStep as PipelineTerminalStatus,
eventPayload.error ?? null,
`sse:${terminalSignature}`,
);
const terminal = typedStep as PipelineTerminalStatus;
observePipelineState({
isRunning: false,
terminal: {
status: terminal,
errorMessage: eventPayload.error ?? null,
signature: buildTerminalSignature({
status: terminal,
startedAt: eventPayload.startedAt,
completedAt: eventPayload.completedAt,
}),
},
});
void loadJobs();
}
};
@ -291,7 +387,7 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
return () => {
eventSource.close();
};
}, [checkForJobChanges, loadJobs, publishPipelineTerminal]);
}, [checkForJobChanges, loadJobs, observePipelineState]);
useEffect(() => {
if (isPipelineSseConnected) return;