Live scraping updates in pipeline UI (#100)

* initial commit

* fix clear script

* cancelling pipelines

* formatting
This commit is contained in:
Shaheer Sarfaraz 2026-02-07 22:44:00 +00:00 committed by GitHub
parent 60788b0f6a
commit a409aa5ee0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 1425 additions and 84 deletions

View File

@ -1,9 +1,12 @@
import csv
import json
import os
from pathlib import Path
from jobspy import scrape_jobs
PROGRESS_PREFIX = "JOBOPS_PROGRESS "
def _env_str(name: str, default: str) -> str:
value = os.getenv(name)
@ -27,6 +30,11 @@ def _env_bool(name: str, default: bool) -> bool:
return value.strip().lower() in ("1", "true", "yes", "y", "on")
def _emit_progress(event: str, payload: dict) -> None:
serialized = json.dumps({"event": event, **payload}, ensure_ascii=True)
print(f"{PROGRESS_PREFIX}{serialized}", flush=True)
def _parse_sites(raw: str) -> list[str]:
return [s.strip() for s in raw.split(",") if s.strip()]
@ -40,6 +48,8 @@ def main() -> int:
country_indeed = _env_str("JOBSPY_COUNTRY_INDEED", "UK")
linkedin_fetch_description = _env_bool("JOBSPY_LINKEDIN_FETCH_DESCRIPTION", True)
is_remote = _env_bool("JOBSPY_IS_REMOTE", False)
term_index = _env_int("JOBSPY_TERM_INDEX", 1)
term_total = _env_int("JOBSPY_TERM_TOTAL", 1)
output_csv = Path(_env_str("JOBSPY_OUTPUT_CSV", "jobs.csv"))
output_json = Path(
@ -50,6 +60,14 @@ def main() -> int:
output_json.parent.mkdir(parents=True, exist_ok=True)
print(f"jobspy: Search term: {search_term}")
_emit_progress(
"term_start",
{
"termIndex": term_index,
"termTotal": term_total,
"searchTerm": search_term,
},
)
jobs = scrape_jobs(
site_name=sites,
search_term=search_term,
@ -62,6 +80,15 @@ def main() -> int:
)
print(f"Found {len(jobs)} jobs")
_emit_progress(
"term_complete",
{
"termIndex": term_index,
"termTotal": term_total,
"searchTerm": search_term,
"jobsFoundTerm": int(len(jobs)),
},
)
jobs.to_csv(
output_csv,

View File

@ -32,6 +32,16 @@ const AUTH_CACHE_PATH = join(__dirname, "../storage/ukvisajobs-auth.json");
const JOBS_PER_PAGE = 15;
const DEFAULT_MAX_JOBS = 50;
const MAX_ALLOWED_JOBS = 200;
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
function emitProgress(
event: string,
payload: Record<string, unknown> = {},
): void {
if (process.env.JOBOPS_EMIT_PROGRESS !== "1") return;
const serialized = JSON.stringify({ event, ...payload });
process.stdout.write(`${JOBOPS_PROGRESS_PREFIX}${serialized}\n`);
}
interface UkVisaJobsApiJob {
id: string;
@ -444,6 +454,11 @@ async function main(): Promise<void> {
if (searchKeyword) {
console.log(` Search keyword: ${searchKeyword}`);
}
emitProgress("init", {
maxPages,
maxJobs,
searchKeyword: searchKeyword || "",
});
const allJobs: ExtractedJob[] = [];
const seenIds = new Set<string>();
@ -481,6 +496,11 @@ async function main(): Promise<void> {
}
if (response.status !== 1) {
emitProgress("error", {
pageNo,
status: response.status,
message: `API returned status ${response.status}`,
});
console.warn(
` ⚠️ API returned status ${response.status} on page ${pageNo}`,
);
@ -493,6 +513,11 @@ async function main(): Promise<void> {
}
if (!response.jobs || response.jobs.length === 0) {
emitProgress("empty_page", {
pageNo,
maxPages,
totalCollected: allJobs.length,
});
console.log(` No more jobs on page ${pageNo}`);
break;
}
@ -508,6 +533,14 @@ async function main(): Promise<void> {
allJobs.push(mapped);
}
emitProgress("page_fetched", {
pageNo,
maxPages,
jobsOnPage: response.jobs.length,
totalCollected: allJobs.length,
totalAvailable,
});
// If we got fewer jobs than a full page, we're at the end
if (response.jobs.length < JOBS_PER_PAGE) {
break;
@ -519,6 +552,11 @@ async function main(): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, 500));
}
emitProgress("done", {
maxPages,
totalCollected: allJobs.length,
totalAvailable,
});
console.log(`✅ Scraped ${allJobs.length} jobs`);
// Write output to storage directory (similar to Crawlee dataset structure)
@ -542,6 +580,7 @@ async function main(): Promise<void> {
console.log(` Jobs file: ${outputFile}`);
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
emitProgress("error", { message });
console.error(`❌ Error: ${message}`);
process.exit(1);
}

View File

@ -319,6 +319,20 @@ export async function runPipeline(config?: {
});
}
export async function cancelPipeline(): Promise<{
message: string;
pipelineRunId: string | null;
alreadyRequested: boolean;
}> {
return fetchApi<{
message: string;
pipelineRunId: string | null;
alreadyRequested: boolean;
}>("/pipeline/cancel", {
method: "POST",
});
}
export async function getDemoInfo(): Promise<DemoInfoResponse> {
return fetchApi<DemoInfoResponse>("/demo/info");
}

View File

@ -0,0 +1,100 @@
import { act, render, screen } from "@testing-library/react";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { PipelineProgress } from "./PipelineProgress";
class MockEventSource {
static instances: MockEventSource[] = [];
onopen: ((event: Event) => void) | null = null;
onmessage: ((event: MessageEvent) => void) | null = null;
onerror: ((event: Event) => void) | null = null;
constructor(public url: string) {
MockEventSource.instances.push(this);
}
close = vi.fn();
emitOpen() {
this.onopen?.(new Event("open"));
}
emitMessage(payload: unknown) {
this.onmessage?.({
data: JSON.stringify(payload),
} as MessageEvent);
}
}
const baseProgress = {
step: "crawling" as const,
message: "Fetching jobs from sources...",
detail: "Running crawler",
crawlingSource: "jobspy" as const,
crawlingSourcesCompleted: 1,
crawlingSourcesTotal: 3,
crawlingTermsProcessed: 2,
crawlingTermsTotal: 4,
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
crawlingPhase: "list" as const,
crawlingCurrentUrl: "engineer",
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
};
describe("PipelineProgress", () => {
beforeEach(() => {
MockEventSource.instances = [];
(globalThis as any).EventSource = MockEventSource;
});
afterEach(() => {
vi.restoreAllMocks();
});
it("renders renamed crawling labels and source/terms context", () => {
render(<PipelineProgress isRunning />);
const sse = MockEventSource.instances[0];
act(() => {
sse.emitOpen();
sse.emitMessage({
...baseProgress,
crawlingListPagesProcessed: 3,
crawlingListPagesTotal: 10,
crawlingJobPagesProcessed: 8,
crawlingJobPagesEnqueued: 30,
crawlingJobPagesSkipped: 4,
});
});
expect(screen.getByText("List pages")).toBeInTheDocument();
expect(screen.getByText("Job pages")).toBeInTheDocument();
expect(screen.getByText("Enqueued")).toBeInTheDocument();
expect(screen.getByText("Skipped")).toBeInTheDocument();
expect(screen.getByText("3/10")).toBeInTheDocument();
expect(screen.getByText("8/30")).toBeInTheDocument();
expect(
screen.getByText(/Source:\s+JobSpy\s+\(1\/3\)\s+Terms:\s+2\/4/),
).toBeInTheDocument();
});
it("uses fallback dashes for unknown page denominators", () => {
render(<PipelineProgress isRunning />);
const sse = MockEventSource.instances[0];
act(() => {
sse.emitOpen();
sse.emitMessage(baseProgress);
});
expect(screen.queryByText("0/0")).not.toBeInTheDocument();
expect(screen.getAllByText("—").length).toBeGreaterThanOrEqual(2);
});
});

View File

@ -20,9 +20,15 @@ interface PipelineProgress {
| "scoring"
| "processing"
| "completed"
| "cancelled"
| "failed";
message: string;
detail?: string;
crawlingSource: "gradcracker" | "jobspy" | "ukvisajobs" | null;
crawlingSourcesCompleted: number;
crawlingSourcesTotal: number;
crawlingTermsProcessed: number;
crawlingTermsTotal: number;
crawlingListPagesProcessed: number;
crawlingListPagesTotal: number;
crawlingJobCardsFound: number;
@ -56,6 +62,7 @@ const stepLabels: Record<PipelineProgress["step"], string> = {
scoring: "Scoring",
processing: "Processing",
completed: "Complete",
cancelled: "Cancelled",
failed: "Failed",
};
@ -66,9 +73,19 @@ const stepBadgeClasses: Record<PipelineProgress["step"], string> = {
scoring: "bg-amber-500/10 text-amber-400 border-amber-500/20",
processing: "bg-primary/10 text-primary border-primary/20",
completed: "bg-emerald-500/10 text-emerald-400 border-emerald-500/20",
cancelled: "bg-muted text-muted-foreground border-border",
failed: "bg-destructive/10 text-destructive border-destructive/20",
};
const sourceLabel: Record<
Exclude<PipelineProgress["crawlingSource"], null>,
string
> = {
gradcracker: "Gradcracker",
jobspy: "JobSpy",
ukvisajobs: "UKVisaJobs",
};
const clamp = (value: number, min: number, max: number) =>
Math.max(min, Math.min(max, value));
@ -83,6 +100,15 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
switch (progress.step) {
case "crawling": {
if (progress.crawlingTermsTotal > 0) {
return clamp(
5 +
(progress.crawlingTermsProcessed / progress.crawlingTermsTotal) *
10,
5,
15,
);
}
if (progress.crawlingListPagesTotal > 0) {
return clamp(
(progress.crawlingListPagesProcessed /
@ -119,6 +145,7 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
return 55;
}
case "completed":
case "cancelled":
case "failed":
return 100;
default:
@ -162,11 +189,31 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
}
const step = progress?.step ?? "idle";
const isActive = step !== "idle" && step !== "completed" && step !== "failed";
const isActive =
step !== "idle" &&
step !== "completed" &&
step !== "cancelled" &&
step !== "failed";
const listPagesText = progress
? progress.crawlingListPagesTotal > 0
? `${progress.crawlingListPagesProcessed}/${progress.crawlingListPagesTotal}`
: progress.crawlingListPagesProcessed > 0
? `${progress.crawlingListPagesProcessed}`
: "—"
: "—";
const jobPagesText = progress
? progress.crawlingJobPagesEnqueued > 0
? `${progress.crawlingJobPagesProcessed}/${progress.crawlingJobPagesEnqueued}`
: progress.crawlingJobPagesProcessed > 0
? `${progress.crawlingJobPagesProcessed}`
: "—"
: "—";
const showStats =
!!progress &&
["crawling", "scoring", "processing", "completed"].includes(step);
["crawling", "scoring", "processing", "completed", "cancelled"].includes(
step,
);
return (
<Card>
@ -201,6 +248,23 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
{progress.detail && (
<p className="text-sm text-muted-foreground">{progress.detail}</p>
)}
{step === "crawling" && (
<p className="text-xs text-muted-foreground">
Source:{" "}
{progress.crawlingSource
? sourceLabel[progress.crawlingSource]
: "starting"}
{" "}({progress.crawlingSourcesCompleted}/
{Math.max(progress.crawlingSourcesTotal, 0)})
{progress.crawlingTermsTotal > 0 && (
<>
{" "}
Terms: {progress.crawlingTermsProcessed}/
{progress.crawlingTermsTotal}
</>
)}
</p>
)}
</div>
{showStats && (
@ -211,21 +275,15 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
<>
<div>
<div className="text-xs text-muted-foreground">
Sources
</div>
<div className="tabular-nums">
{progress.crawlingListPagesProcessed}
{progress.crawlingListPagesTotal > 0
? `/${progress.crawlingListPagesTotal}`
: ""}
List pages
</div>
<div className="tabular-nums">{listPagesText}</div>
</div>
<div>
<div className="text-xs text-muted-foreground">Pages</div>
<div className="tabular-nums">
{progress.crawlingJobPagesProcessed}/
{Math.max(progress.crawlingJobPagesEnqueued, 0)}
<div className="text-xs text-muted-foreground">
Job pages
</div>
<div className="tabular-nums">{jobPagesText}</div>
</div>
<div>
<div className="text-xs text-muted-foreground">

View File

@ -9,6 +9,11 @@ import type { FilterTab } from "./orchestrator/constants";
vi.mock("../api", () => ({
updateSettings: vi.fn().mockResolvedValue({}),
runPipeline: vi.fn().mockResolvedValue({ message: "ok" }),
cancelPipeline: vi.fn().mockResolvedValue({
message: "Pipeline cancellation requested",
pipelineRunId: "run-1",
alreadyRequested: false,
}),
getPipelineStatus: vi.fn().mockResolvedValue({
isRunning: false,
lastRun: null,
@ -16,6 +21,8 @@ vi.mock("../api", () => ({
}),
}));
let mockIsPipelineRunning = false;
const jobFixture: Job = {
id: "job-1",
source: "linkedin",
@ -103,7 +110,7 @@ vi.mock("./orchestrator/useOrchestratorData", () => ({
expired: 0,
},
isLoading: false,
isPipelineRunning: false,
isPipelineRunning: mockIsPipelineRunning,
setIsPipelineRunning: vi.fn(),
loadJobs: vi.fn(),
}),
@ -129,7 +136,17 @@ vi.mock("../hooks/useSettings", () => ({
}));
vi.mock("./orchestrator/OrchestratorHeader", () => ({
OrchestratorHeader: () => <div data-testid="header" />,
OrchestratorHeader: ({
onCancelPipeline,
}: {
onCancelPipeline: () => void;
}) => (
<div data-testid="header">
<button type="button" onClick={onCancelPipeline}>
Cancel Pipeline
</button>
</div>
),
}));
vi.mock("./orchestrator/OrchestratorSummary", () => ({
@ -240,6 +257,7 @@ const LocationWatcher = () => {
describe("OrchestratorPage", () => {
beforeEach(() => {
vi.clearAllMocks();
mockIsPipelineRunning = false;
});
it("syncs tab selection to the URL", () => {
@ -261,6 +279,28 @@ describe("OrchestratorPage", () => {
expect(screen.getByTestId("location").textContent).toContain("/discovered");
});
it("requests pipeline cancellation when running", async () => {
mockIsPipelineRunning = true;
window.matchMedia = createMatchMedia(
true,
) as unknown as typeof window.matchMedia;
render(
<MemoryRouter initialEntries={["/ready"]}>
<Routes>
<Route path="/:tab" element={<OrchestratorPage />} />
<Route path="/:tab/:jobId" element={<OrchestratorPage />} />
</Routes>
</MemoryRouter>,
);
fireEvent.click(screen.getByText("Cancel Pipeline"));
await waitFor(() => {
expect(api.cancelPipeline).toHaveBeenCalledTimes(1);
});
});
it("syncs job selection to the URL", async () => {
window.matchMedia = createMatchMedia(
true,

View File

@ -137,6 +137,7 @@ export const OrchestratorPage: React.FC = () => {
const [isRunModeModalOpen, setIsRunModeModalOpen] = useState(false);
const [runMode, setRunMode] = useState<RunMode>("automatic");
const [isDetailDrawerOpen, setIsDetailDrawerOpen] = useState(false);
const [isCancelling, setIsCancelling] = useState(false);
const [isDesktop, setIsDesktop] = useState(() =>
typeof window !== "undefined"
? window.matchMedia("(min-width: 1024px)").matches
@ -232,6 +233,7 @@ export const OrchestratorPage: React.FC = () => {
}) => {
try {
setIsPipelineRunning(true);
setIsCancelling(false);
await api.runPipeline(config);
toast.message("Pipeline started", {
description: `Sources: ${config.sources.join(", ")}. This may take a few minutes.`,
@ -243,8 +245,16 @@ export const OrchestratorPage: React.FC = () => {
if (!status.isRunning) {
clearInterval(pollInterval);
setIsPipelineRunning(false);
setIsCancelling(false);
await loadJobs();
toast.success("Pipeline completed");
const outcome = status.lastRun?.status;
if (outcome === "cancelled") {
toast.message("Pipeline cancelled");
} else if (outcome === "failed") {
toast.error(status.lastRun?.errorMessage || "Pipeline failed");
} else {
toast.success("Pipeline completed");
}
}
} catch {
// Ignore errors
@ -252,6 +262,7 @@ export const OrchestratorPage: React.FC = () => {
}, 5000);
} catch (error) {
setIsPipelineRunning(false);
setIsCancelling(false);
const message =
error instanceof Error ? error.message : "Failed to start pipeline";
toast.error(message);
@ -260,6 +271,21 @@ export const OrchestratorPage: React.FC = () => {
[loadJobs, setIsPipelineRunning],
);
const handleCancelPipeline = useCallback(async () => {
if (isCancelling || !isPipelineRunning) return;
try {
setIsCancelling(true);
const result = await api.cancelPipeline();
toast.message(result.message);
} catch (error) {
setIsCancelling(false);
const message =
error instanceof Error ? error.message : "Failed to cancel pipeline";
toast.error(message);
}
}, [isCancelling, isPipelineRunning]);
const handleSaveAndRunAutomatic = useCallback(
async (values: AutomaticRunValues) => {
const limits = deriveExtractorLimits({
@ -352,8 +378,10 @@ export const OrchestratorPage: React.FC = () => {
navOpen={navOpen}
onNavOpenChange={setNavOpen}
isPipelineRunning={isPipelineRunning}
isCancelling={isCancelling}
pipelineSources={pipelineSources}
onOpenAutomaticRun={() => openRunMode("automatic")}
onCancelPipeline={handleCancelPipeline}
/>
<main className="container mx-auto max-w-7xl space-y-6 px-4 py-6 pb-12">

View File

@ -28,8 +28,10 @@ const renderHeader = (
navOpen: false,
onNavOpenChange: vi.fn(),
isPipelineRunning: false,
isCancelling: false,
pipelineSources: ["gradcracker"],
onOpenAutomaticRun: vi.fn(),
onCancelPipeline: vi.fn(),
...overrides,
};
@ -56,4 +58,10 @@ describe("OrchestratorHeader", () => {
screen.queryByRole("button", { name: /manual import/i }),
).not.toBeInTheDocument();
});
it("renders cancel button while running and triggers cancel", () => {
const { props } = renderHeader({ isPipelineRunning: true });
fireEvent.click(screen.getByRole("button", { name: /cancel run/i }));
expect(props.onCancelPipeline).toHaveBeenCalled();
});
});

View File

@ -1,6 +1,6 @@
import { isNavActive, NAV_LINKS } from "@client/components/navigation";
import type { JobSource } from "@shared/types.js";
import { Loader2, Menu, Play, Sparkles } from "lucide-react";
import { Loader2, Menu, Play, Sparkles, Square } from "lucide-react";
import type React from "react";
import { useLocation, useNavigate } from "react-router-dom";
import { Button } from "@/components/ui/button";
@ -17,16 +17,20 @@ interface OrchestratorHeaderProps {
navOpen: boolean;
onNavOpenChange: (open: boolean) => void;
isPipelineRunning: boolean;
isCancelling: boolean;
pipelineSources: JobSource[];
onOpenAutomaticRun: () => void;
onCancelPipeline: () => void;
}
export const OrchestratorHeader: React.FC<OrchestratorHeaderProps> = ({
navOpen,
onNavOpenChange,
isPipelineRunning,
isCancelling,
pipelineSources,
onOpenAutomaticRun,
onCancelPipeline,
}) => {
const location = useLocation();
const navigate = useNavigate();
@ -94,23 +98,31 @@ export const OrchestratorHeader: React.FC<OrchestratorHeaderProps> = ({
</div>
<div className="flex items-center gap-2">
<Button
size="sm"
onClick={onOpenAutomaticRun}
disabled={isPipelineRunning}
className="gap-2"
>
{isPipelineRunning ? (
<Loader2 className="h-4 w-4 animate-spin" />
) : (
{isPipelineRunning ? (
<Button
size="sm"
onClick={onCancelPipeline}
disabled={isCancelling}
variant="destructive"
className="gap-2"
>
{isCancelling ? (
<Loader2 className="h-4 w-4 animate-spin" />
) : (
<Square className="h-4 w-4" />
)}
<span className="hidden sm:inline">
{isCancelling
? `Cancelling (${pipelineSources.length})`
: `Cancel run`}
</span>
</Button>
) : (
<Button size="sm" onClick={onOpenAutomaticRun} className="gap-2">
<Play className="h-4 w-4" />
)}
<span className="hidden sm:inline">
{isPipelineRunning
? `Running (${pipelineSources.length})`
: `Run pipeline`}
</span>
</Button>
<span className="hidden sm:inline">Run pipeline</span>
</Button>
)}
</div>
</div>
</header>

View File

@ -1,5 +1,5 @@
import type { Server } from "node:http";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { startServer, stopServer } from "./test-utils";
describe.sequential("Pipeline API routes", () => {
@ -46,6 +46,38 @@ describe.sequential("Pipeline API routes", () => {
});
});
it("returns conflict when cancelling with no active pipeline", async () => {
const res = await fetch(`${baseUrl}/api/pipeline/cancel`, {
method: "POST",
});
const body = await res.json();
expect(res.status).toBe(409);
expect(body.ok).toBe(false);
expect(body.error.code).toBe("CONFLICT");
expect(typeof body.meta.requestId).toBe("string");
});
it("accepts cancellation when pipeline is running", async () => {
const { requestPipelineCancel } = await import("../../pipeline/index");
vi.mocked(requestPipelineCancel).mockReturnValue({
accepted: true,
pipelineRunId: "run-1",
alreadyRequested: false,
});
const res = await fetch(`${baseUrl}/api/pipeline/cancel`, {
method: "POST",
});
const body = await res.json();
expect(res.status).toBe(200);
expect(body.ok).toBe(true);
expect(body.data.pipelineRunId).toBe("run-1");
expect(body.data.alreadyRequested).toBe(false);
expect(typeof body.meta.requestId).toBe("string");
});
it("streams pipeline progress over SSE", async () => {
const controller = new AbortController();
const res = await fetch(`${baseUrl}/api/pipeline/progress`, {
@ -61,6 +93,8 @@ describe.sequential("Pipeline API routes", () => {
const { value } = await reader.read();
const text = new TextDecoder().decode(value);
expect(text).toContain("data:");
expect(text).toContain('"crawlingSource"');
expect(text).toContain('"crawlingSourcesTotal"');
} finally {
await reader.cancel();
controller.abort();

View File

@ -1,4 +1,4 @@
import { AppError, badRequest, requestTimeout } from "@infra/errors";
import { AppError, badRequest, conflict, requestTimeout } from "@infra/errors";
import { fail, ok, okWithMeta } from "@infra/http";
import { logger } from "@infra/logger";
import { runWithRequestContext } from "@infra/request-context";
@ -8,6 +8,7 @@ import { z } from "zod";
import { isDemoMode } from "../../config/demo";
import {
getPipelineStatus,
requestPipelineCancel,
runPipeline,
subscribeToProgress,
} from "../../pipeline/index";
@ -135,3 +136,40 @@ pipelineRouter.post("/run", async (req: Request, res: Response) => {
);
}
});
/**
* POST /api/pipeline/cancel - Request cancellation of active pipeline run
*/
pipelineRouter.post("/cancel", async (_req: Request, res: Response) => {
try {
const cancelResult = requestPipelineCancel();
if (!cancelResult.accepted) {
return fail(res, conflict("No running pipeline to cancel"));
}
logger.info("Pipeline cancellation requested", {
route: "/api/pipeline/cancel",
action: "cancel",
status: "accepted",
pipelineRunId: cancelResult.pipelineRunId,
alreadyRequested: cancelResult.alreadyRequested,
});
ok(res, {
message: cancelResult.alreadyRequested
? "Pipeline cancellation already requested"
: "Pipeline cancellation requested",
pipelineRunId: cancelResult.pipelineRunId,
alreadyRequested: cancelResult.alreadyRequested,
});
} catch (error) {
fail(
res,
new AppError({
status: 500,
code: "INTERNAL_ERROR",
message: error instanceof Error ? error.message : "Unknown error",
}),
);
}
});

View File

@ -8,6 +8,11 @@ vi.mock("../../pipeline/index", () => {
const progress = {
step: "idle",
message: "Ready",
crawlingSource: null,
crawlingSourcesCompleted: 0,
crawlingSourcesTotal: 0,
crawlingTermsProcessed: 0,
crawlingTermsTotal: 0,
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
@ -30,6 +35,12 @@ vi.mock("../../pipeline/index", () => {
summarizeJob: vi.fn().mockResolvedValue({ success: true }),
generateFinalPdf: vi.fn().mockResolvedValue({ success: true }),
getPipelineStatus: vi.fn(() => ({ isRunning: false })),
requestPipelineCancel: vi.fn(() => ({
accepted: false,
pipelineRunId: null,
alreadyRequested: false,
})),
isPipelineCancelRequested: vi.fn(() => false),
subscribeToProgress: vi.fn((listener: (data: unknown) => void) => {
listener(progress);
return () => {};

View File

@ -2,6 +2,7 @@
* Database utility scripts.
*/
import { existsSync, unlinkSync } from "node:fs";
import { join } from "node:path";
import Database from "better-sqlite3";
import { getDataDir } from "../config/dataDir";
@ -38,8 +39,6 @@ export function clearDatabase(): { jobsDeleted: number; runsDeleted: number } {
* Delete database file completely (will recreate on next run).
*/
export function dropDatabase(): void {
const { unlinkSync, existsSync } = require("node:fs");
if (existsSync(DB_PATH)) {
unlinkSync(DB_PATH);
console.log("🗑️ Database file deleted");

View File

@ -80,7 +80,7 @@ const migrations = [
id TEXT PRIMARY KEY,
started_at TEXT NOT NULL DEFAULT (datetime('now')),
completed_at TEXT,
status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'completed', 'failed')),
status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'completed', 'failed', 'cancelled')),
jobs_discovered INTEGER NOT NULL DEFAULT 0,
jobs_processed INTEGER NOT NULL DEFAULT 0,
error_message TEXT
@ -179,6 +179,22 @@ const migrations = [
`ALTER TABLE stage_events ADD COLUMN title TEXT NOT NULL DEFAULT ''`,
`ALTER TABLE stage_events ADD COLUMN group_id TEXT`,
// Ensure pipeline_runs status supports "cancelled" for existing databases.
`CREATE TABLE IF NOT EXISTS pipeline_runs_new (
id TEXT PRIMARY KEY,
started_at TEXT NOT NULL DEFAULT (datetime('now')),
completed_at TEXT,
status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running', 'completed', 'failed', 'cancelled')),
jobs_discovered INTEGER NOT NULL DEFAULT 0,
jobs_processed INTEGER NOT NULL DEFAULT 0,
error_message TEXT
)`,
`INSERT OR REPLACE INTO pipeline_runs_new (id, started_at, completed_at, status, jobs_discovered, jobs_processed, error_message)
SELECT id, started_at, completed_at, status, jobs_discovered, jobs_processed, error_message
FROM pipeline_runs`,
`DROP TABLE IF EXISTS pipeline_runs`,
`ALTER TABLE pipeline_runs_new RENAME TO pipeline_runs`,
`CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)`,
`CREATE INDEX IF NOT EXISTS idx_jobs_discovered_at ON jobs(discovered_at)`,
`CREATE INDEX IF NOT EXISTS idx_pipeline_runs_started_at ON pipeline_runs(started_at)`,

View File

@ -141,7 +141,7 @@ export const pipelineRuns = sqliteTable("pipeline_runs", {
startedAt: text("started_at").notNull().default(sql`(datetime('now'))`),
completedAt: text("completed_at"),
status: text("status", {
enum: ["running", "completed", "failed"],
enum: ["running", "completed", "failed", "cancelled"],
})
.notNull()
.default("running"),

View File

@ -0,0 +1,84 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const stepState = vi.hoisted(() => {
let resolveDiscover:
| ((value: { discoveredJobs: []; sourceErrors: [] }) => void)
| null = null;
return {
setResolver: (
fn: (value: { discoveredJobs: []; sourceErrors: [] }) => void,
) => {
resolveDiscover = fn;
},
resolveDiscover: () =>
resolveDiscover?.({ discoveredJobs: [], sourceErrors: [] }),
};
});
vi.mock("../repositories/pipeline", () => ({
createPipelineRun: vi.fn(async () => ({
id: "run-cancel-1",
startedAt: new Date().toISOString(),
completedAt: null,
status: "running",
jobsDiscovered: 0,
jobsProcessed: 0,
errorMessage: null,
})),
updatePipelineRun: vi.fn(async () => undefined),
}));
vi.mock("./steps", () => ({
loadProfileStep: vi.fn(async () => ({})),
discoverJobsStep: vi.fn(
() =>
new Promise<{ discoveredJobs: []; sourceErrors: [] }>((resolve) => {
stepState.setResolver(resolve);
}),
),
importJobsStep: vi.fn(async () => ({ created: 0, skipped: 0 })),
scoreJobsStep: vi.fn(async () => ({ unprocessedJobs: [], scoredJobs: [] })),
selectJobsStep: vi.fn(() => []),
processJobsStep: vi.fn(async () => ({ processedCount: 0 })),
notifyPipelineWebhookStep: vi.fn(async () => undefined),
}));
describe.sequential("pipeline cancellation", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("marks run as cancelled at checkpoint and resets running state", async () => {
const pipeline = await import("./orchestrator");
const pipelineRepo = await import("../repositories/pipeline");
const steps = await import("./steps");
const runPromise = pipeline.runPipeline({ sources: [] });
await Promise.resolve();
const cancelRequest = pipeline.requestPipelineCancel();
expect(cancelRequest.accepted).toBe(true);
expect([null, "run-cancel-1"]).toContain(cancelRequest.pipelineRunId);
expect(pipeline.isPipelineCancelRequested()).toBe(true);
const duplicateRequest = pipeline.requestPipelineCancel();
expect(duplicateRequest.accepted).toBe(true);
expect(duplicateRequest.alreadyRequested).toBe(true);
stepState.resolveDiscover();
const result = await runPromise;
expect(result.success).toBe(false);
expect(result.error).toContain("Cancelled");
expect(vi.mocked(steps.importJobsStep)).not.toHaveBeenCalled();
expect(vi.mocked(pipelineRepo.updatePipelineRun)).toHaveBeenCalledWith(
"run-cancel-1",
expect.objectContaining({
status: "cancelled",
}),
);
expect(pipeline.getPipelineStatus().isRunning).toBe(false);
expect(pipeline.isPipelineCancelRequested()).toBe(false);
});
});

View File

@ -47,6 +47,21 @@ const DEFAULT_CONFIG: PipelineConfig = {
// Track if pipeline is currently running
let isPipelineRunning = false;
let activePipelineRunId: string | null = null;
let cancelRequestedAt: string | null = null;
class PipelineCancelledError extends Error {
constructor(message = "Pipeline cancellation requested") {
super(message);
this.name = "PipelineCancelledError";
}
}
function ensureNotCancelled(): void {
if (cancelRequestedAt) {
throw new PipelineCancelledError();
}
}
/**
* Run the full job discovery and processing pipeline.
@ -69,12 +84,18 @@ export async function runPipeline(
}
isPipelineRunning = true;
activePipelineRunId = "pending";
cancelRequestedAt = null;
resetProgress();
const mergedConfig = { ...DEFAULT_CONFIG, ...config };
const pipelineRun = await pipelineRepo.createPipelineRun();
activePipelineRunId = pipelineRun.id;
return runWithRequestContext({ pipelineRunId: pipelineRun.id }, async () => {
const pipelineLogger = logger.child({ pipelineRunId: pipelineRun.id });
let jobsDiscovered = 0;
let jobsProcessed = 0;
pipelineLogger.info("Starting pipeline run", {
topN: mergedConfig.topN,
minSuitabilityScore: mergedConfig.minSuitabilityScore,
@ -82,18 +103,30 @@ export async function runPipeline(
});
try {
ensureNotCancelled();
const profile = await loadProfileStep();
const { discoveredJobs } = await discoverJobsStep({ mergedConfig });
ensureNotCancelled();
const { discoveredJobs } = await discoverJobsStep({
mergedConfig,
shouldCancel: () => cancelRequestedAt !== null,
});
ensureNotCancelled();
const { created } = await importJobsStep({ discoveredJobs });
jobsDiscovered = created;
await pipelineRepo.updatePipelineRun(pipelineRun.id, {
jobsDiscovered: created,
});
const { unprocessedJobs, scoredJobs } = await scoreJobsStep({ profile });
ensureNotCancelled();
const { unprocessedJobs, scoredJobs } = await scoreJobsStep({
profile,
shouldCancel: () => cancelRequestedAt !== null,
});
ensureNotCancelled();
const jobsToProcess = selectJobsStep({
scoredJobs,
mergedConfig,
@ -106,7 +139,9 @@ export async function runPipeline(
const { processedCount } = await processJobsStep({
jobsToProcess,
processJob,
shouldCancel: () => cancelRequestedAt !== null,
});
jobsProcessed = processedCount;
await pipelineRepo.updatePipelineRun(pipelineRun.id, {
status: "completed",
@ -133,6 +168,28 @@ export async function runPipeline(
jobsProcessed: processedCount,
};
} catch (error) {
if (error instanceof PipelineCancelledError) {
const message = "Cancelled by user request";
await pipelineRepo.updatePipelineRun(pipelineRun.id, {
status: "cancelled",
completedAt: new Date().toISOString(),
jobsDiscovered,
jobsProcessed,
errorMessage: message,
});
progressHelpers.cancelled(message);
pipelineLogger.info("Pipeline run cancelled", {
jobsDiscovered,
jobsProcessed,
});
return {
success: false,
jobsDiscovered,
jobsProcessed,
error: message,
};
}
const message = error instanceof Error ? error.message : "Unknown error";
await pipelineRepo.updatePipelineRun(pipelineRun.id, {
@ -151,12 +208,14 @@ export async function runPipeline(
return {
success: false,
jobsDiscovered: 0,
jobsProcessed: 0,
jobsDiscovered,
jobsProcessed,
error: message,
};
} finally {
isPipelineRunning = false;
activePipelineRunId = null;
cancelRequestedAt = null;
}
});
}
@ -340,3 +399,37 @@ export async function processJob(
export function getPipelineStatus(): { isRunning: boolean } {
return { isRunning: isPipelineRunning };
}
export function requestPipelineCancel(): {
accepted: boolean;
pipelineRunId: string | null;
alreadyRequested: boolean;
} {
if (!isPipelineRunning) {
return { accepted: false, pipelineRunId: null, alreadyRequested: false };
}
const pipelineRunId =
activePipelineRunId && activePipelineRunId !== "pending"
? activePipelineRunId
: null;
if (cancelRequestedAt) {
return {
accepted: true,
pipelineRunId,
alreadyRequested: true,
};
}
cancelRequestedAt = new Date().toISOString();
return {
accepted: true,
pipelineRunId,
alreadyRequested: false,
};
}
export function isPipelineCancelRequested(): boolean {
return cancelRequestedAt !== null;
}

View File

@ -11,12 +11,20 @@ export type PipelineStep =
| "scoring"
| "processing"
| "completed"
| "cancelled"
| "failed";
export type CrawlSource = "gradcracker" | "jobspy" | "ukvisajobs";
export interface PipelineProgress {
step: PipelineStep;
message: string;
detail?: string;
crawlingSource: CrawlSource | null;
crawlingSourcesCompleted: number;
crawlingSourcesTotal: number;
crawlingTermsProcessed: number;
crawlingTermsTotal: number;
crawlingListPagesProcessed: number;
crawlingListPagesTotal: number;
crawlingJobCardsFound: number;
@ -46,6 +54,11 @@ const listeners: Set<ProgressListener> = new Set();
let currentProgress: PipelineProgress = {
step: "idle",
message: "Ready",
crawlingSource: null,
crawlingSourcesCompleted: 0,
crawlingSourcesTotal: 0,
crawlingTermsProcessed: 0,
crawlingTermsTotal: 0,
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
@ -58,6 +71,19 @@ let currentProgress: PipelineProgress = {
totalToProcess: 0,
};
const emptyCrawlingStats = {
crawlingTermsProcessed: 0,
crawlingTermsTotal: 0,
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
crawlingPhase: undefined,
crawlingCurrentUrl: undefined,
};
/**
* Update the current progress and notify all listeners.
*/
@ -103,14 +129,10 @@ export function resetProgress(): void {
currentProgress = {
step: "idle",
message: "Ready",
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
crawlingPhase: undefined,
crawlingCurrentUrl: undefined,
crawlingSource: null,
crawlingSourcesCompleted: 0,
crawlingSourcesTotal: 0,
...emptyCrawlingStats,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
@ -122,27 +144,51 @@ export function resetProgress(): void {
* Helper to create progress updates for each step.
*/
export const progressHelpers = {
startCrawling: () =>
startCrawling: (sourcesTotal = 0) =>
updateProgress({
step: "crawling",
message: "Fetching jobs from sources...",
detail: "Starting crawler",
startedAt: new Date().toISOString(),
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
crawlingPhase: undefined,
crawlingCurrentUrl: undefined,
crawlingSource: null,
crawlingSourcesCompleted: 0,
crawlingSourcesTotal: sourcesTotal,
...emptyCrawlingStats,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
}),
startSource: (
source: CrawlSource,
sourcesCompleted: number,
sourcesTotal: number,
options?: { termsTotal?: number; detail?: string },
) =>
updateProgress({
step: "crawling",
message: `Fetching jobs from ${source}...`,
detail: options?.detail,
crawlingSource: source,
crawlingSourcesCompleted: sourcesCompleted,
crawlingSourcesTotal: sourcesTotal,
...emptyCrawlingStats,
crawlingTermsTotal: options?.termsTotal ?? 0,
}),
completeSource: (sourcesCompleted: number, sourcesTotal: number) =>
updateProgress({
crawlingSourcesCompleted: sourcesCompleted,
crawlingSourcesTotal: sourcesTotal,
crawlingCurrentUrl: undefined,
crawlingPhase: undefined,
}),
crawlingUpdate: (update: {
source?: CrawlSource;
termsProcessed?: number;
termsTotal?: number;
listPagesProcessed?: number;
listPagesTotal?: number;
jobCardsFound?: number;
@ -155,6 +201,10 @@ export const progressHelpers = {
const current = getProgress();
const next = {
...current,
crawlingSource: update.source ?? current.crawlingSource,
crawlingTermsProcessed:
update.termsProcessed ?? current.crawlingTermsProcessed,
crawlingTermsTotal: update.termsTotal ?? current.crawlingTermsTotal,
crawlingListPagesProcessed:
update.listPagesProcessed ?? current.crawlingListPagesProcessed,
crawlingListPagesTotal:
@ -177,6 +227,10 @@ export const progressHelpers = {
: `${next.crawlingListPagesProcessed}`;
const pagesPart = `${next.crawlingJobPagesProcessed}/${next.crawlingJobPagesEnqueued}`;
const termsPart =
next.crawlingTermsTotal > 0
? `, terms ${next.crawlingTermsProcessed}/${next.crawlingTermsTotal}`
: "";
const skippedPart =
next.crawlingJobPagesSkipped > 0
? `, skipped ${next.crawlingJobPagesSkipped}`
@ -186,7 +240,7 @@ export const progressHelpers = {
? `, cards ${next.crawlingJobCardsFound}`
: "";
const message = `Crawling jobs (${sourcesPart} sources, pages ${pagesPart}${skippedPart}${cardsPart})...`;
const message = `Crawling jobs (list pages ${sourcesPart}, job pages ${pagesPart}${termsPart}${skippedPart}${cardsPart})...`;
const detail =
next.crawlingCurrentUrl && next.crawlingPhase
? `${next.crawlingPhase === "list" ? "List" : "Job"}: ${next.crawlingCurrentUrl}`
@ -198,6 +252,9 @@ export const progressHelpers = {
step: "crawling",
message,
detail,
crawlingSource: next.crawlingSource,
crawlingTermsProcessed: next.crawlingTermsProcessed,
crawlingTermsTotal: next.crawlingTermsTotal,
crawlingListPagesProcessed: next.crawlingListPagesProcessed,
crawlingListPagesTotal: next.crawlingListPagesTotal,
crawlingJobCardsFound: next.crawlingJobCardsFound,
@ -215,6 +272,7 @@ export const progressHelpers = {
message: `Found ${jobsFound} jobs, importing to database...`,
detail: "Deduplicating and saving",
jobsDiscovered: jobsFound,
crawlingSource: null,
crawlingCurrentUrl: undefined,
}),
@ -283,6 +341,15 @@ export const progressHelpers = {
currentJob: undefined,
}),
cancelled: (reason: string) =>
updateProgress({
step: "cancelled",
message: "Pipeline cancelled",
detail: reason,
completedAt: new Date().toISOString(),
currentJob: undefined,
}),
failed: (error: string) =>
updateProgress({
step: "failed",

View File

@ -1,5 +1,6 @@
import type { PipelineConfig } from "@shared/types";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { getProgress, resetProgress } from "../progress";
import { discoverJobsStep } from "./discover-jobs";
vi.mock("../../repositories/jobs", () => ({
@ -36,6 +37,7 @@ const config: PipelineConfig = {
describe("discoverJobsStep", () => {
beforeEach(() => {
vi.clearAllMocks();
resetProgress();
});
it("applies jobspySites setting and aggregates source errors", async () => {
@ -96,4 +98,165 @@ describe("discoverJobsStep", () => {
}),
).rejects.toThrow("All sources failed: ukvisajobs: boom");
});
it("maps Gradcracker progress callback into live crawling counters", async () => {
const settingsRepo = await import("../../repositories/settings");
const crawler = await import("../../services/crawler");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(crawler.runCrawler).mockImplementation(async (options: any) => {
options?.onProgress?.({
phase: "list",
currentUrl: "https://example.com/list",
listPagesProcessed: 3,
listPagesTotal: 10,
jobCardsFound: 42,
jobPagesEnqueued: 30,
jobPagesSkipped: 4,
jobPagesProcessed: 8,
});
return { success: true, jobs: [] } as any;
});
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["gradcracker"],
},
});
const progress = getProgress();
expect(progress.crawlingSource).toBeNull();
expect(progress.crawlingListPagesProcessed).toBe(3);
expect(progress.crawlingListPagesTotal).toBe(10);
expect(progress.crawlingJobCardsFound).toBe(42);
expect(progress.crawlingJobPagesEnqueued).toBe(30);
expect(progress.crawlingJobPagesSkipped).toBe(4);
expect(progress.crawlingJobPagesProcessed).toBe(8);
});
it("updates JobSpy terms and UKVisa pages via progress callbacks", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const ukVisa = await import("../../services/ukvisajobs");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer", "frontend"]),
jobspySites: JSON.stringify(["linkedin"]),
} as any);
vi.mocked(jobSpy.runJobSpy).mockImplementation(async (options: any) => {
options?.onProgress?.({
type: "term_start",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
});
options?.onProgress?.({
type: "term_complete",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
jobsFoundTerm: 10,
});
options?.onProgress?.({
type: "term_start",
termIndex: 2,
termTotal: 2,
searchTerm: "frontend",
});
options?.onProgress?.({
type: "term_complete",
termIndex: 2,
termTotal: 2,
searchTerm: "frontend",
jobsFoundTerm: 8,
});
return { success: true, jobs: [] } as any;
});
vi.mocked(ukVisa.runUkVisaJobs).mockImplementation(async (options: any) => {
options?.onProgress?.({
type: "init",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
maxPages: 4,
maxJobs: 50,
});
options?.onProgress?.({
type: "page_fetched",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
pageNo: 2,
maxPages: 4,
jobsOnPage: 15,
totalCollected: 18,
totalAvailable: 100,
});
options?.onProgress?.({
type: "term_complete",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
jobsFoundTerm: 18,
totalCollected: 18,
});
return { success: true, jobs: [] } as any;
});
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["linkedin", "ukvisajobs"],
},
});
const progress = getProgress();
expect(progress.crawlingTermsProcessed).toBe(1);
expect(progress.crawlingTermsTotal).toBe(2);
expect(progress.crawlingListPagesProcessed).toBe(2);
expect(progress.crawlingListPagesTotal).toBe(4);
expect(progress.crawlingJobPagesEnqueued).toBe(18);
expect(progress.crawlingJobPagesProcessed).toBe(18);
});
it("tracks source completion counters across source transitions", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const crawler = await import("../../services/crawler");
const ukVisa = await import("../../services/ukvisajobs");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [],
} as any);
vi.mocked(crawler.runCrawler).mockResolvedValue({
success: true,
jobs: [],
} as any);
vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({
success: true,
jobs: [],
} as any);
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["linkedin", "gradcracker", "ukvisajobs"],
},
});
const progress = getProgress();
expect(progress.crawlingSourcesTotal).toBe(3);
expect(progress.crawlingSourcesCompleted).toBe(3);
});
});

View File

@ -9,12 +9,12 @@ import { progressHelpers, updateProgress } from "../progress";
export async function discoverJobsStep(args: {
mergedConfig: PipelineConfig;
shouldCancel?: () => boolean;
}): Promise<{
discoveredJobs: CreateJobInput[];
sourceErrors: string[];
}> {
logger.info("Running discovery step");
progressHelpers.startCrawling();
const discoveredJobs: CreateJobInput[] = [];
const sourceErrors: string[] = [];
@ -52,9 +52,31 @@ export async function discoverJobsStep(args: {
}
}
if (jobSpySites.length > 0) {
updateProgress({
step: "crawling",
const shouldRunJobSpy = jobSpySites.length > 0;
const shouldRunGradcracker =
args.mergedConfig.sources.includes("gradcracker");
const shouldRunUkVisaJobs = args.mergedConfig.sources.includes("ukvisajobs");
const totalSources =
Number(shouldRunJobSpy) +
Number(shouldRunGradcracker) +
Number(shouldRunUkVisaJobs);
let completedSources = 0;
progressHelpers.startCrawling(totalSources);
const markSourceComplete = () => {
completedSources += 1;
progressHelpers.completeSource(completedSources, totalSources);
};
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunJobSpy) {
progressHelpers.startSource("jobspy", completedSources, totalSources, {
termsTotal: searchTerms.length,
detail: `JobSpy: scraping ${jobSpySites.join(", ")}...`,
});
@ -79,6 +101,34 @@ export async function discoverJobsStep(args: {
settings.jobspyIsRemote !== undefined
? settings.jobspyIsRemote === "1"
: undefined,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`,
});
},
});
if (!jobSpyResult.success) {
@ -86,10 +136,18 @@ export async function discoverJobsStep(args: {
} else {
discoveredJobs.push(...jobSpyResult.jobs);
}
markSourceComplete();
}
if (args.mergedConfig.sources.includes("gradcracker")) {
updateProgress({ step: "crawling", detail: "Gradcracker: scraping..." });
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunGradcracker) {
progressHelpers.startSource("gradcracker", completedSources, totalSources, {
detail: "Gradcracker: scraping...",
});
const existingJobUrls = await jobsRepo.getAllJobUrls();
const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm
@ -101,16 +159,17 @@ export async function discoverJobsStep(args: {
searchTerms,
maxJobsPerTerm: gradcrackerMaxJobs,
onProgress: (progress) => {
if (progress.listPagesTotal && progress.listPagesTotal > 0) {
const percent = Math.round(
((progress.listPagesProcessed ?? 0) / progress.listPagesTotal) *
100,
);
updateProgress({
step: "crawling",
detail: `Gradcracker: ${percent}% (scan ${progress.listPagesProcessed}/${progress.listPagesTotal}, found ${progress.jobCardsFound})`,
});
}
progressHelpers.crawlingUpdate({
source: "gradcracker",
listPagesProcessed: progress.listPagesProcessed,
listPagesTotal: progress.listPagesTotal,
jobCardsFound: progress.jobCardsFound,
jobPagesEnqueued: progress.jobPagesEnqueued,
jobPagesSkipped: progress.jobPagesSkipped,
jobPagesProcessed: progress.jobPagesProcessed,
phase: progress.phase,
currentUrl: progress.currentUrl,
});
},
});
@ -121,11 +180,17 @@ export async function discoverJobsStep(args: {
} else {
discoveredJobs.push(...crawlerResult.jobs);
}
markSourceComplete();
}
if (args.mergedConfig.sources.includes("ukvisajobs")) {
updateProgress({
step: "crawling",
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunUkVisaJobs) {
progressHelpers.startSource("ukvisajobs", completedSources, totalSources, {
termsTotal: searchTerms.length,
detail: "UKVisaJobs: scraping visa-sponsoring jobs...",
});
@ -136,6 +201,76 @@ export async function discoverJobsStep(args: {
const ukVisaResult = await runUkVisaJobs({
maxJobs: ukvisajobsMaxJobs,
searchTerms,
onProgress: (event) => {
if (event.type === "init") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: 0,
listPagesTotal: event.maxPages,
jobPagesEnqueued: 0,
jobPagesProcessed: 0,
jobPagesSkipped: 0,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
listPagesTotal: event.maxPages,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}/${event.maxPages}`,
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`,
});
return;
}
if (event.type === "term_complete") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
if (event.type === "empty_page") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`,
});
return;
}
if (event.type === "error") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: ${event.message}`,
});
}
},
});
if (!ukVisaResult.success) {
@ -143,6 +278,8 @@ export async function discoverJobsStep(args: {
} else {
discoveredJobs.push(...ukVisaResult.jobs);
}
markSourceComplete();
}
if (discoveredJobs.length === 0 && sourceErrors.length > 0) {

View File

@ -10,6 +10,7 @@ type ProcessJobFn = (
export async function processJobsStep(args: {
jobsToProcess: ScoredJob[];
processJob: ProcessJobFn;
shouldCancel?: () => boolean;
}): Promise<{ processedCount: number }> {
let processedCount = 0;
@ -21,6 +22,8 @@ export async function processJobsStep(args: {
});
for (let i = 0; i < args.jobsToProcess.length; i++) {
if (args.shouldCancel?.()) break;
const job = args.jobsToProcess[i];
progressHelpers.processingJob(i + 1, args.jobsToProcess.length, job);

View File

@ -9,6 +9,7 @@ import type { ScoredJob } from "./types";
export async function scoreJobsStep(args: {
profile: Record<string, unknown>;
shouldCancel?: () => boolean;
}): Promise<{ unprocessedJobs: Job[]; scoredJobs: ScoredJob[] }> {
logger.info("Running scoring step");
const unprocessedJobs = await jobsRepo.getUnscoredDiscoveredJobs();
@ -33,6 +34,8 @@ export async function scoreJobsStep(args: {
const scoredJobs: ScoredJob[] = [];
for (let i = 0; i < unprocessedJobs.length; i++) {
if (args.shouldCancel?.()) break;
const job = unprocessedJobs[i];
const hasCachedScore =
typeof job.suitabilityScore === "number" &&

View File

@ -40,7 +40,7 @@ export async function updatePipelineRun(
id: string,
update: Partial<{
completedAt: string;
status: "running" | "completed" | "failed";
status: "running" | "completed" | "failed" | "cancelled";
jobsDiscovered: number;
jobsProcessed: number;
errorMessage: string;

View File

@ -0,0 +1,40 @@
import { describe, expect, it } from "vitest";
import { parseJobSpyProgressLine } from "./jobspy";
describe("parseJobSpyProgressLine", () => {
it("parses term_start progress lines", () => {
const event = parseJobSpyProgressLine(
'JOBOPS_PROGRESS {"event":"term_start","termIndex":1,"termTotal":3,"searchTerm":"engineer"}',
);
expect(event).toEqual({
type: "term_start",
termIndex: 1,
termTotal: 3,
searchTerm: "engineer",
});
});
it("parses term_complete progress lines", () => {
const event = parseJobSpyProgressLine(
'JOBOPS_PROGRESS {"event":"term_complete","termIndex":2,"termTotal":3,"searchTerm":"frontend","jobsFoundTerm":17}',
);
expect(event).toEqual({
type: "term_complete",
termIndex: 2,
termTotal: 3,
searchTerm: "frontend",
jobsFoundTerm: 17,
});
});
it("returns null for malformed payloads", () => {
expect(parseJobSpyProgressLine("JOBOPS_PROGRESS {bad json")).toBeNull();
expect(parseJobSpyProgressLine("JOBOPS_PROGRESS {}")).toBeNull();
});
it("returns null for non-progress lines", () => {
expect(parseJobSpyProgressLine("Found 20 jobs")).toBeNull();
});
});

View File

@ -7,6 +7,7 @@
import { spawn } from "node:child_process";
import { mkdir, readFile, unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import type { CreateJobInput, JobSource } from "@shared/types";
import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion";
@ -15,6 +16,61 @@ import { getDataDir } from "../config/dataDir";
const __dirname = dirname(fileURLToPath(import.meta.url));
const JOBSPY_DIR = join(__dirname, "../../../../extractors/jobspy");
const JOBSPY_SCRIPT = join(JOBSPY_DIR, "scrape_jobs.py");
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
export type JobSpyProgressEvent =
| {
type: "term_start";
termIndex: number;
termTotal: number;
searchTerm: string;
}
| {
type: "term_complete";
termIndex: number;
termTotal: number;
searchTerm: string;
jobsFoundTerm: number;
};
export function parseJobSpyProgressLine(
line: string,
): JobSpyProgressEvent | null {
if (!line.startsWith(JOBOPS_PROGRESS_PREFIX)) return null;
const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim();
let parsed: Record<string, unknown>;
try {
parsed = JSON.parse(raw) as Record<string, unknown>;
} catch {
return null;
}
const eventName = toStringOrNull(parsed.event);
const termIndex = toNumberOrNull(parsed.termIndex);
const termTotal = toNumberOrNull(parsed.termTotal);
const searchTerm = toStringOrNull(parsed.searchTerm) ?? "";
if (!eventName || termIndex === null || termTotal === null) return null;
if (eventName === "term_start") {
return {
type: "term_start",
termIndex,
termTotal,
searchTerm,
};
}
if (eventName === "term_complete") {
return {
type: "term_complete",
termIndex,
termTotal,
searchTerm,
jobsFoundTerm: toNumberOrNull(parsed.jobsFoundTerm) ?? 0,
};
}
return null;
}
function getPythonPath(): string {
if (process.env.PYTHON_PATH) return process.env.PYTHON_PATH;
@ -92,6 +148,7 @@ export interface RunJobSpyOptions {
countryIndeed?: string;
linkedinFetchDescription?: boolean;
isRemote?: boolean;
onProgress?: (event: JobSpyProgressEvent) => void;
}
export interface JobSpyResult {
@ -131,11 +188,13 @@ export async function runJobSpy(
const child = spawn(pythonPath, [JOBSPY_SCRIPT], {
cwd: JOBSPY_DIR,
shell: false,
stdio: "inherit",
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
JOBSPY_SITES: sites || "indeed,linkedin",
JOBSPY_SEARCH_TERM: searchTerm,
JOBSPY_TERM_INDEX: String(i + 1),
JOBSPY_TERM_TOTAL: String(searchTerms.length),
JOBSPY_LOCATION:
options.location ?? process.env.JOBSPY_LOCATION ?? "UK",
JOBSPY_RESULTS_WANTED: String(
@ -161,7 +220,28 @@ export async function runJobSpy(
},
});
const handleLine = (line: string, stream: NodeJS.WriteStream) => {
const event = parseJobSpyProgressLine(line);
if (event) {
options.onProgress?.(event);
return;
}
stream.write(`${line}\n`);
};
const stdoutRl = child.stdout
? createInterface({ input: child.stdout })
: null;
const stderrRl = child.stderr
? createInterface({ input: child.stderr })
: null;
stdoutRl?.on("line", (line) => handleLine(line, process.stdout));
stderrRl?.on("line", (line) => handleLine(line, process.stderr));
child.on("close", (code) => {
stdoutRl?.close();
stderrRl?.close();
if (code === 0) resolve();
else reject(new Error(`JobSpy exited with code ${code}`));
});

View File

@ -0,0 +1,72 @@
import { describe, expect, it } from "vitest";
import { parseUkVisaJobsProgressLine } from "./ukvisajobs";
describe("parseUkVisaJobsProgressLine", () => {
it("parses init events", () => {
const event = parseUkVisaJobsProgressLine(
'JOBOPS_PROGRESS {"event":"init","maxPages":4,"maxJobs":50,"searchKeyword":"engineer"}',
);
expect(event).toEqual({
type: "init",
maxPages: 4,
maxJobs: 50,
searchKeyword: "engineer",
});
});
it("parses page_fetched events", () => {
const event = parseUkVisaJobsProgressLine(
'JOBOPS_PROGRESS {"event":"page_fetched","pageNo":2,"maxPages":4,"jobsOnPage":15,"totalCollected":28,"totalAvailable":105}',
);
expect(event).toEqual({
type: "page_fetched",
pageNo: 2,
maxPages: 4,
jobsOnPage: 15,
totalCollected: 28,
totalAvailable: 105,
});
});
it("parses terminal and error events", () => {
expect(
parseUkVisaJobsProgressLine(
'JOBOPS_PROGRESS {"event":"empty_page","pageNo":3,"maxPages":4,"totalCollected":28}',
),
).toEqual({
type: "empty_page",
pageNo: 3,
maxPages: 4,
totalCollected: 28,
});
expect(
parseUkVisaJobsProgressLine(
'JOBOPS_PROGRESS {"event":"done","maxPages":4,"totalCollected":42,"totalAvailable":105}',
),
).toEqual({
type: "done",
maxPages: 4,
totalCollected: 42,
totalAvailable: 105,
});
expect(
parseUkVisaJobsProgressLine(
'JOBOPS_PROGRESS {"event":"error","message":"boom","pageNo":2,"status":500}',
),
).toEqual({
type: "error",
message: "boom",
pageNo: 2,
status: 500,
});
});
it("ignores malformed or unrelated lines", () => {
expect(parseUkVisaJobsProgressLine("JOBOPS_PROGRESS {bad")).toBeNull();
expect(parseUkVisaJobsProgressLine("normal log line")).toBeNull();
});
});

View File

@ -7,6 +7,7 @@
import { spawn } from "node:child_process";
import { mkdir, readdir, readFile, rm } from "node:fs/promises";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import type { CreateJobInput } from "@shared/types";
import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion";
@ -18,6 +19,7 @@ const AUTH_CACHE_PATH = join(UKVISAJOBS_DIR, "storage/ukvisajobs-auth.json");
const UKVISAJOBS_API_URL =
"https://my.ukvisajobs.com/ukvisa-api/api/fetch-jobs-data";
const UKVISAJOBS_PAGE_SIZE = 15;
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
let isUkVisaJobsRunning = false;
interface UkVisaJobsAuthSession {
@ -34,6 +36,8 @@ export interface RunUkVisaJobsOptions {
searchKeyword?: string;
/** List of search terms to run sequentially */
searchTerms?: string[];
/** Optional callback for structured progress emitted by extractor runs. */
onProgress?: (event: UkVisaJobsProgressEvent) => void;
}
export interface UkVisaJobsResult {
@ -42,6 +46,133 @@ export interface UkVisaJobsResult {
error?: string;
}
type UkVisaJobsExtractorProgressEvent =
| {
type: "init";
maxPages: number;
maxJobs: number;
searchKeyword: string;
}
| {
type: "page_fetched";
pageNo: number;
maxPages: number;
jobsOnPage: number;
totalCollected: number;
totalAvailable: number;
}
| {
type: "done";
maxPages: number;
totalCollected: number;
totalAvailable: number;
}
| {
type: "empty_page";
pageNo: number;
maxPages: number;
totalCollected: number;
}
| {
type: "error";
message: string;
pageNo?: number;
status?: number;
};
type UkVisaJobsExtractorEventWithTerm = UkVisaJobsExtractorProgressEvent & {
termIndex: number;
termTotal: number;
searchTerm: string;
};
export type UkVisaJobsProgressEvent =
| UkVisaJobsExtractorEventWithTerm
| {
type: "term_complete";
termIndex: number;
termTotal: number;
searchTerm: string;
jobsFoundTerm: number;
totalCollected: number;
};
export function parseUkVisaJobsProgressLine(
line: string,
): UkVisaJobsExtractorProgressEvent | null {
if (!line.startsWith(JOBOPS_PROGRESS_PREFIX)) return null;
const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim();
let parsed: Record<string, unknown>;
try {
parsed = JSON.parse(raw) as Record<string, unknown>;
} catch {
return null;
}
const event = toStringOrNull(parsed.event);
if (!event) return null;
if (event === "init") {
const maxPages = toNumberOrNull(parsed.maxPages);
const maxJobs = toNumberOrNull(parsed.maxJobs);
if (maxPages === null || maxJobs === null) return null;
return {
type: "init",
maxPages,
maxJobs,
searchKeyword: toStringOrNull(parsed.searchKeyword) ?? "",
};
}
if (event === "page_fetched") {
const pageNo = toNumberOrNull(parsed.pageNo);
const maxPages = toNumberOrNull(parsed.maxPages);
if (pageNo === null || maxPages === null) return null;
return {
type: "page_fetched",
pageNo,
maxPages,
jobsOnPage: toNumberOrNull(parsed.jobsOnPage) ?? 0,
totalCollected: toNumberOrNull(parsed.totalCollected) ?? 0,
totalAvailable: toNumberOrNull(parsed.totalAvailable) ?? 0,
};
}
if (event === "done") {
const maxPages = toNumberOrNull(parsed.maxPages);
if (maxPages === null) return null;
return {
type: "done",
maxPages,
totalCollected: toNumberOrNull(parsed.totalCollected) ?? 0,
totalAvailable: toNumberOrNull(parsed.totalAvailable) ?? 0,
};
}
if (event === "empty_page") {
const pageNo = toNumberOrNull(parsed.pageNo);
const maxPages = toNumberOrNull(parsed.maxPages);
if (pageNo === null || maxPages === null) return null;
return {
type: "empty_page",
pageNo,
maxPages,
totalCollected: toNumberOrNull(parsed.totalCollected) ?? 0,
};
}
if (event === "error") {
return {
type: "error",
message: toStringOrNull(parsed.message) ?? "unknown error",
pageNo: toNumberOrNull(parsed.pageNo) ?? undefined,
status: toNumberOrNull(parsed.status) ?? undefined,
};
}
return null;
}
function buildCookieHeader(session: UkVisaJobsAuthSession): string {
const cookieParts: string[] = [];
if (session.csrfToken) cookieParts.push(`csrf_token=${session.csrfToken}`);
@ -442,10 +573,12 @@ export async function runUkVisaJobs(
const allJobs: CreateJobInput[] = [];
const seenIds = new Set<string>();
const termTotal = terms.length;
for (let i = 0; i < terms.length; i++) {
const term = terms[i];
const termLabel = term ? `"${term}"` : "all jobs";
const termIndex = i + 1;
console.log(` Running for ${termLabel}...`);
try {
@ -457,15 +590,42 @@ export async function runUkVisaJobs(
await new Promise<void>((resolve, reject) => {
const child = spawn("npx", ["tsx", "src/main.ts"], {
cwd: UKVISAJOBS_DIR,
stdio: "inherit",
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
JOBOPS_EMIT_PROGRESS: "1",
UKVISAJOBS_MAX_JOBS: String(options.maxJobs ?? 50),
UKVISAJOBS_SEARCH_KEYWORD: term,
},
});
const handleLine = (line: string, stream: NodeJS.WriteStream) => {
const progressEvent = parseUkVisaJobsProgressLine(line);
if (progressEvent) {
options.onProgress?.({
...progressEvent,
termIndex,
termTotal,
searchTerm: term,
});
return;
}
stream.write(`${line}\n`);
};
const stdoutRl = child.stdout
? createInterface({ input: child.stdout })
: null;
const stderrRl = child.stderr
? createInterface({ input: child.stderr })
: null;
stdoutRl?.on("line", (line) => handleLine(line, process.stdout));
stderrRl?.on("line", (line) => handleLine(line, process.stderr));
child.on("close", (code) => {
stdoutRl?.close();
stderrRl?.close();
if (code === 0) resolve();
else
reject(
@ -508,10 +668,25 @@ export async function runUkVisaJobs(
console.log(
` ✅ Fetched ${runJobs.length} jobs for ${termLabel} (${newCount} new unique)`,
);
options.onProgress?.({
type: "term_complete",
termIndex,
termTotal,
searchTerm: term,
jobsFoundTerm: newCount,
totalCollected: allJobs.length,
});
} catch (error) {
const message =
error instanceof Error ? error.message : "Unknown error";
console.error(`❌ UK Visa Jobs failed for ${termLabel}: ${message}`);
options.onProgress?.({
type: "error",
termIndex,
termTotal,
searchTerm: term,
message,
});
// Continue to next term instead of failing completely
}

View File

@ -302,7 +302,7 @@ export interface PipelineRun {
id: string;
startedAt: string;
completedAt: string | null;
status: "running" | "completed" | "failed";
status: "running" | "completed" | "failed" | "cancelled";
jobsDiscovered: number;
jobsProcessed: number;
errorMessage: string | null;