Skip to content

Commit

Permalink
v1alpha sample to bulk update alerts
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 651751014
  • Loading branch information
dandye authored and copybara-github committed Jul 15, 2024
1 parent a3fa5b1 commit 772e4e2
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 103 deletions.
120 changes: 120 additions & 0 deletions detect/v1alpha/bulk_update_alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for bulk updating alerts.
The file provided to the --alert_ids_file parameter should have one alert
ID per line like so:
```
de_ad9d2771-a567-49ee-6452-1b2db13c1d33
de_3c2e2556-aba1-a253-7518-b4ddb666cc32
```
Usage:
python -m alerts.v1alpha.bulk_update_alerts \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--alert_ids_file=<PATH_TO_FILE> \
--confidence_score=<CONFIDENCE_SCORE> \
--priority=<PRIORITY> \
--reason=<REASON> \
--reputation=<REPUTATION> \
--priority=<PRIORITY> \
--status=<STATUS> \
--verdict=<VERDICT> \
--risk_score=<RISK_SCORE> \
--disregarded=<DISREGARDED> \
--severity=<SEVERITY> \
--comment=<COMMENT> \
--root_cause=<ROOT_CAUSE> \
--severity_display=<SEVERITY_DISPLAY>
# pylint: disable=line-too-long
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyUpdateAlert
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Priority
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Reason
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Reputation
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Priority
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Status
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Verdict
"""
# pylint: enable=line-too-long

import json

from common import chronicle_auth

from . import update_alert


CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]
DEFAULT_FEEDBACK = {
"comment": "automated cleanup",
"reason": "REASON_MAINTENANCE",
"reputation": "REPUTATION_UNSPECIFIED",
"root_cause": "Other",
"status": "CLOSED",
"verdict": "VERDICT_UNSPECIFIED",
}


if __name__ == "__main__":
parser = update_alert.get_update_parser()
parser.add_argument(
"--alert_ids_file", type=str, required=True,
help="File with one alert ID per line."
)
parser.set_defaults(
comment=DEFAULT_FEEDBACK["comment"],
reason=DEFAULT_FEEDBACK["reason"],
reputation=DEFAULT_FEEDBACK["reputation"],
root_cause=DEFAULT_FEEDBACK["root_cause"],
status=DEFAULT_FEEDBACK["status"],
verdict=DEFAULT_FEEDBACK["verdict"],
)
args = parser.parse_args()

# raise error if required args are not present
update_alert.check_args(parser, args)

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
with open(args.alert_ids_file) as fh:
for alert_id in fh:
a_list = update_alert.update_alert(
auth_session,
args.project_id,
args.project_instance,
args.region,
alert_id.strip(),
args.confidence_score,
args.reason,
args.reputation,
args.priority,
args.status,
args.verdict,
args.risk_score,
args.disregarded,
args.severity,
args.comment,
args.root_cause,
)
print(json.dumps(a_list, indent=2))
231 changes: 128 additions & 103 deletions detect/v1alpha/update_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,130 @@
)


def get_update_parser():
"""Returns an argparse.ArgumentParser for the update_alert command."""
parser = argparse.ArgumentParser()
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
"--comment",
type=str,
required=False,
default=None,
help="Analyst comment.",
)
parser.add_argument(
"--confidence_score",
type=int,
required=False,
default=None,
help="confidence score [1-100] of the finding",
)
parser.add_argument(
"--disregarded",
type=bool,
required=False,
default=None,
help="Analyst disregard (or un-disregard) the event",
)
parser.add_argument(
"--priority",
choices=PRIORITY_ENUM,
required=False,
default=None,
help="alert priority.",
)
parser.add_argument(
"--reason",
choices=REASON_ENUM,
required=False,
default=None,
help="reason for closing an Alert",
)
parser.add_argument(
"--reputation",
choices=REPUTATION_ENUM,
required=False,
default=None,
help="A categorization of the finding as useful or not useful",
)
parser.add_argument(
"--risk_score",
type=int,
required=False,
default=None,
help="risk score [0-100] of the finding",
)
parser.add_argument(
"--root_cause",
type=str,
required=False,
default=None,
help="Alert root cause.",
)
parser.add_argument(
"--status",
choices=STATUS_ENUM,
required=False,
default=None,
help="alert status",
)
parser.add_argument(
"--verdict",
choices=VERDICT_ENUM,
required=False,
default=None,
help="a verdict on whether the finding reflects a security incident",
)
parser.add_argument(
"--severity",
type=int,
required=False,
default=None,
help="severity score [0-100] of the finding",
)
return parser


def check_args(
parser: argparse.ArgumentParser,
args_to_check: argparse.Namespace):
"""Checks if at least one of the required arguments is provided.
Args:
parser: instance of argparse.ArgumentParser (to raise error if needed).
args_to_check: instance of argparse.Namespace with the arguments to check.
"""
if not any(
[
args_to_check.comment or args.comment == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.disregarded,
args_to_check.priority,
args_to_check.reason,
args_to_check.reputation,
args_to_check.risk_score or args.risk_score == 0,
args_to_check.root_cause or args.root_cause == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.severity or args.severity == 0,
args_to_check.status,
args_to_check.verdict,
]
):
parser.error("At least one of the arguments "
"--comment, "
"--disregarded, "
"--priority, "
"--reason, "
"--reputation, "
"--risk_score, "
"--root_cause, "
"--severity, "
"--status, "
"or --verdict "
"is required.")


def update_alert(
http_session: requests.AuthorizedSession,
proj_id: str,
Expand Down Expand Up @@ -189,114 +313,15 @@ def update_alert(


if __name__ == "__main__":
parser = argparse.ArgumentParser()
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
main_parser = get_update_parser()
main_parser.add_argument(
"--alert_id", type=str, required=True,
help="identifier for the alert"
)
parser.add_argument(
"--confidence_score",
type=int,
required=False,
default=None,
help="confidence score [1-100] of the finding",
)
parser.add_argument(
"--priority",
choices=PRIORITY_ENUM,
required=False,
default=None,
help="alert priority.",
)
parser.add_argument(
"--reason",
choices=REASON_ENUM,
required=False,
default=None,
help="reason for closing an Alert",
)
parser.add_argument(
"--reputation",
choices=REPUTATION_ENUM,
required=False,
default=None,
help="A categorization of the finding as useful or not useful",
)
parser.add_argument(
"--status",
choices=STATUS_ENUM,
required=False,
default=None,
help="alert status",
)
parser.add_argument(
"--verdict",
choices=VERDICT_ENUM,
required=False,
default=None,
help="a verdict on whether the finding reflects a security incident",
)
parser.add_argument(
"--risk_score",
type=int,
required=False,
default=None,
help="risk score [0-100] of the finding",
)
parser.add_argument(
"--disregarded",
type=bool,
required=False,
default=None,
help="Analyst disregard (or un-disregard) the event",
)
parser.add_argument(
"--severity",
type=int,
required=False,
default=None,
help="severity score [0-100] of the finding",
)
parser.add_argument(
"--comment",
type=str,
required=False,
default=None,
help="Analyst comment.",
)
parser.add_argument(
"--root_cause",
type=str,
required=False,
default=None,
help="Alert root cause.",
)

args = parser.parse_args()
args = main_parser.parse_args()

# Check if at least one of the specific arguments is provided
if not any(
[
args.reason,
args.reputation,
args.priority,
args.status,
args.verdict,
args.risk_score or args.risk_score == 0,
args.disregarded,
args.severity or args.severity == 0,
args.comment or args.comment == "", # pylint: disable=g-explicit-bool-comparison
args.root_cause or args.root_cause == "", # pylint: disable=g-explicit-bool-comparison
]
):
parser.error("At least one of the arguments --reputation, --reason, "
"--priority, --status, --verdict, --risk_score, "
"--disregarded, --severity, --comment, "
"or --root_cause is required.")
check_args(main_parser, args)

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
Expand Down

0 comments on commit 772e4e2

Please sign in to comment.