feat(orchestrator): parallelize scoring and bulk rescore with bounded concurrency (#194)
* feat(orchestrator): parallelize scoring and bulk rescore with bounded concurrency * chore(docs): format versioned docs metadata for biome * fix(orchestrator): address PR review feedback on async pool and bulk rescore * ci(docs): run biome --write before docs version commit
This commit is contained in:
parent
c0c465b2e1
commit
aefb6ca78b
7
.github/workflows/docs-version.yml
vendored
7
.github/workflows/docs-version.yml
vendored
@ -46,6 +46,13 @@ jobs:
|
|||||||
run: npm run docs:version -- "${{ steps.vars.outputs.version }}"
|
run: npm run docs:version -- "${{ steps.vars.outputs.version }}"
|
||||||
working-directory: .
|
working-directory: .
|
||||||
|
|
||||||
|
- name: Format generated docs files
|
||||||
|
run: |
|
||||||
|
./orchestrator/node_modules/.bin/biome check --write \
|
||||||
|
docs-site/versions.json \
|
||||||
|
docs-site/versioned_docs \
|
||||||
|
docs-site/versioned_sidebars
|
||||||
|
|
||||||
- name: Commit and push generated docs version files
|
- name: Commit and push generated docs version files
|
||||||
run: |
|
run: |
|
||||||
git config user.name "github-actions[bot]"
|
git config user.name "github-actions[bot]"
|
||||||
|
|||||||
@ -61,17 +61,12 @@
|
|||||||
{
|
{
|
||||||
"type": "category",
|
"type": "category",
|
||||||
"label": "Troubleshooting",
|
"label": "Troubleshooting",
|
||||||
"items": [
|
"items": ["troubleshooting/common-problems"]
|
||||||
"troubleshooting/common-problems"
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"type": "category",
|
"type": "category",
|
||||||
"label": "Reference / FAQ",
|
"label": "Reference / FAQ",
|
||||||
"items": [
|
"items": ["reference/faq", "reference/documentation-style-guide"]
|
||||||
"reference/faq",
|
|
||||||
"reference/documentation-style-guide"
|
|
||||||
]
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1 @@
|
|||||||
[
|
["0.1.24", "0.1.23", "0.1.22", "0.1.21", "0.1.20"]
|
||||||
"0.1.24",
|
|
||||||
"0.1.23",
|
|
||||||
"0.1.22",
|
|
||||||
"0.1.21",
|
|
||||||
"0.1.20"
|
|
||||||
]
|
|
||||||
|
|||||||
@ -563,6 +563,7 @@ describe.sequential("Jobs API routes", () => {
|
|||||||
expect(
|
expect(
|
||||||
body.data.results.find((r: any) => r.jobId === "missing-id").error.code,
|
body.data.results.find((r: any) => r.jobId === "missing-id").error.code,
|
||||||
).toBe("NOT_FOUND");
|
).toBe("NOT_FOUND");
|
||||||
|
expect(vi.mocked(getProfile)).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("streams bulk action progress with done counters", async () => {
|
it("streams bulk action progress with done counters", async () => {
|
||||||
|
|||||||
@ -45,8 +45,10 @@ import { getProfile } from "../../services/profile";
|
|||||||
import { scoreJobSuitability } from "../../services/scorer";
|
import { scoreJobSuitability } from "../../services/scorer";
|
||||||
import { getTracerReadiness } from "../../services/tracer-links";
|
import { getTracerReadiness } from "../../services/tracer-links";
|
||||||
import * as visaSponsors from "../../services/visa-sponsors/index";
|
import * as visaSponsors from "../../services/visa-sponsors/index";
|
||||||
|
import { asyncPool } from "../../utils/async-pool";
|
||||||
|
|
||||||
export const jobsRouter = Router();
|
export const jobsRouter = Router();
|
||||||
|
const BULK_ACTION_CONCURRENCY = 4;
|
||||||
|
|
||||||
const tailoredSkillsPayloadSchema = z.array(
|
const tailoredSkillsPayloadSchema = z.array(
|
||||||
z.object({
|
z.object({
|
||||||
@ -275,9 +277,35 @@ function mapErrorForResult(error: unknown): {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BulkExecutionOptions = {
|
||||||
|
getProfileForRescore?: () => Promise<Record<string, unknown>>;
|
||||||
|
};
|
||||||
|
|
||||||
|
function createBulkProfileLoader(): () => Promise<Record<string, unknown>> {
|
||||||
|
let profilePromise: Promise<Record<string, unknown>> | null = null;
|
||||||
|
|
||||||
|
return async () => {
|
||||||
|
if (!profilePromise) {
|
||||||
|
profilePromise = (async () => {
|
||||||
|
const rawProfile = await getProfile();
|
||||||
|
if (
|
||||||
|
!rawProfile ||
|
||||||
|
typeof rawProfile !== "object" ||
|
||||||
|
Array.isArray(rawProfile)
|
||||||
|
) {
|
||||||
|
throw badRequest("Invalid resume profile format");
|
||||||
|
}
|
||||||
|
return rawProfile as Record<string, unknown>;
|
||||||
|
})();
|
||||||
|
}
|
||||||
|
return profilePromise;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async function executeBulkActionForJob(
|
async function executeBulkActionForJob(
|
||||||
action: BulkJobAction,
|
action: BulkJobAction,
|
||||||
jobId: string,
|
jobId: string,
|
||||||
|
options?: BulkExecutionOptions,
|
||||||
): Promise<BulkJobActionResult> {
|
): Promise<BulkJobActionResult> {
|
||||||
try {
|
try {
|
||||||
const job = await jobsRepo.getJobById(jobId);
|
const job = await jobsRepo.getJobById(jobId);
|
||||||
@ -356,19 +384,21 @@ async function executeBulkActionForJob(
|
|||||||
return { jobId, ok: true, job: simulated };
|
return { jobId, ok: true, job: simulated };
|
||||||
}
|
}
|
||||||
|
|
||||||
const rawProfile = await getProfile();
|
const profile = options?.getProfileForRescore
|
||||||
if (
|
? await options.getProfileForRescore()
|
||||||
!rawProfile ||
|
: await (async () => {
|
||||||
typeof rawProfile !== "object" ||
|
const rawProfile = await getProfile();
|
||||||
Array.isArray(rawProfile)
|
if (
|
||||||
) {
|
!rawProfile ||
|
||||||
throw badRequest("Invalid resume profile format");
|
typeof rawProfile !== "object" ||
|
||||||
}
|
Array.isArray(rawProfile)
|
||||||
|
) {
|
||||||
|
throw badRequest("Invalid resume profile format");
|
||||||
|
}
|
||||||
|
return rawProfile as Record<string, unknown>;
|
||||||
|
})();
|
||||||
|
|
||||||
const { score, reason } = await scoreJobSuitability(
|
const { score, reason } = await scoreJobSuitability(job, profile);
|
||||||
job,
|
|
||||||
rawProfile as Record<string, unknown>,
|
|
||||||
);
|
|
||||||
|
|
||||||
const updated = await jobsRepo.updateJob(job.id, {
|
const updated = await jobsRepo.updateJob(job.id, {
|
||||||
suitabilityScore: score,
|
suitabilityScore: score,
|
||||||
@ -508,12 +538,17 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
|
|||||||
try {
|
try {
|
||||||
const parsed = bulkActionRequestSchema.parse(req.body);
|
const parsed = bulkActionRequestSchema.parse(req.body);
|
||||||
const dedupedJobIds = Array.from(new Set(parsed.jobIds));
|
const dedupedJobIds = Array.from(new Set(parsed.jobIds));
|
||||||
|
const executionOptions: BulkExecutionOptions =
|
||||||
|
parsed.action === "rescore" && !isDemoMode()
|
||||||
|
? { getProfileForRescore: createBulkProfileLoader() }
|
||||||
|
: {};
|
||||||
|
|
||||||
const results: BulkJobActionResult[] = [];
|
const results = await asyncPool({
|
||||||
for (const jobId of dedupedJobIds) {
|
items: dedupedJobIds,
|
||||||
const result = await executeBulkActionForJob(parsed.action, jobId);
|
concurrency: BULK_ACTION_CONCURRENCY,
|
||||||
results.push(result);
|
task: async (jobId) =>
|
||||||
}
|
executeBulkActionForJob(parsed.action, jobId, executionOptions),
|
||||||
|
});
|
||||||
|
|
||||||
const succeeded = results.filter((result) => result.ok).length;
|
const succeeded = results.filter((result) => result.ok).length;
|
||||||
const failed = results.length - succeeded;
|
const failed = results.length - succeeded;
|
||||||
@ -531,6 +566,7 @@ jobsRouter.post("/bulk-actions", async (req: Request, res: Response) => {
|
|||||||
requested: dedupedJobIds.length,
|
requested: dedupedJobIds.length,
|
||||||
succeeded,
|
succeeded,
|
||||||
failed,
|
failed,
|
||||||
|
concurrency: BULK_ACTION_CONCURRENCY,
|
||||||
});
|
});
|
||||||
|
|
||||||
ok(res, payload);
|
ok(res, payload);
|
||||||
@ -572,6 +608,10 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
|
|||||||
const dedupedJobIds = Array.from(new Set(parsed.data.jobIds));
|
const dedupedJobIds = Array.from(new Set(parsed.data.jobIds));
|
||||||
const requestId = String(res.getHeader("x-request-id") || "unknown");
|
const requestId = String(res.getHeader("x-request-id") || "unknown");
|
||||||
const action = parsed.data.action;
|
const action = parsed.data.action;
|
||||||
|
const executionOptions: BulkExecutionOptions =
|
||||||
|
action === "rescore" && !isDemoMode()
|
||||||
|
? { getProfileForRescore: createBulkProfileLoader() }
|
||||||
|
: {};
|
||||||
const requested = dedupedJobIds.length;
|
const requested = dedupedJobIds.length;
|
||||||
const results: BulkJobActionResult[] = [];
|
const results: BulkJobActionResult[] = [];
|
||||||
let succeeded = 0;
|
let succeeded = 0;
|
||||||
@ -622,47 +662,48 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const jobId of dedupedJobIds) {
|
await asyncPool({
|
||||||
if (!isResponseWritable()) {
|
items: dedupedJobIds,
|
||||||
logger.info("Client disconnected; stopping bulk job stream", {
|
concurrency: BULK_ACTION_CONCURRENCY,
|
||||||
route: "POST /api/jobs/bulk-actions/stream",
|
shouldStop: () => !isResponseWritable(),
|
||||||
action,
|
task: async (jobId) => {
|
||||||
requested,
|
if (!isResponseWritable()) return;
|
||||||
succeeded,
|
|
||||||
failed,
|
|
||||||
requestId,
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
const result = await executeBulkActionForJob(action, jobId);
|
const result = await executeBulkActionForJob(
|
||||||
results.push(result);
|
action,
|
||||||
if (result.ok) succeeded += 1;
|
jobId,
|
||||||
else failed += 1;
|
executionOptions,
|
||||||
|
);
|
||||||
|
results.push(result);
|
||||||
|
if (result.ok) succeeded += 1;
|
||||||
|
else failed += 1;
|
||||||
|
|
||||||
if (
|
if (
|
||||||
!sendEvent({
|
!sendEvent({
|
||||||
type: "progress",
|
type: "progress",
|
||||||
action,
|
action,
|
||||||
requested,
|
requested,
|
||||||
completed: results.length,
|
completed: results.length,
|
||||||
succeeded,
|
succeeded,
|
||||||
failed,
|
failed,
|
||||||
result,
|
result,
|
||||||
requestId,
|
requestId,
|
||||||
})
|
})
|
||||||
) {
|
) {
|
||||||
logger.info("Client disconnected while writing bulk stream progress", {
|
logger.info(
|
||||||
route: "POST /api/jobs/bulk-actions/stream",
|
"Client disconnected while writing bulk stream progress",
|
||||||
action,
|
{
|
||||||
requested,
|
route: "POST /api/jobs/bulk-actions/stream",
|
||||||
succeeded,
|
action,
|
||||||
failed,
|
requested,
|
||||||
requestId,
|
succeeded,
|
||||||
});
|
failed,
|
||||||
break;
|
requestId,
|
||||||
}
|
},
|
||||||
}
|
);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
sendEvent({
|
sendEvent({
|
||||||
type: "completed",
|
type: "completed",
|
||||||
@ -681,6 +722,7 @@ jobsRouter.post("/bulk-actions/stream", async (req: Request, res: Response) => {
|
|||||||
requested,
|
requested,
|
||||||
succeeded,
|
succeeded,
|
||||||
failed,
|
failed,
|
||||||
|
concurrency: BULK_ACTION_CONCURRENCY,
|
||||||
requestId,
|
requestId,
|
||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@ -183,4 +183,59 @@ describe("scoreJobsStep auto-skip behavior", () => {
|
|||||||
expect.objectContaining({ jobId: "job-applied" }),
|
expect.objectContaining({ jobId: "job-applied" }),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("scores multiple jobs and reports completion progress", async () => {
|
||||||
|
const jobsRepo = await import("../../repositories/jobs");
|
||||||
|
const scorer = await import("../../services/scorer");
|
||||||
|
const { progressHelpers } = await import("../progress");
|
||||||
|
|
||||||
|
vi.mocked(jobsRepo.getUnscoredDiscoveredJobs).mockResolvedValue([
|
||||||
|
createJob({
|
||||||
|
id: "job-1",
|
||||||
|
title: "First Role",
|
||||||
|
employer: "Acme",
|
||||||
|
suitabilityScore: null,
|
||||||
|
}),
|
||||||
|
createJob({
|
||||||
|
id: "job-2",
|
||||||
|
title: "Second Role",
|
||||||
|
employer: "Beta",
|
||||||
|
suitabilityScore: null,
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
|
||||||
|
vi.mocked(scorer.scoreJobSuitability)
|
||||||
|
.mockResolvedValueOnce({ score: 61, reason: "First score" })
|
||||||
|
.mockResolvedValueOnce({ score: 72, reason: "Second score" });
|
||||||
|
|
||||||
|
const result = await scoreJobsStep({ profile: {} });
|
||||||
|
|
||||||
|
expect(result.scoredJobs).toHaveLength(2);
|
||||||
|
expect(vi.mocked(jobsRepo.updateJob)).toHaveBeenCalledTimes(2);
|
||||||
|
expect(vi.mocked(progressHelpers.scoringJob)).toHaveBeenCalledTimes(2);
|
||||||
|
expect(vi.mocked(progressHelpers.scoringComplete)).toHaveBeenCalledWith(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stops before processing when cancellation is requested", async () => {
|
||||||
|
const jobsRepo = await import("../../repositories/jobs");
|
||||||
|
const scorer = await import("../../services/scorer");
|
||||||
|
|
||||||
|
vi.mocked(jobsRepo.getUnscoredDiscoveredJobs).mockResolvedValue([
|
||||||
|
createJob({
|
||||||
|
id: "job-1",
|
||||||
|
title: "Cancelled Role",
|
||||||
|
employer: "Acme",
|
||||||
|
suitabilityScore: null,
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await scoreJobsStep({
|
||||||
|
profile: {},
|
||||||
|
shouldCancel: () => true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result.scoredJobs).toHaveLength(0);
|
||||||
|
expect(vi.mocked(scorer.scoreJobSuitability)).not.toHaveBeenCalled();
|
||||||
|
expect(vi.mocked(jobsRepo.updateJob)).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -4,9 +4,12 @@ import * as jobsRepo from "../../repositories/jobs";
|
|||||||
import * as settingsRepo from "../../repositories/settings";
|
import * as settingsRepo from "../../repositories/settings";
|
||||||
import { scoreJobSuitability } from "../../services/scorer";
|
import { scoreJobSuitability } from "../../services/scorer";
|
||||||
import * as visaSponsors from "../../services/visa-sponsors/index";
|
import * as visaSponsors from "../../services/visa-sponsors/index";
|
||||||
|
import { asyncPool } from "../../utils/async-pool";
|
||||||
import { progressHelpers, updateProgress } from "../progress";
|
import { progressHelpers, updateProgress } from "../progress";
|
||||||
import type { ScoredJob } from "./types";
|
import type { ScoredJob } from "./types";
|
||||||
|
|
||||||
|
const SCORING_CONCURRENCY = 4;
|
||||||
|
|
||||||
export async function scoreJobsStep(args: {
|
export async function scoreJobsStep(args: {
|
||||||
profile: Record<string, unknown>;
|
profile: Record<string, unknown>;
|
||||||
shouldCancel?: () => boolean;
|
shouldCancel?: () => boolean;
|
||||||
@ -32,78 +35,91 @@ export async function scoreJobsStep(args: {
|
|||||||
});
|
});
|
||||||
|
|
||||||
const scoredJobs: ScoredJob[] = [];
|
const scoredJobs: ScoredJob[] = [];
|
||||||
|
let completed = 0;
|
||||||
|
|
||||||
for (let i = 0; i < unprocessedJobs.length; i++) {
|
await asyncPool({
|
||||||
if (args.shouldCancel?.()) break;
|
items: unprocessedJobs,
|
||||||
|
concurrency: SCORING_CONCURRENCY,
|
||||||
|
shouldStop: args.shouldCancel,
|
||||||
|
task: async (job) => {
|
||||||
|
if (args.shouldCancel?.()) return;
|
||||||
|
|
||||||
const job = unprocessedJobs[i];
|
const hasCachedScore =
|
||||||
const hasCachedScore =
|
typeof job.suitabilityScore === "number" &&
|
||||||
typeof job.suitabilityScore === "number" &&
|
!Number.isNaN(job.suitabilityScore);
|
||||||
!Number.isNaN(job.suitabilityScore);
|
|
||||||
|
|
||||||
progressHelpers.scoringJob(
|
if (hasCachedScore) {
|
||||||
i + 1,
|
completed += 1;
|
||||||
unprocessedJobs.length,
|
progressHelpers.scoringJob(
|
||||||
hasCachedScore ? `${job.title} (cached)` : job.title,
|
completed,
|
||||||
);
|
unprocessedJobs.length,
|
||||||
|
`${job.title} (cached)`,
|
||||||
|
);
|
||||||
|
scoredJobs.push({
|
||||||
|
...job,
|
||||||
|
suitabilityScore: job.suitabilityScore as number,
|
||||||
|
suitabilityReason: job.suitabilityReason ?? "",
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (hasCachedScore) {
|
const { score, reason } = await scoreJobSuitability(job, args.profile);
|
||||||
|
if (args.shouldCancel?.()) return;
|
||||||
|
|
||||||
|
let sponsorMatchScore = 0;
|
||||||
|
let sponsorMatchNames: string | undefined;
|
||||||
|
|
||||||
|
if (job.employer) {
|
||||||
|
const sponsorResults = visaSponsors.searchSponsors(job.employer, {
|
||||||
|
limit: 10,
|
||||||
|
minScore: 50,
|
||||||
|
});
|
||||||
|
|
||||||
|
const summary =
|
||||||
|
visaSponsors.calculateSponsorMatchSummary(sponsorResults);
|
||||||
|
sponsorMatchScore = summary.sponsorMatchScore;
|
||||||
|
sponsorMatchNames = summary.sponsorMatchNames ?? undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if job should be auto-skipped based on score threshold
|
||||||
|
const shouldAutoSkip =
|
||||||
|
job.status !== "applied" &&
|
||||||
|
autoSkipThreshold !== null &&
|
||||||
|
!Number.isNaN(autoSkipThreshold) &&
|
||||||
|
score < autoSkipThreshold;
|
||||||
|
|
||||||
|
await jobsRepo.updateJob(job.id, {
|
||||||
|
suitabilityScore: score,
|
||||||
|
suitabilityReason: reason,
|
||||||
|
sponsorMatchScore,
|
||||||
|
sponsorMatchNames,
|
||||||
|
...(shouldAutoSkip ? { status: "skipped" } : {}),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (shouldAutoSkip) {
|
||||||
|
logger.info("Auto-skipped job due to low score", {
|
||||||
|
jobId: job.id,
|
||||||
|
title: job.title,
|
||||||
|
score,
|
||||||
|
threshold: autoSkipThreshold,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
completed += 1;
|
||||||
|
progressHelpers.scoringJob(completed, unprocessedJobs.length, job.title);
|
||||||
scoredJobs.push({
|
scoredJobs.push({
|
||||||
...job,
|
...job,
|
||||||
suitabilityScore: job.suitabilityScore as number,
|
suitabilityScore: score,
|
||||||
suitabilityReason: job.suitabilityReason ?? "",
|
suitabilityReason: reason,
|
||||||
});
|
});
|
||||||
continue;
|
},
|
||||||
}
|
});
|
||||||
|
|
||||||
const { score, reason } = await scoreJobSuitability(job, args.profile);
|
|
||||||
scoredJobs.push({
|
|
||||||
...job,
|
|
||||||
suitabilityScore: score,
|
|
||||||
suitabilityReason: reason,
|
|
||||||
});
|
|
||||||
|
|
||||||
let sponsorMatchScore = 0;
|
|
||||||
let sponsorMatchNames: string | undefined;
|
|
||||||
|
|
||||||
if (job.employer) {
|
|
||||||
const sponsorResults = visaSponsors.searchSponsors(job.employer, {
|
|
||||||
limit: 10,
|
|
||||||
minScore: 50,
|
|
||||||
});
|
|
||||||
|
|
||||||
const summary = visaSponsors.calculateSponsorMatchSummary(sponsorResults);
|
|
||||||
sponsorMatchScore = summary.sponsorMatchScore;
|
|
||||||
sponsorMatchNames = summary.sponsorMatchNames ?? undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if job should be auto-skipped based on score threshold
|
|
||||||
const shouldAutoSkip =
|
|
||||||
job.status !== "applied" &&
|
|
||||||
autoSkipThreshold !== null &&
|
|
||||||
!Number.isNaN(autoSkipThreshold) &&
|
|
||||||
score < autoSkipThreshold;
|
|
||||||
|
|
||||||
await jobsRepo.updateJob(job.id, {
|
|
||||||
suitabilityScore: score,
|
|
||||||
suitabilityReason: reason,
|
|
||||||
sponsorMatchScore,
|
|
||||||
sponsorMatchNames,
|
|
||||||
...(shouldAutoSkip ? { status: "skipped" } : {}),
|
|
||||||
});
|
|
||||||
|
|
||||||
if (shouldAutoSkip) {
|
|
||||||
logger.info("Auto-skipped job due to low score", {
|
|
||||||
jobId: job.id,
|
|
||||||
title: job.title,
|
|
||||||
score,
|
|
||||||
threshold: autoSkipThreshold,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
progressHelpers.scoringComplete(scoredJobs.length);
|
progressHelpers.scoringComplete(scoredJobs.length);
|
||||||
logger.info("Scoring step completed", { scoredJobs: scoredJobs.length });
|
logger.info("Scoring step completed", {
|
||||||
|
scoredJobs: scoredJobs.length,
|
||||||
|
concurrency: SCORING_CONCURRENCY,
|
||||||
|
});
|
||||||
|
|
||||||
return { unprocessedJobs, scoredJobs };
|
return { unprocessedJobs, scoredJobs };
|
||||||
}
|
}
|
||||||
|
|||||||
78
orchestrator/src/server/utils/async-pool.test.ts
Normal file
78
orchestrator/src/server/utils/async-pool.test.ts
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import { asyncPool } from "./async-pool";
|
||||||
|
|
||||||
|
describe("asyncPool", () => {
|
||||||
|
it("preserves input order in output", async () => {
|
||||||
|
const items = [1, 2, 3, 4];
|
||||||
|
const result = await asyncPool({
|
||||||
|
items,
|
||||||
|
concurrency: 3,
|
||||||
|
task: async (item) => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, (5 - item) * 5));
|
||||||
|
return item * 10;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result).toEqual([10, 20, 30, 40]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("clamps non-finite and out-of-range concurrency values", async () => {
|
||||||
|
let inFlight = 0;
|
||||||
|
let maxInFlight = 0;
|
||||||
|
const items = Array.from({ length: 20 }, (_, index) => index);
|
||||||
|
|
||||||
|
await asyncPool({
|
||||||
|
items,
|
||||||
|
concurrency: Number.NaN,
|
||||||
|
task: async (item) => item,
|
||||||
|
});
|
||||||
|
|
||||||
|
await asyncPool({
|
||||||
|
items,
|
||||||
|
concurrency: 100,
|
||||||
|
task: async (item) => {
|
||||||
|
inFlight += 1;
|
||||||
|
maxInFlight = Math.max(maxInFlight, inFlight);
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 2));
|
||||||
|
inFlight -= 1;
|
||||||
|
return item;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(maxInFlight).toBeLessThanOrEqual(10);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("propagates task errors", async () => {
|
||||||
|
await expect(
|
||||||
|
asyncPool({
|
||||||
|
items: [1, 2, 3],
|
||||||
|
concurrency: 2,
|
||||||
|
task: async (item) => {
|
||||||
|
if (item === 2) throw new Error("boom");
|
||||||
|
return item;
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
).rejects.toThrow("boom");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns only completed results when stopped early", async () => {
|
||||||
|
let shouldStop = false;
|
||||||
|
let completed = 0;
|
||||||
|
|
||||||
|
const result = await asyncPool({
|
||||||
|
items: [1, 2, 3, 4, 5],
|
||||||
|
concurrency: 2,
|
||||||
|
shouldStop: () => shouldStop,
|
||||||
|
task: async (item) => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 3));
|
||||||
|
completed += 1;
|
||||||
|
if (completed >= 2) shouldStop = true;
|
||||||
|
return item;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result.length).toBeLessThan(5);
|
||||||
|
expect(result.length).toBeGreaterThanOrEqual(2);
|
||||||
|
expect(result.slice(0, 2)).toEqual([1, 2]);
|
||||||
|
});
|
||||||
|
});
|
||||||
38
orchestrator/src/server/utils/async-pool.ts
Normal file
38
orchestrator/src/server/utils/async-pool.ts
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
export async function asyncPool<TItem, TResult>(args: {
|
||||||
|
items: readonly TItem[];
|
||||||
|
concurrency: number;
|
||||||
|
shouldStop?: () => boolean;
|
||||||
|
task: (item: TItem, index: number) => Promise<TResult>;
|
||||||
|
}): Promise<TResult[]> {
|
||||||
|
const { items, task, shouldStop } = args;
|
||||||
|
const rawConcurrency = Number.isFinite(args.concurrency)
|
||||||
|
? args.concurrency
|
||||||
|
: 1;
|
||||||
|
const safeConcurrency = Math.max(1, Math.min(10, Math.floor(rawConcurrency)));
|
||||||
|
|
||||||
|
if (items.length === 0) return [];
|
||||||
|
|
||||||
|
const UNSET = Symbol("unset");
|
||||||
|
const results: Array<TResult | typeof UNSET> = Array.from(
|
||||||
|
{ length: items.length },
|
||||||
|
() => UNSET,
|
||||||
|
);
|
||||||
|
let nextIndex = 0;
|
||||||
|
|
||||||
|
const worker = async (): Promise<void> => {
|
||||||
|
while (true) {
|
||||||
|
if (shouldStop?.()) return;
|
||||||
|
|
||||||
|
const currentIndex = nextIndex;
|
||||||
|
nextIndex += 1;
|
||||||
|
if (currentIndex >= items.length) return;
|
||||||
|
|
||||||
|
results[currentIndex] = await task(items[currentIndex], currentIndex);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const workerCount = Math.min(safeConcurrency, items.length);
|
||||||
|
await Promise.all(Array.from({ length: workerCount }, () => worker()));
|
||||||
|
|
||||||
|
return results.filter((value): value is TResult => value !== UNSET);
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user