From a409aa5ee0ef272c5460d4e476d78bc9f0c95366 Mon Sep 17 00:00:00 2001 From: Shaheer Sarfaraz <53654735+DaKheera47@users.noreply.github.com> Date: Sat, 7 Feb 2026 22:44:00 +0000 Subject: [PATCH] Live scraping updates in pipeline UI (#100) * initial commit * fix clear script * cancelling pipelines * formatting --- extractors/jobspy/scrape_jobs.py | 27 +++ extractors/ukvisajobs/src/main.ts | 39 ++++ orchestrator/src/client/api/client.ts | 14 ++ .../components/PipelineProgress.test.tsx | 100 ++++++++++ .../client/components/PipelineProgress.tsx | 84 +++++++-- .../client/pages/OrchestratorPage.test.tsx | 44 ++++- .../src/client/pages/OrchestratorPage.tsx | 30 ++- .../orchestrator/OrchestratorHeader.test.tsx | 8 + .../pages/orchestrator/OrchestratorHeader.tsx | 46 +++-- .../src/server/api/routes/pipeline.test.ts | 36 +++- .../src/server/api/routes/pipeline.ts | 40 +++- .../src/server/api/routes/test-utils.ts | 11 ++ orchestrator/src/server/db/clear.ts | 3 +- orchestrator/src/server/db/migrate.ts | 18 +- orchestrator/src/server/db/schema.ts | 2 +- .../src/server/pipeline/cancellation.test.ts | 84 +++++++++ .../src/server/pipeline/orchestrator.ts | 101 +++++++++- orchestrator/src/server/pipeline/progress.ts | 103 ++++++++-- .../pipeline/steps/discover-jobs.test.ts | 163 ++++++++++++++++ .../server/pipeline/steps/discover-jobs.ts | 175 +++++++++++++++-- .../src/server/pipeline/steps/process-jobs.ts | 3 + .../src/server/pipeline/steps/score-jobs.ts | 3 + .../src/server/repositories/pipeline.ts | 2 +- .../src/server/services/jobspy.test.ts | 40 ++++ orchestrator/src/server/services/jobspy.ts | 82 +++++++- .../src/server/services/ukvisajobs.test.ts | 72 +++++++ .../src/server/services/ukvisajobs.ts | 177 +++++++++++++++++- shared/src/types.ts | 2 +- 28 files changed, 1425 insertions(+), 84 deletions(-) create mode 100644 orchestrator/src/client/components/PipelineProgress.test.tsx create mode 100644 orchestrator/src/server/pipeline/cancellation.test.ts create mode 100644 orchestrator/src/server/services/jobspy.test.ts create mode 100644 orchestrator/src/server/services/ukvisajobs.test.ts diff --git a/extractors/jobspy/scrape_jobs.py b/extractors/jobspy/scrape_jobs.py index 7454f04..81bfe47 100644 --- a/extractors/jobspy/scrape_jobs.py +++ b/extractors/jobspy/scrape_jobs.py @@ -1,9 +1,12 @@ import csv +import json import os from pathlib import Path from jobspy import scrape_jobs +PROGRESS_PREFIX = "JOBOPS_PROGRESS " + def _env_str(name: str, default: str) -> str: value = os.getenv(name) @@ -27,6 +30,11 @@ def _env_bool(name: str, default: bool) -> bool: return value.strip().lower() in ("1", "true", "yes", "y", "on") +def _emit_progress(event: str, payload: dict) -> None: + serialized = json.dumps({"event": event, **payload}, ensure_ascii=True) + print(f"{PROGRESS_PREFIX}{serialized}", flush=True) + + def _parse_sites(raw: str) -> list[str]: return [s.strip() for s in raw.split(",") if s.strip()] @@ -40,6 +48,8 @@ def main() -> int: country_indeed = _env_str("JOBSPY_COUNTRY_INDEED", "UK") linkedin_fetch_description = _env_bool("JOBSPY_LINKEDIN_FETCH_DESCRIPTION", True) is_remote = _env_bool("JOBSPY_IS_REMOTE", False) + term_index = _env_int("JOBSPY_TERM_INDEX", 1) + term_total = _env_int("JOBSPY_TERM_TOTAL", 1) output_csv = Path(_env_str("JOBSPY_OUTPUT_CSV", "jobs.csv")) output_json = Path( @@ -50,6 +60,14 @@ def main() -> int: output_json.parent.mkdir(parents=True, exist_ok=True) print(f"jobspy: Search term: {search_term}") + _emit_progress( + "term_start", + { + "termIndex": term_index, + "termTotal": term_total, + "searchTerm": search_term, + }, + ) jobs = scrape_jobs( site_name=sites, search_term=search_term, @@ -62,6 +80,15 @@ def main() -> int: ) print(f"Found {len(jobs)} jobs") + _emit_progress( + "term_complete", + { + "termIndex": term_index, + "termTotal": term_total, + "searchTerm": search_term, + "jobsFoundTerm": int(len(jobs)), + }, + ) jobs.to_csv( output_csv, diff --git a/extractors/ukvisajobs/src/main.ts b/extractors/ukvisajobs/src/main.ts index 83e61f8..e07cc52 100644 --- a/extractors/ukvisajobs/src/main.ts +++ b/extractors/ukvisajobs/src/main.ts @@ -32,6 +32,16 @@ const AUTH_CACHE_PATH = join(__dirname, "../storage/ukvisajobs-auth.json"); const JOBS_PER_PAGE = 15; const DEFAULT_MAX_JOBS = 50; const MAX_ALLOWED_JOBS = 200; +const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; + +function emitProgress( + event: string, + payload: Record = {}, +): void { + if (process.env.JOBOPS_EMIT_PROGRESS !== "1") return; + const serialized = JSON.stringify({ event, ...payload }); + process.stdout.write(`${JOBOPS_PROGRESS_PREFIX}${serialized}\n`); +} interface UkVisaJobsApiJob { id: string; @@ -444,6 +454,11 @@ async function main(): Promise { if (searchKeyword) { console.log(` Search keyword: ${searchKeyword}`); } + emitProgress("init", { + maxPages, + maxJobs, + searchKeyword: searchKeyword || "", + }); const allJobs: ExtractedJob[] = []; const seenIds = new Set(); @@ -481,6 +496,11 @@ async function main(): Promise { } if (response.status !== 1) { + emitProgress("error", { + pageNo, + status: response.status, + message: `API returned status ${response.status}`, + }); console.warn( ` ⚠️ API returned status ${response.status} on page ${pageNo}`, ); @@ -493,6 +513,11 @@ async function main(): Promise { } if (!response.jobs || response.jobs.length === 0) { + emitProgress("empty_page", { + pageNo, + maxPages, + totalCollected: allJobs.length, + }); console.log(` No more jobs on page ${pageNo}`); break; } @@ -508,6 +533,14 @@ async function main(): Promise { allJobs.push(mapped); } + emitProgress("page_fetched", { + pageNo, + maxPages, + jobsOnPage: response.jobs.length, + totalCollected: allJobs.length, + totalAvailable, + }); + // If we got fewer jobs than a full page, we're at the end if (response.jobs.length < JOBS_PER_PAGE) { break; @@ -519,6 +552,11 @@ async function main(): Promise { await new Promise((resolve) => setTimeout(resolve, 500)); } + emitProgress("done", { + maxPages, + totalCollected: allJobs.length, + totalAvailable, + }); console.log(`✅ Scraped ${allJobs.length} jobs`); // Write output to storage directory (similar to Crawlee dataset structure) @@ -542,6 +580,7 @@ async function main(): Promise { console.log(` Jobs file: ${outputFile}`); } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; + emitProgress("error", { message }); console.error(`❌ Error: ${message}`); process.exit(1); } diff --git a/orchestrator/src/client/api/client.ts b/orchestrator/src/client/api/client.ts index 62b9fc4..df4d748 100644 --- a/orchestrator/src/client/api/client.ts +++ b/orchestrator/src/client/api/client.ts @@ -319,6 +319,20 @@ export async function runPipeline(config?: { }); } +export async function cancelPipeline(): Promise<{ + message: string; + pipelineRunId: string | null; + alreadyRequested: boolean; +}> { + return fetchApi<{ + message: string; + pipelineRunId: string | null; + alreadyRequested: boolean; + }>("/pipeline/cancel", { + method: "POST", + }); +} + export async function getDemoInfo(): Promise { return fetchApi("/demo/info"); } diff --git a/orchestrator/src/client/components/PipelineProgress.test.tsx b/orchestrator/src/client/components/PipelineProgress.test.tsx new file mode 100644 index 0000000..48988cf --- /dev/null +++ b/orchestrator/src/client/components/PipelineProgress.test.tsx @@ -0,0 +1,100 @@ +import { act, render, screen } from "@testing-library/react"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { PipelineProgress } from "./PipelineProgress"; + +class MockEventSource { + static instances: MockEventSource[] = []; + onopen: ((event: Event) => void) | null = null; + onmessage: ((event: MessageEvent) => void) | null = null; + onerror: ((event: Event) => void) | null = null; + + constructor(public url: string) { + MockEventSource.instances.push(this); + } + + close = vi.fn(); + + emitOpen() { + this.onopen?.(new Event("open")); + } + + emitMessage(payload: unknown) { + this.onmessage?.({ + data: JSON.stringify(payload), + } as MessageEvent); + } +} + +const baseProgress = { + step: "crawling" as const, + message: "Fetching jobs from sources...", + detail: "Running crawler", + crawlingSource: "jobspy" as const, + crawlingSourcesCompleted: 1, + crawlingSourcesTotal: 3, + crawlingTermsProcessed: 2, + crawlingTermsTotal: 4, + crawlingListPagesProcessed: 0, + crawlingListPagesTotal: 0, + crawlingJobCardsFound: 0, + crawlingJobPagesEnqueued: 0, + crawlingJobPagesSkipped: 0, + crawlingJobPagesProcessed: 0, + crawlingPhase: "list" as const, + crawlingCurrentUrl: "engineer", + jobsDiscovered: 0, + jobsScored: 0, + jobsProcessed: 0, + totalToProcess: 0, +}; + +describe("PipelineProgress", () => { + beforeEach(() => { + MockEventSource.instances = []; + (globalThis as any).EventSource = MockEventSource; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("renders renamed crawling labels and source/terms context", () => { + render(); + const sse = MockEventSource.instances[0]; + + act(() => { + sse.emitOpen(); + sse.emitMessage({ + ...baseProgress, + crawlingListPagesProcessed: 3, + crawlingListPagesTotal: 10, + crawlingJobPagesProcessed: 8, + crawlingJobPagesEnqueued: 30, + crawlingJobPagesSkipped: 4, + }); + }); + + expect(screen.getByText("List pages")).toBeInTheDocument(); + expect(screen.getByText("Job pages")).toBeInTheDocument(); + expect(screen.getByText("Enqueued")).toBeInTheDocument(); + expect(screen.getByText("Skipped")).toBeInTheDocument(); + expect(screen.getByText("3/10")).toBeInTheDocument(); + expect(screen.getByText("8/30")).toBeInTheDocument(); + expect( + screen.getByText(/Source:\s+JobSpy\s+\(1\/3\)\s+Terms:\s+2\/4/), + ).toBeInTheDocument(); + }); + + it("uses fallback dashes for unknown page denominators", () => { + render(); + const sse = MockEventSource.instances[0]; + + act(() => { + sse.emitOpen(); + sse.emitMessage(baseProgress); + }); + + expect(screen.queryByText("0/0")).not.toBeInTheDocument(); + expect(screen.getAllByText("—").length).toBeGreaterThanOrEqual(2); + }); +}); diff --git a/orchestrator/src/client/components/PipelineProgress.tsx b/orchestrator/src/client/components/PipelineProgress.tsx index 76bde24..38defd0 100644 --- a/orchestrator/src/client/components/PipelineProgress.tsx +++ b/orchestrator/src/client/components/PipelineProgress.tsx @@ -20,9 +20,15 @@ interface PipelineProgress { | "scoring" | "processing" | "completed" + | "cancelled" | "failed"; message: string; detail?: string; + crawlingSource: "gradcracker" | "jobspy" | "ukvisajobs" | null; + crawlingSourcesCompleted: number; + crawlingSourcesTotal: number; + crawlingTermsProcessed: number; + crawlingTermsTotal: number; crawlingListPagesProcessed: number; crawlingListPagesTotal: number; crawlingJobCardsFound: number; @@ -56,6 +62,7 @@ const stepLabels: Record = { scoring: "Scoring", processing: "Processing", completed: "Complete", + cancelled: "Cancelled", failed: "Failed", }; @@ -66,9 +73,19 @@ const stepBadgeClasses: Record = { scoring: "bg-amber-500/10 text-amber-400 border-amber-500/20", processing: "bg-primary/10 text-primary border-primary/20", completed: "bg-emerald-500/10 text-emerald-400 border-emerald-500/20", + cancelled: "bg-muted text-muted-foreground border-border", failed: "bg-destructive/10 text-destructive border-destructive/20", }; +const sourceLabel: Record< + Exclude, + string +> = { + gradcracker: "Gradcracker", + jobspy: "JobSpy", + ukvisajobs: "UKVisaJobs", +}; + const clamp = (value: number, min: number, max: number) => Math.max(min, Math.min(max, value)); @@ -83,6 +100,15 @@ export const PipelineProgress: React.FC = ({ switch (progress.step) { case "crawling": { + if (progress.crawlingTermsTotal > 0) { + return clamp( + 5 + + (progress.crawlingTermsProcessed / progress.crawlingTermsTotal) * + 10, + 5, + 15, + ); + } if (progress.crawlingListPagesTotal > 0) { return clamp( (progress.crawlingListPagesProcessed / @@ -119,6 +145,7 @@ export const PipelineProgress: React.FC = ({ return 55; } case "completed": + case "cancelled": case "failed": return 100; default: @@ -162,11 +189,31 @@ export const PipelineProgress: React.FC = ({ } const step = progress?.step ?? "idle"; - const isActive = step !== "idle" && step !== "completed" && step !== "failed"; + const isActive = + step !== "idle" && + step !== "completed" && + step !== "cancelled" && + step !== "failed"; + const listPagesText = progress + ? progress.crawlingListPagesTotal > 0 + ? `${progress.crawlingListPagesProcessed}/${progress.crawlingListPagesTotal}` + : progress.crawlingListPagesProcessed > 0 + ? `${progress.crawlingListPagesProcessed}` + : "—" + : "—"; + const jobPagesText = progress + ? progress.crawlingJobPagesEnqueued > 0 + ? `${progress.crawlingJobPagesProcessed}/${progress.crawlingJobPagesEnqueued}` + : progress.crawlingJobPagesProcessed > 0 + ? `${progress.crawlingJobPagesProcessed}` + : "—" + : "—"; const showStats = !!progress && - ["crawling", "scoring", "processing", "completed"].includes(step); + ["crawling", "scoring", "processing", "completed", "cancelled"].includes( + step, + ); return ( @@ -201,6 +248,23 @@ export const PipelineProgress: React.FC = ({ {progress.detail && (

{progress.detail}

)} + {step === "crawling" && ( +

+ Source:{" "} + {progress.crawlingSource + ? sourceLabel[progress.crawlingSource] + : "starting"} + {" "}({progress.crawlingSourcesCompleted}/ + {Math.max(progress.crawlingSourcesTotal, 0)}) + {progress.crawlingTermsTotal > 0 && ( + <> + {" "} + Terms: {progress.crawlingTermsProcessed}/ + {progress.crawlingTermsTotal} + + )} +

+ )} {showStats && ( @@ -211,21 +275,15 @@ export const PipelineProgress: React.FC = ({ <>
- Sources -
-
- {progress.crawlingListPagesProcessed} - {progress.crawlingListPagesTotal > 0 - ? `/${progress.crawlingListPagesTotal}` - : ""} + List pages
+
{listPagesText}
-
Pages
-
- {progress.crawlingJobPagesProcessed}/ - {Math.max(progress.crawlingJobPagesEnqueued, 0)} +
+ Job pages
+
{jobPagesText}
diff --git a/orchestrator/src/client/pages/OrchestratorPage.test.tsx b/orchestrator/src/client/pages/OrchestratorPage.test.tsx index 7f36976..fb8123e 100644 --- a/orchestrator/src/client/pages/OrchestratorPage.test.tsx +++ b/orchestrator/src/client/pages/OrchestratorPage.test.tsx @@ -9,6 +9,11 @@ import type { FilterTab } from "./orchestrator/constants"; vi.mock("../api", () => ({ updateSettings: vi.fn().mockResolvedValue({}), runPipeline: vi.fn().mockResolvedValue({ message: "ok" }), + cancelPipeline: vi.fn().mockResolvedValue({ + message: "Pipeline cancellation requested", + pipelineRunId: "run-1", + alreadyRequested: false, + }), getPipelineStatus: vi.fn().mockResolvedValue({ isRunning: false, lastRun: null, @@ -16,6 +21,8 @@ vi.mock("../api", () => ({ }), })); +let mockIsPipelineRunning = false; + const jobFixture: Job = { id: "job-1", source: "linkedin", @@ -103,7 +110,7 @@ vi.mock("./orchestrator/useOrchestratorData", () => ({ expired: 0, }, isLoading: false, - isPipelineRunning: false, + isPipelineRunning: mockIsPipelineRunning, setIsPipelineRunning: vi.fn(), loadJobs: vi.fn(), }), @@ -129,7 +136,17 @@ vi.mock("../hooks/useSettings", () => ({ })); vi.mock("./orchestrator/OrchestratorHeader", () => ({ - OrchestratorHeader: () =>
, + OrchestratorHeader: ({ + onCancelPipeline, + }: { + onCancelPipeline: () => void; + }) => ( +
+ +
+ ), })); vi.mock("./orchestrator/OrchestratorSummary", () => ({ @@ -240,6 +257,7 @@ const LocationWatcher = () => { describe("OrchestratorPage", () => { beforeEach(() => { vi.clearAllMocks(); + mockIsPipelineRunning = false; }); it("syncs tab selection to the URL", () => { @@ -261,6 +279,28 @@ describe("OrchestratorPage", () => { expect(screen.getByTestId("location").textContent).toContain("/discovered"); }); + it("requests pipeline cancellation when running", async () => { + mockIsPipelineRunning = true; + window.matchMedia = createMatchMedia( + true, + ) as unknown as typeof window.matchMedia; + + render( + + + } /> + } /> + + , + ); + + fireEvent.click(screen.getByText("Cancel Pipeline")); + + await waitFor(() => { + expect(api.cancelPipeline).toHaveBeenCalledTimes(1); + }); + }); + it("syncs job selection to the URL", async () => { window.matchMedia = createMatchMedia( true, diff --git a/orchestrator/src/client/pages/OrchestratorPage.tsx b/orchestrator/src/client/pages/OrchestratorPage.tsx index d5641f5..b61a42c 100644 --- a/orchestrator/src/client/pages/OrchestratorPage.tsx +++ b/orchestrator/src/client/pages/OrchestratorPage.tsx @@ -137,6 +137,7 @@ export const OrchestratorPage: React.FC = () => { const [isRunModeModalOpen, setIsRunModeModalOpen] = useState(false); const [runMode, setRunMode] = useState("automatic"); const [isDetailDrawerOpen, setIsDetailDrawerOpen] = useState(false); + const [isCancelling, setIsCancelling] = useState(false); const [isDesktop, setIsDesktop] = useState(() => typeof window !== "undefined" ? window.matchMedia("(min-width: 1024px)").matches @@ -232,6 +233,7 @@ export const OrchestratorPage: React.FC = () => { }) => { try { setIsPipelineRunning(true); + setIsCancelling(false); await api.runPipeline(config); toast.message("Pipeline started", { description: `Sources: ${config.sources.join(", ")}. This may take a few minutes.`, @@ -243,8 +245,16 @@ export const OrchestratorPage: React.FC = () => { if (!status.isRunning) { clearInterval(pollInterval); setIsPipelineRunning(false); + setIsCancelling(false); await loadJobs(); - toast.success("Pipeline completed"); + const outcome = status.lastRun?.status; + if (outcome === "cancelled") { + toast.message("Pipeline cancelled"); + } else if (outcome === "failed") { + toast.error(status.lastRun?.errorMessage || "Pipeline failed"); + } else { + toast.success("Pipeline completed"); + } } } catch { // Ignore errors @@ -252,6 +262,7 @@ export const OrchestratorPage: React.FC = () => { }, 5000); } catch (error) { setIsPipelineRunning(false); + setIsCancelling(false); const message = error instanceof Error ? error.message : "Failed to start pipeline"; toast.error(message); @@ -260,6 +271,21 @@ export const OrchestratorPage: React.FC = () => { [loadJobs, setIsPipelineRunning], ); + const handleCancelPipeline = useCallback(async () => { + if (isCancelling || !isPipelineRunning) return; + + try { + setIsCancelling(true); + const result = await api.cancelPipeline(); + toast.message(result.message); + } catch (error) { + setIsCancelling(false); + const message = + error instanceof Error ? error.message : "Failed to cancel pipeline"; + toast.error(message); + } + }, [isCancelling, isPipelineRunning]); + const handleSaveAndRunAutomatic = useCallback( async (values: AutomaticRunValues) => { const limits = deriveExtractorLimits({ @@ -352,8 +378,10 @@ export const OrchestratorPage: React.FC = () => { navOpen={navOpen} onNavOpenChange={setNavOpen} isPipelineRunning={isPipelineRunning} + isCancelling={isCancelling} pipelineSources={pipelineSources} onOpenAutomaticRun={() => openRunMode("automatic")} + onCancelPipeline={handleCancelPipeline} />
diff --git a/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.test.tsx b/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.test.tsx index 554805f..5d4c6b1 100644 --- a/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.test.tsx +++ b/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.test.tsx @@ -28,8 +28,10 @@ const renderHeader = ( navOpen: false, onNavOpenChange: vi.fn(), isPipelineRunning: false, + isCancelling: false, pipelineSources: ["gradcracker"], onOpenAutomaticRun: vi.fn(), + onCancelPipeline: vi.fn(), ...overrides, }; @@ -56,4 +58,10 @@ describe("OrchestratorHeader", () => { screen.queryByRole("button", { name: /manual import/i }), ).not.toBeInTheDocument(); }); + + it("renders cancel button while running and triggers cancel", () => { + const { props } = renderHeader({ isPipelineRunning: true }); + fireEvent.click(screen.getByRole("button", { name: /cancel run/i })); + expect(props.onCancelPipeline).toHaveBeenCalled(); + }); }); diff --git a/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.tsx b/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.tsx index c5ace5c..e11e852 100644 --- a/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.tsx +++ b/orchestrator/src/client/pages/orchestrator/OrchestratorHeader.tsx @@ -1,6 +1,6 @@ import { isNavActive, NAV_LINKS } from "@client/components/navigation"; import type { JobSource } from "@shared/types.js"; -import { Loader2, Menu, Play, Sparkles } from "lucide-react"; +import { Loader2, Menu, Play, Sparkles, Square } from "lucide-react"; import type React from "react"; import { useLocation, useNavigate } from "react-router-dom"; import { Button } from "@/components/ui/button"; @@ -17,16 +17,20 @@ interface OrchestratorHeaderProps { navOpen: boolean; onNavOpenChange: (open: boolean) => void; isPipelineRunning: boolean; + isCancelling: boolean; pipelineSources: JobSource[]; onOpenAutomaticRun: () => void; + onCancelPipeline: () => void; } export const OrchestratorHeader: React.FC = ({ navOpen, onNavOpenChange, isPipelineRunning, + isCancelling, pipelineSources, onOpenAutomaticRun, + onCancelPipeline, }) => { const location = useLocation(); const navigate = useNavigate(); @@ -94,23 +98,31 @@ export const OrchestratorHeader: React.FC = ({
- + ) : ( + + Run pipeline + + )}
diff --git a/orchestrator/src/server/api/routes/pipeline.test.ts b/orchestrator/src/server/api/routes/pipeline.test.ts index 3f89822..c485954 100644 --- a/orchestrator/src/server/api/routes/pipeline.test.ts +++ b/orchestrator/src/server/api/routes/pipeline.test.ts @@ -1,5 +1,5 @@ import type { Server } from "node:http"; -import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { startServer, stopServer } from "./test-utils"; describe.sequential("Pipeline API routes", () => { @@ -46,6 +46,38 @@ describe.sequential("Pipeline API routes", () => { }); }); + it("returns conflict when cancelling with no active pipeline", async () => { + const res = await fetch(`${baseUrl}/api/pipeline/cancel`, { + method: "POST", + }); + const body = await res.json(); + + expect(res.status).toBe(409); + expect(body.ok).toBe(false); + expect(body.error.code).toBe("CONFLICT"); + expect(typeof body.meta.requestId).toBe("string"); + }); + + it("accepts cancellation when pipeline is running", async () => { + const { requestPipelineCancel } = await import("../../pipeline/index"); + vi.mocked(requestPipelineCancel).mockReturnValue({ + accepted: true, + pipelineRunId: "run-1", + alreadyRequested: false, + }); + + const res = await fetch(`${baseUrl}/api/pipeline/cancel`, { + method: "POST", + }); + const body = await res.json(); + + expect(res.status).toBe(200); + expect(body.ok).toBe(true); + expect(body.data.pipelineRunId).toBe("run-1"); + expect(body.data.alreadyRequested).toBe(false); + expect(typeof body.meta.requestId).toBe("string"); + }); + it("streams pipeline progress over SSE", async () => { const controller = new AbortController(); const res = await fetch(`${baseUrl}/api/pipeline/progress`, { @@ -61,6 +93,8 @@ describe.sequential("Pipeline API routes", () => { const { value } = await reader.read(); const text = new TextDecoder().decode(value); expect(text).toContain("data:"); + expect(text).toContain('"crawlingSource"'); + expect(text).toContain('"crawlingSourcesTotal"'); } finally { await reader.cancel(); controller.abort(); diff --git a/orchestrator/src/server/api/routes/pipeline.ts b/orchestrator/src/server/api/routes/pipeline.ts index a22aabb..7b873a3 100644 --- a/orchestrator/src/server/api/routes/pipeline.ts +++ b/orchestrator/src/server/api/routes/pipeline.ts @@ -1,4 +1,4 @@ -import { AppError, badRequest, requestTimeout } from "@infra/errors"; +import { AppError, badRequest, conflict, requestTimeout } from "@infra/errors"; import { fail, ok, okWithMeta } from "@infra/http"; import { logger } from "@infra/logger"; import { runWithRequestContext } from "@infra/request-context"; @@ -8,6 +8,7 @@ import { z } from "zod"; import { isDemoMode } from "../../config/demo"; import { getPipelineStatus, + requestPipelineCancel, runPipeline, subscribeToProgress, } from "../../pipeline/index"; @@ -135,3 +136,40 @@ pipelineRouter.post("/run", async (req: Request, res: Response) => { ); } }); + +/** + * POST /api/pipeline/cancel - Request cancellation of active pipeline run + */ +pipelineRouter.post("/cancel", async (_req: Request, res: Response) => { + try { + const cancelResult = requestPipelineCancel(); + if (!cancelResult.accepted) { + return fail(res, conflict("No running pipeline to cancel")); + } + + logger.info("Pipeline cancellation requested", { + route: "/api/pipeline/cancel", + action: "cancel", + status: "accepted", + pipelineRunId: cancelResult.pipelineRunId, + alreadyRequested: cancelResult.alreadyRequested, + }); + + ok(res, { + message: cancelResult.alreadyRequested + ? "Pipeline cancellation already requested" + : "Pipeline cancellation requested", + pipelineRunId: cancelResult.pipelineRunId, + alreadyRequested: cancelResult.alreadyRequested, + }); + } catch (error) { + fail( + res, + new AppError({ + status: 500, + code: "INTERNAL_ERROR", + message: error instanceof Error ? error.message : "Unknown error", + }), + ); + } +}); diff --git a/orchestrator/src/server/api/routes/test-utils.ts b/orchestrator/src/server/api/routes/test-utils.ts index cb59d12..d86c183 100644 --- a/orchestrator/src/server/api/routes/test-utils.ts +++ b/orchestrator/src/server/api/routes/test-utils.ts @@ -8,6 +8,11 @@ vi.mock("../../pipeline/index", () => { const progress = { step: "idle", message: "Ready", + crawlingSource: null, + crawlingSourcesCompleted: 0, + crawlingSourcesTotal: 0, + crawlingTermsProcessed: 0, + crawlingTermsTotal: 0, crawlingListPagesProcessed: 0, crawlingListPagesTotal: 0, crawlingJobCardsFound: 0, @@ -30,6 +35,12 @@ vi.mock("../../pipeline/index", () => { summarizeJob: vi.fn().mockResolvedValue({ success: true }), generateFinalPdf: vi.fn().mockResolvedValue({ success: true }), getPipelineStatus: vi.fn(() => ({ isRunning: false })), + requestPipelineCancel: vi.fn(() => ({ + accepted: false, + pipelineRunId: null, + alreadyRequested: false, + })), + isPipelineCancelRequested: vi.fn(() => false), subscribeToProgress: vi.fn((listener: (data: unknown) => void) => { listener(progress); return () => {}; diff --git a/orchestrator/src/server/db/clear.ts b/orchestrator/src/server/db/clear.ts index e752844..a4f3443 100644 --- a/orchestrator/src/server/db/clear.ts +++ b/orchestrator/src/server/db/clear.ts @@ -2,6 +2,7 @@ * Database utility scripts. */ +import { existsSync, unlinkSync } from "node:fs"; import { join } from "node:path"; import Database from "better-sqlite3"; import { getDataDir } from "../config/dataDir"; @@ -38,8 +39,6 @@ export function clearDatabase(): { jobsDeleted: number; runsDeleted: number } { * Delete database file completely (will recreate on next run). */ export function dropDatabase(): void { - const { unlinkSync, existsSync } = require("node:fs"); - if (existsSync(DB_PATH)) { unlinkSync(DB_PATH); console.log("🗑️ Database file deleted"); diff --git a/orchestrator/src/server/db/migrate.ts b/orchestrator/src/server/db/migrate.ts index 7ca9c84..2b02378 100644 --- a/orchestrator/src/server/db/migrate.ts +++ b/orchestrator/src/server/db/migrate.ts @@ -80,7 +80,7 @@ const migrations = [ id TEXT PRIMARY KEY, started_at TEXT NOT NULL DEFAULT (datetime('now')), completed_at TEXT, - status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'completed', 'failed')), + status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'completed', 'failed', 'cancelled')), jobs_discovered INTEGER NOT NULL DEFAULT 0, jobs_processed INTEGER NOT NULL DEFAULT 0, error_message TEXT @@ -179,6 +179,22 @@ const migrations = [ `ALTER TABLE stage_events ADD COLUMN title TEXT NOT NULL DEFAULT ''`, `ALTER TABLE stage_events ADD COLUMN group_id TEXT`, + // Ensure pipeline_runs status supports "cancelled" for existing databases. + `CREATE TABLE IF NOT EXISTS pipeline_runs_new ( + id TEXT PRIMARY KEY, + started_at TEXT NOT NULL DEFAULT (datetime('now')), + completed_at TEXT, + status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'completed', 'failed', 'cancelled')), + jobs_discovered INTEGER NOT NULL DEFAULT 0, + jobs_processed INTEGER NOT NULL DEFAULT 0, + error_message TEXT + )`, + `INSERT OR REPLACE INTO pipeline_runs_new (id, started_at, completed_at, status, jobs_discovered, jobs_processed, error_message) + SELECT id, started_at, completed_at, status, jobs_discovered, jobs_processed, error_message + FROM pipeline_runs`, + `DROP TABLE IF EXISTS pipeline_runs`, + `ALTER TABLE pipeline_runs_new RENAME TO pipeline_runs`, + `CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)`, `CREATE INDEX IF NOT EXISTS idx_jobs_discovered_at ON jobs(discovered_at)`, `CREATE INDEX IF NOT EXISTS idx_pipeline_runs_started_at ON pipeline_runs(started_at)`, diff --git a/orchestrator/src/server/db/schema.ts b/orchestrator/src/server/db/schema.ts index c1b2bf7..6a41bfa 100644 --- a/orchestrator/src/server/db/schema.ts +++ b/orchestrator/src/server/db/schema.ts @@ -141,7 +141,7 @@ export const pipelineRuns = sqliteTable("pipeline_runs", { startedAt: text("started_at").notNull().default(sql`(datetime('now'))`), completedAt: text("completed_at"), status: text("status", { - enum: ["running", "completed", "failed"], + enum: ["running", "completed", "failed", "cancelled"], }) .notNull() .default("running"), diff --git a/orchestrator/src/server/pipeline/cancellation.test.ts b/orchestrator/src/server/pipeline/cancellation.test.ts new file mode 100644 index 0000000..a3066af --- /dev/null +++ b/orchestrator/src/server/pipeline/cancellation.test.ts @@ -0,0 +1,84 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const stepState = vi.hoisted(() => { + let resolveDiscover: + | ((value: { discoveredJobs: []; sourceErrors: [] }) => void) + | null = null; + return { + setResolver: ( + fn: (value: { discoveredJobs: []; sourceErrors: [] }) => void, + ) => { + resolveDiscover = fn; + }, + resolveDiscover: () => + resolveDiscover?.({ discoveredJobs: [], sourceErrors: [] }), + }; +}); + +vi.mock("../repositories/pipeline", () => ({ + createPipelineRun: vi.fn(async () => ({ + id: "run-cancel-1", + startedAt: new Date().toISOString(), + completedAt: null, + status: "running", + jobsDiscovered: 0, + jobsProcessed: 0, + errorMessage: null, + })), + updatePipelineRun: vi.fn(async () => undefined), +})); + +vi.mock("./steps", () => ({ + loadProfileStep: vi.fn(async () => ({})), + discoverJobsStep: vi.fn( + () => + new Promise<{ discoveredJobs: []; sourceErrors: [] }>((resolve) => { + stepState.setResolver(resolve); + }), + ), + importJobsStep: vi.fn(async () => ({ created: 0, skipped: 0 })), + scoreJobsStep: vi.fn(async () => ({ unprocessedJobs: [], scoredJobs: [] })), + selectJobsStep: vi.fn(() => []), + processJobsStep: vi.fn(async () => ({ processedCount: 0 })), + notifyPipelineWebhookStep: vi.fn(async () => undefined), +})); + +describe.sequential("pipeline cancellation", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("marks run as cancelled at checkpoint and resets running state", async () => { + const pipeline = await import("./orchestrator"); + const pipelineRepo = await import("../repositories/pipeline"); + const steps = await import("./steps"); + + const runPromise = pipeline.runPipeline({ sources: [] }); + + await Promise.resolve(); + + const cancelRequest = pipeline.requestPipelineCancel(); + expect(cancelRequest.accepted).toBe(true); + expect([null, "run-cancel-1"]).toContain(cancelRequest.pipelineRunId); + expect(pipeline.isPipelineCancelRequested()).toBe(true); + + const duplicateRequest = pipeline.requestPipelineCancel(); + expect(duplicateRequest.accepted).toBe(true); + expect(duplicateRequest.alreadyRequested).toBe(true); + + stepState.resolveDiscover(); + const result = await runPromise; + + expect(result.success).toBe(false); + expect(result.error).toContain("Cancelled"); + expect(vi.mocked(steps.importJobsStep)).not.toHaveBeenCalled(); + expect(vi.mocked(pipelineRepo.updatePipelineRun)).toHaveBeenCalledWith( + "run-cancel-1", + expect.objectContaining({ + status: "cancelled", + }), + ); + expect(pipeline.getPipelineStatus().isRunning).toBe(false); + expect(pipeline.isPipelineCancelRequested()).toBe(false); + }); +}); diff --git a/orchestrator/src/server/pipeline/orchestrator.ts b/orchestrator/src/server/pipeline/orchestrator.ts index 8fa9401..4dc4132 100644 --- a/orchestrator/src/server/pipeline/orchestrator.ts +++ b/orchestrator/src/server/pipeline/orchestrator.ts @@ -47,6 +47,21 @@ const DEFAULT_CONFIG: PipelineConfig = { // Track if pipeline is currently running let isPipelineRunning = false; +let activePipelineRunId: string | null = null; +let cancelRequestedAt: string | null = null; + +class PipelineCancelledError extends Error { + constructor(message = "Pipeline cancellation requested") { + super(message); + this.name = "PipelineCancelledError"; + } +} + +function ensureNotCancelled(): void { + if (cancelRequestedAt) { + throw new PipelineCancelledError(); + } +} /** * Run the full job discovery and processing pipeline. @@ -69,12 +84,18 @@ export async function runPipeline( } isPipelineRunning = true; + activePipelineRunId = "pending"; + cancelRequestedAt = null; resetProgress(); const mergedConfig = { ...DEFAULT_CONFIG, ...config }; const pipelineRun = await pipelineRepo.createPipelineRun(); + activePipelineRunId = pipelineRun.id; + return runWithRequestContext({ pipelineRunId: pipelineRun.id }, async () => { const pipelineLogger = logger.child({ pipelineRunId: pipelineRun.id }); + let jobsDiscovered = 0; + let jobsProcessed = 0; pipelineLogger.info("Starting pipeline run", { topN: mergedConfig.topN, minSuitabilityScore: mergedConfig.minSuitabilityScore, @@ -82,18 +103,30 @@ export async function runPipeline( }); try { + ensureNotCancelled(); const profile = await loadProfileStep(); - const { discoveredJobs } = await discoverJobsStep({ mergedConfig }); + ensureNotCancelled(); + const { discoveredJobs } = await discoverJobsStep({ + mergedConfig, + shouldCancel: () => cancelRequestedAt !== null, + }); + ensureNotCancelled(); const { created } = await importJobsStep({ discoveredJobs }); + jobsDiscovered = created; await pipelineRepo.updatePipelineRun(pipelineRun.id, { jobsDiscovered: created, }); - const { unprocessedJobs, scoredJobs } = await scoreJobsStep({ profile }); + ensureNotCancelled(); + const { unprocessedJobs, scoredJobs } = await scoreJobsStep({ + profile, + shouldCancel: () => cancelRequestedAt !== null, + }); + ensureNotCancelled(); const jobsToProcess = selectJobsStep({ scoredJobs, mergedConfig, @@ -106,7 +139,9 @@ export async function runPipeline( const { processedCount } = await processJobsStep({ jobsToProcess, processJob, + shouldCancel: () => cancelRequestedAt !== null, }); + jobsProcessed = processedCount; await pipelineRepo.updatePipelineRun(pipelineRun.id, { status: "completed", @@ -133,6 +168,28 @@ export async function runPipeline( jobsProcessed: processedCount, }; } catch (error) { + if (error instanceof PipelineCancelledError) { + const message = "Cancelled by user request"; + await pipelineRepo.updatePipelineRun(pipelineRun.id, { + status: "cancelled", + completedAt: new Date().toISOString(), + jobsDiscovered, + jobsProcessed, + errorMessage: message, + }); + progressHelpers.cancelled(message); + pipelineLogger.info("Pipeline run cancelled", { + jobsDiscovered, + jobsProcessed, + }); + return { + success: false, + jobsDiscovered, + jobsProcessed, + error: message, + }; + } + const message = error instanceof Error ? error.message : "Unknown error"; await pipelineRepo.updatePipelineRun(pipelineRun.id, { @@ -151,12 +208,14 @@ export async function runPipeline( return { success: false, - jobsDiscovered: 0, - jobsProcessed: 0, + jobsDiscovered, + jobsProcessed, error: message, }; } finally { isPipelineRunning = false; + activePipelineRunId = null; + cancelRequestedAt = null; } }); } @@ -340,3 +399,37 @@ export async function processJob( export function getPipelineStatus(): { isRunning: boolean } { return { isRunning: isPipelineRunning }; } + +export function requestPipelineCancel(): { + accepted: boolean; + pipelineRunId: string | null; + alreadyRequested: boolean; +} { + if (!isPipelineRunning) { + return { accepted: false, pipelineRunId: null, alreadyRequested: false }; + } + + const pipelineRunId = + activePipelineRunId && activePipelineRunId !== "pending" + ? activePipelineRunId + : null; + + if (cancelRequestedAt) { + return { + accepted: true, + pipelineRunId, + alreadyRequested: true, + }; + } + + cancelRequestedAt = new Date().toISOString(); + return { + accepted: true, + pipelineRunId, + alreadyRequested: false, + }; +} + +export function isPipelineCancelRequested(): boolean { + return cancelRequestedAt !== null; +} diff --git a/orchestrator/src/server/pipeline/progress.ts b/orchestrator/src/server/pipeline/progress.ts index edbbdd9..43cf2b1 100644 --- a/orchestrator/src/server/pipeline/progress.ts +++ b/orchestrator/src/server/pipeline/progress.ts @@ -11,12 +11,20 @@ export type PipelineStep = | "scoring" | "processing" | "completed" + | "cancelled" | "failed"; +export type CrawlSource = "gradcracker" | "jobspy" | "ukvisajobs"; + export interface PipelineProgress { step: PipelineStep; message: string; detail?: string; + crawlingSource: CrawlSource | null; + crawlingSourcesCompleted: number; + crawlingSourcesTotal: number; + crawlingTermsProcessed: number; + crawlingTermsTotal: number; crawlingListPagesProcessed: number; crawlingListPagesTotal: number; crawlingJobCardsFound: number; @@ -46,6 +54,11 @@ const listeners: Set = new Set(); let currentProgress: PipelineProgress = { step: "idle", message: "Ready", + crawlingSource: null, + crawlingSourcesCompleted: 0, + crawlingSourcesTotal: 0, + crawlingTermsProcessed: 0, + crawlingTermsTotal: 0, crawlingListPagesProcessed: 0, crawlingListPagesTotal: 0, crawlingJobCardsFound: 0, @@ -58,6 +71,19 @@ let currentProgress: PipelineProgress = { totalToProcess: 0, }; +const emptyCrawlingStats = { + crawlingTermsProcessed: 0, + crawlingTermsTotal: 0, + crawlingListPagesProcessed: 0, + crawlingListPagesTotal: 0, + crawlingJobCardsFound: 0, + crawlingJobPagesEnqueued: 0, + crawlingJobPagesSkipped: 0, + crawlingJobPagesProcessed: 0, + crawlingPhase: undefined, + crawlingCurrentUrl: undefined, +}; + /** * Update the current progress and notify all listeners. */ @@ -103,14 +129,10 @@ export function resetProgress(): void { currentProgress = { step: "idle", message: "Ready", - crawlingListPagesProcessed: 0, - crawlingListPagesTotal: 0, - crawlingJobCardsFound: 0, - crawlingJobPagesEnqueued: 0, - crawlingJobPagesSkipped: 0, - crawlingJobPagesProcessed: 0, - crawlingPhase: undefined, - crawlingCurrentUrl: undefined, + crawlingSource: null, + crawlingSourcesCompleted: 0, + crawlingSourcesTotal: 0, + ...emptyCrawlingStats, jobsDiscovered: 0, jobsScored: 0, jobsProcessed: 0, @@ -122,27 +144,51 @@ export function resetProgress(): void { * Helper to create progress updates for each step. */ export const progressHelpers = { - startCrawling: () => + startCrawling: (sourcesTotal = 0) => updateProgress({ step: "crawling", message: "Fetching jobs from sources...", detail: "Starting crawler", startedAt: new Date().toISOString(), - crawlingListPagesProcessed: 0, - crawlingListPagesTotal: 0, - crawlingJobCardsFound: 0, - crawlingJobPagesEnqueued: 0, - crawlingJobPagesSkipped: 0, - crawlingJobPagesProcessed: 0, - crawlingPhase: undefined, - crawlingCurrentUrl: undefined, + crawlingSource: null, + crawlingSourcesCompleted: 0, + crawlingSourcesTotal: sourcesTotal, + ...emptyCrawlingStats, jobsDiscovered: 0, jobsScored: 0, jobsProcessed: 0, totalToProcess: 0, }), + startSource: ( + source: CrawlSource, + sourcesCompleted: number, + sourcesTotal: number, + options?: { termsTotal?: number; detail?: string }, + ) => + updateProgress({ + step: "crawling", + message: `Fetching jobs from ${source}...`, + detail: options?.detail, + crawlingSource: source, + crawlingSourcesCompleted: sourcesCompleted, + crawlingSourcesTotal: sourcesTotal, + ...emptyCrawlingStats, + crawlingTermsTotal: options?.termsTotal ?? 0, + }), + + completeSource: (sourcesCompleted: number, sourcesTotal: number) => + updateProgress({ + crawlingSourcesCompleted: sourcesCompleted, + crawlingSourcesTotal: sourcesTotal, + crawlingCurrentUrl: undefined, + crawlingPhase: undefined, + }), + crawlingUpdate: (update: { + source?: CrawlSource; + termsProcessed?: number; + termsTotal?: number; listPagesProcessed?: number; listPagesTotal?: number; jobCardsFound?: number; @@ -155,6 +201,10 @@ export const progressHelpers = { const current = getProgress(); const next = { ...current, + crawlingSource: update.source ?? current.crawlingSource, + crawlingTermsProcessed: + update.termsProcessed ?? current.crawlingTermsProcessed, + crawlingTermsTotal: update.termsTotal ?? current.crawlingTermsTotal, crawlingListPagesProcessed: update.listPagesProcessed ?? current.crawlingListPagesProcessed, crawlingListPagesTotal: @@ -177,6 +227,10 @@ export const progressHelpers = { : `${next.crawlingListPagesProcessed}`; const pagesPart = `${next.crawlingJobPagesProcessed}/${next.crawlingJobPagesEnqueued}`; + const termsPart = + next.crawlingTermsTotal > 0 + ? `, terms ${next.crawlingTermsProcessed}/${next.crawlingTermsTotal}` + : ""; const skippedPart = next.crawlingJobPagesSkipped > 0 ? `, skipped ${next.crawlingJobPagesSkipped}` @@ -186,7 +240,7 @@ export const progressHelpers = { ? `, cards ${next.crawlingJobCardsFound}` : ""; - const message = `Crawling jobs (${sourcesPart} sources, pages ${pagesPart}${skippedPart}${cardsPart})...`; + const message = `Crawling jobs (list pages ${sourcesPart}, job pages ${pagesPart}${termsPart}${skippedPart}${cardsPart})...`; const detail = next.crawlingCurrentUrl && next.crawlingPhase ? `${next.crawlingPhase === "list" ? "List" : "Job"}: ${next.crawlingCurrentUrl}` @@ -198,6 +252,9 @@ export const progressHelpers = { step: "crawling", message, detail, + crawlingSource: next.crawlingSource, + crawlingTermsProcessed: next.crawlingTermsProcessed, + crawlingTermsTotal: next.crawlingTermsTotal, crawlingListPagesProcessed: next.crawlingListPagesProcessed, crawlingListPagesTotal: next.crawlingListPagesTotal, crawlingJobCardsFound: next.crawlingJobCardsFound, @@ -215,6 +272,7 @@ export const progressHelpers = { message: `Found ${jobsFound} jobs, importing to database...`, detail: "Deduplicating and saving", jobsDiscovered: jobsFound, + crawlingSource: null, crawlingCurrentUrl: undefined, }), @@ -283,6 +341,15 @@ export const progressHelpers = { currentJob: undefined, }), + cancelled: (reason: string) => + updateProgress({ + step: "cancelled", + message: "Pipeline cancelled", + detail: reason, + completedAt: new Date().toISOString(), + currentJob: undefined, + }), + failed: (error: string) => updateProgress({ step: "failed", diff --git a/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts b/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts index 914b02d..53fac4b 100644 --- a/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts +++ b/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts @@ -1,5 +1,6 @@ import type { PipelineConfig } from "@shared/types"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { getProgress, resetProgress } from "../progress"; import { discoverJobsStep } from "./discover-jobs"; vi.mock("../../repositories/jobs", () => ({ @@ -36,6 +37,7 @@ const config: PipelineConfig = { describe("discoverJobsStep", () => { beforeEach(() => { vi.clearAllMocks(); + resetProgress(); }); it("applies jobspySites setting and aggregates source errors", async () => { @@ -96,4 +98,165 @@ describe("discoverJobsStep", () => { }), ).rejects.toThrow("All sources failed: ukvisajobs: boom"); }); + + it("maps Gradcracker progress callback into live crawling counters", async () => { + const settingsRepo = await import("../../repositories/settings"); + const crawler = await import("../../services/crawler"); + + vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ + searchTerms: JSON.stringify(["engineer"]), + } as any); + + vi.mocked(crawler.runCrawler).mockImplementation(async (options: any) => { + options?.onProgress?.({ + phase: "list", + currentUrl: "https://example.com/list", + listPagesProcessed: 3, + listPagesTotal: 10, + jobCardsFound: 42, + jobPagesEnqueued: 30, + jobPagesSkipped: 4, + jobPagesProcessed: 8, + }); + return { success: true, jobs: [] } as any; + }); + + await discoverJobsStep({ + mergedConfig: { + ...config, + sources: ["gradcracker"], + }, + }); + + const progress = getProgress(); + expect(progress.crawlingSource).toBeNull(); + expect(progress.crawlingListPagesProcessed).toBe(3); + expect(progress.crawlingListPagesTotal).toBe(10); + expect(progress.crawlingJobCardsFound).toBe(42); + expect(progress.crawlingJobPagesEnqueued).toBe(30); + expect(progress.crawlingJobPagesSkipped).toBe(4); + expect(progress.crawlingJobPagesProcessed).toBe(8); + }); + + it("updates JobSpy terms and UKVisa pages via progress callbacks", async () => { + const settingsRepo = await import("../../repositories/settings"); + const jobSpy = await import("../../services/jobspy"); + const ukVisa = await import("../../services/ukvisajobs"); + + vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ + searchTerms: JSON.stringify(["engineer", "frontend"]), + jobspySites: JSON.stringify(["linkedin"]), + } as any); + + vi.mocked(jobSpy.runJobSpy).mockImplementation(async (options: any) => { + options?.onProgress?.({ + type: "term_start", + termIndex: 1, + termTotal: 2, + searchTerm: "engineer", + }); + options?.onProgress?.({ + type: "term_complete", + termIndex: 1, + termTotal: 2, + searchTerm: "engineer", + jobsFoundTerm: 10, + }); + options?.onProgress?.({ + type: "term_start", + termIndex: 2, + termTotal: 2, + searchTerm: "frontend", + }); + options?.onProgress?.({ + type: "term_complete", + termIndex: 2, + termTotal: 2, + searchTerm: "frontend", + jobsFoundTerm: 8, + }); + return { success: true, jobs: [] } as any; + }); + + vi.mocked(ukVisa.runUkVisaJobs).mockImplementation(async (options: any) => { + options?.onProgress?.({ + type: "init", + termIndex: 1, + termTotal: 2, + searchTerm: "engineer", + maxPages: 4, + maxJobs: 50, + }); + options?.onProgress?.({ + type: "page_fetched", + termIndex: 1, + termTotal: 2, + searchTerm: "engineer", + pageNo: 2, + maxPages: 4, + jobsOnPage: 15, + totalCollected: 18, + totalAvailable: 100, + }); + options?.onProgress?.({ + type: "term_complete", + termIndex: 1, + termTotal: 2, + searchTerm: "engineer", + jobsFoundTerm: 18, + totalCollected: 18, + }); + return { success: true, jobs: [] } as any; + }); + + await discoverJobsStep({ + mergedConfig: { + ...config, + sources: ["linkedin", "ukvisajobs"], + }, + }); + + const progress = getProgress(); + expect(progress.crawlingTermsProcessed).toBe(1); + expect(progress.crawlingTermsTotal).toBe(2); + expect(progress.crawlingListPagesProcessed).toBe(2); + expect(progress.crawlingListPagesTotal).toBe(4); + expect(progress.crawlingJobPagesEnqueued).toBe(18); + expect(progress.crawlingJobPagesProcessed).toBe(18); + }); + + it("tracks source completion counters across source transitions", async () => { + const settingsRepo = await import("../../repositories/settings"); + const jobSpy = await import("../../services/jobspy"); + const crawler = await import("../../services/crawler"); + const ukVisa = await import("../../services/ukvisajobs"); + + vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ + searchTerms: JSON.stringify(["engineer"]), + } as any); + + vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ + success: true, + jobs: [], + } as any); + vi.mocked(crawler.runCrawler).mockResolvedValue({ + success: true, + jobs: [], + } as any); + vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({ + success: true, + jobs: [], + } as any); + + await discoverJobsStep({ + mergedConfig: { + ...config, + sources: ["linkedin", "gradcracker", "ukvisajobs"], + }, + }); + + const progress = getProgress(); + expect(progress.crawlingSourcesTotal).toBe(3); + expect(progress.crawlingSourcesCompleted).toBe(3); + }); }); diff --git a/orchestrator/src/server/pipeline/steps/discover-jobs.ts b/orchestrator/src/server/pipeline/steps/discover-jobs.ts index 749e9d0..16fc315 100644 --- a/orchestrator/src/server/pipeline/steps/discover-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/discover-jobs.ts @@ -9,12 +9,12 @@ import { progressHelpers, updateProgress } from "../progress"; export async function discoverJobsStep(args: { mergedConfig: PipelineConfig; + shouldCancel?: () => boolean; }): Promise<{ discoveredJobs: CreateJobInput[]; sourceErrors: string[]; }> { logger.info("Running discovery step"); - progressHelpers.startCrawling(); const discoveredJobs: CreateJobInput[] = []; const sourceErrors: string[] = []; @@ -52,9 +52,31 @@ export async function discoverJobsStep(args: { } } - if (jobSpySites.length > 0) { - updateProgress({ - step: "crawling", + const shouldRunJobSpy = jobSpySites.length > 0; + const shouldRunGradcracker = + args.mergedConfig.sources.includes("gradcracker"); + const shouldRunUkVisaJobs = args.mergedConfig.sources.includes("ukvisajobs"); + + const totalSources = + Number(shouldRunJobSpy) + + Number(shouldRunGradcracker) + + Number(shouldRunUkVisaJobs); + let completedSources = 0; + + progressHelpers.startCrawling(totalSources); + + const markSourceComplete = () => { + completedSources += 1; + progressHelpers.completeSource(completedSources, totalSources); + }; + + if (args.shouldCancel?.()) { + return { discoveredJobs, sourceErrors }; + } + + if (shouldRunJobSpy) { + progressHelpers.startSource("jobspy", completedSources, totalSources, { + termsTotal: searchTerms.length, detail: `JobSpy: scraping ${jobSpySites.join(", ")}...`, }); @@ -79,6 +101,34 @@ export async function discoverJobsStep(args: { settings.jobspyIsRemote !== undefined ? settings.jobspyIsRemote === "1" : undefined, + onProgress: (event) => { + if (event.type === "term_start") { + progressHelpers.crawlingUpdate({ + source: "jobspy", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm, + }); + updateProgress({ + step: "crawling", + detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }); + return; + } + + progressHelpers.crawlingUpdate({ + source: "jobspy", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm, + }); + updateProgress({ + step: "crawling", + detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`, + }); + }, }); if (!jobSpyResult.success) { @@ -86,10 +136,18 @@ export async function discoverJobsStep(args: { } else { discoveredJobs.push(...jobSpyResult.jobs); } + + markSourceComplete(); } - if (args.mergedConfig.sources.includes("gradcracker")) { - updateProgress({ step: "crawling", detail: "Gradcracker: scraping..." }); + if (args.shouldCancel?.()) { + return { discoveredJobs, sourceErrors }; + } + + if (shouldRunGradcracker) { + progressHelpers.startSource("gradcracker", completedSources, totalSources, { + detail: "Gradcracker: scraping...", + }); const existingJobUrls = await jobsRepo.getAllJobUrls(); const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm @@ -101,16 +159,17 @@ export async function discoverJobsStep(args: { searchTerms, maxJobsPerTerm: gradcrackerMaxJobs, onProgress: (progress) => { - if (progress.listPagesTotal && progress.listPagesTotal > 0) { - const percent = Math.round( - ((progress.listPagesProcessed ?? 0) / progress.listPagesTotal) * - 100, - ); - updateProgress({ - step: "crawling", - detail: `Gradcracker: ${percent}% (scan ${progress.listPagesProcessed}/${progress.listPagesTotal}, found ${progress.jobCardsFound})`, - }); - } + progressHelpers.crawlingUpdate({ + source: "gradcracker", + listPagesProcessed: progress.listPagesProcessed, + listPagesTotal: progress.listPagesTotal, + jobCardsFound: progress.jobCardsFound, + jobPagesEnqueued: progress.jobPagesEnqueued, + jobPagesSkipped: progress.jobPagesSkipped, + jobPagesProcessed: progress.jobPagesProcessed, + phase: progress.phase, + currentUrl: progress.currentUrl, + }); }, }); @@ -121,11 +180,17 @@ export async function discoverJobsStep(args: { } else { discoveredJobs.push(...crawlerResult.jobs); } + + markSourceComplete(); } - if (args.mergedConfig.sources.includes("ukvisajobs")) { - updateProgress({ - step: "crawling", + if (args.shouldCancel?.()) { + return { discoveredJobs, sourceErrors }; + } + + if (shouldRunUkVisaJobs) { + progressHelpers.startSource("ukvisajobs", completedSources, totalSources, { + termsTotal: searchTerms.length, detail: "UKVisaJobs: scraping visa-sponsoring jobs...", }); @@ -136,6 +201,76 @@ export async function discoverJobsStep(args: { const ukVisaResult = await runUkVisaJobs({ maxJobs: ukvisajobsMaxJobs, searchTerms, + onProgress: (event) => { + if (event.type === "init") { + progressHelpers.crawlingUpdate({ + source: "ukvisajobs", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: 0, + listPagesTotal: event.maxPages, + jobPagesEnqueued: 0, + jobPagesProcessed: 0, + jobPagesSkipped: 0, + phase: "list", + currentUrl: event.searchTerm || "all jobs", + }); + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, + }); + return; + } + + if (event.type === "page_fetched") { + progressHelpers.crawlingUpdate({ + source: "ukvisajobs", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: event.pageNo, + listPagesTotal: event.maxPages, + jobPagesEnqueued: event.totalCollected, + jobPagesProcessed: event.totalCollected, + phase: "list", + currentUrl: `page ${event.pageNo}/${event.maxPages}`, + }); + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`, + }); + return; + } + + if (event.type === "term_complete") { + progressHelpers.crawlingUpdate({ + source: "ukvisajobs", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm || "all jobs", + }); + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, + }); + return; + } + + if (event.type === "empty_page") { + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`, + }); + return; + } + + if (event.type === "error") { + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: ${event.message}`, + }); + } + }, }); if (!ukVisaResult.success) { @@ -143,6 +278,8 @@ export async function discoverJobsStep(args: { } else { discoveredJobs.push(...ukVisaResult.jobs); } + + markSourceComplete(); } if (discoveredJobs.length === 0 && sourceErrors.length > 0) { diff --git a/orchestrator/src/server/pipeline/steps/process-jobs.ts b/orchestrator/src/server/pipeline/steps/process-jobs.ts index 2b20920..018ca94 100644 --- a/orchestrator/src/server/pipeline/steps/process-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/process-jobs.ts @@ -10,6 +10,7 @@ type ProcessJobFn = ( export async function processJobsStep(args: { jobsToProcess: ScoredJob[]; processJob: ProcessJobFn; + shouldCancel?: () => boolean; }): Promise<{ processedCount: number }> { let processedCount = 0; @@ -21,6 +22,8 @@ export async function processJobsStep(args: { }); for (let i = 0; i < args.jobsToProcess.length; i++) { + if (args.shouldCancel?.()) break; + const job = args.jobsToProcess[i]; progressHelpers.processingJob(i + 1, args.jobsToProcess.length, job); diff --git a/orchestrator/src/server/pipeline/steps/score-jobs.ts b/orchestrator/src/server/pipeline/steps/score-jobs.ts index c54f9fb..e488f93 100644 --- a/orchestrator/src/server/pipeline/steps/score-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/score-jobs.ts @@ -9,6 +9,7 @@ import type { ScoredJob } from "./types"; export async function scoreJobsStep(args: { profile: Record; + shouldCancel?: () => boolean; }): Promise<{ unprocessedJobs: Job[]; scoredJobs: ScoredJob[] }> { logger.info("Running scoring step"); const unprocessedJobs = await jobsRepo.getUnscoredDiscoveredJobs(); @@ -33,6 +34,8 @@ export async function scoreJobsStep(args: { const scoredJobs: ScoredJob[] = []; for (let i = 0; i < unprocessedJobs.length; i++) { + if (args.shouldCancel?.()) break; + const job = unprocessedJobs[i]; const hasCachedScore = typeof job.suitabilityScore === "number" && diff --git a/orchestrator/src/server/repositories/pipeline.ts b/orchestrator/src/server/repositories/pipeline.ts index 53fdbec..9e74a2e 100644 --- a/orchestrator/src/server/repositories/pipeline.ts +++ b/orchestrator/src/server/repositories/pipeline.ts @@ -40,7 +40,7 @@ export async function updatePipelineRun( id: string, update: Partial<{ completedAt: string; - status: "running" | "completed" | "failed"; + status: "running" | "completed" | "failed" | "cancelled"; jobsDiscovered: number; jobsProcessed: number; errorMessage: string; diff --git a/orchestrator/src/server/services/jobspy.test.ts b/orchestrator/src/server/services/jobspy.test.ts new file mode 100644 index 0000000..4dca5d6 --- /dev/null +++ b/orchestrator/src/server/services/jobspy.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; +import { parseJobSpyProgressLine } from "./jobspy"; + +describe("parseJobSpyProgressLine", () => { + it("parses term_start progress lines", () => { + const event = parseJobSpyProgressLine( + 'JOBOPS_PROGRESS {"event":"term_start","termIndex":1,"termTotal":3,"searchTerm":"engineer"}', + ); + + expect(event).toEqual({ + type: "term_start", + termIndex: 1, + termTotal: 3, + searchTerm: "engineer", + }); + }); + + it("parses term_complete progress lines", () => { + const event = parseJobSpyProgressLine( + 'JOBOPS_PROGRESS {"event":"term_complete","termIndex":2,"termTotal":3,"searchTerm":"frontend","jobsFoundTerm":17}', + ); + + expect(event).toEqual({ + type: "term_complete", + termIndex: 2, + termTotal: 3, + searchTerm: "frontend", + jobsFoundTerm: 17, + }); + }); + + it("returns null for malformed payloads", () => { + expect(parseJobSpyProgressLine("JOBOPS_PROGRESS {bad json")).toBeNull(); + expect(parseJobSpyProgressLine("JOBOPS_PROGRESS {}")).toBeNull(); + }); + + it("returns null for non-progress lines", () => { + expect(parseJobSpyProgressLine("Found 20 jobs")).toBeNull(); + }); +}); diff --git a/orchestrator/src/server/services/jobspy.ts b/orchestrator/src/server/services/jobspy.ts index 10a679c..f72370d 100644 --- a/orchestrator/src/server/services/jobspy.ts +++ b/orchestrator/src/server/services/jobspy.ts @@ -7,6 +7,7 @@ import { spawn } from "node:child_process"; import { mkdir, readFile, unlink } from "node:fs/promises"; import { dirname, join } from "node:path"; +import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; import type { CreateJobInput, JobSource } from "@shared/types"; import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion"; @@ -15,6 +16,61 @@ import { getDataDir } from "../config/dataDir"; const __dirname = dirname(fileURLToPath(import.meta.url)); const JOBSPY_DIR = join(__dirname, "../../../../extractors/jobspy"); const JOBSPY_SCRIPT = join(JOBSPY_DIR, "scrape_jobs.py"); +const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; + +export type JobSpyProgressEvent = + | { + type: "term_start"; + termIndex: number; + termTotal: number; + searchTerm: string; + } + | { + type: "term_complete"; + termIndex: number; + termTotal: number; + searchTerm: string; + jobsFoundTerm: number; + }; + +export function parseJobSpyProgressLine( + line: string, +): JobSpyProgressEvent | null { + if (!line.startsWith(JOBOPS_PROGRESS_PREFIX)) return null; + const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim(); + let parsed: Record; + try { + parsed = JSON.parse(raw) as Record; + } catch { + return null; + } + + const eventName = toStringOrNull(parsed.event); + const termIndex = toNumberOrNull(parsed.termIndex); + const termTotal = toNumberOrNull(parsed.termTotal); + const searchTerm = toStringOrNull(parsed.searchTerm) ?? ""; + + if (!eventName || termIndex === null || termTotal === null) return null; + if (eventName === "term_start") { + return { + type: "term_start", + termIndex, + termTotal, + searchTerm, + }; + } + if (eventName === "term_complete") { + return { + type: "term_complete", + termIndex, + termTotal, + searchTerm, + jobsFoundTerm: toNumberOrNull(parsed.jobsFoundTerm) ?? 0, + }; + } + + return null; +} function getPythonPath(): string { if (process.env.PYTHON_PATH) return process.env.PYTHON_PATH; @@ -92,6 +148,7 @@ export interface RunJobSpyOptions { countryIndeed?: string; linkedinFetchDescription?: boolean; isRemote?: boolean; + onProgress?: (event: JobSpyProgressEvent) => void; } export interface JobSpyResult { @@ -131,11 +188,13 @@ export async function runJobSpy( const child = spawn(pythonPath, [JOBSPY_SCRIPT], { cwd: JOBSPY_DIR, shell: false, - stdio: "inherit", + stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, JOBSPY_SITES: sites || "indeed,linkedin", JOBSPY_SEARCH_TERM: searchTerm, + JOBSPY_TERM_INDEX: String(i + 1), + JOBSPY_TERM_TOTAL: String(searchTerms.length), JOBSPY_LOCATION: options.location ?? process.env.JOBSPY_LOCATION ?? "UK", JOBSPY_RESULTS_WANTED: String( @@ -161,7 +220,28 @@ export async function runJobSpy( }, }); + const handleLine = (line: string, stream: NodeJS.WriteStream) => { + const event = parseJobSpyProgressLine(line); + if (event) { + options.onProgress?.(event); + return; + } + stream.write(`${line}\n`); + }; + + const stdoutRl = child.stdout + ? createInterface({ input: child.stdout }) + : null; + const stderrRl = child.stderr + ? createInterface({ input: child.stderr }) + : null; + + stdoutRl?.on("line", (line) => handleLine(line, process.stdout)); + stderrRl?.on("line", (line) => handleLine(line, process.stderr)); + child.on("close", (code) => { + stdoutRl?.close(); + stderrRl?.close(); if (code === 0) resolve(); else reject(new Error(`JobSpy exited with code ${code}`)); }); diff --git a/orchestrator/src/server/services/ukvisajobs.test.ts b/orchestrator/src/server/services/ukvisajobs.test.ts new file mode 100644 index 0000000..d854255 --- /dev/null +++ b/orchestrator/src/server/services/ukvisajobs.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "vitest"; +import { parseUkVisaJobsProgressLine } from "./ukvisajobs"; + +describe("parseUkVisaJobsProgressLine", () => { + it("parses init events", () => { + const event = parseUkVisaJobsProgressLine( + 'JOBOPS_PROGRESS {"event":"init","maxPages":4,"maxJobs":50,"searchKeyword":"engineer"}', + ); + + expect(event).toEqual({ + type: "init", + maxPages: 4, + maxJobs: 50, + searchKeyword: "engineer", + }); + }); + + it("parses page_fetched events", () => { + const event = parseUkVisaJobsProgressLine( + 'JOBOPS_PROGRESS {"event":"page_fetched","pageNo":2,"maxPages":4,"jobsOnPage":15,"totalCollected":28,"totalAvailable":105}', + ); + + expect(event).toEqual({ + type: "page_fetched", + pageNo: 2, + maxPages: 4, + jobsOnPage: 15, + totalCollected: 28, + totalAvailable: 105, + }); + }); + + it("parses terminal and error events", () => { + expect( + parseUkVisaJobsProgressLine( + 'JOBOPS_PROGRESS {"event":"empty_page","pageNo":3,"maxPages":4,"totalCollected":28}', + ), + ).toEqual({ + type: "empty_page", + pageNo: 3, + maxPages: 4, + totalCollected: 28, + }); + + expect( + parseUkVisaJobsProgressLine( + 'JOBOPS_PROGRESS {"event":"done","maxPages":4,"totalCollected":42,"totalAvailable":105}', + ), + ).toEqual({ + type: "done", + maxPages: 4, + totalCollected: 42, + totalAvailable: 105, + }); + + expect( + parseUkVisaJobsProgressLine( + 'JOBOPS_PROGRESS {"event":"error","message":"boom","pageNo":2,"status":500}', + ), + ).toEqual({ + type: "error", + message: "boom", + pageNo: 2, + status: 500, + }); + }); + + it("ignores malformed or unrelated lines", () => { + expect(parseUkVisaJobsProgressLine("JOBOPS_PROGRESS {bad")).toBeNull(); + expect(parseUkVisaJobsProgressLine("normal log line")).toBeNull(); + }); +}); diff --git a/orchestrator/src/server/services/ukvisajobs.ts b/orchestrator/src/server/services/ukvisajobs.ts index 629fb53..eaf2f08 100644 --- a/orchestrator/src/server/services/ukvisajobs.ts +++ b/orchestrator/src/server/services/ukvisajobs.ts @@ -7,6 +7,7 @@ import { spawn } from "node:child_process"; import { mkdir, readdir, readFile, rm } from "node:fs/promises"; import { dirname, join } from "node:path"; +import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; import type { CreateJobInput } from "@shared/types"; import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion"; @@ -18,6 +19,7 @@ const AUTH_CACHE_PATH = join(UKVISAJOBS_DIR, "storage/ukvisajobs-auth.json"); const UKVISAJOBS_API_URL = "https://my.ukvisajobs.com/ukvisa-api/api/fetch-jobs-data"; const UKVISAJOBS_PAGE_SIZE = 15; +const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; let isUkVisaJobsRunning = false; interface UkVisaJobsAuthSession { @@ -34,6 +36,8 @@ export interface RunUkVisaJobsOptions { searchKeyword?: string; /** List of search terms to run sequentially */ searchTerms?: string[]; + /** Optional callback for structured progress emitted by extractor runs. */ + onProgress?: (event: UkVisaJobsProgressEvent) => void; } export interface UkVisaJobsResult { @@ -42,6 +46,133 @@ export interface UkVisaJobsResult { error?: string; } +type UkVisaJobsExtractorProgressEvent = + | { + type: "init"; + maxPages: number; + maxJobs: number; + searchKeyword: string; + } + | { + type: "page_fetched"; + pageNo: number; + maxPages: number; + jobsOnPage: number; + totalCollected: number; + totalAvailable: number; + } + | { + type: "done"; + maxPages: number; + totalCollected: number; + totalAvailable: number; + } + | { + type: "empty_page"; + pageNo: number; + maxPages: number; + totalCollected: number; + } + | { + type: "error"; + message: string; + pageNo?: number; + status?: number; + }; + +type UkVisaJobsExtractorEventWithTerm = UkVisaJobsExtractorProgressEvent & { + termIndex: number; + termTotal: number; + searchTerm: string; +}; + +export type UkVisaJobsProgressEvent = + | UkVisaJobsExtractorEventWithTerm + | { + type: "term_complete"; + termIndex: number; + termTotal: number; + searchTerm: string; + jobsFoundTerm: number; + totalCollected: number; + }; + +export function parseUkVisaJobsProgressLine( + line: string, +): UkVisaJobsExtractorProgressEvent | null { + if (!line.startsWith(JOBOPS_PROGRESS_PREFIX)) return null; + const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim(); + let parsed: Record; + try { + parsed = JSON.parse(raw) as Record; + } catch { + return null; + } + + const event = toStringOrNull(parsed.event); + if (!event) return null; + + if (event === "init") { + const maxPages = toNumberOrNull(parsed.maxPages); + const maxJobs = toNumberOrNull(parsed.maxJobs); + if (maxPages === null || maxJobs === null) return null; + return { + type: "init", + maxPages, + maxJobs, + searchKeyword: toStringOrNull(parsed.searchKeyword) ?? "", + }; + } + + if (event === "page_fetched") { + const pageNo = toNumberOrNull(parsed.pageNo); + const maxPages = toNumberOrNull(parsed.maxPages); + if (pageNo === null || maxPages === null) return null; + return { + type: "page_fetched", + pageNo, + maxPages, + jobsOnPage: toNumberOrNull(parsed.jobsOnPage) ?? 0, + totalCollected: toNumberOrNull(parsed.totalCollected) ?? 0, + totalAvailable: toNumberOrNull(parsed.totalAvailable) ?? 0, + }; + } + + if (event === "done") { + const maxPages = toNumberOrNull(parsed.maxPages); + if (maxPages === null) return null; + return { + type: "done", + maxPages, + totalCollected: toNumberOrNull(parsed.totalCollected) ?? 0, + totalAvailable: toNumberOrNull(parsed.totalAvailable) ?? 0, + }; + } + + if (event === "empty_page") { + const pageNo = toNumberOrNull(parsed.pageNo); + const maxPages = toNumberOrNull(parsed.maxPages); + if (pageNo === null || maxPages === null) return null; + return { + type: "empty_page", + pageNo, + maxPages, + totalCollected: toNumberOrNull(parsed.totalCollected) ?? 0, + }; + } + + if (event === "error") { + return { + type: "error", + message: toStringOrNull(parsed.message) ?? "unknown error", + pageNo: toNumberOrNull(parsed.pageNo) ?? undefined, + status: toNumberOrNull(parsed.status) ?? undefined, + }; + } + + return null; +} + function buildCookieHeader(session: UkVisaJobsAuthSession): string { const cookieParts: string[] = []; if (session.csrfToken) cookieParts.push(`csrf_token=${session.csrfToken}`); @@ -442,10 +573,12 @@ export async function runUkVisaJobs( const allJobs: CreateJobInput[] = []; const seenIds = new Set(); + const termTotal = terms.length; for (let i = 0; i < terms.length; i++) { const term = terms[i]; const termLabel = term ? `"${term}"` : "all jobs"; + const termIndex = i + 1; console.log(` Running for ${termLabel}...`); try { @@ -457,15 +590,42 @@ export async function runUkVisaJobs( await new Promise((resolve, reject) => { const child = spawn("npx", ["tsx", "src/main.ts"], { cwd: UKVISAJOBS_DIR, - stdio: "inherit", + stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, + JOBOPS_EMIT_PROGRESS: "1", UKVISAJOBS_MAX_JOBS: String(options.maxJobs ?? 50), UKVISAJOBS_SEARCH_KEYWORD: term, }, }); + const handleLine = (line: string, stream: NodeJS.WriteStream) => { + const progressEvent = parseUkVisaJobsProgressLine(line); + if (progressEvent) { + options.onProgress?.({ + ...progressEvent, + termIndex, + termTotal, + searchTerm: term, + }); + return; + } + stream.write(`${line}\n`); + }; + + const stdoutRl = child.stdout + ? createInterface({ input: child.stdout }) + : null; + const stderrRl = child.stderr + ? createInterface({ input: child.stderr }) + : null; + + stdoutRl?.on("line", (line) => handleLine(line, process.stdout)); + stderrRl?.on("line", (line) => handleLine(line, process.stderr)); + child.on("close", (code) => { + stdoutRl?.close(); + stderrRl?.close(); if (code === 0) resolve(); else reject( @@ -508,10 +668,25 @@ export async function runUkVisaJobs( console.log( ` ✅ Fetched ${runJobs.length} jobs for ${termLabel} (${newCount} new unique)`, ); + options.onProgress?.({ + type: "term_complete", + termIndex, + termTotal, + searchTerm: term, + jobsFoundTerm: newCount, + totalCollected: allJobs.length, + }); } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; console.error(`❌ UK Visa Jobs failed for ${termLabel}: ${message}`); + options.onProgress?.({ + type: "error", + termIndex, + termTotal, + searchTerm: term, + message, + }); // Continue to next term instead of failing completely } diff --git a/shared/src/types.ts b/shared/src/types.ts index d283ff8..837fb35 100644 --- a/shared/src/types.ts +++ b/shared/src/types.ts @@ -302,7 +302,7 @@ export interface PipelineRun { id: string; startedAt: string; completedAt: string | null; - status: "running" | "completed" | "failed"; + status: "running" | "completed" | "failed" | "cancelled"; jobsDiscovered: number; jobsProcessed: number; errorMessage: string | null;