From a99a80f6c9ea6303059d4867c8abecf928f6e709 Mon Sep 17 00:00:00 2001 From: h Date: Mon, 4 May 2026 12:20:40 +0200 Subject: [PATCH] feat(bot,frontend,backend): add integration with external collaborative solver --- backend/.env.example | 7 + backend/pyproject.toml | 1 + backend/src/bot/__init__.py | 16 +- backend/src/bot/handlers/__init__.py | 3 +- .../bot/handlers/initialize/initializer.py | 1 + backend/src/bot/handlers/message/handler.py | 66 +++++- backend/src/bot/handlers/room/__init__.py | 3 + backend/src/bot/handlers/room/handler.py | 202 ++++++++++++++++++ backend/src/bot/sync.py | 168 ++++++++++++++- backend/src/utils/collaborative/__init__.py | 7 + backend/src/utils/collaborative/client.py | 67 ++++++ backend/src/utils/env.py | 14 ++ .../src/lib/components/ChatMessage.svelte | 24 +-- .../lib/components/IncomingSheetsPanel.svelte | 53 +++++ frontend/src/lib/convex/_generated/api.d.ts | 4 + frontend/src/lib/convex/chats.ts | 15 ++ frontend/src/lib/convex/collaborative.ts | 148 +++++++++++++ frontend/src/lib/convex/incomingSheets.ts | 121 +++++++++++ frontend/src/lib/convex/messages.ts | 18 ++ frontend/src/lib/convex/schema.ts | 67 +++++- frontend/src/lib/convex/users.ts | 87 ++++++++ frontend/src/lib/utils/markdown.ts | 19 ++ frontend/src/routes/[mnemonic]/+page.svelte | 40 ++++ 23 files changed, 1115 insertions(+), 36 deletions(-) create mode 100644 backend/src/bot/handlers/room/__init__.py create mode 100644 backend/src/bot/handlers/room/handler.py create mode 100644 backend/src/utils/collaborative/__init__.py create mode 100644 backend/src/utils/collaborative/client.py create mode 100644 frontend/src/lib/components/IncomingSheetsPanel.svelte create mode 100644 frontend/src/lib/convex/collaborative.ts create mode 100644 frontend/src/lib/convex/incomingSheets.ts create mode 100644 frontend/src/lib/utils/markdown.ts diff --git a/backend/.env.example b/backend/.env.example index 0607ac7..638f9b2 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -6,3 +6,10 @@ LOG__LEVEL=INFO LOG__LEVEL_EXTERNAL=WARNING LOG__SHOW_TIME=false LOG__CONSOLE_WIDTH=150 + +COLLABORATIVE__API_URL= +COLLABORATIVE__API_KEY= +COLLABORATIVE__POLL_INTERVAL_SECONDS=8 +COLLABORATIVE__UPLOAD_MAX_ATTEMPTS=8 +COLLABORATIVE__UPLOAD_BACKOFF_BASE_SECONDS=2.0 +COLLABORATIVE__REQUEST_TIMEOUT_SECONDS=30 diff --git a/backend/pyproject.toml b/backend/pyproject.toml index cc26111..44a8c4f 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -9,6 +9,7 @@ requires-python = ">=3.13" dependencies = [ "aiogram>=3.24.0", "convex>=0.7.0", + "httpx>=0.27", "pydantic-ai-slim[google]>=1.44.0", "pydantic-settings>=2.12.0", "rich>=14.2.0", diff --git a/backend/src/bot/__init__.py b/backend/src/bot/__init__.py index 442ded8..aafae42 100644 --- a/backend/src/bot/__init__.py +++ b/backend/src/bot/__init__.py @@ -11,20 +11,28 @@ setup_logging() async def runner() -> None: from . import handlers # noqa: PLC0415 from .common import bot, dp # noqa: PLC0415 - from .sync import start_sync_listener # noqa: PLC0415 + from .sync import ( # noqa: PLC0415 + start_room_inbox_poller, + start_room_upload_worker, + start_sync_listener, + ) dp.include_routers(handlers.router) sync_task = asyncio.create_task(start_sync_listener(bot)) + upload_task = asyncio.create_task(start_room_upload_worker(bot)) + inbox_task = asyncio.create_task(start_room_inbox_poller(bot)) await bot.delete_webhook(drop_pending_updates=True) try: await dp.start_polling(bot) finally: - sync_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await sync_task + for t in (sync_task, upload_task, inbox_task): + t.cancel() + for t in (sync_task, upload_task, inbox_task): + with contextlib.suppress(asyncio.CancelledError): + await t def plugins() -> None: diff --git a/backend/src/bot/handlers/__init__.py b/backend/src/bot/handlers/__init__.py index ee07607..5ad510f 100644 --- a/backend/src/bot/handlers/__init__.py +++ b/backend/src/bot/handlers/__init__.py @@ -1,6 +1,6 @@ from aiogram import Router -from . import apikey, chat, initialize, inject, message, proxy, rag, start +from . import apikey, chat, initialize, inject, message, proxy, rag, room, start router = Router() @@ -12,5 +12,6 @@ router.include_routers( rag.router, inject.router, proxy.router, + room.router, message.router, ) diff --git a/backend/src/bot/handlers/initialize/initializer.py b/backend/src/bot/handlers/initialize/initializer.py index 6a83827..d237696 100644 --- a/backend/src/bot/handlers/initialize/initializer.py +++ b/backend/src/bot/handlers/initialize/initializer.py @@ -19,6 +19,7 @@ async def startup(bot: Bot) -> None: types.BotCommand(command="/preset", description="Apply a preset"), types.BotCommand(command="/proxy", description="Proxy chat to another bot"), types.BotCommand(command="/inject", description="Inject knowledge base"), + types.BotCommand(command="/room", description="Manage collaborative room"), ] ) logger.info(f"[green]Started as[/] @{(await bot.me()).username}") diff --git a/backend/src/bot/handlers/message/handler.py b/backend/src/bot/handlers/message/handler.py index b509469..dc77c47 100644 --- a/backend/src/bot/handlers/message/handler.py +++ b/backend/src/bot/handlers/message/handler.py @@ -36,6 +36,45 @@ convex = ConvexClient(env.convex_url) ALBUM_COLLECT_DELAY = 0.5 +_room_upload_tasks: set[asyncio.Task] = set() + + +async def _enqueue_room_upload( + user_id: str, pin: str, image_b64: str, media_type: str +) -> None: + try: + await convex.mutation( + "collaborative:enqueueUpload", + { + "userId": user_id, + "pin": pin, + "imageBase64": image_b64, + "mediaType": media_type, + }, + ) + except Exception as e: # noqa: BLE001 + from utils.logging import logger as _logger # noqa: PLC0415 + + _logger.warning(f"Failed to enqueue room upload: {e}") + + +def schedule_room_uploads( + user: dict, images_base64: list[str], media_types: list[str] +) -> None: + room = user.get("collaborativeRoom") if user else None + if not room: + return + pin = room.get("pin") + if not pin: + return + user_id = user["_id"] + for img_b64, media_type in zip(images_base64, media_types, strict=True): + task = asyncio.create_task( + _enqueue_room_upload(user_id, pin, img_b64, media_type) + ) + _room_upload_tasks.add(task) + task.add_done_callback(_room_upload_tasks.discard) + class AlbumMiddleware(BaseMiddleware): def __init__(self) -> None: @@ -197,13 +236,17 @@ class ProxyStreamingState: async def send_long_message( bot: Bot, chat_id: int, text: str, reply_markup: ReplyKeyboardMarkup | None = None -) -> None: +) -> int | None: parts = split_message(text) + first_id: int | None = None for i, part in enumerate(parts): is_last = i == len(parts) - 1 - await bot.send_message( + sent = await bot.send_message( chat_id, html.quote(part), reply_markup=reply_markup if is_last else None ) + if first_id is None: + first_id = sent.message_id + return first_id async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915 @@ -300,7 +343,7 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915 api_key=api_key, model_name=model_name, system_prompt=system_prompt, - rag_db_names=rag_db_names if rag_db_names else None, + rag_db_names=rag_db_names or None, ) agent_deps = ( @@ -383,7 +426,16 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915 with contextlib.suppress(Exception): await processing_msg.delete() keyboard = make_follow_up_keyboard(follow_ups) - await send_long_message(bot, tg_chat_id, final_answer, keyboard) + sent_id = await send_long_message(bot, tg_chat_id, final_answer, keyboard) + if sent_id is not None: + with contextlib.suppress(Exception): + await convex.mutation( + "messages:setTelegramMessageId", + { + "messageId": assistant_message_id, + "telegramMessageId": sent_id, + }, + ) except Exception as e: # noqa: BLE001 if state: @@ -502,7 +554,7 @@ async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915 api_key=api_key, model_name=model_name, system_prompt=system_prompt, - rag_db_names=rag_db_names if rag_db_names else None, + rag_db_names=rag_db_names or None, ) agent_deps = ( @@ -712,6 +764,8 @@ async def on_album_message( }, ) + schedule_room_uploads(user, images_base64, images_media_types) + await process_message( message.from_user.id, caption, @@ -782,6 +836,8 @@ async def on_photo_message(message: types.Message, bot: Bot) -> None: }, ) + schedule_room_uploads(user, [image_base64], [media_type]) + await process_message( message.from_user.id, caption, diff --git a/backend/src/bot/handlers/room/__init__.py b/backend/src/bot/handlers/room/__init__.py new file mode 100644 index 0000000..c33d170 --- /dev/null +++ b/backend/src/bot/handlers/room/__init__.py @@ -0,0 +1,3 @@ +from .handler import router + +__all__ = ["router"] diff --git a/backend/src/bot/handlers/room/handler.py b/backend/src/bot/handlers/room/handler.py new file mode 100644 index 0000000..41da30f --- /dev/null +++ b/backend/src/bot/handlers/room/handler.py @@ -0,0 +1,202 @@ +from aiogram import Router, types +from aiogram.filters import Command +from convex import ConvexInt64 + +from utils import env +from utils.collaborative import CollaborativeHTTPError, get_collaborative_client +from utils.convex import ConvexClient +from utils.logging import logger + +router = Router() +convex = ConvexClient(env.convex_url) + + +@router.message(Command("room")) +async def on_room(message: types.Message) -> None: # noqa: C901 + if not message.from_user or not message.text: + return + + if not env.collaborative.enabled: + await message.answer("Collaborative Solver is not configured on this instance.") + return + + args = message.text.split()[1:] + + if not args: + await show_usage(message) + return + + user = await convex.query( + "users:getByTelegramId", {"telegramId": ConvexInt64(message.from_user.id)} + ) + if not user: + await convex.mutation( + "users:getOrCreate", + { + "telegramId": ConvexInt64(message.from_user.id), + "telegramChatId": ConvexInt64(message.chat.id), + }, + ) + user = await convex.query( + "users:getByTelegramId", {"telegramId": ConvexInt64(message.from_user.id)} + ) + if not user: + await message.answer("User not found.") + return + + user_id = user["_id"] + sub = args[0] + + if sub == "create": + await create_room(message, user_id) + elif sub == "join": + if len(args) < 2: # noqa: PLR2004 + await message.answer( + "Usage: /room join <pin>", parse_mode="HTML" + ) + return + await join_room(message, user_id, args[1].strip()) + elif sub == "leave": + await leave_room(message, user_id, user.get("collaborativeRoom")) + elif sub == "status": + await show_status(message, user_id, user.get("collaborativeRoom")) + else: + await show_usage(message) + + +async def show_usage(message: types.Message) -> None: + await message.answer( + "Room commands:\n\n" + "/room create - Create a new room and get a PIN\n" + "/room join <pin> - Join an existing room\n" + "/room leave - Leave current room\n" + "/room status - Show current room status", + parse_mode="HTML", + ) + + +async def create_room(message: types.Message, user_id: str) -> None: + client = get_collaborative_client() + try: + resp = await client.create_room() + except (CollaborativeHTTPError, Exception) as e: # noqa: BLE001 + logger.error(f"Failed to create room: {e}") + await message.answer(f"Failed to create room: {e}") + return + + pin = resp.get("pin", "") + creator_token = resp.get("creator_token", "") + + if not pin: + await message.answer("Got invalid response from solver.") + return + + await convex.mutation( + "users:setCollaborativeRoom", + { + "userId": user_id, + "pin": pin, + "creatorToken": creator_token, + "lastSeenSheetId": 0, + }, + ) + + await message.answer( + f"Room created!\n\n" + f"PIN: {pin}\n\n" + f"Share this PIN with others so they can join with " + f"/room join {pin}.", + parse_mode="HTML", + ) + + +async def join_room(message: types.Message, user_id: str, pin: str) -> None: + client = get_collaborative_client() + try: + sheets_resp = await client.get_sheets(pin) + except CollaborativeHTTPError as e: + if e.status_code == 404: # noqa: PLR2004 + await message.answer( + f"Room {pin} not found.", parse_mode="HTML" + ) + else: + await message.answer(f"Failed to join room: {e}") + return + except Exception as e: # noqa: BLE001 + logger.error(f"Failed to join room: {e}") + await message.answer(f"Failed to join room: {e}") + return + + sheets = sheets_resp.get("sheets", []) or [] + last_sheet_id = max((int(s["id"]) for s in sheets), default=0) + + await convex.mutation( + "users:setCollaborativeRoom", + {"userId": user_id, "pin": pin, "lastSeenSheetId": last_sheet_id}, + ) + + await message.answer( + f"Joined room {pin} ({len(sheets)} sheet(s) so far).\n" + f"New sheets will appear in the web UI.", + parse_mode="HTML", + ) + + +async def leave_room(message: types.Message, user_id: str, room: dict | None) -> None: + if not room: + await message.answer("Not in any room.") + return + + pin = room.get("pin", "") + creator_token = room.get("creatorToken") + + if creator_token: + client = get_collaborative_client() + try: + await client.delete_room(pin, creator_token) + except Exception as e: # noqa: BLE001 + logger.warning(f"Failed to delete room {pin}: {e}") + + await convex.mutation("users:clearCollaborativeRoom", {"userId": user_id}) + await message.answer(f"Left room {pin}.", parse_mode="HTML") + + +async def show_status(message: types.Message, user_id: str, room: dict | None) -> None: + if not room: + await message.answer( + "Not in any room.\n\nUse /room create or " + "/room join <pin>.", + parse_mode="HTML", + ) + return + + pin = room.get("pin", "") + client = get_collaborative_client() + info: dict = {} + sheets_count: int | str = "?" + try: + info = await client.get_room(pin) + except Exception as e: # noqa: BLE001 + logger.warning(f"get_room failed: {e}") + try: + sheets_resp = await client.get_sheets(pin) + sheets_count = len(sheets_resp.get("sheets", []) or []) + except Exception as e: # noqa: BLE001 + logger.warning(f"get_sheets failed: {e}") + + queue = await convex.query("collaborative:queueDepthForUser", {"userId": user_id}) + role = "creator" if room.get("creatorToken") else "member" + last_seen = room.get("lastSeenSheetId", 0) + + await message.answer( + f"Room {pin} ({role})\n" + f"Members: {info.get('members', '?')}\n" + f"Processing: {info.get('processing', False)}\n" + f"Sheets in room: {sheets_count}\n" + f"Last seen sheet id: {last_seen}\n\n" + f"Upload queue:\n" + f" pending: {queue.get('pending', 0)}\n" + f" uploading: {queue.get('uploading', 0)}\n" + f" failed: {queue.get('failed', 0)}", + parse_mode="HTML", + ) diff --git a/backend/src/bot/sync.py b/backend/src/bot/sync.py index fed9857..f47860b 100644 --- a/backend/src/bot/sync.py +++ b/backend/src/bot/sync.py @@ -1,16 +1,23 @@ import asyncio +import base64 +import time from aiogram import Bot from bot.handlers.message.handler import process_message_from_web from utils import env +from utils.collaborative import ( + CollaborativeClient, + CollaborativeHTTPError, + get_collaborative_client, +) from utils.convex import ConvexClient from utils.logging import logger convex = ConvexClient(env.convex_url) -background_tasks = set() +background_tasks: set[asyncio.Task] = set() async def start_sync_listener(bot: Bot) -> None: @@ -58,3 +65,162 @@ async def handle_pending_generation(bot: Bot, item: dict, item_id: str) -> None: logger.error(f"Error processing {item_id}: {e}") finally: await convex.mutation("pendingGenerations:remove", {"id": item_id}) + + +async def start_room_upload_worker(bot: Bot) -> None: # noqa: ARG001 + if not env.collaborative.enabled: + logger.info("Collaborative not configured; upload worker disabled.") + return + + logger.info("Starting collaborative upload worker...") + + try: + reset_count = await convex.mutation("collaborative:resetStuckUploads", {}) + if reset_count: + logger.info(f"Reset {reset_count} stuck uploads to pending.") + except Exception as e: # noqa: BLE001 + logger.warning(f"Failed to reset stuck uploads: {e}") + + in_flight: set[str] = set() + sub = convex.subscribe("collaborative:listPendingUploads", {}) + + try: + async for pending_list in sub: + for item in pending_list or []: + item_id = item["_id"] + if item_id in in_flight: + continue + if item.get("attempts", 0) >= env.collaborative.upload_max_attempts: + continue + + in_flight.add(item_id) + task = asyncio.create_task(_handle_upload(item, item_id, in_flight)) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) + except asyncio.CancelledError: + logger.info("Upload worker cancelled") + raise + except Exception as e: # noqa: BLE001 + logger.error(f"Upload worker error: {e}") + finally: + sub.unsubscribe() + + +async def _handle_upload(item: dict, item_id: str, in_flight: set[str]) -> None: + try: + claimed = await convex.mutation("collaborative:markUploading", {"id": item_id}) + if not claimed: + return + + client = get_collaborative_client() + try: + image_bytes = base64.b64decode(item["imageBase64"]) + await client.upload_photo( + pin=item["pin"], image_bytes=image_bytes, media_type=item["mediaType"] + ) + await convex.mutation("collaborative:markDone", {"id": item_id}) + logger.info(f"Room upload {item_id} done.") + except CollaborativeHTTPError as e: + if 400 <= e.status_code < 500: # noqa: PLR2004 + await convex.mutation( + "collaborative:markFailed", {"id": item_id, "error": str(e)} + ) + logger.warning(f"Room upload {item_id} failed (4xx): {e}") + else: + await _bump_with_backoff(item, item_id, str(e)) + except Exception as e: # noqa: BLE001 + await _bump_with_backoff(item, item_id, str(e)) + finally: + in_flight.discard(item_id) + + +async def _bump_with_backoff(item: dict, item_id: str, error: str) -> None: + attempts = item.get("attempts", 0) + 1 + base = env.collaborative.upload_backoff_base_seconds + delay_seconds = min(base * (2 ** (attempts - 1)), 60.0) + next_retry_at_ms = int((time.time() + delay_seconds) * 1000) + await convex.mutation( + "collaborative:bumpAttempt", + {"id": item_id, "error": error[:500], "nextRetryAt": next_retry_at_ms}, + ) + logger.info( + f"Room upload {item_id} retry in {delay_seconds:.1f}s " + f"(attempt {attempts}): {error[:100]}" + ) + + +async def start_room_inbox_poller(bot: Bot) -> None: # noqa: ARG001 + if not env.collaborative.enabled: + logger.info("Collaborative not configured; inbox poller disabled.") + return + + logger.info("Starting collaborative inbox poller...") + interval = env.collaborative.poll_interval_seconds + client = get_collaborative_client() + + while True: + try: + users = await convex.query("users:listActiveRoomUsers", {}) + for user in users or []: + await _poll_user_room(client, user) + except asyncio.CancelledError: + logger.info("Inbox poller cancelled") + raise + except Exception as e: # noqa: BLE001 + logger.warning(f"Inbox poller iteration failed: {e}") + + try: + await asyncio.sleep(interval) + except asyncio.CancelledError: + logger.info("Inbox poller cancelled") + raise + + +async def _poll_user_room(client: CollaborativeClient, user: dict) -> None: + pin = user["pin"] + last_seen_sheet_id = int(user.get("lastSeenSheetId", 0) or 0) + user_id = user["_id"] + + try: + resp = await client.get_sheets(pin) + except CollaborativeHTTPError as e: + logger.warning(f"Room {pin} sheets fetch failed ({e.status_code}); skipping.") + return + except Exception as e: # noqa: BLE001 + logger.warning(f"Room {pin} sheets fetch error: {e}") + return + + sheets = resp.get("sheets", []) or [] + new_sheets = [s for s in sheets if int(s.get("id", 0)) > last_seen_sheet_id] + if not new_sheets: + return + + user_doc = await convex.query("users:getById", {"userId": user_id}) + if not user_doc or not user_doc.get("activeChatId"): + return + + chat_id = user_doc["activeChatId"] + + payload = [ + { + "sheetId": int(s["id"]), + "sheetCreatedAt": float(s.get("created_at", 0)), + "text": s.get("text", "") or "", + } + for s in new_sheets + ] + max_id = max(int(s["id"]) for s in new_sheets) + + try: + inserted = await convex.mutation( + "incomingSheets:createMany", + {"userId": user_id, "chatId": chat_id, "pin": pin, "sheets": payload}, + ) + await convex.mutation( + "users:setLastSeenSheetId", {"userId": user_id, "sheetId": max_id} + ) + logger.info( + f"Room {pin}: ingested {inserted} new sheet(s) (cursor → {max_id})." + ) + except Exception as e: # noqa: BLE001 + logger.error(f"Failed to ingest sheets for {pin}: {e}") diff --git a/backend/src/utils/collaborative/__init__.py b/backend/src/utils/collaborative/__init__.py new file mode 100644 index 0000000..ce12487 --- /dev/null +++ b/backend/src/utils/collaborative/__init__.py @@ -0,0 +1,7 @@ +from .client import ( + CollaborativeClient, + CollaborativeHTTPError, + get_collaborative_client, +) + +__all__ = ["CollaborativeClient", "CollaborativeHTTPError", "get_collaborative_client"] diff --git a/backend/src/utils/collaborative/client.py b/backend/src/utils/collaborative/client.py new file mode 100644 index 0000000..252f6d6 --- /dev/null +++ b/backend/src/utils/collaborative/client.py @@ -0,0 +1,67 @@ +from typing import Any + +import httpx + +from utils import env + + +class CollaborativeHTTPError(Exception): + def __init__(self, status_code: int, body: str) -> None: + super().__init__(f"HTTP {status_code}: {body[:200]}") + self.status_code = status_code + self.body = body + + +class CollaborativeClient: + def __init__(self) -> None: + cfg = env.collaborative + timeout = httpx.Timeout(connect=10.0, read=30.0, write=60.0, pool=10.0) + self._client = httpx.AsyncClient( + base_url=cfg.api_url.rstrip("/"), + timeout=timeout, + headers={"X-API-Key": cfg.api_key.get_secret_value()}, + ) + + async def aclose(self) -> None: + await self._client.aclose() + + async def _request( + self, + method: str, + path: str, + **kwargs: Any, # noqa: ANN401 + ) -> dict[str, Any]: + resp = await self._client.request(method, path, **kwargs) + if resp.status_code >= 400: # noqa: PLR2004 + raise CollaborativeHTTPError(resp.status_code, resp.text) + return resp.json() + + async def create_room(self) -> dict[str, str]: + return await self._request("POST", "/api/v1/rooms") + + async def get_room(self, pin: str) -> dict[str, Any]: + return await self._request("GET", f"/api/v1/rooms/{pin}") + + async def get_sheets(self, pin: str) -> dict[str, Any]: + return await self._request("GET", f"/api/v1/rooms/{pin}/sheets") + + async def delete_room(self, pin: str, creator_token: str) -> dict[str, Any]: + return await self._request( + "DELETE", f"/api/v1/rooms/{pin}", params={"creator_token": creator_token} + ) + + async def upload_photo( + self, pin: str, image_bytes: bytes, media_type: str, filename: str = "photo.jpg" + ) -> dict[str, Any]: + files = {"file": (filename, image_bytes, media_type)} + return await self._request("POST", f"/api/v1/rooms/{pin}/upload", files=files) + + +_client: CollaborativeClient | None = None + + +def get_collaborative_client() -> CollaborativeClient: + global _client # noqa: PLW0603 + if _client is None: + _client = CollaborativeClient() + return _client diff --git a/backend/src/utils/env.py b/backend/src/utils/env.py index de2aaf5..28867e5 100644 --- a/backend/src/utils/env.py +++ b/backend/src/utils/env.py @@ -17,10 +17,24 @@ class LogSettings(BaseSettings): console_width: int = 150 +class CollaborativeSettings(BaseSettings): + api_url: str = "" + api_key: SecretStr = SecretStr("") + poll_interval_seconds: int = 8 + upload_max_attempts: int = 8 + upload_backoff_base_seconds: float = 2.0 + request_timeout_seconds: int = 30 + + @property + def enabled(self) -> bool: + return bool(self.api_url) + + class Settings(BaseSettings): bot: BotSettings site: SiteSettings log: LogSettings + collaborative: CollaborativeSettings convex_url: str = Field(validation_alias=AliasChoices("CONVEX_SELF_HOSTED_URL")) convex_http_url: str = Field( diff --git a/frontend/src/lib/components/ChatMessage.svelte b/frontend/src/lib/components/ChatMessage.svelte index 02ba42f..f1bd478 100644 --- a/frontend/src/lib/components/ChatMessage.svelte +++ b/frontend/src/lib/components/ChatMessage.svelte @@ -1,6 +1,6 @@
+ import type { Id } from '$lib/convex/_generated/dataModel'; + import { processContent } from '$lib/utils/markdown'; + + interface Sheet { + _id: Id<'incomingSheets'>; + sheetId: number; + text: string; + } + + interface Props { + sheets: Sheet[]; + onaccept: (id: Id<'incomingSheets'>) => void; + ondismiss: (id: Id<'incomingSheets'>) => void; + } + + let { sheets, onaccept, ondismiss }: Props = $props(); + + +{#if sheets.length > 0} +
+
+ incoming sheets · {sheets.length} +
+ {#each sheets as s (s._id)} +
+
sheet #{s.sheetId}
+
+ + {@html processContent(s.text)} +
+
+ + +
+
+ {/each} +
+{/if} diff --git a/frontend/src/lib/convex/_generated/api.d.ts b/frontend/src/lib/convex/_generated/api.d.ts index f32e8b5..6b55ebe 100644 --- a/frontend/src/lib/convex/_generated/api.d.ts +++ b/frontend/src/lib/convex/_generated/api.d.ts @@ -9,8 +9,10 @@ */ import type * as chats from "../chats.js"; +import type * as collaborative from "../collaborative.js"; import type * as devicePairings from "../devicePairings.js"; import type * as http from "../http.js"; +import type * as incomingSheets from "../incomingSheets.js"; import type * as inject from "../inject.js"; import type * as injectConnections from "../injectConnections.js"; import type * as messages from "../messages.js"; @@ -30,8 +32,10 @@ import type { declare const fullApi: ApiFromModules<{ chats: typeof chats; + collaborative: typeof collaborative; devicePairings: typeof devicePairings; http: typeof http; + incomingSheets: typeof incomingSheets; inject: typeof inject; injectConnections: typeof injectConnections; messages: typeof messages; diff --git a/frontend/src/lib/convex/chats.ts b/frontend/src/lib/convex/chats.ts index 1dc47aa..e032b76 100644 --- a/frontend/src/lib/convex/chats.ts +++ b/frontend/src/lib/convex/chats.ts @@ -86,6 +86,21 @@ export const getWithUser = query({ ragDatabaseId: v.id('ragDatabases'), activeSince: v.number() }) + ), + injectCollectionMode: v.optional( + v.object({ + injectDatabaseId: v.id('injectDatabases'), + activeSince: v.number() + }) + ), + collaborativeRoom: v.optional( + v.object({ + pin: v.string(), + creatorToken: v.optional(v.string()), + joinedAt: v.number(), + lastSeenHistoryCount: v.optional(v.number()), + lastSeenSheetId: v.optional(v.number()) + }) ) }) }), diff --git a/frontend/src/lib/convex/collaborative.ts b/frontend/src/lib/convex/collaborative.ts new file mode 100644 index 0000000..4274b8a --- /dev/null +++ b/frontend/src/lib/convex/collaborative.ts @@ -0,0 +1,148 @@ +import { v } from 'convex/values'; +import { mutation, query } from './_generated/server'; + +const PENDING_UPLOAD_RETURN = v.object({ + _id: v.id('pendingRoomUploads'), + _creationTime: v.number(), + userId: v.id('users'), + pin: v.string(), + imageBase64: v.string(), + mediaType: v.string(), + status: v.union( + v.literal('pending'), + v.literal('uploading'), + v.literal('done'), + v.literal('failed') + ), + attempts: v.number(), + lastError: v.optional(v.string()), + createdAt: v.number(), + updatedAt: v.number() +}); + +export const enqueueUpload = mutation({ + args: { + userId: v.id('users'), + pin: v.string(), + imageBase64: v.string(), + mediaType: v.string() + }, + returns: v.id('pendingRoomUploads'), + handler: async (ctx, args) => { + const now = Date.now(); + return await ctx.db.insert('pendingRoomUploads', { + userId: args.userId, + pin: args.pin, + imageBase64: args.imageBase64, + mediaType: args.mediaType, + status: 'pending', + attempts: 0, + createdAt: now, + updatedAt: now + }); + } +}); + +export const listPendingUploads = query({ + args: {}, + returns: v.array(PENDING_UPLOAD_RETURN), + handler: async (ctx) => { + const now = Date.now(); + const items = await ctx.db + .query('pendingRoomUploads') + .withIndex('by_status_and_updated_at', (q) => q.eq('status', 'pending')) + .collect(); + return items.filter((i) => i.updatedAt <= now); + } +}); + +export const markUploading = mutation({ + args: { id: v.id('pendingRoomUploads') }, + returns: v.boolean(), + handler: async (ctx, args) => { + const item = await ctx.db.get(args.id); + if (!item || item.status !== 'pending') return false; + await ctx.db.patch(args.id, { status: 'uploading', updatedAt: Date.now() }); + return true; + } +}); + +export const markDone = mutation({ + args: { id: v.id('pendingRoomUploads') }, + returns: v.null(), + handler: async (ctx, args) => { + await ctx.db.delete(args.id); + return null; + } +}); + +export const bumpAttempt = mutation({ + args: { + id: v.id('pendingRoomUploads'), + error: v.string(), + nextRetryAt: v.number() + }, + returns: v.null(), + handler: async (ctx, args) => { + const item = await ctx.db.get(args.id); + if (!item) return null; + await ctx.db.patch(args.id, { + status: 'pending', + attempts: item.attempts + 1, + lastError: args.error, + updatedAt: args.nextRetryAt + }); + return null; + } +}); + +export const markFailed = mutation({ + args: { id: v.id('pendingRoomUploads'), error: v.string() }, + returns: v.null(), + handler: async (ctx, args) => { + const item = await ctx.db.get(args.id); + if (!item) return null; + await ctx.db.patch(args.id, { + status: 'failed', + lastError: args.error, + updatedAt: Date.now() + }); + return null; + } +}); + +export const resetStuckUploads = mutation({ + args: {}, + returns: v.number(), + handler: async (ctx) => { + const stuck = await ctx.db + .query('pendingRoomUploads') + .withIndex('by_status_and_updated_at', (q) => q.eq('status', 'uploading')) + .collect(); + const now = Date.now(); + for (const item of stuck) { + await ctx.db.patch(item._id, { status: 'pending', updatedAt: now }); + } + return stuck.length; + } +}); + +export const queueDepthForUser = query({ + args: { userId: v.id('users') }, + returns: v.object({ + pending: v.number(), + uploading: v.number(), + failed: v.number() + }), + handler: async (ctx, args) => { + const all = await ctx.db + .query('pendingRoomUploads') + .withIndex('by_user_id', (q) => q.eq('userId', args.userId)) + .collect(); + return { + pending: all.filter((i) => i.status === 'pending').length, + uploading: all.filter((i) => i.status === 'uploading').length, + failed: all.filter((i) => i.status === 'failed').length + }; + } +}); diff --git a/frontend/src/lib/convex/incomingSheets.ts b/frontend/src/lib/convex/incomingSheets.ts new file mode 100644 index 0000000..567afef --- /dev/null +++ b/frontend/src/lib/convex/incomingSheets.ts @@ -0,0 +1,121 @@ +import { v } from 'convex/values'; +import { mutation, query } from './_generated/server'; + +const SHEET_RETURN = v.object({ + _id: v.id('incomingSheets'), + _creationTime: v.number(), + userId: v.id('users'), + chatId: v.id('chats'), + pin: v.string(), + sheetId: v.number(), + sheetCreatedAt: v.number(), + text: v.string(), + status: v.union(v.literal('preview'), v.literal('accepted'), v.literal('dismissed')), + linkedMessageId: v.optional(v.id('messages')), + acceptedAt: v.optional(v.number()), + createdAt: v.number() +}); + +export const createMany = mutation({ + args: { + userId: v.id('users'), + chatId: v.id('chats'), + pin: v.string(), + sheets: v.array( + v.object({ + sheetId: v.number(), + sheetCreatedAt: v.number(), + text: v.string() + }) + ) + }, + returns: v.number(), + handler: async (ctx, args) => { + const now = Date.now(); + let inserted = 0; + for (const s of args.sheets) { + const existing = await ctx.db + .query('incomingSheets') + .withIndex('by_chat_id_and_sheet_id', (q) => + q.eq('chatId', args.chatId).eq('sheetId', s.sheetId) + ) + .unique(); + if (existing) continue; + await ctx.db.insert('incomingSheets', { + userId: args.userId, + chatId: args.chatId, + pin: args.pin, + sheetId: s.sheetId, + sheetCreatedAt: s.sheetCreatedAt, + text: s.text, + status: 'preview', + createdAt: now + }); + inserted += 1; + } + return inserted; + } +}); + +export const listForChat = query({ + args: { + chatId: v.id('chats'), + status: v.union(v.literal('preview'), v.literal('accepted'), v.literal('dismissed')) + }, + returns: v.array(SHEET_RETURN), + handler: async (ctx, args) => { + return await ctx.db + .query('incomingSheets') + .withIndex('by_chat_id_and_status', (q) => + q.eq('chatId', args.chatId).eq('status', args.status) + ) + .order('asc') + .collect(); + } +}); + +export const accept = mutation({ + args: { id: v.id('incomingSheets') }, + returns: v.union(v.id('messages'), v.null()), + handler: async (ctx, args) => { + const sheet = await ctx.db.get(args.id); + if (!sheet || sheet.status !== 'preview') return null; + + const messageId = await ctx.db.insert('messages', { + chatId: sheet.chatId, + role: 'user', + content: sheet.text, + source: 'web', + createdAt: Date.now() + }); + + const chat = await ctx.db.get(sheet.chatId); + if (chat) { + await ctx.db.insert('pendingGenerations', { + userId: chat.userId, + chatId: sheet.chatId, + userMessage: sheet.text, + createdAt: Date.now() + }); + } + + await ctx.db.patch(args.id, { + status: 'accepted', + linkedMessageId: messageId, + acceptedAt: Date.now() + }); + + return messageId; + } +}); + +export const dismiss = mutation({ + args: { id: v.id('incomingSheets') }, + returns: v.null(), + handler: async (ctx, args) => { + const sheet = await ctx.db.get(args.id); + if (!sheet || sheet.status !== 'preview') return null; + await ctx.db.patch(args.id, { status: 'dismissed' }); + return null; + } +}); diff --git a/frontend/src/lib/convex/messages.ts b/frontend/src/lib/convex/messages.ts index 194f4b0..3f96069 100644 --- a/frontend/src/lib/convex/messages.ts +++ b/frontend/src/lib/convex/messages.ts @@ -146,6 +146,24 @@ export const update = mutation({ } }); +export const setTelegramMessageId = mutation({ + args: { messageId: v.id('messages'), telegramMessageId: v.number() }, + returns: v.null(), + handler: async (ctx, args) => { + await ctx.db.patch(args.messageId, { telegramMessageId: args.telegramMessageId }); + return null; + } +}); + +export const getTelegramMessageId = query({ + args: { messageId: v.id('messages') }, + returns: v.union(v.number(), v.null()), + handler: async (ctx, args) => { + const msg = await ctx.db.get(args.messageId); + return msg?.telegramMessageId ?? null; + } +}); + export const getHistoryForAI = query({ args: { chatId: v.id('chats'), limit: v.optional(v.number()) }, returns: v.array( diff --git a/frontend/src/lib/convex/schema.ts b/frontend/src/lib/convex/schema.ts index f066952..8bed854 100644 --- a/frontend/src/lib/convex/schema.ts +++ b/frontend/src/lib/convex/schema.ts @@ -22,6 +22,15 @@ export default defineSchema({ injectDatabaseId: v.id('injectDatabases'), activeSince: v.number() }) + ), + collaborativeRoom: v.optional( + v.object({ + pin: v.string(), + creatorToken: v.optional(v.string()), + joinedAt: v.number(), + lastSeenHistoryCount: v.optional(v.number()), + lastSeenSheetId: v.optional(v.number()) + }) ) }).index('by_telegram_id', ['telegramId']), @@ -43,7 +52,8 @@ export default defineSchema({ followUpOptions: v.optional(v.array(v.string())), source: v.union(v.literal('telegram'), v.literal('web')), createdAt: v.number(), - isStreaming: v.optional(v.boolean()) + isStreaming: v.optional(v.boolean()), + telegramMessageId: v.optional(v.number()) }) .index('by_chat_id', ['chatId']) .index('by_chat_id_and_created_at', ['chatId', 'createdAt']), @@ -147,5 +157,58 @@ export default defineSchema({ }) .index('by_user_id', ['userId']) .index('by_user_id_and_inject_database_id', ['userId', 'injectDatabaseId']) - .index('by_inject_database_id', ['injectDatabaseId']) + .index('by_inject_database_id', ['injectDatabaseId']), + + pendingRoomUploads: defineTable({ + userId: v.id('users'), + pin: v.string(), + imageBase64: v.string(), + mediaType: v.string(), + status: v.union( + v.literal('pending'), + v.literal('uploading'), + v.literal('done'), + v.literal('failed') + ), + attempts: v.number(), + lastError: v.optional(v.string()), + createdAt: v.number(), + updatedAt: v.number() + }) + .index('by_status_and_updated_at', ['status', 'updatedAt']) + .index('by_user_id', ['userId']), + + incomingSolutions: defineTable({ + userId: v.id('users'), + chatId: v.id('chats'), + pin: v.string(), + conditionSnippet: v.string(), + problem: v.string(), + snippet: v.string(), + fullSolution: v.string(), + plain: v.string(), + status: v.union(v.literal('preview'), v.literal('accepted'), v.literal('dismissed')), + linkedMessageId: v.optional(v.id('messages')), + telegramMessageId: v.optional(v.number()), + acceptedAt: v.optional(v.number()), + createdAt: v.number() + }) + .index('by_chat_id_and_status', ['chatId', 'status']) + .index('by_status_and_telegram_message_id', ['status', 'telegramMessageId']) + .index('by_linked_message_id', ['linkedMessageId']), + + incomingSheets: defineTable({ + userId: v.id('users'), + chatId: v.id('chats'), + pin: v.string(), + sheetId: v.number(), + sheetCreatedAt: v.number(), + text: v.string(), + status: v.union(v.literal('preview'), v.literal('accepted'), v.literal('dismissed')), + linkedMessageId: v.optional(v.id('messages')), + acceptedAt: v.optional(v.number()), + createdAt: v.number() + }) + .index('by_chat_id_and_status', ['chatId', 'status']) + .index('by_chat_id_and_sheet_id', ['chatId', 'sheetId']) }); diff --git a/frontend/src/lib/convex/users.ts b/frontend/src/lib/convex/users.ts index d38c361..4c395ab 100644 --- a/frontend/src/lib/convex/users.ts +++ b/frontend/src/lib/convex/users.ts @@ -1,5 +1,6 @@ import { v } from 'convex/values'; import { mutation, query } from './_generated/server'; +import type { Id } from './_generated/dataModel'; const DEFAULT_MODEL = 'gemini-3-pro-preview'; @@ -28,6 +29,15 @@ export const getById = query({ injectDatabaseId: v.id('injectDatabases'), activeSince: v.number() }) + ), + collaborativeRoom: v.optional( + v.object({ + pin: v.string(), + creatorToken: v.optional(v.string()), + joinedAt: v.number(), + lastSeenHistoryCount: v.optional(v.number()), + lastSeenSheetId: v.optional(v.number()) + }) ) }), v.null() @@ -62,6 +72,15 @@ export const getByTelegramId = query({ injectDatabaseId: v.id('injectDatabases'), activeSince: v.number() }) + ), + collaborativeRoom: v.optional( + v.object({ + pin: v.string(), + creatorToken: v.optional(v.string()), + joinedAt: v.number(), + lastSeenHistoryCount: v.optional(v.number()), + lastSeenSheetId: v.optional(v.number()) + }) ) }), v.null() @@ -227,3 +246,71 @@ export const getInjectCollectionMode = query({ return user?.injectCollectionMode ?? null; } }); + +export const setCollaborativeRoom = mutation({ + args: { + userId: v.id('users'), + pin: v.string(), + creatorToken: v.optional(v.string()), + lastSeenSheetId: v.optional(v.number()) + }, + returns: v.null(), + handler: async (ctx, args) => { + await ctx.db.patch(args.userId, { + collaborativeRoom: { + pin: args.pin, + creatorToken: args.creatorToken, + joinedAt: Date.now(), + lastSeenSheetId: args.lastSeenSheetId ?? 0 + } + }); + return null; + } +}); + +export const clearCollaborativeRoom = mutation({ + args: { userId: v.id('users') }, + returns: v.null(), + handler: async (ctx, args) => { + await ctx.db.patch(args.userId, { collaborativeRoom: undefined }); + return null; + } +}); + +export const setLastSeenSheetId = mutation({ + args: { userId: v.id('users'), sheetId: v.number() }, + returns: v.null(), + handler: async (ctx, args) => { + const user = await ctx.db.get(args.userId); + if (!user?.collaborativeRoom) return null; + await ctx.db.patch(args.userId, { + collaborativeRoom: { ...user.collaborativeRoom, lastSeenSheetId: args.sheetId } + }); + return null; + } +}); + +export const listActiveRoomUsers = query({ + args: {}, + returns: v.array( + v.object({ + _id: v.id('users'), + pin: v.string(), + lastSeenSheetId: v.number() + }) + ), + handler: async (ctx) => { + const users = await ctx.db.query('users').collect(); + const result: Array<{ _id: Id<'users'>; pin: string; lastSeenSheetId: number }> = []; + for (const u of users) { + if (u.collaborativeRoom) { + result.push({ + _id: u._id, + pin: u.collaborativeRoom.pin, + lastSeenSheetId: u.collaborativeRoom.lastSeenSheetId ?? 0 + }); + } + } + return result; + } +}); diff --git a/frontend/src/lib/utils/markdown.ts b/frontend/src/lib/utils/markdown.ts new file mode 100644 index 0000000..c60bf2e --- /dev/null +++ b/frontend/src/lib/utils/markdown.ts @@ -0,0 +1,19 @@ +import { Marked } from 'marked'; + +const marked = new Marked({ breaks: true, gfm: true }); + +export function processLatex(text: string): string { + return text + .replace(/\$\$(.*?)\$\$/gs, (_, tex) => { + const encoded = encodeURIComponent(tex.trim()); + return `LaTeX`; + }) + .replace(/\$(.+?)\$/g, (_, tex) => { + const encoded = encodeURIComponent(tex.trim()); + return `LaTeX`; + }); +} + +export function processContent(text: string): string { + return marked.parse(processLatex(text)) as string; +} diff --git a/frontend/src/routes/[mnemonic]/+page.svelte b/frontend/src/routes/[mnemonic]/+page.svelte index e839149..6c53a1f 100644 --- a/frontend/src/routes/[mnemonic]/+page.svelte +++ b/frontend/src/routes/[mnemonic]/+page.svelte @@ -20,6 +20,7 @@ import PhotoPreview from '$lib/components/PhotoPreview.svelte'; import DraftBadge from '$lib/components/DraftBadge.svelte'; import SilentCapture from '$lib/components/SilentCapture.svelte'; + import IncomingSheetsPanel from '$lib/components/IncomingSheetsPanel.svelte'; const usePolling = getContext('convex-use-polling') ?? false; let mnemonic = $derived(page.params.mnemonic); @@ -193,6 +194,38 @@ const photoDraft = $derived(usePolling ? photoDraftPoll! : photoDraftWs!); const draftPhotos = $derived(photoDraft.data?.photos ?? []); + const incomingSheetsWs = usePolling + ? null + : useQuery(api.incomingSheets.listForChat, () => + chatId ? { chatId, status: 'preview' as const } : 'skip' + ); + const incomingSheetsPoll = usePolling + ? usePollingQuery(api.incomingSheets.listForChat, () => + chatId ? { chatId, status: 'preview' as const } : 'skip' + ) + : null; + const incomingSheetsQ = $derived(usePolling ? incomingSheetsPoll! : incomingSheetsWs!); + const incomingSheetItems = $derived(incomingSheetsQ.data ?? []); + + const acceptSheetPoll = usePolling ? usePollingMutation(api.incomingSheets.accept) : null; + const dismissSheetPoll = usePolling ? usePollingMutation(api.incomingSheets.dismiss) : null; + + function handleAcceptSheet(id: Id<'incomingSheets'>) { + if (usePolling && acceptSheetPoll) { + acceptSheetPoll({ id }); + } else if (clientWs) { + clientWs.mutation(api.incomingSheets.accept, { id }); + } + } + + function handleDismissSheet(id: Id<'incomingSheets'>) { + if (usePolling && dismissSheetPoll) { + dismissSheetPoll({ id }); + } else if (clientWs) { + clientWs.mutation(api.incomingSheets.dismiss, { id }); + } + } + $effect(() => { const req = captureNowRequest.data; if (req && hasCamera && !processedCaptureNowIds.has(req._id)) { @@ -611,6 +644,13 @@ {#if draftPhotos.length > 0} {/if} + {#if incomingSheetItems.length > 0} + + {/if} 0} />
{/if}