-
Notifications
You must be signed in to change notification settings - Fork 560
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1919 from blacklanternsecurity/dev
Dev -> Stable 2.2.0
- Loading branch information
Showing
44 changed files
with
2,010 additions
and
1,162 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import puremagic | ||
|
||
|
||
def get_magic_info(file): | ||
|
||
magic_detections = puremagic.magic_file(file) | ||
if magic_detections: | ||
magic_detections.sort(key=lambda x: x.confidence, reverse=True) | ||
detection = magic_detections[0] | ||
return detection.extension, detection.mime_type, detection.name, detection.confidence | ||
return "", "", "", 0 | ||
|
||
|
||
def get_compression(mime_type): | ||
mime_type = mime_type.lower() | ||
# from https://github.com/cdgriffith/puremagic/blob/master/puremagic/magic_data.json | ||
compression_map = { | ||
"application/gzip": "gzip", # Gzip compressed file | ||
"application/zip": "zip", # Zip archive | ||
"application/x-bzip2": "bzip2", # Bzip2 compressed file | ||
"application/x-xz": "xz", # XZ compressed file | ||
"application/x-7z-compressed": "7z", # 7-Zip archive | ||
"application/vnd.rar": "rar", # RAR archive | ||
"application/x-lzma": "lzma", # LZMA compressed file | ||
"application/x-compress": "compress", # Unix compress file | ||
"application/zstd": "zstd", # Zstandard compressed file | ||
"application/x-lz4": "lz4", # LZ4 compressed file | ||
"application/x-tar": "tar", # Tar archive | ||
"application/x-zip-compressed-fb2": "zip", # Zip archive (FB2) | ||
"application/epub+zip": "zip", # EPUB book (Zip archive) | ||
"application/pak": "pak", # PAK archive | ||
"application/x-lha": "lha", # LHA archive | ||
"application/arj": "arj", # ARJ archive | ||
"application/vnd.ms-cab-compressed": "cab", # Microsoft Cabinet archive | ||
"application/x-sit": "sit", # StuffIt archive | ||
"application/binhex": "binhex", # BinHex encoded file | ||
"application/x-lrzip": "lrzip", # Long Range ZIP | ||
"application/x-alz": "alz", # ALZip archive | ||
"application/x-tgz": "tgz", # Gzip compressed Tar archive | ||
"application/x-gzip": "gzip", # Gzip compressed file | ||
"application/x-lzip": "lzip", # Lzip compressed file | ||
"application/x-zstd-compressed-tar": "zstd", # Zstandard compressed Tar archive | ||
"application/x-lz4-compressed-tar": "lz4", # LZ4 compressed Tar archive | ||
"application/vnd.comicbook+zip": "zip", # Comic book archive (Zip) | ||
"application/vnd.palm": "palm", # Palm OS data | ||
"application/fictionbook2+zip": "zip", # FictionBook 2.0 (Zip) | ||
"application/fictionbook3+zip": "zip", # FictionBook 3.0 (Zip) | ||
"application/x-cpio": "cpio", # CPIO archive | ||
"application/x-java-pack200": "pack200", # Java Pack200 archive | ||
"application/x-par2": "par2", # PAR2 recovery file | ||
"application/x-rar-compressed": "rar", # RAR archive | ||
"application/java-archive": "zip", # Java Archive (JAR) | ||
"application/x-webarchive": "zip", # Web archive (Zip) | ||
"application/vnd.android.package-archive": "zip", # Android package (APK) | ||
"application/x-itunes-ipa": "zip", # iOS application archive (IPA) | ||
"application/x-stuffit": "sit", # StuffIt archive | ||
"application/x-archive": "ar", # Unix archive | ||
"application/x-qpress": "qpress", # Qpress archive | ||
"application/x-xar": "xar", # XAR archive | ||
"application/x-ace": "ace", # ACE archive | ||
"application/x-zoo": "zoo", # Zoo archive | ||
"application/x-arc": "arc", # ARC archive | ||
"application/x-zstd-compressed-tar": "zstd", # Zstandard compressed Tar archive | ||
"application/x-lz4-compressed-tar": "lz4", # LZ4 compressed Tar archive | ||
"application/vnd.comicbook-rar": "rar", # Comic book archive (RAR) | ||
} | ||
|
||
return compression_map.get(mime_type, "") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
# This file contains SQLModel (Pydantic + SQLAlchemy) models for BBOT events, scans, and targets. | ||
# Used by the SQL output modules, but portable for outside use. | ||
|
||
import json | ||
import logging | ||
from datetime import datetime | ||
from pydantic import ConfigDict | ||
from typing import List, Optional | ||
from typing_extensions import Annotated | ||
from pydantic.functional_validators import AfterValidator | ||
from sqlmodel import inspect, Column, Field, SQLModel, JSON, String, DateTime as SQLADateTime | ||
|
||
|
||
log = logging.getLogger("bbot_server.models") | ||
|
||
|
||
def naive_datetime_validator(d: datetime): | ||
""" | ||
Converts all dates into UTC, then drops timezone information. | ||
This is needed to prevent inconsistencies in sqlite, because it is timezone-naive. | ||
""" | ||
# drop timezone info | ||
return d.replace(tzinfo=None) | ||
|
||
|
||
NaiveUTC = Annotated[datetime, AfterValidator(naive_datetime_validator)] | ||
|
||
|
||
class CustomJSONEncoder(json.JSONEncoder): | ||
def default(self, obj): | ||
# handle datetime | ||
if isinstance(obj, datetime): | ||
return obj.isoformat() | ||
return super().default(obj) | ||
|
||
|
||
class BBOTBaseModel(SQLModel): | ||
model_config = ConfigDict(extra="ignore") | ||
|
||
def __init__(self, *args, **kwargs): | ||
self._validated = None | ||
super().__init__(*args, **kwargs) | ||
|
||
@property | ||
def validated(self): | ||
try: | ||
if self._validated is None: | ||
self._validated = self.__class__.model_validate(self) | ||
return self._validated | ||
except AttributeError: | ||
return self | ||
|
||
def to_json(self, **kwargs): | ||
return json.dumps(self.validated.model_dump(), sort_keys=True, cls=CustomJSONEncoder, **kwargs) | ||
|
||
@classmethod | ||
def _pk_column_names(cls): | ||
return [column.name for column in inspect(cls).primary_key] | ||
|
||
def __hash__(self): | ||
return hash(self.to_json()) | ||
|
||
def __eq__(self, other): | ||
return hash(self) == hash(other) | ||
|
||
|
||
### EVENT ### | ||
|
||
|
||
class Event(BBOTBaseModel, table=True): | ||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
data = self._get_data(self.data, self.type) | ||
self.data = {self.type: data} | ||
if self.host: | ||
self.reverse_host = self.host[::-1] | ||
|
||
def get_data(self): | ||
return self._get_data(self.data, self.type) | ||
|
||
@staticmethod | ||
def _get_data(data, type): | ||
# handle SIEM-friendly format | ||
if isinstance(data, dict) and list(data) == [type]: | ||
return data[type] | ||
return data | ||
|
||
uuid: str = Field( | ||
primary_key=True, | ||
index=True, | ||
nullable=False, | ||
) | ||
id: str = Field(index=True) | ||
type: str = Field(index=True) | ||
scope_description: str | ||
data: dict = Field(sa_type=JSON) | ||
host: Optional[str] | ||
port: Optional[int] | ||
netloc: Optional[str] | ||
# store the host in reversed form for efficient lookups by domain | ||
reverse_host: Optional[str] = Field(default="", exclude=True, index=True) | ||
resolved_hosts: List = Field(default=[], sa_type=JSON) | ||
dns_children: dict = Field(default={}, sa_type=JSON) | ||
web_spider_distance: int = 10 | ||
scope_distance: int = Field(default=10, index=True) | ||
scan: str = Field(index=True) | ||
timestamp: NaiveUTC = Field(index=True) | ||
parent: str = Field(index=True) | ||
tags: List = Field(default=[], sa_type=JSON) | ||
module: str = Field(index=True) | ||
module_sequence: str | ||
discovery_context: str = "" | ||
discovery_path: List[str] = Field(default=[], sa_type=JSON) | ||
parent_chain: List[str] = Field(default=[], sa_type=JSON) | ||
|
||
|
||
### SCAN ### | ||
|
||
|
||
class Scan(BBOTBaseModel, table=True): | ||
id: str = Field(primary_key=True) | ||
name: str | ||
status: str | ||
started_at: NaiveUTC = Field(index=True) | ||
finished_at: Optional[NaiveUTC] = Field(default=None, sa_column=Column(SQLADateTime, nullable=True, index=True)) | ||
duration_seconds: Optional[float] = Field(default=None) | ||
duration: Optional[str] = Field(default=None) | ||
target: dict = Field(sa_type=JSON) | ||
preset: dict = Field(sa_type=JSON) | ||
|
||
|
||
### TARGET ### | ||
|
||
|
||
class Target(BBOTBaseModel, table=True): | ||
name: str = "Default Target" | ||
strict_scope: bool = False | ||
seeds: List = Field(default=[], sa_type=JSON) | ||
whitelist: List = Field(default=None, sa_type=JSON) | ||
blacklist: List = Field(default=[], sa_type=JSON) | ||
hash: str = Field(sa_column=Column("hash", String, unique=True, primary_key=True, index=True)) | ||
scope_hash: str = Field(sa_column=Column("scope_hash", String, index=True)) | ||
seed_hash: str = Field(sa_column=Column("seed_hashhash", String, index=True)) | ||
whitelist_hash: str = Field(sa_column=Column("whitelist_hash", String, index=True)) | ||
blacklist_hash: str = Field(sa_column=Column("blacklist_hash", String, index=True)) |
Oops, something went wrong.