Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formatting and numerous Python source code optimizations #848

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions bbot/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ def __init__(self, config):

def setup(self):
if not self.url:
log.error(f"Must specify agent_url")
log.error("Must specify agent_url")
return False
if not self.token:
log.error(f"Must specify agent_token")
log.error("Must specify agent_token")
return False
return True

async def ws(self, rebuild=False):
if self._ws is None or rebuild:
kwargs = {"close_timeout": 0.5}
if self.token:
kwargs.update({"extra_headers": {"Authorization": f"Bearer {self.token}"}})
kwargs["extra_headers"] = {"Authorization": f"Bearer {self.token}"}
verbs = ("Building", "Built")
if rebuild:
verbs = ("Rebuilding", "Rebuilt")
Expand Down Expand Up @@ -74,10 +74,9 @@ async def start(self):
message = json.loads(message)
message = messages.Message(**message)

if message.command == "ping":
if self.scan is None:
await self.send({"conversation": str(message.conversation), "message_type": "pong"})
continue
if message.command == "ping" and self.scan is None:
await self.send({"conversation": str(message.conversation), "message_type": "pong"})
continue

command_type = getattr(messages, message.command, None)
if command_type is None:
Expand Down Expand Up @@ -116,7 +115,15 @@ async def send(self, message):
await asyncio.sleep(1)
# rebuild = True

async def start_scan(self, scan_id, name=None, targets=[], modules=[], output_modules=[], config={}):
async def start_scan(self, scan_id, name=None, targets=None, modules=None, output_modules=None, config=None):
if targets is None:
targets = []
if modules is None:
modules = []
if output_modules is None:
output_modules = []
if config is None:
config = {}
async with self._scan_lock:
if self.scan is None:
log.success(
Expand All @@ -139,7 +146,7 @@ async def start_scan(self, scan_id, name=None, targets=[], modules=[], output_mo
)
self.task = asyncio.create_task(self._start_scan_task(scan))

return {"success": f"Started scan", "scan_id": scan.id}
return {"success": "Started scan", "scan_id": scan.id}
else:
msg = f"Scan {self.scan.id} already in progress"
log.warning(msg)
Expand Down
5 changes: 1 addition & 4 deletions bbot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@
log = logging.getLogger("bbot.cli")
sys.stdout.reconfigure(line_buffering=True)


log_level = get_log_level()


from . import config


err = False
scan_name = ""

Expand Down Expand Up @@ -201,7 +198,7 @@ async def _main():
modules = set(scanner._scan_modules)
for m in scanner._scan_modules:
flags = module_loader._preloaded.get(m, {}).get("flags", [])
if not all(f in flags for f in options.require_flags):
if any(f not in flags for f in options.require_flags):
log.verbose(
f"Removing {m} because it does not have the required flags: {'+'.join(options.require_flags)}"
)
Expand Down
36 changes: 24 additions & 12 deletions bbot/core/configurator/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ def parse_args(self, *args, **kwargs):
match_and_exit(m, output_module_choices, msg="output module")
for f in set(ret.flags + ret.require_flags):
if f not in flag_choices and not self._dummy:
if f not in flag_choices and not self._dummy:
match_and_exit(f, flag_choices, msg="flag")
match_and_exit(f, flag_choices, msg="flag")
return ret


Expand Down Expand Up @@ -104,13 +103,11 @@ def error(self, message):
),
]


epilog = "EXAMPLES\n"
for example in (scan_examples, usage_examples):
for title, description, command in example:
epilog += f"\n {title}:\n {command}\n"


parser = BBOTArgumentParser(
description="Bighuge BLS OSINT Tool", formatter_class=argparse.RawTextHelpFormatter, epilog=epilog
)
Expand Down Expand Up @@ -143,9 +140,19 @@ def error(self, message):
help=f'Modules to enable. Choices: {",".join(module_choices)}',
metavar="MODULE",
)
modules.add_argument("-l", "--list-modules", action="store_true", help=f"List available modules.")
modules.add_argument(
"-em", "--exclude-modules", nargs="+", default=[], help=f"Exclude these modules.", metavar="MODULE"
"-l",
"--list-modules",
action="store_true",
help="List available modules.",
)
modules.add_argument(
"-em",
"--exclude-modules",
nargs="+",
default=[],
help="Exclude these modules.",
metavar="MODULE",
)
modules.add_argument(
"-f",
Expand All @@ -155,21 +162,26 @@ def error(self, message):
help=f'Enable modules by flag. Choices: {",".join(sorted(flag_choices))}',
metavar="FLAG",
)
modules.add_argument("-lf", "--list-flags", action="store_true", help=f"List available flags.")
modules.add_argument(
"-lf",
"--list-flags",
action="store_true",
help="List available flags.",
)
modules.add_argument(
"-rf",
"--require-flags",
nargs="+",
default=[],
help=f"Only enable modules with these flags (e.g. -rf passive)",
help="Only enable modules with these flags (e.g. -rf passive)",
metavar="FLAG",
)
modules.add_argument(
"-ef",
"--exclude-flags",
nargs="+",
default=[],
help=f"Disable modules with these flags. (e.g. -ef aggressive)",
help="Disable modules with these flags. (e.g. -ef aggressive)",
metavar="FLAG",
)
modules.add_argument(
Expand Down Expand Up @@ -200,7 +212,9 @@ def error(self, message):
scan.add_argument("-s", "--silent", action="store_true", help="Be quiet")
scan.add_argument("--force", action="store_true", help="Run scan even if module setups fail")
scan.add_argument("-y", "--yes", action="store_true", help="Skip scan confirmation prompt")
scan.add_argument("--dry-run", action="store_true", help=f"Abort before executing scan")
scan.add_argument(
"--dry-run", action="store_true", help="Abort before executing scan"
)
scan.add_argument(
"--current-config",
action="store_true",
Expand All @@ -222,12 +236,10 @@ def error(self, message):
misc = p.add_argument_group(title="Misc")
misc.add_argument("--version", action="store_true", help="show BBOT version and exit")


cli_options = None
with suppress(Exception):
cli_options = dummy_parser.parse_args()


cli_config = []


Expand Down
54 changes: 29 additions & 25 deletions bbot/core/configurator/environ.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from ...modules import module_loader
from ..helpers.misc import cpu_architecture, os_platform, os_platform_friendly


# keep track of whether BBOT is being executed via the CLI
cli_execution = False

Expand Down Expand Up @@ -48,9 +47,9 @@ def add_to_path(v, k="PATH"):
var_list = os.environ.get(k, "").split(":")
deduped_var_list = []
for _ in var_list:
if not _ in deduped_var_list:
if _ not in deduped_var_list:
deduped_var_list.append(_)
if not v in deduped_var_list:
if v not in deduped_var_list:
deduped_var_list = [v] + deduped_var_list
new_var_str = ":".join(deduped_var_list)
os.environ[k] = new_var_str
Expand All @@ -61,7 +60,7 @@ def prepare_environment(bbot_config):
Sync config to OS environment variables
"""
# ensure bbot_home
if not "home" in bbot_config:
if "home" not in bbot_config:
bbot_config["home"] = "~/.bbot"
home = Path(bbot_config["home"]).expanduser().resolve()
bbot_config["home"] = str(home)
Expand All @@ -78,7 +77,7 @@ def prepare_environment(bbot_config):
# ensure bbot_tools
bbot_tools = home / "tools"
os.environ["BBOT_TOOLS"] = str(bbot_tools)
if not str(bbot_tools) in os.environ.get("PATH", "").split(":"):
if str(bbot_tools) not in os.environ.get("PATH", "").split(":"):
os.environ["PATH"] = f'{bbot_tools}:{os.environ.get("PATH", "").strip(":")}'
# ensure bbot_cache
bbot_cache = home / "cache"
Expand All @@ -99,15 +98,7 @@ def prepare_environment(bbot_config):

# exchange certain options between CLI args and config
if cli_execution and args.cli_options is not None:
# deps
bbot_config["retry_deps"] = args.cli_options.retry_deps
bbot_config["force_deps"] = args.cli_options.force_deps
bbot_config["no_deps"] = args.cli_options.no_deps
bbot_config["ignore_failed_deps"] = args.cli_options.ignore_failed_deps
# debug
bbot_config["debug"] = args.cli_options.debug
bbot_config["silent"] = args.cli_options.silent

configure_cli_options(bbot_config)
import logging

log = logging.getLogger()
Expand All @@ -122,11 +113,9 @@ def prepare_environment(bbot_config):

# copy config to environment
bbot_environ = flatten_config(bbot_config)
os.environ.update(bbot_environ)
os.environ |= bbot_environ

# handle HTTP proxy
http_proxy = bbot_config.get("http_proxy", "")
if http_proxy:
if http_proxy := bbot_config.get("http_proxy", ""):
os.environ["HTTP_PROXY"] = http_proxy
os.environ["HTTPS_PROXY"] = http_proxy
else:
Expand All @@ -142,12 +131,27 @@ def prepare_environment(bbot_config):
urllib3.disable_warnings()
ssl_verify = bbot_config.get("ssl_verify", False)
if not ssl_verify:
import requests
import functools
disable_ssl_verification()
return bbot_config

requests.adapters.BaseAdapter.send = functools.partialmethod(requests.adapters.BaseAdapter.send, verify=False)
requests.adapters.HTTPAdapter.send = functools.partialmethod(requests.adapters.HTTPAdapter.send, verify=False)
requests.Session.request = functools.partialmethod(requests.Session.request, verify=False)
requests.request = functools.partial(requests.request, verify=False)

return bbot_config
def configure_cli_options(bbot_config):
# deps
bbot_config["retry_deps"] = args.cli_options.retry_deps
bbot_config["force_deps"] = args.cli_options.force_deps
bbot_config["no_deps"] = args.cli_options.no_deps
bbot_config["ignore_failed_deps"] = args.cli_options.ignore_failed_deps
# debug
bbot_config["debug"] = args.cli_options.debug
bbot_config["silent"] = args.cli_options.silent


# TODO Rename this here and in `prepare_environment`
def disable_ssl_verification():
import requests
import functools

requests.adapters.BaseAdapter.send = functools.partialmethod(requests.adapters.BaseAdapter.send, verify=False)
requests.adapters.HTTPAdapter.send = functools.partialmethod(requests.adapters.HTTPAdapter.send, verify=False)
requests.Session.request = functools.partialmethod(requests.Session.request, verify=False)
requests.request = functools.partial(requests.request, verify=False)
10 changes: 6 additions & 4 deletions bbot/core/configurator/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@


def _get_config(filename, name="config"):
notify = False
if sys.argv and sys.argv[0].endswith("bbot") and not any(x in sys.argv for x in ("-s", "--silent")):
notify = True
notify = bool(
sys.argv
and sys.argv[0].endswith("bbot")
and all(x not in sys.argv for x in ("-s", "--silent"))
)
filename = Path(filename).resolve()
try:
conf = OmegaConf.load(str(filename))
Expand All @@ -26,7 +28,7 @@ def _get_config(filename, name="config"):
return conf
except Exception as e:
if filename.exists():
raise ConfigLoadError(f"Error parsing config at {filename}:\n\n{e}")
raise ConfigLoadError(f"Error parsing config at {filename}:\n\n{e}") from e
return OmegaConf.create()


Expand Down
Loading
Loading