forked from AlexeyAB/darknet
-
Notifications
You must be signed in to change notification settings - Fork 2
/
darknet_tracker_havi.py
193 lines (176 loc) · 8.01 KB
/
darknet_tracker_havi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# This file is a modified version of darknet_video.py from https://github.com/AlexeyAB/darknet
from ctypes import *
import random
import os
import cv2
import time
import darknet
import argparse
from threading import Thread, enumerate
from queue import Queue
import numpy as np
from random import randint
from sort_havi import *
import pickle
def _str2bool(value):
    """Convert a command-line token into a real boolean.

    ``argparse`` with ``type=bool`` calls ``bool()`` on the raw string, so
    every non-empty value (including "False" and "0") becomes True. This
    converter maps the usual spellings explicitly instead.

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ("true", "t", "yes", "y", "1", "on"):
        return True
    # The empty string stays False, which is what bool("") produced before.
    if token in ("false", "f", "no", "n", "0", "off", ""):
        return False
    raise argparse.ArgumentTypeError(
        "Boolean value expected, got {!r}".format(value))


def parser():
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace with the attributes ``input``, ``out_filename``,
        ``weights``, ``dont_show``, ``ext_output``, ``config_file``,
        ``data_file``, ``thresh``, ``fall`` and ``display``.
    """
    arg_parser = argparse.ArgumentParser(description="YOLO Object Detection")
    # The default is the int 0 (webcam index); values given on the command
    # line arrive as strings.
    arg_parser.add_argument("--input", type=str, default=0,
                            help="video source. If empty, uses webcam 0 stream")
    arg_parser.add_argument("--out_filename", type=str, default="",
                            help="inference video name. Not saved if empty")
    arg_parser.add_argument("--weights", default="./yolov4.weights",
                            help="yolo weights path")
    arg_parser.add_argument("--dont_show", action='store_true',
                            help="windown inference display. For headless systems")
    arg_parser.add_argument("--ext_output", action='store_true',
                            help="display bbox coordinates of detected objects")
    arg_parser.add_argument("--config_file", default="./cfg/yolov4.cfg",
                            help="path to config file")
    arg_parser.add_argument("--data_file", default="./cfg/coco.data",
                            help="path to data file")
    arg_parser.add_argument("--thresh", type=float, default=.25,
                            help="remove detections with confidence below this value")
    # _str2bool (not bool) so that "--fall False" really disables the flag.
    arg_parser.add_argument("--fall", type=_str2bool, default=True,
                            help="fall detection")
    arg_parser.add_argument("--display", type=_str2bool, default=True,
                            help="display tracker")
    return arg_parser.parse_args()
def str2int(video_path):
    """
    Turn a video source into an int when it looks like one.

    argparse returns strings, but webcams are addressed by an integer
    index (0, 1, ...). Anything that int() rejects with ValueError is
    returned unchanged.
    """
    try:
        camera_index = int(video_path)
    except ValueError:
        return video_path
    else:
        return camera_index
def check_arguments_errors(args):
    """Validate the parsed command-line arguments before any model is loaded.

    Args:
        args: namespace providing ``thresh``, ``config_file``, ``weights``,
            ``data_file`` and ``input``.

    Raises:
        AssertionError: if ``thresh`` is not strictly between 0 and 1.
        ValueError: if the config, weights or data file does not exist, or if
            ``input`` names a local video file that does not exist.
    """
    assert 0 < args.thresh < 1, "Threshold should be a float between zero and one (non-inclusive)"
    if not os.path.exists(args.config_file):
        raise ValueError("Invalid config path {}".format(os.path.abspath(args.config_file)))
    if not os.path.exists(args.weights):
        raise ValueError("Invalid weight path {}".format(os.path.abspath(args.weights)))
    if not os.path.exists(args.data_file):
        raise ValueError("Invalid data file path {}".format(os.path.abspath(args.data_file)))
    # The previous test compared the converted value with the *type* ``str``
    # (always False), so a missing video file was never reported.
    source = str2int(args.input)
    # Integer sources are camera indices; sources containing "://" are treated
    # as stream URLs and cannot be checked on the local filesystem.
    if isinstance(source, str) and "://" not in source and not os.path.exists(source):
        raise ValueError("Invalid video path {}".format(os.path.abspath(source)))
def set_saved_video(input_video, output_video, size):
    """Create the cv2.VideoWriter used to save the annotated stream.

    The writer uses the MJPG fourcc, the frame rate reported by
    ``input_video`` (truncated to an int) and the given frame ``size``.
    """
    source_fps = int(input_video.get(cv2.CAP_PROP_FPS))
    codec = cv2.VideoWriter_fourcc("M", "J", "P", "G")
    return cv2.VideoWriter(output_video, codec, source_fps, size)
def video_capture(frame_queue, darknet_image_queue):
    """Read frames from the global ``cap`` and feed both pipeline queues.

    Each frame is converted with COLOR_BGR2RGB and resized to the global
    (width, height). The resized frame goes to ``frame_queue``; its bytes are
    copied into the global ``darknet_image`` buffer, which is then put on
    ``darknet_image_queue``. The loop stops at the first failed read and the
    capture is released afterwards.
    """
    while cap.isOpened():
        grabbed, bgr_frame = cap.read()
        if not grabbed:
            break
        rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        network_frame = cv2.resize(
            rgb_frame,
            (width, height),
            interpolation=cv2.INTER_LINEAR,
        )
        frame_queue.put(network_frame)
        # The same darknet_image buffer is refilled for every frame.
        darknet.copy_image_from_bytes(darknet_image, network_frame.tobytes())
        darknet_image_queue.put(darknet_image)
    cap.release()
def _detections_to_tracker_input(detections):
    """Convert darknet detections into the array passed to the tracker.

    Each detection is read as ``(label, confidence, (x, y, w, h))``.
    NOTE(review): the upstream darknet Python wrapper reports (x, y) as the box
    *centre* and the confidence as a percentage; confirm this fork's
    darknet.py does the same.

    Returns:
        A numpy array with one row per detection laid out as
        ``[xmin, ymin, xmax, ymax, score, class_id]``, or an empty ``(0, 6)``
        array when there are no detections.
    """
    rows = []
    for _label, confidence, bbox in detections:
        x, y, w, h = (float(value) for value in bbox)
        # Centre/size -> corner points. Adding w and h straight onto (x, y)
        # would shift every box by half its size.
        xmin = int(x - w / 2)
        ymin = int(y - h / 2)
        xmax = int(x + w / 2)
        ymax = int(y + h / 2)
        # np.float was removed in NumPy 1.24; the builtin float is equivalent.
        score = float(confidence) * 0.01
        # Class id is fixed to 0: labels are not yet mapped to numeric ids
        # (e.g. person -> 0, car -> 1).
        rows.append([xmin, ymin, xmax, ymax, score, 0])
    if not rows:
        return np.empty((0, 6))
    return np.asarray(rows)


def inference(darknet_image_queue, detections_queue, fps_queue, fall_queue, matrix_queue):
    """Run detection and tracking on every image taken from the queue.

    For each image from ``darknet_image_queue`` this runs
    ``darknet.detect_image``, updates the SORT tracker, and publishes:
    the tracker output on the module-level ``id_queue``, the raw detections on
    ``detections_queue`` and the measured frames-per-second on ``fps_queue``.

    ``fall_queue`` and ``matrix_queue`` are accepted but not used here.
    Reads the globals ``cap``, ``network``, ``class_names``, ``args`` and
    ``id_queue``.
    """
    # Initialize tracker
    mot_tracker1 = Sort(max_age=30, min_hits=1, iou_threshold=0.20)
    while cap.isOpened():
        darknet_image = darknet_image_queue.get()
        prev_time = time.time()
        # To track a single class only (e.g. 'person'), filter by label here.
        detections = list(darknet.detect_image(
            network, class_names, darknet_image, thresh=args.thresh))
        # update tracker ids
        track_bbs_ids = mot_tracker1.update(_detections_to_tracker_input(detections))
        id_queue.put(track_bbs_ids)
        detections_queue.put(detections)
        fps = int(1/(time.time() - prev_time))
        fps_queue.put(fps)
    cap.release()
#id_dict = {}
def drawing(frame_queue, detections_queue, fps_queue, fall_queue, id_queue):
    """Draw detections and tracker ids on each frame, then show/save it.

    Takes one item per iteration from ``frame_queue``, ``detections_queue``,
    ``id_queue`` and ``fps_queue``. Every tracked object's box centre is
    appended to the global ``id_dict`` under the key ``"<id>_id"`` and its id
    is rendered 20 px above that centre. ``fall_queue`` is not used here.

    Reads the globals ``cap``, ``args``, ``width``, ``height``,
    ``class_colors`` and ``id_dict``.
    """
    random.seed(3)  # deterministic bbox colors
    # Only create a writer when an output file was requested. The default
    # out_filename is "" (never None), so an "is not None" test is always true.
    video = None
    if args.out_filename:
        video = set_saved_video(cap, args.out_filename, (width, height))
    while cap.isOpened():
        frame_resized = frame_queue.get()
        detections = detections_queue.get()
        ids = id_queue.get()
        fps = fps_queue.get()
        if frame_resized is None:
            continue
        image = darknet.draw_boxes(detections, frame_resized, class_colors)
        if ids is not None:
            for single_obj in ids:  # Each tracked box
                # Centre of the tracked box (first four values are read as
                # x1, y1, x2, y2).
                center_x = int((single_obj[0] + single_obj[2])/2)
                center_y = int((single_obj[1] + single_obj[3])/2)
                # NOTE(review): the id is taken from the second-to-last column
                # of the tracker row; confirm against sort_havi's output layout.
                single_id = single_obj[-2]
                id_dict.setdefault(str(single_id) + '_id', []).append((center_x, center_y))
                # Display ID with assigned box. Drawn on `image` so the text is
                # kept even if draw_boxes returned a new array.
                cv2.putText(image, str(single_id), (center_x, center_y - 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if video is not None:
            video.write(image)
        if not args.dont_show:
            cv2.imshow('Inference', image)
        # waitKey(0) blocks until a key is pressed, so never pass 0 when the
        # measured fps rounds down to zero. Key code 100 is 'd'.
        if cv2.waitKey(max(fps, 1)) == 100:
            break
    cap.release()
    if video is not None:
        video.release()
    cv2.destroyAllWindows()
if __name__ == '__main__':
    # Queues connecting the capture, inference and drawing threads.
    frame_queue = Queue()
    # maxsize=1 makes the capture thread wait until inference has taken the
    # previous image.
    darknet_image_queue = Queue(maxsize=1)
    detections_queue = Queue()
    fps_queue = Queue()
    fall_queue = Queue()
    matrix_queue = Queue()
    id_queue = Queue()

    args = parser()
    check_arguments_errors(args)
    network, class_names, class_colors = darknet.load_network(
        args.config_file,
        args.data_file,
        args.weights,
        batch_size=1
    )
    # Darknet doesn't accept numpy images.
    # Create one with image we reuse for each detect
    width = darknet.network_width(network)
    height = darknet.network_height(network)
    darknet_image = darknet.make_image(width, height, 3)

    # Command-line values are strings; convert numeric ones so that
    # "--input 0" opens camera 0 instead of looking for a file named "0".
    input_path = str2int(args.input)
    cap = cv2.VideoCapture(input_path)
    # Maps "<track id>_id" to the list of box centres seen for that track.
    id_dict = {}

    Thread(target=video_capture, args=(frame_queue, darknet_image_queue)).start()
    Thread(target=inference, args=(darknet_image_queue, detections_queue, fps_queue, fall_queue, matrix_queue)).start()
    Thread(target=drawing, args=(frame_queue, detections_queue, fps_queue, fall_queue, id_queue)).start()