Skip to content

Commit

Permalink
Refactor functionality to collect and parse
Browse files Browse the repository at this point in the history
  • Loading branch information
ttu committed Jan 26, 2025
1 parent 5264b90 commit 2ab5b17
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 65 deletions.
131 changes: 67 additions & 64 deletions ruuvitag_sensor/adapters/bleak_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,71 +153,11 @@ async def get_history_data(

# Get the history service
# https://docs.ruuvi.com/communication/bluetooth-connection/nordic-uart-service-nus
history_service = next(
(service for service in client.services if service.uuid.lower() == RUUVI_HISTORY_SERVICE_UUID.lower()),
None,
)
if not history_service:
raise RuntimeError(f"History service not found - device {mac} may not support history")

# Get characteristics
tx_char = history_service.get_characteristic(RUUVI_HISTORY_TX_CHAR_UUID)
rx_char = history_service.get_characteristic(RUUVI_HISTORY_RX_CHAR_UUID)

if not tx_char or not rx_char:
raise RuntimeError("Required characteristics not found")

# Set up notification handler
notification_received = asyncio.Event()

def notification_handler(_, data: bytearray):
history_data.append(data)
notification_received.set()

# Enable notifications
await client.start_notify(tx_char, notification_handler)

# Request history data
command = bytearray([0x26]) # Get logged history command
if start_time:
timestamp = int(start_time.timestamp())
command.extend(struct.pack("<I", timestamp))

await client.write_gatt_char(rx_char, command)
log.debug("Requested history data from device %s", mac)

# Wait for initial notification
await asyncio.wait_for(notification_received.wait(), timeout=10.0)

# Wait for more data
try:
while True:
notification_received.clear()
await asyncio.wait_for(notification_received.wait(), timeout=5.0)
# Check if we've reached the maximum number of items
if max_items and len(history_data) >= max_items:
log.debug("Reached maximum number of items (%d)", max_items)
break
except asyncio.TimeoutError:
# No more data received for 1 second - assume transfer complete
pass

# Parse collected data
parsed_data = []
for data_point in history_data:
if len(data_point) < 10: # Minimum valid data length
continue

timestamp = struct.unpack("<I", data_point[0:4])[0]
measurement = self._parse_history_data(data_point[4:])
if measurement:
measurement["timestamp"] = datetime.fromtimestamp(timestamp)
parsed_data.append(measurement)

log.info("Downloaded %d history entries from device %s", len(parsed_data), mac)
history_data = await self._collect_history_data(client, start_time, max_items)
parsed_data = self._parse_history_entries(history_data)
return parsed_data
except Exception as e:
log.error(f"Failed to get history data from device {mac}: {e}")
log.error("Failed to get history data from device %s: %s", mac, e)
finally:
if client:
await client.disconnect()
Expand All @@ -241,6 +181,69 @@ async def _connect_gatt(self, mac: str) -> BleakClient:
await client.connect()
return client

async def _collect_history_data(
self, client: BleakClient, start_time: Optional[datetime], max_items: Optional[int]
) -> List[bytearray]:
history_data: List[bytearray] = []

history_service = next(
(service for service in client.services if service.uuid.lower() == RUUVI_HISTORY_SERVICE_UUID.lower()),
None,
)
if not history_service:
raise RuntimeError("History service not found - device may not support history")

tx_char = history_service.get_characteristic(RUUVI_HISTORY_TX_CHAR_UUID)
rx_char = history_service.get_characteristic(RUUVI_HISTORY_RX_CHAR_UUID)

if not tx_char or not rx_char:
raise RuntimeError("Required characteristics not found")

notification_received = asyncio.Event()

def notification_handler(_, data: bytearray):
history_data.append(data)
notification_received.set()

await client.start_notify(tx_char, notification_handler)

command = bytearray([0x26])
if start_time:
timestamp = int(start_time.timestamp())
command.extend(struct.pack("<I", timestamp))

await client.write_gatt_char(rx_char, command)
log.debug("Sent history command to device")

await asyncio.wait_for(notification_received.wait(), timeout=10.0)

try:
while True:
notification_received.clear()
await asyncio.wait_for(notification_received.wait(), timeout=5.0)
if max_items and len(history_data) >= max_items:
log.debug("Reached maximum number of items (%d)", max_items)
break
except asyncio.TimeoutError:
pass

return history_data

def _parse_history_entries(self, history_data: List[bytearray]) -> List[dict]:
parsed_data = []
for data_point in history_data:
if len(data_point) < 10:
continue

timestamp = struct.unpack("<I", data_point[0:4])[0]
measurement = self._parse_history_data(data_point[4:])
if measurement:
measurement["timestamp"] = datetime.fromtimestamp(timestamp)
parsed_data.append(measurement)

log.info("Downloaded %d history entries", len(parsed_data))
return parsed_data

def _parse_history_data(self, data: bytes) -> Optional[dict]:
"""
Parse history data point from RuuviTag
Expand All @@ -263,5 +266,5 @@ def _parse_history_data(self, data: bytes) -> Optional[dict]:
"data_format": 5, # History data uses similar format to data format 5
}
except Exception as e:
log.error(f"Failed to parse history data: {e}")
log.error("Failed to parse history data: %s", e)
return None
2 changes: 1 addition & 1 deletion ruuvitag_sensor/ruuvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,4 @@ async def download_history(
except asyncio.TimeoutError:
raise TimeoutError(f"History download timed out after {timeout} seconds")
except Exception as e:
raise RuntimeError(f"Failed to download history: {str(e)}") from e
raise RuntimeError(f"Failed to download history: {e!s}") from e

0 comments on commit 2ab5b17

Please sign in to comment.