Skip to content

Commit

Permalink
Heavy optimization on maintain.py:
Browse files Browse the repository at this point in the history
Making generic insert, drop, and update collection functions instead of separate function per operation.
Fully implementing update function, utilizing information from metadata to update from the last time the collection was updated.
  • Loading branch information
ausmaster committed Jun 25, 2024
1 parent 17b21a3 commit 6c02754
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 172 deletions.
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Below are the available options for the configuration file.
### maintain.py Options
```
usage: VulnVault Maintenance [-h] [-c CONFIG]
(--init | --cpefetch | --cvefetch | --cpematchfetch | --dropcpe | --dropcve | --dropcpematch | --updatecpe | --updatecve)
(--init | --fetchcpes | --fetchcves | --fetchcpematches | --dropcpes | --dropcves | --dropcpematches | --updatecpes | --updatecves | --updatecpematches)
[-p]
options:
Expand All @@ -61,18 +61,20 @@ options:
Operations:
Main Operations, must choose one.
--init fetch CPEs and CVEs from NVD
--cpefetch fetch CPEs from NVD
--cvefetch fetch CVEs from NVD
--cpematchfetch fetch CPE matches from NVD
--dropcpe purges all CPEs from CPE collection
--dropcve purges all CVEs from CVE collection
--dropcpematch purges all CPE matches from CPE match collection
--updatecpe updates the CPE collection
--updatecve updates the CVE collection
--init load all collections from NVD
--fetchcpes fetch CPEs from NVD
--fetchcves fetch CVEs from NVD
--fetchcpematches fetch CPE matches from NVD
--dropcpes purges all CPEs from CPE collection
--dropcves purges all CVEs from CVE collection
--dropcpematches purges all CPE matches from CPE match collection
--updatecpes updates the CPE collection
--updatecves updates the CVE collection
--updatecpematches updates the CPE match collection
Operation Augments:
-p, --purge purges selected collection before performing operation
-p, --purge purges selected collection before performing
operation. Only functional for fetch operations.
Specific NVD API arguments can be passed via a -- suffix and can be in
snake_case or camelCase. Example: --cvss_v3_severity HIGH or --cvssV3Severity
Expand Down
261 changes: 115 additions & 146 deletions maintain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
from __future__ import annotations
from datetime import datetime as Datetime
from typing import Callable

from pymongo import ReplaceOne

Expand All @@ -12,174 +13,140 @@
VaultArgumentParser,
VaultConfig,
VaultMongoClient,
s_print,
VAULT_MONGO
s_print
)
from vaultlib import BColors as C

NVD_API: NVDFetch | None = None
vault_mongo: VaultMongoClient
nvd_api: NVDFetch
arg_to_print_and_func: dict[str, tuple[str, Callable]]


def update_metadata(collection: str, datetime: Datetime) -> None:
class MetadataNotFoundException(Exception):
"""
Updates the metadata for the selected collection.
:return: None
Exception raised when a metadata entry cannot be found.
"""
VAULT_MONGO.meta.update_one(
{"collection": collection},
{"$set": {"updated": datetime}},
upsert=True
)


def drop_metadata() -> None:
def setup(config_path: str) -> tuple[VaultMongoClient, NVDFetch, dict[str, tuple[str, Callable]]]:
"""
Drops the metadata collection.
Returns values for maintain.py global variables.
:return: None
:param config_path: Configuration file path
:return: (VAULT_MONGO, NVDFetch, ARGS_TO_PRINT_AND_FUNC)
"""
VAULT_MONGO.db.drop_collection("metadata")
config = VaultConfig(config_path)
s_print("Connecting to MongoDB...")
client = VaultMongoClient(config).raise_if_not_connected()
C.print_success("Connected.")
api = NVDFetch(config)
return client, api, {
"cpes": ("CPEs", api.fetch_cpes),
"cves": ("CVEs", api.fetch_cves),
"cpematches": ("CPE matches", api.fetch_cpe_matches),
}


def initial_load(now: Datetime) -> None:
"""
Provides the initial procedure of loading all CVEs and CPEs from NVD.
:return: None
"""
drop_metadata()
insert_cpes(now, drop=True)
insert_cves(now, drop=True)
insert_cpe_matches(now, drop=True)


def insert_cpes(now: Datetime, drop: bool = False, **kwargs) -> None:
"""
Insert procudure for inserting CPEs.
:param now: DateTime now
:param drop: Drop collection before inserting CPEs.
:return: None
"""
if drop:
drop_cpes()
C.print_underline("Starting Collection and Insertion Procedure for CPEs")
cpes = NVD_API.fetch_cpes(**kwargs)
C.print_success("Collection complete.")
s_print("Inserting CPEs...")
VAULT_MONGO.cpes.insert_many(cpes)
C.print_success("Complete.")
s_print("Creating Indexes...")
VAULT_MONGO.cpes.create_index([("vendor", 1)])
VAULT_MONGO.cpes.create_index([("product", 1)])
VAULT_MONGO.cpes.create_index([("version", 1)])
C.print_success("Created.")
C.print_success("Collection and Insertion Complete.")
update_metadata("cpes", now)


def drop_cpes() -> None:
"""
Procedure for dropping all CPEs from CPE collection.
:return: None
"""
s_print("Dropping CPEs collection...")
VAULT_MONGO.db.drop_collection("cpes")
C.print_success("Dropped.")


def update_cpes(now: Datetime, **kwargs) -> None:
"""
Procedure for updating CPEs in CPE collection.
Provides the initial procedure of fetching and insering data into
all available collections from NVD.
:param now: DateTime now
:return: None
"""
C.print_underline("Updating CPEs collection")
counts = VAULT_MONGO.cpes.bulk_write(
[ReplaceOne({"_id": cpe["_id"]}, cpe, upsert=True) for cpe in NVD_API.fetch_cpes(**kwargs)]
)
C.print_success(f"{f'{counts.inserted_count} CPEs Inserted. ' if counts.inserted_count else ''}"
f"{counts.modified_count} CPEs Updated.")
vault_mongo.db.drop_collection("metadata")
insert_collection("cpes", now, drop=True)
update_metadata("cpes", now)
insert_collection("cves", now, drop=True)
update_metadata("cves", now)
insert_collection("cpematches", now, drop=True)
update_metadata("cpematches", now)


def insert_cves(now: Datetime, drop: bool = False, **kwargs) -> None:
def insert_collection(coll: str, now: Datetime, drop: bool = False, **kwargs) -> None:
"""
Insert procudure for inserting CVEs.
Procedure for inserting API entries.
:param coll: Collection name
:param now: DateTime now
:param drop: Drop collection before inserting CVEs.
:param drop: Drop collection before inserting entries
:param kwargs: API keyward arguments
:return: None
"""
print_coll_str, api_call = arg_to_print_and_func[coll]
if drop:
drop_cves()
C.print_underline("Starting Collection and Insertion Procedure for CVEs")
cves = NVD_API.fetch_cves(**kwargs)
drop_collection(coll)
C.print_underline(f"Starting Collection and Insertion Procedure for {print_coll_str}")
results = api_call(**kwargs)
C.print_success("Collection complete.")
s_print("Inserting CVEs...")
VAULT_MONGO.cves.insert_many(cves)
s_print(f"Inserting {print_coll_str}...")
getattr(vault_mongo, coll).insert_many(results)
C.print_success("Collection and Insertion Complete.")
update_metadata("cves", now)
update_metadata(coll, now)


def drop_cves() -> None:
def drop_collection(coll: str) -> None:
"""
Procedure for dropping all CVEs from CVE collection.
Procedure for dropping a selected collection.
:param coll: Collection name
:return: None
"""
s_print("Dropping CVEs collection...")
VAULT_MONGO.db.drop_collection("cves")
print_coll_str, _ = arg_to_print_and_func[coll]
s_print(f"Dropping {print_coll_str} collection...")
vault_mongo.db.drop_collection(coll)
C.print_success("Dropped.")


def update_cves(now: Datetime, **kwargs) -> None:
def update_collection(coll: str, now: Datetime, **kwargs) -> None:
"""
Procedure for updating CPEs in CPE collection.
Procedure for updating collections.
:param coll: Collection name
:param now: DateTime now
:param kwargs: API keyward arguments
:return: None
"""
print_coll_str, api_call = arg_to_print_and_func[coll]
C.print_underline("Updating CVEs collection")
counts = VAULT_MONGO.cves.bulk_write(
[ReplaceOne({"_id": cve["_id"]}, cve, upsert=True) for cve in NVD_API.fetch_cves(**kwargs)]
)
C.print_success(f"{f'{counts.inserted_count} CVEs Inserted. ' if counts.inserted_count else ''}"
f"{counts.modified_count} CVEs Updated.")
update_metadata("cves", now)


def drop_cpe_matches() -> None:
"""
Procedure for dropping all CVEs from CVE collection.
metadata = vault_mongo.meta.find_one({"collection": coll})
if not metadata or not (last_updated := metadata.get("updated")):
raise MetadataNotFoundException(f"No {coll} metadata found")
results = [
ReplaceOne({"_id": x["_id"]}, x, upsert=True)
for x in api_call(
last_mod_start_date=last_updated.isoformat(),
last_mod_end_date=now.isoformat(),
**kwargs
)
]
if results:
counts = vault_mongo.cves.bulk_write(results)
C.print_success("\n".join((
f"{counts.upserted_count} {print_coll_str} Upserted."
if counts.upserted_count else "",
f"{counts.modified_count} {print_coll_str} Modified."
if counts.modified_count else "",
f"{counts.inserted_count} {print_coll_str} Inserted."
if counts.inserted_count else ""
)))
else:
C.print_fail(f"No {print_coll_str} to update.")
update_metadata(coll, now)

:return: None
"""
s_print("Dropping CVEs collection...")
VAULT_MONGO.db.drop_collection("cpematches")
C.print_success("Dropped.")


def insert_cpe_matches(now: Datetime, drop: bool = False, **kwargs) -> None:
def update_metadata(collection: str, datetime: Datetime) -> None:
"""
Insert procudure for inserting CPE matches.
Updates the metadata for the selected collection.
:param now: DateTime now
:param drop: Drop collection before inserting CVEs.
:return: None
"""
if drop:
drop_cpe_matches()
C.print_underline("Starting Collection and Insertion Procedure for CPE matches")
cpe_matches = NVD_API.fetch_cpe_matches(**kwargs)
C.print_success("Collection complete.")
s_print("Inserting CPE matches...")
VAULT_MONGO.cpematches.insert_many(cpe_matches)
C.print_success("Collection and Insertion Complete.")
update_metadata("cpematches", now)
vault_mongo.meta.update_one(
{"collection": collection},
{"$set": {"updated": datetime}},
upsert=True
)


if __name__ == "__main__":
Expand All @@ -191,50 +158,52 @@ def insert_cpe_matches(now: Datetime, drop: bool = False, **kwargs) -> None:
op_group = arg_parse.add_argument_group("Operations", "Main Operations, must choose one.")
op_select = op_group.add_mutually_exclusive_group(required=True)
op_select.add_argument("--init", action="store_true",
help="fetch CPEs and CVEs from NVD")
op_select.add_argument("--cpefetch", action="store_true",
help="load all collections from NVD")
op_select.add_argument("--fetchcpes", action="store_true",
help="fetch CPEs from NVD")
op_select.add_argument("--cvefetch", action="store_true",
op_select.add_argument("--fetchcves", action="store_true",
help="fetch CVEs from NVD")
op_select.add_argument("--cpematchfetch", action="store_true",
op_select.add_argument("--fetchcpematches", action="store_true",
help="fetch CPE matches from NVD")
op_select.add_argument("--dropcpe", action="store_true",
op_select.add_argument("--dropcpes", action="store_true",
help="purges all CPEs from CPE collection")
op_select.add_argument("--dropcve", action="store_true",
op_select.add_argument("--dropcves", action="store_true",
help="purges all CVEs from CVE collection")
op_select.add_argument("--dropcpematch", action="store_true",
op_select.add_argument("--dropcpematches", action="store_true",
help="purges all CPE matches from CPE match collection")
op_select.add_argument("--updatecpe", action="store_true",
op_select.add_argument("--updatecpes", action="store_true",
help="updates the CPE collection")
op_select.add_argument("--updatecve", action="store_true",
op_select.add_argument("--updatecves", action="store_true",
help="updates the CVE collection")
op_select.add_argument("--updatecpematches", action="store_true",
help="updates the CPE match collection")
op_augments = arg_parse.add_argument_group("Operation Augments")
op_augments.add_argument("-p", "--purge", action="store_true", default=False,
help="purges selected collection before performing operation")
help="purges selected collection before performing operation. "
"Only functional for fetch operations.")
args, api_options = arg_parse.parse_known_args()
api_options = {api_options[i][2:]: api_options[i + 1] for i in range(0, len(api_options), 2)}

config = VaultConfig(args.config)

s_print("Connecting to MongoDB...")
VAULT_MONGO = VaultMongoClient(config).raise_if_not_connected()
C.print_success("Connected.")
NVD_API = NVDFetch(config)
vault_mongo, nvd_api, arg_to_print_and_func = setup(args.config)

d_now = Datetime.now()
if args.init:
initial_load(d_now)
elif args.cpefetch:
insert_cpes(d_now, drop=args.purge, **api_options)
elif args.cvefetch:
insert_cves(d_now, drop=args.purge, **api_options)
elif args.cpematchfetch:
insert_cpe_matches(d_now, drop=args.purge, **api_options)
elif args.dropcpe:
drop_cpes()
elif args.dropcve:
drop_cves()
elif args.updatecpe:
update_cpes(d_now, **api_options)
elif args.updatecve:
update_cves(d_now, **api_options)
elif args.fetchcpes:
insert_collection("cpes", d_now, drop=args.purge, **api_options)
elif args.fetchcves:
insert_collection("cves", d_now, drop=args.purge, **api_options)
elif args.fetchcpematches:
insert_collection("cpematches", d_now, drop=args.purge, **api_options)
elif args.dropcpes:
drop_collection("cpes")
elif args.dropcves:
drop_collection("cves")
elif args.dropcpematches:
drop_collection("cpematches")
elif args.updatecpes:
update_collection("cpes", d_now, **api_options)
elif args.updatecves:
update_collection("cves", d_now, **api_options)
elif args.updatecpematches:
update_collection("cpematches", d_now, **api_options)
Loading

0 comments on commit 6c02754

Please sign in to comment.