-
Notifications
You must be signed in to change notification settings - Fork 8
/
streamtest.py
71 lines (61 loc) · 1.96 KB
/
streamtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""
Stream test:
Pull the video from the drone and display in cv2 window.
Optionally encode video and dump to file.
"""
import av
import numpy
import tellopy
import cv2
def encode(frame, ovstream, output):
"""
convert frames to packets and write to file
"""
try:
pkt = ovstream.encode(frame)
except Exception as err:
print("encoding failed{0}".format(err))
if pkt is not None:
try:
output.mux(pkt)
except Exception:
print('mux failed: ' + str(pkt))
return True
def main():
# Set up tello streaming
drone = tellopy.Tello()
drone.log.set_level(2)
drone.connect()
drone.start_video()
# container for processing the packets into frames
container = av.open(drone.get_video_stream())
video_st = container.streams.video[0]
# stream and outputfile for video
output = av.open('archive.mp4', 'w')
ovstream = output.add_stream('mpeg4', video_st.rate)
ovstream.pix_fmt = 'yuv420p'
ovstream.width = video_st.width
ovstream.height = video_st.height
counter = 0
save = True
for packet in container.demux((video_st,)):
for frame in packet.decode():
# convert frame to cv2 image and show
image = cv2.cvtColor(numpy.array(
frame.to_image()), cv2.COLOR_RGB2BGR)
cv2.imshow('frame', image)
key = cv2.waitKey(1) & 0xFF
# save initial 1300 frames
if save:
new_frame = av.VideoFrame(
width=frame.width, height=frame.height, format=frame.format.name)
for i in range(len(frame.planes)):
new_frame.planes[i].update(frame.planes[i])
encode(new_frame, ovstream, output)
counter += 1
print("Frames encoded:", counter)
if counter > 300:
output.close()
save == False
if __name__ == '__main__':
main()