Files
beavergram/backend/src/utils/read/media.py
T

56 lines
1.7 KiB
Python

import asyncpg
from utils.read.models import MediaVersionView, MediaView
# Column list interpolated into every SELECT against the ``media`` table in
# this module. Each selected column name is later passed as a keyword argument
# to ``MediaView`` (via ``MediaView(**dict(row))``), so the names listed here
# must be accepted by that constructor.
_MEDIA_COLS = ", ".join(
    (
        "id",
        "account_id",
        "chat_id",
        "message_id",
        "kind",
        "storage_key",
        "file_size",
        "mime",
        "ttl_seconds",
        "downloaded",
        "extracted_text",
        "created_at",
    )
)

# Column list interpolated into every SELECT against ``media_versions``; the
# names are passed as keyword arguments to ``MediaVersionView`` the same way.
_VERSION_COLS = ", ".join(
    (
        "id",
        "kind",
        "storage_key",
        "file_size",
        "mime",
        "observed_at",
    )
)
async def get_media(pool: asyncpg.Pool, media_id: int) -> MediaView | None:
    """Look up one ``media`` row by its ``id`` column.

    Args:
        pool: asyncpg pool the query is executed on.
        media_id: Value bound to ``$1`` and compared against ``media.id``.

    Returns:
        A ``MediaView`` built from the selected columns, or ``None`` when the
        query yields no (truthy) row.
    """
    # Only the module-level column list is interpolated into the SQL text; the
    # id travels as a bound parameter, which is why S608 is silenced here.
    query = f"SELECT {_MEDIA_COLS} FROM media WHERE id = $1"  # noqa: S608
    record = await pool.fetchrow(query, media_id)
    if not record:
        return None
    return MediaView(**dict(record))
async def get_message_media(
    pool: asyncpg.Pool, account_id: int, chat_id: int, message_id: int
) -> MediaView | None:
    """Look up the ``media`` row attached to a specific message.

    The row is matched on the ``(account_id, chat_id, message_id)`` triple.
    Only the single row returned by ``fetchrow`` is used.

    Args:
        pool: asyncpg pool the query is executed on.
        account_id: Value bound to ``$1`` (``media.account_id``).
        chat_id: Value bound to ``$2`` (``media.chat_id``).
        message_id: Value bound to ``$3`` (``media.message_id``).

    Returns:
        A ``MediaView`` built from the selected columns, or ``None`` when the
        query yields no (truthy) row.
    """
    # The interpolated part is the module-level column list only; all filter
    # values are bound parameters, hence the S608 suppression.
    query = (
        f"SELECT {_MEDIA_COLS} FROM media "  # noqa: S608
        "WHERE account_id = $1 AND chat_id = $2 AND message_id = $3"
    )
    record = await pool.fetchrow(query, account_id, chat_id, message_id)
    if not record:
        return None
    return MediaView(**dict(record))
async def get_media_versions(
    pool: asyncpg.Pool, account_id: int, chat_id: int, message_id: int
) -> list[MediaVersionView]:
    """List the ``media_versions`` rows recorded for a specific message.

    Rows are matched on the ``(account_id, chat_id, message_id)`` triple and
    returned in the order produced by ``ORDER BY observed_at``.

    Args:
        pool: asyncpg pool the query is executed on.
        account_id: Value bound to ``$1`` (``media_versions.account_id``).
        chat_id: Value bound to ``$2`` (``media_versions.chat_id``).
        message_id: Value bound to ``$3`` (``media_versions.message_id``).

    Returns:
        One ``MediaVersionView`` per fetched row; an empty list when the
        query returns no rows.
    """
    # The interpolated part is the module-level column list only; all filter
    # values are bound parameters, hence the S608 suppression.
    query = (
        f"SELECT {_VERSION_COLS} FROM media_versions "  # noqa: S608
        "WHERE account_id = $1 AND chat_id = $2 AND message_id = $3 "
        "ORDER BY observed_at"
    )
    records = await pool.fetch(query, account_id, chat_id, message_id)
    return [MediaVersionView(**dict(record)) for record in records]
async def get_media_version(
    pool: asyncpg.Pool, version_id: int
) -> MediaVersionView | None:
    """Look up one ``media_versions`` row by its ``id`` column.

    Args:
        pool: asyncpg pool the query is executed on.
        version_id: Value bound to ``$1`` and compared against
            ``media_versions.id``.

    Returns:
        A ``MediaVersionView`` built from the selected columns, or ``None``
        when the query yields no (truthy) row.
    """
    # Only the module-level column list is interpolated into the SQL text; the
    # id travels as a bound parameter, which is why S608 is silenced here.
    query = f"SELECT {_VERSION_COLS} FROM media_versions WHERE id = $1"  # noqa: S608
    record = await pool.fetchrow(query, version_id)
    if not record:
        return None
    return MediaVersionView(**dict(record))