feat: init

This commit is contained in:
h
2026-04-16 01:16:54 +02:00
commit 14bf1047ee
51 changed files with 2227 additions and 0 deletions

35
src/bot/__init__.py Normal file
View File

@@ -0,0 +1,35 @@
import asyncio
import contextlib
from rich import print
async def runner():
from utils.db import init_db
from . import handlers
from .common import bot, dp
await init_db()
dp.include_routers(handlers.router)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot)
def plugins():
from rich import traceback
traceback.install(show_locals=True)
def main():
plugins()
print("Starting...")
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(runner())
print("[red]Stopped.[/]")

4
src/bot/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from . import main
if __name__ == "__main__":
main()

14
src/bot/common.py Normal file
View File

@@ -0,0 +1,14 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from utils import env
bot = Bot(
token=env.bot.token.get_secret_value(),
default=DefaultBotProperties(parse_mode="HTML"),
)
storage = MemoryStorage()
dp = Dispatcher(storage=storage)
__all__ = ["bot", "dp"]

View File

@@ -0,0 +1,14 @@
from .approval import ApprovalAction, ApprovalCallback
from .menu import MenuAction, MenuCallback
from .settings import SettingsAction, SettingsCallback
from .states import AddChannelStates
__all__ = [
"AddChannelStates",
"ApprovalAction",
"ApprovalCallback",
"MenuAction",
"MenuCallback",
"SettingsAction",
"SettingsCallback",
]

View File

@@ -0,0 +1,13 @@
from enum import StrEnum
from aiogram.filters.callback_data import CallbackData
class ApprovalAction(StrEnum):
ACCEPT = "accept"
REJECT = "reject"
class ApprovalCallback(CallbackData, prefix="approval"):
action: ApprovalAction
post_id: str

16
src/bot/factories/menu.py Normal file
View File

@@ -0,0 +1,16 @@
from enum import StrEnum
from aiogram.filters.callback_data import CallbackData
class MenuAction(StrEnum):
PAGE = "page"
SELECT = "select"
SETTINGS = "settings"
ADD = "add"
class MenuCallback(CallbackData, prefix="menu"):
action: MenuAction
channel_id: int = 0
page: int = 0

View File

@@ -0,0 +1,21 @@
from enum import StrEnum
from aiogram.filters.callback_data import CallbackData
class SettingsAction(StrEnum):
MAIN = "main"
MEMBERS = "members"
PAGE = "page"
MODE_TOGGLE = "mode_toggle"
KICK = "kick"
INVITE_MODE = "invite_mode"
INVITE_CREATE = "invite_create"
class SettingsCallback(CallbackData, prefix="settings"):
action: SettingsAction
channel_id: int
user_id: int = 0
page: int = 0
mode: str = "instant"

View File

@@ -0,0 +1,5 @@
from aiogram.fsm.state import State, StatesGroup
class AddChannelStates(StatesGroup):
waiting_channel = State()

View File

@@ -0,0 +1,3 @@
from .channel_owner import ChannelOwnerFilter
__all__ = ["ChannelOwnerFilter"]

View File

@@ -0,0 +1,30 @@
from typing import Any, Protocol
from aiogram.filters import Filter
from aiogram.types import CallbackQuery
from utils.db.models import Channel
class HasChannelId(Protocol):
channel_id: int
class ChannelOwnerFilter(Filter):
async def __call__(
self, callback: CallbackQuery, callback_data: HasChannelId | None = None
) -> bool | dict[str, Any]:
user = callback.from_user
if user is None or callback_data is None:
return False
channel = await Channel.get_by_id(callback_data.channel_id)
if channel is None:
await callback.answer("Канал не найден.", show_alert=True)
return False
if channel.owner_id != user.id:
await callback.answer("Нет доступа.", show_alert=True)
return False
return {"channel": channel}

View File

@@ -0,0 +1,7 @@
from aiogram import Router
from . import common, menu, posting, settings
router = Router()
router.include_routers(common.router, menu.router, settings.router, posting.router)

View File

@@ -0,0 +1,7 @@
from aiogram import Router
from . import initialize, start
router = Router()
router.include_routers(initialize.router, start.router)

View File

@@ -0,0 +1,17 @@
from aiogram import Bot, Router, types
from rich import print
router = Router()
@router.startup()
async def startup(bot: Bot):
await bot.set_my_commands(
[types.BotCommand(command="/start", description="Запустить бота")]
)
print(f"[green]Started as[/] @{(await bot.me()).username}")
@router.shutdown()
async def shutdown():
print("Shutting down bot...")

View File

@@ -0,0 +1,80 @@
from aiogram import Router
from aiogram.filters import CommandObject, CommandStart
from aiogram.types import Message
from aiogram.utils.deep_linking import decode_payload
from bot.handlers.menu.helpers import get_menu_view
from utils.db.models import Channel, Invite, User
router = Router()
def get_display_name(message: Message) -> str:
if message.from_user is None:
return ""
if message.from_user.username:
return f"@{message.from_user.username}"
return message.from_user.full_name
def decode_start_payload(command: CommandObject) -> str:
if not command.args:
return ""
try:
return decode_payload(command.args)
except Exception:
return command.args
@router.message(CommandStart(deep_link=True))
async def start_deep_link(message: Message, command: CommandObject):
if message.from_user is None:
return
payload = decode_start_payload(command)
if not payload.startswith("inv_"):
await start_cmd(message)
return
invite = await Invite.get_by_code(payload[4:])
if invite is None:
await message.answer("Ссылка недействительна или уже использована.")
return
channel = await Channel.get_by_id(invite.channel_id)
if channel is None:
await message.answer("Канал не найден.")
return
user_id = message.from_user.id
if channel.owner_id == user_id:
await message.answer("Вы владелец этого канала.")
return
if channel.get_member(user_id) is not None:
await message.answer("Вы уже участник этого канала.")
return
await channel.add_member(user_id, get_display_name(message), invite.mode)
invite.used = True
await invite.save()
await User.get_or_create(user_id)
mode_label = "сразу" if invite.mode == "instant" else "после проверки"
await message.answer(
f"Вы добавлены в <b>{channel.title}</b> (режим: {mode_label})."
)
@router.message(CommandStart())
async def start_cmd(message: Message):
if message.from_user is None:
return
text, reply_markup = await get_menu_view(message.from_user.id)
if text == "У вас пока нет каналов.":
text = (
"Привет! Я бот для предложки постов.\n\n"
"Добавьте канал или перейдите по инвайт-ссылке."
)
await message.answer(text, reply_markup=reply_markup)

View File

@@ -0,0 +1,7 @@
from aiogram import Router
from . import add_channel, callbacks
router = Router()
router.include_routers(callbacks.router, add_channel.router)

View File

@@ -0,0 +1,76 @@
from aiogram import Bot, Router
from aiogram.fsm.context import FSMContext
from aiogram.types import Message
from bot.factories import AddChannelStates
from bot.handlers.menu.helpers import get_menu_view
from utils.db.models import Channel
router = Router()
def parse_chat_reference(raw: str) -> int | str | None:
if raw.lstrip("-").isdigit():
return int(raw)
if raw.startswith("@"):
return raw
return None
def has_admin(admins: list, user_id: int) -> bool:
return any(admin.user.id == user_id for admin in admins)
def get_channel_owner_id(admins: list) -> int | None:
for admin in admins:
if admin.status == "creator":
return admin.user.id
return None
@router.message(AddChannelStates.waiting_channel)
async def add_channel(message: Message, state: FSMContext, bot: Bot):
if message.from_user is None or message.text is None:
return
chat_id = parse_chat_reference(message.text.strip())
if chat_id is None:
await message.answer("Отправьте @username или числовой ID канала.")
return
try:
admins = await bot.get_chat_administrators(chat_id)
except Exception:
await message.answer(
"Не удалось получить информацию о канале.\n"
"Убедитесь, что бот добавлен как админ."
)
return
bot_info = await bot.me()
if not has_admin(admins, bot_info.id):
await message.answer("Бот не является администратором этого канала.")
return
owner_id = get_channel_owner_id(admins)
if owner_id != message.from_user.id:
await message.answer("Вы не являетесь владельцем этого канала.")
return
chat = await bot.get_chat(chat_id)
existing = await Channel.get_by_id(chat.id)
if existing is not None:
await message.answer("Этот канал уже добавлен.")
await state.clear()
return
channel = Channel(
id=chat.id, owner_id=message.from_user.id, title=chat.title or str(chat.id)
)
await channel.insert()
await state.clear()
_, reply_markup = await get_menu_view(message.from_user.id)
await message.answer(
f"Канал <b>{channel.title}</b> добавлен!", reply_markup=reply_markup
)

View File

@@ -0,0 +1,62 @@
from aiogram import F, Router
from aiogram.fsm.context import FSMContext
from aiogram.types import CallbackQuery, Message
from bot.factories import AddChannelStates, MenuAction, MenuCallback
from bot.filters import ChannelOwnerFilter
from bot.handlers.menu.helpers import edit_menu
from bot.keyboards import channel_settings_kb
from utils.db.models import Channel, User
router = Router()
@router.callback_query(MenuCallback.filter(F.action == MenuAction.PAGE))
async def show_menu_page(callback: CallbackQuery, callback_data: MenuCallback):
if callback.from_user is None or not isinstance(callback.message, Message):
return
await edit_menu(callback.message, callback.from_user.id, callback_data.page)
await callback.answer()
@router.callback_query(MenuCallback.filter(F.action == MenuAction.SELECT))
async def toggle_active_channel(callback: CallbackQuery, callback_data: MenuCallback):
if callback.from_user is None or not isinstance(callback.message, Message):
return
user = await User.get_or_create(callback.from_user.id)
if user.active_channel_id == callback_data.channel_id:
user.active_channel_id = None
else:
user.active_channel_id = callback_data.channel_id
await user.save()
await edit_menu(callback.message, callback.from_user.id, callback_data.page)
await callback.answer()
@router.callback_query(
MenuCallback.filter(F.action == MenuAction.SETTINGS), ChannelOwnerFilter()
)
async def open_channel_settings(callback: CallbackQuery, channel: Channel):
if not isinstance(callback.message, Message):
return
await callback.message.edit_text(
f"⚙️ <b>{channel.title}</b>", reply_markup=channel_settings_kb(channel)
)
await callback.answer()
@router.callback_query(MenuCallback.filter(F.action == MenuAction.ADD))
async def request_channel_reference(callback: CallbackQuery, state: FSMContext):
if not isinstance(callback.message, Message):
return
await state.set_state(AddChannelStates.waiting_channel)
await callback.message.edit_text(
"Отправьте @username или числовой ID канала.\n"
"Бот должен быть добавлен как админ с правом отправки сообщений."
)
await callback.answer()

View File

@@ -0,0 +1,19 @@
from aiogram.types import InlineKeyboardMarkup, Message
from bot.keyboards import menu_kb
from utils.db.models import Channel, User
async def get_menu_view(
user_id: int, page: int = 0
) -> tuple[str, InlineKeyboardMarkup]:
user = await User.get_or_create(user_id)
channels = await Channel.get_user_channels(user_id)
text = "Ваши каналы:" if channels else "У вас пока нет каналов."
reply_markup = menu_kb(channels, user_id, user.active_channel_id, page)
return text, reply_markup
async def edit_menu(message: Message, user_id: int, page: int = 0) -> None:
text, reply_markup = await get_menu_view(user_id, page)
await message.edit_text(text, reply_markup=reply_markup)

View File

@@ -0,0 +1,7 @@
from aiogram import Router
from . import approval, messages
router = Router()
router.include_routers(approval.router, messages.router)

View File

@@ -0,0 +1,85 @@
from aiogram import Bot, F, Router
from aiogram.types import CallbackQuery, Message
from bson import ObjectId
from bson.errors import InvalidId
from bot.factories import ApprovalAction, ApprovalCallback
from utils.db.models import Channel, PendingPost
router = Router()
async def resolve_pending_context(
callback: CallbackQuery, post_id: str
) -> tuple[PendingPost, Channel] | None:
if callback.from_user is None:
return None
try:
pending = await PendingPost.get(ObjectId(post_id))
except InvalidId:
pending = None
if pending is None:
await callback.answer("Пост не найден.", show_alert=True)
return None
if pending.status != "pending":
await callback.answer("Уже обработано.", show_alert=True)
return None
channel = await Channel.get_by_id(pending.channel_id)
if channel is None or channel.owner_id != callback.from_user.id:
await callback.answer("Нет доступа.", show_alert=True)
return None
return pending, channel
async def update_approval_message(callback: CallbackQuery, status_text: str) -> None:
if not isinstance(callback.message, Message):
return
original_text = callback.message.text or ""
await callback.message.edit_text(f"{original_text}\n\n{status_text}")
@router.callback_query(ApprovalCallback.filter(F.action == ApprovalAction.ACCEPT))
async def accept_post(
callback: CallbackQuery, callback_data: ApprovalCallback, bot: Bot
):
context = await resolve_pending_context(callback, callback_data.post_id)
if context is None:
return
pending, channel = context
if pending.is_forwarded:
await bot.forward_message(
chat_id=channel.id,
from_chat_id=pending.source_chat_id,
message_id=pending.source_message_id,
)
else:
await bot.copy_message(
chat_id=channel.id,
from_chat_id=pending.source_chat_id,
message_id=pending.source_message_id,
)
pending.status = "accepted"
await pending.save()
await update_approval_message(callback, "✅ <b>Принято</b>")
await callback.answer()
@router.callback_query(ApprovalCallback.filter(F.action == ApprovalAction.REJECT))
async def reject_post(callback: CallbackQuery, callback_data: ApprovalCallback):
context = await resolve_pending_context(callback, callback_data.post_id)
if context is None:
return
pending, _ = context
pending.status = "rejected"
await pending.save()
await update_approval_message(callback, "❌ <b>Отменено</b>")
await callback.answer()

View File

@@ -0,0 +1,97 @@
from contextlib import suppress
from aiogram import Bot, F, Router
from aiogram.exceptions import TelegramBadRequest
from aiogram.types import Message, ReactionTypeEmoji
from bot.keyboards import approval_kb
from utils.db.models import Channel, PendingPost, User
router = Router()
async def clear_active_channel(user: User) -> None:
user.active_channel_id = None
await user.save()
def get_contributor_name(message: Message) -> str:
if message.from_user is None:
return ""
if message.from_user.username:
return f"@{message.from_user.username}"
return message.from_user.full_name
async def react(message: Message, emoji: str) -> None:
with suppress(TelegramBadRequest):
await message.react([ReactionTypeEmoji(emoji=emoji)])
async def send_instant_post(
message: Message, channel: Channel, is_forwarded: bool
) -> None:
if is_forwarded:
await message.forward(chat_id=channel.id)
else:
await message.copy_to(chat_id=channel.id)
await react(message, "👍")
async def send_post_for_approval(
message: Message, bot: Bot, channel: Channel, is_forwarded: bool
) -> None:
if message.from_user is None:
return
pending = PendingPost(
source_chat_id=message.chat.id,
source_message_id=message.message_id,
channel_id=channel.id,
contributor_id=message.from_user.id,
is_forwarded=is_forwarded,
)
await pending.insert()
if is_forwarded:
await message.forward(chat_id=channel.owner_id)
else:
await message.copy_to(chat_id=channel.owner_id)
await bot.send_message(
chat_id=channel.owner_id,
text=f"📝 Предложка в <b>{channel.title}</b>",
reply_markup=approval_kb(str(pending.id), get_contributor_name(message)),
)
await react(message, "👀")
@router.message(~F.text.startswith("/"))
async def handle_post(message: Message, bot: Bot):
if message.from_user is None:
return
user = await User.get_by_id(message.from_user.id)
if user is None or user.active_channel_id is None:
return
channel = await Channel.get_by_id(user.active_channel_id)
if channel is None:
await clear_active_channel(user)
return
if channel.owner_id == message.from_user.id:
mode = "instant"
else:
member = channel.get_member(message.from_user.id)
if member is None:
await clear_active_channel(user)
return
mode = member.mode
is_forwarded = message.forward_origin is not None
if mode == "instant":
await send_instant_post(message, channel, is_forwarded)
return
await send_post_for_approval(message, bot, channel, is_forwarded)

View File

@@ -0,0 +1,7 @@
from aiogram import Router
from . import admin
router = Router()
router.include_router(admin.router)

View File

@@ -0,0 +1,154 @@
from uuid import uuid4
from aiogram import Bot, F, Router
from aiogram.types import CallbackQuery, Message
from aiogram.utils.deep_linking import create_start_link
from bot.factories import SettingsAction, SettingsCallback
from bot.filters import ChannelOwnerFilter
from bot.keyboards import channel_settings_kb, invite_mode_kb, members_kb
from utils.db.models import Channel, Invite
router = Router()
def settings_text(channel: Channel) -> str:
return f"⚙️ <b>{channel.title}</b>"
def members_text(channel: Channel) -> str:
return f"👥 <b>{channel.title}</b>"
async def refresh_channel(channel: Channel) -> Channel:
return await Channel.get_by_id(channel.id) or channel
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.MAIN), ChannelOwnerFilter()
)
async def show_settings(callback: CallbackQuery, channel: Channel):
if not isinstance(callback.message, Message):
return
await callback.message.edit_text(
settings_text(channel), reply_markup=channel_settings_kb(channel)
)
await callback.answer()
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.MEMBERS), ChannelOwnerFilter()
)
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.PAGE), ChannelOwnerFilter()
)
async def show_members(
callback: CallbackQuery, callback_data: SettingsCallback, channel: Channel
):
if not isinstance(callback.message, Message):
return
if not channel.members:
await callback.answer("Нет участников.", show_alert=True)
return
await callback.message.edit_text(
members_text(channel), reply_markup=members_kb(channel, callback_data.page)
)
await callback.answer()
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.MODE_TOGGLE),
ChannelOwnerFilter(),
)
async def toggle_member_mode(
callback: CallbackQuery, callback_data: SettingsCallback, channel: Channel
):
if not isinstance(callback.message, Message):
return
member = channel.get_member(callback_data.user_id)
if member is None:
await callback.answer("Участник не найден.", show_alert=True)
return
next_mode = "approval" if member.mode == "instant" else "instant"
await channel.set_member_mode(callback_data.user_id, next_mode)
refreshed = await refresh_channel(channel)
await callback.message.edit_reply_markup(
reply_markup=members_kb(refreshed, callback_data.page)
)
await callback.answer()
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.KICK), ChannelOwnerFilter()
)
async def kick_member(
callback: CallbackQuery, callback_data: SettingsCallback, channel: Channel
):
if not isinstance(callback.message, Message):
return
await channel.remove_member(callback_data.user_id)
refreshed = await refresh_channel(channel)
if refreshed.members:
await callback.message.edit_reply_markup(
reply_markup=members_kb(refreshed, callback_data.page)
)
else:
await callback.message.edit_text(
settings_text(refreshed), reply_markup=channel_settings_kb(refreshed)
)
await callback.answer()
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.INVITE_MODE),
ChannelOwnerFilter(),
)
async def show_invite_modes(callback: CallbackQuery, channel: Channel):
if not isinstance(callback.message, Message):
return
await callback.message.edit_text(
f"🔗 Режим для нового участника <b>{channel.title}</b>:",
reply_markup=invite_mode_kb(channel.id),
)
await callback.answer()
@router.callback_query(
SettingsCallback.filter(F.action == SettingsAction.INVITE_CREATE),
ChannelOwnerFilter(),
)
async def create_invite(
callback: CallbackQuery, callback_data: SettingsCallback, channel: Channel, bot: Bot
):
if not isinstance(callback.message, Message):
return
if callback_data.mode == "approval":
mode = "approval"
elif callback_data.mode == "instant":
mode = "instant"
else:
await callback.answer("Неверный режим.", show_alert=True)
return
invite = Invite(code=uuid4().hex[:12], channel_id=channel.id, mode=mode)
await invite.insert()
link = await create_start_link(bot, f"inv_{invite.code}", encode=True)
mode_label = "сразу" if mode == "instant" else "после проверки"
await callback.message.edit_text(
f"🔗 Инвайт для <b>{channel.title}</b> (режим: {mode_label}):\n\n"
f"<code>{link}</code>",
reply_markup=channel_settings_kb(channel),
)
await callback.answer()

View File

@@ -0,0 +1,11 @@
from .approval import approval_kb
from .channel import channel_settings_kb, invite_mode_kb, members_kb
from .menu import menu_kb
__all__ = [
"approval_kb",
"channel_settings_kb",
"invite_mode_kb",
"members_kb",
"menu_kb",
]

View File

@@ -0,0 +1,29 @@
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from bot.factories import ApprovalAction, ApprovalCallback
def approval_kb(post_id: str, contributor_name: str) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=f"От: {contributor_name}", callback_data="noop"
)
],
[
InlineKeyboardButton(
text="✅ Принять",
callback_data=ApprovalCallback(
action=ApprovalAction.ACCEPT, post_id=post_id
).pack(),
),
InlineKeyboardButton(
text="❌ Отклонить",
callback_data=ApprovalCallback(
action=ApprovalAction.REJECT, post_id=post_id
).pack(),
),
],
]
)

View File

@@ -0,0 +1,146 @@
import math
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from bot.factories import MenuAction, MenuCallback, SettingsAction, SettingsCallback
from utils.db.models import Channel
PER_PAGE = 5
def channel_settings_kb(channel: Channel) -> InlineKeyboardMarkup:
member_count = len(channel.members)
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=f"👥 Участники ({member_count})",
callback_data=SettingsCallback(
action=SettingsAction.MEMBERS, channel_id=channel.id
).pack(),
)
],
[
InlineKeyboardButton(
text="🔗 Создать инвайт",
callback_data=SettingsCallback(
action=SettingsAction.INVITE_MODE, channel_id=channel.id
).pack(),
)
],
[
InlineKeyboardButton(
text="◀️ Назад",
callback_data=MenuCallback(action=MenuAction.PAGE).pack(),
)
],
]
)
def members_kb(channel: Channel, page: int = 0) -> InlineKeyboardMarkup:
total_pages = max(1, math.ceil(len(channel.members) / PER_PAGE))
page = min(page, total_pages - 1)
start = page * PER_PAGE
page_members = channel.members[start : start + PER_PAGE]
rows: list[list[InlineKeyboardButton]] = []
for m in page_members:
mode_label = "сразу" if m.mode == "instant" else "проверка"
rows.append(
[
InlineKeyboardButton(
text=f"{m.display_name}{mode_label}", callback_data="noop"
),
InlineKeyboardButton(
text="🔄",
callback_data=SettingsCallback(
action=SettingsAction.MODE_TOGGLE,
channel_id=channel.id,
user_id=m.user_id,
page=page,
).pack(),
),
InlineKeyboardButton(
text="",
callback_data=SettingsCallback(
action=SettingsAction.KICK,
channel_id=channel.id,
user_id=m.user_id,
page=page,
).pack(),
),
]
)
if total_pages > 1:
nav_row = []
if page > 0:
nav_row.append(
InlineKeyboardButton(
text="◀️",
callback_data=SettingsCallback(
action=SettingsAction.PAGE, channel_id=channel.id, page=page - 1
).pack(),
)
)
nav_row.append(
InlineKeyboardButton(text=f"{page + 1}/{total_pages}", callback_data="noop")
)
if page < total_pages - 1:
nav_row.append(
InlineKeyboardButton(
text="▶️",
callback_data=SettingsCallback(
action=SettingsAction.PAGE, channel_id=channel.id, page=page + 1
).pack(),
)
)
rows.append(nav_row)
rows.append(
[
InlineKeyboardButton(
text="◀️ Назад",
callback_data=SettingsCallback(
action=SettingsAction.MAIN, channel_id=channel.id
).pack(),
)
]
)
return InlineKeyboardMarkup(inline_keyboard=rows)
def invite_mode_kb(channel_id: int) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="📨 Сразу",
callback_data=SettingsCallback(
action=SettingsAction.INVITE_CREATE,
channel_id=channel_id,
mode="instant",
).pack(),
),
InlineKeyboardButton(
text="✅ После проверки",
callback_data=SettingsCallback(
action=SettingsAction.INVITE_CREATE,
channel_id=channel_id,
mode="approval",
).pack(),
),
],
[
InlineKeyboardButton(
text="◀️ Назад",
callback_data=SettingsCallback(
action=SettingsAction.MAIN, channel_id=channel_id
).pack(),
)
],
]
)

80
src/bot/keyboards/menu.py Normal file
View File

@@ -0,0 +1,80 @@
import math
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from bot.factories import MenuAction, MenuCallback
from utils.db.models import Channel
PER_PAGE = 5
def menu_kb(
channels: list[Channel], user_id: int, active_channel_id: int | None, page: int = 0
) -> InlineKeyboardMarkup:
total_pages = max(1, math.ceil(len(channels) / PER_PAGE))
page = min(page, total_pages - 1)
start = page * PER_PAGE
page_channels = channels[start : start + PER_PAGE]
rows: list[list[InlineKeyboardButton]] = []
for ch in page_channels:
is_active = ch.id == active_channel_id
label = f"{'' if is_active else ''}{ch.title}"
row = [
InlineKeyboardButton(
text=label,
callback_data=MenuCallback(
action=MenuAction.SELECT, channel_id=ch.id, page=page
).pack(),
)
]
if ch.owner_id == user_id:
row.append(
InlineKeyboardButton(
text="⚙️",
callback_data=MenuCallback(
action=MenuAction.SETTINGS, channel_id=ch.id, page=page
).pack(),
)
)
rows.append(row)
if total_pages > 1:
nav_row = []
if page > 0:
nav_row.append(
InlineKeyboardButton(
text="◀️",
callback_data=MenuCallback(
action=MenuAction.PAGE, page=page - 1
).pack(),
)
)
nav_row.append(
InlineKeyboardButton(text=f"{page + 1}/{total_pages}", callback_data="noop")
)
if page < total_pages - 1:
nav_row.append(
InlineKeyboardButton(
text="▶️",
callback_data=MenuCallback(
action=MenuAction.PAGE, page=page + 1
).pack(),
)
)
rows.append(nav_row)
rows.append(
[
InlineKeyboardButton(
text=" Добавить канал",
callback_data=MenuCallback(action=MenuAction.ADD).pack(),
)
]
)
return InlineKeyboardMarkup(inline_keyboard=rows)

4
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .env import env
from .logging import logger
__all__ = ["env", "logger"]

15
src/utils/db/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
from beanie import init_beanie
from pymongo import AsyncMongoClient
from utils.env import env
client = AsyncMongoClient(env.db.connection_url)
async def init_db():
from .models import Channel, Invite, PendingPost, User
await init_beanie(
database=client[env.db.db_name],
document_models=[User, Channel, Invite, PendingPost],
)

View File

@@ -0,0 +1,6 @@
from .channel import Channel, ChannelMember
from .invite import Invite
from .pending_post import PendingPost
from .user import User
__all__ = ["Channel", "ChannelMember", "Invite", "PendingPost", "User"]

View File

@@ -0,0 +1,58 @@
from typing import Literal
from beanie import Document
from pydantic import BaseModel
class ChannelMember(BaseModel):
user_id: int
display_name: str
mode: Literal["instant", "approval"] = "approval"
class Channel(Document):
id: int
owner_id: int
title: str
members: list[ChannelMember] = []
class Settings:
name = "channels"
@classmethod
async def get_by_id(cls, id_: int) -> "Channel | None":
return await cls.find_one(cls.id == id_)
@classmethod
async def get_user_channels(cls, user_id: int) -> list["Channel"]:
return await cls.find(
{"$or": [{"owner_id": user_id}, {"members.user_id": user_id}]}
).to_list()
def get_member(self, user_id: int) -> ChannelMember | None:
for m in self.members:
if m.user_id == user_id:
return m
return None
async def add_member(
self, user_id: int, display_name: str, mode: Literal["instant", "approval"]
) -> None:
if self.get_member(user_id) is not None:
return
self.members.append(
ChannelMember(user_id=user_id, display_name=display_name, mode=mode)
)
await self.save()
async def remove_member(self, user_id: int) -> None:
self.members = [m for m in self.members if m.user_id != user_id]
await self.save()
async def set_member_mode(
self, user_id: int, mode: Literal["instant", "approval"]
) -> None:
member = self.get_member(user_id)
if member is not None:
member.mode = mode
await self.save()

View File

@@ -0,0 +1,17 @@
from typing import Literal
from beanie import Document
class Invite(Document):
code: str
channel_id: int
mode: Literal["instant", "approval"]
used: bool = False
class Settings:
name = "invites"
@classmethod
async def get_by_code(cls, code: str) -> "Invite | None":
return await cls.find_one(cls.code == code, cls.used == False) # noqa: E712

View File

@@ -0,0 +1,15 @@
from typing import Literal
from beanie import Document
class PendingPost(Document):
source_chat_id: int
source_message_id: int
channel_id: int
contributor_id: int
is_forwarded: bool
status: Literal["pending", "accepted", "rejected"] = "pending"
class Settings:
name = "pending_posts"

View File

@@ -0,0 +1,21 @@
from beanie import Document
class User(Document):
id: int
active_channel_id: int | None = None
class Settings:
name = "users"
@classmethod
async def get_by_id(cls, id_: int) -> "User | None":
return await cls.find_one(cls.id == id_)
@classmethod
async def get_or_create(cls, id_: int) -> "User":
user = await cls.get_by_id(id_)
if user is None:
user = cls(id=id_)
await user.insert()
return user

39
src/utils/env.py Normal file
View File

@@ -0,0 +1,39 @@
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class BotSettings(BaseSettings):
token: SecretStr
class DatabaseSettings(BaseSettings):
host: str = "mongodb"
port: int = 27017
user: str = "user"
password: str = "password"
db_name: str = "prod"
connection_params: str = "?authSource=admin"
@property
def connection_url(self) -> str:
return f"mongodb://{self.user}:{self.password}@{self.host}:{self.port}/{self.db_name}{self.connection_params}"
class LogSettings(BaseSettings):
level: str = "INFO"
level_external: str = "WARNING"
show_time: bool = False
console_width: int = 150
class Settings(BaseSettings):
bot: BotSettings
db: DatabaseSettings
log: LogSettings
model_config = SettingsConfigDict(
case_sensitive=False, env_file=".env", env_nested_delimiter="__", extra="ignore"
)
env = Settings() # ty:ignore[missing-argument]

36
src/utils/logging.py Normal file
View File

@@ -0,0 +1,36 @@
import logging
from rich.console import Console
from rich.logging import RichHandler
from rich.traceback import install
from .env import env
console = Console(width=env.log.console_width, color_system="auto", force_terminal=True)
def setup_logging() -> None:
from aiogram.dispatcher import router
logging.basicConfig(
level=env.log.level_external,
format="",
datefmt=None,
handlers=[
RichHandler(
console=console,
markup=True,
rich_tracebacks=True,
enable_link_path=False,
tracebacks_show_locals=True,
omit_repeated_times=False,
show_time=env.log.show_time,
tracebacks_suppress=[router],
)
],
)
install(console=console, show_locals=True)
logger = logging.getLogger("post-proposal-bot")
logger.setLevel(env.log.level)