-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathyoutube.py
183 lines (139 loc) · 5.37 KB
/
youtube.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from typing import Callable, Generator, List, Optional
import googleapiclient.discovery
import googleapiclient.errors
from data_access import DataAccess
# Fallback value for the `maxResults` list parameter: `gen_resources` and
# `gen_resources_for_ids` use it whenever the caller does not pass `maxResults`.
DEFAULT_MAX_RESULTS = 50
def gen_resources(resource: Callable, **list_params) -> Generator[List, None, None]:
    """
    Walks every page of a `list` call on `resource`, yielding the `items` of
    each response as soon as it is received.

    :param resource: The YouTube Data API resource function (ie: youtube.videos).
    :param list_params: Parameters to pass to the `list` call on `resource`.
        `maxResults` falls back to `DEFAULT_MAX_RESULTS` when not supplied.
    :return: Generator yielding one list of items per page.
    """
    print("Generating resources.")

    if "maxResults" not in list_params:
        list_params["maxResults"] = DEFAULT_MAX_RESULTS

    page_token = None
    while True:
        # Only add/replace `pageToken` once a previous response handed us one.
        if page_token:
            list_params["pageToken"] = page_token

        response = resource().list(**list_params).execute()

        page_items = response["items"]
        print(f"\tRetrieved {len(page_items)}")
        yield page_items

        # A response without `nextPageToken` marks the final page.
        if "nextPageToken" not in response:
            print("\tReached last page.")
            return
        page_token = response["nextPageToken"]
def gen_resources_for_ids(
    resource: Callable, res_ids: List[str], **list_params
) -> Generator[List, None, None]:
    """
    Makes requests to retrieve all resources for `res_ids`, yielding each batch.

    The ids are sent in consecutive batches of at most `maxResults` ids, joined
    with commas into the `id` list parameter. `res_ids` itself is never modified.

    :param resource: The YouTube Data API resource function (ie: youtube.videos).
    :param res_ids: Ids of the resources to retrieve.
    :param list_params: Parameters to pass to the `list` call on `resource`.
        `maxResults` doubles as the batch size and falls back to
        `DEFAULT_MAX_RESULTS` when not supplied.
    :return: Generator yielding the `items` list of each response.
    :raises ValueError: If `maxResults` is smaller than 1 (raised when the
        generator is first advanced).
    """
    print("Generating resources for ids.")

    if "maxResults" not in list_params:
        list_params["maxResults"] = DEFAULT_MAX_RESULTS
    max_results = list_params["maxResults"]

    # A non-positive batch size can never consume any ids; fail loudly instead
    # of looping forever on empty requests.
    if max_results < 1:
        raise ValueError(f"maxResults must be at least 1, got {max_results!r}")

    # Snapshot the ids so the caller's list is neither mutated nor able to
    # change under us while the generator is suspended.
    ids = list(res_ids)
    total = len(ids)

    # Slice batches out of the snapshot (linear overall) rather than popping
    # ids off the front one at a time (quadratic).
    for start in range(0, total, max_results):
        request_ids = ids[start : start + max_results]
        print(f"\tRequesting {start}-{start + len(request_ids)} of {total}.")

        list_params["id"] = ",".join(request_ids)
        response = resource().list(**list_params).execute()
        yield response["items"]

    print("\tFinished requesting resources.")
class YouTube:
    """
    Wrapper around a YouTube Data API client that collects playlist items,
    videos and comment threads through the module's paginating generators.
    """

    def __init__(self, api_key: str):
        """
        Builds the "youtube" v3 API client.

        :param api_key: Developer key handed to the client builder.
        """
        self.youtube = googleapiclient.discovery.build(
            "youtube", "v3", developerKey=api_key
        )

    def get_pitems_for_pid(self, pid: str) -> List:
        """
        Collects every playlist item (`contentDetails` part) of a playlist.

        :param pid: Id of the playlist.
        :return: All playlist items, flattened across pages.
        """
        print(f"Requesting playlist items for {pid}.")
        pages = gen_resources(
            self.youtube.playlistItems, part="contentDetails", playlistId=pid
        )
        return [pitem for page in pages for pitem in page]

    def get_videos_for_pitems(self, pitems: List) -> List:
        """
        Retrieves the videos (`snippet,statistics` parts) referenced by
        `pitems`, skipping ids for which `DataAccess.have_video` is truthy.

        :param pitems: Playlist items carrying `contentDetails.videoId`.
        :return: The retrieved video resources, flattened across batches.
        """
        print("Requesting videos for playlist items.")
        candidate_ids: List[str] = []
        for pitem in pitems:
            candidate_ids.append(pitem["contentDetails"]["videoId"])

        # Filter out videos we already have.
        store = DataAccess()
        missing_ids = [vid for vid in candidate_ids if not store.have_video(vid)]

        videos: List = []
        batches = gen_resources_for_ids(
            self.youtube.videos, missing_ids, part="snippet,statistics",
        )
        for batch in batches:
            videos.extend(batch)
        return videos

    def gen_comment_threads_for_videos(
        self, videos: List
    ) -> Generator[List, None, None]:
        """
        Generates `commentThreads` for the `videos`, yielding on every video.

        :param videos: Video resources; only each video's `id` is read.
        :return: Generator yielding one list of threads per video.
        """
        print("Requesting comment threads for videos.")
        for video in videos:
            yield self.get_comment_threads_for_video(video["id"])

    def get_comment_threads_for_video(self, video_id: str) -> List:
        """
        Retrieves all comment threads of a video and attaches the replies of
        every thread whose `totalReplyCount` is positive.

        :param video_id: Id of the video.
        :return: The threads; those with replies gain
            `thread["replies"] = {"comments": [...]}`.
        """
        print(f"Getting threads for {video_id}")

        # First pass: every thread of the video (paginated, 100 per page).
        threads: List = []
        thread_pages = gen_resources(
            self.youtube.commentThreads,
            part="snippet",
            videoId=video_id,
            textFormat="plainText",
            maxResults=100,
        )
        for page in thread_pages:
            threads.extend(page)

        # Second pass: fetch the replies of each top-level comment (paginated).
        for thread in threads:
            if thread["snippet"]["totalReplyCount"] > 0:
                parent_comment = thread["snippet"]["topLevelComment"]
                print(f"\tGetting replies for {thread['id']}")

                reply_pages = gen_resources(
                    self.youtube.comments,
                    part="snippet",
                    parentId=parent_comment["id"],
                    textFormat="plainText",
                    maxResults=100,
                )
                replies: List = []
                for page in reply_pages:
                    replies.extend(page)

                # Hydrate the thread with the retrieved comments.
                thread["replies"] = {"comments": replies}

        return threads