Skip to content

Commit

Permalink
updated encoding protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
ftylitak committed Jun 10, 2024
1 parent f7464d4 commit ab95778
Showing 1 changed file with 54 additions and 17 deletions.
71 changes: 54 additions & 17 deletions insighioNode/apps/demo_console/lora_custom_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
LOCATION_4_20 = const(0x60)
LOCATION_MODEM = const(0x70)
LOCATION_GPS = const(0x71)
LOCATION_WEATHER_STATION = const(0x80)
LOCATION_RAIN_GAUGE = const(0x81)
LOCATION_SOLAR_SENSOR = const(0x82)
LOCATION_PULSE_COUNTER = const(0x83)
LOCATION_WEATHER_STATION_WIND = const(0x84)

TYPE_DEVICE_ID = const(0x01)
TYPE_RESET_CAUSE = const(0x02)
Expand Down Expand Up @@ -50,6 +55,15 @@
TYPE_VAPOR_PRESSURE_DEFICIT = const(0x24)
TYPE_ATMOSPHERIC_PRESSURE = const(0x25)
TYPE_TEMPERATURE_FAH = const(0x26)
TYPE_DEVIATION = const(0x27)
TYPE_RADIATION = const(0x28)
TYPE_COUNT = const(0x29)
TYPE_HEIGHT = const(0x2a)
TYPE_PERIOD = const(0x2b)
TYPE_NOISE = const(0x2c)
TYPE_DIRECTION_DEG = const(0x2d)
TYPE_DIRECTION_ID = const(0x2e)
TYPE_SPEED = const(0x2f)
TYPE_FORMULA = const(0x30)
TYPE_LORA_JOIN_DUR = const(0xC1)
TYPE_GPS_HDOP = const(0xD0)
Expand All @@ -73,6 +87,17 @@ def get_location_by_key(key):
return LOCATION_INTERNAL_BOARD
elif loc_name == "cpu":
return LOCATION_INTERNAL_CPU
elif loc_name == "pcnt":
return LOCATION_PULSE_COUNTER
elif loc_name == "rain":
return LOCATION_RAIN_GAUGE
elif loc_name == "solar":
return LOCATION_SOLAR_SENSOR
elif loc_name == "wth":
if len(parts) > 2 and position == "wind":
return LOCATION_WEATHER_STATION_WIND
else:
return LOCATION_WEATHER_STATION
elif loc_name == "tsl2561":
return LOCATION_I2C + 0
elif loc_name == "si7021":
Expand All @@ -89,6 +114,8 @@ def get_location_by_key(key):
return LOCATION_I2C + 6
elif loc_name == "4-20" and position[0] >= "0" and position[0] <= "9":
return LOCATION_4_20 + int(position[0])
elif loc_name == "sdi12" and position[0] >= "0" and position[0] <= "9":
return LOCATION_SDI12 + int(position[0])
elif len(parts) > 2:
if position == "ap1":
return LOCATION_A_P1
Expand Down Expand Up @@ -156,6 +183,27 @@ def create_message(device_id, measurements):
binary_data += struct.pack(">BBI", TYPE_GPS_LAT, LOCATION_GPS, round(value * 100000))
elif key == "gps_lon":
binary_data += struct.pack(">BBI", TYPE_GPS_LON, LOCATION_GPS, round(value * 100000))
elif key == "gps_dur":
binary_data += struct.pack(">BBI", TYPE_UPTIME, LOCATION_GPS, value)

elif key.endswith("_deviation"):
binary_data += struct.pack(">BBh", TYPE_DEVIATION, get_location_by_key(key), round(value * 100))
elif key.endswith("_radiation"):
binary_data += struct.pack(">BBH", TYPE_RADIATION, get_location_by_key(key), value)
elif key.endswith("_count"):
binary_data += struct.pack(">BBH", TYPE_COUNT, get_location_by_key(key), round(value * 10))
elif key.endswith("_height"):
binary_data += struct.pack(">BBH", TYPE_HEIGHT, get_location_by_key(key), round(value * 10))
elif key.endswith("_period"):
binary_data += struct.pack(">BBH", TYPE_PERIOD, get_location_by_key(key), round(value * 10))
elif key.endswith("_noise"):
binary_data += struct.pack(">BBH", TYPE_NOISE, get_location_by_key(key), round(value * 10))
elif key.endswith("_d"):
binary_data += struct.pack(">BBH", TYPE_DIRECTION_DEG, get_location_by_key(key), round(value * 10))
elif key.endswith("_direction"):
binary_data += struct.pack(">BBB", TYPE_DIRECTION_ID, get_location_by_key(key), value)
elif key.endswith("_speed"):
binary_data += struct.pack(">BBh", TYPE_SPEED, get_location_by_key(key), round(value * 100))

# explicit cases should be added here
# elif key == "new_explicit_value"
Expand All @@ -164,7 +212,7 @@ def create_message(device_id, measurements):
binary_data += struct.pack(">BBH", TYPE_LIGHT_LUX, get_location_by_key(key), value)
elif key.endswith("_temp"):
binary_data += struct.pack(">BBh", TYPE_TEMPERATURE_CEL, get_location_by_key(key), round(value * 100))
elif key.endswith("_humidity"):
elif key.endswith("_humidity") or key.endswith("_hum"):
binary_data += struct.pack(">BBH", TYPE_HUMIDITY, get_location_by_key(key), round(value * 100))
elif key.endswith("_co2"):
binary_data += struct.pack(">BBH", TYPE_CO2, get_location_by_key(key), round(value * 100))
Expand Down Expand Up @@ -213,23 +261,12 @@ def create_message(device_id, measurements):
binary_data += struct.pack(">BBH", TYPE_GENERIC | 0x01, get_location_by_key(key), value)

else:
keyParts = key.split("_")
index = 0
if len(keyParts) == 3:
try:
index = int(keyParts[2])
except Exception as e:
pass

if type(value) == str:
value = bytes(value, 'utf-8') # Or other appropriate encoding
binary_data += struct.pack("I%ds" % (len(value),), len(value), value)
elif type(value) == int:
binary_data += struct.pack(">BBi", TYPE_GENERIC | index, get_location_by_key(key), value)
elif type(value) == float:
binary_data += struct.pack(">BBi", TYPE_GENERIC | index, get_location_by_key(key), round(value * 100))
index = measurement_index if measurement_index is not None else 0

if type(value) == int or type(value) == float:
binary_data += struct.pack(">BBi", TYPE_GENERIC + index, get_location_by_key(key), round(value * 100))
else:
logging.error("Unidentified measurement: key: {}, value: {}, type: {}".format(key, value, type(value)))
logging.error("Unidentified or unaccepted measurement: key: {}, value: {}, type: {}".format(key, value, type(value)))
logging.info("message: size[{}], data:[{}]".format(len(binary_data), ubinascii.hexlify(binary_data).decode("utf-8")))
except Exception as e:
logging.exception(e, "Error encoding lora message")
Expand Down

0 comments on commit ab95778

Please sign in to comment.