diff --git a/README.md b/README.md index f413db1..f8f87ec 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Below are the available options for the configuration file. ### maintain.py Options ``` usage: VulnVault Maintenance [-h] [-c CONFIG] - (--init | --cpefetch | --cvefetch | --cpematchfetch | --dropcpe | --dropcve | --dropcpematch | --updatecpe | --updatecve) + (--init | --fetchcpes | --fetchcves | --fetchcpematches | --dropcpes | --dropcves | --dropcpematches | --updatecpes | --updatecves | --updatecpematches) [-p] options: @@ -61,18 +61,20 @@ options: Operations: Main Operations, must choose one. - --init fetch CPEs and CVEs from NVD - --cpefetch fetch CPEs from NVD - --cvefetch fetch CVEs from NVD - --cpematchfetch fetch CPE matches from NVD - --dropcpe purges all CPEs from CPE collection - --dropcve purges all CVEs from CVE collection - --dropcpematch purges all CPE matches from CPE match collection - --updatecpe updates the CPE collection - --updatecve updates the CVE collection + --init load all collections from NVD + --fetchcpes fetch CPEs from NVD + --fetchcves fetch CVEs from NVD + --fetchcpematches fetch CPE matches from NVD + --dropcpes purges all CPEs from CPE collection + --dropcves purges all CVEs from CVE collection + --dropcpematches purges all CPE matches from CPE match collection + --updatecpes updates the CPE collection + --updatecves updates the CVE collection + --updatecpematches updates the CPE match collection Operation Augments: - -p, --purge purges selected collection before performing operation + -p, --purge purges selected collection before performing + operation. Only functional for fetch operations. Specific NVD API arguments can be passed via a -- suffix and can be in snake_case or camelCase. Example: --cvss_v3_severity HIGH or --cvssV3Severity diff --git a/maintain.py b/maintain.py index 308cd19..617a459 100644 --- a/maintain.py +++ b/maintain.py @@ -3,6 +3,7 @@ """ from __future__ import annotations from datetime import datetime as Datetime +from typing import Callable from pymongo import ReplaceOne @@ -12,174 +13,140 @@ VaultArgumentParser, VaultConfig, VaultMongoClient, - s_print, - VAULT_MONGO + s_print ) from vaultlib import BColors as C -NVD_API: NVDFetch | None = None +vault_mongo: VaultMongoClient +nvd_api: NVDFetch +arg_to_print_and_func: dict[str, tuple[str, Callable]] -def update_metadata(collection: str, datetime: Datetime) -> None: +class MetadataNotFoundException(Exception): """ - Updates the metadata for the selected collection. - - :return: None + Exception raised when a metadata entry cannot be found. """ - VAULT_MONGO.meta.update_one( - {"collection": collection}, - {"$set": {"updated": datetime}}, - upsert=True - ) -def drop_metadata() -> None: +def setup(config_path: str) -> tuple[VaultMongoClient, NVDFetch, dict[str, tuple[str, Callable]]]: """ - Drops the metadata collection. + Returns values for maintain.py global variables. - :return: None + :param config_path: Configuration file path + :return: (VAULT_MONGO, NVDFetch, ARGS_TO_PRINT_AND_FUNC) """ - VAULT_MONGO.db.drop_collection("metadata") + config = VaultConfig(config_path) + s_print("Connecting to MongoDB...") + client = VaultMongoClient(config).raise_if_not_connected() + C.print_success("Connected.") + api = NVDFetch(config) + return client, api, { + "cpes": ("CPEs", api.fetch_cpes), + "cves": ("CVEs", api.fetch_cves), + "cpematches": ("CPE matches", api.fetch_cpe_matches), + } def initial_load(now: Datetime) -> None: """ - Provides the initial procedure of loading all CVEs and CPEs from NVD. - - :return: None - """ - drop_metadata() - insert_cpes(now, drop=True) - insert_cves(now, drop=True) - insert_cpe_matches(now, drop=True) - - -def insert_cpes(now: Datetime, drop: bool = False, **kwargs) -> None: - """ - Insert procudure for inserting CPEs. - - :param now: DateTime now - :param drop: Drop collection before inserting CPEs. - :return: None - """ - if drop: - drop_cpes() - C.print_underline("Starting Collection and Insertion Procedure for CPEs") - cpes = NVD_API.fetch_cpes(**kwargs) - C.print_success("Collection complete.") - s_print("Inserting CPEs...") - VAULT_MONGO.cpes.insert_many(cpes) - C.print_success("Complete.") - s_print("Creating Indexes...") - VAULT_MONGO.cpes.create_index([("vendor", 1)]) - VAULT_MONGO.cpes.create_index([("product", 1)]) - VAULT_MONGO.cpes.create_index([("version", 1)]) - C.print_success("Created.") - C.print_success("Collection and Insertion Complete.") - update_metadata("cpes", now) - - -def drop_cpes() -> None: - """ - Procedure for dropping all CPEs from CPE collection. - - :return: None - """ - s_print("Dropping CPEs collection...") - VAULT_MONGO.db.drop_collection("cpes") - C.print_success("Dropped.") - - -def update_cpes(now: Datetime, **kwargs) -> None: - """ - Procedure for updating CPEs in CPE collection. + Provides the initial procedure of fetching and insering data into + all available collections from NVD. :param now: DateTime now :return: None """ - C.print_underline("Updating CPEs collection") - counts = VAULT_MONGO.cpes.bulk_write( - [ReplaceOne({"_id": cpe["_id"]}, cpe, upsert=True) for cpe in NVD_API.fetch_cpes(**kwargs)] - ) - C.print_success(f"{f'{counts.inserted_count} CPEs Inserted. ' if counts.inserted_count else ''}" - f"{counts.modified_count} CPEs Updated.") + vault_mongo.db.drop_collection("metadata") + insert_collection("cpes", now, drop=True) update_metadata("cpes", now) + insert_collection("cves", now, drop=True) + update_metadata("cves", now) + insert_collection("cpematches", now, drop=True) + update_metadata("cpematches", now) -def insert_cves(now: Datetime, drop: bool = False, **kwargs) -> None: +def insert_collection(coll: str, now: Datetime, drop: bool = False, **kwargs) -> None: """ - Insert procudure for inserting CVEs. + Procedure for inserting API entries. + :param coll: Collection name :param now: DateTime now - :param drop: Drop collection before inserting CVEs. + :param drop: Drop collection before inserting entries + :param kwargs: API keyward arguments :return: None """ + print_coll_str, api_call = arg_to_print_and_func[coll] if drop: - drop_cves() - C.print_underline("Starting Collection and Insertion Procedure for CVEs") - cves = NVD_API.fetch_cves(**kwargs) + drop_collection(coll) + C.print_underline(f"Starting Collection and Insertion Procedure for {print_coll_str}") + results = api_call(**kwargs) C.print_success("Collection complete.") - s_print("Inserting CVEs...") - VAULT_MONGO.cves.insert_many(cves) + s_print(f"Inserting {print_coll_str}...") + getattr(vault_mongo, coll).insert_many(results) C.print_success("Collection and Insertion Complete.") - update_metadata("cves", now) + update_metadata(coll, now) -def drop_cves() -> None: +def drop_collection(coll: str) -> None: """ - Procedure for dropping all CVEs from CVE collection. + Procedure for dropping a selected collection. + :param coll: Collection name :return: None """ - s_print("Dropping CVEs collection...") - VAULT_MONGO.db.drop_collection("cves") + print_coll_str, _ = arg_to_print_and_func[coll] + s_print(f"Dropping {print_coll_str} collection...") + vault_mongo.db.drop_collection(coll) C.print_success("Dropped.") -def update_cves(now: Datetime, **kwargs) -> None: +def update_collection(coll: str, now: Datetime, **kwargs) -> None: """ - Procedure for updating CPEs in CPE collection. + Procedure for updating collections. + :param coll: Collection name :param now: DateTime now + :param kwargs: API keyward arguments :return: None """ + print_coll_str, api_call = arg_to_print_and_func[coll] C.print_underline("Updating CVEs collection") - counts = VAULT_MONGO.cves.bulk_write( - [ReplaceOne({"_id": cve["_id"]}, cve, upsert=True) for cve in NVD_API.fetch_cves(**kwargs)] - ) - C.print_success(f"{f'{counts.inserted_count} CVEs Inserted. ' if counts.inserted_count else ''}" - f"{counts.modified_count} CVEs Updated.") - update_metadata("cves", now) - - -def drop_cpe_matches() -> None: - """ - Procedure for dropping all CVEs from CVE collection. + metadata = vault_mongo.meta.find_one({"collection": coll}) + if not metadata or not (last_updated := metadata.get("updated")): + raise MetadataNotFoundException(f"No {coll} metadata found") + results = [ + ReplaceOne({"_id": x["_id"]}, x, upsert=True) + for x in api_call( + last_mod_start_date=last_updated.isoformat(), + last_mod_end_date=now.isoformat(), + **kwargs + ) + ] + if results: + counts = vault_mongo.cves.bulk_write(results) + C.print_success("\n".join(( + f"{counts.upserted_count} {print_coll_str} Upserted." + if counts.upserted_count else "", + f"{counts.modified_count} {print_coll_str} Modified." + if counts.modified_count else "", + f"{counts.inserted_count} {print_coll_str} Inserted." + if counts.inserted_count else "" + ))) + else: + C.print_fail(f"No {print_coll_str} to update.") + update_metadata(coll, now) - :return: None - """ - s_print("Dropping CVEs collection...") - VAULT_MONGO.db.drop_collection("cpematches") - C.print_success("Dropped.") - -def insert_cpe_matches(now: Datetime, drop: bool = False, **kwargs) -> None: +def update_metadata(collection: str, datetime: Datetime) -> None: """ - Insert procudure for inserting CPE matches. + Updates the metadata for the selected collection. - :param now: DateTime now - :param drop: Drop collection before inserting CVEs. :return: None """ - if drop: - drop_cpe_matches() - C.print_underline("Starting Collection and Insertion Procedure for CPE matches") - cpe_matches = NVD_API.fetch_cpe_matches(**kwargs) - C.print_success("Collection complete.") - s_print("Inserting CPE matches...") - VAULT_MONGO.cpematches.insert_many(cpe_matches) - C.print_success("Collection and Insertion Complete.") - update_metadata("cpematches", now) + vault_mongo.meta.update_one( + {"collection": collection}, + {"$set": {"updated": datetime}}, + upsert=True + ) if __name__ == "__main__": @@ -191,50 +158,52 @@ def insert_cpe_matches(now: Datetime, drop: bool = False, **kwargs) -> None: op_group = arg_parse.add_argument_group("Operations", "Main Operations, must choose one.") op_select = op_group.add_mutually_exclusive_group(required=True) op_select.add_argument("--init", action="store_true", - help="fetch CPEs and CVEs from NVD") - op_select.add_argument("--cpefetch", action="store_true", + help="load all collections from NVD") + op_select.add_argument("--fetchcpes", action="store_true", help="fetch CPEs from NVD") - op_select.add_argument("--cvefetch", action="store_true", + op_select.add_argument("--fetchcves", action="store_true", help="fetch CVEs from NVD") - op_select.add_argument("--cpematchfetch", action="store_true", + op_select.add_argument("--fetchcpematches", action="store_true", help="fetch CPE matches from NVD") - op_select.add_argument("--dropcpe", action="store_true", + op_select.add_argument("--dropcpes", action="store_true", help="purges all CPEs from CPE collection") - op_select.add_argument("--dropcve", action="store_true", + op_select.add_argument("--dropcves", action="store_true", help="purges all CVEs from CVE collection") - op_select.add_argument("--dropcpematch", action="store_true", + op_select.add_argument("--dropcpematches", action="store_true", help="purges all CPE matches from CPE match collection") - op_select.add_argument("--updatecpe", action="store_true", + op_select.add_argument("--updatecpes", action="store_true", help="updates the CPE collection") - op_select.add_argument("--updatecve", action="store_true", + op_select.add_argument("--updatecves", action="store_true", help="updates the CVE collection") + op_select.add_argument("--updatecpematches", action="store_true", + help="updates the CPE match collection") op_augments = arg_parse.add_argument_group("Operation Augments") op_augments.add_argument("-p", "--purge", action="store_true", default=False, - help="purges selected collection before performing operation") + help="purges selected collection before performing operation. " + "Only functional for fetch operations.") args, api_options = arg_parse.parse_known_args() api_options = {api_options[i][2:]: api_options[i + 1] for i in range(0, len(api_options), 2)} - config = VaultConfig(args.config) - - s_print("Connecting to MongoDB...") - VAULT_MONGO = VaultMongoClient(config).raise_if_not_connected() - C.print_success("Connected.") - NVD_API = NVDFetch(config) + vault_mongo, nvd_api, arg_to_print_and_func = setup(args.config) d_now = Datetime.now() if args.init: initial_load(d_now) - elif args.cpefetch: - insert_cpes(d_now, drop=args.purge, **api_options) - elif args.cvefetch: - insert_cves(d_now, drop=args.purge, **api_options) - elif args.cpematchfetch: - insert_cpe_matches(d_now, drop=args.purge, **api_options) - elif args.dropcpe: - drop_cpes() - elif args.dropcve: - drop_cves() - elif args.updatecpe: - update_cpes(d_now, **api_options) - elif args.updatecve: - update_cves(d_now, **api_options) + elif args.fetchcpes: + insert_collection("cpes", d_now, drop=args.purge, **api_options) + elif args.fetchcves: + insert_collection("cves", d_now, drop=args.purge, **api_options) + elif args.fetchcpematches: + insert_collection("cpematches", d_now, drop=args.purge, **api_options) + elif args.dropcpes: + drop_collection("cpes") + elif args.dropcves: + drop_collection("cves") + elif args.dropcpematches: + drop_collection("cpematches") + elif args.updatecpes: + update_collection("cpes", d_now, **api_options) + elif args.updatecves: + update_collection("cves", d_now, **api_options) + elif args.updatecpematches: + update_collection("cpematches", d_now, **api_options) diff --git a/query.py b/query.py index ed30f4e..cb8d8e7 100644 --- a/query.py +++ b/query.py @@ -104,12 +104,15 @@ def ml_find_cpe( limit: int = 10 ) -> list[tuple[float, CPESchema]]: """ - Using fuzzy matching, find the most similar CPEs - given a string containing the Vendor, Product, and/or Version + Using Levenshtein Distance, find the most similar CPE(s) + given a string containing the Vendor, Product, and/or Version. + Takes a weighted score across vendor (40%), product (40%), + and version (20%) for overall similarity. :param cpe_search_str: String to search for :param frmt: The specific ordering of token elements in the string. - V = Vendor, p = Product, v = Version. + V = Vendor, p = Product, v = Version. Defaults to "Vpv", + choices are "Vpv" and "pv". :param threshold: Minimum WRatio score to be included in results. :param limit: Maximum number of results to return. :return: Sorted list of CPEs from highest WRatio score to lowest. diff --git a/vaultlib/__init__.py b/vaultlib/__init__.py index 7f699eb..561eb49 100644 --- a/vaultlib/__init__.py +++ b/vaultlib/__init__.py @@ -2,5 +2,5 @@ from .api import NVDFetch from .argparser import VaultArgumentParser from .config import VaultConfig -from .mongo import VaultMongoClient, VAULT_MONGO +from .mongo import VaultMongoClient from .utils import * diff --git a/vaultlib/api.py b/vaultlib/api.py index 7ccf50a..4f342b1 100644 --- a/vaultlib/api.py +++ b/vaultlib/api.py @@ -427,14 +427,15 @@ def __init__( # pylint: disable=R0913 api_resp = api_call(params) self.calls: list[partial] = [] - for curr_index in range( - api_resp["startIndex"] + api_resp["resultsPerPage"], - api_resp["totalResults"], - api_resp["resultsPerPage"] - ): - curr_params = deepcopy(params) - curr_params["startIndex"] = curr_index - self.calls.append(partial(api_call, curr_params)) + if int(api_resp.get("resultsPerPage")): + for curr_index in range( + api_resp["startIndex"] + api_resp["resultsPerPage"], + api_resp["totalResults"], + api_resp["resultsPerPage"] + ): + curr_params = deepcopy(params) + curr_params["startIndex"] = curr_index + self.calls.append(partial(api_call, curr_params)) self.total_calls = len(self.calls) self.results.extend(api_resp[data_key]) diff --git a/vaultlib/mongo.py b/vaultlib/mongo.py index 9b81878..d9c7128 100644 --- a/vaultlib/mongo.py +++ b/vaultlib/mongo.py @@ -42,6 +42,3 @@ def raise_if_not_connected( except ConnectionFailure as err: raise ConnectionFailure(exception_str) from err return self - - -VAULT_MONGO: VaultMongoClient | None = None