From 245004df7c9950625f65d358f5a0d01d80eca13a Mon Sep 17 00:00:00 2001 From: Maaike Date: Thu, 18 Apr 2024 18:24:59 +0200 Subject: [PATCH 01/19] wip new data-mappings approach --- wis2box-management/wis2box/data_mappings.py | 9 ++++-- wis2box-management/wis2box/dataset.py | 36 ++++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index 6f4deab54..c47822263 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -63,10 +63,13 @@ def get_data_mappings() -> dict: continue if 'data_mappings' not in record['wis2box']: continue - key = record['wis2box']['topic_hierarchy'] value = record['wis2box']['data_mappings'] - value['metadata_id'] = record['id'] - data_mappings[key] = value + if 'wmo:topicHierarchy' not in record['properties']: + LOGGER.error(f'No topic hierarchy for {record["id"]}') + continue + value['topic_hierarchy'] = record['properties']['wmo:topicHierarchy'] + metadata_id = record['id'] + data_mappings[metadata_id] = value except Exception as err: msg = f'Issue loading data mappings: {err}' LOGGER.error(msg) diff --git a/wis2box-management/wis2box/dataset.py b/wis2box-management/wis2box/dataset.py index fbf824605..90a7de7d1 100644 --- a/wis2box-management/wis2box/dataset.py +++ b/wis2box-management/wis2box/dataset.py @@ -25,17 +25,51 @@ from wis2box import cli_helpers from wis2box.data import add_collection +from wis2box.data_mappings import get_data_mappings from wis2box.metadata.discovery import publish, unpublish LOGGER = logging.getLogger(__name__) +class Dataset: + def __init__(self, path: Union[Path, str]) -> None: + self.path = str(path) + self.dotpath = None + self.dirpath = None + + self.metadata_id = None + self.topic_hierarchy = None + + # determine if path matches a metadata_id + for metadata_id, data_mappings in get_data_mappings().items(): + if metadata_id in self.path: + 
self.metadata_id = metadata_id + self.topic_hierarchy = data_mappings['topic_hierarchy'] + + if self.metadata_id is None: + # otherwise directory represents topic_hierarchy + if not self.path.startswith('origin/a/wis2'): + self.topic_hierarchy = f'origin/a/wis2/{self.path}' + else: + self.topic_hierarchy = self.path + for metadata_id, data_mappings in get_data_mappings().items(): + if self.topic_hierarchy == data_mappings['topic_hierarchy']: + self.metadata_id = metadata_id + + if '/' in self.path: + LOGGER.debug('Transforming from directory to dotted path') + self.dirpath = self.path + self.dotpath = self.path.replace('/', '.') + elif '.' in self.path: + LOGGER.debug('Transforming from dotted to directory path') + self.dotpath = self.path + self.dirpath = self.path.replace('.', '/') + @click.group() def dataset(): """Dataset workflow""" pass - @click.command('publish') @click.pass_context @cli_helpers.ARGUMENT_FILEPATH From a41fd7946f471eeb69a494332b32971987027f89 Mon Sep 17 00:00:00 2001 From: Maaike Date: Fri, 19 Apr 2024 17:39:31 +0200 Subject: [PATCH 02/19] WIP --- wis2box-management/wis2box/auth.py | 88 +++++++++---------- wis2box-management/wis2box/data/base.py | 28 ++---- wis2box-management/wis2box/handler.py | 11 ++- .../wis2box/metadata/discovery.py | 7 +- wis2box-management/wis2box/pubsub/message.py | 12 +-- wis2box-management/wis2box/topic_hierarchy.py | 16 ++-- 6 files changed, 72 insertions(+), 90 deletions(-) diff --git a/wis2box-management/wis2box/auth.py b/wis2box-management/wis2box/auth.py index c22edfebb..2451dd6cc 100644 --- a/wis2box-management/wis2box/auth.py +++ b/wis2box-management/wis2box/auth.py @@ -25,22 +25,21 @@ from secrets import token_hex from wis2box import cli_helpers -from wis2box.topic_hierarchy import validate_and_load from wis2box.env import AUTH_URL LOGGER = logging.getLogger(__name__) -def create_token(topic: str, token: str) -> bool: +def create_token(path: str, token: str) -> bool: """ Creates a token with access control - 
:param topic: `str` topic hierarchy + :param path: `str` path :param token: `str` authentication token :returns: `bool` of result """ - data = {'topic': topic, 'token': token} + data = {'path': path, 'token': token} r = requests.post(f'{AUTH_URL}/add_token', data=data) LOGGER.info(r.json().get('description')) @@ -48,16 +47,16 @@ def create_token(topic: str, token: str) -> bool: return r.ok -def delete_token(topic: str, token: str = '') -> bool: +def delete_token(path: str, token: str = '') -> bool: """ Creates a token with access control - :param topic: `str` topic hierarchy + :param path: `str` path :param token: `str` authentication token :returns: `bool` of result """ - data = {'topic': topic} + data = {'path': path} if token != '': # Delete all tokens for a given th @@ -69,37 +68,35 @@ def delete_token(topic: str, token: str = '') -> bool: return r.ok -def is_resource_open(topic: str) -> bool: +def is_resource_open(path: str) -> bool: """ - Checks if topic has access control configured + Checks if path has access control configured - :param topic: `str` topic hierarchy + :param path: `str` path :returns: `bool` of result """ - headers = {'X-Original-URI': topic} + headers = {'X-Original-URI': path} r = requests.get(f'{AUTH_URL}/authorize', headers=headers) return r.ok -def is_token_authorized(topic: str, token: str) -> bool: +def is_token_authorized(path: str, token: str) -> bool: """ - Checks if token is authorized to access a topic + Checks if token is authorized to access a path - :param topic: `str` topic hierarchy + :param path: `str` path :param token: `str` authentication token :returns: `bool` of result """ headers = { - 'X-Original-URI': topic, + 'X-Original-URI': path, 'Authorization': f'Bearer {token}', } - r = requests.get(f'{AUTH_URL}/authorize', headers=headers) - return r.ok @@ -111,11 +108,10 @@ def auth(): @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY -def is_restricted_topic(ctx, topic_hierarchy): - """Check if 
topic has access control""" - th, _ = validate_and_load(topic_hierarchy) - click.echo(not is_resource_open(th.dotpath)) +@cli_helpers.OPTION_DATASET +def is_restricted_dataset(ctx, metadata_id): + """Check if dataset has access control""" + click.echo(not is_resource_open(metadata_id)) @click.command() @@ -128,12 +124,11 @@ def is_restricted_path(ctx, path): @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_DATASET @click.argument('token') -def has_access_topic(ctx, topic_hierarchy, token): - """Check if a token has access to a topic""" - th, _ = validate_and_load(topic_hierarchy) - click.echo(is_token_authorized(th.dotpath, token)) +def has_access_dataset(ctx, metadata_id, token): + """Check if a token has access to a dataset""" + click.echo(is_token_authorized(metadata_id, token)) @click.command() @@ -147,20 +142,19 @@ def has_access_path(ctx, path, token): @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_DATASET @click.option('--path', '-p') @click.option('--yes', '-y', default=False, is_flag=True, help='Automatic yes') @click.argument('token', required=False) -def add_token(ctx, topic_hierarchy, path, yes, token): - """Add access token for a topic""" +def add_token(ctx, metadata_id, path, yes, token): + """Add access token for a path or dataset""" - if topic_hierarchy is not None: - th, _ = validate_and_load(topic_hierarchy) - topic = th.dotpath + if metadata_id is not None: + path = metadata_id elif path is not None: - topic = path + path = path else: - raise click.ClickException('Missing path or topic hierarchy') + raise click.ClickException('Missing path or metadata_id') token = token_hex(32) if token is None else token if yes: @@ -168,33 +162,33 @@ def add_token(ctx, topic_hierarchy, path, yes, token): elif not click.confirm(f'Continue with token: {token}', prompt_suffix='?'): return - if create_token(topic, token): + if create_token(path, token): click.echo('Token 
successfully created') @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_DATASET @click.option('--path', '-p') @click.argument('token', required=False, nargs=-1) -def remove_token(ctx, topic_hierarchy, path, token): - """Delete one to many tokens for a topic""" +def remove_token(ctx, metadata_id, path, token): + """Delete one to many tokens for a dataset""" + - if topic_hierarchy is not None: - th, _ = validate_and_load(topic_hierarchy) - topic = th.dotpath + if metadata_id is not None: + path = metadata_id elif path is not None: - topic = path + path = path else: - raise click.ClickException('Missing path or topic hierarchy') + raise click.ClickException('Missing path or metadata_id') - if delete_token(topic, token): + if delete_token(path, token): click.echo('Token successfully deleted') auth.add_command(add_token) auth.add_command(remove_token) -auth.add_command(has_access_topic) +auth.add_command(has_access_dataset) auth.add_command(has_access_path) -auth.add_command(is_restricted_topic) +auth.add_command(is_restricted_dataset) auth.add_command(is_restricted_path) diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index fb3961ff1..e8ed1d903 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -25,11 +25,12 @@ import re from typing import Iterator, Union -from wis2box.env import (STORAGE_INCOMING, STORAGE_PUBLIC, +from wis2box.env import (STORAGE_PUBLIC, STORAGE_SOURCE, BROKER_PUBLIC, DOCKER_BROKER) from wis2box.storage import exists, get_data, put_data -from wis2box.topic_hierarchy import TopicHierarchy +from wis2box.dataset import Dataset + from wis2box.plugin import load_plugin, PLUGINS from wis2box.pubsub.message import WISNotificationMessage @@ -52,7 +53,7 @@ def __init__(self, defs: dict) -> None: LOGGER.debug('Parsing resource mappings') self.filename = None self.incoming_filepath = None - self.topic_hierarchy = 
TopicHierarchy(defs['topic_hierarchy']) + self.dataset = Dataset(defs['dataset']) self.template = defs.get('template', None) self.file_filter = defs.get('pattern', '.*') self.enable_notification = defs.get('notify', False) @@ -91,10 +92,6 @@ def setup_discovery_metadata(self, discovery_metadata: dict) -> None: """ self.discovery_metadata = discovery_metadata - - self.topic_hierarchy = TopicHierarchy( - discovery_metadata['metadata']['identifier']) - self.country = discovery_metadata['wis2box']['country'] self.centre_id = discovery_metadata['wis2box']['centre_id'] @@ -146,13 +143,13 @@ def notify(self, identifier: str, storage_path: str, LOGGER.info('Publishing WISNotificationMessage to public broker') LOGGER.debug(f'Prepare message for: {storage_path}') - topic = f'origin/a/wis2/{self.topic_hierarchy.dirpath}' - data_id = topic.replace('origin/a/wis2/', '') + topic = self.dataset.topic_hierarchy + metadata_id = self.dataset.metadata_id operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - identifier, data_id, storage_path, datetime_, geometry, + identifier, metadata_id, storage_path, datetime_, geometry, wigos_station_identifier, operation) # load plugin for public broker @@ -299,17 +296,6 @@ def files(self) -> Iterator[str]: yield f'{STORAGE_PUBLIC}/{rfp}/{identifier}.{format_}' - @property - def directories(self): - """Dataset directories""" - - dirpath = self.topic_hierarchy.dirpath - - return { - 'incoming': f'{STORAGE_INCOMING}/{dirpath}', - 'public': f'{STORAGE_PUBLIC}/{dirpath}' - } - def get_public_filepath(self): """Public filepath""" diff --git a/wis2box-management/wis2box/handler.py b/wis2box-management/wis2box/handler.py index e39dc358d..53f11f845 100644 --- a/wis2box-management/wis2box/handler.py +++ b/wis2box-management/wis2box/handler.py @@ -25,7 +25,7 @@ from wis2box.api import upsert_collection_item from wis2box.storage import get_data -from wis2box.topic_hierarchy import validate_and_load +from 
wis2box.dataset import validate_and_load from wis2box.plugin import load_plugin from wis2box.plugin import PLUGINS @@ -37,7 +37,7 @@ class Handler: def __init__(self, filepath: str, - topic_hierarchy: str = None, + metadata_id: str = None, data_mappings: dict = None) -> None: self.filepath = filepath self.plugins = () @@ -56,8 +56,7 @@ def __init__(self, filepath: str, if self.filepath.startswith('http'): self.input_bytes = get_data(self.filepath) - if topic_hierarchy is not None: - th = topic_hierarchy + if metadata_id is not None: fuzzy = False else: th = self.filepath @@ -68,10 +67,10 @@ def __init__(self, filepath: str, raise NotHandledError(msg) try: - self.topic_hierarchy, self.plugins = validate_and_load( + self.metadata_id, self.plugins = validate_and_load( th, data_mappings, self.filetype, fuzzy=fuzzy) except Exception as err: - msg = f'Topic Hierarchy validation error: {err}' + msg = f'Path validation error: {err}' # errors in public storage are not handled if STORAGE_PUBLIC in self.filepath: raise NotHandledError(msg) diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 3cceae2d2..43ada8dc6 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -178,8 +178,11 @@ def publish_broker_message(record: dict, storage_path: str, topic = f'origin/a/wis2/{centre_id.lower()}/metadata' # noqa datetime_ = datetime.strptime(record['properties']['created'], '%Y-%m-%dT%H:%M:%SZ') # noqa - wis_message = WISNotificationMessage(record['id'], topic, storage_path, - datetime_, record['geometry']).dumps() + wis_message = WISNotificationMessage(identifier=record['id'], + metadata_id=None, + filepath=storage_path, + datetime_=datetime_, + geometry=record['geometry']).dumps() # load plugin for plugin-broker defs = { diff --git a/wis2box-management/wis2box/pubsub/message.py b/wis2box-management/wis2box/pubsub/message.py index 9e51aff7b..e8788ab31 
100644 --- a/wis2box-management/wis2box/pubsub/message.py +++ b/wis2box-management/wis2box/pubsub/message.py @@ -53,7 +53,7 @@ class PubSubMessage: Generic message class """ - def __init__(self, type_: str, identifier: str, topic: str, filepath: str, + def __init__(self, type_: str, identifier: str, metadata_id: str, filepath: str, datetime_: datetime, geometry: dict = None, wigos_station_identifier: str = None) -> None: """ @@ -61,7 +61,7 @@ def __init__(self, type_: str, identifier: str, topic: str, filepath: str, :param type_: message type :param identifier: identifier - :param topic: topic + :param metadata_id: metadata_id :param filepath: `Path` of file :param datetime_: `datetime` object of temporal aspect of data :param geometry: `dict` of GeoJSON geometry object @@ -128,16 +128,16 @@ def _generate_checksum(self, bytes, algorithm: SecureHashAlgorithms) -> str: # class WISNotificationMessage(PubSubMessage): - def __init__(self, identifier: str, topic: str, filepath: str, + def __init__(self, identifier: str, metadata_id: str, filepath: str, datetime_: str, geometry=None, wigos_station_identifier=None, operation: str = 'create') -> None: super().__init__('wis2-notification-message', identifier, - topic, filepath, datetime_, geometry) + metadata_id, filepath, datetime_, geometry) - data_id = f'{topic}/{self.identifier}'.replace('origin/a/wis2/', '') + data_id = f'{metadata_id}/{self.identifier}'.replace('origin/a/wis2/', '') - if '/metadata' in topic: + if '/metadata' in metadata_id: mimetype = 'application/geo+json' else: suffix = self.filepath.split('.')[-1] diff --git a/wis2box-management/wis2box/topic_hierarchy.py b/wis2box-management/wis2box/topic_hierarchy.py index 9501bad3b..58cbb29bd 100644 --- a/wis2box-management/wis2box/topic_hierarchy.py +++ b/wis2box-management/wis2box/topic_hierarchy.py @@ -67,36 +67,36 @@ def is_valid(self) -> bool: return True -def validate_and_load(topic_hierarchy: str, +def validate_and_load(path: str, data_mappings: dict = 
None, file_type: str = None, fuzzy: bool = False ) -> Tuple[TopicHierarchy, Tuple[Any]]: """ - Validate topic hierarchy and load plugins + Validate path and load plugins - :param topic_hierarchy: `str` of topic hierarchy path + :param path: `str` of path :param data_mappings: `dict` of data mappings :param file_type: `str` the type of file we are processing, e.g. csv, bufr, xml # noqa - :param fuzzy: `bool` of whether to do fuzzy matching of topic hierarchy + :param fuzzy: `bool` of whether to do fuzzy matching of path (e.g. "*foo.bar.baz*). Defaults to `False` (i.e. "foo.bar.baz") - :returns: tuple of `wis2box.topic_hierarchy.TopicHierarchy` and + :returns: tuple of `wis2box.dataset.Dataset` and list of plugins objects """ - LOGGER.debug(f'Validating topic hierarchy: {topic_hierarchy}') + LOGGER.debug(f'Validating path: {path}') LOGGER.debug(f'Data mappings {data_mappings}') if not data_mappings: msg = 'Data mappings are empty. Fetching' data_mappings = get_data_mappings() - th = TopicHierarchy(topic_hierarchy) + dataset = Dataset(path) found = False - if not th.is_valid(): + if not dataset.is_valid(): msg = 'Invalid topic hierarchy' raise ValueError(msg) From 68ea096766271df41704a2f0283935c037b925b7 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 20 May 2024 22:19:30 +0200 Subject: [PATCH 03/19] remove topic_hierarchy class, move validate_and_load to data_mappings --- .github/workflows/tests-docker.yml | 20 +-- wis2box-management/wis2box/auth.py | 9 +- wis2box-management/wis2box/cli_helpers.py | 3 + wis2box-management/wis2box/data/__init__.py | 24 ++- wis2box-management/wis2box/data/base.py | 8 +- .../wis2box/data/bufr2geojson.py | 2 +- wis2box-management/wis2box/data/bufr4.py | 4 +- wis2box-management/wis2box/data/csv2bufr.py | 4 +- wis2box-management/wis2box/data/geojson.py | 4 +- wis2box-management/wis2box/data/message.py | 2 +- wis2box-management/wis2box/data/synop2bufr.py | 4 +- wis2box-management/wis2box/data/universal.py | 2 +- 
wis2box-management/wis2box/data_mappings.py | 75 ++++++++- wis2box-management/wis2box/dataset.py | 36 +--- wis2box-management/wis2box/handler.py | 15 +- wis2box-management/wis2box/pubsub/message.py | 12 +- .../wis2box/pubsub/subscribe.py | 2 +- wis2box-management/wis2box/topic_hierarchy.py | 156 ------------------ 18 files changed, 134 insertions(+), 248 deletions(-) delete mode 100644 wis2box-management/wis2box/topic_hierarchy.py diff --git a/.github/workflows/tests-docker.yml b/.github/workflows/tests-docker.yml index b11f762d9..621dc6db0 100644 --- a/.github/workflows/tests-docker.yml +++ b/.github/workflows/tests-docker.yml @@ -61,8 +61,8 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA_UPDATE + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA_UPDATE - name: add Italy synop data (bufr2bufr) 🇮🇹 env: TOPIC_HIERARCHY: it-roma_met_centre.data.core.weather.surface-based-observations.synop @@ -76,7 +76,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID
-p $TEST_DATA - name: add Algeria synop data (bufr2bufr) 🇩🇿 env: TOPIC_HIERARCHY: dz-alger_met_centre.data.core.weather.surface-based-observations.synop @@ -90,7 +90,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add Romania synop data (synop2bufr and csv2bufr aws-template) 🇷🇴 env: TOPIC_HIERARCHY: ro-rnimh.data.core.weather.surface-based-observations.synop @@ -104,7 +104,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add Congo synop data (synop2bufr) 🇨🇩 env: TOPIC_HIERARCHY: cd-brazza_met_centre.data.core.weather.surface-based-observations.synop @@ -118,7 +118,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi
$DISCOVERY_METADATA_ID -p $TEST_DATA - name: add example ship data (bufr2bufr) WMO env: TOPIC_HIERARCHY: int-wmo-test.data.core.weather.surface-based-observations.ship @@ -135,7 +135,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --wsi 0-22000-0-EUCDE34 $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add example buoy data (bufr2bufr) WMO env: TOPIC_HIERARCHY: int-wmo-test.data.core.weather.surface-based-observations.buoy @@ -148,7 +148,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --wsi 0-22000-0-1400011 $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add example wind profiler data (bufr2bufr) WMO env: TOPIC_HIERARCHY: int-wmo-test.data.core.weather.surface-based-observations.wind_profiler @@ -161,7 +161,7 @@ jobs: python wis2box-ctl.py execute wis2box metadata station add-topic --wsi 0-702-0-48698 $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - 
name: add China GRIB2 data (universal pipeline) 🇨🇳 env: TOPIC_HIERARCHY: cn-cma.data.core.weather.prediction.forecast.medium-range.probabilistic.global @@ -172,7 +172,7 @@ jobs: python3 wis2box-ctl.py execute wis2box dataset publish $DISCOVERY_METADATA curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: sleep 30 seconds then run integration tests ⚙️ run: | sleep 30 diff --git a/wis2box-management/wis2box/auth.py b/wis2box-management/wis2box/auth.py index 2451dd6cc..207eae062 100644 --- a/wis2box-management/wis2box/auth.py +++ b/wis2box-management/wis2box/auth.py @@ -108,7 +108,7 @@ def auth(): @click.command() @click.pass_context -@cli_helpers.OPTION_DATASET +@cli_helpers.OPTION_METADATA_ID def is_restricted_dataset(ctx, metadata_id): """Check if dataset has access control""" click.echo(not is_resource_open(metadata_id)) @@ -124,7 +124,7 @@ def is_restricted_path(ctx, path): @click.command() @click.pass_context -@cli_helpers.OPTION_DATASET +@cli_helpers.OPTION_METADATA_ID @click.argument('token') def has_access_dataset(ctx, metadata_id, token): """Check if a token has access to a dataset""" @@ -142,7 +142,7 @@ def has_access_path(ctx, path, token): @click.command() @click.pass_context -@cli_helpers.OPTION_DATASET +@cli_helpers.OPTION_METADATA_ID @click.option('--path', '-p') @click.option('--yes', '-y', default=False, is_flag=True, help='Automatic yes') @click.argument('token', required=False) @@ -168,13 +168,12 @@ def add_token(ctx, metadata_id, path, yes, token): @click.command() @click.pass_context -@cli_helpers.OPTION_DATASET +@cli_helpers.OPTION_METADATA_ID @click.option('--path', '-p')
@click.argument('token', required=False, nargs=-1) def remove_token(ctx, metadata_id, path, token): """Delete one to many tokens for a dataset""" - if metadata_id is not None: path = metadata_id elif path is not None: diff --git a/wis2box-management/wis2box/cli_helpers.py b/wis2box-management/wis2box/cli_helpers.py index 627fa1397..a6962e8dc 100644 --- a/wis2box-management/wis2box/cli_helpers.py +++ b/wis2box-management/wis2box/cli_helpers.py @@ -37,6 +37,9 @@ OPTION_TOPIC_HIERARCHY = click.option('--topic-hierarchy', '-th', help='Topic hierarchy') +OPTION_METADATA_ID = click.option('--metadata-id', '-mdi', + help='Metadata ID') + OPTION_RECURSIVE = click.option('--recursive', '-r', default=False, is_flag=True, help='Process directory recursively') diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index e2d70ce5b..b745014d4 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -37,7 +37,6 @@ from wis2box.storage import put_data, move_data, list_content, delete_data from wis2box.util import older_than, walk_path - LOGGER = logging.getLogger(__name__) @@ -194,20 +193,31 @@ def clean(ctx, days, verbosity): @click.command() @click.pass_context @cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_METADATA_ID @cli_helpers.OPTION_PATH @cli_helpers.OPTION_RECURSIVE @cli_helpers.OPTION_VERBOSITY -def ingest(ctx, topic_hierarchy, path, recursive, verbosity): +def ingest(ctx, topic_hierarchy, metadata_id, path, recursive, verbosity): """Ingest data file or directory""" - data_mappings = get_data_mappings() + # either topic_hierarchy or metadata_id must be provided + if topic_hierarchy and metadata_id: + raise click.ClickException('Only one of topic_hierarchy or metadata_id can be provided') # noqa + + if not topic_hierarchy and not metadata_id: + raise click.ClickException('Please specify a metadata_id using the option --metadata-id') # noqa + + rfp = None + if 
metadata_id: + data_mappings = get_data_mappings() + if metadata_id not in data_mappings: + raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') # noqa + rfp = metadata_id + else: + rfp = topic_hierarchy.replace('.', '/') for file_to_process in walk_path(path, '.*', recursive): click.echo(f'Processing {file_to_process}') - handler = Handler(filepath=file_to_process, - topic_hierarchy=topic_hierarchy, - data_mappings=data_mappings) - rfp = handler.topic_hierarchy.dirpath path = f'{STORAGE_INCOMING}/{rfp}/{file_to_process.name}' with file_to_process.open('rb') as fh: diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index e8ed1d903..6026f5d3f 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -29,7 +29,6 @@ STORAGE_SOURCE, BROKER_PUBLIC, DOCKER_BROKER) from wis2box.storage import exists, get_data, put_data -from wis2box.dataset import Dataset from wis2box.plugin import load_plugin, PLUGINS @@ -53,7 +52,8 @@ def __init__(self, defs: dict) -> None: LOGGER.debug('Parsing resource mappings') self.filename = None self.incoming_filepath = None - self.dataset = Dataset(defs['dataset']) + self.metadata_id = defs.get('metadata_id', None) + self.topic_hierarchy = defs.get('topic_hierarchy', None) self.template = defs.get('template', None) self.file_filter = defs.get('pattern', '.*') self.enable_notification = defs.get('notify', False) @@ -143,8 +143,8 @@ def notify(self, identifier: str, storage_path: str, LOGGER.info('Publishing WISNotificationMessage to public broker') LOGGER.debug(f'Prepare message for: {storage_path}') - topic = self.dataset.topic_hierarchy - metadata_id = self.dataset.metadata_id + topic = self.topic_hierarchy + metadata_id = self.metadata_id operation = 'create' if is_update is False else 'update' diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index 
a9b81141d..082f679cc 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ b/wis2box-management/wis2box/data/bufr2geojson.py @@ -84,4 +84,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.topic_hierarchy diff --git a/wis2box-management/wis2box/data/bufr4.py b/wis2box-management/wis2box/data/bufr4.py index d15203703..a759ef6aa 100644 --- a/wis2box-management/wis2box/data/bufr4.py +++ b/wis2box-management/wis2box/data/bufr4.py @@ -53,7 +53,7 @@ def transform(self, input_data: Union[Path, bytes], payload = { 'inputs': { - 'channel': self.topic_hierarchy.dirpath, + 'channel': self.topic_hierarchy.replace('origin/a/wis2/', ''), 'notify': False, 'data': data } @@ -100,4 +100,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.topic_hierarchy diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py index 1823f3cda..f8d32344e 100644 --- a/wis2box-management/wis2box/data/csv2bufr.py +++ b/wis2box-management/wis2box/data/csv2bufr.py @@ -65,7 +65,7 @@ def transform(self, input_data: Union[Path, bytes], payload = { 'inputs': { - 'channel': self.topic_hierarchy.dirpath, + 'channel': self.topic_hierarchy.replace('origin/a/wis2/', ''), 'template': self.template, 'notify': False, 'data': data @@ -113,4 +113,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath) + return (Path(yyyymmdd) / 'wis' / self.topic_hierarchy) diff --git a/wis2box-management/wis2box/data/geojson.py 
b/wis2box-management/wis2box/data/geojson.py index 5353df838..3f3740a9d 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -65,10 +65,10 @@ def publish(self) -> bool: continue LOGGER.debug('Publishing data to API') - upsert_collection_item(self.topic_hierarchy.dotpath, the_data) + upsert_collection_item(self.metadata_id, the_data) return True def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.topic_hierarchy diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py index ce75c8376..da6909d21 100644 --- a/wis2box-management/wis2box/data/message.py +++ b/wis2box-management/wis2box/data/message.py @@ -76,4 +76,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.topic_hierarchy diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py index 5b9d317be..2347788ca 100644 --- a/wis2box-management/wis2box/data/synop2bufr.py +++ b/wis2box-management/wis2box/data/synop2bufr.py @@ -75,7 +75,7 @@ def transform(self, input_data: Union[Path, bytes], # post data do wis2box-api/oapi/processes/synop2bufr payload = { 'inputs': { - 'channel': self.topic_hierarchy.dirpath, + 'channel': self.topic_hierarchy.replace('origin/a/wis2/', ''), 'year': year, 'month': month, 'notify': False, @@ -124,4 +124,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath) + return (Path(yyyymmdd) / 'wis' / self.topic_hierarchy) diff --git a/wis2box-management/wis2box/data/universal.py 
b/wis2box-management/wis2box/data/universal.py index b985b59a8..a3b17344b 100644 --- a/wis2box-management/wis2box/data/universal.py +++ b/wis2box-management/wis2box/data/universal.py @@ -80,4 +80,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.topic_hierarchy diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index c47822263..68c3949f7 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -21,6 +21,8 @@ import logging +from typing import Any, Tuple + from owslib.ogcapi.records import Records from wis2box.env import (DOCKER_BROKER, DOCKER_API_URL) @@ -67,7 +69,7 @@ def get_data_mappings() -> dict: if 'wmo:topicHierarchy' not in record['properties']: LOGGER.error(f'No topic hierarchy for {record["id"]}') continue - value['topic_hierarchy'] = record['properties']['wmo:topicHierarchy'] + value['topic_hierarchy'] = record['properties']['wmo:topicHierarchy'] # noqa metadata_id = record['id'] data_mappings[metadata_id] = value except Exception as err: @@ -76,3 +78,74 @@ def get_data_mappings() -> dict: raise EnvironmentError(msg) return data_mappings + + +def validate_and_load(path: str, + data_mappings: dict = None, + file_type: str = None + ) -> Tuple[str, Tuple[Any]]: + """ + Validate path and load plugins + + :param path: `str` of path + :param data_mappings: `dict` of data mappings + :param file_type: `str` the type of file we are processing, e.g. csv, bufr, xml # noqa + :param fuzzy: `bool` of whether to do fuzzy matching of path + (e.g. "*foo.bar.baz*). + Defaults to `False` (i.e. 
"foo.bar.baz") + + :returns: tuple of metadata_id and list of plugins objects + """ + + LOGGER.debug(f'Validating path: {path}') + LOGGER.debug(f'Data mappings {data_mappings}') + + metadata_id = None + topic_hierarchy = None + # determine if path matches a metadata_id + for key in data_mappings.keys(): + if key in path: + metadata_id = key + topic_hierarchy = data_mappings[key]['topic_hierarchy'] + # else try to match topic_hierarchy + if metadata_id is None: + for key, data_mappings in data_mappings.items(): + if (data_mappings['topic_hierarchy']).replace('origin/a/wis2/', '') in path: # noqa + metadata_id = key + topic_hierarchy = data_mappings['topic_hierarchy'] + if metadata_id is None: + msg = f'Could not match {path} to dataset. Did not match any of the following: ' # noqa + options = [v["topic_hierarchy"] for v in data_mappings.values()] + options += [k for k in data_mappings.keys()] + msg += ', '.join(options) + raise ValueError(msg) + + plugins = data_mappings[metadata_id]['plugins'] + + if file_type is None: + LOGGER.warning('File type missing') + file_type = next(iter(plugins)) + LOGGER.debug(f'File type set to first type: {file_type}') + + if file_type not in plugins: + msg = f'Unknown file type ({file_type}) for metadata_id={metadata_id}. 
Did not match any of the following:' # noqa + msg += ', '.join(plugins) + raise ValueError(msg) + + LOGGER.debug(f'Adding plugin definition for {file_type}') + + def data_defs(plugin): + return { + 'metadata_id': metadata_id, + 'topic_hierarchy': topic_hierarchy, + 'codepath': plugin['plugin'], + 'pattern': plugin['file-pattern'], + 'template': plugin.get('template'), + 'buckets': plugin.get('buckets', ()), + 'notify': plugin.get('notify', False), + 'format': file_type + } + + plugins_ = [load_plugin('data', data_defs(p), data_mappings) + for p in plugins[file_type]] + return metadata_id, plugins_ diff --git a/wis2box-management/wis2box/dataset.py b/wis2box-management/wis2box/dataset.py index 90a7de7d1..fbf824605 100644 --- a/wis2box-management/wis2box/dataset.py +++ b/wis2box-management/wis2box/dataset.py @@ -25,51 +25,17 @@ from wis2box import cli_helpers from wis2box.data import add_collection -from wis2box.data_mappings import get_data_mappings from wis2box.metadata.discovery import publish, unpublish LOGGER = logging.getLogger(__name__) -class Dataset: - def __init__(self, path: Union[Path, str]) -> None: - self.path = str(path) - self.dotpath = None - self.dirpath = None - - self.metadata_id = None - self.topic_hierarchy = None - - # determine if path matches a metadata_id - for metadata_id, data_mappings in get_data_mappings().items(): - if metadata_id in self.path: - self.metadata_id = metadata_id - self.topic_hierarchy = data_mappings['topic_hierarchy'] - - if self.metadata_id is None: - # otherwise directory represents topic_hierarchy - if not self.path.startswith('origin/a/wis2'): - self.topic_hierarchy = f'origin/a/wis2/{self.path}' - else: - self.topic_hierarchy = self.path - for metadata_id, data_mappings in get_data_mappings().items(): - if self.topic_hierarchy == data_mappings['topic_hierarchy']: - self.metadata_id = metadata_id - - if '/' in self.path: - LOGGER.debug('Transforming from directory to dotted path') - self.dirpath = self.path - 
self.dotpath = self.path.replace('/', '.') - elif '.' in self.path: - LOGGER.debug('Transforming from dotted to directory path') - self.dotpath = self.path - self.dirpath = self.path.replace('.', '/') - @click.group() def dataset(): """Dataset workflow""" pass + @click.command('publish') @click.pass_context @cli_helpers.ARGUMENT_FILEPATH diff --git a/wis2box-management/wis2box/handler.py b/wis2box-management/wis2box/handler.py index 53f11f845..1230b3afa 100644 --- a/wis2box-management/wis2box/handler.py +++ b/wis2box-management/wis2box/handler.py @@ -25,7 +25,7 @@ from wis2box.api import upsert_collection_item from wis2box.storage import get_data -from wis2box.dataset import validate_and_load +from wis2box.data_mappings import validate_and_load from wis2box.plugin import load_plugin from wis2box.plugin import PLUGINS @@ -56,19 +56,12 @@ def __init__(self, filepath: str, if self.filepath.startswith('http'): self.input_bytes = get_data(self.filepath) - if metadata_id is not None: - fuzzy = False - else: - th = self.filepath - fuzzy = True - - if '/metadata/' in th: + if '/metadata/' in self.filepath: msg = 'Passing on handling metadata in workflow' raise NotHandledError(msg) - try: self.metadata_id, self.plugins = validate_and_load( - th, data_mappings, self.filetype, fuzzy=fuzzy) + self.filepath, data_mappings, self.filetype) except Exception as err: msg = f'Path validation error: {err}' # errors in public storage are not handled @@ -131,7 +124,7 @@ def handle(self) -> bool: return True def publish(self) -> bool: - index_name = self.topic_hierarchy.dotpath + index_name = self.metadata_id if self.input_bytes: geojson = json.load(self.input_bytes) upsert_collection_item(index_name, geojson) diff --git a/wis2box-management/wis2box/pubsub/message.py b/wis2box-management/wis2box/pubsub/message.py index e8788ab31..cd4bc6f6e 100644 --- a/wis2box-management/wis2box/pubsub/message.py +++ b/wis2box-management/wis2box/pubsub/message.py @@ -53,19 +53,16 @@ class PubSubMessage: 
Generic message class """ - def __init__(self, type_: str, identifier: str, metadata_id: str, filepath: str, - datetime_: datetime, geometry: dict = None, - wigos_station_identifier: str = None) -> None: + def __init__(self, type_: str, identifier: str, filepath: str, + datetime_: datetime, geometry: dict = None) -> None: """ Initializer :param type_: message type :param identifier: identifier - :param metadata_id: metadata_id :param filepath: `Path` of file :param datetime_: `datetime` object of temporal aspect of data :param geometry: `dict` of GeoJSON geometry object - :param wigos_station_identifier: WSI associated with the data :returns: `wis2box.pubsub.message.PubSubMessage` message object """ @@ -135,9 +132,9 @@ def __init__(self, identifier: str, metadata_id: str, filepath: str, super().__init__('wis2-notification-message', identifier, metadata_id, filepath, datetime_, geometry) - data_id = f'{metadata_id}/{self.identifier}'.replace('origin/a/wis2/', '') + data_id = f'{metadata_id}/{self.identifier}' - if '/metadata' in metadata_id: + if '/metadata' in filepath: mimetype = 'application/geo+json' else: suffix = self.filepath.split('.')[-1] @@ -175,6 +172,7 @@ def __init__(self, identifier: str, metadata_id: str, filepath: str, 'geometry': self.geometry, 'properties': { 'data_id': data_id, + 'metadata_id': metadata_id, 'datetime': self.datetime, 'pubtime': self.publish_datetime, 'integrity': { diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index 74e4b656f..d296c48b5 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -70,7 +70,7 @@ def handle(self, filepath): LOGGER.debug(msg) except ValueError as err: msg = f'handle() error: {err}' - LOGGER.error(msg) + LOGGER.error(msg, exc_info=1) except Exception as err: msg = f'handle() error: {err}' raise err diff --git a/wis2box-management/wis2box/topic_hierarchy.py 
b/wis2box-management/wis2box/topic_hierarchy.py deleted file mode 100644 index 58cbb29bd..000000000 --- a/wis2box-management/wis2box/topic_hierarchy.py +++ /dev/null @@ -1,156 +0,0 @@ -############################################################################### -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -############################################################################### - -from fnmatch import fnmatch -import logging -from pathlib import Path -from typing import Any, Tuple, Union - -# from pywis_topics.topics import TopicHierarchy as pywis_topics_th - -from wis2box.data_mappings import get_data_mappings -from wis2box.plugin import load_plugin - -LOGGER = logging.getLogger(__name__) - - -class TopicHierarchy: - def __init__(self, path: Union[Path, str]) -> None: - self.path = str(path) - self.dotpath = None - self.dirpath = None - - if not self.path.startswith('origin/a/wis2'): - self.fullpath = f'origin/a/wis2/{self.dirpath}' - else: - self.fullpath = self.dirpath - - if '/' in self.path: - LOGGER.debug('Transforming from directory to dotted path') - self.dirpath = self.path - self.dotpath = self.path.replace('/', '.') - elif '.' 
in self.path: - LOGGER.debug('Transforming from dotted to directory path') - self.dotpath = self.path - self.dirpath = self.path.replace('.', '/') - - def is_valid(self) -> bool: - """ - Determines whether a topic hierarchy is valid - - :returns: `bool` of whether the topic hierarchy is valid - """ - - # TODO: uncomment once WTH is approved - # LOGGER.debug(f'Validating topic {self.dirpath} (fuzzy match)') - # th = pywis_topics_th() - # return th.validate(self.fullpath) - - return True - - -def validate_and_load(path: str, - data_mappings: dict = None, - file_type: str = None, - fuzzy: bool = False - ) -> Tuple[TopicHierarchy, Tuple[Any]]: - """ - Validate path and load plugins - - :param path: `str` of path - :param data_mappings: `dict` of data mappings - :param file_type: `str` the type of file we are processing, e.g. csv, bufr, xml # noqa - :param fuzzy: `bool` of whether to do fuzzy matching of path - (e.g. "*foo.bar.baz*). - Defaults to `False` (i.e. "foo.bar.baz") - - :returns: tuple of `wis2box.dataset.Dataset` and - list of plugins objects - """ - - LOGGER.debug(f'Validating path: {path}') - LOGGER.debug(f'Data mappings {data_mappings}') - - if not data_mappings: - msg = 'Data mappings are empty. 
Fetching' - data_mappings = get_data_mappings() - - dataset = Dataset(path) - found = False - - if not dataset.is_valid(): - msg = 'Invalid topic hierarchy' - raise ValueError(msg) - - if fuzzy: - LOGGER.debug('Searching data mappings for fuzzy topic match') - for topic, defs in data_mappings.items(): - - pattern = f'*{topic}*' - LOGGER.debug(f'Attempting to fuzzy match with {pattern}') - if fnmatch(th.dotpath, pattern): - - LOGGER.debug(f'Matched topic {th.path} on {th.dotpath}') - found = True - plugins = defs['plugins'] - - LOGGER.debug(f'Reloading topic to {topic}') - th = TopicHierarchy(topic) - - else: - LOGGER.debug('Searching data mappings for exact topic match') - if th.dotpath in data_mappings: - - LOGGER.debug(f'Matched topic {th.path} on {th.dotpath}') - found = True - plugins = data_mappings[th.dotpath]['plugins'] - - if not found: - msg = f'No plugins for {th.path} in data mappings. Did not match any of the following: ' # noqa - msg += ', '.join(data_mappings.keys()) - raise ValueError(msg) - - if file_type is None: - LOGGER.warning('File type missing') - file_type = next(iter(plugins)) - LOGGER.debug(f'File type set to first type: {file_type}') - - if file_type not in plugins: - msg = f'Unknown file type ({file_type}) for topic {th.dotpath}. 
Did not match any of the following:' # noqa - msg += ', '.join(plugins) - raise ValueError(msg) - - LOGGER.debug(f'Adding plugin definition for {file_type}') - - def data_defs(plugin): - return { - 'topic_hierarchy': th.dotpath, - 'codepath': plugin['plugin'], - 'pattern': plugin['file-pattern'], - 'template': plugin.get('template'), - 'buckets': plugin.get('buckets', ()), - 'notify': plugin.get('notify', False), - 'format': file_type - } - - plugins_ = [load_plugin('data', data_defs(p), data_mappings) - for p in plugins[file_type]] - return th, plugins_ From e82068b243bde2dccc8c97a2c21a55dcd524209b Mon Sep 17 00:00:00 2001 From: Maaike Date: Wed, 22 May 2024 21:01:11 +0200 Subject: [PATCH 04/19] fix message, add incoming_filepath to failure_message --- wis2box-management/wis2box/data_mappings.py | 1 + wis2box-management/wis2box/pubsub/message.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index 68c3949f7..e7c044976 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -137,6 +137,7 @@ def validate_and_load(path: str, def data_defs(plugin): return { 'metadata_id': metadata_id, + 'รญncoming_filepath': path, 'topic_hierarchy': topic_hierarchy, 'codepath': plugin['plugin'], 'pattern': plugin['file-pattern'], diff --git a/wis2box-management/wis2box/pubsub/message.py b/wis2box-management/wis2box/pubsub/message.py index cd4bc6f6e..e86e9252e 100644 --- a/wis2box-management/wis2box/pubsub/message.py +++ b/wis2box-management/wis2box/pubsub/message.py @@ -130,7 +130,7 @@ def __init__(self, identifier: str, metadata_id: str, filepath: str, operation: str = 'create') -> None: super().__init__('wis2-notification-message', identifier, - metadata_id, filepath, datetime_, geometry) + filepath, datetime_, geometry) data_id = f'{metadata_id}/{self.identifier}' From 49c4e5fdf872f440ef81ac40eb66493ca9a53a78 Mon Sep 17 
00:00:00 2001 From: Maaike Date: Wed, 22 May 2024 21:01:49 +0200 Subject: [PATCH 05/19] add incoming filepath and identifier to failure message --- wis2box-management/wis2box/data/base.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index 6026f5d3f..959726583 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -51,7 +51,7 @@ def __init__(self, defs: dict) -> None: LOGGER.debug('Parsing resource mappings') self.filename = None - self.incoming_filepath = None + self.incoming_filepath = defs.get('incoming_filepath', None) self.metadata_id = defs.get('metadata_id', None) self.topic_hierarchy = defs.get('topic_hierarchy', None) self.template = defs.get('template', None) @@ -63,9 +63,10 @@ def __init__(self, defs: dict) -> None: # if discovery_metadata: # self.setup_discovery_metadata(discovery_metadata) - def publish_failure_message(self, description, wsi=None): + def publish_failure_message(self, description, wsi=None, identifier=None): message = { - 'filepath': self.incoming_filepath, + 'incoming_filepath': self.incoming_filepath, + 'identifier': identifier, 'description': description } if wsi is not None: @@ -246,12 +247,10 @@ def publish_item(self, identifier, item) -> bool: if self.enable_notification and is_new: LOGGER.debug('Sending notification to broker') - try: datetime_ = item['_meta']['properties']['datetime'] except KeyError: datetime_ = item['_meta'].get('data_date') - self.notify(identifier, storage_path, datetime_, item['_meta'].get('geometry'), wsi, is_update) @@ -259,9 +258,10 @@ def publish_item(self, identifier, item) -> bool: LOGGER.debug('No notification sent') except Exception as err: msg = f'Failed to publish item {identifier}: {err}' - LOGGER.error(msg, exc_info=True) + LOGGER.error(msg) self.publish_failure_message( description='Failed to publish item', + identifier=identifier, 
wsi=wsi) return True From 5aabacc1161c0acf9ca6cd81557004c03f18b0e2 Mon Sep 17 00:00:00 2001 From: Maaike Date: Thu, 23 May 2024 09:56:39 +0200 Subject: [PATCH 06/19] test with re-ordered commands in entrypoint --- wis2box-management/docker/entrypoint.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/wis2box-management/docker/entrypoint.sh b/wis2box-management/docker/entrypoint.sh index c357cc9a2..c403a6be2 100755 --- a/wis2box-management/docker/entrypoint.sh +++ b/wis2box-management/docker/entrypoint.sh @@ -29,18 +29,18 @@ set -e #ensure environment-variables are available for cronjob printenv | grep -v "no_proxy" >> /etc/environment -# ensure cron is running -service cron start -service cron status - # wis2box commands # TODO: avoid re-creating environment if it already exists # TODO: catch errors and avoid bounce in conjuction with restart: always +wis2box metadata discovery setup +wis2box metadata station setup wis2box environment create wis2box environment show wis2box api setup -wis2box metadata discovery setup -wis2box metadata station setup + +# ensure cron is running +service cron start +service cron status echo "Caching topic hierarchy CSVs" pywis-topics bundle sync From 87f2977b22099d0274f77159fb8206f11a30f531 Mon Sep 17 00:00:00 2001 From: Maaike Date: Thu, 23 May 2024 10:38:46 +0200 Subject: [PATCH 07/19] use id as resource_id for backend --- wis2box-management/wis2box/api/config/pygeoapi.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/wis2box-management/wis2box/api/config/pygeoapi.py b/wis2box-management/wis2box/api/config/pygeoapi.py index d097cbdbd..e0a2ccc8f 100644 --- a/wis2box-management/wis2box/api/config/pygeoapi.py +++ b/wis2box-management/wis2box/api/config/pygeoapi.py @@ -142,8 +142,6 @@ def prepare_collection(self, meta: dict) -> bool: if meta['id'] in ['discovery-metadata', 'messages', 'stations']: resource_id = meta['id'] - else: - resource_id = meta['topic_hierarchy'] if meta['id'] in 
['discovery-metadata', 'stations']: editable = True From c60b2aa3c7d0e65705517715486c4a523cf709d5 Mon Sep 17 00:00:00 2001 From: Maaike Date: Thu, 23 May 2024 14:22:54 +0200 Subject: [PATCH 08/19] remove dotpath in test-files, update data_id, don't use topic_hierarchy for data_name --- .../metadata/discovery/cd-surface-weather-observations.yml | 2 +- tests/data/metadata/discovery/cn-grapes-geps-global.yml | 2 +- .../metadata/discovery/dz-surface-weather-observations.yml | 2 +- tests/data/metadata/discovery/int-wmo-test-buoy.yml | 2 +- tests/data/metadata/discovery/int-wmo-test-ship.yml | 2 +- .../data/metadata/discovery/int-wmo-test-wind_profiler.yml | 2 +- .../metadata/discovery/it-surface-weather-observations.yml | 2 +- .../metadata/discovery/mw-surface-weather-observations.yml | 2 +- .../metadata/discovery/ro-synoptic-weather-observations.yml | 2 +- wis2box-management/wis2box/api/__init__.py | 5 +---- wis2box-management/wis2box/data/base.py | 2 +- wis2box-management/wis2box/metadata/discovery.py | 2 +- wis2box-management/wis2box/pubsub/message.py | 6 ++++-- 13 files changed, 16 insertions(+), 17 deletions(-) diff --git a/tests/data/metadata/discovery/cd-surface-weather-observations.yml b/tests/data/metadata/discovery/cd-surface-weather-observations.yml index 0a0a9d8c4..76dcda288 100644 --- a/tests/data/metadata/discovery/cd-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/cd-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P180D - topic_hierarchy: cd-brazza_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: cd-brazza_met_centre/data/core/weather/surface-based-observations/synop country: cog centre_id: cd-brazza_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/cn-grapes-geps-global.yml b/tests/data/metadata/discovery/cn-grapes-geps-global.yml index 180854e4e..035270e82 100644 --- a/tests/data/metadata/discovery/cn-grapes-geps-global.yml +++ 
b/tests/data/metadata/discovery/cn-grapes-geps-global.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: cn-cma.data.core.weather.prediction.forecast.medium-range.probabilistic.global + topic_hierarchy: cn-cma/data/core/weather/prediction/forecast/medium-range/probabilistic/global country: chn centre_id: cn-cma data_mappings: diff --git a/tests/data/metadata/discovery/dz-surface-weather-observations.yml b/tests/data/metadata/discovery/dz-surface-weather-observations.yml index 1d0b2a407..a850c49c7 100644 --- a/tests/data/metadata/discovery/dz-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/dz-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: dz-alger_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: dz-alger_met_centre/data/core/weather/surface-based-observations/synop country: dza centre_id: dz-alger_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/int-wmo-test-buoy.yml b/tests/data/metadata/discovery/int-wmo-test-buoy.yml index 102cb95fb..e439731de 100644 --- a/tests/data/metadata/discovery/int-wmo-test-buoy.yml +++ b/tests/data/metadata/discovery/int-wmo-test-buoy.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: int-wmo-test.data.core.weather.surface-based-observations.buoy + topic_hierarchy: int-wmo-test/data/core/weather/surface-based-observations/buoy country: int centre_id: int-wmo-test data_mappings: diff --git a/tests/data/metadata/discovery/int-wmo-test-ship.yml b/tests/data/metadata/discovery/int-wmo-test-ship.yml index 52417815f..56a37695e 100644 --- a/tests/data/metadata/discovery/int-wmo-test-ship.yml +++ b/tests/data/metadata/discovery/int-wmo-test-ship.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: int-wmo-test.data.core.weather.surface-based-observations.ship + topic_hierarchy: int-wmo-test/data/core/weather/surface-based-observations/ship country: int centre_id: int-wmo-test data_mappings: 
diff --git a/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml b/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml index eec608cb6..e6eeacd36 100644 --- a/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml +++ b/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: int-wmo-test.data.core.weather.surface-based-observations.wind_profiler + topic_hierarchy: int-wmo-test/data/core/weather/surface-based-observations/wind_profiler country: int centre_id: int-wmo-test data_mappings: diff --git a/tests/data/metadata/discovery/it-surface-weather-observations.yml b/tests/data/metadata/discovery/it-surface-weather-observations.yml index a64322c84..14e2c1a14 100644 --- a/tests/data/metadata/discovery/it-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/it-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: it-roma_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: it-roma_met_centre/data/core/weather/surface-based-observations/synop country: ita centre_id: it-roma_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/mw-surface-weather-observations.yml b/tests/data/metadata/discovery/mw-surface-weather-observations.yml index bcb14c489..bfd80d255 100644 --- a/tests/data/metadata/discovery/mw-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/mw-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: mw-mw_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: mw-mw_met_centre/data/core/weather/surface-based-observations/synop country: mwi centre_id: mw-mw_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml b/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml index 58fd2676e..e55d791bf 100644 --- 
a/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml +++ b/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: ro-rnimh.data.core.weather.surface-based-observations.synop + topic_hierarchy: ro-rnimh/data/core/weather/surface-based-observations/synop country: rou centre_id: ro-rnimh data_mappings: diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index 349b8d839..c96601d4f 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -98,10 +98,7 @@ def setup_collection(meta: dict = {}) -> bool: LOGGER.error(f'Invalid configuration: {meta}') return False - if 'topic_hierarchy' in meta: - data_name = meta['topic_hierarchy'] - else: - data_name = meta['id'] + data_name = meta['id'] backend = load_backend() if not backend.has_collection(data_name): diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index 959726583..d1c292df8 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -150,7 +150,7 @@ def notify(self, identifier: str, storage_path: str, operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - identifier, metadata_id, storage_path, datetime_, geometry, + f'{metadata_id}/{identifier}', metadata_id, storage_path, datetime_, geometry, wigos_station_identifier, operation) # load plugin for public broker diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 43ada8dc6..46e50575d 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -178,7 +178,7 @@ def publish_broker_message(record: dict, storage_path: str, topic = f'origin/a/wis2/{centre_id.lower()}/metadata' # noqa datetime_ = 
datetime.strptime(record['properties']['created'], '%Y-%m-%dT%H:%M:%SZ') # noqa - wis_message = WISNotificationMessage(identifier=record['id'], + wis_message = WISNotificationMessage(identifier=f"{centre_id.lower()}/metadata/{record['id']}", metadata_id=None, filepath=storage_path, datetime_=datetime_, diff --git a/wis2box-management/wis2box/pubsub/message.py b/wis2box-management/wis2box/pubsub/message.py index e86e9252e..d036a630e 100644 --- a/wis2box-management/wis2box/pubsub/message.py +++ b/wis2box-management/wis2box/pubsub/message.py @@ -132,7 +132,7 @@ def __init__(self, identifier: str, metadata_id: str, filepath: str, super().__init__('wis2-notification-message', identifier, filepath, datetime_, geometry) - data_id = f'{metadata_id}/{self.identifier}' + data_id = f'{self.identifier}' if '/metadata' in filepath: mimetype = 'application/geo+json' @@ -172,7 +172,6 @@ def __init__(self, identifier: str, metadata_id: str, filepath: str, 'geometry': self.geometry, 'properties': { 'data_id': data_id, - 'metadata_id': metadata_id, 'datetime': self.datetime, 'pubtime': self.publish_datetime, 'integrity': { @@ -184,6 +183,9 @@ def __init__(self, identifier: str, metadata_id: str, filepath: str, 'generated-by': f'wis2box {__version__}' } + if metadata_id is not None: + self.message['properties']['metadata_id'] = metadata_id + if self.length < 4096: LOGGER.debug('Including data inline via properties.content') content_value = base64.b64encode(self.filebytes) From 4e10a586de72abace994aced22123369857887cc Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 27 May 2024 09:34:15 +0200 Subject: [PATCH 09/19] handle identifiers in messages and es-indices --- wis2box-management/wis2box/api/config/pygeoapi.py | 8 +++++--- wis2box-management/wis2box/data/base.py | 4 ++-- wis2box-management/wis2box/data/bufr2geojson.py | 2 +- wis2box-management/wis2box/data/bufr4.py | 2 +- wis2box-management/wis2box/data/csv2bufr.py | 2 +- wis2box-management/wis2box/data/geojson.py | 2 +- 
wis2box-management/wis2box/data/message.py | 2 +- wis2box-management/wis2box/data/synop2bufr.py | 2 +- wis2box-management/wis2box/data/universal.py | 2 +- wis2box-management/wis2box/data_mappings.py | 5 +---- wis2box-management/wis2box/pubsub/subscribe.py | 3 +-- 11 files changed, 16 insertions(+), 18 deletions(-) diff --git a/wis2box-management/wis2box/api/config/pygeoapi.py b/wis2box-management/wis2box/api/config/pygeoapi.py index e0a2ccc8f..9cc15d034 100644 --- a/wis2box-management/wis2box/api/config/pygeoapi.py +++ b/wis2box-management/wis2box/api/config/pygeoapi.py @@ -140,13 +140,15 @@ def prepare_collection(self, meta: dict) -> bool: editable = False - if meta['id'] in ['discovery-metadata', 'messages', 'stations']: - resource_id = meta['id'] + resource_id = meta['id'] if meta['id'] in ['discovery-metadata', 'stations']: editable = True + else: + # avoid colons in resource id + resource_id = resource_id.lower().replace(':', '-') - LOGGER.debug(f'Resource id: {resource_id}') + LOGGER.info(f'Prepare collection with resource_id={resource_id}') type_ = meta.get('type', 'feature') diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index d1c292df8..d64785c1e 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -150,8 +150,8 @@ def notify(self, identifier: str, storage_path: str, operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - f'{metadata_id}/{identifier}', metadata_id, storage_path, datetime_, geometry, - wigos_station_identifier, operation) + f"{metadata_id.replace('urn:wmo:md:', '')}/{identifier}", metadata_id, + storage_path, datetime_, geometry, wigos_station_identifier, operation) # load plugin for public broker defs = { diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index 082f679cc..fe282486b 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ 
b/wis2box-management/wis2box/data/bufr2geojson.py @@ -84,4 +84,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') diff --git a/wis2box-management/wis2box/data/bufr4.py b/wis2box-management/wis2box/data/bufr4.py index a759ef6aa..24938a558 100644 --- a/wis2box-management/wis2box/data/bufr4.py +++ b/wis2box-management/wis2box/data/bufr4.py @@ -100,4 +100,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py index f8d32344e..0348ce548 100644 --- a/wis2box-management/wis2box/data/csv2bufr.py +++ b/wis2box-management/wis2box/data/csv2bufr.py @@ -113,4 +113,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.topic_hierarchy) + return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','')) diff --git a/wis2box-management/wis2box/data/geojson.py b/wis2box-management/wis2box/data/geojson.py index 3f3740a9d..06bdc17fa 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -71,4 +71,4 @@ def publish(self) -> bool: def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py index da6909d21..0339c9e6d 100644 
--- a/wis2box-management/wis2box/data/message.py +++ b/wis2box-management/wis2box/data/message.py @@ -76,4 +76,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py index 2347788ca..6148ab6da 100644 --- a/wis2box-management/wis2box/data/synop2bufr.py +++ b/wis2box-management/wis2box/data/synop2bufr.py @@ -124,4 +124,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.topic_hierarchy) + return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','')) diff --git a/wis2box-management/wis2box/data/universal.py b/wis2box-management/wis2box/data/universal.py index a3b17344b..00515877a 100644 --- a/wis2box-management/wis2box/data/universal.py +++ b/wis2box-management/wis2box/data/universal.py @@ -80,4 +80,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index e7c044976..df2528162 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -114,10 +114,7 @@ def validate_and_load(path: str, metadata_id = key topic_hierarchy = data_mappings['topic_hierarchy'] if metadata_id is None: - msg = f'Could not match {path} to dataset. 
Did not match any of the following: ' # noqa - options = [v["topic_hierarchy"] for v in data_mappings.values()] - options += [k for k in data_mappings.keys()] - msg += ', '.join(options) + msg = f'Could not match {path} to dataset, available keys are: {list(data_mappings.keys())}' # noqa raise ValueError(msg) plugins = data_mappings[metadata_id]['plugins'] diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index d296c48b5..d42febf75 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -69,8 +69,7 @@ def handle(self, filepath): msg = f'not handled error: {err}' LOGGER.debug(msg) except ValueError as err: - msg = f'handle() error: {err}' - LOGGER.error(msg, exc_info=1) + LOGGER.error(err) except Exception as err: msg = f'handle() error: {err}' raise err From 6dccd2593889f611d3f0ffff0ad254b59032f6c0 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 27 May 2024 10:33:43 +0200 Subject: [PATCH 10/19] match without urn --- wis2box-management/wis2box/data_mappings.py | 5 +++-- wis2box-management/wis2box/pubsub/subscribe.py | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index df2528162..ef84b59a2 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -104,7 +104,7 @@ def validate_and_load(path: str, topic_hierarchy = None # determine if path matches a metadata_id for key in data_mappings.keys(): - if key in path: + if key.replace('urn:wmo:md:', '') in path: metadata_id = key topic_hierarchy = data_mappings[key]['topic_hierarchy'] # else try to match topic_hierarchy @@ -114,7 +114,8 @@ def validate_and_load(path: str, metadata_id = key topic_hierarchy = data_mappings['topic_hierarchy'] if metadata_id is None: - msg = f'Could not match {path} to dataset, available keys are: 
{list(data_mappings.keys())}' # noqa + options = [key.replace('urn:wmo:md:', '') for key in data_mappings.keys()] + msg = f'Could not match {path} to dataset, options are: {options}' # noqa raise ValueError(msg) plugins = data_mappings[metadata_id]['plugins'] diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index d42febf75..8595f5e11 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -61,13 +61,13 @@ def handle(self, filepath): handler = Handler(filepath=filepath, data_mappings=self.data_mappings) if handler.handle(): - LOGGER.info('Data processed') + LOGGER.debug('Data processed') for plugin in handler.plugins: for filepath in plugin.files(): - LOGGER.info(f'Public filepath: {filepath}') + LOGGER.debug(f'Public filepath: {filepath}') except NotHandledError as err: - msg = f'not handled error: {err}' - LOGGER.debug(msg) + msg = f'not handled: {err}' + LOGGER.info(msg) except ValueError as err: LOGGER.error(err) except Exception as err: From 1a5faa4408f4e0290e9bb3980d0cb7f196134777 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 27 May 2024 10:45:18 +0200 Subject: [PATCH 11/19] Update test_workflow.py replace paths --- tests/integration/test_workflow.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 4201a3636..da7169de4 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -167,7 +167,7 @@ def test_data_ingest(): assert item_api['reportId'] == 'WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500' assert item_api['properties']['resultTime'] == '2021-07-07T14:55:00Z' # noqa - item_source = f'2021-07-07/wis/mw-mw_met_centre/data/core/weather/surface-based-observations/synop/{item_api["reportId"]}.bufr4' # noqa + item_source = 
f'2021-07-07/mw-mw_met_centre:surface-weather-observations/{item_api["reportId"]}.bufr4' # noqa r = SESSION.get(f'{URL}/data/{item_source}') # noqa assert r.status_code == codes.ok @@ -247,12 +247,12 @@ def test_message_api(): # test messages per test dataset counts = { - 'mw_met_centre': 25, - 'roma_met_centre': 33, - 'alger_met_centre': 29, - 'rnimh': 50, - 'brazza_met_centre': 15, - 'wmo-test': 11, + 'mw-mw_met_centre': 25, + 'it-roma_met_centre': 33, + 'dz-alger_met_centre': 29, + 'ro-rnimh': 50, + 'cd-brazza_met_centre': 15, + 'int-wmo-test': 11, 'cn-cma': 11 } for key, value in counts.items(): @@ -267,8 +267,7 @@ def test_message_api(): assert r['numberMatched'] == sum(counts.values()) # we want to find a particular message with data ID - target_data_id = "cd-brazza_met_centre/data/core/weather/" \ - "surface-based-observations/synop/" \ + target_data_id = "cd-brazza_met_centre:surface-weather-observations/" \ "WIGOS_0-20000-0-64406_20230803T090000" msg = None From 31b2588cc099085ade7de2f01fd047b821b82bf8 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 27 May 2024 10:50:19 +0200 Subject: [PATCH 12/19] flake8 fixes --- wis2box-management/wis2box/data/base.py | 5 +++-- wis2box-management/wis2box/data/bufr2geojson.py | 2 +- wis2box-management/wis2box/data/bufr4.py | 2 +- wis2box-management/wis2box/data/csv2bufr.py | 2 +- wis2box-management/wis2box/data/geojson.py | 2 +- wis2box-management/wis2box/data/message.py | 2 +- wis2box-management/wis2box/data/synop2bufr.py | 2 +- wis2box-management/wis2box/data/universal.py | 2 +- wis2box-management/wis2box/data_mappings.py | 2 +- wis2box-management/wis2box/metadata/discovery.py | 3 ++- 10 files changed, 13 insertions(+), 11 deletions(-) diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index d64785c1e..a597659fd 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -150,8 +150,9 @@ def notify(self, identifier: str, 
storage_path: str, operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - f"{metadata_id.replace('urn:wmo:md:', '')}/{identifier}", metadata_id, - storage_path, datetime_, geometry, wigos_station_identifier, operation) + f"{metadata_id.replace('urn:wmo:md:', '')}/{identifier}", + metadata_id, storage_path, datetime_, geometry, + wigos_station_identifier, operation) # load plugin for public broker defs = { diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index fe282486b..a1bab2847 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ b/wis2box-management/wis2box/data/bufr2geojson.py @@ -84,4 +84,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa diff --git a/wis2box-management/wis2box/data/bufr4.py b/wis2box-management/wis2box/data/bufr4.py index 24938a558..fc57d3b12 100644 --- a/wis2box-management/wis2box/data/bufr4.py +++ b/wis2box-management/wis2box/data/bufr4.py @@ -100,4 +100,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py index 0348ce548..54d9520b3 100644 --- a/wis2box-management/wis2box/data/csv2bufr.py +++ b/wis2box-management/wis2box/data/csv2bufr.py @@ -113,4 +113,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / 
self.metadata_id.replace('urn:wmo:md:','')) + return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '')) # noqa diff --git a/wis2box-management/wis2box/data/geojson.py b/wis2box-management/wis2box/data/geojson.py index 06bdc17fa..25b8702cf 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -71,4 +71,4 @@ def publish(self) -> bool: def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py index 0339c9e6d..76d4ee6e5 100644 --- a/wis2box-management/wis2box/data/message.py +++ b/wis2box-management/wis2box/data/message.py @@ -76,4 +76,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py index 6148ab6da..f57a6abc4 100644 --- a/wis2box-management/wis2box/data/synop2bufr.py +++ b/wis2box-management/wis2box/data/synop2bufr.py @@ -124,4 +124,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','')) + return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '')) # noqa diff --git a/wis2box-management/wis2box/data/universal.py b/wis2box-management/wis2box/data/universal.py index 00515877a..63beab94a 100644 --- a/wis2box-management/wis2box/data/universal.py +++ b/wis2box-management/wis2box/data/universal.py 
@@ -80,4 +80,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:','') + return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index ef84b59a2..354ac35db 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -114,7 +114,7 @@ def validate_and_load(path: str, metadata_id = key topic_hierarchy = data_mappings['topic_hierarchy'] if metadata_id is None: - options = [key.replace('urn:wmo:md:', '') for key in data_mappings.keys()] + options = [key.replace('urn:wmo:md:', '') for key in data_mappings.keys()] # noqa msg = f'Could not match {path} to dataset, options are: {options}' # noqa raise ValueError(msg) diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 46e50575d..b394d6e16 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -178,7 +178,8 @@ def publish_broker_message(record: dict, storage_path: str, topic = f'origin/a/wis2/{centre_id.lower()}/metadata' # noqa datetime_ = datetime.strptime(record['properties']['created'], '%Y-%m-%dT%H:%M:%SZ') # noqa - wis_message = WISNotificationMessage(identifier=f"{centre_id.lower()}/metadata/{record['id']}", + identifier = f"{centre_id.lower()}/metadata/{record['id']}" + wis_message = WISNotificationMessage(identifier=identifier, metadata_id=None, filepath=storage_path, datetime_=datetime_, From 082304ddcb96f82603eada0f359b1a33d94de7ac Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 27 May 2024 10:53:41 +0200 Subject: [PATCH 13/19] test fix --- tests/integration/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index da7169de4..1fe555bef 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -167,7 +167,7 @@ def test_data_ingest(): assert item_api['reportId'] == 'WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500' assert item_api['properties']['resultTime'] == '2021-07-07T14:55:00Z' # noqa - item_source = f'2021-07-07/mw-mw_met_centre:surface-weather-observations/{item_api["reportId"]}.bufr4' # noqa + item_source = f'2021-07-07/wis/mw-mw_met_centre:surface-weather-observations/{item_api["reportId"]}.bufr4' # noqa r = SESSION.get(f'{URL}/data/{item_source}') # noqa assert r.status_code == codes.ok From 41a93c61fc2c2d2327fbfb4665705d0e2ce3fb15 Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 28 May 2024 12:10:26 +0200 Subject: [PATCH 14/19] improved logging, avoid overriding data_mappings --- wis2box-management/wis2box/data_mappings.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index 354ac35db..b496aef7e 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -109,13 +109,15 @@ def validate_and_load(path: str, topic_hierarchy = data_mappings[key]['topic_hierarchy'] # else try to match topic_hierarchy if metadata_id is None: - for key, data_mappings in data_mappings.items(): - if (data_mappings['topic_hierarchy']).replace('origin/a/wis2/', '') in path: # noqa + for key, v in data_mappings.items(): + if (v['topic_hierarchy']).replace('origin/a/wis2/', '') in path: # noqa metadata_id = key - topic_hierarchy = data_mappings['topic_hierarchy'] + topic_hierarchy = v['topic_hierarchy'] + break if metadata_id is None: options = [key.replace('urn:wmo:md:', '') for key in data_mappings.keys()] # noqa - msg = f'Could not match {path} to dataset, options are: {options}' # noqa + options += 
[v['topic_hierarchy'].replace('origin/a/wis2/', '') for v in data_mappings.values()] # noqa + msg = f'Could not match {path} to dataset, path should include one of the following: {options}' # noqa raise ValueError(msg) plugins = data_mappings[metadata_id]['plugins'] From 7f1f1e23a30979a380b8c193b5f78e8a31899441 Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 28 May 2024 21:58:50 +0200 Subject: [PATCH 15/19] updated docs, fix auth, metadata_id in path --- docs/source/reference/auth.rst | 12 +++++----- .../data-ingest-processing-and-publishing.rst | 19 ++++++++-------- docs/source/user/data-ingest.rst | 22 ++++++++++++------- tests/integration/test_workflow.py | 4 ++-- wis2box-management/wis2box/auth.py | 12 ++++++++-- wis2box-management/wis2box/data/base.py | 2 +- .../wis2box/data/bufr2geojson.py | 2 +- wis2box-management/wis2box/data/bufr4.py | 2 +- wis2box-management/wis2box/data/csv2bufr.py | 2 +- wis2box-management/wis2box/data/geojson.py | 2 +- wis2box-management/wis2box/data/message.py | 2 +- wis2box-management/wis2box/data/synop2bufr.py | 2 +- wis2box-management/wis2box/data/universal.py | 2 +- wis2box-management/wis2box/data_mappings.py | 4 ++-- 14 files changed, 51 insertions(+), 38 deletions(-) diff --git a/docs/source/reference/auth.rst b/docs/source/reference/auth.rst index 41ff3ea8f..cda4a2e83 100644 --- a/docs/source/reference/auth.rst +++ b/docs/source/reference/auth.rst @@ -32,11 +32,11 @@ To add a token to PUT/POST/DELETE requests to the stations collection, use the f This will generate a random token that can be use to update the stations collection. -Adding Access Control on topics +Adding Access Control on datasets ------------------------------- -All topic hierarchies in wis2box are open by default. A topic becomes closed, with access control applied, the -first time a token is generated for a topic hierarchy. +All dataset in wis2box are open by default. 
A dataset becomes closed, with access control applied, the +first time a token is generated for a dataset .. note:: @@ -44,7 +44,7 @@ first time a token is generated for a topic hierarchy. .. code-block:: bash - wis2box auth add-token --topic-hierarchy mw-mw_met_centre.data.core.weather.surface-based-observations.synop mytoken + wis2box auth add-token --metadata-id urn:wmo:md:mw-mw_met_centre:surface-weather-observations mytoken If no token is provided, a random string will be generated. Be sure to the record token now, there is no @@ -58,8 +58,8 @@ Token credentials can be validated using the wis2box command line utility. .. code-block:: bash wis2box auth show - wis2box auth has-access-topic --topic-hierarchy mw-mw_met_centre.data.core.weather.surface-based-observations.synop mytoken - wis2box auth has-access-topic --topic-hierarchy mw-mw_met_centre.data.core.weather.surface-based-observations.synop notmytoken + wis2box auth has-access-topic --metadata-id urn:wmo:md:mw-mw_met_centre:surface-weather-observations mytoken + wis2box auth has-access-topic --metadata-id urn:wmo:md:mw-mw_met_centre:surface-weather-observations notmytoken Once a token has been generated, access to any data of that topic in the WAF or API requires token authentication. diff --git a/docs/source/reference/running/data-ingest-processing-and-publishing.rst b/docs/source/reference/running/data-ingest-processing-and-publishing.rst index a65d70d0e..eb271f9d2 100644 --- a/docs/source/reference/running/data-ingest-processing-and-publishing.rst +++ b/docs/source/reference/running/data-ingest-processing-and-publishing.rst @@ -27,28 +27,27 @@ Explicit topic hierarchy workflow .. 
code-block:: bash # process a single CSV file - wis2box data ingest --topic-hierarchy foo.bar.baz -p /path/to/file.csv + wis2box data ingest --metadata-id urn:wmo:md:centre-id:mydata -p /path/to/file.csv # process a directory of CSV files - wis2box data ingest --topic-hierarchy foo.bar.baz -p /path/to/dir + wis2box data ingest --metadata-id urn:wmo:md:centre-id:mydata -p /path/to/dir # process a directory of CSV files recursively - wis2box data ingest --topic-hierarchy foo.bar.baz -p /path/to/dir -r + wis2box data ingest --metadata-id urn:wmo:md:centre-id:mydata -p /path/to/dir -r -Implicit topic hierarchy workflow -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - +Implicit metadata_id workflow +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code-block:: bash - # process incoming data; topic hierarchy is inferred from fuzzy filepath equivalent - # wis2box will detect 'foo/bar/baz' as topic hierarchy 'foo.bar.baz' + # process incoming data; metadata_id is inferred from fuzzy filepath equivalent + # wis2box will detect 'mydata' as metadata_id 'urn:md:wmo:mydata' wis2box data ingest -p /path/to/foo/bar/baz/data/file.csv Event driven ingest, processing and publishing ---------------------------------------------- -Once all metadata and topic hierarchies are setup, event driven workflow +Once all datasets are setup, event driven workflow will immediately start to listen on files in the ``wis2box-incoming`` storage bucket as they are -placed in the appropriate topic hierarchy directory. +placed in the appropriate directory that can be matched to a metadata_id. diff --git a/docs/source/user/data-ingest.rst b/docs/source/user/data-ingest.rst index f6cbaf830..3a00b7411 100644 --- a/docs/source/user/data-ingest.rst +++ b/docs/source/user/data-ingest.rst @@ -79,16 +79,21 @@ Select 'browse' on the ``wis2box-incoming`` bucket and select 'Choose or create :alt: MinIO new folder path .. 
note:: - The folder in which the file is placed defines the topic you want to share the data on and should match the datasets defined in your data mappings. + The folder in which the file is placed will be used to determine the dataset to which the file belongs. - The first 3 levels of the WIS2 topic hierarchy ``origin/a/wis2`` are automatically included by wis2box when publishing data notification messages. - - For example: + The wis2box-management container will match the path of the file to the dataset defined in the data mappings by checking it either contains the metadata_id or the topic (excluding 'origin/a/wis2/'). - * data to be published on: ``origin/a/wis2/cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` - * upload data in the path: ``cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` + For example, using a filepath matching the metadata_id: + + * Metadata ID: ``urn:wmo:md:it-roma_met_centre:surface-weather-observations.synop`` + * upload data in path containing: ``it-roma_met_centre:surface-weather-observations.synop`` + + For example using a filepath matching the topic hierarchy: - The error message ``Topic Hierarchy validation error: No plugins for minio:9000/wis2box-incoming/... in data mappings`` indicates you stored a file in a folder for which no matching dataset was defined in the data mappings. + * Topic Hierarchy: ``origin/a/wis2/cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` + * upload data in the path containing: ``cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` + + The error message ``Path validation error: Could not match http://minio:9000/wis2box-incoming/... to dataset, ...`` indicates you stored a file in a folder that could not be matched to a dataset. After uploading a file to ``wis2box-incoming`` storage bucket, you can browse the content in the ``wis2box-public`` bucket. 
If the data ingest was successful, new data will appear as follows: @@ -132,7 +137,8 @@ See below a Python example to upload data using the MinIO package: from minio import Minio filepath = '/home/wis2box-user/local-data/mydata.bin' - minio_path = '/it-roma_met_centre/data/core/weather/surface-based-observations/synop/' + # path should match the metadata or the topic in the data mappings + minio_path = 'urn:wmo:md:it-roma_met_centre:surface-weather-observations' endpoint = 'http://localhost:9000' WIS2BOX_STORAGE_USERNAME = 'wis2box' diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 1fe555bef..4c61ac534 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -167,7 +167,7 @@ def test_data_ingest(): assert item_api['reportId'] == 'WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500' assert item_api['properties']['resultTime'] == '2021-07-07T14:55:00Z' # noqa - item_source = f'2021-07-07/wis/mw-mw_met_centre:surface-weather-observations/{item_api["reportId"]}.bufr4' # noqa + item_source = f'2021-07-07/wis/{ID}/{item_api["reportId"]}.bufr4' # noqa r = SESSION.get(f'{URL}/data/{item_source}') # noqa assert r.status_code == codes.ok @@ -267,7 +267,7 @@ def test_message_api(): assert r['numberMatched'] == sum(counts.values()) # we want to find a particular message with data ID - target_data_id = "cd-brazza_met_centre:surface-weather-observations/" \ + target_data_id = "urn:wmo:md:cd-brazza_met_centre:surface-weather-observations/" \ "WIGOS_0-20000-0-64406_20230803T090000" msg = None diff --git a/wis2box-management/wis2box/auth.py b/wis2box-management/wis2box/auth.py index 207eae062..5f65031f9 100644 --- a/wis2box-management/wis2box/auth.py +++ b/wis2box-management/wis2box/auth.py @@ -25,8 +25,10 @@ from secrets import token_hex from wis2box import cli_helpers +from wis2box.data_mappings import get_data_mappings from wis2box.env import AUTH_URL + LOGGER = logging.getLogger(__name__) @@ -39,7 +41,7 @@ def 
create_token(path: str, token: str) -> bool: :returns: `bool` of result """ - data = {'path': path, 'token': token} + data = {'topic': path, 'token': token} r = requests.post(f'{AUTH_URL}/add_token', data=data) LOGGER.info(r.json().get('description')) @@ -56,7 +58,7 @@ def delete_token(path: str, token: str = '') -> bool: :returns: `bool` of result """ - data = {'path': path} + data = {'topic': path} if token != '': # Delete all tokens for a given th @@ -150,6 +152,9 @@ def add_token(ctx, metadata_id, path, yes, token): """Add access token for a path or dataset""" if metadata_id is not None: + data_mappings = get_data_mappings() + if metadata_id not in data_mappings: + raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') # noqa path = metadata_id elif path is not None: path = path @@ -175,6 +180,9 @@ def remove_token(ctx, metadata_id, path, token): """Delete one to many tokens for a dataset""" if metadata_id is not None: + data_mappings = get_data_mappings() + if metadata_id not in data_mappings: + raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') path = metadata_id elif path is not None: path = path diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index a597659fd..098b3f88e 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -150,7 +150,7 @@ def notify(self, identifier: str, storage_path: str, operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - f"{metadata_id.replace('urn:wmo:md:', '')}/{identifier}", + f'{metadata_id}/{identifier}', metadata_id, storage_path, datetime_, geometry, wigos_station_identifier, operation) diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index a1bab2847..0858c374d 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ 
b/wis2box-management/wis2box/data/bufr2geojson.py @@ -84,4 +84,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/bufr4.py b/wis2box-management/wis2box/data/bufr4.py index fc57d3b12..82fed6c3a 100644 --- a/wis2box-management/wis2box/data/bufr4.py +++ b/wis2box-management/wis2box/data/bufr4.py @@ -100,4 +100,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py index 54d9520b3..767a8a2cb 100644 --- a/wis2box-management/wis2box/data/csv2bufr.py +++ b/wis2box-management/wis2box/data/csv2bufr.py @@ -113,4 +113,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '')) # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/geojson.py b/wis2box-management/wis2box/data/geojson.py index 25b8702cf..a9df7b62a 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -71,4 +71,4 @@ def publish(self) -> bool: def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py index 
76d4ee6e5..279d5875d 100644 --- a/wis2box-management/wis2box/data/message.py +++ b/wis2box-management/wis2box/data/message.py @@ -76,4 +76,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id \ No newline at end of file diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py index f57a6abc4..78e00aebb 100644 --- a/wis2box-management/wis2box/data/synop2bufr.py +++ b/wis2box-management/wis2box/data/synop2bufr.py @@ -124,4 +124,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '')) # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/universal.py b/wis2box-management/wis2box/data/universal.py index 63beab94a..1d75e41d3 100644 --- a/wis2box-management/wis2box/data/universal.py +++ b/wis2box-management/wis2box/data/universal.py @@ -80,4 +80,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id.replace('urn:wmo:md:', '') # noqa + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index b496aef7e..9c694d553 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -104,7 +104,7 @@ def validate_and_load(path: str, topic_hierarchy = None # determine if path matches a metadata_id for key in data_mappings.keys(): - if key.replace('urn:wmo:md:', '') in path: + if key in path: metadata_id = key topic_hierarchy = 
data_mappings[key]['topic_hierarchy'] # else try to match topic_hierarchy @@ -115,7 +115,7 @@ def validate_and_load(path: str, topic_hierarchy = v['topic_hierarchy'] break if metadata_id is None: - options = [key.replace('urn:wmo:md:', '') for key in data_mappings.keys()] # noqa + options = [key for key in data_mappings.keys()] # noqa options += [v['topic_hierarchy'].replace('origin/a/wis2/', '') for v in data_mappings.values()] # noqa msg = f'Could not match {path} to dataset, path should include one of the following: {options}' # noqa raise ValueError(msg) From 90e77aae4fa1030d030580aefdd327b8116f5970 Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 28 May 2024 22:19:42 +0200 Subject: [PATCH 16/19] flake8 and remove urn:wmo:md from data_id in data-notification --- tests/integration/test_workflow.py | 3 +-- wis2box-management/wis2box/auth.py | 2 +- wis2box-management/wis2box/data/base.py | 2 +- wis2box-management/wis2box/data/message.py | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 4c61ac534..7a831cc85 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -267,8 +267,7 @@ def test_message_api(): assert r['numberMatched'] == sum(counts.values()) # we want to find a particular message with data ID - target_data_id = "urn:wmo:md:cd-brazza_met_centre:surface-weather-observations/" \ - "WIGOS_0-20000-0-64406_20230803T090000" + target_data_id = "urn:wmo:md:cd-brazza_met_centre:surface-weather-observations/WIGOS_0-20000-0-64406_20230803T090000" # noqa msg = None for feature in r['features']: diff --git a/wis2box-management/wis2box/auth.py b/wis2box-management/wis2box/auth.py index 5f65031f9..4d6f8c2e6 100644 --- a/wis2box-management/wis2box/auth.py +++ b/wis2box-management/wis2box/auth.py @@ -182,7 +182,7 @@ def remove_token(ctx, metadata_id, path, token): if metadata_id is not None: data_mappings = get_data_mappings() if metadata_id 
not in data_mappings: - raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') + raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') # noqa path = metadata_id elif path is not None: path = path diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index 098b3f88e..038f1339e 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -150,7 +150,7 @@ def notify(self, identifier: str, storage_path: str, operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - f'{metadata_id}/{identifier}', + f'{metadata_id.replace('urn:wmo:md:','')}/{identifier}', metadata_id, storage_path, datetime_, geometry, wigos_station_identifier, operation) diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py index 279d5875d..513a23b14 100644 --- a/wis2box-management/wis2box/data/message.py +++ b/wis2box-management/wis2box/data/message.py @@ -76,4 +76,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.metadata_id \ No newline at end of file + return Path(yyyymmdd) / 'wis' / self.metadata_id From fbd4e7c6068678149e75352a18d2235dfe45faf5 Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 28 May 2024 22:26:56 +0200 Subject: [PATCH 17/19] fix quotes --- wis2box-management/wis2box/data/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index 038f1339e..12e1b91bc 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -150,7 +150,7 @@ def notify(self, identifier: str, storage_path: str, operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - 
f'{metadata_id.replace('urn:wmo:md:','')}/{identifier}', + f"{metadata_id.replace('urn:wmo:md:','')}/{identifier}", metadata_id, storage_path, datetime_, geometry, wigos_station_identifier, operation) From 3a6babb4e840ba5a34e7aea912021d5556198e5e Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 28 May 2024 22:34:59 +0200 Subject: [PATCH 18/19] fix test --- tests/integration/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 7a831cc85..9661b538f 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -267,7 +267,7 @@ def test_message_api(): assert r['numberMatched'] == sum(counts.values()) # we want to find a particular message with data ID - target_data_id = "urn:wmo:md:cd-brazza_met_centre:surface-weather-observations/WIGOS_0-20000-0-64406_20230803T090000" # noqa + target_data_id = "cd-brazza_met_centre:surface-weather-observations/WIGOS_0-20000-0-64406_20230803T090000" # noqa msg = None for feature in r['features']: From 7f8c0b691158e08bc96d2c18099d02bcb753b565 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 3 Jun 2024 08:26:12 +0200 Subject: [PATCH 19/19] changes requested by Tom --- docs/source/reference/auth.rst | 2 +- docs/source/user/data-ingest.rst | 8 ++++---- wis2box-management/wis2box/auth.py | 4 ++-- wis2box-management/wis2box/cli_helpers.py | 2 +- wis2box-management/wis2box/data/__init__.py | 2 +- wis2box-management/wis2box/data/base.py | 8 ++++---- wis2box-management/wis2box/data_mappings.py | 6 +++--- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/source/reference/auth.rst b/docs/source/reference/auth.rst index cda4a2e83..8d9d920cf 100644 --- a/docs/source/reference/auth.rst +++ b/docs/source/reference/auth.rst @@ -33,7 +33,7 @@ To add a token to PUT/POST/DELETE requests to the stations collection, use the f This will generate a random token that can be use to update the stations 
collection. Adding Access Control on datasets -------------------------------- +--------------------------------- All dataset in wis2box are open by default. A dataset becomes closed, with access control applied, the first time a token is generated for a dataset diff --git a/docs/source/user/data-ingest.rst b/docs/source/user/data-ingest.rst index 3a00b7411..5652fd275 100644 --- a/docs/source/user/data-ingest.rst +++ b/docs/source/user/data-ingest.rst @@ -81,11 +81,11 @@ Select 'browse' on the ``wis2box-incoming`` bucket and select 'Choose or create .. note:: The folder in which the file is placed will be used to determine the dataset to which the file belongs. - The wis2box-management container will match the path of the file to the dataset defined in the data mappings by checking it either contains the metadata_id or the topic (excluding 'origin/a/wis2/'). + The wis2box-management container will match the path of the file to the dataset defined in the data mappings by checking it either contains the metadata identifier or the topic (excluding 'origin/a/wis2/'). - For example, using a filepath matching the metadata_id: + For example, using a filepath matching the metadata identifier: - * Metadata ID: ``urn:wmo:md:it-roma_met_centre:surface-weather-observations.synop`` + * Metadata identifier: ``urn:wmo:md:it-roma_met_centre:surface-weather-observations.synop`` * upload data in path containing: ``it-roma_met_centre:surface-weather-observations.synop`` For example using a filepath matching the topic hierarchy: @@ -93,7 +93,7 @@ Select 'browse' on the ``wis2box-incoming`` bucket and select 'Choose or create * Topic Hierarchy: ``origin/a/wis2/cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` * upload data in the path containing: ``cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` - The error message ``Path validation error: Could not match http://minio:9000/wis2box-incoming/... 
to dataset, ...`` indicates you stored a file in a folder that could not be matched to a dataset. + The error message ``Path validation error: Could not match http://minio:9000/wis2box-incoming/... to dataset, ...`` indicates that a file was stored in a directory that could not be matched to a dataset. After uploading a file to ``wis2box-incoming`` storage bucket, you can browse the content in the ``wis2box-public`` bucket. If the data ingest was successful, new data will appear as follows: diff --git a/wis2box-management/wis2box/auth.py b/wis2box-management/wis2box/auth.py index 4d6f8c2e6..0728e875f 100644 --- a/wis2box-management/wis2box/auth.py +++ b/wis2box-management/wis2box/auth.py @@ -154,7 +154,7 @@ def add_token(ctx, metadata_id, path, yes, token): if metadata_id is not None: data_mappings = get_data_mappings() if metadata_id not in data_mappings: - raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') # noqa + raise click.ClickException(f'Metadata identifier {metadata_id} not found in data mappings') # noqa path = metadata_id elif path is not None: path = path @@ -182,7 +182,7 @@ def remove_token(ctx, metadata_id, path, token): if metadata_id is not None: data_mappings = get_data_mappings() if metadata_id not in data_mappings: - raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') # noqa + raise click.ClickException(f'Metadata identifier {metadata_id} not found in data mappings') # noqa path = metadata_id elif path is not None: path = path diff --git a/wis2box-management/wis2box/cli_helpers.py b/wis2box-management/wis2box/cli_helpers.py index a6962e8dc..f9771ee86 100644 --- a/wis2box-management/wis2box/cli_helpers.py +++ b/wis2box-management/wis2box/cli_helpers.py @@ -38,7 +38,7 @@ help='Topic hierarchy') OPTION_METADATA_ID = click.option('--metadata-id', '-mdi', - help='Metadata ID') + help='Metadata identifier') OPTION_RECURSIVE = click.option('--recursive', '-r', default=False, is_flag=True, 
diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index b745014d4..d68e82aed 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -211,7 +211,7 @@ def ingest(ctx, topic_hierarchy, metadata_id, path, recursive, verbosity): if metadata_id: data_mappings = get_data_mappings() if metadata_id not in data_mappings: - raise click.ClickException(f'Metadata ID {metadata_id} not found in data mappings') # noqa + raise click.ClickException(f'metadata_id={metadata_id} not found in data mappings') # noqa rfp = metadata_id else: rfp = topic_hierarchy.replace('.', '/') diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index 12e1b91bc..340cf56b0 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -51,10 +51,10 @@ def __init__(self, defs: dict) -> None: LOGGER.debug('Parsing resource mappings') self.filename = None - self.incoming_filepath = defs.get('incoming_filepath', None) - self.metadata_id = defs.get('metadata_id', None) - self.topic_hierarchy = defs.get('topic_hierarchy', None) - self.template = defs.get('template', None) + self.incoming_filepath = defs.get('incoming_filepath') + self.metadata_id = defs.get('metadata_id') + self.topic_hierarchy = defs.get('topic_hierarchy') + self.template = defs.get('template') self.file_filter = defs.get('pattern', '.*') self.enable_notification = defs.get('notify', False) self.buckets = defs.get('buckets', ()) diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index 9c694d553..37300d93d 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -109,10 +109,10 @@ def validate_and_load(path: str, topic_hierarchy = data_mappings[key]['topic_hierarchy'] # else try to match topic_hierarchy if metadata_id is None: - for key, v in 
data_mappings.items(): - if (v['topic_hierarchy']).replace('origin/a/wis2/', '') in path: # noqa + for key, value in data_mappings.items(): + if (value['topic_hierarchy']).replace('origin/a/wis2/', '') in path: # noqa metadata_id = key - topic_hierarchy = v['topic_hierarchy'] + topic_hierarchy = value['topic_hierarchy'] break if metadata_id is None: options = [key for key in data_mappings.keys()] # noqa