This commit is contained in:
BarsTiger
2023-10-07 22:51:05 +03:00
parent 2f14f4d1ee
commit 9d45084d98
33 changed files with 2205 additions and 0 deletions

21
lib/ShazamIO/LICENSE.txt Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 dotX12
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,38 @@
[tool.poetry]
name = "shazamio"
version = "0.0.6"
description = "Is a asynchronous framework from reverse engineered Shazam API written in Python 3.8+ with asyncio and aiohttp."
authors = ["dotX12"]
license = "MIT License"
keywords = ["python", "shazam", "music", "recognize", "api", "async", "asyncio", "aiohttp", "identification"]
readme = "README.md"
homepage = "https://github.com/dotX12/ShazamIO"
repository = "https://github.com/dotX12/ShazamIO"
include = [
"README.md",
"LICENSE.txt"
]
[tool.poetry.dependencies]
python = "^3.8"
numpy = "^1.24.0"
aiohttp = "^3.8.3"
pydub = "^0.25.1"
dataclass-factory = "2.16"
aiofiles = "*"
anyio = "^3.6.2"
pydantic = "*"
[build-system]
requires = ["poetry-core>=1.0.0", "wheel>=0.36,<1.0", "poetry>=1.1,<2", "virtualenv==20.0.33"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
addopts = "-scoped"
asyncio_mode = "auto"
filterwarnings = ["ignore::DeprecationWarning"]
[tool.black]
line-length = 100

View File

@@ -0,0 +1,6 @@
from .serializers import Serialize
from .api import Shazam
from .converter import Geo
from .enums import GenreMusic
__all__ = ("Serialize", "Shazam", "Geo", "GenreMusic")

View File

@@ -0,0 +1,280 @@
from copy import copy
from typing import List, Optional, Any
import numpy as np
from .enums import FrequencyBand
from .signature import DecodedMessage, FrequencyPeak
HANNING_MATRIX = np.hanning(2050)[1:-1] # Wipe trailing and leading zeroes
class RingBuffer(list):
def __init__(self, buffer_size: int, default_value: Any = None):
if default_value is not None:
list.__init__(self, [copy(default_value) for _ in range(buffer_size)])
else:
list.__init__(self, [None] * buffer_size)
self.position: int = 0
self.buffer_size: int = buffer_size
self.num_written: int = 0
def append(self, value: Any):
self[self.position] = value
self.position += 1
self.position %= self.buffer_size
self.num_written += 1
class SignatureGenerator:
def __init__(self):
# Used when storing input that will be processed when requiring to
# generate a signature:
self.input_pending_processing: List[int] = []
# Signed 16-bits, 16 KHz mono samples to be processed
self.samples_processed: int = 0
# Number of samples processed out of "self.input_pending_processing"
# Used when processing input:
self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0)
self.fft_outputs: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0.0 * 1025]
)
# Lists of 1025 floats, premultiplied with a Hanning function before being
# passed through FFT, computed from
# the ring buffer every new 128 samples
self.spread_fft_output: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0] * 1025
)
# How much data to send to Shazam at once?
self.MAX_TIME_SECONDS = 3.1
self.MAX_PEAKS = 255
# The object that will hold information about the next fingerpring
# to be produced
self.next_signature = DecodedMessage()
self.next_signature.sample_rate_hz = 16000
self.next_signature.number_samples = 0
self.next_signature.frequency_band_to_sound_peaks = {}
"""
Add data to be generated a signature for, which will be
processed when self.get_next_signature() is called. This
function expects signed 16-bit 16 KHz mono PCM samples.
"""
def feed_input(self, s16le_mono_samples: List[int]):
self.input_pending_processing += s16le_mono_samples
"""
Consume some of the samples fed to self.feed_input(), and return
a Shazam signature (DecodedMessage object) to be sent to servers
once "enough data has been gathered".
Except if there are no more samples to be consumed, in this case
we will return None.
"""
def get_next_signature(self) -> Optional[DecodedMessage]:
if len(self.input_pending_processing) - self.samples_processed < 128:
return None
while len(self.input_pending_processing) - self.samples_processed >= 128 and (
self.next_signature.number_samples / self.next_signature.sample_rate_hz
< self.MAX_TIME_SECONDS
or sum(
len(peaks) for peaks in self.next_signature.frequency_band_to_sound_peaks.values()
)
< self.MAX_PEAKS
):
self.process_input(
self.input_pending_processing[self.samples_processed : self.samples_processed + 128]
)
self.samples_processed += 128
returned_signature = self.next_signature
self.next_signature = DecodedMessage()
self.next_signature.sample_rate_hz = 16000
self.next_signature.number_samples = 0
self.next_signature.frequency_band_to_sound_peaks = {}
self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0)
self.fft_outputs: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0.0 * 1025]
)
self.spread_fft_output: RingBuffer[List[float]] = RingBuffer(
buffer_size=256, default_value=[0] * 1025
)
return returned_signature
def process_input(self, s16le_mono_samples: List[int]):
self.next_signature.number_samples += len(s16le_mono_samples)
for position_of_chunk in range(0, len(s16le_mono_samples), 128):
self.do_fft(s16le_mono_samples[position_of_chunk : position_of_chunk + 128])
self.do_peak_spreading_and_recognition()
def do_fft(self, batch_of_128_s16le_mono_samples):
type_ring = self.ring_buffer_of_samples.position + len(batch_of_128_s16le_mono_samples)
self.ring_buffer_of_samples[
self.ring_buffer_of_samples.position : type_ring
] = batch_of_128_s16le_mono_samples
self.ring_buffer_of_samples.position += len(batch_of_128_s16le_mono_samples)
self.ring_buffer_of_samples.position %= 2048
self.ring_buffer_of_samples.num_written += len(batch_of_128_s16le_mono_samples)
excerpt_from_ring_buffer: list = (
self.ring_buffer_of_samples[self.ring_buffer_of_samples.position :]
+ self.ring_buffer_of_samples[: self.ring_buffer_of_samples.position]
)
# The pre multiplication of the array is for applying a windowing function before the DFT
# (slight rounded Hanning without zeros at edges)
fft_results: np.array = np.fft.rfft(HANNING_MATRIX * excerpt_from_ring_buffer)
fft_results = (fft_results.real**2 + fft_results.imag**2) / (1 << 17)
fft_results = np.maximum(fft_results, 0.0000000001)
self.fft_outputs.append(fft_results)
def do_peak_spreading_and_recognition(self):
self.do_peak_spreading()
if self.spread_fft_output.num_written >= 46:
self.do_peak_recognition()
def do_peak_spreading(self):
origin_last_fft: List[float] = self.fft_outputs[self.fft_outputs.position - 1]
temporary_array_1 = np.tile(origin_last_fft, 3).reshape((3, -1))
temporary_array_1[1] = np.roll(temporary_array_1[1], -1)
temporary_array_1[2] = np.roll(temporary_array_1[2], -2)
origin_last_fft_np = np.hstack([temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]])
i1, i2, i3 = [
(self.spread_fft_output.position + former_fft_num) % self.spread_fft_output.buffer_size
for former_fft_num in [-1, -3, -6]
]
temporary_array_2 = np.vstack(
[
origin_last_fft_np,
self.spread_fft_output[i1],
self.spread_fft_output[i2],
self.spread_fft_output[i3],
]
)
temporary_array_2[1] = np.max(temporary_array_2[:2, :], axis=0)
temporary_array_2[2] = np.max(temporary_array_2[:3, :], axis=0)
temporary_array_2[3] = np.max(temporary_array_2[:4, :], axis=0)
self.spread_fft_output[i1] = temporary_array_2[1].tolist()
self.spread_fft_output[i2] = temporary_array_2[2].tolist()
self.spread_fft_output[i3] = temporary_array_2[3].tolist()
self.spread_fft_output.append(list(origin_last_fft_np))
def do_peak_recognition(self):
fft_minus_46 = self.fft_outputs[
(self.fft_outputs.position - 46) % self.fft_outputs.buffer_size
]
fft_minus_49 = self.spread_fft_output[
(self.spread_fft_output.position - 49) % self.spread_fft_output.buffer_size
]
for bin_position in range(10, 1015):
# Ensure that the bin is large enough to be a peak
if fft_minus_46[bin_position] >= 1 / 64 and (
fft_minus_46[bin_position] >= fft_minus_49[bin_position - 1]
):
# Ensure that it is frequency-domain local minimum
max_neighbor_in_fft_minus_49 = 0
for neighbor_offset in [*range(-10, -3, 3), -3, 1, *range(2, 9, 3)]:
max_neighbor_in_fft_minus_49 = max(
fft_minus_49[bin_position + neighbor_offset],
max_neighbor_in_fft_minus_49,
)
if fft_minus_46[bin_position] > max_neighbor_in_fft_minus_49:
# Ensure that it is a time-domain local minimum
max_neighbor_in_other_adjacent_ffts = max_neighbor_in_fft_minus_49
for other_offset in [
-53,
-45,
*range(165, 201, 7),
*range(214, 250, 7),
]:
max_neighbor_in_other_adjacent_ffts = max(
self.spread_fft_output[
(self.spread_fft_output.position + other_offset)
% self.spread_fft_output.buffer_size
][bin_position - 1],
max_neighbor_in_other_adjacent_ffts,
)
if fft_minus_46[bin_position] > max_neighbor_in_other_adjacent_ffts:
# This is a peak, store the peak
fft_number = self.spread_fft_output.num_written - 46
peak_magnitude = (
np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3 + 6144
)
peak_magnitude_before = (
np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3 + 6144
)
peak_magnitude_after = (
np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3 + 6144
)
peak_variation_1 = (
peak_magnitude * 2 - peak_magnitude_before - peak_magnitude_after
)
peak_variation_2 = (
(peak_magnitude_after - peak_magnitude_before) * 32 / peak_variation_1
)
corrected_peak_frequency_bin = bin_position * 64 + peak_variation_2
assert peak_variation_1 > 0
frequency_hz = corrected_peak_frequency_bin * (16000 / 2 / 1024 / 64)
if 250 < frequency_hz < 520:
band = FrequencyBand.hz_250_520
elif 520 < frequency_hz < 1450:
band = FrequencyBand.hz_520_1450
elif 1450 < frequency_hz < 3500:
band = FrequencyBand.hz_1450_3500
elif 5500 < frequency_hz <= 5500:
band = FrequencyBand.hz_3500_5500
else:
continue
if band not in self.next_signature.frequency_band_to_sound_peaks:
self.next_signature.frequency_band_to_sound_peaks[band] = []
self.next_signature.frequency_band_to_sound_peaks[band].append(
FrequencyPeak(
fft_number,
int(peak_magnitude),
int(corrected_peak_frequency_bin),
16000,
)
)

View File

@@ -0,0 +1,378 @@
import pathlib
import uuid
import time
from typing import Optional
from pydub import AudioSegment
from typing import Dict, Any, Union
from .misc import Request
from .misc import ShazamUrl
from .schemas.artists import ArtistQuery
from .signature import DecodedMessage
from .enums import GenreMusic
from .converter import Converter, Geo
from .typehints import CountryCode
from .utils import ArtistQueryGenerator
from .utils import get_song
class Shazam(Converter, Geo, Request):
"""Is asynchronous framework for reverse engineered Shazam API written in Python 3.7 with
asyncio and aiohttp."""
def __init__(self, language: str = "en-US", endpoint_country: str = "GB"):
super().__init__(language=language)
self.language = language
self.endpoint_country = endpoint_country
async def top_world_tracks(self, limit: int = 200, offset: int = 0) -> Dict[str, Any]:
"""
Search top world tracks
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs.
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict tracks
"""
return await self.request(
"GET",
ShazamUrl.TOP_TRACKS_WORLD.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
),
headers=self.headers(),
)
async def artist_about(
self, artist_id: int, query: Optional[ArtistQuery] = None
) -> Dict[str, Any]:
"""
Retrieving information from an artist profile
:param artist_id: Artist number. Example (203347991)
:param query: Foo
https://www.shazam.com/artist/203347991/
:return: dict about artist
"""
if query:
pg = ArtistQueryGenerator(source=query)
params_dict = pg.params()
else:
params_dict = {}
return await self.request(
"GET",
ShazamUrl.SEARCH_ARTIST_V2.format(
endpoint_country=self.endpoint_country,
artist_id=artist_id,
),
params=params_dict,
headers=self.headers(),
)
async def track_about(self, track_id: int) -> Dict[str, Any]:
"""
Get track information
:param track_id: Track number. Example: (549952578)
https://www.shazam.com/track/549952578/
:return: dict about track
"""
return await self.request(
"GET",
ShazamUrl.ABOUT_TRACK.format(
language=self.language,
endpoint_country=self.endpoint_country,
track_id=track_id,
),
headers=self.headers(),
)
async def top_country_tracks(
self,
country_code: Union[CountryCode, str],
limit: int = 200,
offset: int = 0,
) -> Dict[str, Any]:
"""
Get the best tracks by country code
https://www.shazam.com/charts/discovery/netherlands
:param country_code: ISO 3166-3 alpha-2 code. Example: RU,NL,UA
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs.
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict songs
"""
return await self.request(
"GET",
ShazamUrl.TOP_TRACKS_COUNTRY.format(
language=self.language,
endpoint_country=self.endpoint_country,
country_code=country_code,
limit=limit,
offset=offset,
),
headers=self.headers(),
)
async def top_city_tracks(
self,
country_code: Union[CountryCode, str],
city_name: str,
limit: int = 200,
offset: int = 0,
) -> Dict[str, Any]:
"""
Retrieving information from an artist profile
https://www.shazam.com/charts/top-50/russia/moscow
:param country_code: ISO 3166-3 alpha-2 code. Example: RU,NL,UA
:param city_name: City name from https://github.com/dotX12/dotX12/blob/main/city.json
Example: Budapest, Moscow
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs.
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict songs
"""
city_id = await self.city_id_from(country=country_code, city=city_name)
return await self.request(
"GET",
ShazamUrl.TOP_TRACKS_CITY.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
city_id=city_id,
),
headers=self.headers(),
)
async def top_world_genre_tracks(
self,
genre: Union[GenreMusic, int],
limit: int = 100,
offset: int = 0,
) -> Dict[str, Any]:
"""
Get world tracks by certain genre
https://www.shazam.com/charts/genre/world/rock
:param genre: Genre name or ID:
POP = 1, HIP_HOP_RAP = 2, DANCE = 3, ELECTRONIC = 4, RNB_SOUL = 5, ALTERNATIVE =
6, ROCK = 7
LATIN = 8, FILM_TV_STAGE = 9, COUNTRY = 10, AFRO_BEATS = 11, WORLDWIDE = 12,
REGGAE_DANCE_HALL = 13
HOUSE = 14, K_POP = 15, FRENCH_POP = 16, SINGER_SONGWRITER = 17,
REGIONAL_MEXICANO = 18
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs.
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter
to your own.
:return: dict songs
"""
return await self.request(
"GET",
ShazamUrl.GENRE_WORLD.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
genre=genre,
),
headers=self.headers(),
)
async def top_country_genre_tracks(
self,
country_code: str,
genre: Union[GenreMusic, int],
limit: int = 200,
offset: int = 0,
) -> Dict[str, Any]:
"""
The best tracks by a genre in the country
https://www.shazam.com/charts/genre/spain/hip-hop-rap
:param country_code: ISO 3166-3 alpha-2 code. Example: RU,NL,UA
:param genre: Genre name or ID:
POP = 1, HIP_HOP_RAP = 2, DANCE = 3, ELECTRONIC = 4, RNB_SOUL = 5, ALTERNATIVE =
6, ROCK = 7
LATIN = 8, FILM_TV_STAGE = 9, COUNTRY = 10, AFRO_BEATS = 11, WORLDWIDE = 12,
REGGAE_DANCE_HALL = 13
HOUSE = 14, K_POP = 15, FRENCH_POP = 16, SINGER_SONGWRITER = 17,
REGIONAL_MEXICANO = 18
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict songs
"""
return await self.request(
"GET",
ShazamUrl.GENRE_COUNTRY.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
country=country_code,
genre=genre,
),
headers=self.headers(),
)
async def related_tracks(
self,
track_id: int,
limit: int = 20,
offset: int = 0,
) -> Dict[str, Any]:
"""
Similar songs based song id
https://www.shazam.com/track/546891609/2-phu%CC%81t-ho%CC%9Bn-kaiz-remix
:param track_id: Track number. Example: (549952578)
https://www.shazam.com/track/549952578/
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict tracks
"""
return await self.request(
"GET",
ShazamUrl.RELATED_SONGS.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
track_id=track_id,
),
headers=self.headers(),
)
async def search_artist(
self,
query: str,
limit: int = 10,
offset: int = 0,
) -> Dict[str, Any]:
"""
Search all artists by prefix or fullname
:param query: Artist name or search prefix
:param limit: Determines how many artists the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 artists.
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict artists
"""
return await self.request(
"GET",
ShazamUrl.SEARCH_ARTIST.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
query=query,
),
headers=self.headers(),
)
async def search_track(self, query: str, limit: int = 10, offset: int = 0) -> Dict[str, Any]:
"""
Search all tracks by prefix
:param query: Track full title or prefix title
:param limit: Determines how many songs the maximum can be in the request.
Example: If 5 is specified, the query will return no more than 5 songs.
:param offset: A parameter that determines with which song to display the request.
The default is 0. If you want to skip the first few songs, set this parameter to
your own.
:return: dict songs
"""
return await self.request(
"GET",
ShazamUrl.SEARCH_MUSIC.format(
language=self.language,
endpoint_country=self.endpoint_country,
limit=limit,
offset=offset,
query=query,
),
headers=self.headers(),
)
async def listening_counter(self, track_id: int) -> Dict[str, Any]:
"""
Returns the total track listener counter.
:param track_id: Track number. Example: (559284007)
https://www.shazam.com/track/559284007/rampampam
:return: The data dictionary that contains the listen counter.
"""
return await self.request(
"GET",
ShazamUrl.LISTENING_COUNTER.format(
track_id,
language=self.language,
),
headers=self.headers(),
)
async def get_youtube_data(self, link: str) -> Dict[str, Any]:
return await self.request("GET", link, headers=self.headers())
async def recognize_song(
self, data: Union[str, pathlib.Path, bytes, bytearray, AudioSegment]
) -> Dict[str, Any]:
"""
Creating a song signature based on a file and searching for this signature in the shazam
database.
:param data: Path to song file or bytes
:return: Dictionary with information about the found song
"""
song = await get_song(data=data)
audio = self.normalize_audio_data(song)
signature_generator = self.create_signature_generator(audio)
signature = signature_generator.get_next_signature()
if len(signature_generator.input_pending_processing) < 128:
return {"matches": []}
while not signature:
signature = signature_generator.get_next_signature()
results = await self.send_recognize_request(signature)
return results
async def send_recognize_request(self, sig: DecodedMessage) -> Dict[str, Any]:
data = Converter.data_search(
Request.TIME_ZONE,
sig.encode_to_uri(),
int(sig.number_samples / sig.sample_rate_hz * 1000),
int(time.time() * 1000),
)
return await self.request(
"POST",
ShazamUrl.SEARCH_FROM_FILE.format(
language=self.language,
endpoint_country=self.endpoint_country,
uuid_1=str(uuid.uuid4()).upper(),
uuid_2=str(uuid.uuid4()).upper(),
),
headers=self.headers(),
json=data,
)

View File

@@ -0,0 +1,18 @@
import aiohttp
from shazamio.exceptions import BadMethod
from shazamio.utils import validate_json
class HTTPClient:
@staticmethod
async def request(method: str, url: str, *args, **kwargs) -> dict:
async with aiohttp.ClientSession() as session:
if method.upper() == "GET":
async with session.get(url, **kwargs) as resp:
return await validate_json(resp, *args)
elif method.upper() == "POST":
async with session.post(url, **kwargs) as resp:
return await validate_json(resp, *args)
else:
raise BadMethod("Accept only GET/POST")

View File

@@ -0,0 +1,64 @@
from pydub import AudioSegment
from shazamio.algorithm import SignatureGenerator
from shazamio.client import HTTPClient
from shazamio.exceptions import BadCityName, BadCountryName
from shazamio.misc import ShazamUrl
from shazamio.schemas.models import *
from shazamio.typehints import CountryCode
class Geo(HTTPClient):
async def city_id_from(self, country: Union[CountryCode, str], city: str) -> int:
"""
Return City ID from country name and city name.
:param country: - Country name
:param city: - City name
:return: City ID
"""
data = await self.request("GET", ShazamUrl.CITY_IDS, "text/plain")
for response_country in data["countries"]:
if country == response_country["id"]:
for response_city in response_country["cities"]:
if city == response_city["name"]:
return response_city["id"]
raise BadCityName("City not found, check city name")
async def all_cities_from_country(self, country: Union[CountryCode, str]) -> list:
cities = []
data = await self.request("GET", ShazamUrl.CITY_IDS, "text/plain")
for response_country in data["countries"]:
if country == response_country["id"]:
for city in response_country["cities"]:
cities.append(city["name"])
return cities
raise BadCountryName("Country not found, check country name")
class Converter:
@staticmethod
def data_search(timezone: str, uri: str, samplems: int, timestamp: int) -> dict:
return {
"timezone": timezone,
"signature": {"uri": uri, "samplems": samplems},
"timestamp": timestamp,
"context": {},
"geolocation": {},
}
@staticmethod
def normalize_audio_data(audio: AudioSegment) -> AudioSegment:
audio = audio.set_sample_width(2)
audio = audio.set_frame_rate(16000)
audio = audio.set_channels(1)
return audio
@staticmethod
def create_signature_generator(audio: AudioSegment) -> SignatureGenerator:
signature_generator = SignatureGenerator()
signature_generator.feed_input(audio.get_array_of_samples())
signature_generator.MAX_TIME_SECONDS = 12
if audio.duration_seconds > 12 * 3:
signature_generator.samples_processed += 16000 * (int(audio.duration_seconds / 2) - 6)
return signature_generator

View File

@@ -0,0 +1,41 @@
from enum import IntEnum
class GenreMusic(IntEnum):
POP = 1
HIP_HOP_RAP = 2
DANCE = 3
ELECTRONIC = 4
RNB_SOUL = 5
ALTERNATIVE = 6
ROCK = 7
LATIN = 8
FILM_TV_STAGE = 9
COUNTRY = 10
AFRO_BEATS = 11
WORLDWIDE = 12
REGGAE_DANCE_HALL = 13
HOUSE = 14
K_POP = 15
FRENCH_POP = 16
SINGER_SONGWRITER = 17
REGIONAL_MEXICANO = 18
class SampleRate(IntEnum):
# Enum keys are sample rates in Hz
_8000 = 1
_11025 = 2
_16000 = 3
_32000 = 4
_44100 = 5
_48000 = 6
class FrequencyBand(IntEnum):
# Enum keys are frequency ranges in Hz
hz_0_250 = -1 # Nothing above 250 Hz is actually stored
hz_250_520 = 0
hz_520_1450 = 1
hz_1450_3500 = 2
hz_3500_5500 = 3 # This one (3.5 KHz - 5.5 KHz) should not be used in legacy mode

View File

@@ -0,0 +1,14 @@
class FailedDecodeJson(Exception):
pass
class BadCityName(Exception):
pass
class BadCountryName(Exception):
pass
class BadMethod(Exception):
pass

View File

@@ -0,0 +1,133 @@
from dataclass_factory import Schema
class FactorySchemas:
FACTORY_TRACK_SCHEMA = Schema(
name_mapping={
"photo_url": ("images", "coverarthq"),
"ringtone": ("hub", "actions", 1, "uri"),
"artist_id": ("artists", 0, "id"),
"apple_music_url": ("hub", "options", 0, "actions", 0, "uri"),
"spotify_url": ("hub", "providers", 0, "actions", 0, "uri"),
"spotify_uri": ("hub", "providers", 0, "actions", 1, "uri"),
"sections": "sections",
},
skip_internal=True,
)
FACTORY_ARTIST_SCHEMA = Schema(
name_mapping={
"avatar": "avatar",
"genres": ("genres", "secondaries"),
"genres_primary": ("genres", "primary"),
"adam_id": "adamid",
"url": "weburl",
}
)
FACTORY_SONG_SECTION_SCHEMA = Schema(
name_mapping={
"type": "type",
"meta_pages": "metapages",
"tab_name": "tabname",
"metadata": "metadata",
},
skip_internal=True,
)
FACTORY_VIDEO_SECTION_SCHEMA = Schema(
name_mapping={
"type": "type",
"youtube_url": "youtubeurl",
"tab_name": "tabname",
},
skip_internal=True,
)
FACTORY_RELATED_SECTION_SCHEMA = Schema(
name_mapping={
"type": "type",
"url": "url",
"tab_name": "tabname",
},
skip_internal=True,
)
FACTORY_YOUTUBE_TRACK_SCHEMA = Schema(
name_mapping={
"caption": "caption",
"image": "image",
"actions": "actions",
},
skip_internal=True,
)
FACTORY_RESPONSE_TRACK_SCHEMA = Schema(
name_mapping={
"matches": "matches",
"location": "location",
"retry_ms": "retryms",
"timestamp": "timestamp",
"timezone": "timezone",
"track": "track",
"tag_id": "tagid",
},
skip_internal=True,
)
FACTORY_LYRICS_SECTION = Schema(
name_mapping={
"type": "type",
"text": "text",
"footer": "footer",
"tab_name": "tabname",
"beacon_data": "beacondata",
},
)
FACTORY_BEACON_DATA_LYRICS_SECTION = Schema(
name_mapping={
"lyrics_id": "lyricsid",
"provider_name": "providername",
"common_track_id": "commontrackid",
}
)
FACTORY_ARTIST_SECTION = Schema(
name_mapping={
"type": "type",
"id": "id",
"name": "name",
"verified": "verified",
"actions": "actions",
"tab_name": "tabname",
"top_tracks": "toptracks",
}
)
FACTORY_MATCH = Schema(
name_mapping={
"id": "id",
"offset": "offset",
"channel": "channel",
"time_skew": "timeskew",
"frequency_skew": "frequencyskew",
}
)
FACTORY_ATTRIBUTES_ARTIST = Schema(
name_mapping={
"name": "name",
"url": "url",
"artist_bio": "artistBio",
"genre_names": "genreNames",
}
)
FACTORY_ARTIST_V2 = Schema(
name_mapping={
"id": "id",
"type": "type",
"attributes": "attributes",
}
)

View File

@@ -0,0 +1,44 @@
from dataclass_factory import Factory
from shazamio.factory import FactorySchemas
from shazamio.schemas.artists import ArtistInfo
from shazamio.schemas.artists import ArtistV3
from shazamio.schemas.attributes import ArtistAttribute
from shazamio.schemas.models import (
SongSection,
VideoSection,
RelatedSection,
LyricsSection,
BeaconDataLyricsSection,
ArtistSection,
MatchModel,
)
from shazamio.schemas.models import TrackInfo
from shazamio.schemas.models import YoutubeData
from shazamio.schemas.models import ResponseTrack
FACTORY_TRACK = Factory(
schemas={
TrackInfo: FactorySchemas.FACTORY_TRACK_SCHEMA,
SongSection: FactorySchemas.FACTORY_SONG_SECTION_SCHEMA,
VideoSection: FactorySchemas.FACTORY_VIDEO_SECTION_SCHEMA,
LyricsSection: FactorySchemas.FACTORY_LYRICS_SECTION,
BeaconDataLyricsSection: FactorySchemas.FACTORY_BEACON_DATA_LYRICS_SECTION,
ArtistSection: FactorySchemas.FACTORY_ARTIST_SECTION,
MatchModel: FactorySchemas.FACTORY_MATCH,
RelatedSection: FactorySchemas.FACTORY_RELATED_SECTION_SCHEMA,
YoutubeData: FactorySchemas.FACTORY_YOUTUBE_TRACK_SCHEMA,
ResponseTrack: FactorySchemas.FACTORY_RESPONSE_TRACK_SCHEMA,
},
debug_path=True,
)
FACTORY_ARTIST = Factory(
schemas={
ArtistAttribute: FactorySchemas.FACTORY_ATTRIBUTES_ARTIST,
ArtistV3: FactorySchemas.FACTORY_ARTIST_V2,
ArtistInfo: FactorySchemas.FACTORY_ARTIST_SCHEMA,
},
debug_path=True,
)

View File

@@ -0,0 +1,69 @@
from random import choice
from shazamio.user_agent import USER_AGENTS
class ShazamUrl:
SEARCH_FROM_FILE = (
"https://amp.shazam.com/discovery/v5/{language}/{endpoint_country}/iphone/-/tag"
"/{uuid_1}/{uuid_2}?sync=true&webv3=true&sampling=true"
"&connected=&shazamapiversion=v3&sharehub=true&hubv5minorversion=v5.1&hidelb=true&video=v3"
)
TOP_TRACKS_WORLD = (
"https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks"
"/ip-global-chart?pageSize={limit}&startFrom={offset}"
)
ABOUT_TRACK = (
"https://www.shazam.com/discovery/v5/{language}/{endpoint_country}/web/-/track"
"/{track_id}?shazamapiversion=v3&video=v3 "
)
TOP_TRACKS_COUNTRY = (
"https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks"
"/ip-country-chart-{country_code}?pageSize={limit}&startFrom={offset}"
)
TOP_TRACKS_CITY = (
"https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks"
"/ip-city-chart-{city_id}?pageSize={limit}&startFrom={offset}"
)
CITY_IDS = "https://raw.githubusercontent.com/dotX12/dotX12/main/city.json"
GENRE_WORLD = (
"https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks"
"/genre-global-chart-{genre}?pageSize={limit}&startFrom={offset}"
)
GENRE_COUNTRY = (
"https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks"
"/genre-country-chart-{country}-{genre}?pageSize={limit}&startFrom={offset}"
)
RELATED_SONGS = (
"https://cdn.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks"
"/track-similarities-id-{track_id}?startFrom={offset}&pageSize={limit}&connected=&channel="
)
SEARCH_ARTIST = (
"https://www.shazam.com/services/search/v4/{language}/{endpoint_country}/web"
"/search?term={query}&limit={limit}&offset={offset}&types=artists"
)
SEARCH_MUSIC = (
"https://www.shazam.com/services/search/v3/{language}/{endpoint_country}/web"
"/search?query={query}&numResults={limit}&offset={offset}&types=songs"
)
LISTENING_COUNTER = "https://www.shazam.com/services/count/v2/web/track/{}"
SEARCH_ARTIST_V2 = (
"https://www.shazam.com/services/amapi/v1/catalog/{endpoint_country}/artists/{artist_id}"
)
class Request:
TIME_ZONE = "Europe/Moscow"
def __init__(self, language: str):
self.language = language
def headers(self):
return {
"X-Shazam-Platform": "IPHONE",
"X-Shazam-AppVersion": "14.1.0",
"Accept": "*/*",
"Accept-Language": self.language,
"Accept-Encoding": "gzip, deflate",
"User-Agent": choice(USER_AGENTS),
}

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel
class PlayParams(BaseModel):
id: str
kind: str
class EditorialArtwork(BaseModel):
subscription_hero: Optional[ImageModel] = Field(None, alias="subscriptionHero")
store_flow_case: Optional[ImageModel] = Field(None, alias="storeFlowcase")
class EditorialNotes(BaseModel):
standard: Optional[str] = None
short: Optional[str] = None
class AttributesFullAlbums(BaseModel):
copyright: str
genre_names: List[str] = Field(..., alias="genreNames")
release_date: str = Field(..., alias="releaseDate")
is_mastered_for_itunes: bool = Field(..., alias="isMasteredForItunes")
upc: str
artwork: ImageModel
play_params: PlayParams = Field(..., alias="playParams")
url: str
record_label: str = Field(..., alias="recordLabel")
track_count: int = Field(..., alias="trackCount")
is_compilation: bool = Field(..., alias="isCompilation")
is_prerelease: bool = Field(..., alias="isPrerelease")
audio_traits: List[str] = Field(..., alias="audioTraits")
editorial_artwork: EditorialArtwork = Field(..., alias="editorialArtwork")
is_single: bool = Field(..., alias="isSingle")
name: str
artist_name: str = Field(..., alias="artistName")
content_rating: Optional[str] = Field(None, alias="contentRating")
is_complete: bool = Field(..., alias="isComplete")
editorial_notes: Optional[EditorialNotes] = Field(None, alias="editorialNotes")
class FullAlbumsModel(BaseModel):
href: Optional[str] = None
attributes: Optional[AttributeName] = None
data: Optional[List[BaseDataModel[AttributesFullAlbums]]] = None

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel
class PlayParams(BaseModel):
id: str
kind: str
class AttributeLastRelease(BaseModel):
copyright: str
genre_names: List[str] = Field(..., alias="genreNames")
release_date: str = Field(..., alias="releaseDate")
is_mastered_for_itunes: bool = Field(..., alias="isMasteredForItunes")
upc: str
artwork: ImageModel
play_params: PlayParams = Field(..., alias="playParams")
url: str
record_label: str = Field(..., alias="recordLabel")
track_count: int = Field(..., alias="trackCount")
is_compilation: bool = Field(..., alias="isCompilation")
is_prerelease: bool = Field(..., alias="isPrerelease")
audio_traits: List[str] = Field(..., alias="audioTraits")
editorial_artwork: Dict[str, Any] = Field(..., alias="editorialArtwork")
is_single: bool = Field(..., alias="isSingle")
name: str
artist_name: str = Field(..., alias="artistName")
content_rating: Optional[str] = Field(None, alias="contentRating")
is_complete: bool = Field(..., alias="isComplete")
class LastReleaseModel(BaseModel):
href: Optional[str] = None
attributes: Optional[AttributeName] = None
data: Optional[List[BaseDataModel[AttributeLastRelease]]] = None

View File

@@ -0,0 +1,47 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseHrefNextData
from shazamio.schemas.base import BaseIdTypeHref
from shazamio.schemas.photos import ImageModel
class EditorialArtwork(BaseModel):
centered_fullscreen_background: Optional[ImageModel] = Field(
None, alias="centeredFullscreenBackground"
)
subscription_hero: Optional[ImageModel] = Field(None, alias="subscriptionHero")
banner_uber: Optional[ImageModel] = Field(None, alias="bannerUber")
class Attributes(BaseModel):
genre_names: List[str] = Field(..., alias="genreNames")
editorial_artwork: EditorialArtwork = Field(..., alias="editorialArtwork")
name: str
artwork: ImageModel
url: str
origin: Optional[str] = None
artist_bio: Optional[str] = Field(None, alias="artistBio")
class Relationships(BaseModel):
albums: BaseHrefNextData[List[BaseIdTypeHref]]
class Datum(BaseModel):
id: str
type: str
href: str
attributes: Attributes
relationships: Relationships
class SimularArtist(BaseModel):
href: Optional[str] = None
next: Optional[str] = None
attributes: Optional[AttributeName] = None
data: Optional[List[Datum]] = None

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel
class PlayParams(BaseModel):
id: str
kind: str
class Preview(BaseModel):
url: str
hls_url: str = Field(..., alias="hlsUrl")
artwork: ImageModel
class Attributes(BaseModel):
genre_names: List[str] = Field(..., alias="genreNames")
release_date: str = Field(..., alias="releaseDate")
duration_in_millis: int = Field(..., alias="durationInMillis")
isrc: str
artwork: ImageModel
play_params: PlayParams = Field(..., alias="playParams")
url: str
has4_k: bool = Field(..., alias="has4K")
editorial_artwork: Dict[str, Any] = Field(..., alias="editorialArtwork")
has_hdr: bool = Field(..., alias="hasHDR")
name: str
previews: List[Preview]
artist_name: str = Field(..., alias="artistName")
content_rating: Optional[str] = Field(None, alias="contentRating")
album_name: Optional[str] = Field(None, alias="albumName")
track_number: Optional[int] = Field(None, alias="trackNumber")
class TopMusicVideosView(BaseModel):
href: Optional[str] = None
attributes: Optional[AttributeName] = None
data: Optional[List[BaseDataModel[Attributes]]] = None

View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
from shazamio.schemas.artist.views.top_music import PlayParams
from shazamio.schemas.attributes import AttributeName
from shazamio.schemas.base import BaseDataModel
from shazamio.schemas.photos import ImageModel
from shazamio.schemas.urls import UrlDTO
class AttributesTopSong(BaseModel):
has_time_synced_lyrics: bool = Field(..., alias="hasTimeSyncedLyrics")
album_name: Optional[str] = Field(None, alias="albumName")
genre_names: List = Field(..., alias="genreNames")
track_number: int = Field(..., alias="trackNumber")
release_date: str = Field(..., alias="releaseDate")
duration_in_millis: int = Field(..., alias="durationInMillis")
is_vocal_attenuation_allowed: bool = Field(..., alias="isVocalAttenuationAllowed")
is_mastered_for_itunes: bool = Field(..., alias="isMasteredForItunes")
isrc: str
artwork: ImageModel
composer_name: str = Field(..., alias="composerName")
audio_locale: str = Field(..., alias="audioLocale")
url: str
play_params: PlayParams = Field(..., alias="playParams")
disc_number: int = Field(..., alias="discNumber")
has_lyrics: bool = Field(..., alias="hasLyrics")
is_apple_digital_master: bool = Field(..., alias="isAppleDigitalMaster")
audio_traits: List[str] = Field(..., alias="audioTraits")
name: str
previews: List[UrlDTO] = Field([])
artist_name: str = Field(..., alias="artistName")
content_rating: Optional[str] = Field(None, alias="contentRating")
class TopSong(BaseModel):
id: Optional[str] = None
type: Optional[str] = None
href: Optional[str] = None
attributes: Optional[AttributeName] = None
data: Optional[List[BaseDataModel[AttributesTopSong]]] = None

View File

@@ -0,0 +1,100 @@
from dataclasses import dataclass
from dataclasses import field
from typing import List
from typing import Optional
from typing import Union
from pydantic import BaseModel
from pydantic import Field
from shazamio.schemas.artist.views.full_albums import FullAlbumsModel
from shazamio.schemas.artist.views.last_release import LastReleaseModel
from shazamio.schemas.artist.views.simular_artists import SimularArtist
from shazamio.schemas.artist.views.top_music import TopMusicVideosView
from shazamio.schemas.artist.views.top_song import TopSong
from shazamio.schemas.attributes import ArtistAttribute
from shazamio.schemas.enums import ArtistExtend
from shazamio.schemas.enums import ArtistView
from shazamio.schemas.errors import ErrorModel
@dataclass
class ArtistInfo:
name: str
verified: Optional[bool]
genres: Optional[List[str]] = field(default_factory=list)
alias: Optional[str] = None
genres_primary: Optional[str] = None
avatar: Optional[Union[dict, str]] = None
adam_id: Optional[int] = None
url: Optional[str] = ""
def __post_init__(self):
self.avatar = self.__optional_avatar()
def __optional_avatar(self) -> Optional[str]:
if self.avatar is None:
return None
elif "default" in self.avatar:
return self.avatar.get("default")
else:
return "".join(self.avatar)
@dataclass
class ArtistV2:
artist: ArtistInfo
@dataclass
class ArtistQuery:
views: List[ArtistView] = field(default_factory=list)
extend: List[ArtistExtend] = field(default_factory=list)
@dataclass
class ArtistAvatar:
width: int
height: int
url: str
@classmethod
def url_with_size(cls, height: int, width: int) -> str:
return cls.url.format(w=width, h=height)
class AlbumRelationshipElement(BaseModel):
id: str
type: str
href: str
class AlbumRelationship(BaseModel):
href: str
next: Optional[str] = None
data: List[AlbumRelationshipElement]
class ArtistRelationships(BaseModel):
albums: AlbumRelationship
class ArtistViews(BaseModel):
top_music_videos: Optional[TopMusicVideosView] = Field(None, alias="top-music-videos")
simular_artists: Optional[SimularArtist] = Field(None, alias="similar-artists")
latest_release: Optional[LastReleaseModel] = Field(None, alias="latest-release")
full_albums: Optional[FullAlbumsModel] = Field(None, alias="full-albums")
top_songs: Optional[TopSong] = Field(None, alias="top-songs")
class ArtistV3(BaseModel):
id: str
type: str
attributes: ArtistAttribute
relationships: ArtistRelationships
views: ArtistViews
class ArtistResponse(BaseModel):
errors: List[ErrorModel] = []
data: List[ArtistV3] = []

View File

@@ -0,0 +1,16 @@
from typing import List
from typing import Optional
from pydantic import BaseModel
from pydantic import Field
class AttributeName(BaseModel):
title: str
class ArtistAttribute(BaseModel):
genre_names: List[str] = Field([], alias="genreNames")
name: str
url: str
artist_bio: Optional[str] = Field(None, alias="artistBio")

View File

@@ -0,0 +1,25 @@
from typing import Generic
from typing import Optional
from typing import TypeVar
from pydantic import BaseModel
from pydantic.generics import GenericModel
T = TypeVar("T", bound=BaseModel)
class BaseIdTypeHref(BaseModel):
id: str
type: str
href: str
class BaseDataModel(GenericModel, BaseModel, Generic[T]):
attributes: T
class BaseHrefNextData(GenericModel, Generic[T]):
href: str
next: Optional[str] = None
data: T

View File

@@ -0,0 +1,18 @@
from enum import Enum
class ArtistExtend(str, Enum):
ARTIST_BIO = "artistBio"
BORN_OF_FORMED = "bornOrFormed"
EDITORIAL_ARTWORK = "editorialArtwork"
ORIGIN = "origin"
class ArtistView(str, Enum):
FULL_ALBUMS = "full-albums"
FEATURED_ALBUMS = "featured-albums"
LATEST_RELEASE = "latest-release"
TOP_MUSIC_VIDEOS = "top-music-videos"
SIMULAR_ARTISTS = "similar-artists"
TOP_SONGS = "top-songs"
PLAYLISTS = "playlists"

View File

@@ -0,0 +1,10 @@
from dataclasses import dataclass
@dataclass
class ErrorModel:
id: str
title: str
detail: str
status: str
code: str

View File

@@ -0,0 +1,201 @@
from dataclasses import dataclass
from dataclasses import field
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import urlencode
from urllib.parse import urlparse
from urllib.parse import urlunparse
from uuid import UUID
@dataclass
class ShareModel:
subject: str
text: str
href: str
image: str
twitter: str
html: str
snapchat: str
@dataclass
class ActionModel:
name: str
type: str
share: ShareModel
uri: str
@dataclass
class SongMetaPages:
image: str
caption: str
@dataclass
class SongMetadata:
title: str
text: str
@dataclass
class SongSection:
type: str
meta_pages: List[SongMetaPages]
tab_name: str
metadata: List[SongMetadata]
@dataclass
class BaseIdTypeModel:
type: str
id: str
@dataclass
class TopTracksModel:
url: str
@dataclass
class ArtistSection:
type: str
id: str
name: str
verified: bool
actions: List[BaseIdTypeModel]
tab_name: str
top_tracks: TopTracksModel
class BeaconDataLyricsSection:
lyrics_id: str
provider_name: str
common_track_id: str
@dataclass
class LyricsSection:
type: str
text: List[str]
footer: str
tab_name: str
beacon_data: Optional[BeaconDataLyricsSection]
@dataclass
class VideoSection:
tab_name: str
youtube_url: str
type: str = "VIDEO"
@dataclass
class RelatedSection:
type: str
url: str
tab_name: str
@dataclass
class DimensionsModel:
width: int
height: int
@dataclass
class YoutubeImageModel:
dimensions: DimensionsModel
url: str
@dataclass
class MatchModel:
id: str
offset: float
time_skew: float
frequency_skew: float
channel: Optional[str] = field(default=None)
@dataclass
class LocationModel:
accuracy: float
@dataclass
class YoutubeData:
caption: str
image: YoutubeImageModel
actions: List[ActionModel]
uri: Optional[str] = None
def __post_init__(self):
self.uri = self.__get_youtube_uri()
def __get_youtube_uri(self):
if self.actions:
for action in self.actions:
if action.uri:
return action.uri
@dataclass
class TrackInfo:
key: int
title: str
subtitle: str
artist_id: Optional[str] = field(default=None)
shazam_url: str = None
photo_url: Optional[str] = field(init=False, default=None)
spotify_uri_query: Optional[str] = None
apple_music_url: Optional[str] = None
ringtone: Optional[str] = None
spotify_url: Optional[str] = field(default=None)
spotify_uri: Optional[str] = field(default=None)
youtube_link: Optional[str] = None
sections: Optional[
List[
Union[
SongSection,
VideoSection,
RelatedSection,
ArtistSection,
LyricsSection,
]
]
] = field(default_factory=list)
def __post_init__(self):
self.shazam_url = f"https://www.shazam.com/track/{self.artist_id}"
self.apple_music_url = self.__apple_music_url()
self.spotify_uri_query = self.__short_uri()
self.youtube_link = self.__youtube_link()
def __apple_music_url(self):
url_parse_list = list(urlparse(self.apple_music_url))
url_parse_list[4] = urlencode({}, doseq=True)
url_deleted_query = urlunparse(url_parse_list)
return url_deleted_query
def __short_uri(self):
if self.spotify_uri:
return self.spotify_uri.split("spotify:search:")[1]
def __youtube_link(self):
for i in self.sections:
if type(i) is VideoSection:
return i.youtube_url
@dataclass
class ResponseTrack:
tag_id: Optional[UUID]
retry_ms: Optional[int] = field(default=None)
location: Optional[LocationModel] = field(default=None)
matches: List[MatchModel] = field(default_factory=list)
timestamp: Optional[int] = field(default=None)
timezone: Optional[str] = field(default=None)
track: Optional[TrackInfo] = field(default=None)

View File

@@ -0,0 +1,16 @@
from typing import Optional
from pydantic import BaseModel
from pydantic import Field
class ImageModel(BaseModel):
width: int
url: str
height: int
text_color3: Optional[str] = Field(None, alias="textColor3")
text_color2: Optional[str] = Field(None, alias="textColor2")
text_color4: Optional[str] = Field(None, alias="textColor4")
text_color1: Optional[str] = Field(None, alias="textColor1")
bg_color: Optional[str] = Field(None, alias="bgColor")
has_p3: bool = Field(..., alias="hasP3")

View File

@@ -0,0 +1,5 @@
from pydantic import BaseModel
class UrlDTO(BaseModel):
url: str

View File

@@ -0,0 +1,32 @@
from typing import Union
from shazamio.factory_misc import FACTORY_ARTIST
from shazamio.factory_misc import FACTORY_TRACK
from shazamio.schemas.artists import ArtistInfo
from shazamio.schemas.artists import ArtistResponse
from shazamio.schemas.artists import ArtistV2
from shazamio.schemas.models import ResponseTrack
from shazamio.schemas.models import TrackInfo
from shazamio.schemas.models import YoutubeData
class Serialize:
@classmethod
def track(cls, data):
return FACTORY_TRACK.load(data, TrackInfo)
@classmethod
def youtube(cls, data):
return FACTORY_TRACK.load(data, YoutubeData)
@classmethod
def artist_v2(cls, data) -> ArtistResponse:
return ArtistResponse.parse_obj(data)
@classmethod
def artist(cls, data):
return FACTORY_ARTIST.load(data, Union[ArtistV2, ArtistInfo])
@classmethod
def full_track(cls, data):
return FACTORY_TRACK.load(data, ResponseTrack)

View File

@@ -0,0 +1,263 @@
from typing import Dict, List
from base64 import b64decode, b64encode
from math import exp, sqrt
from binascii import crc32
from io import BytesIO
from ctypes import *
from .enums import FrequencyBand, SampleRate
DATA_URI_PREFIX = "data:audio/vnd.shazam.sig;base64,"
class RawSignatureHeader(LittleEndianStructure):
_pack = True
_fields_ = [
("magic1", c_uint32), # Fixed 0xcafe2580 - 80 25 fe ca
(
"crc32",
c_uint32,
), # CRC-32 for all following (so excluding these first 8 bytes)
("size_minus_header", c_uint32),
# Total size of the message, minus the size of the current header (which is 48 bytes)
("magic2", c_uint32), # Fixed 0x94119c00 - 00 9c 11 94
("void1", c_uint32 * 3), # Void
("shifted_sample_rate_id", c_uint32),
# A member of SampleRate (usually 3 for 16000 Hz), left-shifted by 27 (usually giving
# 0x18000000 - 00 00 00 18)
("void2", c_uint32 * 2), # Void, or maybe used only in "rolling window" mode?
("number_samples_plus_divided_sample_rate", c_uint32),
# int(number_of_samples + sample_rate * 0.24) - As the sample rate is known thanks to the
# field above,
# it can be inferred and subtracted so that we obtain the number of samples,
# and from the number of samples and sample rate we can obtain the length of the recording
("fixed_value", c_uint32)
# Calculated as ((15 << 19) + 0x40000) - 0x7c0000 or 00 00 7c 00 - seems pretty constant,
# may be different in the "SigType.STREAMING" mode
]
class FrequencyPeak:
fft_pass_number: int = None
peak_magnitude: int = None
corrected_peak_frequency_bin: int = None
sample_rate_hz: int = None
def __init__(
self,
fft_pass_number: int,
peak_magnitude: int,
corrected_peak_frequency_bin: int,
sample_rate_hz: int,
):
self.fft_pass_number = fft_pass_number
self.peak_magnitude = peak_magnitude
self.corrected_peak_frequency_bin = corrected_peak_frequency_bin
self.sample_rate_hz = sample_rate_hz
def get_frequency_hz(self) -> float:
return self.corrected_peak_frequency_bin * (self.sample_rate_hz / 2 / 1024 / 64)
# ^ Convert back FFT bin to a frequency, given a 16 KHz sample
# rate, 1024 useful bins and the multiplication by 64 made before
# storing the information
def get_amplitude_pcm(self) -> float:
return sqrt(exp((self.peak_magnitude - 6144) / 1477.3) * (1 << 17) / 2) / 1024
# ^ Not sure about this calculation but gives small enough numbers
def get_seconds(self) -> float:
return (self.fft_pass_number * 128) / self.sample_rate_hz
# ^ Assume that new FFT bins are emitted every 128 samples, on a
# standard 16 KHz sample rate basis.
class DecodedMessage:
sample_rate_hz: int = None
number_samples: int = None
frequency_band_to_sound_peaks: Dict[FrequencyBand, List[FrequencyPeak]] = None
@classmethod
def decode_from_binary(cls, data: bytes):
self = cls()
buf = BytesIO(data)
buf.seek(8)
check_summable_data = buf.read()
buf.seek(0)
# Read and check the header
header = RawSignatureHeader()
buf.readinto(header)
assert header.magic1 == 0xCAFE2580
assert header.size_minus_header == len(data) - 48
assert crc32(check_summable_data) & 0xFFFFFFFF == header.crc32
assert header.magic2 == 0x94119C00
self.sample_rate_hz = int(SampleRate(header.shifted_sample_rate_id >> 27).name.strip("_"))
self.number_samples = int(
header.number_samples_plus_divided_sample_rate - self.sample_rate_hz * 0.24
)
# Read the type-length-value sequence that follows the header
# The first chunk is fixed and has no value, but instead just repeats
# the length of the message size minus the header:
assert int.from_bytes(buf.read(4), "little") == 0x40000000
assert int.from_bytes(buf.read(4), "little") == len(data) - 48
# Then, lists of frequency peaks for respective bands follow
self.frequency_band_to_sound_peaks = {}
while True:
tlv_header = buf.read(8)
if not tlv_header:
break
frequency_band_id = int.from_bytes(tlv_header[:4], "little")
frequency_peaks_size = int.from_bytes(tlv_header[4:], "little")
frequency_peaks_padding = -frequency_peaks_size % 4
frequency_peaks_buf = BytesIO(buf.read(frequency_peaks_size))
buf.read(frequency_peaks_padding)
# Decode frequency peaks
frequency_band = FrequencyBand(frequency_band_id - 0x60030040)
fft_pass_number = 0
self.frequency_band_to_sound_peaks[frequency_band] = []
while True:
raw_fft_pass: bytes = frequency_peaks_buf.read(1)
if not raw_fft_pass:
break
fft_pass_offset: int = raw_fft_pass[0]
if fft_pass_offset == 0xFF:
fft_pass_number = int.from_bytes(frequency_peaks_buf.read(4), "little")
continue
else:
fft_pass_number += fft_pass_offset
peak_magnitude = int.from_bytes(frequency_peaks_buf.read(2), "little")
corrected_peak_frequency_bin = int.from_bytes(frequency_peaks_buf.read(2), "little")
self.frequency_band_to_sound_peaks[frequency_band].append(
FrequencyPeak(
fft_pass_number,
peak_magnitude,
corrected_peak_frequency_bin,
self.sample_rate_hz,
)
)
return self
@classmethod
def decode_from_uri(cls, uri: str):
assert uri.startswith(DATA_URI_PREFIX)
return cls.decode_from_binary(b64decode(uri.replace(DATA_URI_PREFIX, "", 1)))
"""
Encode the current object to a readable JSON format, for debugging
purposes.
"""
def encode_to_json(self) -> dict:
return {
"sample_rate_hz": self.sample_rate_hz,
"number_samples": self.number_samples,
"_seconds": self.number_samples / self.sample_rate_hz,
"frequency_band_to_peaks": {
frequency_band.name.strip("_"): [
{
"fft_pass_number": frequency_peak.fft_pass_number,
"peak_magnitude": frequency_peak.peak_magnitude,
"corrected_peak_frequency_bin": frequency_peak.corrected_peak_frequency_bin,
"_frequency_hz": frequency_peak.get_frequency_hz(),
"_amplitude_pcm": frequency_peak.get_amplitude_pcm(),
"_seconds": frequency_peak.get_seconds(),
}
for frequency_peak in frequency_peaks
]
for frequency_band, frequency_peaks in sorted(
self.frequency_band_to_sound_peaks.items()
)
},
}
def encode_to_binary(self) -> bytes:
header = RawSignatureHeader()
header.magic1 = 0xCAFE2580
header.magic2 = 0x94119C00
header.shifted_sample_rate_id = int(getattr(SampleRate, "_%s" % self.sample_rate_hz)) << 27
header.fixed_value = (15 << 19) + 0x40000
header.number_samples_plus_divided_sample_rate = int(
self.number_samples + self.sample_rate_hz * 0.24
)
contents_buf = BytesIO()
for frequency_band, frequency_peaks in sorted(self.frequency_band_to_sound_peaks.items()):
peaks_buf = BytesIO()
fft_pass_number = 0
# NOTE: Correctly filtering and sorting the peaks within the members
# of "self.frequency_band_to_sound_peaks" is the responsibility of the
# caller
for frequency_peak in frequency_peaks:
assert frequency_peak.fft_pass_number >= fft_pass_number
if frequency_peak.fft_pass_number - fft_pass_number >= 255:
peaks_buf.write(b"\xff")
peaks_buf.write(frequency_peak.fft_pass_number.to_bytes(4, "little"))
fft_pass_number = frequency_peak.fft_pass_number
peaks_buf.write(bytes([frequency_peak.fft_pass_number - fft_pass_number]))
peaks_buf.write(frequency_peak.peak_magnitude.to_bytes(2, "little"))
peaks_buf.write(frequency_peak.corrected_peak_frequency_bin.to_bytes(2, "little"))
fft_pass_number = frequency_peak.fft_pass_number
contents_buf.write((0x60030040 + int(frequency_band)).to_bytes(4, "little"))
contents_buf.write(len(peaks_buf.getvalue()).to_bytes(4, "little"))
contents_buf.write(peaks_buf.getvalue())
contents_buf.write(b"\x00" * (-len(peaks_buf.getvalue()) % 4))
# Below, write the full message as a binary stream
header.size_minus_header = len(contents_buf.getvalue()) + 8
buf = BytesIO()
buf.write(header) # We will rewrite it just after in order to include the final CRC-32
buf.write((0x40000000).to_bytes(4, "little"))
buf.write((len(contents_buf.getvalue()) + 8).to_bytes(4, "little"))
buf.write(contents_buf.getvalue())
buf.seek(8)
header.crc32 = crc32(buf.read()) & 0xFFFFFFFF
buf.seek(0)
buf.write(header)
return buf.getvalue()
def encode_to_uri(self) -> str:
return DATA_URI_PREFIX + b64encode(self.encode_to_binary()).decode("ascii")

View File

@@ -0,0 +1,10 @@
class CountryCode:
"""ISO 3166-3 alpha-2 code. Example: RU,NL,UA"""
pass
class ShazamResponse:
"""Dictionary with found data on request"""
pass

View File

@@ -0,0 +1,102 @@
USER_AGENTS = [
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; VS980 4G Build/LRX22G)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-T210 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-P905V Build/LMY47X)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; Vodafone Smart Tab 4G Build/KTU84P)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-G360H Build/KTU84P)",
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; SM-S920L Build/LRX22G)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; Fire Pro Build/LRX21M)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N9005 Build/LRX21V)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G920F Build/MMB29K)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-G7102 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G928F Build/MMB29K)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J500FN Build/LMY48B)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; Coolpad 3320A Build/LMY47V)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-J110F Build/KTU84P)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SAMSUNG-SGH-I747 Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SAMSUNG-SM-T337A Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.3; SGH-T999 Build/JSS15J)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; D6603 Build/23.5.A.0.570)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J700H Build/LMY48B)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; HTC6600LVW Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910G Build/LMY47X)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910T Build/LMY47X)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; C6903 Build/14.4.A.0.157)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G920F Build/MMB29K)",
"Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-I9105P Build/JDQ39)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9192 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G531H Build/LMY48B)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N9005 Build/LRX21V)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; LGMS345 Build/LMY47V)",
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; HTC One Build/LRX22G)",
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; LG-D800 Build/LRX22G)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G531H Build/LMY48B)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N9005 Build/LRX21V)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-T113 Build/KTU84P)",
"Dalvik/1.6.0 (Linux; U; Android 4.2.2; AndyWin Build/JDQ39E)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; Lenovo A7000-a Build/LRX21M)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; LGL16C Build/KOT49I.L16CV11a)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; SM-A700FD Build/LRX22G)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-G130HN Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-N9005 Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.1.2; LG-E975T Build/JZO54K)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; E1 Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-N5100 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-A310F Build/LMY47X)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J105H Build/LMY47V)",
"Dalvik/1.6.0 (Linux; U; Android 4.3; GT-I9305T Build/JSS15J)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; android Build/JDQ39)",
"Dalvik/1.6.0 (Linux; U; Android 4.2.1; HS-U970 Build/JOP40D)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-T561 Build/KTU84P)",
"Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-P3110 Build/JDQ39)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G925T Build/MMB29K)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; HUAWEI Y221-U22 Build/HUAWEIY221-U22)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G530T1 Build/LMY47X)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G920I Build/LMY47X)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; Vodafone Smart ultra 6 Build/LMY47V)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; XT1080 Build/SU6-7.7)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; ASUS MeMO Pad 7 Build/KTU84P)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-G800F Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-N7100 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G925I Build/MMB29K)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; A0001 Build/MMB29X)",
"Dalvik/2.1.0 (Linux; U; Android 5.1; XT1045 Build/LPB23.13-61)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; LGMS330 Build/LMY47V)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; Z970 Build/KTU84P)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N900P Build/LRX21V)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; T1-701u Build/HuaweiMediaPad)",
"Dalvik/2.1.0 (Linux; U; Android 5.1; HTCD100LVWPP Build/LMY47O)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G935R4 Build/MMB29M)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G930V Build/MMB29M)",
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; ZTE Blade Q Lux Build/LRX22G)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; GT-I9060I Build/KTU84P)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; LGUS992 Build/MMB29M)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G900P Build/MMB29M)",
"Dalvik/1.6.0 (Linux; U; Android 4.1.2; SGH-T999L Build/JZO54K)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910V Build/LMY47X)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-P601 Build/LMY47X)",
"Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-S7272 Build/JDQ39)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910T Build/LMY47X)",
"Dalvik/1.6.0 (Linux; U; Android 4.3; SAMSUNG-SGH-I747 Build/JSS15J)",
"Dalvik/2.1.0 (Linux; U; Android 5.0.2; ZTE Blade Q Lux Build/LRX22G)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G930F Build/MMB29K)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; HTC_PO582 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 6.0; HUAWEI MT7-TL10 Build/HuaweiMT7-TL10)",
"Dalvik/2.1.0 (Linux; U; Android 6.0; LG-H811 Build/MRA58K)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-N7505 Build/KOT49H)",
"Dalvik/2.1.0 (Linux; U; Android 6.0; LG-H815 Build/MRA58K)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.2; LenovoA3300-HV Build/KOT49H)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-G360G Build/KTU84P)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; GT-I9300I Build/KTU84P)",
"Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)",
"Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-J700T Build/MMB29K)",
"Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J500FN Build/LMY48B)",
"Dalvik/1.6.0 (Linux; U; Android 4.2.2; SM-T217S Build/JDQ39)",
"Dalvik/1.6.0 (Linux; U; Android 4.4.4; SAMSUNG-SM-N900A Build/KTU84P)",
]

View File

@@ -0,0 +1,70 @@
import pathlib
from enum import Enum
from io import BytesIO
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import aiofiles
import aiohttp
from aiohttp import ContentTypeError
from pydub import AudioSegment
from shazamio.exceptions import FailedDecodeJson
from shazamio.schemas.artists import ArtistQuery
SongT = Union[str, pathlib.Path, bytes, bytearray]
FileT = Union[str, pathlib.Path]
async def validate_json(
resp: aiohttp.ClientResponse, content_type: str = "application/json"
) -> dict:
try:
return await resp.json(content_type=content_type)
except ContentTypeError as e:
bad_url = str(str(e).split(",")[2]).split("'")[1]
raise FailedDecodeJson(f"Check args, URL is invalid\nURL- {bad_url}")
async def get_file_bytes(file: FileT) -> bytes:
async with aiofiles.open(file, mode="rb") as f:
return await f.read()
async def get_song(data: SongT) -> Union[AudioSegment]:
if isinstance(data, (str, pathlib.Path)):
song_bytes = await get_file_bytes(file=data)
return AudioSegment.from_file(BytesIO(song_bytes))
if isinstance(data, (bytes, bytearray)):
return AudioSegment.from_file(BytesIO(data))
if isinstance(data, AudioSegment):
return data
class QueryBuilder:
def __init__(
self,
source: List[Union[str, Enum]],
):
self.source = source
def to_str(self) -> str:
return ",".join(self.source)
class ArtistQueryGenerator:
def __init__(
self,
source: Optional[ArtistQuery] = None,
):
self.source = source
def params(self) -> Dict[str, str]:
return {
"extend": QueryBuilder(source=self.source.extend or []).to_str(),
"views": QueryBuilder(source=self.source.views or []).to_str(),
}