/**
 * Job repository - data access layer for jobs.
 *
 * Every query is scoped to an owning search profile (`ownerProfileId`).
 * Public functions default the owner to the request-context owner, falling
 * back to `DEFAULT_JOB_OWNER_PROFILE_ID` when no context owner is set.
 */

import { randomUUID } from "node:crypto";
import { getJobOwnerProfileId } from "@infra/request-context";
import { DEFAULT_JOB_OWNER_PROFILE_ID } from "@server/infra/job-owner-context";
import { buildJobContentFingerprint } from "@shared/job-fingerprint";
import { canonicalizeJobUrl } from "@shared/job-url-canonical";
import type {
  CreateJobInput,
  Job,
  JobListItem,
  JobStatus,
  JobsRevisionResponse,
  UpdateJobInput,
} from "@shared/types";
import { and, desc, eq, inArray, isNull, lt, ne, sql } from "drizzle-orm";
import { db, schema } from "../db/index";

const { jobs } = schema;

type JobRow = typeof jobs.$inferSelect;

/** In-memory dedup indexes for one owner's existing jobs. */
type JobDedupIndexes = {
  existingCanonicalSet: Set<string>;
  existingSourceJobKeySet: Set<string>;
  existingContentFingerprintSet: Set<string>;
};

/** One coalesced group of duplicate inputs inside a single batch. */
type BatchBucket = {
  input: CreateJobInput;
  count: number;
  fingerprint: string | null;
};

/**
 * Owner used when a caller does not pass one explicitly: the request-context
 * owner if present, otherwise the default profile.
 */
function defaultOwnerProfileId(): string {
  return getJobOwnerProfileId() ?? DEFAULT_JOB_OWNER_PROFILE_ID;
}

/**
 * Return `input` with its `jobUrl` canonicalized so URL-based dedup compares
 * like with like. Returns the same object when the URL is already canonical.
 */
function normalizeCreateJobInputForDedup(
  input: CreateJobInput,
): CreateJobInput {
  const jobUrl = canonicalizeJobUrl(input.jobUrl);
  if (jobUrl === input.jobUrl) return input;
  return { ...input, jobUrl };
}

/** Composite (source, sourceJobId) key; NUL-separated so the parts cannot run together. */
function sourceJobKey(source: string, sourceJobId: string): string {
  return `${source}\0${sourceJobId}`;
}

/** Owner for a new job: a non-blank `input.ownerProfileId`, else the default owner. */
function resolveOwnerForCreate(input: CreateJobInput): string {
  const fromInput = input.ownerProfileId?.trim();
  if (fromInput) return fromInput;
  return defaultOwnerProfileId();
}

/**
 * Fingerprint used for cross-source dedup of an existing row: the persisted
 * value when non-blank, otherwise recomputed from (employer, title) so legacy
 * rows participate in dedup until they're rewritten.
 */
function effectiveContentFingerprint(
  row: Pick<JobRow, "contentFingerprint" | "employer" | "title">,
): string | null {
  const stored = row.contentFingerprint?.trim();
  if (stored) return stored;
  return buildJobContentFingerprint({
    employer: row.employer,
    title: row.title,
  });
}

/**
 * Load dedup indexes for every job owned by `ownerProfileId`.
 *
 * Source-job keys are built from the trimmed `sourceJobId`, matching how
 * callers trim the incoming id before looking it up.
 */
async function loadJobDedupIndexes(
  ownerProfileId: string,
): Promise<JobDedupIndexes> {
  const rows = await db
    .select({
      jobUrl: jobs.jobUrl,
      source: jobs.source,
      sourceJobId: jobs.sourceJobId,
      contentFingerprint: jobs.contentFingerprint,
      employer: jobs.employer,
      title: jobs.title,
    })
    .from(jobs)
    .where(eq(jobs.ownerProfileId, ownerProfileId));

  const existingCanonicalSet = new Set<string>();
  const existingSourceJobKeySet = new Set<string>();
  const existingContentFingerprintSet = new Set<string>();

  for (const row of rows) {
    existingCanonicalSet.add(canonicalizeJobUrl(row.jobUrl));

    const sid = row.sourceJobId == null ? "" : String(row.sourceJobId).trim();
    if (sid) {
      existingSourceJobKeySet.add(sourceJobKey(row.source, sid));
    }

    const fingerprint = effectiveContentFingerprint(row);
    if (fingerprint) {
      existingContentFingerprintSet.add(fingerprint);
    }
  }

  return {
    existingCanonicalSet,
    existingSourceJobKeySet,
    existingContentFingerprintSet,
  };
}

/**
 * Find the owner's job whose URL canonicalizes to `canonical`.
 * Tries an exact match first, then scans rows so legacy rows stored with a
 * non-canonical URL are still found.
 */
async function findJobByCanonicalUrl(
  canonical: string,
  ownerProfileId: string,
): Promise<Job | null> {
  const [exact] = await db
    .select()
    .from(jobs)
    .where(
      and(eq(jobs.ownerProfileId, ownerProfileId), eq(jobs.jobUrl, canonical)),
    );
  if (exact) return mapRowToJob(exact);

  const allRows = await db
    .select()
    .from(jobs)
    .where(eq(jobs.ownerProfileId, ownerProfileId));
  for (const row of allRows) {
    if (canonicalizeJobUrl(row.jobUrl) === canonical) {
      return mapRowToJob(row);
    }
  }
  return null;
}

/**
 * Find the owner's job with content fingerprint `fingerprint`, using the same
 * stored-or-recomputed rule as `loadJobDedupIndexes`.
 */
async function findJobByContentFingerprint(
  fingerprint: string,
  ownerProfileId: string,
): Promise<Job | null> {
  // Fast path: stored fingerprint match.
  const [stored] = await db
    .select()
    .from(jobs)
    .where(
      and(
        eq(jobs.ownerProfileId, ownerProfileId),
        eq(jobs.contentFingerprint, fingerprint),
      ),
    )
    .limit(1);
  if (stored) return mapRowToJob(stored);

  // Fallback for legacy rows with a NULL or blank persisted fingerprint: scan
  // and recompute. Blank rows are included because the dedup index treats them
  // as legacy too; otherwise a detected duplicate could not be resolved here.
  const legacyRows = await db
    .select()
    .from(jobs)
    .where(
      and(
        eq(jobs.ownerProfileId, ownerProfileId),
        sql`coalesce(trim(${jobs.contentFingerprint}), '') = ''`,
      ),
    );
  for (const row of legacyRows) {
    if (effectiveContentFingerprint(row) === fingerprint) {
      return mapRowToJob(row);
    }
  }
  return null;
}

/** Find the owner's job by exact (source, sourceJobId). */
async function getJobBySourceAndExternalId(
  source: string,
  sourceJobId: string,
  ownerProfileId: string,
): Promise<Job | null> {
  const [row] = await db
    .select()
    .from(jobs)
    .where(
      and(
        eq(jobs.ownerProfileId, ownerProfileId),
        eq(jobs.source, source),
        eq(jobs.sourceJobId, sourceJobId),
      ),
    );
  return row ? mapRowToJob(row) : null;
}

/** Deduplicated, sorted, comma-joined status list; `null` means "all statuses". */
function normalizeStatusFilter(statuses?: JobStatus[]): string | null {
  if (!statuses || statuses.length === 0) return null;
  return Array.from(new Set(statuses)).sort().join(",");
}

/**
 * Get all jobs, optionally filtered by status, newest discovered first.
 */
export async function getAllJobs(
  statuses?: JobStatus[],
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Job[]> {
  const ownerClause = eq(jobs.ownerProfileId, ownerProfileId);
  const query =
    statuses && statuses.length > 0
      ? db
          .select()
          .from(jobs)
          .where(and(ownerClause, inArray(jobs.status, statuses)))
          .orderBy(desc(jobs.discoveredAt))
      : db
          .select()
          .from(jobs)
          .where(ownerClause)
          .orderBy(desc(jobs.discoveredAt));

  const rows = await query;
  return rows.map(mapRowToJob);
}

/**
 * Get lightweight list items for jobs, optionally filtered by status,
 * newest discovered first.
 */
export async function getJobListItems(
  statuses?: JobStatus[],
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<JobListItem[]> {
  const ownerClause = eq(jobs.ownerProfileId, ownerProfileId);
  const selection = {
    id: jobs.id,
    source: jobs.source,
    title: jobs.title,
    employer: jobs.employer,
    jobUrl: jobs.jobUrl,
    applicationLink: jobs.applicationLink,
    datePosted: jobs.datePosted,
    deadline: jobs.deadline,
    salary: jobs.salary,
    location: jobs.location,
    status: jobs.status,
    outcome: jobs.outcome,
    closedAt: jobs.closedAt,
    suitabilityScore: jobs.suitabilityScore,
    sponsorMatchScore: jobs.sponsorMatchScore,
    jobType: jobs.jobType,
    jobFunction: jobs.jobFunction,
    salaryMinAmount: jobs.salaryMinAmount,
    salaryMaxAmount: jobs.salaryMaxAmount,
    salaryCurrency: jobs.salaryCurrency,
    isRemote: jobs.isRemote,
    discoveredAt: jobs.discoveredAt,
    appliedAt: jobs.appliedAt,
    updatedAt: jobs.updatedAt,
  } as const;

  const query =
    statuses && statuses.length > 0
      ? db
          .select(selection)
          .from(jobs)
          .where(and(ownerClause, inArray(jobs.status, statuses)))
          .orderBy(desc(jobs.discoveredAt))
      : db
          .select(selection)
          .from(jobs)
          .where(ownerClause)
          .orderBy(desc(jobs.discoveredAt));

  const rows = await query;
  return rows.map((row) => ({
    ...row,
    source: row.source as JobListItem["source"],
    status: row.status as JobStatus,
  }));
}

/**
 * Get a lightweight revision token for jobs list invalidation.
 * The token combines the latest `updatedAt`, the row count and the
 * normalized status filter.
 */
export async function getJobsRevision(
  statuses?: JobStatus[],
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<JobsRevisionResponse> {
  const statusFilter = normalizeStatusFilter(statuses);
  const ownerClause = eq(jobs.ownerProfileId, ownerProfileId);
  const statusClause =
    statuses && statuses.length > 0
      ? inArray(jobs.status, statuses)
      : undefined;

  const baseQuery = db
    .select({
      latestUpdatedAt: sql<string | null>`max(${jobs.updatedAt})`,
      total: sql<number>`count(*)`,
    })
    .from(jobs);

  const [row] = statusClause
    ? await baseQuery.where(and(ownerClause, statusClause))
    : await baseQuery.where(ownerClause);

  const latestUpdatedAt = row?.latestUpdatedAt ?? null;
  const total = row?.total ?? 0;
  const revision = `${latestUpdatedAt ?? "none"}:${total}:${statusFilter ?? "all"}`;

  return {
    revision,
    latestUpdatedAt,
    total,
    statusFilter,
  };
}

/**
 * Get a single job by ID (scoped to the owning search profile).
 */
export async function getJobById(
  id: string,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Job | null> {
  const [row] = await db
    .select()
    .from(jobs)
    .where(and(eq(jobs.id, id), eq(jobs.ownerProfileId, ownerProfileId)));
  return row ? mapRowToJob(row) : null;
}

/**
 * Get `{ id, title, employer }` for the given job IDs owned by the profile.
 * Returns `[]` without querying when `jobIds` is empty.
 */
export async function listJobSummariesByIds(
  jobIds: string[],
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<
  Array<{
    id: string;
    title: string;
    employer: string;
  }>
> {
  if (jobIds.length === 0) return [];

  return db
    .select({
      id: jobs.id,
      title: jobs.title,
      employer: jobs.employer,
    })
    .from(jobs)
    .where(
      and(eq(jobs.ownerProfileId, ownerProfileId), inArray(jobs.id, jobIds)),
    );
}

/**
 * Get a job by its URL (for deduplication).
 * Matches canonical URL equivalence, including legacy rows stored with non-canonical URLs.
 */
export async function getJobByUrl(
  jobUrl: string,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Job | null> {
  return findJobByCanonicalUrl(canonicalizeJobUrl(jobUrl), ownerProfileId);
}

/**
 * Get all known canonical job URLs (for deduplication / crawler skip lists).
 */
export async function getAllJobUrls(
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<string[]> {
  const rows = await db
    .select({ jobUrl: jobs.jobUrl })
    .from(jobs)
    .where(eq(jobs.ownerProfileId, ownerProfileId));

  const canonicals = rows.map((r) => canonicalizeJobUrl(r.jobUrl));
  return Array.from(new Set(canonicals));
}

/**
 * Insert a new job with status "discovered" and return it as stored.
 *
 * @throws Error if the row cannot be read back after insertion; database
 *   errors (e.g. unique violations) propagate unchanged.
 */
async function insertJob(input: CreateJobInput): Promise<Job> {
  const id = randomUUID();
  const now = new Date().toISOString();
  const ownerProfileId = resolveOwnerForCreate(input);
  const contentFingerprint = buildJobContentFingerprint({
    employer: input.employer,
    title: input.title,
  });

  await db.insert(jobs).values({
    id,
    ownerProfileId,
    source: input.source,
    sourceJobId: input.sourceJobId ?? null,
    jobUrlDirect: input.jobUrlDirect ?? null,
    datePosted: input.datePosted ?? null,
    title: input.title,
    employer: input.employer,
    employerUrl: input.employerUrl ?? null,
    jobUrl: input.jobUrl,
    contentFingerprint,
    applicationLink: input.applicationLink ?? null,
    disciplines: input.disciplines ?? null,
    deadline: input.deadline ?? null,
    salary: input.salary ?? null,
    location: input.location ?? null,
    degreeRequired: input.degreeRequired ?? null,
    starting: input.starting ?? null,
    jobDescription: input.jobDescription ?? null,
    jobType: input.jobType ?? null,
    salarySource: input.salarySource ?? null,
    salaryInterval: input.salaryInterval ?? null,
    salaryMinAmount: input.salaryMinAmount ?? null,
    salaryMaxAmount: input.salaryMaxAmount ?? null,
    salaryCurrency: input.salaryCurrency ?? null,
    isRemote: input.isRemote ?? null,
    jobLevel: input.jobLevel ?? null,
    jobFunction: input.jobFunction ?? null,
    listingType: input.listingType ?? null,
    emails: input.emails ?? null,
    companyIndustry: input.companyIndustry ?? null,
    companyLogo: input.companyLogo ?? null,
    companyUrlDirect: input.companyUrlDirect ?? null,
    companyAddresses: input.companyAddresses ?? null,
    companyNumEmployees: input.companyNumEmployees ?? null,
    companyRevenue: input.companyRevenue ?? null,
    companyDescription: input.companyDescription ?? null,
    skills: input.skills ?? null,
    experienceRange: input.experienceRange ?? null,
    companyRating: input.companyRating ?? null,
    companyReviewsCount: input.companyReviewsCount ?? null,
    vacancyCount: input.vacancyCount ?? null,
    workFromHomeType: input.workFromHomeType ?? null,
    status: "discovered",
    discoveredAt: now,
    createdAt: now,
    updatedAt: now,
  });

  // Read back under the owner the row was written with; the request-context
  // default may differ from an explicit `input.ownerProfileId`.
  const job = await getJobById(id, ownerProfileId);
  if (!job) {
    throw new Error(`Failed to retrieve newly created job with ID ${id}`);
  }
  return job;
}

/**
 * Message patterns identifying a unique-constraint violation on the job URL.
 * SQLite reports column-backed unique indexes by column list, e.g.
 * "UNIQUE constraint failed: jobs.job_url" or, for the composite owner index,
 * "UNIQUE constraint failed: jobs.owner_profile_id, jobs.job_url".
 */
const JOB_URL_UNIQUE_VIOLATION_PATTERNS: readonly RegExp[] = [
  /UNIQUE constraint failed:.*\bjobs\.job_url/i,
  /UNIQUE constraint failed.*idx_jobs_owner_profile_job_url/i,
];

/**
 * True when `error` (or an error in its `cause` chain, in case a driver/ORM
 * layer wraps it) is a job-URL unique-constraint violation.
 */
function isJobUrlUniqueViolation(error: unknown): boolean {
  let current: unknown = error;
  // Bounded walk guards against cyclic cause chains.
  for (let depth = 0; depth < 5 && current instanceof Error; depth += 1) {
    const message = current.message;
    if (JOB_URL_UNIQUE_VIOLATION_PATTERNS.some((p) => p.test(message))) {
      return true;
    }
    current = "cause" in current ? current.cause : undefined;
  }
  return false;
}

/** Insert a job, returning `null` instead of throwing on a job-URL unique conflict. */
async function tryInsertJob(input: CreateJobInput): Promise<Job | null> {
  try {
    return await insertJob(input);
  } catch (error) {
    if (isJobUrlUniqueViolation(error)) return null;
    throw error;
  }
}

/**
 * Create one job, or return the existing job it duplicates.
 * Duplicates are detected, in order, by (source, sourceJobId), canonical URL,
 * and content fingerprint.
 *
 * @throws Error if the insert hits a URL conflict that cannot then be resolved
 *   to an existing row.
 */
async function createSingleJob(input: CreateJobInput): Promise<Job> {
  const normalized = normalizeCreateJobInputForDedup(input);
  const ownerProfileId = resolveOwnerForCreate(normalized);
  const normalizedWithOwner: CreateJobInput = {
    ...normalized,
    ownerProfileId,
  };
  const {
    existingCanonicalSet,
    existingSourceJobKeySet,
    existingContentFingerprintSet,
  } = await loadJobDedupIndexes(ownerProfileId);

  const sid = normalized.sourceJobId?.trim();
  if (sid && existingSourceJobKeySet.has(sourceJobKey(normalized.source, sid))) {
    const existing = await getJobBySourceAndExternalId(
      normalized.source,
      sid,
      ownerProfileId,
    );
    if (existing) return existing;
  }

  if (existingCanonicalSet.has(normalized.jobUrl)) {
    const existing = await findJobByCanonicalUrl(
      normalized.jobUrl,
      ownerProfileId,
    );
    if (existing) return existing;
  }

  const fingerprint = buildJobContentFingerprint({
    employer: normalized.employer,
    title: normalized.title,
  });
  if (fingerprint && existingContentFingerprintSet.has(fingerprint)) {
    const existing = await findJobByContentFingerprint(
      fingerprint,
      ownerProfileId,
    );
    if (existing) return existing;
  }

  const inserted = await tryInsertJob(normalizedWithOwner);
  if (inserted) return inserted;

  // A concurrent writer inserted the same URL between our checks and insert.
  const existingAfterConflict =
    (await findJobByCanonicalUrl(normalized.jobUrl, ownerProfileId)) ??
    (sid
      ? await getJobBySourceAndExternalId(normalized.source, sid, ownerProfileId)
      : null);
  if (existingAfterConflict) return existingAfterConflict;

  throw new Error("Failed to create or resolve existing job by URL");
}

/**
 * Coalesce duplicates within a single batch, preferring the fingerprint when
 * available so two different feeds posting the same role merge into one
 * bucket. Falls back to source-job-id, then canonical URL.
 */
function bucketBatchInputs(
  inputs: CreateJobInput[],
  ownerProfileId: string,
): Map<string, BatchBucket> {
  const buckets = new Map<string, BatchBucket>();

  for (const raw of inputs) {
    const normalized = normalizeCreateJobInputForDedup({
      ...raw,
      ownerProfileId,
    });
    const sid = normalized.sourceJobId?.trim();
    const fingerprint = buildJobContentFingerprint({
      employer: normalized.employer,
      title: normalized.title,
    });

    const batchKey = fingerprint
      ? `fp:${fingerprint}`
      : sid
        ? `sid:${sourceJobKey(normalized.source, sid)}`
        : `url:${normalized.jobUrl}`;

    const prev = buckets.get(batchKey);
    if (prev) {
      prev.count += 1;
    } else {
      buckets.set(batchKey, { input: normalized, count: 1, fingerprint });
    }
  }

  return buckets;
}

/**
 * Create a batch of jobs, skipping duplicates of existing jobs and of each
 * other. The whole batch is attributed to the owner resolved from the first
 * input; per-input `ownerProfileId` values are overridden.
 */
async function createJobBatch(
  inputs: CreateJobInput[],
): Promise<{ created: number; skipped: number }> {
  const [first] = inputs;
  if (!first) return { created: 0, skipped: 0 };

  const ownerProfileId = resolveOwnerForCreate(first);
  const {
    existingCanonicalSet,
    existingSourceJobKeySet,
    existingContentFingerprintSet,
  } = await loadJobDedupIndexes(ownerProfileId);

  let created = 0;
  let skipped = 0;

  for (const { input, count, fingerprint } of bucketBatchInputs(
    inputs,
    ownerProfileId,
  ).values()) {
    const canonical = input.jobUrl;
    const sid = input.sourceJobId?.trim();
    const sk = sid ? sourceJobKey(input.source, sid) : null;

    const isKnownDuplicate =
      (sk !== null && existingSourceJobKeySet.has(sk)) ||
      existingCanonicalSet.has(canonical) ||
      (fingerprint ? existingContentFingerprintSet.has(fingerprint) : false);
    if (isKnownDuplicate) {
      skipped += count;
      continue;
    }

    const inserted = await tryInsertJob(input);
    if (!inserted) {
      skipped += count;
      continue;
    }

    // One row created; the rest of the bucket are in-batch duplicates.
    created += 1;
    skipped += count - 1;

    // Keep indexes current so later buckets dedup against this insert.
    existingCanonicalSet.add(canonical);
    if (sk) existingSourceJobKeySet.add(sk);
    if (fingerprint) existingContentFingerprintSet.add(fingerprint);
  }

  return { created, skipped };
}

/**
 * Create jobs (or return existing jobs for duplicate URLs).
 *
 * - Single input: returns the created job, or the existing duplicate.
 * - Array input: returns counts of created and skipped (duplicate) inputs.
 */
export async function createJobs(input: CreateJobInput): Promise<Job>;
export async function createJobs(
  inputs: CreateJobInput[],
): Promise<{ created: number; skipped: number }>;
export async function createJobs(
  inputOrInputs: CreateJobInput | CreateJobInput[],
): Promise<Job | { created: number; skipped: number }> {
  return Array.isArray(inputOrInputs)
    ? createJobBatch(inputOrInputs)
    : createSingleJob(inputOrInputs);
}

/**
 * Create a single job (or return existing if URL matches).
 */
export async function createJob(input: CreateJobInput): Promise<Job> {
  return createJobs(input);
}

/**
 * Update a job and return it re-read, or `null` if it is not owned by the profile.
 * Stamps `updatedAt`; also stamps `processedAt` when moving to "processing"
 * and `appliedAt` when moving to "applied" without an explicit `appliedAt`.
 */
export async function updateJob(
  id: string,
  input: UpdateJobInput,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Job | null> {
  const now = new Date().toISOString();

  await db
    .update(jobs)
    .set({
      ...input,
      updatedAt: now,
      ...(input.status === "processing" ? { processedAt: now } : {}),
      ...(input.status === "applied" && !input.appliedAt
        ? { appliedAt: now }
        : {}),
    })
    .where(and(eq(jobs.id, id), eq(jobs.ownerProfileId, ownerProfileId)));

  return getJobById(id, ownerProfileId);
}

/**
 * Get job counts by status; statuses with no rows report 0.
 */
export async function getJobStats(
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Record<JobStatus, number>> {
  const result = await db
    .select({
      status: jobs.status,
      count: sql<number>`count(*)`,
    })
    .from(jobs)
    .where(eq(jobs.ownerProfileId, ownerProfileId))
    .groupBy(jobs.status);

  const stats: Record<JobStatus, number> = {
    discovered: 0,
    processing: 0,
    ready: 0,
    applied: 0,
    in_progress: 0,
    skipped: 0,
    expired: 0,
  };

  for (const row of result) {
    stats[row.status as JobStatus] = row.count;
  }

  return stats;
}

/**
 * Get jobs ready for processing (discovered with description), newest first.
 */
export async function getJobsForProcessing(
  limit: number = 10,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Job[]> {
  const rows = await db
    .select()
    .from(jobs)
    .where(
      and(
        eq(jobs.ownerProfileId, ownerProfileId),
        eq(jobs.status, "discovered"),
        sql`${jobs.jobDescription} IS NOT NULL`,
      ),
    )
    .orderBy(desc(jobs.discoveredAt))
    .limit(limit);

  return rows.map(mapRowToJob);
}

/**
 * Get discovered jobs missing a suitability score, newest first.
 * When `limit` is omitted, all matching rows are returned.
 */
export async function getUnscoredDiscoveredJobs(
  limit?: number,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<Job[]> {
  const query = db
    .select()
    .from(jobs)
    .where(
      and(
        eq(jobs.ownerProfileId, ownerProfileId),
        eq(jobs.status, "discovered"),
        isNull(jobs.suitabilityScore),
      ),
    )
    .orderBy(desc(jobs.discoveredAt));

  const rows =
    typeof limit === "number" ? await query.limit(limit) : await query;
  return rows.map(mapRowToJob);
}

/**
 * Delete jobs by status. Returns the number of rows deleted.
 */
export async function deleteJobsByStatus(
  status: JobStatus,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<number> {
  const result = await db
    .delete(jobs)
    .where(
      and(eq(jobs.ownerProfileId, ownerProfileId), eq(jobs.status, status)),
    )
    .run();

  return result.changes;
}

/**
 * Delete jobs with suitability score below threshold (excluding applied and
 * in_progress jobs). Rows with a NULL score are not matched by the SQL `<`
 * comparison and are kept. Returns the number of rows deleted.
 */
export async function deleteJobsBelowScore(
  threshold: number,
  ownerProfileId: string = defaultOwnerProfileId(),
): Promise<number> {
  const result = await db
    .delete(jobs)
    .where(
      and(
        eq(jobs.ownerProfileId, ownerProfileId),
        lt(jobs.suitabilityScore, threshold),
        ne(jobs.status, "applied"),
        ne(jobs.status, "in_progress"),
      ),
    )
    .run();

  return result.changes;
}

/** Map a database row to the shared `Job` type, normalizing optional columns to `null`. */
function mapRowToJob(row: JobRow): Job {
  return {
    id: row.id,
    ownerProfileId: row.ownerProfileId ?? DEFAULT_JOB_OWNER_PROFILE_ID,
    source: row.source as Job["source"],
    sourceJobId: row.sourceJobId ?? null,
    jobUrlDirect: row.jobUrlDirect ?? null,
    datePosted: row.datePosted ?? null,
    title: row.title,
    employer: row.employer,
    employerUrl: row.employerUrl,
    jobUrl: row.jobUrl,
    applicationLink: row.applicationLink,
    disciplines: row.disciplines,
    deadline: row.deadline,
    salary: row.salary,
    location: row.location,
    degreeRequired: row.degreeRequired,
    starting: row.starting,
    jobDescription: row.jobDescription,
    status: row.status as JobStatus,
    outcome: row.outcome ?? null,
    closedAt: row.closedAt ?? null,
    suitabilityScore: row.suitabilityScore,
    suitabilityReason: row.suitabilityReason,
    suitabilityAnalysis: row.suitabilityAnalysis ?? null,
    coverLetter: row.coverLetter ?? null,
    tailoredSummary: row.tailoredSummary,
    tailoredHeadline: row.tailoredHeadline ?? null,
    tailoredSkills: row.tailoredSkills ?? null,
    selectedProjectIds: row.selectedProjectIds ?? null,
    pdfPath: row.pdfPath,
    tracerLinksEnabled: row.tracerLinksEnabled ?? false,
    sponsorMatchScore: row.sponsorMatchScore ?? null,
    sponsorMatchNames: row.sponsorMatchNames ?? null,
    notes: row.notes ?? null,
    jobType: row.jobType ?? null,
    salarySource: row.salarySource ?? null,
    salaryInterval: row.salaryInterval ?? null,
    salaryMinAmount: row.salaryMinAmount ?? null,
    salaryMaxAmount: row.salaryMaxAmount ?? null,
    salaryCurrency: row.salaryCurrency ?? null,
    isRemote: row.isRemote ?? null,
    jobLevel: row.jobLevel ?? null,
    jobFunction: row.jobFunction ?? null,
    listingType: row.listingType ?? null,
    emails: row.emails ?? null,
    companyIndustry: row.companyIndustry ?? null,
    companyLogo: row.companyLogo ?? null,
    companyUrlDirect: row.companyUrlDirect ?? null,
    companyAddresses: row.companyAddresses ?? null,
    companyNumEmployees: row.companyNumEmployees ?? null,
    companyRevenue: row.companyRevenue ?? null,
    companyDescription: row.companyDescription ?? null,
    skills: row.skills ?? null,
    experienceRange: row.experienceRange ?? null,
    companyRating: row.companyRating ?? null,
    companyReviewsCount: row.companyReviewsCount ?? null,
    vacancyCount: row.vacancyCount ?? null,
    workFromHomeType: row.workFromHomeType ?? null,
    discoveredAt: row.discoveredAt,
    processedAt: row.processedAt,
    appliedAt: row.appliedAt,
    createdAt: row.createdAt,
    updatedAt: row.updatedAt,
  };
}