Skip to content

Commit

Permalink
Add options to merge command for controlling creation of system objec…
Browse files Browse the repository at this point in the history
…t and controlling the type of relationship created (#201)

* Add help message for merge command config_file option
* Add a system_uuid option for the merge command, and prevent adding randomly generated UUIDs without a system object
* Restructure the error message for an invalid uuid
* Fix bug with merge tests modifying data for subsequent runs
* Add tests for system relationship creation logic
  • Loading branch information
nightlark committed Jan 13, 2025
1 parent 0b92dc5 commit 0901d8f
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 48 deletions.
104 changes: 83 additions & 21 deletions surfactant/cmd/merge.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import uuid as uuid_module
from collections import deque
from typing import Dict, List
from typing import Dict, List, Tuple

import click
from loguru import logger

from surfactant.configmanager import ConfigManager
from surfactant.plugin.manager import find_io_plugin, get_plugin_manager
from surfactant.sbomtypes._relationship import Relationship
from surfactant.sbomtypes._sbom import SBOM
Expand All @@ -14,11 +15,18 @@

@click.argument("sbom_outfile", envvar="SBOM_OUTPUT", type=click.File("w"), required=True)
@click.argument("input_sboms", type=click.File("r"), required=True, nargs=-1)
@click.option("--config_file", type=click.File("r"), required=False)
@click.option(
"--config_file",
type=click.File("r"),
required=False,
help="Config file for controlling some aspects of the merged SBOM, primarily the creation of a new top-level system object (settings here will typically take precedence over command line options)",
)
@click.option(
"--output_format",
is_flag=False,
default="surfactant.output.cytrics_writer",
default=ConfigManager().get(
"core", "output_format", fallback="surfactant.output.cytrics_writer"
),
help="SBOM output format, options=surfactant.output.[cytrics|csv|spdx]_writer",
)
@click.option(
Expand All @@ -27,8 +35,36 @@
default="surfactant.input_readers.cytrics_reader",
help="SBOM input format, assumes that all input SBOMs being merged have the same format, options=surfactant.input_readers.[cytrics|cyclonedx|spdx]_reader",
)
@click.option(
"--system_uuid",
is_flag=False,
help="System UUID to use for relationships to a top-level system object",
)
@click.option(
"--system_relationship",
is_flag=False,
default="Contains",
show_default=True,
help="Relationship type between merged SBOM contents to a top-level system object",
)
@click.option(
"--add_system/--no_add_system",
default=False,
show_default=True,
help="Create a top-level system entry for tying together the merged SBOM components. When disabled, relationships will still be created to a provided system UUID",
)
@click.command("merge")
def merge_command(input_sboms, sbom_outfile, config_file, output_format, input_format):
# pylint: disable-next=too-many-positional-arguments
def merge_command(
input_sboms,
sbom_outfile,
config_file,
output_format,
input_format,
system_uuid,
system_relationship,
add_system,
):
"""Merge two or more INPUT_SBOMS together into SBOM_OUTFILE.
An optional CONFIG_FILE can be supplied to specify a root system entry
Expand All @@ -43,10 +79,19 @@ def merge_command(input_sboms, sbom_outfile, config_file, output_format, input_f
config = None
if config_file:
config = json.load(config_file)
merge(sboms, sbom_outfile, config, output_writer)


def merge(input_sboms, sbom_outfile, config, output_writer):
merge(sboms, sbom_outfile, config, output_writer, system_uuid, system_relationship, add_system)


# pylint: disable-next=too-many-positional-arguments
def merge(
input_sboms,
sbom_outfile,
config,
output_writer,
system_uuid=None,
system_relationship="Contains", # remember: keep in-sync with click arg default
add_system=False,
):
"""Merge two or more SBOMs."""
merged_sbom = input_sboms[0]
for sbom_m in input_sboms[1:]:
Expand All @@ -56,22 +101,28 @@ def merge(input_sboms, sbom_outfile, config, output_writer):
roots = get_roots_check_cycles(rel_graph)

# Check if provided UUID for a system object already exists to avoid creating a duplicate
add_system = True
if config and "system" in config:
if "UUID" in config["system"]:
for s in merged_sbom.systems:
if config["system"]["UUID"] == s.UUID:
add_system = False
break
# Even if not adding the system, create a dummy/placeholder with the UUID for creating relationships
system = create_system_object(merged_sbom, config)
system, using_random_uuid = create_system_object(merged_sbom, config, system_uuid)
if add_system:
merged_sbom.systems.append(system)

# Add a system relationship to each root software/systems entry identified
for r in roots:
merged_sbom.relationships.add(
Relationship(xUUID=system["UUID"], yUUID=r, relationship="Includes")
if not using_random_uuid or add_system:
if config and "systemRelationship" in config:
system_relationship = config["systemRelationship"]
for r in roots:
merged_sbom.relationships.add(
Relationship(xUUID=system.UUID, yUUID=r, relationship=system_relationship)
)
else:
logger.warning(
"No top-level system relationships added; enable the add system option to randomly generate a UUID, or specify a system UUID"
)

output_writer.write_sbom(merged_sbom, sbom_outfile)
Expand All @@ -92,10 +143,10 @@ def construct_relationship_graph(sbom: SBOM):
rel_graph[sw.UUID] = []
# iterate through all relationships, adding edges to the adjacency list
for rel in sbom.relationships:
# check case where xUUID doesn't exist (and error if yUUID doesn't exist) in the graph
if rel.xUUID not in rel_graph or rel.yUUID not in rel_graph:
logger.error("====ERROR xUUID or yUUID doesn't exist====")
logger.error(f"{rel = }")
logger.error(
f"Either the xUUID or yUUID for the relationship does not exist in the graph: {rel = }"
)
continue
# consider also including relationship type for the edge
# treat as directed graph, with inverted edges (pointing to parents) so dfs will eventually lead to the root parent node for a (sub)graph
Expand Down Expand Up @@ -151,23 +202,34 @@ def dfs(rel):
return roots


def create_system_object(sbom: SBOM, config=None) -> System:
def create_system_object(sbom: SBOM, config=None, system_uuid=None) -> Tuple[System, bool]:
"""Function to create an accurate system object
Positional arguments:
sbom (SBOM): The SBOM the system object is being created for.
config: The user specified config json (Optional).
Returns:
System: The created system object.
Tuple[System, bool]: The created system object and a boolean indicating if a random UUID was used.
"""

system = {}
using_random_uuid = False
if config and "system" in config:
system = config["system"]
# make sure the required fields are present and at least mostly valid
if ("UUID" not in system) or not sbom.is_valid_uuid4(system["UUID"]):

# system_uuid supplied via command line overrides config file UUID
if system_uuid:
system["UUID"] = system_uuid
elif "UUID" not in system:
# No UUID, generate a random one...
using_random_uuid = True
system["UUID"] = str(uuid_module.uuid4())
# check if the UUID appears valid based on the CyTRICS schema
elif not sbom.is_valid_uuid4(system["UUID"]):
invalid_uuid = system["UUID"]
logger.error(f"Invalid uuid4 given ({invalid_uuid}) for the system")

if "name" not in system:
system["name"] = ""
captureStart = -1
Expand All @@ -183,4 +245,4 @@ def create_system_object(sbom: SBOM, config=None) -> System:
system["captureStart"] = captureStart
if "captureEnd" not in system or not system["captureEnd"]:
system["captureEnd"] = captureEnd
return System(**system)
return System(**system), using_random_uuid
Loading

0 comments on commit 0901d8f

Please sign in to comment.