From 38ff39b7f366d966bec47a4fd5fec20635881328 Mon Sep 17 00:00:00 2001 From: DaKheera47 Date: Sun, 14 Dec 2025 16:01:02 +0000 Subject: [PATCH] live progress report in the UI. --- job-extractor/src/main.ts | 3 + job-extractor/src/progress.ts | 83 +++++++++++++++++ job-extractor/src/routes.ts | 10 +++ .../client/components/PipelineProgress.tsx | 71 +++++++++++++-- .../src/server/pipeline/orchestrator.ts | 32 ++++++- orchestrator/src/server/pipeline/progress.ts | 88 ++++++++++++++++++- orchestrator/src/server/services/crawler.ts | 45 +++++++++- 7 files changed, 318 insertions(+), 14 deletions(-) create mode 100644 job-extractor/src/progress.ts diff --git a/job-extractor/src/main.ts b/job-extractor/src/main.ts index 2ebf03b..850d765 100644 --- a/job-extractor/src/main.ts +++ b/job-extractor/src/main.ts @@ -4,6 +4,7 @@ import { PlaywrightCrawler } from "crawlee"; import { firefox } from "playwright"; import { router } from "./routes.js"; +import { initJobOpsProgress } from "./progress.js"; // locations const locations = [ @@ -35,6 +36,8 @@ const startUrls = gradcrackerUrls.map((url) => ({ userData: { label: "gradcracker-list-page" }, })); +initJobOpsProgress(startUrls.length); + const crawler = new PlaywrightCrawler({ // proxyConfiguration: new ProxyConfiguration({ proxyUrls: ['...'] }), requestHandler: router, diff --git a/job-extractor/src/progress.ts b/job-extractor/src/progress.ts new file mode 100644 index 0000000..06d05a4 --- /dev/null +++ b/job-extractor/src/progress.ts @@ -0,0 +1,83 @@ +type CrawlPhase = "list" | "job"; + +export interface JobOpsCrawlProgressPayload { + phase: CrawlPhase; + currentUrl?: string; + listPagesProcessed: number; + listPagesTotal?: number; + jobCardsFound: number; + jobPagesEnqueued: number; + jobPagesSkipped: number; + jobPagesProcessed: number; + ts: string; +} + +interface JobOpsCrawlProgressState { + listPagesProcessed: number; + listPagesTotal?: number; + jobCardsFound: number; + jobPagesEnqueued: number; + jobPagesSkipped: number; + jobPagesProcessed: number; + currentUrl?: string; + phase: CrawlPhase; +} + +const PROGRESS_PREFIX = "JOBOPS_PROGRESS "; +const isEnabled = () => process.env.JOBOPS_EMIT_PROGRESS === "1"; + +let state: JobOpsCrawlProgressState = { + listPagesProcessed: 0, + jobCardsFound: 0, + jobPagesEnqueued: 0, + jobPagesSkipped: 0, + jobPagesProcessed: 0, + phase: "list", +}; + +function emit(): void { + if (!isEnabled()) return; + + const payload: JobOpsCrawlProgressPayload = { + phase: state.phase, + currentUrl: state.currentUrl, + listPagesProcessed: state.listPagesProcessed, + listPagesTotal: state.listPagesTotal, + jobCardsFound: state.jobCardsFound, + jobPagesEnqueued: state.jobPagesEnqueued, + jobPagesSkipped: state.jobPagesSkipped, + jobPagesProcessed: state.jobPagesProcessed, + ts: new Date().toISOString(), + }; + + process.stdout.write(`${PROGRESS_PREFIX}${JSON.stringify(payload)}\n`); +} + +export function initJobOpsProgress(listPagesTotal: number): void { + state.listPagesTotal = listPagesTotal; + state.phase = "list"; + emit(); +} + +export function markListPageDone(params: { + currentUrl: string; + jobCardsFound: number; + jobPagesEnqueued: number; + jobPagesSkipped: number; +}): void { + state.listPagesProcessed += 1; + state.phase = "list"; + state.currentUrl = params.currentUrl; + state.jobCardsFound += params.jobCardsFound; + state.jobPagesEnqueued += params.jobPagesEnqueued; + state.jobPagesSkipped += params.jobPagesSkipped; + emit(); +} + +export function markJobPageDone(params: { currentUrl: string }): void { + state.jobPagesProcessed += 1; + state.phase = "job"; + state.currentUrl = params.currentUrl; + emit(); +} + diff --git a/job-extractor/src/routes.ts b/job-extractor/src/routes.ts index 27a9729..be9ba86 100644 --- a/job-extractor/src/routes.ts +++ b/job-extractor/src/routes.ts @@ -1,5 +1,6 @@ import { createPlaywrightRouter, log } from "crawlee"; import { readFileSync } from "node:fs"; +import { markJobPageDone, markListPageDone } from "./progress.js"; function normalizeUrl(raw: string | null | undefined): string | null { if (!raw) return null; @@ -189,6 +190,13 @@ router.addHandler( `Skipping ${skippedKnownJobs} already-known job pages; enqueued ${enqueuedJobs} new job pages.` ); } + + markListPageDone({ + currentUrl: request.url, + jobCardsFound: jobs.length, + jobPagesEnqueued: enqueuedJobs, + jobPagesSkipped: skippedKnownJobs, + }); } ); @@ -303,5 +311,7 @@ router.addHandler( applicationLink, // External or same-page URL after click jobDescription, }); + + markJobPageDone({ currentUrl: request.url }); } ); diff --git a/orchestrator/src/client/components/PipelineProgress.tsx b/orchestrator/src/client/components/PipelineProgress.tsx index 73e203d..2097f3d 100644 --- a/orchestrator/src/client/components/PipelineProgress.tsx +++ b/orchestrator/src/client/components/PipelineProgress.tsx @@ -8,6 +8,14 @@ interface PipelineProgress { step: 'idle' | 'crawling' | 'importing' | 'scoring' | 'processing' | 'completed' | 'failed'; message: string; detail?: string; + crawlingListPagesProcessed: number; + crawlingListPagesTotal: number; + crawlingJobCardsFound: number; + crawlingJobPagesEnqueued: number; + crawlingJobPagesSkipped: number; + crawlingJobPagesProcessed: number; + crawlingPhase?: 'list' | 'job'; + crawlingCurrentUrl?: string; jobsDiscovered: number; jobsScored: number; jobsProcessed: number; @@ -94,7 +102,13 @@ export const PipelineProgress: React.FC = ({ isRunning }) if (progress) { switch (step) { case 'crawling': - percentage = 10; + if (progress.crawlingListPagesTotal > 0) { + percentage = (progress.crawlingListPagesProcessed / progress.crawlingListPagesTotal) * 15; + } else if (progress.crawlingListPagesProcessed > 0) { + percentage = 8; + } else { + percentage = 5; + } break; case 'importing': percentage = 20; @@ -190,7 +204,7 @@ export const PipelineProgress: React.FC = ({ isRunning }) )} {/* Stats */} - {progress && (step === 'scoring' || step === 'processing' || step === 'completed') && ( + {progress && (step === 'crawling' || step === 'scoring' || step === 'processing' || step === 'completed') && (
= ({ isRunning }) borderTop: '1px solid var(--glass-border)', fontSize: 'var(--font-sm)', }}> -
- Discovered: - - {progress.jobsDiscovered} - -
+ {step === 'crawling' && ( + <> +
+ Sources: + + {progress.crawlingListPagesProcessed} + {progress.crawlingListPagesTotal > 0 ? `/${progress.crawlingListPagesTotal}` : ''} + +
+
+ Pages: + + {progress.crawlingJobPagesProcessed}/{Math.max(progress.crawlingJobPagesEnqueued, 0)} + +
+
+ Enqueued: + + {progress.crawlingJobPagesEnqueued} + +
+ {progress.crawlingJobPagesSkipped > 0 && ( +
+ Skipped: + + {progress.crawlingJobPagesSkipped} + +
+ )} + {progress.crawlingJobCardsFound > 0 && ( +
+ Cards: + + {progress.crawlingJobCardsFound} + +
+ )} + + )} + {step !== 'crawling' && ( +
+ Discovered: + + {progress.jobsDiscovered} + +
+ )} {progress.jobsScored > 0 && (
Scored: diff --git a/orchestrator/src/server/pipeline/orchestrator.ts b/orchestrator/src/server/pipeline/orchestrator.ts index 99a78a8..18d9980 100644 --- a/orchestrator/src/server/pipeline/orchestrator.ts +++ b/orchestrator/src/server/pipeline/orchestrator.ts @@ -73,7 +73,21 @@ export async function runPipeline(config: Partial = {}): Promise console.log('\n🕷️ Running crawler...'); progressHelpers.startCrawling(); const existingJobUrls = await jobsRepo.getAllJobUrls(); - const crawlerResult = await runCrawler({ existingJobUrls }); + const crawlerResult = await runCrawler({ + existingJobUrls, + onProgress: (update) => { + progressHelpers.crawlingUpdate({ + listPagesProcessed: update.listPagesProcessed, + listPagesTotal: update.listPagesTotal, + jobCardsFound: update.jobCardsFound, + jobPagesEnqueued: update.jobPagesEnqueued, + jobPagesSkipped: update.jobPagesSkipped, + jobPagesProcessed: update.jobPagesProcessed, + phase: update.phase, + currentUrl: update.currentUrl, + }); + }, + }); if (!crawlerResult.success) { throw new Error(`Crawler failed: ${crawlerResult.error}`); @@ -100,15 +114,25 @@ export async function runPipeline(config: Partial = {}): Promise const scoredJobs: Array = []; for (let i = 0; i < unprocessedJobs.length; i++) { const job = unprocessedJobs[i]; - progressHelpers.scoringJob(i + 1, unprocessedJobs.length, job.title); - + const hasCachedScore = typeof job.suitabilityScore === 'number' && !Number.isNaN(job.suitabilityScore); + progressHelpers.scoringJob(i + 1, unprocessedJobs.length, hasCachedScore ? `${job.title} (cached)` : job.title); + + if (hasCachedScore) { + scoredJobs.push({ + ...job, + suitabilityScore: job.suitabilityScore as number, + suitabilityReason: job.suitabilityReason ?? '', + }); + continue; + } + const { score, reason } = await scoreJobSuitability(job, profile); scoredJobs.push({ ...job, suitabilityScore: score, suitabilityReason: reason, }); - + // Update score in database await jobsRepo.updateJob(job.id, { suitabilityScore: score, diff --git a/orchestrator/src/server/pipeline/progress.ts b/orchestrator/src/server/pipeline/progress.ts index f0ad7a9..0815e5d 100644 --- a/orchestrator/src/server/pipeline/progress.ts +++ b/orchestrator/src/server/pipeline/progress.ts @@ -15,6 +15,14 @@ export interface PipelineProgress { step: PipelineStep; message: string; detail?: string; + crawlingListPagesProcessed: number; + crawlingListPagesTotal: number; + crawlingJobCardsFound: number; + crawlingJobPagesEnqueued: number; + crawlingJobPagesSkipped: number; + crawlingJobPagesProcessed: number; + crawlingPhase?: 'list' | 'job'; + crawlingCurrentUrl?: string; jobsDiscovered: number; jobsScored: number; jobsProcessed: number; @@ -36,6 +44,12 @@ const listeners: Set = new Set(); let currentProgress: PipelineProgress = { step: 'idle', message: 'Ready', + crawlingListPagesProcessed: 0, + crawlingListPagesTotal: 0, + crawlingJobCardsFound: 0, + crawlingJobPagesEnqueued: 0, + crawlingJobPagesSkipped: 0, + crawlingJobPagesProcessed: 0, jobsDiscovered: 0, jobsScored: 0, jobsProcessed: 0, @@ -87,6 +101,14 @@ export function resetProgress(): void { currentProgress = { step: 'idle', message: 'Ready', + crawlingListPagesProcessed: 0, + crawlingListPagesTotal: 0, + crawlingJobCardsFound: 0, + crawlingJobPagesEnqueued: 0, + crawlingJobPagesSkipped: 0, + crawlingJobPagesProcessed: 0, + crawlingPhase: undefined, + crawlingCurrentUrl: undefined, jobsDiscovered: 0, jobsScored: 0, jobsProcessed: 0, @@ -101,19 +123,83 @@ export const progressHelpers = { startCrawling: () => updateProgress({ step: 'crawling', message: 'Fetching jobs from sources...', - detail: 'Running Crawlee crawler', + detail: 'Starting crawler', startedAt: new Date().toISOString(), + crawlingListPagesProcessed: 0, + crawlingListPagesTotal: 0, + crawlingJobCardsFound: 0, + crawlingJobPagesEnqueued: 0, + crawlingJobPagesSkipped: 0, + crawlingJobPagesProcessed: 0, + crawlingPhase: undefined, + crawlingCurrentUrl: undefined, jobsDiscovered: 0, jobsScored: 0, jobsProcessed: 0, totalToProcess: 0, }), + + crawlingUpdate: (update: { + listPagesProcessed?: number; + listPagesTotal?: number; + jobCardsFound?: number; + jobPagesEnqueued?: number; + jobPagesSkipped?: number; + jobPagesProcessed?: number; + phase?: 'list' | 'job'; + currentUrl?: string; + }) => { + const current = getProgress(); + const next = { + ...current, + crawlingListPagesProcessed: update.listPagesProcessed ?? current.crawlingListPagesProcessed, + crawlingListPagesTotal: update.listPagesTotal ?? current.crawlingListPagesTotal, + crawlingJobCardsFound: update.jobCardsFound ?? current.crawlingJobCardsFound, + crawlingJobPagesEnqueued: update.jobPagesEnqueued ?? current.crawlingJobPagesEnqueued, + crawlingJobPagesSkipped: update.jobPagesSkipped ?? current.crawlingJobPagesSkipped, + crawlingJobPagesProcessed: update.jobPagesProcessed ?? current.crawlingJobPagesProcessed, + crawlingPhase: update.phase ?? current.crawlingPhase, + crawlingCurrentUrl: update.currentUrl ?? current.crawlingCurrentUrl, + }; + + const sourcesPart = + next.crawlingListPagesTotal > 0 + ? `${next.crawlingListPagesProcessed}/${next.crawlingListPagesTotal}` + : `${next.crawlingListPagesProcessed}`; + + const pagesPart = `${next.crawlingJobPagesProcessed}/${next.crawlingJobPagesEnqueued}`; + const skippedPart = next.crawlingJobPagesSkipped > 0 ? `, skipped ${next.crawlingJobPagesSkipped}` : ''; + const cardsPart = next.crawlingJobCardsFound > 0 ? `, cards ${next.crawlingJobCardsFound}` : ''; + + const message = `Crawling jobs (${sourcesPart} sources, pages ${pagesPart}${skippedPart}${cardsPart})...`; + const detail = + next.crawlingCurrentUrl && next.crawlingPhase + ? `${next.crawlingPhase === 'list' ? 'List' : 'Job'}: ${next.crawlingCurrentUrl}` + : next.crawlingCurrentUrl + ? next.crawlingCurrentUrl + : 'Running crawler'; + + updateProgress({ + step: 'crawling', + message, + detail, + crawlingListPagesProcessed: next.crawlingListPagesProcessed, + crawlingListPagesTotal: next.crawlingListPagesTotal, + crawlingJobCardsFound: next.crawlingJobCardsFound, + crawlingJobPagesEnqueued: next.crawlingJobPagesEnqueued, + crawlingJobPagesSkipped: next.crawlingJobPagesSkipped, + crawlingJobPagesProcessed: next.crawlingJobPagesProcessed, + crawlingPhase: next.crawlingPhase, + crawlingCurrentUrl: next.crawlingCurrentUrl, + }); + }, crawlingComplete: (jobsFound: number) => updateProgress({ step: 'importing', message: `Found ${jobsFound} jobs, importing to database...`, detail: 'Deduplicating and saving', jobsDiscovered: jobsFound, + crawlingCurrentUrl: undefined, }), importComplete: (created: number, skipped: number) => updateProgress({ diff --git a/orchestrator/src/server/services/crawler.ts b/orchestrator/src/server/services/crawler.ts index acb8915..c654dae 100644 --- a/orchestrator/src/server/services/crawler.ts +++ b/orchestrator/src/server/services/crawler.ts @@ -7,6 +7,7 @@ import { spawn } from 'child_process'; import { join, dirname } from 'path'; import { fileURLToPath } from 'url'; import { mkdir, readdir, readFile, writeFile } from 'fs/promises'; +import { createInterface } from 'readline'; import type { CreateJobInput } from '../../shared/types.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -26,8 +27,27 @@ export interface RunCrawlerOptions { * Used by the crawler to avoid expensive/undesired interactions (e.g. apply button click). */ existingJobUrls?: string[]; + + /** + * Optional callback for live crawl progress emitted by job-extractor. + */ + onProgress?: (update: JobExtractorProgress) => void; } +interface JobExtractorProgress { + phase?: 'list' | 'job'; + currentUrl?: string; + listPagesProcessed?: number; + listPagesTotal?: number; + jobCardsFound?: number; + jobPagesEnqueued?: number; + jobPagesSkipped?: number; + jobPagesProcessed?: number; + ts?: string; +} + +const JOBOPS_PROGRESS_PREFIX = 'JOBOPS_PROGRESS '; + async function writeExistingJobUrlsFile(existingJobUrls: string[] | undefined): Promise { if (!existingJobUrls || existingJobUrls.length === 0) return null; await mkdir(JOBOPS_STORAGE_DIR, { recursive: true }); @@ -53,15 +73,38 @@ export async function runCrawler(options: RunCrawlerOptions = {}): Promise { + if (line.startsWith(JOBOPS_PROGRESS_PREFIX)) { + const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim(); + try { + const parsed = JSON.parse(raw) as JobExtractorProgress; + options.onProgress?.(parsed); + } catch { + // Ignore malformed progress lines + } + return; + } + stream.write(`${line}\n`); + }; + + const stdoutRl = child.stdout ? createInterface({ input: child.stdout }) : null; + const stderrRl = child.stderr ? createInterface({ input: child.stderr }) : null; + + stdoutRl?.on('line', (line) => handleLine(line, process.stdout)); + stderrRl?.on('line', (line) => handleLine(line, process.stderr)); child.on('close', (code) => { + stdoutRl?.close(); + stderrRl?.close(); if (code === 0) { resolve(); } else {