From 7bc8ebe434176545378bb0a8e017546a2cd34bf8 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Thu, 19 Oct 2023 16:50:16 +0200 Subject: [PATCH 01/17] Enceladus mapping script: init, no service used --- scripts/migration/dataset_paths_to_ecs.py | 177 ++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 scripts/migration/dataset_paths_to_ecs.py diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py new file mode 100644 index 000000000..ed69751dd --- /dev/null +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 + +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse + +from typing import List + +from constants import * +from menas_db import MenasDb, MenasDbCollectionError + +# python package needed are denoted in requirements.txt, so to fix missing dependencies, just run +# pip install -r requirements.txt + + +DEFAULT_MAPPING_SERVICE_URL = "xxx" +# Example usage of the service: +# curl -X GET -d '{"hdfs_path":"/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN"}' 'https://my_service.amazonaws.com/dev/map' +# {"ecs_path": "ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/"} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog='dataset_paths_to_ecs', + description='Menas MongoDB path changes to ECS', + formatter_class=argparse.ArgumentDefaultsHelpFormatter # prints default values, too, on help (-h) + ) + + parser.add_argument('-n', '--dryrun', action='store_true', default=DEFAULT_DRYRUN, + help="if specified, skip the actual changes, just print what would be done.") + parser.add_argument('-v', '--verbose', action="store_true", default=DEFAULT_VERBOSE, + help="prints extra information while running.") + + parser.add_argument('target', metavar="TARGET", help="connection string for target MongoDB") + parser.add_argument('-t', '--target-database', dest="targetdb", default=DEFAULT_DB_NAME, + help="Name of db on target to be affected.") + + parser.add_argument('-s', '--mapping-service', dest="mappingservice", default=DEFAULT_MAPPING_SERVICE_URL, + help="Service to use for path change mapping.") + + parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], + nargs="+", help='list datasets names to change paths in') + # todo not used now + parser.add_argument('-m', '--mapping-tables', dest="mtables", metavar="MTABLE_NAME", default=[], + nargs="+", help='list mapping tables names to change paths in') + + return parser.parse_args() + + +def pathchange_datasets(target_db: MenasDb, collection_name: str, 
dataset_names_list: List[str], dryrun:bool) -> None: + if not dataset_names_list: + print("No datasets to path-change in {}, skipping.".format(collection_name)) + return + + print("Path changing of collection {} started".format(collection_name)) + dataset_collection = target_db.mongodb[collection_name] + + query = {"name": {"$in": dataset_names_list}} # dataset name + + docs_count = dataset_collection.count_documents(query) + docs = dataset_collection.find(query) + + print("Found: {} documents for the path change. In progress ... ".format(docs_count)) + + patched = 0 + failed_count = 0 + for item in docs: + # item preview + if verbose: + print("Changing paths for dataset {} v{} (_id={}).".format(item["name"], item["version"], item["_id"])) + + hdfs_path = item["hdfsPath"] + hdfs_publish_path = item["hdfsPublishPath"] + + # todo updated: + updated_hdfs_path = hdfs_path + "aaa" + updated_hdfs_publish_path = hdfs_publish_path + "bbb" + + if dryrun: + print(" *would set* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) + print(" *would set* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) + print("") + + else: + try: + if verbose: + print(" *changing* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) + print(" *changing* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) + + update_result = dataset_collection.update_one( + {"_id": item["_id"]}, + {"$set": { + "hdfsPath": updated_hdfs_path, + "hdfsPublishPath": updated_hdfs_publish_path, + "bakHdfsPath": hdfs_path, + "bakHdfsPublishPath": hdfs_publish_path + # todo add migration tag to properties map + }} + ) + if update_result.acknowledged: + if verbose: + print("Successfully changed path for dataset {} v{} (_id={}).".format(item["name"], item["version"], item["_id"])) + print("") + + except Exception as e: + print("Warning: Error while changing paths for dataset {} v{} (_id={}): {}".format(item["name"], item["version"], item["_id"], e)) + 
failed_count += 1 + else: + patched += 1 + + print("Successfully migrated {} of {} entries, failed: {}".format(patched, docs_count, failed_count)) + + +def pathchange_collections_by_ds_names(target_db: MenasDb, + supplied_ds_names: List[str], + dryrun: bool) -> None: + + if verbose: + print("Dataset names given: {}".format(supplied_ds_names)) + + ds_names_found = target_db.get_distinct_ds_names_from_ds_names(supplied_ds_names, migration_free_only=False) + print('Dataset names to path change (actually found db): {}'.format(ds_names_found)) + + + print("") + pathchange_datasets(target_db, DATASET_COLLECTION, ds_names_found, dryrun) + +def run(parsed_args: argparse.Namespace): + target_conn_string = parsed_args.target + target_db_name = parsed_args.targetdb + + dryrun = args.dryrun # if set, only path change description will be printed, no actual patching will run + mapping_service = args.mappingservice + + print('Menas mongo ECS paths mapping') + print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose)) + print("Using mapping service at: {}".format(mapping_service)) + print(' target connection-string: {}'.format(target_conn_string)) + print(' target DB: {}'.format(target_db_name)) + + target_db = MenasDb.from_connection_string(target_conn_string, target_db_name, alias="target db", verbose=verbose) + + # todo could be used for real menas + # Checks raise MenasDbErrors + # print("Checking target db validity...") + # target_db.check_db_version() + # target_db.check_menas_collections_exist() + + dataset_names = parsed_args.datasets + + # todo do remapping for mapping tables, too + # mt_names = parsed_args.mtables + pathchange_collections_by_ds_names(target_db, dataset_names, dryrun=dryrun) + + print("Done.") + + +if __name__ == '__main__': + args = parse_args() + + # globals script vars + verbose = args.verbose + run(args) + + # example test-runs: + # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -v -d DM9_actn_Cd -t menas_remap_test 
From 4d6c6cd8da299da55b7e2de7042549f2cb13547a Mon Sep 17 00:00:00 2001 From: Daniel K Date: Thu, 19 Oct 2023 17:20:03 +0200 Subject: [PATCH 02/17] Enceladus mapping script: url work, prefix s3a:// added --- scripts/migration/dataset_paths_to_ecs.py | 38 +++++++++++++++++++---- scripts/migration/requirements.txt | 1 + 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index ed69751dd..0234dd79b 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -19,6 +19,7 @@ from constants import * from menas_db import MenasDb, MenasDbCollectionError +import requests # python package needed are denoted in requirements.txt, so to fix missing dependencies, just run # pip install -r requirements.txt @@ -29,6 +30,7 @@ # curl -X GET -d '{"hdfs_path":"/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN"}' 'https://my_service.amazonaws.com/dev/map' # {"ecs_path": "ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/"} +DEFAULT_MAPPING_PREFIX = "s3a://" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( @@ -49,6 +51,9 @@ def parse_args() -> argparse.Namespace: parser.add_argument('-s', '--mapping-service', dest="mappingservice", default=DEFAULT_MAPPING_SERVICE_URL, help="Service to use for path change mapping.") + parser.add_argument('-p', '--mapping-prefix', dest="mappingprefix", default=DEFAULT_MAPPING_PREFIX, + help="Default mapping prefix to be applied for paths") + parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], nargs="+", help='list datasets names to change paths in') # todo not used now @@ -57,8 +62,23 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() +def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str)-> str: + # session = requests.Session() + #response = session.post(url, auth=basic_auth, 
verify=http_verify) + + payload = "{\"hdfs_path\":\"" + path + "\"}" + response = requests.get(svc_url, data=payload) + + if response.status_code != 200: + raise Exception(f"Could load ECS path from {svc_url}, received error {response.status_code} {response.text}") + + wrapper = response.json() + ecs_path = wrapper['ecs_path'] -def pathchange_datasets(target_db: MenasDb, collection_name: str, dataset_names_list: List[str], dryrun:bool) -> None: + return path_prefix_to_add + ecs_path + +def pathchange_datasets(target_db: MenasDb, collection_name: str, dataset_names_list: List[str], + mapping_svc_url: str, mapping_prefix: str, dryrun:bool) -> None: if not dataset_names_list: print("No datasets to path-change in {}, skipping.".format(collection_name)) return @@ -83,9 +103,8 @@ def pathchange_datasets(target_db: MenasDb, collection_name: str, dataset_names_ hdfs_path = item["hdfsPath"] hdfs_publish_path = item["hdfsPublishPath"] - # todo updated: - updated_hdfs_path = hdfs_path + "aaa" - updated_hdfs_publish_path = hdfs_publish_path + "bbb" + updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_prefix, mapping_svc_url,) + updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_prefix, mapping_svc_url) if dryrun: print(" *would set* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) @@ -124,6 +143,8 @@ def pathchange_datasets(target_db: MenasDb, collection_name: str, dataset_names_ def pathchange_collections_by_ds_names(target_db: MenasDb, supplied_ds_names: List[str], + mapping_svc_url: str, + mapping_prefix: str, dryrun: bool) -> None: if verbose: @@ -134,7 +155,7 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, print("") - pathchange_datasets(target_db, DATASET_COLLECTION, ds_names_found, dryrun) + pathchange_datasets(target_db, DATASET_COLLECTION, ds_names_found, mapping_svc_url, mapping_prefix, dryrun) def run(parsed_args: argparse.Namespace): target_conn_string = parsed_args.target @@ -142,6 +163,7 @@ def run(parsed_args: 
argparse.Namespace): dryrun = args.dryrun # if set, only path change description will be printed, no actual patching will run mapping_service = args.mappingservice + mapping_prefix = args.mappingprefix print('Menas mongo ECS paths mapping') print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose)) @@ -159,9 +181,13 @@ def run(parsed_args: argparse.Namespace): dataset_names = parsed_args.datasets + # debug # todo remove + # print("res" + map_path_from_svc("/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN", mapping_service)) + # todo do remapping for mapping tables, too # mt_names = parsed_args.mtables - pathchange_collections_by_ds_names(target_db, dataset_names, dryrun=dryrun) + + pathchange_collections_by_ds_names(target_db, dataset_names, mapping_service, mapping_prefix, dryrun=dryrun) print("Done.") diff --git a/scripts/migration/requirements.txt b/scripts/migration/requirements.txt index 9aa9e6fac..547411cdd 100644 --- a/scripts/migration/requirements.txt +++ b/scripts/migration/requirements.txt @@ -1 +1,2 @@ pymongo>=4.0.1 +requests==2.31.0 From c5da790b87e22c5a92b3d0e437355a888840dedb Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:07:18 +0200 Subject: [PATCH 03/17] Enceladus mapping script: mapping ds+tied mts --- scripts/migration/dataset_paths_to_ecs.py | 73 +++++++++++++---------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 0234dd79b..8898787ed 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -18,7 +18,7 @@ from typing import List from constants import * -from menas_db import MenasDb, MenasDbCollectionError +from menas_db import MenasDb import requests # python package needed are denoted in requirements.txt, so to fix missing dependencies, just run @@ -77,68 +77,80 @@ def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: 
str)-> str: return path_prefix_to_add + ecs_path -def pathchange_datasets(target_db: MenasDb, collection_name: str, dataset_names_list: List[str], - mapping_svc_url: str, mapping_prefix: str, dryrun:bool) -> None: - if not dataset_names_list: - print("No datasets to path-change in {}, skipping.".format(collection_name)) +def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: str, entity_names_list: List[str], + mapping_svc_url: str, mapping_prefix: str, dryrun: bool) -> None: + + assert entity_name == "dataset" or entity_name == "mapping table" , "this method supports datasets and MTs only!" + + if not entity_names_list: + print("No {}s to path-change in {}, skipping.".format(entity_name, collection_name)) return print("Path changing of collection {} started".format(collection_name)) dataset_collection = target_db.mongodb[collection_name] - query = {"name": {"$in": dataset_names_list}} # dataset name + query = {"name": {"$in": entity_names_list}} # dataset/MT name docs_count = dataset_collection.count_documents(query) docs = dataset_collection.find(query) - print("Found: {} documents for the path change. In progress ... ".format(docs_count)) + print("Found: {} {} documents for the path change. In progress ... 
".format(docs_count, entity_name)) patched = 0 failed_count = 0 for item in docs: # item preview if verbose: - print("Changing paths for dataset {} v{} (_id={}).".format(item["name"], item["version"], item["_id"])) + print("Changing paths for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) + # common logic for datasets and mapping tables, but MTs do not have hdfsPublishPath hdfs_path = item["hdfsPath"] - hdfs_publish_path = item["hdfsPublishPath"] + updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_prefix, mapping_svc_url) - updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_prefix, mapping_svc_url,) - updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_prefix, mapping_svc_url) + has_hdfs_publish_path = "hdfsPublishPath" in item + if has_hdfs_publish_path: + hdfs_publish_path = item["hdfsPublishPath"] + updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_prefix, mapping_svc_url) if dryrun: print(" *would set* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) - print(" *would set* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) + if has_hdfs_publish_path: + print(" *would set* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) print("") else: try: if verbose: print(" *changing* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) - print(" *changing* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) + + update_data = { + "hdfsPath": updated_hdfs_path, + "bakHdfsPath": hdfs_path + # todo add migration tag to properties map + } + + if has_hdfs_publish_path: + update_data["hdfsPublishPath"] = updated_hdfs_publish_path + update_data["bakHdfsPublishPath"] = hdfs_publish_path + if verbose: + print(" *changing* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) update_result = dataset_collection.update_one( {"_id": item["_id"]}, - {"$set": { - 
"hdfsPath": updated_hdfs_path, - "hdfsPublishPath": updated_hdfs_publish_path, - "bakHdfsPath": hdfs_path, - "bakHdfsPublishPath": hdfs_publish_path - # todo add migration tag to properties map - }} + {"$set": update_data} ) - if update_result.acknowledged: - if verbose: - print("Successfully changed path for dataset {} v{} (_id={}).".format(item["name"], item["version"], item["_id"])) - print("") + if update_result.acknowledged and verbose: + print("Successfully changed path for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) + print("") except Exception as e: - print("Warning: Error while changing paths for dataset {} v{} (_id={}): {}".format(item["name"], item["version"], item["_id"], e)) + print("Warning: Error while changing paths for {} '{}' v{} (_id={}): {}".format(entity_name, item["name"], item["version"], item["_id"], e)) failed_count += 1 else: patched += 1 - print("Successfully migrated {} of {} entries, failed: {}".format(patched, docs_count, failed_count)) + print("Successfully migrated {} of {} {} entries, failed: {}".format(patched, docs_count, entity_name, failed_count)) + print("") def pathchange_collections_by_ds_names(target_db: MenasDb, @@ -153,9 +165,13 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, ds_names_found = target_db.get_distinct_ds_names_from_ds_names(supplied_ds_names, migration_free_only=False) print('Dataset names to path change (actually found db): {}'.format(ds_names_found)) + mapping_table_found_for_dss = target_db.get_distinct_mapping_tables_from_ds_names(ds_names_found, migration_free_only=False) + print('MTs to path change: {}'.format(mapping_table_found_for_dss)) print("") - pathchange_datasets(target_db, DATASET_COLLECTION, ds_names_found, mapping_svc_url, mapping_prefix, dryrun) + pathchange_entities(target_db, DATASET_COLLECTION, "dataset", ds_names_found, mapping_svc_url, mapping_prefix, dryrun) + pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping 
table", mapping_table_found_for_dss, mapping_svc_url, mapping_prefix, dryrun) + def run(parsed_args: argparse.Namespace): target_conn_string = parsed_args.target @@ -184,9 +200,6 @@ def run(parsed_args: argparse.Namespace): # debug # todo remove # print("res" + map_path_from_svc("/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN", mapping_service)) - # todo do remapping for mapping tables, too - # mt_names = parsed_args.mtables - pathchange_collections_by_ds_names(target_db, dataset_names, mapping_service, mapping_prefix, dryrun=dryrun) print("Done.") From bea9c43eb572ccc2efd12201755ba140927da2ef Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:27:25 +0200 Subject: [PATCH 04/17] Enceladus mapping script: `pathChanged=true` used for patched marking --- scripts/migration/dataset_paths_to_ecs.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 8898787ed..afc12e693 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -32,6 +32,8 @@ DEFAULT_MAPPING_PREFIX = "s3a://" +PATH_CHANGE_FREE_MONGO_FILTER = {"pathChanged": {"$exists": False}} + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog='dataset_paths_to_ecs', @@ -89,7 +91,10 @@ def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: s print("Path changing of collection {} started".format(collection_name)) dataset_collection = target_db.mongodb[collection_name] - query = {"name": {"$in": entity_names_list}} # dataset/MT name + query = {"$and": [ + {"name": {"$in": entity_names_list}}, # dataset/MT name + PATH_CHANGE_FREE_MONGO_FILTER + ]} docs_count = dataset_collection.count_documents(query) docs = dataset_collection.find(query) @@ -125,8 +130,8 @@ def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: s update_data = { "hdfsPath": 
updated_hdfs_path, - "bakHdfsPath": hdfs_path - # todo add migration tag to properties map + "bakHdfsPath": hdfs_path, + "pathChanged": True } if has_hdfs_publish_path: @@ -136,7 +141,10 @@ def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: s print(" *changing* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) update_result = dataset_collection.update_one( - {"_id": item["_id"]}, + {"$and": [ + {"_id": item["_id"]}, + PATH_CHANGE_FREE_MONGO_FILTER + ]}, {"$set": update_data} ) if update_result.acknowledged and verbose: From 8fce7b74ab2c644122f2d190fe376e616447d7d9 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:28:33 +0200 Subject: [PATCH 05/17] Enceladus mapping script: default mapping service placeholder --- scripts/migration/dataset_paths_to_ecs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index afc12e693..924296af3 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -25,7 +25,7 @@ # pip install -r requirements.txt -DEFAULT_MAPPING_SERVICE_URL = "xxx" +DEFAULT_MAPPING_SERVICE_URL = "https://set-your-mapping-service-here.execute-api.af-south-1.amazonaws.com/dev/map" # Example usage of the service: # curl -X GET -d '{"hdfs_path":"/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN"}' 'https://my_service.amazonaws.com/dev/map' # {"ecs_path": "ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/"} From 36fee2ff119cb8bf25816c4f6066dda1a2cfa6cf Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:34:52 +0200 Subject: [PATCH 06/17] Enceladus mapping script: examples --- scripts/migration/dataset_paths_to_ecs.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 
924296af3..a114d59cd 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -220,5 +220,8 @@ def run(parsed_args: argparse.Namespace): verbose = args.verbose run(args) - # example test-runs: - # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -v -d DM9_actn_Cd -t menas_remap_test + ## Examples runs: + # Dry-run example: + # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d DM9_actn_Cd -t menas_remap_test -n -s https://my_service.amazonaws.com/dev/map + # Verbose run example, will use DEFAULT_MAPPING_SERVICE_URL on line 28: + # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d XMSK083 -t menas_remap_test -v \ No newline at end of file From 5489354ebac326000105510c864fe1ff39724417 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:39:05 +0200 Subject: [PATCH 07/17] Enceladus mapping script: cleanup --- scripts/migration/dataset_paths_to_ecs.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index a114d59cd..e74bbfd7f 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -58,9 +58,6 @@ def parse_args() -> argparse.Namespace: parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], nargs="+", help='list datasets names to change paths in') - # todo not used now - parser.add_argument('-m', '--mapping-tables', dest="mtables", metavar="MTABLE_NAME", default=[], - nargs="+", help='list mapping tables names to change paths in') return parser.parse_args() @@ -197,17 +194,7 @@ def run(parsed_args: argparse.Namespace): target_db = MenasDb.from_connection_string(target_conn_string, target_db_name, alias="target db", verbose=verbose) - # todo could be used for real menas - # Checks raise MenasDbErrors - # print("Checking target db validity...") - # target_db.check_db_version() - # 
target_db.check_menas_collections_exist() - dataset_names = parsed_args.datasets - - # debug # todo remove - # print("res" + map_path_from_svc("/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN", mapping_service)) - pathchange_collections_by_ds_names(target_db, dataset_names, mapping_service, mapping_prefix, dryrun=dryrun) print("Done.") From 2b5fecf7556ee7a7d1877f10931aab0969552955 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:58:02 +0200 Subject: [PATCH 08/17] Enceladus mapping script: cleanup --- scripts/migration/dataset_paths_to_ecs.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index e74bbfd7f..a2d578040 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -26,9 +26,6 @@ DEFAULT_MAPPING_SERVICE_URL = "https://set-your-mapping-service-here.execute-api.af-south-1.amazonaws.com/dev/map" -# Example usage of the service: -# curl -X GET -d '{"hdfs_path":"/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN"}' 'https://my_service.amazonaws.com/dev/map' -# {"ecs_path": "ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/"} DEFAULT_MAPPING_PREFIX = "s3a://" @@ -62,8 +59,9 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str)-> str: - # session = requests.Session() - #response = session.post(url, auth=basic_auth, verify=http_verify) + # Example usage of the service: + # curl -X GET -d '{"hdfs_path":"/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN"}' 'https://my_service.amazonaws.com/dev/map' + # {"ecs_path": "ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/"} payload = "{\"hdfs_path\":\"" + path + "\"}" response = requests.get(svc_url, data=payload) From 8c2e09d8166916754b90959fb7fc879af15582c5 Mon Sep 
17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 10:58:44 +0200 Subject: [PATCH 09/17] Enceladus mapping script: cleanup --- scripts/migration/dataset_paths_to_ecs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index a2d578040..ed3b5f2c4 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -209,4 +209,4 @@ def run(parsed_args: argparse.Namespace): # Dry-run example: # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d DM9_actn_Cd -t menas_remap_test -n -s https://my_service.amazonaws.com/dev/map # Verbose run example, will use DEFAULT_MAPPING_SERVICE_URL on line 28: - # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d XMSK083 -t menas_remap_test -v \ No newline at end of file + # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d XMSK083 -t menas_remap_test -v From c2bce3150eaba3b1fa028654e126fcbad32fca75 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Fri, 20 Oct 2023 14:57:16 +0200 Subject: [PATCH 10/17] Enceladus mapping script: comments update --- scripts/migration/constants.py | 2 +- scripts/migration/dataset_paths_to_ecs.py | 2 +- scripts/migration/initialize_menas.py | 2 +- scripts/migration/menas_db.py | 2 +- scripts/migration/migrate_menas.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/migration/constants.py b/scripts/migration/constants.py index 21eca2bb7..e16b58698 100644 --- a/scripts/migration/constants.py +++ b/scripts/migration/constants.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2018 ABSA Group Limited +# Copyright 2022 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index ed3b5f2c4..287eada9f 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2018 ABSA Group Limited +# Copyright 2023 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/scripts/migration/initialize_menas.py b/scripts/migration/initialize_menas.py index 74dc2f405..1272896bb 100644 --- a/scripts/migration/initialize_menas.py +++ b/scripts/migration/initialize_menas.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2018 ABSA Group Limited +# Copyright 2022 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/scripts/migration/menas_db.py b/scripts/migration/menas_db.py index 8bc761e83..7cd675f8f 100644 --- a/scripts/migration/menas_db.py +++ b/scripts/migration/menas_db.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2018 ABSA Group Limited +# Copyright 2022 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/scripts/migration/migrate_menas.py b/scripts/migration/migrate_menas.py index f6f99943b..f393b13c8 100755 --- a/scripts/migration/migrate_menas.py +++ b/scripts/migration/migrate_menas.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -# Copyright 2018 ABSA Group Limited +# Copyright 2022 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
From 44b1f2e9162733408518cbdbde211ff4e5bf39ba Mon Sep 17 00:00:00 2001 From: Daniel K Date: Mon, 23 Oct 2023 12:45:19 +0200 Subject: [PATCH 11/17] Enceladus mapping script: new feature -o/--only-datasets - if given, related mapping tables will not be path-changed. --- scripts/migration/dataset_paths_to_ecs.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 287eada9f..b524a0968 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -28,6 +28,7 @@ DEFAULT_MAPPING_SERVICE_URL = "https://set-your-mapping-service-here.execute-api.af-south-1.amazonaws.com/dev/map" DEFAULT_MAPPING_PREFIX = "s3a://" +DEFAULT_DATASETS_ONLY = False PATH_CHANGE_FREE_MONGO_FILTER = {"pathChanged": {"$exists": False}} @@ -56,6 +57,10 @@ def parse_args() -> argparse.Namespace: parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], nargs="+", help='list datasets names to change paths in') + parser.add_argument('-o', '--only-datasets', dest='onlydatasets', action='store_true', default=DEFAULT_DATASETS_ONLY, + help="if specified, mapping table changes will NOT be done.") + + return parser.parse_args() def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str)-> str: @@ -160,6 +165,7 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, supplied_ds_names: List[str], mapping_svc_url: str, mapping_prefix: str, + onlydatasets: bool, dryrun: bool) -> None: if verbose: @@ -168,12 +174,17 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, ds_names_found = target_db.get_distinct_ds_names_from_ds_names(supplied_ds_names, migration_free_only=False) print('Dataset names to path change (actually found db): {}'.format(ds_names_found)) - mapping_table_found_for_dss = target_db.get_distinct_mapping_tables_from_ds_names(ds_names_found, migration_free_only=False) - 
print('MTs to path change: {}'.format(mapping_table_found_for_dss)) + if onlydatasets: + print("Configured *NOT* to path-change related mapping tables.") + else: + mapping_table_found_for_dss = target_db.get_distinct_mapping_tables_from_ds_names(ds_names_found, migration_free_only=False) + print('MTs to path change: {}'.format(mapping_table_found_for_dss)) + print("") pathchange_entities(target_db, DATASET_COLLECTION, "dataset", ds_names_found, mapping_svc_url, mapping_prefix, dryrun) - pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_svc_url, mapping_prefix, dryrun) + if not onlydatasets: + pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_svc_url, mapping_prefix, dryrun) def run(parsed_args: argparse.Namespace): @@ -193,7 +204,8 @@ def run(parsed_args: argparse.Namespace): target_db = MenasDb.from_connection_string(target_conn_string, target_db_name, alias="target db", verbose=verbose) dataset_names = parsed_args.datasets - pathchange_collections_by_ds_names(target_db, dataset_names, mapping_service, mapping_prefix, dryrun=dryrun) + only_datasets = parsed_args.onlydatasets + pathchange_collections_by_ds_names(target_db, dataset_names, mapping_service, mapping_prefix, only_datasets, dryrun=dryrun) print("Done.") From 506d0d1de5d7540fa932c7186d9db154835f60e8 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Tue, 24 Oct 2023 11:47:23 +0200 Subject: [PATCH 12/17] Enceladus mapping script: `-s/--mapping-service` changed to `-u/--mapping-service-url`. `-f/--fields-to-map=[hdfsPath|hdfsPublishPath|all(default)]` param has been added. - there is now no `pathChanged` mark in the DB, instead - the -s/--skip-prefix value is checked to determine if path-changed or not.
--- scripts/migration/dataset_paths_to_ecs.py | 167 ++++++++++++++-------- 1 file changed, 107 insertions(+), 60 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index b524a0968..36cabe945 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -19,6 +19,7 @@ from constants import * from menas_db import MenasDb +from dataclasses import dataclass import requests # python package needed are denoted in requirements.txt, so to fix missing dependencies, just run @@ -28,9 +29,12 @@ DEFAULT_MAPPING_SERVICE_URL = "https://set-your-mapping-service-here.execute-api.af-south-1.amazonaws.com/dev/map" DEFAULT_MAPPING_PREFIX = "s3a://" +DEFAULT_SKIP_PREFIX = "s3a://" DEFAULT_DATASETS_ONLY = False -PATH_CHANGE_FREE_MONGO_FILTER = {"pathChanged": {"$exists": False}} +MAPPING_FIELD_HDFS_PATH = "hdfsPath" +MAPPING_FIELD_HDFS_PUBLISH_PATH = "hdfsPublishPath" +MAPPING_FIELD_HDFS_ALL = "all" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( @@ -48,11 +52,17 @@ def parse_args() -> argparse.Namespace: parser.add_argument('-t', '--target-database', dest="targetdb", default=DEFAULT_DB_NAME, help="Name of db on target to be affected.") - parser.add_argument('-s', '--mapping-service', dest="mappingservice", default=DEFAULT_MAPPING_SERVICE_URL, - help="Service to use for path change mapping.") + parser.add_argument('-u', '--mapping-service-url', dest="mappingservice", default=DEFAULT_MAPPING_SERVICE_URL, + help="Service URL to use for path change mapping.") parser.add_argument('-p', '--mapping-prefix', dest="mappingprefix", default=DEFAULT_MAPPING_PREFIX, - help="Default mapping prefix to be applied for paths") + help="This prefix will be prepended to mapped path by the Mapping service") + + parser.add_argument('-s', '--skip-prefix', dest="skipprefix", default=DEFAULT_SKIP_PREFIX, + help="Path with these prefix will be skipped from mapping") + + 
parser.add_argument('-f', '--fields-to-map', dest='fieldstomap', choices=[MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL], + default=MAPPING_FIELD_HDFS_ALL, help="Map either item's 'hdfsPath', 'hdfsPublishPath' or 'all'") parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[], nargs="+", help='list datasets names to change paths in') @@ -79,8 +89,62 @@ def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str)-> str: return path_prefix_to_add + ecs_path + +@dataclass +class MappingSettings: + mapping_service_url: str + mapping_prefix: str + skip_prefix: str + fields_to_map: str # HDFS_MAPPING_FIELD_HDFS_* + + +def update_data_for_item(item: dict, mapping_settings: MappingSettings) -> dict: + # hdfsPath + hdfs_to_be_path_changed = True if mapping_settings.fields_to_map in {MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_ALL} else False + hdfs_path = item["hdfsPath"] + + if hdfs_to_be_path_changed and not hdfs_path.startswith(mapping_settings.skip_prefix): + updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_settings.mapping_prefix, mapping_settings.mapping_service_url) + data_update = { + "hdfsPath": updated_hdfs_path, + "bakHdfsPath": hdfs_path + } + else: + # not mapped + data_update = {} + + # hdfsPublishPath + has_hdfs_publish_path = "hdfsPublishPath" in item + if has_hdfs_publish_path: + hdfs_publish_path = item["hdfsPublishPath"] + hdfs_publish_to_be_path_changed = True if mapping_settings.fields_to_map in {MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL} else False + + if hdfs_publish_to_be_path_changed and not hdfs_publish_path.startswith(mapping_settings.skip_prefix): + updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_settings.mapping_prefix, mapping_settings.mapping_service_url) + data_update["hdfsPublishPath"] = updated_hdfs_publish_path + data_update["bakHdfsPublishPath"] = hdfs_publish_path + + return data_update + +def 
data_update_to_nice_string(data_update: dict) -> str: + mappings = [] + + # hdfsPath + if "hdfsPath" in data_update and "bakHdfsPath" in data_update: + hdfs_path = data_update["hdfsPath"] + bak_hdfs_path = data_update["hdfsPath"] + mappings.append(f"hdfsPath: {bak_hdfs_path} -> {hdfs_path}") + + # hdfsPublishPath + if "hdfsPublishPath" in data_update and "bakHdfsPublishPath" in data_update: + hdfs_publish_path = data_update["hdfsPublishPath"] + bak_hdfs_publish_path = data_update["hdfsPublishPath"] + mappings.append(f"hdfsPublishPath: {bak_hdfs_publish_path} -> {hdfs_publish_path}") + + return ", ".join(mappings) + def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: str, entity_names_list: List[str], - mapping_svc_url: str, mapping_prefix: str, dryrun: bool) -> None: + mapping_settings: MappingSettings, dryrun: bool) -> None: assert entity_name == "dataset" or entity_name == "mapping table" , "this method supports datasets and MTs only!" @@ -92,14 +156,13 @@ def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: s dataset_collection = target_db.mongodb[collection_name] query = {"$and": [ - {"name": {"$in": entity_names_list}}, # dataset/MT name - PATH_CHANGE_FREE_MONGO_FILTER + {"name": {"$in": entity_names_list}} # dataset/MT name ]} docs_count = dataset_collection.count_documents(query) docs = dataset_collection.find(query) - print("Found: {} {} documents for the path change. In progress ... ".format(docs_count, entity_name)) + print("Found: {} {} documents for a potential path change. In progress ... 
".format(docs_count, entity_name)) patched = 0 failed_count = 0 @@ -108,54 +171,35 @@ def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: s if verbose: print("Changing paths for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) - # common logic for datasets and mapping tables, but MTs do not have hdfsPublishPath - hdfs_path = item["hdfsPath"] - updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_prefix, mapping_svc_url) - - has_hdfs_publish_path = "hdfsPublishPath" in item - if has_hdfs_publish_path: - hdfs_publish_path = item["hdfsPublishPath"] - updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_prefix, mapping_svc_url) - - if dryrun: - print(" *would set* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) - if has_hdfs_publish_path: - print(" *would set* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) - print("") + update_data = update_data_for_item(item, mapping_settings) + if update_data != {}: + if dryrun: + print(" *would set* {}".format(data_update_to_nice_string(update_data))) + print("") - else: - try: - if verbose: - print(" *changing* hdfsPath: {} -> {}".format(hdfs_path, updated_hdfs_path)) - - update_data = { - "hdfsPath": updated_hdfs_path, - "bakHdfsPath": hdfs_path, - "pathChanged": True - } - - if has_hdfs_publish_path: - update_data["hdfsPublishPath"] = updated_hdfs_publish_path - update_data["bakHdfsPublishPath"] = hdfs_publish_path - if verbose: - print(" *changing* hdfsPublishPath: {} -> {}".format(hdfs_publish_path, updated_hdfs_publish_path)) - - update_result = dataset_collection.update_one( - {"$and": [ - {"_id": item["_id"]}, - PATH_CHANGE_FREE_MONGO_FILTER - ]}, - {"$set": update_data} - ) - if update_result.acknowledged and verbose: - print("Successfully changed path for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) - print("") - - except Exception as e: - 
print("Warning: Error while changing paths for {} '{}' v{} (_id={}): {}".format(entity_name, item["name"], item["version"], item["_id"], e)) - failed_count += 1 else: - patched += 1 + try: + if verbose: + print(" *changing*: {}".format(data_update_to_nice_string(update_data))) + + update_result = dataset_collection.update_one( + {"$and": [ + {"_id": item["_id"]} + ]}, + {"$set": update_data} + ) + if update_result.acknowledged and verbose: + print("Successfully changed path for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) + print("") + + except Exception as e: + print("Warning: Error while changing paths for {} '{}' v{} (_id={}): {}".format(entity_name, item["name"], item["version"], item["_id"], e)) + failed_count += 1 + else: + patched += 1 + else: + if verbose: + print("Nothing left to change for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"])) print("Successfully migrated {} of {} {} entries, failed: {}".format(patched, docs_count, entity_name, failed_count)) print("") @@ -163,8 +207,7 @@ def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: s def pathchange_collections_by_ds_names(target_db: MenasDb, supplied_ds_names: List[str], - mapping_svc_url: str, - mapping_prefix: str, + mapping_settings: MappingSettings, onlydatasets: bool, dryrun: bool) -> None: @@ -182,9 +225,9 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, print("") - pathchange_entities(target_db, DATASET_COLLECTION, "dataset", ds_names_found, mapping_svc_url, mapping_prefix, dryrun) + pathchange_entities(target_db, DATASET_COLLECTION, "dataset", ds_names_found, mapping_settings, dryrun) if not onlydatasets: - pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_svc_url, mapping_prefix, dryrun) + pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_settings, 
dryrun) def run(parsed_args: argparse.Namespace): @@ -192,8 +235,12 @@ def run(parsed_args: argparse.Namespace): target_db_name = parsed_args.targetdb dryrun = args.dryrun # if set, only path change description will be printed, no actual patching will run + mapping_service = args.mappingservice mapping_prefix = args.mappingprefix + skip_prefix = args.skipprefix + fields_to_map = args.fieldstomap # argparse allow only one of HDFS_MAPPING_FIELD_HDFS_* + mapping_settings = MappingSettings(mapping_service, mapping_prefix, skip_prefix, fields_to_map) print('Menas mongo ECS paths mapping') print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose)) @@ -205,7 +252,7 @@ def run(parsed_args: argparse.Namespace): dataset_names = parsed_args.datasets only_datasets = parsed_args.onlydatasets - pathchange_collections_by_ds_names(target_db, dataset_names, mapping_service, mapping_prefix, only_datasets, dryrun=dryrun) + pathchange_collections_by_ds_names(target_db, dataset_names, mapping_settings, only_datasets, dryrun=dryrun) print("Done.") @@ -219,6 +266,6 @@ def run(parsed_args: argparse.Namespace): ## Examples runs: # Dry-run example: - # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d DM9_actn_Cd -t menas_remap_test -n -s https://my_service.amazonaws.com/dev/map + # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d MyDataset1 AnotherDatasetB -t menas_remap_test -n -u https://my_service.amazonaws.com/dev/map # Verbose run example, will use DEFAULT_MAPPING_SERVICE_URL on line 28: # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d XMSK083 -t menas_remap_test -v From 8058ede2f02befc8116bc9db11591ca486aef32e Mon Sep 17 00:00:00 2001 From: Daniel K Date: Tue, 24 Oct 2023 13:03:49 +0200 Subject: [PATCH 13/17] Enceladus mapping script: prettyprint fix --- scripts/migration/dataset_paths_to_ecs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py 
b/scripts/migration/dataset_paths_to_ecs.py index 36cabe945..b0485d4a3 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -132,13 +132,13 @@ def data_update_to_nice_string(data_update: dict) -> str: # hdfsPath if "hdfsPath" in data_update and "bakHdfsPath" in data_update: hdfs_path = data_update["hdfsPath"] - bak_hdfs_path = data_update["hdfsPath"] + bak_hdfs_path = data_update["bakHdfsPath"] mappings.append(f"hdfsPath: {bak_hdfs_path} -> {hdfs_path}") # hdfsPublishPath if "hdfsPublishPath" in data_update and "bakHdfsPublishPath" in data_update: hdfs_publish_path = data_update["hdfsPublishPath"] - bak_hdfs_publish_path = data_update["hdfsPublishPath"] + bak_hdfs_publish_path = data_update["bakHdfsPublishPath"] mappings.append(f"hdfsPublishPath: {bak_hdfs_publish_path} -> {hdfs_publish_path}") return ", ".join(mappings) @@ -268,4 +268,4 @@ def run(parsed_args: argparse.Namespace): # Dry-run example: # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d MyDataset1 AnotherDatasetB -t menas_remap_test -n -u https://my_service.amazonaws.com/dev/map # Verbose run example, will use DEFAULT_MAPPING_SERVICE_URL on line 28: - # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d XMSK083 -t menas_remap_test -v + # python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d DatasetA -t menas_remap_test -v From dda3c50ebd12d160e0e11c687fcd447dffa71845 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Tue, 24 Oct 2023 13:27:42 +0200 Subject: [PATCH 14/17] Enceladus mapping script: prettyprint fix --- scripts/migration/dataset_paths_to_ecs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index b0485d4a3..780259bfc 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -255,6 +255,7 @@ def run(parsed_args: argparse.Namespace): 
pathchange_collections_by_ds_names(target_db, dataset_names, mapping_settings, only_datasets, dryrun=dryrun) print("Done.") + print("") if __name__ == '__main__': From 71bd55034ce4270d86f99bae093c73e480140b6d Mon Sep 17 00:00:00 2001 From: Daniel K Date: Tue, 31 Oct 2023 09:15:50 +0100 Subject: [PATCH 15/17] Enceladus mapping script: ECS mapping service usage improved: pre-run ECS service mapping check added; exception communicates the HDFS path used --- scripts/migration/dataset_paths_to_ecs.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 780259bfc..1138a7a4c 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -82,7 +82,7 @@ def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str)-> str: response = requests.get(svc_url, data=payload) if response.status_code != 200: - raise Exception(f"Could load ECS path from {svc_url}, received error {response.status_code} {response.text}") + raise Exception(f"Could load ECS path from {svc_url} for hdfs_path='{path}', received error {response.status_code} {response.text}") wrapper = response.json() ecs_path = wrapper['ecs_path'] @@ -229,6 +229,10 @@ def pathchange_collections_by_ds_names(target_db: MenasDb, if not onlydatasets: pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_settings, dryrun) +def pre_run_mapping_service_check(svc_url: str, path_prefix_to_add: str): + test_path = "/bigdatahdfs/datalake/publish/pcub/just/a/path/to/test/the/service/" + + map_path_from_svc(test_path, path_prefix_to_add, svc_url) def run(parsed_args: argparse.Namespace): target_conn_string = parsed_args.target @@ -248,6 +252,11 @@ def run(parsed_args: argparse.Namespace): print(' target connection-string: {}'.format(target_conn_string)) print(' target DB: {}'.format(target_db_name)) + 
print('Testing mapping service availability first...') + pre_run_mapping_service_check(mapping_service, mapping_prefix) # would throw on error, let's fail fast before mongo is tried + print(' ok') + print("") + target_db = MenasDb.from_connection_string(target_conn_string, target_db_name, alias="target db", verbose=verbose) dataset_names = parsed_args.datasets From aad5a8bb89523fa66bb97709ef5c142bca5c51b2 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Wed, 8 Nov 2023 15:27:20 +0100 Subject: [PATCH 16/17] Enceladus mapping script: --skip-prefixes/-s now take multiple values and default to usage `-s s3a:// /tmp` --- scripts/migration/dataset_paths_to_ecs.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 1138a7a4c..7829de8fd 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -29,7 +29,7 @@ DEFAULT_MAPPING_SERVICE_URL = "https://set-your-mapping-service-here.execute-api.af-south-1.amazonaws.com/dev/map" DEFAULT_MAPPING_PREFIX = "s3a://" -DEFAULT_SKIP_PREFIX = "s3a://" +DEFAULT_SKIP_PREFIXES = ["s3a://", "/tmp"] DEFAULT_DATASETS_ONLY = False MAPPING_FIELD_HDFS_PATH = "hdfsPath" @@ -58,8 +58,8 @@ def parse_args() -> argparse.Namespace: parser.add_argument('-p', '--mapping-prefix', dest="mappingprefix", default=DEFAULT_MAPPING_PREFIX, help="This prefix will be prepended to mapped path by the Mapping service") - parser.add_argument('-s', '--skip-prefix', dest="skipprefix", default=DEFAULT_SKIP_PREFIX, - help="Path with these prefix will be skipped from mapping") + parser.add_argument('-s', '--skip-prefixes', dest="skipprefixes", metavar="SKIP_PREFIX", default=DEFAULT_SKIP_PREFIXES, + nargs="+", help="Path with these prefixes will be skipped from mapping") parser.add_argument('-f', '--fields-to-map', dest='fieldstomap', choices=[MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_PUBLISH_PATH, 
MAPPING_FIELD_HDFS_ALL], default=MAPPING_FIELD_HDFS_ALL, help="Map either item's 'hdfsPath', 'hdfsPublishPath' or 'all'") @@ -94,7 +94,7 @@ def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str)-> str: class MappingSettings: mapping_service_url: str mapping_prefix: str - skip_prefix: str + skip_prefixes: List[str] fields_to_map: str # HDFS_MAPPING_FIELD_HDFS_* @@ -103,7 +103,10 @@ def update_data_for_item(item: dict, mapping_settings: MappingSettings) -> dict: hdfs_to_be_path_changed = True if mapping_settings.fields_to_map in {MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_ALL} else False hdfs_path = item["hdfsPath"] - if hdfs_to_be_path_changed and not hdfs_path.startswith(mapping_settings.skip_prefix): + def starts_with_one_of_prefixes(path: str, prefixes: List[str]) -> bool: + return any(path.startswith(prefix) for prefix in prefixes) + + if hdfs_to_be_path_changed and not starts_with_one_of_prefixes(hdfs_path, mapping_settings.skip_prefixes): updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_settings.mapping_prefix, mapping_settings.mapping_service_url) data_update = { "hdfsPath": updated_hdfs_path, @@ -119,7 +122,7 @@ def update_data_for_item(item: dict, mapping_settings: MappingSettings) -> dict: hdfs_publish_path = item["hdfsPublishPath"] hdfs_publish_to_be_path_changed = True if mapping_settings.fields_to_map in {MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL} else False - if hdfs_publish_to_be_path_changed and not hdfs_publish_path.startswith(mapping_settings.skip_prefix): + if hdfs_publish_to_be_path_changed and not starts_with_one_of_prefixes(hdfs_publish_path, mapping_settings.skip_prefixes): updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_settings.mapping_prefix, mapping_settings.mapping_service_url) data_update["hdfsPublishPath"] = updated_hdfs_publish_path data_update["bakHdfsPublishPath"] = hdfs_publish_path @@ -242,9 +245,9 @@ def run(parsed_args: argparse.Namespace): mapping_service = 
args.mappingservice mapping_prefix = args.mappingprefix - skip_prefix = args.skipprefix + skip_prefixes = args.skipprefixes fields_to_map = args.fieldstomap # argparse allow only one of HDFS_MAPPING_FIELD_HDFS_* - mapping_settings = MappingSettings(mapping_service, mapping_prefix, skip_prefix, fields_to_map) + mapping_settings = MappingSettings(mapping_service, mapping_prefix, skip_prefixes, fields_to_map) print('Menas mongo ECS paths mapping') print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose)) From 8d0e5207b4925ae0d853d6333e9061f46b9643a3 Mon Sep 17 00:00:00 2001 From: Daniel K Date: Mon, 27 Nov 2023 15:33:53 +0100 Subject: [PATCH 17/17] Enceladus mapping script: Skipping prefixes printed at the start of the script --- scripts/migration/dataset_paths_to_ecs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/migration/dataset_paths_to_ecs.py b/scripts/migration/dataset_paths_to_ecs.py index 7829de8fd..c0f47b5c0 100644 --- a/scripts/migration/dataset_paths_to_ecs.py +++ b/scripts/migration/dataset_paths_to_ecs.py @@ -251,6 +251,7 @@ def run(parsed_args: argparse.Namespace): print('Menas mongo ECS paths mapping') print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose)) + print('Skipping prefixes: {}'.format(skip_prefixes)) print("Using mapping service at: {}".format(mapping_service)) print(' target connection-string: {}'.format(target_conn_string)) print(' target DB: {}'.format(target_db_name))