From d2e9d5911cc15901050ee7fa44e29b01bac3e353 Mon Sep 17 00:00:00 2001 From: h Date: Fri, 22 May 2026 00:18:39 +0200 Subject: [PATCH] feat: streaming via extension --- src/api.ts | 132 +++++++++++++++++++++++++++++++++++++ src/main.ts | 186 +++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 301 insertions(+), 17 deletions(-) diff --git a/src/api.ts b/src/api.ts index 55a1cea..a55be74 100644 --- a/src/api.ts +++ b/src/api.ts @@ -89,3 +89,135 @@ export async function sendChat( body: JSON.stringify(req), })) as ChatResponse; } + +export interface StreamChatCallbacks { + // Fires for every intermediate snapshot (full file content as the + // gateway would have written it, frontmatter included). Caller is + // responsible for splicing it into the editor — we don't ship the + // diff because the gateway already renders the canonical view and we + // don't want two slightly-different renderers to drift. + onDelta(newContent: string): void; + // Fires exactly once at end-of-turn (success or nothing_to_do). + onDone(response: ChatResponse): void; +} + +// Server-Sent Events arrive as ``event: \ndata: \n\n`` +// frames. We can't use ``requestUrl`` (it buffers the whole body), so +// SSE is the one place in the plugin that goes through native +// ``fetch``. CORS is allowed by the gateway's ``CORSMiddleware``; auth +// is the same bearer token as the other endpoints. +export async function sendChatStream( + settings: BeaverSettings, + req: ChatRequest, + cb: StreamChatCallbacks, + signal?: AbortSignal, +): Promise { + const url = `${baseUrl(settings)}/chat/stream`; + const headers: Record = { + ...authHeader(settings), + "Content-Type": "application/json", + Accept: "text/event-stream", + }; + const res = await fetch(url, { + method: "POST", + headers, + body: JSON.stringify(req), + signal, + }); + if (!res.ok || !res.body) { + // Drain the body so we can surface a useful detail. 409 in + // particular returns JSON; the rest may be JSON or plain text. + const text = await res.text().catch(() => ""); + let detail: unknown = text; + try { + const parsed = JSON.parse(text) as { detail?: unknown }; + detail = + parsed && typeof parsed === "object" && "detail" in parsed + ? parsed.detail + : parsed; + } catch { + // not JSON; leave detail as the raw text + } + const msg = + typeof detail === "string" && detail + ? detail + : `HTTP ${res.status}`; + throw new BeaverApiError(res.status, msg, detail); + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + // SSE frames are separated by a blank line (``\n\n``). We buffer + // partial frames across reads, then flush full ones in order. + let buf = ""; + let done = false; + while (!done) { + const chunk = await reader.read(); + done = chunk.done; + if (chunk.value) buf += decoder.decode(chunk.value, { stream: !done }); + let sep = buf.indexOf("\n\n"); + while (sep >= 0) { + const frame = buf.slice(0, sep); + buf = buf.slice(sep + 2); + const handled = handleSseFrame(frame, cb); + if (handled === "stop") { + // ``done``/``error`` is terminal — stop reading even if the + // server sends extra padding before closing. + try { + await reader.cancel(); + } catch { + // best-effort cleanup + } + return; + } + sep = buf.indexOf("\n\n"); + } + } +} + +function handleSseFrame( + frame: string, + cb: StreamChatCallbacks, +): "continue" | "stop" { + // SSE lines: ``event: `` / ``data: ``. ``data`` may span + // multiple lines (concatenated with ``\n``) per the spec; we honour + // that even though the gateway emits single-line ``data:`` today. + let event = "message"; + const dataLines: string[] = []; + for (const rawLine of frame.split("\n")) { + const line = rawLine.replace(/\r$/, ""); + if (!line || line.startsWith(":")) continue; + const colon = line.indexOf(":"); + if (colon < 0) continue; + const field = line.slice(0, colon); + let value = line.slice(colon + 1); + if (value.startsWith(" ")) value = value.slice(1); + if (field === "event") event = value; + else if (field === "data") dataLines.push(value); + } + if (dataLines.length === 0) return "continue"; + let data: unknown; + try { + data = JSON.parse(dataLines.join("\n")); + } catch { + return "continue"; + } + const obj = data as Record; + if (event === "delta") { + const content = obj.new_content; + if (typeof content === "string") cb.onDelta(content); + return "continue"; + } + if (event === "done") { + cb.onDone(obj as unknown as ChatResponse); + return "stop"; + } + if (event === "error") { + const code = + typeof obj.status_code === "number" ? obj.status_code : 500; + const detail = + typeof obj.detail === "string" ? obj.detail : `HTTP ${code}`; + throw new BeaverApiError(code, detail, obj); + } + return "continue"; +} diff --git a/src/main.ts b/src/main.ts index eed0d8f..e7d27ec 100644 --- a/src/main.ts +++ b/src/main.ts @@ -9,7 +9,7 @@ import { BeaverApiError, ChatResponse, listAgents, - sendChat, + sendChatStream, } from "./api"; import { pickAgent } from "./agentPicker"; import { @@ -20,6 +20,135 @@ import { const AGENT_CACHE_TTL_MS = 5 * 60 * 1000; +// Obsidian's ``Editor`` is a thin shim over a CodeMirror 6 +// ``EditorView``; reaching for it directly is the only way to grab +// the real scrollable DOM node (``.cm-scroller``). Property name is +// unofficial but stable across recent Obsidian releases. +interface CMHandle { + scrollDOM: HTMLElement; +} +function cmOf(editor: Editor): CMHandle | null { + const cm = (editor as unknown as { cm?: CMHandle }).cm; + return cm && cm.scrollDOM instanceof HTMLElement ? cm : null; +} + +// Boundary of the YAML frontmatter block. ``---\n…---\n`` at the very +// start of the file; anything else is a no-frontmatter file and we +// return 0. +function frontmatterEnd(s: string): number { + if (!s.startsWith("---\n")) return 0; + const idx = s.indexOf("\n---\n", 4); + return idx >= 0 ? idx + 5 : 0; +} + +// Replace only the regions that actually changed instead of +// ``editor.setValue`` (which resets cursor + scroll and is expensive +// on every snapshot for long files). +// +// Fast path: when the body after the frontmatter is identical except +// for a tail append (the common case for the final write — gateway +// refreshes frontmatter at the top and appends USER_SCAFFOLD at the +// bottom, body in between is unchanged), we apply TWO small edits +// instead of one huge one. The frontmatter rewrite stays a small, +// localised change; the tail append touches only the very last bytes. +// A single big ``replaceRange`` spanning the frontmatter triggers +// Obsidian's Properties-widget rebuild + an asynchronous scroll-to- +// start-of-content that's hard to override. +// +// Slow path: prefix/suffix-trimmed single splice for any other shape. +// +// After every edit we restore scroll on the scrollDOM directly, +// repeatedly across several frames, because the widget rebuild keeps +// resetting scroll asynchronously and a single restore loses the +// race. +function spliceIntoEditor(editor: Editor, newContent: string): void { + const current = editor.getValue(); + if (current === newContent) return; + const cm = cmOf(editor); + const savedScrollTop = cm?.scrollDOM.scrollTop ?? null; + + const oldFmEnd = frontmatterEnd(current); + const newFmEnd = frontmatterEnd(newContent); + const oldBody = current.slice(oldFmEnd); + const newBody = newContent.slice(newFmEnd); + const fmsDiffer = + current.slice(0, oldFmEnd) !== newContent.slice(0, newFmEnd); + + if (newBody.startsWith(oldBody) && (fmsDiffer || newBody.length > oldBody.length)) { + // Tail append first, then (maybe) frontmatter rewrite. Order + // matters: appending at the current end keeps the frontmatter + // splice positions valid. Reversing the order would invalidate + // the append offset after the frontmatter grew. + if (newBody.length > oldBody.length) { + const appendStart = editor.offsetToPos(current.length); + editor.replaceRange(newBody.slice(oldBody.length), appendStart); + } + if (fmsDiffer) { + const fmStart = editor.offsetToPos(0); + const fmEnd = editor.offsetToPos(oldFmEnd); + editor.replaceRange(newContent.slice(0, newFmEnd), fmStart, fmEnd); + } + } else if (oldBody.startsWith(newBody) && (fmsDiffer || oldBody.length > newBody.length)) { + // Symmetric case: tail truncation. Unlikely on the streaming + // path but cheap to handle. + if (oldBody.length > newBody.length) { + const truncStart = editor.offsetToPos(oldFmEnd + newBody.length); + const truncEnd = editor.offsetToPos(current.length); + editor.replaceRange("", truncStart, truncEnd); + } + if (fmsDiffer) { + const fmStart = editor.offsetToPos(0); + const fmEnd = editor.offsetToPos(oldFmEnd); + editor.replaceRange(newContent.slice(0, newFmEnd), fmStart, fmEnd); + } + } else { + // Generic single-splice fallback: trim common prefix + suffix. + let prefix = 0; + const cap = Math.min(current.length, newContent.length); + while ( + prefix < cap && + current.charCodeAt(prefix) === newContent.charCodeAt(prefix) + ) { + prefix++; + } + let suffix = 0; + const maxSuffix = cap - prefix; + while ( + suffix < maxSuffix && + current.charCodeAt(current.length - 1 - suffix) === + newContent.charCodeAt(newContent.length - 1 - suffix) + ) { + suffix++; + } + const start = editor.offsetToPos(prefix); + const end = editor.offsetToPos(current.length - suffix); + editor.replaceRange( + newContent.slice(prefix, newContent.length - suffix), + start, + end, + ); + } + + if (cm != null && savedScrollTop != null) { + // Race against Obsidian's deferred scroll-reset. Sync restore + + // two animation frames + two timeouts; whichever fires after + // Obsidian's own scroll write wins. Cheap noop on the + // already-correct frames. + const restore = () => { + if (cm.scrollDOM.scrollTop !== savedScrollTop) { + cm.scrollDOM.scrollTop = savedScrollTop; + } + }; + restore(); + requestAnimationFrame(() => { + restore(); + requestAnimationFrame(restore); + }); + window.setTimeout(restore, 50); + window.setTimeout(restore, 200); + } +} + export default class BeaverPlugin extends Plugin { settings: BeaverSettings = { ...DEFAULT_SETTINGS }; @@ -129,15 +258,26 @@ export default class BeaverPlugin extends Plugin { : await this.app.vault.read(file); const notice = new Notice(`Beaver: sending to ${agent}…`, 0); - let response: ChatResponse; + let finalResponse: ChatResponse | null = null; try { - response = await sendChat(this.settings, { - filename, - content, - agent, - }); + await sendChatStream( + this.settings, + { filename, content, agent }, + { + onDelta: (newContent) => { + // Live splice into the editor only — we deliberately don't + // write the partial to disk. The gateway also skips its own + // intermediate file writes on this endpoint, so the local + // file (and Obsidian Sync's copy) only sees the final state + // once. + if (editor) spliceIntoEditor(editor, newContent); + }, + onDone: (resp) => { + finalResponse = resp; + }, + }, + ); } catch (err) { - notice.hide(); if (err instanceof BeaverApiError && err.status === 409) { new Notice("Beaver: already running for this file", 6000); return; @@ -148,13 +288,24 @@ export default class BeaverPlugin extends Plugin { notice.hide(); } - if (response.status === "nothing_to_do") { - new Notice(`Beaver: nothing to do (${response.reason ?? "no reason"})`); + if (!finalResponse) { + // Stream ended without a ``done`` event — shouldn't happen but + // we surface it instead of pretending nothing was wrong. + this.notifyError( + `sending to ${agent}`, + new Error("stream ended without a done event"), + ); return; } - if (typeof response.new_content === "string") { - await this.writeBack(file, editor, view, response.new_content); + const resp: ChatResponse = finalResponse; + if (resp.status === "nothing_to_do") { + new Notice(`Beaver: nothing to do (${resp.reason ?? "no reason"})`); + return; + } + + if (typeof resp.new_content === "string") { + await this.writeBack(file, editor, view, resp.new_content); } new Notice(`Beaver: ${agent} replied`); } @@ -196,13 +347,14 @@ export default class BeaverPlugin extends Plugin { _view: MarkdownView | null, newContent: string, ): Promise { - if (editor && editor.getValue() !== newContent) { - editor.setValue(newContent); + if (editor) { + // Reuse the splice path so the final write (frontmatter refresh + // at the top + USER_SCAFFOLD appended at the bottom) preserves + // scroll just like the streaming deltas did. + spliceIntoEditor(editor, newContent); return; } - if (!editor) { - await this.app.vault.modify(file, newContent); - } + await this.app.vault.modify(file, newContent); } private notifyError(action: string, err: unknown): void {