-
Notifications
You must be signed in to change notification settings - Fork 3
/
replier.py
44 lines (37 loc) · 1.34 KB
/
replier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
import time
from osparc_control import CommandManifest
from osparc_control import CommandParameter
from osparc_control import CommandType
from osparc_control import PairedTransmitter
# declare some commands to which a reply can be provided
random_in_range_manifest = CommandManifest(
action="random_in_range",
description="gets the time",
params=[
CommandParameter(name="a", description="lower bound for random numbers"),
CommandParameter(name="b", description="upper bound for random numbers"),
],
command_type=CommandType.WITH_IMMEDIATE_REPLY,
)
paired_transmitter = PairedTransmitter(
remote_host="localhost",
exposed_commands=[random_in_range_manifest],
remote_port=2346,
listen_port=2345,
)
paired_transmitter.start_background_sync()
wait_for_requests = True
while wait_for_requests:
for command in paired_transmitter.get_incoming_requests():
if command.action == random_in_range_manifest.action:
random_int = random.randint( # noqa: S311
command.params["a"], command.params["b"]
)
paired_transmitter.reply_to_command(
request_id=command.request_id, payload=random_int
)
wait_for_requests = False
# allow for message to be delivered
time.sleep(0.3)
paired_transmitter.stop_background_sync()