Feature/ecs paths mapping script #2197

Merged: 17 commits, merged Dec 7, 2023
2 changes: 1 addition & 1 deletion scripts/migration/constants.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

-# Copyright 2018 ABSA Group Limited
+# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
285 changes: 285 additions & 0 deletions scripts/migration/dataset_paths_to_ecs.py
@@ -0,0 +1,285 @@
#!/usr/bin/env python3

# Copyright 2023 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from typing import List

from constants import *
from menas_db import MenasDb
from dataclasses import dataclass
import requests

# Python packages needed are listed in requirements.txt; to fix missing dependencies, just run
# pip install -r requirements.txt


DEFAULT_MAPPING_SERVICE_URL = "https://set-your-mapping-service-here.execute-api.af-south-1.amazonaws.com/dev/map"

DEFAULT_MAPPING_PREFIX = "s3a://"
DEFAULT_SKIP_PREFIXES = ["s3a://", "/tmp"]
DEFAULT_DATASETS_ONLY = False

MAPPING_FIELD_HDFS_PATH = "hdfsPath"
MAPPING_FIELD_HDFS_PUBLISH_PATH = "hdfsPublishPath"
MAPPING_FIELD_HDFS_ALL = "all"

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        prog='dataset_paths_to_ecs',
        description='Menas MongoDB path changes to ECS',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter  # prints default values, too, on help (-h)
    )

    parser.add_argument('-n', '--dryrun', action='store_true', default=DEFAULT_DRYRUN,
                        help="if specified, skips the actual changes, just prints what would be done.")
    parser.add_argument('-v', '--verbose', action="store_true", default=DEFAULT_VERBOSE,
                        help="prints extra information while running.")

    parser.add_argument('target', metavar="TARGET", help="connection string for target MongoDB")
    parser.add_argument('-t', '--target-database', dest="targetdb", default=DEFAULT_DB_NAME,
                        help="name of the db on target to be affected.")

    parser.add_argument('-u', '--mapping-service-url', dest="mappingservice", default=DEFAULT_MAPPING_SERVICE_URL,
                        help="service URL to use for path change mapping.")

    parser.add_argument('-p', '--mapping-prefix', dest="mappingprefix", default=DEFAULT_MAPPING_PREFIX,
                        help="this prefix will be prepended to the mapped path by the mapping service.")

    parser.add_argument('-s', '--skip-prefixes', dest="skipprefixes", metavar="SKIP_PREFIX", default=DEFAULT_SKIP_PREFIXES,
                        nargs="+", help="paths with these prefixes will be skipped from mapping.")

    parser.add_argument('-f', '--fields-to-map', dest='fieldstomap',
                        choices=[MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL],
                        default=MAPPING_FIELD_HDFS_ALL,
                        help="map either the item's 'hdfsPath', 'hdfsPublishPath', or 'all'.")

    parser.add_argument('-d', '--datasets', dest='datasets', metavar="DATASET_NAME", default=[],
                        nargs="+", help='list of dataset names to change paths in.')

    parser.add_argument('-o', '--only-datasets', dest='onlydatasets', action='store_true', default=DEFAULT_DATASETS_ONLY,
                        help="if specified, mapping table changes will NOT be done.")

    return parser.parse_args()

def map_path_from_svc(path: str, path_prefix_to_add: str, svc_url: str) -> str:
    # Example usage of the service:
    # curl -X GET -d '{"hdfs_path":"/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN"}' 'https://my_service.amazonaws.com/dev/map'
    # {"ecs_path": "ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/"}

    payload = "{\"hdfs_path\":\"" + path + "\"}"
    response = requests.get(svc_url, data=payload)

    if response.status_code != 200:
        raise Exception(f"Could not load ECS path from {svc_url} for hdfs_path='{path}', received error {response.status_code} {response.text}")

    wrapper = response.json()
    ecs_path = wrapper['ecs_path']

    return path_prefix_to_add + ecs_path
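
# For illustration only, mirroring the curl example above (all values hypothetical):
#   map_path_from_svc("/bigdatahdfs/datalake/publish/dm9/CNSMR_ACCNT/country_code=KEN", "s3a://", DEFAULT_MAPPING_SERVICE_URL)
# would return "s3a://ursamajor123-abs1234-prod-edla-abc123-ke/publish/CNSMR_ACCNT/country_code=KEN/".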


@dataclass
class MappingSettings:
    mapping_service_url: str
    mapping_prefix: str
    skip_prefixes: List[str]
    fields_to_map: str  # one of MAPPING_FIELD_HDFS_*


def update_data_for_item(item: dict, mapping_settings: MappingSettings) -> dict:
    # hdfsPath
    hdfs_to_be_path_changed = mapping_settings.fields_to_map in {MAPPING_FIELD_HDFS_PATH, MAPPING_FIELD_HDFS_ALL}
    hdfs_path = item["hdfsPath"]

    def starts_with_one_of_prefixes(path: str, prefixes: List[str]) -> bool:
        return any(path.startswith(prefix) for prefix in prefixes)

    if hdfs_to_be_path_changed and not starts_with_one_of_prefixes(hdfs_path, mapping_settings.skip_prefixes):
        updated_hdfs_path = map_path_from_svc(hdfs_path, mapping_settings.mapping_prefix, mapping_settings.mapping_service_url)
        data_update = {
            "hdfsPath": updated_hdfs_path,
            "bakHdfsPath": hdfs_path
        }
    else:
        # not mapped
        data_update = {}

    # hdfsPublishPath
    has_hdfs_publish_path = "hdfsPublishPath" in item
    if has_hdfs_publish_path:
        hdfs_publish_path = item["hdfsPublishPath"]
        hdfs_publish_to_be_path_changed = mapping_settings.fields_to_map in {MAPPING_FIELD_HDFS_PUBLISH_PATH, MAPPING_FIELD_HDFS_ALL}

        if hdfs_publish_to_be_path_changed and not starts_with_one_of_prefixes(hdfs_publish_path, mapping_settings.skip_prefixes):
            updated_hdfs_publish_path = map_path_from_svc(hdfs_publish_path, mapping_settings.mapping_prefix, mapping_settings.mapping_service_url)
            data_update["hdfsPublishPath"] = updated_hdfs_publish_path
            data_update["bakHdfsPublishPath"] = hdfs_publish_path

    return data_update
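
# For illustration (hypothetical paths): with fields_to_map == MAPPING_FIELD_HDFS_ALL and no skip-prefix match,
# an item {"hdfsPath": "/old/std", "hdfsPublishPath": "/old/pub", ...} would yield an update payload like
# {"hdfsPath": "<mapped std>", "bakHdfsPath": "/old/std",
#  "hdfsPublishPath": "<mapped pub>", "bakHdfsPublishPath": "/old/pub"};
# the bak* fields keep the original paths so the change stays traceable.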

def data_update_to_nice_string(data_update: dict) -> str:
    mappings = []

    # hdfsPath
    if "hdfsPath" in data_update and "bakHdfsPath" in data_update:
        hdfs_path = data_update["hdfsPath"]
        bak_hdfs_path = data_update["bakHdfsPath"]
        mappings.append(f"hdfsPath: {bak_hdfs_path} -> {hdfs_path}")

    # hdfsPublishPath
    if "hdfsPublishPath" in data_update and "bakHdfsPublishPath" in data_update:
        hdfs_publish_path = data_update["hdfsPublishPath"]
        bak_hdfs_publish_path = data_update["bakHdfsPublishPath"]
        mappings.append(f"hdfsPublishPath: {bak_hdfs_publish_path} -> {hdfs_publish_path}")

    return ", ".join(mappings)

def pathchange_entities(target_db: MenasDb, collection_name: str, entity_name: str, entity_names_list: List[str],
                        mapping_settings: MappingSettings, dryrun: bool) -> None:

    assert entity_name == "dataset" or entity_name == "mapping table", "this method supports datasets and MTs only!"

    if not entity_names_list:
        print("No {}s to path-change in {}, skipping.".format(entity_name, collection_name))
        return

    print("Path changing of collection {} started".format(collection_name))
    dataset_collection = target_db.mongodb[collection_name]

    query = {"$and": [
        {"name": {"$in": entity_names_list}}  # dataset/MT name
    ]}

    docs_count = dataset_collection.count_documents(query)
    docs = dataset_collection.find(query)

    print("Found {} {} documents for a potential path change. In progress ... ".format(docs_count, entity_name))

    patched = 0
    failed_count = 0
    for item in docs:
        # item preview
        if verbose:
            print("Changing paths for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"]))

        update_data = update_data_for_item(item, mapping_settings)
        if update_data != {}:
            if dryrun:
                print(" *would set* {}".format(data_update_to_nice_string(update_data)))
                print("")
            else:
                try:
                    if verbose:
                        print(" *changing*: {}".format(data_update_to_nice_string(update_data)))

                    update_result = dataset_collection.update_one(
                        {"$and": [
                            {"_id": item["_id"]}
                        ]},
                        {"$set": update_data}
                    )
                    if update_result.acknowledged and verbose:
                        print("Successfully changed path for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"]))
                        print("")
                except Exception as e:
                    print("Warning: Error while changing paths for {} '{}' v{} (_id={}): {}".format(entity_name, item["name"], item["version"], item["_id"], e))
                    failed_count += 1
                else:
                    patched += 1
        else:
            if verbose:
                print("Nothing left to change for {} '{}' v{} (_id={}).".format(entity_name, item["name"], item["version"], item["_id"]))

    print("Successfully migrated {} of {} {} entries, failed: {}".format(patched, docs_count, entity_name, failed_count))
    print("")


def pathchange_collections_by_ds_names(target_db: MenasDb,
                                       supplied_ds_names: List[str],
                                       mapping_settings: MappingSettings,
                                       onlydatasets: bool,
                                       dryrun: bool) -> None:

    if verbose:
        print("Dataset names given: {}".format(supplied_ds_names))

    ds_names_found = target_db.get_distinct_ds_names_from_ds_names(supplied_ds_names, migration_free_only=False)
    print('Dataset names to path change (actually found in db): {}'.format(ds_names_found))

    if onlydatasets:
        print("Configured *NOT* to path-change related mapping tables.")
    else:
        mapping_table_found_for_dss = target_db.get_distinct_mapping_tables_from_ds_names(ds_names_found, migration_free_only=False)
        print('MTs to path change: {}'.format(mapping_table_found_for_dss))

    print("")
    pathchange_entities(target_db, DATASET_COLLECTION, "dataset", ds_names_found, mapping_settings, dryrun)
    if not onlydatasets:
        pathchange_entities(target_db, MAPPING_TABLE_COLLECTION, "mapping table", mapping_table_found_for_dss, mapping_settings, dryrun)

def pre_run_mapping_service_check(svc_url: str, path_prefix_to_add: str):
    test_path = "/bigdatahdfs/datalake/publish/pcub/just/a/path/to/test/the/service/"

    map_path_from_svc(test_path, path_prefix_to_add, svc_url)

def run(parsed_args: argparse.Namespace):
    target_conn_string = parsed_args.target
    target_db_name = parsed_args.targetdb

    dryrun = parsed_args.dryrun  # if set, only the path change description is printed, no actual patching runs

    mapping_service = parsed_args.mappingservice
    mapping_prefix = parsed_args.mappingprefix
    skip_prefixes = parsed_args.skipprefixes
    fields_to_map = parsed_args.fieldstomap  # argparse allows only one of MAPPING_FIELD_HDFS_*
    mapping_settings = MappingSettings(mapping_service, mapping_prefix, skip_prefixes, fields_to_map)

    print('Menas mongo ECS paths mapping')
    print('Running with settings: dryrun={}, verbose={}'.format(dryrun, verbose))
    print('Skipping prefixes: {}'.format(skip_prefixes))
    print("Using mapping service at: {}".format(mapping_service))
    print(' target connection-string: {}'.format(target_conn_string))
    print(' target DB: {}'.format(target_db_name))

    print('Testing mapping service availability first...')
    pre_run_mapping_service_check(mapping_service, mapping_prefix)  # would throw on error; let's fail fast before mongo is tried
    print(' ok')
    print("")

    target_db = MenasDb.from_connection_string(target_conn_string, target_db_name, alias="target db", verbose=verbose)

    dataset_names = parsed_args.datasets
    only_datasets = parsed_args.onlydatasets
    pathchange_collections_by_ds_names(target_db, dataset_names, mapping_settings, only_datasets, dryrun=dryrun)

    print("Done.")
    print("")


if __name__ == '__main__':
    args = parse_args()

    # global script vars
    verbose = args.verbose
    run(args)

## Example runs:
# Dry-run example:
# python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d MyDataset1 AnotherDatasetB -t menas_remap_test -n -u https://my_service.amazonaws.com/dev/map
# Verbose run example, will use DEFAULT_MAPPING_SERVICE_URL:
# python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d DatasetA -t menas_remap_test -v
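# A further hypothetical example combining flags defined in parse_args above, mapping only 'hdfsPublishPath' fields and skipping mapping tables:
# python dataset_paths_to_ecs.py mongodb://localhost:27017/admin -d DatasetA -t menas_remap_test -f hdfsPublishPath -o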
2 changes: 1 addition & 1 deletion scripts/migration/initialize_menas.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

-# Copyright 2018 ABSA Group Limited
+# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion scripts/migration/menas_db.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

-# Copyright 2018 ABSA Group Limited
+# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion scripts/migration/migrate_menas.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

-# Copyright 2018 ABSA Group Limited
+# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
1 change: 1 addition & 0 deletions scripts/migration/requirements.txt
@@ -1 +1,2 @@
pymongo>=4.0.1
+requests==2.31.0