[feat] add settings with some example schema, initialization and run functionality
This commit is contained in:
@@ -1,37 +1,36 @@
|
||||
import asyncio
|
||||
import contextlib
|
||||
|
||||
from utils.logging import setup_logging, logger
|
||||
|
||||
from dishka import make_async_container
|
||||
from dishka.integrations.aiogram import setup_dishka, AiogramProvider
|
||||
from beanie import init_beanie
|
||||
from dishka import make_async_container
|
||||
from dishka.integrations.aiogram import AiogramProvider, setup_dishka
|
||||
|
||||
from utils.logging import logger, setup_logging
|
||||
|
||||
setup_logging()
|
||||
|
||||
|
||||
async def setup_db():
|
||||
from utils.db import UserSettingsDocument
|
||||
|
||||
from .common import db
|
||||
|
||||
await init_beanie(
|
||||
database=db,
|
||||
document_models=[
|
||||
|
||||
]
|
||||
)
|
||||
|
||||
await init_beanie(database=db, document_models=[UserSettingsDocument])
|
||||
logger.info("Database connection established")
|
||||
|
||||
|
||||
async def runner():
|
||||
from . import callbacks, handlers
|
||||
from .common import bot, dp
|
||||
from .dependencies import SettingsProvider
|
||||
|
||||
await setup_db()
|
||||
|
||||
container = make_async_container(
|
||||
AiogramProvider(),
|
||||
SettingsProvider(),
|
||||
)
|
||||
|
||||
setup_dishka(
|
||||
container=container,
|
||||
router=dp,
|
||||
@@ -48,10 +47,7 @@ async def runner():
|
||||
|
||||
|
||||
def main():
|
||||
import asyncio
|
||||
|
||||
logger.info("Starting...")
|
||||
with contextlib.suppress(KeyboardInterrupt):
|
||||
asyncio.run(runner())
|
||||
|
||||
logger.info("[red]Stopped.[/]")
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
from . import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -1,3 +1,14 @@
|
||||
from aiogram import Router
|
||||
from aiogram import F, Router
|
||||
from aiogram.types import CallbackQuery
|
||||
|
||||
from . import settings
|
||||
|
||||
router = Router()
|
||||
router.include_routers(
|
||||
settings.router,
|
||||
)
|
||||
|
||||
|
||||
@router.callback_query(F.data == "_")
|
||||
async def on_empty(callback: CallbackQuery):
|
||||
await callback.answer()
|
||||
|
||||
11
src/bot/callbacks/settings/__init__.py
Normal file
11
src/bot/callbacks/settings/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from aiogram import Router
|
||||
|
||||
from . import input, main, section, selectors
|
||||
|
||||
router = Router()
|
||||
router.include_routers(
|
||||
main.router,
|
||||
section.router,
|
||||
selectors.router,
|
||||
input.router,
|
||||
)
|
||||
64
src/bot/callbacks/settings/input.py
Normal file
64
src/bot/callbacks/settings/input.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from aiogram import Bot, F, Router
|
||||
from aiogram.filters import StateFilter
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.fsm.state import State, StatesGroup
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from dishka import FromDishka
|
||||
|
||||
from bot.keyboards.settings import SettingsMenuGenerator
|
||||
from bot.modules.settings import UserSettings
|
||||
from utils.db import UserSettingsDocument
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
class SettingsStates(StatesGroup):
|
||||
waiting_for_input = State()
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("settings:input:"))
|
||||
async def on_input_setting(
|
||||
callback: CallbackQuery,
|
||||
bot: Bot,
|
||||
state: FSMContext,
|
||||
user_settings: FromDishka[UserSettings],
|
||||
):
|
||||
_, _, section, field = callback.data.split(":")
|
||||
|
||||
await state.update_data(section=section, field=field)
|
||||
await state.set_state(SettingsStates.waiting_for_input)
|
||||
|
||||
await callback.answer()
|
||||
await bot.send_message(
|
||||
chat_id=callback.from_user.id,
|
||||
text=SettingsMenuGenerator.get_input_prompt(section, field, user_settings),
|
||||
parse_mode="HTML",
|
||||
)
|
||||
|
||||
|
||||
@router.message(StateFilter(SettingsStates.waiting_for_input))
|
||||
async def on_input_value(
|
||||
message: Message,
|
||||
state: FSMContext,
|
||||
user_settings: FromDishka[UserSettings],
|
||||
):
|
||||
data = await state.get_data()
|
||||
section = data["section"]
|
||||
field = data["field"]
|
||||
|
||||
await state.clear()
|
||||
|
||||
section_obj = getattr(user_settings, section)
|
||||
|
||||
setattr(section_obj, field, message.text)
|
||||
|
||||
await UserSettingsDocument.update_field(
|
||||
message.from_user.id, section, field, message.text
|
||||
)
|
||||
|
||||
await message.answer(
|
||||
"✅ Setting updated!",
|
||||
reply_markup=SettingsMenuGenerator.get_section_menu(
|
||||
section, user_settings, router
|
||||
),
|
||||
)
|
||||
25
src/bot/callbacks/settings/main.py
Normal file
25
src/bot/callbacks/settings/main.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from aiogram import Bot, F, Router, types
|
||||
from aiogram.filters import Command
|
||||
from aiogram.types import CallbackQuery
|
||||
|
||||
from bot.keyboards.settings import SettingsMenuGenerator
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
@router.callback_query(F.data == "settings:main")
|
||||
async def on_main_settings(callback: CallbackQuery, bot: Bot):
|
||||
await callback.answer()
|
||||
await bot.edit_message_text(
|
||||
"⚙️ Settings",
|
||||
chat_id=callback.from_user.id,
|
||||
message_id=callback.message.message_id,
|
||||
reply_markup=SettingsMenuGenerator.get_main_menu(router),
|
||||
)
|
||||
|
||||
|
||||
@router.message(Command("settings"))
|
||||
async def on_settings_command(message: types.Message):
|
||||
await message.answer(
|
||||
"⚙️ Settings", reply_markup=SettingsMenuGenerator.get_main_menu()
|
||||
)
|
||||
50
src/bot/callbacks/settings/section.py
Normal file
50
src/bot/callbacks/settings/section.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from aiogram import Bot, F, Router
|
||||
from aiogram.types import CallbackQuery
|
||||
from dishka import FromDishka
|
||||
|
||||
from bot.keyboards.settings import SettingsMenuGenerator
|
||||
from bot.modules.settings import UserSettings
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("settings:section:"))
|
||||
async def on_section_settings(
|
||||
callback: CallbackQuery, bot: Bot, user_settings: FromDishka[UserSettings]
|
||||
):
|
||||
section = callback.data.split(":")[-1]
|
||||
await callback.answer()
|
||||
await bot.edit_message_text(
|
||||
f"⚙️ {section.capitalize()} settings",
|
||||
chat_id=callback.from_user.id,
|
||||
message_id=callback.message.message_id,
|
||||
reply_markup=SettingsMenuGenerator.get_section_menu(
|
||||
section, user_settings, router
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@router.callback_query(
|
||||
F.data.startswith("settings:select:") | F.data.startswith("settings:list:")
|
||||
)
|
||||
async def on_select_setting(
|
||||
callback: CallbackQuery, bot: Bot, user_settings: FromDishka[UserSettings]
|
||||
):
|
||||
_, mode, section, field = callback.data.split(":")
|
||||
|
||||
section_obj = getattr(user_settings, section)
|
||||
|
||||
info = section_obj.get_info(field)
|
||||
if not info:
|
||||
await callback.answer("Setting info not found")
|
||||
return
|
||||
|
||||
await callback.answer()
|
||||
await bot.edit_message_text(
|
||||
f"⚙️ {info.title}\n{info.description}",
|
||||
chat_id=callback.from_user.id,
|
||||
message_id=callback.message.message_id,
|
||||
reply_markup=getattr(SettingsMenuGenerator, f"get_{mode}_menu")(
|
||||
section, field, user_settings, router
|
||||
),
|
||||
)
|
||||
94
src/bot/callbacks/settings/selectors.py
Normal file
94
src/bot/callbacks/settings/selectors.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from aiogram import Bot, F, Router
|
||||
from aiogram.types import CallbackQuery
|
||||
from dishka import FromDishka
|
||||
|
||||
from bot.keyboards.settings import SettingsMenuGenerator
|
||||
from bot.modules.settings import UserSettings
|
||||
from utils.db import UserSettingsDocument
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("settings:toggle:"))
|
||||
async def on_toggle_setting(
|
||||
callback: CallbackQuery, bot: Bot, user_settings: FromDishka[UserSettings]
|
||||
):
|
||||
_, _, section, field = callback.data.split(":")
|
||||
|
||||
section_obj = getattr(user_settings, section)
|
||||
|
||||
current_value = getattr(section_obj, field)
|
||||
setattr(section_obj, field, not current_value)
|
||||
|
||||
await UserSettingsDocument.update_field(
|
||||
callback.from_user.id, section, field, not current_value
|
||||
)
|
||||
|
||||
await callback.answer("Setting updated")
|
||||
await bot.edit_message_reply_markup(
|
||||
chat_id=callback.from_user.id,
|
||||
message_id=callback.message.message_id,
|
||||
reply_markup=SettingsMenuGenerator.get_section_menu(
|
||||
section, user_settings, router
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("settings:set:"))
|
||||
async def on_set_value(
|
||||
callback: CallbackQuery, bot: Bot, user_settings: FromDishka[UserSettings]
|
||||
):
|
||||
_, _, section, field, value = callback.data.split(":")
|
||||
|
||||
section_obj = getattr(user_settings, section)
|
||||
|
||||
setattr(section_obj, field, value)
|
||||
await UserSettingsDocument.update_field(
|
||||
callback.from_user.id, section, field, value
|
||||
)
|
||||
|
||||
await callback.answer("Setting updated")
|
||||
await bot.edit_message_reply_markup(
|
||||
chat_id=callback.from_user.id,
|
||||
message_id=callback.message.message_id,
|
||||
reply_markup=SettingsMenuGenerator.get_select_menu(
|
||||
section, field, user_settings, router
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("settings:move:"))
|
||||
async def on_move_item(
|
||||
callback: CallbackQuery, bot: Bot, user_settings: FromDishka[UserSettings]
|
||||
):
|
||||
_, _, section, field, index, direction = callback.data.split(":")
|
||||
index = int(index)
|
||||
|
||||
section_obj = getattr(user_settings, section)
|
||||
current_list = getattr(section_obj, field)
|
||||
|
||||
if direction == "up" and index > 0:
|
||||
current_list[index], current_list[index - 1] = (
|
||||
current_list[index - 1],
|
||||
current_list[index],
|
||||
)
|
||||
elif direction == "down" and index < len(current_list) - 1:
|
||||
current_list[index], current_list[index + 1] = (
|
||||
current_list[index + 1],
|
||||
current_list[index],
|
||||
)
|
||||
|
||||
setattr(section_obj, field, current_list)
|
||||
|
||||
await UserSettingsDocument.update_field(
|
||||
callback.from_user.id, section, field, current_list
|
||||
)
|
||||
|
||||
await callback.answer("Order updated")
|
||||
await bot.edit_message_reply_markup(
|
||||
chat_id=callback.from_user.id,
|
||||
message_id=callback.message.message_id,
|
||||
reply_markup=SettingsMenuGenerator.get_list_menu(
|
||||
section, field, user_settings, router
|
||||
),
|
||||
)
|
||||
@@ -1,11 +1,14 @@
|
||||
import motor.motor_asyncio
|
||||
from aiogram import Bot, Dispatcher
|
||||
from aiogram.client.default import DefaultBotProperties
|
||||
from aiogram.fsm.storage.mongo import MongoStorage
|
||||
import motor.motor_asyncio
|
||||
|
||||
from utils import env
|
||||
|
||||
bot = Bot(token=env.bot.token.get_secret_value(), default=DefaultBotProperties(parse_mode="HTML"))
|
||||
bot = Bot(
|
||||
token=env.bot.token.get_secret_value(),
|
||||
default=DefaultBotProperties(parse_mode="HTML"),
|
||||
)
|
||||
dp = Dispatcher(storage=MongoStorage.from_url(env.db.connection_url))
|
||||
|
||||
motor_client = motor.motor_asyncio.AsyncIOMotorClient(env.db.connection_url)
|
||||
|
||||
1
src/bot/dependencies/__init__.py
Normal file
1
src/bot/dependencies/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .settings import SettingsProvider
|
||||
1
src/bot/dependencies/settings/__init__.py
Normal file
1
src/bot/dependencies/settings/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .user_settings import SettingsProvider
|
||||
20
src/bot/dependencies/settings/user_settings.py
Normal file
20
src/bot/dependencies/settings/user_settings.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from aiogram.types import TelegramObject
|
||||
from dishka import Provider, Scope, provide
|
||||
|
||||
from bot.modules.settings import UserSettings
|
||||
from utils.db import UserSettingsDocument
|
||||
|
||||
|
||||
class SettingsProvider(Provider):
|
||||
@provide(scope=Scope.REQUEST)
|
||||
async def get_user_settings(self, event: TelegramObject) -> UserSettings:
|
||||
if not hasattr(event, "from_user") and (
|
||||
not hasattr(event, "inline_query") or event.inline_query is None
|
||||
):
|
||||
user_id = 0
|
||||
elif hasattr(event, "inline_query") and event.inline_query is not None:
|
||||
user_id = event.inline_query.from_user.id
|
||||
else:
|
||||
user_id = event.from_user.id
|
||||
|
||||
return await UserSettingsDocument.get_user_settings(user_id)
|
||||
@@ -1,4 +1,5 @@
|
||||
from aiogram import Bot, Router
|
||||
|
||||
from utils.logging import logger
|
||||
|
||||
router = Router()
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
from aiogram import Bot, Router, types
|
||||
from aiogram import Router, types
|
||||
from aiogram.filters import CommandStart
|
||||
from utils.logging import logger
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
@router.message(CommandStart)
|
||||
@router.message(CommandStart())
|
||||
async def on_start(message: types.Message):
|
||||
await message.reply("nya")
|
||||
|
||||
1
src/bot/keyboards/__init__.py
Normal file
1
src/bot/keyboards/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
pass
|
||||
1
src/bot/keyboards/settings/__init__.py
Normal file
1
src/bot/keyboards/settings/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .generator import SettingsMenuGenerator
|
||||
299
src/bot/keyboards/settings/generator.py
Normal file
299
src/bot/keyboards/settings/generator.py
Normal file
@@ -0,0 +1,299 @@
|
||||
from typing import List, Optional, get_type_hints
|
||||
|
||||
from aiogram import Router
|
||||
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
|
||||
|
||||
from bot.keyboards.utils import Paginator
|
||||
from bot.modules.settings import UserSettings
|
||||
from bot.modules.settings.model import BaseSettings, SettingInfo
|
||||
|
||||
|
||||
class SettingsMenuGenerator:
|
||||
BACK_BUTTON_TEXT = "🔙 Back"
|
||||
ITEMS_PER_PAGE = 5
|
||||
|
||||
@classmethod
|
||||
def get_main_menu(cls, router: Optional[Router] = None) -> InlineKeyboardMarkup:
|
||||
keyboard = []
|
||||
fields = get_type_hints(UserSettings)
|
||||
|
||||
for field_name, field_type in fields.items():
|
||||
if (
|
||||
not isinstance(field_type, type)
|
||||
or not issubclass(field_type, BaseSettings)
|
||||
or field_name.startswith("_")
|
||||
):
|
||||
continue
|
||||
|
||||
keyboard.append(
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=field_name.capitalize(),
|
||||
callback_data=f"settings:section:{field_name}",
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
paginator = Paginator(
|
||||
data=InlineKeyboardMarkup(inline_keyboard=keyboard),
|
||||
size=cls.ITEMS_PER_PAGE,
|
||||
callback_startswith="settings_main_page_",
|
||||
dp=router,
|
||||
)
|
||||
|
||||
return paginator()
|
||||
|
||||
@classmethod
|
||||
def get_section_menu(
|
||||
cls,
|
||||
section_name: str,
|
||||
settings: UserSettings,
|
||||
router: Optional[Router] = None,
|
||||
page: int = 0,
|
||||
) -> InlineKeyboardMarkup:
|
||||
section = getattr(settings, section_name)
|
||||
if not section:
|
||||
return InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT, callback_data="settings:main"
|
||||
)
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
field_info = section.get_all_info()
|
||||
fields = get_type_hints(section.__class__)
|
||||
|
||||
field_names = [
|
||||
f
|
||||
for f in fields.keys()
|
||||
if not f.endswith("_info") and not f.startswith("_")
|
||||
]
|
||||
|
||||
field_names.sort(
|
||||
key=lambda f: field_info.get(f, SettingInfo("", "", order=999)).order
|
||||
)
|
||||
|
||||
keyboard = []
|
||||
for field_name in field_names:
|
||||
value = getattr(section, field_name)
|
||||
info = section.get_info(field_name)
|
||||
if info is None:
|
||||
continue
|
||||
|
||||
field_type = fields[field_name]
|
||||
if field_type == bool:
|
||||
value_text = "✅ Enabled" if value else "❌ Disabled"
|
||||
keyboard.append(
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=f"{info.title}: {value_text}",
|
||||
callback_data=f"settings:toggle:{section_name}:{field_name}",
|
||||
)
|
||||
]
|
||||
)
|
||||
elif field_type == str and info.options:
|
||||
value_text = info.options.get(value, value)
|
||||
keyboard.append(
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=f"{info.title}: {value_text}",
|
||||
callback_data=f"settings:select:{section_name}:{field_name}",
|
||||
)
|
||||
]
|
||||
)
|
||||
elif field_type == List[str]:
|
||||
keyboard.append(
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=f"{info.title}",
|
||||
callback_data=f"settings:list:{section_name}:{field_name}",
|
||||
)
|
||||
]
|
||||
)
|
||||
else:
|
||||
keyboard.append(
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=f"{info.title}: {value}",
|
||||
callback_data=f"settings:input:{section_name}:{field_name}",
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
after_data = [
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT, callback_data="settings:main"
|
||||
)
|
||||
]
|
||||
]
|
||||
|
||||
paginator = Paginator(
|
||||
data=InlineKeyboardMarkup(inline_keyboard=keyboard),
|
||||
after_data=after_data,
|
||||
callback_startswith=f"settings_section_{section_name}_page_",
|
||||
size=cls.ITEMS_PER_PAGE,
|
||||
dp=router,
|
||||
)
|
||||
|
||||
return paginator(current_page=page)
|
||||
|
||||
@classmethod
|
||||
def get_select_menu(
|
||||
cls,
|
||||
section_name: str,
|
||||
field_name: str,
|
||||
settings: UserSettings,
|
||||
router: Optional[Router] = None,
|
||||
) -> InlineKeyboardMarkup:
|
||||
section = getattr(settings, section_name)
|
||||
|
||||
if not section:
|
||||
return InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT, callback_data="settings:main"
|
||||
)
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
info = section.get_info(field_name)
|
||||
if not info or not info.options:
|
||||
return InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT,
|
||||
callback_data=f"settings:section:{section_name}",
|
||||
)
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
keyboard = []
|
||||
for option_key, option_text in info.options.items():
|
||||
prefix = "✅ " if option_key == getattr(section, field_name) else ""
|
||||
keyboard.append(
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=f"{prefix}{option_text}",
|
||||
callback_data=f"settings:set:{section_name}:{field_name}:{option_key}",
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
paginator = Paginator(
|
||||
data=InlineKeyboardMarkup(inline_keyboard=keyboard),
|
||||
after_data=[
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT,
|
||||
callback_data=f"settings:section:{section_name}",
|
||||
)
|
||||
]
|
||||
],
|
||||
callback_startswith=f"settings_select_{section_name}_{field_name}_page_",
|
||||
size=cls.ITEMS_PER_PAGE,
|
||||
dp=router,
|
||||
)
|
||||
|
||||
return paginator()
|
||||
|
||||
@classmethod
|
||||
def get_list_menu(
|
||||
cls,
|
||||
section_name: str,
|
||||
field_name: str,
|
||||
settings: UserSettings,
|
||||
router: Optional[Router] = None,
|
||||
) -> InlineKeyboardMarkup:
|
||||
section = getattr(settings, section_name)
|
||||
if not section:
|
||||
return InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT, callback_data="settings:main"
|
||||
)
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
info = section.get_info(field_name)
|
||||
if not info:
|
||||
return InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT,
|
||||
callback_data=f"settings:section:{section_name}",
|
||||
)
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
current_list = getattr(section, field_name)
|
||||
|
||||
keyboard = []
|
||||
for i, item in enumerate(current_list):
|
||||
item_text = info.options.get(item, item) if info.options else item
|
||||
|
||||
row = [
|
||||
InlineKeyboardButton(text=f"{i + 1}. {item_text}", callback_data="_")
|
||||
]
|
||||
|
||||
move_buttons = []
|
||||
if i > 0:
|
||||
move_buttons.append(
|
||||
InlineKeyboardButton(
|
||||
text="⬆️",
|
||||
callback_data=f"settings:move:{section_name}:{field_name}:{i}:up",
|
||||
)
|
||||
)
|
||||
if i < len(current_list) - 1:
|
||||
move_buttons.append(
|
||||
InlineKeyboardButton(
|
||||
text="⬇️",
|
||||
callback_data=f"settings:move:{section_name}:{field_name}:{i}:down",
|
||||
)
|
||||
)
|
||||
|
||||
keyboard.append(row + move_buttons)
|
||||
|
||||
after_data = [
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=cls.BACK_BUTTON_TEXT,
|
||||
callback_data=f"settings:section:{section_name}",
|
||||
)
|
||||
]
|
||||
]
|
||||
|
||||
paginator = Paginator(
|
||||
data=InlineKeyboardMarkup(inline_keyboard=keyboard),
|
||||
after_data=after_data,
|
||||
callback_startswith=f"settings_list_{section_name}_{field_name}_page_",
|
||||
size=cls.ITEMS_PER_PAGE,
|
||||
dp=router,
|
||||
)
|
||||
|
||||
return paginator()
|
||||
|
||||
@classmethod
|
||||
def get_input_prompt(
|
||||
cls, section_name: str, field_name: str, settings: UserSettings
|
||||
) -> str:
|
||||
section = getattr(settings, section_name)
|
||||
if not section:
|
||||
return "Enter your value:"
|
||||
|
||||
info = section.get_info(field_name)
|
||||
if not info:
|
||||
return "Enter your value:"
|
||||
|
||||
return f"<b>{info.title}</b>\n{info.description}\n\nPlease enter your value:"
|
||||
1
src/bot/keyboards/utils/__init__.py
Normal file
1
src/bot/keyboards/utils/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .paginator import Paginator
|
||||
124
src/bot/keyboards/utils/paginator.py
Normal file
124
src/bot/keyboards/utils/paginator.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from itertools import islice
|
||||
from typing import Any, Iterable, Iterator
|
||||
|
||||
from aiogram import Dispatcher, F, Router, types
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
||||
|
||||
|
||||
class Paginator:
|
||||
def __init__(
|
||||
self,
|
||||
data: (
|
||||
types.InlineKeyboardMarkup
|
||||
| Iterable[types.InlineKeyboardButton]
|
||||
| Iterable[Iterable[types.InlineKeyboardButton]]
|
||||
| InlineKeyboardBuilder
|
||||
),
|
||||
before_data: list[list[types.InlineKeyboardButton]] = None,
|
||||
after_data: list[list[types.InlineKeyboardButton]] = None,
|
||||
state: FSMContext = None,
|
||||
callback_startswith: str = "page_",
|
||||
size: int = 8,
|
||||
page_separator: str = "/",
|
||||
dp: Dispatcher | Router | None = None,
|
||||
):
|
||||
self.dp = dp
|
||||
self.page_separator = page_separator
|
||||
self._state = state
|
||||
self._size = size
|
||||
self._startswith = callback_startswith
|
||||
self._before_data = before_data or []
|
||||
self._after_data = after_data or []
|
||||
if isinstance(data, types.InlineKeyboardMarkup):
|
||||
self._list_kb = list(self._chunk(it=data.inline_keyboard, size=self._size))
|
||||
elif isinstance(data, Iterable):
|
||||
self._list_kb = list(self._chunk(it=data, size=self._size))
|
||||
elif isinstance(data, InlineKeyboardBuilder):
|
||||
self._list_kb = list(self._chunk(it=data.export(), size=self._size))
|
||||
else:
|
||||
raise ValueError(f"{data} is not valid data")
|
||||
|
||||
def __call__(self, current_page=0, *args, **kwargs) -> types.InlineKeyboardMarkup:
|
||||
_list_current_page = self._list_kb[current_page]
|
||||
|
||||
paginations = self._get_paginator(
|
||||
counts=len(self._list_kb),
|
||||
page=current_page,
|
||||
page_separator=self.page_separator,
|
||||
startswith=self._startswith,
|
||||
)
|
||||
keyboard = types.InlineKeyboardMarkup(
|
||||
inline_keyboard=self._before_data
|
||||
+ [*_list_current_page, paginations]
|
||||
+ self._after_data
|
||||
)
|
||||
|
||||
if self.dp:
|
||||
self.paginator_handler()
|
||||
|
||||
return keyboard
|
||||
|
||||
@staticmethod
|
||||
def _get_page(call: types.CallbackQuery) -> int:
|
||||
return int(call.data[-1])
|
||||
|
||||
@staticmethod
|
||||
def _chunk(it, size) -> Iterator[tuple[Any, ...]]:
|
||||
it = iter(it)
|
||||
return iter(lambda: tuple(islice(it, size)), ())
|
||||
|
||||
@staticmethod
|
||||
def _get_paginator(
|
||||
counts: int, page: int, page_separator: str = "/", startswith: str = "page_"
|
||||
) -> list[types.InlineKeyboardButton]:
|
||||
if counts < 2:
|
||||
return []
|
||||
|
||||
counts -= 1
|
||||
|
||||
paginations = []
|
||||
|
||||
if page > 0:
|
||||
paginations.append(
|
||||
types.InlineKeyboardButton(text="⏮️️", callback_data=f"{startswith}0")
|
||||
)
|
||||
paginations.append(
|
||||
types.InlineKeyboardButton(
|
||||
text="⬅️", callback_data=f"{startswith}{page - 1}"
|
||||
),
|
||||
)
|
||||
paginations.append(
|
||||
types.InlineKeyboardButton(
|
||||
text=f"{page + 1}{page_separator}{counts + 1}", callback_data="pass"
|
||||
),
|
||||
)
|
||||
if counts > page:
|
||||
paginations.append(
|
||||
types.InlineKeyboardButton(
|
||||
text="➡️", callback_data=f"{startswith}{page + 1}"
|
||||
)
|
||||
)
|
||||
paginations.append(
|
||||
types.InlineKeyboardButton(
|
||||
text="⏭️", callback_data=f"{startswith}{counts}"
|
||||
)
|
||||
)
|
||||
return paginations
|
||||
|
||||
def paginator_handler(self):
|
||||
async def _page(call: types.CallbackQuery, state: FSMContext):
|
||||
page = self._get_page(call)
|
||||
|
||||
await call.message.edit_reply_markup(
|
||||
reply_markup=self.__call__(current_page=page)
|
||||
)
|
||||
await state.update_data({f"last_page_{self._startswith}": page})
|
||||
|
||||
if not self.dp:
|
||||
return _page, F.data.startswith(self._startswith)
|
||||
else:
|
||||
self.dp.callback_query.register(
|
||||
_page,
|
||||
F.data.startswith(self._startswith),
|
||||
)
|
||||
@@ -0,0 +1 @@
|
||||
pass
|
||||
|
||||
1
src/bot/modules/settings/__init__.py
Normal file
1
src/bot/modules/settings/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .settings import UserSettings
|
||||
40
src/bot/modules/settings/model.py
Normal file
40
src/bot/modules/settings/model.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from typing import ClassVar, Dict, Optional, Unpack
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
class SettingInfo:
|
||||
def __init__(
|
||||
self,
|
||||
title: str,
|
||||
description: str,
|
||||
options: Optional[Dict[str, str]] = None,
|
||||
order: int = 100,
|
||||
):
|
||||
self.title = title
|
||||
self.description = description
|
||||
self.options = options
|
||||
self.order = order
|
||||
|
||||
|
||||
class BaseSettings(BaseModel):
|
||||
_settings_info: ClassVar[Dict[str, SettingInfo]] = {}
|
||||
|
||||
def __init_subclass__(cls, **kwargs: Unpack[ConfigDict]):
|
||||
super().__init_subclass__(**kwargs)
|
||||
cls._settings_info = {}
|
||||
|
||||
for key, value in cls.__dict__.items():
|
||||
if isinstance(value, SettingInfo):
|
||||
field_name = key.replace("_info", "")
|
||||
cls._settings_info[field_name] = value
|
||||
|
||||
def get_info(self, field_name: str) -> Optional[SettingInfo]:
|
||||
return self.__class__._settings_info.get(field_name)
|
||||
|
||||
def get_all_info(self) -> Dict[str, SettingInfo]:
|
||||
return self.__class__._settings_info
|
||||
|
||||
class Config:
|
||||
populate_by_name = True
|
||||
arbitrary_types_allowed = True
|
||||
70
src/bot/modules/settings/settings.py
Normal file
70
src/bot/modules/settings/settings.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from typing import ClassVar, List
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from bot.modules.settings.model import BaseSettings, SettingInfo
|
||||
|
||||
|
||||
class SearchSettings(BaseSettings):
|
||||
default_provider: str = Field(default="y")
|
||||
default_provider_info: ClassVar[SettingInfo] = SettingInfo(
|
||||
title="Default search provider",
|
||||
description="Which service to use when searching without a service filter",
|
||||
options={"y": "YouTube", "d": "Deezer", "c": "SoundCloud", "s": "Spotify"},
|
||||
order=10,
|
||||
)
|
||||
|
||||
show_preview: bool = Field(default=True)
|
||||
show_preview_info: ClassVar[SettingInfo] = SettingInfo(
|
||||
title="Search preview",
|
||||
description="Show audio preview in search results when available",
|
||||
order=20,
|
||||
)
|
||||
|
||||
|
||||
class DownloadSettings(BaseSettings):
|
||||
recode_youtube: bool = Field(default=True)
|
||||
recode_youtube_info: ClassVar[SettingInfo] = SettingInfo(
|
||||
title="Recode YouTube",
|
||||
description="Recode when downloading from YouTube to more compatible format (may take some time)",
|
||||
order=10,
|
||||
)
|
||||
|
||||
exact_spotify_match: bool = Field(default=True)
|
||||
exact_spotify_match_info: ClassVar[SettingInfo] = SettingInfo(
|
||||
title="Exact Spotify matches",
|
||||
description="When searching on YouTube from Spotify, show only exact matches",
|
||||
order=20,
|
||||
)
|
||||
|
||||
|
||||
class ProviderSettings(BaseSettings):
|
||||
order: List[str] = Field(default=["youtube", "deezer", "soundcloud", "spotify"])
|
||||
order_info: ClassVar[SettingInfo] = SettingInfo(
|
||||
title="Provider order",
|
||||
description="Order of providers to try when downloading music",
|
||||
options={
|
||||
"youtube": "YouTube",
|
||||
"deezer": "Deezer",
|
||||
"soundcloud": "SoundCloud",
|
||||
"spotify": "Spotify",
|
||||
},
|
||||
order=10,
|
||||
)
|
||||
|
||||
|
||||
class AppearanceSettings(BaseSettings):
|
||||
theme: str = Field(default="default")
|
||||
theme_info: ClassVar[SettingInfo] = SettingInfo(
|
||||
title="Theme",
|
||||
description="Visual appearance of the bot",
|
||||
options={"default": "Default", "compact": "Compact", "emoji": "Emoji Rich"},
|
||||
order=10,
|
||||
)
|
||||
|
||||
|
||||
class UserSettings(BaseSettings):
|
||||
search: SearchSettings = Field(default_factory=SearchSettings)
|
||||
download: DownloadSettings = Field(default_factory=DownloadSettings)
|
||||
providers: ProviderSettings = Field(default_factory=ProviderSettings)
|
||||
appearance: AppearanceSettings = Field(default_factory=AppearanceSettings)
|
||||
@@ -0,0 +1 @@
|
||||
from .models import UserSettingsDocument
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
|
||||
from utils import env
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .settings import UserSettingsDocument
|
||||
|
||||
53
src/utils/db/models/settings.py
Normal file
53
src/utils/db/models/settings.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from typing import Any
|
||||
|
||||
from beanie import Document
|
||||
from pydantic import Field
|
||||
|
||||
from bot.modules.settings import UserSettings
|
||||
|
||||
|
||||
class UserSettingsDocument(Document):
|
||||
user_id: int
|
||||
settings: UserSettings = Field(default_factory=UserSettings)
|
||||
|
||||
class Settings:
|
||||
name = "user_settings"
|
||||
indexes = ["user_id"]
|
||||
|
||||
@classmethod
|
||||
async def get_user_settings(cls, user_id: int) -> UserSettings:
|
||||
doc = await cls.find_one(cls.user_id == user_id)
|
||||
if not doc:
|
||||
doc = cls(user_id=user_id)
|
||||
await doc.insert()
|
||||
return doc.settings
|
||||
|
||||
async def update_settings(self, settings: UserSettings) -> None:
|
||||
self.settings = settings
|
||||
await self.save()
|
||||
|
||||
@classmethod
|
||||
async def update_section(cls, user_id: int, section: str, value: Any) -> None:
|
||||
doc = await cls.find_one(cls.user_id == user_id)
|
||||
if not doc:
|
||||
doc = cls(user_id=user_id)
|
||||
setattr(doc.settings, section, value)
|
||||
await doc.insert()
|
||||
else:
|
||||
setattr(doc.settings, section, value)
|
||||
await doc.save()
|
||||
|
||||
@classmethod
|
||||
async def update_field(
|
||||
cls, user_id: int, section: str, field: str, value: Any
|
||||
) -> None:
|
||||
doc = await cls.find_one(cls.user_id == user_id)
|
||||
if not doc:
|
||||
doc = cls(user_id=user_id)
|
||||
section_obj = getattr(doc.settings, section)
|
||||
setattr(section_obj, field, value)
|
||||
await doc.insert()
|
||||
else:
|
||||
section_obj = getattr(doc.settings, section)
|
||||
setattr(section_obj, field, value)
|
||||
await doc.save()
|
||||
@@ -1,5 +1,5 @@
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
from pydantic import SecretStr
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class BotSettings(BaseSettings):
|
||||
|
||||
@@ -5,7 +5,6 @@ from rich.logging import RichHandler
|
||||
|
||||
from . import env
|
||||
|
||||
|
||||
console = Console(
|
||||
width=env.log.console_width,
|
||||
color_system="auto",
|
||||
|
||||
Reference in New Issue
Block a user