diff --git a/fenjing/const.py b/fenjing/const.py index a248ead..80b2b60 100644 --- a/fenjing/const.py +++ b/fenjing/const.py @@ -82,30 +82,36 @@ DANGEROUS_KEYWORDS = [ '"', + "%", "'", "+", ".", "0", + '0"', "1", "2", "=", "[", "_", - "%", "attr", + "base", "builtins", "chr", "class", "config", "eval", + "exec", "global", + "import", "include", + "init", "lipsum", "mro", "namespace", "open", "pop", "popen", + "posix", "read", "request", "self", diff --git a/fenjing/waf_func_gen.py b/fenjing/waf_func_gen.py index ca57deb..c2fd3c0 100644 --- a/fenjing/waf_func_gen.py +++ b/fenjing/waf_func_gen.py @@ -4,11 +4,12 @@ import logging import random import string +import traceback from copy import copy from collections import Counter, namedtuple from functools import lru_cache -from typing import Dict, Callable, Union, List +from typing import Dict, Callable, Tuple, Union, List from .const import DETECT_MODE_ACCURATE, DETECT_MODE_FAST, DANGEROUS_KEYWORDS from .colorize import colored @@ -28,7 +29,7 @@ ] -def grouped_payloads(size=3) -> List[str]: +def grouped_payloads(size=3, sep="") -> List[str]: """将所有payload按照size个一组拼接在一起 即:['a', 'b', 'c', 'd'] -> ['ab', 'cd'] @@ -39,11 +40,114 @@ def grouped_payloads(size=3) -> List[str]: List[str]: 拼接结果 """ return [ - "".join(dangerous_keywords[i : i + size]) # flake8: noqa + sep.join(dangerous_keywords[i : i + size]) # flake8: noqa for i in range(0, len(dangerous_keywords), size) ] +def get_next_p(b: str) -> List[int]: + """KMP算法中,获取字符串B的next数组的算法过程 + + Args: + b (str): 字符串B,KMP匹配的目标 + + Returns: + List[int]: next数组,定义与KMP相同 + """ + answer = [] + for i, c in enumerate(b): + if i == 0: + answer.append(-1) + continue + p = answer[i - 1] + while p >= 0 and b[p + 1] != c: + assert answer[p] < p + p = answer[p] + + if c == b[p + 1]: + answer.append(p + 1) + else: + answer.append(p) + return answer + + +def kmp(a: str, b: str) -> Tuple[int, Union[int, None]]: + """KMP算法,在A中寻找B的最长匹配子串 + + Args: + a (str): KMP中的字符串A + b (str): KMP中的字符串B + + Returns: + Tuple[int, Union[int, None]]: 最长匹配子串的结尾长度和位置 + """ + logger.debug("kmp(len(a)=%d, len(b)=%d)", len(a), len(b)) + if b == "": + return 0, None + next_p = get_next_p(b) + max_answer, max_answer_pos = 0, None + j = -1 + for i, c in enumerate(a): + while j >= 0 and b[j + 1] != c: + assert next_p[j] < j + j = next_p[j] + if c == b[j + 1]: + j += 1 + if j + 1 > max_answer: + max_answer = j + 1 + max_answer_pos = i + + if j == len(b) - 1: + j = -1 + logger.debug(f"{i}, {c}, {j}") + return max_answer, max_answer_pos + + +def find_pieces(resp_text, payload): + """从HTTP响应的正文和对应的payload中分析出可能被替换的关键字 + + Args: + resp_text (str): HTTP响应正文 + payload (str): payload + + Returns: + List[str]: 可能被替换的关键字 + """ + assert len(resp_text) < 1e5 and len(payload) < 1e5 # perf limit + logger.debug("find_pieces(%s, %s)", resp_text[:20], payload[:20]) + max_answer, max_answer_pos = kmp(resp_text, payload) + logger.debug(f"{max_answer}, {max_answer_pos}") + if max_answer <= 2 or max_answer_pos is None: + logger.debug("max answer too low") + return [] + + resp_text_matched = resp_text[max_answer_pos - max_answer + 1 : max_answer_pos + 1] + resp_text_unmatched, payload_unmatched = ( + resp_text[max_answer_pos + 1 :], + payload[len(resp_text_matched) :], + ) + if payload_unmatched == "": + logger.debug("read payload done") + return [] + + max_answer_unmatched, max_answer_pos_unmatched = kmp( + payload_unmatched, resp_text_unmatched + ) + + if max_answer_pos_unmatched is None: + return [] + + payload_unmatched_before = payload_unmatched[ + : max_answer_pos_unmatched - max_answer_unmatched + 1 + ] + resp_text_next = resp_text_unmatched.removeprefix(payload_unmatched_before) + payload_next = payload_unmatched.removeprefix(payload_unmatched_before) + assert len(resp_text_next) < len(resp_text) and len(payload_next) < len(payload) + return [ + payload_unmatched_before, + ] + find_pieces(resp_text_next, payload_next) + + class WafFuncGen: """ 根据指定的Submitter(表单submitter或者路径submitter)生成对应的WAF函数 @@ -94,26 +198,102 @@ def waf_page_hash(self): return [k for k, v in Counter(hashes).items() if v >= 2] + def replaced_keyword(self) -> List[str]: + extra = "".join(random.choices(string.ascii_lowercase, k=6)) + test_payloads = ( + dangerous_keywords + if self.detect_mode == DETECT_MODE_ACCURATE + else grouped_payloads(4, sep=extra) + ) + keywords = [] + for payload_raw in test_payloads: + payload = extra + payload_raw + extra + logger.info( + "Testing dangerous keyword %s", + colored("yellow", repr(payload)), + ) + result = self.subm.submit(payload) + if result is None: + logger.info( + "Submit %s for %s", + colored("yellow", "failed"), + colored("yellow", repr(payload)), + ) + continue + + status_code, text = result + if status_code == 500: + continue + if len(text) > 5e4: + continue + try: + payload_replaced_keyword = find_pieces(text, payload) + except Exception: + traceback.print_exc() + continue + if payload_replaced_keyword: + payload_replaced_keyword = list(set(payload_replaced_keyword)) + if len(payload_replaced_keyword) > 10: + logger.info( + "Replaced keywords found, ignore because it's too long (length=%d)", + len(payload_replaced_keyword), + ) + else: + keywords += payload_replaced_keyword + keywords = list(set(keywords)) + logger.info( + "These keywords might get %s: %s", + colored("yellow", "replaced", bold=True), + colored("yellow", repr(keywords)), + ) + return keywords + + def doubletapping(self, payload: str, keywords: list[str]): + if not keywords: + return payload + logger.info( + "Perform %s for payload: %s", + colored("blue", "doubletapping"), + colored("yellow", payload), + ) + exist_keywords = [w for w in keywords if w in payload] + replacement = { + w: w[: len(w) // 2] + w + w[len(w) // 2 :] + for w in exist_keywords + if len(w) >= 2 + } + for k, v in sorted(replacement.items(), key=lambda item: len(item[0])): + payload = payload.replace(k, v) + return payload + def generate(self) -> Callable: """生成WAF函数 Returns: Callable: WAF函数 """ + replaced_keyword = self.replaced_keyword() waf_hashes = self.waf_page_hash() + # self.subm.add_tamperer( + # lambda s: self.doubletapping(s, replaced_keyword) + # ) + # 随着检测payload一起提交的附加内容 # content: 内容本身,passed: 内容是否确认可以通过waf extra_content, extra_passed = ( - "".join(random.choices(string.ascii_lowercase, k=6)), + "".join(random.choices(string.ascii_lowercase, k=4)), False, ) # WAF函数,只有在payload一定可以通过WAF时才返回True @lru_cache(1000) def waf_func(value): - nonlocal extra_content, extra_passed + nonlocal extra_content, extra_passed, replaced_keyword + payload = extra_content + value for _ in range(5): - result = self.subm.submit(extra_content + value) + if any(w in payload for w in replaced_keyword): + return False + result = self.subm.submit(payload) if result is None: return False # status_code, text = result @@ -126,6 +306,12 @@ def waf_func(value): logger.debug("payload产生回显") return True + # 产生关键词替换 + replaced_list = find_pieces(result.text, payload) + if replaced_list: + logger.info("发现了新的关键词替换:%s", colored("yellow", repr(replaced_list))) + replaced_keyword += replaced_list + return False # 去除下方的规则,因为如果我们没有fuzz出所有的waf页面,而此时extra_content # 不在waf页面中的话,我们应该更加保守地认为payload应该是被waf拦住了 diff --git a/tests/test_cli.py b/tests/test_cli.py index b39c118..6fb6783 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,7 +12,7 @@ from fenjing import cli, waf_func_gen -SLEEP_INTERVAL = 0.01 +SLEEP_INTERVAL = 0.02 VULUNSERVER_ADDR = os.environ["VULUNSERVER_ADDR"] waf_func_gen.logger.setLevel(logging.ERROR) diff --git a/tests/test_cracker.py b/tests/test_cracker.py index 934cbfb..bc5f48d 100644 --- a/tests/test_cracker.py +++ b/tests/test_cracker.py @@ -17,7 +17,7 @@ VULUNSERVER_ADDR = os.environ["VULUNSERVER_ADDR"] waf_func_gen.logger.setLevel(logging.ERROR) - +SLEEP_INTERVAL = 0.05 class WrappedSubmitter(Submitter): def __init__(self, subm, blacklist): @@ -39,7 +39,7 @@ def setup_local_waf(self, blacklist): url=VULUNSERVER_ADDR, form=get_form(action="/", inputs=["name"], method="GET"), target_field="name", - requester=Requester(interval=0.01), + requester=Requester(interval=SLEEP_INTERVAL), ), self.local_blacklist, ) @@ -50,7 +50,7 @@ def setup_remote_waf(self, remote_uri): url=VULUNSERVER_ADDR, form=get_form(action=remote_uri, inputs=["name"], method="GET"), target_field="name", - requester=Requester(interval=0.01), + requester=Requester(interval=SLEEP_INTERVAL), ) def setUp(self): @@ -74,7 +74,7 @@ def test_waf(self): self.assertNotIn(w, payload) resp = self.subm.submit(payload) assert resp is not None - self.assertIn("cracked! @m wr!tI1111ng s()th", resp.text) + self.assertIn("cracked! @m wr!tI1111ng s()th", resp.text, resp.text) class TestEasy(TestBase): @@ -270,7 +270,7 @@ def setUp(self): self.local_blacklist = None self.subm = PathSubmitter( url=VULUNSERVER_ADDR + "/crackpath/", - requester=Requester(interval=0.01), + requester=Requester(interval=SLEEP_INTERVAL), ) @@ -466,3 +466,8 @@ def test_waf(self): resp = subm.submit(payload) assert resp is not None self.assertIn("fenjingtest", resp.text) + +class TestReplacedWAF(TestBase): + def setUp(self): + super().setUp() + self.setup_remote_waf("/replace_waf") diff --git a/tests/vulunserver.py b/tests/vulunserver.py index c6d8add..6af8856 100644 --- a/tests/vulunserver.py +++ b/tests/vulunserver.py @@ -259,6 +259,17 @@ def lengthlimit2_waf(): template = f"Hello, {name}" return render_template_string(template) +@app.route("/replace_waf", methods=["GET", "POST"]) +def replace_waf(): + name = request.args.get("name", "world") + words = waf_words(name) + for word in words: + if len(word) >= 3: + name = name.replace(word, "") + print(name) + template = f"Hello, {name}" + return render_template_string(template) + if __name__ == "__main__": app.run(host="0.0.0.0", port=5000)