diff --git a/lists/append_to_list.py b/lists/append_to_list.py new file mode 100644 index 0000000..16ce708 --- /dev/null +++ b/lists/append_to_list.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Executable and reusable sample for appending to a list.""" + +import argparse +from typing import Sequence + +from common import chronicle_auth +from common import regions + +from google.auth.transport import requests + +from . import get_list + +CHRONICLE_API_BASE_URL = "https://backstory.googleapis.com" + + +# pylint: disable=bad-continuation +def append_to_list(http_session: requests.AuthorizedSession, + list_id: str, + content_lines: Sequence[str]) -> str: + """Append items to an existing reference list. + + Args: + http_session: Authorized session for HTTP requests. + list_id: ID of existing list. + content_lines: Iterable containing items to append to the existing list. + + Returns: + List update timestamp. + + Raises: + requests.exceptions.HTTPError: HTTP request resulted in an error + (response.status_code >= 400). + """ + url = f"{CHRONICLE_API_BASE_URL}/v2/lists" + + current_list = get_list.get_list(http_session, list_id) + seen = set(current_list) + deduplicated_list = [x for x in content_lines + if not (x in seen or seen.add(x))] + content_lines = current_list + deduplicated_list + + body = { + "name": list_id, + "lines": content_lines, + } + update_fields = ["list.lines"] + params = {"update_mask": ",".join(update_fields)} + + response = http_session.request("PATCH", url, params=params, json=body) + # Expected server response: + # { + # "name": "", + # "description": "", + # "createTime": "yyyy-mm-ddThh:mm:ss.ssssssZ", + # "lines": [ + # "", + # "", + # ... + # ], + # "contentType": "" + # } + + if response.status_code >= 400: + print(response.text) + response.raise_for_status() + return response.json()["createTime"] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + chronicle_auth.add_argument_credentials_file(parser) + regions.add_argument_region(parser) + parser.add_argument( + "-n", "--name", type=str, required=True, help="unique name for the list") + parser.add_argument( + "-f", + "--list_file", + type=argparse.FileType("r"), + required=True, + # File example: + # python3 -m lists.create_list -f + # STDIN example: + # cat | python3 -m lists.create_list -f - + help="path of a file containing the list content, or - for STDIN") + + args = parser.parse_args() + CHRONICLE_API_BASE_URL = regions.url(CHRONICLE_API_BASE_URL, args.region) + session = chronicle_auth.initialize_http_session(args.credentials_file) + + new_list_create_time = append_to_list( + session, args.name, + args.list_file.read().splitlines() + ) + + print(f"List successfully appended at {new_list_create_time}") diff --git a/lists/remove_from_list.py b/lists/remove_from_list.py new file mode 100644 index 0000000..5f65e0e --- /dev/null +++ b/lists/remove_from_list.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Executable and reusable sample for appending to an existing reference list.""" + +import argparse +from typing import Sequence + +from common import chronicle_auth +from common import regions + +from google.auth.transport import requests + +from . import get_list + +CHRONICLE_API_BASE_URL = "https://backstory.googleapis.com" + + +# pylint: disable=bad-continuation +def remove_from_list(http_session: requests.AuthorizedSession, + list_id: str, + content_lines: Sequence[str]) -> str: + """Remove items from an existing reference list. + + Args: + http_session: Authorized session for HTTP requests. + list_id: ID of existing list. + content_lines: Iterable containing items to remove from the existing list. + + Returns: + List update timestamp. + + Raises: + requests.exceptions.HTTPError: HTTP request resulted in an error + (response.status_code >= 400). + """ + url = f"{CHRONICLE_API_BASE_URL}/v2/lists" + + current_list = get_list.get_list(http_session, list_id) + to_remove = set(content_lines) + updated_list = [x for x in current_list if x not in to_remove] + + body = { + "name": list_id, + "lines": updated_list, + } + update_fields = ["list.lines"] + params = {"update_mask": ",".join(update_fields)} + + response = http_session.request("PATCH", url, params=params, json=body) + # Expected server response: + # { + # "name": "", + # "description": "", + # "createTime": "yyyy-mm-ddThh:mm:ss.ssssssZ", + # "lines": [ + # "", + # "", + # ... + # ], + # "contentType": "" + # } + + if response.status_code >= 400: + print(response.text) + response.raise_for_status() + return response.json()["createTime"] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + chronicle_auth.add_argument_credentials_file(parser) + regions.add_argument_region(parser) + parser.add_argument( + "-n", "--name", type=str, required=True, help="unique name for the list") + parser.add_argument( + "-f", + "--list_file", + type=argparse.FileType("r"), + required=True, + # File example: + # python3 -m lists.create_list -f + # STDIN example: + # cat | python3 -m lists.create_list -f - + help="path of a file containing the list content, or - for STDIN") + + args = parser.parse_args() + CHRONICLE_API_BASE_URL = regions.url(CHRONICLE_API_BASE_URL, args.region) + session = chronicle_auth.initialize_http_session(args.credentials_file) + new_list_create_time = remove_from_list(session, args.name, + args.list_file.read().splitlines()) + + print(f"Items successfully removed from list at {new_list_create_time}")