diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b6866ed..2afa9bf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,8 +23,8 @@ jobs: - name: Run Biome run: biome ci . - test-orchestrator: - name: Orchestrator Tests + tests: + name: Tests runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -40,8 +40,8 @@ jobs: - name: Build better-sqlite3 run: npm --workspace orchestrator rebuild better-sqlite3 working-directory: . - - name: Run Vitest - run: npm --workspace orchestrator run test:run + - name: Run Tests + run: npm run test:all working-directory: . typecheck: diff --git a/docs-site/docs/extractors/overview.md b/docs-site/docs/extractors/overview.md index 553bbfa..74eb009 100644 --- a/docs-site/docs/extractors/overview.md +++ b/docs-site/docs/extractors/overview.md @@ -7,6 +7,8 @@ sidebar_position: 1 This page helps you choose the right extractor for your run, understand key constraints, and navigate to detailed technical guides. +Extractor integrations are now registered through manifests and loaded automatically at orchestrator startup. Runtime discovery only scans `extractors/*/(manifest.ts|src/manifest.ts)` and does not read manifests from `orchestrator/**`. Extractor-specific run logic should also remain in `extractors/<name>/` so orchestrator stays source-agnostic. To add a new source, follow [Add an Extractor](/docs/next/workflows/add-an-extractor). 
+ ## Extractor chooser | Extractor | Best use case | Core constraints/dependencies | Notable controls | Output/behavior notes | @@ -37,3 +39,4 @@ Many runs combine sources: broad discovery first, then manual import for high-pr - [Hiring Cafe](/docs/next/extractors/hiring-cafe) - [UKVisaJobs](/docs/next/extractors/ukvisajobs) - [Manual Import](/docs/next/extractors/manual) +- [Add an Extractor](/docs/next/workflows/add-an-extractor) diff --git a/docs-site/docs/workflows/add-an-extractor.md b/docs-site/docs/workflows/add-an-extractor.md new file mode 100644 index 0000000..e03e4e0 --- /dev/null +++ b/docs-site/docs/workflows/add-an-extractor.md @@ -0,0 +1,95 @@ +--- +id: add-an-extractor +title: Add an Extractor +description: How to add a new extractor using the manifest contract and shared extractor catalog. +sidebar_position: 2 +--- + +## What it is + +This guide explains how to add a new extractor that is auto-registered at orchestrator startup. + +The extractor runtime is discovered from a local `manifest.ts` file, and the source is type-safe across API/client through the shared catalog in `shared/src/extractors/index.ts`. + +Extractor manifests must live in extractor packages under `extractors/<name>/` only. Do not add manifest files inside `orchestrator/`. +Extractor run logic should also live in the extractor package so orchestrator stays extractor-agnostic. + +## Why it exists + +Without a manifest contract, adding extractors required touching multiple orchestrator files. + +With the manifest system, contributors only need to: + +1. Add a manifest in their extractor package. +2. Add the new source id to the shared typed catalog. + +That keeps runtime wiring dynamic while preserving compile-time safety in API and client code. + +## How to use it + +1. Create your extractor package under `extractors/<name>/`. +2. Add a `manifest.ts` in the extractor package root (or `src/manifest.ts`). + - Valid locations are only `extractors/<name>/manifest.ts` or `extractors/<name>/src/manifest.ts`. 
+ - `orchestrator/**/manifest.ts` is not used for extractor discovery. +3. Export a manifest with: + - `id` + - `displayName` + - `providesSources` + - `requiredEnvVars` (optional) + - `run(context)` that returns `{ success, jobs, error? }` +4. Add the new source id to `shared/src/extractors/index.ts`: + - append to `EXTRACTOR_SOURCE_IDS` + - add an entry in `EXTRACTOR_SOURCE_METADATA` +5. Ensure your extractor maps output to `CreateJobInput[]`. +6. Run the full CI checks. + +Example manifest: + +```ts +import type { ExtractorManifest } from "@shared/types/extractors"; + +export const manifest: ExtractorManifest = { + id: "myextractor", + displayName: "My Extractor", + providesSources: ["myextractor"], + requiredEnvVars: ["MYEXTRACTOR_API_KEY"], + async run(context) { + // context.searchTerms, context.settings, context.onProgress, context.shouldCancel + const jobs = []; + return { success: true, jobs }; + }, +}; + +export default manifest; +``` + +Subprocess extractors are supported. Keep subprocess spawning inside `run(context)` so orchestrator only depends on the manifest contract. + +## Common problems + +### Extractor not discovered at startup + +- Check file path: `extractors/<name>/manifest.ts` or `extractors/<name>/src/manifest.ts`. +- Ensure the file exports `default` or named `manifest`. + +### Source compiles in extractor but fails in API/client + +- Add the new source id to `shared/src/extractors/index.ts`. +- Confirm metadata exists for that source id. + +### Source appears in shared catalog but is unavailable at runtime + +- The manifest was not loaded successfully. +- Check startup logs for registry warnings. + +### Source requires credentials but never returns jobs + +- Add and validate `requiredEnvVars`. +- Verify your manifest `run(context)` reads settings/env values correctly. 
+ +## Related pages + +- [Extractors Overview](/docs/next/extractors/overview) +- [Adzuna Extractor](/docs/next/extractors/adzuna) +- [Hiring Cafe Extractor](/docs/next/extractors/hiring-cafe) +- [UKVisaJobs Extractor](/docs/next/extractors/ukvisajobs) diff --git a/docs-site/sidebars.ts b/docs-site/sidebars.ts index 0fd5780..cefa234 100644 --- a/docs-site/sidebars.ts +++ b/docs-site/sidebars.ts @@ -17,6 +17,7 @@ const sidebars: SidebarsConfig = { items: [ "workflows/find-jobs-and-apply-workflow", "workflows/post-application-workflow", + "workflows/add-an-extractor", ], }, { diff --git a/extractors/adzuna/manifest.ts b/extractors/adzuna/manifest.ts new file mode 100644 index 0000000..8ea1592 --- /dev/null +++ b/extractors/adzuna/manifest.ts @@ -0,0 +1,120 @@ +import { getAdzunaCountryCode } from "@shared/location-support.js"; +import { resolveSearchCities } from "@shared/search-cities.js"; +import type { + ExtractorManifest, + ExtractorProgressEvent, +} from "@shared/types/extractors"; +import { runAdzuna } from "./src/run"; + +function toProgress(event: { + type: string; + termIndex: number; + termTotal: number; + searchTerm: string; + pageNo?: number; + totalCollected?: number; +}): ExtractorProgressEvent { + if (event.type === "term_start") { + return { + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + currentUrl: event.searchTerm, + detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }; + } + + if (event.type === "page_fetched") { + const pageNo = event.pageNo ?? 0; + const totalCollected = event.totalCollected ?? 
0; + return { + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: pageNo, + jobPagesEnqueued: totalCollected, + jobPagesProcessed: totalCollected, + currentUrl: `page ${pageNo}`, + detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${pageNo} (${totalCollected} collected)`, + }; + } + + return { + phase: "list", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + currentUrl: event.searchTerm, + detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }; +} + +export const manifest: ExtractorManifest = { + id: "adzuna", + displayName: "Adzuna", + providesSources: ["adzuna"], + requiredEnvVars: ["ADZUNA_APP_ID", "ADZUNA_APP_KEY"], + async run(context) { + if (context.shouldCancel?.()) { + return { success: true, jobs: [] }; + } + + const countryCode = getAdzunaCountryCode(context.selectedCountry); + if (!countryCode) { + return { + success: false, + jobs: [], + error: `unsupported country ${context.selectedCountry}`, + }; + } + + const maxJobsPerTerm = context.settings.adzunaMaxJobsPerTerm + ? parseInt(context.settings.adzunaMaxJobsPerTerm, 10) + : 50; + + let result: Awaited>; + try { + result = await runAdzuna({ + country: countryCode, + countryKey: context.selectedCountry, + searchTerms: context.searchTerms, + locations: resolveSearchCities({ + single: + context.settings.searchCities ?? context.settings.jobspyLocation, + }), + maxJobsPerTerm, + onProgress: (event) => { + if (context.shouldCancel?.()) return; + + context.onProgress?.(toProgress(event)); + }, + }); + } catch (error) { + const message = + error instanceof Error + ? error.message + : typeof error === "string" + ? 
error + : "Unexpected error while running Adzuna extractor."; + return { + success: false, + jobs: [], + error: message, + }; + } + + if (!result.success) { + return { + success: false, + jobs: [], + error: result.error, + }; + } + + return { + success: true, + jobs: result.jobs, + }; + }, +}; + +export default manifest; diff --git a/orchestrator/src/server/services/adzuna.ts b/extractors/adzuna/src/run.ts similarity index 87% rename from orchestrator/src/server/services/adzuna.ts rename to extractors/adzuna/src/run.ts index d47abba..599ec29 100644 --- a/orchestrator/src/server/services/adzuna.ts +++ b/extractors/adzuna/src/run.ts @@ -4,19 +4,20 @@ import { createRequire } from "node:module"; import { dirname, join } from "node:path"; import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; -import { logger } from "@infra/logger"; import { normalizeCountryKey } from "@shared/location-support.js"; import { - matchesRequestedCity, - parseSearchCitiesSetting, + resolveSearchCities, shouldApplyStrictCityFilter, } from "@shared/search-cities.js"; -import type { CreateJobInput } from "@shared/types"; -import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion"; +import type { CreateJobInput } from "@shared/types/jobs"; +import { + toNumberOrNull, + toStringOrNull, +} from "@shared/utils/type-conversion.js"; -const __dirname = dirname(fileURLToPath(import.meta.url)); -const ADZUNA_DIR = join(__dirname, "../../../../extractors/adzuna"); -const DATASET_PATH = join(ADZUNA_DIR, "storage/datasets/default/jobs.json"); +const srcDir = dirname(fileURLToPath(import.meta.url)); +const EXTRACTOR_DIR = join(srcDir, ".."); +const DATASET_PATH = join(EXTRACTOR_DIR, "storage/datasets/default/jobs.json"); const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; const require = createRequire(import.meta.url); const TSX_CLI_PATH = resolveTsxCliPath(); @@ -69,20 +70,6 @@ export function shouldApplyStrictLocationFilter( return 
shouldApplyStrictCityFilter(location, countryKey); } -export function matchesRequestedLocation( - jobLocation: string | undefined, - requestedLocation: string, -): boolean { - return matchesRequestedCity(jobLocation, requestedLocation); -} - -function resolveLocations(options: RunAdzunaOptions): string[] { - const raw = options.locations?.length - ? options.locations - : parseSearchCitiesSetting(process.env.ADZUNA_LOCATION_QUERY ?? ""); - return raw.map((value) => value.trim()).filter(Boolean); -} - function resolveTsxCliPath(): string | null { try { return require.resolve("tsx/dist/cli.mjs"); @@ -205,7 +192,10 @@ export async function runAdzuna( options.searchTerms && options.searchTerms.length > 0 ? options.searchTerms : ["web developer"]; - const locations = resolveLocations(options); + const locations = resolveSearchCities({ + list: options.locations, + env: process.env.ADZUNA_LOCATION_QUERY, + }); const runLocations = locations.length > 0 ? locations : [null]; const termTotal = searchTerms.length * runLocations.length; const useNpmCommand = canRunNpmCommand(); @@ -241,7 +231,7 @@ export async function runAdzuna( }; const child = useNpmCommand ? spawn("npm", ["run", "start"], { - cwd: ADZUNA_DIR, + cwd: EXTRACTOR_DIR, stdio: ["ignore", "pipe", "pipe"], env: extractorEnv, }) @@ -253,7 +243,7 @@ export async function runAdzuna( ); } return spawn(process.execPath, [tsxCliPath, "src/main.ts"], { - cwd: ADZUNA_DIR, + cwd: EXTRACTOR_DIR, stdio: ["ignore", "pipe", "pipe"], env: extractorEnv, }); @@ -293,11 +283,7 @@ export async function runAdzuna( }); const runJobs = await readDataset(); - const filtered = strictLocationFilter - ? runJobs.filter((job) => - matchesRequestedLocation(job.location, location), - ) - : runJobs; + const filtered = runJobs; for (const job of filtered) { const key = job.sourceJobId || job.jobUrl; @@ -310,7 +296,6 @@ export async function runAdzuna( return { success: true, jobs }; } catch (error) { const message = error instanceof Error ? 
error.message : "Unknown error"; - logger.warn("Adzuna extractor run failed", { error: message }); return { success: false, jobs: [], error: message }; } } diff --git a/extractors/adzuna/tests/location.test.ts b/extractors/adzuna/tests/location.test.ts new file mode 100644 index 0000000..af19f51 --- /dev/null +++ b/extractors/adzuna/tests/location.test.ts @@ -0,0 +1,15 @@ +import { describe, expect, it } from "vitest"; +import { shouldApplyStrictLocationFilter } from "../src/run"; + +describe("adzuna location query strictness", () => { + it("enables strict filtering when city differs from country", () => { + expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe( + true, + ); + }); + + it("disables strict filtering when location is country-level", () => { + expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false); + expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false); + }); +}); diff --git a/extractors/adzuna/tsconfig.json b/extractors/adzuna/tsconfig.json index 6ace792..808b15f 100644 --- a/extractors/adzuna/tsconfig.json +++ b/extractors/adzuna/tsconfig.json @@ -1,13 +1,17 @@ { "compilerOptions": { - "module": "NodeNext", - "moduleResolution": "NodeNext", + "module": "ESNext", + "moduleResolution": "bundler", "target": "ES2022", "outDir": "dist", "strict": true, "noUnusedLocals": false, "lib": ["ES2022", "DOM"], - "types": ["node"] + "types": ["node"], + "baseUrl": ".", + "paths": { + "@shared/*": ["../../shared/src/*"] + } }, "include": ["./src/**/*"] } diff --git a/extractors/gradcracker/manifest.ts b/extractors/gradcracker/manifest.ts new file mode 100644 index 0000000..1a8e42c --- /dev/null +++ b/extractors/gradcracker/manifest.ts @@ -0,0 +1,56 @@ +import type { + ExtractorManifest, + ExtractorRuntimeContext, +} from "@shared/types/extractors"; +import { runCrawler } from "./src/run"; + +export const manifest: ExtractorManifest = { + id: "gradcracker", + displayName: "Gradcracker", + providesSources: 
["gradcracker"], + async run(context: ExtractorRuntimeContext) { + if (context.shouldCancel?.()) { + return { success: true, jobs: [] }; + } + + const existingJobUrls = await context.getExistingJobUrls?.(); + const maxJobsPerTerm = context.settings.gradcrackerMaxJobsPerTerm + ? parseInt(context.settings.gradcrackerMaxJobsPerTerm, 10) + : 50; + + const result = await runCrawler({ + existingJobUrls, + searchTerms: context.searchTerms, + maxJobsPerTerm, + onProgress: (progress) => { + if (context.shouldCancel?.()) return; + + context.onProgress?.({ + phase: progress.phase, + currentUrl: progress.currentUrl, + listPagesProcessed: progress.listPagesProcessed, + listPagesTotal: progress.listPagesTotal, + jobCardsFound: progress.jobCardsFound, + jobPagesEnqueued: progress.jobPagesEnqueued, + jobPagesSkipped: progress.jobPagesSkipped, + jobPagesProcessed: progress.jobPagesProcessed, + }); + }, + }); + + if (!result.success) { + return { + success: false, + jobs: [], + error: result.error, + }; + } + + return { + success: true, + jobs: result.jobs, + }; + }, +}; + +export default manifest; diff --git a/orchestrator/src/server/services/crawler.ts b/extractors/gradcracker/src/run.ts similarity index 63% rename from orchestrator/src/server/services/crawler.ts rename to extractors/gradcracker/src/run.ts index 4af4512..5890bd7 100644 --- a/orchestrator/src/server/services/crawler.ts +++ b/extractors/gradcracker/src/run.ts @@ -1,19 +1,30 @@ -/** - * Service for running the Gradcracker crawler (extractors/gradcracker). - * Wraps the existing Crawlee-based crawler. 
- */ - import { spawn } from "node:child_process"; -import { mkdir, readdir, readFile, writeFile } from "node:fs/promises"; +import { mkdir, readdir, readFile, rm, writeFile } from "node:fs/promises"; import { dirname, join } from "node:path"; import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; -import type { CreateJobInput } from "@shared/types"; -const __dirname = dirname(fileURLToPath(import.meta.url)); -const CRAWLER_DIR = join(__dirname, "../../../../extractors/gradcracker"); -const STORAGE_DIR = join(CRAWLER_DIR, "storage/datasets/default"); -const JOBOPS_STORAGE_DIR = join(CRAWLER_DIR, "storage/jobops"); +type CreateJobInput = { + source: "gradcracker"; + title: string; + employer: string; + jobUrl: string; + employerUrl?: string; + applicationLink?: string; + disciplines?: string; + deadline?: string; + salary?: string; + location?: string; + degreeRequired?: string; + starting?: string; + jobDescription?: string; +}; + +const srcDir = dirname(fileURLToPath(import.meta.url)); +const EXTRACTOR_DIR = join(srcDir, ".."); +const STORAGE_DIR = join(EXTRACTOR_DIR, "storage/datasets/default"); +const JOBOPS_STORAGE_DIR = join(EXTRACTOR_DIR, "storage/jobops"); +const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; export interface CrawlerResult { success: boolean; @@ -22,25 +33,9 @@ export interface CrawlerResult { } export interface RunCrawlerOptions { - /** - * List of job page URLs already present in the orchestrator DB. - * Used by the crawler to avoid expensive/undesired interactions (e.g. apply button click). - */ existingJobUrls?: string[]; - - /** - * Optional callback for live crawl progress emitted by the Gradcracker extractor. - */ onProgress?: (update: JobExtractorProgress) => void; - - /** - * List of search terms to be used as roles for URL generation. - */ searchTerms?: string[]; - - /** - * Max jobs to fetch per search term. 
- */ maxJobsPerTerm?: number; } @@ -56,8 +51,6 @@ interface JobExtractorProgress { ts?: string; } -const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; - async function writeExistingJobUrlsFile( existingJobUrls: string[] | undefined, ): Promise { @@ -68,26 +61,18 @@ async function writeExistingJobUrlsFile( return filePath; } -/** - * Run the Gradcracker crawler and return discovered jobs. - */ export async function runCrawler( options: RunCrawlerOptions = {}, ): Promise { - console.log("🕷️ Starting job crawler..."); - try { - // Clear previous results await clearStorageDataset(); - const existingJobUrlsFile = await writeExistingJobUrlsFile( options.existingJobUrls, ); - // Run the crawler await new Promise((resolve, reject) => { const child = spawn("npm", ["run", "start"], { - cwd: CRAWLER_DIR, + cwd: EXTRACTOR_DIR, shell: true, stdio: ["ignore", "pipe", "pipe"], env: { @@ -113,7 +98,7 @@ export async function runCrawler( const parsed = JSON.parse(raw) as JobExtractorProgress; options.onProgress?.(parsed); } catch { - // Ignore malformed progress lines + // ignore malformed progress lines } return; } @@ -143,66 +128,58 @@ export async function runCrawler( child.on("error", reject); }); - // Read crawled jobs from storage const jobs = await readCrawledJobs(); - - console.log(`✅ Crawler completed. Found ${jobs.length} jobs.`); - return { success: true, jobs }; } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; - console.error("❌ Crawler failed:", message); return { success: false, jobs: [], error: message }; } } -/** - * Read crawled jobs from the Crawlee storage dataset. 
- */ async function readCrawledJobs(): Promise { try { const files = await readdir(STORAGE_DIR); - const jsonFiles = files.filter((f) => f.endsWith(".json")); - + const jsonFiles = files.filter((file) => file.endsWith(".json")); const jobs: CreateJobInput[] = []; for (const file of jsonFiles) { const content = await readFile(join(STORAGE_DIR, file), "utf-8"); - const data = JSON.parse(content); + const data = JSON.parse(content) as Record; - // Map crawler output to our job input format jobs.push({ source: "gradcracker", - title: data.title || "Unknown Title", - employer: data.employer || "Unknown Employer", - employerUrl: data.employerUrl, - jobUrl: data.url || data.jobUrl, - applicationLink: data.applicationLink, - disciplines: data.disciplines, - deadline: data.deadline, - salary: data.salary, - location: data.location, - degreeRequired: data.degreeRequired, - starting: data.starting, - jobDescription: data.jobDescription, + title: (data.title as string) || "Unknown Title", + employer: (data.employer as string) || "Unknown Employer", + employerUrl: data.employerUrl as string | undefined, + jobUrl: (data.url as string) || (data.jobUrl as string), + applicationLink: data.applicationLink as string | undefined, + disciplines: + typeof data.disciplines === "string" + ? data.disciplines + : Array.isArray(data.disciplines) + ? data.disciplines + .filter((value): value is string => typeof value === "string") + .join(", ") + : undefined, + deadline: data.deadline as string | undefined, + salary: data.salary as string | undefined, + location: data.location as string | undefined, + degreeRequired: data.degreeRequired as string | undefined, + starting: data.starting as string | undefined, + jobDescription: data.jobDescription as string | undefined, }); } return jobs; - } catch (error) { - console.error("Failed to read crawled jobs:", error); + } catch { return []; } } -/** - * Clear previous crawl results. 
- */ async function clearStorageDataset(): Promise { - const { rm } = await import("node:fs/promises"); try { await rm(STORAGE_DIR, { recursive: true, force: true }); } catch { - // Ignore if directory doesn't exist + // ignore } } diff --git a/extractors/gradcracker/tsconfig.json b/extractors/gradcracker/tsconfig.json index e7c2f4a..a71a65c 100644 --- a/extractors/gradcracker/tsconfig.json +++ b/extractors/gradcracker/tsconfig.json @@ -6,7 +6,11 @@ "target": "ES2022", "outDir": "dist", "noUnusedLocals": false, - "lib": ["DOM"] + "lib": ["DOM"], + "baseUrl": ".", + "paths": { + "@shared/*": ["../../shared/src/*"] + } }, "include": ["./src/**/*"] } diff --git a/extractors/hiringcafe/manifest.ts b/extractors/hiringcafe/manifest.ts new file mode 100644 index 0000000..c5aa431 --- /dev/null +++ b/extractors/hiringcafe/manifest.ts @@ -0,0 +1,94 @@ +import { resolveSearchCities } from "@shared/search-cities.js"; +import type { + ExtractorManifest, + ExtractorProgressEvent, +} from "@shared/types/extractors"; +import { runHiringCafe } from "./src/run"; + +function toProgress(event: { + type: string; + termIndex: number; + termTotal: number; + searchTerm: string; + pageNo?: number; + totalCollected?: number; +}): ExtractorProgressEvent { + if (event.type === "term_start") { + return { + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + currentUrl: event.searchTerm, + detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }; + } + + if (event.type === "page_fetched") { + const pageNo = (event.pageNo ?? 0) + 1; + const totalCollected = event.totalCollected ?? 
0; + return { + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: pageNo, + jobPagesEnqueued: totalCollected, + jobPagesProcessed: totalCollected, + currentUrl: `page ${pageNo}`, + detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page ${pageNo} (${totalCollected} collected)`, + }; + } + + return { + phase: "list", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + currentUrl: event.searchTerm, + detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }; +} + +export const manifest: ExtractorManifest = { + id: "hiringcafe", + displayName: "Hiring Cafe", + providesSources: ["hiringcafe"], + async run(context) { + if (context.shouldCancel?.()) { + return { success: true, jobs: [] }; + } + + const maxJobsPerTerm = context.settings.jobspyResultsWanted + ? parseInt(context.settings.jobspyResultsWanted, 10) + : 200; + + const result = await runHiringCafe({ + country: context.selectedCountry, + countryKey: context.selectedCountry, + searchTerms: context.searchTerms, + locations: resolveSearchCities({ + single: + context.settings.searchCities ?? 
context.settings.jobspyLocation, + }), + maxJobsPerTerm, + onProgress: (event) => { + if (context.shouldCancel?.()) return; + + context.onProgress?.(toProgress(event)); + }, + }); + + if (!result.success) { + return { + success: false, + jobs: [], + error: result.error, + }; + } + + return { + success: true, + jobs: result.jobs, + }; + }, +}; + +export default manifest; diff --git a/orchestrator/src/server/services/hiring-cafe.ts b/extractors/hiringcafe/src/run.ts similarity index 85% rename from orchestrator/src/server/services/hiring-cafe.ts rename to extractors/hiringcafe/src/run.ts index da2cf82..ff77bc2 100644 --- a/orchestrator/src/server/services/hiring-cafe.ts +++ b/extractors/hiringcafe/src/run.ts @@ -4,26 +4,22 @@ import { createRequire } from "node:module"; import { dirname, join } from "node:path"; import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; -import { logger } from "@infra/logger"; -import { sanitizeUnknown } from "@infra/sanitize"; import { normalizeCountryKey } from "@shared/location-support.js"; import { - matchesRequestedCity, - parseSearchCitiesSetting, + resolveSearchCities, shouldApplyStrictCityFilter, } from "@shared/search-cities.js"; -import type { CreateJobInput } from "@shared/types"; -import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion"; +import type { CreateJobInput } from "@shared/types/jobs"; +import { + toNumberOrNull, + toStringOrNull, +} from "@shared/utils/type-conversion.js"; -const __dirname = dirname(fileURLToPath(import.meta.url)); -const HIRING_CAFE_DIR = join(__dirname, "../../../../extractors/hiringcafe"); -const DATASET_PATH = join( - HIRING_CAFE_DIR, - "storage/datasets/default/jobs.json", -); -const STORAGE_DATASET_DIR = join(HIRING_CAFE_DIR, "storage/datasets/default"); +const srcDir = dirname(fileURLToPath(import.meta.url)); +const EXTRACTOR_DIR = join(srcDir, ".."); +const DATASET_PATH = join(EXTRACTOR_DIR, "storage/datasets/default/jobs.json"); 
+const STORAGE_DATASET_DIR = join(EXTRACTOR_DIR, "storage/datasets/default"); const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; - const require = createRequire(import.meta.url); const TSX_CLI_PATH = resolveTsxCliPath(); @@ -76,20 +72,6 @@ export function shouldApplyStrictLocationFilter( return shouldApplyStrictCityFilter(location, countryKey); } -export function matchesRequestedLocation( - jobLocation: string | undefined, - requestedLocation: string, -): boolean { - return matchesRequestedCity(jobLocation, requestedLocation); -} - -function resolveLocations(options: RunHiringCafeOptions): string[] { - const raw = options.locations?.length - ? options.locations - : parseSearchCitiesSetting(process.env.HIRING_CAFE_LOCATION_QUERY ?? ""); - return raw.map((value) => value.trim()).filter(Boolean); -} - function resolveTsxCliPath(): string | null { try { return require.resolve("tsx/dist/cli.mjs"); @@ -105,7 +87,6 @@ function canRunNpmCommand(): boolean { function parseProgressLine(line: string): HiringCafeProgressEvent | null { if (!line.startsWith(JOBOPS_PROGRESS_PREFIX)) return null; - const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim(); let parsed: Record; @@ -119,10 +100,7 @@ function parseProgressLine(line: string): HiringCafeProgressEvent | null { const termIndex = toNumberOrNull(parsed.termIndex); const termTotal = toNumberOrNull(parsed.termTotal); const searchTerm = toStringOrNull(parsed.searchTerm) ?? 
""; - - if (!event || termIndex === null || termTotal === null) { - return null; - } + if (!event || termIndex === null || termTotal === null) return null; if (event === "term_start") { return { type: "term_start", termIndex, termTotal, searchTerm }; @@ -131,7 +109,6 @@ function parseProgressLine(line: string): HiringCafeProgressEvent | null { if (event === "page_fetched") { const pageNo = toNumberOrNull(parsed.pageNo); if (pageNo === null) return null; - return { type: "page_fetched", termIndex, @@ -182,20 +159,15 @@ async function readDataset(): Promise { const jobs: CreateJobInput[] = []; const seen = new Set(); - for (const value of parsed) { if (!value || typeof value !== "object" || Array.isArray(value)) continue; - const mapped = mapHiringCafeRow(value as HiringCafeRawJob); if (!mapped) continue; - const dedupeKey = mapped.sourceJobId || mapped.jobUrl; if (seen.has(dedupeKey)) continue; - seen.add(dedupeKey); jobs.push(mapped); } - return jobs; } @@ -218,7 +190,10 @@ export async function runHiringCafe( 1, Math.floor(options.locationRadiusMiles ?? 1), ); - const locations = resolveLocations(options); + const locations = resolveSearchCities({ + list: options.locations, + env: process.env.HIRING_CAFE_LOCATION_QUERY, + }); const runLocations = locations.length > 0 ? locations : [null]; const termTotal = searchTerms.length * runLocations.length; @@ -259,7 +234,7 @@ export async function runHiringCafe( const child = useNpmCommand ? spawn("npm", ["run", "start"], { - cwd: HIRING_CAFE_DIR, + cwd: EXTRACTOR_DIR, stdio: ["ignore", "pipe", "pipe"], env: extractorEnv, }) @@ -272,7 +247,7 @@ export async function runHiringCafe( } return spawn(process.execPath, [tsxCliPath, "src/main.ts"], { - cwd: HIRING_CAFE_DIR, + cwd: EXTRACTOR_DIR, stdio: ["ignore", "pipe", "pipe"], env: extractorEnv, }); @@ -314,11 +289,7 @@ export async function runHiringCafe( }); const runJobs = await readDataset(); - const filtered = strictLocationFilter - ? 
runJobs.filter((job) => - matchesRequestedLocation(job.location, location), - ) - : runJobs; + const filtered = runJobs; for (const job of filtered) { const key = job.sourceJobId || job.jobUrl; @@ -331,10 +302,6 @@ export async function runHiringCafe( return { success: true, jobs }; } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; - logger.warn("Hiring Cafe extractor run failed", { - error: message, - details: sanitizeUnknown(error), - }); return { success: false, jobs: [], error: message }; } } diff --git a/extractors/hiringcafe/tests/location.test.ts b/extractors/hiringcafe/tests/location.test.ts new file mode 100644 index 0000000..91c81aa --- /dev/null +++ b/extractors/hiringcafe/tests/location.test.ts @@ -0,0 +1,15 @@ +import { describe, expect, it } from "vitest"; +import { shouldApplyStrictLocationFilter } from "../src/run"; + +describe("hiringcafe location query strictness", () => { + it("enables strict filtering when city differs from country", () => { + expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe( + true, + ); + }); + + it("disables strict filtering when location is country-level", () => { + expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false); + expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false); + }); +}); diff --git a/extractors/hiringcafe/tsconfig.json b/extractors/hiringcafe/tsconfig.json index 6ace792..808b15f 100644 --- a/extractors/hiringcafe/tsconfig.json +++ b/extractors/hiringcafe/tsconfig.json @@ -1,13 +1,17 @@ { "compilerOptions": { - "module": "NodeNext", - "moduleResolution": "NodeNext", + "module": "ESNext", + "moduleResolution": "bundler", "target": "ES2022", "outDir": "dist", "strict": true, "noUnusedLocals": false, "lib": ["ES2022", "DOM"], - "types": ["node"] + "types": ["node"], + "baseUrl": ".", + "paths": { + "@shared/*": ["../../shared/src/*"] + } }, "include": ["./src/**/*"] } diff --git 
a/extractors/jobspy/manifest.ts b/extractors/jobspy/manifest.ts new file mode 100644 index 0000000..79b9fce --- /dev/null +++ b/extractors/jobspy/manifest.ts @@ -0,0 +1,74 @@ +import type { + ExtractorManifest, + ExtractorRuntimeContext, +} from "@shared/types/extractors"; +import { runJobSpy } from "./src/run"; + +type JobSpySite = NonNullable[0]["sites"]>[number]; + +const JOBSPY_SOURCES = new Set(["indeed", "linkedin", "glassdoor"]); + +function isJobSpySite(source: string): source is JobSpySite { + return JOBSPY_SOURCES.has(source as JobSpySite); +} + +export const manifest: ExtractorManifest = { + id: "jobspy", + displayName: "JobSpy", + providesSources: ["indeed", "linkedin", "glassdoor"], + async run(context: ExtractorRuntimeContext) { + if (context.shouldCancel?.()) { + return { success: true, jobs: [] }; + } + + const sites = context.selectedSources.filter(isJobSpySite); + + const result = await runJobSpy({ + sites, + searchTerms: context.searchTerms, + location: + context.settings.searchCities ?? context.settings.jobspyLocation, + resultsWanted: context.settings.jobspyResultsWanted + ? 
parseInt(context.settings.jobspyResultsWanted, 10) + : undefined, + countryIndeed: context.settings.jobspyCountryIndeed, + onProgress: (event) => { + if (context.shouldCancel?.()) return; + + if (event.type === "term_start") { + context.onProgress?.({ + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + currentUrl: event.searchTerm, + detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + }); + return; + } + + context.onProgress?.({ + phase: "list", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + currentUrl: event.searchTerm, + detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`, + }); + }, + }); + + if (!result.success) { + return { + success: false, + jobs: [], + error: result.error, + }; + } + + return { + success: true, + jobs: result.jobs, + }; + }, +}; + +export default manifest; diff --git a/orchestrator/src/server/services/jobspy.ts b/extractors/jobspy/src/run.ts similarity index 77% rename from orchestrator/src/server/services/jobspy.ts rename to extractors/jobspy/src/run.ts index 753bc8c..f2cdba4 100644 --- a/orchestrator/src/server/services/jobspy.ts +++ b/extractors/jobspy/src/run.ts @@ -1,26 +1,19 @@ -/** - * Service for scraping jobs via JobSpy (Indeed/LinkedIn/etc) and mapping them into our DB shape. - * - * Uses a small Python wrapper script that writes both CSV + JSON to disk; we ingest the JSON. 
- */ - import { spawn } from "node:child_process"; import { mkdir, readFile, unlink } from "node:fs/promises"; import { dirname, join } from "node:path"; import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; +import { resolveSearchCities } from "@shared/search-cities.js"; +import type { CreateJobInput, JobSource } from "@shared/types/jobs"; import { - matchesRequestedCity, - parseSearchCitiesSetting, - shouldApplyStrictCityFilter, -} from "@shared/search-cities.js"; -import type { CreateJobInput, JobSource } from "@shared/types"; -import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion"; -import { getDataDir } from "../config/dataDir"; + toNumberOrNull, + toStringOrNull, +} from "@shared/utils/type-conversion.js"; -const __dirname = dirname(fileURLToPath(import.meta.url)); -const JOBSPY_DIR = join(__dirname, "../../../../extractors/jobspy"); -const JOBSPY_SCRIPT = join(JOBSPY_DIR, "scrape_jobs.py"); +const srcDir = dirname(fileURLToPath(import.meta.url)); +const EXTRACTOR_DIR = join(srcDir, ".."); +const JOBSPY_SCRIPT = join(EXTRACTOR_DIR, "scrape_jobs.py"); +const OUTPUT_DIR = join(EXTRACTOR_DIR, "storage/imports"); const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; export type JobSpyProgressEvent = @@ -57,12 +50,7 @@ export function parseJobSpyProgressLine( if (!eventName || termIndex === null || termTotal === null) return null; if (eventName === "term_start") { - return { - type: "term_start", - termIndex, - termTotal, - searchTerm, - }; + return { type: "term_start", termIndex, termTotal, searchTerm }; } if (eventName === "term_complete") { return { @@ -73,15 +61,9 @@ export function parseJobSpyProgressLine( jobsFoundTerm: toNumberOrNull(parsed.jobsFoundTerm) ?? 0, }; } - return null; } -function getPythonPath(): string { - if (process.env.PYTHON_PATH) return process.env.PYTHON_PATH; - return process.platform === "win32" ? 
"python" : "python3"; -} - function toBooleanOrNull(value: unknown): boolean | null { if (value === null || value === undefined) return null; if (typeof value === "boolean") return value; @@ -123,19 +105,14 @@ function formatSalary(params: { const { minAmount, maxAmount, currency, interval } = params; if (minAmount === null && maxAmount === null) return null; - const fmt = (n: number) => { - // Avoid locale ambiguity; keep it simple. - const rounded = Math.round(n); - return `${rounded}`; - }; - + const fmt = (n: number) => `${Math.round(n)}`; let range: string; if (minAmount !== null && maxAmount !== null) { range = `${fmt(minAmount)}-${fmt(maxAmount)}`; } else if (minAmount !== null) { range = `${fmt(minAmount)}+`; } else if (maxAmount !== null) { - range = `${fmt(maxAmount)}`; + range = fmt(maxAmount); } else { return null; } @@ -164,33 +141,25 @@ export interface JobSpyResult { error?: string; } -export function shouldApplyStrictLocationFilter( - location: string, - countryIndeed: string, -): boolean { - return shouldApplyStrictCityFilter(location, countryIndeed); -} - -export function matchesRequestedLocation( - jobLocation: string | undefined, - requestedLocation: string, -): boolean { - return matchesRequestedCity(jobLocation, requestedLocation); -} - export async function runJobSpy( options: RunJobSpyOptions = {}, ): Promise { - const dataDir = getDataDir(); - const outputDir = join(dataDir, "imports"); - await mkdir(outputDir, { recursive: true }); + await mkdir(OUTPUT_DIR, { recursive: true }); const sites = (options.sites ?? 
["indeed", "linkedin", "glassdoor"]) - .filter((s) => s === "indeed" || s === "linkedin" || s === "glassdoor") + .filter( + (site) => + site === "indeed" || site === "linkedin" || site === "glassdoor", + ) .join(","); const searchTerms = resolveSearchTerms(options); - const locations = resolveLocations(options); + const locations = resolveSearchCities({ + list: options.locations, + single: options.location, + env: process.env.JOBSPY_LOCATION, + fallback: "UK", + }); const countryIndeed = options.countryIndeed ?? process.env.JOBSPY_COUNTRY_INDEED ?? "UK"; if (searchTerms.length === 0) { @@ -200,7 +169,6 @@ export async function runJobSpy( try { const jobs: CreateJobInput[] = []; const seenJobUrls = new Set(); - const totalRuns = searchTerms.length * locations.length; let runIndex = 0; @@ -208,13 +176,18 @@ export async function runJobSpy( for (const location of locations) { runIndex += 1; const suffix = `${runIndex}_${slugForFilename(searchTerm)}_${slugForFilename(location)}`; - const outputCsv = join(outputDir, `jobspy_jobs_${suffix}.csv`); - const outputJson = join(outputDir, `jobspy_jobs_${suffix}.json`); + const outputCsv = join(OUTPUT_DIR, `jobspy_jobs_${suffix}.csv`); + const outputJson = join(OUTPUT_DIR, `jobspy_jobs_${suffix}.json`); await new Promise((resolve, reject) => { - const pythonPath = getPythonPath(); + const pythonPath = process.env.PYTHON_PATH + ? process.env.PYTHON_PATH + : process.platform === "win32" + ? "python" + : "python3"; + const child = spawn(pythonPath, [JOBSPY_SCRIPT], { - cwd: JOBSPY_DIR, + cwd: EXTRACTOR_DIR, shell: false, stdio: ["ignore", "pipe", "pipe"], env: { @@ -276,21 +249,11 @@ export async function runJobSpy( const raw = await readFile(outputJson, "utf-8"); const parsed = JSON.parse(raw) as Array>; - const mapped = mapJobSpyRows(parsed); - const strictLocationFilter = shouldApplyStrictLocationFilter( - location, - countryIndeed, - ); - const filtered = strictLocationFilter - ? 
mapped.filter((job) => - matchesRequestedLocation(job.location, location), - ) - : mapped; + const filtered = mapJobSpyRows(parsed); for (const job of filtered) { - const url = job.jobUrl; - if (seenJobUrls.has(url)) continue; - seenJobUrls.add(url); + if (seenJobUrls.has(job.jobUrl)) continue; + seenJobUrls.add(job.jobUrl); jobs.push(job); } @@ -298,7 +261,7 @@ export async function runJobSpy( await unlink(outputJson); await unlink(outputCsv); } catch { - // Ignore cleanup errors + // ignore cleanup errors } } } @@ -310,16 +273,6 @@ export async function runJobSpy( } } -function resolveLocations(options: RunJobSpyOptions): string[] { - const fromOptions = options.locations?.length ? options.locations : null; - const fromSingle = options.location?.trim(); - const fromEnv = process.env.JOBSPY_LOCATION?.trim(); - const raw = - fromOptions ?? parseSearchCitiesSetting(fromSingle ?? fromEnv ?? "UK"); - const out = raw.map((value) => value.trim()).filter(Boolean); - return out.length > 0 ? out : ["UK"]; -} - function resolveSearchTerms(options: RunJobSpyOptions): string[] { const fromOptions = options.searchTerms?.length ? options.searchTerms : null; const fromEnv = parseSearchTermsEnv(process.env.JOBSPY_SEARCH_TERMS); @@ -335,7 +288,6 @@ function resolveSearchTerms(options: RunJobSpyOptions): string[] { seen.add(key); out.push(normalized); } - return out; } @@ -347,7 +299,10 @@ function parseSearchTermsEnv(raw: string | undefined): string[] | null { if (trimmed.startsWith("[")) { try { const parsed = JSON.parse(trimmed) as unknown; - if (Array.isArray(parsed) && parsed.every((v) => typeof v === "string")) { + if ( + Array.isArray(parsed) && + parsed.every((value) => typeof value === "string") + ) { return parsed; } } catch { @@ -362,7 +317,7 @@ function parseSearchTermsEnv(raw: string | undefined): string[] | null { : ","; const split = trimmed .split(delimiter) - .map((t) => t.trim()) + .map((term) => term.trim()) .filter(Boolean); return split.length > 0 ? 
split : null; } @@ -388,34 +343,27 @@ function mapJobSpyRows( const jobUrl = toStringOrNull(row.job_url); if (!jobUrl) continue; - const title = toStringOrNull(row.title) ?? "Unknown Title"; - const employer = toStringOrNull(row.company) ?? "Unknown Employer"; - - const jobUrlDirect = toStringOrNull(row.job_url_direct); - const applicationLink = jobUrlDirect ?? jobUrl; - const minAmount = toNumberOrNull(row.min_amount); const maxAmount = toNumberOrNull(row.max_amount); const currency = toStringOrNull(row.currency); const interval = toStringOrNull(row.interval); - const salary = formatSalary({ minAmount, maxAmount, currency, interval }); + const jobUrlDirect = toStringOrNull(row.job_url_direct); + jobs.push({ source, sourceJobId: toStringOrNull(row.id) ?? undefined, jobUrlDirect: jobUrlDirect ?? undefined, datePosted: toStringOrNull(row.date_posted) ?? undefined, - - title, - employer, + title: toStringOrNull(row.title) ?? "Unknown Title", + employer: toStringOrNull(row.company) ?? "Unknown Employer", employerUrl: toStringOrNull(row.company_url) ?? undefined, jobUrl, - applicationLink, + applicationLink: jobUrlDirect ?? jobUrl, location: toStringOrNull(row.location) ?? undefined, jobDescription: toStringOrNull(row.description) ?? undefined, salary: salary ?? undefined, - jobType: toStringOrNull(row.job_type) ?? undefined, salarySource: toStringOrNull(row.salary_source) ?? undefined, salaryInterval: interval ?? 
undefined, diff --git a/orchestrator/src/server/services/jobspy.test.ts b/extractors/jobspy/tests/run.test.ts similarity index 55% rename from orchestrator/src/server/services/jobspy.test.ts rename to extractors/jobspy/tests/run.test.ts index dd5cb38..48a3b66 100644 --- a/orchestrator/src/server/services/jobspy.test.ts +++ b/extractors/jobspy/tests/run.test.ts @@ -1,9 +1,5 @@ import { describe, expect, it } from "vitest"; -import { - matchesRequestedLocation, - parseJobSpyProgressLine, - shouldApplyStrictLocationFilter, -} from "./jobspy"; +import { parseJobSpyProgressLine } from "../src/run"; describe("parseJobSpyProgressLine", () => { it("parses term_start progress lines", () => { @@ -42,24 +38,3 @@ describe("parseJobSpyProgressLine", () => { expect(parseJobSpyProgressLine("Found 20 jobs")).toBeNull(); }); }); - -describe("strict location filtering", () => { - it("enables strict filtering when location differs from country", () => { - expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe( - true, - ); - }); - - it("disables strict filtering when location is country-level", () => { - expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false); - expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false); - }); - - it("matches location using case-insensitive contains checks", () => { - expect(matchesRequestedLocation("Leeds, England, UK", "leeds")).toBe(true); - expect(matchesRequestedLocation("Halifax, England, UK", "leeds")).toBe( - false, - ); - expect(matchesRequestedLocation(undefined, "leeds")).toBe(false); - }); -}); diff --git a/extractors/jobspy/tsconfig.json b/extractors/jobspy/tsconfig.json new file mode 100644 index 0000000..8151bf8 --- /dev/null +++ b/extractors/jobspy/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "module": "NodeNext", + "moduleResolution": "NodeNext", + "target": "ES2022", + "strict": true, + "noUnusedLocals": false, + "lib": ["ES2022", "DOM"], + "types": ["node"], + 
"baseUrl": ".", + "paths": { + "@shared/*": ["../../shared/src/*"] + } + }, + "include": ["./src/**/*"] +} diff --git a/extractors/ukvisajobs/manifest.ts b/extractors/ukvisajobs/manifest.ts new file mode 100644 index 0000000..157d675 --- /dev/null +++ b/extractors/ukvisajobs/manifest.ts @@ -0,0 +1,104 @@ +import type { + ExtractorManifest, + ExtractorProgressEvent, + ExtractorRuntimeContext, +} from "@shared/types/extractors"; +import { runUkVisaJobs } from "./src/run"; + +function toProgress(event: { + type: string; + termIndex: number; + termTotal: number; + searchTerm: string; + pageNo?: number; + maxPages?: number; + totalCollected?: number; + message?: string; +}): ExtractorProgressEvent { + if (event.type === "init") { + return { + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: 0, + listPagesTotal: event.maxPages ?? 0, + currentUrl: event.searchTerm || "all jobs", + detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, + }; + } + + if (event.type === "page_fetched") { + return { + phase: "list", + termsProcessed: Math.max(event.termIndex - 1, 0), + termsTotal: event.termTotal, + listPagesProcessed: event.pageNo ?? 0, + listPagesTotal: event.maxPages ?? 0, + jobPagesEnqueued: event.totalCollected ?? 0, + jobPagesProcessed: event.totalCollected ?? 0, + currentUrl: `page ${event.pageNo ?? 0}/${event.maxPages ?? 0}`, + detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo ?? 0}/${event.maxPages ?? 0} (${event.totalCollected ?? 
0} collected)`, + }; + } + + if (event.type === "term_complete") { + return { + phase: "list", + termsProcessed: event.termIndex, + termsTotal: event.termTotal, + currentUrl: event.searchTerm || "all jobs", + detail: `UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, + }; + } + + if (event.type === "empty_page") { + return { + detail: `UKVisaJobs: page ${event.pageNo ?? 0} returned no jobs`, + }; + } + + return { + detail: `UKVisaJobs: ${event.message ?? "unknown event"}`, + }; +} + +export const manifest: ExtractorManifest = { + id: "ukvisajobs", + displayName: "UK Visa Jobs", + providesSources: ["ukvisajobs"], + requiredEnvVars: ["UKVISAJOBS_EMAIL", "UKVISAJOBS_PASSWORD"], + async run(context: ExtractorRuntimeContext) { + if (context.shouldCancel?.()) { + return { success: true, jobs: [] }; + } + + const maxJobs = context.settings.ukvisajobsMaxJobs + ? parseInt(context.settings.ukvisajobsMaxJobs, 10) + : 50; + + const result = await runUkVisaJobs({ + maxJobs, + searchTerms: context.searchTerms, + onProgress: (event) => { + if (context.shouldCancel?.()) return; + + context.onProgress?.(toProgress(event)); + }, + }); + + if (!result.success) { + return { + success: false, + jobs: [], + error: result.error, + }; + } + + return { + success: true, + jobs: result.jobs, + }; + }, +}; + +export default manifest; diff --git a/orchestrator/src/server/services/ukvisajobs.ts b/extractors/ukvisajobs/src/run.ts similarity index 69% rename from orchestrator/src/server/services/ukvisajobs.ts rename to extractors/ukvisajobs/src/run.ts index 652d299..e4e2e40 100644 --- a/orchestrator/src/server/services/ukvisajobs.ts +++ b/extractors/ukvisajobs/src/run.ts @@ -1,21 +1,36 @@ -/** - * Service for running the UK Visa Jobs extractor (extractors/ukvisajobs). - * - * Spawns the extractor as a child process and reads its output dataset. 
- */ - import { spawn } from "node:child_process"; import { mkdir, readdir, readFile, rm } from "node:fs/promises"; import { dirname, join } from "node:path"; import { createInterface } from "node:readline"; import { fileURLToPath } from "node:url"; -import type { CreateJobInput } from "@shared/types"; -import { toNumberOrNull, toStringOrNull } from "@shared/utils/type-conversion"; -const __dirname = dirname(fileURLToPath(import.meta.url)); -const UKVISAJOBS_DIR = join(__dirname, "../../../../extractors/ukvisajobs"); -const STORAGE_DIR = join(UKVISAJOBS_DIR, "storage/datasets/default"); -const AUTH_CACHE_PATH = join(UKVISAJOBS_DIR, "storage/ukvisajobs-auth.json"); +type CreateJobInput = { + source: "ukvisajobs"; + sourceJobId?: string; + title: string; + employer: string; + employerUrl?: string; + jobUrl: string; + applicationLink?: string; + location?: string; + deadline?: string; + salary?: string; + jobDescription?: string; + datePosted?: string; + degreeRequired?: string; + jobType?: string; + jobLevel?: string; +}; + +import { + toNumberOrNull, + toStringOrNull, +} from "@shared/utils/type-conversion.js"; + +const srcDir = dirname(fileURLToPath(import.meta.url)); +const EXTRACTOR_DIR = join(srcDir, ".."); +const STORAGE_DIR = join(EXTRACTOR_DIR, "storage/datasets/default"); +const AUTH_CACHE_PATH = join(EXTRACTOR_DIR, "storage/ukvisajobs-auth.json"); const JOBOPS_PROGRESS_PREFIX = "JOBOPS_PROGRESS "; let isUkVisaJobsRunning = false; @@ -27,13 +42,9 @@ interface UkVisaJobsAuthSession { } export interface RunUkVisaJobsOptions { - /** Maximum number of jobs to fetch per search term. Defaults to 50, max 200. */ maxJobs?: number; - /** Search keyword filter (single) - legacy support */ searchKeyword?: string; - /** List of search terms to run sequentially */ searchTerms?: string[]; - /** Optional callback for structured progress emitted by extractor runs. 
*/ onProgress?: (event: UkVisaJobsProgressEvent) => void; } @@ -170,14 +181,9 @@ export function parseUkVisaJobsProgressLine( return null; } -/** - * Basic HTML to text conversion to extract job description. - */ function cleanHtml(html: string): string { - // Remove script, style tags and their content let text = html.replace(/<(script|style)[^>]*>[\s\S]*?<\/\1>/gi, ""); - // Try to extract content between
tags if present, or fallback to body const mainMatch = html.match(/]*>([\s\S]*?)<\/main>/i); const bodyMatch = html.match(/]*>([\s\S]*?)<\/body>/i); if (mainMatch) { @@ -186,21 +192,15 @@ function cleanHtml(html: string): string { text = bodyMatch[1]; } - // Remove remaining HTML tags text = text.replace(/<[^>]+>/g, " "); - - // Unescape common entities text = text .replace(/ /g, " ") .replace(/&/g, "&") .replace(/</g, "<") .replace(/>/g, ">") .replace(/"/g, '"'); - - // Normalize whitespace text = text.replace(/\s+/g, " ").trim(); - // Limit length to avoid blowing up AI context if (text.length > 8000) { text = `${text.substring(0, 8000)}...`; } @@ -208,19 +208,16 @@ function cleanHtml(html: string): string { return text; } -/** - * Fetch job description from the job URL. - */ async function fetchJobDescription(url: string): Promise { try { - console.log(` Fetching description from ${url}...`); - const authSession = await loadCachedAuthSession(); const cookieParts: string[] = []; - if (authSession?.csrfToken) + if (authSession?.csrfToken) { cookieParts.push(`csrf_token=${authSession.csrfToken}`); - if (authSession?.ciSession) + } + if (authSession?.ciSession) { cookieParts.push(`ci_session=${authSession.ciSession}`); + } const token = authSession?.authToken || authSession?.token; if (token) cookieParts.push(`authToken=${token}`); @@ -235,20 +232,14 @@ async function fetchJobDescription(url: string): Promise { const response = await fetch(url, { headers, - signal: AbortSignal.timeout(10000), // 10s timeout + signal: AbortSignal.timeout(10000), }); - if (!response.ok) return null; const html = await response.text(); const cleaned = cleanHtml(html); - - // If we only got a tiny bit of text, it might have failed return cleaned.length > 100 ? cleaned : null; - } catch (error) { - console.warn( - ` ⚠️ Failed to fetch description: ${error instanceof Error ? 
error.message : "Unknown error"}`, - ); + } catch { return null; } } @@ -262,14 +253,11 @@ async function loadCachedAuthSession(): Promise { } } -/** - * Clear previous extraction results. - */ async function clearStorageDataset(): Promise { try { await rm(STORAGE_DIR, { recursive: true, force: true }); } catch { - // Ignore if directory doesn't exist + // ignore } } @@ -286,16 +274,12 @@ export async function runUkVisaJobs( isUkVisaJobsRunning = true; try { - console.log("🇬🇧 Running UK Visa Jobs extractor..."); - - // Determine terms to run const terms: string[] = []; if (options.searchTerms && options.searchTerms.length > 0) { terms.push(...options.searchTerms); } else if (options.searchKeyword) { terms.push(options.searchKeyword); } else { - // No search terms = run once without keyword terms.push(""); } @@ -303,21 +287,17 @@ export async function runUkVisaJobs( const seenIds = new Set(); const termTotal = terms.length; - for (let i = 0; i < terms.length; i++) { + for (let i = 0; i < terms.length; i += 1) { const term = terms[i]; - const termLabel = term ? 
`"${term}"` : "all jobs"; const termIndex = i + 1; - console.log(` Running for ${termLabel}...`); try { - // Clear previous results for this run await clearStorageDataset(); await mkdir(STORAGE_DIR, { recursive: true }); - // Run the extractor await new Promise((resolve, reject) => { const child = spawn("npx", ["tsx", "src/main.ts"], { - cwd: UKVISAJOBS_DIR, + cwd: EXTRACTOR_DIR, stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, @@ -355,59 +335,51 @@ export async function runUkVisaJobs( stdoutRl?.close(); stderrRl?.close(); if (code === 0) resolve(); - else + else { reject( new Error(`UK Visa Jobs extractor exited with code ${code}`), ); + } }); child.on("error", reject); }); - // Read the output dataset and accumulate const runJobs = await readDataset(); - let newCount = 0; + let jobsFoundTerm = 0; for (const job of runJobs) { - // Deduplicate by sourceJobId or jobUrl const id = job.sourceJobId || job.jobUrl; - if (!seenIds.has(id)) { - seenIds.add(id); + if (seenIds.has(id)) continue; + seenIds.add(id); - // Enrich description if missing or poor - const isPoorDescription = - !job.jobDescription || - job.jobDescription.length < 100 || - job.jobDescription.startsWith("Visa sponsorship info:"); + const isPoorDescription = + !job.jobDescription || + job.jobDescription.length < 100 || + job.jobDescription.startsWith("Visa sponsorship info:"); - if (isPoorDescription && job.jobUrl) { - const enriched = await fetchJobDescription(job.jobUrl); - if (enriched) { - job.jobDescription = enriched; - } - // Small delay to avoid hammering the server - await new Promise((resolve) => setTimeout(resolve, 500)); + if (isPoorDescription && job.jobUrl) { + const enriched = await fetchJobDescription(job.jobUrl); + if (enriched) { + job.jobDescription = enriched; } - - allJobs.push(job); - newCount++; + await new Promise((resolve) => setTimeout(resolve, 500)); } + + allJobs.push(job); + jobsFoundTerm += 1; } - console.log( - ` ✅ Fetched ${runJobs.length} jobs for ${termLabel} 
(${newCount} new unique)`, - ); options.onProgress?.({ type: "term_complete", termIndex, termTotal, searchTerm: term, - jobsFoundTerm: newCount, + jobsFoundTerm, totalCollected: allJobs.length, }); } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; - console.error(`❌ UK Visa Jobs failed for ${termLabel}: ${message}`); options.onProgress?.({ type: "error", termIndex, @@ -415,66 +387,58 @@ export async function runUkVisaJobs( searchTerm: term, message, }); - // Continue to next term instead of failing completely } - // Delay between terms if (i < terms.length - 1) { - console.log(" Waiting 5s before next search term..."); await new Promise((resolve) => setTimeout(resolve, 5000)); } } - console.log( - `✅ UK Visa Jobs: imported total ${allJobs.length} unique jobs`, - ); return { success: true, jobs: allJobs }; } finally { isUkVisaJobsRunning = false; } } -/** - * Read jobs from the extractor's output dataset. - */ async function readDataset(): Promise { const jobs: CreateJobInput[] = []; try { const files = await readdir(STORAGE_DIR); const jsonFiles = files.filter( - (f) => f.endsWith(".json") && f !== "jobs.json", + (file) => file.endsWith(".json") && file !== "jobs.json", ); for (const file of jsonFiles.sort()) { try { const content = await readFile(join(STORAGE_DIR, file), "utf-8"); - const job = JSON.parse(content); + const job = JSON.parse(content) as Record; - // Map to CreateJobInput format jobs.push({ source: "ukvisajobs", - sourceJobId: job.sourceJobId, - title: job.title || "Unknown Title", - employer: job.employer || "Unknown Employer", - employerUrl: job.employerUrl, - jobUrl: job.jobUrl, - applicationLink: job.applicationLink || job.jobUrl, - location: job.location, - deadline: job.deadline, - salary: job.salary, - jobDescription: job.jobDescription, - datePosted: job.datePosted, - degreeRequired: job.degreeRequired, - jobType: job.jobType, - jobLevel: job.jobLevel, + sourceJobId: job.sourceJobId as string | 
undefined, + title: (job.title as string) || "Unknown Title", + employer: (job.employer as string) || "Unknown Employer", + employerUrl: job.employerUrl as string | undefined, + jobUrl: job.jobUrl as string, + applicationLink: + (job.applicationLink as string | undefined) || + (job.jobUrl as string), + location: job.location as string | undefined, + deadline: job.deadline as string | undefined, + salary: job.salary as string | undefined, + jobDescription: job.jobDescription as string | undefined, + datePosted: job.datePosted as string | undefined, + degreeRequired: job.degreeRequired as string | undefined, + jobType: job.jobType as string | undefined, + jobLevel: job.jobLevel as string | undefined, }); } catch { - // Skip invalid files + // ignore invalid file } } } catch { - // Dataset directory doesn't exist yet + // ignore missing dir } return jobs; diff --git a/orchestrator/src/server/services/ukvisajobs.test.ts b/extractors/ukvisajobs/tests/run.test.ts similarity index 96% rename from orchestrator/src/server/services/ukvisajobs.test.ts rename to extractors/ukvisajobs/tests/run.test.ts index d854255..7fc7de3 100644 --- a/orchestrator/src/server/services/ukvisajobs.test.ts +++ b/extractors/ukvisajobs/tests/run.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { parseUkVisaJobsProgressLine } from "./ukvisajobs"; +import { parseUkVisaJobsProgressLine } from "../src/run"; describe("parseUkVisaJobsProgressLine", () => { it("parses init events", () => { diff --git a/orchestrator/src/client/components/PipelineProgress.tsx b/orchestrator/src/client/components/PipelineProgress.tsx index 16ab531..3e217a7 100644 --- a/orchestrator/src/client/components/PipelineProgress.tsx +++ b/orchestrator/src/client/components/PipelineProgress.tsx @@ -2,6 +2,10 @@ * Live pipeline progress display component. 
*/ +import { + sourceLabel as getSourceLabel, + isExtractorSourceId, +} from "@shared/extractors"; import { Loader2 } from "lucide-react"; import type React from "react"; import { useEffect, useMemo, useState } from "react"; @@ -24,13 +28,7 @@ interface PipelineProgress { | "failed"; message: string; detail?: string; - crawlingSource: - | "gradcracker" - | "jobspy" - | "ukvisajobs" - | "adzuna" - | "hiringcafe" - | null; + crawlingSource: string | null; crawlingSourcesCompleted: number; crawlingSourcesTotal: number; crawlingTermsProcessed: number; @@ -83,20 +81,15 @@ const stepBadgeClasses: Record = { failed: "bg-destructive/10 text-destructive border-destructive/20", }; -const sourceLabel: Record< - Exclude, - string -> = { - gradcracker: "Gradcracker", - jobspy: "JobSpy", - ukvisajobs: "UKVisaJobs", - adzuna: "Adzuna", - hiringcafe: "Hiring Cafe", -}; - const clamp = (value: number, min: number, max: number) => Math.max(min, Math.min(max, value)); +function resolveSourceLabel(source: string): string { + if (source === "jobspy") return "JobSpy"; + if (isExtractorSourceId(source)) return getSourceLabel(source); + return source; +} + export const PipelineProgress: React.FC = ({ isRunning, }) => { @@ -257,7 +250,7 @@ export const PipelineProgress: React.FC = ({

Source:{" "} {progress.crawlingSource - ? sourceLabel[progress.crawlingSource] + ? resolveSourceLabel(progress.crawlingSource) : "starting"} {" "}({progress.crawlingSourcesCompleted}/ {Math.max(progress.crawlingSourcesTotal, 0)}) diff --git a/orchestrator/src/client/components/charts/ResponseRateBySourceChart.tsx b/orchestrator/src/client/components/charts/ResponseRateBySourceChart.tsx index 449d1c3..ad97fd1 100644 --- a/orchestrator/src/client/components/charts/ResponseRateBySourceChart.tsx +++ b/orchestrator/src/client/components/charts/ResponseRateBySourceChart.tsx @@ -5,6 +5,7 @@ * Ghosted/no-reply and rejected outcomes are both excluded from the numerator. */ +import { isExtractorSourceId, sourceLabel } from "@shared/extractors"; import type { JobSource, StageEvent } from "@shared/types.js"; import { useMemo, useState } from "react"; import { @@ -64,17 +65,6 @@ const RESPONSE_STAGES = new Set([ "offer", ]); -const SOURCE_LABELS: Record = { - gradcracker: "Gradcracker", - indeed: "Indeed", - linkedin: "LinkedIn", - glassdoor: "Glassdoor", - ukvisajobs: "UKVisaJobs", - adzuna: "Adzuna", - hiringcafe: "HiringCafe", - manual: "Manual", -}; - const BAR_COLORS = [ "#3b82f6", "#8b5cf6", @@ -110,7 +100,7 @@ const buildResponseRateBySource = ( return Array.from(bySource.entries()) .map(([source, { applied, responded }]) => ({ - source: `${SOURCE_LABELS[source] ?? source} (${applied})`, + source: `${isExtractorSourceId(source) ? sourceLabel(source) : source} (${applied})`, applied, responded, rate: applied > 0 ? 
(responded / applied) * 100 : 0, diff --git a/orchestrator/src/client/pages/orchestrator/AutomaticRunTab.tsx b/orchestrator/src/client/pages/orchestrator/AutomaticRunTab.tsx index 20b1eff..bbe62b2 100644 --- a/orchestrator/src/client/pages/orchestrator/AutomaticRunTab.tsx +++ b/orchestrator/src/client/pages/orchestrator/AutomaticRunTab.tsx @@ -1,3 +1,4 @@ +import { EXTRACTOR_SOURCE_METADATA } from "@shared/extractors"; import { formatCountryLabel, isSourceAllowedForCountry, @@ -77,7 +78,6 @@ const GLASSDOOR_COUNTRY_REASON = "Glassdoor is not available for the selected country."; const GLASSDOOR_LOCATION_REASON = "Add at least one city in Advanced settings to enable Glassdoor."; -const UK_ONLY_SOURCES = new Set(["gradcracker", "ukvisajobs"]); const HIDDEN_COUNTRY_KEYS = new Set(["usa/ca"]); function normalizeUiCountryKey(value: string): string { @@ -95,7 +95,7 @@ function getSourceDisabledReason( ? GLASSDOOR_LOCATION_REASON : GLASSDOOR_COUNTRY_REASON; } - if (UK_ONLY_SOURCES.has(source)) { + if (EXTRACTOR_SOURCE_METADATA[source]?.ukOnly) { return `${sourceLabel[source]} is available only when country is United Kingdom.`; } return `${sourceLabel[source]} is not available for the selected country.`; diff --git a/orchestrator/src/client/pages/orchestrator/constants.ts b/orchestrator/src/client/pages/orchestrator/constants.ts index 1f5d2c6..655741d 100644 --- a/orchestrator/src/client/pages/orchestrator/constants.ts +++ b/orchestrator/src/client/pages/orchestrator/constants.ts @@ -1,3 +1,8 @@ +import { + EXTRACTOR_SOURCE_IDS, + EXTRACTOR_SOURCE_METADATA, + PIPELINE_EXTRACTOR_SOURCE_IDS, +} from "@shared/extractors"; import type { JobSource, JobStatus } from "@shared/types"; export const DEFAULT_PIPELINE_SOURCES: JobSource[] = [ @@ -9,15 +14,17 @@ export const DEFAULT_PIPELINE_SOURCES: JobSource[] = [ export const PIPELINE_SOURCES_STORAGE_KEY = "jobops.pipeline.sources"; export const orderedSources: JobSource[] = [ - "gradcracker", - "indeed", - "linkedin", - 
"glassdoor", - "adzuna", - "hiringcafe", - "ukvisajobs", -]; -export const orderedFilterSources: JobSource[] = [...orderedSources, "manual"]; + ...PIPELINE_EXTRACTOR_SOURCE_IDS, +].sort( + (left, right) => + EXTRACTOR_SOURCE_METADATA[left].order - + EXTRACTOR_SOURCE_METADATA[right].order, +); +export const orderedFilterSources: JobSource[] = [...EXTRACTOR_SOURCE_IDS].sort( + (left, right) => + EXTRACTOR_SOURCE_METADATA[left].order - + EXTRACTOR_SOURCE_METADATA[right].order, +); export const statusTokens: Record< JobStatus, diff --git a/orchestrator/src/client/pages/orchestrator/utils.ts b/orchestrator/src/client/pages/orchestrator/utils.ts index b1f66ad..74ade1e 100644 --- a/orchestrator/src/client/pages/orchestrator/utils.ts +++ b/orchestrator/src/client/pages/orchestrator/utils.ts @@ -168,8 +168,7 @@ export const getSourcesWithJobs = (jobs: JobListItem[]): JobSource[] => { export const getEnabledSources = ( settings: AppSettings | null, ): JobSource[] => { - if (!settings) - return [...DEFAULT_PIPELINE_SOURCES, "glassdoor", "hiringcafe"]; + if (!settings) return [...orderedSources]; const enabled: JobSource[] = []; const hasUkVisaJobsAuth = Boolean( diff --git a/orchestrator/src/lib/utils.ts b/orchestrator/src/lib/utils.ts index c8def09..149b465 100644 --- a/orchestrator/src/lib/utils.ts +++ b/orchestrator/src/lib/utils.ts @@ -1,3 +1,7 @@ +import { + EXTRACTOR_SOURCE_IDS, + sourceLabel as getExtractorSourceLabel, +} from "@shared/extractors"; import type { Job } from "@shared/types"; import { type ClassValue, clsx } from "clsx"; import { twMerge } from "tailwind-merge"; @@ -137,13 +141,11 @@ export const formatJobForWebhook = (job: Job) => { ); }; -export const sourceLabel: Record = { - gradcracker: "Gradcracker", - indeed: "Indeed", - linkedin: "LinkedIn", - glassdoor: "Glassdoor", - ukvisajobs: "UK Visa Jobs", - adzuna: "Adzuna", - hiringcafe: "Hiring Cafe", - manual: "Manual", -}; +export const sourceLabel: Record = + EXTRACTOR_SOURCE_IDS.reduce( + (acc, 
source) => { + acc[source] = getExtractorSourceLabel(source); + return acc; + }, + {} as Record, + ); diff --git a/orchestrator/src/server/api/routes/pipeline.ts b/orchestrator/src/server/api/routes/pipeline.ts index b3414e9..13a9fee 100644 --- a/orchestrator/src/server/api/routes/pipeline.ts +++ b/orchestrator/src/server/api/routes/pipeline.ts @@ -1,12 +1,23 @@ -import { AppError, badRequest, conflict, requestTimeout } from "@infra/errors"; +import { + AppError, + badRequest, + conflict, + requestTimeout, + serviceUnavailable, +} from "@infra/errors"; import { fail, ok, okWithMeta } from "@infra/http"; import { logger } from "@infra/logger"; import { runWithRequestContext } from "@infra/request-context"; import { setupSse, startSseHeartbeat, writeSseData } from "@infra/sse"; +import { PIPELINE_EXTRACTOR_SOURCE_IDS } from "@shared/extractors"; import type { PipelineStatusResponse } from "@shared/types"; import { type Request, type Response, Router } from "express"; import { z } from "zod"; import { isDemoMode } from "../../config/demo"; +import { + type ExtractorRegistry, + getExtractorRegistry, +} from "../../extractors/registry"; import { getPipelineStatus, requestPipelineCancel, @@ -94,15 +105,12 @@ const runPipelineSchema = z.object({ minSuitabilityScore: z.number().min(0).max(100).optional(), sources: z .array( - z.enum([ - "gradcracker", - "indeed", - "linkedin", - "glassdoor", - "ukvisajobs", - "adzuna", - "hiringcafe", - ]), + z.enum( + PIPELINE_EXTRACTOR_SOURCE_IDS as [ + (typeof PIPELINE_EXTRACTOR_SOURCE_IDS)[number], + ...(typeof PIPELINE_EXTRACTOR_SOURCE_IDS)[number][], + ], + ), ) .min(1) .optional(), @@ -111,6 +119,38 @@ const runPipelineSchema = z.object({ pipelineRouter.post("/run", async (req: Request, res: Response) => { try { const config = runPipelineSchema.parse(req.body); + if (config.sources && config.sources.length > 0) { + let registry: ExtractorRegistry; + try { + registry = await getExtractorRegistry(); + } catch (error) { + logger.error( 
+ "Extractor registry unavailable during source validation", + { + route: "/api/pipeline/run", + error, + }, + ); + return fail( + res, + serviceUnavailable( + "Extractor registry is unavailable. Try again after fixing startup errors.", + ), + ); + } + const unavailableSources = config.sources.filter( + (source) => !registry.manifestBySource.has(source), + ); + if (unavailableSources.length > 0) { + return fail( + res, + badRequest( + `Requested sources are not available at runtime: ${unavailableSources.join(", ")}`, + { unavailableSources }, + ), + ); + } + } if (isDemoMode()) { const simulated = await simulatePipelineRun(config); diff --git a/orchestrator/src/server/db/schema.ts b/orchestrator/src/server/db/schema.ts index 99c0ff7..994c41a 100644 --- a/orchestrator/src/server/db/schema.ts +++ b/orchestrator/src/server/db/schema.ts @@ -32,20 +32,7 @@ export const jobs = sqliteTable("jobs", { id: text("id").primaryKey(), // From crawler - source: text("source", { - enum: [ - "gradcracker", - "indeed", - "linkedin", - "glassdoor", - "ukvisajobs", - "adzuna", - "hiringcafe", - "manual", - ], - }) - .notNull() - .default("gradcracker"), + source: text("source").notNull().default("gradcracker"), sourceJobId: text("source_job_id"), jobUrlDirect: text("job_url_direct"), datePosted: text("date_posted"), diff --git a/orchestrator/src/server/extractors/discovery.test.ts b/orchestrator/src/server/extractors/discovery.test.ts new file mode 100644 index 0000000..c0af4fa --- /dev/null +++ b/orchestrator/src/server/extractors/discovery.test.ts @@ -0,0 +1,107 @@ +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { discoverManifestPaths, loadManifestFromFile } from "./discovery"; + +const tempRoots: string[] = []; + +async function makeTempRoot(): Promise { + const testTmpBase = join(process.cwd(), "orchestrator", ".tmp"); + await 
mkdir(testTmpBase, { recursive: true }); + const tempDir = await mkdtemp(join(testTmpBase, "extractor-discovery-")); + const root = join(tempDir, "extractors"); + await mkdir(root, { recursive: true }); + tempRoots.push(tempDir); + return root; +} + +afterEach(async () => { + for (const root of tempRoots.splice(0)) { + await rm(root, { recursive: true, force: true }); + } +}); + +describe("extractor discovery", () => { + it("finds manifest.ts and src/manifest.ts files", async () => { + const root = await makeTempRoot(); + await mkdir(join(root, "adzuna", "src"), { recursive: true }); + await mkdir(join(root, "jobspy"), { recursive: true }); + + await writeFile( + join(root, "adzuna", "src", "manifest.ts"), + "export const manifest = { id: 'adzuna', displayName: 'Adzuna', providesSources: ['adzuna'], async run() { return { success: true, jobs: [] }; } };", + "utf8", + ); + await writeFile( + join(root, "jobspy", "manifest.ts"), + "export default { id: 'jobspy', displayName: 'JobSpy', providesSources: ['indeed'], async run() { return { success: true, jobs: [] }; } };", + "utf8", + ); + + const found = await discoverManifestPaths(root); + + expect(found).toEqual([ + join(root, "adzuna", "src", "manifest.ts"), + join(root, "jobspy", "manifest.ts"), + ]); + }); + + it("returns empty list when extractor root does not exist", async () => { + const root = join(tmpdir(), `missing-extractors-${Date.now()}`); + await expect(discoverManifestPaths(root)).resolves.toEqual([]); + }); + + it("returns empty list when root is not named extractors", async () => { + const root = await makeTempRoot(); + const invalidRoot = join(root, ".."); + await expect(discoverManifestPaths(invalidRoot)).resolves.toEqual([]); + }); + + it("loads and validates manifest modules", async () => { + const root = await makeTempRoot(); + const validPath = join(root, "valid-manifest.mjs"); + await writeFile( + validPath, + "export const manifest = { id: 'valid', displayName: 'Valid', providesSources: 
['indeed'], requiredEnvVars: ['A'], async run() { return { success: true, jobs: [] }; } };", + "utf8", + ); + + const manifest = await loadManifestFromFile(validPath); + expect(manifest.id).toBe("valid"); + expect(manifest.providesSources).toEqual(["indeed"]); + expect(manifest.requiredEnvVars).toEqual(["A"]); + }); + + it("prefers named manifest export when default is a wrapper object", async () => { + const root = await makeTempRoot(); + const wrappedPath = join(root, "wrapped-manifest.mjs"); + await writeFile( + wrappedPath, + [ + "const valid = { id: 'wrapped', displayName: 'Wrapped', providesSources: ['indeed'], async run() { return { success: true, jobs: [] }; } };", + "export const manifest = valid;", + "export default { default: valid, manifest: valid };", + ].join("\n"), + "utf8", + ); + + const manifest = await loadManifestFromFile(wrappedPath); + expect(manifest.id).toBe("wrapped"); + expect(manifest.providesSources).toEqual(["indeed"]); + }); + + it("throws for invalid manifest exports", async () => { + const root = await makeTempRoot(); + const invalidPath = join(root, "invalid-manifest.mjs"); + await writeFile( + invalidPath, + "export default { id: 'invalid', displayName: 'Invalid', providesSources: ['indeed'] };", + "utf8", + ); + + await expect(loadManifestFromFile(invalidPath)).rejects.toThrow( + "Invalid manifest export", + ); + }); +}); diff --git a/orchestrator/src/server/extractors/discovery.ts b/orchestrator/src/server/extractors/discovery.ts new file mode 100644 index 0000000..735165d --- /dev/null +++ b/orchestrator/src/server/extractors/discovery.ts @@ -0,0 +1,111 @@ +import type { Dirent } from "node:fs"; +import { access, readdir, stat } from "node:fs/promises"; +import { basename, dirname, join, resolve } from "node:path"; +import { fileURLToPath, pathToFileURL } from "node:url"; +import type { ExtractorManifest } from "@shared/types"; + +const moduleDir = dirname(fileURLToPath(import.meta.url)); +const DEFAULT_EXTRACTORS_ROOT = 
resolve(process.cwd(), "../extractors"); +const MODULE_RELATIVE_EXTRACTORS_ROOT = resolve( + moduleDir, + "../../../../extractors", +); + +const MANIFEST_CANDIDATES = ["manifest.ts", "src/manifest.ts"] as const; + +async function fileExists(path: string): Promise { + try { + await access(path); + return true; + } catch { + return false; + } +} + +async function directoryExists(path: string): Promise { + try { + const info = await stat(path); + return info.isDirectory(); + } catch { + return false; + } +} + +async function resolveExtractorsRoot(): Promise { + if (await directoryExists(DEFAULT_EXTRACTORS_ROOT)) { + return DEFAULT_EXTRACTORS_ROOT; + } + + if (await directoryExists(MODULE_RELATIVE_EXTRACTORS_ROOT)) { + return MODULE_RELATIVE_EXTRACTORS_ROOT; + } + + return DEFAULT_EXTRACTORS_ROOT; +} + +export async function discoverManifestPaths( + extractorsRoot?: string, +): Promise { + const root = extractorsRoot ?? (await resolveExtractorsRoot()); + if (basename(root) !== "extractors") { + return []; + } + + let entries: Dirent[] = []; + try { + entries = await readdir(root, { withFileTypes: true }); + } catch (error) { + const known = error as NodeJS.ErrnoException; + if (known.code === "ENOENT") return []; + throw error; + } + const paths: string[] = []; + + for (const entry of entries) { + if (!entry.isDirectory()) continue; + for (const candidate of MANIFEST_CANDIDATES) { + const fullPath = join(root, entry.name, candidate); + if (await fileExists(fullPath)) { + paths.push(fullPath); + break; + } + } + } + + return paths.sort(); +} + +function isManifest(value: unknown): value is ExtractorManifest { + if (!value || typeof value !== "object") return false; + const manifest = value as Partial; + return ( + typeof manifest.id === "string" && + typeof manifest.displayName === "string" && + Array.isArray(manifest.providesSources) && + manifest.providesSources.every((source) => typeof source === "string") && + typeof manifest.run === "function" + ); +} + +export 
async function loadManifestFromFile( + path: string, +): Promise { + const loaded = await import(pathToFileURL(path).href); + const candidateManifest = (loaded as { manifest?: unknown }).manifest; + const candidateDefault = (loaded as { default?: unknown }).default; + const manifest = isManifest(candidateManifest) + ? candidateManifest + : candidateDefault; + + if (!isManifest(manifest)) { + throw new Error(`Invalid manifest export in ${path}`); + } + + return { + ...manifest, + providesSources: [...manifest.providesSources], + requiredEnvVars: manifest.requiredEnvVars + ? [...manifest.requiredEnvVars] + : undefined, + }; +} diff --git a/orchestrator/src/server/extractors/registry.test.ts b/orchestrator/src/server/extractors/registry.test.ts new file mode 100644 index 0000000..3144d7a --- /dev/null +++ b/orchestrator/src/server/extractors/registry.test.ts @@ -0,0 +1,194 @@ +import { logger } from "@infra/logger"; +import type { ExtractorManifest } from "@shared/types"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("./discovery", () => ({ + discoverManifestPaths: vi.fn(), + loadManifestFromFile: vi.fn(), +})); + +function makeManifest( + id: string, + sources: string[], + displayName = id, +): ExtractorManifest { + return { + id, + displayName, + providesSources: sources, + run: vi.fn(), + }; +} + +describe("extractor registry", () => { + let previousStrict: string | undefined; + + beforeEach(async () => { + vi.clearAllMocks(); + previousStrict = process.env.EXTRACTOR_REGISTRY_STRICT; + process.env.EXTRACTOR_REGISTRY_STRICT = "false"; + const module = await import("./registry"); + module.__resetExtractorRegistryForTests(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + if (previousStrict === undefined) { + delete process.env.EXTRACTOR_REGISTRY_STRICT; + return; + } + process.env.EXTRACTOR_REGISTRY_STRICT = previousStrict; + }); + + it("loads manifests and maps sources", async () => { + const discovery = await 
import("./discovery"); + const registryModule = await import("./registry"); + registryModule.__resetExtractorRegistryForTests(); + + vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([ + "/tmp/jobspy.ts", + "/tmp/ukvisajobs.ts", + ]); + vi.mocked(discovery.loadManifestFromFile).mockImplementation( + async (path) => + path === "/tmp/jobspy.ts" + ? makeManifest( + "jobspy", + ["indeed", "linkedin", "glassdoor"], + "JobSpy", + ) + : makeManifest("ukvisajobs", ["ukvisajobs"], "UK Visa Jobs"), + ); + + const registry = await registryModule.initializeExtractorRegistry(); + + expect(registry.manifests.size).toBe(2); + expect(registry.manifestBySource.get("linkedin")?.id).toBe("jobspy"); + expect(registry.manifestBySource.get("ukvisajobs")?.id).toBe("ukvisajobs"); + }); + + it("throws on duplicate manifest ids in strict mode", async () => { + const discovery = await import("./discovery"); + const registryModule = await import("./registry"); + registryModule.__resetExtractorRegistryForTests(); + process.env.EXTRACTOR_REGISTRY_STRICT = "true"; + + vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([ + "/tmp/one.ts", + "/tmp/two.ts", + ]); + vi.mocked(discovery.loadManifestFromFile).mockImplementation(async (path) => + makeManifest( + "duplicate", + path === "/tmp/one.ts" ? 
["indeed"] : ["linkedin"], + `Manifest ${path}`, + ), + ); + + await expect(registryModule.initializeExtractorRegistry()).rejects.toThrow( + "Duplicate extractor manifest id: duplicate", + ); + }); + + it("throws on duplicate source providers even in non-strict mode", async () => { + const discovery = await import("./discovery"); + const registryModule = await import("./registry"); + registryModule.__resetExtractorRegistryForTests(); + process.env.EXTRACTOR_REGISTRY_STRICT = "false"; + + vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([ + "/tmp/one.ts", + "/tmp/two.ts", + ]); + vi.mocked(discovery.loadManifestFromFile).mockImplementation( + async (path) => + path === "/tmp/one.ts" + ? makeManifest("one", ["indeed"], "One") + : makeManifest("two", ["indeed"], "Two"), + ); + + await expect(registryModule.initializeExtractorRegistry()).rejects.toThrow( + "Source indeed is provided by multiple manifests (one, two)", + ); + }); + + it("warns and skips manifests with unknown sources", async () => { + const discovery = await import("./discovery"); + const registryModule = await import("./registry"); + registryModule.__resetExtractorRegistryForTests(); + const warnSpy = vi.spyOn(logger, "warn").mockImplementation(() => {}); + + vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([ + "/tmp/unknown.ts", + "/tmp/valid.ts", + ]); + vi.mocked(discovery.loadManifestFromFile).mockImplementation( + async (path) => + path === "/tmp/unknown.ts" + ? 
makeManifest("unknown", ["not-a-real-source"], "Unknown") + : makeManifest("valid", ["indeed"], "Valid"), + ); + + const registry = await registryModule.initializeExtractorRegistry(); + + expect(registry.manifests.size).toBe(1); + expect(registry.manifests.has("valid")).toBe(true); + expect( + warnSpy.mock.calls.some( + ([message]) => + typeof message === "string" && + message.includes("Skipping extractor manifest with no known sources"), + ), + ).toBe(true); + }); + + it("warns when catalog pipeline sources have no runtime manifest", async () => { + const discovery = await import("./discovery"); + const registryModule = await import("./registry"); + registryModule.__resetExtractorRegistryForTests(); + const warnSpy = vi.spyOn(logger, "warn").mockImplementation(() => {}); + + vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([ + "/tmp/only-indeed.ts", + ]); + vi.mocked(discovery.loadManifestFromFile).mockResolvedValue( + makeManifest("only-indeed", ["indeed"], "Only Indeed"), + ); + + await registryModule.initializeExtractorRegistry(); + + expect( + warnSpy.mock.calls.some( + ([message]) => + typeof message === "string" && + message.includes("Shared extractor sources have no runtime manifest"), + ), + ).toBe(true); + }); + + it("continues loading valid manifests in non-strict mode when one manifest fails", async () => { + const discovery = await import("./discovery"); + const registryModule = await import("./registry"); + registryModule.__resetExtractorRegistryForTests(); + process.env.EXTRACTOR_REGISTRY_STRICT = "false"; + + vi.mocked(discovery.discoverManifestPaths).mockResolvedValue([ + "/tmp/broken.ts", + "/tmp/valid.ts", + ]); + vi.mocked(discovery.loadManifestFromFile).mockImplementation( + async (path) => { + if (path === "/tmp/broken.ts") { + throw new Error("bad manifest"); + } + return makeManifest("valid", ["indeed"], "Valid"); + }, + ); + + const registry = await registryModule.initializeExtractorRegistry(); + + 
expect(registry.manifests.size).toBe(1); + expect(registry.manifests.has("valid")).toBe(true); + expect(registry.manifestBySource.get("indeed")?.id).toBe("valid"); + }); +}); diff --git a/orchestrator/src/server/extractors/registry.ts b/orchestrator/src/server/extractors/registry.ts new file mode 100644 index 0000000..4f7c1ce --- /dev/null +++ b/orchestrator/src/server/extractors/registry.ts @@ -0,0 +1,239 @@ +import { logger } from "@infra/logger"; +import { sanitizeUnknown } from "@infra/sanitize"; +import { + EXTRACTOR_SOURCE_IDS, + EXTRACTOR_SOURCE_METADATA, + type ExtractorSourceId, + PIPELINE_EXTRACTOR_SOURCE_IDS, +} from "@shared/extractors"; +import type { ExtractorManifest } from "@shared/types"; +import { discoverManifestPaths, loadManifestFromFile } from "./discovery"; + +export interface ExtractorRegistry { + manifests: Map; + manifestBySource: Map; + availableSources: ExtractorSourceId[]; +} + +let registry: ExtractorRegistry | null = null; +let initPromise: Promise | null = null; + +class DuplicateManifestIdError extends Error { + readonly manifestId: string; + + constructor(manifestId: string) { + super(`Duplicate extractor manifest id: ${manifestId}`); + this.manifestId = manifestId; + this.name = "DuplicateManifestIdError"; + } +} + +class DuplicateSourceProviderError extends Error { + readonly source: ExtractorSourceId; + readonly existingManifestId: string; + readonly duplicateManifestId: string; + + constructor(args: { + source: ExtractorSourceId; + existingManifestId: string; + duplicateManifestId: string; + }) { + super( + `Source ${args.source} is provided by multiple manifests (${args.existingManifestId}, ${args.duplicateManifestId})`, + ); + this.source = args.source; + this.existingManifestId = args.existingManifestId; + this.duplicateManifestId = args.duplicateManifestId; + this.name = "DuplicateSourceProviderError"; + } +} + +export function __resetExtractorRegistryForTests(): void { + registry = null; + initPromise = null; +} + 
+function strictModeEnabled(): boolean { + if (process.env.EXTRACTOR_REGISTRY_STRICT) { + const raw = process.env.EXTRACTOR_REGISTRY_STRICT.toLowerCase(); + return raw === "1" || raw === "true"; + } + + return process.env.NODE_ENV === "production"; +} + +function resolveCatalogMismatches( + manifests: Map, +): void { + const missingFromCatalog = new Set(); + const missingManifest = new Set(); + + for (const manifest of manifests.values()) { + for (const source of manifest.providesSources) { + if (!EXTRACTOR_SOURCE_IDS.includes(source as ExtractorSourceId)) { + missingFromCatalog.add(source); + } + } + } + + for (const sourceId of PIPELINE_EXTRACTOR_SOURCE_IDS) { + const hasManifest = Array.from(manifests.values()).some((manifest) => + manifest.providesSources.includes(sourceId), + ); + if (!hasManifest) { + missingManifest.add(sourceId); + } + } + + const strict = strictModeEnabled(); + if (missingFromCatalog.size > 0) { + const message = + "Extractor sources are missing from shared extractor catalog"; + const context = { + missingFromCatalog: [...missingFromCatalog], + strict, + }; + if (strict) { + throw new Error(`${message}: ${[...missingFromCatalog].join(", ")}`); + } + logger.warn(message, context); + } + + if (missingManifest.size > 0) { + logger.warn("Shared extractor sources have no runtime manifest", { + missingManifest: [...missingManifest], + strict, + }); + } +} + +async function createRegistry(): Promise { + const manifestPaths = await discoverManifestPaths(); + const manifests = new Map(); + const manifestBySource = new Map(); + + for (const path of manifestPaths) { + try { + const manifest = await loadManifestFromFile(path); + if (manifests.has(manifest.id)) { + throw new DuplicateManifestIdError(manifest.id); + } + + const invalidSources = manifest.providesSources.filter( + (source) => !EXTRACTOR_SOURCE_IDS.includes(source as ExtractorSourceId), + ); + const validSources = manifest.providesSources.filter((source) => + 
EXTRACTOR_SOURCE_IDS.includes(source as ExtractorSourceId), + ) as ExtractorSourceId[]; + + if (invalidSources.length > 0) { + logger.warn("Extractor manifest contains unknown sources", { + manifestId: manifest.id, + path, + invalidSources, + }); + } + + if (validSources.length === 0) { + logger.warn("Skipping extractor manifest with no known sources", { + manifestId: manifest.id, + path, + declaredSources: manifest.providesSources, + }); + continue; + } + + for (const typedSource of validSources) { + if (manifestBySource.has(typedSource)) { + const existing = manifestBySource.get(typedSource); + throw new DuplicateSourceProviderError({ + source: typedSource, + existingManifestId: existing?.id ?? "unknown", + duplicateManifestId: manifest.id, + }); + } + } + + manifests.set(manifest.id, manifest); + for (const source of validSources) { + manifestBySource.set(source, manifest); + } + } catch (error) { + if (error instanceof DuplicateSourceProviderError) { + throw error; + } + + if (error instanceof DuplicateManifestIdError && strictModeEnabled()) { + throw error; + } + + logger.warn("Skipping invalid extractor manifest", { + path, + error: sanitizeUnknown(error), + }); + } + } + + resolveCatalogMismatches(manifests); + + const availableSources = PIPELINE_EXTRACTOR_SOURCE_IDS.filter((source) => + manifestBySource.has(source), + ); + + logger.info("Extractor registry initialized", { + manifestCount: manifests.size, + sourceCount: availableSources.length, + manifests: Array.from(manifests.values()).map((manifest) => ({ + id: manifest.id, + sources: manifest.providesSources, + requiredEnvVarsCount: manifest.requiredEnvVars?.length ?? 
0, + })), + }); + + return { + manifests, + manifestBySource, + availableSources, + }; +} + +export async function initializeExtractorRegistry(): Promise { + if (registry) return registry; + if (!initPromise) { + initPromise = createRegistry() + .then((created) => { + registry = created; + return created; + }) + .catch((error) => { + logger.error("Failed to initialize extractor registry", { + error: sanitizeUnknown(error), + }); + registry = null; + initPromise = null; + throw error; + }); + } + return initPromise; +} + +export async function getExtractorRegistry(): Promise { + return initializeExtractorRegistry(); +} + +export async function listAvailableSources(): Promise< + Array<{ id: ExtractorSourceId; label: string }> +> { + const current = await getExtractorRegistry(); + return current.availableSources.map((source) => ({ + id: source, + label: EXTRACTOR_SOURCE_METADATA[source].label, + })); +} + +export async function isSourceAvailable( + source: ExtractorSourceId, +): Promise { + const current = await getExtractorRegistry(); + return current.manifestBySource.has(source); +} diff --git a/orchestrator/src/server/index.ts b/orchestrator/src/server/index.ts index f72a126..f686617 100644 --- a/orchestrator/src/server/index.ts +++ b/orchestrator/src/server/index.ts @@ -3,7 +3,10 @@ */ import "./config/env"; +import { logger } from "@infra/logger"; +import { sanitizeUnknown } from "@infra/sanitize"; import { createApp } from "./app"; +import { initializeExtractorRegistry } from "./extractors/registry"; import * as settingsRepo from "./repositories/settings"; import { getBackupSettings, @@ -16,6 +19,25 @@ import { initialize as initializeVisaSponsors } from "./services/visa-sponsors/i async function startServer() { await applyStoredEnvOverrides(); + try { + await initializeExtractorRegistry(); + } catch (error) { + const sanitizedError = sanitizeUnknown(error); + logger.error("Failed to initialize extractor registry", { + error: sanitizedError, + }); + if 
(process.env.NODE_ENV === "production") { + logger.error( + "Extractor registry initialization failed in production. Shutting down server.", + ); + process.exit(1); + } + + logger.error( + "Extractor registry initialization failed outside production. Server startup aborted.", + ); + return; + } const app = createApp(); const PORT = process.env.PORT || 3001; @@ -46,7 +68,9 @@ async function startServer() { await initializeVisaSponsors(); } } catch (error) { - console.warn("⚠️ Failed to initialize visa sponsors service:", error); + logger.warn("Failed to initialize visa sponsors service", { + error: sanitizeUnknown(error), + }); } // Initialize backup service (load settings and start scheduler if enabled) @@ -85,13 +109,17 @@ async function startServer() { ); } } catch (error) { - console.warn("⚠️ Failed to initialize backup service:", error); + logger.warn("Failed to initialize backup service", { + error: sanitizeUnknown(error), + }); } try { await initializeDemoModeServices(); } catch (error) { - console.warn("⚠️ Failed to initialize demo mode services:", error); + logger.warn("Failed to initialize demo mode services", { + error: sanitizeUnknown(error), + }); } }); } diff --git a/orchestrator/src/server/pipeline/progress.ts b/orchestrator/src/server/pipeline/progress.ts index d81d81b..2984a66 100644 --- a/orchestrator/src/server/pipeline/progress.ts +++ b/orchestrator/src/server/pipeline/progress.ts @@ -14,12 +14,7 @@ export type PipelineStep = | "cancelled" | "failed"; -export type CrawlSource = - | "gradcracker" - | "jobspy" - | "ukvisajobs" - | "adzuna" - | "hiringcafe"; +export type CrawlSource = string; export interface PipelineProgress { step: PipelineStep; diff --git a/orchestrator/src/server/pipeline/sponsor-matching.test.ts b/orchestrator/src/server/pipeline/sponsor-matching.test.ts index 2b5e873..911a7cb 100644 --- a/orchestrator/src/server/pipeline/sponsor-matching.test.ts +++ b/orchestrator/src/server/pipeline/sponsor-matching.test.ts @@ -40,18 +40,6 @@ 
vi.mock("../repositories/settings", () => ({ getAllSettings: vi.fn().mockResolvedValue({}), })); -vi.mock("../services/crawler", () => ({ - runCrawler: vi.fn(() => ({ success: true, jobs: [] })), -})); - -vi.mock("../services/jobspy", () => ({ - runJobSpy: vi.fn(() => ({ success: true, jobs: [] })), -})); - -vi.mock("../services/ukvisajobs", () => ({ - runUkVisaJobs: vi.fn(() => ({ success: true, jobs: [] })), -})); - const now = new Date().toISOString(); const createJob = (overrides: Partial = {}): Job => diff --git a/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts b/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts index 53eaf4c..7143f54 100644 --- a/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts +++ b/orchestrator/src/server/pipeline/steps/discover-jobs.test.ts @@ -3,35 +3,19 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { getProgress, resetProgress } from "../progress"; import { discoverJobsStep } from "./discover-jobs"; -vi.mock("../../repositories/jobs", () => ({ - getAllJobUrls: vi.fn().mockResolvedValue([]), -})); - vi.mock("../../repositories/settings", () => ({ getAllSettings: vi.fn(), })); -vi.mock("../../services/jobspy", () => ({ - runJobSpy: vi.fn(), +vi.mock("../../repositories/jobs", () => ({ + getAllJobUrls: vi.fn().mockResolvedValue([]), })); -vi.mock("../../services/crawler", () => ({ - runCrawler: vi.fn(), +vi.mock("../../extractors/registry", () => ({ + getExtractorRegistry: vi.fn(), })); -vi.mock("../../services/adzuna", () => ({ - runAdzuna: vi.fn(), -})); - -vi.mock("../../services/hiring-cafe", () => ({ - runHiringCafe: vi.fn(), -})); - -vi.mock("../../services/ukvisajobs", () => ({ - runUkVisaJobs: vi.fn(), -})); - -const config: PipelineConfig = { +const baseConfig: PipelineConfig = { topN: 10, minSuitabilityScore: 50, sources: ["indeed", "linkedin", "ukvisajobs"], @@ -50,565 +34,120 @@ describe("discoverJobsStep", () => { it("aggregates source errors for enabled sources", 
async () => { const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); - const ukVisa = await import("../../services/ukvisajobs"); + const registryModule = await import("../../extractors/registry"); - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - } as any); - - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [ - { - source: "linkedin", - title: "Engineer", - employer: "ACME", - jobUrl: "https://example.com/job", - }, - ], - } as any); - - vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({ - success: false, - error: "login failed", - } as any); - - const result = await discoverJobsStep({ mergedConfig: config }); - - expect(result.discoveredJobs).toHaveLength(1); - expect(result.sourceErrors).toEqual(["ukvisajobs: login failed"]); - expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith( - expect.objectContaining({ sites: ["indeed", "linkedin"] }), - ); - }); - - it("passes glassdoor through to JobSpy when selected", async () => { - const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - } as any); - - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [ - { - source: "glassdoor", - title: "Engineer", - employer: "ACME", - jobUrl: "https://example.com/job", - }, - ], - } as any); - - const result = await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["glassdoor"], - }, - }); - - expect(result.discoveredJobs).toHaveLength(1); - expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith( - expect.objectContaining({ sites: ["glassdoor"] }), - ); - }); - - it("passes serialized multi-city locations to JobSpy", async () => { - const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await 
import("../../services/jobspy"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united kingdom", - searchCities: "London|Manchester", - } as any); - - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [], - } as any); - - await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["linkedin"], - }, - }); - - expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith( - expect.objectContaining({ - location: "London|Manchester", + const jobspyManifest = { + id: "jobspy", + displayName: "JobSpy", + providesSources: ["indeed", "linkedin", "glassdoor"], + run: vi.fn().mockResolvedValue({ + success: true, + jobs: [ + { + source: "linkedin", + title: "Engineer", + employer: "ACME", + jobUrl: "https://example.com/job", + }, + ], }), - ); - }); - - it("filters out glassdoor for unsupported countries", async () => { - const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); + }; + const ukvisaManifest = { + id: "ukvisajobs", + displayName: "UK Visa Jobs", + providesSources: ["ukvisajobs"], + run: vi.fn().mockResolvedValue({ + success: false, + jobs: [], + error: "login failed", + }), + }; vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "japan", } as any); - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [], + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map([ + ["jobspy", jobspyManifest as any], + ["ukvisajobs", ukvisaManifest as any], + ]), + manifestBySource: new Map([ + ["indeed", jobspyManifest as any], + ["linkedin", jobspyManifest as any], + ["glassdoor", jobspyManifest as any], + ["ukvisajobs", ukvisaManifest as any], + ]), + availableSources: ["indeed", "linkedin", "glassdoor", "ukvisajobs"], } as any); - await discoverJobsStep({ - mergedConfig: { - 
...config, - sources: ["glassdoor", "linkedin"], - }, - }); + const result = await discoverJobsStep({ mergedConfig: baseConfig }); - expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledWith( - expect.objectContaining({ sites: ["linkedin"] }), + expect(result.discoveredJobs).toHaveLength(1); + expect(result.sourceErrors).toEqual([ + "UK Visa Jobs: login failed (sources: ukvisajobs)", + ]); + expect(jobspyManifest.run).toHaveBeenCalledWith( + expect.objectContaining({ selectedSources: ["indeed", "linkedin"] }), ); }); it("throws when all enabled sources fail", async () => { const settingsRepo = await import("../../repositories/settings"); - const ukVisa = await import("../../services/ukvisajobs"); + const registryModule = await import("../../extractors/registry"); + + const ukvisaManifest = { + id: "ukvisajobs", + displayName: "UK Visa Jobs", + providesSources: ["ukvisajobs"], + run: vi.fn().mockResolvedValue({ + success: false, + jobs: [], + error: "boom", + }), + }; vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ searchTerms: JSON.stringify(["engineer"]), } as any); - vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({ - success: false, - error: "boom", + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map([["ukvisajobs", ukvisaManifest as any]]), + manifestBySource: new Map([["ukvisajobs", ukvisaManifest as any]]), + availableSources: ["ukvisajobs"], } as any); await expect( discoverJobsStep({ mergedConfig: { - ...config, + ...baseConfig, sources: ["ukvisajobs"], }, }), - ).rejects.toThrow("All sources failed: ukvisajobs: boom"); - }); - - it("runs adzuna when selected and country is compatible", async () => { - const settingsRepo = await import("../../repositories/settings"); - const adzuna = await import("../../services/adzuna"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united states", - } as any); - - 
vi.mocked(adzuna.runAdzuna).mockResolvedValue({ - success: true, - jobs: [ - { - source: "adzuna", - sourceJobId: "adzu-1", - title: "Engineer", - employer: "ACME", - jobUrl: "https://example.com/job", - applicationLink: "https://example.com/job", - }, - ], - } as any); - - const result = await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["adzuna"], - }, - }); - - expect(result.discoveredJobs).toHaveLength(1); - expect(vi.mocked(adzuna.runAdzuna)).toHaveBeenCalledWith( - expect.objectContaining({ country: "us" }), + ).rejects.toThrow( + "All sources failed: UK Visa Jobs: boom (sources: ukvisajobs)", ); }); - it("passes configured city locations to adzuna", async () => { - const settingsRepo = await import("../../repositories/settings"); - const adzuna = await import("../../services/adzuna"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united kingdom", - searchCities: "Leeds|Manchester", - } as any); - - vi.mocked(adzuna.runAdzuna).mockResolvedValue({ - success: true, - jobs: [], - } as any); - - await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["adzuna"], - }, - }); - - expect(vi.mocked(adzuna.runAdzuna)).toHaveBeenCalledWith( - expect.objectContaining({ - country: "gb", - countryKey: "united kingdom", - locations: ["Leeds", "Manchester"], - }), - ); - }); - - it("skips adzuna for unsupported countries", async () => { - const settingsRepo = await import("../../repositories/settings"); - const adzuna = await import("../../services/adzuna"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "japan", - } as any); - - await expect( - discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["adzuna"], - }, - }), - ).rejects.toThrow("No compatible sources for selected country: Japan"); - - expect(vi.mocked(adzuna.runAdzuna)).not.toHaveBeenCalled(); - }); - - 
it("runs hiringcafe when selected and passes country/terms/cap", async () => { - const settingsRepo = await import("../../repositories/settings"); - const hiringCafe = await import("../../services/hiring-cafe"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united states", - jobspyResultsWanted: "25", - } as any); - - vi.mocked(hiringCafe.runHiringCafe).mockResolvedValue({ - success: true, - jobs: [ - { - source: "hiringcafe", - sourceJobId: "hc-1", - title: "Engineer", - employer: "ACME", - jobUrl: "https://example.com/hc", - applicationLink: "https://example.com/hc", - }, - ], - } as any); - - const result = await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["hiringcafe"], - }, - }); - - expect(result.discoveredJobs).toHaveLength(1); - expect(vi.mocked(hiringCafe.runHiringCafe)).toHaveBeenCalledWith( - expect.objectContaining({ - country: "united states", - countryKey: "united states", - locations: [], - searchTerms: ["engineer"], - maxJobsPerTerm: 25, - }), - ); - }); - - it("passes configured city locations to hiringcafe", async () => { - const settingsRepo = await import("../../repositories/settings"); - const hiringCafe = await import("../../services/hiring-cafe"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united kingdom", - jobspyResultsWanted: "25", - searchCities: "Leeds|Manchester", - } as any); - - vi.mocked(hiringCafe.runHiringCafe).mockResolvedValue({ - success: true, - jobs: [], - } as any); - - await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["hiringcafe"], - }, - }); - - expect(vi.mocked(hiringCafe.runHiringCafe)).toHaveBeenCalledWith( - expect.objectContaining({ - country: "united kingdom", - countryKey: "united kingdom", - locations: ["Leeds", "Manchester"], - }), - ); - }); - - it("updates Hiring Cafe terms and pages via progress 
callbacks", async () => { - const settingsRepo = await import("../../repositories/settings"); - const hiringCafe = await import("../../services/hiring-cafe"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer", "frontend"]), - jobspyCountryIndeed: "united kingdom", - jobspyResultsWanted: "50", - } as any); - - vi.mocked(hiringCafe.runHiringCafe).mockImplementation( - async (options: any) => { - options?.onProgress?.({ - type: "term_start", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - }); - options?.onProgress?.({ - type: "page_fetched", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - pageNo: 0, - resultsOnPage: 10, - totalCollected: 10, - }); - options?.onProgress?.({ - type: "term_complete", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - jobsFoundTerm: 10, - }); - return { success: true, jobs: [] } as any; - }, - ); - - await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["hiringcafe"], - }, - }); - - const progress = getProgress(); - expect(progress.crawlingTermsProcessed).toBe(1); - expect(progress.crawlingTermsTotal).toBe(2); - expect(progress.crawlingListPagesProcessed).toBe(1); - expect(progress.crawlingJobPagesEnqueued).toBe(10); - expect(progress.crawlingJobPagesProcessed).toBe(10); - }); - - it("returns Hiring Cafe source error when extractor fails", async () => { - const settingsRepo = await import("../../repositories/settings"); - const hiringCafe = await import("../../services/hiring-cafe"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united kingdom", - jobspyResultsWanted: "50", - } as any); - - vi.mocked(hiringCafe.runHiringCafe).mockResolvedValue({ - success: false, - jobs: [], - error: "blocked upstream", - } as any); - - await expect( - discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["hiringcafe"], - }, - }), - ).rejects.toThrow("All sources 
failed: hiringcafe: blocked upstream"); - }); - - it("maps Gradcracker progress callback into live crawling counters", async () => { - const settingsRepo = await import("../../repositories/settings"); - const crawler = await import("../../services/crawler"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - } as any); - - vi.mocked(crawler.runCrawler).mockImplementation(async (options: any) => { - options?.onProgress?.({ - phase: "list", - currentUrl: "https://example.com/list", - listPagesProcessed: 3, - listPagesTotal: 10, - jobCardsFound: 42, - jobPagesEnqueued: 30, - jobPagesSkipped: 4, - jobPagesProcessed: 8, - }); - return { success: true, jobs: [] } as any; - }); - - await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["gradcracker"], - }, - }); - - const progress = getProgress(); - expect(progress.crawlingSource).toBeNull(); - expect(progress.crawlingListPagesProcessed).toBe(3); - expect(progress.crawlingListPagesTotal).toBe(10); - expect(progress.crawlingJobCardsFound).toBe(42); - expect(progress.crawlingJobPagesEnqueued).toBe(30); - expect(progress.crawlingJobPagesSkipped).toBe(4); - expect(progress.crawlingJobPagesProcessed).toBe(8); - }); - - it("updates JobSpy terms and UKVisa pages via progress callbacks", async () => { - const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); - const ukVisa = await import("../../services/ukvisajobs"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer", "frontend"]), - } as any); - - vi.mocked(jobSpy.runJobSpy).mockImplementation(async (options: any) => { - options?.onProgress?.({ - type: "term_start", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - }); - options?.onProgress?.({ - type: "term_complete", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - jobsFoundTerm: 10, - }); - options?.onProgress?.({ - 
type: "term_start", - termIndex: 2, - termTotal: 2, - searchTerm: "frontend", - }); - options?.onProgress?.({ - type: "term_complete", - termIndex: 2, - termTotal: 2, - searchTerm: "frontend", - jobsFoundTerm: 8, - }); - return { success: true, jobs: [] } as any; - }); - - vi.mocked(ukVisa.runUkVisaJobs).mockImplementation(async (options: any) => { - options?.onProgress?.({ - type: "init", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - maxPages: 4, - maxJobs: 50, - }); - options?.onProgress?.({ - type: "page_fetched", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - pageNo: 2, - maxPages: 4, - jobsOnPage: 15, - totalCollected: 18, - totalAvailable: 100, - }); - options?.onProgress?.({ - type: "term_complete", - termIndex: 1, - termTotal: 2, - searchTerm: "engineer", - jobsFoundTerm: 18, - totalCollected: 18, - }); - return { success: true, jobs: [] } as any; - }); - - await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["linkedin", "ukvisajobs"], - }, - }); - - const progress = getProgress(); - expect(progress.crawlingTermsProcessed).toBe(3); - expect(progress.crawlingTermsTotal).toBe(4); - expect(progress.crawlingListPagesProcessed).toBe(2); - expect(progress.crawlingListPagesTotal).toBe(4); - expect(progress.crawlingJobPagesEnqueued).toBe(18); - expect(progress.crawlingJobPagesProcessed).toBe(18); - }); - - it("skips UK-only sources for non-UK country and runs compatible sources", async () => { - const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); - const crawler = await import("../../services/crawler"); - const ukVisa = await import("../../services/ukvisajobs"); - - vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ - searchTerms: JSON.stringify(["engineer"]), - jobspyCountryIndeed: "united states", - } as any); - - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [ - { - source: "linkedin", - title: "Engineer", - employer: 
"ACME", - jobUrl: "https://example.com/job", - }, - ], - } as any); - - const result = await discoverJobsStep({ - mergedConfig: { - ...config, - sources: ["linkedin", "gradcracker", "ukvisajobs"], - }, - }); - - expect(result.discoveredJobs).toHaveLength(1); - expect(vi.mocked(jobSpy.runJobSpy)).toHaveBeenCalledTimes(1); - expect(vi.mocked(crawler.runCrawler)).not.toHaveBeenCalled(); - expect(vi.mocked(ukVisa.runUkVisaJobs)).not.toHaveBeenCalled(); - }); - it("throws when all requested sources are incompatible for country", async () => { const settingsRepo = await import("../../repositories/settings"); + const registryModule = await import("../../extractors/registry"); vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ searchTerms: JSON.stringify(["engineer"]), jobspyCountryIndeed: "united states", } as any); + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map(), + manifestBySource: new Map(), + availableSources: [], + } as any); + await expect( discoverJobsStep({ mergedConfig: { - ...config, + ...baseConfig, sources: ["gradcracker", "ukvisajobs"], }, }), @@ -619,63 +158,75 @@ describe("discoverJobsStep", () => { it("does not throw when no sources are requested", async () => { const settingsRepo = await import("../../repositories/settings"); - const adzuna = await import("../../services/adzuna"); - const hiringCafe = await import("../../services/hiring-cafe"); - const jobSpy = await import("../../services/jobspy"); - const crawler = await import("../../services/crawler"); - const ukVisa = await import("../../services/ukvisajobs"); + const registryModule = await import("../../extractors/registry"); vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ searchTerms: JSON.stringify(["engineer"]), jobspyCountryIndeed: "united states", } as any); + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map(), + manifestBySource: new Map(), + availableSources: [], + } as any); + const result = 
await discoverJobsStep({ mergedConfig: { - ...config, + ...baseConfig, sources: [], }, }); expect(result.discoveredJobs).toEqual([]); expect(result.sourceErrors).toEqual([]); - expect(vi.mocked(jobSpy.runJobSpy)).not.toHaveBeenCalled(); - expect(vi.mocked(adzuna.runAdzuna)).not.toHaveBeenCalled(); - expect(vi.mocked(hiringCafe.runHiringCafe)).not.toHaveBeenCalled(); - expect(vi.mocked(crawler.runCrawler)).not.toHaveBeenCalled(); - expect(vi.mocked(ukVisa.runUkVisaJobs)).not.toHaveBeenCalled(); }); it("drops discovered jobs when employer matches blocked company keywords", async () => { const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); + const registryModule = await import("../../extractors/registry"); + + const jobspyManifest = { + id: "jobspy", + displayName: "JobSpy", + providesSources: ["indeed", "linkedin", "glassdoor"], + run: vi.fn().mockResolvedValue({ + success: true, + jobs: [ + { + source: "linkedin", + title: "Engineer", + employer: "Acme Staffing", + jobUrl: "https://example.com/job-1", + }, + { + source: "linkedin", + title: "Engineer II", + employer: "Contoso", + jobUrl: "https://example.com/job-2", + }, + ], + }), + }; vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ searchTerms: JSON.stringify(["engineer"]), blockedCompanyKeywords: JSON.stringify(["recruit", "staffing"]), } as any); - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [ - { - source: "linkedin", - title: "Engineer", - employer: "Acme Staffing", - jobUrl: "https://example.com/job-1", - }, - { - source: "linkedin", - title: "Engineer II", - employer: "Contoso", - jobUrl: "https://example.com/job-2", - }, - ], + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map([["jobspy", jobspyManifest as any]]), + manifestBySource: new Map([ + ["indeed", jobspyManifest as any], + ["linkedin", jobspyManifest as any], + ["glassdoor", jobspyManifest as any], + ]), 
+ availableSources: ["indeed", "linkedin", "glassdoor"], } as any); const result = await discoverJobsStep({ mergedConfig: { - ...config, + ...baseConfig, sources: ["linkedin"], }, }); @@ -684,32 +235,139 @@ describe("discoverJobsStep", () => { expect(result.discoveredJobs[0]?.employer).toBe("Contoso"); }); + it("applies shared city filtering for sources without native city filtering", async () => { + const settingsRepo = await import("../../repositories/settings"); + const registryModule = await import("../../extractors/registry"); + + const gradcrackerManifest = { + id: "gradcracker", + displayName: "Gradcracker", + providesSources: ["gradcracker"], + run: vi.fn().mockResolvedValue({ + success: true, + jobs: [ + { + source: "gradcracker", + title: "Engineer - Leeds", + employer: "ACME", + location: "Leeds, England, UK", + jobUrl: "https://example.com/grad-1", + }, + { + source: "gradcracker", + title: "Engineer - London", + employer: "ACME", + location: "London, England, UK", + jobUrl: "https://example.com/grad-2", + }, + ], + }), + }; + const ukvisaManifest = { + id: "ukvisajobs", + displayName: "UK Visa Jobs", + providesSources: ["ukvisajobs"], + run: vi.fn().mockResolvedValue({ + success: true, + jobs: [ + { + source: "ukvisajobs", + title: "Developer - Leeds", + employer: "Contoso", + location: "Leeds, England, UK", + jobUrl: "https://example.com/ukv-1", + }, + ], + }), + }; + + vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ + searchTerms: JSON.stringify(["engineer"]), + searchCities: "Leeds", + jobspyCountryIndeed: "united kingdom", + } as any); + + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map([ + ["gradcracker", gradcrackerManifest as any], + ["ukvisajobs", ukvisaManifest as any], + ]), + manifestBySource: new Map([ + ["gradcracker", gradcrackerManifest as any], + ["ukvisajobs", ukvisaManifest as any], + ]), + availableSources: ["gradcracker", "ukvisajobs"], + } as any); + + const result = await 
discoverJobsStep({ + mergedConfig: { + ...baseConfig, + sources: ["gradcracker", "ukvisajobs"], + }, + }); + + expect(result.discoveredJobs).toHaveLength(2); + expect( + result.discoveredJobs.every((job) => job.location?.includes("Leeds")), + ).toBe(true); + }); + it("tracks source completion counters across source transitions", async () => { const settingsRepo = await import("../../repositories/settings"); - const jobSpy = await import("../../services/jobspy"); - const crawler = await import("../../services/crawler"); - const ukVisa = await import("../../services/ukvisajobs"); + const jobsRepo = await import("../../repositories/jobs"); + const registryModule = await import("../../extractors/registry"); + + const jobspyManifest = { + id: "jobspy", + displayName: "JobSpy", + providesSources: ["indeed", "linkedin", "glassdoor"], + run: vi.fn().mockResolvedValue({ success: true, jobs: [] }), + }; + const gradcrackerManifest = { + id: "gradcracker", + displayName: "Gradcracker", + providesSources: ["gradcracker"], + run: vi.fn().mockResolvedValue({ success: true, jobs: [] }), + }; + const ukvisaManifest = { + id: "ukvisajobs", + displayName: "UK Visa Jobs", + providesSources: ["ukvisajobs"], + run: vi.fn().mockResolvedValue({ success: true, jobs: [] }), + }; vi.mocked(settingsRepo.getAllSettings).mockResolvedValue({ searchTerms: JSON.stringify(["engineer"]), } as any); + vi.mocked(jobsRepo.getAllJobUrls).mockResolvedValue([ + "https://example.com/existing", + ]); - vi.mocked(jobSpy.runJobSpy).mockResolvedValue({ - success: true, - jobs: [], - } as any); - vi.mocked(crawler.runCrawler).mockResolvedValue({ - success: true, - jobs: [], - } as any); - vi.mocked(ukVisa.runUkVisaJobs).mockResolvedValue({ - success: true, - jobs: [], + vi.mocked(registryModule.getExtractorRegistry).mockResolvedValue({ + manifests: new Map([ + ["jobspy", jobspyManifest as any], + ["gradcracker", gradcrackerManifest as any], + ["ukvisajobs", ukvisaManifest as any], + ]), + manifestBySource: new 
Map([ + ["indeed", jobspyManifest as any], + ["linkedin", jobspyManifest as any], + ["glassdoor", jobspyManifest as any], + ["gradcracker", gradcrackerManifest as any], + ["ukvisajobs", ukvisaManifest as any], + ]), + availableSources: [ + "indeed", + "linkedin", + "glassdoor", + "gradcracker", + "ukvisajobs", + ], } as any); await discoverJobsStep({ mergedConfig: { - ...config, + ...baseConfig, sources: ["linkedin", "gradcracker", "ukvisajobs"], }, }); @@ -717,5 +375,17 @@ describe("discoverJobsStep", () => { const progress = getProgress(); expect(progress.crawlingSourcesTotal).toBe(3); expect(progress.crawlingSourcesCompleted).toBe(3); + expect(gradcrackerManifest.run).toHaveBeenCalledWith( + expect.objectContaining({ + getExistingJobUrls: expect.any(Function), + }), + ); + + const [{ getExistingJobUrls }] = gradcrackerManifest.run.mock.calls[0] as [ + { getExistingJobUrls: () => Promise }, + ]; + await expect(getExistingJobUrls()).resolves.toEqual([ + "https://example.com/existing", + ]); }); }); diff --git a/orchestrator/src/server/pipeline/steps/discover-jobs.ts b/orchestrator/src/server/pipeline/steps/discover-jobs.ts index a0195be..fbe7e12 100644 --- a/orchestrator/src/server/pipeline/steps/discover-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/discover-jobs.ts @@ -1,20 +1,20 @@ import { logger } from "@infra/logger"; +import { sanitizeUnknown } from "@infra/sanitize"; import { formatCountryLabel, - getAdzunaCountryCode, isSourceAllowedForCountry, normalizeCountryKey, } from "@shared/location-support.js"; import { normalizeStringArray } from "@shared/normalize-string-array.js"; -import { parseSearchCitiesSetting } from "@shared/search-cities.js"; +import { + matchesRequestedCity, + resolveSearchCities, + shouldApplyStrictCityFilter, +} from "@shared/search-cities.js"; import type { CreateJobInput, PipelineConfig } from "@shared/types"; -import * as jobsRepo from "../../repositories/jobs"; +import { getExtractorRegistry } from 
"../../extractors/registry"; +import { getAllJobUrls } from "../../repositories/jobs"; import * as settingsRepo from "../../repositories/settings"; -import { runAdzuna } from "../../services/adzuna"; -import { runCrawler } from "../../services/crawler"; -import { runHiringCafe } from "../../services/hiring-cafe"; -import { runJobSpy } from "../../services/jobspy"; -import { runUkVisaJobs } from "../../services/ukvisajobs"; import { asyncPool } from "../../utils/async-pool"; import { type CrawlSource, progressHelpers, updateProgress } from "../progress"; @@ -57,6 +57,26 @@ function isBlockedEmployer( ); } +function filterJobsByRequestedCities(args: { + jobs: CreateJobInput[]; + selectedCountry: string; + requestedCities: string[]; +}): CreateJobInput[] { + const { jobs, selectedCountry, requestedCities } = args; + if (requestedCities.length === 0) return jobs; + + return jobs.filter((job) => + requestedCities.some((requestedCity) => { + const strict = shouldApplyStrictCityFilter( + requestedCity, + selectedCountry, + ); + if (!strict) return true; + return matchesRequestedCity(job.location, requestedCity); + }), + ); +} + export async function discoverJobsStep(args: { mergedConfig: PipelineConfig; shouldCancel?: () => boolean; @@ -70,6 +90,7 @@ export async function discoverJobsStep(args: { const sourceErrors: string[] = []; const settings = await settingsRepo.getAllSettings(); + const registry = await getExtractorRegistry(); const searchTermsSetting = settings.searchTerms; let searchTerms: string[] = []; @@ -94,6 +115,13 @@ export async function discoverJobsStep(args: { const compatibleSources = args.mergedConfig.sources.filter((source) => isSourceAllowedForCountry(source, selectedCountry), ); + let existingJobUrlsPromise: Promise | null = null; + const getExistingJobUrls = (): Promise => { + if (!existingJobUrlsPromise) { + existingJobUrlsPromise = getAllJobUrls(); + } + return existingJobUrlsPromise; + }; const skippedSources = args.mergedConfig.sources.filter( 
(source) => !compatibleSources.includes(source), ); @@ -114,390 +142,95 @@ export async function discoverJobsStep(args: { ); } - const jobSpySites = compatibleSources.filter( - (source): source is "indeed" | "linkedin" | "glassdoor" => - source === "indeed" || source === "linkedin" || source === "glassdoor", - ); + const groupedByManifest = new Map< + string, + { sources: string[]; detail: string; termsTotal?: number } + >(); + + for (const source of compatibleSources) { + const manifest = registry.manifestBySource.get(source); + if (!manifest) { + sourceErrors.push(`${source}: extractor manifest not registered`); + continue; + } + + const existing = groupedByManifest.get(manifest.id); + if (existing) { + existing.sources.push(source); + continue; + } + + groupedByManifest.set(manifest.id, { + sources: [source], + termsTotal: searchTerms.length, + detail: `${manifest.displayName}: fetching jobs...`, + }); + } const sourceTasks: DiscoverySourceTask[] = []; - if (jobSpySites.length > 0) { + for (const [manifestId, grouped] of groupedByManifest) { + const manifest = registry.manifests.get(manifestId); + if (!manifest) continue; + sourceTasks.push({ - source: "jobspy", - termsTotal: searchTerms.length, - detail: `JobSpy: scraping ${jobSpySites.join(", ")}...`, + source: manifest.id, + termsTotal: grouped.termsTotal, + detail: + grouped.sources.length > 1 + ? `${manifest.displayName}: ${grouped.sources.join(", ")}...` + : grouped.detail, run: async () => { - const jobSpyResult = await runJobSpy({ - sites: jobSpySites, - searchTerms, - location: - settings.searchCities ?? settings.jobspyLocation ?? undefined, - resultsWanted: settings.jobspyResultsWanted - ? parseInt(settings.jobspyResultsWanted, 10) - : undefined, - countryIndeed: settings.jobspyCountryIndeed ?? 
undefined, - onProgress: (event) => { - if (event.type === "term_start") { - progressHelpers.crawlingUpdate({ - source: "jobspy", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `JobSpy: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - return; - } - - progressHelpers.crawlingUpdate({ - source: "jobspy", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `JobSpy: completed ${event.termIndex}/${event.termTotal} (${event.searchTerm}) with ${event.jobsFoundTerm} jobs`, - }); - }, - }); - - if (!jobSpyResult.success) { - return { - discoveredJobs: [], - sourceErrors: [`jobspy: ${jobSpyResult.error ?? "unknown error"}`], - }; - } - - return { - discoveredJobs: jobSpyResult.jobs, - sourceErrors: [], - }; - }, - }); - } - - if (compatibleSources.includes("adzuna")) { - sourceTasks.push({ - source: "adzuna", - termsTotal: searchTerms.length, - detail: "Adzuna: fetching jobs...", - run: async () => { - const adzunaCountryCode = getAdzunaCountryCode(selectedCountry); - if (!adzunaCountryCode) { - return { - discoveredJobs: [], - sourceErrors: [ - `adzuna: unsupported country ${formatCountryLabel(selectedCountry)}`, - ], - }; - } - - const adzunaMaxJobsPerTerm = settings.adzunaMaxJobsPerTerm - ? parseInt(settings.adzunaMaxJobsPerTerm, 10) - : 50; - - const adzunaResult = await runAdzuna({ - country: adzunaCountryCode, - countryKey: selectedCountry, - locations: parseSearchCitiesSetting( - settings.searchCities ?? 
settings.jobspyLocation, + const filteredSettings = Object.fromEntries( + Object.entries(settings).filter( + ([, value]) => + typeof value === "string" || typeof value === "undefined", ), + ) as Record; + + const result = await manifest.run({ + source: grouped.sources[0], + selectedSources: grouped.sources, + settings: filteredSettings, searchTerms, - maxJobsPerTerm: adzunaMaxJobsPerTerm, + selectedCountry, + getExistingJobUrls, + shouldCancel: args.shouldCancel, onProgress: (event) => { - if (event.type === "term_start") { - progressHelpers.crawlingUpdate({ - source: "adzuna", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Adzuna: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - return; - } - - if (event.type === "page_fetched") { - progressHelpers.crawlingUpdate({ - source: "adzuna", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: event.pageNo, - jobPagesEnqueued: event.totalCollected, - jobPagesProcessed: event.totalCollected, - phase: "list", - currentUrl: `page ${event.pageNo}`, - }); - updateProgress({ - step: "crawling", - detail: `Adzuna: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo} (${event.totalCollected} collected)`, - }); - return; - } - progressHelpers.crawlingUpdate({ - source: "adzuna", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Adzuna: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, + source: manifest.id, + termsProcessed: event.termsProcessed, + termsTotal: event.termsTotal, + listPagesProcessed: event.listPagesProcessed, + listPagesTotal: event.listPagesTotal, + jobCardsFound: event.jobCardsFound, + jobPagesEnqueued: event.jobPagesEnqueued, + 
jobPagesSkipped: event.jobPagesSkipped, + jobPagesProcessed: event.jobPagesProcessed, + phase: event.phase, + currentUrl: event.currentUrl, }); + + if (event.detail) { + updateProgress({ + step: "crawling", + detail: event.detail, + }); + } }, }); - if (!adzunaResult.success) { - return { - discoveredJobs: [], - sourceErrors: [`adzuna: ${adzunaResult.error ?? "unknown error"}`], - }; - } - - return { - discoveredJobs: adzunaResult.jobs, - sourceErrors: [], - }; - }, - }); - } - - if (compatibleSources.includes("hiringcafe")) { - sourceTasks.push({ - source: "hiringcafe", - termsTotal: searchTerms.length, - detail: "Hiring Cafe: fetching jobs...", - run: async () => { - const hiringCafeMaxJobsPerTerm = settings.jobspyResultsWanted - ? parseInt(settings.jobspyResultsWanted, 10) - : 200; - - const hiringCafeResult = await runHiringCafe({ - country: selectedCountry, - countryKey: selectedCountry, - locations: parseSearchCitiesSetting( - settings.searchCities ?? settings.jobspyLocation, - ), - searchTerms, - maxJobsPerTerm: hiringCafeMaxJobsPerTerm, - onProgress: (event) => { - if (event.type === "term_start") { - progressHelpers.crawlingUpdate({ - source: "hiringcafe", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - return; - } - - if (event.type === "page_fetched") { - const displayPageNo = event.pageNo + 1; - progressHelpers.crawlingUpdate({ - source: "hiringcafe", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: displayPageNo, - jobPagesEnqueued: event.totalCollected, - jobPagesProcessed: event.totalCollected, - phase: "list", - currentUrl: `page ${displayPageNo}`, - }); - updateProgress({ - step: "crawling", - detail: `Hiring Cafe: term ${event.termIndex}/${event.termTotal}, page 
${displayPageNo} (${event.totalCollected} collected)`, - }); - return; - } - - progressHelpers.crawlingUpdate({ - source: "hiringcafe", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm, - }); - updateProgress({ - step: "crawling", - detail: `Hiring Cafe: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm})`, - }); - }, - }); - - if (!hiringCafeResult.success) { + if (!result.success) { return { discoveredJobs: [], sourceErrors: [ - `hiringcafe: ${hiringCafeResult.error ?? "unknown error"}`, + `${manifest.displayName || manifest.id}: ${result.error ?? "unknown error"} (sources: ${grouped.sources.join(",")})`, ], }; } return { - discoveredJobs: hiringCafeResult.jobs, - sourceErrors: [], - }; - }, - }); - } - - if (compatibleSources.includes("gradcracker")) { - sourceTasks.push({ - source: "gradcracker", - detail: "Gradcracker: scraping...", - run: async () => { - const existingJobUrls = await jobsRepo.getAllJobUrls(); - const gradcrackerMaxJobs = settings.gradcrackerMaxJobsPerTerm - ? parseInt(settings.gradcrackerMaxJobsPerTerm, 10) - : 50; - - const crawlerResult = await runCrawler({ - existingJobUrls, - searchTerms, - maxJobsPerTerm: gradcrackerMaxJobs, - onProgress: (progress) => { - progressHelpers.crawlingUpdate({ - source: "gradcracker", - listPagesProcessed: progress.listPagesProcessed, - listPagesTotal: progress.listPagesTotal, - jobCardsFound: progress.jobCardsFound, - jobPagesEnqueued: progress.jobPagesEnqueued, - jobPagesSkipped: progress.jobPagesSkipped, - jobPagesProcessed: progress.jobPagesProcessed, - phase: progress.phase, - currentUrl: progress.currentUrl, - }); - }, - }); - - if (!crawlerResult.success) { - return { - discoveredJobs: [], - sourceErrors: [ - `gradcracker: ${crawlerResult.error ?? 
"unknown error"}`, - ], - }; - } - - return { - discoveredJobs: crawlerResult.jobs, - sourceErrors: [], - }; - }, - }); - } - - if (compatibleSources.includes("ukvisajobs")) { - sourceTasks.push({ - source: "ukvisajobs", - termsTotal: searchTerms.length, - detail: "UKVisaJobs: scraping visa-sponsoring jobs...", - run: async () => { - const ukvisajobsMaxJobs = settings.ukvisajobsMaxJobs - ? parseInt(settings.ukvisajobsMaxJobs, 10) - : 50; - - const ukVisaResult = await runUkVisaJobs({ - maxJobs: ukvisajobsMaxJobs, - searchTerms, - onProgress: (event) => { - if (event.type === "init") { - progressHelpers.crawlingUpdate({ - source: "ukvisajobs", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: 0, - listPagesTotal: event.maxPages, - jobPagesEnqueued: 0, - jobPagesProcessed: 0, - jobPagesSkipped: 0, - phase: "list", - currentUrl: event.searchTerm || "all jobs", - }); - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, - }); - return; - } - - if (event.type === "page_fetched") { - progressHelpers.crawlingUpdate({ - source: "ukvisajobs", - termsProcessed: Math.max(event.termIndex - 1, 0), - termsTotal: event.termTotal, - listPagesProcessed: event.pageNo, - listPagesTotal: event.maxPages, - jobPagesEnqueued: event.totalCollected, - jobPagesProcessed: event.totalCollected, - phase: "list", - currentUrl: `page ${event.pageNo}/${event.maxPages}`, - }); - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: term ${event.termIndex}/${event.termTotal}, page ${event.pageNo}/${event.maxPages} (${event.totalCollected} collected)`, - }); - return; - } - - if (event.type === "term_complete") { - progressHelpers.crawlingUpdate({ - source: "ukvisajobs", - termsProcessed: event.termIndex, - termsTotal: event.termTotal, - phase: "list", - currentUrl: event.searchTerm || "all jobs", - }); - updateProgress({ - step: "crawling", - detail: 
`UKVisaJobs: completed term ${event.termIndex}/${event.termTotal} (${event.searchTerm || "all jobs"})`, - }); - return; - } - - if (event.type === "empty_page") { - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: page ${event.pageNo} returned no jobs`, - }); - return; - } - - if (event.type === "error") { - updateProgress({ - step: "crawling", - detail: `UKVisaJobs: ${event.message}`, - }); - } - }, - }); - - if (!ukVisaResult.success) { - return { - discoveredJobs: [], - sourceErrors: [ - `ukvisajobs: ${ukVisaResult.error ?? "unknown error"}`, - ], - }; - } - - return { - discoveredJobs: ukVisaResult.jobs, + discoveredJobs: result.jobs, sourceErrors: [], }; }, @@ -536,6 +269,11 @@ export async function discoverJobsStep(args: { try { return await sourceTask.run(); } catch (error) { + logger.warn("Discovery source task failed", { + sourceTask: sourceTask.source, + error: sanitizeUnknown(error), + }); + return { discoveredJobs: [], sourceErrors: [ @@ -551,16 +289,35 @@ export async function discoverJobsStep(args: { sourceErrors.push(...sourceResult.sourceErrors); } + const requestedCities = resolveSearchCities({ + single: settings.searchCities ?? 
settings.jobspyLocation, + }); + const cityFilteredJobs = filterJobsByRequestedCities({ + jobs: discoveredJobs, + selectedCountry, + requestedCities, + }); + const cityFilteredOutCount = discoveredJobs.length - cityFilteredJobs.length; + + if (cityFilteredOutCount > 0) { + logger.info("Dropped discovered jobs that did not match requested cities", { + step: "discover-jobs", + droppedCount: cityFilteredOutCount, + requestedCities, + selectedCountry, + }); + } + const blockedCompanyKeywords = parseBlockedCompanyKeywords( settings.blockedCompanyKeywords, ); const blockedKeywordsLowerCase = blockedCompanyKeywords.map((value) => value.toLowerCase(), ); - const filteredDiscoveredJobs = discoveredJobs.filter( + const filteredDiscoveredJobs = cityFilteredJobs.filter( (job) => !isBlockedEmployer(job.employer, blockedKeywordsLowerCase), ); - const droppedCount = discoveredJobs.length - filteredDiscoveredJobs.length; + const droppedCount = cityFilteredJobs.length - filteredDiscoveredJobs.length; if (droppedCount > 0) { const blockedCompanyKeywordsPreview = blockedCompanyKeywords.slice(0, 10); diff --git a/orchestrator/src/server/services/adzuna.location.test.ts b/orchestrator/src/server/services/adzuna.location.test.ts deleted file mode 100644 index 7250c2f..0000000 --- a/orchestrator/src/server/services/adzuna.location.test.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { describe, expect, it } from "vitest"; -import { - matchesRequestedLocation, - shouldApplyStrictLocationFilter, -} from "./adzuna"; - -describe("adzuna strict location filtering", () => { - it("enables strict filtering when city differs from country", () => { - expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe( - true, - ); - }); - - it("disables strict filtering when location is country-level", () => { - expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false); - expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false); - }); - - it("matches requested 
location by case-insensitive contains", () => { - expect(matchesRequestedLocation("Leeds, England, UK", "leeds")).toBe(true); - expect(matchesRequestedLocation("Halifax, England, UK", "leeds")).toBe( - false, - ); - expect(matchesRequestedLocation(undefined, "leeds")).toBe(false); - }); -}); diff --git a/orchestrator/src/server/services/hiring-cafe.location.test.ts b/orchestrator/src/server/services/hiring-cafe.location.test.ts deleted file mode 100644 index c543eb5..0000000 --- a/orchestrator/src/server/services/hiring-cafe.location.test.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { describe, expect, it } from "vitest"; -import { - matchesRequestedLocation, - shouldApplyStrictLocationFilter, -} from "./hiring-cafe"; - -describe("hiringcafe strict location filtering", () => { - it("enables strict filtering when city differs from country", () => { - expect(shouldApplyStrictLocationFilter("Leeds", "united kingdom")).toBe( - true, - ); - }); - - it("disables strict filtering when location is country-level", () => { - expect(shouldApplyStrictLocationFilter("UK", "united kingdom")).toBe(false); - expect(shouldApplyStrictLocationFilter("United States", "us")).toBe(false); - }); - - it("matches requested location by case-insensitive contains", () => { - expect(matchesRequestedLocation("Leeds, England, UK", "leeds")).toBe(true); - expect(matchesRequestedLocation("Halifax, England, UK", "leeds")).toBe( - false, - ); - expect(matchesRequestedLocation(undefined, "leeds")).toBe(false); - }); -}); diff --git a/orchestrator/vite.config.ts b/orchestrator/vite.config.ts index 9f0773f..8b63b57 100644 --- a/orchestrator/vite.config.ts +++ b/orchestrator/vite.config.ts @@ -32,6 +32,7 @@ export default defineConfig({ "src/**/*.test.ts", "src/**/*.test.tsx", "../shared/src/**/*.test.ts", + "../extractors/**/tests/**/*.test.ts", ], exclude: ["node_modules/**", "dist/**"], }, diff --git a/package.json b/package.json index 4e0eb70..5286224 100644 --- a/package.json +++ b/package.json @@ 
-8,6 +8,7 @@ "shared" ], "scripts": { + "test:all": "npm --workspace orchestrator run test:run", "check:types:shared": "npm --workspace shared run check:types", "check:types": "npm --workspace shared run check:types && npm --workspace orchestrator run check:types", "check:types:ukvisajobs": "npm --workspace ukvisajobs-extractor run check:types", diff --git a/shared/src/extractors/index.test.ts b/shared/src/extractors/index.test.ts new file mode 100644 index 0000000..73e2dbe --- /dev/null +++ b/shared/src/extractors/index.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from "vitest"; +import { + EXTRACTOR_SOURCE_IDS, + EXTRACTOR_SOURCE_METADATA, + extractorSourceEnum, + isExtractorSourceId, +} from "./index"; + +describe("extractor source catalog", () => { + it("validates known source ids", () => { + for (const source of EXTRACTOR_SOURCE_IDS) { + expect(extractorSourceEnum.parse(source)).toBe(source); + expect(isExtractorSourceId(source)).toBe(true); + } + }); + + it("rejects unknown source ids", () => { + expect(isExtractorSourceId("unknown-source")).toBe(false); + expect(() => extractorSourceEnum.parse("unknown-source")).toThrow(); + }); + + it("provides metadata for every known source", () => { + for (const source of EXTRACTOR_SOURCE_IDS) { + expect(EXTRACTOR_SOURCE_METADATA[source]).toBeDefined(); + expect(EXTRACTOR_SOURCE_METADATA[source].label.length).toBeGreaterThan(0); + } + }); +}); diff --git a/shared/src/extractors/index.ts b/shared/src/extractors/index.ts new file mode 100644 index 0000000..ad9ee1e --- /dev/null +++ b/shared/src/extractors/index.ts @@ -0,0 +1,81 @@ +import { z } from "zod"; + +export const EXTRACTOR_SOURCE_IDS = [ + "gradcracker", + "indeed", + "linkedin", + "glassdoor", + "ukvisajobs", + "adzuna", + "hiringcafe", + "manual", +] as const; + +export type ExtractorSourceId = (typeof EXTRACTOR_SOURCE_IDS)[number]; + +export interface ExtractorSourceMetadata { + label: string; + order: number; + category: "pipeline" | "manual"; + 
requiresCredentials?: boolean; + ukOnly?: boolean; +} + +export const EXTRACTOR_SOURCE_METADATA: Record< + ExtractorSourceId, + ExtractorSourceMetadata +> = { + gradcracker: { + label: "Gradcracker", + order: 10, + category: "pipeline", + ukOnly: true, + }, + indeed: { label: "Indeed", order: 20, category: "pipeline" }, + linkedin: { label: "LinkedIn", order: 30, category: "pipeline" }, + glassdoor: { label: "Glassdoor", order: 40, category: "pipeline" }, + ukvisajobs: { + label: "UK Visa Jobs", + order: 50, + category: "pipeline", + requiresCredentials: true, + ukOnly: true, + }, + adzuna: { + label: "Adzuna", + order: 60, + category: "pipeline", + requiresCredentials: true, + }, + hiringcafe: { label: "Hiring Cafe", order: 70, category: "pipeline" }, + manual: { label: "Manual", order: 90, category: "manual" }, +}; + +export const PIPELINE_EXTRACTOR_SOURCE_IDS = EXTRACTOR_SOURCE_IDS.filter( + (source) => EXTRACTOR_SOURCE_METADATA[source].category === "pipeline", +); + +const extractorSourceTuple = EXTRACTOR_SOURCE_IDS as unknown as [ + ExtractorSourceId, + ...ExtractorSourceId[], +]; + +export const extractorSourceEnum = z.enum(extractorSourceTuple); + +export function isExtractorSourceId(value: string): value is ExtractorSourceId { + return EXTRACTOR_SOURCE_IDS.includes(value as ExtractorSourceId); +} + +export function sourceLabel(source: ExtractorSourceId): string { + return EXTRACTOR_SOURCE_METADATA[source].label; +} + +export function sortSources<T extends { source: ExtractorSourceId }>( + values: T[], +): T[] { + return [...values].sort( + (left, right) => + EXTRACTOR_SOURCE_METADATA[left.source].order - + EXTRACTOR_SOURCE_METADATA[right.source].order, + ); +} diff --git a/shared/src/index.ts b/shared/src/index.ts index 1a1ec42..33a0a42 100644 --- a/shared/src/index.ts +++ b/shared/src/index.ts @@ -1,3 +1,4 @@ +export * from "./extractors"; export * from "./location-support"; export * from "./types"; export * from "./utils/type-conversion"; diff --git a/shared/src/search-cities.test.ts
b/shared/src/search-cities.test.ts index 941f4f4..fa7eb9a 100644 --- a/shared/src/search-cities.test.ts +++ b/shared/src/search-cities.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest"; import { matchesRequestedCity, parseSearchCitiesSetting, + resolveSearchCities, serializeSearchCitiesSetting, shouldApplyStrictCityFilter, } from "./search-cities"; @@ -26,6 +27,43 @@ describe("search-cities", () => { expect(serializeSearchCitiesSetting([])).toBeNull(); }); + it("resolves search cities from list/single/env/fallback", () => { + expect( + resolveSearchCities({ + list: [" Leeds ", "London", "leeds"], + }), + ).toEqual(["Leeds", "London"]); + + expect(resolveSearchCities({ single: "Leeds|London" })).toEqual([ + "Leeds", + "London", + ]); + expect(resolveSearchCities({ env: "Leeds\nLondon" })).toEqual([ + "Leeds", + "London", + ]); + expect(resolveSearchCities({ fallback: "UK" })).toEqual(["UK"]); + }); + + it("falls back when single/env values parse to empty", () => { + expect(resolveSearchCities({ single: "", fallback: "UK" })).toEqual(["UK"]); + expect(resolveSearchCities({ single: "||", fallback: "UK" })).toEqual([ + "UK", + ]); + expect(resolveSearchCities({ env: " ", fallback: "UK" })).toEqual(["UK"]); + }); + + it("returns empty array when all resolve options are empty", () => { + expect( + resolveSearchCities({ + list: [], + single: "", + env: "", + fallback: "", + }), + ).toEqual([]); + }); + it("applies strict filter only when city differs from country", () => { expect(shouldApplyStrictCityFilter("Leeds", "united kingdom")).toBe(true); expect(shouldApplyStrictCityFilter("UK", "united kingdom")).toBe(false); diff --git a/shared/src/search-cities.ts b/shared/src/search-cities.ts index 4fcbfd4..54d4002 100644 --- a/shared/src/search-cities.ts +++ b/shared/src/search-cities.ts @@ -37,6 +37,36 @@ export function parseSearchCitiesSetting( return out; } +interface ResolveSearchCitiesOptions { + list?: string[] | null; + single?: string | null; + 
env?: string | null; + fallback?: string | null; +} + +export function resolveSearchCities( + options: ResolveSearchCitiesOptions, +): string[] { + // Priority order: + // 1) explicit list (searchCities array in config) + // 2) explicit single value + // 3) environment fallback + // 4) final hardcoded/default fallback + if (options.list && options.list.length > 0) { + const parsedList = parseSearchCitiesSetting(options.list.join("|")); + if (parsedList.length > 0) return parsedList; + } + + const fallbackCandidates = [options.single, options.env, options.fallback]; + for (const candidate of fallbackCandidates) { + if (candidate === null || candidate === undefined) continue; + const parsed = parseSearchCitiesSetting(candidate); + if (parsed.length > 0) return parsed; + } + + return []; +} + export function serializeSearchCitiesSetting(cities: string[]): string | null { if (cities.length === 0) return null; return cities.join("|"); diff --git a/shared/src/types.ts b/shared/src/types.ts index db6ee45..0fc026c 100644 --- a/shared/src/types.ts +++ b/shared/src/types.ts @@ -7,6 +7,7 @@ export * from "./types/api"; export * from "./types/chat"; +export * from "./types/extractors"; export * from "./types/jobs"; export * from "./types/pipeline"; export * from "./types/post-application"; diff --git a/shared/src/types/extractors.ts b/shared/src/types/extractors.ts new file mode 100644 index 0000000..1606e48 --- /dev/null +++ b/shared/src/types/extractors.ts @@ -0,0 +1,40 @@ +import type { CreateJobInput } from "./jobs"; + +export interface ExtractorProgressEvent { + phase?: "list" | "job"; + currentUrl?: string; + termsProcessed?: number; + termsTotal?: number; + listPagesProcessed?: number; + listPagesTotal?: number; + jobCardsFound?: number; + jobPagesEnqueued?: number; + jobPagesSkipped?: number; + jobPagesProcessed?: number; + detail?: string; +} + +export interface ExtractorRuntimeContext { + source: string; + selectedSources: string[]; + settings: Record<string, string | undefined>; + searchTerms:
string[]; + selectedCountry: string; + getExistingJobUrls?: () => Promise<string[]>; + shouldCancel?: () => boolean; + onProgress?: (event: ExtractorProgressEvent) => void; +} + +export interface ExtractorRunResult { + success: boolean; + jobs: CreateJobInput[]; + error?: string; +} + +export interface ExtractorManifest { + id: string; + displayName: string; + providesSources: readonly string[]; + requiredEnvVars?: readonly string[]; + run: (context: ExtractorRuntimeContext) => Promise<ExtractorRunResult>; +} diff --git a/shared/src/types/jobs.ts b/shared/src/types/jobs.ts index 755c8c7..a625894 100644 --- a/shared/src/types/jobs.ts +++ b/shared/src/types/jobs.ts @@ -1,3 +1,5 @@ +import type { ExtractorSourceId } from "../extractors"; + export type JobStatus = | "discovered" // Crawled but not processed | "processing" // Currently generating resume @@ -115,15 +117,7 @@ export interface Interview { outcome: InterviewOutcome | null; } -export type JobSource = - | "gradcracker" - | "indeed" - | "linkedin" - | "glassdoor" - | "ukvisajobs" - | "adzuna" - | "hiringcafe" - | "manual"; +export type JobSource = ExtractorSourceId; export interface Job { id: string; diff --git a/shared/src/types/pipeline.ts b/shared/src/types/pipeline.ts index e28b71d..fefc789 100644 --- a/shared/src/types/pipeline.ts +++ b/shared/src/types/pipeline.ts @@ -1,9 +1,10 @@ -import type { Job, JobSource, JobStatus } from "./jobs"; +import type { ExtractorSourceId } from "../extractors"; +import type { Job, JobStatus } from "./jobs"; export interface PipelineConfig { topN: number; // Number of top jobs to process minSuitabilityScore: number; // Minimum score to auto-process - sources: JobSource[]; // Job sources to crawl + sources: ExtractorSourceId[]; // Job sources to crawl outputDir: string; // Directory for generated PDFs enableCrawling?: boolean; enableScoring?: boolean;