feat(pipeline): parallelize discovery/process via evolved asyncPool (#211)

* feat(pipeline): centralize concurrency hooks and parallelize discovery/process steps

* feat(orchestrator): unify single and bulk job actions API

* job actions de-bulk-ified

* application inbox section debulk

* chore(orchestrator): remove remaining bulk wording from job action flow

* select multiple to skip with shortcut

* comments

* coomeents

* fix progress ordinal and add jobs actions payload examples
This commit is contained in:
Shaheer Sarfaraz 2026-02-20 16:49:13 +00:00 committed by GitHub
parent 2cb116340a
commit f3c164d252
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 1369 additions and 848 deletions

View File

@ -72,7 +72,7 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with `{ "action": "move_to_ready", "jobIds": ["<jobId>"] }`
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -65,7 +65,18 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with payload:
```bash
curl -X POST "http://localhost:3001/api/jobs/actions" \
-H "content-type: application/json" \
-d '{
"action": "move_to_ready",
"jobIds": ["<jobId>"]
}'
```
- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload)
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -65,7 +65,18 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with payload:
```bash
curl -X POST "http://localhost:3001/api/jobs/actions" \
-H "content-type: application/json" \
-d '{
"action": "move_to_ready",
"jobIds": ["<jobId>"]
}'
```
- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload)
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -67,7 +67,18 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with payload:
```bash
curl -X POST "http://localhost:3001/api/jobs/actions" \
-H "content-type: application/json" \
-d '{
"action": "move_to_ready",
"jobIds": ["<jobId>"]
}'
```
- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload)
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -72,7 +72,18 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with payload:
```bash
curl -X POST "http://localhost:3001/api/jobs/actions" \
-H "content-type: application/json" \
-d '{
"action": "move_to_ready",
"jobIds": ["<jobId>"]
}'
```
- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload)
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -72,7 +72,18 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with payload:
```bash
curl -X POST "http://localhost:3001/api/jobs/actions" \
-H "content-type: application/json" \
-d '{
"action": "move_to_ready",
"jobIds": ["<jobId>"]
}'
```
- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload)
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -72,7 +72,18 @@ PDF generation uses:
Common paths:
- Discovered to finalization: `POST /api/jobs/:id/process`
- Discovered to finalization: `POST /api/jobs/actions` with payload:
```bash
curl -X POST "http://localhost:3001/api/jobs/actions" \
-H "content-type: application/json" \
-d '{
"action": "move_to_ready",
"jobIds": ["<jobId>"]
}'
```
- Streaming progress: `POST /api/jobs/actions/stream` (same JSON payload)
- Ready regeneration: `POST /api/jobs/:id/generate-pdf`
### Regenerating PDFs after edits (copy-pasteable examples)

View File

@ -65,9 +65,9 @@ orchestrator/
| GET | `/api/jobs` | List all jobs (filter with `?status=ready,discovered`) |
| GET | `/api/jobs/:id` | Get single job |
| PATCH | `/api/jobs/:id` | Update job |
| POST | `/api/jobs/:id/process` | Generate resume for job |
| POST | `/api/jobs/actions` | Run job actions (`move_to_ready`, `rescore`, `skip`) for one or many jobs |
| POST | `/api/jobs/actions/stream` | Stream job action progress/events for one or many jobs |
| POST | `/api/jobs/:id/apply` | Mark as applied |
| POST | `/api/jobs/:id/skip` | Mark as skipped |
### Pipeline

View File

@ -28,7 +28,7 @@ describe("API client SSE streaming", () => {
} as Response);
await expect(
api.streamBulkJobAction(
api.streamJobAction(
{ action: "skip", jobIds: ["job-1"] },
{
onEvent: () => {

View File

@ -9,13 +9,11 @@ import type {
ApplicationTask,
AppSettings,
BackupInfo,
BulkJobActionRequest,
BulkJobActionResponse,
BulkJobActionStreamEvent,
BulkPostApplicationAction,
BulkPostApplicationActionResponse,
DemoInfoResponse,
Job,
JobActionRequest,
JobActionResponse,
JobActionStreamEvent,
JobChatMessage,
JobChatStreamEvent,
JobChatThread,
@ -29,6 +27,8 @@ import type {
ManualJobFetchResponse,
ManualJobInferenceResponse,
PipelineStatusResponse,
PostApplicationAction,
PostApplicationActionResponse,
PostApplicationInboxItem,
PostApplicationProvider,
PostApplicationProviderActionResponse,
@ -84,7 +84,7 @@ type LegacyApiResponse<T> =
};
type StreamSseInput =
| BulkJobActionRequest
| JobActionRequest
| { content: string; stream: true }
| { stream: true };
@ -734,20 +734,45 @@ export async function streamRegenerateJobGhostwriterMessage(
);
}
function toJobIdList(idOrIds: string | string[]): string[] {
return Array.isArray(idOrIds) ? idOrIds : [idOrIds];
}
export async function processJob(
ids: string[],
options?: { force?: boolean },
): Promise<JobActionResponse>;
export async function processJob(
id: string,
options?: { force?: boolean },
): Promise<Job> {
const query = options?.force ? "?force=1" : "";
return fetchApi<Job>(`/jobs/${id}/process${query}`, {
method: "POST",
): Promise<Job>;
export async function processJob(
idOrIds: string | string[],
options?: { force?: boolean },
): Promise<Job | JobActionResponse> {
const jobIds = toJobIdList(idOrIds);
const result = await runJobAction({
action: "move_to_ready",
jobIds,
...(options?.force ? { options: { force: true } } : {}),
});
if (Array.isArray(idOrIds)) return result;
return getSingleJobFromActionResult(result, idOrIds);
}
export async function rescoreJob(id: string): Promise<Job> {
return fetchApi<Job>(`/jobs/${id}/rescore`, {
method: "POST",
export async function rescoreJob(ids: string[]): Promise<JobActionResponse>;
export async function rescoreJob(id: string): Promise<Job>;
export async function rescoreJob(
idOrIds: string | string[],
): Promise<Job | JobActionResponse> {
const jobIds = toJobIdList(idOrIds);
const result = await runJobAction({
action: "rescore",
jobIds,
});
if (Array.isArray(idOrIds)) return result;
return getSingleJobFromActionResult(result, idOrIds);
}
export async function summarizeJob(
@ -778,30 +803,54 @@ export async function markAsApplied(id: string): Promise<Job> {
});
}
export async function skipJob(id: string): Promise<Job> {
return fetchApi<Job>(`/jobs/${id}/skip`, {
method: "POST",
export async function skipJob(ids: string[]): Promise<JobActionResponse>;
export async function skipJob(id: string): Promise<Job>;
export async function skipJob(
idOrIds: string | string[],
): Promise<Job | JobActionResponse> {
const jobIds = toJobIdList(idOrIds);
const result = await runJobAction({
action: "skip",
jobIds,
});
if (Array.isArray(idOrIds)) return result;
return getSingleJobFromActionResult(result, idOrIds);
}
export async function bulkJobAction(
input: BulkJobActionRequest,
): Promise<BulkJobActionResponse> {
return fetchApi<BulkJobActionResponse>("/jobs/bulk-actions", {
export async function runJobAction(
input: JobActionRequest,
): Promise<JobActionResponse> {
return fetchApi<JobActionResponse>("/jobs/actions", {
method: "POST",
body: JSON.stringify(input),
});
}
export async function streamBulkJobAction(
input: BulkJobActionRequest,
function getSingleJobFromActionResult(
response: JobActionResponse,
jobId: string,
): Job {
const result = response.results.find((entry) => entry.jobId === jobId);
if (!result) {
throw new ApiClientError("Job action did not return a result for the job");
}
if (!result.ok) {
throw new ApiClientError(result.error.message, {
code: result.error.code,
});
}
return result.job;
}
export async function streamJobAction(
input: JobActionRequest,
handlers: {
onEvent: (event: BulkJobActionStreamEvent) => void;
onEvent: (event: JobActionStreamEvent) => void;
signal?: AbortSignal;
},
): Promise<void> {
return streamSseEvents<BulkJobActionStreamEvent>(
"/jobs/bulk-actions/stream",
return streamSseEvents<JobActionStreamEvent>(
"/jobs/actions/stream",
input,
handlers,
);
@ -1083,14 +1132,14 @@ export async function denyPostApplicationInboxItem(input: {
);
}
export async function bulkPostApplicationInboxAction(input: {
action: BulkPostApplicationAction;
export async function runPostApplicationInboxAction(input: {
action: PostApplicationAction;
provider?: PostApplicationProvider;
accountKey?: string;
decidedBy?: string;
}): Promise<BulkPostApplicationActionResponse> {
return fetchApi<BulkPostApplicationActionResponse>(
"/post-application/inbox/bulk",
}): Promise<PostApplicationActionResponse> {
return fetchApi<PostApplicationActionResponse>(
"/post-application/inbox/actions",
{
method: "POST",
body: JSON.stringify({
@ -1363,7 +1412,7 @@ export async function updateVisaSponsorList(): Promise<{
});
}
// Bulk operations (intentionally none - processing is manual)
// Multi-job operations (intentionally none - processing is manual)
// Backup API
export interface BackupListResponse {

View File

@ -816,7 +816,7 @@ describe("OrchestratorPage", () => {
});
});
it("shows and hides bulk Recalculate match based on selected statuses", async () => {
it("shows and hides Recalculate match based on selected statuses", async () => {
window.matchMedia = createMatchMedia(
true,
) as unknown as typeof window.matchMedia;

View File

@ -21,7 +21,7 @@ import type { AutomaticRunValues } from "./orchestrator/automatic-run";
import { deriveExtractorLimits } from "./orchestrator/automatic-run";
import type { FilterTab } from "./orchestrator/constants";
import { tabs } from "./orchestrator/constants";
import { FloatingBulkActionsBar } from "./orchestrator/FloatingBulkActionsBar";
import { FloatingJobActionsBar } from "./orchestrator/FloatingJobActionsBar";
import { JobCommandBar } from "./orchestrator/JobCommandBar";
import { JobDetailPanel } from "./orchestrator/JobDetailPanel";
import { JobListPanel } from "./orchestrator/JobListPanel";
@ -30,8 +30,8 @@ import { OrchestratorHeader } from "./orchestrator/OrchestratorHeader";
import { OrchestratorSummary } from "./orchestrator/OrchestratorSummary";
import { RunModeModal } from "./orchestrator/RunModeModal";
import type { RunMode } from "./orchestrator/run-mode";
import { useBulkJobSelection } from "./orchestrator/useBulkJobSelection";
import { useFilteredJobs } from "./orchestrator/useFilteredJobs";
import { useJobSelectionActions } from "./orchestrator/useJobSelectionActions";
import { useOrchestratorData } from "./orchestrator/useOrchestratorData";
import { useOrchestratorFilters } from "./orchestrator/useOrchestratorFilters";
import { usePipelineSources } from "./orchestrator/usePipelineSources";
@ -179,12 +179,12 @@ export const OrchestratorPage: React.FC = () => {
canSkipSelected,
canMoveSelected,
canRescoreSelected,
bulkActionInFlight,
jobActionInFlight,
toggleSelectJob,
toggleSelectAll,
clearSelection,
runBulkAction,
} = useBulkJobSelection({
runJobAction,
} = useJobSelectionActions({
activeJobs,
activeTab,
loadJobs,
@ -403,9 +403,16 @@ export const OrchestratorPage: React.FC = () => {
// ── Context actions ─────────────────────────────────────────────────
[SHORTCUTS.skip.key]: () => {
if (!selectedJob) return;
if (!["discovered", "ready"].includes(activeTab)) return;
if (shortcutActionInFlight.current) return;
// Selection action takes precedence if selection exists
if (selectedJobIds.size > 0) {
void runJobAction("skip");
return;
}
if (!selectedJob) return;
shortcutActionInFlight.current = true;
const jobId = selectedJob.id;
api
@ -454,9 +461,9 @@ export const OrchestratorPage: React.FC = () => {
if (activeTab !== "discovered") return;
if (shortcutActionInFlight.current) return;
// Bulk action takes precedence if selection exists
// Selection action takes precedence if selection exists
if (selectedJobIds.size > 0) {
void runBulkAction("move_to_ready");
void runJobAction("move_to_ready");
return;
}
@ -713,15 +720,15 @@ export const OrchestratorPage: React.FC = () => {
</section>
</main>
<FloatingBulkActionsBar
<FloatingJobActionsBar
selectedCount={selectedJobIds.size}
canMoveSelected={canMoveSelected}
canSkipSelected={canSkipSelected}
canRescoreSelected={canRescoreSelected}
bulkActionInFlight={bulkActionInFlight !== null}
onMoveToReady={() => void runBulkAction("move_to_ready")}
onSkipSelected={() => void runBulkAction("skip")}
onRescoreSelected={() => void runBulkAction("rescore")}
jobActionInFlight={jobActionInFlight !== null}
onMoveToReady={() => void runJobAction("move_to_ready")}
onSkipSelected={() => void runJobAction("skip")}
onRescoreSelected={() => void runJobAction("rescore")}
onClear={clearSelection}
/>

View File

@ -164,7 +164,7 @@ export const TrackingInboxPage: React.FC = () => {
const isAppliedJobsLoading =
appliedJobsQuery.isPending || appliedJobsQuery.isFetching;
const [bulkActionDialog, setBulkActionDialog] = useState<{
const [inboxActionDialog, setInboxActionDialog] = useState<{
isOpen: boolean;
action: "approve" | "deny" | null;
itemCount: number;
@ -436,15 +436,15 @@ export const TrackingInboxPage: React.FC = () => {
[accountKey, appliedJobByMessageId, provider, refresh],
);
const handleBulkAction = useCallback(
const handleInboxAction = useCallback(
async (action: "approve" | "deny") => {
if (inbox.length === 0) return;
setIsActionLoading(true);
setBulkActionDialog({ isOpen: false, action: null, itemCount: 0 });
setInboxActionDialog({ isOpen: false, action: null, itemCount: 0 });
try {
const result = await api.bulkPostApplicationInboxAction({
const result = await api.runPostApplicationInboxAction({
action,
provider,
accountKey,
@ -479,7 +479,7 @@ export const TrackingInboxPage: React.FC = () => {
[accountKey, inbox.length, provider, refresh],
);
const openBulkActionDialog = useCallback(
const openInboxActionDialog = useCallback(
(action: "approve" | "deny") => {
const eligibleCount =
action === "approve"
@ -495,7 +495,7 @@ export const TrackingInboxPage: React.FC = () => {
return;
}
setBulkActionDialog({
setInboxActionDialog({
isOpen: true,
action,
itemCount: eligibleCount,
@ -706,7 +706,7 @@ export const TrackingInboxPage: React.FC = () => {
size="sm"
className="gap-1"
disabled={isActionLoading}
onClick={() => openBulkActionDialog("approve")}
onClick={() => openInboxActionDialog("approve")}
>
<CheckCircle className="h-4 w-4" />
Approve All
@ -716,7 +716,7 @@ export const TrackingInboxPage: React.FC = () => {
size="sm"
className="gap-1"
disabled={isActionLoading}
onClick={() => openBulkActionDialog("deny")}
onClick={() => openInboxActionDialog("deny")}
>
<XCircle className="h-4 w-4" />
Ignore All
@ -840,34 +840,34 @@ export const TrackingInboxPage: React.FC = () => {
</Dialog>
<AlertDialog
open={bulkActionDialog.isOpen}
open={inboxActionDialog.isOpen}
onOpenChange={(open) =>
setBulkActionDialog((previous) => ({ ...previous, isOpen: open }))
setInboxActionDialog((previous) => ({ ...previous, isOpen: open }))
}
>
<AlertDialogContent>
<AlertDialogHeader>
<AlertDialogTitle>
{bulkActionDialog.action === "approve"
{inboxActionDialog.action === "approve"
? "Approve All Messages?"
: "Ignore All Messages?"}
</AlertDialogTitle>
<AlertDialogDescription>
{bulkActionDialog.action === "approve"
? `This will approve ${bulkActionDialog.itemCount} message${bulkActionDialog.itemCount === 1 ? "" : "s"} with suggested job matches. Messages without matches will be skipped.`
: `This will ignore all ${bulkActionDialog.itemCount} pending message${bulkActionDialog.itemCount === 1 ? "" : "s"}.`}
{inboxActionDialog.action === "approve"
? `This will approve ${inboxActionDialog.itemCount} message${inboxActionDialog.itemCount === 1 ? "" : "s"} with suggested job matches. Messages without matches will be skipped.`
: `This will ignore all ${inboxActionDialog.itemCount} pending message${inboxActionDialog.itemCount === 1 ? "" : "s"}.`}
</AlertDialogDescription>
</AlertDialogHeader>
<AlertDialogFooter>
<AlertDialogCancel>Cancel</AlertDialogCancel>
<AlertDialogAction
onClick={() => {
if (bulkActionDialog.action) {
void handleBulkAction(bulkActionDialog.action);
if (inboxActionDialog.action) {
void handleInboxAction(inboxActionDialog.action);
}
}}
>
{bulkActionDialog.action === "approve"
{inboxActionDialog.action === "approve"
? "Approve All"
: "Ignore All"}
</AlertDialogAction>

View File

@ -3,24 +3,24 @@ import { useEffect, useState } from "react";
import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils";
interface FloatingBulkActionsBarProps {
interface FloatingJobActionsBarProps {
selectedCount: number;
canMoveSelected: boolean;
canSkipSelected: boolean;
canRescoreSelected: boolean;
bulkActionInFlight: boolean;
jobActionInFlight: boolean;
onMoveToReady: () => void;
onSkipSelected: () => void;
onRescoreSelected: () => void;
onClear: () => void;
}
export const FloatingBulkActionsBar: React.FC<FloatingBulkActionsBarProps> = ({
export const FloatingJobActionsBar: React.FC<FloatingJobActionsBarProps> = ({
selectedCount,
canMoveSelected,
canSkipSelected,
canRescoreSelected,
bulkActionInFlight,
jobActionInFlight,
onMoveToReady,
onSkipSelected,
onRescoreSelected,
@ -62,7 +62,7 @@ export const FloatingBulkActionsBar: React.FC<FloatingBulkActionsBarProps> = ({
size="sm"
variant="outline"
className="w-full sm:w-auto"
disabled={bulkActionInFlight}
disabled={jobActionInFlight}
onClick={onMoveToReady}
>
Move to Ready
@ -74,7 +74,7 @@ export const FloatingBulkActionsBar: React.FC<FloatingBulkActionsBarProps> = ({
size="sm"
variant="outline"
className="w-full sm:w-auto"
disabled={bulkActionInFlight}
disabled={jobActionInFlight}
onClick={onSkipSelected}
>
Skip selected
@ -86,7 +86,7 @@ export const FloatingBulkActionsBar: React.FC<FloatingBulkActionsBarProps> = ({
size="sm"
variant="outline"
className="w-full sm:w-auto"
disabled={bulkActionInFlight}
disabled={jobActionInFlight}
onClick={onRescoreSelected}
>
Recalculate match
@ -98,7 +98,7 @@ export const FloatingBulkActionsBar: React.FC<FloatingBulkActionsBarProps> = ({
variant="ghost"
className="w-full sm:w-auto"
onClick={onClear}
disabled={bulkActionInFlight}
disabled={jobActionInFlight}
>
Clear
</Button>

View File

@ -1,19 +1,19 @@
import { Progress } from "@/components/ui/progress";
import { clampNumber } from "./utils";
interface BulkActionProgressToastProps {
interface JobActionProgressToastProps {
completed: number;
requested: number;
succeeded: number;
failed: number;
}
export function BulkActionProgressToast({
export function JobActionProgressToast({
completed,
requested,
succeeded,
failed,
}: BulkActionProgressToastProps) {
}: JobActionProgressToastProps) {
const safeRequested = Math.max(requested, 1);
const safeCompleted = clampNumber(completed, 0, safeRequested);
const progressValue = Math.round((safeCompleted / safeRequested) * 100);

View File

@ -1,37 +1,35 @@
import { createJob } from "@shared/testing/factories.js";
import type { BulkJobActionResponse } from "@shared/types.js";
import type { JobActionResponse } from "@shared/types.js";
import { describe, expect, it } from "vitest";
import {
canBulkMoveToReady,
canBulkRescore,
canBulkSkip,
canMoveToReady,
canRescore,
canSkip,
getFailedJobIds,
} from "./bulkActions";
} from "./jobActions";
describe("bulkActions", () => {
describe("jobActions", () => {
it("computes eligibility for skip, move-to-ready, and rescore", () => {
expect(
canBulkSkip([
canSkip([
createJob({ id: "1", status: "discovered" }),
createJob({ id: "2", status: "ready" }),
]),
).toBe(true);
expect(canBulkSkip([createJob({ id: "1", status: "applied" })])).toBe(
false,
);
expect(canSkip([createJob({ id: "1", status: "applied" })])).toBe(false);
expect(
canBulkMoveToReady([
canMoveToReady([
createJob({ id: "1", status: "discovered" }),
createJob({ id: "2", status: "discovered" }),
]),
).toBe(true);
expect(canBulkMoveToReady([createJob({ id: "1", status: "ready" })])).toBe(
expect(canMoveToReady([createJob({ id: "1", status: "ready" })])).toBe(
false,
);
expect(
canBulkRescore([
canRescore([
createJob({ id: "1", status: "discovered" }),
createJob({ id: "2", status: "ready" }),
createJob({ id: "3", status: "applied" }),
@ -40,15 +38,15 @@ describe("bulkActions", () => {
]),
).toBe(true);
expect(
canBulkRescore([
canRescore([
createJob({ id: "1", status: "ready" }),
createJob({ id: "2", status: "processing" }),
]),
).toBe(false);
});
it("extracts failed job ids from a bulk response", () => {
const response: BulkJobActionResponse = {
it("extracts failed job ids from an action response", () => {
const response: JobActionResponse = {
action: "skip",
requested: 3,
succeeded: 1,

View File

@ -1,22 +1,22 @@
import type { BulkJobActionResponse, JobListItem } from "@shared/types";
import type { JobActionResponse, JobListItem } from "@shared/types";
const SKIPPABLE_STATUSES = new Set(["discovered", "ready"]);
export function canBulkSkip(jobs: JobListItem[]): boolean {
export function canSkip(jobs: JobListItem[]): boolean {
return (
jobs.length > 0 && jobs.every((job) => SKIPPABLE_STATUSES.has(job.status))
);
}
export function canBulkMoveToReady(jobs: JobListItem[]): boolean {
export function canMoveToReady(jobs: JobListItem[]): boolean {
return jobs.length > 0 && jobs.every((job) => job.status === "discovered");
}
export function canBulkRescore(jobs: JobListItem[]): boolean {
export function canRescore(jobs: JobListItem[]): boolean {
return jobs.length > 0 && jobs.every((job) => job.status !== "processing");
}
export function getFailedJobIds(response: BulkJobActionResponse): Set<string> {
export function getFailedJobIds(response: JobActionResponse): Set<string> {
const failedIds = response.results
.filter((result) => !result.ok)
.map((result) => result.jobId);

View File

@ -1,16 +1,13 @@
import { createJob } from "@shared/testing/factories.js";
import type {
BulkJobActionResponse,
BulkJobActionStreamEvent,
} from "@shared/types.js";
import type { JobActionResponse, JobActionStreamEvent } from "@shared/types.js";
import { act, renderHook, waitFor } from "@testing-library/react";
import { toast } from "sonner";
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as api from "../../api";
import { useBulkJobSelection } from "./useBulkJobSelection";
import { useJobSelectionActions } from "./useJobSelectionActions";
vi.mock("../../api", () => ({
streamBulkJobAction: vi.fn(),
streamJobAction: vi.fn(),
}));
vi.mock("sonner", () => ({
@ -36,10 +33,10 @@ const deferred = <T>(): Deferred<T> => {
};
const asStreamEvents = (
response: BulkJobActionResponse,
requestId = "req-bulk",
): BulkJobActionStreamEvent[] => {
const events: BulkJobActionStreamEvent[] = [
response: JobActionResponse,
requestId = "req-action",
): JobActionStreamEvent[] => {
const events: JobActionStreamEvent[] = [
{
type: "started",
action: response.action,
@ -82,11 +79,11 @@ const asStreamEvents = (
return events;
};
const mockStreamBulkAction = (
response: BulkJobActionResponse,
const mockStreamJobAction = (
response: JobActionResponse,
waitForRelease?: Promise<void>,
) => {
vi.mocked(api.streamBulkJobAction).mockImplementation(
vi.mocked(api.streamJobAction).mockImplementation(
async (_input, handlers) => {
for (const event of asStreamEvents(response)) {
if (event.type === "started") handlers.onEvent(event);
@ -100,10 +97,10 @@ const mockStreamBulkAction = (
);
};
describe("useBulkJobSelection", () => {
describe("useJobSelectionActions", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.mocked(toast.loading).mockReturnValue("bulk-progress-toast");
vi.mocked(toast.loading).mockReturnValue("job-progress-toast");
});
it("caps select-all to the API max", () => {
@ -112,7 +109,7 @@ describe("useBulkJobSelection", () => {
);
const loadJobs = vi.fn().mockResolvedValue(undefined);
const { result } = renderHook(() =>
useBulkJobSelection({
useJobSelectionActions({
activeJobs,
activeTab: "discovered",
loadJobs,
@ -126,13 +123,13 @@ describe("useBulkJobSelection", () => {
expect(result.current.selectedJobIds.size).toBe(100);
});
it("does not send bulk requests above the max selection size", async () => {
it("does not send action requests above the max selection size", async () => {
const activeJobs = Array.from({ length: 101 }, (_, index) =>
createJob({ id: `job-${index + 1}`, status: "discovered" }),
);
const loadJobs = vi.fn().mockResolvedValue(undefined);
const { result } = renderHook(() =>
useBulkJobSelection({
useJobSelectionActions({
activeJobs,
activeTab: "discovered",
loadJobs,
@ -146,10 +143,10 @@ describe("useBulkJobSelection", () => {
});
await act(async () => {
await result.current.runBulkAction("skip");
await result.current.runJobAction("skip");
});
expect(api.streamBulkJobAction).not.toHaveBeenCalled();
expect(api.streamJobAction).not.toHaveBeenCalled();
});
it("reconciles failures with selection changes made during in-flight action", async () => {
@ -160,7 +157,7 @@ describe("useBulkJobSelection", () => {
];
const loadJobs = vi.fn().mockResolvedValue(undefined);
const release = deferred<void>();
mockStreamBulkAction(
mockStreamJobAction(
{
action: "skip",
requested: 2,
@ -183,7 +180,7 @@ describe("useBulkJobSelection", () => {
);
const { result } = renderHook(() =>
useBulkJobSelection({
useJobSelectionActions({
activeJobs,
activeTab: "discovered",
loadJobs,
@ -197,7 +194,7 @@ describe("useBulkJobSelection", () => {
let runPromise: Promise<void>;
await act(async () => {
runPromise = result.current.runBulkAction("skip");
runPromise = result.current.runJobAction("skip");
});
expect(toast.loading).toHaveBeenCalled();
@ -220,13 +217,13 @@ describe("useBulkJobSelection", () => {
expect(toast.dismiss).toHaveBeenCalled();
});
it("runs bulk rescore and reports success copy", async () => {
it("runs rescore and reports success copy", async () => {
const activeJobs = [
createJob({ id: "job-1", status: "ready" }),
createJob({ id: "job-2", status: "ready" }),
];
const loadJobs = vi.fn().mockResolvedValue(undefined);
mockStreamBulkAction({
mockStreamJobAction({
action: "rescore",
requested: 2,
succeeded: 2,
@ -246,7 +243,7 @@ describe("useBulkJobSelection", () => {
});
const { result } = renderHook(() =>
useBulkJobSelection({
useJobSelectionActions({
activeJobs,
activeTab: "ready",
loadJobs,
@ -259,10 +256,10 @@ describe("useBulkJobSelection", () => {
});
await act(async () => {
await result.current.runBulkAction("rescore");
await result.current.runJobAction("rescore");
});
expect(api.streamBulkJobAction).toHaveBeenCalledWith(
expect(api.streamJobAction).toHaveBeenCalledWith(
{ action: "rescore", jobIds: ["job-1", "job-2"] },
expect.objectContaining({
onEvent: expect.any(Function),

View File

@ -1,51 +1,52 @@
import type {
BulkJobAction,
BulkJobActionResponse,
JobAction,
JobActionResponse,
JobListItem,
} from "@shared/types.js";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
import * as api from "../../api";
import { BulkActionProgressToast } from "./BulkActionProgressToast";
import {
canBulkMoveToReady,
canBulkRescore,
canBulkSkip,
getFailedJobIds,
} from "./bulkActions";
import type { FilterTab } from "./constants";
import { JobActionProgressToast } from "./JobActionProgressToast";
import {
canMoveToReady,
canRescore,
canSkip,
getFailedJobIds,
} from "./jobActions";
import { clampNumber } from "./utils";
const MAX_BULK_ACTION_JOB_IDS = 100;
const MAX_JOB_ACTION_JOB_IDS = 100;
const bulkActionLabel: Record<BulkJobAction, string> = {
const jobActionLabel: Record<JobAction, string> = {
move_to_ready: "Moving jobs to Ready...",
skip: "Skipping selected jobs...",
rescore: "Calculating match scores...",
};
const bulkActionSuccessLabel: Record<BulkJobAction, string> = {
const jobActionSuccessLabel: Record<JobAction, string> = {
move_to_ready: "jobs moved to Ready",
skip: "jobs skipped",
rescore: "matches recalculated",
};
interface UseBulkJobSelectionArgs {
interface UseJobSelectionActionsArgs {
activeJobs: JobListItem[];
activeTab: FilterTab;
loadJobs: () => Promise<void>;
}
export function useBulkJobSelection({
export function useJobSelectionActions({
activeJobs,
activeTab,
loadJobs,
}: UseBulkJobSelectionArgs) {
}: UseJobSelectionActionsArgs) {
const [selectedJobIds, setSelectedJobIds] = useState<Set<string>>(
() => new Set(),
);
const [bulkActionInFlight, setBulkActionInFlight] =
useState<null | BulkJobAction>(null);
const [jobActionInFlight, setJobActionInFlight] = useState<null | JobAction>(
null,
);
const previousActiveTabRef = useRef<FilterTab>(activeTab);
const selectedJobs = useMemo(
@ -53,16 +54,13 @@ export function useBulkJobSelection({
[activeJobs, selectedJobIds],
);
const canSkipSelected = useMemo(
() => canBulkSkip(selectedJobs),
[selectedJobs],
);
const canSkipSelected = useMemo(() => canSkip(selectedJobs), [selectedJobs]);
const canMoveSelected = useMemo(
() => canBulkMoveToReady(selectedJobs),
() => canMoveToReady(selectedJobs),
[selectedJobs],
);
const canRescoreSelected = useMemo(
() => canBulkRescore(selectedJobs),
() => canRescore(selectedJobs),
[selectedJobs],
);
@ -100,13 +98,13 @@ export function useBulkJobSelection({
setSelectedJobIds(() => {
if (!checked) return new Set();
const allIds = activeJobs.map((job) => job.id);
if (allIds.length <= MAX_BULK_ACTION_JOB_IDS) {
if (allIds.length <= MAX_JOB_ACTION_JOB_IDS) {
return new Set(allIds);
}
toast.error(
`Select all is limited to ${MAX_BULK_ACTION_JOB_IDS} jobs per action.`,
`Select all is limited to ${MAX_JOB_ACTION_JOB_IDS} jobs per action.`,
);
return new Set(allIds.slice(0, MAX_BULK_ACTION_JOB_IDS));
return new Set(allIds.slice(0, MAX_JOB_ACTION_JOB_IDS));
});
},
[activeJobs],
@ -116,20 +114,20 @@ export function useBulkJobSelection({
setSelectedJobIds(new Set());
}, []);
const runBulkAction = useCallback(
async (action: BulkJobAction) => {
const runJobAction = useCallback(
async (action: JobAction) => {
const selectedAtStart = Array.from(selectedJobIds);
if (selectedAtStart.length === 0) return;
if (selectedAtStart.length > MAX_BULK_ACTION_JOB_IDS) {
if (selectedAtStart.length > MAX_JOB_ACTION_JOB_IDS) {
toast.error(
`You can run bulk actions on up to ${MAX_BULK_ACTION_JOB_IDS} jobs at a time.`,
`You can run job actions on up to ${MAX_JOB_ACTION_JOB_IDS} jobs at a time.`,
);
return;
}
const selectedAtStartSet = new Set(selectedAtStart);
let progressToastId: string | number | undefined;
let finalResult: BulkJobActionResponse | null = null;
let finalResult: JobActionResponse | null = null;
let streamError: string | null = null;
let latestProgress = {
requested: selectedAtStart.length,
@ -145,13 +143,13 @@ export function useBulkJobSelection({
0,
safeRequested,
);
return `${safeCompleted}/${safeRequested} ${bulkActionLabel[action]}`;
return `${safeCompleted}/${safeRequested} ${jobActionLabel[action]}`;
};
const upsertProgressToast = () => {
progressToastId = toast.loading(getProgressTitle(), {
description: (
<BulkActionProgressToast
<JobActionProgressToast
requested={latestProgress.requested}
completed={latestProgress.completed}
succeeded={latestProgress.succeeded}
@ -164,9 +162,9 @@ export function useBulkJobSelection({
};
try {
setBulkActionInFlight(action);
setJobActionInFlight(action);
upsertProgressToast();
await api.streamBulkJobAction(
await api.streamJobAction(
{
action,
jobIds: selectedAtStart,
@ -174,7 +172,7 @@ export function useBulkJobSelection({
{
onEvent: (event) => {
if (event.type === "error") {
streamError = event.message || "Failed to run bulk action";
streamError = event.message || "Failed to run job action";
return;
}
@ -223,12 +221,12 @@ export function useBulkJobSelection({
}
if (!finalResult) {
throw new Error("Bulk action stream ended before completion");
throw new Error("Job action stream ended before completion");
}
const result = finalResult as BulkJobActionResponse;
const result = finalResult as JobActionResponse;
const failedIds = getFailedJobIds(result);
const successLabel = bulkActionSuccessLabel[action];
const successLabel = jobActionSuccessLabel[action];
if (result.failed === 0) {
toast.success(`${result.succeeded} ${successLabel}`);
@ -255,13 +253,13 @@ export function useBulkJobSelection({
});
} catch (error) {
const message =
error instanceof Error ? error.message : "Failed to run bulk action";
error instanceof Error ? error.message : "Failed to run job action";
toast.error(message);
} finally {
if (progressToastId !== undefined) {
toast.dismiss(progressToastId);
}
setBulkActionInFlight(null);
setJobActionInFlight(null);
}
},
[selectedJobIds, loadJobs],
@ -272,10 +270,10 @@ export function useBulkJobSelection({
canSkipSelected,
canMoveSelected,
canRescoreSelected,
bulkActionInFlight,
jobActionInFlight,
toggleSelectJob,
toggleSelectAll,
clearSelection,
runBulkAction,
runJobAction,
};
}

View File

@ -396,11 +396,15 @@ describe.sequential("Jobs API routes", () => {
expect(patchBody.data.suitabilityScore).toBe(77);
expect(typeof patchBody.meta.requestId).toBe("string");
const skipRes = await fetch(`${baseUrl}/api/jobs/${job.id}/skip`, {
const skipRes = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ action: "skip", jobIds: [job.id] }),
});
const skipBody = await skipRes.json();
expect(skipBody.data.status).toBe("skipped");
expect(skipBody.data.results).toHaveLength(1);
expect(skipBody.data.results[0].ok).toBe(true);
expect(skipBody.data.results[0].job.status).toBe("skipped");
const deleteRes = await fetch(`${baseUrl}/api/jobs/status/skipped`, {
method: "DELETE",
@ -409,34 +413,34 @@ describe.sequential("Jobs API routes", () => {
expect(deleteBody.data.count).toBe(1);
});
it("runs bulk skip with partial failures", async () => {
it("runs skip action with partial failures", async () => {
const { createJob } = await import("../../repositories/jobs");
const discovered = await createJob({
source: "manual",
title: "Discovered Role",
employer: "Acme",
jobUrl: "https://example.com/job/bulk-discovered",
jobUrl: "https://example.com/job/action-discovered",
jobDescription: "Test description",
});
const ready = await createJob({
source: "manual",
title: "Ready Role",
employer: "Beta",
jobUrl: "https://example.com/job/bulk-ready",
jobUrl: "https://example.com/job/action-ready",
jobDescription: "Test description",
});
const applied = await createJob({
source: "manual",
title: "Applied Role",
employer: "Gamma",
jobUrl: "https://example.com/job/bulk-applied",
jobUrl: "https://example.com/job/action-applied",
jobDescription: "Test description",
});
const { updateJob } = await import("../../repositories/jobs");
await updateJob(ready.id, { status: "ready" });
await updateJob(applied.id, { status: "applied" });
const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, {
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
@ -460,45 +464,92 @@ describe.sequential("Jobs API routes", () => {
]);
});
it("runs bulk move_to_ready and rejects ineligible statuses", async () => {
it("runs move_to_ready action and rejects ineligible statuses", async () => {
const { createJob, updateJob } = await import("../../repositories/jobs");
const discovered = await createJob({
source: "manual",
title: "New Role",
employer: "Acme",
jobUrl: "https://example.com/job/bulk-ready-1",
jobUrl: "https://example.com/job/action-ready-1",
jobDescription: "Test description",
});
const ready = await createJob({
source: "manual",
title: "Already Ready",
employer: "Acme",
jobUrl: "https://example.com/job/bulk-ready-2",
jobUrl: "https://example.com/job/action-ready-2",
jobDescription: "Test description",
});
await updateJob(ready.id, { status: "ready" });
const { processJob } = await import("../../pipeline/index");
const previousBaseUrl = process.env.JOBOPS_PUBLIC_BASE_URL;
process.env.JOBOPS_PUBLIC_BASE_URL = "https://canonical.jobops.example";
const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
action: "move_to_ready",
jobIds: [discovered.id, ready.id],
}),
});
const body = await res.json();
try {
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
action: "move_to_ready",
jobIds: [discovered.id, ready.id],
}),
});
const body = await res.json();
expect(body.ok).toBe(true);
expect(body.data.succeeded).toBe(1);
expect(body.data.failed).toBe(1);
expect(vi.mocked(processJob)).toHaveBeenCalledWith(discovered.id);
expect(
body.data.results.find((r: any) => r.jobId === ready.id).error.code,
).toBe("INVALID_REQUEST");
expect(body.ok).toBe(true);
expect(body.data.succeeded).toBe(1);
expect(body.data.failed).toBe(1);
expect(vi.mocked(processJob)).toHaveBeenCalledWith(discovered.id, {
force: false,
requestOrigin: "https://canonical.jobops.example",
});
expect(
body.data.results.find((r: any) => r.jobId === ready.id).error.code,
).toBe("INVALID_REQUEST");
} finally {
if (previousBaseUrl === undefined) {
delete process.env.JOBOPS_PUBLIC_BASE_URL;
} else {
process.env.JOBOPS_PUBLIC_BASE_URL = previousBaseUrl;
}
}
});
it("runs bulk rescore with partial failures", async () => {
it("supports legacy move_to_ready endpoint", async () => {
const { createJob } = await import("../../repositories/jobs");
const { processJob } = await import("../../pipeline/index");
const job = await createJob({
source: "manual",
title: "Legacy Ready Route",
employer: "Acme",
jobUrl: "https://example.com/job/legacy-process-1",
jobDescription: "Test description",
});
const previousBaseUrl = process.env.JOBOPS_PUBLIC_BASE_URL;
process.env.JOBOPS_PUBLIC_BASE_URL = "https://canonical.jobops.example";
try {
const res = await fetch(`${baseUrl}/api/jobs/${job.id}/process`, {
method: "POST",
});
const body = await res.json();
expect(res.status).toBe(200);
expect(body.ok).toBe(true);
expect(vi.mocked(processJob)).toHaveBeenCalledWith(job.id, {
force: false,
requestOrigin: "https://canonical.jobops.example",
});
} finally {
if (previousBaseUrl === undefined) {
delete process.env.JOBOPS_PUBLIC_BASE_URL;
} else {
process.env.JOBOPS_PUBLIC_BASE_URL = previousBaseUrl;
}
}
});
it("runs rescore action with partial failures", async () => {
const { createJob, updateJob } = await import("../../repositories/jobs");
const { scoreJobSuitability } = await import("../../services/scorer");
const { getProfile } = await import("../../services/profile");
@ -506,34 +557,34 @@ describe.sequential("Jobs API routes", () => {
vi.mocked(getProfile).mockResolvedValue({});
vi.mocked(scoreJobSuitability).mockResolvedValue({
score: 81,
reason: "Updated fit from bulk rescore",
reason: "Updated fit from action rescore",
});
const discovered = await createJob({
source: "manual",
title: "Discovered Role",
employer: "Acme",
jobUrl: "https://example.com/job/bulk-rescore-1",
jobUrl: "https://example.com/job/action-rescore-1",
jobDescription: "Test description",
});
const ready = await createJob({
source: "manual",
title: "Ready Role",
employer: "Beta",
jobUrl: "https://example.com/job/bulk-rescore-2",
jobUrl: "https://example.com/job/action-rescore-2",
jobDescription: "Test description",
});
const processing = await createJob({
source: "manual",
title: "Processing Role",
employer: "Gamma",
jobUrl: "https://example.com/job/bulk-rescore-3",
jobUrl: "https://example.com/job/action-rescore-3",
jobDescription: "Test description",
});
await updateJob(ready.id, { status: "ready" });
await updateJob(processing.id, { status: "processing" });
const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, {
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
@ -566,33 +617,33 @@ describe.sequential("Jobs API routes", () => {
expect(vi.mocked(getProfile)).toHaveBeenCalledTimes(1);
});
it("streams bulk action progress with done counters", async () => {
it("streams job action progress with done counters", async () => {
const { createJob, updateJob } = await import("../../repositories/jobs");
const discovered = await createJob({
source: "manual",
title: "Discovered Role",
employer: "Acme",
jobUrl: "https://example.com/job/bulk-stream-1",
jobUrl: "https://example.com/job/action-stream-1",
jobDescription: "Test description",
});
const ready = await createJob({
source: "manual",
title: "Ready Role",
employer: "Beta",
jobUrl: "https://example.com/job/bulk-stream-2",
jobUrl: "https://example.com/job/action-stream-2",
jobDescription: "Test description",
});
const applied = await createJob({
source: "manual",
title: "Applied Role",
employer: "Gamma",
jobUrl: "https://example.com/job/bulk-stream-3",
jobUrl: "https://example.com/job/action-stream-3",
jobDescription: "Test description",
});
await updateJob(ready.id, { status: "ready" });
await updateJob(applied.id, { status: "applied" });
const res = await fetch(`${baseUrl}/api/jobs/bulk-actions/stream`, {
const res = await fetch(`${baseUrl}/api/jobs/actions/stream`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
@ -655,12 +706,12 @@ describe.sequential("Jobs API routes", () => {
expect(events.at(-1)?.failed).toBe(1);
});
it("validates bulk action payloads", async () => {
it("validates job action payloads", async () => {
const tooManyIds = Array.from(
{ length: 101 },
(_, index) => `job-${index}`,
);
const res = await fetch(`${baseUrl}/api/jobs/bulk-actions`, {
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
@ -719,14 +770,18 @@ describe.sequential("Jobs API routes", () => {
suitabilityReason: "Old fit",
});
const res = await fetch(`${baseUrl}/api/jobs/${job.id}/rescore`, {
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ action: "rescore", jobIds: [job.id] }),
});
const body = await res.json();
expect(body.ok).toBe(true);
expect(body.data.suitabilityScore).toBe(77);
expect(body.data.suitabilityReason).toBe("Updated fit");
expect(body.data.results).toHaveLength(1);
expect(body.data.results[0].ok).toBe(true);
expect(body.data.results[0].job.suitabilityScore).toBe(77);
expect(body.data.results[0].job.suitabilityReason).toBe("Updated fit");
});
it("deletes jobs below a score threshold (excluding applied)", async () => {

View File

@ -5,11 +5,11 @@ import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse";
import {
APPLICATION_OUTCOMES,
APPLICATION_STAGES,
type BulkJobAction,
type BulkJobActionResponse,
type BulkJobActionResult,
type BulkJobActionStreamEvent,
type Job,
type JobAction,
type JobActionResponse,
type JobActionResult,
type JobActionStreamEvent,
type JobListItem,
type JobStatus,
type JobsListResponse,
@ -18,7 +18,12 @@ import {
import { type Request, type Response, Router } from "express";
import { z } from "zod";
import { isDemoMode, sendDemoBlocked } from "../../config/demo";
import { AppError, badRequest, conflict } from "../../infra/errors";
import {
AppError,
type AppErrorCode,
badRequest,
conflict,
} from "../../infra/errors";
import {
generateFinalPdf,
processJob,
@ -48,7 +53,7 @@ import * as visaSponsors from "../../services/visa-sponsors/index";
import { asyncPool } from "../../utils/async-pool";
export const jobsRouter = Router();
const BULK_ACTION_CONCURRENCY = 4;
const JOB_ACTION_CONCURRENCY = 4;
const tailoredSkillsPayloadSchema = z.array(
z.object({
@ -195,10 +200,25 @@ const updateOutcomeSchema = z.object({
closedAt: z.number().int().nullable().optional(),
});
const bulkActionRequestSchema = z.object({
action: z.enum(["skip", "move_to_ready", "rescore"]),
jobIds: z.array(z.string().min(1)).min(1).max(100),
});
const jobActionRequestSchema = z.discriminatedUnion("action", [
z.object({
action: z.literal("skip"),
jobIds: z.array(z.string().min(1)).min(1).max(100),
}),
z.object({
action: z.literal("rescore"),
jobIds: z.array(z.string().min(1)).min(1).max(100),
}),
z.object({
action: z.literal("move_to_ready"),
jobIds: z.array(z.string().min(1)).min(1).max(100),
options: z
.object({
force: z.boolean().optional(),
})
.optional(),
}),
]);
const listJobsQuerySchema = z.object({
status: z.string().optional(),
@ -277,11 +297,15 @@ function mapErrorForResult(error: unknown): {
};
}
type BulkExecutionOptions = {
type JobActionExecutionOptions = {
getProfileForRescore?: () => Promise<Record<string, unknown>>;
forceMoveToReady?: boolean;
requestOrigin?: string | null;
};
function createBulkProfileLoader(): () => Promise<Record<string, unknown>> {
function createSharedRescoreProfileLoader(): () => Promise<
Record<string, unknown>
> {
let profilePromise: Promise<Record<string, unknown>> | null = null;
return async () => {
@ -302,11 +326,11 @@ function createBulkProfileLoader(): () => Promise<Record<string, unknown>> {
};
}
async function executeBulkActionForJob(
action: BulkJobAction,
async function executeJobActionForJob(
action: JobAction,
jobId: string,
options?: BulkExecutionOptions,
): Promise<BulkJobActionResult> {
options?: JobActionExecutionOptions,
): Promise<JobActionResult> {
try {
const job = await jobsRepo.getJobById(jobId);
if (!job) {
@ -350,13 +374,29 @@ async function executeBulkActionForJob(
);
}
const processed = await processJob(jobId);
if (!processed.success) {
throw new AppError({
status: 500,
code: "INTERNAL_ERROR",
message: processed.error || "Failed to process job",
if (isDemoMode()) {
const simulated = await simulateProcessJob(jobId, {
force: options?.forceMoveToReady ?? false,
});
if (!simulated.success) {
throw new AppError({
status: 500,
code: "INTERNAL_ERROR",
message: simulated.error || "Failed to process job",
});
}
} else {
const processed = await processJob(jobId, {
force: options?.forceMoveToReady ?? false,
requestOrigin: options?.requestOrigin ?? null,
});
if (!processed.success) {
throw new AppError({
status: 500,
code: "INTERNAL_ERROR",
message: processed.error || "Failed to process job",
});
}
}
const updated = await jobsRepo.getJobById(jobId);
@ -426,6 +466,32 @@ async function executeBulkActionForJob(
}
}
function mapJobActionFailure(
failure: Extract<JobActionResult, { ok: false }>,
): AppError {
const statusByCode: Record<AppErrorCode, number> = {
INVALID_REQUEST: 400,
UNAUTHORIZED: 401,
FORBIDDEN: 403,
NOT_FOUND: 404,
REQUEST_TIMEOUT: 408,
CONFLICT: 409,
UNPROCESSABLE_ENTITY: 422,
SERVICE_UNAVAILABLE: 503,
UPSTREAM_ERROR: 502,
INTERNAL_ERROR: 500,
};
const code = (
failure.error.code in statusByCode ? failure.error.code : "INTERNAL_ERROR"
) as AppErrorCode;
return new AppError({
status: statusByCode[code],
code,
message: failure.error.message,
});
}
/**
* GET /api/jobs - List all jobs
* Query params: status (comma-separated list of statuses to filter)
@ -532,27 +598,34 @@ jobsRouter.get("/revision", async (req: Request, res: Response) => {
});
/**
* POST /api/jobs/bulk-actions - Run a bulk action across selected jobs
* POST /api/jobs/actions - Run a job action across selected jobs
*/
jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
jobsRouter.post("/actions", async (req: Request, res: Response) => {
try {
const parsed = bulkActionRequestSchema.parse(req.body);
const parsed = jobActionRequestSchema.parse(req.body);
const dedupedJobIds = Array.from(new Set(parsed.jobIds));
const executionOptions: BulkExecutionOptions =
parsed.action === "rescore" && !isDemoMode()
? { getProfileForRescore: createBulkProfileLoader() }
: {};
const requestOrigin = resolveRequestOrigin(req);
const executionOptions: JobActionExecutionOptions = {
...(parsed.action === "rescore" && !isDemoMode()
? { getProfileForRescore: createSharedRescoreProfileLoader() }
: {}),
...(parsed.action === "move_to_ready" &&
parsed.options?.force !== undefined
? { forceMoveToReady: parsed.options.force }
: {}),
...(parsed.action === "move_to_ready" ? { requestOrigin } : {}),
};
const results = await asyncPool({
items: dedupedJobIds,
concurrency: BULK_ACTION_CONCURRENCY,
concurrency: JOB_ACTION_CONCURRENCY,
task: async (jobId) =>
executeBulkActionForJob(parsed.action, jobId, executionOptions),
executeJobActionForJob(parsed.action, jobId, executionOptions),
});
const succeeded = results.filter((result) => result.ok).length;
const failed = results.length - succeeded;
const payload: BulkJobActionResponse = {
const payload: JobActionResponse = {
action: parsed.action,
requested: dedupedJobIds.length,
succeeded,
@ -560,20 +633,20 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
results,
};
logger.info("Bulk job action completed", {
route: "POST /api/jobs/bulk-actions",
logger.info("Job action completed", {
route: "POST /api/jobs/actions",
action: parsed.action,
requested: dedupedJobIds.length,
succeeded,
failed,
concurrency: BULK_ACTION_CONCURRENCY,
concurrency: JOB_ACTION_CONCURRENCY,
});
ok(res, payload);
} catch (error) {
const err =
error instanceof z.ZodError
? badRequest("Invalid bulk action request", error.flatten())
? badRequest("Invalid job action request", error.flatten())
: error instanceof AppError
? error
: new AppError({
@ -582,8 +655,8 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
message: error instanceof Error ? error.message : "Unknown error",
});
logger.error("Bulk job action failed", {
route: "POST /api/jobs/bulk-actions",
logger.error("Job action failed", {
route: "POST /api/jobs/actions",
status: err.status,
code: err.code,
details: err.details,
@ -594,26 +667,32 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
});
/**
* POST /api/jobs/bulk-actions/stream - Run a bulk action and stream per-job progress via SSE
* POST /api/jobs/actions/stream - Run a job action and stream per-job progress via SSE
*/
jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
const parsed = bulkActionRequestSchema.safeParse(req.body);
jobsRouter.post("/actions/stream", async (req: Request, res: Response) => {
const parsed = jobActionRequestSchema.safeParse(req.body);
if (!parsed.success) {
return fail(
res,
badRequest("Invalid bulk action request", parsed.error.flatten()),
badRequest("Invalid job action request", parsed.error.flatten()),
);
}
const dedupedJobIds = Array.from(new Set(parsed.data.jobIds));
const requestOrigin = resolveRequestOrigin(req);
const requestId = String(res.getHeader("x-request-id") || "unknown");
const action = parsed.data.action;
const executionOptions: BulkExecutionOptions =
action === "rescore" && !isDemoMode()
? { getProfileForRescore: createBulkProfileLoader() }
: {};
const executionOptions: JobActionExecutionOptions = {
...(action === "rescore" && !isDemoMode()
? { getProfileForRescore: createSharedRescoreProfileLoader() }
: {}),
...(action === "move_to_ready" && parsed.data.options?.force !== undefined
? { forceMoveToReady: parsed.data.options.force }
: {}),
...(action === "move_to_ready" ? { requestOrigin } : {}),
};
const requested = dedupedJobIds.length;
const results: BulkJobActionResult[] = [];
const results: JobActionResult[] = [];
let succeeded = 0;
let failed = 0;
@ -633,7 +712,7 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
const isResponseWritable = () =>
!clientDisconnected && !res.writableEnded && !res.destroyed;
const sendEvent = (event: BulkJobActionStreamEvent) => {
const sendEvent = (event: JobActionStreamEvent) => {
if (!isResponseWritable()) return false;
writeSseData(res, event);
return true;
@ -651,8 +730,8 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
requestId,
})
) {
logger.info("Client disconnected before bulk stream started", {
route: "POST /api/jobs/bulk-actions/stream",
logger.info("Client disconnected before action stream started", {
route: "POST /api/jobs/actions/stream",
action,
requested,
succeeded,
@ -664,12 +743,12 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
await asyncPool({
items: dedupedJobIds,
concurrency: BULK_ACTION_CONCURRENCY,
concurrency: JOB_ACTION_CONCURRENCY,
shouldStop: () => !isResponseWritable(),
task: async (jobId) => {
if (!isResponseWritable()) return;
const result = await executeBulkActionForJob(
const result = await executeJobActionForJob(
action,
jobId,
executionOptions,
@ -691,9 +770,9 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
})
) {
logger.info(
"Client disconnected while writing bulk stream progress",
"Client disconnected while writing action stream progress",
{
route: "POST /api/jobs/bulk-actions/stream",
route: "POST /api/jobs/actions/stream",
action,
requested,
succeeded,
@ -716,13 +795,13 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
requestId,
});
logger.info("Bulk job action stream completed", {
route: "POST /api/jobs/bulk-actions/stream",
logger.info("Job action stream completed", {
route: "POST /api/jobs/actions/stream",
action,
requested,
succeeded,
failed,
concurrency: BULK_ACTION_CONCURRENCY,
concurrency: JOB_ACTION_CONCURRENCY,
requestId,
});
} catch (error) {
@ -735,8 +814,8 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
message: error instanceof Error ? error.message : "Unknown error",
});
logger.error("Bulk job action stream failed", {
route: "POST /api/jobs/bulk-actions/stream",
logger.error("Job action stream failed", {
route: "POST /api/jobs/actions/stream",
action,
requested,
succeeded,
@ -755,7 +834,7 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
})
) {
logger.info("Skipping stream error event because client disconnected", {
route: "POST /api/jobs/bulk-actions/stream",
route: "POST /api/jobs/actions/stream",
action,
requested,
succeeded,
@ -771,6 +850,33 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
}
});
jobsRouter.post("/:id/process", async (req: Request, res: Response) => {
const forceRaw = req.query.force as string | undefined;
const force = forceRaw === "1" || forceRaw === "true";
const result = await executeJobActionForJob("move_to_ready", req.params.id, {
forceMoveToReady: force,
requestOrigin: resolveRequestOrigin(req),
});
if (!result.ok) return fail(res, mapJobActionFailure(result));
ok(res, result.job);
});
jobsRouter.post("/:id/skip", async (req: Request, res: Response) => {
const result = await executeJobActionForJob("skip", req.params.id);
if (!result.ok) return fail(res, mapJobActionFailure(result));
ok(res, result.job);
});
jobsRouter.post("/:id/rescore", async (req: Request, res: Response) => {
const result = await executeJobActionForJob("rescore", req.params.id, {
...(isDemoMode()
? {}
: { getProfileForRescore: createSharedRescoreProfileLoader() }),
});
if (!result.ok) return fail(res, mapJobActionFailure(result));
ok(res, result.job);
});
/**
* GET /api/jobs/:id - Get a single job
*/
@ -1039,54 +1145,6 @@ jobsRouter.post("/:id/summarize", async (req: Request, res: Response) => {
}
});
/**
* POST /api/jobs/:id/rescore - Regenerate suitability score + reason
*/
jobsRouter.post("/:id/rescore", async (req: Request, res: Response) => {
try {
if (isDemoMode()) {
const simulatedJob = await simulateRescoreJob(req.params.id);
return okWithMeta(res, simulatedJob, { simulated: true });
}
const job = await jobsRepo.getJobById(req.params.id);
if (!job) {
return res.status(404).json({ success: false, error: "Job not found" });
}
const rawProfile = await getProfile();
if (
!rawProfile ||
typeof rawProfile !== "object" ||
Array.isArray(rawProfile)
) {
return res
.status(400)
.json({ success: false, error: "Invalid resume profile format" });
}
const { score, reason } = await scoreJobSuitability(
job,
rawProfile as Record<string, unknown>,
);
const updatedJob = await jobsRepo.updateJob(job.id, {
suitabilityScore: score,
suitabilityReason: reason,
});
if (!updatedJob) {
return res.status(404).json({ success: false, error: "Job not found" });
}
res.json({ success: true, data: updatedJob });
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
res.status(500).json({ success: false, error: message });
}
});
/**
* POST /api/jobs/:id/check-sponsor - Check if employer is a visa sponsor
*/
@ -1166,43 +1224,6 @@ jobsRouter.post("/:id/generate-pdf", async (req: Request, res: Response) => {
}
});
/**
* POST /api/jobs/:id/process - Process a single job (generate summary + PDF)
*/
jobsRouter.post("/:id/process", async (req: Request, res: Response) => {
try {
const forceRaw = req.query.force as string | undefined;
const force = forceRaw === "1" || forceRaw === "true";
if (isDemoMode()) {
const result = await simulateProcessJob(req.params.id, { force });
if (!result.success) {
return res.status(400).json({ success: false, error: result.error });
}
const job = await jobsRepo.getJobById(req.params.id);
if (!job) {
return res.status(404).json({ success: false, error: "Job not found" });
}
return okWithMeta(res, job, { simulated: true });
}
const result = await processJob(req.params.id, {
force,
requestOrigin: resolveRequestOrigin(req),
});
if (!result.success) {
return res.status(400).json({ success: false, error: result.error });
}
const job = await jobsRepo.getJobById(req.params.id);
res.json({ success: true, data: job });
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
res.status(500).json({ success: false, error: message });
}
});
/**
* POST /api/jobs/:id/apply - Mark a job as applied
*/
@ -1255,24 +1276,6 @@ jobsRouter.post("/:id/apply", async (req: Request, res: Response) => {
}
});
/**
* POST /api/jobs/:id/skip - Mark a job as skipped
*/
jobsRouter.post("/:id/skip", async (req: Request, res: Response) => {
try {
const job = await jobsRepo.updateJob(req.params.id, { status: "skipped" });
if (!job) {
return res.status(404).json({ success: false, error: "Job not found" });
}
res.json({ success: true, data: job });
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
res.status(500).json({ success: false, error: message });
}
});
/**
* DELETE /api/jobs/status/:status - Clear jobs with a specific status
*/

View File

@ -194,7 +194,7 @@ describe.sequential("Post-Application Review Workflow API", () => {
it("counts no-suggested-match approve items as skipped, not failed", async () => {
await seedPendingMessage({ matchedJobId: null });
const res = await fetch(`${baseUrl}/api/post-application/inbox/bulk`, {
const res = await fetch(`${baseUrl}/api/post-application/inbox/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({

View File

@ -9,11 +9,11 @@ import { type Request, type Response, Router } from "express";
import { z } from "zod";
import {
approvePostApplicationInboxItem,
bulkPostApplicationInboxAction,
denyPostApplicationInboxItem,
listPostApplicationInbox,
listPostApplicationReviewRuns,
listPostApplicationRunMessages,
runPostApplicationInboxAction,
} from "../../services/post-application/review";
const listQuerySchema = z.object({
@ -46,7 +46,7 @@ const denyBodySchema = z.object({
decidedBy: z.string().max(255).optional(),
});
const bulkActionBodySchema = z.object({
const actionBodySchema = z.object({
action: z.enum(["approve", "deny"]),
provider: z.enum(POST_APPLICATION_PROVIDERS).default("gmail"),
accountKey: z.string().min(1).max(255).default("default"),
@ -179,12 +179,12 @@ postApplicationReviewRouter.post(
);
postApplicationReviewRouter.post(
"/inbox/bulk",
"/inbox/actions",
asyncRoute(async (req: Request, res: Response) => {
try {
const input = bulkActionBodySchema.parse(req.body ?? {});
const input = actionBodySchema.parse(req.body ?? {});
const result = await bulkPostApplicationInboxAction({
const result = await runPostApplicationInboxAction({
action: input.action,
provider: input.provider,
accountKey: input.accountKey,

View File

@ -67,8 +67,10 @@ describe.sequential("Basic Auth read-only enforcement", () => {
({ server, baseUrl } = await startServer());
const postRes = await fetch(`${baseUrl}/api/jobs/123/skip`, {
const postRes = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ action: "skip", jobIds: ["123"] }),
});
expect(postRes.status).toBe(401);
expect(postRes.headers.get("www-authenticate")).toBeNull();
@ -93,9 +95,13 @@ describe.sequential("Basic Auth read-only enforcement", () => {
({ server, baseUrl } = await startServer());
const authHeader = buildAuthHeader("user", "pass");
const res = await fetch(`${baseUrl}/api/jobs/123/skip`, {
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { Authorization: authHeader },
headers: {
Authorization: authHeader,
"Content-Type": "application/json",
},
body: JSON.stringify({ action: "skip", jobIds: ["123"] }),
});
expect(res.status).not.toBe(401);
@ -107,7 +113,11 @@ describe.sequential("Basic Auth read-only enforcement", () => {
({ server, baseUrl } = await startServer());
const res = await fetch(`${baseUrl}/api/jobs/123/skip`, { method: "POST" });
const res = await fetch(`${baseUrl}/api/jobs/actions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ action: "skip", jobIds: ["123"] }),
});
expect(res.status).not.toBe(401);
});
});

View File

@ -89,6 +89,63 @@ const emptyCrawlingStats = {
crawlingCurrentUrl: undefined,
};
type SourceCrawlingStats = {
termsProcessed: number;
termsTotal: number;
listPagesProcessed: number;
listPagesTotal: number;
jobCardsFound: number;
jobPagesEnqueued: number;
jobPagesSkipped: number;
jobPagesProcessed: number;
};
const emptySourceCrawlingStats = (): SourceCrawlingStats => ({
termsProcessed: 0,
termsTotal: 0,
listPagesProcessed: 0,
listPagesTotal: 0,
jobCardsFound: 0,
jobPagesEnqueued: 0,
jobPagesSkipped: 0,
jobPagesProcessed: 0,
});
const crawlingStatsBySource = new Map<CrawlSource, SourceCrawlingStats>();
function aggregateCrawlingStats() {
let termsProcessed = 0;
let termsTotal = 0;
let listPagesProcessed = 0;
let listPagesTotal = 0;
let jobCardsFound = 0;
let jobPagesEnqueued = 0;
let jobPagesSkipped = 0;
let jobPagesProcessed = 0;
for (const stats of crawlingStatsBySource.values()) {
termsProcessed += stats.termsProcessed;
termsTotal += stats.termsTotal;
listPagesProcessed += stats.listPagesProcessed;
listPagesTotal += stats.listPagesTotal;
jobCardsFound += stats.jobCardsFound;
jobPagesEnqueued += stats.jobPagesEnqueued;
jobPagesSkipped += stats.jobPagesSkipped;
jobPagesProcessed += stats.jobPagesProcessed;
}
return {
termsProcessed,
termsTotal,
listPagesProcessed,
listPagesTotal,
jobCardsFound,
jobPagesEnqueued,
jobPagesSkipped,
jobPagesProcessed,
};
}
/**
* Update the current progress and notify all listeners.
*/
@ -131,6 +188,7 @@ export function subscribeToProgress(listener: ProgressListener): () => void {
* Reset progress to idle state.
*/
export function resetProgress(): void {
crawlingStatsBySource.clear();
currentProgress = {
step: "idle",
message: "Ready",
@ -150,27 +208,38 @@ export function resetProgress(): void {
*/
export const progressHelpers = {
startCrawling: (sourcesTotal = 0) =>
updateProgress({
step: "crawling",
message: "Fetching jobs from sources...",
detail: "Starting crawler",
startedAt: new Date().toISOString(),
crawlingSource: null,
crawlingSourcesCompleted: 0,
crawlingSourcesTotal: sourcesTotal,
...emptyCrawlingStats,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
}),
(() => {
crawlingStatsBySource.clear();
updateProgress({
step: "crawling",
message: "Fetching jobs from sources...",
detail: "Starting crawler",
startedAt: new Date().toISOString(),
crawlingSource: null,
crawlingSourcesCompleted: 0,
crawlingSourcesTotal: sourcesTotal,
...emptyCrawlingStats,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
});
})(),
startSource: (
source: CrawlSource,
sourcesCompleted: number,
sourcesTotal: number,
options?: { termsTotal?: number; detail?: string },
) =>
) => {
const existing =
crawlingStatsBySource.get(source) ?? emptySourceCrawlingStats();
crawlingStatsBySource.set(source, {
...emptySourceCrawlingStats(),
termsTotal: options?.termsTotal ?? existing.termsTotal,
});
const aggregated = aggregateCrawlingStats();
updateProgress({
step: "crawling",
message: `Fetching jobs from ${source}...`,
@ -178,9 +247,18 @@ export const progressHelpers = {
crawlingSource: source,
crawlingSourcesCompleted: sourcesCompleted,
crawlingSourcesTotal: sourcesTotal,
...emptyCrawlingStats,
crawlingTermsTotal: options?.termsTotal ?? 0,
}),
crawlingTermsProcessed: aggregated.termsProcessed,
crawlingTermsTotal: aggregated.termsTotal,
crawlingListPagesProcessed: aggregated.listPagesProcessed,
crawlingListPagesTotal: aggregated.listPagesTotal,
crawlingJobCardsFound: aggregated.jobCardsFound,
crawlingJobPagesEnqueued: aggregated.jobPagesEnqueued,
crawlingJobPagesSkipped: aggregated.jobPagesSkipped,
crawlingJobPagesProcessed: aggregated.jobPagesProcessed,
crawlingPhase: undefined,
crawlingCurrentUrl: undefined,
});
},
completeSource: (sourcesCompleted: number, sourcesTotal: number) =>
updateProgress({
@ -204,24 +282,52 @@ export const progressHelpers = {
currentUrl?: string;
}) => {
const current = getProgress();
if (update.source) {
const existing =
crawlingStatsBySource.get(update.source) ?? emptySourceCrawlingStats();
const nextForSource: SourceCrawlingStats = {
termsProcessed: update.termsProcessed ?? existing.termsProcessed,
termsTotal: update.termsTotal ?? existing.termsTotal,
listPagesProcessed:
update.listPagesProcessed ?? existing.listPagesProcessed,
listPagesTotal: update.listPagesTotal ?? existing.listPagesTotal,
jobCardsFound: update.jobCardsFound ?? existing.jobCardsFound,
jobPagesEnqueued: update.jobPagesEnqueued ?? existing.jobPagesEnqueued,
jobPagesSkipped: update.jobPagesSkipped ?? existing.jobPagesSkipped,
jobPagesProcessed:
update.jobPagesProcessed ?? existing.jobPagesProcessed,
};
crawlingStatsBySource.set(update.source, nextForSource);
}
const aggregated = aggregateCrawlingStats();
const next = {
...current,
crawlingSource: update.source ?? current.crawlingSource,
crawlingTermsProcessed:
update.termsProcessed ?? current.crawlingTermsProcessed,
crawlingTermsTotal: update.termsTotal ?? current.crawlingTermsTotal,
crawlingListPagesProcessed:
update.listPagesProcessed ?? current.crawlingListPagesProcessed,
crawlingListPagesTotal:
update.listPagesTotal ?? current.crawlingListPagesTotal,
crawlingJobCardsFound:
update.jobCardsFound ?? current.crawlingJobCardsFound,
crawlingJobPagesEnqueued:
update.jobPagesEnqueued ?? current.crawlingJobPagesEnqueued,
crawlingJobPagesSkipped:
update.jobPagesSkipped ?? current.crawlingJobPagesSkipped,
crawlingJobPagesProcessed:
update.jobPagesProcessed ?? current.crawlingJobPagesProcessed,
crawlingTermsProcessed: update.source
? aggregated.termsProcessed
: (update.termsProcessed ?? current.crawlingTermsProcessed),
crawlingTermsTotal: update.source
? aggregated.termsTotal
: (update.termsTotal ?? current.crawlingTermsTotal),
crawlingListPagesProcessed: update.source
? aggregated.listPagesProcessed
: (update.listPagesProcessed ?? current.crawlingListPagesProcessed),
crawlingListPagesTotal: update.source
? aggregated.listPagesTotal
: (update.listPagesTotal ?? current.crawlingListPagesTotal),
crawlingJobCardsFound: update.source
? aggregated.jobCardsFound
: (update.jobCardsFound ?? current.crawlingJobCardsFound),
crawlingJobPagesEnqueued: update.source
? aggregated.jobPagesEnqueued
: (update.jobPagesEnqueued ?? current.crawlingJobPagesEnqueued),
crawlingJobPagesSkipped: update.source
? aggregated.jobPagesSkipped
: (update.jobPagesSkipped ?? current.crawlingJobPagesSkipped),
crawlingJobPagesProcessed: update.source
? aggregated.jobPagesProcessed
: (update.jobPagesProcessed ?? current.crawlingJobPagesProcessed),
crawlingPhase: update.phase ?? current.crawlingPhase,
crawlingCurrentUrl: update.currentUrl ?? current.crawlingCurrentUrl,
};
@ -316,7 +422,6 @@ export const progressHelpers = {
step: "processing",
message: `Processing job ${index}/${total}...`,
detail: `${job.title} @ ${job.employer}`,
jobsProcessed: index - 1,
totalToProcess: total,
currentJob: job,
}),

View File

@ -25,7 +25,7 @@ vi.mock("../repositories/jobs", () => ({
updateJob: vi.fn(),
getUnscoredDiscoveredJobs: vi.fn(),
getJobById: vi.fn(),
bulkCreateJobs: vi.fn(),
createJobs: vi.fn(),
getAllJobUrls: vi.fn(),
}));
@ -77,7 +77,7 @@ describe("Sponsor Match Calculation", () => {
let scoreJobSuitability: ReturnType<typeof vi.fn>;
let updateJob: ReturnType<typeof vi.fn>;
let getUnscoredDiscoveredJobs: ReturnType<typeof vi.fn>;
let bulkCreateJobs: ReturnType<typeof vi.fn>;
let createJobs: ReturnType<typeof vi.fn>;
beforeEach(async () => {
vi.clearAllMocks();
@ -96,11 +96,11 @@ describe("Sponsor Match Calculation", () => {
updateJob = jobsRepo.updateJob as ReturnType<typeof vi.fn>;
getUnscoredDiscoveredJobs =
jobsRepo.getUnscoredDiscoveredJobs as ReturnType<typeof vi.fn>;
bulkCreateJobs = jobsRepo.bulkCreateJobs as ReturnType<typeof vi.fn>;
createJobs = jobsRepo.createJobs as ReturnType<typeof vi.fn>;
// Default mock implementations
scoreJobSuitability.mockResolvedValue({ score: 75, reason: "Good match" });
bulkCreateJobs.mockResolvedValue({ created: 0, skipped: 0 });
createJobs.mockResolvedValue({ created: 0, skipped: 0 });
updateJob.mockResolvedValue(undefined);
calculateSponsorMatchSummary.mockImplementation((results: any[]) => {

View File

@ -459,8 +459,8 @@ describe("discoverJobsStep", () => {
});
const progress = getProgress();
expect(progress.crawlingTermsProcessed).toBe(1);
expect(progress.crawlingTermsTotal).toBe(2);
expect(progress.crawlingTermsProcessed).toBe(3);
expect(progress.crawlingTermsTotal).toBe(4);
expect(progress.crawlingListPagesProcessed).toBe(2);
expect(progress.crawlingListPagesTotal).toBe(4);
expect(progress.crawlingJobPagesEnqueued).toBe(18);

View File

@ -13,7 +13,22 @@ import { runCrawler } from "../../services/crawler";
import { runHiringCafe } from "../../services/hiring-cafe";
import { runJobSpy } from "../../services/jobspy";
import { runUkVisaJobs } from "../../services/ukvisajobs";
import { progressHelpers, updateProgress } from "../progress";
import { asyncPool } from "../../utils/async-pool";
import { type CrawlSource, progressHelpers, updateProgress } from "../progress";
const DISCOVERY_CONCURRENCY = 3;
type DiscoveryTaskResult = {
discoveredJobs: CreateJobInput[];
sourceErrors: string[];
};
type DiscoverySourceTask = {
source: CrawlSource;
termsTotal?: number;
detail: string;
run: () => Promise<DiscoveryTaskResult>;
};
export async function discoverJobsStep(args: {
mergedConfig: PipelineConfig;
@ -74,385 +89,431 @@ export async function discoverJobsStep(args: {
source === "indeed" || source === "linkedin" || source === "glassdoor",
);
const shouldRunJobSpy = jobSpySites.length > 0;
const shouldRunAdzuna = compatibleSources.includes("adzuna");
const shouldRunHiringCafe = compatibleSources.includes("hiringcafe");
const shouldRunGradcracker = compatibleSources.includes("gradcracker");
const shouldRunUkVisaJobs = compatibleSources.includes("ukvisajobs");
const sourceTasks: DiscoverySourceTask[] = [];
const totalSources =
Number(shouldRunJobSpy) +
Number(shouldRunAdzuna) +
Number(shouldRunHiringCafe) +
Number(shouldRunGradcracker) +
Number(shouldRunUkVisaJobs);
let completedSources = 0;
progressHelpers.startCrawling(totalSources);
const markSourceComplete = () => {
completedSources += 1;
progressHelpers.completeSource(completedSources, totalSources);
};
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunJobSpy) {
progressHelpers.startSource("jobspy", completedSources, totalSources, {
if (jobSpySites.length > 0) {
sourceTasks.push({
source: "jobspy",
termsTotal: searchTerms.length,
detail: `JobSpy: scraping ${jobSpySites.join(", ")}...`,
});
run: async () => {
const jobSpyResult = await runJobSpy({
sites: jobSpySites,
searchTerms,
location: settings.jobspyLocation ?? undefined,
resultsWanted: settings.jobspyResultsWanted
? parseInt(settings.jobspyResultsWanted, 10)
: undefined,
countryIndeed: settings.jobspyCountryIndeed ?? undefined,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
const jobSpyResult = await runJobSpy({
sites: jobSpySites,
searchTerms,
location: settings.jobspyLocation ?? undefined,
resultsWanted: settings.jobspyResultsWanted
? parseInt(settings.jobspyResultsWanted, 10)
: undefined,
countryIndeed: settings.jobspyCountryIndeed ?? undefined,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`,
});
},
});
if (!jobSpyResult.success) {
sourceErrors.push(`jobspy: ${jobSpyResult.error ?? "unknown error"}`);
} else {
discoveredJobs.push(...jobSpyResult.jobs);
}
markSourceComplete();
}
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunAdzuna) {
progressHelpers.startSource("adzuna", completedSources, totalSources, {
termsTotal: searchTerms.length,
detail: "Adzuna: fetching jobs...",
});
const adzunaCountryCode = getAdzunaCountryCode(selectedCountry);
if (!adzunaCountryCode) {
sourceErrors.push(
`adzuna: unsupported country ${formatCountryLabel(selectedCountry)}`,
);
markSourceComplete();
} else {
const adzunaMaxJobsPerTerm = settings.adzunaMaxJobsPerTerm
? parseInt(settings.adzunaMaxJobsPerTerm, 10)
: 50;
const adzunaResult = await runAdzuna({
country: adzunaCountryCode,
searchTerms,
maxJobsPerTerm: adzunaMaxJobsPerTerm,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: Math.max(event.termIndex - 1, 0),
source: "jobspy",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`,
});
return;
}
},
});
if (!jobSpyResult.success) {
return {
discoveredJobs: [],
sourceErrors: [`jobspy: ${jobSpyResult.error ?? "unknown error"}`],
};
}
return {
discoveredJobs: jobSpyResult.jobs,
sourceErrors: [],
};
},
});
}
if (compatibleSources.includes("adzuna")) {
sourceTasks.push({
source: "adzuna",
termsTotal: searchTerms.length,
detail: "Adzuna: fetching jobs...",
run: async () => {
const adzunaCountryCode = getAdzunaCountryCode(selectedCountry);
if (!adzunaCountryCode) {
return {
discoveredJobs: [],
sourceErrors: [
`adzuna: unsupported country ${formatCountryLabel(selectedCountry)}`,
],
};
}
const adzunaMaxJobsPerTerm = settings.adzunaMaxJobsPerTerm
? parseInt(settings.adzunaMaxJobsPerTerm, 10)
: 50;
const adzunaResult = await runAdzuna({
country: adzunaCountryCode,
searchTerms,
maxJobsPerTerm: adzunaMaxJobsPerTerm,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}`,
});
updateProgress({
step: "crawling",
detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo} (${event.totalCollected} collected)`,
});
return;
}
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}`,
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo} (${event.totalCollected} collected)`,
detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
},
});
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
},
});
if (!adzunaResult.success) {
return {
discoveredJobs: [],
sourceErrors: [`adzuna: ${adzunaResult.error ?? "unknown error"}`],
};
}
if (!adzunaResult.success) {
sourceErrors.push(`adzuna: ${adzunaResult.error ?? "unknown error"}`);
} else {
discoveredJobs.push(...adzunaResult.jobs);
}
markSourceComplete();
}
return {
discoveredJobs: adzunaResult.jobs,
sourceErrors: [],
};
},
});
}
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunHiringCafe) {
progressHelpers.startSource("hiringcafe", completedSources, totalSources, {
if (compatibleSources.includes("hiringcafe")) {
sourceTasks.push({
source: "hiringcafe",
termsTotal: searchTerms.length,
detail: "Hiring Cafe: fetching jobs...",
});
run: async () => {
const hiringCafeMaxJobsPerTerm = settings.jobspyResultsWanted
? parseInt(settings.jobspyResultsWanted, 10)
: 200;
const hiringCafeMaxJobsPerTerm = settings.jobspyResultsWanted
? parseInt(settings.jobspyResultsWanted, 10)
: 200;
const hiringCafeResult = await runHiringCafe({
country: selectedCountry,
searchTerms,
maxJobsPerTerm: hiringCafeMaxJobsPerTerm,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
const hiringCafeResult = await runHiringCafe({
country: selectedCountry,
searchTerms,
maxJobsPerTerm: hiringCafeMaxJobsPerTerm,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
if (event.type === "page_fetched") {
const displayPageNo = event.pageNo + 1;
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: displayPageNo,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${displayPageNo}`,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${displayPageNo} (${event.totalCollected} collected)`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
},
});
if (!hiringCafeResult.success) {
return {
discoveredJobs: [],
sourceErrors: [
`hiringcafe: ${hiringCafeResult.error ?? "unknown error"}`,
],
};
}
if (event.type === "page_fetched") {
const displayPageNo = event.pageNo + 1;
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: displayPageNo,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${displayPageNo}`,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${displayPageNo} (${event.totalCollected} collected)`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return {
discoveredJobs: hiringCafeResult.jobs,
sourceErrors: [],
};
},
});
if (!hiringCafeResult.success) {
sourceErrors.push(
`hiringcafe: ${hiringCafeResult.error ?? "unknown error"}`,
);
} else {
discoveredJobs.push(...hiringCafeResult.jobs);
}
markSourceComplete();
}
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunGradcracker) {
progressHelpers.startSource("gradcracker", completedSources, totalSources, {
if (compatibleSources.includes("gradcracker")) {
sourceTasks.push({
source: "gradcracker",
detail: "Gradcracker: scraping...",
});
run: async () => {
const existingJobUrls = await jobsRepo.getAllJobUrls();
const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm
? parseInt(settings.gradcrackerMaxJobsPerTerm, 10)
: 50;
const existingJobUrls = await jobsRepo.getAllJobUrls();
const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm
? parseInt(settings.gradcrackerMaxJobsPerTerm, 10)
: 50;
const crawlerResult = await runCrawler({
existingJobUrls,
searchTerms,
maxJobsPerTerm: gradcrackerMaxJobs,
onProgress: (progress) => {
progressHelpers.crawlingUpdate({
source: "gradcracker",
listPagesProcessed: progress.listPagesProcessed,
listPagesTotal: progress.listPagesTotal,
jobCardsFound: progress.jobCardsFound,
jobPagesEnqueued: progress.jobPagesEnqueued,
jobPagesSkipped: progress.jobPagesSkipped,
jobPagesProcessed: progress.jobPagesProcessed,
phase: progress.phase,
currentUrl: progress.currentUrl,
const crawlerResult = await runCrawler({
existingJobUrls,
searchTerms,
maxJobsPerTerm: gradcrackerMaxJobs,
onProgress: (progress) => {
progressHelpers.crawlingUpdate({
source: "gradcracker",
listPagesProcessed: progress.listPagesProcessed,
listPagesTotal: progress.listPagesTotal,
jobCardsFound: progress.jobCardsFound,
jobPagesEnqueued: progress.jobPagesEnqueued,
jobPagesSkipped: progress.jobPagesSkipped,
jobPagesProcessed: progress.jobPagesProcessed,
phase: progress.phase,
currentUrl: progress.currentUrl,
});
},
});
if (!crawlerResult.success) {
return {
discoveredJobs: [],
sourceErrors: [
`gradcracker: ${crawlerResult.error ?? "unknown error"}`,
],
};
}
return {
discoveredJobs: crawlerResult.jobs,
sourceErrors: [],
};
},
});
if (!crawlerResult.success) {
sourceErrors.push(
`gradcracker: ${crawlerResult.error ?? "unknown error"}`,
);
} else {
discoveredJobs.push(...crawlerResult.jobs);
}
markSourceComplete();
}
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (shouldRunUkVisaJobs) {
progressHelpers.startSource("ukvisajobs", completedSources, totalSources, {
if (compatibleSources.includes("ukvisajobs")) {
sourceTasks.push({
source: "ukvisajobs",
termsTotal: searchTerms.length,
detail: "UKVisaJobs: scraping visa-sponsoring jobs...",
});
run: async () => {
const ukvisajobsMaxJobs = settings.ukvisajobsMaxJobs
? parseInt(settings.ukvisajobsMaxJobs, 10)
: 50;
const ukvisajobsMaxJobs = settings.ukvisajobsMaxJobs
? parseInt(settings.ukvisajobsMaxJobs, 10)
: 50;
const ukVisaResult = await runUkVisaJobs({
maxJobs: ukvisajobsMaxJobs,
searchTerms,
onProgress: (event) => {
if (event.type === "init") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: 0,
listPagesTotal: event.maxPages,
jobPagesEnqueued: 0,
jobPagesProcessed: 0,
jobPagesSkipped: 0,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
const ukVisaResult = await runUkVisaJobs({
maxJobs: ukvisajobsMaxJobs,
searchTerms,
onProgress: (event) => {
if (event.type === "init") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: 0,
listPagesTotal: event.maxPages,
jobPagesEnqueued: 0,
jobPagesProcessed: 0,
jobPagesSkipped: 0,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
listPagesTotal: event.maxPages,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}/${event.maxPages}`,
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`,
});
return;
}
if (event.type === "term_complete") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
if (event.type === "empty_page") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`,
});
return;
}
if (event.type === "error") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: ${event.message}`,
});
}
},
});
if (!ukVisaResult.success) {
return {
discoveredJobs: [],
sourceErrors: [
`ukvisajobs: ${ukVisaResult.error ?? "unknown error"}`,
],
};
}
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
listPagesTotal: event.maxPages,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}/${event.maxPages}`,
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`,
});
return;
}
if (event.type === "term_complete") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
if (event.type === "empty_page") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`,
});
return;
}
if (event.type === "error") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: ${event.message}`,
});
}
return {
discoveredJobs: ukVisaResult.jobs,
sourceErrors: [],
};
},
});
}
if (!ukVisaResult.success) {
sourceErrors.push(`ukvisajobs: ${ukVisaResult.error ?? "unknown error"}`);
} else {
discoveredJobs.push(...ukVisaResult.jobs);
}
const totalSources = sourceTasks.length;
let completedSources = 0;
markSourceComplete();
progressHelpers.startCrawling(totalSources);
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
const sourceResults = await asyncPool({
items: sourceTasks,
concurrency: DISCOVERY_CONCURRENCY,
shouldStop: args.shouldCancel,
onTaskStarted: (sourceTask) => {
progressHelpers.startSource(
sourceTask.source,
completedSources,
totalSources,
{
termsTotal: sourceTask.termsTotal,
detail: sourceTask.detail,
},
);
},
onTaskSettled: () => {
completedSources += 1;
progressHelpers.completeSource(completedSources, totalSources);
},
task: async (sourceTask) => {
try {
return await sourceTask.run();
} catch (error) {
return {
discoveredJobs: [],
sourceErrors: [
`${sourceTask.source}: ${error instanceof Error ? error.message : "unknown error"}`,
],
};
}
},
});
for (const sourceResult of sourceResults) {
discoveredJobs.push(...sourceResult.discoveredJobs);
sourceErrors.push(...sourceResult.sourceErrors);
}
if (args.shouldCancel?.()) {
return { discoveredJobs, sourceErrors };
}
if (discoveredJobs.length === 0 && sourceErrors.length > 0) {

View File

@ -7,9 +7,7 @@ export async function importJobsStep(args: {
discoveredJobs: CreateJobInput[];
}): Promise<{ created: number; skipped: number }> {
logger.info("Importing discovered jobs");
const { created, skipped } = await jobsRepo.bulkCreateJobs(
args.discoveredJobs,
);
const { created, skipped } = await jobsRepo.createJobs(args.discoveredJobs);
logger.info("Import step complete", { created, skipped });
progressHelpers.importComplete(created, skipped);

View File

@ -1,4 +1,5 @@
import { logger } from "@infra/logger";
import { asyncPool } from "../../utils/async-pool";
import { progressHelpers, updateProgress } from "../progress";
import type { ScoredJob } from "./types";
@ -6,6 +7,7 @@ type ProcessJobFn = (
jobId: string,
options?: { force?: boolean },
) => Promise<{ success: boolean; error?: string }>;
const PROCESSING_CONCURRENCY = 3;
export async function processJobsStep(args: {
jobsToProcess: ScoredJob[];
@ -15,31 +17,41 @@ export async function processJobsStep(args: {
let processedCount = 0;
if (args.jobsToProcess.length > 0) {
const total = args.jobsToProcess.length;
let startedCount = 0;
let completedCount = 0;
updateProgress({
step: "processing",
jobsProcessed: 0,
totalToProcess: args.jobsToProcess.length,
totalToProcess: total,
});
for (let i = 0; i < args.jobsToProcess.length; i++) {
if (args.shouldCancel?.()) break;
const job = args.jobsToProcess[i];
progressHelpers.processingJob(i + 1, args.jobsToProcess.length, job);
const result = await args.processJob(job.id, { force: false });
if (result.success) {
processedCount++;
} else {
logger.warn("Failed to process job", {
jobId: job.id,
error: result.error,
});
}
progressHelpers.jobComplete(i + 1, args.jobsToProcess.length);
}
await asyncPool({
items: args.jobsToProcess,
concurrency: PROCESSING_CONCURRENCY,
shouldStop: args.shouldCancel,
onTaskStarted: (job) => {
startedCount += 1;
progressHelpers.processingJob(startedCount, total, job);
},
onTaskSettled: (_job, _index) => {
completedCount += 1;
progressHelpers.jobComplete(completedCount, total);
},
task: async (job) => {
const result = await args.processJob(job.id, { force: false });
if (result.success) {
processedCount += 1;
} else {
logger.warn("Failed to process job", {
jobId: job.id,
error: result.error,
});
}
return result;
},
});
}
return { processedCount };

View File

@ -164,16 +164,7 @@ export async function getAllJobUrls(): Promise<string[]> {
return rows.map((r) => r.jobUrl);
}
/**
* Create a new job (or return existing if URL matches).
*/
export async function createJob(input: CreateJobInput): Promise<Job> {
// Check for existing job with same URL
const existing = await getJobByUrl(input.jobUrl);
if (existing) {
return existing;
}
async function insertJob(input: CreateJobInput): Promise<Job> {
const id = randomUUID();
const now = new Date().toISOString();
@ -232,6 +223,95 @@ export async function createJob(input: CreateJobInput): Promise<Job> {
return job;
}
function isJobUrlUniqueViolation(error: unknown): boolean {
if (!(error instanceof Error)) return false;
return /UNIQUE constraint failed: jobs\.job_url/i.test(error.message);
}
async function tryInsertJob(input: CreateJobInput): Promise<Job | null> {
try {
return await insertJob(input);
} catch (error) {
if (isJobUrlUniqueViolation(error)) return null;
throw error;
}
}
/**
* Create jobs (or return existing jobs for duplicate URLs).
*/
export async function createJobs(input: CreateJobInput): Promise<Job>;
export async function createJobs(
inputs: CreateJobInput[],
): Promise<{ created: number; skipped: number }>;
export async function createJobs(
inputOrInputs: CreateJobInput | CreateJobInput[],
): Promise<Job | { created: number; skipped: number }> {
if (!Array.isArray(inputOrInputs)) {
const inserted = await tryInsertJob(inputOrInputs);
if (inserted) return inserted;
const existing = await getJobByUrl(inputOrInputs.jobUrl);
if (existing) return existing;
throw new Error("Failed to create or resolve existing job by URL");
}
const byUrl = new Map<
string,
{
input: CreateJobInput;
count: number;
}
>();
for (const input of inputOrInputs) {
const existing = byUrl.get(input.jobUrl);
if (existing) {
existing.count += 1;
} else {
byUrl.set(input.jobUrl, { input, count: 1 });
}
}
let created = 0;
let skipped = 0;
const uniqueUrls = Array.from(byUrl.keys());
if (uniqueUrls.length === 0) {
return { created, skipped };
}
const existingRows = await db
.select({ jobUrl: jobs.jobUrl })
.from(jobs)
.where(inArray(jobs.jobUrl, uniqueUrls));
const existingUrlSet = new Set(existingRows.map((row) => row.jobUrl));
for (const { input, count } of byUrl.values()) {
if (existingUrlSet.has(input.jobUrl)) {
skipped += count;
continue;
}
const inserted = await tryInsertJob(input);
if (!inserted) {
skipped += count;
continue;
}
created += 1;
skipped += count - 1;
}
return { created, skipped };
}
/**
* Create a single job (or return existing if URL matches).
*/
export async function createJob(input: CreateJobInput): Promise<Job> {
return createJobs(input);
}
/**
* Update a job.
*/
@ -256,29 +336,6 @@ export async function updateJob(
return getJobById(id);
}
/**
* Bulk create jobs from crawler results.
*/
export async function bulkCreateJobs(
inputs: CreateJobInput[],
): Promise<{ created: number; skipped: number }> {
let created = 0;
let skipped = 0;
for (const input of inputs) {
const existing = await getJobByUrl(input.jobUrl);
if (existing) {
skipped++;
continue;
}
await createJob(input);
created++;
}
return { created, skipped };
}
/**
* Get job statistics by status.
*/

View File

@ -1,8 +1,8 @@
export {
approvePostApplicationInboxItem,
bulkPostApplicationInboxAction,
denyPostApplicationInboxItem,
listPostApplicationInbox,
listPostApplicationReviewRuns,
listPostApplicationRunMessages,
runPostApplicationInboxAction,
} from "./service";

View File

@ -22,9 +22,9 @@ import {
} from "@server/services/post-application/stage-target";
import type {
ApplicationStage,
BulkPostApplicationActionRequest,
BulkPostApplicationActionResponse,
BulkPostApplicationActionResult,
PostApplicationActionRequest,
PostApplicationActionResponse,
PostApplicationActionResult,
PostApplicationInboxItem,
PostApplicationMessage,
PostApplicationProvider,
@ -253,9 +253,9 @@ export async function denyPostApplicationInboxItem(args: {
return { message: updatedMessage };
}
export async function bulkPostApplicationInboxAction(
args: BulkPostApplicationActionRequest & { decidedBy?: string | null },
): Promise<BulkPostApplicationActionResponse> {
export async function runPostApplicationInboxAction(
args: PostApplicationActionRequest & { decidedBy?: string | null },
): Promise<PostApplicationActionResponse> {
const { provider, accountKey, action, decidedBy } = args;
const pendingItems = await listPostApplicationInbox({
@ -264,7 +264,7 @@ export async function bulkPostApplicationInboxAction(
limit: 1000,
});
const results: BulkPostApplicationActionResult[] = [];
const results: PostApplicationActionResult[] = [];
let skipped = 0;
let failed = 0;

View File

@ -75,4 +75,52 @@ describe("asyncPool", () => {
expect(result.length).toBeGreaterThanOrEqual(2);
expect(result.slice(0, 2)).toEqual([1, 2]);
});
it("emits task lifecycle callbacks", async () => {
const started: number[] = [];
const settled: Array<string> = [];
await expect(
asyncPool({
items: [1, 2, 3],
concurrency: 2,
onTaskStarted: (item) => {
started.push(item);
},
onTaskSettled: (item, _index, outcome) => {
settled.push(
outcome.status === "fulfilled"
? `${item}:ok`
: `${item}:fail:${String(outcome.error)}`,
);
},
task: async (item) => {
if (item === 2) throw new Error("boom");
await new Promise((resolve) => setTimeout(resolve, 5));
return item * 2;
},
}),
).rejects.toThrow("boom");
expect(started).toEqual(expect.arrayContaining([1, 2]));
expect(settled).toContain("1:ok");
expect(settled.some((entry) => entry.startsWith("2:fail:"))).toBe(true);
expect(started).not.toContain(3);
});
it("ignores lifecycle hook failures", async () => {
const result = await asyncPool({
items: [1, 2],
concurrency: 2,
onTaskStarted: () => {
throw new Error("hook failed");
},
onTaskSettled: () => {
throw new Error("hook failed");
},
task: async (item) => item * 2,
});
expect(result).toEqual([2, 4]);
});
});

View File

@ -1,10 +1,20 @@
type AsyncPoolTaskStatus<TResult> =
| { status: "fulfilled"; result: TResult }
| { status: "rejected"; error: unknown };
export async function asyncPool<TItem, TResult>(args: {
items: readonly TItem[];
concurrency: number;
shouldStop?: () => boolean;
task: (item: TItem, index: number) => Promise<TResult>;
onTaskStarted?: (item: TItem, index: number) => void;
onTaskSettled?: (
item: TItem,
index: number,
outcome: AsyncPoolTaskStatus<TResult>,
) => void;
}): Promise<TResult[]> {
const { items, task, shouldStop } = args;
const { items, task, shouldStop, onTaskStarted, onTaskSettled } = args;
const rawConcurrency = Number.isFinite(args.concurrency)
? args.concurrency
: 1;
@ -18,21 +28,60 @@ export async function asyncPool<TItem, TResult>(args: {
() => UNSET,
);
let nextIndex = 0;
let firstError: unknown = null;
const callTaskStarted = (item: TItem, index: number) => {
if (!onTaskStarted) return;
try {
onTaskStarted(item, index);
} catch {
// Hook failures should not change pool semantics.
}
};
const callTaskSettled = (
item: TItem,
index: number,
outcome: AsyncPoolTaskStatus<TResult>,
) => {
if (!onTaskSettled) return;
try {
onTaskSettled(item, index, outcome);
} catch {
// Hook failures should not change pool semantics.
}
};
const worker = async (): Promise<void> => {
while (true) {
if (shouldStop?.()) return;
if (shouldStop?.() || firstError !== null) return;
const currentIndex = nextIndex;
nextIndex += 1;
if (currentIndex >= items.length) return;
results[currentIndex] = await task(items[currentIndex], currentIndex);
const item = items[currentIndex];
callTaskStarted(item, currentIndex);
try {
const result = await task(item, currentIndex);
results[currentIndex] = result;
callTaskSettled(item, currentIndex, {
status: "fulfilled",
result,
});
} catch (error) {
callTaskSettled(item, currentIndex, {
status: "rejected",
error,
});
if (firstError === null) firstError = error;
return;
}
}
};
const workerCount = Math.min(safeConcurrency, items.length);
await Promise.all(Array.from({ length: workerCount }, () => worker()));
if (firstError !== null) throw firstError;
return results.filter((value): value is TResult => value !== UNSET);
}

View File

@ -646,15 +646,15 @@ export interface PostApplicationInboxItem {
} | null;
}
export type BulkPostApplicationAction = "approve" | "deny";
export type PostApplicationAction = "approve" | "deny";
export interface BulkPostApplicationActionRequest {
action: BulkPostApplicationAction;
export interface PostApplicationActionRequest {
action: PostApplicationAction;
provider: PostApplicationProvider;
accountKey: string;
}
export type BulkPostApplicationActionResult =
export type PostApplicationActionResult =
| {
messageId: string;
ok: true;
@ -670,13 +670,13 @@ export type BulkPostApplicationActionResult =
};
};
export interface BulkPostApplicationActionResponse {
action: BulkPostApplicationAction;
export interface PostApplicationActionResponse {
action: PostApplicationAction;
requested: number;
succeeded: number;
failed: number;
skipped: number;
results: BulkPostApplicationActionResult[];
results: PostApplicationActionResult[];
}
export interface JobsListResponse<TJob = Job> {
@ -693,14 +693,22 @@ export interface JobsRevisionResponse {
statusFilter: string | null;
}
export type BulkJobAction = "skip" | "move_to_ready" | "rescore";
export type JobAction = "skip" | "move_to_ready" | "rescore";
export interface BulkJobActionRequest {
action: BulkJobAction;
jobIds: string[];
}
export type JobActionRequest =
| {
action: "skip" | "rescore";
jobIds: string[];
}
| {
action: "move_to_ready";
jobIds: string[];
options?: {
force?: boolean;
};
};
export type BulkJobActionResult =
export type JobActionResult =
| {
jobId: string;
ok: true;
@ -715,18 +723,18 @@ export type BulkJobActionResult =
};
};
export interface BulkJobActionResponse {
action: BulkJobAction;
export interface JobActionResponse {
action: JobAction;
requested: number;
succeeded: number;
failed: number;
results: BulkJobActionResult[];
results: JobActionResult[];
}
export type BulkJobActionStreamEvent =
export type JobActionStreamEvent =
| {
type: "started";
action: BulkJobAction;
action: JobAction;
requested: number;
completed: number;
succeeded: number;
@ -735,22 +743,22 @@ export type BulkJobActionStreamEvent =
}
| {
type: "progress";
action: BulkJobAction;
action: JobAction;
requested: number;
completed: number;
succeeded: number;
failed: number;
result: BulkJobActionResult;
result: JobActionResult;
requestId: string;
}
| {
type: "completed";
action: BulkJobAction;
action: JobAction;
requested: number;
completed: number;
succeeded: number;
failed: number;
results: BulkJobActionResult[];
results: JobActionResult[];
requestId: string;
}
| {