From 040bb05a58011e245a5227658fd6a29fc04487c2 Mon Sep 17 00:00:00 2001 From: Bodong Yang <86948717+Bodong-Yang@users.noreply.github.com> Date: Tue, 5 Mar 2024 14:57:18 +0900 Subject: [PATCH] refactor(v3.6.x): cleanup iot-logger related (#277) This PR removes everything related to aws iot-logger in otaclient package, a separated implementation is introduced in https://github.com/tier4/otaclient-iot-logging-server instead. Also, a new log tee logging handler is implemented in log_setting.py to replace the one in aws-iot-logger package. --- otaclient/app/log_setting.py | 69 +++++- otaclient/app/main.py | 2 +- otaclient/app/ota_client_stub.py | 2 +- otaclient/aws_iot_log_server/__init__.py | 18 -- otaclient/aws_iot_log_server/__main__.py | 77 ------- .../aws_iot_log_server/aws_iot_logger.py | 210 ------------------ otaclient/aws_iot_log_server/boto3_session.py | 137 ------------ otaclient/aws_iot_log_server/configs.py | 18 -- .../aws_iot_log_server/custom_http_handler.py | 89 -------- .../aws_iot_log_server/greengrass_config.py | 116 ---------- otaclient/aws_iot_log_server/http_server.py | 43 ---- .../aws_iot_log_server/log_proxy_server.py | 81 ------- otaclient/requirements.txt | 6 - tests/test_aws_iot_log_server/__init__.py | 15 -- tests/test_aws_iot_log_server/conftest.py | 25 --- .../data/greengrass/config.json | 29 --- .../data/greengrass/config.yaml | 16 -- .../greengrass/config_invalid_thingArn.json | 29 --- .../test_boto3_session.py | 56 ----- .../test_greengrass_config.py | 77 ------- 20 files changed, 65 insertions(+), 1050 deletions(-) delete mode 100644 otaclient/aws_iot_log_server/__init__.py delete mode 100644 otaclient/aws_iot_log_server/__main__.py delete mode 100644 otaclient/aws_iot_log_server/aws_iot_logger.py delete mode 100644 otaclient/aws_iot_log_server/boto3_session.py delete mode 100644 otaclient/aws_iot_log_server/configs.py delete mode 100644 otaclient/aws_iot_log_server/custom_http_handler.py delete mode 100644 otaclient/aws_iot_log_server/greengrass_config.py delete mode 100644 otaclient/aws_iot_log_server/http_server.py delete mode 100644 otaclient/aws_iot_log_server/log_proxy_server.py delete mode 100644 tests/test_aws_iot_log_server/__init__.py delete mode 100644 tests/test_aws_iot_log_server/conftest.py delete mode 100644 tests/test_aws_iot_log_server/data/greengrass/config.json delete mode 100644 tests/test_aws_iot_log_server/data/greengrass/config.yaml delete mode 100644 tests/test_aws_iot_log_server/data/greengrass/config_invalid_thingArn.json delete mode 100644 tests/test_aws_iot_log_server/test_boto3_session.py delete mode 100644 tests/test_aws_iot_log_server/test_greengrass_config.py diff --git a/otaclient/app/log_setting.py b/otaclient/app/log_setting.py index c5bcdbb98..ffe87492d 100644 --- a/otaclient/app/log_setting.py +++ b/otaclient/app/log_setting.py @@ -12,9 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import annotations + import logging import os +import time import yaml +from queue import Queue, Empty +from threading import Thread +from urllib.parse import urljoin + +import requests from otaclient import otaclient_package_name from .configs import config as cfg @@ -37,7 +46,53 @@ def get_logger(name: str, loglevel: int) -> logging.Logger: return logger -def configure_logging(loglevel: int, *, http_logging_url: str): +class _LogTeeHandler(logging.Handler): + """Implementation of teeing local logs to a remote otaclient-iot-logger server.""" + + def __init__( + self, + upload_interval: int = 30, + max_entries_per_upload: int = 512, + max_backlog: int = 2048, + ) -> None: + super().__init__() + self._queue: Queue[str] = Queue(maxsize=max_backlog) + self._max_entries_per_upload = max_entries_per_upload + self._upload_interval = upload_interval + + def emit(self, record: logging.LogRecord) -> None: + try: + self._queue.put_nowait(self.format(record)) + except Exception: + pass + + def start_upload_thread(self, endpoint_url: str): + _queue, _max_per_upload, _interval = ( + self._queue, + self._max_entries_per_upload, + self._upload_interval, + ) + + def _thread_main(): + _session = requests.Session() + + while True: + for _ in range(_max_per_upload): + try: + _entry = _queue.get_nowait() + _session.post(endpoint_url, data=_entry) + except Empty: + break + except Exception: + pass + time.sleep(_interval) + + _thread = Thread(target=_thread_main, daemon=True) + _thread.start() + return _thread + + +def configure_logging(loglevel: int, *, ecu_id: str): """Configure logging with http handler.""" # configure the root logger # NOTE: force to reload the basicConfig, this is for overriding setting @@ -49,15 +104,17 @@ def configure_logging(loglevel: int, *, http_logging_url: str): _otaclient_logger = logging.getLogger(otaclient_package_name) _otaclient_logger.setLevel(loglevel) - # if http_logging is enabled, attach the http handler to - # the otaclient package root logger - if http_logging_host := os.environ.get("HTTP_LOGGING_SERVER"): - from otaclient.aws_iot_log_server import CustomHttpHandler + if iot_logger_url := os.environ.get("HTTP_LOGGING_SERVER"): + iot_logger_url = f"{iot_logger_url.rstrip('/')}/" + log_upload_endpoint = urljoin(iot_logger_url, ecu_id) - ch = CustomHttpHandler(host=http_logging_host, url=http_logging_url) + ch = _LogTeeHandler() fmt = logging.Formatter(fmt=cfg.LOG_FORMAT) ch.setFormatter(fmt) + # star the logging thread + ch.start_upload_thread(log_upload_endpoint) + # NOTE: "otaclient" logger will be the root logger for all loggers name # starts with "otaclient.", and the settings will affect its child loggers. # For example, settings for "otaclient" logger will also be effective to diff --git a/otaclient/app/main.py b/otaclient/app/main.py index 1da711042..08296cfe1 100644 --- a/otaclient/app/main.py +++ b/otaclient/app/main.py @@ -27,7 +27,7 @@ from .ota_client_service import launch_otaclient_grpc_server # configure logging before any code being executed -configure_logging(loglevel=cfg.DEFAULT_LOG_LEVEL, http_logging_url=get_ecu_id()) +configure_logging(loglevel=cfg.DEFAULT_LOG_LEVEL, ecu_id=get_ecu_id()) logger = logging.getLogger(__name__) diff --git a/otaclient/app/ota_client_stub.py b/otaclient/app/ota_client_stub.py index d9affd1c6..cc1444ab9 100644 --- a/otaclient/app/ota_client_stub.py +++ b/otaclient/app/ota_client_stub.py @@ -90,7 +90,7 @@ def _subprocess_init(self): # to CRITICAL to filter out third_party libs' logging(requests, urllib3, etc.), # and then set the otaclient.ota_proxy logger to DEFAULT_LOG_LEVEL log_setting.configure_logging( - loglevel=logging.CRITICAL, http_logging_url=log_setting.get_ecu_id() + loglevel=logging.CRITICAL, ecu_id=log_setting.get_ecu_id() ) otaproxy_logger = logging.getLogger("otaclient.ota_proxy") otaproxy_logger.setLevel(cfg.DEFAULT_LOG_LEVEL) diff --git a/otaclient/aws_iot_log_server/__init__.py b/otaclient/aws_iot_log_server/__init__.py deleted file mode 100644 index b968ae6e9..000000000 --- a/otaclient/aws_iot_log_server/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from .custom_http_handler import CustomHttpHandler - -__all__ = ["CustomHttpHandler"] diff --git a/otaclient/aws_iot_log_server/__main__.py b/otaclient/aws_iot_log_server/__main__.py deleted file mode 100644 index 0885be0cb..000000000 --- a/otaclient/aws_iot_log_server/__main__.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from .log_proxy_server import launch_server - -if __name__ == "__main__": - import argparse - from .greengrass_config import GreengrassConfig - - parser = argparse.ArgumentParser( - prog="aws_iot_log_server", formatter_class=argparse.RawTextHelpFormatter - ) - parser.add_argument("--host", help="host name", default="localhost") - parser.add_argument("--port", help="port number", default="8080") - parser.add_argument("--aws_credential_provider_endpoint", required=True) - parser.add_argument("--aws_role_alias", required=True) - parser.add_argument("--aws_cloudwatch_log_group", required=True) - parser.add_argument( - "--greengrass_v2_config", - help="greengrass v2 config.yaml.\n" - "If this option is specified, it is evaluated prior to the following.", - ) - parser.add_argument( - "--greengrass_config", - help="greengrass config.json.\n" - "If this option is not specified, the following arguments are required:", - ) - parser.add_argument("--ca_cert_file") - parser.add_argument("--private_key_file") - parser.add_argument("--cert_file") - parser.add_argument("--region") - parser.add_argument("--thing_name") - args = parser.parse_args() - kwargs = dict( - host=args.host, - port=int(args.port), - aws_credential_provider_endpoint=args.aws_credential_provider_endpoint, - aws_role_alias=args.aws_role_alias, - aws_cloudwatch_log_group=args.aws_cloudwatch_log_group, - ) - if args.greengrass_config or args.greengrass_v2_config: - ggcfg = GreengrassConfig.parse_config( - args.greengrass_config, args.greengrass_v2_config - ) - kwargs.update( - dict( - ca_cert_file=ggcfg["ca_cert"], - private_key_file=ggcfg["private_key"], - cert_file=ggcfg["cert"], - region=ggcfg["region"], - thing_name=ggcfg["thing_name"], - ) - ) - else: - kwargs.update( - dict( - ca_cert_file=args.ca_cert_file, - private_key_file=args.private_key_file, - cert_file=args.cert_file, - region=args.region, - thing_name=args.thing_name, - ) - ) - - launch_server(**kwargs) diff --git a/otaclient/aws_iot_log_server/aws_iot_logger.py b/otaclient/aws_iot_log_server/aws_iot_logger.py deleted file mode 100644 index 081ca349c..000000000 --- a/otaclient/aws_iot_log_server/aws_iot_logger.py +++ /dev/null @@ -1,210 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import time -from retrying import retry -from botocore.exceptions import ClientError -from datetime import datetime -import logging -from queue import Queue, Empty -from concurrent.futures import ThreadPoolExecutor -from typing import TypedDict, List - -from .boto3_session import Boto3Session -from .configs import LOG_FORMAT - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -_sh = logging.StreamHandler() -fmt = logging.Formatter(fmt=LOG_FORMAT) -_sh.setFormatter(fmt) -logger.addHandler(_sh) - - -class LogMessage(TypedDict): - timestamp: int - message: str - - -class AwsIotLogger: - def __init__( - self, - aws_credential_provider_endpoint, - aws_role_alias, - aws_cloudwatch_log_group, - ca_cert_file, - private_key_file, - cert_file, - region, - thing_name, - interval=60, - ): - _boto3_session = Boto3Session( - config={ - "ca_cert": ca_cert_file, - "cert": cert_file, - "private_key": private_key_file, - "region": region, - "thing_name": thing_name, - }, - credential_provider_endpoint=aws_credential_provider_endpoint, - role_alias=aws_role_alias, - ) - _client = _boto3_session.get_session().client("logs") - - # create log group - try: - _client.create_log_group(logGroupName=aws_cloudwatch_log_group) - logger.info(f"{aws_cloudwatch_log_group=} has been created.") - except ( - _client.exceptions.OperationAbortedException, - _client.exceptions.ResourceAlreadyExistsException, - ): - pass - self._log_group_name = aws_cloudwatch_log_group - self._client = _client - self._thing_name = thing_name - self._sequence_tokens = {} - self._interval = interval - self._log_message_queue = Queue() - self._executor = ThreadPoolExecutor() - self._executor.submit(self._send_messages_thread) - - @retry(stop_max_attempt_number=5, wait_fixed=500) - def send_messages( - self, - log_stream_suffix: str, - message_list: List[LogMessage], - ): - log_stream_name = self._get_log_stream_name(log_stream_suffix) - sequence_token = self._sequence_tokens.get(log_stream_name) - _client = self._client - try: - log_event = { - "logGroupName": self._log_group_name, - "logStreamName": log_stream_name, - "logEvents": message_list, - } - if sequence_token: - log_event["sequenceToken"] = sequence_token - - response = _client.put_log_events(**log_event) - self._sequence_tokens[log_stream_name] = response.get("nextSequenceToken") - except ClientError as e: - if isinstance( - e, - ( - _client.exceptions.DataAlreadyAcceptedException, - _client.exceptions.InvalidSequenceTokenException, - ), - ): - next_expected_token = e.response["Error"]["Message"].rsplit(" ", 1)[-1] - # null as the next sequenceToken means don't include any - # sequenceToken at all, not that the token should be set to "null" - if next_expected_token == "null": - self._sequence_tokens[log_stream_name] = None - else: - self._sequence_tokens[log_stream_name] = next_expected_token - elif isinstance(e, _client.exceptions.ResourceNotFoundException): - try: - _client.create_log_stream( - logGroupName=self._log_group_name, - logStreamName=log_stream_name, - ) - logger.info(f"{log_stream_name=} has been created.") - except ( - _client.exceptions.OperationAbortedException, - _client.exceptions.ResourceAlreadyExistsException, - ): - self._sequence_tokens[log_stream_name] = None - raise - except Exception: - # put log and just ignore - logger.exception( - "put_log_events failure: " - f"log_group_name={self._log_group_name}, " - f"log_stream_name={log_stream_name}" - ) - - def put_message(self, log_stream_suffix: str, message: LogMessage): - data = {log_stream_suffix: message} - self._log_message_queue.put(data) - - def _send_messages_thread(self): - try: - while True: - # merge message - message_dict = {} - while True: - try: - q = self._log_message_queue - data = q.get(block=False) - log_stream_suffix = next(iter(data)) # first key - if log_stream_suffix not in message_dict: - message_dict[log_stream_suffix] = [] - message = data[log_stream_suffix] - message_dict[log_stream_suffix].append(message) - except Empty: - break - # send merged message - for k, v in message_dict.items(): - self.send_messages(k, v) - time.sleep(self._interval) - except Exception as e: - logger.exception(e) - - def _get_log_stream_name(self, log_stream_sufix): - fmt = "{strftime:%Y/%m/%d}".format(strftime=datetime.utcnow()) - return f"{fmt}/{self._thing_name}/{log_stream_sufix}" - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) - parser.add_argument("--aws_credential_provider_endpoint", required=True) - parser.add_argument("--aws_role_alias", required=True) - parser.add_argument("--aws_cloudwatch_log_group", required=True) - parser.add_argument("--ca_cert_file") - parser.add_argument("--private_key_file") - parser.add_argument("--cert_file") - parser.add_argument("--region") - parser.add_argument("--thing_name") - args = parser.parse_args() - - kwargs = dict( - aws_credential_provider_endpoint=args.aws_credential_provider_endpoint, - aws_role_alias=args.aws_role_alias, - aws_cloudwatch_log_group=args.aws_cloudwatch_log_group, - ca_cert_file=args.ca_cert_file, - private_key_file=args.private_key_file, - cert_file=args.cert_file, - region=args.region, - thing_name=args.thing_name, - interval=2, - ) - - aws_iot_logger = AwsIotLogger(**kwargs) - - def create_log_message(message: str): - return {"timestamp": int(time.time()) * 1000, "message": message} - - aws_iot_logger.send_messages("my_ecu_name4", [create_log_message("hello")]) - aws_iot_logger.send_messages("my_ecu_name4", [create_log_message("hello-x")]) - aws_iot_logger.send_messages("my_ecu_name4", [create_log_message("hello-xx")]) - - aws_iot_logger.put_message("my_ecu_name4", create_log_message("hello-1")) - aws_iot_logger.put_message("my_ecu_name4", create_log_message("hello-2")) - aws_iot_logger.put_message("my_ecu_name4", create_log_message("hello-3")) diff --git a/otaclient/aws_iot_log_server/boto3_session.py b/otaclient/aws_iot_log_server/boto3_session.py deleted file mode 100644 index 75fff42db..000000000 --- a/otaclient/aws_iot_log_server/boto3_session.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import requests -import pycurl -import json -import botocore.credentials -import botocore.session -import boto3 -import logging -import datetime -from pytz import utc - -from .configs import LOG_FORMAT - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -_sh = logging.StreamHandler() -fmt = logging.Formatter(fmt=LOG_FORMAT) -_sh.setFormatter(fmt) -logger.addHandler(_sh) - - -class Boto3Session: - def __init__( - self, - config: dict, - credential_provider_endpoint: str, - role_alias: str, - ): - cfg = config - - self._ca_cert = cfg.get("ca_cert") - self._cert = cfg.get("cert") - self._private_key = cfg.get("private_key") - self._region = cfg.get("region") - self._thing_name = cfg.get("thing_name") - - self._credential_provider_endpoint = credential_provider_endpoint - self._role_alias = role_alias - - # session is automatically refreshed - def get_session(self, session_duration: int = 0): - # ref: https://github.com/boto/botocore/blob/f1d41183e0fad31301ad7331a8962e3af6359a22/botocore/credentials.py#L368 - session_credentials = ( - botocore.credentials.RefreshableCredentials.create_from_metadata( - metadata=self._refresh(session_duration), - refresh_using=self._refresh, - method="sts-assume-role", - ) - ) - session = botocore.session.get_session() - session._credentials = session_credentials - session.set_config_variable("region", self._region) - - return boto3.Session(botocore_session=session) - - def _get_body(self, url, use_pycurl=False): - if use_pycurl: # pycurl implementation - headers = [f"x-amzn-iot-thingname:{self._thing_name}"] - connection = pycurl.Curl() - connection.setopt(connection.URL, url) - - if self._private_key.startswith("pkcs11:"): - connection.setopt(pycurl.SSLENGINE, "pkcs11") - connection.setopt(pycurl.SSLKEYTYPE, "eng") - - # server auth option - connection.setopt(connection.SSL_VERIFYPEER, True) - connection.setopt(connection.CAINFO, self._ca_cert) - connection.setopt(connection.CAPATH, None) - connection.setopt(connection.SSL_VERIFYHOST, 2) - - # client auth option - connection.setopt(connection.SSLCERT, self._cert) - connection.setopt(connection.SSLKEY, self._private_key) - connection.setopt(connection.HTTPHEADER, headers) - - response = connection.perform_rs() - status = connection.getinfo(pycurl.HTTP_CODE) - if status // 100 != 2: - raise Exception(f"response error: {status=}") - connection.close() - return json.loads(response) - else: # requests implementation - # ref: https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/authorizing-direct-aws.html - headers = {"x-amzn-iot-thingname": self._thing_name} - logger.info(f"url: {url}, headers: {headers}") - try: - response = requests.get( - url, - verify=self._ca_cert, - cert=(self._cert, self._private_key), - headers=headers, - ) - response.raise_for_status() - return json.loads(response.text) - except requests.exceptions.RequestException: - logger.warning("requests error") - raise - - def _refresh(self, session_duration: int = 0) -> dict: - url = f"https://{self._credential_provider_endpoint}/role-aliases/{self._role_alias}/credentials" - - try: - body = self._get_body(url, use_pycurl=True) - except json.JSONDecodeError: - logger.exception(f"invalid response: resp={body}") - raise - - expiry_time = body.get("credentials", {}).get("expiration") - if session_duration > 0: - now = datetime.datetime.now(tz=utc) - new_expiry_time = now + datetime.timedelta(seconds=float(session_duration)) - expiry_time = new_expiry_time.isoformat(timespec="seconds") - - logger.info(f"session is refreshed: expiry_time: {expiry_time}") - - credentials = { - "access_key": body.get("credentials", {}).get("accessKeyId"), - "secret_key": body.get("credentials", {}).get("secretAccessKey"), - "token": body.get("credentials", {}).get("sessionToken"), - "expiry_time": expiry_time, - } - return credentials diff --git a/otaclient/aws_iot_log_server/configs.py b/otaclient/aws_iot_log_server/configs.py deleted file mode 100644 index 0fe1e454b..000000000 --- a/otaclient/aws_iot_log_server/configs.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -LOG_FORMAT: str = ( - "[%(asctime)s][%(levelname)s]-%(filename)s:%(funcName)s:%(lineno)d,%(message)s" -) diff --git a/otaclient/aws_iot_log_server/custom_http_handler.py b/otaclient/aws_iot_log_server/custom_http_handler.py deleted file mode 100644 index 53fc00ba5..000000000 --- a/otaclient/aws_iot_log_server/custom_http_handler.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import requests -import logging -import logging.handlers -from queue import Queue, Empty -from threading import Thread -from requests.packages.urllib3.util.retry import Retry -from requests.adapters import HTTPAdapter - -logger = logging.getLogger(__name__) - - -class CustomHttpHandler(logging.Handler): - # https://stackoverflow.com/questions/62957814/python-logging-log-data-to-server-using-the-logging-module - def __init__(self, host, url, timeout=3.0): - """ - only method post is supported - secure(=https) is not supported - """ - super().__init__() - self.host = host - self.url = url if url.startswith("/") else f"/{url}" - self.timeout = timeout - self.queue = Queue(maxsize=4096) - self.thread = Thread(target=self._start, daemon=True) - self.thread.start() - - def emit(self, record): - log_entry = self.format(record) - if self.queue.full(): - try: - self.queue.get_nowait() - except Empty: - pass - try: - self.queue.put_nowait(log_entry) - except Empty: - pass - - def _start(self): - session = requests.Session() - retries = Retry(total=5, backoff_factor=1) # 0, 1, 2, 4, 8 - Retry.DEFAULT_BACKOFF_MAX = 10 - session.mount("http://", HTTPAdapter(max_retries=retries)) - while True: - log_entry = self.queue.get() - try: - session.post( - url=f"http://{self.host}{self.url}", - data=log_entry, - timeout=self.timeout, - ) - except Exception: - logger.exception("post failure") - - -if __name__ == "__main__": - from configs import LOG_FORMAT - - logger = logging.getLogger(__name__) - logger.setLevel(logging.INFO) - fmt = logging.Formatter(fmt=LOG_FORMAT) - - # stream handler - _sh = logging.StreamHandler() - _sh.setFormatter(fmt) - - _hh = CustomHttpHandler(host="localhost:8080", url="my-ecu-id-123") - _hh.setFormatter(fmt) - - logger.addHandler(_sh) - logger.addHandler(_hh) - - logger.info("123") - logger.info("xyz") diff --git a/otaclient/aws_iot_log_server/greengrass_config.py b/otaclient/aws_iot_log_server/greengrass_config.py deleted file mode 100644 index 1ff63f48c..000000000 --- a/otaclient/aws_iot_log_server/greengrass_config.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -import json -import yaml -import re - -from .configs import LOG_FORMAT - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -_sh = logging.StreamHandler() -fmt = logging.Formatter(fmt=LOG_FORMAT) -_sh.setFormatter(fmt) -logger.addHandler(_sh) - - -class GreengrassConfig: - @staticmethod - def parse_config(v1_config, v2_config) -> dict: - try: - return GreengrassConfig.parse_v2_config(v2_config) - except Exception: - return GreengrassConfig.parse_v1_config(v1_config) - - @staticmethod - def parse_v1_config(config) -> dict: - try: - with open(config) as f: - cfg = json.load(f) - except FileNotFoundError: - logger.exception(f"config file is not found: file={config}") - raise - except json.JSONDecodeError as e: - logger.exception(f"invalid json format: {e}") - raise - - ca_path = cfg.get("crypto", {}).get("caPath") - private_key_path = ( - cfg.get("crypto", {}) - .get("principals", {}) - .get("IoTCertificate", {}) - .get("privateKeyPath") - ) - certificate_path = ( - cfg.get("crypto", {}) - .get("principals", {}) - .get("IoTCertificate", {}) - .get("certificatePath") - ) - thing_arn = cfg.get("coreThing", {}).get("thingArn") - - strs = thing_arn.split(":", 6) - if len(strs) != 6: - logger.error(f"invalid thing arn: thing_arn={thing_arn}") - raise Exception(f"invalid thing arn: thing_arn={thing_arn}") - - region = strs[3] - thing_name = strs[5] - - def remove_prefix(s, prefix): - return re.sub(f"^{prefix}", "", s) - - return { - "ca_cert": remove_prefix(ca_path, "file://"), - "private_key": remove_prefix(private_key_path, "file://"), - "cert": remove_prefix(certificate_path, "file://"), - "region": region, - "thing_name": remove_prefix(thing_name, "thing/"), - } - - @staticmethod - def parse_v2_config(config) -> dict: - try: - with open(config) as f: - cfg = yaml.safe_load(f) - except FileNotFoundError: - logger.exception(f"config file is not found: file={config}") - raise - except yaml.YAMLError as e: - logger.exception(f"invalid yaml format: {e}") - raise - - ca_path = cfg["system"]["rootCaPath"] - private_key_path = cfg["system"]["privateKeyPath"] - certificate_path = cfg["system"]["certificateFilePath"] - thing_name = cfg["system"]["thingName"] - # When greengrass_v2 uses TPM2.0, both private and certificate should be specified with pkcs11 notation. - # But certificate file is required for this module, so replace it to the actual file name and check if it exists. - if certificate_path.startswith("pkcs11:"): - certificate_path = "/greengrass/certs/gg.cert.pem" - with open(certificate_path) as f: - pass - - region = cfg["services"]["aws.greengrass.Nucleus"]["configuration"]["awsRegion"] - - return { - "ca_cert": ca_path, - "private_key": private_key_path, - "cert": certificate_path, - "region": region, - "thing_name": thing_name, - } diff --git a/otaclient/aws_iot_log_server/http_server.py b/otaclient/aws_iot_log_server/http_server.py deleted file mode 100644 index fdc1fc85c..000000000 --- a/otaclient/aws_iot_log_server/http_server.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import time -from http.server import BaseHTTPRequestHandler, HTTPServer -from urllib.parse import urlparse -from queue import Queue - - -class HttpHandler(BaseHTTPRequestHandler): - _queue = Queue() - - def do_POST(self): - parsed = urlparse(self.path) - content_len = int(self.headers.get("content-length", 0)) - req_body = self.rfile.read(content_len).decode("utf-8") - path_list = list(filter(lambda x: x, parsed.path.split("/"))) - self._queue.put( - { - "timestamp": int(time.time()) * 1000, - "path": path_list, - "message": req_body, - } - ) - self.send_response(200) - self.end_headers() - - -if __name__ == "__main__": - server = HTTPServer(("localhost", 8080), HttpHandler) - server.serve_forever() diff --git a/otaclient/aws_iot_log_server/log_proxy_server.py b/otaclient/aws_iot_log_server/log_proxy_server.py deleted file mode 100644 index 6fe8a55ae..000000000 --- a/otaclient/aws_iot_log_server/log_proxy_server.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from concurrent.futures import ThreadPoolExecutor -from http.server import HTTPServer - -from .aws_iot_logger import AwsIotLogger -from .http_server import HttpHandler - - -def sender( - aws_credential_provider_endpoint, - aws_role_alias, - aws_cloudwatch_log_group, - ca_cert_file, - private_key_file, - cert_file, - region, - thing_name, - interval=4, -): - iot_logger = AwsIotLogger( - aws_credential_provider_endpoint, - aws_role_alias, - aws_cloudwatch_log_group, - ca_cert_file, - private_key_file, - cert_file, - region, - thing_name, - interval, - ) - - while True: - try: - data = HttpHandler._queue.get() - message = {"timestamp": data["timestamp"], "message": data["message"]} - log_stream_suffix = "/".join(data["path"]) - iot_logger.put_message(log_stream_suffix, message) - except Exception as e: - print(e) - - -def launch_server( - host: str, - port: int, - aws_credential_provider_endpoint, - aws_role_alias, - aws_cloudwatch_log_group, - ca_cert_file, - private_key_file, - cert_file, - region, - thing_name, -): - server = HTTPServer((host, port), HttpHandler) - with ThreadPoolExecutor() as executor: - executor.submit( - sender, - aws_credential_provider_endpoint, - aws_role_alias, - aws_cloudwatch_log_group, - ca_cert_file, - private_key_file, - cert_file, - region, - thing_name, - ) - server.serve_forever() diff --git a/otaclient/requirements.txt b/otaclient/requirements.txt index 33564ad22..1f8491993 100644 --- a/otaclient/requirements.txt +++ b/otaclient/requirements.txt @@ -4,15 +4,9 @@ grpcio==1.53.1 protobuf==4.21.12 PyYAML>=3.12 requests==2.31.0 -watchtower==1.0.6 -pytz==2021.1 -botocore==1.20.112 -boto3==1.17.112 -retrying==1.3.3 urllib3>=1.26.8, <2.0.0 uvicorn[standard]==0.20.0 aiohttp==3.8.5 aiofiles==22.1.0 zstandard==0.18.0 -pycurl==7.45.1 typing_extensions==4.6.3 \ No newline at end of file diff --git a/tests/test_aws_iot_log_server/__init__.py b/tests/test_aws_iot_log_server/__init__.py deleted file mode 100644 index cbabb5667..000000000 --- a/tests/test_aws_iot_log_server/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - diff --git a/tests/test_aws_iot_log_server/conftest.py b/tests/test_aws_iot_log_server/conftest.py deleted file mode 100644 index b2e2d8ebb..000000000 --- a/tests/test_aws_iot_log_server/conftest.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pytest -from pathlib import Path - - -@pytest.fixture -def shared_datadir(tmp_path: Path): - _shared_datadir = tmp_path / "_shared_datadir" - _shared_datadir.symlink_to(Path(__file__).parent / "data") - - return _shared_datadir diff --git a/tests/test_aws_iot_log_server/data/greengrass/config.json b/tests/test_aws_iot_log_server/data/greengrass/config.json deleted file mode 100644 index 84452a838..000000000 --- a/tests/test_aws_iot_log_server/data/greengrass/config.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "coreThing": { - "caPath": "root.ca.pem", - "certPath": "gg.cert.pem", - "keyPath": "gg.private.key", - "thingArn": "arn:aws:iot:ap-northeast-1:123:thing/foo-bar", - "iotHost": "abc-ats.iot.ap-northeast-1.amazonaws.com", - "ggHost": "greengrass-ats.iot.ap-northeast-1.amazonaws.com", - "keepAlive": 30 - }, - "runtime": { - "cgroup": { - "useSystemd": "yes" - } - }, - "managedRespawn": false, - "crypto": { - "caPath": "file:///greengrass/certs/root.ca.pem", - "principals": { - "IoTCertificate": { - "privateKeyPath": "file:///greengrass/certs/gg.private.key", - "certificatePath": "file:///greengrass/certs/gg.cert.pem" - }, - "SecretsManager": { - "privateKeyPath": "file:///greengrass/certs/gg.private.key" - } - } - } -} diff --git a/tests/test_aws_iot_log_server/data/greengrass/config.yaml b/tests/test_aws_iot_log_server/data/greengrass/config.yaml deleted file mode 100644 index fcf83f031..000000000 --- a/tests/test_aws_iot_log_server/data/greengrass/config.yaml +++ /dev/null @@ -1,16 +0,0 @@ ---- -system: - certificateFilePath: "/greengrass/certs/gg.cert.pem" - privateKeyPath: "/greengrass/certs/gg.private.key" - rootCaPath: "/greengrass/certs/root.ca.pem" - rootpath: "/greengrass/v2" - thingName: "foo-bar-v2" -services: - aws.greengrass.Nucleus: - componentType: "NUCLEUS" - version: "2.6.0" - configuration: - awsRegion: "ap-northeast-1" - iotRoleAlias: "foo-bar-role-alias" - iotDataEndpoint: "xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" - iotCredEndpoint: "xxxxxxxxxxxxx.credentials.iot.ap-northeast-1.amazonaws.com" diff --git a/tests/test_aws_iot_log_server/data/greengrass/config_invalid_thingArn.json b/tests/test_aws_iot_log_server/data/greengrass/config_invalid_thingArn.json deleted file mode 100644 index ecd5d08e3..000000000 --- a/tests/test_aws_iot_log_server/data/greengrass/config_invalid_thingArn.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "coreThing": { - "caPath": "root.ca.pem", - "certPath": "gg.cert.pem", - "keyPath": "gg.private.key", - "thingArn": "thing/foo-bar", - "iotHost": "abc-ats.iot.ap-northeast-1.amazonaws.com", - "ggHost": "greengrass-ats.iot.ap-northeast-1.amazonaws.com", - "keepAlive": 30 - }, - "runtime": { - "cgroup": { - "useSystemd": "yes" - } - }, - "managedRespawn": false, - "crypto": { - "caPath": "file:///greengrass/certs/root.ca.pem", - "principals": { - "IoTCertificate": { - "privateKeyPath": "file:///greengrass/certs/gg.private.key", - "certificatePath": "file:///greengrass/certs/gg.cert.pem" - }, - "SecretsManager": { - "privateKeyPath": "file:///greengrass/certs/gg.private.key" - } - } - } -} diff --git a/tests/test_aws_iot_log_server/test_boto3_session.py b/tests/test_aws_iot_log_server/test_boto3_session.py deleted file mode 100644 index 3da6a3fe4..000000000 --- a/tests/test_aws_iot_log_server/test_boto3_session.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import os - - -class TestBoto3Session: - def test__refresh_credentials(self, mocker, shared_datadir): - import pycurl - import requests - from otaclient.aws_iot_log_server.boto3_session import Boto3Session - from otaclient.aws_iot_log_server.greengrass_config import GreengrassConfig - - response = '{"credentials":{"accessKeyId":"123","secretAccessKey":"abc","sessionToken":"ABC","expiration":"2021-10-01T09:18:06Z"}}' - - connection_mock = mocker.MagicMock() - connection_mock.perform_rs = mocker.MagicMock(return_value=response) - connection_mock.getinfo = mocker.MagicMock(return_value=200) - pycurl.Curl = mocker.MagicMock(return_value=connection_mock) - - resp_mock = mocker.MagicMock() - resp_mock.text = response - - requests.get = mocker.MagicMock(return_value=resp_mock) - requests.raise_for_status = mocker.MagicMock() - - session_config = GreengrassConfig.parse_config( - os.path.join(os.path.join(shared_datadir, "greengrass/config.json")), - None, - ) - session = Boto3Session( - session_config, - "https://example.com", - "example_role_alias", - ) - got_credential = session._refresh() - want_credential = { - "access_key": "123", - "secret_key": "abc", - "token": "ABC", - "expiry_time": "2021-10-01T09:18:06Z", - } - - assert got_credential == want_credential diff --git a/tests/test_aws_iot_log_server/test_greengrass_config.py b/tests/test_aws_iot_log_server/test_greengrass_config.py deleted file mode 100644 index f2a191b94..000000000 --- a/tests/test_aws_iot_log_server/test_greengrass_config.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright 2022 TIER IV, INC. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import os -import pytest - - -class TestGreengarssConfig: - def test_parse_config(self, shared_datadir): - from otaclient.aws_iot_log_server.greengrass_config import GreengrassConfig - - config = GreengrassConfig.parse_config( - os.path.join(shared_datadir, "greengrass/config.json"), - None, - ) - assert config.get("ca_cert") == "/greengrass/certs/root.ca.pem" - assert config.get("private_key") == "/greengrass/certs/gg.private.key" - assert config.get("cert") == "/greengrass/certs/gg.cert.pem" - assert config.get("region") == "ap-northeast-1" - assert config.get("thing_name") == "foo-bar" - - def test_parse_config_no_file(self): - from otaclient.aws_iot_log_server.greengrass_config import GreengrassConfig - - with pytest.raises(Exception) as e: - GreengrassConfig.parse_config("no_file", None) - - assert str(e.value) == "[Errno 2] No such file or directory: 'no_file'" - - def test_parse_config_invalid_thing_arn(self, shared_datadir): - from otaclient.aws_iot_log_server.greengrass_config import GreengrassConfig - - with pytest.raises(Exception) as e: - GreengrassConfig.parse_config( - os.path.join(shared_datadir, "greengrass/config_invalid_thingArn.json"), - None, - ) - - assert str(e.value) == "invalid thing arn: thing_arn=thing/foo-bar" - - def test_parse_config_v2(self, shared_datadir): - from otaclient.aws_iot_log_server.greengrass_config import GreengrassConfig - - config = GreengrassConfig.parse_config( - os.path.join(shared_datadir, "greengrass/config.json"), - os.path.join(shared_datadir, "greengrass/config.yaml"), - ) - assert config.get("ca_cert") == "/greengrass/certs/root.ca.pem" - assert config.get("private_key") == "/greengrass/certs/gg.private.key" - assert config.get("cert") == "/greengrass/certs/gg.cert.pem" - assert config.get("region") == "ap-northeast-1" - assert config.get("thing_name") == "foo-bar-v2" - - def test_parse_config_illegal_v2(self, shared_datadir): - from otaclient.aws_iot_log_server.greengrass_config import GreengrassConfig - - config = GreengrassConfig.parse_config( - os.path.join(shared_datadir, "greengrass/config.json"), - os.path.join(shared_datadir, "greengrass/config.json"), # illegal v2 format - ) - assert config.get("ca_cert") == "/greengrass/certs/root.ca.pem" - assert config.get("private_key") == "/greengrass/certs/gg.private.key" - assert config.get("cert") == "/greengrass/certs/gg.cert.pem" - assert config.get("region") == "ap-northeast-1" - assert config.get("thing_name") == "foo-bar"