Fix #162: real-time bulk action streaming progress (#187)

* initial

* refactor: centralize SSE plumbing for client and server

* docs: add centralized SSE usage standards to agents guide

* use sse to stream actions to the client

* ui: align bulk progress toast with default sonner style

* ui: remove hide action from bulk progress toast

* full width progress bar

* fix(stream): track client disconnect and writability

* fix(stream): stop bulk loop when SSE client disconnects

* fix(stream): avoid writing error/end to closed SSE response

* fix(stream): gate started/progress frames on writable SSE socket

* types(api): narrow SSE stream payload input contract

* refactor(ui): share clamp helper for bulk progress

* fix(stream): add heartbeat to bulk action SSE route

* feat(stream): include completed count in bulk completion event

* fix(client-sse): separate parse vs handler errors and cancel reader
This commit is contained in:
Shaheer Sarfaraz 2026-02-18 15:54:39 +00:00 committed by GitHub
parent 4f8664cb9c
commit 032626bd7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 854 additions and 197 deletions

View File

@ -36,6 +36,14 @@ Use consistent status/code mapping:
- Log structured objects, not free-form dumps. - Log structured objects, not free-form dumps.
- Include useful context fields (e.g. `requestId`, `pipelineRunId`, `jobId`, `route`, `status`). - Include useful context fields (e.g. `requestId`, `pipelineRunId`, `jobId`, `route`, `status`).
## SSE Standards
- Use centralized SSE helpers by default.
- Server: use `orchestrator/src/server/infra/sse.ts` for setup, data writes, comments, and heartbeats.
- Client (`EventSource`): use `orchestrator/src/client/lib/sse.ts` for subscription/open/message/error plumbing.
- Do not duplicate raw SSE setup (`Content-Type`, `Connection`, heartbeat loops, or ad-hoc `JSON.parse` event parsing) when these helpers apply.
- Keep feature payload types domain-local (pipeline, ghostwriter, bulk actions), but reuse shared transport plumbing.
## Redaction and Sanitization ## Redaction and Sanitization
- Always sanitize objects before logging or returning in error `details`. - Always sanitize objects before logging or returning in error `details`.

View File

@ -0,0 +1,43 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as api from "./client";
describe("API client SSE streaming", () => {
beforeEach(() => {
vi.restoreAllMocks();
api.__resetApiClientAuthForTests();
});
it("propagates handler errors and cancels the stream reader", async () => {
const encoder = new TextEncoder();
const cancelSpy = vi.fn();
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(
encoder.encode('data: {"type":"started","requestId":"req-1"}\n\n'),
);
},
cancel() {
cancelSpy();
},
});
vi.spyOn(global, "fetch").mockResolvedValue({
ok: true,
status: 200,
body: stream,
} as Response);
await expect(
api.streamBulkJobAction(
{ action: "skip", jobIds: ["job-1"] },
{
onEvent: () => {
throw new Error("handler exploded");
},
},
),
).rejects.toThrow("handler exploded");
expect(cancelSpy).toHaveBeenCalledTimes(1);
});
});

View File

@ -11,6 +11,7 @@ import type {
BackupInfo, BackupInfo,
BulkJobActionRequest, BulkJobActionRequest,
BulkJobActionResponse, BulkJobActionResponse,
BulkJobActionStreamEvent,
BulkPostApplicationAction, BulkPostApplicationAction,
BulkPostApplicationActionResponse, BulkPostApplicationActionResponse,
DemoInfoResponse, DemoInfoResponse,
@ -79,6 +80,11 @@ type LegacyApiResponse<T> =
details?: unknown; details?: unknown;
}; };
type StreamSseInput =
| BulkJobActionRequest
| { content: string; stream: true }
| { stream: true };
export type BasicAuthCredentials = { export type BasicAuthCredentials = {
username: string; username: string;
password: string; password: string;
@ -393,11 +399,11 @@ export async function updateJob(
}); });
} }
async function streamSseEvents( async function streamSseEvents<TEvent>(
endpoint: string, endpoint: string,
input: Record<string, unknown>, input: StreamSseInput,
handlers: { handlers: {
onEvent: (event: JobChatStreamEvent) => void; onEvent: (event: TEvent) => void;
signal?: AbortSignal; signal?: AbortSignal;
}, },
): Promise<void> { ): Promise<void> {
@ -439,29 +445,40 @@ async function streamSseEvents(
const reader = response.body.getReader(); const reader = response.body.getReader();
let buffer = ""; let buffer = "";
while (true) { try {
const { done, value } = await reader.read(); while (true) {
if (done) break; const { done, value } = await reader.read();
buffer += decoder.decode(value, { stream: true }); if (done) break;
buffer += decoder.decode(value, { stream: true });
let separatorIndex = buffer.indexOf("\n\n"); let separatorIndex = buffer.indexOf("\n\n");
while (separatorIndex !== -1) { while (separatorIndex !== -1) {
const frame = buffer.slice(0, separatorIndex); const frame = buffer.slice(0, separatorIndex);
buffer = buffer.slice(separatorIndex + 2); buffer = buffer.slice(separatorIndex + 2);
const dataLines = frame const dataLines = frame
.split("\n") .split("\n")
.filter((line) => line.startsWith("data:")) .filter((line) => line.startsWith("data:"))
.map((line) => line.slice(5).trim()) .map((line) => line.slice(5).trim())
.filter(Boolean); .filter(Boolean);
for (const line of dataLines) { for (const line of dataLines) {
try { let parsedEvent: TEvent;
handlers.onEvent(JSON.parse(line) as JobChatStreamEvent); try {
} catch { parsedEvent = JSON.parse(line) as TEvent;
// Ignore malformed events to keep stream resilient } catch {
// Ignore malformed events to keep stream resilient
continue;
}
handlers.onEvent(parsedEvent);
} }
separatorIndex = buffer.indexOf("\n\n");
} }
separatorIndex = buffer.indexOf("\n\n"); }
} finally {
try {
await reader.cancel();
} catch {
// Ignore cancellation errors when stream is already closed
} }
} }
} }
@ -710,6 +727,20 @@ export async function bulkJobAction(
}); });
} }
export async function streamBulkJobAction(
input: BulkJobActionRequest,
handlers: {
onEvent: (event: BulkJobActionStreamEvent) => void;
signal?: AbortSignal;
},
): Promise<void> {
return streamSseEvents<BulkJobActionStreamEvent>(
"/jobs/bulk-actions/stream",
input,
handlers,
);
}
export async function getJobStageEvents(id: string): Promise<StageEvent[]> { export async function getJobStageEvents(id: string): Promise<StageEvent[]> {
return fetchApi<StageEvent[]>(`/jobs/${id}/events?t=${Date.now()}`); return fetchApi<StageEvent[]>(`/jobs/${id}/events?t=${Date.now()}`);
} }

View File

@ -5,7 +5,7 @@
import { Loader2 } from "lucide-react"; import { Loader2 } from "lucide-react";
import type React from "react"; import type React from "react";
import { useEffect, useMemo, useState } from "react"; import { useEffect, useMemo, useState } from "react";
import { subscribeToEventSource } from "@/client/lib/sse";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Progress } from "@/components/ui/progress"; import { Progress } from "@/components/ui/progress";
@ -161,26 +161,23 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
return; return;
} }
const eventSource = new EventSource("/api/pipeline/progress"); const unsubscribe = subscribeToEventSource<PipelineProgress>(
"/api/pipeline/progress",
eventSource.onopen = () => { {
setIsConnected(true); onOpen: () => {
}; setIsConnected(true);
},
eventSource.onmessage = (event) => { onMessage: (payload) => {
try { setProgress(payload);
setProgress(JSON.parse(event.data)); },
} catch { onError: () => {
// Ignore parse errors setIsConnected(false);
} },
}; },
);
eventSource.onerror = () => {
setIsConnected(false);
};
return () => { return () => {
eventSource.close(); unsubscribe();
setIsConnected(false); setIsConnected(false);
}; };
}, [isRunning]); }, [isRunning]);

View File

@ -0,0 +1,32 @@
interface EventSourceSubscriptionHandlers<T> {
onOpen?: () => void;
onMessage: (payload: T) => void;
onError?: () => void;
}
export function subscribeToEventSource<T>(
url: string,
handlers: EventSourceSubscriptionHandlers<T>,
): () => void {
const eventSource = new EventSource(url);
eventSource.onopen = () => {
handlers.onOpen?.();
};
eventSource.onmessage = (event) => {
try {
handlers.onMessage(JSON.parse(event.data) as T);
} catch {
// Ignore malformed events to keep stream resilient.
}
};
eventSource.onerror = () => {
handlers.onError?.();
};
return () => {
eventSource.close();
};
}

View File

@ -0,0 +1,29 @@
import { Progress } from "@/components/ui/progress";
import { clampNumber } from "./utils";
interface BulkActionProgressToastProps {
completed: number;
requested: number;
succeeded: number;
failed: number;
}
export function BulkActionProgressToast({
completed,
requested,
succeeded,
failed,
}: BulkActionProgressToastProps) {
const safeRequested = Math.max(requested, 1);
const safeCompleted = clampNumber(completed, 0, safeRequested);
const progressValue = Math.round((safeCompleted / safeRequested) * 100);
return (
<div className="mt-2 w-full space-y-1.5">
<Progress value={progressValue} className="h-1.5 w-full" />
<p className="tabular-nums text-xs text-muted-foreground">
{succeeded} succeeded, {failed} failed
</p>
</div>
);
}

View File

@ -1,5 +1,8 @@
import { createJob } from "@shared/testing/factories.js"; import { createJob } from "@shared/testing/factories.js";
import type { BulkJobActionResponse } from "@shared/types.js"; import type {
BulkJobActionResponse,
BulkJobActionStreamEvent,
} from "@shared/types.js";
import { act, renderHook, waitFor } from "@testing-library/react"; import { act, renderHook, waitFor } from "@testing-library/react";
import { toast } from "sonner"; import { toast } from "sonner";
import { beforeEach, describe, expect, it, vi } from "vitest"; import { beforeEach, describe, expect, it, vi } from "vitest";
@ -7,11 +10,13 @@ import * as api from "../../api";
import { useBulkJobSelection } from "./useBulkJobSelection"; import { useBulkJobSelection } from "./useBulkJobSelection";
vi.mock("../../api", () => ({ vi.mock("../../api", () => ({
bulkJobAction: vi.fn(), streamBulkJobAction: vi.fn(),
})); }));
vi.mock("sonner", () => ({ vi.mock("sonner", () => ({
toast: { toast: {
loading: vi.fn(),
dismiss: vi.fn(),
error: vi.fn(), error: vi.fn(),
success: vi.fn(), success: vi.fn(),
}, },
@ -30,9 +35,75 @@ const deferred = <T>(): Deferred<T> => {
return { promise, resolve }; return { promise, resolve };
}; };
const asStreamEvents = (
response: BulkJobActionResponse,
requestId = "req-bulk",
): BulkJobActionStreamEvent[] => {
const events: BulkJobActionStreamEvent[] = [
{
type: "started",
action: response.action,
requested: response.requested,
completed: 0,
succeeded: 0,
failed: 0,
requestId,
},
];
let succeeded = 0;
let failed = 0;
response.results.forEach((result, index) => {
if (result.ok) succeeded += 1;
else failed += 1;
events.push({
type: "progress",
action: response.action,
requested: response.requested,
completed: index + 1,
succeeded,
failed,
result,
requestId,
});
});
events.push({
type: "completed",
action: response.action,
requested: response.requested,
completed: response.requested,
succeeded: response.succeeded,
failed: response.failed,
results: response.results,
requestId,
});
return events;
};
const mockStreamBulkAction = (
response: BulkJobActionResponse,
waitForRelease?: Promise<void>,
) => {
vi.mocked(api.streamBulkJobAction).mockImplementation(
async (_input, handlers) => {
for (const event of asStreamEvents(response)) {
if (event.type === "started") handlers.onEvent(event);
}
if (waitForRelease) await waitForRelease;
for (const event of asStreamEvents(response)) {
if (event.type !== "started") handlers.onEvent(event);
}
return;
},
);
};
describe("useBulkJobSelection", () => { describe("useBulkJobSelection", () => {
beforeEach(() => { beforeEach(() => {
vi.clearAllMocks(); vi.clearAllMocks();
vi.mocked(toast.loading).mockReturnValue("bulk-progress-toast");
}); });
it("caps select-all to the API max", () => { it("caps select-all to the API max", () => {
@ -78,7 +149,7 @@ describe("useBulkJobSelection", () => {
await result.current.runBulkAction("skip"); await result.current.runBulkAction("skip");
}); });
expect(api.bulkJobAction).not.toHaveBeenCalled(); expect(api.streamBulkJobAction).not.toHaveBeenCalled();
}); });
it("reconciles failures with selection changes made during in-flight action", async () => { it("reconciles failures with selection changes made during in-flight action", async () => {
@ -88,8 +159,28 @@ describe("useBulkJobSelection", () => {
createJob({ id: "job-3", status: "discovered" }), createJob({ id: "job-3", status: "discovered" }),
]; ];
const loadJobs = vi.fn().mockResolvedValue(undefined); const loadJobs = vi.fn().mockResolvedValue(undefined);
const pending = deferred<BulkJobActionResponse>(); const release = deferred<void>();
vi.mocked(api.bulkJobAction).mockImplementation(() => pending.promise); mockStreamBulkAction(
{
action: "skip",
requested: 2,
succeeded: 1,
failed: 1,
results: [
{
jobId: "job-1",
ok: true,
job: createJob({ id: "job-1", status: "skipped" }),
},
{
jobId: "job-2",
ok: false,
error: { code: "INVALID_REQUEST", message: "bad status" },
},
],
},
release.promise,
);
const { result } = renderHook(() => const { result } = renderHook(() =>
useBulkJobSelection({ useBulkJobSelection({
@ -109,36 +200,24 @@ describe("useBulkJobSelection", () => {
runPromise = result.current.runBulkAction("skip"); runPromise = result.current.runBulkAction("skip");
}); });
expect(toast.loading).toHaveBeenCalled();
const firstLoadingCall = vi.mocked(toast.loading).mock.calls[0];
expect(firstLoadingCall[1]).not.toHaveProperty("cancel");
act(() => { act(() => {
result.current.toggleSelectJob("job-2"); result.current.toggleSelectJob("job-2");
result.current.toggleSelectJob("job-3"); result.current.toggleSelectJob("job-3");
}); });
await act(async () => { await act(async () => {
pending.resolve({ release.resolve();
action: "skip",
requested: 2,
succeeded: 1,
failed: 1,
results: [
{
jobId: "job-1",
ok: true,
job: createJob({ id: "job-1", status: "skipped" }),
},
{
jobId: "job-2",
ok: false,
error: { code: "INVALID_REQUEST", message: "bad status" },
},
],
});
await runPromise; await runPromise;
}); });
await waitFor(() => { await waitFor(() => {
expect(Array.from(result.current.selectedJobIds)).toEqual(["job-3"]); expect(Array.from(result.current.selectedJobIds)).toEqual(["job-3"]);
}); });
expect(toast.dismiss).toHaveBeenCalled();
}); });
it("runs bulk rescore and reports success copy", async () => { it("runs bulk rescore and reports success copy", async () => {
@ -147,7 +226,7 @@ describe("useBulkJobSelection", () => {
createJob({ id: "job-2", status: "ready" }), createJob({ id: "job-2", status: "ready" }),
]; ];
const loadJobs = vi.fn().mockResolvedValue(undefined); const loadJobs = vi.fn().mockResolvedValue(undefined);
vi.mocked(api.bulkJobAction).mockResolvedValue({ mockStreamBulkAction({
action: "rescore", action: "rescore",
requested: 2, requested: 2,
succeeded: 2, succeeded: 2,
@ -183,10 +262,12 @@ describe("useBulkJobSelection", () => {
await result.current.runBulkAction("rescore"); await result.current.runBulkAction("rescore");
}); });
expect(api.bulkJobAction).toHaveBeenCalledWith({ expect(api.streamBulkJobAction).toHaveBeenCalledWith(
action: "rescore", { action: "rescore", jobIds: ["job-1", "job-2"] },
jobIds: ["job-1", "job-2"], expect.objectContaining({
}); onEvent: expect.any(Function),
}),
);
expect(toast.success).toHaveBeenCalledWith("2 matches recalculated"); expect(toast.success).toHaveBeenCalledWith("2 matches recalculated");
}); });
}); });

View File

@ -1,7 +1,12 @@
import type { BulkJobAction, JobListItem } from "@shared/types.js"; import type {
BulkJobAction,
BulkJobActionResponse,
JobListItem,
} from "@shared/types.js";
import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import * as api from "../../api"; import * as api from "../../api";
import { BulkActionProgressToast } from "./BulkActionProgressToast";
import { import {
canBulkMoveToReady, canBulkMoveToReady,
canBulkRescore, canBulkRescore,
@ -9,9 +14,16 @@ import {
getFailedJobIds, getFailedJobIds,
} from "./bulkActions"; } from "./bulkActions";
import type { FilterTab } from "./constants"; import type { FilterTab } from "./constants";
import { clampNumber } from "./utils";
const MAX_BULK_ACTION_JOB_IDS = 100; const MAX_BULK_ACTION_JOB_IDS = 100;
const bulkActionLabel: Record<BulkJobAction, string> = {
move_to_ready: "Moving jobs to Ready...",
skip: "Skipping selected jobs...",
rescore: "Calculating match scores...",
};
interface UseBulkJobSelectionArgs { interface UseBulkJobSelectionArgs {
activeJobs: JobListItem[]; activeJobs: JobListItem[];
activeTab: FilterTab; activeTab: FilterTab;
@ -110,17 +122,105 @@ export function useBulkJobSelection({
} }
const selectedAtStartSet = new Set(selectedAtStart); const selectedAtStartSet = new Set(selectedAtStart);
let progressToastId: string | number | undefined;
let finalResult: BulkJobActionResponse | null = null;
let streamError: string | null = null;
let latestProgress = {
requested: selectedAtStart.length,
completed: 0,
succeeded: 0,
failed: 0,
};
const getProgressTitle = () => {
const safeRequested = Math.max(latestProgress.requested, 1);
const safeCompleted = clampNumber(
latestProgress.completed,
0,
safeRequested,
);
return `${safeCompleted}/${safeRequested} ${bulkActionLabel[action]}`;
};
const upsertProgressToast = () => {
progressToastId = toast.loading(getProgressTitle(), {
description: (
<BulkActionProgressToast
requested={latestProgress.requested}
completed={latestProgress.completed}
succeeded={latestProgress.succeeded}
failed={latestProgress.failed}
/>
),
...(progressToastId !== undefined ? { id: progressToastId } : {}),
duration: Number.POSITIVE_INFINITY,
});
};
try { try {
setBulkActionInFlight(action); setBulkActionInFlight(action);
if (action === "move_to_ready") { upsertProgressToast();
toast.message("Moving jobs to Ready..."); await api.streamBulkJobAction(
{
action,
jobIds: selectedAtStart,
},
{
onEvent: (event) => {
if (event.type === "error") {
streamError = event.message || "Failed to run bulk action";
return;
}
if (event.type === "started") {
latestProgress = {
requested: event.requested,
completed: event.completed,
succeeded: event.succeeded,
failed: event.failed,
};
upsertProgressToast();
return;
}
if (event.type === "progress") {
latestProgress = {
requested: event.requested,
completed: event.completed,
succeeded: event.succeeded,
failed: event.failed,
};
upsertProgressToast();
return;
}
latestProgress = {
requested: event.requested,
completed: event.completed,
succeeded: event.succeeded,
failed: event.failed,
};
finalResult = {
action: event.action,
requested: event.requested,
succeeded: event.succeeded,
failed: event.failed,
results: event.results,
};
upsertProgressToast();
},
},
);
if (streamError) {
throw new Error(streamError);
} }
const result = await api.bulkJobAction({ if (!finalResult) {
action, throw new Error("Bulk action stream ended before completion");
jobIds: selectedAtStart, }
});
const result = finalResult as BulkJobActionResponse;
const failedIds = getFailedJobIds(result); const failedIds = getFailedJobIds(result);
const successLabel = const successLabel =
action === "skip" action === "skip"
@ -157,6 +257,9 @@ export function useBulkJobSelection({
error instanceof Error ? error.message : "Failed to run bulk action"; error instanceof Error ? error.message : "Failed to run bulk action";
toast.error(message); toast.error(message);
} finally { } finally {
if (progressToastId !== undefined) {
toast.dismiss(progressToastId);
}
setBulkActionInFlight(null); setBulkActionInFlight(null);
} }
}, },

View File

@ -2,6 +2,7 @@ import type { Job, JobListItem, JobStatus } from "@shared/types";
import { useCallback, useEffect, useRef, useState } from "react"; import { useCallback, useEffect, useRef, useState } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import * as api from "../../api"; import * as api from "../../api";
import { subscribeToEventSource } from "../../lib/sse";
const initialStats: Record<JobStatus, number> = { const initialStats: Record<JobStatus, number> = {
discovered: 0, discovered: 0,
@ -320,73 +321,67 @@ export const useOrchestratorData = (selectedJobId: string | null) => {
useEffect(() => { useEffect(() => {
if (typeof EventSource === "undefined") return; if (typeof EventSource === "undefined") return;
const eventSource = new EventSource("/api/pipeline/progress"); const unsubscribe = subscribeToEventSource<unknown>(
"/api/pipeline/progress",
{
onOpen: () => {
setIsPipelineSseConnected(true);
},
onMessage: (payload) => {
if (!payload || typeof payload !== "object") return;
const step = (payload as { step?: unknown }).step;
if (typeof step !== "string") return;
if (
!ACTIVE_PIPELINE_STEPS.has(step as PipelineProgressStep) &&
!TERMINAL_PIPELINE_STEPS.has(step as PipelineProgressStep) &&
step !== "idle"
) {
return;
}
eventSource.onopen = () => { const typedStep = step as PipelineProgressStep;
setIsPipelineSseConnected(true); const isActiveStep = ACTIVE_PIPELINE_STEPS.has(typedStep);
}; if (isActiveStep) {
observePipelineState({ isRunning: true, terminal: null });
} else if (typedStep === "idle") {
observePipelineState({ isRunning: false, terminal: null });
}
eventSource.onmessage = (event) => { if (isActiveStep) {
let payload: unknown; const now = Date.now();
try { if (now - lastSseRefreshAtRef.current >= 2500) {
payload = JSON.parse(event.data); lastSseRefreshAtRef.current = now;
} catch { void checkForJobChanges();
return; }
} return;
}
if (!payload || typeof payload !== "object") return; if (TERMINAL_PIPELINE_STEPS.has(typedStep)) {
const step = (payload as { step?: unknown }).step; const eventPayload = payload as PipelineProgressEvent;
if (typeof step !== "string") return; const terminal = typedStep as PipelineTerminalStatus;
if ( observePipelineState({
!ACTIVE_PIPELINE_STEPS.has(step as PipelineProgressStep) && isRunning: false,
!TERMINAL_PIPELINE_STEPS.has(step as PipelineProgressStep) && terminal: {
step !== "idle" status: terminal,
) { errorMessage: eventPayload.error ?? null,
return; signature: buildTerminalSignature({
} status: terminal,
startedAt: eventPayload.startedAt,
const typedStep = step as PipelineProgressStep; completedAt: eventPayload.completedAt,
const isActiveStep = ACTIVE_PIPELINE_STEPS.has(typedStep); }),
if (isActiveStep) { },
observePipelineState({ isRunning: true, terminal: null }); });
} else if (typedStep === "idle") { void loadJobs();
observePipelineState({ isRunning: false, terminal: null }); }
} },
onError: () => {
if (isActiveStep) { setIsPipelineSseConnected(false);
const now = Date.now(); },
if (now - lastSseRefreshAtRef.current >= 2500) { },
lastSseRefreshAtRef.current = now; );
void checkForJobChanges();
}
return;
}
if (TERMINAL_PIPELINE_STEPS.has(typedStep)) {
const eventPayload = payload as PipelineProgressEvent;
const terminal = typedStep as PipelineTerminalStatus;
observePipelineState({
isRunning: false,
terminal: {
status: terminal,
errorMessage: eventPayload.error ?? null,
signature: buildTerminalSignature({
status: terminal,
startedAt: eventPayload.startedAt,
completedAt: eventPayload.completedAt,
}),
},
});
void loadJobs();
}
};
eventSource.onerror = () => {
setIsPipelineSseConnected(false);
};
return () => { return () => {
eventSource.close(); unsubscribe();
}; };
}, [checkForJobChanges, loadJobs, observePipelineState]); }, [checkForJobChanges, loadJobs, observePipelineState]);

View File

@ -16,6 +16,9 @@ const compareString = (a: string, b: string) =>
a.localeCompare(b, undefined, { sensitivity: "base" }); a.localeCompare(b, undefined, { sensitivity: "base" });
const compareNumber = (a: number, b: number) => a - b; const compareNumber = (a: number, b: number) => a - b;
export const clampNumber = (value: number, min: number, max: number) =>
Math.max(min, Math.min(max, value));
export const parseSalaryBounds = ( export const parseSalaryBounds = (
job: JobListItem, job: JobListItem,
): { min: number; max: number } | null => { ): { min: number; max: number } | null => {

View File

@ -11,6 +11,8 @@ const Toaster = ({ ...props }: ToasterProps) => {
classNames: { classNames: {
toast: toast:
"group toast group-[.toaster]:bg-background group-[.toaster]:text-foreground group-[.toaster]:border-border group-[.toaster]:shadow-lg", "group toast group-[.toaster]:bg-background group-[.toaster]:text-foreground group-[.toaster]:border-border group-[.toaster]:shadow-lg",
content:
"group-[.toast]:w-full group-[.toast]:flex-1 group-[.toast]:min-w-0",
description: "group-[.toast]:text-muted-foreground", description: "group-[.toast]:text-muted-foreground",
actionButton: actionButton:
"group-[.toast]:bg-primary group-[.toast]:text-primary-foreground", "group-[.toast]:bg-primary group-[.toast]:text-primary-foreground",

View File

@ -1,7 +1,8 @@
import { asyncRoute, fail, ok } from "@infra/http"; import { asyncRoute, fail, ok } from "@infra/http";
import { runWithRequestContext } from "@infra/request-context"; import { runWithRequestContext } from "@infra/request-context";
import { setupSse, writeSseData } from "@infra/sse";
import { badRequest, toAppError } from "@server/infra/errors"; import { badRequest, toAppError } from "@server/infra/errors";
import { type Request, type Response, Router } from "express"; import { type Request, Router } from "express";
import { z } from "zod"; import { z } from "zod";
import * as ghostwriterService from "../../services/ghostwriter"; import * as ghostwriterService from "../../services/ghostwriter";
@ -33,10 +34,6 @@ function getJobId(req: Request): string {
return jobId; return jobId;
} }
function writeSse(res: Response, event: unknown): void {
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
ghostwriterRouter.get( ghostwriterRouter.get(
"/messages", "/messages",
asyncRoute(async (req, res) => { asyncRoute(async (req, res) => {
@ -75,11 +72,10 @@ ghostwriterRouter.post(
await runWithRequestContext({ jobId }, async () => { await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) { if (parsed.data.stream) {
res.status(200); setupSse(res, {
res.setHeader("Content-Type", "text/event-stream"); cacheControl: "no-cache, no-transform",
res.setHeader("Cache-Control", "no-cache, no-transform"); flushHeaders: true,
res.setHeader("Connection", "keep-alive"); });
res.flushHeaders?.();
try { try {
await ghostwriterService.sendMessageForJob({ await ghostwriterService.sendMessageForJob({
@ -87,7 +83,7 @@ ghostwriterRouter.post(
content: parsed.data.content, content: parsed.data.content,
stream: { stream: {
onReady: ({ runId, threadId, messageId, requestId }) => onReady: ({ runId, threadId, messageId, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "ready", type: "ready",
runId, runId,
threadId, threadId,
@ -95,26 +91,26 @@ ghostwriterRouter.post(
requestId, requestId,
}), }),
onDelta: ({ runId, messageId, delta }) => onDelta: ({ runId, messageId, delta }) =>
writeSse(res, { writeSseData(res, {
type: "delta", type: "delta",
runId, runId,
messageId, messageId,
delta, delta,
}), }),
onCompleted: ({ runId, message }) => onCompleted: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "completed", type: "completed",
runId, runId,
message, message,
}), }),
onCancelled: ({ runId, message }) => onCancelled: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "cancelled", type: "cancelled",
runId, runId,
message, message,
}), }),
onError: ({ runId, code, message, requestId }) => onError: ({ runId, code, message, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
runId, runId,
code, code,
@ -125,7 +121,7 @@ ghostwriterRouter.post(
}); });
} catch (error) { } catch (error) {
const appError = toAppError(error); const appError = toAppError(error);
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
code: appError.code, code: appError.code,
message: appError.message, message: appError.message,
@ -191,11 +187,10 @@ ghostwriterRouter.post(
await runWithRequestContext({ jobId }, async () => { await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) { if (parsed.data.stream) {
res.status(200); setupSse(res, {
res.setHeader("Content-Type", "text/event-stream"); cacheControl: "no-cache, no-transform",
res.setHeader("Cache-Control", "no-cache, no-transform"); flushHeaders: true,
res.setHeader("Connection", "keep-alive"); });
res.flushHeaders?.();
try { try {
await ghostwriterService.regenerateMessageForJob({ await ghostwriterService.regenerateMessageForJob({
@ -203,7 +198,7 @@ ghostwriterRouter.post(
assistantMessageId, assistantMessageId,
stream: { stream: {
onReady: ({ runId, threadId, messageId, requestId }) => onReady: ({ runId, threadId, messageId, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "ready", type: "ready",
runId, runId,
threadId, threadId,
@ -211,26 +206,26 @@ ghostwriterRouter.post(
requestId, requestId,
}), }),
onDelta: ({ runId, messageId, delta }) => onDelta: ({ runId, messageId, delta }) =>
writeSse(res, { writeSseData(res, {
type: "delta", type: "delta",
runId, runId,
messageId, messageId,
delta, delta,
}), }),
onCompleted: ({ runId, message }) => onCompleted: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "completed", type: "completed",
runId, runId,
message, message,
}), }),
onCancelled: ({ runId, message }) => onCancelled: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "cancelled", type: "cancelled",
runId, runId,
message, message,
}), }),
onError: ({ runId, code, message, requestId }) => onError: ({ runId, code, message, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
runId, runId,
code, code,
@ -241,7 +236,7 @@ ghostwriterRouter.post(
}); });
} catch (error) { } catch (error) {
const appError = toAppError(error); const appError = toAppError(error);
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
code: appError.code, code: appError.code,
message: appError.message, message: appError.message,
@ -346,11 +341,10 @@ ghostwriterRouter.post(
await runWithRequestContext({ jobId }, async () => { await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) { if (parsed.data.stream) {
res.status(200); setupSse(res, {
res.setHeader("Content-Type", "text/event-stream"); cacheControl: "no-cache, no-transform",
res.setHeader("Cache-Control", "no-cache, no-transform"); flushHeaders: true,
res.setHeader("Connection", "keep-alive"); });
res.flushHeaders?.();
try { try {
await ghostwriterService.sendMessage({ await ghostwriterService.sendMessage({
@ -359,7 +353,7 @@ ghostwriterRouter.post(
content: parsed.data.content, content: parsed.data.content,
stream: { stream: {
onReady: ({ runId, messageId, requestId }) => onReady: ({ runId, messageId, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "ready", type: "ready",
runId, runId,
threadId, threadId,
@ -367,26 +361,26 @@ ghostwriterRouter.post(
requestId, requestId,
}), }),
onDelta: ({ runId, messageId, delta }) => onDelta: ({ runId, messageId, delta }) =>
writeSse(res, { writeSseData(res, {
type: "delta", type: "delta",
runId, runId,
messageId, messageId,
delta, delta,
}), }),
onCompleted: ({ runId, message }) => onCompleted: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "completed", type: "completed",
runId, runId,
message, message,
}), }),
onCancelled: ({ runId, message }) => onCancelled: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "cancelled", type: "cancelled",
runId, runId,
message, message,
}), }),
onError: ({ runId, code, message, requestId }) => onError: ({ runId, code, message, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
runId, runId,
code, code,
@ -397,7 +391,7 @@ ghostwriterRouter.post(
}); });
} catch (error) { } catch (error) {
const appError = toAppError(error); const appError = toAppError(error);
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
code: appError.code, code: appError.code,
message: appError.message, message: appError.message,
@ -469,11 +463,10 @@ ghostwriterRouter.post(
await runWithRequestContext({ jobId }, async () => { await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) { if (parsed.data.stream) {
res.status(200); setupSse(res, {
res.setHeader("Content-Type", "text/event-stream"); cacheControl: "no-cache, no-transform",
res.setHeader("Cache-Control", "no-cache, no-transform"); flushHeaders: true,
res.setHeader("Connection", "keep-alive"); });
res.flushHeaders?.();
try { try {
await ghostwriterService.regenerateMessage({ await ghostwriterService.regenerateMessage({
@ -482,7 +475,7 @@ ghostwriterRouter.post(
assistantMessageId, assistantMessageId,
stream: { stream: {
onReady: ({ runId, messageId, requestId }) => onReady: ({ runId, messageId, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "ready", type: "ready",
runId, runId,
threadId, threadId,
@ -490,26 +483,26 @@ ghostwriterRouter.post(
requestId, requestId,
}), }),
onDelta: ({ runId, messageId, delta }) => onDelta: ({ runId, messageId, delta }) =>
writeSse(res, { writeSseData(res, {
type: "delta", type: "delta",
runId, runId,
messageId, messageId,
delta, delta,
}), }),
onCompleted: ({ runId, message }) => onCompleted: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "completed", type: "completed",
runId, runId,
message, message,
}), }),
onCancelled: ({ runId, message }) => onCancelled: ({ runId, message }) =>
writeSse(res, { writeSseData(res, {
type: "cancelled", type: "cancelled",
runId, runId,
message, message,
}), }),
onError: ({ runId, code, message, requestId }) => onError: ({ runId, code, message, requestId }) =>
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
runId, runId,
code, code,
@ -520,7 +513,7 @@ ghostwriterRouter.post(
}); });
} catch (error) { } catch (error) {
const appError = toAppError(error); const appError = toAppError(error);
writeSse(res, { writeSseData(res, {
type: "error", type: "error",
code: appError.code, code: appError.code,
message: appError.message, message: appError.message,

View File

@ -431,6 +431,95 @@ describe.sequential("Jobs API routes", () => {
).toBe("NOT_FOUND"); ).toBe("NOT_FOUND");
}); });
it("streams bulk action progress with done counters", async () => {
const { createJob, updateJob } = await import("../../repositories/jobs");
const discovered = await createJob({
source: "manual",
title: "Discovered Role",
employer: "Acme",
jobUrl: "https://example.com/job/bulk-stream-1",
jobDescription: "Test description",
});
const ready = await createJob({
source: "manual",
title: "Ready Role",
employer: "Beta",
jobUrl: "https://example.com/job/bulk-stream-2",
jobDescription: "Test description",
});
const applied = await createJob({
source: "manual",
title: "Applied Role",
employer: "Gamma",
jobUrl: "https://example.com/job/bulk-stream-3",
jobDescription: "Test description",
});
await updateJob(ready.id, { status: "ready" });
await updateJob(applied.id, { status: "applied" });
const res = await fetch(`${baseUrl}/api/jobs/bulk-actions/stream`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
action: "skip",
jobIds: [discovered.id, ready.id, applied.id],
}),
});
expect(res.status).toBe(200);
expect(res.headers.get("content-type")).toContain("text/event-stream");
const reader = res.body?.getReader();
expect(reader).toBeDefined();
if (!reader) return;
const decoder = new TextDecoder();
const events: any[] = [];
let buffer = "";
let hasCompleted = false;
try {
while (!hasCompleted) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let separatorIndex = buffer.indexOf("\n\n");
while (separatorIndex !== -1) {
const frame = buffer.slice(0, separatorIndex);
buffer = buffer.slice(separatorIndex + 2);
const dataLines = frame
.split("\n")
.filter((line) => line.startsWith("data:"))
.map((line) => line.slice(5).trim())
.filter(Boolean);
for (const line of dataLines) {
const event = JSON.parse(line);
events.push(event);
if (event.type === "completed") {
hasCompleted = true;
}
}
separatorIndex = buffer.indexOf("\n\n");
}
}
} finally {
await reader.cancel();
}
expect(events[0].type).toBe("started");
expect(events[0].completed).toBe(0);
expect(events[0].requested).toBe(3);
expect(events.filter((event) => event.type === "progress")).toHaveLength(3);
expect(events.at(-1)?.type).toBe("completed");
expect(events.at(-1)?.completed).toBe(3);
expect(events.at(-1)?.succeeded).toBe(2);
expect(events.at(-1)?.failed).toBe(1);
});
it("validates bulk action payloads", async () => { it("validates bulk action payloads", async () => {
const tooManyIds = Array.from( const tooManyIds = Array.from(
{ length: 101 }, { length: 101 },

View File

@ -1,12 +1,14 @@
import { fail, ok, okWithMeta } from "@infra/http"; import { fail, ok, okWithMeta } from "@infra/http";
import { logger } from "@infra/logger"; import { logger } from "@infra/logger";
import { sanitizeWebhookPayload } from "@infra/sanitize"; import { sanitizeWebhookPayload } from "@infra/sanitize";
import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse";
import { import {
APPLICATION_OUTCOMES, APPLICATION_OUTCOMES,
APPLICATION_STAGES, APPLICATION_STAGES,
type BulkJobAction, type BulkJobAction,
type BulkJobActionResponse, type BulkJobActionResponse,
type BulkJobActionResult, type BulkJobActionResult,
type BulkJobActionStreamEvent,
type Job, type Job,
type JobListItem, type JobListItem,
type JobStatus, type JobStatus,
@ -523,6 +525,178 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
} }
}); });
/**
* POST /api/jobs/bulk-actions/stream - Run a bulk action and stream per-job progress via SSE
*/
jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
const parsed = bulkActionRequestSchema.safeParse(req.body);
if (!parsed.success) {
return fail(
res,
badRequest("Invalid bulk action request", parsed.error.flatten()),
);
}
const dedupedJobIds = Array.from(new Set(parsed.data.jobIds));
const requestId = String(res.getHeader("x-request-id") || "unknown");
const action = parsed.data.action;
const requested = dedupedJobIds.length;
const results: BulkJobActionResult[] = [];
let succeeded = 0;
let failed = 0;
setupSse(res, {
cacheControl: "no-cache, no-transform",
disableBuffering: true,
flushHeaders: true,
});
const stopHeartbeat = startSseHeartbeat(res);
let clientDisconnected = false;
res.on("close", () => {
clientDisconnected = true;
stopHeartbeat();
});
const isResponseWritable = () =>
!clientDisconnected && !res.writableEnded && !res.destroyed;
const sendEvent = (event: BulkJobActionStreamEvent) => {
if (!isResponseWritable()) return false;
writeSseData(res, event);
return true;
};
try {
if (
!sendEvent({
type: "started",
action,
requested,
completed: 0,
succeeded: 0,
failed: 0,
requestId,
})
) {
logger.info("Client disconnected before bulk stream started", {
route: "POST /api/jobs/bulk-actions/stream",
action,
requested,
succeeded,
failed,
requestId,
});
return;
}
for (const jobId of dedupedJobIds) {
if (!isResponseWritable()) {
logger.info("Client disconnected; stopping bulk job stream", {
route: "POST /api/jobs/bulk-actions/stream",
action,
requested,
succeeded,
failed,
requestId,
});
break;
}
const result = await executeBulkActionForJob(action, jobId);
results.push(result);
if (result.ok) succeeded += 1;
else failed += 1;
if (
!sendEvent({
type: "progress",
action,
requested,
completed: results.length,
succeeded,
failed,
result,
requestId,
})
) {
logger.info("Client disconnected while writing bulk stream progress", {
route: "POST /api/jobs/bulk-actions/stream",
action,
requested,
succeeded,
failed,
requestId,
});
break;
}
}
sendEvent({
type: "completed",
action,
requested,
completed: results.length,
succeeded,
failed,
results,
requestId,
});
logger.info("Bulk job action stream completed", {
route: "POST /api/jobs/bulk-actions/stream",
action,
requested,
succeeded,
failed,
requestId,
});
} catch (error) {
const err =
error instanceof AppError
? error
: new AppError({
status: 500,
code: "INTERNAL_ERROR",
message: error instanceof Error ? error.message : "Unknown error",
});
logger.error("Bulk job action stream failed", {
route: "POST /api/jobs/bulk-actions/stream",
action,
requested,
succeeded,
failed,
status: err.status,
code: err.code,
requestId,
});
if (
!sendEvent({
type: "error",
code: err.code,
message: err.message,
requestId,
})
) {
logger.info("Skipping stream error event because client disconnected", {
route: "POST /api/jobs/bulk-actions/stream",
action,
requested,
succeeded,
failed,
requestId,
});
}
} finally {
stopHeartbeat();
if (!res.writableEnded && !res.destroyed) {
res.end();
}
}
});
/** /**
* GET /api/jobs/:id - Get a single job * GET /api/jobs/:id - Get a single job
*/ */

View File

@ -2,6 +2,7 @@ import { AppError, badRequest, conflict, requestTimeout } from "@infra/errors";
import { fail, ok, okWithMeta } from "@infra/http"; import { fail, ok, okWithMeta } from "@infra/http";
import { logger } from "@infra/logger"; import { logger } from "@infra/logger";
import { runWithRequestContext } from "@infra/request-context"; import { runWithRequestContext } from "@infra/request-context";
import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse";
import type { PipelineStatusResponse } from "@shared/types"; import type { PipelineStatusResponse } from "@shared/types";
import { type Request, type Response, Router } from "express"; import { type Request, type Response, Router } from "express";
import { z } from "zod"; import { z } from "zod";
@ -46,28 +47,22 @@ pipelineRouter.get("/status", async (_req: Request, res: Response) => {
* GET /api/pipeline/progress - Server-Sent Events endpoint for live progress * GET /api/pipeline/progress - Server-Sent Events endpoint for live progress
*/ */
pipelineRouter.get("/progress", (req: Request, res: Response) => { pipelineRouter.get("/progress", (req: Request, res: Response) => {
// Set headers for SSE setupSse(res, { disableBuffering: true });
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no"); // Disable Nginx buffering
// Send initial progress // Send initial progress
const sendProgress = (data: unknown) => { const sendProgress = (data: unknown) => {
res.write(`data: ${JSON.stringify(data)}\n\n`); writeSseData(res, data);
}; };
// Subscribe to progress updates // Subscribe to progress updates
const unsubscribe = subscribeToProgress(sendProgress); const unsubscribe = subscribeToProgress(sendProgress);
// Send heartbeat every 30 seconds to keep connection alive // Send heartbeat every 30 seconds to keep connection alive
const heartbeat = setInterval(() => { const stopHeartbeat = startSseHeartbeat(res);
res.write(": heartbeat\n\n");
}, 30000);
// Cleanup on close // Cleanup on close
req.on("close", () => { req.on("close", () => {
clearInterval(heartbeat); stopHeartbeat();
unsubscribe(); unsubscribe();
}); });
}); });

View File

@ -0,0 +1,45 @@
import type { Response } from "express";
interface SetupSseOptions {
cacheControl?: string;
disableBuffering?: boolean;
flushHeaders?: boolean;
}
const DEFAULT_HEARTBEAT_MS = 30_000;
export function setupSse(res: Response, options: SetupSseOptions = {}): void {
res.status(200);
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", options.cacheControl ?? "no-cache");
res.setHeader("Connection", "keep-alive");
if (options.disableBuffering) {
res.setHeader("X-Accel-Buffering", "no");
}
if (options.flushHeaders) {
res.flushHeaders?.();
}
}
export function writeSseData(res: Response, data: unknown): void {
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
export function writeSseComment(res: Response, comment: string): void {
res.write(`: ${comment}\n\n`);
}
export function startSseHeartbeat(
res: Response,
intervalMs = DEFAULT_HEARTBEAT_MS,
): () => void {
const heartbeat = setInterval(() => {
writeSseComment(res, "heartbeat");
}, intervalMs);
return () => {
clearInterval(heartbeat);
};
}

View File

@ -621,6 +621,43 @@ export interface BulkJobActionResponse {
results: BulkJobActionResult[]; results: BulkJobActionResult[];
} }
export type BulkJobActionStreamEvent =
| {
type: "started";
action: BulkJobAction;
requested: number;
completed: number;
succeeded: number;
failed: number;
requestId: string;
}
| {
type: "progress";
action: BulkJobAction;
requested: number;
completed: number;
succeeded: number;
failed: number;
result: BulkJobActionResult;
requestId: string;
}
| {
type: "completed";
action: BulkJobAction;
requested: number;
completed: number;
succeeded: number;
failed: number;
results: BulkJobActionResult[];
requestId: string;
}
| {
type: "error";
code: string;
message: string;
requestId: string;
};
export const JOB_CHAT_MESSAGE_ROLES = [ export const JOB_CHAT_MESSAGE_ROLES = [
"system", "system",
"user", "user",