-
Notifications
You must be signed in to change notification settings - Fork 4
/
pc_main.py
97 lines (77 loc) · 2.63 KB
/
pc_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import ctypes
import json
import paho.mqtt.client as mqtt
import time
import yaml
from multiprocessing import Process, Value
from pynput import keyboard
class MQTTSender:
    """Small convenience wrapper around a paho-mqtt client for publishing.

    Constructing an instance creates the client and connects it right away;
    call ``disconnect()`` when finished.
    """

    def __init__(self, host='localhost', port=1883, keepalive=60):
        """Create the MQTT client and connect it to the broker.

        Args:
            host: Broker address handed to the client's ``connect()``.
            port: Broker port handed to the client's ``connect()``.
            keepalive: Keepalive value handed to the client's ``connect()``.
        """
        client = mqtt.Client()
        client.connect(host, port=port, keepalive=keepalive)
        self._client = client

    def disconnect(self):
        """Disconnect the underlying client from the broker."""
        self._client.disconnect()

    def publish(self, driver_msg, topic='mqtt/sensor'):
        """Publish ``driver_msg`` as the payload of ``topic``.

        Args:
            driver_msg: Payload to send (the caller in this file passes a
                JSON string).
            topic: MQTT topic to publish on.
        """
        self._client.publish(topic, payload=driver_msg)
class MPDriverInput:
    """Arrow-key driver input collected by a keyboard listener in a child process.

    The child process writes the current command into shared
    ``multiprocessing.Value`` objects that the parent can read:

    * ``throttle`` -- 1.0 on Up press, -1.0 on Down press, 0.0 when either
      of those keys is released.
    * ``steering`` -- -1.0 on Left press, 1.0 on Right press, 0.0 when
      either of those keys is released.
    * ``is_run`` -- True until Esc is pressed or ``close()`` is called.
    """

    def __init__(self):
        """Create the shared values and start the listener process."""
        # The shared values are created before the process starts because
        # the process target is a bound method that reads them via ``self``.
        self.throttle = Value(ctypes.c_float, 0.0)
        self.steering = Value(ctypes.c_float, 0.0)
        self.is_run = Value(ctypes.c_bool, True)
        self._p = Process(target=self._process)
        self._p.start()

    def close(self):
        """Ask the listener process to stop and wait for it to exit."""
        self.is_run.value = False
        self._p.join()

    def _on_press(self, key):
        """Key-press callback: update the shared value bound to ``key``."""
        # Ordered (key, shared value, new value) bindings; first match wins.
        bindings = (
            (keyboard.Key.esc, self.is_run, False),
            (keyboard.Key.up, self.throttle, 1.0),
            (keyboard.Key.down, self.throttle, -1.0),
            (keyboard.Key.left, self.steering, -1.0),
            (keyboard.Key.right, self.steering, 1.0),
        )
        for bound_key, shared, pressed_value in bindings:
            if key == bound_key:
                shared.value = pressed_value
                break

    def _on_release(self, key):
        """Key-release callback: reset the axis that ``key`` controls to 0.0."""
        axes = (
            (self.throttle, (keyboard.Key.up, keyboard.Key.down)),
            (self.steering, (keyboard.Key.left, keyboard.Key.right)),
        )
        for shared, axis_keys in axes:
            if any(key == axis_key for axis_key in axis_keys):
                shared.value = 0.0
                break

    def _process(self):
        """Child-process entry point: run the listener until ``is_run`` clears."""
        listener = keyboard.Listener(
            on_press=self._on_press,
            on_release=self._on_release,
        )
        listener.start()
        # Poll the shared flag; it is cleared by Esc (in _on_press) or by
        # close() in the parent process.
        while self.is_run.value:
            time.sleep(0.01)
        listener.stop()
def main(config_path='config.yml'):
    """Publish keyboard driving commands over MQTT until Esc is pressed.

    Reads the broker address from the ``pi_ip`` key of the YAML config,
    then roughly every 10 ms publishes a JSON message with the keys
    ``time`` (whole seconds since the epoch), ``throttle`` and ``steering``
    and echoes the same values to stdout.

    Args:
        config_path: Path of the YAML configuration file. Defaults to
            ``config.yml`` in the current working directory.
    """
    # safe_load instead of yaml.load(stream): calling yaml.load without a
    # Loader raises TypeError on PyYAML >= 6, and the full loader can build
    # arbitrary Python objects from the file.
    with open(config_path) as ymlfile:
        cfg = yaml.safe_load(ymlfile)

    sender = MQTTSender(host=cfg['pi_ip'])
    mp_driver = MPDriverInput()
    print('Press esc to exit:')
    try:
        while mp_driver.is_run.value:
            driver_msg = {
                'time': int(time.time()),
                'throttle': mp_driver.throttle.value,
                'steering': mp_driver.steering.value,
            }
            sender.publish(json.dumps(driver_msg))
            print('Time:{time}, Throttle: {throttle}, Steering: {steering}'
                  .format(**driver_msg))
            time.sleep(0.01)
    finally:
        # Always stop the listener process and drop the MQTT connection,
        # even when the loop is left through an exception such as Ctrl-C.
        mp_driver.close()
        sender.disconnect()
# Run main() only when this file is executed as a script. The guard also
# keeps main() from running again if the module is re-imported by a child
# process started through multiprocessing.
if __name__ == "__main__":
    main()