Skip to content

Commit

Permalink
feat: download history
Browse files Browse the repository at this point in the history
  • Loading branch information
ttu committed Nov 27, 2024
1 parent ccd71b9 commit 5e7de41
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 2 deletions.
18 changes: 18 additions & 0 deletions examples/download_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio

import ruuvitag_sensor.log
from ruuvitag_sensor.ruuvi import RuuviTagSensor

ruuvitag_sensor.log.enable_console()


async def main():
# On macOS, the device address is not a MAC address, but a system specific ID
# mac = "CA:F7:44:DE:EB:E1"
mac = "873A13F5-ED14-AEE1-E446-6ACF31649A1D"
data = await RuuviTagSensor.download_history(mac)
print(data)


if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
143 changes: 141 additions & 2 deletions ruuvitag_sensor/adapters/bleak_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@
import logging
import os
import re
import struct
import sys
from typing import AsyncGenerator, List, Tuple
from datetime import datetime
from typing import AsyncGenerator, List, Optional, Tuple

from bleak import BleakScanner
from bleak import BleakClient, BleakScanner
from bleak.backends.scanner import AdvertisementData, AdvertisementDataCallback, BLEDevice

from ruuvitag_sensor.adapters import BleCommunicationAsync
from ruuvitag_sensor.adapters.utils import rssi_to_hex
from ruuvitag_sensor.ruuvi_types import MacAndRawData, RawData

MAC_REGEX = "[0-9a-f]{2}([:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$"
RUUVI_HISTORY_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
RUUVI_HISTORY_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
RUUVI_HISTORY_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"


def _get_scanner(detection_callback: AdvertisementDataCallback, bt_device: str = ""):
Expand Down Expand Up @@ -121,3 +126,137 @@ async def get_first_data(mac: str, bt_device: str = "") -> RawData:
await data_iter.aclose()

return data or ""

async def get_history_data(self, mac: str, start_time: Optional[datetime] = None) -> List[dict]:
"""
Get history data from a RuuviTag using GATT connection.
Args:
mac (str): MAC address of the RuuviTag
start_time (datetime, optional): Start time for history data
Returns:
List[dict]: List of historical sensor readings
Raises:
RuntimeError: If connection fails or required services not found
"""
history_data: List[bytearray] = []
client = None

try:
# Connect to device
client = await self._connect_gatt(mac)
log.debug("Connected to device %s", mac)

# Get the history service
# https://docs.ruuvi.com/communication/bluetooth-connection/nordic-uart-service-nus
services = await client.get_services()
history_service = next(
(service for service in services if service.uuid.lower() == RUUVI_HISTORY_SERVICE_UUID.lower()),
None,
)
if not history_service:
raise RuntimeError(f"History service not found - device {mac} may not support history")

# Get characteristics
tx_char = history_service.get_characteristic(RUUVI_HISTORY_TX_CHAR_UUID)
rx_char = history_service.get_characteristic(RUUVI_HISTORY_RX_CHAR_UUID)

if not tx_char or not rx_char:
raise RuntimeError("Required characteristics not found")

# Set up notification handler
notification_received = asyncio.Event()

def notification_handler(_, data: bytearray):
history_data.append(data)
notification_received.set()

# Enable notifications
await client.start_notify(tx_char, notification_handler)

# Request history data
command = bytearray([0x26]) # Get logged history command
if start_time:
timestamp = int(start_time.timestamp())
command.extend(struct.pack("<I", timestamp))

await client.write_gatt_char(rx_char, command)
log.debug("Requested history data from device %s", mac)

# Wait for initial notification
await asyncio.wait_for(notification_received.wait(), timeout=5.0)

# Wait for more data
try:
while True:
notification_received.clear()
await asyncio.wait_for(notification_received.wait(), timeout=1.0)
except asyncio.TimeoutError:
# No more data received for 1 second - assume transfer complete
pass

# Parse collected data
parsed_data = []
for data_point in history_data:
if len(data_point) < 10: # Minimum valid data length
continue

timestamp = struct.unpack("<I", data_point[0:4])[0]
measurement = self._parse_history_data(data_point[4:])
if measurement:
measurement["timestamp"] = datetime.fromtimestamp(timestamp)
parsed_data.append(measurement)

log.info("Downloaded %d history entries from device %s", len(parsed_data), mac)
return parsed_data
except Exception as e:
log.error(f"Failed to get history data from device {mac}: {e}")
finally:
if client:
await client.disconnect()
log.debug("Disconnected from device %s", mac)
return []

async def _connect_gatt(self, mac: str) -> BleakClient:
"""
Connect to a BLE device using GATT.
NOTE: On macOS, the device address is not a MAC address, but a system specific ID
Args:
mac (str): MAC address of the device to connect to
Returns:
BleakClient: Connected BLE client
"""
client = BleakClient(mac)
# TODO: Implement retry logic. connect fails for some reason pretty often.
await client.connect()
return client

def _parse_history_data(self, data: bytes) -> Optional[dict]:
"""
Parse history data point from RuuviTag
Args:
data (bytes): Raw history data point
Returns:
Optional[dict]: Parsed sensor data or None if parsing fails
"""
try:
temperature = struct.unpack("<h", data[0:2])[0] * 0.005
humidity = struct.unpack("<H", data[2:4])[0] * 0.0025
pressure = struct.unpack("<H", data[4:6])[0] + 50000

return {
"temperature": temperature,
"humidity": humidity,
"pressure": pressure,
"data_format": 5, # History data uses similar format to data format 5
}
except Exception as e:
log.error(f"Failed to parse history data: {e}")
return None
53 changes: 53 additions & 0 deletions ruuvitag_sensor/ruuvi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
import time
from datetime import datetime
from multiprocessing import Manager
from multiprocessing.managers import ListProxy
from typing import AsyncGenerator, Callable, Dict, Generator, List, Optional
Expand Down Expand Up @@ -340,3 +342,54 @@ def _parse_data(
return None

return (mac_to_send, decoded)

@staticmethod
async def get_history_async(mac: str, start_time: Optional[datetime] = None) -> List[SensorData]:
"""
Get history data from a RuuviTag that supports it (firmware 3.30.0+)
Args:
mac (str): MAC address of the RuuviTag
start_time (datetime, optional): Start time for history data. If None, gets all available data
Returns:
List[SensorData]: List of historical sensor readings
"""
throw_if_not_async_adapter(ble)
return await ble.get_history_data(mac, start_time)

@staticmethod
async def download_history(mac: str, start_time: Optional[datetime] = None, timeout: int = 300) -> List[SensorData]:
"""
Download history data from a RuuviTag. Requires firmware version 3.30.0 or newer.
Args:
mac (str): MAC address of the RuuviTag. On macOS use UUID instead.
start_time (Optional[datetime]): If provided, only get data from this time onwards
timeout (int): Maximum time in seconds to wait for history download (default: 30)
Returns:
List[SensorData]: List of historical measurements, ordered by timestamp
Raises:
RuntimeError: If connection fails or device doesn't support history
TimeoutError: If download takes longer than timeout
"""
throw_if_not_async_adapter(ble)

# if not re.match("[0-9A-F]{2}(:[0-9A-F]{2}){5}$", mac.upper()):
# raise ValueError(f"Invalid MAC address: {mac}")

try:
history = await asyncio.wait_for(ble.get_history_data(mac, start_time), timeout=timeout)

# Sort by timestamp if present
if history and "timestamp" in history[0]:
history.sort(key=lambda x: x["timestamp"])

return history

except asyncio.TimeoutError:
raise TimeoutError(f"History download timed out after {timeout} seconds")
except Exception as e:
raise RuntimeError(f"Failed to download history: {str(e)}") from e

0 comments on commit 5e7de41

Please sign in to comment.