-
Notifications
You must be signed in to change notification settings - Fork 34
/
calib.py
executable file
·89 lines (68 loc) · 2.7 KB
/
calib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
import ctypes
import tensorrt as trt
import cv2
from torchvision import datasets, transforms
import os
from image_reader import read_image_chw
class PythonEntropyCalibrator(trt.infer.EntropyCalibrator):
def __init__(self, input_layers, stream):
trt.infer.EntropyCalibrator.__init__(self)
self.input_layers = input_layers
self.stream = stream
self.d_input = cuda.mem_alloc(self.stream.calibration_data.nbytes)
stream.reset()
def get_batch_size(self):
return self.stream.batch_size
def get_batch(self, bindings, names):
batch = self.stream.next_batch()
if not batch.size:
return None
cuda.memcpy_htod(self.d_input, batch)
for i in self.input_layers[0]:
assert names[0] != i
bindings[0] = int(self.d_input)
return bindings
def read_calibration_cache(self, length):
return None
def write_calibration_cache(self, ptr, size):
# cache = ctypes.c_char_p(int(ptr))
# with open('calibration_cache.bin', 'wb') as f:
# f.write(cache.value)
return None
class ImageBatchStream():
def __init__(self, args):
self.prefix = args.img_dir
self.batch_size = args.batch_size
self.channel = args.input_channel
self.width, self.height = 224, 224
self.filename = args.fake_test_file
lines = open(self.filename).readlines()
calibration_files = [s.strip().split(",")[0] for s in lines]
self.max_batches = (len(calibration_files) // self.batch_size) + \
(1 if (len(calibration_files) % self.batch_size)
else 0)
self.files = calibration_files
self.calibration_data = np.zeros((self.batch_size, self.channel,
self.height, self.width), dtype=np.float32)
self.batch = 0
# self.preprocessor = preprocessor
def reset(self):
self.batch = 0
def next_batch(self):
if self.batch < self.max_batches:
imgs = []
files_for_batch = self.files[self.batch_size * self.batch: \
self.batch_size * (self.batch + 1)]
for f in files_for_batch:
print("[ImageBatchStream] Processing ", f)
img = read_image_chw(os.path.join(self.prefix, f), self.width, self.height)
imgs.append(img)
for i in range(len(imgs)):
self.calibration_data[i] = imgs[i]
self.batch += 1
return np.ascontiguousarray(self.calibration_data, dtype=np.float32)
else:
return np.array([])