Skip to content

Commit

Permalink
First try to detect replacement.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marven11 committed Sep 15, 2023
1 parent 03aad5e commit 67711d8
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 13 deletions.
8 changes: 7 additions & 1 deletion fenjing/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,36 @@

DANGEROUS_KEYWORDS = [
'"',
"%",
"'",
"+",
".",
"0",
'0"',
"1",
"2",
"=",
"[",
"_",
"%",
"attr",
"base",
"builtins",
"chr",
"class",
"config",
"eval",
"exec",
"global",
"import",
"include",
"init",
"lipsum",
"mro",
"namespace",
"open",
"pop",
"popen",
"posix",
"read",
"request",
"self",
Expand Down
198 changes: 192 additions & 6 deletions fenjing/waf_func_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import logging
import random
import string
import traceback

from copy import copy
from collections import Counter, namedtuple
from functools import lru_cache
from typing import Dict, Callable, Union, List
from typing import Dict, Callable, Tuple, Union, List

from .const import DETECT_MODE_ACCURATE, DETECT_MODE_FAST, DANGEROUS_KEYWORDS
from .colorize import colored
Expand All @@ -28,7 +29,7 @@
]


def grouped_payloads(size=3) -> List[str]:
def grouped_payloads(size=3, sep="") -> List[str]:
"""将所有payload按照size个一组拼接在一起
即:['a', 'b', 'c', 'd'] -> ['ab', 'cd']
Expand All @@ -39,11 +40,114 @@ def grouped_payloads(size=3) -> List[str]:
List[str]: 拼接结果
"""
return [
"".join(dangerous_keywords[i : i + size]) # flake8: noqa
sep.join(dangerous_keywords[i : i + size]) # flake8: noqa
for i in range(0, len(dangerous_keywords), size)
]


def get_next_p(b: str) -> List[int]:
"""KMP算法中,获取字符串B的next数组的算法过程
Args:
b (str): 字符串B,KMP匹配的目标
Returns:
List[int]: next数组,定义与KMP相同
"""
answer = []
for i, c in enumerate(b):
if i == 0:
answer.append(-1)
continue
p = answer[i - 1]
while p >= 0 and b[p + 1] != c:
assert answer[p] < p
p = answer[p]

if c == b[p + 1]:
answer.append(p + 1)
else:
answer.append(p)
return answer


def kmp(a: str, b: str) -> Tuple[int, Union[int, None]]:
"""KMP算法,在A中寻找B的最长匹配子串
Args:
a (str): KMP中的字符串A
b (str): KMP中的字符串B
Returns:
Tuple[int, Union[int, None]]: 最长匹配子串的结尾长度和位置
"""
logger.debug("kmp(len(a)=%d, len(b)=%d)", len(a), len(b))
if b == "":
return 0, None
next_p = get_next_p(b)
max_answer, max_answer_pos = 0, None
j = -1
for i, c in enumerate(a):
while j >= 0 and b[j + 1] != c:
assert next_p[j] < j
j = next_p[j]
if c == b[j + 1]:
j += 1
if j + 1 > max_answer:
max_answer = j + 1
max_answer_pos = i

if j == len(b) - 1:
j = -1
logger.debug(f"{i}, {c}, {j}")
return max_answer, max_answer_pos


def find_pieces(resp_text, payload):
"""从HTTP响应的正文和对应的payload中分析出可能被替换的关键字
Args:
resp_text (str): HTTP响应正文
payload (str): payload
Returns:
List[str]: 可能被替换的关键字
"""
assert len(resp_text) < 1e5 and len(payload) < 1e5 # perf limit
logger.debug("find_pieces(%s, %s)", resp_text[:20], payload[:20])
max_answer, max_answer_pos = kmp(resp_text, payload)
logger.debug(f"{max_answer}, {max_answer_pos}")
if max_answer <= 2 or max_answer_pos is None:
logger.debug("max answer too low")
return []

resp_text_matched = resp_text[max_answer_pos - max_answer + 1 : max_answer_pos + 1]
resp_text_unmatched, payload_unmatched = (
resp_text[max_answer_pos + 1 :],
payload[len(resp_text_matched) :],
)
if payload_unmatched == "":
logger.debug("read payload done")
return []

max_answer_unmatched, max_answer_pos_unmatched = kmp(
payload_unmatched, resp_text_unmatched
)

if max_answer_pos_unmatched is None:
return []

payload_unmatched_before = payload_unmatched[
: max_answer_pos_unmatched - max_answer_unmatched + 1
]
resp_text_next = resp_text_unmatched.removeprefix(payload_unmatched_before)
payload_next = payload_unmatched.removeprefix(payload_unmatched_before)
assert len(resp_text_next) < len(resp_text) and len(payload_next) < len(payload)
return [
payload_unmatched_before,
] + find_pieces(resp_text_next, payload_next)


class WafFuncGen:
"""
根据指定的Submitter(表单submitter或者路径submitter)生成对应的WAF函数
Expand Down Expand Up @@ -94,26 +198,102 @@ def waf_page_hash(self):

return [k for k, v in Counter(hashes).items() if v >= 2]

def replaced_keyword(self) -> List[str]:
extra = "".join(random.choices(string.ascii_lowercase, k=6))
test_payloads = (
dangerous_keywords
if self.detect_mode == DETECT_MODE_ACCURATE
else grouped_payloads(4, sep=extra)
)
keywords = []
for payload_raw in test_payloads:
payload = extra + payload_raw + extra
logger.info(
"Testing dangerous keyword %s",
colored("yellow", repr(payload)),
)
result = self.subm.submit(payload)
if result is None:
logger.info(
"Submit %s for %s",
colored("yellow", "failed"),
colored("yellow", repr(payload)),
)
continue

status_code, text = result
if status_code == 500:
continue
if len(text) > 5e4:
continue
try:
payload_replaced_keyword = find_pieces(text, payload)
except Exception:
traceback.print_exc()
continue
if payload_replaced_keyword:
payload_replaced_keyword = list(set(payload_replaced_keyword))
if len(payload_replaced_keyword) > 10:
logger.info(
"Replaced keywords found, ignore because it's too long (length=%d)",
len(payload_replaced_keyword),
)
else:
keywords += payload_replaced_keyword
keywords = list(set(keywords))
logger.info(
"These keywords might get %s: %s",
colored("yellow", "replaced", bold=True),
colored("yellow", repr(keywords)),
)
return keywords

def doubletapping(self, payload: str, keywords: list[str]):
if not keywords:
return payload
logger.info(
"Perform %s for payload: %s",
colored("blue", "doubletapping"),
colored("yellow", payload),
)
exist_keywords = [w for w in keywords if w in payload]
replacement = {
w: w[: len(w) // 2] + w + w[len(w) // 2 :]
for w in exist_keywords
if len(w) >= 2
}
for k, v in sorted(replacement.items(), key=lambda item: len(item[0])):
payload = payload.replace(k, v)
return payload

def generate(self) -> Callable:
"""生成WAF函数
Returns:
Callable: WAF函数
"""
replaced_keyword = self.replaced_keyword()
waf_hashes = self.waf_page_hash()
# self.subm.add_tamperer(
# lambda s: self.doubletapping(s, replaced_keyword)
# )

# 随着检测payload一起提交的附加内容
# content: 内容本身,passed: 内容是否确认可以通过waf
extra_content, extra_passed = (
"".join(random.choices(string.ascii_lowercase, k=6)),
"".join(random.choices(string.ascii_lowercase, k=4)),
False,
)

# WAF函数,只有在payload一定可以通过WAF时才返回True
@lru_cache(1000)
def waf_func(value):
nonlocal extra_content, extra_passed
nonlocal extra_content, extra_passed, replaced_keyword
payload = extra_content + value
for _ in range(5):
result = self.subm.submit(extra_content + value)
if any(w in payload for w in replaced_keyword):
return False
result = self.subm.submit(payload)
if result is None:
return False
# status_code, text = result
Expand All @@ -126,6 +306,12 @@ def waf_func(value):
logger.debug("payload产生回显")
return True

# 产生关键词替换
replaced_list = find_pieces(result.text, payload)
if replaced_list:
logger.info("发现了新的关键词替换:%s", colored("yellow", repr(replaced_list)))
replaced_keyword += replaced_list
return False
# 去除下方的规则,因为如果我们没有fuzz出所有的waf页面,而此时extra_content
# 不在waf页面中的话,我们应该更加保守地认为payload应该是被waf拦住了

Expand Down
2 changes: 1 addition & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from fenjing import cli, waf_func_gen

SLEEP_INTERVAL = 0.01
SLEEP_INTERVAL = 0.02
VULUNSERVER_ADDR = os.environ["VULUNSERVER_ADDR"]
waf_func_gen.logger.setLevel(logging.ERROR)

Expand Down
15 changes: 10 additions & 5 deletions tests/test_cracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

VULUNSERVER_ADDR = os.environ["VULUNSERVER_ADDR"]
waf_func_gen.logger.setLevel(logging.ERROR)

SLEEP_INTERVAL = 0.05

class WrappedSubmitter(Submitter):
def __init__(self, subm, blacklist):
Expand All @@ -39,7 +39,7 @@ def setup_local_waf(self, blacklist):
url=VULUNSERVER_ADDR,
form=get_form(action="/", inputs=["name"], method="GET"),
target_field="name",
requester=Requester(interval=0.01),
requester=Requester(interval=SLEEP_INTERVAL),
),
self.local_blacklist,
)
Expand All @@ -50,7 +50,7 @@ def setup_remote_waf(self, remote_uri):
url=VULUNSERVER_ADDR,
form=get_form(action=remote_uri, inputs=["name"], method="GET"),
target_field="name",
requester=Requester(interval=0.01),
requester=Requester(interval=SLEEP_INTERVAL),
)

def setUp(self):
Expand All @@ -74,7 +74,7 @@ def test_waf(self):
self.assertNotIn(w, payload)
resp = self.subm.submit(payload)
assert resp is not None
self.assertIn("cracked! @m wr!tI1111ng s()th", resp.text)
self.assertIn("cracked! @m wr!tI1111ng s()th", resp.text, resp.text)


class TestEasy(TestBase):
Expand Down Expand Up @@ -270,7 +270,7 @@ def setUp(self):
self.local_blacklist = None
self.subm = PathSubmitter(
url=VULUNSERVER_ADDR + "/crackpath/",
requester=Requester(interval=0.01),
requester=Requester(interval=SLEEP_INTERVAL),
)


Expand Down Expand Up @@ -466,3 +466,8 @@ def test_waf(self):
resp = subm.submit(payload)
assert resp is not None
self.assertIn("fenjingtest", resp.text)

class TestReplacedWAF(TestBase):
def setUp(self):
super().setUp()
self.setup_remote_waf("/replace_waf")
11 changes: 11 additions & 0 deletions tests/vulunserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ def lengthlimit2_waf():
template = f"Hello, {name}"
return render_template_string(template)

@app.route("/replace_waf", methods=["GET", "POST"])
def replace_waf():
name = request.args.get("name", "world")
words = waf_words(name)
for word in words:
if len(word) >= 3:
name = name.replace(word, "")
print(name)
template = f"Hello, {name}"
return render_template_string(template)


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)

0 comments on commit 67711d8

Please sign in to comment.