Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'compare' typos fix #1264

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Contribute
There are many ways to contribute to FACT.
For instance, you can write an unpacking, compare or analysis plug-in.
For instance, you can write an unpacking, comparison or analysis plug-in.
You can develop your plug-in in your own repository under your favorite license.
It can be added to a local FACT installation as git submodule.
Have a look at [FACT’s Developer’s Manual](https://github.com/fkie-cad/FACT_core/wiki) for more details.
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ fixable = ["ALL"]
"conftest.py" = ["ARG002"]

[tool.ruff.lint.isort]
known-first-party = ["analysis", "compare", "helperFunctions", "install", "intercom", "objects", "plugins", "scheduler",
"statistic", "storage", "test", "unpacker", "version", "web_interface", "config"]
known-first-party = ["analysis", "comparison", "helperFunctions", "install", "intercom", "objects", "plugins",
"scheduler", "statistic", "storage", "test", "unpacker", "version", "web_interface", "config"]
known-third-party = ["docker"]

[tool.ruff.lint.pylint]
Expand Down
File renamed without changes.
26 changes: 14 additions & 12 deletions src/compare/compare.py → src/comparison/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@
import logging
from typing import TYPE_CHECKING

from helperFunctions.plugin import discover_compare_plugins
from helperFunctions.plugin import discover_comparison_plugins
from helperFunctions.virtual_file_path import get_paths_for_all_parents
from objects.firmware import Firmware
from storage.binary_service import BinaryService

if TYPE_CHECKING:
from compare.PluginBase import CompareBasePlugin
from comparison.comparison_base_plugin import ComparisonBasePlugin
from objects.file import FileObject
from storage.db_interface_comparison import ComparisonDbInterface


class Compare:
class Comparison:
"""
This Module compares firmware images
"""

compare_plugins = {} # noqa: RUF012
comparison_plugins = {} # noqa: RUF012

def __init__(self, db_interface: ComparisonDbInterface | None = None):
self.db_interface = db_interface
self._setup_plugins()
logging.info(f'Comparison plugins available: {", ".join(self.compare_plugins)}')
logging.info(f'Comparison plugins available: {", ".join(self.comparison_plugins)}')

def compare(self, uid_list):
logging.info(f'Comparison in progress: {uid_list}')
Expand All @@ -41,7 +41,7 @@ def compare(self, uid_list):
def compare_objects(self, fo_list):
return {
'general': self._create_general_section_dict(fo_list),
'plugins': self._execute_compare_plugins(fo_list),
'plugins': self._execute_comparison_plugins(fo_list),
}

def _create_general_section_dict(self, object_list):
Expand Down Expand Up @@ -78,24 +78,26 @@ def _get_vfp_data(self, object_list: list[FileObject]) -> dict[str, list[str]]:
# --- plug-in system ---

def _setup_plugins(self):
self.compare_plugins = {}
self.comparison_plugins = {}
self._init_plugins()

def _init_plugins(self):
for plugin in discover_compare_plugins():
for plugin in discover_comparison_plugins():
try:
self.compare_plugins[plugin.ComparePlugin.NAME] = plugin.ComparePlugin(db_interface=self.db_interface)
self.comparison_plugins[plugin.ComparisonPlugin.NAME] = plugin.ComparisonPlugin(
db_interface=self.db_interface
)
except Exception:
logging.error(f'Could not import comparison plugin {plugin.AnalysisPlugin.NAME}', exc_info=True)

def _execute_compare_plugins(self, fo_list: list[FileObject]) -> dict[str, dict]:
def _execute_comparison_plugins(self, fo_list: list[FileObject]) -> dict[str, dict]:
comparison_results = {}
for plugin in schedule_comparison_plugins(self.compare_plugins):
for plugin in schedule_comparison_plugins(self.comparison_plugins):
comparison_results[plugin.NAME] = plugin.compare(fo_list, comparison_results)
return comparison_results


def schedule_comparison_plugins(plugin_dict: dict[str, CompareBasePlugin]) -> list[CompareBasePlugin]:
def schedule_comparison_plugins(plugin_dict: dict[str, ComparisonBasePlugin]) -> list[ComparisonBasePlugin]:
# we use a reverse topological sort for scheduling the plugins while considering their dependencies
# see also: https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
visited = set()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations # noqa: N999
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING
Expand All @@ -9,9 +9,9 @@
from objects.file import FileObject


class CompareBasePlugin(BasePlugin):
class ComparisonBasePlugin(BasePlugin):
"""
This is the compare plug-in base class. All compare plug-ins should be derived from this class.
This is the comparison plugin base class. All comparison plugins should be derived from this class.
"""

# must be set by the plugin:
Expand All @@ -26,7 +26,7 @@ def __init__(self, config=None, db_interface=None, view_updater=None):
self.database = db_interface

@abstractmethod
def compare_function(self, fo_list: list[FileObject], dependency_results: dict[str, dict]) -> dict[str, dict]:
def compare_objects(self, fo_list: list[FileObject], dependency_results: dict[str, dict]) -> dict[str, dict]:
"""
This function must be implemented by the plugin.
`fo_list` is a list with file_objects including analysis and all summaries.
Expand All @@ -38,7 +38,7 @@ def compare_function(self, fo_list: list[FileObject], dependency_results: dict[s

def compare(self, fo_list: list[FileObject], dependency_results: dict[str, dict]) -> dict[str, dict]:
"""
This function is called by the compare module.
This function is called by the comparison module.
"""
missing_comparison_deps = self._get_missing_comparison_deps(dependency_results)
if missing_comparison_deps:
Expand All @@ -49,8 +49,8 @@ def compare(self, fo_list: list[FileObject], dependency_results: dict[str, dict]
}
missing_analysis_deps = self._get_missing_analysis_deps(fo_list)
if missing_analysis_deps:
return {'Compare Skipped': {'all': f"Required analyses not present: {', '.join(missing_analysis_deps)}"}}
return self.compare_function(fo_list, dependency_results)
return {'Comparison Skipped': {'all': f"Required analyses not present: {', '.join(missing_analysis_deps)}"}}
return self.compare_objects(fo_list, dependency_results)

def _get_missing_comparison_deps(self, dependency_results: dict[str, dict]) -> list[str]:
return [
Expand Down
26 changes: 13 additions & 13 deletions src/helperFunctions/data_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,36 @@ def make_unicode_string(code: Any) -> str:
return code.__str__()


def convert_uid_list_to_compare_id(uid_list: Iterable[str]) -> str:
def convert_uid_list_to_comparison_id(uid_list: Iterable[str]) -> str:
"""
Convert a list of UIDs to a compare ID (which is a unique string consisting of UIDs separated by semi-colons, used
Convert a list of UIDs to a comparison ID (which is a unique string consisting of UIDs separated by semicolons, used
to identify a FACT `Firmware` or `FileObject` comparison).

:param uid_list: A list of `FileObject` or `Firmware` UIDs.
:return: The compare ID.
:return: The comparison ID.
"""
return ';'.join(sorted(uid_list))


def convert_compare_id_to_list(compare_id: str) -> list[str]:
def convert_comparison_id_to_list(comparison_id: str) -> list[str]:
"""
Convert a compare ID back to a list of UIDs.
Convert a comparison ID back to a list of UIDs.

:param compare_id: The compare ID.
:param comparison_id: The comparison ID.
:return: The according UID list.
"""
return compare_id.split(';')
return comparison_id.split(';')


def normalize_compare_id(compare_id: str) -> str:
def normalize_comparison_id(comparison_id: str) -> str:
"""
Sort the UIDs in a compare ID (so that it is unique) and return it.
Sort the UIDs in a comparison ID (so that it is unique) and return it.

:param compare_id: The compare ID.
:return: The according unique compare ID with reordered UIDs.
:param comparison_id: The comparison ID.
:return: The according unique comparison ID with reordered UIDs.
"""
uids = convert_compare_id_to_list(compare_id)
return convert_uid_list_to_compare_id(uids)
uids = convert_comparison_id_to_list(comparison_id)
return convert_uid_list_to_comparison_id(uids)


def get_value_of_first_key(input_dict: dict[_KT, _VT]) -> _VT | None:
Expand Down
8 changes: 4 additions & 4 deletions src/helperFunctions/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def discover_analysis_plugins() -> list:
return _import_plugins('analysis')


def discover_compare_plugins() -> list:
"""Returns a list of modules where each module is a compare plugin."""
return _import_plugins('compare')
def discover_comparison_plugins() -> list:
"""Returns a list of modules where each module is a comparison plugin."""
return _import_plugins('comparison')


def _import_plugins(plugin_type):
assert plugin_type in ['analysis', 'compare']
assert plugin_type in ['analysis', 'comparison']

plugins = []
src_dir = get_src_dir()
Expand Down
10 changes: 5 additions & 5 deletions src/intercom/back_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ class InterComBackEndBinding:
def __init__(
self,
analysis_service=None,
compare_service=None,
comparison_service=None,
unpacking_service=None,
unpacking_locks=None,
testing=False, # noqa: ARG002
):
self.analysis_service = analysis_service
self.compare_service = compare_service
self.comparison_service = comparison_service
self.unpacking_service = unpacking_service
self.unpacking_locks = unpacking_locks
self.poll_delay = config.backend.intercom_poll_delay
Expand All @@ -49,7 +49,7 @@ def start(self):
InterComBackEndAnalysisPlugInsPublisher(analysis_service=self.analysis_service)
self._start_listener(InterComBackEndAnalysisTask, self.unpacking_service.add_task)
self._start_listener(InterComBackEndReAnalyzeTask, self.unpacking_service.add_task)
self._start_listener(InterComBackEndCompareTask, self.compare_service.add_task)
self._start_listener(InterComBackEndComparisonTask, self.comparison_service.add_task)
self._start_listener(InterComBackEndRawDownloadTask)
self._start_listener(InterComBackEndFileDiffTask)
self._start_listener(InterComBackEndTarRepackTask)
Expand Down Expand Up @@ -131,8 +131,8 @@ class InterComBackEndSingleFileTask(InterComBackEndReAnalyzeTask):
CONNECTION_TYPE = 'single_file_task'


class InterComBackEndCompareTask(InterComListener):
CONNECTION_TYPE = 'compare_task'
class InterComBackEndComparisonTask(InterComListener):
CONNECTION_TYPE = 'comparison_task'


class InterComBackEndRawDownloadTask(InterComListenerAndResponder):
Expand Down
4 changes: 2 additions & 2 deletions src/intercom/front_end_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def add_re_analyze_task(self, fw, unpack=True):
def add_single_file_task(self, fw):
self._add_to_redis_queue('single_file_task', fw, fw.uid)

def add_compare_task(self, compare_id, force=False):
self._add_to_redis_queue('compare_task', (compare_id, force), compare_id)
def add_comparison_task(self, comparison_id, force=False):
self._add_to_redis_queue('comparison_task', (comparison_id, force), comparison_id)

def delete_file(self, uid_list: set[str]):
self._add_to_redis_queue('file_delete_task', uid_list)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
import ssdeep

import config
from compare.PluginBase import CompareBasePlugin
from comparison.comparison_base_plugin import ComparisonBasePlugin
from helperFunctions.compare_sets import iter_element_and_rest, remove_duplicates_from_list
from helperFunctions.data_conversion import convert_uid_list_to_compare_id
from helperFunctions.data_conversion import convert_uid_list_to_comparison_id
from objects.firmware import Firmware

if TYPE_CHECKING:
from objects.file import FileObject


class ComparePlugin(CompareBasePlugin):
class ComparisonPlugin(ComparisonBasePlugin):
"""
Compares file coverage
"""
Expand All @@ -29,27 +29,27 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ssdeep_ignore_threshold = config.backend.ssdeep_ignore

def compare_function(self, fo_list, dependency_results: dict[str, dict]): # noqa: ARG002
compare_result = {
def compare_objects(self, fo_list, dependency_results: dict[str, dict]): # noqa: ARG002
comparison_result = {
'files_in_common': self._get_intersection_of_files(fo_list),
'exclusive_files': self._get_exclusive_files(fo_list),
}

self._handle_partially_common_files(compare_result, fo_list)
self._handle_partially_common_files(comparison_result, fo_list)

for result in compare_result.values():
for result in comparison_result.values():
if isinstance(result, dict):
result['collapse'] = False

similar_files, similarity = self._get_similar_files(fo_list, compare_result['exclusive_files'])
compare_result['similar_files'] = self.combine_similarity_results(similar_files, fo_list, similarity)
similar_files, similarity = self._get_similar_files(fo_list, comparison_result['exclusive_files'])
comparison_result['similar_files'] = self.combine_similarity_results(similar_files, fo_list, similarity)

if len(fo_list) == 2 and all(isinstance(fo, Firmware) for fo in fo_list): # noqa: PLR2004
compare_result['changed_text_files'] = self._find_changed_text_files(
fo_list, compare_result['files_in_common']['all']
comparison_result['changed_text_files'] = self._find_changed_text_files(
fo_list, comparison_result['files_in_common']['all']
)

return compare_result
return comparison_result

def _get_exclusive_files(self, fo_list: list[FileObject]) -> dict[str, list[str]]:
result = {}
Expand All @@ -68,16 +68,16 @@ def _get_intersection_of_files(self, fo_list: list[FileObject]) -> dict[str, lis
def _get_included_file_sets(fo_list: list[FileObject]) -> list[set[str]]:
return [set(file_object.list_of_all_included_files) for file_object in fo_list]

def _handle_partially_common_files(self, compare_result, fo_list):
def _handle_partially_common_files(self, comparison_result, fo_list):
if len(fo_list) > 2: # noqa: PLR2004
compare_result['files_in_more_than_one_but_not_in_all'] = self._get_files_in_more_than_one_but_not_in_all(
fo_list, compare_result
)
not_in_all = compare_result['files_in_more_than_one_but_not_in_all']
comparison_result[
'files_in_more_than_one_but_not_in_all'
] = self._get_files_in_more_than_one_but_not_in_all(fo_list, comparison_result)
not_in_all = comparison_result['files_in_more_than_one_but_not_in_all']
else:
not_in_all = {}
compare_result['non_zero_files_in_common'] = self._get_non_zero_common_files(
compare_result['files_in_common'], not_in_all
comparison_result['non_zero_files_in_common'] = self._get_non_zero_common_files(
comparison_result['files_in_common'], not_in_all
)

@staticmethod
Expand All @@ -104,7 +104,7 @@ def _get_similar_files(
for file_one in exclusive_files[parent_one.uid]:
for similar_file_pair, value in self._find_similar_file_for(file_one, parent_one.uid, parent_two):
similar_files.append(similar_file_pair)
similarity[convert_uid_list_to_compare_id(similar_file_pair)] = value
similarity[convert_uid_list_to_comparison_id(similar_file_pair)] = value
similarity_sets = generate_similarity_sets(remove_duplicates_from_list(similar_files))
return similarity_sets, similarity

Expand Down Expand Up @@ -134,7 +134,7 @@ def combine_similarity_results(self, similar_files: list[list[str]], fo_list: li
def _get_similarity_value(group_of_similar_files: list[str], similarity_dict: dict[str, str]) -> str:
similarities_list = []
for id_tuple in combinations(group_of_similar_files, 2):
similar_file_pair_id = convert_uid_list_to_compare_id(id_tuple)
similar_file_pair_id = convert_uid_list_to_comparison_id(id_tuple)
if similar_file_pair_id in similarity_dict:
similarities_list.append(similarity_dict[similar_file_pair_id])
if not similarities_list:
Expand Down
Loading
Loading