forked from cta-wave/device-observation-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobservation_framework.py
697 lines (614 loc) · 25.3 KB
/
observation_framework.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
# -*- coding: utf-8 -*-
"""DPCTF Device Observation Framework.
Entry point to the Device Observation Framework.
The Software is provided to you by the Licensor under the License, as
defined below, subject to the following condition.
Without limiting other conditions in the License, the grant of rights under
the License will not include, and the License does not grant to you, the
right to Sell the Software.
For purposes of the foregoing, “Sell” means practicing any or all of the
rights granted to you under the License to provide to third parties, for a
fee or other consideration (including without limitation fees for hosting
or consulting/ support services related to the Software), a product or
service whose value derives, entirely or substantially, from the
functionality of the Software. Any license notice or attribution required
by the License must also include this Commons Clause License Condition
notice.
Software: WAVE Observation Framework
License: Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0.txt
Licensor: Consumer Technology Association
Contributor: Resillion UK Limited
"""
import argparse
import errno
import logging
import math
import os
import shutil
import sys
import traceback
from pathlib import Path
from typing import List, Tuple
import cv2
from dpctf_qr_decoder import (
DPCTFQrDecoder,
MezzanineDecodedQr,
PreTestDecodedQr,
TestStatusDecodedQr,
)
from exceptions import ConfigError, ObsFrameError, ObsFrameTerminate
from global_configurations import GlobalConfigurations
from log_handler import LogManager
from observation_framework_processor import ObservationFrameworkProcessor
from qr_recognition.qr_recognition import FrameAnalysis
from camera_calibration_helper import calibrate_camera
# Version components of the framework; they are combined into VERSION below.
MAJOR = 2
MINOR = 0
PATCH = 2
# Suffix appended verbatim to the version string (currently empty).
BETA = ""
VERSION = f"{MAJOR}.{MINOR}.{PATCH}{BETA}"
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Per-axis factor used by iter_to_get_qr_area: the accumulated mezzanine area
# must be larger than this factor times a single detected QR code's size
# before the area is treated as fully detected.
QR_CODE_AREA_RATIO_TO_SIZE = 1.9
"""To detect whether full QR code area is found.
There are 4 mezzanine QR codes, left-top, right-top, left-down, right-down
QR code AREA (height and width) is 2 times bigger than its size
camera might not be possible to be angles exact 90 degree to screen
this ratio set to slightly lower than 2"""
def rename_input_file(
    input_video_path_str: str, input_video_path: Path, session_token: str
) -> None:
    """Rename a recording, and its generated audio file, to carry the session token.

    The new name has the form ``<filename>_dpctf_<sessionID><extension>``.
    Nothing is renamed when ``session_token`` is empty or when the token
    already appears in the recording path.

    Args:
        input_video_path_str: path of the recording as a string.
        input_video_path: path object of the recording; its parent directory
            is used as the target directory when ``input_video_path_str`` is
            a relative path.
        session_token: session token to embed in the file name.

    Raises:
        OSError: if one of the renames fails.
    """
    if not session_token:
        return

    file_name, file_extension = os.path.splitext(input_video_path_str)
    if session_token in file_name:
        # the token is already part of the path, nothing to do
        return

    # Build the target from the bare file name. Joining the parent directory
    # with a relative path that still carries directory components would
    # duplicate them (e.g. "<cwd>/rec" + "rec/a_dpctf_x.mp4").
    if os.path.isabs(input_video_path_str):
        target_dir = os.path.dirname(input_video_path_str)
    else:
        target_dir = input_video_path.parent
    new_base_name = os.path.basename(file_name) + "_dpctf_" + session_token

    new_file_path = os.path.join(target_dir, new_base_name + file_extension)
    os.rename(input_video_path_str, new_file_path)

    # rename generated audio file as well
    audio_file_extension = ".wav"
    input_audio_path_str = file_name + audio_file_extension
    if os.path.exists(input_audio_path_str):
        new_audio_file_path = os.path.join(
            target_dir, new_base_name + audio_file_extension
        )
        os.rename(input_audio_path_str, new_audio_file_path)

    logger.info("Recorded file renamed to '%s'.", new_file_path)
def iter_to_get_qr_area(
    vid_cap,
    camera_frame_rate: float,
    width: int,
    height: int,
    end_iter_frame_num: int,
    do_adaptive_threshold_scan: bool,
    global_configurations: GlobalConfigurations,
    starting_point_s: int,
    print_processed_frame: bool,
) -> Tuple:
    """Iterate video frame by frame and detect mezzanine QR codes area.

    Reading starts at ``starting_point_s``. The search ends as soon as both
    the mezzanine area and the test status area are found, when the frame
    counter passes ``end_iter_frame_num``, or when the recording ends.

    Args:
        vid_cap: VideoCapture instance for the current file.
        camera_frame_rate: recording camera frame rate
        width: recording image width
        height: recording image height
        end_iter_frame_num: frame number where system stops search qr areas
        do_adaptive_threshold_scan: passed through to the frame analysis scan
        global_configurations: to get configuration from
        starting_point_s: position in seconds where the search starts
        print_processed_frame: when True, progress is printed every 50 frames

    Returns:
        first_pre_test_qr_time: first pre test qr code detection time in ms;
            stays 0 when none is detected or when the search does not start
            at frame 0
        qr_code_areas: qr_code_areas to crop when detecting qr code, as
            [mezzanine_area, test_status_area]; each area is
            [x_start, y_start, x_end, y_end] or an empty list when the area
            was not (fully) detected
    """
    test_status_found = False
    mezzanine_found = False
    first_pre_test_found = False
    first_pre_test_qr_time = 0
    qr_code_areas = [[], []]
    corrupted_frame_num = 0

    # seek to the requested start and keep the frame counter in step with it
    vid_cap.set(cv2.CAP_PROP_POS_MSEC, starting_point_s * 1000)
    len_frames = int(vid_cap.get(cv2.CAP_PROP_FRAME_COUNT))
    starting_frame = math.floor(starting_point_s * camera_frame_rate)
    capture_frame_num = starting_frame

    # corrupted frames are skipped, so the loop bound is extended by their count
    while (len_frames + corrupted_frame_num) > capture_frame_num:
        got_frame, image = vid_cap.read()
        if not got_frame:
            if "video" in global_configurations.get_ignore_corrupted():
                # work around for camera has corrupted frame e.g.:GoPro
                corrupted_frame_num += 1
                capture_frame_num += 1
                continue
            else:
                logger.warning("Recording frame %d is corrupted.", capture_frame_num)
                break

        # print out where the processing is currently
        if print_processed_frame:
            if capture_frame_num % 50 == 0:
                print(f"Checking frame {capture_frame_num}...")

        analysis = FrameAnalysis(
            capture_frame_num, DPCTFQrDecoder(), max_qr_code_num_in_frame=3
        )
        rough_qr_code_areas = [[], []]
        # left half for mezzanine
        rough_qr_code_areas[0] = [0, 0, int(width / 2), height]
        # right half for test status
        rough_qr_code_areas[1] = [int(width / 2), 0, width, height]
        analysis.full_scan(image, rough_qr_code_areas, do_adaptive_threshold_scan)
        detected_qr_codes = analysis.all_codes()

        for detected_code in detected_qr_codes:
            if (
                isinstance(detected_code, PreTestDecodedQr)
                and not first_pre_test_found
                and starting_frame == 0
            ):
                # the pre-test time is only recorded when searching from frame 0
                first_pre_test_qr_time = capture_frame_num / camera_frame_rate * 1000
                first_pre_test_found = True
                logger.debug(
                    "First pre-test QR code is detected at time %f.",
                    first_pre_test_qr_time,
                )
            elif isinstance(detected_code, MezzanineDecodedQr) and not mezzanine_found:
                logger.debug(
                    "Frame Number=%d Location=%s",
                    detected_code.frame_number,
                    detected_code.location,
                )
                # convert location (start x, start y, width, height as used
                # here) into [x_start, y_start, x_end, y_end]
                new_qr_code_area = [
                    detected_code.location[0],
                    detected_code.location[1],
                    detected_code.location[0] + detected_code.location[2],
                    detected_code.location[1] + detected_code.location[3],
                ]
                if not qr_code_areas[0]:
                    qr_code_areas[0] = new_qr_code_area
                else:
                    # grow the accumulated area so it also covers the new code
                    if new_qr_code_area[0] < qr_code_areas[0][0]:
                        qr_code_areas[0][0] = new_qr_code_area[0]
                    if new_qr_code_area[1] < qr_code_areas[0][1]:
                        qr_code_areas[0][1] = new_qr_code_area[1]
                    if new_qr_code_area[2] > qr_code_areas[0][2]:
                        qr_code_areas[0][2] = new_qr_code_area[2]
                    if new_qr_code_area[3] > qr_code_areas[0][3]:
                        qr_code_areas[0][3] = new_qr_code_area[3]

                    # the full area is found once it exceeds the ratio times
                    # a single code's size in both directions
                    if (
                        (qr_code_areas[0][2] - qr_code_areas[0][0])
                        > QR_CODE_AREA_RATIO_TO_SIZE * detected_code.location[2]
                    ) and (
                        (qr_code_areas[0][3] - qr_code_areas[0][1])
                        > QR_CODE_AREA_RATIO_TO_SIZE * detected_code.location[3]
                    ):
                        mezzanine_found = True
                        logger.debug(
                            "Mezzanine QR code area is detected successfully at: %s.",
                            qr_code_areas[0],
                        )
            elif (
                isinstance(detected_code, TestStatusDecodedQr) and not test_status_found
            ):
                logger.debug(
                    "Status=%s Location=%s",
                    detected_code.status,
                    detected_code.location,
                )
                # a single test status code defines the whole test status area
                qr_code_areas[1] = [
                    detected_code.location[0],
                    detected_code.location[1],
                    detected_code.location[0] + detected_code.location[2],
                    detected_code.location[1] + detected_code.location[3],
                ]
                test_status_found = True
                logger.debug(
                    "Test status QR code area is detected successfully at: %s.",
                    qr_code_areas[1],
                )
            else:
                continue

        capture_frame_num += 1

        # finish when both mezzanine and test status area found
        if test_status_found and mezzanine_found:
            return first_pre_test_qr_time, qr_code_areas

        # finish when end_iter_frame_num reached
        if capture_frame_num > end_iter_frame_num:
            logger.debug(
                "End of configured search is reached, search until frame number %d.",
                end_iter_frame_num,
            )
            # if not full area found set to unknown
            if not mezzanine_found:
                qr_code_areas[0] = []
                logger.info("Mezzanine QR code areas not detected successfully.")
            if not test_status_found:
                logger.info("Test Runner QR code areas not detected successfully.")
            return first_pre_test_qr_time, qr_code_areas

    # if not full area found set to unknown
    logger.debug("End of recording is reached.")
    if not mezzanine_found:
        qr_code_areas[0] = []
        logger.debug("Mezzanine QR code areas not detected successfully.")
    if not test_status_found:
        logger.debug("Test Runner QR code areas not detected successfully.")
    return first_pre_test_qr_time, qr_code_areas
def get_qr_code_area(
    input_video_path_str: str,
    global_configurations: GlobalConfigurations,
    do_adaptive_threshold_scan: bool,
    print_processed_frame: bool,
) -> Tuple:
    """Get the QR code areas from a recording.

    The areas are used to crop the image when scanning for QR codes. When an
    area cannot be detected, or the search is disabled by a configured search
    limit of 0, the left half of the image is used for the mezzanine QR codes
    and the right half for the test status QR code.

    Args:
        input_video_path_str: input recording file
        global_configurations: to get configuration from
        do_adaptive_threshold_scan: passed through to the QR area search
        print_processed_frame: passed through to the QR area search

    Returns:
        first_pre_test_qr_time: first pre test qr code detection time in ms
        qr_code_areas: qr_code_areas to crop when detecting qr code
        pre_test_qr_code_area: qr_code_area to crop for pre test qr code

    Raises:
        Exception: if the recording file does not exist.
        OSError: if the recording cannot be read or is not a valid video.
        ValueError: if the configured search start lies beyond the recording.
    """
    logger.info("Search '%s' to get QR code location...", input_video_path_str)

    qr_code_areas = [[], []]
    first_pre_test_qr_time = 0

    # default start search from 0s and finished at configuration from config.ini
    starting_point_s = 0
    search_qr_area_to = global_configurations.get_search_qr_area_to()
    # read user input range parameter
    qr_search_range = global_configurations.get_qr_search_range()

    input_video_path = Path(input_video_path_str).resolve()
    if not input_video_path.is_file():
        raise Exception(f"Recorded file '{input_video_path}' not found")

    vid_cap = cv2.VideoCapture(input_video_path_str)
    # try/finally guarantees the capture is released on every exit path
    try:
        fps: float = vid_cap.get(cv2.CAP_PROP_FPS)
        width: int = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height: int = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # validate the stream before fps is used as a divisor below
        if fps < 1 or width <= 0 or height <= 0:
            # opening the file raises OSError itself when it is unreadable
            with open(input_video_path_str, "rb"):
                pass
            # File is readable but invalid.
            raise OSError(errno.EINVAL, "Video is invalid")

        # if user input range parameter defined then update defaults
        if qr_search_range:
            duration = int(vid_cap.get(cv2.CAP_PROP_FRAME_COUNT)) / fps
            starting_point_s = qr_search_range[1]
            qr_code_search_duration = qr_search_range[2]
            if starting_point_s > duration:
                raise ValueError("Starting point larger than recording duration.")
            search_qr_area_to = starting_point_s + qr_code_search_duration

        # a search limit of 0 disables the search entirely
        if search_qr_area_to != 0:
            first_pre_test_qr_time, qr_code_areas = iter_to_get_qr_area(
                vid_cap,
                fps,
                width,
                height,
                int(fps * search_qr_area_to),
                do_adaptive_threshold_scan,
                global_configurations,
                starting_point_s,
                print_processed_frame,
            )
    finally:
        vid_cap.release()

    # check qr_code_area
    # if not defined we just crop left half for mezzanine
    # right half for the test runner status
    qr_area_margin = global_configurations.get_qr_area_margin()
    for i, area in enumerate(qr_code_areas):
        if area:
            # widen the detected area by the margin, clamped to the image
            qr_code_areas[i] = [
                max(area[0] - qr_area_margin, 0),
                max(area[1] - qr_area_margin, 0),
                min(area[2] + qr_area_margin, width),
                min(area[3] + qr_area_margin, height),
            ]
        elif i == 0:
            # when qr_code_areas can not be detected
            # left half for mezzanine
            qr_code_areas[i] = [0, 0, int(width / 2), height]
            logger.info(
                "QR code area for full scan is set to "
                "left half of image for mezzanine QR code."
            )
        else:
            # right half for test status
            qr_code_areas[i] = [int(width / 2), 0, width, height]
            logger.info(
                "QR code area for full scan is set to "
                "right half of image for test status QR code."
            )

    # middle half of the image (first and last vertical quarter removed)
    pre_test_qr_code_area = [int(width / 4), 0, int((width / 4) * 3), height]

    return first_pre_test_qr_time, qr_code_areas, pre_test_qr_code_area
def run(
    input_video_files: List[str],
    log_manager: LogManager,
    global_configurations: GlobalConfigurations,
    do_adaptive_threshold_scan: bool,
    print_processed_frame: bool,
) -> None:
    """Run the observation framework process on the given recordings.

    Calibrates the camera and applies the calibration offset when a
    calibration file path is configured, detects the QR code areas on one
    recording, analyses every recording in order and finally renames the
    recordings to carry the session token.

    Args:
        input_video_files: recording file paths, in processing order.
        log_manager: log manager handed to the observation processor.
        global_configurations: to get configuration from
        do_adaptive_threshold_scan: True to scan with the additional
            adaptive threshold.
        print_processed_frame: True to print processing progress.

    Raises:
        Exception: if a recording file does not exist.
        OSError: if a recording cannot be read or is not a valid video.
    """
    logger.info("Device Observation Framework (V%s) analysis started!", VERSION)

    calibration_offset = 0
    calibration_file_path = global_configurations.get_calibration_file_path()
    if calibration_file_path:
        logger.info("Camera calibration started, please wait.")
        calibration_offset = calibrate_camera(
            calibration_file_path, global_configurations, print_processed_frame
        )
        logger.info(
            "Camera calibration offset %.2fms is applied to the observations.",
            calibration_offset,
        )

    observation_framework = None
    # the QR area search runs on the first recording unless a range is given
    file_index = 0
    qr_search_range = global_configurations.get_qr_search_range()
    if qr_search_range:
        file_index = qr_search_range[0]
    starting_camera_frame_number = 0

    if (global_configurations.get_system_mode()) == "debug":
        logger.info(
            "Device Observation Framework is running in 'debug' mode, "
            "it reads local configuration file from the 'configuration' folder "
            "and will not post results back to the Test Runner. "
            "The results will be saved locally. "
        )

    if do_adaptive_threshold_scan:
        logger.info("Intensive QR code scanning with an additional adaptiveThreshold.")

    (
        _first_pre_test_qr_time,
        qr_code_areas,
        pre_test_qr_code_area,
    ) = get_qr_code_area(
        input_video_files[file_index],
        global_configurations,
        do_adaptive_threshold_scan,
        print_processed_frame,
    )
    logger.info(
        "QR code area for full scan is set to %s for mezzanine QR code, and %s "
        "for test status QR code.",
        qr_code_areas[0],
        qr_code_areas[1],
    )

    # add pre-test qr code area
    # this removes the first and last vertical quarter of image
    if global_configurations.get_enable_cropped_scan_for_pre_test_qr():
        qr_code_areas.append(pre_test_qr_code_area)
        logger.info(
            "Additional QR code scan with roughly cropped area "
            "%s for pre-test QR code is enabled.",
            qr_code_areas[2],
        )

    for input_video_path_str in input_video_files:
        logger.info("Analysing recording '%s'.", input_video_path_str)

        input_video_path = Path(input_video_path_str).resolve()
        if not input_video_path.is_file():
            raise Exception(f"Recorded file '{input_video_path}' not found")

        vid_cap = cv2.VideoCapture(input_video_path_str)
        # try/finally guarantees the capture is released on every exit path
        try:
            fps: float = vid_cap.get(cv2.CAP_PROP_FPS)
            width: int = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height: int = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            if fps < 1 or width <= 0 or height <= 0:
                # opening the file raises OSError itself when it is unreadable
                with open(input_video_path_str, "rb"):
                    pass
                # File is readable but invalid.
                raise OSError(errno.EINVAL, "Video is invalid")

            # the processor is created once, with the first recording's fps
            if observation_framework is None:
                observation_framework = ObservationFrameworkProcessor(
                    calibration_offset,
                    log_manager,
                    global_configurations,
                    fps,
                    do_adaptive_threshold_scan,
                )

            observation_framework.extract_audio(
                input_video_path_str, starting_camera_frame_number
            )
            last_camera_frame_number = observation_framework.iter_qr_codes_in_video(
                vid_cap,
                starting_camera_frame_number,
                qr_code_areas,
                print_processed_frame,
            )
            starting_camera_frame_number += last_camera_frame_number
        finally:
            vid_cap.release()

    if observation_framework:
        for input_video_path_str in input_video_files:
            # resolve each recording's own path instead of reusing the path
            # left over from the last analysed recording
            rename_input_file(
                input_video_path_str,
                Path(input_video_path_str).resolve(),
                observation_framework.pre_test_qr_code.session_token,
            )

    logger.info("The Device Observation Framework analysis has ended.")
def clear_path(file_path: str, session_log_threshold: int) -> None:
    """Remove the oldest session folders once their number exceeds the threshold.

    log path: list of session log files and event files
    result path: list of session folders contain result file
    Only sub-directories of ``file_path`` are counted and removed; the oldest
    ones (by ``os.path.getctime``) are deleted first to release disk space.

    Args:
        file_path: folder to tidy up; ignored when it is not a directory.
        session_log_threshold: number of session folders to keep.
    """
    if not os.path.isdir(file_path):
        return

    session_dirs = []
    for entry in os.listdir(file_path):
        candidate = os.path.join(file_path, entry)
        if os.path.isdir(candidate):
            session_dirs.append(candidate)

    excess = len(session_dirs) - session_log_threshold
    if excess <= 0:
        return

    by_age = sorted(session_dirs, key=os.path.getctime)
    oldest = by_age[:excess]
    logger.info("Removing oldest %d file(s): %s!", excess, oldest)
    for stale_dir in oldest:
        shutil.rmtree(stale_dir)
def clear_up(global_configurations: GlobalConfigurations) -> None:
    """Tidy the log and result folders to release disk space.

    Args:
        global_configurations: to get threshold and path to clear
    """
    paths_to_clear = (
        global_configurations.get_log_file_path(),
        global_configurations.get_result_file_path(),
    )
    keep_count = global_configurations.get_session_log_threshold()
    for path_to_clear in paths_to_clear:
        clear_path(path_to_clear, keep_count)
def check_python_version() -> bool:
    """Check minimum Python version is being used.

    The version is compared as a tuple so that any release from 3.9 onwards
    is accepted, including releases with a higher major version.

    Returns:
        True if version is OK.
    """
    if sys.version_info >= (3, 9):
        return True

    logger.critical(
        "Aborting! Python version 3.9 or greater is required.\nCurrent Python version is %d.%d.",
        sys.version_info.major,
        sys.version_info.minor,
    )
    return False
def process_input_video_files(
    input_str: str, global_configurations: "GlobalConfigurations"
) -> list:
    """Collect the recording files to analyse from the input string.

    When the input is a directory every file in it is taken, except audio
    files with a ".wav" extension; entries that are not files are skipped
    with a warning. Otherwise the input itself is the only recording.

    Args:
        input_str: recording file or directory containing recordings.
        global_configurations: to get the sort order from

    Returns:
        List of recording file paths, sorted by file name when the
        configuration asks for "filename", otherwise by modification time.
    """
    input_video_files = []
    # resolve() already returns an absolute path
    input_video_path = Path(input_str).resolve()

    if input_video_path.is_dir():
        for input_file in os.listdir(input_video_path):
            # skip audio files
            if input_file.lower().endswith(".wav"):
                continue
            full_path = os.path.join(input_video_path, input_file)
            if Path(full_path).resolve().is_file():
                input_video_files.append(full_path)
            else:
                logger.warning("%s is not a file, skipped!", full_path)
    else:
        input_video_files.append(str(input_video_path))

    # sort input files based on the configuration
    sort_input_files_by = global_configurations.get_sort_input_files_by()
    if sort_input_files_by == "filename":
        # sort in place; sorted() would only return a new list
        input_video_files.sort()
    else:
        input_video_files.sort(key=os.path.getmtime)

    return input_video_files
def process_run(
    input_str: str,
    log_manager: LogManager,
    global_configurations: GlobalConfigurations,
    do_adaptive_threshold_scan: bool,
):
    """Run the analysis and handle exceptions.

    Any exception raised by the analysis is logged, the log and result
    folders are tidied up and the process exits with status 1. On success
    the folders are tidied up and the function returns normally.

    Args:
        input_str: recording file or directory containing recordings.
        log_manager: log manager handed to the analysis.
        global_configurations: to get configuration from
        do_adaptive_threshold_scan: True to scan with the additional
            adaptive threshold.
    """

    def _clean_up_and_exit() -> None:
        # shared failure path: release disk space, then stop with an error code
        clear_up(global_configurations)
        sys.exit(1)

    recordings = process_input_video_files(input_str, global_configurations)

    try:
        # the last argument enables printing of the processed frame numbers
        run(
            recordings,
            log_manager,
            global_configurations,
            do_adaptive_threshold_scan,
            True,
        )
    except ObsFrameTerminate as error:
        logger.exception(
            "Serious error is detected!\n%s\nSystem is terminating!",
            error,
            exc_info=False,
        )
        _clean_up_and_exit()
    except (ConfigError, ObsFrameError) as error:
        logger.exception("Serious error detected!\n%s", error, exc_info=False)
        _clean_up_and_exit()
    except Exception as error:
        logger.exception(
            "Serious error is detected!\n%s: %s", error, traceback.format_exc()
        )
        _clean_up_and_exit()

    logger.info(
        "The Device Observation Framework has completed the analysis of all selected recordings, "
        "The Device Observation Framework is exiting."
    )
    clear_up(global_configurations)
def main() -> None:
    """Entry point.

    Parses the command line, stores the options in the global configuration,
    sets up logging, checks the Python version and starts the analysis.
    """
    arg_parser = argparse.ArgumentParser(
        description=f"DPCTF Device Observation Framework (v{VERSION})"
    )
    arg_parser.add_argument(
        "--input",
        required=True,
        help="Input recording file / path to analyse.",
    )
    arg_parser.add_argument(
        "--log",
        nargs="+",  # Allow 1 or 2 values
        help="Logging levels for log file writing and console output.",
        default=["info", "info"],
        choices=["info", "debug"],
    )
    arg_parser.add_argument(
        "--scan",
        help="Scan depth for QR code detection.",
        default="general",
        choices=["general", "intensive"],
    )
    arg_parser.add_argument(
        "--mode",
        help="System mode is for development purposes only.",
        default="",
        choices=["", "debug"],
    )
    arg_parser.add_argument(
        "--ignore_corrupted",
        help="Specific condition to ignore.",
        default="",
    )
    arg_parser.add_argument(
        "--range",
        help="Search QR codes to crop the QR code area for better detection. "
        "QR codes area detection includes mezzanine QR codes and Test Status QR code.",
        default="",
        metavar="id(file_index):start(s):duration(s)",
    )
    arg_parser.add_argument(
        "--calibration",
        help="Camera calibration recording file path.",
        default="",
    )
    options = arg_parser.parse_args()

    # the "intensive" scan depth enables the adaptive threshold scan
    intensive_scan = options.scan == "intensive"

    configurations = GlobalConfigurations()
    configurations.set_ignore_corrupted(options.ignore_corrupted)
    configurations.set_system_mode(options.mode)
    configurations.set_qr_search_range(options.range)
    configurations.set_calibration_file_path(options.calibration)

    events_log_file = configurations.get_log_file_path() + "/events.log"

    # a single --log value is used for both logging levels
    log_levels = options.log
    if len(log_levels) == 1:
        log_levels = [log_levels[0], log_levels[0]]
    log_manager = LogManager(events_log_file, log_levels[0], log_levels[1])

    if not check_python_version():
        sys.exit(1)

    process_run(options.input, log_manager, configurations, intensive_scan)
# Start the framework only when this file is executed as a script.
if __name__ == "__main__":
    main()