from io import BytesIO
from typing import Any

from pyrogram import Client
from pyrogram.types import Message

from userbot.modules.capture import repository
from userbot.modules.capture.context import CaptureContext
from utils.policy.models import CaptureToggles

# Attributes probed directly on a message, in lookup order.
_MEDIA_ATTRS = (
    "photo",
    "video",
    "voice",
    "video_note",
    "document",
    "audio",
    "animation",
    "sticker",
)
# Attributes probed on a message's ``web_page`` when the message itself
# carries none of the attributes above.
_WEB_PAGE_ATTRS = ("photo", "video", "animation", "document", "audio")


def _first_present(holder: Any, names: tuple[str, ...]) -> tuple[str | None, Any]:
    """Return ``(name, value)`` for the first non-None attribute in *names*.

    Attributes are read with ``getattr(holder, name, None)`` in the given
    order; ``(None, None)`` is returned when none of them is set.
    """
    for name in names:
        candidate = getattr(holder, name, None)
        if candidate is not None:
            return name, candidate
    return None, None


def _stored_copy_matches(row: Any, unique_id: Any) -> bool:
    """Tell whether *row* describes an already-stored copy of the same file.

    The row qualifies only if it exists, is flagged ``downloaded``, carries
    the same ``unique_id`` and has a non-None ``storage_key``.
    """
    if row is None:
        return False
    if not row["downloaded"]:
        return False
    same_file = row["unique_id"] == unique_id
    if not same_file:
        return False
    return row["storage_key"] is not None


def media_object(message: Message) -> tuple[str | None, Any]:
    """Locate the media carried by *message*.

    The message's own media attributes are checked first; if none is set,
    the attributes of ``message.web_page`` (when present) are checked.

    Returns:
        ``(attribute_name, media_object)``, or ``(None, None)`` when nothing
        was found.
    """
    kind, found = _first_present(message, _MEDIA_ATTRS)
    if found is not None:
        return kind, found

    preview = getattr(message, "web_page", None)
    if preview is None:
        return None, None
    return _first_present(preview, _WEB_PAGE_ATTRS)


def self_destruct_ttl(message: Message) -> int | None:
    """Return the ``ttl_seconds`` of the message's media, or None if absent."""
    found = media_object(message)[1]
    if found is None:
        return None
    return getattr(found, "ttl_seconds", None)


def media_unique_id(message: Message) -> str | None:
    """Return the ``file_unique_id`` of the message's media, or None if absent."""
    found = media_object(message)[1]
    if found is None:
        return None
    return getattr(found, "file_unique_id", None)


async def capture_media(  # noqa: PLR0913
    client: Client,
    message: Message,
    ctx: CaptureContext,
    chat_id: int,
    message_id: int,
    toggles: CaptureToggles,
) -> None:
    """Insert a media row for *message*, storing the file when enabled.

    Does nothing when the message has no media. Otherwise a row is always
    inserted through ``repository.insert_media``. The file itself is only
    handled when the relevant toggle is truthy: ``toggles.self_destruct_media``
    for media with a truthy ``ttl_seconds``, ``toggles.media`` for the rest.

    When handling is enabled, the row returned by ``repository.current_media``
    is reused if it already points at a stored copy of the same file;
    otherwise the media is downloaded in memory and handed to
    ``ctx.storage.put``. If the download does not yield a ``BytesIO``, the row
    is inserted with ``downloaded=False`` and no storage key.
    """
    kind, media = media_object(message)
    if media is None:
        return

    unique_id = getattr(media, "file_unique_id", None)
    ttl = getattr(media, "ttl_seconds", None)
    if ttl:
        want = toggles.self_destruct_media
    else:
        want = toggles.media
    file_size = getattr(media, "file_size", None)
    mime = getattr(media, "mime_type", None)

    storage_key: str | None = None
    downloaded = False

    if want:
        previous = await repository.current_media(
            ctx.pool, ctx.account_id, chat_id, message_id
        )
        if _stored_copy_matches(previous, unique_id):
            # Same file is already stored: reuse its key and recorded size.
            storage_key = previous["storage_key"]
            file_size = previous["file_size"]
            downloaded = True
        else:
            # Media found on the message itself is downloaded via the
            # message; media found on the web page is passed directly.
            attached_directly = getattr(message, kind or "", None) is media
            source = message if attached_directly else media
            payload = await client.download_media(source, in_memory=True)
            if isinstance(payload, BytesIO):
                raw = payload.getvalue()
                storage_key = ctx.storage.put(raw)
                file_size = len(raw)
                downloaded = True

    await repository.insert_media(
        ctx.pool,
        ctx.account_id,
        chat_id,
        message_id,
        kind or "unknown",
        storage_key,
        file_size,
        mime,
        ttl,
        unique_id,
        downloaded=downloaded,
    )