-
-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathTokenRetriever.py
104 lines (90 loc) · 4.12 KB
/
TokenRetriever.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import base64
import gzip
import hashlib
import json
import os
import time
from urllib.parse import urlparse, parse_qs

import requests
import seleniumwire.undetected_chromedriver as uc
class TokenRetriever:
    """Retrieve a Streamlabs OAuth token through a browser-driven TikTok login.

    The flow opens the Streamlabs login page in a Selenium Wire controlled
    Chrome instance, waits for the TikTok authorization request, extracts the
    ``code`` query parameter from the ``redirect_url`` in its JSON response and
    exchanges that code (together with the PKCE-style code verifier generated
    in ``__init__``) at ``STREAMLABS_API_URL``.
    """

    CLIENT_KEY = "awdjaq9ide8ofrtz"
    REDIRECT_URI = "https://streamlabs.com/tiktok/auth"
    STATE = ""
    SCOPE = "user.info.basic,live.room.info,live.room.manage,user.info.profile,user.info.stats"
    STREAMLABS_API_URL = "https://streamlabs.com/api/v5/auth/data"

    # Pattern of the intercepted request whose response carries ``redirect_url``.
    TIKTOK_AUTH_REQUEST = "https://www.tiktok.com/passport/open/web/auth/v2/"
    # Seconds to wait for the user to complete the login in the browser.
    AUTH_WAIT_TIMEOUT = 600
    # Seconds before a plain HTTP request made with ``requests`` is abandoned.
    HTTP_TIMEOUT = 30
    # Seconds to pause between visiting the redirect URL and asking for the token.
    TOKEN_REQUEST_DELAY = 3

    def __init__(self, cookies_file='cookies.json'):
        """Create a retriever with a fresh code verifier/challenge pair.

        Args:
            cookies_file: Path of a JSON file holding a list of cookie dicts
                that are added to the browser session when it exists.
        """
        self.code_verifier = self.generate_code_verifier()
        self.code_challenge = self.generate_code_challenge(self.code_verifier)
        self.streamlabs_auth_url = (
            f"https://streamlabs.com/m/login?force_verify=1&external=mobile&skip_splash=1&tiktok&code_challenge={self.code_challenge}"
        )
        self.cookies_file = cookies_file
        self.driver = None

    @staticmethod
    def generate_code_verifier():
        """Return 64 random bytes as unpadded URL-safe base64 text."""
        return base64.urlsafe_b64encode(os.urandom(64)).decode('utf-8').rstrip('=')

    @staticmethod
    def generate_code_challenge(code_verifier):
        """Return the unpadded URL-safe base64 SHA-256 digest of *code_verifier*."""
        digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')

    @staticmethod
    def _decode_body(body):
        """Return a response body as text, un-gzipping it only when needed.

        The body is decompressed only if it starts with the gzip magic bytes,
        so an uncompressed response no longer raises ``gzip.BadGzipFile``.
        """
        if body[:2] == b"\x1f\x8b":
            body = gzip.decompress(body)
        return body.decode('utf-8')

    @staticmethod
    def _extract_auth_code(redirect_url):
        """Return the first ``code`` query parameter of *redirect_url*, or None."""
        query = parse_qs(urlparse(redirect_url).query)
        return query.get('code', [None])[0]

    def load_cookies(self, driver):
        """Add every cookie stored in ``self.cookies_file`` to *driver*.

        Does nothing when the cookie file does not exist. The driver must
        already have a page loaded, which is why ``retrieve_token`` opens one
        before calling this method.
        """
        if not os.path.exists(self.cookies_file):
            return
        with open(self.cookies_file, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
        for cookie in cookies:
            driver.add_cookie(cookie)

    def _start_driver(self):
        """Create the Selenium Wire Chrome driver with a local storage directory."""
        # Selenium Wire keeps its captured-request files next to this module.
        selenium_wire_storage = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "seleniumwire"
        )
        os.makedirs(selenium_wire_storage, exist_ok=True)

        seleniumwire_options = {
            'request_storage_base_dir': selenium_wire_storage
        }
        chrome_options = uc.ChromeOptions()
        chrome_options.add_argument('--ignore-ssl-errors=yes')
        chrome_options.add_argument('--ignore-certificate-errors')
        return uc.Chrome(seleniumwire_options=seleniumwire_options, options=chrome_options)

    def _capture_auth_code(self):
        """Drive the login pages and return the authorization code, or None.

        A short explanation is printed for each failure case. Errors raised by
        the driver (including the wait timing out) propagate to the caller.
        """
        self.driver.get("https://www.tiktok.com/legal")  # Load a page first before setting cookies
        self.load_cookies(self.driver)
        self.driver.get(self.streamlabs_auth_url)

        request = self.driver.wait_for_request(
            self.TIKTOK_AUTH_REQUEST, timeout=self.AUTH_WAIT_TIMEOUT
        )
        if request is None or request.response is None:
            print("No request intercepted or timeout reached.")
            return None

        data = json.loads(self._decode_body(request.response.body))
        redirect_url = data.get('redirect_url')
        if not redirect_url:
            print("No redirect_url found in the response.")
            return None

        # The redirect URL is visited once before the code is exchanged.
        with requests.session() as s:
            s.get(redirect_url, timeout=self.HTTP_TIMEOUT)

        auth_code = self._extract_auth_code(redirect_url)
        if not auth_code:
            print("Failed to obtain authorization code.")
        return auth_code

    def _exchange_code_for_token(self, auth_code):
        """Exchange *auth_code* for an OAuth token; return None on any failure."""
        time.sleep(self.TOKEN_REQUEST_DELAY)  # Wait a few seconds before sending the request
        try:
            # ``params`` percent-encodes the values; the code was decoded by parse_qs.
            payload = requests.get(
                self.STREAMLABS_API_URL,
                params={"code_verifier": self.code_verifier, "code": auth_code},
                timeout=self.HTTP_TIMEOUT,
            ).json()
            token = payload["data"]["oauth_token"] if payload["success"] else None
        except (requests.RequestException, ValueError, KeyError, TypeError):
            token = None
        if token is None:
            print("Failed to obtain token.")
        return token

    def retrieve_token(self):
        """Run the whole browser flow and return the OAuth token, or None.

        The browser is always shut down with ``quit()``, even when navigation
        or interception fails, so no driver process is left behind.

        Returns:
            The ``oauth_token`` string from the Streamlabs response, or None
            when no authorization code or token could be obtained.
        """
        self.driver = self._start_driver()
        try:
            auth_code = self._capture_auth_code()
        finally:
            try:
                # quit() ends the driver session; close() only closes the window.
                self.driver.quit()
            except Exception as e:
                print(f"Error closing browser: {e}")

        if not auth_code:
            return None
        return self._exchange_code_for_token(auth_code)