feat(*): first mvp

h
2026-01-20 21:54:48 +01:00
parent b9703da2fc
commit ec17f5e0fd
52 changed files with 2599 additions and 576 deletions


@@ -11,11 +11,20 @@ setup_logging()
async def runner() -> None:
    from . import handlers  # noqa: PLC0415
    from .common import bot, dp  # noqa: PLC0415
    from .sync import start_sync_listener  # noqa: PLC0415

    dp.include_routers(handlers.router)
    sync_task = asyncio.create_task(start_sync_listener(bot))
    await bot.delete_webhook(drop_pending_updates=True)
-    await dp.start_polling(bot)
+    try:
+        await dp.start_polling(bot)
+    finally:
+        sync_task.cancel()
+        with contextlib.suppress(asyncio.CancelledError):
+            await sync_task


def plugins() -> None:


@@ -1,7 +1,9 @@
from aiogram import Router

-from . import initialize, start
+from . import apikey, chat, initialize, message, start

router = Router()
-router.include_routers(start.router, initialize.router)
+router.include_routers(
+    start.router, initialize.router, apikey.router, chat.router, message.router
+)


@@ -0,0 +1,3 @@
from .handler import router

__all__ = ["router"]


@@ -0,0 +1,33 @@
from aiogram import Router, types
from aiogram.filters import Command
from convex import ConvexInt64

from utils import env
from utils.convex import ConvexClient

router = Router()
convex = ConvexClient(env.convex_url)


@router.message(Command("apikey"))
async def on_apikey(message: types.Message) -> None:
    if not message.from_user:
        return
    args = message.text.split(maxsplit=1) if message.text else []
    if len(args) < 2:  # noqa: PLR2004
        await message.answer(
            "Usage: /apikey YOUR_GEMINI_API_KEY\n\n"
            "Get your API key at https://aistudio.google.com/apikey"
        )
        return
    api_key = args[1].strip()
    user_id = await convex.mutation(
        "users:getOrCreate", {"telegramId": ConvexInt64(message.from_user.id)}
    )
    await convex.mutation("users:setApiKey", {"userId": user_id, "apiKey": api_key})
    await message.delete()
    await message.answer("✓ API key saved. Use /new to create a chat.")


@@ -0,0 +1,3 @@
from .handlers import router

__all__ = ["router"]


@@ -0,0 +1,155 @@
from aiogram import Router, types
from aiogram.filters import Command
from convex import ConvexInt64

from bot.modules.ai import PRESETS
from bot.modules.mnemonic import generate_mnemonic
from utils import env
from utils.convex import ConvexClient

router = Router()
convex = ConvexClient(env.convex_url)


@router.message(Command("new"))
async def on_new(message: types.Message) -> None:
    if not message.from_user:
        return
    user = await convex.query(
        "users:getByTelegramId", {"telegramId": ConvexInt64(message.from_user.id)}
    )
    if not user:
        await message.answer("Use /apikey first to set your Gemini API key.")
        return
    if not user.get("geminiApiKey"):
        await message.answer("Use /apikey first to set your Gemini API key.")
        return
    mnemonic = generate_mnemonic()
    chat_id = await convex.mutation(
        "chats:create", {"userId": user["_id"], "mnemonic": mnemonic}
    )
    await convex.mutation(
        "users:setActiveChat", {"userId": user["_id"], "chatId": chat_id}
    )
    url = f"{env.site.url}/{mnemonic}"
    await message.answer(f"New chat created!\n\n<code>{url}</code>", parse_mode="HTML")


@router.message(Command("clear"))
async def on_clear(message: types.Message) -> None:
    if not message.from_user:
        return
    user = await convex.query(
        "users:getByTelegramId", {"telegramId": ConvexInt64(message.from_user.id)}
    )
    if not user or not user.get("activeChatId"):
        await message.answer("No active chat. Use /new to create one.")
        return
    await convex.mutation("chats:clear", {"chatId": user["activeChatId"]})
    await message.answer("✓ Chat history cleared.")


@router.message(Command("prompt"))
async def on_prompt(message: types.Message) -> None:
    if not message.from_user:
        return
    args = message.text.split(maxsplit=1) if message.text else []
    if len(args) < 2:  # noqa: PLR2004
        await message.answer(
            "Usage: /prompt YOUR_SYSTEM_PROMPT\n\n"
            "Example: /prompt You are a helpful math tutor."
        )
        return
    prompt = args[1].strip()
    user_id = await convex.mutation(
        "users:getOrCreate", {"telegramId": ConvexInt64(message.from_user.id)}
    )
    await convex.mutation(
        "users:setSystemPrompt", {"userId": user_id, "prompt": prompt}
    )
    await message.answer("✓ System prompt updated.")
@router.message(Command("model"))
async def on_model(message: types.Message) -> None:
if not message.from_user:
return
args = message.text.split(maxsplit=1) if message.text else []
if len(args) < 2: # noqa: PLR2004
await message.answer(
"Usage: /model MODEL_NAME\n\n"
"Available models:\n"
"• gemini-2.5-pro-preview-05-06 (default)\n"
"• gemini-2.5-flash-preview-05-20\n"
"• gemini-2.0-flash"
)
return
model = args[1].strip()
user_id = await convex.mutation(
"users:getOrCreate", {"telegramId": ConvexInt64(message.from_user.id)}
)
await convex.mutation("users:setModel", {"userId": user_id, "model": model})
await message.answer(f"✓ Model set to {model}")
@router.message(Command("presets"))
async def on_presets(message: types.Message) -> None:
if not message.from_user:
return
lines = ["<b>Available presets:</b>\n"]
lines.extend(f"• <code>/preset {name}</code>" for name in PRESETS)
lines.append("\nUse /preset NAME to apply a preset.")
await message.answer("\n".join(lines), parse_mode="HTML")
@router.message(Command("preset"))
async def on_preset(message: types.Message) -> None:
if not message.from_user:
return
args = message.text.split(maxsplit=1) if message.text else []
if len(args) < 2: # noqa: PLR2004
await message.answer(
"Usage: /preset NAME\n\nUse /presets to see available presets."
)
return
preset_name = args[1].strip().lower()
preset = PRESETS.get(preset_name)
if not preset:
await message.answer(
f"Unknown preset: {preset_name}\n\nUse /presets to see available presets."
)
return
system_prompt, follow_up_prompt = preset
user_id = await convex.mutation(
"users:getOrCreate", {"telegramId": ConvexInt64(message.from_user.id)}
)
await convex.mutation(
"users:setSystemPrompt", {"userId": user_id, "prompt": system_prompt}
)
await convex.mutation(
"users:setFollowUpPrompt", {"userId": user_id, "prompt": follow_up_prompt}
)
await message.answer(f"✓ Preset '{preset_name}' applied.")


@@ -8,7 +8,16 @@ router = Router()
@router.startup()
async def startup(bot: Bot) -> None:
    await bot.set_my_commands(
-        [types.BotCommand(command="/start", description="Start bot")]
+        [
+            types.BotCommand(command="/start", description="Start bot"),
+            types.BotCommand(command="/apikey", description="Set Gemini API key"),
+            types.BotCommand(command="/new", description="Create new chat"),
+            types.BotCommand(command="/clear", description="Clear chat history"),
+            types.BotCommand(command="/prompt", description="Set system prompt"),
+            types.BotCommand(command="/model", description="Change AI model"),
+            types.BotCommand(command="/presets", description="Show prompt presets"),
+            types.BotCommand(command="/preset", description="Apply a preset"),
+        ]
    )
    logger.info(f"[green]Started as[/] @{(await bot.me()).username}")


@@ -0,0 +1,3 @@
from .handler import router

__all__ = ["router"]


@@ -0,0 +1,401 @@
import asyncio
import contextlib
import io
import time

from aiogram import Bot, F, Router, html, types
from aiogram.enums import ChatAction
from aiogram.types import KeyboardButton, ReplyKeyboardMarkup, ReplyKeyboardRemove
from convex import ConvexInt64

from bot.modules.ai import (
    SUMMARIZE_PROMPT,
    ImageData,
    create_follow_up_agent,
    create_text_agent,
    get_follow_ups,
    stream_response,
)
from utils import env
from utils.convex import ConvexClient

router = Router()
convex = ConvexClient(env.convex_url)

EDIT_THROTTLE_SECONDS = 1.0
TELEGRAM_MAX_LENGTH = 4096


def make_follow_up_keyboard(options: list[str]) -> ReplyKeyboardMarkup:
    buttons = [[KeyboardButton(text=opt)] for opt in options]
    return ReplyKeyboardMarkup(
        keyboard=buttons, resize_keyboard=True, one_time_keyboard=True
    )


def split_message(text: str, max_length: int = TELEGRAM_MAX_LENGTH) -> list[str]:
    if len(text) <= max_length:
        return [text]
    parts: list[str] = []
    while text:
        if len(text) <= max_length:
            parts.append(text)
            break
        split_pos = text.rfind("\n", 0, max_length)
        if split_pos == -1:
            split_pos = text.rfind(" ", 0, max_length)
        if split_pos == -1:
            split_pos = max_length
        parts.append(text[:split_pos])
        text = text[split_pos:].lstrip()
    return parts
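
A quick sanity check of the splitting rules above (illustrative only, not part of the commit):

# Hypothetical oversized input: one short line, then 5000 chars with no whitespace.
chunks = split_message("line1\n" + "x" * 5000)
# -> ["line1", "x" * 4096, "x" * 904]; it splits at the newline first, then hard-wraps.
assert all(len(c) <= TELEGRAM_MAX_LENGTH for c in chunks)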


class StreamingState:
    def __init__(self, bot: Bot, chat_id: int, message: types.Message) -> None:
        self.bot = bot
        self.chat_id = chat_id
        self.message = message
        self.last_edit_time = 0.0
        self.last_content = ""
        self.pending_content: str | None = None
        self._typing_task: asyncio.Task[None] | None = None

    async def start_typing(self) -> None:
        async def typing_loop() -> None:
            while True:
                await self.bot.send_chat_action(self.chat_id, ChatAction.TYPING)
                await asyncio.sleep(4)

        self._typing_task = asyncio.create_task(typing_loop())

    async def stop_typing(self) -> None:
        if self._typing_task:
            self._typing_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._typing_task

    async def update_message(self, content: str, *, force: bool = False) -> None:
        if content == self.last_content:
            return
        if len(content) > TELEGRAM_MAX_LENGTH:
            display_content = content[: TELEGRAM_MAX_LENGTH - 3] + "..."
        else:
            display_content = content
        now = time.monotonic()
        if force or (now - self.last_edit_time) >= EDIT_THROTTLE_SECONDS:
            with contextlib.suppress(Exception):
                await self.message.edit_text(html.quote(display_content))
            self.last_edit_time = now
            self.last_content = content
            self.pending_content = None
        else:
            self.pending_content = content

    async def flush(self) -> None:
        if self.pending_content and self.pending_content != self.last_content:
            await self.update_message(self.pending_content, force=True)


async def send_long_message(
    bot: Bot, chat_id: int, text: str, reply_markup: ReplyKeyboardMarkup | None = None
) -> None:
    parts = split_message(text)
    for i, part in enumerate(parts):
        is_last = i == len(parts) - 1
        await bot.send_message(
            chat_id, html.quote(part), reply_markup=reply_markup if is_last else None
        )


async def process_message_from_web(  # noqa: C901, PLR0915
    convex_user_id: str, text: str, bot: Bot, convex_chat_id: str
) -> None:
    user = await convex.query("users:getById", {"userId": convex_user_id})
    if not user or not user.get("geminiApiKey"):
        return
    tg_chat_id = user["telegramChatId"].value if user.get("telegramChatId") else None
    is_summarize = text == "/summarize"
    if tg_chat_id and not is_summarize:
        await bot.send_message(
            tg_chat_id, f"📱 {html.quote(text)}", reply_markup=ReplyKeyboardRemove()
        )
    api_key = user["geminiApiKey"]
    model_name = user.get("model", "gemini-3-pro-preview")
    assistant_message_id = await convex.mutation(
        "messages:create",
        {
            "chatId": convex_chat_id,
            "role": "assistant",
            "content": "",
            "source": "web",
            "isStreaming": True,
        },
    )
    history = await convex.query(
        "messages:getHistoryForAI", {"chatId": convex_chat_id, "limit": 50}
    )
    system_prompt = SUMMARIZE_PROMPT if is_summarize else user.get("systemPrompt")
    text_agent = create_text_agent(
        api_key=api_key, model_name=model_name, system_prompt=system_prompt
    )
    processing_msg = None
    state = None
    if tg_chat_id:
        processing_msg = await bot.send_message(tg_chat_id, "...")
        state = StreamingState(bot, tg_chat_id, processing_msg)
    try:
        if state:
            await state.start_typing()

        async def on_chunk(content: str) -> None:
            if state:
                await state.update_message(content)
            await convex.mutation(
                "messages:update",
                {"messageId": assistant_message_id, "content": content},
            )

        if is_summarize:
            prompt_text = "Summarize what was done in this conversation."
            hist = history[:-2]
        else:
            prompt_text = text
            hist = history[:-1]
        final_answer = await stream_response(text_agent, prompt_text, hist, on_chunk)
        if state:
            await state.flush()
        full_history = [*history, {"role": "assistant", "content": final_answer}]
        follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite")
        follow_up_prompt = user.get("followUpPrompt")
        follow_up_agent = create_follow_up_agent(
            api_key=api_key, model_name=follow_up_model, system_prompt=follow_up_prompt
        )
        follow_ups = await get_follow_ups(follow_up_agent, full_history)
        if state:
            await state.stop_typing()
        await convex.mutation(
            "messages:update",
            {
                "messageId": assistant_message_id,
                "content": final_answer,
                "followUpOptions": follow_ups,
                "isStreaming": False,
            },
        )
        if tg_chat_id and processing_msg:
            with contextlib.suppress(Exception):
                await processing_msg.delete()
            keyboard = make_follow_up_keyboard(follow_ups)
            await send_long_message(bot, tg_chat_id, final_answer, keyboard)
    except Exception as e:  # noqa: BLE001
        if state:
            await state.stop_typing()
        error_msg = f"Error: {e}"
        await convex.mutation(
            "messages:update",
            {
                "messageId": assistant_message_id,
                "content": error_msg,
                "isStreaming": False,
            },
        )
        if tg_chat_id and processing_msg:
            with contextlib.suppress(Exception):
                truncated = html.quote(error_msg[:TELEGRAM_MAX_LENGTH])
                await processing_msg.edit_text(truncated)


async def process_message(
    user_id: int, text: str, bot: Bot, chat_id: int, image: ImageData | None = None
) -> None:
    user = await convex.query(
        "users:getByTelegramId", {"telegramId": ConvexInt64(user_id)}
    )
    if not user:
        await bot.send_message(chat_id, "Use /apikey first to set your Gemini API key.")
        return
    if not user.get("geminiApiKey"):
        await bot.send_message(chat_id, "Use /apikey first to set your Gemini API key.")
        return
    if not user.get("activeChatId"):
        await bot.send_message(chat_id, "Use /new first to create a chat.")
        return
    active_chat_id = user["activeChatId"]
    api_key = user["geminiApiKey"]
    model_name = user.get("model", "gemini-3-pro-preview")
    await convex.mutation(
        "messages:create",
        {
            "chatId": active_chat_id,
            "role": "user",
            "content": text,
            "source": "telegram",
        },
    )
    assistant_message_id = await convex.mutation(
        "messages:create",
        {
            "chatId": active_chat_id,
            "role": "assistant",
            "content": "",
            "source": "telegram",
            "isStreaming": True,
        },
    )
    history = await convex.query(
        "messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50}
    )
    text_agent = create_text_agent(
        api_key=api_key, model_name=model_name, system_prompt=user.get("systemPrompt")
    )
    processing_msg = await bot.send_message(chat_id, "...")
    state = StreamingState(bot, chat_id, processing_msg)
    try:
        await state.start_typing()

        async def on_chunk(content: str) -> None:
            await state.update_message(content)
            await convex.mutation(
                "messages:update",
                {"messageId": assistant_message_id, "content": content},
            )

        final_answer = await stream_response(
            text_agent, text, history[:-2], on_chunk, image=image
        )
        await state.flush()
        full_history = [*history[:-1], {"role": "assistant", "content": final_answer}]
        follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite")
        follow_up_prompt = user.get("followUpPrompt")
        follow_up_agent = create_follow_up_agent(
            api_key=api_key, model_name=follow_up_model, system_prompt=follow_up_prompt
        )
        follow_ups = await get_follow_ups(follow_up_agent, full_history, image=image)
        await state.stop_typing()
        await convex.mutation(
            "messages:update",
            {
                "messageId": assistant_message_id,
                "content": final_answer,
                "followUpOptions": follow_ups,
                "isStreaming": False,
            },
        )
        with contextlib.suppress(Exception):
            await processing_msg.delete()
        keyboard = make_follow_up_keyboard(follow_ups)
        await send_long_message(bot, chat_id, final_answer, keyboard)
    except Exception as e:  # noqa: BLE001
        await state.stop_typing()
        error_msg = f"Error: {e}"
        await convex.mutation(
            "messages:update",
            {
                "messageId": assistant_message_id,
                "content": error_msg,
                "isStreaming": False,
            },
        )
        with contextlib.suppress(Exception):
            await processing_msg.edit_text(html.quote(error_msg[:TELEGRAM_MAX_LENGTH]))


async def send_to_telegram(user_id: int, text: str, bot: Bot) -> None:
    user = await convex.query(
        "users:getByTelegramId", {"telegramId": ConvexInt64(user_id)}
    )
    if not user or not user.get("telegramChatId"):
        return
    # telegramChatId is stored as a ConvexInt64; unwrap it like process_message_from_web does.
    tg_chat_id = user["telegramChatId"].value
    await bot.send_message(
        tg_chat_id, f"📱 {html.quote(text)}", reply_markup=ReplyKeyboardRemove()
    )


@router.message(F.text & ~F.text.startswith("/"))
async def on_text_message(message: types.Message, bot: Bot) -> None:
    if not message.from_user or not message.text:
        return
    await convex.mutation(
        "users:getOrCreate",
        {
            "telegramId": ConvexInt64(message.from_user.id),
            "telegramChatId": ConvexInt64(message.chat.id),
        },
    )
    await process_message(message.from_user.id, message.text, bot, message.chat.id)


@router.message(F.photo)
async def on_photo_message(message: types.Message, bot: Bot) -> None:
    if not message.from_user or not message.photo:
        return
    await convex.mutation(
        "users:getOrCreate",
        {
            "telegramId": ConvexInt64(message.from_user.id),
            "telegramChatId": ConvexInt64(message.chat.id),
        },
    )
    caption = message.caption or "Process the image according to your task"
    photo = message.photo[-1]  # last PhotoSize is the highest resolution
    file = await bot.get_file(photo.file_id)
    if not file.file_path:
        await message.answer("Failed to get photo.")
        return
    buffer = io.BytesIO()
    await bot.download_file(file.file_path, buffer)
    image_bytes = buffer.getvalue()
    ext = file.file_path.rsplit(".", 1)[-1].lower()
    media_type = f"image/{ext}" if ext in ("png", "gif", "webp") else "image/jpeg"
    image = ImageData(data=image_bytes, media_type=media_type)
    await process_message(
        message.from_user.id, caption, bot, message.chat.id, image=image
    )


@@ -3,7 +3,23 @@ from aiogram.filters import CommandStart
router = Router()

WELCOME_MESSAGE = """
<b>Welcome to AI Chat!</b>

Get started:
1. /apikey YOUR_KEY — Set your Gemini API key
2. /new — Create a new chat and get your Watch URL

Commands:
• /clear — Clear chat history
• /prompt — Set custom system prompt
• /model — Change AI model
• /presets — Show available presets

Get your API key at https://aistudio.google.com/apikey
""".strip()


@router.message(CommandStart())
async def on_start(message: types.Message) -> None:
-    await message.answer("hi")
+    await message.answer(WELCOME_MESSAGE, parse_mode="HTML")


@@ -0,0 +1,21 @@
from .agent import (
    ImageData,
    StreamCallback,
    create_follow_up_agent,
    create_text_agent,
    get_follow_ups,
    stream_response,
)
from .prompts import DEFAULT_FOLLOW_UP, PRESETS, SUMMARIZE_PROMPT

__all__ = [
    "DEFAULT_FOLLOW_UP",
    "PRESETS",
    "SUMMARIZE_PROMPT",
    "ImageData",
    "StreamCallback",
    "create_follow_up_agent",
    "create_text_agent",
    "get_follow_ups",
    "stream_response",
]


@@ -0,0 +1,115 @@
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

from pydantic_ai import (
    Agent,
    BinaryContent,
    ModelMessage,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.providers.google import GoogleProvider

from .models import FollowUpOptions
from .prompts import DEFAULT_FOLLOW_UP

StreamCallback = Callable[[str], Awaitable[None]]


@dataclass
class ImageData:
    data: bytes
    media_type: str


LATEX_INSTRUCTION = "For math, use LaTeX: $...$ inline, $$...$$ display."
DEFAULT_SYSTEM_PROMPT = (
    "You are a helpful AI assistant. Provide clear, concise answers."
)


def create_text_agent(
    api_key: str,
    model_name: str = "gemini-3-pro-preview",
    system_prompt: str | None = None,
) -> Agent[None, str]:
    provider = GoogleProvider(api_key=api_key)
    model = GoogleModel(model_name, provider=provider)
    base_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT
    full_prompt = f"{base_prompt} {LATEX_INSTRUCTION}"
    return Agent(model, system_prompt=full_prompt)


def create_follow_up_agent(
    api_key: str,
    model_name: str = "gemini-2.5-flash-lite",
    system_prompt: str | None = None,
) -> Agent[None, FollowUpOptions]:
    provider = GoogleProvider(api_key=api_key)
    model = GoogleModel(model_name, provider=provider)
    prompt = system_prompt or DEFAULT_FOLLOW_UP
    return Agent(model, output_type=FollowUpOptions, system_prompt=prompt)


def build_message_history(history: list[dict[str, str]]) -> list[ModelMessage]:
    messages: list[ModelMessage] = []
    for msg in history:
        if msg["role"] == "user":
            messages.append(
                ModelRequest(parts=[UserPromptPart(content=msg["content"])])
            )
        else:
            messages.append(ModelResponse(parts=[TextPart(content=msg["content"])]))
    return messages


async def stream_response(  # noqa: PLR0913
    text_agent: Agent[None, str],
    message: str,
    history: list[dict[str, str]] | None = None,
    on_chunk: StreamCallback | None = None,
    image: ImageData | None = None,
    images: list[ImageData] | None = None,
) -> str:
    message_history = build_message_history(history) if history else None
    all_images = images or ([image] if image else [])
    if all_images:
        prompt: list[str | BinaryContent] = [message]
        prompt.extend(
            BinaryContent(data=img.data, media_type=img.media_type)
            for img in all_images
        )
    else:
        prompt = message  # type: ignore[assignment]
    stream = text_agent.run_stream(prompt, message_history=message_history)
    async with stream as result:
        async for text in result.stream_text():
            if on_chunk:
                await on_chunk(text)
        return await result.get_output()


async def get_follow_ups(
    follow_up_agent: Agent[None, FollowUpOptions],
    history: list[dict[str, str]],
    image: ImageData | None = None,
) -> list[str]:
    message_history = build_message_history(history) if history else None
    if image:
        prompt: list[str | BinaryContent] = [
            "Suggest follow-up options based on this conversation and image.",
            BinaryContent(data=image.data, media_type=image.media_type),
        ]
    else:
        prompt = "Suggest follow-up questions based on this conversation."  # type: ignore[assignment]
    result = await follow_up_agent.run(prompt, message_history=message_history)
    return result.output["options"]
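
For orientation, a minimal sketch of how these helpers compose outside the bot (not part of the commit; it assumes the module above is importable, the API key is a placeholder, and print stands in for the Telegram message edits the handlers perform):

import asyncio

async def demo() -> None:
    agent = create_text_agent(api_key="YOUR_GEMINI_KEY")  # placeholder key

    async def on_chunk(text: str) -> None:
        print(text)  # the handlers edit a Telegram message here instead

    history = [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello!"}]
    answer = await stream_response(agent, "What is 2 + 2?", history, on_chunk)
    print("final:", answer)

asyncio.run(demo())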


@@ -0,0 +1,10 @@
from typing import TypedDict


class AIResponse(TypedDict):
    answer: str
    follow_up_options: list[str]


class FollowUpOptions(TypedDict):
    options: list[str]


@@ -0,0 +1,37 @@
EXAM_SYSTEM = """You help solve problem sets and exams.

When you receive an IMAGE with problems:
- Give HINTS in Russian for each problem
- Focus on key insights and potential difficulties;
  give all formulas that will be helpful
- Be quite concise, but include all needed hints - this will be viewed on an Apple Watch
- Format: the info needed to solve each problem or get "unstuck" while solving

When asked for DETAILS on a specific problem (or a problem number):
- Provide a full, structured solution in English
- Academic style, as it would be written in a notebook
- Step by step, clean, no fluff"""

EXAM_FOLLOW_UP = """You see a problem set image. List the available problem numbers.
Output only the numbers that exist in the image, like: 1, 2, 3, 4, 5
If problems have letters (a, b, c), list them as: 1a, 1b, 2a, etc.
Keep it minimal - just the identifiers.
Then, if applicable, suggest some possible follow-ups for the conversation"""

DEFAULT_FOLLOW_UP = (
    "Based on the conversation, suggest 3 short follow-up questions "
    "the user might want to ask. Be concise, each under 50 chars."
)

SUMMARIZE_PROMPT = """You are a summarization agent. You may receive:
1. Images
2. Conversation history showing what was discussed/solved

Summarize VERY briefly:
- Which problems were solved
- Key results or answers found
- What's left to do

Max 2-3 sentences. This is for an Apple Watch display."""

PRESETS: dict[str, tuple[str, str]] = {"exam": (EXAM_SYSTEM, EXAM_FOLLOW_UP)}


@@ -0,0 +1,3 @@
from .generator import generate_mnemonic

__all__ = ["generate_mnemonic"]


@@ -0,0 +1,8 @@
from xkcdpass import xkcd_password as xp

_wordfile = xp.locate_wordfile()
_wordlist = xp.generate_wordlist(wordfile=_wordfile, min_length=4, max_length=6)


def generate_mnemonic(word_count: int = 3, separator: str = "-") -> str:
    return xp.generate_xkcdpassword(_wordlist, numwords=word_count, delimiter=separator)
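
Illustrative usage (output is random on every call; the words shown here are made up):

>>> generate_mnemonic()
'amber-crisp-roast'
>>> generate_mnemonic(word_count=2, separator=".")
'tulip.spark'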

backend/src/bot/sync.py Normal file

@@ -0,0 +1,58 @@
import asyncio

from aiogram import Bot

from bot.handlers.message.handler import process_message_from_web
from utils import env
from utils.convex import ConvexClient
from utils.logging import logger

convex = ConvexClient(env.convex_url)
background_tasks = set()


async def start_sync_listener(bot: Bot) -> None:
    logger.info("Starting Convex sync listener...")
    processed_ids: set[str] = set()
    sub = convex.subscribe("pendingGenerations:list", {})
    try:
        async for pending_list in sub:
            for item in pending_list:
                item_id = item["_id"]
                if item_id in processed_ids:
                    continue
                processed_ids.add(item_id)
                logger.info(f"Processing pending generation: {item_id}")
                task = asyncio.create_task(
                    handle_pending_generation(bot, item, item_id)
                )
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
    except asyncio.CancelledError:
        logger.info("Sync listener cancelled")
        raise
    except Exception as e:  # noqa: BLE001
        logger.error(f"Sync listener error: {e}")
    finally:
        sub.unsubscribe()


async def handle_pending_generation(bot: Bot, item: dict, item_id: str) -> None:
    try:
        await process_message_from_web(
            convex_user_id=item["userId"],
            text=item["userMessage"],
            bot=bot,
            convex_chat_id=item["chatId"],
        )
    except Exception as e:  # noqa: BLE001
        logger.error(f"Error processing {item_id}: {e}")
    finally:
        await convex.mutation("pendingGenerations:remove", {"id": item_id})


@@ -0,0 +1,3 @@
from .client import ConvexClient

__all__ = ["ConvexClient"]


@@ -0,0 +1,21 @@
import asyncio
from typing import Any

from convex import ConvexClient as SyncConvexClient


class ConvexClient:
    def __init__(self, url: str) -> None:
        self._client = SyncConvexClient(url)

    async def query(self, name: str, args: dict[str, Any] | None = None) -> Any:  # noqa: ANN401
        return await asyncio.to_thread(self._client.query, name, args or {})

    async def mutation(self, name: str, args: dict[str, Any] | None = None) -> Any:  # noqa: ANN401
        return await asyncio.to_thread(self._client.mutation, name, args or {})

    async def action(self, name: str, args: dict[str, Any] | None = None) -> Any:  # noqa: ANN401
        return await asyncio.to_thread(self._client.action, name, args or {})

    def subscribe(self, name: str, args: dict[str, Any] | None = None) -> Any:  # noqa: ANN401
        return self._client.subscribe(name, args or {})
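
The wrapper's only job is to push the blocking convex-py calls onto worker threads so handler code can await them. Usage looks like this (deployment URL and document id are placeholders):

async def example() -> None:
    client = ConvexClient("https://convex.example.com")  # placeholder URL
    user = await client.query("users:getById", {"userId": "..."})  # runs via asyncio.to_thread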


@@ -6,6 +6,10 @@ class BotSettings(BaseSettings):
    token: SecretStr


+class SiteSettings(BaseSettings):
+    url: str = Field(default="https://localhost")


class LogSettings(BaseSettings):
    level: str = "INFO"
    level_external: str = "WARNING"

@@ -15,6 +19,7 @@ class LogSettings(BaseSettings):
class Settings(BaseSettings):
    bot: BotSettings
+    site: SiteSettings
    log: LogSettings
    convex_url: str = Field(validation_alias=AliasChoices("CONVEX_SELF_HOSTED_URL"))
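
Assuming the usual pydantic-settings nested env-var setup (the model_config is outside this hunk, so the exact prefix and delimiter are an assumption), the new fields would be populated from something like:

# .env — illustrative variable names; actual names depend on model_config
BOT__TOKEN=123456:telegram-bot-token
SITE__URL=https://watch.example.com
CONVEX_SELF_HOSTED_URL=https://convex.example.com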