Add download by url
This commit is contained in:
@@ -1 +1,2 @@
|
||||
from .search import ServiceSearchFilter
|
||||
from .url import MusicUrlFilter
|
||||
|
||||
31
bot/filters/url.py
Normal file
31
bot/filters/url.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from aiogram.filters import BaseFilter
|
||||
from aiogram.types import InlineQuery
|
||||
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
class MusicUrlFilter(BaseFilter):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def __call__(self, inline_query: InlineQuery):
|
||||
if not inline_query.query.strip().startswith('http'):
|
||||
return False
|
||||
|
||||
url = urlparse(inline_query.query)
|
||||
return (
|
||||
url.scheme in ['http', 'https'] and
|
||||
any(
|
||||
map(
|
||||
url.netloc.endswith,
|
||||
[
|
||||
'youtube.com',
|
||||
'youtu.be',
|
||||
'open.spotify.com',
|
||||
'spotify.link',
|
||||
'deezer.page.link',
|
||||
'deezer.com',
|
||||
]
|
||||
)
|
||||
)
|
||||
)
|
||||
@@ -2,6 +2,7 @@ from aiogram import Router
|
||||
from . import (
|
||||
initialize,
|
||||
inline_song,
|
||||
inline_url,
|
||||
inline_default,
|
||||
inline_empty,
|
||||
on_chosen,
|
||||
@@ -16,6 +17,7 @@ router.chosen_inline_result.outer_middleware(SaveChosenMiddleware())
|
||||
router.include_routers(
|
||||
initialize.router,
|
||||
inline_song.router,
|
||||
inline_url.router,
|
||||
inline_default.router,
|
||||
inline_empty.router,
|
||||
on_chosen.router,
|
||||
|
||||
1
bot/handlers/inline_url/__init__.py
Normal file
1
bot/handlers/inline_url/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .on_inline_url import router
|
||||
17
bot/handlers/inline_url/on_inline_url.py
Normal file
17
bot/handlers/inline_url/on_inline_url.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from aiogram import Router
|
||||
|
||||
from aiogram.types import InlineQuery
|
||||
|
||||
from bot.results.url import get_url_results
|
||||
from bot.filters import MusicUrlFilter
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
@router.inline_query(MusicUrlFilter())
|
||||
async def url_deezer_inline_query(inline_query: InlineQuery):
|
||||
await inline_query.answer(
|
||||
await get_url_results(inline_query.query),
|
||||
cache_time=0,
|
||||
is_personal=True
|
||||
)
|
||||
2
bot/modules/url/__init__.py
Normal file
2
bot/modules/url/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from .recognise import recognise_music_service, RecognisedService
|
||||
from .id_getter import get_id
|
||||
30
bot/modules/url/id_getter.py
Normal file
30
bot/modules/url/id_getter.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from .recognise import RecognisedService
|
||||
|
||||
import aiohttp
|
||||
|
||||
|
||||
async def get_url_after_redirect(url: str) -> str:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.head(url, allow_redirects=True) as resp:
|
||||
return str(resp.url)
|
||||
|
||||
|
||||
async def get_id(recognised: RecognisedService):
|
||||
if recognised.name == 'yt':
|
||||
return recognised.parse_result.path.replace('/', '') if (
|
||||
recognised.parse_result.netloc.endswith('youtu.be')
|
||||
) else recognised.parse_result.query.split('=')[1].split('&')[0]
|
||||
|
||||
elif recognised.name == 'spot':
|
||||
if recognised.parse_result.netloc.endswith('open.spotify.com'):
|
||||
return recognised.parse_result.path.split('/')[2]
|
||||
else:
|
||||
url = await get_url_after_redirect(recognised.parse_result.geturl())
|
||||
return url.split('/')[-1].split('?')[0]
|
||||
|
||||
elif recognised.name == 'deez':
|
||||
if recognised.parse_result.netloc.endswith('deezer.com'):
|
||||
return recognised.parse_result.path.split('/')[-1]
|
||||
else:
|
||||
url = await get_url_after_redirect(recognised.parse_result.geturl())
|
||||
return url.split('/')[-1].split('?')[0]
|
||||
46
bot/modules/url/recognise.py
Normal file
46
bot/modules/url/recognise.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from urllib.parse import urlparse, ParseResult
|
||||
from dataclasses import dataclass
|
||||
|
||||
from typing import Callable, Awaitable, Literal
|
||||
|
||||
from bot.modules.database import db
|
||||
from bot.modules.database.db import DBDict
|
||||
|
||||
from bot.modules.youtube import youtube
|
||||
from bot.modules.spotify import spotify
|
||||
from bot.modules.deezer import deezer
|
||||
|
||||
|
||||
@dataclass
|
||||
class RecognisedService:
|
||||
name: Literal['yt', 'spot', 'deez']
|
||||
db_table: DBDict
|
||||
by_id_func: Callable | Awaitable
|
||||
parse_result: ParseResult
|
||||
|
||||
|
||||
def recognise_music_service(url: str) -> RecognisedService | None:
|
||||
url = urlparse(url)
|
||||
if url.netloc.endswith('youtube.com') or url.netloc.endswith('youtu.be'):
|
||||
return RecognisedService(
|
||||
name='yt',
|
||||
db_table=db.youtube,
|
||||
by_id_func=youtube.songs.from_id,
|
||||
parse_result=url
|
||||
)
|
||||
elif url.netloc.endswith('open.spotify.com') or url.netloc.endswith('spotify.link'):
|
||||
return RecognisedService(
|
||||
name='spot',
|
||||
db_table=db.spotify,
|
||||
by_id_func=spotify.songs.from_id,
|
||||
parse_result=url
|
||||
)
|
||||
elif url.netloc.endswith('deezer.page.link') or url.netloc.endswith('deezer.com'):
|
||||
return RecognisedService(
|
||||
name='deez',
|
||||
db_table=db.deezer,
|
||||
by_id_func=deezer.songs.from_id,
|
||||
parse_result=url
|
||||
)
|
||||
else:
|
||||
return None
|
||||
1
bot/results/url/__init__.py
Normal file
1
bot/results/url/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .url import get_url_results
|
||||
29
bot/results/url/url.py
Normal file
29
bot/results/url/url.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from aiogram.types import (
|
||||
InlineQueryResultDocument, InlineQueryResultCachedAudio
|
||||
)
|
||||
|
||||
from bot.modules.url import recognise_music_service, get_id
|
||||
|
||||
from ..common.search import get_common_search_result
|
||||
|
||||
import inspect
|
||||
|
||||
|
||||
async def get_url_results(query: str) -> list[
|
||||
InlineQueryResultDocument | InlineQueryResultCachedAudio
|
||||
]:
|
||||
service = recognise_music_service(query)
|
||||
if inspect.iscoroutinefunction(service.by_id_func):
|
||||
audio = await service.by_id_func(await get_id(service))
|
||||
elif inspect.ismethod(service.by_id_func):
|
||||
audio = service.by_id_func(await get_id(service))
|
||||
else:
|
||||
return []
|
||||
|
||||
return [
|
||||
await get_common_search_result(
|
||||
audio=audio,
|
||||
db_table=service.db_table,
|
||||
service_id=service.name,
|
||||
)
|
||||
]
|
||||
Reference in New Issue
Block a user