diff --git a/.vscode/settings.json b/.vscode/settings.json index 9ea6d62..fb23cfd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,5 @@ "python3.8InterpreterPath": "/Library/Frameworks/Python.framework/Versions/3.8/bin/python3.8", "modulename": "${workspaceFolderBasename}", "distname": "${workspaceFolderBasename}", - "moduleversion": "1.0.32" + "moduleversion": "1.1.0" } \ No newline at end of file diff --git a/README.md b/README.md index b46a656..893f6a2 100644 --- a/README.md +++ b/README.md @@ -291,8 +291,6 @@ The `GNSSNTRIPClient` class provides a basic NTRIP Client capability and forms t ### CLI Usage: -**NB:** see [KNOWN ISSUES](#chunkingissue) for an issue affecting some casters in NTRIP 2.0 mode (e.g. euref-ip.net, igs-ip.net). Use NTRIP 1.0 (`--ntripversion 1.0`) instead with these casters. - Assuming the Python 3 scripts (bin) directory is in your PATH, the CLI utility may be invoked from the shell thus: To retrieve the sourcetable and determine the closest available mountpoint to the reference lat/lon, leave the mountpoint argument blank (the port defaults to 2101): @@ -527,12 +525,9 @@ For help and full list of optional arguments, type: ubxcompare -h ``` - --- ## Troubleshooting -1. **KNOWN ISSUE**: `gnssntripclient` doesn't currently support `Transfer-Encoding: chunked` in NTRIP 2.0 mode, as used by some casters (e.g. euref-ip.net, igs-ip.net). This issue manifests itself as missing RTCM data and an inability to achieve an RTK fix. A fix is in hand, but as a temporary workaround, use NTRIP 1.0 instead (`--ntripversion 1.0`), as this doesn't implement chunked encoding. - 1. `SPARTNTypeError` or `SPARTNParseError` when parsing encrypted messages with 16-bit gnssTimetags (`timeTagtype=0`), e.g. GAD or some OCB messages: ``` diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5130e91..2a63eff 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,13 @@ # pygnssutils Release Notes +### RELEASE 1.1.0 + +ENHANCEMENTS: + +1. gnssntripclient now supports chunked transfer-encoded NTRIP datastreams. +1. gnssntripclient improved handling of NTRIP 1.0 casters. +1. gnssserver now supports NTRIP version 1.0 or 2.0 in NTRIP mode via arg `--ntripversion`. + ### RELEASE 1.0.32 ENHANCEMENTS: diff --git a/docs/pygnssutils.rst b/docs/pygnssutils.rst index a491dc6..601a154 100644 --- a/docs/pygnssutils.rst +++ b/docs/pygnssutils.rst @@ -108,6 +108,14 @@ pygnssutils.socket\_server module :undoc-members: :show-inheritance: +pygnssutils.socketwrapper module +-------------------------------- + +.. automodule:: pygnssutils.socketwrapper + :members: + :undoc-members: + :show-inheritance: + pygnssutils.ubxcompare module ----------------------------- diff --git a/pyproject.toml b/pyproject.toml index b8434a8..8e14cc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pygnssutils" authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] description = "GNSS Command Line Utilities" -version = "1.0.32" +version = "1.1.0" license = { file = "LICENSE" } readme = "README.md" requires-python = ">=3.8" @@ -88,12 +88,13 @@ jobs = 0 reports = "y" recursive = "y" py-version = "3.8" -fail-under = "9.7" +fail-under = "9.8" fail-on = "E,F" clear-cache-post-run = "y" disable = """ bad-inline-option, deprecated-pragma, + duplicate-code, file-ignored, locally-disabled, logging-fstring-interpolation, diff --git a/src/pygnssutils/__init__.py b/src/pygnssutils/__init__.py index 109df1a..5886f98 100644 --- a/src/pygnssutils/__init__.py +++ b/src/pygnssutils/__init__.py @@ -15,6 +15,7 @@ from pygnssutils.gnssstreamer import GNSSStreamer from pygnssutils.helpers import * from pygnssutils.mqttmessage import * +from pygnssutils.socketwrapper import SocketWrapper from pygnssutils.ubxload import UBXLoader from pygnssutils.ubxsave import UBXSaver from pygnssutils.ubxsetrate import UBXSetRate diff --git a/src/pygnssutils/_version.py b/src/pygnssutils/_version.py index a2f6afe..2267698 100644 --- a/src/pygnssutils/_version.py +++ b/src/pygnssutils/_version.py @@ -8,4 +8,4 @@ :license: BSD 3-Clause """ -__version__ = "1.0.32" +__version__ = "1.1.0" diff --git a/src/pygnssutils/globals.py b/src/pygnssutils/globals.py index d9f89a1..2ecc8eb 100644 --- a/src/pygnssutils/globals.py +++ b/src/pygnssutils/globals.py @@ -15,6 +15,11 @@ EARTH_RADIUS = 6371 # km DEFAULT_BUFSIZE = 4096 # buffer size for NTRIP client MAXPORT = 65535 # max valid TCP port +ENCODE_NONE = 0 +ENCODE_CHUNKED = 1 +ENCODE_GZIP = 2 +ENCODE_COMPRESS = 4 +ENCODE_DEFLATE = 8 FORMAT_PARSED = 1 FORMAT_BINARY = 2 FORMAT_HEX = 4 diff --git a/src/pygnssutils/gnssdump_cli.py b/src/pygnssutils/gnssdump_cli.py index 76bacc7..8dfe103 100644 --- a/src/pygnssutils/gnssdump_cli.py +++ b/src/pygnssutils/gnssdump_cli.py @@ -19,6 +19,11 @@ from pygnssutils._version import __version__ as VERSION from pygnssutils.globals import ( CLIAPP, + ENCODE_CHUNKED, + ENCODE_COMPRESS, + ENCODE_DEFLATE, + ENCODE_GZIP, + ENCODE_NONE, EPILOG, FORMAT_BINARY, FORMAT_HEX, @@ -130,6 +135,18 @@ def main(): choices=[0, 1], default=1, ) + ap.add_argument( + "--encoding", + required=False, + help=( + f"Socket stream encoding {ENCODE_NONE} = none, " + f"{ENCODE_CHUNKED} = chunked, {ENCODE_GZIP} = gzip, " + f"{ENCODE_COMPRESS} = compress, {ENCODE_DEFLATE} = deflate. " + f"Options can be OR'd e.g. {ENCODE_CHUNKED} | {ENCODE_GZIP}." + ), + type=int, + default=ENCODE_NONE, + ) ap.add_argument( "--quitonerror", required=False, diff --git a/src/pygnssutils/gnssntripclient.py b/src/pygnssutils/gnssntripclient.py index 408a1f6..472985d 100644 --- a/src/pygnssutils/gnssntripclient.py +++ b/src/pygnssutils/gnssntripclient.py @@ -1,798 +1,932 @@ -""" -gnssntripclient.py - -NTRIP client class, retrieving sourcetable and RTCM3 or SPARTN -correction data from an NTRIP server and (optionally) sending -the correction data to a designated writeable output medium -(serial, file, socket, queue). - -Can also transmit client position back to NTRIP server at specified -intervals via formatted NMEA GGA sentences. - -Calling app, if defined, can implement the following methods: -- set_event() - create <> event -- dialog() - return reference to NTRIP config client dialog -- get_coordinates() - return coordinates from receiver - -NB: This utility is used by PyGPSClient - do not change footprint of -any public methods without first checking impact on PyGPSClient - -https://github.com/semuconsulting/PyGPSClient. - -Created on 03 Jun 2022 - -:author: semuadmin -:copyright: SEMU Consulting © 2022 -:license: BSD 3-Clause -""" - -# pylint: disable=invalid-name - -import socket -import ssl -from base64 import b64encode -from datetime import datetime, timedelta, timezone -from io import BufferedWriter, TextIOWrapper -from logging import getLogger -from os import getenv -from queue import Queue -from threading import Event, Thread -from time import sleep - -from certifi import where as findcacerts -from pynmeagps import GET, NMEAMessage -from pyrtcm import RTCMMessageError, RTCMParseError, RTCMTypeError -from pyspartn import SPARTNMessageError, SPARTNParseError, SPARTNReader, SPARTNTypeError -from pyubx2 import ERR_IGNORE, RTCM3_PROTOCOL, UBXReader -from serial import Serial - -from pygnssutils._version import __version__ as VERSION -from pygnssutils.exceptions import ParameterError -from pygnssutils.globals import ( - CLIAPP, - DEFAULT_BUFSIZE, - FIXES, - HTTPERR, - MAXPORT, - NOGGA, - NTRIP_EVENT, - OUTPORT_NTRIP, -) -from pygnssutils.helpers import find_mp_distance, format_conn, ipprot2int - -TIMEOUT = 10 -GGALIVE = 0 -GGAFIXED = 1 -DLGTNTRIP = "NTRIP Configuration" -RTCM = "RTCM" -SPARTN = "SPARTN" -MAX_RETRY = 5 -RETRY_INTERVAL = 10 -INACTIVITY_TIMEOUT = 10 -WAITTIME = 3 - - -class GNSSNTRIPClient: - """ - NTRIP client class. - """ - - def __init__(self, app=None, **kwargs): - """ - Constructor. - - :param object app: application from which this class is invoked (None) - :param int retries: (kwarg) maximum failed connection retries (5) - :param int retryinterval: (kwarg) retry interval in seconds (10) - :param int timeout: (kwarg) inactivity timeout in seconds (10) - """ - - # pylint: disable=consider-using-with - - self.__app = app # Reference to calling application class (if applicable) - # configure logger with name "pygnssutils" in calling module - self.logger = getLogger(__name__) - self._validargs = True - self._ntripqueue = Queue() - # persist settings to allow any calling app to retrieve them - self._settings = { - "ipprot": socket.AF_INET, - "server": "", - "port": 2101, - "https": 0, - "flowinfo": 0, - "scopeid": 0, - "mountpoint": "", - "distance": "", - "version": "2.0", - "datatype": RTCM, - "ntripuser": "anon", - "ntrippassword": "password", - "ggainterval": "None", - "ggamode": GGALIVE, - "sourcetable": [], - "reflat": 0.0, - "reflon": 0.0, - "refalt": 0.0, - "refsep": 0.0, - "spartndecode": 0, - "spartnkey": getenv("MQTTKEY", default=None), - "spartnbasedate": datetime.now(timezone.utc), - } - - try: - self._retries = int(kwargs.pop("retries", MAX_RETRY)) - self._retryinterval = int(kwargs.pop("retryinterval", RETRY_INTERVAL)) - self._timeout = int(kwargs.pop("timeout", INACTIVITY_TIMEOUT)) - except (ParameterError, ValueError, TypeError) as err: - self.logger.critical( - f"Invalid input arguments {kwargs=}\n{err=}\nType gnssntripclient -h for help.", - ) - self._validargs = False - - self._socket = None - self._connected = False - self._stopevent = Event() - self._ntrip_thread = None - self._last_gga = datetime.fromordinal(1) - self._retrycount = 0 - - def __enter__(self): - """ - Context manager enter routine. - """ - - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - """ - Context manager exit routine. - - Terminates threads in an orderly fashion. - """ - - self.stop() - - @property - def settings(self): - """ - Getter for NTRIP settings. - """ - - return self._settings - - @settings.setter - def settings(self, settings: dict): - """ - Setter for NTRIP settings. - - :param dict settings: NTRIP settings dictionary - """ - - self._settings = settings - - @property - def connected(self): - """ - Connection status getter. - """ - - return self._connected - - def run(self, **kwargs) -> bool: - """ - Open NTRIP server connection. - - If calling application implements a "get_coordinates" method to - obtain live coordinates (i.e. from GNSS receiver), the method will - use these instead of fixed reference coordinates. - - User login credentials can be obtained from environment variables - PYGPSCLIENT_USER and PYGPSCLIENT_PASSWORD, or passed as kwargs. - - :param str ipprot: (kwarg) IP protocol IPv4/IPv6 ("IPv4") - :param str server: (kwarg) NTRIP server URL ("") - :param int port: (kwarg) NTRIP port (2101) - :param int https: (kwarg) HTTPS (TLS) connection? 0 = HTTP 1 = HTTPS (0) - :param int flowinfo: (kwarg) flowinfo for IPv6 (0) - :param int scopeid: (kwarg) scopeid for IPv6 (0) - :param str mountpoint: (kwarg) NTRIP mountpoint ("", leave blank to get sourcetable) - :param str datatype: (kwarg) Data type - RTCM or SPARTN ("RTCM") - :param str version: (kwarg) NTRIP protocol version ("2.0") - :param str ntripuser: (kwarg) NTRIP authentication user ("anon") - :param str ntrippassword: (kwarg) NTRIP authentication password ("password") - :param int ggainterval: (kwarg) GGA sentence transmission interval (-1 = None) - :param int ggamode: (kwarg) GGA pos source; 0 = live from receiver, 1 = fixed reference (0) - :param str reflat: (kwarg) reference latitude (0.0) - :param str reflon: (kwarg) reference longitude (0.0) - :param str refalt: (kwarg) reference altitude (0.0) - :param str refsep: (kwarg) reference separation (0.0) - :param bool spartndecode: (kwarg) decode SPARTN messages (0) - :param str spartnkey: (kwarg) SPARTN decryption key (None) - :param object datetime: (kwarg) SPARTN decryption basedate (now(utc)) - :param object output: (kwarg) writeable output medium (serial, file, socket, queue) (None) - :returns: boolean flag 0 = terminated, 1 = Ok to stream RTCM3 data from server - :rtype: bool - """ - # pylint: disable=unused-variable - - try: - self._last_gga = datetime.fromordinal(1) - - ipprot = kwargs.get("ipprot", "IPv4") - self._settings["ipprot"] = ipprot2int(ipprot) - self._settings["server"] = server = kwargs.get("server", "") - self._settings["port"] = port = int(kwargs.get("port", OUTPORT_NTRIP)) - self._settings["https"] = int(kwargs.get("https", 0)) - self._settings["flowinfo"] = int(kwargs.get("flowinfo", 0)) - self._settings["scopeid"] = int(kwargs.get("scopeid", 0)) - self._settings["mountpoint"] = mountpoint = kwargs.get("mountpoint", "") - self._settings["datatype"] = kwargs.get("datatype", RTCM).upper() - self._settings["version"] = kwargs.get("version", "2.0") - self._settings["ntripuser"] = kwargs.get( - "ntripuser", getenv("PYGPSCLIENT_USER", "user") - ) - self._settings["ntrippassword"] = kwargs.get( - "ntrippassword", getenv("PYGPSCLIENT_PASSWORD", "password") - ) - self._settings["ggainterval"] = int(kwargs.get("ggainterval", NOGGA)) - self._settings["ggamode"] = int(kwargs.get("ggamode", GGALIVE)) - self._settings["reflat"] = kwargs.get("reflat", 0.0) - self._settings["reflon"] = kwargs.get("reflon", 0.0) - self._settings["refalt"] = kwargs.get("refalt", 0.0) - self._settings["refsep"] = kwargs.get("refsep", 0.0) - self._settings["spartndecode"] = kwargs.get("spartndecode", 0) - self._settings["spartnkey"] = kwargs.get( - "spartnkey", getenv("MQTTKEY", None) - ) - self._settings["spartnbasedate"] = kwargs.get( - "spartbasedate", datetime.now(timezone.utc) - ) - output = kwargs.get("output", None) - - if server == "": - raise ParameterError(f"Invalid server url {server}") - if port > MAXPORT or port < 1: - raise ParameterError(f"Invalid port {port}") - - except (ParameterError, ValueError, TypeError) as err: - self.logger.critical( - f"Invalid input arguments {kwargs}\n{err}\nType gnssntripclient -h for help." - ) - self._validargs = False - - if self._validargs: - self._connected = True - self._start_read_thread( - self._settings, - self._stopevent, - output, - ) - if mountpoint != "": - return 1 - return 0 - - def stop(self): - """ - Close NTRIP server connection. - """ - - self._stop_read_thread() - self._connected = False - - def _app_update_status(self, status: bool, msgt: tuple = None): - """ - THREADED - Update NTRIP connection status in calling application. - - :param bool status: NTRIP server connection status - :param tuple msgt: optional (message, color) - """ - - if self.__app is not None: - if hasattr(self.__app, "dialog"): - dlg = self.__app.dialog(DLGTNTRIP) - if dlg is not None: - if hasattr(dlg, "set_controls"): - dlg.set_controls(status, msgt) - - def _app_get_coordinates(self) -> tuple: - """ - THREADED - Get live coordinates from receiver, or use fixed - reference position, depending on ggamode setting. - - NB: 'fix' is a string e.g. "3D" or "RTK FLOAT" - - :returns: tuple of coordinate and fix data - :rtype: tuple - """ - - lat = lon = alt = sep = 0.0 - fix, sip, hdop, diffage, diffstation = ("3D", 15, 0.98, 0, 0) - if self._settings["ggamode"] == GGAFIXED: # fixed reference position - lat = self._settings["reflat"] - lon = self._settings["reflon"] - alt = self._settings["refalt"] - sep = self._settings["refsep"] - elif self.__app is not None: - if hasattr(self.__app, "get_coordinates"): # live position from receiver - coords = self.__app.get_coordinates() - if isinstance(coords, tuple): # old version (PyGPSClient <=1.4.19) - _, lat, lon, alt, sep = coords - else: # new version uses dict (PyGPSClient >=1.4.20) - lat = coords.get("lat", lat) - lon = coords.get("lon", lon) - alt = coords.get("alt", alt) - sep = coords.get("sep", sep) - sip = coords.get("sip", sip) - fix = coords.get("fix", fix) - hdop = coords.get("hdop", hdop) - diffage = coords.get("diffage", diffage) - diffstation = coords.get("diffstation", diffstation) - - lat, lon, alt, sep = [ - 0.0 if c == "" else float(c) for c in (lat, lon, alt, sep) - ] - - return lat, lon, alt, sep, fix, sip, hdop, diffage, diffstation - - def _formatGET(self, settings: dict) -> str: - """ - THREADED - Format HTTP GET Request. - - :param dict settings: settings dictionary - :return: formatted HTTP GET request - :rtype: str - """ - - ggahdr = "" - if settings["version"] == "2.0": - hver = "1.1" - nver = "Ntrip-Version: Ntrip/2.0\r\n" - if settings["ggainterval"] != NOGGA: - gga, _ = self._formatGGA() - ggahdr = f"Ntrip-GGA: {gga.decode('utf-8')}" # includes \r\n - else: - hver = "1.0" - nver = "" - - mountpoint = "/" + settings["mountpoint"] - user = settings["ntripuser"] + ":" + settings["ntrippassword"] - user = b64encode(user.encode(encoding="utf-8")) - req = ( - f"GET {mountpoint} HTTP/{hver}\r\n" - f"Host: {settings['server']}:{settings['port']}\r\n" - f"{nver}" - f"User-Agent: NTRIP pygnssutils/{VERSION}\r\n" - "Accept: */*\r\n" - f"Authorization: Basic {user.decode(encoding='utf-8')}\r\n" - f"{ggahdr}" - "Connection: close\r\n\r\n" # NECESSARY!!! - ) - self.logger.debug(f"HTTP Header\n{req}") - return req.encode(encoding="utf-8") - - def _formatGGA(self) -> tuple: - """ - THREADED - Format NMEA GGA sentence using pynmeagps. The raw string - output is suitable for sending to an NTRIP socket. - GGA timestamp will default to current UTC. GGA quality is - derived from fix string. - - :return: tuple of (raw NMEA message as bytes, NMEAMessage) - :rtype: tuple - :rtype: tuple - """ - - try: - lat, lon, alt, sep, fixs, sip, hdop, diffage, diffstation = ( - self._app_get_coordinates() - ) - lat = float(lat) - lon = float(lon) - fixi = FIXES.get(fixs, 1) - parsed_data = NMEAMessage( - "GP", - "GGA", - GET, - lat=lat, - lon=lon, - quality=fixi, - numSV=sip, - HDOP=hdop, - alt=alt, - altUnit="M", - sep=sep, - sepUnit="M", - diffAge=diffage, - diffStation=diffstation, - ) - - raw_data = parsed_data.serialize() - return raw_data, parsed_data - except ValueError: - return None, None - - def _send_GGA(self, ggainterval: int, output: object): - """ - THREADED - Send NMEA GGA sentence to NTRIP server at prescribed interval. - - :param int ggainterval: GGA send interval in seconds (-1 = don't send) - :param object output: writeable output medium e.g. serial port - """ - - if ggainterval != NOGGA: - if datetime.now() > self._last_gga + timedelta(seconds=ggainterval): - raw_data, parsed_data = self._formatGGA() - if parsed_data is not None: - self._socket.sendall(raw_data) - self._do_output(output, raw_data, parsed_data) - self._last_gga = datetime.now() - - def _get_closest_mountpoint(self) -> tuple: - """ - THREADED - Find closest mountpoint in sourcetable - if valid reference lat/lon are available. - - :return: tuple of (mountpoint, distance) - :rtype: tuple - """ - - try: - lat, lon, _, _, _, _, _, _, _ = self._app_get_coordinates() - closest_mp, dist = find_mp_distance( - float(lat), float(lon), self._settings["sourcetable"] - ) - if self._settings["mountpoint"] == "": - self._settings["mountpoint"] = closest_mp - self.logger.info( - "Closest mountpoint to reference location" - f"({lat}, {lon}) = {closest_mp}, {dist} km." - ) - - except ValueError: - return None, None - return closest_mp, dist - - def _start_read_thread( - self, - settings: dict, - stopevent: Event, - output: object, - ): - """ - Start the NTRIP reader thread. - """ - - if self._connected: - self._stopevent.clear() - self._ntrip_thread = Thread( - target=self._read_thread, - args=( - settings, - stopevent, - output, - ), - daemon=True, - ) - self._ntrip_thread.start() - - def _stop_read_thread(self): - """ - Stop NTRIP reader thread. - """ - - if self._ntrip_thread is not None: - self._stopevent.set() - self._ntrip_thread = None - - self.logger.info("Streaming terminated.") - - def _read_thread( - self, - settings: dict, - stopevent: Event, - output: object, - ): - """ - THREADED - Try connecting to NTRIP caster. - - :param dict settings: settings as dictionary - :param Event stopevent: stop event - :param object output: output stream for raw data - """ - - self._retrycount = 0 - server = settings["server"] - port = int(settings["port"]) - mountpoint = settings["mountpoint"] - - while self._retrycount <= self._retries and not stopevent.is_set(): - - try: - - self._do_connection(settings, stopevent, output) - - except ssl.SSLCertVerificationError as err: - tip = ( - f" - try using '{server[4:]}' rather than '{server}' for the NTRIP caster URL" - if "certificate is not valid for 'www." in err.strerror - else ( - f" - try adding the NTRIP caster URL SSL certificate to {findcacerts()}" - if "unable to get local issuer certificate" in err.strerror - else "" - ) - ) - self.logger.error(f"SSL Certificate Verification Error{tip}\n{err}") - self._retrycount = self._retries - stopevent.set() - self._connected = False - self._app_update_status(False, (f"Error!: {err.strerror[0:60]}", "red")) - - except ( - BrokenPipeError, - ConnectionAbortedError, - ConnectionRefusedError, - ConnectionResetError, - OverflowError, - socket.gaierror, - ssl.SSLError, - TimeoutError, - ) as err: - errm = str(repr(err)) - erra = f"Connection Error {errm.split('(', 1)[0]}" - errl = f"Error connecting to {server}:{port}/{mountpoint}: {errm}" - if self._retrycount == self._retries: - stopevent.set() - self._connected = False - self.logger.critical(errl) - break - self._retrycount += 1 - errr = ( - f". Retrying in {self._retryinterval * self._retrycount} secs " - f"({self._retrycount}/{self._retries}) ..." - ) - erra += errr - errl += errr - self.logger.warning(errl) - self._app_update_status(False, (erra, "red")) - - sleep(self._retryinterval * self._retrycount) - - def _do_connection( - self, - settings: dict, - stopevent: Event, - output: object, - ): - """ - THREADED - Opens socket to NTRIP server and reads incoming data. - - :param dict settings: settings as dictionary - :param Event stopevent: stop event - :param object output: output stream for raw data - :raises: Various socket error types if connection fails - """ - - server = settings["server"] - port = int(settings["port"]) - https = int(settings["https"]) - flowinfo = int(settings["flowinfo"]) - scopeid = int(settings["scopeid"]) - mountpoint = settings["mountpoint"] - ggainterval = int(settings["ggainterval"]) - datatype = settings["datatype"] - - conn = format_conn(settings["ipprot"], server, port, flowinfo, scopeid) - with socket.socket(settings["ipprot"], socket.SOCK_STREAM) as self._socket: - if https: - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - context.load_verify_locations(findcacerts()) - self._socket = context.wrap_socket(self._socket, server_hostname=server) - self._socket.settimeout(TIMEOUT) - self._socket.connect(conn) - self._socket.sendall(self._formatGET(settings)) - # send GGA sentence with request - # if mountpoint != "": - # self._send_GGA(ggainterval, output) - while not stopevent.is_set(): - rc = self._do_header(self._socket, stopevent, output) - if rc == "0": # streaming RTCM3/SPARTN data from mountpoint - self._retrycount = 0 - msg = f"Streaming {datatype} data from {server}:{port}/{mountpoint} ..." - self.logger.info(msg) - self._app_update_status(True, (msg, "blue")) - self._do_data( - self._socket, - datatype, - stopevent, - ggainterval, - output, - ) - elif rc == "1": # retrieved sourcetable - stopevent.set() - self._connected = False - self._app_update_status(False, ("Sourcetable retrieved", "blue")) - else: # error message - self.logger.critical( - f"Error connecting to {server}:{port}/{mountpoint=}: {rc}" - ) - stopevent.set() - self._connected = False - self._app_update_status(False, (f"Error!: {rc}", "red")) - - def _do_header(self, sock: socket, stopevent: Event, output: object) -> str: - """ - THREADED - Parse response header lines. - - :param socket sock: socket - :param Event stopevent: stop event - :return: return status or error message - :rtype: str - """ - - stable = [] - data = True - - while data and not stopevent.is_set(): - try: - data = sock.recv(DEFAULT_BUFSIZE) - header_lines = data.decode(encoding="utf-8").split("\r\n") - for line in header_lines: - # if sourcetable request, populate list - if True in [line.find(cd) > 0 for cd in HTTPERR]: # HTTP 4nn, 50n - return line - if line.find("STR;") >= 0: # sourcetable entry - strbits = line.split(";") - if strbits[0] == "STR": - strbits.pop(0) - stable.append(strbits) - elif line.find("ENDSOURCETABLE") >= 0: # end of sourcetable - self._settings["sourcetable"] = stable - mp, dist = self._get_closest_mountpoint() - self._do_output(output, stable, (mp, dist)) - self.logger.info(f"Complete sourcetable follows...\n{stable}") - return "1" - - except UnicodeDecodeError: - data = False - - return "0" - - def _do_data( - self, - sock: socket, - datatype: str, - stopevent: Event, - ggainterval: int, - output: object, - ): - """ - THREADED - Read and parse incoming NTRIP RTCM3/SPARTN data stream. - - :param socket sock: socket - :param str datatype: RTCM or SPARTN - :param Event stopevent: stop event - :param int ggainterval: GGA transmission interval seconds - :param object output: output stream for raw data - :raises: TimeoutError if inactivity timeout exceeded - """ - - parser = None - raw_data = None - parsed_data = None - last_activity = datetime.now() - - # parser will wrap socket as SocketStream - if datatype == SPARTN: - parser = SPARTNReader( - sock, - quitonerror=ERR_IGNORE, - bufsize=DEFAULT_BUFSIZE, - decode=self._settings["spartndecode"], - key=self._settings["spartnkey"], - basedate=self._settings["spartnbasedate"], - ) - else: - parser = UBXReader( - sock, - protfilter=RTCM3_PROTOCOL, - quitonerror=ERR_IGNORE, - bufsize=DEFAULT_BUFSIZE, - labelmsm=True, - ) - - while not stopevent.is_set(): - try: - raw_data, parsed_data = parser.read() - if raw_data is None: - if datetime.now() - last_activity > timedelta( - seconds=self._timeout - ): - raise TimeoutError( - f"Inactivity timeout error after {self._timeout} seconds" - ) - else: - self._do_output(output, raw_data, parsed_data) - last_activity = datetime.now() - self._send_GGA(ggainterval, output) - - except ( - RTCMMessageError, - RTCMParseError, - RTCMTypeError, - SPARTNMessageError, - SPARTNParseError, - SPARTNTypeError, - ) as err: - parsed_data = f"Error parsing data stream {err}" - self._do_output(output, raw_data, parsed_data) - continue - - def _do_output(self, output: object, raw: bytes, parsed: object): - """ - THREADED - Send sourcetable/closest mountpoint or RTCM3/SPARTN data to designated output medium. - - If output is Queue, will send both raw and parsed data. - - :param object output: writeable output medium for raw data - :param bytes raw: raw data - :param object parsed: parsed message - """ - - if hasattr(parsed, "identity"): - self.logger.info(f"{type(parsed).__name__} received: {parsed.identity}") - self.logger.debug(parsed) - if output is not None: - # serialize sourcetable if outputting to stream - if isinstance(raw, list) and not isinstance(output, Queue): - raw = self._serialize_srt(raw) - if isinstance(output, (Serial, BufferedWriter)): - output.write(raw) - elif isinstance(output, TextIOWrapper): - output.write(str(parsed)) - elif isinstance(output, Queue): - output.put(raw if self.__app == CLIAPP else (raw, parsed)) - elif isinstance(output, socket.socket): - output.sendall(raw) - - # notify any calling app that data is available - if self.__app is not None: - if hasattr(self.__app, "set_event"): - self.__app.set_event(NTRIP_EVENT) - - def _serialize_srt(self, sourcetable: list) -> bytes: - """ - Serialize sourcetable. - - :param list sourcetable: sourcetable as list - :return: sourcetable as bytes - :rtype: bytes - """ - - srt = "" - for row in sourcetable: - for i, col in enumerate(row): - dlm = "," if i < len(row) - 1 else "\r\n" - srt += f"{col}{dlm}" - return bytearray(srt, "utf-8") - - @property - def stopevent(self) -> Event: - """ - Getter for stop event. - - :return: stop event - :rtype: Event - """ - - return self._stopevent +""" +gnssntripclient.py + +NTRIP client class; essentially an HTTP client capable of +retrieving sourcetable and RTCM3 or SPARTN +correction data from an NTRIP server and (optionally) sending +the correction data to a designated writeable output medium +(serial, file, socket, queue). + +Can also transmit client position back to NTRIP server at specified +intervals via formatted NMEA GGA sentences. + +Calling app, if defined, can implement the following methods: +- set_event() - create <> event +- dialog() - return reference to NTRIP config client dialog +- get_coordinates() - return coordinates from receiver + +NB: This utility is used by PyGPSClient - do not change footprint of +any public methods without first checking impact on PyGPSClient - +https://github.com/semuconsulting/PyGPSClient. + +Created on 03 Jun 2022 + +:author: semuadmin +:copyright: SEMU Consulting © 2022 +:license: BSD 3-Clause +""" + +import socket +import ssl +from base64 import b64encode +from datetime import datetime, timedelta, timezone +from io import BufferedWriter, TextIOWrapper +from logging import getLogger +from os import getenv +from queue import Queue +from threading import Event, Thread +from time import sleep + +from certifi import where as findcacerts +from pynmeagps import GET, NMEAMessage +from pyrtcm import RTCMMessageError, RTCMParseError, RTCMTypeError +from pyspartn import SPARTNMessageError, SPARTNParseError, SPARTNReader, SPARTNTypeError +from pyubx2 import ERR_LOG, RTCM3_PROTOCOL, UBXReader +from serial import Serial + +from pygnssutils._version import __version__ as VERSION +from pygnssutils.exceptions import ParameterError +from pygnssutils.globals import ( + CLIAPP, + DEFAULT_BUFSIZE, + ENCODE_CHUNKED, + ENCODE_COMPRESS, + ENCODE_DEFLATE, + ENCODE_GZIP, + ENCODE_NONE, + FIXES, + MAXPORT, + NOGGA, + NTRIP_EVENT, + OUTPORT_NTRIP, +) +from pygnssutils.helpers import find_mp_distance, ipprot2int +from pygnssutils.socketwrapper import SocketWrapper + +TIMEOUT = 3 +GGALIVE = 0 +GGAFIXED = 1 +DLGTNTRIP = "NTRIP Configuration" +RTCM = "rtcm" +SPARTN = "spartn" +MAX_RETRY = 5 +RETRY_INTERVAL = 5 +INACTIVITY_TIMEOUT = 10 +WAITTIME = 3 + + +class GNSSNTRIPClient: + """ + NTRIP client class. + """ + + def __init__( + self, + app=None, + **kwargs, + ): + """ + Constructor. + + :param object app: application from which this class is invoked (None) + :param int retries: (kwarg) maximum failed connection retries (5) + :param int retryinterval: (kwarg) retry interval in seconds (10) + :param int timeout: (kwarg) inactivity timeout in seconds (10) + """ + + self.__app = app # Reference to calling application class (if applicable) + # configure logger with name "pygnssutils" in calling module + self.logger = getLogger(__name__) + self._ntripqueue = Queue() + # initialise and persist settings to allow any calling app to retrieve them + self._settings = {} + self.settings = self._settings + + try: + self._retries = int(kwargs.pop("retries", MAX_RETRY)) + self._retryinterval = int(kwargs.pop("retryinterval", RETRY_INTERVAL)) + self._timeout = int(kwargs.pop("timeout", INACTIVITY_TIMEOUT)) + except (ParameterError, ValueError, TypeError) as err: + msg = f"Invalid input arguments {err}" + self._app_update_status(False, (str(err), "red")) + raise ParameterError(msg + "\nType gnssntripclient -h for help.") from err + + self._socket = None + self._connected = False + self._stopevent = Event() + self._ntrip_thread = None + self._last_gga = datetime.fromordinal(1) + self._retrycount = 0 + self._ntrip_version = "2.0" + self._response_headers = {} + self._response_status = {} + self._response_body = None + self._output = None + + def __enter__(self): + """ + Context manager enter routine. + """ + + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + """ + Context manager exit routine. + + Terminates threads in an orderly fashion. + """ + + self.stop() + + def run(self, **kwargs) -> bool: + """ + Open NTRIP client connection. + + If calling application implements a "get_coordinates" method to + obtain live coordinates (i.e. from GNSS receiver), the method will + use these instead of fixed reference coordinates. + + User login credentials can be obtained from environment variables + PYGPSCLIENT_USER and PYGPSCLIENT_PASSWORD, or passed as kwargs. + + :param str ipprot: (kwarg) IP protocol IPv4/IPv6 ("IPv4") + :param str server: (kwarg) NTRIP server URL ("") + :param int port: (kwarg) NTRIP port (2101) + :param int https: (kwarg) HTTPS (TLS) connection? 0 = HTTP 1 = HTTPS (0) + :param int flowinfo: (kwarg) flowinfo for IPv6 (0) + :param int scopeid: (kwarg) scopeid for IPv6 (0) + :param str mountpoint: (kwarg) NTRIP mountpoint ("", leave blank to get sourcetable) + :param str datatype: (kwarg) Data type - RTCM or SPARTN ("RTCM") + :param str version: (kwarg) NTRIP protocol version ("2.0") + :param str ntripuser: (kwarg) NTRIP authentication user ("anon") + :param str ntrippassword: (kwarg) NTRIP authentication password ("password") + :param int ggainterval: (kwarg) GGA sentence transmission interval (-1 = None) + :param int ggamode: (kwarg) GGA pos source; 0 = live from receiver, 1 = fixed reference (0) + :param str reflat: (kwarg) reference latitude (0.0) + :param str reflon: (kwarg) reference longitude (0.0) + :param str refalt: (kwarg) reference altitude (0.0) + :param str refsep: (kwarg) reference separation (0.0) + :param bool spartndecode: (kwarg) decode SPARTN messages (0) + :param str spartnkey: (kwarg) SPARTN decryption key (None) + :param object datetime: (kwarg) SPARTN decryption basedate (now(utc)) + :param object output: (kwarg) writeable output medium (serial, file, socket, queue) (None) + :returns: boolean flag 0 = stream terminated, 1 = streaming data + :rtype: bool + """ + + # pylint: disable=unused-variable + + try: + self._last_gga = datetime.fromordinal(1) + self.settings = kwargs + self._output = kwargs.get("output", None) + + if self._settings["server"] == "": + raise ParameterError(f"Invalid server URL {self._settings['server']}") + if not 1 < self._settings["port"] < MAXPORT: + raise ParameterError(f"Invalid port {self._settings['port']}") + + except (ParameterError, ValueError, TypeError) as err: + msg = f"Invalid input arguments - {err}" + self._app_update_status(False, (str(err), "red")) + raise ParameterError(msg + "\nType gnssntripclient -h for help.") from err + + self._connected = True + self._start_read_thread( + self._settings, + self._stopevent, + self._output, + ) + if self.settings["mountpoint"] != "": + return 1 + return 0 + + def _start_read_thread( + self, + settings: dict, + stopevent: Event, + output: object, + ): + """ + Start the NTRIP reader thread. + """ + + if self._connected: + self._stopevent.clear() + self._ntrip_thread = Thread( + target=self._read_thread, + args=( + settings, + stopevent, + output, + ), + daemon=True, + ) + self._ntrip_thread.start() + + def _stop_read_thread(self): + """ + Stop NTRIP reader thread. + """ + + if self._ntrip_thread is not None: + self._stopevent.set() + self._ntrip_thread = None + + self._app_update_status(False, ("Disconnected", "blue")) + + def stop(self): + """ + Close NTRIP server connection. + """ + + self._stop_read_thread() + self._connected = False + + def _read_thread( + self, + settings: dict, + stopevent: Event, + output: object, + ): + """ + Try connecting to NTRIP caster. + + :param dict settings: settings as dictionary + :param Event stopevent: stop event + :param object output: output stream for raw data + """ + + self._retrycount = 0 + hostname = settings["server"] + errc = "" # critical error message + + while self._retrycount <= self._retries and not stopevent.is_set(): + + try: + + self._do_connection(settings, stopevent, output) + + except ssl.SSLCertVerificationError as err: + errc = err.strerror + if "certificate is not valid for 'www." in err.strerror: + errc += ( + f" - try using '{hostname[4:]}' rather than " + f"'{hostname}' for the NTRIP caster URL" + ) + elif "unable to get local issuer certificate" in err.strerror: + errc += f" - try adding the NTRIP caster URL SSL certificate to {findcacerts()}" + except ( + BrokenPipeError, + ConnectionAbortedError, + ConnectionRefusedError, + ConnectionResetError, + OverflowError, + socket.gaierror, + ssl.SSLError, + TimeoutError, + ) as err: + errm = str(repr(err)) + if self._retrycount == self._retries: + errc = errm # no more retries so critical error + else: + self._retrycount += 1 + errm += ( + f". Retrying in {self._retryinterval * (2**self._retrycount)} secs " + f"({self._retrycount}/{self._retries}) ..." + ) + self._app_update_status(False, (errm, "red")) + except Exception as err: # pylint: disable=broad-exception-caught + errc = str(repr(err)) + + if errc != "": # break connection on critical error + stopevent.set() + self._connected = False + self._app_update_status(False, (errc, "red")) + break + + sleep(self._retryinterval * (2**self._retrycount)) + + def _do_connection( + self, + settings: dict, + stopevent: Event, + output: object, + ): + """ + Opens socket to NTRIP server and reads incoming data. + + :param dict settings: settings as dictionary + :param Event stopevent: stop event + :param object output: output stream for raw data + :raises: Various socket error types if connection fails + """ + + hostname = settings["server"] + port = int(settings["port"]) + https = int(settings["https"]) + + # create a IPv4, IPv6 dual-stack socket for connection + ip = socket.gethostbyname(hostname) + with socket.create_connection((ip, port), self._timeout) as self._socket: + if https: + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.load_verify_locations(findcacerts()) + self._socket = context.wrap_socket( + self._socket, server_hostname=hostname + ) + + self._do_request(self._socket, settings, output) + + if not self.responseok: + stopevent.set() + self._connected = False + msg = ( + f"Connection failed {self._response_status['code']} " + f"{self._response_status['description']}" + ) + self._app_update_status(False, (msg, "red")) + elif self.is_sourcetable: + stable = self._parse_sourcetable(self.response_body) + self._settings["sourcetable"] = stable + mp, dist = self._get_closest_mountpoint() + self._do_output(output, stable, (mp, dist)) + self._app_update_status(False, ("Sourcetable retrieved", "blue")) + stopevent.set() + self._connected = False + + def _do_request(self, sock: socket, settings: dict, output: object): + """ + Send formatted HTTP(S) GET request and process response. + + :param socket sock: raw socket + :param dict settings: settings + :param object output: output stream for raw data + """ + + hostname = settings["server"] + port = int(settings["port"]) + datatype = settings["datatype"].lower() + ggainterval = settings["ggainterval"] + path = settings["mountpoint"] + + request_headers = self._set_headers(settings) + self.logger.info(f"Request headers:\n{request_headers}") + self._response_body = b"" + awaiting_response = True + + sock.sendall(request_headers.encode()) + + while True: + data = sock.recv(DEFAULT_BUFSIZE) + if len(data) == 0: + break + if awaiting_response: + data = self._parse_response_header(data) + awaiting_response = False + if ( + self.is_gnssdata + and not awaiting_response + and not self._stopevent.is_set() + ): + # stream gnss data until disconnection + msg = f"Streaming {datatype} data from {hostname}:{port}/{path} ..." + self._app_update_status(True, (msg, "blue")) + self._parse_ntrip_data( + sock, + datatype, + ggainterval, + output, + ) + if not self.is_gnssdata and not awaiting_response: + self._response_body = self._response_body + data + + def _set_headers(self, settings: dict) -> str: + """ + Construct HTTP(S) GET request headers. + + :param dict settings: settings + :return: request headers as string + :rtype: str + """ + + headers = "" + path = settings["mountpoint"] + hostname = settings["server"] + port = settings["port"] + user = settings["ntripuser"] + password = settings["ntrippassword"] + ntrip_version = settings["version"] + ggainterval = settings["ggainterval"] + if ggainterval == NOGGA: + gga = "" + else: + gga, _ = self._format_gga() + + cred = b64encode(f"{user}:{password}".encode()).decode() + headers += f"Authorization: Basic {cred}\r\n" + httpver = "1.1" + gga_as_data = "" + if ntrip_version == "2.0": + headers += "Ntrip-Version: Ntrip/2.0\r\n" + if ggainterval != NOGGA: + headers += f"Ntrip-GGA: {gga.decode()}" # includes \r\n + else: + httpver = "1.0" + if ggainterval != NOGGA: + gga_as_data = gga.decode() + + return ( + f"GET /{path} HTTP/{httpver}\r\n" + f"Host: {hostname}:{port}\r\n" + f"User-Agent: NTRIP pygnssutils/{VERSION}\r\n" + f"{headers}" + "Accept: */*\r\n" + "Connection: close\r\n" + "\r\n" + f"{gga_as_data}" + ) + + def _parse_response_header(self, data: bytes) -> bytes: + """ + Parse response header and body. + + :param bytes data: raw data from socket + :return: response body as bytes + :rtype: bytes + :raises: Exception + """ + + try: + hdrbdy = data.split(b"\r\n\r\n", 1) + if len(hdrbdy) == 1: # no body content + # some poorly implemented ICY responses only have + # a single "\r\n" between response header and body + if hdrbdy[0][:12] == b"ICY 200 OK\r\n": + hdr, bdy = hdrbdy[0][:10], hdrbdy[0][12:] + else: + hdr, bdy = hdrbdy[0], b"" + else: # has body content + hdr, bdy = hdrbdy + hdr = hdr.decode().split("\r\n") + status = hdr[0].split(" ", 3) + self._response_status = { + "protocol": status[0], + "code": int(status[1]), + "description": status[2], + } + for line in hdr: + rsp = line.split(":", 1) + if len(rsp) > 1: + self._response_headers[rsp[0].lower().strip()] = rsp[1].strip() + self.logger.info( + f"Response: {self._response_status}\n{self._response_headers}" + ) + return bdy + except Exception as err: + raise ConnectionAbortedError( + f"Unable to parse response headers - {err}" + ) from err # caught in _read_thread() + + def _parse_ntrip_data( + self, + sock: socket, + datatype: str, + ggainterval: int, + output: object, + ): + """ + Read and parse incoming NTRIP RTCM3/SPARTN data stream. + + :param socket sock: raw socket + :param str datatype: RTCM or SPARTN + :param int ggainterval: GGA transmission interval seconds + :raises: TimeoutError if inactivity timeout exceeded + """ + + parser = None + raw_data = None + parsed_data = None + last_activity = datetime.now() + stream = SocketWrapper(sock, self.encoding) + + # parser will wrap socket as SocketStream + if datatype == SPARTN: + parser = SPARTNReader( + stream, + quitonerror=ERR_LOG, + bufsize=DEFAULT_BUFSIZE, + decode=self._settings["spartndecode"], + key=self._settings["spartnkey"], + basedate=self._settings["spartnbasedate"], + ) + else: + parser = UBXReader( + stream, + protfilter=RTCM3_PROTOCOL, + quitonerror=ERR_LOG, + bufsize=DEFAULT_BUFSIZE, + labelmsm=True, + ) + + while not self._stopevent.is_set(): + try: + raw_data, parsed_data = parser.read() + if raw_data is None: + if datetime.now() - last_activity > timedelta( + seconds=self._timeout + ): + raise TimeoutError( + f"Inactivity timeout error after {self._timeout} seconds" + ) + else: + if hasattr(parsed_data, "identity"): + self.logger.info(f"Message received: {parsed_data.identity}") + self.logger.debug(parsed_data) + self._do_output(output, raw_data, parsed_data) + last_activity = datetime.now() + self._send_gga(ggainterval, output) + + except ( + RTCMMessageError, + RTCMParseError, + RTCMTypeError, + SPARTNMessageError, + SPARTNParseError, + SPARTNTypeError, + ) as err: + parsed_data = f"Error parsing data stream {err}" + self._do_output(output, raw_data, parsed_data) + continue + + def _parse_sourcetable(self, response: str) -> list: + """ + Parse raw gnss/sourcetable response into list of mountpoints. + + :param str response: response body as string + :return: sourcetable as list of mountpoints + :rtype: list + """ + + self.logger.info(f"Sourcetable:\n{response}") + sourcetable = [] + response = response.split("\r\n") + for line in response: + if line.find("STR;") >= 0: # mountpoint entry + strbits = line.split(";") + if strbits[0] == "STR": + strbits.pop(0) + sourcetable.append(strbits) + return sourcetable + + def _serialize_sourcetable(self, sourcetable: list) -> bytes: + """ + Serialize sourcetable. + + :param list sourcetable: sourcetable as list + :return: sourcetable as bytes + :rtype: bytes + """ + + srt = "" + for row in sourcetable: + for i, col in enumerate(row): + dlm = "," if i < len(row) - 1 else "\r\n" + srt += f"{col}{dlm}" + return bytearray(srt, "utf-8") + + def _format_gga(self) -> tuple: + """ + Format NMEA GGA sentence using pynmeagps. The raw string + output is suitable for sending to an NTRIP socket. + GGA timestamp will default to current UTC. GGA quality is + derived from fix string. + + :return: tuple of (raw NMEA message as bytes, NMEAMessage) + :rtype: tuple + :rtype: tuple + """ + + try: + lat, lon, alt, sep, fixs, sip, hdop, diffage, diffstation = ( + self._app_get_coordinates() + ) + lat = float(lat) + lon = float(lon) + fixi = FIXES.get(fixs, 1) + parsed_data = NMEAMessage( + "GP", + "GGA", + GET, + lat=lat, + lon=lon, + quality=fixi, + numSV=sip, + HDOP=hdop, + alt=alt, + altUnit="M", + sep=sep, + sepUnit="M", + diffAge=diffage, + diffStation=diffstation, + ) + + raw_data = parsed_data.serialize() + return raw_data, parsed_data + except ValueError: + return None, None + + def _send_gga(self, ggainterval: int, output: object): + """ + Send NMEA GGA sentence to NTRIP server at prescribed interval. + + :param int ggainterval: GGA send interval in seconds (-1 = don't send) + :param object output: writeable output medium e.g. serial port + """ + + if ggainterval != NOGGA: + if datetime.now() > self._last_gga + timedelta(seconds=ggainterval): + raw_data, parsed_data = self._format_gga() + if parsed_data is not None: + self._socket.sendall(raw_data) + self._do_output(output, raw_data, parsed_data) + self._last_gga = datetime.now() + + def _get_closest_mountpoint(self) -> tuple: + """ + THREADED + Find closest mountpoint in sourcetable + if valid reference lat/lon are available. + + :return: tuple of (mountpoint, distance) + :rtype: tuple + """ + + try: + lat, lon, _, _, _, _, _, _, _ = self._app_get_coordinates() + closest_mp, dist = find_mp_distance( + float(lat), float(lon), self._settings["sourcetable"] + ) + if self._settings["mountpoint"] == "": + self._settings["mountpoint"] = closest_mp + self.logger.debug( + "Closest mountpoint to reference location " + f"({lat}, {lon}) = {closest_mp}, {dist} km." + ) + + except ValueError: + return None, None + return closest_mp, dist + + def _do_output(self, output: object, raw: bytes, parsed: object): + """ + Send sourcetable/closest mountpoint or RTCM3/SPARTN data to designated output medium. + + If output is Queue, will send both raw and parsed data. + + :param object output: writeable output medium for raw data + :param bytes raw: raw data + :param object parsed: parsed message + """ + + if output is not None: + # serialize sourcetable if outputting to stream + if isinstance(raw, list) and not isinstance(output, Queue): + raw = self._serialize_sourcetable(raw) + if isinstance(output, (Serial, BufferedWriter)): + output.write(raw) + elif isinstance(output, TextIOWrapper): + output.write(str(parsed)) + elif isinstance(output, Queue): + output.put(raw if self.__app == CLIAPP else (raw, parsed)) + elif isinstance(output, socket.socket): + output.sendall(raw) + + # notify any calling app that data is available + if self.__app is not None: + if hasattr(self.__app, "set_event"): + self.__app.set_event(NTRIP_EVENT) + + def _app_update_status(self, status: bool, msgt: tuple = None): + """ + Update NTRIP connection status in calling application. + + :param bool status: NTRIP server connection status + :param tuple msgt: (message, color) + """ + + if msgt[1] == "red": + self.logger.error(msgt[0]) + else: + self.logger.info(msgt[0]) + if self.__app is not None: + if hasattr(self.__app, "dialog"): + dlg = self.__app.dialog(DLGTNTRIP) + if dlg is not None: + if hasattr(dlg, "set_controls"): + dlg.set_controls(status, msgt) + + def _app_get_coordinates(self) -> tuple: + """ + Get live coordinates from receiver, or use fixed + reference position, depending on ggamode setting. + + NB: 'fix' is a string e.g. "3D" or "RTK FLOAT" + + :returns: tuple of coordinate and fix data + :rtype: tuple + """ + + lat = lon = alt = sep = 0.0 + fix, sip, hdop, diffage, diffstation = ("3D", 15, 0.98, 0, 0) + if self._settings["ggamode"] == GGAFIXED: # fixed reference position + lat = self._settings["reflat"] + lon = self._settings["reflon"] + alt = self._settings["refalt"] + sep = self._settings["refsep"] + elif self.__app is not None: + if hasattr(self.__app, "get_coordinates"): # live position from receiver + coords = self.__app.get_coordinates() + if isinstance(coords, tuple): # old version (PyGPSClient <=1.4.19) + _, lat, lon, alt, sep = coords + else: # new version uses dict (PyGPSClient >=1.4.20) + lat = coords.get("lat", lat) + lon = coords.get("lon", lon) + alt = coords.get("alt", alt) + sep = coords.get("sep", sep) + sip = coords.get("sip", sip) + fix = coords.get("fix", fix) + hdop = coords.get("hdop", hdop) + diffage = coords.get("diffage", diffage) + diffstation = coords.get("diffstation", diffstation) + + lat, lon, alt, sep = [ + 0.0 if c == "" else float(c) for c in (lat, lon, alt, sep) + ] + + return lat, lon, alt, sep, fix, sip, hdop, diffage, diffstation + + @property + def settings(self): + """ + Getter for NTRIP settings. + """ + + return self._settings + + @settings.setter + def settings(self, kwargs: dict): + """ + Setter for NTRIP settings. + + :param dict kwargs: NTRIP settings (see run() method for kwargs) + """ + + ipprot = kwargs.get("ipprot", "IPv4") + self._settings["ipprot"] = ipprot2int(ipprot) + self._settings["server"] = kwargs.get("server", "") + self._settings["port"] = int(kwargs.get("port", OUTPORT_NTRIP)) + self._settings["https"] = int(kwargs.get("https", 0)) + self._settings["flowinfo"] = int(kwargs.get("flowinfo", 0)) + self._settings["scopeid"] = int(kwargs.get("scopeid", 0)) + self._settings["mountpoint"] = kwargs.get("mountpoint", "") + self._settings["sourcetable"] = kwargs.get("sourcetable", []) + self._settings["datatype"] = kwargs.get("datatype", RTCM).upper() + self._settings["version"] = kwargs.get("version", "2.0") + self._ntrip_version = self._settings["version"] + self._settings["ntripuser"] = kwargs.get( + "ntripuser", getenv("PYGPSCLIENT_USER", "user") + ) + self._settings["ntrippassword"] = kwargs.get( + "ntrippassword", getenv("PYGPSCLIENT_PASSWORD", "password") + ) + self._settings["ggainterval"] = int(kwargs.get("ggainterval", NOGGA)) + self._settings["ggamode"] = int(kwargs.get("ggamode", GGALIVE)) + self._settings["reflat"] = kwargs.get("reflat", 0.0) + self._settings["reflon"] = kwargs.get("reflon", 0.0) + self._settings["refalt"] = kwargs.get("refalt", 0.0) + self._settings["refsep"] = kwargs.get("refsep", 0.0) + self._settings["spartndecode"] = kwargs.get("spartndecode", 0) + self._settings["spartnkey"] = kwargs.get("spartnkey", getenv("MQTTKEY", None)) + self._settings["spartnbasedate"] = kwargs.get( + "spartbasedate", datetime.now(timezone.utc) + ) + + @property + def connected(self): + """ + Connection status getter. + """ + + return self._connected + + @property + def responseok(self) -> bool: + """ + Response OK indicator (i.e. 200 OK). + + :return: True/False + :rtype: bool + """ + + return self._response_status["code"] == 200 + + @property + def status(self) -> dict: + """ + Get response status e.g. {protocol: "HTTP/1.1", code: 200, description: "OK"}. + + :return: dict of protocol, status code, status description + :rtype: dict + """ + + return self._response_status + + @property + def content_type(self) -> str: + """ + Get content type e.g. "text/html" or "gnss/data". + + :return: content type + :rtype: str + """ + + return self._response_headers.get("content-type", "") + + @property + def response_body(self) -> object: + """ + Get response body if available. + + :return: response body as bytes or string, depending on encoding + :rtype: object + """ + + if "text/" in self.content_type or self.is_sourcetable: + return self._response_body.decode() + return self._response_body + + @property + def encoding(self) -> int: + """ + Get response transfer-encoding settings + (chunked, deflate, compress, gzip). + + :return: OR'd transfer-encoding value + :rtype: int + """ + + encoding = ENCODE_NONE + enc = self._response_headers.get("transfer-encoding", "").lower() + + if "chunked" in enc: + encoding |= ENCODE_CHUNKED + if "deflate" in enc: # zlib compression + encoding |= ENCODE_DEFLATE + if "compress" in enc: # Lempel-Ziv-Welch (LZW) compression + encoding |= ENCODE_COMPRESS + if "gzip" in enc: # Lempel-Zif compression with 32-bit CRC + encoding |= ENCODE_GZIP + + return encoding + + @property + def is_gnssdata(self) -> bool: + """ + Check if response is NTRIP data stream (RTCM or SPARTN). + + :return: gnss/data True/False + :rtype: bool + """ + + return (self._ntrip_version == "2.0" and self.content_type == "gnss/data") or ( + self._ntrip_version == "1.0" and self.status["protocol"].lower() == "icy" + ) + + @property + def is_sourcetable(self) -> bool: + """ + Check if response is NTRIP sourcetable. + + :return: gnss/sourcetable True/False + :rtype: bool + """ + + return ( + self._ntrip_version == "2.0" and self.content_type == "gnss/sourcetable" + ) or ( + self._ntrip_version == "1.0" + and self.status["protocol"].lower() == "sourcetable" + ) + + @property + def stopevent(self) -> Event: + """ + Getter for stop event. + + :return: stop event + :rtype: Event + """ + + return self._stopevent diff --git a/src/pygnssutils/gnssntripclient_cli.py b/src/pygnssutils/gnssntripclient_cli.py index e047b9d..81c0392 100644 --- a/src/pygnssutils/gnssntripclient_cli.py +++ b/src/pygnssutils/gnssntripclient_cli.py @@ -115,7 +115,7 @@ def main(): ap.add_argument( "--retryinterval", required=False, - help="Retry interval in seconds (* retries)", + help="Retry backoff (interval = retryinterval * (2**retries))", type=int, default=RETRY_INTERVAL, ) diff --git a/src/pygnssutils/gnssserver.py b/src/pygnssutils/gnssserver.py index 632270f..bc2626c 100644 --- a/src/pygnssutils/gnssserver.py +++ b/src/pygnssutils/gnssserver.py @@ -52,6 +52,7 @@ def __init__(self, app=None, **kwargs): :param str outport: (kwarg) TCP port (50010, or 2101 in NTRIP mode) :param int maxclients: (kwarg) maximum number of connected clients (5) :param int ntripmode: (kwarg) 0 = socket server, 1 - NTRIP server (0) + :param str ntripversion: (kwarg) NTRIP version "1.0", "2.0" ("2.0") :param str ntripuser: (kwarg) NTRIP caster authentication user ("anon") :param str ntrippassword: (kwarg) NTRIP caster authentication password ("password") :param int validate: (kwarg) 1 = validate checksums, 0 = do not validate (1) @@ -73,6 +74,7 @@ def __init__(self, app=None, **kwargs): # overrideable command line arguments.. # 0 = TCP Socket Server mode, 1 = NTRIP Server mode self._kwargs["ntripmode"] = int(kwargs.get("ntripmode", 0)) + self._kwargs["ntripversion"] = kwargs.get("ntripversion", "2.0") self._kwargs["ntripuser"] = kwargs.get("ntripuser", "anon") self._kwargs["ntrippassword"] = kwargs.get("ntrippassword", "password") ipprot = kwargs.get("ipprot", "IPv4") @@ -223,6 +225,7 @@ def _output_thread(self, app: object, kwargs): kwargs["output"], conn, ClientHandler, + ntripversion=kwargs["ntripversion"], ntripuser=kwargs["ntripuser"], ntrippassword=kwargs["ntrippassword"], ipprot=kwargs["ipprot"], diff --git a/src/pygnssutils/gnssserver_cli.py b/src/pygnssutils/gnssserver_cli.py index 394a84b..8fec927 100644 --- a/src/pygnssutils/gnssserver_cli.py +++ b/src/pygnssutils/gnssserver_cli.py @@ -66,6 +66,14 @@ def main(): choices=[0, 1], default=0, ) + ap.add_argument( + "--ntripversion", + required=False, + help="NTRIP version", + type=str, + choices=["1.0", "2.0"], + default="2.0", + ) ap.add_argument( "--ntripuser", required=False, diff --git a/src/pygnssutils/gnssstreamer.py b/src/pygnssutils/gnssstreamer.py index 258c65f..22e4e2f 100644 --- a/src/pygnssutils/gnssstreamer.py +++ b/src/pygnssutils/gnssstreamer.py @@ -46,6 +46,7 @@ from pygnssutils.exceptions import ParameterError from pygnssutils.globals import ( + ENCODE_NONE, FORMAT_BINARY, FORMAT_HEX, FORMAT_HEXTABLE, @@ -55,6 +56,7 @@ UBXSIMULATOR, ) from pygnssutils.helpers import format_conn, format_json, ipprot2int +from pygnssutils.socketwrapper import SocketWrapper from pygnssutils.ubxsimulator import UBXSimulator @@ -90,6 +92,7 @@ def __init__(self, app=None, **kwargs): :param str ipprot: (kwarg) IP protocol IPv4 / IPv6 :param int baudrate: (kwarg) serial baud rate (9600) :param int timeout: (kwarg) serial timeout in seconds (3) + :param int encoding: (kwarg) socket stream encoding 0 = None, 1 = chunked, 2 = gzip, 4 = compress, 8 = deflate (0) :param int validate: (kwarg) 1 = validate checksums, 0 = do not validate (1) :param int msgmode: (kwarg) 0 = GET, 1 = SET, 2 = POLL, 3 = SETPOLL (0) :param int parsebitfield: (kwarg) 1 = parse UBX 'X' attributes as bitfields, 0 = leave as bytes (1) @@ -114,6 +117,7 @@ def __init__(self, app=None, **kwargs): self._socket = kwargs.get("socket", None) self._ipprot = ipprot2int(kwargs.get("ipprot", "IPv4")) self._output = kwargs.get("output", None) + self._encoding = kwargs.get("encoding", ENCODE_NONE) if self._socket is not None: if self._ipprot == AF_INET6: # IPv6 host ip must be enclosed in [] @@ -228,10 +232,11 @@ def run(self, **kwargs) -> int: ) as self._stream: self._start_reader() elif self._socket is not None: # socket - with socket(self._ipprot, SOCK_STREAM) as self._stream: - self._stream.connect( + with socket(self._ipprot, SOCK_STREAM) as sock: + sock.connect( format_conn(self._ipprot, self._socket_host, self._socket_port) ) + self._stream = SocketWrapper(sock, self._encoding) self._start_reader() elif self._filename is not None: # binary file with open(self._filename, "rb") as self._stream: diff --git a/src/pygnssutils/helpers.py b/src/pygnssutils/helpers.py index bb6f1a1..436e661 100644 --- a/src/pygnssutils/helpers.py +++ b/src/pygnssutils/helpers.py @@ -39,17 +39,21 @@ def parse_config(configfile: str) -> dict: :param str configfile: fully qualified path to config file :return: config as kwargs, or None if file not found :rtype: dict + :raises: FileNotFoundError + :raises: ValueError """ + config = {} try: - config = {} with open(configfile, "r", encoding="utf-8") as infile: for cf in infile: key, val = cf.split("=", 1) config[key.strip()] = val.strip() return config - except (FileNotFoundError, ValueError): - return None + except FileNotFoundError as err: + raise FileNotFoundError(f"Configuration file not found: {configfile}") from err + except ValueError as err: + raise ValueError(f"Configuration file invalid: {configfile}, {err}") from err def set_common_args( @@ -334,11 +338,9 @@ def ipprot2int(family: str) -> int: :rtype: int """ - if family == "IPv4": - return AF_INET if family == "IPv6": return AF_INET6 - raise ValueError(f"Invalid family value {family}") + return AF_INET def ipprot2str(family: int) -> str: @@ -350,8 +352,6 @@ def ipprot2str(family: int) -> str: :rtype: int """ - if family == AF_INET: - return "IPv4" if family == AF_INET6: return "IPv6" - raise ValueError(f"Invalid family value {family}") + return "IPv4" diff --git a/src/pygnssutils/socket_server.py b/src/pygnssutils/socket_server.py index 1b3fc49..f79cf8e 100644 --- a/src/pygnssutils/socket_server.py +++ b/src/pygnssutils/socket_server.py @@ -32,17 +32,17 @@ :license: BSD 3-Clause """ -import logging from base64 import b64encode from datetime import datetime, timezone +from logging import getLogger from os import getenv from queue import Queue from socketserver import StreamRequestHandler, ThreadingTCPServer from threading import Event, Thread from pygnssutils._version import __version__ as VERSION -from pygnssutils.globals import CONNECTED, DISCONNECTED, HTTPCODES, VERBOSITY_MEDIUM -from pygnssutils.helpers import ipprot2int, set_logging +from pygnssutils.globals import CLIAPP, CONNECTED, DISCONNECTED +from pygnssutils.helpers import ipprot2int # from pygpsclient import version as PYGPSVERSION @@ -52,8 +52,6 @@ BUFSIZE = 1024 PYGPSMP = "pygnssutils" -logger = logging.getLogger(__name__) - class SocketServer(ThreadingTCPServer): """ @@ -74,6 +72,7 @@ def __init__( :param int maxclients: max no of clients allowed :param Queue msgqueue: queue containing raw GNSS messages :param str ipprot: (kwarg) IP protocol family (IPv4, IPv6) + :param str ntripversion: (kwarg) NTRIP version ("1.0", "2.0") :param str ntripuser: (kwarg) NTRIP authentication user name :param str ntrippassword: (kwarg) NTRIP authentication password :param int verbosity: (kwarg) log verbosity (1 = medium) @@ -81,11 +80,7 @@ def __init__( """ self.__app = app # Reference to main application class - set_logging( - logger, - kwargs.pop("verbosity", VERBOSITY_MEDIUM), - kwargs.pop("logtofile", ""), - ) + self.logger = getLogger(__name__) self._ntripmode = ntripmode self._maxclients = maxclients self._msgqueue = msgqueue @@ -93,6 +88,7 @@ def __init__( self._stream_thread = None self._stopmqread = Event() # set NTRIP Caster authentication credentials + self.ntripversion = kwargs.pop("ntripversion", "2.0") self._ntripuser = kwargs.pop("ntripuser", getenv("PYGPSCLIENT_USER", "anon")) self._ntrippassword = kwargs.pop( "ntrippassword", getenv("PYGPSCLIENT_PASSWORD", "password") @@ -340,12 +336,32 @@ def _process_ntrip_request(self, data: bytes) -> bytes: elif mountpoint == f"/{PYGPSMP}": # valid mountpoint validmp = True + http_date, server_date = self._format_dates() if not authorized: # respond with 401 - http = ( - self._format_http_header(401) - + f'WWW-Authenticate: Basic realm="{mountpoint}"\r\n' - + "Connection: close\r\n" - ) + if self.server.ntripversion == "1.0": + http = ( + "HTTP/1.1 401 Unauthorized\r\n" + f"Date: {http_date}\r\n" + f'WWW-Authenticate: Basic realm="{mountpoint}"\r\n' + "Content-Type: text/html\r\n" + "Connection: close\r\n\r\n" + "\r\n" + "401 Unauthorized\r\n" + "

" + "The server does not recognize your privileges " + "to the requested entity/stream

\r\n" + "\r\n\r\n" + ) + else: + http = ( + "HTTP/1.1 401 Unauthorized\r\n" + "Ntrip-Version: Ntrip/2.0\r\n" + f"Server: {PYGPSMP.upper()}_NTRIP_Caster_{VERSION}/of:{server_date}\r\n" + f"Date: {http_date}\r\n" + f'WWW-Authenticate: Basic realm="{mountpoint}"\r\n' + "Connection: close\r\n\r\n" + ) return BAD, bytes(http, "UTF-8") if strreq or (not strreq and not validmp): # respond with nominal sourcetable http = self._format_sourcetable() @@ -363,27 +379,42 @@ def _format_sourcetable(self) -> str: :rtype: str """ + http_date, server_date = self._format_dates() lat, lon = self.server.latlon ipaddr, port = self.server.server_address + pygu = PYGPSMP.upper() # sourcetable based on ZED-F9P capabilities sourcetable = ( - f"STR;{PYGPSMP};PYGNSSUTILS;RTCM 3.3;" - + "1005(5),1077(1),1087(1),1097(1),1127(1),1230(1);" - + f"0;GPS+GLO+GAL+BEI;SNIP;SRB;{lat};{lon};0;0;sNTRIP;none;B;N;0;\r\n" - ) - sourcefooter = ( - f"NET;SNIP;pygnssutils;N;N;pygnssutils;{ipaddr}:{port};info@semuconsulting.com;;\r\n" - + "ENDSOURCETABLE\r\n" - ) - http = ( - self._format_http_header(200) - + "Connection: close\r\n" - + "Content-Type: gnss/sourcetable\r\n" - + f"Content-Length: {len(sourcetable) + len(sourcefooter)}\r\n" - + "\r\n" # necessary to separate body from header - + sourcetable - + sourcefooter + f"CAS;{ipaddr};{port};{PYGPSMP}/{VERSION};SEMU;0;GBR;{lat};{lon};0.0.0.0;0;none\r\n" + f"NET;{pygu};SEMU;B;N;none;none;none;none\r\n" + f"STR;{PYGPSMP};{pygu};RTCM 3.3;" + "1005(5),1077(1),1087(1),1097(1),1127(1),1230(1);" + f"2;GPS+GLO+GAL+BEI;{pygu};GBR;{lat};{lon};0;0;{pygu};none;B;N;0;\r\n" + "ENDSOURCETABLE\r\n" ) + if self.server.ntripversion == "1.0": + http = ( + "SOURCETABLE 200 OK\r\n" + f"Date: {http_date}\r\n" + "Connection: close\r\n" + "Content-Type: text/plain\r\n" + f"Content-Length: {len(sourcetable)}\r\n" + "\r\n" # necessary to separate body from header + f"{sourcetable}" + ) + else: + http = ( + "HTTP/1.1 200 OK\r\n" + "Ntrip-Version: Ntrip/2.0\r\n" + # "Ntrip-Flags: st_filter,st_auth,st_match,st_strict,rtsp,plain_rtp\r\n" + f"Server: {pygu}_NTRIP_Caster_{VERSION}/of:{server_date}\r\n" + f"Date: {http_date}\r\n" + "Connection: close\r\n" + "Content-Type: gnss/sourcetable\r\n" + f"Content-Length: {len(sourcetable)}\r\n" + "\r\n" # necessary to separate body from header + f"{sourcetable}" + ) return http def _format_data(self) -> str: @@ -394,36 +425,35 @@ def _format_data(self) -> str: :rtype: str """ - http = ( - self._format_http_header(200) - + "Cache-Control: no-store, no-cache, max-age=0\r\n" - + "Pragma: no-cache\r\n" - + "Connection: close\r\n" - + "Content-Type: gnss/data\r\n" - + "\r\n" # necessary to separate body from header - ) + http_date, server_date = self._format_dates() + if self.server.ntripversion == "1.0": + http = "ICY 200 OK\r\n\r\n" + else: + http = ( + "HTTP/1.1 200 OK" + "Ntrip-Version: Ntrip/2.0\r\n" + f"Server: {PYGPSMP.upper()}_NTRIP_Caster_{VERSION}/of:{server_date}\r\n" + f"Date: {http_date}\r\n" + "Cache-Control: no-store, no-cache, max-age=0\r\n" + "Pragma: no-cache\r\n" + "Connection: close\r\n" + "Content-Type: gnss/data\r\n" + "\r\n" # necessary to separate body from header + ) return http - def _format_http_header(self, code: int = 200) -> str: + def _format_dates(self) -> tuple: """ - Format HTTP NTRIP header. + Format response header dates. - :param int code: HTTP response code (200) - :return: HTTP NTRIP header - :rtype: str + :return: tuple of (http_date, server_date) + :rtype: tuple """ dat = datetime.now(timezone.utc) - server_date = dat.strftime("%d %b %Y") http_date = dat.strftime("%a, %d %b %Y %H:%M:%S %Z") - header = ( - f"HTTP/1.1 {code} {HTTPCODES[code]}\r\n" - + "Ntrip-Version: Ntrip/2.0\r\n" - + "Ntrip-Flags: \r\n" - + f"Server: pygnssutils_NTRIP_Caster_{VERSION}/of:{server_date}\r\n" - + f"Date: {http_date}\r\n" - ) - return header + server_date = dat.strftime("%d %b %Y") + return (http_date, server_date) def _write_from_mq(self): """ @@ -436,7 +466,14 @@ def _write_from_mq(self): self.wfile.flush() -def runserver(host: str, port: int, mq: Queue, ntripmode: int = 0, maxclients: int = 5): +def runserver( + host: str, + port: int, + mq: Queue, + ntripmode: int = 0, + maxclients: int = 5, + **kwargs, +): """ THREADED Socket server function to be run as thread. @@ -446,14 +483,18 @@ def runserver(host: str, port: int, mq: Queue, ntripmode: int = 0, maxclients: i :param Queue mq: output message queue :param int ntripmode: 0 = basic, 1 = ntrip caster :param int maxclients: max concurrent clients + :param str ntripversion: (kwarg) NTRIP version 1.0 or 2.0 """ with SocketServer( - None, + CLIAPP, ntripmode, maxclients, mq, # message queue containing raw data from source (host, port), ClientHandler, + ntripversion=kwargs.get("ntripversion", "2.0"), + ntripuser=kwargs.get("ntripuser", "anon"), + ntrippassword=kwargs.get("ntrippassword", "password"), ) as server: server.serve_forever() diff --git a/src/pygnssutils/socketwrapper.py b/src/pygnssutils/socketwrapper.py new file mode 100644 index 0000000..2268691 --- /dev/null +++ b/src/pygnssutils/socketwrapper.py @@ -0,0 +1,177 @@ +""" +socketwrapper.py + +Socket stream wrapper providing read(n) and readline() methods. + +Supports chunked and compressed transfer-encoded datastreams. + +Created on 12 Feb 2023 + +:author: semuadmin +:copyright: SEMU Consulting © 2023 +:license: BSD 3-Clause +""" + +import socket +from io import BytesIO +from logging import getLogger +from zlib import MAX_WBITS, decompress +from zlib import error as zlibError + +from pygnssutils.globals import ( + DEFAULT_BUFSIZE, + ENCODE_CHUNKED, + ENCODE_COMPRESS, + ENCODE_DEFLATE, + ENCODE_GZIP, + ENCODE_NONE, +) + + +class SocketWrapper: + """ + Socket stream wrapper providing read(n) and readline() methods. + + Supports chunked transfer-encoded datastreams. + """ + + def __init__(self, sock: socket, encoding=ENCODE_NONE, bufsize=DEFAULT_BUFSIZE): + """ + Constructor. + + :param sock socket: socket object + :param int encoding: transfer-encoding + :param int bufsize: internal buffer size + """ + + # configure logger with name "pygnssutils" in calling module + self.logger = getLogger(__name__) + self._socket = sock + self._bufsize = bufsize + self._encoding = encoding + self._buffer = bytearray() + self._partials = b"" # partial chunks + self._recv() # populate initial buffer + + def _recv(self) -> bool: + """ + Read bytes from socket into internal buffer. + + :return: return code (0 = failure, 1 = success) + :rtype: bool + """ + + try: + data = self._socket.recv(self._bufsize) + if len(data) == 0: + return False + if self._encoding & ENCODE_CHUNKED: + data = self._partials + data + chunks, self._partials = self.dechunk(data) + self._buffer += chunks + else: + self._buffer += data + except (OSError, TimeoutError): + return False + return True + + def read(self, num: int) -> bytes: + """ + Read specified number of bytes from buffer. + NB: always check length of return data. + + :param int num: number of bytes to read + :return: bytes read (which may be less than num) + :rtype: bytes + """ + + # if at end of internal buffer, top it up from socket + while len(self._buffer) < num: + if not self._recv(): + return b"" + data = self._buffer[:num] + self._buffer = self._buffer[num:] + return bytes(data) + + def readline(self) -> bytes: + """ + Read bytes from buffer until CRLF reached. + NB: always check that return data terminator is CRLF. + + :return: bytes + :rtype: bytes + """ + + line = b"" + while True: + data = self.read(1) + if len(data) == 1: + line += data + if line[-2:] == b"\r\n": + break + else: + break + + return line + + def write(self, data: bytes, **kwargs): + """ + Write bytes to socket. + + :param bytes data: data + :param dict kwargs: kwargs + """ + + return self._socket.send(data, **kwargs) + + def dechunk(self, segment: bytes) -> tuple: + """ + Parse segment of chunked transfer-encoded byte stream. + + Returns complete chunks in this segment and any partial + chunk, which should be prepended to next segment read. + + :param segment: segment of byte stream + :return: tuple of (chunks, partial) + :rtype: tuple + """ + + instream = BytesIO(segment) + chunks = b"" + partial = b"" + + while True: + length_bytes = instream.readline() + if length_bytes[-2:] != b"\r\n": + # premature end of length bytes + partial = length_bytes + break + try: + chunk_length = int(length_bytes.strip(), 16) + except ValueError: + # residual bytes at beginning of stream + break + if chunk_length != 0: + chunk = instream.read(chunk_length) + if len(chunk) != chunk_length: + # premature end of chunk bytes + partial = length_bytes + chunk + break + try: + if self._encoding & ENCODE_GZIP: + chunk = decompress(chunk, wbits=MAX_WBITS | 16) + if self._encoding & ENCODE_COMPRESS: + chunk = decompress(chunk, wbits=MAX_WBITS) + if self._encoding & ENCODE_DEFLATE: + chunk = decompress(chunk, wbits=-MAX_WBITS) + except zlibError as err: + self.logger.error(f"Error decompressing data: {err}") + # parser will discard data + chunks += chunk + + instream.readline() + if chunk_length == 0: + # final chunk + break + + return chunks, partial diff --git a/tests/test_static.py b/tests/test_static.py index 7cbf130..1804304 100644 --- a/tests/test_static.py +++ b/tests/test_static.py @@ -155,18 +155,10 @@ def testipprot2int(self): # test IP family to int self.assertEqual(AF_INET, ipprot2int("IPv4")) self.assertEqual(AF_INET6, ipprot2int("IPv6")) - def testipprot2interr(self): # test IP family to int invalid - with self.assertRaisesRegex(ValueError, "Invalid family value IPv99"): - ipprot2int("IPv99") - def testipprot2str(self): # test IP family to str self.assertEqual("IPv4", ipprot2str(AF_INET)) self.assertEqual("IPv6", ipprot2str(AF_INET6)) - def testipprot2strerr(self): # test IP family to str invalid - with self.assertRaisesRegex(ValueError, "Invalid family value 99"): - ipprot2str(99) - def testparsemqttfreq(self): # test MQTTMessage constructor topic = "/pp/frequencies/Lb" payload = b'{\n "frequencies": {\n "us": {\n "current": {\n "value": "1556.29"\n }\n },\n "eu": {\n "current": {\n "value": "1545.26"\n }\n }\n }\n}'