feat(bot): add chat proxy

This commit is contained in:
h
2026-01-29 16:15:52 +01:00
parent fc19240952
commit 0e14e945bd
5 changed files with 221 additions and 5 deletions

View File

@@ -18,6 +18,7 @@ from aiogram.types import (
)
from convex import ConvexInt64
from bot.handlers.proxy.handler import get_proxy_config
from bot.modules.ai import (
SUMMARIZE_PROMPT,
AgentDeps,
@@ -161,6 +162,39 @@ class StreamingState:
await self.update_message(self.pending_content, force=True)
class ProxyStreamingState:
def __init__(self, bot: Bot, chat_id: int, message: types.Message) -> None:
self.bot = bot
self.chat_id = chat_id
self.message = message
self.last_edit_time = 0.0
self.last_content = ""
self.pending_content: str | None = None
async def update_message(self, content: str, *, force: bool = False) -> None:
if content == self.last_content:
return
if len(content) > TELEGRAM_MAX_LENGTH:
display_content = content[: TELEGRAM_MAX_LENGTH - 3] + "..."
else:
display_content = content
now = time.monotonic()
if force or (now - self.last_edit_time) >= EDIT_THROTTLE_SECONDS:
with contextlib.suppress(Exception):
await self.message.edit_text(display_content)
self.last_edit_time = now
self.last_content = content
self.pending_content = None
else:
self.pending_content = content
async def flush(self) -> None:
if self.pending_content and self.pending_content != self.last_content:
await self.update_message(self.pending_content, force=True)
async def send_long_message(
bot: Bot, chat_id: int, text: str, reply_markup: ReplyKeyboardMarkup | None = None
) -> None:
@@ -354,8 +388,14 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
await processing_msg.edit_text(truncated)
async def process_message(
user_id: int, text: str, bot: Bot, chat_id: int, *, skip_user_message: bool = False
async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915
user_id: int,
text: str,
bot: Bot,
chat_id: int,
*,
skip_user_message: bool = False,
skip_proxy_user_message: bool = False,
) -> None:
user = await convex.query(
"users:getByTelegramId", {"telegramId": ConvexInt64(user_id)}
@@ -378,6 +418,15 @@ async def process_message(
model_name = user.get("model", "gemini-3-pro-preview")
convex_user_id = user["_id"]
proxy_config = get_proxy_config(chat_id)
proxy_state: ProxyStreamingState | None = None
if proxy_config and not skip_proxy_user_message:
with contextlib.suppress(Exception):
await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, f"👤 {text}"
)
rag_connections = await convex.query(
"ragConnections:getActiveForUser", {"userId": convex_user_id}
)
@@ -432,11 +481,21 @@ async def process_message(
processing_msg = await bot.send_message(chat_id, "...")
state = StreamingState(bot, chat_id, processing_msg)
if proxy_config:
proxy_processing_msg = await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, "..."
)
proxy_state = ProxyStreamingState(
proxy_config.proxy_bot, proxy_config.target_chat_id, proxy_processing_msg
)
try:
await state.start_typing()
async def on_chunk(content: str) -> None:
await state.update_message(content)
if proxy_state:
await proxy_state.update_message(content)
await convex.mutation(
"messages:update",
{"messageId": assistant_message_id, "content": content},
@@ -454,6 +513,8 @@ async def process_message(
)
await state.flush()
if proxy_state:
await proxy_state.flush()
full_history = [*history[:-1], {"role": "assistant", "content": final_answer}]
follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite")
@@ -481,6 +542,15 @@ async def process_message(
keyboard = make_follow_up_keyboard(follow_ups)
await send_long_message(bot, chat_id, final_answer, keyboard)
if proxy_state and proxy_config:
with contextlib.suppress(Exception):
await proxy_state.message.delete()
parts = split_message(final_answer)
for part in parts:
await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, part
)
except Exception as e: # noqa: BLE001
await state.stop_typing()
error_msg = f"Error: {e}"
@@ -494,6 +564,9 @@ async def process_message(
)
with contextlib.suppress(Exception):
await processing_msg.edit_text(html.quote(error_msg[:TELEGRAM_MAX_LENGTH]))
if proxy_state:
with contextlib.suppress(Exception):
await proxy_state.message.edit_text(error_msg[:TELEGRAM_MAX_LENGTH])
async def send_to_telegram(user_id: int, text: str, bot: Bot) -> None:
@@ -550,6 +623,7 @@ async def on_album_message(
images_base64: list[str] = []
images_media_types: list[str] = []
photos_bytes: list[bytes] = []
for msg in album:
if not msg.photo:
@@ -563,6 +637,7 @@ async def on_album_message(
buffer = io.BytesIO()
await bot.download_file(file.file_path, buffer)
image_bytes = buffer.getvalue()
photos_bytes.append(image_bytes)
images_base64.append(base64.b64encode(image_bytes).decode())
ext = file.file_path.rsplit(".", 1)[-1].lower()
@@ -573,6 +648,22 @@ async def on_album_message(
await message.answer("Failed to get photos.")
return
proxy_config = get_proxy_config(message.chat.id)
if proxy_config:
with contextlib.suppress(Exception):
media = []
for i, photo_bytes in enumerate(photos_bytes):
cap = f"👤 {caption}" if i == 0 else None
media.append(
InputMediaPhoto(
media=BufferedInputFile(photo_bytes, f"photo_{i}.jpg"),
caption=cap,
)
)
await proxy_config.proxy_bot.send_media_group(
proxy_config.target_chat_id, media
)
active_chat_id = user["activeChatId"]
await convex.mutation(
"messages:create",
@@ -587,7 +678,12 @@ async def on_album_message(
)
await process_message(
message.from_user.id, caption, bot, message.chat.id, skip_user_message=True
message.from_user.id,
caption,
bot,
message.chat.id,
skip_user_message=True,
skip_proxy_user_message=True,
)
@@ -628,6 +724,15 @@ async def on_photo_message(message: types.Message, bot: Bot) -> None:
ext = file.file_path.rsplit(".", 1)[-1].lower()
media_type = f"image/{ext}" if ext in ("png", "gif", "webp") else "image/jpeg"
proxy_config = get_proxy_config(message.chat.id)
if proxy_config:
with contextlib.suppress(Exception):
await proxy_config.proxy_bot.send_photo(
proxy_config.target_chat_id,
BufferedInputFile(image_bytes, "photo.jpg"),
caption=f"👤 {caption}",
)
active_chat_id = user["activeChatId"]
await convex.mutation(
"messages:create",
@@ -642,5 +747,10 @@ async def on_photo_message(message: types.Message, bot: Bot) -> None:
)
await process_message(
message.from_user.id, caption, bot, message.chat.id, skip_user_message=True
message.from_user.id,
caption,
bot,
message.chat.id,
skip_user_message=True,
skip_proxy_user_message=True,
)