diff --git a/lib/ShazamIO/LICENSE.txt b/lib/ShazamIO/LICENSE.txt new file mode 100644 index 0000000..57cbe04 --- /dev/null +++ b/lib/ShazamIO/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 dotX12 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/lib/ShazamIO/pyproject.toml b/lib/ShazamIO/pyproject.toml new file mode 100644 index 0000000..78a41b2 --- /dev/null +++ b/lib/ShazamIO/pyproject.toml @@ -0,0 +1,38 @@ +[tool.poetry] +name = "shazamio" +version = "0.0.6" +description = "Is a asynchronous framework from reverse engineered Shazam API written in Python 3.8+ with asyncio and aiohttp." +authors = ["dotX12"] +license = "MIT License" +keywords = ["python", "shazam", "music", "recognize", "api", "async", "asyncio", "aiohttp", "identification"] +readme = "README.md" +homepage = "https://github.com/dotX12/ShazamIO" +repository = "https://github.com/dotX12/ShazamIO" +include = [ + "README.md", + "LICENSE.txt" +] + +[tool.poetry.dependencies] +python = "^3.8" +numpy = "^1.24.0" +aiohttp = "^3.8.3" +pydub = "^0.25.1" +dataclass-factory = "2.16" +aiofiles = "*" +anyio = "^3.6.2" +pydantic = "*" + +[build-system] +requires = ["poetry-core>=1.0.0", "wheel>=0.36,<1.0", "poetry>=1.1,<2", "virtualenv==20.0.33"] +build-backend = "poetry.core.masonry.api" + + +[tool.pytest.ini_options] +addopts = "-scoped" +asyncio_mode = "auto" +filterwarnings = ["ignore::DeprecationWarning"] + + +[tool.black] +line-length = 100 \ No newline at end of file diff --git a/lib/ShazamIO/shazamio/__init__.py b/lib/ShazamIO/shazamio/__init__.py new file mode 100644 index 0000000..1d6b6aa --- /dev/null +++ b/lib/ShazamIO/shazamio/__init__.py @@ -0,0 +1,6 @@ +from .serializers import Serialize +from .api import Shazam +from .converter import Geo +from .enums import GenreMusic + +__all__ = ("Serialize", "Shazam", "Geo", "GenreMusic") diff --git a/lib/ShazamIO/shazamio/algorithm.py b/lib/ShazamIO/shazamio/algorithm.py new file mode 100644 index 0000000..e7ac8a0 --- /dev/null +++ b/lib/ShazamIO/shazamio/algorithm.py @@ -0,0 +1,280 @@ +from copy import copy +from typing import List, Optional, Any +import numpy as np + +from .enums import FrequencyBand +from .signature import DecodedMessage, FrequencyPeak + +HANNING_MATRIX = np.hanning(2050)[1:-1] # Wipe trailing and leading zeroes + + +class RingBuffer(list): + def __init__(self, buffer_size: int, default_value: Any = None): + if default_value is not None: + list.__init__(self, [copy(default_value) for _ in range(buffer_size)]) + else: + list.__init__(self, [None] * buffer_size) + + self.position: int = 0 + self.buffer_size: int = buffer_size + self.num_written: int = 0 + + def append(self, value: Any): + self[self.position] = value + + self.position += 1 + self.position %= self.buffer_size + self.num_written += 1 + + +class SignatureGenerator: + def __init__(self): + # Used when storing input that will be processed when requiring to + # generate a signature: + + self.input_pending_processing: List[int] = [] + # Signed 16-bits, 16 KHz mono samples to be processed + + self.samples_processed: int = 0 + # Number of samples processed out of "self.input_pending_processing" + + # Used when processing input: + + self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0) + + self.fft_outputs: RingBuffer[List[float]] = RingBuffer( + buffer_size=256, default_value=[0.0 * 1025] + ) + # Lists of 1025 floats, premultiplied with a Hanning function before being + # passed through FFT, computed from + # the ring buffer every new 128 samples + + self.spread_fft_output: RingBuffer[List[float]] = RingBuffer( + buffer_size=256, default_value=[0] * 1025 + ) + + # How much data to send to Shazam at once? + + self.MAX_TIME_SECONDS = 3.1 + self.MAX_PEAKS = 255 + + # The object that will hold information about the next fingerpring + # to be produced + + self.next_signature = DecodedMessage() + self.next_signature.sample_rate_hz = 16000 + self.next_signature.number_samples = 0 + self.next_signature.frequency_band_to_sound_peaks = {} + + """ + Add data to be generated a signature for, which will be + processed when self.get_next_signature() is called. This + function expects signed 16-bit 16 KHz mono PCM samples. + """ + + def feed_input(self, s16le_mono_samples: List[int]): + self.input_pending_processing += s16le_mono_samples + + """ + Consume some of the samples fed to self.feed_input(), and return + a Shazam signature (DecodedMessage object) to be sent to servers + once "enough data has been gathered". + + Except if there are no more samples to be consumed, in this case + we will return None. + """ + + def get_next_signature(self) -> Optional[DecodedMessage]: + if len(self.input_pending_processing) - self.samples_processed < 128: + return None + while len(self.input_pending_processing) - self.samples_processed >= 128 and ( + self.next_signature.number_samples / self.next_signature.sample_rate_hz + < self.MAX_TIME_SECONDS + or sum( + len(peaks) for peaks in self.next_signature.frequency_band_to_sound_peaks.values() + ) + < self.MAX_PEAKS + ): + self.process_input( + self.input_pending_processing[self.samples_processed : self.samples_processed + 128] + ) + self.samples_processed += 128 + + returned_signature = self.next_signature + + self.next_signature = DecodedMessage() + self.next_signature.sample_rate_hz = 16000 + self.next_signature.number_samples = 0 + self.next_signature.frequency_band_to_sound_peaks = {} + + self.ring_buffer_of_samples: RingBuffer[int] = RingBuffer(buffer_size=2048, default_value=0) + self.fft_outputs: RingBuffer[List[float]] = RingBuffer( + buffer_size=256, default_value=[0.0 * 1025] + ) + self.spread_fft_output: RingBuffer[List[float]] = RingBuffer( + buffer_size=256, default_value=[0] * 1025 + ) + + return returned_signature + + def process_input(self, s16le_mono_samples: List[int]): + self.next_signature.number_samples += len(s16le_mono_samples) + for position_of_chunk in range(0, len(s16le_mono_samples), 128): + self.do_fft(s16le_mono_samples[position_of_chunk : position_of_chunk + 128]) + self.do_peak_spreading_and_recognition() + + def do_fft(self, batch_of_128_s16le_mono_samples): + type_ring = self.ring_buffer_of_samples.position + len(batch_of_128_s16le_mono_samples) + self.ring_buffer_of_samples[ + self.ring_buffer_of_samples.position : type_ring + ] = batch_of_128_s16le_mono_samples + self.ring_buffer_of_samples.position += len(batch_of_128_s16le_mono_samples) + self.ring_buffer_of_samples.position %= 2048 + self.ring_buffer_of_samples.num_written += len(batch_of_128_s16le_mono_samples) + + excerpt_from_ring_buffer: list = ( + self.ring_buffer_of_samples[self.ring_buffer_of_samples.position :] + + self.ring_buffer_of_samples[: self.ring_buffer_of_samples.position] + ) + + # The pre multiplication of the array is for applying a windowing function before the DFT + # (slight rounded Hanning without zeros at edges) + + fft_results: np.array = np.fft.rfft(HANNING_MATRIX * excerpt_from_ring_buffer) + + fft_results = (fft_results.real**2 + fft_results.imag**2) / (1 << 17) + fft_results = np.maximum(fft_results, 0.0000000001) + + self.fft_outputs.append(fft_results) + + def do_peak_spreading_and_recognition(self): + self.do_peak_spreading() + if self.spread_fft_output.num_written >= 46: + self.do_peak_recognition() + + def do_peak_spreading(self): + origin_last_fft: List[float] = self.fft_outputs[self.fft_outputs.position - 1] + + temporary_array_1 = np.tile(origin_last_fft, 3).reshape((3, -1)) + temporary_array_1[1] = np.roll(temporary_array_1[1], -1) + temporary_array_1[2] = np.roll(temporary_array_1[2], -2) + + origin_last_fft_np = np.hstack([temporary_array_1.max(axis=0)[:-3], origin_last_fft[-3:]]) + + i1, i2, i3 = [ + (self.spread_fft_output.position + former_fft_num) % self.spread_fft_output.buffer_size + for former_fft_num in [-1, -3, -6] + ] + + temporary_array_2 = np.vstack( + [ + origin_last_fft_np, + self.spread_fft_output[i1], + self.spread_fft_output[i2], + self.spread_fft_output[i3], + ] + ) + + temporary_array_2[1] = np.max(temporary_array_2[:2, :], axis=0) + temporary_array_2[2] = np.max(temporary_array_2[:3, :], axis=0) + temporary_array_2[3] = np.max(temporary_array_2[:4, :], axis=0) + + self.spread_fft_output[i1] = temporary_array_2[1].tolist() + self.spread_fft_output[i2] = temporary_array_2[2].tolist() + self.spread_fft_output[i3] = temporary_array_2[3].tolist() + + self.spread_fft_output.append(list(origin_last_fft_np)) + + def do_peak_recognition(self): + fft_minus_46 = self.fft_outputs[ + (self.fft_outputs.position - 46) % self.fft_outputs.buffer_size + ] + fft_minus_49 = self.spread_fft_output[ + (self.spread_fft_output.position - 49) % self.spread_fft_output.buffer_size + ] + + for bin_position in range(10, 1015): + # Ensure that the bin is large enough to be a peak + + if fft_minus_46[bin_position] >= 1 / 64 and ( + fft_minus_46[bin_position] >= fft_minus_49[bin_position - 1] + ): + # Ensure that it is frequency-domain local minimum + + max_neighbor_in_fft_minus_49 = 0 + + for neighbor_offset in [*range(-10, -3, 3), -3, 1, *range(2, 9, 3)]: + max_neighbor_in_fft_minus_49 = max( + fft_minus_49[bin_position + neighbor_offset], + max_neighbor_in_fft_minus_49, + ) + + if fft_minus_46[bin_position] > max_neighbor_in_fft_minus_49: + # Ensure that it is a time-domain local minimum + + max_neighbor_in_other_adjacent_ffts = max_neighbor_in_fft_minus_49 + + for other_offset in [ + -53, + -45, + *range(165, 201, 7), + *range(214, 250, 7), + ]: + max_neighbor_in_other_adjacent_ffts = max( + self.spread_fft_output[ + (self.spread_fft_output.position + other_offset) + % self.spread_fft_output.buffer_size + ][bin_position - 1], + max_neighbor_in_other_adjacent_ffts, + ) + + if fft_minus_46[bin_position] > max_neighbor_in_other_adjacent_ffts: + # This is a peak, store the peak + + fft_number = self.spread_fft_output.num_written - 46 + + peak_magnitude = ( + np.log(max(1 / 64, fft_minus_46[bin_position])) * 1477.3 + 6144 + ) + peak_magnitude_before = ( + np.log(max(1 / 64, fft_minus_46[bin_position - 1])) * 1477.3 + 6144 + ) + peak_magnitude_after = ( + np.log(max(1 / 64, fft_minus_46[bin_position + 1])) * 1477.3 + 6144 + ) + + peak_variation_1 = ( + peak_magnitude * 2 - peak_magnitude_before - peak_magnitude_after + ) + peak_variation_2 = ( + (peak_magnitude_after - peak_magnitude_before) * 32 / peak_variation_1 + ) + + corrected_peak_frequency_bin = bin_position * 64 + peak_variation_2 + + assert peak_variation_1 > 0 + + frequency_hz = corrected_peak_frequency_bin * (16000 / 2 / 1024 / 64) + + if 250 < frequency_hz < 520: + band = FrequencyBand.hz_250_520 + elif 520 < frequency_hz < 1450: + band = FrequencyBand.hz_520_1450 + elif 1450 < frequency_hz < 3500: + band = FrequencyBand.hz_1450_3500 + elif 5500 < frequency_hz <= 5500: + band = FrequencyBand.hz_3500_5500 + else: + continue + + if band not in self.next_signature.frequency_band_to_sound_peaks: + self.next_signature.frequency_band_to_sound_peaks[band] = [] + + self.next_signature.frequency_band_to_sound_peaks[band].append( + FrequencyPeak( + fft_number, + int(peak_magnitude), + int(corrected_peak_frequency_bin), + 16000, + ) + ) diff --git a/lib/ShazamIO/shazamio/api.py b/lib/ShazamIO/shazamio/api.py new file mode 100644 index 0000000..e4b815c --- /dev/null +++ b/lib/ShazamIO/shazamio/api.py @@ -0,0 +1,378 @@ +import pathlib +import uuid +import time +from typing import Optional + +from pydub import AudioSegment + +from typing import Dict, Any, Union + +from .misc import Request +from .misc import ShazamUrl +from .schemas.artists import ArtistQuery +from .signature import DecodedMessage +from .enums import GenreMusic +from .converter import Converter, Geo +from .typehints import CountryCode +from .utils import ArtistQueryGenerator +from .utils import get_song + + +class Shazam(Converter, Geo, Request): + """Is asynchronous framework for reverse engineered Shazam API written in Python 3.7 with + asyncio and aiohttp.""" + + def __init__(self, language: str = "en-US", endpoint_country: str = "GB"): + super().__init__(language=language) + self.language = language + self.endpoint_country = endpoint_country + + async def top_world_tracks(self, limit: int = 200, offset: int = 0) -> Dict[str, Any]: + """ + Search top world tracks + + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs. + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict tracks + """ + return await self.request( + "GET", + ShazamUrl.TOP_TRACKS_WORLD.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + ), + headers=self.headers(), + ) + + async def artist_about( + self, artist_id: int, query: Optional[ArtistQuery] = None + ) -> Dict[str, Any]: + """ + Retrieving information from an artist profile + + :param artist_id: Artist number. Example (203347991) + :param query: Foo + https://www.shazam.com/artist/203347991/ + :return: dict about artist + """ + + if query: + pg = ArtistQueryGenerator(source=query) + params_dict = pg.params() + else: + params_dict = {} + + return await self.request( + "GET", + ShazamUrl.SEARCH_ARTIST_V2.format( + endpoint_country=self.endpoint_country, + artist_id=artist_id, + ), + params=params_dict, + headers=self.headers(), + ) + + async def track_about(self, track_id: int) -> Dict[str, Any]: + """ + Get track information + + :param track_id: Track number. Example: (549952578) + https://www.shazam.com/track/549952578/ + :return: dict about track + """ + return await self.request( + "GET", + ShazamUrl.ABOUT_TRACK.format( + language=self.language, + endpoint_country=self.endpoint_country, + track_id=track_id, + ), + headers=self.headers(), + ) + + async def top_country_tracks( + self, + country_code: Union[CountryCode, str], + limit: int = 200, + offset: int = 0, + ) -> Dict[str, Any]: + """ + Get the best tracks by country code + https://www.shazam.com/charts/discovery/netherlands + + :param country_code: ISO 3166-3 alpha-2 code. Example: RU,NL,UA + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs. + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict songs + """ + return await self.request( + "GET", + ShazamUrl.TOP_TRACKS_COUNTRY.format( + language=self.language, + endpoint_country=self.endpoint_country, + country_code=country_code, + limit=limit, + offset=offset, + ), + headers=self.headers(), + ) + + async def top_city_tracks( + self, + country_code: Union[CountryCode, str], + city_name: str, + limit: int = 200, + offset: int = 0, + ) -> Dict[str, Any]: + """ + Retrieving information from an artist profile + https://www.shazam.com/charts/top-50/russia/moscow + + :param country_code: ISO 3166-3 alpha-2 code. Example: RU,NL,UA + :param city_name: City name from https://github.com/dotX12/dotX12/blob/main/city.json + Example: Budapest, Moscow + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs. + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict songs + """ + city_id = await self.city_id_from(country=country_code, city=city_name) + return await self.request( + "GET", + ShazamUrl.TOP_TRACKS_CITY.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + city_id=city_id, + ), + headers=self.headers(), + ) + + async def top_world_genre_tracks( + self, + genre: Union[GenreMusic, int], + limit: int = 100, + offset: int = 0, + ) -> Dict[str, Any]: + """ + Get world tracks by certain genre + https://www.shazam.com/charts/genre/world/rock + + :param genre: Genre name or ID: + POP = 1, HIP_HOP_RAP = 2, DANCE = 3, ELECTRONIC = 4, RNB_SOUL = 5, ALTERNATIVE = + 6, ROCK = 7 + LATIN = 8, FILM_TV_STAGE = 9, COUNTRY = 10, AFRO_BEATS = 11, WORLDWIDE = 12, + REGGAE_DANCE_HALL = 13 + HOUSE = 14, K_POP = 15, FRENCH_POP = 16, SINGER_SONGWRITER = 17, + REGIONAL_MEXICANO = 18 + + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs. + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter + to your own. + :return: dict songs + """ + return await self.request( + "GET", + ShazamUrl.GENRE_WORLD.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + genre=genre, + ), + headers=self.headers(), + ) + + async def top_country_genre_tracks( + self, + country_code: str, + genre: Union[GenreMusic, int], + limit: int = 200, + offset: int = 0, + ) -> Dict[str, Any]: + """ + The best tracks by a genre in the country + https://www.shazam.com/charts/genre/spain/hip-hop-rap + :param country_code: ISO 3166-3 alpha-2 code. Example: RU,NL,UA + :param genre: Genre name or ID: + POP = 1, HIP_HOP_RAP = 2, DANCE = 3, ELECTRONIC = 4, RNB_SOUL = 5, ALTERNATIVE = + 6, ROCK = 7 + LATIN = 8, FILM_TV_STAGE = 9, COUNTRY = 10, AFRO_BEATS = 11, WORLDWIDE = 12, + REGGAE_DANCE_HALL = 13 + HOUSE = 14, K_POP = 15, FRENCH_POP = 16, SINGER_SONGWRITER = 17, + REGIONAL_MEXICANO = 18 + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict songs + """ + return await self.request( + "GET", + ShazamUrl.GENRE_COUNTRY.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + country=country_code, + genre=genre, + ), + headers=self.headers(), + ) + + async def related_tracks( + self, + track_id: int, + limit: int = 20, + offset: int = 0, + ) -> Dict[str, Any]: + """ + Similar songs based song id + https://www.shazam.com/track/546891609/2-phu%CC%81t-ho%CC%9Bn-kaiz-remix + :param track_id: Track number. Example: (549952578) + https://www.shazam.com/track/549952578/ + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict tracks + """ + return await self.request( + "GET", + ShazamUrl.RELATED_SONGS.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + track_id=track_id, + ), + headers=self.headers(), + ) + + async def search_artist( + self, + query: str, + limit: int = 10, + offset: int = 0, + ) -> Dict[str, Any]: + """ + Search all artists by prefix or fullname + :param query: Artist name or search prefix + :param limit: Determines how many artists the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 artists. + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict artists + """ + return await self.request( + "GET", + ShazamUrl.SEARCH_ARTIST.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + query=query, + ), + headers=self.headers(), + ) + + async def search_track(self, query: str, limit: int = 10, offset: int = 0) -> Dict[str, Any]: + """ + Search all tracks by prefix + :param query: Track full title or prefix title + :param limit: Determines how many songs the maximum can be in the request. + Example: If 5 is specified, the query will return no more than 5 songs. + :param offset: A parameter that determines with which song to display the request. + The default is 0. If you want to skip the first few songs, set this parameter to + your own. + :return: dict songs + """ + return await self.request( + "GET", + ShazamUrl.SEARCH_MUSIC.format( + language=self.language, + endpoint_country=self.endpoint_country, + limit=limit, + offset=offset, + query=query, + ), + headers=self.headers(), + ) + + async def listening_counter(self, track_id: int) -> Dict[str, Any]: + """ + Returns the total track listener counter. + :param track_id: Track number. Example: (559284007) + https://www.shazam.com/track/559284007/rampampam + :return: The data dictionary that contains the listen counter. + """ + + return await self.request( + "GET", + ShazamUrl.LISTENING_COUNTER.format( + track_id, + language=self.language, + ), + headers=self.headers(), + ) + + async def get_youtube_data(self, link: str) -> Dict[str, Any]: + return await self.request("GET", link, headers=self.headers()) + + async def recognize_song( + self, data: Union[str, pathlib.Path, bytes, bytearray, AudioSegment] + ) -> Dict[str, Any]: + """ + Creating a song signature based on a file and searching for this signature in the shazam + database. + :param data: Path to song file or bytes + :return: Dictionary with information about the found song + """ + song = await get_song(data=data) + audio = self.normalize_audio_data(song) + signature_generator = self.create_signature_generator(audio) + signature = signature_generator.get_next_signature() + + if len(signature_generator.input_pending_processing) < 128: + return {"matches": []} + + while not signature: + signature = signature_generator.get_next_signature() + results = await self.send_recognize_request(signature) + return results + + async def send_recognize_request(self, sig: DecodedMessage) -> Dict[str, Any]: + data = Converter.data_search( + Request.TIME_ZONE, + sig.encode_to_uri(), + int(sig.number_samples / sig.sample_rate_hz * 1000), + int(time.time() * 1000), + ) + + return await self.request( + "POST", + ShazamUrl.SEARCH_FROM_FILE.format( + language=self.language, + endpoint_country=self.endpoint_country, + uuid_1=str(uuid.uuid4()).upper(), + uuid_2=str(uuid.uuid4()).upper(), + ), + headers=self.headers(), + json=data, + ) diff --git a/lib/ShazamIO/shazamio/client.py b/lib/ShazamIO/shazamio/client.py new file mode 100644 index 0000000..8862942 --- /dev/null +++ b/lib/ShazamIO/shazamio/client.py @@ -0,0 +1,18 @@ +import aiohttp + +from shazamio.exceptions import BadMethod +from shazamio.utils import validate_json + + +class HTTPClient: + @staticmethod + async def request(method: str, url: str, *args, **kwargs) -> dict: + async with aiohttp.ClientSession() as session: + if method.upper() == "GET": + async with session.get(url, **kwargs) as resp: + return await validate_json(resp, *args) + elif method.upper() == "POST": + async with session.post(url, **kwargs) as resp: + return await validate_json(resp, *args) + else: + raise BadMethod("Accept only GET/POST") diff --git a/lib/ShazamIO/shazamio/converter.py b/lib/ShazamIO/shazamio/converter.py new file mode 100644 index 0000000..fa4ce5f --- /dev/null +++ b/lib/ShazamIO/shazamio/converter.py @@ -0,0 +1,64 @@ +from pydub import AudioSegment +from shazamio.algorithm import SignatureGenerator +from shazamio.client import HTTPClient +from shazamio.exceptions import BadCityName, BadCountryName +from shazamio.misc import ShazamUrl +from shazamio.schemas.models import * +from shazamio.typehints import CountryCode + + +class Geo(HTTPClient): + async def city_id_from(self, country: Union[CountryCode, str], city: str) -> int: + """ + Return City ID from country name and city name. + :param country: - Country name + :param city: - City name + :return: City ID + """ + + data = await self.request("GET", ShazamUrl.CITY_IDS, "text/plain") + for response_country in data["countries"]: + if country == response_country["id"]: + for response_city in response_country["cities"]: + if city == response_city["name"]: + return response_city["id"] + raise BadCityName("City not found, check city name") + + async def all_cities_from_country(self, country: Union[CountryCode, str]) -> list: + cities = [] + data = await self.request("GET", ShazamUrl.CITY_IDS, "text/plain") + for response_country in data["countries"]: + if country == response_country["id"]: + for city in response_country["cities"]: + cities.append(city["name"]) + return cities + raise BadCountryName("Country not found, check country name") + + +class Converter: + @staticmethod + def data_search(timezone: str, uri: str, samplems: int, timestamp: int) -> dict: + return { + "timezone": timezone, + "signature": {"uri": uri, "samplems": samplems}, + "timestamp": timestamp, + "context": {}, + "geolocation": {}, + } + + @staticmethod + def normalize_audio_data(audio: AudioSegment) -> AudioSegment: + audio = audio.set_sample_width(2) + audio = audio.set_frame_rate(16000) + audio = audio.set_channels(1) + + return audio + + @staticmethod + def create_signature_generator(audio: AudioSegment) -> SignatureGenerator: + signature_generator = SignatureGenerator() + signature_generator.feed_input(audio.get_array_of_samples()) + signature_generator.MAX_TIME_SECONDS = 12 + if audio.duration_seconds > 12 * 3: + signature_generator.samples_processed += 16000 * (int(audio.duration_seconds / 2) - 6) + return signature_generator diff --git a/lib/ShazamIO/shazamio/enums.py b/lib/ShazamIO/shazamio/enums.py new file mode 100644 index 0000000..415be9c --- /dev/null +++ b/lib/ShazamIO/shazamio/enums.py @@ -0,0 +1,41 @@ +from enum import IntEnum + + +class GenreMusic(IntEnum): + POP = 1 + HIP_HOP_RAP = 2 + DANCE = 3 + ELECTRONIC = 4 + RNB_SOUL = 5 + ALTERNATIVE = 6 + ROCK = 7 + LATIN = 8 + FILM_TV_STAGE = 9 + COUNTRY = 10 + AFRO_BEATS = 11 + WORLDWIDE = 12 + REGGAE_DANCE_HALL = 13 + HOUSE = 14 + K_POP = 15 + FRENCH_POP = 16 + SINGER_SONGWRITER = 17 + REGIONAL_MEXICANO = 18 + + +class SampleRate(IntEnum): + # Enum keys are sample rates in Hz + _8000 = 1 + _11025 = 2 + _16000 = 3 + _32000 = 4 + _44100 = 5 + _48000 = 6 + + +class FrequencyBand(IntEnum): + # Enum keys are frequency ranges in Hz + hz_0_250 = -1 # Nothing above 250 Hz is actually stored + hz_250_520 = 0 + hz_520_1450 = 1 + hz_1450_3500 = 2 + hz_3500_5500 = 3 # This one (3.5 KHz - 5.5 KHz) should not be used in legacy mode diff --git a/lib/ShazamIO/shazamio/exceptions.py b/lib/ShazamIO/shazamio/exceptions.py new file mode 100644 index 0000000..a564994 --- /dev/null +++ b/lib/ShazamIO/shazamio/exceptions.py @@ -0,0 +1,14 @@ +class FailedDecodeJson(Exception): + pass + + +class BadCityName(Exception): + pass + + +class BadCountryName(Exception): + pass + + +class BadMethod(Exception): + pass diff --git a/lib/ShazamIO/shazamio/factory.py b/lib/ShazamIO/shazamio/factory.py new file mode 100644 index 0000000..f045ad5 --- /dev/null +++ b/lib/ShazamIO/shazamio/factory.py @@ -0,0 +1,133 @@ +from dataclass_factory import Schema + + +class FactorySchemas: + FACTORY_TRACK_SCHEMA = Schema( + name_mapping={ + "photo_url": ("images", "coverarthq"), + "ringtone": ("hub", "actions", 1, "uri"), + "artist_id": ("artists", 0, "id"), + "apple_music_url": ("hub", "options", 0, "actions", 0, "uri"), + "spotify_url": ("hub", "providers", 0, "actions", 0, "uri"), + "spotify_uri": ("hub", "providers", 0, "actions", 1, "uri"), + "sections": "sections", + }, + skip_internal=True, + ) + + FACTORY_ARTIST_SCHEMA = Schema( + name_mapping={ + "avatar": "avatar", + "genres": ("genres", "secondaries"), + "genres_primary": ("genres", "primary"), + "adam_id": "adamid", + "url": "weburl", + } + ) + + FACTORY_SONG_SECTION_SCHEMA = Schema( + name_mapping={ + "type": "type", + "meta_pages": "metapages", + "tab_name": "tabname", + "metadata": "metadata", + }, + skip_internal=True, + ) + + FACTORY_VIDEO_SECTION_SCHEMA = Schema( + name_mapping={ + "type": "type", + "youtube_url": "youtubeurl", + "tab_name": "tabname", + }, + skip_internal=True, + ) + + FACTORY_RELATED_SECTION_SCHEMA = Schema( + name_mapping={ + "type": "type", + "url": "url", + "tab_name": "tabname", + }, + skip_internal=True, + ) + + FACTORY_YOUTUBE_TRACK_SCHEMA = Schema( + name_mapping={ + "caption": "caption", + "image": "image", + "actions": "actions", + }, + skip_internal=True, + ) + + FACTORY_RESPONSE_TRACK_SCHEMA = Schema( + name_mapping={ + "matches": "matches", + "location": "location", + "retry_ms": "retryms", + "timestamp": "timestamp", + "timezone": "timezone", + "track": "track", + "tag_id": "tagid", + }, + skip_internal=True, + ) + + FACTORY_LYRICS_SECTION = Schema( + name_mapping={ + "type": "type", + "text": "text", + "footer": "footer", + "tab_name": "tabname", + "beacon_data": "beacondata", + }, + ) + + FACTORY_BEACON_DATA_LYRICS_SECTION = Schema( + name_mapping={ + "lyrics_id": "lyricsid", + "provider_name": "providername", + "common_track_id": "commontrackid", + } + ) + + FACTORY_ARTIST_SECTION = Schema( + name_mapping={ + "type": "type", + "id": "id", + "name": "name", + "verified": "verified", + "actions": "actions", + "tab_name": "tabname", + "top_tracks": "toptracks", + } + ) + + FACTORY_MATCH = Schema( + name_mapping={ + "id": "id", + "offset": "offset", + "channel": "channel", + "time_skew": "timeskew", + "frequency_skew": "frequencyskew", + } + ) + + FACTORY_ATTRIBUTES_ARTIST = Schema( + name_mapping={ + "name": "name", + "url": "url", + "artist_bio": "artistBio", + "genre_names": "genreNames", + } + ) + + FACTORY_ARTIST_V2 = Schema( + name_mapping={ + "id": "id", + "type": "type", + "attributes": "attributes", + } + ) diff --git a/lib/ShazamIO/shazamio/factory_misc.py b/lib/ShazamIO/shazamio/factory_misc.py new file mode 100644 index 0000000..b9d0076 --- /dev/null +++ b/lib/ShazamIO/shazamio/factory_misc.py @@ -0,0 +1,44 @@ +from dataclass_factory import Factory + +from shazamio.factory import FactorySchemas +from shazamio.schemas.artists import ArtistInfo +from shazamio.schemas.artists import ArtistV3 +from shazamio.schemas.attributes import ArtistAttribute +from shazamio.schemas.models import ( + SongSection, + VideoSection, + RelatedSection, + LyricsSection, + BeaconDataLyricsSection, + ArtistSection, + MatchModel, +) +from shazamio.schemas.models import TrackInfo +from shazamio.schemas.models import YoutubeData +from shazamio.schemas.models import ResponseTrack + + +FACTORY_TRACK = Factory( + schemas={ + TrackInfo: FactorySchemas.FACTORY_TRACK_SCHEMA, + SongSection: FactorySchemas.FACTORY_SONG_SECTION_SCHEMA, + VideoSection: FactorySchemas.FACTORY_VIDEO_SECTION_SCHEMA, + LyricsSection: FactorySchemas.FACTORY_LYRICS_SECTION, + BeaconDataLyricsSection: FactorySchemas.FACTORY_BEACON_DATA_LYRICS_SECTION, + ArtistSection: FactorySchemas.FACTORY_ARTIST_SECTION, + MatchModel: FactorySchemas.FACTORY_MATCH, + RelatedSection: FactorySchemas.FACTORY_RELATED_SECTION_SCHEMA, + YoutubeData: FactorySchemas.FACTORY_YOUTUBE_TRACK_SCHEMA, + ResponseTrack: FactorySchemas.FACTORY_RESPONSE_TRACK_SCHEMA, + }, + debug_path=True, +) + +FACTORY_ARTIST = Factory( + schemas={ + ArtistAttribute: FactorySchemas.FACTORY_ATTRIBUTES_ARTIST, + ArtistV3: FactorySchemas.FACTORY_ARTIST_V2, + ArtistInfo: FactorySchemas.FACTORY_ARTIST_SCHEMA, + }, + debug_path=True, +) diff --git a/lib/ShazamIO/shazamio/misc.py b/lib/ShazamIO/shazamio/misc.py new file mode 100644 index 0000000..f88bb67 --- /dev/null +++ b/lib/ShazamIO/shazamio/misc.py @@ -0,0 +1,69 @@ +from random import choice +from shazamio.user_agent import USER_AGENTS + + +class ShazamUrl: + SEARCH_FROM_FILE = ( + "https://amp.shazam.com/discovery/v5/{language}/{endpoint_country}/iphone/-/tag" + "/{uuid_1}/{uuid_2}?sync=true&webv3=true&sampling=true" + "&connected=&shazamapiversion=v3&sharehub=true&hubv5minorversion=v5.1&hidelb=true&video=v3" + ) + TOP_TRACKS_WORLD = ( + "https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks" + "/ip-global-chart?pageSize={limit}&startFrom={offset}" + ) + ABOUT_TRACK = ( + "https://www.shazam.com/discovery/v5/{language}/{endpoint_country}/web/-/track" + "/{track_id}?shazamapiversion=v3&video=v3 " + ) + TOP_TRACKS_COUNTRY = ( + "https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks" + "/ip-country-chart-{country_code}?pageSize={limit}&startFrom={offset}" + ) + TOP_TRACKS_CITY = ( + "https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks" + "/ip-city-chart-{city_id}?pageSize={limit}&startFrom={offset}" + ) + CITY_IDS = "https://raw.githubusercontent.com/dotX12/dotX12/main/city.json" + GENRE_WORLD = ( + "https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks" + "/genre-global-chart-{genre}?pageSize={limit}&startFrom={offset}" + ) + GENRE_COUNTRY = ( + "https://www.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks" + "/genre-country-chart-{country}-{genre}?pageSize={limit}&startFrom={offset}" + ) + RELATED_SONGS = ( + "https://cdn.shazam.com/shazam/v3/{language}/{endpoint_country}/web/-/tracks" + "/track-similarities-id-{track_id}?startFrom={offset}&pageSize={limit}&connected=&channel=" + ) + SEARCH_ARTIST = ( + "https://www.shazam.com/services/search/v4/{language}/{endpoint_country}/web" + "/search?term={query}&limit={limit}&offset={offset}&types=artists" + ) + SEARCH_MUSIC = ( + "https://www.shazam.com/services/search/v3/{language}/{endpoint_country}/web" + "/search?query={query}&numResults={limit}&offset={offset}&types=songs" + ) + LISTENING_COUNTER = "https://www.shazam.com/services/count/v2/web/track/{}" + + SEARCH_ARTIST_V2 = ( + "https://www.shazam.com/services/amapi/v1/catalog/{endpoint_country}/artists/{artist_id}" + ) + + +class Request: + TIME_ZONE = "Europe/Moscow" + + def __init__(self, language: str): + self.language = language + + def headers(self): + return { + "X-Shazam-Platform": "IPHONE", + "X-Shazam-AppVersion": "14.1.0", + "Accept": "*/*", + "Accept-Language": self.language, + "Accept-Encoding": "gzip, deflate", + "User-Agent": choice(USER_AGENTS), + } diff --git a/lib/ShazamIO/shazamio/schemas/__init__.py b/lib/ShazamIO/shazamio/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/ShazamIO/shazamio/schemas/artist/__init__.py b/lib/ShazamIO/shazamio/schemas/artist/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/ShazamIO/shazamio/schemas/artist/views/__init__.py b/lib/ShazamIO/shazamio/schemas/artist/views/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/ShazamIO/shazamio/schemas/artist/views/full_albums.py b/lib/ShazamIO/shazamio/schemas/artist/views/full_albums.py new file mode 100644 index 0000000..3ee3278 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/artist/views/full_albums.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Field + +from shazamio.schemas.attributes import AttributeName +from shazamio.schemas.base import BaseDataModel +from shazamio.schemas.photos import ImageModel + + +class PlayParams(BaseModel): + id: str + kind: str + + +class EditorialArtwork(BaseModel): + subscription_hero: Optional[ImageModel] = Field(None, alias="subscriptionHero") + store_flow_case: Optional[ImageModel] = Field(None, alias="storeFlowcase") + + +class EditorialNotes(BaseModel): + standard: Optional[str] = None + short: Optional[str] = None + + +class AttributesFullAlbums(BaseModel): + copyright: str + genre_names: List[str] = Field(..., alias="genreNames") + release_date: str = Field(..., alias="releaseDate") + is_mastered_for_itunes: bool = Field(..., alias="isMasteredForItunes") + upc: str + artwork: ImageModel + play_params: PlayParams = Field(..., alias="playParams") + url: str + record_label: str = Field(..., alias="recordLabel") + track_count: int = Field(..., alias="trackCount") + is_compilation: bool = Field(..., alias="isCompilation") + is_prerelease: bool = Field(..., alias="isPrerelease") + audio_traits: List[str] = Field(..., alias="audioTraits") + editorial_artwork: EditorialArtwork = Field(..., alias="editorialArtwork") + is_single: bool = Field(..., alias="isSingle") + name: str + artist_name: str = Field(..., alias="artistName") + content_rating: Optional[str] = Field(None, alias="contentRating") + is_complete: bool = Field(..., alias="isComplete") + editorial_notes: Optional[EditorialNotes] = Field(None, alias="editorialNotes") + + +class FullAlbumsModel(BaseModel): + href: Optional[str] = None + attributes: Optional[AttributeName] = None + data: Optional[List[BaseDataModel[AttributesFullAlbums]]] = None diff --git a/lib/ShazamIO/shazamio/schemas/artist/views/last_release.py b/lib/ShazamIO/shazamio/schemas/artist/views/last_release.py new file mode 100644 index 0000000..ce9518e --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/artist/views/last_release.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from shazamio.schemas.attributes import AttributeName +from shazamio.schemas.base import BaseDataModel +from shazamio.schemas.photos import ImageModel + + +class PlayParams(BaseModel): + id: str + kind: str + + +class AttributeLastRelease(BaseModel): + copyright: str + genre_names: List[str] = Field(..., alias="genreNames") + release_date: str = Field(..., alias="releaseDate") + is_mastered_for_itunes: bool = Field(..., alias="isMasteredForItunes") + upc: str + artwork: ImageModel + play_params: PlayParams = Field(..., alias="playParams") + url: str + record_label: str = Field(..., alias="recordLabel") + track_count: int = Field(..., alias="trackCount") + is_compilation: bool = Field(..., alias="isCompilation") + is_prerelease: bool = Field(..., alias="isPrerelease") + audio_traits: List[str] = Field(..., alias="audioTraits") + editorial_artwork: Dict[str, Any] = Field(..., alias="editorialArtwork") + is_single: bool = Field(..., alias="isSingle") + name: str + artist_name: str = Field(..., alias="artistName") + content_rating: Optional[str] = Field(None, alias="contentRating") + is_complete: bool = Field(..., alias="isComplete") + + +class LastReleaseModel(BaseModel): + href: Optional[str] = None + attributes: Optional[AttributeName] = None + data: Optional[List[BaseDataModel[AttributeLastRelease]]] = None diff --git a/lib/ShazamIO/shazamio/schemas/artist/views/simular_artists.py b/lib/ShazamIO/shazamio/schemas/artist/views/simular_artists.py new file mode 100644 index 0000000..5147835 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/artist/views/simular_artists.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Field + +from shazamio.schemas.attributes import AttributeName +from shazamio.schemas.base import BaseHrefNextData +from shazamio.schemas.base import BaseIdTypeHref +from shazamio.schemas.photos import ImageModel + + +class EditorialArtwork(BaseModel): + centered_fullscreen_background: Optional[ImageModel] = Field( + None, alias="centeredFullscreenBackground" + ) + subscription_hero: Optional[ImageModel] = Field(None, alias="subscriptionHero") + banner_uber: Optional[ImageModel] = Field(None, alias="bannerUber") + + +class Attributes(BaseModel): + genre_names: List[str] = Field(..., alias="genreNames") + editorial_artwork: EditorialArtwork = Field(..., alias="editorialArtwork") + name: str + artwork: ImageModel + url: str + origin: Optional[str] = None + artist_bio: Optional[str] = Field(None, alias="artistBio") + + +class Relationships(BaseModel): + albums: BaseHrefNextData[List[BaseIdTypeHref]] + + +class Datum(BaseModel): + id: str + type: str + href: str + attributes: Attributes + relationships: Relationships + + +class SimularArtist(BaseModel): + href: Optional[str] = None + next: Optional[str] = None + attributes: Optional[AttributeName] = None + data: Optional[List[Datum]] = None diff --git a/lib/ShazamIO/shazamio/schemas/artist/views/top_music.py b/lib/ShazamIO/shazamio/schemas/artist/views/top_music.py new file mode 100644 index 0000000..8886311 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/artist/views/top_music.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from shazamio.schemas.attributes import AttributeName +from shazamio.schemas.base import BaseDataModel +from shazamio.schemas.photos import ImageModel + + +class PlayParams(BaseModel): + id: str + kind: str + + +class Preview(BaseModel): + url: str + hls_url: str = Field(..., alias="hlsUrl") + artwork: ImageModel + + +class Attributes(BaseModel): + genre_names: List[str] = Field(..., alias="genreNames") + release_date: str = Field(..., alias="releaseDate") + duration_in_millis: int = Field(..., alias="durationInMillis") + isrc: str + artwork: ImageModel + play_params: PlayParams = Field(..., alias="playParams") + url: str + has4_k: bool = Field(..., alias="has4K") + editorial_artwork: Dict[str, Any] = Field(..., alias="editorialArtwork") + has_hdr: bool = Field(..., alias="hasHDR") + name: str + previews: List[Preview] + artist_name: str = Field(..., alias="artistName") + content_rating: Optional[str] = Field(None, alias="contentRating") + album_name: Optional[str] = Field(None, alias="albumName") + track_number: Optional[int] = Field(None, alias="trackNumber") + + +class TopMusicVideosView(BaseModel): + href: Optional[str] = None + attributes: Optional[AttributeName] = None + data: Optional[List[BaseDataModel[Attributes]]] = None diff --git a/lib/ShazamIO/shazamio/schemas/artist/views/top_song.py b/lib/ShazamIO/shazamio/schemas/artist/views/top_song.py new file mode 100644 index 0000000..8b0e135 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/artist/views/top_song.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Field + +from shazamio.schemas.artist.views.top_music import PlayParams +from shazamio.schemas.attributes import AttributeName +from shazamio.schemas.base import BaseDataModel +from shazamio.schemas.photos import ImageModel +from shazamio.schemas.urls import UrlDTO + + +class AttributesTopSong(BaseModel): + has_time_synced_lyrics: bool = Field(..., alias="hasTimeSyncedLyrics") + album_name: Optional[str] = Field(None, alias="albumName") + genre_names: List = Field(..., alias="genreNames") + track_number: int = Field(..., alias="trackNumber") + release_date: str = Field(..., alias="releaseDate") + duration_in_millis: int = Field(..., alias="durationInMillis") + is_vocal_attenuation_allowed: bool = Field(..., alias="isVocalAttenuationAllowed") + is_mastered_for_itunes: bool = Field(..., alias="isMasteredForItunes") + isrc: str + artwork: ImageModel + composer_name: str = Field(..., alias="composerName") + audio_locale: str = Field(..., alias="audioLocale") + url: str + play_params: PlayParams = Field(..., alias="playParams") + disc_number: int = Field(..., alias="discNumber") + has_lyrics: bool = Field(..., alias="hasLyrics") + is_apple_digital_master: bool = Field(..., alias="isAppleDigitalMaster") + audio_traits: List[str] = Field(..., alias="audioTraits") + name: str + previews: List[UrlDTO] = Field([]) + artist_name: str = Field(..., alias="artistName") + content_rating: Optional[str] = Field(None, alias="contentRating") + + +class TopSong(BaseModel): + id: Optional[str] = None + type: Optional[str] = None + href: Optional[str] = None + attributes: Optional[AttributeName] = None + data: Optional[List[BaseDataModel[AttributesTopSong]]] = None diff --git a/lib/ShazamIO/shazamio/schemas/artists.py b/lib/ShazamIO/shazamio/schemas/artists.py new file mode 100644 index 0000000..9105998 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/artists.py @@ -0,0 +1,100 @@ +from dataclasses import dataclass +from dataclasses import field +from typing import List +from typing import Optional +from typing import Union + +from pydantic import BaseModel +from pydantic import Field + +from shazamio.schemas.artist.views.full_albums import FullAlbumsModel +from shazamio.schemas.artist.views.last_release import LastReleaseModel +from shazamio.schemas.artist.views.simular_artists import SimularArtist +from shazamio.schemas.artist.views.top_music import TopMusicVideosView +from shazamio.schemas.artist.views.top_song import TopSong +from shazamio.schemas.attributes import ArtistAttribute +from shazamio.schemas.enums import ArtistExtend +from shazamio.schemas.enums import ArtistView +from shazamio.schemas.errors import ErrorModel + + +@dataclass +class ArtistInfo: + name: str + verified: Optional[bool] + genres: Optional[List[str]] = field(default_factory=list) + alias: Optional[str] = None + genres_primary: Optional[str] = None + avatar: Optional[Union[dict, str]] = None + adam_id: Optional[int] = None + url: Optional[str] = "" + + def __post_init__(self): + self.avatar = self.__optional_avatar() + + def __optional_avatar(self) -> Optional[str]: + if self.avatar is None: + return None + elif "default" in self.avatar: + return self.avatar.get("default") + else: + return "".join(self.avatar) + + +@dataclass +class ArtistV2: + artist: ArtistInfo + + +@dataclass +class ArtistQuery: + views: List[ArtistView] = field(default_factory=list) + extend: List[ArtistExtend] = field(default_factory=list) + + +@dataclass +class ArtistAvatar: + width: int + height: int + url: str + + @classmethod + def url_with_size(cls, height: int, width: int) -> str: + return cls.url.format(w=width, h=height) + + +class AlbumRelationshipElement(BaseModel): + id: str + type: str + href: str + + +class AlbumRelationship(BaseModel): + href: str + next: Optional[str] = None + data: List[AlbumRelationshipElement] + + +class ArtistRelationships(BaseModel): + albums: AlbumRelationship + + +class ArtistViews(BaseModel): + top_music_videos: Optional[TopMusicVideosView] = Field(None, alias="top-music-videos") + simular_artists: Optional[SimularArtist] = Field(None, alias="similar-artists") + latest_release: Optional[LastReleaseModel] = Field(None, alias="latest-release") + full_albums: Optional[FullAlbumsModel] = Field(None, alias="full-albums") + top_songs: Optional[TopSong] = Field(None, alias="top-songs") + + +class ArtistV3(BaseModel): + id: str + type: str + attributes: ArtistAttribute + relationships: ArtistRelationships + views: ArtistViews + + +class ArtistResponse(BaseModel): + errors: List[ErrorModel] = [] + data: List[ArtistV3] = [] diff --git a/lib/ShazamIO/shazamio/schemas/attributes.py b/lib/ShazamIO/shazamio/schemas/attributes.py new file mode 100644 index 0000000..c278cc6 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/attributes.py @@ -0,0 +1,16 @@ +from typing import List +from typing import Optional + +from pydantic import BaseModel +from pydantic import Field + + +class AttributeName(BaseModel): + title: str + + +class ArtistAttribute(BaseModel): + genre_names: List[str] = Field([], alias="genreNames") + name: str + url: str + artist_bio: Optional[str] = Field(None, alias="artistBio") diff --git a/lib/ShazamIO/shazamio/schemas/base.py b/lib/ShazamIO/shazamio/schemas/base.py new file mode 100644 index 0000000..7347a2b --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/base.py @@ -0,0 +1,25 @@ +from typing import Generic +from typing import Optional +from typing import TypeVar + +from pydantic import BaseModel +from pydantic.generics import GenericModel + + +T = TypeVar("T", bound=BaseModel) + + +class BaseIdTypeHref(BaseModel): + id: str + type: str + href: str + + +class BaseDataModel(GenericModel, BaseModel, Generic[T]): + attributes: T + + +class BaseHrefNextData(GenericModel, Generic[T]): + href: str + next: Optional[str] = None + data: T diff --git a/lib/ShazamIO/shazamio/schemas/enums.py b/lib/ShazamIO/shazamio/schemas/enums.py new file mode 100644 index 0000000..e73019e --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/enums.py @@ -0,0 +1,18 @@ +from enum import Enum + + +class ArtistExtend(str, Enum): + ARTIST_BIO = "artistBio" + BORN_OF_FORMED = "bornOrFormed" + EDITORIAL_ARTWORK = "editorialArtwork" + ORIGIN = "origin" + + +class ArtistView(str, Enum): + FULL_ALBUMS = "full-albums" + FEATURED_ALBUMS = "featured-albums" + LATEST_RELEASE = "latest-release" + TOP_MUSIC_VIDEOS = "top-music-videos" + SIMULAR_ARTISTS = "similar-artists" + TOP_SONGS = "top-songs" + PLAYLISTS = "playlists" diff --git a/lib/ShazamIO/shazamio/schemas/errors.py b/lib/ShazamIO/shazamio/schemas/errors.py new file mode 100644 index 0000000..9349add --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/errors.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + + +@dataclass +class ErrorModel: + id: str + title: str + detail: str + status: str + code: str diff --git a/lib/ShazamIO/shazamio/schemas/models.py b/lib/ShazamIO/shazamio/schemas/models.py new file mode 100644 index 0000000..af8745b --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/models.py @@ -0,0 +1,201 @@ +from dataclasses import dataclass +from dataclasses import field +from typing import List +from typing import Optional +from typing import Union +from urllib.parse import urlencode +from urllib.parse import urlparse +from urllib.parse import urlunparse +from uuid import UUID + + +@dataclass +class ShareModel: + subject: str + text: str + href: str + image: str + twitter: str + html: str + snapchat: str + + +@dataclass +class ActionModel: + name: str + type: str + share: ShareModel + uri: str + + +@dataclass +class SongMetaPages: + image: str + caption: str + + +@dataclass +class SongMetadata: + title: str + text: str + + +@dataclass +class SongSection: + type: str + meta_pages: List[SongMetaPages] + tab_name: str + metadata: List[SongMetadata] + + +@dataclass +class BaseIdTypeModel: + type: str + id: str + + +@dataclass +class TopTracksModel: + url: str + + +@dataclass +class ArtistSection: + type: str + id: str + name: str + verified: bool + actions: List[BaseIdTypeModel] + tab_name: str + top_tracks: TopTracksModel + + +class BeaconDataLyricsSection: + lyrics_id: str + provider_name: str + common_track_id: str + + +@dataclass +class LyricsSection: + type: str + text: List[str] + footer: str + tab_name: str + beacon_data: Optional[BeaconDataLyricsSection] + + +@dataclass +class VideoSection: + tab_name: str + youtube_url: str + type: str = "VIDEO" + + +@dataclass +class RelatedSection: + type: str + url: str + tab_name: str + + +@dataclass +class DimensionsModel: + width: int + height: int + + +@dataclass +class YoutubeImageModel: + dimensions: DimensionsModel + url: str + + +@dataclass +class MatchModel: + id: str + offset: float + time_skew: float + frequency_skew: float + channel: Optional[str] = field(default=None) + + +@dataclass +class LocationModel: + accuracy: float + + +@dataclass +class YoutubeData: + caption: str + image: YoutubeImageModel + actions: List[ActionModel] + uri: Optional[str] = None + + def __post_init__(self): + self.uri = self.__get_youtube_uri() + + def __get_youtube_uri(self): + if self.actions: + for action in self.actions: + if action.uri: + return action.uri + + +@dataclass +class TrackInfo: + key: int + title: str + subtitle: str + artist_id: Optional[str] = field(default=None) + shazam_url: str = None + photo_url: Optional[str] = field(init=False, default=None) + spotify_uri_query: Optional[str] = None + apple_music_url: Optional[str] = None + ringtone: Optional[str] = None + spotify_url: Optional[str] = field(default=None) + spotify_uri: Optional[str] = field(default=None) + youtube_link: Optional[str] = None + sections: Optional[ + List[ + Union[ + SongSection, + VideoSection, + RelatedSection, + ArtistSection, + LyricsSection, + ] + ] + ] = field(default_factory=list) + + def __post_init__(self): + self.shazam_url = f"https://www.shazam.com/track/{self.artist_id}" + self.apple_music_url = self.__apple_music_url() + self.spotify_uri_query = self.__short_uri() + self.youtube_link = self.__youtube_link() + + def __apple_music_url(self): + url_parse_list = list(urlparse(self.apple_music_url)) + url_parse_list[4] = urlencode({}, doseq=True) + url_deleted_query = urlunparse(url_parse_list) + return url_deleted_query + + def __short_uri(self): + if self.spotify_uri: + return self.spotify_uri.split("spotify:search:")[1] + + def __youtube_link(self): + for i in self.sections: + if type(i) is VideoSection: + return i.youtube_url + + +@dataclass +class ResponseTrack: + tag_id: Optional[UUID] + retry_ms: Optional[int] = field(default=None) + location: Optional[LocationModel] = field(default=None) + matches: List[MatchModel] = field(default_factory=list) + timestamp: Optional[int] = field(default=None) + timezone: Optional[str] = field(default=None) + track: Optional[TrackInfo] = field(default=None) diff --git a/lib/ShazamIO/shazamio/schemas/photos.py b/lib/ShazamIO/shazamio/schemas/photos.py new file mode 100644 index 0000000..94af3cd --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/photos.py @@ -0,0 +1,16 @@ +from typing import Optional + +from pydantic import BaseModel +from pydantic import Field + + +class ImageModel(BaseModel): + width: int + url: str + height: int + text_color3: Optional[str] = Field(None, alias="textColor3") + text_color2: Optional[str] = Field(None, alias="textColor2") + text_color4: Optional[str] = Field(None, alias="textColor4") + text_color1: Optional[str] = Field(None, alias="textColor1") + bg_color: Optional[str] = Field(None, alias="bgColor") + has_p3: bool = Field(..., alias="hasP3") diff --git a/lib/ShazamIO/shazamio/schemas/urls.py b/lib/ShazamIO/shazamio/schemas/urls.py new file mode 100644 index 0000000..ac22508 --- /dev/null +++ b/lib/ShazamIO/shazamio/schemas/urls.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class UrlDTO(BaseModel): + url: str diff --git a/lib/ShazamIO/shazamio/serializers.py b/lib/ShazamIO/shazamio/serializers.py new file mode 100644 index 0000000..094bcd9 --- /dev/null +++ b/lib/ShazamIO/shazamio/serializers.py @@ -0,0 +1,32 @@ +from typing import Union + +from shazamio.factory_misc import FACTORY_ARTIST +from shazamio.factory_misc import FACTORY_TRACK +from shazamio.schemas.artists import ArtistInfo +from shazamio.schemas.artists import ArtistResponse +from shazamio.schemas.artists import ArtistV2 +from shazamio.schemas.models import ResponseTrack +from shazamio.schemas.models import TrackInfo +from shazamio.schemas.models import YoutubeData + + +class Serialize: + @classmethod + def track(cls, data): + return FACTORY_TRACK.load(data, TrackInfo) + + @classmethod + def youtube(cls, data): + return FACTORY_TRACK.load(data, YoutubeData) + + @classmethod + def artist_v2(cls, data) -> ArtistResponse: + return ArtistResponse.parse_obj(data) + + @classmethod + def artist(cls, data): + return FACTORY_ARTIST.load(data, Union[ArtistV2, ArtistInfo]) + + @classmethod + def full_track(cls, data): + return FACTORY_TRACK.load(data, ResponseTrack) diff --git a/lib/ShazamIO/shazamio/signature.py b/lib/ShazamIO/shazamio/signature.py new file mode 100644 index 0000000..225e80d --- /dev/null +++ b/lib/ShazamIO/shazamio/signature.py @@ -0,0 +1,263 @@ +from typing import Dict, List +from base64 import b64decode, b64encode +from math import exp, sqrt +from binascii import crc32 +from io import BytesIO +from ctypes import * +from .enums import FrequencyBand, SampleRate + +DATA_URI_PREFIX = "data:audio/vnd.shazam.sig;base64," + + +class RawSignatureHeader(LittleEndianStructure): + _pack = True + + _fields_ = [ + ("magic1", c_uint32), # Fixed 0xcafe2580 - 80 25 fe ca + ( + "crc32", + c_uint32, + ), # CRC-32 for all following (so excluding these first 8 bytes) + ("size_minus_header", c_uint32), + # Total size of the message, minus the size of the current header (which is 48 bytes) + ("magic2", c_uint32), # Fixed 0x94119c00 - 00 9c 11 94 + ("void1", c_uint32 * 3), # Void + ("shifted_sample_rate_id", c_uint32), + # A member of SampleRate (usually 3 for 16000 Hz), left-shifted by 27 (usually giving + # 0x18000000 - 00 00 00 18) + ("void2", c_uint32 * 2), # Void, or maybe used only in "rolling window" mode? + ("number_samples_plus_divided_sample_rate", c_uint32), + # int(number_of_samples + sample_rate * 0.24) - As the sample rate is known thanks to the + # field above, + # it can be inferred and subtracted so that we obtain the number of samples, + # and from the number of samples and sample rate we can obtain the length of the recording + ("fixed_value", c_uint32) + # Calculated as ((15 << 19) + 0x40000) - 0x7c0000 or 00 00 7c 00 - seems pretty constant, + # may be different in the "SigType.STREAMING" mode + ] + + +class FrequencyPeak: + fft_pass_number: int = None + peak_magnitude: int = None + corrected_peak_frequency_bin: int = None + sample_rate_hz: int = None + + def __init__( + self, + fft_pass_number: int, + peak_magnitude: int, + corrected_peak_frequency_bin: int, + sample_rate_hz: int, + ): + self.fft_pass_number = fft_pass_number + self.peak_magnitude = peak_magnitude + self.corrected_peak_frequency_bin = corrected_peak_frequency_bin + self.sample_rate_hz = sample_rate_hz + + def get_frequency_hz(self) -> float: + return self.corrected_peak_frequency_bin * (self.sample_rate_hz / 2 / 1024 / 64) + + # ^ Convert back FFT bin to a frequency, given a 16 KHz sample + # rate, 1024 useful bins and the multiplication by 64 made before + # storing the information + + def get_amplitude_pcm(self) -> float: + return sqrt(exp((self.peak_magnitude - 6144) / 1477.3) * (1 << 17) / 2) / 1024 + + # ^ Not sure about this calculation but gives small enough numbers + + def get_seconds(self) -> float: + return (self.fft_pass_number * 128) / self.sample_rate_hz + + # ^ Assume that new FFT bins are emitted every 128 samples, on a + # standard 16 KHz sample rate basis. + + +class DecodedMessage: + sample_rate_hz: int = None + number_samples: int = None + + frequency_band_to_sound_peaks: Dict[FrequencyBand, List[FrequencyPeak]] = None + + @classmethod + def decode_from_binary(cls, data: bytes): + self = cls() + + buf = BytesIO(data) + + buf.seek(8) + check_summable_data = buf.read() + buf.seek(0) + + # Read and check the header + + header = RawSignatureHeader() + buf.readinto(header) + + assert header.magic1 == 0xCAFE2580 + assert header.size_minus_header == len(data) - 48 + assert crc32(check_summable_data) & 0xFFFFFFFF == header.crc32 + assert header.magic2 == 0x94119C00 + + self.sample_rate_hz = int(SampleRate(header.shifted_sample_rate_id >> 27).name.strip("_")) + + self.number_samples = int( + header.number_samples_plus_divided_sample_rate - self.sample_rate_hz * 0.24 + ) + + # Read the type-length-value sequence that follows the header + + # The first chunk is fixed and has no value, but instead just repeats + # the length of the message size minus the header: + assert int.from_bytes(buf.read(4), "little") == 0x40000000 + assert int.from_bytes(buf.read(4), "little") == len(data) - 48 + + # Then, lists of frequency peaks for respective bands follow + + self.frequency_band_to_sound_peaks = {} + + while True: + tlv_header = buf.read(8) + if not tlv_header: + break + + frequency_band_id = int.from_bytes(tlv_header[:4], "little") + frequency_peaks_size = int.from_bytes(tlv_header[4:], "little") + + frequency_peaks_padding = -frequency_peaks_size % 4 + + frequency_peaks_buf = BytesIO(buf.read(frequency_peaks_size)) + buf.read(frequency_peaks_padding) + + # Decode frequency peaks + + frequency_band = FrequencyBand(frequency_band_id - 0x60030040) + + fft_pass_number = 0 + + self.frequency_band_to_sound_peaks[frequency_band] = [] + + while True: + raw_fft_pass: bytes = frequency_peaks_buf.read(1) + if not raw_fft_pass: + break + + fft_pass_offset: int = raw_fft_pass[0] + if fft_pass_offset == 0xFF: + fft_pass_number = int.from_bytes(frequency_peaks_buf.read(4), "little") + continue + else: + fft_pass_number += fft_pass_offset + + peak_magnitude = int.from_bytes(frequency_peaks_buf.read(2), "little") + corrected_peak_frequency_bin = int.from_bytes(frequency_peaks_buf.read(2), "little") + + self.frequency_band_to_sound_peaks[frequency_band].append( + FrequencyPeak( + fft_pass_number, + peak_magnitude, + corrected_peak_frequency_bin, + self.sample_rate_hz, + ) + ) + + return self + + @classmethod + def decode_from_uri(cls, uri: str): + assert uri.startswith(DATA_URI_PREFIX) + + return cls.decode_from_binary(b64decode(uri.replace(DATA_URI_PREFIX, "", 1))) + + """ + Encode the current object to a readable JSON format, for debugging + purposes. + """ + + def encode_to_json(self) -> dict: + return { + "sample_rate_hz": self.sample_rate_hz, + "number_samples": self.number_samples, + "_seconds": self.number_samples / self.sample_rate_hz, + "frequency_band_to_peaks": { + frequency_band.name.strip("_"): [ + { + "fft_pass_number": frequency_peak.fft_pass_number, + "peak_magnitude": frequency_peak.peak_magnitude, + "corrected_peak_frequency_bin": frequency_peak.corrected_peak_frequency_bin, + "_frequency_hz": frequency_peak.get_frequency_hz(), + "_amplitude_pcm": frequency_peak.get_amplitude_pcm(), + "_seconds": frequency_peak.get_seconds(), + } + for frequency_peak in frequency_peaks + ] + for frequency_band, frequency_peaks in sorted( + self.frequency_band_to_sound_peaks.items() + ) + }, + } + + def encode_to_binary(self) -> bytes: + header = RawSignatureHeader() + + header.magic1 = 0xCAFE2580 + header.magic2 = 0x94119C00 + header.shifted_sample_rate_id = int(getattr(SampleRate, "_%s" % self.sample_rate_hz)) << 27 + header.fixed_value = (15 << 19) + 0x40000 + header.number_samples_plus_divided_sample_rate = int( + self.number_samples + self.sample_rate_hz * 0.24 + ) + + contents_buf = BytesIO() + + for frequency_band, frequency_peaks in sorted(self.frequency_band_to_sound_peaks.items()): + peaks_buf = BytesIO() + + fft_pass_number = 0 + + # NOTE: Correctly filtering and sorting the peaks within the members + # of "self.frequency_band_to_sound_peaks" is the responsibility of the + # caller + + for frequency_peak in frequency_peaks: + assert frequency_peak.fft_pass_number >= fft_pass_number + + if frequency_peak.fft_pass_number - fft_pass_number >= 255: + peaks_buf.write(b"\xff") + peaks_buf.write(frequency_peak.fft_pass_number.to_bytes(4, "little")) + + fft_pass_number = frequency_peak.fft_pass_number + + peaks_buf.write(bytes([frequency_peak.fft_pass_number - fft_pass_number])) + peaks_buf.write(frequency_peak.peak_magnitude.to_bytes(2, "little")) + peaks_buf.write(frequency_peak.corrected_peak_frequency_bin.to_bytes(2, "little")) + + fft_pass_number = frequency_peak.fft_pass_number + + contents_buf.write((0x60030040 + int(frequency_band)).to_bytes(4, "little")) + contents_buf.write(len(peaks_buf.getvalue()).to_bytes(4, "little")) + contents_buf.write(peaks_buf.getvalue()) + contents_buf.write(b"\x00" * (-len(peaks_buf.getvalue()) % 4)) + + # Below, write the full message as a binary stream + + header.size_minus_header = len(contents_buf.getvalue()) + 8 + + buf = BytesIO() + buf.write(header) # We will rewrite it just after in order to include the final CRC-32 + + buf.write((0x40000000).to_bytes(4, "little")) + buf.write((len(contents_buf.getvalue()) + 8).to_bytes(4, "little")) + + buf.write(contents_buf.getvalue()) + + buf.seek(8) + header.crc32 = crc32(buf.read()) & 0xFFFFFFFF + buf.seek(0) + buf.write(header) + + return buf.getvalue() + + def encode_to_uri(self) -> str: + return DATA_URI_PREFIX + b64encode(self.encode_to_binary()).decode("ascii") diff --git a/lib/ShazamIO/shazamio/typehints.py b/lib/ShazamIO/shazamio/typehints.py new file mode 100644 index 0000000..90cc5bc --- /dev/null +++ b/lib/ShazamIO/shazamio/typehints.py @@ -0,0 +1,10 @@ +class CountryCode: + """ISO 3166-3 alpha-2 code. Example: RU,NL,UA""" + + pass + + +class ShazamResponse: + """Dictionary with found data on request""" + + pass diff --git a/lib/ShazamIO/shazamio/user_agent.py b/lib/ShazamIO/shazamio/user_agent.py new file mode 100644 index 0000000..850ed66 --- /dev/null +++ b/lib/ShazamIO/shazamio/user_agent.py @@ -0,0 +1,102 @@ +USER_AGENTS = [ + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; VS980 4G Build/LRX22G)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-T210 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-P905V Build/LMY47X)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; Vodafone Smart Tab 4G Build/KTU84P)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-G360H Build/KTU84P)", + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; SM-S920L Build/LRX22G)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; Fire Pro Build/LRX21M)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N9005 Build/LRX21V)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G920F Build/MMB29K)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-G7102 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G928F Build/MMB29K)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J500FN Build/LMY48B)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; Coolpad 3320A Build/LMY47V)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-J110F Build/KTU84P)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SAMSUNG-SGH-I747 Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SAMSUNG-SM-T337A Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.3; SGH-T999 Build/JSS15J)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; D6603 Build/23.5.A.0.570)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J700H Build/LMY48B)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; HTC6600LVW Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910G Build/LMY47X)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910T Build/LMY47X)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; C6903 Build/14.4.A.0.157)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G920F Build/MMB29K)", + "Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-I9105P Build/JDQ39)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9192 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G531H Build/LMY48B)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N9005 Build/LRX21V)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; LGMS345 Build/LMY47V)", + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; HTC One Build/LRX22G)", + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; LG-D800 Build/LRX22G)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G531H Build/LMY48B)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N9005 Build/LRX21V)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-T113 Build/KTU84P)", + "Dalvik/1.6.0 (Linux; U; Android 4.2.2; AndyWin Build/JDQ39E)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; Lenovo A7000-a Build/LRX21M)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; LGL16C Build/KOT49I.L16CV11a)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; SM-A700FD Build/LRX22G)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-G130HN Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-N9005 Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.1.2; LG-E975T Build/JZO54K)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; E1 Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-N5100 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-A310F Build/LMY47X)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J105H Build/LMY47V)", + "Dalvik/1.6.0 (Linux; U; Android 4.3; GT-I9305T Build/JSS15J)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; android Build/JDQ39)", + "Dalvik/1.6.0 (Linux; U; Android 4.2.1; HS-U970 Build/JOP40D)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-T561 Build/KTU84P)", + "Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-P3110 Build/JDQ39)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G925T Build/MMB29K)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; HUAWEI Y221-U22 Build/HUAWEIY221-U22)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G530T1 Build/LMY47X)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G920I Build/LMY47X)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; Vodafone Smart ultra 6 Build/LMY47V)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; XT1080 Build/SU6-7.7)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; ASUS MeMO Pad 7 Build/KTU84P)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-G800F Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-N7100 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G925I Build/MMB29K)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; A0001 Build/MMB29X)", + "Dalvik/2.1.0 (Linux; U; Android 5.1; XT1045 Build/LPB23.13-61)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; LGMS330 Build/LMY47V)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; Z970 Build/KTU84P)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-N900P Build/LRX21V)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; T1-701u Build/HuaweiMediaPad)", + "Dalvik/2.1.0 (Linux; U; Android 5.1; HTCD100LVWPP Build/LMY47O)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G935R4 Build/MMB29M)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G930V Build/MMB29M)", + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; ZTE Blade Q Lux Build/LRX22G)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; GT-I9060I Build/KTU84P)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; LGUS992 Build/MMB29M)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G900P Build/MMB29M)", + "Dalvik/1.6.0 (Linux; U; Android 4.1.2; SGH-T999L Build/JZO54K)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910V Build/LMY47X)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-P601 Build/LMY47X)", + "Dalvik/1.6.0 (Linux; U; Android 4.2.2; GT-S7272 Build/JDQ39)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-N910T Build/LMY47X)", + "Dalvik/1.6.0 (Linux; U; Android 4.3; SAMSUNG-SGH-I747 Build/JSS15J)", + "Dalvik/2.1.0 (Linux; U; Android 5.0.2; ZTE Blade Q Lux Build/LRX22G)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-G930F Build/MMB29K)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; HTC_PO582 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 6.0; HUAWEI MT7-TL10 Build/HuaweiMT7-TL10)", + "Dalvik/2.1.0 (Linux; U; Android 6.0; LG-H811 Build/MRA58K)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; SM-N7505 Build/KOT49H)", + "Dalvik/2.1.0 (Linux; U; Android 6.0; LG-H815 Build/MRA58K)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.2; LenovoA3300-HV Build/KOT49H)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SM-G360G Build/KTU84P)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; GT-I9300I Build/KTU84P)", + "Dalvik/2.1.0 (Linux; U; Android 5.0; SM-G900F Build/LRX21T)", + "Dalvik/2.1.0 (Linux; U; Android 6.0.1; SM-J700T Build/MMB29K)", + "Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-J500FN Build/LMY48B)", + "Dalvik/1.6.0 (Linux; U; Android 4.2.2; SM-T217S Build/JDQ39)", + "Dalvik/1.6.0 (Linux; U; Android 4.4.4; SAMSUNG-SM-N900A Build/KTU84P)", +] diff --git a/lib/ShazamIO/shazamio/utils.py b/lib/ShazamIO/shazamio/utils.py new file mode 100644 index 0000000..bfc5c79 --- /dev/null +++ b/lib/ShazamIO/shazamio/utils.py @@ -0,0 +1,70 @@ +import pathlib +from enum import Enum +from io import BytesIO +from typing import Dict +from typing import List +from typing import Optional +from typing import Union + +import aiofiles +import aiohttp +from aiohttp import ContentTypeError +from pydub import AudioSegment + +from shazamio.exceptions import FailedDecodeJson +from shazamio.schemas.artists import ArtistQuery + +SongT = Union[str, pathlib.Path, bytes, bytearray] +FileT = Union[str, pathlib.Path] + + +async def validate_json( + resp: aiohttp.ClientResponse, content_type: str = "application/json" +) -> dict: + try: + return await resp.json(content_type=content_type) + except ContentTypeError as e: + bad_url = str(str(e).split(",")[2]).split("'")[1] + raise FailedDecodeJson(f"Check args, URL is invalid\nURL- {bad_url}") + + +async def get_file_bytes(file: FileT) -> bytes: + async with aiofiles.open(file, mode="rb") as f: + return await f.read() + + +async def get_song(data: SongT) -> Union[AudioSegment]: + if isinstance(data, (str, pathlib.Path)): + song_bytes = await get_file_bytes(file=data) + return AudioSegment.from_file(BytesIO(song_bytes)) + + if isinstance(data, (bytes, bytearray)): + return AudioSegment.from_file(BytesIO(data)) + + if isinstance(data, AudioSegment): + return data + + +class QueryBuilder: + def __init__( + self, + source: List[Union[str, Enum]], + ): + self.source = source + + def to_str(self) -> str: + return ",".join(self.source) + + +class ArtistQueryGenerator: + def __init__( + self, + source: Optional[ArtistQuery] = None, + ): + self.source = source + + def params(self) -> Dict[str, str]: + return { + "extend": QueryBuilder(source=self.source.extend or []).to_str(), + "views": QueryBuilder(source=self.source.views or []).to_str(), + }