diff --git a/.github/workflows/ci-test.yml b/.github/workflows/ci-test.yml index 42c44395..6adcce87 100644 --- a/.github/workflows/ci-test.yml +++ b/.github/workflows/ci-test.yml @@ -63,6 +63,9 @@ jobs: - name: Code Quality - Black run: | poetry run black gamutrf --check + poetry run black utils --check + poetry run black torchserve --check + poetry run black augment --check - name: Code Quality - Pylint run: | poetry run pylint --fail-under=6 gamutrf/ @@ -71,7 +74,9 @@ jobs: PYTHONPATH: /usr/local/lib/python3.10/dist-packages:/usr/lib/python3/dist-packages run: | sudo pip3 install pytype=="$(grep -E "pytype = " pyproject.toml | grep -Eo "[0-9\.]+")" && \ - pytype -k gamutrf/ + sudo pip3 install -U pyserial && \ + pytype -k gamutrf/ && \ + pytype -k utils/mavlink-api - name: Test with pytest env: PYTHONPATH: /usr/local/lib/python3.10/dist-packages:/usr/lib/python3/dist-packages diff --git a/utils/mavlink-api/mavlink-api.py b/utils/mavlink-api/mavlink-api.py index 9e2fef96..9afc1ac5 100644 --- a/utils/mavlink-api/mavlink-api.py +++ b/utils/mavlink-api/mavlink-api.py @@ -1,8 +1,7 @@ -from flask import Flask, jsonify -from pymavlink import mavutil import threading import time -import serial +from flask import Flask, jsonify +from pymavlink import mavutil # pytype: disable=import-error app = Flask(__name__) @@ -16,6 +15,7 @@ def __init__(self): self.latitude = None self.longitude = None self.altitude = None + self.heading = None self.relative_alt = None self.time_usec = None self.vx = None @@ -32,7 +32,7 @@ def __init__(self): self.mavlink_thread.daemon = True self.mavlink_thread.start() - def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict: + def GLOBAL_POSITION_INT_parser(self, data=None): """Selected required data from data received from Pixhawk, and convert to required units. Args: @@ -41,7 +41,7 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict: clean_data (dict): Required data to be published """ # If data supplied, else use last msg - if data == None: + if data is None: data = self.latest_GLOBAL_POSITION_INT_msg.to_dict() # Check for stale GPS @@ -57,15 +57,12 @@ def GLOBAL_POSITION_INT_parser(self, data: dict = None) -> dict: self.vx = data["vx"] / 100.0 # meters/second self.vy = data["vy"] / 100.0 # meters/second self.vz = data["vz"] / 100.0 # meters/second - return - def GPS_RAW_INT_parser(self, data: dict = None) -> dict: + def GPS_RAW_INT_parser(self, data=None): """Selected required data from data received from Pixhawk, and convert to required units. Args: data (dict): Data received from the Pixhawk device - Returns: - clean_data (dict): Required data to be published GPS_FIX_TYPE [Enum] Type of GPS fix @@ -82,7 +79,7 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict: 8 GPS_FIX_TYPE_PPP PPP, 3D position. """ # If data supplied, else use last msg - if data == None: + if data is None: data = self.latest_GPS_RAW_INT_msg.to_dict() # Check for stale GPS @@ -92,8 +89,6 @@ def GPS_RAW_INT_parser(self, data: dict = None) -> dict: self.time_usec = data["time_usec"] # UNIX Epoch time uSec self.gps_fix_type = data["fix_type"] - return - def gps_stale_check(self): # Check for stale GPS data if (time.time() - self.latest_GPS_RAW_INT_timestamp > self.STALE_TIMEOUT) or ( @@ -160,8 +155,7 @@ def get_latest_gps_fix_status(): ), 200, ) - else: - return jsonify({"error": "No GPS data available"}), 404 + return jsonify({"error": "No GPS data available"}), 404 @app.route("/gps-data", methods=["GET"]) @@ -170,8 +164,7 @@ def get_latest_gps_data(): mavlink_gps_handler.GLOBAL_POSITION_INT_parser() msg = mavlink_gps_handler.create_gps_json_payload() return jsonify(msg), 200 - else: - return jsonify({"error": "No GPS data available"}), 404 + return jsonify({"error": "No GPS data available"}), 404 @app.route("/heading", methods=["GET"]) @@ -187,12 +180,11 @@ def get_latest_heading(): ), 200, ) - else: - return jsonify({"error": "No heading data available"}), 404 + return jsonify({"error": "No heading data available"}), 404 def main(): - app.run(host="0.0.0.0", port=8888) # nosec + app.run(host="0.0.0.0", port=8888) # nosec if __name__ == "__main__": diff --git a/utils/mavlink-api/pyproject.toml b/utils/mavlink-api/pyproject.toml index c4ae04e1..ae18edc0 100644 --- a/utils/mavlink-api/pyproject.toml +++ b/utils/mavlink-api/pyproject.toml @@ -6,7 +6,7 @@ authors = ["lk-iqt <112730501+lk-iqt@users.noreply.github.com>"] readme = "README.md" [tool.poetry.dependencies] -python = "^3.11" +python = ">=3.10,<3.13" Flask = "^3.0.0" pymavlink = "^2.4.40" pyserial = "^3.5" diff --git a/utils/mavlink-api/utils/gps-test.py b/utils/mavlink-api/utils/gps-test.py index 537496d5..02708294 100644 --- a/utils/mavlink-api/utils/gps-test.py +++ b/utils/mavlink-api/utils/gps-test.py @@ -1,28 +1,25 @@ -import requests import json import logging -import os -import socket import time -import csv -import time -from datetime import datetime import gpsd import httpx + def get_adafruit_gps(): try: if gpsd.gpsd_stream is None: gpsd.connect(host="127.0.0.1", port=2947) packet = gpsd.get_current() - vals= {"timestamp": time.time(), - "position": packet.position(), - "altitude": packet.altitude(), - "gps_time": packet.get_time().timestamp(), - "map_url": packet.map_url(), - "heading": None, - "gps": "fix"} + vals = { + "timestamp": time.time(), + "position": packet.position(), + "altitude": packet.altitude(), + "gps_time": packet.get_time().timestamp(), + "map_url": packet.map_url(), + "heading": None, + "gps": "fix", + } except (BrokenPipeError, gpsd.NoFixError, AttributeError) as err: logging.error("could not update with GPS: %s", err) vals = { @@ -36,10 +33,11 @@ def get_adafruit_gps(): } return vals + def get_pixhawk_gps(): try: external_gps_msg = json.loads(httpx.get(f"http://127.0.0.1:8888/gps-data").text) - + vals = { "timestamp": time.time(), "position": ( @@ -66,17 +64,21 @@ def get_pixhawk_gps(): } return vals + def write_to_csv(filename, data): - with open(filename, 'a', newline='') as csvfile: + with open(filename, "a", newline="") as csvfile: fieldnames = data.keys() + import csv + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) if csvfile.tell() == 0: writer.writeheader() writer.writerow(data) + # Define the file names for the CSV files -adafruit_csv_filename = 'adafruit_gps_data.csv' -pixhawk_csv_filename = 'pixhawk_gps_data.csv' +adafruit_csv_filename = "adafruit_gps_data.csv" +pixhawk_csv_filename = "pixhawk_gps_data.csv" while True: # Get data from Adafruit GPS @@ -92,4 +94,4 @@ def write_to_csv(filename, data): print("Pixhawk GPS data written to CSV") # Wait for one minute before the next iteration - time.sleep(60) \ No newline at end of file + time.sleep(60) diff --git a/utils/mavlink-api/utils/mavlink_serial_test.py b/utils/mavlink-api/utils/mavlink_serial_test.py index 01514873..9aae710d 100644 --- a/utils/mavlink-api/utils/mavlink_serial_test.py +++ b/utils/mavlink-api/utils/mavlink_serial_test.py @@ -1,7 +1,7 @@ #! /usr/bin/env python3 import sys -from pymavlink import mavutil +from pymavlink import mavutil # pytype: disable=import-error if len(sys.argv) < 2: print("Usage: python mavlink_serial_test.py ")