-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmib_prop.pyx
349 lines (304 loc) · 12.6 KB
/
mib_prop.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#cython: language_level=3
from libc.stdio cimport *
from libc.string cimport *
from libc.stdlib cimport malloc, free
from cpython cimport PyObject, PyList_New, PyList_SetItem
cimport numpy as np
# essential for setting up NumPy-C API
np.import_array()
cdef extern from "src/mib_header_MQ1.h":
ctypedef struct mq1s:
unsigned int sequence_number
unsigned int exposure_time_ns
ctypedef struct mq1q:
unsigned int sequence_number
unsigned int exposure_time_ns
cdef extern from "src/read_mq1_headers.h":
ctypedef struct MQ1_fields:
unsigned int max_length
unsigned int* sequence_number
unsigned int* header_bytes
unsigned int* num_chips
unsigned int* det_x
unsigned int* det_y
char* pixel_depth
char* sensor_layout
char* chip_select
char* timestamp
double* exposure_time_s
unsigned int* counter
unsigned int* colour_mode
unsigned int* gain_mode
float* threshold
char* header_extension_id
char* extended_timestamp
unsigned int* exposure_time_ns
unsigned int* bit_depth
MQ1_fields allocate_MQ1_fields(unsigned int nheaders)
void deallocate_MQ1_fields(MQ1_fields mq1_fields)
unsigned int mq1_single_from_file(
FILE* mib_ptr,
unsigned int nheaders,
unsigned int detector_frame_bytes,
mq1s* mq1s_h,
MQ1_fields* mq1_fields,
)
unsigned int mq1_quad_from_file(
FILE* mib_ptr,
unsigned int nheaders,
unsigned int detector_frame_bytes,
mq1q* mq1q_h,
MQ1_fields* mq1_fields,
)
cdef extern from "src/mib_utils.h":
void header_meta_from_first(FILE* mib_ptr,
char* header_id,
unsigned int* header_bytes,
unsigned int* num_chips,
unsigned int* det_x,
unsigned int* det_y,
char* pixel_depth
)
unsigned int num_of_headers(FILE* mib_ptr,
const unsigned int stride
)
cdef extern from "src/mib_macros.h":
int MQ1_CHAR_LEN_PIXEL_DEPTH # 4
int MQ1_CHAR_LEN_SENSOR_LAYOUT # 7
int MQ1_CHAR_LEN_CHIP_SELECT # 3
int MQ1_CHAR_LEN_TIMESTAMP # 27
int MQ1_FLOAT_LEN_THRESHOLD # 8
int MQ1_CHAR_LEN_HEADER_EXTENSION_ID # 5
int MQ1_CHAR_LEN_EXTENDED_TIMESTAMP # 31
cdef extern from "Python.h":
PyObject* Py_BuildValue(const char* format, ...)
cdef npy_array_from_field(
void* field,
int nd,
np.npy_intp* dims,
int typenum):
# 'field' is on the heap and it will be freed
# so to avoid NumPy array pointing to invalid memory,
# a copy of the array pointing to the field (in heap) is made
new_arr = np.PyArray_SimpleNew(nd, dims, typenum)
np.PyArray_CopyInto(new_arr,
np.PyArray_SimpleNewFromData(nd,
dims,
typenum,
field))
return new_arr
cdef str_list_from_field(
char* field,
unsigned int str_len,
unsigned int nfield):
# if the char field is abc\0def\0ghi\0
# and str_len = 4, nfield = 3
# then str_list will be ["abc", "def", "ghi"]
# prevent modification by strtok
cdef char* copy = strdup(field)
str_list = PyList_New(nfield)
for k in range(nfield):
PyList_SetItem(str_list, k, <object>Py_BuildValue("s#", strtok(copy, "\0"), str_len-1))
free(copy)
copy = NULL
return str_list
def mib_props(mib_path, max_header_index=-1, **kwargs):
"""Return the specified fields of a .mib header.
Parameters
----------
mib_path : str
the path to the .mib file.
max_header_index : int, optional
the number of headers to be parsed. Default to -1, which will
parse all the headers in the .mib file.
The following parameters can be set to True to return the
corresponding field:
- sequence_number (header ID)
- header_bytes (acquisition sequence number)
- num_chips (data offset)
- det_x (pixel dimension X)
- det_y (pixel dimension Y)
- pixel_depth (pixel depth in file)
- sensor_layout (sensor layout)
- chip_select (chip select)
- timestamp (time stamp)
- exposure_time_s (acquisition shutter time in s)
- counter (counter)
- colour_mode (colour mode)
- gain_mode (gain mode)
- threshold (threshold, 0-7)
- header_extension_id (header extension ID)
- extended_timestamp (extended time stamp)
- exposure_time_ns (acquisition shutter time in ns)
- bit_depth (counter depth)
Any unrecognised field/parameter will be ignored.
Returns
-------
ret : dict
the specified field in the .mib header
"""
# separate to avoid creation of temporary Python object
fbytes = mib_path.encode()
cdef char* mib_filename = fbytes
cdef FILE* f
cdef char header_id[4]
cdef char pixel_depth[4]
cdef unsigned int header_bytes, num_chips, det_x, det_y, px_nbytes, frame_stride, num_headers, num_parsed
cdef long int max_hidx = max_header_index
cdef mq1s* mq1s_ptr
cdef mq1q* mq1q_ptr
cdef MQ1_fields mq1_fields
cdef np.npy_intp field_length
if max_hidx == 0:
msg = (f"max_header_index cannot be zero, use any negative number "
"to parse all headers in the MIB file")
raise ValueError(msg)
f = fopen(mib_filename, "rb")
if (f == NULL):
msg = f"Failed to open the MIB file {mib_path}"
raise FileNotFoundError(msg)
header_meta_from_first(f,
header_id,
&header_bytes,
&num_chips,
&det_x,
&det_y,
pixel_depth
)
# according to the pixel depth, set the number of bytes for each pixel
if ((pixel_depth == b"U01") or (pixel_depth == b"U08")):
px_nbytes = 1
elif (pixel_depth == b"U16"):
px_nbytes = 2
elif (pixel_depth == b"U32"):
px_nbytes = 4
elif (pixel_depth == b"U64"):
px_nbytes = 8
else:
msg = (f"The pixel depth {pixel_depth.decode()} is not supported. "
"Supported pixe depth are U01, U08, U16, U32 and U64.")
raise ValueError(msg)
# check if this is from 'MQ1'
if (header_id != b"MQ1"):
fclose(f)
msg = "Only the version 'MQ1' is currently supported"
raise ValueError(msg)
# unlikely to overflow
frame_stride = header_bytes + det_x * det_y * px_nbytes
# determine the number of headers/frames in the MIB file
# it returns 0 if it fails to determine
num_headers = num_of_headers(f, frame_stride)
if (num_headers == 0):
fclose(f)
msg = "Failed to determine the number of headers in the MIB file"
raise RuntimeError(msg)
if max_hidx > 0:
if max_hidx < num_headers:
num_headers = max_hidx
else:
print(f"warning: the specified max_header_index ({max_hidx}) is "
f"larger than the total number of header(s) ({num_headers}) "
f"in the MIB file. Only the first {num_headers} "
"header(s) will be parsed.")
# allocate memory for different fields
mq1_fields = allocate_MQ1_fields(num_headers)
# allocate memory
if (num_chips == 1):
# Single
mq1s_ptr = <mq1s*>malloc(sizeof(mq1s) * num_headers)
if (mq1s_ptr == NULL):
deallocate_MQ1_fields(mq1_fields)
fclose(f)
msg = "Error when allocating memory to single header pointer"
raise MemoryError(msg)
num_parsed = mq1_single_from_file(f,
num_headers,
det_x*det_y*px_nbytes,
mq1s_ptr,
&mq1_fields
)
# free memory
free(mq1s_ptr)
mq1s_ptr = NULL
elif (num_chips == 4):
# Quad
mq1q_ptr = <mq1q*>malloc(sizeof(mq1q) * num_headers)
if (mq1q_ptr == NULL):
deallocate_MQ1_fields(mq1_fields)
fclose(f)
msg = "Error when allocating memory to quad header pointer"
raise MemoryError(msg)
num_parsed = mq1_quad_from_file(f,
num_headers,
det_x*det_y*px_nbytes,
mq1q_ptr,
&mq1_fields
)
# free memory
free(mq1q_ptr)
mq1q_ptr = NULL
else:
deallocate_MQ1_fields(mq1_fields)
fclose(f)
raise RuntimeError(f"Chip number {num_chips} not supported")
# check status of parsing header
# the mq1_XXX_from_file return 0 upon unsuccessful parsing
# otherwise it returns the number of parsed headers
if (num_parsed == 0):
deallocate_MQ1_fields(mq1_fields)
fclose(f)
msg = "Error when parsing the header"
raise RuntimeError(msg)
# close the file
if (fclose(f) == EOF):
deallocate_MQ1_fields(mq1_fields)
raise RuntimeError(f"Failed top close the MIB file {mib_path}")
# compose the return depending on kwargs, horrible
field_length = <np.npy_intp> num_headers
cdef np.npy_intp[2] thresh_dims= [num_headers, MQ1_FLOAT_LEN_THRESHOLD]
ret = {}
if kwargs.get("sequence_number"):
ret["sequence_number"] = npy_array_from_field(<void*>mq1_fields.sequence_number, 1, &field_length, np.NPY_UINT32)
if kwargs.get("header_bytes"):
ret["header_bytes"] = npy_array_from_field(<void*>mq1_fields.header_bytes, 1, &field_length, np.NPY_UINT32)
if kwargs.get("num_chips"):
ret["num_chips"] = npy_array_from_field(<void*>mq1_fields.num_chips, 1, &field_length, np.NPY_UINT32)
if kwargs.get("det_x"):
ret["det_x"] = npy_array_from_field(<void*>mq1_fields.det_x, 1, &field_length, np.NPY_UINT32)
if kwargs.get("det_y"):
ret["det_y"] = npy_array_from_field(<void*>mq1_fields.det_y, 1, &field_length, np.NPY_UINT32)
if kwargs.get("pixel_depth"):
# pixel_depth is char[4]
ret["pixel_depth"] = str_list_from_field(mq1_fields.pixel_depth, MQ1_CHAR_LEN_PIXEL_DEPTH, num_headers)
if kwargs.get("sensor_layout"):
# sensor_layout is char[7]
ret["sensor_layout"] = str_list_from_field(mq1_fields.sensor_layout, MQ1_CHAR_LEN_SENSOR_LAYOUT, num_headers)
if kwargs.get("chip_select"):
# chip_select is char[3]
ret["chip_select"] = str_list_from_field(mq1_fields.chip_select, MQ1_CHAR_LEN_CHIP_SELECT, num_headers)
if kwargs.get("timestamp"):
# timestamp is char[27]
ret["timestamp"] = str_list_from_field(mq1_fields.timestamp, MQ1_CHAR_LEN_TIMESTAMP, num_headers)
if kwargs.get("exposure_time_s"):
ret["exposure_time_s"] = npy_array_from_field(<void*>mq1_fields.exposure_time_s, 1, &field_length, np.NPY_DOUBLE)
if kwargs.get("counter"):
ret["counter"] = npy_array_from_field(<void*>mq1_fields.counter, 1, &field_length, np.NPY_UINT32)
if kwargs.get("colour_mode"):
ret["colour_mode"] = npy_array_from_field(<void*>mq1_fields.colour_mode, 1, &field_length, np.NPY_UINT32)
if kwargs.get("gain_mode"):
ret["gain_mode"] = npy_array_from_field(<void*>mq1_fields.gain_mode, 1, &field_length, np.NPY_UINT32)
if kwargs.get("threshold"):
ret["threshold"] = npy_array_from_field(<void*>mq1_fields.threshold, 2, thresh_dims, np.NPY_FLOAT32)
if kwargs.get("header_extension_id"):
# header_extension_id is char[5]
ret["header_extension_id"] = str_list_from_field(mq1_fields.header_extension_id, MQ1_CHAR_LEN_HEADER_EXTENSION_ID, num_headers)
if kwargs.get("extended_timestamp"):
# extended_timestamp is char[31]
ret["extended_timestamp"] = str_list_from_field(mq1_fields.extended_timestamp, MQ1_CHAR_LEN_EXTENDED_TIMESTAMP, num_headers)
if kwargs.get("exposure_time_ns"):
ret["exposure_time_ns"] = npy_array_from_field(<void*>mq1_fields.exposure_time_ns, 1, &field_length, np.NPY_UINT32)
if kwargs.get("bit_depth"):
ret["bit_depth"] = npy_array_from_field(<void*>mq1_fields.bit_depth, 1, &field_length, np.NPY_UINT32)
# free MQ1 fields memory
deallocate_MQ1_fields(mq1_fields)
return ret