diff --git a/.github/workflows/SourceGit.yaml b/.github/workflows/SourceGit.yaml
new file mode 100644
index 0000000000..4a5fefda18
--- /dev/null
+++ b/.github/workflows/SourceGit.yaml
@@ -0,0 +1,53 @@
+name: Claim EpicGames from SourceGit
+
+on:
+ workflow_dispatch:
+ schedule:
+ - cron: "12 22 * * *"
+
+jobs:
+ setup:
+ env:
+ EPIC_EMAIL: ${{ secrets.EPIC_EMAIL }}
+ TOKEN: ${{ secrets.PUSHTOKEN }}
+ EPIC_PASSWORD: ${{ secrets.EPIC_PASSWORD }}
+ FAKE_HASH: "Automated deployment @ $(date '+%Y-%m-%d %H:%M:%S') Asia/Shanghai"
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: 初始化环境 & 拉取项目
+ run: |
+ sudo timedatectl set-timezone "Asia/Shanghai"
+ git clone https://github.com/QIN2DIM/epic-awesome-gamer.git epic
+ cp epic/requirements.txt .
+ - uses: actions/setup-python@v4
+ with:
+ python-version: "3.10"
+ cache: 'pip' # caching pip dependencies
+ - name: 安装依赖
+ run: |
+ pip install -r epic/requirements.txt
+ pip install requests
+ playwright install firefox
+ playwright install-deps firefox
+ - name: 启动项目
+ continue-on-error: true
+ timeout-minutes: 10
+ run: |
+ if [ -d "user_data_dir" ];then cp -rfp user_data_dir epic/; fi
+ echo "{}" > epic/src/config.json
+ cd epic/src/ && python3 claim.py
+ - name: Setup GIT user
+ uses: fregante/setup-git-user@v1
+ - name: 缓存身份令牌
+ run: |
+ if [ -d "epic/user_data_dir" ];then cp -rfp epic/user_data_dir . ; fi
+ if [ -d "epic/logs" ];then cp -rfp epic/logs . ; fi
+ echo "${{ env.FAKE_HASH }}" > _token
+ rm -rf epic
+ git add .
+ git commit -m "${{ env.FAKE_HASH }}"
+ git push -f
+ - name: PUSHPLUS
+ run: |
+ python PUSHPLUS.py
diff --git a/.github/workflows/ci_docker.yaml b/.github/workflows/ci_docker_fish.yaml
similarity index 69%
rename from .github/workflows/ci_docker.yaml
rename to .github/workflows/ci_docker_fish.yaml
index cde2e035bd..80b91d405e 100644
--- a/.github/workflows/ci_docker.yaml
+++ b/.github/workflows/ci_docker_fish.yaml
@@ -4,14 +4,16 @@ on:
workflow_dispatch:
push:
branches:
+# - update-challenge
- main
paths-ignore:
- ".github/**"
- "README.md"
- "LICENSE"
- ".gitignore"
+ - ".deepsource.toml"
env:
- IMAGE_TAG: ${{ secrets.DOCKERHUB_USERNAME }}/awesome-epic:daddy
+ IMAGE_TAG: ${{ secrets.DOCKERHUB_USERNAME }}/awesome-epic:fish
jobs:
docker:
@@ -22,22 +24,22 @@ jobs:
uses: actions/checkout@v2
-
name: Scaffold Init
- run: cd src && cp config-sample.yaml config.yaml
+ run: cd src && echo '{}' > config.json
-
name: Set up QEMU
- uses: docker/setup-qemu-action@v1
+ uses: docker/setup-qemu-action@v2
-
name: Set up Docker Buildx
- uses: docker/setup-buildx-action@v1
+ uses: docker/setup-buildx-action@v2
-
name: Login to DockerHub
- uses: docker/login-action@v1
+ uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
-
name: Build and push
- uses: docker/build-push-action@v2
+ uses: docker/build-push-action@v3
with:
context: .
platforms: linux/amd64 # linux/arm64
diff --git a/.github/workflows/samples/docker-compose.yaml b/.github/workflows/samples/docker-compose.yaml
new file mode 100644
index 0000000000..b95dd362be
--- /dev/null
+++ b/.github/workflows/samples/docker-compose.yaml
@@ -0,0 +1,19 @@
+version: "3.7"
+services:
+
+ epic-games-claimer:
+ image: ech0sec/awesome-epic:fish
+ init: true
+ command: "python3 claim.py"
+ volumes:
+ - "/home/epic/user_data_dir:/home/epic/user_data_dir"
+ - "/home/epic/logs:/home/epic/logs"
+ environment:
+ EPIC_EMAIL:
+ EPIC_PASSWORD:
+
+# ======================================
+# [🍜] Documentaion
+# ======================================
+# docker-compose https://docs.docker.com/compose/reference/
+# apprise https://github.com/caronc/apprise
diff --git a/.gitignore b/.gitignore
index f31568d8a4..1e26644624 100644
--- a/.gitignore
+++ b/.gitignore
@@ -128,13 +128,9 @@ dmypy.json
# Pyre type checker
.pyre/
.idea
-src/database
-src/model
-src/config.yaml
tests/
-docs-src/
-docs/
-.github/workflows/automated.yaml
-.github/workflows/automated_claim.yaml
-.github/workflows/sync_repo.yaml
-docker-compose.yaml
+archivist/
+src/*.json
+user_data_dir/
+logs/
+database/
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 8e91625784..8a93f3c6b2 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,33 +1,13 @@
-#FROM amazonlinux:latest as builder
-#
-#WORKDIR /home/epic
-#
-#RUN yum update -y \
-# && yum install -y python3 wget
-#
-#COPY requirements.txt ./
-#RUN pip3 install --no-cache-dir -r requirements.txt
-#
-#COPY src ./
-#RUN wget https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm \
-# && yum localinstall -y google-chrome-stable_current_x86_64.rpm \
-# && rm google-chrome-stable_current_x86_64.rpm \
-# && wget -P model/ https://github.com/QIN2DIM/hcaptcha-challenger/releases/download/model/yolov5n6.onnx
-#
+FROM python:3.10 as builder
-FROM python:3.10-slim as builder
-
-WORKDIR /home/epic
+WORKDIR /home/epic/src
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
RUN apt update -y \
- && apt install -y wget
+ && apt install -y wget xvfb tini \
+ && playwright install firefox \
+ && playwright install-deps firefox
-COPY src ./
-RUN wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb \
- && apt install -y ./google-chrome-stable_current_amd64.deb \
- && rm ./google-chrome-stable_current_amd64.deb \
- && wget -P model/ https://github.com/QIN2DIM/hcaptcha-challenger/releases/download/model/yolov5n6.onnx \
- && wget -P model/ https://github.com/QIN2DIM/hcaptcha-challenger/releases/download/model/rainbow.yaml
+COPY src ./
\ No newline at end of file
diff --git a/PUSHPLUS.py b/PUSHPLUS.py
new file mode 100644
index 0000000000..b2ab1eade5
--- /dev/null
+++ b/PUSHPLUS.py
@@ -0,0 +1,7 @@
+import requests
+import os
+token = os.environ['TOKEN']
+title= 'Epic-FreeGamer'
+content ='Epic-FreeGamer任务已执行'
+url = 'http://www.pushplus.plus/send?token='+token+'&title='+title+'&content='+content
+requests.get(url)
diff --git a/README.md b/README.md
index f6b1fbe845..8eca2cd9be 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,13 @@
EPIC 免费人
-
🚀 优雅地领取 Epic 免费游戏
+
🍷 Gracefully claim weekly free games from Epic Store.
![](https://img.shields.io/docker/pulls/ech0sec/awesome-epic?color=green&style=for-the-badge)
-
![](https://img.shields.io/github/workflow/status/QIN2DIM/epic-awesome-gamer/scaffold_claim?style=for-the-badge)
+
-
![](https://img.shields.io/github/stars/QIN2DIM/epic-awesome-gamer?style=social)
-
![](https://img.shields.io/twitter/follow/QIN2DIM?label=Tweet&style=social)
-
![](https://img.shields.io/static/v1?style=social&logo=telegram&label=chat&message=studio)
+
![Discord](https://img.shields.io/discord/978108215499816980?style=social&logo=discord&label=echosec)
+
@@ -16,20 +15,35 @@
![scaffold-get-demo-output-small](https://github.com/QIN2DIM/img_pool/blob/main/img/scaffold-get-demo-output-small.gif)
-## 项目简介 👋
+## Introduction 👋
-[Epic AwesomeGamer](https://github.com/QIN2DIM/epic-awesome-gamer) 帮助玩家优雅地领取 Epic 免费游戏。
+[Epic 免费人](https://github.com/QIN2DIM/epic-awesome-gamer) 帮助玩家优雅地领取免费游戏。内置 [hcaptcha-challenger](https://github.com/QIN2DIM/hcaptcha-challenger) AI 模块,直面人机挑战。
-## 快速上手 🛴
+## Guides
-- :gear: [技术文档](https://www.wolai.com/vAiu9mSp6G15xhWeoEPUn2)
-- :small_red_triangle: 注意事项
-- :loudspeaker: 更新日志
-- :world_map: [开源计划](https://github.com/QIN2DIM/epic-awesome-gamer/issues/1)
+[ [`简体中文`](https://echosec.notion.site/Epic-7c74f1e29117420dbac5551e4b031f82?pvs=4) ] [ [`English`](https://echosec.notion.site/Epic-Awesome-Gamer-ba870cdf64c149e69f417448b1eb83c5?pvs=4) ]
-## 联系我们 📧
+## Features
-> 本项目由海南大学机器人与人工智能实验室(`RobAI-Lab`)提供维护。
-- [**Email**](mailto:HainanU_arai@163.com?subject=CampusDailyAutoSign-ISSUE) **||** [**Home**](https://a-rai.github.io/) **||** [**TG**](https://t.me/joinchat/HlB9SQJubb5VmNU5)
+### Task
+
+| Mode | Target | Progress |
+| ---------- | ------------------------------------------------------------ | -------- |
+| epic-games | [Epic Games Store](https://www.epicgames.com/store/free-games) | ✅ |
+| unreal | [Unreal Engine](https://www.unrealengine.com/) | 🚧 |
+| gog | [GOG](https://www.gog.com/) | 🚧 |
+| apg | [Amazon Prime Gaming](https://gaming.amazon.com/) | 🚧 |
+| xbox | [Xbox Live Games with Gold](https://www.xbox.com/en-US/live/gold#gameswithgold) | 🚧 |
+
+### Component
+
+| Demand | Support |
+| :------------------- | :------ |
+| hCaptcha Solver | ✅ |
+| Docker development | ✅ |
+| Persistent context @multi-user | ✅ |
+| Rolling Upgrade | 🚧 |
+| Epicgames DLC | 🚧 |
+| 2FA OTP support | 🚧 |
diff --git a/_token b/_token
new file mode 100644
index 0000000000..38790b853c
--- /dev/null
+++ b/_token
@@ -0,0 +1 @@
+Automated deployment @ 2024-10-23 06:33:15 Asia/Shanghai
diff --git a/requirements.txt b/requirements.txt
index 8c98605794..bdf5340ba5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,37 +1 @@
-# {{< Scaffolding dependency >}}
-fire>=0.4.0
-
-# {{< Log system >}}
-loguru>=0.5.3
-pyyaml>=6.0
-
-# {{< Timed task >}}
-apscheduler>=3.8.1
-pytz>=2021.3
-
-# {{< Interface /Data cleaning >}}
-requests>=2.27.1
-cloudscraper>=1.2.58
-bs4>=0.0.1
-beautifulsoup4>=4.10.0
-lxml>=4.7.1
-aiohttp>=3.8.1
-
-# {{< Object Detection >}}
-opencv-python>=4.5.5.62
-scikit-image>=0.19.2
-numpy==1.21.5
-
-# {{< Challenger Drive >}}
-gevent>=21.12.0
-selenium>=4.1.0
-webdriver_manager==3.5.2
-undetected_chromedriver==3.1.3
-
-# {{< Message Push >}}
-apprise>=0.9.6
-
-# {{< Conflicting dependencies >}}
-# Manually install according to your needs.
-# Used to skip `tls in tls` authentication.
-# urllib3==1.25.11
+hcaptcha-challenger[playwright]
diff --git a/src/apis/__init__.py b/src/apis/__init__.py
deleted file mode 100644
index 35c4df7bf2..0000000000
--- a/src/apis/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
diff --git a/src/apis/scaffold/__init__.py b/src/apis/scaffold/__init__.py
deleted file mode 100644
index 5ba804a652..0000000000
--- a/src/apis/scaffold/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import os
-import random
-import sys
-
-_THE_PROPHECY = """
-# ====================================================================================================
-# Ithlinne's Prophecy
-# ====================================================================================================
-# Verily I say unto you,the era of the sword and axe is nigh, the era of the wolf's blizzard.
-# The Time of the White Chill and the White Light is nigh, the Time of Madness and the Time of Contempt:
-# Tedd Deireádh, the Time of End. The world will die amidst frost and be reborn with the new sun.
-# It will be reborn of Elder Blood, of Hen Ichaer, of the seed that has been sown.
-# A seed which will not sprout but burst into flame.
-# Ess'tuath esse! Thus it shall be! Watch for the signs!
-# What signs these shall be, I say unto you:
-# first the earth will flow with the blood of claim , the Blood of Epic .
-# ====================================================================================================
-"""
-if random.uniform(0, 1) > 0.711:
- for policy in ["epic", "claim"]:
- if policy in os.getenv("GITHUB_REPOSITORY", "").lower():
- print(f"[EXIT] 仓库名出现非法关键词 `{policy}`")
- sys.exit()
- print(_THE_PROPHECY)
diff --git a/src/apis/scaffold/challenge.py b/src/apis/scaffold/challenge.py
deleted file mode 100644
index ec841f9166..0000000000
--- a/src/apis/scaffold/challenge.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from typing import Optional
-
-from services.bricklayer import Bricklayer
-from services.settings import PATH_USR_COOKIES, logger
-from services.utils import ToolBox
-
-bricklayer = Bricklayer()
-
-
-def run(silence: Optional[bool] = None):
- """刷新上下文身份令牌"""
- logger.info(
- ToolBox.runtime_report(
- motive="STARTUP", action_name="ScaffoldChallenge", message="正在更新身份令牌..."
- )
- )
-
- # [🌀] 激活人机挑战
- if not bricklayer.cookie_manager.refresh_ctx_cookies(silence=silence):
- return
-
- # [🌀] 读取新的身份令牌
- ctx_cookies = bricklayer.cookie_manager.load_ctx_cookies()
-
- # [🌀] 保存身份令牌
- with open(PATH_USR_COOKIES, "w", encoding="utf8") as file:
- file.write(ToolBox.transfer_cookies(ctx_cookies))
-
- logger.success(
- ToolBox.runtime_report(
- motive="GET",
- action_name="ChallengeRunner",
- message="玩家饼干已到货。",
- path=PATH_USR_COOKIES,
- )
- )
diff --git a/src/apis/scaffold/claimer.py b/src/apis/scaffold/claimer.py
deleted file mode 100644
index 8e38655402..0000000000
--- a/src/apis/scaffold/claimer.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from typing import Optional
-
-from services.deploy import ClaimerScheduler
-from services.settings import logger
-
-
-@logger.catch()
-def deploy(platform: Optional[str] = None):
- """在微小容器中部署 `claim` 定时调度任务"""
- ClaimerScheduler(silence=True).deploy_jobs(platform)
-
-
-@logger.catch()
-def run(silence: Optional[bool] = None):
- """运行 `claim` 单步子任务,认领周免游戏"""
- ClaimerScheduler(silence=silence).job_loop_claim()
diff --git a/src/apis/scaffold/get.py b/src/apis/scaffold/get.py
deleted file mode 100644
index 3ee4d1b0a7..0000000000
--- a/src/apis/scaffold/get.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import random
-from typing import Optional
-
-from selenium.common.exceptions import WebDriverException
-
-from services.bricklayer import Bricklayer
-from services.explorer import Explorer
-from services.settings import logger
-from services.utils import CoroutineSpeedup, ToolBox
-
-SILENCE = True
-
-bricklayer = Bricklayer(silence=SILENCE)
-explorer = Explorer(silence=SILENCE)
-
-
-class SpawnBooster(CoroutineSpeedup):
- """协程助推器 并发执行片段代码"""
-
- def __init__(
- self,
- docker,
- ctx_cookies,
- power: Optional[int] = None,
- debug: Optional[bool] = None,
- ):
- super().__init__(docker=docker, power=power)
-
- self.debug = False if debug is None else debug
- self.power = min(4, 4 if power is None else power)
- self.action_name = "SpawnBooster"
-
- self.ctx_cookies = ctx_cookies
-
- if self.docker:
- random.shuffle(self.docker)
-
- def control_driver(self, task, *args, **kwargs):
- url = task
-
- # 运行前置检查
- response = explorer.game_manager.is_my_game(
- ctx_cookies=self.ctx_cookies, page_link=url
- )
-
- # 识别未在库的常驻周免游戏
- if response.get("status") is False:
- logger.debug(
- ToolBox.runtime_report(
- motive="BUILD",
- action_name=self.action_name,
- message="🛒 正在为玩家领取免费游戏",
- progress=f"[{self.progress()}]",
- url=url,
- )
- )
-
- # 启动 Bricklayer 获取免费游戏
- try:
- bricklayer.get_free_game(
- page_link=url, ctx_cookies=self.ctx_cookies, refresh=False
- )
- except WebDriverException as error:
- if self.debug:
- logger.exception(error)
- logger.error(
- ToolBox.runtime_report(
- motive="QUIT",
- action_name="SpawnBooster",
- message="未知错误",
- progress=f"[{self.progress()}]",
- url=url,
- )
- )
-
- def killer(self):
- logger.success(
- ToolBox.runtime_report(
- motive="OVER", action_name=self.action_name, message="✔ 任务队列已清空"
- )
- )
-
-
-def join(trace: bool = False, cache: bool = True):
- """
- 一键搬空免费商店
-
- 需要确保上下文身份令牌有效,可通过 `challenge` 脚手架强制刷新。
- :param cache:
- :param trace:
- :return:
- """
- from gevent import monkey
-
- monkey.patch_all(ssl=False)
-
- logger.info(
- ToolBox.runtime_report(
- motive="STARTUP", action_name="ScaffoldGet", message="🔨 正在为玩家领取免费游戏"
- )
- )
-
- # [🔨] 读取有效的身份令牌
- ctx_cookies = bricklayer.cookie_manager.load_ctx_cookies()
- if not bricklayer.cookie_manager.is_available_cookie(ctx_cookies):
- logger.critical(
- ToolBox.runtime_report(
- motive="SKIP",
- action_name="ScaffoldGet",
- message="身份令牌不存在或失效,手动执行 `challenge` 指令更新身份令牌。",
- )
- )
- return
-
- # [🔨] 缓存免费商城数据
- urls = explorer.game_manager.load_game_objs(only_url=True)
- if not cache or not urls:
- urls = explorer.discovery_free_games(ctx_cookies=ctx_cookies, cover=True)
-
- # [🔨] 启动 Bricklayer 搬空免费商店
- # 启动一轮协程任务,执行效率受限于本地网络带宽
- SpawnBooster(ctx_cookies=ctx_cookies, docker=urls, power=4, debug=trace).speedup()
diff --git a/src/apis/scaffold/install.py b/src/apis/scaffold/install.py
deleted file mode 100644
index 6c6286d99c..0000000000
--- a/src/apis/scaffold/install.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/20 16:16
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import sys
-import webbrowser
-
-from webdriver_manager.chrome import ChromeDriverManager
-from webdriver_manager.utils import get_browser_version_from_os
-
-from services.settings import DIR_MODEL, logger, PATH_RAINBOW
-from services.utils import SKRecognition
-from services.utils import YOLO
-from services.utils import get_challenge_ctx
-
-
-def _download_model(onnx_prefix: str = None):
- """下载 YOLOv4 目标检测模型"""
- logger.debug("Downloading YOLOv5(ONNX) object detection model...")
-
- YOLO(dir_model=DIR_MODEL, onnx_prefix=onnx_prefix).download_model()
-
-
-def _download_driver():
- """下载浏览器驱动"""
- logger.debug("Downloading ChromeDriver...")
-
- # 自动下载并授权对应版本的 ChromeDriver
- browser_version = get_browser_version_from_os("google-chrome")
- if browser_version != "UNKNOWN":
- return ChromeDriverManager(version="latest").install()
-
- # 环境变量中缺少 `google-chrome` 提示玩家手动安装
- logger.critical("当前环境变量缺少 `google-chrome`,请为你的设备手动安装 Chrome 浏览器。")
- logger.info(
- "Ubuntu: https://linuxize.com/post/how-to-install-google-chrome-web-browser-on-ubuntu-20-04/"
- )
- logger.info(
- "CentOS 7/8: https://linuxize.com/post/how-to-install-google-chrome-web-browser-on-centos-7/"
- )
- if "linux" not in sys.platform:
- webbrowser.open("https://www.google.com/chrome/")
-
- logger.info("安装完毕后重新执行 `install` 脚手架指令。")
-
-
-def _download_rainbow():
- logger.debug("Downloading Reinforcement of Memory | Rainbow Table...")
-
- SKRecognition.sync_rainbow(path_rainbow=PATH_RAINBOW, convert=True)
-
-
-def run(onnx_prefix: str = None):
- """下载项目运行所需的各项依赖"""
- logger.debug("正在下载系统依赖")
- _download_driver()
- _download_model(onnx_prefix=onnx_prefix)
- _download_rainbow()
- logger.success("系统依赖下载完毕")
-
-
-@logger.catch()
-def test():
- """检查挑战者驱动版本是否适配"""
- ctx = get_challenge_ctx(silence=True)
- try:
- ctx.get("https://www.baidu.com")
- finally:
- ctx.quit()
-
- logger.success("驱动适配成功")
diff --git a/src/claim.py b/src/claim.py
new file mode 100644
index 0000000000..bfbc8f9763
--- /dev/null
+++ b/src/claim.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+# Time : 2023/8/16 5:14
+# Author : QIN2DIM
+# GitHub : https://github.com/QIN2DIM
+# Description:
+from __future__ import annotations
+
+import asyncio
+import os
+import sys
+from dataclasses import dataclass, field
+from typing import List
+
+import importlib_metadata
+from hcaptcha_challenger import install
+from hcaptcha_challenger.agents import Malenia
+from loguru import logger
+from playwright.async_api import BrowserContext, async_playwright
+
+from epic_games import (
+ EpicPlayer,
+ EpicGames,
+ Game,
+ CompletedOrder,
+ get_promotions,
+ get_order_history,
+)
+
+self_supervised = True
+
+
+@dataclass
+class ISurrender:
+ player: EpicPlayer
+
+ promotions: List[Game] = field(default_factory=list)
+ ctx_cookies_is_available: bool = None
+ headless: bool = True
+ locale: str = "en-US"
+
+ _orders = None
+ _namespaces = None
+ _pros = None
+
+ def __post_init__(self):
+ self._orders: List[CompletedOrder] = []
+ self._namespaces: List[str] = []
+ self._pros: List[Game] = []
+
+ @classmethod
+ def from_epic(cls):
+ return cls(player=EpicPlayer.from_account())
+
+ @property
+ def cookies(self):
+ return self.player.cookies
+
+ def create_tasks(self):
+ if not self._orders:
+ self._orders = get_order_history(self.cookies)
+ if not self._namespaces:
+ self._namespaces = [order.namespace for order in self._orders]
+ if not self._pros:
+ self._pros = get_promotions()
+ for pro in self._pros:
+ logger.debug("Put task", title=pro.title, url=pro.url)
+
+ self.promotions = [p for p in self._pros if p.namespace not in self._namespaces]
+
+ async def prelude_with_context(self, context: BrowserContext) -> bool | None:
+ url = "https://www.epicgames.com/account/creator-programs"
+ page = context.pages[0]
+ await page.goto(url, wait_until="networkidle")
+ if not page.url.startswith(url):
+ return
+
+ self.ctx_cookies_is_available = True
+ await context.storage_state(path=self.player.ctx_cookie_path)
+ cookies = self.player.ctx_cookies.reload(self.player.ctx_cookie_path)
+ self.player.cookies = cookies
+
+ self.create_tasks()
+
+ if not self.promotions:
+ logger.success(
+ "Pass claim task",
+ reason="All free games are in my library",
+ stage="context-prelude",
+ )
+ return True
+
+ async def claim_epic_games(self, context: BrowserContext):
+ page = context.pages[0]
+ epic = EpicGames.from_player(self.player, page=page, self_supervised=self_supervised)
+
+ if not self.ctx_cookies_is_available:
+ logger.info("Try to flush cookie", task="claim_epic_games")
+ if await epic.authorize(page):
+ cookies = await epic.flush_token(context)
+ self.player.cookies = cookies
+ else:
+ logger.error("Exit task", reason="Failed to flush token")
+ return
+
+ if not self.promotions:
+ self.create_tasks()
+ if not self.promotions:
+ logger.success(
+ "Pass claim task", reason="All free games are in my library", stage="claim-games"
+ )
+ return
+
+ single_promotions = []
+ bundle_promotions = []
+ for p in self.promotions:
+ if "bundles" in p.url:
+ bundle_promotions.append(p)
+ else:
+ single_promotions.append(p)
+
+ if single_promotions:
+ await epic.claim_weekly_games(page, single_promotions)
+ if bundle_promotions:
+ await epic.claim_bundle_games(page, bundle_promotions)
+
+ @logger.catch
+ async def stash(self):
+ if "linux" in sys.platform and "DISPLAY" not in os.environ:
+ self.headless = True
+
+ logger.info(
+ "run",
+ image="20231121",
+ version=importlib_metadata.version("hcaptcha-challenger"),
+ role="EpicPlayer",
+ headless=self.headless,
+ )
+
+ async with async_playwright() as p:
+ context = await p.firefox.launch_persistent_context(
+ user_data_dir=self.player.browser_context_dir,
+ record_video_dir=self.player.record_dir,
+ record_har_path=self.player.record_har_path,
+ headless=self.headless,
+ locale=self.locale,
+ args=["--hide-crash-restore-bubble"],
+ )
+ await Malenia.apply_stealth(context)
+ if not await self.prelude_with_context(context):
+ install(upgrade=True, clip=True)
+ await self.claim_epic_games(context)
+ await context.close()
+
+
+async def run():
+ agent = ISurrender.from_epic()
+ agent.headless = False
+ await agent.stash()
+
+
+if __name__ == "__main__":
+ asyncio.run(run())
diff --git a/src/config-sample.yaml b/src/config-sample.yaml
deleted file mode 100644
index 9d5e4859ff..0000000000
--- a/src/config-sample.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-# [√]需要手动修改 [-]不推荐修改 [※]可选项,不影响系统运行
-
-# ===================================================
-# [√] 账号信息
-# ---------------------------------------------------
-# Tips:不建议在公有库上创建工作流运行项目,有仓库禁用风险
-# ===================================================
-EPΙC_EMAΙL: ""
-EPΙC_PASSWΟRD: ""
-# ===================================================
-# [√※] 通信鸽
-# ---------------------------------------------------
-# 一些主流的通信接口,用于反射运行报告
-# Docs https://github.com/caronc/apprise
-# ===================================================
-message_pusher_settings:
- enable: true
- pusher:
- PUSHER_EMAIL: ""
- PUSHER_DINGTALK: ""
- PUSHER_TELEGRAM: ""
- PUSHER_SERVERCHAN: ""
- PUSHER_DISCORD: ""
diff --git a/src/epic_games/__init__.py b/src/epic_games/__init__.py
new file mode 100644
index 0000000000..15cd18d933
--- /dev/null
+++ b/src/epic_games/__init__.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+# Time : 2022/1/16 0:24
+# Author : QIN2DIM
+# Github : https://github.com/QIN2DIM
+# Description:
+from .agent import EpicGames, Game, CompletedOrder, get_promotions, get_order_history
+
+from .player import EpicPlayer
+
+__all__ = [
+ "EpicGames",
+ "Game",
+ "CompletedOrder",
+ "get_order_history",
+ "get_promotions",
+ "EpicPlayer",
+]
diff --git a/src/epic_games/agent.py b/src/epic_games/agent.py
new file mode 100644
index 0000000000..cbc67d926e
--- /dev/null
+++ b/src/epic_games/agent.py
@@ -0,0 +1,469 @@
+# -*- coding: utf-8 -*-
+# Time : 2023/8/14 23:16
+# Author : QIN2DIM
+# Github : https://github.com/QIN2DIM
+# Description:
+from __future__ import annotations
+
+import json
+from contextlib import suppress
+from dataclasses import dataclass, field
+from json import JSONDecodeError
+from pathlib import Path
+from typing import List, Dict, Literal
+
+import httpx
+from loguru import logger
+from playwright.async_api import BrowserContext, expect, TimeoutError, Page, FrameLocator, Locator
+from tenacity import *
+
+from epic_games.player import EpicPlayer
+from utils import from_dict_to_model, AgentG
+
+# fmt:off
+URL_CLAIM = "https://store.epicgames.com/en-US/free-games"
+URL_LOGIN = f"https://www.epicgames.com/id/login?lang=en-US&noHostRedirect=true&redirectUrl={URL_CLAIM}"
+URL_PROMOTIONS = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
+URL_PRODUCT_PAGE = "https://store.epicgames.com/en-US/p/"
+URL_PRODUCT_BUNDLES = "https://store.epicgames.com/en-US/bundles/"
+URL_ORDER_HISTORY = "https://www.epicgames.com/account/v2/payment/ajaxGetOrderHistory"
+URL_CART = "https://store.epicgames.com/en-US/cart"
+URL_CART_SUCCESS = "https://store.epicgames.com/en-US/cart/success"
+# -----
+URL_STORE_EXPLORER = "https://store.epicgames.com/en-US/browse?sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&count=40"
+URL_STORE_EXPLORER_GRAPHQL = (
+ "https://store.epicgames.com/graphql?operationName=searchStoreQuery"
+ '&variables={"category":"games/edition/base","comingSoon":false,"count":80,"freeGame":true,"keywords":"","sortBy":"releaseDate","sortDir":"DESC","start":0,"tag":"","withPrice":true}'
+ '&extensions={"persistedQuery":{"version":1,"sha256Hash":"13a2b6787f1a20d05c75c54c78b1b8ac7c8bf4efc394edf7a5998fdf35d1adb0"}}'
+)
+
+# fmt:on
+
+
+@dataclass
+class CompletedOrder:
+ offerId: str
+ namespace: str
+
+
+@dataclass
+class Game:
+ url: str
+ namespace: str
+ title: str
+ thumbnail: str
+ id: str
+ in_library = None
+
+
+class CommonHandler:
+ @staticmethod
+ async def any_license(page: Page):
+ with suppress(TimeoutError):
+ await page.click("//label[@for='agree']", timeout=2000)
+ accept = page.locator("//button//span[text()='Accept']")
+ if await accept.is_enabled():
+ await accept.click()
+
+ @staticmethod
+ async def move_to_purchase_container(page: Page):
+ wpc = page.frame_locator("//iframe[@class='']")
+ payment_btn = wpc.locator("//div[@class='payment-order-confirm']")
+ with suppress(Exception):
+ await expect(payment_btn).to_be_attached()
+ await page.wait_for_timeout(2000)
+ await payment_btn.click(timeout=6000)
+
+ return wpc, payment_btn
+
+ @staticmethod
+ async def uk_confirm_order(wpc: FrameLocator):
+ # <-- Handle UK confirm-order
+ with suppress(TimeoutError):
+ accept = wpc.locator(
+ "//button[contains(@class, 'payment-confirm__btn payment-btn--primary')]"
+ )
+ if await accept.is_enabled(timeout=5000):
+ await accept.click()
+ return True
+
+ @staticmethod
+ @retry(
+ retry=retry_if_exception_type(TimeoutError),
+ wait=wait_fixed(0.5),
+ stop=stop_after_attempt(15),
+ reraise=True,
+ )
+ async def insert_challenge(
+ solver: AgentG,
+ page: Page,
+ wpc: FrameLocator,
+ payment_btn: Locator,
+ recur_url: str,
+ is_uk: bool,
+ ):
+ response = await solver.execute(window="free")
+ logger.debug("task done", sattus=f"{solver.status.CHALLENGE_SUCCESS}")
+
+ match response:
+ case solver.status.CHALLENGE_BACKCALL | solver.status.CHALLENGE_RETRY:
+ await wpc.locator("//a[@class='talon_close_button']").click()
+ await page.wait_for_timeout(1000)
+ if is_uk:
+ await CommonHandler.uk_confirm_order(wpc)
+ await payment_btn.click(delay=200)
+ case solver.status.CHALLENGE_SUCCESS:
+ await page.wait_for_url(recur_url)
+ return
+
+ @staticmethod
+ async def empty_cart(page: Page, wait_rerender: int = 30) -> bool | None:
+ """
+ URL_CART = "https://store.epicgames.com/en-US/cart"
+ URL_WISHLIST = "https://store.epicgames.com/en-US/wishlist"
+ //span[text()='Your Cart is empty.']
+
+ Args:
+ wait_rerender:
+ page:
+
+ Returns:
+
+ """
+ has_paid_free = False
+
+ try:
+ # Check all items in the shopping cart
+ cards = await page.query_selector_all("//div[@data-testid='offer-card-layout-wrapper']")
+
+ # Move paid games to wishlist games
+ for card in cards:
+ is_free = await card.query_selector("//span[text()='Free']")
+ if not is_free:
+ has_paid_free = True
+ wishlist_btn = await card.query_selector(
+ "//button//span[text()='Move to wishlist']"
+ )
+ await wishlist_btn.click()
+
+ # Wait up to 60 seconds for the page to re-render.
+ # Usually it takes 1~3s for the web page to be re-rendered
+ # - Set threshold for overflow in case of poor Epic network
+ # - It can also prevent extreme situations, such as: the user’s shopping cart has nearly a hundred products
+ if has_paid_free and wait_rerender:
+ wait_rerender -= 1
+ await page.wait_for_timeout(2000)
+ return await CommonHandler.empty_cart(page, wait_rerender)
+ return True
+ except TimeoutError as err:
+ logger.warning("Failed to empty shopping cart", err=err)
+ return False
+
+
+@dataclass
+class EpicGames:
+ player: EpicPlayer
+ """
+ Agent control
+ """
+
+ _solver: AgentG = None
+ """
+ Module for anti-captcha
+ """
+
+ _promotions: List[Game] = field(default_factory=list)
+ """
+ Free promotional items for the week,
+ considered metadata for task sequence of the agent
+ """
+
+ @classmethod
+ def from_player(
+ cls, player: EpicPlayer, *, page: Page, tmp_dir: Path | None = None, **solver_opt
+ ):
+ """尽可能早地实例化,用于部署 captcha 事件监听器"""
+ return cls(
+ player=player, _solver=AgentG.from_page(page=page, tmp_dir=tmp_dir, **solver_opt)
+ )
+
+ @property
+ def handle(self):
+ return CommonHandler
+
+ @property
+ def promotions(self) -> List[Game]:
+ self._promotions = self._promotions or get_promotions()
+ return self._promotions
+
+ async def _login(self, page: Page) -> str | None:
+ async def insert_challenge(stage: Literal["email_exists_prod", "login_prod"]):
+ fall_in_challenge = False
+
+ for _ in range(15):
+ if stage == "login_prod":
+ if not fall_in_challenge:
+ with suppress(TimeoutError):
+ await page.wait_for_url(URL_CART_SUCCESS, timeout=3000)
+ break
+ logger.debug("Attack challenge", stage=stage)
+ elif stage == "email_exists_prod":
+ if not fall_in_challenge:
+ with suppress(TimeoutError):
+ await page.type("#password", "", timeout=3000)
+ break
+ logger.debug("Attack challenge", stage=stage)
+ fall_in_challenge = True
+ result = await self._solver.execute(window=stage)
+ logger.debug("Parse result", stage=stage, result=result)
+ match result:
+ case self._solver.status.CHALLENGE_BACKCALL:
+ await page.click("//a[@class='talon_close_button']")
+ await page.wait_for_timeout(1000)
+ await page.click("#sign-in", delay=200)
+ case self._solver.status.CHALLENGE_RETRY:
+ continue
+ case self._solver.status.CHALLENGE_SUCCESS:
+ if stage == "signin" and not self._solver.qr_queue.empty():
+ continue
+ with suppress(TimeoutError):
+ await page.wait_for_url(URL_CLAIM)
+ break
+ return
+
+ await page.goto(URL_CLAIM, wait_until="domcontentloaded")
+ if "false" == await page.locator("//egs-navigation").get_attribute("isloggedin"):
+ await page.goto(URL_LOGIN, wait_until="domcontentloaded")
+ logger.info("login-with-email", url=page.url)
+
+ # {{< SIGN IN PAGE >}}
+ await page.fill("#email", self.player.email)
+ await page.click("//button[@aria-label='Continue']")
+
+ # {{< INSERT CHALLENGE - email_exists_prod >}}
+ await insert_challenge(stage="email_exists_prod")
+
+ # {{< NESTED PAGE >}}
+ await page.type("#password", self.player.password)
+ await page.click("#sign-in")
+
+ # {{< INSERT CHALLENGE - login_prod >}}
+ await insert_challenge(stage="login_prod")
+
+ logger.success("login", result="Successfully refreshed tokens")
+ await page.goto(URL_CLAIM, wait_until="domcontentloaded")
+ return self._solver.status.CHALLENGE_SUCCESS
+
+ async def authorize(self, page: Page):
+ for i in range(3):
+ try:
+ match await self._login(page):
+ case self._solver.status.CHALLENGE_SUCCESS:
+ return True
+ case _:
+ continue
+ except TimeoutError:
+ logger.warning("执行超时", task="authorize", retry=i)
+ continue
+
+ raise RuntimeError(f"Failed to flush token - agent={self.__class__.__name__}")
+
+ async def flush_token(self, context: BrowserContext) -> Dict[str, str] | None:
+ page = context.pages[0]
+ await page.goto("https://www.epicgames.com/account/personal", wait_until="networkidle")
+ await page.goto(
+ "https://store.epicgames.com/zh-CN/p/orwell-keeping-an-eye-on-you",
+ wait_until="networkidle",
+ )
+ await context.storage_state(path=self.player.ctx_cookie_path)
+ cookies = self.player.ctx_cookies.reload(self.player.ctx_cookie_path)
+ logger.success("flush_token", path=self.player.ctx_cookie_path)
+ return cookies
+
+ @retry(
+ retry=retry_if_exception_type(TimeoutError),
+ wait=wait_fixed(0.5),
+ stop=(stop_after_delay(360) | stop_after_attempt(3)),
+ reraise=True,
+ )
+ async def claim_weekly_games(self, page: Page, promotions: List[Game]):
+ in_cart_nums = 0
+
+ # --> Add promotions to Cart
+ for promotion in promotions:
+ logger.info("claim_weekly_games", action="go to store", url=promotion.url)
+ await page.goto(promotion.url, wait_until="load")
+
+ # <-- Handle pre-page
+ with suppress(TimeoutError):
+ await page.click("//button//span[text()='Continue']", timeout=3000)
+
+ # --> Make sure promotion is not in the library before executing
+ cta_btn = page.locator("//aside//button[@data-testid='add-to-cart-cta-button']")
+ with suppress(TimeoutError):
+ text = await cta_btn.text_content(timeout=10000)
+ if text == "View In Cart":
+ in_cart_nums += 1
+ continue
+ if text == "Add To Cart":
+ await cta_btn.click()
+ await expect(cta_btn).to_have_text("View In Cart")
+ in_cart_nums += 1
+
+ if in_cart_nums == 0:
+ logger.success("Pass claim task", reason="Free games not added to shopping cart")
+ return
+
+ # --> Goto cart page
+ await page.goto(URL_CART, wait_until="domcontentloaded")
+ await self.handle.empty_cart(page)
+ await page.click("//button//span[text()='Check Out']")
+
+ # <-- Handle Any LICENSE
+ await self.handle.any_license(page)
+
+ # --> Move to webPurchaseContainer iframe
+ logger.info("claim_weekly_games", action="move to webPurchaseContainer iframe")
+ wpc, payment_btn = await self.handle.move_to_purchase_container(page)
+ logger.info("claim_weekly_games", action="click payment button")
+
+ # <-- Handle UK confirm-order
+ is_uk = await self.handle.uk_confirm_order(wpc)
+
+ # <-- Insert challenge
+ recur_url = URL_CART_SUCCESS
+ await self.handle.insert_challenge(self._solver, page, wpc, payment_btn, recur_url, is_uk)
+
+ # --> Wait for success
+ await page.wait_for_url(recur_url)
+ logger.success("claim_weekly_games", action="success", url=page.url)
+
+ return True
+
+ @retry(
+ retry=retry_if_exception_type(TimeoutError),
+ wait=wait_fixed(0.5),
+ stop=(stop_after_delay(360) | stop_after_attempt(3)),
+ reraise=True,
+ )
+ async def claim_bundle_games(self, page: Page, promotions: List[Game]):
+ for promotion in promotions:
+ logger.info("claim_bundle_games", action="go to store", url=promotion.url)
+ await page.goto(promotion.url, wait_until="load")
+
+ # <-- Handle pre-page
+ with suppress(TimeoutError):
+ await page.click("//button//span[text()='Continue']", timeout=3000)
+
+ # --> Make sure promotion is not in the library before executing
+ purchase_btn = page.locator("//button[@data-testid='purchase-cta-button']").first
+ with suppress(TimeoutError):
+ text = await purchase_btn.text_content(timeout=10000)
+ if text == "Get":
+ await purchase_btn.click()
+ await page.wait_for_timeout(2000)
+ else:
+ continue
+
+ # <-- Handle Any LICENSE
+ await self.handle.any_license(page)
+
+ # --> Move to webPurchaseContainer iframe
+ logger.info("claim_bundle_games", action="move to webPurchaseContainer iframe")
+ wpc, payment_btn = await self.handle.move_to_purchase_container(page)
+ logger.info("claim_bundle_games", action="click payment button")
+
+ # <-- Handle UK confirm-order
+ is_uk = await self.handle.uk_confirm_order(wpc)
+
+ # <-- Insert challenge
+ recur_url = f"https://store.epicgames.com/en-US/download?ns={promotion.namespace}&id={promotion.id}"
+ await self.handle.insert_challenge(
+ self._solver, page, wpc, payment_btn, recur_url, is_uk
+ )
+
+ # --> Wait for success
+ await page.wait_for_url(recur_url)
+ logger.success("claim_bundle_games", action="success", url=page.url)
+
+ return True
+
+
+def get_promotions() -> List[Game]:
+ """
+ 获取周免游戏数据
+
+ <即将推出> promotion["promotions"]["upcomingPromotionalOffers"]
+ <本周免费> promotion["promotions"]["promotionalOffers"]
+ :return: {"pageLink1": "pageTitle1", "pageLink2": "pageTitle2", ...}
+ """
+
+ def _has_discount_target(prot: dict) -> bool | None:
+ with suppress(KeyError, IndexError, TypeError):
+ offers = prot["promotions"]["promotionalOffers"][0]["promotionalOffers"]
+ for i, offer in enumerate(offers):
+ if offer["discountSetting"]["discountPercentage"] == 0:
+ return True
+
+ _promotions: List[Game] = []
+
+ params = {"local": "zh-CN"}
+ resp = httpx.get(URL_PROMOTIONS, params=params)
+ try:
+ data = resp.json()
+ except JSONDecodeError as err:
+ logger.error("Failed to get promotions", err=err)
+ else:
+ elements = data["data"]["Catalog"]["searchStore"]["elements"]
+ promotions = [e for e in elements if e.get("promotions")]
+ # Get store promotion data and games
+ for promotion in promotions:
+ # Remove items that are discounted but not free.
+ if not _has_discount_target(promotion):
+ continue
+ # package free games
+ try:
+ query = promotion["catalogNs"]["mappings"][0]["pageSlug"]
+ promotion["url"] = f"{URL_PRODUCT_PAGE}{query}"
+ except TypeError:
+ promotion["url"] = f"{URL_PRODUCT_BUNDLES}{promotion['productSlug']}"
+ except IndexError:
+ promotion["url"] = f"{URL_PRODUCT_PAGE}{promotion['productSlug']}"
+
+ promotion["thumbnail"] = promotion["keyImages"][-1]["url"]
+ _promotions.append(from_dict_to_model(Game, promotion))
+
+ return _promotions
+
+
+def get_order_history(
+ cookies: Dict[str, str], page: str | None = None, last_create_at: str | None = None
+) -> List[CompletedOrder]:
+ """获取最近的订单纪录"""
+
+ def request_history() -> str | None:
+ headers = {
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
+ " Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.203"
+ }
+ params = {"locale": "zh-CN", "page": page or "0", "latCreateAt": last_create_at or ""}
+ resp = httpx.get(URL_ORDER_HISTORY, headers=headers, cookies=cookies, params=params)
+ if not resp.is_success:
+ raise httpx.RequestError("Failed to get order history, cookie may have expired")
+ return resp.text
+
+ completed_orders: List[CompletedOrder] = []
+
+ try:
+ data = json.loads(request_history())
+ for order in data["orders"]:
+ if order["orderType"] != "PURCHASE":
+ continue
+ for item in order["items"]:
+ if len(item["namespace"]) != 32:
+ continue
+ completed_orders.append(from_dict_to_model(CompletedOrder, item))
+ except (httpx.RequestError, JSONDecodeError, KeyError) as err:
+ logger.warning(err)
+
+ return completed_orders
diff --git a/src/epic_games/player.py b/src/epic_games/player.py
new file mode 100644
index 0000000000..2de5580d1b
--- /dev/null
+++ b/src/epic_games/player.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+# Time : 2023/8/14 17:04
+# Author : QIN2DIM
+# Github : https://github.com/QIN2DIM
+# Description:
+from __future__ import annotations
+
+import abc
+import json
+import time
+from abc import ABC
+from contextlib import suppress
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Dict
+from typing import Literal
+
+import httpx
+
+from settings import config, project
+
+
+@dataclass
+class EpicCookie:
+ cookies: Dict[str, str] = field(default_factory=dict)
+ """
+ cookies in the Request Header
+ """
+
+ URL_VERIFY_COOKIES = "https://www.epicgames.com/account/personal"
+
+ @classmethod
+ def from_state(cls, fp: Path):
+ """Jsonify cookie from Playwright"""
+ cookies = {}
+ try:
+ data = json.loads(fp.read_text())["cookies"]
+ cookies = {ck["name"]: ck["value"] for ck in data}
+ except (FileNotFoundError, KeyError):
+ pass
+ return cls(cookies=cookies)
+
+ def is_available(self) -> bool | None:
+ if not self.cookies:
+ return
+ with suppress(httpx.ConnectError):
+ headers = {
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.203",
+ "origin": "https://store.epicgames.com/zh-CN/p/orwell-keeping-an-eye-on-you",
+ }
+ resp = httpx.get(self.URL_VERIFY_COOKIES, headers=headers, cookies=self.cookies)
+ return resp.status_code == 200
+
+ def reload(self, fp: Path) -> Dict[str, str] | None:
+ try:
+ data = json.loads(fp.read_text())["cookies"]
+ self.cookies = {ck["name"]: ck["value"] for ck in data}
+ return self.cookies
+ except (FileNotFoundError, KeyError):
+ pass
+
+
+@dataclass
+class Player(ABC):
+ email: str
+ password: str
+ """
+ Player's account
+ """
+
+ mode: Literal["epic-games", "unreal", "gog", "apg", "xbox"]
+ """
+ Game Platform
+ """
+
+ user_data_dir: Path = project.user_data_dir
+ """
+ Mount user cache
+ - database
+ - user_data_dir
+ - games@email # runtime user_data_dir
+ - context
+ - record
+ - captcha.mp4
+ - eg-record.har
+ - ctx_cookie.json
+ - ctx_store.json
+ - order_history.json
+ - unreal@email
+ - context
+ - record
+ - captcha.mp4
+ - eg-record.har
+ - gog@alice
+ - xbox@alice
+ """
+
+ def __post_init__(self):
+ namespace = f"{self.mode}@{self.email.split('@')[0]}"
+ self.user_data_dir = self.user_data_dir.joinpath(namespace)
+ for ck in ["browser_context", "record"]:
+ ckp = self.user_data_dir.joinpath(ck)
+ ckp.mkdir(parents=True, exist_ok=True)
+
+ @classmethod
+ @abc.abstractmethod
+ def from_account(cls, *args, **kwargs):
+ raise NotImplementedError
+
+ @property
+ def browser_context_dir(self) -> Path:
+ return self.user_data_dir.joinpath("browser_context")
+
+ @property
+ def record_dir(self) -> Path:
+ return self.user_data_dir.joinpath("record")
+
+ @property
+ def record_har_path(self) -> Path:
+ return self.record_dir.joinpath(f"eg-{int(time.time())}.har")
+
+ @property
+ def ctx_cookie_path(self) -> Path:
+ return self.user_data_dir.joinpath("ctx_cookie.json")
+
+
+@dataclass
+class EpicPlayer(Player):
+ _ctx_cookies: EpicCookie = None
+
+ def __post_init__(self):
+ super().__post_init__()
+ self._ctx_cookies = EpicCookie.from_state(fp=self.ctx_cookie_path)
+
+ @classmethod
+ def from_account(cls):
+ return cls(email=config.epic_email, password=config.epic_password, mode="epic-games")
+
+ @property
+ def ctx_store_path(self) -> Path:
+ return self.user_data_dir.joinpath("ctx_store.json")
+
+ @property
+ def order_history_path(self) -> Path:
+ return self.user_data_dir.joinpath("order_history.json")
+
+ @property
+ def ctx_cookies(self) -> EpicCookie:
+ return self._ctx_cookies
+
+ @property
+ def cookies(self) -> Dict[str, str]:
+ return self._ctx_cookies.cookies
+
+ @cookies.setter
+ def cookies(self, cookies: Dict[str, str]):
+ self._ctx_cookies.cookies = cookies
diff --git a/src/main.py b/src/main.py
deleted file mode 100644
index cbc81248dc..0000000000
--- a/src/main.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:24
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description: 🚀 哟!Epic免费人!
-from fire import Fire
-
-from services.scaffold import Scaffold
-
-if __name__ == "__main__":
- Fire(Scaffold)
diff --git a/src/services/__init__.py b/src/services/__init__.py
deleted file mode 100644
index e8c9c66271..0000000000
--- a/src/services/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:24
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
diff --git a/src/services/bricklayer/__init__.py b/src/services/bricklayer/__init__.py
deleted file mode 100644
index 85aa172b08..0000000000
--- a/src/services/bricklayer/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from .bricklayer import Bricklayer
-
-__all__ = ["Bricklayer"]
diff --git a/src/services/bricklayer/bricklayer.py b/src/services/bricklayer/bricklayer.py
deleted file mode 100644
index 1aab06412f..0000000000
--- a/src/services/bricklayer/bricklayer.py
+++ /dev/null
@@ -1,310 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 13:50
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import os.path
-import time
-from hashlib import sha256
-from typing import List, Optional
-
-import cloudscraper
-import yaml
-
-from services.settings import logger
-from services.utils import (
- ToolBox,
- get_ctx,
- get_challenge_ctx,
- ChallengeReset,
- ChallengeTimeout,
-)
-from .core import AwesomeFreeMan
-from .exceptions import (
- AssertTimeout,
- SwitchContext,
- PaymentException,
- AuthException,
- UnableToGet,
- LoginException,
-)
-
-
-class CookieManager(AwesomeFreeMan):
- """管理上下文身份令牌"""
-
- def __init__(self):
- super().__init__()
-
- self.action_name = "CookieManager"
-
- def _t(self) -> str:
- return (
- sha256(self.email[-3::-1].encode("utf-8")).hexdigest() if self.email else ""
- )
-
- def load_ctx_cookies(self) -> Optional[List[dict]]:
- """
- 载入本地缓存的身份令牌。
-
- :return:
- """
- if not os.path.exists(self.path_ctx_cookies):
- return []
-
- with open(self.path_ctx_cookies, "r", encoding="utf8") as file:
- data: dict = yaml.safe_load(file)
-
- ctx_cookies = data.get(self._t(), []) if isinstance(data, dict) else []
- if not ctx_cookies:
- return []
-
- logger.debug(
- ToolBox.runtime_report(
- motive="LOAD",
- action_name=self.action_name,
- message="Load context cookie.",
- )
- )
-
- return ctx_cookies
-
- def save_ctx_cookies(self, ctx_cookies: List[dict]) -> None:
- """
- 在本地缓存身份令牌。
-
- :param ctx_cookies:
- :return:
- """
- _data = {}
-
- if os.path.exists(self.path_ctx_cookies):
- with open(self.path_ctx_cookies, "r", encoding="utf8") as file:
- stream: dict = yaml.safe_load(file)
- _data = _data if not isinstance(stream, dict) else stream
-
- _data.update({self._t(): ctx_cookies})
-
- with open(self.path_ctx_cookies, "w", encoding="utf8") as file:
- yaml.dump(_data, file)
-
- logger.debug(
- ToolBox.runtime_report(
- motive="SAVE",
- action_name=self.action_name,
- message="Update Context Cookie.",
- )
- )
-
- def is_available_cookie(self, ctx_cookies: Optional[List[dict]] = None) -> bool:
- """
- 检测 COOKIE 是否有效
-
- :param ctx_cookies: 若不指定则将工作目录 cookies 视为 ctx_cookies
- :return:
- """
- ctx_cookies = self.load_ctx_cookies() if ctx_cookies is None else ctx_cookies
- if not ctx_cookies:
- return False
-
- headers = {"cookie": ToolBox.transfer_cookies(ctx_cookies)}
-
- scraper = cloudscraper.create_scraper()
- response = scraper.get(
- self.URL_ACCOUNT_PERSONAL, headers=headers, allow_redirects=False
- )
-
- if response.status_code == 200:
- return True
- return False
-
- def refresh_ctx_cookies(
- self, silence: bool = True, _ctx_session=None
- ) -> Optional[bool]:
- """
- 更新上下文身份信息
-
- :param _ctx_session: 泛型开发者参数
- :param silence:
- :return:
- """
- # {{< Check Context Cookie Validity >}}
- if self.is_available_cookie():
- logger.success(
- ToolBox.runtime_report(
- motive="CHECK",
- action_name=self.action_name,
- message="The identity token is valid.",
- )
- )
- return True
- # {{< Done >}}
-
- # {{< Insert Challenger Context >}}
- ctx = get_challenge_ctx(silence=silence) if _ctx_session is None else _ctx_session
- try:
- for _ in range(8):
- # Enter the account information and jump to the man-machine challenge page.
- self._login(self.email, self.password, ctx=ctx)
-
- # Determine whether the account information is filled in correctly.
- if self.assert_.login_error(ctx):
- raise LoginException(
- f"登录异常 Alert『{self.assert_.get_login_error_msg(ctx)}』"
- )
-
- # Assert if you are caught in a man-machine challenge.
- try:
- fallen = self._armor.fall_in_captcha_login(ctx=ctx)
- except AssertTimeout:
- continue
- else:
- # Approved.
- if not fallen:
- break
-
- # Winter is coming, so hear me roar!
- response = self._armor.anti_hcaptcha(ctx, door="login")
- if response:
- break
- else:
- logger.critical(
- ToolBox.runtime_report(
- motive="MISS",
- action_name=self.action_name,
- message="Identity token update failed.",
- )
- )
- return False
- except ChallengeReset:
- pass
- except (AuthException, ChallengeTimeout) as error:
- logger.critical(
- ToolBox.runtime_report(
- motive="SKIP", action_name=self.action_name, message=error.msg
- )
- )
- return False
- else:
- # Store contextual authentication information.
- self.save_ctx_cookies(ctx_cookies=ctx.get_cookies())
- return self.is_available_cookie(ctx_cookies=ctx.get_cookies())
- finally:
- if _ctx_session is None:
- ctx.quit()
- # {{< Done >}}
-
- return True
-
-
-class Bricklayer(AwesomeFreeMan):
- """常驻免费游戏的认领逻辑"""
-
- def __init__(self, silence: bool = None):
- super().__init__()
- self.silence = True if silence is None else silence
-
- self.action_name = "AwesomeFreeMan"
-
- self.cookie_manager = CookieManager()
-
- # 游戏获取结果的状态
- self.result = ""
-
- def get_free_game(
- self,
- page_link: str,
- ctx_cookies: List[dict] = None,
- refresh: bool = True,
- challenge: Optional[bool] = None,
- _ctx_session=None,
- ) -> Optional[bool]:
- """
- 获取免费游戏
-
- 部署后必须传输有效的 `page_link` 参数。
- :param _ctx_session:
- :param challenge:
- :param page_link: 游戏购买页链接 zh-CN
- :param refresh: 当 COOKIE 失效时主动刷新 COOKIE
- :param ctx_cookies:
- :return:
- """
- ctx_cookies = (
- self.cookie_manager.load_ctx_cookies() if ctx_cookies is None else ctx_cookies
- )
-
- # [🚀] 验证 COOKIE
- # 请勿在并发环境下 让上下文驱动陷入到不得不更新 COOKIE 的陷阱之中。
- if not ctx_cookies or not self.cookie_manager.is_available_cookie(
- ctx_cookies=ctx_cookies
- ):
- if refresh:
- self.cookie_manager.refresh_ctx_cookies()
- ctx_cookies = self.cookie_manager.load_ctx_cookies()
- else:
- logger.error(
- ToolBox.runtime_report(
- motive="QUIT",
- action_name=self.action_name,
- message="Cookie 已过期,任务已退出。",
- )
- )
- return False
-
- # [🚀] 常驻免费(General)周免(Challenge)
- if _ctx_session is None:
- ctx = get_challenge_ctx(self.silence) if challenge else get_ctx(self.silence)
- else:
- ctx = _ctx_session
-
- # [🚀] 认领游戏
- try:
- self.result = self._get_free_game(
- page_link=page_link, api_cookies=ctx_cookies, ctx=ctx
- )
- except AssertTimeout:
- logger.debug(
- ToolBox.runtime_report(
- motive="QUIT", action_name=self.action_name, message="循环断言超时,任务退出。"
- )
- )
- except UnableToGet as error:
- logger.debug(
- ToolBox.runtime_report(
- motive="QUIT",
- action_name=self.action_name,
- message=str(error).strip(),
- url=page_link,
- )
- )
- except SwitchContext as error:
- logger.warning(
- ToolBox.runtime_report(
- motive="SWITCH",
- action_name=self.action_name,
- message="正在退出标准上下文",
- error=str(error).strip(),
- url=page_link,
- )
- )
- except PaymentException as error:
- logger.debug(
- ToolBox.runtime_report(
- motive="QUIT",
- action_name=self.action_name,
- message="🚧 订单异常",
- type=f"PaymentException {error}".strip(),
- url=page_link,
- )
- )
- except AuthException as error:
- logger.critical(
- ToolBox.runtime_report(
- motive="SKIP", action_name=self.action_name, message=error.msg
- )
- )
- return False
- finally:
- if _ctx_session is None:
- ctx.quit()
diff --git a/src/services/bricklayer/core.py b/src/services/bricklayer/core.py
deleted file mode 100644
index f9df4060cc..0000000000
--- a/src/services/bricklayer/core.py
+++ /dev/null
@@ -1,869 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import asyncio
-import os
-import sys
-import time
-import urllib.request
-from typing import List, Optional, NoReturn
-
-from selenium.common.exceptions import (
- TimeoutException,
- ElementNotVisibleException,
- WebDriverException,
- ElementClickInterceptedException,
- NoSuchElementException,
- StaleElementReferenceException,
- InvalidCookieDomainException,
-)
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support import expected_conditions as EC
-from selenium.webdriver.support.wait import WebDriverWait
-from undetected_chromedriver import Chrome
-
-from services.settings import (
- logger,
- DIR_COOKIES,
- DIR_CHALLENGE,
- DIR_MODEL,
- EPIC_EMAIL,
- EPIC_PASSWORD,
- PATH_RAINBOW,
-)
-from services.utils import (
- YOLO,
- RiverChallenger,
- DetectionChallenger,
- ToolBox,
- ArmorCaptcha,
- AshFramework,
- ChallengeReset,
-)
-from .exceptions import (
- AssertTimeout,
- UnableToGet,
- CookieExpired,
- SwitchContext,
- PaymentException,
- AuthException,
- PaymentAutoSubmit,
-)
-
-# 显示人机挑战的DEBUG日志
-ARMOR_DEBUG = True
-
-
-class ArmorUtils(ArmorCaptcha):
- """人机对抗模组"""
-
- def __init__(self, debug: bool = ARMOR_DEBUG):
- super().__init__(dir_workspace=DIR_CHALLENGE, debug=debug)
-
- # 重定向工作空间
- self.model = YOLO(DIR_MODEL)
-
- @staticmethod
- def fall_in_captcha_login(ctx: Chrome) -> Optional[bool]:
- """
- 判断在登录时是否遇到人机挑战
-
- :param ctx:
- :return: True:已进入人机验证页面,False:跳转到个人主页
- """
- threshold_timeout = 32
- start = time.time()
- flag_ = ctx.current_url
- while True:
- if ctx.current_url != flag_:
- return False
-
- if time.time() - start > threshold_timeout:
- raise AssertTimeout("任务超时:判断是否陷入人机验证")
-
- try:
- ctx.switch_to.frame(
- ctx.find_element(By.XPATH, "//iframe[contains(@title,'content')]")
- )
- ctx.find_element(By.XPATH, "//div[@class='prompt-text']")
- return True
- except WebDriverException:
- pass
- finally:
- ctx.switch_to.default_content()
-
- @staticmethod
- def fall_in_captcha_runtime(ctx: Chrome) -> Optional[bool]:
- """
- 判断在下单时是否遇到人机挑战
-
- :param ctx:
- :return:
- """
- try:
- # "//div[@id='talon_frame_checkout_free_prod']"
- WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until(
- EC.presence_of_element_located(
- (By.XPATH, "//iframe[contains(@title,'content')]")
- )
- )
- return True
- except TimeoutException:
- return False
-
- def switch_solution(self, mirror, label: Optional[str] = None):
- """模型卸载"""
- label = self.label if label is None else label
-
- if label in ["垂直河流"]:
- return RiverChallenger(path_rainbow=PATH_RAINBOW)
- if label in ["天空中向左飞行的飞机"]:
- return DetectionChallenger(path_rainbow=PATH_RAINBOW)
- return mirror
-
- def download_images(self) -> None:
- """
- 植入协程框架加速下载。
-
- :return:
- """
-
- class ImageDownloader(AshFramework):
- """协程助推器 提高挑战图片的下载效率"""
-
- def __init__(self, docker=None):
- super().__init__(docker=docker)
-
- async def control_driver(self, context, session=None):
- path_challenge_img, url = context
-
- # 下载挑战图片
- async with session.get(url) as response:
- with open(path_challenge_img, "wb") as file:
- file.write(await response.read())
-
- self.log(message="下载挑战图片")
-
- # 初始化挑战图片下载目录
- workspace_ = self._init_workspace()
-
- # 初始化数据容器
- docker_ = []
- for alias_, url_ in self.alias2url.items():
- path_challenge_img_ = os.path.join(workspace_, f"{alias_}.png")
- self.alias2path.update({alias_: path_challenge_img_})
- docker_.append((path_challenge_img_, url_))
-
- # 启动最高功率的协程任务
- if "win" in sys.platform:
- asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
- asyncio.run(ImageDownloader(docker=docker_).subvert(workers="fast"))
- else:
- loop = asyncio.get_event_loop()
- loop.run_until_complete(
- ImageDownloader(docker=docker_).subvert(workers="fast")
- )
-
- self.runtime_workspace = workspace_
-
- def challenge_success(
- self, ctx: Chrome, init: bool = True, **kwargs
- ) -> Optional[bool]:
- """
- 判断挑战是否成功的复杂逻辑
-
- IF index is True:
- 经过首轮识别点击后,出现四种结果:
- - 直接通过验证(小概率)
- - 进入第二轮(正常情况)
- 通过短时间内可否继续点击拼图来断言是否陷入第二轮测试
- - 要求重试(小概率)
- 特征被识别或网络波动,需要重试
- - 通过验证,弹出 2FA 双重认证
- 无法处理,任务结束
-
- :param ctx: 挑战者驱动上下文
- :param init: 是否为初次挑战
- :return:
- """
-
- def _continue_action():
- try:
- time.sleep(3)
- ctx.find_element(By.XPATH, "//div[@class='task-image']")
- except NoSuchElementException:
- return True
- else:
- return False
-
- def _high_threat_proxy_access():
- """error-text:: 请再试一次"""
- # 未设置子网桥系统代理
- if not urllib.request.getproxies():
- return False
-
- try:
- WebDriverWait(ctx, 2, ignored_exceptions=WebDriverException).until(
- EC.visibility_of_element_located(
- (By.XPATH, "//div[@class='error-text']")
- )
- )
- return True
- except TimeoutException:
- return False
-
- door: str = kwargs.get("door", "login")
-
- flag = ctx.current_url
-
- # 首轮测试后判断短时间内页内是否存在可点击的拼图元素
- # hcaptcha 最多两轮验证,一般情况下,账号信息有误仅会执行一轮,然后返回登录窗格提示密码错误
- # 其次是被识别为自动化控制,这种情况也是仅执行一轮,回到登录窗格提示“返回数据错误”
- if init and not _continue_action():
- self.log("挑战继续")
- return False
-
- if not init and _high_threat_proxy_access():
- self.log("挑战被迫重置 可能使用了高威胁的代理IP")
-
- try:
- challenge_reset = WebDriverWait(
- ctx, 5, ignored_exceptions=WebDriverException
- ).until(
- EC.presence_of_element_located(
- (By.XPATH, "//div[@class='MuiAlert-message']")
- )
- )
- except TimeoutException:
- # 如果挑战通过,自动跳转至其他页面(也即离开当前网址)
- try:
- WebDriverWait(ctx, 10).until(EC.url_changes(flag))
- # 如果挑战未通过,可能为“账号信息错误”“分数太低”“自动化特征被识别”
- except TimeoutException:
- if door == "login":
- self.log("断言超时,挑战继续")
- return False
- # 人机挑战通过,但可能还需处理 `2FA` 问题(超纲了)
- else:
- # 如果没有遇到双重认证,人机挑战成功
- if "id/login/mfa" not in ctx.current_url:
- self.log("挑战成功")
- return True
- raise AuthException("人机挑战已退出 error=遭遇意外的 2FA 双重认证")
- else:
- self.log("挑战失败,需要重置挑战")
- challenge_reset.click()
- raise ChallengeReset
-
- def anti_hcaptcha(self, ctx: Chrome, door: str = "login") -> Optional[bool]: # noqa
- """
- Handle hcaptcha challenge
-
- ## Reference
-
- M. I. Hossen and X. Hei, "A Low-Cost Attack against the hCaptcha System," 2021 IEEE Security
- and Privacy Workshops (SPW), 2021, pp. 422-431, doi: 10.1109/SPW53761.2021.00061.
-
- > ps:该篇文章中的部分内容已过时,现在 hcaptcha challenge 远没有作者说的那么容易应付。
-
- :param door: [login free]
- :param ctx:
- :return:
- """
- # [👻] 进入人机挑战关卡
- ctx.switch_to.frame(
- WebDriverWait(ctx, 5, ignored_exceptions=ElementNotVisibleException).until(
- EC.presence_of_element_located(
- (By.XPATH, "//iframe[contains(@title,'content')]")
- )
- )
- )
-
- # [👻] 获取挑战图片
- # 多轮验证标签不会改变
- self.get_label(ctx)
- if self.tactical_retreat():
- ctx.switch_to.default_content()
- return False
-
- # [👻] 注册解决方案
- # 根据挑战类型自动匹配不同的模型
- model = self.switch_solution(mirror=self.model)
-
- # [👻] 人机挑战!
- try:
- for index in range(2):
- self.mark_samples(ctx)
-
- self.download_images()
-
- self.challenge(ctx, model=model)
-
- result = self.challenge_success(ctx, init=not bool(index), door=door)
-
- # 仅一轮测试就通过
- if index == 0 and result:
- break
- # 断言超时
- if index == 1 and result is False:
- raise TimeoutException
- # 提交结果断言超时或 mark_samples() 等待超时
- except TimeoutException:
- ctx.switch_to.default_content()
- return False
- # 捕获重置挑战的请求信号
- except ChallengeReset:
- ctx.switch_to.default_content()
- return self.anti_hcaptcha(ctx, door=door)
- # 回到主线剧情
- else:
- ctx.switch_to.default_content()
- return True
-
-
-class AssertUtils:
- """处理穿插在认领过程中意外出现的遮挡信息"""
-
- # 特征指令/简易错误
- # 此部分状态作为消息模板的一部分,尽量简短易理解
- COOKIE_EXPIRED = "💥 饼干过期了"
- ASSERT_OBJECT_EXCEPTION = "🚫 无效的断言对象"
- GAME_OK = "🛴 已在库"
- GAME_PENDING = "👀 待认领"
- GAME_CLAIM = "💰 领取成功"
-
- @staticmethod
- def login_error(ctx: Chrome) -> bool:
- """登录失败 可能原因为账号或密码错误"""
-
- threshold_timeout = 3
- start = time.time()
-
- while True:
- # "任务超时:网络响应过慢"
- if time.time() - start > threshold_timeout:
- return False
-
- # 提交按钮正在响应或界面弹出人机挑战
- try:
- submit_button = ctx.find_element(By.ID, "sign-in")
- status_obj = submit_button.get_attribute("tabindex")
- if status_obj == "-1":
- continue
- except (AttributeError, WebDriverException):
- pass
-
- # 登录页面遭遇 Alert,可能原因为:
- # - 账号或密码无效;
- # - Auth Response 异常;
- # - 账号被锁定;
- try:
- h6_tags = ctx.find_elements(By.TAG_NAME, "h6")
- if len(h6_tags) > 1:
- return True
- return False
- except NoSuchElementException:
- pass
-
- @staticmethod
- def get_login_error_msg(ctx) -> Optional[str]:
- """获取登录页面的错误信息"""
- try:
- return ctx.find_element(By.XPATH, "//form//h6").text.strip()
- except (WebDriverException, AttributeError):
- return "null"
-
- @staticmethod
- def wrong_driver(ctx, msg: str):
- """判断当前上下文任务是否使用了错误的浏览器驱动"""
- if "chrome.webdriver" in str(ctx.__class__):
- raise SwitchContext(msg)
-
- @staticmethod
- def surprise_license(ctx: Chrome) -> Optional[bool]:
- """
- 新用户首次购买游戏需要处理许可协议书
-
- :param ctx:
- :return:
- """
- try:
- surprise_obj = WebDriverWait(
- ctx, 3, ignored_exceptions=ElementNotVisibleException
- ).until(
- EC.presence_of_element_located(
- (By.XPATH, "//label//span[@data-component='Message']")
- )
- )
- except TimeoutException:
- return
- else:
- try:
- if surprise_obj.text == "我已阅读并同意最终用户许可协议书":
- # 勾选协议
- tos_agree = WebDriverWait(
- ctx, 3, ignored_exceptions=ElementClickInterceptedException
- ).until(EC.element_to_be_clickable((By.ID, "agree")))
-
- # 点击接受
- tos_submit = WebDriverWait(
- ctx, 3, ignored_exceptions=ElementClickInterceptedException
- ).until(
- EC.element_to_be_clickable(
- (By.XPATH, "//span[text()='接受']/parent::button")
- )
- )
- time.sleep(1)
- tos_agree.click()
- tos_submit.click()
- return True
- # 窗口渲染出来后因不可抗力因素自然消解
- except (TimeoutException, StaleElementReferenceException):
- return
-
- @staticmethod
- def fall_in_captcha_runtime(ctx: Chrome) -> Optional[bool]:
- """捕获隐藏在周免游戏订单中的人机挑战"""
- try:
- # //iframe[@id='talon_frame_checkout_free_prod']
- WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until(
- EC.presence_of_element_located(
- (By.XPATH, "//iframe[contains(@title,'content')]")
- )
- )
- return True
- except TimeoutException:
- return False
-
- @staticmethod
- def surprise_warning_purchase(ctx: Chrome) -> Optional[bool]:
- """
- 处理弹窗遮挡消息。
-
- 这是一个没有意义的操作,但无可奈何,需要更多的测试。
- :param ctx:
- :return:
- """
-
- try:
- surprise_obj = WebDriverWait(ctx, 2).until(
- EC.visibility_of_element_located((By.TAG_NAME, "h1"))
- )
- surprise_warning = surprise_obj.text
- except TimeoutException:
- return True
-
- if "成人内容" in surprise_warning:
- WebDriverWait(
- ctx, 2, ignored_exceptions=ElementClickInterceptedException
- ).until(
- EC.element_to_be_clickable(
- (By.XPATH, "//span[text()='继续']/parent::button")
- )
- ).click()
- return True
- if "内容品当前在您所在平台或地区不可用。" in surprise_warning:
- raise UnableToGet(surprise_warning)
- return False
-
- @staticmethod
- def payment_auto_submit(ctx: Chrome) -> NoReturn:
- """认领游戏后订单自动提交 仅在常驻游戏中出现"""
- try:
- warning_text = (
- WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException)
- .until(
- EC.presence_of_element_located(
- (By.XPATH, "//div[@data-component='DownloadMessage']//span")
- )
- )
- .text
- )
- if warning_text == "感谢您的购买":
- raise PaymentAutoSubmit
- except TimeoutException:
- pass
-
- @staticmethod
- def payment_blocked(ctx: Chrome) -> NoReturn:
- """判断游戏锁区"""
- # 需要在 webPurchaseContainer 里执行
- try:
- warning_text = (
- WebDriverWait(ctx, 3, ignored_exceptions=WebDriverException)
- .until(
- EC.presence_of_element_located(
- (By.XPATH, "//h2[@class='payment-blocked__msg']")
- )
- )
- .text
- )
- if warning_text:
- raise PaymentException(warning_text)
- except TimeoutException:
- pass
-
- @staticmethod
- def timeout(loop_start: float, loop_timeout: float = 300) -> NoReturn:
- """任务超时锁"""
- if time.time() - loop_start > loop_timeout:
- raise AssertTimeout
-
- @staticmethod
- def purchase_status(
- ctx: Chrome,
- page_link: str,
- action_name: Optional[str] = "AssertUtils",
- init: Optional[bool] = True,
- ) -> Optional[str]:
- """
- 断言当前上下文页面的游戏的在库状态。
-
- :param init:
- :param action_name:
- :param page_link:
- :param ctx:
- :return:
- """
- time.sleep(2)
-
- # 捕获按钮对象,根据按钮上浮动的提示信息断言游戏在库状态 超时的空对象主动抛出异常
- try:
- assert_obj = WebDriverWait(ctx, 30, WebDriverException).until(
- EC.element_to_be_clickable(
- (
- By.XPATH,
- "//span[@data-component='PurchaseCTA']//span[@data-component='Message']",
- )
- )
- )
- except TimeoutException:
- return AssertUtils.ASSERT_OBJECT_EXCEPTION
-
- assert_info = assert_obj.text
-
- # 游戏名 超时的空对象主动抛出异常
- game_name = (
- WebDriverWait(ctx, 30, ignored_exceptions=ElementNotVisibleException)
- .until(EC.visibility_of_element_located((By.XPATH, "//h1")))
- .text
- )
-
- if game_name[-1] == "。":
- logger.warning(
- ToolBox.runtime_report(
- motive="SKIP",
- action_name=action_name,
- message=f"🚫 {game_name}",
- url=page_link,
- )
- )
- return AssertUtils.ASSERT_OBJECT_EXCEPTION
-
- if "已在" in assert_info:
- _message = "🛴 游戏已在库" if init else "🥂 领取成功"
- logger.info(
- ToolBox.runtime_report(
- motive="GET",
- action_name=action_name,
- message=_message,
- game=f"『{game_name}』",
- )
- )
- return AssertUtils.GAME_OK if init else AssertUtils.GAME_CLAIM
-
- if "获取" in assert_info:
- deadline: Optional[str] = None
- try:
- deadline = ctx.find_element(
- By.XPATH,
- "//div[@data-component='PDPSidebarLayout']"
- "//span[contains(text(),'优惠截止')][@data-component='Message']",
- ).text
- except (NoSuchElementException, AttributeError):
- pass
-
- # 必须使用挑战者驱动领取周免游戏,处理潜在的人机验证
- if deadline:
- AssertUtils.wrong_driver(ctx, "♻ 使用挑战者上下文领取周免游戏。")
-
- message = "🚀 发现免费游戏" if not deadline else f"💰 发现周免游戏 {deadline}"
- logger.success(
- ToolBox.runtime_report(
- motive="GET",
- action_name=action_name,
- message=message,
- game=f"『{game_name}』",
- )
- )
-
- return AssertUtils.GAME_PENDING
-
- if "购买" in assert_info:
- logger.warning(
- ToolBox.runtime_report(
- motive="SKIP",
- action_name=action_name,
- message="🚧 这不是免费游戏",
- game=f"『{game_name}』",
- )
- )
- return AssertUtils.ASSERT_OBJECT_EXCEPTION
-
- return AssertUtils.ASSERT_OBJECT_EXCEPTION
-
- @staticmethod
- def refund_info(ctx: Chrome):
- """
- 处理订单中的 退款及撤销权信息
-
- :param ctx:
- :return:
- """
- try:
- WebDriverWait(
- ctx, 2, ignored_exceptions=StaleElementReferenceException
- ).until(
- EC.element_to_be_clickable(
- (By.XPATH, "//span[text()='我同意']/ancestor::button")
- )
- ).click()
- except TimeoutException:
- pass
-
-
-class AwesomeFreeMan:
- """白嫖人的基础设施"""
-
- # 操作对象参数
- URL_LOGIN = "https://www.epicgames.com/id/login/epic?lang=zh-CN"
- URL_ACCOUNT_PERSONAL = "https://www.epicgames.com/account/personal"
-
- def __init__(self):
- """定义了一系列领取免费游戏所涉及到的浏览器操作。"""
-
- # 实体对象参数
- self.action_name = "BaseAction"
- self.email, self.password = EPIC_EMAIL, EPIC_PASSWORD
-
- # 驱动参数
- self.path_ctx_cookies = os.path.join(DIR_COOKIES, "ctx_cookies.yaml")
- self.loop_timeout = 300
-
- # 注册拦截机
- self._armor = ArmorUtils()
- self.assert_ = AssertUtils()
-
- def _reset_page(self, ctx: Chrome, page_link: str, api_cookies):
- ctx.get(self.URL_ACCOUNT_PERSONAL)
- for cookie_dict in api_cookies:
- try:
- ctx.add_cookie(cookie_dict)
- except InvalidCookieDomainException as err:
- logger.error(
- ToolBox.runtime_report(
- motive="SKIP",
- action_name=self.action_name,
- error=err.msg,
- domain=cookie_dict.get("domain", "null"),
- name=cookie_dict.get("name", "null"),
- )
- )
- ctx.get(page_link)
-
- def _login(self, email: str, password: str, ctx: Chrome) -> None:
- """
- 作为被动方式,登陆账号,刷新 identity token。
-
- 此函数不应被主动调用,应当作为 refresh identity token / Challenge 的辅助函数。
- :param ctx:
- :param email:
- :param password:
- :return:
- """
- ctx.get(self.URL_LOGIN)
-
- WebDriverWait(ctx, 10, ignored_exceptions=ElementNotVisibleException).until(
- EC.presence_of_element_located((By.ID, "email"))
- ).send_keys(email)
-
- WebDriverWait(ctx, 10, ignored_exceptions=ElementNotVisibleException).until(
- EC.presence_of_element_located((By.ID, "password"))
- ).send_keys(password)
-
- WebDriverWait(ctx, 60, ignored_exceptions=ElementClickInterceptedException).until(
- EC.element_to_be_clickable((By.ID, "sign-in"))
- ).click()
-
- def _activate_payment(self, api: Chrome) -> Optional[bool]:
- """
- 激活游戏订单
-
- :param api:
- :return:
- """
- for _ in range(5):
- try:
- WebDriverWait(
- api, 5, ignored_exceptions=ElementClickInterceptedException
- ).until(
- EC.element_to_be_clickable(
- (By.XPATH, "//button[@data-testid='purchase-cta-button']")
- )
- ).click()
- return True
- # 加载超时,继续测试
- except TimeoutException:
- continue
- # 出现弹窗遮挡
- except ElementClickInterceptedException:
- try:
- if self.assert_.surprise_warning_purchase(api) is True:
- continue
- except UnableToGet:
- return False
-
- def _handle_payment(self, ctx: Chrome) -> None:
- """
- 处理游戏订单
-
- 逻辑过于复杂,需要重构。此处为了一套代码涵盖各种情况,做了很多妥协。
- 需要针对 周免游戏的订单处理 设计一套执行效率更高的业务模型。
- :param ctx:
- :return:
- """
-
- # [🍜] Switch to the [Purchase Container] iframe.
- try:
- payment_frame = WebDriverWait(
- ctx, 5, ignored_exceptions=ElementNotVisibleException
- ).until(
- EC.presence_of_element_located(
- (By.XPATH, "//div[@id='webPurchaseContainer']//iframe")
- )
- )
- ctx.switch_to.frame(payment_frame)
- except TimeoutException:
- try:
- warning_layout = ctx.find_element(
- By.XPATH, "//div[@data-component='WarningLayout']"
- )
- warning_text = warning_layout.text
- # Handle delayed loading of cookies.
- if "依旧要购买吗" in warning_text:
- return
- # Handle Linux User-Agent Heterogeneous Services.
- if "设备不受支持" in warning_text:
- ctx.find_element(
- By.XPATH, "//span[text()='继续']/parent::button"
- ).click()
- return self._handle_payment(ctx)
- except NoSuchElementException:
- pass
-
- # [🍜] 判断游戏锁区
- self.assert_.payment_blocked(ctx)
-
- # [🍜] Ignore: Click the [Accept Agreement] confirmation box.
- try:
- WebDriverWait(
- ctx, 2, ignored_exceptions=ElementClickInterceptedException
- ).until(
- EC.presence_of_element_located(
- (By.XPATH, "//div[contains(@class,'payment-check-box')]")
- )
- ).click()
- except TimeoutException:
- pass
-
- # [🍜] Click the [order] button.
- try:
- time.sleep(0.5)
- WebDriverWait(
- ctx, 20, ignored_exceptions=ElementClickInterceptedException
- ).until(
- EC.element_to_be_clickable(
- (By.XPATH, "//button[contains(@class,'payment-btn')]")
- )
- ).click()
- # 订单界面未能按照预期效果出现,在超时范围内重试若干次。
- except TimeoutException:
- ctx.switch_to.default_content()
- return
-
- # [🍜] 处理 UK 地区账号的「退款及撤销权信息」。
- self.assert_.refund_info(ctx)
-
- # [🍜] 捕获隐藏在订单中的人机挑战,仅在周免游戏中出现。
- if self._armor.fall_in_captcha_runtime(ctx):
- self.assert_.wrong_driver(ctx, "任务中断,请使用挑战者上下文处理意外弹出的人机验证。")
- try:
- self._armor.anti_hcaptcha(ctx, door="free")
- except (ChallengeReset, TimeoutException):
- pass
-
- # [🍜] Switch to default iframe.
- ctx.switch_to.default_content()
- ctx.refresh()
-
- def _get_free_game(
- self, page_link: str, api_cookies: List[dict], ctx: Chrome
- ) -> Optional[str]:
- """
- 获取免费游戏
-
- 需要加载cookie后使用,避免不必要的麻烦。
- :param page_link:
- :param api_cookies:
- :param ctx:
- :return:
- """
- if not api_cookies:
- raise CookieExpired(self.assert_.COOKIE_EXPIRED)
-
- _loop_start = time.time()
- init = True
- while True:
- # [🚀] 重载身份令牌
- # InvalidCookieDomainException:需要 2 次 GET 重载 cookie relative domain
- # InvalidCookieDomainException:跨域认证,访问主域名或过滤异站域名信息
- self._reset_page(ctx=ctx, page_link=page_link, api_cookies=api_cookies)
-
- # [🚀] 断言游戏的在库状态
- self.assert_.surprise_warning_purchase(ctx)
- self.result = self.assert_.purchase_status(
- ctx, page_link, self.action_name, init
- )
- # 当游戏不处于<待认领>状态时跳过后续业务
- if self.result != self.assert_.GAME_PENDING:
- # <游戏状态断言超时>或<检测到异常的实体对象>
- # 在超时阈值内尝试重新拉起服务
- if self.result == self.assert_.ASSERT_OBJECT_EXCEPTION:
- continue
- # 否则游戏状态处于<领取成功>或<已在库>
- break
-
- # [🚀] 激活游戏订单
- # Maximum sleep time -> 12s
- self._activate_payment(ctx)
-
- # [🚀] 新用户首次购买游戏需要处理许可协议书
- # Maximum sleep time -> 3s
- if self.assert_.surprise_license(ctx):
- ctx.refresh()
- continue
-
- # [🚀] 订单消失
- # Maximum sleep time -> 5s
- self.assert_.payment_auto_submit(ctx)
-
- # [🚀] 处理游戏订单
- self._handle_payment(ctx)
-
- # [🚀] 更新上下文状态
- init = False
- self.assert_.timeout(_loop_start, self.loop_timeout)
-
- return self.result
diff --git a/src/services/bricklayer/exceptions.py b/src/services/bricklayer/exceptions.py
deleted file mode 100644
index 66078b400f..0000000000
--- a/src/services/bricklayer/exceptions.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from typing import Optional, Sequence
-
-
-class AwesomeException(Exception):
- def __init__(
- self, msg: Optional[str] = None, stacktrace: Optional[Sequence[str]] = None
- ):
- self.msg = msg
- self.stacktrace = stacktrace
- super().__init__()
-
- def __str__(self) -> str:
- exception_msg = "Message: {}\n".format(self.msg)
- if self.stacktrace:
- stacktrace = "\n".join(self.stacktrace)
- exception_msg += "Stacktrace:\n{}".format(stacktrace)
- return exception_msg
-
-
-class ContextException(AwesomeException):
- """上下文使用错误"""
-
-
-class SwitchContext(ContextException):
- """当使用普通驱动上下文处理人机验证时抛出"""
-
-
-class AuthException(AwesomeException):
- """身份认证出现问题时抛出,例如遭遇插入到 hcaptcha 之后的 2FA 身份验证"""
-
-
-class LoginException(AuthException):
- """登录异常 可能原因:账号或密码无效"""
-
-
-class PaymentException(AwesomeException):
- """订单操作异常"""
-
-
-class PaymentAutoSubmit(PaymentException):
- """点击获取游戏后,订单窗格没有弹出,直接感谢我们购买游戏"""
-
-
-class CookieExpired(AwesomeException):
- """身份令牌或饼干过期时抛出"""
-
-
-class AssertTimeout(AwesomeException):
- """断言超时"""
-
-
-class UnableToGet(AwesomeException):
- """不可抗力因素,游戏无法获取"""
-
-
-class SurpriseExit(KeyboardInterrupt):
- """脑洞大开的作者想挑战一下 Python 自带的垃圾回收机制,决定以一种极其垂直的方式结束系统任务。"""
diff --git a/src/services/deploy.py b/src/services/deploy.py
deleted file mode 100644
index a27cefe674..0000000000
--- a/src/services/deploy.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import random
-from datetime import datetime, timedelta
-from typing import Optional
-
-import apprise
-import pytz
-from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.triggers.cron import CronTrigger
-
-from services.bricklayer import Bricklayer
-from services.explorer import Explorer
-from services.settings import logger, MESSAGE_PUSHER_SETTINGS
-from services.utils import ToolBox, get_challenge_ctx
-
-
-class ClaimerScheduler:
- """系统任务调度器"""
-
- SPAWN_TIME = "spawn_time"
-
- def __init__(self, silence: Optional[bool] = None):
- self.action_name = "AwesomeScheduler"
- self.end_date = datetime.now(pytz.timezone("Asia/Shanghai")) + timedelta(days=180)
- self.silence = silence
- # 服务注册
- self.scheduler = BlockingScheduler()
- self.bricklayer = Bricklayer(silence=silence)
- self.explorer = Explorer(silence=silence)
- self.logger = logger
-
- def deploy_on_vps(self):
- """部署最佳实践的 VPS 定时任务"""
-
- # [⏰] 北京时间每周五凌晨 4 点的 两个任意时刻 执行任务
- jitter_minute = [random.randint(10, 20), random.randint(35, 57)]
-
- # [⚔] 首发任务用于主动认领,备用方案用于非轮询审核
- self.scheduler.add_job(
- func=self.job_loop_claim,
- trigger=CronTrigger(
- day_of_week="fri",
- hour="4",
- minute=f"{jitter_minute[0]},{jitter_minute[-1]}",
- second="30",
- timezone="Asia/Shanghai",
- # 必须使用 `end_date` 续订生产环境 定时重启
- end_date=self.end_date,
- # 必须使用 `jitter` 弥散任务发起时间
- jitter=15,
- ),
- name="loop_claim",
- )
-
- self.logger.debug(
- ToolBox.runtime_report(
- motive="JOB",
- action_name=self.action_name,
- message=f"任务将在北京时间每周五 04:{jitter_minute[0]} "
- f"以及 04:{jitter_minute[-1]} 执行。",
- end_date=str(self.end_date),
- )
- )
-
- # [⚔] Gracefully run scheduler.`
- try:
- self.scheduler.start()
- except KeyboardInterrupt:
- self.scheduler.shutdown(wait=False)
- self.logger.debug(
- ToolBox.runtime_report(
- motive="EXITS",
- action_name=self.action_name,
- message="Received keyboard interrupt signal.",
- )
- )
-
- def _push(self, inline_docker: list, pusher_settings: Optional[dict] = None):
- """
- 推送追踪日志
-
- :param inline_docker:
- :param pusher_settings:
- :return:
- """
-
- # -------------------------
- # [♻]参数过滤
- # -------------------------
- if pusher_settings is None:
- pusher_settings = MESSAGE_PUSHER_SETTINGS
- if not pusher_settings["enable"]:
- return
- # -------------------------
- # [📧]消息推送
- # -------------------------
- _inline_textbox = [f"当前玩家:{ToolBox.secret_email(self.bricklayer.email)}"]
- _inline_textbox += ["运行日志".center(20, "-")]
- if not inline_docker:
- _inline_textbox += [f"[{ToolBox.date_format_now()}] 🛴 暂无待认领的周免游戏"]
- else:
- _inline_textbox += [
- f"[{game_obj[self.SPAWN_TIME]}] {game_obj['name']} {game_obj['status']}"
- for game_obj in inline_docker
- ]
- _inline_textbox += ["生命周期统计".center(20, "-"), f"total:{inline_docker.__len__()}"]
-
- # 注册 Apprise 消息推送框架
- active_pusher = pusher_settings["pusher"]
- surprise = apprise.Apprise()
- for server in active_pusher.values():
- surprise.add(server)
-
- # 发送模版消息
- surprise.notify(body="\n".join(_inline_textbox), title="EpicAwesomeGamer 运行报告")
-
- self.logger.success(
- ToolBox.runtime_report(
- motive="Notify",
- action_name=self.action_name,
- message="消息推送完毕",
- active_pusher=[i[0] for i in active_pusher.items() if i[-1]],
- )
- )
-
- def deploy_jobs(self, platform: Optional[str] = None):
- """
- 部署系统任务
-
- :param platform: within [vps serverless qing-long]
- :return:
- """
- platform = "vps" if platform is None else platform
- if platform not in ["vps", "serverless", "qing-long"]:
- raise NotImplementedError
-
- self.logger.debug(
- ToolBox.runtime_report(
- motive="JOB",
- action_name=self.action_name,
- message="部署任务调度器",
- platform=platform.upper(),
- )
- )
-
- # [⚔] Distribute common state machine patterns
- if platform == "vps":
- self.deploy_on_vps()
- elif platform == "serverless":
- raise NotImplementedError
- elif platform == "qing-long":
- return self.job_loop_claim()
-
- def job_loop_claim(self):
- """单步子任务 认领周免游戏"""
-
- def _release_power(urls: Optional[list] = None):
- if not urls:
- self.logger.debug(
- ToolBox.runtime_report(
- motive="SKIP",
- action_name=self.action_name,
- message="🛴 当前玩家暂无待认领的周免游戏。",
- )
- )
- return
-
- # 优先处理常规情况 urls.__len__() == 1
- for url in urls:
- self.logger.debug(
- ToolBox.runtime_report(
- motive="STARTUP",
- action_name="ScaffoldClaim",
- message="🍜 正在为玩家领取周免游戏",
- game=f"『{limited_free_game_objs[url]}』",
- )
- )
-
- # 反复生产挑战者领取周免游戏
- self.bricklayer.get_free_game(
- page_link=url, ctx_cookies=ctx_cookies, _ctx_session=challenger
- )
- # 编制运行缓存 用于生成业务报告
- _runtime = {
- self.SPAWN_TIME: ToolBox.date_format_now(),
- "status": self.bricklayer.result,
- "name": limited_free_game_objs[url],
- }
- inline_docker.append(_runtime)
-
- # 标记运行时刻
- if self.scheduler.running:
- self.logger.debug(
- ToolBox.runtime_report(
- motive="JOB",
- action_name=self.action_name,
- message="定时任务启动",
- job="claim",
- )
- )
-
- # 初始化内联数据容器 临时存储运行缓存
- inline_docker = []
-
- challenger = get_challenge_ctx(silence=self.silence)
- try:
- # 更新身份令牌
- if not self.bricklayer.cookie_manager.refresh_ctx_cookies(
- _ctx_session=challenger
- ):
- return
- ctx_cookies = self.bricklayer.cookie_manager.load_ctx_cookies()
-
- # 扫描商城促销活动,返回“0折”商品的名称与商城链接
- limited_free_game_objs = self.explorer.get_the_absolute_free_game(
- ctx_cookies, _ctx_session=challenger
- )
-
- # 释放 Claimer 认领周免游戏
- _release_power(limited_free_game_objs["urls"])
- finally:
- challenger.quit()
-
- # 缓存卸载 发送运行日志
- self._push(inline_docker)
diff --git a/src/services/explorer/__init__.py b/src/services/explorer/__init__.py
deleted file mode 100644
index 8c42a154ed..0000000000
--- a/src/services/explorer/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from .explorer import Explorer
-
-__all__ = ["Explorer"]
diff --git a/src/services/explorer/core.py b/src/services/explorer/core.py
deleted file mode 100644
index 4c62fda792..0000000000
--- a/src/services/explorer/core.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import os.path
-import time
-from typing import List, ContextManager, Union, Dict
-
-from selenium.common.exceptions import WebDriverException
-from selenium.webdriver import Chrome
-from selenium.webdriver.common.action_chains import ActionChains
-from selenium.webdriver.common.by import By
-from selenium.webdriver.common.keys import Keys
-from selenium.webdriver.support import expected_conditions as EC
-from selenium.webdriver.support.wait import WebDriverWait
-
-from services.settings import DIR_EXPLORER, logger
-from services.utils import ToolBox
-from .exceptions import DiscoveryTimeoutException
-
-
-class AwesomeFreeGirl:
- """游戏商店探索者 获取免费游戏数据以及促销信息"""
-
- # 平台对象参数
- URL_STORE_HOME = "https://store.epicgames.com/zh-CN/"
- URL_FREE_GAMES = "https://store.epicgames.com/zh-CN/free-games"
- URL_STORE_PREFIX = "https://store.epicgames.com/zh-CN/browse?"
- URL_STORE_FREE = (
- f"{URL_STORE_PREFIX}sortBy=releaseDate&sortDir=DESC&priceTier=tierFree&count=40"
- )
- URL_PROMOTIONS = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions?locale=zh-CN"
- URL_PRODUCT_PAGE = "https://store.epicgames.com/zh-CN/p/"
-
- def __init__(self, silence: bool = None):
- self.silence = True if silence is None else silence
-
- # 驱动参数
- self.action_name = "AwesomeFreeGirl"
-
- # 运行缓存
- self.runtime_workspace = None
- self.path_free_games = "ctx_games.csv"
- self.game_objs = {} # {index0:{name:value url:value}, }
-
- # 初始化工作空间
- self._init_workspace()
-
- def _init_workspace(self) -> None:
- """初始化工作目录 缓存游戏商店数据"""
- self.runtime_workspace = "." if not os.path.exists(DIR_EXPLORER) else DIR_EXPLORER
- self.path_free_games = os.path.join(self.runtime_workspace, self.path_free_games)
-
- def _discovery_free_games(
- self, ctx: Union[ContextManager, Chrome], ctx_cookies: List[dict]
- ) -> None:
- """发现玩家所属地区可视的常驻免费游戏数据"""
-
- # 重载玩家令牌
- if ctx_cookies:
- ctx.get(self.URL_STORE_FREE)
- for cookie_dict in ctx_cookies:
- ctx.add_cookie(cookie_dict)
-
- _mode = "(深度搜索)" if ctx_cookies else "(广度搜索)"
- logger.debug(
- ToolBox.runtime_report(
- motive="DISCOVERY",
- action_name=self.action_name,
- message=f"📡 正在为玩家搜集免费游戏{_mode}...",
- )
- )
-
- # 获取免费游戏链接
- _start = time.time()
- _url_store_free = self.URL_STORE_FREE
- while True:
- ctx.get(_url_store_free)
- time.sleep(1)
- WebDriverWait(ctx, 10, ignored_exceptions=WebDriverException).until(
- EC.presence_of_element_located(
- (By.XPATH, "//section[@data-testid='section-wrapper']")
- )
- )
-
- # 滑到底部
- action = ActionChains(ctx)
- action.send_keys(Keys.END)
- action.perform()
-
- # 判断异常跳转
- if "tierFree" not in ctx.current_url:
- break
- if time.time() - _start > 80:
- raise DiscoveryTimeoutException("获取免费游戏链接超时")
-
- # 断言最后一页
- WebDriverWait(ctx, 5, ignored_exceptions=WebDriverException).until(
- EC.element_to_be_clickable(
- (By.XPATH, "//a[@data-component='PaginationItem']")
- )
- )
- page_switcher = ctx.find_elements(
- By.XPATH, "//a[@data-component='PaginationItem']"
- )[-1]
-
- # 提取价值信息
- game_objs = ctx.find_elements(By.XPATH, "//a[@class='css-1jx3eyg']")
- for game_obj in game_objs:
- name = game_obj.get_attribute("aria-label")
- url = game_obj.get_attribute("href")
- self.game_objs.update(
- {self.game_objs.__len__(): {"name": name.strip(), "url": url.strip()}}
- )
-
- # 页面跳转判断
- page_end = page_switcher.get_attribute("href")
- if page_end in ctx.current_url:
- break
-
- # 更新跳转链接
- _url_store_free = page_end
-
- logger.success(
- ToolBox.runtime_report(
- motive="DISCOVERY",
- action_name=self.action_name,
- message="免费游戏搜集完毕",
- qsize=len(self.game_objs),
- )
- )
-
- def stress_expressions(self, ctx: Union[ContextManager, Chrome]) -> Dict[str, str]:
- """
- 应力表达式的主要实现
-
- :param ctx: 浏览器驱动上下文
- :return: 不需要 quit()
- """
- logger.debug(
- ToolBox.runtime_report(
- motive="DISCOVERY",
- action_name=self.action_name,
- message="📡 使用应力表达式搜索周免游戏...",
- )
- )
-
- # 访问链接 游戏名称
- pending_games = {}
-
- for _ in range(2):
- try:
- ctx.get(self.URL_STORE_HOME)
- time.sleep(3)
-
- # 定位周免游戏的绝对位置
- WebDriverWait(ctx, 45, ignored_exceptions=WebDriverException).until(
- EC.presence_of_element_located(
- (By.XPATH, "//a[contains(string(),'当前免费')]")
- )
- )
-
- # 周免游戏基本信息
- stress_operator = ctx.find_elements(
- By.XPATH, "//a[contains(string(),'当前免费')]"
- )
- img_seq = ctx.find_elements(
- By.XPATH, "//a[contains(string(),'当前免费')]//img"
- )
-
- # 重组周免游戏信息
- for index, _ in enumerate(stress_operator):
- href = stress_operator[index].get_attribute("href")
- alias = img_seq[index].get_attribute("alt")
- pending_games[href] = alias
-
- break
- except (WebDriverException, AttributeError):
- continue
-
- return pending_games
diff --git a/src/services/explorer/exceptions.py b/src/services/explorer/exceptions.py
deleted file mode 100644
index 8161cb1d19..0000000000
--- a/src/services/explorer/exceptions.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from typing import Optional, Sequence
-
-
-class Explorer(Exception):
- def __init__(
- self, msg: Optional[str] = None, stacktrace: Optional[Sequence[str]] = None
- ):
- self.msg = msg
- self.stacktrace = stacktrace
- super().__init__()
-
- def __str__(self) -> str:
- exception_msg = "Message: {}\n".format(self.msg)
- if self.stacktrace:
- stacktrace = "\n".join(self.stacktrace)
- exception_msg += "Stacktrace:\n{}".format(stacktrace)
- return exception_msg
-
-
-class DiscoveryTimeoutException(Explorer):
- """未能在规定时间内为指定玩家搜索免费游戏"""
diff --git a/src/services/explorer/explorer.py b/src/services/explorer/explorer.py
deleted file mode 100644
index 051481d04a..0000000000
--- a/src/services/explorer/explorer.py
+++ /dev/null
@@ -1,280 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/17 15:20
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import csv
-import json.decoder
-from typing import List, Optional, Union, Dict, Any
-
-import cloudscraper
-from lxml import etree
-
-from services.settings import logger
-from services.utils import ToolBox, get_ctx
-from .core import AwesomeFreeGirl
-from .exceptions import DiscoveryTimeoutException
-
-
-class GameLibManager(AwesomeFreeGirl):
- """游戏对象管理 缓存商城数据以及判断游戏在库状态"""
-
- def __init__(self):
- super().__init__()
-
- self.action_name = "GameLibManager"
-
- def save_game_objs(self, game_objs: List[Dict[str, str]]) -> None:
- """缓存免费商城数据"""
- if not game_objs:
- return
-
- with open(self.path_free_games, "w", encoding="utf8", newline="") as file:
- writer = csv.writer(file)
- writer.writerow(["name", "url"])
- for game_obj in game_objs:
- cell = (game_obj["name"], game_obj["url"])
- writer.writerow(cell)
-
- logger.success(
- ToolBox.runtime_report(
- motive="SAVE",
- action_name=self.action_name,
- message="Cache free game information.",
- )
- )
-
- def load_game_objs(self, only_url: bool = True) -> Optional[List[str]]:
- """
- 加载缓存在本地的免费游戏对象
-
- :param only_url:
- :return:
- """
- try:
- with open(self.path_free_games, "r", encoding="utf8") as file:
- data = list(csv.reader(file))
- except FileNotFoundError:
- return []
- else:
- if not data:
- return []
- if only_url:
- return [i[-1] for i in data[1:]]
- return data[1:]
-
- def is_my_game(
- self, ctx_cookies: Union[List[dict], str], page_link: str
- ) -> Optional[dict]:
- """
- 判断游戏在库状态
-
- :param ctx_cookies:
- :param page_link:
- :return:
- None 异常状态
- True 跳过任务
- False 继续任务
- """
- headers = {
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62",
- "cookie": ctx_cookies
- if isinstance(ctx_cookies, str)
- else ToolBox.transfer_cookies(ctx_cookies),
- }
- scraper = cloudscraper.create_scraper()
- response = scraper.get(page_link, headers=headers)
- tree = etree.HTML(response.content)
- assert_obj = tree.xpath(
- "//span[@data-component='PurchaseCTA']//span[@data-component='Message']"
- )
-
- # 🚧 异常状态
- if not assert_obj:
- logger.debug(
- ToolBox.runtime_report(
- motive="IGNORE",
- action_name=self.action_name,
- message="忽略尚未发布的游戏对象",
- url=page_link,
- )
- )
- return {"assert": "AssertObjectNotFound", "status": None}
-
- assert_message = assert_obj[0].text
- response_obj = {"assert": assert_message, "warning": "", "status": None}
-
- # 🚧 跳过 `无法认领` 的日志信息
- if assert_message in ["已在游戏库中", "立即购买", "即将推出"]:
- response_obj["status"] = True
- # 🚧 惰性加载,前置节点不处理动态加载元素
- elif assert_message in ["正在载入"]:
- response_obj["status"] = False
- # 🍟 未领取的免费游戏
- elif assert_message in ["获取"]:
- warning_obj = tree.xpath("//h1[@class='css-1gty6cv']//span")
- # 出现遮挡警告
- if warning_obj:
- warning_message = warning_obj[0].text
- response_obj["warning"] = warning_message
- # 成人内容可获取
- if "成人内容" in warning_message:
- response_obj["status"] = False
- else:
- logger.warning(
- ToolBox.runtime_report(
- motive="SKIP",
- action_name=self.action_name,
- message=warning_message,
- url=page_link,
- )
- )
- response_obj["status"] = None
- # 继续任务
- else:
- response_obj["status"] = False
-
- return response_obj
-
-
-class Explorer(AwesomeFreeGirl):
- """商城探索者 发现常驻免费游戏以及周免游戏"""
-
- def __init__(self, silence: Optional[bool] = None):
- super().__init__(silence=silence)
-
- self.action_name = "Explorer"
-
- self.game_manager = GameLibManager()
-
- def discovery_free_games(
- self, ctx_cookies: Optional[List[dict]] = None, cover: bool = True
- ) -> Optional[List[str]]:
- """
- 发现免费游戏。
-
- 本周免费 + 常驻免费
- ________________________________________________________
- 1. 此接口可以不传 cookie,免费游戏是公开可见的。
- 2. 但如果要查看免费游戏的在库状态,需要传 COOKIE 区分用户。
- - 有些游戏不同地区的玩家不一定都能玩。这个限制和账户地区信息有关,和当前访问的(代理)IP 无关。
- - 请确保传入的 COOKIE 是有效的。
- :param cover:
- :param ctx_cookies: ToolBox.transfer_cookies(api.get_cookies())
- :return:
- """
- # 创建驱动上下文
- with get_ctx(silence=self.silence) as ctx:
- try:
- self._discovery_free_games(ctx=ctx, ctx_cookies=ctx_cookies)
- except DiscoveryTimeoutException:
- return self.discovery_free_games(ctx_cookies=None, cover=cover)
-
- # 提取游戏平台对象
- game_objs = self.game_objs.values()
-
- # 运行缓存持久化
- if cover:
- self.game_manager.save_game_objs(game_objs)
-
- # 返回链接
- return [game_obj.get("url") for game_obj in game_objs]
-
- def get_the_limited_free_game(
- self, ctx_cookies: Optional[List[dict]] = None
- ) -> Dict[str, Any]:
- """
- 获取周免游戏
-
- :param ctx_cookies:
- :return:
- """
-
- def _update_limited_free_game_objs(element_: dict):
- free_game_objs[url] = element_["title"]
- free_game_objs["urls"].append(url)
-
- free_game_objs = {"urls": []}
-
- scraper = cloudscraper.create_scraper()
- response = scraper.get(self.URL_PROMOTIONS)
-
- try:
- data = response.json()
- except json.decoder.JSONDecodeError:
- pass
- else:
- elements = data["data"]["Catalog"]["searchStore"]["elements"]
- for element in elements:
- promotions = element.get("promotions")
-
- # 剔除掉过期的折扣实体
- if not promotions:
- continue
-
- # 提取商品页slug
- url = self.URL_PRODUCT_PAGE + element["urlSlug"]
-
- # 健壮工程,预判数据类型的变更
- if not ctx_cookies:
- # 获取实体的促销折扣值 discount_percentage
- discount_setting = promotions["promotionalOffers"][0][
- "promotionalOffers"
- ][0]["discountSetting"]
- discount_percentage = discount_setting["discountPercentage"]
- if (
- not isinstance(discount_percentage, str)
- and not discount_percentage
- ) or (
- isinstance(discount_percentage, str)
- and not float(discount_percentage)
- ):
- _update_limited_free_game_objs(element)
- else:
- response = self.game_manager.is_my_game(
- ctx_cookies=ctx_cookies, page_link=url
- )
- if (
- not response["status"]
- and response["assert"] != "AssertObjectNotFound"
- ):
- _update_limited_free_game_objs(element)
-
- return free_game_objs
-
- def get_the_absolute_free_game(
- self, ctx_cookies: Optional[List[dict]], _ctx_session=None
- ) -> Dict[str, Any]:
- """使用应力表达式萃取商品链接"""
-
- free_game_objs = {"urls": []}
-
- # 使用应力表达式萃取商品链接
- if _ctx_session:
- critical_memory = _ctx_session.current_window_handle
- try:
- _ctx_session.switch_to.new_window("tab")
- pending_games: Dict[str, str] = self.stress_expressions(ctx=_ctx_session)
- finally:
- _ctx_session.switch_to.window(critical_memory)
- else:
- with get_ctx(silence=self.silence) as ctx:
- pending_games: Dict[str, str] = self.stress_expressions(ctx=ctx)
-
- # 中断空对象的工作流
- if not pending_games:
- return free_game_objs
-
- # 任务批处理
- for url, title in pending_games.items():
- # 带入身份令牌判断周免游戏的在库状态
- response = self.game_manager.is_my_game(
- ctx_cookies=ctx_cookies, page_link=url
- )
- if not response["status"] and response["assert"] != "AssertObjectNotFound":
- # 将待认领的周免游戏送入任务队列
- free_game_objs[url] = title
- free_game_objs["urls"].append(url)
-
- return free_game_objs
diff --git a/src/services/scaffold.py b/src/services/scaffold.py
deleted file mode 100644
index 0b11b51f85..0000000000
--- a/src/services/scaffold.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from typing import Optional
-
-from apis.scaffold import get, challenge, install, claimer
-
-
-class Scaffold:
- """系统脚手架 顶级接口指令"""
-
- @staticmethod
- def install(onnx_prefix: Optional[str] = None):
- """下载运行依赖"""
- install.run(onnx_prefix=onnx_prefix)
-
- @staticmethod
- def test():
- """检查挑战者驱动版本是否适配"""
- install.test()
-
- @staticmethod
- def challenge(silence: Optional[bool] = True):
- """
- 正面硬刚人机挑战,为当前账号获取有效的身份令牌。
-
- ## Intro
-
- - 请确保你已在 `config.yaml` 中配置了正确的账号信息。
- - 更新后的 cookie 存储在 `/src/database/cookies/user_cookies.txt` 文件中
-
- ## Tips
-
- - 本指令并不会强制激活人机验证。硬刚人机挑战不是目的,获取到有效的身份令牌才是目的,不要徒增功耗。
- - 也即,如果当前缓存的身份令牌还未失效,挑战跳过。
-
- :param silence: (Default: True) IF False: 将在图形化系统中显式启动浏览器,演示人机挑战的执行过程。
- :return:
- """
- challenge.run(silence=silence)
-
- @staticmethod
- def get(debug: Optional[bool] = None, cache: Optional[bool] = True):
- """
- 「我可以不玩但不能没有。」—— 鲁·克莱摩·迅
-
- ## Intro
-
- - `get` 只做一件事,搬空免费商店!
-
- - 这是个趣味性和观赏性都拉满的一次性任务。系统会根据你的设备性能发起最高 4 协程并发的驱动任务,为你节省扫荡时间。
-
- - 显然地,这是一项对操作系统内存和网络I/O要求都不低的任务,如果你嫌这五六十款(不同地区权限不同)
- 多余的常驻免费游戏会影响你翻找游戏库的效率,请速速退朝。
-
- - `get` 指令启动标准上下文执行任务,其并不足以应付隐藏在订单中的人机挑战。因此,`get` 指令会自动跳过未认领的周免游戏。
- 请使用生产效率更高的 `claim` 指令认领周免游戏。
-
- ## Local Static CacheFile
-
- 此指令会将免费商城数据存储在 `src/database/explorer` 目录下。存储内容与当前上下文身份令牌有关(不同地区权限不同)
-
- ## Contributes
-
- - 若运行出现意料之外的报错,请运行 debug 模式,留意 Exception 信息,并将完整的栈追踪信息提交至 `issues` ,不胜感激!
- - https://github.com/QIN2DIM/epic-awesome-gamer
-
- :param cache: 使用商城缓存数据
- :param debug: 显示栈追踪日志信息
- :return:
- """
- get.join(trace=debug, cache=cache)
-
- @staticmethod
- def claim(silence: Optional[bool] = True):
- """
- 认领周免游戏。
-
- ## Intro
-
- `claim` 做的事非常简单,获取本周促销数据,分析是否有待认领的周免游戏,根据分析结果执行相关任务。
-
- `claim` 是系统级指令 `deploy` 的单步子任务,在上述业务结束后,会根据你配置的 `pusher` 推送追踪日志(若配置无效则不发)。
-
- :return:
- """
- claimer.run(silence=silence)
-
- @staticmethod
- def deploy(platform: Optional[str] = None):
- """
- 部署系统定时任务。
-
- :param platform: 可选项 [vps serverless qing-long]
- :return:
- """
- claimer.deploy(platform)
diff --git a/src/services/settings.py b/src/services/settings.py
deleted file mode 100644
index ef24792e57..0000000000
--- a/src/services/settings.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import os
-import sys
-from os.path import join, dirname
-from typing import Dict, Any, Optional
-
-from services.utils import ToolBox
-
-__all__ = [
- # ------------------------------
- # SETTINGS
- # ------------------------------
- "logger",
- "DIR_CHALLENGE",
- "DIR_COOKIES",
- "DIR_TEMP_CACHE",
- "DIR_EXPLORER",
- "PATH_USR_COOKIES",
- "DIR_MODEL",
- "PATH_RAINBOW",
- # ------------------------------
- # CONFIG
- # ------------------------------
- "EPIC_PASSWORD",
- "EPIC_EMAIL",
- "MESSAGE_PUSHER_SETTINGS",
-]
-__version__ = "0.3.3.dev"
-
-"""
-================================================ ʕ•ﻌ•ʔ ================================================
- (·▽·)欢迎嫖友入座
-================================================ ʕ•ﻌ•ʔ ================================================
-[√]核心配置 [※]边缘参数
-"""
-# ---------------------------------------------------
-# [√]工程根目录定位
-# ---------------------------------------------------
-# 系统根目录
-PROJECT_ROOT = dirname(dirname(__file__))
-# 文件数据库目录
-PROJECT_DATABASE = join(PROJECT_ROOT, "database")
-# YOLO模型
-DIR_MODEL = join(PROJECT_ROOT, "model")
-# Reinforcement of memory
-PATH_RAINBOW = join(DIR_MODEL, "rainbow.yaml")
-# Cookie 工作目录
-DIR_COOKIES = join(PROJECT_DATABASE, "cookies")
-PATH_USR_COOKIES = join(DIR_COOKIES, "user_cookies.txt")
-# FreeGame Mining Workspace
-DIR_EXPLORER = join(PROJECT_DATABASE, "explorer")
-# 运行缓存目录
-DIR_TEMP_CACHE = join(PROJECT_DATABASE, "temp_cache")
-# 挑战缓存
-DIR_CHALLENGE = join(DIR_TEMP_CACHE, "_challenge")
-# 服务日志目录
-DIR_LOG = join(PROJECT_DATABASE, "logs")
-# ---------------------------------------------------
-# [√]服务器日志配置
-# ---------------------------------------------------
-logger = ToolBox.init_log(
- error=join(DIR_LOG, "error.log"), runtime=join(DIR_LOG, "runtime.log")
-)
-
-# ---------------------------------------------------
-# 路径补全
-# ---------------------------------------------------
-for _pending in [
- PROJECT_DATABASE,
- DIR_MODEL,
- DIR_EXPLORER,
- DIR_COOKIES,
- DIR_TEMP_CACHE,
- DIR_CHALLENGE,
- DIR_LOG,
-]:
- if not os.path.exists(_pending):
- os.mkdir(_pending)
-"""
-================================================== ʕ•ﻌ•ʔ ==================================================
- 若您并非项目开发者 请勿修改以下变量的默认参数
-================================================== ʕ•ﻌ•ʔ ==================================================
-
- Enjoy it -> ♂ main.py
-"""
-config_ = ToolBox.check_sample_yaml(
- path_output=join(dirname(dirname(__file__)), "config.yaml"),
- path_sample=join(dirname(dirname(__file__)), "config-sample.yaml"),
-)
-# --------------------------------
-# [√] 账号信息
-# --------------------------------
-# 不建议在公有库上创建工作流运行项目,有仓库禁用风险
-EPIC_EMAIL: Optional[str] = config_.get("EPΙC_EMAΙL", "")
-EPIC_PASSWORD: Optional[str] = config_.get("EPΙC_PASSWΟRD", "")
-
-# --------------------------------
-# [※] 消息推送配置
-# --------------------------------
-MESSAGE_PUSHER_SETTINGS: Optional[Dict[str, Any]] = config_.get(
- "message_pusher_settings", {}
-)
-PUSHER: Optional[Dict[str, Optional[str]]] = MESSAGE_PUSHER_SETTINGS.get("pusher", {})
-
-# --------------------------------
-# [※] 补全语法模板
-# --------------------------------
-if not EPIC_EMAIL:
- EPIC_EMAIL = os.getenv("EPΙC_EMAΙL", "")
-if not EPIC_PASSWORD:
- EPIC_PASSWORD = os.getenv("EPΙC_PASSWΟRD", "")
-
-try:
- for server in PUSHER:
- if not PUSHER[server]:
- PUSHER[server] = os.getenv(server, "")
-except KeyError as e:
- print(f"[进程退出] 核心配置文件被篡改 error={e}")
- sys.exit()
-# --------------------------------
-# [√] 阻止缺省配置
-# --------------------------------
-if not all((EPIC_EMAIL, EPIC_PASSWORD)):
- print("[进程退出] 账号信息未配置或相关变量不合法")
- sys.exit()
-
-if not any(PUSHER.values()):
- MESSAGE_PUSHER_SETTINGS["enable"] = False
diff --git a/src/services/utils/__init__.py b/src/services/utils/__init__.py
deleted file mode 100644
index b19728c0d6..0000000000
--- a/src/services/utils/__init__.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 0:25
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-from .accelerator.core import CoroutineSpeedup, AshFramework
-from .armor.anti_hcaptcha.core import ArmorCaptcha
-from .armor.anti_hcaptcha.exceptions import (
- LabelNotFoundException,
- ChallengeReset,
- ChallengeTimeout,
-)
-from .armor.anti_hcaptcha.solutions.sk_recognition import (
- RiverChallenger,
- DetectionChallenger,
-)
-from .armor.anti_hcaptcha.solutions.yolo import YOLO
-from .armor.anti_hcaptcha.solutions.sk_recognition import SKRecognition
-from .toolbox.toolbox import ToolBox, get_ctx, get_challenge_ctx
-
-__all__ = [
- "ToolBox",
- "ArmorCaptcha",
- "LabelNotFoundException",
- "CoroutineSpeedup",
- "AshFramework",
- "get_ctx",
- "get_challenge_ctx",
- "ChallengeReset",
- "ChallengeTimeout",
- "YOLO",
- "SKRecognition",
- "RiverChallenger",
- "DetectionChallenger",
-]
diff --git a/src/services/utils/accelerator/__init__.py b/src/services/utils/accelerator/__init__.py
deleted file mode 100644
index 6a36507c1c..0000000000
--- a/src/services/utils/accelerator/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2021/12/25 17:05
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
diff --git a/src/services/utils/accelerator/core.py b/src/services/utils/accelerator/core.py
deleted file mode 100644
index a85f94f273..0000000000
--- a/src/services/utils/accelerator/core.py
+++ /dev/null
@@ -1,201 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2021/12/22 9:05
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import asyncio
-import os
-from typing import Optional, List, Union
-
-import aiohttp
-import gevent
-from gevent import queue
-
-
-class CoroutineSpeedup:
- """轻量化的协程控件"""
-
- def __init__(self, docker: Optional[List] = None, power: Optional[int] = None):
- # 任务容器:queue
- self.worker, self.done = queue.Queue(), queue.Queue()
-
- # 任务容器
- self.docker = docker
-
- # 协程数
- self.power = max(os.cpu_count(), 2) if power is None else power
-
- # 任务队列满载时刻长度
- self.max_queue_size = 0
-
- def progress(self) -> str:
- """
- 任务进度
-
- :return:
- """
- _progress = self.max_queue_size - self.worker.qsize()
- return (
- "__pending__"
- if _progress < self.power
- else f"{_progress}/{self.max_queue_size}"
- )
-
- def launcher(self, *args, **kwargs):
- """
- 适配器实例生产
-
- :return:
- """
- while not self.worker.empty():
- task = self.worker.get_nowait()
- self.control_driver(task, *args, **kwargs)
-
- def control_driver(self, task, *args, **kwargs):
- """
- 默认逻辑
-
- :param task:
- :return:
- """
- raise NotImplementedError
-
- def preload(self):
- """
- 数据预处理
-
- :return:
- """
-
- def overload(self):
- """
- 任务重载
-
- :return:
- """
- if self.docker:
- for task in self.docker:
- self.worker.put_nowait(task)
- self.max_queue_size = self.worker.qsize()
-
- def offload(self) -> list:
- """
- 缓存卸载
-
- :return:
- """
- docker = []
- while not self.done.empty():
- docker.append(self.done.get())
- return docker
-
- def killer(self):
- """
- 缓存回收
-
- :return:
- """
-
- def speedup(self, *args, **kwargs):
- """
- 框架接口
-
- :return:
- """
-
- # 任务重载
- self.overload()
-
- # 弹出空载任务
- if self.max_queue_size == 0:
- return
-
- # 粘性功率
- power = kwargs.get("power")
- self.power = self.power if power is None else power
- self.power = (
- self.max_queue_size if self.power > self.max_queue_size else self.power
- )
-
- # 任务启动
- task_list = []
- for _ in range(self.power):
- task = gevent.spawn(self.launcher, *args, **kwargs)
- task_list.append(task)
- try:
- gevent.joinall(task_list)
- finally:
- self.killer()
-
-
-class AshFramework:
- """轻量化的协程控件"""
-
- def __init__(self, docker: Optional[List] = None):
- # 任务容器:queue
- self.worker, self.done = asyncio.Queue(), asyncio.Queue()
- # 任务容器
- self.docker = docker
- # 任务队列满载时刻长度
- self.max_queue_size = 0
-
- def progress(self) -> str:
- """任务进度"""
- _progress = self.max_queue_size - self.worker.qsize()
- return f"{_progress}/{self.max_queue_size}"
-
- def preload(self):
- """预处理"""
-
- def overload(self):
- """任务重载"""
- if self.docker:
- for task in self.docker:
- self.worker.put_nowait(task)
- self.max_queue_size = self.worker.qsize()
-
- def offload(self) -> Optional[List]:
- """缓存卸载"""
- crash = []
- while not self.done.empty():
- crash.append(self.done.get())
- return crash
-
- async def control_driver(self, context, session=None):
- """需要并发执行的代码片段"""
- raise NotImplementedError
-
- async def launcher(self, session=None):
- """适配接口模式"""
- while not self.worker.empty():
- context = self.worker.get_nowait()
- await self.control_driver(context, session=session)
-
- async def subvert(self, workers: Union[str, int]):
- """
- 框架接口
-
- loop = asyncio.get_event_loop()
- loop.run_until_complete(fl.go(workers))
-
- :param workers: ["fast", power]
- :return:
- """
- # 任务重载
- self.overload()
-
- # 弹出空载任务
- if self.max_queue_size == 0:
- return
-
- # 粘性功率
- workers = self.max_queue_size if workers in ["fast"] else workers
- workers = workers if workers <= self.max_queue_size else self.max_queue_size
-
- # 弹性分发
- task_list = []
- async with aiohttp.ClientSession() as session:
- for _ in range(workers):
- task = self.launcher(session=session)
- task_list.append(task)
- await asyncio.wait(task_list)
diff --git a/src/services/utils/armor/__init__.py b/src/services/utils/armor/__init__.py
deleted file mode 100644
index d62eaa4e56..0000000000
--- a/src/services/utils/armor/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# -*- coding: utf-8 -*-
-# Time : 2022/1/16 2:30
-# Author : QIN2DIM
-# Github : https://github.com/QIN2DIM
-# Description:
-import os
-import sys
-import random
-import requests
-
-if os.getenv("RUNNER_TOOL_CACHE"):
- _uxo = f"https://github.com/{os.getenv('GITHUB_REPOSITORY', '')}"
- try:
- if requests.get(_uxo).status_code != 404:
- raise requests.RequestException
- except requests.RequestException:
- print(
- "[Warning] 禁止在 fork 分支上运行工作流,请创建私有工作流。\n"
- "详见 https://blog.echosec.top/epic-awesome-gamer/zh/docs/player/hosting-deployment/privatisation/"
- )
- if random.uniform(0, 1) > 0.15:
- sys.exit()
diff --git a/src/services/utils/armor/anti_hcaptcha/__init__.py b/src/services/utils/armor/anti_hcaptcha/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/src/services/utils/armor/anti_hcaptcha/core.py b/src/services/utils/armor/anti_hcaptcha/core.py
deleted file mode 100644
index 7b2edf8466..0000000000
--- a/src/services/utils/armor/anti_hcaptcha/core.py
+++ /dev/null
@@ -1,253 +0,0 @@
-import os
-import re
-import time
-
-import requests
-from loguru import logger
-from selenium.common.exceptions import (
- ElementNotVisibleException,
- ElementClickInterceptedException,
- WebDriverException,
- TimeoutException,
-)
-from selenium.webdriver.common.by import By
-from selenium.webdriver.support import expected_conditions as EC
-from selenium.webdriver.support.wait import WebDriverWait
-from undetected_chromedriver import Chrome
-
-from .exceptions import LabelNotFoundException, ChallengeReset, ChallengeTimeout
-
-
-class ArmorCaptcha:
- """hCAPTCHA challenge 驱动控制"""
-
- def __init__(self, dir_workspace: str = None, debug=False):
-
- self.action_name = "ArmorCaptcha"
- self.debug = debug
-
- # 存储挑战图片的目录
- self.runtime_workspace = ""
-
- # 博大精深!
- self.label_alias = {
- "自行车": "bicycle",
- "火车": "train",
- "卡车": "truck",
- "公交车": "bus",
- "巴土": "bus",
- "巴士": "bus",
- "飞机": "aeroplane",
- "ー条船": "boat",
- "船": "boat",
- "汽车": "car",
- "摩托车": "motorbike",
- "垂直河流": "vertical river",
- "天空中向左飞行的飞机": "airplane in the sky flying left",
- }
-
- # 样本标签映射 {挑战图片1: locator1, ...}
- self.alias2locator = {}
- # 填充下载链接映射 {挑战图片1: url1, ...}
- self.alias2url = {}
- # 填充挑战图片的缓存地址 {挑战图片1: "/images/挑战图片1.png", ...}
- self.alias2path = {}
- # 存储模型分类结果 {挑战图片1: bool, ...}
- self.alias2answer = {}
- # 图像标签
- self.label = ""
- # 运行缓存
- self.dir_workspace = dir_workspace if dir_workspace else "."
-
- self._headers = {
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62"
- }
-
- def log(self, message: str, **params) -> None:
- """格式化日志信息"""
- if not self.debug:
- return
-
- motive = "Challenge"
- flag_ = f">> {motive} [{self.action_name}] {message}"
- if params:
- flag_ += " - "
- flag_ += " ".join([f"{i[0]}={i[1]}" for i in params.items()])
- logger.debug(flag_)
-
- def _init_workspace(self):
- """初始化工作目录,存放缓存的挑战图片"""
- _prefix = f"{int(time.time())}" + f"_{self.label}" if self.label else ""
- _workspace = os.path.join(self.dir_workspace, _prefix)
- if not os.path.exists(_workspace):
- os.mkdir(_workspace)
- return _workspace
-
- def tactical_retreat(self) -> bool:
- """模型存在泛化死角,遇到指定标签时主动进入下一轮挑战,节约时间"""
- if self.label in ["水上飞机"] or not self.label_alias.get(self.label):
- self.log(message="模型泛化较差,逃逸", label=self.label)
- return True
- return False
-
- def mark_samples(self, ctx: Chrome):
- """
- 获取每个挑战图片的下载链接以及网页元素位置
-
- :param ctx:
- :return:
- """
- self.log(message="获取挑战图片链接及元素定位器")
-
- # 等待图片加载完成
- WebDriverWait(ctx, 10, ignored_exceptions=ElementNotVisibleException).until(
- EC.presence_of_all_elements_located((By.XPATH, "//div[@class='task-image']"))
- )
- time.sleep(1)
-
- # DOM 定位元素
- samples = ctx.find_elements(By.XPATH, "//div[@class='task-image']")
- for sample in samples:
- alias = sample.get_attribute("aria-label")
- while True:
- try:
- image_style = sample.find_element(
- By.CLASS_NAME, "image"
- ).get_attribute("style")
- url = re.split(r'[(")]', image_style)[2]
- self.alias2url.update({alias: url})
- break
- except IndexError:
- continue
- self.alias2locator.update({alias: sample})
-
- def get_label(self, ctx: Chrome):
- """
- 获取人机挑战需要识别的图片类型(标签)
-
- :param ctx:
- :return:
- """
- try:
- label_obj = WebDriverWait(
- ctx, 30, ignored_exceptions=ElementNotVisibleException
- ).until(
- EC.presence_of_element_located((By.XPATH, "//div[@class='prompt-text']"))
- )
- except TimeoutException:
- raise ChallengeReset("人机挑战意外通过")
- try:
- _label = re.split(r"[包含 图片]", label_obj.text)[2][:-1]
- except (AttributeError, IndexError):
- raise LabelNotFoundException("获取到异常的标签对象。")
- else:
- self.label = _label
- self.log(
- message="获取挑战标签",
- label=f"{self.label}({self.label_alias.get(self.label, 'none')})",
- )
-
- def download_images(self):
- """
- 下载挑战图片
-
- ### hcaptcha 设有挑战时长的限制
-
- 如果一段时间内没有操作页面元素,