diff --git a/.env.example b/.env.example index aedda52..68f8d97 100644 --- a/.env.example +++ b/.env.example @@ -23,6 +23,7 @@ NOTION_DATABASE_ID= # Optional: Webhook secret for n8n automation WEBHOOK_SECRET= +PIPELINE_WEBHOOK_URL= # ============================================================================= # JobSpy (Indeed/LinkedIn scraping) - optional diff --git a/orchestrator/.env.example b/orchestrator/.env.example index 39a12e5..a92ecc4 100644 --- a/orchestrator/.env.example +++ b/orchestrator/.env.example @@ -11,6 +11,7 @@ NOTION_DATABASE_ID= # Webhook security (optional) WEBHOOK_SECRET= +PIPELINE_WEBHOOK_URL= # Pipeline configuration PIPELINE_TOP_N=10 diff --git a/orchestrator/src/client/api/client.ts b/orchestrator/src/client/api/client.ts index 1e7f77c..cef2bf9 100644 --- a/orchestrator/src/client/api/client.ts +++ b/orchestrator/src/client/api/client.ts @@ -98,7 +98,10 @@ export async function getSettings(): Promise { return fetchApi('/settings'); } -export async function updateSettings(update: { model?: string | null }): Promise { +export async function updateSettings(update: { + model?: string | null + pipelineWebhookUrl?: string | null +}): Promise { return fetchApi('/settings', { method: 'PATCH', body: JSON.stringify(update), diff --git a/orchestrator/src/client/pages/SettingsPage.tsx b/orchestrator/src/client/pages/SettingsPage.tsx index dc9b5ec..d2c4690 100644 --- a/orchestrator/src/client/pages/SettingsPage.tsx +++ b/orchestrator/src/client/pages/SettingsPage.tsx @@ -15,6 +15,7 @@ import * as api from "../api" export const SettingsPage: React.FC = () => { const [settings, setSettings] = useState(null) const [modelDraft, setModelDraft] = useState("") + const [pipelineWebhookUrlDraft, setPipelineWebhookUrlDraft] = useState("") const [isSaving, setIsSaving] = useState(false) const [isLoading, setIsLoading] = useState(true) @@ -27,6 +28,7 @@ export const SettingsPage: React.FC = () => { if (!isMounted) return setSettings(data) setModelDraft(data.overrideModel ?? "") + setPipelineWebhookUrlDraft(data.overridePipelineWebhookUrl ?? "") }) .catch((error) => { const message = error instanceof Error ? error.message : "Failed to load settings" @@ -45,22 +47,32 @@ export const SettingsPage: React.FC = () => { const effectiveModel = settings?.model ?? "" const defaultModel = settings?.defaultModel ?? "" const overrideModel = settings?.overrideModel + const effectivePipelineWebhookUrl = settings?.pipelineWebhookUrl ?? "" + const defaultPipelineWebhookUrl = settings?.defaultPipelineWebhookUrl ?? "" + const overridePipelineWebhookUrl = settings?.overridePipelineWebhookUrl const canSave = useMemo(() => { if (!settings) return false const next = modelDraft.trim() const current = (overrideModel ?? "").trim() - return next !== current - }, [modelDraft, overrideModel, settings]) + const nextWebhook = pipelineWebhookUrlDraft.trim() + const currentWebhook = (overridePipelineWebhookUrl ?? "").trim() + return next !== current || nextWebhook !== currentWebhook + }, [modelDraft, overrideModel, settings, pipelineWebhookUrlDraft, overridePipelineWebhookUrl]) const handleSave = async () => { if (!settings) return try { setIsSaving(true) const trimmed = modelDraft.trim() - const updated = await api.updateSettings({ model: trimmed.length > 0 ? trimmed : null }) + const webhookTrimmed = pipelineWebhookUrlDraft.trim() + const updated = await api.updateSettings({ + model: trimmed.length > 0 ? trimmed : null, + pipelineWebhookUrl: webhookTrimmed.length > 0 ? webhookTrimmed : null, + }) setSettings(updated) setModelDraft(updated.overrideModel ?? "") + setPipelineWebhookUrlDraft(updated.overridePipelineWebhookUrl ?? "") toast.success("Settings saved") } catch (error) { const message = error instanceof Error ? error.message : "Failed to save settings" @@ -73,9 +85,10 @@ export const SettingsPage: React.FC = () => { const handleReset = async () => { try { setIsSaving(true) - const updated = await api.updateSettings({ model: null }) + const updated = await api.updateSettings({ model: null, pipelineWebhookUrl: null }) setSettings(updated) setModelDraft("") + setPipelineWebhookUrlDraft("") toast.success("Reset to default") } catch (error) { const message = error instanceof Error ? error.message : "Failed to reset settings" @@ -123,18 +136,51 @@ export const SettingsPage: React.FC = () => {
{defaultModel || "—"}
+ + -
- - + + + Pipeline Webhook + + + +
+
Pipeline status webhook URL
+ setPipelineWebhookUrlDraft(event.target.value)} + placeholder={defaultPipelineWebhookUrl || "https://..."} + disabled={isLoading || isSaving} + /> +
+ When set, the server sends a POST on pipeline completion/failure. Leave blank to disable. +
+
+ + + +
+
+
Effective
+
{effectivePipelineWebhookUrl || "—"}
+
+
+
Default (env)
+
{defaultPipelineWebhookUrl || "—"}
+
+ +
+ + +
) } - diff --git a/orchestrator/src/server/api/routes.ts b/orchestrator/src/server/api/routes.ts index bd66185..5fd6f63 100644 --- a/orchestrator/src/server/api/routes.ts +++ b/orchestrator/src/server/api/routes.ts @@ -184,12 +184,19 @@ apiRouter.get('/settings', async (_req: Request, res: Response) => { const defaultModel = process.env.MODEL || 'openai/gpt-4o-mini'; const model = overrideModel || defaultModel; + const overridePipelineWebhookUrl = await settingsRepo.getSetting('pipelineWebhookUrl'); + const defaultPipelineWebhookUrl = process.env.PIPELINE_WEBHOOK_URL || process.env.WEBHOOK_URL || ''; + const pipelineWebhookUrl = overridePipelineWebhookUrl || defaultPipelineWebhookUrl; + res.json({ success: true, data: { model, defaultModel, overrideModel, + pipelineWebhookUrl, + defaultPipelineWebhookUrl, + overridePipelineWebhookUrl, }, }); } catch (error) { @@ -200,6 +207,7 @@ apiRouter.get('/settings', async (_req: Request, res: Response) => { const updateSettingsSchema = z.object({ model: z.string().trim().min(1).max(200).nullable().optional(), + pipelineWebhookUrl: z.string().trim().min(1).max(2000).nullable().optional(), }); /** @@ -214,16 +222,28 @@ apiRouter.patch('/settings', async (req: Request, res: Response) => { await settingsRepo.setSetting('model', model); } + if ('pipelineWebhookUrl' in input) { + const pipelineWebhookUrl = input.pipelineWebhookUrl ?? null; + await settingsRepo.setSetting('pipelineWebhookUrl', pipelineWebhookUrl); + } + const overrideModel = await settingsRepo.getSetting('model'); const defaultModel = process.env.MODEL || 'openai/gpt-4o-mini'; const model = overrideModel || defaultModel; + const overridePipelineWebhookUrl = await settingsRepo.getSetting('pipelineWebhookUrl'); + const defaultPipelineWebhookUrl = process.env.PIPELINE_WEBHOOK_URL || process.env.WEBHOOK_URL || ''; + const pipelineWebhookUrl = overridePipelineWebhookUrl || defaultPipelineWebhookUrl; + res.json({ success: true, data: { model, defaultModel, overrideModel, + pipelineWebhookUrl, + defaultPipelineWebhookUrl, + overridePipelineWebhookUrl, }, }); } catch (error) { diff --git a/orchestrator/src/server/db/migrate.ts b/orchestrator/src/server/db/migrate.ts index 051314b..fd0ef71 100644 --- a/orchestrator/src/server/db/migrate.ts +++ b/orchestrator/src/server/db/migrate.ts @@ -95,6 +95,11 @@ const migrations = [ updated_at TEXT NOT NULL DEFAULT (datetime('now')) )`, + // Rename settings key: webhookUrl -> pipelineWebhookUrl (safe to re-run) + `INSERT OR REPLACE INTO settings(key, value, created_at, updated_at) + SELECT 'pipelineWebhookUrl', value, created_at, updated_at FROM settings WHERE key = 'webhookUrl'`, + `DELETE FROM settings WHERE key = 'webhookUrl'`, + // Add source column for existing databases (safe to skip if already present) `ALTER TABLE jobs ADD COLUMN source TEXT NOT NULL DEFAULT 'gradcracker'`, `UPDATE jobs SET source = 'gradcracker' WHERE source IS NULL OR source = ''`, diff --git a/orchestrator/src/server/pipeline/orchestrator.ts b/orchestrator/src/server/pipeline/orchestrator.ts index f8ac63c..6c9b571 100644 --- a/orchestrator/src/server/pipeline/orchestrator.ts +++ b/orchestrator/src/server/pipeline/orchestrator.ts @@ -17,6 +17,7 @@ import { generateSummary } from '../services/summary.js'; import { generatePdf } from '../services/pdf.js'; import * as jobsRepo from '../repositories/jobs.js'; import * as pipelineRepo from '../repositories/pipeline.js'; +import * as settingsRepo from '../repositories/settings.js'; import { progressHelpers, resetProgress, updateProgress } from './progress.js'; import type { CreateJobInput, Job, JobSource, PipelineConfig } from '../../shared/types.js'; @@ -34,6 +35,42 @@ const DEFAULT_CONFIG: PipelineConfig = { // Track if pipeline is currently running let isPipelineRunning = false; +async function notifyPipelineWebhook( + event: 'pipeline.completed' | 'pipeline.failed', + payload: Record +) { + const overridePipelineWebhookUrl = await settingsRepo.getSetting('pipelineWebhookUrl') + const pipelineWebhookUrl = ( + overridePipelineWebhookUrl || + process.env.PIPELINE_WEBHOOK_URL || + process.env.WEBHOOK_URL || + '' + ).trim() + if (!pipelineWebhookUrl) return + + try { + const headers: Record = { 'Content-Type': 'application/json' } + const secret = process.env.WEBHOOK_SECRET + if (secret) headers.Authorization = `Bearer ${secret}` + + const response = await fetch(pipelineWebhookUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + event, + sentAt: new Date().toISOString(), + ...payload, + }), + }) + + if (!response.ok) { + console.warn(`⚠️ Pipeline webhook POST failed (${response.status}): ${await response.text()}`) + } + } catch (error) { + console.warn('⚠️ Pipeline webhook POST failed:', error) + } +} + /** * Run the full job discovery and processing pipeline. */ @@ -196,6 +233,13 @@ export async function runPipeline(config: Partial = {}): Promise console.log(' Jobs processed: 0 (manual)'); progressHelpers.complete(created, 0); + + await notifyPipelineWebhook('pipeline.completed', { + pipelineRunId: pipelineRun.id, + jobsDiscovered: created, + jobsScored: unprocessedJobs.length, + jobsProcessed: 0, + }) isPipelineRunning = false; return { @@ -214,6 +258,11 @@ export async function runPipeline(config: Partial = {}): Promise }); progressHelpers.failed(message); + + await notifyPipelineWebhook('pipeline.failed', { + pipelineRunId: pipelineRun.id, + error: message, + }) isPipelineRunning = false; console.error('\n❌ Pipeline failed:', message); diff --git a/orchestrator/src/server/repositories/settings.ts b/orchestrator/src/server/repositories/settings.ts index 70602e0..d86115f 100644 --- a/orchestrator/src/server/repositories/settings.ts +++ b/orchestrator/src/server/repositories/settings.ts @@ -8,6 +8,7 @@ import { db, schema } from '../db/index.js' const { settings } = schema export type SettingKey = 'model' + | 'pipelineWebhookUrl' export async function getSetting(key: SettingKey): Promise { const [row] = await db.select().from(settings).where(eq(settings.key, key)) @@ -39,4 +40,3 @@ export async function setSetting(key: SettingKey, value: string | null): Promise updatedAt: now, }) } - diff --git a/orchestrator/src/shared/types.ts b/orchestrator/src/shared/types.ts index 8e65dc3..ac2b616 100644 --- a/orchestrator/src/shared/types.ts +++ b/orchestrator/src/shared/types.ts @@ -176,4 +176,7 @@ export interface AppSettings { model: string; defaultModel: string; overrideModel: string | null; + pipelineWebhookUrl: string; + defaultPipelineWebhookUrl: string; + overridePipelineWebhookUrl: string | null; }