diff --git a/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/facility_type.json b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/facility_type.json new file mode 100644 index 000000000..8c0cb2903 --- /dev/null +++ b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/facility_type.json @@ -0,0 +1,16 @@ +{ + "Air (fixed)": "airFixed", + "Air (mobile)": "airMobile", + "Lake/River (fixed)": "lakeRiverFixed", + "Lake/River (mobile)": "lakeRiverMobile", + "Land (fixed)": "landFixed", + "Land (mobile)": "landMobile", + "Land (on ice)": "landOnIce", + "Sea (fixed)": "seaFixed", + "Sea (mobile)": "seaMobile", + "Sea (on ice)": "seaOnIce", + "Space-based": "spaceBased", + "Underwater (fixed)": "underwaterFixed", + "Underwater (mobile)": "underwaterMobile", + "unknown": "unknown" +} diff --git a/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/operating_status.json b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/operating_status.json new file mode 100644 index 000000000..a71058383 --- /dev/null +++ b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/operating_status.json @@ -0,0 +1,10 @@ +{ + "Closed": "closed", + "Non-reporting": "nonReporting", + "Operational": "operational", + "Partly operational": "partlyOperational", + "Planned": "planned", + "Pre-operational": "preOperational", + "Stand-by": "standBy", + "unknown": "unknown" +} diff --git a/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/territory_name.json b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/territory_name.json new file mode 100644 index 000000000..3f106559d --- /dev/null +++ b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/territory_name.json @@ -0,0 +1,226 @@ +{ + "Afghanistan": "AFG", + "Angola": "AGO", + "Anguilla": "AIA", + "Albania": "ALB", + "Andorra": "AND", + "United Arab Emirates (the)": "ARE", + "Argentina": "ARG", + "Armenia": "ARM", + "Antigua and Barbuda": "ATG", + "Australia": 
"AUS", + "Austria": "AUT", + "Azerbaijan": "AZE", + "British Caribbean Territories": "BCT", + "Burundi": "BDI", + "Belgium": "BEL", + "Benin": "BEN", + "Burkina Faso": "BFA", + "Bangladesh": "BGD", + "Bulgaria": "BGR", + "Bahrain": "BHR", + "Bahamas": "BHS", + "Bosnia and Herzegovina": "BIH", + "Belarus": "BLR", + "Belize": "BLZ", + "Bermuda": "BMU", + "Bolivia, Plurinational State of": "BOL", + "Brazil": "BRA", + "Barbados": "BRB", + "Brunei Darussalam": "BRN", + "Bhutan": "BTN", + "Botswana": "BWA", + "Central African Republic": "CAF", + "Canada": "CAN", + "Cocos (Keeling) Islands": "CCK", + "Switzerland": "CHE", + "Chile": "CHL", + "China": "CHN", + "Cote d'Ivoire": "CIV", + "Cameroon": "CMR", + "Congo, Democratic Republic of the": "COD", + "Congo": "COG", + "Cook Islands": "COK", + "Colombia": "COL", + "Comoros": "COM", + "Cabo Verde": "CPV", + "Costa Rica": "CRI", + "Curacao and Sint Maarten": "CSM", + "Cuba": "CUB", + "Curacao": "CUW", + "Christmas Island": "CXR", + "Cayman Islands": "CYM", + "Cyprus": "CYP", + "Czech Republic": "CZE", + "Germany": "DEU", + "Djibouti": "DJI", + "Dominica": "DMA", + "Denmark": "DNK", + "Dominican Republic": "DOM", + "Algeria": "DZA", + "Ecuador": "ECU", + "Egypt": "EGY", + "Eritrea": "ERI", + "Western Sahara": "ESH", + "Spain": "ESP", + "Estonia": "EST", + "Ethiopia": "ETH", + "Finland": "FIN", + "Fiji": "FJI", + "Falkland Islands (Malvinas)": "FLK", + "France": "FRA", + "Micronesia, Federated States of": "FSM", + "Gabon": "GAB", + "United Kingdom (the)": "GBR", + "Georgia": "GEO", + "Ghana": "GHA", + "Gibraltar": "GIB", + "Guinea": "GIN", + "Gambia": "GMB", + "Guinea-Bissau": "GNB", + "Equatorial Guinea": "GNQ", + "Greece": "GRC", + "Grenada": "GRD", + "Greenland": "GRL", + "Guatemala": "GTM", + "Guyana": "GUY", + "Hong Kong, China": "HKG", + "Honduras": "HND", + "Croatia": "HRV", + "Haiti": "HTI", + "Hungary": "HUN", + "Indonesia": "IDN", + "India": "IND", + "Ireland": "IRL", + "Iran, Islamic Republic of": "IRN", + "Iraq": 
"IRQ", + "Iceland": "ISL", + "Israel": "ISR", + "Italy": "ITA", + "Jamaica": "JAM", + "Jordan": "JOR", + "Japan": "JPN", + "Kazakhstan": "KAZ", + "Kenya": "KEN", + "Kyrgyzstan": "KGZ", + "Cambodia": "KHM", + "Kiribati": "KIR", + "Saint Kitts and Nevis": "KNA", + "Korea, Republic of": "KOR", + "Kuwait": "KWT", + "Lao People's Democratic Republic": "LAO", + "Lebanon": "LBN", + "Liberia": "LBR", + "Libya": "LBY", + "Saint Lucia": "LCA", + "Liechtenstein": "LIE", + "Sri Lanka": "LKA", + "Lesotho": "LSO", + "Lithuania": "LTU", + "Luxembourg": "LUX", + "Latvia": "LVA", + "Macao, China": "MAC", + "Morocco": "MAR", + "Monaco": "MCO", + "Moldova, Republic of": "MDA", + "Madagascar": "MDG", + "Maldives": "MDV", + "Mexico": "MEX", + "Marshall Islands": "MHL", + "Macedonia, the former Yugoslav Republic of": "MKD", + "Mali": "MLI", + "Malta": "MLT", + "Myanmar": "MMR", + "Montenegro": "MNE", + "Mongolia": "MNG", + "Mozambique": "MOZ", + "Mauritania": "MRT", + "Montserrat": "MSR", + "Mauritius": "MUS", + "Malawi": "MWI", + "Malaysia": "MYS", + "Namibia": "NAM", + "New Caledonia": "NCL", + "Niger": "NER", + "Nigeria": "NGA", + "Nicaragua": "NIC", + "Niue": "NIU", + "Netherlands": "NLD", + "Norway": "NOR", + "Nepal": "NPL", + "Nauru": "NRU", + "New Zealand": "NZL", + "Oman": "OMN", + "Pakistan": "PAK", + "Panama": "PAN", + "Pitcairn": "PCN", + "Peru": "PER", + "Philippines": "PHL", + "Palau": "PLW", + "Papua New Guinea": "PNG", + "Poland": "POL", + "Puerto Rico": "PRI", + "Korea, Democratic People's Republic of": "PRK", + "Portugal": "PRT", + "Paraguay": "PRY", + "Palestine, State of": "PSE", + "French Polynesia": "PYF", + "Qatar": "QAT", + "Romania": "ROU", + "Russian Federation": "RUS", + "Rwanda": "RWA", + "Saudi Arabia": "SAU", + "Sudan": "SDN", + "Senegal": "SEN", + "Singapore": "SGP", + "South Georgia and the South Sandwich Islands": "SGS", + "Saint Helena": "SHN", + "Solomon Islands": "SLB", + "Sierra Leone": "SLE", + "El Salvador": "SLV", + "Somalia": "SOM", + "Saint 
Pierre and Miquelon": "SPM", + "Serbia": "SRB", + "South Sudan": "SSD", + "Sao Tome and Principe": "STP", + "Suriname": "SUR", + "Slovakia": "SVK", + "Slovenia": "SVN", + "Sweden": "SWE", + "Eswatini": "SWZ", + "Sint Maarten": "SXM", + "Seychelles": "SYC", + "Syrian Arab Republic": "SYR", + "Turks and Caicos Islands": "TCA", + "Chad": "TCD", + "Togo": "TGO", + "Thailand": "THA", + "Tajikistan": "TJK", + "Tokelau": "TKL", + "Turkmenistan": "TKM", + "Timor-Leste": "TLS", + "Tonga": "TON", + "Trinidad and Tobago": "TTO", + "Tunisia": "TUN", + "Turkey": "TUR", + "Tuvalu": "TUV", + "Taiwan, Province of China": "TWN", + "Tanzania, United Republic of": "TZA", + "Uganda": "UGA", + "Ukraine": "UKR", + "Uruguay": "URY", + "United States (the)": "USA", + "Uzbekistan": "UZB", + "Saint Vincent and Grenadines": "VCT", + "Venezuela, Bolivarian Republic of": "VEN", + "British Virgin Islands": "VGB", + "Viet Nam": "VNM", + "Vanuatu": "VUT", + "Samoa": "WSM", + "Yemen": "YEM", + "South Africa": "ZAF", + "Zambia": "ZMB", + "Zimbabwe": "ZWE", + "inapplicable": "inapplicable", + "unknown": "unknown" +} diff --git a/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/wmo_region.json b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/wmo_region.json new file mode 100644 index 000000000..062ec5abd --- /dev/null +++ b/wis2box-management/migrations/v1_0b6_to_v1_0b7/mapping_files/wmo_region.json @@ -0,0 +1,8 @@ +{ + "I": "africa", + "II": "asia", + "III": "southAmerica", + "IV": "northCentralAmericaCaribbean", + "V": "southWestPacific", + "VI": "europe" +} \ No newline at end of file diff --git a/wis2box-management/migrations/v1_0b6_to_v1_0b7/update_station_definition_v1.0b7.py b/wis2box-management/migrations/v1_0b6_to_v1_0b7/update_station_definition_v1.0b7.py new file mode 100644 index 000000000..a610699ec --- /dev/null +++ b/wis2box-management/migrations/v1_0b6_to_v1_0b7/update_station_definition_v1.0b7.py @@ -0,0 +1,190 @@ 
###############################################################################
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
###############################################################################
"""Migrate wis2box station definitions from v1.0b6 to v1.0b7.

The script rewrites coded values (facility type, territory name, WMO
region) using the JSON lookup tables stored in the ``mapping_files``
directory next to this file.  It processes two stores:

* the station list CSV under ``$WIS2BOX_DATADIR/metadata/station``; the
  result is written to a sibling file with a ``.v1.0b7`` suffix
* the ``stations`` index of the backend at ``$WIS2BOX_API_BACKEND_URL``

With ``--dryrun`` nothing is written; the results go to stdout instead.
"""
import argparse
import csv
import json
import os
import sys
from pathlib import Path

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from wis2box.log import LOGGER, setup_logger

LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
setup_logger(loglevel=LOG_LEVEL)

DATADIR = os.getenv("WIS2BOX_DATADIR")
THISDIR = os.path.dirname(os.path.realpath(__file__))

es_api = os.getenv("WIS2BOX_API_BACKEND_URL")
es_index = "stations"
station_file = Path(DATADIR) / "metadata" / "station" / "station_list.csv"


def apply_mapping(value, mapping):
    """
    Look up the replacement for a single coded value

    :param value: value to translate
    :param mapping: `dict` of old value -> new value

    :returns: the mapped value, or `value` unchanged when it has no entry
    """

    # use existing value as default in case no match found
    return mapping.get(value, value)


def apply_mapping_elastic(records, codelists, code_maps):
    """
    Build bulk update actions for a batch of search hits

    The ``properties`` of every hit's ``_source`` are updated in place.

    :param records: `list` of search hits (each with `_source`, and
                    optionally `_index` and `_id`)
    :param codelists: iterable of property names to translate
    :param code_maps: `dict` of code list name -> mapping `dict`

    :returns: `list` of update actions, one per hit
    """

    updates = []
    for hit in records:
        record = hit['_source']
        properties = record['properties']
        # iterate over code lists and map entries
        for codelist in codelists:
            if codelist in properties:
                properties[codelist] = apply_mapping(
                    properties[codelist], code_maps[codelist])
            else:
                LOGGER.info(f"No matching code list found for {codelist}")
        # now update record for ES
        updates.append({
            "_op_type": "update",
            "_index": hit.get('_index'),
            "_id": hit.get('_id'),
            "doc": record
        })

    return updates


def _load_code_maps(codelists):
    """
    Load the JSON mapping file of every code list

    :param codelists: iterable of code list names; each one is read from
                      ``mapping_files/<name>.json`` next to this script

    :returns: `dict` of code list name -> mapping `dict`
    """

    code_maps = {}
    mapping_dir = Path(THISDIR) / "mapping_files"
    for codelist in codelists:
        LOGGER.info(f"Loading code list map for {codelist}")
        mapping_file = mapping_dir / f"{codelist}.json"
        try:
            with open(mapping_file, encoding="utf-8") as fh:
                code_maps[codelist] = json.load(fh)
        except Exception:
            LOGGER.error(f"Error loading mapping file for {codelist}")
            raise

    return code_maps


def _write_station_csv(fh, columns, stations, lineterminator='\r\n'):
    """
    Write the station rows as CSV (header first) to an open text stream

    :param fh: writable text stream
    :param columns: `list` of column names, in output order
    :param stations: `list` of row `dict`s
    :param lineterminator: line ending emitted by the CSV writer

    :returns: `None`
    """

    writer = csv.DictWriter(fh, fieldnames=columns,
                            lineterminator=lineterminator)
    writer.writeheader()
    writer.writerows(stations)


def _migrate_station_file(codelists, code_maps, dryrun):
    """
    Apply the code list mappings to the station list CSV file

    :param codelists: iterable of column names to translate
    :param code_maps: `dict` of code list name -> mapping `dict`
    :param dryrun: `bool`, write the result to stdout instead of to file

    :returns: `None`
    """

    stations = []
    LOGGER.info(f"Processing {station_file}")
    # newline='' lets the csv module handle line endings itself
    with open(station_file, 'r', newline='') as fh:
        try:
            reader = csv.DictReader(fh)
            # reading the header here keeps the column order available
            # even when the file contains no data rows
            columns = list(reader.fieldnames or [])
        except Exception:
            LOGGER.error("Error creating DictReader")
            raise
        LOGGER.info("Iterating over rows")
        for idx, row in enumerate(reader):
            for codelist in codelists:
                if codelist not in row:
                    continue
                try:
                    row[codelist] = apply_mapping(row.get(codelist),
                                                  code_maps.get(codelist))
                except Exception:
                    LOGGER.error(f"Error processing {codelist} in row {idx}")  # noqa
                    raise
            stations.append(row)

    if not columns:
        LOGGER.warning(f"{station_file} is empty, nothing to write")
        return

    if dryrun:
        LOGGER.info(
            f"dryrun == True, writing updated {station_file} to stdout")
        # use the csv writer so that fields containing commas or quotes
        # are escaped exactly as they would be in the real output file
        _write_station_csv(sys.stdout, columns, stations,
                           lineterminator='\n')
        return

    # now write data to file
    LOGGER.info(
        f"Writing updated {station_file} to {station_file}.v1.0b7")
    try:
        with open(f"{station_file}.v1.0b7", 'w', newline='') as fh:
            _write_station_csv(fh, columns, stations)
    except Exception:
        LOGGER.error("Error writing updated station file")
        raise


def _migrate_elastic(codelists, code_maps, dryrun):
    """
    Apply the code list mappings to the stations index

    :param codelists: iterable of property names to translate
    :param code_maps: `dict` of code list name -> mapping `dict`
    :param dryrun: `bool`, print the update actions instead of sending them

    :returns: `None`
    """

    LOGGER.info("Updating station data in Elasticsearch index")
    LOGGER.info("Connecting to API ...")
    try:
        es = Elasticsearch(es_api)
    except Exception:
        LOGGER.error(f"Error connecting to {es_api}")
        raise

    batch_size = 100  # process in batch sizes of 100
    cursor = 0  # offset of the next batch

    LOGGER.info(f"Processing stations in batches of {batch_size}")
    # NOTE(review): paging with from_/size and no explicit sort while the
    # same documents are being updated may not yield a stable order, and
    # from_ + size is presumably capped by index.max_result_window;
    # consider a scroll / search_after based approach for large indexes
    # - TODO confirm
    while True:
        try:
            res = es.search(index=es_index,
                            query={'match_all': {}},
                            size=batch_size,
                            from_=cursor)
        except Exception:
            LOGGER.error(f"Error fetching data from {es_index}")
            raise
        hits = res['hits']['hits']
        nhits = len(hits)
        LOGGER.info(f"Processing {nhits} stations")
        cursor += batch_size
        LOGGER.info("Applying mappings ...")
        try:
            updates = apply_mapping_elastic(hits, codelists, code_maps)
        except Exception:
            LOGGER.error("Error applying mappings")
            raise
        if dryrun:
            LOGGER.info("dryrun == True, writing updates to stdout")
            print(updates)
        else:
            LOGGER.info("Updating index ...")
            try:
                bulk(es, updates)
            except Exception:
                LOGGER.error("Error applying bulk update")
                raise
        # a short batch means the last page has been reached
        if nhits < batch_size:
            break


def migrate(dryrun: bool = False):
    """
    Run the v1.0b6 to v1.0b7 station definition migration

    :param dryrun: `bool`, when `True` nothing is written to file or to
                   the index; results are sent to stdout instead

    :returns: `None`
    """

    # first load code lists / mappings
    codelists = ('facility_type', 'territory_name', 'wmo_region')
    code_maps = _load_code_maps(codelists)

    # first migrate / update data in station list CSV file
    _migrate_station_file(codelists, code_maps, dryrun)

    # now migrate ES data
    _migrate_elastic(codelists, code_maps, dryrun)


if __name__ == "__main__":
    # Parse command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--dryrun',
                        action='store_true',
                        help='Run in dry run mode (output to stdout)')
    args = parser.parse_args()
    # Execute
    LOGGER.info("Running wis2box migration from v1_0b6 to v1_0b7 (update station definitions)")  # noqa
    migrate(dryrun=args.dryrun)