Skip to content

Commit

Permalink
v1alpha samples for: Create Ref List, Rule, UDM Event
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 599597439
  • Loading branch information
dandye authored and copybara-github committed Jan 18, 2024
1 parent 9f21767 commit 9b8ff82
Show file tree
Hide file tree
Showing 3 changed files with 443 additions and 0 deletions.
126 changes: 126 additions & 0 deletions detect/v1alpha/create_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for creating a detection rule.
HTTP request
POST https://chronicle.googleapis.com/v1alpha/{parent}/rules
python3 -m detect.v1alpha.create_rule \
--project_guid $PROJECT_GUID \
--project_id $PROJECT_ID \
--rule_file=./ip_in_abuseipdb_blocklist.yaral
Requires the following IAM permission on the parent resource:
chronicle.rules.create
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules/create
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.rules#Rule
"""

import argparse
import json
from typing import Any, List, Mapping

from common import chronicle_auth
from common import project_guid
from common import project_id
from common import regions

from google.auth.transport import requests
from google.oauth2 import service_account


INGESTION_API_BASE_URL = "https://malachiteingestion-pa.googleapis.com"
AUTHORIZATION_SCOPES = ["https://www.googleapis.com/auth/malachite-ingestion"]

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def authorize(
credentials_file_path: str, scopes: List(str)
) -> requests.AuthorizedSession:
"""Obtains an authorized session using the provided credentials.
Args:
credentials_file_path: The service account credentials file path.
scopes: List of Chronicle API Scopes
Returns:
requests.AuthorizedSession: An authorized session for making API calls.
"""
credentials = service_account.Credentials.from_service_account_file(
credentials_file_path, scopes=scopes
)
return requests.AuthorizedSession(credentials)


def create_rule(
http_session: requests.AuthorizedSession, rule_file_path: str
) -> Mapping[str, Any]:
"""Creates a new detection rule to find matches in logs.
Args:
http_session: Authorized session for HTTP requests.
rule_file_path: Content of the new detection rule, used to evaluate logs.
Returns:
New detection rule.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
"""
parent = f"projects/{args.project_id}/locations/{args.region}/instances/{args.project_guid}"
url = f"https://{args.region}-chronicle.googleapis.com/v1alpha/{parent}/rules"

body = {
"name": "ip_in_abuseipdb_blocklist",
"text": rule_file_path.read(),
}
response = http_session.request("POST", url, json=body)
if response.status_code >= 400:
print(response.text)
response.raise_for_status()
return response.json()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_guid.add_argument_project_guid(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"-f",
"--rule_file",
type=argparse.FileType("r"),
required=True,
# File example: python3 create_rule.py -f <path>
# STDIN example: cat rule.txt | python3 create_rule.py -f -
help="path of a file with the desired rule's content, or - for STDIN",
)
args = parser.parse_args()

auth_session = authorize(args.credentials_file, SCOPES)
new_rule = create_rule(auth_session, args.rule_file)
print(json.dumps(new_rule, indent=2))
150 changes: 150 additions & 0 deletions ingestion/v1alpha/create_udm_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for ingesting events in UDM format.
WARNING: This script makes use of the Ingestion API v1alpha. v1alpha is currently only in
preview for certain Chronicle customers. Please reach out to your Chronicle
representative if you wish to use this API.
The Unified Data Model (UDM) is a way of representing events across all log
sources. See
https://cloud.google.com/chronicle/docs/unified-data-model/udm-field-list for a
description of UDM fields, and see
https://cloud.google.com/chronicle/docs/unified-data-model/format-events-as-udm
for how to describe a log as an event in UDM format.
This command accepts a path to a file (--json_events_file) that contains an
array of JSON formatted events in UDM format. See
./example_input/sample_udm_events.json for an example.
So, assuming you've created a credentials file at $HOME/.chronicle_credentials.json,
and you are using environment variables for your PROJECT_GUID and PROJECT_ID,
you can run this command using the sample input like so:
$ python3 -m ingestion.v1alpha.create_udm_events \
--project_guid $PROJECT_GUID \
--project_id $PROJECT_ID \
--json_events_file=./ingestion/example_input/sample_udm_events.json
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.events/import
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.events/import#EventsInlineSource
https://cloud.google.com/chronicle/docs/reference/udm-field-list
https://cloud.google.com/chronicle/docs/unified-data-model/udm-usage
"""

import argparse
import json
from typing import List

from common import chronicle_auth
from common import project_guid
from common import project_id
from common import regions

from google.auth.transport import requests
from google.oauth2 import service_account

INGESTION_API_BASE_URL = "https://malachiteingestion-pa.googleapis.com"
AUTHORIZATION_SCOPES = ["https://www.googleapis.com/auth/malachite-ingestion"]

SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]


def authorize(
credentials_file_path: str, scopes: List(str)
) -> requests.AuthorizedSession:
"""Obtains an authorized session using the provided credentials.
Args:
credentials_file_path: The service account credentials file path.
scopes: List of Chronicle API Scopes
Returns:
requests.AuthorizedSession: An authorized session for making API calls.
"""
credentials = service_account.Credentials.from_service_account_file(
credentials_file_path, scopes=scopes
)
return requests.AuthorizedSession(credentials)


def create_udm_events(
http_session: requests.AuthorizedSession, json_events: str
) -> None:
"""Sends a collection of UDM events to the Chronicle backend for ingestion.
A Unified Data Model (UDM) event is a structured representation of an event
regardless of the log source.
Args:
http_session: Authorized session for HTTP requests.
json_events: A collection of UDM events in (serialized) JSON format.
Raises:
requests.exceptions.HTTPError: HTTP request resulted in an error
(response.status_code >= 400).
Requires the following IAM permission on the parent resource:
chronicle.events.import
POST https://chronicle.googleapis.com/v1alpha/{parent}/events:import
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.events/import
"""
parent = f"projects/{args.project_id}/locations/{args.region}/instances/{args.project_guid}"
url = f"https://{args.region}-chronicle.googleapis.com/v1alpha/{parent}/events:import"

body = {
"inline_source": {
"events": [{
"name": "foo",
"udm": json.loads(json_events)[0],
}]
}
}

response = http_session.request("POST", url, json=body)
if response.status_code >= 400:
print(body)
print(response.text)
response.raise_for_status()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
# common
chronicle_auth.add_argument_credentials_file(parser)
project_guid.add_argument_project_guid(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
# local
parser.add_argument(
"--json_events_file",
type=argparse.FileType("r"),
required=True,
help=(
'path to a file (or "-" for STDIN) containing a list of UDM '
"events in json format"
),
)
args = parser.parse_args()

auth_session = authorize(args.credentials_file, SCOPES)
create_udm_events(auth_session, args.json_events_file.read())
Loading

0 comments on commit 9b8ff82

Please sign in to comment.