-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmha_server.py
146 lines (128 loc) · 5.99 KB
/
mha_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import json
from MHAConnection import MHAConnection
import server_common
class LoopingWebSocket(server_common.MyWebSocketHandler):
    """WebSocket handler that bridges a web client and an MHA instance.

    Incoming messages are JSON objects.  Depending on which key is present,
    the handler sends the current value of the "pool" variable back to the
    client, writes a new value to a variable of the pooling plug-in, switches
    ``mha.doachain.post.select`` between "Bf" and "NoBf", or stores a new
    ``interval``.

    Private keyword arguments (popped before the base class is initialised):
        mha_host: host passed to every ``MHAConnection``.
        mha_port: port passed to every ``MHAConnection``.
        pooling_id: ID used with ``find_id`` to locate the pooling plug-in.
        pool_path: full path of the "pool" variable to read; if falsy, it is
            derived from the pooling plug-in's path.
    """

    # (message key, label used in the log line, variable suffix appended to
    # the cached plug-in path).  The order is the order in which the keys are
    # tested, so it decides which one wins if a message carries several.
    _POOLING_SETTINGS = (
        ('new_pooling_wndlen', 'Pooling wndlen', b'.pooling_wndlen'),
        ('new_pooling_alpha', 'Pooling alpha', b'.alpha'),
        ('new_pooling_type', 'Pooling type', b'.pooling_type'),
    )

    def __init__(self, *args, **kwargs):
        """Pop the private keyword arguments and cache the plug-in paths."""
        # grab our private keyword arguments
        self.mha_host = kwargs.pop('mha_host')
        self.mha_port = kwargs.pop('mha_port')
        # Forwarded as the third MHAConnection argument; presumably a
        # timeout -- TODO confirm against MHAConnection.
        self.interval = None
        self.pooling_id = kwargs.pop('pooling_id')
        pool_path = kwargs.pop('pool_path')

        # cache the location of the pooling plug-in
        # NOTE(review): indexing [0] fails if find_id() yields no match;
        # nothing here verifies that pooling_id exists.
        with self._connect() as mha_conn:
            self._plugin_path = mha_conn.find_id(self.pooling_id)[0]

        # If --pool-path was not passed, default to looking for a monitoring
        # plug-in in the same namespace as the acPooling_wave plug-in.
        if not pool_path:
            mon_path = self._plugin_path.replace(self.pooling_id,
                                                 b'doasvm_mon')
            pool_path = mon_path + b'.pool'
        self._pool_path = pool_path

        super(LoopingWebSocket, self).__init__(*args, **kwargs)

    def _connect(self):
        """Return a new MHAConnection built from the current settings."""
        return MHAConnection(self.mha_host, self.mha_port, self.interval)

    def _send_data(self):
        """Read the "pool" variable and send it to the client as JSON."""
        try:
            with self._connect() as mha_conn:
                p = mha_conn.get_val_converted(self._pool_path)
            self.write_message(json.dumps({'data': p}))
        except ValueError as e:
            print("Error sending data: {}".format(e))

    def _set_pooling_value(self, label, suffix, value):
        """Log *value* and write it to ``<plug-in path><suffix>``."""
        print('{} = {}'.format(label, value))
        with self._connect() as mha_conn:
            mha_conn.set_val(self._plugin_path + suffix, value)

    def on_message(self, message):
        """Decode one JSON message from the client and act on it.

        Failures (including undecodable JSON) are reported on stdout and do
        not propagate to the caller.
        """
        # Decode separately so that a malformed frame is logged like any
        # other failure instead of raising out of the handler.
        try:
            message = json.loads(message)
        except (TypeError, ValueError) as e:
            print("Error handling message \"{}\": {}".format(message, e))
            return

        try:
            if 'command' in message:
                if message['command'] == 'send_data':
                    self._send_data()
                else:
                    print('Unknown command "{}"'.format(message['command']))
                return

            for key, label, suffix in self._POOLING_SETTINGS:
                if key in message:
                    self._set_pooling_value(label, suffix, message[key])
                    return

            if 'beamformer' in message:
                print('Beamformer = {}'.format(message['beamformer']))
                # Only genuine JSON booleans are accepted (identity checks),
                # and the value is validated before a connection is opened.
                if message['beamformer'] is False:
                    selection = "NoBf"
                elif message['beamformer'] is True:
                    selection = "Bf"
                else:
                    print('Unknown message "{}"'.format(message))
                    return
                with self._connect() as mha_conn:
                    mha_conn.set_val(b'mha.doachain.post.select', selection)
            elif 'new_interval' in message:
                print('Interval = {}'.format(message['new_interval']))
                self.interval = message['new_interval']
            else:
                print('Unknown message "{}"'.format(message))
        except Exception as e:
            # Top-level boundary for one message: report and keep serving.
            print("Error handling message \"{}\": {}".format(message, e))
def as_bytes(s):
    """Return *s* unchanged if it is already bytes, otherwise ``s.encode()``.

    Used as the argparse ``type`` for options whose values are later
    concatenated with bytes paths.
    """
    return s if isinstance(s, bytes) else s.encode()


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    server_common.add_common_args(parser)
    parser.add_argument(
        '--mha-host',
        default='127.0.0.1',
        help='The host on which MHA is running.',
    )
    parser.add_argument(
        '--mha-port',
        default=33337,
        type=int,
        help='The port on which MHA is listening.',
    )
    parser.add_argument(
        '--classification-id',
        default=b'svm',
        type=as_bytes,
        help="""The ID of a doasvm_classification instance. This is used to
        fetch the "angles" variable in order to pass additional parameters to
        the web app.
        """
    )
    parser.add_argument(
        '--pooling-id',
        default=b'pool',
        type=as_bytes,
        help="""The ID of the desired acPooling_wave instance. This is the
        instance that will be controlled from the web app.
        """
    )
    # NOTE(review): because the default below is non-empty, the "if unset"
    # fallback described in the help text is only reached when an empty
    # string is passed explicitly -- confirm whether that is intended.
    parser.add_argument(
        '--pool-path',
        default=b'mha.doachain.doasvm_mon.pool',
        type=as_bytes,
        help="""The full path to the desired "pool" variable to visualise. If
        unset, it is assumed that a doasvm_mon instance (named "doasvm_mon")
        exists in the same namespace as the pooling plug-in specified by
        --pooling-id, and that it has a variable named "pool".
        """
    )
    args = parser.parse_args()

    # abort the connection after a 5 second timeout
    with MHAConnection(args.mha_host, args.mha_port, 5) as mha_conn:
        plugin_path = mha_conn.find_id(args.classification_id)
        if not plugin_path:
            classification_id = args.classification_id.decode()
            # Raise SystemExit directly: the exit() builtin is injected by
            # the site module for interactive use and may be absent.
            raise SystemExit('Error: Could not find plug-in with ID "'
                             + classification_id + '"')
        angles = mha_conn.get_val_converted(plugin_path[0] + b'.angles')

    # Handler class plus the private keyword arguments its __init__ pops.
    ws_args = (
        LoopingWebSocket, {'mha_host': args.mha_host,
                           'mha_port': args.mha_port,
                           'pooling_id': args.pooling_id,
                           'pool_path': args.pool_path}
    )
    server_common.main(args, ws_args, 'mha', min(angles), max(angles),
                       len(angles))