From 71b00478aa24f5f6222b0c2fc2736cc27510cf5f Mon Sep 17 00:00:00 2001 From: luolongfei Date: Thu, 13 Jan 2022 17:20:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=87=AA=E5=8A=A8=E8=A7=A3?= =?UTF-8?q?=E9=94=81=E8=B4=A6=E6=88=B7=E5=8A=9F=E8=83=BD=EF=BC=9B=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E5=B0=81=E8=A3=85=E5=A2=9E=E5=BC=BA=E5=9E=8B=E5=85=83?= =?UTF-8?q?=E7=B4=A0=E6=9F=A5=E6=89=BE=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 2 +- netflix.py | 448 ++++++++++++++++++++++++++++++++++++----------- utils/version.py | 2 +- 3 files changed, 351 insertions(+), 101 deletions(-) diff --git a/.env.example b/.env.example index 21ad988..ca3f0a0 100644 --- a/.env.example +++ b/.env.example @@ -7,7 +7,7 @@ DRIVER_EXECUTABLE_FILE='/usr/bin/chromedriver' MULTIPLE_NETFLIX_ACCOUNTS='[账户1|密码1|用户名1][账户2|密码2|用户名2]' # 是否启用账户名保护,启用后,程序将每两分钟左右主动检查账户名是否被篡改并尝试恢复 1:启用 0:不启用 -ENABLE_ACCOUNT_NAME_PROTECTION=1 +ENABLE_ACCOUNT_PROTECTION=1 # 机器人邮箱 BOT_MAIL_USERNAME='' diff --git a/netflix.py b/netflix.py index 7f2c781..d9d0f13 100644 --- a/netflix.py +++ b/netflix.py @@ -24,6 +24,7 @@ import re import datetime import traceback +from functools import reduce, wraps from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed from selenium import webdriver @@ -65,7 +66,7 @@ def wrapper(*args, **kwargs): except NoSuchElementException as e: logger.error('匹配元素超时,超过 {} 秒依然没有发现元素:{}', Netflix.TIMEOUT, str(e)) except TimeoutException as e: - logger.error(f'请求超时:{Netflix.driver.current_url} 异常:{str(e)}') + logger.error(f'查找元素超时或请求超时:{str(e)} [{Netflix.driver.current_url}]') except WebDriverException as e: logger.error(f'未知错误:{str(e)}') except Exception as e: @@ -78,7 +79,8 @@ def wrapper(*args, **kwargs): class Netflix(object): - # 超时秒数,包括隐式等待和显式等待 + # 超时秒数 + # 如果同时设置了显式等待和隐式等待,则 webdriver 会取二者中更大的时间 TIMEOUT = 24 USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36' @@ -88,6 +90,8 @@ class Netflix(object): RESET_PASSWORD_URL = 'https://www.netflix.com/password' FORGOT_PASSWORD_URL = 'https://www.netflix.com/LoginHelp' MANAGE_PROFILES_URL = 'https://www.netflix.com/ManageProfiles' + BROWSE_URL = 'https://www.netflix.com/browse' + ACCOUNT_URL = 'https://www.netflix.com/YourAccount' # 账户管理地址 # 请求重置密码的邮件正则 RESET_MAIL_REGEX = re.compile(r'accountaccess.*?URL_ACCOUNT_ACCESS', re.I) @@ -140,7 +144,7 @@ def __init__(self): self.options.add_argument('--disable-blink-features=AutomationControlled') # Chrome v88 以上版本正确隐藏浏览器特征 self.driver = webdriver.Chrome(executable_path=os.getenv('DRIVER_EXECUTABLE_FILE'), options=self.options) - self.driver.implicitly_wait(Netflix.TIMEOUT) + # self.driver.implicitly_wait(Netflix.TIMEOUT) # 不再指定隐式等待时间,防止显示等待与隐式等待混用导致等待时间混乱问题 # 防止通过 window.navigator.webdriver === true 检测模拟浏览器 # 注意,低于 Chrome v88 (不含) 的浏览器可用此处代码隐藏 Web Driver 特征 @@ -163,7 +167,7 @@ def __init__(self): 'source': stealth_js }) - # 统配显式等待 + # 通用显式等待实例 self.wait = WebDriverWait(self.driver, timeout=Netflix.TIMEOUT, poll_frequency=0.5) # 测试无头浏览器特征是否正确隐藏 @@ -258,47 +262,178 @@ def get_all_args(): return parser.parse_args() - def _login(self, netflix_username: str, netflix_password: str): + def find_element_by_id(self, id: str, timeout: int or float = 24.0, ignored_exceptions=None, + poll_frequency: int or float or None = None, message: str or None = None, + scroll_into_view: bool = False, block: str = 'start') -> WebElement: + """ + 根据 id 查找元素 + + 元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化 + :param id: + :param timeout: + :param ignored_exceptions: + :param poll_frequency: + :param message: + :param scroll_into_view: 是否将元素滚动到可视范围内 + :param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest + :return: + """ + message = f'查找 id 为 {id} 的元素未果' if not message else message + + if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency: + el = self.wait.until(EC.visibility_of_element_located((By.ID, id)), message) + else: + el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5, + ignored_exceptions=ignored_exceptions).until( + EC.visibility_of_element_located((By.ID, id)), message) + + if scroll_into_view: + self.scroll_page_until_el_is_visible(el, block) + + return el + + def find_element_by_class_name(self, class_name: str, timeout: int or float = 24.0, ignored_exceptions=None, + poll_frequency: int or float or None = None, message: str or None = None, + scroll_into_view: bool = False, block: str = 'start') -> WebElement: + """ + 根据 class name 查找元素 + + 元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化 + :param class_name: + :param timeout: + :param ignored_exceptions: + :param poll_frequency: + :param message: + :param scroll_into_view: 是否将元素滚动到可视范围内 + :param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest + :return: + """ + message = f'查找 class name 为 {class_name} 的元素未果' if not message else message + + if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency: + el = self.wait.until(EC.visibility_of_element_located((By.CLASS_NAME, class_name)), message) + else: + el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5, + ignored_exceptions=ignored_exceptions).until( + EC.visibility_of_element_located((By.CLASS_NAME, class_name)), message) + + if scroll_into_view: + self.scroll_page_until_el_is_visible(el, block) + + return el + + def find_element_by_xpath(self, xpath: str, timeout: int or float = 24.0, ignored_exceptions=None, + poll_frequency: int or float or None = None, message: str or None = None, + scroll_into_view: bool = False, block: str = 'start') -> WebElement: + """ + 根据 xpath 查找元素 + + 元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化 + :param xpath: + :param timeout: + :param ignored_exceptions: + :param poll_frequency: + :param message: + :param scroll_into_view: 是否将元素滚动到可视范围内 + :param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest + :return: + """ + message = f'查找 xpath 为 {xpath} 的元素未果' if not message else message + + if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency: + el = self.wait.until(EC.visibility_of_element_located((By.XPATH, xpath)), message) + else: + el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5, + ignored_exceptions=ignored_exceptions).until( + EC.visibility_of_element_located((By.XPATH, xpath)), message) + + if scroll_into_view: + self.scroll_page_until_el_is_visible(el, block) + + return el + + def find_element_by_tag_name(self, tag_name: str, timeout: int or float = 24.0, ignored_exceptions=None, + poll_frequency: int or float or None = None, message: str or None = None, + scroll_into_view: bool = False, block: str = 'start') -> WebElement: + """ + 根据 tag name 查找元素 + + 元素必须是已加载且可见才能寻到,如若未指定超时时间和异常处理等相关参数,则优先使用前期准备好的 WebDriverWait 实例,不再重复实例化 + :param tag_name: + :param timeout: + :param ignored_exceptions: + :param poll_frequency: + :param message: + :param scroll_into_view: 是否将元素滚动到可视范围内 + :param block: 定义垂直对齐方式,仅当 scroll_into_view 为 True 时才有意义,支持 start center end nearest + :return: + """ + message = f'查找 tag name 为 {tag_name} 的元素未果' if not message else message + + if not ignored_exceptions and timeout == Netflix.TIMEOUT and not poll_frequency: + el = self.wait.until(EC.visibility_of_element_located((By.TAG_NAME, tag_name)), message) + else: + el = WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency if poll_frequency else 0.5, + ignored_exceptions=ignored_exceptions).until( + EC.visibility_of_element_located((By.TAG_NAME, tag_name)), message) + + if scroll_into_view: + self.scroll_page_until_el_is_visible(el, block) + + return el + + def scroll_page_until_el_is_visible(self, el: WebElement, block: str = 'start') -> None: + """ + 滚动直到元素可见 + + 按钮需要滚动直到可见,否则无法点击 + 参考:https://developer.mozilla.org/zh-CN/docs/Web/API/Element/scrollIntoView + :param el: + :param block: 定义垂直方向的对齐,“start”、“center”、“end”, 或“nearest”之一。默认为 start + :return: + """ + self.driver.execute_script('arguments[0].scrollIntoView({{block: "{}"}});'.format(block), el) + + def _login(self, u: str, p: str, n: str = '') -> tuple: """ 登录 - :param netflix_username: - :param netflix_password: + :param u: + :param p: + :param n: :return: """ - logger.debug('尝试登录账户:{}', netflix_username) + logger.debug('尝试登录账户:{}', u) # 多账户,每次登录前需要清除 cookies self.driver.delete_all_cookies() self.driver.get(Netflix.LOGIN_URL) - u = self.driver.find_element_by_id('id_userLoginId') - u.clear() - u.send_keys(netflix_username) + username_input_el = self.find_element_by_id('id_userLoginId') + username_input_el.clear() + username_input_el.send_keys(u) time.sleep(1.1) - p = self.driver.find_element_by_id('id_password') - p.clear() - p.send_keys(netflix_password) + pwd_input_el = self.find_element_by_id('id_password') + pwd_input_el.clear() + pwd_input_el.send_keys(p) - self.driver.find_element_by_class_name('login-button').click() + self.find_element_by_class_name('login-button').click() if self.has_unknown_error_alert(): - raise UserWarning('当前账户可能处于风控期间,无法登录,本次操作将被忽略') + raise UserWarning(f'账户 {u} 可能正处于风控期间,无法登录,本次操作将被忽略') try: WebDriverWait(self.driver, timeout=3, poll_frequency=0.94).until(lambda d: 'browse' in d.current_url) - except Exception as e: - WebDriverWait(self.driver, timeout=2, poll_frequency=0.5).until( - EC.visibility_of_element_located((By.XPATH, '//a[@data-uia="header-signout-link"]')), - '查找登出元素未果') + except Exception: + self.find_element_by_xpath('//a[@data-uia="header-signout-link"]', message='查找登出元素未果') logger.warning(f'当前账户可能非 Netflix 会员,本次登录没有意义') logger.debug(f'已成功登录。当前地址为:{self.driver.current_url}') - return True + return u, p, n def __forgot_password(self, netflix_username: str): """ @@ -312,7 +447,7 @@ def __forgot_password(self, netflix_username: str): self.driver.get(Netflix.FORGOT_PASSWORD_URL) - forgot_pwd = self.driver.find_element_by_id('forgot_password_input') + forgot_pwd = self.find_element_by_id('forgot_password_input') forgot_pwd.clear() Netflix.send_keys_delay_random(forgot_pwd, netflix_username) @@ -322,10 +457,8 @@ def __forgot_password(self, netflix_username: str): # 直到页面显示已发送邮件 logger.debug('检测是否已到送信完成画面') - self.wait.until(EC.visibility_of( - self.driver.find_element_by_xpath('//*[@class="login-content"]//h2[@data-uia="email_sent_label"]')), - '查找送信完成元素未果') - + self.find_element_by_xpath('//*[@class="login-content"]//h2[@data-uia="email_sent_label"]', + message='查找送信完成元素未果') logger.info('已发送重置密码邮件到 {},注意查收', netflix_username) return True @@ -335,7 +468,7 @@ def click_forgot_pwd_btn(self): 点击忘记密码按钮 :return: """ - self.driver.find_element_by_class_name('forgot-password-action-button').click() + self.find_element_by_class_name('forgot-password-action-button').click() def __reset_password(self, curr_netflix_password: str, new_netflix_password: str): """ @@ -347,26 +480,26 @@ def __reset_password(self, curr_netflix_password: str, new_netflix_password: str try: self.driver.get(Netflix.RESET_PASSWORD_URL) - curr_pwd = self.driver.find_element_by_id('id_currentPassword') + curr_pwd = self.find_element_by_id('id_currentPassword') curr_pwd.clear() Netflix.send_keys_delay_random(curr_pwd, curr_netflix_password) time.sleep(1) - new_pwd = self.driver.find_element_by_id('id_newPassword') + new_pwd = self.find_element_by_id('id_newPassword') new_pwd.clear() Netflix.send_keys_delay_random(new_pwd, new_netflix_password) time.sleep(1) - confirm_new_pwd = self.driver.find_element_by_id('id_confirmNewPassword') + confirm_new_pwd = self.find_element_by_id('id_confirmNewPassword') confirm_new_pwd.clear() Netflix.send_keys_delay_random(confirm_new_pwd, new_netflix_password) time.sleep(1.1) # 其它设备无需重新登录 - self.driver.find_element_by_xpath('//li[@data-uia="field-requireAllDevicesSignIn+wrapper"]').click() + self.find_element_by_xpath('//li[@data-uia="field-requireAllDevicesSignIn+wrapper"]').click() time.sleep(1) @@ -382,13 +515,13 @@ def input_pwd(self, new_netflix_password: str) -> None: :param new_netflix_password: :return: """ - new_pwd = self.driver.find_element_by_id('id_newPassword') + new_pwd = self.find_element_by_id('id_newPassword') new_pwd.clear() Netflix.send_keys_delay_random(new_pwd, new_netflix_password) time.sleep(2) - confirm_new_pwd = self.driver.find_element_by_id('id_confirmNewPassword') + confirm_new_pwd = self.find_element_by_id('id_confirmNewPassword') confirm_new_pwd.clear() Netflix.send_keys_delay_random(confirm_new_pwd, new_netflix_password) @@ -399,7 +532,7 @@ def click_submit_btn(self): 点击提交输入的密码 :return: """ - self.driver.find_element_by_id('btn-save').click() + self.find_element_by_id('btn-save').click() def element_visibility_of(self, xpath: str, verify_val: bool = False, max_num_of_attempts: int = 3, el_wait_time: int = 2) -> WebElement or None: @@ -413,21 +546,9 @@ def element_visibility_of(self, xpath: str, verify_val: bool = False, :return: """ try: - self.driver.implicitly_wait(2) - - start_time = time.time() - while True: - if time.time() - start_time > el_wait_time: - return None - - try: - # 此处只为找到元素,如果下面不需要验证元素是否有值的话,则使用此处找到的元素 - # 否则下面验值逻辑会重新找到该元素以使用,此处的 el 会被覆盖 - el = self.driver.find_element_by_xpath(xpath) - - break - except Exception: - pass + # 此处只为找到元素,如果下面不需要验证元素是否有值的话,则使用此处找到的元素 + # 否则下面验值逻辑会重新找到该元素以使用,此处的 el 会被覆盖 + el = self.find_element_by_xpath(xpath, timeout=el_wait_time) num = 0 while True: @@ -435,7 +556,7 @@ def element_visibility_of(self, xpath: str, verify_val: bool = False, break # 需要每次循环找到此元素,以确定元素的值是否发生变化 - el = self.driver.find_element_by_xpath(xpath) + el = self.find_element_by_xpath(xpath, timeout=1) if el.tag_name == 'input': val = el.get_attribute('value') @@ -451,11 +572,9 @@ def element_visibility_of(self, xpath: str, verify_val: bool = False, time.sleep(num) - return el if EC.visibility_of(el) else None + return el except Exception: return None - finally: - self.driver.implicitly_wait(Netflix.TIMEOUT) def has_unknown_error_alert(self, error_el_xpath: str = '//div[@class="ui-message-contents"]') -> bool: """ @@ -539,7 +658,7 @@ def __reset_password_via_mail(self, reset_url: str, new_netflix_password: str) - def __pwd_change_result(self): """ 断言密码修改结果 - :return: + :return: """ try: self.wait.until(lambda d: d.current_url == 'https://www.netflix.com/YourAccount?confirm=password') @@ -1111,18 +1230,16 @@ def __recover_name(self, link_el: WebElement, real_name: str) -> bool: try: link_el.click() - WebDriverWait(self.driver, timeout=4.2, poll_frequency=0.94).until( - EC.visibility_of_element_located((By.XPATH, '//button[@data-uia="profile-save-button"]')), - '保存按钮不可点击') + save_btn = self.find_element_by_xpath('//button[@data-uia="profile-save-button"]', timeout=4.2, + poll_frequency=0.94, message='保存按钮尚未准备好') - name_input_el = self.driver.find_element_by_id('profile-name-entry') + name_input_el = self.find_element_by_id('profile-name-entry') name_input_el.clear() name_input_el.send_keys(real_name) - self.driver.find_element_by_xpath('//button[@data-uia="profile-save-button"]').click() + save_btn.click() - WebDriverWait(self.driver, timeout=5, poll_frequency=0.94).until( - EC.visibility_of(self.driver.find_element_by_class_name('profile-link')), '编辑按钮元素不可见') + self.find_element_by_class_name('profile-link', timeout=5, poll_frequency=0.94, message='编辑按钮元素不可见') return True except Exception as e: @@ -1130,20 +1247,21 @@ def __recover_name(self, link_el: WebElement, real_name: str) -> bool: return False - def _logout(self): + def _logout(self, u: str): """ 登出 + :param u: :return: """ try: - logger.debug('尝试登出') + logger.debug(f'尝试登出 [账户:{u}]') self.driver.get(Netflix.LOGOUT_URL) - WebDriverWait(self.driver, timeout=4.9, poll_frequency=0.5).until( - EC.visibility_of_element_located((By.XPATH, '//a[@data-uia="header-login-link"]')), '查找登入元素未果') + self.find_element_by_xpath('//a[@data-uia="header-login-link"]', timeout=4.9, poll_frequency=0.5, + message='查找登入元素未果') - logger.debug('登出成功') + logger.debug(f'登出成功 [账户:{u}]') return True except Exception as e: @@ -1151,55 +1269,187 @@ def _logout(self): return False - def protect_account_name(self): + @staticmethod + def pipeline(*steps): """ - 保护账户名不被修改 + 管道调用 + :param steps: :return: """ - for item in self.MULTIPLE_NETFLIX_ACCOUNTS: - try: - self._login(item.get('u'), item.get('p')) + return reduce(lambda x, y: y(*x) if isinstance(x, tuple) else y(x), steps) - self.driver.get(Netflix.MANAGE_PROFILES_URL) + def __handle_account_name(self, u: str, n: str) -> str: + """ + 处理账户名被篡改的问题 + :param u: + :param n: + :return: + """ + try: + logger.debug('开始处理用户名被篡改的问题') + + self.driver.get(Netflix.MANAGE_PROFILES_URL) + + WebDriverWait(self.driver, timeout=3, poll_frequency=0.94).until( + lambda d: 'ManageProfiles' in d.current_url, f'{u} 可能非会员,无法访问 {Netflix.MANAGE_PROFILES_URL} 地址') + + # 五个子账户,逐个检查 + success_num = 0 + events_count = 0 + for index in range(5): + real_name = n + f'_0{index + 1}' + + link_el = self.driver.find_elements_by_xpath('//a[@class="profile-link"]')[index] # TODO 多元素 + curr_name = link_el.text + + if curr_name != real_name: + logger.info(f'发现用户名被篡改为 【{curr_name}】') + events_count += 1 + + if self.__recover_name(link_el, real_name): + logger.success(f'程式已将 【{curr_name}】 恢复为 【{real_name}】') + + # 成功处理一件,就记录一件 + success_num += 1 + + if success_num: + self.find_element_by_xpath('//span[@data-uia="profile-button"]').click() WebDriverWait(self.driver, timeout=3, poll_frequency=0.94).until( - lambda d: 'ManageProfiles' in d.current_url, - f'{item.get("u")} 可能非会员,无法访问 {Netflix.MANAGE_PROFILES_URL} 地址') + lambda d: 'browse' in d.current_url) + + logger.success(f'用户名已恢复完成,共 {events_count} 件篡改事件,已成功处理 {success_num} 件') + + logger.debug('用户名处理结束') + except Exception as e: + logger.warning(f'处理用户名被篡改问题出错:{str(e)} [账户:{u}]') + finally: + return u + + def is_locked(self, svg_el_xpath: str, time_out: int = 1) -> bool: + """ + 账户是否被锁定 + :param svg_el_xpath: + :return: + """ + try: + self.find_element_by_xpath(svg_el_xpath, timeout=time_out, poll_frequency=0.5, message='不存在 svg 元素') + + return True + except Exception: + return False + + def __unlock_account(self, link_el: WebElement, u: str, p: str) -> bool: + """ + 解锁账户 + :param link_el: + :param p: + :return: + """ + try: + link_el.click() + + input_el = self.find_element_by_xpath('//input[@data-uia="input-account-content-restrictions"]', + timeout=9.4) + input_el.clear() + input_el.send_keys(p) + + time.sleep(0.94) + + self.find_element_by_xpath('//button[@data-uia="btn-account-pin-submit"]').click() + + # 取消勾选锁定 + self.find_element_by_xpath('//label[@for="bxid_lock-profile_true"]').click() + + # 提交 + WebDriverWait(self.driver, timeout=3, poll_frequency=0.5).until( + EC.element_to_be_clickable((By.XPATH, '//button[@data-uia="btn-account-pin-submit"]')), + '提交按钮不可点击').click() + + WebDriverWait(self.driver, timeout=4, poll_frequency=0.94).until(EC.url_contains('YourAccount'), + '未能正确跳回账户管理画面') + + return True + except Exception as e: + logger.error(f'尝试解锁账户【{u}】出错:{str(e)}') + + return False + + def __handle_account_lock(self, u: str, p: str, n: str) -> tuple: + """ + 处理账户被锁 PIN 的问题 + :param u: + :param p: + :param n: + :return: + """ + logger.debug('开始处理账户被锁 PIN 的问题') - # 五个子账户,逐个检查 - success_num = 0 - events_count = 0 - for index in range(5): - real_name = item.get('n') + f'_0{index + 1}' + # 五个子账户,逐个检查 + success_num = 0 + events_count = 0 + for index in range(1, 6): + try: + # 检查是否账户管理画面 + if 'YourAccount' not in self.driver.current_url: + self.driver.get(Netflix.ACCOUNT_URL) + WebDriverWait(self.driver, timeout=4.9, poll_frequency=0.94).until( + lambda d: 'YourAccount' in d.current_url, f'{u} 可能非会员,无法访问 {Netflix.ACCOUNT_URL} 地址') + + # 定位到账户区域 + self.find_element_by_xpath('//div[@class="profile-hub"]', scroll_into_view=True, block='center') - link_el = self.driver.find_elements_by_xpath('//a[@class="profile-link"]')[index] - curr_name = link_el.text + svg_el_xpath = f'(//li[contains(@class, "single-profile")])[{index}]//*[contains(@class, "svg-icon-profile-lock")]' + if self.is_locked(svg_el_xpath): + single_profile_el = self.find_element_by_xpath(f'//li[@id="profile_{index - 1}"]') - if curr_name != real_name: - logger.info(f'发现用户名被篡改为 【{curr_name}】') - events_count += 1 + # 展开列表选项 + single_profile_el.find_element_by_xpath('.//button[@class="profile-action-icons"]').click() - if self.__recover_name(link_el, real_name): - logger.success(f'程式已将 【{curr_name}】 恢复为 【{real_name}】') + account_name = single_profile_el.find_element_by_xpath('.//div[@class="profile-summary"]/h3').text - # 成功处理一件,就记录一件 - success_num += 1 + logger.info(f'发现【{account_name}】被锁 PIN') + events_count += 1 - if success_num: - self.driver.find_element_by_xpath('//span[@data-uia="profile-button"]').click() + # 变更链接 + link_el = single_profile_el.find_element_by_xpath( + './/a[@data-uia="action-profile-lock"]//div[@class="profile-change"]') + + # 解锁 + if self.__unlock_account(link_el, u, p): + logger.success(f'【{account_name}】已解除锁定') + success_num += 1 + else: + logger.debug(f'第 {index} 个账户是正常的,无需解锁') + except Exception as e: + logger.warning(f'处理账户被锁 PIN 问题出错:{str(e)} [账户:{u}]') - WebDriverWait(self.driver, timeout=3, poll_frequency=0.94).until( - lambda d: 'browse' in d.current_url) + if success_num: + logger.success(f'解锁完成,共 {events_count} 件被锁事件,已成功处理 {success_num} 件') - logger.success(f'用户名已恢复完成,共 {events_count} 件篡改事件,已成功处理 {success_num} 件') + logger.debug('账户被锁 PIN 问题处理结束') - logger.debug('用户名处理结束') + return u, n - self._logout() + def protect_account(self): + """ + 保护账户 + + 防止篡改与锁定 + :return: + """ + for item in self.MULTIPLE_NETFLIX_ACCOUNTS: + u = item.get('u') + p = item.get('p') + n = item.get('n') + + try: + self.pipeline((u, p, n), self._login, self.__handle_account_lock, self.__handle_account_name, + self._logout) except UserWarning as e: logger.debug(str(e)) except Exception as e: - logger.warning(f'用户名篡改检测出错:{str(e)} [账户:{item.get("u")}]') + logger.warning(f'保护用户出错:{str(e)} [账户:{u}]') @catch_exception def run(self): @@ -1291,12 +1541,12 @@ def run(self): time.sleep(3) - # 保护账户用户名 - if int(os.getenv('ENABLE_ACCOUNT_NAME_PROTECTION', 0)): + # 保护账户免受篡改与锁定 + if int(os.getenv('ENABLE_ACCOUNT_PROTECTION', 0)): now = time.time() if now - last_protection_time >= 124: last_protection_time = now - self.protect_account_name() + self.protect_account() logger.debug('开始下一轮监听') diff --git a/utils/version.py b/utils/version.py index 5319731..fee2f34 100644 --- a/utils/version.py +++ b/utils/version.py @@ -7,4 +7,4 @@ @time 10:02 """ -__version__ = 'v0.6' +__version__ = 'v0.7.1'