Skip to content

Commit

Permalink
credshed module tests, optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
TheTechromancer committed Oct 12, 2023
1 parent 3d316ca commit d4568bb
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 123 deletions.
1 change: 1 addition & 0 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,7 @@ class _data_validator(BaseModel):
def _pretty_string(self):
return self.data["WAF"]


def make_event(
data,
event_type=None,
Expand Down
126 changes: 39 additions & 87 deletions bbot/modules/credshed.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from contextlib import suppress

from bbot.modules.base import BaseModule
from bbot.modules.templates.credential_leak import credential_leak


class credshed(BaseModule):
watched_events = ["EMAIL_ADDRESS", "DNS_NAME"]
class credshed(credential_leak):
watched_events = ["DNS_NAME"]
produced_events = ["PASSWORD", "HASHED_PASSWORD", "USERNAME", "EMAIL_ADDRESS"]
flags = ["passive", "safe"]
meta = {
Expand All @@ -22,9 +22,8 @@ async def setup(self):
self.base_url = self.config.get("credshed_url", "").rstrip("/")
self.username = self.config.get("username", "")
self.password = self.config.get("password", "")
self.results = {}

# make sure we have the information required to make queries
# soft-fail if we don't have the necessary information to make queries
if not (self.base_url and self.username and self.password):
return None, "Must set username, password, and credshed_url"

Expand All @@ -40,93 +39,46 @@ async def setup(self):

return await super().setup()

async def filter_event(self, event):
if event.module == self or "subdomain" in event.tags:
return False
return True

async def handle_event(self, event):
query = self.make_query(event)
cs_query = await self.helpers.request(
f"{self.base_url}/api/search",
method="POST",
cookies={"access_token_cookie": self.auth_token},
json={"query": event.data},
json={"query": query},
)

if cs_query and cs_query.json().get("stats").get("total_count") > 0:
accounts = cs_query.json().get("accounts")
for i in accounts:
email = i.get("e")
pw = i.get("p")
h_pw = i.get("h")
user = i.get("u")
src = i.get("s")[0]
if email not in self.results:
self.results[email] = {"source": [src], "passwords": {}, "hashed": {}, "usernames": {}}
else:
if src not in self.results[email]["source"]:
self.results[email]["source"].append(src)

if pw:
if pw not in self.results[email]["passwords"]:
self.results[email]["passwords"][pw] = [src]
else:
self.results[email]["passwords"][pw].append(src)

if h_pw:
for x in h_pw:
if x not in self.results[email]["hashed"]:
self.results[email]["hashed"][x] = [src]
else:
self.results[email]["hashed"][x].append(src)

if user:
if user not in self.results[email]["usernames"]:
self.results[email]["usernames"][user] = [src]
else:
self.results[email]["usernames"][user].append(src)

for x in self.results:
if cs_query.json().get("stats").get("query_type") == "domain":
f = self.make_event(x, "EMAIL_ADDRESS", source=event, tags="credshed")
self.emit_event(f)

if self.results[x]["hashed"]:
for y in self.results[x]["hashed"]:
self.emit_event(
y, "HASHED_PASSWORD", source=f, tags=f'credshed-source-{self.results[x]["hashed"][y]}'
)
if cs_query is not None and cs_query.status_code != 200:
self.warning(f"Error retrieving results from {self.base_url} (status code {cs_query.status_code}): {cs_query.text}")

if self.results[x]["passwords"]:
for y in self.results[x]["passwords"]:
self.emit_event(
y, "PASSWORD", source=f, tags=f'credshed-source-{self.results[x]["passwords"][y]}'
)

if self.results[x]["usernames"]:
for y in self.results[x]["usernames"]:
self.emit_event(
y, "USERNAME", source=f, tags=f'credshed-source-{self.results[x]["usernames"][y]}'
)

if cs_query.json().get("stats").get("query_type") == "email":
if self.results[x]["hashed"]:
for y in self.results[x]["hashed"]:
self.emit_event(
y,
"HASHED_PASSWORD",
source=event,
tags=f'credshed-source-{self.results[x]["hashed"][y]}',
)

if self.results[x]["passwords"]:
for y in self.results[x]["passwords"]:
self.emit_event(
y, "PASSWORD", source=event, tags=f'credshed-source-{self.results[x]["passwords"][y]}'
)

if self.results[x]["usernames"]:
for y in self.results[x]["usernames"]:
self.emit_event(
y, "USERNAME", source=event, tags=f'credshed-source-{self.results[x]["usernames"][y]}'
)
json_result = {}
with suppress(Exception):
json_result = cs_query.json()

if not json_result:
return

accounts = json_result.get("accounts", [])

for i in accounts:
email = i.get("e", "")
pw = i.get("p", "")
hashes = i.get("h", [])
user = i.get("u", "")
src = i.get("s", [])
src = [src[0] if src else ""]

tags = []
if src:
tags = [f"credshed-source-{src}"]

email_event = self.make_event(email, "EMAIL_ADDRESS", source=event, tags=tags)
if email_event is not None:
self.emit_event(email_event)
if user and not self.already_seen(f"{email}:{user}"):
self.emit_event(user, "USERNAME", source=email_event, tags=tags)
if pw and not self.already_seen(f"{email}:{pw}"):
self.emit_event(pw, "PASSWORD", source=email_event, tags=tags)
for h_pw in hashes:
if h_pw and not self.already_seen(f"{email}:{h_pw}"):
self.emit_event(h_pw, "HASHED_PASSWORD", source=email_event, tags=tags)
44 changes: 8 additions & 36 deletions bbot/modules/dehashed.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from contextlib import suppress

from bbot.modules.base import BaseModule
from bbot.modules.templates.credential_leak import credential_leak


class dehashed(BaseModule):
class dehashed(credential_leak):
watched_events = ["DNS_NAME"]
produced_events = ["PASSWORD", "HASHED_PASSWORD", "USERNAME"]
flags = ["passive"]
meta = {"description": "Execute queries against dehashed.com for exposed credentials", "auth_required": True}
options = {"username": "", "api_key": ""}
options_desc = {
"username": "Email Address associated with your API key",
"api_key": "DeHashed API Key"
}
options_desc = {"username": "Email Address associated with your API key", "api_key": "DeHashed API Key"}

base_url = "https://api.dehashed.com/search"

Expand All @@ -23,37 +20,22 @@ async def setup(self):
self.headers = {
"Accept": "application/json",
}
self.queries_processed = set()
self.data_seen = set()

# soft-fail if we don't have the necessary information to make queries
if not (self.username and self.api_key):
return None, "No username / API key set"

return await super().setup()

async def filter_event(self, event):
query = self.make_query(event)
query_hash = hash(query)
if query_hash not in self.queries_processed:
self.queries_processed.add(query_hash)
return True
return False, f'Already processed "{query}"'

async def handle_event(self, event):
already_seen = set()
emails = {}

if event.type == "DNS_NAME":
query = f"domain:{event.data}"
else:
query = f"email:{event.data}"
query = f"domain:{self.make_query(event)}"
url = f"{self.base_url}?query={query}&size=10000&page=" + "{page}"
async for entries in self.query(url):
for entry in entries:

# we have to clean up the email field because dehashed does a poor job of it
email_str = entry.get("email", "").replace('\\', '')
email_str = entry.get("email", "").replace("\\", "")
found_emails = list(self.helpers.extract_emails(email_str))
if not found_emails:
self.debug(f"Invalid email from dehashed.com: {email_str}")
Expand All @@ -79,12 +61,6 @@ async def handle_event(self, event):
if h_pw and not self.already_seen(f"{email}:{h_pw}"):
self.emit_event(h_pw, "HASHED_PASSWORD", source=email_event, tags=tags)

def already_seen(self, item):
h = hash(item)
already_seen = h in self.data_seen
self.data_seen.add(h)
return already_seen

async def query(self, url):
page = 0
num_entries = 0
Expand All @@ -101,17 +77,13 @@ async def query(self, url):
page += 1
if (page >= 3) or (not entries):
if result is not None and result.status_code != 200:
self.warning(f"Error retrieving results from dehashed.com: {result.text}")
self.warning(
f"Error retrieving results from dehashed.com (status code {results.status_code}): {result.text}"
)
elif (page >= 3) and (total > num_entries):
self.info(
f"{event.data} has {total:,} results in Dehashed. The API can only process the first 30,000 results. Please check dehashed.com to get the remaining results."
)
agen.aclose()
break
yield entries

def make_query(self, event):
if "target" in event.tags:
return event.data
_, domain = self.helpers.split_domain(event.data)
return domain
33 changes: 33 additions & 0 deletions bbot/modules/templates/credential_leak.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from bbot.modules.base import BaseModule


class credential_leak(BaseModule):
"""
A typical free API-based subdomain enumeration module
Inherited by many other modules including sublist3r, dnsdumpster, etc.
"""

async def setup(self):
self.queries_processed = set()
self.data_seen = set()
return True

async def filter_event(self, event):
query = self.make_query(event)
query_hash = hash(query)
if query_hash not in self.queries_processed:
self.queries_processed.add(query_hash)
return True
return False, f'Already processed "{query}"'

def make_query(self, event):
if "target" in event.tags:
return event.data
_, domain = self.helpers.split_domain(event.data)
return domain

def already_seen(self, item):
h = hash(item)
already_seen = h in self.data_seen
self.data_seen.add(h)
return already_seen
91 changes: 91 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_credshed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from .base import ModuleTestBase


credshed_auth_response = {
"access_token": "big_access_token",
"login": True,
}


credshed_response = {
"accounts": [
{
"e": "[email protected]",
"h": [],
"m": "hello my name is bob",
"p": "",
"s": [121562],
"u": "",
},
{
"e": "[email protected]",
"h": [
"539FE8942DEADBEEFBC49E6EB2F175AC",
"D2D8F0E9A4A2DEADBEEF1AC80F36D61F",
"$2a$12$SHIC49jLIwsobdeadbeefuWb2BKWHUOk2yhpD77A0itiZI1vJqXHm",
],
"m": "hello my name is judy",
"p": "",
"s": [80437],
"u": "",
},
{
"e": "[email protected]",
"h": [],
"m": "hello my name is tim",
"p": "TimTamSlam69",
"s": [80437],
"u": "tim",
},
],
"stats": {
"accounts_searched": 9820758365,
"elapsed": "0.00",
"limit": 1000,
"query": "blacklanternsecurity.com",
"query_type": "domain",
"sources_searched": 129957,
"total_count": 3,
"unique_count": 3,
},
}


class TestCredshed(ModuleTestBase):
config_overrides = {
"modules": {"credshed": {"username": "admin", "password": "password", "credshed_url": "https://credshed.com"}}
}

async def setup_before_prep(self, module_test):
module_test.httpx_mock.add_response(
url=f"https://credshed.com/api/auth",
json=credshed_auth_response,
method="POST",
)
module_test.httpx_mock.add_response(
url=f"https://credshed.com/api/search",
json=credshed_response,
method="POST",
)

def check(self, module_test, events):
assert len(events) == 10
assert 1 == len([e for e in events if e.type == "EMAIL_ADDRESS" and e.data == "[email protected]"])
assert 1 == len([e for e in events if e.type == "EMAIL_ADDRESS" and e.data == "[email protected]"])
assert 1 == len([e for e in events if e.type == "EMAIL_ADDRESS" and e.data == "[email protected]"])
assert 1 == len(
[e for e in events if e.type == "HASHED_PASSWORD" and e.data == "539FE8942DEADBEEFBC49E6EB2F175AC"]
)
assert 1 == len(
[e for e in events if e.type == "HASHED_PASSWORD" and e.data == "D2D8F0E9A4A2DEADBEEF1AC80F36D61F"]
)
assert 1 == len(
[
e
for e in events
if e.type == "HASHED_PASSWORD"
and e.data == "$2a$12$SHIC49jLIwsobdeadbeefuWb2BKWHUOk2yhpD77A0itiZI1vJqXHm"
]
)
assert 1 == len([e for e in events if e.type == "PASSWORD" and e.data == "TimTamSlam69"])
assert 1 == len([e for e in events if e.type == "USERNAME" and e.data == "tim"])

0 comments on commit d4568bb

Please sign in to comment.