diff --git a/FC_Board/lib/Big_Data.py b/FC_Board/lib/Big_Data.py index e28ab5c..f204b45 100755 --- a/FC_Board/lib/Big_Data.py +++ b/FC_Board/lib/Big_Data.py @@ -3,13 +3,14 @@ import traceback import gc + class Face: def __init__(self, Add, Pos, debug_state, tca): self.tca = tca self.address = Add self.position = Pos self.debug = debug_state - + # Use tuple instead of list for immutable data self.senlist = () # Define sensors based on position using a dictionary lookup instead of if-elif chain @@ -18,13 +19,13 @@ def __init__(self, Add, Pos, debug_state, tca): "x-": ("MCP", "VEML"), "y+": ("MCP", "VEML", "DRV"), "y-": ("MCP", "VEML"), - "z-": ("MCP", "VEML", "DRV") + "z-": ("MCP", "VEML", "DRV"), } self.senlist = sensor_map.get(Pos, ()) - + # Initialize sensor states dict only with needed sensors self.sensors = {sensor: False for sensor in self.senlist} - + # Initialize sensor objects as None self.mcp = None self.veml = None @@ -36,39 +37,43 @@ def debug_print(self, statement): def Sensorinit(self, senlist, address): gc.collect() # Force garbage collection before initializing sensors - + if "MCP" in senlist: try: import adafruit_mcp9808 + self.mcp = adafruit_mcp9808.MCP9808(self.tca[address], address=27) self.sensors["MCP"] = True except Exception as e: self.debug_print("[ERROR][Temperature Sensor]" + str(e)) - + if "VEML" in senlist: try: import adafruit_veml7700 + self.veml = adafruit_veml7700.VEML7700(self.tca[address]) self.sensors["VEML"] = True except Exception as e: self.debug_print("[ERROR][Light Sensor]" + str(e)) - + if "DRV" in senlist: try: import adafruit_drv2605 + self.drv = adafruit_drv2605.DRV2605(self.tca[address]) self.sensors["DRV"] = True except Exception as e: self.debug_print("[ERROR][Motor Driver]" + str(e)) - + gc.collect() # Clean up after initialization + class AllFaces: def __init__(self, debug, tca): self.tca = tca self.debug = debug self.faces = [] - + # Create faces using a loop instead of individual variables positions = [("y+", 0), ("y-", 1), ("x+", 2), ("x-", 3), ("z-", 4)] for pos, addr in positions: @@ -76,7 +81,7 @@ def __init__(self, debug, tca): face.Sensorinit(face.senlist, face.address) self.faces.append(face) gc.collect() # Clean up after each face initialization - + def Face_Test_All(self): results = [] for face in self.faces: diff --git a/FC_Board/lib/Field.py b/FC_Board/lib/Field.py index feb483b..34c6194 100755 --- a/FC_Board/lib/Field.py +++ b/FC_Board/lib/Field.py @@ -18,29 +18,15 @@ def debug_print(self, statement): def __init__(self, cubesat, debug): self.debug = debug self.cubesat = cubesat - try: - if self.cubesat.legacy: - self.cubesat.enable_rf.value = True - - self.cubesat.radio1.spreading_factor = 8 - self.cubesat.radio1.low_datarate_optimize = False - self.cubesat.radio1.node = 0xFB - self.cubesat.radio1.destination = 0xFA - self.cubesat.radio1.receive_timeout = 10 - self.cubesat.radio1.enable_crc = True - if self.cubesat.radio1.spreading_factor > 8: - self.cubesat.radio1.low_datarate_optimize = True - except Exception as e: - self.debug_print( - "Error Defining Radio features: " - + "".join(traceback.format_exception(e)) - ) def Beacon(self, msg): try: if self.cubesat.is_licensed: self.debug_print("I am beaconing: " + str(msg)) - self.cubesat.radio1.send(msg) + print( + "Message Success: " + + str(self.cubesat.radio1.send_with_ack(bytes(msg, "UTF-8"))) + ) else: self.debug_print( "Please toggle licensed variable in code once you obtain an amateur radio license" diff --git a/FC_Board/lib/adafruit_rfm/rfm9x.py b/FC_Board/lib/adafruit_rfm/rfm9x.py new file mode 100644 index 0000000..7ab8d62 --- /dev/null +++ b/FC_Board/lib/adafruit_rfm/rfm9x.py @@ -0,0 +1,535 @@ +# SPDX-FileCopyrightText: 2024 Jerry Needell for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +""" +`adafruit_rfm.rfm9x` +==================================================== + +CircuitPython module for the RFM95/6/7/8 LoRa 433/915mhz radio modules. + +* Author(s): Jerry Needell +""" + +import time + +from micropython import const + +from adafruit_rfm.rfm_common import RFMSPI + +try: + import busio + import digitalio + from circuitpython_typing import ReadableBuffer + + try: + from typing import Literal + except ImportError: + from typing_extensions import Literal + +except ImportError: + pass + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM.git" + +# pylint: disable=duplicate-code + +# Internal constants: +# Register names (FSK Mode even though we use LoRa instead, from table 85) +_RF95_REG_00_FIFO = const(0x00) +_RF95_REG_01_OP_MODE = const(0x01) +_RF95_REG_06_FRF_MSB = const(0x06) +_RF95_REG_07_FRF_MID = const(0x07) +_RF95_REG_08_FRF_LSB = const(0x08) +_RF95_REG_09_PA_CONFIG = const(0x09) +_RF95_REG_0A_PA_RAMP = const(0x0A) +_RF95_REG_0B_OCP = const(0x0B) +_RF95_REG_0C_LNA = const(0x0C) +_RF95_REG_0D_FIFO_ADDR_PTR = const(0x0D) +_RF95_REG_0E_FIFO_TX_BASE_ADDR = const(0x0E) +_RF95_REG_0F_FIFO_RX_BASE_ADDR = const(0x0F) +_RF95_REG_10_FIFO_RX_CURRENT_ADDR = const(0x10) +_RF95_REG_11_IRQ_FLAGS_MASK = const(0x11) +_RF95_REG_12_IRQ_FLAGS = const(0x12) +_RF95_REG_13_RX_NB_BYTES = const(0x13) +_RF95_REG_14_RX_HEADER_CNT_VALUE_MSB = const(0x14) +_RF95_REG_15_RX_HEADER_CNT_VALUE_LSB = const(0x15) +_RF95_REG_16_RX_PACKET_CNT_VALUE_MSB = const(0x16) +_RF95_REG_17_RX_PACKET_CNT_VALUE_LSB = const(0x17) +_RF95_REG_18_MODEM_STAT = const(0x18) +_RF95_REG_19_PKT_SNR_VALUE = const(0x19) +_RF95_REG_1A_PKT_RSSI_VALUE = const(0x1A) +_RF95_REG_1B_RSSI_VALUE = const(0x1B) +_RF95_REG_1C_HOP_CHANNEL = const(0x1C) +_RF95_REG_1D_MODEM_CONFIG1 = const(0x1D) +_RF95_REG_1E_MODEM_CONFIG2 = const(0x1E) +_RF95_REG_1F_SYMB_TIMEOUT_LSB = const(0x1F) +_RF95_REG_20_PREAMBLE_MSB = const(0x20) +_RF95_REG_21_PREAMBLE_LSB = const(0x21) +_RF95_REG_22_PAYLOAD_LENGTH = const(0x22) +_RF95_REG_23_MAX_PAYLOAD_LENGTH = const(0x23) +_RF95_REG_24_HOP_PERIOD = const(0x24) +_RF95_REG_25_FIFO_RX_BYTE_ADDR = const(0x25) +_RF95_REG_26_MODEM_CONFIG3 = const(0x26) + +_RF95_REG_40_DIO_MAPPING1 = const(0x40) +_RF95_REG_41_DIO_MAPPING2 = const(0x41) +_RF95_REG_42_VERSION = const(0x42) + +_RF95_REG_4B_TCXO = const(0x4B) +_RF95_REG_4D_PA_DAC = const(0x4D) +_RF95_REG_5B_FORMER_TEMP = const(0x5B) +_RF95_REG_61_AGC_REF = const(0x61) +_RF95_REG_62_AGC_THRESH1 = const(0x62) +_RF95_REG_63_AGC_THRESH2 = const(0x63) +_RF95_REG_64_AGC_THRESH3 = const(0x64) + +_RF95_DETECTION_OPTIMIZE = const(0x31) +_RF95_DETECTION_THRESHOLD = const(0x37) + +_RF95_PA_DAC_DISABLE = const(0x04) +_RF95_PA_DAC_ENABLE = const(0x07) + +# The crystal oscillator frequency of the module +_RF95_FXOSC = 32000000.0 + +# The Frequency Synthesizer step = RH_RF95_FXOSC / 2^^19 +_RF95_FSTEP = _RF95_FXOSC / 524288 + +# RadioHead specific compatibility constants. +_RH_BROADCAST_ADDRESS = const(0xFF) + +# The acknowledgement bit in the FLAGS +# The top 4 bits of the flags are reserved for RadioHead. The lower 4 bits are reserved +# for application layer use. +_RH_FLAGS_ACK = const(0x80) +_RH_FLAGS_RETRY = const(0x40) + +# User facing constants: +SLEEP_MODE = 0b000 +STANDBY_MODE = 0b001 +FS_TX_MODE = 0b010 +TX_MODE = 0b011 +FS_RX_MODE = 0b100 +RX_MODE = 0b101 + + +# pylint: disable=too-many-instance-attributes +class RFM9x(RFMSPI): + """Interface to a RFM95/6/7/8 LoRa radio module. Allows sending and + receiving bytes of data in long range LoRa mode at a support board frequency + (433/915mhz). + + You must specify the following parameters: + - spi: The SPI bus connected to the radio. + - cs: The CS pin DigitalInOut connected to the radio. + - reset: The reset/RST pin DigialInOut connected to the radio. + - frequency: The frequency (in mhz) of the radio module (433/915mhz typically). + + You can optionally specify: + - preamble_length: The length in bytes of the packet preamble (default 8). + - high_power: Boolean to indicate a high power board (RFM95, etc.). Default + is True for high power. + - baudrate: Baud rate of the SPI connection, default is 10mhz but you might + choose to lower to 1mhz if using long wires or a breadboard. + - agc: Boolean to Enable/Disable Automatic Gain Control - Default=False (AGC off) + - crc: Boolean to Enable/Disable Cyclic Redundancy Check - Default=True (CRC Enabled) + Remember this library makes a best effort at receiving packets with pure + Python code. Trying to receive packets too quickly will result in lost data + so limit yourself to simple scenarios of sending and receiving single + packets at a time. + + Also note this library tries to be compatible with raw RadioHead Arduino + library communication. This means the library sets up the radio modulation + to match RadioHead's defaults. + Advanced RadioHead features like address/node specific packets + or "reliable datagram" delivery are supported however due to the + limitations noted, "reliable datagram" is still subject to missed packets. + """ + + operation_mode = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, bits=3) + + low_frequency_mode = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, offset=3, bits=1) + + modulation_type = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, offset=5, bits=2) + + # Long range/LoRa mode can only be set in sleep mode! + long_range_mode = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, offset=7, bits=1) + + output_power = RFMSPI.RegisterBits(_RF95_REG_09_PA_CONFIG, bits=4) + + max_power = RFMSPI.RegisterBits(_RF95_REG_09_PA_CONFIG, offset=4, bits=3) + + pa_select = RFMSPI.RegisterBits(_RF95_REG_09_PA_CONFIG, offset=7, bits=1) + + pa_dac = RFMSPI.RegisterBits(_RF95_REG_4D_PA_DAC, bits=3) + + dio0_mapping = RFMSPI.RegisterBits(_RF95_REG_40_DIO_MAPPING1, offset=6, bits=2) + + auto_agc = RFMSPI.RegisterBits(_RF95_REG_26_MODEM_CONFIG3, offset=2, bits=1) + + low_datarate_optimize = RFMSPI.RegisterBits( + _RF95_REG_26_MODEM_CONFIG3, offset=3, bits=1 + ) + + lna_boost_hf = RFMSPI.RegisterBits(_RF95_REG_0C_LNA, offset=0, bits=2) + + auto_ifon = RFMSPI.RegisterBits(_RF95_DETECTION_OPTIMIZE, offset=7, bits=1) + + detection_optimize = RFMSPI.RegisterBits(_RF95_DETECTION_OPTIMIZE, offset=0, bits=3) + + bw_bins = (7800, 10400, 15600, 20800, 31250, 41700, 62500, 125000, 250000) + + def __init__( # noqa: PLR0913 + self, + spi: busio.SPI, + cs: digitalio.DigitalInOut, # pylint: disable=invalid-name + rst: digitalio.DigitalInOut, + frequency: int, + *, + preamble_length: int = 8, + high_power: bool = True, + baudrate: int = 5000000, + agc: bool = False, + crc: bool = True, + ) -> None: + super().__init__(spi, cs, baudrate=baudrate) + self.module = "RFM9X" + self.max_packet_length = 252 + self.high_power = high_power + # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. + # Set Default Baudrate to 5MHz to avoid problems + # self._device = spidev.SPIDevice(spi, cs, baudrate=baudrate, polarity=0, phase=0) + # Setup reset as a digital output - initially High + # This line is pulled low as an output quickly to trigger a reset. + self._rst = rst + # initialize Reset High + self._rst.switch_to_output(value=True) + self.reset() + # No device type check! Catch an error from the very first request and + # throw a nicer message to indicate possible wiring problems. + version = self.read_u8(address=_RF95_REG_42_VERSION) + if version != 18: + raise RuntimeError( + "Failed to find rfm9x with expected version -- check wiring. Version found:", + hex(version), + ) + + # Set sleep mode, wait 10s and confirm in sleep mode (basic device check). + # Also set long range mode (LoRa mode) as it can only be done in sleep. + self.sleep() + time.sleep(0.01) + self.long_range_mode = True + if self.operation_mode != SLEEP_MODE or not self.long_range_mode: + raise RuntimeError("Failed to configure radio for LoRa mode, check wiring!") + # clear default setting for access to LF registers if frequency > 525MHz + if frequency > 525: + self.low_frequency_mode = 0 + # Setup entire 256 byte FIFO + self.write_u8(_RF95_REG_0E_FIFO_TX_BASE_ADDR, 0x00) + self.write_u8(_RF95_REG_0F_FIFO_RX_BASE_ADDR, 0x00) + # Set mode idle + self.idle() + # Set frequency + self.frequency_mhz = frequency + # Set preamble length (default 8 bytes to match radiohead). + self.preamble_length = preamble_length + # Defaults set modem config to RadioHead compatible Bw125Cr45Sf128 mode. + self.signal_bandwidth = 125000 + self.coding_rate = 5 + self.spreading_factor = 7 + # Default to enable CRC checking on incoming packets. + self.enable_crc = crc + """CRC Enable state""" + # set AGC - Default = False + self.auto_agc = agc + """Automatic Gain Control state""" + # Set transmit power to 13 dBm, a safe value any module supports. + self.tx_power = 13 + + def reset(self) -> None: + """Perform a reset of the chip.""" + # See section 7.2.2 of the datasheet for reset description. + self._rst.value = False # Set Reset Low + time.sleep(0.0001) # 100 us + self._rst.value = True # set Reset High + time.sleep(0.005) # 5 ms + + def idle(self) -> None: + """Enter idle standby mode.""" + self.operation_mode = STANDBY_MODE + + def sleep(self) -> None: + """Enter sleep mode.""" + self.operation_mode = SLEEP_MODE + + def listen(self) -> None: + """Listen for packets to be received by the chip. Use :py:func:`receive` + to listen, wait and retrieve packets as they're available. + """ + self.operation_mode = RX_MODE + self.dio0_mapping = 0b00 # Interrupt on rx done. + + def transmit(self) -> None: + """Transmit a packet which is queued in the FIFO. This is a low level + function for entering transmit mode and more. For generating and + transmitting a packet of data use :py:func:`send` instead. + """ + self.operation_mode = TX_MODE + self.dio0_mapping = 0b01 # Interrupt on tx done. + + @property + def preamble_length(self) -> int: + """The length of the preamble for sent and received packets, an unsigned + 16-bit value. Received packets must match this length or they are + ignored! Set to 8 to match the RadioHead RFM95 library. + """ + msb = self.read_u8(_RF95_REG_20_PREAMBLE_MSB) + lsb = self.read_u8(_RF95_REG_21_PREAMBLE_LSB) + return ((msb << 8) | lsb) & 0xFFFF + + @preamble_length.setter + def preamble_length(self, val: int) -> None: + assert 0 <= val <= 65535 + self.write_u8(_RF95_REG_20_PREAMBLE_MSB, (val >> 8) & 0xFF) + self.write_u8(_RF95_REG_21_PREAMBLE_LSB, val & 0xFF) + + @property + def frequency_mhz(self) -> Literal[433.0, 915.0]: + """The frequency of the radio in Megahertz. Only the allowed values for + your radio must be specified (i.e. 433 vs. 915 mhz)! + """ + msb = self.read_u8(_RF95_REG_06_FRF_MSB) + mid = self.read_u8(_RF95_REG_07_FRF_MID) + lsb = self.read_u8(_RF95_REG_08_FRF_LSB) + frf = ((msb << 16) | (mid << 8) | lsb) & 0xFFFFFF + frequency = (frf * _RF95_FSTEP) / 1000000.0 + return frequency + + @frequency_mhz.setter + def frequency_mhz(self, val: Literal[433.0, 915.0]) -> None: + if val < 240 or val > 960: + raise RuntimeError("frequency_mhz must be between 240 and 960") + # Calculate FRF register 24-bit value. + frf = int((val * 1000000.0) / _RF95_FSTEP) & 0xFFFFFF + # Extract byte values and update registers. + msb = frf >> 16 + mid = (frf >> 8) & 0xFF + lsb = frf & 0xFF + self.write_u8(_RF95_REG_06_FRF_MSB, msb) + self.write_u8(_RF95_REG_07_FRF_MID, mid) + self.write_u8(_RF95_REG_08_FRF_LSB, lsb) + + @property + def tx_power(self) -> int: + """The transmit power in dBm. Can be set to a value from 5 to 23 for + high power devices (RFM95/96/97/98, high_power=True) or -1 to 14 for low + power devices. Only integer power levels are actually set (i.e. 12.5 + will result in a value of 12 dBm). + The actual maximum setting for high_power=True is 20dBm but for values > 20 + the PA_BOOST will be enabled resulting in an additional gain of 3dBm. + The actual setting is reduced by 3dBm. + The reported value will reflect the reduced setting. + """ + if self.high_power: + return self.output_power + 5 + return self.output_power - 1 + + @tx_power.setter + def tx_power(self, val: int) -> None: + val = int(val) + if self.high_power: + if val < 5 or val > 23: + raise RuntimeError("tx_power must be between 5 and 23") + # Enable power amp DAC if power is above 20 dB. + # Lower setting by 3db when PA_BOOST enabled - see Data Sheet Section 6.4 + if val > 20: + self.pa_dac = _RF95_PA_DAC_ENABLE + val -= 3 + else: + self.pa_dac = _RF95_PA_DAC_DISABLE + self.pa_select = True + self.output_power = (val - 5) & 0x0F + else: + assert -1 <= val <= 14 + self.pa_select = False + self.max_power = 0b111 # Allow max power output. + self.output_power = (val + 1) & 0x0F + + @property + def rssi(self) -> float: + """The received strength indicator (in dBm) of the last received message.""" + # Read RSSI register and convert to value using formula in datasheet. + # Remember in LoRa mode the payload register changes function to RSSI! + raw_rssi = self.read_u8(_RF95_REG_1A_PKT_RSSI_VALUE) + if self.low_frequency_mode: + raw_rssi -= 157 + else: + raw_rssi -= 164 + return float(raw_rssi) + + @property + def snr(self) -> float: + """The SNR (in dB) of the last received message.""" + # Read SNR 0x19 register and convert to value using formula in datasheet. + # SNR(dB) = PacketSnr [twos complement] / 4 + snr_byte = self.read_u8(_RF95_REG_19_PKT_SNR_VALUE) + if snr_byte > 127: + snr_byte = (256 - snr_byte) * -1 + return snr_byte / 4 + + @property + def signal_bandwidth(self) -> int: + """The signal bandwidth used by the radio (try setting to a higher + value to increase throughput or to a lower value to increase the + likelihood of successfully received payloads). Valid values are + listed in RFM9x.bw_bins.""" + bw_id = (self.read_u8(_RF95_REG_1D_MODEM_CONFIG1) & 0xF0) >> 4 + if bw_id >= len(self.bw_bins): + current_bandwidth = 500000 + else: + current_bandwidth = self.bw_bins[bw_id] + return current_bandwidth + + @signal_bandwidth.setter + def signal_bandwidth(self, val: int) -> None: + # Set signal bandwidth (set to 125000 to match RadioHead Bw125). + for bw_id, cutoff in enumerate(self.bw_bins): + if val <= cutoff: + break + else: + bw_id = 9 + self.write_u8( + _RF95_REG_1D_MODEM_CONFIG1, + (self.read_u8(_RF95_REG_1D_MODEM_CONFIG1) & 0x0F) | (bw_id << 4), + ) + if val >= 500000: + # see Semtech SX1276 errata note 2.3 + self.auto_ifon = True + # see Semtech SX1276 errata note 2.1 + if self.low_frequency_mode: + self.write_u8(0x36, 0x02) + self.write_u8(0x3A, 0x7F) + else: + self.write_u8(0x36, 0x02) + self.write_u8(0x3A, 0x64) + else: + # see Semtech SX1276 errata note 2.3 + self.auto_ifon = False + self.write_u8(0x36, 0x03) + if val == 7800: + self.write_u8(0x2F, 0x48) + elif val >= 62500: + # see Semtech SX1276 errata note 2.3 + self.write_u8(0x2F, 0x40) + else: + self.write_u8(0x2F, 0x44) + self.write_u8(0x30, 0) + + @property + def coding_rate(self) -> Literal[5, 6, 7, 8]: + """The coding rate used by the radio to control forward error + correction (try setting to a higher value to increase tolerance of + short bursts of interference or to a lower value to increase bit + rate). Valid values are limited to 5, 6, 7, or 8.""" + cr_id = (self.read_u8(_RF95_REG_1D_MODEM_CONFIG1) & 0x0E) >> 1 + denominator = cr_id + 4 + return denominator + + @coding_rate.setter + def coding_rate(self, val: Literal[5, 6, 7, 8]) -> None: + # Set coding rate (set to 5 to match RadioHead Cr45). + denominator = min(max(val, 5), 8) + cr_id = denominator - 4 + self.write_u8( + _RF95_REG_1D_MODEM_CONFIG1, + (self.read_u8(_RF95_REG_1D_MODEM_CONFIG1) & 0xF1) | (cr_id << 1), + ) + + @property + def spreading_factor(self) -> Literal[6, 7, 8, 9, 10, 11, 12]: + """The spreading factor used by the radio (try setting to a higher + value to increase the receiver's ability to distinguish signal from + noise or to a lower value to increase the data transmission rate). + Valid values are limited to 6, 7, 8, 9, 10, 11, or 12.""" + sf_id = (self.read_u8(_RF95_REG_1E_MODEM_CONFIG2) & 0xF0) >> 4 + return sf_id + + @spreading_factor.setter + def spreading_factor(self, val: Literal[6, 7, 8, 9, 10, 11, 12]) -> None: + # Set spreading factor (set to 7 to match RadioHead Sf128). + val = min(max(val, 6), 12) + + if val == 6: + self.detection_optimize = 0x5 + else: + self.detection_optimize = 0x3 + + self.write_u8(_RF95_DETECTION_THRESHOLD, 0x0C if val == 6 else 0x0A) + self.write_u8( + _RF95_REG_1E_MODEM_CONFIG2, + ((self.read_u8(_RF95_REG_1E_MODEM_CONFIG2) & 0x0F) | ((val << 4) & 0xF0)), + ) + + @property + def enable_crc(self) -> bool: + """Set to True to enable hardware CRC checking of incoming packets. + Incoming packets that fail the CRC check are not processed. Set to + False to disable CRC checking and process all incoming packets.""" + return (self.read_u8(_RF95_REG_1E_MODEM_CONFIG2) & 0x04) == 0x04 + + @enable_crc.setter + def enable_crc(self, val: bool) -> None: + # Optionally enable CRC checking on incoming packets. + if val: + self.write_u8( + _RF95_REG_1E_MODEM_CONFIG2, + self.read_u8(_RF95_REG_1E_MODEM_CONFIG2) | 0x04, + ) + else: + self.write_u8( + _RF95_REG_1E_MODEM_CONFIG2, + self.read_u8(_RF95_REG_1E_MODEM_CONFIG2) & 0xFB, + ) + + @property + def crc_error(self) -> bool: + """crc status""" + return (self.read_u8(_RF95_REG_12_IRQ_FLAGS) & 0x20) >> 5 + + def packet_sent(self) -> bool: + """Transmit status""" + return (self.read_u8(_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3 + + def payload_ready(self) -> bool: + """Receive status""" + return (self.read_u8(_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6 + + def clear_interrupt(self) -> None: + """Clear Interrupt flags""" + self.write_u8(_RF95_REG_12_IRQ_FLAGS, 0xFF) + + def fill_fifo(self, payload: ReadableBuffer) -> None: + """len_data is not used but is here for compatibility with rfm69 + Fill the FIFO with a packet to send""" + self.write_u8(_RF95_REG_0D_FIFO_ADDR_PTR, 0x00) # FIFO starts at 0. + # Write payload. + self.write_from(_RF95_REG_00_FIFO, payload) + # Write payload and header length. + self.write_u8(_RF95_REG_22_PAYLOAD_LENGTH, len(payload)) + + def read_fifo(self) -> bytearray: + """Read the data from the FIFO.""" + # Read the length of the FIFO. + fifo_length = self.read_u8(_RF95_REG_13_RX_NB_BYTES) + if fifo_length > 0: # read and clear the FIFO if anything in it + packet = bytearray(fifo_length) + current_addr = self.read_u8(_RF95_REG_10_FIFO_RX_CURRENT_ADDR) + self.write_u8(_RF95_REG_0D_FIFO_ADDR_PTR, current_addr) + # read the packet + self.read_into(_RF95_REG_00_FIFO, packet) + + # clear interrupt + self.write_u8(_RF95_REG_12_IRQ_FLAGS, 0xFF) + return packet diff --git a/FC_Board/lib/adafruit_rfm/rfm9xfsk.py b/FC_Board/lib/adafruit_rfm/rfm9xfsk.py new file mode 100644 index 0000000..f46df76 --- /dev/null +++ b/FC_Board/lib/adafruit_rfm/rfm9xfsk.py @@ -0,0 +1,578 @@ +# SPDX-FileCopyrightText: 2024 Jerry Needell for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +""" +`adafruit_rfm.rfm9xfsk` +==================================================== + +CircuitPython module for the RFM95/6/7/8 FSK 433/915mhz radio modules. + +* Author(s): Jerry Needell +""" + +import time + +from micropython import const + +from adafruit_rfm.rfm_common import RFMSPI + +try: + from typing import Optional + + import busio + import digitalio + from circuitpython_typing import ReadableBuffer + + try: + from typing import Literal + except ImportError: + from typing_extensions import Literal + +except ImportError: + pass + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM.git" + +# pylint: disable=duplicate-code + +# Internal constants: +# Register names (FSK Mode even though we use LoRa instead, from table 85) +_RF95_REG_00_FIFO = const(0x00) +_RF95_REG_01_OP_MODE = const(0x01) +_RF95_REG_02_BITRATE_MSB = const(0x02) +_RF95_REG_03_BITRATE_LSB = const(0x03) +_RF95_REG_04_FDEV_MSB = const(0x4) +_RF95_REG_05_FDEV_LSB = const(0x5) +_RF95_REG_06_FRF_MSB = const(0x06) +_RF95_REG_07_FRF_MID = const(0x07) +_RF95_REG_08_FRF_LSB = const(0x08) +_RF95_REG_09_PA_CONFIG = const(0x09) +_RF95_REG_0A_PA_RAMP = const(0x0A) +_RF95_REG_0B_OCP = const(0x0B) +_RF95_REG_0C_LNA = const(0x0C) +_RF95_REG_0D_RX_CFG = const(0x0D) +_RF95_REG_0E_RSSI_CFG = const(0x0E) +_RF95_REG_0F_RSSI_COLLISION = const(0x0F) +_RF95_REG_10_RSSI_THRESH = const(0x10) +_RF95_REG_11_RSSI_VALUE = const(0x11) +_RF95_REG_12_RX_BW = const(0x12) +_RF95_REG_13_AFC_BW = const(0x13) +_RF95_REG_14_OOK_PEAK = const(0x14) +_RF95_REG_15_OOK_FIX = const(0x15) +_RF95_REG_16_OOK_AVG = const(0x16) +_RF95_REG_1A_AFC_FEI_CTL = const(0x1A) +_RF95_REG_1B_AFC_MSB = const(0x1B) +_RF95_REG_1C_AFC_LSB = const(0x1C) +_RF95_REG_1D_FEI_MSB = const(0x1D) +_RF95_REG_1E_FEI_LSB = const(0x1E) +_RF95_REG_1F_PREAMBLE_DETECT = const(0x1F) +_RF95_REG_20_RX_TIMEOUT_1 = const(0x20) +_RF95_REG_21_RX_TIMEOUT_2 = const(0x21) +_RF95_REG_22_RX_TIMEOUT_3 = const(0x22) +_RF95_REG_23_RX_DELAY = const(0x23) +_RF95_REG_24_OSC = const(0x24) +_RF95_REG_25_PREAMBLE_MSB = const(0x25) +_RF95_REG_26_PREAMBLE_LSB = const(0x26) +_RF95_REG_27_SYNC_CONFIG = const(0x27) +_RF95_REG_28_SYNC_VALUE_1 = const(0x28) +_RF95_REG_29_SYNC_VALUE_2 = const(0x29) +_RF95_REG_2A_SYNC_VALUE_3 = const(0x2A) +_RF95_REG_2B_SYNC_VALUE_4 = const(0x2B) +_RF95_REG_2C_SYNC_VALUE_5 = const(0x2C) +_RF95_REG_2D_SYNC_VALUE_6 = const(0x2D) +_RF95_REG_2E_SYNC_VALUE_7 = const(0x2E) +_RF95_REG_2F_SYNC_VALUE_8 = const(0x2F) +_RF95_REG_30_PACKET_CONFIG_1 = const(0x30) +_RF95_REG_31_PACKET_CONFIG_2 = const(0x31) +_RF95_REG_32_PAYLOAD_LENGTH = const(0x32) +_RF95_REG_33_NODE_ADDR = const(0x33) +_RF95_REG_34_BROADCAST_ADDR = const(0x34) +_RF95_REG_35_FIFO_THRESH = const(0x35) +_RF95_REG_36_SEQ_CFG_1 = const(0x36) +_RF95_REG_37_SEQ_CFG_2 = const(0x37) +_RF95_REG_38_TIMER_RES = const(0x38) +_RF95_REG_39_TIMER1_COEF = const(0x39) +_RF95_REG_3A_TIMER2_COEF = const(0x3A) +_RF95_REG_3B_IMAGE_CAL = const(0x3B) +_RF95_REG_3C_TEMP = const(0x3C) +_RF95_REG_3D_LOW_BAT = const(0x3D) +_RF95_REG_3E_IRQ_FLAGS_1 = const(0x3E) +_RF95_REG_3F_IRQ_FLAGS_2 = const(0x3F) + +_RF95_REG_40_DIO_MAPPING1 = const(0x40) +_RF95_REG_41_DIO_MAPPING2 = const(0x41) +_RF95_REG_42_VERSION = const(0x42) + +_RF95_REG_44_PIII_IOP = const(0x44) + +_RF95_REG_4B_TCXO = const(0x4B) +_RF95_REG_4D_PA_DAC = const(0x4D) +_RF95_REG_5B_FORMER_TEMP = const(0x5B) +_RF95_REG_5B_BIT_RATE_FRAC = const(0x5D) +_RF95_REG_61_AGC_REF = const(0x61) +_RF95_REG_62_AGC_THRESH1 = const(0x62) +_RF95_REG_63_AGC_THRESH2 = const(0x63) +_RF95_REG_64_AGC_THRESH3 = const(0x64) + + +_RF95_PA_DAC_DISABLE = const(0x04) +_RF95_PA_DAC_ENABLE = const(0x07) + +# The crystal oscillator frequency of the module +_RF95_FXOSC = 32000000.0 + +# The Frequency Synthesizer step = RH_RF95_FXOSC / 2^^19 +_RF95_FSTEP = _RF95_FXOSC / 524288 + +# RadioHead specific compatibility constants. +_RH_BROADCAST_ADDRESS = const(0xFF) + +# The acknowledgement bit in the FLAGS +# The top 4 bits of the flags are reserved for RadioHead. The lower 4 bits are reserved +# for application layer use. +_RH_FLAGS_ACK = const(0x80) +_RH_FLAGS_RETRY = const(0x40) + +# User facing constants: +SLEEP_MODE = 0b000 +STANDBY_MODE = 0b001 +FS_TX_MODE = 0b010 +TX_MODE = 0b011 +FS_RX_MODE = 0b100 +RX_MODE = 0b101 + + +# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-public-methods +class RFM9xFSK(RFMSPI): + """Interface to a RFM95/6/7/8 FSK radio module. Allows sending and + receiving bytes of data in FSK mode at a support board frequency + (433/915mhz). + + :param busio.SPI spi: The SPI bus connected to the chip. Ensure SCK, MOSI, and MISO are + connected. + :param ~digitalio.DigitalInOut cs: A DigitalInOut object connected to the chip's CS/chip select + line. + :param ~digitalio.DigitalInOut reset: A DigitalInOut object connected to the chip's RST/reset + line. + :param int frequency: The center frequency to configure for radio transmission and reception. + Must be a frequency supported by your hardware (i.e. either 433 or 915mhz). + :param bytes sync_word: A byte string up to 8 bytes long which represents the syncronization + word used by received and transmitted packets. Read the datasheet for a full understanding + of this value! However by default the library will set a value that matches the RadioHead + Arduino library. + :param int preamble_length: The number of bytes to pre-pend to a data packet as a preamble. + This is by default 4 to match the RadioHead library. + :param bool high_power: Indicate if the chip is a high power variant that supports boosted + transmission power. The default is True as it supports the common RFM69HCW modules sold by + Adafruit. + + Also note this library tries to be compatible with raw RadioHead Arduino + library communication. This means the library sets up the radio modulation + to match RadioHead's defaults. + Advanced RadioHead features like address/node specific packets + or "reliable datagram" delivery are supported however due to the + limitations noted, "reliable datagram" is still subject to missed packets. + """ + + operation_mode = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, bits=3) + low_frequency_mode = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, offset=3, bits=1) + modulation_type = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, offset=5, bits=2) + modulation_shaping = RFMSPI.RegisterBits(_RF95_REG_0A_PA_RAMP, offset=5, bits=2) + # Long range/LoRa mode can only be set in sleep mode! + long_range_mode = RFMSPI.RegisterBits(_RF95_REG_01_OP_MODE, offset=7, bits=1) + sync_on = RFMSPI.RegisterBits(_RF95_REG_27_SYNC_CONFIG, offset=4, bits=1) + sync_size = RFMSPI.RegisterBits(_RF95_REG_27_SYNC_CONFIG, offset=0, bits=3) + output_power = RFMSPI.RegisterBits(_RF95_REG_09_PA_CONFIG, bits=4) + max_power = RFMSPI.RegisterBits(_RF95_REG_09_PA_CONFIG, offset=4, bits=3) + pa_select = RFMSPI.RegisterBits(_RF95_REG_09_PA_CONFIG, offset=7, bits=1) + pa_dac = RFMSPI.RegisterBits(_RF95_REG_4D_PA_DAC, bits=3) + dio0_mapping = RFMSPI.RegisterBits(_RF95_REG_40_DIO_MAPPING1, offset=6, bits=2) + lna_boost_hf = RFMSPI.RegisterBits(_RF95_REG_0C_LNA, offset=0, bits=2) + rx_bw_mantissa = RFMSPI.RegisterBits(_RF95_REG_12_RX_BW, offset=3, bits=2) + rx_bw_exponent = RFMSPI.RegisterBits(_RF95_REG_12_RX_BW, offset=0, bits=3) + afc_bw_mantissa = RFMSPI.RegisterBits(_RF95_REG_13_AFC_BW, offset=3, bits=2) + afc_bw_exponent = RFMSPI.RegisterBits(_RF95_REG_13_AFC_BW, offset=0, bits=3) + packet_format = RFMSPI.RegisterBits(_RF95_REG_30_PACKET_CONFIG_1, offset=7, bits=1) + dc_free = RFMSPI.RegisterBits(_RF95_REG_30_PACKET_CONFIG_1, offset=5, bits=2) + crc_on = RFMSPI.RegisterBits(_RF95_REG_30_PACKET_CONFIG_1, offset=4, bits=1) + crc_auto_clear_off = RFMSPI.RegisterBits( + _RF95_REG_30_PACKET_CONFIG_1, offset=3, bits=1 + ) + address_filter = RFMSPI.RegisterBits(_RF95_REG_30_PACKET_CONFIG_1, offset=1, bits=2) + crc_type = RFMSPI.RegisterBits(_RF95_REG_30_PACKET_CONFIG_1, offset=0, bits=1) + mode_ready = RFMSPI.RegisterBits(_RF95_REG_3E_IRQ_FLAGS_1, offset=7) + ook_bit_sync_on = RFMSPI.RegisterBits(_RF95_REG_14_OOK_PEAK, offset=5, bits=1) + ook_thresh_type = RFMSPI.RegisterBits(_RF95_REG_14_OOK_PEAK, offset=4, bits=2) + ook_thresh_step = RFMSPI.RegisterBits(_RF95_REG_14_OOK_PEAK, offset=0, bits=3) + ook_peak_thresh_dec = RFMSPI.RegisterBits(_RF95_REG_16_OOK_AVG, offset=5, bits=3) + ook_average_offset = RFMSPI.RegisterBits(_RF95_REG_16_OOK_AVG, offset=2, bits=2) + ook_average_thresh_filt = RFMSPI.RegisterBits( + _RF95_REG_16_OOK_AVG, offset=0, bits=2 + ) + + def __init__( # noqa: PLR0913 + self, + spi: busio.SPI, + cs: digitalio.DigitalInOut, # pylint: disable=invalid-name + rst: digitalio.DigitalInOut, + frequency: int, + *, + sync_word: bytes = b"\x2d\xd4", + preamble_length: int = 4, + high_power: bool = True, + baudrate: int = 5000000, + crc: bool = True, + ) -> None: + super().__init__(spi, cs, baudrate=baudrate) + self.module = "RFM9X" + self.max_packet_length = 252 + self.high_power = high_power + # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. + # Set Default Baudrate to 5MHz to avoid problems + # self._device = spidev.SPIDevice(spi, cs, baudrate=baudrate, polarity=0, phase=0) + # Setup reset as a digital output - initially High + # This line is pulled low as an output quickly to trigger a reset. + self._rst = rst + # initialize Reset High + self._rst.switch_to_output(value=True) + self.reset() + # No device type check! Catch an error from the very first request and + # throw a nicer message to indicate possible wiring problems. + version = self.read_u8(address=_RF95_REG_42_VERSION) + if version != 18: + raise RuntimeError( + "Failed to find rfm9x with expected version -- check wiring. Version found:", + hex(version), + ) + + # Set sleep mode, wait 10s and confirm in sleep mode (basic device check). + # Also set long range mode (LoRa mode) as it can only be done in sleep. + self.sleep() + time.sleep(0.01) + self.long_range_mode = False + if self.operation_mode != SLEEP_MODE or self.long_range_mode: + raise RuntimeError("Failed to configure radio for FSK mode, check wiring!") + # clear default setting for access to LF registers if frequency > 525MHz + if frequency > 525: + self.low_frequency_mode = 0 + # Set mode idle + self.idle() + # Setup the chip in a similar way to the RadioHead RFM69 library. + # Set FIFO TX condition to not empty and the default FIFO threshold to 15. + self.write_u8(_RF95_REG_35_FIFO_THRESH, 0b10001111) + # Set the syncronization word. + self.sync_word = sync_word + self.preamble_length = preamble_length # Set the preamble length. + self.frequency_mhz = frequency # Set frequency. + # Configure modulation for RadioHead library GFSK_Rb250Fd250 mode + # by default. Users with advanced knowledge can manually reconfigure + # for any other mode (consulting the datasheet is absolutely + # necessary!). + self.modulation_shaping = 0b01 # Gaussian filter, BT=1.0 + self.bitrate = 250000 # 250kbs + self.frequency_deviation = 250000 # 250khz + self.rx_bw_mantissa = 0b00 + self.rx_bw_exponent = 0b000 + self.afc_bw_mantissa = 0b00 + self.afc_bw_exponent = 0b000 + self.packet_format = 1 # Variable length. + self.dc_free = 0b10 # Whitening + # Set transmit power to 13 dBm, a safe value any module supports. + self._tx_power = 13 + self.tx_power = self._tx_power + + # Default to enable CRC checking on incoming packets. + self.enable_crc = crc + """CRC Enable state""" + self.snr = None + + def reset(self) -> None: + """Perform a reset of the chip.""" + # See section 7.2.2 of the datasheet for reset description. + self._rst.value = False # Set Reset Low + time.sleep(0.0001) # 100 us + self._rst.value = True # set Reset High + time.sleep(0.005) # 5 ms + + def idle(self) -> None: + """Enter idle standby mode.""" + self.operation_mode = STANDBY_MODE + + def sleep(self) -> None: + """Enter sleep mode.""" + self.operation_mode = SLEEP_MODE + + def listen(self) -> None: + """Listen for packets to be received by the chip. Use :py:func:`receive` + to listen, wait and retrieve packets as they're available. + """ + self.operation_mode = RX_MODE + self.dio0_mapping = 0b00 # Interrupt on rx done. + + def transmit(self) -> None: + """Transmit a packet which is queued in the FIFO. This is a low level + function for entering transmit mode and more. For generating and + transmitting a packet of data use :py:func:`send` instead. + """ + self.operation_mode = TX_MODE + self.dio0_mapping = 0b00 # Interrupt on tx done. + + @property + def sync_word(self) -> bytearray: + """The synchronization word value. This is a byte string up to 8 bytes long (64 bits) + which indicates the synchronization word for transmitted and received packets. Any + received packet which does not include this sync word will be ignored. The default value + is 0x2D, 0xD4 which matches the RadioHead RFM69 library. Setting a value of None will + disable synchronization word matching entirely. + """ + # Handle when sync word is disabled.. + if not self.sync_on: + return None + # Sync word is not disabled so read the current value. + sync_word_length = self.sync_size + 1 # Sync word size is offset by 1 + # according to datasheet. + sync_word = bytearray(sync_word_length) + self.read_into(_RF95_REG_28_SYNC_VALUE_1, sync_word) + return sync_word + + @sync_word.setter + def sync_word(self, val: Optional[bytearray]) -> None: + # Handle disabling sync word when None value is set. + if val is None: + self.sync_on = 0 + else: + # Check sync word is at most 8 bytes. + assert 1 <= len(val) <= 8 + # Update the value, size and turn on the sync word. + self.write_from(_RF95_REG_28_SYNC_VALUE_1, val) + self.sync_size = len(val) - 1 # Again sync word size is offset by + # 1 according to datasheet. + self.sync_on = 1 + + @property + def bitrate(self) -> float: + """The modulation bitrate in bits/second (or chip rate if Manchester encoding is enabled). + Can be a value from ~489 to 32mbit/s, but see the datasheet for the exact supported + values. + """ + msb = self.read_u8(_RF95_REG_02_BITRATE_MSB) + lsb = self.read_u8(_RF95_REG_03_BITRATE_LSB) + return _RF95_FXOSC / ((msb << 8) | lsb) + + @bitrate.setter + def bitrate(self, val: float) -> None: + assert (_RF95_FXOSC / 65535) <= val <= 32000000.0 + # Round up to the next closest bit-rate value with addition of 0.5. + bitrate = int((_RF95_FXOSC / val) + 0.5) & 0xFFFF + self.write_u8(_RF95_REG_02_BITRATE_MSB, bitrate >> 8) + self.write_u8(_RF95_REG_03_BITRATE_LSB, bitrate & 0xFF) + + @property + def frequency_deviation(self) -> float: + """The frequency deviation in Hertz.""" + msb = self.read_u8(_RF95_REG_04_FDEV_MSB) + lsb = self.read_u8(_RF95_REG_05_FDEV_LSB) + return _RF95_FSTEP * ((msb << 8) | lsb) + + @frequency_deviation.setter + def frequency_deviation(self, val: float) -> None: + assert 0 <= val <= (_RF95_FSTEP * 16383) # fdev is a 14-bit unsigned value + # Round up to the next closest integer value with addition of 0.5. + fdev = int((val / _RF95_FSTEP) + 0.5) & 0x3FFF + self.write_u8(_RF95_REG_04_FDEV_MSB, fdev >> 8) + self.write_u8(_RF95_REG_05_FDEV_LSB, fdev & 0xFF) + + @property + def temperature(self) -> float: + """The internal temperature of the chip.. See Sec 5.5.7 of the DataSheet + calibrated or very accurate. + """ + temp = self.read_u8(_RF95_REG_3C_TEMP) + return temp + + @property + def preamble_length(self) -> int: + """The length of the preamble for sent and received packets, an unsigned + 16-bit value. Received packets must match this length or they are + ignored! Set to 4 to match the RF69. + """ + msb = self.read_u8(_RF95_REG_25_PREAMBLE_MSB) + lsb = self.read_u8(_RF95_REG_26_PREAMBLE_LSB) + return ((msb << 8) | lsb) & 0xFFFF + + @preamble_length.setter + def preamble_length(self, val: int) -> None: + assert 0 <= val <= 65535 + self.write_u8(_RF95_REG_25_PREAMBLE_MSB, (val >> 8) & 0xFF) + self.write_u8(_RF95_REG_26_PREAMBLE_LSB, val & 0xFF) + + @property + def frequency_mhz(self) -> Literal[433.0, 915.0]: + """The frequency of the radio in Megahertz. Only the allowed values for + your radio must be specified (i.e. 433 vs. 915 mhz)! + """ + msb = self.read_u8(_RF95_REG_06_FRF_MSB) + mid = self.read_u8(_RF95_REG_07_FRF_MID) + lsb = self.read_u8(_RF95_REG_08_FRF_LSB) + frf = ((msb << 16) | (mid << 8) | lsb) & 0xFFFFFF + frequency = (frf * _RF95_FSTEP) / 1000000.0 + return frequency + + @frequency_mhz.setter + def frequency_mhz(self, val: Literal[433.0, 915.0]) -> None: + if val < 240 or val > 960: + raise RuntimeError("frequency_mhz must be between 240 and 960") + # Calculate FRF register 24-bit value. + frf = int((val * 1000000.0) / _RF95_FSTEP) & 0xFFFFFF + # Extract byte values and update registers. + msb = frf >> 16 + mid = (frf >> 8) & 0xFF + lsb = frf & 0xFF + self.write_u8(_RF95_REG_06_FRF_MSB, msb) + self.write_u8(_RF95_REG_07_FRF_MID, mid) + self.write_u8(_RF95_REG_08_FRF_LSB, lsb) + + @property + def tx_power(self) -> int: + """The transmit power in dBm. Can be set to a value from 5 to 23 for + high power devices (RFM95/96/97/98, high_power=True) or -1 to 14 for low + power devices. Only integer power levels are actually set (i.e. 12.5 + will result in a value of 12 dBm). + The actual maximum setting for high_power=True is 20dBm but for values > 20 + the PA_BOOST will be enabled resulting in an additional gain of 3dBm. + The actual setting is reduced by 3dBm. + The reported value will reflect the reduced setting. + """ + if self.high_power: + return self.output_power + 5 + return self.output_power - 1 + + @tx_power.setter + def tx_power(self, val: int) -> None: + val = int(val) + if self.high_power: + if val < 5 or val > 23: + raise RuntimeError("tx_power must be between 5 and 23") + # Enable power amp DAC if power is above 20 dB. + # Lower setting by 3db when PA_BOOST enabled - see Data Sheet Section 6.4 + if val > 20: + self.pa_dac = _RF95_PA_DAC_ENABLE + val -= 3 + else: + self.pa_dac = _RF95_PA_DAC_DISABLE + self.pa_select = True + self.output_power = (val - 5) & 0x0F + else: + assert -1 <= val <= 14 + self.pa_select = False + self.max_power = 0b111 # Allow max power output. + self.output_power = (val + 1) & 0x0F + + @property + def rssi(self) -> float: + """The received strength indicator (in dBm) of the last received message.""" + # Read RSSI register and convert to value using formula in datasheet. + # Remember in LoRa mode the payload register changes function to RSSI! + raw_rssi = self.read_u8(_RF95_REG_11_RSSI_VALUE) + return -raw_rssi / 2.0 + + @property + def enable_crc(self) -> bool: + """Set to True to enable hardware CRC checking of incoming packets. + Incoming packets that fail the CRC check are not processed. Set to + False to disable CRC checking and process all incoming packets.""" + return self.crc_on + + @enable_crc.setter + def enable_crc(self, val: bool) -> None: + # Optionally enable CRC checking on incoming packets. + if val: + self.crc_on = 1 + self.crc_type = 0 # use CCITT for RF69 compatibility + else: + self.crc_on = 0 + + @property + def crc_error(self) -> bool: + """crc status""" + return (self.read_u8(_RF95_REG_3F_IRQ_FLAGS_2) & 0x2) >> 1 + + @property + def enable_address_filter(self) -> bool: + """Set to True to enable address filtering. + Incoming packets that do no match the node address or broadcast address + will be ignored.""" + return self.address_filter + + @enable_address_filter.setter + def enable_address_filter(self, val: bool) -> None: + # Enable address filtering on incoming packets. + if val: + self.address_filter = 2 # accept node address or broadcast address + else: + self.address_filter = 0 + + @property + def fsk_node_address(self) -> int: + """Node Address for Address Filtering""" + return self.read_u8(_RF95_REG_33_NODE_ADDR) + + @fsk_node_address.setter + def fsk_node_address(self, val: int) -> None: + assert 0 <= val <= 255 + self.write_u8(_RF95_REG_33_NODE_ADDR, val) + + @property + def fsk_broadcast_address(self) -> int: + """Node Address for Address Filtering""" + return self.read_u8(_RF95_REG_34_BROADCAST_ADDR) + + @fsk_broadcast_address.setter + def fsk_broadcast_address(self, val: int) -> None: + assert 0 <= val <= 255 + self.write_u8(_RF95_REG_34_BROADCAST_ADDR, val) + + @property + def ook_fixed_threshold(self) -> int: + """Fixed threshold for data slicer in OOK mode""" + return self.read_u8(_RF95_REG_15_OOK_FIX) + + @ook_fixed_threshold.setter + def ook_fixed_threshold(self, val: int) -> None: + assert 0 <= val <= 255 + self.write_u8(_RF95_REG_15_OOK_FIX, val) + + def packet_sent(self) -> bool: + """Transmit status""" + return (self.read_u8(_RF95_REG_3F_IRQ_FLAGS_2) & 0x8) >> 3 + + def payload_ready(self) -> bool: + """Receive status""" + return (self.read_u8(_RF95_REG_3F_IRQ_FLAGS_2) & 0x4) >> 2 + + def clear_interrupt(self) -> None: + """Clear interrupt Flags""" + self.write_u8(_RF95_REG_3E_IRQ_FLAGS_1, 0xFF) + self.write_u8(_RF95_REG_3F_IRQ_FLAGS_2, 0xFF) + + def fill_fifo(self, payload: ReadableBuffer) -> None: + """Write the payload to the FIFO.""" + complete_payload = bytearray(1) # prepend packet length to payload + complete_payload[0] = len(payload) + # put the payload lengthe in the beginning of the packet for RFM69 + complete_payload = complete_payload + payload + # Write payload to transmit fifo + self.write_from(_RF95_REG_00_FIFO, complete_payload) + + def read_fifo(self) -> bytearray: + """Read the data from the FIFO.""" + # Read the length of the FIFO. + fifo_length = self.read_u8(_RF95_REG_00_FIFO) + if fifo_length > 0: # read and clear the FIFO if anything in it + packet = bytearray(fifo_length) + # read the packet + self.read_into(_RF95_REG_00_FIFO, packet, fifo_length) + return packet diff --git a/FC_Board/lib/adafruit_rfm/rfm_common.py b/FC_Board/lib/adafruit_rfm/rfm_common.py new file mode 100644 index 0000000..34d607b --- /dev/null +++ b/FC_Board/lib/adafruit_rfm/rfm_common.py @@ -0,0 +1,586 @@ +# SPDX-FileCopyrightText: 2024 Jerry Needell for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +""" + +* Author(s): Jerry Needell +""" + +import asyncio +import random +import time + +from adafruit_bus_device import spi_device + +try: + from typing import Callable, Optional, Type + + import busio + import digitalio + from circuitpython_typing import ReadableBuffer, WriteableBuffer + +except ImportError: + pass + +from micropython import const + +HAS_SUPERVISOR = False + +try: + import supervisor + + if hasattr(supervisor, "ticks_ms"): + HAS_SUPERVISOR = True +except ImportError: + pass + + +__version__ = "0.0.0+auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM.git" + + +# RadioHead specific compatibility constants. +_RH_BROADCAST_ADDRESS = const(0xFF) + +# The acknowledgement bit in the FLAGS +# The top 4 bits of the flags are reserved for RadioHead. The lower 4 bits are reserved +# for application layer use. +_RH_FLAGS_ACK = const(0x80) +_RH_FLAGS_RETRY = const(0x40) + +_RH_RF95_REG_5B_FORMER_TEMP = const(0x5B) + +# supervisor.ticks_ms() contants +_TICKS_PERIOD = const(1 << 29) +_TICKS_MAX = const(_TICKS_PERIOD - 1) +_TICKS_HALFPERIOD = const(_TICKS_PERIOD // 2) + + +def ticks_diff(ticks1: int, ticks2: int) -> int: + """Compute the signed difference between two ticks values + assuming that they are within 2**28 ticks + """ + diff = (ticks1 - ticks2) & _TICKS_MAX + diff = ((diff + _TICKS_HALFPERIOD) & _TICKS_MAX) - _TICKS_HALFPERIOD + return diff + + +def asyncio_to_blocking(function): + """run async function as normal blocking function""" + + def blocking_function(self, *args, **kwargs): + return asyncio.run(function(self, *args, **kwargs)) + + return blocking_function + + +async def asyncio_check_timeout( + flag: Callable, limit: float, timeout_poll: float +) -> bool: + """test for timeout waiting for specified flag""" + timed_out = False + if HAS_SUPERVISOR: + start = supervisor.ticks_ms() + while not timed_out and not flag(): + if ticks_diff(supervisor.ticks_ms(), start) >= limit * 1000: + timed_out = True + await asyncio.sleep(timeout_poll) + else: + start = time.monotonic() + while not timed_out and not flag(): + if time.monotonic() - start >= limit: + timed_out = True + await asyncio.sleep(timeout_poll) + + return timed_out + + +# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-nested-blocks +class RFMSPI: + """Base class for SPI type devices""" + + class RegisterBits: + """Simplify register access""" + + # Class to simplify access to the many configuration bits avaialable + # on the chip's registers. This is a subclass here instead of using + # a higher level module to increase the efficiency of memory usage + # (all of the instances of this bit class will share the same buffer + # used by the parent RFM69 class instance vs. each having their own + # buffer and taking too much memory). + + # Quirk of pylint that it requires public methods for a class. This + # is a decorator class in Python and by design it has no public methods. + # Instead it uses dunder accessors like get and set below. For some + # reason pylint can't figure this out so disable the check. + # pylint: disable=too-few-public-methods + + # Again pylint fails to see the true intent of this code and warns + # against private access by calling the write and read functions below. + # This is by design as this is an internally used class. Disable the + # check from pylint. + # pylint: disable=protected-access + + def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None: + assert 0 <= offset <= 7 + assert 1 <= bits <= 8 + assert (offset + bits) <= 8 + self._address = address + self._mask = 0 + for _ in range(bits): + self._mask <<= 1 + self._mask |= 1 + self._mask <<= offset + self._offset = offset + + def __get__(self, obj: Optional["RFM"], objtype: Type["RFM"]) -> int: + reg_value = obj.read_u8(self._address) + return (reg_value & self._mask) >> self._offset + + def __set__(self, obj: Optional["RFM"], val: int) -> None: + reg_value = obj.read_u8(self._address) + reg_value &= ~self._mask + reg_value |= (val & 0xFF) << self._offset + obj.write_u8(self._address, reg_value) + + # pylint: disable-msg=too-many-arguments + def __init__( # noqa: PLR0913 + self, + spi: busio.SPI, + cs_pin: digitalio.DigitalInOut, + baudrate: int = 5000000, + polarity: int = 0, + phase: int = 0, + ): + self.spi_device = spi_device.SPIDevice( + spi, cs_pin, baudrate=baudrate, polarity=polarity, phase=phase + ) + # initialize last RSSI reading + self.last_rssi = 0.0 + """The RSSI of the last received packet. Stored when the packet was received. + The instantaneous RSSI value may not be accurate once the + operating mode has been changed. + """ + self.last_snr = 0.0 + """The SNR of the last received packet. Stored when the packet was received. + The instantaneous SNR value may not be accurate once the + operating mode has been changed. + """ + # initialize timeouts and delays delays + self.ack_wait = 0.1 + """The delay time before attempting a retry after not receiving an ACK""" + self.receive_timeout = 0.5 + """The amount of time to poll for a received packet. + If no packet is received, the returned packet will be None + """ + self.xmit_timeout = 2.0 + """The amount of time to wait for the HW to transmit the packet. + This is mainly used to prevent a hang due to a HW issue + """ + self.ack_retries = 5 + """The number of ACK retries before reporting a failure.""" + self.ack_delay = None + """The delay time before attemting to send an ACK. + If ACKs are being missed try setting this to .1 or .2. + """ + # initialize sequence number counter for reliabe datagram mode + self.sequence_number = 0 + # create seen Ids list + self.seen_ids = bytearray(256) + # initialize packet header + # node address - default is broadcast + self.node = _RH_BROADCAST_ADDRESS + """The default address of this Node. (0-255). + If not 255 (0xff) then only packets address to this node will be accepted. + First byte of the RadioHead header. + """ + # destination address - default is broadcast + self.destination = _RH_BROADCAST_ADDRESS + """The default destination address for packet transmissions. (0-255). + If 255 (0xff) then any receiving node should accept the packet. + Second byte of the RadioHead header. + """ + # ID - contains seq count for reliable datagram mode + self.identifier = 0 + """Automatically set to the sequence number when send_with_ack() used. + Third byte of the RadioHead header. + """ + # flags - identifies ack/reetry packet for reliable datagram mode + self.flags = 0 + """Upper 4 bits reserved for use by Reliable Datagram Mode. + Lower 4 bits may be used to pass information. + Fourth byte of the RadioHead header. + """ + self.radiohead = True + """Enable RadioHead compatibility""" + + self.crc_error_count = 0 + self.timeout_poll = 0.001 + + # pylint: enable-msg=too-many-arguments + + # Global buffer for SPI commands + _BUFFER = bytearray(4) + + # pylint: disable=no-member + # Reconsider pylint: disable when this can be tested + def read_into( + self, address: int, buf: WriteableBuffer, length: Optional[int] = None + ) -> None: + """Read a number of bytes from the specified address into the provided + buffer. If length is not specified (the default) the entire buffer + will be filled.""" + if length is None: + length = len(buf) + with self.spi_device as device: + self._BUFFER[0] = address & 0x7F # Strip out top bit to set 0 + # value (read). + device.write(self._BUFFER, end=1) + device.readinto(buf, end=length) + + def read_u8(self, address: int) -> int: + """Read a single byte from the provided address and return it.""" + self.read_into(address, self._BUFFER, length=1) + return self._BUFFER[0] + + def write_from( + self, address: int, buf: ReadableBuffer, length: Optional[int] = None + ) -> None: + """Write a number of bytes to the provided address and taken from the + provided buffer. If no length is specified (the default) the entire + buffer is written.""" + if length is None: + length = len(buf) + with self.spi_device as device: + self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to + # indicate a write. + device.write(self._BUFFER, end=1) + device.write(buf, end=length) + + def write_u8(self, address: int, val: int) -> None: + """Write a byte register to the chip. Specify the 7-bit address and the + 8-bit value to write to that address.""" + with self.spi_device as device: + self._BUFFER[0] = ( + address | 0x80 + ) & 0xFF # Set top bit to 1 to indicate a write. + self._BUFFER[1] = val & 0xFF + device.write(self._BUFFER, end=2) + + # pylint: disable=too-many-branches + + async def asyncio_send( # noqa: PLR0912 PLR0913 + self, + data: ReadableBuffer, + *, + keep_listening: bool = False, + destination: Optional[int] = None, + node: Optional[int] = None, + identifier: Optional[int] = None, + flags: Optional[int] = None, + ) -> bool: + """Send a string of data using the transmitter. + You can only send 252 bytes at a time + (limited by chip's FIFO size and appended headers). + if the propert radiohead is True then this appends a 4 byte header + to be compatible with the RadioHead library. + The header defaults to using the initialized attributes: + (destination,node,identifier,flags) + It may be temporarily overidden via the kwargs - destination,node,identifier,flags. + Values passed via kwargs do not alter the attribute settings. + The keep_listening argument should be set to True if you want to start listening + automatically after the packet is sent. The default setting is False. + + Returns: True if success or False if the send timed out. + """ + # Disable pylint warning to not use length as a check for zero. + # This is a puzzling warning as the below code is clearly the most + # efficient and proper way to ensure a precondition that the provided + # buffer be within an expected range of bounds. Disable this check. + # pylint: disable=len-as-condition + assert 0 < len(data) <= self.max_packet_length + # pylint: enable=len-as-condition + self.idle() # Stop receiving to clear FIFO and keep it clear. + # Combine header and data to form payload + if self.radiohead: + payload = bytearray(4) + if destination is None: # use attribute + payload[0] = self.destination + else: # use kwarg + payload[0] = destination + if node is None: # use attribute + payload[1] = self.node + else: # use kwarg + payload[1] = node + if identifier is None: # use attribute + payload[2] = self.identifier + else: # use kwarg + payload[2] = identifier + if flags is None: # use attribute + payload[3] = self.flags + else: # use kwarg + payload[3] = flags + payload = payload + data + elif destination is not None: # prepend destination for non RH packets + payload = destination.to_bytes(1, "big") + data + else: + payload = data + self.fill_fifo(payload) + # Turn on transmit mode to send out the packet. + self.transmit() + # Wait for packet_sent interrupt with explicit polling (not ideal but + # best that can be done right now without interrupts). + timed_out = await asyncio_check_timeout( + self.packet_sent, self.xmit_timeout, self.timeout_poll + ) + # Listen again if necessary and return the result packet. + if keep_listening: + self.listen() + else: + # Enter idle mode to stop receiving other packets. + self.idle() + self.clear_interrupt() + return not timed_out + + send = asyncio_to_blocking(asyncio_send) + """Non-asyncio wrapper to Send a string of data using the transmitter + using the same arguments and keywords as asyncio_send() + """ + + async def asyncio_send_with_ack(self, data: ReadableBuffer) -> bool: + """Reliable Datagram mode: + Send a packet with data and wait for an ACK response. + The packet header is automatically generated. + If enabled, the packet transmission will be retried on failure + """ + if not self.radiohead: + raise RuntimeError("send_with_ack onl suppoted in RadioHead mode") + if self.ack_retries: + retries_remaining = self.ack_retries + else: + retries_remaining = 1 + got_ack = False + self.sequence_number = (self.sequence_number + 1) & 0xFF + while not got_ack and retries_remaining: + self.identifier = self.sequence_number + await self.asyncio_send(data, keep_listening=True) + # Don't look for ACK from Broadcast message + if self.destination == _RH_BROADCAST_ADDRESS: + got_ack = True + else: + # wait for a packet from our destination + ack_packet = await self.asyncio_receive( + timeout=self.ack_wait, with_header=True + ) + if ack_packet is not None: + if ack_packet[3] & _RH_FLAGS_ACK: + # check the ID + if ack_packet[2] == self.identifier: + got_ack = True + break + # pause before next retry -- random delay + if not got_ack: + # delay by random amount before next try + await asyncio.sleep(self.ack_wait + self.ack_wait * random.random()) + retries_remaining = retries_remaining - 1 + # set retry flag in packet header + self.flags |= _RH_FLAGS_RETRY + self.flags = 0 # clear flags + return got_ack + + send_with_ack = asyncio_to_blocking(asyncio_send_with_ack) + """Non-asyncio wrapper to Send a string of data using the transmitter + using the same arguments and keywords as asyncio_send_with_ack() + """ + + async def asyncio_receive( # noqa: PLR0912 + self, + *, + keep_listening: bool = True, + with_header: bool = False, + timeout: Optional[float] = None, + ) -> Optional[bytearray]: + """Wait to receive a packet from the receiver. If a packet is found the payload bytes + are returned, otherwise None is returned (which indicates the timeout elapsed with no + reception). + If keep_listening is True (the default) the chip will immediately enter listening mode + after reception of a packet, otherwise it will fall back to idle mode and ignore any + future reception. + Packets may have a 4-byte header for compatibility with the + RadioHead library. + The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip + the header before returning the packet to the caller. + If with_header is True then the 4 byte header will be returned with the packet. + The payload then begins at packet[4]. + """ + if not self.radiohead and with_header: + raise RuntimeError("with_header only supported for RadioHead mode") + timed_out = False + if timeout is None: + timeout = self.receive_timeout + if timeout is not None: + # Wait for the payloadready signal. This is not ideal and will + # surely miss or overflow the FIFO when packets aren't read fast + # enough, however it's the best that can be done from Python without + # interrupt supports. + # Make sure we are listening for packets. + self.listen() + timed_out = await asyncio_check_timeout( + self.payload_ready, timeout, self.timeout_poll + ) + # Payload ready is set, a packet is in the FIFO. + packet = None + # save last RSSI reading + self.last_rssi = self.rssi + self.last_snr = self.snr + + # Enter idle mode to stop receiving other packets. + self.idle() + if not timed_out: + if self.enable_crc and self.crc_error: + self.crc_error_count += 1 + else: + packet = self.read_fifo() + if self.radiohead: + if len(packet) < 5: + # reject the packet if it is too small to contain the RAdioHead Header + packet = None + if packet is not None: + if ( + self.node != _RH_BROADCAST_ADDRESS # noqa: PLR1714 + and packet[0] != _RH_BROADCAST_ADDRESS + and packet[0] != self.node + ): + packet = None + if ( + not with_header and packet is not None + ): # skip the header if not wanted + packet = packet[4:] + # Listen again if necessary and return the result packet. + if keep_listening: + self.listen() + else: + # Enter idle mode to stop receiving other packets. + self.idle() + self.clear_interrupt() + return packet + + receive = asyncio_to_blocking(asyncio_receive) + """Non-asyncio wrapper to Receive a packet + using the same arguments and keywords as asyncio_receive() + """ + + async def asyncio_receive_with_ack( # noqa: PLR0912 + self, + *, + keep_listening: bool = True, + with_header: bool = False, + timeout: Optional[float] = None, + ) -> Optional[bytearray]: + """Wait to receive a RadioHead packet from the receiver then send an ACK packet in response. + AKA Reliable Datagram mode. + If a packet is found the payload bytes are returned, otherwise None is returned + (which indicates the timeout elapsed with no reception). + If keep_listening is True (the default) the chip will immediately enter listening mode + after receipt of a packet, otherwise it will fall back to idle mode and ignore + any incomming packets until it is called again. + All packets must have a 4-byte header for compatibility with the RadioHead library. + The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip + the header before returning the packet to the caller. + If with_header is True then the 4 byte header will be returned with the packet. + The payload then begins at packet[4]. + """ + if not self.radiohead: + raise RuntimeError("receive_with_ack only supported for RadioHead mode") + timed_out = False + if timeout is None: + timeout = self.receive_timeout + if timeout is not None: + # Wait for the payloadready signal. This is not ideal and will + # surely miss or overflow the FIFO when packets aren't read fast + # enough, however it's the best that can be done from Python without + # interrupt supports. + # Make sure we are listening for packets. + self.listen() + timed_out = await asyncio_check_timeout( + self.payload_ready, timeout, self.timeout_poll + ) + # Payload ready is set, a packet is in the FIFO. + packet = None + # save last RSSI reading + self.last_rssi = self.rssi + self.last_snr = self.snr + + # Enter idle mode to stop receiving other packets. + self.idle() + if not timed_out: + if self.enable_crc and self.crc_error: + self.crc_error_count += 1 + else: + packet = self.read_fifo() + if self.radiohead: + if len(packet) < 5: + # reject the packet if it is too small to contain the RAdioHead Header + packet = None + if packet is not None: + if ( + self.node != _RH_BROADCAST_ADDRESS # noqa: PLR1714 + and packet[0] != _RH_BROADCAST_ADDRESS + and packet[0] != self.node + ): + packet = None + # send ACK unless this was an ACK or a broadcast + elif ((packet[3] & _RH_FLAGS_ACK) == 0) and ( + packet[0] != _RH_BROADCAST_ADDRESS + ): + # delay before sending Ack to give receiver a chance to get ready + if self.ack_delay is not None: + await asyncio.sleep(self.ack_delay) + # send ACK packet to sender (data is b'!') + await self.asyncio_send( + b"!", + destination=packet[1], + node=packet[0], + identifier=packet[2], + flags=(packet[3] | _RH_FLAGS_ACK), + ) + # reject Retries if we have seen this idetifier from this source before + if (self.seen_ids[packet[1]] == packet[2]) and ( + packet[3] & _RH_FLAGS_RETRY + ): + packet = None + else: # save the packet identifier for this source + self.seen_ids[packet[1]] = packet[2] + if ( + packet is not None and (packet[3] & _RH_FLAGS_ACK) != 0 + ): # Ignore it if it was an ACK packet + packet = None + if ( + not with_header and packet is not None + ): # skip the header if not wanted + packet = packet[4:] + # Listen again if necessary and return the result packet. + if keep_listening: + self.listen() + else: + # Enter idle mode to stop receiving other packets. + self.idle() + self.clear_interrupt() + return packet + + receive_with_ack = asyncio_to_blocking(asyncio_receive_with_ack) + """Non-asyncio wrapper to Receive a packet + using the same arguments and keywords as asyncio_receive_with_ack() + """ + + @property + def former_temperature(self): + """Tries to grab former temp from module""" + raw_temp = self.read_u8(_RH_RF95_REG_5B_FORMER_TEMP) + temp = raw_temp & 0x7F + if (raw_temp & 0x80) == 0x80: + temp = ~temp + 0x01 + + return temp + 143 # Added prescalar for temp diff --git a/FC_Board/lib/functions.py b/FC_Board/lib/functions.py index 2762635..9c5db53 100755 --- a/FC_Board/lib/functions.py +++ b/FC_Board/lib/functions.py @@ -37,7 +37,7 @@ def __init__(self, cubesat): "Hey Its pretty cold up here, did someone forget to pay the electric bill?" ] self.last_battery_temp = 20 - self.callsign = "K06AZM" + self.callsign = "" self.state_bool = False self.face_data_baton = False self.detumble_enable_z = True @@ -83,8 +83,6 @@ def send(self, msg): self.field = Field.Field(self.cubesat, self.debug) message = f"{self.callsign} " + str(msg) + f" {self.callsign}" self.field.Beacon(message) - if self.cubesat.f_fsk: - self.cubesat.radio1.cw(message) if self.cubesat.is_licensed: self.debug_print(f"Sent Packet: " + message) else: @@ -128,14 +126,26 @@ def beacon(self): self.field = Field.Field(self.cubesat, self.debug) self.field.Beacon(lora_beacon) - if self.cubesat.f_fsk: - self.cubesat.radio1.cw(lora_beacon) del self.field del Field def joke(self): self.send(random.choice(self.jokes)) + def format_state_of_health(self, hardware): + to_return = "" + for key, value in hardware.items(): + to_return = to_return + key + "=" + if value: + to_return += "1" + else: + to_return += "0" + + if len(to_return) > 245: + return to_return + + return to_return + def state_of_health(self): import Field @@ -171,25 +181,13 @@ def state_of_health(self): + str(self.state_list) + f"{self.callsign}" ) - if self.cubesat.f_fsk: - self.cubesat.radio1.cw( - f"{self.callsign} Yearling^2 State of Health 1/2" - + str(self.state_list) - + f"{self.callsign}" - ) self.state_bool = True else: self.field.Beacon( f"{self.callsign} YSOH 2/2" - + str(self.cubesat.hardware) + + self.format_state_of_health(self.cubesat.hardware) + f"{self.callsign}" ) - if self.cubesat.f_fsk: - self.cubesat.radio1.cw( - f"{self.callsign} YSOH 2/2" - + str(self.cubesat.hardware) - + f"{self.callsign}" - ) self.state_bool = False del self.field del Field @@ -203,10 +201,6 @@ def send_face(self): self.field.Beacon( f"{self.callsign} Y-: {self.facestring[0]} Y+: {self.facestring[1]} X-: {self.facestring[2]} X+: {self.facestring[3]} Z-: {self.facestring[4]} {self.callsign}" ) - if self.cubesat.f_fsk: - self.cubesat.radio1.cw( - f"{self.callsign} Y-: {self.facestring[0]} Y+: {self.facestring[1]} X-: {self.facestring[2]} X+: {self.facestring[3]} Z-: {self.facestring[4]} {self.callsign}" - ) del self.field del Field @@ -217,7 +211,7 @@ def listen(self): try: self.debug_print("Listening") self.cubesat.radio1.receive_timeout = 10 - received = self.cubesat.radio1.receive(keep_listening=True) + received = self.cubesat.radio1.receive_with_ack(keep_listening=True) except Exception as e: self.debug_print( "An Error has occured while listening: " diff --git a/FC_Board/lib/pysquared.py b/FC_Board/lib/pysquared.py index 70bd78e..9f1501f 100755 --- a/FC_Board/lib/pysquared.py +++ b/FC_Board/lib/pysquared.py @@ -20,7 +20,8 @@ from collections import OrderedDict # Hardware Specific Libs -import pysquared_rfm9x # Radio +from adafruit_rfm import rfm9x # Radio +from adafruit_rfm import rfm9xfsk # Radio import neopixel # RGB LED from adafruit_lsm6ds.lsm6dsox import LSM6DSOX # IMU import adafruit_lis2mdl # Magnetometer @@ -170,10 +171,6 @@ def __init__(self): if self.c_boot > 200: self.c_boot = 0 - if self.f_fsk: - self.debug_print("Fsk going to false") - self.f_fsk = False - if self.f_softboot: self.f_softboot = False @@ -267,31 +264,41 @@ def __init__(self): self.radio1_DIO4.switch_to_input() try: - self.radio1 = pysquared_rfm9x.RFM9x( - self.spi0, - _rf_cs1, - _rf_rst1, - self.radio_cfg["freq"], - code_rate=8, - baudrate=1320000, - ) - # Default LoRa Modulation Settings - # Frequency: 437.4 MHz, SF7, BW125kHz, CR4/8, Preamble=8, CRC=True - self.radio1.dio0 = self.radio1_DIO0 - # self.radio1.dio4=self.radio1_DIO4 - self.radio1.max_output = True - self.radio1.tx_power = self.radio_cfg["pwr"] - self.radio1.spreading_factor = self.radio_cfg["sf"] + if self.f_fsk: + self.radio1 = rfm9xfsk.RFM9xFSK( + self.spi0, + _rf_cs1, + _rf_rst1, + self.radio_cfg["freq"], + # code_rate=8, code rate does not exist for RFM9xFSK + ) + self.radio1.fsk_node_address = 1 + self.radio1.fsk_broadcast_address = 0xFF + self.radio1.modulation_type = 0 + else: + # Default LoRa Modulation Settings + # Frequency: 437.4 MHz, SF7, BW125kHz, CR4/8, Preamble=8, CRC=True + self.radio1 = rfm9x.RFM9x( + self.spi0, + _rf_cs1, + _rf_rst1, + self.radio_cfg["freq"], + # code_rate=8, code rate does not exist for RFM9xFSK + ) + self.radio1.max_output = True + self.radio1.tx_power = self.radio_cfg["pwr"] + self.radio1.spreading_factor = self.radio_cfg["sf"] + + self.radio1.enable_crc = True + self.radio1.ack_delay = 0.2 + if self.radio1.spreading_factor > 9: + self.radio1.preamble_length = self.radio1.spreading_factor self.radio1.node = self.radio_cfg["id"] self.radio1.destination = self.radio_cfg["gs"] - self.radio1.enable_crc = True - self.radio1.ack_delay = 0.2 - if self.radio1.spreading_factor > 9: - self.radio1.preamble_length = self.radio1.spreading_factor self.hardware["Radio1"] = True - if self.legacy: - self.enable_rf.value = False + # if self.legacy: + # self.enable_rf.value = False except Exception as e: self.error_print( @@ -444,6 +451,10 @@ def __init__(self): self.error_print("[ERROR][CAMERA]TCA Not Initialized") self.hardware["CAM"] = False + if self.f_fsk: + self.debug_print("Next restart will be in LoRa mode.") + self.f_fsk = False + """ Prints init State of PySquared Hardware """ @@ -689,6 +700,7 @@ def take_image(self): """ def watchdog_pet(self): + ... self.watchdog_pin.value = True time.sleep(0.01) self.watchdog_pin.value = False diff --git a/FC_Board/lib/pysquared_rfm9x.py b/FC_Board/lib/pysquared_rfm9x.py deleted file mode 100755 index 1f221ab..0000000 --- a/FC_Board/lib/pysquared_rfm9x.py +++ /dev/null @@ -1,1116 +0,0 @@ -# SPDX-FileCopyrightText: 2017 Tony DiCola for Adafruit Industries -# -# SPDX-License-Identifier: MIT - -""" -MODIFIED VERSION of adafruit_rfm9x CircuitPython Library for PyCubed Use -See https://github.com/adafruit/Adafruit_CircuitPython_RFM9x - -CircuitPython Version: 7.0.0 alpha -Library Repo: https://github.com/pycubed/library_pycubed.py -* Edits by: Max Holliday -Added temperature readout by Nicole Maggard -""" -import time -from random import random -import digitalio -from micropython import const -import adafruit_bus_device.spi_device as spidev - -# pylint: disable=bad-whitespace -# Internal constants: -# Register names (FSK Mode even though we use LoRa instead, from table 85) -_RH_RF95_REG_00_FIFO = const(0x00) -_RH_RF95_REG_01_OP_MODE = const(0x01) -_RH_RF95_REG_06_FRF_MSB = const(0x06) -_RH_RF95_REG_07_FRF_MID = const(0x07) -_RH_RF95_REG_08_FRF_LSB = const(0x08) -_RH_RF95_REG_09_PA_CONFIG = const(0x09) -_RH_RF95_REG_0A_PA_RAMP = const(0x0A) -_RH_RF95_REG_0B_OCP = const(0x0B) -_RH_RF95_REG_0C_LNA = const(0x0C) -_RH_RF95_REG_0D_FIFO_ADDR_PTR = const(0x0D) -_RH_RF95_REG_0E_FIFO_TX_BASE_ADDR = const(0x0E) -_RH_RF95_REG_0F_FIFO_RX_BASE_ADDR = const(0x0F) -_RH_RF95_REG_10_FIFO_RX_CURRENT_ADDR = const(0x10) -_RH_RF95_REG_11_IRQ_FLAGS_MASK = const(0x11) -_RH_RF95_REG_12_IRQ_FLAGS = const(0x12) -_RH_RF95_REG_13_RX_NB_BYTES = const(0x13) -_RH_RF95_REG_14_RX_HEADER_CNT_VALUE_MSB = const(0x14) -_RH_RF95_REG_15_RX_HEADER_CNT_VALUE_LSB = const(0x15) -_RH_RF95_REG_16_RX_PACKET_CNT_VALUE_MSB = const(0x16) -_RH_RF95_REG_17_RX_PACKET_CNT_VALUE_LSB = const(0x17) -_RH_RF95_REG_18_MODEM_STAT = const(0x18) -_RH_RF95_REG_19_PKT_SNR_VALUE = const(0x19) -_RH_RF95_REG_1A_PKT_RSSI_VALUE = const(0x1A) -_RH_RF95_REG_1B_RSSI_VALUE = const(0x1B) -_RH_RF95_REG_1C_HOP_CHANNEL = const(0x1C) -_RH_RF95_REG_1D_MODEM_CONFIG1 = const(0x1D) -_RH_RF95_REG_1E_MODEM_CONFIG2 = const(0x1E) -_RH_RF95_REG_1F_SYMB_TIMEOUT_LSB = const(0x1F) -_RH_RF95_REG_20_PREAMBLE_MSB = const(0x20) -_RH_RF95_REG_21_PREAMBLE_LSB = const(0x21) -_RH_RF95_REG_22_PAYLOAD_LENGTH = const(0x22) -_RH_RF95_REG_23_MAX_PAYLOAD_LENGTH = const(0x23) -_RH_RF95_REG_24_HOP_PERIOD = const(0x24) -_RH_RF95_REG_25_FIFO_RX_BYTE_ADDR = const(0x25) -_RH_RF95_REG_26_MODEM_CONFIG3 = const(0x26) - -_RH_RF95_REG_3C_REGTEMP = const(0x3C) - -_RH_RF95_REG_40_DIO_MAPPING1 = const(0x40) -_RH_RF95_REG_41_DIO_MAPPING2 = const(0x41) -_RH_RF95_REG_42_VERSION = const(0x42) - -_RH_RF95_REG_4B_TCXO = const(0x4B) -_RH_RF95_REG_4D_PA_DAC = const(0x4D) -_RH_RF95_REG_5B_FORMER_TEMP = const(0x5B) -_RH_RF95_REG_61_AGC_REF = const(0x61) -_RH_RF95_REG_62_AGC_THRESH1 = const(0x62) -_RH_RF95_REG_63_AGC_THRESH2 = const(0x63) -_RH_RF95_REG_64_AGC_THRESH3 = const(0x64) - -_RH_RF95_DETECTION_OPTIMIZE = const(0x31) -_RH_RF95_DETECTION_THRESHOLD = const(0x37) - -_RH_RF95_PA_DAC_DISABLE = const(0x04) -_RH_RF95_PA_DAC_ENABLE = const(0x07) - -# The Frequency Synthesizer step = RH_RF95_FXOSC / 2^^19 -_RH_RF95_FSTEP = 32000000 / 524288 - -# RadioHead specific compatibility constants. -_RH_BROADCAST_ADDRESS = const(0xFF) - -# The acknowledgement bit in the FLAGS -# The top 4 bits of the flags are reserved for RadioHead. The lower 4 bits are reserved -# for application layer use. -_RH_FLAGS_ACK = const(0x80) -_RH_FLAGS_RETRY = const(0x40) - -# User facing constants: -SLEEP_MODE = const(0) # 0b000 -STANDBY_MODE = const(1) # 0b001 -FS_TX_MODE = const(2) # 0b010 -TX_MODE = const(3) # 0b011 -FS_RX_MODE = const(4) # 0b100 -RX_MODE = const(5) # 0b101 -# pylint: enable=bad-whitespace - -# gap =bytes([0xFF]) -# sgap=bytes([0xFF,0xFF,0xFF]) -# dot =bytes([0]) -# dash=bytes([0,0,0]) -# # ...- .-. ...-- -..- -# VR3X = (gap+(dot+gap)*3)+dash+sgap+\ -# (dot+gap)+dash+gap+dot+sgap+\ -# ((dot+gap)*3)+dash+gap+dash+sgap+\ -# dash+gap+((dot+gap)*2)+dash+gap -VR3X = b"\xff\x00\xff\x00\xff\x00\xff\x00\x00\x00\xff\xff\xff\x00\xff\x00\x00\x00\xff\x00\xff\xff\xff\x00\xff\x00\xff\x00\xff\x00\x00\x00\xff\x00\x00\x00\xff\xff\xff\x00\x00\x00\xff\x00\xff\x00\xff\x00\x00\x00\xff" - - -# Disable the too many instance members warning. Pylint has no knowledge -# of the context and is merely guessing at the proper amount of members. This -# is a complex chip which requires exposing many attributes and state. Disable -# the warning to work around the error. -# pylint: disable=too-many-instance-attributes - -_bigbuffer = bytearray(256) -bw_bins = (7800, 10400, 15600, 20800, 31250, 41700, 62500, 125000, 250000) - - -class RFM9x: - """Interface to a RFM95/6/7/8 LoRa radio module. Allows sending and - receivng bytes of data in long range LoRa mode at a support board frequency - (433/915mhz). - - You must specify the following parameters: - - spi: The SPI bus connected to the radio. - - cs: The CS pin DigitalInOut connected to the radio. - - reset: The reset/RST pin DigialInOut connected to the radio. - - frequency: The frequency (in mhz) of the radio module (433/915mhz typically). - - You can optionally specify: - - preamble_length: The length in bytes of the packet preamble (default 8). - - high_power: Boolean to indicate a high power board (RFM95, etc.). Default - is True for high power. - - baudrate: Baud rate of the SPI connection, default is 10mhz but you might - choose to lower to 1mhz if using long wires or a breadboard. - - Remember this library makes a best effort at receiving packets with pure - Python code. Trying to receive packets too quickly will result in lost data - so limit yourself to simple scenarios of sending and receiving single - packets at a time. - - Also note this library tries to be compatible with raw RadioHead Arduino - library communication. This means the library sets up the radio modulation - to match RadioHead's defaults and assumes that each packet contains a - 4 byte header compatible with RadioHead's implementation. - Advanced RadioHead features like address/node specific packets - or "reliable datagram" delivery are supported however due to the - limitations noted, "reliable datagram" is still subject to missed packets but with it, - sender is notified if a packet has potentially been missed. - """ - - # Global buffer for SPI commands - _BUFFER = bytearray(4) - DEBUG_HEADER = False - valid_ids = (58, 59, 60, 255) - - class _RegisterBits: - # Class to simplify access to the many configuration bits avaialable - # on the chip's registers. This is a subclass here instead of using - # a higher level module to increase the efficiency of memory usage - # (all of the instances of this bit class will share the same buffer - # used by the parent RFM69 class instance vs. each having their own - # buffer and taking too much memory). - - # Quirk of pylint that it requires public methods for a class. This - # is a decorator class in Python and by design it has no public methods. - # Instead it uses dunder accessors like get and set below. For some - # reason pylint can't figure this out so disable the check. - # pylint: disable=too-few-public-methods - - # Again pylint fails to see the true intent of this code and warns - # against private access by calling the write and read functions below. - # This is by design as this is an internally used class. Disable the - # check from pylint. - # pylint: disable=protected-access - - def __init__(self, address, *, offset=0, bits=1): - assert 0 <= offset <= 7 - assert 1 <= bits <= 8 - assert (offset + bits) <= 8 - self._address = address - self._mask = 0 - for _ in range(bits): - self._mask <<= 1 - self._mask |= 1 - self._mask <<= offset - self._offset = offset - - def __get__(self, obj, objtype): - reg_value = obj._read_u8(self._address) - return (reg_value & self._mask) >> self._offset - - def __set__(self, obj, val): - reg_value = obj._read_u8(self._address) - reg_value &= ~self._mask - reg_value |= (val & 0xFF) << self._offset - obj._write_u8(self._address, reg_value) - - operation_mode = _RegisterBits(_RH_RF95_REG_01_OP_MODE, bits=3) - - low_frequency_mode = _RegisterBits(_RH_RF95_REG_01_OP_MODE, offset=3, bits=1) - - osc_calibration = _RegisterBits(_RH_RF95_REG_24_HOP_PERIOD, offset=3, bits=1) - - modulation_type = _RegisterBits(_RH_RF95_REG_01_OP_MODE, offset=5, bits=2) - - # Long range/LoRa mode can only be set in sleep mode! - long_range_mode = _RegisterBits(_RH_RF95_REG_01_OP_MODE, offset=7, bits=1) - - lna_boost = _RegisterBits(_RH_RF95_REG_0C_LNA, bits=2) - - output_power = _RegisterBits(_RH_RF95_REG_09_PA_CONFIG, bits=4) - - modulation_shaping = _RegisterBits(_RH_RF95_REG_0A_PA_RAMP, bits=2) - - pa_ramp = _RegisterBits(_RH_RF95_REG_0A_PA_RAMP, bits=4) - - max_power = _RegisterBits(_RH_RF95_REG_09_PA_CONFIG, offset=4, bits=3) - - pa_select = _RegisterBits(_RH_RF95_REG_09_PA_CONFIG, offset=7, bits=1) - - pa_dac = _RegisterBits(_RH_RF95_REG_4D_PA_DAC, bits=3) - - dio0_mapping = _RegisterBits(_RH_RF95_REG_40_DIO_MAPPING1, offset=6, bits=2) - - low_datarate_optimize = _RegisterBits( - _RH_RF95_REG_26_MODEM_CONFIG3, offset=3, bits=1 - ) - auto_agc = _RegisterBits(_RH_RF95_REG_26_MODEM_CONFIG3, offset=2, bits=1) - - debug = False - buffview = memoryview(_bigbuffer) - - def __init__( - self, - spi, - cs, - reset, - frequency, - *, - preamble_length=8, - code_rate=5, - high_power=True, - baudrate=5000000, - max_output=False - ): - self.high_power = high_power - self.max_output = max_output - self.dio0 = False - # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. - # Set Default Baudrate to 5MHz to avoid problems - self._device = spidev.SPIDevice(spi, cs, baudrate=baudrate, polarity=0, phase=0) - # Setup reset as a digital input (default state for reset line according - # to the datasheet). This line is pulled low as an output quickly to - # trigger a reset. Note that reset MUST be done like this and set as - # a high impedence input or else the chip cannot change modes (trust me!). - self._reset = reset - self._reset.switch_to_input(pull=digitalio.Pull.UP) - self.reset() - # No device type check! Catch an error from the very first request and - # throw a nicer message to indicate possible wiring problems. - version = self._read_u8(_RH_RF95_REG_42_VERSION) - if version != 18: - raise RuntimeError( - "Failed to find rfm9x with expected version -- check wiring" - ) - - # Set sleep mode, wait 10ms and confirm in sleep mode (basic device check). - # Also set long range mode (LoRa mode) as it can only be done in sleep. - self.idle() - time.sleep(0.01) - self.osc_calibration = True - time.sleep(1) - - self.sleep() - time.sleep(0.01) - self.long_range_mode = True - if self.operation_mode != SLEEP_MODE or not self.long_range_mode: - raise RuntimeError("Failed to configure radio for LoRa mode, check wiring!") - # clear default setting for access to LF registers if frequency > 525MHz - if frequency > 525: - self.low_frequency_mode = 0 - # Setup entire 256 byte FIFO - self._write_u8(_RH_RF95_REG_0E_FIFO_TX_BASE_ADDR, 0x00) - self._write_u8(_RH_RF95_REG_0F_FIFO_RX_BASE_ADDR, 0x00) - # Disable Freq Hop - self._write_u8(_RH_RF95_REG_24_HOP_PERIOD, 0x00) - # Set mode idle - self.idle() - - # Set frequency - self.frequency_mhz = frequency - # Set preamble length (default 8 bytes to match radiohead). - self.preamble_length = preamble_length - # Defaults set modem config to RadioHead compatible Bw125Cr45Sf128 mode. - self.signal_bandwidth = 125000 - self.coding_rate = code_rate - self.spreading_factor = 7 - # Default to disable CRC checking on incoming packets. - self.enable_crc = False - # Note no sync word is set for LoRa mode either! - self._write_u8(_RH_RF95_REG_26_MODEM_CONFIG3, 0x00) - # Set transmit power to 13 dBm, a safe value any module supports. - self.tx_power = 13 - # initialize last RSSI reading - self.last_rssi = 0.0 - """The RSSI of the last received packet. Stored when the packet was received. - This instantaneous RSSI value may not be accurate once the - operating mode has been changed. - """ - # initialize timeouts and delays delays - self.ack_wait = 0.5 - """The delay time before attempting a retry after not receiving an ACK""" - self.receive_timeout = 0.5 - """The amount of time to poll for a received packet. - If no packet is received, the returned packet will be None - """ - self.xmit_timeout = 2.0 - """The amount of time to wait for the HW to transmit the packet. - This is mainly used to prevent a hang due to a HW issue - """ - self.ack_retries = 5 - """The number of ACK retries before reporting a failure.""" - self.ack_delay = None - """The delay time before attemting to send an ACK. - If ACKs are being missed try setting this to .1 or .2. - """ - # initialize sequence number counter for reliabe datagram mode - self.sequence_number = 0 - # create seen Ids list - self.seen_ids = bytearray(256) - # initialize packet header - # node address - default is broadcast - self.node = _RH_BROADCAST_ADDRESS - """The default address of this Node. (0-255). - If not 255 (0xff) then only packets address to this node will be accepted. - First byte of the RadioHead header. - """ - # destination address - default is broadcast - self.destination = _RH_BROADCAST_ADDRESS - """The default destination address for packet transmissions. (0-255). - If 255 (0xff) then any receiving node should accept the packet. - Second byte of the RadioHead header. - """ - # ID - contains seq count for reliable datagram mode - self.identifier = 0 - """Automatically set to the sequence number when send_with_ack() used. - Third byte of the RadioHead header. - """ - # flags - identifies ack/reetry packet for reliable datagram mode - self.flags = 0 - """Upper 4 bits reserved for use by Reliable Datagram Mode. - Lower 4 bits may be used to pass information. - Fourth byte of the RadioHead header. - """ - self.crc_error_count = 0 - - self.auto_agc = True - self.pa_ramp = 0 # mode agnostic - self.lna_boost = 3 # mode agnostic - - def cw(self, msg=None): - success = False - if msg is None: - msg = VR3X - - cache = [] - if self.long_range_mode: - # cache LoRa params - cache = [ - self.spreading_factor, - self.signal_bandwidth, - self.coding_rate, - self.preamble_length, - self.enable_crc, - ] - - self.operation_mode = SLEEP_MODE - time.sleep(0.01) - self.long_range_mode = False # FSK/OOK Mode - self.modulation_type = 0 # FSK - self.modulation_shaping = 2 - self._write_u8(0x25, 0x00) # no preamble - self._write_u8(0x26, 0x00) # no preamble - self._write_u8(0x27, 0x00) # no sync word - self._write_u8(0x3F, 10) # clear FIFO - self._write_u8(0x02, 0xFF) # BitRate(15:8) - self._write_u8(0x03, 0xFF) # BitRate(15:8) - self._write_u8(0x05, 11) # Freq deviation Lsb 600 Hz - self.idle() - # Set payload length VR3X Morse length = 51 - self._write_u8(0x35, len(msg) - 1) - self._write_from(_RH_RF95_REG_00_FIFO, bytearray(msg)) - - _t = time.monotonic() + 10 - self.operation_mode = TX_MODE - while time.monotonic() < _t: - a = self._read_u8(0x3F) - # print(a,end=' ') - if (a >> 6) & 1: - time.sleep(0.01) - success = True - break - if not (a >> 6) & 1: - print("cw timeout") - self.idle() - if cache: - self.operation_mode = SLEEP_MODE - time.sleep(0.01) - self.long_range_mode = True - self._write_u8(_RH_RF95_REG_0E_FIFO_TX_BASE_ADDR, 0x00) - self._write_u8(_RH_RF95_REG_0F_FIFO_RX_BASE_ADDR, 0x00) - self._write_u8(_RH_RF95_REG_24_HOP_PERIOD, 0x00) - self.idle() - self.spreading_factor = cache[0] - self.signal_bandwidth = cache[1] - self.coding_rate = cache[2] - self.preamble_length = cache[3] - self.enable_crc = cache[4] - self.auto_agc = True - return success - - # pylint: disable=no-member - # Reconsider pylint: disable when this can be tested - def _read_into(self, address, buf, length=None): - # Read a number of bytes from the specified address into the provided - # buffer. If length is not specified (the default) the entire buffer - # will be filled. - if length is None: - length = len(buf) - with self._device as device: - self._BUFFER[0] = address & 0x7F # Strip out top bit to set 0 - # value (read). - device.write(self._BUFFER, end=1) - device.readinto(buf, end=length) - - def _read_u8(self, address): - # Read a single byte from the provided address and return it. - self._read_into(address, self._BUFFER, length=1) - return self._BUFFER[0] - - def _write_from(self, address, buf, length=None): - # Write a number of bytes to the provided address and taken from the - # provided buffer. If no length is specified (the default) the entire - # buffer is written. - if length is None: - length = len(buf) - with self._device as device: - self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to - # indicate a write. - device.write(self._BUFFER, end=1) - device.write(buf, end=length) - - def _write_u8(self, address, val): - # Write a byte register to the chip. Specify the 7-bit address and the - # 8-bit value to write to that address. - with self._device as device: - self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to - # indicate a write. - self._BUFFER[1] = val & 0xFF - device.write(self._BUFFER, end=2) - - def reset(self): - """Perform a reset of the chip.""" - # See section 7.2.2 of the datasheet for reset description. - self._reset.switch_to_output(value=False) - time.sleep(0.0001) # 100 us - self._reset.switch_to_input(pull=digitalio.Pull.UP) - time.sleep(0.005) # 5 ms - - def idle(self): - """Enter idle standby mode.""" - self.operation_mode = STANDBY_MODE - - def sleep(self): - """Enter sleep mode.""" - self.operation_mode = SLEEP_MODE - - def listen(self): - """Listen for packets to be received by the chip. Use :py:func:`receive` - to listen, wait and retrieve packets as they're available. - """ - self.operation_mode = RX_MODE - self.dio0_mapping = 0b00 # Interrupt on rx done. - - def transmit(self): - """Transmit a packet which is queued in the FIFO. This is a low level - function for entering transmit mode and more. For generating and - transmitting a packet of data use :py:func:`send` instead. - """ - self.operation_mode = TX_MODE - self.dio0_mapping = 0b01 # Interrupt on tx done. - - @property - def preamble_length(self): - """The length of the preamble for sent and received packets, an unsigned - 16-bit value. Received packets must match this length or they are - ignored! Set to 8 to match the RadioHead RFM95 library. - """ - msb = self._read_u8(_RH_RF95_REG_20_PREAMBLE_MSB) - lsb = self._read_u8(_RH_RF95_REG_21_PREAMBLE_LSB) - return ((msb << 8) | lsb) & 0xFFFF - - @preamble_length.setter - def preamble_length(self, val): - assert 0 <= val <= 65535 - self._write_u8(_RH_RF95_REG_20_PREAMBLE_MSB, (val >> 8) & 0xFF) - self._write_u8(_RH_RF95_REG_21_PREAMBLE_LSB, val & 0xFF) - - @property - def frequency_mhz(self): - """The frequency of the radio in Megahertz. Only the allowed values for - your radio must be specified (i.e. 433 vs. 915 mhz)! - """ - msb = self._read_u8(_RH_RF95_REG_06_FRF_MSB) - mid = self._read_u8(_RH_RF95_REG_07_FRF_MID) - lsb = self._read_u8(_RH_RF95_REG_08_FRF_LSB) - frf = ((msb << 16) | (mid << 8) | lsb) & 0xFFFFFF - frequency = (frf * _RH_RF95_FSTEP) / 1000000.0 - return frequency - - @frequency_mhz.setter - def frequency_mhz(self, val): - if val < 240 or val > 960: - raise RuntimeError("frequency_mhz must be between 240 and 960") - # Calculate FRF register 24-bit value. - frf = int((val * 1000000.0) / _RH_RF95_FSTEP) & 0xFFFFFF - # Extract byte values and update registers. - msb = frf >> 16 - mid = (frf >> 8) & 0xFF - lsb = frf & 0xFF - self._write_u8(_RH_RF95_REG_06_FRF_MSB, msb) - self._write_u8(_RH_RF95_REG_07_FRF_MID, mid) - self._write_u8(_RH_RF95_REG_08_FRF_LSB, lsb) - - @property - def tx_power(self): - """The transmit power in dBm. Can be set to a value from 5 to 23 for - high power devices (RFM95/96/97/98, high_power=True) or -1 to 14 for low - power devices. Only integer power levels are actually set (i.e. 12.5 - will result in a value of 12 dBm). - The actual maximum setting for high_power=True is 20dBm but for values > 20 - the PA_BOOST will be enabled resulting in an additional gain of 3dBm. - The actual setting is reduced by 3dBm. - The reported value will reflect the reduced setting. - """ - if self.high_power: - return self.output_power + 5 - return self.output_power - 1 - - @tx_power.setter - def tx_power(self, val): - val = int(val) - if self.max_output is True: - print("RFM9X Max Output Power Enabled") - self._write_u8(_RH_RF95_REG_0B_OCP, 0x3F) # set Ocp to 240mA - self.pa_dac = _RH_RF95_PA_DAC_ENABLE - self.pa_select = True - self.max_power = 0b111 - self.output_power = 0x0F - return - - if self.high_power: - if val < 5 or val > 23: - raise RuntimeError("tx_power must be between 5 and 23") - # Enable power amp DAC if power is above 20 dB. - # Lower setting by 3db when PA_BOOST enabled - see Data Sheet Section 6.4 - if val > 20: - self.pa_dac = _RH_RF95_PA_DAC_ENABLE - val -= 3 - else: - self.pa_dac = _RH_RF95_PA_DAC_DISABLE - self.pa_select = True - self.output_power = (val - 5) & 0x0F - else: - assert -1 <= val <= 14 - self.pa_select = False - self.max_power = 0b111 # Allow max power output. - self.output_power = (val + 1) & 0x0F - - # ADDED FOR PYCUBED - @property - def packet_status(self): - return (self.rssi, self._read_u8(_RH_RF95_REG_19_PKT_SNR_VALUE) / 4) - - @property - def pll_timeout(self): - return self._read_u8(_RH_RF95_REG_1C_HOP_CHANNEL) - - def rssi(self, raw=False): - """The received strength indicator (in dBm) of the last received message.""" - # Read RSSI register and convert to value using formula in datasheet. - # Remember in LoRa mode the payload register changes function to RSSI! - if raw: - return self._read_u8(_RH_RF95_REG_1A_PKT_RSSI_VALUE) - return self._read_u8(_RH_RF95_REG_1A_PKT_RSSI_VALUE) - 137 - - @property - def signal_bandwidth(self): - """The signal bandwidth used by the radio (try setting to a higher - value to increase throughput or to a lower value to increase the - likelihood of successfully received payloads). Valid values are - listed in RFM9x.bw_bins.""" - bw_id = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0xF0) >> 4 - if bw_id >= len(bw_bins): - current_bandwidth = 500000 - else: - current_bandwidth = bw_bins[bw_id] - return current_bandwidth - - @signal_bandwidth.setter - def signal_bandwidth(self, val): - # Set signal bandwidth (set to 125000 to match RadioHead Bw125). - for bw_id, cutoff in enumerate(bw_bins): - if val <= cutoff: - break - else: - bw_id = 9 - self._write_u8( - _RH_RF95_REG_1D_MODEM_CONFIG1, - (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0x0F) | (bw_id << 4), - ) - if val >= 500000: - # see Semtech SX1276 errata note 2.1 - self._write_u8(0x36, 0x02) - self._write_u8(0x3A, 0x64) - else: - if val == 7800: - self._write_u8(0x2F, 0x48) - elif val >= 62500: - # see Semtech SX1276 errata note 2.3 - self._write_u8(0x2F, 0x40) - else: - self._write_u8(0x2F, 0x44) - self._write_u8(0x30, 0) - - @property - def coding_rate(self): - """The coding rate used by the radio to control forward error - correction (try setting to a higher value to increase tolerance of - short bursts of interference or to a lower value to increase bit - rate). Valid values are limited to 5, 6, 7, or 8.""" - cr_id = (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0x0E) >> 1 - denominator = cr_id + 4 - return denominator - - @coding_rate.setter - def coding_rate(self, val): - # Set coding rate (set to 5 to match RadioHead Cr45). - denominator = min(max(val, 5), 8) - cr_id = denominator - 4 - self._write_u8( - _RH_RF95_REG_1D_MODEM_CONFIG1, - (self._read_u8(_RH_RF95_REG_1D_MODEM_CONFIG1) & 0xF1) | (cr_id << 1), - ) - - @property - def spreading_factor(self): - """The spreading factor used by the radio (try setting to a higher - value to increase the receiver's ability to distinguish signal from - noise or to a lower value to increase the data transmission rate). - Valid values are limited to 6, 7, 8, 9, 10, 11, or 12.""" - sf_id = (self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0xF0) >> 4 - return sf_id - - @spreading_factor.setter - def spreading_factor(self, val): - # Set spreading factor (set to 7 to match RadioHead Sf128). - val = min(max(val, 6), 12) - self._write_u8(_RH_RF95_DETECTION_OPTIMIZE, 0xC5 if val == 6 else 0xC3) - - if self.signal_bandwidth >= 5000000: - self._write_u8(_RH_RF95_DETECTION_OPTIMIZE, 0xC5 if val == 6 else 0xC3) - else: - # see Semtech SX1276 errata note 2.3 - self._write_u8(_RH_RF95_DETECTION_OPTIMIZE, 0x45 if val == 6 else 0x43) - - self._write_u8(_RH_RF95_DETECTION_THRESHOLD, 0x0C if val == 6 else 0x0A) - self._write_u8( - _RH_RF95_REG_1E_MODEM_CONFIG2, - ( - (self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0x0F) - | ((val << 4) & 0xF0) - ), - ) - - @property - def enable_crc(self): - """Set to True to enable hardware CRC checking of incoming packets. - Incoming packets that fail the CRC check are not processed. Set to - False to disable CRC checking and process all incoming packets.""" - return (self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0x04) == 0x04 - - '''@property - def temperature(self): - """Tries to grab current temp from module""" - raw_temp=self._read_u8(_RH_RF95_REG_3C_REGTEMP) - temp = (raw_temp & 0x7F) - if (raw_temp & 0x80) == 0x80: - temp=~temp+0x01 - - return temp+24#Added prescalar for temp''' - - @property - def former_temperature(self): - """Tries to grab former temp from module""" - raw_temp = self._read_u8(_RH_RF95_REG_5B_FORMER_TEMP) - temp = raw_temp & 0x7F - if (raw_temp & 0x80) == 0x80: - temp = ~temp + 0x01 - - return temp + 143 # Added prescalar for temp - - @enable_crc.setter - def enable_crc(self, val): - # Optionally enable CRC checking on incoming packets. - if val: - self._write_u8( - _RH_RF95_REG_1E_MODEM_CONFIG2, - self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) | 0x04, - ) - else: - self._write_u8( - _RH_RF95_REG_1E_MODEM_CONFIG2, - self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0xFB, - ) - - def tx_done(self): - """Transmit status""" - # if self.dio0: - # print('TxDIO0: {}, {}'.format(self.dio0.value,hex((self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3))) - # return self.dio0.value - return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3 - - def rx_done(self): - """Receive status""" - if self.dio0: - # print('RxDIO0: {}, {}'.format(self.dio0.value,hex((self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6))) - return self.dio0.value - else: - return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6 - - async def await_rx(self, timeout=60): - _t = time.monotonic() + timeout - while not self.rx_done(): - if time.monotonic() < _t: - yield - else: - # Timed out - return False - # Received something - return True - - def crc_error(self): - """crc status""" - return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x20) >> 5 - - def send( - self, - data, - *, - keep_listening=False, - destination=None, - node=None, - identifier=None, - flags=None - ): - """Send a string of data using the transmitter. - You can only send 252 bytes at a time - (limited by chip's FIFO size and appended headers). - This appends a 4 byte header to be compatible with the RadioHead library. - The header defaults to using the initialized attributes: - (destination,node,identifier,flags) - It may be temporarily overidden via the kwargs - destination,node,identifier,flags. - Values passed via kwargs do not alter the attribute settings. - The keep_listening argument should be set to True if you want to start listening - automatically after the packet is sent. The default setting is False. - - Returns: True if success or False if the send timed out. - """ - # Disable pylint warning to not use length as a check for zero. - # This is a puzzling warning as the below code is clearly the most - # efficient and proper way to ensure a precondition that the provided - # buffer be within an expected range of bounds. Disable this check. - # pylint: disable=len-as-condition - - if hasattr(self, "txrx"): # TX - self.txrx[0].value = True - self.txrx[1].value = False - - l = len(data) - assert 0 < l <= 252 - # pylint: enable=len-as-condition - self.idle() # Stop receiving to clear FIFO and keep it clear. - l += 4 - # Fill the FIFO with a packet to send. - self._write_u8(_RH_RF95_REG_0D_FIFO_ADDR_PTR, 0x00) # FIFO starts at 0. - - # Combine header and data to form payload - if data == b"!": - payload = bytearray(5) - else: - payload = self.buffview[:l] - - if destination is None: # use attribute - payload[0] = self.destination - else: # use kwarg - payload[0] = destination - if node is None: # use attribute - payload[1] = self.node - else: # use kwarg - payload[1] = node - if identifier is None: # use attribute - payload[2] = self.identifier - else: # use kwarg - payload[2] = identifier - if flags is None: # use attribute - payload[3] = self.flags - else: # use kwarg - payload[3] = flags - if self.DEBUG_HEADER: - print("[header] - {}".format([hex(i) for i in payload])) - # payload = payload + data - try: - if isinstance(data, (bytes, bytearray, memoryview)): - payload[4:] = data[:] - else: - payload[4:] = data.encode() - except Exception as e: - print("payload encoding error:", e) - payload = bytearray(payload[:4]) + data - - # Write payload. - self._write_from(_RH_RF95_REG_00_FIFO, payload) - # Write payload and header length. - self._write_u8(_RH_RF95_REG_22_PAYLOAD_LENGTH, l) - # Turn on transmit mode to send out the packet. - self.transmit() - # Wait for tx done interrupt with explicit polling (not ideal but - # best that can be done right now without interrupts). - start = time.monotonic() - timed_out = False - while not timed_out and not self.tx_done(): - if (time.monotonic() - start) >= self.xmit_timeout: - timed_out = True - - if hasattr(self, "txrx"): # RX - self.txrx[0].value = False - self.txrx[1].value = True - - # Listen again if necessary and return the result packet. - if keep_listening: - self.listen() - else: - # Enter idle mode to stop receiving other packets. - self.idle() - # Clear interrupt. - self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) - return not timed_out - - def send_with_ack(self, data): - """Reliable Datagram mode: - Send a packet with data and wait for an ACK response. - The packet header is automatically generated. - If enabled, the packet transmission will be retried on failure - """ - if self.ack_retries: - retries_remaining = self.ack_retries - else: - retries_remaining = 1 - got_ack = False - self.retry_counter = 0 # ADDED FOR PYCUBED - self.sequence_number = (self.sequence_number + 1) & 0xFF - while not got_ack and retries_remaining: - self.identifier = self.sequence_number - self.send(data, keep_listening=True) - # Don't look for ACK from Broadcast message - if self.destination == _RH_BROADCAST_ADDRESS: - print("uhf destination=RHbroadcast address (dont look for ack)") - got_ack = True - else: - # wait for a packet from our destination - ack_packet = self.receive(timeout=self.ack_wait, with_header=True) - if ack_packet is not None: - if ack_packet[3] & _RH_FLAGS_ACK: - # check the ID - if ack_packet[2] == self.identifier: - got_ack = True - break - # pause before next retry -- random delay - if not got_ack: - self.retry_counter += 1 # ADDED FOR PYCUBED - print("no uhf ack, sending again...") - # delay by random amount before next try - time.sleep(self.ack_wait + self.ack_wait * random()) - retries_remaining = retries_remaining - 1 - # set retry flag in packet header - self.flags |= _RH_FLAGS_RETRY - self.flags = 0 # clear flags - return got_ack - - # pylint: disable=too-many-branches - def receive( - self, - *, - keep_listening=True, - with_header=False, - with_ack=False, - timeout=None, - debug=False, - view=False - ): - """Wait to receive a packet from the receiver. If a packet is found the payload bytes - are returned, otherwise None is returned (which indicates the timeout elapsed with no - reception). - If keep_listening is True (the default) the chip will immediately enter listening mode - after reception of a packet, otherwise it will fall back to idle mode and ignore any - future reception. - All packets must have a 4-byte header for compatibilty with the - RadioHead library. - The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip - the header before returning the packet to the caller. - If with_header is True then the 4 byte header will be returned with the packet. - The payload then begins at packet[4]. - If with_ack is True, send an ACK after receipt (Reliable Datagram mode) - """ - if hasattr(self, "txrx"): # RX - self.txrx[0].value = False - self.txrx[1].value = True - - timed_out = False - if timeout is None: - timeout = self.receive_timeout - if timeout is not None: - # Wait for the payload_ready signal. This is not ideal and will - # surely miss or overflow the FIFO when packets aren't read fast - # enough, however it's the best that can be done from Python without - # interrupt supports. - # Make sure we are listening for packets. - self.listen() - start = time.monotonic() - timed_out = False - while not timed_out and not self.rx_done(): - if (time.monotonic() - start) >= timeout: - timed_out = True - # Payload ready is set, a packet is in the FIFO. - packet = None - # save last RSSI reading - self.last_rssi = self.rssi(raw=True) - # Enter idle mode to stop receiving other packets. - self.idle() - if not timed_out: - if self.enable_crc and self.crc_error(): - self.crc_error_count += 1 - print("crc error") - if hasattr(self, "crc_errs"): - self.crc_errs += 1 - else: - # Read the data from the FIFO. - # Read the length of the FIFO. - fifo_length = self._read_u8(_RH_RF95_REG_13_RX_NB_BYTES) - # Handle if the received packet is too small to include the 4 byte - # RadioHead header and at least one byte of data --reject this packet and ignore it. - if fifo_length > 0: # read and clear the FIFO if anything in it - current_addr = self._read_u8(_RH_RF95_REG_10_FIFO_RX_CURRENT_ADDR) - self._write_u8(_RH_RF95_REG_0D_FIFO_ADDR_PTR, current_addr) - # packet = bytearray(fifo_length) - packet = self.buffview[:fifo_length] - # Read the packet. - self._read_into(_RH_RF95_REG_00_FIFO, packet) - # Clear interrupt. - self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) - if fifo_length < 5: - print("missing pckt header") - packet = None - else: - if ( - self.node != _RH_BROADCAST_ADDRESS - and packet[0] != _RH_BROADCAST_ADDRESS - and packet[0] != self.node - ): - packet = None - # send ACK unless this was an ACK or a broadcast - elif ( - with_ack - and ((packet[3] & _RH_FLAGS_ACK) == 0) - and (packet[0] != _RH_BROADCAST_ADDRESS) - ): - # delay before sending Ack to give receiver a chance to get ready - if self.ack_delay is not None: - time.sleep(self.ack_delay) - # send ACK packet to sender (data is b'!') - self.send( - b"!", - keep_listening=keep_listening, - destination=packet[1], - node=packet[0], - identifier=packet[2], - flags=(packet[3] | _RH_FLAGS_ACK), - ) - if debug: - print("Sent Ack to {}".format(packet[1])) - if debug: - print("\t{}".format(packet)) - - # # reject Retries if we have seen this idetifier from this source before - # if (self.seen_ids[packet[1]] == packet[2]) and ( - # packet[3] & _RH_FLAGS_RETRY - # ): - # print('duplicate identifier from this source. rejecting...') - # packet = None - # else: # save the packet identifier for this source - self.seen_ids[packet[1]] = packet[2] - if ( - not with_header and packet is not None - ): # skip the header if not wanted - packet = packet[4:] - - if hasattr(self, "txrx"): # RX - self.txrx[0].value = False - self.txrx[1].value = True - - # Listen again if necessary and return the result packet. - if keep_listening: - self.listen() - else: - # Enter idle mode to stop receiving other packets. - self.idle() - # Clear interrupt. - self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) - if view: - return packet - elif packet is not None: - return bytes(packet) - return packet - - def receive_all(self, only_for_me=True, debug=False): - # msg=[] - l = 0 - fifo_length = 0 - self.idle() - if self.enable_crc and self.crc_error(): - self.crc_error_count += 1 - print("crc error") - if hasattr(self, "crc_errs"): - self.crc_errs += 1 - else: - fifo_length = self._read_u8(_RH_RF95_REG_13_RX_NB_BYTES) - - if fifo_length > 0: - current_addr = self._read_u8(_RH_RF95_REG_10_FIFO_RX_CURRENT_ADDR) - self._write_u8(_RH_RF95_REG_0D_FIFO_ADDR_PTR, _RH_RF95_REG_00_FIFO) - self._read_into(_RH_RF95_REG_00_FIFO, _bigbuffer) - for i in range(4): - self._write_from(_RH_RF95_REG_00_FIFO, bytes(64)) - self._write_u8(_RH_RF95_REG_0D_FIFO_ADDR_PTR, _RH_RF95_REG_00_FIFO) - self.listen() - # Clear interrupt. - self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) - packetindex = [] - i = 0 - while i < 253: # 256-4 = 252 - # check first - if self.buffview[i] in self.valid_ids: - # check second - if self.buffview[i + 1] in self.valid_ids: - # make sure not the same - if self.buffview[i] != self.buffview[i + 1]: - # append first - packetindex.append(i) - i = i + 4 - l += 1 - continue - i += 1 - for i in range(l - 1): - # assume packets are back-to-back (read till index of next one) - # if (packetindex[i+1]-packetindex[i]) <= 10: - # msg.append(bytes(self.buffview[packetindex[i]:packetindex[i+1]])) - yield self.buffview[packetindex[i] : packetindex[i + 1]] - # last packet so read until the end of our fifo_length - if packetindex: - # if (packetindex[-1]-(current_addr+fifo_length)) <= 10: - # print('{},{},{}'.format(packetindex[-1],current_addr,fifo_length)) - # msg.append(bytes(self.buffview[packetindex[-1]:current_addr+fifo_length])) - yield self.buffview[packetindex[-1] : current_addr + fifo_length] - else: - self.listen() - # Clear interrupt. - self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) - - # if only_for_me: - # return [i for i in msg if msg[0] is self.node] - # else: - # return msg - - def send_fast(self, data, l): - self.idle() - self._write_u8(_RH_RF95_REG_0D_FIFO_ADDR_PTR, 0x00) # set fifo position - # Write payload. - self._write_from(_RH_RF95_REG_00_FIFO, data) - # Write payload and header length. - self._write_u8(_RH_RF95_REG_22_PAYLOAD_LENGTH, l) - # Turn on transmit mode to send out the packet. - self.transmit() - # Wait for tx done interrupt with explicit polling (not ideal but - # best that can be done right now without interrupts). - _t = time.monotonic() + 5 - while time.monotonic() < _t and not self.tx_done(): - pass - self.idle() - # Clear interrupt. - self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) - return diff --git a/Tests/fsk_test.py b/Tests/fsk_test.py new file mode 100644 index 0000000..e4c618d --- /dev/null +++ b/Tests/fsk_test.py @@ -0,0 +1,61 @@ +import board +import busio +import digitalio +import time +from pysquared import cubesat + +test_message = "Hello There!" +debug_mode = True +number_of_attempts = 0 + +# Radio Configuration Setup Here +radio_cfg = { + "spreading_factor": 8, + "tx_power": 13, # Set as a default that works for any radio + "node": 0x00, + "destination": 0x00, + "receive_timeout": 5, + "enable_crc": False, +} + +# Setting the Radio +cubesat.radio1.spreading_factor = radio_cfg["spreading_factor"] +if cubesat.radio1.spreading_factor > 8: + cubesat.radio1.low_datarate_optimize = True +else: + cubesat.radio1.low_datarate_optimize = False +cubesat.radio1.tx_power = radio_cfg["tx_power"] +cubesat.radio1.receive_timeout = radio_cfg["receive_timeout"] +cubesat.radio1.enable_crc = False + +cubesat.radio1.send(bytes("Hello world KN6YZZ!\r\n", "utf-8")) +print("Sent Hello World message!") + +# Wait to receive packets. +print("Waiting for packets...") + +while True: + packet = cubesat.radio1.receive() + # Optionally change the receive timeout from its default of 0.5 seconds: + # packet = rfm9x.receive(timeout=5.0) + # If no packet was received during the timeout then None is returned. + if packet is None: + # Packet has not been received + print("Received nothing! Listening again...") + else: + # Received a packet! + # Print out the raw bytes of the packet: + print(f"Received (raw bytes): {packet}") + # And decode to ASCII text and print it too. Note that you always + # receive raw bytes and need to convert to a text format like ASCII + # if you intend to do string processing on your data. Make sure the + # sending side is sending ASCII data before you try to decode! + try: + packet_text = str(packet, "ascii") + print(f"Received (ASCII): {packet_text}") + except UnicodeError: + print("Hex data: ", [hex(x) for x in packet]) + # Also read the RSSI (signal strength) of the last received message and + # print it. + rssi = cubesat.radio1.last_rssi + print(f"Received signal strength: {rssi} dB") diff --git a/Tests/radio_test.py b/Tests/radio_test.py index c2b885c..9686439 100755 --- a/Tests/radio_test.py +++ b/Tests/radio_test.py @@ -11,7 +11,11 @@ test_message = "Hello There!" debug_mode = True number_of_attempts = 0 -cube_callsign = "K06AZM" +cube_callsign = "" + +if cube_callsign == "": + print("No cube callsign!") + exit() # Radio Configuration Setup Here radio_cfg = { @@ -23,6 +27,14 @@ "enable_crc": False, } +if input("FSK or LoRa? [L/f]") == "F": + cubesat.f_fsk = True + del cubesat + print("Resetting in FSK") + from pysquared import cubesat + +print("FSK: " + str(cubesat.f_fsk)) + options = ["A", "B", "C"] # Setting the Radio @@ -158,7 +170,12 @@ def client(passcode): elif chosen_command == "7": packet = b"\x00\x00\x00\x00" + passcode.encode() + b"\x56\xc4" elif chosen_command == "8": - packet = b"\x00\x00\x00\x00" + passcode.encode() + b"RP" + input("Message to Repeat: ") + packet = ( + b"\x00\x00\x00\x00" + + passcode.encode() + + b"RP" + + input("Message to Repeat: ") + ) else: print( "Command is not valid or not implemented open radio_test.py and add them yourself!" @@ -169,24 +186,28 @@ def client(passcode): msg = cubesat.radio1.receive() if msg is not None: - msg_string = ''.join([chr(b) for b in msg]) + msg_string = "".join([chr(b) for b in msg]) print(f"Message Received {msg_string}") print(msg_string[:6]) - if msg_string[:6]==cube_callsign: + if msg_string[:6] == cube_callsign: time.sleep(0.1) tries += 1 if tries > 5: print("We tried 5 times! And there was no response. Quitting.") break - success = cubesat.radio1.send(packet) + success = cubesat.radio1.send_with_ack(packet) print("Success " + str(success)) if success is True: response = cubesat.radio1.receive(keep_listening=True) time.sleep(0.5) if response is not None: - print("msg: {}, RSSI: {}".format(response, cubesat.radio1.last_rssi - 137)) + print( + "msg: {}, RSSI: {}".format( + response, cubesat.radio1.last_rssi - 137 + ) + ) break else: debug_print("No response, trying again (" + str(tries) + ")")