Skip to content

Commit

Permalink
feat: converted hash plugin to new base class
Browse files Browse the repository at this point in the history
also moved methods from helperFunctions.hash that were used exclusively in the plugin into the plugin
  • Loading branch information
jstucke committed Nov 14, 2024
1 parent a76c79b commit 1c22602
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 175 deletions.
81 changes: 6 additions & 75 deletions src/helperFunctions/hash.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,39 @@
from __future__ import annotations

import contextlib
import logging
import sys
from hashlib import md5, new
from typing import TYPE_CHECKING
from hashlib import new

import lief
import ssdeep
import tlsh

from helperFunctions.data_conversion import make_bytes

if TYPE_CHECKING:
from objects.file import FileObject

ELF_MIME_TYPES = [
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
]


def get_hash(hash_function, binary):
def get_hash(hash_function: str, binary: bytes | str) -> str:
"""
Hashes binary with hash_function.
:param hash_function: The hash function to use. See hashlib for more
:param binary: The data to hash, either as string or array of Integers
:return: The hash as hexstring
:return: The hash as hex string
"""
binary = make_bytes(binary)
raw_hash = new(hash_function)
raw_hash.update(binary)
raw_hash.update(make_bytes(binary))
return raw_hash.hexdigest()


def get_sha256(code):
def get_sha256(code: bytes | str) -> str:
return get_hash('sha256', code)


def get_md5(code):
def get_md5(code: bytes | str) -> str:
return get_hash('md5', code)


def get_ssdeep(code):
binary = make_bytes(code)
raw_hash = ssdeep.Hash()
raw_hash.update(binary)
return raw_hash.digest()


def get_tlsh(code):
tlsh_hash = tlsh.hash(make_bytes(code))
return tlsh_hash if tlsh_hash != 'TNULL' else ''


def get_tlsh_comparison(first, second):
return tlsh.diff(first, second)


def get_imphash(file_object: FileObject) -> str | None:
"""
Generates and returns the md5 hash of the (sorted) imported functions of an ELF file represented by `file_object`.
Returns `None` if there are no imports or if an exception occurs.
:param file_object: The FileObject of which the imphash shall be computed
"""
if _is_elf_file(file_object):
try:
with _suppress_stdout():
functions = [f.name for f in lief.ELF.parse(file_object.file_path).imported_functions]
if functions:
return md5(','.join(sorted(functions)).encode()).hexdigest()
except Exception:
logging.exception(f'Could not compute imphash for {file_object.file_path}')
return None


def _is_elf_file(file_object: FileObject) -> bool:
return file_object.processed_analysis['file_type']['result']['mime'] in ELF_MIME_TYPES


def normalize_lief_items(functions):
"""
Shorthand to convert a list of objects to a list of strings
"""
return [str(function) for function in functions]


class _StandardOutWriter:
def write(self, _):
pass


@contextlib.contextmanager
def _suppress_stdout():
"""A context manager that suppresses any output to stdout and stderr."""
writer = _StandardOutWriter()

stdout, stderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = writer, writer

yield

sys.stdout, sys.stderr = stdout, stderr
138 changes: 104 additions & 34 deletions src/plugins/analysis/hash/code/hash.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,113 @@
from __future__ import annotations

import logging
from hashlib import algorithms_guaranteed
from typing import TYPE_CHECKING, Optional

import lief
import ssdeep
import tlsh
from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.PluginBase import AnalysisBasePlugin
from helperFunctions.hash import get_hash, get_imphash, get_ssdeep, get_tlsh
from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.hash import get_hash, get_md5

if TYPE_CHECKING:
from io import FileIO

ELF_MIME_TYPES = [
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
]


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
# The supported hashes are the ones from helperFunctions.hash and hashlib (except "shake" which is of
# little use considering its variable length).
# If they are not supported on the platform or not selected in the configuration of the plugin, the value will
# be `None`.
# Only the md5 and sha256 hashes are guaranteed to be available (since they are required down the line)

# from hashlib
md5: str = Field(description="md5 hash of the file's content")
sha256: str = Field(description="sha256 hash of the file's content")
sha1: Optional[str] = Field(description="sha1 hash of the file's content", default=None)
sha224: Optional[str] = Field(description="sha224 hash of the file's content", default=None)
sha384: Optional[str] = Field(description="sha384 hash of the file's content", default=None)
sha512: Optional[str] = Field(description="sha512 hash of the file's content", default=None)
blake2b: Optional[str] = Field(description="blake2b hash of the file's content", default=None)
blake2s: Optional[str] = Field(description="blake2s hash of the file's content", default=None)
sha3_224: Optional[str] = Field(description="sha3_224 hash of the file's content", default=None)
sha3_256: Optional[str] = Field(description="sha3_256 hash of the file's content", default=None)
sha3_384: Optional[str] = Field(description="sha3_384 hash of the file's content", default=None)
sha3_512: Optional[str] = Field(description="sha3_512 hash of the file's content", default=None)

ssdeep: Optional[str] = Field(description="ssdeep hash of the file's content", default=None)
tlsh: Optional[str] = Field(description="tlsh hash of the file's content", default=None)
imphash: Optional[str] = Field(
description='import hash: the MD5 hash of the sorted imported functions (ELF files only)',
default=None,
)

def __init__(self):
super().__init__(
metadata=self.MetaData(
name='file_hashes',
description='calculate different hash values of the file',
version=Version(1, 3, 0),
dependencies=['file_type'],
Schema=self.Schema,
),
)
configured_hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', [])
self.hashes_to_create = set(configured_hashes).union({'sha256', 'md5'})

def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema:
del virtual_file_path
result = {}

class AnalysisPlugin(AnalysisBasePlugin):
file_handle.seek(0)
file_contents = file_handle.read()
for hash_ in self.hashes_to_create.intersection(algorithms_guaranteed):
result[hash_] = get_hash(hash_, file_contents)
result['ssdeep'] = get_ssdeep(file_contents)
result['imphash'] = get_imphash(file_handle, analyses.get('file_type'))
result['tlsh'] = get_tlsh(file_contents)

return self.Schema(**result)


def get_imphash(file: FileIO, type_analysis: BaseModel | None) -> str | None:
"""
This Plugin creates several hashes of the file
Generates and returns the md5 hash for the (sorted) imported functions of an ELF file.
Returns `None` if there are no imports or if an exception occurs.
"""
if type_analysis is not None and _is_elf_file(type_analysis):
try:
if (parsed_elf := lief.ELF.parse(file.name)) is not None and len(parsed_elf.imported_functions) > 0:
functions = [f.name for f in parsed_elf.imported_functions]
return get_md5(','.join(sorted(functions)))
except Exception as error:
logging.warning(f'Could not compute imphash for {file}: {error}')
return None


def _is_elf_file(type_analysis: BaseModel) -> bool:
return type_analysis.mime in ELF_MIME_TYPES


def get_ssdeep(file_contents: bytes) -> str:
raw_hash = ssdeep.Hash()
raw_hash.update(file_contents)
return raw_hash.digest()


NAME = 'file_hashes'
DEPENDENCIES = ['file_type'] # noqa: RUF012
DESCRIPTION = 'calculate different hash values of the file'
VERSION = '1.2'
FILE = __file__

def additional_setup(self):
hashes = getattr(config.backend.plugin.get(self.NAME, None), 'hashes', ['sha256'])
self.hashes_to_create = hashes

def process_object(self, file_object):
"""
This function must be implemented by the plugin.
Analysis result must be a dict stored in file_object.processed_analysis[self.NAME]
If you want to propagate results to parent objects store a list of strings 'summary' entry of your result dict
"""
file_object.processed_analysis[self.NAME] = {}
for hash_ in self.hashes_to_create:
if hash_ in algorithms_guaranteed:
file_object.processed_analysis[self.NAME][hash_] = get_hash(hash_, file_object.binary)
else:
logging.debug(f'algorithm {hash_} not available')
file_object.processed_analysis[self.NAME]['ssdeep'] = get_ssdeep(file_object.binary)
file_object.processed_analysis[self.NAME]['imphash'] = get_imphash(file_object)

tlsh_hash = get_tlsh(file_object.binary)
if tlsh_hash:
file_object.processed_analysis[self.NAME]['tlsh'] = get_tlsh(file_object.binary)

return file_object
def get_tlsh(file_contents: bytes) -> str | None:
tlsh_hash = tlsh.hash(file_contents)
return tlsh_hash if tlsh_hash != 'TNULL' else None
56 changes: 39 additions & 17 deletions src/plugins/analysis/hash/test/test_plugin_hash.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import os
from pathlib import Path

import pytest
from common_helper_files import get_dir_of_file

from test.common_helper import MockFileObject
from ..code.hash import AnalysisPlugin, get_imphash, get_ssdeep, get_tlsh

from ..code.hash import AnalysisPlugin
TEST_DATA_DIR = Path(__file__).parent / 'data'
TEST_FILE = TEST_DATA_DIR / 'ls'
MD5_LEN = 32
TEST_STRING = b'test string'

TEST_DATA_DIR = os.path.join(get_dir_of_file(__file__), 'data') # noqa: PTH118

class MockTypeResultSchema:
mime = 'application/x-executable'


ANALYSIS_RESULT = {'file_type': MockTypeResultSchema()}


@pytest.mark.backend_config_overwrite(
Expand All @@ -23,20 +31,34 @@
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginHash:
def test_all_hashes(self, analysis_plugin):
result = analysis_plugin.process_object(MockFileObject()).processed_analysis[analysis_plugin.NAME]
with TEST_FILE.open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, ANALYSIS_RESULT)

assert 'md5' in result, 'md5 not in result'
assert 'sha1' in result, 'sha1 not in result'
assert 'foo' not in result, 'foo in result but not available'
assert result['md5'] == '6f8db599de986fab7a21625b7916589c', 'hash not correct'
assert 'ssdeep' in result, 'ssdeep not in result'
assert 'imphash' in result, 'imphash not in result'
assert result.md5 is not None
assert result.sha1 is not None
assert result.ssdeep is not None
assert result.imphash is not None
assert result.md5 == '87b02c9bea4be534649d3ab0b6f040a0', 'hash not correct'

def test_imphash(self, analysis_plugin):
file_path = os.path.join(TEST_DATA_DIR, 'ls') # noqa: PTH118
result = analysis_plugin.process_object(MockFileObject(file_path=file_path)).processed_analysis[
analysis_plugin.NAME
]
with TEST_FILE.open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, ANALYSIS_RESULT)

assert isinstance(result.imphash, str), 'imphash should be a string'
assert len(result.imphash) == MD5_LEN, 'imphash does not look like an md5'
assert result.imphash == 'd9eccd5f72564ac07601458b26040259'


def test_get_ssdeep():
assert get_ssdeep(TEST_STRING) == '3:Hv2:HO', 'not correct from string'


def test_imphash_bad_file():
this_file = Path(__file__)
with this_file.open('rb') as fp:
assert get_imphash(fp, MockTypeResultSchema()) is None


assert isinstance(result['imphash'], str), 'imphash should be a string'
assert len(result['imphash']) == 32, 'imphash does not look like an md5'
def test_get_tlsh():
assert get_tlsh(b'foobar') is None # make sure the result is not 'TNULL'
assert get_tlsh(os.urandom(2**7)) not in [None, ''] # the new tlsh version should work for smaller inputs
14 changes: 14 additions & 0 deletions src/plugins/analysis/hash/view/hash.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% extends "analysis_plugins/general_information.html" %}

{% block analysis_result_details %}

{% for key, value in analysis_result.items() | sort %}
{% if value %}
<tr>
<td>{{ key }}</td>
<td style="font-family: monospace">{{ value }}</td>
</tr>
{% endif %}
{% endfor %}

{% endblock %}
Loading

0 comments on commit 1c22602

Please sign in to comment.