forked from wdr1/itunes2spotify
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspotifyFunctions.py
169 lines (128 loc) · 4.77 KB
/
spotifyFunctions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import sys
import shelve
import time
import logging
import random
import spotipy
import spotipy.util as util
# Spotify account name and client; the client is assigned by getUserToken().
username = ""
spotifyObject = None

# Mapping whose values are used as Spotify search field names by
# trackDict2SpotifySearchString(). It is not populated anywhere in the
# visible code (presumably set by the importing script -- TODO confirm).
iTunes2spotifyMapping = None

# Search-string -> URI cache, opened lazily as a shelve at `uriFilename`.
uriFilename = ".uricache"
uriCache = None

# Batch size for each add-tracks request, and the retry budget for API calls.
maxSongsPerAddTracksCall = 20
maxRetryAttempts = 3

# Minimum spacing between Spotify requests (milliseconds), and the timestamp
# (also milliseconds) recorded by jankyRateLimiting().
jankyRateLimitingWaitTime = 2 * 1000
jankyRateLimitingLastRequestTime = None

# Running counters updated by tracks2SpotifyURIs().
totalSongs = totalSongsNotFound = 0
def getUserToken():
    """Create the module-wide Spotify client for the configured ``username``.

    Builds a ``spotipy.SpotifyOAuth`` auth manager requesting the
    ``user-library-read`` and ``playlist-modify-private`` scopes, wraps it in
    a ``spotipy.Spotify`` client, and stores that client in the global
    ``spotifyObject``. Returns nothing.
    """
    global spotifyObject

    requestedScope = 'user-library-read playlist-modify-private'
    # The auth manager replaces the older util.prompt_for_user_token() flow.
    authManager = spotipy.SpotifyOAuth(scope=requestedScope, username=username)
    spotifyObject = spotipy.Spotify(auth_manager=authManager)
def trackDict2SpotifySearchString(trackDict):
    """Build a Spotify field-filter query string from a track dict.

    For every value of the global ``iTunes2spotifyMapping`` (in mapping
    order) that is also a key of ``trackDict``, a ``field:"value" `` clause
    is emitted. Clauses are concatenated as-is, so a non-empty result ends
    with a trailing space. Returns ``""`` when no mapped field is present.
    """
    clauses = [
        spotifyKey + ':"' + trackDict[spotifyKey] + '" '
        for spotifyKey in iTunes2spotifyMapping.values()
        if spotifyKey in trackDict
    ]
    return "".join(clauses)
# Check if we already have the URI. When we don't, retrieve and cache it,
# while rate-limiting so Spotify doesn't get cranky.
def tracks2SpotifyURIs(tracks):
    """Resolve track dicts to Spotify URIs, using the on-disk URI cache.

    Each track is first looked up with all of its fields. If that finds
    nothing and the track has an ``'album'`` field, it is looked up again
    without the album. The global counters ``totalSongs`` and
    ``totalSongsNotFound`` are updated as tracks are processed.

    The shelve cache at ``uriFilename`` is opened on demand, synced after
    every hit, and always closed before returning. The global ``uriCache``
    is reset to ``None`` so a later call reopens it instead of touching a
    closed shelf.

    Args:
        tracks: iterable of dicts keyed by Spotify search field name.
            The dicts are not modified.

    Returns:
        List of URIs for the tracks that were found, in input order.
    """
    global uriCache, totalSongs, totalSongsNotFound

    if uriCache is None:
        uriCache = shelve.open(uriFilename)

    results = []
    try:
        for t in tracks:
            totalSongs += 1

            # Attempt to search by all criteria
            songURI = track2SpotifyURIs(t, uriCache)
            searchString = trackDict2SpotifySearchString(t)

            # Attempt to search by just track + artist. Work on a copy so the
            # caller's dict keeps its album; skip when there is no album,
            # because the query would be identical to the first one.
            if songURI is None and 'album' in t:
                withoutAlbum = dict(t)
                del withoutAlbum['album']
                songURI = track2SpotifyURIs(withoutAlbum, uriCache)
                if songURI is not None:
                    logging.warning("I: Fallback on '%s'", searchString)

            if songURI is not None:
                results.append(songURI)
                # Persist each hit right away so an interrupted run keeps it.
                uriCache.sync()
            else:
                totalSongsNotFound += 1
                logging.error("MISSING: Couldn't find '%s'", searchString)
    finally:
        uriCache.close()
        uriCache = None

    return results
def track2SpotifyURIs(track, cache):
    """Return the Spotify URI for one track, consulting ``cache`` first.

    The cache key is the search string built from ``track``. On a miss the
    call is rate-limited, Spotify is queried via ``findSpotifyURI``, and a
    successful result is stored in ``cache``. Misses are not cached.

    Args:
        track: dict keyed by Spotify search field name.
        cache: mutable mapping (a shelve in this module) from search string
            to URI.

    Returns:
        The URI string, or ``None`` if the track was not found.
    """
    searchString = str(trackDict2SpotifySearchString(track))

    # Test membership on the mapping itself; building a list of every cached
    # key for each lookup is needlessly linear in the cache size.
    if searchString in cache:
        return cache[searchString]

    jankyRateLimiting()
    songURI = findSpotifyURI(track)
    if songURI is not None:
        cache[searchString] = songURI
    return songURI
# Make sure we don't hammer Spotify
def jankyRateLimiting():
    """Block until at least ``jankyRateLimitingWaitTime`` ms have passed.

    The interval is measured from the timestamp stored in the global
    ``jankyRateLimitingLastRequestTime`` (milliseconds, from ``time.time()``).
    The first call only records the current time. After any sleep the
    timestamp is re-read, so the stored value is when the caller is actually
    released. Recording the pre-sleep time would let the next call through
    too early.
    """
    global jankyRateLimitingLastRequestTime

    now = int(round(time.time() * 1000))
    if jankyRateLimitingLastRequestTime is not None:
        remaining = jankyRateLimitingWaitTime - (now - jankyRateLimitingLastRequestTime)
        # A wall clock that jumped backwards must not stretch the wait
        # beyond one full interval.
        remaining = min(remaining, jankyRateLimitingWaitTime)
        if remaining > 0:
            time.sleep(remaining / 1000.0)
            now = int(round(time.time() * 1000))
    jankyRateLimitingLastRequestTime = now
def findSpotifyURI(trackDict):
    """Search Spotify for a track and return the URI of the first match.

    The search is attempted up to ``maxRetryAttempts`` times. A failed
    attempt is logged and, before the next one, ``jankyRateLimiting()`` is
    called so retries are paced like every other request.

    Args:
        trackDict: dict keyed by Spotify search field name.

    Returns:
        The URI of the first track in the search results, or ``None`` when
        every attempt failed or the search returned no tracks.
    """
    searchString = trackDict2SpotifySearchString(trackDict)

    results = None
    for attempt in range(1, maxRetryAttempts + 1):
        if attempt > 1:
            jankyRateLimiting()
        logging.debug("Looking for '%s'... (%d)", searchString, attempt)
        try:
            results = spotifyObject.search(q=searchString, type='track')
            break
        except Exception as exc:
            # Catch Exception rather than everything, so Ctrl-C and
            # SystemExit still propagate.
            logging.error("Unexpected error: %s: %s", type(exc).__name__, exc)

    if results is None:
        return None

    foundTracks = results['tracks']
    if foundTracks['total'] == 0 or not foundTracks['items']:
        return None
    return foundTracks['items'][0]['uri']
def createPlaylist(playlistName):
    """Create a non-public playlist for ``username`` and return its id.

    Args:
        playlistName: name given to the new playlist.

    Returns:
        The ``"id"`` field of the playlist object returned by Spotify.
    """
    playlistObject = spotifyObject.user_playlist_create(username, playlistName, public=False)
    # Log only after the API call has returned, so the message is never
    # emitted for a creation that raised.
    logging.info("Created playlist '%s'", playlistName)
    return playlistObject["id"]
def chunker(seq, size):
    """Lazily yield consecutive slices of ``seq`` of at most ``size`` items.

    The start offsets are computed when the function is called, so an
    invalid ``size`` or an unsized ``seq`` fails immediately. The slices
    themselves are produced on iteration. The last slice may be shorter.
    """
    offsets = range(0, len(seq), size)

    def _slices():
        for start in offsets:
            yield seq[start:start + size]

    return _slices()
def addSpotifyURIstoPlaylist(playlistId, songUris):
    """Add URIs to a playlist in batches of ``maxSongsPerAddTracksCall``.

    Each batch is attempted up to ``maxRetryAttempts`` times, with
    ``jankyRateLimiting()`` called before every attempt. A batch that still
    fails is logged and skipped, so the remaining batches are still added.

    Args:
        playlistId: id of the target playlist.
        songUris: sequence of Spotify track URIs.
    """
    for group in chunker(songUris, maxSongsPerAddTracksCall):
        for attempt in range(1, maxRetryAttempts + 1):
            jankyRateLimiting()
            logging.info("Adding %d tracks to playlist (%d)...", len(group), attempt)
            try:
                spotifyObject.user_playlist_add_tracks(username, playlistId, group)
                break
            except Exception as exc:
                # Catch Exception rather than everything, so Ctrl-C and
                # SystemExit still propagate.
                logging.error("Unexpected error: %s: %s", type(exc).__name__, exc)
        else:
            # Retries exhausted: say so instead of dropping the batch silently.
            logging.error("Giving up on %d tracks after %d attempts",
                          len(group), maxRetryAttempts)
def addTracksToPlaylist(playlistName, tracks):
    """Create a playlist named ``playlistName`` and fill it with ``tracks``.

    Steps, in order: set up the Spotify client, resolve the tracks to URIs,
    log the found/not-found counters, create the playlist, then add the
    resolved URIs to it.
    """
    getUserToken()

    resolvedUris = tracks2SpotifyURIs(tracks)
    # The counters are module globals maintained by tracks2SpotifyURIs().
    logging.info(f"#Songs: {totalSongs} with not found #songs: {totalSongsNotFound}")

    newPlaylistId = createPlaylist(playlistName)
    addSpotifyURIstoPlaylist(newPlaylistId, resolvedUris)