From 3d1f44be88f928f82d46d8d80e7fbeea4012f886 Mon Sep 17 00:00:00 2001 From: Jakub Suchenek Date: Sun, 26 Nov 2023 04:48:13 +0100 Subject: [PATCH 1/3] Update project structure Split code to be more re-usable. --- add_playlist.py | 104 ---------------------------------- yt_api/__init__.py | 0 yt_api/add_playlist.py | 77 +++++++++++++++++++++++++ yt_api/authentication.py | 53 +++++++++++++++++ yt_api/models/__init__.py | 3 + yt_api/models/localization.py | 11 ++++ yt_api/models/thumbnail.py | 12 ++++ 7 files changed, 156 insertions(+), 104 deletions(-) delete mode 100644 add_playlist.py create mode 100644 yt_api/__init__.py create mode 100644 yt_api/add_playlist.py create mode 100644 yt_api/authentication.py create mode 100644 yt_api/models/__init__.py create mode 100644 yt_api/models/localization.py create mode 100644 yt_api/models/thumbnail.py diff --git a/add_playlist.py b/add_playlist.py deleted file mode 100644 index bd7313b..0000000 --- a/add_playlist.py +++ /dev/null @@ -1,104 +0,0 @@ -""" -This code sample creates a private playlist in the authorizing user's -YouTube channel. -Usage: - python playlist_updates.py --title= --description=<DESCRIPTION> -""" - -from argparse import ArgumentParser, Namespace -from pathlib import Path -from pickle import load as pickle_loads, dump as pickle_dump - -from google.auth.transport.requests import Request -from google.oauth2.credentials import Credentials -from googleapiclient.discovery import build, Resource -from googleapiclient.errors import HttpError -from google_auth_oauthlib.flow import InstalledAppFlow - - -CREDENTIALS_FILE = 'spoyt_credentials.pickle' - -CLIENT_SECRETS_FILE = 'spoyt_o2.json' -SCOPES = ['https://www.googleapis.com/auth/youtube'] -API_SERVICE_NAME = 'youtube' -API_VERSION = 'v3' -DEVELOPER_KEY = '632456508771-criin7m1g53l3e3s90pv6o6sgo9gdphj.apps.googleusercontent.com' - - -def get_authenticated_service() -> Resource: - """Authorize the request and store authorization credentials.""" - credentials: Credentials = None - - # Try to load credentials from file - if (path := Path(CREDENTIALS_FILE)).exists(): - with open(path, 'rb') as f: - credentials = pickle_loads(f) - - if not credentials or not credentials.valid: - # Try to refresh credentials - if credentials and credentials.expired and credentials.refresh_token: - credentials.refresh(Request()) - # Obtain new credentials - else: - flow: InstalledAppFlow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES) - credentials: Credentials = flow.run_local_server() - - # Save credentials to keyring - if not path.exists(): - with open(path, 'x'): - pass - with open(path, 'wb') as f: - pickle_dump(credentials, f) - - return build( - API_SERVICE_NAME, - API_VERSION, - credentials=credentials, - developerKey=DEVELOPER_KEY - ) - - -def add_playlist(youtube: Resource, args: Namespace) -> str: - """Creates playlist then returns it's ID.""" - body = dict( - snippet=dict( - title=args.title, - description=args.description - ), - status=dict( - privacyStatus='private' - ) - ) - - playlists_insert_response: dict = youtube.playlists().insert( - part='snippet,status', - body=body - ).execute() - - return playlists_insert_response.get('id') - - -if __name__ == '__main__': - # Parse arguments - parser = ArgumentParser() - parser.add_argument( - '--title', - default='Test Playlist', - help='The title of the new playlist.' - ) - parser.add_argument( - '--description', - default='A private playlist created with the YouTube Data API.', - help='The description of the new playlist.' - ) - args = parser.parse_args() - - # Connect to YouTube - youtube = get_authenticated_service() - - # Create playlist - try: - new_playlist = add_playlist(youtube, args) - print(f'New playlist ID: {new_playlist}') - except HttpError as e: - print('An HTTP error {0.resp.status} occurred: {0.content}'.format(e)) diff --git a/yt_api/__init__.py b/yt_api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/yt_api/add_playlist.py b/yt_api/add_playlist.py new file mode 100644 index 0000000..10cf7a5 --- /dev/null +++ b/yt_api/add_playlist.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +""" +Creates a private playlist in the authorizing user's YouTube channel. +""" +from googleapiclient.discovery import Resource +from googleapiclient.errors import HttpError + +from authentication import get_authenticated_service +from models import Localization, Thumbnail + + +class Playlist: + def __init__(self, payload: dict): + self.kind: str = payload.get('kind') + self.etag: str = payload.get('etag') + self.id: str = payload.get('id') + + snippet: dict = payload.get('snippet', {}) + self.published_at: str = snippet.get('publishedAt') + self.channel_id: str = snippet.get('channelId') + self.title: str = snippet.get('title') + self.description: str = snippet.get('description') + self.thumbnails: list[Thumbnail] = Thumbnail.list_from_payload( + snippet.get('thumbnails') + ) + self.channel_title: str = snippet.get('channelTitle') + self.default_language: str = snippet.get('defaultLanguage') + localized: dict = snippet.get('localized', {}) + self.localized_title: str = localized.get('title') + self.localized_description: str = localized.get('description') + + status: dict = payload.get('status', {}) + self.privacy_status: str = status.get('privacyStatus') + + content_details: dict = payload.get('contentDetails', {}) + self.item_count: int = int(content_details.get('itemCount', 0)) or None + + player: dict = payload.get('player', {}) + self.embed_html: str = player.get('embedHtml') + + self.localizations: list[Localization] = Localization.list_from_payload( + payload.get('localizations', {}) + ) + + + +def add_playlist(youtube: Resource, title: str, description: str) -> Playlist: + """Creates playlist then returns it as `Playlist`.""" + response: dict = youtube.playlists().insert( + part = 'snippet,status', + body = dict( + snippet = dict( + title = title, + description = description + ), + status = dict( + privacyStatus = 'private' + ) + ) + ).execute() + + return Playlist(response) + + +if __name__ == '__main__': + # Connect to YouTube + youtube = get_authenticated_service() + + playlist_title = 'Test playlist' + playlist_description = 'Playlist created with YouTube API.' + + # Create playlist + try: + new_playlist = add_playlist(youtube, playlist_title, playlist_description) + print(f'New playlist ID: {new_playlist.id}') + except HttpError as e: + print('An HTTP error {0.resp.status} occurred: {0.content}'.format(e)) diff --git a/yt_api/authentication.py b/yt_api/authentication.py new file mode 100644 index 0000000..8670c57 --- /dev/null +++ b/yt_api/authentication.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +from os import getenv +from pathlib import Path +from pickle import load as pickle_loads, dump as pickle_dump + +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build, Resource +from google_auth_oauthlib.flow import InstalledAppFlow + + +CREDENTIALS_FILE = 'spoyt_credentials.pickle' +CLIENT_SECRETS_FILE = 'client_secret.json' +SCOPES = ['https://www.googleapis.com/auth/youtube'] +API_SERVICE_NAME = 'youtube' +API_VERSION = 'v3' +DEVELOPER_KEY = getenv('DEVELOPER_KEY') + + +def get_authenticated_service() -> Resource: + """Authorize the request and store authorization credentials.""" + credentials: Credentials = None + + # Try to load credentials from file + if (path := Path(CREDENTIALS_FILE)).exists(): + with open(path, 'rb') as f: + credentials = pickle_loads(f) + + if not credentials or not credentials.valid: + # Try to refresh credentials + if credentials and credentials.expired and credentials.refresh_token: + credentials.refresh(Request()) + # Obtain new credentials + else: + flow: InstalledAppFlow = InstalledAppFlow.from_client_secrets_file( + CLIENT_SECRETS_FILE, + SCOPES + ) + credentials: Credentials = flow.run_local_server() + + # Save credentials to keyring + if not path.exists(): + with open(path, 'x'): + pass + with open(path, 'wb') as f: + pickle_dump(credentials, f) + + return build( + API_SERVICE_NAME, + API_VERSION, + credentials=credentials, + developerKey=DEVELOPER_KEY + ) diff --git a/yt_api/models/__init__.py b/yt_api/models/__init__.py new file mode 100644 index 0000000..481b9bd --- /dev/null +++ b/yt_api/models/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +from .localization import Localization +from .thumbnail import Thumbnail diff --git a/yt_api/models/localization.py b/yt_api/models/localization.py new file mode 100644 index 0000000..c073c38 --- /dev/null +++ b/yt_api/models/localization.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +class Localization: + def __init__(self, name: str, payload: dict) -> None: + self.name: str = name + self.title: str = payload.get('title') + self.description: str = payload.get('description') + + @staticmethod + def list_from_payload(payload: dict[str, dict[str, str]]) -> list['Localization']: + """Models YouTube API response of 'localizations'.""" + return [Localization(l, payload[l]) for l in payload] diff --git a/yt_api/models/thumbnail.py b/yt_api/models/thumbnail.py new file mode 100644 index 0000000..4234e9a --- /dev/null +++ b/yt_api/models/thumbnail.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +class Thumbnail: + def __init__(self, name: str, payload: dict) -> None: + self.name: str = name + self.url: str = payload.get('url') + self.width: int = int(payload.get('width', 0)) or None + self.height: int = int(payload.get('height', 0)) or None + + @staticmethod + def list_from_payload(payload: dict[str, dict[str, str | str, int]]) -> list['Thumbnail']: + """Models YouTube API response of 'thumbails'.""" + return [Thumbnail(t, payload[t]) for t in payload] From 4ba2dbf3402e22c4813fa6e2488b4f6e84257d19 Mon Sep 17 00:00:00 2001 From: Jakub Suchenek <jakub.suchenek.25@gmail.com> Date: Sun, 26 Nov 2023 04:48:20 +0100 Subject: [PATCH 2/3] Create add_video.py --- yt_api/add_video.py | 85 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 yt_api/add_video.py diff --git a/yt_api/add_video.py b/yt_api/add_video.py new file mode 100644 index 0000000..b4d3917 --- /dev/null +++ b/yt_api/add_video.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" +Adds video to playlist. +""" +from json import loads as json_loads + +from googleapiclient.discovery import Resource +from googleapiclient.errors import HttpError + +from authentication import get_authenticated_service +from models import Thumbnail + + +class PlaylistItem: + def __init__(self, payload: dict) -> None: + self.kind: str = payload.get('kind') + self.etag: str = payload.get('etag') + self.id: str = payload.get('id') + + snippet: dict = payload.get('snippet', {}) + self.published_at: str = snippet.get('publishedAt') + self.channel_id: str = snippet.get('channelId') + self.title: str = snippet.get('title') + self.description: str = snippet.get('description') + self.thumbnails: list[Thumbnail] = Thumbnail.list_from_payload( + payload.get('thumbnails') + ) + self.channel_title: str = snippet.get('channelTitle') + self.video_owner_channel_title: str = snippet.get('videoOwnerChannelTitle') + self.video_owner_channel_id: str = snippet.get('videoOwnerChannelId') + self.playlist_id: str = snippet.get('playlistId') + self.position: int = snippet.get('position') + + resource_id: dict = snippet.get('resourceId') + self.resource_kind: str = resource_id.get('kind') + self.resource_video_id: str = resource_id.get('videoId') + + content_details: dict = payload.get('contentDetails', {}) + self.video_id: str = content_details.get('videoId') + self.start_at: str = content_details.get('startAt') + self.end_at: str = content_details.get('endAt') + self.note: str = content_details.get('note') + self.video_published_at: str = content_details.get('videoPublishedAt') + + status: dict = payload.get('status', {}) + self.privacy_status: str = status.get('privacyStatus') + + + + +def add_video(youtube: Resource, playlist_id: str, video_id: str) -> bool: + """Adds video to playlist by IDs and return it as `PlaylistItem`.""" + response: dict = youtube.playlistItems().insert( + part='snippet', + body={ + 'snippet': { + 'playlistId': playlist_id, + 'resourceId': { + 'kind': 'youtube#video', + 'videoId': video_id + } + } + } + ).execute() + + return PlaylistItem(response) + + +if __name__ == '__main__': + # Connect to YouTube + youtube = get_authenticated_service() + + playlist_id = 'PLHpqNKpoWiNX5Qbl0syonrL3El3Ojt9Kf' + video_id = 'dQw4w9WgXcQ' + + try: + playlist_item = add_video(youtube, playlist_id, video_id) + if playlist_item: + print(f'Video ID: {video_id} added to playlist ID {playlist_id}') + else: + print('Adding failed') + except HttpError as e: + code = e.resp.status + message = json_loads(e.content).get('error', {}).get('message', 'Unknown error.') + print(f'Error {code}: {message}') From be99fbc60e50cf46e14925eb0386c08d255e39af Mon Sep 17 00:00:00 2001 From: Jakub Suchenek <jakub.suchenek.25@gmail.com> Date: Sun, 26 Nov 2023 04:48:52 +0100 Subject: [PATCH 3/3] Create database connection --- requirements.txt | 3 ++ yt_api/database/__init__.py | 3 ++ yt_api/database/core.py | 19 ++++++++++ yt_api/database/playlist.py | 69 +++++++++++++++++++++++++++++++++++++ yt_api/database/track.py | 69 +++++++++++++++++++++++++++++++++++++ 5 files changed, 163 insertions(+) create mode 100644 yt_api/database/__init__.py create mode 100644 yt_api/database/core.py create mode 100644 yt_api/database/playlist.py create mode 100644 yt_api/database/track.py diff --git a/requirements.txt b/requirements.txt index 7c68661..9354ca1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,6 @@ google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client +rich +sqlalchemy +pymysql diff --git a/yt_api/database/__init__.py b/yt_api/database/__init__.py new file mode 100644 index 0000000..7e3cb21 --- /dev/null +++ b/yt_api/database/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +from .playlist import Playlist, test_playlist +from .track import Track, test_track diff --git a/yt_api/database/core.py b/yt_api/database/core.py new file mode 100644 index 0000000..b1ef7fa --- /dev/null +++ b/yt_api/database/core.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from os import getenv + +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session, sessionmaker + + +CONNECTION_STRING = getenv('CONNECTION_STRING') + + +Base = declarative_base() + + +def create_session() -> Session: + engine = create_engine(CONNECTION_STRING) + Base.metadata.create_all(engine) + session = sessionmaker(bind=engine) + return session() diff --git a/yt_api/database/playlist.py b/yt_api/database/playlist.py new file mode 100644 index 0000000..bacfaea --- /dev/null +++ b/yt_api/database/playlist.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +from sqlalchemy import Column, inspect, Integer, String +from sqlalchemy.orm import Session + +from core import Base, create_session + + +class Playlist(Base): + __tablename__ = 'playlists' + + id = Column(Integer, primary_key=True, autoincrement=True, nullable=False) + spotify_id = Column(String(22), nullable=False) + youtube_id = Column(String(34), nullable=False) + + @classmethod + def add_playlist(cls, *, spotify_id: str, youtube_id: str) -> 'Playlist': + with create_session() as session: + new_playlist = cls(spotify_id=spotify_id, youtube_id=youtube_id) + session.add(new_playlist) + session.commit() + return new_playlist + + @classmethod + def remove_playlist(cls, *, spotify_id: str = None, youtube_id: str = None) -> None: + if not spotify_id and not youtube_id: + return + with create_session() as session: + if spotify_id: + playlist_to_remove = session.query(cls).filter_by(spotify_id=spotify_id).first() + elif youtube_id: + playlist_to_remove = session.query(cls).filter_by(youtube_id=youtube_id).first() + session.delete(playlist_to_remove) + session.commit() + + @classmethod + def get_playlist(cls, *, spotify_id: str = None, youtube_id: str = None) -> 'Playlist': + if not spotify_id and not youtube_id: + return None + with create_session() as session: + if spotify_id: + playlist = session.query(cls).filter_by(spotify_id=spotify_id).first() + elif youtube_id: + playlist = session.query(cls).filter_by(youtube_id=youtube_id).first() + return playlist + + +def test_playlist(session: Session) -> bool: + inspector = inspect(session.bind) + return 'playlists' in inspector.get_table_names() + + +if __name__ == '__main__': + from logging import basicConfig, getLogger, INFO + from rich.logging import RichHandler + + basicConfig( + level=INFO, + format='%(message)s', + datefmt='[%X]', + handlers=[RichHandler(markup=True, rich_tracebacks=True)] + ) + log = getLogger('sqlalchemy.engine') + log.setLevel(INFO) + + session = create_session() + if (valid := test_playlist(session)): + log.info('The `playlists` table exists and is correctly structured.') + else: + log.error('The `playlists` table does not exist or is not correctly structured.') diff --git a/yt_api/database/track.py b/yt_api/database/track.py new file mode 100644 index 0000000..adbebf8 --- /dev/null +++ b/yt_api/database/track.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +from sqlalchemy import Column, inspect, Integer, String +from sqlalchemy.orm import Session + +from core import Base, create_session + + +class Track(Base): + __tablename__ = 'tracks' + + id = Column(Integer, primary_key=True, nullable=False) + spotify_id = Column(String(22), nullable=False) + youtube_id = Column(String(11), nullable=False) + + @classmethod + def add_track(cls, *, spotify_id: str, youtube_id: str) -> 'Track': + with create_session() as session: + new_track = cls(spotify_id=spotify_id, youtube_id=youtube_id) + session.add(new_track) + session.commit() + return new_track + + @classmethod + def remove_track(cls, *, spotify_id: str = None, youtube_id: str = None) -> None: + if not spotify_id and not youtube_id: + return + with create_session() as session: + if spotify_id: + track_to_remove = session.query(cls).filter_by(spotify_id=spotify_id).first() + elif youtube_id: + track_to_remove = session.query(cls).filter_by(youtube_id=youtube_id).first() + session.delete(track_to_remove) + session.commit() + + @classmethod + def get_track(cls, *, spotify_id: str = None, youtube_id: str = None) -> 'Track': + if not spotify_id and not youtube_id: + return None + with create_session() as session: + if spotify_id: + track = session.query(cls).filter_by(spotify_id=spotify_id).first() + elif youtube_id: + track = session.query(cls).filter_by(youtube_id=youtube_id).first() + return track + + +def test_track(session: Session) -> bool: + inspector = inspect(session.bind) + return 'tracks' in inspector.get_table_names() + + +if __name__ == '__main__': + from logging import basicConfig, getLogger, INFO + from rich.logging import RichHandler + + basicConfig( + level=INFO, + format='%(message)s', + datefmt='[%X]', + handlers=[RichHandler(markup=True, rich_tracebacks=True)] + ) + log = getLogger('sqlalchemy.engine') + log.setLevel(INFO) + + session = create_session() + if (valid := test_track(session)): + log.info('The `tracks` table exists and is correctly structured.') + else: + log.error('The `tracks` table does not exist or is not correctly structured.')