diff --git a/gamutrf/grpduzmq.py b/gamutrf/grpduzmq.py index 1eeb7ee2..a25f6a3b 100644 --- a/gamutrf/grpduzmq.py +++ b/gamutrf/grpduzmq.py @@ -4,6 +4,7 @@ import sys import pmt import zmq +import zstandard try: from gnuradio import gr # pytype: disable=import-error @@ -38,6 +39,7 @@ def __init__( self.zmq_pub.bind(zmq_addr) self.message_port_register_in(pmt.intern("json")) self.set_msg_handler(pmt.intern("json"), self.receive_pdu) + self.context = zstandard.ZstdCompressor() def stop(self): self.zmq_pub.close() @@ -45,6 +47,8 @@ def stop(self): def receive_pdu(self, pdu): item = pmt.to_python(pmt.cdr(pdu)).tobytes().decode("utf8").strip() try: - self.zmq_pub.send_string(item + DELIM, flags=zmq.NOBLOCK) + data = item + DELIM + data = self.context.compress(data.encode("utf8")) + self.zmq_pub.send(data, flags=zmq.NOBLOCK) except zmq.ZMQError as e: logging.error(str(e)) diff --git a/gamutrflib/gamutrflib/zmqbucket.py b/gamutrflib/gamutrflib/zmqbucket.py index 034c1978..64b9b906 100644 --- a/gamutrflib/gamutrflib/zmqbucket.py +++ b/gamutrflib/gamutrflib/zmqbucket.py @@ -81,11 +81,12 @@ def fft_proxy( tmp_buff_file = buff_file.replace(tmp_buff_file, "." + tmp_buff_file) if os.path.exists(tmp_buff_file): os.remove(tmp_buff_file) - context = zstandard.ZstdCompressor() + compress_context = zstandard.ZstdCompressor() + decompress_context = zstandard.ZstdDecompressor() shutdown = False while not shutdown: with open(tmp_buff_file, "wb") as zbf: - with context.stream_writer(zbf) as bf: + with compress_context.stream_writer(zbf) as bf: while not shutdown: shutdown = live_file is not None and not live_file.exists() try: @@ -93,6 +94,11 @@ def fft_proxy( except zmq.error.Again: time.sleep(poll_timeout) continue + # gamutrf might send compressed message + try: + sock_txt = decompress_context.decompress(sock_txt) + except zstandard.ZstdError: + pass bf.write(sock_txt) now = time.time() if ( @@ -122,7 +128,6 @@ def __init__( self.addr = addr self.port = port self.context = zstandard.ZstdDecompressor() - self.txt_buf = "" self.fftbuffer = None self.scan_configs = {} self.proxy_result = executor.submit( @@ -138,30 +143,20 @@ def healthy(self): def __str__(self): return f"ZmqScanner on {self.addr}:{self.port}" - def read_buff_file(self): + def read_buff_file(self, log): + lines = None if os.path.exists(self.buff_file): self.info("read %u bytes of FFT data" % os.stat(self.buff_file).st_size) with self.context.stream_reader(open(self.buff_file, "rb")) as bf: - self.txt_buf += bf.read().decode("utf8") - os.remove(self.buff_file) - return True - return False - - def txtbuf_to_lines(self, log): - lines = self.txt_buf.splitlines() - if len(lines) > 1: - if self.txt_buf.endswith("\n"): - if log: - log.write(self.txt_buf) - self.txt_buf = "" - elif lines: - last_line = lines[-1] + txt_buf = bf.read().decode("utf8") if log: - log.write(self.txt_buf[: -len(last_line)]) - self.txt_buf = last_line - lines = lines[:-1] - return lines - return None + log.write(txt_buf) + try: + lines = [json.loads(line) for line in txt_buf.splitlines() if line] + except json.decoder.JSONDecodeError as err: + logging.info("%s: %s", err, txt_buf) + os.remove(self.buff_file) + return lines def read_new_frame_df(self, df, discard_time): frame_df = None @@ -194,9 +189,7 @@ def read_new_frame_df(self, df, discard_time): def lines_to_df(self, lines): try: records = [] - for line in lines: - line = line.strip() - json_record = json.loads(line) + for json_record in lines: ts = float(json_record["ts"]) sweep_start = float(json_record["sweep_start"]) total_tune_count = int(json_record["total_tune_count"]) @@ -223,12 +216,11 @@ def lines_to_df(self, lines): def read_buff(self, log, discard_time): scan_config = None frame_df = None - if self.read_buff_file(): - lines = self.txtbuf_to_lines(log) - if lines: - df = self.lines_to_df(lines) - if df is not None: - scan_config, frame_df = self.read_new_frame_df(df, discard_time) + lines = self.read_buff_file(log) + if lines: + df = self.lines_to_df(lines) + if df is not None: + scan_config, frame_df = self.read_new_frame_df(df, discard_time) return scan_config, frame_df