feat: log presence, read receipts, group leaves, links, profiles, stories

This commit is contained in:
h
2026-05-29 22:13:59 +02:00
parent bfd16ab02c
commit bcb94b6474
31 changed files with 1298 additions and 19 deletions
@@ -0,0 +1,155 @@
"""phase8 profiles groups stories links
Revision ID: a1d4f7c2e9b8
Revises: f3a8d1c5b7e2
Create Date: 2026-05-29 22:10:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "a1d4f7c2e9b8"
down_revision: str | None = "f3a8d1c5b7e2"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_table(
"peers",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("peer_id", sa.BigInteger(), nullable=False),
sa.Column("first_name", sa.String(), nullable=True),
sa.Column("last_name", sa.String(), nullable=True),
sa.Column("username", sa.String(), nullable=True),
sa.Column("phone", sa.String(), nullable=True),
sa.Column("photo_unique_id", sa.String(), nullable=True),
sa.Column("is_deleted_account", sa.Boolean(), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("account_id", "peer_id"),
)
op.create_table(
"peer_history",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("peer_id", sa.BigInteger(), nullable=False),
sa.Column("observed_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("first_name", sa.String(), nullable=True),
sa.Column("last_name", sa.String(), nullable=True),
sa.Column("username", sa.String(), nullable=True),
sa.Column("phone", sa.String(), nullable=True),
sa.Column("photo_unique_id", sa.String(), nullable=True),
sa.Column("is_deleted_account", sa.Boolean(), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("account_id", "peer_id", "observed_at"),
)
op.create_table(
"avatars",
sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("owner_id", sa.BigInteger(), nullable=False),
sa.Column("owner_kind", sa.String(), nullable=False),
sa.Column("unique_id", sa.String(), nullable=False),
sa.Column("storage_key", sa.String(), nullable=True),
sa.Column("file_size", sa.BigInteger(), nullable=True),
sa.Column("mime", sa.String(), nullable=True),
sa.Column("downloaded", sa.Boolean(), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column(
"first_seen_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("account_id", "owner_id", "unique_id"),
)
op.create_table(
"chat_history",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("chat_id", sa.BigInteger(), nullable=False),
sa.Column("message_id", sa.BigInteger(), nullable=False),
sa.Column("event", sa.String(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column("photo_unique_id", sa.String(), nullable=True),
sa.Column("actor_id", sa.BigInteger(), nullable=True),
sa.Column("ts", sa.DateTime(timezone=True), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("account_id", "chat_id", "message_id"),
)
op.create_table(
"memberships",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("chat_id", sa.BigInteger(), nullable=False),
sa.Column("message_id", sa.BigInteger(), nullable=False),
sa.Column("user_id", sa.BigInteger(), nullable=False),
sa.Column("event", sa.String(), nullable=False),
sa.Column("actor_id", sa.BigInteger(), nullable=True),
sa.Column("ts", sa.DateTime(timezone=True), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("account_id", "chat_id", "message_id", "user_id"),
)
op.create_table(
"stories",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("peer_id", sa.BigInteger(), nullable=False),
sa.Column("story_id", sa.BigInteger(), nullable=False),
sa.Column("date", sa.DateTime(timezone=True), nullable=True),
sa.Column("expire_date", sa.DateTime(timezone=True), nullable=True),
sa.Column("caption", sa.String(), nullable=True),
sa.Column("media_kind", sa.String(), nullable=True),
sa.Column("storage_key", sa.String(), nullable=True),
sa.Column("file_size", sa.BigInteger(), nullable=True),
sa.Column("downloaded", sa.Boolean(), nullable=False),
sa.Column("views", sa.Integer(), nullable=True),
sa.Column("pinned", sa.Boolean(), nullable=False),
sa.Column("deleted", sa.Boolean(), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column(
"observed_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("account_id", "peer_id", "story_id"),
)
op.create_table(
"links",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("chat_id", sa.BigInteger(), nullable=False),
sa.Column("message_id", sa.BigInteger(), nullable=False),
sa.Column("position", sa.Integer(), nullable=False),
sa.Column("url", sa.String(), nullable=False),
sa.Column("kind", sa.String(), nullable=False),
sa.Column("web_url", sa.String(), nullable=True),
sa.Column("web_title", sa.String(), nullable=True),
sa.Column("web_description", sa.String(), nullable=True),
sa.Column("web_site_name", sa.String(), nullable=True),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("account_id", "chat_id", "message_id", "position"),
)
def downgrade() -> None:
op.drop_table("links")
op.drop_table("stories")
op.drop_table("memberships")
op.drop_table("chat_history")
op.drop_table("avatars")
op.drop_table("peer_history")
op.drop_table("peers")
@@ -0,0 +1,67 @@
"""presence hypertable
Revision ID: e5c1f0a72b9d
Revises: d4b9f2e6a1c7
Create Date: 2026-05-29 21:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "e5c1f0a72b9d"
down_revision: str | None = "d4b9f2e6a1c7"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_table(
"presence",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("peer_id", sa.BigInteger(), nullable=False),
sa.Column("ts", sa.DateTime(timezone=True), nullable=False),
sa.Column("status", sa.String(), nullable=False),
sa.Column("last_online_date", sa.DateTime(timezone=True), nullable=True),
sa.Column("next_offline_date", sa.DateTime(timezone=True), nullable=True),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("account_id", "peer_id", "ts"),
)
op.execute(
"SELECT create_hypertable('presence', by_range('ts', INTERVAL '1 week'))"
)
op.execute(
"ALTER TABLE presence SET ("
"timescaledb.enable_columnstore = true, "
"timescaledb.segmentby = 'peer_id', "
"timescaledb.orderby = 'ts DESC')"
)
op.execute("CALL add_columnstore_policy('presence', after => INTERVAL '30 days')")
op.execute(
"CREATE MATERIALIZED VIEW presence_hourly "
"WITH (timescaledb.continuous) AS "
"SELECT time_bucket('1 hour', ts) AS bucket, "
"account_id, peer_id, "
"count(*) AS samples, "
"count(*) FILTER (WHERE status = 'online') AS online_samples, "
"max(ts) AS last_seen "
"FROM presence "
"GROUP BY bucket, account_id, peer_id "
"WITH NO DATA"
)
op.execute(
"SELECT add_continuous_aggregate_policy('presence_hourly', "
"start_offset => INTERVAL '3 hours', "
"end_offset => INTERVAL '1 hour', "
"schedule_interval => INTERVAL '1 hour')"
)
def downgrade() -> None:
op.execute("DROP MATERIALIZED VIEW IF EXISTS presence_hourly")
op.drop_table("presence")
@@ -0,0 +1,49 @@
"""read_receipts hypertable
Revision ID: f3a8d1c5b7e2
Revises: e5c1f0a72b9d
Create Date: 2026-05-29 21:30:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
revision: str = "f3a8d1c5b7e2"
down_revision: str | None = "e5c1f0a72b9d"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_table(
"read_receipts",
sa.Column("account_id", sa.Integer(), nullable=False),
sa.Column("chat_id", sa.BigInteger(), nullable=False),
sa.Column("reader_id", sa.BigInteger(), nullable=False),
sa.Column("ts", sa.DateTime(timezone=True), nullable=False),
sa.Column("kind", sa.String(), nullable=False),
sa.Column("message_id", sa.BigInteger(), nullable=False),
sa.Column("raw", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("account_id", "chat_id", "message_id", "ts"),
)
op.execute(
"SELECT create_hypertable('read_receipts', by_range('ts', INTERVAL '1 week'))"
)
op.execute(
"ALTER TABLE read_receipts SET ("
"timescaledb.enable_columnstore = true, "
"timescaledb.segmentby = 'chat_id', "
"timescaledb.orderby = 'ts DESC')"
)
op.execute(
"CALL add_columnstore_policy('read_receipts', after => INTERVAL '30 days')"
)
def downgrade() -> None:
op.drop_table("read_receipts")
+9 -2
View File
@@ -1,3 +1,10 @@
from userbot.handlers import deletes, edits, messages, raw
from userbot.handlers import deletes, edits, messages, presence, raw, stories
handlers = messages.handlers + edits.handlers + deletes.handlers + raw.handlers
handlers = (
messages.handlers
+ edits.handlers
+ deletes.handlers
+ presence.handlers
+ stories.handlers
+ raw.handlers
)
+26
View File
@@ -0,0 +1,26 @@
from datetime import UTC, datetime
from pyrogram.types import User
from userbot import PyroClient
from userbot.modules.presence import repository
@PyroClient.on_user_status()
async def on_user_status(client: PyroClient, user: User) -> None:
ctx = client.capture
if ctx is None or user.status is None:
return
await repository.insert_status(
ctx.pool,
ctx.account_id,
user.id,
datetime.now(UTC),
user.status.name.lower(),
user.last_online_date,
user.next_offline_date,
str(user.raw),
)
handlers = on_user_status.handlers
+11 -1
View File
@@ -6,14 +6,24 @@ from userbot import PyroClient
from userbot.handlers.raw import (
contacts,
dialog_filters,
profiles,
reactions,
read_contents,
read_receipts,
transcribed,
)
RawHandler = Callable[..., Awaitable[None]]
_MODULES = (contacts, dialog_filters, reactions, read_contents, transcribed)
_MODULES = (
contacts,
dialog_filters,
profiles,
reactions,
read_contents,
read_receipts,
transcribed,
)
_REGISTRY: dict[type, RawHandler] = {
update_type: module.handle for module in _MODULES for update_type in module.HANDLES
}
@@ -0,0 +1,73 @@
from dataclasses import replace
from pyrogram import raw
from userbot import PyroClient
from userbot.modules.avatars import capture_avatar
from userbot.modules.profiles import active_username, snapshot_from_user
from userbot.modules.profiles.parse import ProfileFields
from userbot.modules.profiles.repository import get_peer, write_profile
HANDLES = (raw.types.UpdateUserName, raw.types.UpdateUser, raw.types.UpdateUserPhone)
async def _handle_user(
client: PyroClient, update: raw.types.UpdateUser, users: dict
) -> None:
ctx = client.capture
if ctx is None:
return
raw_user = users.get(update.user_id)
if raw_user is None:
return
fields, photo_file_id, photo_unique_id = snapshot_from_user(client, raw_user)
current = await get_peer(ctx.pool, ctx.account_id, update.user_id)
if current == fields:
return
await write_profile(ctx.pool, ctx.account_id, update.user_id, fields, str(raw_user))
changed_photo = current is None or current.photo_unique_id != photo_unique_id
if photo_file_id and photo_unique_id and changed_photo:
await capture_avatar(
client,
ctx,
update.user_id,
"peer",
photo_file_id,
photo_unique_id,
str(raw_user),
)
async def _merge_partial(
client: PyroClient, peer_id: int, overrides: dict, raw_str: str
) -> None:
ctx = client.capture
if ctx is None:
return
current = await get_peer(ctx.pool, ctx.account_id, peer_id)
merged = replace(current or ProfileFields(), **overrides)
if merged == current:
return
await write_profile(ctx.pool, ctx.account_id, peer_id, merged, raw_str)
async def handle(
client: PyroClient, update: raw.base.Update, users: dict, _chats: dict
) -> None:
if isinstance(update, raw.types.UpdateUser):
await _handle_user(client, update, users)
elif isinstance(update, raw.types.UpdateUserName):
await _merge_partial(
client,
update.user_id,
{
"first_name": update.first_name or None,
"last_name": update.last_name or None,
"username": active_username(update.usernames),
},
str(update),
)
elif isinstance(update, raw.types.UpdateUserPhone):
await _merge_partial(
client, update.user_id, {"phone": update.phone or None}, str(update)
)
@@ -1,7 +1,10 @@
from datetime import UTC, datetime
from pyrogram import raw, utils
from userbot import PyroClient
from userbot.modules.capture.chat_meta import meta_from_chat_id
from userbot.modules.read_receipts import repository as receipts
from userbot.modules.stt import repository
from userbot.modules.stt.gate import safe_transcribe
@@ -19,16 +22,30 @@ async def handle(
return
if isinstance(update, raw.types.UpdateChannelReadMessagesContents):
chat_id = utils.get_peer_id(raw.types.PeerChannel(channel_id=update.channel_id))
candidates = await repository.pending_voice_reads(
candidates = await repository.voice_reads(
ctx.pool, ctx.account_id, update.messages, chat_id=chat_id
)
elif isinstance(update, raw.types.UpdateReadMessagesContents):
candidates = await repository.pending_voice_reads(
candidates = await repository.voice_reads(
ctx.pool, ctx.account_id, update.messages
)
else:
return
for cand_chat_id, message_id in candidates:
self_id = client.me.id if client.me else None
for cand_chat_id, message_id, sender_id, untranscribed in candidates:
if sender_id is not None and sender_id == self_id:
await receipts.insert_listened(
ctx.pool,
ctx.account_id,
cand_chat_id,
cand_chat_id,
datetime.now(UTC),
message_id,
str(update),
)
continue
if not untranscribed:
continue
meta = meta_from_chat_id(cand_chat_id, ctx.contacts.ids)
if ctx.resolve(meta).stt:
await safe_transcribe(client, ctx, cand_chat_id, message_id)
@@ -0,0 +1,26 @@
from datetime import UTC, datetime
from pyrogram import raw, utils
from userbot import PyroClient
from userbot.modules.read_receipts import repository
HANDLES = (raw.types.UpdateReadHistoryOutbox,)
async def handle(
client: PyroClient, update: raw.base.Update, _users: dict, _chats: dict
) -> None:
ctx = client.capture
if ctx is None or not isinstance(update, raw.types.UpdateReadHistoryOutbox):
return
chat_id = utils.get_peer_id(update.peer)
await repository.insert_read(
ctx.pool,
ctx.account_id,
chat_id,
chat_id,
datetime.now(UTC),
update.max_id,
str(update),
)
+52
View File
@@ -0,0 +1,52 @@
from io import BytesIO
from pyrogram.types import Story
from userbot import PyroClient
from userbot.modules.stories import repository
def _peer_id(story: Story) -> int:
if story.chat is not None:
return story.chat.id or 0
if story.from_user is not None:
return story.from_user.id or 0
return 0
@PyroClient.on_story()
async def on_story(client: PyroClient, story: Story) -> None:
ctx = client.capture
if ctx is None:
return
media_kind = story.media.name.lower() if story.media else None
storage_key: str | None = None
file_size: int | None = None
downloaded = False
if not story.deleted and story.media is not None:
buffer = await client.download_media(story, in_memory=True)
if isinstance(buffer, BytesIO):
data = buffer.getvalue()
storage_key = ctx.storage.put(data)
file_size = len(data)
downloaded = True
await repository.upsert_story(
ctx.pool,
ctx.account_id,
_peer_id(story),
story.id,
story.date,
story.expire_date,
story.caption,
media_kind,
storage_key,
file_size,
story.views,
str(story.raw),
pinned=bool(story.pinned),
deleted=bool(story.deleted),
downloaded=downloaded,
)
handlers = on_story.handlers
@@ -0,0 +1,3 @@
from userbot.modules.avatars.downloader import capture_avatar
__all__ = ["capture_avatar"]
@@ -0,0 +1,40 @@
from io import BytesIO
from pyrogram import Client
from userbot.modules.avatars import repository
from userbot.modules.capture.context import CaptureContext
async def capture_avatar( # noqa: PLR0913
client: Client,
ctx: CaptureContext,
owner_id: int,
owner_kind: str,
file_id: str,
unique_id: str,
raw: str,
) -> None:
if await repository.avatar_exists(ctx.pool, ctx.account_id, owner_id, unique_id):
return
storage_key: str | None = None
file_size: int | None = None
downloaded = False
buffer = await client.download_media(file_id, in_memory=True)
if isinstance(buffer, BytesIO):
data = buffer.getvalue()
storage_key = ctx.storage.put(data)
file_size = len(data)
downloaded = True
await repository.insert_avatar(
ctx.pool,
ctx.account_id,
owner_id,
owner_kind,
unique_id,
storage_key,
file_size,
None,
raw,
downloaded=downloaded,
)
@@ -0,0 +1,48 @@
import asyncpg
_INSERT_AVATAR = """
INSERT INTO avatars
(account_id, owner_id, owner_kind, unique_id, storage_key, file_size, mime,
downloaded, raw)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb)
ON CONFLICT (account_id, owner_id, unique_id) DO NOTHING
"""
_EXISTS = """
SELECT 1 FROM avatars
WHERE account_id = $1 AND owner_id = $2 AND unique_id = $3
"""
async def avatar_exists(
pool: asyncpg.Pool, account_id: int, owner_id: int, unique_id: str
) -> bool:
row = await pool.fetchval(_EXISTS, account_id, owner_id, unique_id)
return row is not None
async def insert_avatar( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
owner_id: int,
owner_kind: str,
unique_id: str,
storage_key: str | None,
file_size: int | None,
mime: str | None,
raw: str,
*,
downloaded: bool,
) -> None:
await pool.execute(
_INSERT_AVATAR,
account_id,
owner_id,
owner_kind,
unique_id,
storage_key,
file_size,
mime,
downloaded,
raw,
)
@@ -3,6 +3,8 @@ from pyrogram.types import Message
from userbot.modules.capture import repository
from userbot.modules.capture.context import CaptureContext
from userbot.modules.links import extract_links
from userbot.modules.links import repository as links_repository
from userbot.modules.media import capture_media, self_destruct_ttl
from utils.policy.models import CaptureToggles
@@ -55,3 +57,13 @@ async def capture_message(
await repository.insert_callbacks(
ctx.pool, ctx.account_id, chat_id, message.id, buttons
)
if message.service is not None:
from userbot.modules.groups import capture_service # noqa: PLC0415
await capture_service(client, message, ctx)
links = extract_links(message)
if links:
await links_repository.insert_links(
ctx.pool, ctx.account_id, chat_id, message.id, links, str(message)
)
@@ -0,0 +1,3 @@
from userbot.modules.groups.service import capture_service
__all__ = ["capture_service"]
@@ -0,0 +1,68 @@
from datetime import datetime
import asyncpg
_INSERT_CHAT_HISTORY = """
INSERT INTO chat_history
(account_id, chat_id, message_id, event, title, photo_unique_id, actor_id,
ts, raw)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb)
ON CONFLICT (account_id, chat_id, message_id) DO NOTHING
"""
_INSERT_MEMBERSHIP = """
INSERT INTO memberships
(account_id, chat_id, message_id, user_id, event, actor_id, ts, raw)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
ON CONFLICT (account_id, chat_id, message_id, user_id) DO NOTHING
"""
async def insert_chat_history( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
chat_id: int,
message_id: int,
event: str,
title: str | None,
photo_unique_id: str | None,
actor_id: int | None,
ts: datetime,
raw: str,
) -> None:
await pool.execute(
_INSERT_CHAT_HISTORY,
account_id,
chat_id,
message_id,
event,
title,
photo_unique_id,
actor_id,
ts,
raw,
)
async def insert_membership( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
chat_id: int,
message_id: int,
user_id: int,
event: str,
actor_id: int | None,
ts: datetime,
raw: str,
) -> None:
await pool.execute(
_INSERT_MEMBERSHIP,
account_id,
chat_id,
message_id,
user_id,
event,
actor_id,
ts,
raw,
)
@@ -0,0 +1,91 @@
from pyrogram import Client, enums
from pyrogram.types import Message
from userbot.modules.avatars import capture_avatar
from userbot.modules.capture.context import CaptureContext
from userbot.modules.capture.message import sender_id
from userbot.modules.groups import repository
async def capture_service(
client: Client, message: Message, ctx: CaptureContext
) -> None:
if message.service is None or message.chat is None or message.date is None:
return
chat_id = message.chat.id or 0
actor = sender_id(message)
ts = message.date
service = message.service
if service is enums.MessageServiceType.NEW_CHAT_TITLE:
await repository.insert_chat_history(
ctx.pool,
ctx.account_id,
chat_id,
message.id,
"title",
message.new_chat_title,
None,
actor,
ts,
str(message),
)
elif service is enums.MessageServiceType.NEW_CHAT_PHOTO:
photo = message.new_chat_photo
unique_id = photo.file_unique_id if photo else None
await repository.insert_chat_history(
ctx.pool,
ctx.account_id,
chat_id,
message.id,
"photo",
None,
unique_id,
actor,
ts,
str(message),
)
if photo is not None and unique_id is not None:
await capture_avatar(
client, ctx, chat_id, "chat", photo.file_id, unique_id, str(message)
)
elif service is enums.MessageServiceType.DELETE_CHAT_PHOTO:
await repository.insert_chat_history(
ctx.pool,
ctx.account_id,
chat_id,
message.id,
"photo_deleted",
None,
None,
actor,
ts,
str(message),
)
elif service is enums.MessageServiceType.NEW_CHAT_MEMBERS:
for member in message.new_chat_members or []:
await repository.insert_membership(
ctx.pool,
ctx.account_id,
chat_id,
message.id,
member.id,
"join",
actor,
ts,
str(message),
)
elif service is enums.MessageServiceType.LEFT_CHAT_MEMBER:
member = message.left_chat_member
if member is not None:
await repository.insert_membership(
ctx.pool,
ctx.account_id,
chat_id,
message.id,
member.id,
"leave",
actor,
ts,
str(message),
)
@@ -0,0 +1,3 @@
from userbot.modules.links.extract import LinkRow, extract_links
__all__ = ["LinkRow", "extract_links"]
@@ -0,0 +1,54 @@
from dataclasses import dataclass
from pyrogram.enums import MessageEntityType
from pyrogram.parser.utils import add_surrogates, remove_surrogates
from pyrogram.types import Message
@dataclass
class LinkRow:
url: str
kind: str
web_url: str | None = None
web_title: str | None = None
web_description: str | None = None
web_site_name: str | None = None
def _entity_urls(text: str | None, entities: list | None) -> list[tuple[str, str]]:
if not text or not entities:
return []
surrogated = add_surrogates(str(text))
found: list[tuple[str, str]] = []
for entity in entities:
if entity.type is MessageEntityType.TEXT_LINK and entity.url:
found.append(("text_link", entity.url))
elif entity.type is MessageEntityType.URL:
sliced = surrogated[entity.offset : entity.offset + entity.length]
found.append(("url", remove_surrogates(sliced)))
return found
def extract_links(message: Message) -> list[LinkRow]:
by_url: dict[str, LinkRow] = {}
sources = (
(message.text, message.entities),
(message.caption, message.caption_entities),
)
for text, entities in sources:
for kind, url in _entity_urls(text, entities):
if url not in by_url:
by_url[url] = LinkRow(url=url, kind=kind)
web_page = message.web_page
if web_page is not None and web_page.url:
row = by_url.get(web_page.url)
if row is None:
row = LinkRow(url=web_page.url, kind="web_page")
by_url[web_page.url] = row
row.web_url = web_page.url
row.web_title = web_page.title
row.web_description = web_page.description
row.web_site_name = web_page.site_name
return list(by_url.values())
@@ -0,0 +1,47 @@
import asyncpg
from userbot.modules.links.extract import LinkRow
_INSERT = """
INSERT INTO links
(account_id, chat_id, message_id, position, url, kind, web_url, web_title,
web_description, web_site_name, raw)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::jsonb)
"""
async def insert_links( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
chat_id: int,
message_id: int,
rows: list[LinkRow],
raw: str,
) -> None:
async with pool.acquire() as conn, conn.transaction():
await conn.execute(
"DELETE FROM links "
"WHERE account_id = $1 AND chat_id = $2 AND message_id = $3",
account_id,
chat_id,
message_id,
)
await conn.executemany(
_INSERT,
[
(
account_id,
chat_id,
message_id,
position,
row.url,
row.kind,
row.web_url,
row.web_title,
row.web_description,
row.web_site_name,
raw,
)
for position, row in enumerate(rows)
],
)
@@ -0,0 +1,32 @@
from datetime import datetime
import asyncpg
_INSERT_STATUS = """
INSERT INTO presence
(account_id, peer_id, ts, status, last_online_date, next_offline_date, raw)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (account_id, peer_id, ts) DO NOTHING
"""
async def insert_status( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
peer_id: int,
ts: datetime,
status: str,
last_online_date: datetime | None,
next_offline_date: datetime | None,
raw: str,
) -> None:
await pool.execute(
_INSERT_STATUS,
account_id,
peer_id,
ts,
status,
last_online_date,
next_offline_date,
raw,
)
@@ -0,0 +1,7 @@
from userbot.modules.profiles.parse import (
ProfileFields,
active_username,
snapshot_from_user,
)
__all__ = ["ProfileFields", "active_username", "snapshot_from_user"]
@@ -0,0 +1,39 @@
from dataclasses import dataclass
from pyrogram import Client, raw
from pyrogram.types import User
@dataclass(frozen=True)
class ProfileFields:
first_name: str | None = None
last_name: str | None = None
username: str | None = None
phone: str | None = None
photo_unique_id: str | None = None
is_deleted_account: bool = False
def active_username(usernames: list[raw.types.Username]) -> str | None:
for username in usernames:
if getattr(username, "active", False):
return username.username
return usernames[0].username if usernames else None
def snapshot_from_user(
client: Client, raw_user: raw.types.User
) -> tuple[ProfileFields, str | None, str | None]:
user = User._parse(client, raw_user) # noqa: SLF001
photo = user.photo if user else None
photo_unique_id = photo.big_photo_unique_id if photo else None
photo_file_id = photo.big_file_id if photo else None
fields = ProfileFields(
first_name=user.first_name if user else None,
last_name=user.last_name if user else None,
username=user.username if user else None,
phone=user.phone_number if user else None,
photo_unique_id=photo_unique_id,
is_deleted_account=bool(getattr(raw_user, "deleted", False)),
)
return fields, photo_file_id, photo_unique_id
@@ -0,0 +1,66 @@
import asyncpg
from userbot.modules.profiles.parse import ProfileFields
_GET_PEER = """
SELECT first_name, last_name, username, phone, photo_unique_id, is_deleted_account
FROM peers
WHERE account_id = $1 AND peer_id = $2
"""
_UPSERT_PEER = """
INSERT INTO peers
(account_id, peer_id, first_name, last_name, username, phone,
photo_unique_id, is_deleted_account, raw, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb, now())
ON CONFLICT (account_id, peer_id) DO UPDATE SET
first_name = EXCLUDED.first_name,
last_name = EXCLUDED.last_name,
username = EXCLUDED.username,
phone = EXCLUDED.phone,
photo_unique_id = EXCLUDED.photo_unique_id,
is_deleted_account = EXCLUDED.is_deleted_account,
raw = EXCLUDED.raw,
updated_at = now()
"""
_INSERT_HISTORY = """
INSERT INTO peer_history
(account_id, peer_id, observed_at, first_name, last_name, username, phone,
photo_unique_id, is_deleted_account, raw)
VALUES ($1, $2, now(), $3, $4, $5, $6, $7, $8, $9::jsonb)
ON CONFLICT DO NOTHING
"""
async def get_peer(
pool: asyncpg.Pool, account_id: int, peer_id: int
) -> ProfileFields | None:
row = await pool.fetchrow(_GET_PEER, account_id, peer_id)
if row is None:
return None
return ProfileFields(
first_name=row["first_name"],
last_name=row["last_name"],
username=row["username"],
phone=row["phone"],
photo_unique_id=row["photo_unique_id"],
is_deleted_account=row["is_deleted_account"],
)
async def write_profile(
pool: asyncpg.Pool, account_id: int, peer_id: int, fields: ProfileFields, raw: str
) -> None:
args = (
fields.first_name,
fields.last_name,
fields.username,
fields.phone,
fields.photo_unique_id,
fields.is_deleted_account,
raw,
)
async with pool.acquire() as conn, conn.transaction():
await conn.execute(_UPSERT_PEER, account_id, peer_id, *args)
await conn.execute(_INSERT_HISTORY, account_id, peer_id, *args)
@@ -0,0 +1,36 @@
from datetime import datetime
import asyncpg
_INSERT = """
INSERT INTO read_receipts
(account_id, chat_id, message_id, ts, reader_id, kind, raw)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb)
ON CONFLICT (account_id, chat_id, message_id, ts) DO NOTHING
"""
async def insert_read( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
chat_id: int,
reader_id: int,
ts: datetime,
max_id: int,
raw: str,
) -> None:
await pool.execute(_INSERT, account_id, chat_id, max_id, ts, reader_id, "read", raw)
async def insert_listened( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
chat_id: int,
reader_id: int,
ts: datetime,
message_id: int,
raw: str,
) -> None:
await pool.execute(
_INSERT, account_id, chat_id, message_id, ts, reader_id, "listened", raw
)
@@ -0,0 +1,59 @@
from datetime import datetime
import asyncpg
_UPSERT_STORY = """
INSERT INTO stories
(account_id, peer_id, story_id, date, expire_date, caption, media_kind,
storage_key, file_size, downloaded, views, pinned, deleted, raw, observed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::jsonb, now())
ON CONFLICT (account_id, peer_id, story_id) DO UPDATE SET
date = EXCLUDED.date,
expire_date = EXCLUDED.expire_date,
caption = EXCLUDED.caption,
media_kind = EXCLUDED.media_kind,
storage_key = COALESCE(EXCLUDED.storage_key, stories.storage_key),
file_size = COALESCE(EXCLUDED.file_size, stories.file_size),
downloaded = stories.downloaded OR EXCLUDED.downloaded,
views = EXCLUDED.views,
pinned = EXCLUDED.pinned,
deleted = EXCLUDED.deleted,
raw = EXCLUDED.raw
"""
async def upsert_story( # noqa: PLR0913
pool: asyncpg.Pool,
account_id: int,
peer_id: int,
story_id: int,
date: datetime | None,
expire_date: datetime | None,
caption: str | None,
media_kind: str | None,
storage_key: str | None,
file_size: int | None,
views: int | None,
raw: str,
*,
pinned: bool,
deleted: bool,
downloaded: bool,
) -> None:
await pool.execute(
_UPSERT_STORY,
account_id,
peer_id,
story_id,
date,
expire_date,
caption,
media_kind,
storage_key,
file_size,
downloaded,
views,
pinned,
deleted,
raw,
)
+26 -13
View File
@@ -9,16 +9,22 @@ UPDATE media SET extracted_text = $4
WHERE account_id = $1 AND chat_id = $2 AND message_id = $3
"""
_PENDING_BOX = """
SELECT chat_id, message_id FROM media
WHERE account_id = $1 AND message_id = ANY($2::bigint[])
AND chat_id > $3 AND kind = ANY($4::text[]) AND extracted_text IS NULL
_VOICE_READS_BOX = """
SELECT md.chat_id, md.message_id, m.sender_id,
md.extracted_text IS NULL AS untranscribed
FROM media md
LEFT JOIN messages m USING (account_id, chat_id, message_id)
WHERE md.account_id = $1 AND md.message_id = ANY($2::bigint[])
AND md.chat_id > $3 AND md.kind = ANY($4::text[])
"""
_PENDING_CHANNEL = """
SELECT chat_id, message_id FROM media
WHERE account_id = $1 AND message_id = ANY($2::bigint[])
AND chat_id = $3 AND kind = ANY($4::text[]) AND extracted_text IS NULL
_VOICE_READS_CHANNEL = """
SELECT md.chat_id, md.message_id, m.sender_id,
md.extracted_text IS NULL AS untranscribed
FROM media md
LEFT JOIN messages m USING (account_id, chat_id, message_id)
WHERE md.account_id = $1 AND md.message_id = ANY($2::bigint[])
AND md.chat_id = $3 AND md.kind = ANY($4::text[])
"""
@@ -28,18 +34,25 @@ async def set_extracted_text(
await pool.execute(_SET_EXTRACTED_TEXT, account_id, chat_id, message_id, text)
async def pending_voice_reads(
async def voice_reads(
pool: asyncpg.Pool,
account_id: int,
message_ids: list[int],
chat_id: int | None = None,
) -> list[tuple[int, int]]:
) -> list[tuple[int, int, int | None, bool]]:
if chat_id is None:
rows = await pool.fetch(
_PENDING_BOX, account_id, message_ids, CHANNEL_ID_THRESHOLD, _VOICE_KINDS
_VOICE_READS_BOX,
account_id,
message_ids,
CHANNEL_ID_THRESHOLD,
_VOICE_KINDS,
)
else:
rows = await pool.fetch(
_PENDING_CHANNEL, account_id, message_ids, chat_id, _VOICE_KINDS
_VOICE_READS_CHANNEL, account_id, message_ids, chat_id, _VOICE_KINDS
)
return [(row["chat_id"], row["message_id"]) for row in rows]
return [
(row["chat_id"], row["message_id"], row["sender_id"], row["untranscribed"])
for row in rows
]
+176
View File
@@ -234,3 +234,179 @@ class CapturePolicy(SQLModel, table=True):
onupdate=func.now(),
)
)
class Presence(SQLModel, table=True):
__tablename__ = "presence"
account_id: int = Field(primary_key=True)
peer_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
ts: datetime = Field(sa_column=Column(DateTime(timezone=True), primary_key=True))
status: str
last_online_date: datetime | None = Field(
default=None, sa_column=Column(DateTime(timezone=True))
)
next_offline_date: datetime | None = Field(
default=None, sa_column=Column(DateTime(timezone=True))
)
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
class ReadReceipt(SQLModel, table=True):
__tablename__ = "read_receipts"
account_id: int = Field(primary_key=True)
chat_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
message_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
ts: datetime = Field(sa_column=Column(DateTime(timezone=True), primary_key=True))
reader_id: int = Field(sa_column=Column(BigInteger, nullable=False))
kind: str
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
class Peer(SQLModel, table=True):
__tablename__ = "peers"
account_id: int = Field(primary_key=True)
peer_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
first_name: str | None = None
last_name: str | None = None
username: str | None = None
phone: str | None = None
photo_unique_id: str | None = None
is_deleted_account: bool = False
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
updated_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
)
class PeerHistory(SQLModel, table=True):
__tablename__ = "peer_history"
account_id: int = Field(primary_key=True)
peer_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
observed_at: datetime = Field(
sa_column=Column(DateTime(timezone=True), primary_key=True)
)
first_name: str | None = None
last_name: str | None = None
username: str | None = None
phone: str | None = None
photo_unique_id: str | None = None
is_deleted_account: bool = False
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
class Avatar(SQLModel, table=True):
__tablename__ = "avatars"
id: int | None = Field(default=None, primary_key=True)
account_id: int
owner_id: int = Field(sa_column=Column(BigInteger, nullable=False))
owner_kind: str
unique_id: str
storage_key: str | None = None
file_size: int | None = Field(default=None, sa_column=Column(BigInteger))
mime: str | None = None
downloaded: bool = False
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
first_seen_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
)
class ChatHistory(SQLModel, table=True):
__tablename__ = "chat_history"
account_id: int = Field(primary_key=True)
chat_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
message_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
event: str
title: str | None = None
photo_unique_id: str | None = None
actor_id: int | None = Field(default=None, sa_column=Column(BigInteger))
ts: datetime = Field(sa_column=Column(DateTime(timezone=True), nullable=False))
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
class Membership(SQLModel, table=True):
__tablename__ = "memberships"
account_id: int = Field(primary_key=True)
chat_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
message_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
user_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
event: str
actor_id: int | None = Field(default=None, sa_column=Column(BigInteger))
ts: datetime = Field(sa_column=Column(DateTime(timezone=True), nullable=False))
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
class Story(SQLModel, table=True):
__tablename__ = "stories"
account_id: int = Field(primary_key=True)
peer_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
story_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
date: datetime | None = Field(
default=None, sa_column=Column(DateTime(timezone=True))
)
expire_date: datetime | None = Field(
default=None, sa_column=Column(DateTime(timezone=True))
)
caption: str | None = None
media_kind: str | None = None
storage_key: str | None = None
file_size: int | None = Field(default=None, sa_column=Column(BigInteger))
downloaded: bool = False
views: int | None = None
pinned: bool = False
deleted: bool = False
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)
observed_at: datetime = Field(
sa_column=Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
)
class Link(SQLModel, table=True):
__tablename__ = "links"
account_id: int = Field(primary_key=True)
chat_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
message_id: int = Field(sa_column=Column(BigInteger, primary_key=True))
position: int = Field(primary_key=True)
url: str
kind: str
web_url: str | None = None
web_title: str | None = None
web_description: str | None = None
web_site_name: str | None = None
raw: dict[str, Any] = Field(
default_factory=dict, sa_column=Column(JSONB, nullable=False)
)