Skip to content
This repository was archived by the owner on Sep 25, 2024. It is now read-only.

Account For Various Scenarios Spotify May Throw | Resubmission #7

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ build-backend = "flit_core.buildapi"

[project.scripts]
spotify-aac-downloader = "spotify_aac_downloader.cli:main"

[flake8]
max-line-length = 180
2 changes: 1 addition & 1 deletion spotify_aac_downloader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def main(
logger.debug("Setting up CDM")
downloader.setup_cdm()
logger.debug("Setting up session")
downloader.setup_session()
downloader.initialize_sessions()
if premium_quality and downloader.is_premium == "false":
logger.critical("Cannot download in premium quality with a free account")
return
Expand Down
95 changes: 80 additions & 15 deletions spotify_aac_downloader/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

import datetime
import functools
import glob
import re
import shutil
import subprocess
import time
from http.cookiejar import MozillaCookieJar
from pathlib import Path

import base62
import requests
from mutagen.mp4 import MP4, MP4Cover, MP4FreeForm
from pywidevine import PSSH, Cdm, Device
from pywidevine import PSSH, Cdm, Device, InvalidLicenseMessage
from yt_dlp import YoutubeDL

from .constants import *


class Downloader:
def __init__(
self,
Expand Down Expand Up @@ -57,7 +58,7 @@ def __init__(
self.truncate = None if truncate < 4 else truncate
self.audio_quality = "MP4_256" if premium_quality else "MP4_128"

def setup_session(self) -> None:
def initialize_sessions(self):
cookies = MozillaCookieJar(self.cookies_location)
cookies.load(ignore_discard=True, ignore_expires=True)
self.session = requests.Session()
Expand Down Expand Up @@ -87,6 +88,25 @@ def setup_session(self) -> None:
"authorization": f"Bearer {token}",
}
)
# Create unauthorized basic session to help in some scenarios
self.basic_session = requests.Session()
self.basic_session.headers.update(
{
"app-platform": "WebPlayer",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"accept-language": "en-CA,en-US;q=0.7,en;q=0.3",
"accept-encoding": "gzip, deflate, br",
"dnt": "1",
"connection": "keep-alive",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "cross-site",
"sec-gpc": "1",
"pragma": "no-cache",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0",
"Upgrade-Insecure-Requests": "1",
}
)

def setup_cdm(self) -> None:
self.cdm = Cdm.from_device(Device.load(self.wvd_location))
Expand Down Expand Up @@ -148,24 +168,46 @@ def get_file_id(self, metadata: dict) -> str:
if metadata.get("alternative") is not None:
audio_files = metadata["alternative"][0]["file"]
else:
print("Spotify does not have audio files for this song. This is likely either a region issue, or the song was removed from Spotify")
return None
return next(
i["file_id"] for i in audio_files if i["format"] == self.audio_quality
)

def get_pssh(self, file_id: str) -> str:
return requests.get(
f"https://seektables.scdn.co/seektable/{file_id}.json"
url=f"https://seektables.scdn.co/seektable/{file_id}.json",
timeout=5
).json()["pssh"]

def get_decryption_key(self, pssh: str) -> str:
pssh = PSSH(pssh)
challenge = self.cdm.get_license_challenge(self.cdm_session, pssh)
license = self.session.post(
"https://gue1-spclient.spotify.com/widevine-license/v1/audio/license",
challenge,
).content
self.cdm.parse_license(self.cdm_session, license)
decrypt_attempts = 0
while decrypt_attempts < 3:
if (decrypt_attempts < 2):
seconds_to_sleep = 60 * decrypt_attempts
if seconds_to_sleep > 0:
print("Rate limit possibly hit, waiting {} seconds to retry", seconds_to_sleep)
else:
# Artificially rate limit it to slow it down and avoid frequently hitting the rate limit
seconds_to_sleep = 1
time.sleep(seconds_to_sleep)
else:
# Re-initialize the download sessions as they must have expired
initialize_sessions(self)

challenge = self.cdm.get_license_challenge(self.cdm_session, pssh)
license = self.session.post(
'https://gue1-spclient.spotify.com/widevine-license/v1/audio/license',
challenge,
).content
try:
self.cdm.parse_license(self.cdm_session, license)
# Success, we can continue
break
except InvalidLicenseMessage as license_exception:
print(license_exception)
decrypt_attempts += 1
return next(
i for i in self.cdm.get_keys(self.cdm_session) if i.type == "CONTENT"
).key.hex()
Expand All @@ -176,6 +218,11 @@ def get_stream_url(self, file_id: str) -> str:
+ f"{file_id}?version=10000000&product=9&platform=39&alt=json",
).json()["cdnurl"][0]

def get_artists(self, artist_list: list[dict]) -> str:
if len(artist_list) == 1:
return artist_list[0]["name"]
return ";".join(i["name"] for i in artist_list)

def get_artist(self, artist_list: list[dict]) -> str:
if len(artist_list) == 1:
return artist_list[0]["name"]
Expand Down Expand Up @@ -232,18 +279,36 @@ def get_cover_url(self, metadata: dict) -> str:

def get_tags(self, metadata: dict, lyrics_unsynced: str) -> dict:
album = self.get_album(self.gid_to_uri(metadata["album"]["gid"]))
copyright = ""
try:
copyright = next(
(
i["text"]
for i in album["copyrights"]
if i["type"] == "P" or i["type"] == "C" or i["type"] == "T"
), None
)
except StopIteration:
# There was no copyright value found that was equivalent
print(
"No copyright value found. Full collection checked was: {}",
album["copyrights"],
)
pass
isrc = None
if metadata.get("external_id"):
isrc = next((i for i in metadata["external_id"] if i["type"] == "isrc"), None)
tags = {
"album": metadata["album"]["name"],
"album_artist": self.get_artist(metadata["album"]["artist"]),
# All artists, `;` separated
"artists": self.get_artists(metadata["artist"]),
# All artists, "display"-style
"artist": self.get_artist(metadata["artist"]),
"album_artist": self.get_artist(metadata["album"]["artist"]),

"album": metadata["album"]["name"],
"comment": f'https://open.spotify.com/track/{metadata["canonical_uri"].split(":")[-1]}',
"compilation": True if album["album_type"] == "compilation" else False,
"copyright": next(
(i["text"] for i in album["copyrights"] if i["type"] == "P"), None
),
"copyright": copyright,
"disc": metadata["disc_number"],
"disc_total": album["tracks"]["items"][-1]["disc_number"],
"isrc": isrc.get("id") if isrc is not None else None,
Expand Down