From 42788c6b544184c59483b83527b2b7d982e594e3 Mon Sep 17 00:00:00 2001 From: Mack Ward Date: Wed, 29 Nov 2023 21:12:23 -0800 Subject: [PATCH] [skip ci] Split source code into separate repo --- .envrc | 5 - .flake8 | 3 - .github/workflows/main.yml | 19 +- .gitignore | 113 ----- .gitmodules | 3 - .isort.cfg | 3 - .pyre_configuration | 5 - .watchmanconfig | 1 - CONTRIBUTING.md | 77 +--- README.md | 8 +- TODO.md | 33 -- requirements/requirements-dev.in | 6 - requirements/requirements-dev.txt | 99 ---- requirements/requirements.in | 2 - requirements/requirements.txt | 30 -- src/alias.py | 27 -- src/environment.py | 40 -- src/file_formatter.py | 289 ------------ src/file_manager.py | 143 ------ src/file_updater.py | 342 -------------- src/git_utils.py | 29 -- src/main.py | 56 --- src/plants | 1 - src/playlist_id.py | 12 - src/playlist_types.py | 385 ---------------- src/spotify.py | 371 --------------- src/tests/test_alias.py | 25 - src/tests/test_file_formatter.py | 37 -- src/tests/test_file_updater.py | 438 ------------------ src/tests/test_playlist_id.py | 22 - src/tests/test_playlist_types.py | 540 ---------------------- src/tests/test_spotify.py | 730 ------------------------------ src/tests/test_url.py | 30 -- src/url.py | 28 -- 34 files changed, 22 insertions(+), 3930 deletions(-) delete mode 100644 .envrc delete mode 100644 .flake8 delete mode 100644 .gitignore delete mode 100644 .gitmodules delete mode 100644 .isort.cfg delete mode 100644 .pyre_configuration delete mode 100644 .watchmanconfig delete mode 100644 TODO.md delete mode 100644 requirements/requirements-dev.in delete mode 100644 requirements/requirements-dev.txt delete mode 100644 requirements/requirements.in delete mode 100644 requirements/requirements.txt delete mode 100644 src/alias.py delete mode 100644 src/environment.py delete mode 100644 src/file_formatter.py delete mode 100644 src/file_manager.py delete mode 100644 src/file_updater.py delete mode 100644 src/git_utils.py delete mode 100644 src/main.py delete mode 160000 src/plants delete mode 100644 src/playlist_id.py delete mode 100644 src/playlist_types.py delete mode 100644 src/spotify.py delete mode 100644 src/tests/test_alias.py delete mode 100644 src/tests/test_file_formatter.py delete mode 100644 src/tests/test_file_updater.py delete mode 100644 src/tests/test_playlist_id.py delete mode 100644 src/tests/test_playlist_types.py delete mode 100644 src/tests/test_spotify.py delete mode 100644 src/tests/test_url.py delete mode 100644 src/url.py diff --git a/.envrc b/.envrc deleted file mode 100644 index 0ededbfea29bf..0000000000000 --- a/.envrc +++ /dev/null @@ -1,5 +0,0 @@ -export VIRTUAL_ENV=venv -layout python python3.10 - -export SPOTIFY_CLIENT_ID="$MY__SPOTIFY_PLAYLIST_ARCHIVE__CLIENT_ID" -export SPOTIFY_CLIENT_SECRET="$MY__SPOTIFY_PLAYLIST_ARCHIVE__CLIENT_SECRET" diff --git a/.flake8 b/.flake8 deleted file mode 100644 index 31d572693e76c..0000000000000 --- a/.flake8 +++ /dev/null @@ -1,3 +0,0 @@ -[flake8] -max-complexity = 15 -max-line-length = 88 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 76ee728c8b9fb..8ca675352ca78 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,25 +11,38 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - name: Checkout mackorone/spotify-playlist-archive-src + uses: actions/checkout@v4 with: + repository: mackorone/spotify-playlist-archive-src + path: spotify-playlist-archive-src submodules: recursive + + - name: Checkout mackorone/spotify-playlist-archive + uses: actions/checkout@v4 + with: + repository: mackorone/spotify-playlist-archive + path: spotify-playlist-archive # Also fetch the second-to-last commit so the script can determine, # via git commands, which files changed in the most recent commit fetch-depth: 2 - name: Set up Python - uses: actions/setup-python@v3 + uses: actions/setup-python@v4 with: python-version: "3.10" - name: Install dependencies + working-directory: spotify-playlist-archive-src run: | python -m pip install --upgrade pip pip install -r requirements/requirements.txt - name: Run script + working-directory: spotify-playlist-archive env: SPOTIFY_CLIENT_ID: ${{ secrets.SPOTIFY_CLIENT_ID }} SPOTIFY_CLIENT_SECRET: ${{ secrets.SPOTIFY_CLIENT_SECRET }} - run: python src/main.py --commit-and-push --auto-register + run: > + python $GITHUB_WORKSPACE/spotify-playlist-archive-src/src/main.py + --playlists-dir playlists --commit-and-push --auto-register diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 2182b94d07a72..0000000000000 --- a/.gitignore +++ /dev/null @@ -1,113 +0,0 @@ -# Swap files -*.swp - -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# celery beat schedule file -celerybeat-schedule - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ - -# Pyre type checker -.pyre/ - -# Ignore local testing output -_playlists/ diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 485c7096c4572..0000000000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "plants"] - path = src/plants - url = git@github.com:mackorone/plants.git diff --git a/.isort.cfg b/.isort.cfg deleted file mode 100644 index cf41d2e5da448..0000000000000 --- a/.isort.cfg +++ /dev/null @@ -1,3 +0,0 @@ -[settings] -include_trailing_comma=True -multi_line_output=VERTICAL_HANGING_INDENT diff --git a/.pyre_configuration b/.pyre_configuration deleted file mode 100644 index e29ca17836cc0..0000000000000 --- a/.pyre_configuration +++ /dev/null @@ -1,5 +0,0 @@ -{ - "site_package_search_strategy": "pep561", - "source_directories": ["src/"], - "strict": true -} diff --git a/.watchmanconfig b/.watchmanconfig deleted file mode 100644 index 0967ef424bce6..0000000000000 --- a/.watchmanconfig +++ /dev/null @@ -1 +0,0 @@ -{} diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f590bdcc5cdbf..cf4797806c15f 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -12,78 +12,7 @@ Alternatively, follow these steps: If you don't know what a playlist ID is, you can use [this tool](https://spotifyplaylistarchive.com/get-playlist-id) to extract it from the playlist's URL. -## Development +## Source Code -### Setup - -This project uses [`pip-tools`](https://github.com/jazzband/pip-tools) to manage -dependencies. - -To get started, first create and activate a new virtual environment: -``` -$ python3.8 -m venv venv -$ source venv/bin/activate -``` - -Then upgrade `pip` and install `pip-tools`: -``` -$ pip install --upgrade pip -$ pip install pip-tools -``` - -Lastly, use `pip-sync` to install the dev requirements: -``` -$ pip-sync requirements/requirements-dev.txt -``` - -### Formatting - -This project uses [`isort`](https://github.com/pycqa/isort) and -[`black`](https://github.com/psf/black) to automatically format the source code. -You should invoke both of them, in that order, before submitting pull requests: -``` -$ isort src/ -$ black src/ -``` - -### Linting - -This project uses [`flake8`](https://github.com/pycqa/flake8) for linting, a -basic form of static analysis. You can use `flake8` to check for errors and bad -code style: -``` -$ flake8 src/ -``` - -### Type Checking - -This project uses [`pyre`](https://github.com/facebook/pyre-check) to check for -type errors. You can invoke it from anywhere in the repository as follows: -``` -$ pyre -``` - -Note that Pyre depends on [`watchman`](https://github.com/facebook/watchman), a -file watching service, for incremental type checking. It takes a few minutes to -install, but it's worth the investment - it makes type checking almost -instantaneous. - -### Unit Testing - -After making changes, you should update unit tests and run them as follows: -``` -$ cd src -$ python -m unittest tests/\*.py -``` - -### Integration Testing - -Copy the `playlists` directory to `_playlists`: -``` -$ cp -r playlists _playlists -``` - -Then run the script: -``` -$ src/main.py -``` +The source code for this project lives here: +[mackorone/spotify-playlist-archive-src](https://github.com/mackorone/spotify-playlist-archive-src) diff --git a/README.md b/README.md index 7fbebbe9b2c52..633e5c829a28a 100644 --- a/README.md +++ b/README.md @@ -44,12 +44,10 @@ To recreate an old version of a playlist: ## How it works -This repository contains a script for scraping Spotify playlists and publishing -them back to the repo. The script is run daily via +This repository uses a +[script](https://github.com/mackorone/spotify-playlist-archive-src) +to scrape Spotify playlists and publish them back to the repo. The script is run daily via [GitHub Actions](https://github.com/mackorone/spotify-playlist-archive/actions/workflows/main.yml). -It's also run after every commit, which means that playlists get regenerated -whenever the scraping or formatting logic changes, or when new playlists are -added. The script determines which playlists to scrape by looking at the file names in `playlists/registry`. Files get regenerated as follows: a pretty version of each diff --git a/TODO.md b/TODO.md deleted file mode 100644 index 0b0d249f54d79..0000000000000 --- a/TODO.md +++ /dev/null @@ -1,33 +0,0 @@ -- Cumulative playlist improvements - - Make date first scraped more prominent, add it to pretty playlists - - Clean up near-duplicates in cumulative playlists - - Key: title, artists, and duration - - Check that this doesn't have false positives - - Album can (and often does) differ: - - https://open.spotify.com/track/2nG54Y4a3sH9YpfxMolOyi - - https://open.spotify.com/track/2z0IupRlVRlDN5r2IVqHyN - - Update data for all tracks, not just current tracks - - How to handle tracks that are removed from Spotify - -- Features - - Automatically add alias for empty playlist names (best effort) - - Debug perf issues: https://github.com/github/git-sizer - - Add intermediate directories to playlist dirs - - (Maybe) delete old cumulative playlist blobs - - https://next.github.com/projects/flat-data - - Automatically generate aliases for personalized playlists - -- Codebase - - More unit tests for playlist updater - - only_fetch_these_playlists - - Fix code complexity of playlist updater - - Merge cumulative regeneration code - - Create a separate class for SpotifyPlaylist - - No concept of "original" vs "unique" name - - Consider using library for serialization - - Measure code coverage and add missing tests - - .coveragerc file should include: - [report] - exclude_lines = @external - - Reuse scraping code from mackorone/spotify-playlist-publisher - - Play around with Spotipy: https://github.com/spotipy-dev/spotipy diff --git a/requirements/requirements-dev.in b/requirements/requirements-dev.in deleted file mode 100644 index 4caae34ba90d5..0000000000000 --- a/requirements/requirements-dev.in +++ /dev/null @@ -1,6 +0,0 @@ --r requirements.in - -black -flake8 -isort -pyre-check diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt deleted file mode 100644 index 1c1e117c36863..0000000000000 --- a/requirements/requirements-dev.txt +++ /dev/null @@ -1,99 +0,0 @@ -# -# This file is autogenerated by pip-compile with python 3.10 -# To update, run: -# -# pip-compile requirements/requirements-dev.in -# -aiohttp==3.8.5 - # via -r requirements/requirements.in -aiosignal==1.3.1 - # via aiohttp -async-timeout==4.0.2 - # via aiohttp -attrs==23.1.0 - # via aiohttp -black==23.7.0 - # via -r requirements/requirements-dev.in -brotli==1.1.0 - # via -r requirements/requirements.in -charset-normalizer==3.2.0 - # via aiohttp -click==8.1.6 - # via - # black - # pyre-check -dataclasses-json==0.5.13 - # via pyre-check -flake8==6.0.0 - # via -r requirements/requirements-dev.in -frozenlist==1.4.0 - # via - # aiohttp - # aiosignal -idna==3.4 - # via yarl -intervaltree==3.1.0 - # via pyre-check -isort==5.12.0 - # via -r requirements/requirements-dev.in -libcst==1.0.1 - # via pyre-check -marshmallow==3.20.1 - # via dataclasses-json -mccabe==0.7.0 - # via flake8 -multidict==6.0.4 - # via - # aiohttp - # yarl -mypy-extensions==1.0.0 - # via - # black - # typing-inspect -packaging==23.1 - # via - # black - # marshmallow -pathspec==0.11.1 - # via black -platformdirs==3.9.1 - # via black -psutil==5.9.5 - # via - # pyre-check - # testslide -pycodestyle==2.10.0 - # via flake8 -pyflakes==3.0.1 - # via flake8 -pygments==2.15.1 - # via testslide -pyre-check==0.9.18 - # via -r requirements/requirements-dev.in -pyre-extensions==0.0.30 - # via pyre-check -pyyaml==6.0.1 - # via libcst -sortedcontainers==2.4.0 - # via intervaltree -tabulate==0.9.0 - # via pyre-check -testslide==2.7.1 - # via pyre-check -tomli==2.0.1 - # via black -typeguard==2.13.3 - # via testslide -typing-extensions==4.7.1 - # via - # libcst - # pyre-check - # pyre-extensions - # typing-inspect -typing-inspect==0.9.0 - # via - # dataclasses-json - # libcst - # pyre-extensions -yarl==1.9.2 - # via aiohttp diff --git a/requirements/requirements.in b/requirements/requirements.in deleted file mode 100644 index 89bb017baa20a..0000000000000 --- a/requirements/requirements.in +++ /dev/null @@ -1,2 +0,0 @@ -aiohttp -brotli diff --git a/requirements/requirements.txt b/requirements/requirements.txt deleted file mode 100644 index bb8bffa8070f7..0000000000000 --- a/requirements/requirements.txt +++ /dev/null @@ -1,30 +0,0 @@ -# -# This file is autogenerated by pip-compile with python 3.10 -# To update, run: -# -# pip-compile requirements/requirements.in -# -aiohttp==3.8.5 - # via -r requirements/requirements.in -aiosignal==1.3.1 - # via aiohttp -async-timeout==4.0.2 - # via aiohttp -attrs==23.1.0 - # via aiohttp -brotli==1.1.0 - # via -r requirements/requirements.in -charset-normalizer==3.2.0 - # via aiohttp -frozenlist==1.4.0 - # via - # aiohttp - # aiosignal -idna==3.4 - # via yarl -multidict==6.0.4 - # via - # aiohttp - # yarl -yarl==1.9.2 - # via aiohttp diff --git a/src/alias.py b/src/alias.py deleted file mode 100644 index f20d0da148167..0000000000000 --- a/src/alias.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python3 - -import string - - -class InvalidAliasError(Exception): - pass - - -class Alias(str): - def __new__(cls, alias: str) -> str: - if ( - not alias - or cls._contains_invalid_whitespace(alias) - or cls._contains_enclosing_whitespace(alias) - ): - raise InvalidAliasError(alias) - return super().__new__(cls, alias) - - @classmethod - def _contains_invalid_whitespace(cls, candidate: str) -> bool: - invalid_chars = set(string.whitespace) - set(" \t") - return bool(invalid_chars & set(candidate)) - - @classmethod - def _contains_enclosing_whitespace(cls, candidate: str) -> bool: - return candidate.strip() != candidate diff --git a/src/environment.py b/src/environment.py deleted file mode 100644 index 1c16e0e7927d1..0000000000000 --- a/src/environment.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python3 - -import os -import pathlib - -from plants.environment import Environment as PlantsEnvironment -from plants.external import external - -# Subclassing pathlib.Path is difficult because the concrete type depends on -# platform, which is determined at runtime. This second line determines which -# type to subclass while the first line tricks Pyre into (correctly) thinking -# that ConcretePathType is valid type. -ConcretePathType = pathlib.Path -ConcretePathType = type(pathlib.Path()) - - -class RelativePath(ConcretePathType): - def __new__(cls, path: pathlib.Path) -> pathlib.Path: - return super().__new__(cls, os.path.relpath(path)) - - -class Environment: - @classmethod - def get_prod_playlists_dir(cls) -> RelativePath: - return RelativePath(cls._get_repo_dir() / "playlists") - - @classmethod - def get_test_playlists_dir(cls) -> RelativePath: - dir_name = "_playlists/" - repo_dir = cls._get_repo_dir() - with open(repo_dir / ".gitignore") as f: - assert dir_name in f.read().splitlines() - return RelativePath(repo_dir / dir_name) - - @classmethod - @external - def _get_repo_dir(cls) -> pathlib.Path: - repo_dir = PlantsEnvironment.get_repo_root() - assert repo_dir.name == "spotify-playlist-archive" - return repo_dir diff --git a/src/file_formatter.py b/src/file_formatter.py deleted file mode 100644 index 820b23204e374..0000000000000 --- a/src/file_formatter.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python3 - -import dataclasses -import datetime -import json -from typing import List, Mapping, Tuple - -from plants.markdown import MarkdownEscapedString -from playlist_id import PlaylistID -from playlist_types import CumulativePlaylist, Playlist, Track -from url import URL - - -class Formatter: - TRACK_NO = "No." - TITLE = "Title" - ARTISTS = "Artist(s)" - ALBUM = "Album" - LENGTH = "Length" - ADDED = "Added" - REMOVED = "Removed" - - ARTIST_SEPARATOR = ", " - - @classmethod - def readme(cls, prev_content: str, playlists: Mapping[PlaylistID, Playlist]) -> str: - old_lines = prev_content.splitlines() - prefix = "## Playlists" - index = next(i for i, line in enumerate(old_lines) if line.startswith(prefix)) - header = prefix + MarkdownEscapedString(f" ({len(playlists)})") - - playlist_tuples: List[Tuple[str, str]] = [] - for playlist_id, playlist in playlists.items(): - name_stripped = playlist.unique_name.strip() - text = MarkdownEscapedString(name_stripped) - link = cls._link(text, URL.pretty(playlist_id)) - playlist_tuples.append((name_stripped, f"- {link}")) - playlist_lines = [text for key, text in sorted(playlist_tuples)] - - new_lines = old_lines[:index] + [header, ""] + playlist_lines - return "\n".join(new_lines) + "\n" - - @classmethod - def metadata_full_json(cls, playlists: Mapping[PlaylistID, Playlist]) -> str: - data = {} - for playlist_id, playlist in playlists.items(): - playlist_dict = dataclasses.asdict(playlist) - del playlist_dict["tracks"] - data[playlist_id] = playlist_dict - return json.dumps(data, indent=2, sort_keys=True) - - @classmethod - def metadata_compact_json(cls, playlists: Mapping[PlaylistID, Playlist]) -> str: - data = {} - for playlist_id, playlist in playlists.items(): - data[playlist_id] = playlist.unique_name - return json.dumps(data, separators=(",", ":"), sort_keys=True) - - @classmethod - def plain(cls, playlist_id: PlaylistID, playlist: Playlist) -> str: - lines = [cls._plain_line_from_track(track) for track in playlist.tracks] - # Sort alphabetically to minimize changes when tracks are reordered - sorted_lines = sorted(lines, key=lambda line: line.lower()) - header = [playlist.unique_name, playlist.description, ""] - return "\n".join(header + sorted_lines) + "\n" - - @classmethod - def pretty(cls, playlist_id: PlaylistID, playlist: Playlist) -> str: - columns = [ - cls.TRACK_NO, - cls.TITLE, - cls.ARTISTS, - cls.ALBUM, - cls.LENGTH, - ] - - vertical_separators = ["|"] * (len(columns) + 1) - line_template = " {} ".join(vertical_separators) - divider_line = "---".join(vertical_separators) - lines = cls._markdown_header_lines( - playlist_name=playlist.unique_name, - playlist_url=playlist.url, - playlist_id=playlist_id, - playlist_description=playlist.description, - is_cumulative=False, - ) - num_likes = playlist.num_followers - if num_likes is None: - num_likes_string = "??? likes" - else: - num_likes_string = f"{num_likes:,} like" + ("s" if num_likes > 1 else "") - num_songs = len(playlist.tracks) - lines += [ - "{} - {} - {} - {}".format( - cls._link( - MarkdownEscapedString(playlist.owner.name), playlist.owner.url - ), - num_likes_string, - f"{num_songs:,} song" + ("s" if num_songs > 1 else ""), - cls._format_duration_english( - sum(track.duration_ms for track in playlist.tracks) - ), - ), - "", - line_template.format(*columns), - divider_line, - ] - - for i, track in enumerate(playlist.tracks): - lines.append( - line_template.format( - i + 1, - cls._link(MarkdownEscapedString(track.name), track.url), - cls.ARTIST_SEPARATOR.join( - [ - cls._link(MarkdownEscapedString(artist.name), artist.url) - for artist in track.artists - ] - ), - cls._link(MarkdownEscapedString(track.album.name), track.album.url), - cls._format_duration(track.duration_ms), - ) - ) - - lines.append("") - lines.append(f"Snapshot ID: `{playlist.snapshot_id}`") - - return "\n".join(lines) + "\n" - - @classmethod - def cumulative( - cls, - playlist_id: PlaylistID, - playlist: CumulativePlaylist, - ) -> str: - columns = [ - cls.TITLE, - cls.ARTISTS, - cls.ALBUM, - cls.LENGTH, - cls.ADDED, - cls.REMOVED, - ] - - vertical_separators = ["|"] * (len(columns) + 1) - line_template = " {} ".join(vertical_separators) - divider_line = "---".join(vertical_separators) - lines = cls._markdown_header_lines( - playlist_name=playlist.name, - playlist_url=playlist.url, - playlist_id=playlist_id, - playlist_description=playlist.description, - is_cumulative=True, - ) - - num_songs = len(playlist.tracks) - info_line = "{} - {}".format( - f"{num_songs:,} song" + ("s" if num_songs > 1 else ""), - cls._format_duration_english( - sum(track.duration_ms for track in playlist.tracks) - ), - ) - - lines += [ - info_line, - "", - line_template.format(*columns), - divider_line, - ] - - for i, track in enumerate(playlist.tracks): - date_added = str(track.date_added) - if track.date_added_asterisk: - date_added += "\\*" - lines.append( - line_template.format( - # Title - cls._link(MarkdownEscapedString(track.name), track.url), - # Artists - cls.ARTIST_SEPARATOR.join( - [ - cls._link(MarkdownEscapedString(artist.name), artist.url) - for artist in track.artists - ] - ), - # Album - cls._link(MarkdownEscapedString(track.album.name), track.album.url), - # Length - cls._format_duration(track.duration_ms), - # Added - date_added, - # Removed - track.date_removed or "", - ) - ) - - lines.append("") - lines.append( - f"\\*This playlist was first scraped on {playlist.date_first_scraped}. " - "Prior content cannot be recovered." - ) - - return "\n".join(lines) + "\n" - - @classmethod - def _markdown_header_lines( - cls, - playlist_name: str, - playlist_url: str, - playlist_id: PlaylistID, - playlist_description: str, - is_cumulative: bool, - ) -> List[str]: - if is_cumulative: - pretty = cls._link(MarkdownEscapedString("pretty"), URL.pretty(playlist_id)) - cumulative = "cumulative" - else: - pretty = "pretty" - cumulative = cls._link( - MarkdownEscapedString("cumulative"), URL.cumulative(playlist_id) - ) - - return [ - "{} - {} - {} - {}".format( - pretty, - cumulative, - cls._link(MarkdownEscapedString("plain"), URL.plain(playlist_id)), - cls._link( - MarkdownEscapedString("githistory"), URL.plain_history(playlist_id) - ), - ), - "", - "### {}".format( - cls._link(MarkdownEscapedString(playlist_name), playlist_url) - ), - "", - # Some descriptions end with newlines, strip to clean them up - "> {}".format(MarkdownEscapedString(playlist_description.strip())), - "", - ] - - @classmethod - def _plain_line_from_track(cls, track: Track) -> str: - return "{} -- {} -- {}".format( - track.name, - cls.ARTIST_SEPARATOR.join([artist.name for artist in track.artists]), - track.album.name, - ) - - @classmethod - def _link(cls, text: MarkdownEscapedString, url: str) -> str: - if not url: - return text - return f"[{text}]({url})" - - @classmethod - def _format_duration(cls, duration_ms: int) -> str: - seconds = int(duration_ms // 1000) - timedelta = str(datetime.timedelta(seconds=seconds)) - - index = 0 - # Strip leading zeros but always include the minutes digit - while index < len(timedelta) - 4 and timedelta[index] in "0:": - index += 1 - - return timedelta[index:] - - @classmethod - def _format_duration_english(cls, duration_ms: int) -> str: - second_ms = 1000 - minute_ms = 60 * second_ms - hour_ms = 60 * minute_ms - day_ms = 24 * hour_ms - - days = duration_ms // day_ms - duration_ms -= days * day_ms - hours = duration_ms // hour_ms - duration_ms -= hours * hour_ms - minutes = duration_ms // minute_ms - duration_ms -= minutes * minute_ms - seconds = duration_ms // second_ms - - if not (days or hours or minutes): - return f"{seconds} sec" - if not (days or hours): - return f"{minutes} min {seconds} sec" - if not days: - return f"{hours} hr {minutes} min" - return f"{days:,} day {hours} hr {minutes} min" diff --git a/src/file_manager.py b/src/file_manager.py deleted file mode 100644 index 2602e0515aa0f..0000000000000 --- a/src/file_manager.py +++ /dev/null @@ -1,143 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import pathlib -from typing import AbstractSet, Dict, Optional, Set - -from alias import Alias, InvalidAliasError -from playlist_id import PlaylistID - -logger: logging.Logger = logging.getLogger(__name__) - - -class MalformedAliasError(Exception): - pass - - -class UnexpectedFilesError(Exception): - pass - - -class FileManager: - def __init__(self, playlists_dir: pathlib.Path) -> None: - self._playlists_dir = playlists_dir - - def ensure_subdirs_exist(self) -> None: - for directory in [ - self._get_registry_dir(), - self._get_plain_dir(), - self._get_pretty_dir(), - self._get_cumulative_dir(), - self._get_metadata_dir(), - ]: - directory.mkdir(parents=True, exist_ok=True) - - def ensure_registered(self, playlist_ids: AbstractSet[PlaylistID]) -> None: - registry_dir = self._get_registry_dir() - for playlist_id in sorted(playlist_ids): - path = registry_dir / playlist_id - if not path.exists(): - logger.info(f"Registering playlist: {playlist_id}") - path.touch() - - def fixup_aliases(self) -> None: - # GitHub makes it easy to create files that look empty but actually - # contain a single newline. Normalize them to simplify other logic. - for path in sorted(self._get_registry_dir().iterdir()): - with open(path, "r") as f: - content = f.read() - if content == "\n": - logger.info(f"Truncating empty alias: {path.name}") - with open(path, "w"): - pass - - def get_registered_playlists(self) -> Dict[PlaylistID, Optional[Alias]]: - playlists: Dict[PlaylistID, Optional[Alias]] = {} - for path in sorted(self._get_registry_dir().iterdir()): - playlist_id = PlaylistID(path.name) - with open(path, "r") as f: - lines = f.read().splitlines() - if lines: - if len(lines) != 1: - raise MalformedAliasError(f"Malformed alias: {playlist_id}") - try: - alias = Alias(lines[0]) - except InvalidAliasError: - raise MalformedAliasError(f"Malformed alias: {playlist_id}") - else: - alias = None - playlists[playlist_id] = alias - return playlists - - def ensure_no_unexpected_files(self) -> None: - unexpected_files: Set[pathlib.Path] = set() - playlist_ids = set(path.name for path in self._get_registry_dir().iterdir()) - for directory, suffixes in [ - (self._get_plain_dir(), [""]), - (self._get_pretty_dir(), [".md", ".json"]), - (self._get_cumulative_dir(), [".md", ".json"]), - ]: - for path in directory.iterdir(): - if not any( - path.name.endswith(suffix) - and self._remove_suffix(path.name, suffix) in playlist_ids - for suffix in suffixes - ): - unexpected_files.add(path) - if unexpected_files: - raise UnexpectedFilesError(f"Unexpected files: {unexpected_files}") - - def get_plain_path(self, playlist_id: PlaylistID) -> pathlib.Path: - return self._get_plain_dir() / playlist_id - - def get_pretty_json_path(self, playlist_id: PlaylistID) -> pathlib.Path: - return self._get_pretty_dir() / f"{playlist_id}.json" - - def get_pretty_markdown_path(self, playlist_id: PlaylistID) -> pathlib.Path: - return self._get_pretty_dir() / f"{playlist_id}.md" - - def get_cumulative_json_path(self, playlist_id: PlaylistID) -> pathlib.Path: - return self._get_cumulative_dir() / f"{playlist_id}.json" - - def get_cumulative_markdown_path(self, playlist_id: PlaylistID) -> pathlib.Path: - return self._get_cumulative_dir() / f"{playlist_id}.md" - - def get_old_metadata_json_path(self) -> pathlib.Path: - return self._playlists_dir / "metadata.json" - - def get_metadata_full_json_path(self) -> pathlib.Path: - return self._get_metadata_dir() / "metadata-full.json" - - def get_metadata_compact_json_path(self) -> pathlib.Path: - return self._get_metadata_dir() / "metadata-compact.json" - - def get_metadata_full_json_br_path(self) -> pathlib.Path: - return self._get_metadata_dir() / "metadata-full.json.br" - - def get_metadata_compact_json_br_path(self) -> pathlib.Path: - return self._get_metadata_dir() / "metadata-compact.json.br" - - def get_readme_path(self) -> pathlib.Path: - return self._playlists_dir.parent / "README.md" - - def _get_registry_dir(self) -> pathlib.Path: - return self._playlists_dir / "registry" - - def _get_plain_dir(self) -> pathlib.Path: - return self._playlists_dir / "plain" - - def _get_pretty_dir(self) -> pathlib.Path: - return self._playlists_dir / "pretty" - - def _get_cumulative_dir(self) -> pathlib.Path: - return self._playlists_dir / "cumulative" - - def _get_metadata_dir(self) -> pathlib.Path: - return self._playlists_dir / "metadata" - - @classmethod - def _remove_suffix(cls, string: str, suffix: str) -> str: - if not suffix: - return string - assert string.endswith(suffix) - return string[: -len(suffix)] diff --git a/src/file_updater.py b/src/file_updater.py deleted file mode 100644 index 942eab23b005e..0000000000000 --- a/src/file_updater.py +++ /dev/null @@ -1,342 +0,0 @@ -#!/usr/bin/env python3 - -import collections -import datetime -import logging -import pathlib -from typing import Dict, Optional, Set, TypeVar - -import brotli - -from file_formatter import Formatter -from file_manager import FileManager -from git_utils import GitUtils -from plants.environment import Environment -from playlist_id import PlaylistID -from playlist_types import CumulativePlaylist, Playlist -from spotify import FailedRequestError, Spotify - -logger: logging.Logger = logging.getLogger(__name__) - - -T = TypeVar("T", str, bytes) - - -class FileUpdater: - @classmethod - async def update_files( - cls, - now: datetime.datetime, - file_manager: FileManager, - auto_register: bool, - update_readme: bool, - ) -> None: - # Check nonempty to fail fast - client_id = Environment.get_env("SPOTIFY_CLIENT_ID") - client_secret = Environment.get_env("SPOTIFY_CLIENT_SECRET") - assert client_id and client_secret - - # Initialize the Spotify client - access_token = await Spotify.get_access_token( - client_id=client_id, client_secret=client_secret - ) - spotify = Spotify(access_token) - try: - await cls._update_files_impl( - now=now, - file_manager=file_manager, - auto_register=auto_register, - update_readme=update_readme, - spotify=spotify, - ) - finally: - await spotify.shutdown() - - @classmethod - async def _update_files_impl( - cls, - now: datetime.datetime, - file_manager: FileManager, - auto_register: bool, - update_readme: bool, - spotify: Spotify, - ) -> None: - # Ensure the output directories exist - file_manager.ensure_subdirs_exist() - - # Optimization: if the last commit only touched the registry, only the - # touched playlists will generate downstream changes, so only fetch - # those playlists. This makes adding new playlists cheap. - logger.info("Checking if last commit was registry-only") - only_fetch_these_playlists: Optional[Set[PlaylistID]] = None - uncommitted_changes = GitUtils.any_uncommitted_changes() - last_commit_content = GitUtils.get_last_commit_content() - # To prevent suprising behavior when testing locally, don't perform the - # optimization if there are local changes - if (not uncommitted_changes) and all( - path.startswith("playlists/registry") for path in last_commit_content - ): - only_fetch_these_playlists = { - PlaylistID(pathlib.Path(path).name) for path in last_commit_content - } - logger.info(f"Only fetch these playlists: {only_fetch_these_playlists}") - - # Automatically register select playlists - if auto_register and not only_fetch_these_playlists: - try: - await cls._auto_register(spotify, file_manager) - except Exception: - logger.exception("Failed to auto-register playlists") - - file_manager.fixup_aliases() - registered_playlists = file_manager.get_registered_playlists() - - # Read existing playlist data, useful if Spotify fetch fails - playlists: Dict[PlaylistID, Playlist] = {} - for playlist_id in sorted(registered_playlists): - path = file_manager.get_pretty_json_path(playlist_id) - prev_content = cls._get_file_content_or_empty_string(path) - if prev_content: - playlists[playlist_id] = Playlist.from_json(prev_content) - - # Update playlist data from Spotify - playlists_to_fetch = sorted(only_fetch_these_playlists or registered_playlists) - logger.info(f"Fetching {len(playlists_to_fetch)} playlist(s)...") - for i, playlist_id in enumerate(sorted(playlists_to_fetch)): - denominator = str(len(playlists_to_fetch)) - numerator = str(i).rjust(len(denominator)) - progress_fraction = i / len(playlists_to_fetch) - progress_percent = f"{progress_fraction:.1%}".rjust(5) - logger.info( - f"({numerator} / {denominator} - {progress_percent}) {playlist_id}" - ) - try: - playlists[playlist_id] = await spotify.get_playlist( - playlist_id, alias=registered_playlists[playlist_id] - ) - # When playlists are deleted, the Spotify API returns 404; skip - # those playlists (no updates) but retain them in the archive - except FailedRequestError: - logger.warning(f"Failed to fetch playlist: {playlist_id}") - logger.info("Done fetching playlists") - - # Gracefully handle playlists with the same name - original_playlist_names_to_ids = collections.defaultdict(set) - for playlist_id, playlist in playlists.items(): - original_playlist_names_to_ids[playlist.original_name].add(playlist_id) - duplicate_names: Dict[str, Set[PlaylistID]] = { - name: playlist_ids - for name, playlist_ids in original_playlist_names_to_ids.items() - if len(playlist_ids) > 1 - } - if duplicate_names: - logger.info("Handling duplicate names") - for original_name, playlist_ids in sorted(duplicate_names.items()): - sorted_by_num_followers = sorted( - playlist_ids, - # Sort by num_followers desc, playlist_id asc - key=lambda playlist_id: ( - -1 * (playlists[playlist_id].num_followers or 0), - playlist_id, - ), - ) - for i, playlist_id in enumerate(sorted_by_num_followers): - if i == 0: - logger.info(f" {playlist_id}: {original_name}") - continue - suffix = 2 - unique_name = f"{original_name} ({suffix})" - while any( - other_playlist_id != playlist_id - and other_playlist.unique_name == unique_name - for other_playlist_id, other_playlist in playlists.items() - ): - suffix += 1 - unique_name = f"{original_name} ({suffix})" - logger.info(f" {playlist_id}: {unique_name}") - playlist = playlists[playlist_id] - playlists[playlist_id] = Playlist( - url=playlist.url, - original_name=original_name, - unique_name=unique_name, - description=playlist.description, - tracks=playlist.tracks, - snapshot_id=playlist.snapshot_id, - num_followers=playlist.num_followers, - owner=playlist.owner, - ) - - # If we only fetched certain playlists, we only need to update those - # playlists along with any playlists that share the same name (their - # unique names may have changed) - if only_fetch_these_playlists: - possibly_affected_playlists = only_fetch_these_playlists - for original_name, playlist_ids in duplicate_names.items(): - # If any intersection, include all playlists - if only_fetch_these_playlists & playlist_ids: - possibly_affected_playlists |= playlist_ids - playlists_to_update = { - playlist_id: playlist - for playlist_id, playlist in playlists.items() - if playlist_id in possibly_affected_playlists - } - else: - playlists_to_update = playlists - - # Process the playlists - logger.info(f"Updating {len(playlists_to_update)} playlist(s)...") - for playlist_id, playlist in sorted(playlists_to_update.items()): - logger.info(f"Playlist ID: {playlist_id}") - logger.info(f"Playlist name: {playlist.unique_name}") - - plain_path = file_manager.get_plain_path(playlist_id) - pretty_json_path = file_manager.get_pretty_json_path(playlist_id) - pretty_md_path = file_manager.get_pretty_markdown_path(playlist_id) - cumulative_json_path = file_manager.get_cumulative_json_path(playlist_id) - cumulative_md_path = file_manager.get_cumulative_markdown_path(playlist_id) - - # Update plain playlist - prev_content = cls._get_file_content_or_empty_string(plain_path) - content = Formatter.plain(playlist_id, playlist) - cls._write_to_file_if_content_changed( - prev_content=prev_content, - content=content, - path=plain_path, - ) - - # Update pretty JSON - prev_content = cls._get_file_content_or_empty_string(pretty_json_path) - cls._write_to_file_if_content_changed( - prev_content=prev_content, - content=playlist.to_json() + "\n", - path=pretty_json_path, - ) - - # Update pretty markdown - prev_content = cls._get_file_content_or_empty_string(pretty_md_path) - content = Formatter.pretty(playlist_id, playlist) - cls._write_to_file_if_content_changed( - prev_content=prev_content, - content=content, - path=pretty_md_path, - ) - - # Update cumulative JSON - today = now.date() - prev_content = cls._get_file_content_or_empty_string(cumulative_json_path) - if prev_content: - prev_struct = CumulativePlaylist.from_json(prev_content) - else: - prev_struct = CumulativePlaylist( - url="", - name="", - description="", - tracks=[], - date_first_scraped=today, - ) - new_struct = prev_struct.update(today, playlist) - cls._write_to_file_if_content_changed( - prev_content=prev_content, - content=new_struct.to_json() + "\n", - path=cumulative_json_path, - ) - - # Update cumulative markdown - prev_content = cls._get_file_content_or_empty_string(cumulative_md_path) - content = Formatter.cumulative(playlist_id, new_struct) - cls._write_to_file_if_content_changed( - prev_content=prev_content, - content=content, - path=cumulative_md_path, - ) - - # Check for unexpected files in playlist directories - file_manager.ensure_no_unexpected_files() - - # Update all metadata files - metadata_full_json = Formatter.metadata_full_json(playlists) - metadata_compact_json = Formatter.metadata_compact_json(playlists) - cls._maybe_update_file( - path=file_manager.get_old_metadata_json_path(), - content=metadata_full_json + "\n", - ) - cls._maybe_update_file( - path=file_manager.get_metadata_full_json_path(), - content=metadata_full_json + "\n", - ) - cls._maybe_update_file( - path=file_manager.get_metadata_compact_json_path(), - content=metadata_compact_json + "\n", - ) - cls._maybe_update_file( - path=file_manager.get_metadata_full_json_br_path(), - content=brotli.compress(metadata_full_json.encode()), - ) - cls._maybe_update_file( - path=file_manager.get_metadata_compact_json_br_path(), - content=brotli.compress(metadata_compact_json.encode()), - ) - - # Lastly, update README.md - if update_readme: - readme_path = file_manager.get_readme_path() - with open(readme_path, "r") as f: - prev_content = f.read() - content = Formatter.readme(prev_content, playlists) - cls._write_to_file_if_content_changed(prev_content, content, readme_path) - - @classmethod - async def _auto_register(cls, spotify: Spotify, file_manager: FileManager) -> None: - playlist_ids = ( - await spotify.get_spotify_user_playlist_ids() - | await spotify.get_featured_playlist_ids() - | await spotify.get_category_playlist_ids() - ) - file_manager.ensure_registered(playlist_ids) - - @classmethod - def _get_file_content_or_empty_string(cls, path: pathlib.Path) -> str: - try: - with open(path, "r") as f: - return f.read() - except FileNotFoundError: - return "" - - @classmethod - def _get_file_content_or_empty_bytes(cls, path: pathlib.Path) -> bytes: - try: - with open(path, "rb") as f: - return f.read() - except FileNotFoundError: - return b"" - - @classmethod - def _write_to_file_if_content_changed( - cls, prev_content: T, content: T, path: pathlib.Path - ) -> None: - if content == prev_content: - logger.info(f" No changes to file: {path}") - return - logger.info(f" Writing updates to file: {path}") - if isinstance(content, bytes): - with open(path, "wb") as f: - f.write(content) - elif isinstance(content, str): - with open(path, "w") as f: - f.write(content) - else: - raise RuntimeError(f"Invalid content type: {type(content)}") - - @classmethod - def _maybe_update_file(cls, path: pathlib.Path, content: T) -> None: - if isinstance(content, bytes): - prev_content = cls._get_file_content_or_empty_bytes(path) - elif isinstance(content, str): - prev_content = cls._get_file_content_or_empty_string(path) - else: - raise RuntimeError(f"Invalid content type: {type(content)}") - cls._write_to_file_if_content_changed( - prev_content=prev_content, - content=content, - path=path, - ) diff --git a/src/git_utils.py b/src/git_utils.py deleted file mode 100644 index 8f4f17b9fcbe5..0000000000000 --- a/src/git_utils.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import subprocess -from typing import List, Tuple - -from plants.subprocess_utils import SubprocessUtils - -logger: logging.Logger = logging.getLogger(__name__) - - -class GitUtils: - @classmethod - def any_uncommitted_changes(cls) -> bool: - result = cls._run(("git", "status", "-s")) - return bool(result.stdout) - - @classmethod - def get_last_commit_content(cls) -> List[str]: - """Get files affected by the most recent commit""" - result = cls._run(("git", "log", "--name-only", "--pretty=format:", "-1")) - return result.stdout.splitlines() - - @classmethod - def _run(cls, args: Tuple[str, ...]) -> "subprocess.CompletedProcess[str]": - logger.info(f"- Running: {args}") - result = SubprocessUtils.run(args=args) - logger.info(f"- Exited with: {result.returncode}") - return result diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 49e89094a8265..0000000000000 --- a/src/main.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import asyncio -import datetime -import logging - -from environment import Environment -from file_manager import FileManager -from file_updater import FileUpdater -from plants.committer import Committer -from plants.external import allow_external_calls -from plants.logging import configure_logging - -logger: logging.Logger = logging.getLogger(__name__) - - -async def main() -> None: - now = datetime.datetime.now() - parser = argparse.ArgumentParser(description="Snapshot Spotify playlists") - parser.add_argument( - "--auto-register", - help="Automatically register select playlists", - action="store_true", - ) - parser.add_argument( - "--commit-and-push", - help="Commit and push updated playlists upstream", - action="store_true", - ) - args = parser.parse_args() - auto_register = bool(args.auto_register) - commit_and_push = bool(args.commit_and_push) - - if commit_and_push: - playlists_dir = Environment.get_prod_playlists_dir() - else: - playlists_dir = Environment.get_test_playlists_dir() - file_manager = FileManager(playlists_dir=playlists_dir) - - await FileUpdater.update_files( - now=now, - file_manager=file_manager, - auto_register=auto_register, - update_readme=commit_and_push, - ) - if commit_and_push: - Committer.commit_and_push_if_github_actions() - - logger.info("Done") - - -if __name__ == "__main__": - allow_external_calls() - configure_logging() - asyncio.run(main()) diff --git a/src/plants b/src/plants deleted file mode 160000 index 9de2b12e61ad3..0000000000000 --- a/src/plants +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9de2b12e61ad3c29b7c05afaf7daa997afc24d88 diff --git a/src/playlist_id.py b/src/playlist_id.py deleted file mode 100644 index a83e77e9f39a5..0000000000000 --- a/src/playlist_id.py +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env python3 - - -class InvalidPlaylistIDError(Exception): - pass - - -class PlaylistID(str): - def __new__(cls, playlist_id: str) -> str: - if not playlist_id.isalnum(): - raise InvalidPlaylistIDError(playlist_id) - return super().__new__(cls, playlist_id) diff --git a/src/playlist_types.py b/src/playlist_types.py deleted file mode 100644 index 1f89610388641..0000000000000 --- a/src/playlist_types.py +++ /dev/null @@ -1,385 +0,0 @@ -#!/usr/bin/env python3 - -from __future__ import annotations - -import dataclasses -import datetime -import json -from typing import List, Optional, Sequence - - -@dataclasses.dataclass(frozen=True) -class Owner: - url: str - name: str - - -@dataclasses.dataclass(frozen=True) -class Album: - url: str - name: str - - -@dataclasses.dataclass(frozen=True) -class Artist: - url: str - name: str - - -@dataclasses.dataclass(frozen=True) -class Track: - url: str - name: str - album: Album - artists: Sequence[Artist] - duration_ms: int - added_at: Optional[datetime.datetime] - - def get_id(self) -> str: - track_id = self.url.split("/")[-1] - assert track_id.isalnum(), repr(track_id) - return track_id - - -@dataclasses.dataclass(frozen=True) -class Playlist: - url: str - # The registered alias, else the name from Spotify, unmodified - original_name: str - # A unique name for the playlist, derived from the original name. In most - # cases, they should be the exact same. When multiple playlists share the - # same original name, duplicates have a unique suffix, e.g., " (2)". - unique_name: str - description: str - tracks: Sequence[Track] - # The unqiue identifier for a particular playlist version. Note that for - # certain personalized playlists, snapshot ID changes with every request - # because the timestamp of the request is encoded within the ID. - # (https://artists.spotify.com/blog/our-playlist-ecosystem-is-evolving) - snapshot_id: str - num_followers: Optional[int] - owner: Owner - - @classmethod - def from_json(cls, content: str) -> Playlist: - playlist = json.loads(content) - assert isinstance(playlist, dict) - - playlist_url = playlist["url"] - assert isinstance(playlist_url, str) - - original_playlist_name = playlist["original_name"] - assert isinstance(original_playlist_name, str) - unique_playlist_name = playlist["unique_name"] - assert isinstance(unique_playlist_name, str) - - description = playlist["description"] - assert isinstance(description, str) - - snapshot_id = playlist["snapshot_id"] - assert isinstance(snapshot_id, str) - - num_followers = playlist["num_followers"] - assert isinstance(num_followers, (int, type(None))) - - assert isinstance(playlist["owner"], dict) - owner_url = playlist["owner"]["url"] - assert isinstance(owner_url, str) - owner_name = playlist["owner"]["name"] - assert isinstance(owner_name, str) - - tracks: List[Track] = [] - assert isinstance(playlist["tracks"], list) - for track in playlist["tracks"]: - assert isinstance(track, dict) - - track_url = track["url"] - assert isinstance(track_url, str) - - track_name = track["name"] - assert isinstance(track_name, str) - - assert isinstance(track["album"], dict) - album_url = track["album"]["url"] - assert isinstance(album_url, str) - album_name = track["album"]["name"] - assert isinstance(album_name, str) - - artists = [] - assert isinstance(track["artists"], list) - for artist in track["artists"]: - assert isinstance(artist, dict) - artist_url = artist["url"] - assert isinstance(artist_url, str) - artist_name = artist["name"] - assert isinstance(artist_name, str) - artists.append(Artist(url=artist_url, name=artist_name)) - - duration_ms = track["duration_ms"] - assert isinstance(duration_ms, int) - - added_at_string = track["added_at"] - if added_at_string is None: - added_at = None - else: - assert isinstance(added_at_string, str) - added_at = datetime.datetime.strptime( - added_at_string, "%Y-%m-%d %H:%M:%S" - ) - assert isinstance(added_at, (datetime.datetime, type(None))) - - tracks.append( - Track( - url=track_url, - name=track_name, - album=Album( - url=album_url, - name=album_name, - ), - artists=artists, - duration_ms=duration_ms, - added_at=added_at, - ) - ) - - return Playlist( - url=playlist_url, - original_name=original_playlist_name, - unique_name=unique_playlist_name, - description=description, - tracks=tracks, - snapshot_id=snapshot_id, - num_followers=num_followers, - owner=Owner( - url=owner_url, - name=owner_name, - ), - ) - - def to_json(self) -> str: - return json.dumps( - dataclasses.asdict(self), - indent=2, - sort_keys=True, - default=self.serialize_datetime, - ) - - @classmethod - def serialize_datetime(cls, obj: object) -> str: - assert isinstance(obj, datetime.datetime) - return str(obj) - - -@dataclasses.dataclass(frozen=True) -class CumulativeTrack: - url: str - name: str - album: Album - artists: Sequence[Artist] - duration_ms: int - # Represents the first date that the track appeared in the playlist, to our - # best knowledge - we can't know if a track was added and then removed - # prior to the playlist being scraped - date_added: datetime.date - # Indicates that the track belonged to the first version of the playlist - # that was indexed, but it's too late to go back and check when the track - # was originally added to the playlist - date_added_asterisk: bool - # Represents the most recent date that the track was removed from the - # playlist, is empty/null if the track is still present - date_removed: Optional[datetime.date] - - def get_id(self) -> str: - track_id = self.url.split("/")[-1] - assert track_id.isalnum(), repr(track_id) - return track_id - - -@dataclasses.dataclass(frozen=True) -class CumulativePlaylist: - url: str - name: str - description: str - tracks: Sequence[CumulativeTrack] - date_first_scraped: datetime.date - - def update( - self, - today: datetime.date, - playlist: Playlist, - ) -> CumulativePlaylist: - updated_tracks: List[CumulativeTrack] = [] - current_tracks = {track.get_id(): track for track in playlist.tracks} - previous_tracks = {track.get_id(): track for track in self.tracks} - for track_id in set(current_tracks) | set(previous_tracks): - old_data = previous_tracks.get(track_id) - new_data = current_tracks.get(track_id) - assert old_data or new_data - - if old_data: - url = old_data.url - name = old_data.name - album = old_data.album - artists = old_data.artists - duration_ms = old_data.duration_ms - date_added = old_data.date_added - date_added_asterisk = old_data.date_added_asterisk - date_removed = old_data.date_removed or today - - if new_data: - url = new_data.url - name = new_data.name - album = new_data.album - artists = new_data.artists - duration_ms = new_data.duration_ms - date_added = new_data.added_at.date() if new_data.added_at else today - date_added_asterisk = False - date_removed = None - # If the old date_added is earlier than what Spotify returned, use it - if old_data and old_data.date_added <= date_added: - date_added = old_data.date_added - date_added_asterisk = old_data.date_added_asterisk - - updated_tracks.append( - CumulativeTrack( - # pyre-fixme[61] - url=url, - # pyre-fixme[61] - name=name, - # pyre-fixme[61] - album=album, - # pyre-fixme[61] - artists=artists, - # pyre-fixme[61] - duration_ms=duration_ms, - # pyre-fixme[61] - date_added=date_added, - # pyre-fixme[61] - date_added_asterisk=date_added_asterisk, - # pyre-fixme[61] - date_removed=date_removed, - ) - ) - - return CumulativePlaylist( - url=playlist.url, - name=playlist.unique_name, - description=playlist.description, - tracks=sorted( - updated_tracks, - key=lambda track: ( - track.name.lower(), - tuple(artist.name.lower() for artist in track.artists), - track.duration_ms, - track.get_id(), - ), - ), - date_first_scraped=self.date_first_scraped, - ) - - @classmethod - def from_json(cls, content: str) -> CumulativePlaylist: - playlist = json.loads(content) - assert isinstance(playlist, dict) - - playlist_url = playlist["url"] - assert isinstance(playlist_url, str) - - playlist_name = playlist["name"] - assert isinstance(playlist_name, str) - - description = playlist["description"] - assert isinstance(description, str) - - date_first_scraped_string = playlist["date_first_scraped"] - assert isinstance(date_first_scraped_string, str) - date_first_scraped = datetime.datetime.strptime( - date_first_scraped_string, "%Y-%m-%d" - ).date() - assert isinstance(date_first_scraped, datetime.date) - - cumulative_tracks: List[CumulativeTrack] = [] - assert isinstance(playlist["tracks"], list) - for track in playlist["tracks"]: - assert isinstance(track, dict) - - track_url = track["url"] - assert isinstance(track_url, str) - - track_name = track["name"] - assert isinstance(track_name, str) - - assert isinstance(track["album"], dict) - album_url = track["album"]["url"] - assert isinstance(album_url, str) - album_name = track["album"]["name"] - assert isinstance(album_name, str) - - artists = [] - assert isinstance(track["artists"], list) - for artist in track["artists"]: - assert isinstance(artist, dict) - artist_url = artist["url"] - assert isinstance(artist_url, str) - artist_name = artist["name"] - assert isinstance(artist_name, str) - artists.append(Artist(url=artist_url, name=artist_name)) - - duration_ms = track["duration_ms"] - assert isinstance(duration_ms, int) - - date_added_string = track["date_added"] - assert isinstance(date_added_string, str) - date_added = datetime.datetime.strptime( - date_added_string, "%Y-%m-%d" - ).date() - assert isinstance(date_added, datetime.date) - - date_added_asterisk = track["date_added_asterisk"] - assert isinstance(date_added_asterisk, bool) - - date_removed = None - date_removed_string = track["date_removed"] - if date_removed_string is not None: - assert isinstance(date_removed_string, str) - date_removed = datetime.datetime.strptime( - date_removed_string, "%Y-%m-%d" - ).date() - assert isinstance(date_removed, datetime.date) - - cumulative_tracks.append( - CumulativeTrack( - url=track_url, - name=track_name, - album=Album( - url=album_url, - name=album_name, - ), - artists=artists, - duration_ms=duration_ms, - date_added=date_added, - date_added_asterisk=date_added_asterisk, - date_removed=date_removed, - ) - ) - - return CumulativePlaylist( - url=playlist_url, - name=playlist_name, - description=description, - tracks=cumulative_tracks, - date_first_scraped=date_first_scraped, - ) - - def to_json(self) -> str: - return json.dumps( - dataclasses.asdict(self), - indent=2, - sort_keys=True, - default=self.serialize_date, - ) - - @classmethod - def serialize_date(cls, obj: object) -> str: - assert isinstance(obj, datetime.date) - return str(obj) diff --git a/src/spotify.py b/src/spotify.py deleted file mode 100644 index df63a4671b170..0000000000000 --- a/src/spotify.py +++ /dev/null @@ -1,371 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import base64 -import datetime -import json -import logging -from typing import Any, Dict, List, Optional, Set, Type, TypeVar - -import aiohttp - -from alias import Alias -from plants.external import external -from playlist_id import PlaylistID -from playlist_types import Album, Artist, Owner, Playlist, Track - -logger: logging.Logger = logging.getLogger(__name__) - - -T = TypeVar("T") - - -class FailedRequestError(Exception): - pass - - -class InvalidDataError(Exception): - pass - - -class RetryBudgetExceededError(Exception): - pass - - -class Spotify: - - BASE_URL = "https://api.spotify.com/v1/" - - def __init__(self, access_token: str) -> None: - headers = {"Authorization": f"Bearer {access_token}"} - self._session: aiohttp.ClientSession = self._get_session(headers=headers) - # Handle rate limiting by retrying - self._retry_budget_seconds: float = 300 - - async def _get_with_retry( - self, href: str, *, max_spend_seconds: Optional[float] = None - ) -> Dict[str, Any]: - if max_spend_seconds is None: - max_spend_seconds = self._retry_budget_seconds - while True: - try: - async with self._session.get(href) as response: - status = response.status - if status == 429: - # Add an extra second, just to be safe - # https://stackoverflow.com/a/30557896/3176152 - backoff_seconds = int(response.headers["Retry-After"]) + 1 - reason = "Rate limited" - elif status // 100 == 5: - backoff_seconds = 1 - reason = f"Server error ({status})" - else: - data = await response.json(content_type=None) - context = json.dumps({"request": href, "response": data}) - if not isinstance(data, dict): - backoff_seconds = 1 - reason = "Invalid response" - elif not data: - backoff_seconds = 1 - reason = "Empty response" - elif "error" in data: - raise FailedRequestError(f"Failed request: {context}") - else: - return data - except aiohttp.client_exceptions.ClientConnectionError: - backoff_seconds = 1 - reason = "Connection problem" - except asyncio.exceptions.TimeoutError: - backoff_seconds = 1 - reason = "Asyncio timeout" - self._retry_budget_seconds -= backoff_seconds - if self._retry_budget_seconds <= 0: - raise RetryBudgetExceededError("Session retry budget exceeded") - max_spend_seconds -= backoff_seconds - if max_spend_seconds <= 0: - raise RetryBudgetExceededError("Request retry budget exceeded") - logger.warning(f"{reason}, will retry after {backoff_seconds}s") - await self._sleep(backoff_seconds) - - async def shutdown(self) -> None: - await self._session.close() - # Sleep to allow underlying connections to close - # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown - await self._sleep(0) - - async def get_spotify_user_playlist_ids(self) -> Set[PlaylistID]: - logger.info("Fetching @spotify playlist IDs") - playlist_ids: Set[PlaylistID] = set() - href = self.BASE_URL + "users/spotify/playlists?limit=50" - while href: - data = await self._get_with_retry(href) - playlist_ids |= {PlaylistID(x) for x in self._extract_ids(data)} - href = data.get("next") - return playlist_ids - - async def get_featured_playlist_ids(self) -> Set[PlaylistID]: - logger.info("Fetching featured playlist IDs") - playlist_ids: Set[PlaylistID] = set() - href = self.BASE_URL + "browse/featured-playlists?limit=50" - while href: - data = await self._get_with_retry(href) - playlists = self._get_optional(data, "playlists", dict) - if not playlists: - href = None - continue - playlist_ids |= {PlaylistID(x) for x in self._extract_ids(playlists)} - href = playlists.get("next") - return playlist_ids - - async def get_category_playlist_ids(self) -> Set[PlaylistID]: - logger.info("Fetching category playlist IDs") - playlist_ids: Set[PlaylistID] = set() - category_ids: Set[str] = set() - href = self.BASE_URL + "browse/categories?limit=50" - while href: - data = await self._get_with_retry(href, max_spend_seconds=3) - categories = self._get_optional(data, "categories", dict) - if not categories: - href = None - continue - category_ids |= self._extract_ids(categories) - href = categories.get("next") - for category in sorted(category_ids): - href = self.BASE_URL + f"browse/categories/{category}/playlists?limit=50" - while href: - try: - data = await self._get_with_retry(href, max_spend_seconds=3) - except FailedRequestError: - # Weirdly, some categories return 404 - break - playlists = self._get_optional(data, "playlists", dict) - if not playlists: - href = None - continue - playlist_ids |= {PlaylistID(x) for x in self._extract_ids(playlists)} - href = playlists.get("next") - return playlist_ids - - @classmethod - def _extract_ids(cls, data: Dict[str, Any]) -> Set[str]: - ids: Set[str] = set() - items = cls._get_optional(data, "items", list) - for item in items or []: - if not isinstance(item, dict): - continue - id_ = cls._get_optional(item, "id", str) - if not id_: - continue - ids.add(id_) - return ids - - async def get_playlist( - self, playlist_id: PlaylistID, *, alias: Optional[Alias] - ) -> Playlist: - href = self._get_playlist_href(playlist_id) - data = await self._get_with_retry(href) - - playlist_urls = self._get_required(data, "external_urls", dict) - playlist_url = self._get_optional(playlist_urls, "spotify", str) - if not playlist_url: - playlist_url = "" - - if alias: - name = alias - else: - name = self._get_required(data, "name", str) - if not name.strip(): - raise InvalidDataError(f"Empty playlist name: {repr(name)}") - - followers = self._get_required(data, "followers", dict) - followers_total = self._get_optional(followers, "total", int) - if followers_total is None: - logger.warning(f"Null followers total: {playlist_id}") - - owner = self._get_required(data, "owner", dict) - owner_urls = self._get_required(owner, "external_urls", dict) - owner_url = self._get_optional(owner_urls, "spotify", str) - if not owner_url: - owner_url = "" - owner_name = self._get_required(owner, "display_name", str) - if not owner_name: - logger.warning(f"Empty owner name: {owner_url}") - - return Playlist( - url=playlist_url, - original_name=name, - # When fetched, playlists are presumed to have unique names. Later - # on, if duplicates are discovered, their unique names get updated - # so they can be differentiated. It's bit hacky, but it's easier - # than defining separate structs for playlists fetched from Spotify - # and playlists read from JSON. - unique_name=name, - description=self._get_required(data, "description", str), - tracks=await self._get_tracks(playlist_id), - snapshot_id=self._get_required(data, "snapshot_id", str), - num_followers=followers_total, - owner=Owner( - url=owner_url, - name=owner_name, - ), - ) - - async def _get_tracks(self, playlist_id: PlaylistID) -> List[Track]: - tracks = [] - href = self._get_tracks_href(playlist_id) - - while href: - data = await self._get_with_retry(href) - items = self._get_required(data, "items", list) - for item in items: - if not isinstance(item, dict): - raise InvalidDataError(f"Invalid item: {item}") - - track = self._get_optional(item, "track", dict) - if not track: - continue - track_urls = self._get_required(track, "external_urls", dict) - track_url = self._get_optional(track_urls, "spotify", str) - if not track_url: - logger.warning("Skipping track with empty URL") - continue - track_name = self._get_required(track, "name", str) - if not track_name: - logger.warning(f"Empty track name: {track_url}") - - album = self._get_required(track, "album", dict) - album_urls = self._get_required(album, "external_urls", dict) - album_url = self._get_optional(album_urls, "spotify", str) - if not album_url: - album_url = "" - album_name = self._get_required(album, "name", str) - if not album_name: - logger.warning(f"Empty album name: {album_url}") - - artists = self._get_required(track, "artists", list) - artist_objs = [] - for artist in artists: - if not isinstance(artist, dict): - raise InvalidDataError(f"Invalid artist: {artist}") - artist_urls = self._get_required(artist, "external_urls", dict) - artist_url = self._get_optional(artist_urls, "spotify", str) - if not artist_url: - artist_url = "" - artist_name = self._get_required(artist, "name", str) - if not artist_name: - logger.warning(f"Empty artist name: {artist_url}") - artist_objs.append(Artist(url=artist_url, name=artist_name)) - - if not artist_objs: - logger.warning(f"Empty track artists: {track_url}") - - duration_ms = self._get_required(track, "duration_ms", int) - - added_at_string = self._get_optional(item, "added_at", str) - if added_at_string and added_at_string != "1970-01-01T00:00:00Z": - added_at = datetime.datetime.strptime( - added_at_string, "%Y-%m-%dT%H:%M:%SZ" - ) - else: - added_at = None - - tracks.append( - Track( - url=track_url, - name=track_name, - album=Album( - url=album_url, - name=album_name, - ), - artists=artist_objs, - duration_ms=duration_ms, - added_at=added_at, - ) - ) - - href = data.get("next") - - return tracks - - @classmethod - def _get_required( - cls, - dict_: Dict[str, Any], - key: str, - type_: Type[T], - ) -> T: - value = dict_.get(key) - if not isinstance(value, type_): - raise InvalidDataError(f"Invalid {key}: {value}") - return value - - @classmethod - def _get_optional( - cls, - dict_: Dict[str, Any], - key: str, - type_: Type[T], - ) -> Optional[T]: - value = dict_.get(key) - if not isinstance(value, (type_, type(None))): - raise InvalidDataError(f"Invalid {key}: {value}") - return value - - @classmethod - def _get_playlist_href(cls, playlist_id: PlaylistID) -> str: - rest = ( - "{}?fields=external_urls,name,description,snapshot_id," - "owner(display_name,external_urls),followers.total" - ) - template = cls.BASE_URL + "playlists/" + rest - return template.format(playlist_id) - - @classmethod - def _get_tracks_href(cls, playlist_id: PlaylistID) -> str: - rest = ( - "{}/tracks?fields=items(added_at,track(id,external_urls," - "duration_ms,name,album(external_urls,name),artists)),next" - ) - template = cls.BASE_URL + "playlists/" + rest - return template.format(playlist_id) - - @classmethod - async def get_access_token(cls, client_id: str, client_secret: str) -> str: - joined = f"{client_id}:{client_secret}" - encoded = base64.b64encode(joined.encode()).decode() - async with cls._get_session() as session: - async with session.post( - url="https://accounts.spotify.com/api/token", - data={"grant_type": "client_credentials"}, - headers={"Authorization": f"Basic {encoded}"}, - ) as response: - try: - data = await response.json() - except Exception as e: - raise InvalidDataError from e - - error = data.get("error") - if error: - raise InvalidDataError(f"Failed to get access token: {error}") - - access_token = data.get("access_token") - if not access_token: - raise InvalidDataError(f"Invalid access token: {access_token}") - - token_type = data.get("token_type") - if token_type != "Bearer": - raise InvalidDataError(f"Invalid token type: {token_type}") - - return access_token - - @classmethod - @external - def _get_session( - cls, headers: Optional[Dict[str, str]] = None - ) -> aiohttp.ClientSession: - return aiohttp.ClientSession(headers=headers) - - @classmethod - @external - async def _sleep(cls, seconds: float) -> None: - await asyncio.sleep(seconds) diff --git a/src/tests/test_alias.py b/src/tests/test_alias.py deleted file mode 100644 index c88517f6250ee..0000000000000 --- a/src/tests/test_alias.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 - -import string -from unittest import TestCase - -from alias import Alias, InvalidAliasError - - -class TestPlaylistID(TestCase): - def test_empty(self) -> None: - with self.assertRaises(InvalidAliasError): - Alias("") - - def test_invalid_whitespace(self) -> None: - for c in string.whitespace: - if c in " \t": - continue - with self.assertRaises(InvalidAliasError): - Alias(f"foo{c}bar") - - def test_enclosing_whitespace(self) -> None: - for c in string.whitespace: - for candidate in [f"{c}foo", f"foo{c}", f"{c}foo{c}"]: - with self.assertRaises(InvalidAliasError): - Alias(candidate) diff --git a/src/tests/test_file_formatter.py b/src/tests/test_file_formatter.py deleted file mode 100644 index 8b986ac3850de..0000000000000 --- a/src/tests/test_file_formatter.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python3 - -from unittest import TestCase - -from file_formatter import Formatter - - -class TestFormatDuration(TestCase): - def test_success(self) -> None: - self.assertEqual(Formatter._format_duration(0), "0:00") - self.assertEqual(Formatter._format_duration(999), "0:00") - self.assertEqual(Formatter._format_duration(1000), "0:01") - self.assertEqual(Formatter._format_duration(59999), "0:59") - self.assertEqual(Formatter._format_duration(60000), "1:00") - self.assertEqual(Formatter._format_duration(3599999), "59:59") - self.assertEqual(Formatter._format_duration(3600000), "1:00:00") - - -class TestFormatDurationEnglish(TestCase): - def test_success(self) -> None: - self.assertEqual(Formatter._format_duration_english(0), "0 sec") - self.assertEqual(Formatter._format_duration_english(999), "0 sec") - self.assertEqual(Formatter._format_duration_english(1000), "1 sec") - self.assertEqual(Formatter._format_duration_english(59999), "59 sec") - self.assertEqual(Formatter._format_duration_english(60000), "1 min 0 sec") - self.assertEqual(Formatter._format_duration_english(3599999), "59 min 59 sec") - self.assertEqual(Formatter._format_duration_english(3600000), "1 hr 0 min") - self.assertEqual(Formatter._format_duration_english(86399999), "23 hr 59 min") - self.assertEqual( - Formatter._format_duration_english(86400000), "1 day 0 hr 0 min" - ) - self.assertEqual( - Formatter._format_duration_english(1001001001), "11 day 14 hr 3 min" - ) - self.assertEqual( - Formatter._format_duration_english(1001001001001), "11,585 day 15 hr 50 min" - ) diff --git a/src/tests/test_file_updater.py b/src/tests/test_file_updater.py deleted file mode 100644 index 212cddf77c828..0000000000000 --- a/src/tests/test_file_updater.py +++ /dev/null @@ -1,438 +0,0 @@ -#!/usr/bin/env python3 - -import datetime -import pathlib -import tempfile -import textwrap -from typing import Optional, TypeVar -from unittest import IsolatedAsyncioTestCase -from unittest.mock import AsyncMock, Mock, call, patch, sentinel - -from alias import Alias -from file_manager import FileManager, MalformedAliasError, UnexpectedFilesError -from file_updater import FileUpdater -from plants.unittest_utils import UnittestUtils -from playlist_id import PlaylistID -from playlist_types import Owner, Playlist -from spotify import FailedRequestError - -T = TypeVar("T") - - -class TestUpdateFiles(IsolatedAsyncioTestCase): - async def asyncSetUp(self) -> None: - self.mock_get_env = UnittestUtils.patch( - self, - "plants.environment.Environment.get_env", - new_callable=Mock, - ) - self.mock_get_env.side_effect = lambda name: { - "SPOTIFY_CLIENT_ID": "client_id", - "SPOTIFY_CLIENT_SECRET": "client_secret", - }[name] - - self.mock_spotify_class = UnittestUtils.patch( - self, - "file_updater.Spotify", - new_callable=Mock, - ) - self.mock_spotify_class.get_access_token = AsyncMock() - self.mock_spotify_class.return_value.shutdown = AsyncMock() - - self.mock_update_files_impl = UnittestUtils.patch( - self, "file_updater.FileUpdater._update_files_impl", new_callable=AsyncMock - ) - - async def test_error(self) -> None: - self.mock_update_files_impl.side_effect = Exception - with self.assertRaises(Exception): - await FileUpdater.update_files( - now=sentinel.now, - file_manager=sentinel.file_manager, - auto_register=sentinel.auto_register, - update_readme=sentinel.update_readme, - ) - self.mock_spotify_class.return_value.shutdown.assert_called_once_with() - self.mock_spotify_class.return_value.shutdown.assert_awaited_once() - - async def test_success(self) -> None: - await FileUpdater.update_files( - now=sentinel.now, - file_manager=sentinel.file_manager, - auto_register=sentinel.auto_register, - update_readme=sentinel.update_readme, - ) - self.mock_get_env.assert_has_calls( - [ - call("SPOTIFY_CLIENT_ID"), - call("SPOTIFY_CLIENT_SECRET"), - ] - ) - self.mock_spotify_class.get_access_token.assert_called_once_with( - client_id="client_id", - client_secret="client_secret", - ) - self.mock_spotify_class.get_access_token.assert_awaited_once() - self.mock_spotify_class.assert_called_once_with( - self.mock_spotify_class.get_access_token.return_value - ) - self.mock_update_files_impl.assert_called_once_with( - now=sentinel.now, - file_manager=sentinel.file_manager, - auto_register=sentinel.auto_register, - update_readme=sentinel.update_readme, - spotify=self.mock_spotify_class.return_value, - ) - self.mock_spotify_class.return_value.shutdown.assert_called_once_with() - self.mock_spotify_class.return_value.shutdown.assert_awaited_once() - - -class TestUpdateFilesImpl(IsolatedAsyncioTestCase): - async def asyncSetUp(self) -> None: - self.now = datetime.datetime(2021, 12, 15) - self.temp_dir = tempfile.TemporaryDirectory() - self.repo_dir = pathlib.Path(self.temp_dir.name) - self.playlists_dir = self.repo_dir / "playlists" - self.file_manager = FileManager(self.playlists_dir) - - # Mock the GitUtils methods - UnittestUtils.patch( - self, - "git_utils.GitUtils.any_uncommitted_changes", - new_callable=lambda: Mock(return_value=False), - ) - UnittestUtils.patch( - self, - "git_utils.GitUtils.get_last_commit_content", - new_callable=lambda: Mock(return_value=[]), - ) - - # Mock the spotify class - self.mock_spotify_class = UnittestUtils.patch( - self, - "file_updater.Spotify", - new_callable=Mock, - ) - - # Use AsyncMocks for async methods - self.mock_spotify = self.mock_spotify_class.return_value - self.mock_spotify.get_spotify_user_playlist_ids = AsyncMock() - self.mock_spotify.get_featured_playlist_ids = AsyncMock() - self.mock_spotify.get_category_playlist_ids = AsyncMock() - self.mock_spotify.get_playlist = AsyncMock() - - async def asyncTearDown(self) -> None: - self.temp_dir.cleanup() - - async def _update_files_impl( - self, auto_register: bool = False, update_readme: bool = False - ) -> None: - await FileUpdater._update_files_impl( - now=self.now, - file_manager=self.file_manager, - auto_register=auto_register, - update_readme=update_readme, - spotify=self.mock_spotify, - ) - - @classmethod - def _helper( - cls, - playlist_id: PlaylistID, - original_name: str, - num_followers: int, - ) -> Playlist: - return Playlist( - url=f"url_{playlist_id}", - original_name=original_name, - unique_name=original_name, - description="description", - tracks=[], - snapshot_id="snapshot_id", - num_followers=num_followers, - owner=Owner( - url="owner_url", - name="owner_name", - ), - ) - - @classmethod - def _fake_get_playlist( - cls, playlist_id: PlaylistID, *, alias: Optional[Alias] - ) -> Playlist: - return cls._helper( - playlist_id=playlist_id, - original_name=alias or f"name_{playlist_id}", - num_followers=0, - ) - - async def test_empty(self) -> None: - names = ["registry", "plain", "pretty", "cumulative"] - for name in names: - self.assertFalse((self.playlists_dir / name).exists()) - await self._update_files_impl() - for name in names: - self.assertTrue((self.playlists_dir / name).exists()) - # Double check exist_ok = True - await self._update_files_impl() - - async def test_auto_register(self) -> None: - self.mock_spotify.get_spotify_user_playlist_ids.return_value = {"a", "d"} - self.mock_spotify.get_featured_playlist_ids.return_value = {"b", "d"} - self.mock_spotify.get_category_playlist_ids.return_value = {"c", "d"} - self.mock_spotify.get_playlist.side_effect = self._fake_get_playlist - for name in "abcd": - self.assertFalse((self.playlists_dir / "registry" / name).exists()) - await self._update_files_impl(auto_register=True) - for name in "abcd": - self.assertTrue((self.playlists_dir / "registry" / name).exists()) - - async def test_fixup_aliases(self) -> None: - self.mock_spotify.get_playlist.side_effect = self._fake_get_playlist - registry_dir = self.playlists_dir / "registry" - registry_dir.mkdir(parents=True) - alias_file = registry_dir / "foo" - with open(alias_file, "w") as f: - f.write("\n") - with open(alias_file, "r") as f: - self.assertTrue(f.read()) - await self._update_files_impl() - with open(alias_file, "r") as f: - self.assertFalse(f.read()) - - async def test_invalid_aliases(self) -> None: - registry_dir = self.playlists_dir / "registry" - registry_dir.mkdir(parents=True) - alias_file = registry_dir / "foo" - for malformed_alias in ["\n\n", "a\nc", " \n"]: - with open(alias_file, "w") as f: - f.write(malformed_alias) - with self.assertRaises(MalformedAliasError): - await self._update_files_impl() - - async def test_good_alias(self) -> None: - self.mock_spotify.get_playlist.side_effect = self._fake_get_playlist - registry_dir = self.playlists_dir / "registry" - registry_dir.mkdir(parents=True) - with open(registry_dir / "foo", "w") as f: - f.write("alias") - await self._update_files_impl() - self.mock_spotify.get_playlist.assert_called_once_with("foo", alias="alias") - with open(self.playlists_dir / "plain" / "foo", "r") as f: - lines = f.read().splitlines() - self.assertEqual(lines[0], "alias") - - async def test_duplicate_playlist_names(self) -> None: - self.mock_spotify.get_playlist.side_effect = [ - self._helper( - playlist_id=PlaylistID("a"), original_name="name", num_followers=1 - ), - self._helper( - playlist_id=PlaylistID("b"), original_name="name", num_followers=2 - ), - self._helper( - playlist_id=PlaylistID("c"), original_name="name", num_followers=2 - ), - self._helper( - playlist_id=PlaylistID("d"), original_name="name (3)", num_followers=0 - ), - self._helper( - playlist_id=PlaylistID("e"), original_name="name (3)", num_followers=0 - ), - self._helper( - playlist_id=PlaylistID("f"), - original_name="name (3) (2)", - num_followers=1, - ), - ] - registry_dir = self.playlists_dir / "registry" - registry_dir.mkdir(parents=True) - for playlist_id in "abcdef": - (registry_dir / playlist_id).touch() - await self._update_files_impl() - for playlist_id, name in [ - ("b", "name"), - ("c", "name (2)"), - ("d", "name (3)"), - ("f", "name (3) (2)"), - ("e", "name (3) (3)"), - ("a", "name (4)"), - ]: - with open(self.playlists_dir / "plain" / playlist_id, "r") as f: - lines = f.read().splitlines() - self.assertEqual(lines[0], name) - - async def test_unexpected_files(self) -> None: - self.mock_spotify.get_playlist.side_effect = self._fake_get_playlist - for directory in ["registry", "plain", "pretty", "cumulative"]: - (self.playlists_dir / directory).mkdir(parents=True) - (self.playlists_dir / "registry" / "foo").touch() - for directory, filename in [ - ("plain", "bar"), - ("plain", "foo.md"), - ("plain", "foo.json"), - ("pretty", "foo"), - ("pretty", "bar.md"), - ("pretty", "bar.json"), - ("cumulative", "foo"), - ("cumulative", "bar.md"), - ("cumulative", "bar.json"), - ]: - path = self.playlists_dir / directory / filename - path.touch() - with self.assertRaises(UnexpectedFilesError): - await self._update_files_impl() - path.unlink() - - # Patch the logger to suppress log spew - @patch("file_updater.logger") - async def test_readme_and_metadata_json(self, mock_logger: Mock) -> None: - # +-------------------+---+---+---+---+ - # | Criteria | a | b | c | d | - # +-------------------+---+---+---+---+ - # | Fetch succeeds | 1 | 1 | 0 | 0 | - # | Has existing data | 1 | 0 | 1 | 0 | - # +-------------------+---+---+---+---+ - - self.mock_spotify.get_playlist.side_effect = UnittestUtils.side_effect( - [ - self._fake_get_playlist(PlaylistID("a"), alias=None), - self._fake_get_playlist(PlaylistID("b"), alias=None), - FailedRequestError(), - FailedRequestError(), - ] - ) - - registry_dir = self.playlists_dir / "registry" - registry_dir.mkdir(parents=True) - for playlist_id in "abcd": - (registry_dir / playlist_id).touch() - - metadata_dir = self.playlists_dir / "metadata" - metadata_dir.mkdir(parents=True) - - pretty_dir = self.playlists_dir / "pretty" - pretty_dir.mkdir(parents=True) - for playlist_id in "ac": - path = pretty_dir / f"{playlist_id}.json" - playlist = self._helper( - playlist_id=PlaylistID(playlist_id), - original_name=f" name_{playlist_id} ", # ensure whitespace is stripped - num_followers=0, - ) - playlist_json = playlist.to_json() - with open(path, "w") as f: - f.write(playlist_json) - - with open(self.repo_dir / "README.md", "w") as f: - f.write( - textwrap.dedent( - """\ - prev content - - ## Playlists \\(1\\) - - - [fizz](buzz) - """ - ) - ) - await self._update_files_impl(update_readme=True) - with open(self.repo_dir / "README.md", "r") as f: - content = f.read() - self.assertEqual( - content, - textwrap.dedent( - """\ - prev content - - ## Playlists \\(3\\) - - - [name\\_a](/playlists/pretty/a.md) - - [name\\_b](/playlists/pretty/b.md) - - [name\\_c](/playlists/pretty/c.md) - """ - ), - ) - for path in [ - self.playlists_dir / "metadata.json", - metadata_dir / "metadata-full.json", - ]: - with open(path, "r") as f: - content = f.read() - self.assertEqual( - content, - textwrap.dedent( - """\ - { - "a": { - "description": "description", - "num_followers": 0, - "original_name": "name_a", - "owner": { - "name": "owner_name", - "url": "owner_url" - }, - "snapshot_id": "snapshot_id", - "unique_name": "name_a", - "url": "url_a" - }, - "b": { - "description": "description", - "num_followers": 0, - "original_name": "name_b", - "owner": { - "name": "owner_name", - "url": "owner_url" - }, - "snapshot_id": "snapshot_id", - "unique_name": "name_b", - "url": "url_b" - }, - "c": { - "description": "description", - "num_followers": 0, - "original_name": " name_c ", - "owner": { - "name": "owner_name", - "url": "owner_url" - }, - "snapshot_id": "snapshot_id", - "unique_name": " name_c ", - "url": "url_c" - } - } - """ - ), - ) - - with open(metadata_dir / "metadata-compact.json", "r") as f: - content = f.read() - self.assertEqual(content, '{"a":"name_a","b":"name_b","c":" name_c "}\n') - - with open(metadata_dir / "metadata-full.json.br", "rb") as f: - content = f.read() - self.assertEqual( - content, - ( - b"\x1b\x11\x03\x00\x1c\x07v,cz\xbe\xb1u'\xa6nK\xf5,$c\x1b\xdb-\x82" - b"\x1eQL&\x88H_\xea\xb0(L\xdd\xbc\xe7gYjc76\x8e\r\x9e>X\xf4\xc0\tQ" - b"\x17\x95n\x8b\x04\xf3W\x04\xe2\x8d;\xffH\xe0\xc6\x94z\x01\x9c\x1cu" - b"\xd4[Da\x03\xcd\xa76\xc9q\x04\x0ezG\xa5r\xd4u\xaf\x9eB\xb9S$f\xff" - b"\xe8\x9dy\x98\x81sz\xb9\xf9\x966D7\x1c\x1dL2Jl&4\x8dn\xc0\xd5\x8dB" - b"\xa5?\x9a\xf7\xf0\x0e&\x9a\x11?/\xc9\x87\xfc>C\xf4<\x81\x07\xb3j" - ), - ) - - with open(metadata_dir / "metadata-compact.json.br", "rb") as f: - content = f.read() - self.assertEqual( - content, - ( - b"\x1b)\x00\xf8\x1d\tv\xac\x89\xbb\xf348a\x08tc\xa9>7\xd9\x8fQC" - b"\x11C\xa4Xt:\x81EDqH\x15\xd0\xc0\x1e\x97\xe9\x82c\xa2\x14=" - ), - ) - - async def test_success(self) -> None: - # TODO - pass diff --git a/src/tests/test_playlist_id.py b/src/tests/test_playlist_id.py deleted file mode 100644 index 8a015277b6d77..0000000000000 --- a/src/tests/test_playlist_id.py +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python3 - -import string -from unittest import TestCase - -from playlist_id import InvalidPlaylistIDError, PlaylistID - - -class TestPlaylistID(TestCase): - def test_invalid(self) -> None: - for c in string.punctuation + string.whitespace: - with self.assertRaises(InvalidPlaylistIDError): - PlaylistID(c) - - def test_valid(self) -> None: - PlaylistID(string.ascii_letters + string.digits) - - def test_equal(self) -> None: - self.assertEqual(PlaylistID("foo"), PlaylistID("foo")) - - def test_str(self) -> None: - self.assertEqual(str(PlaylistID("foo")), "foo") diff --git a/src/tests/test_playlist_types.py b/src/tests/test_playlist_types.py deleted file mode 100644 index 8db13309b4b0f..0000000000000 --- a/src/tests/test_playlist_types.py +++ /dev/null @@ -1,540 +0,0 @@ -#!/usr/bin/env python3 - -import datetime -import textwrap -from unittest import TestCase - -from playlist_types import ( - Album, - Artist, - CumulativePlaylist, - CumulativeTrack, - Owner, - Playlist, - Track, -) - - -class TestTrackGetID(TestCase): - def test_success(self) -> None: - track = Track( - url="https://open.spotify.com/track/abc123", - name="", - album=Album( - url="", - name="", - ), - artists=[], - duration_ms=0, - added_at=None, - ) - self.assertEqual(track.get_id(), "abc123") - - -class TestPlaylistToAndFromJSON(TestCase): - def test_success(self) -> None: - playlist = Playlist( - url="playlist_url", - original_name="playlist_original_name", - unique_name="playlist_unique_name", - description="description", - tracks=[ - Track( - url="track_url", - name="track_name", - album=Album( - url="album_url", - name="album_name", - ), - artists=[ - Artist( - url="artist_url", - name="artist_name", - ) - ], - duration_ms=1234, - added_at=datetime.datetime(2021, 12, 25, 23, 59, 59), - ), - Track( - url="", - name="", - album=Album( - url="", - name="", - ), - artists=[ - Artist( - url="", - name="", - ) - ], - duration_ms=0, - added_at=None, - ), - ], - snapshot_id="snapshot_id", - num_followers=999, - owner=Owner( - url="owner_url", - name="owner_name", - ), - ) - playlist_json = playlist.to_json() - self.assertEqual( - playlist_json, - textwrap.dedent( - """\ - { - "description": "description", - "num_followers": 999, - "original_name": "playlist_original_name", - "owner": { - "name": "owner_name", - "url": "owner_url" - }, - "snapshot_id": "snapshot_id", - "tracks": [ - { - "added_at": "2021-12-25 23:59:59", - "album": { - "name": "album_name", - "url": "album_url" - }, - "artists": [ - { - "name": "artist_name", - "url": "artist_url" - } - ], - "duration_ms": 1234, - "name": "track_name", - "url": "track_url" - }, - { - "added_at": null, - "album": { - "name": "", - "url": "" - }, - "artists": [ - { - "name": "", - "url": "" - } - ], - "duration_ms": 0, - "name": "", - "url": "" - } - ], - "unique_name": "playlist_unique_name", - "url": "playlist_url" - }""" - ), - ) - self.assertEqual(playlist, Playlist.from_json(playlist_json)) - - -class TestCumulativeTrackGetID(TestCase): - def test_success(self) -> None: - track = CumulativeTrack( - url="https://open.spotify.com/track/abc123", - name="", - album=Album( - url="", - name="", - ), - artists=[], - duration_ms=0, - date_added=datetime.date(1970, 1, 1), - date_added_asterisk=False, - date_removed=None, - ) - self.assertEqual(track.get_id(), "abc123") - - -class TestCumulativePlaylistUpdate(TestCase): - def test_just_old_data(self) -> None: - date_first_scraped = datetime.date(2000, 1, 1) - date_added = datetime.date(2000, 1, 2) - other = datetime.date(2000, 1, 3) - today = datetime.date(2000, 1, 4) - for date_added_asterisk in [False, True]: - for date_removed, updated_date_removed in [ - (other, other), - (None, today), - ]: - self.assertEqual( - CumulativePlaylist( - url="old_playlist_url", - name="old_playlist_name", - description="old_description", - tracks=[ - CumulativeTrack( - url="https://open.spotify.com/track/abc123", - name="old_track_name", - album=Album( - url="old_album_url", - name="old_album_name", - ), - artists=[ - Artist( - url="old_artist_url", - name="old_artist_name", - ) - ], - duration_ms=1234, - date_added=date_added, - date_added_asterisk=date_added_asterisk, - date_removed=date_removed, - ), - ], - date_first_scraped=date_first_scraped, - ).update( - today=today, - playlist=Playlist( - url="new_playlist_url", - original_name="new_playlist_name", - unique_name="new_playlist_name", - description="new_description", - tracks=[], - snapshot_id="new_snapshot_id", - num_followers=999, - owner=Owner( - url="new_owner_url", - name="new_owner_name", - ), - ), - ), - CumulativePlaylist( - url="new_playlist_url", - name="new_playlist_name", - description="new_description", - tracks=[ - CumulativeTrack( - url="https://open.spotify.com/track/abc123", - name="old_track_name", - album=Album( - url="old_album_url", - name="old_album_name", - ), - artists=[ - Artist( - url="old_artist_url", - name="old_artist_name", - ) - ], - duration_ms=1234, - date_added=date_added, - date_added_asterisk=date_added_asterisk, - date_removed=updated_date_removed, - ), - ], - date_first_scraped=date_first_scraped, - ), - ) - - def test_just_new_data(self) -> None: - date_first_scraped = datetime.date(2000, 1, 1) - time_added = datetime.datetime(2000, 1, 2) - date_added = time_added.date() - today = datetime.date(2000, 1, 3) - for added_at, updated_date_added in [ - (time_added, date_added), - (None, today), - ]: - self.assertEqual( - CumulativePlaylist( - url="old_playlist_url", - name="old_playlist_name", - description="old_description", - tracks=[], - date_first_scraped=date_first_scraped, - ).update( - today=today, - playlist=Playlist( - url="new_playlist_url", - original_name="new_playlist_name", - unique_name="new_playlist_name", - description="new_description", - tracks=[ - Track( - url="https://open.spotify.com/track/abc123", - name="new_track_name", - album=Album( - url="new_album_url", - name="new_album_name", - ), - artists=[ - Artist( - url="new_artist_url", - name="new_artist_name", - ) - ], - duration_ms=1234, - added_at=added_at, - ), - ], - snapshot_id="new_snapshot_id", - num_followers=999, - owner=Owner( - url="new_owner_url", - name="new_owner_name", - ), - ), - ), - CumulativePlaylist( - url="new_playlist_url", - name="new_playlist_name", - description="new_description", - tracks=[ - CumulativeTrack( - url="https://open.spotify.com/track/abc123", - name="new_track_name", - album=Album( - url="new_album_url", - name="new_album_name", - ), - artists=[ - Artist( - url="new_artist_url", - name="new_artist_name", - ) - ], - duration_ms=1234, - date_added=updated_date_added, - date_added_asterisk=False, - date_removed=None, - ), - ], - date_first_scraped=date_first_scraped, - ), - ) - - def test_both_old_and_new_data(self) -> None: - date_first_scraped = datetime.date(2000, 1, 1) - date_removed = datetime.date(2000, 1, 4) - today = datetime.date(2000, 1, 5) - for ( - old_date_added, - new_date_added, - updated_date_added, - updated_date_added_asterisk, - ) in [ - ( - # old_date_added < new_date_added - datetime.date(2000, 1, 2), - datetime.date(2000, 1, 3), - # old_date_added and asterisk are preserved - datetime.date(2000, 1, 2), - True, - ), - ( - # old_date_added == new_date_added - datetime.date(2000, 1, 2), - datetime.date(2000, 1, 2), - # old_date_added and asterisk are preserved - datetime.date(2000, 1, 2), - True, - ), - ( - # old_date_added > new_date_added - datetime.date(2000, 1, 3), - datetime.date(2000, 1, 2), - # old_date_added and asterisk are replaced - datetime.date(2000, 1, 2), - False, - ), - ]: - self.assertEqual( - CumulativePlaylist( - url="old_playlist_url", - name="old_playlist_name", - description="old_description", - tracks=[ - CumulativeTrack( - url="https://open.spotify.com/track/abc123", - name="old_track_name", - album=Album( - url="old_album_url", - name="old_album_name", - ), - artists=[ - Artist( - url="old_artist_url", - name="old_artist_name", - ) - ], - duration_ms=1234, - date_added=old_date_added, - date_added_asterisk=True, - date_removed=date_removed, - ), - ], - date_first_scraped=date_first_scraped, - ).update( - today=today, - playlist=Playlist( - url="new_playlist_url", - original_name="new_playlist_name", - unique_name="new_playlist_name", - description="new_description", - tracks=[ - Track( - url="https://open.spotify.com/track/abc123", - name="new_track_name", - album=Album( - url="new_album_url", - name="new_album_name", - ), - artists=[ - Artist( - url="new_artist_url", - name="new_artist_name", - ) - ], - duration_ms=5678, - added_at=datetime.datetime( - new_date_added.year, - new_date_added.month, - new_date_added.day, - ), - ), - ], - snapshot_id="new_snapshot_id", - num_followers=999, - owner=Owner( - url="new_owner_url", - name="new_owner_name", - ), - ), - ), - CumulativePlaylist( - url="new_playlist_url", - name="new_playlist_name", - description="new_description", - tracks=[ - CumulativeTrack( - url="https://open.spotify.com/track/abc123", - name="new_track_name", - album=Album( - url="new_album_url", - name="new_album_name", - ), - artists=[ - Artist( - url="new_artist_url", - name="new_artist_name", - ) - ], - duration_ms=5678, - date_added=updated_date_added, - date_added_asterisk=updated_date_added_asterisk, - date_removed=None, - ), - ], - date_first_scraped=date_first_scraped, - ), - ) - - -class TestCumulativePlaylistToAndFromJSON(TestCase): - def test_success(self) -> None: - cumulative_playlist = CumulativePlaylist( - url="playlist_url", - name="playlist_name", - description="description", - tracks=[ - CumulativeTrack( - url="track_url", - name="track_name", - album=Album( - url="album_url", - name="album_name", - ), - artists=[ - Artist( - url="artist_url", - name="artist_name", - ) - ], - duration_ms=100001, - date_added=datetime.date(2021, 12, 27), - date_added_asterisk=False, - date_removed=datetime.date(2021, 12, 29), - ), - CumulativeTrack( - url="", - name="", - album=Album( - url="", - name="", - ), - artists=[ - Artist( - url="", - name="", - ) - ], - duration_ms=0, - date_added=datetime.date(2021, 12, 25), - date_added_asterisk=True, - date_removed=None, - ), - ], - date_first_scraped=datetime.date(2021, 12, 25), - ) - cumulative_playlist_json = cumulative_playlist.to_json() - self.assertEqual( - cumulative_playlist_json, - textwrap.dedent( - """\ - { - "date_first_scraped": "2021-12-25", - "description": "description", - "name": "playlist_name", - "tracks": [ - { - "album": { - "name": "album_name", - "url": "album_url" - }, - "artists": [ - { - "name": "artist_name", - "url": "artist_url" - } - ], - "date_added": "2021-12-27", - "date_added_asterisk": false, - "date_removed": "2021-12-29", - "duration_ms": 100001, - "name": "track_name", - "url": "track_url" - }, - { - "album": { - "name": "", - "url": "" - }, - "artists": [ - { - "name": "", - "url": "" - } - ], - "date_added": "2021-12-25", - "date_added_asterisk": true, - "date_removed": null, - "duration_ms": 0, - "name": "", - "url": "" - } - ], - "url": "playlist_url" - }""" - ), - ) - self.assertEqual( - cumulative_playlist, CumulativePlaylist.from_json(cumulative_playlist_json) - ) diff --git a/src/tests/test_spotify.py b/src/tests/test_spotify.py deleted file mode 100644 index 73ecd03b54854..0000000000000 --- a/src/tests/test_spotify.py +++ /dev/null @@ -1,730 +0,0 @@ -#!/usr/bin/env python3 - -from __future__ import annotations - -import asyncio -import copy -import datetime -from unittest import IsolatedAsyncioTestCase -from unittest.mock import AsyncMock, Mock, call, patch - -import aiohttp - -from alias import Alias -from plants.unittest_utils import UnittestUtils -from playlist_id import PlaylistID -from playlist_types import Album, Artist, Owner, Playlist, Track -from spotify import ( - FailedRequestError, - InvalidDataError, - RetryBudgetExceededError, - Spotify, -) - - -class MockSession(AsyncMock): - @classmethod - async def create(cls) -> MockSession: - mock_session = MockSession() - await mock_session._init() - return mock_session - - async def _init(self) -> None: - # AsyncMock objects beget other AsyncMock objects, but these methods - # are synchronous so we need initialize them explicitly - self.get = Mock(return_value=AsyncMock()) - self.post = Mock(return_value=AsyncMock()) - # Allow MockSession objects to be used as async context managers - async with self as session: - session.get = self.get - session.post = self.post - - -class SpotifyTestCase(IsolatedAsyncioTestCase): - async def asyncSetUp(self) -> None: - self.mock_session = await MockSession.create() - self.mock_get_session = UnittestUtils.patch( - self, - "spotify.Spotify._get_session", - # new_callable returns the replacement for get_session - new_callable=lambda: Mock(return_value=self.mock_session), - ) - self.mock_sleep = UnittestUtils.patch( - self, - "spotify.Spotify._sleep", - new_callable=AsyncMock, - ) - - -class TestGetWithRetry(SpotifyTestCase): - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_exception(self, mock_logger: Mock) -> None: - for type_ in [ - aiohttp.client_exceptions.ClientOSError, - asyncio.exceptions.TimeoutError, - ]: - self.mock_session.get.side_effect = type_ - spotify = Spotify("token") - with self.assertRaises(RetryBudgetExceededError): - await spotify.get_playlist(PlaylistID("abc123"), alias=None) - - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_invalid_response(self, mock_logger: Mock) -> None: - for data in ["", {}]: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = data - spotify = Spotify("token") - with self.assertRaises(RetryBudgetExceededError): - await spotify.get_playlist(PlaylistID("abc123"), alias=None) - - async def test_failed_request(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = {"error": ""} - spotify = Spotify("token") - with self.assertRaises(FailedRequestError): - await spotify.get_playlist(PlaylistID("abc123"), alias=None) - - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_server_unavailable(self, mock_logger: Mock) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.status = 500 - spotify = Spotify("token") - with self.assertRaises(RetryBudgetExceededError): - await spotify._get_with_retry("href") - - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_transient_server_error(self, mock_logger: Mock) -> None: - mock_responses = [AsyncMock(), AsyncMock()] - async with mock_responses[0] as mock_response: - mock_response.status = 500 - async with mock_responses[1] as mock_response: - mock_response.json.return_value = {"items": [], "next": ""} - self.mock_session.get.side_effect = mock_responses - spotify = Spotify("token") - await spotify._get_with_retry("href") - self.assertEqual(self.mock_session.get.call_count, 2) - self.mock_sleep.assert_called_once_with(1) - - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_rate_limited(self, mock_logger: Mock) -> None: - mock_responses = [AsyncMock(), AsyncMock()] - async with mock_responses[0] as mock_response: - mock_response.status = 429 - mock_response.headers = {"Retry-After": 4.2} - async with mock_responses[1] as mock_response: - mock_response.json.return_value = {"items": [], "next": ""} - self.mock_session.get.side_effect = mock_responses - spotify = Spotify("token") - await spotify._get_with_retry("href") - self.assertEqual(self.mock_session.get.call_count, 2) - self.mock_sleep.assert_called_once_with(5) - - -class TestShutdown(SpotifyTestCase): - async def test_success(self) -> None: - spotify = Spotify("token") - await spotify.shutdown() - self.mock_session.close.assert_called_once() - self.mock_sleep.assert_called_once_with(0) - - -class TestGetSpotifyUserPlaylistIDs(SpotifyTestCase): - async def test_invalid_data(self) -> None: - for data in [ - {"items": None}, - {"items": [None]}, - {"items": [{}]}, - {"items": [{"id": None}]}, - ]: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = data - spotify = Spotify("token") - self.assertEqual( - await spotify.get_spotify_user_playlist_ids(), - set(), - ) - - async def test_success(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.side_effect = [ - { - "items": [{"id": "a"}, {"id": "b"}], - "next": "next_url", - }, - { - "items": [{"id": "c"}, {"id": "d"}], - "next": "", - }, - ] - spotify = Spotify("token") - playlist_ids = await spotify.get_spotify_user_playlist_ids() - self.assertEqual(playlist_ids, {PlaylistID(x) for x in "abcd"}) - self.mock_session.get.assert_has_calls( - [ - call("https://api.spotify.com/v1/users/spotify/playlists?limit=50"), - call("next_url"), - ] - ) - - -class TestGetFeaturedPlaylistIDs(SpotifyTestCase): - async def test_invalid_data(self) -> None: - for data in [ - {"playlists": None}, - {"playlists": {}}, - {"playlists": {"items": None}}, - {"playlists": {"items": [None]}}, - {"playlists": {"items": [{}]}}, - {"playlists": {"items": [{"id": None}]}}, - ]: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = data - spotify = Spotify("token") - self.assertEqual( - await spotify.get_featured_playlist_ids(), - set(), - ) - - async def test_success(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.side_effect = [ - { - "playlists": { - "items": [{"id": "a"}, {"id": "b"}], - "next": "next_url", - }, - }, - { - "playlists": { - "items": [{"id": "c"}, {"id": "d"}], - "next": "", - }, - }, - ] - spotify = Spotify("token") - playlist_ids = await spotify.get_featured_playlist_ids() - self.assertEqual(playlist_ids, {PlaylistID(x) for x in "abcd"}) - self.mock_session.get.assert_has_calls( - [ - call("https://api.spotify.com/v1/browse/featured-playlists?limit=50"), - call("next_url"), - ] - ) - - -class TestGetCategoryPlaylistIDs(SpotifyTestCase): - async def test_invalid_data(self) -> None: - for side_effect in [ - # Invalid categories response - [{"categories": None}], - [{"categories": {}}], - [{"categories": {"items": None}}], - [{"categories": {"items": [None]}}], - [{"categories": {"items": [{}]}}], - [{"categories": {"items": [{"id": None}]}}], - # Valid categories response, invalid playlists response - [ - {"categories": {"items": [{"id": "a"}]}}, - {"playlists": None}, - ], - [ - {"categories": {"items": [{"id": "a"}]}}, - {"playlists": {}}, - ], - [ - {"categories": {"items": [{"id": "a"}]}}, - {"playlists": {"items": None}}, - ], - [ - {"categories": {"items": [{"id": "a"}]}}, - {"playlists": {"items": [None]}}, - ], - [ - {"categories": {"items": [{"id": "a"}]}}, - {"playlists": {"items": [{}]}}, - ], - [ - {"categories": {"items": [{"id": "a"}]}}, - {"playlists": {"items": [{"id": None}]}}, - ], - ]: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.side_effect = side_effect - spotify = Spotify("token") - self.assertEqual( - await spotify.get_category_playlist_ids(), - set(), - ) - - async def test_success(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.side_effect = UnittestUtils.side_effect( - [ - { - "categories": { - "items": [{"id": "category_1"}, {"id": "category_2"}], - "next": "next_category_url", - }, - }, - # Use category_3 to simulate FailedRequestError - { - "categories": { - "items": [{"id": "category_3"}], - "next": "", - }, - }, - # First playlists belonging to category_1 - { - "playlists": { - "items": [{"id": "a"}, {"id": "b"}], - "next": "next_playlists_url", - }, - }, - # More playlists belonging to category_1 - { - "playlists": { - "items": [{"id": "c"}], - "next": "", - }, - }, - # All playlists belonging to category_2 - { - "playlists": { - "items": [{"id": "d"}], - "next": "", - }, - }, - # category_3 doesn't actually exist - FailedRequestError(), - ] - ) - spotify = Spotify("token") - playlist_ids = await spotify.get_category_playlist_ids() - self.assertEqual(playlist_ids, {PlaylistID(x) for x in "abcd"}) - self.mock_session.get.assert_has_calls( - [ - call("https://api.spotify.com/v1/browse/categories?limit=50"), - call("next_category_url"), - call( - "https://api.spotify.com/v1/browse/categories/category_1/playlists?" - "limit=50" - ), - call("next_playlists_url"), - call( - "https://api.spotify.com/v1/browse/categories/category_2/playlists?" - "limit=50" - ), - call( - "https://api.spotify.com/v1/browse/categories/category_3/playlists?" - "limit=50" - ), - ] - ) - - -class TestGetPlaylist(SpotifyTestCase): - @patch("spotify.Spotify._get_tracks", new_callable=AsyncMock) - # pyre-fixme[30] - async def test_invalid_data(self, mock_get_tracks: Mock) -> None: - mock_get_tracks.return_value = [] - valid_data = { - "name": "playlist_name", - "description": "playlist_description", - "external_urls": { - "spotify": "playlist_url", - }, - "snapshot_id": "playlist_snapshot_id", - "followers": { - "total": 999, - }, - "owner": { - "display_name": "owner_name", - "external_urls": { - "spotify": "owner_url", - }, - }, - } - overrides = { - "name": ["", " ", "\n", None, 1], - "description": [None, 1], - "external_urls": [None, 1], - "external_urls.spotify": [1], - "snapshot_id": [None, 1], - "followers": [None, 1], - "followers.total": ["a"], - "owner": [None, 1], - "owner.display_name": [None, 1], - "owner.external_urls": [None, 1], - "owner.external_urls.spotify": [1], - } - for key, values in overrides.items(): - for value in values: - data = copy.deepcopy(valid_data) - ref = data - parts = [(int(x) if x.isdigit() else x) for x in key.split(".")] - for name in parts[:-1]: - ref = ref[name] - ref[parts[-1]] = value - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = data - spotify = Spotify("token") - with self.assertRaises(InvalidDataError): - await spotify.get_playlist(PlaylistID("abc123"), alias=None) - - @patch("spotify.Spotify._get_tracks", new_callable=AsyncMock) - async def test_nonempty_alias(self, mock_get_tracks: AsyncMock) -> None: - mock_get_tracks.return_value = [] - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "name": "playlist_name", - "description": "", - "external_urls": {}, - "snapshot_id": "", - "followers": { - "total": 0, - }, - "owner": { - "display_name": "owner_name", - "external_urls": {}, - }, - } - spotify = Spotify("token") - playlist = await spotify.get_playlist( - PlaylistID("abc123"), alias=Alias("alias") - ) - self.assertEqual(playlist.original_name, "alias") - self.assertEqual(playlist.unique_name, "alias") - - @patch("spotify.Spotify._get_tracks", new_callable=AsyncMock) - async def test_success(self, mock_get_tracks: AsyncMock) -> None: - track = Track( - url="track_url", - name="track_name", - album=Album( - url="album_url", - name="album_name", - ), - artists=[ - Artist( - url="artist_url", - name="artist_name", - ) - ], - duration_ms=100, - added_at=datetime.datetime(2021, 12, 25), - ) - mock_get_tracks.return_value = [track] - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "name": "playlist_name", - "description": "playlist_description", - "external_urls": { - "spotify": "playlist_url", - }, - "snapshot_id": "playlist_snapshot_id", - "followers": { - "total": 999, - }, - "owner": { - "display_name": "owner_name", - "external_urls": { - "spotify": "owner_url", - }, - }, - } - spotify = Spotify("token") - playlist = await spotify.get_playlist(PlaylistID("abc123"), alias=None) - self.assertEqual( - playlist, - Playlist( - url="playlist_url", - original_name="playlist_name", - unique_name="playlist_name", - description="playlist_description", - tracks=[track], - snapshot_id="playlist_snapshot_id", - num_followers=999, - owner=Owner( - url="owner_url", - name="owner_name", - ), - ), - ) - - -class TestGetTracks(SpotifyTestCase): - async def test_invalid_data(self) -> None: - valid_data = { - "items": [ - { - "track": { - "duration_ms": 456, - "name": "track_name", - "album": { - "name": "album_name", - "external_urls": { - "spotify": "album_url", - }, - }, - "artists": [ - { - "name": "artist_name", - "external_urls": { - "spotify": "artist_url", - }, - }, - ], - "external_urls": { - "spotify": "track_url", - }, - }, - "added_at": "2021-12-25T00:00:00Z", - }, - ], - "next": "", - } - overrides = { - "items": [None, 1], - "items.0": [None, 1], - "items.0.track": [1], - "items.0.track.external_urls": [None, 1], - "items.0.track.external_urls.spotify": [1], - "items.0.track.name": [None, 1], - "items.0.track.album": [None, 1], - "items.0.track.album.external_urls": [None, 1], - "items.0.track.album.external_urls.spotify": [1], - "items.0.track.album.name": [None, 1], - "items.0.track.artists": [None, 1], - "items.0.track.artists.0": [None, 1], - "items.0.track.artists.0.external_urls": [None, 1], - "items.0.track.artists.0.external_urls.spotify": [1], - "items.0.track.artists.0.name": [None, 1], - "items.0.track.duration_ms": [None, "a"], - "items.0.added_at": [1], - } - for key, values in overrides.items(): - for value in values: - data = copy.deepcopy(valid_data) - ref = data - parts = [(int(x) if x.isdigit() else x) for x in key.split(".")] - for name in parts[:-1]: - ref = ref[name] # pyre-fixme[6] - ref[parts[-1]] = value # pyre-fixme[16] - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = data - spotify = Spotify("token") - with self.assertRaises(InvalidDataError): - await spotify._get_tracks(PlaylistID("abc123")) - - async def test_empty_playlist(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "items": [], - "next": "", - } - spotify = Spotify("token") - tracks = await spotify._get_tracks(PlaylistID("abc123")) - self.assertEqual(tracks, []) - - async def test_empty_track(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "items": [{"track": {}}], - "next": "", - } - spotify = Spotify("token") - tracks = await spotify._get_tracks(PlaylistID("abc123")) - self.assertEqual(tracks, []) - - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_empty_track_url(self, logger: Mock) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "items": [{"track": {"external_urls": {"spotify": ""}}}], - "next": "", - } - spotify = Spotify("token") - tracks = await spotify._get_tracks(PlaylistID("abc123")) - self.assertEqual(tracks, []) - - # Patch the logger to suppress log spew - @patch("spotify.logger") - async def test_missing_info(self, logger: Mock) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "items": [ - { - "track": { - "duration_ms": 123, - "name": "", - "album": { - "name": "", - "external_urls": {}, - }, - "artists": [], - "external_urls": {"spotify": "track_url"}, - }, - "added_at": "1970-01-01T00:00:00Z", - }, - ], - "next": "", - } - spotify = Spotify("token") - tracks = await spotify._get_tracks(PlaylistID("abc123")) - self.assertEqual( - tracks, - [ - Track( - url="track_url", - name="", - album=Album( - url="", - name="", - ), - artists=[], - duration_ms=123, - added_at=None, - ) - ], - ) - - async def test_success(self) -> None: - async with self.mock_session.get.return_value as mock_response: - mock_response.json.return_value = { - "items": [ - { - "track": { - "duration_ms": 456, - "name": "track_name", - "album": { - "name": "album_name", - "external_urls": { - "spotify": "album_url", - }, - }, - "artists": [ - { - "name": "artist_name_1", - "external_urls": { - "spotify": "artist_url_1", - }, - }, - { - "name": "artist_name_2", - "external_urls": { - "spotify": "artist_url_2", - }, - }, - ], - "external_urls": { - "spotify": "track_url", - }, - }, - "added_at": "2021-12-25T00:00:00Z", - }, - ], - "next": "", - } - spotify = Spotify("token") - tracks = await spotify._get_tracks(PlaylistID("abc123")) - self.assertEqual( - tracks, - [ - Track( - url="track_url", - name="track_name", - album=Album( - url="album_url", - name="album_name", - ), - artists=[ - Artist( - name="artist_name_1", - url="artist_url_1", - ), - Artist( - name="artist_name_2", - url="artist_url_2", - ), - ], - duration_ms=456, - added_at=datetime.datetime(2021, 12, 25), - ) - ], - ) - - @patch("spotify.Spotify._get_tracks_href") - async def test_pagination(self, mock_get_tracks_href: Mock) -> None: - mock_get_tracks_href.side_effect = lambda x: x - async with self.mock_session.get.return_value as mock_response: - mock_response.json.side_effect = [ - {"items": [], "next": "b"}, - {"items": [], "next": "c"}, - {"items": [], "next": ""}, - ] - spotify = Spotify("token") - tracks = await spotify._get_tracks(PlaylistID("a")) - self.assertEqual(tracks, []) - self.mock_session.get.assert_has_calls( - [ - call("a"), - call("b"), - call("c"), - ] - ) - - -class TestGetAccessToken(SpotifyTestCase): - async def test_invalid_json(self) -> None: - async with self.mock_session.post.return_value as mock_response: - mock_response.json.side_effect = Exception - with self.assertRaises(InvalidDataError): - await Spotify.get_access_token("client_id", "client_secret") - - async def test_error(self) -> None: - async with self.mock_session.post.return_value as mock_response: - mock_response.json.return_value = { - "error": "something went wrong", - "access_token": "token", - "token_type": "Bearer", - } - with self.assertRaises(InvalidDataError): - await Spotify.get_access_token("client_id", "client_secret") - - async def test_invalid_access_token(self) -> None: - async with self.mock_session.post.return_value as mock_response: - mock_response.json.return_value = { - "access_token": "", - "token_type": "Bearer", - } - with self.assertRaises(InvalidDataError): - await Spotify.get_access_token("client_id", "client_secret") - - async def test_invalid_token_type(self) -> None: - async with self.mock_session.post.return_value as mock_response: - mock_response.json.return_value = { - "access_token": "token", - "token_type": "invalid", - } - with self.assertRaises(InvalidDataError): - await Spotify.get_access_token("client_id", "client_secret") - - async def test_success(self) -> None: - async with self.mock_session.post.return_value as mock_response: - mock_response.json.return_value = { - "access_token": "token", - "token_type": "Bearer", - } - token = await Spotify.get_access_token("client_id", "client_secret") - self.assertEqual(token, "token") - self.mock_session.post.assert_called_once_with( - url="https://accounts.spotify.com/api/token", - data={"grant_type": "client_credentials"}, - headers={"Authorization": "Basic Y2xpZW50X2lkOmNsaWVudF9zZWNyZXQ="}, - ) diff --git a/src/tests/test_url.py b/src/tests/test_url.py deleted file mode 100644 index 07c79f1c10c70..0000000000000 --- a/src/tests/test_url.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python3 - -from unittest import TestCase - -from playlist_id import PlaylistID -from url import URL - -URL.HISTORY_BASE = "base" - - -class TestPlainHistory(TestCase): - def test_success(self) -> None: - self.assertEqual(URL.plain_history(PlaylistID("foo")), "base/plain/foo") - - -class TestPlain(TestCase): - def test_success(self) -> None: - self.assertEqual(URL.plain(PlaylistID("foo")), "/playlists/plain/foo") - - -class TestPretty(TestCase): - def test_success(self) -> None: - self.assertEqual(URL.pretty(PlaylistID("foo")), "/playlists/pretty/foo.md") - - -class TestCumulative(TestCase): - def test_success(self) -> None: - self.assertEqual( - URL.cumulative(PlaylistID("foo")), "/playlists/cumulative/foo.md" - ) diff --git a/src/url.py b/src/url.py deleted file mode 100644 index e98dffb0d9d71..0000000000000 --- a/src/url.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python3 - -from playlist_id import PlaylistID - - -class URL: - - BASE = "/playlists" - HISTORY_BASE = ( - "https://github.githistory.xyz/mackorone/spotify-playlist-archive/" - "blob/main/playlists" - ) - - @classmethod - def plain_history(cls, playlist_id: PlaylistID) -> str: - return cls.HISTORY_BASE + f"/plain/{playlist_id}" - - @classmethod - def plain(cls, playlist_id: PlaylistID) -> str: - return cls.BASE + f"/plain/{playlist_id}" - - @classmethod - def pretty(cls, playlist_id: PlaylistID) -> str: - return cls.BASE + f"/pretty/{playlist_id}.md" - - @classmethod - def cumulative(cls, playlist_id: PlaylistID) -> str: - return cls.BASE + f"/cumulative/{playlist_id}.md"