-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathaquariumctl.py
124 lines (100 loc) · 3.96 KB
/
aquariumctl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3.7
import helper
# Use mock pins while running on macOS or Windows.
# NOTE(review): this presumably has to run before any gpiozero device is
# created further down, so the simulated pin setup is already in place -
# confirm against helper.checkSimulate().
helper.checkSimulate()
# Load the configuration data
import json
print("Loading & Initialising GPIO controls from config file...")
# The path is relative, so it is resolved against the current working
# directory of the process. JSON text is UTF-8 by specification, so decode it
# explicitly instead of relying on the platform's locale encoding (which is
# not UTF-8 on every OS this script can run on).
with open('config.json', encoding='utf-8') as f:
    data = json.load(f)
# Building all GPIOZero controls using configuration loaded from config file
import gpiozero
# Map each distinct port to one gpiozero output device. Several config entries
# may name the same port; only the first one creates the device, so its
# "activeHigh" value is the one that applies. Every output starts switched off.
controls = {}
for setting in data["controls"]:
    if setting["port"] in controls:
        continue
    controls[setting["port"]] = gpiozero.DigitalOutputDevice(
        setting["port"],
        active_high=setting["activeHigh"],
        initial_value=False,
    )
import time
from repeatedtimer import RepeatedTimer
# port -> {"name": <control name>, "status": <status computed from the schedule>}
controlStates = {}
# port -> status that is actually applied to the output; it is overwritten by
# the API's PUT handler and re-synchronised by resetForcedControlStates().
forcesControlStates = {}
def updateCurrentStates():
    """Recompute the scheduled status of every configured port.

    Several entries in ``data["controls"]`` may share a port. A port's status
    becomes truthy as soon as ``helper.isTimeBetween(startTime, endTime)`` is
    truthy for one of its entries; ports already present in ``controlStates``
    that match no entry end up with status ``False``. The stored name is the
    one from the last entry listed for the port.

    The new values are computed in scratch dicts and only then written to
    ``controlStates``, so a reader never sees a port that is scheduled ON
    reported as OFF halfway through an update.
    NOTE(review): this matters if RepeatedTimer callbacks and Flask handlers
    run on separate threads, which is presumably the case - confirm.
    """
    # Known ports default to "off" until one of their entries says otherwise.
    scheduled = {port: False for port in controlStates}
    names = {}
    for setting in data["controls"]:
        port = setting["port"]
        names[port] = setting["name"]
        # Short-circuit: once one entry matched, later entries for the same
        # port are not evaluated.
        scheduled[port] = scheduled.get(port, False) or helper.isTimeBetween(
            setting["startTime"], setting["endTime"]
        )

    # Commit the finished result, one assignment per port.
    for port, status in scheduled.items():
        entry = controlStates.get(port)
        if entry is None:
            # A port missing from controlStates can only come from the config
            # loop above, so its name is known.
            controlStates[port] = {"name": names[port], "status": status}
            continue
        if port in names:
            entry["name"] = names[port]
        entry["status"] = status
def resetForcedControlStates():
    """Overwrite every forced status with the port's scheduled status.

    After this call ``forcesControlStates[port]`` equals
    ``controlStates[port]["status"]`` for every port in ``controlStates``,
    which discards any value previously stored for those ports.
    """
    for port, entry in controlStates.items():
        forcesControlStates[port] = entry["status"]
# Prime both state tables once at start-up: first compute the scheduled status
# of every configured port, then copy those statuses into the forced table.
updateCurrentStates()
resetForcedControlStates()
def setControlStates():
    """Drive every GPIO output to its currently effective state.

    For each port in ``controlStates`` the matching device in ``controls`` is
    switched on or off according to ``forcesControlStates[port]``, and a log
    line naming the control is printed. Outputs are (re)written on every call,
    whether or not the state changed.

    Raises:
        KeyError: if a port in ``controlStates`` has no entry in ``controls``
            or ``forcesControlStates``.
    """
    for port, value in controlStates.items():
        control = controls[port]
        # The forced state always decides. The scheduled status only takes
        # effect once resetForcedControlStates() has copied it into
        # forcesControlStates. (The earlier "use the scheduled value when both
        # are equal, else the forced one" selection reduced to exactly this.)
        if forcesControlStates[port]:
            print("Switching ON the control " + value["name"])
            control.on()
        else:
            print("Switching OFF the control " + value["name"])
            control.off()
def checkControls():
    """Apply the effective states to all outputs, framed by log output.

    Prints a header line, delegates the switching to setControlStates() and
    prints a trailing blank separator.
    """
    print("\nResetting Controls...")
    setControlStates()
    print("\n")
def codeUpdate():
    """Run the helper module's update check, framed by log output.

    Prints a header line, calls helper.checkUpdates() and prints a trailing
    blank separator.
    """
    print("\nUpdating code...")
    helper.checkUpdates()
    print("\n")
print("\nstarting...\n")
# Schedule the periodic jobs.
# NOTE(review): the first RepeatedTimer argument is presumably the interval in
# seconds, and the timer presumably starts on construction - confirm in
# repeatedtimer.py.
updateTimer = RepeatedTimer(5, updateCurrentStates)  # recompute scheduled statuses
controlTimer = RepeatedTimer(5, checkControls)  # write effective states to the outputs
resetTimer = RepeatedTimer(60, resetForcedControlStates)  # replace forced states with scheduled ones
updateCodeTimer = RepeatedTimer(60 * 60 * 3, codeUpdate)  # 60 * 60 * 3 = 10800
import flask
from flask import request, jsonify, abort
# Flask application exposing the control API; its static folder is set to the
# front-end directory 'frontend/webapp/'.
app = flask.Flask(__name__, static_folder='frontend/webapp/')
# Keep Flask's debug mode switched off.
app.config["DEBUG"] = False
@app.route('/api/controls', methods=['GET'])
def get_control_status():
    """Return the name and effective status of every known port as JSON.

    The response maps each port in ``controlStates`` to
    ``{"name": ..., "status": ...}``.
    """
    report = {}
    for port, entry in controlStates.items():
        scheduled = entry["status"]
        forced = forcesControlStates[port]
        # Report the scheduled value when it agrees with the forced one,
        # otherwise the forced one. Either way the reported value compares
        # equal to the forced state.
        effective = scheduled if scheduled == forced else forced
        report[port] = {"name": entry["name"], "status": effective}
    return jsonify(report)
@app.route('/api/controls/<int:port>', methods=['PUT'])
def update_task(port):
    """Force a control on or off through the API.

    Expects a JSON object body of the form ``{"status": true}`` or
    ``{"status": false}``. The value is stored in ``forcesControlStates`` and
    stays in effect until resetForcedControlStates() overwrites it.

    Args:
        port: Port number taken from the URL.

    Returns:
        JSON ``{"done": true, "obj": <stored status>}``.

    Aborts with:
        404 if the port is not a known control.
        400 if the body is missing, is not a JSON object, or does not carry a
            boolean ``status`` (previously such requests failed with a server
            error or with 401, which means "unauthenticated").
    """
    if port not in forcesControlStates:
        abort(404)
    # silent=True yields None instead of raising when the body is absent or
    # is not valid JSON, so every malformed request ends in the same 400.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or not isinstance(payload.get('status'), bool):
        abort(400)
    forcesControlStates[port] = payload['status']
    return jsonify({'done': True, 'obj': forcesControlStates[port]})
# Serve the API on all network interfaces, port 5000. The timers are stopped
# once app.run() is over.
# NOTE(review): the API has no authentication and is reachable from every
# interface - confirm this is intended for the deployment network.
try:
    app.run(host='0.0.0.0', port=5000, debug=False)
finally:
    # Stop timers even when the server fails to start or exits with an error,
    # so no periodic job is left running behind a dead server.
    updateTimer.stop()
    controlTimer.stop()
    resetTimer.stop()
    updateCodeTimer.stop()