-
Notifications
You must be signed in to change notification settings - Fork 0
/
connecteddrive_influx.py
executable file
·89 lines (75 loc) · 2.47 KB
/
connecteddrive_influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
import os
import pprint
import sys
from typing import Any, Callable, Dict
import bimmer_connected.account
import bimmer_connected.country_selector
import bimmer_connected.state
import bimmer_connected.vehicle
import influxdb
import localsettings as settings
def convert_attributes(attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a vehicle attribute mapping into a single-level dict.

    If *attributes* contains a ``'STATUS'`` key, the mapping stored under
    that key is used in place of *attributes*. The entries of the
    ``"position"`` sub-mapping are merged into the top level of the result,
    except for its ``'status'`` entry; every other key is copied unchanged.

    Args:
        attributes: Attribute mapping, optionally wrapped in a ``'STATUS'``
            key.

    Returns:
        A new dict; the input mapping is not modified.
    """
    if 'STATUS' in attributes:
        attributes = attributes['STATUS']

    converted: Dict[str, Any] = {}
    for key, value in attributes.items():
        if key == "position":
            # Merge through a dict rather than a set of (key, value) tuples:
            # a set requires every value to be hashable and would raise
            # TypeError on e.g. a list-valued position entry.
            converted.update({
                name: entry
                for name, entry in value.items()
                if name != 'status'
            })
        else:
            converted[key] = value
    return converted
def history_attr(attr_name: str) -> bool:
    """Return True when *attr_name* is 'mileage', 'lat' or 'lon'."""
    return attr_name in ('mileage', 'lat', 'lon')
def status_attr(attr_name: str) -> bool:
    """Return True when *attr_name* passes the status filter.

    A name passes when ``history_attr`` accepts it, when it starts with one
    of 'door', 'window', 'hood', 'trunk', 'remaining', 'max' or 'charging',
    or when it ends with 'roof' or 'Status'.
    """
    if history_attr(attr_name):
        return True

    prefixes = ('door', 'window', 'hood', 'trunk',
                'remaining', 'max', 'charging')
    suffixes = ('roof', 'Status')
    return attr_name.startswith(prefixes) or attr_name.endswith(suffixes)
def write_to_influx(client: influxdb.InfluxDBClient,
                    filter: Callable[[str], bool],
                    vehicle: bimmer_connected.vehicle.ConnectedDriveVehicle) -> None:
    """Write one 'status' point for *vehicle* through *client*.

    The vehicle's state attributes are flattened with ``convert_attributes``
    and only those whose name is accepted by *filter* become fields of the
    point. The point is tagged with the vehicle's ``vin`` and ``name`` and
    timestamped with ``vehicle.state.updateTime``.

    Args:
        client: Client whose ``write_points`` method receives the point.
        filter: Predicate called with each attribute name; attributes for
            which it returns a truthy value are kept.
        vehicle: Vehicle whose state is written.
    """
    flattened = convert_attributes(vehicle.state.attributes)

    selected = {}
    for attr_name, attr_value in flattened.items():
        if filter(attr_name):
            selected[attr_name] = attr_value

    tags = {'vin': vehicle.vin, 'name': vehicle.name}
    point = {
        'measurement': 'status',
        'tags': tags,
        'time': vehicle.state.updateTime,
        'fields': selected,
    }
    client.write_points([point], time_precision='s')
# Build the account object and the InfluxDB client from the values in
# localsettings.
region = bimmer_connected.country_selector.get_region_from_name(settings.connected_region)
account = bimmer_connected.account.ConnectedDriveAccount(
    settings.connected_username,
    settings.connected_password,
    region
)
influx = influxdb.InfluxDBClient(host=settings.influxdb_hostname, database="connecteddrive")

# NOTE(review): the nesting below was reconstructed; confirm that the image
# download and the InfluxDB write are meant to run for every vehicle
# regardless of whether stdout is a terminal.
for vehicle in account.vehicles:
    vehicle.state.update_data()

    # Only dump the raw attributes when stdout is a terminal.
    if sys.stdout.isatty():
        print(vehicle.name)
        pprint.pprint(vehicle.attributes)
        pprint.pprint(vehicle.state.attributes)

    # Save one image per view direction, skipping files that already exist.
    for direction in bimmer_connected.vehicle.VehicleViewDirection:
        filename = f"{vehicle.vin}_{direction.name}.png"
        if os.path.exists(filename):
            continue
        # Fetch the image before opening the file: if the download raises,
        # no empty file is left behind to be skipped by the existence check
        # on every later run.
        image = vehicle.get_vehicle_image(2000, 2000, direction)
        with open(filename, 'wb') as output:
            output.write(image)

    write_to_influx(influx, status_attr, vehicle)