Skip to content

Commit

Permalink
subhastext
Browse files Browse the repository at this point in the history
  • Loading branch information
babenek committed Aug 5, 2024
1 parent d5ca6b5 commit 9398354
Show file tree
Hide file tree
Showing 13 changed files with 642 additions and 580 deletions.
7 changes: 7 additions & 0 deletions credsweeper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ def get_arguments() -> Namespace:
const="output.xlsx",
dest="xlsx_filename",
metavar="PATH")
parser.add_argument("--subtext", help="only part of text will be outputted", action="store_const", const=True)
parser.add_argument("--hashed",
help="line, variable, value will be hashed in output",
action="store_const",
const=True)
parser.add_argument("--sort", help="enable output sorting", dest="sort_output", action="store_true")
parser.add_argument("--log",
"-l",
Expand Down Expand Up @@ -282,6 +287,8 @@ def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Opt
api_validation=args.api_validation,
json_filename=json_filename,
xlsx_filename=xlsx_filename,
subtext=args.subtext,
hashed=args.hashed,
sort_output=args.sort_output,
use_filters=args.no_filters,
pool_count=args.jobs,
Expand Down
13 changes: 10 additions & 3 deletions credsweeper/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def __init__(self,
api_validation: bool = False,
json_filename: Union[None, str, Path] = None,
xlsx_filename: Union[None, str, Path] = None,
subtext: bool = False,
hashed: bool = False,
sort_output: bool = False,
use_filters: bool = True,
pool_count: int = 1,
Expand Down Expand Up @@ -70,6 +72,8 @@ def __init__(self,
to json
xlsx_filename: optional string variable, path to save result
to xlsx
subtext: use subtext of line near value like it performed in ML
hashed: use hash of line, value and variable instead plain text
use_filters: boolean variable, specifying the need of rule filters
pool_count: int value, number of parallel processes to use
ml_batch_size: int value, size of the batch for model inference
Expand Down Expand Up @@ -104,6 +108,8 @@ def __init__(self,
self.credential_manager = CredentialManager()
self.json_filename: Union[None, str, Path] = json_filename
self.xlsx_filename: Union[None, str, Path] = xlsx_filename
self.subtext = subtext
self.hashed = hashed
self.sort_output = sort_output
self.ml_batch_size = ml_batch_size if ml_batch_size and 0 < ml_batch_size else 16
self.ml_threshold = ml_threshold
Expand Down Expand Up @@ -400,16 +406,17 @@ def export_results(self) -> None:

if self.json_filename:
is_exported = True
Util.json_dump([credential.to_json() for credential in credentials], file_path=self.json_filename)
Util.json_dump([credential.to_json(subtext=self.subtext, hashed=self.hashed) for credential in credentials],
file_path=self.json_filename)

if self.xlsx_filename:
is_exported = True
data_list = []
for credential in credentials:
data_list.extend(credential.to_dict_list())
data_list.extend(credential.to_dict_list(subtext=self.subtext, hashed=self.hashed))
df = pd.DataFrame(data=data_list)
df.to_excel(self.xlsx_filename, index=False)

if is_exported is False:
for credential in credentials:
print(credential)
print(credential.to_str(subtext=self.subtext, hashed=self.hashed))
18 changes: 11 additions & 7 deletions credsweeper/credentials/candidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,22 @@ def is_api_validation_available(self) -> bool:
"""
return len(self.validations) > 0

def __str__(self) -> str:
def to_str(self, subtext: bool = False, hashed: bool = False) -> str:
"""Represent candidate with subtext or|and hashed values"""
return f"rule: {self.rule_name}" \
f" | severity: {self.severity.value}" \
f" | confidence: {self.confidence.value}" \
f" | line_data_list: {self.line_data_list}" \
f" | line_data_list: [{', '.join([x.to_str(subtext, hashed) for x in self.line_data_list])}]" \
f" | api_validation: {self.api_validation.name}" \
f" | ml_validation: {self.ml_validation.name}"

def __str__(self):
return self.to_str()

def __repr__(self):
return str(self)
return self.to_str(subtext=True)

def to_json(self) -> Dict:
def to_json(self, subtext: bool, hashed: bool) -> Dict:
"""Convert credential candidate object to dictionary.
Return:
Expand All @@ -116,23 +120,23 @@ def to_json(self) -> Dict:
"confidence": self.confidence.value,
"use_ml": self.use_ml,
# put the array to end to make json more readable
"line_data_list": [line_data.to_json() for line_data in self.line_data_list],
"line_data_list": [line_data.to_json(subtext, hashed) for line_data in self.line_data_list],
}
if self.config is not None:
reported_output = {k: v for k, v in full_output.items() if k in self.config.candidate_output}
else:
reported_output = full_output
return reported_output

def to_dict_list(self) -> List[dict]:
def to_dict_list(self, subtext: bool, hashed: bool) -> List[dict]:
"""Convert credential candidate object to List[dict].
Return:
List[dict] object generated from current credential candidate
"""
reported_output = []
json_output = self.to_json()
json_output = self.to_json(subtext, hashed)
refined_data = copy.deepcopy(json_output)
del refined_data["line_data_list"]
for line_data in json_output["line_data_list"]:
Expand Down
67 changes: 42 additions & 25 deletions credsweeper/credentials/line_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import contextlib
import hashlib
import re
import string
from functools import cached_property
from typing import Any, Dict, Optional, Tuple

from credsweeper.common.constants import MAX_LINE_LENGTH
from credsweeper.common.constants import MAX_LINE_LENGTH, UTF_8, ML_HUNK
from credsweeper.config import Config
from credsweeper.utils import Util
from credsweeper.utils.entropy_validator import EntropyValidator
Expand Down Expand Up @@ -136,8 +137,14 @@ def sanitize_value(self):
self.value_start += start
self.value_end = self.value_start + len(self.value)

def check_url_part(self) -> bool:
"""Determines whether value is part of url like line"""
def clean_url_parameters(self) -> None:
"""Clean url address from 'query parameters'.
If line seem to be a URL - split by & character.
Variable should be right most value after & or ? ([-1]). And value should be left most before & ([0])
"""
# line length cannot exceed MAX_LINE_LENGTH
assert MAX_LINE_LENGTH >= len(self.line)
line_before_value = self.line[:self.value_start]
url_pos = -1
find_pos = 0
Expand All @@ -155,23 +162,17 @@ def check_url_part(self) -> bool:
self.url_part &= not self.url_chars_not_allowed_pattern.search(line_before_value, pos=url_pos + 3)
self.url_part |= self.line[self.variable_start - 1] in "?&" if 0 < self.variable_start else False
self.url_part |= bool(self.url_value_pattern.match(self.value))
return self.url_part

def clean_url_parameters(self) -> None:
"""Clean url address from 'query parameters'.
if not self.url_part:
return

If line seem to be a URL - split by & character.
Variable should be right most value after & or ? ([-1]). And value should be left most before & ([0])
"""
if self.check_url_part():
# all checks have passed - line before the value may be a URL
self.variable = self.variable.rsplit('&')[-1].rsplit('?')[-1].rsplit(';')[-1]
self.value = self.value.split('&', maxsplit=1)[0].split(';', maxsplit=1)[0].split('#', maxsplit=1)[0]
if not self.variable.endswith("://"):
# skip sanitize in case of URL credential rule
value_spl = self.url_param_split.split(self.value)
if len(value_spl) > 1:
self.value = value_spl[0]
# all checks have passed - line before the value may be a URL
self.variable = self.variable.rsplit('&')[-1].rsplit('?')[-1].rsplit(';')[-1]
self.value = self.value.split('&', maxsplit=1)[0].split(';', maxsplit=1)[0].split('#', maxsplit=1)[0]
if not self.variable.endswith("://"):
# skip sanitize in case of URL credential rule
value_spl = self.url_param_split.split(self.value)
if len(value_spl) > 1:
self.value = value_spl[0]

def clean_bash_parameters(self) -> None:
"""Split variable and value by bash special characters, if line assumed to be CLI command."""
Expand Down Expand Up @@ -287,14 +288,29 @@ def is_source_file_with_quotes(self) -> bool:
return True
return False

@staticmethod
def get_subtext_or_hash(text: Optional[str], pos: int, subtext: bool, hashed: bool) -> Optional[str]:
"""Represent a text with subtext or|and hash if required"""
text = Util.subtext(text, pos, ML_HUNK) if subtext and text is not None else text
if hashed:
# text = hashlib.sha256(text.encode(UTF_8, errors="replace")).hexdigest() if text is not None else None
text = hashlib.sha256(text.encode(UTF_8, errors="strict")).hexdigest() if text is not None else None
return text

def to_str(self, subtext: bool = False, hashed: bool = False) -> str:
"""Represent line_data with subtext or|and hashed values"""
return f"line: '{self.get_subtext_or_hash(self.line, self.value_start, subtext, hashed)}'" \
f" | line_num: {self.line_num} | path: {self.path}" \
f" | value: '{self.get_subtext_or_hash(self.value, 0, subtext, hashed)}'" \
f" | entropy_validation: {EntropyValidator(self.value)}"

def __str__(self):
return f"line: '{self.line}' | line_num: {self.line_num} | path: {self.path}" \
f" | value: '{self.value}' | entropy_validation: {EntropyValidator(self.value)}"
return self.to_str()

def __repr__(self):
return str(self)
return self.to_str(subtext=True)

def to_json(self) -> Dict:
def to_json(self, subtext: bool, hashed: bool) -> Dict:
"""Convert line data object to dictionary.
Return:
Expand All @@ -306,12 +322,13 @@ def to_json(self) -> Dict:
"line": self.line,
"line_num": self.line_num,
"path": self.path,
"info": self.info,
# info may contain variable name - so let it be hashed if requested
"info": hashlib.sha256(self.info.encode(UTF_8)).hexdigest() if hashed and self.info else self.info,
"pattern": self.pattern.pattern,
"separator": self.separator,
"separator_start": self.separator_start,
"separator_end": self.separator_end,
"value": self.value,
"value": self.get_subtext_or_hash(self.value, 0, subtext, hashed),
"value_start": self.value_start,
"value_end": self.value_end,
"variable": self.variable,
Expand Down
10 changes: 9 additions & 1 deletion credsweeper/utils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import math
import os
import string
import struct
import tarfile
from dataclasses import dataclass
Expand Down Expand Up @@ -685,6 +686,13 @@ def subtext(text: str, pos: int, hunk_size: int) -> str:
else:
left_quota = hunk_size - pos
left_pos = 0
# skip leading whitespaces in result string
for i in range(left_pos, pos):
if text[i] in string.whitespace:
left_quota += 1
left_pos += 1
else:
break
right_remain = len(text) - pos
if hunk_size <= right_remain:
right_quota = 0
Expand All @@ -698,4 +706,4 @@ def subtext(text: str, pos: int, hunk_size: int) -> str:
left_pos -= right_quota
if 0 > left_pos:
left_pos = 0
return text[left_pos:right_pos]
return text[left_pos:right_pos].rstrip()
4 changes: 3 additions & 1 deletion docs/source/guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Get all argument list:
usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH]) [--rules [PATH]] [--severity SEVERITY] [--config [PATH]]
[--log_config [PATH]] [--denylist PATH] [--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR] [--ml_batch_size POSITIVE_INT]
[--azure | --cuda] [--api_validation] [--jobs POSITIVE_INT] [--skip_ignored] [--save-json [PATH]] [--save-xlsx [PATH]] [--sort] [--log LOG_LEVEL] [--size_limit SIZE_LIMIT]
[--azure | --cuda] [--api_validation] [--jobs POSITIVE_INT] [--skip_ignored] [--save-json [PATH]] [--save-xlsx [PATH]] [--subtext] [--hashed] [--sort] [--log LOG_LEVEL] [--size_limit SIZE_LIMIT]
[--banner] [--version]
options:
-h, --help show this help message and exit
Expand Down Expand Up @@ -49,6 +49,8 @@ Get all argument list:
--skip_ignored parse .gitignore files and skip credentials from ignored objects
--save-json [PATH] save result to json file (default: output.json)
--save-xlsx [PATH] save result to xlsx file (default: output.xlsx)
--subtext only part of text will be outputted
--hashed line, variable, value will be hashed in output
--sort enable output sorting
--log LOG_LEVEL, -l LOG_LEVEL
provide logging level of ['DEBUG', 'INFO', 'WARN', 'WARNING', 'ERROR', 'FATAL', 'CRITICAL', 'SILENCE'](default: 'warning', case insensitive)
Expand Down
2 changes: 1 addition & 1 deletion fuzz/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def fuzz_credsweeper_scan(data: bytes):
elif validation.__class__.__name__ in [GoogleMultiValidation.__name__]:
for i in range(3):
mock_flow(i, candidate)
candidate.to_dict_list()
candidate.to_dict_list(False, False)


def main():
Expand Down
5 changes: 4 additions & 1 deletion tests/data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Any, List

from tests import SAMPLES_POST_CRED_COUNT, SAMPLES_IN_DEEP_3, SAMPLES_CRED_COUNT, SAMPLES_IN_DOC, NEGLIGIBLE_ML_THRESHOLD
from tests import SAMPLES_POST_CRED_COUNT, SAMPLES_IN_DEEP_3, SAMPLES_CRED_COUNT, SAMPLES_IN_DOC, \
NEGLIGIBLE_ML_THRESHOLD

DATA_TEST_CFG: List[Dict[str, Any]] = [{
"__cred_count": SAMPLES_POST_CRED_COUNT,
Expand All @@ -9,11 +10,13 @@
}, {
"__cred_count": SAMPLES_CRED_COUNT,
"sort_output": True,
"hashed": True,
"json_filename": "ml_threshold.json",
"ml_threshold": NEGLIGIBLE_ML_THRESHOLD
}, {
"__cred_count": SAMPLES_IN_DOC,
"sort_output": True,
"subtext": True,
"json_filename": "doc.json",
"doc": True
}, {
Expand Down
Loading

0 comments on commit 9398354

Please sign in to comment.