feat: streaming via extension

This commit is contained in:
h
2026-05-22 00:18:39 +02:00
parent 93b52f0621
commit d2e9d5911c
2 changed files with 301 additions and 17 deletions
+132
View File
@@ -89,3 +89,135 @@ export async function sendChat(
body: JSON.stringify(req), body: JSON.stringify(req),
})) as ChatResponse; })) as ChatResponse;
} }
export interface StreamChatCallbacks {
// Fires for every intermediate snapshot (full file content as the
// gateway would have written it, frontmatter included). Caller is
// responsible for splicing it into the editor — we don't ship the
// diff because the gateway already renders the canonical view and we
// don't want two slightly-different renderers to drift.
onDelta(newContent: string): void;
// Fires exactly once at end-of-turn (success or nothing_to_do).
onDone(response: ChatResponse): void;
}
// Server-Sent Events arrive as ``event: <name>\ndata: <json>\n\n``
// frames. We can't use ``requestUrl`` (it buffers the whole body), so
// SSE is the one place in the plugin that goes through native
// ``fetch``. CORS is allowed by the gateway's ``CORSMiddleware``; auth
// is the same bearer token as the other endpoints.
export async function sendChatStream(
settings: BeaverSettings,
req: ChatRequest,
cb: StreamChatCallbacks,
signal?: AbortSignal,
): Promise<void> {
const url = `${baseUrl(settings)}/chat/stream`;
const headers: Record<string, string> = {
...authHeader(settings),
"Content-Type": "application/json",
Accept: "text/event-stream",
};
const res = await fetch(url, {
method: "POST",
headers,
body: JSON.stringify(req),
signal,
});
if (!res.ok || !res.body) {
// Drain the body so we can surface a useful detail. 409 in
// particular returns JSON; the rest may be JSON or plain text.
const text = await res.text().catch(() => "");
let detail: unknown = text;
try {
const parsed = JSON.parse(text) as { detail?: unknown };
detail =
parsed && typeof parsed === "object" && "detail" in parsed
? parsed.detail
: parsed;
} catch {
// not JSON; leave detail as the raw text
}
const msg =
typeof detail === "string" && detail
? detail
: `HTTP ${res.status}`;
throw new BeaverApiError(res.status, msg, detail);
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
// SSE frames are separated by a blank line (``\n\n``). We buffer
// partial frames across reads, then flush full ones in order.
let buf = "";
let done = false;
while (!done) {
const chunk = await reader.read();
done = chunk.done;
if (chunk.value) buf += decoder.decode(chunk.value, { stream: !done });
let sep = buf.indexOf("\n\n");
while (sep >= 0) {
const frame = buf.slice(0, sep);
buf = buf.slice(sep + 2);
const handled = handleSseFrame(frame, cb);
if (handled === "stop") {
// ``done``/``error`` is terminal — stop reading even if the
// server sends extra padding before closing.
try {
await reader.cancel();
} catch {
// best-effort cleanup
}
return;
}
sep = buf.indexOf("\n\n");
}
}
}
function handleSseFrame(
frame: string,
cb: StreamChatCallbacks,
): "continue" | "stop" {
// SSE lines: ``event: <name>`` / ``data: <json>``. ``data`` may span
// multiple lines (concatenated with ``\n``) per the spec; we honour
// that even though the gateway emits single-line ``data:`` today.
let event = "message";
const dataLines: string[] = [];
for (const rawLine of frame.split("\n")) {
const line = rawLine.replace(/\r$/, "");
if (!line || line.startsWith(":")) continue;
const colon = line.indexOf(":");
if (colon < 0) continue;
const field = line.slice(0, colon);
let value = line.slice(colon + 1);
if (value.startsWith(" ")) value = value.slice(1);
if (field === "event") event = value;
else if (field === "data") dataLines.push(value);
}
if (dataLines.length === 0) return "continue";
let data: unknown;
try {
data = JSON.parse(dataLines.join("\n"));
} catch {
return "continue";
}
const obj = data as Record<string, unknown>;
if (event === "delta") {
const content = obj.new_content;
if (typeof content === "string") cb.onDelta(content);
return "continue";
}
if (event === "done") {
cb.onDone(obj as unknown as ChatResponse);
return "stop";
}
if (event === "error") {
const code =
typeof obj.status_code === "number" ? obj.status_code : 500;
const detail =
typeof obj.detail === "string" ? obj.detail : `HTTP ${code}`;
throw new BeaverApiError(code, detail, obj);
}
return "continue";
}
+169 -17
View File
@@ -9,7 +9,7 @@ import {
BeaverApiError, BeaverApiError,
ChatResponse, ChatResponse,
listAgents, listAgents,
sendChat, sendChatStream,
} from "./api"; } from "./api";
import { pickAgent } from "./agentPicker"; import { pickAgent } from "./agentPicker";
import { import {
@@ -20,6 +20,135 @@ import {
const AGENT_CACHE_TTL_MS = 5 * 60 * 1000; const AGENT_CACHE_TTL_MS = 5 * 60 * 1000;
// Obsidian's ``Editor`` is a thin shim over a CodeMirror 6
// ``EditorView``; reaching for it directly is the only way to grab
// the real scrollable DOM node (``.cm-scroller``). Property name is
// unofficial but stable across recent Obsidian releases.
interface CMHandle {
scrollDOM: HTMLElement;
}
function cmOf(editor: Editor): CMHandle | null {
const cm = (editor as unknown as { cm?: CMHandle }).cm;
return cm && cm.scrollDOM instanceof HTMLElement ? cm : null;
}
// Boundary of the YAML frontmatter block. ``---\n…---\n`` at the very
// start of the file; anything else is a no-frontmatter file and we
// return 0.
function frontmatterEnd(s: string): number {
if (!s.startsWith("---\n")) return 0;
const idx = s.indexOf("\n---\n", 4);
return idx >= 0 ? idx + 5 : 0;
}
// Replace only the regions that actually changed instead of
// ``editor.setValue`` (which resets cursor + scroll and is expensive
// on every snapshot for long files).
//
// Fast path: when the body after the frontmatter is identical except
// for a tail append (the common case for the final write — gateway
// refreshes frontmatter at the top and appends USER_SCAFFOLD at the
// bottom, body in between is unchanged), we apply TWO small edits
// instead of one huge one. The frontmatter rewrite stays a small,
// localised change; the tail append touches only the very last bytes.
// A single big ``replaceRange`` spanning the frontmatter triggers
// Obsidian's Properties-widget rebuild + an asynchronous scroll-to-
// start-of-content that's hard to override.
//
// Slow path: prefix/suffix-trimmed single splice for any other shape.
//
// After every edit we restore scroll on the scrollDOM directly,
// repeatedly across several frames, because the widget rebuild keeps
// resetting scroll asynchronously and a single restore loses the
// race.
function spliceIntoEditor(editor: Editor, newContent: string): void {
const current = editor.getValue();
if (current === newContent) return;
const cm = cmOf(editor);
const savedScrollTop = cm?.scrollDOM.scrollTop ?? null;
const oldFmEnd = frontmatterEnd(current);
const newFmEnd = frontmatterEnd(newContent);
const oldBody = current.slice(oldFmEnd);
const newBody = newContent.slice(newFmEnd);
const fmsDiffer =
current.slice(0, oldFmEnd) !== newContent.slice(0, newFmEnd);
if (newBody.startsWith(oldBody) && (fmsDiffer || newBody.length > oldBody.length)) {
// Tail append first, then (maybe) frontmatter rewrite. Order
// matters: appending at the current end keeps the frontmatter
// splice positions valid. Reversing the order would invalidate
// the append offset after the frontmatter grew.
if (newBody.length > oldBody.length) {
const appendStart = editor.offsetToPos(current.length);
editor.replaceRange(newBody.slice(oldBody.length), appendStart);
}
if (fmsDiffer) {
const fmStart = editor.offsetToPos(0);
const fmEnd = editor.offsetToPos(oldFmEnd);
editor.replaceRange(newContent.slice(0, newFmEnd), fmStart, fmEnd);
}
} else if (oldBody.startsWith(newBody) && (fmsDiffer || oldBody.length > newBody.length)) {
// Symmetric case: tail truncation. Unlikely on the streaming
// path but cheap to handle.
if (oldBody.length > newBody.length) {
const truncStart = editor.offsetToPos(oldFmEnd + newBody.length);
const truncEnd = editor.offsetToPos(current.length);
editor.replaceRange("", truncStart, truncEnd);
}
if (fmsDiffer) {
const fmStart = editor.offsetToPos(0);
const fmEnd = editor.offsetToPos(oldFmEnd);
editor.replaceRange(newContent.slice(0, newFmEnd), fmStart, fmEnd);
}
} else {
// Generic single-splice fallback: trim common prefix + suffix.
let prefix = 0;
const cap = Math.min(current.length, newContent.length);
while (
prefix < cap &&
current.charCodeAt(prefix) === newContent.charCodeAt(prefix)
) {
prefix++;
}
let suffix = 0;
const maxSuffix = cap - prefix;
while (
suffix < maxSuffix &&
current.charCodeAt(current.length - 1 - suffix) ===
newContent.charCodeAt(newContent.length - 1 - suffix)
) {
suffix++;
}
const start = editor.offsetToPos(prefix);
const end = editor.offsetToPos(current.length - suffix);
editor.replaceRange(
newContent.slice(prefix, newContent.length - suffix),
start,
end,
);
}
if (cm != null && savedScrollTop != null) {
// Race against Obsidian's deferred scroll-reset. Sync restore +
// two animation frames + two timeouts; whichever fires after
// Obsidian's own scroll write wins. Cheap noop on the
// already-correct frames.
const restore = () => {
if (cm.scrollDOM.scrollTop !== savedScrollTop) {
cm.scrollDOM.scrollTop = savedScrollTop;
}
};
restore();
requestAnimationFrame(() => {
restore();
requestAnimationFrame(restore);
});
window.setTimeout(restore, 50);
window.setTimeout(restore, 200);
}
}
export default class BeaverPlugin extends Plugin { export default class BeaverPlugin extends Plugin {
settings: BeaverSettings = { ...DEFAULT_SETTINGS }; settings: BeaverSettings = { ...DEFAULT_SETTINGS };
@@ -129,15 +258,26 @@ export default class BeaverPlugin extends Plugin {
: await this.app.vault.read(file); : await this.app.vault.read(file);
const notice = new Notice(`Beaver: sending to ${agent}`, 0); const notice = new Notice(`Beaver: sending to ${agent}`, 0);
let response: ChatResponse; let finalResponse: ChatResponse | null = null;
try { try {
response = await sendChat(this.settings, { await sendChatStream(
filename, this.settings,
content, { filename, content, agent },
agent, {
}); onDelta: (newContent) => {
// Live splice into the editor only — we deliberately don't
// write the partial to disk. The gateway also skips its own
// intermediate file writes on this endpoint, so the local
// file (and Obsidian Sync's copy) only sees the final state
// once.
if (editor) spliceIntoEditor(editor, newContent);
},
onDone: (resp) => {
finalResponse = resp;
},
},
);
} catch (err) { } catch (err) {
notice.hide();
if (err instanceof BeaverApiError && err.status === 409) { if (err instanceof BeaverApiError && err.status === 409) {
new Notice("Beaver: already running for this file", 6000); new Notice("Beaver: already running for this file", 6000);
return; return;
@@ -148,13 +288,24 @@ export default class BeaverPlugin extends Plugin {
notice.hide(); notice.hide();
} }
if (response.status === "nothing_to_do") { if (!finalResponse) {
new Notice(`Beaver: nothing to do (${response.reason ?? "no reason"})`); // Stream ended without a ``done`` event — shouldn't happen but
// we surface it instead of pretending nothing was wrong.
this.notifyError(
`sending to ${agent}`,
new Error("stream ended without a done event"),
);
return; return;
} }
if (typeof response.new_content === "string") { const resp: ChatResponse = finalResponse;
await this.writeBack(file, editor, view, response.new_content); if (resp.status === "nothing_to_do") {
new Notice(`Beaver: nothing to do (${resp.reason ?? "no reason"})`);
return;
}
if (typeof resp.new_content === "string") {
await this.writeBack(file, editor, view, resp.new_content);
} }
new Notice(`Beaver: ${agent} replied`); new Notice(`Beaver: ${agent} replied`);
} }
@@ -196,13 +347,14 @@ export default class BeaverPlugin extends Plugin {
_view: MarkdownView | null, _view: MarkdownView | null,
newContent: string, newContent: string,
): Promise<void> { ): Promise<void> {
if (editor && editor.getValue() !== newContent) { if (editor) {
editor.setValue(newContent); // Reuse the splice path so the final write (frontmatter refresh
// at the top + USER_SCAFFOLD appended at the bottom) preserves
// scroll just like the streaming deltas did.
spliceIntoEditor(editor, newContent);
return; return;
} }
if (!editor) { await this.app.vault.modify(file, newContent);
await this.app.vault.modify(file, newContent);
}
} }
private notifyError(action: string, err: unknown): void { private notifyError(action: string, err: unknown): void {