-
Notifications
You must be signed in to change notification settings - Fork 0
/
Helicopter.py
163 lines (139 loc) · 4.71 KB
/
Helicopter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import threading
import multiprocessing
import time
from datetime import datetime
import sys
import os, pdb
import serial
import struct
import csv
class HeliControl(object):
def __init__(self, COM = 'COM6', Baud = 9600, Filename = None):
### Arduino Communications
self.COM = COM # COM Port
self.Baud = Baud # Baud Rate: Recommended 9600
self.startMarker = '<' # Used for Serial Read
self.arduino = None # Arduino Class
self.raw_data = None # Stores Raw Data
if Filename == None:
self.Filename = 'ArudinoReport_'+datetime.today().strftime('%Y-%m-%d-%H-%M-%S')+'.csv'
### RPM and Pitch Set Parameters
self.RPMsens = 10 # RPM Tolerance
self.Pitchsens = 0.1 # Pitch Tolerance
### Initial Servo Positions
self.Thr = 0
self.Col = 0
self.Ail = 127
self.Ele = 127
self.Rud = 127
### Specify Boolean for Recording and Stop
self.Collect = False
self.Stop = False
### Connect to Arduino
self.Connect(self.COM, self.Baud)
### Begin Background Threads
self.ReadThread = None
self.ControlThread = None
self.WriteThread = None
self.StartThreads()
def Connect(self, COM, Baud):
self.arduino = serial.Serial(str(COM), Baud, timeout = 2)
self.arduino.flush()
print(self.arduino.read_until(b'>').decode())
#end
def StartThreads(self):
self.ReadThread = threading.Thread(target=self._ReadArduino)
# self.ReadThread.daemon = True
self.ReadThread.start()
print('> Read Arduino Thread Started')
self.ControlThread = threading.Thread(target=self._Control)
# self.ControlThread.daemon = True
self.ControlThread.start()
print('> Control Arduino Thread Started')
self.WriteThread = threading.Thread(target=self._Write2File)
# self.WriteThread.daemon = True
self.WriteThread.start()
print('> Arduino Data File Writing Thread Started')
#end
def _ReadArduino(self):
while True:
received = self.arduino.readline()
# Following line is simply removing the extraneous bits of the string (byte, quotes, markers, newline symbols)
received = str(received[1:-3])[2:-1]
data = received.split(":")
data.append(str(datetime.now()))
self.raw_data = data
# print(data)
# time.sleep(0.5)
if self.Stop:
break
#end
def _Control(self):
while True:
self.arduino.write(self.startMarker.encode())
self.arduino.write(struct.pack('>BBBBB',self.Thr,self.Col,self.Ail,self.Ele, self.Rud))
time.sleep(0.1)
if self.Stop:
break
#end
# def Collect(self):
# self.threading2 = threading.Thread(target=self.ReadArduino)
def _Write2File(self):
while True:
if self.Collect:
File = open(self.Filename, 'a+', newline='')
FileWriter = csv.writer(File)
FileWriter.writerow(self.raw_data)
if self.Stop:
break
#end
#end
def SetRPM(self, RPM):
RPMcurrent = self.raw_data[0]
while abs(RPMcurrent-RPM) > self.RPMsens:
if RPMcurrent < RPM:
self.Thr += 1
time.sleep(0.1)
else:
self.Thr -= 1
time.sleep(0.1)
#end
RPMcurrent = self.raw_data[0]
#end
def SetPitch(self, Pitch):
# Pitchcurrent = self.raw_data[1]
# 0 = -10, 255 = +10
PitchCurrent = self.raw_data[3]*(10-(-10))/255 - 10 # Until we get sensor
while abs(Pitchcurrent-Pitch) > self.Pitchsens:
if Pitchcurrent < Pitch:
self.Col += 1
time.sleep(0.1)
else:
self.Col -= 1
time.sleep(0.1)
#end
Pitchcurrent = self.raw_data[3]*(10-(-10))/255 - 10 # Until we get sensor
#end
def SetServo(self, Thr, Col, Ail, Ele):
self.Thr = Thr
self.Col = Col
self.Ail = Ail
self.Ele = Ele
#end
def StartCollect(self):
self.Collect = True
def StopCollect(self):
self.Collect = False
def kill(self):
self.Thr = 0
self.Col = 0
time.sleep(1.)
self.Stop = True
if __name__ == "__main__":
print('---Testing HeliControl Class---')
heli = HeliControl()
for i in range(255):
heli.SetPitch(i,i,i,i)
print(i)
time.sleep(0.5)
heli.stop()