Skip to content

Commit

Permalink
MRG: Merge pull request #23 from aerosense-ai/feature/thread-current-…
Browse files Browse the repository at this point in the history
…serial-port-reading-implementation

Use separate threads for serial port reading and packet parsing
  • Loading branch information
cortadocodes authored Feb 2, 2022
2 parents edf6292 + 2fdb700 commit eb1ee44
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 36 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

name: python-ci

on: [push]
on:
push:
branches-ignore:
- main

jobs:

check-semantic-version:
if: "!contains(github.event.head_commit.message, 'skipci')"
runs-on: ubuntu-latest
Expand Down
35 changes: 33 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# This workflow tests and releases a new version of the package.

name: Release the package on merge into main

# Only trigger when a pull request into the main branch is closed.
Expand All @@ -8,9 +10,38 @@ on:
- main

jobs:
run-tests:
if: "github.event.pull_request.merged == true"
runs-on: ${{ matrix.os }}
env:
USING_COVERAGE: '3.8'
strategy:
matrix:
python: [ 3.8 ]
os: [ ubuntu-latest, windows-latest ]
steps:
- name: Checkout Repository
uses: actions/checkout@v2
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- name: Install tox
run: pip install tox
- name: Run tests
env:
GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
TEST_PROJECT_NAME: ${{ secrets.TEST_PROJECT_NAME }}
run: tox
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
file: coverage.xml
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}

release:
# This job will only run if the PR has been merged (and not closed without merging).
if: "github.event.pull_request.merged == true && !contains(github.event.pull_request.head.message, 'skipci')"
needs: run-tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/update-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ name: update-pull-request
# Only trigger for pull requests into the main branch.
on:
pull_request:
branches:
- main

jobs:
description:
if: "!contains(github.event.pull_request.body, '<!--- SKIP AUTOGENERATED NOTES --->')"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
13 changes: 9 additions & 4 deletions data_gateway/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ def gateway_cli(logger_uri, log_level):
from octue.log_handlers import apply_log_handler, get_remote_handler

# Apply log handler locally.
apply_log_handler(log_level=log_level.upper())
apply_log_handler(log_level=log_level.upper(), include_thread_name=True)

# Stream logs to remote handler if required.
if logger_uri is not None:
apply_log_handler(handler=get_remote_handler(logger_uri=logger_uri), log_level=log_level.upper())
apply_log_handler(
handler=get_remote_handler(logger_uri=logger_uri),
log_level=log_level.upper(),
include_thread_name=True,
)


@gateway_cli.command()
Expand Down Expand Up @@ -205,8 +209,9 @@ def start(

# Start packet reader in a separate thread so commands can be sent to it in real time in interactive mode or by a
# routine.
thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True)
thread.start()
reader_thread = threading.Thread(target=packet_reader.read_packets, args=(serial_port,), daemon=True)
reader_thread.setName("ReaderThread")
reader_thread.start()

try:
if interactive:
Expand Down
78 changes: 56 additions & 22 deletions data_gateway/packet_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import json
import logging
import os
import queue
import struct
import threading

from octue.cloud import storage

Expand Down Expand Up @@ -76,8 +78,8 @@ def __init__(
self.writer = NoOperationContextManager()

def read_packets(self, serial_port, stop_when_no_more_data=False):
"""Read and process packets from a serial port, uploading them to Google Cloud storage and/or writing them to
disk.
"""Read packets from a serial port and send them to a separate thread that will parse and upload them to Google
Cloud storage and/or write them to disk.
:param serial.Serial serial_port: name of serial port to read from
:param bool stop_when_no_more_data: stop reading when no more data is received from the port (for testing)
Expand All @@ -97,7 +99,21 @@ def read_packets(self, serial_port, stop_when_no_more_data=False):

with self.uploader:
with self.writer:
packet_queue = queue.Queue()
error_queue = queue.Queue()

parser_thread = threading.Thread(
target=self._parse_payload,
kwargs={"packet_queue": packet_queue, "error_queue": error_queue},
daemon=True,
)

parser_thread.setName("ParserThread")
parser_thread.start()

while not self.stop:
if not error_queue.empty():
raise error_queue.get()

serial_data = serial_port.read()

Expand Down Expand Up @@ -127,8 +143,13 @@ def read_packets(self, serial_port, stop_when_no_more_data=False):
serial_port.open()
continue

self._parse_payload(
packet_type=packet_type, payload=payload, data=data, previous_timestamp=previous_timestamp
packet_queue.put(
{
"packet_type": packet_type,
"payload": payload,
"data": data,
"previous_timestamp": previous_timestamp,
}
)

def update_handles(self, payload):
Expand Down Expand Up @@ -186,30 +207,43 @@ def _persist_configuration(self):
),
)

def _parse_payload(self, packet_type, payload, data, previous_timestamp):
"""Check if a full payload has been received (correct length) with correct packet type handle, then parse the
payload from a serial port.
def _parse_payload(self, packet_queue, error_queue):
"""Get packets from a thread-safe packet queue, check if a full payload has been received (i.e. correct length)
with the correct packet type handle, then parse the payload. After parsing/processing, upload them to Google
Cloud storage and/or write them to disk. If any errors are raised, put them on the error queue for the reader
thread to handle.
:param str packet_type:
:param iter payload:
:param dict data:
:param dict previous_timestamp:
:param queue.Queue packet_queue: a thread-safe queue of packets provided by the reader thread
:param queue.Queue error_queue: a thread-safe queue to put any exceptions on to for the reader thread to handle
:return None:
"""
if packet_type not in self.handles:
logger.error("Received packet with unknown type: %s", packet_type)
raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type))
try:
while not self.stop:
packet_type, payload, data, previous_timestamp = packet_queue.get().values()

if packet_type not in self.handles:
logger.error("Received packet with unknown type: %s", packet_type)
raise exceptions.UnknownPacketTypeError("Received packet with unknown type: {}".format(packet_type))

if len(payload) == 244: # If the full data payload is received, proceed parsing it
timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16)

if len(payload) == 244: # If the full data payload is received, proceed parsing it
timestamp = int.from_bytes(payload[240:244], self.config.endian, signed=False) / (2 ** 16)
data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data)

data, sensor_names = self._parse_sensor_packet_data(self.handles[packet_type], payload, data)
for sensor_name in sensor_names:
self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp)
self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name])

for sensor_name in sensor_names:
self._check_for_packet_loss(sensor_name, timestamp, previous_timestamp)
self._timestamp_and_persist_data(data, sensor_name, timestamp, self.config.period[sensor_name])
elif len(payload) >= 1 and self.handles[packet_type] in [
"Mic 1",
"Cmd Decline",
"Sleep State",
"Info Message",
]:
self._parse_info_packet(self.handles[packet_type], payload)

elif len(payload) >= 1 and self.handles[packet_type] in ["Mic 1", "Cmd Decline", "Sleep State", "Info Message"]:
self._parse_info_packet(self.handles[packet_type], payload)
except Exception as e:
error_queue.put(e)

def _parse_sensor_packet_data(self, packet_type, payload, data):
"""Parse sensor data type payloads.
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

setup(
name="data_gateway",
version="0.8.1",
version="0.9.0",
install_requires=[
"click>=7.1.2",
"pyserial==3.5",
"python-slugify==5.0.2",
"octue==0.6.5",
"octue==0.10.0",
],
url="https://gitlab.com/windenergie-hsr/aerosense/digital-twin/data-gateway",
license="MIT",
Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from data_gateway.configuration import Configuration


apply_log_handler()
apply_log_handler(include_thread_name=True)


TEST_PROJECT_NAME = "a-project-name"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_packet_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def test_error_is_raised_if_unknown_sensor_type_packet_is_received(self):
bucket_name=TEST_BUCKET_NAME,
)
with self.assertRaises(exceptions.UnknownPacketTypeError):
packet_reader.read_packets(serial_port, stop_when_no_more_data=True)
packet_reader.read_packets(serial_port, stop_when_no_more_data=False)

def test_configuration_file_is_persisted(self):
"""Test that the configuration file is persisted."""
Expand Down

0 comments on commit eb1ee44

Please sign in to comment.