diff --git a/.github/workflows/SourceGit.yaml b/.github/workflows/SourceGit.yaml new file mode 100644 index 0000000000..4a5fefda18 --- /dev/null +++ b/.github/workflows/SourceGit.yaml @@ -0,0 +1,53 @@ +name: Claim EpicGames from SourceGit + +on: + workflow_dispatch: + schedule: + - cron: "12 22 * * *" + +jobs: + setup: + env: + EPIC_EMAIL: ${{ secrets.EPIC_EMAIL }} + TOKEN: ${{ secrets.PUSHTOKEN }} + EPIC_PASSWORD: ${{ secrets.EPIC_PASSWORD }} + FAKE_HASH: "Automated deployment @ $(date '+%Y-%m-%d %H:%M:%S') Asia/Shanghai" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: 初始化环境 & 拉取项目 + run: | + sudo timedatectl set-timezone "Asia/Shanghai" + git clone https://github.com/QIN2DIM/epic-awesome-gamer.git epic + cp epic/requirements.txt . + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: 'pip' # caching pip dependencies + - name: 安装依赖 + run: | + pip install -r epic/requirements.txt + pip install requests + playwright install firefox + playwright install-deps firefox + - name: 启动项目 + continue-on-error: true + timeout-minutes: 10 + run: | + if [ -d "user_data_dir" ];then cp -rfp user_data_dir epic/; fi + echo "{}" > epic/src/config.json + cd epic/src/ && python3 claim.py + - name: Setup GIT user + uses: fregante/setup-git-user@v1 + - name: 缓存身份令牌 + run: | + if [ -d "epic/user_data_dir" ];then cp -rfp epic/user_data_dir . ; fi + if [ -d "epic/logs" ];then cp -rfp epic/logs . ; fi + echo "${{ env.FAKE_HASH }}" > _token + rm -rf epic + git add . + git commit -m "${{ env.FAKE_HASH }}" + git push -f + - name: PUSHPLUS + run: | + python PUSHPLUS.py diff --git a/.github/workflows/ci_docker.yaml b/.github/workflows/ci_docker_fish.yaml similarity index 69% rename from .github/workflows/ci_docker.yaml rename to .github/workflows/ci_docker_fish.yaml index cde2e035bd..80b91d405e 100644 --- a/.github/workflows/ci_docker.yaml +++ b/.github/workflows/ci_docker_fish.yaml @@ -4,14 +4,16 @@ on: workflow_dispatch: push: branches: +# - update-challenge - main paths-ignore: - ".github/**" - "README.md" - "LICENSE" - ".gitignore" + - ".deepsource.toml" env: - IMAGE_TAG: ${{ secrets.DOCKERHUB_USERNAME }}/awesome-epic:daddy + IMAGE_TAG: ${{ secrets.DOCKERHUB_USERNAME }}/awesome-epic:fish jobs: docker: @@ -22,22 +24,22 @@ jobs: uses: actions/checkout@v2 - name: Scaffold Init - run: cd src && cp config-sample.yaml config.yaml + run: cd src && echo '{}' > config.json - name: Set up QEMU - uses: docker/setup-qemu-action@v1 + uses: docker/setup-qemu-action@v2 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v1 + uses: docker/setup-buildx-action@v2 - name: Login to DockerHub - uses: docker/login-action@v1 + uses: docker/login-action@v2 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push - uses: docker/build-push-action@v2 + uses: docker/build-push-action@v3 with: context: . platforms: linux/amd64 # linux/arm64 diff --git a/.github/workflows/samples/docker-compose.yaml b/.github/workflows/samples/docker-compose.yaml new file mode 100644 index 0000000000..b95dd362be --- /dev/null +++ b/.github/workflows/samples/docker-compose.yaml @@ -0,0 +1,19 @@ +version: "3.7" +services: + + epic-games-claimer: + image: ech0sec/awesome-epic:fish + init: true + command: "python3 claim.py" + volumes: + - "/home/epic/user_data_dir:/home/epic/user_data_dir" + - "/home/epic/logs:/home/epic/logs" + environment: + EPIC_EMAIL: + EPIC_PASSWORD: + +# ====================================== +# [🍜] Documentaion +# ====================================== +# docker-compose https://docs.docker.com/compose/reference/ +# apprise https://github.com/caronc/apprise diff --git a/.gitignore b/.gitignore index f31568d8a4..1e26644624 100644 --- a/.gitignore +++ b/.gitignore @@ -128,13 +128,9 @@ dmypy.json # Pyre type checker .pyre/ .idea -src/database -src/model -src/config.yaml tests/ -docs-src/ -docs/ -.github/workflows/automated.yaml -.github/workflows/automated_claim.yaml -.github/workflows/sync_repo.yaml -docker-compose.yaml +archivist/ +src/*.json +user_data_dir/ +logs/ +database/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 8e91625784..8a93f3c6b2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,33 +1,13 @@ -#FROM amazonlinux:latest as builder -# -#WORKDIR /home/epic -# -#RUN yum update -y \ -# && yum install -y python3 wget -# -#COPY requirements.txt ./ -#RUN pip3 install --no-cache-dir -r requirements.txt -# -#COPY src ./ -#RUN wget https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm \ -# && yum localinstall -y google-chrome-stable_current_x86_64.rpm \ -# && rm google-chrome-stable_current_x86_64.rpm \ -# && wget -P model/ https://github.com/QIN2DIM/hcaptcha-challenger/releases/download/model/yolov5n6.onnx -# +FROM python:3.10 as builder -FROM python:3.10-slim as builder - -WORKDIR /home/epic +WORKDIR /home/epic/src COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt RUN apt update -y \ - && apt install -y wget + && apt install -y wget xvfb tini \ + && playwright install firefox \ + && playwright install-deps firefox -COPY src ./ -RUN wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb \ - && apt install -y ./google-chrome-stable_current_amd64.deb \ - && rm ./google-chrome-stable_current_amd64.deb \ - && wget -P model/ https://github.com/QIN2DIM/hcaptcha-challenger/releases/download/model/yolov5n6.onnx \ - && wget -P model/ https://github.com/QIN2DIM/hcaptcha-challenger/releases/download/model/rainbow.yaml +COPY src ./ \ No newline at end of file diff --git a/PUSHPLUS.py b/PUSHPLUS.py new file mode 100644 index 0000000000..b2ab1eade5 --- /dev/null +++ b/PUSHPLUS.py @@ -0,0 +1,7 @@ +import requests +import os +token = os.environ['TOKEN'] +title= 'Epic-FreeGamer' +content ='Epic-FreeGamer任务已执行' +url = 'http://www.pushplus.plus/send?token='+token+'&title='+title+'&content='+content +requests.get(url) diff --git a/README.md b/README.md index f6b1fbe845..8eca2cd9be 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,13 @@

EPIC 免费人

-

🚀 优雅地领取 Epic 免费游戏

+

🍷 Gracefully claim weekly free games from Epic Store.

- +
- - - + Discord +

@@ -16,20 +15,35 @@ ![scaffold-get-demo-output-small](https://github.com/QIN2DIM/img_pool/blob/main/img/scaffold-get-demo-output-small.gif) -## 项目简介 👋 +## Introduction 👋 -[Epic AwesomeGamer](https://github.com/QIN2DIM/epic-awesome-gamer) 帮助玩家优雅地领取 Epic 免费游戏。 +[Epic 免费人](https://github.com/QIN2DIM/epic-awesome-gamer) 帮助玩家优雅地领取免费游戏。内置 [hcaptcha-challenger](https://github.com/QIN2DIM/hcaptcha-challenger) AI 模块,直面人机挑战。 -## 快速上手 🛴 +## Guides -- :gear: [技术文档](https://www.wolai.com/vAiu9mSp6G15xhWeoEPUn2) -- :small_red_triangle: 注意事项 -- :loudspeaker: 更新日志 -- :world_map: [开源计划](https://github.com/QIN2DIM/epic-awesome-gamer/issues/1) +[ [`简体中文`](https://echosec.notion.site/Epic-7c74f1e29117420dbac5551e4b031f82?pvs=4) ] [ [`English`](https://echosec.notion.site/Epic-Awesome-Gamer-ba870cdf64c149e69f417448b1eb83c5?pvs=4) ] -## 联系我们 📧 +## Features -> 本项目由海南大学机器人与人工智能实验室(`RobAI-Lab`)提供维护。 -- [**Email**](mailto:HainanU_arai@163.com?subject=CampusDailyAutoSign-ISSUE) **||** [**Home**](https://a-rai.github.io/) **||** [**TG**](https://t.me/joinchat/HlB9SQJubb5VmNU5) +### Task + +| Mode | Target | Progress | +| ---------- | ------------------------------------------------------------ | -------- | +| epic-games | [Epic Games Store](https://www.epicgames.com/store/free-games) | ✅ | +| unreal | [Unreal Engine](https://www.unrealengine.com/) | 🚧 | +| gog | [GOG](https://www.gog.com/) | 🚧 | +| apg | [Amazon Prime Gaming](https://gaming.amazon.com/) | 🚧 | +| xbox | [Xbox Live Games with Gold](https://www.xbox.com/en-US/live/gold#gameswithgold) | 🚧 | + +### Component + +| Demand | Support | +| :------------------- | :------ | +| hCaptcha Solver | ✅ | +| Docker development | ✅ | +| Persistent context @multi-user | ✅ | +| Rolling Upgrade | 🚧 | +| Epicgames DLC | 🚧 | +| 2FA OTP support | 🚧 | diff --git a/_token b/_token new file mode 100644 index 0000000000..38790b853c --- /dev/null +++ b/_token @@ -0,0 +1 @@ +Automated deployment @ 2024-10-23 06:33:15 Asia/Shanghai diff --git a/requirements.txt b/requirements.txt index 8c98605794..bdf5340ba5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,37 +1 @@ -# {{< Scaffolding dependency >}} -fire>=0.4.0 - -# {{< Log system >}} -loguru>=0.5.3 -pyyaml>=6.0 - -# {{< Timed task >}} -apscheduler>=3.8.1 -pytz>=2021.3 - -# {{< Interface /Data cleaning >}} -requests>=2.27.1 -cloudscraper>=1.2.58 -bs4>=0.0.1 -beautifulsoup4>=4.10.0 -lxml>=4.7.1 -aiohttp>=3.8.1 - -# {{< Object Detection >}} -opencv-python>=4.5.5.62 -scikit-image>=0.19.2 -numpy==1.21.5 - -# {{< Challenger Drive >}} -gevent>=21.12.0 -selenium>=4.1.0 -webdriver_manager==3.5.2 -undetected_chromedriver==3.1.3 - -# {{< Message Push >}} -apprise>=0.9.6 - -# {{< Conflicting dependencies >}} -# Manually install according to your needs. -# Used to skip `tls in tls` authentication. -# urllib3==1.25.11 +hcaptcha-challenger[playwright] diff --git a/src/apis/__init__.py b/src/apis/__init__.py deleted file mode 100644 index 35c4df7bf2..0000000000 --- a/src/apis/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: diff --git a/src/apis/scaffold/__init__.py b/src/apis/scaffold/__init__.py deleted file mode 100644 index 5ba804a652..0000000000 --- a/src/apis/scaffold/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import os -import random -import sys - -_THE_PROPHECY = """ -# ==================================================================================================== -# Ithlinne's Prophecy -# ==================================================================================================== -# Verily I say unto you,the era of the sword and axe is nigh, the era of the wolf's blizzard. -# The Time of the White Chill and the White Light is nigh, the Time of Madness and the Time of Contempt: -# Tedd Deireádh, the Time of End. The world will die amidst frost and be reborn with the new sun. -# It will be reborn of Elder Blood, of Hen Ichaer, of the seed that has been sown. -# A seed which will not sprout but burst into flame. -# Ess'tuath esse! Thus it shall be! Watch for the signs! -# What signs these shall be, I say unto you: -# first the earth will flow with the blood of claim , the Blood of Epic . -# ==================================================================================================== -""" -if random.uniform(0, 1) > 0.711: - for policy in ["epic", "claim"]: - if policy in os.getenv("GITHUB_REPOSITORY", "").lower(): - print(f"[EXIT] 仓库名出现非法关键词 `{policy}`") - sys.exit() - print(_THE_PROPHECY) diff --git a/src/apis/scaffold/challenge.py b/src/apis/scaffold/challenge.py deleted file mode 100644 index ec841f9166..0000000000 --- a/src/apis/scaffold/challenge.py +++ /dev/null @@ -1,41 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from typing import Optional - -from services.bricklayer import Bricklayer -from services.settings import PATH_USR_COOKIES, logger -from services.utils import ToolBox - -bricklayer = Bricklayer() - - -def run(silence: Optional[bool] = None): - """刷新上下文身份令牌""" - logger.info( - ToolBox.runtime_report( - motive="STARTUP", action_name="ScaffoldChallenge", message="正在更新身份令牌..." - ) - ) - - # [🌀] 激活人机挑战 - if not bricklayer.cookie_manager.refresh_ctx_cookies(silence=silence): - return - - # [🌀] 读取新的身份令牌 - ctx_cookies = bricklayer.cookie_manager.load_ctx_cookies() - - # [🌀] 保存身份令牌 - with open(PATH_USR_COOKIES, "w", encoding="utf8") as file: - file.write(ToolBox.transfer_cookies(ctx_cookies)) - - logger.success( - ToolBox.runtime_report( - motive="GET", - action_name="ChallengeRunner", - message="玩家饼干已到货。", - path=PATH_USR_COOKIES, - ) - ) diff --git a/src/apis/scaffold/claimer.py b/src/apis/scaffold/claimer.py deleted file mode 100644 index 8e38655402..0000000000 --- a/src/apis/scaffold/claimer.py +++ /dev/null @@ -1,21 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from typing import Optional - -from services.deploy import ClaimerScheduler -from services.settings import logger - - -@logger.catch() -def deploy(platform: Optional[str] = None): - """在微小容器中部署 `claim` 定时调度任务""" - ClaimerScheduler(silence=True).deploy_jobs(platform) - - -@logger.catch() -def run(silence: Optional[bool] = None): - """运行 `claim` 单步子任务,认领周免游戏""" - ClaimerScheduler(silence=silence).job_loop_claim() diff --git a/src/apis/scaffold/get.py b/src/apis/scaffold/get.py deleted file mode 100644 index 3ee4d1b0a7..0000000000 --- a/src/apis/scaffold/get.py +++ /dev/null @@ -1,127 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import random -from typing import Optional - -from selenium.common.exceptions import WebDriverException - -from services.bricklayer import Bricklayer -from services.explorer import Explorer -from services.settings import logger -from services.utils import CoroutineSpeedup, ToolBox - -SILENCE = True - -bricklayer = Bricklayer(silence=SILENCE) -explorer = Explorer(silence=SILENCE) - - -class SpawnBooster(CoroutineSpeedup): - """协程助推器 并发执行片段代码""" - - def __init__( - self, - docker, - ctx_cookies, - power: Optional[int] = None, - debug: Optional[bool] = None, - ): - super().__init__(docker=docker, power=power) - - self.debug = False if debug is None else debug - self.power = min(4, 4 if power is None else power) - self.action_name = "SpawnBooster" - - self.ctx_cookies = ctx_cookies - - if self.docker: - random.shuffle(self.docker) - - def control_driver(self, task, *args, **kwargs): - url = task - - # 运行前置检查 - response = explorer.game_manager.is_my_game( - ctx_cookies=self.ctx_cookies, page_link=url - ) - - # 识别未在库的常驻周免游戏 - if response.get("status") is False: - logger.debug( - ToolBox.runtime_report( - motive="BUILD", - action_name=self.action_name, - message="🛒 正在为玩家领取免费游戏", - progress=f"[{self.progress()}]", - url=url, - ) - ) - - # 启动 Bricklayer 获取免费游戏 - try: - bricklayer.get_free_game( - page_link=url, ctx_cookies=self.ctx_cookies, refresh=False - ) - except WebDriverException as error: - if self.debug: - logger.exception(error) - logger.error( - ToolBox.runtime_report( - motive="QUIT", - action_name="SpawnBooster", - message="未知错误", - progress=f"[{self.progress()}]", - url=url, - ) - ) - - def killer(self): - logger.success( - ToolBox.runtime_report( - motive="OVER", action_name=self.action_name, message="✔ 任务队列已清空" - ) - ) - - -def join(trace: bool = False, cache: bool = True): - """ - 一键搬空免费商店 - - 需要确保上下文身份令牌有效,可通过 `challenge` 脚手架强制刷新。 - :param cache: - :param trace: - :return: - """ - from gevent import monkey - - monkey.patch_all(ssl=False) - - logger.info( - ToolBox.runtime_report( - motive="STARTUP", action_name="ScaffoldGet", message="🔨 正在为玩家领取免费游戏" - ) - ) - - # [🔨] 读取有效的身份令牌 - ctx_cookies = bricklayer.cookie_manager.load_ctx_cookies() - if not bricklayer.cookie_manager.is_available_cookie(ctx_cookies): - logger.critical( - ToolBox.runtime_report( - motive="SKIP", - action_name="ScaffoldGet", - message="身份令牌不存在或失效,手动执行 `challenge` 指令更新身份令牌。", - ) - ) - return - - # [🔨] 缓存免费商城数据 - urls = explorer.game_manager.load_game_objs(only_url=True) - if not cache or not urls: - urls = explorer.discovery_free_games(ctx_cookies=ctx_cookies, cover=True) - - # [🔨] 启动 Bricklayer 搬空免费商店 - # 启动一轮协程任务,执行效率受限于本地网络带宽 - SpawnBooster(ctx_cookies=ctx_cookies, docker=urls, power=4, debug=trace).speedup() diff --git a/src/apis/scaffold/install.py b/src/apis/scaffold/install.py deleted file mode 100644 index 6c6286d99c..0000000000 --- a/src/apis/scaffold/install.py +++ /dev/null @@ -1,72 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/20 16:16 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import sys -import webbrowser - -from webdriver_manager.chrome import ChromeDriverManager -from webdriver_manager.utils import get_browser_version_from_os - -from services.settings import DIR_MODEL, logger, PATH_RAINBOW -from services.utils import SKRecognition -from services.utils import YOLO -from services.utils import get_challenge_ctx - - -def _download_model(onnx_prefix: str = None): - """下载 YOLOv4 目标检测模型""" - logger.debug("Downloading YOLOv5(ONNX) object detection model...") - - YOLO(dir_model=DIR_MODEL, onnx_prefix=onnx_prefix).download_model() - - -def _download_driver(): - """下载浏览器驱动""" - logger.debug("Downloading ChromeDriver...") - - # 自动下载并授权对应版本的 ChromeDriver - browser_version = get_browser_version_from_os("google-chrome") - if browser_version != "UNKNOWN": - return ChromeDriverManager(version="latest").install() - - # 环境变量中缺少 `google-chrome` 提示玩家手动安装 - logger.critical("当前环境变量缺少 `google-chrome`,请为你的设备手动安装 Chrome 浏览器。") - logger.info( - "Ubuntu: https://linuxize.com/post/how-to-install-google-chrome-web-browser-on-ubuntu-20-04/" - ) - logger.info( - "CentOS 7/8: https://linuxize.com/post/how-to-install-google-chrome-web-browser-on-centos-7/" - ) - if "linux" not in sys.platform: - webbrowser.open("https://www.google.com/chrome/") - - logger.info("安装完毕后重新执行 `install` 脚手架指令。") - - -def _download_rainbow(): - logger.debug("Downloading Reinforcement of Memory | Rainbow Table...") - - SKRecognition.sync_rainbow(path_rainbow=PATH_RAINBOW, convert=True) - - -def run(onnx_prefix: str = None): - """下载项目运行所需的各项依赖""" - logger.debug("正在下载系统依赖") - _download_driver() - _download_model(onnx_prefix=onnx_prefix) - _download_rainbow() - logger.success("系统依赖下载完毕") - - -@logger.catch() -def test(): - """检查挑战者驱动版本是否适配""" - ctx = get_challenge_ctx(silence=True) - try: - ctx.get("https://www.baidu.com") - finally: - ctx.quit() - - logger.success("驱动适配成功") diff --git a/src/claim.py b/src/claim.py new file mode 100644 index 0000000000..bfbc8f9763 --- /dev/null +++ b/src/claim.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# Time : 2023/8/16 5:14 +# Author : QIN2DIM +# GitHub : https://github.com/QIN2DIM +# Description: +from __future__ import annotations + +import asyncio +import os +import sys +from dataclasses import dataclass, field +from typing import List + +import importlib_metadata +from hcaptcha_challenger import install +from hcaptcha_challenger.agents import Malenia +from loguru import logger +from playwright.async_api import BrowserContext, async_playwright + +from epic_games import ( + EpicPlayer, + EpicGames, + Game, + CompletedOrder, + get_promotions, + get_order_history, +) + +self_supervised = True + + +@dataclass +class ISurrender: + player: EpicPlayer + + promotions: List[Game] = field(default_factory=list) + ctx_cookies_is_available: bool = None + headless: bool = True + locale: str = "en-US" + + _orders = None + _namespaces = None + _pros = None + + def __post_init__(self): + self._orders: List[CompletedOrder] = [] + self._namespaces: List[str] = [] + self._pros: List[Game] = [] + + @classmethod + def from_epic(cls): + return cls(player=EpicPlayer.from_account()) + + @property + def cookies(self): + return self.player.cookies + + def create_tasks(self): + if not self._orders: + self._orders = get_order_history(self.cookies) + if not self._namespaces: + self._namespaces = [order.namespace for order in self._orders] + if not self._pros: + self._pros = get_promotions() + for pro in self._pros: + logger.debug("Put task", title=pro.title, url=pro.url) + + self.promotions = [p for p in self._pros if p.namespace not in self._namespaces] + + async def prelude_with_context(self, context: BrowserContext) -> bool | None: + url = "https://www.epicgames.com/account/creator-programs" + page = context.pages[0] + await page.goto(url, wait_until="networkidle") + if not page.url.startswith(url): + return + + self.ctx_cookies_is_available = True + await context.storage_state(path=self.player.ctx_cookie_path) + cookies = self.player.ctx_cookies.reload(self.player.ctx_cookie_path) + self.player.cookies = cookies + + self.create_tasks() + + if not self.promotions: + logger.success( + "Pass claim task", + reason="All free games are in my library", + stage="context-prelude", + ) + return True + + async def claim_epic_games(self, context: BrowserContext): + page = context.pages[0] + epic = EpicGames.from_player(self.player, page=page, self_supervised=self_supervised) + + if not self.ctx_cookies_is_available: + logger.info("Try to flush cookie", task="claim_epic_games") + if await epic.authorize(page): + cookies = await epic.flush_token(context) + self.player.cookies = cookies + else: + logger.error("Exit task", reason="Failed to flush token") + return + + if not self.promotions: + self.create_tasks() + if not self.promotions: + logger.success( + "Pass claim task", reason="All free games are in my library", stage="claim-games" + ) + return + + single_promotions = [] + bundle_promotions = [] + for p in self.promotions: + if "bundles" in p.url: + bundle_promotions.append(p) + else: + single_promotions.append(p) + + if single_promotions: + await epic.claim_weekly_games(page, single_promotions) + if bundle_promotions: + await epic.claim_bundle_games(page, bundle_promotions) + + @logger.catch + async def stash(self): + if "linux" in sys.platform and "DISPLAY" not in os.environ: + self.headless = True + + logger.info( + "run", + image="20231121", + version=importlib_metadata.version("hcaptcha-challenger"), + role="EpicPlayer", + headless=self.headless, + ) + + async with async_playwright() as p: + context = await p.firefox.launch_persistent_context( + user_data_dir=self.player.browser_context_dir, + record_video_dir=self.player.record_dir, + record_har_path=self.player.record_har_path, + headless=self.headless, + locale=self.locale, + args=["--hide-crash-restore-bubble"], + ) + await Malenia.apply_stealth(context) + if not await self.prelude_with_context(context): + install(upgrade=True, clip=True) + await self.claim_epic_games(context) + await context.close() + + +async def run(): + agent = ISurrender.from_epic() + agent.headless = False + await agent.stash() + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/src/config-sample.yaml b/src/config-sample.yaml deleted file mode 100644 index 9d5e4859ff..0000000000 --- a/src/config-sample.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# [√]需要手动修改 [-]不推荐修改 [※]可选项,不影响系统运行 - -# =================================================== -# [√] 账号信息 -# --------------------------------------------------- -# Tips:不建议在公有库上创建工作流运行项目,有仓库禁用风险 -# =================================================== -EPΙC_EMAΙL: "" -EPΙC_PASSWΟRD: "" -# =================================================== -# [√※] 通信鸽 -# --------------------------------------------------- -# 一些主流的通信接口,用于反射运行报告 -# Docs https://github.com/caronc/apprise -# =================================================== -message_pusher_settings: - enable: true - pusher: - PUSHER_EMAIL: "" - PUSHER_DINGTALK: "" - PUSHER_TELEGRAM: "" - PUSHER_SERVERCHAN: "" - PUSHER_DISCORD: "" diff --git a/src/epic_games/__init__.py b/src/epic_games/__init__.py new file mode 100644 index 0000000000..15cd18d933 --- /dev/null +++ b/src/epic_games/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# Time : 2022/1/16 0:24 +# Author : QIN2DIM +# Github : https://github.com/QIN2DIM +# Description: +from .agent import EpicGames, Game, CompletedOrder, get_promotions, get_order_history + +from .player import EpicPlayer + +__all__ = [ + "EpicGames", + "Game", + "CompletedOrder", + "get_order_history", + "get_promotions", + "EpicPlayer", +] diff --git a/src/epic_games/agent.py b/src/epic_games/agent.py new file mode 100644 index 0000000000..cbc67d926e --- /dev/null +++ b/src/epic_games/agent.py @@ -0,0 +1,469 @@ +# -*- coding: utf-8 -*- +# Time : 2023/8/14 23:16 +# Author : QIN2DIM +# Github : https://github.com/QIN2DIM +# Description: +from __future__ import annotations + +import json +from contextlib import suppress +from dataclasses import dataclass, field +from json import JSONDecodeError +from pathlib import Path +from typing import List, Dict, Literal + +import httpx +from loguru import logger +from playwright.async_api import BrowserContext, expect, TimeoutError, Page, FrameLocator, Locator +from tenacity import * + +from epic_games.player import EpicPlayer +from utils import from_dict_to_model, AgentG + +# fmt:off +URL_CLAIM = "https://store.epicgames.com/en-US/free-games" +URL_LOGIN = f"https://www.epicgames.com/id/login?lang=en-US&noHostRedirect=true&redirectUrl={URL_CLAIM}" +URL_PROMOTIONS = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions" +URL_PRODUCT_PAGE = "https://store.epicgames.com/en-US/p/" +URL_PRODUCT_BUNDLES = "https://store.epicgames.com/en-US/bundles/" +URL_ORDER_HISTORY = "https://www.epicgames.com/account/v2/payment/ajaxGetOrderHistory" +URL_CART = "https://store.epicgames.com/en-US/cart" +URL_CART_SUCCESS = "https://store.epicgames.com/en-US/cart/success" +# ----- +URL_STORE_EXPLORER = "https://store.epicgames.com/en-US/browse?sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&count=40" +URL_STORE_EXPLORER_GRAPHQL = ( + "https://store.epicgames.com/graphql?operationName=searchStoreQuery" + '&variables={"category":"games/edition/base","comingSoon":false,"count":80,"freeGame":true,"keywords":"","sortBy":"releaseDate","sortDir":"DESC","start":0,"tag":"","withPrice":true}' + '&extensions={"persistedQuery":{"version":1,"sha256Hash":"13a2b6787f1a20d05c75c54c78b1b8ac7c8bf4efc394edf7a5998fdf35d1adb0"}}' +) + +# fmt:on + + +@dataclass +class CompletedOrder: + offerId: str + namespace: str + + +@dataclass +class Game: + url: str + namespace: str + title: str + thumbnail: str + id: str + in_library = None + + +class CommonHandler: + @staticmethod + async def any_license(page: Page): + with suppress(TimeoutError): + await page.click("//label[@for='agree']", timeout=2000) + accept = page.locator("//button//span[text()='Accept']") + if await accept.is_enabled(): + await accept.click() + + @staticmethod + async def move_to_purchase_container(page: Page): + wpc = page.frame_locator("//iframe[@class='']") + payment_btn = wpc.locator("//div[@class='payment-order-confirm']") + with suppress(Exception): + await expect(payment_btn).to_be_attached() + await page.wait_for_timeout(2000) + await payment_btn.click(timeout=6000) + + return wpc, payment_btn + + @staticmethod + async def uk_confirm_order(wpc: FrameLocator): + # <-- Handle UK confirm-order + with suppress(TimeoutError): + accept = wpc.locator( + "//button[contains(@class, 'payment-confirm__btn payment-btn--primary')]" + ) + if await accept.is_enabled(timeout=5000): + await accept.click() + return True + + @staticmethod + @retry( + retry=retry_if_exception_type(TimeoutError), + wait=wait_fixed(0.5), + stop=stop_after_attempt(15), + reraise=True, + ) + async def insert_challenge( + solver: AgentG, + page: Page, + wpc: FrameLocator, + payment_btn: Locator, + recur_url: str, + is_uk: bool, + ): + response = await solver.execute(window="free") + logger.debug("task done", sattus=f"{solver.status.CHALLENGE_SUCCESS}") + + match response: + case solver.status.CHALLENGE_BACKCALL | solver.status.CHALLENGE_RETRY: + await wpc.locator("//a[@class='talon_close_button']").click() + await page.wait_for_timeout(1000) + if is_uk: + await CommonHandler.uk_confirm_order(wpc) + await payment_btn.click(delay=200) + case solver.status.CHALLENGE_SUCCESS: + await page.wait_for_url(recur_url) + return + + @staticmethod + async def empty_cart(page: Page, wait_rerender: int = 30) -> bool | None: + """ + URL_CART = "https://store.epicgames.com/en-US/cart" + URL_WISHLIST = "https://store.epicgames.com/en-US/wishlist" + //span[text()='Your Cart is empty.'] + + Args: + wait_rerender: + page: + + Returns: + + """ + has_paid_free = False + + try: + # Check all items in the shopping cart + cards = await page.query_selector_all("//div[@data-testid='offer-card-layout-wrapper']") + + # Move paid games to wishlist games + for card in cards: + is_free = await card.query_selector("//span[text()='Free']") + if not is_free: + has_paid_free = True + wishlist_btn = await card.query_selector( + "//button//span[text()='Move to wishlist']" + ) + await wishlist_btn.click() + + # Wait up to 60 seconds for the page to re-render. + # Usually it takes 1~3s for the web page to be re-rendered + # - Set threshold for overflow in case of poor Epic network + # - It can also prevent extreme situations, such as: the user’s shopping cart has nearly a hundred products + if has_paid_free and wait_rerender: + wait_rerender -= 1 + await page.wait_for_timeout(2000) + return await CommonHandler.empty_cart(page, wait_rerender) + return True + except TimeoutError as err: + logger.warning("Failed to empty shopping cart", err=err) + return False + + +@dataclass +class EpicGames: + player: EpicPlayer + """ + Agent control + """ + + _solver: AgentG = None + """ + Module for anti-captcha + """ + + _promotions: List[Game] = field(default_factory=list) + """ + Free promotional items for the week, + considered metadata for task sequence of the agent + """ + + @classmethod + def from_player( + cls, player: EpicPlayer, *, page: Page, tmp_dir: Path | None = None, **solver_opt + ): + """尽可能早地实例化,用于部署 captcha 事件监听器""" + return cls( + player=player, _solver=AgentG.from_page(page=page, tmp_dir=tmp_dir, **solver_opt) + ) + + @property + def handle(self): + return CommonHandler + + @property + def promotions(self) -> List[Game]: + self._promotions = self._promotions or get_promotions() + return self._promotions + + async def _login(self, page: Page) -> str | None: + async def insert_challenge(stage: Literal["email_exists_prod", "login_prod"]): + fall_in_challenge = False + + for _ in range(15): + if stage == "login_prod": + if not fall_in_challenge: + with suppress(TimeoutError): + await page.wait_for_url(URL_CART_SUCCESS, timeout=3000) + break + logger.debug("Attack challenge", stage=stage) + elif stage == "email_exists_prod": + if not fall_in_challenge: + with suppress(TimeoutError): + await page.type("#password", "", timeout=3000) + break + logger.debug("Attack challenge", stage=stage) + fall_in_challenge = True + result = await self._solver.execute(window=stage) + logger.debug("Parse result", stage=stage, result=result) + match result: + case self._solver.status.CHALLENGE_BACKCALL: + await page.click("//a[@class='talon_close_button']") + await page.wait_for_timeout(1000) + await page.click("#sign-in", delay=200) + case self._solver.status.CHALLENGE_RETRY: + continue + case self._solver.status.CHALLENGE_SUCCESS: + if stage == "signin" and not self._solver.qr_queue.empty(): + continue + with suppress(TimeoutError): + await page.wait_for_url(URL_CLAIM) + break + return + + await page.goto(URL_CLAIM, wait_until="domcontentloaded") + if "false" == await page.locator("//egs-navigation").get_attribute("isloggedin"): + await page.goto(URL_LOGIN, wait_until="domcontentloaded") + logger.info("login-with-email", url=page.url) + + # {{< SIGN IN PAGE >}} + await page.fill("#email", self.player.email) + await page.click("//button[@aria-label='Continue']") + + # {{< INSERT CHALLENGE - email_exists_prod >}} + await insert_challenge(stage="email_exists_prod") + + # {{< NESTED PAGE >}} + await page.type("#password", self.player.password) + await page.click("#sign-in") + + # {{< INSERT CHALLENGE - login_prod >}} + await insert_challenge(stage="login_prod") + + logger.success("login", result="Successfully refreshed tokens") + await page.goto(URL_CLAIM, wait_until="domcontentloaded") + return self._solver.status.CHALLENGE_SUCCESS + + async def authorize(self, page: Page): + for i in range(3): + try: + match await self._login(page): + case self._solver.status.CHALLENGE_SUCCESS: + return True + case _: + continue + except TimeoutError: + logger.warning("执行超时", task="authorize", retry=i) + continue + + raise RuntimeError(f"Failed to flush token - agent={self.__class__.__name__}") + + async def flush_token(self, context: BrowserContext) -> Dict[str, str] | None: + page = context.pages[0] + await page.goto("https://www.epicgames.com/account/personal", wait_until="networkidle") + await page.goto( + "https://store.epicgames.com/zh-CN/p/orwell-keeping-an-eye-on-you", + wait_until="networkidle", + ) + await context.storage_state(path=self.player.ctx_cookie_path) + cookies = self.player.ctx_cookies.reload(self.player.ctx_cookie_path) + logger.success("flush_token", path=self.player.ctx_cookie_path) + return cookies + + @retry( + retry=retry_if_exception_type(TimeoutError), + wait=wait_fixed(0.5), + stop=(stop_after_delay(360) | stop_after_attempt(3)), + reraise=True, + ) + async def claim_weekly_games(self, page: Page, promotions: List[Game]): + in_cart_nums = 0 + + # --> Add promotions to Cart + for promotion in promotions: + logger.info("claim_weekly_games", action="go to store", url=promotion.url) + await page.goto(promotion.url, wait_until="load") + + # <-- Handle pre-page + with suppress(TimeoutError): + await page.click("//button//span[text()='Continue']", timeout=3000) + + # --> Make sure promotion is not in the library before executing + cta_btn = page.locator("//aside//button[@data-testid='add-to-cart-cta-button']") + with suppress(TimeoutError): + text = await cta_btn.text_content(timeout=10000) + if text == "View In Cart": + in_cart_nums += 1 + continue + if text == "Add To Cart": + await cta_btn.click() + await expect(cta_btn).to_have_text("View In Cart") + in_cart_nums += 1 + + if in_cart_nums == 0: + logger.success("Pass claim task", reason="Free games not added to shopping cart") + return + + # --> Goto cart page + await page.goto(URL_CART, wait_until="domcontentloaded") + await self.handle.empty_cart(page) + await page.click("//button//span[text()='Check Out']") + + # <-- Handle Any LICENSE + await self.handle.any_license(page) + + # --> Move to webPurchaseContainer iframe + logger.info("claim_weekly_games", action="move to webPurchaseContainer iframe") + wpc, payment_btn = await self.handle.move_to_purchase_container(page) + logger.info("claim_weekly_games", action="click payment button") + + # <-- Handle UK confirm-order + is_uk = await self.handle.uk_confirm_order(wpc) + + # <-- Insert challenge + recur_url = URL_CART_SUCCESS + await self.handle.insert_challenge(self._solver, page, wpc, payment_btn, recur_url, is_uk) + + # --> Wait for success + await page.wait_for_url(recur_url) + logger.success("claim_weekly_games", action="success", url=page.url) + + return True + + @retry( + retry=retry_if_exception_type(TimeoutError), + wait=wait_fixed(0.5), + stop=(stop_after_delay(360) | stop_after_attempt(3)), + reraise=True, + ) + async def claim_bundle_games(self, page: Page, promotions: List[Game]): + for promotion in promotions: + logger.info("claim_bundle_games", action="go to store", url=promotion.url) + await page.goto(promotion.url, wait_until="load") + + # <-- Handle pre-page + with suppress(TimeoutError): + await page.click("//button//span[text()='Continue']", timeout=3000) + + # --> Make sure promotion is not in the library before executing + purchase_btn = page.locator("//button[@data-testid='purchase-cta-button']").first + with suppress(TimeoutError): + text = await purchase_btn.text_content(timeout=10000) + if text == "Get": + await purchase_btn.click() + await page.wait_for_timeout(2000) + else: + continue + + # <-- Handle Any LICENSE + await self.handle.any_license(page) + + # --> Move to webPurchaseContainer iframe + logger.info("claim_bundle_games", action="move to webPurchaseContainer iframe") + wpc, payment_btn = await self.handle.move_to_purchase_container(page) + logger.info("claim_bundle_games", action="click payment button") + + # <-- Handle UK confirm-order + is_uk = await self.handle.uk_confirm_order(wpc) + + # <-- Insert challenge + recur_url = f"https://store.epicgames.com/en-US/download?ns={promotion.namespace}&id={promotion.id}" + await self.handle.insert_challenge( + self._solver, page, wpc, payment_btn, recur_url, is_uk + ) + + # --> Wait for success + await page.wait_for_url(recur_url) + logger.success("claim_bundle_games", action="success", url=page.url) + + return True + + +def get_promotions() -> List[Game]: + """ + 获取周免游戏数据 + + <即将推出> promotion["promotions"]["upcomingPromotionalOffers"] + <本周免费> promotion["promotions"]["promotionalOffers"] + :return: {"pageLink1": "pageTitle1", "pageLink2": "pageTitle2", ...} + """ + + def _has_discount_target(prot: dict) -> bool | None: + with suppress(KeyError, IndexError, TypeError): + offers = prot["promotions"]["promotionalOffers"][0]["promotionalOffers"] + for i, offer in enumerate(offers): + if offer["discountSetting"]["discountPercentage"] == 0: + return True + + _promotions: List[Game] = [] + + params = {"local": "zh-CN"} + resp = httpx.get(URL_PROMOTIONS, params=params) + try: + data = resp.json() + except JSONDecodeError as err: + logger.error("Failed to get promotions", err=err) + else: + elements = data["data"]["Catalog"]["searchStore"]["elements"] + promotions = [e for e in elements if e.get("promotions")] + # Get store promotion data and games + for promotion in promotions: + # Remove items that are discounted but not free. + if not _has_discount_target(promotion): + continue + # package free games + try: + query = promotion["catalogNs"]["mappings"][0]["pageSlug"] + promotion["url"] = f"{URL_PRODUCT_PAGE}{query}" + except TypeError: + promotion["url"] = f"{URL_PRODUCT_BUNDLES}{promotion['productSlug']}" + except IndexError: + promotion["url"] = f"{URL_PRODUCT_PAGE}{promotion['productSlug']}" + + promotion["thumbnail"] = promotion["keyImages"][-1]["url"] + _promotions.append(from_dict_to_model(Game, promotion)) + + return _promotions + + +def get_order_history( + cookies: Dict[str, str], page: str | None = None, last_create_at: str | None = None +) -> List[CompletedOrder]: + """获取最近的订单纪录""" + + def request_history() -> str | None: + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)" + " Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.203" + } + params = {"locale": "zh-CN", "page": page or "0", "latCreateAt": last_create_at or ""} + resp = httpx.get(URL_ORDER_HISTORY, headers=headers, cookies=cookies, params=params) + if not resp.is_success: + raise httpx.RequestError("Failed to get order history, cookie may have expired") + return resp.text + + completed_orders: List[CompletedOrder] = [] + + try: + data = json.loads(request_history()) + for order in data["orders"]: + if order["orderType"] != "PURCHASE": + continue + for item in order["items"]: + if len(item["namespace"]) != 32: + continue + completed_orders.append(from_dict_to_model(CompletedOrder, item)) + except (httpx.RequestError, JSONDecodeError, KeyError) as err: + logger.warning(err) + + return completed_orders diff --git a/src/epic_games/player.py b/src/epic_games/player.py new file mode 100644 index 0000000000..2de5580d1b --- /dev/null +++ b/src/epic_games/player.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# Time : 2023/8/14 17:04 +# Author : QIN2DIM +# Github : https://github.com/QIN2DIM +# Description: +from __future__ import annotations + +import abc +import json +import time +from abc import ABC +from contextlib import suppress +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict +from typing import Literal + +import httpx + +from settings import config, project + + +@dataclass +class EpicCookie: + cookies: Dict[str, str] = field(default_factory=dict) + """ + cookies in the Request Header + """ + + URL_VERIFY_COOKIES = "https://www.epicgames.com/account/personal" + + @classmethod + def from_state(cls, fp: Path): + """Jsonify cookie from Playwright""" + cookies = {} + try: + data = json.loads(fp.read_text())["cookies"] + cookies = {ck["name"]: ck["value"] for ck in data} + except (FileNotFoundError, KeyError): + pass + return cls(cookies=cookies) + + def is_available(self) -> bool | None: + if not self.cookies: + return + with suppress(httpx.ConnectError): + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.203", + "origin": "https://store.epicgames.com/zh-CN/p/orwell-keeping-an-eye-on-you", + } + resp = httpx.get(self.URL_VERIFY_COOKIES, headers=headers, cookies=self.cookies) + return resp.status_code == 200 + + def reload(self, fp: Path) -> Dict[str, str] | None: + try: + data = json.loads(fp.read_text())["cookies"] + self.cookies = {ck["name"]: ck["value"] for ck in data} + return self.cookies + except (FileNotFoundError, KeyError): + pass + + +@dataclass +class Player(ABC): + email: str + password: str + """ + Player's account + """ + + mode: Literal["epic-games", "unreal", "gog", "apg", "xbox"] + """ + Game Platform + """ + + user_data_dir: Path = project.user_data_dir + """ + Mount user cache + - database + - user_data_dir + - games@email # runtime user_data_dir + - context + - record + - captcha.mp4 + - eg-record.har + - ctx_cookie.json + - ctx_store.json + - order_history.json + - unreal@email + - context + - record + - captcha.mp4 + - eg-record.har + - gog@alice + - xbox@alice + """ + + def __post_init__(self): + namespace = f"{self.mode}@{self.email.split('@')[0]}" + self.user_data_dir = self.user_data_dir.joinpath(namespace) + for ck in ["browser_context", "record"]: + ckp = self.user_data_dir.joinpath(ck) + ckp.mkdir(parents=True, exist_ok=True) + + @classmethod + @abc.abstractmethod + def from_account(cls, *args, **kwargs): + raise NotImplementedError + + @property + def browser_context_dir(self) -> Path: + return self.user_data_dir.joinpath("browser_context") + + @property + def record_dir(self) -> Path: + return self.user_data_dir.joinpath("record") + + @property + def record_har_path(self) -> Path: + return self.record_dir.joinpath(f"eg-{int(time.time())}.har") + + @property + def ctx_cookie_path(self) -> Path: + return self.user_data_dir.joinpath("ctx_cookie.json") + + +@dataclass +class EpicPlayer(Player): + _ctx_cookies: EpicCookie = None + + def __post_init__(self): + super().__post_init__() + self._ctx_cookies = EpicCookie.from_state(fp=self.ctx_cookie_path) + + @classmethod + def from_account(cls): + return cls(email=config.epic_email, password=config.epic_password, mode="epic-games") + + @property + def ctx_store_path(self) -> Path: + return self.user_data_dir.joinpath("ctx_store.json") + + @property + def order_history_path(self) -> Path: + return self.user_data_dir.joinpath("order_history.json") + + @property + def ctx_cookies(self) -> EpicCookie: + return self._ctx_cookies + + @property + def cookies(self) -> Dict[str, str]: + return self._ctx_cookies.cookies + + @cookies.setter + def cookies(self, cookies: Dict[str, str]): + self._ctx_cookies.cookies = cookies diff --git a/src/main.py b/src/main.py deleted file mode 100644 index cbc81248dc..0000000000 --- a/src/main.py +++ /dev/null @@ -1,11 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:24 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: 🚀 哟!Epic免费人! -from fire import Fire - -from services.scaffold import Scaffold - -if __name__ == "__main__": - Fire(Scaffold) diff --git a/src/services/__init__.py b/src/services/__init__.py deleted file mode 100644 index e8c9c66271..0000000000 --- a/src/services/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:24 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: diff --git a/src/services/bricklayer/__init__.py b/src/services/bricklayer/__init__.py deleted file mode 100644 index 85aa172b08..0000000000 --- a/src/services/bricklayer/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from .bricklayer import Bricklayer - -__all__ = ["Bricklayer"] diff --git a/src/services/bricklayer/bricklayer.py b/src/services/bricklayer/bricklayer.py deleted file mode 100644 index 1aab06412f..0000000000 --- a/src/services/bricklayer/bricklayer.py +++ /dev/null @@ -1,310 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 13:50 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import os.path -import time -from hashlib import sha256 -from typing import List, Optional - -import cloudscraper -import yaml - -from services.settings import logger -from services.utils import ( - ToolBox, - get_ctx, - get_challenge_ctx, - ChallengeReset, - ChallengeTimeout, -) -from .core import AwesomeFreeMan -from .exceptions import ( - AssertTimeout, - SwitchContext, - PaymentException, - AuthException, - UnableToGet, - LoginException, -) - - -class CookieManager(AwesomeFreeMan): - """管理上下文身份令牌""" - - def __init__(self): - super().__init__() - - self.action_name = "CookieManager" - - def _t(self) -> str: - return ( - sha256(self.email[-3::-1].encode("utf-8")).hexdigest() if self.email else "" - ) - - def load_ctx_cookies(self) -> Optional[List[dict]]: - """ - 载入本地缓存的身份令牌。 - - :return: - """ - if not os.path.exists(self.path_ctx_cookies): - return [] - - with open(self.path_ctx_cookies, "r", encoding="utf8") as file: - data: dict = yaml.safe_load(file) - - ctx_cookies = data.get(self._t(), []) if isinstance(data, dict) else [] - if not ctx_cookies: - return [] - - logger.debug( - ToolBox.runtime_report( - motive="LOAD", - action_name=self.action_name, - message="Load context cookie.", - ) - ) - - return ctx_cookies - - def save_ctx_cookies(self, ctx_cookies: List[dict]) -> None: - """ - 在本地缓存身份令牌。 - - :param ctx_cookies: - :return: - """ - _data = {} - - if os.path.exists(self.path_ctx_cookies): - with open(self.path_ctx_cookies, "r", encoding="utf8") as file: - stream: dict = yaml.safe_load(file) - _data = _data if not isinstance(stream, dict) else stream - - _data.update({self._t(): ctx_cookies}) - - with open(self.path_ctx_cookies, "w", encoding="utf8") as file: - yaml.dump(_data, file) - - logger.debug( - ToolBox.runtime_report( - motive="SAVE", - action_name=self.action_name, - message="Update Context Cookie.", - ) - ) - - def is_available_cookie(self, ctx_cookies: Optional[List[dict]] = None) -> bool: - """ - 检测 COOKIE 是否有效 - - :param ctx_cookies: 若不指定则将工作目录 cookies 视为 ctx_cookies - :return: - """ - ctx_cookies = self.load_ctx_cookies() if ctx_cookies is None else ctx_cookies - if not ctx_cookies: - return False - - headers = {"cookie": ToolBox.transfer_cookies(ctx_cookies)} - - scraper = cloudscraper.create_scraper() - response = scraper.get( - self.URL_ACCOUNT_PERSONAL, headers=headers, allow_redirects=False - ) - - if response.status_code == 200: - return True - return False - - def refresh_ctx_cookies( - self, silence: bool = True, _ctx_session=None - ) -> Optional[bool]: - """ - 更新上下文身份信息 - - :param _ctx_session: 泛型开发者参数 - :param silence: - :return: - """ - # {{< Check Context Cookie Validity >}} - if self.is_available_cookie(): - logger.success( - ToolBox.runtime_report( - motive="CHECK", - action_name=self.action_name, - message="The identity token is valid.", - ) - ) - return True - # {{< Done >}} - - # {{< Insert Challenger Context >}} - ctx = get_challenge_ctx(silence=silence) if _ctx_session is None else _ctx_session - try: - for _ in range(8): - # Enter the account information and jump to the man-machine challenge page. - self._login(self.email, self.password, ctx=ctx) - - # Determine whether the account information is filled in correctly. - if self.assert_.login_error(ctx): - raise LoginException( - f"登录异常 Alert『{self.assert_.get_login_error_msg(ctx)}』" - ) - - # Assert if you are caught in a man-machine challenge. - try: - fallen = self._armor.fall_in_captcha_login(ctx=ctx) - except AssertTimeout: - continue - else: - # Approved. - if not fallen: - break - - # Winter is coming, so hear me roar! - response = self._armor.anti_hcaptcha(ctx, door="login") - if response: - break - else: - logger.critical( - ToolBox.runtime_report( - motive="MISS", - action_name=self.action_name, - message="Identity token update failed.", - ) - ) - return False - except ChallengeReset: - pass - except (AuthException, ChallengeTimeout) as error: - logger.critical( - ToolBox.runtime_report( - motive="SKIP", action_name=self.action_name, message=error.msg - ) - ) - return False - else: - # Store contextual authentication information. - self.save_ctx_cookies(ctx_cookies=ctx.get_cookies()) - return self.is_available_cookie(ctx_cookies=ctx.get_cookies()) - finally: - if _ctx_session is None: - ctx.quit() - # {{< Done >}} - - return True - - -class Bricklayer(AwesomeFreeMan): - """常驻免费游戏的认领逻辑""" - - def __init__(self, silence: bool = None): - super().__init__() - self.silence = True if silence is None else silence - - self.action_name = "AwesomeFreeMan" - - self.cookie_manager = CookieManager() - - # 游戏获取结果的状态 - self.result = "" - - def get_free_game( - self, - page_link: str, - ctx_cookies: List[dict] = None, - refresh: bool = True, - challenge: Optional[bool] = None, - _ctx_session=None, - ) -> Optional[bool]: - """ - 获取免费游戏 - - 部署后必须传输有效的 `page_link` 参数。 - :param _ctx_session: - :param challenge: - :param page_link: 游戏购买页链接 zh-CN - :param refresh: 当 COOKIE 失效时主动刷新 COOKIE - :param ctx_cookies: - :return: - """ - ctx_cookies = ( - self.cookie_manager.load_ctx_cookies() if ctx_cookies is None else ctx_cookies - ) - - # [🚀] 验证 COOKIE - # 请勿在并发环境下 让上下文驱动陷入到不得不更新 COOKIE 的陷阱之中。 - if not ctx_cookies or not self.cookie_manager.is_available_cookie( - ctx_cookies=ctx_cookies - ): - if refresh: - self.cookie_manager.refresh_ctx_cookies() - ctx_cookies = self.cookie_manager.load_ctx_cookies() - else: - logger.error( - ToolBox.runtime_report( - motive="QUIT", - action_name=self.action_name, - message="Cookie 已过期,任务已退出。", - ) - ) - return False - - # [🚀] 常驻免费(General)周免(Challenge) - if _ctx_session is None: - ctx = get_challenge_ctx(self.silence) if challenge else get_ctx(self.silence) - else: - ctx = _ctx_session - - # [🚀] 认领游戏 - try: - self.result = self._get_free_game( - page_link=page_link, api_cookies=ctx_cookies, ctx=ctx - ) - except AssertTimeout: - logger.debug( - ToolBox.runtime_report( - motive="QUIT", action_name=self.action_name, message="循环断言超时,任务退出。" - ) - ) - except UnableToGet as error: - logger.debug( - ToolBox.runtime_report( - motive="QUIT", - action_name=self.action_name, - message=str(error).strip(), - url=page_link, - ) - ) - except SwitchContext as error: - logger.warning( - ToolBox.runtime_report( - motive="SWITCH", - action_name=self.action_name, - message="正在退出标准上下文", - error=str(error).strip(), - url=page_link, - ) - ) - except PaymentException as error: - logger.debug( - ToolBox.runtime_report( - motive="QUIT", - action_name=self.action_name, - message="🚧 订单异常", - type=f"PaymentException {error}".strip(), - url=page_link, - ) - ) - except AuthException as error: - logger.critical( - ToolBox.runtime_report( - motive="SKIP", action_name=self.action_name, message=error.msg - ) - ) - return False - finally: - if _ctx_session is None: - ctx.quit() diff --git a/src/services/bricklayer/core.py b/src/services/bricklayer/core.py deleted file mode 100644 index f9df4060cc..0000000000 --- a/src/services/bricklayer/core.py +++ /dev/null @@ -1,869 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import asyncio -import os -import sys -import time -import urllib.request -from typing import List, Optional, NoReturn - -from selenium.common.exceptions import ( - TimeoutException, - ElementNotVisibleException, - WebDriverException, - ElementClickInterceptedException, - NoSuchElementException, - StaleElementReferenceException, - InvalidCookieDomainException, -) -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.wait import WebDriverWait -from undetected_chromedriver import Chrome - -from services.settings import ( - logger, - DIR_COOKIES, - DIR_CHALLENGE, - DIR_MODEL, - EPIC_EMAIL, - EPIC_PASSWORD, - PATH_RAINBOW, -) -from services.utils import ( - YOLO, - RiverChallenger, - DetectionChallenger, - ToolBox, - ArmorCaptcha, - AshFramework, - ChallengeReset, -) -from .exceptions import ( - AssertTimeout, - UnableToGet, - CookieExpired, - SwitchContext, - PaymentException, - AuthException, - PaymentAutoSubmit, -) - -# 显示人机挑战的DEBUG日志 -ARMOR_DEBUG = True - - -class ArmorUtils(ArmorCaptcha): - """人机对抗模组""" - - def __init__(self, debug: bool = ARMOR_DEBUG): - super().__init__(dir_workspace=DIR_CHALLENGE, debug=debug) - - # 重定向工作空间 - self.model = YOLO(DIR_MODEL) - - @staticmethod - def fall_in_captcha_login(ctx: Chrome) -> Optional[bool]: - """ - 判断在登录时是否遇到人机挑战 - - :param ctx: - :return: True:已进入人机验证页面,False:跳转到个人主页 - """ - threshold_timeout = 32 - start = time.time() - flag_ = ctx.current_url - while True: - if ctx.current_url != flag_: - return False - - if time.time() - start > threshold_timeout: - raise AssertTimeout("任务超时:判断是否陷入人机验证") - - try: - ctx.switch_to.frame( - ctx.find_element(By.XPATH, "//iframe[contains(@title,'content')]") - ) - ctx.find_element(By.XPATH, "//div[@class='prompt-text']") - return True - except WebDriverException: - pass - finally: - ctx.switch_to.default_content() - - @staticmethod - def fall_in_captcha_runtime(ctx: Chrome) -> Optional[bool]: - """ - 判断在下单时是否遇到人机挑战 - - :param ctx: - :return: - """ - try: - # "//div[@id='talon_frame_checkout_free_prod']" - WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until( - EC.presence_of_element_located( - (By.XPATH, "//iframe[contains(@title,'content')]") - ) - ) - return True - except TimeoutException: - return False - - def switch_solution(self, mirror, label: Optional[str] = None): - """模型卸载""" - label = self.label if label is None else label - - if label in ["垂直河流"]: - return RiverChallenger(path_rainbow=PATH_RAINBOW) - if label in ["天空中向左飞行的飞机"]: - return DetectionChallenger(path_rainbow=PATH_RAINBOW) - return mirror - - def download_images(self) -> None: - """ - 植入协程框架加速下载。 - - :return: - """ - - class ImageDownloader(AshFramework): - """协程助推器 提高挑战图片的下载效率""" - - def __init__(self, docker=None): - super().__init__(docker=docker) - - async def control_driver(self, context, session=None): - path_challenge_img, url = context - - # 下载挑战图片 - async with session.get(url) as response: - with open(path_challenge_img, "wb") as file: - file.write(await response.read()) - - self.log(message="下载挑战图片") - - # 初始化挑战图片下载目录 - workspace_ = self._init_workspace() - - # 初始化数据容器 - docker_ = [] - for alias_, url_ in self.alias2url.items(): - path_challenge_img_ = os.path.join(workspace_, f"{alias_}.png") - self.alias2path.update({alias_: path_challenge_img_}) - docker_.append((path_challenge_img_, url_)) - - # 启动最高功率的协程任务 - if "win" in sys.platform: - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - asyncio.run(ImageDownloader(docker=docker_).subvert(workers="fast")) - else: - loop = asyncio.get_event_loop() - loop.run_until_complete( - ImageDownloader(docker=docker_).subvert(workers="fast") - ) - - self.runtime_workspace = workspace_ - - def challenge_success( - self, ctx: Chrome, init: bool = True, **kwargs - ) -> Optional[bool]: - """ - 判断挑战是否成功的复杂逻辑 - - IF index is True: - 经过首轮识别点击后,出现四种结果: - - 直接通过验证(小概率) - - 进入第二轮(正常情况) - 通过短时间内可否继续点击拼图来断言是否陷入第二轮测试 - - 要求重试(小概率) - 特征被识别或网络波动,需要重试 - - 通过验证,弹出 2FA 双重认证 - 无法处理,任务结束 - - :param ctx: 挑战者驱动上下文 - :param init: 是否为初次挑战 - :return: - """ - - def _continue_action(): - try: - time.sleep(3) - ctx.find_element(By.XPATH, "//div[@class='task-image']") - except NoSuchElementException: - return True - else: - return False - - def _high_threat_proxy_access(): - """error-text:: 请再试一次""" - # 未设置子网桥系统代理 - if not urllib.request.getproxies(): - return False - - try: - WebDriverWait(ctx, 2, ignored_exceptions=WebDriverException).until( - EC.visibility_of_element_located( - (By.XPATH, "//div[@class='error-text']") - ) - ) - return True - except TimeoutException: - return False - - door: str = kwargs.get("door", "login") - - flag = ctx.current_url - - # 首轮测试后判断短时间内页内是否存在可点击的拼图元素 - # hcaptcha 最多两轮验证,一般情况下,账号信息有误仅会执行一轮,然后返回登录窗格提示密码错误 - # 其次是被识别为自动化控制,这种情况也是仅执行一轮,回到登录窗格提示“返回数据错误” - if init and not _continue_action(): - self.log("挑战继续") - return False - - if not init and _high_threat_proxy_access(): - self.log("挑战被迫重置 可能使用了高威胁的代理IP") - - try: - challenge_reset = WebDriverWait( - ctx, 5, ignored_exceptions=WebDriverException - ).until( - EC.presence_of_element_located( - (By.XPATH, "//div[@class='MuiAlert-message']") - ) - ) - except TimeoutException: - # 如果挑战通过,自动跳转至其他页面(也即离开当前网址) - try: - WebDriverWait(ctx, 10).until(EC.url_changes(flag)) - # 如果挑战未通过,可能为“账号信息错误”“分数太低”“自动化特征被识别” - except TimeoutException: - if door == "login": - self.log("断言超时,挑战继续") - return False - # 人机挑战通过,但可能还需处理 `2FA` 问题(超纲了) - else: - # 如果没有遇到双重认证,人机挑战成功 - if "id/login/mfa" not in ctx.current_url: - self.log("挑战成功") - return True - raise AuthException("人机挑战已退出 error=遭遇意外的 2FA 双重认证") - else: - self.log("挑战失败,需要重置挑战") - challenge_reset.click() - raise ChallengeReset - - def anti_hcaptcha(self, ctx: Chrome, door: str = "login") -> Optional[bool]: # noqa - """ - Handle hcaptcha challenge - - ## Reference - - M. I. Hossen and X. Hei, "A Low-Cost Attack against the hCaptcha System," 2021 IEEE Security - and Privacy Workshops (SPW), 2021, pp. 422-431, doi: 10.1109/SPW53761.2021.00061. - - > ps:该篇文章中的部分内容已过时,现在 hcaptcha challenge 远没有作者说的那么容易应付。 - - :param door: [login free] - :param ctx: - :return: - """ - # [👻] 进入人机挑战关卡 - ctx.switch_to.frame( - WebDriverWait(ctx, 5, ignored_exceptions=ElementNotVisibleException).until( - EC.presence_of_element_located( - (By.XPATH, "//iframe[contains(@title,'content')]") - ) - ) - ) - - # [👻] 获取挑战图片 - # 多轮验证标签不会改变 - self.get_label(ctx) - if self.tactical_retreat(): - ctx.switch_to.default_content() - return False - - # [👻] 注册解决方案 - # 根据挑战类型自动匹配不同的模型 - model = self.switch_solution(mirror=self.model) - - # [👻] 人机挑战! - try: - for index in range(2): - self.mark_samples(ctx) - - self.download_images() - - self.challenge(ctx, model=model) - - result = self.challenge_success(ctx, init=not bool(index), door=door) - - # 仅一轮测试就通过 - if index == 0 and result: - break - # 断言超时 - if index == 1 and result is False: - raise TimeoutException - # 提交结果断言超时或 mark_samples() 等待超时 - except TimeoutException: - ctx.switch_to.default_content() - return False - # 捕获重置挑战的请求信号 - except ChallengeReset: - ctx.switch_to.default_content() - return self.anti_hcaptcha(ctx, door=door) - # 回到主线剧情 - else: - ctx.switch_to.default_content() - return True - - -class AssertUtils: - """处理穿插在认领过程中意外出现的遮挡信息""" - - # 特征指令/简易错误 - # 此部分状态作为消息模板的一部分,尽量简短易理解 - COOKIE_EXPIRED = "💥 饼干过期了" - ASSERT_OBJECT_EXCEPTION = "🚫 无效的断言对象" - GAME_OK = "🛴 已在库" - GAME_PENDING = "👀 待认领" - GAME_CLAIM = "💰 领取成功" - - @staticmethod - def login_error(ctx: Chrome) -> bool: - """登录失败 可能原因为账号或密码错误""" - - threshold_timeout = 3 - start = time.time() - - while True: - # "任务超时:网络响应过慢" - if time.time() - start > threshold_timeout: - return False - - # 提交按钮正在响应或界面弹出人机挑战 - try: - submit_button = ctx.find_element(By.ID, "sign-in") - status_obj = submit_button.get_attribute("tabindex") - if status_obj == "-1": - continue - except (AttributeError, WebDriverException): - pass - - # 登录页面遭遇 Alert,可能原因为: - # - 账号或密码无效; - # - Auth Response 异常; - # - 账号被锁定; - try: - h6_tags = ctx.find_elements(By.TAG_NAME, "h6") - if len(h6_tags) > 1: - return True - return False - except NoSuchElementException: - pass - - @staticmethod - def get_login_error_msg(ctx) -> Optional[str]: - """获取登录页面的错误信息""" - try: - return ctx.find_element(By.XPATH, "//form//h6").text.strip() - except (WebDriverException, AttributeError): - return "null" - - @staticmethod - def wrong_driver(ctx, msg: str): - """判断当前上下文任务是否使用了错误的浏览器驱动""" - if "chrome.webdriver" in str(ctx.__class__): - raise SwitchContext(msg) - - @staticmethod - def surprise_license(ctx: Chrome) -> Optional[bool]: - """ - 新用户首次购买游戏需要处理许可协议书 - - :param ctx: - :return: - """ - try: - surprise_obj = WebDriverWait( - ctx, 3, ignored_exceptions=ElementNotVisibleException - ).until( - EC.presence_of_element_located( - (By.XPATH, "//label//span[@data-component='Message']") - ) - ) - except TimeoutException: - return - else: - try: - if surprise_obj.text == "我已阅读并同意最终用户许可协议书": - # 勾选协议 - tos_agree = WebDriverWait( - ctx, 3, ignored_exceptions=ElementClickInterceptedException - ).until(EC.element_to_be_clickable((By.ID, "agree"))) - - # 点击接受 - tos_submit = WebDriverWait( - ctx, 3, ignored_exceptions=ElementClickInterceptedException - ).until( - EC.element_to_be_clickable( - (By.XPATH, "//span[text()='接受']/parent::button") - ) - ) - time.sleep(1) - tos_agree.click() - tos_submit.click() - return True - # 窗口渲染出来后因不可抗力因素自然消解 - except (TimeoutException, StaleElementReferenceException): - return - - @staticmethod - def fall_in_captcha_runtime(ctx: Chrome) -> Optional[bool]: - """捕获隐藏在周免游戏订单中的人机挑战""" - try: - # //iframe[@id='talon_frame_checkout_free_prod'] - WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until( - EC.presence_of_element_located( - (By.XPATH, "//iframe[contains(@title,'content')]") - ) - ) - return True - except TimeoutException: - return False - - @staticmethod - def surprise_warning_purchase(ctx: Chrome) -> Optional[bool]: - """ - 处理弹窗遮挡消息。 - - 这是一个没有意义的操作,但无可奈何,需要更多的测试。 - :param ctx: - :return: - """ - - try: - surprise_obj = WebDriverWait(ctx, 2).until( - EC.visibility_of_element_located((By.TAG_NAME, "h1")) - ) - surprise_warning = surprise_obj.text - except TimeoutException: - return True - - if "成人内容" in surprise_warning: - WebDriverWait( - ctx, 2, ignored_exceptions=ElementClickInterceptedException - ).until( - EC.element_to_be_clickable( - (By.XPATH, "//span[text()='继续']/parent::button") - ) - ).click() - return True - if "内容品当前在您所在平台或地区不可用。" in surprise_warning: - raise UnableToGet(surprise_warning) - return False - - @staticmethod - def payment_auto_submit(ctx: Chrome) -> NoReturn: - """认领游戏后订单自动提交 仅在常驻游戏中出现""" - try: - warning_text = ( - WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException) - .until( - EC.presence_of_element_located( - (By.XPATH, "//div[@data-component='DownloadMessage']//span") - ) - ) - .text - ) - if warning_text == "感谢您的购买": - raise PaymentAutoSubmit - except TimeoutException: - pass - - @staticmethod - def payment_blocked(ctx: Chrome) -> NoReturn: - """判断游戏锁区""" - # 需要在 webPurchaseContainer 里执行 - try: - warning_text = ( - WebDriverWait(ctx, 3, ignored_exceptions=WebDriverException) - .until( - EC.presence_of_element_located( - (By.XPATH, "//h2[@class='payment-blocked__msg']") - ) - ) - .text - ) - if warning_text: - raise PaymentException(warning_text) - except TimeoutException: - pass - - @staticmethod - def timeout(loop_start: float, loop_timeout: float = 300) -> NoReturn: - """任务超时锁""" - if time.time() - loop_start > loop_timeout: - raise AssertTimeout - - @staticmethod - def purchase_status( - ctx: Chrome, - page_link: str, - action_name: Optional[str] = "AssertUtils", - init: Optional[bool] = True, - ) -> Optional[str]: - """ - 断言当前上下文页面的游戏的在库状态。 - - :param init: - :param action_name: - :param page_link: - :param ctx: - :return: - """ - time.sleep(2) - - # 捕获按钮对象,根据按钮上浮动的提示信息断言游戏在库状态 超时的空对象主动抛出异常 - try: - assert_obj = WebDriverWait(ctx, 30, WebDriverException).until( - EC.element_to_be_clickable( - ( - By.XPATH, - "//span[@data-component='PurchaseCTA']//span[@data-component='Message']", - ) - ) - ) - except TimeoutException: - return AssertUtils.ASSERT_OBJECT_EXCEPTION - - assert_info = assert_obj.text - - # 游戏名 超时的空对象主动抛出异常 - game_name = ( - WebDriverWait(ctx, 30, ignored_exceptions=ElementNotVisibleException) - .until(EC.visibility_of_element_located((By.XPATH, "//h1"))) - .text - ) - - if game_name[-1] == "。": - logger.warning( - ToolBox.runtime_report( - motive="SKIP", - action_name=action_name, - message=f"🚫 {game_name}", - url=page_link, - ) - ) - return AssertUtils.ASSERT_OBJECT_EXCEPTION - - if "已在" in assert_info: - _message = "🛴 游戏已在库" if init else "🥂 领取成功" - logger.info( - ToolBox.runtime_report( - motive="GET", - action_name=action_name, - message=_message, - game=f"『{game_name}』", - ) - ) - return AssertUtils.GAME_OK if init else AssertUtils.GAME_CLAIM - - if "获取" in assert_info: - deadline: Optional[str] = None - try: - deadline = ctx.find_element( - By.XPATH, - "//div[@data-component='PDPSidebarLayout']" - "//span[contains(text(),'优惠截止')][@data-component='Message']", - ).text - except (NoSuchElementException, AttributeError): - pass - - # 必须使用挑战者驱动领取周免游戏,处理潜在的人机验证 - if deadline: - AssertUtils.wrong_driver(ctx, "♻ 使用挑战者上下文领取周免游戏。") - - message = "🚀 发现免费游戏" if not deadline else f"💰 发现周免游戏 {deadline}" - logger.success( - ToolBox.runtime_report( - motive="GET", - action_name=action_name, - message=message, - game=f"『{game_name}』", - ) - ) - - return AssertUtils.GAME_PENDING - - if "购买" in assert_info: - logger.warning( - ToolBox.runtime_report( - motive="SKIP", - action_name=action_name, - message="🚧 这不是免费游戏", - game=f"『{game_name}』", - ) - ) - return AssertUtils.ASSERT_OBJECT_EXCEPTION - - return AssertUtils.ASSERT_OBJECT_EXCEPTION - - @staticmethod - def refund_info(ctx: Chrome): - """ - 处理订单中的 退款及撤销权信息 - - :param ctx: - :return: - """ - try: - WebDriverWait( - ctx, 2, ignored_exceptions=StaleElementReferenceException - ).until( - EC.element_to_be_clickable( - (By.XPATH, "//span[text()='我同意']/ancestor::button") - ) - ).click() - except TimeoutException: - pass - - -class AwesomeFreeMan: - """白嫖人的基础设施""" - - # 操作对象参数 - URL_LOGIN = "https://www.epicgames.com/id/login/epic?lang=zh-CN" - URL_ACCOUNT_PERSONAL = "https://www.epicgames.com/account/personal" - - def __init__(self): - """定义了一系列领取免费游戏所涉及到的浏览器操作。""" - - # 实体对象参数 - self.action_name = "BaseAction" - self.email, self.password = EPIC_EMAIL, EPIC_PASSWORD - - # 驱动参数 - self.path_ctx_cookies = os.path.join(DIR_COOKIES, "ctx_cookies.yaml") - self.loop_timeout = 300 - - # 注册拦截机 - self._armor = ArmorUtils() - self.assert_ = AssertUtils() - - def _reset_page(self, ctx: Chrome, page_link: str, api_cookies): - ctx.get(self.URL_ACCOUNT_PERSONAL) - for cookie_dict in api_cookies: - try: - ctx.add_cookie(cookie_dict) - except InvalidCookieDomainException as err: - logger.error( - ToolBox.runtime_report( - motive="SKIP", - action_name=self.action_name, - error=err.msg, - domain=cookie_dict.get("domain", "null"), - name=cookie_dict.get("name", "null"), - ) - ) - ctx.get(page_link) - - def _login(self, email: str, password: str, ctx: Chrome) -> None: - """ - 作为被动方式,登陆账号,刷新 identity token。 - - 此函数不应被主动调用,应当作为 refresh identity token / Challenge 的辅助函数。 - :param ctx: - :param email: - :param password: - :return: - """ - ctx.get(self.URL_LOGIN) - - WebDriverWait(ctx, 10, ignored_exceptions=ElementNotVisibleException).until( - EC.presence_of_element_located((By.ID, "email")) - ).send_keys(email) - - WebDriverWait(ctx, 10, ignored_exceptions=ElementNotVisibleException).until( - EC.presence_of_element_located((By.ID, "password")) - ).send_keys(password) - - WebDriverWait(ctx, 60, ignored_exceptions=ElementClickInterceptedException).until( - EC.element_to_be_clickable((By.ID, "sign-in")) - ).click() - - def _activate_payment(self, api: Chrome) -> Optional[bool]: - """ - 激活游戏订单 - - :param api: - :return: - """ - for _ in range(5): - try: - WebDriverWait( - api, 5, ignored_exceptions=ElementClickInterceptedException - ).until( - EC.element_to_be_clickable( - (By.XPATH, "//button[@data-testid='purchase-cta-button']") - ) - ).click() - return True - # 加载超时,继续测试 - except TimeoutException: - continue - # 出现弹窗遮挡 - except ElementClickInterceptedException: - try: - if self.assert_.surprise_warning_purchase(api) is True: - continue - except UnableToGet: - return False - - def _handle_payment(self, ctx: Chrome) -> None: - """ - 处理游戏订单 - - 逻辑过于复杂,需要重构。此处为了一套代码涵盖各种情况,做了很多妥协。 - 需要针对 周免游戏的订单处理 设计一套执行效率更高的业务模型。 - :param ctx: - :return: - """ - - # [🍜] Switch to the [Purchase Container] iframe. - try: - payment_frame = WebDriverWait( - ctx, 5, ignored_exceptions=ElementNotVisibleException - ).until( - EC.presence_of_element_located( - (By.XPATH, "//div[@id='webPurchaseContainer']//iframe") - ) - ) - ctx.switch_to.frame(payment_frame) - except TimeoutException: - try: - warning_layout = ctx.find_element( - By.XPATH, "//div[@data-component='WarningLayout']" - ) - warning_text = warning_layout.text - # Handle delayed loading of cookies. - if "依旧要购买吗" in warning_text: - return - # Handle Linux User-Agent Heterogeneous Services. - if "设备不受支持" in warning_text: - ctx.find_element( - By.XPATH, "//span[text()='继续']/parent::button" - ).click() - return self._handle_payment(ctx) - except NoSuchElementException: - pass - - # [🍜] 判断游戏锁区 - self.assert_.payment_blocked(ctx) - - # [🍜] Ignore: Click the [Accept Agreement] confirmation box. - try: - WebDriverWait( - ctx, 2, ignored_exceptions=ElementClickInterceptedException - ).until( - EC.presence_of_element_located( - (By.XPATH, "//div[contains(@class,'payment-check-box')]") - ) - ).click() - except TimeoutException: - pass - - # [🍜] Click the [order] button. - try: - time.sleep(0.5) - WebDriverWait( - ctx, 20, ignored_exceptions=ElementClickInterceptedException - ).until( - EC.element_to_be_clickable( - (By.XPATH, "//button[contains(@class,'payment-btn')]") - ) - ).click() - # 订单界面未能按照预期效果出现,在超时范围内重试若干次。 - except TimeoutException: - ctx.switch_to.default_content() - return - - # [🍜] 处理 UK 地区账号的「退款及撤销权信息」。 - self.assert_.refund_info(ctx) - - # [🍜] 捕获隐藏在订单中的人机挑战,仅在周免游戏中出现。 - if self._armor.fall_in_captcha_runtime(ctx): - self.assert_.wrong_driver(ctx, "任务中断,请使用挑战者上下文处理意外弹出的人机验证。") - try: - self._armor.anti_hcaptcha(ctx, door="free") - except (ChallengeReset, TimeoutException): - pass - - # [🍜] Switch to default iframe. - ctx.switch_to.default_content() - ctx.refresh() - - def _get_free_game( - self, page_link: str, api_cookies: List[dict], ctx: Chrome - ) -> Optional[str]: - """ - 获取免费游戏 - - 需要加载cookie后使用,避免不必要的麻烦。 - :param page_link: - :param api_cookies: - :param ctx: - :return: - """ - if not api_cookies: - raise CookieExpired(self.assert_.COOKIE_EXPIRED) - - _loop_start = time.time() - init = True - while True: - # [🚀] 重载身份令牌 - # InvalidCookieDomainException:需要 2 次 GET 重载 cookie relative domain - # InvalidCookieDomainException:跨域认证,访问主域名或过滤异站域名信息 - self._reset_page(ctx=ctx, page_link=page_link, api_cookies=api_cookies) - - # [🚀] 断言游戏的在库状态 - self.assert_.surprise_warning_purchase(ctx) - self.result = self.assert_.purchase_status( - ctx, page_link, self.action_name, init - ) - # 当游戏不处于<待认领>状态时跳过后续业务 - if self.result != self.assert_.GAME_PENDING: - # <游戏状态断言超时>或<检测到异常的实体对象> - # 在超时阈值内尝试重新拉起服务 - if self.result == self.assert_.ASSERT_OBJECT_EXCEPTION: - continue - # 否则游戏状态处于<领取成功>或<已在库> - break - - # [🚀] 激活游戏订单 - # Maximum sleep time -> 12s - self._activate_payment(ctx) - - # [🚀] 新用户首次购买游戏需要处理许可协议书 - # Maximum sleep time -> 3s - if self.assert_.surprise_license(ctx): - ctx.refresh() - continue - - # [🚀] 订单消失 - # Maximum sleep time -> 5s - self.assert_.payment_auto_submit(ctx) - - # [🚀] 处理游戏订单 - self._handle_payment(ctx) - - # [🚀] 更新上下文状态 - init = False - self.assert_.timeout(_loop_start, self.loop_timeout) - - return self.result diff --git a/src/services/bricklayer/exceptions.py b/src/services/bricklayer/exceptions.py deleted file mode 100644 index 66078b400f..0000000000 --- a/src/services/bricklayer/exceptions.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from typing import Optional, Sequence - - -class AwesomeException(Exception): - def __init__( - self, msg: Optional[str] = None, stacktrace: Optional[Sequence[str]] = None - ): - self.msg = msg - self.stacktrace = stacktrace - super().__init__() - - def __str__(self) -> str: - exception_msg = "Message: {}\n".format(self.msg) - if self.stacktrace: - stacktrace = "\n".join(self.stacktrace) - exception_msg += "Stacktrace:\n{}".format(stacktrace) - return exception_msg - - -class ContextException(AwesomeException): - """上下文使用错误""" - - -class SwitchContext(ContextException): - """当使用普通驱动上下文处理人机验证时抛出""" - - -class AuthException(AwesomeException): - """身份认证出现问题时抛出,例如遭遇插入到 hcaptcha 之后的 2FA 身份验证""" - - -class LoginException(AuthException): - """登录异常 可能原因:账号或密码无效""" - - -class PaymentException(AwesomeException): - """订单操作异常""" - - -class PaymentAutoSubmit(PaymentException): - """点击获取游戏后,订单窗格没有弹出,直接感谢我们购买游戏""" - - -class CookieExpired(AwesomeException): - """身份令牌或饼干过期时抛出""" - - -class AssertTimeout(AwesomeException): - """断言超时""" - - -class UnableToGet(AwesomeException): - """不可抗力因素,游戏无法获取""" - - -class SurpriseExit(KeyboardInterrupt): - """脑洞大开的作者想挑战一下 Python 自带的垃圾回收机制,决定以一种极其垂直的方式结束系统任务。""" diff --git a/src/services/deploy.py b/src/services/deploy.py deleted file mode 100644 index a27cefe674..0000000000 --- a/src/services/deploy.py +++ /dev/null @@ -1,229 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import random -from datetime import datetime, timedelta -from typing import Optional - -import apprise -import pytz -from apscheduler.schedulers.blocking import BlockingScheduler -from apscheduler.triggers.cron import CronTrigger - -from services.bricklayer import Bricklayer -from services.explorer import Explorer -from services.settings import logger, MESSAGE_PUSHER_SETTINGS -from services.utils import ToolBox, get_challenge_ctx - - -class ClaimerScheduler: - """系统任务调度器""" - - SPAWN_TIME = "spawn_time" - - def __init__(self, silence: Optional[bool] = None): - self.action_name = "AwesomeScheduler" - self.end_date = datetime.now(pytz.timezone("Asia/Shanghai")) + timedelta(days=180) - self.silence = silence - # 服务注册 - self.scheduler = BlockingScheduler() - self.bricklayer = Bricklayer(silence=silence) - self.explorer = Explorer(silence=silence) - self.logger = logger - - def deploy_on_vps(self): - """部署最佳实践的 VPS 定时任务""" - - # [⏰] 北京时间每周五凌晨 4 点的 两个任意时刻 执行任务 - jitter_minute = [random.randint(10, 20), random.randint(35, 57)] - - # [⚔] 首发任务用于主动认领,备用方案用于非轮询审核 - self.scheduler.add_job( - func=self.job_loop_claim, - trigger=CronTrigger( - day_of_week="fri", - hour="4", - minute=f"{jitter_minute[0]},{jitter_minute[-1]}", - second="30", - timezone="Asia/Shanghai", - # 必须使用 `end_date` 续订生产环境 定时重启 - end_date=self.end_date, - # 必须使用 `jitter` 弥散任务发起时间 - jitter=15, - ), - name="loop_claim", - ) - - self.logger.debug( - ToolBox.runtime_report( - motive="JOB", - action_name=self.action_name, - message=f"任务将在北京时间每周五 04:{jitter_minute[0]} " - f"以及 04:{jitter_minute[-1]} 执行。", - end_date=str(self.end_date), - ) - ) - - # [⚔] Gracefully run scheduler.` - try: - self.scheduler.start() - except KeyboardInterrupt: - self.scheduler.shutdown(wait=False) - self.logger.debug( - ToolBox.runtime_report( - motive="EXITS", - action_name=self.action_name, - message="Received keyboard interrupt signal.", - ) - ) - - def _push(self, inline_docker: list, pusher_settings: Optional[dict] = None): - """ - 推送追踪日志 - - :param inline_docker: - :param pusher_settings: - :return: - """ - - # ------------------------- - # [♻]参数过滤 - # ------------------------- - if pusher_settings is None: - pusher_settings = MESSAGE_PUSHER_SETTINGS - if not pusher_settings["enable"]: - return - # ------------------------- - # [📧]消息推送 - # ------------------------- - _inline_textbox = [f"当前玩家:{ToolBox.secret_email(self.bricklayer.email)}"] - _inline_textbox += ["运行日志".center(20, "-")] - if not inline_docker: - _inline_textbox += [f"[{ToolBox.date_format_now()}] 🛴 暂无待认领的周免游戏"] - else: - _inline_textbox += [ - f"[{game_obj[self.SPAWN_TIME]}] {game_obj['name']} {game_obj['status']}" - for game_obj in inline_docker - ] - _inline_textbox += ["生命周期统计".center(20, "-"), f"total:{inline_docker.__len__()}"] - - # 注册 Apprise 消息推送框架 - active_pusher = pusher_settings["pusher"] - surprise = apprise.Apprise() - for server in active_pusher.values(): - surprise.add(server) - - # 发送模版消息 - surprise.notify(body="\n".join(_inline_textbox), title="EpicAwesomeGamer 运行报告") - - self.logger.success( - ToolBox.runtime_report( - motive="Notify", - action_name=self.action_name, - message="消息推送完毕", - active_pusher=[i[0] for i in active_pusher.items() if i[-1]], - ) - ) - - def deploy_jobs(self, platform: Optional[str] = None): - """ - 部署系统任务 - - :param platform: within [vps serverless qing-long] - :return: - """ - platform = "vps" if platform is None else platform - if platform not in ["vps", "serverless", "qing-long"]: - raise NotImplementedError - - self.logger.debug( - ToolBox.runtime_report( - motive="JOB", - action_name=self.action_name, - message="部署任务调度器", - platform=platform.upper(), - ) - ) - - # [⚔] Distribute common state machine patterns - if platform == "vps": - self.deploy_on_vps() - elif platform == "serverless": - raise NotImplementedError - elif platform == "qing-long": - return self.job_loop_claim() - - def job_loop_claim(self): - """单步子任务 认领周免游戏""" - - def _release_power(urls: Optional[list] = None): - if not urls: - self.logger.debug( - ToolBox.runtime_report( - motive="SKIP", - action_name=self.action_name, - message="🛴 当前玩家暂无待认领的周免游戏。", - ) - ) - return - - # 优先处理常规情况 urls.__len__() == 1 - for url in urls: - self.logger.debug( - ToolBox.runtime_report( - motive="STARTUP", - action_name="ScaffoldClaim", - message="🍜 正在为玩家领取周免游戏", - game=f"『{limited_free_game_objs[url]}』", - ) - ) - - # 反复生产挑战者领取周免游戏 - self.bricklayer.get_free_game( - page_link=url, ctx_cookies=ctx_cookies, _ctx_session=challenger - ) - # 编制运行缓存 用于生成业务报告 - _runtime = { - self.SPAWN_TIME: ToolBox.date_format_now(), - "status": self.bricklayer.result, - "name": limited_free_game_objs[url], - } - inline_docker.append(_runtime) - - # 标记运行时刻 - if self.scheduler.running: - self.logger.debug( - ToolBox.runtime_report( - motive="JOB", - action_name=self.action_name, - message="定时任务启动", - job="claim", - ) - ) - - # 初始化内联数据容器 临时存储运行缓存 - inline_docker = [] - - challenger = get_challenge_ctx(silence=self.silence) - try: - # 更新身份令牌 - if not self.bricklayer.cookie_manager.refresh_ctx_cookies( - _ctx_session=challenger - ): - return - ctx_cookies = self.bricklayer.cookie_manager.load_ctx_cookies() - - # 扫描商城促销活动,返回“0折”商品的名称与商城链接 - limited_free_game_objs = self.explorer.get_the_absolute_free_game( - ctx_cookies, _ctx_session=challenger - ) - - # 释放 Claimer 认领周免游戏 - _release_power(limited_free_game_objs["urls"]) - finally: - challenger.quit() - - # 缓存卸载 发送运行日志 - self._push(inline_docker) diff --git a/src/services/explorer/__init__.py b/src/services/explorer/__init__.py deleted file mode 100644 index 8c42a154ed..0000000000 --- a/src/services/explorer/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from .explorer import Explorer - -__all__ = ["Explorer"] diff --git a/src/services/explorer/core.py b/src/services/explorer/core.py deleted file mode 100644 index 4c62fda792..0000000000 --- a/src/services/explorer/core.py +++ /dev/null @@ -1,182 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import os.path -import time -from typing import List, ContextManager, Union, Dict - -from selenium.common.exceptions import WebDriverException -from selenium.webdriver import Chrome -from selenium.webdriver.common.action_chains import ActionChains -from selenium.webdriver.common.by import By -from selenium.webdriver.common.keys import Keys -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.wait import WebDriverWait - -from services.settings import DIR_EXPLORER, logger -from services.utils import ToolBox -from .exceptions import DiscoveryTimeoutException - - -class AwesomeFreeGirl: - """游戏商店探索者 获取免费游戏数据以及促销信息""" - - # 平台对象参数 - URL_STORE_HOME = "https://store.epicgames.com/zh-CN/" - URL_FREE_GAMES = "https://store.epicgames.com/zh-CN/free-games" - URL_STORE_PREFIX = "https://store.epicgames.com/zh-CN/browse?" - URL_STORE_FREE = ( - f"{URL_STORE_PREFIX}sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&count=40" - ) - URL_PROMOTIONS = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions?locale=zh-CN" - URL_PRODUCT_PAGE = "https://store.epicgames.com/zh-CN/p/" - - def __init__(self, silence: bool = None): - self.silence = True if silence is None else silence - - # 驱动参数 - self.action_name = "AwesomeFreeGirl" - - # 运行缓存 - self.runtime_workspace = None - self.path_free_games = "ctx_games.csv" - self.game_objs = {} # {index0:{name:value url:value}, } - - # 初始化工作空间 - self._init_workspace() - - def _init_workspace(self) -> None: - """初始化工作目录 缓存游戏商店数据""" - self.runtime_workspace = "." if not os.path.exists(DIR_EXPLORER) else DIR_EXPLORER - self.path_free_games = os.path.join(self.runtime_workspace, self.path_free_games) - - def _discovery_free_games( - self, ctx: Union[ContextManager, Chrome], ctx_cookies: List[dict] - ) -> None: - """发现玩家所属地区可视的常驻免费游戏数据""" - - # 重载玩家令牌 - if ctx_cookies: - ctx.get(self.URL_STORE_FREE) - for cookie_dict in ctx_cookies: - ctx.add_cookie(cookie_dict) - - _mode = "(深度搜索)" if ctx_cookies else "(广度搜索)" - logger.debug( - ToolBox.runtime_report( - motive="DISCOVERY", - action_name=self.action_name, - message=f"📡 正在为玩家搜集免费游戏{_mode}...", - ) - ) - - # 获取免费游戏链接 - _start = time.time() - _url_store_free = self.URL_STORE_FREE - while True: - ctx.get(_url_store_free) - time.sleep(1) - WebDriverWait(ctx, 10, ignored_exceptions=WebDriverException).until( - EC.presence_of_element_located( - (By.XPATH, "//section[@data-testid='section-wrapper']") - ) - ) - - # 滑到底部 - action = ActionChains(ctx) - action.send_keys(Keys.END) - action.perform() - - # 判断异常跳转 - if "tierFree" not in ctx.current_url: - break - if time.time() - _start > 80: - raise DiscoveryTimeoutException("获取免费游戏链接超时") - - # 断言最后一页 - WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until( - EC.element_to_be_clickable( - (By.XPATH, "//a[@data-component='PaginationItem']") - ) - ) - page_switcher = ctx.find_elements( - By.XPATH, "//a[@data-component='PaginationItem']" - )[-1] - - # 提取价值信息 - game_objs = ctx.find_elements(By.XPATH, "//a[@class='css-1jx3eyg']") - for game_obj in game_objs: - name = game_obj.get_attribute("aria-label") - url = game_obj.get_attribute("href") - self.game_objs.update( - {self.game_objs.__len__(): {"name": name.strip(), "url": url.strip()}} - ) - - # 页面跳转判断 - page_end = page_switcher.get_attribute("href") - if page_end in ctx.current_url: - break - - # 更新跳转链接 - _url_store_free = page_end - - logger.success( - ToolBox.runtime_report( - motive="DISCOVERY", - action_name=self.action_name, - message="免费游戏搜集完毕", - qsize=len(self.game_objs), - ) - ) - - def stress_expressions(self, ctx: Union[ContextManager, Chrome]) -> Dict[str, str]: - """ - 应力表达式的主要实现 - - :param ctx: 浏览器驱动上下文 - :return: 不需要 quit() - """ - logger.debug( - ToolBox.runtime_report( - motive="DISCOVERY", - action_name=self.action_name, - message="📡 使用应力表达式搜索周免游戏...", - ) - ) - - # 访问链接 游戏名称 - pending_games = {} - - for _ in range(2): - try: - ctx.get(self.URL_STORE_HOME) - time.sleep(3) - - # 定位周免游戏的绝对位置 - WebDriverWait(ctx, 45, ignored_exceptions=WebDriverException).until( - EC.presence_of_element_located( - (By.XPATH, "//a[contains(string(),'当前免费')]") - ) - ) - - # 周免游戏基本信息 - stress_operator = ctx.find_elements( - By.XPATH, "//a[contains(string(),'当前免费')]" - ) - img_seq = ctx.find_elements( - By.XPATH, "//a[contains(string(),'当前免费')]//img" - ) - - # 重组周免游戏信息 - for index, _ in enumerate(stress_operator): - href = stress_operator[index].get_attribute("href") - alias = img_seq[index].get_attribute("alt") - pending_games[href] = alias - - break - except (WebDriverException, AttributeError): - continue - - return pending_games diff --git a/src/services/explorer/exceptions.py b/src/services/explorer/exceptions.py deleted file mode 100644 index 8161cb1d19..0000000000 --- a/src/services/explorer/exceptions.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from typing import Optional, Sequence - - -class Explorer(Exception): - def __init__( - self, msg: Optional[str] = None, stacktrace: Optional[Sequence[str]] = None - ): - self.msg = msg - self.stacktrace = stacktrace - super().__init__() - - def __str__(self) -> str: - exception_msg = "Message: {}\n".format(self.msg) - if self.stacktrace: - stacktrace = "\n".join(self.stacktrace) - exception_msg += "Stacktrace:\n{}".format(stacktrace) - return exception_msg - - -class DiscoveryTimeoutException(Explorer): - """未能在规定时间内为指定玩家搜索免费游戏""" diff --git a/src/services/explorer/explorer.py b/src/services/explorer/explorer.py deleted file mode 100644 index 051481d04a..0000000000 --- a/src/services/explorer/explorer.py +++ /dev/null @@ -1,280 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/17 15:20 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import csv -import json.decoder -from typing import List, Optional, Union, Dict, Any - -import cloudscraper -from lxml import etree - -from services.settings import logger -from services.utils import ToolBox, get_ctx -from .core import AwesomeFreeGirl -from .exceptions import DiscoveryTimeoutException - - -class GameLibManager(AwesomeFreeGirl): - """游戏对象管理 缓存商城数据以及判断游戏在库状态""" - - def __init__(self): - super().__init__() - - self.action_name = "GameLibManager" - - def save_game_objs(self, game_objs: List[Dict[str, str]]) -> None: - """缓存免费商城数据""" - if not game_objs: - return - - with open(self.path_free_games, "w", encoding="utf8", newline="") as file: - writer = csv.writer(file) - writer.writerow(["name", "url"]) - for game_obj in game_objs: - cell = (game_obj["name"], game_obj["url"]) - writer.writerow(cell) - - logger.success( - ToolBox.runtime_report( - motive="SAVE", - action_name=self.action_name, - message="Cache free game information.", - ) - ) - - def load_game_objs(self, only_url: bool = True) -> Optional[List[str]]: - """ - 加载缓存在本地的免费游戏对象 - - :param only_url: - :return: - """ - try: - with open(self.path_free_games, "r", encoding="utf8") as file: - data = list(csv.reader(file)) - except FileNotFoundError: - return [] - else: - if not data: - return [] - if only_url: - return [i[-1] for i in data[1:]] - return data[1:] - - def is_my_game( - self, ctx_cookies: Union[List[dict], str], page_link: str - ) -> Optional[dict]: - """ - 判断游戏在库状态 - - :param ctx_cookies: - :param page_link: - :return: - None 异常状态 - True 跳过任务 - False 继续任务 - """ - headers = { - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62", - "cookie": ctx_cookies - if isinstance(ctx_cookies, str) - else ToolBox.transfer_cookies(ctx_cookies), - } - scraper = cloudscraper.create_scraper() - response = scraper.get(page_link, headers=headers) - tree = etree.HTML(response.content) - assert_obj = tree.xpath( - "//span[@data-component='PurchaseCTA']//span[@data-component='Message']" - ) - - # 🚧 异常状态 - if not assert_obj: - logger.debug( - ToolBox.runtime_report( - motive="IGNORE", - action_name=self.action_name, - message="忽略尚未发布的游戏对象", - url=page_link, - ) - ) - return {"assert": "AssertObjectNotFound", "status": None} - - assert_message = assert_obj[0].text - response_obj = {"assert": assert_message, "warning": "", "status": None} - - # 🚧 跳过 `无法认领` 的日志信息 - if assert_message in ["已在游戏库中", "立即购买", "即将推出"]: - response_obj["status"] = True - # 🚧 惰性加载,前置节点不处理动态加载元素 - elif assert_message in ["正在载入"]: - response_obj["status"] = False - # 🍟 未领取的免费游戏 - elif assert_message in ["获取"]: - warning_obj = tree.xpath("//h1[@class='css-1gty6cv']//span") - # 出现遮挡警告 - if warning_obj: - warning_message = warning_obj[0].text - response_obj["warning"] = warning_message - # 成人内容可获取 - if "成人内容" in warning_message: - response_obj["status"] = False - else: - logger.warning( - ToolBox.runtime_report( - motive="SKIP", - action_name=self.action_name, - message=warning_message, - url=page_link, - ) - ) - response_obj["status"] = None - # 继续任务 - else: - response_obj["status"] = False - - return response_obj - - -class Explorer(AwesomeFreeGirl): - """商城探索者 发现常驻免费游戏以及周免游戏""" - - def __init__(self, silence: Optional[bool] = None): - super().__init__(silence=silence) - - self.action_name = "Explorer" - - self.game_manager = GameLibManager() - - def discovery_free_games( - self, ctx_cookies: Optional[List[dict]] = None, cover: bool = True - ) -> Optional[List[str]]: - """ - 发现免费游戏。 - - 本周免费 + 常驻免费 - ________________________________________________________ - 1. 此接口可以不传 cookie,免费游戏是公开可见的。 - 2. 但如果要查看免费游戏的在库状态,需要传 COOKIE 区分用户。 - - 有些游戏不同地区的玩家不一定都能玩。这个限制和账户地区信息有关,和当前访问的(代理)IP 无关。 - - 请确保传入的 COOKIE 是有效的。 - :param cover: - :param ctx_cookies: ToolBox.transfer_cookies(api.get_cookies()) - :return: - """ - # 创建驱动上下文 - with get_ctx(silence=self.silence) as ctx: - try: - self._discovery_free_games(ctx=ctx, ctx_cookies=ctx_cookies) - except DiscoveryTimeoutException: - return self.discovery_free_games(ctx_cookies=None, cover=cover) - - # 提取游戏平台对象 - game_objs = self.game_objs.values() - - # 运行缓存持久化 - if cover: - self.game_manager.save_game_objs(game_objs) - - # 返回链接 - return [game_obj.get("url") for game_obj in game_objs] - - def get_the_limited_free_game( - self, ctx_cookies: Optional[List[dict]] = None - ) -> Dict[str, Any]: - """ - 获取周免游戏 - - :param ctx_cookies: - :return: - """ - - def _update_limited_free_game_objs(element_: dict): - free_game_objs[url] = element_["title"] - free_game_objs["urls"].append(url) - - free_game_objs = {"urls": []} - - scraper = cloudscraper.create_scraper() - response = scraper.get(self.URL_PROMOTIONS) - - try: - data = response.json() - except json.decoder.JSONDecodeError: - pass - else: - elements = data["data"]["Catalog"]["searchStore"]["elements"] - for element in elements: - promotions = element.get("promotions") - - # 剔除掉过期的折扣实体 - if not promotions: - continue - - # 提取商品页slug - url = self.URL_PRODUCT_PAGE + element["urlSlug"] - - # 健壮工程,预判数据类型的变更 - if not ctx_cookies: - # 获取实体的促销折扣值 discount_percentage - discount_setting = promotions["promotionalOffers"][0][ - "promotionalOffers" - ][0]["discountSetting"] - discount_percentage = discount_setting["discountPercentage"] - if ( - not isinstance(discount_percentage, str) - and not discount_percentage - ) or ( - isinstance(discount_percentage, str) - and not float(discount_percentage) - ): - _update_limited_free_game_objs(element) - else: - response = self.game_manager.is_my_game( - ctx_cookies=ctx_cookies, page_link=url - ) - if ( - not response["status"] - and response["assert"] != "AssertObjectNotFound" - ): - _update_limited_free_game_objs(element) - - return free_game_objs - - def get_the_absolute_free_game( - self, ctx_cookies: Optional[List[dict]], _ctx_session=None - ) -> Dict[str, Any]: - """使用应力表达式萃取商品链接""" - - free_game_objs = {"urls": []} - - # 使用应力表达式萃取商品链接 - if _ctx_session: - critical_memory = _ctx_session.current_window_handle - try: - _ctx_session.switch_to.new_window("tab") - pending_games: Dict[str, str] = self.stress_expressions(ctx=_ctx_session) - finally: - _ctx_session.switch_to.window(critical_memory) - else: - with get_ctx(silence=self.silence) as ctx: - pending_games: Dict[str, str] = self.stress_expressions(ctx=ctx) - - # 中断空对象的工作流 - if not pending_games: - return free_game_objs - - # 任务批处理 - for url, title in pending_games.items(): - # 带入身份令牌判断周免游戏的在库状态 - response = self.game_manager.is_my_game( - ctx_cookies=ctx_cookies, page_link=url - ) - if not response["status"] and response["assert"] != "AssertObjectNotFound": - # 将待认领的周免游戏送入任务队列 - free_game_objs[url] = title - free_game_objs["urls"].append(url) - - return free_game_objs diff --git a/src/services/scaffold.py b/src/services/scaffold.py deleted file mode 100644 index 0b11b51f85..0000000000 --- a/src/services/scaffold.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from typing import Optional - -from apis.scaffold import get, challenge, install, claimer - - -class Scaffold: - """系统脚手架 顶级接口指令""" - - @staticmethod - def install(onnx_prefix: Optional[str] = None): - """下载运行依赖""" - install.run(onnx_prefix=onnx_prefix) - - @staticmethod - def test(): - """检查挑战者驱动版本是否适配""" - install.test() - - @staticmethod - def challenge(silence: Optional[bool] = True): - """ - 正面硬刚人机挑战,为当前账号获取有效的身份令牌。 - - ## Intro - - - 请确保你已在 `config.yaml` 中配置了正确的账号信息。 - - 更新后的 cookie 存储在 `/src/database/cookies/user_cookies.txt` 文件中 - - ## Tips - - - 本指令并不会强制激活人机验证。硬刚人机挑战不是目的,获取到有效的身份令牌才是目的,不要徒增功耗。 - - 也即,如果当前缓存的身份令牌还未失效,挑战跳过。 - - :param silence: (Default: True) IF False: 将在图形化系统中显式启动浏览器,演示人机挑战的执行过程。 - :return: - """ - challenge.run(silence=silence) - - @staticmethod - def get(debug: Optional[bool] = None, cache: Optional[bool] = True): - """ - 「我可以不玩但不能没有。」—— 鲁·克莱摩·迅 - - ## Intro - - - `get` 只做一件事,搬空免费商店! - - - 这是个趣味性和观赏性都拉满的一次性任务。系统会根据你的设备性能发起最高 4 协程并发的驱动任务,为你节省扫荡时间。 - - - 显然地,这是一项对操作系统内存和网络I/O要求都不低的任务,如果你嫌这五六十款(不同地区权限不同) - 多余的常驻免费游戏会影响你翻找游戏库的效率,请速速退朝。 - - - `get` 指令启动标准上下文执行任务,其并不足以应付隐藏在订单中的人机挑战。因此,`get` 指令会自动跳过未认领的周免游戏。 - 请使用生产效率更高的 `claim` 指令认领周免游戏。 - - ## Local Static CacheFile - - 此指令会将免费商城数据存储在 `src/database/explorer` 目录下。存储内容与当前上下文身份令牌有关(不同地区权限不同) - - ## Contributes - - - 若运行出现意料之外的报错,请运行 debug 模式,留意 Exception 信息,并将完整的栈追踪信息提交至 `issues` ,不胜感激! - - https://github.com/QIN2DIM/epic-awesome-gamer - - :param cache: 使用商城缓存数据 - :param debug: 显示栈追踪日志信息 - :return: - """ - get.join(trace=debug, cache=cache) - - @staticmethod - def claim(silence: Optional[bool] = True): - """ - 认领周免游戏。 - - ## Intro - - `claim` 做的事非常简单,获取本周促销数据,分析是否有待认领的周免游戏,根据分析结果执行相关任务。 - - `claim` 是系统级指令 `deploy` 的单步子任务,在上述业务结束后,会根据你配置的 `pusher` 推送追踪日志(若配置无效则不发)。 - - :return: - """ - claimer.run(silence=silence) - - @staticmethod - def deploy(platform: Optional[str] = None): - """ - 部署系统定时任务。 - - :param platform: 可选项 [vps serverless qing-long] - :return: - """ - claimer.deploy(platform) diff --git a/src/services/settings.py b/src/services/settings.py deleted file mode 100644 index ef24792e57..0000000000 --- a/src/services/settings.py +++ /dev/null @@ -1,132 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import os -import sys -from os.path import join, dirname -from typing import Dict, Any, Optional - -from services.utils import ToolBox - -__all__ = [ - # ------------------------------ - # SETTINGS - # ------------------------------ - "logger", - "DIR_CHALLENGE", - "DIR_COOKIES", - "DIR_TEMP_CACHE", - "DIR_EXPLORER", - "PATH_USR_COOKIES", - "DIR_MODEL", - "PATH_RAINBOW", - # ------------------------------ - # CONFIG - # ------------------------------ - "EPIC_PASSWORD", - "EPIC_EMAIL", - "MESSAGE_PUSHER_SETTINGS", -] -__version__ = "0.3.3.dev" - -""" -================================================ ʕ•ﻌ•ʔ ================================================ - (·▽·)欢迎嫖友入座 -================================================ ʕ•ﻌ•ʔ ================================================ -[√]核心配置 [※]边缘参数 -""" -# --------------------------------------------------- -# [√]工程根目录定位 -# --------------------------------------------------- -# 系统根目录 -PROJECT_ROOT = dirname(dirname(__file__)) -# 文件数据库目录 -PROJECT_DATABASE = join(PROJECT_ROOT, "database") -# YOLO模型 -DIR_MODEL = join(PROJECT_ROOT, "model") -# Reinforcement of memory -PATH_RAINBOW = join(DIR_MODEL, "rainbow.yaml") -# Cookie 工作目录 -DIR_COOKIES = join(PROJECT_DATABASE, "cookies") -PATH_USR_COOKIES = join(DIR_COOKIES, "user_cookies.txt") -# FreeGame Mining Workspace -DIR_EXPLORER = join(PROJECT_DATABASE, "explorer") -# 运行缓存目录 -DIR_TEMP_CACHE = join(PROJECT_DATABASE, "temp_cache") -# 挑战缓存 -DIR_CHALLENGE = join(DIR_TEMP_CACHE, "_challenge") -# 服务日志目录 -DIR_LOG = join(PROJECT_DATABASE, "logs") -# --------------------------------------------------- -# [√]服务器日志配置 -# --------------------------------------------------- -logger = ToolBox.init_log( - error=join(DIR_LOG, "error.log"), runtime=join(DIR_LOG, "runtime.log") -) - -# --------------------------------------------------- -# 路径补全 -# --------------------------------------------------- -for _pending in [ - PROJECT_DATABASE, - DIR_MODEL, - DIR_EXPLORER, - DIR_COOKIES, - DIR_TEMP_CACHE, - DIR_CHALLENGE, - DIR_LOG, -]: - if not os.path.exists(_pending): - os.mkdir(_pending) -""" -================================================== ʕ•ﻌ•ʔ ================================================== - 若您并非项目开发者 请勿修改以下变量的默认参数 -================================================== ʕ•ﻌ•ʔ ================================================== - - Enjoy it -> ♂ main.py -""" -config_ = ToolBox.check_sample_yaml( - path_output=join(dirname(dirname(__file__)), "config.yaml"), - path_sample=join(dirname(dirname(__file__)), "config-sample.yaml"), -) -# -------------------------------- -# [√] 账号信息 -# -------------------------------- -# 不建议在公有库上创建工作流运行项目,有仓库禁用风险 -EPIC_EMAIL: Optional[str] = config_.get("EPΙC_EMAΙL", "") -EPIC_PASSWORD: Optional[str] = config_.get("EPΙC_PASSWΟRD", "") - -# -------------------------------- -# [※] 消息推送配置 -# -------------------------------- -MESSAGE_PUSHER_SETTINGS: Optional[Dict[str, Any]] = config_.get( - "message_pusher_settings", {} -) -PUSHER: Optional[Dict[str, Optional[str]]] = MESSAGE_PUSHER_SETTINGS.get("pusher", {}) - -# -------------------------------- -# [※] 补全语法模板 -# -------------------------------- -if not EPIC_EMAIL: - EPIC_EMAIL = os.getenv("EPΙC_EMAΙL", "") -if not EPIC_PASSWORD: - EPIC_PASSWORD = os.getenv("EPΙC_PASSWΟRD", "") - -try: - for server in PUSHER: - if not PUSHER[server]: - PUSHER[server] = os.getenv(server, "") -except KeyError as e: - print(f"[进程退出] 核心配置文件被篡改 error={e}") - sys.exit() -# -------------------------------- -# [√] 阻止缺省配置 -# -------------------------------- -if not all((EPIC_EMAIL, EPIC_PASSWORD)): - print("[进程退出] 账号信息未配置或相关变量不合法") - sys.exit() - -if not any(PUSHER.values()): - MESSAGE_PUSHER_SETTINGS["enable"] = False diff --git a/src/services/utils/__init__.py b/src/services/utils/__init__.py deleted file mode 100644 index b19728c0d6..0000000000 --- a/src/services/utils/__init__.py +++ /dev/null @@ -1,35 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 0:25 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -from .accelerator.core import CoroutineSpeedup, AshFramework -from .armor.anti_hcaptcha.core import ArmorCaptcha -from .armor.anti_hcaptcha.exceptions import ( - LabelNotFoundException, - ChallengeReset, - ChallengeTimeout, -) -from .armor.anti_hcaptcha.solutions.sk_recognition import ( - RiverChallenger, - DetectionChallenger, -) -from .armor.anti_hcaptcha.solutions.yolo import YOLO -from .armor.anti_hcaptcha.solutions.sk_recognition import SKRecognition -from .toolbox.toolbox import ToolBox, get_ctx, get_challenge_ctx - -__all__ = [ - "ToolBox", - "ArmorCaptcha", - "LabelNotFoundException", - "CoroutineSpeedup", - "AshFramework", - "get_ctx", - "get_challenge_ctx", - "ChallengeReset", - "ChallengeTimeout", - "YOLO", - "SKRecognition", - "RiverChallenger", - "DetectionChallenger", -] diff --git a/src/services/utils/accelerator/__init__.py b/src/services/utils/accelerator/__init__.py deleted file mode 100644 index 6a36507c1c..0000000000 --- a/src/services/utils/accelerator/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2021/12/25 17:05 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: diff --git a/src/services/utils/accelerator/core.py b/src/services/utils/accelerator/core.py deleted file mode 100644 index a85f94f273..0000000000 --- a/src/services/utils/accelerator/core.py +++ /dev/null @@ -1,201 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2021/12/22 9:05 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import asyncio -import os -from typing import Optional, List, Union - -import aiohttp -import gevent -from gevent import queue - - -class CoroutineSpeedup: - """轻量化的协程控件""" - - def __init__(self, docker: Optional[List] = None, power: Optional[int] = None): - # 任务容器:queue - self.worker, self.done = queue.Queue(), queue.Queue() - - # 任务容器 - self.docker = docker - - # 协程数 - self.power = max(os.cpu_count(), 2) if power is None else power - - # 任务队列满载时刻长度 - self.max_queue_size = 0 - - def progress(self) -> str: - """ - 任务进度 - - :return: - """ - _progress = self.max_queue_size - self.worker.qsize() - return ( - "__pending__" - if _progress < self.power - else f"{_progress}/{self.max_queue_size}" - ) - - def launcher(self, *args, **kwargs): - """ - 适配器实例生产 - - :return: - """ - while not self.worker.empty(): - task = self.worker.get_nowait() - self.control_driver(task, *args, **kwargs) - - def control_driver(self, task, *args, **kwargs): - """ - 默认逻辑 - - :param task: - :return: - """ - raise NotImplementedError - - def preload(self): - """ - 数据预处理 - - :return: - """ - - def overload(self): - """ - 任务重载 - - :return: - """ - if self.docker: - for task in self.docker: - self.worker.put_nowait(task) - self.max_queue_size = self.worker.qsize() - - def offload(self) -> list: - """ - 缓存卸载 - - :return: - """ - docker = [] - while not self.done.empty(): - docker.append(self.done.get()) - return docker - - def killer(self): - """ - 缓存回收 - - :return: - """ - - def speedup(self, *args, **kwargs): - """ - 框架接口 - - :return: - """ - - # 任务重载 - self.overload() - - # 弹出空载任务 - if self.max_queue_size == 0: - return - - # 粘性功率 - power = kwargs.get("power") - self.power = self.power if power is None else power - self.power = ( - self.max_queue_size if self.power > self.max_queue_size else self.power - ) - - # 任务启动 - task_list = [] - for _ in range(self.power): - task = gevent.spawn(self.launcher, *args, **kwargs) - task_list.append(task) - try: - gevent.joinall(task_list) - finally: - self.killer() - - -class AshFramework: - """轻量化的协程控件""" - - def __init__(self, docker: Optional[List] = None): - # 任务容器:queue - self.worker, self.done = asyncio.Queue(), asyncio.Queue() - # 任务容器 - self.docker = docker - # 任务队列满载时刻长度 - self.max_queue_size = 0 - - def progress(self) -> str: - """任务进度""" - _progress = self.max_queue_size - self.worker.qsize() - return f"{_progress}/{self.max_queue_size}" - - def preload(self): - """预处理""" - - def overload(self): - """任务重载""" - if self.docker: - for task in self.docker: - self.worker.put_nowait(task) - self.max_queue_size = self.worker.qsize() - - def offload(self) -> Optional[List]: - """缓存卸载""" - crash = [] - while not self.done.empty(): - crash.append(self.done.get()) - return crash - - async def control_driver(self, context, session=None): - """需要并发执行的代码片段""" - raise NotImplementedError - - async def launcher(self, session=None): - """适配接口模式""" - while not self.worker.empty(): - context = self.worker.get_nowait() - await self.control_driver(context, session=session) - - async def subvert(self, workers: Union[str, int]): - """ - 框架接口 - - loop = asyncio.get_event_loop() - loop.run_until_complete(fl.go(workers)) - - :param workers: ["fast", power] - :return: - """ - # 任务重载 - self.overload() - - # 弹出空载任务 - if self.max_queue_size == 0: - return - - # 粘性功率 - workers = self.max_queue_size if workers in ["fast"] else workers - workers = workers if workers <= self.max_queue_size else self.max_queue_size - - # 弹性分发 - task_list = [] - async with aiohttp.ClientSession() as session: - for _ in range(workers): - task = self.launcher(session=session) - task_list.append(task) - await asyncio.wait(task_list) diff --git a/src/services/utils/armor/__init__.py b/src/services/utils/armor/__init__.py deleted file mode 100644 index d62eaa4e56..0000000000 --- a/src/services/utils/armor/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# -*- coding: utf-8 -*- -# Time : 2022/1/16 2:30 -# Author : QIN2DIM -# Github : https://github.com/QIN2DIM -# Description: -import os -import sys -import random -import requests - -if os.getenv("RUNNER_TOOL_CACHE"): - _uxo = f"https://github.com/{os.getenv('GITHUB_REPOSITORY', '')}" - try: - if requests.get(_uxo).status_code != 404: - raise requests.RequestException - except requests.RequestException: - print( - "[Warning] 禁止在 fork 分支上运行工作流,请创建私有工作流。\n" - "详见 https://blog.echosec.top/epic-awesome-gamer/zh/docs/player/hosting-deployment/privatisation/" - ) - if random.uniform(0, 1) > 0.15: - sys.exit() diff --git a/src/services/utils/armor/anti_hcaptcha/__init__.py b/src/services/utils/armor/anti_hcaptcha/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/services/utils/armor/anti_hcaptcha/core.py b/src/services/utils/armor/anti_hcaptcha/core.py deleted file mode 100644 index 7b2edf8466..0000000000 --- a/src/services/utils/armor/anti_hcaptcha/core.py +++ /dev/null @@ -1,253 +0,0 @@ -import os -import re -import time - -import requests -from loguru import logger -from selenium.common.exceptions import ( - ElementNotVisibleException, - ElementClickInterceptedException, - WebDriverException, - TimeoutException, -) -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.wait import WebDriverWait -from undetected_chromedriver import Chrome - -from .exceptions import LabelNotFoundException, ChallengeReset, ChallengeTimeout - - -class ArmorCaptcha: - """hCAPTCHA challenge 驱动控制""" - - def __init__(self, dir_workspace: str = None, debug=False): - - self.action_name = "ArmorCaptcha" - self.debug = debug - - # 存储挑战图片的目录 - self.runtime_workspace = "" - - # 博大精深! - self.label_alias = { - "自行车": "bicycle", - "火车": "train", - "卡车": "truck", - "公交车": "bus", - "巴土": "bus", - "巴士": "bus", - "飞机": "aeroplane", - "ー条船": "boat", - "船": "boat", - "汽车": "car", - "摩托车": "motorbike", - "垂直河流": "vertical river", - "天空中向左飞行的飞机": "airplane in the sky flying left", - } - - # 样本标签映射 {挑战图片1: locator1, ...} - self.alias2locator = {} - # 填充下载链接映射 {挑战图片1: url1, ...} - self.alias2url = {} - # 填充挑战图片的缓存地址 {挑战图片1: "/images/挑战图片1.png", ...} - self.alias2path = {} - # 存储模型分类结果 {挑战图片1: bool, ...} - self.alias2answer = {} - # 图像标签 - self.label = "" - # 运行缓存 - self.dir_workspace = dir_workspace if dir_workspace else "." - - self._headers = { - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62" - } - - def log(self, message: str, **params) -> None: - """格式化日志信息""" - if not self.debug: - return - - motive = "Challenge" - flag_ = f">> {motive} [{self.action_name}] {message}" - if params: - flag_ += " - " - flag_ += " ".join([f"{i[0]}={i[1]}" for i in params.items()]) - logger.debug(flag_) - - def _init_workspace(self): - """初始化工作目录,存放缓存的挑战图片""" - _prefix = f"{int(time.time())}" + f"_{self.label}" if self.label else "" - _workspace = os.path.join(self.dir_workspace, _prefix) - if not os.path.exists(_workspace): - os.mkdir(_workspace) - return _workspace - - def tactical_retreat(self) -> bool: - """模型存在泛化死角,遇到指定标签时主动进入下一轮挑战,节约时间""" - if self.label in ["水上飞机"] or not self.label_alias.get(self.label): - self.log(message="模型泛化较差,逃逸", label=self.label) - return True - return False - - def mark_samples(self, ctx: Chrome): - """ - 获取每个挑战图片的下载链接以及网页元素位置 - - :param ctx: - :return: - """ - self.log(message="获取挑战图片链接及元素定位器") - - # 等待图片加载完成 - WebDriverWait(ctx, 10, ignored_exceptions=ElementNotVisibleException).until( - EC.presence_of_all_elements_located((By.XPATH, "//div[@class='task-image']")) - ) - time.sleep(1) - - # DOM 定位元素 - samples = ctx.find_elements(By.XPATH, "//div[@class='task-image']") - for sample in samples: - alias = sample.get_attribute("aria-label") - while True: - try: - image_style = sample.find_element( - By.CLASS_NAME, "image" - ).get_attribute("style") - url = re.split(r'[(")]', image_style)[2] - self.alias2url.update({alias: url}) - break - except IndexError: - continue - self.alias2locator.update({alias: sample}) - - def get_label(self, ctx: Chrome): - """ - 获取人机挑战需要识别的图片类型(标签) - - :param ctx: - :return: - """ - try: - label_obj = WebDriverWait( - ctx, 30, ignored_exceptions=ElementNotVisibleException - ).until( - EC.presence_of_element_located((By.XPATH, "//div[@class='prompt-text']")) - ) - except TimeoutException: - raise ChallengeReset("人机挑战意外通过") - try: - _label = re.split(r"[包含 图片]", label_obj.text)[2][:-1] - except (AttributeError, IndexError): - raise LabelNotFoundException("获取到异常的标签对象。") - else: - self.label = _label - self.log( - message="获取挑战标签", - label=f"{self.label}({self.label_alias.get(self.label, 'none')})", - ) - - def download_images(self): - """ - 下载挑战图片 - - ### hcaptcha 设有挑战时长的限制 - - 如果一段时间内没有操作页面元素,