Add soundcloud support

This commit is contained in:
BarsTiger
2023-11-13 23:08:58 +02:00
parent e99ba9daa3
commit bc2663c17c
22 changed files with 391 additions and 7 deletions

View File

@@ -0,0 +1,10 @@
from .soundcloud import SoundCloud
from .downloader import SoundCloudBytestream
from bot.utils.config import config
soundcloud = SoundCloud(
client_id=config.tokens.soundcloud.client_id,
)
__all__ = ['soundcloud', 'SoundCloudBytestream']

View File

@@ -0,0 +1,118 @@
from attrs import define
from typing import Callable
from .driver import SoundCloudDriver
from .song import SongItem
import m3u8
@define
class SoundCloudBytestream:
file: bytes
filename: str
duration: int
song: SongItem
@classmethod
def from_bytes(
cls,
bytes_: bytes,
filename: str,
duration: int,
song: SongItem
):
return cls(
file=bytes_,
filename=filename,
duration=int(duration / 1000),
song=song
)
@define
class Downloader:
driver: SoundCloudDriver
download_url: str
duration: int
filename: str
method: Callable
song: SongItem
@classmethod
async def build(
cls,
song_id: str,
driver: SoundCloudDriver
):
track = await driver.get_track(song_id)
song = SongItem.from_soundcloud(track)
if url := cls._try_get_progressive(track['media']['transcodings']):
method = cls._progressive
else:
url = track['media']['transcodings'][0]['url']
method = cls._hls if \
(track['media']['transcodings'][0]['format']['protocol']
== 'hls') else cls._progressive
return cls(
driver=driver,
duration=track['duration'],
method=method,
download_url=url,
filename=f'{track["title"]}.mp3',
song=song
)
@staticmethod
def _try_get_progressive(urls: list) -> str | None:
for transcode in urls:
if transcode['format']['protocol'] == 'progressive':
return transcode['url']
async def _progressive(self, url: str) -> bytes:
return await self.driver.engine.read_data(
url=(await self.driver.engine.get(
url
))['url']
)
async def _hls(self, url: str) -> bytes:
m3u8_obj = m3u8.loads(
(await self.driver.engine.read_data(
(await self.driver.engine.get(
url=url
))['url']
)).decode()
)
content = bytearray()
for segment in m3u8_obj.files:
content.extend(
await self.driver.engine.read_data(
url=segment,
append_client_id=False
)
)
return content
async def to_bytestream(self) -> SoundCloudBytestream:
return SoundCloudBytestream.from_bytes(
bytes_=await self.method(self, self.download_url),
filename=self.filename,
duration=self.duration,
song=self.song
)
@define
class DownloaderBuilder:
driver: SoundCloudDriver
async def from_id(self, song_id: str):
return await Downloader.build(
song_id=song_id,
driver=self.driver
)

View File

@@ -0,0 +1,30 @@
from attrs import define
from .engine import SoundCloudEngine
@define
class SoundCloudDriver:
engine: SoundCloudEngine
async def get_track(self, track_id: int | str):
return await self.engine.call(
f'tracks/{track_id}'
)
async def search(self, query: str, limit: int = 30):
return (await self.engine.call(
'search/tracks',
params={
'q': query,
'limit': limit
}
))['collection']
async def resolve_url(self, url: str):
return await self.engine.call(
'resolve',
params={
'url': url
}
)

View File

@@ -0,0 +1,34 @@
from attrs import define
import aiohttp
@define
class SoundCloudEngine:
client_id: str
async def call(self, request_point: str, params: dict = None):
return await self.get(
url=f'https://api-v2.soundcloud.com/{request_point}',
params=params
)
async def get(self, url: str, params: dict = None):
async with aiohttp.ClientSession() as session:
async with session.get(
url,
params=(params or {}) | {
'client_id': self.client_id,
},
) as r:
return await r.json()
async def read_data(self, url: str, params: dict = None,
append_client_id: bool = True):
async with aiohttp.ClientSession() as session:
async with session.get(
url,
params=(params or {}) | ({
'client_id': self.client_id,
} if append_client_id else {}),
) as r:
return await r.content.read()

View File

@@ -0,0 +1,55 @@
from attrs import define
from ..common.song import BaseSongItem
from .driver import SoundCloudDriver
@define
class SongItem(BaseSongItem):
@classmethod
def from_soundcloud(cls, song_item: dict):
return cls(
name=song_item['title'],
id=str(song_item['id']),
artists=[],
thumbnail=(song_item['artwork_url'] or song_item['user']['avatar_url'] or
'https://soundcloud.com/images/default_avatar_large.png')
.replace('large.jpg', 't300x300.jpg'),
preview_url=None
)
@property
def all_artists(self):
return None
@define
class Songs(object):
driver: SoundCloudDriver
async def search(self, query: str, limit: int = 30) -> list[SongItem] | None:
r = await self.driver.search(query, limit=limit)
if r is None:
return None
return [SongItem.from_soundcloud(item) for item in r][:limit]
async def search_one(self, query: str) -> SongItem | None:
return (await self.search(query, limit=1) or [None])[0]
async def from_id(self, song_id: str) -> SongItem | None:
r = await self.driver.get_track(song_id)
if r is None:
return None
return SongItem.from_soundcloud(r)
async def from_url(self, url: str) -> SongItem | None:
r = await self.driver.resolve_url(url)
if r is None:
return None
return SongItem.from_soundcloud(r)

View File

@@ -0,0 +1,12 @@
from .engine import SoundCloudEngine
from .driver import SoundCloudDriver
from .song import Songs
from .downloader import DownloaderBuilder
class SoundCloud(object):
def __init__(self, client_id: str):
self.engine = SoundCloudEngine(client_id=client_id)
self.driver = SoundCloudDriver(engine=self.engine)
self.songs = Songs(driver=self.driver)
self.downloader = DownloaderBuilder(driver=self.driver)