From 032626bd7d1a4847674da099abb12eb8ac91e28f Mon Sep 17 00:00:00 2001 From: Shaheer Sarfaraz <53654735+DaKheera47@users.noreply.github.com> Date: Wed, 18 Feb 2026 15:54:39 +0000 Subject: [PATCH] Fix #162: real-time bulk action streaming progress (#187) * initial * refactor: centralize SSE plumbing for client and server * docs: add centralized SSE usage standards to agents guide * use sse to stream actions to the client * ui: align bulk progress toast with default sonner style * ui: remove hide action from bulk progress toast * full width progress bar * fix(stream): track client disconnect and writability * fix(stream): stop bulk loop when SSE client disconnects * fix(stream): avoid writing error/end to closed SSE response * fix(stream): gate started/progress frames on writable SSE socket * types(api): narrow SSE stream payload input contract * refactor(ui): share clamp helper for bulk progress * fix(stream): add heartbeat to bulk action SSE route * feat(stream): include completed count in bulk completion event * fix(client-sse): separate parse vs handler errors and cancel reader --- AGENTS.md | 8 + .../src/client/api/client.stream.test.ts | 43 +++++ orchestrator/src/client/api/client.ts | 75 +++++--- .../client/components/PipelineProgress.tsx | 35 ++-- orchestrator/src/client/lib/sse.ts | 32 ++++ .../orchestrator/BulkActionProgressToast.tsx | 29 +++ .../orchestrator/useBulkJobSelection.test.ts | 137 +++++++++++--- ...obSelection.ts => useBulkJobSelection.tsx} | 117 +++++++++++- .../pages/orchestrator/useOrchestratorData.ts | 119 ++++++------ .../src/client/pages/orchestrator/utils.ts | 3 + orchestrator/src/components/ui/sonner.tsx | 2 + .../src/server/api/routes/ghostwriter.ts | 91 +++++---- .../src/server/api/routes/jobs.test.ts | 89 +++++++++ orchestrator/src/server/api/routes/jobs.ts | 174 ++++++++++++++++++ .../src/server/api/routes/pipeline.ts | 15 +- orchestrator/src/server/infra/sse.ts | 45 +++++ shared/src/types.ts | 37 ++++ 17 files changed, 854 insertions(+), 197 deletions(-) create mode 100644 orchestrator/src/client/api/client.stream.test.ts create mode 100644 orchestrator/src/client/lib/sse.ts create mode 100644 orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx rename orchestrator/src/client/pages/orchestrator/{useBulkJobSelection.ts => useBulkJobSelection.tsx} (57%) create mode 100644 orchestrator/src/server/infra/sse.ts diff --git a/AGENTS.md b/AGENTS.md index b15b646..5082cfb 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -36,6 +36,14 @@ Use consistent status/code mapping: - Log structured objects, not free-form dumps. - Include useful context fields (e.g. `requestId`, `pipelineRunId`, `jobId`, `route`, `status`). +## SSE Standards + +- Use centralized SSE helpers by default. +- Server: use `orchestrator/src/server/infra/sse.ts` for setup, data writes, comments, and heartbeats. +- Client (`EventSource`): use `orchestrator/src/client/lib/sse.ts` for subscription/open/message/error plumbing. +- Do not duplicate raw SSE setup (`Content-Type`, `Connection`, heartbeat loops, or ad-hoc `JSON.parse` event parsing) when these helpers apply. +- Keep feature payload types domain-local (pipeline, ghostwriter, bulk actions), but reuse shared transport plumbing. + ## Redaction and Sanitization - Always sanitize objects before logging or returning in error `details`. diff --git a/orchestrator/src/client/api/client.stream.test.ts b/orchestrator/src/client/api/client.stream.test.ts new file mode 100644 index 0000000..f03f354 --- /dev/null +++ b/orchestrator/src/client/api/client.stream.test.ts @@ -0,0 +1,43 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import * as api from "./client"; + +describe("API client SSE streaming", () => { + beforeEach(() => { + vi.restoreAllMocks(); + api.__resetApiClientAuthForTests(); + }); + + it("propagates handler errors and cancels the stream reader", async () => { + const encoder = new TextEncoder(); + const cancelSpy = vi.fn(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode('data: {"type":"started","requestId":"req-1"}\n\n'), + ); + }, + cancel() { + cancelSpy(); + }, + }); + + vi.spyOn(global, "fetch").mockResolvedValue({ + ok: true, + status: 200, + body: stream, + } as Response); + + await expect( + api.streamBulkJobAction( + { action: "skip", jobIds: ["job-1"] }, + { + onEvent: () => { + throw new Error("handler exploded"); + }, + }, + ), + ).rejects.toThrow("handler exploded"); + + expect(cancelSpy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/orchestrator/src/client/api/client.ts b/orchestrator/src/client/api/client.ts index 049042e..a66854d 100644 --- a/orchestrator/src/client/api/client.ts +++ b/orchestrator/src/client/api/client.ts @@ -11,6 +11,7 @@ import type { BackupInfo, BulkJobActionRequest, BulkJobActionResponse, + BulkJobActionStreamEvent, BulkPostApplicationAction, BulkPostApplicationActionResponse, DemoInfoResponse, @@ -79,6 +80,11 @@ type LegacyApiResponse = details?: unknown; }; +type StreamSseInput = + | BulkJobActionRequest + | { content: string; stream: true } + | { stream: true }; + export type BasicAuthCredentials = { username: string; password: string; @@ -393,11 +399,11 @@ export async function updateJob( }); } -async function streamSseEvents( +async function streamSseEvents( endpoint: string, - input: Record, + input: StreamSseInput, handlers: { - onEvent: (event: JobChatStreamEvent) => void; + onEvent: (event: TEvent) => void; signal?: AbortSignal; }, ): Promise { @@ -439,29 +445,40 @@ async function streamSseEvents( const reader = response.body.getReader(); let buffer = ""; - while (true) { - const { done, value } = await reader.read(); - if (done) break; - buffer += decoder.decode(value, { stream: true }); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); - let separatorIndex = buffer.indexOf("\n\n"); - while (separatorIndex !== -1) { - const frame = buffer.slice(0, separatorIndex); - buffer = buffer.slice(separatorIndex + 2); - const dataLines = frame - .split("\n") - .filter((line) => line.startsWith("data:")) - .map((line) => line.slice(5).trim()) - .filter(Boolean); + let separatorIndex = buffer.indexOf("\n\n"); + while (separatorIndex !== -1) { + const frame = buffer.slice(0, separatorIndex); + buffer = buffer.slice(separatorIndex + 2); + const dataLines = frame + .split("\n") + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trim()) + .filter(Boolean); - for (const line of dataLines) { - try { - handlers.onEvent(JSON.parse(line) as JobChatStreamEvent); - } catch { - // Ignore malformed events to keep stream resilient + for (const line of dataLines) { + let parsedEvent: TEvent; + try { + parsedEvent = JSON.parse(line) as TEvent; + } catch { + // Ignore malformed events to keep stream resilient + continue; + } + handlers.onEvent(parsedEvent); } + separatorIndex = buffer.indexOf("\n\n"); } - separatorIndex = buffer.indexOf("\n\n"); + } + } finally { + try { + await reader.cancel(); + } catch { + // Ignore cancellation errors when stream is already closed } } } @@ -710,6 +727,20 @@ export async function bulkJobAction( }); } +export async function streamBulkJobAction( + input: BulkJobActionRequest, + handlers: { + onEvent: (event: BulkJobActionStreamEvent) => void; + signal?: AbortSignal; + }, +): Promise { + return streamSseEvents( + "/jobs/bulk-actions/stream", + input, + handlers, + ); +} + export async function getJobStageEvents(id: string): Promise { return fetchApi(`/jobs/${id}/events?t=${Date.now()}`); } diff --git a/orchestrator/src/client/components/PipelineProgress.tsx b/orchestrator/src/client/components/PipelineProgress.tsx index 1c1f40e..42198e2 100644 --- a/orchestrator/src/client/components/PipelineProgress.tsx +++ b/orchestrator/src/client/components/PipelineProgress.tsx @@ -5,7 +5,7 @@ import { Loader2 } from "lucide-react"; import type React from "react"; import { useEffect, useMemo, useState } from "react"; - +import { subscribeToEventSource } from "@/client/lib/sse"; import { Badge } from "@/components/ui/badge"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Progress } from "@/components/ui/progress"; @@ -161,26 +161,23 @@ export const PipelineProgress: React.FC = ({ return; } - const eventSource = new EventSource("/api/pipeline/progress"); - - eventSource.onopen = () => { - setIsConnected(true); - }; - - eventSource.onmessage = (event) => { - try { - setProgress(JSON.parse(event.data)); - } catch { - // Ignore parse errors - } - }; - - eventSource.onerror = () => { - setIsConnected(false); - }; + const unsubscribe = subscribeToEventSource( + "/api/pipeline/progress", + { + onOpen: () => { + setIsConnected(true); + }, + onMessage: (payload) => { + setProgress(payload); + }, + onError: () => { + setIsConnected(false); + }, + }, + ); return () => { - eventSource.close(); + unsubscribe(); setIsConnected(false); }; }, [isRunning]); diff --git a/orchestrator/src/client/lib/sse.ts b/orchestrator/src/client/lib/sse.ts new file mode 100644 index 0000000..27b50c9 --- /dev/null +++ b/orchestrator/src/client/lib/sse.ts @@ -0,0 +1,32 @@ +interface EventSourceSubscriptionHandlers { + onOpen?: () => void; + onMessage: (payload: T) => void; + onError?: () => void; +} + +export function subscribeToEventSource( + url: string, + handlers: EventSourceSubscriptionHandlers, +): () => void { + const eventSource = new EventSource(url); + + eventSource.onopen = () => { + handlers.onOpen?.(); + }; + + eventSource.onmessage = (event) => { + try { + handlers.onMessage(JSON.parse(event.data) as T); + } catch { + // Ignore malformed events to keep stream resilient. + } + }; + + eventSource.onerror = () => { + handlers.onError?.(); + }; + + return () => { + eventSource.close(); + }; +} diff --git a/orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx b/orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx new file mode 100644 index 0000000..23f3426 --- /dev/null +++ b/orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx @@ -0,0 +1,29 @@ +import { Progress } from "@/components/ui/progress"; +import { clampNumber } from "./utils"; + +interface BulkActionProgressToastProps { + completed: number; + requested: number; + succeeded: number; + failed: number; +} + +export function BulkActionProgressToast({ + completed, + requested, + succeeded, + failed, +}: BulkActionProgressToastProps) { + const safeRequested = Math.max(requested, 1); + const safeCompleted = clampNumber(completed, 0, safeRequested); + const progressValue = Math.round((safeCompleted / safeRequested) * 100); + + return ( +
+ +

+ {succeeded} succeeded, {failed} failed +

+
+ ); +} diff --git a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts b/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts index ec43a22..a845ebd 100644 --- a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts +++ b/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts @@ -1,5 +1,8 @@ import { createJob } from "@shared/testing/factories.js"; -import type { BulkJobActionResponse } from "@shared/types.js"; +import type { + BulkJobActionResponse, + BulkJobActionStreamEvent, +} from "@shared/types.js"; import { act, renderHook, waitFor } from "@testing-library/react"; import { toast } from "sonner"; import { beforeEach, describe, expect, it, vi } from "vitest"; @@ -7,11 +10,13 @@ import * as api from "../../api"; import { useBulkJobSelection } from "./useBulkJobSelection"; vi.mock("../../api", () => ({ - bulkJobAction: vi.fn(), + streamBulkJobAction: vi.fn(), })); vi.mock("sonner", () => ({ toast: { + loading: vi.fn(), + dismiss: vi.fn(), error: vi.fn(), success: vi.fn(), }, @@ -30,9 +35,75 @@ const deferred = (): Deferred => { return { promise, resolve }; }; +const asStreamEvents = ( + response: BulkJobActionResponse, + requestId = "req-bulk", +): BulkJobActionStreamEvent[] => { + const events: BulkJobActionStreamEvent[] = [ + { + type: "started", + action: response.action, + requested: response.requested, + completed: 0, + succeeded: 0, + failed: 0, + requestId, + }, + ]; + + let succeeded = 0; + let failed = 0; + response.results.forEach((result, index) => { + if (result.ok) succeeded += 1; + else failed += 1; + events.push({ + type: "progress", + action: response.action, + requested: response.requested, + completed: index + 1, + succeeded, + failed, + result, + requestId, + }); + }); + + events.push({ + type: "completed", + action: response.action, + requested: response.requested, + completed: response.requested, + succeeded: response.succeeded, + failed: response.failed, + results: response.results, + requestId, + }); + + return events; +}; + +const mockStreamBulkAction = ( + response: BulkJobActionResponse, + waitForRelease?: Promise, +) => { + vi.mocked(api.streamBulkJobAction).mockImplementation( + async (_input, handlers) => { + for (const event of asStreamEvents(response)) { + if (event.type === "started") handlers.onEvent(event); + } + if (waitForRelease) await waitForRelease; + for (const event of asStreamEvents(response)) { + if (event.type !== "started") handlers.onEvent(event); + } + return; + }, + ); +}; + describe("useBulkJobSelection", () => { beforeEach(() => { vi.clearAllMocks(); + vi.mocked(toast.loading).mockReturnValue("bulk-progress-toast"); }); it("caps select-all to the API max", () => { @@ -78,7 +149,7 @@ describe("useBulkJobSelection", () => { await result.current.runBulkAction("skip"); }); - expect(api.bulkJobAction).not.toHaveBeenCalled(); + expect(api.streamBulkJobAction).not.toHaveBeenCalled(); }); it("reconciles failures with selection changes made during in-flight action", async () => { @@ -88,8 +159,28 @@ describe("useBulkJobSelection", () => { createJob({ id: "job-3", status: "discovered" }), ]; const loadJobs = vi.fn().mockResolvedValue(undefined); - const pending = deferred(); - vi.mocked(api.bulkJobAction).mockImplementation(() => pending.promise); + const release = deferred(); + mockStreamBulkAction( + { + action: "skip", + requested: 2, + succeeded: 1, + failed: 1, + results: [ + { + jobId: "job-1", + ok: true, + job: createJob({ id: "job-1", status: "skipped" }), + }, + { + jobId: "job-2", + ok: false, + error: { code: "INVALID_REQUEST", message: "bad status" }, + }, + ], + }, + release.promise, + ); const { result } = renderHook(() => useBulkJobSelection({ @@ -109,36 +200,24 @@ describe("useBulkJobSelection", () => { runPromise = result.current.runBulkAction("skip"); }); + expect(toast.loading).toHaveBeenCalled(); + const firstLoadingCall = vi.mocked(toast.loading).mock.calls[0]; + expect(firstLoadingCall[1]).not.toHaveProperty("cancel"); + act(() => { result.current.toggleSelectJob("job-2"); result.current.toggleSelectJob("job-3"); }); await act(async () => { - pending.resolve({ - action: "skip", - requested: 2, - succeeded: 1, - failed: 1, - results: [ - { - jobId: "job-1", - ok: true, - job: createJob({ id: "job-1", status: "skipped" }), - }, - { - jobId: "job-2", - ok: false, - error: { code: "INVALID_REQUEST", message: "bad status" }, - }, - ], - }); + release.resolve(); await runPromise; }); await waitFor(() => { expect(Array.from(result.current.selectedJobIds)).toEqual(["job-3"]); }); + expect(toast.dismiss).toHaveBeenCalled(); }); it("runs bulk rescore and reports success copy", async () => { @@ -147,7 +226,7 @@ describe("useBulkJobSelection", () => { createJob({ id: "job-2", status: "ready" }), ]; const loadJobs = vi.fn().mockResolvedValue(undefined); - vi.mocked(api.bulkJobAction).mockResolvedValue({ + mockStreamBulkAction({ action: "rescore", requested: 2, succeeded: 2, @@ -183,10 +262,12 @@ describe("useBulkJobSelection", () => { await result.current.runBulkAction("rescore"); }); - expect(api.bulkJobAction).toHaveBeenCalledWith({ - action: "rescore", - jobIds: ["job-1", "job-2"], - }); + expect(api.streamBulkJobAction).toHaveBeenCalledWith( + { action: "rescore", jobIds: ["job-1", "job-2"] }, + expect.objectContaining({ + onEvent: expect.any(Function), + }), + ); expect(toast.success).toHaveBeenCalledWith("2 matches recalculated"); }); }); diff --git a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts b/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.tsx similarity index 57% rename from orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts rename to orchestrator/src/client/pages/orchestrator/useBulkJobSelection.tsx index 42a2401..c3d9894 100644 --- a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts +++ b/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.tsx @@ -1,7 +1,12 @@ -import type { BulkJobAction, JobListItem } from "@shared/types.js"; +import type { + BulkJobAction, + BulkJobActionResponse, + JobListItem, +} from "@shared/types.js"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import * as api from "../../api"; +import { BulkActionProgressToast } from "./BulkActionProgressToast"; import { canBulkMoveToReady, canBulkRescore, @@ -9,9 +14,16 @@ import { getFailedJobIds, } from "./bulkActions"; import type { FilterTab } from "./constants"; +import { clampNumber } from "./utils"; const MAX_BULK_ACTION_JOB_IDS = 100; +const bulkActionLabel: Record = { + move_to_ready: "Moving jobs to Ready...", + skip: "Skipping selected jobs...", + rescore: "Calculating match scores...", +}; + interface UseBulkJobSelectionArgs { activeJobs: JobListItem[]; activeTab: FilterTab; @@ -110,17 +122,105 @@ export function useBulkJobSelection({ } const selectedAtStartSet = new Set(selectedAtStart); + let progressToastId: string | number | undefined; + let finalResult: BulkJobActionResponse | null = null; + let streamError: string | null = null; + let latestProgress = { + requested: selectedAtStart.length, + completed: 0, + succeeded: 0, + failed: 0, + }; + + const getProgressTitle = () => { + const safeRequested = Math.max(latestProgress.requested, 1); + const safeCompleted = clampNumber( + latestProgress.completed, + 0, + safeRequested, + ); + return `${safeCompleted}/${safeRequested} ${bulkActionLabel[action]}`; + }; + + const upsertProgressToast = () => { + progressToastId = toast.loading(getProgressTitle(), { + description: ( + + ), + ...(progressToastId !== undefined ? { id: progressToastId } : {}), + duration: Number.POSITIVE_INFINITY, + }); + }; + try { setBulkActionInFlight(action); - if (action === "move_to_ready") { - toast.message("Moving jobs to Ready..."); + upsertProgressToast(); + await api.streamBulkJobAction( + { + action, + jobIds: selectedAtStart, + }, + { + onEvent: (event) => { + if (event.type === "error") { + streamError = event.message || "Failed to run bulk action"; + return; + } + + if (event.type === "started") { + latestProgress = { + requested: event.requested, + completed: event.completed, + succeeded: event.succeeded, + failed: event.failed, + }; + upsertProgressToast(); + return; + } + + if (event.type === "progress") { + latestProgress = { + requested: event.requested, + completed: event.completed, + succeeded: event.succeeded, + failed: event.failed, + }; + upsertProgressToast(); + return; + } + + latestProgress = { + requested: event.requested, + completed: event.completed, + succeeded: event.succeeded, + failed: event.failed, + }; + finalResult = { + action: event.action, + requested: event.requested, + succeeded: event.succeeded, + failed: event.failed, + results: event.results, + }; + upsertProgressToast(); + }, + }, + ); + + if (streamError) { + throw new Error(streamError); } - const result = await api.bulkJobAction({ - action, - jobIds: selectedAtStart, - }); + if (!finalResult) { + throw new Error("Bulk action stream ended before completion"); + } + const result = finalResult as BulkJobActionResponse; const failedIds = getFailedJobIds(result); const successLabel = action === "skip" @@ -157,6 +257,9 @@ export function useBulkJobSelection({ error instanceof Error ? error.message : "Failed to run bulk action"; toast.error(message); } finally { + if (progressToastId !== undefined) { + toast.dismiss(progressToastId); + } setBulkActionInFlight(null); } }, diff --git a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts index 02b8003..5e0de52 100644 --- a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts +++ b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts @@ -2,6 +2,7 @@ import type { Job, JobListItem, JobStatus } from "@shared/types"; import { useCallback, useEffect, useRef, useState } from "react"; import { toast } from "sonner"; import * as api from "../../api"; +import { subscribeToEventSource } from "../../lib/sse"; const initialStats: Record = { discovered: 0, @@ -320,73 +321,67 @@ export const useOrchestratorData = (selectedJobId: string | null) => { useEffect(() => { if (typeof EventSource === "undefined") return; - const eventSource = new EventSource("/api/pipeline/progress"); + const unsubscribe = subscribeToEventSource( + "/api/pipeline/progress", + { + onOpen: () => { + setIsPipelineSseConnected(true); + }, + onMessage: (payload) => { + if (!payload || typeof payload !== "object") return; + const step = (payload as { step?: unknown }).step; + if (typeof step !== "string") return; + if ( + !ACTIVE_PIPELINE_STEPS.has(step as PipelineProgressStep) && + !TERMINAL_PIPELINE_STEPS.has(step as PipelineProgressStep) && + step !== "idle" + ) { + return; + } - eventSource.onopen = () => { - setIsPipelineSseConnected(true); - }; + const typedStep = step as PipelineProgressStep; + const isActiveStep = ACTIVE_PIPELINE_STEPS.has(typedStep); + if (isActiveStep) { + observePipelineState({ isRunning: true, terminal: null }); + } else if (typedStep === "idle") { + observePipelineState({ isRunning: false, terminal: null }); + } - eventSource.onmessage = (event) => { - let payload: unknown; - try { - payload = JSON.parse(event.data); - } catch { - return; - } + if (isActiveStep) { + const now = Date.now(); + if (now - lastSseRefreshAtRef.current >= 2500) { + lastSseRefreshAtRef.current = now; + void checkForJobChanges(); + } + return; + } - if (!payload || typeof payload !== "object") return; - const step = (payload as { step?: unknown }).step; - if (typeof step !== "string") return; - if ( - !ACTIVE_PIPELINE_STEPS.has(step as PipelineProgressStep) && - !TERMINAL_PIPELINE_STEPS.has(step as PipelineProgressStep) && - step !== "idle" - ) { - return; - } - - const typedStep = step as PipelineProgressStep; - const isActiveStep = ACTIVE_PIPELINE_STEPS.has(typedStep); - if (isActiveStep) { - observePipelineState({ isRunning: true, terminal: null }); - } else if (typedStep === "idle") { - observePipelineState({ isRunning: false, terminal: null }); - } - - if (isActiveStep) { - const now = Date.now(); - if (now - lastSseRefreshAtRef.current >= 2500) { - lastSseRefreshAtRef.current = now; - void checkForJobChanges(); - } - return; - } - - if (TERMINAL_PIPELINE_STEPS.has(typedStep)) { - const eventPayload = payload as PipelineProgressEvent; - const terminal = typedStep as PipelineTerminalStatus; - observePipelineState({ - isRunning: false, - terminal: { - status: terminal, - errorMessage: eventPayload.error ?? null, - signature: buildTerminalSignature({ - status: terminal, - startedAt: eventPayload.startedAt, - completedAt: eventPayload.completedAt, - }), - }, - }); - void loadJobs(); - } - }; - - eventSource.onerror = () => { - setIsPipelineSseConnected(false); - }; + if (TERMINAL_PIPELINE_STEPS.has(typedStep)) { + const eventPayload = payload as PipelineProgressEvent; + const terminal = typedStep as PipelineTerminalStatus; + observePipelineState({ + isRunning: false, + terminal: { + status: terminal, + errorMessage: eventPayload.error ?? null, + signature: buildTerminalSignature({ + status: terminal, + startedAt: eventPayload.startedAt, + completedAt: eventPayload.completedAt, + }), + }, + }); + void loadJobs(); + } + }, + onError: () => { + setIsPipelineSseConnected(false); + }, + }, + ); return () => { - eventSource.close(); + unsubscribe(); }; }, [checkForJobChanges, loadJobs, observePipelineState]); diff --git a/orchestrator/src/client/pages/orchestrator/utils.ts b/orchestrator/src/client/pages/orchestrator/utils.ts index ebb33b2..3c89286 100644 --- a/orchestrator/src/client/pages/orchestrator/utils.ts +++ b/orchestrator/src/client/pages/orchestrator/utils.ts @@ -16,6 +16,9 @@ const compareString = (a: string, b: string) => a.localeCompare(b, undefined, { sensitivity: "base" }); const compareNumber = (a: number, b: number) => a - b; +export const clampNumber = (value: number, min: number, max: number) => + Math.max(min, Math.min(max, value)); + export const parseSalaryBounds = ( job: JobListItem, ): { min: number; max: number } | null => { diff --git a/orchestrator/src/components/ui/sonner.tsx b/orchestrator/src/components/ui/sonner.tsx index e86d48d..c61585b 100644 --- a/orchestrator/src/components/ui/sonner.tsx +++ b/orchestrator/src/components/ui/sonner.tsx @@ -11,6 +11,8 @@ const Toaster = ({ ...props }: ToasterProps) => { classNames: { toast: "group toast group-[.toaster]:bg-background group-[.toaster]:text-foreground group-[.toaster]:border-border group-[.toaster]:shadow-lg", + content: + "group-[.toast]:w-full group-[.toast]:flex-1 group-[.toast]:min-w-0", description: "group-[.toast]:text-muted-foreground", actionButton: "group-[.toast]:bg-primary group-[.toast]:text-primary-foreground", diff --git a/orchestrator/src/server/api/routes/ghostwriter.ts b/orchestrator/src/server/api/routes/ghostwriter.ts index 5f0b235..9d6ba42 100644 --- a/orchestrator/src/server/api/routes/ghostwriter.ts +++ b/orchestrator/src/server/api/routes/ghostwriter.ts @@ -1,7 +1,8 @@ import { asyncRoute, fail, ok } from "@infra/http"; import { runWithRequestContext } from "@infra/request-context"; +import { setupSse, writeSseData } from "@infra/sse"; import { badRequest, toAppError } from "@server/infra/errors"; -import { type Request, type Response, Router } from "express"; +import { type Request, Router } from "express"; import { z } from "zod"; import * as ghostwriterService from "../../services/ghostwriter"; @@ -33,10 +34,6 @@ function getJobId(req: Request): string { return jobId; } -function writeSse(res: Response, event: unknown): void { - res.write(`data: ${JSON.stringify(event)}\n\n`); -} - ghostwriterRouter.get( "/messages", asyncRoute(async (req, res) => { @@ -75,11 +72,10 @@ ghostwriterRouter.post( await runWithRequestContext({ jobId }, async () => { if (parsed.data.stream) { - res.status(200); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache, no-transform"); - res.setHeader("Connection", "keep-alive"); - res.flushHeaders?.(); + setupSse(res, { + cacheControl: "no-cache, no-transform", + flushHeaders: true, + }); try { await ghostwriterService.sendMessageForJob({ @@ -87,7 +83,7 @@ ghostwriterRouter.post( content: parsed.data.content, stream: { onReady: ({ runId, threadId, messageId, requestId }) => - writeSse(res, { + writeSseData(res, { type: "ready", runId, threadId, @@ -95,26 +91,26 @@ ghostwriterRouter.post( requestId, }), onDelta: ({ runId, messageId, delta }) => - writeSse(res, { + writeSseData(res, { type: "delta", runId, messageId, delta, }), onCompleted: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "completed", runId, message, }), onCancelled: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "cancelled", runId, message, }), onError: ({ runId, code, message, requestId }) => - writeSse(res, { + writeSseData(res, { type: "error", runId, code, @@ -125,7 +121,7 @@ ghostwriterRouter.post( }); } catch (error) { const appError = toAppError(error); - writeSse(res, { + writeSseData(res, { type: "error", code: appError.code, message: appError.message, @@ -191,11 +187,10 @@ ghostwriterRouter.post( await runWithRequestContext({ jobId }, async () => { if (parsed.data.stream) { - res.status(200); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache, no-transform"); - res.setHeader("Connection", "keep-alive"); - res.flushHeaders?.(); + setupSse(res, { + cacheControl: "no-cache, no-transform", + flushHeaders: true, + }); try { await ghostwriterService.regenerateMessageForJob({ @@ -203,7 +198,7 @@ ghostwriterRouter.post( assistantMessageId, stream: { onReady: ({ runId, threadId, messageId, requestId }) => - writeSse(res, { + writeSseData(res, { type: "ready", runId, threadId, @@ -211,26 +206,26 @@ ghostwriterRouter.post( requestId, }), onDelta: ({ runId, messageId, delta }) => - writeSse(res, { + writeSseData(res, { type: "delta", runId, messageId, delta, }), onCompleted: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "completed", runId, message, }), onCancelled: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "cancelled", runId, message, }), onError: ({ runId, code, message, requestId }) => - writeSse(res, { + writeSseData(res, { type: "error", runId, code, @@ -241,7 +236,7 @@ ghostwriterRouter.post( }); } catch (error) { const appError = toAppError(error); - writeSse(res, { + writeSseData(res, { type: "error", code: appError.code, message: appError.message, @@ -346,11 +341,10 @@ ghostwriterRouter.post( await runWithRequestContext({ jobId }, async () => { if (parsed.data.stream) { - res.status(200); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache, no-transform"); - res.setHeader("Connection", "keep-alive"); - res.flushHeaders?.(); + setupSse(res, { + cacheControl: "no-cache, no-transform", + flushHeaders: true, + }); try { await ghostwriterService.sendMessage({ @@ -359,7 +353,7 @@ ghostwriterRouter.post( content: parsed.data.content, stream: { onReady: ({ runId, messageId, requestId }) => - writeSse(res, { + writeSseData(res, { type: "ready", runId, threadId, @@ -367,26 +361,26 @@ ghostwriterRouter.post( requestId, }), onDelta: ({ runId, messageId, delta }) => - writeSse(res, { + writeSseData(res, { type: "delta", runId, messageId, delta, }), onCompleted: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "completed", runId, message, }), onCancelled: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "cancelled", runId, message, }), onError: ({ runId, code, message, requestId }) => - writeSse(res, { + writeSseData(res, { type: "error", runId, code, @@ -397,7 +391,7 @@ ghostwriterRouter.post( }); } catch (error) { const appError = toAppError(error); - writeSse(res, { + writeSseData(res, { type: "error", code: appError.code, message: appError.message, @@ -469,11 +463,10 @@ ghostwriterRouter.post( await runWithRequestContext({ jobId }, async () => { if (parsed.data.stream) { - res.status(200); - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache, no-transform"); - res.setHeader("Connection", "keep-alive"); - res.flushHeaders?.(); + setupSse(res, { + cacheControl: "no-cache, no-transform", + flushHeaders: true, + }); try { await ghostwriterService.regenerateMessage({ @@ -482,7 +475,7 @@ ghostwriterRouter.post( assistantMessageId, stream: { onReady: ({ runId, messageId, requestId }) => - writeSse(res, { + writeSseData(res, { type: "ready", runId, threadId, @@ -490,26 +483,26 @@ ghostwriterRouter.post( requestId, }), onDelta: ({ runId, messageId, delta }) => - writeSse(res, { + writeSseData(res, { type: "delta", runId, messageId, delta, }), onCompleted: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "completed", runId, message, }), onCancelled: ({ runId, message }) => - writeSse(res, { + writeSseData(res, { type: "cancelled", runId, message, }), onError: ({ runId, code, message, requestId }) => - writeSse(res, { + writeSseData(res, { type: "error", runId, code, @@ -520,7 +513,7 @@ ghostwriterRouter.post( }); } catch (error) { const appError = toAppError(error); - writeSse(res, { + writeSseData(res, { type: "error", code: appError.code, message: appError.message, diff --git a/orchestrator/src/server/api/routes/jobs.test.ts b/orchestrator/src/server/api/routes/jobs.test.ts index 717786e..e2ef3f7 100644 --- a/orchestrator/src/server/api/routes/jobs.test.ts +++ b/orchestrator/src/server/api/routes/jobs.test.ts @@ -431,6 +431,95 @@ describe.sequential("Jobs API routes", () => { ).toBe("NOT_FOUND"); }); + it("streams bulk action progress with done counters", async () => { + const { createJob, updateJob } = await import("../../repositories/jobs"); + const discovered = await createJob({ + source: "manual", + title: "Discovered Role", + employer: "Acme", + jobUrl: "https://example.com/job/bulk-stream-1", + jobDescription: "Test description", + }); + const ready = await createJob({ + source: "manual", + title: "Ready Role", + employer: "Beta", + jobUrl: "https://example.com/job/bulk-stream-2", + jobDescription: "Test description", + }); + const applied = await createJob({ + source: "manual", + title: "Applied Role", + employer: "Gamma", + jobUrl: "https://example.com/job/bulk-stream-3", + jobDescription: "Test description", + }); + await updateJob(ready.id, { status: "ready" }); + await updateJob(applied.id, { status: "applied" }); + + const res = await fetch(`${baseUrl}/api/jobs/bulk-actions/stream`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + action: "skip", + jobIds: [discovered.id, ready.id, applied.id], + }), + }); + + expect(res.status).toBe(200); + expect(res.headers.get("content-type")).toContain("text/event-stream"); + + const reader = res.body?.getReader(); + expect(reader).toBeDefined(); + if (!reader) return; + + const decoder = new TextDecoder(); + const events: any[] = []; + let buffer = ""; + let hasCompleted = false; + + try { + while (!hasCompleted) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + + let separatorIndex = buffer.indexOf("\n\n"); + while (separatorIndex !== -1) { + const frame = buffer.slice(0, separatorIndex); + buffer = buffer.slice(separatorIndex + 2); + + const dataLines = frame + .split("\n") + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trim()) + .filter(Boolean); + + for (const line of dataLines) { + const event = JSON.parse(line); + events.push(event); + if (event.type === "completed") { + hasCompleted = true; + } + } + + separatorIndex = buffer.indexOf("\n\n"); + } + } + } finally { + await reader.cancel(); + } + + expect(events[0].type).toBe("started"); + expect(events[0].completed).toBe(0); + expect(events[0].requested).toBe(3); + expect(events.filter((event) => event.type === "progress")).toHaveLength(3); + expect(events.at(-1)?.type).toBe("completed"); + expect(events.at(-1)?.completed).toBe(3); + expect(events.at(-1)?.succeeded).toBe(2); + expect(events.at(-1)?.failed).toBe(1); + }); + it("validates bulk action payloads", async () => { const tooManyIds = Array.from( { length: 101 }, diff --git a/orchestrator/src/server/api/routes/jobs.ts b/orchestrator/src/server/api/routes/jobs.ts index a7d7bd8..9e6e1f5 100644 --- a/orchestrator/src/server/api/routes/jobs.ts +++ b/orchestrator/src/server/api/routes/jobs.ts @@ -1,12 +1,14 @@ import { fail, ok, okWithMeta } from "@infra/http"; import { logger } from "@infra/logger"; import { sanitizeWebhookPayload } from "@infra/sanitize"; +import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse"; import { APPLICATION_OUTCOMES, APPLICATION_STAGES, type BulkJobAction, type BulkJobActionResponse, type BulkJobActionResult, + type BulkJobActionStreamEvent, type Job, type JobListItem, type JobStatus, @@ -523,6 +525,178 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { } }); +/** + * POST /api/jobs/bulk-actions/stream - Run a bulk action and stream per-job progress via SSE + */ +jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { + const parsed = bulkActionRequestSchema.safeParse(req.body); + if (!parsed.success) { + return fail( + res, + badRequest("Invalid bulk action request", parsed.error.flatten()), + ); + } + + const dedupedJobIds = Array.from(new Set(parsed.data.jobIds)); + const requestId = String(res.getHeader("x-request-id") || "unknown"); + const action = parsed.data.action; + const requested = dedupedJobIds.length; + const results: BulkJobActionResult[] = []; + let succeeded = 0; + let failed = 0; + + setupSse(res, { + cacheControl: "no-cache, no-transform", + disableBuffering: true, + flushHeaders: true, + }); + const stopHeartbeat = startSseHeartbeat(res); + + let clientDisconnected = false; + res.on("close", () => { + clientDisconnected = true; + stopHeartbeat(); + }); + + const isResponseWritable = () => + !clientDisconnected && !res.writableEnded && !res.destroyed; + + const sendEvent = (event: BulkJobActionStreamEvent) => { + if (!isResponseWritable()) return false; + writeSseData(res, event); + return true; + }; + + try { + if ( + !sendEvent({ + type: "started", + action, + requested, + completed: 0, + succeeded: 0, + failed: 0, + requestId, + }) + ) { + logger.info("Client disconnected before bulk stream started", { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + requestId, + }); + return; + } + + for (const jobId of dedupedJobIds) { + if (!isResponseWritable()) { + logger.info("Client disconnected; stopping bulk job stream", { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + requestId, + }); + break; + } + + const result = await executeBulkActionForJob(action, jobId); + results.push(result); + if (result.ok) succeeded += 1; + else failed += 1; + + if ( + !sendEvent({ + type: "progress", + action, + requested, + completed: results.length, + succeeded, + failed, + result, + requestId, + }) + ) { + logger.info("Client disconnected while writing bulk stream progress", { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + requestId, + }); + break; + } + } + + sendEvent({ + type: "completed", + action, + requested, + completed: results.length, + succeeded, + failed, + results, + requestId, + }); + + logger.info("Bulk job action stream completed", { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + requestId, + }); + } catch (error) { + const err = + error instanceof AppError + ? error + : new AppError({ + status: 500, + code: "INTERNAL_ERROR", + message: error instanceof Error ? error.message : "Unknown error", + }); + + logger.error("Bulk job action stream failed", { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + status: err.status, + code: err.code, + requestId, + }); + + if ( + !sendEvent({ + type: "error", + code: err.code, + message: err.message, + requestId, + }) + ) { + logger.info("Skipping stream error event because client disconnected", { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + requestId, + }); + } + } finally { + stopHeartbeat(); + if (!res.writableEnded && !res.destroyed) { + res.end(); + } + } +}); + /** * GET /api/jobs/:id - Get a single job */ diff --git a/orchestrator/src/server/api/routes/pipeline.ts b/orchestrator/src/server/api/routes/pipeline.ts index 31628e3..84c69bf 100644 --- a/orchestrator/src/server/api/routes/pipeline.ts +++ b/orchestrator/src/server/api/routes/pipeline.ts @@ -2,6 +2,7 @@ import { AppError, badRequest, conflict, requestTimeout } from "@infra/errors"; import { fail, ok, okWithMeta } from "@infra/http"; import { logger } from "@infra/logger"; import { runWithRequestContext } from "@infra/request-context"; +import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse"; import type { PipelineStatusResponse } from "@shared/types"; import { type Request, type Response, Router } from "express"; import { z } from "zod"; @@ -46,28 +47,22 @@ pipelineRouter.get("/status", async (_req: Request, res: Response) => { * GET /api/pipeline/progress - Server-Sent Events endpoint for live progress */ pipelineRouter.get("/progress", (req: Request, res: Response) => { - // Set headers for SSE - res.setHeader("Content-Type", "text/event-stream"); - res.setHeader("Cache-Control", "no-cache"); - res.setHeader("Connection", "keep-alive"); - res.setHeader("X-Accel-Buffering", "no"); // Disable Nginx buffering + setupSse(res, { disableBuffering: true }); // Send initial progress const sendProgress = (data: unknown) => { - res.write(`data: ${JSON.stringify(data)}\n\n`); + writeSseData(res, data); }; // Subscribe to progress updates const unsubscribe = subscribeToProgress(sendProgress); // Send heartbeat every 30 seconds to keep connection alive - const heartbeat = setInterval(() => { - res.write(": heartbeat\n\n"); - }, 30000); + const stopHeartbeat = startSseHeartbeat(res); // Cleanup on close req.on("close", () => { - clearInterval(heartbeat); + stopHeartbeat(); unsubscribe(); }); }); diff --git a/orchestrator/src/server/infra/sse.ts b/orchestrator/src/server/infra/sse.ts new file mode 100644 index 0000000..dd58bea --- /dev/null +++ b/orchestrator/src/server/infra/sse.ts @@ -0,0 +1,45 @@ +import type { Response } from "express"; + +interface SetupSseOptions { + cacheControl?: string; + disableBuffering?: boolean; + flushHeaders?: boolean; +} + +const DEFAULT_HEARTBEAT_MS = 30_000; + +export function setupSse(res: Response, options: SetupSseOptions = {}): void { + res.status(200); + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", options.cacheControl ?? "no-cache"); + res.setHeader("Connection", "keep-alive"); + + if (options.disableBuffering) { + res.setHeader("X-Accel-Buffering", "no"); + } + + if (options.flushHeaders) { + res.flushHeaders?.(); + } +} + +export function writeSseData(res: Response, data: unknown): void { + res.write(`data: ${JSON.stringify(data)}\n\n`); +} + +export function writeSseComment(res: Response, comment: string): void { + res.write(`: ${comment}\n\n`); +} + +export function startSseHeartbeat( + res: Response, + intervalMs = DEFAULT_HEARTBEAT_MS, +): () => void { + const heartbeat = setInterval(() => { + writeSseComment(res, "heartbeat"); + }, intervalMs); + + return () => { + clearInterval(heartbeat); + }; +} diff --git a/shared/src/types.ts b/shared/src/types.ts index 9dc78ec..b85b21c 100644 --- a/shared/src/types.ts +++ b/shared/src/types.ts @@ -621,6 +621,43 @@ export interface BulkJobActionResponse { results: BulkJobActionResult[]; } +export type BulkJobActionStreamEvent = + | { + type: "started"; + action: BulkJobAction; + requested: number; + completed: number; + succeeded: number; + failed: number; + requestId: string; + } + | { + type: "progress"; + action: BulkJobAction; + requested: number; + completed: number; + succeeded: number; + failed: number; + result: BulkJobActionResult; + requestId: string; + } + | { + type: "completed"; + action: BulkJobAction; + requested: number; + completed: number; + succeeded: number; + failed: number; + results: BulkJobActionResult[]; + requestId: string; + } + | { + type: "error"; + code: string; + message: string; + requestId: string; + }; + export const JOB_CHAT_MESSAGE_ROLES = [ "system", "user",