diff --git a/managers/scanner_manager.py b/managers/scanner_manager.py index 08cfe53..a3f5838 100644 --- a/managers/scanner_manager.py +++ b/managers/scanner_manager.py @@ -100,6 +100,7 @@ def scan_url(self, url, pbar=None): if is_spring is False: if pbar: pbar.update(1) + pbar.refresh() return # 进行路径检测 @@ -111,7 +112,9 @@ def scan_url(self, url, pbar=None): self.reporter.generate(url, is_spring, detected_paths, found_cves) if pbar: pbar.update(1) + pbar.refresh() except Exception as e: logger.error(f"Error processing URL: {e}", extra={"target": url}) if pbar: - pbar.update(1) \ No newline at end of file + pbar.update(1) + pbar.refresh() \ No newline at end of file diff --git a/scanners/path_detector.py b/scanners/path_detector.py index 6ee589b..b72d086 100644 --- a/scanners/path_detector.py +++ b/scanners/path_detector.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ File Name: path_detector.py - Description : 路径检测模块,提高了路径探测速度和效率 + Description : 优化路径检测模块,提高路径探测速度和效率,增加 SSL 错误处理与重试机制 Author : sule01u date: 2023/10/8 """ @@ -10,24 +10,45 @@ import time import requests from urllib.parse import urljoin -from concurrent.futures import as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed from utils.custom_headers import TIMEOUT, DEFAULT_HEADER +from colorama import Fore from utils.logging_config import configure_logger -from utils.global_thread_pool import GlobalThreadPool # 引入全局线程池 from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import threading +import ssl +from urllib3.exceptions import InsecureRequestWarning +import warnings + +# 禁用urllib3中的不安全请求警告 +warnings.simplefilter('ignore', InsecureRequestWarning) # 初始化日志记录 logger = configure_logger(__name__) +class SSLAdapter(HTTPAdapter): + """自定义 SSL 适配器,指定 SSL/TLS 版本""" + def __init__(self, ssl_version=None, **kwargs): + self.ssl_version = ssl_version + super().__init__(**kwargs) + + def init_poolmanager(self, *args, **kwargs): + kwargs['ssl_version'] = self.ssl_version + super().init_poolmanager(*args, **kwargs) + + def proxy_manager_for(self, *args, **kwargs): + kwargs['ssl_version'] = self.ssl_version + return super().proxy_manager_for(*args, **kwargs) + class PathDetector: """路径探测类""" MAX_FAILED_COUNT = 80 - MAX_SUCCESS_COUNT = 80 + MAX_SUCCESS_COUNT = 50 CHUNK_SIZE = 1024 SSE_MAX_SIZE = 5120 # 5KB MAX_RESPONSE_LENGTH = 102400 # 100KB + PATH_THREAD_COUNT = 3 # 使用独立的3个线程池进行路径探测 def __init__(self, paths, proxy_manager): self.paths = paths @@ -39,9 +60,10 @@ def detect(self, url): path_failed_count = 0 path_success_count = 0 detected_paths = [] - try: - # 使用全局线程池进行并发探测 - futures = {GlobalThreadPool.submit_task(self._detect_path, url, path, signature): path for path, signature in self.paths.items()} + + # 使用独立的线程池进行路径探测,并指定最大线程数为3 + with ThreadPoolExecutor(max_workers=self.PATH_THREAD_COUNT) as executor: + futures = {executor.submit(self._detect_path, url, path, signature): path for path, signature in self.paths.items()} for future in as_completed(futures): path = futures[future] @@ -50,7 +72,6 @@ def detect(self, url): if result: detected_paths.append(result) path_success_count += 1 - logger.info(f"[Success] Detected sensitive path: {result}", extra={"target": result}) if path_success_count > self.MAX_SUCCESS_COUNT: logger.info(f"Exceeded maximum success count of {self.MAX_SUCCESS_COUNT}, stopping path detection for {url}") @@ -64,14 +85,6 @@ def detect(self, url): logger.info(f"Exceeded maximum failed count of {self.MAX_FAILED_COUNT}, stopping path detection for {url}") break time.sleep(0.05) # 防止过快请求导致目标被封禁 - except KeyboardInterrupt: - logger.warning("User interrupted the path detection process with Ctrl + C") - # 取消所有未完成的任务 - for future in futures: - future.cancel() - logger.info("All pending tasks were cancelled successfully.") - finally: - logger.info(f"Path detection process for {url} finished.") return detected_paths @@ -88,7 +101,6 @@ def _make_request(self, url): session = self._get_session() # 获取线程本地的 Session 对象 try: with session.get(url, stream=True, allow_redirects=False) as res: - logger.debug(f"[{res.status_code}] [Content-Length: {res.headers.get('Content-Length', 0)}]", extra={"target": url}) if "text/event-stream" in res.headers.get("Content-Type", ""): # SSE 流式传输处理 content = b"" @@ -98,14 +110,40 @@ def _make_request(self, url): break return content.decode("utf-8", errors="ignore") elif res.status_code == 200: + # ANSI 控制字符实现闪动效果 + blinking_effect = "\033[5m" + # 修改logger.info调用,输出红色闪动的成功消息 + logger.info(f"{blinking_effect}{Fore.RED} [{res.status_code}] [Content-Length: {res.headers.get('Content-Length', 0)}] {Fore.CYAN}<-- [Success] {Fore.RESET}", extra={"target": url}) # 返回前 MAX_RESPONSE_LENGTH 的内容 return res.text[:self.MAX_RESPONSE_LENGTH] + logger.info(f"[{res.status_code}] [Content-Length: {res.headers.get('Content-Length', 0)}]", extra={"target": url}) + except requests.exceptions.SSLError as ssl_error: + logger.error(f"SSL error occurred for {url}: {ssl_error}", extra={"target": url}) + return self._retry_with_different_ssl_version(session, url) # 使用不同的 SSL/TLS 版本重新连接 except requests.RequestException as e: logger.debug(f"Request error: {e}", extra={"target": url}) except Exception as e: logger.error(f"An unexpected error occurred during path detection: {e}", extra={"target": url}) return None + def _retry_with_different_ssl_version(self, session, url): + """尝试使用不同的 SSL/TLS 版本重新发起请求""" + ssl_versions = [ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2] + for version in ssl_versions: + try: + # 使用不同的 SSL 版本进行重试 + ssl_adapter = SSLAdapter(ssl_version=version) + session.mount('https://', ssl_adapter) + with session.get(url, stream=True, allow_redirects=False) as res: + if res.status_code == 200: + logger.info(f"Successfully connected using SSL version: {version}", extra={"target": url}) + return res.text[:self.MAX_RESPONSE_LENGTH] + except requests.exceptions.SSLError as ssl_error: + logger.warning(f"Retry with SSL version {version} failed: {ssl_error}", extra={"target": url}) + except Exception as e: + logger.error(f"An unexpected error occurred during SSL retry: {e}", extra={"target": url}) + return None + def _get_session(self): """获取线程本地的 Session 对象,如果不存在则创建""" if not hasattr(self.thread_local, 'session'): @@ -116,14 +154,17 @@ def _get_session(self): session.timeout = TIMEOUT session.max_redirects = 3 + # 配置自定义 SSL 适配器 + ssl_adapter = SSLAdapter(ssl_version=ssl.PROTOCOL_TLSv1_2) # 默认使用 TLSv1.2 + session.mount('https://', ssl_adapter) + # 配置 HTTPAdapter,启用 keep-alive 和连接池 adapter = HTTPAdapter( pool_connections=200, pool_maxsize=200, - max_retries=Retry(total=3, backoff_factor=0.3) + max_retries=Retry(total=3, backoff_factor=0.3) # 启用重试机制 ) session.mount('http://', adapter) - session.mount('https://', adapter) self.thread_local.session = session return self.thread_local.session @@ -143,11 +184,8 @@ def close_sessions(detector_instance): if __name__ == '__main__': # 测试用例 from managers.proxy_manager import ProxyManager - - # 初始化全局线程池 - GlobalThreadPool.initialize(max_workers=50) # 新增:初始化全局线程池 - proxy_manager = ProxyManager() paths = {"actuator": "_links", "actuator/beans": "beans"} path_d = PathDetector(paths, proxy_manager) - print(path_d.detect("http://192.168.1.13:8080/")) \ No newline at end of file + print(path_d.detect("http://192.168.1.13:8080/")) + print(path_d.detect("http://192.168.1.13:8083/")) \ No newline at end of file