-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathlunarsensor.py
74 lines (50 loc) · 1.62 KB
/
lunarsensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
import json
import logging
import os
import time

import aiohttp
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
# ASGI application object that the route and lifecycle decorators attach to.
app = FastAPI()

logging.basicConfig()
log = logging.getLogger("lunarsensor")
# Set the level through the public setLevel() API rather than assigning the
# ``level`` attribute directly. SENSOR_DEBUG=1 enables debug output.
log.setLevel(logging.DEBUG if os.getenv("SENSOR_DEBUG") == "1" else logging.INFO)

# Seconds to wait between consecutive readings pushed on the event stream.
POLLING_SECONDS = 2
# Shared aiohttp session; assigned by the startup handler.
CLIENT = None
# Most recently reported lux value; 400 until a reading replaces it.
last_lux = 400
@app.on_event("startup")
async def startup_event():
    """Create the module-wide aiohttp session when the application starts.

    The session is stored in the global ``CLIENT`` with a total timeout of
    8 seconds and is entered as an async context manager.
    """
    global CLIENT

    request_timeout = aiohttp.ClientTimeout(total=8)
    CLIENT = aiohttp.ClientSession(timeout=request_timeout)
    await CLIENT.__aenter__()
@app.on_event("shutdown")
async def shutdown() -> None:
    """Exit the shared aiohttp session when the application stops."""
    session = CLIENT
    # No exception is being propagated, hence the three ``None`` arguments.
    await session.__aexit__(None, None, None)
async def make_lux_response():
    """Build the ambient-light payload, refreshing the cached value first.

    A fresh reading is requested from ``read_lux()``. If that call raises, the
    exception is logged and the previously stored ``last_lux`` is reported
    instead. A reading of ``None`` or one equal to the stored value leaves
    ``last_lux`` untouched.

    Returns:
        A dict with the keys ``id``, ``state`` and ``value`` describing the
        current ``last_lux``.
    """
    global last_lux

    try:
        reading = await read_lux()
    except Exception as exc:
        log.exception(exc)
        # Treat a failed read like "no reading" so the stored value is kept.
        reading = None

    changed = reading is not None and reading != last_lux
    if changed:
        log.debug(f"Sending {reading} lux")
        last_lux = reading

    payload = {
        "id": "sensor-ambient_light",
        "state": f"{last_lux} lx",
        "value": last_lux,
    }
    return payload
async def sensor_reader(request):
    """Yield ambient-light ``state`` events until the client disconnects.

    Args:
        request: The incoming request; its ``is_disconnected()`` coroutine is
            awaited before every reading to decide whether to keep streaming.

    Yields:
        A dict with ``event`` set to ``"state"`` and ``data`` holding the
        JSON-encoded payload returned by ``make_lux_response()``.
    """
    while not await request.is_disconnected():
        yield {"event": "state", "data": json.dumps(await make_lux_response())}
        # Sleep cooperatively: a blocking time.sleep() here would stall the
        # event loop, freezing every other request for the polling interval.
        await asyncio.sleep(POLLING_SECONDS)
@app.get("/sensor/ambient_light")
async def sensor():
    """Serve the current ambient-light payload as a single response."""
    payload = await make_lux_response()
    return payload
@app.get("/events")
async def events(request: Request):
    """Open an event stream fed by ``sensor_reader`` for this request."""
    return EventSourceResponse(sensor_reader(request))
# Do the sensor reading logic below
async def read_lux():
    """Return the current ambient-light reading.

    The value is read from ``/tmp/lux``. A missing file or a file containing
    only whitespace yields the default of 400.0.

    Returns:
        The file's content parsed as a float, or 400.0 when no reading is
        available.

    Raises:
        ValueError: If the file holds text that cannot be parsed as a float.
    """
    try:
        # Open directly instead of checking os.path.exists() first, so a file
        # removed between the check and the open falls back to the default
        # rather than raising.
        with open("/tmp/lux") as f:
            raw = f.read().strip()
    except FileNotFoundError:
        return 400.0

    return float(raw) if raw else 400.0