Skip to content

Commit

Permalink
feat: Enhance JS database management with hashing and timestamp funct…
Browse files Browse the repository at this point in the history
…ionality (#327)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Ryan Mast <[email protected]>
  • Loading branch information
3 people authored Jan 30, 2025
1 parent 28ca5c0 commit a10799e
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 13 deletions.
Empty file.
60 changes: 60 additions & 0 deletions surfactant/database_manager/database_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2025 Lawrence Livermore National Security, LLC
# See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT
import hashlib
from typing import Dict, Optional

import tomlkit


def calculate_hash(data: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``data`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(data.encode("utf-8"))
    return hasher.hexdigest()


def load_hash_and_timestamp(
    hash_file_path, pattern_key: str, pattern_file: str
) -> Optional[Dict[str, str]]:
    """Load the stored hash and timestamp record for one pattern file.

    Args:
        hash_file_path: Path of the TOML file that holds the records.
        pattern_key: Name of the top-level table to look in.
        pattern_file: Key of the record inside that table.

    Returns:
        The value stored under ``[pattern_key][pattern_file]``, or ``None`` when
        the TOML file does not exist or contains no such record.
    """
    try:
        # TOML documents are UTF-8 by specification, so do not depend on the
        # platform's default text encoding.
        with open(hash_file_path, "r", encoding="utf-8") as f:
            hash_data = tomlkit.load(f)
    except FileNotFoundError:
        return None
    # A missing table falls back to an empty mapping so the lookup yields None.
    return hash_data.get(pattern_key, {}).get(pattern_file)


def save_hash_and_timestamp(hash_file_path, pattern_info: Dict[str, str]) -> None:
    """Save the hash and timestamp record for one pattern file to a TOML file.

    Records already present in the file are kept; only the entry at
    ``[pattern_key][pattern_file]`` is added or overwritten.

    Args:
        hash_file_path: Path of the TOML file to update. It is created when it
            does not exist yet.
        pattern_info: Mapping that provides the keys ``pattern_key``,
            ``pattern_file``, ``source``, ``hash_value`` and ``timestamp``.

    Raises:
        KeyError: If ``pattern_info`` lacks one of the keys listed above.
    """
    try:
        # TOML documents are UTF-8 by specification, so do not depend on the
        # platform's default text encoding.
        with open(hash_file_path, "r", encoding="utf-8") as f:
            hash_data = tomlkit.load(f)
    except FileNotFoundError:
        # First save: start from an empty document.
        hash_data = {}

    pattern_key = pattern_info["pattern_key"]
    # The record for this pattern file, keyed by the file name.
    record = {
        pattern_info["pattern_file"]: {
            "source": pattern_info["source"],
            "hash": pattern_info["hash_value"],
            "timestamp": pattern_info["timestamp"],
        }
    }

    # Merge into an existing table so sibling records are not discarded.
    if pattern_key in hash_data:
        hash_data[pattern_key].update(record)
    else:
        hash_data.update({pattern_key: record})

    # Write the updated data back to the TOML file.
    with open(hash_file_path, "w", encoding="utf-8") as f:
        tomlkit.dump(hash_data, f)
67 changes: 54 additions & 13 deletions surfactant/infoextractors/js_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,49 @@
# SPDX-License-Identifier: MIT
import json
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import requests
from loguru import logger

import surfactant.plugin
from surfactant.configmanager import ConfigManager
from surfactant.database_manager.database_utils import (
calculate_hash,
load_hash_and_timestamp,
save_hash_and_timestamp,
)
from surfactant.sbomtypes import SBOM, Software


class JSDatabaseManager:
def __init__(self):
    """Set up the paths and identifiers used to track the pattern database.

    The database itself is not read here; ``_js_lib_database`` starts as
    ``None``.
    """
    # Only the private attribute is assigned: ``js_lib_database`` is a
    # property without a setter, so assigning to it would raise AttributeError.
    self._js_lib_database = None
    # TOML file that records the hash and timestamp of the downloaded database.
    self.database_version_file_path = (
        ConfigManager().get_data_dir_path() / "infoextractors" / "js_library_patterns.toml"
    )
    self.pattern_key = "js_library_patterns"
    self.pattern_file = "js_library_patterns.json"
    self.source = "jsfile.retirejs"
    # Filled in by the update routine once a download has been hashed and stored.
    self.new_hash = None
    self.download_timestamp = None

@property
def js_lib_database(self) -> Optional[Dict[str, Any]]:
if self._js_lib_database is None:
self.load_db()
return self._js_lib_database

@property
def pattern_info(self) -> Dict[str, Any]:
return {
"pattern_key": self.pattern_key,
"pattern_file": self.pattern_file,
"source": self.source,
"hash_value": self.new_hash,
"timestamp": self.download_timestamp,
}

def load_db(self) -> None:
js_lib_file = (
Expand All @@ -25,15 +55,15 @@ def load_db(self) -> None:

try:
with open(js_lib_file, "r") as regex:
self.js_lib_database = json.load(regex)
self._js_lib_database = json.load(regex)
except FileNotFoundError:
logger.warning(
"Javascript library pattern database could not be loaded. Run `surfactant plugin update-db js_file` to fetch the pattern database."
)
self.js_lib_database = None
self._js_lib_database = None

def get_database(self) -> Optional[Dict[str, Any]]:
return self.js_lib_database
return self._js_lib_database


js_db_manager = JSDatabaseManager()
Expand Down Expand Up @@ -90,12 +120,12 @@ def match_by_attribute(attribute: str, content: str, database: Dict) -> List[Dic
return libs


def download_database() -> Optional[Dict[str, Any]]:
def download_database() -> Optional[str]:
url = "https://raw.githubusercontent.com/RetireJS/retire.js/master/repository/jsrepository-master.json"
response = requests.get(url)
if response.status_code == 200:
logger.info("Request successful!")
return json.loads(response.text)
return response.text

if response.status_code == 404:
logger.error("Resource not found.")
Expand Down Expand Up @@ -129,19 +159,30 @@ def strip_irrelevant_data(retirejs_db: dict) -> dict:

@surfactant.plugin.hookimpl
def update_db() -> str:
    """Refresh the local javascript library pattern database from retire.js.

    Retrieves the javascript library CVE database used by retire.js
    (https://github.com/RetireJS/retire.js/blob/master/repository/jsrepository-master.json)
    and hashes the raw download. If that hash equals the one recorded in the
    version TOML file, nothing is written. Otherwise the download is reduced
    with ``strip_irrelevant_data``, written to ``js_library_patterns.json`` in
    the ``infoextractors`` data directory, and the new hash and download
    timestamp are recorded.

    Returns:
        A status message: "Update complete.", "No update occurred. Database is
        up-to-date." when the hash is unchanged, or "No update occurred." when
        the download returned nothing.
    """
    raw_data = download_database()
    if raw_data is None:
        return "No update occurred."

    # Hash the raw text (before stripping) so an unchanged upstream file is
    # detected without rewriting anything.
    js_db_manager.new_hash = calculate_hash(raw_data)
    current_data = load_hash_and_timestamp(
        js_db_manager.database_version_file_path,
        js_db_manager.pattern_key,
        js_db_manager.pattern_file,
    )
    if current_data and js_db_manager.new_hash == current_data.get("hash"):
        return "No update occurred. Database is up-to-date."

    retirejs = json.loads(raw_data)
    cleaned = strip_irrelevant_data(retirejs)
    js_db_manager.download_timestamp = datetime.now(timezone.utc)

    path = ConfigManager().get_data_dir_path() / "infoextractors"
    path.mkdir(parents=True, exist_ok=True)
    json_file_path = path / "js_library_patterns.json"
    with open(json_file_path, "w") as f:
        json.dump(cleaned, f, indent=4)

    # Record the hash and timestamp only after the pattern file was written.
    save_hash_and_timestamp(
        js_db_manager.database_version_file_path, js_db_manager.pattern_info
    )
    return "Update complete."

Expand Down

0 comments on commit a10799e

Please sign in to comment.