From 8b8120bf1db487639c9fbd3666eee717bbeef989 Mon Sep 17 00:00:00 2001 From: DaKheera47 Date: Sun, 15 Feb 2026 19:14:06 +0000 Subject: [PATCH] single thread --- orchestrator/src/client/api/client.ts | 65 +++++ .../ghostwriter/GhostwriterDrawer.tsx | 2 +- .../ghostwriter/GhostwriterPanel.tsx | 176 ++++---------- .../src/server/api/routes/ghostwriter.test.ts | 75 ++++-- .../src/server/api/routes/ghostwriter.ts | 227 ++++++++++++++++++ .../src/server/repositories/ghostwriter.ts | 14 ++ .../src/server/services/ghostwriter.ts | 66 ++++- 7 files changed, 466 insertions(+), 159 deletions(-) diff --git a/orchestrator/src/client/api/client.ts b/orchestrator/src/client/api/client.ts index 7597775..049042e 100644 --- a/orchestrator/src/client/api/client.ts +++ b/orchestrator/src/client/api/client.ts @@ -472,6 +472,23 @@ export async function listJobChatThreads(jobId: string): Promise<{ return fetchApi<{ threads: JobChatThread[] }>(`/jobs/${jobId}/chat/threads`); } +export async function listJobGhostwriterMessages( + jobId: string, + options?: { limit?: number; offset?: number }, +): Promise<{ messages: JobChatMessage[] }> { + const params = new URLSearchParams(); + if (typeof options?.limit === "number") { + params.set("limit", String(options.limit)); + } + if (typeof options?.offset === "number") { + params.set("offset", String(options.offset)); + } + const query = params.toString(); + return fetchApi<{ messages: JobChatMessage[] }>( + `/jobs/${jobId}/chat/messages${query ? `?${query}` : ""}`, + ); +} + export async function createJobChatThread( jobId: string, input?: { title?: string | null }, @@ -539,6 +556,23 @@ export async function streamJobChatMessage( ); } +export async function streamJobGhostwriterMessage( + jobId: string, + input: { content: string; signal?: AbortSignal }, + handlers: { + onEvent: (event: JobChatStreamEvent) => void; + }, +): Promise { + return streamSseEvents( + `/jobs/${jobId}/chat/messages`, + { content: input.content, stream: true }, + { + onEvent: handlers.onEvent, + signal: input.signal, + }, + ); +} + export async function cancelJobChatRun( jobId: string, threadId: string, @@ -553,6 +587,19 @@ export async function cancelJobChatRun( ); } +export async function cancelJobGhostwriterRun( + jobId: string, + runId: string, +): Promise<{ cancelled: boolean; alreadyFinished: boolean }> { + return fetchApi<{ cancelled: boolean; alreadyFinished: boolean }>( + `/jobs/${jobId}/chat/runs/${runId}/cancel`, + { + method: "POST", + body: JSON.stringify({}), + }, + ); +} + export async function regenerateJobChatMessage( jobId: string, threadId: string, @@ -586,6 +633,24 @@ export async function streamRegenerateJobChatMessage( ); } +export async function streamRegenerateJobGhostwriterMessage( + jobId: string, + assistantMessageId: string, + input: { signal?: AbortSignal }, + handlers: { + onEvent: (event: JobChatStreamEvent) => void; + }, +): Promise { + return streamSseEvents( + `/jobs/${jobId}/chat/messages/${assistantMessageId}/regenerate`, + { stream: true }, + { + onEvent: handlers.onEvent, + signal: input.signal, + }, + ); +} + export async function processJob( id: string, options?: { force?: boolean }, diff --git a/orchestrator/src/client/components/ghostwriter/GhostwriterDrawer.tsx b/orchestrator/src/client/components/ghostwriter/GhostwriterDrawer.tsx index efb6b37..e377130 100644 --- a/orchestrator/src/client/components/ghostwriter/GhostwriterDrawer.tsx +++ b/orchestrator/src/client/components/ghostwriter/GhostwriterDrawer.tsx @@ -41,7 +41,7 @@ export const GhostwriterDrawer: React.FC = ({
diff --git a/orchestrator/src/client/components/ghostwriter/GhostwriterPanel.tsx b/orchestrator/src/client/components/ghostwriter/GhostwriterPanel.tsx index 661b9af..be8cbc3 100644 --- a/orchestrator/src/client/components/ghostwriter/GhostwriterPanel.tsx +++ b/orchestrator/src/client/components/ghostwriter/GhostwriterPanel.tsx @@ -1,9 +1,4 @@ -import type { - Job, - JobChatMessage, - JobChatStreamEvent, - JobChatThread, -} from "@shared/types"; +import type { Job, JobChatMessage, JobChatStreamEvent } from "@shared/types"; import type React from "react"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; @@ -11,33 +6,22 @@ import * as api from "../../api"; import { Composer } from "./Composer"; import { MessageList } from "./MessageList"; import { RunControls } from "./RunControls"; -import { ThreadList } from "./ThreadList"; type GhostwriterPanelProps = { job: Job; }; export const GhostwriterPanel: React.FC = ({ job }) => { - const [threads, setThreads] = useState([]); - const [activeThreadId, setActiveThreadId] = useState(null); const [messages, setMessages] = useState([]); - const [threadPreviews, setThreadPreviews] = useState>( - {}, - ); const [isLoading, setIsLoading] = useState(true); const [isStreaming, setIsStreaming] = useState(false); const [streamingMessageId, setStreamingMessageId] = useState( null, ); const [activeRunId, setActiveRunId] = useState(null); + const messageListRef = useRef(null); - const streamAbortRef = useRef(null); - const activeThreadIdRef = useRef(null); - - useEffect(() => { - activeThreadIdRef.current = activeThreadId; - }, [activeThreadId]); useEffect(() => { const container = messageListRef.current; @@ -49,50 +33,17 @@ export const GhostwriterPanel: React.FC = ({ job }) => { } }); - const loadThreadMessages = useCallback( - async (threadId: string) => { - const data = await api.listJobChatMessages(job.id, threadId, { - limit: 300, - }); - setMessages(data.messages); - const preview = [...data.messages] - .reverse() - .find((message) => !!message.content.trim())?.content; - if (preview) { - setThreadPreviews((current) => ({ ...current, [threadId]: preview })); - } - }, - [job.id], - ); - - const createThread = useCallback(async () => { - const created = await api.createJobChatThread(job.id, { - title: `${job.title} @ ${job.employer}`, + const loadMessages = useCallback(async () => { + const data = await api.listJobGhostwriterMessages(job.id, { + limit: 300, }); - setThreads((current) => [created.thread, ...current]); - setThreadPreviews((current) => ({ ...current, [created.thread.id]: "" })); - setActiveThreadId(created.thread.id); - setMessages([]); - return created.thread; - }, [job.id, job.title, job.employer]); + setMessages(data.messages); + }, [job.id]); const load = useCallback(async () => { setIsLoading(true); try { - const data = await api.listJobChatThreads(job.id); - const nextThreads = data.threads; - setThreads(nextThreads); - - let threadId = nextThreads[0]?.id ?? null; - if (!threadId) { - const created = await createThread(); - threadId = created.id; - } - - setActiveThreadId(threadId); - if (threadId) { - await loadThreadMessages(threadId); - } + await loadMessages(); } catch (error) { const message = error instanceof Error ? error.message : "Failed to load Ghostwriter"; @@ -100,7 +51,7 @@ export const GhostwriterPanel: React.FC = ({ job }) => { } finally { setIsLoading(false); } - }, [job.id, createThread, loadThreadMessages]); + }, [loadMessages]); useEffect(() => { void load(); @@ -116,8 +67,9 @@ export const GhostwriterPanel: React.FC = ({ job }) => { setActiveRunId(event.runId); setStreamingMessageId(event.messageId); setMessages((current) => { - if (current.some((message) => message.id === event.messageId)) + if (current.some((message) => message.id === event.messageId)) { return current; + } return [ ...current, { @@ -152,13 +104,6 @@ export const GhostwriterPanel: React.FC = ({ job }) => { : message, ), ); - const threadId = activeThreadIdRef.current; - if (threadId) { - setThreadPreviews((current) => ({ - ...current, - [threadId]: `${current[threadId] ?? ""}${event.delta}`.trim(), - })); - } return; } @@ -174,10 +119,6 @@ export const GhostwriterPanel: React.FC = ({ job }) => { setStreamingMessageId(null); setActiveRunId(null); setIsStreaming(false); - setThreadPreviews((current) => ({ - ...current, - [event.message.threadId]: event.message.content, - })); return; } @@ -193,12 +134,11 @@ export const GhostwriterPanel: React.FC = ({ job }) => { const sendMessage = useCallback( async (content: string) => { - if (!activeThreadIdRef.current || isStreaming) return; + if (isStreaming) return; - const threadId = activeThreadIdRef.current; const optimisticUser: JobChatMessage = { id: `tmp-user-${Date.now()}`, - threadId, + threadId: messages[messages.length - 1]?.threadId || "pending-thread", jobId: job.id, role: "user", content, @@ -212,21 +152,19 @@ export const GhostwriterPanel: React.FC = ({ job }) => { }; setMessages((current) => [...current, optimisticUser]); - setThreadPreviews((current) => ({ ...current, [threadId]: content })); setIsStreaming(true); const controller = new AbortController(); streamAbortRef.current = controller; try { - await api.streamJobChatMessage( + await api.streamJobGhostwriterMessage( job.id, - threadId, { content, signal: controller.signal }, { onEvent: onStreamEvent }, ); - await loadThreadMessages(threadId); + await loadMessages(); } catch (error) { if (error instanceof Error && error.name === "AbortError") { return; @@ -240,25 +178,25 @@ export const GhostwriterPanel: React.FC = ({ job }) => { setIsStreaming(false); } }, - [isStreaming, job.id, loadThreadMessages, onStreamEvent], + [isStreaming, job.id, loadMessages, messages, onStreamEvent], ); const stopStreaming = useCallback(async () => { - if (!activeThreadId || !activeRunId) return; + if (!activeRunId) return; try { - await api.cancelJobChatRun(job.id, activeThreadId, activeRunId); + await api.cancelJobGhostwriterRun(job.id, activeRunId); streamAbortRef.current?.abort(); streamAbortRef.current = null; setIsStreaming(false); setActiveRunId(null); setStreamingMessageId(null); - await loadThreadMessages(activeThreadId); + await loadMessages(); } catch (error) { const message = error instanceof Error ? error.message : "Failed to stop run"; toast.error(message); } - }, [activeThreadId, activeRunId, job.id, loadThreadMessages]); + }, [activeRunId, job.id, loadMessages]); const canRegenerate = useMemo(() => { if (isStreaming || messages.length === 0) return false; @@ -267,7 +205,7 @@ export const GhostwriterPanel: React.FC = ({ job }) => { }, [isStreaming, messages]); const regenerate = useCallback(async () => { - if (!activeThreadId || isStreaming || messages.length === 0) return; + if (isStreaming || messages.length === 0) return; const last = messages[messages.length - 1]; if (last.role !== "assistant") return; @@ -276,14 +214,13 @@ export const GhostwriterPanel: React.FC = ({ job }) => { streamAbortRef.current = controller; try { - await api.streamRegenerateJobChatMessage( + await api.streamRegenerateJobGhostwriterMessage( job.id, - activeThreadId, last.id, { signal: controller.signal }, { onEvent: onStreamEvent }, ); - await loadThreadMessages(activeThreadId); + await loadMessages(); } catch (error) { if (error instanceof Error && error.name === "AbortError") { return; @@ -297,59 +234,30 @@ export const GhostwriterPanel: React.FC = ({ job }) => { streamAbortRef.current = null; setIsStreaming(false); } - }, [ - activeThreadId, - isStreaming, - job.id, - loadThreadMessages, - messages, - onStreamEvent, - ]); + }, [isStreaming, job.id, loadMessages, messages, onStreamEvent]); return (
-
- { - setActiveThreadId(threadId); - void loadThreadMessages(threadId); - }} - onCreateThread={() => { - void createThread(); - }} - disabled={isLoading || isStreaming} +
+ +
+ +
+ -
-
- -
- -
- - - -
-
+
); diff --git a/orchestrator/src/server/api/routes/ghostwriter.test.ts b/orchestrator/src/server/api/routes/ghostwriter.test.ts index 4df6e2b..25fe210 100644 --- a/orchestrator/src/server/api/routes/ghostwriter.test.ts +++ b/orchestrator/src/server/api/routes/ghostwriter.test.ts @@ -39,6 +39,22 @@ vi.mock("../../services/ghostwriter", () => ({ updatedAt: new Date().toISOString(), }, ]), + listMessagesForJob: vi.fn(async () => [ + { + id: "message-1", + threadId: "thread-1", + jobId: "job-1", + role: "user", + content: "hello", + status: "complete", + tokensIn: 1, + tokensOut: null, + version: 1, + replacesMessageId: null, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + ]), sendMessage: vi.fn(async () => ({ userMessage: { id: "user-1", @@ -70,6 +86,37 @@ vi.mock("../../services/ghostwriter", () => ({ }, runId: "run-1", })), + sendMessageForJob: vi.fn(async () => ({ + userMessage: { + id: "user-1", + threadId: "thread-1", + jobId: "job-1", + role: "user", + content: "hello", + status: "complete", + tokensIn: 1, + tokensOut: null, + version: 1, + replacesMessageId: null, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + assistantMessage: { + id: "assistant-1", + threadId: "thread-1", + jobId: "job-1", + role: "assistant", + content: "hi", + status: "complete", + tokensIn: 1, + tokensOut: 1, + version: 1, + replacesMessageId: null, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + runId: "run-1", + })), cancelRun: vi.fn(async () => ({ cancelled: true, alreadyFinished: false })), regenerateMessage: vi.fn(async () => ({ runId: "run-2", @@ -104,8 +151,8 @@ describe.sequential("Ghostwriter API", () => { await stopServer({ server, closeDb, tempDir }); }); - it("lists threads with request id metadata", async () => { - const res = await fetch(`${baseUrl}/api/jobs/job-1/chat/threads`, { + it("lists messages with request id metadata", async () => { + const res = await fetch(`${baseUrl}/api/jobs/job-1/chat/messages`, { headers: { "x-request-id": "chat-req-1", }, @@ -115,34 +162,18 @@ describe.sequential("Ghostwriter API", () => { expect(res.status).toBe(200); expect(res.headers.get("x-request-id")).toBe("chat-req-1"); expect(body.ok).toBe(true); - expect(body.data.threads.length).toBe(1); + expect(body.data.messages.length).toBe(1); expect(body.meta.requestId).toBe("chat-req-1"); }); - it("creates thread and sends a message", async () => { - const threadRes = await fetch(`${baseUrl}/api/jobs/job-1/chat/threads`, { + it("sends a message in the per-job conversation", async () => { + const messageRes = await fetch(`${baseUrl}/api/jobs/job-1/chat/messages`, { method: "POST", headers: { "Content-Type": "application/json", }, - body: JSON.stringify({ title: "My thread" }), + body: JSON.stringify({ content: "hello" }), }); - const threadBody = await threadRes.json(); - - expect(threadRes.status).toBe(201); - expect(threadBody.ok).toBe(true); - expect(threadBody.data.thread.id).toBe("thread-created"); - - const messageRes = await fetch( - `${baseUrl}/api/jobs/job-1/chat/threads/thread-1/messages`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ content: "hello" }), - }, - ); const messageBody = await messageRes.json(); expect(messageRes.status).toBe(200); diff --git a/orchestrator/src/server/api/routes/ghostwriter.ts b/orchestrator/src/server/api/routes/ghostwriter.ts index 209f830..5f0b235 100644 --- a/orchestrator/src/server/api/routes/ghostwriter.ts +++ b/orchestrator/src/server/api/routes/ghostwriter.ts @@ -37,6 +37,233 @@ function writeSse(res: Response, event: unknown): void { res.write(`data: ${JSON.stringify(event)}\n\n`); } +ghostwriterRouter.get( + "/messages", + asyncRoute(async (req, res) => { + const jobId = getJobId(req); + const parsed = listMessagesQuerySchema.safeParse(req.query); + if (!parsed.success) { + return fail( + res, + badRequest(parsed.error.message, parsed.error.flatten()), + ); + } + + await runWithRequestContext({ jobId }, async () => { + const messages = await ghostwriterService.listMessagesForJob({ + jobId, + limit: parsed.data.limit, + offset: parsed.data.offset, + }); + ok(res, { messages }); + }); + }), +); + +ghostwriterRouter.post( + "/messages", + asyncRoute(async (req, res) => { + const jobId = getJobId(req); + + const parsed = sendMessageSchema.safeParse(req.body); + if (!parsed.success) { + return fail( + res, + badRequest(parsed.error.message, parsed.error.flatten()), + ); + } + + await runWithRequestContext({ jobId }, async () => { + if (parsed.data.stream) { + res.status(200); + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("Connection", "keep-alive"); + res.flushHeaders?.(); + + try { + await ghostwriterService.sendMessageForJob({ + jobId, + content: parsed.data.content, + stream: { + onReady: ({ runId, threadId, messageId, requestId }) => + writeSse(res, { + type: "ready", + runId, + threadId, + messageId, + requestId, + }), + onDelta: ({ runId, messageId, delta }) => + writeSse(res, { + type: "delta", + runId, + messageId, + delta, + }), + onCompleted: ({ runId, message }) => + writeSse(res, { + type: "completed", + runId, + message, + }), + onCancelled: ({ runId, message }) => + writeSse(res, { + type: "cancelled", + runId, + message, + }), + onError: ({ runId, code, message, requestId }) => + writeSse(res, { + type: "error", + runId, + code, + message, + requestId, + }), + }, + }); + } catch (error) { + const appError = toAppError(error); + writeSse(res, { + type: "error", + code: appError.code, + message: appError.message, + requestId: res.getHeader("x-request-id") || "unknown", + }); + } finally { + res.end(); + } + + return; + } + + const result = await ghostwriterService.sendMessageForJob({ + jobId, + content: parsed.data.content, + }); + + ok(res, { + userMessage: result.userMessage, + assistantMessage: result.assistantMessage, + runId: result.runId, + }); + }); + }), +); + +ghostwriterRouter.post( + "/runs/:runId/cancel", + asyncRoute(async (req, res) => { + const jobId = getJobId(req); + const runId = req.params.runId; + if (!runId) { + return fail(res, badRequest("Missing run id")); + } + + await runWithRequestContext({ jobId }, async () => { + const result = await ghostwriterService.cancelRunForJob({ + jobId, + runId, + }); + + ok(res, result); + }); + }), +); + +ghostwriterRouter.post( + "/messages/:assistantMessageId/regenerate", + asyncRoute(async (req, res) => { + const jobId = getJobId(req); + const assistantMessageId = req.params.assistantMessageId; + if (!assistantMessageId) { + return fail(res, badRequest("Missing message id")); + } + + const parsed = regenerateSchema.safeParse(req.body ?? {}); + if (!parsed.success) { + return fail( + res, + badRequest(parsed.error.message, parsed.error.flatten()), + ); + } + + await runWithRequestContext({ jobId }, async () => { + if (parsed.data.stream) { + res.status(200); + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("Connection", "keep-alive"); + res.flushHeaders?.(); + + try { + await ghostwriterService.regenerateMessageForJob({ + jobId, + assistantMessageId, + stream: { + onReady: ({ runId, threadId, messageId, requestId }) => + writeSse(res, { + type: "ready", + runId, + threadId, + messageId, + requestId, + }), + onDelta: ({ runId, messageId, delta }) => + writeSse(res, { + type: "delta", + runId, + messageId, + delta, + }), + onCompleted: ({ runId, message }) => + writeSse(res, { + type: "completed", + runId, + message, + }), + onCancelled: ({ runId, message }) => + writeSse(res, { + type: "cancelled", + runId, + message, + }), + onError: ({ runId, code, message, requestId }) => + writeSse(res, { + type: "error", + runId, + code, + message, + requestId, + }), + }, + }); + } catch (error) { + const appError = toAppError(error); + writeSse(res, { + type: "error", + code: appError.code, + message: appError.message, + requestId: res.getHeader("x-request-id") || "unknown", + }); + } finally { + res.end(); + } + + return; + } + + const result = await ghostwriterService.regenerateMessageForJob({ + jobId, + assistantMessageId, + }); + + ok(res, result); + }); + }), +); + ghostwriterRouter.get( "/threads", asyncRoute(async (req, res) => { diff --git a/orchestrator/src/server/repositories/ghostwriter.ts b/orchestrator/src/server/repositories/ghostwriter.ts index 3124c03..47ab607 100644 --- a/orchestrator/src/server/repositories/ghostwriter.ts +++ b/orchestrator/src/server/repositories/ghostwriter.ts @@ -70,6 +70,20 @@ export async function listThreadsForJob( return rows.map(mapThread); } +export async function getOrCreateThreadForJob(input: { + jobId: string; + title?: string | null; +}): Promise { + const existing = await listThreadsForJob(input.jobId); + if (existing.length > 0) { + return existing[0]; + } + return createThread({ + jobId: input.jobId, + title: input.title ?? null, + }); +} + export async function getThreadById( threadId: string, ): Promise { diff --git a/orchestrator/src/server/services/ghostwriter.ts b/orchestrator/src/server/services/ghostwriter.ts index 7bf2290..de78714 100644 --- a/orchestrator/src/server/services/ghostwriter.ts +++ b/orchestrator/src/server/services/ghostwriter.ts @@ -104,6 +104,7 @@ type GenerateReplyOptions = { stream?: { onReady: (payload: { runId: string; + threadId: string; messageId: string; requestId: string; }) => void; @@ -129,15 +130,23 @@ type GenerateReplyOptions = { }; }; +async function ensureJobThread(jobId: string) { + return jobChatRepo.getOrCreateThreadForJob({ + jobId, + title: null, + }); +} + export async function createThread(input: { jobId: string; title?: string | null; }) { - return jobChatRepo.createThread(input); + return ensureJobThread(input.jobId); } export async function listThreads(jobId: string) { - return jobChatRepo.listThreadsForJob(jobId); + const thread = await ensureJobThread(jobId); + return [thread]; } export async function listMessages(input: { @@ -157,6 +166,18 @@ export async function listMessages(input: { }); } +export async function listMessagesForJob(input: { + jobId: string; + limit?: number; + offset?: number; +}) { + const thread = await ensureJobThread(input.jobId); + return jobChatRepo.listMessagesForThread(thread.id, { + limit: input.limit, + offset: input.offset, + }); +} + async function runAssistantReply( options: GenerateReplyOptions, ): Promise<{ runId: string; messageId: string; message: string }> { @@ -203,6 +224,7 @@ async function runAssistantReply( abortControllers.set(run.id, controller); options.stream?.onReady({ runId: run.id, + threadId: options.threadId, messageId: assistantMessage.id, requestId, }); @@ -400,6 +422,20 @@ export async function sendMessage(input: { }; } +export async function sendMessageForJob(input: { + jobId: string; + content: string; + stream?: GenerateReplyOptions["stream"]; +}) { + const thread = await ensureJobThread(input.jobId); + return sendMessage({ + jobId: input.jobId, + threadId: thread.id, + content: input.content, + stream: input.stream, + }); +} + export async function regenerateMessage(input: { jobId: string; threadId: string; @@ -463,6 +499,20 @@ export async function regenerateMessage(input: { }; } +export async function regenerateMessageForJob(input: { + jobId: string; + assistantMessageId: string; + stream?: GenerateReplyOptions["stream"]; +}) { + const thread = await ensureJobThread(input.jobId); + return regenerateMessage({ + jobId: input.jobId, + threadId: thread.id, + assistantMessageId: input.assistantMessageId, + stream: input.stream, + }); +} + export async function cancelRun(input: { jobId: string; threadId: string; @@ -496,3 +546,15 @@ export async function cancelRun(input: { alreadyFinished: false, }; } + +export async function cancelRunForJob(input: { + jobId: string; + runId: string; +}): Promise<{ cancelled: boolean; alreadyFinished: boolean }> { + const thread = await ensureJobThread(input.jobId); + return cancelRun({ + jobId: input.jobId, + threadId: thread.id, + runId: input.runId, + }); +}