-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPressSens.py
83 lines (70 loc) · 2.57 KB
/
PressSens.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python
# coding: utf-8
from datalogger import *
import threading
from time import sleep
from config import *
talive = None
def keep_alive():
global talive
PfLog.dre.command_tx_buf="COM,1".encode()
print("PRESS RECOVERY: ",PfLog.dre.command_tx_buf)
PfLog.sendCtrlCommand()
#print("programo timer")
talive = threading.Timer(10.0,keep_alive)
talive.start()
# Gets a response from the Motors
def getDataPfeiffer():
if (cte_simulate_sensors) or (cte_emulate_devices):
#print("Espero 0.3 segundos")
sleep(.5)
return 99, 5555.0, 98, 5554
else:
global talive
done = False
while not done:
#print("programo timer")
talive = threading.Timer(10.0,keep_alive)
talive.start()
PfLog.getCtrlResponse()
talive.cancel()
# PfLog.dre.command_rx_buf
resp2 = PfLog.dre.command_rx_str
splitresp = resp2.split(',') # Splits the string into groups
done = (len(splitresp)>=4)
if not done:
print("SKIPPING UNKNOWN MSG: ",resp2)
status1 = int(splitresp[0].strip()) # Removes whitespaces and assign to status of first channel
value1 = float(splitresp[1].strip()) * 1e9 # Removes whitespaces and assign to value of first channel
status2 = int(splitresp[2].strip()) # Removes whitespaces and assign to status of second channel
value2 = float(splitresp[3].strip()) * 1e9 # Removes whitespaces and assign to value of second channel
return status1, value1, status2, value2
# In[ ]:
if not cte_emulate_devices:
serport = serial.Serial(
port=config.cte_serial_port,
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
rtscts=False,
dsrdtr=False,
xonxoff=True
)
if config.cte_verbose:
print("PRESS serial port: " + config.cte_serial_port)
else:
serport = None
def execPressDatalog():
datalogger(serport,getDataPfeiffer,"Press1","Press2","press",cfg_press_nsamples_period1, cfg_press_nsamples_period2)
pressDatalog = threading.Thread(target=execPressDatalog, name="pressDatalog")
print("*** Lanzo pressDatalog")
pressDatalog.start()
veces = 1
while(1):
sleep(10)
threadvivo = pressDatalog.is_alive()
if not threadvivo:
pressDatalog = threading.Thread(target=execPressDatalog, name="pressDatalog")
print("*** Relanzo pressDatalog",veces)
pressDatalog.start()