diff --git a/anymusicbot.py b/anymusicbot.py
index ed777dd..52cea9f 100644
--- a/anymusicbot.py
+++ b/anymusicbot.py
@@ -1,4 +1,4 @@
from bot import main
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/bot/__init__.py b/bot/__init__.py
index 6cb621c..be3cdcb 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -33,8 +33,8 @@ def main():
plugins()
- print('Starting...')
+ print("Starting...")
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(runner())
- print('[red]Stopped.[/]')
+ print("[red]Stopped.[/]")
diff --git a/bot/__main__.py b/bot/__main__.py
index c7c70d0..868d99e 100644
--- a/bot/__main__.py
+++ b/bot/__main__.py
@@ -1,4 +1,4 @@
from . import main
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/bot/callbacks/full_menu.py b/bot/callbacks/full_menu.py
index fcacee9..e9c9ede 100644
--- a/bot/callbacks/full_menu.py
+++ b/bot/callbacks/full_menu.py
@@ -1,7 +1,5 @@
from aiogram import Router, F, Bot
-from aiogram.types import (
- CallbackQuery
-)
+from aiogram.types import CallbackQuery
from bot.factories.full_menu import FullMenuCallback
@@ -10,10 +8,10 @@ from bot.keyboards.inline.settings import get_settings_kb
router = Router()
-@router.callback_query(FullMenuCallback.filter(F.action == 'settings'))
+@router.callback_query(FullMenuCallback.filter(F.action == "settings"))
async def on_settings(callback_query: CallbackQuery, bot: Bot):
await bot.edit_message_text(
inline_message_id=callback_query.inline_message_id,
- text='⚙️ Settings',
- reply_markup=get_settings_kb()
+ text="⚙️ Settings",
+ reply_markup=get_settings_kb(),
)
diff --git a/bot/callbacks/on_home.py b/bot/callbacks/on_home.py
index a97ae70..e699cb8 100644
--- a/bot/callbacks/on_home.py
+++ b/bot/callbacks/on_home.py
@@ -1,7 +1,5 @@
from aiogram import Router, F, Bot
-from aiogram.types import (
- CallbackQuery
-)
+from aiogram.types import CallbackQuery
from bot.factories.full_menu import FullMenuCallback
@@ -10,10 +8,10 @@ from bot.keyboards.inline.full_menu import get_full_menu_kb
router = Router()
-@router.callback_query(FullMenuCallback.filter(F.action == 'home'))
+@router.callback_query(FullMenuCallback.filter(F.action == "home"))
async def on_home(callback_query: CallbackQuery, bot: Bot):
await bot.edit_message_text(
inline_message_id=callback_query.inline_message_id,
- text='⚙️ Menu',
- reply_markup=get_full_menu_kb()
+ text="⚙️ Menu",
+ reply_markup=get_full_menu_kb(),
)
diff --git a/bot/callbacks/settings.py b/bot/callbacks/settings.py
index f940d1a..0029c4b 100644
--- a/bot/callbacks/settings.py
+++ b/bot/callbacks/settings.py
@@ -1,7 +1,5 @@
from aiogram import Router, Bot
-from aiogram.types import (
- CallbackQuery
-)
+from aiogram.types import CallbackQuery
from aiogram.exceptions import TelegramBadRequest
from bot.factories.open_setting import OpenSettingCallback, SettingChoiceCallback
@@ -14,25 +12,20 @@ router = Router()
@router.callback_query(OpenSettingCallback.filter())
async def on_settings(
- callback_query: CallbackQuery,
- callback_data: OpenSettingCallback,
- bot: Bot
+ callback_query: CallbackQuery, callback_data: OpenSettingCallback, bot: Bot
):
await bot.edit_message_text(
inline_message_id=callback_query.inline_message_id,
text=settings_strings[callback_data.s_id].description,
reply_markup=get_setting_kb(
- callback_data.s_id,
- str(callback_query.from_user.id)
- )
+ callback_data.s_id, str(callback_query.from_user.id)
+ ),
)
@router.callback_query(SettingChoiceCallback.filter())
async def on_change_setting(
- callback_query: CallbackQuery,
- callback_data: SettingChoiceCallback,
- bot: Bot
+ callback_query: CallbackQuery, callback_data: SettingChoiceCallback, bot: Bot
):
UserSettings(callback_query.from_user.id)[callback_data.s_id] = callback_data.choice
try:
@@ -40,9 +33,8 @@ async def on_change_setting(
inline_message_id=callback_query.inline_message_id,
text=settings_strings[callback_data.s_id].description,
reply_markup=get_setting_kb(
- callback_data.s_id,
- str(callback_query.from_user.id)
- )
+ callback_data.s_id, str(callback_query.from_user.id)
+ ),
)
except TelegramBadRequest:
pass
diff --git a/bot/common.py b/bot/common.py
index 572f0a2..4bc42f6 100644
--- a/bot/common.py
+++ b/bot/common.py
@@ -8,4 +8,4 @@ dp = Dispatcher(storage=InDbStorage())
console = Console()
-__all__ = ['bot', 'dp', 'config', 'console']
+__all__ = ["bot", "dp", "config", "console"]
diff --git a/bot/factories/full_menu.py b/bot/factories/full_menu.py
index ecb577f..edcbd86 100644
--- a/bot/factories/full_menu.py
+++ b/bot/factories/full_menu.py
@@ -2,5 +2,5 @@ from typing import Literal
from aiogram.filters.callback_data import CallbackData
-class FullMenuCallback(CallbackData, prefix='full_menu'):
- action: Literal['home', 'settings']
+class FullMenuCallback(CallbackData, prefix="full_menu"):
+ action: Literal["home", "settings"]
diff --git a/bot/factories/open_setting.py b/bot/factories/open_setting.py
index 9793c93..3a2fa7b 100644
--- a/bot/factories/open_setting.py
+++ b/bot/factories/open_setting.py
@@ -1,10 +1,10 @@
from aiogram.filters.callback_data import CallbackData
-class OpenSettingCallback(CallbackData, prefix='setting'):
+class OpenSettingCallback(CallbackData, prefix="setting"):
s_id: str
-class SettingChoiceCallback(CallbackData, prefix='s_choice'):
+class SettingChoiceCallback(CallbackData, prefix="s_choice"):
s_id: str
choice: str
diff --git a/bot/filters/search.py b/bot/filters/search.py
index 86d18fb..5405d04 100644
--- a/bot/filters/search.py
+++ b/bot/filters/search.py
@@ -4,22 +4,21 @@ from aiogram.types import InlineQuery
class ServiceSearchFilter(BaseFilter):
def __init__(self, service_letter: str):
- self.service_letter = f'{service_letter}:'
+ self.service_letter = f"{service_letter}:"
async def __call__(self, inline_query: InlineQuery):
return (
- inline_query.query.startswith(self.service_letter) and
- inline_query.query != self.service_letter
+ inline_query.query.startswith(self.service_letter)
+ and inline_query.query != self.service_letter
)
class ServiceSearchMultiletterFilter(BaseFilter):
def __init__(self, service_lettes: list[str]):
- self.service_letter = [f'{letter}:' for letter in service_lettes]
+ self.service_letter = [f"{letter}:" for letter in service_lettes]
async def __call__(self, inline_query: InlineQuery):
return (
- any(inline_query.query.startswith(letter) for letter in
- self.service_letter) and
- inline_query.query not in self.service_letter
+ any(inline_query.query.startswith(letter) for letter in self.service_letter)
+ and inline_query.query not in self.service_letter
)
diff --git a/bot/filters/url.py b/bot/filters/url.py
index d2c2755..4ee9034 100644
--- a/bot/filters/url.py
+++ b/bot/filters/url.py
@@ -9,24 +9,21 @@ class MusicUrlFilter(BaseFilter):
pass
async def __call__(self, inline_query: InlineQuery):
- if not inline_query.query.strip().startswith('http'):
+ if not inline_query.query.strip().startswith("http"):
return False
url = urlparse(inline_query.query)
- return (
- url.scheme in ['http', 'https'] and
- any(
- map(
- url.netloc.endswith,
- [
- 'youtube.com',
- 'youtu.be',
- 'open.spotify.com',
- 'spotify.link',
- 'deezer.page.link',
- 'deezer.com',
- 'soundcloud.com'
- ]
- )
- )
+ return url.scheme in ["http", "https"] and any(
+ map(
+ url.netloc.endswith,
+ [
+ "youtube.com",
+ "youtu.be",
+ "open.spotify.com",
+ "spotify.link",
+ "deezer.page.link",
+ "deezer.com",
+ "soundcloud.com",
+ ],
+ )
)
diff --git a/bot/handlers/initialize/initializer.py b/bot/handlers/initialize/initializer.py
index 629ce9e..306a125 100644
--- a/bot/handlers/initialize/initializer.py
+++ b/bot/handlers/initialize/initializer.py
@@ -7,4 +7,4 @@ router = Router()
@router.startup()
async def startup(bot: Bot):
- print(f'[green]Started as[/] @{(await bot.me()).username}')
+ print(f"[green]Started as[/] @{(await bot.me()).username}")
diff --git a/bot/handlers/inline_default/on_inline_default.py b/bot/handlers/inline_default/on_inline_default.py
index 924d286..221b639 100644
--- a/bot/handlers/inline_default/on_inline_default.py
+++ b/bot/handlers/inline_default/on_inline_default.py
@@ -12,15 +12,15 @@ from bot.modules.settings import UserSettings
router = Router()
-@router.inline_query(F.query != '')
+@router.inline_query(F.query != "")
async def default_inline_query(inline_query: InlineQuery, settings: UserSettings):
await inline_query.answer(
await {
- 'd': get_deezer_search_results,
- 'c': get_soundcloud_search_results,
- 'y': get_youtube_search_results,
- 's': get_spotify_search_results
- }[settings['default_search_provider'].value](inline_query.query, settings),
+ "d": get_deezer_search_results,
+ "c": get_soundcloud_search_results,
+ "y": get_youtube_search_results,
+ "s": get_spotify_search_results,
+ }[settings["default_search_provider"].value](inline_query.query, settings),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/inline_empty/on_inline_empty.py b/bot/handlers/inline_empty/on_inline_empty.py
index f745e2f..d3a9e25 100644
--- a/bot/handlers/inline_empty/on_inline_empty.py
+++ b/bot/handlers/inline_empty/on_inline_empty.py
@@ -1,24 +1,21 @@
from aiogram import Router, F
-from aiogram.types import (
- InlineQuery, InputTextMessageContent, InlineQueryResultArticle
-)
+from aiogram.types import InlineQuery, InputTextMessageContent, InlineQueryResultArticle
from bot.keyboards.inline.full_menu import get_full_menu_kb
router = Router()
-@router.inline_query(F.query == '')
+@router.inline_query(F.query == "")
async def empty_inline_query(inline_query: InlineQuery):
await inline_query.answer(
[
InlineQueryResultArticle(
- id='show_menu',
- title='⚙️ Open menu',
- input_message_content=InputTextMessageContent(
- message_text='⚙️ Menu'
- ),
- reply_markup=get_full_menu_kb()
+ id="show_menu",
+ title="⚙️ Open menu",
+ input_message_content=InputTextMessageContent(message_text="⚙️ Menu"),
+ reply_markup=get_full_menu_kb(),
)
- ], cache_time=0
+ ],
+ cache_time=0,
)
diff --git a/bot/handlers/inline_error/on_inline_error_info.py b/bot/handlers/inline_error/on_inline_error_info.py
index 2626e4f..b156997 100644
--- a/bot/handlers/inline_error/on_inline_error_info.py
+++ b/bot/handlers/inline_error/on_inline_error_info.py
@@ -8,10 +8,10 @@ from bot.filters import ServiceSearchFilter
router = Router()
-@router.inline_query(ServiceSearchFilter('error'))
+@router.inline_query(ServiceSearchFilter("error"))
async def search_spotify_inline_query(inline_query: InlineQuery):
await inline_query.answer(
- await get_error_search_results(inline_query.query.removeprefix('error:')),
+ await get_error_search_results(inline_query.query.removeprefix("error:")),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/inline_song/__init__.py b/bot/handlers/inline_song/__init__.py
index 7fddc2e..df416bd 100644
--- a/bot/handlers/inline_song/__init__.py
+++ b/bot/handlers/inline_song/__init__.py
@@ -1,12 +1,16 @@
from aiogram import Router
-from . import (on_inline_spotify, on_inline_deezer, on_inline_youtube,
- on_inline_soundcloud)
+from . import (
+ on_inline_spotify,
+ on_inline_deezer,
+ on_inline_youtube,
+ on_inline_soundcloud,
+)
router = Router()
router.include_routers(
on_inline_spotify.router,
on_inline_deezer.router,
on_inline_youtube.router,
- on_inline_soundcloud.router
+ on_inline_soundcloud.router,
)
diff --git a/bot/handlers/inline_song/on_inline_deezer.py b/bot/handlers/inline_song/on_inline_deezer.py
index 12a9298..4ff13c6 100644
--- a/bot/handlers/inline_song/on_inline_deezer.py
+++ b/bot/handlers/inline_song/on_inline_deezer.py
@@ -9,13 +9,12 @@ from bot.modules.settings import UserSettings
router = Router()
-@router.inline_query(ServiceSearchFilter('d'))
+@router.inline_query(ServiceSearchFilter("d"))
async def search_deezer_inline_query(inline_query: InlineQuery, settings: UserSettings):
await inline_query.answer(
await get_deezer_search_results(
- inline_query.query.removeprefix('d:'),
- settings
+ inline_query.query.removeprefix("d:"), settings
),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/inline_song/on_inline_soundcloud.py b/bot/handlers/inline_song/on_inline_soundcloud.py
index 2618cc2..22f0596 100644
--- a/bot/handlers/inline_song/on_inline_soundcloud.py
+++ b/bot/handlers/inline_song/on_inline_soundcloud.py
@@ -9,16 +9,14 @@ from bot.modules.settings import UserSettings
router = Router()
-@router.inline_query(ServiceSearchMultiletterFilter(['c', 'с']))
+@router.inline_query(ServiceSearchMultiletterFilter(["c", "с"]))
async def search_soundcloud_inline_query(
- inline_query: InlineQuery,
- settings: UserSettings
+ inline_query: InlineQuery, settings: UserSettings
):
await inline_query.answer(
await get_soundcloud_search_results(
- inline_query.query.removeprefix('c:').removesuffix('с:'),
- settings
+ inline_query.query.removeprefix("c:").removesuffix("с:"), settings
),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/inline_song/on_inline_spotify.py b/bot/handlers/inline_song/on_inline_spotify.py
index 1d54e2f..5122c9e 100644
--- a/bot/handlers/inline_song/on_inline_spotify.py
+++ b/bot/handlers/inline_song/on_inline_spotify.py
@@ -9,14 +9,14 @@ from bot.modules.settings import UserSettings
router = Router()
-@router.inline_query(ServiceSearchFilter('s'))
+@router.inline_query(ServiceSearchFilter("s"))
async def search_spotify_inline_query(
- inline_query: InlineQuery,
- settings: UserSettings
+ inline_query: InlineQuery, settings: UserSettings
):
await inline_query.answer(
- await get_spotify_search_results(inline_query.query.removeprefix('s:'),
- settings),
+ await get_spotify_search_results(
+ inline_query.query.removeprefix("s:"), settings
+ ),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/inline_song/on_inline_youtube.py b/bot/handlers/inline_song/on_inline_youtube.py
index 6271663..2b7a785 100644
--- a/bot/handlers/inline_song/on_inline_youtube.py
+++ b/bot/handlers/inline_song/on_inline_youtube.py
@@ -9,12 +9,14 @@ from bot.modules.settings import UserSettings
router = Router()
-@router.inline_query(ServiceSearchFilter('y'))
-async def search_youtube_inline_query(inline_query: InlineQuery,
- settings: UserSettings):
+@router.inline_query(ServiceSearchFilter("y"))
+async def search_youtube_inline_query(
+ inline_query: InlineQuery, settings: UserSettings
+):
await inline_query.answer(
- await get_youtube_search_results(inline_query.query.removeprefix('y:'),
- settings),
+ await get_youtube_search_results(
+ inline_query.query.removeprefix("y:"), settings
+ ),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/inline_url/on_inline_url.py b/bot/handlers/inline_url/on_inline_url.py
index 4707347..2c3ace4 100644
--- a/bot/handlers/inline_url/on_inline_url.py
+++ b/bot/handlers/inline_url/on_inline_url.py
@@ -14,5 +14,5 @@ async def url_deezer_inline_query(inline_query: InlineQuery, settings: UserSetti
await inline_query.answer(
await get_url_results(inline_query.query, settings),
cache_time=0,
- is_personal=True
+ is_personal=True,
)
diff --git a/bot/handlers/on_chosen/__init__.py b/bot/handlers/on_chosen/__init__.py
index 4d1ec12..156f09b 100644
--- a/bot/handlers/on_chosen/__init__.py
+++ b/bot/handlers/on_chosen/__init__.py
@@ -12,4 +12,4 @@ router.include_routers(
suppress_verify.router,
)
-__all__ = ['router']
+__all__ = ["router"]
diff --git a/bot/handlers/on_chosen/deezer.py b/bot/handlers/on_chosen/deezer.py
index a31821a..0ceb490 100644
--- a/bot/handlers/on_chosen/deezer.py
+++ b/bot/handlers/on_chosen/deezer.py
@@ -1,6 +1,8 @@
from aiogram import Router, Bot, F
from aiogram.types import (
- BufferedInputFile, URLInputFile, InputMediaAudio,
+ BufferedInputFile,
+ URLInputFile,
+ InputMediaAudio,
ChosenInlineResult,
)
@@ -11,11 +13,11 @@ from bot.modules.database import db
router = Router()
-@router.chosen_inline_result(F.result_id.startswith('deez::'))
+@router.chosen_inline_result(F.result_id.startswith("deez::"))
async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
- bytestream: DeezerBytestream = await (await deezer.downloader.from_id(
- chosen_result.result_id.removeprefix('deez::')
- )).to_bytestream()
+ bytestream: DeezerBytestream = await (
+ await deezer.downloader.from_id(chosen_result.result_id.removeprefix("deez::"))
+ ).to_bytestream()
audio = await bot.send_audio(
chat_id=config.telegram.files_chat,
@@ -34,5 +36,5 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
- reply_markup=None
+ reply_markup=None,
)
diff --git a/bot/handlers/on_chosen/recode_cached.py b/bot/handlers/on_chosen/recode_cached.py
index d693a2e..29eba26 100644
--- a/bot/handlers/on_chosen/recode_cached.py
+++ b/bot/handlers/on_chosen/recode_cached.py
@@ -1,6 +1,7 @@
from aiogram import Router, Bot, F
from aiogram.types import (
- BufferedInputFile, InputMediaAudio,
+ BufferedInputFile,
+ InputMediaAudio,
ChosenInlineResult,
)
@@ -16,42 +17,37 @@ router = Router()
@router.chosen_inline_result(
- F.result_id.startswith('spotc::') | F.result_id.startswith('ytc::')
+ F.result_id.startswith("spotc::") | F.result_id.startswith("ytc::")
)
-async def on_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot,
- settings: UserSettings):
- if settings['recode_youtube'].value != 'yes':
+async def on_cached_chosen(
+ chosen_result: ChosenInlineResult, bot: Bot, settings: UserSettings
+):
+ if settings["recode_youtube"].value != "yes":
await bot.edit_message_reply_markup(
- inline_message_id=chosen_result.inline_message_id,
- reply_markup=None
+ inline_message_id=chosen_result.inline_message_id, reply_markup=None
)
return
- if (
- type(
- db.recoded.get(
- song_id := chosen_result.result_id
- .removeprefix('spotc::')
- .removeprefix('ytc::')
+ if type(
+ db.recoded.get(
+ song_id := chosen_result.result_id.removeprefix("spotc::").removeprefix(
+ "ytc::"
)
- ) in [bool, type(None)]
- ):
+ )
+ ) in [bool, type(None)]:
await bot.edit_message_reply_markup(
- inline_message_id=chosen_result.inline_message_id,
- reply_markup=None
+ inline_message_id=chosen_result.inline_message_id, reply_markup=None
)
return
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🔄 Recoding...',
- reply_markup=None
+ caption="🔄 Recoding...",
+ reply_markup=None,
)
message = await bot.forward_message(
- config.telegram.files_chat,
- config.telegram.files_chat,
- db.recoded[song_id]
+ config.telegram.files_chat, config.telegram.files_chat, db.recoded[song_id]
)
song_io: BytesIO = await bot.download( # type: ignore
@@ -76,7 +72,7 @@ async def on_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot,
),
thumbnail=BufferedInputFile(
file=(await bot.download(message.audio.thumbnail.file_id)).read(),
- filename='thumbnail.jpg'
+ filename="thumbnail.jpg",
),
performer=message.audio.performer,
title=message.audio.title,
@@ -85,15 +81,15 @@ async def on_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot,
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='',
+ caption="",
reply_markup=None,
)
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
- media=InputMediaAudio(media=audio.audio.file_id)
+ media=InputMediaAudio(media=audio.audio.file_id),
)
- if chosen_result.result_id.startswith('spotc::'):
+ if chosen_result.result_id.startswith("spotc::"):
db.spotify[song_id] = audio.audio.file_id
else:
db.youtube[song_id] = audio.audio.file_id
diff --git a/bot/handlers/on_chosen/soundcloud.py b/bot/handlers/on_chosen/soundcloud.py
index fc66b68..8021be3 100644
--- a/bot/handlers/on_chosen/soundcloud.py
+++ b/bot/handlers/on_chosen/soundcloud.py
@@ -1,6 +1,8 @@
from aiogram import Router, Bot, F
from aiogram.types import (
- BufferedInputFile, URLInputFile, InputMediaAudio,
+ BufferedInputFile,
+ URLInputFile,
+ InputMediaAudio,
ChosenInlineResult,
)
@@ -11,11 +13,13 @@ from bot.modules.database import db
router = Router()
-@router.chosen_inline_result(F.result_id.startswith('sc::'))
+@router.chosen_inline_result(F.result_id.startswith("sc::"))
async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
- bytestream: SoundCloudBytestream = await (await soundcloud.downloader.from_id(
- chosen_result.result_id.removeprefix('sc::')
- )).to_bytestream()
+ bytestream: SoundCloudBytestream = await (
+ await soundcloud.downloader.from_id(
+ chosen_result.result_id.removeprefix("sc::")
+ )
+ ).to_bytestream()
audio = await bot.send_audio(
chat_id=config.telegram.files_chat,
@@ -33,5 +37,5 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot):
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
- reply_markup=None
+ reply_markup=None,
)
diff --git a/bot/handlers/on_chosen/spotify.py b/bot/handlers/on_chosen/spotify.py
index 1155579..7c42cf2 100644
--- a/bot/handlers/on_chosen/spotify.py
+++ b/bot/handlers/on_chosen/spotify.py
@@ -1,6 +1,8 @@
from aiogram import Router, Bot, F
from aiogram.types import (
- BufferedInputFile, URLInputFile, InputMediaAudio,
+ BufferedInputFile,
+ URLInputFile,
+ InputMediaAudio,
ChosenInlineResult,
)
@@ -16,16 +18,17 @@ router = Router()
def not_strict_name(song, yt_song):
- if 'feat' in yt_song.name.lower():
+ if "feat" in yt_song.name.lower():
return any(artist.lower() in yt_song.name.lower() for artist in song.artists)
else:
return False
-@router.chosen_inline_result(F.result_id.startswith('spot::'))
-async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
- settings: UserSettings):
- song = spotify.songs.from_id(chosen_result.result_id.removeprefix('spot::'))
+@router.chosen_inline_result(F.result_id.startswith("spot::"))
+async def on_new_chosen(
+ chosen_result: ChosenInlineResult, bot: Bot, settings: UserSettings
+):
+ song = spotify.songs.from_id(chosen_result.result_id.removeprefix("spot::"))
bytestream = None
audio = None
@@ -34,14 +37,15 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
song.full_name,
exact_match=True,
)
- if settings['exact_spotify_search'].value == 'yes':
- if ((song.all_artists != yt_song.all_artists or song.name != yt_song.name)
- and not not_strict_name(song, yt_song)):
+ if settings["exact_spotify_search"].value == "yes":
+ if (
+ song.all_artists != yt_song.all_artists or song.name != yt_song.name
+ ) and not not_strict_name(song, yt_song):
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🙄 Cannot find this song on YouTube, trying Deezer...',
+ caption="🙄 Cannot find this song on YouTube, trying Deezer...",
reply_markup=None,
- parse_mode='HTML',
+ parse_mode="HTML",
)
yt_song = None
bytestream = False
@@ -66,9 +70,9 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
except AgeRestrictedError:
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🔞 This song is age restricted, trying Deezer...',
+ caption="🔞 This song is age restricted, trying Deezer...",
reply_markup=None,
- parse_mode='HTML',
+ parse_mode="HTML",
)
yt_song = None
@@ -99,29 +103,29 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
assert e
if audio:
- if settings['exact_spotify_search'].value == 'yes':
+ if settings["exact_spotify_search"].value == "yes":
db.spotify[song.id] = audio.audio.file_id
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
- reply_markup=None
+ reply_markup=None,
)
else:
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🤷♂️ Cannot download this song',
+ caption="🤷♂️ Cannot download this song",
reply_markup=None,
- parse_mode='HTML',
+ parse_mode="HTML",
)
- if yt_song and settings['recode_youtube'].value == 'yes':
+ if yt_song and settings["recode_youtube"].value == "yes":
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🔄 Recoding...',
+ caption="🔄 Recoding...",
reply_markup=None,
- parse_mode='HTML',
+ parse_mode="HTML",
)
await bytestream.rerender()
@@ -139,20 +143,20 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
db.youtube[yt_song.id] = audio.audio.file_id
db.recoded[yt_song.id] = True
- if settings['exact_spotify_search'].value == 'yes':
+ if settings["exact_spotify_search"].value == "yes":
db.spotify[song.id] = audio.audio.file_id
db.recoded[song.id] = True
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='',
+ caption="",
reply_markup=None,
)
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
- media=InputMediaAudio(media=audio.audio.file_id)
+ media=InputMediaAudio(media=audio.audio.file_id),
)
- elif yt_song and settings['recode_youtube'].value == 'no':
+ elif yt_song and settings["recode_youtube"].value == "no":
db.recoded[yt_song.id] = audio.message_id
- if settings['exact_spotify_search'].value == 'yes':
+ if settings["exact_spotify_search"].value == "yes":
db.recoded[song.id] = audio.message_id
diff --git a/bot/handlers/on_chosen/suppress_verify.py b/bot/handlers/on_chosen/suppress_verify.py
index b073544..ef20ac2 100644
--- a/bot/handlers/on_chosen/suppress_verify.py
+++ b/bot/handlers/on_chosen/suppress_verify.py
@@ -7,10 +7,9 @@ router = Router()
@router.chosen_inline_result(
- F.result_id.startswith('deezc::') | F.result_id.startswith('scc::')
+ F.result_id.startswith("deezc::") | F.result_id.startswith("scc::")
)
async def on_unneeded_cached_chosen(chosen_result: ChosenInlineResult, bot: Bot):
await bot.edit_message_reply_markup(
- inline_message_id=chosen_result.inline_message_id,
- reply_markup=None
+ inline_message_id=chosen_result.inline_message_id, reply_markup=None
)
diff --git a/bot/handlers/on_chosen/youtube.py b/bot/handlers/on_chosen/youtube.py
index 444bdaa..57bdad3 100644
--- a/bot/handlers/on_chosen/youtube.py
+++ b/bot/handlers/on_chosen/youtube.py
@@ -1,6 +1,8 @@
from aiogram import Router, Bot, F
from aiogram.types import (
- BufferedInputFile, URLInputFile, InputMediaAudio,
+ BufferedInputFile,
+ URLInputFile,
+ InputMediaAudio,
ChosenInlineResult,
)
@@ -12,19 +14,20 @@ from bot.modules.settings import UserSettings
router = Router()
-@router.chosen_inline_result(F.result_id.startswith('yt::'))
-async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
- settings: UserSettings):
- song = youtube.songs.from_id(chosen_result.result_id.removeprefix('yt::'))
+@router.chosen_inline_result(F.result_id.startswith("yt::"))
+async def on_new_chosen(
+ chosen_result: ChosenInlineResult, bot: Bot, settings: UserSettings
+):
+ song = youtube.songs.from_id(chosen_result.result_id.removeprefix("yt::"))
try:
bytestream = await song.to_bytestream()
except AgeRestrictedError:
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🔞 This song is age restricted, so I can\'t download it. '
- 'Try downloading it from Deezer or SoundCloud',
- reply_markup=None
+ caption="🔞 This song is age restricted, so I can't download it. "
+ "Try downloading it from Deezer or SoundCloud",
+ reply_markup=None,
)
return
@@ -45,14 +48,14 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
- reply_markup=None
+ reply_markup=None,
)
- if settings['recode_youtube'].value == 'yes':
+ if settings["recode_youtube"].value == "yes":
await bot.edit_message_caption(
inline_message_id=chosen_result.inline_message_id,
- caption='🔄 Recoding...',
- reply_markup=None
+ caption="🔄 Recoding...",
+ reply_markup=None,
)
await bytestream.rerender()
@@ -75,7 +78,7 @@ async def on_new_chosen(chosen_result: ChosenInlineResult, bot: Bot,
await bot.edit_message_media(
inline_message_id=chosen_result.inline_message_id,
media=InputMediaAudio(media=audio.audio.file_id),
- reply_markup=None
+ reply_markup=None,
)
else:
db.recoded[song.id] = audio.message_id
diff --git a/bot/keyboards/inline/full_menu.py b/bot/keyboards/inline/full_menu.py
index 639c268..9e259ca 100644
--- a/bot/keyboards/inline/full_menu.py
+++ b/bot/keyboards/inline/full_menu.py
@@ -1,27 +1,23 @@
-from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
- InlineKeyboardBuilder)
+from aiogram.utils.keyboard import (
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+ InlineKeyboardBuilder,
+)
from bot.factories.full_menu import FullMenuCallback
from bot.keyboards.inline import search_variants as sv
def get_full_menu_kb() -> InlineKeyboardMarkup:
- buttons = (sv.get_search_variants(
- query='',
- services=
- sv.soundcloud |
- sv.spotify |
- sv.deezer |
- sv.youtube
+ buttons = sv.get_search_variants(
+ query="", services=sv.soundcloud | sv.spotify | sv.deezer | sv.youtube
) + [
- [
- InlineKeyboardButton(
- text='⚙️ Settings',
- callback_data=FullMenuCallback(
- action='settings'
- ).pack()
- )
- ],
- ])
+ [
+ InlineKeyboardButton(
+ text="⚙️ Settings",
+ callback_data=FullMenuCallback(action="settings").pack(),
+ )
+ ],
+ ]
return InlineKeyboardBuilder(buttons).as_markup()
diff --git a/bot/keyboards/inline/search_variants.py b/bot/keyboards/inline/search_variants.py
index 5170348..6423577 100644
--- a/bot/keyboards/inline/search_variants.py
+++ b/bot/keyboards/inline/search_variants.py
@@ -1,42 +1,34 @@
-from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
- InlineKeyboardBuilder)
+from aiogram.utils.keyboard import (
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+ InlineKeyboardBuilder,
+)
-deezer = {
- 'd': '🎵 Search in Deezer'
-}
-soundcloud = {
- 'c': '☁️ Search in SoundCloud'
-}
-youtube = {
- 'y': '▶️ Search in YouTube'
-}
-spotify = {
- 's': '🎧 Search in Spotify'
-}
+deezer = {"d": "🎵 Search in Deezer"}
+soundcloud = {"c": "☁️ Search in SoundCloud"}
+youtube = {"y": "▶️ Search in YouTube"}
+spotify = {"s": "🎧 Search in Spotify"}
def get_search_variants(
- query: str,
- services: dict[str, str],
+ query: str,
+ services: dict[str, str],
) -> list[list[InlineKeyboardButton]]:
buttons = [
[
InlineKeyboardButton(
- text=services[key],
- switch_inline_query_current_chat=f'{key}:{query}'
+ text=services[key], switch_inline_query_current_chat=f"{key}:{query}"
)
- ] for key in services.keys()
+ ]
+ for key in services.keys()
]
return buttons
def get_search_variants_kb(
- query: str,
- services: dict[str, str],
+ query: str,
+ services: dict[str, str],
) -> InlineKeyboardMarkup:
- return InlineKeyboardBuilder(get_search_variants(
- query,
- services
- )).as_markup()
+ return InlineKeyboardBuilder(get_search_variants(query, services)).as_markup()
diff --git a/bot/keyboards/inline/setting.py b/bot/keyboards/inline/setting.py
index fc5a729..edf1587 100644
--- a/bot/keyboards/inline/setting.py
+++ b/bot/keyboards/inline/setting.py
@@ -1,5 +1,8 @@
-from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
- InlineKeyboardBuilder)
+from aiogram.utils.keyboard import (
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+ InlineKeyboardBuilder,
+)
from bot.factories.open_setting import SettingChoiceCallback
from bot.factories.full_menu import FullMenuCallback
@@ -11,22 +14,21 @@ def get_setting_kb(s_id: str, user_id: str) -> InlineKeyboardMarkup:
buttons = [
[
InlineKeyboardButton(
- text=(
- '✅ ' if setting.value == choice else ''
- ) + setting.choices[choice],
+ text=("✅ " if setting.value == choice else "")
+ + setting.choices[choice],
callback_data=SettingChoiceCallback(
s_id=s_id,
choice=choice,
- ).pack()
+ ).pack(),
)
- ] for choice in setting.choices.keys()
- ] + [[
- InlineKeyboardButton(
- text='🔙',
- callback_data=FullMenuCallback(
- action='settings'
- ).pack()
- )
- ]]
+ ]
+ for choice in setting.choices.keys()
+ ] + [
+ [
+ InlineKeyboardButton(
+ text="🔙", callback_data=FullMenuCallback(action="settings").pack()
+ )
+ ]
+ ]
return InlineKeyboardBuilder(buttons).as_markup()
diff --git a/bot/keyboards/inline/settings.py b/bot/keyboards/inline/settings.py
index d643984..4c0d65f 100644
--- a/bot/keyboards/inline/settings.py
+++ b/bot/keyboards/inline/settings.py
@@ -1,5 +1,8 @@
-from aiogram.utils.keyboard import (InlineKeyboardMarkup, InlineKeyboardButton,
- InlineKeyboardBuilder)
+from aiogram.utils.keyboard import (
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+ InlineKeyboardBuilder,
+)
from bot.factories.open_setting import OpenSettingCallback
from bot.factories.full_menu import FullMenuCallback
@@ -13,16 +16,16 @@ def get_settings_kb() -> InlineKeyboardMarkup:
text=settings_strings[setting_id].name,
callback_data=OpenSettingCallback(
s_id=setting_id,
- ).pack()
+ ).pack(),
)
- ] for setting_id in settings_strings.keys()
- ] + [[
- InlineKeyboardButton(
- text='🔙',
- callback_data=FullMenuCallback(
- action='home'
- ).pack()
- )
- ]]
+ ]
+ for setting_id in settings_strings.keys()
+ ] + [
+ [
+ InlineKeyboardButton(
+ text="🔙", callback_data=FullMenuCallback(action="home").pack()
+ )
+ ]
+ ]
return InlineKeyboardBuilder(buttons).as_markup()
diff --git a/bot/middlewares/inject_settings.py b/bot/middlewares/inject_settings.py
index eb0a120..1b21115 100644
--- a/bot/middlewares/inject_settings.py
+++ b/bot/middlewares/inject_settings.py
@@ -8,19 +8,20 @@ from bot.modules.settings import UserSettings
class SettingsInjectorMiddleware(BaseMiddleware):
async def __call__(
- self,
- handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
- event: TelegramObject,
- data: Dict[str, Any],
+ self,
+ handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
+ event: TelegramObject,
+ data: Dict[str, Any],
):
- if (not hasattr(event, 'from_user') and
- (not hasattr(event, 'inline_query') or event.inline_query is None)):
+ if not hasattr(event, "from_user") and (
+ not hasattr(event, "inline_query") or event.inline_query is None
+ ):
return await handler(event, data)
- elif hasattr(event, 'inline_query') and event.inline_query is not None:
+ elif hasattr(event, "inline_query") and event.inline_query is not None:
settings = UserSettings(event.inline_query.from_user.id)
- data['settings'] = settings
+ data["settings"] = settings
else:
settings = UserSettings(event.from_user.id)
- data['settings'] = settings
+ data["settings"] = settings
return await handler(event, data)
diff --git a/bot/middlewares/private_button.py b/bot/middlewares/private_button.py
index a2a1743..b4dfa22 100644
--- a/bot/middlewares/private_button.py
+++ b/bot/middlewares/private_button.py
@@ -8,12 +8,12 @@ from bot.modules.database import db
class PrivateButtonMiddleware(BaseMiddleware):
async def __call__(
- self,
- handler: Callable[[CallbackQuery, Dict[str, Any]], Awaitable[Any]],
- event: CallbackQuery,
- data: Dict[str, Any],
+ self,
+ handler: Callable[[CallbackQuery, Dict[str, Any]], Awaitable[Any]],
+ event: CallbackQuery,
+ data: Dict[str, Any],
):
if event.from_user.id == db.inline[event.inline_message_id].from_user.id:
return await handler(event, data)
else:
- await event.answer('This button is not for you')
+ await event.answer("This button is not for you")
diff --git a/bot/middlewares/save_chosen.py b/bot/middlewares/save_chosen.py
index 6a59e61..d0ff446 100644
--- a/bot/middlewares/save_chosen.py
+++ b/bot/middlewares/save_chosen.py
@@ -26,10 +26,10 @@ class SavedResult:
class SaveChosenMiddleware(BaseMiddleware):
async def __call__(
- self,
- handler: Callable[[ChosenInlineResult, Dict[str, Any]], Awaitable[Any]],
- event: ChosenInlineResult,
- data: Dict[str, Any],
+ self,
+ handler: Callable[[ChosenInlineResult, Dict[str, Any]], Awaitable[Any]],
+ event: ChosenInlineResult,
+ data: Dict[str, Any],
):
db.inline[event.inline_message_id] = SavedResult(
result_id=event.result_id,
@@ -38,9 +38,9 @@ class SaveChosenMiddleware(BaseMiddleware):
first_name=event.from_user.first_name,
last_name=event.from_user.last_name,
username=event.from_user.username,
- language_code=event.from_user.language_code
+ language_code=event.from_user.language_code,
),
query=event.query,
- inline_message_id=event.inline_message_id
+ inline_message_id=event.inline_message_id,
)
return await handler(event, data)
diff --git a/bot/modules/common/song/song.py b/bot/modules/common/song/song.py
index b2f822f..3ce9d13 100644
--- a/bot/modules/common/song/song.py
+++ b/bot/modules/common/song/song.py
@@ -11,7 +11,7 @@ class BaseSongItem:
@property
def all_artists(self):
- return ', '.join(self.artists)
+ return ", ".join(self.artists)
@property
def full_name(self):
diff --git a/bot/modules/database/__init__.py b/bot/modules/database/__init__.py
index f54bf8c..d3e1d04 100644
--- a/bot/modules/database/__init__.py
+++ b/bot/modules/database/__init__.py
@@ -3,4 +3,4 @@ from .db import Db
db = Db()
-__all__ = ['db']
+__all__ = ["db"]
diff --git a/bot/modules/database/db.py b/bot/modules/database/db.py
index 0484ddc..b5fc144 100644
--- a/bot/modules/database/db.py
+++ b/bot/modules/database/db.py
@@ -3,13 +3,13 @@ from .db_model import DBDict
class Db(object):
def __init__(self):
- self.fsm = DBDict('fsm')
- self.config = DBDict('config')
- self.inline = DBDict('inline')
- self.errors = DBDict('errors')
- self.settings = DBDict('settings')
- self.spotify = DBDict('spotify')
- self.deezer = DBDict('deezer')
- self.youtube = DBDict('youtube')
- self.soundcloud = DBDict('soundcloud')
- self.recoded = DBDict('recoded')
+ self.fsm = DBDict("fsm")
+ self.config = DBDict("config")
+ self.inline = DBDict("inline")
+ self.errors = DBDict("errors")
+ self.settings = DBDict("settings")
+ self.spotify = DBDict("spotify")
+ self.deezer = DBDict("deezer")
+ self.youtube = DBDict("youtube")
+ self.soundcloud = DBDict("soundcloud")
+ self.recoded = DBDict("recoded")
diff --git a/bot/modules/deezer/__init__.py b/bot/modules/deezer/__init__.py
index 53e4f3c..28f8641 100644
--- a/bot/modules/deezer/__init__.py
+++ b/bot/modules/deezer/__init__.py
@@ -7,4 +7,4 @@ deezer = Deezer(
arl=config.tokens.deezer.arl,
)
-__all__ = ['deezer', 'DeezerBytestream']
+__all__ = ["deezer", "DeezerBytestream"]
diff --git a/bot/modules/deezer/downloader.py b/bot/modules/deezer/downloader.py
index ab24c3a..f30ccb3 100644
--- a/bot/modules/deezer/downloader.py
+++ b/bot/modules/deezer/downloader.py
@@ -17,10 +17,7 @@ class DeezerBytestream:
@classmethod
def from_bytestream(
- cls,
- bytestream: BytesIO,
- filename: str,
- full_song: FullSongItem
+ cls, bytestream: BytesIO, filename: str, full_song: FullSongItem
):
bytestream.seek(0)
return cls(
@@ -38,21 +35,18 @@ class Downloader:
song: FullSongItem
@classmethod
- async def build(
- cls,
- song_id: str,
- driver: DeezerDriver
- ):
+ async def build(cls, song_id: str, driver: DeezerDriver):
track = await driver.reverse_get_track(song_id)
try:
return cls(
song_id=str(song_id),
driver=driver,
- track=track['results'],
- song=await FullSongItem.from_deezer(track)
+ track=track["results"],
+ song=await FullSongItem.from_deezer(track),
)
except KeyError:
from icecream import ic
+
ic(track)
await driver.renew_engine()
return await cls.build(song_id, driver)
@@ -65,7 +59,7 @@ class Downloader:
audio = BytesIO()
async for chunk in self.driver.engine.get_data_iter(
- await self._get_download_url(quality=quality)
+ await self._get_download_url(quality=quality)
):
if i % 3 > 0 or len(chunk) < 2 * 1024:
audio.write(chunk)
@@ -76,18 +70,16 @@ class Downloader:
return DeezerBytestream.from_bytestream(
filename=self.song.full_name + track_formats.TRACK_FORMAT_MAP[quality].ext,
bytestream=audio,
- full_song=self.song
+ full_song=self.song,
)
- async def _get_download_url(self, quality: str = 'MP3_128'):
+ async def _get_download_url(self, quality: str = "MP3_128"):
md5_origin = self.track["MD5_ORIGIN"]
track_id = self.track["SNG_ID"]
media_version = self.track["MEDIA_VERSION"]
url_decrypter = UrlDecrypter(
- md5_origin=md5_origin,
- track_id=track_id,
- media_version=media_version
+ md5_origin=md5_origin, track_id=track_id, media_version=media_version
)
return url_decrypter.get_url_for(track_formats.TRACK_FORMAT_MAP[quality])
@@ -98,7 +90,4 @@ class DownloaderBuilder:
driver: DeezerDriver
async def from_id(self, song_id: str):
- return await Downloader.build(
- song_id=song_id,
- driver=self.driver
- )
+ return await Downloader.build(song_id=song_id, driver=self.driver)
diff --git a/bot/modules/deezer/driver.py b/bot/modules/deezer/driver.py
index 5b3aa15..2085bf1 100644
--- a/bot/modules/deezer/driver.py
+++ b/bot/modules/deezer/driver.py
@@ -10,30 +10,19 @@ class DeezerDriver:
engine: DeezerEngine
async def get_track(self, track_id: int | str):
- data = await self.engine.call_legacy_api(
- f'track/{track_id}'
- )
+ data = await self.engine.call_legacy_api(f"track/{track_id}")
return data
async def reverse_get_track(self, track_id: str):
- return await self.engine.call_api(
- 'song.getData',
- params={
- 'SNG_ID': track_id
- }
- )
+ return await self.engine.call_api("song.getData", params={"SNG_ID": track_id})
async def search(self, query: str, limit: int = 30):
data = await self.engine.call_legacy_api(
- 'search/track',
- params={
- 'q': clean_query(query),
- 'limit': limit
- }
+ "search/track", params={"q": clean_query(query), "limit": limit}
)
- return data['data']
+ return data["data"]
async def renew_engine(self):
self.engine = await self.engine.from_arl(self.engine.arl)
diff --git a/bot/modules/deezer/engine.py b/bot/modules/deezer/engine.py
index 08eaeec..d87f800 100644
--- a/bot/modules/deezer/engine.py
+++ b/bot/modules/deezer/engine.py
@@ -7,13 +7,13 @@ from attrs import define
HTTP_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
- "(KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
+ "(KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
"Content-Language": "en-US",
"Cache-Control": "max-age=0",
"Accept": "*/*",
"Accept-Charset": "utf-8,ISO-8859-1;q=0.7,*;q=0.3",
"Accept-Language": "en-US,en;q=0.9,en-US;q=0.8,en;q=0.7",
- "Connection": 'keep-alive'
+ "Connection": "keep-alive",
}
@@ -25,28 +25,22 @@ class DeezerEngine:
@classmethod
async def from_arl(cls, arl: str):
- cookies = {'arl': arl}
+ cookies = {"arl": arl}
data, cookies = await cls(cookies).call_api(
- 'deezer.getUserData', get_cookies=True
+ "deezer.getUserData", get_cookies=True
)
- data = data['results']
- token = data['checkForm']
+ data = data["results"]
+ token = data["checkForm"]
- return cls(
- cookies=cookies,
- arl=arl,
- token=token
- )
+ return cls(cookies=cookies, arl=arl, token=token)
- async def call_legacy_api(
- self, request_point: str, params: dict = None
- ):
+ async def call_legacy_api(self, request_point: str, params: dict = None):
async with aiohttp.ClientSession(cookies=self.cookies) as session:
async with session.get(
- f"https://api.deezer.com/{request_point}",
- params=params,
- headers=HTTP_HEADERS
+ f"https://api.deezer.com/{request_point}",
+ params=params,
+ headers=HTTP_HEADERS,
) as r:
return await r.json()
@@ -63,31 +57,26 @@ class DeezerEngine:
async def get_data_iter(self, url: str):
async with aiohttp.ClientSession(
- cookies=self.cookies,
- headers=HTTP_HEADERS
+ cookies=self.cookies, headers=HTTP_HEADERS
) as session:
- r = await session.get(
- url,
- allow_redirects=True
- )
+ r = await session.get(url, allow_redirects=True)
async for chunk in self._iter_exact_chunks(r):
yield chunk
async def call_api(
- self, method: str, params: dict = None,
- get_cookies: bool = False
+ self, method: str, params: dict = None, get_cookies: bool = False
):
async with aiohttp.ClientSession(cookies=self.cookies) as session:
async with session.post(
- f"https://www.deezer.com/ajax/gw-light.php",
- params={
- 'method': method,
- 'api_version': '1.0',
- 'input': '3',
- 'api_token': self.token or 'null',
- },
- headers=HTTP_HEADERS,
- json=params
+ f"https://www.deezer.com/ajax/gw-light.php",
+ params={
+ "method": method,
+ "api_version": "1.0",
+ "input": "3",
+ "api_token": self.token or "null",
+ },
+ headers=HTTP_HEADERS,
+ json=params,
) as r:
if not get_cookies:
return await r.json()
diff --git a/bot/modules/deezer/song.py b/bot/modules/deezer/song.py
index 9954466..c2f3f35 100644
--- a/bot/modules/deezer/song.py
+++ b/bot/modules/deezer/song.py
@@ -10,11 +10,11 @@ class SongItem(BaseSongItem):
@classmethod
def from_deezer(cls, song_item: dict):
return cls(
- name=song_item['title'],
- id=str(song_item['id']),
- artists=[song_item['artist']['name']],
- preview_url=song_item.get('preview'),
- thumbnail=song_item['album']['cover_medium']
+ name=song_item["title"],
+ id=str(song_item["id"]),
+ artists=[song_item["artist"]["name"]],
+ preview_url=song_item.get("preview"),
+ thumbnail=song_item["album"]["cover_medium"],
)
@@ -25,21 +25,23 @@ class FullSongItem(BaseSongItem):
@classmethod
async def from_deezer(cls, song_item: dict):
- if song_item.get('results'):
- song_item = song_item['results']
+ if song_item.get("results"):
+ song_item = song_item["results"]
return cls(
- name=song_item['SNG_TITLE'],
- id=song_item['SNG_ID'],
- artists=[artist['ART_NAME'] for artist in song_item['ARTISTS']],
- preview_url=(song_item.get('MEDIA').get('HREF')
- if type(song_item.get('MEDIA')) is dict and
- song_item.get('MEDIA').get('TYPE') == 'preview'
- else None),
- thumbnail=f'https://e-cdns-images.dzcdn.net/images/cover/'
- f'{song_item["ALB_PICTURE"]}/320x320.jpg',
- duration=int(song_item['DURATION']),
- track_dict=song_item
+ name=song_item["SNG_TITLE"],
+ id=song_item["SNG_ID"],
+ artists=[artist["ART_NAME"] for artist in song_item["ARTISTS"]],
+ preview_url=(
+ song_item.get("MEDIA").get("HREF")
+ if type(song_item.get("MEDIA")) is dict
+ and song_item.get("MEDIA").get("TYPE") == "preview"
+ else None
+ ),
+ thumbnail=f"https://e-cdns-images.dzcdn.net/images/cover/"
+ f'{song_item["ALB_PICTURE"]}/320x320.jpg',
+ duration=int(song_item["DURATION"]),
+ track_dict=song_item,
)
diff --git a/bot/modules/deezer/track_formats.py b/bot/modules/deezer/track_formats.py
index 700eaee..7c5e3e6 100644
--- a/bot/modules/deezer/track_formats.py
+++ b/bot/modules/deezer/track_formats.py
@@ -19,32 +19,11 @@ class TrackFormat:
TRACK_FORMAT_MAP = {
- FLAC: TrackFormat(
- code=9,
- ext=".flac"
- ),
- MP3_128: TrackFormat(
- code=1,
- ext=".mp3"
- ),
- MP3_256: TrackFormat(
- code=5,
- ext=".mp3"
- ),
- MP3_320: TrackFormat(
- code=3,
- ext=".mp3"
- ),
- MP4_RA1: TrackFormat(
- code=13,
- ext=".mp4"
- ),
- MP4_RA2: TrackFormat(
- code=14,
- ext=".mp4"
- ),
- MP4_RA3: TrackFormat(
- code=15,
- ext=".mp3"
- )
+ FLAC: TrackFormat(code=9, ext=".flac"),
+ MP3_128: TrackFormat(code=1, ext=".mp3"),
+ MP3_256: TrackFormat(code=5, ext=".mp3"),
+ MP3_320: TrackFormat(code=3, ext=".mp3"),
+ MP4_RA1: TrackFormat(code=13, ext=".mp4"),
+ MP4_RA2: TrackFormat(code=14, ext=".mp4"),
+ MP4_RA3: TrackFormat(code=15, ext=".mp3"),
}
diff --git a/bot/modules/deezer/util.py b/bot/modules/deezer/util.py
index 14417dc..5643d07 100644
--- a/bot/modules/deezer/util.py
+++ b/bot/modules/deezer/util.py
@@ -34,20 +34,20 @@ class UrlDecrypter:
media_version: str
def get_url_for(self, track_format: TrackFormat):
- step1 = (f'{self.md5_origin}¤{track_format.code}¤'
- f'{self.track_id}¤{self.media_version}')
+ step1 = (
+ f"{self.md5_origin}¤{track_format.code}¤"
+ f"{self.track_id}¤{self.media_version}"
+ )
m = hashlib.md5()
m.update(bytes([ord(x) for x in step1]))
- step2 = f'{m.hexdigest()}¤{step1}¤'
+ step2 = f"{m.hexdigest()}¤{step1}¤"
step2 = step2.ljust(80, " ")
cipher = Cipher(
- algorithm=algorithms.AES(
- key=bytes('jo6aey6haid2Teih', 'ascii')
- ),
+ algorithm=algorithms.AES(key=bytes("jo6aey6haid2Teih", "ascii")),
mode=modes.ECB(),
- backend=default_backend()
+ backend=default_backend(),
)
encryptor = cipher.encryptor()
@@ -55,7 +55,7 @@ class UrlDecrypter:
cdn = self.md5_origin[0]
- return f'https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}'
+ return f"https://e-cdns-proxy-{cdn}.dzcdn.net/mobile/1/{step3}"
@define
@@ -69,12 +69,10 @@ class ChunkDecrypter:
cipher = Cipher(
algorithms.Blowfish(get_blowfish_key(track_id)),
modes.CBC(bytes([i for i in range(8)])),
- default_backend()
+ default_backend(),
)
- return cls(
- cipher=cipher
- )
+ return cls(cipher=cipher)
def decrypt_chunk(self, chunk: bytes):
decryptor = self.cipher.decryptor()
@@ -82,7 +80,7 @@ class ChunkDecrypter:
def get_blowfish_key(track_id: str):
- secret = 'g4el58wc0zvf9na1'
+ secret = "g4el58wc0zvf9na1"
m = hashlib.md5()
m.update(bytes([ord(x) for x in track_id]))
diff --git a/bot/modules/error/handler.py b/bot/modules/error/handler.py
index 93a481d..4235ffa 100644
--- a/bot/modules/error/handler.py
+++ b/bot/modules/error/handler.py
@@ -42,9 +42,9 @@ async def on_error(event: ErrorEvent, bot: Bot):
await bot.edit_message_caption(
inline_message_id=event.update.chosen_inline_result.inline_message_id,
- caption=f'💔 ERROR occurred. Use this code to get more information: '
- f'{error_id}',
- parse_mode='HTML',
+ caption=f"💔 ERROR occurred. Use this code to get more information: "
+ f"{error_id}",
+ parse_mode="HTML",
)
else:
@@ -53,7 +53,7 @@ async def on_error(event: ErrorEvent, bot: Bot):
exception=pretty_exception,
)
- console.print(f'[red]{error_id} occurred[/]')
+ console.print(f"[red]{error_id} occurred[/]")
console.print(event)
console.print(traceback)
- console.print(f'-{error_id} occurred-')
+ console.print(f"-{error_id} occurred-")
diff --git a/bot/modules/error/pretty.py b/bot/modules/error/pretty.py
index a65337a..537feb9 100644
--- a/bot/modules/error/pretty.py
+++ b/bot/modules/error/pretty.py
@@ -13,12 +13,14 @@ class PrettyException:
🐊 {e.__traceback__.tb_frame.f_code.co_filename.replace(os.getcwd(), "")}\r
:{e.__traceback__.tb_frame.f_lineno}
"""
- self.short = (f'{e.__class__.__name__}: '
- f'{"".join(traceback.format_exception_only(e)).strip()}')
+ self.short = (
+ f"{e.__class__.__name__}: "
+ f'{"".join(traceback.format_exception_only(e)).strip()}'
+ )
- self.pretty_exception = (f"{self.long}\n\n"
- f"⬇️ Trace:"
- f"{self.get_full_stack()}")
+ self.pretty_exception = (
+ f"{self.long}\n\n" f"⬇️ Trace:" f"{self.get_full_stack()}"
+ )
@staticmethod
def get_full_stack():
@@ -40,9 +42,11 @@ class PrettyException:
full_stack = "\n".join(
[
- format_line(line)
- if re.search(line_regex, line)
- else f"{line}"
+ (
+ format_line(line)
+ if re.search(line_regex, line)
+ else f"{line}"
+ )
for line in full_stack.splitlines()
]
)
diff --git a/bot/modules/fsm/in_db.py b/bot/modules/fsm/in_db.py
index ba07ce8..9c6bac4 100644
--- a/bot/modules/fsm/in_db.py
+++ b/bot/modules/fsm/in_db.py
@@ -18,14 +18,14 @@ class MemoryStorageRecord:
class StorageDict(DefaultDict):
def __init__(self, default_factory=None) -> None:
- if type(db.fsm.get('fsm')) is not dict:
- db.fsm['fsm'] = dict()
+ if type(db.fsm.get("fsm")) is not dict:
+ db.fsm["fsm"] = dict()
- super().__init__(default_factory, db.fsm['fsm'])
+ super().__init__(default_factory, db.fsm["fsm"])
def __setitem__(self, key, value):
super().__setitem__(key, value)
- db.fsm['fsm'] = dict(self)
+ db.fsm["fsm"] = dict(self)
class InDbStorage(BaseStorage):
diff --git a/bot/modules/settings/model.py b/bot/modules/settings/model.py
index b601ac6..40c3e7e 100644
--- a/bot/modules/settings/model.py
+++ b/bot/modules/settings/model.py
@@ -11,46 +11,32 @@ class Setting:
settings_strings: dict[str, Setting] = {
- 'search_preview': Setting(
- name='Search preview',
- description='Show only covers (better display), '
- 'or add 30 seconds of track preview whenever possible?',
- choices={
- 'cover': 'Cover picture',
- 'preview': 'Audio preview'
- },
+ "search_preview": Setting(
+ name="Search preview",
+ description="Show only covers (better display), "
+ "or add 30 seconds of track preview whenever possible?",
+ choices={"cover": "Cover picture", "preview": "Audio preview"},
),
- 'recode_youtube': Setting(
- name='Recode YouTube (and Spotify)',
- description='Recode when downloading from YouTube (and Spotify) to '
- 'more compatible format (may take some time)',
- choices={
- 'no': 'Send original file',
- 'yes': 'Recode to libmp3lame'
- },
+ "recode_youtube": Setting(
+ name="Recode YouTube (and Spotify)",
+ description="Recode when downloading from YouTube (and Spotify) to "
+ "more compatible format (may take some time)",
+ choices={"no": "Send original file", "yes": "Recode to libmp3lame"},
),
- 'exact_spotify_search': Setting(
- name='Only exact Spotify matches',
- description='When searching on Youtube from Spotify, show only exact matches, '
- 'may protect against inaccurate matches, but at the same time it '
- 'can lose reuploaded tracks. Should be enabled always, except in '
- 'situations where the track is not found on both YouTube and '
- 'Deezer',
- choices={
- 'yes': 'Only exact matches',
- 'no': 'Fuzzy matches also'
- },
+ "exact_spotify_search": Setting(
+ name="Only exact Spotify matches",
+ description="When searching on Youtube from Spotify, show only exact matches, "
+ "may protect against inaccurate matches, but at the same time it "
+ "can lose reuploaded tracks. Should be enabled always, except in "
+ "situations where the track is not found on both YouTube and "
+ "Deezer",
+ choices={"yes": "Only exact matches", "no": "Fuzzy matches also"},
+ ),
+ "default_search_provider": Setting(
+ name="Default search provider",
+ description="Which service to use when searching without service filter",
+ choices={"d": "Deezer", "c": "SoundCloud", "y": "YouTube", "s": "Spotify"},
),
- 'default_search_provider': Setting(
- name='Default search provider',
- description='Which service to use when searching without service filter',
- choices={
- 'd': 'Deezer',
- 'c': 'SoundCloud',
- 'y': 'YouTube',
- 's': 'Spotify'
- }
- )
}
@@ -64,8 +50,8 @@ class UserSettings:
if db.settings.get(self.user_id) is None:
db.settings[self.user_id] = dict(
- (setting, list(settings_strings[setting].choices)[0]) for setting in
- settings_strings
+ (setting, list(settings_strings[setting].choices)[0])
+ for setting in settings_strings
)
def __getitem__(self, item):
diff --git a/bot/modules/soundcloud/__init__.py b/bot/modules/soundcloud/__init__.py
index da4c00d..195d79c 100644
--- a/bot/modules/soundcloud/__init__.py
+++ b/bot/modules/soundcloud/__init__.py
@@ -7,4 +7,4 @@ soundcloud = SoundCloud(
client_id=config.tokens.soundcloud.client_id,
)
-__all__ = ['soundcloud', 'SoundCloudBytestream']
+__all__ = ["soundcloud", "SoundCloudBytestream"]
diff --git a/bot/modules/soundcloud/downloader.py b/bot/modules/soundcloud/downloader.py
index 15d7a29..724ff2e 100644
--- a/bot/modules/soundcloud/downloader.py
+++ b/bot/modules/soundcloud/downloader.py
@@ -15,18 +15,9 @@ class SoundCloudBytestream:
song: SongItem
@classmethod
- def from_bytes(
- cls,
- bytes_: bytes,
- filename: str,
- duration: int,
- song: SongItem
- ):
+ def from_bytes(cls, bytes_: bytes, filename: str, duration: int, song: SongItem):
return cls(
- file=bytes_,
- filename=filename,
- duration=int(duration / 1000),
- song=song
+ file=bytes_, filename=filename, duration=int(duration / 1000), song=song
)
@@ -40,60 +31,53 @@ class Downloader:
song: SongItem
@classmethod
- async def build(
- cls,
- song_id: str,
- driver: SoundCloudDriver
- ):
+ async def build(cls, song_id: str, driver: SoundCloudDriver):
track = await driver.get_track(song_id)
song = SongItem.from_soundcloud(track)
- if url := cls._try_get_progressive(track['media']['transcodings']):
+ if url := cls._try_get_progressive(track["media"]["transcodings"]):
method = cls._progressive
else:
- url = track['media']['transcodings'][0]['url']
- method = cls._hls if \
- (track['media']['transcodings'][0]['format']['protocol']
- == 'hls') else cls._progressive
+ url = track["media"]["transcodings"][0]["url"]
+ method = (
+ cls._hls
+ if (track["media"]["transcodings"][0]["format"]["protocol"] == "hls")
+ else cls._progressive
+ )
return cls(
driver=driver,
- duration=track['duration'],
+ duration=track["duration"],
method=method,
download_url=url,
filename=f'{track["title"]}.mp3',
- song=song
+ song=song,
)
@staticmethod
def _try_get_progressive(urls: list) -> str | None:
for transcode in urls:
- if transcode['format']['protocol'] == 'progressive':
- return transcode['url']
+ if transcode["format"]["protocol"] == "progressive":
+ return transcode["url"]
async def _progressive(self, url: str) -> bytes:
return await self.driver.engine.read_data(
- url=(await self.driver.engine.get(
- url
- ))['url']
+ url=(await self.driver.engine.get(url))["url"]
)
async def _hls(self, url: str) -> bytes:
m3u8_obj = m3u8.loads(
- (await self.driver.engine.read_data(
- (await self.driver.engine.get(
- url=url
- ))['url']
- )).decode()
+ (
+ await self.driver.engine.read_data(
+ (await self.driver.engine.get(url=url))["url"]
+ )
+ ).decode()
)
content = bytearray()
for segment in m3u8_obj.files:
content.extend(
- await self.driver.engine.read_data(
- url=segment,
- append_client_id=False
- )
+ await self.driver.engine.read_data(url=segment, append_client_id=False)
)
return content
@@ -103,7 +87,7 @@ class Downloader:
bytes_=await self.method(self, self.download_url),
filename=self.filename,
duration=self.duration,
- song=self.song
+ song=self.song,
)
@@ -112,7 +96,4 @@ class DownloaderBuilder:
driver: SoundCloudDriver
async def from_id(self, song_id: str):
- return await Downloader.build(
- song_id=song_id,
- driver=self.driver
- )
+ return await Downloader.build(song_id=song_id, driver=self.driver)
diff --git a/bot/modules/soundcloud/driver.py b/bot/modules/soundcloud/driver.py
index de63309..d9b8d59 100644
--- a/bot/modules/soundcloud/driver.py
+++ b/bot/modules/soundcloud/driver.py
@@ -8,23 +8,12 @@ class SoundCloudDriver:
engine: SoundCloudEngine
async def get_track(self, track_id: int | str):
- return await self.engine.call(
- f'tracks/{track_id}'
- )
+ return await self.engine.call(f"tracks/{track_id}")
async def search(self, query: str, limit: int = 30):
- return (await self.engine.call(
- 'search/tracks',
- params={
- 'q': query,
- 'limit': limit
- }
- ))['collection']
+ return (
+ await self.engine.call("search/tracks", params={"q": query, "limit": limit})
+ )["collection"]
async def resolve_url(self, url: str):
- return await self.engine.call(
- 'resolve',
- params={
- 'url': url
- }
- )
+ return await self.engine.call("resolve", params={"url": url})
diff --git a/bot/modules/soundcloud/engine.py b/bot/modules/soundcloud/engine.py
index 9ce4ba1..e953532 100644
--- a/bot/modules/soundcloud/engine.py
+++ b/bot/modules/soundcloud/engine.py
@@ -8,27 +8,33 @@ class SoundCloudEngine:
async def call(self, request_point: str, params: dict = None):
return await self.get(
- url=f'https://api-v2.soundcloud.com/{request_point}',
- params=params
+ url=f"https://api-v2.soundcloud.com/{request_point}", params=params
)
async def get(self, url: str, params: dict = None):
async with aiohttp.ClientSession() as session:
async with session.get(
- url,
- params=(params or {}) | {
- 'client_id': self.client_id,
- },
+ url,
+ params=(params or {})
+ | {
+ "client_id": self.client_id,
+ },
) as r:
return await r.json()
- async def read_data(self, url: str, params: dict = None,
- append_client_id: bool = True):
+ async def read_data(
+ self, url: str, params: dict = None, append_client_id: bool = True
+ ):
async with aiohttp.ClientSession() as session:
async with session.get(
- url,
- params=(params or {}) | ({
- 'client_id': self.client_id,
- } if append_client_id else {}),
+ url,
+ params=(params or {})
+ | (
+ {
+ "client_id": self.client_id,
+ }
+ if append_client_id
+ else {}
+ ),
) as r:
return await r.content.read()
diff --git a/bot/modules/soundcloud/song.py b/bot/modules/soundcloud/song.py
index 5ca7c61..0ced03a 100644
--- a/bot/modules/soundcloud/song.py
+++ b/bot/modules/soundcloud/song.py
@@ -9,13 +9,15 @@ class SongItem(BaseSongItem):
@classmethod
def from_soundcloud(cls, song_item: dict):
return cls(
- name=song_item['title'],
- id=str(song_item['id']),
+ name=song_item["title"],
+ id=str(song_item["id"]),
artists=[],
- thumbnail=(song_item['artwork_url'] or song_item['user']['avatar_url'] or
- 'https://soundcloud.com/images/default_avatar_large.png')
- .replace('large.jpg', 't300x300.jpg'),
- preview_url=None
+ thumbnail=(
+ song_item["artwork_url"]
+ or song_item["user"]["avatar_url"]
+ or "https://soundcloud.com/images/default_avatar_large.png"
+ ).replace("large.jpg", "t300x300.jpg"),
+ preview_url=None,
)
@property
diff --git a/bot/modules/spotify/__init__.py b/bot/modules/spotify/__init__.py
index 2b403d4..1a503db 100644
--- a/bot/modules/spotify/__init__.py
+++ b/bot/modules/spotify/__init__.py
@@ -4,7 +4,7 @@ from bot.utils.config import config
spotify = Spotify(
client_id=config.tokens.spotify.client_id,
- client_secret=config.tokens.spotify.client_secret
+ client_secret=config.tokens.spotify.client_secret,
)
-__all__ = ['spotify']
+__all__ = ["spotify"]
diff --git a/bot/modules/spotify/song.py b/bot/modules/spotify/song.py
index fcd453b..c8e24dc 100644
--- a/bot/modules/spotify/song.py
+++ b/bot/modules/spotify/song.py
@@ -9,12 +9,15 @@ class SongItem(BaseSongItem):
@classmethod
def from_spotify(cls, song_item: dict):
return cls(
- name=song_item['name'],
- id=song_item['id'],
- artists=[artist['name'] for artist in song_item['artists']],
- preview_url=song_item['preview_url'].split('?')[0] if
- song_item['preview_url'] is not None else None,
- thumbnail=song_item['album']['images'][1]['url']
+ name=song_item["name"],
+ id=song_item["id"],
+ artists=[artist["name"] for artist in song_item["artists"]],
+ preview_url=(
+ song_item["preview_url"].split("?")[0]
+ if song_item["preview_url"] is not None
+ else None
+ ),
+ thumbnail=song_item["album"]["images"][1]["url"],
)
@@ -28,7 +31,7 @@ class Songs(object):
if r is None:
return None
- return [SongItem.from_spotify(item) for item in r['tracks']['items']]
+ return [SongItem.from_spotify(item) for item in r["tracks"]["items"]]
def from_id(self, song_id: str) -> SongItem | None:
r = self.spotify.track(song_id)
diff --git a/bot/modules/spotify/spotify.py b/bot/modules/spotify/spotify.py
index bde36cf..3b2aa11 100644
--- a/bot/modules/spotify/spotify.py
+++ b/bot/modules/spotify/spotify.py
@@ -8,11 +8,10 @@ class Spotify(object):
def __init__(self, client_id, client_secret):
self.spotify = spotipy.Spotify(
client_credentials_manager=SpotifyClientCredentials(
- client_id=client_id,
- client_secret=client_secret
+ client_id=client_id, client_secret=client_secret
),
backoff_factor=0.1,
- retries=10
+ retries=10,
)
self.songs = Songs(self.spotify)
diff --git a/bot/modules/url/id_getter.py b/bot/modules/url/id_getter.py
index 21b0fc3..0813bb1 100644
--- a/bot/modules/url/id_getter.py
+++ b/bot/modules/url/id_getter.py
@@ -10,26 +10,28 @@ async def get_url_after_redirect(url: str) -> str:
async def get_id(recognised: RecognisedService):
- if recognised.name == 'yt':
- return recognised.parse_result.path.replace('/', '') if (
- recognised.parse_result.netloc.endswith('youtu.be')
- ) else recognised.parse_result.query.split('=')[1].split('&')[0]
+ if recognised.name == "yt":
+ return (
+ recognised.parse_result.path.replace("/", "")
+ if (recognised.parse_result.netloc.endswith("youtu.be"))
+ else recognised.parse_result.query.split("=")[1].split("&")[0]
+ )
- elif recognised.name == 'spot':
- if recognised.parse_result.netloc.endswith('open.spotify.com'):
- return recognised.parse_result.path.split('/')[2]
+ elif recognised.name == "spot":
+ if recognised.parse_result.netloc.endswith("open.spotify.com"):
+ return recognised.parse_result.path.split("/")[2]
else:
url = await get_url_after_redirect(recognised.parse_result.geturl())
- return url.split('/')[-1].split('?')[0]
+ return url.split("/")[-1].split("?")[0]
- elif recognised.name == 'deez':
- if recognised.parse_result.netloc.endswith('deezer.com'):
- return recognised.parse_result.path.split('/')[-1]
+ elif recognised.name == "deez":
+ if recognised.parse_result.netloc.endswith("deezer.com"):
+ return recognised.parse_result.path.split("/")[-1]
else:
url = await get_url_after_redirect(recognised.parse_result.geturl())
- return url.split('/')[-1].split('?')[0]
+ return url.split("/")[-1].split("?")[0]
- elif recognised.name == 'sc':
- if not recognised.parse_result.netloc.startswith('on'):
+ elif recognised.name == "sc":
+ if not recognised.parse_result.netloc.startswith("on"):
return recognised.parse_result.geturl()
return await get_url_after_redirect(recognised.parse_result.geturl())
diff --git a/bot/modules/url/recognise.py b/bot/modules/url/recognise.py
index 4be60d7..96b88af 100644
--- a/bot/modules/url/recognise.py
+++ b/bot/modules/url/recognise.py
@@ -14,7 +14,7 @@ from bot.modules.soundcloud import soundcloud
@dataclass
class RecognisedService:
- name: Literal['yt', 'spot', 'deez', 'sc']
+ name: Literal["yt", "spot", "deez", "sc"]
db_table: DBDict
by_id_func: Callable | Awaitable
parse_result: ParseResult
@@ -22,33 +22,33 @@ class RecognisedService:
def recognise_music_service(url: str) -> RecognisedService | None:
url = urlparse(url)
- if url.netloc.endswith('youtube.com') or url.netloc.endswith('youtu.be'):
+ if url.netloc.endswith("youtube.com") or url.netloc.endswith("youtu.be"):
return RecognisedService(
- name='yt',
+ name="yt",
db_table=db.youtube,
by_id_func=youtube.songs.from_id,
- parse_result=url
+ parse_result=url,
)
- elif url.netloc.endswith('open.spotify.com') or url.netloc.endswith('spotify.link'):
+ elif url.netloc.endswith("open.spotify.com") or url.netloc.endswith("spotify.link"):
return RecognisedService(
- name='spot',
+ name="spot",
db_table=db.spotify,
by_id_func=spotify.songs.from_id,
- parse_result=url
+ parse_result=url,
)
- elif url.netloc.endswith('deezer.page.link') or url.netloc.endswith('deezer.com'):
+ elif url.netloc.endswith("deezer.page.link") or url.netloc.endswith("deezer.com"):
return RecognisedService(
- name='deez',
+ name="deez",
db_table=db.deezer,
by_id_func=deezer.songs.from_id,
- parse_result=url
+ parse_result=url,
)
- elif url.netloc.endswith('soundcloud.com'):
+ elif url.netloc.endswith("soundcloud.com"):
return RecognisedService(
- name='sc',
+ name="sc",
db_table=db.soundcloud,
by_id_func=soundcloud.songs.from_url,
- parse_result=url
+ parse_result=url,
)
else:
return None
diff --git a/bot/modules/youtube/song.py b/bot/modules/youtube/song.py
index 7eca20c..869d355 100644
--- a/bot/modules/youtube/song.py
+++ b/bot/modules/youtube/song.py
@@ -15,19 +15,19 @@ class SongItem(BaseSongItem):
@classmethod
def from_youtube(cls, song_item: dict):
return cls(
- name=song_item['title'],
- id=song_item['videoId'],
- artists=[artist['name'] for artist in song_item['artists']],
- thumbnail=song_item['thumbnails'][1]['url']
+ name=song_item["title"],
+ id=song_item["videoId"],
+ artists=[artist["name"] for artist in song_item["artists"]],
+ thumbnail=song_item["thumbnails"][1]["url"],
)
@classmethod
def from_details(cls, details: dict):
return cls(
- name=details['title'],
- id=details['videoId'],
- artists=details['author'].split(' & '),
- thumbnail=details['thumbnail']['thumbnails'][1]['url']
+ name=details["title"],
+ id=details["videoId"],
+ artists=details["author"].split(" & "),
+ thumbnail=details["thumbnail"]["thumbnails"][1]["url"],
)
def to_bytestream(self) -> Awaitable[YouTubeBytestream]:
@@ -39,16 +39,10 @@ class Songs(object):
ytm: ytmusicapi.YTMusic
def search(
- self,
- query: str,
- limit: int = 10,
- exact_match: bool = False
+ self, query: str, limit: int = 10, exact_match: bool = False
) -> list[SongItem] | None:
r = self.ytm.search(
- query,
- limit=limit,
- filter='songs',
- ignore_spelling=exact_match
+ query, limit=limit, filter="songs", ignore_spelling=exact_match
)
if r is None:
@@ -68,4 +62,4 @@ class Songs(object):
if r is None:
return None
- return SongItem.from_details(r['videoDetails'])
+ return SongItem.from_details(r["videoDetails"])
diff --git a/bot/modules/youtube/youtube.py b/bot/modules/youtube/youtube.py
index 80ddbfd..d756f27 100644
--- a/bot/modules/youtube/youtube.py
+++ b/bot/modules/youtube/youtube.py
@@ -9,6 +9,4 @@ class YouTube(object):
self.ytm = ytmusicapi.YTMusic()
self.download = Downloader
- self.songs = Songs(
- self.ytm
- )
+ self.songs = Songs(self.ytm)
diff --git a/bot/results/common/search.py b/bot/results/common/search.py
index ec305ad..92433e1 100644
--- a/bot/results/common/search.py
+++ b/bot/results/common/search.py
@@ -1,6 +1,8 @@
from aiogram.types import (
- InlineQueryResultDocument, InlineQueryResultCachedAudio,
- InlineKeyboardMarkup, InlineKeyboardButton
+ InlineQueryResultDocument,
+ InlineQueryResultCachedAudio,
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
)
from bot.modules.database.db import DBDict
@@ -10,37 +12,38 @@ from bot.modules.common.song import BaseSongItem
from typing import TypeVar
-BaseSongT = TypeVar('BaseSongT', bound=BaseSongItem)
+BaseSongT = TypeVar("BaseSongT", bound=BaseSongItem)
async def get_common_search_result(
- audio: BaseSongT,
- db_table: DBDict,
- service_id: str,
- settings: UserSettings
+ audio: BaseSongT, db_table: DBDict, service_id: str, settings: UserSettings
) -> InlineQueryResultDocument | InlineQueryResultCachedAudio:
return (
InlineQueryResultDocument(
- id=f'{service_id}::' + audio.id,
+ id=f"{service_id}::" + audio.id,
title=audio.name,
description=audio.all_artists,
thumb_url=audio.thumbnail,
- document_url=(audio.preview_url or audio.thumbnail) if
- settings['search_preview'].value == 'preview' else audio.thumbnail,
- mime_type='application/zip',
+ document_url=(
+ (audio.preview_url or audio.thumbnail)
+ if settings["search_preview"].value == "preview"
+ else audio.thumbnail
+ ),
+ mime_type="application/zip",
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[
- [InlineKeyboardButton(text='Downloading...', callback_data='.')]
+ [InlineKeyboardButton(text="Downloading...", callback_data=".")]
]
),
caption=audio.full_name,
- ) if audio.id not in list(db_table.keys()) else
- InlineQueryResultCachedAudio(
- id=f'{service_id}c::' + audio.id,
+ )
+ if audio.id not in list(db_table.keys())
+ else InlineQueryResultCachedAudio(
+ id=f"{service_id}c::" + audio.id,
audio_file_id=db_table[audio.id],
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[
- [InlineKeyboardButton(text='Verifying...', callback_data='.')]
+ [InlineKeyboardButton(text="Verifying...", callback_data=".")]
]
),
)
diff --git a/bot/results/deezer/__init__.py b/bot/results/deezer/__init__.py
index af09ab9..2b3f940 100644
--- a/bot/results/deezer/__init__.py
+++ b/bot/results/deezer/__init__.py
@@ -1,4 +1,4 @@
from .search import get_deezer_search_results
-__all__ = ['get_deezer_search_results']
+__all__ = ["get_deezer_search_results"]
diff --git a/bot/results/deezer/search.py b/bot/results/deezer/search.py
index 3019f36..ca416d5 100644
--- a/bot/results/deezer/search.py
+++ b/bot/results/deezer/search.py
@@ -1,6 +1,4 @@
-from aiogram.types import (
- InlineQueryResultDocument, InlineQueryResultCachedAudio
-)
+from aiogram.types import InlineQueryResultDocument, InlineQueryResultCachedAudio
from bot.modules.deezer import deezer
from bot.modules.database import db
@@ -9,15 +7,12 @@ from bot.modules.settings import UserSettings
from ..common.search import get_common_search_result
-async def get_deezer_search_results(query: str, settings: UserSettings) -> list[
- InlineQueryResultDocument | InlineQueryResultCachedAudio
-]:
+async def get_deezer_search_results(
+ query: str, settings: UserSettings
+) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
- audio=audio,
- db_table=db.deezer,
- service_id='deez',
- settings=settings
+ audio=audio, db_table=db.deezer, service_id="deez", settings=settings
)
for audio in await deezer.songs.search(query, limit=50)
]
diff --git a/bot/results/error/error.py b/bot/results/error/error.py
index 4799792..90fb51f 100644
--- a/bot/results/error/error.py
+++ b/bot/results/error/error.py
@@ -1,5 +1,6 @@
from aiogram.types import (
- InlineQueryResultArticle, InputTextMessageContent,
+ InlineQueryResultArticle,
+ InputTextMessageContent,
)
from bot.modules.database import db
@@ -8,24 +9,27 @@ from bot.modules.error import Error
from bot.common import console
-async def get_error_search_results(error_id: str) -> (list[InlineQueryResultArticle]
- | None):
+async def get_error_search_results(
+ error_id: str,
+) -> list[InlineQueryResultArticle] | None:
error: Error = db.errors.get(error_id)
if error is None:
return []
- console.print(f'{error_id} requested')
+ console.print(f"{error_id} requested")
console.print(error.traceback)
- console.print(f'-{error_id} requested-')
+ console.print(f"-{error_id} requested-")
- return [(
- InlineQueryResultArticle(
- id=error_id,
- title=f'Error {error_id}',
- description=error.exception.short,
- input_message_content=InputTextMessageContent(
- message_text=error.exception.long,
- parse_mode='HTML',
- ),
+ return [
+ (
+ InlineQueryResultArticle(
+ id=error_id,
+ title=f"Error {error_id}",
+ description=error.exception.short,
+ input_message_content=InputTextMessageContent(
+ message_text=error.exception.long,
+ parse_mode="HTML",
+ ),
+ )
)
- )]
+ ]
diff --git a/bot/results/soundcloud/__init__.py b/bot/results/soundcloud/__init__.py
index ea67cc2..0a54fc7 100644
--- a/bot/results/soundcloud/__init__.py
+++ b/bot/results/soundcloud/__init__.py
@@ -1,6 +1,4 @@
from .search import get_soundcloud_search_results
-__all__ = [
- 'get_soundcloud_search_results'
-]
+__all__ = ["get_soundcloud_search_results"]
diff --git a/bot/results/soundcloud/search.py b/bot/results/soundcloud/search.py
index 8c6b662..5911f69 100644
--- a/bot/results/soundcloud/search.py
+++ b/bot/results/soundcloud/search.py
@@ -1,6 +1,4 @@
-from aiogram.types import (
- InlineQueryResultDocument, InlineQueryResultCachedAudio
-)
+from aiogram.types import InlineQueryResultDocument, InlineQueryResultCachedAudio
from bot.modules.soundcloud import soundcloud
from bot.modules.database import db
@@ -9,15 +7,12 @@ from bot.modules.settings import UserSettings
from ..common.search import get_common_search_result
-async def get_soundcloud_search_results(query: str, settings: UserSettings) -> list[
- InlineQueryResultDocument | InlineQueryResultCachedAudio
-]:
+async def get_soundcloud_search_results(
+ query: str, settings: UserSettings
+) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
- audio=audio,
- db_table=db.soundcloud,
- service_id='sc',
- settings=settings
+ audio=audio, db_table=db.soundcloud, service_id="sc", settings=settings
)
for audio in await soundcloud.songs.search(query, limit=50)
]
diff --git a/bot/results/spotify/__init__.py b/bot/results/spotify/__init__.py
index eacb443..d7673ce 100644
--- a/bot/results/spotify/__init__.py
+++ b/bot/results/spotify/__init__.py
@@ -1,6 +1,4 @@
from .search import get_spotify_search_results
-__all__ = [
- 'get_spotify_search_results'
-]
+__all__ = ["get_spotify_search_results"]
diff --git a/bot/results/spotify/search.py b/bot/results/spotify/search.py
index 9598bbd..b5f797c 100644
--- a/bot/results/spotify/search.py
+++ b/bot/results/spotify/search.py
@@ -1,6 +1,4 @@
-from aiogram.types import (
- InlineQueryResultDocument, InlineQueryResultCachedAudio
-)
+from aiogram.types import InlineQueryResultDocument, InlineQueryResultCachedAudio
from bot.modules.spotify import spotify
from bot.modules.database import db
@@ -9,15 +7,12 @@ from bot.modules.settings import UserSettings
from ..common.search import get_common_search_result
-async def get_spotify_search_results(query: str, settings: UserSettings) -> list[
- InlineQueryResultDocument | InlineQueryResultCachedAudio
-]:
+async def get_spotify_search_results(
+ query: str, settings: UserSettings
+) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
- audio=audio,
- db_table=db.spotify,
- service_id='spot',
- settings=settings
+ audio=audio, db_table=db.spotify, service_id="spot", settings=settings
)
for audio in spotify.songs.search(query, limit=50)
]
diff --git a/bot/results/url/url.py b/bot/results/url/url.py
index a4325dd..cd63de1 100644
--- a/bot/results/url/url.py
+++ b/bot/results/url/url.py
@@ -1,6 +1,4 @@
-from aiogram.types import (
- InlineQueryResultDocument, InlineQueryResultCachedAudio
-)
+from aiogram.types import InlineQueryResultDocument, InlineQueryResultCachedAudio
from bot.modules.url import recognise_music_service, get_id
from bot.modules.settings import UserSettings
@@ -10,9 +8,9 @@ from ..common.search import get_common_search_result
import inspect
-async def get_url_results(query: str, settings: UserSettings) -> list[
- InlineQueryResultDocument | InlineQueryResultCachedAudio
-]:
+async def get_url_results(
+ query: str, settings: UserSettings
+) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
service = recognise_music_service(query)
if inspect.iscoroutinefunction(service.by_id_func):
audio = await service.by_id_func(await get_id(service))
@@ -26,6 +24,6 @@ async def get_url_results(query: str, settings: UserSettings) -> list[
audio=audio,
db_table=service.db_table,
service_id=service.name,
- settings=settings
+ settings=settings,
)
]
diff --git a/bot/results/youtube/__init__.py b/bot/results/youtube/__init__.py
index b95c9c6..a182d7f 100644
--- a/bot/results/youtube/__init__.py
+++ b/bot/results/youtube/__init__.py
@@ -1,6 +1,4 @@
from .search import get_youtube_search_results
-__all__ = [
- 'get_youtube_search_results'
-]
+__all__ = ["get_youtube_search_results"]
diff --git a/bot/results/youtube/search.py b/bot/results/youtube/search.py
index d82e69b..4ad0249 100644
--- a/bot/results/youtube/search.py
+++ b/bot/results/youtube/search.py
@@ -1,6 +1,4 @@
-from aiogram.types import (
- InlineQueryResultDocument, InlineQueryResultCachedAudio
-)
+from aiogram.types import InlineQueryResultDocument, InlineQueryResultCachedAudio
from bot.modules.youtube import youtube
from bot.modules.database import db
@@ -9,15 +7,12 @@ from bot.modules.settings import UserSettings
from ..common.search import get_common_search_result
-async def get_youtube_search_results(query: str, settings: UserSettings) -> list[
- InlineQueryResultDocument | InlineQueryResultCachedAudio
-]:
+async def get_youtube_search_results(
+ query: str, settings: UserSettings
+) -> list[InlineQueryResultDocument | InlineQueryResultCachedAudio]:
return [
await get_common_search_result(
- audio=audio,
- db_table=db.youtube,
- service_id='yt',
- settings=settings
+ audio=audio, db_table=db.youtube, service_id="yt", settings=settings
)
for audio in youtube.songs.search(query, limit=40)
]
diff --git a/bot/utils/config/_config.py b/bot/utils/config/_config.py
index fd6bbe2..47f3f92 100644
--- a/bot/utils/config/_config.py
+++ b/bot/utils/config/_config.py
@@ -5,7 +5,7 @@ class Config(dict):
def __init__(self, _config: dict = None):
try:
if _config is None:
- config = tomllib.load(open('config.toml', 'rb'))
+ config = tomllib.load(open("config.toml", "rb"))
super().__init__(**config)
else:
diff --git a/lib/ShazamIO/shazamio/algorithm.py b/lib/ShazamIO/shazamio/algorithm.py
index e7ac8a0..dc69acd 100644
--- a/lib/ShazamIO/shazamio/algorithm.py
+++ b/lib/ShazamIO/shazamio/algorithm.py
@@ -40,7 +40,9 @@ class SignatureGenerator:
# Used when processing input:
- self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0)
+ self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(
+ buffer_size=2048, default_value=0
+ )
self.fft_outputs: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0.0 * 1025]
@@ -91,12 +93,15 @@ class SignatureGenerator:
self.next_signature.number_samples / self.next_signature.sample_rate_hz
< self.MAX_TIME_SECONDS
or sum(
- len(peaks) for peaks in self.next_signature.frequency_band_to_sound_peaks.values()
+ len(peaks)
+ for peaks in self.next_signature.frequency_band_to_sound_peaks.values()
)
< self.MAX_PEAKS
):
self.process_input(
- self.input_pending_processing[self.samples_processed : self.samples_processed + 128]
+ self.input_pending_processing[
+ self.samples_processed : self.samples_processed + 128
+ ]
)
self.samples_processed += 128
@@ -107,7 +112,9 @@ class SignatureGenerator:
self.next_signature.number_samples = 0
self.next_signature.frequency_band_to_sound_peaks = {}
- self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0)
+ self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(
+ buffer_size=2048, default_value=0
+ )
self.fft_outputs: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0.0 * 1025]
)
@@ -124,7 +131,9 @@ class SignatureGenerator:
self.do_peak_spreading_and_recognition()
def do_fft(self, batch_of_128_s16le_mono_samples):
- type_ring = self.ring_buffer_of_samples.position + len(batch_of_128_s16le_mono_samples)
+ type_ring = self.ring_buffer_of_samples.position + len(
+ batch_of_128_s16le_mono_samples
+ )
self.ring_buffer_of_samples[
self.ring_buffer_of_samples.position : type_ring
] = batch_of_128_s16le_mono_samples
@@ -159,10 +168,13 @@ class SignatureGenerator:
temporary_array_1[1] = np.roll(temporary_array_1[1], -1)
temporary_array_1[2] = np.roll(temporary_array_1[2], -2)
- origin_last_fft_np = np.hstack([temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]])
+ origin_last_fft_np = np.hstack(
+ [temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]]
+ )
i1, i2, i3 = [
- (self.spread_fft_output.position + former_fft_num) % self.spread_fft_output.buffer_size
+ (self.spread_fft_output.position + former_fft_num)
+ % self.spread_fft_output.buffer_size
for former_fft_num in [-1, -3, -6]
]
@@ -234,27 +246,38 @@ class SignatureGenerator:
fft_number = self.spread_fft_output.num_written - 46
peak_magnitude = (
- np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3 + 6144
+ np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3
+ + 6144
)
peak_magnitude_before = (
- np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3 + 6144
+ np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3
+ + 6144
)
peak_magnitude_after = (
- np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3 + 6144
+ np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3
+ + 6144
)
peak_variation_1 = (
- peak_magnitude * 2 - peak_magnitude_before - peak_magnitude_after
+ peak_magnitude * 2
+ - peak_magnitude_before
+ - peak_magnitude_after
)
peak_variation_2 = (
- (peak_magnitude_after - peak_magnitude_before) * 32 / peak_variation_1
+ (peak_magnitude_after - peak_magnitude_before)
+ * 32
+ / peak_variation_1
)
- corrected_peak_frequency_bin = bin_position * 64 + peak_variation_2
+ corrected_peak_frequency_bin = (
+ bin_position * 64 + peak_variation_2
+ )
assert peak_variation_1 > 0
- frequency_hz = corrected_peak_frequency_bin * (16000 / 2 / 1024 / 64)
+ frequency_hz = corrected_peak_frequency_bin * (
+ 16000 / 2 / 1024 / 64
+ )
if 250 < frequency_hz < 520:
band = FrequencyBand.hz_250_520
@@ -267,7 +290,10 @@ class SignatureGenerator:
else:
continue
- if band not in self.next_signature.frequency_band_to_sound_peaks:
+ if (
+ band
+ not in self.next_signature.frequency_band_to_sound_peaks
+ ):
self.next_signature.frequency_band_to_sound_peaks[band] = []
self.next_signature.frequency_band_to_sound_peaks[band].append(
diff --git a/lib/ShazamIO/shazamio/api.py b/lib/ShazamIO/shazamio/api.py
index e4b815c..0470ed9 100644
--- a/lib/ShazamIO/shazamio/api.py
+++ b/lib/ShazamIO/shazamio/api.py
@@ -27,7 +27,9 @@ class Shazam(Converter, Geo, Request):
self.language = language
self.endpoint_country = endpoint_country
- async def top_world_tracks(self, limit: int = 200, offset: int = 0) -> Dict[str, Any]:
+ async def top_world_tracks(
+ self, limit: int = 200, offset: int = 0
+ ) -> Dict[str, Any]:
"""
Search top world tracks
@@ -292,7 +294,9 @@ class Shazam(Converter, Geo, Request):
headers=self.headers(),
)
- async def search_track(self, query: str, limit: int = 10, offset: int = 0) -> Dict[str, Any]:
+ async def search_track(
+ self, query: str, limit: int = 10, offset: int = 0
+ ) -> Dict[str, Any]:
"""
Search all tracks by prefix
:param query: Track full title or prefix title
diff --git a/lib/ShazamIO/shazamio/converter.py b/lib/ShazamIO/shazamio/converter.py
index fa4ce5f..cbf9a68 100644
--- a/lib/ShazamIO/shazamio/converter.py
+++ b/lib/ShazamIO/shazamio/converter.py
@@ -60,5 +60,7 @@ class Converter:
signature_generator.feed_input(audio.get_array_of_samples())
signature_generator.MAX_TIME_SECONDS = 12
if audio.duration_seconds > 12 * 3:
- signature_generator.samples_processed += 16000 * (int(audio.duration_seconds / 2) - 6)
+ signature_generator.samples_processed += 16000 * (
+ int(audio.duration_seconds / 2) - 6
+ )
return signature_generator
diff --git a/lib/ShazamIO/shazamio/misc.py b/lib/ShazamIO/shazamio/misc.py
index f88bb67..12d5c08 100644
--- a/lib/ShazamIO/shazamio/misc.py
+++ b/lib/ShazamIO/shazamio/misc.py
@@ -47,9 +47,7 @@ class ShazamUrl:
)
LISTENING_COUNTER = "https://www.shazam.com/services/count/v2/web/track/{}"
- SEARCH_ARTIST_V2 = (
- "https://www.shazam.com/services/amapi/v1/catalog/{endpoint_country}/artists/{artist_id}"
- )
+ SEARCH_ARTIST_V2 = "https://www.shazam.com/services/amapi/v1/catalog/{endpoint_country}/artists/{artist_id}"
class Request:
diff --git a/lib/ShazamIO/shazamio/schemas/artists.py b/lib/ShazamIO/shazamio/schemas/artists.py
index 9105998..b76a8c8 100644
--- a/lib/ShazamIO/shazamio/schemas/artists.py
+++ b/lib/ShazamIO/shazamio/schemas/artists.py
@@ -80,7 +80,9 @@ class ArtistRelationships(BaseModel):
class ArtistViews(BaseModel):
- top_music_videos: Optional[TopMusicVideosView] = Field(None, alias="top-music-videos")
+ top_music_videos: Optional[TopMusicVideosView] = Field(
+ None, alias="top-music-videos"
+ )
simular_artists: Optional[SimularArtist] = Field(None, alias="similar-artists")
latest_release: Optional[LastReleaseModel] = Field(None, alias="latest-release")
full_albums: Optional[FullAlbumsModel] = Field(None, alias="full-albums")
diff --git a/lib/ShazamIO/shazamio/signature.py b/lib/ShazamIO/shazamio/signature.py
index 225e80d..b918ef2 100644
--- a/lib/ShazamIO/shazamio/signature.py
+++ b/lib/ShazamIO/shazamio/signature.py
@@ -31,7 +31,7 @@ class RawSignatureHeader(LittleEndianStructure):
# field above,
# it can be inferred and subtracted so that we obtain the number of samples,
# and from the number of samples and sample rate we can obtain the length of the recording
- ("fixed_value", c_uint32)
+ ("fixed_value", c_uint32),
# Calculated as ((15 << 19) + 0x40000) - 0x7c0000 or 00 00 7c 00 - seems pretty constant,
# may be different in the "SigType.STREAMING" mode
]
@@ -100,7 +100,9 @@ class DecodedMessage:
assert crc32(check_summable_data) & 0xFFFFFFFF == header.crc32
assert header.magic2 == 0x94119C00
- self.sample_rate_hz = int(SampleRate(header.shifted_sample_rate_id >> 27).name.strip("_"))
+ self.sample_rate_hz = int(
+ SampleRate(header.shifted_sample_rate_id >> 27).name.strip("_")
+ )
self.number_samples = int(
header.number_samples_plus_divided_sample_rate - self.sample_rate_hz * 0.24
@@ -145,13 +147,17 @@ class DecodedMessage:
fft_pass_offset: int = raw_fft_pass[0]
if fft_pass_offset == 0xFF:
- fft_pass_number = int.from_bytes(frequency_peaks_buf.read(4), "little")
+ fft_pass_number = int.from_bytes(
+ frequency_peaks_buf.read(4), "little"
+ )
continue
else:
fft_pass_number += fft_pass_offset
peak_magnitude = int.from_bytes(frequency_peaks_buf.read(2), "little")
- corrected_peak_frequency_bin = int.from_bytes(frequency_peaks_buf.read(2), "little")
+ corrected_peak_frequency_bin = int.from_bytes(
+ frequency_peaks_buf.read(2), "little"
+ )
self.frequency_band_to_sound_peaks[frequency_band].append(
FrequencyPeak(
@@ -203,7 +209,9 @@ class DecodedMessage:
header.magic1 = 0xCAFE2580
header.magic2 = 0x94119C00
- header.shifted_sample_rate_id = int(getattr(SampleRate, "_%s" % self.sample_rate_hz)) << 27
+ header.shifted_sample_rate_id = (
+ int(getattr(SampleRate, "_%s" % self.sample_rate_hz)) << 27
+ )
header.fixed_value = (15 << 19) + 0x40000
header.number_samples_plus_divided_sample_rate = int(
self.number_samples + self.sample_rate_hz * 0.24
@@ -211,7 +219,9 @@ class DecodedMessage:
contents_buf = BytesIO()
- for frequency_band, frequency_peaks in sorted(self.frequency_band_to_sound_peaks.items()):
+ for frequency_band, frequency_peaks in sorted(
+ self.frequency_band_to_sound_peaks.items()
+ ):
peaks_buf = BytesIO()
fft_pass_number = 0
@@ -225,13 +235,19 @@ class DecodedMessage:
if frequency_peak.fft_pass_number - fft_pass_number >= 255:
peaks_buf.write(b"\xff")
- peaks_buf.write(frequency_peak.fft_pass_number.to_bytes(4, "little"))
+ peaks_buf.write(
+ frequency_peak.fft_pass_number.to_bytes(4, "little")
+ )
fft_pass_number = frequency_peak.fft_pass_number
- peaks_buf.write(bytes([frequency_peak.fft_pass_number - fft_pass_number]))
+ peaks_buf.write(
+ bytes([frequency_peak.fft_pass_number - fft_pass_number])
+ )
peaks_buf.write(frequency_peak.peak_magnitude.to_bytes(2, "little"))
- peaks_buf.write(frequency_peak.corrected_peak_frequency_bin.to_bytes(2, "little"))
+ peaks_buf.write(
+ frequency_peak.corrected_peak_frequency_bin.to_bytes(2, "little")
+ )
fft_pass_number = frequency_peak.fft_pass_number
@@ -245,7 +261,9 @@ class DecodedMessage:
header.size_minus_header = len(contents_buf.getvalue()) + 8
buf = BytesIO()
- buf.write(header) # We will rewrite it just after in order to include the final CRC-32
+ buf.write(
+ header
+ ) # We will rewrite it just after in order to include the final CRC-32
buf.write((0x40000000).to_bytes(4, "little"))
buf.write((len(contents_buf.getvalue()) + 8).to_bytes(4, "little"))