diff --git a/README.md b/README.md index 99bbdca..b19c888 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ Below are the available options for the configuration file. - nvdCveChApi: NVD CVE Change History API Endpoint - nvdCpeApi: NVD CPE API Endpoint - nvdCpeMcApi: NVD CPE Match Criteria API -- apiKey: NVD API Key +- apiKey: NVD API Key as string OR fetch from environment variable by setting to {"env": } - mongoHost: MongoDB Docker Container Hostname - mongoPort: MongoDB Docker Container Port - connRetries: Connection Retry Limit diff --git a/maintain.py b/maintain.py index eea2f7b..3a78c8d 100644 --- a/maintain.py +++ b/maintain.py @@ -113,7 +113,7 @@ def update_collection(coll: str, now: Datetime, **kwargs) -> None: metadata = vault_mongo.meta.find_one({"collection": coll}) if not metadata or not (last_updated := metadata.get("updated")): raise MetadataNotFoundException(f"No {coll} metadata found") - results = [ + results: list[ReplaceOne] = [ ReplaceOne({"_id": x["_id"]}, x, upsert=True) for x in api_call( last_mod_start_date=last_updated.isoformat(), @@ -182,7 +182,10 @@ def update_metadata(collection: str, datetime: Datetime) -> None: help="purges selected collection before performing operation. " "Only functional for fetch operations.") args, api_options = arg_parse.parse_known_args() - api_options = {api_options[i][2:]: api_options[i + 1] for i in range(0, len(api_options), 2)} + api_options: dict[str, str] = { + api_options[i][2:]: api_options[i + 1] + for i in range(0, len(api_options), 2) + } vault_mongo, nvd_api, arg_to_print_and_func = setup(args.config) diff --git a/vaultlib/config.py b/vaultlib/config.py index eae41ea..91e9ec1 100644 --- a/vaultlib/config.py +++ b/vaultlib/config.py @@ -2,6 +2,7 @@ This is used to configure VulnVault based on a configuration file. """ from json import load +from os import getenv from pathlib import Path from .utils import camel_to_snake @@ -37,11 +38,11 @@ def __init__(self, config_path: str = "config.json") -> None: # Number of threads used to fetch API data self.fetch_threads: int = 3 # NLTK "Punkt" Pre-trained model URL - self.punkt_url = "punkt" + self.punkt_url: str = "punkt" # All Config values from config.json are converted from camelCase to snake_case # overrides instance variable if exists - if (config_path := Path(config_path)).exists(): + if (config_path := Path(config_path)).exists(): # type: ignore g_vars = dir(self) with open(config_path, "r", encoding="utf-8") as config_file: for config_key, config_value in load(config_file).items(): @@ -49,7 +50,14 @@ def __init__(self, config_path: str = "config.json") -> None: if config_key not in g_vars: continue - if (g_var_type := type(getattr(self, config_key))) is not str: + if config_key == "api_key" and isinstance(config_value, dict): + try: + self.api_key = getenv(config_value["env"]) # type: ignore + except KeyError as e: + raise ValueError( + "api_key JSON object does not have a \"env\" property" + ) from e + elif (g_var_type := type(getattr(self, config_key))) is not str: setattr(self, config_key, g_var_type(config_value)) else: setattr(self, config_key, config_value)