"""HTTP endpoints for enqueueing backfill/media jobs and reading job rows."""

import json
from datetime import datetime
from typing import Annotated, Any

import asyncpg
from dishka.integrations.fastapi import DishkaRoute, FromDishka
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from utils.jobs import enqueue

router = APIRouter(prefix="/api", tags=["backfill"], route_class=DishkaRoute)

# Columns of a ``jobs`` row that hold JSON documents and need decoding
# before they can populate the dict-typed fields of ``JobView``.
_JSON_COLUMNS = ("params", "progress", "cursor")


class BackfillRequest(BaseModel):
    """Request body for ``POST /api/backfill``."""

    account_id: int
    chat_id: int
    # Forwarded as-is in the job params; defaults to False when omitted.
    media: bool = False


class FetchMediaRequest(BaseModel):
    """Request body for ``POST /api/media/fetch``."""

    account_id: int
    chat_id: int
    message_id: int


class EnqueueResponse(BaseModel):
    """Response returned by the enqueue endpoints: the id of the new job."""

    job_id: int


class JobView(BaseModel):
    """API representation of one row of the ``jobs`` table."""

    id: int
    account_id: int
    kind: str
    status: str
    # JSON documents; decoded from the row by ``_to_view``.
    params: dict[str, Any]
    cursor: dict[str, Any] | None
    progress: dict[str, Any]
    flood_waits: int
    attempts: int
    error: str | None
    created_at: datetime
    started_at: datetime | None
    finished_at: datetime | None


def _decode_json(value: Any) -> Any:
    """Parse *value* as JSON when it is textual, otherwise return it unchanged.

    asyncpg returns json/jsonb columns as ``str`` unless a type codec has been
    registered on the connection, in which case the value arrives already
    decoded. Accepting both keeps ``_to_view`` working in either setup.
    ``None`` (SQL NULL) is passed through untouched.
    """
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def _to_view(row: asyncpg.Record) -> JobView:
    """Build a ``JobView`` from a ``jobs`` row, decoding its JSON columns.

    Args:
        row: A record that contains at least the columns named by the
            ``JobView`` fields.

    Returns:
        The validated ``JobView``.

    Raises:
        KeyError: If one of the JSON columns is missing from *row*.
        pydantic.ValidationError: If the row does not satisfy ``JobView``.
    """
    data = dict(row)
    for column in _JSON_COLUMNS:
        data[column] = _decode_json(data[column])
    return JobView(**data)


@router.post("/backfill", status_code=201)
async def enqueue_backfill(
    pool: FromDishka[asyncpg.Pool], body: BackfillRequest
) -> EnqueueResponse:
    """Enqueue a ``backfill`` job for the given account and chat.

    The chat id and the ``media`` flag are passed to ``enqueue`` as the job
    params; the id it returns is sent back to the client with status 201.
    """
    job_id = await enqueue(
        pool,
        body.account_id,
        "backfill",
        {"chat_id": body.chat_id, "media": body.media},
    )
    return EnqueueResponse(job_id=job_id)


@router.post("/media/fetch", status_code=201)
async def enqueue_fetch_media(
    pool: FromDishka[asyncpg.Pool], body: FetchMediaRequest
) -> EnqueueResponse:
    """Enqueue a ``fetch_media`` job for a single message.

    The chat id and message id are passed to ``enqueue`` as the job params;
    the id it returns is sent back to the client with status 201.
    """
    job_id = await enqueue(
        pool,
        body.account_id,
        "fetch_media",
        {"chat_id": body.chat_id, "message_id": body.message_id},
    )
    return EnqueueResponse(job_id=job_id)


@router.get("/jobs")
async def list_jobs(
    pool: FromDishka[asyncpg.Pool],
    account_id: Annotated[int, Query()],
    status: Annotated[str | None, Query()] = None,
) -> list[JobView]:
    """List the jobs of one account, highest id first.

    Args:
        pool: Connection pool used to run the query.
        account_id: Required query parameter; only jobs of this account are
            returned.
        status: Optional query parameter; when given, only jobs whose
            ``status`` column equals it exactly are returned.

    Returns:
        One ``JobView`` per matching row (possibly an empty list).
    """
    if status is None:
        rows = await pool.fetch(
            "SELECT * FROM jobs WHERE account_id = $1 ORDER BY id DESC",
            account_id,
        )
    else:
        rows = await pool.fetch(
            "SELECT * FROM jobs WHERE account_id = $1 AND status = $2 ORDER BY id DESC",
            account_id,
            status,
        )
    return [_to_view(row) for row in rows]


@router.get("/jobs/{job_id}")
async def get_job(pool: FromDishka[asyncpg.Pool], job_id: int) -> JobView:
    """Return the job with the given id.

    Raises:
        HTTPException: 404 with detail ``"job not found"`` when no row has
            that id.
    """
    row = await pool.fetchrow("SELECT * FROM jobs WHERE id = $1", job_id)
    if row is None:
        raise HTTPException(status_code=404, detail="job not found")
    return _to_view(row)