Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: converted ipc plugin to new base class #1263

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 63 additions & 42 deletions src/plugins/analysis/ipc/code/ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,83 @@
import json
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, List, Union

from docker.types import Mount
from pydantic import BaseModel, Field
from semver import Version

from analysis.PluginBase import AnalysisBasePlugin
from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
from objects.file import FileObject
from io import FileIO

DOCKER_IMAGE = 'ipc'


class AnalysisPlugin(AnalysisBasePlugin):
"""
Inter-Process Communication Analysis
"""
class FunctionCall(BaseModel):
name: str = Field(
# Refer to sink_function_names in ../docker/ipc_analyzer/ipy_analyzer.py for a list of supported functions
description='The name of the function.',
)
target: Union[str, int] = Field(
description=(
'The first argument of the function call. '
'For all supported functions, this is either a pathname or a file descriptor.'
),
)
arguments: List[Any] = Field(
description=(
'The remaining arguments of the function call. Arguments of type `char*` are rendered as strings. '
'Arguments of type `char**` are rendered as array of strings. Integer arrays are rendered as such. '
'Everything else is rendered as integer.'
)
)

NAME = 'ipc_analyzer'
DESCRIPTION = 'Inter-Process Communication Analysis'
VERSION = '0.1.1'
FILE = __file__

MIME_WHITELIST = [ # noqa: RUF012
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
]
DEPENDENCIES = ['file_type'] # noqa: RUF012
TIMEOUT = 600 # 10 minutes
class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
calls: List[FunctionCall] = Field(description='An array of IPC function calls.')

def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
def __init__(self):
metadata = self.MetaData(
name='ipc_analyzer',
dependencies=['file_type'],
description='Inter-Process Communication Analysis',
mime_whitelist=[
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
],
timeout=600,
version=Version(1, 0, 0),
Schema=self.Schema,
)
super().__init__(metadata=metadata)

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del virtual_file_path, analyses
output = self._run_ipc_analyzer_in_docker(file_handle)
# output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
# we need to restructure this a bit so it lines up with the Schema
calls = [
{'target': target, 'name': call_dict['type'], 'arguments': call_dict['arguments']}
for target, call_list in output['ipcCalls'].items()
for call_dict in call_list
]
return self.Schema.model_validate({'calls': calls})

def _run_ipc_analyzer_in_docker(self, file_handle: FileIO) -> dict:
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(file_handle.name).absolute()
folder = Path(tmp_dir) / 'results'
mount = f'/input/{file_object.file_name}'
mount = f'/input/{path.name}'
if not folder.exists():
folder.mkdir()
output = folder / f'{file_object.file_name}.json'
output = folder / f'{path.name}.json'
output.write_text(json.dumps({'ipcCalls': {}}))
run_docker_container(
DOCKER_IMAGE,
Expand All @@ -49,28 +88,10 @@ def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
command=f'{mount} /results/',
mounts=[
Mount('/results/', str(folder.resolve()), type='bind'),
Mount(mount, file_object.file_path, type='bind'),
Mount(mount, str(path), type='bind'),
],
)
return json.loads(output.read_text())

def _do_full_analysis(self, file_object: FileObject) -> FileObject:
output = self._run_ipc_analyzer_in_docker(file_object)
file_object.processed_analysis[self.NAME] = {
'full': output,
'summary': self._create_summary(output['ipcCalls']),
}
return file_object

def process_object(self, file_object: FileObject) -> FileObject:
"""
This function handles only ELF executables. Otherwise, it returns an empty dictionary.
It calls the ipc docker container.
"""
return self._do_full_analysis(file_object)

@staticmethod
def _create_summary(output: dict) -> list[str]:
# output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
summary = {entry['type'] for result_list in output.values() for entry in result_list}
return sorted(summary)
def summarize(self, result: Schema) -> list[str]:
return sorted({call.name for call in result.calls})
36 changes: 19 additions & 17 deletions src/plugins/analysis/ipc/test/test_ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,31 @@

import pytest

from objects.file import FileObject

from ..code.ipc_analyzer import AnalysisPlugin

TEST_DIR = Path(__file__).parent / 'data'


EXPECTED_SYSTEM_RESULT = {
'whoami': [{'type': 'system', 'arguments': ['']}],
'ls': [{'type': 'system', 'arguments': ['-l']}],
'echo': [{'type': 'system', 'arguments': ['hello']}],
'id': [{'type': 'system', 'arguments': ['']}],
'pwd': [{'type': 'system', 'arguments': ['']}],
'calls': [
{'arguments': [''], 'name': 'system', 'target': 'whoami'},
{'arguments': ['-l'], 'name': 'system', 'target': 'ls'},
{'arguments': ['hello'], 'name': 'system', 'target': 'echo'},
{'arguments': [''], 'name': 'system', 'target': 'id'},
{'arguments': [''], 'name': 'system', 'target': 'pwd'},
]
}

EXPECTED_WRITE_RESULT = {
'data.dat': [
{'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
'calls': [
{'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']], 'name': 'open', 'target': 'data.dat'},
{
'type': 'write',
'arguments': [
'',
['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
['Now is the winter of our discontent\\nMade ' 'glorious summer by this sun of York\\n'],
[77],
],
'name': 'write',
'target': 'data.dat',
},
]
}
Expand All @@ -40,8 +40,10 @@
('ipc_shared_files_test_bin', EXPECTED_WRITE_RESULT, ['open', 'write']),
],
)
def test_ipc_system(analysis_plugin, test_file, expected_result, expected_summary):
test_object = FileObject(file_path=str((TEST_DIR / test_file).resolve()))
result = analysis_plugin.process_object(test_object)
assert result.processed_analysis['ipc_analyzer']['full']['ipcCalls'] == expected_result
assert result.processed_analysis['ipc_analyzer']['summary'] == expected_summary
def test_ipc_analyze_summary(analysis_plugin, test_file, expected_result, expected_summary):
with (TEST_DIR / test_file).open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, {})
as_dict = result.model_dump()
assert as_dict == expected_result
summary = analysis_plugin.summarize(result)
assert summary == expected_summary
79 changes: 43 additions & 36 deletions src/plugins/analysis/ipc/view/ipc_analyzer.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,47 @@

{% block analysis_result_details %}

<table class="table table-bordered">
<colgroup>
<col style="width: 50px">
<col style="width: 150px">
<col style="width: 600px">
</colgroup>
<tbody class="table-analysis">
<tr>
<td class="table-head-light"><b>Target</b></td>
<td class="table-head-light"><b>Type</b></td>
<td class="table-head-light"><b>Arguments</b></td>
</tr>
{% set ipc_calls = analysis_result['full']['ipcCalls'] %}
{% for target in ipc_calls.keys()|sort %}
{% set row_count = 1 + ipc_calls[target]|length %}
<tr>
<td rowspan={{ row_count }}>{{ target }}</td>
</tr>
{% for ipc_call in ipc_calls[target] %}
<tr>
<td>{{ ipc_call['type'] }}</td>
<td>
<ul class="m-0">
{% for arg in ipc_call['arguments'] %}
{% if arg %}
<li>{{ arg }}</li>
{% endif %}
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
<tr>
<td colspan="2" class="p-0">

{% endblock %}
<table class="table table-bordered mb-0">
<colgroup>
<col style="width: 50px">
<col style="width: 150px">
<col style="width: 600px">
</colgroup>
<thead class="table-head-light">
<tr>
<th>Type</th>
<th>Target</th>
<th>Arguments</th>
</tr>
</thead>
<tbody>
{% for type, call_list in (analysis_result['calls'] | group_dict_list_by_key('name')).items() %}
{% set row_count = 1 + call_list | length %}
<tr>
<td rowspan="{{ row_count }}" style="font-family: monospace;">{{ type }}</td>
</tr>
{% for call_dict in call_list | sort_dict_list('target') %}
<tr>
<td style="font-family: monospace;">{{ call_dict.target }}</td>
<td>
<ul class="m-0">
{% for arg in call_dict.arguments %}
{% if arg %}
<li style="font-family: monospace;">{{ arg }}</li>
{% endif %}
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>

</td>
</tr>

{% endblock %}
24 changes: 24 additions & 0 deletions src/test/unit/web_interface/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ def test_get_unique_keys_from_list_of_dicts(list_of_dicts, expected_result):
assert flt.get_unique_keys_from_list_of_dicts(list_of_dicts) == expected_result


@pytest.mark.parametrize(
('list_of_dicts', 'key', 'expected_result'),
[
([], '', {}),
([{'a': '1'}, {'a': '1'}, {'a': '2'}], 'a', {'1': [{'a': '1'}, {'a': '1'}], '2': [{'a': '2'}]}),
],
)
def test_group_dict_list_by_key(list_of_dicts, key, expected_result):
assert flt.group_dict_list_by_key(list_of_dicts, key) == expected_result


@pytest.mark.parametrize(
('function', 'input_data', 'expected_output', 'error_message'),
[
Expand Down Expand Up @@ -503,3 +514,16 @@ def test_str_to_hex(input_, expected_result):
)
def test_octal_to_readable(input_, include_type, expected_result):
assert flt.octal_to_readable(input_, include_type=include_type) == expected_result


@pytest.mark.parametrize(
('input_', 'expected_result'),
[
([], []),
([{'a': 2}, {'a': 1}, {'a': 3}], [{'a': 1}, {'a': 2}, {'a': 3}]),
([{'a': 2}, {'a': 1}, {'b': 3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
([{'a': 'b'}, {'a': 'c'}, {'a': 'a'}], [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]),
],
)
def test_sort_dict_list_by_key(input_, expected_result):
assert flt.sort_dict_list_by_key(input_, 'a') == expected_result
2 changes: 2 additions & 0 deletions src/web_interface/components/jinja_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['get_canvas_height'] = flt.get_canvas_height
self._app.jinja_env.filters['get_searchable_crypto_block'] = flt.get_searchable_crypto_block
self._app.jinja_env.filters['get_unique_keys_from_list_of_dicts'] = flt.get_unique_keys_from_list_of_dicts
self._app.jinja_env.filters['group_dict_list_by_key'] = flt.group_dict_list_by_key
self._app.jinja_env.filters['hex'] = hex
self._app.jinja_env.filters['hide_dts_binary_data'] = flt.hide_dts_binary_data
self._app.jinja_env.filters['infection_color'] = flt.infection_color
Expand Down Expand Up @@ -228,6 +229,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['sort_chart_list_by_value'] = flt.sort_chart_list_by_value
self._app.jinja_env.filters['sort_comments'] = flt.sort_comments
self._app.jinja_env.filters['sort_cve'] = flt.sort_cve_results
self._app.jinja_env.filters['sort_dict_list'] = flt.sort_dict_list_by_key
self._app.jinja_env.filters['sort_privileges'] = lambda privileges: sorted(
privileges, key=lambda role: len(privileges[role]), reverse=True
)
Expand Down
13 changes: 12 additions & 1 deletion src/web_interface/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from re import Match
from string import ascii_letters
from time import localtime, strftime, struct_time, time
from typing import Iterable, Union
from typing import Any, Iterable, Union

import packaging.version
import semver
Expand Down Expand Up @@ -366,6 +366,13 @@ def get_unique_keys_from_list_of_dicts(list_of_dicts: list[dict]):
return unique_keys


def group_dict_list_by_key(dict_list: list[dict], key: Any) -> dict[str, list[dict]]:
result = {}
for dictionary in dict_list:
result.setdefault(dictionary.get(key), []).append(dictionary)
return result


def random_collapse_id():
return ''.join(random.choice(ascii_letters) for _ in range(10))

Expand Down Expand Up @@ -433,6 +440,10 @@ def _cve_score_to_float(score: float | str) -> float:
return 0.0


def sort_dict_list_by_key(dict_list: list[dict], key: Any) -> list[dict]:
return sorted(dict_list, key=lambda d: str(d.get(key, '')))


def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]:
reformatted = defaultdict(list, {})
for issue in issues:
Expand Down