-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathacmod.py
124 lines (108 loc) · 5.12 KB
/
acmod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import sys
from urllib.request import urlopen
import json
import time
from calendar import timegm
from datetime import datetime
import requests
import logging
from tuya_connector import TuyaOpenAPI, TUYA_LOGGER
import acpriv
# Default smart-mode target temperatures. ac_set_smartmode() substitutes
# these when it is called with i_tgt_temp == 0: tgt_temp_cool for mode
# "cool", tgt_temp_heat for mode "heat".
# NOTE(review): units are presumably degrees Celsius -- confirm.
tgt_temp_cool=19
tgt_temp_heat=26
def ac_get_data():
    """Fetch the device status and attach the latest AC state change to it.

    Two GET requests are issued:

    1. ``acpriv.ss_url_v2_base`` with the fields
       ``measurements,smartMode,acState,lastStateChange``.
    2. ``acpriv.ss_url_v2_base + "/acStates"`` for the list of AC state
       changes.

    Returns:
        The decoded JSON of the first response. When the ``/acStates``
        result list is non-empty, its first entry is stored under
        ``result['lastACStateChange']``; when the list is empty the key is
        left unset instead of failing with ``IndexError``.

    Raises:
        RuntimeError: if either response's ``status`` is not ``"success"``.
    """
    # get main result array
    req_params = {
        "apiKey": acpriv.ss_key,
        "fields": "measurements,smartMode,acState,lastStateChange",
    }
    ret_js = requests.get(acpriv.ss_url_v2_base, params=req_params)
    ret_arr = json.loads(ret_js.text)
    if ret_arr['status'] != "success":
        raise RuntimeError("return is not success")

    # get ac state change 0, insert into result array
    req_params = {"apiKey": acpriv.ss_key}
    ret_js = requests.get(acpriv.ss_url_v2_base + "/acStates", params=req_params)
    ret_arr2 = json.loads(ret_js.text)
    if ret_arr2['status'] != "success":
        raise RuntimeError("return is not success")

    state_changes = ret_arr2['result']
    if state_changes:
        last_change = state_changes[0]

        # Hack: change to sensibo api resulted in updates of smart mode temp
        # to be registered as events with the same reason ("Trigger") as the
        # actual trigger events. However, they differ in that the
        # "changedProperties" array is empty. As a way to notify the caller,
        # we append the string "Change" to the reason when "changedProperties"
        # is empty, to prevent code expecting "trigger" to mean actual smart
        # mode operation from running.
        if len(last_change['changedProperties']) == 0:
            last_change['reason'] += 'Change'

        # patch last state change 0 into the main result array; done only
        # when an entry exists so an empty history cannot raise IndexError
        ret_arr['result']['lastACStateChange'] = last_change

    return ret_arr
def ac_set_state(b_on, s_mode, s_temp, s_fan):
    """Set AC run state and mode.

    POSTs an ``acState`` object to ``acpriv.ss_url_v2_base + "/acStates"``.

    Args:
        b_on: value sent as ``on``.
        s_mode: value sent as ``mode``.
        s_temp: value sent as ``targetTemperature``.
        s_fan: value sent as ``fanLevel``.

    Raises:
        RuntimeError: if the response ``status`` is not ``"success"``.
    """
    query = {"apiKey": acpriv.ss_key}
    desired_state = {
        "on": b_on,
        "mode": s_mode,
        "targetTemperature": s_temp,
        "fanLevel": s_fan,
    }
    response = requests.post(
        acpriv.ss_url_v2_base + "/acStates",
        params=query,
        json={"acState": desired_state},
    )
    reply = json.loads(response.text)
    if reply['status'] == "success":
        return
    raise RuntimeError("return is not success")
def ac_set_state_after(b_on, s_mode, s_temp, s_fan, i_delay):
    """Request an AC state to be applied after a delay.

    PUTs a timer object to ``acpriv.ss_url_v1_base + "/timer"`` (note: the
    v1 base URL, unlike the other calls in this module).

    Args:
        b_on: value sent as ``acState.on``.
        s_mode: value sent as ``acState.mode``.
        s_temp: value sent as ``acState.targetTemperature``.
        s_fan: value sent as ``acState.fanLevel``.
        i_delay: value sent as ``minutesFromNow``.

    Raises:
        RuntimeError: if the response ``status`` is not ``"success"``.
    """
    query = {"apiKey": acpriv.ss_key}
    desired_state = {
        "on": b_on,
        "mode": s_mode,
        "targetTemperature": s_temp,
        "fanLevel": s_fan,
    }
    timer_body = {"minutesFromNow": i_delay, "acState": desired_state}
    response = requests.put(
        acpriv.ss_url_v1_base + "/timer",
        params=query,
        json=timer_body,
    )
    reply = json.loads(response.text)
    if reply['status'] == "success":
        return
    raise RuntimeError("return is not success")
def ac_patch_state(b_on):
    """Given AC run state, flip it to the opposite.

    PATCHes ``acpriv.ss_url_v2_base + "/acStates/on"`` declaring the current
    ``on`` value as ``b_on`` and the new value as ``not b_on``, with reason
    ``"StateCorrectionByUser"``.

    Args:
        b_on: the run state the AC is currently recorded as having.

    Raises:
        RuntimeError: if the response ``status`` is not ``"success"``.
    """
    query = {"apiKey": acpriv.ss_key}
    correction = {
        "currentAcState": {"on": b_on},
        "newValue": not b_on,
        "reason": "StateCorrectionByUser",
    }
    response = requests.patch(
        acpriv.ss_url_v2_base + "/acStates/on",
        params=query,
        json=correction,
    )
    reply = json.loads(response.text)
    if reply['status'] == "success":
        return
    raise RuntimeError("return is not success")
def ac_set_smartmode(b_on, s_mode, i_lo_temp, i_hi_temp, i_tgt_temp = 0):
    """Configure the smart-mode thresholds and the states they trigger.

    POSTs the configuration to ``acpriv.ss_url_v2_base + "/smartmode"``.

    * ``"cool"``: the low threshold state is ``on: False`` and the high
      threshold state turns the AC on in cool mode.
    * ``"heat"``: the low threshold state turns the AC on in heat mode and
      the high threshold state is ``on: False``.

    The "on" state always uses fan level ``"auto"``.

    Args:
        b_on: value sent as ``enabled``.
        s_mode: ``"cool"`` or ``"heat"``.
        i_lo_temp: value sent as ``lowTemperatureThreshold``.
        i_hi_temp: value sent as ``highTemperatureThreshold``.
        i_tgt_temp: target temperature for the "on" state; ``0`` selects the
            module default (``tgt_temp_cool`` or ``tgt_temp_heat``).

    Raises:
        ValueError: if ``s_mode`` is neither ``"cool"`` nor ``"heat"``.
        RuntimeError: if the response ``status`` is not ``"success"``.
    """
    query = {"apiKey": acpriv.ss_key}

    cooling = s_mode == "cool"
    if not cooling and s_mode != "heat":
        raise ValueError("Unsupported mode " + str(s_mode))

    # A zero target means "use the module default for this mode".
    if i_tgt_temp == 0:
        i_tgt_temp = tgt_temp_cool if cooling else tgt_temp_heat

    running = {
        "on": True,
        "mode": "cool" if cooling else "heat",
        "fanLevel": "auto",
        "targetTemperature": i_tgt_temp,
    }
    stopped = {"on": False}

    # Cooling runs above the high threshold; heating runs below the low one.
    if cooling:
        low_state, high_state = stopped, running
    else:
        low_state, high_state = running, stopped

    payload = {
        "enabled": b_on,
        "lowTemperatureThreshold": i_lo_temp,
        "lowTemperatureState": low_state,
        "highTemperatureThreshold": i_hi_temp,
        "highTemperatureState": high_state,
    }
    response = requests.post(
        acpriv.ss_url_v2_base + "/smartmode",
        params=query,
        json=payload,
    )
    reply = json.loads(response.text)
    if reply['status'] == "success":
        return
    raise RuntimeError("return is not success")
def ac_ctl_smartmode(b_on):
    """Enable or disable smart mode without changing its configuration.

    PUTs ``{"enabled": b_on}`` to ``acpriv.ss_url_v2_base + "/smartmode"``.

    Args:
        b_on: value sent as ``enabled``.

    Raises:
        RuntimeError: if the response ``status`` is not ``"success"``.
    """
    query = {"apiKey": acpriv.ss_key}
    response = requests.put(
        acpriv.ss_url_v2_base + "/smartmode",
        params=query,
        json={"enabled": b_on},
    )
    reply = json.loads(response.text)
    if reply['status'] == "success":
        return
    raise RuntimeError("return is not success")
def ac_get_switch_state():
    """Classify the AC's activity from the Tuya device's ``cur_current`` reading.

    Connects to the Tuya API using the credentials in ``acpriv``, reads the
    status list of ``acpriv.tuya_device_id`` and looks at the ``cur_current``
    entry. As a side effect, ``TUYA_LOGGER`` is set to ``logging.ERROR``.

    Returns:
        ``"on"`` when the reading is above 2000, ``"fan"`` when it is above
        300, otherwise ``"off"``.
        NOTE(review): the reading is presumably in mA -- confirm against the
        device's data-point definition.

    Raises:
        KeyError: if the status list has no ``cur_current`` entry.
    """
    TUYA_LOGGER.setLevel(logging.ERROR)

    client = TuyaOpenAPI(acpriv.tuya_url, acpriv.tuya_id, acpriv.tuya_key)
    client.connect()
    status = client.get("/v1.0/iot-03/devices/{}/status".format(acpriv.tuya_device_id))

    # Keep only the status codes of interest, renamed to local aliases.
    code_aliases = {'cur_current': 'current'}
    readings = {
        code_aliases[item['code']]: item['value']
        for item in status['result']
        if item['code'] in code_aliases
    }

    current = readings['current']
    if current > 2000:
        return "on"
    if current > 300:
        return "fan"
    return "off"