From b456ab19515d3152a467ff027f9eeadca4c9b8b0 Mon Sep 17 00:00:00 2001 From: Shaheer Sarfaraz <53654735+DaKheera47@users.noreply.github.com> Date: Mon, 9 Feb 2026 20:49:54 +0000 Subject: [PATCH] Job api performance (#117) * feat(api): add lightweight jobs list view without pagination * refactor(client): use lightweight jobs list and on-demand job detail * refactor(ui): separate job list rows from full job detail * perf(home): reduce jobs payload to applied lightweight rows * perf(db): add safe composite index for jobs list queries * feat(api): default jobs endpoint to lightweight list view * style: apply biome formatting for jobs list-view changes * feat(api): add jobs revision endpoint for lightweight change detection * refactor(client): switch jobs auto-refresh to revision checks * perf(client): drive pipeline freshness via sse with polling fallback * refactor(orchestrator): remove pipeline status polling loop from page * chore(client): add periodic safety refresh and refresh lifecycle hardening * refactor(types): define JobListItem via Pick to prevent drift --- orchestrator/src/client/api/client.ts | 38 +- orchestrator/src/client/pages/HomePage.tsx | 5 +- .../client/pages/OrchestratorPage.test.tsx | 91 ++++ .../src/client/pages/OrchestratorPage.tsx | 53 +-- .../pages/orchestrator/JobCommandBar.tsx | 4 +- .../pages/orchestrator/JobCommandBar.utils.ts | 17 +- .../pages/orchestrator/JobDetailPanel.tsx | 4 +- .../pages/orchestrator/JobListPanel.tsx | 6 +- .../pages/orchestrator/JobRowContent.tsx | 4 +- .../client/pages/orchestrator/bulkActions.ts | 8 +- .../pages/orchestrator/useBulkJobSelection.ts | 4 +- .../pages/orchestrator/useFilteredJobs.ts | 4 +- .../orchestrator/useOrchestratorData.test.ts | 407 +++++++++++++++++- .../pages/orchestrator/useOrchestratorData.ts | 288 ++++++++++++- .../src/client/pages/orchestrator/utils.ts | 14 +- .../src/server/api/routes/jobs.test.ts | 90 ++++ orchestrator/src/server/api/routes/jobs.ts | 124 +++++- orchestrator/src/server/db/migrate.ts | 11 + orchestrator/src/server/repositories/jobs.ts | 88 ++++ shared/src/types.ts | 37 +- 20 files changed, 1177 insertions(+), 120 deletions(-) diff --git a/orchestrator/src/client/api/client.ts b/orchestrator/src/client/api/client.ts index 71b9702..c612e6f 100644 --- a/orchestrator/src/client/api/client.ts +++ b/orchestrator/src/client/api/client.ts @@ -12,9 +12,11 @@ import type { BulkJobActionResponse, DemoInfoResponse, Job, + JobListItem, JobOutcome, JobSource, JobsListResponse, + JobsRevisionResponse, ManualJobDraft, ManualJobFetchResponse, ManualJobInferenceResponse, @@ -157,9 +159,39 @@ async function fetchApi( } // Jobs API -export async function getJobs(statuses?: string[]): Promise { - const query = statuses?.length ? `?status=${statuses.join(",")}` : ""; - return fetchApi(`/jobs${query}`); +export function getJobs(): Promise>; +export function getJobs(options: { + statuses?: string[]; + view?: "list"; +}): Promise>; +export function getJobs(options?: { + statuses?: string[]; + view: "full"; +}): Promise>; +export async function getJobs(options?: { + statuses?: string[]; + view?: "full" | "list"; +}): Promise | JobsListResponse> { + const params = new URLSearchParams(); + if (options?.statuses?.length) + params.set("status", options.statuses.join(",")); + if (options?.view) params.set("view", options.view); + const query = params.toString(); + return fetchApi | JobsListResponse>( + `/jobs${query ? `?${query}` : ""}`, + ); +} + +export async function getJobsRevision(options?: { + statuses?: string[]; +}): Promise { + const params = new URLSearchParams(); + if (options?.statuses?.length) + params.set("status", options.statuses.join(",")); + const query = params.toString(); + return fetchApi( + `/jobs/revision${query ? `?${query}` : ""}`, + ); } export async function getJob(id: string): Promise { diff --git a/orchestrator/src/client/pages/HomePage.tsx b/orchestrator/src/client/pages/HomePage.tsx index 959bd06..207be3e 100644 --- a/orchestrator/src/client/pages/HomePage.tsx +++ b/orchestrator/src/client/pages/HomePage.tsx @@ -60,7 +60,10 @@ export const HomePage: React.FC = () => { setIsLoading(true); api - .getJobs() + .getJobs({ + statuses: ["applied"], + view: "list", + }) .then(async (response) => { if (!isMounted) return; const appliedDates = response.jobs.map((job) => job.appliedAt); diff --git a/orchestrator/src/client/pages/OrchestratorPage.test.tsx b/orchestrator/src/client/pages/OrchestratorPage.test.tsx index 20c9b70..34a5fc7 100644 --- a/orchestrator/src/client/pages/OrchestratorPage.test.tsx +++ b/orchestrator/src/client/pages/OrchestratorPage.test.tsx @@ -1,6 +1,7 @@ import type { Job } from "@shared/types.js"; import { fireEvent, render, screen, waitFor } from "@testing-library/react"; import { MemoryRouter, Route, Routes, useLocation } from "react-router-dom"; +import { toast } from "sonner"; import { beforeEach, describe, expect, it, vi } from "vitest"; import * as api from "../api"; import { OrchestratorPage } from "./OrchestratorPage"; @@ -21,7 +22,20 @@ vi.mock("../api", () => ({ }), })); +vi.mock("sonner", () => ({ + toast: { + message: vi.fn(), + success: vi.fn(), + error: vi.fn(), + }, +})); + let mockIsPipelineRunning = false; +let mockPipelineTerminalEvent: { + status: "completed" | "cancelled" | "failed"; + errorMessage: string | null; + token: number; +} | null = null; let mockPipelineSources = ["linkedin"] as Array< "gradcracker" | "indeed" | "linkedin" | "ukvisajobs" >; @@ -112,6 +126,7 @@ const createMatchMedia = (matches: boolean) => vi.mock("./orchestrator/useOrchestratorData", () => ({ useOrchestratorData: () => ({ jobs: [jobFixture, job2, processingJob], + selectedJob: jobFixture, stats: { discovered: 1, processing: 1, @@ -123,6 +138,8 @@ vi.mock("./orchestrator/useOrchestratorData", () => ({ isLoading: false, isPipelineRunning: mockIsPipelineRunning, setIsPipelineRunning: vi.fn(), + pipelineTerminalEvent: mockPipelineTerminalEvent, + setIsRefreshPaused: vi.fn(), loadJobs: vi.fn(), }), })); @@ -363,6 +380,7 @@ describe("OrchestratorPage", () => { beforeEach(() => { vi.clearAllMocks(); mockIsPipelineRunning = false; + mockPipelineTerminalEvent = null; mockPipelineSources = ["linkedin"]; mockAutomaticRunValues = { topN: 12, @@ -699,10 +717,83 @@ describe("OrchestratorPage", () => { minSuitabilityScore: 55, sources: ["linkedin"], }); + expect(setIntervalSpy).not.toHaveBeenCalledWith(expect.any(Function), 5000); setIntervalSpy.mockRestore(); }); + it("shows completion toast from hook terminal state", async () => { + mockPipelineTerminalEvent = { + status: "completed", + errorMessage: null, + token: 1, + }; + window.matchMedia = createMatchMedia( + true, + ) as unknown as typeof window.matchMedia; + + render( + + + } /> + } /> + + , + ); + + await waitFor(() => { + expect(toast.success).toHaveBeenCalledWith("Pipeline completed"); + }); + }); + + it("shows cancelled toast from hook terminal state", async () => { + mockPipelineTerminalEvent = { + status: "cancelled", + errorMessage: null, + token: 1, + }; + window.matchMedia = createMatchMedia( + true, + ) as unknown as typeof window.matchMedia; + + render( + + + } /> + } /> + + , + ); + + await waitFor(() => { + expect(toast.message).toHaveBeenCalledWith("Pipeline cancelled"); + }); + }); + + it("shows failed toast from hook terminal state", async () => { + mockPipelineTerminalEvent = { + status: "failed", + errorMessage: "Pipeline exploded", + token: 1, + }; + window.matchMedia = createMatchMedia( + true, + ) as unknown as typeof window.matchMedia; + + render( + + + } /> + } /> + + , + ); + + await waitFor(() => { + expect(toast.error).toHaveBeenCalledWith("Pipeline exploded"); + }); + }); + it("blocks automatic run when no sources are compatible for selected country", async () => { window.matchMedia = createMatchMedia( true, diff --git a/orchestrator/src/client/pages/OrchestratorPage.tsx b/orchestrator/src/client/pages/OrchestratorPage.tsx index 6985ff7..c262866 100644 --- a/orchestrator/src/client/pages/OrchestratorPage.tsx +++ b/orchestrator/src/client/pages/OrchestratorPage.tsx @@ -120,13 +120,15 @@ export const OrchestratorPage: React.FC = () => { const { settings, refreshSettings } = useSettings(); const { jobs, + selectedJob, stats, isLoading, isPipelineRunning, setIsPipelineRunning, + pipelineTerminalEvent, setIsRefreshPaused, loadJobs, - } = useOrchestratorData(); + } = useOrchestratorData(selectedJobId); const enabledSources = useMemo( () => getEnabledSources(settings ?? null), [settings], @@ -144,13 +146,6 @@ export const OrchestratorPage: React.FC = () => { ); const counts = useMemo(() => getJobCounts(jobs), [jobs]); const sourcesWithJobs = useMemo(() => getSourcesWithJobs(jobs), [jobs]); - const selectedJob = useMemo( - () => - selectedJobId - ? (jobs.find((job) => job.id === selectedJobId) ?? null) - : null, - [jobs, selectedJobId], - ); const { selectedJobIds, canSkipSelected, @@ -200,28 +195,6 @@ export const OrchestratorPage: React.FC = () => { toast.message("Pipeline started", { description: `Sources: ${config.sources.join(", ")}. This may take a few minutes.`, }); - - const pollInterval = setInterval(async () => { - try { - const status = await api.getPipelineStatus(); - if (!status.isRunning) { - clearInterval(pollInterval); - setIsPipelineRunning(false); - setIsCancelling(false); - await loadJobs(); - const outcome = status.lastRun?.status; - if (outcome === "cancelled") { - toast.message("Pipeline cancelled"); - } else if (outcome === "failed") { - toast.error(status.lastRun?.errorMessage || "Pipeline failed"); - } else { - toast.success("Pipeline completed"); - } - } - } catch { - // Ignore errors - } - }, 5000); } catch (error) { setIsPipelineRunning(false); setIsCancelling(false); @@ -230,9 +203,27 @@ export const OrchestratorPage: React.FC = () => { toast.error(message); } }, - [loadJobs, setIsPipelineRunning], + [setIsPipelineRunning], ); + useEffect(() => { + if (!pipelineTerminalEvent) return; + setIsPipelineRunning(false); + setIsCancelling(false); + + if (pipelineTerminalEvent.status === "cancelled") { + toast.message("Pipeline cancelled"); + return; + } + + if (pipelineTerminalEvent.status === "failed") { + toast.error(pipelineTerminalEvent.errorMessage || "Pipeline failed"); + return; + } + + toast.success("Pipeline completed"); + }, [pipelineTerminalEvent, setIsPipelineRunning]); + const handleCancelPipeline = useCallback(async () => { if (isCancelling || !isPipelineRunning) return; diff --git a/orchestrator/src/client/pages/orchestrator/JobCommandBar.tsx b/orchestrator/src/client/pages/orchestrator/JobCommandBar.tsx index aaa3c9d..2b54785 100644 --- a/orchestrator/src/client/pages/orchestrator/JobCommandBar.tsx +++ b/orchestrator/src/client/pages/orchestrator/JobCommandBar.tsx @@ -1,5 +1,5 @@ import { isMetaKeyPressed } from "@client/lib/meta-key"; -import type { Job } from "@shared/types.js"; +import type { JobListItem } from "@shared/types.js"; import type React from "react"; import { useCallback, useEffect, useMemo, useState } from "react"; import { @@ -29,7 +29,7 @@ import { JobCommandBarLockSuggestions } from "./JobCommandBarLockSuggestions"; import { JobRowContent } from "./JobRowContent"; interface JobCommandBarProps { - jobs: Job[]; + jobs: JobListItem[]; onSelectJob: (tab: FilterTab, jobId: string) => void; open?: boolean; onOpenChange?: (open: boolean) => void; diff --git a/orchestrator/src/client/pages/orchestrator/JobCommandBar.utils.ts b/orchestrator/src/client/pages/orchestrator/JobCommandBar.utils.ts index a4c8e21..9973cf9 100644 --- a/orchestrator/src/client/pages/orchestrator/JobCommandBar.utils.ts +++ b/orchestrator/src/client/pages/orchestrator/JobCommandBar.utils.ts @@ -1,4 +1,4 @@ -import type { Job, JobStatus } from "@shared/types.js"; +import type { JobListItem, JobStatus } from "@shared/types.js"; import type { FilterTab } from "./constants"; export type CommandGroupId = "ready" | "discovered" | "applied" | "other"; @@ -119,7 +119,7 @@ export const resolveLockFromAliasPrefix = ( return matches[0]; }; -export const jobMatchesLock = (job: Job, lock: StatusLock) => { +export const jobMatchesLock = (job: JobListItem, lock: StatusLock) => { if (lock === "ready") return job.status === "ready"; if (lock === "discovered") return job.status === "discovered"; if (lock === "applied") return job.status === "applied"; @@ -128,7 +128,10 @@ export const jobMatchesLock = (job: Job, lock: StatusLock) => { return false; }; -export const computeJobMatchScore = (job: Job, normalizedQuery: string) => { +export const computeJobMatchScore = ( + job: JobListItem, + normalizedQuery: string, +) => { if (!normalizedQuery) return 0; const titleScore = computeFieldMatchScore(job.title, normalizedQuery); const employerScore = computeFieldMatchScore(job.employer, normalizedQuery); @@ -145,10 +148,10 @@ export const computeJobMatchScore = (job: Job, normalizedQuery: string) => { }; export const groupJobsForCommandBar = ( - scopedJobs: Job[], + scopedJobs: JobListItem[], normalizedQuery: string, -): Record => { - const groups: Record = { +): Record => { + const groups: Record = { ready: [], discovered: [], applied: [], @@ -179,7 +182,7 @@ export const groupJobsForCommandBar = ( }; export const orderCommandGroups = ( - groupedJobs: Record, + groupedJobs: Record, normalizedQuery: string, ) => { if (!normalizedQuery) return commandGroupMeta; diff --git a/orchestrator/src/client/pages/orchestrator/JobDetailPanel.tsx b/orchestrator/src/client/pages/orchestrator/JobDetailPanel.tsx index e5ad6ad..d022ba2 100644 --- a/orchestrator/src/client/pages/orchestrator/JobDetailPanel.tsx +++ b/orchestrator/src/client/pages/orchestrator/JobDetailPanel.tsx @@ -1,4 +1,4 @@ -import type { Job } from "@shared/types.js"; +import type { Job, JobListItem } from "@shared/types.js"; import { CheckCircle2, Copy, @@ -46,7 +46,7 @@ import type { FilterTab } from "./constants"; interface JobDetailPanelProps { activeTab: FilterTab; - activeJobs: Job[]; + activeJobs: JobListItem[]; selectedJob: Job | null; onSelectJobId: (jobId: string | null) => void; onJobUpdated: () => Promise; diff --git a/orchestrator/src/client/pages/orchestrator/JobListPanel.tsx b/orchestrator/src/client/pages/orchestrator/JobListPanel.tsx index 624eca7..887dbfa 100644 --- a/orchestrator/src/client/pages/orchestrator/JobListPanel.tsx +++ b/orchestrator/src/client/pages/orchestrator/JobListPanel.tsx @@ -1,4 +1,4 @@ -import type { Job } from "@shared/types.js"; +import type { JobListItem } from "@shared/types.js"; import { Loader2 } from "lucide-react"; import type React from "react"; import { Checkbox } from "@/components/ui/checkbox"; @@ -9,8 +9,8 @@ import { JobRowContent } from "./JobRowContent"; interface JobListPanelProps { isLoading: boolean; - jobs: Job[]; - activeJobs: Job[]; + jobs: JobListItem[]; + activeJobs: JobListItem[]; selectedJobId: string | null; selectedJobIds: Set; activeTab: FilterTab; diff --git a/orchestrator/src/client/pages/orchestrator/JobRowContent.tsx b/orchestrator/src/client/pages/orchestrator/JobRowContent.tsx index c2731a4..16887ff 100644 --- a/orchestrator/src/client/pages/orchestrator/JobRowContent.tsx +++ b/orchestrator/src/client/pages/orchestrator/JobRowContent.tsx @@ -1,9 +1,9 @@ -import type { Job } from "@shared/types.js"; +import type { JobListItem } from "@shared/types.js"; import { cn } from "@/lib/utils"; import { defaultStatusToken, statusTokens } from "./constants"; interface JobRowContentProps { - job: Job; + job: JobListItem; isSelected?: boolean; showStatusDot?: boolean; statusDotClassName?: string; diff --git a/orchestrator/src/client/pages/orchestrator/bulkActions.ts b/orchestrator/src/client/pages/orchestrator/bulkActions.ts index cc09fea..71417f4 100644 --- a/orchestrator/src/client/pages/orchestrator/bulkActions.ts +++ b/orchestrator/src/client/pages/orchestrator/bulkActions.ts @@ -1,18 +1,18 @@ -import type { BulkJobActionResponse, Job } from "@shared/types"; +import type { BulkJobActionResponse, JobListItem } from "@shared/types"; const SKIPPABLE_STATUSES = new Set(["discovered", "ready"]); -export function canBulkSkip(jobs: Job[]): boolean { +export function canBulkSkip(jobs: JobListItem[]): boolean { return ( jobs.length > 0 && jobs.every((job) => SKIPPABLE_STATUSES.has(job.status)) ); } -export function canBulkMoveToReady(jobs: Job[]): boolean { +export function canBulkMoveToReady(jobs: JobListItem[]): boolean { return jobs.length > 0 && jobs.every((job) => job.status === "discovered"); } -export function canBulkRescore(jobs: Job[]): boolean { +export function canBulkRescore(jobs: JobListItem[]): boolean { return jobs.length > 0 && jobs.every((job) => job.status !== "processing"); } diff --git a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts b/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts index d74768d..b76bbc5 100644 --- a/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts +++ b/orchestrator/src/client/pages/orchestrator/useBulkJobSelection.ts @@ -1,4 +1,4 @@ -import type { BulkJobAction, Job } from "@shared/types.js"; +import type { BulkJobAction, JobListItem } from "@shared/types.js"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import * as api from "../../api"; @@ -13,7 +13,7 @@ import type { FilterTab } from "./constants"; const MAX_BULK_ACTION_JOB_IDS = 100; interface UseBulkJobSelectionArgs { - activeJobs: Job[]; + activeJobs: JobListItem[]; activeTab: FilterTab; loadJobs: () => Promise; } diff --git a/orchestrator/src/client/pages/orchestrator/useFilteredJobs.ts b/orchestrator/src/client/pages/orchestrator/useFilteredJobs.ts index d79ba24..e341ce3 100644 --- a/orchestrator/src/client/pages/orchestrator/useFilteredJobs.ts +++ b/orchestrator/src/client/pages/orchestrator/useFilteredJobs.ts @@ -1,4 +1,4 @@ -import type { Job, JobSource } from "@shared/types"; +import type { JobListItem, JobSource } from "@shared/types"; import { useMemo } from "react"; import type { FilterTab, @@ -16,7 +16,7 @@ const getSponsorCategory = (score: number | null): SponsorFilter => { }; export const useFilteredJobs = ( - jobs: Job[], + jobs: JobListItem[], activeTab: FilterTab, sourceFilter: JobSource | "all", sponsorFilter: SponsorFilter, diff --git a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts index 0c25c9f..5574535 100644 --- a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts +++ b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.test.ts @@ -5,6 +5,8 @@ import { useOrchestratorData } from "./useOrchestratorData"; vi.mock("../../api", () => ({ getJobs: vi.fn(), + getJobsRevision: vi.fn(), + getJob: vi.fn(), getPipelineStatus: vi.fn(), })); @@ -14,7 +16,34 @@ vi.mock("sonner", () => ({ }, })); -const makeResponse = (jobId: string) => ({ +class MockEventSource { + static instances: MockEventSource[] = []; + onopen: ((event: Event) => void) | null = null; + onmessage: ((event: MessageEvent) => void) | null = null; + onerror: ((event: Event) => void) | null = null; + + constructor(public url: string) { + MockEventSource.instances.push(this); + } + + close = vi.fn(); + + emitOpen() { + this.onopen?.(new Event("open")); + } + + emitMessage(payload: unknown) { + this.onmessage?.({ + data: JSON.stringify(payload), + } as MessageEvent); + } + + emitError() { + this.onerror?.(new Event("error")); + } +} + +const makeResponse = (jobId: string, revision = `rev-${jobId}`) => ({ jobs: [{ id: jobId }], total: 1, byStatus: { @@ -25,6 +54,7 @@ const makeResponse = (jobId: string) => ({ skipped: 0, expired: 0, }, + revision, }); type Deferred = { @@ -44,14 +74,32 @@ describe("useOrchestratorData", () => { beforeEach(() => { vi.clearAllMocks(); vi.useRealTimers(); - vi.mocked(api.getJobs).mockResolvedValue(makeResponse("initial") as any); + MockEventSource.instances = []; + (globalThis as any).EventSource = MockEventSource; + Object.defineProperty(document, "visibilityState", { + configurable: true, + value: "visible", + }); + vi.mocked(api.getJobs).mockResolvedValue( + makeResponse("initial", "rev-initial") as any, + ); + vi.mocked(api.getJobsRevision).mockResolvedValue({ + revision: "rev-initial", + latestUpdatedAt: "2026-01-01T00:00:00.000Z", + total: 1, + statusFilter: null, + } as any); + vi.mocked(api.getJob).mockResolvedValue({ + id: "initial", + updatedAt: "2026-01-01T00:00:00.000Z", + } as any); vi.mocked(api.getPipelineStatus).mockResolvedValue({ isRunning: false, } as any); }); it("applies newest loadJobs response when requests resolve out of order", async () => { - const { result } = renderHook(() => useOrchestratorData()); + const { result } = renderHook(() => useOrchestratorData(null)); await waitFor(() => { expect((result.current.jobs[0] as any)?.id).toBe("initial"); @@ -85,47 +133,360 @@ describe("useOrchestratorData", () => { expect((result.current.jobs[0] as any)?.id).toBe("newest"); }); - it("pauses and resumes polling based on isRefreshPaused", async () => { + it("checks revision every 30s and skips full reload when unchanged", async () => { vi.useFakeTimers(); - vi.mocked(api.getJobs).mockResolvedValue(makeResponse("steady") as any); + vi.mocked(api.getJobs).mockResolvedValue( + makeResponse("steady", "rev-steady") as any, + ); + vi.mocked(api.getJobsRevision).mockResolvedValue({ + revision: "rev-steady", + latestUpdatedAt: "2026-01-01T00:00:00.000Z", + total: 1, + statusFilter: null, + } as any); - const { result } = renderHook(() => useOrchestratorData()); + renderHook(() => useOrchestratorData(null)); await act(async () => { await Promise.resolve(); }); expect(api.getJobs).toHaveBeenCalledTimes(1); - act(() => { - result.current.setIsRefreshPaused(true); + await act(async () => { + vi.advanceTimersByTime(30000); + await Promise.resolve(); }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(1); + expect(api.getJobs).toHaveBeenCalledTimes(1); + }); + + it("loads full list when revision changes", async () => { + vi.useFakeTimers(); + vi.mocked(api.getJobs) + .mockResolvedValueOnce(makeResponse("initial", "rev-initial") as any) + .mockResolvedValueOnce(makeResponse("newest", "rev-new") as any); + vi.mocked(api.getJobsRevision) + .mockResolvedValueOnce({ + revision: "rev-new", + latestUpdatedAt: "2026-01-02T00:00:00.000Z", + total: 1, + statusFilter: null, + } as any) + .mockResolvedValue({ + revision: "rev-new", + latestUpdatedAt: "2026-01-02T00:00:00.000Z", + total: 1, + statusFilter: null, + } as any); + + renderHook(() => useOrchestratorData(null)); + await act(async () => { await Promise.resolve(); }); - const pausedBaselineCalls = vi.mocked(api.getJobs).mock.calls.length; - await act(async () => { - vi.advanceTimersByTime(10000); + vi.advanceTimersByTime(30000); await Promise.resolve(); }); - - expect(api.getJobs).toHaveBeenCalledTimes(pausedBaselineCalls); - - act(() => { - result.current.setIsRefreshPaused(false); - }); - - const resumedBaselineCalls = vi.mocked(api.getJobs).mock.calls.length; - await act(async () => { - vi.advanceTimersByTime(10000); await Promise.resolve(); }); + expect(api.getJobs).toHaveBeenCalledTimes(2); + }); - expect(vi.mocked(api.getJobs).mock.calls.length).toBeGreaterThan( - resumedBaselineCalls, + it("triggers immediate revision checks on focus/online/visibility", async () => { + vi.useFakeTimers(); + vi.mocked(api.getJobs).mockResolvedValue( + makeResponse("initial", "rev-initial") as any, ); + + renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + vi.mocked(api.getJobsRevision).mockClear(); + + act(() => { + window.dispatchEvent(new Event("focus")); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(1); + + act(() => { + window.dispatchEvent(new Event("online")); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(2); + + Object.defineProperty(document, "visibilityState", { + configurable: true, + value: "hidden", + }); + act(() => { + document.dispatchEvent(new Event("visibilitychange")); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(2); + + Object.defineProperty(document, "visibilityState", { + configurable: true, + value: "visible", + }); + act(() => { + document.dispatchEvent(new Event("visibilitychange")); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(3); + }); + + it("suppresses interval checks while tab is hidden", async () => { + vi.useFakeTimers(); + vi.mocked(api.getJobs).mockResolvedValue( + makeResponse("initial", "rev-initial") as any, + ); + + Object.defineProperty(document, "visibilityState", { + configurable: true, + value: "hidden", + }); + + renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + vi.mocked(api.getJobsRevision).mockClear(); + + await act(async () => { + vi.advanceTimersByTime(30000); + await Promise.resolve(); + }); + expect(api.getJobsRevision).not.toHaveBeenCalled(); + + Object.defineProperty(document, "visibilityState", { + configurable: true, + value: "visible", + }); + act(() => { + document.dispatchEvent(new Event("visibilitychange")); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(1); + }); + + it("throttles revision checks while pipeline SSE is active", async () => { + vi.useFakeTimers(); + renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + vi.mocked(api.getJobsRevision).mockClear(); + + const sse = MockEventSource.instances[0]; + expect(sse).toBeTruthy(); + + act(() => { + sse.emitOpen(); + sse.emitMessage({ step: "crawling" }); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(1); + + act(() => { + sse.emitMessage({ step: "crawling" }); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(1); + + await act(async () => { + vi.advanceTimersByTime(2500); + await Promise.resolve(); + }); + + act(() => { + sse.emitMessage({ step: "crawling" }); + }); + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobsRevision).toHaveBeenCalledTimes(2); + }); + + it("forces a jobs reload on terminal pipeline SSE step", async () => { + vi.useFakeTimers(); + vi.mocked(api.getJobs) + .mockResolvedValueOnce(makeResponse("initial", "rev-initial") as any) + .mockResolvedValueOnce( + makeResponse("after-terminal", "rev-terminal") as any, + ); + + renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobs).toHaveBeenCalledTimes(1); + + const sse = MockEventSource.instances[0]; + act(() => { + sse.emitOpen(); + sse.emitMessage({ + step: "completed", + startedAt: "2026-01-01T00:00:00.000Z", + completedAt: "2026-01-01T00:05:00.000Z", + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(api.getJobs).toHaveBeenCalledTimes(2); + }); + + it("falls back to polling pipeline status when SSE disconnects", async () => { + vi.useFakeTimers(); + renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + vi.mocked(api.getPipelineStatus).mockClear(); + + const sse = MockEventSource.instances[0]; + act(() => { + sse.emitOpen(); + sse.emitError(); + }); + + await act(async () => { + vi.advanceTimersByTime(30000); + await Promise.resolve(); + }); + + expect(api.getPipelineStatus).toHaveBeenCalledTimes(1); + }); + + it("runs a safety full refresh every 10 minutes when visible", async () => { + vi.useFakeTimers(); + vi.mocked(api.getJobs).mockResolvedValue( + makeResponse("steady", "rev-steady") as any, + ); + vi.mocked(api.getJobsRevision).mockResolvedValue({ + revision: "rev-steady", + latestUpdatedAt: "2026-01-01T00:00:00.000Z", + total: 1, + statusFilter: null, + } as any); + + renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + expect(api.getJobs).toHaveBeenCalledTimes(1); + + await act(async () => { + vi.advanceTimersByTime(600000); + await Promise.resolve(); + }); + expect(api.getJobs).toHaveBeenCalledTimes(2); + + Object.defineProperty(document, "visibilityState", { + configurable: true, + value: "hidden", + }); + await act(async () => { + vi.advanceTimersByTime(600000); + await Promise.resolve(); + }); + expect(api.getJobs).toHaveBeenCalledTimes(2); + }); + + it("closes pipeline SSE connection on unmount", async () => { + const { unmount } = renderHook(() => useOrchestratorData(null)); + + await act(async () => { + await Promise.resolve(); + }); + + const sse = MockEventSource.instances[0]; + expect(sse.close).not.toHaveBeenCalled(); + + unmount(); + + expect(sse.close).toHaveBeenCalledTimes(1); + }); + + it("loads full selected job details on demand", async () => { + vi.mocked(api.getJobs).mockResolvedValue({ + jobs: [ + { + id: "job-1", + title: "Role", + employer: "Acme", + source: "manual", + jobUrl: "https://example.com/job-1", + applicationLink: null, + datePosted: null, + deadline: null, + salary: null, + location: null, + status: "discovered", + suitabilityScore: null, + sponsorMatchScore: null, + jobType: null, + jobFunction: null, + salaryMinAmount: null, + salaryMaxAmount: null, + salaryCurrency: null, + discoveredAt: "2026-01-01T00:00:00.000Z", + appliedAt: null, + updatedAt: "2026-01-01T00:00:00.000Z", + }, + ], + total: 1, + byStatus: { + discovered: 1, + processing: 0, + ready: 0, + applied: 0, + skipped: 0, + expired: 0, + }, + revision: "rev-job-1", + } as any); + vi.mocked(api.getJob).mockResolvedValue({ + id: "job-1", + title: "Role", + employer: "Acme", + status: "discovered", + updatedAt: "2026-01-01T00:00:00.000Z", + } as any); + + const { result } = renderHook(() => useOrchestratorData("job-1")); + + await waitFor(() => { + expect(api.getJobs).toHaveBeenCalledWith({ view: "list" }); + }); + + await waitFor(() => { + expect(api.getJob).toHaveBeenCalledWith("job-1"); + expect((result.current.selectedJob as any)?.id).toBe("job-1"); + }); }); }); diff --git a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts index 964501f..bba8e9c 100644 --- a/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts +++ b/orchestrator/src/client/pages/orchestrator/useOrchestratorData.ts @@ -1,4 +1,4 @@ -import type { Job, JobStatus } from "@shared/types"; +import type { Job, JobListItem, JobStatus } from "@shared/types"; import { useCallback, useEffect, useRef, useState } from "react"; import { toast } from "sonner"; import * as api from "../../api"; @@ -12,26 +12,120 @@ const initialStats: Record = { expired: 0, }; -export const useOrchestratorData = () => { - const [jobs, setJobs] = useState([]); +const isDocumentVisible = () => + typeof document === "undefined" || document.visibilityState === "visible"; + +type PipelineProgressStep = + | "idle" + | "crawling" + | "importing" + | "scoring" + | "processing" + | "completed" + | "cancelled" + | "failed"; + +type PipelineProgressEvent = { + step: PipelineProgressStep; + startedAt?: string; + completedAt?: string; + error?: string; +}; + +type PipelineTerminalStatus = "completed" | "cancelled" | "failed"; + +type PipelineTerminalEvent = { + status: PipelineTerminalStatus; + errorMessage: string | null; + token: number; +}; + +const ACTIVE_PIPELINE_STEPS: ReadonlySet = new Set([ + "crawling", + "importing", + "scoring", + "processing", +]); + +const TERMINAL_PIPELINE_STEPS: ReadonlySet = new Set([ + "completed", + "cancelled", + "failed", +]); + +export const useOrchestratorData = (selectedJobId: string | null) => { + const [jobListItems, setJobListItems] = useState([]); + const [selectedJob, setSelectedJob] = useState(null); const [stats, setStats] = useState>(initialStats); const [isLoading, setIsLoading] = useState(true); const [isPipelineRunning, setIsPipelineRunning] = useState(false); + const [isPipelineSseConnected, setIsPipelineSseConnected] = useState(false); + const [pipelineTerminalEvent, setPipelineTerminalEvent] = + useState(null); const [isRefreshPaused, setIsRefreshPaused] = useState(false); const requestSeqRef = useRef(0); const latestAppliedSeqRef = useRef(0); const pendingLoadCountRef = useRef(0); + const selectedJobRequestSeqRef = useRef(0); + const selectedJobCacheRef = useRef>(new Map()); + const lastRevisionRef = useRef(null); + const lastSseRefreshAtRef = useRef(0); + const lastTerminalSignatureRef = useRef(null); + const lastTerminalNotificationKeyRef = useRef(null); + const terminalEventTokenRef = useRef(0); + + const publishPipelineTerminal = useCallback( + ( + status: PipelineTerminalStatus, + errorMessage: string | null, + dedupeKey: string, + ) => { + if (dedupeKey === lastTerminalNotificationKeyRef.current) return; + lastTerminalNotificationKeyRef.current = dedupeKey; + terminalEventTokenRef.current += 1; + setPipelineTerminalEvent({ + status, + errorMessage, + token: terminalEventTokenRef.current, + }); + }, + [], + ); + + const loadSelectedJob = useCallback( + async (jobId: string) => { + const seq = ++selectedJobRequestSeqRef.current; + try { + const fullJob = await api.getJob(jobId); + selectedJobCacheRef.current.set(jobId, fullJob); + if ( + selectedJobId === jobId && + seq === selectedJobRequestSeqRef.current + ) { + setSelectedJob(fullJob); + } + } catch (error) { + const message = + error instanceof Error + ? error.message + : "Failed to load selected job details"; + toast.error(message); + } + }, + [selectedJobId], + ); const loadJobs = useCallback(async () => { const seq = ++requestSeqRef.current; pendingLoadCountRef.current += 1; try { setIsLoading(true); - const data = await api.getJobs(); + const data = await api.getJobs({ view: "list" }); if (seq >= latestAppliedSeqRef.current) { latestAppliedSeqRef.current = seq; - setJobs(data.jobs); + setJobListItems(data.jobs); setStats(data.byStatus); + lastRevisionRef.current = data.revision; } } catch (error) { const message = @@ -52,33 +146,199 @@ export const useOrchestratorData = () => { try { const status = await api.getPipelineStatus(); setIsPipelineRunning(status.isRunning); + const terminalStatus = status.lastRun?.status; + if ( + status.isRunning || + !terminalStatus || + !TERMINAL_PIPELINE_STEPS.has(terminalStatus as PipelineProgressStep) + ) { + return; + } + publishPipelineTerminal( + terminalStatus as PipelineTerminalStatus, + status.lastRun?.errorMessage ?? null, + `status:${status.lastRun?.id ?? "unknown"}:${terminalStatus}:${status.lastRun?.completedAt ?? ""}`, + ); } catch { // Ignore errors } - }, []); + }, [publishPipelineTerminal]); + + const checkForJobChanges = useCallback(async () => { + if (isRefreshPaused || !isDocumentVisible()) return; + try { + const revision = await api.getJobsRevision(); + const previousRevision = lastRevisionRef.current; + if (previousRevision === null) { + lastRevisionRef.current = revision.revision; + return; + } + if (revision.revision !== previousRevision) { + await loadJobs(); + } + } catch { + // Ignore errors + } + }, [isRefreshPaused, loadJobs]); useEffect(() => { - loadJobs(); - checkPipelineStatus(); + void loadJobs(); + void checkPipelineStatus(); + }, [checkPipelineStatus, loadJobs]); + useEffect(() => { const interval = setInterval(() => { - if (isRefreshPaused) return; - loadJobs(); - checkPipelineStatus(); - }, 10000); + if (!isDocumentVisible() || isRefreshPaused) return; + void checkForJobChanges(); + }, 30000); return () => clearInterval(interval); - }, [loadJobs, checkPipelineStatus, isRefreshPaused]); + }, [checkForJobChanges, isRefreshPaused]); + + useEffect(() => { + const interval = setInterval(() => { + if (!isDocumentVisible() || isRefreshPaused) return; + void loadJobs(); + }, 600000); + + return () => clearInterval(interval); + }, [isRefreshPaused, loadJobs]); + + useEffect(() => { + if (typeof window === "undefined") return; + + const refreshFromVisibilitySignal = () => { + if (!isDocumentVisible() || isRefreshPaused) return; + void checkForJobChanges(); + }; + + const onVisibilityChange = () => { + if (!isDocumentVisible()) return; + refreshFromVisibilitySignal(); + }; + + window.addEventListener("focus", refreshFromVisibilitySignal); + window.addEventListener("online", refreshFromVisibilitySignal); + document.addEventListener("visibilitychange", onVisibilityChange); + + return () => { + window.removeEventListener("focus", refreshFromVisibilitySignal); + window.removeEventListener("online", refreshFromVisibilitySignal); + document.removeEventListener("visibilitychange", onVisibilityChange); + }; + }, [checkForJobChanges, isRefreshPaused]); + + useEffect(() => { + if (typeof EventSource === "undefined") return; + + const eventSource = new EventSource("/api/pipeline/progress"); + + eventSource.onopen = () => { + setIsPipelineSseConnected(true); + }; + + eventSource.onmessage = (event) => { + let payload: unknown; + try { + payload = JSON.parse(event.data); + } catch { + return; + } + + if (!payload || typeof payload !== "object") return; + const step = (payload as { step?: unknown }).step; + if (typeof step !== "string") return; + if ( + !ACTIVE_PIPELINE_STEPS.has(step as PipelineProgressStep) && + !TERMINAL_PIPELINE_STEPS.has(step as PipelineProgressStep) && + step !== "idle" + ) { + return; + } + + const typedStep = step as PipelineProgressStep; + setIsPipelineRunning(ACTIVE_PIPELINE_STEPS.has(typedStep)); + + if (ACTIVE_PIPELINE_STEPS.has(typedStep)) { + const now = Date.now(); + if (now - lastSseRefreshAtRef.current >= 2500) { + lastSseRefreshAtRef.current = now; + void checkForJobChanges(); + } + return; + } + + if (TERMINAL_PIPELINE_STEPS.has(typedStep)) { + const eventPayload = payload as PipelineProgressEvent; + const terminalSignature = `${typedStep}:${eventPayload.startedAt ?? ""}:${ + eventPayload.completedAt ?? "" + }`; + if (terminalSignature === lastTerminalSignatureRef.current) return; + lastTerminalSignatureRef.current = terminalSignature; + publishPipelineTerminal( + typedStep as PipelineTerminalStatus, + eventPayload.error ?? null, + `sse:${terminalSignature}`, + ); + void loadJobs(); + } + }; + + eventSource.onerror = () => { + setIsPipelineSseConnected(false); + }; + + return () => { + eventSource.close(); + }; + }, [checkForJobChanges, loadJobs, publishPipelineTerminal]); + + useEffect(() => { + if (isPipelineSseConnected) return; + + const interval = setInterval(() => { + if (!isDocumentVisible() || isRefreshPaused) return; + void checkPipelineStatus(); + }, 30000); + + return () => clearInterval(interval); + }, [checkPipelineStatus, isPipelineSseConnected, isRefreshPaused]); + + useEffect(() => { + if (!selectedJobId) { + setSelectedJob(null); + return; + } + + const selectedJobListItem = jobListItems.find( + (job) => job.id === selectedJobId, + ); + if (!selectedJobListItem) { + setSelectedJob(null); + return; + } + + const cached = selectedJobCacheRef.current.get(selectedJobId); + if (cached && cached.updatedAt === selectedJobListItem.updatedAt) { + setSelectedJob(cached); + return; + } + + void loadSelectedJob(selectedJobId); + }, [jobListItems, loadSelectedJob, selectedJobId]); return { - jobs, + jobs: jobListItems, + selectedJob, stats, isLoading, isPipelineRunning, setIsPipelineRunning, + pipelineTerminalEvent, isRefreshPaused, setIsRefreshPaused, loadJobs, + checkForJobChanges, checkPipelineStatus, }; }; diff --git a/orchestrator/src/client/pages/orchestrator/utils.ts b/orchestrator/src/client/pages/orchestrator/utils.ts index 6749d03..ea0a756 100644 --- a/orchestrator/src/client/pages/orchestrator/utils.ts +++ b/orchestrator/src/client/pages/orchestrator/utils.ts @@ -1,4 +1,4 @@ -import type { AppSettings, Job, JobSource } from "@shared/types"; +import type { AppSettings, JobListItem, JobSource } from "@shared/types"; import type { FilterTab, JobSort } from "./constants"; import { orderedFilterSources, orderedSources } from "./constants"; @@ -13,7 +13,7 @@ const compareString = (a: string, b: string) => const compareNumber = (a: number, b: number) => a - b; export const parseSalaryBounds = ( - job: Job, + job: JobListItem, ): { min: number; max: number } | null => { if ( typeof job.salaryMinAmount === "number" && @@ -52,7 +52,7 @@ export const parseSalaryBounds = ( return { min: Math.min(...values), max: Math.max(...values) }; }; -export const compareJobs = (a: Job, b: Job, sort: JobSort) => { +export const compareJobs = (a: JobListItem, b: JobListItem, sort: JobSort) => { let value = 0; switch (sort.key) { @@ -110,7 +110,7 @@ export const compareJobs = (a: Job, b: Job, sort: JobSort) => { return a.id.localeCompare(b.id); }; -export const jobMatchesQuery = (job: Job, query: string) => { +export const jobMatchesQuery = (job: JobListItem, query: string) => { const normalized = query.trim().toLowerCase(); if (!normalized) return true; const haystack = [ @@ -128,7 +128,9 @@ export const jobMatchesQuery = (job: Job, query: string) => { return haystack.includes(normalized); }; -export const getJobCounts = (jobs: Job[]): Record => { +export const getJobCounts = ( + jobs: JobListItem[], +): Record => { const byTab: Record = { ready: 0, discovered: 0, @@ -146,7 +148,7 @@ export const getJobCounts = (jobs: Job[]): Record => { return byTab; }; -export const getSourcesWithJobs = (jobs: Job[]): JobSource[] => { +export const getSourcesWithJobs = (jobs: JobListItem[]): JobSource[] => { const seen = new Set(); for (const job of jobs) { seen.add(job.source); diff --git a/orchestrator/src/server/api/routes/jobs.test.ts b/orchestrator/src/server/api/routes/jobs.test.ts index c757d02..03eb7e3 100644 --- a/orchestrator/src/server/api/routes/jobs.test.ts +++ b/orchestrator/src/server/api/routes/jobs.test.ts @@ -31,10 +31,100 @@ describe.sequential("Jobs API routes", () => { expect(listBody.ok).toBe(true); expect(listBody.data.total).toBe(1); expect(listBody.data.jobs[0].id).toBe(job.id); + expect(typeof listBody.data.revision).toBe("string"); const filteredRes = await fetch(`${baseUrl}/api/jobs?status=skipped`); const filteredBody = await filteredRes.json(); expect(filteredBody.data.total).toBe(0); + expect(typeof filteredBody.data.revision).toBe("string"); + }); + + it("supports lightweight and full jobs list views", async () => { + const { createJob } = await import("../../repositories/jobs"); + await createJob({ + source: "manual", + title: "List View Role", + employer: "Acme", + jobUrl: "https://example.com/job/list-view", + jobDescription: "Heavy description that should not be in list mode", + }); + + const listRes = await fetch(`${baseUrl}/api/jobs?view=list`); + const listBody = await listRes.json(); + expect(listRes.status).toBe(200); + expect(listBody.ok).toBe(true); + expect(typeof listBody.meta.requestId).toBe("string"); + expect(listBody.data.jobs[0].id).toBeTruthy(); + expect(listBody.data.jobs[0].title).toBe("List View Role"); + expect(listBody.data.jobs[0]).not.toHaveProperty("jobDescription"); + expect(typeof listBody.data.revision).toBe("string"); + + const fullRes = await fetch(`${baseUrl}/api/jobs?view=full`); + const fullBody = await fullRes.json(); + expect(fullRes.status).toBe(200); + expect(fullBody.ok).toBe(true); + expect(fullBody.data.jobs[0].title).toBe("List View Role"); + expect(fullBody.data.jobs[0]).toHaveProperty("jobDescription"); + expect(typeof fullBody.data.revision).toBe("string"); + + const defaultRes = await fetch(`${baseUrl}/api/jobs`); + const defaultBody = await defaultRes.json(); + expect(defaultRes.status).toBe(200); + expect(defaultBody.ok).toBe(true); + expect(defaultBody.data.jobs[0]).not.toHaveProperty("jobDescription"); + expect(typeof defaultBody.data.revision).toBe("string"); + }); + + it("returns jobs revision and supports status filtering", async () => { + const { createJob, updateJob } = await import("../../repositories/jobs"); + const readyJob = await createJob({ + source: "manual", + title: "Ready Role", + employer: "Acme", + jobUrl: "https://example.com/job/revision-ready", + jobDescription: "Ready description", + }); + const appliedJob = await createJob({ + source: "manual", + title: "Applied Role", + employer: "Beta", + jobUrl: "https://example.com/job/revision-applied", + jobDescription: "Applied description", + }); + await updateJob(readyJob.id, { status: "ready" }); + await updateJob(appliedJob.id, { status: "applied" }); + + const allRes = await fetch(`${baseUrl}/api/jobs/revision`); + const allBody = await allRes.json(); + + expect(allRes.status).toBe(200); + expect(allBody.ok).toBe(true); + expect(typeof allBody.meta.requestId).toBe("string"); + expect(typeof allBody.data.revision).toBe("string"); + expect(allBody.data.total).toBe(2); + expect(allBody.data.latestUpdatedAt).toBeTruthy(); + expect(allBody.data.statusFilter).toBeNull(); + + const filteredRes = await fetch( + `${baseUrl}/api/jobs/revision?status=applied,ready`, + ); + const filteredBody = await filteredRes.json(); + + expect(filteredRes.status).toBe(200); + expect(filteredBody.ok).toBe(true); + expect(filteredBody.data.total).toBe(2); + expect(filteredBody.data.statusFilter).toBe("applied,ready"); + expect(typeof filteredBody.data.revision).toBe("string"); + }); + + it("rejects invalid jobs list view query", async () => { + const res = await fetch(`${baseUrl}/api/jobs?view=compact`); + const body = await res.json(); + + expect(res.status).toBe(400); + expect(body.ok).toBe(false); + expect(body.error.code).toBe("INVALID_REQUEST"); + expect(typeof body.meta.requestId).toBe("string"); }); it("returns 404 for missing jobs", async () => { diff --git a/orchestrator/src/server/api/routes/jobs.ts b/orchestrator/src/server/api/routes/jobs.ts index ac69623..b20c344 100644 --- a/orchestrator/src/server/api/routes/jobs.ts +++ b/orchestrator/src/server/api/routes/jobs.ts @@ -4,13 +4,14 @@ import { sanitizeWebhookPayload } from "@infra/sanitize"; import { APPLICATION_OUTCOMES, APPLICATION_STAGES, - type ApiResponse, type BulkJobAction, type BulkJobActionResponse, type BulkJobActionResult, type Job, + type JobListItem, type JobStatus, type JobsListResponse, + type JobsRevisionResponse, } from "@shared/types"; import { type Request, type Response, Router } from "express"; import { z } from "zod"; @@ -181,11 +182,27 @@ const bulkActionRequestSchema = z.object({ jobIds: z.array(z.string().min(1)).min(1).max(100), }); +const listJobsQuerySchema = z.object({ + status: z.string().optional(), + view: z.enum(["full", "list"]).optional(), +}); + +const jobsRevisionQuerySchema = z.object({ + status: z.string().optional(), +}); + const SKIPPABLE_STATUSES: ReadonlySet = new Set([ "discovered", "ready", ]); +function parseStatusFilter(statusFilter?: string): JobStatus[] | undefined { + const parsed = statusFilter?.split(",").filter(Boolean) as + | JobStatus[] + | undefined; + return parsed && parsed.length > 0 ? parsed : undefined; +} + function mapErrorForResult(error: unknown): { code: string; message: string; @@ -339,27 +356,102 @@ async function executeBulkActionForJob( */ jobsRouter.get("/", async (req: Request, res: Response) => { try { - const statusFilter = req.query.status as string | undefined; - const statuses = statusFilter?.split(",").filter(Boolean) as - | JobStatus[] - | undefined; + const parsedQuery = listJobsQuerySchema.safeParse(req.query); + if (!parsedQuery.success) { + return fail( + res, + badRequest( + "Invalid jobs list query parameters", + parsedQuery.error.flatten(), + ), + ); + } - const jobs = await jobsRepo.getAllJobs(statuses); + const statusFilter = parsedQuery.data.status; + const statuses = parseStatusFilter(statusFilter); + const view = parsedQuery.data.view ?? "list"; + + const jobs: Array = + view === "list" + ? await jobsRepo.getJobListItems(statuses) + : await jobsRepo.getAllJobs(statuses); const stats = await jobsRepo.getJobStats(); + const revision = await jobsRepo.getJobsRevision(statuses); - const response: ApiResponse = { - ok: true, - data: { - jobs, - total: jobs.length, - byStatus: stats, - }, + const response: JobsListResponse = { + jobs, + total: jobs.length, + byStatus: stats, + revision: revision.revision, }; - res.json(response); + logger.info("Jobs list fetched", { + route: "GET /api/jobs", + view, + statusFilter: statusFilter ?? null, + revision: revision.revision, + returnedCount: jobs.length, + }); + + ok(res, response); } catch (error) { - const message = error instanceof Error ? error.message : "Unknown error"; - res.status(500).json({ success: false, error: message }); + const err = + error instanceof AppError + ? error + : new AppError({ + status: 500, + code: "INTERNAL_ERROR", + message: error instanceof Error ? error.message : "Unknown error", + }); + fail(res, err); + } +}); + +/** + * GET /api/jobs/revision - Get jobs list revision for lightweight change detection + * Query params: status (comma-separated list of statuses to filter) + */ +jobsRouter.get("/revision", async (req: Request, res: Response) => { + try { + const parsedQuery = jobsRevisionQuerySchema.safeParse(req.query); + if (!parsedQuery.success) { + return fail( + res, + badRequest( + "Invalid jobs revision query parameters", + parsedQuery.error.flatten(), + ), + ); + } + + const statuses = parseStatusFilter(parsedQuery.data.status); + const revision = await jobsRepo.getJobsRevision(statuses); + + const response: JobsRevisionResponse = { + revision: revision.revision, + latestUpdatedAt: revision.latestUpdatedAt, + total: revision.total, + statusFilter: revision.statusFilter, + }; + + logger.info("Jobs revision fetched", { + route: "GET /api/jobs/revision", + statusFilter: revision.statusFilter, + revision: revision.revision, + total: revision.total, + }); + + ok(res, response); + } catch (error) { + const err = + error instanceof AppError + ? error + : new AppError({ + status: 500, + code: "INTERNAL_ERROR", + message: error instanceof Error ? error.message : "Unknown error", + }); + fail(res, err); } }); diff --git a/orchestrator/src/server/db/migrate.ts b/orchestrator/src/server/db/migrate.ts index 2b02378..b39af59 100644 --- a/orchestrator/src/server/db/migrate.ts +++ b/orchestrator/src/server/db/migrate.ts @@ -197,6 +197,7 @@ const migrations = [ `CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)`, `CREATE INDEX IF NOT EXISTS idx_jobs_discovered_at ON jobs(discovered_at)`, + `CREATE INDEX IF NOT EXISTS idx_jobs_status_discovered_at ON jobs(status, discovered_at)`, `CREATE INDEX IF NOT EXISTS idx_pipeline_runs_started_at ON pipeline_runs(started_at)`, `CREATE INDEX IF NOT EXISTS idx_stage_events_application_id ON stage_events(application_id)`, `CREATE INDEX IF NOT EXISTS idx_stage_events_occurred_at ON stage_events(occurred_at)`, @@ -240,6 +241,16 @@ for (const migration of migrations) { continue; } + // Optional performance-only migration: if this fails we should still boot + // existing databases and continue without the index. + const isOptionalOptimizationMigration = migration.includes( + "idx_jobs_status_discovered_at", + ); + if (isOptionalOptimizationMigration) { + console.warn("⚠️ Optional migration skipped:", message); + continue; + } + console.error("❌ Migration failed:", error); process.exit(1); } diff --git a/orchestrator/src/server/repositories/jobs.ts b/orchestrator/src/server/repositories/jobs.ts index a3a69fc..0444bd9 100644 --- a/orchestrator/src/server/repositories/jobs.ts +++ b/orchestrator/src/server/repositories/jobs.ts @@ -6,7 +6,9 @@ import { randomUUID } from "node:crypto"; import type { CreateJobInput, Job, + JobListItem, JobStatus, + JobsRevisionResponse, UpdateJobInput, } from "@shared/types"; import { and, desc, eq, inArray, isNull, lt, ne, sql } from "drizzle-orm"; @@ -14,6 +16,11 @@ import { db, schema } from "../db/index"; const { jobs } = schema; +function normalizeStatusFilter(statuses?: JobStatus[]): string | null { + if (!statuses || statuses.length === 0) return null; + return Array.from(new Set(statuses)).sort().join(","); +} + /** * Get all jobs, optionally filtered by status. */ @@ -31,6 +38,87 @@ export async function getAllJobs(statuses?: JobStatus[]): Promise { return rows.map(mapRowToJob); } +/** + * Get lightweight list items for jobs, optionally filtered by status. + */ +export async function getJobListItems( + statuses?: JobStatus[], +): Promise { + const selection = { + id: jobs.id, + source: jobs.source, + title: jobs.title, + employer: jobs.employer, + jobUrl: jobs.jobUrl, + applicationLink: jobs.applicationLink, + datePosted: jobs.datePosted, + deadline: jobs.deadline, + salary: jobs.salary, + location: jobs.location, + status: jobs.status, + suitabilityScore: jobs.suitabilityScore, + sponsorMatchScore: jobs.sponsorMatchScore, + jobType: jobs.jobType, + jobFunction: jobs.jobFunction, + salaryMinAmount: jobs.salaryMinAmount, + salaryMaxAmount: jobs.salaryMaxAmount, + salaryCurrency: jobs.salaryCurrency, + discoveredAt: jobs.discoveredAt, + appliedAt: jobs.appliedAt, + updatedAt: jobs.updatedAt, + } as const; + + const query = + statuses && statuses.length > 0 + ? db + .select(selection) + .from(jobs) + .where(inArray(jobs.status, statuses)) + .orderBy(desc(jobs.discoveredAt)) + : db.select(selection).from(jobs).orderBy(desc(jobs.discoveredAt)); + + const rows = await query; + return rows.map((row) => ({ + ...row, + source: row.source as JobListItem["source"], + status: row.status as JobStatus, + })); +} + +/** + * Get a lightweight revision token for jobs list invalidation. + */ +export async function getJobsRevision( + statuses?: JobStatus[], +): Promise { + const statusFilter = normalizeStatusFilter(statuses); + const whereClause = + statuses && statuses.length > 0 + ? inArray(jobs.status, statuses) + : undefined; + + const baseQuery = db + .select({ + latestUpdatedAt: sql`max(${jobs.updatedAt})`, + total: sql`count(*)`, + }) + .from(jobs); + const [row] = whereClause + ? await baseQuery.where(whereClause) + : await baseQuery; + + const latestUpdatedAt = row?.latestUpdatedAt ?? null; + const total = row?.total ?? 0; + const revision = `${latestUpdatedAt ?? "none"}:${total}:${statusFilter ?? "all"}`; + + return { + revision, + latestUpdatedAt, + total, + statusFilter, + }; +} + /** * Get a single job by ID. */ diff --git a/shared/src/types.ts b/shared/src/types.ts index 762bcad..10e09ae 100644 --- a/shared/src/types.ts +++ b/shared/src/types.ts @@ -197,6 +197,31 @@ export interface Job { updatedAt: string; } +export type JobListItem = Pick< + Job, + | "id" + | "source" + | "title" + | "employer" + | "jobUrl" + | "applicationLink" + | "datePosted" + | "deadline" + | "salary" + | "location" + | "status" + | "suitabilityScore" + | "sponsorMatchScore" + | "jobType" + | "jobFunction" + | "salaryMinAmount" + | "salaryMaxAmount" + | "salaryCurrency" + | "discoveredAt" + | "appliedAt" + | "updatedAt" +>; + export interface CreateJobInput { source: JobSource; title: string; @@ -333,10 +358,18 @@ export type ApiResponse = meta: ApiMeta; }; -export interface JobsListResponse { - jobs: Job[]; +export interface JobsListResponse { + jobs: TJob[]; total: number; byStatus: Record; + revision: string; +} + +export interface JobsRevisionResponse { + revision: string; + latestUpdatedAt: string | null; + total: number; + statusFilter: string | null; } export type BulkJobAction = "skip" | "move_to_ready" | "rescore";