live progress report in the UI.

This commit is contained in:
DaKheera47 2025-12-14 16:01:02 +00:00
parent 59c943b4b6
commit 38ff39b7f3
7 changed files with 318 additions and 14 deletions

View File

@ -4,6 +4,7 @@ import { PlaywrightCrawler } from "crawlee";
import { firefox } from "playwright";
import { router } from "./routes.js";
import { initJobOpsProgress } from "./progress.js";
// locations
const locations = [
@ -35,6 +36,8 @@ const startUrls = gradcrackerUrls.map((url) => ({
userData: { label: "gradcracker-list-page" },
}));
initJobOpsProgress(startUrls.length);
const crawler = new PlaywrightCrawler({
// proxyConfiguration: new ProxyConfiguration({ proxyUrls: ['...'] }),
requestHandler: router,

View File

@ -0,0 +1,83 @@
type CrawlPhase = "list" | "job";
export interface JobOpsCrawlProgressPayload {
phase: CrawlPhase;
currentUrl?: string;
listPagesProcessed: number;
listPagesTotal?: number;
jobCardsFound: number;
jobPagesEnqueued: number;
jobPagesSkipped: number;
jobPagesProcessed: number;
ts: string;
}
interface JobOpsCrawlProgressState {
listPagesProcessed: number;
listPagesTotal?: number;
jobCardsFound: number;
jobPagesEnqueued: number;
jobPagesSkipped: number;
jobPagesProcessed: number;
currentUrl?: string;
phase: CrawlPhase;
}
const PROGRESS_PREFIX = "JOBOPS_PROGRESS ";
const isEnabled = () => process.env.JOBOPS_EMIT_PROGRESS === "1";
let state: JobOpsCrawlProgressState = {
listPagesProcessed: 0,
jobCardsFound: 0,
jobPagesEnqueued: 0,
jobPagesSkipped: 0,
jobPagesProcessed: 0,
phase: "list",
};
function emit(): void {
if (!isEnabled()) return;
const payload: JobOpsCrawlProgressPayload = {
phase: state.phase,
currentUrl: state.currentUrl,
listPagesProcessed: state.listPagesProcessed,
listPagesTotal: state.listPagesTotal,
jobCardsFound: state.jobCardsFound,
jobPagesEnqueued: state.jobPagesEnqueued,
jobPagesSkipped: state.jobPagesSkipped,
jobPagesProcessed: state.jobPagesProcessed,
ts: new Date().toISOString(),
};
process.stdout.write(`${PROGRESS_PREFIX}${JSON.stringify(payload)}\n`);
}
export function initJobOpsProgress(listPagesTotal: number): void {
state.listPagesTotal = listPagesTotal;
state.phase = "list";
emit();
}
export function markListPageDone(params: {
currentUrl: string;
jobCardsFound: number;
jobPagesEnqueued: number;
jobPagesSkipped: number;
}): void {
state.listPagesProcessed += 1;
state.phase = "list";
state.currentUrl = params.currentUrl;
state.jobCardsFound += params.jobCardsFound;
state.jobPagesEnqueued += params.jobPagesEnqueued;
state.jobPagesSkipped += params.jobPagesSkipped;
emit();
}
export function markJobPageDone(params: { currentUrl: string }): void {
state.jobPagesProcessed += 1;
state.phase = "job";
state.currentUrl = params.currentUrl;
emit();
}

View File

@ -1,5 +1,6 @@
import { createPlaywrightRouter, log } from "crawlee";
import { readFileSync } from "node:fs";
import { markJobPageDone, markListPageDone } from "./progress.js";
function normalizeUrl(raw: string | null | undefined): string | null {
if (!raw) return null;
@ -189,6 +190,13 @@ router.addHandler(
`Skipping ${skippedKnownJobs} already-known job pages; enqueued ${enqueuedJobs} new job pages.`
);
}
markListPageDone({
currentUrl: request.url,
jobCardsFound: jobs.length,
jobPagesEnqueued: enqueuedJobs,
jobPagesSkipped: skippedKnownJobs,
});
}
);
@ -303,5 +311,7 @@ router.addHandler(
applicationLink, // External or same-page URL after click
jobDescription,
});
markJobPageDone({ currentUrl: request.url });
}
);

View File

@ -8,6 +8,14 @@ interface PipelineProgress {
step: 'idle' | 'crawling' | 'importing' | 'scoring' | 'processing' | 'completed' | 'failed';
message: string;
detail?: string;
crawlingListPagesProcessed: number;
crawlingListPagesTotal: number;
crawlingJobCardsFound: number;
crawlingJobPagesEnqueued: number;
crawlingJobPagesSkipped: number;
crawlingJobPagesProcessed: number;
crawlingPhase?: 'list' | 'job';
crawlingCurrentUrl?: string;
jobsDiscovered: number;
jobsScored: number;
jobsProcessed: number;
@ -94,7 +102,13 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({ isRunning })
if (progress) {
switch (step) {
case 'crawling':
percentage = 10;
if (progress.crawlingListPagesTotal > 0) {
percentage = (progress.crawlingListPagesProcessed / progress.crawlingListPagesTotal) * 15;
} else if (progress.crawlingListPagesProcessed > 0) {
percentage = 8;
} else {
percentage = 5;
}
break;
case 'importing':
percentage = 20;
@ -190,7 +204,7 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({ isRunning })
)}
{/* Stats */}
{progress && (step === 'scoring' || step === 'processing' || step === 'completed') && (
{progress && (step === 'crawling' || step === 'scoring' || step === 'processing' || step === 'completed') && (
<div style={{
display: 'flex',
gap: 'var(--space-6)',
@ -198,12 +212,53 @@ export const PipelineProgress: React.FC<PipelineProgressProps> = ({ isRunning })
borderTop: '1px solid var(--glass-border)',
fontSize: 'var(--font-sm)',
}}>
<div>
<span style={{ color: 'var(--color-muted)' }}>Discovered: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.jobsDiscovered}
</span>
</div>
{step === 'crawling' && (
<>
<div>
<span style={{ color: 'var(--color-muted)' }}>Sources: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.crawlingListPagesProcessed}
{progress.crawlingListPagesTotal > 0 ? `/${progress.crawlingListPagesTotal}` : ''}
</span>
</div>
<div>
<span style={{ color: 'var(--color-muted)' }}>Pages: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.crawlingJobPagesProcessed}/{Math.max(progress.crawlingJobPagesEnqueued, 0)}
</span>
</div>
<div>
<span style={{ color: 'var(--color-muted)' }}>Enqueued: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.crawlingJobPagesEnqueued}
</span>
</div>
{progress.crawlingJobPagesSkipped > 0 && (
<div>
<span style={{ color: 'var(--color-muted)' }}>Skipped: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.crawlingJobPagesSkipped}
</span>
</div>
)}
{progress.crawlingJobCardsFound > 0 && (
<div>
<span style={{ color: 'var(--color-muted)' }}>Cards: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.crawlingJobCardsFound}
</span>
</div>
)}
</>
)}
{step !== 'crawling' && (
<div>
<span style={{ color: 'var(--color-muted)' }}>Discovered: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.jobsDiscovered}
</span>
</div>
)}
{progress.jobsScored > 0 && (
<div>
<span style={{ color: 'var(--color-muted)' }}>Scored: </span>

View File

@ -73,7 +73,21 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
console.log('\n🕷 Running crawler...');
progressHelpers.startCrawling();
const existingJobUrls = await jobsRepo.getAllJobUrls();
const crawlerResult = await runCrawler({ existingJobUrls });
const crawlerResult = await runCrawler({
existingJobUrls,
onProgress: (update) => {
progressHelpers.crawlingUpdate({
listPagesProcessed: update.listPagesProcessed,
listPagesTotal: update.listPagesTotal,
jobCardsFound: update.jobCardsFound,
jobPagesEnqueued: update.jobPagesEnqueued,
jobPagesSkipped: update.jobPagesSkipped,
jobPagesProcessed: update.jobPagesProcessed,
phase: update.phase,
currentUrl: update.currentUrl,
});
},
});
if (!crawlerResult.success) {
throw new Error(`Crawler failed: ${crawlerResult.error}`);
@ -100,15 +114,25 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
const scoredJobs: Array<Job & { suitabilityScore: number; suitabilityReason: string }> = [];
for (let i = 0; i < unprocessedJobs.length; i++) {
const job = unprocessedJobs[i];
progressHelpers.scoringJob(i + 1, unprocessedJobs.length, job.title);
const hasCachedScore = typeof job.suitabilityScore === 'number' && !Number.isNaN(job.suitabilityScore);
progressHelpers.scoringJob(i + 1, unprocessedJobs.length, hasCachedScore ? `${job.title} (cached)` : job.title);
if (hasCachedScore) {
scoredJobs.push({
...job,
suitabilityScore: job.suitabilityScore as number,
suitabilityReason: job.suitabilityReason ?? '',
});
continue;
}
const { score, reason } = await scoreJobSuitability(job, profile);
scoredJobs.push({
...job,
suitabilityScore: score,
suitabilityReason: reason,
});
// Update score in database
await jobsRepo.updateJob(job.id, {
suitabilityScore: score,

View File

@ -15,6 +15,14 @@ export interface PipelineProgress {
step: PipelineStep;
message: string;
detail?: string;
crawlingListPagesProcessed: number;
crawlingListPagesTotal: number;
crawlingJobCardsFound: number;
crawlingJobPagesEnqueued: number;
crawlingJobPagesSkipped: number;
crawlingJobPagesProcessed: number;
crawlingPhase?: 'list' | 'job';
crawlingCurrentUrl?: string;
jobsDiscovered: number;
jobsScored: number;
jobsProcessed: number;
@ -36,6 +44,12 @@ const listeners: Set<ProgressListener> = new Set();
let currentProgress: PipelineProgress = {
step: 'idle',
message: 'Ready',
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
@ -87,6 +101,14 @@ export function resetProgress(): void {
currentProgress = {
step: 'idle',
message: 'Ready',
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
crawlingPhase: undefined,
crawlingCurrentUrl: undefined,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
@ -101,19 +123,83 @@ export const progressHelpers = {
startCrawling: () => updateProgress({
step: 'crawling',
message: 'Fetching jobs from sources...',
detail: 'Running Crawlee crawler',
detail: 'Starting crawler',
startedAt: new Date().toISOString(),
crawlingListPagesProcessed: 0,
crawlingListPagesTotal: 0,
crawlingJobCardsFound: 0,
crawlingJobPagesEnqueued: 0,
crawlingJobPagesSkipped: 0,
crawlingJobPagesProcessed: 0,
crawlingPhase: undefined,
crawlingCurrentUrl: undefined,
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
}),
crawlingUpdate: (update: {
listPagesProcessed?: number;
listPagesTotal?: number;
jobCardsFound?: number;
jobPagesEnqueued?: number;
jobPagesSkipped?: number;
jobPagesProcessed?: number;
phase?: 'list' | 'job';
currentUrl?: string;
}) => {
const current = getProgress();
const next = {
...current,
crawlingListPagesProcessed: update.listPagesProcessed ?? current.crawlingListPagesProcessed,
crawlingListPagesTotal: update.listPagesTotal ?? current.crawlingListPagesTotal,
crawlingJobCardsFound: update.jobCardsFound ?? current.crawlingJobCardsFound,
crawlingJobPagesEnqueued: update.jobPagesEnqueued ?? current.crawlingJobPagesEnqueued,
crawlingJobPagesSkipped: update.jobPagesSkipped ?? current.crawlingJobPagesSkipped,
crawlingJobPagesProcessed: update.jobPagesProcessed ?? current.crawlingJobPagesProcessed,
crawlingPhase: update.phase ?? current.crawlingPhase,
crawlingCurrentUrl: update.currentUrl ?? current.crawlingCurrentUrl,
};
const sourcesPart =
next.crawlingListPagesTotal > 0
? `${next.crawlingListPagesProcessed}/${next.crawlingListPagesTotal}`
: `${next.crawlingListPagesProcessed}`;
const pagesPart = `${next.crawlingJobPagesProcessed}/${next.crawlingJobPagesEnqueued}`;
const skippedPart = next.crawlingJobPagesSkipped > 0 ? `, skipped ${next.crawlingJobPagesSkipped}` : '';
const cardsPart = next.crawlingJobCardsFound > 0 ? `, cards ${next.crawlingJobCardsFound}` : '';
const message = `Crawling jobs (${sourcesPart} sources, pages ${pagesPart}${skippedPart}${cardsPart})...`;
const detail =
next.crawlingCurrentUrl && next.crawlingPhase
? `${next.crawlingPhase === 'list' ? 'List' : 'Job'}: ${next.crawlingCurrentUrl}`
: next.crawlingCurrentUrl
? next.crawlingCurrentUrl
: 'Running crawler';
updateProgress({
step: 'crawling',
message,
detail,
crawlingListPagesProcessed: next.crawlingListPagesProcessed,
crawlingListPagesTotal: next.crawlingListPagesTotal,
crawlingJobCardsFound: next.crawlingJobCardsFound,
crawlingJobPagesEnqueued: next.crawlingJobPagesEnqueued,
crawlingJobPagesSkipped: next.crawlingJobPagesSkipped,
crawlingJobPagesProcessed: next.crawlingJobPagesProcessed,
crawlingPhase: next.crawlingPhase,
crawlingCurrentUrl: next.crawlingCurrentUrl,
});
},
crawlingComplete: (jobsFound: number) => updateProgress({
step: 'importing',
message: `Found ${jobsFound} jobs, importing to database...`,
detail: 'Deduplicating and saving',
jobsDiscovered: jobsFound,
crawlingCurrentUrl: undefined,
}),
importComplete: (created: number, skipped: number) => updateProgress({

View File

@ -7,6 +7,7 @@ import { spawn } from 'child_process';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { mkdir, readdir, readFile, writeFile } from 'fs/promises';
import { createInterface } from 'readline';
import type { CreateJobInput } from '../../shared/types.js';
const __dirname = dirname(fileURLToPath(import.meta.url));
@ -26,8 +27,27 @@ export interface RunCrawlerOptions {
* Used by the crawler to avoid expensive/undesired interactions (e.g. apply button click).
*/
existingJobUrls?: string[];
/**
* Optional callback for live crawl progress emitted by job-extractor.
*/
onProgress?: (update: JobExtractorProgress) => void;
}
interface JobExtractorProgress {
phase?: 'list' | 'job';
currentUrl?: string;
listPagesProcessed?: number;
listPagesTotal?: number;
jobCardsFound?: number;
jobPagesEnqueued?: number;
jobPagesSkipped?: number;
jobPagesProcessed?: number;
ts?: string;
}
const JOBOPS_PROGRESS_PREFIX = 'JOBOPS_PROGRESS ';
async function writeExistingJobUrlsFile(existingJobUrls: string[] | undefined): Promise<string | null> {
if (!existingJobUrls || existingJobUrls.length === 0) return null;
await mkdir(JOBOPS_STORAGE_DIR, { recursive: true });
@ -53,15 +73,38 @@ export async function runCrawler(options: RunCrawlerOptions = {}): Promise<Crawl
const child = spawn('npm', ['run', 'start'], {
cwd: CRAWLER_DIR,
shell: true,
stdio: 'inherit',
stdio: ['ignore', 'pipe', 'pipe'],
env: {
...process.env,
JOBOPS_SKIP_APPLY_FOR_EXISTING: '1',
JOBOPS_EMIT_PROGRESS: '1',
...(existingJobUrlsFile ? { JOBOPS_EXISTING_JOB_URLS_FILE: existingJobUrlsFile } : {}),
},
});
const handleLine = (line: string, stream: NodeJS.WriteStream) => {
if (line.startsWith(JOBOPS_PROGRESS_PREFIX)) {
const raw = line.slice(JOBOPS_PROGRESS_PREFIX.length).trim();
try {
const parsed = JSON.parse(raw) as JobExtractorProgress;
options.onProgress?.(parsed);
} catch {
// Ignore malformed progress lines
}
return;
}
stream.write(`${line}\n`);
};
const stdoutRl = child.stdout ? createInterface({ input: child.stdout }) : null;
const stderrRl = child.stderr ? createInterface({ input: child.stderr }) : null;
stdoutRl?.on('line', (line) => handleLine(line, process.stdout));
stderrRl?.on('line', (line) => handleLine(line, process.stderr));
child.on('close', (code) => {
stdoutRl?.close();
stderrRl?.close();
if (code === 0) {
resolve();
} else {