single thread

This commit is contained in:
DaKheera47 2026-02-15 19:14:06 +00:00
parent 91f08b944d
commit 8b8120bf1d
7 changed files with 466 additions and 159 deletions

View File

@ -472,6 +472,23 @@ export async function listJobChatThreads(jobId: string): Promise<{
return fetchApi<{ threads: JobChatThread[] }>(`/jobs/${jobId}/chat/threads`);
}
export async function listJobGhostwriterMessages(
jobId: string,
options?: { limit?: number; offset?: number },
): Promise<{ messages: JobChatMessage[] }> {
const params = new URLSearchParams();
if (typeof options?.limit === "number") {
params.set("limit", String(options.limit));
}
if (typeof options?.offset === "number") {
params.set("offset", String(options.offset));
}
const query = params.toString();
return fetchApi<{ messages: JobChatMessage[] }>(
`/jobs/${jobId}/chat/messages${query ? `?${query}` : ""}`,
);
}
export async function createJobChatThread(
jobId: string,
input?: { title?: string | null },
@ -539,6 +556,23 @@ export async function streamJobChatMessage(
);
}
export async function streamJobGhostwriterMessage(
jobId: string,
input: { content: string; signal?: AbortSignal },
handlers: {
onEvent: (event: JobChatStreamEvent) => void;
},
): Promise<void> {
return streamSseEvents(
`/jobs/${jobId}/chat/messages`,
{ content: input.content, stream: true },
{
onEvent: handlers.onEvent,
signal: input.signal,
},
);
}
export async function cancelJobChatRun(
jobId: string,
threadId: string,
@ -553,6 +587,19 @@ export async function cancelJobChatRun(
);
}
export async function cancelJobGhostwriterRun(
jobId: string,
runId: string,
): Promise<{ cancelled: boolean; alreadyFinished: boolean }> {
return fetchApi<{ cancelled: boolean; alreadyFinished: boolean }>(
`/jobs/${jobId}/chat/runs/${runId}/cancel`,
{
method: "POST",
body: JSON.stringify({}),
},
);
}
export async function regenerateJobChatMessage(
jobId: string,
threadId: string,
@ -586,6 +633,24 @@ export async function streamRegenerateJobChatMessage(
);
}
export async function streamRegenerateJobGhostwriterMessage(
jobId: string,
assistantMessageId: string,
input: { signal?: AbortSignal },
handlers: {
onEvent: (event: JobChatStreamEvent) => void;
},
): Promise<void> {
return streamSseEvents(
`/jobs/${jobId}/chat/messages/${assistantMessageId}/regenerate`,
{ stream: true },
{
onEvent: handlers.onEvent,
signal: input.signal,
},
);
}
export async function processJob(
id: string,
options?: { force?: boolean },

View File

@ -41,7 +41,7 @@ export const GhostwriterDrawer: React.FC<GhostwriterDrawerProps> = ({
<SheetContent
side="right"
className="flex w-full flex-col p-0 sm:max-w-none lg:w-[50vw] xl:w-[40vw] 2xl:w-[30vw]"
className="w-full p-0 sm:max-w-none lg:w-[50vw] xl:w-[40vw] 2xl:w-[30vw]"
>
<div className="border-b border-border/50 p-4">
<SheetHeader>

View File

@ -1,9 +1,4 @@
import type {
Job,
JobChatMessage,
JobChatStreamEvent,
JobChatThread,
} from "@shared/types";
import type { Job, JobChatMessage, JobChatStreamEvent } from "@shared/types";
import type React from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
@ -11,33 +6,22 @@ import * as api from "../../api";
import { Composer } from "./Composer";
import { MessageList } from "./MessageList";
import { RunControls } from "./RunControls";
import { ThreadList } from "./ThreadList";
type GhostwriterPanelProps = {
job: Job;
};
export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
const [threads, setThreads] = useState<JobChatThread[]>([]);
const [activeThreadId, setActiveThreadId] = useState<string | null>(null);
const [messages, setMessages] = useState<JobChatMessage[]>([]);
const [threadPreviews, setThreadPreviews] = useState<Record<string, string>>(
{},
);
const [isLoading, setIsLoading] = useState(true);
const [isStreaming, setIsStreaming] = useState(false);
const [streamingMessageId, setStreamingMessageId] = useState<string | null>(
null,
);
const [activeRunId, setActiveRunId] = useState<string | null>(null);
const messageListRef = useRef<HTMLDivElement | null>(null);
const streamAbortRef = useRef<AbortController | null>(null);
const activeThreadIdRef = useRef<string | null>(null);
useEffect(() => {
activeThreadIdRef.current = activeThreadId;
}, [activeThreadId]);
useEffect(() => {
const container = messageListRef.current;
@ -49,50 +33,17 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
}
});
const loadThreadMessages = useCallback(
async (threadId: string) => {
const data = await api.listJobChatMessages(job.id, threadId, {
limit: 300,
});
setMessages(data.messages);
const preview = [...data.messages]
.reverse()
.find((message) => !!message.content.trim())?.content;
if (preview) {
setThreadPreviews((current) => ({ ...current, [threadId]: preview }));
}
},
[job.id],
);
const createThread = useCallback(async () => {
const created = await api.createJobChatThread(job.id, {
title: `${job.title} @ ${job.employer}`,
const loadMessages = useCallback(async () => {
const data = await api.listJobGhostwriterMessages(job.id, {
limit: 300,
});
setThreads((current) => [created.thread, ...current]);
setThreadPreviews((current) => ({ ...current, [created.thread.id]: "" }));
setActiveThreadId(created.thread.id);
setMessages([]);
return created.thread;
}, [job.id, job.title, job.employer]);
setMessages(data.messages);
}, [job.id]);
const load = useCallback(async () => {
setIsLoading(true);
try {
const data = await api.listJobChatThreads(job.id);
const nextThreads = data.threads;
setThreads(nextThreads);
let threadId = nextThreads[0]?.id ?? null;
if (!threadId) {
const created = await createThread();
threadId = created.id;
}
setActiveThreadId(threadId);
if (threadId) {
await loadThreadMessages(threadId);
}
await loadMessages();
} catch (error) {
const message =
error instanceof Error ? error.message : "Failed to load Ghostwriter";
@ -100,7 +51,7 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
} finally {
setIsLoading(false);
}
}, [job.id, createThread, loadThreadMessages]);
}, [loadMessages]);
useEffect(() => {
void load();
@ -116,8 +67,9 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
setActiveRunId(event.runId);
setStreamingMessageId(event.messageId);
setMessages((current) => {
if (current.some((message) => message.id === event.messageId))
if (current.some((message) => message.id === event.messageId)) {
return current;
}
return [
...current,
{
@ -152,13 +104,6 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
: message,
),
);
const threadId = activeThreadIdRef.current;
if (threadId) {
setThreadPreviews((current) => ({
...current,
[threadId]: `${current[threadId] ?? ""}${event.delta}`.trim(),
}));
}
return;
}
@ -174,10 +119,6 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
setStreamingMessageId(null);
setActiveRunId(null);
setIsStreaming(false);
setThreadPreviews((current) => ({
...current,
[event.message.threadId]: event.message.content,
}));
return;
}
@ -193,12 +134,11 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
const sendMessage = useCallback(
async (content: string) => {
if (!activeThreadIdRef.current || isStreaming) return;
if (isStreaming) return;
const threadId = activeThreadIdRef.current;
const optimisticUser: JobChatMessage = {
id: `tmp-user-${Date.now()}`,
threadId,
threadId: messages[messages.length - 1]?.threadId || "pending-thread",
jobId: job.id,
role: "user",
content,
@ -212,21 +152,19 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
};
setMessages((current) => [...current, optimisticUser]);
setThreadPreviews((current) => ({ ...current, [threadId]: content }));
setIsStreaming(true);
const controller = new AbortController();
streamAbortRef.current = controller;
try {
await api.streamJobChatMessage(
await api.streamJobGhostwriterMessage(
job.id,
threadId,
{ content, signal: controller.signal },
{ onEvent: onStreamEvent },
);
await loadThreadMessages(threadId);
await loadMessages();
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
return;
@ -240,25 +178,25 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
setIsStreaming(false);
}
},
[isStreaming, job.id, loadThreadMessages, onStreamEvent],
[isStreaming, job.id, loadMessages, messages, onStreamEvent],
);
const stopStreaming = useCallback(async () => {
if (!activeThreadId || !activeRunId) return;
if (!activeRunId) return;
try {
await api.cancelJobChatRun(job.id, activeThreadId, activeRunId);
await api.cancelJobGhostwriterRun(job.id, activeRunId);
streamAbortRef.current?.abort();
streamAbortRef.current = null;
setIsStreaming(false);
setActiveRunId(null);
setStreamingMessageId(null);
await loadThreadMessages(activeThreadId);
await loadMessages();
} catch (error) {
const message =
error instanceof Error ? error.message : "Failed to stop run";
toast.error(message);
}
}, [activeThreadId, activeRunId, job.id, loadThreadMessages]);
}, [activeRunId, job.id, loadMessages]);
const canRegenerate = useMemo(() => {
if (isStreaming || messages.length === 0) return false;
@ -267,7 +205,7 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
}, [isStreaming, messages]);
const regenerate = useCallback(async () => {
if (!activeThreadId || isStreaming || messages.length === 0) return;
if (isStreaming || messages.length === 0) return;
const last = messages[messages.length - 1];
if (last.role !== "assistant") return;
@ -276,14 +214,13 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
streamAbortRef.current = controller;
try {
await api.streamRegenerateJobChatMessage(
await api.streamRegenerateJobGhostwriterMessage(
job.id,
activeThreadId,
last.id,
{ signal: controller.signal },
{ onEvent: onStreamEvent },
);
await loadThreadMessages(activeThreadId);
await loadMessages();
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
return;
@ -297,59 +234,30 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
streamAbortRef.current = null;
setIsStreaming(false);
}
}, [
activeThreadId,
isStreaming,
job.id,
loadThreadMessages,
messages,
onStreamEvent,
]);
}, [isStreaming, job.id, loadMessages, messages, onStreamEvent]);
return (
<div className="flex min-h-0 flex-1 flex-col">
<div className="grid min-h-0 flex-1 grid-cols-1 md:grid-cols-[16rem_minmax(0,1fr)]">
<ThreadList
job={job}
threads={threads}
previews={threadPreviews}
activeThreadId={activeThreadId}
onSelectThread={(threadId) => {
setActiveThreadId(threadId);
void loadThreadMessages(threadId);
}}
onCreateThread={() => {
void createThread();
}}
disabled={isLoading || isStreaming}
<div
ref={messageListRef}
className="min-h-0 flex-1 overflow-y-auto border-b border-border/50 pb-3 pr-1"
>
<MessageList
messages={messages}
isStreaming={isStreaming}
streamingMessageId={streamingMessageId}
/>
</div>
<div className="mt-4 space-y-3">
<RunControls
isStreaming={isStreaming}
canRegenerate={canRegenerate}
onStop={stopStreaming}
onRegenerate={regenerate}
/>
<div className="flex min-h-0 flex-1 flex-col border-t border-border/50 pt-4 md:border-t-0 md:border-l md:pl-4 md:pt-0">
<div
ref={messageListRef}
className="min-h-0 flex-1 overflow-y-auto pr-1"
>
<MessageList
messages={messages}
isStreaming={isStreaming}
streamingMessageId={streamingMessageId}
/>
</div>
<div className="mt-4 space-y-3 border-t border-border/50 pt-3">
<RunControls
isStreaming={isStreaming}
canRegenerate={canRegenerate}
onStop={stopStreaming}
onRegenerate={regenerate}
/>
<Composer
disabled={isLoading || isStreaming || !activeThreadId}
onSend={sendMessage}
/>
</div>
</div>
<Composer disabled={isLoading || isStreaming} onSend={sendMessage} />
</div>
</div>
);

View File

@ -39,6 +39,22 @@ vi.mock("../../services/ghostwriter", () => ({
updatedAt: new Date().toISOString(),
},
]),
listMessagesForJob: vi.fn(async () => [
{
id: "message-1",
threadId: "thread-1",
jobId: "job-1",
role: "user",
content: "hello",
status: "complete",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]),
sendMessage: vi.fn(async () => ({
userMessage: {
id: "user-1",
@ -70,6 +86,37 @@ vi.mock("../../services/ghostwriter", () => ({
},
runId: "run-1",
})),
sendMessageForJob: vi.fn(async () => ({
userMessage: {
id: "user-1",
threadId: "thread-1",
jobId: "job-1",
role: "user",
content: "hello",
status: "complete",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
assistantMessage: {
id: "assistant-1",
threadId: "thread-1",
jobId: "job-1",
role: "assistant",
content: "hi",
status: "complete",
tokensIn: 1,
tokensOut: 1,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
runId: "run-1",
})),
cancelRun: vi.fn(async () => ({ cancelled: true, alreadyFinished: false })),
regenerateMessage: vi.fn(async () => ({
runId: "run-2",
@ -104,8 +151,8 @@ describe.sequential("Ghostwriter API", () => {
await stopServer({ server, closeDb, tempDir });
});
it("lists threads with request id metadata", async () => {
const res = await fetch(`${baseUrl}/api/jobs/job-1/chat/threads`, {
it("lists messages with request id metadata", async () => {
const res = await fetch(`${baseUrl}/api/jobs/job-1/chat/messages`, {
headers: {
"x-request-id": "chat-req-1",
},
@ -115,34 +162,18 @@ describe.sequential("Ghostwriter API", () => {
expect(res.status).toBe(200);
expect(res.headers.get("x-request-id")).toBe("chat-req-1");
expect(body.ok).toBe(true);
expect(body.data.threads.length).toBe(1);
expect(body.data.messages.length).toBe(1);
expect(body.meta.requestId).toBe("chat-req-1");
});
it("creates thread and sends a message", async () => {
const threadRes = await fetch(`${baseUrl}/api/jobs/job-1/chat/threads`, {
it("sends a message in the per-job conversation", async () => {
const messageRes = await fetch(`${baseUrl}/api/jobs/job-1/chat/messages`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ title: "My thread" }),
body: JSON.stringify({ content: "hello" }),
});
const threadBody = await threadRes.json();
expect(threadRes.status).toBe(201);
expect(threadBody.ok).toBe(true);
expect(threadBody.data.thread.id).toBe("thread-created");
const messageRes = await fetch(
`${baseUrl}/api/jobs/job-1/chat/threads/thread-1/messages`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ content: "hello" }),
},
);
const messageBody = await messageRes.json();
expect(messageRes.status).toBe(200);

View File

@ -37,6 +37,233 @@ function writeSse(res: Response, event: unknown): void {
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
ghostwriterRouter.get(
"/messages",
asyncRoute(async (req, res) => {
const jobId = getJobId(req);
const parsed = listMessagesQuerySchema.safeParse(req.query);
if (!parsed.success) {
return fail(
res,
badRequest(parsed.error.message, parsed.error.flatten()),
);
}
await runWithRequestContext({ jobId }, async () => {
const messages = await ghostwriterService.listMessagesForJob({
jobId,
limit: parsed.data.limit,
offset: parsed.data.offset,
});
ok(res, { messages });
});
}),
);
ghostwriterRouter.post(
"/messages",
asyncRoute(async (req, res) => {
const jobId = getJobId(req);
const parsed = sendMessageSchema.safeParse(req.body);
if (!parsed.success) {
return fail(
res,
badRequest(parsed.error.message, parsed.error.flatten()),
);
}
await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) {
res.status(200);
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache, no-transform");
res.setHeader("Connection", "keep-alive");
res.flushHeaders?.();
try {
await ghostwriterService.sendMessageForJob({
jobId,
content: parsed.data.content,
stream: {
onReady: ({ runId, threadId, messageId, requestId }) =>
writeSse(res, {
type: "ready",
runId,
threadId,
messageId,
requestId,
}),
onDelta: ({ runId, messageId, delta }) =>
writeSse(res, {
type: "delta",
runId,
messageId,
delta,
}),
onCompleted: ({ runId, message }) =>
writeSse(res, {
type: "completed",
runId,
message,
}),
onCancelled: ({ runId, message }) =>
writeSse(res, {
type: "cancelled",
runId,
message,
}),
onError: ({ runId, code, message, requestId }) =>
writeSse(res, {
type: "error",
runId,
code,
message,
requestId,
}),
},
});
} catch (error) {
const appError = toAppError(error);
writeSse(res, {
type: "error",
code: appError.code,
message: appError.message,
requestId: res.getHeader("x-request-id") || "unknown",
});
} finally {
res.end();
}
return;
}
const result = await ghostwriterService.sendMessageForJob({
jobId,
content: parsed.data.content,
});
ok(res, {
userMessage: result.userMessage,
assistantMessage: result.assistantMessage,
runId: result.runId,
});
});
}),
);
ghostwriterRouter.post(
"/runs/:runId/cancel",
asyncRoute(async (req, res) => {
const jobId = getJobId(req);
const runId = req.params.runId;
if (!runId) {
return fail(res, badRequest("Missing run id"));
}
await runWithRequestContext({ jobId }, async () => {
const result = await ghostwriterService.cancelRunForJob({
jobId,
runId,
});
ok(res, result);
});
}),
);
ghostwriterRouter.post(
"/messages/:assistantMessageId/regenerate",
asyncRoute(async (req, res) => {
const jobId = getJobId(req);
const assistantMessageId = req.params.assistantMessageId;
if (!assistantMessageId) {
return fail(res, badRequest("Missing message id"));
}
const parsed = regenerateSchema.safeParse(req.body ?? {});
if (!parsed.success) {
return fail(
res,
badRequest(parsed.error.message, parsed.error.flatten()),
);
}
await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) {
res.status(200);
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache, no-transform");
res.setHeader("Connection", "keep-alive");
res.flushHeaders?.();
try {
await ghostwriterService.regenerateMessageForJob({
jobId,
assistantMessageId,
stream: {
onReady: ({ runId, threadId, messageId, requestId }) =>
writeSse(res, {
type: "ready",
runId,
threadId,
messageId,
requestId,
}),
onDelta: ({ runId, messageId, delta }) =>
writeSse(res, {
type: "delta",
runId,
messageId,
delta,
}),
onCompleted: ({ runId, message }) =>
writeSse(res, {
type: "completed",
runId,
message,
}),
onCancelled: ({ runId, message }) =>
writeSse(res, {
type: "cancelled",
runId,
message,
}),
onError: ({ runId, code, message, requestId }) =>
writeSse(res, {
type: "error",
runId,
code,
message,
requestId,
}),
},
});
} catch (error) {
const appError = toAppError(error);
writeSse(res, {
type: "error",
code: appError.code,
message: appError.message,
requestId: res.getHeader("x-request-id") || "unknown",
});
} finally {
res.end();
}
return;
}
const result = await ghostwriterService.regenerateMessageForJob({
jobId,
assistantMessageId,
});
ok(res, result);
});
}),
);
ghostwriterRouter.get(
"/threads",
asyncRoute(async (req, res) => {

View File

@ -70,6 +70,20 @@ export async function listThreadsForJob(
return rows.map(mapThread);
}
export async function getOrCreateThreadForJob(input: {
jobId: string;
title?: string | null;
}): Promise<JobChatThread> {
const existing = await listThreadsForJob(input.jobId);
if (existing.length > 0) {
return existing[0];
}
return createThread({
jobId: input.jobId,
title: input.title ?? null,
});
}
export async function getThreadById(
threadId: string,
): Promise<JobChatThread | null> {

View File

@ -104,6 +104,7 @@ type GenerateReplyOptions = {
stream?: {
onReady: (payload: {
runId: string;
threadId: string;
messageId: string;
requestId: string;
}) => void;
@ -129,15 +130,23 @@ type GenerateReplyOptions = {
};
};
async function ensureJobThread(jobId: string) {
return jobChatRepo.getOrCreateThreadForJob({
jobId,
title: null,
});
}
export async function createThread(input: {
jobId: string;
title?: string | null;
}) {
return jobChatRepo.createThread(input);
return ensureJobThread(input.jobId);
}
export async function listThreads(jobId: string) {
return jobChatRepo.listThreadsForJob(jobId);
const thread = await ensureJobThread(jobId);
return [thread];
}
export async function listMessages(input: {
@ -157,6 +166,18 @@ export async function listMessages(input: {
});
}
export async function listMessagesForJob(input: {
jobId: string;
limit?: number;
offset?: number;
}) {
const thread = await ensureJobThread(input.jobId);
return jobChatRepo.listMessagesForThread(thread.id, {
limit: input.limit,
offset: input.offset,
});
}
async function runAssistantReply(
options: GenerateReplyOptions,
): Promise<{ runId: string; messageId: string; message: string }> {
@ -203,6 +224,7 @@ async function runAssistantReply(
abortControllers.set(run.id, controller);
options.stream?.onReady({
runId: run.id,
threadId: options.threadId,
messageId: assistantMessage.id,
requestId,
});
@ -400,6 +422,20 @@ export async function sendMessage(input: {
};
}
export async function sendMessageForJob(input: {
jobId: string;
content: string;
stream?: GenerateReplyOptions["stream"];
}) {
const thread = await ensureJobThread(input.jobId);
return sendMessage({
jobId: input.jobId,
threadId: thread.id,
content: input.content,
stream: input.stream,
});
}
export async function regenerateMessage(input: {
jobId: string;
threadId: string;
@ -463,6 +499,20 @@ export async function regenerateMessage(input: {
};
}
export async function regenerateMessageForJob(input: {
jobId: string;
assistantMessageId: string;
stream?: GenerateReplyOptions["stream"];
}) {
const thread = await ensureJobThread(input.jobId);
return regenerateMessage({
jobId: input.jobId,
threadId: thread.id,
assistantMessageId: input.assistantMessageId,
stream: input.stream,
});
}
export async function cancelRun(input: {
jobId: string;
threadId: string;
@ -496,3 +546,15 @@ export async function cancelRun(input: {
alreadyFinished: false,
};
}
export async function cancelRunForJob(input: {
jobId: string;
runId: string;
}): Promise<{ cancelled: boolean; alreadyFinished: boolean }> {
const thread = await ensureJobThread(input.jobId);
return cancelRun({
jobId: input.jobId,
threadId: thread.id,
runId: input.runId,
});
}