Auto-Registering Extractor System (#223)

* initial commit?

* Address PR feedback on extractor discovery and startup resilience

* Address latest PR review comments

* fix city resolution fallback when input parses empty

* address PR feedback on extractor registry and pipeline validation

* address copilot comments on manifests and registry startup

* fix extractor discovery export handling and env isolation in tests

* enforce duplicate manifest id failures in strict mode

* Fix remaining extractor registry and runtime review comments

* docs

* docs

* test all, logic remains in extractors

* Address PR review feedback on extractor registry and validation

* Revert extractor moduleResolution to bundler

* Enforce shared city filtering across all discovery sources

* Deduplicate extractor strict city post-filtering
This commit is contained in:
Shaheer Sarfaraz 2026-02-21 17:44:07 +00:00 committed by GitHub
parent cc7cacd7f5
commit 82e142a8a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
52 changed files with 2207 additions and 1515 deletions

View File

@ -23,8 +23,8 @@ jobs:
- name: Run Biome
run: biome ci .
test-orchestrator:
name: Orchestrator Tests
tests:
name: Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
@ -40,8 +40,8 @@ jobs:
- name: Build better-sqlite3
run: npm --workspace orchestrator rebuild better-sqlite3
working-directory: .
- name: Run Vitest
run: npm --workspace orchestrator run test:run
- name: Run Tests
run: npm run test:all
working-directory: .
typecheck:

View File

@ -7,6 +7,8 @@ sidebar_position: 1
This page helps you choose the right extractor for your run, understand key constraints, and navigate to detailed technical guides.
Extractor integrations are now registered through manifests and loaded automatically at orchestrator startup. Runtime discovery only scans `extractors/*/(manifest.ts|src/manifest.ts)` and does not read manifests from `orchestrator/**`. Extractor-specific run logic should also remain in `extractors/<name>/` so orchestrator stays source-agnostic. To add a new source, follow [Add an Extractor](/docs/next/workflows/add-an-extractor).
## Extractor chooser
| Extractor | Best use case | Core constraints/dependencies | Notable controls | Output/behavior notes |
@ -37,3 +39,4 @@ Many runs combine sources: broad discovery first, then manual import for high-pr
- [Hiring Cafe](/docs/next/extractors/hiring-cafe)
- [UKVisaJobs](/docs/next/extractors/ukvisajobs)
- [Manual Import](/docs/next/extractors/manual)
- [Add an Extractor](/docs/next/workflows/add-an-extractor)

View File

@ -0,0 +1,95 @@
---
id: add-an-extractor
title: Add an Extractor
description: How to add a new extractor using the manifest contract and shared extractor catalog.
sidebar_position: 2
---
## What it is
This guide explains how to add a new extractor that is auto-registered at orchestrator startup.
The extractor runtime is discovered from a local `manifest.ts` file, and the source is type-safe across API/client through the shared catalog in `shared/src/extractors/index.ts`.
Extractor manifests must live in extractor packages under `extractors/<name>/` only. Do not add manifest files inside `orchestrator/`.
Extractor run logic should also live in the extractor package so orchestrator stays extractor-agnostic.
## Why it exists
Without a manifest contract, adding extractors required touching multiple orchestrator files.
With the manifest system, contributors only need to:
1. Add a manifest in their extractor package.
2. Add the new source id to the shared typed catalog.
That keeps runtime wiring dynamic while preserving compile-time safety in API and client code.
## How to use it
1. Create your extractor package under `extractors/<name>/`.
2. Add a `manifest.ts` in the extractor package root (or `src/manifest.ts`).
- Valid locations are only `extractors/<name>/manifest.ts` or `extractors/<name>/src/manifest.ts`.
- `orchestrator/**/manifest.ts` is not used for extractor discovery.
3. Export a manifest with:
- `id`
- `displayName`
- `providesSources`
- `requiredEnvVars` (optional)
- `run(context)` that returns `{ success, jobs, error? }`
4. Add the new source id to `shared/src/extractors/index.ts`:
- append to `EXTRACTOR_SOURCE_IDS`
- add an entry in `EXTRACTOR_SOURCE_METADATA`
5. Ensure your extractor maps output to `CreateJobInput[]`.
6. Run the full CI checks.
Example manifest:
```ts
import type { ExtractorManifest } from "@shared/types/extractors";
export const manifest: ExtractorManifest = {
id: "myextractor",
displayName: "My Extractor",
providesSources: ["myextractor"],
requiredEnvVars: ["MYEXTRACTOR_API_KEY"],
async run(context) {
// context.searchTerms, context.settings, context.onProgress, context.shouldCancel
const jobs = [];
return { success: true, jobs };
},
};
export default manifest;
```
Subprocess extractors are supported. Keep subprocess spawning inside `run(context)` so orchestrator only depends on the manifest contract.
## Common problems
### Extractor not discovered at startup
- Check file path: `extractors/<name>/manifest.ts` or `extractors/<name>/src/manifest.ts`.
- Ensure the file exports `default` or named `manifest`.
### Source compiles in extractor but fails in API/client
- Add the new source id to `shared/src/extractors/index.ts`.
- Confirm metadata exists for that source id.
### Source appears in shared catalog but is unavailable at runtime
- The manifest was not loaded successfully.
- Check startup logs for registry warnings.
### Source requires credentials but never returns jobs
- Add and validate `requiredEnvVars`.
- Verify your manifest `run(context)` reads settings/env values correctly.
## Related pages
- [Extractors Overview](/docs/next/extractors/overview)
- [Adzuna Extractor](/docs/next/extractors/adzuna)
- [Hiring Cafe Extractor](/docs/next/extractors/hiring-cafe)
- [UKVisaJobs Extractor](/docs/next/extractors/ukvisajobs)

View File

@ -17,6 +17,7 @@ const sidebars: SidebarsConfig = {
items: [
"workflows/find-jobs-and-apply-workflow",
"workflows/post-application-workflow",
"workflows/add-an-extractor",
],
},
{

View File

@ -0,0 +1,120 @@
import { getAdzunaCountryCode } from "@shared/location-support.js";
import { resolveSearchCities } from "@shared/search-cities.js";
import type {
ExtractorManifest,
ExtractorProgressEvent,
} from "@shared/types/extractors";
import { runAdzuna } from "./src/run";
function toProgress(event: {
type: string;
termIndex: number;
termTotal: number;
searchTerm: string;
pageNo?: number;
totalCollected?: number;
}): ExtractorProgressEvent {
if (event.type === "term_start") {
return {
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
currentUrl: event.searchTerm,
detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
};
}
if (event.type === "page_fetched") {
const pageNo = event.pageNo ?? 0;
const totalCollected = event.totalCollected ?? 0;
return {
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: pageNo,
jobPagesEnqueued: totalCollected,
jobPagesProcessed: totalCollected,
currentUrl: `page ${pageNo}`,
detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${pageNo} (${totalCollected} collected)`,
};
}
return {
phase: "list",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
currentUrl: event.searchTerm,
detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
};
}
export const manifest: ExtractorManifest = {
id: "adzuna",
displayName: "Adzuna",
providesSources: ["adzuna"],
requiredEnvVars: ["ADZUNA_APP_ID", "ADZUNA_APP_KEY"],
async run(context) {
if (context.shouldCancel?.()) {
return { success: true, jobs: [] };
}
const countryCode = getAdzunaCountryCode(context.selectedCountry);
if (!countryCode) {
return {
success: false,
jobs: [],
error: `unsupported country ${context.selectedCountry}`,
};
}
const maxJobsPerTerm = context.settings.adzunaMaxJobsPerTerm
? parseInt(context.settings.adzunaMaxJobsPerTerm, 10)
: 50;
let result: Awaited<ReturnType<typeof runAdzuna>>;
try {
result = await runAdzuna({
country: countryCode,
countryKey: context.selectedCountry,
searchTerms: context.searchTerms,
locations: resolveSearchCities({
single:
context.settings.searchCities ?? context.settings.jobspyLocation,
}),
maxJobsPerTerm,
onProgress: (event) => {
if (context.shouldCancel?.()) return;
context.onProgress?.(toProgress(event));
},
});
} catch (error) {
const message =
error instanceof Error
? error.message
: typeof error === "string"
? error
: "Unexpected error while running Adzuna extractor.";
return {
success: false,
jobs: [],
error: message,
};
}
if (!result.success) {
return {
success: false,
jobs: [],
error: result.error,
};
}
return {
success: true,
jobs: result.jobs,
};
},
};
export default manifest;

View File

@ -4,19 +4,20 @@ import { createRequire } from "node:module";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import { logger } from "@infra/logger";
import { normalizeCountryKey } from "@shared/location-support.js";
import {
matchesRequestedCity,
parseSearchCitiesSetting,
resolveSearchCities,
shouldApplyStrictCityFilter,
} from "@shared/search-cities.js";
import type { CreateJobInput } from "@shared/types";
import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion";
import type { CreateJobInput } from "@shared/types/jobs";
import {
toNumberOrNull,
toStringOrNull,
} from "@shared/utils/type-conversion.js";
const __dirname = dirname(fileURLToPath(import.meta.url));
const ADZUNA_DIR = join(__dirname, "../../../../extractors/adzuna");
const DATASET_PATH = join(ADZUNA_DIR, "storage/datasets/default/jobs.json");
const srcDir = dirname(fileURLToPath(import.meta.url));
const EXTRACTOR_DIR = join(srcDir, "..");
const DATASET_PATH = join(EXTRACTOR_DIR, "storage/datasets/default/jobs.json");
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
const require = createRequire(import.meta.url);
const TSX_CLI_PATH = resolveTsxCliPath();
@ -69,20 +70,6 @@ export function shouldApplyStrictLocationFilter(
return shouldApplyStrictCityFilter(location, countryKey);
}
export function matchesRequestedLocation(
jobLocation: string | undefined,
requestedLocation: string,
): boolean {
return matchesRequestedCity(jobLocation, requestedLocation);
}
function resolveLocations(options: RunAdzunaOptions): string[] {
const raw = options.locations?.length
? options.locations
: parseSearchCitiesSetting(process.env.ADZUNA_LOCATION_QUERY ?? "");
return raw.map((value) => value.trim()).filter(Boolean);
}
function resolveTsxCliPath(): string | null {
try {
return require.resolve("tsx/dist/cli.mjs");
@ -205,7 +192,10 @@ export async function runAdzuna(
options.searchTerms && options.searchTerms.length > 0
? options.searchTerms
: ["web developer"];
const locations = resolveLocations(options);
const locations = resolveSearchCities({
list: options.locations,
env: process.env.ADZUNA_LOCATION_QUERY,
});
const runLocations = locations.length > 0 ? locations : [null];
const termTotal = searchTerms.length * runLocations.length;
const useNpmCommand = canRunNpmCommand();
@ -241,7 +231,7 @@ export async function runAdzuna(
};
const child = useNpmCommand
? spawn("npm", ["run", "start"], {
cwd: ADZUNA_DIR,
cwd: EXTRACTOR_DIR,
stdio: ["ignore", "pipe", "pipe"],
env: extractorEnv,
})
@ -253,7 +243,7 @@ export async function runAdzuna(
);
}
return spawn(process.execPath, [tsxCliPath, "src/main.ts"], {
cwd: ADZUNA_DIR,
cwd: EXTRACTOR_DIR,
stdio: ["ignore", "pipe", "pipe"],
env: extractorEnv,
});
@ -293,11 +283,7 @@ export async function runAdzuna(
});
const runJobs = await readDataset();
const filtered = strictLocationFilter
? runJobs.filter((job) =>
matchesRequestedLocation(job.location, location),
)
: runJobs;
const filtered = runJobs;
for (const job of filtered) {
const key = job.sourceJobId || job.jobUrl;
@ -310,7 +296,6 @@ export async function runAdzuna(
return { success: true, jobs };
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
logger.warn("Adzuna extractor run failed", { error: message });
return { success: false, jobs: [], error: message };
}
}

View File

@ -0,0 +1,15 @@
import { describe, expect, it } from "vitest";
import { shouldApplyStrictLocationFilter } from "../src/run";
describe("adzuna location query strictness", () => {
it("enables strict filtering when city differs from country", () => {
expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe(
true,
);
});
it("disables strict filtering when location is country-level", () => {
expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false);
expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false);
});
});

View File

@ -1,13 +1,17 @@
{
"compilerOptions": {
"module": "NodeNext",
"moduleResolution": "NodeNext",
"module": "ESNext",
"moduleResolution": "bundler",
"target": "ES2022",
"outDir": "dist",
"strict": true,
"noUnusedLocals": false,
"lib": ["ES2022", "DOM"],
"types": ["node"]
"types": ["node"],
"baseUrl": ".",
"paths": {
"@shared/*": ["../../shared/src/*"]
}
},
"include": ["./src/**/*"]
}

View File

@ -0,0 +1,56 @@
import type {
ExtractorManifest,
ExtractorRuntimeContext,
} from "@shared/types/extractors";
import { runCrawler } from "./src/run";
export const manifest: ExtractorManifest = {
id: "gradcracker",
displayName: "Gradcracker",
providesSources: ["gradcracker"],
async run(context: ExtractorRuntimeContext) {
if (context.shouldCancel?.()) {
return { success: true, jobs: [] };
}
const existingJobUrls = await context.getExistingJobUrls?.();
const maxJobsPerTerm = context.settings.gradcrackerMaxJobsPerTerm
? parseInt(context.settings.gradcrackerMaxJobsPerTerm, 10)
: 50;
const result = await runCrawler({
existingJobUrls,
searchTerms: context.searchTerms,
maxJobsPerTerm,
onProgress: (progress) => {
if (context.shouldCancel?.()) return;
context.onProgress?.({
phase: progress.phase,
currentUrl: progress.currentUrl,
listPagesProcessed: progress.listPagesProcessed,
listPagesTotal: progress.listPagesTotal,
jobCardsFound: progress.jobCardsFound,
jobPagesEnqueued: progress.jobPagesEnqueued,
jobPagesSkipped: progress.jobPagesSkipped,
jobPagesProcessed: progress.jobPagesProcessed,
});
},
});
if (!result.success) {
return {
success: false,
jobs: [],
error: result.error,
};
}
return {
success: true,
jobs: result.jobs,
};
},
};
export default manifest;

View File

@ -1,19 +1,30 @@
/**
* Service for running the Gradcracker crawler (extractors/gradcracker).
* Wraps the existing Crawlee-based crawler.
*/
import { spawn } from "node:child_process";
import { mkdir, readdir, readFile, writeFile } from "node:fs/promises";
import { mkdir, readdir, readFile, rm, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import type { CreateJobInput } from "@shared/types";
const __dirname = dirname(fileURLToPath(import.meta.url));
const CRAWLER_DIR = join(__dirname, "../../../../extractors/gradcracker");
const STORAGE_DIR = join(CRAWLER_DIR, "storage/datasets/default");
const JOBOPS_STORAGE_DIR = join(CRAWLER_DIR, "storage/jobops");
type CreateJobInput = {
source: "gradcracker";
title: string;
employer: string;
jobUrl: string;
employerUrl?: string;
applicationLink?: string;
disciplines?: string;
deadline?: string;
salary?: string;
location?: string;
degreeRequired?: string;
starting?: string;
jobDescription?: string;
};
const srcDir = dirname(fileURLToPath(import.meta.url));
const EXTRACTOR_DIR = join(srcDir, "..");
const STORAGE_DIR = join(EXTRACTOR_DIR, "storage/datasets/default");
const JOBOPS_STORAGE_DIR = join(EXTRACTOR_DIR, "storage/jobops");
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
export interface CrawlerResult {
success: boolean;
@ -22,25 +33,9 @@ export interface CrawlerResult {
}
export interface RunCrawlerOptions {
/**
* List of job page URLs already present in the orchestrator DB.
* Used by the crawler to avoid expensive/undesired interactions (e.g. apply button click).
*/
existingJobUrls?: string[];
/**
* Optional callback for live crawl progress emitted by the Gradcracker extractor.
*/
onProgress?: (update: JobExtractorProgress) => void;
/**
* List of search terms to be used as roles for URL generation.
*/
searchTerms?: string[];
/**
* Max jobs to fetch per search term.
*/
maxJobsPerTerm?: number;
}
@ -56,8 +51,6 @@ interface JobExtractorProgress {
ts?: string;
}
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
async function writeExistingJobUrlsFile(
existingJobUrls: string[] | undefined,
): Promise<string | null> {
@ -68,26 +61,18 @@ async function writeExistingJobUrlsFile(
return filePath;
}
/**
* Run the Gradcracker crawler and return discovered jobs.
*/
export async function runCrawler(
options: RunCrawlerOptions = {},
): Promise<CrawlerResult> {
console.log("🕷️ Starting job crawler...");
try {
// Clear previous results
await clearStorageDataset();
const existingJobUrlsFile = await writeExistingJobUrlsFile(
options.existingJobUrls,
);
// Run the crawler
await new Promise<void>((resolve, reject) => {
const child = spawn("npm", ["run", "start"], {
cwd: CRAWLER_DIR,
cwd: EXTRACTOR_DIR,
shell: true,
stdio: ["ignore", "pipe", "pipe"],
env: {
@ -113,7 +98,7 @@ export async function runCrawler(
const parsed = JSON.parse(raw) as JobExtractorProgress;
options.onProgress?.(parsed);
} catch {
// Ignore malformed progress lines
// ignore malformed progress lines
}
return;
}
@ -143,66 +128,58 @@ export async function runCrawler(
child.on("error", reject);
});
// Read crawled jobs from storage
const jobs = await readCrawledJobs();
console.log(`✅ Crawler completed. Found ${jobs.length} jobs.`);
return { success: true, jobs };
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
console.error("❌ Crawler failed:", message);
return { success: false, jobs: [], error: message };
}
}
/**
* Read crawled jobs from the Crawlee storage dataset.
*/
async function readCrawledJobs(): Promise<CreateJobInput[]> {
try {
const files = await readdir(STORAGE_DIR);
const jsonFiles = files.filter((f) => f.endsWith(".json"));
const jsonFiles = files.filter((file) => file.endsWith(".json"));
const jobs: CreateJobInput[] = [];
for (const file of jsonFiles) {
const content = await readFile(join(STORAGE_DIR, file), "utf-8");
const data = JSON.parse(content);
const data = JSON.parse(content) as Record<string, unknown>;
// Map crawler output to our job input format
jobs.push({
source: "gradcracker",
title: data.title || "Unknown Title",
employer: data.employer || "Unknown Employer",
employerUrl: data.employerUrl,
jobUrl: data.url || data.jobUrl,
applicationLink: data.applicationLink,
disciplines: data.disciplines,
deadline: data.deadline,
salary: data.salary,
location: data.location,
degreeRequired: data.degreeRequired,
starting: data.starting,
jobDescription: data.jobDescription,
title: (data.title as string) || "Unknown Title",
employer: (data.employer as string) || "Unknown Employer",
employerUrl: data.employerUrl as string | undefined,
jobUrl: (data.url as string) || (data.jobUrl as string),
applicationLink: data.applicationLink as string | undefined,
disciplines:
typeof data.disciplines === "string"
? data.disciplines
: Array.isArray(data.disciplines)
? data.disciplines
.filter((value): value is string => typeof value === "string")
.join(", ")
: undefined,
deadline: data.deadline as string | undefined,
salary: data.salary as string | undefined,
location: data.location as string | undefined,
degreeRequired: data.degreeRequired as string | undefined,
starting: data.starting as string | undefined,
jobDescription: data.jobDescription as string | undefined,
});
}
return jobs;
} catch (error) {
console.error("Failed to read crawled jobs:", error);
} catch {
return [];
}
}
/**
* Clear previous crawl results.
*/
async function clearStorageDataset(): Promise<void> {
const { rm } = await import("node:fs/promises");
try {
await rm(STORAGE_DIR, { recursive: true, force: true });
} catch {
// Ignore if directory doesn't exist
// ignore
}
}

View File

@ -6,7 +6,11 @@
"target": "ES2022",
"outDir": "dist",
"noUnusedLocals": false,
"lib": ["DOM"]
"lib": ["DOM"],
"baseUrl": ".",
"paths": {
"@shared/*": ["../../shared/src/*"]
}
},
"include": ["./src/**/*"]
}

View File

@ -0,0 +1,94 @@
import { resolveSearchCities } from "@shared/search-cities.js";
import type {
ExtractorManifest,
ExtractorProgressEvent,
} from "@shared/types/extractors";
import { runHiringCafe } from "./src/run";
function toProgress(event: {
type: string;
termIndex: number;
termTotal: number;
searchTerm: string;
pageNo?: number;
totalCollected?: number;
}): ExtractorProgressEvent {
if (event.type === "term_start") {
return {
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
currentUrl: event.searchTerm,
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
};
}
if (event.type === "page_fetched") {
const pageNo = (event.pageNo ?? 0) + 1;
const totalCollected = event.totalCollected ?? 0;
return {
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: pageNo,
jobPagesEnqueued: totalCollected,
jobPagesProcessed: totalCollected,
currentUrl: `page ${pageNo}`,
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${pageNo} (${totalCollected} collected)`,
};
}
return {
phase: "list",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
currentUrl: event.searchTerm,
detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
};
}
export const manifest: ExtractorManifest = {
id: "hiringcafe",
displayName: "Hiring Cafe",
providesSources: ["hiringcafe"],
async run(context) {
if (context.shouldCancel?.()) {
return { success: true, jobs: [] };
}
const maxJobsPerTerm = context.settings.jobspyResultsWanted
? parseInt(context.settings.jobspyResultsWanted, 10)
: 200;
const result = await runHiringCafe({
country: context.selectedCountry,
countryKey: context.selectedCountry,
searchTerms: context.searchTerms,
locations: resolveSearchCities({
single:
context.settings.searchCities ?? context.settings.jobspyLocation,
}),
maxJobsPerTerm,
onProgress: (event) => {
if (context.shouldCancel?.()) return;
context.onProgress?.(toProgress(event));
},
});
if (!result.success) {
return {
success: false,
jobs: [],
error: result.error,
};
}
return {
success: true,
jobs: result.jobs,
};
},
};
export default manifest;

View File

@ -4,26 +4,22 @@ import { createRequire } from "node:module";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import { logger } from "@infra/logger";
import { sanitizeUnknown } from "@infra/sanitize";
import { normalizeCountryKey } from "@shared/location-support.js";
import {
matchesRequestedCity,
parseSearchCitiesSetting,
resolveSearchCities,
shouldApplyStrictCityFilter,
} from "@shared/search-cities.js";
import type { CreateJobInput } from "@shared/types";
import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion";
import type { CreateJobInput } from "@shared/types/jobs";
import {
toNumberOrNull,
toStringOrNull,
} from "@shared/utils/type-conversion.js";
const __dirname = dirname(fileURLToPath(import.meta.url));
const HIRING_CAFE_DIR = join(__dirname, "../../../../extractors/hiringcafe");
const DATASET_PATH = join(
HIRING_CAFE_DIR,
"storage/datasets/default/jobs.json",
);
const STORAGE_DATASET_DIR = join(HIRING_CAFE_DIR, "storage/datasets/default");
const srcDir = dirname(fileURLToPath(import.meta.url));
const EXTRACTOR_DIR = join(srcDir, "..");
const DATASET_PATH = join(EXTRACTOR_DIR, "storage/datasets/default/jobs.json");
const STORAGE_DATASET_DIR = join(EXTRACTOR_DIR, "storage/datasets/default");
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
const require = createRequire(import.meta.url);
const TSX_CLI_PATH = resolveTsxCliPath();
@ -76,20 +72,6 @@ export function shouldApplyStrictLocationFilter(
return shouldApplyStrictCityFilter(location, countryKey);
}
export function matchesRequestedLocation(
jobLocation: string | undefined,
requestedLocation: string,
): boolean {
return matchesRequestedCity(jobLocation, requestedLocation);
}
function resolveLocations(options: RunHiringCafeOptions): string[] {
const raw = options.locations?.length
? options.locations
: parseSearchCitiesSetting(process.env.HIRING_CAFE_LOCATION_QUERY ?? "");
return raw.map((value) => value.trim()).filter(Boolean);
}
function resolveTsxCliPath(): string | null {
try {
return require.resolve("tsx/dist/cli.mjs");
@ -105,7 +87,6 @@ function canRunNpmCommand(): boolean {
function parseProgressLine(line: string): HiringCafeProgressEvent | null {
if (!line.startsWith(JOBOPS_PROGRESS_PREFIX)) return null;
const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim();
let parsed: Record<string, unknown>;
@ -119,10 +100,7 @@ function parseProgressLine(line: string): HiringCafeProgressEvent | null {
const termIndex = toNumberOrNull(parsed.termIndex);
const termTotal = toNumberOrNull(parsed.termTotal);
const searchTerm = toStringOrNull(parsed.searchTerm) ?? "";
if (!event || termIndex === null || termTotal === null) {
return null;
}
if (!event || termIndex === null || termTotal === null) return null;
if (event === "term_start") {
return { type: "term_start", termIndex, termTotal, searchTerm };
@ -131,7 +109,6 @@ function parseProgressLine(line: string): HiringCafeProgressEvent | null {
if (event === "page_fetched") {
const pageNo = toNumberOrNull(parsed.pageNo);
if (pageNo === null) return null;
return {
type: "page_fetched",
termIndex,
@ -182,20 +159,15 @@ async function readDataset(): Promise<CreateJobInput[]> {
const jobs: CreateJobInput[] = [];
const seen = new Set<string>();
for (const value of parsed) {
if (!value || typeof value !== "object" || Array.isArray(value)) continue;
const mapped = mapHiringCafeRow(value as HiringCafeRawJob);
if (!mapped) continue;
const dedupeKey = mapped.sourceJobId || mapped.jobUrl;
if (seen.has(dedupeKey)) continue;
seen.add(dedupeKey);
jobs.push(mapped);
}
return jobs;
}
@ -218,7 +190,10 @@ export async function runHiringCafe(
1,
Math.floor(options.locationRadiusMiles ?? 1),
);
const locations = resolveLocations(options);
const locations = resolveSearchCities({
list: options.locations,
env: process.env.HIRING_CAFE_LOCATION_QUERY,
});
const runLocations = locations.length > 0 ? locations : [null];
const termTotal = searchTerms.length * runLocations.length;
@ -259,7 +234,7 @@ export async function runHiringCafe(
const child = useNpmCommand
? spawn("npm", ["run", "start"], {
cwd: HIRING_CAFE_DIR,
cwd: EXTRACTOR_DIR,
stdio: ["ignore", "pipe", "pipe"],
env: extractorEnv,
})
@ -272,7 +247,7 @@ export async function runHiringCafe(
}
return spawn(process.execPath, [tsxCliPath, "src/main.ts"], {
cwd: HIRING_CAFE_DIR,
cwd: EXTRACTOR_DIR,
stdio: ["ignore", "pipe", "pipe"],
env: extractorEnv,
});
@ -314,11 +289,7 @@ export async function runHiringCafe(
});
const runJobs = await readDataset();
const filtered = strictLocationFilter
? runJobs.filter((job) =>
matchesRequestedLocation(job.location, location),
)
: runJobs;
const filtered = runJobs;
for (const job of filtered) {
const key = job.sourceJobId || job.jobUrl;
@ -331,10 +302,6 @@ export async function runHiringCafe(
return { success: true, jobs };
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
logger.warn("Hiring Cafe extractor run failed", {
error: message,
details: sanitizeUnknown(error),
});
return { success: false, jobs: [], error: message };
}
}

View File

@ -0,0 +1,15 @@
import { describe, expect, it } from "vitest";
import { shouldApplyStrictLocationFilter } from "../src/run";
describe("hiringcafe location query strictness", () => {
it("enables strict filtering when city differs from country", () => {
expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe(
true,
);
});
it("disables strict filtering when location is country-level", () => {
expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false);
expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false);
});
});

View File

@ -1,13 +1,17 @@
{
"compilerOptions": {
"module": "NodeNext",
"moduleResolution": "NodeNext",
"module": "ESNext",
"moduleResolution": "bundler",
"target": "ES2022",
"outDir": "dist",
"strict": true,
"noUnusedLocals": false,
"lib": ["ES2022", "DOM"],
"types": ["node"]
"types": ["node"],
"baseUrl": ".",
"paths": {
"@shared/*": ["../../shared/src/*"]
}
},
"include": ["./src/**/*"]
}

View File

@ -0,0 +1,74 @@
import type {
ExtractorManifest,
ExtractorRuntimeContext,
} from "@shared/types/extractors";
import { runJobSpy } from "./src/run";
type JobSpySite = NonNullable<Parameters<typeof runJobSpy>[0]["sites"]>[number];
const JOBSPY_SOURCES = new Set<JobSpySite>(["indeed", "linkedin", "glassdoor"]);
function isJobSpySite(source: string): source is JobSpySite {
return JOBSPY_SOURCES.has(source as JobSpySite);
}
export const manifest: ExtractorManifest = {
id: "jobspy",
displayName: "JobSpy",
providesSources: ["indeed", "linkedin", "glassdoor"],
async run(context: ExtractorRuntimeContext) {
if (context.shouldCancel?.()) {
return { success: true, jobs: [] };
}
const sites = context.selectedSources.filter(isJobSpySite);
const result = await runJobSpy({
sites,
searchTerms: context.searchTerms,
location:
context.settings.searchCities ?? context.settings.jobspyLocation,
resultsWanted: context.settings.jobspyResultsWanted
? parseInt(context.settings.jobspyResultsWanted, 10)
: undefined,
countryIndeed: context.settings.jobspyCountryIndeed,
onProgress: (event) => {
if (context.shouldCancel?.()) return;
if (event.type === "term_start") {
context.onProgress?.({
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
currentUrl: event.searchTerm,
detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
context.onProgress?.({
phase: "list",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
currentUrl: event.searchTerm,
detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`,
});
},
});
if (!result.success) {
return {
success: false,
jobs: [],
error: result.error,
};
}
return {
success: true,
jobs: result.jobs,
};
},
};
export default manifest;

View File

@ -1,26 +1,19 @@
/**
* Service for scraping jobs via JobSpy (Indeed/LinkedIn/etc) and mapping them into our DB shape.
*
* Uses a small Python wrapper script that writes both CSV + JSON to disk; we ingest the JSON.
*/
import { spawn } from "node:child_process";
import { mkdir, readFile, unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import { resolveSearchCities } from "@shared/search-cities.js";
import type { CreateJobInput, JobSource } from "@shared/types/jobs";
import {
matchesRequestedCity,
parseSearchCitiesSetting,
shouldApplyStrictCityFilter,
} from "@shared/search-cities.js";
import type { CreateJobInput, JobSource } from "@shared/types";
import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion";
import { getDataDir } from "../config/dataDir";
toNumberOrNull,
toStringOrNull,
} from "@shared/utils/type-conversion.js";
const __dirname = dirname(fileURLToPath(import.meta.url));
const JOBSPY_DIR = join(__dirname, "../../../../extractors/jobspy");
const JOBSPY_SCRIPT = join(JOBSPY_DIR, "scrape_jobs.py");
const srcDir = dirname(fileURLToPath(import.meta.url));
const EXTRACTOR_DIR = join(srcDir, "..");
const JOBSPY_SCRIPT = join(EXTRACTOR_DIR, "scrape_jobs.py");
const OUTPUT_DIR = join(EXTRACTOR_DIR, "storage/imports");
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
export type JobSpyProgressEvent =
@ -57,12 +50,7 @@ export function parseJobSpyProgressLine(
if (!eventName || termIndex === null || termTotal === null) return null;
if (eventName === "term_start") {
return {
type: "term_start",
termIndex,
termTotal,
searchTerm,
};
return { type: "term_start", termIndex, termTotal, searchTerm };
}
if (eventName === "term_complete") {
return {
@ -73,15 +61,9 @@ export function parseJobSpyProgressLine(
jobsFoundTerm: toNumberOrNull(parsed.jobsFoundTerm) ?? 0,
};
}
return null;
}
function getPythonPath(): string {
if (process.env.PYTHON_PATH) return process.env.PYTHON_PATH;
return process.platform === "win32" ? "python" : "python3";
}
function toBooleanOrNull(value: unknown): boolean | null {
if (value === null || value === undefined) return null;
if (typeof value === "boolean") return value;
@ -123,19 +105,14 @@ function formatSalary(params: {
const { minAmount, maxAmount, currency, interval } = params;
if (minAmount === null && maxAmount === null) return null;
const fmt = (n: number) => {
// Avoid locale ambiguity; keep it simple.
const rounded = Math.round(n);
return `${rounded}`;
};
const fmt = (n: number) => `${Math.round(n)}`;
let range: string;
if (minAmount !== null && maxAmount !== null) {
range = `${fmt(minAmount)}-${fmt(maxAmount)}`;
} else if (minAmount !== null) {
range = `${fmt(minAmount)}+`;
} else if (maxAmount !== null) {
range = `${fmt(maxAmount)}`;
range = fmt(maxAmount);
} else {
return null;
}
@ -164,33 +141,25 @@ export interface JobSpyResult {
error?: string;
}
export function shouldApplyStrictLocationFilter(
location: string,
countryIndeed: string,
): boolean {
return shouldApplyStrictCityFilter(location, countryIndeed);
}
export function matchesRequestedLocation(
jobLocation: string | undefined,
requestedLocation: string,
): boolean {
return matchesRequestedCity(jobLocation, requestedLocation);
}
export async function runJobSpy(
options: RunJobSpyOptions = {},
): Promise<JobSpyResult> {
const dataDir = getDataDir();
const outputDir = join(dataDir, "imports");
await mkdir(outputDir, { recursive: true });
await mkdir(OUTPUT_DIR, { recursive: true });
const sites = (options.sites ?? ["indeed", "linkedin", "glassdoor"])
.filter((s) => s === "indeed" || s === "linkedin" || s === "glassdoor")
.filter(
(site) =>
site === "indeed" || site === "linkedin" || site === "glassdoor",
)
.join(",");
const searchTerms = resolveSearchTerms(options);
const locations = resolveLocations(options);
const locations = resolveSearchCities({
list: options.locations,
single: options.location,
env: process.env.JOBSPY_LOCATION,
fallback: "UK",
});
const countryIndeed =
options.countryIndeed ?? process.env.JOBSPY_COUNTRY_INDEED ?? "UK";
if (searchTerms.length === 0) {
@ -200,7 +169,6 @@ export async function runJobSpy(
try {
const jobs: CreateJobInput[] = [];
const seenJobUrls = new Set<string>();
const totalRuns = searchTerms.length * locations.length;
let runIndex = 0;
@ -208,13 +176,18 @@ export async function runJobSpy(
for (const location of locations) {
runIndex += 1;
const suffix = `${runIndex}_${slugForFilename(searchTerm)}_${slugForFilename(location)}`;
const outputCsv = join(outputDir, `jobspy_jobs_${suffix}.csv`);
const outputJson = join(outputDir, `jobspy_jobs_${suffix}.json`);
const outputCsv = join(OUTPUT_DIR, `jobspy_jobs_${suffix}.csv`);
const outputJson = join(OUTPUT_DIR, `jobspy_jobs_${suffix}.json`);
await new Promise<void>((resolve, reject) => {
const pythonPath = getPythonPath();
const pythonPath = process.env.PYTHON_PATH
? process.env.PYTHON_PATH
: process.platform === "win32"
? "python"
: "python3";
const child = spawn(pythonPath, [JOBSPY_SCRIPT], {
cwd: JOBSPY_DIR,
cwd: EXTRACTOR_DIR,
shell: false,
stdio: ["ignore", "pipe", "pipe"],
env: {
@ -276,21 +249,11 @@ export async function runJobSpy(
const raw = await readFile(outputJson, "utf-8");
const parsed = JSON.parse(raw) as Array<Record<string, unknown>>;
const mapped = mapJobSpyRows(parsed);
const strictLocationFilter = shouldApplyStrictLocationFilter(
location,
countryIndeed,
);
const filtered = strictLocationFilter
? mapped.filter((job) =>
matchesRequestedLocation(job.location, location),
)
: mapped;
const filtered = mapJobSpyRows(parsed);
for (const job of filtered) {
const url = job.jobUrl;
if (seenJobUrls.has(url)) continue;
seenJobUrls.add(url);
if (seenJobUrls.has(job.jobUrl)) continue;
seenJobUrls.add(job.jobUrl);
jobs.push(job);
}
@ -298,7 +261,7 @@ export async function runJobSpy(
await unlink(outputJson);
await unlink(outputCsv);
} catch {
// Ignore cleanup errors
// ignore cleanup errors
}
}
}
@ -310,16 +273,6 @@ export async function runJobSpy(
}
}
function resolveLocations(options: RunJobSpyOptions): string[] {
const fromOptions = options.locations?.length ? options.locations : null;
const fromSingle = options.location?.trim();
const fromEnv = process.env.JOBSPY_LOCATION?.trim();
const raw =
fromOptions ?? parseSearchCitiesSetting(fromSingle ?? fromEnv ?? "UK");
const out = raw.map((value) => value.trim()).filter(Boolean);
return out.length > 0 ? out : ["UK"];
}
function resolveSearchTerms(options: RunJobSpyOptions): string[] {
const fromOptions = options.searchTerms?.length ? options.searchTerms : null;
const fromEnv = parseSearchTermsEnv(process.env.JOBSPY_SEARCH_TERMS);
@ -335,7 +288,6 @@ function resolveSearchTerms(options: RunJobSpyOptions): string[] {
seen.add(key);
out.push(normalized);
}
return out;
}
@ -347,7 +299,10 @@ function parseSearchTermsEnv(raw: string | undefined): string[] | null {
if (trimmed.startsWith("[")) {
try {
const parsed = JSON.parse(trimmed) as unknown;
if (Array.isArray(parsed) && parsed.every((v) => typeof v === "string")) {
if (
Array.isArray(parsed) &&
parsed.every((value) => typeof value === "string")
) {
return parsed;
}
} catch {
@ -362,7 +317,7 @@ function parseSearchTermsEnv(raw: string | undefined): string[] | null {
: ",";
const split = trimmed
.split(delimiter)
.map((t) => t.trim())
.map((term) => term.trim())
.filter(Boolean);
return split.length > 0 ? split : null;
}
@ -388,34 +343,27 @@ function mapJobSpyRows(
const jobUrl = toStringOrNull(row.job_url);
if (!jobUrl) continue;
const title = toStringOrNull(row.title) ?? "Unknown Title";
const employer = toStringOrNull(row.company) ?? "Unknown Employer";
const jobUrlDirect = toStringOrNull(row.job_url_direct);
const applicationLink = jobUrlDirect ?? jobUrl;
const minAmount = toNumberOrNull(row.min_amount);
const maxAmount = toNumberOrNull(row.max_amount);
const currency = toStringOrNull(row.currency);
const interval = toStringOrNull(row.interval);
const salary = formatSalary({ minAmount, maxAmount, currency, interval });
const jobUrlDirect = toStringOrNull(row.job_url_direct);
jobs.push({
source,
sourceJobId: toStringOrNull(row.id) ?? undefined,
jobUrlDirect: jobUrlDirect ?? undefined,
datePosted: toStringOrNull(row.date_posted) ?? undefined,
title,
employer,
title: toStringOrNull(row.title) ?? "Unknown Title",
employer: toStringOrNull(row.company) ?? "Unknown Employer",
employerUrl: toStringOrNull(row.company_url) ?? undefined,
jobUrl,
applicationLink,
applicationLink: jobUrlDirect ?? jobUrl,
location: toStringOrNull(row.location) ?? undefined,
jobDescription: toStringOrNull(row.description) ?? undefined,
salary: salary ?? undefined,
jobType: toStringOrNull(row.job_type) ?? undefined,
salarySource: toStringOrNull(row.salary_source) ?? undefined,
salaryInterval: interval ?? undefined,

View File

@ -1,9 +1,5 @@
import { describe, expect, it } from "vitest";
import {
matchesRequestedLocation,
parseJobSpyProgressLine,
shouldApplyStrictLocationFilter,
} from "./jobspy";
import { parseJobSpyProgressLine } from "../src/run";
describe("parseJobSpyProgressLine", () => {
it("parses term_start progress lines", () => {
@ -42,24 +38,3 @@ describe("parseJobSpyProgressLine", () => {
expect(parseJobSpyProgressLine("Found 20 jobs")).toBeNull();
});
});
describe("strict location filtering", () => {
it("enables strict filtering when location differs from country", () => {
expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe(
true,
);
});
it("disables strict filtering when location is country-level", () => {
expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false);
expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false);
});
it("matches location using case-insensitive contains checks", () => {
expect(matchesRequestedLocation("Leeds, England, UK", "leeds")).toBe(true);
expect(matchesRequestedLocation("Halifax, England, UK", "leeds")).toBe(
false,
);
expect(matchesRequestedLocation(undefined, "leeds")).toBe(false);
});
});

View File

@ -0,0 +1,16 @@
{
"compilerOptions": {
"module": "NodeNext",
"moduleResolution": "NodeNext",
"target": "ES2022",
"strict": true,
"noUnusedLocals": false,
"lib": ["ES2022", "DOM"],
"types": ["node"],
"baseUrl": ".",
"paths": {
"@shared/*": ["../../shared/src/*"]
}
},
"include": ["./src/**/*"]
}

View File

@ -0,0 +1,104 @@
import type {
ExtractorManifest,
ExtractorProgressEvent,
ExtractorRuntimeContext,
} from "@shared/types/extractors";
import { runUkVisaJobs } from "./src/run";
function toProgress(event: {
type: string;
termIndex: number;
termTotal: number;
searchTerm: string;
pageNo?: number;
maxPages?: number;
totalCollected?: number;
message?: string;
}): ExtractorProgressEvent {
if (event.type === "init") {
return {
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: 0,
listPagesTotal: event.maxPages ?? 0,
currentUrl: event.searchTerm || "all jobs",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
};
}
if (event.type === "page_fetched") {
return {
phase: "list",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo ?? 0,
listPagesTotal: event.maxPages ?? 0,
jobPagesEnqueued: event.totalCollected ?? 0,
jobPagesProcessed: event.totalCollected ?? 0,
currentUrl: `page ${event.pageNo ?? 0}/${event.maxPages ?? 0}`,
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo ?? 0}/${event.maxPages ?? 0} (${event.totalCollected ?? 0} collected)`,
};
}
if (event.type === "term_complete") {
return {
phase: "list",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
currentUrl: event.searchTerm || "all jobs",
detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
};
}
if (event.type === "empty_page") {
return {
detail: `UKVisaJobs: page ${event.pageNo ?? 0} returned no jobs`,
};
}
return {
detail: `UKVisaJobs: ${event.message ?? "unknown event"}`,
};
}
export const manifest: ExtractorManifest = {
id: "ukvisajobs",
displayName: "UK Visa Jobs",
providesSources: ["ukvisajobs"],
requiredEnvVars: ["UKVISAJOBS_EMAIL", "UKVISAJOBS_PASSWORD"],
async run(context: ExtractorRuntimeContext) {
if (context.shouldCancel?.()) {
return { success: true, jobs: [] };
}
const maxJobs = context.settings.ukvisajobsMaxJobs
? parseInt(context.settings.ukvisajobsMaxJobs, 10)
: 50;
const result = await runUkVisaJobs({
maxJobs,
searchTerms: context.searchTerms,
onProgress: (event) => {
if (context.shouldCancel?.()) return;
context.onProgress?.(toProgress(event));
},
});
if (!result.success) {
return {
success: false,
jobs: [],
error: result.error,
};
}
return {
success: true,
jobs: result.jobs,
};
},
};
export default manifest;

View File

@ -1,21 +1,36 @@
/**
* Service for running the UK Visa Jobs extractor (extractors/ukvisajobs).
*
* Spawns the extractor as a child process and reads its output dataset.
*/
import { spawn } from "node:child_process";
import { mkdir, readdir, readFile, rm } from "node:fs/promises";
import { dirname, join } from "node:path";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
import type { CreateJobInput } from "@shared/types";
import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion";
const __dirname = dirname(fileURLToPath(import.meta.url));
const UKVISAJOBS_DIR = join(__dirname, "../../../../extractors/ukvisajobs");
const STORAGE_DIR = join(UKVISAJOBS_DIR, "storage/datasets/default");
const AUTH_CACHE_PATH = join(UKVISAJOBS_DIR, "storage/ukvisajobs-auth.json");
type CreateJobInput = {
source: "ukvisajobs";
sourceJobId?: string;
title: string;
employer: string;
employerUrl?: string;
jobUrl: string;
applicationLink?: string;
location?: string;
deadline?: string;
salary?: string;
jobDescription?: string;
datePosted?: string;
degreeRequired?: string;
jobType?: string;
jobLevel?: string;
};
import {
toNumberOrNull,
toStringOrNull,
} from "@shared/utils/type-conversion.js";
const srcDir = dirname(fileURLToPath(import.meta.url));
const EXTRACTOR_DIR = join(srcDir, "..");
const STORAGE_DIR = join(EXTRACTOR_DIR, "storage/datasets/default");
const AUTH_CACHE_PATH = join(EXTRACTOR_DIR, "storage/ukvisajobs-auth.json");
const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
let isUkVisaJobsRunning = false;
@ -27,13 +42,9 @@ interface UkVisaJobsAuthSession {
}
export interface RunUkVisaJobsOptions {
/** Maximum number of jobs to fetch per search term. Defaults to 50, max 200. */
maxJobs?: number;
/** Search keyword filter (single) - legacy support */
searchKeyword?: string;
/** List of search terms to run sequentially */
searchTerms?: string[];
/** Optional callback for structured progress emitted by extractor runs. */
onProgress?: (event: UkVisaJobsProgressEvent) => void;
}
@ -170,14 +181,9 @@ export function parseUkVisaJobsProgressLine(
return null;
}
/**
* Basic HTML to text conversion to extract job description.
*/
function cleanHtml(html: string): string {
// Remove script, style tags and their content
let text = html.replace(/<(script|style)[^>]*>[\s\S]*?<\/\1>/gi, "");
// Try to extract content between <main> tags if present, or fallback to body
const mainMatch = html.match(/<main[^>]*>([\s\S]*?)<\/main>/i);
const bodyMatch = html.match(/<body[^>]*>([\s\S]*?)<\/body>/i);
if (mainMatch) {
@ -186,21 +192,15 @@ function cleanHtml(html: string): string {
text = bodyMatch[1];
}
// Remove remaining HTML tags
text = text.replace(/<[^>]+>/g, " ");
// Unescape common entities
text = text
.replace(/&nbsp;/g, " ")
.replace(/&amp;/g, "&")
.replace(/&lt;/g, "<")
.replace(/&gt;/g, ">")
.replace(/&quot;/g, '"');
// Normalize whitespace
text = text.replace(/\s+/g, " ").trim();
// Limit length to avoid blowing up AI context
if (text.length > 8000) {
text = `${text.substring(0, 8000)}...`;
}
@ -208,19 +208,16 @@ function cleanHtml(html: string): string {
return text;
}
/**
* Fetch job description from the job URL.
*/
async function fetchJobDescription(url: string): Promise<string | null> {
try {
console.log(` Fetching description from ${url}...`);
const authSession = await loadCachedAuthSession();
const cookieParts: string[] = [];
if (authSession?.csrfToken)
if (authSession?.csrfToken) {
cookieParts.push(`csrf_token=${authSession.csrfToken}`);
if (authSession?.ciSession)
}
if (authSession?.ciSession) {
cookieParts.push(`ci_session=${authSession.ciSession}`);
}
const token = authSession?.authToken || authSession?.token;
if (token) cookieParts.push(`authToken=${token}`);
@ -235,20 +232,14 @@ async function fetchJobDescription(url: string): Promise<string | null> {
const response = await fetch(url, {
headers,
signal: AbortSignal.timeout(10000), // 10s timeout
signal: AbortSignal.timeout(10000),
});
if (!response.ok) return null;
const html = await response.text();
const cleaned = cleanHtml(html);
// If we only got a tiny bit of text, it might have failed
return cleaned.length > 100 ? cleaned : null;
} catch (error) {
console.warn(
` ⚠️ Failed to fetch description: ${error instanceof Error ? error.message : "Unknown error"}`,
);
} catch {
return null;
}
}
@ -262,14 +253,11 @@ async function loadCachedAuthSession(): Promise<UkVisaJobsAuthSession | null> {
}
}
/**
* Clear previous extraction results.
*/
async function clearStorageDataset(): Promise<void> {
try {
await rm(STORAGE_DIR, { recursive: true, force: true });
} catch {
// Ignore if directory doesn't exist
// ignore
}
}
@ -286,16 +274,12 @@ export async function runUkVisaJobs(
isUkVisaJobsRunning = true;
try {
console.log("🇬🇧 Running UK Visa Jobs extractor...");
// Determine terms to run
const terms: string[] = [];
if (options.searchTerms && options.searchTerms.length > 0) {
terms.push(...options.searchTerms);
} else if (options.searchKeyword) {
terms.push(options.searchKeyword);
} else {
// No search terms = run once without keyword
terms.push("");
}
@ -303,21 +287,17 @@ export async function runUkVisaJobs(
const seenIds = new Set<string>();
const termTotal = terms.length;
for (let i = 0; i < terms.length; i++) {
for (let i = 0; i < terms.length; i += 1) {
const term = terms[i];
const termLabel = term ? `"${term}"` : "all jobs";
const termIndex = i + 1;
console.log(` Running for ${termLabel}...`);
try {
// Clear previous results for this run
await clearStorageDataset();
await mkdir(STORAGE_DIR, { recursive: true });
// Run the extractor
await new Promise<void>((resolve, reject) => {
const child = spawn("npx", ["tsx", "src/main.ts"], {
cwd: UKVISAJOBS_DIR,
cwd: EXTRACTOR_DIR,
stdio: ["ignore", "pipe", "pipe"],
env: {
...process.env,
@ -355,59 +335,51 @@ export async function runUkVisaJobs(
stdoutRl?.close();
stderrRl?.close();
if (code === 0) resolve();
else
else {
reject(
new Error(`UK Visa Jobs extractor exited with code ${code}`),
);
}
});
child.on("error", reject);
});
// Read the output dataset and accumulate
const runJobs = await readDataset();
let newCount = 0;
let jobsFoundTerm = 0;
for (const job of runJobs) {
// Deduplicate by sourceJobId or jobUrl
const id = job.sourceJobId || job.jobUrl;
if (!seenIds.has(id)) {
seenIds.add(id);
if (seenIds.has(id)) continue;
seenIds.add(id);
// Enrich description if missing or poor
const isPoorDescription =
!job.jobDescription ||
job.jobDescription.length < 100 ||
job.jobDescription.startsWith("Visa sponsorship info:");
const isPoorDescription =
!job.jobDescription ||
job.jobDescription.length < 100 ||
job.jobDescription.startsWith("Visa sponsorship info:");
if (isPoorDescription && job.jobUrl) {
const enriched = await fetchJobDescription(job.jobUrl);
if (enriched) {
job.jobDescription = enriched;
}
// Small delay to avoid hammering the server
await new Promise((resolve) => setTimeout(resolve, 500));
if (isPoorDescription && job.jobUrl) {
const enriched = await fetchJobDescription(job.jobUrl);
if (enriched) {
job.jobDescription = enriched;
}
allJobs.push(job);
newCount++;
await new Promise((resolve) => setTimeout(resolve, 500));
}
allJobs.push(job);
jobsFoundTerm += 1;
}
console.log(
` ✅ Fetched ${runJobs.length} jobs for ${termLabel} (${newCount} new unique)`,
);
options.onProgress?.({
type: "term_complete",
termIndex,
termTotal,
searchTerm: term,
jobsFoundTerm: newCount,
jobsFoundTerm,
totalCollected: allJobs.length,
});
} catch (error) {
const message =
error instanceof Error ? error.message : "Unknown error";
console.error(`❌ UK Visa Jobs failed for ${termLabel}: ${message}`);
options.onProgress?.({
type: "error",
termIndex,
@ -415,66 +387,58 @@ export async function runUkVisaJobs(
searchTerm: term,
message,
});
// Continue to next term instead of failing completely
}
// Delay between terms
if (i < terms.length - 1) {
console.log(" Waiting 5s before next search term...");
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
console.log(
`✅ UK Visa Jobs: imported total ${allJobs.length} unique jobs`,
);
return { success: true, jobs: allJobs };
} finally {
isUkVisaJobsRunning = false;
}
}
/**
* Read jobs from the extractor's output dataset.
*/
async function readDataset(): Promise<CreateJobInput[]> {
const jobs: CreateJobInput[] = [];
try {
const files = await readdir(STORAGE_DIR);
const jsonFiles = files.filter(
(f) => f.endsWith(".json") && f !== "jobs.json",
(file) => file.endsWith(".json") && file !== "jobs.json",
);
for (const file of jsonFiles.sort()) {
try {
const content = await readFile(join(STORAGE_DIR, file), "utf-8");
const job = JSON.parse(content);
const job = JSON.parse(content) as Record<string, unknown>;
// Map to CreateJobInput format
jobs.push({
source: "ukvisajobs",
sourceJobId: job.sourceJobId,
title: job.title || "Unknown Title",
employer: job.employer || "Unknown Employer",
employerUrl: job.employerUrl,
jobUrl: job.jobUrl,
applicationLink: job.applicationLink || job.jobUrl,
location: job.location,
deadline: job.deadline,
salary: job.salary,
jobDescription: job.jobDescription,
datePosted: job.datePosted,
degreeRequired: job.degreeRequired,
jobType: job.jobType,
jobLevel: job.jobLevel,
sourceJobId: job.sourceJobId as string | undefined,
title: (job.title as string) || "Unknown Title",
employer: (job.employer as string) || "Unknown Employer",
employerUrl: job.employerUrl as string | undefined,
jobUrl: job.jobUrl as string,
applicationLink:
(job.applicationLink as string | undefined) ||
(job.jobUrl as string),
location: job.location as string | undefined,
deadline: job.deadline as string | undefined,
salary: job.salary as string | undefined,
jobDescription: job.jobDescription as string | undefined,
datePosted: job.datePosted as string | undefined,
degreeRequired: job.degreeRequired as string | undefined,
jobType: job.jobType as string | undefined,
jobLevel: job.jobLevel as string | undefined,
});
} catch {
// Skip invalid files
// ignore invalid file
}
}
} catch {
// Dataset directory doesn't exist yet
// ignore missing dir
}
return jobs;

View File

@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { parseUkVisaJobsProgressLine } from "./ukvisajobs";
import { parseUkVisaJobsProgressLine } from "../src/run";
describe("parseUkVisaJobsProgressLine", () => {
it("parses init events", () => {

View File

@ -2,6 +2,10 @@
* Live pipeline progress display component.
*/
import {
sourceLabel as getSourceLabel,
isExtractorSourceId,
} from "@shared/extractors";
import { Loader2 } from "lucide-react";
import type React from "react";
import { useEffect, useMemo, useState } from "react";
@ -24,13 +28,7 @@ interface PipelineProgress {
| "failed";
message: string;
detail?: string;
crawlingSource:
| "gradcracker"
| "jobspy"
| "ukvisajobs"
| "adzuna"
| "hiringcafe"
| null;
crawlingSource: string | null;
crawlingSourcesCompleted: number;
crawlingSourcesTotal: number;
crawlingTermsProcessed: number;
@ -83,20 +81,15 @@ const stepBadgeClasses: Record<PipelineProgress["step"], string> = {
failed: "bg-destructive/10 text-destructive border-destructive/20",
};
const sourceLabel: Record<
Exclude<PipelineProgress["crawlingSource"], null>,
string
> = {
gradcracker: "Gradcracker",
jobspy: "JobSpy",
ukvisajobs: "UKVisaJobs",
adzuna: "Adzuna",
hiringcafe: "Hiring Cafe",
};
const clamp = (value: number, min: number, max: number) =>
Math.max(min, Math.min(max, value));
function resolveSourceLabel(source: string): string {
if (source === "jobspy") return "JobSpy";
if (isExtractorSourceId(source)) return getSourceLabel(source);
return source;
}
export const PipelineProgress: React.FC<PipelineProgressProps> = ({
isRunning,
}) => {
@ -257,7 +250,7 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({
<p className="text-xs text-muted-foreground">
Source:{" "}
{progress.crawlingSource
? sourceLabel[progress.crawlingSource]
? resolveSourceLabel(progress.crawlingSource)
: "starting"}
{" "}({progress.crawlingSourcesCompleted}/
{Math.max(progress.crawlingSourcesTotal, 0)})

View File

@ -5,6 +5,7 @@
* Ghosted/no-reply and rejected outcomes are both excluded from the numerator.
*/
import { isExtractorSourceId, sourceLabel } from "@shared/extractors";
import type { JobSource, StageEvent } from "@shared/types.js";
import { useMemo, useState } from "react";
import {
@ -64,17 +65,6 @@ const RESPONSE_STAGES = new Set([
"offer",
]);
const SOURCE_LABELS: Record<JobSource, string> = {
gradcracker: "Gradcracker",
indeed: "Indeed",
linkedin: "LinkedIn",
glassdoor: "Glassdoor",
ukvisajobs: "UKVisaJobs",
adzuna: "Adzuna",
hiringcafe: "HiringCafe",
manual: "Manual",
};
const BAR_COLORS = [
"#3b82f6",
"#8b5cf6",
@ -110,7 +100,7 @@ const buildResponseRateBySource = (
return Array.from(bySource.entries())
.map(([source, { applied, responded }]) => ({
source: `${SOURCE_LABELS[source] ?? source} (${applied})`,
source: `${isExtractorSourceId(source) ? sourceLabel(source) : source} (${applied})`,
applied,
responded,
rate: applied > 0 ? (responded / applied) * 100 : 0,

View File

@ -1,3 +1,4 @@
import { EXTRACTOR_SOURCE_METADATA } from "@shared/extractors";
import {
formatCountryLabel,
isSourceAllowedForCountry,
@ -77,7 +78,6 @@ const GLASSDOOR_COUNTRY_REASON =
"Glassdoor is not available for the selected country.";
const GLASSDOOR_LOCATION_REASON =
"Add at least one city in Advanced settings to enable Glassdoor.";
const UK_ONLY_SOURCES = new Set<JobSource>(["gradcracker", "ukvisajobs"]);
const HIDDEN_COUNTRY_KEYS = new Set(["usa/ca"]);
function normalizeUiCountryKey(value: string): string {
@ -95,7 +95,7 @@ function getSourceDisabledReason(
? GLASSDOOR_LOCATION_REASON
: GLASSDOOR_COUNTRY_REASON;
}
if (UK_ONLY_SOURCES.has(source)) {
if (EXTRACTOR_SOURCE_METADATA[source]?.ukOnly) {
return `${sourceLabel[source]} is available only when country is United Kingdom.`;
}
return `${sourceLabel[source]} is not available for the selected country.`;

View File

@ -1,3 +1,8 @@
import {
EXTRACTOR_SOURCE_IDS,
EXTRACTOR_SOURCE_METADATA,
PIPELINE_EXTRACTOR_SOURCE_IDS,
} from "@shared/extractors";
import type { JobSource, JobStatus } from "@shared/types";
export const DEFAULT_PIPELINE_SOURCES: JobSource[] = [
@ -9,15 +14,17 @@ export const DEFAULT_PIPELINE_SOURCES: JobSource[] = [
export const PIPELINE_SOURCES_STORAGE_KEY = "jobops.pipeline.sources";
export const orderedSources: JobSource[] = [
"gradcracker",
"indeed",
"linkedin",
"glassdoor",
"adzuna",
"hiringcafe",
"ukvisajobs",
];
export const orderedFilterSources: JobSource[] = [...orderedSources, "manual"];
...PIPELINE_EXTRACTOR_SOURCE_IDS,
].sort(
(left, right) =>
EXTRACTOR_SOURCE_METADATA[left].order -
EXTRACTOR_SOURCE_METADATA[right].order,
);
export const orderedFilterSources: JobSource[] = [...EXTRACTOR_SOURCE_IDS].sort(
(left, right) =>
EXTRACTOR_SOURCE_METADATA[left].order -
EXTRACTOR_SOURCE_METADATA[right].order,
);
export const statusTokens: Record<
JobStatus,

View File

@ -168,8 +168,7 @@ export const getSourcesWithJobs = (jobs: JobListItem[]): JobSource[] => {
export const getEnabledSources = (
settings: AppSettings | null,
): JobSource[] => {
if (!settings)
return [...DEFAULT_PIPELINE_SOURCES, "glassdoor", "hiringcafe"];
if (!settings) return [...orderedSources];
const enabled: JobSource[] = [];
const hasUkVisaJobsAuth = Boolean(

View File

@ -1,3 +1,7 @@
import {
EXTRACTOR_SOURCE_IDS,
sourceLabel as getExtractorSourceLabel,
} from "@shared/extractors";
import type { Job } from "@shared/types";
import { type ClassValue, clsx } from "clsx";
import { twMerge } from "tailwind-merge";
@ -137,13 +141,11 @@ export const formatJobForWebhook = (job: Job) => {
);
};
export const sourceLabel: Record<Job["source"], string> = {
gradcracker: "Gradcracker",
indeed: "Indeed",
linkedin: "LinkedIn",
glassdoor: "Glassdoor",
ukvisajobs: "UK Visa Jobs",
adzuna: "Adzuna",
hiringcafe: "Hiring Cafe",
manual: "Manual",
};
export const sourceLabel: Record<Job["source"], string> =
EXTRACTOR_SOURCE_IDS.reduce(
(acc, source) => {
acc[source] = getExtractorSourceLabel(source);
return acc;
},
{} as Record<Job["source"], string>,
);

View File

@ -1,12 +1,23 @@
import { AppError, badRequest, conflict, requestTimeout } from "@infra/errors";
import {
AppError,
badRequest,
conflict,
requestTimeout,
serviceUnavailable,
} from "@infra/errors";
import { fail, ok, okWithMeta } from "@infra/http";
import { logger } from "@infra/logger";
import { runWithRequestContext } from "@infra/request-context";
import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse";
import { PIPELINE_EXTRACTOR_SOURCE_IDS } from "@shared/extractors";
import type { PipelineStatusResponse } from "@shared/types";
import { type Request, type Response, Router } from "express";
import { z } from "zod";
import { isDemoMode } from "../../config/demo";
import {
type ExtractorRegistry,
getExtractorRegistry,
} from "../../extractors/registry";
import {
getPipelineStatus,
requestPipelineCancel,
@ -94,15 +105,12 @@ const runPipelineSchema = z.object({
minSuitabilityScore: z.number().min(0).max(100).optional(),
sources: z
.array(
z.enum([
"gradcracker",
"indeed",
"linkedin",
"glassdoor",
"ukvisajobs",
"adzuna",
"hiringcafe",
]),
z.enum(
PIPELINE_EXTRACTOR_SOURCE_IDS as [
(typeof PIPELINE_EXTRACTOR_SOURCE_IDS)[number],
...(typeof PIPELINE_EXTRACTOR_SOURCE_IDS)[number][],
],
),
)
.min(1)
.optional(),
@ -111,6 +119,38 @@ const runPipelineSchema = z.object({
pipelineRouter.post("/run", async (req: Request, res: Response) => {
try {
const config = runPipelineSchema.parse(req.body);
if (config.sources && config.sources.length > 0) {
let registry: ExtractorRegistry;
try {
registry = await getExtractorRegistry();
} catch (error) {
logger.error(
"Extractor registry unavailable during source validation",
{
route: "/api/pipeline/run",
error,
},
);
return fail(
res,
serviceUnavailable(
"Extractor registry is unavailable. Try again after fixing startup errors.",
),
);
}
const unavailableSources = config.sources.filter(
(source) => !registry.manifestBySource.has(source),
);
if (unavailableSources.length > 0) {
return fail(
res,
badRequest(
`Requested sources are not available at runtime: ${unavailableSources.join(", ")}`,
{ unavailableSources },
),
);
}
}
if (isDemoMode()) {
const simulated = await simulatePipelineRun(config);

View File

@ -32,20 +32,7 @@ export const jobs = sqliteTable("jobs", {
id: text("id").primaryKey(),
// From crawler
source: text("source", {
enum: [
"gradcracker",
"indeed",
"linkedin",
"glassdoor",
"ukvisajobs",
"adzuna",
"hiringcafe",
"manual",
],
})
.notNull()
.default("gradcracker"),
source: text("source").notNull().default("gradcracker"),
sourceJobId: text("source_job_id"),
jobUrlDirect: text("job_url_direct"),
datePosted: text("date_posted"),

View File

@ -0,0 +1,107 @@
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { discoverManifestPaths, loadManifestFromFile } from "./discovery";
const tempRoots: string[] = [];
async function makeTempRoot(): Promise<string> {
const testTmpBase = join(process.cwd(), "orchestrator", ".tmp");
await mkdir(testTmpBase, { recursive: true });
const tempDir = await mkdtemp(join(testTmpBase, "extractor-discovery-"));
const root = join(tempDir, "extractors");
await mkdir(root, { recursive: true });
tempRoots.push(tempDir);
return root;
}
afterEach(async () => {
for (const root of tempRoots.splice(0)) {
await rm(root, { recursive: true, force: true });
}
});
describe("extractor discovery", () => {
it("finds manifest.ts and src/manifest.ts files", async () => {
const root = await makeTempRoot();
await mkdir(join(root, "adzuna", "src"), { recursive: true });
await mkdir(join(root, "jobspy"), { recursive: true });
await writeFile(
join(root, "adzuna", "src", "manifest.ts"),
"export const manifest = { id: 'adzuna', displayName: 'Adzuna', providesSources: ['adzuna'], async run() { return { success: true, jobs: [] }; } };",
"utf8",
);
await writeFile(
join(root, "jobspy", "manifest.ts"),
"export default { id: 'jobspy', displayName: 'JobSpy', providesSources: ['indeed'], async run() { return { success: true, jobs: [] }; } };",
"utf8",
);
const found = await discoverManifestPaths(root);
expect(found).toEqual([
join(root, "adzuna", "src", "manifest.ts"),
join(root, "jobspy", "manifest.ts"),
]);
});
it("returns empty list when extractor root does not exist", async () => {
const root = join(tmpdir(), `missing-extractors-${Date.now()}`);
await expect(discoverManifestPaths(root)).resolves.toEqual([]);
});
it("returns empty list when root is not named extractors", async () => {
const root = await makeTempRoot();
const invalidRoot = join(root, "..");
await expect(discoverManifestPaths(invalidRoot)).resolves.toEqual([]);
});
it("loads and validates manifest modules", async () => {
const root = await makeTempRoot();
const validPath = join(root, "valid-manifest.mjs");
await writeFile(
validPath,
"export const manifest = { id: 'valid', displayName: 'Valid', providesSources: ['indeed'], requiredEnvVars: ['A'], async run() { return { success: true, jobs: [] }; } };",
"utf8",
);
const manifest = await loadManifestFromFile(validPath);
expect(manifest.id).toBe("valid");
expect(manifest.providesSources).toEqual(["indeed"]);
expect(manifest.requiredEnvVars).toEqual(["A"]);
});
it("prefers named manifest export when default is a wrapper object", async () => {
const root = await makeTempRoot();
const wrappedPath = join(root, "wrapped-manifest.mjs");
await writeFile(
wrappedPath,
[
"const valid = { id: 'wrapped', displayName: 'Wrapped', providesSources: ['indeed'], async run() { return { success: true, jobs: [] }; } };",
"export const manifest = valid;",
"export default { default: valid, manifest: valid };",
].join("\n"),
"utf8",
);
const manifest = await loadManifestFromFile(wrappedPath);
expect(manifest.id).toBe("wrapped");
expect(manifest.providesSources).toEqual(["indeed"]);
});
it("throws for invalid manifest exports", async () => {
const root = await makeTempRoot();
const invalidPath = join(root, "invalid-manifest.mjs");
await writeFile(
invalidPath,
"export default { id: 'invalid', displayName: 'Invalid', providesSources: ['indeed'] };",
"utf8",
);
await expect(loadManifestFromFile(invalidPath)).rejects.toThrow(
"Invalid manifest export",
);
});
});

View File

@ -0,0 +1,111 @@
import type { Dirent } from "node:fs";
import { access, readdir, stat } from "node:fs/promises";
import { basename, dirname, join, resolve } from "node:path";
import { fileURLToPath, pathToFileURL } from "node:url";
import type { ExtractorManifest } from "@shared/types";
const moduleDir = dirname(fileURLToPath(import.meta.url));
const DEFAULT_EXTRACTORS_ROOT = resolve(process.cwd(), "../extractors");
const MODULE_RELATIVE_EXTRACTORS_ROOT = resolve(
moduleDir,
"../../../../extractors",
);
const MANIFEST_CANDIDATES = ["manifest.ts", "src/manifest.ts"] as const;
async function fileExists(path: string): Promise<boolean> {
try {
await access(path);
return true;
} catch {
return false;
}
}
async function directoryExists(path: string): Promise<boolean> {
try {
const info = await stat(path);
return info.isDirectory();
} catch {
return false;
}
}
async function resolveExtractorsRoot(): Promise<string> {
if (await directoryExists(DEFAULT_EXTRACTORS_ROOT)) {
return DEFAULT_EXTRACTORS_ROOT;
}
if (await directoryExists(MODULE_RELATIVE_EXTRACTORS_ROOT)) {
return MODULE_RELATIVE_EXTRACTORS_ROOT;
}
return DEFAULT_EXTRACTORS_ROOT;
}
export async function discoverManifestPaths(
extractorsRoot?: string,
): Promise<string[]> {
const root = extractorsRoot ?? (await resolveExtractorsRoot());
if (basename(root) !== "extractors") {
return [];
}
let entries: Dirent[] = [];
try {
entries = await readdir(root, { withFileTypes: true });
} catch (error) {
const known = error as NodeJS.ErrnoException;
if (known.code === "ENOENT") return [];
throw error;
}
const paths: string[] = [];
for (const entry of entries) {
if (!entry.isDirectory()) continue;
for (const candidate of MANIFEST_CANDIDATES) {
const fullPath = join(root, entry.name, candidate);
if (await fileExists(fullPath)) {
paths.push(fullPath);
break;
}
}
}
return paths.sort();
}
function isManifest(value: unknown): value is ExtractorManifest {
if (!value || typeof value !== "object") return false;
const manifest = value as Partial<ExtractorManifest>;
return (
typeof manifest.id === "string" &&
typeof manifest.displayName === "string" &&
Array.isArray(manifest.providesSources) &&
manifest.providesSources.every((source) => typeof source === "string") &&
typeof manifest.run === "function"
);
}
export async function loadManifestFromFile(
path: string,
): Promise<ExtractorManifest> {
const loaded = await import(pathToFileURL(path).href);
const candidateManifest = (loaded as { manifest?: unknown }).manifest;
const candidateDefault = (loaded as { default?: unknown }).default;
const manifest = isManifest(candidateManifest)
? candidateManifest
: candidateDefault;
if (!isManifest(manifest)) {
throw new Error(`Invalid manifest export in ${path}`);
}
return {
...manifest,
providesSources: [...manifest.providesSources],
requiredEnvVars: manifest.requiredEnvVars
? [...manifest.requiredEnvVars]
: undefined,
};
}

View File

@ -0,0 +1,194 @@
import { logger } from "@infra/logger";
import type { ExtractorManifest } from "@shared/types";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
vi.mock("./discovery", () => ({
discoverManifestPaths: vi.fn(),
loadManifestFromFile: vi.fn(),
}));
function makeManifest(
id: string,
sources: string[],
displayName = id,
): ExtractorManifest {
return {
id,
displayName,
providesSources: sources,
run: vi.fn(),
};
}
describe("extractor registry", () => {
let previousStrict: string | undefined;
beforeEach(async () => {
vi.clearAllMocks();
previousStrict = process.env.EXTRACTOR_REGISTRY_STRICT;
process.env.EXTRACTOR_REGISTRY_STRICT = "false";
const module = await import("./registry");
module.__resetExtractorRegistryForTests();
});
afterEach(() => {
vi.restoreAllMocks();
if (previousStrict === undefined) {
delete process.env.EXTRACTOR_REGISTRY_STRICT;
return;
}
process.env.EXTRACTOR_REGISTRY_STRICT = previousStrict;
});
it("loads manifests and maps sources", async () => {
const discovery = await import("./discovery");
const registryModule = await import("./registry");
registryModule.__resetExtractorRegistryForTests();
vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([
"/tmp/jobspy.ts",
"/tmp/ukvisajobs.ts",
]);
vi.mocked(discovery.loadManifestFromFile).mockImplementation(
async (path) =>
path === "/tmp/jobspy.ts"
? makeManifest(
"jobspy",
["indeed", "linkedin", "glassdoor"],
"JobSpy",
)
: makeManifest("ukvisajobs", ["ukvisajobs"], "UK Visa Jobs"),
);
const registry = await registryModule.initializeExtractorRegistry();
expect(registry.manifests.size).toBe(2);
expect(registry.manifestBySource.get("linkedin")?.id).toBe("jobspy");
expect(registry.manifestBySource.get("ukvisajobs")?.id).toBe("ukvisajobs");
});
it("throws on duplicate manifest ids in strict mode", async () => {
const discovery = await import("./discovery");
const registryModule = await import("./registry");
registryModule.__resetExtractorRegistryForTests();
process.env.EXTRACTOR_REGISTRY_STRICT = "true";
vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([
"/tmp/one.ts",
"/tmp/two.ts",
]);
vi.mocked(discovery.loadManifestFromFile).mockImplementation(async (path) =>
makeManifest(
"duplicate",
path === "/tmp/one.ts" ? ["indeed"] : ["linkedin"],
`Manifest ${path}`,
),
);
await expect(registryModule.initializeExtractorRegistry()).rejects.toThrow(
"Duplicate extractor manifest id: duplicate",
);
});
it("throws on duplicate source providers even in non-strict mode", async () => {
const discovery = await import("./discovery");
const registryModule = await import("./registry");
registryModule.__resetExtractorRegistryForTests();
process.env.EXTRACTOR_REGISTRY_STRICT = "false";
vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([
"/tmp/one.ts",
"/tmp/two.ts",
]);
vi.mocked(discovery.loadManifestFromFile).mockImplementation(
async (path) =>
path === "/tmp/one.ts"
? makeManifest("one", ["indeed"], "One")
: makeManifest("two", ["indeed"], "Two"),
);
await expect(registryModule.initializeExtractorRegistry()).rejects.toThrow(
"Source indeed is provided by multiple manifests (one, two)",
);
});
it("warns and skips manifests with unknown sources", async () => {
const discovery = await import("./discovery");
const registryModule = await import("./registry");
registryModule.__resetExtractorRegistryForTests();
const warnSpy = vi.spyOn(logger, "warn").mockImplementation(() => {});
vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([
"/tmp/unknown.ts",
"/tmp/valid.ts",
]);
vi.mocked(discovery.loadManifestFromFile).mockImplementation(
async (path) =>
path === "/tmp/unknown.ts"
? makeManifest("unknown", ["not-a-real-source"], "Unknown")
: makeManifest("valid", ["indeed"], "Valid"),
);
const registry = await registryModule.initializeExtractorRegistry();
expect(registry.manifests.size).toBe(1);
expect(registry.manifests.has("valid")).toBe(true);
expect(
warnSpy.mock.calls.some(
([message]) =>
typeof message === "string" &&
message.includes("Skipping extractor manifest with no known sources"),
),
).toBe(true);
});
it("warns when catalog pipeline sources have no runtime manifest", async () => {
const discovery = await import("./discovery");
const registryModule = await import("./registry");
registryModule.__resetExtractorRegistryForTests();
const warnSpy = vi.spyOn(logger, "warn").mockImplementation(() => {});
vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([
"/tmp/only-indeed.ts",
]);
vi.mocked(discovery.loadManifestFromFile).mockResolvedValue(
makeManifest("only-indeed", ["indeed"], "Only Indeed"),
);
await registryModule.initializeExtractorRegistry();
expect(
warnSpy.mock.calls.some(
([message]) =>
typeof message === "string" &&
message.includes("Shared extractor sources have no runtime manifest"),
),
).toBe(true);
});
it("continues loading valid manifests in non-strict mode when one manifest fails", async () => {
const discovery = await import("./discovery");
const registryModule = await import("./registry");
registryModule.__resetExtractorRegistryForTests();
process.env.EXTRACTOR_REGISTRY_STRICT = "false";
vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([
"/tmp/broken.ts",
"/tmp/valid.ts",
]);
vi.mocked(discovery.loadManifestFromFile).mockImplementation(
async (path) => {
if (path === "/tmp/broken.ts") {
throw new Error("bad manifest");
}
return makeManifest("valid", ["indeed"], "Valid");
},
);
const registry = await registryModule.initializeExtractorRegistry();
expect(registry.manifests.size).toBe(1);
expect(registry.manifests.has("valid")).toBe(true);
expect(registry.manifestBySource.get("indeed")?.id).toBe("valid");
});
});

View File

@ -0,0 +1,239 @@
import { logger } from "@infra/logger";
import { sanitizeUnknown } from "@infra/sanitize";
import {
EXTRACTOR_SOURCE_IDS,
EXTRACTOR_SOURCE_METADATA,
type ExtractorSourceId,
PIPELINE_EXTRACTOR_SOURCE_IDS,
} from "@shared/extractors";
import type { ExtractorManifest } from "@shared/types";
import { discoverManifestPaths, loadManifestFromFile } from "./discovery";
export interface ExtractorRegistry {
manifests: Map<string, ExtractorManifest>;
manifestBySource: Map<ExtractorSourceId, ExtractorManifest>;
availableSources: ExtractorSourceId[];
}
let registry: ExtractorRegistry | null = null;
let initPromise: Promise<ExtractorRegistry> | null = null;
class DuplicateManifestIdError extends Error {
readonly manifestId: string;
constructor(manifestId: string) {
super(`Duplicate extractor manifest id: ${manifestId}`);
this.manifestId = manifestId;
this.name = "DuplicateManifestIdError";
}
}
class DuplicateSourceProviderError extends Error {
readonly source: ExtractorSourceId;
readonly existingManifestId: string;
readonly duplicateManifestId: string;
constructor(args: {
source: ExtractorSourceId;
existingManifestId: string;
duplicateManifestId: string;
}) {
super(
`Source ${args.source} is provided by multiple manifests (${args.existingManifestId}, ${args.duplicateManifestId})`,
);
this.source = args.source;
this.existingManifestId = args.existingManifestId;
this.duplicateManifestId = args.duplicateManifestId;
this.name = "DuplicateSourceProviderError";
}
}
export function __resetExtractorRegistryForTests(): void {
registry = null;
initPromise = null;
}
function strictModeEnabled(): boolean {
if (process.env.EXTRACTOR_REGISTRY_STRICT) {
const raw = process.env.EXTRACTOR_REGISTRY_STRICT.toLowerCase();
return raw === "1" || raw === "true";
}
return process.env.NODE_ENV === "production";
}
function resolveCatalogMismatches(
manifests: Map<string, ExtractorManifest>,
): void {
const missingFromCatalog = new Set<string>();
const missingManifest = new Set<ExtractorSourceId>();
for (const manifest of manifests.values()) {
for (const source of manifest.providesSources) {
if (!EXTRACTOR_SOURCE_IDS.includes(source as ExtractorSourceId)) {
missingFromCatalog.add(source);
}
}
}
for (const sourceId of PIPELINE_EXTRACTOR_SOURCE_IDS) {
const hasManifest = Array.from(manifests.values()).some((manifest) =>
manifest.providesSources.includes(sourceId),
);
if (!hasManifest) {
missingManifest.add(sourceId);
}
}
const strict = strictModeEnabled();
if (missingFromCatalog.size > 0) {
const message =
"Extractor sources are missing from shared extractor catalog";
const context = {
missingFromCatalog: [...missingFromCatalog],
strict,
};
if (strict) {
throw new Error(`${message}: ${[...missingFromCatalog].join(", ")}`);
}
logger.warn(message, context);
}
if (missingManifest.size > 0) {
logger.warn("Shared extractor sources have no runtime manifest", {
missingManifest: [...missingManifest],
strict,
});
}
}
async function createRegistry(): Promise<ExtractorRegistry> {
const manifestPaths = await discoverManifestPaths();
const manifests = new Map<string, ExtractorManifest>();
const manifestBySource = new Map<ExtractorSourceId, ExtractorManifest>();
for (const path of manifestPaths) {
try {
const manifest = await loadManifestFromFile(path);
if (manifests.has(manifest.id)) {
throw new DuplicateManifestIdError(manifest.id);
}
const invalidSources = manifest.providesSources.filter(
(source) => !EXTRACTOR_SOURCE_IDS.includes(source as ExtractorSourceId),
);
const validSources = manifest.providesSources.filter((source) =>
EXTRACTOR_SOURCE_IDS.includes(source as ExtractorSourceId),
) as ExtractorSourceId[];
if (invalidSources.length > 0) {
logger.warn("Extractor manifest contains unknown sources", {
manifestId: manifest.id,
path,
invalidSources,
});
}
if (validSources.length === 0) {
logger.warn("Skipping extractor manifest with no known sources", {
manifestId: manifest.id,
path,
declaredSources: manifest.providesSources,
});
continue;
}
for (const typedSource of validSources) {
if (manifestBySource.has(typedSource)) {
const existing = manifestBySource.get(typedSource);
throw new DuplicateSourceProviderError({
source: typedSource,
existingManifestId: existing?.id ?? "unknown",
duplicateManifestId: manifest.id,
});
}
}
manifests.set(manifest.id, manifest);
for (const source of validSources) {
manifestBySource.set(source, manifest);
}
} catch (error) {
if (error instanceof DuplicateSourceProviderError) {
throw error;
}
if (error instanceof DuplicateManifestIdError && strictModeEnabled()) {
throw error;
}
logger.warn("Skipping invalid extractor manifest", {
path,
error: sanitizeUnknown(error),
});
}
}
resolveCatalogMismatches(manifests);
const availableSources = PIPELINE_EXTRACTOR_SOURCE_IDS.filter((source) =>
manifestBySource.has(source),
);
logger.info("Extractor registry initialized", {
manifestCount: manifests.size,
sourceCount: availableSources.length,
manifests: Array.from(manifests.values()).map((manifest) => ({
id: manifest.id,
sources: manifest.providesSources,
requiredEnvVarsCount: manifest.requiredEnvVars?.length ?? 0,
})),
});
return {
manifests,
manifestBySource,
availableSources,
};
}
export async function initializeExtractorRegistry(): Promise<ExtractorRegistry> {
if (registry) return registry;
if (!initPromise) {
initPromise = createRegistry()
.then((created) => {
registry = created;
return created;
})
.catch((error) => {
logger.error("Failed to initialize extractor registry", {
error: sanitizeUnknown(error),
});
registry = null;
initPromise = null;
throw error;
});
}
return initPromise;
}
export async function getExtractorRegistry(): Promise<ExtractorRegistry> {
return initializeExtractorRegistry();
}
export async function listAvailableSources(): Promise<
Array<{ id: ExtractorSourceId; label: string }>
> {
const current = await getExtractorRegistry();
return current.availableSources.map((source) => ({
id: source,
label: EXTRACTOR_SOURCE_METADATA[source].label,
}));
}
export async function isSourceAvailable(
source: ExtractorSourceId,
): Promise<boolean> {
const current = await getExtractorRegistry();
return current.manifestBySource.has(source);
}

View File

@ -3,7 +3,10 @@
*/
import "./config/env";
import { logger } from "@infra/logger";
import { sanitizeUnknown } from "@infra/sanitize";
import { createApp } from "./app";
import { initializeExtractorRegistry } from "./extractors/registry";
import * as settingsRepo from "./repositories/settings";
import {
getBackupSettings,
@ -16,6 +19,25 @@ import { initialize as initializeVisaSponsors } from "./services/visa-sponsors/i
async function startServer() {
await applyStoredEnvOverrides();
try {
await initializeExtractorRegistry();
} catch (error) {
const sanitizedError = sanitizeUnknown(error);
logger.error("Failed to initialize extractor registry", {
error: sanitizedError,
});
if (process.env.NODE_ENV === "production") {
logger.error(
"Extractor registry initialization failed in production. Shutting down server.",
);
process.exit(1);
}
logger.error(
"Extractor registry initialization failed outside production. Server startup aborted.",
);
return;
}
const app = createApp();
const PORT = process.env.PORT || 3001;
@ -46,7 +68,9 @@ async function startServer() {
await initializeVisaSponsors();
}
} catch (error) {
console.warn("⚠️ Failed to initialize visa sponsors service:", error);
logger.warn("Failed to initialize visa sponsors service", {
error: sanitizeUnknown(error),
});
}
// Initialize backup service (load settings and start scheduler if enabled)
@ -85,13 +109,17 @@ async function startServer() {
);
}
} catch (error) {
console.warn("⚠️ Failed to initialize backup service:", error);
logger.warn("Failed to initialize backup service", {
error: sanitizeUnknown(error),
});
}
try {
await initializeDemoModeServices();
} catch (error) {
console.warn("⚠️ Failed to initialize demo mode services:", error);
logger.warn("Failed to initialize demo mode services", {
error: sanitizeUnknown(error),
});
}
});
}

View File

@ -14,12 +14,7 @@ export type PipelineStep =
| "cancelled"
| "failed";
export type CrawlSource =
| "gradcracker"
| "jobspy"
| "ukvisajobs"
| "adzuna"
| "hiringcafe";
export type CrawlSource = string;
export interface PipelineProgress {
step: PipelineStep;

View File

@ -40,18 +40,6 @@ vi.mock("../repositories/settings", () => ({
getAllSettings: vi.fn().mockResolvedValue({}),
}));
vi.mock("../services/crawler", () => ({
runCrawler: vi.fn(() => ({ success: true, jobs: [] })),
}));
vi.mock("../services/jobspy", () => ({
runJobSpy: vi.fn(() => ({ success: true, jobs: [] })),
}));
vi.mock("../services/ukvisajobs", () => ({
runUkVisaJobs: vi.fn(() => ({ success: true, jobs: [] })),
}));
const now = new Date().toISOString();
const createJob = (overrides: Partial<Job> = {}): Job =>

View File

@ -3,35 +3,19 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import { getProgress, resetProgress } from "../progress";
import { discoverJobsStep } from "./discover-jobs";
vi.mock("../../repositories/jobs", () => ({
getAllJobUrls: vi.fn().mockResolvedValue([]),
}));
vi.mock("../../repositories/settings", () => ({
getAllSettings: vi.fn(),
}));
vi.mock("../../services/jobspy", () => ({
runJobSpy: vi.fn(),
vi.mock("../../repositories/jobs", () => ({
getAllJobUrls: vi.fn().mockResolvedValue([]),
}));
vi.mock("../../services/crawler", () => ({
runCrawler: vi.fn(),
vi.mock("../../extractors/registry", () => ({
getExtractorRegistry: vi.fn(),
}));
vi.mock("../../services/adzuna", () => ({
runAdzuna: vi.fn(),
}));
vi.mock("../../services/hiring-cafe", () => ({
runHiringCafe: vi.fn(),
}));
vi.mock("../../services/ukvisajobs", () => ({
runUkVisaJobs: vi.fn(),
}));
const config: PipelineConfig = {
const baseConfig: PipelineConfig = {
topN: 10,
minSuitabilityScore: 50,
sources: ["indeed", "linkedin", "ukvisajobs"],
@ -50,565 +34,120 @@ describe("discoverJobsStep", () => {
it("aggregates source errors for enabled sources", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const ukVisa = await import("../../services/ukvisajobs");
const registryModule = await import("../../extractors/registry");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [
{
source: "linkedin",
title: "Engineer",
employer: "ACME",
jobUrl: "https://example.com/job",
},
],
} as any);
vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({
success: false,
error: "login failed",
} as any);
const result = await discoverJobsStep({ mergedConfig: config });
expect(result.discoveredJobs).toHaveLength(1);
expect(result.sourceErrors).toEqual(["ukvisajobs: login failed"]);
expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith(
expect.objectContaining({ sites: ["indeed", "linkedin"] }),
);
});
it("passes glassdoor through to JobSpy when selected", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [
{
source: "glassdoor",
title: "Engineer",
employer: "ACME",
jobUrl: "https://example.com/job",
},
],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...config,
sources: ["glassdoor"],
},
});
expect(result.discoveredJobs).toHaveLength(1);
expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith(
expect.objectContaining({ sites: ["glassdoor"] }),
);
});
it("passes serialized multi-city locations to JobSpy", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united kingdom",
searchCities: "London|Manchester",
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [],
} as any);
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["linkedin"],
},
});
expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith(
expect.objectContaining({
location: "London|Manchester",
const jobspyManifest = {
id: "jobspy",
displayName: "JobSpy",
providesSources: ["indeed", "linkedin", "glassdoor"],
run: vi.fn().mockResolvedValue({
success: true,
jobs: [
{
source: "linkedin",
title: "Engineer",
employer: "ACME",
jobUrl: "https://example.com/job",
},
],
}),
);
});
it("filters out glassdoor for unsupported countries", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
};
const ukvisaManifest = {
id: "ukvisajobs",
displayName: "UK Visa Jobs",
providesSources: ["ukvisajobs"],
run: vi.fn().mockResolvedValue({
success: false,
jobs: [],
error: "login failed",
}),
};
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "japan",
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [],
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map([
["jobspy", jobspyManifest as any],
["ukvisajobs", ukvisaManifest as any],
]),
manifestBySource: new Map([
["indeed", jobspyManifest as any],
["linkedin", jobspyManifest as any],
["glassdoor", jobspyManifest as any],
["ukvisajobs", ukvisaManifest as any],
]),
availableSources: ["indeed", "linkedin", "glassdoor", "ukvisajobs"],
} as any);
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["glassdoor", "linkedin"],
},
});
const result = await discoverJobsStep({ mergedConfig: baseConfig });
expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith(
expect.objectContaining({ sites: ["linkedin"] }),
expect(result.discoveredJobs).toHaveLength(1);
expect(result.sourceErrors).toEqual([
"UK Visa Jobs: login failed (sources: ukvisajobs)",
]);
expect(jobspyManifest.run).toHaveBeenCalledWith(
expect.objectContaining({ selectedSources: ["indeed", "linkedin"] }),
);
});
it("throws when all enabled sources fail", async () => {
const settingsRepo = await import("../../repositories/settings");
const ukVisa = await import("../../services/ukvisajobs");
const registryModule = await import("../../extractors/registry");
const ukvisaManifest = {
id: "ukvisajobs",
displayName: "UK Visa Jobs",
providesSources: ["ukvisajobs"],
run: vi.fn().mockResolvedValue({
success: false,
jobs: [],
error: "boom",
}),
};
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({
success: false,
error: "boom",
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map([["ukvisajobs", ukvisaManifest as any]]),
manifestBySource: new Map([["ukvisajobs", ukvisaManifest as any]]),
availableSources: ["ukvisajobs"],
} as any);
await expect(
discoverJobsStep({
mergedConfig: {
...config,
...baseConfig,
sources: ["ukvisajobs"],
},
}),
).rejects.toThrow("All sources failed: ukvisajobs: boom");
});
it("runs adzuna when selected and country is compatible", async () => {
const settingsRepo = await import("../../repositories/settings");
const adzuna = await import("../../services/adzuna");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united states",
} as any);
vi.mocked(adzuna.runAdzuna).mockResolvedValue({
success: true,
jobs: [
{
source: "adzuna",
sourceJobId: "adzu-1",
title: "Engineer",
employer: "ACME",
jobUrl: "https://example.com/job",
applicationLink: "https://example.com/job",
},
],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...config,
sources: ["adzuna"],
},
});
expect(result.discoveredJobs).toHaveLength(1);
expect(vi.mocked(adzuna.runAdzuna)).toHaveBeenCalledWith(
expect.objectContaining({ country: "us" }),
).rejects.toThrow(
"All sources failed: UK Visa Jobs: boom (sources: ukvisajobs)",
);
});
it("passes configured city locations to adzuna", async () => {
const settingsRepo = await import("../../repositories/settings");
const adzuna = await import("../../services/adzuna");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united kingdom",
searchCities: "Leeds|Manchester",
} as any);
vi.mocked(adzuna.runAdzuna).mockResolvedValue({
success: true,
jobs: [],
} as any);
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["adzuna"],
},
});
expect(vi.mocked(adzuna.runAdzuna)).toHaveBeenCalledWith(
expect.objectContaining({
country: "gb",
countryKey: "united kingdom",
locations: ["Leeds", "Manchester"],
}),
);
});
it("skips adzuna for unsupported countries", async () => {
const settingsRepo = await import("../../repositories/settings");
const adzuna = await import("../../services/adzuna");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "japan",
} as any);
await expect(
discoverJobsStep({
mergedConfig: {
...config,
sources: ["adzuna"],
},
}),
).rejects.toThrow("No compatible sources for selected country: Japan");
expect(vi.mocked(adzuna.runAdzuna)).not.toHaveBeenCalled();
});
it("runs hiringcafe when selected and passes country/terms/cap", async () => {
const settingsRepo = await import("../../repositories/settings");
const hiringCafe = await import("../../services/hiring-cafe");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united states",
jobspyResultsWanted: "25",
} as any);
vi.mocked(hiringCafe.runHiringCafe).mockResolvedValue({
success: true,
jobs: [
{
source: "hiringcafe",
sourceJobId: "hc-1",
title: "Engineer",
employer: "ACME",
jobUrl: "https://example.com/hc",
applicationLink: "https://example.com/hc",
},
],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...config,
sources: ["hiringcafe"],
},
});
expect(result.discoveredJobs).toHaveLength(1);
expect(vi.mocked(hiringCafe.runHiringCafe)).toHaveBeenCalledWith(
expect.objectContaining({
country: "united states",
countryKey: "united states",
locations: [],
searchTerms: ["engineer"],
maxJobsPerTerm: 25,
}),
);
});
it("passes configured city locations to hiringcafe", async () => {
const settingsRepo = await import("../../repositories/settings");
const hiringCafe = await import("../../services/hiring-cafe");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united kingdom",
jobspyResultsWanted: "25",
searchCities: "Leeds|Manchester",
} as any);
vi.mocked(hiringCafe.runHiringCafe).mockResolvedValue({
success: true,
jobs: [],
} as any);
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["hiringcafe"],
},
});
expect(vi.mocked(hiringCafe.runHiringCafe)).toHaveBeenCalledWith(
expect.objectContaining({
country: "united kingdom",
countryKey: "united kingdom",
locations: ["Leeds", "Manchester"],
}),
);
});
it("updates Hiring Cafe terms and pages via progress callbacks", async () => {
const settingsRepo = await import("../../repositories/settings");
const hiringCafe = await import("../../services/hiring-cafe");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer", "frontend"]),
jobspyCountryIndeed: "united kingdom",
jobspyResultsWanted: "50",
} as any);
vi.mocked(hiringCafe.runHiringCafe).mockImplementation(
async (options: any) => {
options?.onProgress?.({
type: "term_start",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
});
options?.onProgress?.({
type: "page_fetched",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
pageNo: 0,
resultsOnPage: 10,
totalCollected: 10,
});
options?.onProgress?.({
type: "term_complete",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
jobsFoundTerm: 10,
});
return { success: true, jobs: [] } as any;
},
);
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["hiringcafe"],
},
});
const progress = getProgress();
expect(progress.crawlingTermsProcessed).toBe(1);
expect(progress.crawlingTermsTotal).toBe(2);
expect(progress.crawlingListPagesProcessed).toBe(1);
expect(progress.crawlingJobPagesEnqueued).toBe(10);
expect(progress.crawlingJobPagesProcessed).toBe(10);
});
it("returns Hiring Cafe source error when extractor fails", async () => {
const settingsRepo = await import("../../repositories/settings");
const hiringCafe = await import("../../services/hiring-cafe");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united kingdom",
jobspyResultsWanted: "50",
} as any);
vi.mocked(hiringCafe.runHiringCafe).mockResolvedValue({
success: false,
jobs: [],
error: "blocked upstream",
} as any);
await expect(
discoverJobsStep({
mergedConfig: {
...config,
sources: ["hiringcafe"],
},
}),
).rejects.toThrow("All sources failed: hiringcafe: blocked upstream");
});
it("maps Gradcracker progress callback into live crawling counters", async () => {
const settingsRepo = await import("../../repositories/settings");
const crawler = await import("../../services/crawler");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(crawler.runCrawler).mockImplementation(async (options: any) => {
options?.onProgress?.({
phase: "list",
currentUrl: "https://example.com/list",
listPagesProcessed: 3,
listPagesTotal: 10,
jobCardsFound: 42,
jobPagesEnqueued: 30,
jobPagesSkipped: 4,
jobPagesProcessed: 8,
});
return { success: true, jobs: [] } as any;
});
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["gradcracker"],
},
});
const progress = getProgress();
expect(progress.crawlingSource).toBeNull();
expect(progress.crawlingListPagesProcessed).toBe(3);
expect(progress.crawlingListPagesTotal).toBe(10);
expect(progress.crawlingJobCardsFound).toBe(42);
expect(progress.crawlingJobPagesEnqueued).toBe(30);
expect(progress.crawlingJobPagesSkipped).toBe(4);
expect(progress.crawlingJobPagesProcessed).toBe(8);
});
it("updates JobSpy terms and UKVisa pages via progress callbacks", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const ukVisa = await import("../../services/ukvisajobs");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer", "frontend"]),
} as any);
vi.mocked(jobSpy.runJobSpy).mockImplementation(async (options: any) => {
options?.onProgress?.({
type: "term_start",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
});
options?.onProgress?.({
type: "term_complete",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
jobsFoundTerm: 10,
});
options?.onProgress?.({
type: "term_start",
termIndex: 2,
termTotal: 2,
searchTerm: "frontend",
});
options?.onProgress?.({
type: "term_complete",
termIndex: 2,
termTotal: 2,
searchTerm: "frontend",
jobsFoundTerm: 8,
});
return { success: true, jobs: [] } as any;
});
vi.mocked(ukVisa.runUkVisaJobs).mockImplementation(async (options: any) => {
options?.onProgress?.({
type: "init",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
maxPages: 4,
maxJobs: 50,
});
options?.onProgress?.({
type: "page_fetched",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
pageNo: 2,
maxPages: 4,
jobsOnPage: 15,
totalCollected: 18,
totalAvailable: 100,
});
options?.onProgress?.({
type: "term_complete",
termIndex: 1,
termTotal: 2,
searchTerm: "engineer",
jobsFoundTerm: 18,
totalCollected: 18,
});
return { success: true, jobs: [] } as any;
});
await discoverJobsStep({
mergedConfig: {
...config,
sources: ["linkedin", "ukvisajobs"],
},
});
const progress = getProgress();
expect(progress.crawlingTermsProcessed).toBe(3);
expect(progress.crawlingTermsTotal).toBe(4);
expect(progress.crawlingListPagesProcessed).toBe(2);
expect(progress.crawlingListPagesTotal).toBe(4);
expect(progress.crawlingJobPagesEnqueued).toBe(18);
expect(progress.crawlingJobPagesProcessed).toBe(18);
});
it("skips UK-only sources for non-UK country and runs compatible sources", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const crawler = await import("../../services/crawler");
const ukVisa = await import("../../services/ukvisajobs");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united states",
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [
{
source: "linkedin",
title: "Engineer",
employer: "ACME",
jobUrl: "https://example.com/job",
},
],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...config,
sources: ["linkedin", "gradcracker", "ukvisajobs"],
},
});
expect(result.discoveredJobs).toHaveLength(1);
expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledTimes(1);
expect(vi.mocked(crawler.runCrawler)).not.toHaveBeenCalled();
expect(vi.mocked(ukVisa.runUkVisaJobs)).not.toHaveBeenCalled();
});
it("throws when all requested sources are incompatible for country", async () => {
const settingsRepo = await import("../../repositories/settings");
const registryModule = await import("../../extractors/registry");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united states",
} as any);
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map(),
manifestBySource: new Map(),
availableSources: [],
} as any);
await expect(
discoverJobsStep({
mergedConfig: {
...config,
...baseConfig,
sources: ["gradcracker", "ukvisajobs"],
},
}),
@ -619,63 +158,75 @@ describe("discoverJobsStep", () => {
it("does not throw when no sources are requested", async () => {
const settingsRepo = await import("../../repositories/settings");
const adzuna = await import("../../services/adzuna");
const hiringCafe = await import("../../services/hiring-cafe");
const jobSpy = await import("../../services/jobspy");
const crawler = await import("../../services/crawler");
const ukVisa = await import("../../services/ukvisajobs");
const registryModule = await import("../../extractors/registry");
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
jobspyCountryIndeed: "united states",
} as any);
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map(),
manifestBySource: new Map(),
availableSources: [],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...config,
...baseConfig,
sources: [],
},
});
expect(result.discoveredJobs).toEqual([]);
expect(result.sourceErrors).toEqual([]);
expect(vi.mocked(jobSpy.runJobSpy)).not.toHaveBeenCalled();
expect(vi.mocked(adzuna.runAdzuna)).not.toHaveBeenCalled();
expect(vi.mocked(hiringCafe.runHiringCafe)).not.toHaveBeenCalled();
expect(vi.mocked(crawler.runCrawler)).not.toHaveBeenCalled();
expect(vi.mocked(ukVisa.runUkVisaJobs)).not.toHaveBeenCalled();
});
it("drops discovered jobs when employer matches blocked company keywords", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const registryModule = await import("../../extractors/registry");
const jobspyManifest = {
id: "jobspy",
displayName: "JobSpy",
providesSources: ["indeed", "linkedin", "glassdoor"],
run: vi.fn().mockResolvedValue({
success: true,
jobs: [
{
source: "linkedin",
title: "Engineer",
employer: "Acme Staffing",
jobUrl: "https://example.com/job-1",
},
{
source: "linkedin",
title: "Engineer II",
employer: "Contoso",
jobUrl: "https://example.com/job-2",
},
],
}),
};
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
blockedCompanyKeywords: JSON.stringify(["recruit", "staffing"]),
} as any);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [
{
source: "linkedin",
title: "Engineer",
employer: "Acme Staffing",
jobUrl: "https://example.com/job-1",
},
{
source: "linkedin",
title: "Engineer II",
employer: "Contoso",
jobUrl: "https://example.com/job-2",
},
],
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map([["jobspy", jobspyManifest as any]]),
manifestBySource: new Map([
["indeed", jobspyManifest as any],
["linkedin", jobspyManifest as any],
["glassdoor", jobspyManifest as any],
]),
availableSources: ["indeed", "linkedin", "glassdoor"],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...config,
...baseConfig,
sources: ["linkedin"],
},
});
@ -684,32 +235,139 @@ describe("discoverJobsStep", () => {
expect(result.discoveredJobs[0]?.employer).toBe("Contoso");
});
it("applies shared city filtering for sources without native city filtering", async () => {
const settingsRepo = await import("../../repositories/settings");
const registryModule = await import("../../extractors/registry");
const gradcrackerManifest = {
id: "gradcracker",
displayName: "Gradcracker",
providesSources: ["gradcracker"],
run: vi.fn().mockResolvedValue({
success: true,
jobs: [
{
source: "gradcracker",
title: "Engineer - Leeds",
employer: "ACME",
location: "Leeds, England, UK",
jobUrl: "https://example.com/grad-1",
},
{
source: "gradcracker",
title: "Engineer - London",
employer: "ACME",
location: "London, England, UK",
jobUrl: "https://example.com/grad-2",
},
],
}),
};
const ukvisaManifest = {
id: "ukvisajobs",
displayName: "UK Visa Jobs",
providesSources: ["ukvisajobs"],
run: vi.fn().mockResolvedValue({
success: true,
jobs: [
{
source: "ukvisajobs",
title: "Developer - Leeds",
employer: "Contoso",
location: "Leeds, England, UK",
jobUrl: "https://example.com/ukv-1",
},
],
}),
};
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
searchCities: "Leeds",
jobspyCountryIndeed: "united kingdom",
} as any);
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map([
["gradcracker", gradcrackerManifest as any],
["ukvisajobs", ukvisaManifest as any],
]),
manifestBySource: new Map([
["gradcracker", gradcrackerManifest as any],
["ukvisajobs", ukvisaManifest as any],
]),
availableSources: ["gradcracker", "ukvisajobs"],
} as any);
const result = await discoverJobsStep({
mergedConfig: {
...baseConfig,
sources: ["gradcracker", "ukvisajobs"],
},
});
expect(result.discoveredJobs).toHaveLength(2);
expect(
result.discoveredJobs.every((job) => job.location?.includes("Leeds")),
).toBe(true);
});
it("tracks source completion counters across source transitions", async () => {
const settingsRepo = await import("../../repositories/settings");
const jobSpy = await import("../../services/jobspy");
const crawler = await import("../../services/crawler");
const ukVisa = await import("../../services/ukvisajobs");
const jobsRepo = await import("../../repositories/jobs");
const registryModule = await import("../../extractors/registry");
const jobspyManifest = {
id: "jobspy",
displayName: "JobSpy",
providesSources: ["indeed", "linkedin", "glassdoor"],
run: vi.fn().mockResolvedValue({ success: true, jobs: [] }),
};
const gradcrackerManifest = {
id: "gradcracker",
displayName: "Gradcracker",
providesSources: ["gradcracker"],
run: vi.fn().mockResolvedValue({ success: true, jobs: [] }),
};
const ukvisaManifest = {
id: "ukvisajobs",
displayName: "UK Visa Jobs",
providesSources: ["ukvisajobs"],
run: vi.fn().mockResolvedValue({ success: true, jobs: [] }),
};
vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({
searchTerms: JSON.stringify(["engineer"]),
} as any);
vi.mocked(jobsRepo.getAllJobUrls).mockResolvedValue([
"https://example.com/existing",
]);
vi.mocked(jobSpy.runJobSpy).mockResolvedValue({
success: true,
jobs: [],
} as any);
vi.mocked(crawler.runCrawler).mockResolvedValue({
success: true,
jobs: [],
} as any);
vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({
success: true,
jobs: [],
vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({
manifests: new Map([
["jobspy", jobspyManifest as any],
["gradcracker", gradcrackerManifest as any],
["ukvisajobs", ukvisaManifest as any],
]),
manifestBySource: new Map([
["indeed", jobspyManifest as any],
["linkedin", jobspyManifest as any],
["glassdoor", jobspyManifest as any],
["gradcracker", gradcrackerManifest as any],
["ukvisajobs", ukvisaManifest as any],
]),
availableSources: [
"indeed",
"linkedin",
"glassdoor",
"gradcracker",
"ukvisajobs",
],
} as any);
await discoverJobsStep({
mergedConfig: {
...config,
...baseConfig,
sources: ["linkedin", "gradcracker", "ukvisajobs"],
},
});
@ -717,5 +375,17 @@ describe("discoverJobsStep", () => {
const progress = getProgress();
expect(progress.crawlingSourcesTotal).toBe(3);
expect(progress.crawlingSourcesCompleted).toBe(3);
expect(gradcrackerManifest.run).toHaveBeenCalledWith(
expect.objectContaining({
getExistingJobUrls: expect.any(Function),
}),
);
const [{ getExistingJobUrls }] = gradcrackerManifest.run.mock.calls[0] as [
{ getExistingJobUrls: () => Promise<string[]> },
];
await expect(getExistingJobUrls()).resolves.toEqual([
"https://example.com/existing",
]);
});
});

View File

@ -1,20 +1,20 @@
import { logger } from "@infra/logger";
import { sanitizeUnknown } from "@infra/sanitize";
import {
formatCountryLabel,
getAdzunaCountryCode,
isSourceAllowedForCountry,
normalizeCountryKey,
} from "@shared/location-support.js";
import { normalizeStringArray } from "@shared/normalize-string-array.js";
import { parseSearchCitiesSetting } from "@shared/search-cities.js";
import {
matchesRequestedCity,
resolveSearchCities,
shouldApplyStrictCityFilter,
} from "@shared/search-cities.js";
import type { CreateJobInput, PipelineConfig } from "@shared/types";
import * as jobsRepo from "../../repositories/jobs";
import { getExtractorRegistry } from "../../extractors/registry";
import { getAllJobUrls } from "../../repositories/jobs";
import * as settingsRepo from "../../repositories/settings";
import { runAdzuna } from "../../services/adzuna";
import { runCrawler } from "../../services/crawler";
import { runHiringCafe } from "../../services/hiring-cafe";
import { runJobSpy } from "../../services/jobspy";
import { runUkVisaJobs } from "../../services/ukvisajobs";
import { asyncPool } from "../../utils/async-pool";
import { type CrawlSource, progressHelpers, updateProgress } from "../progress";
@ -57,6 +57,26 @@ function isBlockedEmployer(
);
}
function filterJobsByRequestedCities(args: {
jobs: CreateJobInput[];
selectedCountry: string;
requestedCities: string[];
}): CreateJobInput[] {
const { jobs, selectedCountry, requestedCities } = args;
if (requestedCities.length === 0) return jobs;
return jobs.filter((job) =>
requestedCities.some((requestedCity) => {
const strict = shouldApplyStrictCityFilter(
requestedCity,
selectedCountry,
);
if (!strict) return true;
return matchesRequestedCity(job.location, requestedCity);
}),
);
}
export async function discoverJobsStep(args: {
mergedConfig: PipelineConfig;
shouldCancel?: () => boolean;
@ -70,6 +90,7 @@ export async function discoverJobsStep(args: {
const sourceErrors: string[] = [];
const settings = await settingsRepo.getAllSettings();
const registry = await getExtractorRegistry();
const searchTermsSetting = settings.searchTerms;
let searchTerms: string[] = [];
@ -94,6 +115,13 @@ export async function discoverJobsStep(args: {
const compatibleSources = args.mergedConfig.sources.filter((source) =>
isSourceAllowedForCountry(source, selectedCountry),
);
let existingJobUrlsPromise: Promise<string[]> | null = null;
const getExistingJobUrls = (): Promise<string[]> => {
if (!existingJobUrlsPromise) {
existingJobUrlsPromise = getAllJobUrls();
}
return existingJobUrlsPromise;
};
const skippedSources = args.mergedConfig.sources.filter(
(source) => !compatibleSources.includes(source),
);
@ -114,390 +142,95 @@ export async function discoverJobsStep(args: {
);
}
const jobSpySites = compatibleSources.filter(
(source): source is "indeed" | "linkedin" | "glassdoor" =>
source === "indeed" || source === "linkedin" || source === "glassdoor",
);
const groupedByManifest = new Map<
string,
{ sources: string[]; detail: string; termsTotal?: number }
>();
for (const source of compatibleSources) {
const manifest = registry.manifestBySource.get(source);
if (!manifest) {
sourceErrors.push(`${source}: extractor manifest not registered`);
continue;
}
const existing = groupedByManifest.get(manifest.id);
if (existing) {
existing.sources.push(source);
continue;
}
groupedByManifest.set(manifest.id, {
sources: [source],
termsTotal: searchTerms.length,
detail: `${manifest.displayName}: fetching jobs...`,
});
}
const sourceTasks: DiscoverySourceTask[] = [];
if (jobSpySites.length > 0) {
for (const [manifestId, grouped] of groupedByManifest) {
const manifest = registry.manifests.get(manifestId);
if (!manifest) continue;
sourceTasks.push({
source: "jobspy",
termsTotal: searchTerms.length,
detail: `JobSpy: scraping ${jobSpySites.join(", ")}...`,
source: manifest.id,
termsTotal: grouped.termsTotal,
detail:
grouped.sources.length > 1
? `${manifest.displayName}: ${grouped.sources.join(", ")}...`
: grouped.detail,
run: async () => {
const jobSpyResult = await runJobSpy({
sites: jobSpySites,
searchTerms,
location:
settings.searchCities ?? settings.jobspyLocation ?? undefined,
resultsWanted: settings.jobspyResultsWanted
? parseInt(settings.jobspyResultsWanted, 10)
: undefined,
countryIndeed: settings.jobspyCountryIndeed ?? undefined,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "jobspy",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`,
});
},
});
if (!jobSpyResult.success) {
return {
discoveredJobs: [],
sourceErrors: [`jobspy: ${jobSpyResult.error ?? "unknown error"}`],
};
}
return {
discoveredJobs: jobSpyResult.jobs,
sourceErrors: [],
};
},
});
}
if (compatibleSources.includes("adzuna")) {
sourceTasks.push({
source: "adzuna",
termsTotal: searchTerms.length,
detail: "Adzuna: fetching jobs...",
run: async () => {
const adzunaCountryCode = getAdzunaCountryCode(selectedCountry);
if (!adzunaCountryCode) {
return {
discoveredJobs: [],
sourceErrors: [
`adzuna: unsupported country ${formatCountryLabel(selectedCountry)}`,
],
};
}
const adzunaMaxJobsPerTerm = settings.adzunaMaxJobsPerTerm
? parseInt(settings.adzunaMaxJobsPerTerm, 10)
: 50;
const adzunaResult = await runAdzuna({
country: adzunaCountryCode,
countryKey: selectedCountry,
locations: parseSearchCitiesSetting(
settings.searchCities ?? settings.jobspyLocation,
const filteredSettings = Object.fromEntries(
Object.entries(settings).filter(
([, value]) =>
typeof value === "string" || typeof value === "undefined",
),
) as Record<string, string | undefined>;
const result = await manifest.run({
source: grouped.sources[0],
selectedSources: grouped.sources,
settings: filteredSettings,
searchTerms,
maxJobsPerTerm: adzunaMaxJobsPerTerm,
selectedCountry,
getExistingJobUrls,
shouldCancel: args.shouldCancel,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}`,
});
updateProgress({
step: "crawling",
detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo} (${event.totalCollected} collected)`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "adzuna",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
source: manifest.id,
termsProcessed: event.termsProcessed,
termsTotal: event.termsTotal,
listPagesProcessed: event.listPagesProcessed,
listPagesTotal: event.listPagesTotal,
jobCardsFound: event.jobCardsFound,
jobPagesEnqueued: event.jobPagesEnqueued,
jobPagesSkipped: event.jobPagesSkipped,
jobPagesProcessed: event.jobPagesProcessed,
phase: event.phase,
currentUrl: event.currentUrl,
});
if (event.detail) {
updateProgress({
step: "crawling",
detail: event.detail,
});
}
},
});
if (!adzunaResult.success) {
return {
discoveredJobs: [],
sourceErrors: [`adzuna: ${adzunaResult.error ?? "unknown error"}`],
};
}
return {
discoveredJobs: adzunaResult.jobs,
sourceErrors: [],
};
},
});
}
if (compatibleSources.includes("hiringcafe")) {
sourceTasks.push({
source: "hiringcafe",
termsTotal: searchTerms.length,
detail: "Hiring Cafe: fetching jobs...",
run: async () => {
const hiringCafeMaxJobsPerTerm = settings.jobspyResultsWanted
? parseInt(settings.jobspyResultsWanted, 10)
: 200;
const hiringCafeResult = await runHiringCafe({
country: selectedCountry,
countryKey: selectedCountry,
locations: parseSearchCitiesSetting(
settings.searchCities ?? settings.jobspyLocation,
),
searchTerms,
maxJobsPerTerm: hiringCafeMaxJobsPerTerm,
onProgress: (event) => {
if (event.type === "term_start") {
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
return;
}
if (event.type === "page_fetched") {
const displayPageNo = event.pageNo + 1;
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: displayPageNo,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${displayPageNo}`,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${displayPageNo} (${event.totalCollected} collected)`,
});
return;
}
progressHelpers.crawlingUpdate({
source: "hiringcafe",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm,
});
updateProgress({
step: "crawling",
detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`,
});
},
});
if (!hiringCafeResult.success) {
if (!result.success) {
return {
discoveredJobs: [],
sourceErrors: [
`hiringcafe: ${hiringCafeResult.error ?? "unknown error"}`,
`${manifest.displayName || manifest.id}: ${result.error ?? "unknown error"} (sources: ${grouped.sources.join(",")})`,
],
};
}
return {
discoveredJobs: hiringCafeResult.jobs,
sourceErrors: [],
};
},
});
}
if (compatibleSources.includes("gradcracker")) {
sourceTasks.push({
source: "gradcracker",
detail: "Gradcracker: scraping...",
run: async () => {
const existingJobUrls = await jobsRepo.getAllJobUrls();
const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm
? parseInt(settings.gradcrackerMaxJobsPerTerm, 10)
: 50;
const crawlerResult = await runCrawler({
existingJobUrls,
searchTerms,
maxJobsPerTerm: gradcrackerMaxJobs,
onProgress: (progress) => {
progressHelpers.crawlingUpdate({
source: "gradcracker",
listPagesProcessed: progress.listPagesProcessed,
listPagesTotal: progress.listPagesTotal,
jobCardsFound: progress.jobCardsFound,
jobPagesEnqueued: progress.jobPagesEnqueued,
jobPagesSkipped: progress.jobPagesSkipped,
jobPagesProcessed: progress.jobPagesProcessed,
phase: progress.phase,
currentUrl: progress.currentUrl,
});
},
});
if (!crawlerResult.success) {
return {
discoveredJobs: [],
sourceErrors: [
`gradcracker: ${crawlerResult.error ?? "unknown error"}`,
],
};
}
return {
discoveredJobs: crawlerResult.jobs,
sourceErrors: [],
};
},
});
}
if (compatibleSources.includes("ukvisajobs")) {
sourceTasks.push({
source: "ukvisajobs",
termsTotal: searchTerms.length,
detail: "UKVisaJobs: scraping visa-sponsoring jobs...",
run: async () => {
const ukvisajobsMaxJobs = settings.ukvisajobsMaxJobs
? parseInt(settings.ukvisajobsMaxJobs, 10)
: 50;
const ukVisaResult = await runUkVisaJobs({
maxJobs: ukvisajobsMaxJobs,
searchTerms,
onProgress: (event) => {
if (event.type === "init") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: 0,
listPagesTotal: event.maxPages,
jobPagesEnqueued: 0,
jobPagesProcessed: 0,
jobPagesSkipped: 0,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
if (event.type === "page_fetched") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: Math.max(event.termIndex - 1, 0),
termsTotal: event.termTotal,
listPagesProcessed: event.pageNo,
listPagesTotal: event.maxPages,
jobPagesEnqueued: event.totalCollected,
jobPagesProcessed: event.totalCollected,
phase: "list",
currentUrl: `page ${event.pageNo}/${event.maxPages}`,
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`,
});
return;
}
if (event.type === "term_complete") {
progressHelpers.crawlingUpdate({
source: "ukvisajobs",
termsProcessed: event.termIndex,
termsTotal: event.termTotal,
phase: "list",
currentUrl: event.searchTerm || "all jobs",
});
updateProgress({
step: "crawling",
detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`,
});
return;
}
if (event.type === "empty_page") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`,
});
return;
}
if (event.type === "error") {
updateProgress({
step: "crawling",
detail: `UKVisaJobs: ${event.message}`,
});
}
},
});
if (!ukVisaResult.success) {
return {
discoveredJobs: [],
sourceErrors: [
`ukvisajobs: ${ukVisaResult.error ?? "unknown error"}`,
],
};
}
return {
discoveredJobs: ukVisaResult.jobs,
discoveredJobs: result.jobs,
sourceErrors: [],
};
},
@ -536,6 +269,11 @@ export async function discoverJobsStep(args: {
try {
return await sourceTask.run();
} catch (error) {
logger.warn("Discovery source task failed", {
sourceTask: sourceTask.source,
error: sanitizeUnknown(error),
});
return {
discoveredJobs: [],
sourceErrors: [
@ -551,16 +289,35 @@ export async function discoverJobsStep(args: {
sourceErrors.push(...sourceResult.sourceErrors);
}
const requestedCities = resolveSearchCities({
single: settings.searchCities ?? settings.jobspyLocation,
});
const cityFilteredJobs = filterJobsByRequestedCities({
jobs: discoveredJobs,
selectedCountry,
requestedCities,
});
const cityFilteredOutCount = discoveredJobs.length - cityFilteredJobs.length;
if (cityFilteredOutCount > 0) {
logger.info("Dropped discovered jobs that did not match requested cities", {
step: "discover-jobs",
droppedCount: cityFilteredOutCount,
requestedCities,
selectedCountry,
});
}
const blockedCompanyKeywords = parseBlockedCompanyKeywords(
settings.blockedCompanyKeywords,
);
const blockedKeywordsLowerCase = blockedCompanyKeywords.map((value) =>
value.toLowerCase(),
);
const filteredDiscoveredJobs = discoveredJobs.filter(
const filteredDiscoveredJobs = cityFilteredJobs.filter(
(job) => !isBlockedEmployer(job.employer, blockedKeywordsLowerCase),
);
const droppedCount = discoveredJobs.length - filteredDiscoveredJobs.length;
const droppedCount = cityFilteredJobs.length - filteredDiscoveredJobs.length;
if (droppedCount > 0) {
const blockedCompanyKeywordsPreview = blockedCompanyKeywords.slice(0, 10);

View File

@ -1,26 +0,0 @@
import { describe, expect, it } from "vitest";
import {
matchesRequestedLocation,
shouldApplyStrictLocationFilter,
} from "./adzuna";
describe("adzuna strict location filtering", () => {
it("enables strict filtering when city differs from country", () => {
expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe(
true,
);
});
it("disables strict filtering when location is country-level", () => {
expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false);
expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false);
});
it("matches requested location by case-insensitive contains", () => {
expect(matchesRequestedLocation("Leeds, England, UK", "leeds")).toBe(true);
expect(matchesRequestedLocation("Halifax, England, UK", "leeds")).toBe(
false,
);
expect(matchesRequestedLocation(undefined, "leeds")).toBe(false);
});
});

View File

@ -1,26 +0,0 @@
import { describe, expect, it } from "vitest";
import {
matchesRequestedLocation,
shouldApplyStrictLocationFilter,
} from "./hiring-cafe";
describe("hiringcafe strict location filtering", () => {
it("enables strict filtering when city differs from country", () => {
expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe(
true,
);
});
it("disables strict filtering when location is country-level", () => {
expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false);
expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false);
});
it("matches requested location by case-insensitive contains", () => {
expect(matchesRequestedLocation("Leeds, England, UK", "leeds")).toBe(true);
expect(matchesRequestedLocation("Halifax, England, UK", "leeds")).toBe(
false,
);
expect(matchesRequestedLocation(undefined, "leeds")).toBe(false);
});
});

View File

@ -32,6 +32,7 @@ export default defineConfig({
"src/**/*.test.ts",
"src/**/*.test.tsx",
"../shared/src/**/*.test.ts",
"../extractors/**/tests/**/*.test.ts",
],
exclude: ["node_modules/**", "dist/**"],
},

View File

@ -8,6 +8,7 @@
"shared"
],
"scripts": {
"test:all": "npm --workspace orchestrator run test:run",
"check:types:shared": "npm --workspace shared run check:types",
"check:types": "npm --workspace shared run check:types && npm --workspace orchestrator run check:types",
"check:types:ukvisajobs": "npm --workspace ukvisajobs-extractor run check:types",

View File

@ -0,0 +1,28 @@
import { describe, expect, it } from "vitest";
import {
EXTRACTOR_SOURCE_IDS,
EXTRACTOR_SOURCE_METADATA,
extractorSourceEnum,
isExtractorSourceId,
} from "./index";
describe("extractor source catalog", () => {
it("validates known source ids", () => {
for (const source of EXTRACTOR_SOURCE_IDS) {
expect(extractorSourceEnum.parse(source)).toBe(source);
expect(isExtractorSourceId(source)).toBe(true);
}
});
it("rejects unknown source ids", () => {
expect(isExtractorSourceId("unknown-source")).toBe(false);
expect(() => extractorSourceEnum.parse("unknown-source")).toThrow();
});
it("provides metadata for every known source", () => {
for (const source of EXTRACTOR_SOURCE_IDS) {
expect(EXTRACTOR_SOURCE_METADATA[source]).toBeDefined();
expect(EXTRACTOR_SOURCE_METADATA[source].label.length).toBeGreaterThan(0);
}
});
});

View File

@ -0,0 +1,81 @@
import { z } from "zod";
export const EXTRACTOR_SOURCE_IDS = [
"gradcracker",
"indeed",
"linkedin",
"glassdoor",
"ukvisajobs",
"adzuna",
"hiringcafe",
"manual",
] as const;
export type ExtractorSourceId = (typeof EXTRACTOR_SOURCE_IDS)[number];
export interface ExtractorSourceMetadata {
label: string;
order: number;
category: "pipeline" | "manual";
requiresCredentials?: boolean;
ukOnly?: boolean;
}
export const EXTRACTOR_SOURCE_METADATA: Record<
ExtractorSourceId,
ExtractorSourceMetadata
> = {
gradcracker: {
label: "Gradcracker",
order: 10,
category: "pipeline",
ukOnly: true,
},
indeed: { label: "Indeed", order: 20, category: "pipeline" },
linkedin: { label: "LinkedIn", order: 30, category: "pipeline" },
glassdoor: { label: "Glassdoor", order: 40, category: "pipeline" },
ukvisajobs: {
label: "UK Visa Jobs",
order: 50,
category: "pipeline",
requiresCredentials: true,
ukOnly: true,
},
adzuna: {
label: "Adzuna",
order: 60,
category: "pipeline",
requiresCredentials: true,
},
hiringcafe: { label: "Hiring Cafe", order: 70, category: "pipeline" },
manual: { label: "Manual", order: 90, category: "manual" },
};
export const PIPELINE_EXTRACTOR_SOURCE_IDS = EXTRACTOR_SOURCE_IDS.filter(
(source) => EXTRACTOR_SOURCE_METADATA[source].category === "pipeline",
);
const extractorSourceTuple = EXTRACTOR_SOURCE_IDS as unknown as [
ExtractorSourceId,
...ExtractorSourceId[],
];
export const extractorSourceEnum = z.enum(extractorSourceTuple);
export function isExtractorSourceId(value: string): value is ExtractorSourceId {
return EXTRACTOR_SOURCE_IDS.includes(value as ExtractorSourceId);
}
export function sourceLabel(source: ExtractorSourceId): string {
return EXTRACTOR_SOURCE_METADATA[source].label;
}
export function sortSources<T extends { source: ExtractorSourceId }>(
values: T[],
): T[] {
return [...values].sort(
(left, right) =>
EXTRACTOR_SOURCE_METADATA[left.source].order -
EXTRACTOR_SOURCE_METADATA[right.source].order,
);
}

View File

@ -1,3 +1,4 @@
export * from "./extractors";
export * from "./location-support";
export * from "./types";
export * from "./utils/type-conversion";

View File

@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest";
import {
matchesRequestedCity,
parseSearchCitiesSetting,
resolveSearchCities,
serializeSearchCitiesSetting,
shouldApplyStrictCityFilter,
} from "./search-cities";
@ -26,6 +27,43 @@ describe("search-cities", () => {
expect(serializeSearchCitiesSetting([])).toBeNull();
});
it("resolves search cities from list/single/env/fallback", () => {
expect(
resolveSearchCities({
list: [" Leeds ", "London", "leeds"],
}),
).toEqual(["Leeds", "London"]);
expect(resolveSearchCities({ single: "Leeds|London" })).toEqual([
"Leeds",
"London",
]);
expect(resolveSearchCities({ env: "Leeds\nLondon" })).toEqual([
"Leeds",
"London",
]);
expect(resolveSearchCities({ fallback: "UK" })).toEqual(["UK"]);
});
it("falls back when single/env values parse to empty", () => {
expect(resolveSearchCities({ single: "", fallback: "UK" })).toEqual(["UK"]);
expect(resolveSearchCities({ single: "||", fallback: "UK" })).toEqual([
"UK",
]);
expect(resolveSearchCities({ env: " ", fallback: "UK" })).toEqual(["UK"]);
});
it("returns empty array when all resolve options are empty", () => {
expect(
resolveSearchCities({
list: [],
single: "",
env: "",
fallback: "",
}),
).toEqual([]);
});
it("applies strict filter only when city differs from country", () => {
expect(shouldApplyStrictCityFilter("Leeds", "united kingdom")).toBe(true);
expect(shouldApplyStrictCityFilter("UK", "united kingdom")).toBe(false);

View File

@ -37,6 +37,36 @@ export function parseSearchCitiesSetting(
return out;
}
interface ResolveSearchCitiesOptions {
list?: string[] | null;
single?: string | null;
env?: string | null;
fallback?: string | null;
}
export function resolveSearchCities(
options: ResolveSearchCitiesOptions,
): string[] {
// Priority order:
// 1) explicit list (searchCities array in config)
// 2) explicit single value
// 3) environment fallback
// 4) final hardcoded/default fallback
if (options.list && options.list.length > 0) {
const parsedList = parseSearchCitiesSetting(options.list.join("|"));
if (parsedList.length > 0) return parsedList;
}
const fallbackCandidates = [options.single, options.env, options.fallback];
for (const candidate of fallbackCandidates) {
if (candidate === null || candidate === undefined) continue;
const parsed = parseSearchCitiesSetting(candidate);
if (parsed.length > 0) return parsed;
}
return [];
}
export function serializeSearchCitiesSetting(cities: string[]): string | null {
if (cities.length === 0) return null;
return cities.join("|");

View File

@ -7,6 +7,7 @@
export * from "./types/api";
export * from "./types/chat";
export * from "./types/extractors";
export * from "./types/jobs";
export * from "./types/pipeline";
export * from "./types/post-application";

View File

@ -0,0 +1,40 @@
import type { CreateJobInput } from "./jobs";
export interface ExtractorProgressEvent {
phase?: "list" | "job";
currentUrl?: string;
termsProcessed?: number;
termsTotal?: number;
listPagesProcessed?: number;
listPagesTotal?: number;
jobCardsFound?: number;
jobPagesEnqueued?: number;
jobPagesSkipped?: number;
jobPagesProcessed?: number;
detail?: string;
}
export interface ExtractorRuntimeContext {
source: string;
selectedSources: string[];
settings: Record<string, string | undefined>;
searchTerms: string[];
selectedCountry: string;
getExistingJobUrls?: () => Promise<string[]>;
shouldCancel?: () => boolean;
onProgress?: (event: ExtractorProgressEvent) => void;
}
export interface ExtractorRunResult {
success: boolean;
jobs: CreateJobInput[];
error?: string;
}
export interface ExtractorManifest {
id: string;
displayName: string;
providesSources: readonly string[];
requiredEnvVars?: readonly string[];
run: (context: ExtractorRuntimeContext) => Promise<ExtractorRunResult>;
}

View File

@ -1,3 +1,5 @@
import type { ExtractorSourceId } from "../extractors";
export type JobStatus =
| "discovered" // Crawled but not processed
| "processing" // Currently generating resume
@ -115,15 +117,7 @@ export interface Interview {
outcome: InterviewOutcome | null;
}
export type JobSource =
| "gradcracker"
| "indeed"
| "linkedin"
| "glassdoor"
| "ukvisajobs"
| "adzuna"
| "hiringcafe"
| "manual";
export type JobSource = ExtractorSourceId;
export interface Job {
id: string;

View File

@ -1,9 +1,10 @@
import type { Job, JobSource, JobStatus } from "./jobs";
import type { ExtractorSourceId } from "../extractors";
import type { Job, JobStatus } from "./jobs";
export interface PipelineConfig {
topN: number; // Number of top jobs to process
minSuitabilityScore: number; // Minimum score to auto-process
sources: JobSource[]; // Job sources to crawl
sources: ExtractorSourceId[]; // Job sources to crawl
outputDir: string; // Directory for generated PDFs
enableCrawling?: boolean;
enableScoring?: boolean;