diff --git a/.github/workflows/docs-version.yml b/.github/workflows/docs-version.yml index 735c9b5..2fb9e5a 100644 --- a/.github/workflows/docs-version.yml +++ b/.github/workflows/docs-version.yml @@ -46,6 +46,13 @@ jobs: run: npm run docs:version -- "${{ steps.vars.outputs.version }}" working-directory: . + - name: Format generated docs files + run: | + ./orchestrator/node_modules/.bin/biome check --write \ + docs-site/versions.json \ + docs-site/versioned_docs \ + docs-site/versioned_sidebars + - name: Commit and push generated docs version files run: | git config user.name "github-actions[bot]" diff --git a/docs-site/versioned_sidebars/version-0.1.24-sidebars.json b/docs-site/versioned_sidebars/version-0.1.24-sidebars.json index 0e3ec0d..5b2e8f3 100644 --- a/docs-site/versioned_sidebars/version-0.1.24-sidebars.json +++ b/docs-site/versioned_sidebars/version-0.1.24-sidebars.json @@ -61,17 +61,12 @@ { "type": "category", "label": "Troubleshooting", - "items": [ - "troubleshooting/common-problems" - ] + "items": ["troubleshooting/common-problems"] }, { "type": "category", "label": "Reference / FAQ", - "items": [ - "reference/faq", - "reference/documentation-style-guide" - ] + "items": ["reference/faq", "reference/documentation-style-guide"] } ] } diff --git a/docs-site/versions.json b/docs-site/versions.json index b2d26a4..68eb78a 100644 --- a/docs-site/versions.json +++ b/docs-site/versions.json @@ -1,7 +1 @@ -[ - "0.1.24", - "0.1.23", - "0.1.22", - "0.1.21", - "0.1.20" -] +["0.1.24", "0.1.23", "0.1.22", "0.1.21", "0.1.20"] diff --git a/orchestrator/src/server/api/routes/jobs.test.ts b/orchestrator/src/server/api/routes/jobs.test.ts index ea296bf..56d7c10 100644 --- a/orchestrator/src/server/api/routes/jobs.test.ts +++ b/orchestrator/src/server/api/routes/jobs.test.ts @@ -563,6 +563,7 @@ describe.sequential("Jobs API routes", () => { expect( body.data.results.find((r: any) => r.jobId === "missing-id").error.code, ).toBe("NOT_FOUND"); + expect(vi.mocked(getProfile)).toHaveBeenCalledTimes(1); }); it("streams bulk action progress with done counters", async () => { diff --git a/orchestrator/src/server/api/routes/jobs.ts b/orchestrator/src/server/api/routes/jobs.ts index f85b270..7083404 100644 --- a/orchestrator/src/server/api/routes/jobs.ts +++ b/orchestrator/src/server/api/routes/jobs.ts @@ -45,8 +45,10 @@ import { getProfile } from "../../services/profile"; import { scoreJobSuitability } from "../../services/scorer"; import { getTracerReadiness } from "../../services/tracer-links"; import * as visaSponsors from "../../services/visa-sponsors/index"; +import { asyncPool } from "../../utils/async-pool"; export const jobsRouter = Router(); +const BULK_ACTION_CONCURRENCY = 4; const tailoredSkillsPayloadSchema = z.array( z.object({ @@ -275,9 +277,35 @@ function mapErrorForResult(error: unknown): { }; } +type BulkExecutionOptions = { + getProfileForRescore?: () => Promise>; +}; + +function createBulkProfileLoader(): () => Promise> { + let profilePromise: Promise> | null = null; + + return async () => { + if (!profilePromise) { + profilePromise = (async () => { + const rawProfile = await getProfile(); + if ( + !rawProfile || + typeof rawProfile !== "object" || + Array.isArray(rawProfile) + ) { + throw badRequest("Invalid resume profile format"); + } + return rawProfile as Record; + })(); + } + return profilePromise; + }; +} + async function executeBulkActionForJob( action: BulkJobAction, jobId: string, + options?: BulkExecutionOptions, ): Promise { try { const job = await jobsRepo.getJobById(jobId); @@ -356,19 +384,21 @@ async function executeBulkActionForJob( return { jobId, ok: true, job: simulated }; } - const rawProfile = await getProfile(); - if ( - !rawProfile || - typeof rawProfile !== "object" || - Array.isArray(rawProfile) - ) { - throw badRequest("Invalid resume profile format"); - } + const profile = options?.getProfileForRescore + ? await options.getProfileForRescore() + : await (async () => { + const rawProfile = await getProfile(); + if ( + !rawProfile || + typeof rawProfile !== "object" || + Array.isArray(rawProfile) + ) { + throw badRequest("Invalid resume profile format"); + } + return rawProfile as Record; + })(); - const { score, reason } = await scoreJobSuitability( - job, - rawProfile as Record, - ); + const { score, reason } = await scoreJobSuitability(job, profile); const updated = await jobsRepo.updateJob(job.id, { suitabilityScore: score, @@ -508,12 +538,17 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { try { const parsed = bulkActionRequestSchema.parse(req.body); const dedupedJobIds = Array.from(new Set(parsed.jobIds)); + const executionOptions: BulkExecutionOptions = + parsed.action === "rescore" && !isDemoMode() + ? { getProfileForRescore: createBulkProfileLoader() } + : {}; - const results: BulkJobActionResult[] = []; - for (const jobId of dedupedJobIds) { - const result = await executeBulkActionForJob(parsed.action, jobId); - results.push(result); - } + const results = await asyncPool({ + items: dedupedJobIds, + concurrency: BULK_ACTION_CONCURRENCY, + task: async (jobId) => + executeBulkActionForJob(parsed.action, jobId, executionOptions), + }); const succeeded = results.filter((result) => result.ok).length; const failed = results.length - succeeded; @@ -531,6 +566,7 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => { requested: dedupedJobIds.length, succeeded, failed, + concurrency: BULK_ACTION_CONCURRENCY, }); ok(res, payload); @@ -572,6 +608,10 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { const dedupedJobIds = Array.from(new Set(parsed.data.jobIds)); const requestId = String(res.getHeader("x-request-id") || "unknown"); const action = parsed.data.action; + const executionOptions: BulkExecutionOptions = + action === "rescore" && !isDemoMode() + ? { getProfileForRescore: createBulkProfileLoader() } + : {}; const requested = dedupedJobIds.length; const results: BulkJobActionResult[] = []; let succeeded = 0; @@ -622,47 +662,48 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { return; } - for (const jobId of dedupedJobIds) { - if (!isResponseWritable()) { - logger.info("Client disconnected; stopping bulk job stream", { - route: "POST /api/jobs/bulk-actions/stream", - action, - requested, - succeeded, - failed, - requestId, - }); - break; - } + await asyncPool({ + items: dedupedJobIds, + concurrency: BULK_ACTION_CONCURRENCY, + shouldStop: () => !isResponseWritable(), + task: async (jobId) => { + if (!isResponseWritable()) return; - const result = await executeBulkActionForJob(action, jobId); - results.push(result); - if (result.ok) succeeded += 1; - else failed += 1; + const result = await executeBulkActionForJob( + action, + jobId, + executionOptions, + ); + results.push(result); + if (result.ok) succeeded += 1; + else failed += 1; - if ( - !sendEvent({ - type: "progress", - action, - requested, - completed: results.length, - succeeded, - failed, - result, - requestId, - }) - ) { - logger.info("Client disconnected while writing bulk stream progress", { - route: "POST /api/jobs/bulk-actions/stream", - action, - requested, - succeeded, - failed, - requestId, - }); - break; - } - } + if ( + !sendEvent({ + type: "progress", + action, + requested, + completed: results.length, + succeeded, + failed, + result, + requestId, + }) + ) { + logger.info( + "Client disconnected while writing bulk stream progress", + { + route: "POST /api/jobs/bulk-actions/stream", + action, + requested, + succeeded, + failed, + requestId, + }, + ); + } + }, + }); sendEvent({ type: "completed", @@ -681,6 +722,7 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => { requested, succeeded, failed, + concurrency: BULK_ACTION_CONCURRENCY, requestId, }); } catch (error) { diff --git a/orchestrator/src/server/pipeline/steps/score-jobs.test.ts b/orchestrator/src/server/pipeline/steps/score-jobs.test.ts index f03e324..9c1c5c9 100644 --- a/orchestrator/src/server/pipeline/steps/score-jobs.test.ts +++ b/orchestrator/src/server/pipeline/steps/score-jobs.test.ts @@ -183,4 +183,59 @@ describe("scoreJobsStep auto-skip behavior", () => { expect.objectContaining({ jobId: "job-applied" }), ); }); + + it("scores multiple jobs and reports completion progress", async () => { + const jobsRepo = await import("../../repositories/jobs"); + const scorer = await import("../../services/scorer"); + const { progressHelpers } = await import("../progress"); + + vi.mocked(jobsRepo.getUnscoredDiscoveredJobs).mockResolvedValue([ + createJob({ + id: "job-1", + title: "First Role", + employer: "Acme", + suitabilityScore: null, + }), + createJob({ + id: "job-2", + title: "Second Role", + employer: "Beta", + suitabilityScore: null, + }), + ]); + + vi.mocked(scorer.scoreJobSuitability) + .mockResolvedValueOnce({ score: 61, reason: "First score" }) + .mockResolvedValueOnce({ score: 72, reason: "Second score" }); + + const result = await scoreJobsStep({ profile: {} }); + + expect(result.scoredJobs).toHaveLength(2); + expect(vi.mocked(jobsRepo.updateJob)).toHaveBeenCalledTimes(2); + expect(vi.mocked(progressHelpers.scoringJob)).toHaveBeenCalledTimes(2); + expect(vi.mocked(progressHelpers.scoringComplete)).toHaveBeenCalledWith(2); + }); + + it("stops before processing when cancellation is requested", async () => { + const jobsRepo = await import("../../repositories/jobs"); + const scorer = await import("../../services/scorer"); + + vi.mocked(jobsRepo.getUnscoredDiscoveredJobs).mockResolvedValue([ + createJob({ + id: "job-1", + title: "Cancelled Role", + employer: "Acme", + suitabilityScore: null, + }), + ]); + + const result = await scoreJobsStep({ + profile: {}, + shouldCancel: () => true, + }); + + expect(result.scoredJobs).toHaveLength(0); + expect(vi.mocked(scorer.scoreJobSuitability)).not.toHaveBeenCalled(); + expect(vi.mocked(jobsRepo.updateJob)).not.toHaveBeenCalled(); + }); }); diff --git a/orchestrator/src/server/pipeline/steps/score-jobs.ts b/orchestrator/src/server/pipeline/steps/score-jobs.ts index e488f93..a372232 100644 --- a/orchestrator/src/server/pipeline/steps/score-jobs.ts +++ b/orchestrator/src/server/pipeline/steps/score-jobs.ts @@ -4,9 +4,12 @@ import * as jobsRepo from "../../repositories/jobs"; import * as settingsRepo from "../../repositories/settings"; import { scoreJobSuitability } from "../../services/scorer"; import * as visaSponsors from "../../services/visa-sponsors/index"; +import { asyncPool } from "../../utils/async-pool"; import { progressHelpers, updateProgress } from "../progress"; import type { ScoredJob } from "./types"; +const SCORING_CONCURRENCY = 4; + export async function scoreJobsStep(args: { profile: Record; shouldCancel?: () => boolean; @@ -32,78 +35,91 @@ export async function scoreJobsStep(args: { }); const scoredJobs: ScoredJob[] = []; + let completed = 0; - for (let i = 0; i < unprocessedJobs.length; i++) { - if (args.shouldCancel?.()) break; + await asyncPool({ + items: unprocessedJobs, + concurrency: SCORING_CONCURRENCY, + shouldStop: args.shouldCancel, + task: async (job) => { + if (args.shouldCancel?.()) return; - const job = unprocessedJobs[i]; - const hasCachedScore = - typeof job.suitabilityScore === "number" && - !Number.isNaN(job.suitabilityScore); + const hasCachedScore = + typeof job.suitabilityScore === "number" && + !Number.isNaN(job.suitabilityScore); - progressHelpers.scoringJob( - i + 1, - unprocessedJobs.length, - hasCachedScore ? `${job.title} (cached)` : job.title, - ); + if (hasCachedScore) { + completed += 1; + progressHelpers.scoringJob( + completed, + unprocessedJobs.length, + `${job.title} (cached)`, + ); + scoredJobs.push({ + ...job, + suitabilityScore: job.suitabilityScore as number, + suitabilityReason: job.suitabilityReason ?? "", + }); + return; + } - if (hasCachedScore) { + const { score, reason } = await scoreJobSuitability(job, args.profile); + if (args.shouldCancel?.()) return; + + let sponsorMatchScore = 0; + let sponsorMatchNames: string | undefined; + + if (job.employer) { + const sponsorResults = visaSponsors.searchSponsors(job.employer, { + limit: 10, + minScore: 50, + }); + + const summary = + visaSponsors.calculateSponsorMatchSummary(sponsorResults); + sponsorMatchScore = summary.sponsorMatchScore; + sponsorMatchNames = summary.sponsorMatchNames ?? undefined; + } + + // Check if job should be auto-skipped based on score threshold + const shouldAutoSkip = + job.status !== "applied" && + autoSkipThreshold !== null && + !Number.isNaN(autoSkipThreshold) && + score < autoSkipThreshold; + + await jobsRepo.updateJob(job.id, { + suitabilityScore: score, + suitabilityReason: reason, + sponsorMatchScore, + sponsorMatchNames, + ...(shouldAutoSkip ? { status: "skipped" } : {}), + }); + + if (shouldAutoSkip) { + logger.info("Auto-skipped job due to low score", { + jobId: job.id, + title: job.title, + score, + threshold: autoSkipThreshold, + }); + } + + completed += 1; + progressHelpers.scoringJob(completed, unprocessedJobs.length, job.title); scoredJobs.push({ ...job, - suitabilityScore: job.suitabilityScore as number, - suitabilityReason: job.suitabilityReason ?? "", + suitabilityScore: score, + suitabilityReason: reason, }); - continue; - } - - const { score, reason } = await scoreJobSuitability(job, args.profile); - scoredJobs.push({ - ...job, - suitabilityScore: score, - suitabilityReason: reason, - }); - - let sponsorMatchScore = 0; - let sponsorMatchNames: string | undefined; - - if (job.employer) { - const sponsorResults = visaSponsors.searchSponsors(job.employer, { - limit: 10, - minScore: 50, - }); - - const summary = visaSponsors.calculateSponsorMatchSummary(sponsorResults); - sponsorMatchScore = summary.sponsorMatchScore; - sponsorMatchNames = summary.sponsorMatchNames ?? undefined; - } - - // Check if job should be auto-skipped based on score threshold - const shouldAutoSkip = - job.status !== "applied" && - autoSkipThreshold !== null && - !Number.isNaN(autoSkipThreshold) && - score < autoSkipThreshold; - - await jobsRepo.updateJob(job.id, { - suitabilityScore: score, - suitabilityReason: reason, - sponsorMatchScore, - sponsorMatchNames, - ...(shouldAutoSkip ? { status: "skipped" } : {}), - }); - - if (shouldAutoSkip) { - logger.info("Auto-skipped job due to low score", { - jobId: job.id, - title: job.title, - score, - threshold: autoSkipThreshold, - }); - } - } + }, + }); progressHelpers.scoringComplete(scoredJobs.length); - logger.info("Scoring step completed", { scoredJobs: scoredJobs.length }); + logger.info("Scoring step completed", { + scoredJobs: scoredJobs.length, + concurrency: SCORING_CONCURRENCY, + }); return { unprocessedJobs, scoredJobs }; } diff --git a/orchestrator/src/server/utils/async-pool.test.ts b/orchestrator/src/server/utils/async-pool.test.ts new file mode 100644 index 0000000..b497b62 --- /dev/null +++ b/orchestrator/src/server/utils/async-pool.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it } from "vitest"; +import { asyncPool } from "./async-pool"; + +describe("asyncPool", () => { + it("preserves input order in output", async () => { + const items = [1, 2, 3, 4]; + const result = await asyncPool({ + items, + concurrency: 3, + task: async (item) => { + await new Promise((resolve) => setTimeout(resolve, (5 - item) * 5)); + return item * 10; + }, + }); + + expect(result).toEqual([10, 20, 30, 40]); + }); + + it("clamps non-finite and out-of-range concurrency values", async () => { + let inFlight = 0; + let maxInFlight = 0; + const items = Array.from({ length: 20 }, (_, index) => index); + + await asyncPool({ + items, + concurrency: Number.NaN, + task: async (item) => item, + }); + + await asyncPool({ + items, + concurrency: 100, + task: async (item) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await new Promise((resolve) => setTimeout(resolve, 2)); + inFlight -= 1; + return item; + }, + }); + + expect(maxInFlight).toBeLessThanOrEqual(10); + }); + + it("propagates task errors", async () => { + await expect( + asyncPool({ + items: [1, 2, 3], + concurrency: 2, + task: async (item) => { + if (item === 2) throw new Error("boom"); + return item; + }, + }), + ).rejects.toThrow("boom"); + }); + + it("returns only completed results when stopped early", async () => { + let shouldStop = false; + let completed = 0; + + const result = await asyncPool({ + items: [1, 2, 3, 4, 5], + concurrency: 2, + shouldStop: () => shouldStop, + task: async (item) => { + await new Promise((resolve) => setTimeout(resolve, 3)); + completed += 1; + if (completed >= 2) shouldStop = true; + return item; + }, + }); + + expect(result.length).toBeLessThan(5); + expect(result.length).toBeGreaterThanOrEqual(2); + expect(result.slice(0, 2)).toEqual([1, 2]); + }); +}); diff --git a/orchestrator/src/server/utils/async-pool.ts b/orchestrator/src/server/utils/async-pool.ts new file mode 100644 index 0000000..7f1dbde --- /dev/null +++ b/orchestrator/src/server/utils/async-pool.ts @@ -0,0 +1,38 @@ +export async function asyncPool(args: { + items: readonly TItem[]; + concurrency: number; + shouldStop?: () => boolean; + task: (item: TItem, index: number) => Promise; +}): Promise { + const { items, task, shouldStop } = args; + const rawConcurrency = Number.isFinite(args.concurrency) + ? args.concurrency + : 1; + const safeConcurrency = Math.max(1, Math.min(10, Math.floor(rawConcurrency))); + + if (items.length === 0) return []; + + const UNSET = Symbol("unset"); + const results: Array = Array.from( + { length: items.length }, + () => UNSET, + ); + let nextIndex = 0; + + const worker = async (): Promise => { + while (true) { + if (shouldStop?.()) return; + + const currentIndex = nextIndex; + nextIndex += 1; + if (currentIndex >= items.length) return; + + results[currentIndex] = await task(items[currentIndex], currentIndex); + } + }; + + const workerCount = Math.min(safeConcurrency, items.length); + await Promise.all(Array.from({ length: workerCount }, () => worker())); + + return results.filter((value): value is TResult => value !== UNSET); +}