diff --git a/docs-site/docs/features/orchestrator.md b/docs-site/docs/features/orchestrator.md index 34d62f6..00b0a14 100644 --- a/docs-site/docs/features/orchestrator.md +++ b/docs-site/docs/features/orchestrator.md @@ -72,7 +72,7 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with `{ "action": "move_to_ready", "jobIds": [""] }` - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/docs-site/versioned_docs/version-0.1.20/features/orchestrator.md b/docs-site/versioned_docs/version-0.1.20/features/orchestrator.md index d9fd609..4f4ef27 100644 --- a/docs-site/versioned_docs/version-0.1.20/features/orchestrator.md +++ b/docs-site/versioned_docs/version-0.1.20/features/orchestrator.md @@ -65,7 +65,18 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with payload: + + ```bash + curl -X POST "http://localhost:3001/api/jobs/actions" \ + -H "content-type: application/json" \ + -d '{ + "action": "move_to_ready", + "jobIds": [""] + }' + ``` + +- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload) - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/docs-site/versioned_docs/version-0.1.21/features/orchestrator.md b/docs-site/versioned_docs/version-0.1.21/features/orchestrator.md index 571e28f..18b00ae 100644 --- a/docs-site/versioned_docs/version-0.1.21/features/orchestrator.md +++ b/docs-site/versioned_docs/version-0.1.21/features/orchestrator.md @@ -65,7 +65,18 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with payload: + + ```bash + curl -X POST "http://localhost:3001/api/jobs/actions" \ + -H "content-type: application/json" \ + -d '{ + "action": "move_to_ready", + "jobIds": [""] + }' + ``` + +- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload) - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/docs-site/versioned_docs/version-0.1.22/features/orchestrator.md b/docs-site/versioned_docs/version-0.1.22/features/orchestrator.md index 0126206..7e79869 100644 --- a/docs-site/versioned_docs/version-0.1.22/features/orchestrator.md +++ b/docs-site/versioned_docs/version-0.1.22/features/orchestrator.md @@ -67,7 +67,18 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with payload: + + ```bash + curl -X POST "http://localhost:3001/api/jobs/actions" \ + -H "content-type: application/json" \ + -d '{ + "action": "move_to_ready", + "jobIds": [""] + }' + ``` + +- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload) - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/docs-site/versioned_docs/version-0.1.23/features/orchestrator.md b/docs-site/versioned_docs/version-0.1.23/features/orchestrator.md index 34d62f6..31831a6 100644 --- a/docs-site/versioned_docs/version-0.1.23/features/orchestrator.md +++ b/docs-site/versioned_docs/version-0.1.23/features/orchestrator.md @@ -72,7 +72,18 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with payload: + + ```bash + curl -X POST "http://localhost:3001/api/jobs/actions" \ + -H "content-type: application/json" \ + -d '{ + "action": "move_to_ready", + "jobIds": [""] + }' + ``` + +- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload) - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/docs-site/versioned_docs/version-0.1.24/features/orchestrator.md b/docs-site/versioned_docs/version-0.1.24/features/orchestrator.md index 34d62f6..31831a6 100644 --- a/docs-site/versioned_docs/version-0.1.24/features/orchestrator.md +++ b/docs-site/versioned_docs/version-0.1.24/features/orchestrator.md @@ -72,7 +72,18 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with payload: + + ```bash + curl -X POST "http://localhost:3001/api/jobs/actions" \ + -H "content-type: application/json" \ + -d '{ + "action": "move_to_ready", + "jobIds": [""] + }' + ``` + +- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload) - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/docs-site/versioned_docs/version-0.1.25/features/orchestrator.md b/docs-site/versioned_docs/version-0.1.25/features/orchestrator.md index 34d62f6..31831a6 100644 --- a/docs-site/versioned_docs/version-0.1.25/features/orchestrator.md +++ b/docs-site/versioned_docs/version-0.1.25/features/orchestrator.md @@ -72,7 +72,18 @@ PDF generation uses: Common paths: -- Discovered to finalization: `POST /api/jobs/:id/process` +- Discovered to finalization: `POST /api/jobs/actions` with payload: + + ```bash + curl -X POST "http://localhost:3001/api/jobs/actions" \ + -H "content-type: application/json" \ + -d '{ + "action": "move_to_ready", + "jobIds": [""] + }' + ``` + +- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload) - Ready regeneration: `POST /api/jobs/:id/generate-pdf` ### Regenerating PDFs after edits (copy-pasteable examples) diff --git a/orchestrator/README.md b/orchestrator/README.md index 4ad818b..35a5b00 100644 --- a/orchestrator/README.md +++ b/orchestrator/README.md @@ -65,9 +65,9 @@ orchestrator/ | GET | `/api/jobs` | List all jobs (filter with `?status=ready,discovered`) | | GET | `/api/jobs/:id` | Get single job | | PATCH | `/api/jobs/:id` | Update job | -| POST | `/api/jobs/:id/process` | Generate resume for job | +| POST | `/api/jobs/actions` | Run job actions (`move_to_ready`, `rescore`, `skip`) for one or many jobs | +| POST | `/api/jobs/actions/stream` | Stream job action progress/events for one or many jobs | | POST | `/api/jobs/:id/apply` | Mark as applied | -| POST | `/api/jobs/:id/skip` | Mark as skipped | ### Pipeline diff --git a/orchestrator/src/client/api/client.stream.test.ts b/orchestrator/src/client/api/client.stream.test.ts index f03f354..666d726 100644 --- a/orchestrator/src/client/api/client.stream.test.ts +++ b/orchestrator/src/client/api/client.stream.test.ts @@ -28,7 +28,7 @@ describe("API client SSE streaming", () => { } as Response); await expect( - api.streamBulkJobAction( + api.streamJobAction( { action: "skip", jobIds: ["job-1"] }, { onEvent: () => { diff --git a/orchestrator/src/client/api/client.ts b/orchestrator/src/client/api/client.ts index 59dfa91..c30811c 100644 --- a/orchestrator/src/client/api/client.ts +++ b/orchestrator/src/client/api/client.ts @@ -9,13 +9,11 @@ import type { ApplicationTask, AppSettings, BackupInfo, - BulkJobActionRequest, - BulkJobActionResponse, - BulkJobActionStreamEvent, - BulkPostApplicationAction, - BulkPostApplicationActionResponse, DemoInfoResponse, Job, + JobActionRequest, + JobActionResponse, + JobActionStreamEvent, JobChatMessage, JobChatStreamEvent, JobChatThread, @@ -29,6 +27,8 @@ import type { ManualJobFetchResponse, ManualJobInferenceResponse, PipelineStatusResponse, + PostApplicationAction, + PostApplicationActionResponse, PostApplicationInboxItem, PostApplicationProvider, PostApplicationProviderActionResponse, @@ -84,7 +84,7 @@ type LegacyApiResponse = }; type StreamSseInput = - | BulkJobActionRequest + | JobActionRequest | { content: string; stream: true } | { stream: true }; @@ -734,20 +734,45 @@ export async function streamRegenerateJobGhostwriterMessage( ); } +function toJobIdList(idOrIds: string | string[]): string[] { + return Array.isArray(idOrIds) ? idOrIds : [idOrIds]; +} + +export async function processJob( + ids: string[], + options?: { force?: boolean }, +): Promise; export async function processJob( id: string, options?: { force?: boolean }, -): Promise { - const query = options?.force ? "?force=1" : ""; - return fetchApi(`/jobs/${id}/process${query}`, { - method: "POST", +): Promise; +export async function processJob( + idOrIds: string | string[], + options?: { force?: boolean }, +): Promise { + const jobIds = toJobIdList(idOrIds); + const result = await runJobAction({ + action: "move_to_ready", + jobIds, + ...(options?.force ? { options: { force: true } } : {}), }); + + if (Array.isArray(idOrIds)) return result; + return getSingleJobFromActionResult(result, idOrIds); } -export async function rescoreJob(id: string): Promise { - return fetchApi(`/jobs/${id}/rescore`, { - method: "POST", +export async function rescoreJob(ids: string[]): Promise; +export async function rescoreJob(id: string): Promise; +export async function rescoreJob( + idOrIds: string | string[], +): Promise { + const jobIds = toJobIdList(idOrIds); + const result = await runJobAction({ + action: "rescore", + jobIds, }); + if (Array.isArray(idOrIds)) return result; + return getSingleJobFromActionResult(result, idOrIds); } export async function summarizeJob( @@ -778,30 +803,54 @@ export async function markAsApplied(id: string): Promise { }); } -export async function skipJob(id: string): Promise { - return fetchApi(`/jobs/${id}/skip`, { - method: "POST", +export async function skipJob(ids: string[]): Promise; +export async function skipJob(id: string): Promise; +export async function skipJob( + idOrIds: string | string[], +): Promise { + const jobIds = toJobIdList(idOrIds); + const result = await runJobAction({ + action: "skip", + jobIds, }); + if (Array.isArray(idOrIds)) return result; + return getSingleJobFromActionResult(result, idOrIds); } -export async function bulkJobAction( - input: BulkJobActionRequest, -): Promise { - return fetchApi("/jobs/bulk-actions", { +export async function runJobAction( + input: JobActionRequest, +): Promise { + return fetchApi("/jobs/actions", { method: "POST", body: JSON.stringify(input), }); } -export async function streamBulkJobAction( - input: BulkJobActionRequest, +function getSingleJobFromActionResult( + response: JobActionResponse, + jobId: string, +): Job { + const result = response.results.find((entry) => entry.jobId === jobId); + if (!result) { + throw new ApiClientError("Job action did not return a result for the job"); + } + if (!result.ok) { + throw new ApiClientError(result.error.message, { + code: result.error.code, + }); + } + return result.job; +} + +export async function streamJobAction( + input: JobActionRequest, handlers: { - onEvent: (event: BulkJobActionStreamEvent) => void; + onEvent: (event: JobActionStreamEvent) => void; signal?: AbortSignal; }, ): Promise { - return streamSseEvents( - "/jobs/bulk-actions/stream", + return streamSseEvents( + "/jobs/actions/stream", input, handlers, ); @@ -1083,14 +1132,14 @@ export async function denyPostApplicationInboxItem(input: { ); } -export async function bulkPostApplicationInboxAction(input: { - action: BulkPostApplicationAction; +export async function runPostApplicationInboxAction(input: { + action: PostApplicationAction; provider?: PostApplicationProvider; accountKey?: string; decidedBy?: string; -}): Promise { - return fetchApi( - "/post-application/inbox/bulk", +}): Promise { + return fetchApi( + "/post-application/inbox/actions", { method: "POST", body: JSON.stringify({ @@ -1363,7 +1412,7 @@ export async function updateVisaSponsorList(): Promise<{ }); } -// Bulk operations (intentionally none - processing is manual) +// Multi-job operations (intentionally none - processing is manual) // Backup API export interface BackupListResponse { diff --git a/orchestrator/src/client/pages/OrchestratorPage.test.tsx b/orchestrator/src/client/pages/OrchestratorPage.test.tsx index 3317ffc..a857fad 100644 --- a/orchestrator/src/client/pages/OrchestratorPage.test.tsx +++ b/orchestrator/src/client/pages/OrchestratorPage.test.tsx @@ -816,7 +816,7 @@ describe("OrchestratorPage", () => { }); }); - it("shows and hides bulk Recalculate match based on selected statuses", async () => { + it("shows and hides Recalculate match based on selected statuses", async () => { window.matchMedia = createMatchMedia( true, ) as unknown as typeof window.matchMedia; diff --git a/orchestrator/src/client/pages/OrchestratorPage.tsx b/orchestrator/src/client/pages/OrchestratorPage.tsx index 4917220..7089e71 100644 --- a/orchestrator/src/client/pages/OrchestratorPage.tsx +++ b/orchestrator/src/client/pages/OrchestratorPage.tsx @@ -21,7 +21,7 @@ import type { AutomaticRunValues } from "./orchestrator/automatic-run"; import { deriveExtractorLimits } from "./orchestrator/automatic-run"; import type { FilterTab } from "./orchestrator/constants"; import { tabs } from "./orchestrator/constants"; -import { FloatingBulkActionsBar } from "./orchestrator/FloatingBulkActionsBar"; +import { FloatingJobActionsBar } from "./orchestrator/FloatingJobActionsBar"; import { JobCommandBar } from "./orchestrator/JobCommandBar"; import { JobDetailPanel } from "./orchestrator/JobDetailPanel"; import { JobListPanel } from "./orchestrator/JobListPanel"; @@ -30,8 +30,8 @@ import { OrchestratorHeader } from "./orchestrator/OrchestratorHeader"; import { OrchestratorSummary } from "./orchestrator/OrchestratorSummary"; import { RunModeModal } from "./orchestrator/RunModeModal"; import type { RunMode } from "./orchestrator/run-mode"; -import { useBulkJobSelection } from "./orchestrator/useBulkJobSelection"; import { useFilteredJobs } from "./orchestrator/useFilteredJobs"; +import { useJobSelectionActions } from "./orchestrator/useJobSelectionActions"; import { useOrchestratorData } from "./orchestrator/useOrchestratorData"; import { useOrchestratorFilters } from "./orchestrator/useOrchestratorFilters"; import { usePipelineSources } from "./orchestrator/usePipelineSources"; @@ -179,12 +179,12 @@ export const OrchestratorPage: React.FC = () => { canSkipSelected, canMoveSelected, canRescoreSelected, - bulkActionInFlight, + jobActionInFlight, toggleSelectJob, toggleSelectAll, clearSelection, - runBulkAction, - } = useBulkJobSelection({ + runJobAction, + } = useJobSelectionActions({ activeJobs, activeTab, loadJobs, @@ -403,9 +403,16 @@ export const OrchestratorPage: React.FC = () => { // ── Context actions ───────────────────────────────────────────────── [SHORTCUTS.skip.key]: () => { - if (!selectedJob) return; if (!["discovered", "ready"].includes(activeTab)) return; if (shortcutActionInFlight.current) return; + + // Selection action takes precedence if selection exists + if (selectedJobIds.size > 0) { + void runJobAction("skip"); + return; + } + + if (!selectedJob) return; shortcutActionInFlight.current = true; const jobId = selectedJob.id; api @@ -454,9 +461,9 @@ export const OrchestratorPage: React.FC = () => { if (activeTab !== "discovered") return; if (shortcutActionInFlight.current) return; - // Bulk action takes precedence if selection exists + // Selection action takes precedence if selection exists if (selectedJobIds.size > 0) { - void runBulkAction("move_to_ready"); + void runJobAction("move_to_ready"); return; } @@ -713,15 +720,15 @@ export const OrchestratorPage: React.FC = () => { - void runBulkAction("move_to_ready")} - onSkipSelected={() => void runBulkAction("skip")} - onRescoreSelected={() => void runBulkAction("rescore")} + jobActionInFlight={jobActionInFlight !== null} + onMoveToReady={() => void runJobAction("move_to_ready")} + onSkipSelected={() => void runJobAction("skip")} + onRescoreSelected={() => void runJobAction("rescore")} onClear={clearSelection} /> diff --git a/orchestrator/src/client/pages/TrackingInboxPage.tsx b/orchestrator/src/client/pages/TrackingInboxPage.tsx index f490841..664927f 100644 --- a/orchestrator/src/client/pages/TrackingInboxPage.tsx +++ b/orchestrator/src/client/pages/TrackingInboxPage.tsx @@ -164,7 +164,7 @@ export const TrackingInboxPage: React.FC = () => { const isAppliedJobsLoading = appliedJobsQuery.isPending || appliedJobsQuery.isFetching; - const [bulkActionDialog, setBulkActionDialog] = useState<{ + const [inboxActionDialog, setInboxActionDialog] = useState<{ isOpen: boolean; action: "approve" | "deny" | null; itemCount: number; @@ -436,15 +436,15 @@ export const TrackingInboxPage: React.FC = () => { [accountKey, appliedJobByMessageId, provider, refresh], ); - const handleBulkAction = useCallback( + const handleInboxAction = useCallback( async (action: "approve" | "deny") => { if (inbox.length === 0) return; setIsActionLoading(true); - setBulkActionDialog({ isOpen: false, action: null, itemCount: 0 }); + setInboxActionDialog({ isOpen: false, action: null, itemCount: 0 }); try { - const result = await api.bulkPostApplicationInboxAction({ + const result = await api.runPostApplicationInboxAction({ action, provider, accountKey, @@ -479,7 +479,7 @@ export const TrackingInboxPage: React.FC = () => { [accountKey, inbox.length, provider, refresh], ); - const openBulkActionDialog = useCallback( + const openInboxActionDialog = useCallback( (action: "approve" | "deny") => { const eligibleCount = action === "approve" @@ -495,7 +495,7 @@ export const TrackingInboxPage: React.FC = () => { return; } - setBulkActionDialog({ + setInboxActionDialog({ isOpen: true, action, itemCount: eligibleCount, @@ -706,7 +706,7 @@ export const TrackingInboxPage: React.FC = () => { size="sm" className="gap-1" disabled={isActionLoading} - onClick={() => openBulkActionDialog("approve")} + onClick={() => openInboxActionDialog("approve")} > Approve All @@ -716,7 +716,7 @@ export const TrackingInboxPage: React.FC = () => { size="sm" className="gap-1" disabled={isActionLoading} - onClick={() => openBulkActionDialog("deny")} + onClick={() => openInboxActionDialog("deny")} > Ignore All @@ -840,34 +840,34 @@ export const TrackingInboxPage: React.FC = () => { - setBulkActionDialog((previous) => ({ ...previous, isOpen: open })) + setInboxActionDialog((previous) => ({ ...previous, isOpen: open })) } > - {bulkActionDialog.action === "approve" + {inboxActionDialog.action === "approve" ? "Approve All Messages?" : "Ignore All Messages?"} - {bulkActionDialog.action === "approve" - ? `This will approve ${bulkActionDialog.itemCount} message${bulkActionDialog.itemCount === 1 ? "" : "s"} with suggested job matches. Messages without matches will be skipped.` - : `This will ignore all ${bulkActionDialog.itemCount} pending message${bulkActionDialog.itemCount === 1 ? "" : "s"}.`} + {inboxActionDialog.action === "approve" + ? `This will approve ${inboxActionDialog.itemCount} message${inboxActionDialog.itemCount === 1 ? "" : "s"} with suggested job matches. Messages without matches will be skipped.` + : `This will ignore all ${inboxActionDialog.itemCount} pending message${inboxActionDialog.itemCount === 1 ? "" : "s"}.`} Cancel { - if (bulkActionDialog.action) { - void handleBulkAction(bulkActionDialog.action); + if (inboxActionDialog.action) { + void handleInboxAction(inboxActionDialog.action); } }} > - {bulkActionDialog.action === "approve" + {inboxActionDialog.action === "approve" ? "Approve All" : "Ignore All"} diff --git a/orchestrator/src/client/pages/orchestrator/FloatingBulkActionsBar.tsx b/orchestrator/src/client/pages/orchestrator/FloatingJobActionsBar.tsx similarity index 89% rename from orchestrator/src/client/pages/orchestrator/FloatingBulkActionsBar.tsx rename to orchestrator/src/client/pages/orchestrator/FloatingJobActionsBar.tsx index 897487a..f018b93 100644 --- a/orchestrator/src/client/pages/orchestrator/FloatingBulkActionsBar.tsx +++ b/orchestrator/src/client/pages/orchestrator/FloatingJobActionsBar.tsx @@ -3,24 +3,24 @@ import { useEffect, useState } from "react"; import { Button } from "@/components/ui/button"; import { cn } from "@/lib/utils"; -interface FloatingBulkActionsBarProps { +interface FloatingJobActionsBarProps { selectedCount: number; canMoveSelected: boolean; canSkipSelected: boolean; canRescoreSelected: boolean; - bulkActionInFlight: boolean; + jobActionInFlight: boolean; onMoveToReady: () => void; onSkipSelected: () => void; onRescoreSelected: () => void; onClear: () => void; } -export const FloatingBulkActionsBar: React.FC = ({ +export const FloatingJobActionsBar: React.FC = ({ selectedCount, canMoveSelected, canSkipSelected, canRescoreSelected, - bulkActionInFlight, + jobActionInFlight, onMoveToReady, onSkipSelected, onRescoreSelected, @@ -62,7 +62,7 @@ export const FloatingBulkActionsBar: React.FC = ({ size="sm" variant="outline" className="w-full sm:w-auto" - disabled={bulkActionInFlight} + disabled={jobActionInFlight} onClick={onMoveToReady} > Move to Ready @@ -74,7 +74,7 @@ export const FloatingBulkActionsBar: React.FC = ({ size="sm" variant="outline" className="w-full sm:w-auto" - disabled={bulkActionInFlight} + disabled={jobActionInFlight} onClick={onSkipSelected} > Skip selected @@ -86,7 +86,7 @@ export const FloatingBulkActionsBar: React.FC = ({ size="sm" variant="outline" className="w-full sm:w-auto" - disabled={bulkActionInFlight} + disabled={jobActionInFlight} onClick={onRescoreSelected} > Recalculate match @@ -98,7 +98,7 @@ export const FloatingBulkActionsBar: React.FC = ({ variant="ghost" className="w-full sm:w-auto" onClick={onClear} - disabled={bulkActionInFlight} + disabled={jobActionInFlight} > Clear diff --git a/orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx b/orchestrator/src/client/pages/orchestrator/JobActionProgressToast.tsx similarity index 85% rename from orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx rename to orchestrator/src/client/pages/orchestrator/JobActionProgressToast.tsx index 23f3426..06d348d 100644 --- a/orchestrator/src/client/pages/orchestrator/BulkActionProgressToast.tsx +++ b/orchestrator/src/client/pages/orchestrator/JobActionProgressToast.tsx @@ -1,19 +1,19 @@ import { Progress } from "@/components/ui/progress"; import { clampNumber } from "./utils"; -interface BulkActionProgressToastProps { +interface JobActionProgressToastProps { completed: number; requested: number; succeeded: number; failed: number; } -export function BulkActionProgressToast({ +export function JobActionProgressToast({ completed, requested, succeeded, failed, -}: BulkActionProgressToastProps) { +}: JobActionProgressToastProps) { const safeRequested = Math.max(requested, 1); const safeCompleted = clampNumber(completed, 0, safeRequested); const progressValue = Math.round((safeCompleted / safeRequested) * 100); diff --git a/orchestrator/src/client/pages/orchestrator/bulkActions.test.ts b/orchestrator/src/client/pages/orchestrator/jobActions.test.ts similarity index 74% rename from orchestrator/src/client/pages/orchestrator/bulkActions.test.ts rename to orchestrator/src/client/pages/orchestrator/jobActions.test.ts index 2f13c87..7e2679e 100644 --- a/orchestrator/src/client/pages/orchestrator/bulkActions.test.ts +++ b/orchestrator/src/client/pages/orchestrator/jobActions.test.ts @@ -1,37 +1,35 @@ import { createJob } from "@shared/testing/factories.js"; -import type { BulkJobActionResponse } from "@shared/types.js"; +import type { JobActionResponse } from "@shared/types.js"; import { describe, expect, it } from "vitest"; import { - canBulkMoveToReady, - canBulkRescore, - canBulkSkip, + canMoveToReady, + canRescore, + canSkip, getFailedJobIds, -} from "./bulkActions"; +} from "./jobActions"; -describe("bulkActions", () => { +describe("jobActions", () => { it("computes eligibility for skip, move-to-ready, and rescore", () => { expect( - canBulkSkip([ + canSkip([ createJob({ id: "1", status: "discovered" }), createJob({ id: "2", status: "ready" }), ]), ).toBe(true); - expect(canBulkSkip([createJob({ id: "1", status: "applied" })])).toBe( - false, - ); + expect(canSkip([createJob({ id: "1", status: "applied" })])).toBe(false); expect( - canBulkMoveToReady([ + canMoveToReady([ createJob({ id: "1", status: "discovered" }), createJob({ id: "2", status: "discovered" }), ]), ).toBe(true); - expect(canBulkMoveToReady([createJob({ id: "1", status: "ready" })])).toBe( + expect(canMoveToReady([createJob({ id: "1", status: "ready" })])).toBe( false, ); expect( - canBulkRescore([ + canRescore([ createJob({ id: "1", status: "discovered" }), createJob({ id: "2", status: "ready" }), createJob({ id: "3", status: "applied" }), @@ -40,15 +38,15 @@ describe("bulkActions", () => { ]), ).toBe(true); expect( - canBulkRescore([ + canRescore([ createJob({ id: "1", status: "ready" }), createJob({ id: "2", status: "processing" }), ]), ).toBe(false); }); - it("extracts failed job ids from a bulk response", () => { - const response: BulkJobActionResponse = { + it("extracts failed job ids from an action response", () => { + const response: JobActionResponse = { action: "skip", requested: 3, succeeded: 1, diff --git a/orchestrator/src/client/pages/orchestrator/bulkActions.ts b/orchestrator/src/client/pages/orchestrator/jobActions.ts similarity index 57% rename from orchestrator/src/client/pages/orchestrator/bulkActions.ts rename to orchestrator/src/client/pages/orchestrator/jobActions.ts index 71417f4..9a9d0e8 100644 --- a/orchestrator/src/client/pages/orchestrator/bulkActions.ts +++ b/orchestrator/src/client/pages/orchestrator/jobActions.ts @@ -1,22 +1,22 @@ -import type { BulkJobActionResponse, JobListItem } from "@shared/types"; +import type { JobActionResponse, JobListItem } from "@shared/types"; const SKIPPABLE_STATUSES = new Set(["discovered", "ready"]); -export function canBulkSkip(jobs: JobListItem[]): boolean { +export function canSkip(jobs: JobListItem[]): boolean { return ( jobs.length > 0 && jobs.every((job) => SKIPPABLE_STATUSES.has(job.status)) ); } -export function canBulkMoveToReady(jobs: JobListItem[]): boolean { +export function canMoveToReady(jobs: JobListItem[]): boolean { return jobs.length > 0 && jobs.every((job) => job.status === "discovered"); } -export function canBulkRescore(jobs: JobListItem[]): boolean { +export function canRescore(jobs: JobListItem[]): boolean { return jobs.length > 0 && jobs.every((job) => job.status !== "processing"); } -export function getFailedJobIds(response: BulkJobActionResponse): Set { +export function getFailedJobIds(response: JobActionResponse): Set { const failedIds = response.results .filter((result) => !result.ok) .map((result) => result.jobId); diff --git a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts b/orchestrator/src/client/pages/orchestrator/useJobSelectionActions.test.ts similarity index 83% rename from orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts rename to orchestrator/src/client/pages/orchestrator/useJobSelectionActions.test.ts index a845ebd..665580f 100644 --- a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.test.ts +++ b/orchestrator/src/client/pages/orchestrator/useJobSelectionActions.test.ts @@ -1,16 +1,13 @@ import { createJob } from "@shared/testing/factories.js"; -import type { - BulkJobActionResponse, - BulkJobActionStreamEvent, -} from "@shared/types.js"; +import type { JobActionResponse, JobActionStreamEvent } from "@shared/types.js"; import { act, renderHook, waitFor } from "@testing-library/react"; import { toast } from "sonner"; import { beforeEach, describe, expect, it, vi } from "vitest"; import * as api from "../../api"; -import { useBulkJobSelection } from "./useBulkJobSelection"; +import { useJobSelectionActions } from "./useJobSelectionActions"; vi.mock("../../api", () => ({ - streamBulkJobAction: vi.fn(), + streamJobAction: vi.fn(), })); vi.mock("sonner", () => ({ @@ -36,10 +33,10 @@ const deferred = (): Deferred => { }; const asStreamEvents = ( - response: BulkJobActionResponse, - requestId = "req-bulk", -): BulkJobActionStreamEvent[] => { - const events: BulkJobActionStreamEvent[] = [ + response: JobActionResponse, + requestId = "req-action", +): JobActionStreamEvent[] => { + const events: JobActionStreamEvent[] = [ { type: "started", action: response.action, @@ -82,11 +79,11 @@ const asStreamEvents = ( return events; }; -const mockStreamBulkAction = ( - response: BulkJobActionResponse, +const mockStreamJobAction = ( + response: JobActionResponse, waitForRelease?: Promise, ) => { - vi.mocked(api.streamBulkJobAction).mockImplementation( + vi.mocked(api.streamJobAction).mockImplementation( async (_input, handlers) => { for (const event of asStreamEvents(response)) { if (event.type === "started") handlers.onEvent(event); @@ -100,10 +97,10 @@ const mockStreamBulkAction = ( ); }; -describe("useBulkJobSelection", () => { +describe("useJobSelectionActions", () => { beforeEach(() => { vi.clearAllMocks(); - vi.mocked(toast.loading).mockReturnValue("bulk-progress-toast"); + vi.mocked(toast.loading).mockReturnValue("job-progress-toast"); }); it("caps select-all to the API max", () => { @@ -112,7 +109,7 @@ describe("useBulkJobSelection", () => { ); const loadJobs = vi.fn().mockResolvedValue(undefined); const { result } = renderHook(() => - useBulkJobSelection({ + useJobSelectionActions({ activeJobs, activeTab: "discovered", loadJobs, @@ -126,13 +123,13 @@ describe("useBulkJobSelection", () => { expect(result.current.selectedJobIds.size).toBe(100); }); - it("does not send bulk requests above the max selection size", async () => { + it("does not send action requests above the max selection size", async () => { const activeJobs = Array.from({ length: 101 }, (_, index) => createJob({ id: `job-${index + 1}`, status: "discovered" }), ); const loadJobs = vi.fn().mockResolvedValue(undefined); const { result } = renderHook(() => - useBulkJobSelection({ + useJobSelectionActions({ activeJobs, activeTab: "discovered", loadJobs, @@ -146,10 +143,10 @@ describe("useBulkJobSelection", () => { }); await act(async () => { - await result.current.runBulkAction("skip"); + await result.current.runJobAction("skip"); }); - expect(api.streamBulkJobAction).not.toHaveBeenCalled(); + expect(api.streamJobAction).not.toHaveBeenCalled(); }); it("reconciles failures with selection changes made during in-flight action", async () => { @@ -160,7 +157,7 @@ describe("useBulkJobSelection", () => { ]; const loadJobs = vi.fn().mockResolvedValue(undefined); const release = deferred(); - mockStreamBulkAction( + mockStreamJobAction( { action: "skip", requested: 2, @@ -183,7 +180,7 @@ describe("useBulkJobSelection", () => { ); const { result } = renderHook(() => - useBulkJobSelection({ + useJobSelectionActions({ activeJobs, activeTab: "discovered", loadJobs, @@ -197,7 +194,7 @@ describe("useBulkJobSelection", () => { let runPromise: Promise; await act(async () => { - runPromise = result.current.runBulkAction("skip"); + runPromise = result.current.runJobAction("skip"); }); expect(toast.loading).toHaveBeenCalled(); @@ -220,13 +217,13 @@ describe("useBulkJobSelection", () => { expect(toast.dismiss).toHaveBeenCalled(); }); - it("runs bulk rescore and reports success copy", async () => { + it("runs rescore and reports success copy", async () => { const activeJobs = [ createJob({ id: "job-1", status: "ready" }), createJob({ id: "job-2", status: "ready" }), ]; const loadJobs = vi.fn().mockResolvedValue(undefined); - mockStreamBulkAction({ + mockStreamJobAction({ action: "rescore", requested: 2, succeeded: 2, @@ -246,7 +243,7 @@ describe("useBulkJobSelection", () => { }); const { result } = renderHook(() => - useBulkJobSelection({ + useJobSelectionActions({ activeJobs, activeTab: "ready", loadJobs, @@ -259,10 +256,10 @@ describe("useBulkJobSelection", () => { }); await act(async () => { - await result.current.runBulkAction("rescore"); + await result.current.runJobAction("rescore"); }); - expect(api.streamBulkJobAction).toHaveBeenCalledWith( + expect(api.streamJobAction).toHaveBeenCalledWith( { action: "rescore", jobIds: ["job-1", "job-2"] }, expect.objectContaining({ onEvent: expect.any(Function), diff --git a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.tsx b/orchestrator/src/client/pages/orchestrator/useJobSelectionActions.tsx similarity index 78% rename from orchestrator/src/client/pages/orchestrator/useBulkJobSelection.tsx rename to orchestrator/src/client/pages/orchestrator/useJobSelectionActions.tsx index 5e0c2de..a63a133 100644 --- a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.tsx +++ b/orchestrator/src/client/pages/orchestrator/useJobSelectionActions.tsx @@ -1,51 +1,52 @@ import type { - BulkJobAction, - BulkJobActionResponse, + JobAction, + JobActionResponse, JobListItem, } from "@shared/types.js"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import * as api from "../../api"; -import { BulkActionProgressToast } from "./BulkActionProgressToast"; -import { - canBulkMoveToReady, - canBulkRescore, - canBulkSkip, - getFailedJobIds, -} from "./bulkActions"; import type { FilterTab } from "./constants"; +import { JobActionProgressToast } from "./JobActionProgressToast"; +import { + canMoveToReady, + canRescore, + canSkip, + getFailedJobIds, +} from "./jobActions"; import { clampNumber } from "./utils"; -const MAX_BULK_ACTION_JOB_IDS = 100; +const MAX_JOB_ACTION_JOB_IDS = 100; -const bulkActionLabel: Record = { +const jobActionLabel: Record = { move_to_ready: "Moving jobs to Ready...", skip: "Skipping selected jobs...", rescore: "Calculating match scores...", }; -const bulkActionSuccessLabel: Record = { +const jobActionSuccessLabel: Record = { move_to_ready: "jobs moved to Ready", skip: "jobs skipped", rescore: "matches recalculated", }; -interface UseBulkJobSelectionArgs { +interface UseJobSelectionActionsArgs { activeJobs: JobListItem[]; activeTab: FilterTab; loadJobs: () => Promise; } -export function useBulkJobSelection({ +export function useJobSelectionActions({ activeJobs, activeTab, loadJobs, -}: UseBulkJobSelectionArgs) { +}: UseJobSelectionActionsArgs) { const [selectedJobIds, setSelectedJobIds] = useState>( () => new Set(), ); - const [bulkActionInFlight, setBulkActionInFlight] = - useState(null); + const [jobActionInFlight, setJobActionInFlight] = useState( + null, + ); const previousActiveTabRef = useRef(activeTab); const selectedJobs = useMemo( @@ -53,16 +54,13 @@ export function useBulkJobSelection({ [activeJobs, selectedJobIds], ); - const canSkipSelected = useMemo( - () => canBulkSkip(selectedJobs), - [selectedJobs], - ); + const canSkipSelected = useMemo(() => canSkip(selectedJobs), [selectedJobs]); const canMoveSelected = useMemo( - () => canBulkMoveToReady(selectedJobs), + () => canMoveToReady(selectedJobs), [selectedJobs], ); const canRescoreSelected = useMemo( - () => canBulkRescore(selectedJobs), + () => canRescore(selectedJobs), [selectedJobs], ); @@ -100,13 +98,13 @@ export function useBulkJobSelection({ setSelectedJobIds(() => { if (!checked) return new Set(); const allIds = activeJobs.map((job) => job.id); - if (allIds.length <= MAX_BULK_ACTION_JOB_IDS) { + if (allIds.length <= MAX_JOB_ACTION_JOB_IDS) { return new Set(allIds); } toast.error( - `Select all is limited to ${MAX_BULK_ACTION_JOB_IDS} jobs per action.`, + `Select all is limited to ${MAX_JOB_ACTION_JOB_IDS} jobs per action.`, ); - return new Set(allIds.slice(0, MAX_BULK_ACTION_JOB_IDS)); + return new Set(allIds.slice(0, MAX_JOB_ACTION_JOB_IDS)); }); }, [activeJobs], @@ -116,20 +114,20 @@ export function useBulkJobSelection({ setSelectedJobIds(new Set()); }, []); - const runBulkAction = useCallback( - async (action: BulkJobAction) => { + const runJobAction = useCallback( + async (action: JobAction) => { const selectedAtStart = Array.from(selectedJobIds); if (selectedAtStart.length === 0) return; - if (selectedAtStart.length > MAX_BULK_ACTION_JOB_IDS) { + if (selectedAtStart.length > MAX_JOB_ACTION_JOB_IDS) { toast.error( - `You can run bulk actions on up to ${MAX_BULK_ACTION_JOB_IDS} jobs at a time.`, + `You can run job actions on up to ${MAX_JOB_ACTION_JOB_IDS} jobs at a time.`, ); return; } const selectedAtStartSet = new Set(selectedAtStart); let progressToastId: string | number | undefined; - let finalResult: BulkJobActionResponse | null = null; + let finalResult: JobActionResponse | null = null; let streamError: string | null = null; let latestProgress = { requested: selectedAtStart.length, @@ -145,13 +143,13 @@ export function useBulkJobSelection({ 0, safeRequested, ); - return `${safeCompleted}/${safeRequested} ${bulkActionLabel[action]}`; + return `${safeCompleted}/${safeRequested} ${jobActionLabel[action]}`; }; const upsertProgressToast = () => { progressToastId = toast.loading(getProgressTitle(), { description: ( - { if (event.type === "error") { - streamError = event.message || "Failed to run bulk action"; + streamError = event.message || "Failed to run job action"; return; } @@ -223,12 +221,12 @@ export function useBulkJobSelection({ } if (!finalResult) { - throw new Error("Bulk action stream ended before completion"); + throw new Error("Job action stream ended before completion"); } - const result = finalResult as BulkJobActionResponse; + const result = finalResult as JobActionResponse; const failedIds = getFailedJobIds(result); - const successLabel = bulkActionSuccessLabel[action]; + const successLabel = jobActionSuccessLabel[action]; if (result.failed === 0) { toast.success(`${result.succeeded} ${successLabel}`); @@ -255,13 +253,13 @@ export function useBulkJobSelection({ }); } catch (error) { const message = - error instanceof Error ? error.message : "Failed to run bulk action"; + error instanceof Error ? error.message : "Failed to run job action"; toast.error(message); } finally { if (progressToastId !== undefined) { toast.dismiss(progressToastId); } - setBulkActionInFlight(null); + setJobActionInFlight(null); } }, [selectedJobIds, loadJobs], @@ -272,10 +270,10 @@ export function useBulkJobSelection({ canSkipSelected, canMoveSelected, canRescoreSelected, - bulkActionInFlight, + jobActionInFlight, toggleSelectJob, toggleSelectAll, clearSelection, - runBulkAction, + runJobAction, }; } diff --git a/orchestrator/src/server/api/routes/jobs.test.ts b/orchestrator/src/server/api/routes/jobs.test.ts index 56d7c10..be871cf 100644 --- a/orchestrator/src/server/api/routes/jobs.test.ts +++ b/orchestrator/src/server/api/routes/jobs.test.ts @@ -396,11 +396,15 @@ describe.sequential("Jobs API routes", () => { expect(patchBody.data.suitabilityScore).toBe(77); expect(typeof patchBody.meta.requestId).toBe("string"); - const skipRes = await fetch(`${baseUrl}/api/jobs/${job.id}/skip`, { + const skipRes = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ action: "skip", jobIds: [job.id] }), }); const skipBody = await skipRes.json(); - expect(skipBody.data.status).toBe("skipped"); + expect(skipBody.data.results).toHaveLength(1); + expect(skipBody.data.results[0].ok).toBe(true); + expect(skipBody.data.results[0].job.status).toBe("skipped"); const deleteRes = await fetch(`${baseUrl}/api/jobs/status/skipped`, { method: "DELETE", @@ -409,34 +413,34 @@ describe.sequential("Jobs API routes", () => { expect(deleteBody.data.count).toBe(1); }); - it("runs bulk skip with partial failures", async () => { + it("runs skip action with partial failures", async () => { const { createJob } = await import("../../repositories/jobs"); const discovered = await createJob({ source: "manual", title: "Discovered Role", employer: "Acme", - jobUrl: "https://example.com/job/bulk-discovered", + jobUrl: "https://example.com/job/action-discovered", jobDescription: "Test description", }); const ready = await createJob({ source: "manual", title: "Ready Role", employer: "Beta", - jobUrl: "https://example.com/job/bulk-ready", + jobUrl: "https://example.com/job/action-ready", jobDescription: "Test description", }); const applied = await createJob({ source: "manual", title: "Applied Role", employer: "Gamma", - jobUrl: "https://example.com/job/bulk-applied", + jobUrl: "https://example.com/job/action-applied", jobDescription: "Test description", }); const { updateJob } = await import("../../repositories/jobs"); await updateJob(ready.id, { status: "ready" }); await updateJob(applied.id, { status: "applied" }); - const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, { + const res = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ @@ -460,45 +464,92 @@ describe.sequential("Jobs API routes", () => { ]); }); - it("runs bulk move_to_ready and rejects ineligible statuses", async () => { + it("runs move_to_ready action and rejects ineligible statuses", async () => { const { createJob, updateJob } = await import("../../repositories/jobs"); const discovered = await createJob({ source: "manual", title: "New Role", employer: "Acme", - jobUrl: "https://example.com/job/bulk-ready-1", + jobUrl: "https://example.com/job/action-ready-1", jobDescription: "Test description", }); const ready = await createJob({ source: "manual", title: "Already Ready", employer: "Acme", - jobUrl: "https://example.com/job/bulk-ready-2", + jobUrl: "https://example.com/job/action-ready-2", jobDescription: "Test description", }); await updateJob(ready.id, { status: "ready" }); const { processJob } = await import("../../pipeline/index"); + const previousBaseUrl = process.env.JOBOPS_PUBLIC_BASE_URL; + process.env.JOBOPS_PUBLIC_BASE_URL = "https://canonical.jobops.example"; - const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - action: "move_to_ready", - jobIds: [discovered.id, ready.id], - }), - }); - const body = await res.json(); + try { + const res = await fetch(`${baseUrl}/api/jobs/actions`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + action: "move_to_ready", + jobIds: [discovered.id, ready.id], + }), + }); + const body = await res.json(); - expect(body.ok).toBe(true); - expect(body.data.succeeded).toBe(1); - expect(body.data.failed).toBe(1); - expect(vi.mocked(processJob)).toHaveBeenCalledWith(discovered.id); - expect( - body.data.results.find((r: any) => r.jobId === ready.id).error.code, - ).toBe("INVALID_REQUEST"); + expect(body.ok).toBe(true); + expect(body.data.succeeded).toBe(1); + expect(body.data.failed).toBe(1); + expect(vi.mocked(processJob)).toHaveBeenCalledWith(discovered.id, { + force: false, + requestOrigin: "https://canonical.jobops.example", + }); + expect( + body.data.results.find((r: any) => r.jobId === ready.id).error.code, + ).toBe("INVALID_REQUEST"); + } finally { + if (previousBaseUrl === undefined) { + delete process.env.JOBOPS_PUBLIC_BASE_URL; + } else { + process.env.JOBOPS_PUBLIC_BASE_URL = previousBaseUrl; + } + } }); - it("runs bulk rescore with partial failures", async () => { + it("supports legacy move_to_ready endpoint", async () => { + const { createJob } = await import("../../repositories/jobs"); + const { processJob } = await import("../../pipeline/index"); + const job = await createJob({ + source: "manual", + title: "Legacy Ready Route", + employer: "Acme", + jobUrl: "https://example.com/job/legacy-process-1", + jobDescription: "Test description", + }); + + const previousBaseUrl = process.env.JOBOPS_PUBLIC_BASE_URL; + process.env.JOBOPS_PUBLIC_BASE_URL = "https://canonical.jobops.example"; + try { + const res = await fetch(`${baseUrl}/api/jobs/${job.id}/process`, { + method: "POST", + }); + const body = await res.json(); + + expect(res.status).toBe(200); + expect(body.ok).toBe(true); + expect(vi.mocked(processJob)).toHaveBeenCalledWith(job.id, { + force: false, + requestOrigin: "https://canonical.jobops.example", + }); + } finally { + if (previousBaseUrl === undefined) { + delete process.env.JOBOPS_PUBLIC_BASE_URL; + } else { + process.env.JOBOPS_PUBLIC_BASE_URL = previousBaseUrl; + } + } + }); + + it("runs rescore action with partial failures", async () => { const { createJob, updateJob } = await import("../../repositories/jobs"); const { scoreJobSuitability } = await import("../../services/scorer"); const { getProfile } = await import("../../services/profile"); @@ -506,34 +557,34 @@ describe.sequential("Jobs API routes", () => { vi.mocked(getProfile).mockResolvedValue({}); vi.mocked(scoreJobSuitability).mockResolvedValue({ score: 81, - reason: "Updated fit from bulk rescore", + reason: "Updated fit from action rescore", }); const discovered = await createJob({ source: "manual", title: "Discovered Role", employer: "Acme", - jobUrl: "https://example.com/job/bulk-rescore-1", + jobUrl: "https://example.com/job/action-rescore-1", jobDescription: "Test description", }); const ready = await createJob({ source: "manual", title: "Ready Role", employer: "Beta", - jobUrl: "https://example.com/job/bulk-rescore-2", + jobUrl: "https://example.com/job/action-rescore-2", jobDescription: "Test description", }); const processing = await createJob({ source: "manual", title: "Processing Role", employer: "Gamma", - jobUrl: "https://example.com/job/bulk-rescore-3", + jobUrl: "https://example.com/job/action-rescore-3", jobDescription: "Test description", }); await updateJob(ready.id, { status: "ready" }); await updateJob(processing.id, { status: "processing" }); - const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, { + const res = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ @@ -566,33 +617,33 @@ describe.sequential("Jobs API routes", () => { expect(vi.mocked(getProfile)).toHaveBeenCalledTimes(1); }); - it("streams bulk action progress with done counters", async () => { + it("streams job action progress with done counters", async () => { const { createJob, updateJob } = await import("../../repositories/jobs"); const discovered = await createJob({ source: "manual", title: "Discovered Role", employer: "Acme", - jobUrl: "https://example.com/job/bulk-stream-1", + jobUrl: "https://example.com/job/action-stream-1", jobDescription: "Test description", }); const ready = await createJob({ source: "manual", title: "Ready Role", employer: "Beta", - jobUrl: "https://example.com/job/bulk-stream-2", + jobUrl: "https://example.com/job/action-stream-2", jobDescription: "Test description", }); const applied = await createJob({ source: "manual", title: "Applied Role", employer: "Gamma", - jobUrl: "https://example.com/job/bulk-stream-3", + jobUrl: "https://example.com/job/action-stream-3", jobDescription: "Test description", }); await updateJob(ready.id, { status: "ready" }); await updateJob(applied.id, { status: "applied" }); - const res = await fetch(`${baseUrl}/api/jobs/bulk-actions/stream`, { + const res = await fetch(`${baseUrl}/api/jobs/actions/stream`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ @@ -655,12 +706,12 @@ describe.sequential("Jobs API routes", () => { expect(events.at(-1)?.failed).toBe(1); }); - it("validates bulk action payloads", async () => { + it("validates job action payloads", async () => { const tooManyIds = Array.from( { length: 101 }, (_, index) => `job-${index}`, ); - const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, { + const res = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ @@ -719,14 +770,18 @@ describe.sequential("Jobs API routes", () => { suitabilityReason: "Old fit", }); - const res = await fetch(`${baseUrl}/api/jobs/${job.id}/rescore`, { + const res = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ action: "rescore", jobIds: [job.id] }), }); const body = await res.json(); expect(body.ok).toBe(true); - expect(body.data.suitabilityScore).toBe(77); - expect(body.data.suitabilityReason).toBe("Updated fit"); + expect(body.data.results).toHaveLength(1); + expect(body.data.results[0].ok).toBe(true); + expect(body.data.results[0].job.suitabilityScore).toBe(77); + expect(body.data.results[0].job.suitabilityReason).toBe("Updated fit"); }); it("deletes jobs below a score threshold (excluding applied)", async () => { diff --git a/orchestrator/src/server/api/routes/jobs.ts b/orchestrator/src/server/api/routes/jobs.ts index 7083404..f923fd9 100644 --- a/orchestrator/src/server/api/routes/jobs.ts +++ b/orchestrator/src/server/api/routes/jobs.ts @@ -5,11 +5,11 @@ import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse"; import { APPLICATION_OUTCOMES, APPLICATION_STAGES, - type BulkJobAction, - type BulkJobActionResponse, - type BulkJobActionResult, - type BulkJobActionStreamEvent, type Job, + type JobAction, + type JobActionResponse, + type JobActionResult, + type JobActionStreamEvent, type JobListItem, type JobStatus, type JobsListResponse, @@ -18,7 +18,12 @@ import { import { type Request, type Response, Router } from "express"; import { z } from "zod"; import { isDemoMode, sendDemoBlocked } from "../../config/demo"; -import { AppError, badRequest, conflict } from "../../infra/errors"; +import { + AppError, + type AppErrorCode, + badRequest, + conflict, +} from "../../infra/errors"; import { generateFinalPdf, processJob, @@ -48,7 +53,7 @@ import * as visaSponsors from "../../services/visa-sponsors/index"; import { asyncPool } from "../../utils/async-pool"; export const jobsRouter = Router(); -const BULK_ACTION_CONCURRENCY = 4; +const JOB_ACTION_CONCURRENCY = 4; const tailoredSkillsPayloadSchema = z.array( z.object({ @@ -195,10 +200,25 @@ const updateOutcomeSchema = z.object({ closedAt: z.number().int().nullable().optional(), }); -const bulkActionRequestSchema = z.object({ - action: z.enum(["skip", "move_to_ready", "rescore"]), - jobIds: z.array(z.string().min(1)).min(1).max(100), -}); +const jobActionRequestSchema = z.discriminatedUnion("action", [ + z.object({ + action: z.literal("skip"), + jobIds: z.array(z.string().min(1)).min(1).max(100), + }), + z.object({ + action: z.literal("rescore"), + jobIds: z.array(z.string().min(1)).min(1).max(100), + }), + z.object({ + action: z.literal("move_to_ready"), + jobIds: z.array(z.string().min(1)).min(1).max(100), + options: z + .object({ + force: z.boolean().optional(), + }) + .optional(), + }), +]); const listJobsQuerySchema = z.object({ status: z.string().optional(), @@ -277,11 +297,15 @@ function mapErrorForResult(error: unknown): { }; } -type BulkExecutionOptions = { +type JobActionExecutionOptions = { getProfileForRescore?: () => Promise>; + forceMoveToReady?: boolean; + requestOrigin?: string | null; }; -function createBulkProfileLoader(): () => Promise> { +function createSharedRescoreProfileLoader(): () => Promise< + Record +> { let profilePromise: Promise> | null = null; return async () => { @@ -302,11 +326,11 @@ function createBulkProfileLoader(): () => Promise> { }; } -async function executeBulkActionForJob( - action: BulkJobAction, +async function executeJobActionForJob( + action: JobAction, jobId: string, - options?: BulkExecutionOptions, -): Promise { + options?: JobActionExecutionOptions, +): Promise { try { const job = await jobsRepo.getJobById(jobId); if (!job) { @@ -350,13 +374,29 @@ async function executeBulkActionForJob( ); } - const processed = await processJob(jobId); - if (!processed.success) { - throw new AppError({ - status: 500, - code: "INTERNAL_ERROR", - message: processed.error || "Failed to process job", + if (isDemoMode()) { + const simulated = await simulateProcessJob(jobId, { + force: options?.forceMoveToReady ?? false, }); + if (!simulated.success) { + throw new AppError({ + status: 500, + code: "INTERNAL_ERROR", + message: simulated.error || "Failed to process job", + }); + } + } else { + const processed = await processJob(jobId, { + force: options?.forceMoveToReady ?? false, + requestOrigin: options?.requestOrigin ?? null, + }); + if (!processed.success) { + throw new AppError({ + status: 500, + code: "INTERNAL_ERROR", + message: processed.error || "Failed to process job", + }); + } } const updated = await jobsRepo.getJobById(jobId); @@ -426,6 +466,32 @@ async function executeBulkActionForJob( } } +function mapJobActionFailure( + failure: Extract, +): AppError { + const statusByCode: Record = { + INVALID_REQUEST: 400, + UNAUTHORIZED: 401, + FORBIDDEN: 403, + NOT_FOUND: 404, + REQUEST_TIMEOUT: 408, + CONFLICT: 409, + UNPROCESSABLE_ENTITY: 422, + SERVICE_UNAVAILABLE: 503, + UPSTREAM_ERROR: 502, + INTERNAL_ERROR: 500, + }; + const code = ( + failure.error.code in statusByCode ? failure.error.code : "INTERNAL_ERROR" + ) as AppErrorCode; + + return new AppError({ + status: statusByCode[code], + code, + message: failure.error.message, + }); +} + /** * GET /api/jobs - List all jobs * Query params: status (comma-separated list of statuses to filter) @@ -532,27 +598,34 @@ jobsRouter.get("/revision", async (req: Request, res: Response) => { }); /** - * POST /api/jobs/bulk-actions - Run a bulk action across selected jobs + * POST /api/jobs/actions - Run a job action across selected jobs */ -jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { +jobsRouter.post("/actions", async (req: Request, res: Response) => { try { - const parsed = bulkActionRequestSchema.parse(req.body); + const parsed = jobActionRequestSchema.parse(req.body); const dedupedJobIds = Array.from(new Set(parsed.jobIds)); - const executionOptions: BulkExecutionOptions = - parsed.action === "rescore" && !isDemoMode() - ? { getProfileForRescore: createBulkProfileLoader() } - : {}; + const requestOrigin = resolveRequestOrigin(req); + const executionOptions: JobActionExecutionOptions = { + ...(parsed.action === "rescore" && !isDemoMode() + ? { getProfileForRescore: createSharedRescoreProfileLoader() } + : {}), + ...(parsed.action === "move_to_ready" && + parsed.options?.force !== undefined + ? { forceMoveToReady: parsed.options.force } + : {}), + ...(parsed.action === "move_to_ready" ? { requestOrigin } : {}), + }; const results = await asyncPool({ items: dedupedJobIds, - concurrency: BULK_ACTION_CONCURRENCY, + concurrency: JOB_ACTION_CONCURRENCY, task: async (jobId) => - executeBulkActionForJob(parsed.action, jobId, executionOptions), + executeJobActionForJob(parsed.action, jobId, executionOptions), }); const succeeded = results.filter((result) => result.ok).length; const failed = results.length - succeeded; - const payload: BulkJobActionResponse = { + const payload: JobActionResponse = { action: parsed.action, requested: dedupedJobIds.length, succeeded, @@ -560,20 +633,20 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { results, }; - logger.info("Bulk job action completed", { - route: "POST /api/jobs/bulk-actions", + logger.info("Job action completed", { + route: "POST /api/jobs/actions", action: parsed.action, requested: dedupedJobIds.length, succeeded, failed, - concurrency: BULK_ACTION_CONCURRENCY, + concurrency: JOB_ACTION_CONCURRENCY, }); ok(res, payload); } catch (error) { const err = error instanceof z.ZodError - ? badRequest("Invalid bulk action request", error.flatten()) + ? badRequest("Invalid job action request", error.flatten()) : error instanceof AppError ? error : new AppError({ @@ -582,8 +655,8 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { message: error instanceof Error ? error.message : "Unknown error", }); - logger.error("Bulk job action failed", { - route: "POST /api/jobs/bulk-actions", + logger.error("Job action failed", { + route: "POST /api/jobs/actions", status: err.status, code: err.code, details: err.details, @@ -594,26 +667,32 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { }); /** - * POST /api/jobs/bulk-actions/stream - Run a bulk action and stream per-job progress via SSE + * POST /api/jobs/actions/stream - Run a job action and stream per-job progress via SSE */ -jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { - const parsed = bulkActionRequestSchema.safeParse(req.body); +jobsRouter.post("/actions/stream", async (req: Request, res: Response) => { + const parsed = jobActionRequestSchema.safeParse(req.body); if (!parsed.success) { return fail( res, - badRequest("Invalid bulk action request", parsed.error.flatten()), + badRequest("Invalid job action request", parsed.error.flatten()), ); } const dedupedJobIds = Array.from(new Set(parsed.data.jobIds)); + const requestOrigin = resolveRequestOrigin(req); const requestId = String(res.getHeader("x-request-id") || "unknown"); const action = parsed.data.action; - const executionOptions: BulkExecutionOptions = - action === "rescore" && !isDemoMode() - ? { getProfileForRescore: createBulkProfileLoader() } - : {}; + const executionOptions: JobActionExecutionOptions = { + ...(action === "rescore" && !isDemoMode() + ? { getProfileForRescore: createSharedRescoreProfileLoader() } + : {}), + ...(action === "move_to_ready" && parsed.data.options?.force !== undefined + ? { forceMoveToReady: parsed.data.options.force } + : {}), + ...(action === "move_to_ready" ? { requestOrigin } : {}), + }; const requested = dedupedJobIds.length; - const results: BulkJobActionResult[] = []; + const results: JobActionResult[] = []; let succeeded = 0; let failed = 0; @@ -633,7 +712,7 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { const isResponseWritable = () => !clientDisconnected && !res.writableEnded && !res.destroyed; - const sendEvent = (event: BulkJobActionStreamEvent) => { + const sendEvent = (event: JobActionStreamEvent) => { if (!isResponseWritable()) return false; writeSseData(res, event); return true; @@ -651,8 +730,8 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { requestId, }) ) { - logger.info("Client disconnected before bulk stream started", { - route: "POST /api/jobs/bulk-actions/stream", + logger.info("Client disconnected before action stream started", { + route: "POST /api/jobs/actions/stream", action, requested, succeeded, @@ -664,12 +743,12 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { await asyncPool({ items: dedupedJobIds, - concurrency: BULK_ACTION_CONCURRENCY, + concurrency: JOB_ACTION_CONCURRENCY, shouldStop: () => !isResponseWritable(), task: async (jobId) => { if (!isResponseWritable()) return; - const result = await executeBulkActionForJob( + const result = await executeJobActionForJob( action, jobId, executionOptions, @@ -691,9 +770,9 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { }) ) { logger.info( - "Client disconnected while writing bulk stream progress", + "Client disconnected while writing action stream progress", { - route: "POST /api/jobs/bulk-actions/stream", + route: "POST /api/jobs/actions/stream", action, requested, succeeded, @@ -716,13 +795,13 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { requestId, }); - logger.info("Bulk job action stream completed", { - route: "POST /api/jobs/bulk-actions/stream", + logger.info("Job action stream completed", { + route: "POST /api/jobs/actions/stream", action, requested, succeeded, failed, - concurrency: BULK_ACTION_CONCURRENCY, + concurrency: JOB_ACTION_CONCURRENCY, requestId, }); } catch (error) { @@ -735,8 +814,8 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { message: error instanceof Error ? error.message : "Unknown error", }); - logger.error("Bulk job action stream failed", { - route: "POST /api/jobs/bulk-actions/stream", + logger.error("Job action stream failed", { + route: "POST /api/jobs/actions/stream", action, requested, succeeded, @@ -755,7 +834,7 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { }) ) { logger.info("Skipping stream error event because client disconnected", { - route: "POST /api/jobs/bulk-actions/stream", + route: "POST /api/jobs/actions/stream", action, requested, succeeded, @@ -771,6 +850,33 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { } }); +jobsRouter.post("/:id/process", async (req: Request, res: Response) => { + const forceRaw = req.query.force as string | undefined; + const force = forceRaw === "1" || forceRaw === "true"; + const result = await executeJobActionForJob("move_to_ready", req.params.id, { + forceMoveToReady: force, + requestOrigin: resolveRequestOrigin(req), + }); + if (!result.ok) return fail(res, mapJobActionFailure(result)); + ok(res, result.job); +}); + +jobsRouter.post("/:id/skip", async (req: Request, res: Response) => { + const result = await executeJobActionForJob("skip", req.params.id); + if (!result.ok) return fail(res, mapJobActionFailure(result)); + ok(res, result.job); +}); + +jobsRouter.post("/:id/rescore", async (req: Request, res: Response) => { + const result = await executeJobActionForJob("rescore", req.params.id, { + ...(isDemoMode() + ? {} + : { getProfileForRescore: createSharedRescoreProfileLoader() }), + }); + if (!result.ok) return fail(res, mapJobActionFailure(result)); + ok(res, result.job); +}); + /** * GET /api/jobs/:id - Get a single job */ @@ -1039,54 +1145,6 @@ jobsRouter.post("/:id/summarize", async (req: Request, res: Response) => { } }); -/** - * POST /api/jobs/:id/rescore - Regenerate suitability score + reason - */ -jobsRouter.post("/:id/rescore", async (req: Request, res: Response) => { - try { - if (isDemoMode()) { - const simulatedJob = await simulateRescoreJob(req.params.id); - return okWithMeta(res, simulatedJob, { simulated: true }); - } - - const job = await jobsRepo.getJobById(req.params.id); - - if (!job) { - return res.status(404).json({ success: false, error: "Job not found" }); - } - - const rawProfile = await getProfile(); - if ( - !rawProfile || - typeof rawProfile !== "object" || - Array.isArray(rawProfile) - ) { - return res - .status(400) - .json({ success: false, error: "Invalid resume profile format" }); - } - - const { score, reason } = await scoreJobSuitability( - job, - rawProfile as Record, - ); - - const updatedJob = await jobsRepo.updateJob(job.id, { - suitabilityScore: score, - suitabilityReason: reason, - }); - - if (!updatedJob) { - return res.status(404).json({ success: false, error: "Job not found" }); - } - - res.json({ success: true, data: updatedJob }); - } catch (error) { - const message = error instanceof Error ? error.message : "Unknown error"; - res.status(500).json({ success: false, error: message }); - } -}); - /** * POST /api/jobs/:id/check-sponsor - Check if employer is a visa sponsor */ @@ -1166,43 +1224,6 @@ jobsRouter.post("/:id/generate-pdf", async (req: Request, res: Response) => { } }); -/** - * POST /api/jobs/:id/process - Process a single job (generate summary + PDF) - */ -jobsRouter.post("/:id/process", async (req: Request, res: Response) => { - try { - const forceRaw = req.query.force as string | undefined; - const force = forceRaw === "1" || forceRaw === "true"; - - if (isDemoMode()) { - const result = await simulateProcessJob(req.params.id, { force }); - if (!result.success) { - return res.status(400).json({ success: false, error: result.error }); - } - const job = await jobsRepo.getJobById(req.params.id); - if (!job) { - return res.status(404).json({ success: false, error: "Job not found" }); - } - return okWithMeta(res, job, { simulated: true }); - } - - const result = await processJob(req.params.id, { - force, - requestOrigin: resolveRequestOrigin(req), - }); - - if (!result.success) { - return res.status(400).json({ success: false, error: result.error }); - } - - const job = await jobsRepo.getJobById(req.params.id); - res.json({ success: true, data: job }); - } catch (error) { - const message = error instanceof Error ? error.message : "Unknown error"; - res.status(500).json({ success: false, error: message }); - } -}); - /** * POST /api/jobs/:id/apply - Mark a job as applied */ @@ -1255,24 +1276,6 @@ jobsRouter.post("/:id/apply", async (req: Request, res: Response) => { } }); -/** - * POST /api/jobs/:id/skip - Mark a job as skipped - */ -jobsRouter.post("/:id/skip", async (req: Request, res: Response) => { - try { - const job = await jobsRepo.updateJob(req.params.id, { status: "skipped" }); - - if (!job) { - return res.status(404).json({ success: false, error: "Job not found" }); - } - - res.json({ success: true, data: job }); - } catch (error) { - const message = error instanceof Error ? error.message : "Unknown error"; - res.status(500).json({ success: false, error: message }); - } -}); - /** * DELETE /api/jobs/status/:status - Clear jobs with a specific status */ diff --git a/orchestrator/src/server/api/routes/post-application-review.test.ts b/orchestrator/src/server/api/routes/post-application-review.test.ts index 57a3f49..d7764f5 100644 --- a/orchestrator/src/server/api/routes/post-application-review.test.ts +++ b/orchestrator/src/server/api/routes/post-application-review.test.ts @@ -194,7 +194,7 @@ describe.sequential("Post-Application Review Workflow API", () => { it("counts no-suggested-match approve items as skipped, not failed", async () => { await seedPendingMessage({ matchedJobId: null }); - const res = await fetch(`${baseUrl}/api/post-application/inbox/bulk`, { + const res = await fetch(`${baseUrl}/api/post-application/inbox/actions`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ diff --git a/orchestrator/src/server/api/routes/post-application-review.ts b/orchestrator/src/server/api/routes/post-application-review.ts index e76ce44..0894388 100644 --- a/orchestrator/src/server/api/routes/post-application-review.ts +++ b/orchestrator/src/server/api/routes/post-application-review.ts @@ -9,11 +9,11 @@ import { type Request, type Response, Router } from "express"; import { z } from "zod"; import { approvePostApplicationInboxItem, - bulkPostApplicationInboxAction, denyPostApplicationInboxItem, listPostApplicationInbox, listPostApplicationReviewRuns, listPostApplicationRunMessages, + runPostApplicationInboxAction, } from "../../services/post-application/review"; const listQuerySchema = z.object({ @@ -46,7 +46,7 @@ const denyBodySchema = z.object({ decidedBy: z.string().max(255).optional(), }); -const bulkActionBodySchema = z.object({ +const actionBodySchema = z.object({ action: z.enum(["approve", "deny"]), provider: z.enum(POST_APPLICATION_PROVIDERS).default("gmail"), accountKey: z.string().min(1).max(255).default("default"), @@ -179,12 +179,12 @@ postApplicationReviewRouter.post( ); postApplicationReviewRouter.post( - "/inbox/bulk", + "/inbox/actions", asyncRoute(async (req: Request, res: Response) => { try { - const input = bulkActionBodySchema.parse(req.body ?? {}); + const input = actionBodySchema.parse(req.body ?? {}); - const result = await bulkPostApplicationInboxAction({ + const result = await runPostApplicationInboxAction({ action: input.action, provider: input.provider, accountKey: input.accountKey, diff --git a/orchestrator/src/server/basic-auth.test.ts b/orchestrator/src/server/basic-auth.test.ts index 196463a..face46b 100644 --- a/orchestrator/src/server/basic-auth.test.ts +++ b/orchestrator/src/server/basic-auth.test.ts @@ -67,8 +67,10 @@ describe.sequential("Basic Auth read-only enforcement", () => { ({ server, baseUrl } = await startServer()); - const postRes = await fetch(`${baseUrl}/api/jobs/123/skip`, { + const postRes = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ action: "skip", jobIds: ["123"] }), }); expect(postRes.status).toBe(401); expect(postRes.headers.get("www-authenticate")).toBeNull(); @@ -93,9 +95,13 @@ describe.sequential("Basic Auth read-only enforcement", () => { ({ server, baseUrl } = await startServer()); const authHeader = buildAuthHeader("user", "pass"); - const res = await fetch(`${baseUrl}/api/jobs/123/skip`, { + const res = await fetch(`${baseUrl}/api/jobs/actions`, { method: "POST", - headers: { Authorization: authHeader }, + headers: { + Authorization: authHeader, + "Content-Type": "application/json", + }, + body: JSON.stringify({ action: "skip", jobIds: ["123"] }), }); expect(res.status).not.toBe(401); @@ -107,7 +113,11 @@ describe.sequential("Basic Auth read-only enforcement", () => { ({ server, baseUrl } = await startServer()); - const res = await fetch(`${baseUrl}/api/jobs/123/skip`, { method: "POST" }); + const res = await fetch(`${baseUrl}/api/jobs/actions`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ action: "skip", jobIds: ["123"] }), + }); expect(res.status).not.toBe(401); }); }); diff --git a/orchestrator/src/server/pipeline/progress.ts b/orchestrator/src/server/pipeline/progress.ts index 3d0ad54..d81d81b 100644 --- a/orchestrator/src/server/pipeline/progress.ts +++ b/orchestrator/src/server/pipeline/progress.ts @@ -89,6 +89,63 @@ const emptyCrawlingStats = { crawlingCurrentUrl: undefined, }; +type SourceCrawlingStats = { + termsProcessed: number; + termsTotal: number; + listPagesProcessed: number; + listPagesTotal: number; + jobCardsFound: number; + jobPagesEnqueued: number; + jobPagesSkipped: number; + jobPagesProcessed: number; +}; + +const emptySourceCrawlingStats = (): SourceCrawlingStats => ({ + termsProcessed: 0, + termsTotal: 0, + listPagesProcessed: 0, + listPagesTotal: 0, + jobCardsFound: 0, + jobPagesEnqueued: 0, + jobPagesSkipped: 0, + jobPagesProcessed: 0, +}); + +const crawlingStatsBySource = new Map(); + +function aggregateCrawlingStats() { + let termsProcessed = 0; + let termsTotal = 0; + let listPagesProcessed = 0; + let listPagesTotal = 0; + let jobCardsFound = 0; + let jobPagesEnqueued = 0; + let jobPagesSkipped = 0; + let jobPagesProcessed = 0; + + for (const stats of crawlingStatsBySource.values()) { + termsProcessed += stats.termsProcessed; + termsTotal += stats.termsTotal; + listPagesProcessed += stats.listPagesProcessed; + listPagesTotal += stats.listPagesTotal; + jobCardsFound += stats.jobCardsFound; + jobPagesEnqueued += stats.jobPagesEnqueued; + jobPagesSkipped += stats.jobPagesSkipped; + jobPagesProcessed += stats.jobPagesProcessed; + } + + return { + termsProcessed, + termsTotal, + listPagesProcessed, + listPagesTotal, + jobCardsFound, + jobPagesEnqueued, + jobPagesSkipped, + jobPagesProcessed, + }; +} + /** * Update the current progress and notify all listeners. */ @@ -131,6 +188,7 @@ export function subscribeToProgress(listener: ProgressListener): () => void { * Reset progress to idle state. */ export function resetProgress(): void { + crawlingStatsBySource.clear(); currentProgress = { step: "idle", message: "Ready", @@ -150,27 +208,38 @@ export function resetProgress(): void { */ export const progressHelpers = { startCrawling: (sourcesTotal = 0) => - updateProgress({ - step: "crawling", - message: "Fetching jobs from sources...", - detail: "Starting crawler", - startedAt: new Date().toISOString(), - crawlingSource: null, - crawlingSourcesCompleted: 0, - crawlingSourcesTotal: sourcesTotal, - ...emptyCrawlingStats, - jobsDiscovered: 0, - jobsScored: 0, - jobsProcessed: 0, - totalToProcess: 0, - }), + (() => { + crawlingStatsBySource.clear(); + updateProgress({ + step: "crawling", + message: "Fetching jobs from sources...", + detail: "Starting crawler", + startedAt: new Date().toISOString(), + crawlingSource: null, + crawlingSourcesCompleted: 0, + crawlingSourcesTotal: sourcesTotal, + ...emptyCrawlingStats, + jobsDiscovered: 0, + jobsScored: 0, + jobsProcessed: 0, + totalToProcess: 0, + }); + })(), startSource: ( source: CrawlSource, sourcesCompleted: number, sourcesTotal: number, options?: { termsTotal?: number; detail?: string }, - ) => + ) => { + const existing = + crawlingStatsBySource.get(source) ?? emptySourceCrawlingStats(); + crawlingStatsBySource.set(source, { + ...emptySourceCrawlingStats(), + termsTotal: options?.termsTotal ?? existing.termsTotal, + }); + const aggregated = aggregateCrawlingStats(); + updateProgress({ step: "crawling", message: `Fetching jobs from ${source}...`, @@ -178,9 +247,18 @@ export const progressHelpers = { crawlingSource: source, crawlingSourcesCompleted: sourcesCompleted, crawlingSourcesTotal: sourcesTotal, - ...emptyCrawlingStats, - crawlingTermsTotal: options?.termsTotal ?? 0, - }), + crawlingTermsProcessed: aggregated.termsProcessed, + crawlingTermsTotal: aggregated.termsTotal, + crawlingListPagesProcessed: aggregated.listPagesProcessed, + crawlingListPagesTotal: aggregated.listPagesTotal, + crawlingJobCardsFound: aggregated.jobCardsFound, + crawlingJobPagesEnqueued: aggregated.jobPagesEnqueued, + crawlingJobPagesSkipped: aggregated.jobPagesSkipped, + crawlingJobPagesProcessed: aggregated.jobPagesProcessed, + crawlingPhase: undefined, + crawlingCurrentUrl: undefined, + }); + }, completeSource: (sourcesCompleted: number, sourcesTotal: number) => updateProgress({ @@ -204,24 +282,52 @@ export const progressHelpers = { currentUrl?: string; }) => { const current = getProgress(); + if (update.source) { + const existing = + crawlingStatsBySource.get(update.source) ?? emptySourceCrawlingStats(); + const nextForSource: SourceCrawlingStats = { + termsProcessed: update.termsProcessed ?? existing.termsProcessed, + termsTotal: update.termsTotal ?? existing.termsTotal, + listPagesProcessed: + update.listPagesProcessed ?? existing.listPagesProcessed, + listPagesTotal: update.listPagesTotal ?? existing.listPagesTotal, + jobCardsFound: update.jobCardsFound ?? existing.jobCardsFound, + jobPagesEnqueued: update.jobPagesEnqueued ?? existing.jobPagesEnqueued, + jobPagesSkipped: update.jobPagesSkipped ?? existing.jobPagesSkipped, + jobPagesProcessed: + update.jobPagesProcessed ?? existing.jobPagesProcessed, + }; + crawlingStatsBySource.set(update.source, nextForSource); + } + + const aggregated = aggregateCrawlingStats(); const next = { ...current, crawlingSource: update.source ?? current.crawlingSource, - crawlingTermsProcessed: - update.termsProcessed ?? current.crawlingTermsProcessed, - crawlingTermsTotal: update.termsTotal ?? current.crawlingTermsTotal, - crawlingListPagesProcessed: - update.listPagesProcessed ?? current.crawlingListPagesProcessed, - crawlingListPagesTotal: - update.listPagesTotal ?? current.crawlingListPagesTotal, - crawlingJobCardsFound: - update.jobCardsFound ?? current.crawlingJobCardsFound, - crawlingJobPagesEnqueued: - update.jobPagesEnqueued ?? current.crawlingJobPagesEnqueued, - crawlingJobPagesSkipped: - update.jobPagesSkipped ?? current.crawlingJobPagesSkipped, - crawlingJobPagesProcessed: - update.jobPagesProcessed ?? current.crawlingJobPagesProcessed, + crawlingTermsProcessed: update.source + ? aggregated.termsProcessed + : (update.termsProcessed ?? current.crawlingTermsProcessed), + crawlingTermsTotal: update.source + ? aggregated.termsTotal + : (update.termsTotal ?? current.crawlingTermsTotal), + crawlingListPagesProcessed: update.source + ? aggregated.listPagesProcessed + : (update.listPagesProcessed ?? current.crawlingListPagesProcessed), + crawlingListPagesTotal: update.source + ? aggregated.listPagesTotal + : (update.listPagesTotal ?? current.crawlingListPagesTotal), + crawlingJobCardsFound: update.source + ? aggregated.jobCardsFound + : (update.jobCardsFound ?? current.crawlingJobCardsFound), + crawlingJobPagesEnqueued: update.source + ? aggregated.jobPagesEnqueued + : (update.jobPagesEnqueued ?? current.crawlingJobPagesEnqueued), + crawlingJobPagesSkipped: update.source + ? aggregated.jobPagesSkipped + : (update.jobPagesSkipped ?? current.crawlingJobPagesSkipped), + crawlingJobPagesProcessed: update.source + ? aggregated.jobPagesProcessed + : (update.jobPagesProcessed ?? current.crawlingJobPagesProcessed), crawlingPhase: update.phase ?? current.crawlingPhase, crawlingCurrentUrl: update.currentUrl ?? current.crawlingCurrentUrl, }; @@ -316,7 +422,6 @@ export const progressHelpers = { step: "processing", message: `Processing job ${index}/${total}...`, detail: `${job.title} @ ${job.employer}`, - jobsProcessed: index - 1, totalToProcess: total, currentJob: job, }), diff --git a/orchestrator/src/server/pipeline/sponsor-matching.test.ts b/orchestrator/src/server/pipeline/sponsor-matching.test.ts index f316048..2b5e873 100644 --- a/orchestrator/src/server/pipeline/sponsor-matching.test.ts +++ b/orchestrator/src/server/pipeline/sponsor-matching.test.ts @@ -25,7 +25,7 @@ vi.mock("../repositories/jobs", () => ({ updateJob: vi.fn(), getUnscoredDiscoveredJobs: vi.fn(), getJobById: vi.fn(), - bulkCreateJobs: vi.fn(), + createJobs: vi.fn(), getAllJobUrls: vi.fn(), })); @@ -77,7 +77,7 @@ describe("Sponsor Match Calculation", () => { let scoreJobSuitability: ReturnType; let updateJob: ReturnType; let getUnscoredDiscoveredJobs: ReturnType; - let bulkCreateJobs: ReturnType; + let createJobs: ReturnType; beforeEach(async () => { vi.clearAllMocks(); @@ -96,11 +96,11 @@ describe("Sponsor Match Calculation", () => { updateJob = jobsRepo.updateJob as ReturnType; getUnscoredDiscoveredJobs = jobsRepo.getUnscoredDiscoveredJobs as ReturnType; - bulkCreateJobs = jobsRepo.bulkCreateJobs as ReturnType; + createJobs = jobsRepo.createJobs as ReturnType; // Default mock implementations scoreJobSuitability.mockResolvedValue({ score: 75, reason: "Good match" }); - bulkCreateJobs.mockResolvedValue({ created: 0, skipped: 0 }); + createJobs.mockResolvedValue({ created: 0, skipped: 0 }); updateJob.mockResolvedValue(undefined); calculateSponsorMatchSummary.mockImplementation((results: any[]) => { diff --git a/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts b/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts index b37c240..149ec24 100644 --- a/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts +++ b/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts @@ -459,8 +459,8 @@ describe("discoverJobsStep", () => { }); const progress = getProgress(); - expect(progress.crawlingTermsProcessed).toBe(1); - expect(progress.crawlingTermsTotal).toBe(2); + expect(progress.crawlingTermsProcessed).toBe(3); + expect(progress.crawlingTermsTotal).toBe(4); expect(progress.crawlingListPagesProcessed).toBe(2); expect(progress.crawlingListPagesTotal).toBe(4); expect(progress.crawlingJobPagesEnqueued).toBe(18); diff --git a/orchestrator/src/server/pipeline/steps/discover-jobs.ts b/orchestrator/src/server/pipeline/steps/discover-jobs.ts index 12d936e..acf8572 100644 --- a/orchestrator/src/server/pipeline/steps/discover-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/discover-jobs.ts @@ -13,7 +13,22 @@ import { runCrawler } from "../../services/crawler"; import { runHiringCafe } from "../../services/hiring-cafe"; import { runJobSpy } from "../../services/jobspy"; import { runUkVisaJobs } from "../../services/ukvisajobs"; -import { progressHelpers, updateProgress } from "../progress"; +import { asyncPool } from "../../utils/async-pool"; +import { type CrawlSource, progressHelpers, updateProgress } from "../progress"; + +const DISCOVERY_CONCURRENCY = 3; + +type DiscoveryTaskResult = { + discoveredJobs: CreateJobInput[]; + sourceErrors: string[]; +}; + +type DiscoverySourceTask = { + source: CrawlSource; + termsTotal?: number; + detail: string; + run: () => Promise; +}; export async function discoverJobsStep(args: { mergedConfig: PipelineConfig; @@ -74,385 +89,431 @@ export async function discoverJobsStep(args: { source === "indeed" || source === "linkedin" || source === "glassdoor", ); - const shouldRunJobSpy = jobSpySites.length > 0; - const shouldRunAdzuna = compatibleSources.includes("adzuna"); - const shouldRunHiringCafe = compatibleSources.includes("hiringcafe"); - const shouldRunGradcracker = compatibleSources.includes("gradcracker"); - const shouldRunUkVisaJobs = compatibleSources.includes("ukvisajobs"); + const sourceTasks: DiscoverySourceTask[] = []; - const totalSources = - Number(shouldRunJobSpy) + - Number(shouldRunAdzuna) + - Number(shouldRunHiringCafe) + - Number(shouldRunGradcracker) + - Number(shouldRunUkVisaJobs); - let completedSources = 0; - - progressHelpers.startCrawling(totalSources); - - const markSourceComplete = () => { - completedSources += 1; - progressHelpers.completeSource(completedSources, totalSources); - }; - - if (args.shouldCancel?.()) { - return { discoveredJobs, sourceErrors }; - } - - if (shouldRunJobSpy) { - progressHelpers.startSource("jobspy", completedSources, totalSources, { + if (jobSpySites.length > 0) { + sourceTasks.push({ + source: "jobspy", termsTotal: searchTerms.length, detail: `JobSpy: scraping ${jobSpySites.join(", ")}...`, - }); + run: async () => { + const jobSpyResult = await runJobSpy({ + sites: jobSpySites, + searchTerms, + location: settings.jobspyLocation ?? undefined, + resultsWanted: settings.jobspyResultsWanted + ? parseInt(settings.jobspyResultsWanted, 10) + : undefined, + countryIndeed: settings.jobspyCountryIndeed ?? undefined, + onProgress: (event) => { + if (event.type === "term_start") { + progressHelpers.crawlingUpdate({ + source: "jobspy", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm, + }); + updateProgress({ + step: "crawling", + detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }); + return; + } - const jobSpyResult = await runJobSpy({ - sites: jobSpySites, - searchTerms, - location: settings.jobspyLocation ?? undefined, - resultsWanted: settings.jobspyResultsWanted - ? parseInt(settings.jobspyResultsWanted, 10) - : undefined, - countryIndeed: settings.jobspyCountryIndeed ?? undefined, - onProgress: (event) => { - if (event.type === "term_start") { - progressHelpers.crawlingUpdate({ - source: "jobspy", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - return; - } - - progressHelpers.crawlingUpdate({ - source: "jobspy", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`, - }); - }, - }); - - if (!jobSpyResult.success) { - sourceErrors.push(`jobspy: ${jobSpyResult.error ?? "unknown error"}`); - } else { - discoveredJobs.push(...jobSpyResult.jobs); - } - - markSourceComplete(); - } - - if (args.shouldCancel?.()) { - return { discoveredJobs, sourceErrors }; - } - - if (shouldRunAdzuna) { - progressHelpers.startSource("adzuna", completedSources, totalSources, { - termsTotal: searchTerms.length, - detail: "Adzuna: fetching jobs...", - }); - - const adzunaCountryCode = getAdzunaCountryCode(selectedCountry); - if (!adzunaCountryCode) { - sourceErrors.push( - `adzuna: unsupported country ${formatCountryLabel(selectedCountry)}`, - ); - markSourceComplete(); - } else { - const adzunaMaxJobsPerTerm = settings.adzunaMaxJobsPerTerm - ? parseInt(settings.adzunaMaxJobsPerTerm, 10) - : 50; - - const adzunaResult = await runAdzuna({ - country: adzunaCountryCode, - searchTerms, - maxJobsPerTerm: adzunaMaxJobsPerTerm, - onProgress: (event) => { - if (event.type === "term_start") { progressHelpers.crawlingUpdate({ - source: "adzuna", - termsProcessed: Math.max(event.termIndex - 1, 0), + source: "jobspy", + termsProcessed: event.termIndex, termsTotal: event.termTotal, phase: "list", currentUrl: event.searchTerm, }); updateProgress({ step: "crawling", - detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`, }); - return; - } + }, + }); + + if (!jobSpyResult.success) { + return { + discoveredJobs: [], + sourceErrors: [`jobspy: ${jobSpyResult.error ?? "unknown error"}`], + }; + } + + return { + discoveredJobs: jobSpyResult.jobs, + sourceErrors: [], + }; + }, + }); + } + + if (compatibleSources.includes("adzuna")) { + sourceTasks.push({ + source: "adzuna", + termsTotal: searchTerms.length, + detail: "Adzuna: fetching jobs...", + run: async () => { + const adzunaCountryCode = getAdzunaCountryCode(selectedCountry); + if (!adzunaCountryCode) { + return { + discoveredJobs: [], + sourceErrors: [ + `adzuna: unsupported country ${formatCountryLabel(selectedCountry)}`, + ], + }; + } + + const adzunaMaxJobsPerTerm = settings.adzunaMaxJobsPerTerm + ? parseInt(settings.adzunaMaxJobsPerTerm, 10) + : 50; + + const adzunaResult = await runAdzuna({ + country: adzunaCountryCode, + searchTerms, + maxJobsPerTerm: adzunaMaxJobsPerTerm, + onProgress: (event) => { + if (event.type === "term_start") { + progressHelpers.crawlingUpdate({ + source: "adzuna", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm, + }); + updateProgress({ + step: "crawling", + detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }); + return; + } + + if (event.type === "page_fetched") { + progressHelpers.crawlingUpdate({ + source: "adzuna", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: event.pageNo, + jobPagesEnqueued: event.totalCollected, + jobPagesProcessed: event.totalCollected, + phase: "list", + currentUrl: `page ${event.pageNo}`, + }); + updateProgress({ + step: "crawling", + detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo} (${event.totalCollected} collected)`, + }); + return; + } - if (event.type === "page_fetched") { progressHelpers.crawlingUpdate({ source: "adzuna", - termsProcessed: Math.max(event.termIndex - 1, 0), + termsProcessed: event.termIndex, termsTotal: event.termTotal, - listPagesProcessed: event.pageNo, - jobPagesEnqueued: event.totalCollected, - jobPagesProcessed: event.totalCollected, phase: "list", - currentUrl: `page ${event.pageNo}`, + currentUrl: event.searchTerm, }); updateProgress({ step: "crawling", - detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo} (${event.totalCollected} collected)`, + detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, }); - return; - } + }, + }); - progressHelpers.crawlingUpdate({ - source: "adzuna", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - }, - }); + if (!adzunaResult.success) { + return { + discoveredJobs: [], + sourceErrors: [`adzuna: ${adzunaResult.error ?? "unknown error"}`], + }; + } - if (!adzunaResult.success) { - sourceErrors.push(`adzuna: ${adzunaResult.error ?? "unknown error"}`); - } else { - discoveredJobs.push(...adzunaResult.jobs); - } - - markSourceComplete(); - } + return { + discoveredJobs: adzunaResult.jobs, + sourceErrors: [], + }; + }, + }); } - if (args.shouldCancel?.()) { - return { discoveredJobs, sourceErrors }; - } - - if (shouldRunHiringCafe) { - progressHelpers.startSource("hiringcafe", completedSources, totalSources, { + if (compatibleSources.includes("hiringcafe")) { + sourceTasks.push({ + source: "hiringcafe", termsTotal: searchTerms.length, detail: "Hiring Cafe: fetching jobs...", - }); + run: async () => { + const hiringCafeMaxJobsPerTerm = settings.jobspyResultsWanted + ? parseInt(settings.jobspyResultsWanted, 10) + : 200; - const hiringCafeMaxJobsPerTerm = settings.jobspyResultsWanted - ? parseInt(settings.jobspyResultsWanted, 10) - : 200; + const hiringCafeResult = await runHiringCafe({ + country: selectedCountry, + searchTerms, + maxJobsPerTerm: hiringCafeMaxJobsPerTerm, + onProgress: (event) => { + if (event.type === "term_start") { + progressHelpers.crawlingUpdate({ + source: "hiringcafe", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm, + }); + updateProgress({ + step: "crawling", + detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }); + return; + } - const hiringCafeResult = await runHiringCafe({ - country: selectedCountry, - searchTerms, - maxJobsPerTerm: hiringCafeMaxJobsPerTerm, - onProgress: (event) => { - if (event.type === "term_start") { - progressHelpers.crawlingUpdate({ - source: "hiringcafe", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - return; + if (event.type === "page_fetched") { + const displayPageNo = event.pageNo + 1; + progressHelpers.crawlingUpdate({ + source: "hiringcafe", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: displayPageNo, + jobPagesEnqueued: event.totalCollected, + jobPagesProcessed: event.totalCollected, + phase: "list", + currentUrl: `page ${displayPageNo}`, + }); + updateProgress({ + step: "crawling", + detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${displayPageNo} (${event.totalCollected} collected)`, + }); + return; + } + + progressHelpers.crawlingUpdate({ + source: "hiringcafe", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm, + }); + updateProgress({ + step: "crawling", + detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }); + }, + }); + + if (!hiringCafeResult.success) { + return { + discoveredJobs: [], + sourceErrors: [ + `hiringcafe: ${hiringCafeResult.error ?? "unknown error"}`, + ], + }; } - if (event.type === "page_fetched") { - const displayPageNo = event.pageNo + 1; - progressHelpers.crawlingUpdate({ - source: "hiringcafe", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: displayPageNo, - jobPagesEnqueued: event.totalCollected, - jobPagesProcessed: event.totalCollected, - phase: "list", - currentUrl: `page ${displayPageNo}`, - }); - updateProgress({ - step: "crawling", - detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${displayPageNo} (${event.totalCollected} collected)`, - }); - return; - } - - progressHelpers.crawlingUpdate({ - source: "hiringcafe", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); + return { + discoveredJobs: hiringCafeResult.jobs, + sourceErrors: [], + }; }, }); - - if (!hiringCafeResult.success) { - sourceErrors.push( - `hiringcafe: ${hiringCafeResult.error ?? "unknown error"}`, - ); - } else { - discoveredJobs.push(...hiringCafeResult.jobs); - } - - markSourceComplete(); } - if (args.shouldCancel?.()) { - return { discoveredJobs, sourceErrors }; - } - - if (shouldRunGradcracker) { - progressHelpers.startSource("gradcracker", completedSources, totalSources, { + if (compatibleSources.includes("gradcracker")) { + sourceTasks.push({ + source: "gradcracker", detail: "Gradcracker: scraping...", - }); + run: async () => { + const existingJobUrls = await jobsRepo.getAllJobUrls(); + const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm + ? parseInt(settings.gradcrackerMaxJobsPerTerm, 10) + : 50; - const existingJobUrls = await jobsRepo.getAllJobUrls(); - const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm - ? parseInt(settings.gradcrackerMaxJobsPerTerm, 10) - : 50; - - const crawlerResult = await runCrawler({ - existingJobUrls, - searchTerms, - maxJobsPerTerm: gradcrackerMaxJobs, - onProgress: (progress) => { - progressHelpers.crawlingUpdate({ - source: "gradcracker", - listPagesProcessed: progress.listPagesProcessed, - listPagesTotal: progress.listPagesTotal, - jobCardsFound: progress.jobCardsFound, - jobPagesEnqueued: progress.jobPagesEnqueued, - jobPagesSkipped: progress.jobPagesSkipped, - jobPagesProcessed: progress.jobPagesProcessed, - phase: progress.phase, - currentUrl: progress.currentUrl, + const crawlerResult = await runCrawler({ + existingJobUrls, + searchTerms, + maxJobsPerTerm: gradcrackerMaxJobs, + onProgress: (progress) => { + progressHelpers.crawlingUpdate({ + source: "gradcracker", + listPagesProcessed: progress.listPagesProcessed, + listPagesTotal: progress.listPagesTotal, + jobCardsFound: progress.jobCardsFound, + jobPagesEnqueued: progress.jobPagesEnqueued, + jobPagesSkipped: progress.jobPagesSkipped, + jobPagesProcessed: progress.jobPagesProcessed, + phase: progress.phase, + currentUrl: progress.currentUrl, + }); + }, }); + + if (!crawlerResult.success) { + return { + discoveredJobs: [], + sourceErrors: [ + `gradcracker: ${crawlerResult.error ?? "unknown error"}`, + ], + }; + } + + return { + discoveredJobs: crawlerResult.jobs, + sourceErrors: [], + }; }, }); - - if (!crawlerResult.success) { - sourceErrors.push( - `gradcracker: ${crawlerResult.error ?? "unknown error"}`, - ); - } else { - discoveredJobs.push(...crawlerResult.jobs); - } - - markSourceComplete(); } - if (args.shouldCancel?.()) { - return { discoveredJobs, sourceErrors }; - } - - if (shouldRunUkVisaJobs) { - progressHelpers.startSource("ukvisajobs", completedSources, totalSources, { + if (compatibleSources.includes("ukvisajobs")) { + sourceTasks.push({ + source: "ukvisajobs", termsTotal: searchTerms.length, detail: "UKVisaJobs: scraping visa-sponsoring jobs...", - }); + run: async () => { + const ukvisajobsMaxJobs = settings.ukvisajobsMaxJobs + ? parseInt(settings.ukvisajobsMaxJobs, 10) + : 50; - const ukvisajobsMaxJobs = settings.ukvisajobsMaxJobs - ? parseInt(settings.ukvisajobsMaxJobs, 10) - : 50; + const ukVisaResult = await runUkVisaJobs({ + maxJobs: ukvisajobsMaxJobs, + searchTerms, + onProgress: (event) => { + if (event.type === "init") { + progressHelpers.crawlingUpdate({ + source: "ukvisajobs", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: 0, + listPagesTotal: event.maxPages, + jobPagesEnqueued: 0, + jobPagesProcessed: 0, + jobPagesSkipped: 0, + phase: "list", + currentUrl: event.searchTerm || "all jobs", + }); + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, + }); + return; + } - const ukVisaResult = await runUkVisaJobs({ - maxJobs: ukvisajobsMaxJobs, - searchTerms, - onProgress: (event) => { - if (event.type === "init") { - progressHelpers.crawlingUpdate({ - source: "ukvisajobs", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: 0, - listPagesTotal: event.maxPages, - jobPagesEnqueued: 0, - jobPagesProcessed: 0, - jobPagesSkipped: 0, - phase: "list", - currentUrl: event.searchTerm || "all jobs", - }); - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, - }); - return; + if (event.type === "page_fetched") { + progressHelpers.crawlingUpdate({ + source: "ukvisajobs", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: event.pageNo, + listPagesTotal: event.maxPages, + jobPagesEnqueued: event.totalCollected, + jobPagesProcessed: event.totalCollected, + phase: "list", + currentUrl: `page ${event.pageNo}/${event.maxPages}`, + }); + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`, + }); + return; + } + + if (event.type === "term_complete") { + progressHelpers.crawlingUpdate({ + source: "ukvisajobs", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + phase: "list", + currentUrl: event.searchTerm || "all jobs", + }); + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, + }); + return; + } + + if (event.type === "empty_page") { + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`, + }); + return; + } + + if (event.type === "error") { + updateProgress({ + step: "crawling", + detail: `UKVisaJobs: ${event.message}`, + }); + } + }, + }); + + if (!ukVisaResult.success) { + return { + discoveredJobs: [], + sourceErrors: [ + `ukvisajobs: ${ukVisaResult.error ?? "unknown error"}`, + ], + }; } - if (event.type === "page_fetched") { - progressHelpers.crawlingUpdate({ - source: "ukvisajobs", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: event.pageNo, - listPagesTotal: event.maxPages, - jobPagesEnqueued: event.totalCollected, - jobPagesProcessed: event.totalCollected, - phase: "list", - currentUrl: `page ${event.pageNo}/${event.maxPages}`, - }); - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`, - }); - return; - } - - if (event.type === "term_complete") { - progressHelpers.crawlingUpdate({ - source: "ukvisajobs", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm || "all jobs", - }); - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, - }); - return; - } - - if (event.type === "empty_page") { - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`, - }); - return; - } - - if (event.type === "error") { - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: ${event.message}`, - }); - } + return { + discoveredJobs: ukVisaResult.jobs, + sourceErrors: [], + }; }, }); + } - if (!ukVisaResult.success) { - sourceErrors.push(`ukvisajobs: ${ukVisaResult.error ?? "unknown error"}`); - } else { - discoveredJobs.push(...ukVisaResult.jobs); - } + const totalSources = sourceTasks.length; + let completedSources = 0; - markSourceComplete(); + progressHelpers.startCrawling(totalSources); + + if (args.shouldCancel?.()) { + return { discoveredJobs, sourceErrors }; + } + + const sourceResults = await asyncPool({ + items: sourceTasks, + concurrency: DISCOVERY_CONCURRENCY, + shouldStop: args.shouldCancel, + onTaskStarted: (sourceTask) => { + progressHelpers.startSource( + sourceTask.source, + completedSources, + totalSources, + { + termsTotal: sourceTask.termsTotal, + detail: sourceTask.detail, + }, + ); + }, + onTaskSettled: () => { + completedSources += 1; + progressHelpers.completeSource(completedSources, totalSources); + }, + task: async (sourceTask) => { + try { + return await sourceTask.run(); + } catch (error) { + return { + discoveredJobs: [], + sourceErrors: [ + `${sourceTask.source}: ${error instanceof Error ? error.message : "unknown error"}`, + ], + }; + } + }, + }); + + for (const sourceResult of sourceResults) { + discoveredJobs.push(...sourceResult.discoveredJobs); + sourceErrors.push(...sourceResult.sourceErrors); + } + + if (args.shouldCancel?.()) { + return { discoveredJobs, sourceErrors }; } if (discoveredJobs.length === 0 && sourceErrors.length > 0) { diff --git a/orchestrator/src/server/pipeline/steps/import-jobs.ts b/orchestrator/src/server/pipeline/steps/import-jobs.ts index aee279c..9a93868 100644 --- a/orchestrator/src/server/pipeline/steps/import-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/import-jobs.ts @@ -7,9 +7,7 @@ export async function importJobsStep(args: { discoveredJobs: CreateJobInput[]; }): Promise<{ created: number; skipped: number }> { logger.info("Importing discovered jobs"); - const { created, skipped } = await jobsRepo.bulkCreateJobs( - args.discoveredJobs, - ); + const { created, skipped } = await jobsRepo.createJobs(args.discoveredJobs); logger.info("Import step complete", { created, skipped }); progressHelpers.importComplete(created, skipped); diff --git a/orchestrator/src/server/pipeline/steps/process-jobs.ts b/orchestrator/src/server/pipeline/steps/process-jobs.ts index 018ca94..b3c9e53 100644 --- a/orchestrator/src/server/pipeline/steps/process-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/process-jobs.ts @@ -1,4 +1,5 @@ import { logger } from "@infra/logger"; +import { asyncPool } from "../../utils/async-pool"; import { progressHelpers, updateProgress } from "../progress"; import type { ScoredJob } from "./types"; @@ -6,6 +7,7 @@ type ProcessJobFn = ( jobId: string, options?: { force?: boolean }, ) => Promise<{ success: boolean; error?: string }>; +const PROCESSING_CONCURRENCY = 3; export async function processJobsStep(args: { jobsToProcess: ScoredJob[]; @@ -15,31 +17,41 @@ export async function processJobsStep(args: { let processedCount = 0; if (args.jobsToProcess.length > 0) { + const total = args.jobsToProcess.length; + let startedCount = 0; + let completedCount = 0; + updateProgress({ step: "processing", jobsProcessed: 0, - totalToProcess: args.jobsToProcess.length, + totalToProcess: total, }); - for (let i = 0; i < args.jobsToProcess.length; i++) { - if (args.shouldCancel?.()) break; - - const job = args.jobsToProcess[i]; - progressHelpers.processingJob(i + 1, args.jobsToProcess.length, job); - - const result = await args.processJob(job.id, { force: false }); - - if (result.success) { - processedCount++; - } else { - logger.warn("Failed to process job", { - jobId: job.id, - error: result.error, - }); - } - - progressHelpers.jobComplete(i + 1, args.jobsToProcess.length); - } + await asyncPool({ + items: args.jobsToProcess, + concurrency: PROCESSING_CONCURRENCY, + shouldStop: args.shouldCancel, + onTaskStarted: (job) => { + startedCount += 1; + progressHelpers.processingJob(startedCount, total, job); + }, + onTaskSettled: (_job, _index) => { + completedCount += 1; + progressHelpers.jobComplete(completedCount, total); + }, + task: async (job) => { + const result = await args.processJob(job.id, { force: false }); + if (result.success) { + processedCount += 1; + } else { + logger.warn("Failed to process job", { + jobId: job.id, + error: result.error, + }); + } + return result; + }, + }); } return { processedCount }; diff --git a/orchestrator/src/server/repositories/jobs.ts b/orchestrator/src/server/repositories/jobs.ts index 8b496bc..8186ca8 100644 --- a/orchestrator/src/server/repositories/jobs.ts +++ b/orchestrator/src/server/repositories/jobs.ts @@ -164,16 +164,7 @@ export async function getAllJobUrls(): Promise { return rows.map((r) => r.jobUrl); } -/** - * Create a new job (or return existing if URL matches). - */ -export async function createJob(input: CreateJobInput): Promise { - // Check for existing job with same URL - const existing = await getJobByUrl(input.jobUrl); - if (existing) { - return existing; - } - +async function insertJob(input: CreateJobInput): Promise { const id = randomUUID(); const now = new Date().toISOString(); @@ -232,6 +223,95 @@ export async function createJob(input: CreateJobInput): Promise { return job; } +function isJobUrlUniqueViolation(error: unknown): boolean { + if (!(error instanceof Error)) return false; + return /UNIQUE constraint failed: jobs\.job_url/i.test(error.message); +} + +async function tryInsertJob(input: CreateJobInput): Promise { + try { + return await insertJob(input); + } catch (error) { + if (isJobUrlUniqueViolation(error)) return null; + throw error; + } +} + +/** + * Create jobs (or return existing jobs for duplicate URLs). + */ +export async function createJobs(input: CreateJobInput): Promise; +export async function createJobs( + inputs: CreateJobInput[], +): Promise<{ created: number; skipped: number }>; +export async function createJobs( + inputOrInputs: CreateJobInput | CreateJobInput[], +): Promise { + if (!Array.isArray(inputOrInputs)) { + const inserted = await tryInsertJob(inputOrInputs); + if (inserted) return inserted; + const existing = await getJobByUrl(inputOrInputs.jobUrl); + if (existing) return existing; + throw new Error("Failed to create or resolve existing job by URL"); + } + + const byUrl = new Map< + string, + { + input: CreateJobInput; + count: number; + } + >(); + + for (const input of inputOrInputs) { + const existing = byUrl.get(input.jobUrl); + if (existing) { + existing.count += 1; + } else { + byUrl.set(input.jobUrl, { input, count: 1 }); + } + } + + let created = 0; + let skipped = 0; + + const uniqueUrls = Array.from(byUrl.keys()); + if (uniqueUrls.length === 0) { + return { created, skipped }; + } + + const existingRows = await db + .select({ jobUrl: jobs.jobUrl }) + .from(jobs) + .where(inArray(jobs.jobUrl, uniqueUrls)); + const existingUrlSet = new Set(existingRows.map((row) => row.jobUrl)); + + for (const { input, count } of byUrl.values()) { + if (existingUrlSet.has(input.jobUrl)) { + skipped += count; + continue; + } + + const inserted = await tryInsertJob(input); + if (!inserted) { + skipped += count; + continue; + } + + created += 1; + skipped += count - 1; + } + + return { created, skipped }; +} + +/** + * Create a single job (or return existing if URL matches). + */ +export async function createJob(input: CreateJobInput): Promise { + return createJobs(input); +} + /** * Update a job. */ @@ -256,29 +336,6 @@ export async function updateJob( return getJobById(id); } -/** - * Bulk create jobs from crawler results. - */ -export async function bulkCreateJobs( - inputs: CreateJobInput[], -): Promise<{ created: number; skipped: number }> { - let created = 0; - let skipped = 0; - - for (const input of inputs) { - const existing = await getJobByUrl(input.jobUrl); - if (existing) { - skipped++; - continue; - } - - await createJob(input); - created++; - } - - return { created, skipped }; -} - /** * Get job statistics by status. */ diff --git a/orchestrator/src/server/services/post-application/review/index.ts b/orchestrator/src/server/services/post-application/review/index.ts index 6b2fc8b..165fadb 100644 --- a/orchestrator/src/server/services/post-application/review/index.ts +++ b/orchestrator/src/server/services/post-application/review/index.ts @@ -1,8 +1,8 @@ export { approvePostApplicationInboxItem, - bulkPostApplicationInboxAction, denyPostApplicationInboxItem, listPostApplicationInbox, listPostApplicationReviewRuns, listPostApplicationRunMessages, + runPostApplicationInboxAction, } from "./service"; diff --git a/orchestrator/src/server/services/post-application/review/service.ts b/orchestrator/src/server/services/post-application/review/service.ts index c980e17..90cda80 100644 --- a/orchestrator/src/server/services/post-application/review/service.ts +++ b/orchestrator/src/server/services/post-application/review/service.ts @@ -22,9 +22,9 @@ import { } from "@server/services/post-application/stage-target"; import type { ApplicationStage, - BulkPostApplicationActionRequest, - BulkPostApplicationActionResponse, - BulkPostApplicationActionResult, + PostApplicationActionRequest, + PostApplicationActionResponse, + PostApplicationActionResult, PostApplicationInboxItem, PostApplicationMessage, PostApplicationProvider, @@ -253,9 +253,9 @@ export async function denyPostApplicationInboxItem(args: { return { message: updatedMessage }; } -export async function bulkPostApplicationInboxAction( - args: BulkPostApplicationActionRequest & { decidedBy?: string | null }, -): Promise { +export async function runPostApplicationInboxAction( + args: PostApplicationActionRequest & { decidedBy?: string | null }, +): Promise { const { provider, accountKey, action, decidedBy } = args; const pendingItems = await listPostApplicationInbox({ @@ -264,7 +264,7 @@ export async function bulkPostApplicationInboxAction( limit: 1000, }); - const results: BulkPostApplicationActionResult[] = []; + const results: PostApplicationActionResult[] = []; let skipped = 0; let failed = 0; diff --git a/orchestrator/src/server/utils/async-pool.test.ts b/orchestrator/src/server/utils/async-pool.test.ts index b497b62..1a31122 100644 --- a/orchestrator/src/server/utils/async-pool.test.ts +++ b/orchestrator/src/server/utils/async-pool.test.ts @@ -75,4 +75,52 @@ describe("asyncPool", () => { expect(result.length).toBeGreaterThanOrEqual(2); expect(result.slice(0, 2)).toEqual([1, 2]); }); + + it("emits task lifecycle callbacks", async () => { + const started: number[] = []; + const settled: Array = []; + + await expect( + asyncPool({ + items: [1, 2, 3], + concurrency: 2, + onTaskStarted: (item) => { + started.push(item); + }, + onTaskSettled: (item, _index, outcome) => { + settled.push( + outcome.status === "fulfilled" + ? `${item}:ok` + : `${item}:fail:${String(outcome.error)}`, + ); + }, + task: async (item) => { + if (item === 2) throw new Error("boom"); + await new Promise((resolve) => setTimeout(resolve, 5)); + return item * 2; + }, + }), + ).rejects.toThrow("boom"); + + expect(started).toEqual(expect.arrayContaining([1, 2])); + expect(settled).toContain("1:ok"); + expect(settled.some((entry) => entry.startsWith("2:fail:"))).toBe(true); + expect(started).not.toContain(3); + }); + + it("ignores lifecycle hook failures", async () => { + const result = await asyncPool({ + items: [1, 2], + concurrency: 2, + onTaskStarted: () => { + throw new Error("hook failed"); + }, + onTaskSettled: () => { + throw new Error("hook failed"); + }, + task: async (item) => item * 2, + }); + + expect(result).toEqual([2, 4]); + }); }); diff --git a/orchestrator/src/server/utils/async-pool.ts b/orchestrator/src/server/utils/async-pool.ts index 7f1dbde..c5c9ebe 100644 --- a/orchestrator/src/server/utils/async-pool.ts +++ b/orchestrator/src/server/utils/async-pool.ts @@ -1,10 +1,20 @@ +type AsyncPoolTaskStatus = + | { status: "fulfilled"; result: TResult } + | { status: "rejected"; error: unknown }; + export async function asyncPool(args: { items: readonly TItem[]; concurrency: number; shouldStop?: () => boolean; task: (item: TItem, index: number) => Promise; + onTaskStarted?: (item: TItem, index: number) => void; + onTaskSettled?: ( + item: TItem, + index: number, + outcome: AsyncPoolTaskStatus, + ) => void; }): Promise { - const { items, task, shouldStop } = args; + const { items, task, shouldStop, onTaskStarted, onTaskSettled } = args; const rawConcurrency = Number.isFinite(args.concurrency) ? args.concurrency : 1; @@ -18,21 +28,60 @@ export async function asyncPool(args: { () => UNSET, ); let nextIndex = 0; + let firstError: unknown = null; + + const callTaskStarted = (item: TItem, index: number) => { + if (!onTaskStarted) return; + try { + onTaskStarted(item, index); + } catch { + // Hook failures should not change pool semantics. + } + }; + + const callTaskSettled = ( + item: TItem, + index: number, + outcome: AsyncPoolTaskStatus, + ) => { + if (!onTaskSettled) return; + try { + onTaskSettled(item, index, outcome); + } catch { + // Hook failures should not change pool semantics. + } + }; const worker = async (): Promise => { while (true) { - if (shouldStop?.()) return; + if (shouldStop?.() || firstError !== null) return; const currentIndex = nextIndex; nextIndex += 1; if (currentIndex >= items.length) return; - - results[currentIndex] = await task(items[currentIndex], currentIndex); + const item = items[currentIndex]; + callTaskStarted(item, currentIndex); + try { + const result = await task(item, currentIndex); + results[currentIndex] = result; + callTaskSettled(item, currentIndex, { + status: "fulfilled", + result, + }); + } catch (error) { + callTaskSettled(item, currentIndex, { + status: "rejected", + error, + }); + if (firstError === null) firstError = error; + return; + } } }; const workerCount = Math.min(safeConcurrency, items.length); await Promise.all(Array.from({ length: workerCount }, () => worker())); + if (firstError !== null) throw firstError; return results.filter((value): value is TResult => value !== UNSET); } diff --git a/shared/src/types.ts b/shared/src/types.ts index 1500fb0..900a601 100644 --- a/shared/src/types.ts +++ b/shared/src/types.ts @@ -646,15 +646,15 @@ export interface PostApplicationInboxItem { } | null; } -export type BulkPostApplicationAction = "approve" | "deny"; +export type PostApplicationAction = "approve" | "deny"; -export interface BulkPostApplicationActionRequest { - action: BulkPostApplicationAction; +export interface PostApplicationActionRequest { + action: PostApplicationAction; provider: PostApplicationProvider; accountKey: string; } -export type BulkPostApplicationActionResult = +export type PostApplicationActionResult = | { messageId: string; ok: true; @@ -670,13 +670,13 @@ export type BulkPostApplicationActionResult = }; }; -export interface BulkPostApplicationActionResponse { - action: BulkPostApplicationAction; +export interface PostApplicationActionResponse { + action: PostApplicationAction; requested: number; succeeded: number; failed: number; skipped: number; - results: BulkPostApplicationActionResult[]; + results: PostApplicationActionResult[]; } export interface JobsListResponse { @@ -693,14 +693,22 @@ export interface JobsRevisionResponse { statusFilter: string | null; } -export type BulkJobAction = "skip" | "move_to_ready" | "rescore"; +export type JobAction = "skip" | "move_to_ready" | "rescore"; -export interface BulkJobActionRequest { - action: BulkJobAction; - jobIds: string[]; -} +export type JobActionRequest = + | { + action: "skip" | "rescore"; + jobIds: string[]; + } + | { + action: "move_to_ready"; + jobIds: string[]; + options?: { + force?: boolean; + }; + }; -export type BulkJobActionResult = +export type JobActionResult = | { jobId: string; ok: true; @@ -715,18 +723,18 @@ export type BulkJobActionResult = }; }; -export interface BulkJobActionResponse { - action: BulkJobAction; +export interface JobActionResponse { + action: JobAction; requested: number; succeeded: number; failed: number; - results: BulkJobActionResult[]; + results: JobActionResult[]; } -export type BulkJobActionStreamEvent = +export type JobActionStreamEvent = | { type: "started"; - action: BulkJobAction; + action: JobAction; requested: number; completed: number; succeeded: number; @@ -735,22 +743,22 @@ export type BulkJobActionStreamEvent = } | { type: "progress"; - action: BulkJobAction; + action: JobAction; requested: number; completed: number; succeeded: number; failed: number; - result: BulkJobActionResult; + result: JobActionResult; requestId: string; } | { type: "completed"; - action: BulkJobAction; + action: JobAction; requested: number; completed: number; succeeded: number; failed: number; - results: BulkJobActionResult[]; + results: JobActionResult[]; requestId: string; } | {