From a10799ec25860ca7acc3b03ebfaa838b26c19670 Mon Sep 17 00:00:00 2001 From: Willis Berrios Date: Thu, 30 Jan 2025 11:47:52 -0600 Subject: [PATCH] feat: Enhance JS database management with hashing and timestamp functionality (#327) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Ryan Mast <3969255+nightlark@users.noreply.github.com> --- surfactant/database_manager/__init__.py | 0 surfactant/database_manager/database_utils.py | 60 +++++++++++++++++ surfactant/infoextractors/js_file.py | 67 +++++++++++++++---- 3 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 surfactant/database_manager/__init__.py create mode 100644 surfactant/database_manager/database_utils.py diff --git a/surfactant/database_manager/__init__.py b/surfactant/database_manager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/surfactant/database_manager/database_utils.py b/surfactant/database_manager/database_utils.py new file mode 100644 index 00000000..4fecaf6f --- /dev/null +++ b/surfactant/database_manager/database_utils.py @@ -0,0 +1,60 @@ +# Copyright 2025 Lawrence Livermore National Security, LLC +# See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT +# Copyright 2025 Lawrence Livermore National Security, LLC +# See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT +import hashlib +from typing import Dict, Optional + +import tomlkit + + +def calculate_hash(data: str) -> str: + """Calculate the SHA-256 hash of the given data.""" + return hashlib.sha256(data.encode("utf-8")).hexdigest() + + +def load_hash_and_timestamp( + hash_file_path, pattern_key: str, pattern_file: str +) -> Optional[Dict[str, str]]: + """Load the hash and timestamp for a specific pattern from the specified TOML file.""" + try: + with open(hash_file_path, "r") as f: + hash_data = tomlkit.load(f) + # Access the specific structure using the provided keys + return hash_data.get(pattern_key, {}).get(pattern_file) + except FileNotFoundError: + return None + + +def save_hash_and_timestamp(hash_file_path, pattern_info: Dict[str, str]) -> None: + """Save the hash and timestamp for a specific pattern to the specified TOML file.""" + try: + with open(hash_file_path, "r") as f: + hash_data = tomlkit.load(f) + except FileNotFoundError: + hash_data = {} + + # Define the new data structure + new_data = { + pattern_info["pattern_key"]: { + pattern_info["pattern_file"]: { + "source": pattern_info["source"], + "hash": pattern_info["hash_value"], + "timestamp": pattern_info["timestamp"], + } + } + } + + # Update the existing data with the new data + if pattern_info["pattern_key"] in hash_data: + hash_data[pattern_info["pattern_key"]].update(new_data[pattern_info["pattern_key"]]) + else: + hash_data.update(new_data) + + # Write the updated data back to the TOML file + with open(hash_file_path, "w") as f: + tomlkit.dump(hash_data, f) diff --git a/surfactant/infoextractors/js_file.py b/surfactant/infoextractors/js_file.py index 8fba6f17..f72545c2 100644 --- a/surfactant/infoextractors/js_file.py +++ b/surfactant/infoextractors/js_file.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: MIT import json import re +from datetime import datetime, timezone from typing import Any, Dict, List, Optional import requests @@ -11,12 +12,41 @@ import surfactant.plugin from surfactant.configmanager import ConfigManager +from surfactant.database_manager.database_utils import ( + calculate_hash, + load_hash_and_timestamp, + save_hash_and_timestamp, +) from surfactant.sbomtypes import SBOM, Software class JSDatabaseManager: def __init__(self): - self.js_lib_database = None + self._js_lib_database = None # Use the private attribute + self.database_version_file_path = ( + ConfigManager().get_data_dir_path() / "infoextractors" / "js_library_patterns.toml" + ) + self.pattern_key = "js_library_patterns" + self.pattern_file = "js_library_patterns.json" + self.source = "jsfile.retirejs" + self.new_hash = None + self.download_timestamp = None + + @property + def js_lib_database(self) -> Optional[Dict[str, Any]]: + if self._js_lib_database is None: + self.load_db() + return self._js_lib_database + + @property + def pattern_info(self) -> Dict[str, Any]: + return { + "pattern_key": self.pattern_key, + "pattern_file": self.pattern_file, + "source": self.source, + "hash_value": self.new_hash, + "timestamp": self.download_timestamp, + } def load_db(self) -> None: js_lib_file = ( @@ -25,15 +55,15 @@ def load_db(self) -> None: try: with open(js_lib_file, "r") as regex: - self.js_lib_database = json.load(regex) + self._js_lib_database = json.load(regex) except FileNotFoundError: logger.warning( "Javascript library pattern database could not be loaded. Run `surfactant plugin update-db js_file` to fetch the pattern database." ) - self.js_lib_database = None + self._js_lib_database = None def get_database(self) -> Optional[Dict[str, Any]]: - return self.js_lib_database + return self._js_lib_database js_db_manager = JSDatabaseManager() @@ -90,12 +120,12 @@ def match_by_attribute(attribute: str, content: str, database: Dict) -> List[Dic return libs -def download_database() -> Optional[Dict[str, Any]]: +def download_database() -> Optional[str]: url = "https://raw.githubusercontent.com/RetireJS/retire.js/master/repository/jsrepository-master.json" response = requests.get(url) if response.status_code == 200: logger.info("Request successful!") - return json.loads(response.text) + return response.text if response.status_code == 404: logger.error("Resource not found.") @@ -129,19 +159,30 @@ def strip_irrelevant_data(retirejs_db: dict) -> dict: @surfactant.plugin.hookimpl def update_db() -> str: - """Retrieves the javascript library CVE database used by retire.js (https://github.com/RetireJS/retire.js/blob/master/repository/jsrepository-master.json) and only keeps the contents under each library's "extractors" section, which contains file hashes and regexes relevant for detecting a specific javascript library by its file name or contents. + raw_data = download_database() + if raw_data is not None: + js_db_manager.new_hash = calculate_hash(raw_data) + current_data = load_hash_and_timestamp( + js_db_manager.database_version_file_path, + js_db_manager.pattern_key, + js_db_manager.pattern_file, + ) + if current_data and js_db_manager.new_hash == current_data.get("hash"): + return "No update occurred. Database is up-to-date." - The resulting smaller json is written to js_library_patterns.json in the same directory. This smaller file will be read from to make the checks later on.""" - retirejs = download_database() - if retirejs is not None: + retirejs = json.loads(raw_data) cleaned = strip_irrelevant_data(retirejs) + js_db_manager.download_timestamp = datetime.now(timezone.utc) + path = ConfigManager().get_data_dir_path() / "infoextractors" path.mkdir(parents=True, exist_ok=True) - json_file_path = ( - ConfigManager().get_data_dir_path() / "infoextractors" / "js_library_patterns.json" - ) + json_file_path = path / "js_library_patterns.json" with open(json_file_path, "w") as f: json.dump(cleaned, f, indent=4) + + save_hash_and_timestamp( + js_db_manager.database_version_file_path, js_db_manager.pattern_info + ) return "Update complete." return "No update occurred."