live progress for scraping

This commit is contained in:
DaKheera47 2025-12-11 23:00:00 +00:00
parent a6a4cb9f89
commit f9bf790bb3
7 changed files with 502 additions and 13 deletions

View File

@ -4,7 +4,7 @@
import React, { useState, useEffect, useCallback } from 'react';
import type { Job, JobStatus } from '../shared/types';
import { Header, Stats, JobList, ToastContainer, Toast } from './components';
import { Header, Stats, JobList, ToastContainer, Toast, PipelineProgress } from './components';
import * as api from './api';
export const App: React.FC = () => {
@ -158,6 +158,8 @@ export const App: React.FC = () => {
/>
<main className="container" style={{ paddingBottom: 'var(--space-12)' }}>
<PipelineProgress isRunning={isPipelineRunning} />
<Stats stats={stats} />
<JobList

View File

@ -0,0 +1,241 @@
/**
* Live pipeline progress display component.
*/
import React, { useEffect, useState } from 'react';
interface PipelineProgress {
step: 'idle' | 'crawling' | 'importing' | 'scoring' | 'processing' | 'completed' | 'failed';
message: string;
detail?: string;
jobsDiscovered: number;
jobsScored: number;
jobsProcessed: number;
totalToProcess: number;
currentJob?: {
id: string;
title: string;
employer: string;
};
error?: string;
startedAt?: string;
completedAt?: string;
}
interface PipelineProgressProps {
isRunning: boolean;
}
const stepLabels: Record<PipelineProgress['step'], string> = {
idle: 'Ready',
crawling: 'Crawling Jobs',
importing: 'Importing',
scoring: 'Scoring Jobs',
processing: 'Generating Resumes',
completed: 'Complete',
failed: 'Failed',
};
const stepColors: Record<PipelineProgress['step'], string> = {
idle: 'var(--color-muted)',
crawling: 'var(--color-info)',
importing: 'var(--color-info)',
scoring: 'var(--color-warning)',
processing: 'var(--color-primary-500)',
completed: 'var(--color-success)',
failed: 'var(--color-error)',
};
export const PipelineProgress: React.FC<PipelineProgressProps> = ({ isRunning }) => {
const [progress, setProgress] = useState<PipelineProgress | null>(null);
const [isConnected, setIsConnected] = useState(false);
useEffect(() => {
if (!isRunning) {
setProgress(null);
return;
}
// Connect to SSE endpoint
const eventSource = new EventSource('/api/pipeline/progress');
eventSource.onopen = () => {
setIsConnected(true);
};
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
setProgress(data);
} catch {
// Ignore parse errors
}
};
eventSource.onerror = () => {
setIsConnected(false);
};
return () => {
eventSource.close();
setIsConnected(false);
};
}, [isRunning]);
if (!isRunning && !progress) {
return null;
}
const step = progress?.step || 'idle';
const isActive = progress && step !== 'idle' && step !== 'completed' && step !== 'failed';
// Calculate overall progress percentage
let percentage = 0;
if (progress) {
switch (step) {
case 'crawling':
percentage = 10;
break;
case 'importing':
percentage = 20;
break;
case 'scoring':
if (progress.jobsScored > 0) {
percentage = 20 + (progress.jobsScored / Math.max(progress.jobsDiscovered, 1)) * 30;
} else {
percentage = 25;
}
break;
case 'processing':
if (progress.totalToProcess > 0) {
percentage = 50 + (progress.jobsProcessed / progress.totalToProcess) * 50;
} else {
percentage = 55;
}
break;
case 'completed':
percentage = 100;
break;
case 'failed':
percentage = 100;
break;
}
}
return (
<div className="pipeline-progress" style={{
background: 'var(--glass-background)',
backdropFilter: 'blur(12px)',
border: '1px solid var(--glass-border)',
borderRadius: 'var(--radius-lg)',
padding: 'var(--space-6)',
marginBottom: 'var(--space-6)',
}}>
{/* Header */}
<div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', marginBottom: 'var(--space-4)' }}>
<div style={{ display: 'flex', alignItems: 'center', gap: 'var(--space-3)' }}>
{isActive && (
<div className="spinner" style={{ width: '16px', height: '16px' }} />
)}
<span style={{
color: stepColors[step],
fontWeight: '600',
fontSize: 'var(--font-sm)',
textTransform: 'uppercase',
letterSpacing: '0.05em',
}}>
{stepLabels[step]}
</span>
</div>
<span style={{ color: 'var(--color-muted)', fontSize: 'var(--font-xs)' }}>
{Math.round(percentage)}%
</span>
</div>
{/* Progress bar */}
<div style={{
height: '6px',
background: 'var(--color-surface-elevated)',
borderRadius: '3px',
overflow: 'hidden',
marginBottom: 'var(--space-4)',
}}>
<div style={{
height: '100%',
width: `${percentage}%`,
background: step === 'failed'
? 'var(--color-error)'
: 'linear-gradient(90deg, var(--color-primary-500), var(--color-primary-400))',
borderRadius: '3px',
transition: 'width 0.3s ease',
}} />
</div>
{/* Message */}
{progress && (
<div style={{ marginBottom: 'var(--space-3)' }}>
<p style={{ color: 'var(--color-text)', margin: 0 }}>
{progress.message}
</p>
{progress.detail && (
<p style={{
color: 'var(--color-muted)',
fontSize: 'var(--font-sm)',
margin: 'var(--space-1) 0 0 0',
}}>
{progress.detail}
</p>
)}
</div>
)}
{/* Stats */}
{progress && (step === 'scoring' || step === 'processing' || step === 'completed') && (
<div style={{
display: 'flex',
gap: 'var(--space-6)',
paddingTop: 'var(--space-3)',
borderTop: '1px solid var(--glass-border)',
fontSize: 'var(--font-sm)',
}}>
<div>
<span style={{ color: 'var(--color-muted)' }}>Discovered: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.jobsDiscovered}
</span>
</div>
{progress.jobsScored > 0 && (
<div>
<span style={{ color: 'var(--color-muted)' }}>Scored: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.jobsScored}
</span>
</div>
)}
{progress.totalToProcess > 0 && (
<div>
<span style={{ color: 'var(--color-muted)' }}>Processed: </span>
<span style={{ color: 'var(--color-text)', fontWeight: '500' }}>
{progress.jobsProcessed}/{progress.totalToProcess}
</span>
</div>
)}
</div>
)}
{/* Error state */}
{step === 'failed' && progress?.error && (
<div style={{
marginTop: 'var(--space-3)',
padding: 'var(--space-3)',
background: 'rgba(var(--color-error-rgb), 0.1)',
borderRadius: 'var(--radius-md)',
color: 'var(--color-error)',
fontSize: 'var(--font-sm)',
}}>
{progress.error}
</div>
)}
</div>
);
};

View File

@ -5,4 +5,5 @@ export { ScoreIndicator } from './ScoreIndicator';
export { JobCard } from './JobCard';
export { JobList } from './JobList';
export { ToastContainer, type Toast } from './Toast';
export { PipelineProgress } from './PipelineProgress';
export * from './Icons';

View File

@ -6,7 +6,7 @@ import { Router, Request, Response } from 'express';
import { z } from 'zod';
import * as jobsRepo from '../repositories/jobs.js';
import * as pipelineRepo from '../repositories/pipeline.js';
import { runPipeline, processJob, getPipelineStatus } from '../pipeline/index.js';
import { runPipeline, processJob, getPipelineStatus, subscribeToProgress, getProgress } from '../pipeline/index.js';
import { createNotionEntry } from '../services/notion.js';
import { clearDatabase } from '../db/clear.js';
import type { JobStatus, ApiResponse, JobsListResponse, PipelineStatusResponse } from '../../shared/types.js';
@ -198,6 +198,36 @@ apiRouter.get('/pipeline/status', async (req: Request, res: Response) => {
}
});
/**
* GET /api/pipeline/progress - Server-Sent Events endpoint for live progress
*/
apiRouter.get('/pipeline/progress', (req: Request, res: Response) => {
// Set headers for SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Disable Nginx buffering
// Send initial progress
const sendProgress = (data: unknown) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
// Subscribe to progress updates
const unsubscribe = subscribeToProgress(sendProgress);
// Send heartbeat every 30 seconds to keep connection alive
const heartbeat = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// Cleanup on close
req.on('close', () => {
clearInterval(heartbeat);
unsubscribe();
});
});
/**
* GET /api/pipeline/runs - Get recent pipeline runs
*/

View File

@ -1 +1,2 @@
export * from './orchestrator.js';
export * from './progress.js';

View File

@ -14,11 +14,12 @@ import { readFile } from 'fs/promises';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { runCrawler } from '../services/crawler.js';
import { scoreAndRankJobs } from '../services/scorer.js';
import { scoreAndRankJobs, scoreJobSuitability } from '../services/scorer.js';
import { generateSummary } from '../services/summary.js';
import { generatePdf } from '../services/pdf.js';
import * as jobsRepo from '../repositories/jobs.js';
import * as pipelineRepo from '../repositories/pipeline.js';
import { progressHelpers, resetProgress } from './progress.js';
import type { Job, PipelineConfig } from '../../shared/types.js';
const __dirname = dirname(fileURLToPath(import.meta.url));
@ -54,6 +55,7 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
}
isPipelineRunning = true;
resetProgress();
const mergedConfig = { ...DEFAULT_CONFIG, ...config };
// Create pipeline run record
@ -69,39 +71,60 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
// Step 2: Run crawler
console.log('\n🕷 Running crawler...');
progressHelpers.startCrawling();
const crawlerResult = await runCrawler();
if (!crawlerResult.success) {
throw new Error(`Crawler failed: ${crawlerResult.error}`);
}
progressHelpers.crawlingComplete(crawlerResult.jobs.length);
// Step 3: Import discovered jobs
console.log('\n💾 Importing jobs to database...');
const { created, skipped } = await jobsRepo.bulkCreateJobs(crawlerResult.jobs);
console.log(` Created: ${created}, Skipped (duplicates): ${skipped}`);
progressHelpers.importComplete(created, skipped);
await pipelineRepo.updatePipelineRun(pipelineRun.id, {
jobsDiscovered: created,
});
// Step 4: Get unprocessed jobs and score them
console.log('\n🎯 Scoring jobs for suitability...');
const unprocessedJobs = await jobsRepo.getJobsForProcessing(50); // Get more than topN for ranking
const rankedJobs = await scoreAndRankJobs(unprocessedJobs, profile);
const unprocessedJobs = await jobsRepo.getJobsForProcessing(50);
// Update scores in database
for (const job of rankedJobs) {
// Score jobs with progress updates
const scoredJobs: Array<Job & { suitabilityScore: number; suitabilityReason: string }> = [];
for (let i = 0; i < unprocessedJobs.length; i++) {
const job = unprocessedJobs[i];
progressHelpers.scoringJob(i + 1, unprocessedJobs.length, job.title);
const { score, reason } = await scoreJobSuitability(job, profile);
scoredJobs.push({
...job,
suitabilityScore: score,
suitabilityReason: reason,
});
// Update score in database
await jobsRepo.updateJob(job.id, {
suitabilityScore: job.suitabilityScore,
suitabilityReason: job.suitabilityReason,
suitabilityScore: score,
suitabilityReason: reason,
});
}
// Sort by score
scoredJobs.sort((a, b) => b.suitabilityScore - a.suitabilityScore);
// Step 5: Pick top N jobs above threshold
const topJobs = rankedJobs
const topJobs = scoredJobs
.filter(j => j.suitabilityScore >= mergedConfig.minSuitabilityScore)
.slice(0, mergedConfig.topN);
progressHelpers.scoringComplete(scoredJobs.length, topJobs.length);
console.log(`\n📊 Selected ${topJobs.length} top jobs for processing:`);
for (const job of topJobs) {
console.log(` - ${job.title} @ ${job.employer} (score: ${job.suitabilityScore})`);
@ -110,15 +133,24 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
// Step 6: Process each top job
let processed = 0;
for (const job of topJobs) {
for (let i = 0; i < topJobs.length; i++) {
const job = topJobs[i];
console.log(`\n📝 Processing: ${job.title} @ ${job.employer}`);
progressHelpers.processingJob(i + 1, topJobs.length, {
id: job.id,
title: job.title,
employer: job.employer,
});
try {
// Mark as processing
await jobsRepo.updateJob(job.id, { status: 'processing' });
// Generate tailored summary
console.log(' Generating summary...');
progressHelpers.generatingSummary({ title: job.title, employer: job.employer });
const summaryResult = await generateSummary(
job.jobDescription || '',
profile
@ -136,6 +168,8 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
// Generate PDF
console.log(' Generating PDF...');
progressHelpers.generatingPdf({ title: job.title, employer: job.employer });
const pdfResult = await generatePdf(
job.id,
summaryResult.summary!,
@ -150,10 +184,11 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
// Mark as ready
await jobsRepo.updateJob(job.id, {
status: 'ready',
pdfPath: pdfResult.pdfPath ?? null,
pdfPath: pdfResult.pdfPath ?? undefined,
});
processed++;
progressHelpers.jobComplete(processed, topJobs.length);
console.log(` ✅ Ready for review!`);
} catch (error) {
@ -173,6 +208,7 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
console.log(` Jobs discovered: ${created}`);
console.log(` Jobs processed: ${processed}`);
progressHelpers.complete(created, processed);
isPipelineRunning = false;
return {
@ -190,6 +226,7 @@ export async function runPipeline(config: Partial<PipelineConfig> = {}): Promise
errorMessage: message,
});
progressHelpers.failed(message);
isPipelineRunning = false;
console.error('\n❌ Pipeline failed:', message);
@ -250,7 +287,7 @@ export async function processJob(jobId: string): Promise<{
// Mark as ready
await jobsRepo.updateJob(job.id, {
status: 'ready',
pdfPath: pdfResult.pdfPath ?? null,
pdfPath: pdfResult.pdfPath ?? undefined,
});
console.log(' ✅ Done!');

View File

@ -0,0 +1,177 @@
/**
* Pipeline progress tracking with Server-Sent Events.
*/
export type PipelineStep =
| 'idle'
| 'crawling'
| 'importing'
| 'scoring'
| 'processing'
| 'completed'
| 'failed';
export interface PipelineProgress {
step: PipelineStep;
message: string;
detail?: string;
jobsDiscovered: number;
jobsScored: number;
jobsProcessed: number;
totalToProcess: number;
currentJob?: {
id: string;
title: string;
employer: string;
};
error?: string;
startedAt?: string;
completedAt?: string;
}
// Event emitter for progress updates
type ProgressListener = (progress: PipelineProgress) => void;
const listeners: Set<ProgressListener> = new Set();
let currentProgress: PipelineProgress = {
step: 'idle',
message: 'Ready',
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
};
/**
* Update the current progress and notify all listeners.
*/
export function updateProgress(update: Partial<PipelineProgress>): void {
currentProgress = { ...currentProgress, ...update };
// Notify all listeners
for (const listener of listeners) {
try {
listener(currentProgress);
} catch (error) {
console.error('Error in progress listener:', error);
}
}
}
/**
* Get the current progress state.
*/
export function getProgress(): PipelineProgress {
return { ...currentProgress };
}
/**
* Subscribe to progress updates.
*/
export function subscribeToProgress(listener: ProgressListener): () => void {
listeners.add(listener);
// Send current state immediately
listener(currentProgress);
// Return unsubscribe function
return () => {
listeners.delete(listener);
};
}
/**
* Reset progress to idle state.
*/
export function resetProgress(): void {
currentProgress = {
step: 'idle',
message: 'Ready',
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
};
}
/**
* Helper to create progress updates for each step.
*/
export const progressHelpers = {
startCrawling: () => updateProgress({
step: 'crawling',
message: 'Fetching jobs from sources...',
detail: 'Running Crawlee crawler',
startedAt: new Date().toISOString(),
jobsDiscovered: 0,
jobsScored: 0,
jobsProcessed: 0,
totalToProcess: 0,
}),
crawlingComplete: (jobsFound: number) => updateProgress({
step: 'importing',
message: `Found ${jobsFound} jobs, importing to database...`,
detail: 'Deduplicating and saving',
jobsDiscovered: jobsFound,
}),
importComplete: (created: number, skipped: number) => updateProgress({
step: 'scoring',
message: `Imported ${created} new jobs (${skipped} duplicates). Scoring...`,
detail: 'Using AI to evaluate job fit',
}),
scoringJob: (index: number, total: number, title: string) => updateProgress({
step: 'scoring',
message: `Scoring jobs (${index}/${total})...`,
detail: title,
jobsScored: index,
}),
scoringComplete: (totalScored: number, topN: number) => updateProgress({
step: 'processing',
message: `Scored ${totalScored} jobs. Processing top ${topN}...`,
detail: 'Generating tailored resumes',
jobsScored: totalScored,
totalToProcess: topN,
}),
processingJob: (index: number, total: number, job: { id: string; title: string; employer: string }) => updateProgress({
step: 'processing',
message: `Processing job ${index}/${total}...`,
detail: `${job.title} @ ${job.employer}`,
jobsProcessed: index - 1,
totalToProcess: total,
currentJob: job,
}),
generatingSummary: (job: { title: string; employer: string }) => updateProgress({
detail: `Generating summary for ${job.title}...`,
}),
generatingPdf: (job: { title: string; employer: string }) => updateProgress({
detail: `Generating PDF for ${job.title}...`,
}),
jobComplete: (index: number, total: number) => updateProgress({
jobsProcessed: index,
detail: `Completed ${index}/${total} jobs`,
}),
complete: (discovered: number, processed: number) => updateProgress({
step: 'completed',
message: `Pipeline complete! Discovered ${discovered} jobs, processed ${processed}.`,
detail: 'Ready for review',
completedAt: new Date().toISOString(),
currentJob: undefined,
}),
failed: (error: string) => updateProgress({
step: 'failed',
message: 'Pipeline failed',
detail: error,
error,
completedAt: new Date().toISOString(),
}),
};