Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add blintdb to blint #124

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions blint/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from blint.config import BlintOptions
from blint.logger import LOG
from blint.lib.sbom import generate
from blint.lib.utils import gen_file_list
from blint.lib.utils import gen_file_list, blintdb_setup

BLINT_LOGO = """
██████╗ ██╗ ██╗███╗ ██╗████████╗
Expand Down Expand Up @@ -86,6 +86,20 @@ def build_parser():
dest="suggest_fuzzable",
help="Suggest functions and symbols for fuzzing based on a dictionary.",
)
# TODO: what to do should this be default?
parser.add_argument(
aryan-rajoria marked this conversation as resolved.
Show resolved Hide resolved
"--use-blintdb",
action="store_true",
default=True,
dest="use_blintdb",
help="Use blintdb for symbol resolution.",
)
# TODO: Please suggest if this location is good
parser.add_argument(
aryan-rajoria marked this conversation as resolved.
Show resolved Hide resolved
"--blintdb-home",
dest="blintdb_home",
help="Path to blintdb. Defaults to $HOME/blintdb.",
)
# sbom command
subparsers = parser.add_subparsers(
title="sub-commands",
Expand All @@ -104,6 +118,8 @@ def build_parser():
nargs="+",
help="Source directories, container images or binary files. Defaults to current directory.",
)


sbom_parser.add_argument(
"-o",
"--output-file",
Expand Down Expand Up @@ -186,7 +202,8 @@ def handle_args():
sbom_output=args.sbom_output,
src_dir_boms=args.src_dir_boms,
src_dir_image=args.src_dir_image,
stdout_mode=args.stdout_mode
stdout_mode=args.stdout_mode,
use_blintdb=args.use_blintdb
)
return blint_options

Expand All @@ -195,6 +212,8 @@ def main():
"""Main function of the blint tool"""
blint_options = handle_args()

blintdb_setup(blint_options)
aryan-rajoria marked this conversation as resolved.
Show resolved Hide resolved

# SBOM command
if blint_options.sbom_mode:
run_sbom_mode(blint_options)
Expand Down
4 changes: 4 additions & 0 deletions blint/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,8 @@ class BlintOptions:
deep_mode (bool): Flag indicating whether to perform deep analysis.
export_prefixes (list): Prefixes to determine exported symbols.
src_dir_boms (list): Directory containing pre-build and build sboms.
use_blintdb (bool): Flag indicating whether or not to utilize blint-db
blintdb_home (str): User supplied location for where to install and use blintdb
"""
deep_mode: bool = False
exports_prefix: List = field(default_factory=list)
Expand All @@ -1303,6 +1305,8 @@ class BlintOptions:
src_dir_boms: List = field(default_factory=list)
src_dir_image: List = field(default_factory=list)
stdout_mode: bool = False
use_blintdb: bool = False
blintdb_home: str = None
aryan-rajoria marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self):
if not self.src_dir_image and not (self.sbom_mode and self.src_dir_boms):
Expand Down
227 changes: 227 additions & 0 deletions blint/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# SPDX-FileCopyrightText: AppThreat <[email protected]>
#
# SPDX-License-Identifier: MIT

import concurrent
import concurrent.futures
import os
import sqlite3
from contextlib import closing

from blint.logger import LOG

# True only when the SCAN_DEBUG_MODE environment variable is exactly "debug".
DEBUG_MODE = os.getenv("SCAN_DEBUG_MODE") == "debug"

def return_batch_binaries_detected(symbols_list):
    """Score the binaries that export the symbols of one batch.

    Each export matched in the database carries a total weight of 1 that is
    split evenly between the binaries providing it (``1 / n`` for ``n``
    binaries), so a symbol exported by many binaries counts for less.

    Args:
        symbols_list (list): Export (symbol) names forming a single batch.

    Returns:
        dict: Mapping of binary name to its accumulated score for this batch.
            Binary ids that cannot be resolved to a name are left out.

    Raises:
        sqlite3.Error: Propagated from the database helpers; nothing is
            caught here.
    """
    scores = {}
    # The same binary id usually appears for many exports in a batch; remember
    # its name so every id costs a single database round trip.
    names_by_bid = {}
    for _eid, bids_csv in get_bid_using_ename_batch(symbols_list):
        bid_list = bids_csv.split(",")
        weight = 1 / len(bid_list)
        for bid in bid_list:
            if bid not in names_by_bid:
                names_by_bid[bid] = get_bname(bid)
            bname = names_by_bid[bid]
            if bname is None:
                # No name to credit; a None key would later surface as a
                # component name.
                continue
            scores[bname] = scores.get(bname, 0) + weight
    return scores


def get_bid_using_ename_batch(batch_export_name):
    """Return the binary ids linked to each export name of a batch.

    The export names are resolved to export ids (``rowid`` of ``Exports``)
    and the binary ids recorded for every export id in ``BinariesExports``
    are concatenated with ``group_concat``. The database path is read from
    the ``BLINTDB_LOC`` environment variable at call time.

    Args:
        batch_export_name (Iterable[str]): Export names to look up. One bound
            parameter is used per name, so the caller is responsible for
            keeping the batch within SQLite's bound-parameter limit.

    Returns:
        list: ``(eid, bids)`` tuples where ``bids`` is a comma separated
            string of binary ids. Empty when the batch is empty or nothing
            matches.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    export_names = list(batch_export_name)
    if not export_names:
        # Nothing to look up, so do not open the database at all.
        return []
    place_holders = ", ".join("?" * len(export_names))
    # Only the placeholder list is interpolated; the names themselves are
    # always passed as bound parameters.
    query = (
        "SELECT eid, group_concat(bid) from BinariesExports "
        f"where eid IN (SELECT rowid from Exports where infunc IN ({place_holders})) "
        "group by eid"
    )
    with closing(sqlite3.connect(os.getenv("BLINTDB_LOC"))) as connection:
        with closing(connection.cursor()) as c:
            c.execute(query, export_names)
            # Read-only query: there is nothing to commit.
            return c.fetchall()


def get_bname(bid):
    """Return the binary name stored for a binary id.

    The database path is read from the ``BLINTDB_LOC`` environment variable
    at call time and a fresh connection is opened for the lookup.

    Args:
        bid (int): The binary identifier to look up in ``Binaries``.

    Returns:
        str or None: The binary name, or None when no row matches.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    with closing(sqlite3.connect(os.getenv("BLINTDB_LOC"))) as connection:
        with closing(connection.cursor()) as c:
            c.execute("SELECT bname from Binaries where bid=?", (bid,))
            # Only the first match is used, so fetch a single row; the query
            # is read-only and needs no commit.
            row = c.fetchone()
    return row[0] if row is not None else None


def detect_binaries_utilized(sybmols_list, batch_len=999) -> set:
    """Detect the binaries that most likely provide the given symbols.

    Simple voting algorithm: every symbol found in the database gives a
    total vote of 1, shared evenly between the binaries exporting it. For
    example ``XRenderAddGlyphs`` is exported only by
    ``libXrender-0.9.10/libxrender.so``, so that binary receives a full
    point. Binaries whose accumulated score is strictly greater than 1 are
    reported; anything at or below 1 is treated as a false positive.

    Args:
        sybmols_list (list): Symbol dictionaries; only the ``"name"`` key of
            each entry is read.
        batch_len (int): Maximum number of names sent in one query. The
            default of 999 is the default bound-parameter limit of SQLite
            builds older than 3.32.

    Returns:
        set: Names of the detected binaries.

    Raises:
        KeyError: If a symbol entry has no ``"name"`` key.
        ValueError: If ``batch_len`` is 0 (it is used as the ``range`` step).
    """
    bin_detected_dict = {}

    # Query every distinct name once. A name repeated in two different
    # batches would otherwise vote twice while a repeat inside one batch
    # votes once, making the score depend on where the batch boundary falls.
    eid_list = list(dict.fromkeys(symbol["name"] for symbol in sybmols_list))
    eid_2d_list = [eid_list[i : i + batch_len] for i in range(0, len(eid_list), batch_len)]

    LOG.debug(f"Created {len(eid_2d_list)} processes created")
    if eid_2d_list:
        with concurrent.futures.ProcessPoolExecutor() as executor:
            pending = [
                executor.submit(return_batch_binaries_detected, it_eid_list)
                for it_eid_list in eid_2d_list
            ]
            # Merge the per-batch scores as soon as each batch finishes.
            for future in concurrent.futures.as_completed(pending):
                for fname, score in future.result().items():
                    bin_detected_dict[fname] = bin_detected_dict.get(fname, 0) + score
    # create a set() and remove false positives
    binary_detected = {bname for bname, score in bin_detected_dict.items() if score > 1}

    LOG.debug(f"Output for binary_detected: {len(binary_detected)}")
    return binary_detected

def get_export_id(export_name):
    """Return the export id (``rowid`` in ``Exports``) of an export name.

    The database path is read from the ``BLINTDB_LOC`` environment variable
    at call time and a fresh connection is opened for the lookup.

    Args:
        export_name (str): The export name matched against ``infunc``.

    Returns:
        int or None: The export id, or None when no row matches.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    with closing(sqlite3.connect(os.getenv("BLINTDB_LOC"))) as connection:
        with closing(connection.cursor()) as c:
            c.execute("SELECT rowid from Exports where infunc=?", (export_name,))
            # Only the first match is used; the read-only query needs no commit.
            row = c.fetchone()
    return row[0] if row is not None else None


def get_bid_using_fid(eid):
    """Return the binary ids recorded for an export id.

    The database path is read from the ``BLINTDB_LOC`` environment variable
    at call time and a fresh connection is opened for the lookup.

    Args:
        eid (int): The export identifier matched against ``BinariesExports``.

    Returns:
        list or None: The binary ids as a real list (safe to index, measure
            and iterate repeatedly), or None when no row matches.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    with closing(sqlite3.connect(os.getenv("BLINTDB_LOC"))) as connection:
        with closing(connection.cursor()) as c:
            c.execute("SELECT bid from BinariesExports where eid=?", (eid,))
            rows = c.fetchall()
    # Materialize the ids: a lazy map object would be exhausted after one pass
    # and does not match the documented list return type.
    return [row[0] for row in rows] if rows else None


def get_pname(bid):
    """Return the project name that owns a binary id.

    Two lookups are made on one connection: the project id of the binary in
    ``Binaries``, then the project name in ``Projects``. The database path
    is read from the ``BLINTDB_LOC`` environment variable at call time.

    Args:
        bid (int): The binary identifier to look up.

    Returns:
        str or None: The project name, or None when either the binary or
            its project is missing.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    with closing(sqlite3.connect(os.getenv("BLINTDB_LOC"))) as connection:
        with closing(connection.cursor()) as c:
            c.execute("SELECT pid from Binaries where bid=?", (bid,))
            binary_row = c.fetchone()
            if binary_row is None:
                return None
            c.execute("SELECT pname from Projects where pid=?", (binary_row[0],))
            # Both queries are read-only, so no commit is needed on any path.
            project_row = c.fetchone()
    return project_row[0] if project_row is not None else None


def get_pname_bname(bname):
    """Return the project name that owns a binary name.

    Two lookups are made on one connection: the project id of the first
    ``Binaries`` row with this name, then the project name in ``Projects``.
    The database path is read from the ``BLINTDB_LOC`` environment variable
    at call time.

    Args:
        bname (str): The binary name to look up.

    Returns:
        str or None: The project name, or None when either the binary or
            its project is missing.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    with closing(sqlite3.connect(os.getenv("BLINTDB_LOC"))) as connection:
        with closing(connection.cursor()) as c:
            c.execute("SELECT pid from Binaries where bname=?", (bname,))
            binary_row = c.fetchone()
            if binary_row is None:
                return None
            c.execute("SELECT pname from Projects where pid=?", (binary_row[0],))
            # Both queries are read-only, so no commit is needed on any path.
            project_row = c.fetchone()
    return project_row[0] if project_row is not None else None
25 changes: 25 additions & 0 deletions blint/lib/sbom.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
find_bom_files,
get_version,
)
from blint.db import detect_binaries_utilized


def default_parent(src_dirs: list[str], symbols_purl_map: dict = None) -> Component:
Expand Down Expand Up @@ -349,6 +350,7 @@ def process_exe_file(
value=", ".join([f["name"] for f in symbols_version]),
)
)

internal_functions = sorted(
{f["name"] for f in metadata.get("functions", []) if not any(f["name"].startswith(p) for p in export_prefixes)}
)
Expand Down Expand Up @@ -415,6 +417,7 @@ def process_exe_file(
value=SYMBOL_DELIMITER.join(dynamic_symbols),
)
)

exported_dynamic_symbols = sorted(
{f["name"] for f in metadata.get("dynamic_symbols", []) if any(f["name"].startswith(p) for p in export_prefixes)}
)
Expand All @@ -425,6 +428,28 @@ def process_exe_file(
value=SYMBOL_DELIMITER.join(exported_dynamic_symbols),
)
)

# If the USE_BLINTDB environment variable is "1" or "true", match components against the database
if os.environ.get("USE_BLINTDB", "") in ["1", "true"]:
# utilize voting logic along with blintdb
# we iterate through each symbol and try to find a match in the database

LOG.debug("utilizing blint_db")
symtab_symbols_list = metadata.get("symtab_symbols", [])
symtab_binaries_detected = detect_binaries_utilized(symtab_symbols_list)
dynamic_symbols_list = metadata.get("dynamic_symbols", [])
binaries_detected = detect_binaries_utilized(dynamic_symbols_list)

binaries_detected = binaries_detected.union(symtab_binaries_detected)
# adds the components in a similar way to dynamic entries
for binary in binaries_detected:
entry ={
"name": binary,
"tag": "NEEDED",
}
comp = create_dynamic_component(entry, exe)
lib_components.append(comp)

if not sbom.metadata.component.components:
sbom.metadata.component.components = []
# Automatically promote application dependencies to the parent
Expand Down
Loading
Loading