feat(global): init structure

This commit is contained in:
h
2025-07-01 12:55:01 +03:00
commit 1b21f23294
37 changed files with 1208 additions and 0 deletions

43
src/bot/__init__.py Normal file
View File

@@ -0,0 +1,43 @@
import contextlib
from rich import print
async def runner():
from dishka.integrations.aiogram import setup_dishka
from dependencies import container
from utils.db import init_db
from . import handlers
from .common import bot, dp
from .modules.error import on_error
await init_db()
dp.error.register(on_error)
dp.include_routers(handlers.router)
setup_dishka(container, dp, auto_inject=True)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot)
def plugins():
from rich import traceback
traceback.install(show_locals=True)
def main():
import asyncio
plugins()
print("Starting...")
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(runner())
print("[red]Stopped.[/]")

4
src/bot/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from . import main
if __name__ == "__main__":
main()

16
src/bot/common.py Normal file
View File

@@ -0,0 +1,16 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from utils import env
from utils.config import config, dconfig
bot = Bot(
token=env.bot.token.get_secret_value(),
default=DefaultBotProperties(parse_mode="HTML"),
)
storage = MemoryStorage()
dp = Dispatcher(storage=storage)
__all__ = ["bot", "dp", "config", "dconfig"]

View File

@@ -0,0 +1 @@
from .admin import Admin

17
src/bot/filters/admin.py Normal file
View File

@@ -0,0 +1,17 @@
from aiogram.filters import BaseFilter
from aiogram.types import Message
from bot.common import config
class Admin(BaseFilter):
def __init__(self, notify: bool = True):
self.notify = notify
async def __call__(self, message: Message):
try:
return message.from_user.id in config.bot.admins
except TypeError:
if self.notify:
await message.answer("Вы не админ!")
return False

View File

@@ -0,0 +1,14 @@
from aiogram import Router
from . import (
initialize,
start,
)
router = Router()
router.include_routers(
start.router,
initialize.router,
)

View File

@@ -0,0 +1,3 @@
from .initializer import router
__all__ = ["router"]

View File

@@ -0,0 +1,17 @@
from aiogram import Bot, Router, types
from rich import print
router = Router()
@router.startup()
async def startup(bot: Bot):
await bot.set_my_commands(
[types.BotCommand(command="/start", description="Запустить бота")]
)
print(f"[green]Started as[/] @{(await bot.me()).username}")
@router.shutdown()
async def shutdown():
print("Shutting down bot...")

View File

@@ -0,0 +1,3 @@
from .start import router
__all__ = ["router"]

View File

@@ -0,0 +1,9 @@
from aiogram import Router, types
from aiogram.filters import CommandStart
router = Router()
@router.message(CommandStart())
async def on_start(message: types.Message):
await message.reply("hewo everynyan")

View File

View File

@@ -0,0 +1,3 @@
from .handler import on_error
__all__ = ["on_error"]

View File

@@ -0,0 +1,41 @@
from aiogram import Bot
from aiogram.dispatcher import router as s_router
from aiogram.types.error_event import ErrorEvent
from rich.traceback import Traceback
from utils.logging import console
async def on_error(event: ErrorEvent, bot: Bot):
import base64
import os
error_id = base64.urlsafe_b64encode(os.urandom(6)).decode()
traceback = Traceback.from_exception(
type(event.exception),
event.exception,
event.exception.__traceback__,
show_locals=True,
suppress=[s_router],
)
if event.update.chosen_inline_result:
await bot.edit_message_caption(
inline_message_id=event.update.chosen_inline_result.inline_message_id,
caption=f"💔 <b>ERROR</b> occurred. Use this code to search in logs: "
f"<code>{error_id}</code>",
parse_mode="HTML",
)
if event.update.message:
await event.update.message.answer(
text=f"💔 <b>ERROR</b> occurred. Use this code to search in logs: "
f"<code>{error_id}</code>",
parse_mode="HTML",
)
console.print(f"[red]{error_id} occurred[/]")
console.print(event)
console.print(traceback)
console.print(f"-{error_id} occurred-")

View File

@@ -0,0 +1 @@
from .paginator import Paginator

View File

@@ -0,0 +1,178 @@
from itertools import islice
from typing import Any, Callable, Coroutine, Iterable, Iterator
from aiogram import Dispatcher, F, Router, types
from aiogram.fsm.context import FSMContext
from aiogram.types import CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
class Paginator:
def __init__(
self,
data: (
types.InlineKeyboardMarkup
| Iterable[types.InlineKeyboardButton]
| Iterable[Iterable[types.InlineKeyboardButton]]
| InlineKeyboardBuilder
),
before_data: list[list[types.InlineKeyboardButton]] = None,
after_data: list[list[types.InlineKeyboardButton]] = None,
state: FSMContext = None,
callback_startswith: str = "page_",
size: int = 8,
page_separator: str = "/",
dp: Dispatcher | Router | None = None,
):
"""
Example: paginator = Paginator(data=kb, size=5)
:param data: An iterable object that stores an InlineKeyboardButton.
:param callback_startswith: What should callback_data begin with in handler pagination. Default = 'page_'.
:param size: Number of lines per page. Default = 8.
:param state: Current state.
:param page_separator: Separator for page numbers. Default = '/'.
"""
self.dp = dp
self.page_separator = page_separator
self._state = state
self._size = size
self._startswith = callback_startswith
self._before_data = before_data or []
self._after_data = after_data or []
if isinstance(data, types.InlineKeyboardMarkup):
self._list_kb = list(self._chunk(it=data.inline_keyboard, size=self._size))
elif isinstance(data, Iterable):
self._list_kb = list(self._chunk(it=list(data), size=self._size))
elif isinstance(data, InlineKeyboardBuilder):
self._list_kb = list(self._chunk(it=data.export(), size=self._size))
else:
raise ValueError(f"{data} is not valid data")
"""
Class for pagination's in aiogram inline keyboards
"""
def __call__(self, current_page=0, *args, **kwargs) -> types.InlineKeyboardMarkup:
"""
Example:
await message.answer(
text='Some menu',
reply_markup=paginator()
)
:return: InlineKeyboardMarkup
"""
if current_page >= len(self._list_kb):
current_page = 0
_list_current_page = self._list_kb[current_page]
paginations = self._get_paginator(
counts=len(self._list_kb),
page=current_page,
page_separator=self.page_separator,
startswith=self._startswith,
)
keyboard = types.InlineKeyboardMarkup(
inline_keyboard=self._before_data
+ [
*_list_current_page,
paginations,
]
+ self._after_data
)
if self.dp:
self.paginator_handler()
return keyboard
@staticmethod
def _get_page(call: types.CallbackQuery) -> int:
"""
:param call: CallbackQuery in paginator handler.
:return: Current page.
"""
return int(call.data[-1])
@staticmethod
def _chunk(it, size) -> Iterator[tuple[Any, ...]]:
"""
:param it: Source iterable object.
:param size: Chunk size.
:return: Iterator chunks pages.
"""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
@staticmethod
def _get_paginator(
counts: int, page: int, page_separator: str = "/", startswith: str = "page_"
) -> list[types.InlineKeyboardButton]:
"""
:param counts: Counts total buttons.
:param page: Current page.
:param page_separator: Separator for page numbers. Default = '/'.
:return: Page control line buttons.
"""
counts -= 1
paginations = []
if page > 0:
paginations.append(
types.InlineKeyboardButton(text="⏮️️", callback_data=f"{startswith}0")
)
paginations.append(
types.InlineKeyboardButton(
text="⬅️", callback_data=f"{startswith}{page - 1}"
),
)
paginations.append(
types.InlineKeyboardButton(
text=f"{page + 1}{page_separator}{counts + 1}", callback_data="pass"
),
)
if counts > page:
paginations.append(
types.InlineKeyboardButton(
text="➡️", callback_data=f"{startswith}{page + 1}"
)
)
paginations.append(
types.InlineKeyboardButton(
text="⏭️", callback_data=f"{startswith}{counts}"
)
)
return paginations
def paginator_handler(
self,
) -> tuple[Callable[[CallbackQuery, FSMContext], Coroutine[Any, Any, None]], Any]:
"""
Example:
args, kwargs = paginator.paginator_handler()
dp.register_callback_query_handler(*args, **kwargs)
:return: Data for register handler pagination.
"""
async def _page(call: types.CallbackQuery, state: FSMContext):
page = self._get_page(call)
await call.message.edit_reply_markup(
reply_markup=self.__call__(current_page=page)
)
await state.update_data({f"last_page_{self._startswith}": page})
if not self.dp:
return _page, F.data.startswith(self._startswith)
else:
self.dp.callback_query.register(
_page,
F.data.startswith(self._startswith),
)

View File

View File

@@ -0,0 +1,3 @@
from dishka import make_async_container
container = make_async_container()

View File

View File

@@ -0,0 +1 @@
from . import asyncio

View File

@@ -0,0 +1,16 @@
from typing import Callable, ParamSpec, TypeVar
from dishka.integrations.base import wrap_injection
from dependencies import container
T = TypeVar("T")
P = ParamSpec("P")
def inject(func: Callable[P, T]) -> Callable[P, T]:
return wrap_injection(
func=func,
is_async=True,
container_getter=lambda args, kwargs: container,
)

View File

View File

2
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from .env import env
from .logging import logger

23
src/utils/config.py Normal file
View File

@@ -0,0 +1,23 @@
from pymongo import MongoClient
from utils.db.models.config import DynamicConfig, DynamicConfigBase
from . import env
def load_config() -> DynamicConfigBase:
client = MongoClient(env.db.connection_url)
db = client[env.db.db_name]
config_dict = db.config.find_one()
if config_dict is None:
return DynamicConfigBase()
config_dict.pop("_id", None)
return DynamicConfigBase.model_validate(config_dict)
dconfig = DynamicConfig.get_or_create
config: DynamicConfigBase = load_config()

19
src/utils/db/__init__.py Normal file
View File

@@ -0,0 +1,19 @@
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from utils.env import env
client = AsyncIOMotorClient(env.db.connection_url)
async def init_db():
from .models import (
DynamicConfig,
)
await init_beanie(
database=client[env.db.db_name],
document_models=[
DynamicConfig,
],
)

View File

@@ -0,0 +1 @@
from .config import DynamicConfig

View File

@@ -0,0 +1,27 @@
from beanie import Document
from pydantic import BaseModel, Field
class BotConfig(BaseModel):
admins: list[int] = []
class DynamicConfigBase(BaseModel):
bot: BotConfig = Field(default_factory=BotConfig)
class DynamicConfig(DynamicConfigBase, Document):
class Settings:
name = "config"
async def save(self): # noqa
await super().save() # noqa
@classmethod
async def get_or_create(cls):
config = await cls.find_one()
if not config:
config = cls()
await config.save()
return config

42
src/utils/env.py Normal file
View File

@@ -0,0 +1,42 @@
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class BotSettings(BaseSettings):
token: SecretStr
class DatabaseSettings(BaseSettings):
host: str = "mongodb"
port: int = 27017
user: SecretStr = "user"
password: SecretStr = "password"
db_name: str = "prod"
connection_params: str = "?authSource=admin"
scripts_connection_url: str = "mongodb://user:password@localhost:27017/"
@property
def connection_url(self) -> str:
return f"mongodb://{self.user.get_secret_value()}:{self.password.get_secret_value()}@{self.host}:{self.port}/{self.db_name}{self.connection_params}"
class LogSettings(BaseSettings):
level: str = "INFO"
show_time: bool = False
console_width: int = 150
class Settings(BaseSettings):
bot: BotSettings
db: DatabaseSettings
log: LogSettings
model_config = SettingsConfigDict(
case_sensitive=False,
env_file=".env",
env_nested_delimiter="__",
extra="ignore",
)
env = Settings() # noqa

37
src/utils/logging.py Normal file
View File

@@ -0,0 +1,37 @@
import logging
from rich.console import Console
from rich.logging import RichHandler
from . import env
console = Console(
width=env.log.console_width,
color_system="auto",
force_terminal=True,
)
def setup_logging():
from aiogram.dispatcher import router
from dishka.integrations import aiogram
logging.basicConfig(
level=env.log.level,
format=None, # noqa
datefmt=None,
handlers=[
RichHandler(
console=console,
markup=True,
rich_tracebacks=True,
tracebacks_show_locals=True,
omit_repeated_times=False,
show_time=env.log.show_time,
tracebacks_suppress=[router, aiogram],
)
],
)
logger = logging.getLogger(__name__)