-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
executable file
·159 lines (137 loc) · 5.13 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python
from influxdb import InfluxDBClient
import datetime
# Home Assistant service that queries InfluxDB and places the response points into an entity
# as attributes in the form of 'key: value' pairs.
# https://hacs-pyscript.readthedocs.io/en/latest/reference.html?highlight=service#service-service-name
#
# https://www.home-assistant.io/docs/blueprint/selectors/
@service
def influxdb_query_to_entity(database=None, query=None, key_field_name='time', value_field_name='sum', entity_id=None, unit_of_measurement=None, friendly_name=None, icon=None):
    """yaml
name: InfluxDB query to entity
description: Home Assistant service to send query InfluxDB and place response points into an entity as attributes in the form of key and value pairs.
fields:
  database:
    name: Database
    description: InfluxDB database name
    example: octopus
    required: true
    selector:
      text:
  query:
    name: Query
    description: InfluxDB query. The query should return at least the two fields specified by key_field_name and value_field_name. The field `time` is always returned so typically `query` will only specify one field
    example: SELECT sum("consumption") FROM "electricity" WHERE time >= now() - 30d GROUP BY time(1d) fill(none)
    required: true
    selector:
      text:
  key_field_name:
    name: Key field name
    description: Name of the field returned by the query that will be used as the attribute key
    example: time
    default: time
    required: false
    selector:
      text:
  value_field_name:
    name: Value field name
    description: Name of the field returned by the query that will be used as the attribute value
    example: sum
    default: sum
    required: false
    selector:
      text:
  entity_id:
    name: Entity ID
    description: Entity in Home Assistant to create or update
    example: sensor.octopus_electricity_consumption_30days
    required: true
    selector:
      entity:
  unit_of_measurement:
    name: Unit of measurement
    description: If specified, add the entity attribute unit_of_measurement with the value
    example: kWh
    required: false
    selector:
      text:
  friendly_name:
    name: Friendly name
    description: If specified, add the entity attribute friendly_name with the value
    example: Import
    required: false
    selector:
      text:
  icon:
    name: Icon
    description: If specified, add the entity attribute icon with the value
    example: mdi:flash
    required: false
    selector:
      text:
"""
    # NOTE: the docstring above is the Pyscript "yaml" service description, not
    # free-form prose. It is data read at load time, so keep it valid YAML.
    #
    # Summary of what this service does:
    #   1. Validates that database, query and entity_id were supplied; logs an
    #      error and returns without doing anything if one is missing.
    #   2. Runs `query` against `database`, using connection settings read
    #      through get_config().
    #   3. Stores every returned point as an attribute
    #      point[key_field_name] -> point[value_field_name] on `entity_id`,
    #      together with the optional unit_of_measurement / friendly_name /
    #      icon attributes.
    #   4. Sets the entity state to the `time` field of the last point.
    # If the query yields no points the entity is left untouched.
    log.debug('received parameters: ' + str(locals()))

    # Guard clauses for the required service parameters.
    if database is None:
        log.error('"database" is required but not passed on service call to influxdb_query')
        return
    if query is None:
        log.error('"query" is required but not passed on service call to influxdb_query')
        return
    if entity_id is None:
        log.error('"entity_id" is required but not passed on service call to influxdb_query')
        return

    # Connect to InfluxDB
    # https://influxdb-python.readthedocs.io/en/latest/api-documentation.html
    influxdbclient = InfluxDBClient(
        host=get_config('host'),
        port=get_config('port'),
        username=get_config('username'),
        password=get_config('password'),
        database=database
    )

    # Call InfluxDB to execute query
    # Use Pyscript task.executor to avoid I/O blocking
    # https://hacs-pyscript.readthedocs.io/en/latest/reference.html?highlight=task.executor#task-executor
    try:
        response = task.executor(influxdbclient.query, query)
    except Exception:
        # Log the call context, then let the error propagate to the caller.
        log.error('exception when processing parameters: ' + str(locals()))
        raise
    finally:
        # The response is only read after this point, so the HTTP session
        # can be released whether or not the query succeeded.
        influxdbclient.close()
    log.info('query result: ' + str(response))

    attributes = {}
    # Set the entity_id attributes if they were passed on service call
    if unit_of_measurement:
        attributes['unit_of_measurement'] = unit_of_measurement
    if friendly_name:
        attributes['friendly_name'] = friendly_name
    if icon:
        attributes['icon'] = icon

    # Add each InfluxDB query result point as an entity_id attribute and
    # remember the timestamp of the most recent one. Starting from None keeps
    # the name bound when the query returns no points at all.
    last_point_time = None
    for point in response.get_points():
        attributes[point[key_field_name]] = point[value_field_name]
        last_point_time = point['time']

    # Only create entity if the query returns at least 1 result
    if not last_point_time:
        log.warning('query returned no points, entity not updated: ' + str(entity_id))
        return

    # Entity state is the time of the last point; the points themselves are
    # exposed through the attributes.
    state.set(entity_id, value=last_point_time, new_attributes=attributes)
# Get configuration from Pyscript
# https://hacs-pyscript.readthedocs.io/en/latest/reference.html?highlight=load#configuration
def get_config(name):
    """Return the Pyscript application configuration entry called ``name``.

    The value is read from ``pyscript.app_config``. When the lookup yields
    ``None`` an error is logged and ``None`` is returned, so callers always
    get the lookup result back unchanged.
    """
    setting = pyscript.app_config.get(name)
    if setting is not None:
        return setting
    # Missing entry: report it, but still hand the (None) result back.
    log.error('"' + name + '" is required parameter but not defined in Pyscript configuration for application')
    return setting
# Pyscript startup and app reload
# https://hacs-pyscript.readthedocs.io/en/latest/reference.html?highlight=load#time-trigger
@time_trigger('startup')
def load():
    """Announce start-up and check the required configuration entries.

    Registered with the 'startup' time trigger. Each required setting is
    looked up through get_config(), which logs an error for any that is
    not defined; the returned values are not used here.
    """
    log.info('app has started')
    # Check required configuration
    for required_name in ('host', 'port', 'username', 'password'):
        get_config(required_name)