feat(bot,frontend,backend): add integration with external collaborative solver

This commit is contained in:
h
2026-05-04 12:20:40 +02:00
parent 8379929372
commit a99a80f6c9
23 changed files with 1115 additions and 36 deletions
+12 -4
View File
@@ -11,20 +11,28 @@ setup_logging()
async def runner() -> None:
from . import handlers # noqa: PLC0415
from .common import bot, dp # noqa: PLC0415
from .sync import start_sync_listener # noqa: PLC0415
from .sync import ( # noqa: PLC0415
start_room_inbox_poller,
start_room_upload_worker,
start_sync_listener,
)
dp.include_routers(handlers.router)
sync_task = asyncio.create_task(start_sync_listener(bot))
upload_task = asyncio.create_task(start_room_upload_worker(bot))
inbox_task = asyncio.create_task(start_room_inbox_poller(bot))
await bot.delete_webhook(drop_pending_updates=True)
try:
await dp.start_polling(bot)
finally:
sync_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await sync_task
for t in (sync_task, upload_task, inbox_task):
t.cancel()
for t in (sync_task, upload_task, inbox_task):
with contextlib.suppress(asyncio.CancelledError):
await t
def plugins() -> None:
+2 -1
View File
@@ -1,6 +1,6 @@
from aiogram import Router
from . import apikey, chat, initialize, inject, message, proxy, rag, start
from . import apikey, chat, initialize, inject, message, proxy, rag, room, start
router = Router()
@@ -12,5 +12,6 @@ router.include_routers(
rag.router,
inject.router,
proxy.router,
room.router,
message.router,
)
@@ -19,6 +19,7 @@ async def startup(bot: Bot) -> None:
types.BotCommand(command="/preset", description="Apply a preset"),
types.BotCommand(command="/proxy", description="Proxy chat to another bot"),
types.BotCommand(command="/inject", description="Inject knowledge base"),
types.BotCommand(command="/room", description="Manage collaborative room"),
]
)
logger.info(f"[green]Started as[/] @{(await bot.me()).username}")
+61 -5
View File
@@ -36,6 +36,45 @@ convex = ConvexClient(env.convex_url)
ALBUM_COLLECT_DELAY = 0.5
_room_upload_tasks: set[asyncio.Task] = set()
async def _enqueue_room_upload(
user_id: str, pin: str, image_b64: str, media_type: str
) -> None:
try:
await convex.mutation(
"collaborative:enqueueUpload",
{
"userId": user_id,
"pin": pin,
"imageBase64": image_b64,
"mediaType": media_type,
},
)
except Exception as e: # noqa: BLE001
from utils.logging import logger as _logger # noqa: PLC0415
_logger.warning(f"Failed to enqueue room upload: {e}")
def schedule_room_uploads(
user: dict, images_base64: list[str], media_types: list[str]
) -> None:
room = user.get("collaborativeRoom") if user else None
if not room:
return
pin = room.get("pin")
if not pin:
return
user_id = user["_id"]
for img_b64, media_type in zip(images_base64, media_types, strict=True):
task = asyncio.create_task(
_enqueue_room_upload(user_id, pin, img_b64, media_type)
)
_room_upload_tasks.add(task)
task.add_done_callback(_room_upload_tasks.discard)
class AlbumMiddleware(BaseMiddleware):
def __init__(self) -> None:
@@ -197,13 +236,17 @@ class ProxyStreamingState:
async def send_long_message(
bot: Bot, chat_id: int, text: str, reply_markup: ReplyKeyboardMarkup | None = None
) -> None:
) -> int | None:
parts = split_message(text)
first_id: int | None = None
for i, part in enumerate(parts):
is_last = i == len(parts) - 1
await bot.send_message(
sent = await bot.send_message(
chat_id, html.quote(part), reply_markup=reply_markup if is_last else None
)
if first_id is None:
first_id = sent.message_id
return first_id
async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
@@ -300,7 +343,7 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
api_key=api_key,
model_name=model_name,
system_prompt=system_prompt,
rag_db_names=rag_db_names if rag_db_names else None,
rag_db_names=rag_db_names or None,
)
agent_deps = (
@@ -383,7 +426,16 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
with contextlib.suppress(Exception):
await processing_msg.delete()
keyboard = make_follow_up_keyboard(follow_ups)
await send_long_message(bot, tg_chat_id, final_answer, keyboard)
sent_id = await send_long_message(bot, tg_chat_id, final_answer, keyboard)
if sent_id is not None:
with contextlib.suppress(Exception):
await convex.mutation(
"messages:setTelegramMessageId",
{
"messageId": assistant_message_id,
"telegramMessageId": sent_id,
},
)
except Exception as e: # noqa: BLE001
if state:
@@ -502,7 +554,7 @@ async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915
api_key=api_key,
model_name=model_name,
system_prompt=system_prompt,
rag_db_names=rag_db_names if rag_db_names else None,
rag_db_names=rag_db_names or None,
)
agent_deps = (
@@ -712,6 +764,8 @@ async def on_album_message(
},
)
schedule_room_uploads(user, images_base64, images_media_types)
await process_message(
message.from_user.id,
caption,
@@ -782,6 +836,8 @@ async def on_photo_message(message: types.Message, bot: Bot) -> None:
},
)
schedule_room_uploads(user, [image_base64], [media_type])
await process_message(
message.from_user.id,
caption,
@@ -0,0 +1,3 @@
from .handler import router
__all__ = ["router"]
+202
View File
@@ -0,0 +1,202 @@
from aiogram import Router, types
from aiogram.filters import Command
from convex import ConvexInt64
from utils import env
from utils.collaborative import CollaborativeHTTPError, get_collaborative_client
from utils.convex import ConvexClient
from utils.logging import logger
router = Router()
convex = ConvexClient(env.convex_url)
@router.message(Command("room"))
async def on_room(message: types.Message) -> None: # noqa: C901
if not message.from_user or not message.text:
return
if not env.collaborative.enabled:
await message.answer("Collaborative Solver is not configured on this instance.")
return
args = message.text.split()[1:]
if not args:
await show_usage(message)
return
user = await convex.query(
"users:getByTelegramId", {"telegramId": ConvexInt64(message.from_user.id)}
)
if not user:
await convex.mutation(
"users:getOrCreate",
{
"telegramId": ConvexInt64(message.from_user.id),
"telegramChatId": ConvexInt64(message.chat.id),
},
)
user = await convex.query(
"users:getByTelegramId", {"telegramId": ConvexInt64(message.from_user.id)}
)
if not user:
await message.answer("User not found.")
return
user_id = user["_id"]
sub = args[0]
if sub == "create":
await create_room(message, user_id)
elif sub == "join":
if len(args) < 2: # noqa: PLR2004
await message.answer(
"Usage: <code>/room join &lt;pin&gt;</code>", parse_mode="HTML"
)
return
await join_room(message, user_id, args[1].strip())
elif sub == "leave":
await leave_room(message, user_id, user.get("collaborativeRoom"))
elif sub == "status":
await show_status(message, user_id, user.get("collaborativeRoom"))
else:
await show_usage(message)
async def show_usage(message: types.Message) -> None:
await message.answer(
"<b>Room commands:</b>\n\n"
"<code>/room create</code> - Create a new room and get a PIN\n"
"<code>/room join &lt;pin&gt;</code> - Join an existing room\n"
"<code>/room leave</code> - Leave current room\n"
"<code>/room status</code> - Show current room status",
parse_mode="HTML",
)
async def create_room(message: types.Message, user_id: str) -> None:
client = get_collaborative_client()
try:
resp = await client.create_room()
except (CollaborativeHTTPError, Exception) as e: # noqa: BLE001
logger.error(f"Failed to create room: {e}")
await message.answer(f"Failed to create room: {e}")
return
pin = resp.get("pin", "")
creator_token = resp.get("creator_token", "")
if not pin:
await message.answer("Got invalid response from solver.")
return
await convex.mutation(
"users:setCollaborativeRoom",
{
"userId": user_id,
"pin": pin,
"creatorToken": creator_token,
"lastSeenSheetId": 0,
},
)
await message.answer(
f"<b>Room created!</b>\n\n"
f"PIN: <code>{pin}</code>\n\n"
f"Share this PIN with others so they can join with "
f"<code>/room join {pin}</code>.",
parse_mode="HTML",
)
async def join_room(message: types.Message, user_id: str, pin: str) -> None:
client = get_collaborative_client()
try:
sheets_resp = await client.get_sheets(pin)
except CollaborativeHTTPError as e:
if e.status_code == 404: # noqa: PLR2004
await message.answer(
f"Room <code>{pin}</code> not found.", parse_mode="HTML"
)
else:
await message.answer(f"Failed to join room: {e}")
return
except Exception as e: # noqa: BLE001
logger.error(f"Failed to join room: {e}")
await message.answer(f"Failed to join room: {e}")
return
sheets = sheets_resp.get("sheets", []) or []
last_sheet_id = max((int(s["id"]) for s in sheets), default=0)
await convex.mutation(
"users:setCollaborativeRoom",
{"userId": user_id, "pin": pin, "lastSeenSheetId": last_sheet_id},
)
await message.answer(
f"Joined room <code>{pin}</code> ({len(sheets)} sheet(s) so far).\n"
f"New sheets will appear in the web UI.",
parse_mode="HTML",
)
async def leave_room(message: types.Message, user_id: str, room: dict | None) -> None:
if not room:
await message.answer("Not in any room.")
return
pin = room.get("pin", "")
creator_token = room.get("creatorToken")
if creator_token:
client = get_collaborative_client()
try:
await client.delete_room(pin, creator_token)
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to delete room {pin}: {e}")
await convex.mutation("users:clearCollaborativeRoom", {"userId": user_id})
await message.answer(f"Left room <code>{pin}</code>.", parse_mode="HTML")
async def show_status(message: types.Message, user_id: str, room: dict | None) -> None:
if not room:
await message.answer(
"Not in any room.\n\nUse <code>/room create</code> or "
"<code>/room join &lt;pin&gt;</code>.",
parse_mode="HTML",
)
return
pin = room.get("pin", "")
client = get_collaborative_client()
info: dict = {}
sheets_count: int | str = "?"
try:
info = await client.get_room(pin)
except Exception as e: # noqa: BLE001
logger.warning(f"get_room failed: {e}")
try:
sheets_resp = await client.get_sheets(pin)
sheets_count = len(sheets_resp.get("sheets", []) or [])
except Exception as e: # noqa: BLE001
logger.warning(f"get_sheets failed: {e}")
queue = await convex.query("collaborative:queueDepthForUser", {"userId": user_id})
role = "creator" if room.get("creatorToken") else "member"
last_seen = room.get("lastSeenSheetId", 0)
await message.answer(
f"<b>Room <code>{pin}</code></b> ({role})\n"
f"Members: {info.get('members', '?')}\n"
f"Processing: {info.get('processing', False)}\n"
f"Sheets in room: {sheets_count}\n"
f"Last seen sheet id: {last_seen}\n\n"
f"<b>Upload queue:</b>\n"
f" pending: {queue.get('pending', 0)}\n"
f" uploading: {queue.get('uploading', 0)}\n"
f" failed: {queue.get('failed', 0)}",
parse_mode="HTML",
)
+167 -1
View File
@@ -1,16 +1,23 @@
import asyncio
import base64
import time
from aiogram import Bot
from bot.handlers.message.handler import process_message_from_web
from utils import env
from utils.collaborative import (
CollaborativeClient,
CollaborativeHTTPError,
get_collaborative_client,
)
from utils.convex import ConvexClient
from utils.logging import logger
convex = ConvexClient(env.convex_url)
background_tasks = set()
background_tasks: set[asyncio.Task] = set()
async def start_sync_listener(bot: Bot) -> None:
@@ -58,3 +65,162 @@ async def handle_pending_generation(bot: Bot, item: dict, item_id: str) -> None:
logger.error(f"Error processing {item_id}: {e}")
finally:
await convex.mutation("pendingGenerations:remove", {"id": item_id})
async def start_room_upload_worker(bot: Bot) -> None: # noqa: ARG001
if not env.collaborative.enabled:
logger.info("Collaborative not configured; upload worker disabled.")
return
logger.info("Starting collaborative upload worker...")
try:
reset_count = await convex.mutation("collaborative:resetStuckUploads", {})
if reset_count:
logger.info(f"Reset {reset_count} stuck uploads to pending.")
except Exception as e: # noqa: BLE001
logger.warning(f"Failed to reset stuck uploads: {e}")
in_flight: set[str] = set()
sub = convex.subscribe("collaborative:listPendingUploads", {})
try:
async for pending_list in sub:
for item in pending_list or []:
item_id = item["_id"]
if item_id in in_flight:
continue
if item.get("attempts", 0) >= env.collaborative.upload_max_attempts:
continue
in_flight.add(item_id)
task = asyncio.create_task(_handle_upload(item, item_id, in_flight))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
except asyncio.CancelledError:
logger.info("Upload worker cancelled")
raise
except Exception as e: # noqa: BLE001
logger.error(f"Upload worker error: {e}")
finally:
sub.unsubscribe()
async def _handle_upload(item: dict, item_id: str, in_flight: set[str]) -> None:
try:
claimed = await convex.mutation("collaborative:markUploading", {"id": item_id})
if not claimed:
return
client = get_collaborative_client()
try:
image_bytes = base64.b64decode(item["imageBase64"])
await client.upload_photo(
pin=item["pin"], image_bytes=image_bytes, media_type=item["mediaType"]
)
await convex.mutation("collaborative:markDone", {"id": item_id})
logger.info(f"Room upload {item_id} done.")
except CollaborativeHTTPError as e:
if 400 <= e.status_code < 500: # noqa: PLR2004
await convex.mutation(
"collaborative:markFailed", {"id": item_id, "error": str(e)}
)
logger.warning(f"Room upload {item_id} failed (4xx): {e}")
else:
await _bump_with_backoff(item, item_id, str(e))
except Exception as e: # noqa: BLE001
await _bump_with_backoff(item, item_id, str(e))
finally:
in_flight.discard(item_id)
async def _bump_with_backoff(item: dict, item_id: str, error: str) -> None:
attempts = item.get("attempts", 0) + 1
base = env.collaborative.upload_backoff_base_seconds
delay_seconds = min(base * (2 ** (attempts - 1)), 60.0)
next_retry_at_ms = int((time.time() + delay_seconds) * 1000)
await convex.mutation(
"collaborative:bumpAttempt",
{"id": item_id, "error": error[:500], "nextRetryAt": next_retry_at_ms},
)
logger.info(
f"Room upload {item_id} retry in {delay_seconds:.1f}s "
f"(attempt {attempts}): {error[:100]}"
)
async def start_room_inbox_poller(bot: Bot) -> None: # noqa: ARG001
if not env.collaborative.enabled:
logger.info("Collaborative not configured; inbox poller disabled.")
return
logger.info("Starting collaborative inbox poller...")
interval = env.collaborative.poll_interval_seconds
client = get_collaborative_client()
while True:
try:
users = await convex.query("users:listActiveRoomUsers", {})
for user in users or []:
await _poll_user_room(client, user)
except asyncio.CancelledError:
logger.info("Inbox poller cancelled")
raise
except Exception as e: # noqa: BLE001
logger.warning(f"Inbox poller iteration failed: {e}")
try:
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info("Inbox poller cancelled")
raise
async def _poll_user_room(client: CollaborativeClient, user: dict) -> None:
pin = user["pin"]
last_seen_sheet_id = int(user.get("lastSeenSheetId", 0) or 0)
user_id = user["_id"]
try:
resp = await client.get_sheets(pin)
except CollaborativeHTTPError as e:
logger.warning(f"Room {pin} sheets fetch failed ({e.status_code}); skipping.")
return
except Exception as e: # noqa: BLE001
logger.warning(f"Room {pin} sheets fetch error: {e}")
return
sheets = resp.get("sheets", []) or []
new_sheets = [s for s in sheets if int(s.get("id", 0)) > last_seen_sheet_id]
if not new_sheets:
return
user_doc = await convex.query("users:getById", {"userId": user_id})
if not user_doc or not user_doc.get("activeChatId"):
return
chat_id = user_doc["activeChatId"]
payload = [
{
"sheetId": int(s["id"]),
"sheetCreatedAt": float(s.get("created_at", 0)),
"text": s.get("text", "") or "",
}
for s in new_sheets
]
max_id = max(int(s["id"]) for s in new_sheets)
try:
inserted = await convex.mutation(
"incomingSheets:createMany",
{"userId": user_id, "chatId": chat_id, "pin": pin, "sheets": payload},
)
await convex.mutation(
"users:setLastSeenSheetId", {"userId": user_id, "sheetId": max_id}
)
logger.info(
f"Room {pin}: ingested {inserted} new sheet(s) (cursor → {max_id})."
)
except Exception as e: # noqa: BLE001
logger.error(f"Failed to ingest sheets for {pin}: {e}")
@@ -0,0 +1,7 @@
from .client import (
CollaborativeClient,
CollaborativeHTTPError,
get_collaborative_client,
)
__all__ = ["CollaborativeClient", "CollaborativeHTTPError", "get_collaborative_client"]
+67
View File
@@ -0,0 +1,67 @@
from typing import Any
import httpx
from utils import env
class CollaborativeHTTPError(Exception):
def __init__(self, status_code: int, body: str) -> None:
super().__init__(f"HTTP {status_code}: {body[:200]}")
self.status_code = status_code
self.body = body
class CollaborativeClient:
def __init__(self) -> None:
cfg = env.collaborative
timeout = httpx.Timeout(connect=10.0, read=30.0, write=60.0, pool=10.0)
self._client = httpx.AsyncClient(
base_url=cfg.api_url.rstrip("/"),
timeout=timeout,
headers={"X-API-Key": cfg.api_key.get_secret_value()},
)
async def aclose(self) -> None:
await self._client.aclose()
async def _request(
self,
method: str,
path: str,
**kwargs: Any, # noqa: ANN401
) -> dict[str, Any]:
resp = await self._client.request(method, path, **kwargs)
if resp.status_code >= 400: # noqa: PLR2004
raise CollaborativeHTTPError(resp.status_code, resp.text)
return resp.json()
async def create_room(self) -> dict[str, str]:
return await self._request("POST", "/api/v1/rooms")
async def get_room(self, pin: str) -> dict[str, Any]:
return await self._request("GET", f"/api/v1/rooms/{pin}")
async def get_sheets(self, pin: str) -> dict[str, Any]:
return await self._request("GET", f"/api/v1/rooms/{pin}/sheets")
async def delete_room(self, pin: str, creator_token: str) -> dict[str, Any]:
return await self._request(
"DELETE", f"/api/v1/rooms/{pin}", params={"creator_token": creator_token}
)
async def upload_photo(
self, pin: str, image_bytes: bytes, media_type: str, filename: str = "photo.jpg"
) -> dict[str, Any]:
files = {"file": (filename, image_bytes, media_type)}
return await self._request("POST", f"/api/v1/rooms/{pin}/upload", files=files)
_client: CollaborativeClient | None = None
def get_collaborative_client() -> CollaborativeClient:
global _client # noqa: PLW0603
if _client is None:
_client = CollaborativeClient()
return _client
+14
View File
@@ -17,10 +17,24 @@ class LogSettings(BaseSettings):
console_width: int = 150
class CollaborativeSettings(BaseSettings):
api_url: str = ""
api_key: SecretStr = SecretStr("")
poll_interval_seconds: int = 8
upload_max_attempts: int = 8
upload_backoff_base_seconds: float = 2.0
request_timeout_seconds: int = 30
@property
def enabled(self) -> bool:
return bool(self.api_url)
class Settings(BaseSettings):
bot: BotSettings
site: SiteSettings
log: LogSettings
collaborative: CollaborativeSettings
convex_url: str = Field(validation_alias=AliasChoices("CONVEX_SELF_HOSTED_URL"))
convex_http_url: str = Field(