diff --git a/orchestrator/src/client/App.tsx b/orchestrator/src/client/App.tsx index 43bb021..41a6267 100644 --- a/orchestrator/src/client/App.tsx +++ b/orchestrator/src/client/App.tsx @@ -4,7 +4,7 @@ import React, { useState, useEffect, useCallback } from 'react'; import type { Job, JobStatus } from '../shared/types'; -import { Header, Stats, JobList, ToastContainer, Toast } from './components'; +import { Header, Stats, JobList, ToastContainer, Toast, PipelineProgress } from './components'; import * as api from './api'; export const App: React.FC = () => { @@ -158,6 +158,8 @@ export const App: React.FC = () => { />
+ + = { + idle: 'Ready', + crawling: 'Crawling Jobs', + importing: 'Importing', + scoring: 'Scoring Jobs', + processing: 'Generating Resumes', + completed: 'Complete', + failed: 'Failed', +}; + +const stepColors: Record = { + idle: 'var(--color-muted)', + crawling: 'var(--color-info)', + importing: 'var(--color-info)', + scoring: 'var(--color-warning)', + processing: 'var(--color-primary-500)', + completed: 'var(--color-success)', + failed: 'var(--color-error)', +}; + +export const PipelineProgress: React.FC = ({ isRunning }) => { + const [progress, setProgress] = useState(null); + const [isConnected, setIsConnected] = useState(false); + + useEffect(() => { + if (!isRunning) { + setProgress(null); + return; + } + + // Connect to SSE endpoint + const eventSource = new EventSource('/api/pipeline/progress'); + + eventSource.onopen = () => { + setIsConnected(true); + }; + + eventSource.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + setProgress(data); + } catch { + // Ignore parse errors + } + }; + + eventSource.onerror = () => { + setIsConnected(false); + }; + + return () => { + eventSource.close(); + setIsConnected(false); + }; + }, [isRunning]); + + if (!isRunning && !progress) { + return null; + } + + const step = progress?.step || 'idle'; + const isActive = progress && step !== 'idle' && step !== 'completed' && step !== 'failed'; + + // Calculate overall progress percentage + let percentage = 0; + if (progress) { + switch (step) { + case 'crawling': + percentage = 10; + break; + case 'importing': + percentage = 20; + break; + case 'scoring': + if (progress.jobsScored > 0) { + percentage = 20 + (progress.jobsScored / Math.max(progress.jobsDiscovered, 1)) * 30; + } else { + percentage = 25; + } + break; + case 'processing': + if (progress.totalToProcess > 0) { + percentage = 50 + (progress.jobsProcessed / progress.totalToProcess) * 50; + } else { + percentage = 55; + } + break; + case 'completed': + percentage = 100; + break; + case 
'failed': + percentage = 100; + break; + } + } + + return ( +
+ {/* Header */} +
+
+ {isActive && ( +
+ )} + + {stepLabels[step]} + +
+ + {Math.round(percentage)}% + +
+ + {/* Progress bar */} +
+
+
+ + {/* Message */} + {progress && ( +
+

+ {progress.message} +

+ {progress.detail && ( +

+ {progress.detail} +

+ )} +
+ )} + + {/* Stats */} + {progress && (step === 'scoring' || step === 'processing' || step === 'completed') && ( +
+
+ Discovered: + + {progress.jobsDiscovered} + +
+ {progress.jobsScored > 0 && ( +
+ Scored: + + {progress.jobsScored} + +
+ )} + {progress.totalToProcess > 0 && ( +
+ Processed: + + {progress.jobsProcessed}/{progress.totalToProcess} + +
+ )} +
+ )} + + {/* Error state */} + {step === 'failed' && progress?.error && ( +
+ {progress.error} +
+ )} +
+ ); +}; diff --git a/orchestrator/src/client/components/index.ts b/orchestrator/src/client/components/index.ts index 362abdb..3246784 100644 --- a/orchestrator/src/client/components/index.ts +++ b/orchestrator/src/client/components/index.ts @@ -5,4 +5,5 @@ export { ScoreIndicator } from './ScoreIndicator'; export { JobCard } from './JobCard'; export { JobList } from './JobList'; export { ToastContainer, type Toast } from './Toast'; +export { PipelineProgress } from './PipelineProgress'; export * from './Icons'; diff --git a/orchestrator/src/server/api/routes.ts b/orchestrator/src/server/api/routes.ts index 872f3bb..71adc76 100644 --- a/orchestrator/src/server/api/routes.ts +++ b/orchestrator/src/server/api/routes.ts @@ -6,7 +6,7 @@ import { Router, Request, Response } from 'express'; import { z } from 'zod'; import * as jobsRepo from '../repositories/jobs.js'; import * as pipelineRepo from '../repositories/pipeline.js'; -import { runPipeline, processJob, getPipelineStatus } from '../pipeline/index.js'; +import { runPipeline, processJob, getPipelineStatus, subscribeToProgress, getProgress } from '../pipeline/index.js'; import { createNotionEntry } from '../services/notion.js'; import { clearDatabase } from '../db/clear.js'; import type { JobStatus, ApiResponse, JobsListResponse, PipelineStatusResponse } from '../../shared/types.js'; @@ -198,6 +198,36 @@ apiRouter.get('/pipeline/status', async (req: Request, res: Response) => { } }); +/** + * GET /api/pipeline/progress - Server-Sent Events endpoint for live progress + */ +apiRouter.get('/pipeline/progress', (req: Request, res: Response) => { + // Set headers for SSE + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); // Disable Nginx buffering + + // Send initial progress + const sendProgress = (data: unknown) => { + res.write(`data: ${JSON.stringify(data)}\n\n`); + }; + + // 
Subscribe to progress updates + const unsubscribe = subscribeToProgress(sendProgress); + + // Send heartbeat every 30 seconds to keep connection alive + const heartbeat = setInterval(() => { + res.write(': heartbeat\n\n'); + }, 30000); + + // Cleanup on close + req.on('close', () => { + clearInterval(heartbeat); + unsubscribe(); + }); +}); + /** * GET /api/pipeline/runs - Get recent pipeline runs */ diff --git a/orchestrator/src/server/pipeline/index.ts b/orchestrator/src/server/pipeline/index.ts index 47060d4..2a27e2f 100644 --- a/orchestrator/src/server/pipeline/index.ts +++ b/orchestrator/src/server/pipeline/index.ts @@ -1 +1,2 @@ export * from './orchestrator.js'; +export * from './progress.js'; diff --git a/orchestrator/src/server/pipeline/orchestrator.ts b/orchestrator/src/server/pipeline/orchestrator.ts index dd2b3e7..0a7d75c 100644 --- a/orchestrator/src/server/pipeline/orchestrator.ts +++ b/orchestrator/src/server/pipeline/orchestrator.ts @@ -14,11 +14,12 @@ import { readFile } from 'fs/promises'; import { join, dirname } from 'path'; import { fileURLToPath } from 'url'; import { runCrawler } from '../services/crawler.js'; -import { scoreAndRankJobs } from '../services/scorer.js'; +import { scoreAndRankJobs, scoreJobSuitability } from '../services/scorer.js'; import { generateSummary } from '../services/summary.js'; import { generatePdf } from '../services/pdf.js'; import * as jobsRepo from '../repositories/jobs.js'; import * as pipelineRepo from '../repositories/pipeline.js'; +import { progressHelpers, resetProgress } from './progress.js'; import type { Job, PipelineConfig } from '../../shared/types.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -54,6 +55,7 @@ export async function runPipeline(config: Partial = {}): Promise } isPipelineRunning = true; + resetProgress(); const mergedConfig = { ...DEFAULT_CONFIG, ...config }; // Create pipeline run record @@ -69,39 +71,60 @@ export async function runPipeline(config: Partial = {}): 
Promise // Step 2: Run crawler console.log('\nšŸ•·ļø Running crawler...'); + progressHelpers.startCrawling(); const crawlerResult = await runCrawler(); if (!crawlerResult.success) { throw new Error(`Crawler failed: ${crawlerResult.error}`); } + progressHelpers.crawlingComplete(crawlerResult.jobs.length); + // Step 3: Import discovered jobs console.log('\nšŸ’¾ Importing jobs to database...'); const { created, skipped } = await jobsRepo.bulkCreateJobs(crawlerResult.jobs); console.log(` Created: ${created}, Skipped (duplicates): ${skipped}`); + progressHelpers.importComplete(created, skipped); + await pipelineRepo.updatePipelineRun(pipelineRun.id, { jobsDiscovered: created, }); // Step 4: Get unprocessed jobs and score them console.log('\nšŸŽÆ Scoring jobs for suitability...'); - const unprocessedJobs = await jobsRepo.getJobsForProcessing(50); // Get more than topN for ranking - const rankedJobs = await scoreAndRankJobs(unprocessedJobs, profile); + const unprocessedJobs = await jobsRepo.getJobsForProcessing(50); - // Update scores in database - for (const job of rankedJobs) { + // Score jobs with progress updates + const scoredJobs: Array = []; + for (let i = 0; i < unprocessedJobs.length; i++) { + const job = unprocessedJobs[i]; + progressHelpers.scoringJob(i + 1, unprocessedJobs.length, job.title); + + const { score, reason } = await scoreJobSuitability(job, profile); + scoredJobs.push({ + ...job, + suitabilityScore: score, + suitabilityReason: reason, + }); + + // Update score in database await jobsRepo.updateJob(job.id, { - suitabilityScore: job.suitabilityScore, - suitabilityReason: job.suitabilityReason, + suitabilityScore: score, + suitabilityReason: reason, }); } + // Sort by score + scoredJobs.sort((a, b) => b.suitabilityScore - a.suitabilityScore); + // Step 5: Pick top N jobs above threshold - const topJobs = rankedJobs + const topJobs = scoredJobs .filter(j => j.suitabilityScore >= mergedConfig.minSuitabilityScore) .slice(0, mergedConfig.topN); + 
progressHelpers.scoringComplete(scoredJobs.length, topJobs.length); + console.log(`\nšŸ“Š Selected ${topJobs.length} top jobs for processing:`); for (const job of topJobs) { console.log(` - ${job.title} @ ${job.employer} (score: ${job.suitabilityScore})`); @@ -110,15 +133,24 @@ export async function runPipeline(config: Partial = {}): Promise // Step 6: Process each top job let processed = 0; - for (const job of topJobs) { + for (let i = 0; i < topJobs.length; i++) { + const job = topJobs[i]; console.log(`\nšŸ“ Processing: ${job.title} @ ${job.employer}`); + progressHelpers.processingJob(i + 1, topJobs.length, { + id: job.id, + title: job.title, + employer: job.employer, + }); + try { // Mark as processing await jobsRepo.updateJob(job.id, { status: 'processing' }); // Generate tailored summary console.log(' Generating summary...'); + progressHelpers.generatingSummary({ title: job.title, employer: job.employer }); + const summaryResult = await generateSummary( job.jobDescription || '', profile @@ -136,6 +168,8 @@ export async function runPipeline(config: Partial = {}): Promise // Generate PDF console.log(' Generating PDF...'); + progressHelpers.generatingPdf({ title: job.title, employer: job.employer }); + const pdfResult = await generatePdf( job.id, summaryResult.summary!, @@ -150,10 +184,11 @@ export async function runPipeline(config: Partial = {}): Promise // Mark as ready await jobsRepo.updateJob(job.id, { status: 'ready', - pdfPath: pdfResult.pdfPath ?? null, + pdfPath: pdfResult.pdfPath ?? 
undefined, }); processed++; + progressHelpers.jobComplete(processed, topJobs.length); console.log(` āœ… Ready for review!`); } catch (error) { @@ -173,6 +208,7 @@ export async function runPipeline(config: Partial = {}): Promise console.log(` Jobs discovered: ${created}`); console.log(` Jobs processed: ${processed}`); + progressHelpers.complete(created, processed); isPipelineRunning = false; return { @@ -190,6 +226,7 @@ export async function runPipeline(config: Partial = {}): Promise errorMessage: message, }); + progressHelpers.failed(message); isPipelineRunning = false; console.error('\nāŒ Pipeline failed:', message); @@ -250,7 +287,7 @@ export async function processJob(jobId: string): Promise<{ // Mark as ready await jobsRepo.updateJob(job.id, { status: 'ready', - pdfPath: pdfResult.pdfPath ?? null, + pdfPath: pdfResult.pdfPath ?? undefined, }); console.log(' āœ… Done!'); diff --git a/orchestrator/src/server/pipeline/progress.ts b/orchestrator/src/server/pipeline/progress.ts new file mode 100644 index 0000000..f0ad7a9 --- /dev/null +++ b/orchestrator/src/server/pipeline/progress.ts @@ -0,0 +1,177 @@ +/** + * Pipeline progress tracking with Server-Sent Events. 
/**
 * Pipeline progress tracking with Server-Sent Events.
 *
 * Holds a single module-level progress record, lets callers subscribe to
 * changes, and exposes `progressHelpers` for the state transitions the
 * pipeline reports.
 */

/** Stages a pipeline run can report. */
export type PipelineStep =
  | 'idle'
  | 'crawling'
  | 'importing'
  | 'scoring'
  | 'processing'
  | 'completed'
  | 'failed';

/** Snapshot of the pipeline state that is pushed to subscribers. */
export interface PipelineProgress {
  step: PipelineStep;
  message: string;
  detail?: string;
  jobsDiscovered: number;
  jobsScored: number;
  jobsProcessed: number;
  totalToProcess: number;
  currentJob?: {
    id: string;
    title: string;
    employer: string;
  };
  error?: string;
  startedAt?: string;
  completedAt?: string;
}

/** Callback invoked with a private copy of the progress on every change. */
type ProgressListener = (progress: PipelineProgress) => void;

const listeners = new Set<ProgressListener>();

/** Build a fresh idle record; shared by module initialisation and reset. */
function createIdleProgress(): PipelineProgress {
  return {
    step: 'idle',
    message: 'Ready',
    jobsDiscovered: 0,
    jobsScored: 0,
    jobsProcessed: 0,
    totalToProcess: 0,
  };
}

let currentProgress: PipelineProgress = createIdleProgress();

/**
 * Copy the current state, including the nested `currentJob`, so that
 * consumers can never mutate the module-level record.
 */
function snapshot(): PipelineProgress {
  const copy: PipelineProgress = { ...currentProgress };
  if (copy.currentJob) {
    copy.currentJob = { ...copy.currentJob };
  }
  return copy;
}

/**
 * Deliver the current state to one listener. A throwing listener is logged
 * and otherwise ignored so it cannot break the pipeline or other listeners.
 */
function notify(listener: ProgressListener): void {
  try {
    listener(snapshot());
  } catch (error) {
    console.error('Error in progress listener:', error);
  }
}

/** Notify every listener that was registered when the change happened. */
function notifyAll(): void {
  // Iterate over a copy: a listener may subscribe or unsubscribe while being
  // notified, and a listener added mid-iteration already received this state
  // from subscribeToProgress.
  for (const listener of [...listeners]) {
    notify(listener);
  }
}

/**
 * Merge `update` into the current progress and notify all listeners.
 *
 * @param update - Fields to overwrite; omitted fields keep their value.
 */
export function updateProgress(update: Partial<PipelineProgress>): void {
  currentProgress = { ...currentProgress, ...update };
  notifyAll();
}

/**
 * Get the current progress state.
 *
 * @returns A copy that is safe for the caller to mutate.
 */
export function getProgress(): PipelineProgress {
  return snapshot();
}

/**
 * Subscribe to progress updates. The listener is called once immediately
 * with the current state and then after every change.
 *
 * @param listener - Callback receiving a private copy of the progress.
 * @returns A function that removes the subscription.
 */
export function subscribeToProgress(listener: ProgressListener): () => void {
  listeners.add(listener);

  // Send the current state immediately. Errors are contained the same way as
  // in updateProgress, so the caller always gets its unsubscribe handle back.
  notify(listener);

  return () => {
    listeners.delete(listener);
  };
}

/**
 * Reset progress to the idle state and tell subscribers about it, so that
 * already-connected clients stop showing the previous run's final state.
 */
export function resetProgress(): void {
  currentProgress = createIdleProgress();
  notifyAll();
}

/**
 * Helpers that publish the progress update for each pipeline transition.
 */
export const progressHelpers = {
  /** Start of a run: enter `crawling` and clear counters and stale run data. */
  startCrawling: (): void => updateProgress({
    step: 'crawling',
    message: 'Fetching jobs from sources...',
    detail: 'Running Crawlee crawler',
    startedAt: new Date().toISOString(),
    jobsDiscovered: 0,
    jobsScored: 0,
    jobsProcessed: 0,
    totalToProcess: 0,
    // Drop leftovers of a previous run in case no reset happened first.
    currentJob: undefined,
    error: undefined,
    completedAt: undefined,
  }),

  /** Crawler finished: enter `importing` and record how many jobs it found. */
  crawlingComplete: (jobsFound: number): void => updateProgress({
    step: 'importing',
    message: `Found ${jobsFound} jobs, importing to database...`,
    detail: 'Deduplicating and saving',
    jobsDiscovered: jobsFound,
  }),

  /** Import finished: enter `scoring`. */
  importComplete: (created: number, skipped: number): void => updateProgress({
    step: 'scoring',
    message: `Imported ${created} new jobs (${skipped} duplicates). Scoring...`,
    detail: 'Using AI to evaluate job fit',
  }),

  /** About to score job number `index` (1-based) out of `total`. */
  scoringJob: (index: number, total: number, title: string): void => updateProgress({
    step: 'scoring',
    message: `Scoring jobs (${index}/${total})...`,
    detail: title,
    jobsScored: index,
  }),

  /** Scoring finished: enter `processing` with `topN` jobs selected. */
  scoringComplete: (totalScored: number, topN: number): void => updateProgress({
    step: 'processing',
    message: `Scored ${totalScored} jobs. Processing top ${topN}...`,
    detail: 'Generating tailored resumes',
    jobsScored: totalScored,
    totalToProcess: topN,
  }),

  /** About to process job number `index` (1-based) out of `total`. */
  processingJob: (
    index: number,
    total: number,
    job: { id: string; title: string; employer: string },
  ): void => updateProgress({
    step: 'processing',
    message: `Processing job ${index}/${total}...`,
    detail: `${job.title} @ ${job.employer}`,
    // The job at `index` is still in flight, so only the earlier ones count.
    jobsProcessed: index - 1,
    totalToProcess: total,
    currentJob: job,
  }),

  /** Sub-step of processing: the summary is being generated. */
  generatingSummary: (job: { title: string; employer: string }): void => updateProgress({
    detail: `Generating summary for ${job.title}...`,
  }),

  /** Sub-step of processing: the PDF is being generated. */
  generatingPdf: (job: { title: string; employer: string }): void => updateProgress({
    detail: `Generating PDF for ${job.title}...`,
  }),

  /** A job finished; `index` is the number of jobs completed so far. */
  jobComplete: (index: number, total: number): void => updateProgress({
    jobsProcessed: index,
    detail: `Completed ${index}/${total} jobs`,
  }),

  /** Run finished successfully. */
  complete: (discovered: number, processed: number): void => updateProgress({
    step: 'completed',
    message: `Pipeline complete! Discovered ${discovered} jobs, processed ${processed}.`,
    detail: 'Ready for review',
    completedAt: new Date().toISOString(),
    // processingJob sets the counter optimistically; make the final value
    // agree with the number reported in the message.
    jobsProcessed: processed,
    currentJob: undefined,
  }),

  /** Run aborted with `error`. */
  failed: (error: string): void => updateProgress({
    step: 'failed',
    message: 'Pipeline failed',
    detail: error,
    error,
    completedAt: new Date().toISOString(),
  }),
};