-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
124 lines (97 loc) · 4.05 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import sqlalchemy
import pandas as pd
from sqlalchemy.orm import sessionmaker
import requests
import json
from datetime import datetime
import datetime
import sqlite3
import os

# SQLAlchemy URL of the SQLite database that stores the played tracks.
DATABASE_LOCATION = "sqlite:///my_played_tracks.sqlite"

# Spotify credentials. Both values can be overridden through the environment
# (SPOTIFY_USER_ID / SPOTIFY_TOKEN) so that a fresh token can be supplied
# without editing this file; the literals below are only fallbacks.
# NOTE(review): a real token should not live in source control - prefer the
# environment variable and rotate any token that has been committed.
USER_ID = os.environ.get("SPOTIFY_USER_ID", "okwdd5okkkshk6nueoewmsn0r")  # your Spotify username
TOKEN = os.environ.get(
    "SPOTIFY_TOKEN",
    "BQDkgp5LUBL8ROgQ1kaT-nOHaQRTvq9G2F8Chnc2KXCGClUmkptKDIdQjZ5ErbdoeVwZ_hkUErerwaZlM3Pac2OGGw1KoJnjZ9zOTzDn-K3oEYvEBrz2Lu46FSSznuIiDKBmnSaxg4kLyZY-CD_jTpbYu0-8Opl_l_gO",
)  # your Spotify API token
# Generate your token here: https://developer.spotify.com/console/get-recently-played/
# Note: You need a Spotify account (can be easily created for free)
def check_if_valid_data(df: pd.DataFrame) -> bool:
    """Validate the extracted tracks before they are loaded.

    Args:
        df: Tracks to validate. Unless it is empty it must contain a
            ``played_at`` column, which is used as the primary key.

    Returns:
        ``False`` when ``df`` is empty (nothing to load), ``True`` when
        every check passes.

    Raises:
        Exception: If ``played_at`` contains duplicates, or if any cell in
            the dataframe is null.
    """
    # Check if dataframe is empty
    if df.empty:
        print("No songs downloaded. Finishing execution")
        return False

    # Primary Key Check
    if not df["played_at"].is_unique:
        raise Exception("Primary Key check is violated")

    # Check for nulls
    if df.isnull().values.any():
        raise Exception("Null values found")

    # # Check that all timestamps are of yesterday's date
    # yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    # yesterday = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
    #
    # timestamps = df["timestamp"].tolist()
    # for timestamp in timestamps:
    #     if datetime.datetime.strptime(timestamp, '%Y-%m-%d') != yesterday:
    #         raise Exception("At least one of the returned songs does not have a yesterday's timestamp")

    # Must stay live: callers test the result for truthiness, so falling off
    # the end (returning None) would make valid data look invalid.
    return True
if __name__ == "__main__":
    # ---- Extract part of the ETL process ----
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": "Bearer {token}".format(token=TOKEN),
    }

    # Convert time to Unix timestamp in milliseconds
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=1)
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

    # Download all songs you've listened to "after yesterday", which means in the last 24 hours
    r = requests.get(
        "https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp),
        headers=headers,
        timeout=30,  # never hang forever on an unresponsive server
    )
    # Fail loudly on HTTP errors (e.g. an expired token) instead of hitting
    # a confusing KeyError on data["items"] below.
    r.raise_for_status()
    data = r.json()

    # ---- Transform: keep only the relevant bits of the JSON payload ----
    items = data["items"]
    # Prepare a dictionary in order to turn it into a pandas dataframe below
    song_dict = {
        "song_name": [item["track"]["name"] for item in items],
        "artist_name": [item["track"]["album"]["artists"][0]["name"] for item in items],
        "played_at": [item["played_at"] for item in items],
        # First 10 characters of played_at, stored as the "timestamp" column
        "timestamp": [item["played_at"][0:10] for item in items],
    }

    song_df = pd.DataFrame(song_dict, columns=["song_name", "artist_name", "played_at", "timestamp"])
    print(song_df)

    # ---- Validate ----
    if check_if_valid_data(song_df):
        print("Data valid, proceed to Load stage")

    # ---- Load ----
    engine = sqlalchemy.create_engine(DATABASE_LOCATION)
    conn = sqlite3.connect('my_played_tracks.sqlite')
    try:
        cursor = conn.cursor()

        sql_query = """
        CREATE TABLE IF NOT EXISTS my_played_tracks(
            song_name VARCHAR(200),
            artist_name VARCHAR(200),
            played_at VARCHAR(200),
            timestamp VARCHAR(200),
            CONSTRAINT primary_key_constraint PRIMARY KEY (played_at)
        )
        """

        cursor.execute(sql_query)
        # Make sure the table is persisted before the engine writes to it.
        conn.commit()
        print("Opened database successfully")

        try:
            song_df.to_sql("my_played_tracks", engine, index=False, if_exists='append')
        except sqlalchemy.exc.IntegrityError:
            # Only a primary-key clash means the rows were loaded before;
            # any other failure propagates instead of being mislabelled.
            print("Data already exists in the database")
    finally:
        # Release the connection even when the load fails.
        conn.close()
    print("Close database successfully")
# Job scheduling
# For the scheduling in Airflow, refer to files in the dag folder