diff --git a/CHANGELOG.md b/CHANGELOG.md
index 29dd5c9..e7c8a07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 [markdownlint](https://dlaa.me/markdownlint/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.1.0] - 2023-12-21
+
+### Changed in 1.1.0
+
+- Code style, error handling, and concurrency updates across the Python snippets
+
 ## [1.0.2] - 2023-03-08
 
 ### Removed in 1.0.2
diff --git a/Python/Tasks/Deleting/DeleteFutures.py b/Python/Tasks/Deleting/DeleteFutures.py
index 36123cd..eabdb74 100755
--- a/Python/Tasks/Deleting/DeleteFutures.py
+++ b/Python/Tasks/Deleting/DeleteFutures.py
@@ -6,21 +6,27 @@ import os
 import sys
 import time
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def del_record(engine, rec_to_del):
     record_dict = json.loads(rec_to_del)
-    data_source = record_dict.get('DATA_SOURCE', None)
-    record_id = record_dict.get('RECORD_ID', None)
+    data_source = record_dict.get("DATA_SOURCE", None)
+    record_id = record_dict.get("RECORD_ID", None)
     engine.deleteRecord(data_source, record_id)
 
 
@@ -28,16 +34,20 @@ def engine_stats(engine):
     response = bytearray()
     try:
         engine.stats(response)
-        print(f'\n{response.decode()}\n')
-    except G2RetryableException as ex:
-        mock_logger('WARN', ex)
-    except (G2UnrecoverableException, G2Exception) as ex:
-        mock_logger('CRITICAL', ex)
+        print(f"\n{response.decode()}\n")
+    except G2RetryableException as err:
+        mock_logger("WARN", err)
+    except G2Exception as err:
+        mock_logger("CRITICAL", err)
         raise
 
 
-def record_stats(success_recs, prev_time):
-    print(f'Processed {success_recs} deletes, {int(1000 / (time.time() - prev_time))} records per second')
+def record_stats(success, error, prev_time):
+    print(
+        f"Processed {success:,} deletes,"
+        f" {int(1000 / (time.time() - prev_time)):,} records per second,"
+        f" {error} errors"
+    )
     return time.time()
 
 
@@ -45,49 +55,57 @@ def futures_del(engine, input_file):
     prev_time = time.time()
     success_recs = error_recs = 0
 
-    with open(input_file, 'r') as in_file:
+    with open(input_file, "r") as in_file:
         with concurrent.futures.ThreadPoolExecutor() as executor:
-            futures = {executor.submit(del_record, engine, record): record for record in itertools.islice(in_file, executor._max_workers)}
+            futures = {
+                executor.submit(del_record, engine, record): record
+                for record in itertools.islice(in_file, executor._max_workers)
+            }
 
             while futures:
-                for f in concurrent.futures.as_completed(futures.keys()):
+                done, _ = concurrent.futures.wait(
+                    futures, return_when=concurrent.futures.FIRST_COMPLETED
+                )
+                for f in done:
                     try:
                         f.result()
-                    except G2BadInputException as ex:
-                        mock_logger('ERROR', ex, futures[f])
+                    except (G2BadInputException, json.JSONDecodeError) as err:
+                        mock_logger("ERROR", err, futures[f])
                         error_recs += 1
-                    except G2RetryableException as ex:
-                        mock_logger('WARN', ex, futures[f])
+                    except G2RetryableException as err:
+                        mock_logger("WARN", err, futures[f])
                         error_recs += 1
-                    except (G2UnrecoverableException, G2Exception) as ex:
-                        mock_logger('CRITICAL', ex, futures[f])
+                    except (G2UnrecoverableException, G2Exception) as err:
+                        mock_logger("CRITICAL", err, futures[f])
                         raise
-                    except json.JSONDecodeError as ex:
-                        mock_logger('ERROR', ex, futures[f])
-                        error_recs += 1
                     else:
-                        success_recs += 1
+                        record = in_file.readline()
+                        if record:
+                            futures[executor.submit(del_record, engine, record)] = (
+                                record
+                            )
 
+                        success_recs += 1
                         if success_recs % 1000 == 0:
-                            prev_time = record_stats(success_recs, prev_time)
+                            prev_time = record_stats(
+                                success_recs, error_recs, prev_time
+                            )
 
                             if success_recs % 10000 == 0:
                                 engine_stats(engine)
                     finally:
-                        futures.pop(f)
-
-                record = in_file.readline()
-                if record:
-                    futures[executor.submit(del_record, engine, record)] = record
+                        del futures[f]
 
-    print(f'Successfully deleted {success_recs} records, with {error_recs} errors')
+    print(
+        f"Successfully deleted {success_recs:,} records, with"
+        f" {error_recs:,} errors"
+    )
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
-    futures_del(g2_engine, '../../../Resources/Data/del-10K.json')
+    g2_engine.init("G2Engine", engine_config_json, False)
+    futures_del(g2_engine, "../../../Resources/Data/del-10K.json")
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
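Editor's note: the change to DeleteFutures.py above replaces `as_completed` over a fixed batch with `concurrent.futures.wait(..., FIRST_COMPLETED)` plus a one-for-one backfill from the input file, so a bounded number of records stays in flight for the whole run. A minimal, self-contained sketch of that pattern, assuming a generic `work` callable and an exhaustible `source` iterator (both illustrative placeholders, not part of the snippets):

```python
import concurrent.futures
import itertools


def run_bounded(work, source, max_in_flight=8):
    """Keep at most max_in_flight tasks running; backfill as each one finishes."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        # Prime the pool with one task per worker.
        futures = {
            executor.submit(work, item): item
            for item in itertools.islice(source, max_in_flight)
        }
        while futures:
            # Wake as soon as any task completes instead of waiting for the batch.
            done, _ = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                try:
                    future.result()
                finally:
                    del futures[future]
                # Backfill one task so the pool stays full until the source drains.
                for item in itertools.islice(source, 1):
                    futures[executor.submit(work, item)] = item


# Example usage: run_bounded(print, iter(range(100)), max_in_flight=4)
```

The advantage over submitting the whole file up front is memory: only `max_in_flight` records are ever held in the futures map.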
diff --git a/Python/Tasks/Deleting/DeleteLoop.py b/Python/Tasks/Deleting/DeleteLoop.py
index 82100d4..67315d8 100755
--- a/Python/Tasks/Deleting/DeleteLoop.py
+++ b/Python/Tasks/Deleting/DeleteLoop.py
@@ -3,51 +3,56 @@ import json
 import os
 import sys
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def del_records_from_file(engine, input_file):
-    success_recs = 0
+    success_recs = error_recs = 0
 
-    with open(input_file, 'r') as file:
+    with open(input_file, "r") as file:
         for rec_to_add in file:
             try:
                 record_dict = json.loads(rec_to_add)
-                data_source = record_dict.get('DATA_SOURCE', None)
-                record_id = record_dict.get('RECORD_ID', None)
+                data_source = record_dict.get("DATA_SOURCE", None)
+                record_id = record_dict.get("RECORD_ID", None)
                 engine.deleteRecord(data_source, record_id, rec_to_add)
-            except G2BadInputException as ex:
-                mock_logger('ERROR', ex, rec_to_add)
-            except G2RetryableException as ex:
-                mock_logger('WARN', ex, rec_to_add)
-            except (G2UnrecoverableException, G2Exception) as ex:
-                mock_logger('CRITICAL', ex, rec_to_add)
+            except (G2BadInputException, json.JSONDecodeError) as err:
+                mock_logger("ERROR", err, rec_to_add)
+                error_recs += 1
+            except G2RetryableException as err:
+                mock_logger("WARN", err, rec_to_add)
+                error_recs += 1
+            except (G2UnrecoverableException, G2Exception) as err:
+                mock_logger("CRITICAL", err, rec_to_add)
                 raise
-            except json.JSONDecodeError as ex:
-                mock_logger('ERROR', ex, rec_to_add)
             else:
                 success_recs += 1
 
                 if success_recs % 1000 == 0:
-                    print(f'Processed {success_recs} deletes')
+                    print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors")
 
-    print(f'Successfully deleted {success_recs} records')
+    print(f"Successfully deleted {success_recs:,} records, with {error_recs:,} errors")
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
-    del_records_from_file(g2_engine, '../../../Resources/Data/del-10K.json')
+    g2_engine.init("G2Engine", engine_config_json, False)
+    del_records_from_file(g2_engine, "../../../Resources/Data/del-10K.json")
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
diff --git a/Python/Tasks/Deleting/DeleteWithInfoFutures.py b/Python/Tasks/Deleting/DeleteWithInfoFutures.py
index a49d69b..1f1f6bc 100755
--- a/Python/Tasks/Deleting/DeleteWithInfoFutures.py
+++ b/Python/Tasks/Deleting/DeleteWithInfoFutures.py
@@ -6,40 +6,50 @@ import os
 import sys
 import time
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def del_record(engine, rec_to_del):
     with_info = bytearray()
     record_dict = json.loads(rec_to_del)
-    data_source = record_dict.get('DATA_SOURCE', None)
-    record_id = record_dict.get('RECORD_ID', None)
+    data_source = record_dict.get("DATA_SOURCE", None)
+    record_id = record_dict.get("RECORD_ID", None)
     engine.deleteRecordWithInfo(data_source, record_id, with_info)
-    return with_info.decode() + '\n'
+    return with_info.decode()
 
 
 def engine_stats(engine):
     response = bytearray()
     try:
         engine.stats(response)
-        print(f'\n{response.decode()}\n')
-    except G2RetryableException as ex:
-        mock_logger('WARN', ex)
-    except (G2UnrecoverableException, G2Exception) as ex:
-        mock_logger('CRITICAL', ex)
+        print(f"\n{response.decode()}\n")
+    except G2RetryableException as err:
+        mock_logger("WARN", err)
+    except G2Exception as err:
+        mock_logger("CRITICAL", err)
         raise
 
 
-def record_stats(success_recs, prev_time):
-    print(f'Processed {success_recs} deletes, {int(1000 / (time.time() - prev_time))} records per second')
+def record_stats(success, error, prev_time):
+    print(
+        f"Processed {success:,} deletes,"
+        f" {int(1000 / (time.time() - prev_time)):,} records per second,"
+        f" {error} errors"
+    )
     return time.time()
 
 
@@ -47,55 +57,65 @@ def futures_del(engine, input_file, output_file):
     prev_time = time.time()
     success_recs = error_recs = 0
 
-    with open(output_file, 'w') as out_file:
-        with open(input_file, 'r') as in_file:
+    with open(output_file, "w") as out_file:
+        with open(input_file, "r") as in_file:
             with concurrent.futures.ThreadPoolExecutor() as executor:
-                futures = {executor.submit(del_record, engine, record): record for record in itertools.islice(in_file, executor._max_workers)}
+                futures = {
+                    executor.submit(del_record, engine, record): record
+                    for record in itertools.islice(in_file, executor._max_workers)
+                }
 
                 while futures:
-                    for f in concurrent.futures.as_completed(futures.keys()):
+                    done, _ = concurrent.futures.wait(
+                        futures, return_when=concurrent.futures.FIRST_COMPLETED
+                    )
+                    for f in done:
                         try:
                             result = f.result()
-                        except G2BadInputException as ex:
-                            mock_logger('ERROR', ex, futures[f])
+                        except (G2BadInputException, json.JSONDecodeError) as err:
+                            mock_logger("ERROR", err, futures[f])
                             error_recs += 1
-                        except G2RetryableException as ex:
-                            mock_logger('WARN', ex, futures[f])
+                        except G2RetryableException as err:
+                            mock_logger("WARN", err, futures[f])
                             error_recs += 1
-                        except (G2UnrecoverableException, G2Exception) as ex:
-                            mock_logger('CRITICAL', ex, futures[f])
+                        except (G2UnrecoverableException, G2Exception) as err:
+                            mock_logger("CRITICAL", err, futures[f])
                             raise
-                        except json.JSONDecodeError as ex:
-                            mock_logger('ERROR', ex, futures[f])
-                            error_recs += 1
                         else:
-                            success_recs += 1
-                            out_file.write(result)
+                            record = in_file.readline()
+                            if record:
+                                futures[executor.submit(del_record, engine, record)] = (
+                                    record
+                                )
+
+                            out_file.write(f"{result}\n")
+                            success_recs += 1
                             if success_recs % 1000 == 0:
-                                prev_time = record_stats(success_recs, prev_time)
+                                prev_time = record_stats(
+                                    success_recs, error_recs, prev_time
+                                )
 
                                 if success_recs % 10000 == 0:
                                     engine_stats(engine)
                         finally:
-                            futures.pop(f)
-
-                    record = in_file.readline()
-                    if record:
-                        futures[executor.submit(del_record, engine, record)] = record
+                            del futures[f]
 
-    print(f'Successfully deleted {success_recs} records, with {error_recs} errors')
-    print(f'With info responses written to {output_file}')
+    print(
+        f"Successfully deleted {success_recs:,} records, with"
+        f" {error_recs:,} errors"
+    )
+    print(f"With info responses written to {output_file}")
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
+    g2_engine.init("G2Engine", engine_config_json, False)
     futures_del(
         g2_engine,
-        '../../../Resources/Data/del-10K.json',
-        '../../../Resources/Output/Del_File_WithInfo.json')
+        "../../../Resources/Data/del-10K.json",
+        "../../../Resources/Output/Del_File_WithInfo.json",
+    )
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
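Editor's note: `deleteRecordWithInfo` fills the `with_info` bytearray with a JSON document describing the entities affected by the delete, and the refactored snippet now writes one document per line. A hedged sketch of reading such a file back; the `AFFECTED_ENTITIES`/`ENTITY_ID` field names follow Senzing's documented with-info response shape, but treat them as an assumption here:

```python
import json

# One with-info JSON document per line, as written by the snippet above.
with open("../../../Resources/Output/Del_File_WithInfo.json", "r") as with_info_file:
    for line in with_info_file:
        info = json.loads(line)
        # AFFECTED_ENTITIES lists the entities touched by the operation.
        for entity in info.get("AFFECTED_ENTITIES", []):
            print(entity.get("ENTITY_ID"))
```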
diff --git a/Python/Tasks/Deleting/README.md b/Python/Tasks/Deleting/README.md
index 9984ced..36ea09d 100644
--- a/Python/Tasks/Deleting/README.md
+++ b/Python/Tasks/Deleting/README.md
@@ -12,8 +12,3 @@ Deleting a record only requires the data source code and record ID for the recor
 * Read and delete source records from a file using multiple threads
 * Collect the response from the [with info](../../../README.md#with-info) version of the API and write it to a file
 
-## API Calls
-* [deleteRecord](https://github.com/antaenc/senzing-code-snippets/blob/f2556a2152a4524780f63c1e66a868f53419dd60/Python/APIs/G2Engine/Data_Manipulation/deleteRecord.py)
-  * Deletes a single record
-* [deleteRecordWithInfo](https://github.com/antaenc/senzing-code-snippets/blob/f2556a2152a4524780f63c1e66a868f53419dd60/Python/APIs/G2Engine/Data_Manipulation/deleteRecordWithInfo.py)
-  * Deletes a single record and returns information outlining any entities affected by the deletion of the record. For further information see [with info](../../../README.md#with-info)
diff --git a/Python/Tasks/Initialization/G2ModuleIniToJson.py b/Python/Tasks/Initialization/G2ModuleIniToJson.py
index fa0576f..af6c524 100755
--- a/Python/Tasks/Initialization/G2ModuleIniToJson.py
+++ b/Python/Tasks/Initialization/G2ModuleIniToJson.py
@@ -1,19 +1,15 @@
 #! /usr/bin/env python3
 
 import configparser
-import os
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+ini_file_name = "../../../Resources/G2Module/G2Module.ini"
+engine_config_json = {}
 
-if not engine_config_json:
-    ini_file_name = '../../../Resources/G2Module/G2Module.ini'
-    engine_config_json = {}
+cfgp = configparser.ConfigParser()
+cfgp.optionxform = str
+cfgp.read(ini_file_name)
 
-    cfgp = configparser.ConfigParser()
-    cfgp.optionxform = str
-    cfgp.read(ini_file_name)
-
-    for section in cfgp.sections():
-        engine_config_json[section] = dict(cfgp.items(section))
+for section in cfgp.sections():
+    engine_config_json[section] = dict(cfgp.items(section))
 
 print(engine_config_json)
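Editor's note: the simplified G2ModuleIniToJson.py above prints the converted configuration as a Python dict. If a caller needs the configuration as an actual JSON string (for example, to export as `SENZING_ENGINE_CONFIGURATION_JSON`), one extra `json.dumps` step does it. A sketch under that assumption, with the INI path shortened for illustration:

```python
import configparser
import json

ini_file_name = "G2Module.ini"  # illustrative path
engine_config = {}

cfgp = configparser.ConfigParser()
cfgp.optionxform = str  # preserve key case, as the script above does
cfgp.read(ini_file_name)

for section in cfgp.sections():
    engine_config[section] = dict(cfgp.items(section))

# Serialize to a JSON string suitable for the environment variable.
print(json.dumps(engine_config))
```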
(y/n) """ + +if input(purge_msg) not in ["y", "Y", "yes", "YES"]: + sys.exit() + +try: + g2_engine = G2Engine() + g2_engine.init("G2Engine", engine_config_json, False) + g2_engine.purgeRepository() + print("Senzing repository purged") + g2_engine.destroy() +except G2Exception as err: + print(err) diff --git a/Python/Tasks/Initialization/README.md b/Python/Tasks/Initialization/README.md index 4fa8bb9..e889073 100644 --- a/Python/Tasks/Initialization/README.md +++ b/Python/Tasks/Initialization/README.md @@ -4,4 +4,4 @@ * G2ModuleIniToJson.py * The snippets herein utilize the `SENZING_ENGINE_CONFIGURATION_JSON` environment variable for Senzing engine object initialization * If you are familiar with working with a Senzing project you will be aware the same configuration data is held in the G2Module.ini file - * Example to check for `SENZING_ENGINE_CONFIGURATION_JSON` and if not present convert a G2Module.ini file to JSON to use on engine object initialization calls + * Example to convert G2Module.ini to `SENZING_ENGINE_CONFIGURATION_JSON` JSON to use on engine object initialization calls diff --git a/Python/Tasks/Loading/Add100KFutures.py b/Python/Tasks/Loading/Add100KFutures.py index 5bfa622..9a9babc 100755 --- a/Python/Tasks/Loading/Add100KFutures.py +++ b/Python/Tasks/Loading/Add100KFutures.py @@ -6,21 +6,28 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) + +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def add_record(engine, rec_to_add): record_dict = json.loads(rec_to_add) - data_source = record_dict.get('DATA_SOURCE', None) - record_id = record_dict.get('RECORD_ID', None) + data_source = record_dict.get("DATA_SOURCE", None) + record_id = record_dict.get("RECORD_ID", None) engine.addRecord(data_source, record_id, rec_to_add) @@ -28,16 +35,20 @@ def engine_stats(engine): response = bytearray() try: engine.stats(response) - print(f'\n{response.decode()}\n') - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print(f"\n{response.decode()}\n") + except G2RetryableException as err: + mock_logger("WARN", err) + except G2Exception as err: + mock_logger("CRITICAL", err) raise -def record_stats(success_recs, prev_time): - print(f'Processed {success_recs} adds, {int(1000 / (time.time() - prev_time))} records per second') +def record_stats(success, error, prev_time): + print( + f"Processed {success:,} adds," + f" {int(1000 / (time.time() - prev_time)):,} records per second," + f" {error} errors" + ) return time.time() @@ -45,50 +56,57 @@ def futures_add(engine, input_file): prev_time = time.time() success_recs = error_recs = 0 - with open(input_file, 'r') as file: + with open(input_file, "r") as file: with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {executor.submit(add_record, engine, record): record for record in itertools.islice(file, executor._max_workers)} + futures = { + executor.submit(add_record, 
engine, record): record + for record in itertools.islice(file, executor._max_workers) + } while futures: - for f in concurrent.futures.as_completed(futures.keys()): + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for f in done: try: f.result() - except G2BadInputException as ex: - mock_logger('ERROR', ex, futures[f]) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, futures[f]) error_recs += 1 - except G2RetryableException as ex: - mock_logger('WARN', ex, futures[f]) + except G2RetryableException as err: + mock_logger("WARN", err, futures[f]) error_recs += 1 - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, futures[f]) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, futures[f]) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, futures[f]) - error_recs += 1 else: - success_recs += 1 + record = file.readline() + if record: + futures[executor.submit(add_record, engine, record)] = ( + record + ) + success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats(success_recs, prev_time) + prev_time = record_stats( + success_recs, error_recs, prev_time + ) if success_recs % 10000 == 0: engine_stats(engine) finally: - futures.pop(f) - - record = file.readline() - if record: - futures[executor.submit(add_record, engine, record)] = record + del futures[f] - print(f'Successfully loaded {success_recs} records, with {error_recs} errors') + print( + f"Successfully loaded {success_recs:,} records, with" + f" {error_recs:,} errors" + ) try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) - futures_add(g2_engine, '../../../Resources/Data/load-100k.json') - + g2_engine.init("G2Engine", engine_config_json, False) + futures_add(g2_engine, "../../../Resources/Data/load-100k.json") g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Loading/Add10KFutures.py b/Python/Tasks/Loading/Add10KFutures.py index baaaa53..248e7c0 100755 --- a/Python/Tasks/Loading/Add10KFutures.py +++ b/Python/Tasks/Loading/Add10KFutures.py @@ -6,21 +6,28 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) + +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def add_record(engine, rec_to_add): record_dict = json.loads(rec_to_add) - data_source = record_dict.get('DATA_SOURCE', None) - record_id = record_dict.get('RECORD_ID', None) + data_source = record_dict.get("DATA_SOURCE", None) + record_id = record_dict.get("RECORD_ID", None) engine.addRecord(data_source, record_id, rec_to_add) @@ -28,16 +35,20 @@ def engine_stats(engine): response = bytearray() try: engine.stats(response) - print(f'\n{response.decode()}\n') - except G2RetryableException as ex: - mock_logger('WARN', ex) - 
diff --git a/Python/Tasks/Loading/Add10KFutures.py b/Python/Tasks/Loading/Add10KFutures.py
index baaaa53..248e7c0 100755
--- a/Python/Tasks/Loading/Add10KFutures.py
+++ b/Python/Tasks/Loading/Add10KFutures.py
@@ -6,21 +6,28 @@ import os
 import sys
 import time
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
+
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def add_record(engine, rec_to_add):
     record_dict = json.loads(rec_to_add)
-    data_source = record_dict.get('DATA_SOURCE', None)
-    record_id = record_dict.get('RECORD_ID', None)
+    data_source = record_dict.get("DATA_SOURCE", None)
+    record_id = record_dict.get("RECORD_ID", None)
     engine.addRecord(data_source, record_id, rec_to_add)
 
 
@@ -28,16 +35,20 @@ def engine_stats(engine):
     response = bytearray()
     try:
         engine.stats(response)
-        print(f'\n{response.decode()}\n')
-    except G2RetryableException as ex:
-        mock_logger('WARN', ex)
-    except (G2UnrecoverableException, G2Exception) as ex:
-        mock_logger('CRITICAL', ex)
+        print(f"\n{response.decode()}\n")
+    except G2RetryableException as err:
+        mock_logger("WARN", err)
+    except G2Exception as err:
+        mock_logger("CRITICAL", err)
         raise
 
 
-def record_stats(success_recs, prev_time):
-    print(f'Processed {success_recs} adds, {int(1000 / (time.time() - prev_time))} records per second')
+def record_stats(success, error, prev_time):
+    print(
+        f"Processed {success:,} adds,"
+        f" {int(1000 / (time.time() - prev_time)):,} records per second,"
+        f" {error} errors"
+    )
     return time.time()
 
 
@@ -45,50 +56,57 @@ def futures_add(engine, input_file):
     prev_time = time.time()
     success_recs = error_recs = 0
 
-    with open(input_file, 'r') as file:
+    with open(input_file, "r") as file:
         with concurrent.futures.ThreadPoolExecutor() as executor:
-            futures = {executor.submit(add_record, engine, record): record for record in itertools.islice(file, executor._max_workers)}
+            futures = {
+                executor.submit(add_record, engine, record): record
+                for record in itertools.islice(file, executor._max_workers)
+            }
 
            while futures:
-                for f in concurrent.futures.as_completed(futures.keys()):
+                done, _ = concurrent.futures.wait(
+                    futures, return_when=concurrent.futures.FIRST_COMPLETED
+                )
+                for f in done:
                    try:
                         f.result()
-                    except G2BadInputException as ex:
-                        mock_logger('ERROR', ex, futures[f])
+                    except (G2BadInputException, json.JSONDecodeError) as err:
+                        mock_logger("ERROR", err, futures[f])
                         error_recs += 1
-                    except G2RetryableException as ex:
-                        mock_logger('WARN', ex, futures[f])
+                    except G2RetryableException as err:
+                        mock_logger("WARN", err, futures[f])
                         error_recs += 1
-                    except (G2UnrecoverableException, G2Exception) as ex:
-                        mock_logger('CRITICAL', ex, futures[f])
+                    except (G2UnrecoverableException, G2Exception) as err:
+                        mock_logger("CRITICAL", err, futures[f])
                         raise
-                    except json.JSONDecodeError as ex:
-                        mock_logger('ERROR', ex, futures[f])
-                        error_recs += 1
                     else:
-                        success_recs += 1
+                        record = file.readline()
+                        if record:
+                            futures[executor.submit(add_record, engine, record)] = (
+                                record
+                            )
 
+                        success_recs += 1
                         if success_recs % 1000 == 0:
-                            prev_time = record_stats(success_recs, prev_time)
+                            prev_time = record_stats(
+                                success_recs, error_recs, prev_time
+                            )
 
                             if success_recs % 10000 == 0:
                                 engine_stats(engine)
                     finally:
-                        futures.pop(f)
-
-                record = file.readline()
-                if record:
-                    futures[executor.submit(add_record, engine, record)] = record
+                        del futures[f]
 
-    print(f'Successfully loaded {success_recs} records, with {error_recs} errors')
+    print(
+        f"Successfully loaded {success_recs:,} records, with"
+        f" {error_recs:,} errors"
+    )
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
-    futures_add(g2_engine, '../../../Resources/Data/load-10K.json')
-
+    g2_engine.init("G2Engine", engine_config_json, False)
+    futures_add(g2_engine, "../../../Resources/Data/load-10K.json")
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
diff --git a/Python/Tasks/Loading/Add10KQueue.py b/Python/Tasks/Loading/Add10KQueue.py
index 3fe860b..f06520a 100755
--- a/Python/Tasks/Loading/Add10KQueue.py
+++ b/Python/Tasks/Loading/Add10KQueue.py
@@ -6,21 +6,27 @@ import sys
 import time
 from multiprocessing import Process, Queue
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def add_record(engine, rec_to_add):
     record_dict = json.loads(rec_to_add)
-    data_source = record_dict.get('DATA_SOURCE', None)
-    record_id = record_dict.get('RECORD_ID', None)
+    data_source = record_dict.get("DATA_SOURCE", None)
+    record_id = record_dict.get("RECORD_ID", None)
     engine.addRecord(data_source, record_id, rec_to_add)
 
 
@@ -28,25 +34,29 @@ def engine_stats(engine):
     response = bytearray()
     try:
         engine.stats(response)
-        print(f'\n{response.decode()}\n')
-    except G2RetryableException as ex:
-        mock_logger('WARN', ex)
-    except (G2UnrecoverableException, G2Exception) as ex:
-        mock_logger('CRITICAL', ex)
+        print(f"\n{response.decode()}\n")
+    except G2RetryableException as err:
+        mock_logger("WARN", err)
+    except G2Exception as err:
+        mock_logger("CRITICAL", err)
         raise
 
 
-def record_stats(success_recs, prev_time):
-    print(f'Processed {success_recs} adds, {int(1000 / (time.time() - prev_time))} records per second')
+def record_stats(success, error, prev_time):
+    print(
+        f"Processed {success:,} adds,"
+        f" {int(1000 / (time.time() - prev_time)):,} records per second,"
+        f" {error} errors"
+    )
     return time.time()
 
 
 def producer(input_file, queue):
-    with open(input_file, 'r') as file:
+    with open(input_file, "r") as file:
         for record in file:
             queue.put(record, block=True)
 
-    print(f'All records read from {input_file}')
+    print(f"All records read from {input_file}")
 
 
 def consumer(engine, queue):
@@ -54,57 +64,72 @@ def consumer(engine, queue):
     success_recs = error_recs = 0
 
     with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(add_record, engine, queue.get()): _ for _ in range(executor._max_workers)}
+        futures = {
+            executor.submit(add_record, engine, queue.get()): _
+            for _ in range(executor._max_workers)
+        }
 
         while futures:
-            for f in concurrent.futures.as_completed(futures.keys()):
+            done, _ = concurrent.futures.wait(
+                futures, return_when=concurrent.futures.FIRST_COMPLETED
+            )
+            for f in done:
                 try:
                     f.result()
-                except G2BadInputException as ex:
-                    mock_logger('ERROR', ex, futures[f])
+                except (G2BadInputException, json.JSONDecodeError) as err:
+                    mock_logger("ERROR", err, futures[f])
                     error_recs += 1
-                except G2RetryableException as ex:
-                    mock_logger('WARN', ex, futures[f])
+                except G2RetryableException as err:
+                    mock_logger("WARN", err, futures[f])
                     error_recs += 1
-                except (G2UnrecoverableException, G2Exception) as ex:
-                    mock_logger('CRITICAL', ex, futures[f])
+                except (G2UnrecoverableException, G2Exception) as err:
+                    mock_logger("CRITICAL", err, futures[f])
                     raise
-                except json.JSONDecodeError as ex:
-                    mock_logger('ERROR', ex, futures[f])
-                    error_recs += 1
                 else:
-                    success_recs += 1
+                    if not queue.empty():
+                        record = queue.get()
+                        futures[executor.submit(add_record, engine, record)] = record
 
+                    success_recs += 1
                     if success_recs % 1000 == 0:
-                        prev_time = record_stats(success_recs, prev_time)
+                        prev_time = record_stats(success_recs, error_recs, prev_time)
 
                         if success_recs % 10000 == 0:
                             engine_stats(engine)
                 finally:
-                    futures.pop(f)
-
-                    if not queue.empty():
-                        record = queue.get()
-                        futures[executor.submit(add_record, engine, record)] = record
+                    del futures[f]
 
-    print(f'Successfully loaded {success_recs} records, with {error_recs} errors')
+    print(
+        f"Successfully loaded {success_recs:,} records, with {error_recs:,} errors"
+    )
 
 
-load_file = '../../../Resources/Data/load-10K.json'
+load_file = "../../../Resources/Data/load-10K.json"
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
+    g2_engine.init("G2Engine", engine_config_json, False)
 
     input_queue = Queue(maxsize=200)
 
-    producer_proc = Process(target=producer, args=(load_file, input_queue,))
+    producer_proc = Process(
+        target=producer,
+        args=(
+            load_file,
+            input_queue,
+        ),
+    )
     producer_proc.start()
 
-    consumer_proc = Process(target=consumer, args=(g2_engine, input_queue,))
+    consumer_proc = Process(
+        target=consumer,
+        args=(
+            g2_engine,
+            input_queue,
+        ),
+    )
     consumer_proc.start()
 
     producer_proc.join()
     consumer_proc.join()
 
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
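Editor's note: the consumer in Add10KQueue.py stops submitting new work when `queue.empty()` happens to be true, which can end a run early if the producer momentarily falls behind. A common alternative, not what the snippet does, is a sentinel value the producer enqueues once at end-of-input; a minimal sketch:

```python
from multiprocessing import Process, Queue

SENTINEL = None  # enqueued exactly once when the input is exhausted


def producer(queue):
    for record in range(5):  # stands in for reading records from a file
        queue.put(record)
    queue.put(SENTINEL)


def consumer(queue):
    while True:
        record = queue.get()  # blocks until a record or the sentinel arrives
        if record is SENTINEL:
            break
        print(f"processing {record}")


if __name__ == "__main__":
    work_queue = Queue(maxsize=200)
    procs = [
        Process(target=producer, args=(work_queue,)),
        Process(target=consumer, args=(work_queue,)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
```

With a sentinel the consumer can distinguish "no work yet" from "no work ever", so it never exits while the producer is still reading.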
diff --git a/Python/Tasks/Loading/Add50KFutures.py b/Python/Tasks/Loading/Add50KFutures.py
index 001d3b8..2488cdb 100755
--- a/Python/Tasks/Loading/Add50KFutures.py
+++ b/Python/Tasks/Loading/Add50KFutures.py
@@ -6,21 +6,27 @@ import os
 import sys
 import time
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def add_record(engine, rec_to_add):
     record_dict = json.loads(rec_to_add)
-    data_source = record_dict.get('DATA_SOURCE', None)
-    record_id = record_dict.get('RECORD_ID', None)
+    data_source = record_dict.get("DATA_SOURCE", None)
+    record_id = record_dict.get("RECORD_ID", None)
     engine.addRecord(data_source, record_id, rec_to_add)
 
 
@@ -28,16 +34,20 @@ def engine_stats(engine):
     response = bytearray()
     try:
         engine.stats(response)
-        print(f'\n{response.decode()}\n')
-    except G2RetryableException as ex:
-        mock_logger('WARN', ex)
-    except (G2UnrecoverableException, G2Exception) as ex:
-        mock_logger('CRITICAL', ex)
+        print(f"\n{response.decode()}\n")
+    except G2RetryableException as err:
+        mock_logger("WARN", err)
+    except G2Exception as err:
+        mock_logger("CRITICAL", err)
         raise
 
 
-def record_stats(success_recs, prev_time):
-    print(f'Processed {success_recs} adds, {int(1000 / (time.time() - prev_time))} records per second')
+def record_stats(success, error, prev_time):
+    print(
+        f"Processed {success:,} adds,"
+        f" {int(1000 / (time.time() - prev_time)):,} records per second,"
+        f" {error} errors"
+    )
    return time.time()
 
 
@@ -45,49 +55,57 @@ def futures_add(engine, input_file):
     prev_time = time.time()
     success_recs = error_recs = 0
 
-    with open(input_file, 'r') as file:
+    with open(input_file, "r") as file:
         with concurrent.futures.ThreadPoolExecutor() as executor:
-            futures = {executor.submit(add_record, engine, record): record for record in itertools.islice(file, executor._max_workers)}
+            futures = {
+                executor.submit(add_record, engine, record): record
+                for record in itertools.islice(file, executor._max_workers)
+            }
 
             while futures:
-                for f in concurrent.futures.as_completed(futures.keys()):
+                done, _ = concurrent.futures.wait(
+                    futures, return_when=concurrent.futures.FIRST_COMPLETED
+                )
+                for f in done:
                     try:
                         f.result()
-                    except G2BadInputException as ex:
-                        mock_logger('ERROR', ex, futures[f])
+                    except (G2BadInputException, json.JSONDecodeError) as err:
+                        mock_logger("ERROR", err, futures[f])
                         error_recs += 1
-                    except G2RetryableException as ex:
-                        mock_logger('WARN', ex, futures[f])
+                    except G2RetryableException as err:
+                        mock_logger("WARN", err, futures[f])
                         error_recs += 1
-                    except (G2UnrecoverableException, G2Exception) as ex:
-                        mock_logger('CRITICAL', ex, futures[f])
+                    except (G2UnrecoverableException, G2Exception) as err:
+                        mock_logger("CRITICAL", err, futures[f])
                         raise
-                    except json.JSONDecodeError as ex:
-                        mock_logger('ERROR', ex, futures[f])
-                        error_recs += 1
                     else:
-                        success_recs += 1
+                        record = file.readline()
+                        if record:
+                            futures[executor.submit(add_record, engine, record)] = (
+                                record
+                            )
 
+                        success_recs += 1
                         if success_recs % 1000 == 0:
-                            prev_time = record_stats(success_recs, prev_time)
+                            prev_time = record_stats(
+                                success_recs, error_recs, prev_time
+                            )
 
                             if success_recs % 10000 == 0:
                                 engine_stats(engine)
                     finally:
-                        futures.pop(f)
-
-                record = file.readline()
-                if record:
-                    futures[executor.submit(add_record, engine, record)] = record
+                        del futures[f]
 
-    print(f'Successfully loaded {success_recs} records, with {error_recs} errors')
+    print(
+        f"Successfully loaded {success_recs:,} records, with"
+        f" {error_recs:,} errors"
+    )
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
-    futures_add(g2_engine, '../../../Resources/Data/load-50k-with-errors.json')
+    g2_engine.init("G2Engine", engine_config_json, False)
+    futures_add(g2_engine, "../../../Resources/Data/load-50k-with-errors.json")
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception)as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
diff --git a/Python/Tasks/Loading/Add50KWithInfoFutures.py b/Python/Tasks/Loading/Add50KWithInfoFutures.py
index e4c11e7..d157f01 100755
--- a/Python/Tasks/Loading/Add50KWithInfoFutures.py
+++ b/Python/Tasks/Loading/Add50KWithInfoFutures.py
@@ -6,40 +6,50 @@ import os
 import sys
 import time
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def add_record(engine, rec_to_add):
     with_info = bytearray()
     record_dict = json.loads(rec_to_add)
-    data_source = record_dict.get('DATA_SOURCE', None)
-    record_id = record_dict.get('RECORD_ID', None)
+    data_source = record_dict.get("DATA_SOURCE", None)
+    record_id = record_dict.get("RECORD_ID", None)
     engine.addRecordWithInfo(data_source, record_id, rec_to_add, with_info)
-    return with_info.decode() + '\n'
+    return with_info.decode()
 
 
 def engine_stats(engine):
     response = bytearray()
     try:
         engine.stats(response)
-        print(f'\n{response.decode()}\n')
-    except G2RetryableException as ex:
-        mock_logger('WARN', ex)
-    except (G2UnrecoverableException, G2Exception) as ex:
-        mock_logger('CRITICAL', ex)
+        print(f"\n{response.decode()}\n")
+    except G2RetryableException as err:
+        mock_logger("WARN", err)
+    except G2Exception as err:
+        mock_logger("CRITICAL", err)
         raise
 
 
-def record_stats(success_recs, prev_time):
-    print(f'Processed {success_recs} adds, {int(1000 / (time.time() - prev_time))} records per second')
+def record_stats(success, error, prev_time):
+    print(
+        f"Processed {success:,} adds,"
+        f" {int(1000 / (time.time() - prev_time)):,} records per second,"
+        f" {error} errors"
+    )
     return time.time()
 
 
@@ -47,55 +57,65 @@ def futures_add(engine, input_file, output_file):
     prev_time = time.time()
     success_recs = error_recs = 0
 
-    with open(output_file, 'w') as out_file:
-        with open(input_file, 'r') as in_file:
+    with open(output_file, "w") as out_file:
+        with open(input_file, "r") as in_file:
             with concurrent.futures.ThreadPoolExecutor() as executor:
-                futures = {executor.submit(add_record, engine, record): record for record in itertools.islice(in_file, executor._max_workers)}
+                futures = {
+                    executor.submit(add_record, engine, record): record
+                    for record in itertools.islice(in_file, executor._max_workers)
+                }
 
                 while futures:
-                    for f in concurrent.futures.as_completed(futures.keys()):
+                    done, _ = concurrent.futures.wait(
+                        futures, return_when=concurrent.futures.FIRST_COMPLETED
+                    )
+                    for f in done:
                         try:
                             result = f.result()
-                        except G2BadInputException as ex:
-                            mock_logger('ERROR', ex, futures[f])
+                        except (G2BadInputException, json.JSONDecodeError) as err:
+                            mock_logger("ERROR", err, futures[f])
                             error_recs += 1
-                        except G2RetryableException as ex:
-                            mock_logger('WARN', ex, futures[f])
+                        except G2RetryableException as err:
+                            mock_logger("WARN", err, futures[f])
                             error_recs += 1
-                        except (G2UnrecoverableException, G2Exception) as ex:
-                            mock_logger('CRITICAL', ex, futures[f])
+                        except (G2UnrecoverableException, G2Exception) as err:
+                            mock_logger("CRITICAL", err, futures[f])
                             raise
-                        except json.JSONDecodeError as ex:
-                            mock_logger('ERROR', ex, futures[f])
-                            error_recs += 1
                         else:
-                            success_recs += 1
-                            out_file.write(result)
+                            record = in_file.readline()
+                            if record:
+                                futures[executor.submit(add_record, engine, record)] = (
+                                    record
+                                )
+
+                            out_file.write(f"{result}\n")
+                            success_recs += 1
                             if success_recs % 1000 == 0:
-                                prev_time = record_stats(success_recs, prev_time)
+                                prev_time = record_stats(
+                                    success_recs, error_recs, prev_time
+                                )
 
                                 if success_recs % 10000 == 0:
                                     engine_stats(engine)
                         finally:
-                            futures.pop(f)
-
-                    record = in_file.readline()
-                    if record:
-                        futures[executor.submit(add_record, engine, record)] = record
+                            del futures[f]
 
-    print(f'Successfully loaded {success_recs} records, with {error_recs} errors')
-    print(f'With info responses written to {output_file}')
+    print(
+        f"Successfully loaded {success_recs:,} records, with"
+        f" {error_recs:,} errors"
+    )
+    print(f"With info responses written to {output_file}")
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
+    g2_engine.init("G2Engine", engine_config_json, False)
     futures_add(
         g2_engine,
-        '../../../Resources/Data/load-50k-with-errors.json',
-        '../../../Resources/Output/Add_File_WithInfo.json')
+        "../../../Resources/Data/load-50k-with-errors.json",
+        "../../../Resources/Output/Add_File_WithInfo.json",
+    )
 
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    mock_logger("CRITICAL", err)
diff --git a/Python/Tasks/Loading/Add5KLoop.py b/Python/Tasks/Loading/Add5KLoop.py
index a127658..9cfcc49 100755
--- a/Python/Tasks/Loading/Add5KLoop.py
+++ b/Python/Tasks/Loading/Add5KLoop.py
@@ -3,51 +3,55 @@ import json
 import os
 import sys
 
-from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException
+from senzing import (
+    G2BadInputException,
+    G2Engine,
+    G2Exception,
+    G2RetryableException,
+    G2UnrecoverableException,
+)
 
-engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None)
+engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None)
 
 
 def mock_logger(level, exception, error_rec=None):
-    print(f'\n{level}: {exception}', file=sys.stderr)
+    print(f"\n{level}: {exception}", file=sys.stderr)
     if error_rec:
-        print(f'{error_rec}', file=sys.stderr)
+        print(f"{error_rec}", file=sys.stderr)
 
 
 def add_records_from_file(engine, input_file):
-    success_recs = 0
-
-    with open(input_file, 'r') as file:
+    success_recs = error_recs = 0
+    with open(input_file, "r") as file:
         for rec_to_add in file:
             try:
                 record_dict = json.loads(rec_to_add)
-                data_source = record_dict.get('DATA_SOURCE', None)
-                record_id = record_dict.get('RECORD_ID', None)
+                data_source = record_dict.get("DATA_SOURCE", None)
+                record_id = record_dict.get("RECORD_ID", None)
                 engine.addRecord(data_source, record_id, rec_to_add)
-            except G2BadInputException as ex:
-                mock_logger('ERROR', ex, rec_to_add)
-            except G2RetryableException as ex:
-                mock_logger('WARN', ex, rec_to_add)
-            except (G2UnrecoverableException, G2Exception) as ex:
-                mock_logger('CRITICAL', ex, rec_to_add)
+            except (G2BadInputException, json.JSONDecodeError) as err:
+                mock_logger("ERROR", err, rec_to_add)
+                error_recs += 1
+            except G2RetryableException as err:
+                mock_logger("WARN", err, rec_to_add)
+                error_recs += 1
+            except (G2UnrecoverableException, G2Exception) as err:
+                mock_logger("CRITICAL", err, rec_to_add)
                 raise
-            except json.JSONDecodeError as ex:
-                mock_logger('ERROR', ex, rec_to_add)
             else:
                 success_recs += 1
 
                 if success_recs % 500 == 0:
-                    print(f'Processed {success_recs} adds')
+                    print(f"Processed {success_recs:,} adds, with {error_recs:,} errors")
 
-    print(f'Successfully added {success_recs} records')
+    print(f"Successfully loaded {success_recs:,} records, with {error_recs:,} errors")
 
 
 try:
     g2_engine = G2Engine()
-    g2_engine.init('G2Engine', engine_config_json, False)
-    add_records_from_file(g2_engine, '../../../Resources/Data/load-5K.json')
+    g2_engine.init("G2Engine", engine_config_json, False)
+    add_records_from_file(g2_engine, "../../../Resources/Data/load-5K.json")
     g2_engine.destroy()
-except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex:
-    print(ex)
-    sys.exit(-1)
+except G2Exception as err:
+    print(err)
os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) records = [ - {"DATA_SOURCE": "TEST", "RECORD_ID": "1001", "RECORD_TYPE": "PERSON", "PRIMARY_NAME_LAST": "Smith", "PRIMARY_NAME_FIRST": "Robert", "DATE_OF_BIRTH": "12/11/1978", "ADDR_TYPE": "MAILING", "ADDR_LINE1": "123 Main Street, Las Vegas NV 89132", "PHONE_TYPE": "HOME", "PHONE_NUMBER": "702-919-1300", "EMAIL_ADDRESS": "bsmith@work.com", "DATE": "1/2/18", "STATUS": "Active", "AMOUNT": "100"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "1002", "RECORD_TYPE": "PERSON", "PRIMARY_NAME_LAST": "Smith II", "PRIMARY_NAME_FIRST": "Bob", "DATE_OF_BIRTH": "11/12/1978", "ADDR_TYPE": "HOME", "ADDR_LINE1": "1515 Adela Lane", "ADDR_CITY": "Las Vegas", "ADDR_STATE": "NV", "ADDR_POSTAL_CODE": "89111", "PHONE_TYPE": "MOBILE", "PHONE_NUMBER": "702-919-1300", "DATE": "3/10/17", "STATUS": "Inactive", "AMOUNT": "200"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "1003", "RECORD_TYPE": "PERSON", "PRIMARY_NAME_LAST": "Smith", "PRIMARY_NAME_FIRST": "Bob", "PRIMARY_NAME_MIDDLE": "J", "DATE_OF_BIRTH": "12/11/1978", "EMAIL_ADDRESS": "bsmith@work.com", "DATE": "4/9/16", "STATUS": "Inactive", "AMOUNT": "300"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "1004", "RECORD_TYPE": "PERSON", "PRIMARY_NAME_LAST": "Smith", "PRIMARY_NAME_FIRST": "B", "ADDR_TYPE": "HOME", "ADDR_LINE1": "1515 Adela Ln", "ADDR_CITY": "Las Vegas", "ADDR_STATE": "NV", "ADDR_POSTAL_CODE": "89132", "EMAIL_ADDRESS": "bsmith@work.com", "DATE": "1/5/15", "STATUS": "Inactive", "AMOUNT": "400"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "1005", "RECORD_TYPE": "PERSON", "PRIMARY_NAME_LAST": "Smith", "PRIMARY_NAME_FIRST": "Rob", "PRIMARY_NAME_MIDDLE": "E", "DRIVERS_LICENSE_NUMBER": "112233", "DRIVERS_LICENSE_STATE": "NV", "ADDR_TYPE": "MAILING", "ADDR_LINE1": "123 E Main St", "ADDR_CITY": "Henderson", "ADDR_STATE": "NV", "ADDR_POSTAL_CODE": "89132", "DATE": "7/16/19", "STATUS": "Active", "AMOUNT": "500"}, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1001", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_LAST": "Smith", + "PRIMARY_NAME_FIRST": "Robert", + "DATE_OF_BIRTH": "12/11/1978", + "ADDR_TYPE": "MAILING", + "ADDR_LINE1": "123 Main Street, Las Vegas NV 89132", + "PHONE_TYPE": "HOME", + "PHONE_NUMBER": "702-919-1300", + "EMAIL_ADDRESS": "bsmith@work.com", + "DATE": "1/2/18", + "STATUS": "Active", + "AMOUNT": "100", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1002", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_LAST": "Smith II", + "PRIMARY_NAME_FIRST": "Bob", + "DATE_OF_BIRTH": "11/12/1978", + "ADDR_TYPE": "HOME", + "ADDR_LINE1": "1515 Adela Lane", + "ADDR_CITY": "Las Vegas", + "ADDR_STATE": "NV", + "ADDR_POSTAL_CODE": "89111", + "PHONE_TYPE": "MOBILE", + "PHONE_NUMBER": "702-919-1300", + "DATE": "3/10/17", + "STATUS": "Inactive", + "AMOUNT": "200", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1003", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_LAST": "Smith", + "PRIMARY_NAME_FIRST": "Bob", + "PRIMARY_NAME_MIDDLE": "J", + "DATE_OF_BIRTH": "12/11/1978", + "EMAIL_ADDRESS": "bsmith@work.com", + "DATE": "4/9/16", + "STATUS": "Inactive", + "AMOUNT": "300", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1004", + "RECORD_TYPE": "PERSON", + "PRIMARY_NAME_LAST": "Smith", + "PRIMARY_NAME_FIRST": "B", + "ADDR_TYPE": "HOME", + "ADDR_LINE1": "1515 Adela Ln", + "ADDR_CITY": "Las Vegas", + "ADDR_STATE": "NV", + "ADDR_POSTAL_CODE": "89132", + "EMAIL_ADDRESS": "bsmith@work.com", + "DATE": "1/5/15", + "STATUS": "Inactive", + "AMOUNT": "400", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1005", + "RECORD_TYPE": "PERSON", + 
"PRIMARY_NAME_LAST": "Smith", + "PRIMARY_NAME_FIRST": "Rob", + "PRIMARY_NAME_MIDDLE": "E", + "DRIVERS_LICENSE_NUMBER": "112233", + "DRIVERS_LICENSE_STATE": "NV", + "ADDR_TYPE": "MAILING", + "ADDR_LINE1": "123 E Main St", + "ADDR_CITY": "Henderson", + "ADDR_STATE": "NV", + "ADDR_POSTAL_CODE": "89132", + "DATE": "7/16/19", + "STATUS": "Active", + "AMOUNT": "500", + }, ] try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) for record in records: - DATA_SOURCE = record['DATA_SOURCE'] - RECORD_ID = record['RECORD_ID'] - g2_engine.addRecord(DATA_SOURCE, RECORD_ID, json.dumps(record)) - print(f'Record {RECORD_ID} added') + data_source = record["DATA_SOURCE"] + record_id = record["RECORD_ID"] + g2_engine.addRecord(data_source, record_id, json.dumps(record)) + print(f"Record {record_id} added") g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + print(err) diff --git a/Python/Tasks/Loading/AddTruthsetLoop.py b/Python/Tasks/Loading/AddTruthsetLoop.py index 974d65d..2e0990b 100755 --- a/Python/Tasks/Loading/AddTruthsetLoop.py +++ b/Python/Tasks/Loading/AddTruthsetLoop.py @@ -3,54 +3,59 @@ import json import os import sys -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def add_records_from_file(engine, input_file): - success_recs = 0 + success_recs = error_recs = 0 - with open(input_file, 'r') as file: - print(f'\nAdding records from {input_file}') + with open(input_file, "r") as file: + print(f"\nAdding records from {input_file}") for rec_to_add in file: try: record_dict = json.loads(rec_to_add) - data_source = record_dict.get('DATA_SOURCE', None) - record_id = record_dict.get('RECORD_ID', None) + data_source = record_dict.get("DATA_SOURCE", None) + record_id = record_dict.get("RECORD_ID", None) engine.addRecord(data_source, record_id, rec_to_add) - except G2BadInputException as ex: - mock_logger('ERROR', ex, rec_to_add) - except G2RetryableException as ex: - mock_logger('WARN', ex, rec_to_add) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, rec_to_add) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, rec_to_add) + error_recs += 1 + except G2RetryableException as err: + mock_logger("WARN", err, rec_to_add) + error_recs += 1 + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, rec_to_add) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, rec_to_add) else: success_recs += 1 if success_recs % 500 == 0: - print(f'Processed {success_recs} adds') + print(f"Processed {success_recs:,} adds, with {error_recs:,} errors") - print(f'Successfully added {success_recs} records') + print(f"Successfully loaded {success_recs:,} records, with {error_recs:,} errors") try: g2_engine = G2Engine() - 
g2_engine.init('G2Engine', engine_config_json, False) - add_records_from_file(g2_engine, '../../../Resources/Data/truth/customers.json') - add_records_from_file(g2_engine, '../../../Resources/Data/truth/reference.json') - add_records_from_file(g2_engine, '../../../Resources/Data/truth/watchlist.json') + g2_engine.init("G2Engine", engine_config_json, False) + add_records_from_file(g2_engine, "../../../Resources/Data/truth/customers.json") + add_records_from_file(g2_engine, "../../../Resources/Data/truth/reference.json") + add_records_from_file(g2_engine, "../../../Resources/Data/truth/watchlist.json") g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + print(err) diff --git a/Python/Tasks/Loading/README.md b/Python/Tasks/Loading/README.md index dd697d9..90c882a 100644 --- a/Python/Tasks/Loading/README.md +++ b/Python/Tasks/Loading/README.md @@ -17,8 +17,3 @@ The loading snippets outline adding new source records. Adding source records in * **AddTruthsetLoop.py** * Read and load from multiple source files, adding a sample truth set -## API Calls -* [addRecord](../../../Python/APIs/G2Engine/Data_Manipulation/addRecord.py) - * Adds a single record -* [addRecordWithInfo](../../../Python/APIs/G2Engine/Data_Manipulation/addRecordWithInfo.py) - * Adds a single record and returns information outlining any entities affected by the addition of the record. For further information see [With Info](../../../README.md#with-info) diff --git a/Python/Tasks/Redo/Add10KWithRedo.py b/Python/Tasks/Redo/Add10KWithRedo.py index d2d3ad8..f9b59df 100755 --- a/Python/Tasks/Redo/Add10KWithRedo.py +++ b/Python/Tasks/Redo/Add10KWithRedo.py @@ -3,50 +3,56 @@ import json import os import sys -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def add_records_from_file(engine, input_file): - success_recs = 0 + success_recs = error_recs = 0 - with open(input_file, 'r') as file: + with open(input_file, "r") as file: for rec_to_add in file: try: record_dict = json.loads(rec_to_add) - data_source = record_dict.get('DATA_SOURCE', None) - record_id = record_dict.get('RECORD_ID', None) + data_source = record_dict.get("DATA_SOURCE", None) + record_id = record_dict.get("RECORD_ID", None) engine.addRecord(data_source, record_id, rec_to_add) - except G2BadInputException as ex: - mock_logger('ERROR', ex, rec_to_add) - except G2RetryableException as ex: - mock_logger('WARN', ex, rec_to_add) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, rec_to_add) - raise ex - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, rec_to_add) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, rec_to_add) + error_recs += 1 + except G2RetryableException as err: + mock_logger("WARN", err, rec_to_add) + error_recs += 1 + except (G2UnrecoverableException, G2Exception) as 
err: + mock_logger("CRITICAL", err, rec_to_add) + raise err else: success_recs += 1 if success_recs % 1000 == 0: - print(f'Processed {success_recs} adds') + print(f"Processed {success_recs:,} adds, with {error_recs:,} errors") - print(f'Successfully added {success_recs} records') + print(f"Successfully added {success_recs:,} records, with {error_recs:,} errors") def process_redo(engine): - success_recs = 0 + success_recs = error_recs = 0 - print('\nStarting to process redo records...') + print("\nStarting to process redo records...") while True: try: @@ -58,30 +64,37 @@ def process_redo(engine): success_recs += 1 if success_recs % 100 == 0: - print(f'Processed {success_recs} redo records') - except G2BadInputException as ex: - mock_logger('ERROR', ex) - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print( + f"Processed {success_recs:,} redo records, with" + f" {error_recs:,} errors" + ) + except G2BadInputException as err: + mock_logger("ERROR", err) + error_recs += 1 + except G2RetryableException as err: + mock_logger("WARN", err) + error_recs += 1 + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err) raise - print(f'Successfully processed {success_recs} redo records') + print( + f"Successfully processed {success_recs:,} redo records, with" + f" {error_recs:,} errors" + ) try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) - add_records_from_file(g2_engine, '../../../Resources/Data/load-10K.json') + g2_engine.init("G2Engine", engine_config_json, False) + add_records_from_file(g2_engine, "../../../Resources/Data/load-10K.json") redo_count = g2_engine.countRedoRecords() if redo_count: process_redo(g2_engine) else: - print('No redo records to process') + print("No redo records to process") g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Redo/README.md b/Python/Tasks/Redo/README.md index 7b055db..23f78c4 100644 --- a/Python/Tasks/Redo/README.md +++ b/Python/Tasks/Redo/README.md @@ -10,17 +10,10 @@ When an entity requires additional work a record is automatically created in the * **Add10KWithRedo.py** * Read and load source records from a file and then process any redo records * **RedoContinuous.py** - * Basic example of continously monitoring for redo records to process + * Basic example of continuously monitoring for redo records to process * **RedoContinuousFutures.py** - * Continously monitor for redo records to process using concurrent futures and multiple threads + * Continuously monitor for redo records to process using concurrent futures and multiple threads * **RedoWithInfoContinuous.py** - * Continously monitor for redo records to process + * Continuously monitor for redo records to process * Collect the response from the [with info](../../../README.md#with-info) version of the API and write it to a file -## API Calls -* [getRedoRecord](../../../Python/APIs/G2Engine/Redo/getRedoRecord.py) - * Retrieve a single redo record for processing -* [process](../../../Python/APIs/G2Engine/Redo/process.py) - * Process a single redo record -* [processWithInfo](../../../Python/APIs/G2Engine/Redo/processWithInfo.py) - * Process a single redo record and returns information outlining any entities affected by the processing of the record. 
For further information see [With Info](../../../README.md#with-info) diff --git a/Python/Tasks/Redo/RedoContinuous.py b/Python/Tasks/Redo/RedoContinuous.py index d72ad9f..31bda02 100755 --- a/Python/Tasks/Redo/RedoContinuous.py +++ b/Python/Tasks/Redo/RedoContinuous.py @@ -3,19 +3,25 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def process_redo(engine): - success_recs = 0 + success_recs = error_recs = 0 while True: try: @@ -23,7 +29,10 @@ redo_record = bytearray() engine.getRedoRecord(redo_record) if not redo_record: - print(f'No redo records to process, pausing for 30 seconds. Total processed {success_recs} . (CTRL-C to exit)...') + print( + "No redo records to process, pausing for 30 seconds. Total" + f" processed {success_recs:,}. (CTRL-C to exit)..." + ) time.sleep(30) continue @@ -31,21 +40,25 @@ success_recs += 1 if success_recs % 100 == 0: - print(f'Processed {success_recs} redo records') - except G2BadInputException as ex: - mock_logger('ERROR', ex) - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print( + f"Processed {success_recs:,} redo records, with" + f" {error_recs:,} errors" + ) + except G2BadInputException as err: + mock_logger("ERROR", err) + error_recs += 1 + except G2RetryableException as err: + mock_logger("WARN", err) + error_recs += 1 + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err) raise try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) process_redo(g2_engine) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Redo/RedoContinuousFutures.py b/Python/Tasks/Redo/RedoContinuousFutures.py index 2f37794..6cef3b2 100755 --- a/Python/Tasks/Redo/RedoContinuousFutures.py +++ b/Python/Tasks/Redo/RedoContinuousFutures.py @@ -4,35 +4,56 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) -def process_redo(engine): - redo_record = bytearray() - engine.getRedoRecord(redo_record) - if not redo_record: - return None -
engine.process(redo_record.decode()) - return redo_record +def get_redo_record(engine): + try: + redo_record = bytearray() + engine.getRedoRecord(redo_record) + except G2Exception as err: + mock_logger("CRITICAL", err) + raise err + + return redo_record.decode() + + +def prime_redo_records(engine, quantity): + redo_records = [] + for _ in range(quantity): + single_redo_rec = get_redo_record(engine) + if single_redo_rec: + redo_records.append(single_redo_rec) + return redo_records + + +def process_redo_record(engine, record): + engine.process(record) def engine_stats(engine): response = bytearray() try: engine.stats(response) - print(f'\n{response.decode()}\n') - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print(f"\n{response.decode()}\n") + except G2RetryableException as err: + mock_logger("WARN", err) + except G2Exception as err: + mock_logger("CRITICAL", err) raise @@ -40,17 +61,20 @@ def redo_count(engine): redo_recs = None try: redo_recs = engine.countRedoRecords() - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + except G2RetryableException as err: + mock_logger("WARN", err) + except G2Exception as err: + mock_logger("CRITICAL", err) raise return redo_recs -def redo_pause(success_recs): - print(f'No redo records to process, pausing for 30 seconds. Total processed: {success_recs} (CTRL-C to exit)...') +def redo_pause(success): + print( + "No redo records to process, pausing for 30 seconds. Total processed:" + f" {success:,} (CTRL-C to exit)..." + ) time.sleep(30) @@ -59,48 +83,69 @@ def futures_redo(engine): redo_paused = False with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {executor.submit(process_redo, engine): _ for _ in range(executor._max_workers)} - - while futures: - for f in concurrent.futures.as_completed(futures.keys()): + while True: + futures = { + executor.submit(process_redo_record, engine, record): record + for record in prime_redo_records(engine, executor._max_workers) + } + if not futures: + redo_pause(success_recs) + else: + break + + while True: + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for f in done: try: - result = f.result() - except G2BadInputException as ex: - mock_logger('ERROR', ex, futures[f]) + _ = f.result() + except G2BadInputException as err: + mock_logger("ERROR", err, futures[f]) error_recs += 1 - except G2RetryableException as ex: - mock_logger('WARN', ex, futures[f]) + except G2RetryableException as err: + mock_logger("WARN", err, futures[f]) error_recs += 1 - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, futures[f]) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, futures[f]) raise else: - if result: - success_recs += 1 - - if success_recs % 100 == 0: - print(f'Processed {success_recs} redo records') - - if success_recs % 2000 == 0: - engine_stats(engine) + record = get_redo_record(engine) + if record: + futures[ + executor.submit(process_redo_record, engine, record) + ] = record else: redo_paused = True - finally: - futures.pop(f) - if redo_paused: - while not redo_count(engine): - redo_pause(success_recs) - redo_paused = False + success_recs += 1 + if success_recs % 100 == 0: + print( + f"Processed {success_recs:,} redo records, with" + f" {error_recs:,} errors" + ) + + if success_recs % 
1000 == 0: + engine_stats(engine) + finally: + del futures[f] - futures[executor.submit(process_redo, engine)] = None + if redo_paused: + while not redo_count(engine): + redo_pause(success_recs) + redo_paused = False + while len(futures) < executor._max_workers: + record = get_redo_record(engine) + if record: + futures[ + executor.submit(process_redo_record, engine, record) + ] = record try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) futures_redo(g2_engine) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Redo/RedoWithInfoContinuous.py b/Python/Tasks/Redo/RedoWithInfoContinuous.py index 2249b7c..b8c553c 100755 --- a/Python/Tasks/Redo/RedoWithInfoContinuous.py +++ b/Python/Tasks/Redo/RedoWithInfoContinuous.py @@ -3,21 +3,28 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def process_redo(engine, output_file): - success_recs = 0 + success_recs = error_recs = 0 + redo_record = bytearray() + with_info = bytearray() - with open(output_file, 'w') as out_file: + with open(output_file, "w") as out_file: try: while True: redo_record = bytearray() @@ -25,31 +32,38 @@ engine.getRedoRecord(redo_record) if not redo_record: - print(f'No redo records to process, pausing for 30 seconds. Total processed {success_recs} . (CTRL-C to exit)...') + print( + "No redo records to process, pausing for 30 seconds. Total" + f" processed {success_recs:,}. (CTRL-C to exit)..."
+ ) time.sleep(30) continue engine.processWithInfo(redo_record, with_info) success_recs += 1 - out_file.write(with_info.decode() + '\n') + out_file.write(f"{with_info.decode()}\n") if success_recs % 100 == 0: - print(f'Processed {success_recs} redo records') + print( + f"Processed {success_recs:,} redo records, with" + f" {error_recs:,} errors" + ) except G2BadInputException as ex: - mock_logger('ERROR', ex, redo_record) + mock_logger("ERROR", ex, redo_record) + error_recs += 1 except G2RetryableException as ex: - mock_logger('WARN', ex, redo_record) + mock_logger("WARN", ex, redo_record) + error_recs += 1 except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, redo_record) + mock_logger("CRITICAL", ex, redo_record) raise try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) - process_redo(g2_engine, '../../../Resources/Output/Redo_WithInfo_Continuous.json') + g2_engine.init("G2Engine", engine_config_json, False) + process_redo(g2_engine, "../../../Resources/Output/Redo_WithInfo_Continuous.json") g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Replacing/README.md b/Python/Tasks/Replacing/README.md index 6b93b51..db0e1bd 100644 --- a/Python/Tasks/Replacing/README.md +++ b/Python/Tasks/Replacing/README.md @@ -12,8 +12,3 @@ To replace an existing loaded record use the same data source code and record ID * **ReplaceRecords.py** * Basic iteration over a few records, replacing each one -## API Calls -* [replaceRecord](../../../Python/APIs/G2Engine/Data_Manipulation/replaceRecord.py) - * Replaces a single record -* [replaceRecordWithInfo](../../../Python/APIs/G2Engine/Data_Manipulation/replaceRecordWithInfo.py) - * Replaces a single record and returns information outlining any entities affected by the replacement of the record. 
For further information see [With Info](../../../README.md#with-info) diff --git a/Python/Tasks/Replacing/Replace5KWithInfoFutures.py b/Python/Tasks/Replacing/Replace5KWithInfoFutures.py index 478f16b..5565fae 100755 --- a/Python/Tasks/Replacing/Replace5KWithInfoFutures.py +++ b/Python/Tasks/Replacing/Replace5KWithInfoFutures.py @@ -6,40 +6,50 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def replace_record(engine, rec_to_replace): with_info = bytearray() record_dict = json.loads(rec_to_replace) - data_source = record_dict.get('DATA_SOURCE', None) - record_id = record_dict.get('RECORD_ID', None) - engine.replaceRecord(data_source, record_id, rec_to_replace) - return with_info.decode() + '\n' + data_source = record_dict.get("DATA_SOURCE", None) + record_id = record_dict.get("RECORD_ID", None) + engine.replaceRecordWithInfo(data_source, record_id, rec_to_replace, with_info) + return with_info.decode() def engine_stats(engine): response = bytearray() try: engine.stats(response) - print(f'\n{response.decode()}\n') - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print(f"\n{response.decode()}\n") + except G2RetryableException as err: + mock_logger("WARN", err) + except G2Exception as err: + mock_logger("CRITICAL", err) raise -def record_stats(success_recs, prev_time): - print(f'Processed {success_recs} replacements, {int(1000 / (time.time() - prev_time))} records per second') +def record_stats(success, error, prev_time): + print( + f"Processed {success:,} replacements," + f" {int(1000 / (time.time() - prev_time)):,} records per second," + f" {error:,} errors" + ) return time.time() @@ -47,54 +57,65 @@ def futures_replace(engine, input_file, output_file): prev_time = time.time() success_recs = error_recs = 0 - with open(output_file, 'w') as out_file: - with open(input_file, 'r') as in_file: + with open(output_file, "w") as out_file: + with open(input_file, "r") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {executor.submit(replace_record, engine, record): record for record in itertools.islice(in_file, executor._max_workers)} + futures = { + executor.submit(replace_record, engine, record): record + for record in itertools.islice(in_file, executor._max_workers) + } while futures: - for f in concurrent.futures.as_completed(futures.keys()): + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for f in done: try: result = f.result() - except G2BadInputException as ex: - mock_logger('ERROR', ex, futures[f]) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, futures[f]) error_recs += 1 - except G2RetryableException as ex: - mock_logger('WARN', ex, futures[f]) + except G2RetryableException as err: + mock_logger("WARN", err, futures[f]) error_recs += 1 - except 
(G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, futures[f]) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, futures[f]) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, futures[f]) - error_recs += 1 else: - success_recs += 1 - out_file.write(result) + record = in_file.readline() + if record: + futures[ + executor.submit(replace_record, engine, record) + ] = record + out_file.write(f"{result}\n") + + success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats(success_recs, prev_time) + prev_time = record_stats( + success_recs, error_recs, prev_time + ) if success_recs % 5000 == 0: engine_stats(engine) finally: - futures.pop(f) + del futures[f] - record = in_file.readline() - if record: - futures[executor.submit(replace_record, engine, record)] = record - print(f'Successfully replaced {success_recs} records, with {error_recs} errors') - print(f'With info responses written to {output_file}') + print( + f"Successfully replaced {success_recs:,} records, with" + f" {error_recs:,} errors" + ) + print(f"With info responses written to {output_file}") try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) futures_replace( g2_engine, - '../../../Resources/Data/replace-5K.json', - '../../../Resources/Output/Replace_File_WithInfo.json') + "../../../Resources/Data/replace-5K.json", + "../../../Resources/Output/Replace_File_WithInfo.json", + ) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Replacing/Replace5kFutures.py b/Python/Tasks/Replacing/Replace5kFutures.py index 4306487..5a65674 100755 --- a/Python/Tasks/Replacing/Replace5kFutures.py +++ b/Python/Tasks/Replacing/Replace5kFutures.py @@ -6,21 +6,27 @@ import os import sys import time -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def replace_record(engine, rec_to_replace): record_dict = json.loads(rec_to_replace) - data_source = record_dict.get('DATA_SOURCE', None) - record_id = record_dict.get('RECORD_ID', None) + data_source = record_dict.get("DATA_SOURCE", None) + record_id = record_dict.get("RECORD_ID", None) engine.replaceRecord(data_source, record_id, rec_to_replace) @@ -28,16 +34,20 @@ def engine_stats(engine): response = bytearray() try: engine.stats(response) - print(f'\n{response.decode()}\n') - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print(f"\n{response.decode()}\n") + except G2RetryableException as err: + mock_logger("WARN", err) + except G2Exception as err: + mock_logger("CRITICAL", err) raise -def record_stats(success_recs, prev_time): - print(f'Processed {success_recs} 
replacements, {int(1000 / (time.time() - prev_time))} records per second') +def record_stats(success, error, prev_time): + print( + f"Processed {success:,} replacements," + f" {int(1000 / (time.time() - prev_time)):,} records per second," + f" {error:,} errors" + ) return time.time() @@ -45,49 +55,57 @@ def futures_replace(engine, input_file): prev_time = time.time() success_recs = error_recs = 0 - with open(input_file, 'r') as in_file: + with open(input_file, "r") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {executor.submit(replace_record, engine, record): record for record in itertools.islice(in_file, executor._max_workers)} + futures = { + executor.submit(replace_record, engine, record): record + for record in itertools.islice(in_file, executor._max_workers) + } while futures: - for f in concurrent.futures.as_completed(futures.keys()): + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for f in done: try: f.result() - except G2BadInputException as ex: - mock_logger('ERROR', ex, futures[f]) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, futures[f]) error_recs += 1 - except G2RetryableException as ex: - mock_logger('WARN', ex, futures[f]) + except G2RetryableException as err: + mock_logger("WARN", err, futures[f]) error_recs += 1 - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, futures[f]) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, futures[f]) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, futures[f]) - error_recs += 1 else: - success_recs += 1 + record = in_file.readline() + if record: + futures[executor.submit(replace_record, engine, record)] = ( + record + ) + success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats(success_recs, prev_time) + prev_time = record_stats( + success_recs, error_recs, prev_time + ) if success_recs % 2500 == 0: engine_stats(engine) finally: - futures.pop(f) - - record = in_file.readline() - if record: - futures[executor.submit(replace_record, engine, record)] = record + del futures[f] - print(f'Successfully searched {success_recs} records, with {error_recs} errors') + print( + f"Successfully replaced {success_recs:,} records, with" + f" {error_recs:,} errors" + ) try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) - futures_replace(g2_engine, '../../../Resources/Data/replace-5K.json') + g2_engine.init("G2Engine", engine_config_json, False) + futures_replace(g2_engine, "../../../Resources/Data/replace-5K.json") g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Replacing/ReplaceRecords.py b/Python/Tasks/Replacing/ReplaceRecords.py index 472d02e..849d724 100755 --- a/Python/Tasks/Replacing/ReplaceRecords.py +++ b/Python/Tasks/Replacing/ReplaceRecords.py @@ -1,59 +1,134 @@ #! 
/usr/bin/env python3 -import hashlib import json import os import sys -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) replace_records = [ - {"DATA_SOURCE": "TEST", "SOCIAL_HANDLE": "flavour", "DATE_OF_BIRTH": "4/8/1983", "ADDR_STATE": "LA", "ADDR_POSTAL_CODE": "71232", "SSN_NUMBER": "053-39-3251", "GENDER": "F", "srccode": "MDMPER", "CC_ACCOUNT_NUMBER": "5534202208773608", "RECORD_ID": "386820964", "ADDR_CITY": "Delhi", "DRIVERS_LICENSE_STATE": "DE", "PHONE_NUMBER": "225-671-9087", "NAME_LAST": "SEAMAN", "ADDR_LINE1": "772 Armstrong RD"}, - {"DATA_SOURCE": "TEST", "DATE_OF_BIRTH": "6/2/1952", "ADDR_STATE": "TX", "ADDR_POSTAL_CODE": "75215", "SSN_NUMBER": "501-27-9836", "GENDER": "M", "srccode": "MDMPER", "CC_ACCOUNT_NUMBER": "50185881568895705", "RECORD_ID": "181734352", "ADDR_CITY": "Dlalas", "DRIVERS_LICENSE_NUMBER": "V995121498988", "DRIVERS_LICENSE_STATE": "MD", "PHONE_NUMBER": "347-4506", "NAME_LAST": "THOMPSON", "ADDR_LINE1": "788 Alma ST"}, - {"DATA_SOURCE": "TEST", "DATE_OF_BIRTH": "5/5/1946", "ADDR_STATE": "TX", "ADDR_POSTAL_CODE": "75234", "SSN_NUMBER": "299-21-8788", "NAME_FIRST": "AUNA", "GENDER": "F", "srccode": "MDMPER", "RECORD_ID": "314249610", "ADDR_CITY": "DalXlas", "DRIVERS_LICENSE_STATE": "AL", "PHONE_NUMBER": "682-282-9435", "NAME_LAST": "HAUPTMAN", "ADDR_LINE1": "1438 Albemarle DR"}, - {"DATA_SOURCE": "TEST", "ADDR_STATE": "CA", "ADDR_POSTAL_CODE": "90012", "SSN_NUMBER": "202-09-1656", "NAME_FIRST": "JEREMY", "GENDER": "M", "srccode": "MDMPER", "CC_ACCOUNT_NUMBER": "374561958104783", "RECORD_ID": "399059018", "ADDR_CITY": "Los Angles", "DRIVERS_LICENSE_NUMBER": "419243052", "DRIVERS_LICENSE_STATE": "KO", "PHONE_NUMBER": "213-862-0665", "NAME_LAST": "WHITE", "ADDR_LINE1": "2292 1st ST"}, - {"DATA_SOURCE": "TEST", "ADDR_STATE": "NY", "ADDR_POSTAL_CODE": "14626", "NAME_FIRST": "KYLE", "GENDER": "M", "srccode": "MDMPER", "RECORD_ID": "441460361", "ADDR_CITY": "Rotchester", "DRIVERS_LICENSE_NUMBER": "928877314", "PHONE_NUMBER": "669-1853", "NAME_LAST": "WILLIAMS", "NAME_SUFFIX": "IV", "ADDR_LINE1": "1874 Brooks AVE"}, + { + "DATA_SOURCE": "TEST", + "SOCIAL_HANDLE": "flavour", + "DATE_OF_BIRTH": "4/8/1983", + "ADDR_STATE": "LA", + "ADDR_POSTAL_CODE": "71232", + "SSN_NUMBER": "053-39-3251", + "GENDER": "F", + "srccode": "MDMPER", + "CC_ACCOUNT_NUMBER": "5534202208773608", + "RECORD_ID": "386820964", + "ADDR_CITY": "Delhi", + "DRIVERS_LICENSE_STATE": "DE", + "PHONE_NUMBER": "225-671-9087", + "NAME_LAST": "SEAMAN", + "ADDR_LINE1": "772 Armstrong RD", + }, + { + "DATA_SOURCE": "TEST", + "DATE_OF_BIRTH": "6/2/1952", + "ADDR_STATE": "TX", + "ADDR_POSTAL_CODE": "75215", + "SSN_NUMBER": "501-27-9836", + "GENDER": "M", + "srccode": "MDMPER", + "CC_ACCOUNT_NUMBER": "50185881568895705", + "RECORD_ID": "181734352", + "ADDR_CITY": "Dlalas", + "DRIVERS_LICENSE_NUMBER": "V995121498988", + "DRIVERS_LICENSE_STATE": "MD", + "PHONE_NUMBER": "347-4506", + "NAME_LAST": "THOMPSON", + "ADDR_LINE1": "788 Alma ST", + }, + { + "DATA_SOURCE": "TEST", + "DATE_OF_BIRTH": "5/5/1946", + "ADDR_STATE": "TX", + "ADDR_POSTAL_CODE": "75234", + "SSN_NUMBER": "299-21-8788", + "NAME_FIRST": "AUNA", + "GENDER": "F", + "srccode": "MDMPER", + "RECORD_ID": "314249610", + 
"ADDR_CITY": "DalXlas", + "DRIVERS_LICENSE_STATE": "AL", + "PHONE_NUMBER": "682-282-9435", + "NAME_LAST": "HAUPTMAN", + "ADDR_LINE1": "1438 Albemarle DR", + }, + { + "DATA_SOURCE": "TEST", + "ADDR_STATE": "CA", + "ADDR_POSTAL_CODE": "90012", + "SSN_NUMBER": "202-09-1656", + "NAME_FIRST": "JEREMY", + "GENDER": "M", + "srccode": "MDMPER", + "CC_ACCOUNT_NUMBER": "374561958104783", + "RECORD_ID": "399059018", + "ADDR_CITY": "Los Angles", + "DRIVERS_LICENSE_NUMBER": "419243052", + "DRIVERS_LICENSE_STATE": "KO", + "PHONE_NUMBER": "213-862-0665", + "NAME_LAST": "WHITE", + "ADDR_LINE1": "2292 1st ST", + }, + { + "DATA_SOURCE": "TEST", + "ADDR_STATE": "NY", + "ADDR_POSTAL_CODE": "14626", + "NAME_FIRST": "KYLE", + "GENDER": "M", + "srccode": "MDMPER", + "RECORD_ID": "441460361", + "ADDR_CITY": "Rotchester", + "DRIVERS_LICENSE_NUMBER": "928877314", + "PHONE_NUMBER": "669-1853", + "NAME_LAST": "WILLIAMS", + "NAME_SUFFIX": "IV", + "ADDR_LINE1": "1874 Brooks AVE", + }, ] -original_rec_response = bytearray() -current_rec_response = bytearray() def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def replacer(engine): - for rec_to_replace in replace_records: try: - data_source = rec_to_replace.get('DATA_SOURCE', None) - record_id = rec_to_replace.get('RECORD_ID', None) - engine.getRecord(data_source, record_id, original_rec_response) + data_source = rec_to_replace.get("DATA_SOURCE", None) + record_id = rec_to_replace.get("RECORD_ID", None) engine.replaceRecord(data_source, record_id, json.dumps(rec_to_replace)) - engine.getRecord(data_source, record_id, current_rec_response) - except G2BadInputException as ex: - mock_logger('ERROR', ex, rec_to_replace) - except G2RetryableException as ex: - mock_logger('WARN', ex, rec_to_replace) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, rec_to_replace) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, rec_to_replace) + except G2RetryableException as err: + mock_logger("WARN", err, rec_to_replace) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, rec_to_replace) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, rec_to_replace) else: - print(f'\nRecord replaced - data source = {data_source} - record_id = {record_id}') - print(f' Original Record (Hash: {hashlib.md5(original_rec_response).hexdigest()}):\n {original_rec_response.decode()}') - print(f'\n Current Record (Hash: {hashlib.md5(current_rec_response).hexdigest()})\n {current_rec_response.decode()}') + print( + f"\nRecord replaced - data source = {data_source} - record id =" + f" {record_id}" + ) try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) replacer(g2_engine) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Searching/README.md b/Python/Tasks/Searching/README.md index 7f8e538..d91187a 100644 --- a/Python/Tasks/Searching/README.md +++ b/Python/Tasks/Searching/README.md @@ -6,10 +6,8 @@ There are [considerations](https://senzing.zendesk.com/hc/en-us/articles/3600078 ## Snippets * **Search5kFutures.py** * Read and 
search for records from a file using multiple threads + * To see results first load records with [Add10KFutures.py](../Loading/Add10KFutures.py) * **SearchRecords.py** * Basic iteration over a few records, searching for each one + * To see results first load records with [AddTruthsetLoop.py](../Loading/AddTruthsetLoop.py) -## API Calls -* [searchByAttributes](../../../Python/APIs/G2Engine/Search/searchByAttributes.py) - * Search for any existing matching entities - diff --git a/Python/Tasks/Searching/Search5kFutures.py b/Python/Tasks/Searching/Search5kFutures.py index e796ec8..b3d7920 100755 --- a/Python/Tasks/Searching/Search5kFutures.py +++ b/Python/Tasks/Searching/Search5kFutures.py @@ -7,15 +7,21 @@ import sys import time from collections import Counter -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def search_record(engine, rec_to_search): @@ -28,97 +34,126 @@ response = bytearray() try: engine.stats(response) - print(f'\n{response.decode()}\n') - except G2RetryableException as ex: - mock_logger('WARN', ex) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex) + print(f"\n{response.decode()}\n") + except G2RetryableException as err: + mock_logger("WARN", err) + except G2Exception as err: + mock_logger("CRITICAL", err) raise -def record_stats(success_recs, prev_time): - print(f'Processed {success_recs} searches, {int(1000 / (time.time() - prev_time))} records per second') +def record_stats(success, error, prev_time): + print( + f"Processed {success:,} searches," + f" {int(1000 / (time.time() - prev_time)):,} records per second," + f" {error:,} errors" + ) return time.time() def search_results(result, record, out_file): response_dict = json.loads(result.decode()) - response_entities = response_dict.get('RESOLVED_ENTITIES', None) + response_entities = response_dict.get("RESOLVED_ENTITIES", None) if response_entities: results_str = [] - results_count = Counter(k for entity in response_entities for k in entity.keys() if k.startswith('MATCH_INFO')) + results_count = Counter( + k + for entity in response_entities + for k in entity.keys() + if k.startswith("MATCH_INFO") + ) results_str.append(f'\n{results_count["MATCH_INFO"]} results for {record}') for idx, entity in enumerate(response_entities, start=1): - results_str.append(f'\n Result {idx}') - results_str.append(f'\n Entity ID: {entity["ENTITY"]["RESOLVED_ENTITY"]["ENTITY_ID"]}') - results_str.append(f'\n Entity name: {entity["ENTITY"]["RESOLVED_ENTITY"]["ENTITY_NAME"]}') - results_str.append(f'\n Match key: {entity["MATCH_INFO"]["MATCH_KEY"]}') - results_str.append('\n Records summary: ') + results_str.append(f"\n Result {idx}") + results_str.append( + "\n Entity ID: " + f" {entity['ENTITY']['RESOLVED_ENTITY']['ENTITY_ID']}" + ) + results_str.append( + "\n Entity name: " + f" {entity['ENTITY']['RESOLVED_ENTITY']['ENTITY_NAME']}" + ) + results_str.append( + f'\n Match key:
{entity["MATCH_INFO"]["MATCH_KEY"]}' + ) + results_str.append("\n Records summary: ") for record_summary in entity["ENTITY"]["RESOLVED_ENTITY"]["RECORD_SUMMARY"]: - results_str.append(f'{record_summary["DATA_SOURCE"]}: {record_summary["RECORD_COUNT"]}' + ' ') - results_str.append('\n') + results_str.append( + f'{record_summary["DATA_SOURCE"]}: {record_summary["RECORD_COUNT"]}' + + " " + ) + results_str.append("\n") - out_file.write(''.join(results_str)) + out_file.write("".join(results_str)) else: - out_file.write(f'\nNo result for {record}\n') + out_file.write(f"\nNo result for {record}\n") def futures_search(engine, input_file, output_file): prev_time = time.time() success_recs = error_recs = 0 - with open(output_file, 'w') as out_file: - with open(input_file, 'r') as in_file: + with open(output_file, "w") as out_file: + with open(input_file, "r") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {executor.submit(search_record, engine, record): record for record in itertools.islice(in_file, executor._max_workers)} + futures = { + executor.submit(search_record, engine, record): record + for record in itertools.islice(in_file, executor._max_workers) + } while futures: - for f in concurrent.futures.as_completed(futures.keys()): + done, _ = concurrent.futures.wait( + futures, return_when=concurrent.futures.FIRST_COMPLETED + ) + for f in done: try: result = f.result() - except G2BadInputException as ex: - mock_logger('ERROR', ex, futures[f]) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, futures[f]) error_recs += 1 - except G2RetryableException as ex: - mock_logger('WARN', ex, futures[f]) + except G2RetryableException as err: + mock_logger("WARN", err, futures[f]) error_recs += 1 - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, futures[f]) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, futures[f]) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, futures[f]) - error_recs += 1 else: - success_recs += 1 + record = in_file.readline() + if record: + futures[ + executor.submit(search_record, engine, record) + ] = record + success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats(success_recs, prev_time) + prev_time = record_stats( + success_recs, error_recs, prev_time + ) if success_recs % 10000 == 0: engine_stats(engine) search_results(result, futures[f], out_file) finally: - futures.pop(f) - - record = in_file.readline() - if record: - futures[executor.submit(search_record, engine, record)] = record + del futures[f] - print(f'\nSuccessfully searched {success_recs} records, with {error_recs} errors') - print(f'Search results are located in: {output_file}') + print( + f"\nSuccessfully searched {success_recs:,} records, with" + f" {error_recs:,} errors" + ) + print(f"Search results are located in: {output_file}") try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) futures_search( g2_engine, - '../../../Resources/Data/search-5K.json', - '../../../Resources/Output/search_file.out') + "../../../Resources/Data/search-5K.json", + "../../../Resources/Output/search_file.out", + ) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Searching/SearchRecords.py 
b/Python/Tasks/Searching/SearchRecords.py index fd0d71b..c8db57a 100755 --- a/Python/Tasks/Searching/SearchRecords.py +++ b/Python/Tasks/Searching/SearchRecords.py @@ -4,67 +4,103 @@ import os import sys from collections import Counter -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2BadInputException, + G2Engine, + G2Exception, + G2RetryableException, + G2UnrecoverableException, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) search_records = [ - {"NAME_FULL": "Susan Moony", "DATE_OF_BIRTH": "15/6/1998", "SSN_NUMBER": "521212123"}, - {"NAME_FIRST": "Robert", "NAME_LAST": "Smith", "ADDR_FULL": "123 Main Street Las Vegas NV 89132"}, - {"NAME_FIRST": "Makio", "NAME_LAST": "Yamanaka", "ADDR_FULL": "787 Rotary Drive Rotorville FL 78720"} + { + "NAME_FULL": "Susan Moony", + "DATE_OF_BIRTH": "15/6/1998", + "SSN_NUMBER": "521212123", + }, + { + "NAME_FIRST": "Robert", + "NAME_LAST": "Smith", + "ADDR_FULL": "123 Main Street Las Vegas NV 89132", + }, + { + "NAME_FIRST": "Makio", + "NAME_LAST": "Yamanaka", + "ADDR_FULL": "787 Rotary Drive Rotorville FL 78720", + }, ] def mock_logger(level, exception, error_rec=None): - print(f'\n{level}: {exception}', file=sys.stderr) + print(f"\n{level}: {exception}", file=sys.stderr) if error_rec: - print(f'{error_rec}', file=sys.stderr) + print(f"{error_rec}", file=sys.stderr) def searcher(engine): - for rec_to_search in search_records: try: search_response = bytearray() engine.searchByAttributes(json.dumps(rec_to_search), search_response) - except G2BadInputException as ex: - mock_logger('ERROR', ex, rec_to_search) - except G2RetryableException as ex: - mock_logger('WARN', ex, rec_to_search) - except (G2UnrecoverableException, G2Exception) as ex: - mock_logger('CRITICAL', ex, rec_to_search) + except (G2BadInputException, json.JSONDecodeError) as err: + mock_logger("ERROR", err, rec_to_search) + except G2RetryableException as err: + mock_logger("WARN", err, rec_to_search) + except (G2UnrecoverableException, G2Exception) as err: + mock_logger("CRITICAL", err, rec_to_search) raise - except json.JSONDecodeError as ex: - mock_logger('ERROR', ex, rec_to_search) else: response_dict = json.loads(search_response.decode()) - response_entities = response_dict.get('RESOLVED_ENTITIES', None) + response_entities = response_dict.get("RESOLVED_ENTITIES", None) if response_entities: results_str = [] - results_count = Counter(k for entity in response_entities for k in entity.keys() if k.startswith('MATCH_INFO')) - results_str.append(f'\n{results_count["MATCH_INFO"]} results for {json.dumps(rec_to_search)}\n') + results_count = Counter( + k + for entity in response_entities + for k in entity.keys() + if k.startswith("MATCH_INFO") + ) + results_str.append( + f'\n{results_count["MATCH_INFO"]} results for' + f" {json.dumps(rec_to_search)}\n" + ) for idx, result in enumerate(response_entities, start=1): - results_str.append(f'\n Result {idx}') - results_str.append(f'\n Entity ID: {result["ENTITY"]["RESOLVED_ENTITY"]["ENTITY_ID"]}') - results_str.append(f'\n Entity name: {result["ENTITY"]["RESOLVED_ENTITY"]["ENTITY_NAME"]}') - results_str.append(f'\n Match key: {result["MATCH_INFO"]["MATCH_KEY"]}') - results_str.append('\n Records summary: ') - for record_summary in result["ENTITY"]["RESOLVED_ENTITY"]["RECORD_SUMMARY"]: - results_str.append(f'{record_summary["DATA_SOURCE"]}: 
{record_summary["RECORD_COUNT"]}' + ' ') - results_str.append('\n') + results_str.append(f"\n Result {idx}") + results_str.append( + "\n Entity ID: " + f" {result['ENTITY']['RESOLVED_ENTITY']['ENTITY_ID']}" + ) + results_str.append( + "\n Entity name: " + f" {result['ENTITY']['RESOLVED_ENTITY']['ENTITY_NAME']}" + ) + results_str.append( + f'\n Match key: {result["MATCH_INFO"]["MATCH_KEY"]}' + ) + results_str.append("\n Records summary: ") + for record_summary in result["ENTITY"]["RESOLVED_ENTITY"][ + "RECORD_SUMMARY" + ]: + results_str.append( + f'{record_summary["DATA_SOURCE"]}:' + f' {record_summary["RECORD_COUNT"]}' + + " " + ) + results_str.append("\n") - print(''.join(results_str)) + print("".join(results_str)) else: - print(f'\nNo result for {json.dumps(rec_to_search)}\n') + print(f"\nNo result for {json.dumps(rec_to_search)}\n") try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) + g2_engine.init("G2Engine", engine_config_json, False) searcher(g2_engine) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + mock_logger("CRITICAL", err) diff --git a/Python/Tasks/Stewardship/ForceResolve.py b/Python/Tasks/Stewardship/ForceResolve.py index 367f3f2..9203679 100755 --- a/Python/Tasks/Stewardship/ForceResolve.py +++ b/Python/Tasks/Stewardship/ForceResolve.py @@ -3,62 +3,121 @@ import json import os import sys -from senzing import G2BadInputException, G2Engine, G2EngineFlags, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2Engine, + G2EngineFlags, + G2Exception, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) +get_ent_response = bytearray() +get1_rec_response = bytearray() +get2_rec_response = bytearray() +purge_msg = """ +********** WARNING ********** +This example will purge all currently loaded data from the senzing database! +Before proceeding, all instances of senzing (custom code, rest api, redoer, etc.) must be shut down. +********** WARNING ********** + +Are you sure you want to continue and purge the senzing database? 
(y/n) """ +response = bytearray() +records = [ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "1", + "PRIMARY_NAME_FULL": "Patrick Smith", + "AKA_NAME_FULL": "Paddy Smith", + "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "787-767-2688", + "DATE_OF_BIRTH": "1/12/1990", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "2", + "PRIMARY_NAME_FULL": "Patricia Smith", + "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "787-767-2688", + "DATE_OF_BIRTH": "5/4/1994", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "3", + "PRIMARY_NAME_FULL": "Pat Smith", + "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "787-767-2688", + }, +] + +if input(purge_msg) not in ["y", "Y", "yes", "YES"]: + sys.exit() try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) - - records = [ - {"DATA_SOURCE": "TEST", "RECORD_ID": "1", "PRIMARY_NAME_FULL": "Patrick Smith", "AKA_NAME_FULL": "Paddy Smith", "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", "PHONE_NUMBER": "787-767-2688", "DATE_OF_BIRTH": "1/12/1990"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "2", "PRIMARY_NAME_FULL": "Patricia Smith", "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", "PHONE_NUMBER": "787-767-2688", "DATE_OF_BIRTH": "5/4/1994"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "3", "PRIMARY_NAME_FULL": "Pat Smith", "ADDR_FULL": "787 Rotary Dr, Rotorville, RI, 78720", "PHONE_NUMBER": "787-767-2688"}, - ] - get_ent_response = bytearray() - get1_rec_response = bytearray() - get2_rec_response = bytearray() + g2_engine.init("G2Engine", engine_config_json, False) g2_engine.purgeRepository() for record in records: - DATA_SOURCE = record['DATA_SOURCE'] - RECORD_ID = record['RECORD_ID'] - g2_engine.addRecord(DATA_SOURCE, RECORD_ID, json.dumps(record)) - print(f'Record {RECORD_ID} added') + data_source = record["DATA_SOURCE"] + record_id = record["RECORD_ID"] + g2_engine.addRecord(data_source, record_id, json.dumps(record)) + print(f"Record {record_id} added") - response = bytearray() - g2_engine.getEntityByRecordID('TEST', '3', response) + g2_engine.getEntityByRecordID("TEST", "3", response) response_json = json.loads(response.decode()) - print(f'\nEntity {response_json["RESOLVED_ENTITY"]["ENTITY_ID"]} - {response_json["RESOLVED_ENTITY"]["ENTITY_NAME"]} is related to:') + print( + f'\nEntity {response_json["RESOLVED_ENTITY"]["ENTITY_ID"]} -' + f' {response_json["RESOLVED_ENTITY"]["ENTITY_NAME"]} is currently related to:' + ) - for rel_entity in response_json['RELATED_ENTITIES']: - print(f' Entity {rel_entity["ENTITY_ID"]} - {rel_entity["ENTITY_NAME"]} as {rel_entity["MATCH_LEVEL_CODE"]} with {rel_entity["MATCH_KEY"]}') + for rel_entity in response_json["RELATED_ENTITIES"]: + print( + f' Entity {rel_entity["ENTITY_ID"]} - {rel_entity["ENTITY_NAME"]} as' + f' {rel_entity["MATCH_LEVEL_CODE"]} with {rel_entity["MATCH_KEY"]}' + ) print() - for RECORD_ID in ('1', '2', '3'): - g2_engine.getEntityByRecordID('TEST', RECORD_ID, get_ent_response, G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS) + for record_id in ("1", "2", "3"): + g2_engine.getEntityByRecordID( + "TEST", + record_id, + get_ent_response, + G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS, + ) get_json = json.loads(get_ent_response) - print(f'Record {RECORD_ID} currently resolves to entity {get_json["RESOLVED_ENTITY"]["ENTITY_ID"]}') + print( + f"Record {record_id} currently resolves to entity" + f" {get_json['RESOLVED_ENTITY']['ENTITY_ID']}" + ) - print('\nUpdating records...\n') - g2_engine.getRecord('TEST', '1', 
get1_rec_response) - g2_engine.getRecord('TEST', '3', get2_rec_response) + print("\nUpdating records...\n") + g2_engine.getRecord("TEST", "1", get1_rec_response) + g2_engine.getRecord("TEST", "3", get2_rec_response) get1_json = json.loads(get1_rec_response) get2_json = json.loads(get2_rec_response) - get1_json["JSON_DATA"].update({"TRUSTED_ID_NUMBER": "TEST_R1-TEST_R3", "TRUSTED_ID_TYPE": "FORCE_RESOLVE"}) - get2_json["JSON_DATA"].update({"TRUSTED_ID_NUMBER": "TEST_R1-TEST_R3", "TRUSTED_ID_TYPE": "FORCE_RESOLVE"}) - g2_engine.replaceRecord('TEST', '1', json.dumps(get1_json["JSON_DATA"])) - g2_engine.replaceRecord('TEST', '3', json.dumps(get2_json["JSON_DATA"])) + get1_json["JSON_DATA"].update( + {"TRUSTED_ID_NUMBER": "TEST_R1-TEST_R3", "TRUSTED_ID_TYPE": "FORCE_RESOLVE"} + ) + get2_json["JSON_DATA"].update( + {"TRUSTED_ID_NUMBER": "TEST_R1-TEST_R3", "TRUSTED_ID_TYPE": "FORCE_RESOLVE"} + ) + g2_engine.replaceRecord("TEST", "1", json.dumps(get1_json["JSON_DATA"])) + g2_engine.replaceRecord("TEST", "3", json.dumps(get2_json["JSON_DATA"])) - for RECORD_ID in ('1', '2', '3'): - g2_engine.getEntityByRecordID('TEST', RECORD_ID, get_ent_response, G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS) + for record_id in ("1", "2", "3"): + g2_engine.getEntityByRecordID( + "TEST", + record_id, + get_ent_response, + G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS, + ) get_json = json.loads(get_ent_response) - print(f'Record {RECORD_ID} now resolves to entity {get_json["RESOLVED_ENTITY"]["ENTITY_ID"]}') + print( + f"Record {record_id} now resolves to entity" + f" {get_json['RESOLVED_ENTITY']['ENTITY_ID']}" + ) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception, json.JSONDecodeError) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + print(err) diff --git a/Python/Tasks/Stewardship/ForceUnResolve.py b/Python/Tasks/Stewardship/ForceUnResolve.py index 713a5e2..39d08f3 100755 --- a/Python/Tasks/Stewardship/ForceUnResolve.py +++ b/Python/Tasks/Stewardship/ForceUnResolve.py @@ -3,53 +3,105 @@ import json import os import sys -from senzing import G2BadInputException, G2Engine, G2EngineFlags, G2Exception, G2RetryableException, G2UnrecoverableException +from senzing import ( + G2Engine, + G2EngineFlags, + G2Exception, +) -engine_config_json = os.getenv('SENZING_ENGINE_CONFIGURATION_JSON', None) +engine_config_json = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", None) +get_ent_response = bytearray() +get1_rec_response = bytearray() +get2_rec_response = bytearray() +purge_msg = """ +********** WARNING ********** +This example will purge all currently loaded data from the senzing database! +Before proceeding, all instances of senzing (custom code, rest api, redoer, etc.) must be shut down. +********** WARNING ********** + +Are you sure you want to continue and purge the senzing database? 
(y/n) """ +records = [ + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "4", + "PRIMARY_NAME_FULL": "Elizabeth Jonas", + "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", + "SSN_NUMBER": "767-87-7678", + "DATE_OF_BIRTH": "1/12/1990", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "5", + "PRIMARY_NAME_FULL": "Beth Jones", + "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", + "SSN_NUMBER": "767-87-7678", + "DATE_OF_BIRTH": "1/12/1990", + }, + { + "DATA_SOURCE": "TEST", + "RECORD_ID": "6", + "PRIMARY_NAME_FULL": "Betsey Jones", + "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", + "PHONE_NUMBER": "202-787-7678", + }, +] + +if input(purge_msg) not in ["y", "Y", "yes", "YES"]: + sys.exit() try: g2_engine = G2Engine() - g2_engine.init('G2Engine', engine_config_json, False) - - records = [ - {"DATA_SOURCE": "TEST", "RECORD_ID": "4", "PRIMARY_NAME_FULL": "Elizabeth Jonas", "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", "SSN_NUMBER": "767-87-7678", "DATE_OF_BIRTH": "1/12/1990"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "5", "PRIMARY_NAME_FULL": "Beth Jones", "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", "SSN_NUMBER": "767-87-7678", "DATE_OF_BIRTH": "1/12/1990"}, - {"DATA_SOURCE": "TEST", "RECORD_ID": "6", "PRIMARY_NAME_FULL": "Betsey Jones", "ADDR_FULL": "202 Rotary Dr, Rotorville, RI, 78720", "PHONE_NUMBER": "202-787-7678"}, - ] - get_ent_response = bytearray() - get1_rec_response = bytearray() - get2_rec_response = bytearray() + g2_engine.init("G2Engine", engine_config_json, False) g2_engine.purgeRepository() for record in records: - DATA_SOURCE = record['DATA_SOURCE'] - RECORD_ID = record['RECORD_ID'] - g2_engine.addRecord(DATA_SOURCE, RECORD_ID, json.dumps(record)) - print(f'Record {RECORD_ID} added...') + data_source = record["DATA_SOURCE"] + record_id = record["RECORD_ID"] + g2_engine.addRecord(data_source, record_id, json.dumps(record)) + print(f"Record {record_id} added...") print() - for RECORD_ID in ('4', '5', '6'): - g2_engine.getEntityByRecordID('TEST', RECORD_ID, get_ent_response, G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS) + for record_id in ("4", "5", "6"): + g2_engine.getEntityByRecordID( + "TEST", + record_id, + get_ent_response, + G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS, + ) get_json = json.loads(get_ent_response) - print(f'Record {RECORD_ID} currently resolves to entity {get_json["RESOLVED_ENTITY"]["ENTITY_ID"]}') + print( + f"Record {record_id} currently resolves to entity" + f" {get_json['RESOLVED_ENTITY']['ENTITY_ID']}" + ) - print('\nUpdating records...\n') - g2_engine.getRecord('TEST', '4', get1_rec_response) - g2_engine.getRecord('TEST', '6', get2_rec_response) + print("\nUpdating records...\n") + g2_engine.getRecord("TEST", "4", get1_rec_response) + g2_engine.getRecord("TEST", "6", get2_rec_response) get1_json = json.loads(get1_rec_response) get2_json = json.loads(get2_rec_response) - get1_json["JSON_DATA"].update({"TRUSTED_ID_NUMBER": "TEST_R4-TEST_R6", "TRUSTED_ID_TYPE": "FORCE_UNRESOLVE"}) - get2_json["JSON_DATA"].update({"TRUSTED_ID_NUMBER": "TEST_R6-TEST_R4", "TRUSTED_ID_TYPE": "FORCE_UNRESOLVE"}) - g2_engine.replaceRecord('TEST', '4', json.dumps(get1_json["JSON_DATA"])) - g2_engine.replaceRecord('TEST', '6', json.dumps(get2_json["JSON_DATA"])) + get1_json["JSON_DATA"].update( + {"TRUSTED_ID_NUMBER": "TEST_R4-TEST_R6", "TRUSTED_ID_TYPE": "FORCE_UNRESOLVE"} + ) + get2_json["JSON_DATA"].update( + {"TRUSTED_ID_NUMBER": "TEST_R6-TEST_R4", "TRUSTED_ID_TYPE": "FORCE_UNRESOLVE"} + ) + g2_engine.replaceRecord("TEST", "4", 
json.dumps(get1_json["JSON_DATA"])) + g2_engine.replaceRecord("TEST", "6", json.dumps(get2_json["JSON_DATA"])) - for RECORD_ID in ('4', '5', '6'): - g2_engine.getEntityByRecordID('TEST', RECORD_ID, get_ent_response, G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS) + for record_id in ("4", "5", "6"): + g2_engine.getEntityByRecordID( + "TEST", + record_id, + get_ent_response, + G2EngineFlags.G2_ENTITY_BRIEF_DEFAULT_FLAGS, + ) get_json = json.loads(get_ent_response) - print(f'Record {RECORD_ID} now resolves to entity {get_json["RESOLVED_ENTITY"]["ENTITY_ID"]}') + print( + f"Record {record_id} now resolves to entity" + f" {get_json['RESOLVED_ENTITY']['ENTITY_ID']}" + ) g2_engine.destroy() -except (G2BadInputException, G2RetryableException, G2UnrecoverableException, G2Exception, json.JSONDecodeError) as ex: - print(ex) - sys.exit(-1) +except G2Exception as err: + print(err) diff --git a/Python/Tasks/Stewardship/README.md b/Python/Tasks/Stewardship/README.md index ec0d261..6a8a595 100644 --- a/Python/Tasks/Stewardship/README.md +++ b/Python/Tasks/Stewardship/README.md @@ -22,12 +22,3 @@ With additional knowledge not represented in Senzing you know record 3 "Pat Smit Force UnResolve first adds 3 records and details all records resolved to the same entity. With additional knowledge not represented in Senzing you know record 6 "Betsey Jones" is not the same as records 4 and 5; Betsey is a twin to "Elizabeth Jones". To force unresolve Betsey from the Elizabeth entity, first fetch the current representation of each record with getRecord. Next add `TRUSTED_ID_NUMBER` and `TRUSTED_ID_TYPE` attributes to each of the retrieved records. `TRUSTED_ID_NUMBER` uses a different value to indicate these records should always be considered different entities and not resolve together. In this example the data source of the records and their record IDs are used to create `TRUSTED_ID_NUMBER`. `TRUSTED_ID_TYPE` is set as FORCE_UNRESOLVE as an indicator they were forced apart. -## API Calls -* [addRecord](../../../Python/APIs/G2Engine/Data_Manipulation/addRecord.py) - * Add records for the example -* [getEntityByRecordID](../../../Python/APIs/G2Engine/Get/getEntityByRecordID.py) - * Get entity by the record ID to identify the entity the record resolved to -* [getRecord](../../../Python/APIs/G2Engine/Get/getRecord.py) - * Fetch the record to modify -* [replaceRecord](../../../Python/APIs/G2Engine/Data_Manipulation/replaceRecord.py) - * Replace the modified record diff --git a/README.md b/README.md index 00c69bb..148b188 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,11 @@ # code-snippets -## Synopsis - -Succinct examples for working with the Senzing SDK. - ## Overview -The snippets are divided into 2 categories: - -1) APIs - examples of calling the SDK APIs -2) Tasks - basic examples of how you might use the APIs for operational tasks - +Succinct examples of how you might use the Senzing APIs for operational tasks. ## Contents -1. [Preamble](#preamble) - 1. [Legend](#legend) +1. [Legend](#legend) 1. [Warning](#warning) 1. [Senzing Engine Configuration](#senzing-engine-configuration) 1. [Senzing APIs Bare Metal Usage](#senzing-apis-bare-metal-usage) @@ -31,18 +22,6 @@ The snippets are divided into 2 categories: 5. [Purging Senzing Repository Between Examples](#purging-senzing-repository-between-examples) 6. 
[Input Load File Sizes](#input-load-file-sizes) -## Preamble -Items of Note -At [Senzing](http://senzing.com), -we strive to create GitHub documentation in a -"[don't make me think](https://github.com/Senzing/knowledge-base/blob/main/WHATIS/dont-make-me-think.md)" style. -For the most part, instructions are copy and paste. -Whenever thinking is needed, it's marked with a "thinking" icon :thinking:. -Whenever customization is needed, it's marked with a "pencil" icon :pencil2:. -If the instructions are not clear, please let us know by opening a new -[Documentation issue](https://github.com/Senzing/template-python/issues/new?template=documentation_request.md) -describing where we can improve. Now on with the show... - ### Legend 1. :thinking: - A "thinker" icon means that a little extra thinking may be required. @@ -185,22 +164,22 @@ A feature of Senzing is the capability to pass changes from data manipulation AP The AFFECTED_ENTITIES object contains a list of all entity IDs affected. Separate processes can query the affected entities and synchronize changes and information to downstream systems. For additional information see [Real-time replication and analytics](https://senzing.zendesk.com/hc/en-us/articles/4417768234131--Advanced-Real-time-replication-and-analytics). ### Parallel Processing -Many of the example tasks demonstrate asynchronous execution with concurrent threads. The entity resolution process involves IO operations, the use of concurrent processes and threads when calling the Senzing APIs provides scalability and performance. If using multiple processes, each process should have its own instance of a Senzing engine, for example G2Engine. Each engine object can support multiple threads. +Many of the example tasks demonstrate concurrent execution with threads. The entity resolution process involves IO operations; using concurrent processes and threads when calling the Senzing APIs provides scalability and performance. If using multiple processes, each process should have its own instance of a Senzing engine, for example G2Engine. Each engine object can support multiple threads. ### Scalability Many of the examples demonstrate using multiple threads to utilize the resources available on the machine. Consider loading data into Senzing and increasing the load rate, loading (and other tasks) can be horizontally scaled by utilizing additional machines. If a single very large load file and 3 machines were available for performing data load, the file can be split into 3 with each machine running the sample code or your own application. Horizontal scaling such as this does require the Senzing database to have the capacity to accept the additional workload and not become the bottleneck. -### Randomize Input Files -When providing your own input file(s) to the snippets or your own applications and processing data manipulation tasks (adding, deleting, replacing), it is important to randomize the file(s) when running multiple threads. If source records that pertain to the same entity are clustered together in the input file, multiple processes or threads could all be trying to work on the same entity concurrently. This causes contention and overhead resulting in slower performance. To prevent this contention always randomize input files.
+### Randomize Input Data +When providing your own input file(s) to the snippets or your own applications and processing data manipulation tasks (adding, deleting, replacing), it is important to randomize the file(s) or other input sources when running multiple threads. If source records that pertain to the same entity are clustered together, multiple processes or threads could all be trying to work on the same entity concurrently. This causes contention and overhead, resulting in slower performance. To prevent this contention, always randomize input data. You may be able to randomize your input files during ETL, when mapping the source data to the [Senzing Entity Specification](https://senzing.zendesk.com/hc/en-us/articles/231925448-Generic-Entity-Specification). Otherwise, utilities such as [shuf](https://man7.org/linux/man-pages/man1/shuf.1.html) or, for large files, [terashuf](https://github.com/alexandres/terashuf) can be used. ### Purging Senzing Repository Between Examples When trying out different examples you may notice consecutive tasks complete much faster than an initial run. For example, running a loading task for the first time without the data in the system will be representative of load rate. If the same example is subsequently run again without purging the system it will complete much faster. This is because Senzing knows the records already exist in the system and it skips them. -To run the same example again and see representative performance, first [purge](Python/APIs/G2Engine/Purge/) the Senzing repository of the loaded data. Some examples don't require purging between running them, an example would be the deleting examples that require data to be ingested first. See the usage notes for each task category for an overview of how to use the snippets. +To run the same example again and see representative performance, first [purge](Python/Tasks/Initialization/PurgeRepository.py) the Senzing repository of the loaded data. Some examples don't require purging between runs; an example would be the deleting examples, which require data to be ingested first. See the usage notes for each task category for an overview of how to use the snippets. ### Input Load File Sizes -There are different sized load files within the [Data](Resources/Data/) path that can be used to decrease or increase the volume of data loaded depending on the specification of your hardware. The files are named load-x.json, where the x specifies the number of records in the file. +There are different sized load files within the [Data](Resources/Data/) path that can be used to decrease or increase the volume of data loaded depending on the specification of your hardware. The files are named load-x.json, where the x specifies the number of records in the file.
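The futures-based snippets revised above (DeleteFutures.py, RedoContinuousFutures.py, Replace5KWithInfoFutures.py, Replace5kFutures.py, Search5kFutures.py) all move to the same bounded-concurrency loop: prime the thread pool with one task per worker, wait for the first completion, then top the pool back up from the input. A minimal, self-contained sketch of that loop follows; `process_record` and the in-memory input are stand-ins for the Senzing engine calls and load files used in the snippets, and the private `executor._max_workers` attribute is read only to mirror them.

```python
import concurrent.futures
import io
import itertools


def process_record(record):
    # Stand-in for an engine call such as engine.addRecord() or engine.deleteRecord()
    return record.strip()


def bounded_futures(in_file):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Prime the pool with one task per worker thread
        futures = {
            executor.submit(process_record, record): record
            for record in itertools.islice(in_file, executor._max_workers)
        }
        while futures:
            # Block until at least one in-flight task completes
            done, _ = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for f in done:
                try:
                    f.result()
                finally:
                    # Replace each finished task so roughly max_workers tasks stay
                    # in flight; memory use stays flat even for very large inputs
                    record = in_file.readline()
                    if record:
                        futures[executor.submit(process_record, record)] = record
                    del futures[f]


# Ten fake one-line records in place of a file such as load-10K.json
bounded_futures(io.StringIO("".join(f'{{"RECORD_ID": "{i}"}}\n' for i in range(10))))
```

Mapping each future back to its source record is what lets the snippets log a failing record alongside the exception it raised.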
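The with-info snippets (RedoWithInfoContinuous.py, Replace5KWithInfoFutures.py) write the raw responses to a file, while a downstream consumer typically only needs the affected entity IDs. A short sketch of extracting them follows; the literal below is an assumed, trimmed illustration of the response shape described earlier, not captured engine output.

```python
import json

# Trimmed, illustrative with-info response (assumed shape, not real engine output)
with_info = """{
    "DATA_SOURCE": "TEST",
    "RECORD_ID": "1",
    "AFFECTED_ENTITIES": [{"ENTITY_ID": 1}, {"ENTITY_ID": 100002}]
}"""

# Collect the entity IDs touched by the change, e.g. to synchronize a downstream system
affected_ids = [
    entity["ENTITY_ID"]
    for entity in json.loads(with_info).get("AFFECTED_ENTITIES", [])
]
print(affected_ids)  # [1, 100002]
```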