-
Notifications
You must be signed in to change notification settings - Fork 1
/
transfer-yt-subs.py
111 lines (94 loc) · 3.81 KB
/
transfer-yt-subs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/python3
import os
import re
import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
class bcolors:
    """ANSI escape sequences used to colorize the script's console output.

    Wrap a message as ``bcolors.<COLOR> + text + bcolors.ENDC`` so the
    terminal returns to its default rendering after the message.
    """

    # Bright green foreground.
    OKGREEN = '\033[92m'
    # Bright yellow foreground.
    WARNING = '\033[93m'
    # Bright red foreground.
    FAIL = '\033[91m'
    # Reset all attributes back to the terminal default.
    ENDC = '\033[0m'
# Path of the OAuth client-secrets JSON file handed to
# InstalledAppFlow.from_client_secrets_file (relative to the working directory).
CLIENT_SECRETS_FILE = 'client_secret.json'
# OAuth scopes requested during the authorization flow.
SCOPES = ['https://www.googleapis.com/auth/youtube']
# Service name and version passed to googleapiclient.discovery.build().
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'
# Authenticate against a Google account. The export and the import login each
# start the local OAuth redirect web server on a different port.
def get_authenticated_service(export: bool):
    """Run the installed-app OAuth flow and return a YouTube API client.

    Args:
        export: When true the local redirect server listens on port 8080,
            otherwise on port 8081.

    Returns:
        The client object produced by ``build`` for API_SERVICE_NAME /
        API_VERSION, authorized with the credentials obtained from the flow.
    """
    oauth_flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
    redirect_port = 8080 if export else 8081
    user_credentials = oauth_flow.run_local_server(port=redirect_port)
    return build(API_SERVICE_NAME, API_VERSION, credentials=user_credentials)
# Call the youtube.subscriptions.list API to download the subscriptions page by page
def get_chanellsid(target_channels, service=None):
    """Append the channel id of every subscription of the signed-in account.

    Args:
        target_channels: List that is extended in place with the channel ids
            found in each response page.
        service: Optional YouTube API client to query. When omitted the
            module-level ``youtube`` client is used, which keeps existing
            one-argument callers working.

    Returns:
        None. Results are delivered through ``target_channels``.
    """
    # Resolve the client lazily so the global is only touched when no
    # explicit client was supplied.
    client = youtube if service is None else service
    next_page_token = ''
    while True:
        response = client.subscriptions().list(
            part='snippet',
            mine=True,
            maxResults=50,
            pageToken=next_page_token,
        ).execute()
        target_channels.extend(
            item['snippet']['resourceId']['channelId']
            for item in response.get('items', [])
        )
        # A missing or empty token means this was the last page; treating an
        # empty token as "done" also prevents re-requesting the first page.
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break
# Call the youtube.subscriptions.insert API to subscribe to a channel
def add_subscription(youtube, channel_id):
    """Subscribe the authorized account to ``channel_id``.

    Args:
        youtube: YouTube API client exposing ``subscriptions().insert``.
        channel_id: Id of the channel to subscribe to.

    Returns:
        Whatever ``execute()`` returns for the insert request.
    """
    request_body = {'snippet': {'resourceId': {'channelId': channel_id}}}
    insert_request = youtube.subscriptions().insert(part='snippet', body=request_body)
    return insert_request.execute()
def missing_subscriptions(exported, imported):
    """Return the exported channel ids that the import account lacks.

    Args:
        exported: Channel ids subscribed to on the export account.
        imported: Channel ids already subscribed to on the import account.

    Returns:
        A list of ids present in ``exported`` but not in ``imported``,
        de-duplicated and kept in their ``exported`` order.
    """
    already_subscribed = set(imported)
    return [
        channel_id
        for channel_id in dict.fromkeys(exported)
        if channel_id not in already_subscribed
    ]


if __name__ == '__main__':
    # `youtube` is intentionally a module-level name: get_chanellsid() looks
    # up the module-level client, so it must be rebound before each call.
    print()
    print(bcolors.WARNING + 'Login to Google-account for export!' + bcolors.ENDC)
    print()
    youtube = get_authenticated_service(export=True)
    export_account_channels = []
    get_chanellsid(export_account_channels)

    print()
    print(bcolors.WARNING + 'Login to Google-account for import!' + bcolors.ENDC)
    print()
    youtube = get_authenticated_service(export=False)
    import_account_channels = []
    get_chanellsid(import_account_channels)

    # Pick the subscriptions not yet present on the import account. A plain
    # difference is required here: a symmetric difference would also select
    # channels that exist only on the import account, and trying to subscribe
    # to those again would raise an HttpError and abort the whole transfer.
    channels_to_add = missing_subscriptions(export_account_channels, import_account_channels)
    channels_to_add_quantity = len(channels_to_add)

    if channels_to_add_quantity > 200:
        # More work than the quota stated in the message below; the loop still
        # runs and the remainder is picked up by a later run.
        print()
        print(bcolors.WARNING + 'Channels quantity ('+ str(channels_to_add_quantity) +') greater than API quota! Wait for the end of execution and try again next day!' + bcolors.ENDC)
        print(bcolors.WARNING + 'API quota is 200 subscriptions' + bcolors.ENDC)
        print()
    elif channels_to_add_quantity == 0:
        print()
        print(bcolors.WARNING + 'Count of channels to subscription - '+ str(channels_to_add_quantity) + bcolors.ENDC)
        # Nothing to do. SystemExit is used instead of quit(), which is a
        # site-module helper intended for interactive sessions.
        raise SystemExit
    else:
        print()
        print(bcolors.WARNING + 'Count of channels to subscription - '+ str(channels_to_add_quantity) + bcolors.ENDC)
        print()

    counter = 0
    try:
        for channel_id in channels_to_add:
            add_subscription(youtube, channel_id)
            counter += 1
    except HttpError as e:
        # Stop at the first API failure but still report how far we got.
        print()
        print(bcolors.FAIL + 'An HTTP error {} occurred:\n{}'.format(e.resp.status, e.content) + bcolors.ENDC)

    # Summary is printed after both a clean run and an HttpError.
    print()
    print('A subscriptions to ' + str(counter) + ' channels ' + 'was ' + bcolors.OKGREEN + 'added.' + bcolors.ENDC)