Skip to content

Commit

Permalink
Added an internal module to always extract different file types
Browse files Browse the repository at this point in the history
  • Loading branch information
domwhewell-sage committed Nov 4, 2024
1 parent b2c3bb2 commit 57622c2
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 0 deletions.
120 changes: 120 additions & 0 deletions bbot/modules/internal/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import zipfile

from pathlib import Path
from subprocess import CalledProcessError
from bbot.modules.internal.base import BaseInternalModule


class extract(BaseInternalModule):
watched_events = ["FILESYSTEM"]
produced_events = ["FILESYSTEM"]
flags = ["passive"]
meta = {
"description": "Extract different types of files into folders on the filesystem",
"created_date": "2024-11-04",
"author": "@domwhewell-sage",
}
options = {
"threads": 4,
}
options_desc = {
"threads": "Maximum jadx threads for extracting apk's, default: 4",
}
deps_ansible = [
{
"name": "Install latest JRE (Debian)",
"package": {"name": ["default-jre"], "state": "present"},
"become": True,
"when": "ansible_facts['os_family'] == 'Debian'",
},
{
"name": "Install latest JRE (Arch)",
"package": {"name": ["jre-openjdk"], "state": "present"},
"become": True,
"when": "ansible_facts['os_family'] == 'Archlinux'",
},
{
"name": "Install latest JRE (Fedora)",
"package": {"name": ["java-openjdk-headless"], "state": "present"},
"become": True,
"when": "ansible_facts['os_family'] == 'RedHat'",
},
{
"name": "Install latest JRE (Alpine)",
"package": {"name": ["openjdk11"], "state": "present"},
"become": True,
"when": "ansible_facts['os_family'] == 'Alpine'",
},
{
"name": "Download jadx",
"unarchive": {
"src": "https://github.com/skylot/jadx/releases/download/v1.5.0/jadx-1.5.0.zip",
"include": "bin/jadx",
"dest": "#{BBOT_TOOLS}",
"remote_src": True,
},
},
]

zipcompressed = ["doc", "dot", "docm", "docx", "ppt", "pptm", "pptx", "xls", "xlt", "xlsm", "xlsx", "zip"]
jadx = ["xapk", "apk"]
allowed_extensions = zipcompressed + jadx

async def setup(self):
self.threads = self.config.get("threads", 4)
return True

async def filter_event(self, event):
if "file" in event.tags:
if not any(event.data["path"].endswith(f".{ext}") for ext in self.allowed_extensions):
return False, "Extract unable to handle file type"
else:
return False, "Event is not a file"
return True

async def handle_event(self, event):
path = Path(event.data["path"])
extension = path.suffix.strip(".").lower()
output_dir = path.parent / path.name.replace(".", "_")
self.helpers.mkdir(output_dir)

# Use the appropriate extraction method based on the file type
self.info(f"Extracting {path} to {output_dir}")
if extension in self.zipcompressed:
success = self.extract_zip_file(path, output_dir)
elif extension in self.jadx:
success = await self.decompile_apk(path, output_dir)

# If the extraction was successful, emit the event
if success:
await self.emit_event(
{"path": str(output_dir)},
"FILESYSTEM",
tags="folder",
parent=event,
context=f'extracted "{path}" to: {output_dir}',
)
else:
output_dir.rmdir()

def extract_zip_file(self, path, output_dir):
try:
with zipfile.ZipFile(path, "r") as zip_ref:
zip_ref.extractall(output_dir)
except Exception as e:
self.warning(f"Error extracting {path}. Exception: {repr(e)}")
return False
return True

async def decompile_apk(self, path, output_dir):
command = ["jadx", "--threads-count", self.threads, "--output-dir", str(output_dir), str(path)]
try:
output = await self.run_process(command, check=True)
except CalledProcessError as e:
self.warning(f"Error decompiling {path}. STDERR: {repr(e.stderr)}")
return False
if not Path(output_dir / "resources").exists() and not Path(output_dir / "sources").exists():
self.warning(f"JADX was unable to decompile {path}.")
self.warning(output)
return False
return True
89 changes: 89 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import zipfile

from pathlib import Path
from .base import ModuleTestBase, tempapkfile


class TestExtractZip(ModuleTestBase):
targets = ["http://127.0.0.1:8888"]
modules_overrides = ["filedownload", "httpx", "excavate", "speculate", "extract"]

temp_path = Path("/tmp/.bbot_test")
zip_file = temp_path / "test.zip"
with zipfile.ZipFile(zip_file, "w") as z:
z.writestr("test.txt", "This is a test file")

async def setup_after_prep(self, module_test):
module_test.set_expect_requests(
dict(uri="/"),
dict(
response_data='<a href="/test.zip"/>',
),
)
module_test.set_expect_requests(
dict(uri="/test.zip"),
dict(
response_data=self.zip_file.read_bytes(),
headers={"Content-Type": "application/zip"},
),
)

def check(self, module_test, events):
filesystem_events = [e for e in events if e.type == "FILESYSTEM"]

zip_file_event = [e for e in filesystem_events if "test.zip" in e.data["path"]]
assert 1 == len(zip_file_event), "No zip file found"
file = Path(zip_file_event[0].data["path"])
assert file.is_file(), f"File not found at {file}"
extract_event = [e for e in filesystem_events if "test_zip" in e.data["path"] and "folder" in e.tags]
assert 1 == len(extract_event), "Failed to extract zip"
extract_path = Path(extract_event[0].data["path"])
assert extract_path.is_dir(), "Destination folder doesn't exist"


class TestExtractApk(ModuleTestBase):
modules_overrides = ["apkpure", "google_playstore", "speculate", "extract"]
apk_file = tempapkfile()

async def setup_after_prep(self, module_test):
await module_test.mock_dns({"blacklanternsecurity.com": {"A": ["127.0.0.99"]}})
module_test.httpx_mock.add_response(
url="https://play.google.com/store/search?q=blacklanternsecurity&c=apps",
text="""<!DOCTYPE html>
<html>
<head>
<title>"blacklanternsecurity" - Android Apps on Google Play</title>
</head>
<body>
<a href="/store/apps/details?id=com.bbot.test&pcampaignid=dontmatchme&pli=1"/>
</body>
</html>""",
)
module_test.httpx_mock.add_response(
url="https://play.google.com/store/apps/details?id=com.bbot.test",
text="""<!DOCTYPE html>
<html>
<head>
<title>BBOT</title>
</head>
<body>
<meta name="appstore:developer_url" content="https://www.blacklanternsecurity.com">
</div>
</div>
</body>
</html>""",
)
module_test.httpx_mock.add_response(
url="https://d.apkpure.com/b/XAPK/com.bbot.test?version=latest",
content=self.apk_file,
)

def check(self, module_test, events):
extract_event = [
e
for e in events
if e.type == "FILESYSTEM" and "com_bbot_test_xapk" in e.data["path"] and "folder" in e.tags
]
assert 1 == len(extract_event), "Failed to extract apk"
extract_path = Path(extract_event[0].data["path"])
assert extract_path.is_dir(), "Destination apk doesn't exist"

0 comments on commit 57622c2

Please sign in to comment.