Skip to content

Commit

Permalink
Added single-org, SQS version of ASM Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
arng4108 committed Oct 29, 2024
1 parent 11bf7c9 commit 22c3de5
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 39 deletions.
86 changes: 81 additions & 5 deletions src/pe_asm/asm_sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A tool for gathering pe asm data.
Usage:
pe-asm-sync METHOD [--log-level=LEVEL] [--staging]
pe-asm-sync METHOD [--log-level=LEVEL] [--staging] [--org=ORG]
Options:
-h --help Show this message.
Expand All @@ -10,12 +10,18 @@
-l --log-level=LEVEL If specified, then the log level will be set to
the specified value. Valid values are "debug", "info",
"warning", "error", and "critical". [default: info]
-o --org=ORG The cyhy_db_name of the single organization to collect data for.
This option is only used for the SQS version of the ASM Sync.
Org name must match the ID in the cyhy-db. E.g. DHS,DHS_ICE,DOC.
[default: all]
-s --staging Run on the staging database. Otherwise will run on a local copy.
"""

# Standard Python Libraries
from datetime import timedelta
import logging
import sys
import time
from typing import Any, Dict

# Third-Party Libraries
Expand All @@ -36,6 +42,13 @@
identify_sub_changes,
pe_db_connect,
pe_db_staging_connect,
# SQS version imports
sqs_query_org,
sqs_identify_cidr_changes,
sqs_identify_ip_changes,
sqs_identify_sub_changes,
sqs_identify_ip_sub_changes,
sqs_identified_sub_domains,
)
from .helpers.enumerate_subs_from_root import get_subdomains
from .helpers.fill_cidrs_from_cyhy_assets import fill_cidrs
Expand All @@ -57,7 +70,7 @@
LOGGER = logging.getLogger(__name__)


def run_asm_sync(staging, method):
def run_asm_sync(staging, method, org):
"""Collect and sync ASM data."""
if method == "asm":
# Run function to fetch and store all CyHy assets in the P&E database
Expand All @@ -67,7 +80,7 @@ def run_asm_sync(staging, method):

# Fill the P&E CIDRs table from CyHy assets
LOGGER.info("Filling CIDRs.")
fill_cidrs("all_orgs", staging)
fill_cidrs(staging, "all_orgs")
LOGGER.info("Finished.")

# Identify which CIDRs are current
Expand Down Expand Up @@ -123,6 +136,70 @@ def run_asm_sync(staging, method):
dedupe(staging)
LOGGER.info("Finished.")

if method == "asm-sqs":
# SQS version of the ASM Sync (single org)
LOGGER.info(f"--- SQS ASM Sync Process Starting for {org} ---")
sqs_asm_start = time.time()

# --- Local Portion of ASM Sync ---
# *** Warning: The local portion of the ASM Sync process needs
# to be run locally on a Macbook before the following code can
# run. A dedicated python script is available for this
# "local step" of the ASM Sync

# Retrieve additional info for the specified org
org_df = sqs_query_org(staging, org)
org_uid = org_df["organizations_uid"][0]

# Fill the cidrs table with new data from the cyhy_db_assets
LOGGER.info("Filling the CIDRs table using the retrieved CyHy assets...")
fill_cidrs(staging, org_df)
LOGGER.info("Finished filling the CIDRs table using the retrieved CyHy assets")

# Identify which CIDRs are current
LOGGER.info("Identifying CIDR changes...")
sqs_identify_cidr_changes(staging, org_uid)
LOGGER.info("Finished identifying CIDR changes")

# Enumerate subdomains from roots
LOGGER.info("Enumerating sub-domains from root domains...")
get_subdomains(staging, org_df)
LOGGER.info("Finished enumerating sub-domains from root domains")

# Enumerate subdomains from IPs
LOGGER.info("Linking sub-domains and ips using ips...")
connect_subs_from_ips(staging, org_df)
LOGGER.info("Finished linking sub-domains and ips using ips")

# Enumerate IPs from subdomains
LOGGER.info("Linking sub-domains and ips using sub-domains...")
connect_ips_from_subs(staging, org_df)
LOGGER.info("Finished linking sub-domains and ips using sub-domains")

# Identify which IPs, sub-domains, and connections are current
LOGGER.info("Identify IP changes...")
sqs_identify_ip_changes(staging, org_uid)
LOGGER.info("Finished identifying IP changes")
LOGGER.info("Identifying sub-domain changes...")
sqs_identify_sub_changes(staging, org_uid)
LOGGER.info("Finished identifying sub-domain changes")
LOGGER.info("Identifying IP sub-domain link changes...")
sqs_identify_ip_sub_changes(staging, org_uid)
LOGGER.info("Finished identifying IP sub-domain link changes")
LOGGER.info("Updating identified sub-domains...")
sqs_identified_sub_domains(staging, org_uid)
LOGGER.info("Finished updating identified sub-domains")

# Run shodan dedupe
LOGGER.info("Running Shodan dedupe...")
dedupe(staging, org_df)
LOGGER.info("Finished running Shodan dedupe")

# Log execution time and finish
sqs_asm_end = time.time()
LOGGER.info(f"SQS ASM Sync execution time for {org}: {str(timedelta(seconds=(sqs_asm_end - sqs_asm_start)))} (H:M:S)")
LOGGER.info(f"--- SQS ASM Sync Process Complete for {org} ---")

elif method == "scorecard":
LOGGER.info("STARTING")
get_cyhy_port_scans(staging)
Expand Down Expand Up @@ -176,7 +253,6 @@ def main():
datefmt="%m/%d/%Y %I:%M:%S",
level=log_level.upper(),
)
LOGGER.info("Starting ASM sync scripts")

# Check for the staging option
try:
Expand All @@ -186,7 +262,7 @@ def main():
staging = False

# Run ASM finder
run_asm_sync(staging, validated_args["METHOD"])
run_asm_sync(staging, validated_args["METHOD"], validated_args["--org"])

# Stop logging and clean up
logging.shutdown()
243 changes: 243 additions & 0 deletions src/pe_asm/data/cyhy_db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,3 +759,246 @@ def get_fceb_orgs(conn):
df = pd.read_sql(sql, conn)
fceb_list = list(df["cyhy_db_name"])
return fceb_list


# --- SQS ASM Sync Functions ---
def sqs_query_org(staging, cyhy_db_name):
    """Query additional info for the specified organization.

    Args:
        staging: If True, connect to the staging database; otherwise
            connect to the local copy.
        cyhy_db_name: The cyhy_db_name identifier of the organization
            (originates from the --org command-line option).

    Returns:
        pandas.DataFrame with columns organizations_uid, cyhy_db_name,
        name, and agency_type (empty if no match).
    """
    # Connect to database
    if staging:
        conn = pe_db_staging_connect()
    else:
        conn = pe_db_connect()
    LOGGER.info("Retrieving additional info for org: %s", cyhy_db_name)
    # Parameterized query: cyhy_db_name is user-controlled (CLI option),
    # so it must never be interpolated into the SQL string directly.
    sql = """
        SELECT
            organizations_uid, cyhy_db_name, name, agency_type
        FROM
            organizations o
        WHERE
            cyhy_db_name = %(org)s
    """
    df = pd.read_sql(sql, conn, params={"org": cyhy_db_name})
    conn.close()
    return df

def sqs_identify_cidr_changes(staging, org_id):
    """Identify CIDR changes, single organization.

    Marks a CIDR as current if it was seen within the last 3 days, and
    as not current otherwise, restricted to the given organization.

    Args:
        staging: If True, connect to the staging database; otherwise
            connect to the local copy.
        org_id: The organizations_uid of the organization to update.
    """
    # Connect to database
    if staging:
        conn = pe_db_staging_connect()
    else:
        conn = pe_db_connect()
    # Execute queries; org_id is passed as a bound parameter rather than
    # interpolated into the SQL string (avoids SQL injection).
    cursor = conn.cursor()
    LOGGER.info("Marking CIDRs as current if seen within the last 3 days")
    cursor.execute(
        """
        UPDATE cidrs
        SET current = True
        WHERE
            last_seen > (CURRENT_DATE - INTERVAL '3 days')
            AND
            organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    LOGGER.info("Marking CIDRs as not current if not seen within the last 3 days")
    # NOTE(review): unlike sqs_identify_ip_changes, rows with a NULL
    # last_seen are left untouched here — confirm whether that is intended.
    cursor.execute(
        """
        UPDATE cidrs
        SET current = False
        WHERE
            last_seen < (CURRENT_DATE - INTERVAL '3 days')
            AND
            organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    cursor.close()
    # Close database connection
    conn.close()

def sqs_query_roots(conn, org_id):
    """Query root_domains, single organization.

    Args:
        conn: An open database connection (caller owns and closes it).
        org_id: The organizations_uid of the organization to query.

    Returns:
        pandas.DataFrame with columns root_domain_uid and root_domain.
    """
    LOGGER.info("Retrieving root domains for this organization")
    # Parameterized query: never interpolate org_id into the SQL string.
    sql = """
        SELECT
            r.root_domain_uid,
            r.root_domain
        FROM
            root_domains r
        JOIN
            organizations o
        ON r.organizations_uid = o.organizations_uid
        WHERE
            o.organizations_uid = %(org_id)s
    """
    df = pd.read_sql(sql, conn, params={"org_id": str(org_id)})
    return df

def sqs_identify_ip_changes(staging, org_id):
    """Identify IP changes, single organization.

    Marks an IP as current if it was seen within the last 3 days, and as
    not current if it was last seen earlier than that (or never seen),
    restricted to the given organization.

    Args:
        staging: If True, connect to the staging database; otherwise
            connect to the local copy.
        org_id: The organizations_uid of the organization to update.
    """
    # Connect to database
    if staging:
        conn = pe_db_staging_connect()
    else:
        conn = pe_db_connect()
    # Execute queries; org_id is passed as a bound parameter rather than
    # interpolated into the SQL string (avoids SQL injection).
    cursor = conn.cursor()
    LOGGER.info("Marking IPs as current if seen within the last 3 days")
    cursor.execute(
        """
        UPDATE ips
        SET current = True
        WHERE
            last_seen > (CURRENT_DATE - INTERVAL '3 days')
            AND
            organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    LOGGER.info("Marking IPs as not current if not seen within the last 3 days")
    cursor.execute(
        """
        UPDATE ips
        SET current = False
        WHERE
            (last_seen < (CURRENT_DATE - INTERVAL '3 days') or last_seen isnull)
            AND
            organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    cursor.close()
    # Close database connection
    conn.close()

def sqs_identify_sub_changes(staging, org_id):
    """Identify sub-domain changes, single organization.

    Marks a sub-domain as current if it was seen within the last 3 days,
    and as not current if it was last seen earlier than that (or never
    seen), restricted to the given organization via the sub-domain's
    root domain.

    Args:
        staging: If True, connect to the staging database; otherwise
            connect to the local copy.
        org_id: The organizations_uid of the organization to update.
    """
    # Connect to database
    if staging:
        conn = pe_db_staging_connect()
    else:
        conn = pe_db_connect()
    # Execute queries; org_id is passed as a bound parameter rather than
    # interpolated into the SQL string (avoids SQL injection).
    cursor = conn.cursor()
    LOGGER.info("Marking subdomains as current if seen within the last 3 days")
    cursor.execute(
        """
        UPDATE
            sub_domains sd
        SET
            current = True
        FROM
            root_domains rd
        WHERE
            sd.root_domain_uid = rd.root_domain_uid
            AND
            last_seen > (CURRENT_DATE - INTERVAL '3 days')
            AND
            organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    LOGGER.info("Marking subdomains as not current if not seen within the last 3 days")
    cursor.execute(
        """
        UPDATE
            sub_domains sd
        SET
            current = False
        FROM
            root_domains rd
        WHERE
            sd.root_domain_uid = rd.root_domain_uid
            AND
            (last_seen < (CURRENT_DATE - INTERVAL '3 days') or last_seen isnull)
            AND
            organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    cursor.close()
    # Close database connection
    conn.close()

def sqs_identify_ip_sub_changes(staging, org_id):
    """Identify IP/Subs changes, single organization.

    Marks an ip-to-subdomain link as current if it was seen within the
    last 3 days, and as not current if it was last seen earlier than
    that (or never seen), restricted to the given organization via the
    linked IP's owning organization.

    Args:
        staging: If True, connect to the staging database; otherwise
            connect to the local copy.
        org_id: The organizations_uid of the organization to update.
    """
    # Connect to database
    if staging:
        conn = pe_db_staging_connect()
    else:
        conn = pe_db_connect()
    # Execute queries; org_id is passed as a bound parameter rather than
    # interpolated into the SQL string (avoids SQL injection).
    cursor = conn.cursor()
    LOGGER.info("Marking IPs-subs as current if seen within the last 3 days")
    cursor.execute(
        """
        UPDATE
            ips_subs
        SET
            current = True
        FROM
            ips
        WHERE
            ips_subs.ip_hash = ips.ip_hash
            AND
            ips_subs.last_seen > (CURRENT_DATE - INTERVAL '3 days')
            AND
            ips.organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    LOGGER.info("Marking IPs-subs as not current if not seen within the last 3 days")
    cursor.execute(
        """
        UPDATE
            ips_subs
        SET
            current = False
        FROM
            ips
        WHERE
            ips_subs.ip_hash = ips.ip_hash
            AND
            (ips_subs.last_seen < (CURRENT_DATE - INTERVAL '3 days') or ips_subs.last_seen isnull)
            AND
            ips.organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    cursor.close()
    # Close database connection
    conn.close()

def sqs_identified_sub_domains(staging, org_id):
    """Set sub-domains to identified, single organization.

    If a sub-domain's root domain has enumerate_subs=False, the
    sub-domain is flagged as "identified" (i.e. known but not actively
    enumerated), restricted to the given organization.

    Args:
        staging: If True, connect to the staging database; otherwise
            connect to the local copy.
        org_id: The organizations_uid of the organization to update.
    """
    # Connect to database
    if staging:
        conn = pe_db_staging_connect()
    else:
        conn = pe_db_connect()
    # If the sub's root-domain has enumerate=False, then "identified" is True.
    # org_id is passed as a bound parameter rather than interpolated into
    # the SQL string (avoids SQL injection).
    cursor = conn.cursor()
    LOGGER.info("Marking identified subdomains")
    cursor.execute(
        """
        UPDATE
            sub_domains sd
        SET
            identified = True
        FROM
            root_domains rd
        WHERE
            sd.root_domain_uid = rd.root_domain_uid
            AND
            rd.enumerate_subs = false
            AND
            rd.organizations_uid = %s
        """,
        (str(org_id),),
    )
    conn.commit()
    cursor.close()
    # Close database connection
    conn.close()
Loading

0 comments on commit 22c3de5

Please sign in to comment.