From 772e4e220147d0614286d39d0ad3c768a0423f16 Mon Sep 17 00:00:00 2001 From: Dan Dye Date: Fri, 12 Jul 2024 06:10:05 -0700 Subject: [PATCH] v1alpha sample to bulk update alerts PiperOrigin-RevId: 651751014 --- detect/v1alpha/bulk_update_alerts.py | 120 ++++++++++++++ detect/v1alpha/update_alert.py | 231 +++++++++++++++------------ 2 files changed, 248 insertions(+), 103 deletions(-) create mode 100644 detect/v1alpha/bulk_update_alerts.py diff --git a/detect/v1alpha/bulk_update_alerts.py b/detect/v1alpha/bulk_update_alerts.py new file mode 100644 index 0000000..705dd77 --- /dev/null +++ b/detect/v1alpha/bulk_update_alerts.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +r"""Executable and reusable sample for bulk updating alerts. + +The file provided to the --alert_ids_file parameter should have one alert + ID per line like so: +``` +de_ad9d2771-a567-49ee-6452-1b2db13c1d33 +de_3c2e2556-aba1-a253-7518-b4ddb666cc32 +``` +Usage: + python -m alerts.v1alpha.bulk_update_alerts \ + --project_id= \ + --project_instance= \ + --alert_ids_file= \ + --confidence_score= \ + --priority= \ + --reason= \ + --reputation= \ + --priority= \ + --status= \ + --verdict= \ + --risk_score= \ + --disregarded= \ + --severity= \ + --comment= \ + --root_cause= \ + --severity_display= + +# pylint: disable=line-too-long +API reference: + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyUpdateAlert + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Priority + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Reason + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Reputation + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Priority + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Status + https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Verdict +""" +# pylint: enable=line-too-long + +import json + +from common import chronicle_auth + +from . import update_alert + + +CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com" +SCOPES = [ + "https://www.googleapis.com/auth/cloud-platform", +] +DEFAULT_FEEDBACK = { + "comment": "automated cleanup", + "reason": "REASON_MAINTENANCE", + "reputation": "REPUTATION_UNSPECIFIED", + "root_cause": "Other", + "status": "CLOSED", + "verdict": "VERDICT_UNSPECIFIED", +} + + +if __name__ == "__main__": + parser = update_alert.get_update_parser() + parser.add_argument( + "--alert_ids_file", type=str, required=True, + help="File with one alert ID per line." + ) + parser.set_defaults( + comment=DEFAULT_FEEDBACK["comment"], + reason=DEFAULT_FEEDBACK["reason"], + reputation=DEFAULT_FEEDBACK["reputation"], + root_cause=DEFAULT_FEEDBACK["root_cause"], + status=DEFAULT_FEEDBACK["status"], + verdict=DEFAULT_FEEDBACK["verdict"], + ) + args = parser.parse_args() + + # raise error if required args are not present + update_alert.check_args(parser, args) + + auth_session = chronicle_auth.initialize_http_session( + args.credentials_file, + SCOPES, + ) + with open(args.alert_ids_file) as fh: + for alert_id in fh: + a_list = update_alert.update_alert( + auth_session, + args.project_id, + args.project_instance, + args.region, + alert_id.strip(), + args.confidence_score, + args.reason, + args.reputation, + args.priority, + args.status, + args.verdict, + args.risk_score, + args.disregarded, + args.severity, + args.comment, + args.root_cause, + ) + print(json.dumps(a_list, indent=2)) diff --git a/detect/v1alpha/update_alert.py b/detect/v1alpha/update_alert.py index 02a7741..f3a2dc6 100644 --- a/detect/v1alpha/update_alert.py +++ b/detect/v1alpha/update_alert.py @@ -96,6 +96,130 @@ ) +def get_update_parser(): + """Returns an argparse.ArgumentParser for the update_alert command.""" + parser = argparse.ArgumentParser() + chronicle_auth.add_argument_credentials_file(parser) + project_instance.add_argument_project_instance(parser) + project_id.add_argument_project_id(parser) + regions.add_argument_region(parser) + parser.add_argument( + "--comment", + type=str, + required=False, + default=None, + help="Analyst comment.", + ) + parser.add_argument( + "--confidence_score", + type=int, + required=False, + default=None, + help="confidence score [1-100] of the finding", + ) + parser.add_argument( + "--disregarded", + type=bool, + required=False, + default=None, + help="Analyst disregard (or un-disregard) the event", + ) + parser.add_argument( + "--priority", + choices=PRIORITY_ENUM, + required=False, + default=None, + help="alert priority.", + ) + parser.add_argument( + "--reason", + choices=REASON_ENUM, + required=False, + default=None, + help="reason for closing an Alert", + ) + parser.add_argument( + "--reputation", + choices=REPUTATION_ENUM, + required=False, + default=None, + help="A categorization of the finding as useful or not useful", + ) + parser.add_argument( + "--risk_score", + type=int, + required=False, + default=None, + help="risk score [0-100] of the finding", + ) + parser.add_argument( + "--root_cause", + type=str, + required=False, + default=None, + help="Alert root cause.", + ) + parser.add_argument( + "--status", + choices=STATUS_ENUM, + required=False, + default=None, + help="alert status", + ) + parser.add_argument( + "--verdict", + choices=VERDICT_ENUM, + required=False, + default=None, + help="a verdict on whether the finding reflects a security incident", + ) + parser.add_argument( + "--severity", + type=int, + required=False, + default=None, + help="severity score [0-100] of the finding", + ) + return parser + + +def check_args( + parser: argparse.ArgumentParser, + args_to_check: argparse.Namespace): + """Checks if at least one of the required arguments is provided. + + Args: + parser: instance of argparse.ArgumentParser (to raise error if needed). + args_to_check: instance of argparse.Namespace with the arguments to check. + """ + if not any( + [ + args_to_check.comment or args.comment == "", # pylint: disable=g-explicit-bool-comparison + args_to_check.disregarded, + args_to_check.priority, + args_to_check.reason, + args_to_check.reputation, + args_to_check.risk_score or args.risk_score == 0, + args_to_check.root_cause or args.root_cause == "", # pylint: disable=g-explicit-bool-comparison + args_to_check.severity or args.severity == 0, + args_to_check.status, + args_to_check.verdict, + ] + ): + parser.error("At least one of the arguments " + "--comment, " + "--disregarded, " + "--priority, " + "--reason, " + "--reputation, " + "--risk_score, " + "--root_cause, " + "--severity, " + "--status, " + "or --verdict " + "is required.") + + def update_alert( http_session: requests.AuthorizedSession, proj_id: str, @@ -189,114 +313,15 @@ def update_alert( if __name__ == "__main__": - parser = argparse.ArgumentParser() - chronicle_auth.add_argument_credentials_file(parser) - project_instance.add_argument_project_instance(parser) - project_id.add_argument_project_id(parser) - regions.add_argument_region(parser) - parser.add_argument( + main_parser = get_update_parser() + main_parser.add_argument( "--alert_id", type=str, required=True, help="identifier for the alert" ) - parser.add_argument( - "--confidence_score", - type=int, - required=False, - default=None, - help="confidence score [1-100] of the finding", - ) - parser.add_argument( - "--priority", - choices=PRIORITY_ENUM, - required=False, - default=None, - help="alert priority.", - ) - parser.add_argument( - "--reason", - choices=REASON_ENUM, - required=False, - default=None, - help="reason for closing an Alert", - ) - parser.add_argument( - "--reputation", - choices=REPUTATION_ENUM, - required=False, - default=None, - help="A categorization of the finding as useful or not useful", - ) - parser.add_argument( - "--status", - choices=STATUS_ENUM, - required=False, - default=None, - help="alert status", - ) - parser.add_argument( - "--verdict", - choices=VERDICT_ENUM, - required=False, - default=None, - help="a verdict on whether the finding reflects a security incident", - ) - parser.add_argument( - "--risk_score", - type=int, - required=False, - default=None, - help="risk score [0-100] of the finding", - ) - parser.add_argument( - "--disregarded", - type=bool, - required=False, - default=None, - help="Analyst disregard (or un-disregard) the event", - ) - parser.add_argument( - "--severity", - type=int, - required=False, - default=None, - help="severity score [0-100] of the finding", - ) - parser.add_argument( - "--comment", - type=str, - required=False, - default=None, - help="Analyst comment.", - ) - parser.add_argument( - "--root_cause", - type=str, - required=False, - default=None, - help="Alert root cause.", - ) - - args = parser.parse_args() + args = main_parser.parse_args() # Check if at least one of the specific arguments is provided - if not any( - [ - args.reason, - args.reputation, - args.priority, - args.status, - args.verdict, - args.risk_score or args.risk_score == 0, - args.disregarded, - args.severity or args.severity == 0, - args.comment or args.comment == "", # pylint: disable=g-explicit-bool-comparison - args.root_cause or args.root_cause == "", # pylint: disable=g-explicit-bool-comparison - ] - ): - parser.error("At least one of the arguments --reputation, --reason, " - "--priority, --status, --verdict, --risk_score, " - "--disregarded, --severity, --comment, " - "or --root_cause is required.") + check_args(main_parser, args) auth_session = chronicle_auth.initialize_http_session( args.credentials_file,