feat(ghostwriter): branching conversations with edit and per-message regenerate (#290)

Turn Ghostwriter's flat message list into a tree structure, enabling
Claude.ai/ChatGPT-style branching conversations.

**Data model**: Add `parentMessageId` and `activeChildId` to messages,
`activeRootMessageId` to threads. Migration backfills existing messages
into a linear chain and links regenerated messages as siblings.

**Backend**: Tree-walking queries (getActivePathFromRoot, getAncestorPath,
getSiblingsOf), rewritten history builder that follows the ancestor path,
new editMessage and switchBranch services, regenerate now works on any
assistant message (not just the latest).

**Frontend**: BranchNavigator component (← 2/3 → arrows), inline edit on
user messages, per-message regenerate on assistant messages, regenerate
button removed from composer (now per-message).

**Infra**: Pin Node 22 via Volta to prevent ABI mismatches with
better-sqlite3 across environments.
This commit is contained in:
0x1355 2026-03-19 12:25:00 +01:00 committed by GitHub
parent f19471ab58
commit 4787f4d151
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1085 additions and 181 deletions

View File

@ -9,6 +9,7 @@ import type {
ApplicationTask,
AppSettings,
BackupInfo,
BranchInfo,
DemoInfoResponse,
Job,
JobActionRequest,
@ -559,7 +560,7 @@ export async function listJobChatThreads(jobId: string): Promise<{
export async function listJobGhostwriterMessages(
jobId: string,
options?: { limit?: number; offset?: number },
): Promise<{ messages: JobChatMessage[] }> {
): Promise<{ messages: JobChatMessage[]; branches: BranchInfo[] }> {
const params = new URLSearchParams();
if (typeof options?.limit === "number") {
params.set("limit", String(options.limit));
@ -568,7 +569,7 @@ export async function listJobGhostwriterMessages(
params.set("offset", String(options.offset));
}
const query = params.toString();
return fetchApi<{ messages: JobChatMessage[] }>(
return fetchApi<{ messages: JobChatMessage[]; branches: BranchInfo[] }>(
`/jobs/${jobId}/chat/messages${query ? `?${query}` : ""}`,
);
}
@ -747,6 +748,37 @@ export async function streamRegenerateJobGhostwriterMessage(
);
}
export async function editJobGhostwriterMessage(
jobId: string,
messageId: string,
input: { content: string; signal?: AbortSignal },
handlers: {
onEvent: (event: JobChatStreamEvent) => void;
},
): Promise<void> {
return streamSseEvents(
`/jobs/${jobId}/chat/messages/${messageId}/edit`,
{ content: input.content, stream: true },
{
onEvent: handlers.onEvent,
signal: input.signal,
},
);
}
export async function switchJobGhostwriterBranch(
jobId: string,
messageId: string,
): Promise<{ messages: JobChatMessage[]; branches: BranchInfo[] }> {
return fetchApi<{ messages: JobChatMessage[]; branches: BranchInfo[] }>(
`/jobs/${jobId}/chat/messages/${messageId}/switch-branch`,
{
method: "POST",
body: JSON.stringify({}),
},
);
}
function toJobIdList(idOrIds: string | string[]): string[] {
return Array.isArray(idOrIds) ? idOrIds : [idOrIds];
}

View File

@ -0,0 +1,44 @@
import type { BranchInfo } from "@shared/types";
import { ChevronLeft, ChevronRight } from "lucide-react";
import type React from "react";
type BranchNavigatorProps = {
branchInfo: BranchInfo;
onSwitch: (messageId: string) => void;
};
export const BranchNavigator: React.FC<BranchNavigatorProps> = ({
branchInfo,
onSwitch,
}) => {
const { siblingIds, activeIndex } = branchInfo;
const total = siblingIds.length;
const canGoLeft = activeIndex > 0;
const canGoRight = activeIndex < total - 1;
return (
<div className="inline-flex items-center gap-0.5 text-[10px] text-muted-foreground">
<button
type="button"
disabled={!canGoLeft}
onClick={() => canGoLeft && onSwitch(siblingIds[activeIndex - 1])}
className="rounded p-0.5 hover:bg-muted/60 disabled:opacity-30 disabled:cursor-default"
aria-label="Previous variant"
>
<ChevronLeft className="h-3 w-3" />
</button>
<span className="tabular-nums">
{activeIndex + 1}/{total}
</span>
<button
type="button"
disabled={!canGoRight}
onClick={() => canGoRight && onSwitch(siblingIds[activeIndex + 1])}
className="rounded p-0.5 hover:bg-muted/60 disabled:opacity-30 disabled:cursor-default"
aria-label="Next variant"
>
<ChevronRight className="h-3 w-3" />
</button>
</div>
);
};

View File

@ -1,5 +1,5 @@
import { getMetaShortcutLabel, isMetaKeyPressed } from "@client/lib/meta-key";
import { Eraser, RefreshCcw, Send, Square } from "lucide-react";
import { Eraser, Send, Square } from "lucide-react";
import type React from "react";
import { useState } from "react";
import { Button } from "@/components/ui/button";
@ -8,9 +8,7 @@ import { Textarea } from "@/components/ui/textarea";
type ComposerProps = {
disabled?: boolean;
isStreaming: boolean;
canRegenerate: boolean;
canReset: boolean;
onRegenerate: () => Promise<void>;
onStop: () => Promise<void>;
onSend: (content: string) => Promise<void>;
onReset: () => void;
@ -19,9 +17,7 @@ type ComposerProps = {
export const Composer: React.FC<ComposerProps> = ({
disabled,
isStreaming,
canRegenerate,
canReset,
onRegenerate,
onStop,
onSend,
onReset,
@ -67,7 +63,7 @@ export const Composer: React.FC<ComposerProps> = ({
<Eraser className="h-3.5 w-3.5" />
</Button>
{isStreaming ? (
{isStreaming && (
<Button
size="icon"
variant="outline"
@ -77,17 +73,6 @@ export const Composer: React.FC<ComposerProps> = ({
>
<Square className="h-3.5 w-3.5" />
</Button>
) : (
<Button
size="icon"
variant="outline"
onClick={() => void onRegenerate()}
disabled={disabled || !canRegenerate}
aria-label="Regenerate response"
title="Regenerate response"
>
<RefreshCcw className="h-3.5 w-3.5" />
</Button>
)}
<Button

View File

@ -1,5 +1,10 @@
import * as api from "@client/api";
import type { Job, JobChatMessage, JobChatStreamEvent } from "@shared/types";
import type {
BranchInfo,
Job,
JobChatMessage,
JobChatStreamEvent,
} from "@shared/types";
import type React from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
@ -22,6 +27,7 @@ type GhostwriterPanelProps = {
export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
const [messages, setMessages] = useState<JobChatMessage[]>([]);
const [branches, setBranches] = useState<BranchInfo[]>([]);
const [isLoading, setIsLoading] = useState(true);
const [isStreaming, setIsStreaming] = useState(false);
const [streamingMessageId, setStreamingMessageId] = useState<string | null>(
@ -49,6 +55,7 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
limit: 300,
});
setMessages(data.messages);
setBranches(data.branches);
}, [job.id]);
const load = useCallback(async () => {
@ -94,6 +101,8 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
tokensOut: null,
version: 1,
replacesMessageId: null,
parentMessageId: null,
activeChildId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
@ -158,6 +167,8 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
tokensOut: null,
version: 1,
replacesMessageId: null,
parentMessageId: null,
activeChildId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
@ -209,43 +220,120 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
}
}, [activeRunId, job.id, loadMessages]);
const canRegenerate = useMemo(() => {
if (isStreaming || messages.length === 0) return false;
const last = messages[messages.length - 1];
return last.role === "assistant";
}, [isStreaming, messages]);
const regenerate = useCallback(
async (assistantMessageId: string) => {
if (isStreaming) return;
const regenerate = useCallback(async () => {
if (isStreaming || messages.length === 0) return;
const last = messages[messages.length - 1];
if (last.role !== "assistant") return;
// Remove messages below the branch point (everything after the regenerated message disappears)
setMessages((current) => {
const targetIndex = current.findIndex(
(m) => m.id === assistantMessageId,
);
if (targetIndex === -1) return current;
return current.slice(0, targetIndex);
});
setIsStreaming(true);
const controller = new AbortController();
streamAbortRef.current = controller;
setIsStreaming(true);
const controller = new AbortController();
streamAbortRef.current = controller;
try {
await api.streamRegenerateJobGhostwriterMessage(
job.id,
last.id,
{ signal: controller.signal },
{ onEvent: onStreamEvent },
);
await loadMessages();
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
return;
try {
await api.streamRegenerateJobGhostwriterMessage(
job.id,
assistantMessageId,
{ signal: controller.signal },
{ onEvent: onStreamEvent },
);
await loadMessages();
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
return;
}
const message =
error instanceof Error
? error.message
: "Failed to regenerate response";
toast.error(message);
} finally {
streamAbortRef.current = null;
setIsStreaming(false);
}
const message =
error instanceof Error
? error.message
: "Failed to regenerate response";
toast.error(message);
} finally {
streamAbortRef.current = null;
setIsStreaming(false);
}
}, [isStreaming, job.id, loadMessages, messages, onStreamEvent]);
},
[isStreaming, job.id, loadMessages, onStreamEvent],
);
const editMessage = useCallback(
async (messageId: string, content: string) => {
if (isStreaming) return;
// Remove the edited message and everything below it (old branch disappears)
setMessages((current) => {
const targetIndex = current.findIndex((m) => m.id === messageId);
if (targetIndex === -1) return current;
// Keep everything before the edited message, add an optimistic new user message
const before = current.slice(0, targetIndex);
return [
...before,
{
id: `tmp-edit-${Date.now()}`,
threadId: current[0]?.threadId || "pending-thread",
jobId: job.id,
role: "user" as const,
content,
status: "complete" as const,
tokensIn: null,
tokensOut: null,
version: 1,
replacesMessageId: null,
parentMessageId: null,
activeChildId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
];
});
setIsStreaming(true);
const controller = new AbortController();
streamAbortRef.current = controller;
try {
await api.editJobGhostwriterMessage(
job.id,
messageId,
{ content, signal: controller.signal },
{ onEvent: onStreamEvent },
);
await loadMessages();
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
return;
}
const message =
error instanceof Error ? error.message : "Failed to edit message";
toast.error(message);
} finally {
streamAbortRef.current = null;
setIsStreaming(false);
}
},
[isStreaming, job.id, loadMessages, onStreamEvent],
);
const switchBranch = useCallback(
async (messageId: string) => {
try {
const result = await api.switchJobGhostwriterBranch(job.id, messageId);
setMessages(result.messages);
setBranches(result.branches);
} catch (error) {
const message =
error instanceof Error ? error.message : "Failed to switch branch";
toast.error(message);
}
},
[job.id],
);
const canReset = useMemo(() => {
return !isStreaming && messages.length > 0;
@ -255,6 +343,7 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
try {
await api.resetJobGhostwriterConversation(job.id);
setMessages([]);
setBranches([]);
toast.success("Conversation cleared");
} catch (error) {
const message =
@ -283,8 +372,12 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
) : (
<MessageList
messages={messages}
branches={branches}
isStreaming={isStreaming}
streamingMessageId={streamingMessageId}
onRegenerate={regenerate}
onEdit={editMessage}
onSwitchBranch={switchBranch}
/>
)}
</div>
@ -293,9 +386,7 @@ export const GhostwriterPanel: React.FC<GhostwriterPanelProps> = ({ job }) => {
<Composer
disabled={isLoading || isStreaming}
isStreaming={isStreaming}
canRegenerate={canRegenerate}
canReset={canReset}
onRegenerate={regenerate}
onStop={stopStreaming}
onSend={sendMessage}
onReset={() => setIsResetDialogOpen(true)}

View File

@ -1,20 +1,59 @@
import type { JobChatMessage } from "@shared/types";
import type { BranchInfo, JobChatMessage } from "@shared/types";
import { Pencil, RefreshCcw } from "lucide-react";
import type React from "react";
import { useState } from "react";
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { Button } from "@/components/ui/button";
import { Textarea } from "@/components/ui/textarea";
import { BranchNavigator } from "./BranchNavigator";
import { StreamingMessage } from "./StreamingMessage";
type MessageListProps = {
messages: JobChatMessage[];
branches: BranchInfo[];
isStreaming: boolean;
streamingMessageId: string | null;
onRegenerate: (messageId: string) => void;
onEdit: (messageId: string, content: string) => void;
onSwitchBranch: (messageId: string) => void;
};
export const MessageList: React.FC<MessageListProps> = ({
messages,
branches,
isStreaming,
streamingMessageId,
onRegenerate,
onEdit,
onSwitchBranch,
}) => {
const [editingMessageId, setEditingMessageId] = useState<string | null>(null);
const [editContent, setEditContent] = useState("");
const branchMap = new Map<string, BranchInfo>();
for (const branch of branches) {
branchMap.set(branch.messageId, branch);
}
const startEditing = (message: JobChatMessage) => {
setEditingMessageId(message.id);
setEditContent(message.content);
};
const cancelEditing = () => {
setEditingMessageId(null);
setEditContent("");
};
const submitEdit = (messageId: string) => {
const content = editContent.trim();
if (!content) return;
onEdit(messageId, content);
setEditingMessageId(null);
setEditContent("");
};
return (
<div className="space-y-3">
{messages.length > 0 &&
@ -24,22 +63,81 @@ export const MessageList: React.FC<MessageListProps> = ({
isStreaming &&
message.role === "assistant" &&
streamingMessageId === message.id;
const isEditing = editingMessageId === message.id;
const branch = branchMap.get(message.id);
return (
<div
key={message.id}
className={`rounded-lg border p-3 ${
className={`group rounded-lg border p-3 ${
isUser
? "border-primary/30 bg-primary/5"
: "border-border/60 bg-background"
}`}
>
<div className="mb-1 text-[10px] uppercase tracking-wide text-muted-foreground">
{isUser
? "You"
: `Ghostwriter${message.version > 1 ? ` v${message.version}` : ""}`}
<div className="mb-1 flex items-center gap-2">
<span className="text-[10px] uppercase tracking-wide text-muted-foreground">
{isUser ? "You" : "Ghostwriter"}
</span>
{branch && (
<BranchNavigator
branchInfo={branch}
onSwitch={onSwitchBranch}
/>
)}
<div className="ml-auto flex items-center gap-1 opacity-0 transition-opacity group-hover:opacity-100">
{isUser && !isStreaming && !isEditing && (
<button
type="button"
onClick={() => startEditing(message)}
className="rounded p-1 text-muted-foreground hover:bg-muted/60 hover:text-foreground"
aria-label="Edit message"
title="Edit message"
>
<Pencil className="h-3 w-3" />
</button>
)}
{!isUser && !isStreaming && !isActiveStreaming && (
<button
type="button"
onClick={() => onRegenerate(message.id)}
className="rounded p-1 text-muted-foreground hover:bg-muted/60 hover:text-foreground"
aria-label="Regenerate response"
title="Regenerate response"
>
<RefreshCcw className="h-3 w-3" />
</button>
)}
</div>
</div>
{isActiveStreaming ? (
{isEditing ? (
<div className="space-y-2">
<Textarea
value={editContent}
onChange={(e) => setEditContent(e.target.value)}
onKeyDown={(e) => {
if (e.key === "Escape") {
cancelEditing();
}
}}
className="min-h-[60px]"
autoFocus
/>
<div className="flex items-center justify-end gap-1">
<Button size="sm" variant="ghost" onClick={cancelEditing}>
Cancel
</Button>
<Button
size="sm"
onClick={() => submitEdit(message.id)}
disabled={!editContent.trim()}
>
Submit
</Button>
</div>
</div>
) : isActiveStreaming ? (
<StreamingMessage content={message.content} />
) : message.role === "assistant" ? (
<div className="text-sm leading-relaxed text-foreground [&_a]:text-primary [&_a]:underline [&_blockquote]:border-l [&_blockquote]:border-border [&_blockquote]:pl-3 [&_code]:rounded [&_code]:bg-muted/40 [&_code]:px-1 [&_h1]:mt-4 [&_h1]:text-base [&_h1]:font-semibold [&_h2]:mt-4 [&_h2]:text-sm [&_h2]:font-semibold [&_h3]:mt-3 [&_h3]:text-sm [&_h3]:font-semibold [&_li]:my-1 [&_ol]:my-2 [&_ol]:list-decimal [&_ol]:pl-5 [&_p]:my-2 [&_pre]:my-3 [&_pre]:overflow-x-auto [&_pre]:rounded [&_pre]:bg-muted/40 [&_pre]:p-3 [&_ul]:my-2 [&_ul]:list-disc [&_ul]:pl-5">

View File

@ -2,6 +2,19 @@ import type { Server } from "node:http";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { startServer, stopServer } from "./test-utils";
const baseMsgFields = {
threadId: "thread-1",
jobId: "job-1",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
parentMessageId: null,
activeChildId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
vi.mock("@server/services/ghostwriter", () => ({
listThreads: vi.fn(async () => [
{
@ -23,97 +36,63 @@ vi.mock("@server/services/ghostwriter", () => ({
lastMessageAt: null,
}),
),
listMessages: vi.fn(async () => [
{
id: "message-1",
threadId: "thread-1",
jobId: "job-1",
role: "user",
content: "hello",
status: "complete",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]),
listMessagesForJob: vi.fn(async () => [
{
id: "message-1",
threadId: "thread-1",
jobId: "job-1",
role: "user",
content: "hello",
status: "complete",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]),
listMessages: vi.fn(async () => ({
messages: [
{
id: "message-1",
...baseMsgFields,
role: "user",
content: "hello",
status: "complete",
},
],
branches: [],
})),
listMessagesForJob: vi.fn(async () => ({
messages: [
{
id: "message-1",
...baseMsgFields,
role: "user",
content: "hello",
status: "complete",
},
],
branches: [],
})),
sendMessage: vi.fn(async () => ({
userMessage: {
id: "user-1",
threadId: "thread-1",
jobId: "job-1",
...baseMsgFields,
role: "user",
content: "hello",
status: "complete",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
assistantMessage: {
id: "assistant-1",
threadId: "thread-1",
jobId: "job-1",
...baseMsgFields,
role: "assistant",
content: "hi",
status: "complete",
tokensIn: 1,
tokensOut: 1,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
runId: "run-1",
})),
sendMessageForJob: vi.fn(async () => ({
userMessage: {
id: "user-1",
threadId: "thread-1",
jobId: "job-1",
...baseMsgFields,
role: "user",
content: "hello",
status: "complete",
tokensIn: 1,
tokensOut: null,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
assistantMessage: {
id: "assistant-1",
threadId: "thread-1",
jobId: "job-1",
...baseMsgFields,
role: "assistant",
content: "hi",
status: "complete",
tokensIn: 1,
tokensOut: 1,
version: 1,
replacesMessageId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
runId: "run-1",
})),
@ -122,19 +101,47 @@ vi.mock("@server/services/ghostwriter", () => ({
runId: "run-2",
assistantMessage: {
id: "assistant-2",
threadId: "thread-1",
jobId: "job-1",
...baseMsgFields,
role: "assistant",
content: "updated",
status: "complete",
tokensIn: 1,
tokensOut: 1,
version: 2,
replacesMessageId: "assistant-1",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
parentMessageId: "user-1",
},
})),
editMessageForJob: vi.fn(async () => ({
userMessage: {
id: "user-2",
...baseMsgFields,
role: "user",
content: "edited",
status: "complete",
},
assistantMessage: {
id: "assistant-3",
...baseMsgFields,
role: "assistant",
content: "reply to edit",
status: "complete",
tokensOut: 3,
parentMessageId: "user-2",
},
runId: "run-3",
})),
switchBranchForJob: vi.fn(async () => ({
messages: [
{
id: "message-1",
...baseMsgFields,
role: "user",
content: "hello",
status: "complete",
},
],
branches: [],
})),
}));
describe.sequential("Ghostwriter API", () => {
@ -151,7 +158,7 @@ describe.sequential("Ghostwriter API", () => {
await stopServer({ server, closeDb, tempDir });
});
it("lists messages with request id metadata", async () => {
it("lists messages with request id metadata and branch info", async () => {
const res = await fetch(`${baseUrl}/api/jobs/job-1/chat/messages`, {
headers: {
"x-request-id": "chat-req-1",
@ -163,6 +170,7 @@ describe.sequential("Ghostwriter API", () => {
expect(res.headers.get("x-request-id")).toBe("chat-req-1");
expect(body.ok).toBe(true);
expect(body.data.messages.length).toBe(1);
expect(body.data.branches).toEqual([]);
expect(body.meta.requestId).toBe("chat-req-1");
});
@ -182,4 +190,38 @@ describe.sequential("Ghostwriter API", () => {
expect(messageBody.data.assistantMessage.role).toBe("assistant");
expect(typeof messageBody.meta.requestId).toBe("string");
});
it("edits a user message", async () => {
const res = await fetch(
`${baseUrl}/api/jobs/job-1/chat/messages/user-1/edit`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ content: "edited content" }),
},
);
const body = await res.json();
expect(res.status).toBe(200);
expect(body.ok).toBe(true);
expect(body.data.runId).toBe("run-3");
expect(body.data.userMessage.content).toBe("edited");
});
it("switches branch", async () => {
const res = await fetch(
`${baseUrl}/api/jobs/job-1/chat/messages/message-1/switch-branch`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({}),
},
);
const body = await res.json();
expect(res.status).toBe(200);
expect(body.ok).toBe(true);
expect(body.data.messages.length).toBe(1);
expect(body.data.branches).toEqual([]);
});
});

View File

@ -26,6 +26,11 @@ const regenerateSchema = z.object({
stream: z.boolean().optional(),
});
const editMessageSchema = z.object({
content: z.string().trim().min(1).max(20000),
stream: z.boolean().optional(),
});
function getJobId(req: Request): string {
const jobId = req.params.id;
if (!jobId) {
@ -47,12 +52,12 @@ ghostwriterRouter.get(
}
await runWithRequestContext({ jobId }, async () => {
const messages = await ghostwriterService.listMessagesForJob({
const result = await ghostwriterService.listMessagesForJob({
jobId,
limit: parsed.data.limit,
offset: parsed.data.offset,
});
ok(res, { messages });
ok(res, { messages: result.messages, branches: result.branches });
});
}),
);
@ -259,6 +264,123 @@ ghostwriterRouter.post(
}),
);
ghostwriterRouter.post(
"/messages/:messageId/edit",
asyncRoute(async (req, res) => {
const jobId = getJobId(req);
const messageId = req.params.messageId;
if (!messageId) {
return fail(res, badRequest("Missing message id"));
}
const parsed = editMessageSchema.safeParse(req.body);
if (!parsed.success) {
return fail(
res,
badRequest(parsed.error.message, parsed.error.flatten()),
);
}
await runWithRequestContext({ jobId }, async () => {
if (parsed.data.stream) {
setupSse(res, {
cacheControl: "no-cache, no-transform",
flushHeaders: true,
});
try {
await ghostwriterService.editMessageForJob({
jobId,
messageId,
content: parsed.data.content,
stream: {
onReady: ({ runId, threadId, messageId, requestId }) =>
writeSseData(res, {
type: "ready",
runId,
threadId,
messageId,
requestId,
}),
onDelta: ({ runId, messageId, delta }) =>
writeSseData(res, {
type: "delta",
runId,
messageId,
delta,
}),
onCompleted: ({ runId, message }) =>
writeSseData(res, {
type: "completed",
runId,
message,
}),
onCancelled: ({ runId, message }) =>
writeSseData(res, {
type: "cancelled",
runId,
message,
}),
onError: ({ runId, code, message, requestId }) =>
writeSseData(res, {
type: "error",
runId,
code,
message,
requestId,
}),
},
});
} catch (error) {
const appError = toAppError(error);
writeSseData(res, {
type: "error",
code: appError.code,
message: appError.message,
requestId: res.getHeader("x-request-id") || "unknown",
});
} finally {
res.end();
}
return;
}
const result = await ghostwriterService.editMessageForJob({
jobId,
messageId,
content: parsed.data.content,
});
ok(res, {
userMessage: result.userMessage,
assistantMessage: result.assistantMessage,
runId: result.runId,
});
});
}),
);
ghostwriterRouter.post(
"/messages/:messageId/switch-branch",
asyncRoute(async (req, res) => {
const jobId = getJobId(req);
const messageId = req.params.messageId;
if (!messageId) {
return fail(res, badRequest("Missing message id"));
}
await runWithRequestContext({ jobId }, async () => {
const result = await ghostwriterService.switchBranchForJob({
jobId,
messageId,
});
ok(res, { messages: result.messages, branches: result.branches });
});
}),
);
ghostwriterRouter.post(
"/reset",
asyncRoute(async (req, res) => {
@ -326,13 +448,13 @@ ghostwriterRouter.get(
}
await runWithRequestContext({ jobId }, async () => {
const messages = await ghostwriterService.listMessages({
const result = await ghostwriterService.listMessages({
jobId,
threadId,
limit: parsed.data.limit,
offset: parsed.data.offset,
});
ok(res, { messages });
ok(res, { messages: result.messages, branches: result.branches });
});
}),
);

View File

@ -103,6 +103,7 @@ const migrations = [
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
last_message_at TEXT,
active_root_message_id TEXT,
FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
)`,
@ -117,6 +118,8 @@ const migrations = [
tokens_out INTEGER,
version INTEGER NOT NULL DEFAULT 1,
replaces_message_id TEXT,
parent_message_id TEXT,
active_child_id TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (thread_id) REFERENCES job_chat_threads(id) ON DELETE CASCADE,
@ -576,6 +579,45 @@ const migrations = [
'closed'
)`,
// Branching conversations: add parent_message_id and active_child_id to job_chat_messages
`ALTER TABLE job_chat_messages ADD COLUMN parent_message_id TEXT`,
`ALTER TABLE job_chat_messages ADD COLUMN active_child_id TEXT`,
`ALTER TABLE job_chat_threads ADD COLUMN active_root_message_id TEXT`,
// Backfill: link existing messages into a linear chain (each message's parent = its predecessor)
`UPDATE job_chat_messages
SET parent_message_id = (
SELECT prev.id
FROM job_chat_messages prev
WHERE prev.thread_id = job_chat_messages.thread_id
AND prev.created_at < job_chat_messages.created_at
ORDER BY prev.created_at DESC
LIMIT 1
)
WHERE parent_message_id IS NULL`,
// Backfill: for regenerated messages, re-link as siblings (same parent as the message they replaced)
`UPDATE job_chat_messages
SET parent_message_id = (
SELECT orig.parent_message_id
FROM job_chat_messages orig
WHERE orig.id = job_chat_messages.replaces_message_id
)
WHERE replaces_message_id IS NOT NULL`,
// Backfill: set active_child_id on every parent to its newest child
`UPDATE job_chat_messages
SET active_child_id = (
SELECT child.id
FROM job_chat_messages child
WHERE child.parent_message_id = job_chat_messages.id
ORDER BY child.created_at DESC
LIMIT 1
)
WHERE id IN (SELECT DISTINCT parent_message_id FROM job_chat_messages WHERE parent_message_id IS NOT NULL)`,
`CREATE INDEX IF NOT EXISTS idx_job_chat_messages_parent ON job_chat_messages(parent_message_id)`,
// Backfill: Mark closed applications from latest stage event.
`UPDATE jobs
SET
@ -621,7 +663,13 @@ for (const migration of migrations) {
.includes("alter table post_application_messages add column") ||
migration
.toLowerCase()
.includes("alter table stage_events add column")) &&
.includes("alter table stage_events add column") ||
migration
.toLowerCase()
.includes("alter table job_chat_messages add column") ||
migration
.toLowerCase()
.includes("alter table job_chat_threads add column")) &&
message.toLowerCase().includes("duplicate column name");
if (isDuplicateColumn) {

View File

@ -176,6 +176,7 @@ export const jobChatThreads = sqliteTable(
createdAt: text("created_at").notNull().default(sql`(datetime('now'))`),
updatedAt: text("updated_at").notNull().default(sql`(datetime('now'))`),
lastMessageAt: text("last_message_at"),
activeRootMessageId: text("active_root_message_id"),
},
(table) => ({
jobUpdatedIndex: index("idx_job_chat_threads_job_updated").on(
@ -204,6 +205,8 @@ export const jobChatMessages = sqliteTable(
tokensOut: integer("tokens_out"),
version: integer("version").notNull().default(1),
replacesMessageId: text("replaces_message_id"),
parentMessageId: text("parent_message_id"),
activeChildId: text("active_child_id"),
createdAt: text("created_at").notNull().default(sql`(datetime('now'))`),
updatedAt: text("updated_at").notNull().default(sql`(datetime('now'))`),
},

View File

@ -20,6 +20,7 @@ function mapThread(row: typeof jobChatThreads.$inferSelect): JobChatThread {
createdAt: row.createdAt,
updatedAt: row.updatedAt,
lastMessageAt: row.lastMessageAt,
activeRootMessageId: row.activeRootMessageId,
};
}
@ -35,6 +36,8 @@ function mapMessage(row: typeof jobChatMessages.$inferSelect): JobChatMessage {
tokensOut: row.tokensOut,
version: row.version,
replacesMessageId: row.replacesMessageId,
parentMessageId: row.parentMessageId,
activeChildId: row.activeChildId,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
};
@ -182,6 +185,7 @@ export async function createMessage(input: {
tokensOut?: number | null;
version?: number;
replacesMessageId?: string | null;
parentMessageId?: string | null;
}): Promise<JobChatMessage> {
const id = randomUUID();
const now = new Date().toISOString();
@ -197,6 +201,7 @@ export async function createMessage(input: {
tokensOut: input.tokensOut ?? null,
version: input.version ?? 1,
replacesMessageId: input.replacesMessageId ?? null,
parentMessageId: input.parentMessageId ?? null,
createdAt: now,
updatedAt: now,
});
@ -362,6 +367,188 @@ export async function deleteAllRunsForThread(
return result.changes;
}
/**
* Set the active root message for a thread (for branch navigation of root messages).
*/
export async function setActiveRoot(
threadId: string,
messageId: string,
): Promise<void> {
const now = new Date().toISOString();
await db
.update(jobChatThreads)
.set({ activeRootMessageId: messageId, updatedAt: now })
.where(eq(jobChatThreads.id, threadId));
}
/**
* Set the active child pointer on a parent message (for branch navigation).
*/
export async function setActiveChild(
messageId: string,
activeChildId: string,
): Promise<void> {
const now = new Date().toISOString();
await db
.update(jobChatMessages)
.set({ activeChildId, updatedAt: now })
.where(eq(jobChatMessages.id, messageId));
}
/**
* Get all children of a message, ordered by createdAt.
*/
export async function getChildrenOfMessage(
parentMessageId: string,
): Promise<JobChatMessage[]> {
const rows = await db
.select()
.from(jobChatMessages)
.where(eq(jobChatMessages.parentMessageId, parentMessageId))
.orderBy(jobChatMessages.createdAt);
return rows.map(mapMessage);
}
/**
* Get siblings of a message (all children of the same parent) and which index is active.
*/
export async function getSiblingsOf(
messageId: string,
): Promise<{ siblings: JobChatMessage[]; activeIndex: number }> {
const message = await getMessageById(messageId);
if (!message) {
return { siblings: [], activeIndex: 0 };
}
// Root messages: siblings are all root messages in the same thread
if (!message.parentMessageId) {
const allInThread = await db
.select()
.from(jobChatMessages)
.where(
and(
eq(jobChatMessages.threadId, message.threadId),
eq(jobChatMessages.role, message.role),
),
)
.orderBy(jobChatMessages.createdAt);
const rootSiblings = allInThread
.map(mapMessage)
.filter((m) => !m.parentMessageId);
if (rootSiblings.length <= 1) {
return { siblings: rootSiblings, activeIndex: 0 };
}
// Active root determined by thread's activeRootMessageId
const thread = await getThreadById(message.threadId);
const activeId = thread?.activeRootMessageId ?? messageId;
const activeIndex = Math.max(
0,
rootSiblings.findIndex((s) => s.id === activeId),
);
return { siblings: rootSiblings, activeIndex };
}
const parent = await getMessageById(message.parentMessageId);
const siblings = await getChildrenOfMessage(message.parentMessageId);
// The active child is determined by the parent's activeChildId pointer
const activeId = parent?.activeChildId ?? messageId;
const activeIndex = Math.max(
0,
siblings.findIndex((s) => s.id === activeId),
);
return { siblings, activeIndex };
}
/**
* Walk the tree from root to leaf following activeChildId pointers.
* Returns the "active path" the conversation the user currently sees.
*/
export async function getActivePathFromRoot(
threadId: string,
): Promise<JobChatMessage[]> {
// Load all messages for this thread into memory (fine for typical sizes)
const allRows = await db
.select()
.from(jobChatMessages)
.where(eq(jobChatMessages.threadId, threadId))
.orderBy(jobChatMessages.createdAt);
const all = allRows.map(mapMessage);
if (all.length === 0) return [];
// Build lookup maps
const byId = new Map<string, JobChatMessage>();
const childrenOf = new Map<string, JobChatMessage[]>();
for (const msg of all) {
byId.set(msg.id, msg);
const parentId = msg.parentMessageId;
if (parentId) {
const existing = childrenOf.get(parentId) ?? [];
existing.push(msg);
childrenOf.set(parentId, existing);
}
}
// Find root(s) — messages with no parent
const roots = all.filter((m) => !m.parentMessageId);
if (roots.length === 0) {
// Fallback for legacy data without parentMessageId backfill
return all;
}
// Pick the active root: use thread's activeRootMessageId, fall back to newest
const thread = await getThreadById(threadId);
const preferredRootId = thread?.activeRootMessageId;
const activeRoot = preferredRootId
? roots.find((r) => r.id === preferredRootId)
: undefined;
// Walk from root following activeChildId, falling back to newest child
const path: JobChatMessage[] = [];
let currentMsg: JobChatMessage | undefined =
activeRoot ?? roots[roots.length - 1];
while (currentMsg) {
path.push(currentMsg);
const children = childrenOf.get(currentMsg.id);
if (!children || children.length === 0) break;
// Follow activeChildId if set, otherwise pick newest
const wantId: string | null = currentMsg.activeChildId;
const next: JobChatMessage | undefined = wantId
? children.find((c) => c.id === wantId)
: undefined;
currentMsg = next ?? children[children.length - 1];
}
return path;
}
/**
* Walk from a message up to the root via parentMessageId.
* Returns messages in chronological order (root first).
*/
export async function getAncestorPath(
messageId: string,
): Promise<JobChatMessage[]> {
const path: JobChatMessage[] = [];
let currentId: string | null = messageId;
while (currentId) {
const msg = await getMessageById(currentId);
if (!msg) break;
path.unshift(msg); // prepend — we're walking backwards
currentId = msg.parentMessageId;
}
return path;
}
export async function completeRunIfRunning(
runId: string,
input: {

View File

@ -18,6 +18,12 @@ const mocks = vi.hoisted(() => ({
getMessageById: vi.fn(),
getLatestAssistantMessage: vi.fn(),
getRunById: vi.fn(),
getActivePathFromRoot: vi.fn(),
getAncestorPath: vi.fn(),
setActiveChild: vi.fn(),
setActiveRoot: vi.fn(),
getSiblingsOf: vi.fn(),
getChildrenOfMessage: vi.fn(),
},
settings: {
getAllSettings: vi.fn(),
@ -57,6 +63,12 @@ vi.mock("../repositories/ghostwriter", () => ({
getMessageById: mocks.repo.getMessageById,
getLatestAssistantMessage: mocks.repo.getLatestAssistantMessage,
getRunById: mocks.repo.getRunById,
getActivePathFromRoot: mocks.repo.getActivePathFromRoot,
getAncestorPath: mocks.repo.getAncestorPath,
setActiveChild: mocks.repo.setActiveChild,
getSiblingsOf: mocks.repo.getSiblingsOf,
getChildrenOfMessage: mocks.repo.getChildrenOfMessage,
setActiveRoot: mocks.repo.setActiveRoot,
}));
vi.mock("./llm/service", () => ({
@ -80,6 +92,7 @@ const thread = {
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
lastMessageAt: null,
activeRootMessageId: "user-1",
};
const baseUserMessage: JobChatMessage = {
@ -93,6 +106,8 @@ const baseUserMessage: JobChatMessage = {
tokensOut: null,
version: 1,
replacesMessageId: null,
parentMessageId: null,
activeChildId: "assistant-1",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
@ -108,6 +123,8 @@ const baseAssistantMessage: JobChatMessage = {
tokensOut: 4,
version: 1,
replacesMessageId: null,
parentMessageId: "user-1",
activeChildId: null,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
@ -182,6 +199,20 @@ describe("ghostwriter service", () => {
status: "failed",
},
]);
mocks.repo.getActivePathFromRoot.mockResolvedValue([
baseUserMessage,
baseAssistantMessage,
]);
mocks.repo.getAncestorPath.mockResolvedValue([
baseUserMessage,
baseAssistantMessage,
]);
mocks.repo.setActiveChild.mockResolvedValue(undefined);
mocks.repo.setActiveRoot.mockResolvedValue(undefined);
mocks.repo.getSiblingsOf.mockResolvedValue({
siblings: [baseAssistantMessage],
activeIndex: 0,
});
mocks.llmCallJson.mockResolvedValue({
success: true,
data: { response: "Thanks for your question." },
@ -328,23 +359,43 @@ describe("ghostwriter service", () => {
expect(result.assistantMessage?.status).toBe("cancelled");
});
it("enforces regenerate only on latest assistant message", async () => {
mocks.repo.getMessageById.mockResolvedValue(baseAssistantMessage);
mocks.repo.getLatestAssistantMessage.mockResolvedValue({
it("regenerates any assistant message, not just the latest", async () => {
const assistantPartial: JobChatMessage = {
...baseAssistantMessage,
id: "assistant-latest",
id: "assistant-regen",
content: "",
status: "partial",
parentMessageId: "user-1",
};
const assistantComplete: JobChatMessage = {
...baseAssistantMessage,
id: "assistant-regen",
content: "Thanks for your question.",
status: "complete",
parentMessageId: "user-1",
};
mocks.repo.getMessageById
.mockResolvedValueOnce(baseAssistantMessage) // target lookup
.mockResolvedValueOnce(baseUserMessage) // parent user lookup
.mockResolvedValueOnce(assistantComplete); // final lookup after run
mocks.repo.getAncestorPath.mockResolvedValue([baseUserMessage]);
mocks.repo.createMessage.mockResolvedValueOnce(assistantPartial);
mocks.repo.updateMessage.mockResolvedValue(assistantComplete);
const result = await regenerateMessage({
jobId: "job-1",
threadId: "thread-1",
assistantMessageId: "assistant-1",
});
await expect(
regenerateMessage({
jobId: "job-1",
threadId: "thread-1",
assistantMessageId: "assistant-1",
}),
).rejects.toMatchObject({
code: "INVALID_REQUEST",
status: 400,
});
expect(result.runId).toBe("run-1");
expect(result.assistantMessage?.id).toBe("assistant-regen");
expect(mocks.repo.setActiveChild).toHaveBeenCalledWith(
"user-1",
"assistant-regen",
);
});
it("returns alreadyFinished when cancelling non-running run", async () => {

View File

@ -7,7 +7,7 @@ import {
} from "@infra/errors";
import { logger } from "@infra/logger";
import { getRequestId } from "@infra/request-context";
import type { JobChatMessage, JobChatRun } from "@shared/types";
import type { BranchInfo, JobChatMessage, JobChatRun } from "@shared/types";
import * as jobChatRepo from "../repositories/ghostwriter";
import * as settingsRepo from "../repositories/settings";
import { buildJobChatPromptContext } from "./ghostwriter-context";
@ -87,10 +87,13 @@ async function resolveLlmRuntimeSettings(): Promise<LlmRuntimeSettings> {
async function buildConversationMessages(
threadId: string,
targetMessageId?: string,
): Promise<Array<{ role: "user" | "assistant"; content: string }>> {
const messages = await jobChatRepo.listMessagesForThread(threadId, {
limit: 40,
});
// If a target message is given, walk its ancestor path (branch-aware).
// Otherwise, fall back to the active path from root.
const messages = targetMessageId
? await jobChatRepo.getAncestorPath(targetMessageId)
: await jobChatRepo.getActivePathFromRoot(threadId);
return messages
.filter(
@ -98,6 +101,7 @@ async function buildConversationMessages(
message.role === "user" || message.role === "assistant",
)
.filter((message) => message.status !== "failed")
.slice(-40)
.map((message) => ({
role: message.role,
content: message.content,
@ -110,6 +114,8 @@ type GenerateReplyOptions = {
prompt: string;
replaceMessageId?: string;
version?: number;
/** Parent message ID for the assistant reply (i.e. the user message that triggered it). */
parentMessageId?: string;
stream?: {
onReady: (payload: {
runId: string;
@ -158,33 +164,50 @@ export async function listThreads(jobId: string) {
return [thread];
}
async function buildBranchInfoForPath(
messages: JobChatMessage[],
): Promise<BranchInfo[]> {
const branches: BranchInfo[] = [];
for (const msg of messages) {
const { siblings, activeIndex } = await jobChatRepo.getSiblingsOf(msg.id);
if (siblings.length > 1) {
branches.push({
messageId: msg.id,
siblingIds: siblings.map((s) => s.id),
activeIndex,
});
}
}
return branches;
}
export async function listMessages(input: {
jobId: string;
threadId: string;
limit?: number;
offset?: number;
}) {
}): Promise<{ messages: JobChatMessage[]; branches: BranchInfo[] }> {
const thread = await jobChatRepo.getThreadForJob(input.jobId, input.threadId);
if (!thread) {
throw notFound("Thread not found for this job");
}
return jobChatRepo.listMessagesForThread(input.threadId, {
limit: input.limit,
offset: input.offset,
});
const messages = await jobChatRepo.getActivePathFromRoot(input.threadId);
const branches = await buildBranchInfoForPath(messages);
return { messages, branches };
}
export async function listMessagesForJob(input: {
jobId: string;
limit?: number;
offset?: number;
}) {
}): Promise<{ messages: JobChatMessage[]; branches: BranchInfo[] }> {
const thread = await ensureJobThread(input.jobId);
return jobChatRepo.listMessagesForThread(thread.id, {
limit: input.limit,
offset: input.offset,
});
const messages = await jobChatRepo.getActivePathFromRoot(thread.id);
const branches = await buildBranchInfoForPath(messages);
return { messages, branches };
}
async function runAssistantReply(
@ -206,7 +229,7 @@ async function runAssistantReply(
const [context, llmConfig, history] = await Promise.all([
buildJobChatPromptContext(options.jobId),
resolveLlmRuntimeSettings(),
buildConversationMessages(options.threadId),
buildConversationMessages(options.threadId, options.parentMessageId),
]);
const requestId = getRequestId() ?? "unknown";
@ -237,6 +260,7 @@ async function runAssistantReply(
status: "partial",
version: options.version ?? 1,
replacesMessageId: options.replaceMessageId ?? null,
parentMessageId: options.parentMessageId ?? null,
});
} catch (error) {
await jobChatRepo.completeRun(run.id, {
@ -424,6 +448,11 @@ export async function sendMessage(input: {
throw notFound("Thread not found for this job");
}
// Determine parent: last message on the current active path
const activePath = await jobChatRepo.getActivePathFromRoot(input.threadId);
const parentId =
activePath.length > 0 ? activePath[activePath.length - 1].id : null;
const userMessage = await jobChatRepo.createMessage({
threadId: input.threadId,
jobId: input.jobId,
@ -432,15 +461,28 @@ export async function sendMessage(input: {
status: "complete",
tokensIn: estimateTokenCount(content),
tokensOut: null,
parentMessageId: parentId,
});
// Update parent's activeChildId to point to this new user message
if (parentId) {
await jobChatRepo.setActiveChild(parentId, userMessage.id);
} else {
// First message in thread — set as active root
await jobChatRepo.setActiveRoot(input.threadId, userMessage.id);
}
const result = await runAssistantReply({
jobId: input.jobId,
threadId: input.threadId,
prompt: content,
parentMessageId: userMessage.id,
stream: input.stream,
});
// Update user message's activeChildId to point to the assistant reply
await jobChatRepo.setActiveChild(userMessage.id, result.messageId);
const assistantMessage = await jobChatRepo.getMessageById(result.messageId);
return {
userMessage,
@ -487,37 +529,49 @@ export async function regenerateMessage(input: {
throw badRequest("Only assistant messages can be regenerated");
}
const latestAssistant = await jobChatRepo.getLatestAssistantMessage(
input.threadId,
);
if (!latestAssistant || latestAssistant.id !== target.id) {
throw badRequest("Only the latest assistant message can be regenerated");
// Find the parent user message (the user message that prompted this assistant reply).
// With branching, the parent is stored directly in parentMessageId.
let parentUserMessage: JobChatMessage | null = null;
if (target.parentMessageId) {
parentUserMessage = await jobChatRepo.getMessageById(
target.parentMessageId,
);
}
const messages = await jobChatRepo.listMessagesForThread(input.threadId, {
limit: 200,
});
const targetIndex = messages.findIndex((message) => message.id === target.id);
const priorUser =
targetIndex > 0
? [...messages.slice(0, targetIndex)]
.reverse()
.find((message) => message.role === "user")
: null;
// Fallback for legacy messages without parentMessageId: walk backwards in time
if (!parentUserMessage || parentUserMessage.role !== "user") {
const messages = await jobChatRepo.listMessagesForThread(input.threadId, {
limit: 200,
});
const targetIndex = messages.findIndex(
(message) => message.id === target.id,
);
parentUserMessage =
targetIndex > 0
? ([...messages.slice(0, targetIndex)]
.reverse()
.find((message) => message.role === "user") ?? null)
: null;
}
if (!priorUser) {
if (!parentUserMessage) {
throw badRequest("Could not find a user message to regenerate from");
}
// Create a new sibling assistant message with the same parent (the user message)
const result = await runAssistantReply({
jobId: input.jobId,
threadId: input.threadId,
prompt: priorUser.content,
prompt: parentUserMessage.content,
replaceMessageId: target.id,
version: (target.version || 1) + 1,
parentMessageId: parentUserMessage.id,
stream: input.stream,
});
// Update parent's activeChildId to the new assistant message (switch to new branch)
await jobChatRepo.setActiveChild(parentUserMessage.id, result.messageId);
const assistantMessage = await jobChatRepo.getMessageById(result.messageId);
return {
@ -540,6 +594,138 @@ export async function regenerateMessageForJob(input: {
});
}
export async function editMessage(input: {
jobId: string;
threadId: string;
messageId: string;
content: string;
stream?: GenerateReplyOptions["stream"];
}) {
const content = input.content.trim();
if (!content) {
throw badRequest("Message content is required");
}
const thread = await jobChatRepo.getThreadForJob(input.jobId, input.threadId);
if (!thread) {
throw notFound("Thread not found for this job");
}
const target = await jobChatRepo.getMessageById(input.messageId);
if (
!target ||
target.threadId !== input.threadId ||
target.jobId !== input.jobId
) {
throw notFound("Message not found for this thread");
}
if (target.role !== "user") {
throw badRequest("Only user messages can be edited");
}
// Create a new sibling user message (same parent as the original)
const newUserMessage = await jobChatRepo.createMessage({
threadId: input.threadId,
jobId: input.jobId,
role: "user",
content,
status: "complete",
tokensIn: estimateTokenCount(content),
tokensOut: null,
parentMessageId: target.parentMessageId,
});
// Update the grandparent's activeChildId to point to the new user message
if (target.parentMessageId) {
await jobChatRepo.setActiveChild(target.parentMessageId, newUserMessage.id);
} else {
// Editing a root message — set the new message as active root
await jobChatRepo.setActiveRoot(input.threadId, newUserMessage.id);
}
// Generate assistant reply as a child of the new user message
const result = await runAssistantReply({
jobId: input.jobId,
threadId: input.threadId,
prompt: content,
parentMessageId: newUserMessage.id,
stream: input.stream,
});
// Update new user message's activeChildId to the assistant reply
await jobChatRepo.setActiveChild(newUserMessage.id, result.messageId);
const assistantMessage = await jobChatRepo.getMessageById(result.messageId);
return {
userMessage: newUserMessage,
assistantMessage,
runId: result.runId,
};
}
export async function editMessageForJob(input: {
jobId: string;
messageId: string;
content: string;
stream?: GenerateReplyOptions["stream"];
}) {
const thread = await ensureJobThread(input.jobId);
return editMessage({
jobId: input.jobId,
threadId: thread.id,
messageId: input.messageId,
content: input.content,
stream: input.stream,
});
}
export async function switchBranch(input: {
jobId: string;
threadId: string;
messageId: string;
}): Promise<{ messages: JobChatMessage[]; branches: BranchInfo[] }> {
const thread = await jobChatRepo.getThreadForJob(input.jobId, input.threadId);
if (!thread) {
throw notFound("Thread not found for this job");
}
const target = await jobChatRepo.getMessageById(input.messageId);
if (
!target ||
target.threadId !== input.threadId ||
target.jobId !== input.jobId
) {
throw notFound("Message not found for this thread");
}
if (target.parentMessageId) {
// Update the parent's activeChildId to point to this sibling
await jobChatRepo.setActiveChild(target.parentMessageId, target.id);
} else {
// Switching between root messages
await jobChatRepo.setActiveRoot(input.threadId, target.id);
}
// Return the updated active path
return listMessages({
jobId: input.jobId,
threadId: input.threadId,
});
}
export async function switchBranchForJob(input: {
jobId: string;
messageId: string;
}): Promise<{ messages: JobChatMessage[]; branches: BranchInfo[] }> {
const thread = await ensureJobThread(input.jobId);
return switchBranch({
jobId: input.jobId,
threadId: thread.id,
messageId: input.messageId,
});
}
export async function cancelRun(input: {
jobId: string;
threadId: string;

View File

@ -28,5 +28,8 @@
"knip": "^5.83.1",
"tsx": "^4.19.2",
"typescript": "^5.9.3"
},
"volta": {
"node": "22.22.1"
}
}

View File

@ -29,6 +29,7 @@ export interface JobChatThread {
createdAt: string;
updatedAt: string;
lastMessageAt: string | null;
activeRootMessageId: string | null;
}
export interface JobChatMessage {
@ -42,10 +43,21 @@ export interface JobChatMessage {
tokensOut: number | null;
version: number;
replacesMessageId: string | null;
parentMessageId: string | null;
activeChildId: string | null;
createdAt: string;
updatedAt: string;
}
export interface BranchInfo {
/** The message ID this branch info belongs to (the currently active sibling). */
messageId: string;
/** Ordered sibling IDs at this branch point (by createdAt). */
siblingIds: string[];
/** 0-based index of the active sibling within siblingIds. */
activeIndex: number;
}
export interface JobChatRun {
id: string;
threadId: string;