diff --git a/backend/src/bot/handlers/__init__.py b/backend/src/bot/handlers/__init__.py index 86e00af..3e2fe58 100644 --- a/backend/src/bot/handlers/__init__.py +++ b/backend/src/bot/handlers/__init__.py @@ -1,6 +1,6 @@ from aiogram import Router -from . import apikey, chat, initialize, message, rag, start +from . import apikey, chat, initialize, message, proxy, rag, start router = Router() @@ -10,5 +10,6 @@ router.include_routers( apikey.router, chat.router, rag.router, + proxy.router, message.router, ) diff --git a/backend/src/bot/handlers/initialize/initializer.py b/backend/src/bot/handlers/initialize/initializer.py index 473cc50..accd0f8 100644 --- a/backend/src/bot/handlers/initialize/initializer.py +++ b/backend/src/bot/handlers/initialize/initializer.py @@ -17,6 +17,7 @@ async def startup(bot: Bot) -> None: types.BotCommand(command="/model", description="Change AI model"), types.BotCommand(command="/presets", description="Show prompt presets"), types.BotCommand(command="/preset", description="Apply a preset"), + types.BotCommand(command="/proxy", description="Proxy chat to another bot"), ] ) logger.info(f"[green]Started as[/] @{(await bot.me()).username}") diff --git a/backend/src/bot/handlers/message/handler.py b/backend/src/bot/handlers/message/handler.py index 9519fff..d44a3bb 100644 --- a/backend/src/bot/handlers/message/handler.py +++ b/backend/src/bot/handlers/message/handler.py @@ -18,6 +18,7 @@ from aiogram.types import ( ) from convex import ConvexInt64 +from bot.handlers.proxy.handler import get_proxy_config from bot.modules.ai import ( SUMMARIZE_PROMPT, AgentDeps, @@ -161,6 +162,39 @@ class StreamingState: await self.update_message(self.pending_content, force=True) +class ProxyStreamingState: + def __init__(self, bot: Bot, chat_id: int, message: types.Message) -> None: + self.bot = bot + self.chat_id = chat_id + self.message = message + self.last_edit_time = 0.0 + self.last_content = "" + self.pending_content: str | None = None + + async def 
update_message(self, content: str, *, force: bool = False) -> None: + if content == self.last_content: + return + + if len(content) > TELEGRAM_MAX_LENGTH: + display_content = content[: TELEGRAM_MAX_LENGTH - 3] + "..." + else: + display_content = content + + now = time.monotonic() + if force or (now - self.last_edit_time) >= EDIT_THROTTLE_SECONDS: + with contextlib.suppress(Exception): + await self.message.edit_text(display_content) + self.last_edit_time = now + self.last_content = content + self.pending_content = None + else: + self.pending_content = content + + async def flush(self) -> None: + if self.pending_content and self.pending_content != self.last_content: + await self.update_message(self.pending_content, force=True) + + async def send_long_message( bot: Bot, chat_id: int, text: str, reply_markup: ReplyKeyboardMarkup | None = None ) -> None: @@ -354,8 +388,14 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915 await processing_msg.edit_text(truncated) -async def process_message( - user_id: int, text: str, bot: Bot, chat_id: int, *, skip_user_message: bool = False +async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915 + user_id: int, + text: str, + bot: Bot, + chat_id: int, + *, + skip_user_message: bool = False, + skip_proxy_user_message: bool = False, ) -> None: user = await convex.query( "users:getByTelegramId", {"telegramId": ConvexInt64(user_id)} @@ -378,6 +418,15 @@ async def process_message( model_name = user.get("model", "gemini-3-pro-preview") convex_user_id = user["_id"] + proxy_config = get_proxy_config(chat_id) + proxy_state: ProxyStreamingState | None = None + + if proxy_config and not skip_proxy_user_message: + with contextlib.suppress(Exception): + await proxy_config.proxy_bot.send_message( + proxy_config.target_chat_id, f"👤 {text}" + ) + rag_connections = await convex.query( "ragConnections:getActiveForUser", {"userId": convex_user_id} ) @@ -432,11 +481,21 @@ async def process_message( processing_msg = 
await bot.send_message(chat_id, "...") state = StreamingState(bot, chat_id, processing_msg) + if proxy_config: + proxy_processing_msg = await proxy_config.proxy_bot.send_message( + proxy_config.target_chat_id, "..." + ) + proxy_state = ProxyStreamingState( + proxy_config.proxy_bot, proxy_config.target_chat_id, proxy_processing_msg + ) + try: await state.start_typing() async def on_chunk(content: str) -> None: await state.update_message(content) + if proxy_state: + await proxy_state.update_message(content) await convex.mutation( "messages:update", {"messageId": assistant_message_id, "content": content}, @@ -454,6 +513,8 @@ async def process_message( ) await state.flush() + if proxy_state: + await proxy_state.flush() full_history = [*history[:-1], {"role": "assistant", "content": final_answer}] follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite") @@ -481,6 +542,15 @@ async def process_message( keyboard = make_follow_up_keyboard(follow_ups) await send_long_message(bot, chat_id, final_answer, keyboard) + if proxy_state and proxy_config: + with contextlib.suppress(Exception): + await proxy_state.message.delete() + parts = split_message(final_answer) + for part in parts: + await proxy_config.proxy_bot.send_message( + proxy_config.target_chat_id, part + ) + except Exception as e: # noqa: BLE001 await state.stop_typing() error_msg = f"Error: {e}" @@ -494,6 +564,9 @@ async def process_message( ) with contextlib.suppress(Exception): await processing_msg.edit_text(html.quote(error_msg[:TELEGRAM_MAX_LENGTH])) + if proxy_state: + with contextlib.suppress(Exception): + await proxy_state.message.edit_text(error_msg[:TELEGRAM_MAX_LENGTH]) async def send_to_telegram(user_id: int, text: str, bot: Bot) -> None: @@ -550,6 +623,7 @@ async def on_album_message( images_base64: list[str] = [] images_media_types: list[str] = [] + photos_bytes: list[bytes] = [] for msg in album: if not msg.photo: @@ -563,6 +637,7 @@ async def on_album_message( buffer = io.BytesIO() await 
bot.download_file(file.file_path, buffer) image_bytes = buffer.getvalue() + photos_bytes.append(image_bytes) images_base64.append(base64.b64encode(image_bytes).decode()) ext = file.file_path.rsplit(".", 1)[-1].lower() @@ -573,6 +648,22 @@ async def on_album_message( await message.answer("Failed to get photos.") return + proxy_config = get_proxy_config(message.chat.id) + if proxy_config: + with contextlib.suppress(Exception): + media = [] + for i, photo_bytes in enumerate(photos_bytes): + cap = f"👤 {caption}" if i == 0 else None + media.append( + InputMediaPhoto( + media=BufferedInputFile(photo_bytes, f"photo_{i}.jpg"), + caption=cap, + ) + ) + await proxy_config.proxy_bot.send_media_group( + proxy_config.target_chat_id, media + ) + active_chat_id = user["activeChatId"] await convex.mutation( "messages:create", @@ -587,7 +678,12 @@ async def on_album_message( ) await process_message( - message.from_user.id, caption, bot, message.chat.id, skip_user_message=True + message.from_user.id, + caption, + bot, + message.chat.id, + skip_user_message=True, + skip_proxy_user_message=True, ) @@ -628,6 +724,15 @@ async def on_photo_message(message: types.Message, bot: Bot) -> None: ext = file.file_path.rsplit(".", 1)[-1].lower() media_type = f"image/{ext}" if ext in ("png", "gif", "webp") else "image/jpeg" + proxy_config = get_proxy_config(message.chat.id) + if proxy_config: + with contextlib.suppress(Exception): + await proxy_config.proxy_bot.send_photo( + proxy_config.target_chat_id, + BufferedInputFile(image_bytes, "photo.jpg"), + caption=f"👤 {caption}", + ) + active_chat_id = user["activeChatId"] await convex.mutation( "messages:create", @@ -642,5 +747,10 @@ async def on_photo_message(message: types.Message, bot: Bot) -> None: ) await process_message( - message.from_user.id, caption, bot, message.chat.id, skip_user_message=True + message.from_user.id, + caption, + bot, + message.chat.id, + skip_user_message=True, + skip_proxy_user_message=True, ) diff --git 
a/backend/src/bot/handlers/proxy/__init__.py b/backend/src/bot/handlers/proxy/__init__.py
new file mode 100644
index 0000000..c33d170
--- /dev/null
+++ b/backend/src/bot/handlers/proxy/__init__.py
@@ -0,0 +1,3 @@
+from .handler import router
+
+__all__ = ["router"]
diff --git a/backend/src/bot/handlers/proxy/handler.py b/backend/src/bot/handlers/proxy/handler.py
new file mode 100644
index 0000000..28086bf
--- /dev/null
+++ b/backend/src/bot/handlers/proxy/handler.py
@@ -0,0 +1,114 @@
+import contextlib
+from dataclasses import dataclass
+
+from aiogram import Bot, Router, types
+from aiogram.client.default import DefaultBotProperties
+from aiogram.enums import ParseMode
+from aiogram.filters import Command
+
+router = Router()
+
+
+@dataclass
+class ProxyConfig:
+    """Connection details for mirroring a chat through a secondary bot."""
+
+    bot_token: str
+    target_chat_id: int
+    proxy_bot: Bot
+
+
+# One active proxy per source chat id.
+proxy_states: dict[int, ProxyConfig] = {}
+
+
+def get_proxy_config(chat_id: int) -> ProxyConfig | None:
+    """Return the active proxy configuration for *chat_id*, if any."""
+    return proxy_states.get(chat_id)
+
+
+async def cleanup_proxy(chat_id: int) -> None:
+    """Remove the proxy for *chat_id* and close its bot session."""
+    config = proxy_states.pop(chat_id, None)
+    if config:
+        await config.proxy_bot.session.close()
+
+
+@router.message(Command("proxy"))
+async def on_proxy(message: types.Message) -> None:  # noqa: C901
+    """Handle /proxy: show status, deactivate, or activate a proxy bot."""
+    if not message.from_user or not message.text:
+        return
+
+    chat_id = message.chat.id
+    args = message.text.split(maxsplit=2)
+
+    if len(args) == 1:
+        config = get_proxy_config(chat_id)
+        if config:
+            await message.answer(
+                f"Proxy active → chat {config.target_chat_id}\n\n"
+                "Use /proxy deactivate to stop."
+            )
+        else:
+            await message.answer(
+                "Usage:\n"
+                "/proxy BOT_TOKEN CHAT_ID - activate proxy\n"
+                "/proxy deactivate - stop proxy"
+            )
+        return
+
+    if args[1].lower() == "deactivate":
+        if chat_id in proxy_states:
+            await cleanup_proxy(chat_id)
+            await message.answer("✓ Proxy deactivated.")
+        else:
+            await message.answer("No active proxy.")
+        return
+
+    if len(args) < 3:  # noqa: PLR2004
+        await message.answer("Usage: /proxy BOT_TOKEN CHAT_ID")
+        return
+
+    bot_token = args[1]
+    try:
+        target_chat_id = int(args[2])
+    except ValueError:
+        await message.answer("Invalid chat ID. Must be a number.")
+        return
+
+    # Delete the command message as early as possible so the bot token
+    # does not linger in the chat history, even if activation fails.
+    with contextlib.suppress(Exception):
+        await message.delete()
+
+    if chat_id in proxy_states:
+        await cleanup_proxy(chat_id)
+
+    proxy_bot = Bot(
+        token=bot_token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)
+    )
+    try:
+        bot_info = await proxy_bot.get_me()
+    except Exception as e:  # noqa: BLE001
+        # Close the session on failure so the HTTP connector is not leaked.
+        await proxy_bot.session.close()
+        await message.answer(f"Invalid bot token: {e}")
+        return
+
+    try:
+        await proxy_bot.send_message(target_chat_id, "🔗 Proxy connected")
+    except Exception as e:  # noqa: BLE001
+        await proxy_bot.session.close()
+        await message.answer(f"Cannot send to chat {target_chat_id}: {e}")
+        return
+
+    proxy_states[chat_id] = ProxyConfig(
+        bot_token=bot_token, target_chat_id=target_chat_id, proxy_bot=proxy_bot
+    )
+
+    await message.answer(
+        f"✓ Proxy activated via @{bot_info.username}\n"
+        f"Target: {target_chat_id}\n\n"
+        "All messages will be forwarded."
+    )