From 9554199ee3226a3dc7f625051080adbeaf943143 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 12:35:56 +0000 Subject: [PATCH 01/11] FEA: Use separate threads for serial port reading and packet parsing --- data_gateway/cli.py | 4 +-- data_gateway/packet_reader.py | 55 +++++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/data_gateway/cli.py b/data_gateway/cli.py index d2123b6f..7b613925 100644 --- a/data_gateway/cli.py +++ b/data_gateway/cli.py @@ -205,8 +205,8 @@ def start( # Start packet reader in a separate thread so commands can be sent to it in real time in interactive mode or by a # routine. - thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True) - thread.start() + reader_thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True) + reader_thread.start() try: if interactive: diff --git a/data_gateway/packet_reader.py b/data_gateway/packet_reader.py index 4f64d059..bfd9cbad 100644 --- a/data_gateway/packet_reader.py +++ b/data_gateway/packet_reader.py @@ -2,7 +2,9 @@ import json import logging import os +import queue import struct +import threading from octue.cloud import storage @@ -97,8 +99,12 @@ def read_packets(self, serial_port, stop_when_no_more_data=False): with self.uploader: with self.writer: - while not self.stop: + packet_queue = queue.Queue() + parser_thread = threading.Thread(target=self._parse_payload, args=(packet_queue,), daemon=True) + parser_thread.start() + + while not self.stop: serial_data = serial_port.read() if len(serial_data) == 0: @@ -127,8 +133,13 @@ def read_packets(self, serial_port, stop_when_no_more_data=False): serial_port.open() continue - self._parse_payload( - packet_type=packet_type, payload=payload, data=data, previous_timestamp=previous_timestamp + packet_queue.put( + { + "packet_type": packet_type, + "payload": payload, + "data": data, + "previous_timestamp": previous_timestamp, + } ) def update_handles(self, payload): @@ -186,30 +197,36 @@ def _persist_configuration(self): ), ) - def _parse_payload(self, packet_type, payload, data, previous_timestamp): + def _parse_payload(self, packet_queue): """Check if a full payload has been received (correct length) with correct packet type handle, then parse the payload from a serial port. 
- :param str packet_type: - :param iter payload: - :param dict data: - :param dict previous_timestamp: + :param queue.Queue packet_queue: + :return None: """ - if packet_type not in self.handles: - logger.error("Received packet with unknown type: %s", packet_type) - raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type)) + while not self.stop: + packet_type, payload, data, previous_timestamp = packet_queue.get().values() + + if packet_type not in self.handles: + logger.error("Received packet with unknown type: %s", packet_type) + raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type)) - if len(payload) == 244: # If the full data payload is received, proceed parsing it - timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16) + if len(payload) == 244: # If the full data payload is received, proceed parsing it + timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16) - data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data) + data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data) - for sensor_name in sensor_names: - self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp) - self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name]) + for sensor_name in sensor_names: + self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp) + self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name]) - elif len(payload) >= 1 and self.handles[packet_type] in ["Mic 1", "Cmd Decline", "Sleep State", "Info Message"]: - self._parse_info_packet(self.handles[packet_type], payload) + elif len(payload) >= 1 and self.handles[packet_type] in [ + "Mic 1", + "Cmd Decline", + "Sleep State", + "Info Message", + ]: + self._parse_info_packet(self.handles[packet_type], payload) def _parse_sensor_packet_data(self, packet_type, payload, data): """Parse sensor data type payloads. From 23bac95db6f872d1fd68e919bcdb7f4d31ce9ea4 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 12:43:16 +0000 Subject: [PATCH 02/11] ENH: Add thread name to logging context --- data_gateway/__init__.py | 3 +++ data_gateway/cli.py | 3 ++- tests/__init__.py | 3 ++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/data_gateway/__init__.py b/data_gateway/__init__.py index 5636c833..352e65b5 100644 --- a/data_gateway/__init__.py +++ b/data_gateway/__init__.py @@ -1,5 +1,8 @@ +from octue.log_handlers import create_formatter + from . import exceptions __all__ = ("exceptions",) +LOG_FORMATTER = create_formatter(logging_metadata=("%(asctime)s", "%(levelname)s", "%(threadName)s", "%(name)s")) MICROPHONE_SENSOR_NAME = "Mics" diff --git a/data_gateway/cli.py b/data_gateway/cli.py index 7b613925..19f3f8a8 100644 --- a/data_gateway/cli.py +++ b/data_gateway/cli.py @@ -10,6 +10,7 @@ from requests import HTTPError from slugify import slugify +from data_gateway import LOG_FORMATTER from data_gateway.configuration import Configuration from data_gateway.dummy_serial import DummySerial from data_gateway.exceptions import DataMustBeSavedError, WrongNumberOfSensorCoordinatesError @@ -45,7 +46,7 @@ def gateway_cli(logger_uri, log_level): from octue.log_handlers import apply_log_handler, get_remote_handler # Apply log handler locally. 
- apply_log_handler(log_level=log_level.upper()) + apply_log_handler(log_level=log_level.upper(), formatter=LOG_FORMATTER) # Stream logs to remote handler if required. if logger_uri is not None: diff --git a/tests/__init__.py b/tests/__init__.py index 658dd7f8..95527b6d 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,9 +1,10 @@ from octue.log_handlers import apply_log_handler +from data_gateway import LOG_FORMATTER from data_gateway.configuration import Configuration -apply_log_handler() +apply_log_handler(formatter=LOG_FORMATTER) TEST_PROJECT_NAME = "a-project-name" From 6216fc5e98bc307b993e1ff312d60ed4f9d7aa9c Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 13:01:35 +0000 Subject: [PATCH 03/11] FIX: Raise errors from parser thread in reader thread --- data_gateway/packet_reader.py | 56 ++++++++++++++++++++++------------- tests/test_packet_reader.py | 2 +- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/data_gateway/packet_reader.py b/data_gateway/packet_reader.py index bfd9cbad..35f6060f 100644 --- a/data_gateway/packet_reader.py +++ b/data_gateway/packet_reader.py @@ -99,12 +99,21 @@ def read_packets(self, serial_port, stop_when_no_more_data=False): with self.uploader: with self.writer: - packet_queue = queue.Queue() - parser_thread = threading.Thread(target=self._parse_payload, args=(packet_queue,), daemon=True) + error_queue = queue.Queue() + + parser_thread = threading.Thread( + target=self._parse_payload, + kwargs={"packet_queue": packet_queue, "error_queue": error_queue}, + daemon=True, + ) + parser_thread.start() while not self.stop: + if not error_queue.empty(): + raise error_queue.get() + serial_data = serial_port.read() if len(serial_data) == 0: @@ -197,36 +206,41 @@ def _persist_configuration(self): ), ) - def _parse_payload(self, packet_queue): + def _parse_payload(self, packet_queue, error_queue): """Check if a full payload has been received (correct length) with correct packet type handle, then parse the payload from a serial port. 
:param queue.Queue packet_queue: + :param queue.Queue error_queue: :return None: """ - while not self.stop: - packet_type, payload, data, previous_timestamp = packet_queue.get().values() + try: + while not self.stop: + packet_type, payload, data, previous_timestamp = packet_queue.get().values() + + if packet_type not in self.handles: + logger.error("Received packet with unknown type: %s", packet_type) + raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type)) - if packet_type not in self.handles: - logger.error("Received packet with unknown type: %s", packet_type) - raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type)) + if len(payload) == 244: # If the full data payload is received, proceed parsing it + timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16) - if len(payload) == 244: # If the full data payload is received, proceed parsing it - timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16) + data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data) - data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data) + for sensor_name in sensor_names: + self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp) + self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name]) - for sensor_name in sensor_names: - self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp) - self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name]) + elif len(payload) >= 1 and self.handles[packet_type] in [ + "Mic 1", + "Cmd Decline", + "Sleep State", + "Info Message", + ]: + self._parse_info_packet(self.handles[packet_type], payload) - elif len(payload) >= 1 and self.handles[packet_type] in [ - "Mic 1", - "Cmd Decline", - "Sleep State", - "Info Message", - ]: - self._parse_info_packet(self.handles[packet_type], payload) + except Exception as e: + error_queue.put(e) def _parse_sensor_packet_data(self, packet_type, payload, data): """Parse sensor data type payloads. diff --git a/tests/test_packet_reader.py b/tests/test_packet_reader.py index d722436d..21d90d25 100644 --- a/tests/test_packet_reader.py +++ b/tests/test_packet_reader.py @@ -82,7 +82,7 @@ def test_error_is_raised_if_unknown_sensor_type_packet_is_received(self): bucket_name=TEST_BUCKET_NAME, ) with self.assertRaises(exceptions.UnknownPacketTypeError): - packet_reader.read_packets(serial_port, stop_when_no_more_data=True) + packet_reader.read_packets(serial_port, stop_when_no_more_data=False) def test_configuration_file_is_persisted(self): """Test that the configuration file is persisted.""" From 053997c661ca404e0bb763490dbaf31bc81fa1fb Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 13:16:12 +0000 Subject: [PATCH 04/11] DOC: Update packet reader docstrings --- data_gateway/packet_reader.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/data_gateway/packet_reader.py b/data_gateway/packet_reader.py index 35f6060f..d08ca44d 100644 --- a/data_gateway/packet_reader.py +++ b/data_gateway/packet_reader.py @@ -78,8 +78,8 @@ def __init__( self.writer = NoOperationContextManager() def read_packets(self, serial_port, stop_when_no_more_data=False): - """Read and process packets from a serial port, uploading them to Google Cloud storage and/or writing them to - disk. 
+ """Read packets from a serial port and send them to a separate parser thread that will also upload them to + Google Cloud storage and/or write them to disk. :param serial.Serial serial_port: name of serial port to read from :param bool stop_when_no_more_data: stop reading when no more data is received from the port (for testing) @@ -207,11 +207,13 @@ def _persist_configuration(self): ) def _parse_payload(self, packet_queue, error_queue): - """Check if a full payload has been received (correct length) with correct packet type handle, then parse the - payload from a serial port. + """Get packets from a thread-safe packet queue, check if a full payload has been received (i.e. correct length) + with the correct packet type handle, then parse the payload. After parsing/processing, upload them to Google + Cloud storage and/or write them to disk. If any errors are raised, put them on the error queue for the reader + thread to handle. - :param queue.Queue packet_queue: - :param queue.Queue error_queue: + :param queue.Queue packet_queue: a thread-safe queue of packets provided by the reader thread + :param queue.Queue error_queue: a thread-safe queue to put any exceptions on to for th reader thread to handle :return None: """ try: From e9dcae00b89806136a795b44852de06bf8b2812c Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 13:17:11 +0000 Subject: [PATCH 05/11] OPS: Increase version number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 63e80dba..0e5224a6 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setup( name="data_gateway", - version="0.8.0", + version="0.9.0", install_requires=[ "click>=7.1.2", "pyserial==3.5", From 235daf50ba7a9786906be0b2ed2adba1dbbd8eff Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 13:25:09 +0000 Subject: [PATCH 06/11] OPS: Remove MacOS test runner from CI workflow --- .github/workflows/python-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index c68a0e18..eaeefdb7 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -37,7 +37,7 @@ jobs: strategy: matrix: python: [3.8] - os: [ubuntu-latest, windows-latest, macos-latest] + os: [ubuntu-latest, windows-latest] steps: - name: Checkout Repository uses: actions/checkout@v2 From 520e7c8355ace41661d47c4842a81c001598ab2f Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Mon, 27 Dec 2021 13:33:26 +0000 Subject: [PATCH 07/11] ENH: Set thread names to useful values --- data_gateway/cli.py | 1 + data_gateway/packet_reader.py | 1 + 2 files changed, 2 insertions(+) diff --git a/data_gateway/cli.py b/data_gateway/cli.py index 19f3f8a8..a2cc83ec 100644 --- a/data_gateway/cli.py +++ b/data_gateway/cli.py @@ -207,6 +207,7 @@ def start( # Start packet reader in a separate thread so commands can be sent to it in real time in interactive mode or by a # routine. 
reader_thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True) + reader_thread.setName("ReaderThread") reader_thread.start() try: diff --git a/data_gateway/packet_reader.py b/data_gateway/packet_reader.py index d08ca44d..464ad8fc 100644 --- a/data_gateway/packet_reader.py +++ b/data_gateway/packet_reader.py @@ -108,6 +108,7 @@ def read_packets(self, serial_port, stop_when_no_more_data=False): daemon=True, ) + parser_thread.setName("ParserThread") parser_thread.start() while not self.stop: From 16e392130ec02eac0affd4d4145b88e52227eeb0 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Tue, 1 Feb 2022 18:41:21 +0000 Subject: [PATCH 08/11] DEP: Use latest octue version --- data_gateway/__init__.py | 3 --- data_gateway/cli.py | 9 ++++++--- setup.py | 2 +- tests/__init__.py | 3 +-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/data_gateway/__init__.py b/data_gateway/__init__.py index 352e65b5..5636c833 100644 --- a/data_gateway/__init__.py +++ b/data_gateway/__init__.py @@ -1,8 +1,5 @@ -from octue.log_handlers import create_formatter - from . import exceptions __all__ = ("exceptions",) -LOG_FORMATTER = create_formatter(logging_metadata=("%(asctime)s", "%(levelname)s", "%(threadName)s", "%(name)s")) MICROPHONE_SENSOR_NAME = "Mics" diff --git a/data_gateway/cli.py b/data_gateway/cli.py index a2cc83ec..4d26224f 100644 --- a/data_gateway/cli.py +++ b/data_gateway/cli.py @@ -10,7 +10,6 @@ from requests import HTTPError from slugify import slugify -from data_gateway import LOG_FORMATTER from data_gateway.configuration import Configuration from data_gateway.dummy_serial import DummySerial from data_gateway.exceptions import DataMustBeSavedError, WrongNumberOfSensorCoordinatesError @@ -46,11 +45,15 @@ def gateway_cli(logger_uri, log_level): from octue.log_handlers import apply_log_handler, get_remote_handler # Apply log handler locally. - apply_log_handler(log_level=log_level.upper(), formatter=LOG_FORMATTER) + apply_log_handler(log_level=log_level.upper(), include_thread_name=True) # Stream logs to remote handler if required. 
if logger_uri is not None: - apply_log_handler(handler=get_remote_handler(logger_uri=logger_uri), log_level=log_level.upper()) + apply_log_handler( + handler=get_remote_handler(logger_uri=logger_uri), + log_level=log_level.upper(), + include_thread_name=True, + ) @gateway_cli.command() diff --git a/setup.py b/setup.py index 0e5224a6..40bb6332 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ "click>=7.1.2", "pyserial==3.5", "python-slugify==5.0.2", - "octue==0.6.5", + "octue==0.10.0", ], url="https://gitlab.com/windenergie-hsr/aerosense/digital-twin/data-gateway", license="MIT", diff --git a/tests/__init__.py b/tests/__init__.py index 95527b6d..b6c30085 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,10 +1,9 @@ from octue.log_handlers import apply_log_handler -from data_gateway import LOG_FORMATTER from data_gateway.configuration import Configuration -apply_log_handler(formatter=LOG_FORMATTER) +apply_log_handler(include_thread_name=True) TEST_PROJECT_NAME = "a-project-name" From 48e8af3d1f52b81df35c2f0c005f98c698ab98a7 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Wed, 2 Feb 2022 11:37:39 +0000 Subject: [PATCH 09/11] DOC: Update docstrings skipci --- data_gateway/packet_reader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/data_gateway/packet_reader.py b/data_gateway/packet_reader.py index 464ad8fc..e0dc3334 100644 --- a/data_gateway/packet_reader.py +++ b/data_gateway/packet_reader.py @@ -78,8 +78,8 @@ def __init__( self.writer = NoOperationContextManager() def read_packets(self, serial_port, stop_when_no_more_data=False): - """Read packets from a serial port and send them to a separate parser thread that will also upload them to - Google Cloud storage and/or write them to disk. + """Read packets from a serial port and send them to a separate thread that will parse and upload them to Google + Cloud storage and/or write them to disk. :param serial.Serial serial_port: name of serial port to read from :param bool stop_when_no_more_data: stop reading when no more data is received from the port (for testing) @@ -214,7 +214,7 @@ def _parse_payload(self, packet_queue, error_queue): thread to handle. :param queue.Queue packet_queue: a thread-safe queue of packets provided by the reader thread - :param queue.Queue error_queue: a thread-safe queue to put any exceptions on to for th reader thread to handle + :param queue.Queue error_queue: a thread-safe queue to put any exceptions on to for the reader thread to handle :return None: """ try: From 9ab3fe95b1ea1be28cd49c8e424bce80fba73e0f Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Wed, 2 Feb 2022 11:42:18 +0000 Subject: [PATCH 10/11] OPS: Make release depend on tests passing after merge --- .github/workflows/python-ci.yml | 6 ++++-- .github/workflows/release.yml | 35 +++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index cb25650c..880c22ce 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -6,10 +6,12 @@ name: python-ci -on: [push] +on: + push: + branches-ignore: + - main jobs: - check-semantic-version: if: "!contains(github.event.head_commit.message, 'skipci')" runs-on: ubuntu-latest diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a3db3f73..b77fbc4d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,3 +1,5 @@ +# This workflow tests and releases a new version of the package. 
+ name: Release the package on merge into main # Only trigger when a pull request into main branch is closed. @@ -8,9 +10,38 @@ on: - main jobs: + run-tests: + if: "github.event.pull_request.merged == true" + runs-on: ${{ matrix.os }} + env: + USING_COVERAGE: '3.8' + strategy: + matrix: + python: [ 3.8 ] + os: [ ubuntu-latest, windows-latest ] + steps: + - name: Checkout Repository + uses: actions/checkout@v2 + - name: Setup Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python }} + - name: Install tox + run: pip install tox + - name: Run tests + env: + GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }} + TEST_PROJECT_NAME: ${{ secrets.TEST_PROJECT_NAME }} + run: tox + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v1 + with: + file: coverage.xml + fail_ci_if_error: false + token: ${{ secrets.CODECOV_TOKEN }} + release: - # This job will only run if the PR has been merged (and not closed without merging). - if: "github.event.pull_request.merged == true && !contains(github.event.pull_request.head.message, 'skipci')" + needs: run-tests runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 From 2fdb7000779d47c5f221d1a83158b0de5e234872 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Wed, 2 Feb 2022 11:55:12 +0000 Subject: [PATCH 11/11] OPS: Skip PR description workflow if relevant; run it on all branches skipci --- .github/workflows/update-pull-request.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/update-pull-request.yml b/.github/workflows/update-pull-request.yml index c7b3885a..d63220e3 100644 --- a/.github/workflows/update-pull-request.yml +++ b/.github/workflows/update-pull-request.yml @@ -9,11 +9,10 @@ name: update-pull-request # Only trigger for pull requests into main branch. on: pull_request: - branches: - - main jobs: description: + if: "!contains(github.event.pull_request.body, '')" runs-on: ubuntu-latest steps: - uses: actions/checkout@v2
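
A note on the threading pattern this series introduces (PATCH 01 adds the reader/parser split, PATCH 03 adds error propagation): the reader thread pulls raw packets from the serial port and puts them on a thread-safe queue.Queue; a daemon parser thread consumes that queue and persists the results; and because an exception raised inside a daemon thread would otherwise vanish silently, the parser places any exception on a second queue for the reader thread to re-raise. The sketch below is a minimal, self-contained illustration of that shape only, not the gateway's actual PacketReader.read_packets/_parse_payload code; the names parse_packets, read_packets, KNOWN_PACKET_TYPES, stop_event and the example packet handles are hypothetical stand-ins.

import queue
import threading
import time

KNOWN_PACKET_TYPES = {34, 52, 54}  # Hypothetical handles, not the gateway's real ones.


def parse_packets(packet_queue, error_queue, stop_event):
    """Consume packets from the queue and forward any exception to the reader thread."""
    try:
        while not stop_event.is_set():
            try:
                packet = packet_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            if packet["packet_type"] not in KNOWN_PACKET_TYPES:
                raise ValueError(f"Received packet with unknown type: {packet['packet_type']}")

            # Parsing and persisting the 244-byte payload would happen here.

    except Exception as error:
        # Don't let the daemon thread die silently - hand the error to the reader thread.
        error_queue.put(error)


def read_packets(raw_packets):
    """Feed packets to a parser thread and re-raise any error it reports."""
    packet_queue = queue.Queue()
    error_queue = queue.Queue()
    stop_event = threading.Event()

    parser_thread = threading.Thread(
        target=parse_packets,
        kwargs={"packet_queue": packet_queue, "error_queue": error_queue, "stop_event": stop_event},
        name="ParserThread",
        daemon=True,
    )
    parser_thread.start()

    try:
        for packet in raw_packets:
            # Re-raise anything the parser thread has reported so far.
            if not error_queue.empty():
                raise error_queue.get()

            packet_queue.put(packet)
            time.sleep(0.01)  # Stands in for waiting on the serial port.

        # Give the parser a moment to drain the queue, then surface any late error.
        try:
            raise error_queue.get(timeout=0.5)
        except queue.Empty:
            pass

    finally:
        stop_event.set()
        parser_thread.join(timeout=1)


if __name__ == "__main__":
    good = {"packet_type": 34, "payload": bytes(244)}
    bad = {"packet_type": 99, "payload": bytes(244)}

    try:
        read_packets([good, good, bad, good])
    except ValueError as error:
        print(f"Error raised in the reader (main) thread: {error}")

Running the sketch prints the unknown-packet error from the main (reader) thread rather than losing it in the daemon thread, which mirrors the behaviour the updated test in PATCH 03 expects from read_packets.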