Skip to content

Commit

Permalink
All query.py functions now produce printable information.
Browse files Browse the repository at this point in the history
  • Loading branch information
ausmaster committed Jun 26, 2024
1 parent 8b2f024 commit 6d37976
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 83 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ Operations:
--cpeid CPEID print CPE information about a specific CPE by CPE ID
--cve CVE print CVE information about a specfic CVE by CVE ID
--cpe2cves CPE2CVES print all CVEs given a CPE name
--str2cpes STR2CPES find closest matching CPE given string in the form
'<VENDOR> <PRODUCT> <VERSION>'
--str2cpes STR2CPES prints closest matching CPE(s) given string in the
form '<VENDOR> <PRODUCT> <VERSION>'.
```

142 changes: 69 additions & 73 deletions query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
"""
from __future__ import annotations

from datetime import datetime as Datetime
from operator import itemgetter
from sys import stdout
from textwrap import wrap
from typing import Callable, Literal, Final, Generator
from typing import Callable, Literal, Generator

import nltk
from nltk.tokenize import word_tokenize
Expand All @@ -17,39 +15,7 @@
# pylint: disable=E0401,E0611
from vaultlib import VaultArgumentParser, VaultConfig, VaultMongoClient, s_print
from vaultlib import BColors as C
from vaultlib.api import CPESchema, CVESchema

DATETIME_FORMAT: Final[str] = "%m/%d/%Y %H:%M:%S"


def cve_to_str(cve: CVESchema) -> str:
"""
Creates a printable representation of a CVE document.
:param cve: CVE document from Mongo
:return: String representation of a CVE document
"""
if not cve:
return ""
# Python 3.8 - 3.11 do not allow backslashes inside f-string expression
description = "\n".join(wrap(cve['description']))
return f"""[{cve['_id']}]
Published: {Datetime.fromisoformat(cve['published']).strftime(DATETIME_FORMAT)}
Last Modified: {Datetime.fromisoformat(cve['last_modified']).strftime(DATETIME_FORMAT)}
Status: {cve['status']}
Description: {description}
"""


def gen_printable_cves(cve_cursor: Cursor[CVESchema]) -> Generator[str, None, None]:
"""
Generator function that creates a printable representation of each CVE
:param cve_cursor: Results cursor from Pymongo
:return: Yield string representation of CVE
"""
for cve in cve_cursor:
yield cve_to_str(cve)
from vaultlib.api import CPESchema, CVESchema, stringify_results, cpe_str, cve_str


class VaultQuery:
Expand All @@ -59,91 +25,97 @@ class VaultQuery:
def __init__(self, mongo_client: VaultMongoClient) -> None:
self.client = mongo_client

def q_cve_id(self, cve_id: str) -> None:
def p_cve_id(self, cve_id: str) -> None:
"""
Prints CVE information given CVE ID.
:param cve_id: The CVE ID
:return: None
"""
if cve := self.client.cves.find_one({"_id": cve_id}):
print(cve_to_str(cve))
print(cve_str(cve))
else:
print(f"{cve_id} not found.")

def q_cpe_id(self, cpe_id: str) -> CPESchema | None:
def p_cpe_id(self, cpe_id: str) -> None:
"""
Prints CPE information given CPE ID.
:param cpe_id: The CPE ID
:return: Dict of CPE.
"""
return self.client.cpes.find_one({"_id": cpe_id})
if cpe := self.client.cpes.find_one({"_id": cpe_id}):
print(cpe_str(cpe))
else:
print(f"{cpe_id} not found.")

def q_cpe_name(self, cpe_name: str) -> CPESchema | None:
def p_cpe_name(self, cpe_name: str) -> None:
"""
Prints CPE information given CPE Name.
:param cpe_name: The CPE Name
:return: Dict of CPE.
"""
return self.client.cpes.find_one({"cpe_name": cpe_name})
if cpe := self.client.cpes.find_one({"cpe_name": cpe_name}):
print(cpe_str(cpe))
else:
print(f"{cpe_name} not found.")

def q_cpe_matches(self, cpe_id: str) -> Cursor:
"""
Prints CPE matches information given CPE ID.
Queries CPE matches information given CPE ID.
:param cpe_id: The CPE ID
:return: Dict of CPE.
"""
return self.client.cpematches.find({"matches": cpe_id})

def cpe_to_cves(self, cpe_id: str) -> None:
def q_cpe_to_cves(self, cpe_id: str) -> Cursor[CVESchema]:
"""
Prints all CVEs given a CPE ID.
Queries CPE matches information given CPE ID.
:param cpe_id: The CPE ID
:return: List of all CVEs
:return: Cursor for all CVEs
"""
for cve in gen_printable_cves(self.client.cves.find({
return self.client.cves.find({
"configurations.nodes.cpeMatch": {
"$elemMatch": {
"matchCriteriaId": {
"$in": [match["_id"] for match in self.q_cpe_matches(cpe_id)]
}
}
}
})):
print(cve)
})

def p_cpe_to_cves(self, cpe_id: str) -> None:
"""
Prints all CVEs given a CPE ID.
:param cpe_id: The CPE ID
:return: List of all CVEs
"""
for str_cve in stringify_results(self.q_cpe_to_cves(cpe_id)):
print(str_cve)

def cpe_name_to_cves(self, cpe_name: str) -> None:
def p_cpename_to_cves(self, cpe_name: str) -> None:
"""
Returns all CVEs given a CPE Name.
Prints all CVEs given a CPE Name.
:param cpe_name: The CPE Name
:return: List of all CVEs
"""
cpe = self.q_cpe_name(cpe_name)
cpe = self.client.cpes.find_one({"cpe_name": cpe_name})
if not cpe:
return
for cve in gen_printable_cves(self.client.cves.find({
"configurations.nodes.cpeMatch": {
"$elemMatch": {
"matchCriteriaId": {
"$in": [match["_id"] for match in self.q_cpe_matches(cpe["_id"])]
}
}
}
})):
print(cve)
self.p_cpe_to_cves(cpe["_id"])

def ml_find_cpe(
self,
cpe_search_str: str,
frmt: Literal["Vpv", "pv"] = "Vpv",
threshold: float = 80.0,
limit: int = 10
) -> list[tuple[float, CPESchema]]:
) -> Generator[tuple[float, CPESchema], None, None]:
"""
Using Levenshtein Distance, find the most similar CPE(s)
given a string containing the Vendor, Product, and/or Version.
Expand All @@ -155,8 +127,8 @@ def ml_find_cpe(
V = Vendor, p = Product, v = Version. Defaults to "Vpv",
choices are "Vpv" and "pv".
:param threshold: Minimum WRatio score to be included in results.
:param limit: Maximum number of results to return.
:return: Sorted list of CPEs from highest WRatio score to lowest.
:param limit: Maximum number of results to return. Defaults to 10, set to -1 to return all.
:return: Generator that yields a sorted list of CPEs from highest WRatio score to lowest.
"""
tokens = word_tokenize(cpe_search_str.lower())
weights = {"vendor": 0.4, "product": 0.4, "version": 0.2}
Expand Down Expand Up @@ -195,7 +167,31 @@ def get_weighted_score(scores: list[float]) -> float:
if (score := get_weighted_score(match_scores)) > threshold:
matches.append((score, cpe))
matches.sort(key=itemgetter(0), reverse=True)
return matches[:limit]
for entry_num, match in enumerate(matches, 1):
if limit != -1 and entry_num > limit:
return
yield match

def p_ml_find_cpe(
self,
cpe_search_str: str,
frmt: Literal["Vpv", "pv"] = "Vpv",
threshold: float = 80.0,
limit: int = 10
) -> None:
"""
Prints results gathered from ml_find_cpe.
:param cpe_search_str: String to search for
:param frmt: The specific ordering of token elements in the string.
V = Vendor, p = Product, v = Version. Defaults to "Vpv",
choices are "Vpv" and "pv".
:param threshold: Minimum WRatio score to be included in results.
:param limit: Maximum number of results to return. Defaults to 10, set to -1 to return all.
:return: Sorted list of CPEs from highest WRatio score to lowest.
"""
for match_score, cpe in self.ml_find_cpe(cpe_search_str, frmt, threshold, limit):
print(f"[[Match Score {match_score}%]]\n{cpe_str(cpe)}")


if __name__ == '__main__':
Expand All @@ -211,8 +207,8 @@ def get_weighted_score(scores: list[float]) -> float:
op_select.add_argument("--cpe2cves",
help="print all CVEs given a CPE name")
op_select.add_argument("--str2cpes",
help="find closest matching CPE given "
"string in the form '<VENDOR> <PRODUCT> <VERSION>'")
help="prints closest matching CPE(s) given "
"string in the form '<VENDOR> <PRODUCT> <VERSION>'.")
args = arg_parse.parse_args()

config = VaultConfig(args.config)
Expand All @@ -227,12 +223,12 @@ def get_weighted_score(scores: list[float]) -> float:
query = VaultQuery(mngo_client)

if args.cve:
query.q_cve_id(args.cve)
query.p_cve_id(args.cve)
elif args.cpe:
print(query.q_cpe_name(args.cpe))
query.p_cpe_name(args.cpe)
elif args.cpeid:
print(query.q_cpe_id(args.cpeid))
query.p_cpe_id(args.cpeid)
elif args.cpe2cves:
query.cpe_name_to_cves(args.cpe2cves)
query.p_cpename_to_cves(args.cpe2cves)
elif args.str2cpes:
print(query.ml_find_cpe(args.str2cpes))
query.p_ml_find_cpe(args.str2cpes)
87 changes: 79 additions & 8 deletions vaultlib/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,47 @@
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from copy import deepcopy
from datetime import datetime as Datetime
from functools import partial
from re import split
from textwrap import wrap
from threading import Lock
from time import time, sleep
from typing import Any, Callable, TypedDict
from typing import Any, Callable, TypedDict, Final, Generator

from pymongo.cursor import Cursor
from requests import get, HTTPError, PreparedRequest, Response, Session

from .config import VaultConfig
from .utils import BColors as C
from .utils import camel_to_snake, snake_to_camel, int_to_ordinal, s_print

DATETIME_FORMAT: Final[str] = "%m/%d/%Y %H:%M:%S"


class CVESchema(TypedDict):
"""
Schema for CVE records in the cves MongoDB collection.
"""
_id: str
description: str
published: str
last_modified: str
status: str
metrics_v20: dict[str, dict[str, Any]]
metrics_v30: dict[str, dict[str, Any]]
metrics_v31: dict[str, dict[str, Any]]
metrics_v40: dict[str, dict[str, Any]]
metrics_v20: dict[str, dict[str, str | float]]
metrics_v30: dict[str, dict[str, str | float]]
metrics_v31: dict[str, dict[str, str | float]]
metrics_v40: dict[str, dict[str, str | float]]
configurations: list[dict[str, Any]]
references: list[dict[str, Any]]
cwes: list[dict[str, Any]]
references: list[dict[str, str | list[str]]]
cwes: dict[str, str]


class CPESchema(TypedDict, total=False):
"""
Schema for CPE records in the cpes MongoDB collection.
"""
_id: str
cpe_name: str
created: str
last_modified: str
Expand Down Expand Up @@ -421,7 +428,7 @@ def __init__( # pylint: disable=R0913
self.results: list[dict[str, Any]] = []
self.max_workers = max_workers
self.progress_callback = progress_callback
self.last_call_time = 0
self.last_call_time = 0.0
self.total_calls = 0
self.completed_calls = 0

Expand Down Expand Up @@ -479,3 +486,67 @@ def run(self) -> list[dict[str, Any]]:
for future in as_completed(future_to_call):
future.result() # Ensure that all futures are processed
return self.results


def cve_str(cve: CVESchema) -> str:
"""
Creates a printable representation of a CVE document.
:param cve: CVE document from Mongo
:return: String representation of a CVE document
"""
if not cve:
return ""
# Python 3.8 - 3.11 do not allow backslashes inside f-string expression
description = "\n".join(wrap(cve['description']))
references = "\n".join(map(lambda x: f"{x['source']} - {x['url']}", cve["references"]))
return f"""[{cve["_id"]}]
Published: {Datetime.fromisoformat(cve["published"]).strftime(DATETIME_FORMAT)}
Last Modified: {Datetime.fromisoformat(cve["last_modified"]).strftime(DATETIME_FORMAT)}
Status: {cve["status"]}
Description: {description}
CWEs: {", ".join(cve["cwes"].values())}
References:
{references}
"""


def cpe_str(cpe: CPESchema) -> str:
"""
Creates a printable representation of a CPE document.
:param cpe: CPE document from Mongo
:return: String representation of a CPE document
"""
return f"""[{cpe["cpe_name"]}]
ID: {cpe["_id"]}
Title: {cpe["title"]}
Created: {Datetime.fromisoformat(cpe["created"]).strftime(DATETIME_FORMAT)}
Last Modified: {Datetime.fromisoformat(cpe["last_modified"]).strftime(DATETIME_FORMAT)}
Deprecated: {cpe["deprecated"]}
Part: {cpe["part"]}
Vendor: {cpe["vendor"]}
Product: {cpe["product"]}
Version: {cpe["version"]}
Update: {cpe["update"]}
Edition: {cpe["edition"]}
Language: {cpe["language"]}
SW Edition: {cpe["sw_edition"]}
Target SW: {cpe["target_sw"]}
Target HW: {cpe["target_hw"]}
Other: {cpe["other"]}
"""


def stringify_results(cursor: Cursor[CVESchema] | Cursor[CPESchema]) -> Generator[str, None, None]:
"""
Generator function that creates a printable representation of each CVE.
:param cursor: Results cursor from Pymongo
:return: Yield string representation of CVE
"""
type_func = Callable[[CVESchema], str] | Callable[[CPESchema], str] # noqa
func: type_func = cve_str if cursor.collection.name == "cves" \
else cpe_str
for schema in cursor:
yield func(schema) # type: ignore

0 comments on commit 6d37976

Please sign in to comment.