This commit is contained in:
hhh
2025-01-02 21:48:12 +02:00
129 changed files with 852 additions and 6982 deletions

View File

@@ -1,4 +1,4 @@
from bot import main
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@@ -1,11 +1,11 @@
from rich import print
import contextlib
from rich import print
async def runner():
from .common import dp, bot
from . import handlers, callbacks
from . import callbacks, handlers
from .common import bot, dp
from .modules.error import on_error
dp.error.register(on_error)
@@ -20,8 +20,8 @@ async def runner():
def plugins():
import nest_asyncio
from rich import traceback
from icecream import ic
from rich import traceback
nest_asyncio.apply()
traceback.install()
@@ -33,8 +33,8 @@ def main():
plugins()
print('Starting...')
print("Starting...")
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(runner())
print('[red]Stopped.[/]')
print("[red]Stopped.[/]")

View File

@@ -1,4 +1,4 @@
from . import main
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@@ -1,11 +1,8 @@
from aiogram import Router
from . import (
full_menu,
on_home,
settings,
)
from bot.middlewares import PrivateButtonMiddleware, SettingsInjectorMiddleware
from . import full_menu, on_home, settings
router = Router()

View File

@@ -1,19 +1,16 @@
from aiogram import Router, F, Bot
from aiogram.types import (
CallbackQuery
)
from aiogram import Bot, F, Router
from aiogram.types import CallbackQuery
from bot.factories.full_menu import FullMenuCallback
from bot.keyboards.inline.settings import get_settings_kb
router = Router()
@router.callback_query(FullMenuCallback.filter(F.action == 'settings'))
@router.callback_query(FullMenuCallback.filter(F.action == "settings"))
async def on_settings(callback_query: CallbackQuery, bot: Bot):
await bot.edit_message_text(
inline_message_id=callback_query.inline_message_id,
text='⚙️ Settings',
reply_markup=get_settings_kb()
text="⚙️ Settings",
reply_markup=get_settings_kb(),
)

View File

@@ -1,19 +1,16 @@
from aiogram import Router, F, Bot
from aiogram.types import (
CallbackQuery
)
from aiogram import Bot, F, Router
from aiogram.types import CallbackQuery
from bot.factories.full_menu import FullMenuCallback
from bot.keyboards.inline.full_menu import get_full_menu_kb
router = Router()
@router.callback_query(FullMenuCallback.filter(F.action == 'home'))
@router.callback_query(FullMenuCallback.filter(F.action == "home"))
async def on_home(callback_query: CallbackQuery, bot: Bot):
await bot.edit_message_text(
inline_message_id=callback_query.inline_message_id,
text='⚙️ Menu',
reply_markup=get_full_menu_kb()
text="⚙️ Menu",
reply_markup=get_full_menu_kb(),
)

View File

@@ -1,38 +1,30 @@
from aiogram import Router, Bot
from aiogram.types import (
CallbackQuery
)
from aiogram import Bot, Router
from aiogram.exceptions import TelegramBadRequest
from aiogram.types import CallbackQuery
from bot.factories.open_setting import OpenSettingCallback, SettingChoiceCallback
from bot.keyboards.inline.setting import get_setting_kb
from bot.modules.settings import settings_strings, UserSettings
from bot.modules.settings import UserSettings, settings_strings
router = Router()
@router.callback_query(OpenSettingCallback.filter())
async def on_settings(
callback_query: CallbackQuery,
callback_data: OpenSettingCallback,
bot: Bot
callback_query: CallbackQuery, callback_data: OpenSettingCallback, bot: Bot
):
await bot.edit_message_text(
inline_message_id=callback_query.inline_message_id,
text=settings_strings[callback_data.s_id].description,
reply_markup=get_setting_kb(
callback_data.s_id,
str(callback_query.from_user.id)
)
callback_data.s_id, str(callback_query.from_user.id)
),
)
@router.callback_query(SettingChoiceCallback.filter())
async def on_change_setting(
callback_query: CallbackQuery,
callback_data: SettingChoiceCallback,
bot: Bot
callback_query: CallbackQuery, callback_data: SettingChoiceCallback, bot: Bot
):
UserSettings(callback_query.from_user.id)[callback_data.s_id] = callback_data.choice
try:
@@ -40,9 +32,8 @@ async def on_change_setting(
inline_message_id=callback_query.inline_message_id,
text=settings_strings[callback_data.s_id].description,
reply_markup=get_setting_kb(
callback_data.s_id,
str(callback_query.from_user.id)
)
callback_data.s_id, str(callback_query.from_user.id)
),
)
except TelegramBadRequest:
pass

View File

@@ -1,6 +1,8 @@
from aiogram import Bot, Dispatcher
from bot.modules.fsm import InDbStorage
from rich.console import Console
from bot.modules.fsm import InDbStorage
from .utils.config import config
bot = Bot(token=config.telegram.bot_token)
@@ -8,4 +10,4 @@ dp = Dispatcher(storage=InDbStorage())
console = Console()
__all__ = ['bot', 'dp', 'config', 'console']
__all__ = ["bot", "dp", "config", "console"]

View File

@@ -1,6 +1,7 @@
from typing import Literal
from aiogram.filters.callback_data import CallbackData
class FullMenuCallback(CallbackData, prefix='full_menu'):
action: Literal['home', 'settings']
class FullMenuCallback(CallbackData, prefix="full_menu"):
action: Literal["home", "settings"]

View File

@@ -1,10 +1,10 @@
from aiogram.filters.callback_data import CallbackData
class OpenSettingCallback(CallbackData, prefix='setting'):
class OpenSettingCallback(CallbackData, prefix="setting"):
s_id: str
class SettingChoiceCallback(CallbackData, prefix='s_choice'):
class SettingChoiceCallback(CallbackData, prefix="s_choice"):
s_id: str
choice: str

View File

@@ -4,22 +4,21 @@ from aiogram.types import InlineQuery
class ServiceSearchFilter(BaseFilter):
def __init__(self, service_letter: str):
self.service_letter = f'{service_letter}:'
self.service_letter = f"{service_letter}:"
async def __call__(self, inline_query: InlineQuery):
return (
inline_query.query.startswith(self.service_letter) and
inline_query.query != self.service_letter
inline_query.query.startswith(self.service_letter)
and inline_query.query != self.service_letter
)
class ServiceSearchMultiletterFilter(BaseFilter):
def __init__(self, service_lettes: list[str]):
self.service_letter = [f'{letter}:' for letter in service_lettes]
self.service_letter = [f"{letter}:" for letter in service_lettes]
async def __call__(self, inline_query: InlineQuery):
return (
any(inline_query.query.startswith(letter) for letter in
self.service_letter) and
inline_query.query not in self.service_letter
any(inline_query.query.startswith(letter) for letter in self.service_letter)
and inline_query.query not in self.service_letter
)

View File

@@ -1,32 +1,29 @@
from urllib.parse import urlparse
from aiogram.filters import BaseFilter
from aiogram.types import InlineQuery
from urllib.parse import urlparse
class MusicUrlFilter(BaseFilter):
def __init__(self):
pass
async def __call__(self, inline_query: InlineQuery):
if not inline_query.query.strip().startswith('http'):
if not inline_query.query.strip().startswith("http"):
return False
url = urlparse(inline_query.query)
return (
url.scheme in ['http', 'https'] and
any(
map(
url.netloc.endswith,
[
'youtube.com',
'youtu.be',
'open.spotify.com',
'spotify.link',
'deezer.page.link',
'deezer.com',
'soundcloud.com'
]
)
)
return url.scheme in ["http", "https"] and any(
map(
url.netloc.endswith,
[
"youtube.com",
"youtu.be",
"open.spotify.com",
"spotify.link",
"deezer.page.link",
"deezer.com",
"soundcloud.com",
],
)
)

View File

@@ -1,16 +1,17 @@
from aiogram import Router
from . import (
initialize,
inline_song,
inline_url,
inline_error,
inline_default,
inline_empty,
on_chosen,
)
from bot.middlewares import SaveChosenMiddleware, SettingsInjectorMiddleware
from . import (
initialize,
inline_default,
inline_empty,
inline_error,
inline_song,
inline_url,
on_chosen,
)
router = Router()
router.chosen_inline_result.outer_middleware(SaveChosenMiddleware())

View File

@@ -1,10 +1,9 @@
from aiogram import Router, Bot
from aiogram import Bot, Router
from rich import print
router = Router()
@router.startup()
async def startup(bot: Bot):
print(f'[green]Started as[/] @{(await bot.me()).username}')
print(f"[green]Started as[/] @{(await bot.me()).username}")

View File

@@ -1,26 +1,24 @@
from aiogram import Router, F
from aiogram import F, Router
from aiogram.types import InlineQuery
from bot.modules.settings import UserSettings
from bot.results.deezer import get_deezer_search_results
from bot.results.soundcloud import get_soundcloud_search_results
from bot.results.youtube import get_youtube_search_results
from bot.results.spotify import get_spotify_search_results
from bot.modules.settings import UserSettings
from bot.results.youtube import get_youtube_search_results
router = Router()
@router.inline_query(F.query != '')
@router.inline_query(F.query != "")
async def default_inline_query(inline_query: InlineQuery, settings: UserSettings):
await inline_query.answer(
await {
'd': get_deezer_search_results,
'c': get_soundcloud_search_results,
'y': get_youtube_search_results,
's': get_spotify_search_results
}[settings['default_search_provider'].value](inline_query.query, settings),
"d": get_deezer_search_results,
"c": get_soundcloud_search_results,
"y": get_youtube_search_results,
"s": get_spotify_search_results,
}[settings["default_search_provider"].value](inline_query.query, settings),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,24 +1,21 @@
from aiogram import Router, F
from aiogram.types import (
InlineQuery, InputTextMessageContent, InlineQueryResultArticle
)
from aiogram import F, Router
from aiogram.types import InlineQuery, InlineQueryResultArticle, InputTextMessageContent
from bot.keyboards.inline.full_menu import get_full_menu_kb
router = Router()
@router.inline_query(F.query == '')
@router.inline_query(F.query == "")
async def empty_inline_query(inline_query: InlineQuery):
await inline_query.answer(
[
InlineQueryResultArticle(
id='show_menu',
title='⚙️ Open menu',
input_message_content=InputTextMessageContent(
message_text='⚙️ Menu'
),
reply_markup=get_full_menu_kb()
id="show_menu",
title="⚙️ Open menu",
input_message_content=InputTextMessageContent(message_text="⚙️ Menu"),
reply_markup=get_full_menu_kb(),
)
], cache_time=0
],
cache_time=0,
)

View File

@@ -1,17 +1,16 @@
from aiogram import Router
from aiogram.types import InlineQuery
from bot.results.error import get_error_search_results
from bot.filters import ServiceSearchFilter
from bot.results.error import get_error_search_results
router = Router()
@router.inline_query(ServiceSearchFilter('error'))
@router.inline_query(ServiceSearchFilter("error"))
async def search_spotify_inline_query(inline_query: InlineQuery):
await inline_query.answer(
await get_error_search_results(inline_query.query.removeprefix('error:')),
await get_error_search_results(inline_query.query.removeprefix("error:")),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,12 +1,16 @@
from aiogram import Router
from . import (on_inline_spotify, on_inline_deezer, on_inline_youtube,
on_inline_soundcloud)
from . import (
on_inline_deezer,
on_inline_soundcloud,
on_inline_spotify,
on_inline_youtube,
)
router = Router()
router.include_routers(
on_inline_spotify.router,
on_inline_deezer.router,
on_inline_youtube.router,
on_inline_soundcloud.router
on_inline_soundcloud.router,
)

View File

@@ -1,21 +1,19 @@
from aiogram import Router
from aiogram.types import InlineQuery
from bot.results.deezer import get_deezer_search_results
from bot.filters import ServiceSearchFilter
from bot.modules.settings import UserSettings
from bot.results.deezer import get_deezer_search_results
router = Router()
@router.inline_query(ServiceSearchFilter('d'))
@router.inline_query(ServiceSearchFilter("d"))
async def search_deezer_inline_query(inline_query: InlineQuery, settings: UserSettings):
await inline_query.answer(
await get_deezer_search_results(
inline_query.query.removeprefix('d:'),
settings
inline_query.query.removeprefix("d:"), settings
),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,24 +1,21 @@
from aiogram import Router
from aiogram.types import InlineQuery
from bot.results.soundcloud import get_soundcloud_search_results
from bot.filters import ServiceSearchMultiletterFilter
from bot.modules.settings import UserSettings
from bot.results.soundcloud import get_soundcloud_search_results
router = Router()
@router.inline_query(ServiceSearchMultiletterFilter(['c', 'с']))
@router.inline_query(ServiceSearchMultiletterFilter(["c", "с"]))
async def search_soundcloud_inline_query(
inline_query: InlineQuery,
settings: UserSettings
inline_query: InlineQuery, settings: UserSettings
):
await inline_query.answer(
await get_soundcloud_search_results(
inline_query.query.removeprefix('c:').removesuffix('с:'),
settings
inline_query.query.removeprefix("c:").removesuffix("с:"), settings
),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,22 +1,21 @@
from aiogram import Router
from aiogram.types import InlineQuery
from bot.results.spotify import get_spotify_search_results
from bot.filters import ServiceSearchFilter
from bot.modules.settings import UserSettings
from bot.results.spotify import get_spotify_search_results
router = Router()
@router.inline_query(ServiceSearchFilter('s'))
@router.inline_query(ServiceSearchFilter("s"))
async def search_spotify_inline_query(
inline_query: InlineQuery,
settings: UserSettings
inline_query: InlineQuery, settings: UserSettings
):
await inline_query.answer(
await get_spotify_search_results(inline_query.query.removeprefix('s:'),
settings),
await get_spotify_search_results(
inline_query.query.removeprefix("s:"), settings
),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,20 +1,21 @@
from aiogram import Router
from aiogram.types import InlineQuery
from bot.results.youtube import get_youtube_search_results
from bot.filters import ServiceSearchFilter
from bot.modules.settings import UserSettings
from bot.results.youtube import get_youtube_search_results
router = Router()
@router.inline_query(ServiceSearchFilter('y'))
async def search_youtube_inline_query(inline_query: InlineQuery,
settings: UserSettings):
@router.inline_query(ServiceSearchFilter("y"))
async def search_youtube_inline_query(
inline_query: InlineQuery, settings: UserSettings
):
await inline_query.answer(
await get_youtube_search_results(inline_query.query.removeprefix('y:'),
settings),
await get_youtube_search_results(
inline_query.query.removeprefix("y:"), settings
),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,10 +1,9 @@
from aiogram import Router
from aiogram.types import InlineQuery
from bot.results.url import get_url_results
from bot.filters import MusicUrlFilter
from bot.modules.settings import UserSettings
from bot.results.url import get_url_results
router = Router()
@@ -14,5 +13,5 @@ async def url_deezer_inline_query(inline_query: InlineQuery, settings: UserSetti
await inline_query.answer(
await get_url_results(inline_query.query, settings),
cache_time=0,
is_personal=True
is_personal=True,
)

View File

@@ -1,5 +1,6 @@
from aiogram import Router
from . import spotify, deezer, youtube, soundcloud, recode_cached, suppress_verify
from . import deezer, recode_cached, soundcloud, spotify, suppress_verify, youtube
router = Router()
@@ -12,4 +13,4 @@ router.include_routers(
suppress_verify.router,
)
__all__ = ['router']
__all__ = ["router"]

View File

@@ -1,21 +1,23 @@
from aiogram import Router, Bot, F
from aiogram import Bot, F, Router
from aiogram.types import (
BufferedInputFile, URLInputFile, InputMediaAudio,
BufferedInputFile,
ChosenInlineResult,
InputMediaAudio,
URLInputFile,
)
from bot.modules.deezer import deezer, DeezerBytestream
from bot.utils.config import config
from bot.modules.database import db
from bot.modules.deezer import DeezerBytestream, deezer
from bot.utils.config import config
router = Router()
@router.chosen_inline_result(F.result_id.startswith('deez::'))
@router.chosen_inline_result(F.result_id.startswith("deez::"))
async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
bytestream: DeezerBytestream = await (await deezer.downloader.from_id(
chosen_result.result_id.removeprefix('deez::')
)).to_bytestream()
bytestream: DeezerBytestream = await (
await deezer.downloader.from_id(chosen_result.result_id.removeprefix("deez::"))
).to_bytestream()
audio = await bot.send_audio(
chat_id=config.telegram.files_chat,
@@ -34,5 +36,5 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
reply_markup=None
reply_markup=None,
)

View File

@@ -1,57 +1,48 @@
from aiogram import Router, Bot, F
from aiogram.types import (
BufferedInputFile, InputMediaAudio,
ChosenInlineResult,
)
from io import BytesIO
from bot.modules.youtube.downloader import YouTubeBytestream
from aiogram import Bot, F, Router
from aiogram.types import BufferedInputFile, ChosenInlineResult, InputMediaAudio
from bot.utils.config import config
from bot.modules.database import db
from bot.modules.settings import UserSettings
from io import BytesIO
from bot.modules.youtube.downloader import YouTubeBytestream
from bot.utils.config import config
router = Router()
@router.chosen_inline_result(
F.result_id.startswith('spotc::') | F.result_id.startswith('ytc::')
F.result_id.startswith("spotc::") | F.result_id.startswith("ytc::")
)
async def on_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot,
settings: UserSettings):
if settings['recode_youtube'].value != 'yes':
async def on_cached_chosen(
chosen_result: ChosenInlineResult, bot: Bot, settings: UserSettings
):
if settings["recode_youtube"].value != "yes":
await bot.edit_message_reply_markup(
inline_message_id=chosen_result.inline_message_id,
reply_markup=None
inline_message_id=chosen_result.inline_message_id, reply_markup=None
)
return
if (
type(
db.recoded.get(
song_id := chosen_result.result_id
.removeprefix('spotc::')
.removeprefix('ytc::')
if type(
db.recoded.get(
song_id := chosen_result.result_id.removeprefix("spotc::").removeprefix(
"ytc::"
)
) in [bool, type(None)]
):
)
) in [bool, type(None)]:
await bot.edit_message_reply_markup(
inline_message_id=chosen_result.inline_message_id,
reply_markup=None
inline_message_id=chosen_result.inline_message_id, reply_markup=None
)
return
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🔄 Recoding...',
reply_markup=None
caption="🔄 Recoding...",
reply_markup=None,
)
message = await bot.forward_message(
config.telegram.files_chat,
config.telegram.files_chat,
db.recoded[song_id]
config.telegram.files_chat, config.telegram.files_chat, db.recoded[song_id]
)
song_io: BytesIO = await bot.download( # type: ignore
@@ -76,7 +67,7 @@ async def on_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot,
),
thumbnail=BufferedInputFile(
file=(await bot.download(message.audio.thumbnail.file_id)).read(),
filename='thumbnail.jpg'
filename="thumbnail.jpg",
),
performer=message.audio.performer,
title=message.audio.title,
@@ -85,15 +76,15 @@ async def on_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot,
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='',
caption="",
reply_markup=None,
)
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id)
media=InputMediaAudio(media=audio.audio.file_id),
)
if chosen_result.result_id.startswith('spotc::'):
if chosen_result.result_id.startswith("spotc::"):
db.spotify[song_id] = audio.audio.file_id
else:
db.youtube[song_id] = audio.audio.file_id

View File

@@ -1,21 +1,25 @@
from aiogram import Router, Bot, F
from aiogram import Bot, F, Router
from aiogram.types import (
BufferedInputFile, URLInputFile, InputMediaAudio,
BufferedInputFile,
ChosenInlineResult,
InputMediaAudio,
URLInputFile,
)
from bot.modules.soundcloud import soundcloud, SoundCloudBytestream
from bot.utils.config import config
from bot.modules.database import db
from bot.modules.soundcloud import SoundCloudBytestream, soundcloud
from bot.utils.config import config
router = Router()
@router.chosen_inline_result(F.result_id.startswith('sc::'))
@router.chosen_inline_result(F.result_id.startswith("sc::"))
async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
bytestream: SoundCloudBytestream = await (await soundcloud.downloader.from_id(
chosen_result.result_id.removeprefix('sc::')
)).to_bytestream()
bytestream: SoundCloudBytestream = await (
await soundcloud.downloader.from_id(
chosen_result.result_id.removeprefix("sc::")
)
).to_bytestream()
audio = await bot.send_audio(
chat_id=config.telegram.files_chat,
@@ -33,5 +37,5 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
reply_markup=None
reply_markup=None,
)

View File

@@ -1,31 +1,34 @@
from aiogram import Router, Bot, F
from aiogram import Bot, F, Router
from aiogram.types import (
BufferedInputFile, URLInputFile, InputMediaAudio,
BufferedInputFile,
ChosenInlineResult,
InputMediaAudio,
URLInputFile,
)
from bot.modules.spotify import spotify
from bot.modules.youtube import youtube, AgeRestrictedError
from bot.modules.youtube.song import SongItem
from bot.modules.deezer import deezer
from bot.utils.config import config
from bot.modules.database import db
from bot.modules.deezer import deezer
from bot.modules.settings import UserSettings
from bot.modules.spotify import spotify
from bot.modules.youtube import AgeRestrictedError, youtube
from bot.modules.youtube.song import SongItem
from bot.utils.config import config
router = Router()
def not_strict_name(song, yt_song):
if 'feat' in yt_song.name.lower():
if "feat" in yt_song.name.lower():
return any(artist.lower() in yt_song.name.lower() for artist in song.artists)
else:
return False
@router.chosen_inline_result(F.result_id.startswith('spot::'))
async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
settings: UserSettings):
song = spotify.songs.from_id(chosen_result.result_id.removeprefix('spot::'))
@router.chosen_inline_result(F.result_id.startswith("spot::"))
async def on_new_chosen(
chosen_result: ChosenInlineResult, bot: Bot, settings: UserSettings
):
song = spotify.songs.from_id(chosen_result.result_id.removeprefix("spot::"))
bytestream = None
audio = None
@@ -34,14 +37,15 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
song.full_name,
exact_match=True,
)
if settings['exact_spotify_search'].value == 'yes':
if ((song.all_artists != yt_song.all_artists or song.name != yt_song.name)
and not not_strict_name(song, yt_song)):
if settings["exact_spotify_search"].value == "yes":
if (
song.all_artists != yt_song.all_artists or song.name != yt_song.name
) and not not_strict_name(song, yt_song):
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🙄 Cannot find this song on YouTube, trying Deezer...',
caption="🙄 Cannot find this song on YouTube, trying Deezer...",
reply_markup=None,
parse_mode='HTML',
parse_mode="HTML",
)
yt_song = None
bytestream = False
@@ -66,9 +70,9 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
except AgeRestrictedError:
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🔞 This song is age restricted, trying Deezer...',
caption="🔞 This song is age restricted, trying Deezer...",
reply_markup=None,
parse_mode='HTML',
parse_mode="HTML",
)
yt_song = None
@@ -99,29 +103,29 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
assert e
if audio:
if settings['exact_spotify_search'].value == 'yes':
if settings["exact_spotify_search"].value == "yes":
db.spotify[song.id] = audio.audio.file_id
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
reply_markup=None
reply_markup=None,
)
else:
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🤷‍♂️ Cannot download this song',
caption="🤷‍♂️ Cannot download this song",
reply_markup=None,
parse_mode='HTML',
parse_mode="HTML",
)
if yt_song and settings['recode_youtube'].value == 'yes':
if yt_song and settings["recode_youtube"].value == "yes":
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🔄 Recoding...',
caption="🔄 Recoding...",
reply_markup=None,
parse_mode='HTML',
parse_mode="HTML",
)
await bytestream.rerender()
@@ -139,20 +143,20 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
db.youtube[yt_song.id] = audio.audio.file_id
db.recoded[yt_song.id] = True
if settings['exact_spotify_search'].value == 'yes':
if settings["exact_spotify_search"].value == "yes":
db.spotify[song.id] = audio.audio.file_id
db.recoded[song.id] = True
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='',
caption="",
reply_markup=None,
)
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id)
media=InputMediaAudio(media=audio.audio.file_id),
)
elif yt_song and settings['recode_youtube'].value == 'no':
elif yt_song and settings["recode_youtube"].value == "no":
db.recoded[yt_song.id] = audio.message_id
if settings['exact_spotify_search'].value == 'yes':
if settings["exact_spotify_search"].value == "yes":
db.recoded[song.id] = audio.message_id

View File

@@ -1,16 +1,13 @@
from aiogram import Router, Bot, F
from aiogram.types import (
ChosenInlineResult,
)
from aiogram import Bot, F, Router
from aiogram.types import ChosenInlineResult
router = Router()
@router.chosen_inline_result(
F.result_id.startswith('deezc::') | F.result_id.startswith('scc::')
F.result_id.startswith("deezc::") | F.result_id.startswith("scc::")
)
async def on_unneeded_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot):
await bot.edit_message_reply_markup(
inline_message_id=chosen_result.inline_message_id,
reply_markup=None
inline_message_id=chosen_result.inline_message_id, reply_markup=None
)

View File

@@ -1,30 +1,33 @@
from aiogram import Router, Bot, F
from aiogram import Bot, F, Router
from aiogram.types import (
BufferedInputFile, URLInputFile, InputMediaAudio,
BufferedInputFile,
ChosenInlineResult,
InputMediaAudio,
URLInputFile,
)
from bot.modules.youtube import youtube, AgeRestrictedError
from bot.utils.config import config
from bot.modules.database import db
from bot.modules.settings import UserSettings
from bot.modules.youtube import AgeRestrictedError, youtube
from bot.utils.config import config
router = Router()
@router.chosen_inline_result(F.result_id.startswith('yt::'))
async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
settings: UserSettings):
song = youtube.songs.from_id(chosen_result.result_id.removeprefix('yt::'))
@router.chosen_inline_result(F.result_id.startswith("yt::"))
async def on_new_chosen(
chosen_result: ChosenInlineResult, bot: Bot, settings: UserSettings
):
song = youtube.songs.from_id(chosen_result.result_id.removeprefix("yt::"))
try:
bytestream = await song.to_bytestream()
except AgeRestrictedError:
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🔞 This song is age restricted, so I can\'t download it. '
'Try downloading it from Deezer or SoundCloud',
reply_markup=None
caption="🔞 This song is age restricted, so I can't download it. "
"Try downloading it from Deezer or SoundCloud",
reply_markup=None,
)
return
@@ -45,14 +48,14 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
reply_markup=None
reply_markup=None,
)
if settings['recode_youtube'].value == 'yes':
if settings["recode_youtube"].value == "yes":
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
caption='🔄 Recoding...',
reply_markup=None
caption="🔄 Recoding...",
reply_markup=None,
)
await bytestream.rerender()
@@ -75,7 +78,7 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
reply_markup=None
reply_markup=None,
)
else:
db.recoded[song.id] = audio.message_id

View File

@@ -1,27 +1,23 @@
from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
InlineKeyboardBuilder)
from bot.factories.full_menu import FullMenuCallback
from aiogram.utils.keyboard import (
InlineKeyboardBuilder,
InlineKeyboardButton,
InlineKeyboardMarkup,
)
from bot.factories.full_menu import FullMenuCallback
from bot.keyboards.inline import search_variants as sv
def get_full_menu_kb() -> InlineKeyboardMarkup:
buttons = (sv.get_search_variants(
query='',
services=
sv.soundcloud |
sv.spotify |
sv.deezer |
sv.youtube
buttons = sv.get_search_variants(
query="", services=sv.soundcloud | sv.spotify | sv.deezer | sv.youtube
) + [
[
InlineKeyboardButton(
text='⚙️ Settings',
callback_data=FullMenuCallback(
action='settings'
).pack()
)
],
])
[
InlineKeyboardButton(
text="⚙️ Settings",
callback_data=FullMenuCallback(action="settings").pack(),
)
],
]
return InlineKeyboardBuilder(buttons).as_markup()

View File

@@ -1,42 +1,33 @@
from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
InlineKeyboardBuilder)
from aiogram.utils.keyboard import (
InlineKeyboardBuilder,
InlineKeyboardButton,
InlineKeyboardMarkup,
)
deezer = {
'd': '🎵 Search in Deezer'
}
soundcloud = {
'c': '☁️ Search in SoundCloud'
}
youtube = {
'y': '▶️ Search in YouTube'
}
spotify = {
's': '🎧 Search in Spotify'
}
deezer = {"d": "🎵 Search in Deezer"}
soundcloud = {"c": "☁️ Search in SoundCloud"}
youtube = {"y": "▶️ Search in YouTube"}
spotify = {"s": "🎧 Search in Spotify"}
def get_search_variants(
query: str,
services: dict[str, str],
query: str,
services: dict[str, str],
) -> list[list[InlineKeyboardButton]]:
buttons = [
[
InlineKeyboardButton(
text=services[key],
switch_inline_query_current_chat=f'{key}:{query}'
text=services[key], switch_inline_query_current_chat=f"{key}:{query}"
)
] for key in services.keys()
]
for key in services.keys()
]
return buttons
def get_search_variants_kb(
query: str,
services: dict[str, str],
query: str,
services: dict[str, str],
) -> InlineKeyboardMarkup:
return InlineKeyboardBuilder(get_search_variants(
query,
services
)).as_markup()
return InlineKeyboardBuilder(get_search_variants(query, services)).as_markup()

View File

@@ -1,8 +1,11 @@
from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
InlineKeyboardBuilder)
from bot.factories.open_setting import SettingChoiceCallback
from bot.factories.full_menu import FullMenuCallback
from aiogram.utils.keyboard import (
InlineKeyboardBuilder,
InlineKeyboardButton,
InlineKeyboardMarkup,
)
from bot.factories.full_menu import FullMenuCallback
from bot.factories.open_setting import SettingChoiceCallback
from bot.modules.settings import UserSettings
@@ -11,22 +14,21 @@ def get_setting_kb(s_id: str, user_id: str) -> InlineKeyboardMarkup:
buttons = [
[
InlineKeyboardButton(
text=(
'' if setting.value == choice else ''
) + setting.choices[choice],
text=("" if setting.value == choice else "")
+ setting.choices[choice],
callback_data=SettingChoiceCallback(
s_id=s_id,
choice=choice,
).pack()
).pack(),
)
] for choice in setting.choices.keys()
] + [[
InlineKeyboardButton(
text='🔙',
callback_data=FullMenuCallback(
action='settings'
).pack()
)
]]
]
for choice in setting.choices.keys()
] + [
[
InlineKeyboardButton(
text="🔙", callback_data=FullMenuCallback(action="settings").pack()
)
]
]
return InlineKeyboardBuilder(buttons).as_markup()

View File

@@ -1,8 +1,11 @@
from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
InlineKeyboardBuilder)
from bot.factories.open_setting import OpenSettingCallback
from bot.factories.full_menu import FullMenuCallback
from aiogram.utils.keyboard import (
InlineKeyboardBuilder,
InlineKeyboardButton,
InlineKeyboardMarkup,
)
from bot.factories.full_menu import FullMenuCallback
from bot.factories.open_setting import OpenSettingCallback
from bot.modules.settings import settings_strings
@@ -13,16 +16,16 @@ def get_settings_kb() -> InlineKeyboardMarkup:
text=settings_strings[setting_id].name,
callback_data=OpenSettingCallback(
s_id=setting_id,
).pack()
).pack(),
)
] for setting_id in settings_strings.keys()
] + [[
InlineKeyboardButton(
text='🔙',
callback_data=FullMenuCallback(
action='home'
).pack()
)
]]
]
for setting_id in settings_strings.keys()
] + [
[
InlineKeyboardButton(
text="🔙", callback_data=FullMenuCallback(action="home").pack()
)
]
]
return InlineKeyboardBuilder(buttons).as_markup()

View File

@@ -1,3 +1,3 @@
from .inject_settings import SettingsInjectorMiddleware
from .private_button import PrivateButtonMiddleware
from .save_chosen import SaveChosenMiddleware
from .inject_settings import SettingsInjectorMiddleware

View File

@@ -1,26 +1,27 @@
from typing import Any, Awaitable, Callable, Dict
from aiogram.dispatcher.middlewares.base import BaseMiddleware
from aiogram.types import TelegramObject
from typing import Any, Awaitable, Callable, Dict
from bot.modules.settings import UserSettings
class SettingsInjectorMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
):
if (not hasattr(event, 'from_user') and
(not hasattr(event, 'inline_query') or event.inline_query is None)):
if not hasattr(event, "from_user") and (
not hasattr(event, "inline_query") or event.inline_query is None
):
return await handler(event, data)
elif hasattr(event, 'inline_query') and event.inline_query is not None:
elif hasattr(event, "inline_query") and event.inline_query is not None:
settings = UserSettings(event.inline_query.from_user.id)
data['settings'] = settings
data["settings"] = settings
else:
settings = UserSettings(event.from_user.id)
data['settings'] = settings
data["settings"] = settings
return await handler(event, data)

View File

@@ -1,19 +1,19 @@
from typing import Any, Awaitable, Callable, Dict
from aiogram.dispatcher.middlewares.base import BaseMiddleware
from aiogram.types import CallbackQuery
from typing import Any, Awaitable, Callable, Dict
from bot.modules.database import db
class PrivateButtonMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[CallbackQuery, Dict[str, Any]], Awaitable[Any]],
event: CallbackQuery,
data: Dict[str, Any],
self,
handler: Callable[[CallbackQuery, Dict[str, Any]], Awaitable[Any]],
event: CallbackQuery,
data: Dict[str, Any],
):
if event.from_user.id == db.inline[event.inline_message_id].from_user.id:
return await handler(event, data)
else:
await event.answer('This button is not for you')
await event.answer("This button is not for you")

View File

@@ -1,9 +1,9 @@
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict
from aiogram.dispatcher.middlewares.base import BaseMiddleware
from aiogram.types import ChosenInlineResult
from typing import Any, Awaitable, Callable, Dict
from dataclasses import dataclass
from bot.modules.database import db
@@ -26,10 +26,10 @@ class SavedResult:
class SaveChosenMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[ChosenInlineResult, Dict[str, Any]], Awaitable[Any]],
event: ChosenInlineResult,
data: Dict[str, Any],
self,
handler: Callable[[ChosenInlineResult, Dict[str, Any]], Awaitable[Any]],
event: ChosenInlineResult,
data: Dict[str, Any],
):
db.inline[event.inline_message_id] = SavedResult(
result_id=event.result_id,
@@ -38,9 +38,9 @@ class SaveChosenMiddleware(BaseMiddleware):
first_name=event.from_user.first_name,
last_name=event.from_user.last_name,
username=event.from_user.username,
language_code=event.from_user.language_code
language_code=event.from_user.language_code,
),
query=event.query,
inline_message_id=event.inline_message_id
inline_message_id=event.inline_message_id,
)
return await handler(event, data)

View File

@@ -11,7 +11,7 @@ class BaseSongItem:
@property
def all_artists(self):
return ', '.join(self.artists)
return ", ".join(self.artists)
@property
def full_name(self):

View File

@@ -1,6 +1,5 @@
from .db import Db
db = Db()
__all__ = ['db']
__all__ = ["db"]

View File

@@ -3,13 +3,13 @@ from .db_model import DBDict
class Db(object):
def __init__(self):
self.fsm = DBDict('fsm')
self.config = DBDict('config')
self.inline = DBDict('inline')
self.errors = DBDict('errors')
self.settings = DBDict('settings')
self.spotify = DBDict('spotify')
self.deezer = DBDict('deezer')
self.youtube = DBDict('youtube')
self.soundcloud = DBDict('soundcloud')
self.recoded = DBDict('recoded')
self.fsm = DBDict("fsm")
self.config = DBDict("config")
self.inline = DBDict("inline")
self.errors = DBDict("errors")
self.settings = DBDict("settings")
self.spotify = DBDict("spotify")
self.deezer = DBDict("deezer")
self.youtube = DBDict("youtube")
self.soundcloud = DBDict("soundcloud")
self.recoded = DBDict("recoded")

View File

@@ -1,4 +1,5 @@
from sqlitedict import SqliteDict
from bot.utils.config import config

View File

@@ -1,10 +1,10 @@
from .deezer import Deezer
from .downloader import DeezerBytestream
from bot.utils.config import config
from .deezer import Deezer
from .downloader import DeezerBytestream
deezer = Deezer(
arl=config.tokens.deezer.arl,
)
__all__ = ['deezer', 'DeezerBytestream']
__all__ = ["deezer", "DeezerBytestream"]

View File

@@ -1,9 +1,9 @@
import asyncio
from .song import Songs
from .engine import DeezerEngine
from .driver import DeezerDriver
from .downloader import DownloaderBuilder
from .driver import DeezerDriver
from .engine import DeezerEngine
from .song import Songs
class Deezer(object):

View File

@@ -1,12 +1,11 @@
from attrs import define
from io import BytesIO
from .driver import DeezerDriver
from attrs import define
from . import track_formats
from .util import UrlDecrypter, ChunkDecrypter
from .driver import DeezerDriver
from .song import FullSongItem
from .util import ChunkDecrypter, UrlDecrypter
@define
@@ -17,10 +16,7 @@ class DeezerBytestream:
@classmethod
def from_bytestream(
cls,
bytestream: BytesIO,
filename: str,
full_song: FullSongItem
cls, bytestream: BytesIO, filename: str, full_song: FullSongItem
):
bytestream.seek(0)
return cls(
@@ -38,21 +34,18 @@ class Downloader:
song: FullSongItem
@classmethod
async def build(
cls,
song_id: str,
driver: DeezerDriver
):
async def build(cls, song_id: str, driver: DeezerDriver):
track = await driver.reverse_get_track(song_id)
try:
return cls(
song_id=str(song_id),
driver=driver,
track=track['results'],
song=await FullSongItem.from_deezer(track)
track=track["results"],
song=await FullSongItem.from_deezer(track),
)
except KeyError:
from icecream import ic
ic(track)
await driver.renew_engine()
return await cls.build(song_id, driver)
@@ -65,7 +58,7 @@ class Downloader:
audio = BytesIO()
async for chunk in self.driver.engine.get_data_iter(
await self._get_download_url(quality=quality)
await self._get_download_url(quality=quality)
):
if i % 3 > 0 or len(chunk) < 2 * 1024:
audio.write(chunk)
@@ -76,18 +69,16 @@ class Downloader:
return DeezerBytestream.from_bytestream(
filename=self.song.full_name + track_formats.TRACK_FORMAT_MAP[quality].ext,
bytestream=audio,
full_song=self.song
full_song=self.song,
)
async def _get_download_url(self, quality: str = 'MP3_128'):
async def _get_download_url(self, quality: str = "MP3_128"):
md5_origin = self.track["MD5_ORIGIN"]
track_id = self.track["SNG_ID"]
media_version = self.track["MEDIA_VERSION"]
url_decrypter = UrlDecrypter(
md5_origin=md5_origin,
track_id=track_id,
media_version=media_version
md5_origin=md5_origin, track_id=track_id, media_version=media_version
)
return url_decrypter.get_url_for(track_formats.TRACK_FORMAT_MAP[quality])
@@ -98,7 +89,4 @@ class DownloaderBuilder:
driver: DeezerDriver
async def from_id(self, song_id: str):
return await Downloader.build(
song_id=song_id,
driver=self.driver
)
return await Downloader.build(song_id=song_id, driver=self.driver)

View File

@@ -1,7 +1,6 @@
from attrs import define
from .engine import DeezerEngine
from .util import clean_query
@@ -10,30 +9,19 @@ class DeezerDriver:
engine: DeezerEngine
async def get_track(self, track_id: int | str):
data = await self.engine.call_legacy_api(
f'track/{track_id}'
)
data = await self.engine.call_legacy_api(f"track/{track_id}")
return data
async def reverse_get_track(self, track_id: str):
return await self.engine.call_api(
'song.getData',
params={
'SNG_ID': track_id
}
)
return await self.engine.call_api("song.getData", params={"SNG_ID": track_id})
async def search(self, query: str, limit: int = 30):
data = await self.engine.call_legacy_api(
'search/track',
params={
'q': clean_query(query),
'limit': limit
}
"search/track", params={"q": clean_query(query), "limit": limit}
)
return data['data']
return data["data"]
async def renew_engine(self):
self.engine = await self.engine.from_arl(self.engine.arl)

View File

@@ -1,19 +1,16 @@
import aiohttp
from aiohttp import ClientResponse
from attrs import define
HTTP_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
"(KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
"Content-Language": "en-US",
"Cache-Control": "max-age=0",
"Accept": "*/*",
"Accept-Charset": "utf-8,ISO-8859-1;q=0.7,*;q=0.3",
"Accept-Language": "en-US,en;q=0.9,en-US;q=0.8,en;q=0.7",
"Connection": 'keep-alive'
"Connection": "keep-alive",
}
@@ -25,28 +22,22 @@ class DeezerEngine:
@classmethod
async def from_arl(cls, arl: str):
cookies = {'arl': arl}
cookies = {"arl": arl}
data, cookies = await cls(cookies).call_api(
'deezer.getUserData', get_cookies=True
"deezer.getUserData", get_cookies=True
)
data = data['results']
token = data['checkForm']
data = data["results"]
token = data["checkForm"]
return cls(
cookies=cookies,
arl=arl,
token=token
)
return cls(cookies=cookies, arl=arl, token=token)
async def call_legacy_api(
self, request_point: str, params: dict = None
):
async def call_legacy_api(self, request_point: str, params: dict = None):
async with aiohttp.ClientSession(cookies=self.cookies) as session:
async with session.get(
f"https://api.deezer.com/{request_point}",
params=params,
headers=HTTP_HEADERS
f"https://api.deezer.com/{request_point}",
params=params,
headers=HTTP_HEADERS,
) as r:
return await r.json()
@@ -63,31 +54,26 @@ class DeezerEngine:
async def get_data_iter(self, url: str):
async with aiohttp.ClientSession(
cookies=self.cookies,
headers=HTTP_HEADERS
cookies=self.cookies, headers=HTTP_HEADERS
) as session:
r = await session.get(
url,
allow_redirects=True
)
r = await session.get(url, allow_redirects=True)
async for chunk in self._iter_exact_chunks(r):
yield chunk
async def call_api(
self, method: str, params: dict = None,
get_cookies: bool = False
self, method: str, params: dict = None, get_cookies: bool = False
):
async with aiohttp.ClientSession(cookies=self.cookies) as session:
async with session.post(
f"https://www.deezer.com/ajax/gw-light.php",
params={
'method': method,
'api_version': '1.0',
'input': '3',
'api_token': self.token or 'null',
},
headers=HTTP_HEADERS,
json=params
f"https://www.deezer.com/ajax/gw-light.php",
params={
"method": method,
"api_version": "1.0",
"input": "3",
"api_token": self.token or "null",
},
headers=HTTP_HEADERS,
json=params,
) as r:
if not get_cookies:
return await r.json()

View File

@@ -1,8 +1,7 @@
from attrs import define
from .driver import DeezerDriver
from ..common.song import BaseSongItem
from .driver import DeezerDriver
@define
@@ -10,11 +9,11 @@ class SongItem(BaseSongItem):
@classmethod
def from_deezer(cls, song_item: dict):
return cls(
name=song_item['title'],
id=str(song_item['id']),
artists=[song_item['artist']['name']],
preview_url=song_item.get('preview'),
thumbnail=song_item['album']['cover_medium']
name=song_item["title"],
id=str(song_item["id"]),
artists=[song_item["artist"]["name"]],
preview_url=song_item.get("preview"),
thumbnail=song_item["album"]["cover_medium"],
)
@@ -25,21 +24,23 @@ class FullSongItem(BaseSongItem):
@classmethod
async def from_deezer(cls, song_item: dict):
if song_item.get('results'):
song_item = song_item['results']
if song_item.get("results"):
song_item = song_item["results"]
return cls(
name=song_item['SNG_TITLE'],
id=song_item['SNG_ID'],
artists=[artist['ART_NAME'] for artist in song_item['ARTISTS']],
preview_url=(song_item.get('MEDIA').get('HREF')
if type(song_item.get('MEDIA')) is dict and
song_item.get('MEDIA').get('TYPE') == 'preview'
else None),
thumbnail=f'https://e-cdns-images.dzcdn.net/images/cover/'
f'{song_item["ALB_PICTURE"]}/320x320.jpg',
duration=int(song_item['DURATION']),
track_dict=song_item
name=song_item["SNG_TITLE"],
id=song_item["SNG_ID"],
artists=[artist["ART_NAME"] for artist in song_item["ARTISTS"]],
preview_url=(
song_item.get("MEDIA").get("HREF")
if type(song_item.get("MEDIA")) is dict
and song_item.get("MEDIA").get("TYPE") == "preview"
else None
),
thumbnail=f"https://e-cdns-images.dzcdn.net/images/cover/"
f'{song_item["ALB_PICTURE"]}/320x320.jpg',
duration=int(song_item["DURATION"]),
track_dict=song_item,
)

View File

@@ -19,32 +19,11 @@ class TrackFormat:
TRACK_FORMAT_MAP = {
FLAC: TrackFormat(
code=9,
ext=".flac"
),
MP3_128: TrackFormat(
code=1,
ext=".mp3"
),
MP3_256: TrackFormat(
code=5,
ext=".mp3"
),
MP3_320: TrackFormat(
code=3,
ext=".mp3"
),
MP4_RA1: TrackFormat(
code=13,
ext=".mp4"
),
MP4_RA2: TrackFormat(
code=14,
ext=".mp4"
),
MP4_RA3: TrackFormat(
code=15,
ext=".mp3"
)
FLAC: TrackFormat(code=9, ext=".flac"),
MP3_128: TrackFormat(code=1, ext=".mp3"),
MP3_256: TrackFormat(code=5, ext=".mp3"),
MP3_320: TrackFormat(code=3, ext=".mp3"),
MP4_RA1: TrackFormat(code=13, ext=".mp4"),
MP4_RA2: TrackFormat(code=14, ext=".mp4"),
MP4_RA3: TrackFormat(code=15, ext=".mp3"),
}

View File

@@ -1,12 +1,11 @@
# https://pypi.org/project/music-helper/
import warnings
import re
import hashlib
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import re
import warnings
from attrs import define
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .track_formats import TrackFormat
@@ -34,20 +33,20 @@ class UrlDecrypter:
media_version: str
def get_url_for(self, track_format: TrackFormat):
step1 = (f'{self.md5_origin}¤{track_format.code}¤'
f'{self.track_id}¤{self.media_version}')
step1 = (
f"{self.md5_origin}¤{track_format.code}¤"
f"{self.track_id}¤{self.media_version}"
)
m = hashlib.md5()
m.update(bytes([ord(x) for x in step1]))
step2 = f'{m.hexdigest()}¤{step1}¤'
step2 = f"{m.hexdigest()}¤{step1}¤"
step2 = step2.ljust(80, " ")
cipher = Cipher(
algorithm=algorithms.AES(
key=bytes('jo6aey6haid2Teih', 'ascii')
),
algorithm=algorithms.AES(key=bytes("jo6aey6haid2Teih", "ascii")),
mode=modes.ECB(),
backend=default_backend()
backend=default_backend(),
)
encryptor = cipher.encryptor()
@@ -55,7 +54,7 @@ class UrlDecrypter:
cdn = self.md5_origin[0]
return f'https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}'
return f"https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}"
@define
@@ -69,12 +68,10 @@ class ChunkDecrypter:
cipher = Cipher(
algorithms.Blowfish(get_blowfish_key(track_id)),
modes.CBC(bytes([i for i in range(8)])),
default_backend()
default_backend(),
)
return cls(
cipher=cipher
)
return cls(cipher=cipher)
def decrypt_chunk(self, chunk: bytes):
decryptor = self.cipher.decryptor()
@@ -82,7 +79,7 @@ class ChunkDecrypter:
def get_blowfish_key(track_id: str):
secret = 'g4el58wc0zvf9na1'
secret = "g4el58wc0zvf9na1"
m = hashlib.md5()
m.update(bytes([ord(x) for x in track_id]))

View File

@@ -1,2 +1,2 @@
from .handler import on_error, Error
from .handler import Error, on_error
from .pretty import PrettyException

View File

@@ -1,14 +1,14 @@
from bot.common import console
from aiogram.types.error_event import ErrorEvent
from dataclasses import dataclass
from aiogram import Bot
from aiogram.dispatcher import router as s_router
from aiogram.types.error_event import ErrorEvent
from rich.traceback import Traceback
from .pretty import PrettyException
from bot.common import console
from bot.modules.database import db
from dataclasses import dataclass
from .pretty import PrettyException
@dataclass
@@ -19,8 +19,8 @@ class Error:
async def on_error(event: ErrorEvent, bot: Bot):
import os
import base64
import os
error_id = base64.urlsafe_b64encode(os.urandom(6)).decode()
@@ -42,9 +42,9 @@ async def on_error(event: ErrorEvent, bot: Bot):
await bot.edit_message_caption(
inline_message_id=event.update.chosen_inline_result.inline_message_id,
caption=f'💔 <b>ERROR</b> occurred. Use this code to get more information: '
f'<code>{error_id}</code>',
parse_mode='HTML',
caption=f"💔 <b>ERROR</b> occurred. Use this code to get more information: "
f"<code>{error_id}</code>",
parse_mode="HTML",
)
else:
@@ -53,7 +53,7 @@ async def on_error(event: ErrorEvent, bot: Bot):
exception=pretty_exception,
)
console.print(f'[red]{error_id} occurred[/]')
console.print(f"[red]{error_id} occurred[/]")
console.print(event)
console.print(traceback)
console.print(f'-{error_id} occurred-')
console.print(f"-{error_id} occurred-")

View File

@@ -1,7 +1,7 @@
import os
import traceback
import contextlib
import os
import re
import traceback
from typing import Type
@@ -13,12 +13,14 @@ class PrettyException:
🐊 <code>{e.__traceback__.tb_frame.f_code.co_filename.replace(os.getcwd(), "")}\r
</code>:{e.__traceback__.tb_frame.f_lineno}
"""
self.short = (f'{e.__class__.__name__}: '
f'{"".join(traceback.format_exception_only(e)).strip()}')
self.short = (
f"{e.__class__.__name__}: "
f'{"".join(traceback.format_exception_only(e)).strip()}'
)
self.pretty_exception = (f"{self.long}\n\n"
f"⬇️ Trace:"
f"{self.get_full_stack()}")
self.pretty_exception = (
f"{self.long}\n\n" f"⬇️ Trace:" f"{self.get_full_stack()}"
)
@staticmethod
def get_full_stack():
@@ -40,9 +42,11 @@ class PrettyException:
full_stack = "\n".join(
[
format_line(line)
if re.search(line_regex, line)
else f"<code>{line}</code>"
(
format_line(line)
if re.search(line_regex, line)
else f"<code>{line}</code>"
)
for line in full_stack.splitlines()
]
)

View File

@@ -1,13 +1,10 @@
from bot.modules.database import db
from dataclasses import dataclass, field
from typing import Any, DefaultDict, Dict, Optional
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import (
BaseStorage,
StateType,
StorageKey,
)
from aiogram.fsm.storage.base import BaseStorage, StateType, StorageKey
from bot.modules.database import db
@dataclass
@@ -18,14 +15,14 @@ class MemoryStorageRecord:
class StorageDict(DefaultDict):
def __init__(self, default_factory=None) -> None:
if type(db.fsm.get('fsm')) is not dict:
db.fsm['fsm'] = dict()
if type(db.fsm.get("fsm")) is not dict:
db.fsm["fsm"] = dict()
super().__init__(default_factory, db.fsm['fsm'])
super().__init__(default_factory, db.fsm["fsm"])
def __setitem__(self, key, value):
super().__setitem__(key, value)
db.fsm['fsm'] = dict(self)
db.fsm["fsm"] = dict(self)
class InDbStorage(BaseStorage):

View File

@@ -1 +1 @@
from .model import UserSettings, Setting, settings_strings
from .model import Setting, UserSettings, settings_strings

View File

@@ -1,4 +1,5 @@
from dataclasses import dataclass
from ..database import db
@@ -11,46 +12,32 @@ class Setting:
settings_strings: dict[str, Setting] = {
'search_preview': Setting(
name='Search preview',
description='Show only covers (better display), '
'or add 30 seconds of track preview whenever possible?',
choices={
'cover': 'Cover picture',
'preview': 'Audio preview'
},
"search_preview": Setting(
name="Search preview",
description="Show only covers (better display), "
"or add 30 seconds of track preview whenever possible?",
choices={"cover": "Cover picture", "preview": "Audio preview"},
),
'recode_youtube': Setting(
name='Recode YouTube (and Spotify)',
description='Recode when downloading from YouTube (and Spotify) to '
'more compatible format (may take some time)',
choices={
'no': 'Send original file',
'yes': 'Recode to libmp3lame'
},
"recode_youtube": Setting(
name="Recode YouTube (and Spotify)",
description="Recode when downloading from YouTube (and Spotify) to "
"more compatible format (may take some time)",
choices={"no": "Send original file", "yes": "Recode to libmp3lame"},
),
'exact_spotify_search': Setting(
name='Only exact Spotify matches',
description='When searching on Youtube from Spotify, show only exact matches, '
'may protect against inaccurate matches, but at the same time it '
'can lose reuploaded tracks. Should be enabled always, except in '
'situations where the track is not found on both YouTube and '
'Deezer',
choices={
'yes': 'Only exact matches',
'no': 'Fuzzy matches also'
},
"exact_spotify_search": Setting(
name="Only exact Spotify matches",
description="When searching on Youtube from Spotify, show only exact matches, "
"may protect against inaccurate matches, but at the same time it "
"can lose reuploaded tracks. Should be enabled always, except in "
"situations where the track is not found on both YouTube and "
"Deezer",
choices={"yes": "Only exact matches", "no": "Fuzzy matches also"},
),
"default_search_provider": Setting(
name="Default search provider",
description="Which service to use when searching without service filter",
choices={"d": "Deezer", "c": "SoundCloud", "y": "YouTube", "s": "Spotify"},
),
'default_search_provider': Setting(
name='Default search provider',
description='Which service to use when searching without service filter',
choices={
'd': 'Deezer',
'c': 'SoundCloud',
'y': 'YouTube',
's': 'Spotify'
}
)
}
@@ -64,8 +51,8 @@ class UserSettings:
if db.settings.get(self.user_id) is None:
db.settings[self.user_id] = dict(
(setting, list(settings_strings[setting].choices)[0]) for setting in
settings_strings
(setting, list(settings_strings[setting].choices)[0])
for setting in settings_strings
)
def __getitem__(self, item):

View File

@@ -1,10 +1,10 @@
from .soundcloud import SoundCloud
from .downloader import SoundCloudBytestream
from bot.utils.config import config
from .downloader import SoundCloudBytestream
from .soundcloud import SoundCloud
soundcloud = SoundCloud(
client_id=config.tokens.soundcloud.client_id,
)
__all__ = ['soundcloud', 'SoundCloudBytestream']
__all__ = ["soundcloud", "SoundCloudBytestream"]

View File

@@ -1,11 +1,11 @@
from attrs import define
from typing import Callable
import m3u8
from attrs import define
from .driver import SoundCloudDriver
from .song import SongItem
import m3u8
@define
class SoundCloudBytestream:
@@ -15,18 +15,9 @@ class SoundCloudBytestream:
song: SongItem
@classmethod
def from_bytes(
cls,
bytes_: bytes,
filename: str,
duration: int,
song: SongItem
):
def from_bytes(cls, bytes_: bytes, filename: str, duration: int, song: SongItem):
return cls(
file=bytes_,
filename=filename,
duration=int(duration / 1000),
song=song
file=bytes_, filename=filename, duration=int(duration / 1000), song=song
)
@@ -40,60 +31,53 @@ class Downloader:
song: SongItem
@classmethod
async def build(
cls,
song_id: str,
driver: SoundCloudDriver
):
async def build(cls, song_id: str, driver: SoundCloudDriver):
track = await driver.get_track(song_id)
song = SongItem.from_soundcloud(track)
if url := cls._try_get_progressive(track['media']['transcodings']):
if url := cls._try_get_progressive(track["media"]["transcodings"]):
method = cls._progressive
else:
url = track['media']['transcodings'][0]['url']
method = cls._hls if \
(track['media']['transcodings'][0]['format']['protocol']
== 'hls') else cls._progressive
url = track["media"]["transcodings"][0]["url"]
method = (
cls._hls
if (track["media"]["transcodings"][0]["format"]["protocol"] == "hls")
else cls._progressive
)
return cls(
driver=driver,
duration=track['duration'],
duration=track["duration"],
method=method,
download_url=url,
filename=f'{track["title"]}.mp3',
song=song
song=song,
)
@staticmethod
def _try_get_progressive(urls: list) -> str | None:
for transcode in urls:
if transcode['format']['protocol'] == 'progressive':
return transcode['url']
if transcode["format"]["protocol"] == "progressive":
return transcode["url"]
async def _progressive(self, url: str) -> bytes:
return await self.driver.engine.read_data(
url=(await self.driver.engine.get(
url
))['url']
url=(await self.driver.engine.get(url))["url"]
)
async def _hls(self, url: str) -> bytes:
m3u8_obj = m3u8.loads(
(await self.driver.engine.read_data(
(await self.driver.engine.get(
url=url
))['url']
)).decode()
(
await self.driver.engine.read_data(
(await self.driver.engine.get(url=url))["url"]
)
).decode()
)
content = bytearray()
for segment in m3u8_obj.files:
content.extend(
await self.driver.engine.read_data(
url=segment,
append_client_id=False
)
await self.driver.engine.read_data(url=segment, append_client_id=False)
)
return content
@@ -103,7 +87,7 @@ class Downloader:
bytes_=await self.method(self, self.download_url),
filename=self.filename,
duration=self.duration,
song=self.song
song=self.song,
)
@@ -112,7 +96,4 @@ class DownloaderBuilder:
driver: SoundCloudDriver
async def from_id(self, song_id: str):
return await Downloader.build(
song_id=song_id,
driver=self.driver
)
return await Downloader.build(song_id=song_id, driver=self.driver)

View File

@@ -8,23 +8,12 @@ class SoundCloudDriver:
engine: SoundCloudEngine
async def get_track(self, track_id: int | str):
return await self.engine.call(
f'tracks/{track_id}'
)
return await self.engine.call(f"tracks/{track_id}")
async def search(self, query: str, limit: int = 30):
return (await self.engine.call(
'search/tracks',
params={
'q': query,
'limit': limit
}
))['collection']
return (
await self.engine.call("search/tracks", params={"q": query, "limit": limit})
)["collection"]
async def resolve_url(self, url: str):
return await self.engine.call(
'resolve',
params={
'url': url
}
)
return await self.engine.call("resolve", params={"url": url})

View File

@@ -1,5 +1,5 @@
from attrs import define
import aiohttp
from attrs import define
@define
@@ -8,27 +8,33 @@ class SoundCloudEngine:
async def call(self, request_point: str, params: dict = None):
return await self.get(
url=f'https://api-v2.soundcloud.com/{request_point}',
params=params
url=f"https://api-v2.soundcloud.com/{request_point}", params=params
)
async def get(self, url: str, params: dict = None):
async with aiohttp.ClientSession() as session:
async with session.get(
url,
params=(params or {}) | {
'client_id': self.client_id,
},
url,
params=(params or {})
| {
"client_id": self.client_id,
},
) as r:
return await r.json()
async def read_data(self, url: str, params: dict = None,
append_client_id: bool = True):
async def read_data(
self, url: str, params: dict = None, append_client_id: bool = True
):
async with aiohttp.ClientSession() as session:
async with session.get(
url,
params=(params or {}) | ({
'client_id': self.client_id,
} if append_client_id else {}),
url,
params=(params or {})
| (
{
"client_id": self.client_id,
}
if append_client_id
else {}
),
) as r:
return await r.content.read()

View File

@@ -9,13 +9,15 @@ class SongItem(BaseSongItem):
@classmethod
def from_soundcloud(cls, song_item: dict):
return cls(
name=song_item['title'],
id=str(song_item['id']),
name=song_item["title"],
id=str(song_item["id"]),
artists=[],
thumbnail=(song_item['artwork_url'] or song_item['user']['avatar_url'] or
'https://soundcloud.com/images/default_avatar_large.png')
.replace('large.jpg', 't300x300.jpg'),
preview_url=None
thumbnail=(
song_item["artwork_url"]
or song_item["user"]["avatar_url"]
or "https://soundcloud.com/images/default_avatar_large.png"
).replace("large.jpg", "t300x300.jpg"),
preview_url=None,
)
@property

View File

@@ -1,7 +1,7 @@
from .engine import SoundCloudEngine
from .driver import SoundCloudDriver
from .song import Songs
from .downloader import DownloaderBuilder
from .driver import SoundCloudDriver
from .engine import SoundCloudEngine
from .song import Songs
class SoundCloud(object):

View File

@@ -1,10 +1,10 @@
from .spotify import Spotify
from bot.utils.config import config
from .spotify import Spotify
spotify = Spotify(
client_id=config.tokens.spotify.client_id,
client_secret=config.tokens.spotify.client_secret
client_secret=config.tokens.spotify.client_secret,
)
__all__ = ['spotify']
__all__ = ["spotify"]

View File

@@ -1,5 +1,5 @@
from attrs import define
import spotipy
from attrs import define
from ..common.song import BaseSongItem
@@ -9,12 +9,15 @@ class SongItem(BaseSongItem):
@classmethod
def from_spotify(cls, song_item: dict):
return cls(
name=song_item['name'],
id=song_item['id'],
artists=[artist['name'] for artist in song_item['artists']],
preview_url=song_item['preview_url'].split('?')[0] if
song_item['preview_url'] is not None else None,
thumbnail=song_item['album']['images'][1]['url']
name=song_item["name"],
id=song_item["id"],
artists=[artist["name"] for artist in song_item["artists"]],
preview_url=(
song_item["preview_url"].split("?")[0]
if song_item["preview_url"] is not None
else None
),
thumbnail=song_item["album"]["images"][1]["url"],
)
@@ -28,7 +31,7 @@ class Songs(object):
if r is None:
return None
return [SongItem.from_spotify(item) for item in r['tracks']['items']]
return [SongItem.from_spotify(item) for item in r["tracks"]["items"]]
def from_id(self, song_id: str) -> SongItem | None:
r = self.spotify.track(song_id)

View File

@@ -8,11 +8,10 @@ class Spotify(object):
def __init__(self, client_id, client_secret):
self.spotify = spotipy.Spotify(
client_credentials_manager=SpotifyClientCredentials(
client_id=client_id,
client_secret=client_secret
client_id=client_id, client_secret=client_secret
),
backoff_factor=0.1,
retries=10
retries=10,
)
self.songs = Songs(self.spotify)

View File

@@ -1,2 +1,2 @@
from .recognise import recognise_music_service, RecognisedService
from .id_getter import get_id
from .recognise import RecognisedService, recognise_music_service

View File

@@ -1,7 +1,7 @@
from .recognise import RecognisedService
import aiohttp
from .recognise import RecognisedService
async def get_url_after_redirect(url: str) -> str:
async with aiohttp.ClientSession() as session:
@@ -10,26 +10,28 @@ async def get_url_after_redirect(url: str) -> str:
async def get_id(recognised: RecognisedService):
if recognised.name == 'yt':
return recognised.parse_result.path.replace('/', '') if (
recognised.parse_result.netloc.endswith('youtu.be')
) else recognised.parse_result.query.split('=')[1].split('&')[0]
if recognised.name == "yt":
return (
recognised.parse_result.path.replace("/", "")
if (recognised.parse_result.netloc.endswith("youtu.be"))
else recognised.parse_result.query.split("=")[1].split("&")[0]
)
elif recognised.name == 'spot':
if recognised.parse_result.netloc.endswith('open.spotify.com'):
return recognised.parse_result.path.split('/')[2]
elif recognised.name == "spot":
if recognised.parse_result.netloc.endswith("open.spotify.com"):
return recognised.parse_result.path.split("/")[2]
else:
url = await get_url_after_redirect(recognised.parse_result.geturl())
return url.split('/')[-1].split('?')[0]
return url.split("/")[-1].split("?")[0]
elif recognised.name == 'deez':
if recognised.parse_result.netloc.endswith('deezer.com'):
return recognised.parse_result.path.split('/')[-1]
elif recognised.name == "deez":
if recognised.parse_result.netloc.endswith("deezer.com"):
return recognised.parse_result.path.split("/")[-1]
else:
url = await get_url_after_redirect(recognised.parse_result.geturl())
return url.split('/')[-1].split('?')[0]
return url.split("/")[-1].split("?")[0]
elif recognised.name == 'sc':
if not recognised.parse_result.netloc.startswith('on'):
elif recognised.name == "sc":
if not recognised.parse_result.netloc.startswith("on"):
return recognised.parse_result.geturl()
return await get_url_after_redirect(recognised.parse_result.geturl())

View File

@@ -1,20 +1,18 @@
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass
from typing import Callable, Awaitable, Literal
from typing import Awaitable, Callable, Literal
from urllib.parse import ParseResult, urlparse
from bot.modules.database import db
from bot.modules.database.db import DBDict
from bot.modules.youtube import youtube
from bot.modules.spotify import spotify
from bot.modules.deezer import deezer
from bot.modules.soundcloud import soundcloud
from bot.modules.spotify import spotify
from bot.modules.youtube import youtube
@dataclass
class RecognisedService:
name: Literal['yt', 'spot', 'deez', 'sc']
name: Literal["yt", "spot", "deez", "sc"]
db_table: DBDict
by_id_func: Callable | Awaitable
parse_result: ParseResult
@@ -22,33 +20,33 @@ class RecognisedService:
def recognise_music_service(url: str) -> RecognisedService | None:
url = urlparse(url)
if url.netloc.endswith('youtube.com') or url.netloc.endswith('youtu.be'):
if url.netloc.endswith("youtube.com") or url.netloc.endswith("youtu.be"):
return RecognisedService(
name='yt',
name="yt",
db_table=db.youtube,
by_id_func=youtube.songs.from_id,
parse_result=url
parse_result=url,
)
elif url.netloc.endswith('open.spotify.com') or url.netloc.endswith('spotify.link'):
elif url.netloc.endswith("open.spotify.com") or url.netloc.endswith("spotify.link"):
return RecognisedService(
name='spot',
name="spot",
db_table=db.spotify,
by_id_func=spotify.songs.from_id,
parse_result=url
parse_result=url,
)
elif url.netloc.endswith('deezer.page.link') or url.netloc.endswith('deezer.com'):
elif url.netloc.endswith("deezer.page.link") or url.netloc.endswith("deezer.com"):
return RecognisedService(
name='deez',
name="deez",
db_table=db.deezer,
by_id_func=deezer.songs.from_id,
parse_result=url
parse_result=url,
)
elif url.netloc.endswith('soundcloud.com'):
elif url.netloc.endswith("soundcloud.com"):
return RecognisedService(
name='sc',
name="sc",
db_table=db.soundcloud,
by_id_func=soundcloud.songs.from_url,
parse_result=url
parse_result=url,
)
else:
return None

View File

@@ -1,8 +1,8 @@
from .youtube import YouTube
from pytube.exceptions import AgeRestrictedError
from pytubefix.exceptions import AgeRestrictedError
from .youtube import YouTube
youtube = YouTube()
__all__ = ['youtube', 'AgeRestrictedError']
__all__ = ["youtube", "AgeRestrictedError"]

View File

@@ -1,12 +1,10 @@
from attrs import define
from pytube import YouTube, Stream
from pydub import AudioSegment
import asyncio
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor
import asyncio
from attrs import define
from pydub import AudioSegment
from pytubefix import Stream, YouTube
@define
@@ -16,12 +14,7 @@ class YouTubeBytestream:
duration: int
@classmethod
def from_bytestream(
cls,
bytestream: BytesIO,
filename: str,
duration: float
):
def from_bytestream(cls, bytestream: BytesIO, filename: str, duration: float):
bytestream.seek(0)
return cls(
file=bytestream.read(),
@@ -30,11 +23,9 @@ class YouTubeBytestream:
)
def __rerender(self):
segment = AudioSegment.from_file(
file=BytesIO(self.file)
)
segment = AudioSegment.from_file(file=BytesIO(self.file))
self.file = segment.export(BytesIO(), format='mp3', codec='libmp3lame').read()
self.file = segment.export(BytesIO(), format="mp3", codec="libmp3lame").read()
return self
async def rerender(self):
@@ -54,13 +45,18 @@ class Downloader:
def from_id(cls, yt_id: str):
video = YouTube.from_id(yt_id)
audio_stream = video.streams.filter(
only_audio=True,
).order_by('abr').desc().first()
audio_stream = (
video.streams.filter(
only_audio=True,
)
.order_by("abr")
.desc()
.first()
)
return cls(
audio_stream=audio_stream,
filename=f'{audio_stream.default_filename}.mp3',
filename=f"{audio_stream.default_filename}.mp3",
duration=int(video.length),
)

View File

@@ -1,11 +1,10 @@
from attrs import define
import ytmusicapi
from .downloader import Downloader, YouTubeBytestream
from typing import Awaitable
import ytmusicapi
from attrs import define
from ..common.song import BaseSongItem
from .downloader import Downloader, YouTubeBytestream
@define
@@ -15,19 +14,19 @@ class SongItem(BaseSongItem):
@classmethod
def from_youtube(cls, song_item: dict):
return cls(
name=song_item['title'],
id=song_item['videoId'],
artists=[artist['name'] for artist in song_item['artists']],
thumbnail=song_item['thumbnails'][1]['url']
name=song_item["title"],
id=song_item["videoId"],
artists=[artist["name"] for artist in song_item["artists"]],
thumbnail=song_item["thumbnails"][1]["url"],
)
@classmethod
def from_details(cls, details: dict):
return cls(
name=details['title'],
id=details['videoId'],
artists=details['author'].split(' & '),
thumbnail=details['thumbnail']['thumbnails'][1]['url']
name=details["title"],
id=details["videoId"],
artists=details["author"].split(" & "),
thumbnail=details["thumbnail"]["thumbnails"][1]["url"],
)
def to_bytestream(self) -> Awaitable[YouTubeBytestream]:
@@ -39,16 +38,10 @@ class Songs(object):
ytm: ytmusicapi.YTMusic
def search(
self,
query: str,
limit: int = 10,
exact_match: bool = False
self, query: str, limit: int = 10, exact_match: bool = False
) -> list[SongItem] | None:
r = self.ytm.search(
query,
limit=limit,
filter='songs',
ignore_spelling=exact_match
query, limit=limit, filter="songs", ignore_spelling=exact_match
)
if r is None:
@@ -68,4 +61,4 @@ class Songs(object):
if r is None:
return None
return SongItem.from_details(r['videoDetails'])
return SongItem.from_details(r["videoDetails"])

View File

@@ -1,7 +1,7 @@
import ytmusicapi
from .song import Songs
from .downloader import Downloader
from .song import Songs
class YouTube(object):
@@ -9,6 +9,4 @@ class YouTube(object):
self.ytm = ytmusicapi.YTMusic()
self.download = Downloader
self.songs = Songs(
self.ytm
)
self.songs = Songs(self.ytm)

View File

@@ -1,46 +1,48 @@
from typing import TypeVar
from aiogram.types import (
InlineQueryResultDocument, InlineQueryResultCachedAudio,
InlineKeyboardMarkup, InlineKeyboardButton
InlineKeyboardButton,
InlineKeyboardMarkup,
InlineQueryResultCachedAudio,
InlineQueryResultDocument,
)
from bot.modules.common.song import BaseSongItem
from bot.modules.database.db import DBDict
from bot.modules.settings import UserSettings
from bot.modules.common.song import BaseSongItem
from typing import TypeVar
BaseSongT = TypeVar('BaseSongT', bound=BaseSongItem)
BaseSongT = TypeVar("BaseSongT", bound=BaseSongItem)
async def get_common_search_result(
audio: BaseSongT,
db_table: DBDict,
service_id: str,
settings: UserSettings
audio: BaseSongT, db_table: DBDict, service_id: str, settings: UserSettings
) -> InlineQueryResultDocument | InlineQueryResultCachedAudio:
return (
InlineQueryResultDocument(
id=f'{service_id}::' + audio.id,
id=f"{service_id}::" + audio.id,
title=audio.name,
description=audio.all_artists,
thumb_url=audio.thumbnail,
document_url=(audio.preview_url or audio.thumbnail) if
settings['search_preview'].value == 'preview' else audio.thumbnail,
mime_type='application/zip',
document_url=(
(audio.preview_url or audio.thumbnail)
if settings["search_preview"].value == "preview"
else audio.thumbnail
),
mime_type="application/zip",
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text='Downloading...', callback_data='.')]
[InlineKeyboardButton(text="Downloading...", callback_data=".")]
]
),
caption=audio.full_name,
) if audio.id not in list(db_table.keys()) else
InlineQueryResultCachedAudio(
id=f'{service_id}c::' + audio.id,
)
if audio.id not in list(db_table.keys())
else InlineQueryResultCachedAudio(
id=f"{service_id}c::" + audio.id,
audio_file_id=db_table[audio.id],
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text='Verifying...', callback_data='.')]
[InlineKeyboardButton(text="Verifying...", callback_data=".")]
]
),
)

View File

@@ -1,4 +1,3 @@
from .search import get_deezer_search_results
__all__ = ['get_deezer_search_results']
__all__ = ["get_deezer_search_results"]

View File

@@ -1,23 +1,18 @@
from aiogram.types import (
InlineQueryResultDocument, InlineQueryResultCachedAudio
)
from aiogram.types import InlineQueryResultCachedAudio, InlineQueryResultDocument
from bot.modules.deezer import deezer
from bot.modules.database import db
from bot.modules.deezer import deezer
from bot.modules.settings import UserSettings
from ..common.search import get_common_search_result
async def get_deezer_search_results(query: str, settings: UserSettings) -> list[
InlineQueryResultDocument | InlineQueryResultCachedAudio
]:
async def get_deezer_search_results(
query: str, settings: UserSettings
) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
audio=audio,
db_table=db.deezer,
service_id='deez',
settings=settings
audio=audio, db_table=db.deezer, service_id="deez", settings=settings
)
for audio in await deezer.songs.search(query, limit=50)
]

View File

@@ -1,31 +1,31 @@
from aiogram.types import (
InlineQueryResultArticle, InputTextMessageContent,
)
from aiogram.types import InlineQueryResultArticle, InputTextMessageContent
from bot.common import console
from bot.modules.database import db
from bot.modules.error import Error
from bot.common import console
async def get_error_search_results(error_id: str) -> (list[InlineQueryResultArticle]
| None):
async def get_error_search_results(
error_id: str,
) -> list[InlineQueryResultArticle] | None:
error: Error = db.errors.get(error_id)
if error is None:
return []
console.print(f'{error_id} requested')
console.print(f"{error_id} requested")
console.print(error.traceback)
console.print(f'-{error_id} requested-')
console.print(f"-{error_id} requested-")
return [(
InlineQueryResultArticle(
id=error_id,
title=f'Error {error_id}',
description=error.exception.short,
input_message_content=InputTextMessageContent(
message_text=error.exception.long,
parse_mode='HTML',
),
return [
(
InlineQueryResultArticle(
id=error_id,
title=f"Error {error_id}",
description=error.exception.short,
input_message_content=InputTextMessageContent(
message_text=error.exception.long,
parse_mode="HTML",
),
)
)
)]
]

View File

@@ -1,6 +1,3 @@
from .search import get_soundcloud_search_results
__all__ = [
'get_soundcloud_search_results'
]
__all__ = ["get_soundcloud_search_results"]

View File

@@ -1,23 +1,18 @@
from aiogram.types import (
InlineQueryResultDocument, InlineQueryResultCachedAudio
)
from aiogram.types import InlineQueryResultCachedAudio, InlineQueryResultDocument
from bot.modules.soundcloud import soundcloud
from bot.modules.database import db
from bot.modules.settings import UserSettings
from bot.modules.soundcloud import soundcloud
from ..common.search import get_common_search_result
async def get_soundcloud_search_results(query: str, settings: UserSettings) -> list[
InlineQueryResultDocument | InlineQueryResultCachedAudio
]:
async def get_soundcloud_search_results(
query: str, settings: UserSettings
) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
audio=audio,
db_table=db.soundcloud,
service_id='sc',
settings=settings
audio=audio, db_table=db.soundcloud, service_id="sc", settings=settings
)
for audio in await soundcloud.songs.search(query, limit=50)
]

View File

@@ -1,6 +1,3 @@
from .search import get_spotify_search_results
__all__ = [
'get_spotify_search_results'
]
__all__ = ["get_spotify_search_results"]

View File

@@ -1,23 +1,18 @@
from aiogram.types import (
InlineQueryResultDocument, InlineQueryResultCachedAudio
)
from aiogram.types import InlineQueryResultCachedAudio, InlineQueryResultDocument
from bot.modules.spotify import spotify
from bot.modules.database import db
from bot.modules.settings import UserSettings
from bot.modules.spotify import spotify
from ..common.search import get_common_search_result
async def get_spotify_search_results(query: str, settings: UserSettings) -> list[
InlineQueryResultDocument | InlineQueryResultCachedAudio
]:
async def get_spotify_search_results(
query: str, settings: UserSettings
) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
audio=audio,
db_table=db.spotify,
service_id='spot',
settings=settings
audio=audio, db_table=db.spotify, service_id="spot", settings=settings
)
for audio in spotify.songs.search(query, limit=50)
]

View File

@@ -1,18 +1,16 @@
from aiogram.types import (
InlineQueryResultDocument, InlineQueryResultCachedAudio
)
import inspect
from aiogram.types import InlineQueryResultCachedAudio, InlineQueryResultDocument
from bot.modules.url import recognise_music_service, get_id
from bot.modules.settings import UserSettings
from bot.modules.url import get_id, recognise_music_service
from ..common.search import get_common_search_result
import inspect
async def get_url_results(query: str, settings: UserSettings) -> list[
InlineQueryResultDocument | InlineQueryResultCachedAudio
]:
async def get_url_results(
query: str, settings: UserSettings
) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
service = recognise_music_service(query)
if inspect.iscoroutinefunction(service.by_id_func):
audio = await service.by_id_func(await get_id(service))
@@ -26,6 +24,6 @@ async def get_url_results(query: str, settings: UserSettings) -> list[
audio=audio,
db_table=service.db_table,
service_id=service.name,
settings=settings
settings=settings,
)
]

View File

@@ -1,6 +1,3 @@
from .search import get_youtube_search_results
__all__ = [
'get_youtube_search_results'
]
__all__ = ["get_youtube_search_results"]

View File

@@ -1,23 +1,18 @@
from aiogram.types import (
InlineQueryResultDocument, InlineQueryResultCachedAudio
)
from aiogram.types import InlineQueryResultCachedAudio, InlineQueryResultDocument
from bot.modules.youtube import youtube
from bot.modules.database import db
from bot.modules.settings import UserSettings
from bot.modules.youtube import youtube
from ..common.search import get_common_search_result
async def get_youtube_search_results(query: str, settings: UserSettings) -> list[
InlineQueryResultDocument | InlineQueryResultCachedAudio
]:
async def get_youtube_search_results(
query: str, settings: UserSettings
) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
audio=audio,
db_table=db.youtube,
service_id='yt',
settings=settings
audio=audio, db_table=db.youtube, service_id="yt", settings=settings
)
for audio in youtube.songs.search(query, limit=40)
]

View File

@@ -1,4 +1,3 @@
from ._config import Config
config = Config()

View File

@@ -5,7 +5,7 @@ class Config(dict):
def __init__(self, _config: dict = None):
try:
if _config is None:
config = tomllib.load(open('config.toml', 'rb'))
config = tomllib.load(open("config.toml", "rb"))
super().__init__(**config)
else:

View File

@@ -1,6 +1,6 @@
from .serializers import Serialize
from .api import Shazam
from .converter import Geo
from .enums import GenreMusic
from .serializers import Serialize
__all__ = ("Serialize", "Shazam", "Geo", "GenreMusic")

View File

@@ -1,5 +1,6 @@
from copy import copy
from typing import List, Optional, Any
from typing import Any, List, Optional
import numpy as np
from .enums import FrequencyBand
@@ -40,7 +41,9 @@ class SignatureGenerator:
# Used when processing input:
self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0)
self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(
buffer_size=2048, default_value=0
)
self.fft_outputs: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0.0 * 1025]
@@ -91,12 +94,15 @@ class SignatureGenerator:
self.next_signature.number_samples / self.next_signature.sample_rate_hz
< self.MAX_TIME_SECONDS
or sum(
len(peaks) for peaks in self.next_signature.frequency_band_to_sound_peaks.values()
len(peaks)
for peaks in self.next_signature.frequency_band_to_sound_peaks.values()
)
< self.MAX_PEAKS
):
self.process_input(
self.input_pending_processing[self.samples_processed : self.samples_processed + 128]
self.input_pending_processing[
self.samples_processed : self.samples_processed + 128
]
)
self.samples_processed += 128
@@ -107,7 +113,9 @@ class SignatureGenerator:
self.next_signature.number_samples = 0
self.next_signature.frequency_band_to_sound_peaks = {}
self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0)
self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(
buffer_size=2048, default_value=0
)
self.fft_outputs: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0.0 * 1025]
)
@@ -124,7 +132,9 @@ class SignatureGenerator:
self.do_peak_spreading_and_recognition()
def do_fft(self, batch_of_128_s16le_mono_samples):
type_ring = self.ring_buffer_of_samples.position + len(batch_of_128_s16le_mono_samples)
type_ring = self.ring_buffer_of_samples.position + len(
batch_of_128_s16le_mono_samples
)
self.ring_buffer_of_samples[
self.ring_buffer_of_samples.position : type_ring
] = batch_of_128_s16le_mono_samples
@@ -159,10 +169,13 @@ class SignatureGenerator:
temporary_array_1[1] = np.roll(temporary_array_1[1], -1)
temporary_array_1[2] = np.roll(temporary_array_1[2], -2)
origin_last_fft_np = np.hstack([temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]])
origin_last_fft_np = np.hstack(
[temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]]
)
i1, i2, i3 = [
(self.spread_fft_output.position + former_fft_num) % self.spread_fft_output.buffer_size
(self.spread_fft_output.position + former_fft_num)
% self.spread_fft_output.buffer_size
for former_fft_num in [-1, -3, -6]
]
@@ -234,27 +247,38 @@ class SignatureGenerator:
fft_number = self.spread_fft_output.num_written - 46
peak_magnitude = (
np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3 + 6144
np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3
+ 6144
)
peak_magnitude_before = (
np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3 + 6144
np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3
+ 6144
)
peak_magnitude_after = (
np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3 + 6144
np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3
+ 6144
)
peak_variation_1 = (
peak_magnitude * 2 - peak_magnitude_before - peak_magnitude_after
peak_magnitude * 2
- peak_magnitude_before
- peak_magnitude_after
)
peak_variation_2 = (
(peak_magnitude_after - peak_magnitude_before) * 32 / peak_variation_1
(peak_magnitude_after - peak_magnitude_before)
* 32
/ peak_variation_1
)
corrected_peak_frequency_bin = bin_position * 64 + peak_variation_2
corrected_peak_frequency_bin = (
bin_position * 64 + peak_variation_2
)
assert peak_variation_1 > 0
frequency_hz = corrected_peak_frequency_bin * (16000 / 2 / 1024 / 64)
frequency_hz = corrected_peak_frequency_bin * (
16000 / 2 / 1024 / 64
)
if 250 < frequency_hz < 520:
band = FrequencyBand.hz_250_520
@@ -267,7 +291,10 @@ class SignatureGenerator:
else:
continue
if band not in self.next_signature.frequency_band_to_sound_peaks:
if (
band
not in self.next_signature.frequency_band_to_sound_peaks
):
self.next_signature.frequency_band_to_sound_peaks[band] = []
self.next_signature.frequency_band_to_sound_peaks[band].append(

View File

@@ -1,21 +1,17 @@
import pathlib
import uuid
import time
from typing import Optional
import uuid
from typing import Any, Dict, Optional, Union
from pydub import AudioSegment
from typing import Dict, Any, Union
from .misc import Request
from .misc import ShazamUrl
from .converter import Converter, Geo
from .enums import GenreMusic
from .misc import Request, ShazamUrl
from .schemas.artists import ArtistQuery
from .signature import DecodedMessage
from .enums import GenreMusic
from .converter import Converter, Geo
from .typehints import CountryCode
from .utils import ArtistQueryGenerator
from .utils import get_song
from .utils import ArtistQueryGenerator, get_song
class Shazam(Converter, Geo, Request):
@@ -27,7 +23,9 @@ class Shazam(Converter, Geo, Request):
self.language = language
self.endpoint_country = endpoint_country
async def top_world_tracks(self, limit: int = 200, offset: int = 0) -> Dict[str, Any]:
async def top_world_tracks(
self, limit: int = 200, offset: int = 0
) -> Dict[str, Any]:
"""
Search top world tracks
@@ -292,7 +290,9 @@ class Shazam(Converter, Geo, Request):
headers=self.headers(),
)
async def search_track(self, query: str, limit: int = 10, offset: int = 0) -> Dict[str, Any]:
async def search_track(
self, query: str, limit: int = 10, offset: int = 0
) -> Dict[str, Any]:
"""
Search all tracks by prefix
:param query: Track full title or prefix title

View File

@@ -1,5 +1,4 @@
import aiohttp
from shazamio.exceptions import BadMethod
from shazamio.utils import validate_json

View File

@@ -60,5 +60,7 @@ class Converter:
signature_generator.feed_input(audio.get_array_of_samples())
signature_generator.MAX_TIME_SECONDS = 12
if audio.duration_seconds > 12 * 3:
signature_generator.samples_processed += 16000 * (int(audio.duration_seconds / 2) - 6)
signature_generator.samples_processed += 16000 * (
int(audio.duration_seconds / 2) - 6
)
return signature_generator

View File

@@ -1,22 +1,19 @@
from dataclass_factory import Factory
from shazamio.factory import FactorySchemas
from shazamio.schemas.artists import ArtistInfo
from shazamio.schemas.artists import ArtistV3
from shazamio.schemas.artists import ArtistInfo, ArtistV3
from shazamio.schemas.attributes import ArtistAttribute
from shazamio.schemas.models import (
SongSection,
VideoSection,
RelatedSection,
LyricsSection,
BeaconDataLyricsSection,
ArtistSection,
BeaconDataLyricsSection,
LyricsSection,
MatchModel,
RelatedSection,
ResponseTrack,
SongSection,
TrackInfo,
VideoSection,
YoutubeData,
)
from shazamio.schemas.models import TrackInfo
from shazamio.schemas.models import YoutubeData
from shazamio.schemas.models import ResponseTrack
FACTORY_TRACK = Factory(
schemas={

View File

@@ -1,4 +1,5 @@
from random import choice
from shazamio.user_agent import USER_AGENTS
@@ -47,9 +48,7 @@ class ShazamUrl:
)
LISTENING_COUNTER = "https://www.shazam.com/services/count/v2/web/track/{}"
SEARCH_ARTIST_V2 = (
"https://www.shazam.com/services/amapi/v1/catalog/{endpoint_country}/artists/{artist_id}"
)
SEARCH_ARTIST_V2 = "https://www.shazam.com/services/amapi/v1/catalog/{endpoint_country}/artists/{artist_id}"
class Request:

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel

View File

@@ -3,10 +3,8 @@ from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseHrefNextData
from shazamio.schemas.base import BaseIdTypeHref
from shazamio.schemas.base import BaseHrefNextData, BaseIdTypeHref
from shazamio.schemas.photos import ImageModel

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.artist.views.top_music import PlayParams
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel

View File

@@ -1,20 +1,14 @@
from dataclasses import dataclass
from dataclasses import field
from typing import List
from typing import Optional
from typing import Union
from pydantic import BaseModel
from pydantic import Field
from dataclasses import dataclass, field
from typing import List, Optional, Union
from pydantic import BaseModel, Field
from shazamio.schemas.artist.views.full_albums import FullAlbumsModel
from shazamio.schemas.artist.views.last_release import LastReleaseModel
from shazamio.schemas.artist.views.simular_artists import SimularArtist
from shazamio.schemas.artist.views.top_music import TopMusicVideosView
from shazamio.schemas.artist.views.top_song import TopSong
from shazamio.schemas.attributes import ArtistAttribute
from shazamio.schemas.enums import ArtistExtend
from shazamio.schemas.enums import ArtistView
from shazamio.schemas.enums import ArtistExtend, ArtistView
from shazamio.schemas.errors import ErrorModel
@@ -80,7 +74,9 @@ class ArtistRelationships(BaseModel):
class ArtistViews(BaseModel):
top_music_videos: Optional[TopMusicVideosView] = Field(None, alias="top-music-videos")
top_music_videos: Optional[TopMusicVideosView] = Field(
None, alias="top-music-videos"
)
simular_artists: Optional[SimularArtist] = Field(None, alias="similar-artists")
latest_release: Optional[LastReleaseModel] = Field(None, alias="latest-release")
full_albums: Optional[FullAlbumsModel] = Field(None, alias="full-albums")

View File

@@ -1,8 +1,6 @@
from typing import List
from typing import Optional
from typing import List, Optional
from pydantic import BaseModel
from pydantic import Field
from pydantic import BaseModel, Field
class AttributeName(BaseModel):

View File

@@ -1,11 +1,8 @@
from typing import Generic
from typing import Optional
from typing import TypeVar
from typing import Generic, Optional, TypeVar
from pydantic import BaseModel
from pydantic.generics import GenericModel
T = TypeVar("T", bound=BaseModel)

Some files were not shown because too many files have changed in this diff Show More