yt-search-in-subs.py

#!/usr/bin/env python3
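"""Search the uploads of your YouTube subscriptions for a query.

Reads a subscription OPML export, queries each channel's /search page
concurrently, and prints the matches ordered so that the titles closest
to the query (by longest-common-subsequence length) come last.
"""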
import sys
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus

try:
    from bs4 import BeautifulSoup
    import urllib3
    import certifi
except ModuleNotFoundError:
    print("you need to install bs4, urllib3 and certifi to use this script")
    sys.exit(1)


def lcs(s1, s2):
    """Return the length of the longest common subsequence of s1 and s2."""
    # Classic dynamic-programming table over prefix lengths; row/column 0
    # stand for the empty prefix, so empty inputs are handled as well.
    table = [[0] * (len(s2) + 1) for _ in range(len(s1) + 1)]
    for i in range(1, len(s1) + 1):
        for j in range(1, len(s2) + 1):
            if s1[i - 1] == s2[j - 1]:
                table[i][j] = table[i - 1][j - 1] + 1
            else:
                table[i][j] = max(table[i - 1][j], table[i][j - 1])
    return table[-1][-1]


def get_channel_vids_query(channel_url, query, poolmanager):
    """Scrape a channel's /search results page and return its video entries."""
    # Spaces must be URL-encoded as '+' inside the query parameter; a
    # literal '&' would start a new parameter and cut the query short.
    query_url = channel_url + '/search?query=' + quote_plus(query)
    # Retry until YouTube answers 200; transient failures are common when
    # many channel queries run at once.
    while True:
        response = poolmanager.request('GET', query_url)
        if response.status == 200:
            break
        print("having trouble connecting to youtube (HTTP %d), retrying" % response.status)
        time.sleep(0.5)
html = response.data
channel_videos = []
parsed_html = BeautifulSoup(html, features="html.parser")
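    # These class names come from YouTube's legacy desktop markup; if the
    # page layout changes, the selectors below will stop matching.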
video_entries_html = parsed_html.body.findAll('li', attrs={'class':'feed-item-container'})
author = parsed_html.body.find('a', attrs={'class':'branded-page-header-title-link'})['title']
    for video_entry_html in video_entries_html:
        # The same anchor tag carries both the title and the watch link,
        # so look it up once instead of twice.
        link = video_entry_html.find('a', attrs={'class': 'yt-uix-tile-link'})
        video = {
            'title': link['title'],
            'id': link['href'].split('?v=')[1],
            'author': author,
        }
        channel_videos.append(video)
return channel_videos


def parse_sub_file(file):
    """Extract the channel ids from a YouTube subscription OPML export."""
    with open(file, 'r') as f:
        subs_xml = f.read()
    # html.parser lowercases attribute names, so OPML's xmlUrl is read
    # back as 'xmlurl'.
    parsed_xml = BeautifulSoup(subs_xml, features="html.parser")
    subs_parsed_xml = parsed_xml.body.outline.findAll('outline')
    sub_channel_ids = [s['xmlurl'].split('channel_id=')[1] for s in subs_parsed_xml]
    return sub_channel_ids


def gen_channel_url(channel_id):
    return 'https://www.youtube.com/channel/' + channel_id


def gen_video_url(video_id):
    return 'https://www.youtube.com/watch?v=' + video_id


def search_in_subs(query, sub_file):
    """Query every subscribed channel concurrently and rank the results."""
    subs = parse_sub_file(sub_file)
    if not subs:
        return []
    print("the results printed last will be the closest to your query")
    print('Scanning channels:')
    future_channel_results = {}
    channel_result = [None] * len(subs)
    # One worker per channel: the work is network-bound, so the threads
    # mostly wait on YouTube rather than compete for the CPU.
    with ThreadPoolExecutor(max_workers=len(subs)) as executor:
        http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
        for sid in range(len(subs)):
            sub_url = gen_channel_url(subs[sid])
            future_channel_results[executor.submit(get_channel_vids_query, sub_url, query, http)] = sid
        done = 0
        for future in as_completed(future_channel_results):
            sid = future_channel_results[future]
            channel_result[sid] = future.result()
            done += 1
            print(done, "/", len(subs))
    final_results = []
    for ch in channel_result:
        final_results += ch
    # Ascending sort by LCS length puts the closest titles last, matching
    # the note printed above.
    final_results.sort(key=lambda x: lcs(x['title'].lower(), query.lower()))
    return final_results


def print_videos(query_results):
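    """Print each result's title, author, and watch URL."""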
for vid in query_results:
print(vid['title'], 'by', vid['author'])
print(gen_video_url(vid['id']))
print()


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("usage: yt-search-in-subs.py <query> <subscription file>")
        print("you can download the subscription file at:")
        print("https://www.youtube.com/subscription_manager?action_takeout=1")
        sys.exit(1)
    elif not os.path.isfile(sys.argv[2]):
        print("subscription file not found")
        sys.exit(1)
    else:
        now = time.time()
        result = search_in_subs(sys.argv[1], sys.argv[2])
        if result:
            print_videos(result)
        else:
            print("No videos found.")
        print(round(time.time() - now, 2), "seconds for query")
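
# Example run (assuming the OPML export was saved as subs.xml; any path works):
#   ./yt-search-in-subs.py "rust tutorial" subs.xml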