diff --git a/.github/workflows/tests-docker.yml b/.github/workflows/tests-docker.yml index b11f762d9..621dc6db0 100644 --- a/.github/workflows/tests-docker.yml +++ b/.github/workflows/tests-docker.yml @@ -61,8 +61,8 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA_UPDATE + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA_UPDATE - name: add Italy synop data (bufr2bufr) ๐Ÿ‡ฎ๐Ÿ‡น env: TOPIC_HIERARCHY: it-roma_met_centre.data.core.weather.surface-based-observations.synop @@ -76,7 +76,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add Algeria synop data (bufr2bufr) ๐Ÿ‡ฉ๐Ÿ‡ฟ env: TOPIC_HIERARCHY: dz-alger_met_centre.data.core.weather.surface-based-observations.synop @@ -90,7 +90,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema 
--schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add Romania synop data (synop2bufr and csv2bufr aws-template) ๐Ÿ‡ท๐Ÿ‡ด env: TOPIC_HIERARCHY: ro-rnimh.data.core.weather.surface-based-observations.synop @@ -104,7 +104,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add Congo synop data (synop2bufr) ๐Ÿ‡จ๐Ÿ‡ฉ env: TOPIC_HIERARCHY: cd-brazza_met_centre.data.core.weather.surface-based-observations.synop @@ -118,7 +118,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --territory-name $TERRITORY $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add example ship data (bufr2bufr) WMO env: TOPIC_HIERARCHY: int-wmo-test.data.core.weather.surface-based-observations.ship @@ -135,7 +135,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --wsi 0-22000-0-EUCDE34 $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema 
--schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add example buoy data (bufr2bufr) WMO env: TOPIC_HIERARCHY: int-wmo-test.data.core.weather.surface-based-observations.buoy @@ -148,7 +148,7 @@ jobs: python3 wis2box-ctl.py execute wis2box metadata station add-topic --wsi 0-22000-0-1400011 $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add example wind profiler data (bufr2bufr) WMO env: TOPIC_HIERARCHY: int-wmo-test.data.core.weather.surface-based-observations.wind_profiler @@ -161,7 +161,7 @@ jobs: python wis2box-ctl.py execute wis2box metadata station add-topic --wsi 0-702-0-48698 $CHANNEL curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: add China GRIB2 data (universal pipeline) ๐Ÿ‡จ๐Ÿ‡ณ env: TOPIC_HIERARCHY: cn-cma.data.core.weather.prediction.forecast.medium-range.probabilistic.global @@ -172,7 +172,7 @@ jobs: python3 wis2box-ctl.py execute wis2box dataset publish $DISCOVERY_METADATA curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID check-jsonschema --schemafile /tmp/wcmp2-bundled.json 
/tmp/$DISCOVERY_METADATA_ID - python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA + python3 wis2box-ctl.py execute wis2box data ingest -mdi $DISCOVERY_METADATA_ID -p $TEST_DATA - name: sleep 30 seconds then run integration tests ⚙️ run: | sleep 30 diff --git a/docs/source/reference/auth.rst b/docs/source/reference/auth.rst index 41ff3ea8f..8d9d920cf 100644 --- a/docs/source/reference/auth.rst +++ b/docs/source/reference/auth.rst @@ -32,11 +32,11 @@ To add a token to PUT/POST/DELETE requests to the stations collection, use the f This will generate a random token that can be use to update the stations collection. -Adding Access Control on topics -------------------------------- +Adding Access Control on datasets +--------------------------------- -All topic hierarchies in wis2box are open by default. A topic becomes closed, with access control applied, the -first time a token is generated for a topic hierarchy. +All datasets in wis2box are open by default. A dataset becomes closed, with access control applied, the +first time a token is generated for a dataset. .. note:: @@ -44,7 +44,7 @@ first time a token is generated for a topic hierarchy. .. code-block:: bash - wis2box auth add-token --topic-hierarchy mw-mw_met_centre.data.core.weather.surface-based-observations.synop mytoken + wis2box auth add-token --metadata-id urn:wmo:md:mw-mw_met_centre:surface-weather-observations mytoken If no token is provided, a random string will be generated. Be sure to the record token now, there is no @@ -58,8 +58,8 @@ Token credentials can be validated using the wis2box command line utility. ..
code-block:: bash wis2box auth show - wis2box auth has-access-topic --topic-hierarchy mw-mw_met_centre.data.core.weather.surface-based-observations.synop mytoken - wis2box auth has-access-topic --topic-hierarchy mw-mw_met_centre.data.core.weather.surface-based-observations.synop notmytoken + wis2box auth has-access-dataset --metadata-id urn:wmo:md:mw-mw_met_centre:surface-weather-observations mytoken + wis2box auth has-access-dataset --metadata-id urn:wmo:md:mw-mw_met_centre:surface-weather-observations notmytoken Once a token has been generated, access to any data of that topic in the WAF or API requires token authentication. diff --git a/docs/source/reference/running/data-ingest-processing-and-publishing.rst b/docs/source/reference/running/data-ingest-processing-and-publishing.rst index a65d70d0e..eb271f9d2 100644 --- a/docs/source/reference/running/data-ingest-processing-and-publishing.rst +++ b/docs/source/reference/running/data-ingest-processing-and-publishing.rst @@ -27,28 +27,27 @@ Explicit topic hierarchy workflow .. code-block:: bash # process a single CSV file - wis2box data ingest --topic-hierarchy foo.bar.baz -p /path/to/file.csv + wis2box data ingest --metadata-id urn:wmo:md:centre-id:mydata -p /path/to/file.csv # process a directory of CSV files - wis2box data ingest --topic-hierarchy foo.bar.baz -p /path/to/dir + wis2box data ingest --metadata-id urn:wmo:md:centre-id:mydata -p /path/to/dir # process a directory of CSV files recursively - wis2box data ingest --topic-hierarchy foo.bar.baz -p /path/to/dir -r + wis2box data ingest --metadata-id urn:wmo:md:centre-id:mydata -p /path/to/dir -r -Implicit topic hierarchy workflow -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - +Implicit metadata_id workflow +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ..
code-block:: bash - # process incoming data; topic hierarchy is inferred from fuzzy filepath equivalent - # wis2box will detect 'foo/bar/baz' as topic hierarchy 'foo.bar.baz' + # process incoming data; metadata_id is inferred from fuzzy filepath equivalent + # wis2box will detect 'mydata' as metadata_id 'urn:wmo:md:mydata' wis2box data ingest -p /path/to/foo/bar/baz/data/file.csv Event driven ingest, processing and publishing ---------------------------------------------- -Once all metadata and topic hierarchies are setup, event driven workflow +Once all datasets are set up, event driven workflow will immediately start to listen on files in the ``wis2box-incoming`` storage bucket as they are -placed in the appropriate topic hierarchy directory. +placed in the appropriate directory that can be matched to a metadata_id. diff --git a/docs/source/user/data-ingest.rst b/docs/source/user/data-ingest.rst index f6cbaf830..5652fd275 100644 --- a/docs/source/user/data-ingest.rst +++ b/docs/source/user/data-ingest.rst @@ -79,16 +79,21 @@ Select 'browse' on the ``wis2box-incoming`` bucket and select 'Choose or create :alt: MinIO new folder path .. note:: - The folder in which the file is placed defines the topic you want to share the data on and should match the datasets defined in your data mappings. + The folder in which the file is placed will be used to determine the dataset to which the file belongs. - The first 3 levels of the WIS2 topic hierarchy ``origin/a/wis2`` are automatically included by wis2box when publishing data notification messages. - - For example: + The wis2box-management container will match the path of the file to the dataset defined in the data mappings by checking whether it contains either the metadata identifier or the topic (excluding 'origin/a/wis2/').
- * data to be published on: ``origin/a/wis2/cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` - * upload data in the path: ``cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` + For example, using a filepath matching the metadata identifier: + + * Metadata identifier: ``urn:wmo:md:it-roma_met_centre:surface-weather-observations.synop`` + * upload data in path containing: ``it-roma_met_centre:surface-weather-observations.synop`` + + For example using a filepath matching the topic hierarchy: - The error message ``Topic Hierarchy validation error: No plugins for minio:9000/wis2box-incoming/... in data mappings`` indicates you stored a file in a folder for which no matching dataset was defined in the data mappings. + * Topic Hierarchy: ``origin/a/wis2/cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` + * upload data in the path containing: ``cd-brazza_met_centre/data/core/weather/surface-based-observations/synop`` + + The error message ``Path validation error: Could not match http://minio:9000/wis2box-incoming/... to dataset, ...`` indicates that a file was stored in a directory that could not be matched to a dataset. After uploading a file to ``wis2box-incoming`` storage bucket, you can browse the content in the ``wis2box-public`` bucket. 
If the data ingest was successful, new data will appear as follows: @@ -132,7 +137,8 @@ See below a Python example to upload data using the MinIO package: from minio import Minio filepath = '/home/wis2box-user/local-data/mydata.bin' - minio_path = '/it-roma_met_centre/data/core/weather/surface-based-observations/synop/' + # path should match the metadata or the topic in the data mappings + minio_path = 'urn:wmo:md:it-roma_met_centre:surface-weather-observations' endpoint = 'http://localhost:9000' WIS2BOX_STORAGE_USERNAME = 'wis2box' diff --git a/tests/data/metadata/discovery/cd-surface-weather-observations.yml b/tests/data/metadata/discovery/cd-surface-weather-observations.yml index 0a0a9d8c4..76dcda288 100644 --- a/tests/data/metadata/discovery/cd-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/cd-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P180D - topic_hierarchy: cd-brazza_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: cd-brazza_met_centre/data/core/weather/surface-based-observations/synop country: cog centre_id: cd-brazza_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/cn-grapes-geps-global.yml b/tests/data/metadata/discovery/cn-grapes-geps-global.yml index 180854e4e..035270e82 100644 --- a/tests/data/metadata/discovery/cn-grapes-geps-global.yml +++ b/tests/data/metadata/discovery/cn-grapes-geps-global.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: cn-cma.data.core.weather.prediction.forecast.medium-range.probabilistic.global + topic_hierarchy: cn-cma/data/core/weather/prediction/forecast/medium-range/probabilistic/global country: chn centre_id: cn-cma data_mappings: diff --git a/tests/data/metadata/discovery/dz-surface-weather-observations.yml b/tests/data/metadata/discovery/dz-surface-weather-observations.yml index 1d0b2a407..a850c49c7 100644 --- a/tests/data/metadata/discovery/dz-surface-weather-observations.yml +++ 
b/tests/data/metadata/discovery/dz-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: dz-alger_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: dz-alger_met_centre/data/core/weather/surface-based-observations/synop country: dza centre_id: dz-alger_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/int-wmo-test-buoy.yml b/tests/data/metadata/discovery/int-wmo-test-buoy.yml index 102cb95fb..e439731de 100644 --- a/tests/data/metadata/discovery/int-wmo-test-buoy.yml +++ b/tests/data/metadata/discovery/int-wmo-test-buoy.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: int-wmo-test.data.core.weather.surface-based-observations.buoy + topic_hierarchy: int-wmo-test/data/core/weather/surface-based-observations/buoy country: int centre_id: int-wmo-test data_mappings: diff --git a/tests/data/metadata/discovery/int-wmo-test-ship.yml b/tests/data/metadata/discovery/int-wmo-test-ship.yml index 52417815f..56a37695e 100644 --- a/tests/data/metadata/discovery/int-wmo-test-ship.yml +++ b/tests/data/metadata/discovery/int-wmo-test-ship.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: int-wmo-test.data.core.weather.surface-based-observations.ship + topic_hierarchy: int-wmo-test/data/core/weather/surface-based-observations/ship country: int centre_id: int-wmo-test data_mappings: diff --git a/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml b/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml index eec608cb6..e6eeacd36 100644 --- a/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml +++ b/tests/data/metadata/discovery/int-wmo-test-wind_profiler.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: int-wmo-test.data.core.weather.surface-based-observations.wind_profiler + topic_hierarchy: int-wmo-test/data/core/weather/surface-based-observations/wind_profiler country: int centre_id: int-wmo-test data_mappings: diff --git 
a/tests/data/metadata/discovery/it-surface-weather-observations.yml b/tests/data/metadata/discovery/it-surface-weather-observations.yml index a64322c84..14e2c1a14 100644 --- a/tests/data/metadata/discovery/it-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/it-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: it-roma_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: it-roma_met_centre/data/core/weather/surface-based-observations/synop country: ita centre_id: it-roma_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/mw-surface-weather-observations.yml b/tests/data/metadata/discovery/mw-surface-weather-observations.yml index bcb14c489..bfd80d255 100644 --- a/tests/data/metadata/discovery/mw-surface-weather-observations.yml +++ b/tests/data/metadata/discovery/mw-surface-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: mw-mw_met_centre.data.core.weather.surface-based-observations.synop + topic_hierarchy: mw-mw_met_centre/data/core/weather/surface-based-observations/synop country: mwi centre_id: mw-mw_met_centre data_mappings: diff --git a/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml b/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml index 58fd2676e..e55d791bf 100644 --- a/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml +++ b/tests/data/metadata/discovery/ro-synoptic-weather-observations.yml @@ -1,6 +1,6 @@ wis2box: retention: P30D - topic_hierarchy: ro-rnimh.data.core.weather.surface-based-observations.synop + topic_hierarchy: ro-rnimh/data/core/weather/surface-based-observations/synop country: rou centre_id: ro-rnimh data_mappings: diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 4201a3636..9661b538f 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -167,7 +167,7 @@ def 
test_data_ingest(): assert item_api['reportId'] == 'WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500' assert item_api['properties']['resultTime'] == '2021-07-07T14:55:00Z' # noqa - item_source = f'2021-07-07/wis/mw-mw_met_centre/data/core/weather/surface-based-observations/synop/{item_api["reportId"]}.bufr4' # noqa + item_source = f'2021-07-07/wis/{ID}/{item_api["reportId"]}.bufr4' # noqa r = SESSION.get(f'{URL}/data/{item_source}') # noqa assert r.status_code == codes.ok @@ -247,12 +247,12 @@ def test_message_api(): # test messages per test dataset counts = { - 'mw_met_centre': 25, - 'roma_met_centre': 33, - 'alger_met_centre': 29, - 'rnimh': 50, - 'brazza_met_centre': 15, - 'wmo-test': 11, + 'mw-mw_met_centre': 25, + 'it-roma_met_centre': 33, + 'dz-alger_met_centre': 29, + 'ro-rnimh': 50, + 'cd-brazza_met_centre': 15, + 'int-wmo-test': 11, 'cn-cma': 11 } for key, value in counts.items(): @@ -267,9 +267,7 @@ def test_message_api(): assert r['numberMatched'] == sum(counts.values()) # we want to find a particular message with data ID - target_data_id = "cd-brazza_met_centre/data/core/weather/" \ - "surface-based-observations/synop/" \ - "WIGOS_0-20000-0-64406_20230803T090000" + target_data_id = "cd-brazza_met_centre:surface-weather-observations/WIGOS_0-20000-0-64406_20230803T090000" # noqa msg = None for feature in r['features']: diff --git a/wis2box-management/docker/entrypoint.sh b/wis2box-management/docker/entrypoint.sh index c357cc9a2..c403a6be2 100755 --- a/wis2box-management/docker/entrypoint.sh +++ b/wis2box-management/docker/entrypoint.sh @@ -29,18 +29,18 @@ set -e #ensure environment-variables are available for cronjob printenv | grep -v "no_proxy" >> /etc/environment -# ensure cron is running -service cron start -service cron status - # wis2box commands # TODO: avoid re-creating environment if it already exists # TODO: catch errors and avoid bounce in conjuction with restart: always +wis2box metadata discovery setup +wis2box metadata station setup wis2box 
environment create wis2box environment show wis2box api setup -wis2box metadata discovery setup -wis2box metadata station setup + +# ensure cron is running +service cron start +service cron status echo "Caching topic hierarchy CSVs" pywis-topics bundle sync diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index 349b8d839..c96601d4f 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -98,10 +98,7 @@ def setup_collection(meta: dict = {}) -> bool: LOGGER.error(f'Invalid configuration: {meta}') return False - if 'topic_hierarchy' in meta: - data_name = meta['topic_hierarchy'] - else: - data_name = meta['id'] + data_name = meta['id'] backend = load_backend() if not backend.has_collection(data_name): diff --git a/wis2box-management/wis2box/api/config/pygeoapi.py b/wis2box-management/wis2box/api/config/pygeoapi.py index d097cbdbd..9cc15d034 100644 --- a/wis2box-management/wis2box/api/config/pygeoapi.py +++ b/wis2box-management/wis2box/api/config/pygeoapi.py @@ -140,15 +140,15 @@ def prepare_collection(self, meta: dict) -> bool: editable = False - if meta['id'] in ['discovery-metadata', 'messages', 'stations']: - resource_id = meta['id'] - else: - resource_id = meta['topic_hierarchy'] + resource_id = meta['id'] if meta['id'] in ['discovery-metadata', 'stations']: editable = True + else: + # avoid colons in resource id + resource_id = resource_id.lower().replace(':', '-') - LOGGER.debug(f'Resource id: {resource_id}') + LOGGER.info(f'Prepare collection with resource_id={resource_id}') type_ = meta.get('type', 'feature') diff --git a/wis2box-management/wis2box/auth.py b/wis2box-management/wis2box/auth.py index c22edfebb..0728e875f 100644 --- a/wis2box-management/wis2box/auth.py +++ b/wis2box-management/wis2box/auth.py @@ -25,22 +25,23 @@ from secrets import token_hex from wis2box import cli_helpers -from wis2box.topic_hierarchy import validate_and_load +from 
wis2box.data_mappings import get_data_mappings from wis2box.env import AUTH_URL + LOGGER = logging.getLogger(__name__) -def create_token(topic: str, token: str) -> bool: +def create_token(path: str, token: str) -> bool: """ Creates a token with access control - :param topic: `str` topic hierarchy + :param path: `str` path :param token: `str` authentication token :returns: `bool` of result """ - data = {'topic': topic, 'token': token} + data = {'topic': path, 'token': token} r = requests.post(f'{AUTH_URL}/add_token', data=data) LOGGER.info(r.json().get('description')) @@ -48,16 +49,16 @@ def create_token(topic: str, token: str) -> bool: return r.ok -def delete_token(topic: str, token: str = '') -> bool: +def delete_token(path: str, token: str = '') -> bool: """ Creates a token with access control - :param topic: `str` topic hierarchy + :param path: `str` path :param token: `str` authentication token :returns: `bool` of result """ - data = {'topic': topic} + data = {'topic': path} if token != '': # Delete all tokens for a given th @@ -69,37 +70,35 @@ def delete_token(topic: str, token: str = '') -> bool: return r.ok -def is_resource_open(topic: str) -> bool: +def is_resource_open(path: str) -> bool: """ - Checks if topic has access control configured + Checks if path has access control configured - :param topic: `str` topic hierarchy + :param path: `str` path :returns: `bool` of result """ - headers = {'X-Original-URI': topic} + headers = {'X-Original-URI': path} r = requests.get(f'{AUTH_URL}/authorize', headers=headers) return r.ok -def is_token_authorized(topic: str, token: str) -> bool: +def is_token_authorized(path: str, token: str) -> bool: """ - Checks if token is authorized to access a topic + Checks if token is authorized to access a path - :param topic: `str` topic hierarchy + :param path: `str` path :param token: `str` authentication token :returns: `bool` of result """ headers = { - 'X-Original-URI': topic, + 'X-Original-URI': path, 'Authorization': 
f'Bearer {token}', } - r = requests.get(f'{AUTH_URL}/authorize', headers=headers) - return r.ok @@ -111,11 +110,10 @@ def auth(): @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY -def is_restricted_topic(ctx, topic_hierarchy): - """Check if topic has access control""" - th, _ = validate_and_load(topic_hierarchy) - click.echo(not is_resource_open(th.dotpath)) +@cli_helpers.OPTION_METADATA_ID +def is_restricted_dataset(ctx, metadata_id): + """Check if dataset has access control""" + click.echo(not is_resource_open(metadata_id)) @click.command() @@ -128,12 +126,11 @@ def is_restricted_path(ctx, path): @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_METADATA_ID @click.argument('token') -def has_access_topic(ctx, topic_hierarchy, token): - """Check if a token has access to a topic""" - th, _ = validate_and_load(topic_hierarchy) - click.echo(is_token_authorized(th.dotpath, token)) +def has_access_dataset(ctx, metadata_id, token): + """Check if a token has access to a dataset""" + click.echo(is_token_authorized(metadata_id, token)) @click.command() @@ -147,20 +144,22 @@ def has_access_path(ctx, path, token): @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_METADATA_ID @click.option('--path', '-p') @click.option('--yes', '-y', default=False, is_flag=True, help='Automatic yes') @click.argument('token', required=False) -def add_token(ctx, topic_hierarchy, path, yes, token): - """Add access token for a topic""" - - if topic_hierarchy is not None: - th, _ = validate_and_load(topic_hierarchy) - topic = th.dotpath +def add_token(ctx, metadata_id, path, yes, token): + """Add access token for a path or dataset""" + + if metadata_id is not None: + data_mappings = get_data_mappings() + if metadata_id not in data_mappings: + raise click.ClickException(f'Metadata identifier {metadata_id} not found in data mappings') # noqa + path = metadata_id elif path is not None: - 
topic = path + path = path else: - raise click.ClickException('Missing path or topic hierarchy') + raise click.ClickException('Missing path or metadata_id') token = token_hex(32) if token is None else token if yes: @@ -168,33 +167,35 @@ def add_token(ctx, topic_hierarchy, path, yes, token): elif not click.confirm(f'Continue with token: {token}', prompt_suffix='?'): return - if create_token(topic, token): + if create_token(path, token): click.echo('Token successfully created') @click.command() @click.pass_context -@cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_METADATA_ID @click.option('--path', '-p') @click.argument('token', required=False, nargs=-1) -def remove_token(ctx, topic_hierarchy, path, token): - """Delete one to many tokens for a topic""" - - if topic_hierarchy is not None: - th, _ = validate_and_load(topic_hierarchy) - topic = th.dotpath +def remove_token(ctx, metadata_id, path, token): + """Delete one to many tokens for a dataset""" + + if metadata_id is not None: + data_mappings = get_data_mappings() + if metadata_id not in data_mappings: + raise click.ClickException(f'Metadata identifier {metadata_id} not found in data mappings') # noqa + path = metadata_id elif path is not None: - topic = path + path = path else: - raise click.ClickException('Missing path or topic hierarchy') + raise click.ClickException('Missing path or metadata_id') - if delete_token(topic, token): + if delete_token(path, token): click.echo('Token successfully deleted') auth.add_command(add_token) auth.add_command(remove_token) -auth.add_command(has_access_topic) +auth.add_command(has_access_dataset) auth.add_command(has_access_path) -auth.add_command(is_restricted_topic) +auth.add_command(is_restricted_dataset) auth.add_command(is_restricted_path) diff --git a/wis2box-management/wis2box/cli_helpers.py b/wis2box-management/wis2box/cli_helpers.py index 627fa1397..f9771ee86 100644 --- a/wis2box-management/wis2box/cli_helpers.py +++ 
b/wis2box-management/wis2box/cli_helpers.py @@ -37,6 +37,9 @@ OPTION_TOPIC_HIERARCHY = click.option('--topic-hierarchy', '-th', help='Topic hierarchy') +OPTION_METADATA_ID = click.option('--metadata-id', '-mdi', + help='Metadata identifier') + OPTION_RECURSIVE = click.option('--recursive', '-r', default=False, is_flag=True, help='Process directory recursively') diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index e2d70ce5b..d68e82aed 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -37,7 +37,6 @@ from wis2box.storage import put_data, move_data, list_content, delete_data from wis2box.util import older_than, walk_path - LOGGER = logging.getLogger(__name__) @@ -194,20 +193,31 @@ def clean(ctx, days, verbosity): @click.command() @click.pass_context @cli_helpers.OPTION_TOPIC_HIERARCHY +@cli_helpers.OPTION_METADATA_ID @cli_helpers.OPTION_PATH @cli_helpers.OPTION_RECURSIVE @cli_helpers.OPTION_VERBOSITY -def ingest(ctx, topic_hierarchy, path, recursive, verbosity): +def ingest(ctx, topic_hierarchy, metadata_id, path, recursive, verbosity): """Ingest data file or directory""" - data_mappings = get_data_mappings() + # either topic_hierarchy or metadata_id must be provided + if topic_hierarchy and metadata_id: + raise click.ClickException('Only one of topic_hierarchy or metadata_id can be provided') # noqa + + if not topic_hierarchy and not metadata_id: + raise click.ClickException('Please specify a metadata_id using the option --metadata-id') # noqa + + rfp = None + if metadata_id: + data_mappings = get_data_mappings() + if metadata_id not in data_mappings: + raise click.ClickException(f'metadata_id={metadata_id} not found in data mappings') # noqa + rfp = metadata_id + else: + rfp = topic_hierarchy.replace('.', '/') for file_to_process in walk_path(path, '.*', recursive): click.echo(f'Processing {file_to_process}') - handler = 
Handler(filepath=file_to_process, - topic_hierarchy=topic_hierarchy, - data_mappings=data_mappings) - rfp = handler.topic_hierarchy.dirpath path = f'{STORAGE_INCOMING}/{rfp}/{file_to_process.name}' with file_to_process.open('rb') as fh: diff --git a/wis2box-management/wis2box/data/base.py b/wis2box-management/wis2box/data/base.py index fb3961ff1..340cf56b0 100644 --- a/wis2box-management/wis2box/data/base.py +++ b/wis2box-management/wis2box/data/base.py @@ -25,11 +25,11 @@ import re from typing import Iterator, Union -from wis2box.env import (STORAGE_INCOMING, STORAGE_PUBLIC, +from wis2box.env import (STORAGE_PUBLIC, STORAGE_SOURCE, BROKER_PUBLIC, DOCKER_BROKER) from wis2box.storage import exists, get_data, put_data -from wis2box.topic_hierarchy import TopicHierarchy + from wis2box.plugin import load_plugin, PLUGINS from wis2box.pubsub.message import WISNotificationMessage @@ -51,9 +51,10 @@ def __init__(self, defs: dict) -> None: LOGGER.debug('Parsing resource mappings') self.filename = None - self.incoming_filepath = None - self.topic_hierarchy = TopicHierarchy(defs['topic_hierarchy']) - self.template = defs.get('template', None) + self.incoming_filepath = defs.get('incoming_filepath') + self.metadata_id = defs.get('metadata_id') + self.topic_hierarchy = defs.get('topic_hierarchy') + self.template = defs.get('template') self.file_filter = defs.get('pattern', '.*') self.enable_notification = defs.get('notify', False) self.buckets = defs.get('buckets', ()) @@ -62,9 +63,10 @@ def __init__(self, defs: dict) -> None: # if discovery_metadata: # self.setup_discovery_metadata(discovery_metadata) - def publish_failure_message(self, description, wsi=None): + def publish_failure_message(self, description, wsi=None, identifier=None): message = { - 'filepath': self.incoming_filepath, + 'incoming_filepath': self.incoming_filepath, + 'identifier': identifier, 'description': description } if wsi is not None: @@ -91,10 +93,6 @@ def setup_discovery_metadata(self, 
discovery_metadata: dict) -> None: """ self.discovery_metadata = discovery_metadata - - self.topic_hierarchy = TopicHierarchy( - discovery_metadata['metadata']['identifier']) - self.country = discovery_metadata['wis2box']['country'] self.centre_id = discovery_metadata['wis2box']['centre_id'] @@ -146,13 +144,14 @@ def notify(self, identifier: str, storage_path: str, LOGGER.info('Publishing WISNotificationMessage to public broker') LOGGER.debug(f'Prepare message for: {storage_path}') - topic = f'origin/a/wis2/{self.topic_hierarchy.dirpath}' - data_id = topic.replace('origin/a/wis2/', '') + topic = self.topic_hierarchy + metadata_id = self.metadata_id operation = 'create' if is_update is False else 'update' wis_message = WISNotificationMessage( - identifier, data_id, storage_path, datetime_, geometry, + f"{metadata_id.replace('urn:wmo:md:','')}/{identifier}", + metadata_id, storage_path, datetime_, geometry, wigos_station_identifier, operation) # load plugin for public broker @@ -249,12 +248,10 @@ def publish_item(self, identifier, item) -> bool: if self.enable_notification and is_new: LOGGER.debug('Sending notification to broker') - try: datetime_ = item['_meta']['properties']['datetime'] except KeyError: datetime_ = item['_meta'].get('data_date') - self.notify(identifier, storage_path, datetime_, item['_meta'].get('geometry'), wsi, is_update) @@ -262,9 +259,10 @@ def publish_item(self, identifier, item) -> bool: LOGGER.debug('No notification sent') except Exception as err: msg = f'Failed to publish item {identifier}: {err}' - LOGGER.error(msg, exc_info=True) + LOGGER.error(msg) self.publish_failure_message( description='Failed to publish item', + identifier=identifier, wsi=wsi) return True @@ -299,17 +297,6 @@ def files(self) -> Iterator[str]: yield f'{STORAGE_PUBLIC}/{rfp}/{identifier}.{format_}' - @property - def directories(self): - """Dataset directories""" - - dirpath = self.topic_hierarchy.dirpath - - return { - 'incoming': f'{STORAGE_INCOMING}/{dirpath}', - 
'public': f'{STORAGE_PUBLIC}/{dirpath}' - } - def get_public_filepath(self): """Public filepath""" diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index a9b81141d..0858c374d 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ b/wis2box-management/wis2box/data/bufr2geojson.py @@ -84,4 +84,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/bufr4.py b/wis2box-management/wis2box/data/bufr4.py index d15203703..82fed6c3a 100644 --- a/wis2box-management/wis2box/data/bufr4.py +++ b/wis2box-management/wis2box/data/bufr4.py @@ -53,7 +53,7 @@ def transform(self, input_data: Union[Path, bytes], payload = { 'inputs': { - 'channel': self.topic_hierarchy.dirpath, + 'channel': self.topic_hierarchy.replace('origin/a/wis2/', ''), 'notify': False, 'data': data } @@ -100,4 +100,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/csv2bufr.py b/wis2box-management/wis2box/data/csv2bufr.py index 1823f3cda..767a8a2cb 100644 --- a/wis2box-management/wis2box/data/csv2bufr.py +++ b/wis2box-management/wis2box/data/csv2bufr.py @@ -65,7 +65,7 @@ def transform(self, input_data: Union[Path, bytes], payload = { 'inputs': { - 'channel': self.topic_hierarchy.dirpath, + 'channel': self.topic_hierarchy.replace('origin/a/wis2/', ''), 'template': self.template, 'notify': False, 'data': data @@ -113,4 +113,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return 
(Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath) + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/geojson.py b/wis2box-management/wis2box/data/geojson.py index 5353df838..a9df7b62a 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -65,10 +65,10 @@ def publish(self) -> bool: continue LOGGER.debug('Publishing data to API') - upsert_collection_item(self.topic_hierarchy.dotpath, the_data) + upsert_collection_item(self.metadata_id, the_data) return True def get_local_filepath(self, date_): yyyymmdd = date_[0:10] # date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/message.py b/wis2box-management/wis2box/data/message.py index ce75c8376..513a23b14 100644 --- a/wis2box-management/wis2box/data/message.py +++ b/wis2box-management/wis2box/data/message.py @@ -76,4 +76,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/synop2bufr.py b/wis2box-management/wis2box/data/synop2bufr.py index 5b9d317be..78e00aebb 100644 --- a/wis2box-management/wis2box/data/synop2bufr.py +++ b/wis2box-management/wis2box/data/synop2bufr.py @@ -75,7 +75,7 @@ def transform(self, input_data: Union[Path, bytes], # post data do wis2box-api/oapi/processes/synop2bufr payload = { 'inputs': { - 'channel': self.topic_hierarchy.dirpath, + 'channel': self.topic_hierarchy.replace('origin/a/wis2/', ''), 'year': year, 'month': month, 'notify': False, @@ -124,4 +124,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return (Path(yyyymmdd) / 'wis' / 
self.topic_hierarchy.dirpath) + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data/universal.py b/wis2box-management/wis2box/data/universal.py index b985b59a8..1d75e41d3 100644 --- a/wis2box-management/wis2box/data/universal.py +++ b/wis2box-management/wis2box/data/universal.py @@ -80,4 +80,4 @@ def transform(self, input_data: Union[Path, bytes], def get_local_filepath(self, date_): yyyymmdd = date_.strftime('%Y-%m-%d') - return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index 6f4deab54..37300d93d 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -21,6 +21,8 @@ import logging +from typing import Any, Tuple + from owslib.ogcapi.records import Records from wis2box.env import (DOCKER_BROKER, DOCKER_API_URL) @@ -63,13 +65,88 @@ def get_data_mappings() -> dict: continue if 'data_mappings' not in record['wis2box']: continue - key = record['wis2box']['topic_hierarchy'] value = record['wis2box']['data_mappings'] - value['metadata_id'] = record['id'] - data_mappings[key] = value + if 'wmo:topicHierarchy' not in record['properties']: + LOGGER.error(f'No topic hierarchy for {record["id"]}') + continue + value['topic_hierarchy'] = record['properties']['wmo:topicHierarchy'] # noqa + metadata_id = record['id'] + data_mappings[metadata_id] = value except Exception as err: msg = f'Issue loading data mappings: {err}' LOGGER.error(msg) raise EnvironmentError(msg) return data_mappings + + +def validate_and_load(path: str, + data_mappings: dict = None, + file_type: str = None + ) -> Tuple[str, Tuple[Any]]: + """ + Validate path and load plugins + + :param path: `str` of path + :param data_mappings: `dict` of data mappings + :param file_type: `str` the type of file we are processing, e.g. 
csv, bufr, xml # noqa + :param fuzzy: `bool` of whether to do fuzzy matching of path + (e.g. "*foo.bar.baz*). + Defaults to `False` (i.e. "foo.bar.baz") + + :returns: tuple of metadata_id and list of plugins objects + """ + + LOGGER.debug(f'Validating path: {path}') + LOGGER.debug(f'Data mappings {data_mappings}') + + metadata_id = None + topic_hierarchy = None + # determine if path matches a metadata_id + for key in data_mappings.keys(): + if key in path: + metadata_id = key + topic_hierarchy = data_mappings[key]['topic_hierarchy'] + # else try to match topic_hierarchy + if metadata_id is None: + for key, value in data_mappings.items(): + if (value['topic_hierarchy']).replace('origin/a/wis2/', '') in path: # noqa + metadata_id = key + topic_hierarchy = value['topic_hierarchy'] + break + if metadata_id is None: + options = [key for key in data_mappings.keys()] # noqa + options += [v['topic_hierarchy'].replace('origin/a/wis2/', '') for v in data_mappings.values()] # noqa + msg = f'Could not match {path} to dataset, path should include one of the following: {options}' # noqa + raise ValueError(msg) + + plugins = data_mappings[metadata_id]['plugins'] + + if file_type is None: + LOGGER.warning('File type missing') + file_type = next(iter(plugins)) + LOGGER.debug(f'File type set to first type: {file_type}') + + if file_type not in plugins: + msg = f'Unknown file type ({file_type}) for metadata_id={metadata_id}. 
Did not match any of the following:' # noqa + msg += ', '.join(plugins) + raise ValueError(msg) + + LOGGER.debug(f'Adding plugin definition for {file_type}') + + def data_defs(plugin): + return { + 'metadata_id': metadata_id, + 'incoming_filepath': path, + 'topic_hierarchy': topic_hierarchy, + 'codepath': plugin['plugin'], + 'pattern': plugin['file-pattern'], + 'template': plugin.get('template'), + 'buckets': plugin.get('buckets', ()), + 'notify': plugin.get('notify', False), + 'format': file_type + } + + plugins_ = [load_plugin('data', data_defs(p), data_mappings) + for p in plugins[file_type]] + return metadata_id, plugins_ diff --git a/wis2box-management/wis2box/handler.py b/wis2box-management/wis2box/handler.py index e39dc358d..1230b3afa 100644 --- a/wis2box-management/wis2box/handler.py +++ b/wis2box-management/wis2box/handler.py @@ -25,7 +25,7 @@ from wis2box.api import upsert_collection_item from wis2box.storage import get_data -from wis2box.topic_hierarchy import validate_and_load +from wis2box.data_mappings import validate_and_load from wis2box.plugin import load_plugin from wis2box.plugin import PLUGINS @@ -37,7 +37,7 @@ class Handler: def __init__(self, filepath: str, - topic_hierarchy: str = None, + metadata_id: str = None, data_mappings: dict = None) -> None: self.filepath = filepath self.plugins = () @@ -56,22 +56,14 @@ def __init__(self, filepath: str, if self.filepath.startswith('http'): self.input_bytes = get_data(self.filepath) - if topic_hierarchy is not None: - th = topic_hierarchy - fuzzy = False - else: - th = self.filepath - fuzzy = True - - if '/metadata/' in th: + if '/metadata/' in self.filepath: msg = 'Passing on handling metadata in workflow' raise NotHandledError(msg) - try: - self.topic_hierarchy, self.plugins = validate_and_load( - th, data_mappings, self.filetype, fuzzy=fuzzy) + self.metadata_id, self.plugins = validate_and_load( + self.filepath, data_mappings, self.filetype) except Exception as err: - msg = f'Topic Hierarchy 
validation error: {err}' + msg = f'Path validation error: {err}' # errors in public storage are not handled if STORAGE_PUBLIC in self.filepath: raise NotHandledError(msg) @@ -132,7 +124,7 @@ def handle(self) -> bool: return True def publish(self) -> bool: - index_name = self.topic_hierarchy.dotpath + index_name = self.metadata_id if self.input_bytes: geojson = json.load(self.input_bytes) upsert_collection_item(index_name, geojson) diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 3cceae2d2..b394d6e16 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -178,8 +178,12 @@ def publish_broker_message(record: dict, storage_path: str, topic = f'origin/a/wis2/{centre_id.lower()}/metadata' # noqa datetime_ = datetime.strptime(record['properties']['created'], '%Y-%m-%dT%H:%M:%SZ') # noqa - wis_message = WISNotificationMessage(record['id'], topic, storage_path, - datetime_, record['geometry']).dumps() + identifier = f"{centre_id.lower()}/metadata/{record['id']}" + wis_message = WISNotificationMessage(identifier=identifier, + metadata_id=None, + filepath=storage_path, + datetime_=datetime_, + geometry=record['geometry']).dumps() # load plugin for plugin-broker defs = { diff --git a/wis2box-management/wis2box/pubsub/message.py b/wis2box-management/wis2box/pubsub/message.py index 9e51aff7b..d036a630e 100644 --- a/wis2box-management/wis2box/pubsub/message.py +++ b/wis2box-management/wis2box/pubsub/message.py @@ -53,19 +53,16 @@ class PubSubMessage: Generic message class """ - def __init__(self, type_: str, identifier: str, topic: str, filepath: str, - datetime_: datetime, geometry: dict = None, - wigos_station_identifier: str = None) -> None: + def __init__(self, type_: str, identifier: str, filepath: str, + datetime_: datetime, geometry: dict = None) -> None: """ Initializer :param type_: message type :param identifier: identifier - :param 
topic: topic :param filepath: `Path` of file :param datetime_: `datetime` object of temporal aspect of data :param geometry: `dict` of GeoJSON geometry object - :param wigos_station_identifier: WSI associated with the data :returns: `wis2box.pubsub.message.PubSubMessage` message object """ @@ -128,16 +125,16 @@ def _generate_checksum(self, bytes, algorithm: SecureHashAlgorithms) -> str: # class WISNotificationMessage(PubSubMessage): - def __init__(self, identifier: str, topic: str, filepath: str, + def __init__(self, identifier: str, metadata_id: str, filepath: str, datetime_: str, geometry=None, wigos_station_identifier=None, operation: str = 'create') -> None: super().__init__('wis2-notification-message', identifier, - topic, filepath, datetime_, geometry) + filepath, datetime_, geometry) - data_id = f'{topic}/{self.identifier}'.replace('origin/a/wis2/', '') + data_id = f'{self.identifier}' - if '/metadata' in topic: + if '/metadata' in filepath: mimetype = 'application/geo+json' else: suffix = self.filepath.split('.')[-1] @@ -186,6 +183,9 @@ def __init__(self, identifier: str, topic: str, filepath: str, 'generated-by': f'wis2box {__version__}' } + if metadata_id is not None: + self.message['properties']['metadata_id'] = metadata_id + if self.length < 4096: LOGGER.debug('Including data inline via properties.content') content_value = base64.b64encode(self.filebytes) diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index 74e4b656f..8595f5e11 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -61,16 +61,15 @@ def handle(self, filepath): handler = Handler(filepath=filepath, data_mappings=self.data_mappings) if handler.handle(): - LOGGER.info('Data processed') + LOGGER.debug('Data processed') for plugin in handler.plugins: for filepath in plugin.files(): - LOGGER.info(f'Public filepath: {filepath}') + LOGGER.debug(f'Public filepath: {filepath}') 
except NotHandledError as err: - msg = f'not handled error: {err}' - LOGGER.debug(msg) + msg = f'not handled: {err}' + LOGGER.info(msg) except ValueError as err: - msg = f'handle() error: {err}' - LOGGER.error(msg) + LOGGER.error(err) except Exception as err: msg = f'handle() error: {err}' raise err diff --git a/wis2box-management/wis2box/topic_hierarchy.py b/wis2box-management/wis2box/topic_hierarchy.py deleted file mode 100644 index 9501bad3b..000000000 --- a/wis2box-management/wis2box/topic_hierarchy.py +++ /dev/null @@ -1,156 +0,0 @@ -############################################################################### -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -############################################################################### - -from fnmatch import fnmatch -import logging -from pathlib import Path -from typing import Any, Tuple, Union - -# from pywis_topics.topics import TopicHierarchy as pywis_topics_th - -from wis2box.data_mappings import get_data_mappings -from wis2box.plugin import load_plugin - -LOGGER = logging.getLogger(__name__) - - -class TopicHierarchy: - def __init__(self, path: Union[Path, str]) -> None: - self.path = str(path) - self.dotpath = None - self.dirpath = None - - if not self.path.startswith('origin/a/wis2'): - self.fullpath = f'origin/a/wis2/{self.dirpath}' - else: - self.fullpath = self.dirpath - - if '/' in self.path: - LOGGER.debug('Transforming from directory to dotted path') - self.dirpath = self.path - self.dotpath = self.path.replace('/', '.') - elif '.' in self.path: - LOGGER.debug('Transforming from dotted to directory path') - self.dotpath = self.path - self.dirpath = self.path.replace('.', '/') - - def is_valid(self) -> bool: - """ - Determines whether a topic hierarchy is valid - - :returns: `bool` of whether the topic hierarchy is valid - """ - - # TODO: uncomment once WTH is approved - # LOGGER.debug(f'Validating topic {self.dirpath} (fuzzy match)') - # th = pywis_topics_th() - # return th.validate(self.fullpath) - - return True - - -def validate_and_load(topic_hierarchy: str, - data_mappings: dict = None, - file_type: str = None, - fuzzy: bool = False - ) -> Tuple[TopicHierarchy, Tuple[Any]]: - """ - Validate topic hierarchy and load plugins - - :param topic_hierarchy: `str` of topic hierarchy path - :param data_mappings: `dict` of data mappings - :param file_type: `str` the type of file we are processing, e.g. csv, bufr, xml # noqa - :param fuzzy: `bool` of whether to do fuzzy matching of topic hierarchy - (e.g. "*foo.bar.baz*). - Defaults to `False` (i.e. 
"foo.bar.baz") - - :returns: tuple of `wis2box.topic_hierarchy.TopicHierarchy` and - list of plugins objects - """ - - LOGGER.debug(f'Validating topic hierarchy: {topic_hierarchy}') - LOGGER.debug(f'Data mappings {data_mappings}') - - if not data_mappings: - msg = 'Data mappings are empty. Fetching' - data_mappings = get_data_mappings() - - th = TopicHierarchy(topic_hierarchy) - found = False - - if not th.is_valid(): - msg = 'Invalid topic hierarchy' - raise ValueError(msg) - - if fuzzy: - LOGGER.debug('Searching data mappings for fuzzy topic match') - for topic, defs in data_mappings.items(): - - pattern = f'*{topic}*' - LOGGER.debug(f'Attempting to fuzzy match with {pattern}') - if fnmatch(th.dotpath, pattern): - - LOGGER.debug(f'Matched topic {th.path} on {th.dotpath}') - found = True - plugins = defs['plugins'] - - LOGGER.debug(f'Reloading topic to {topic}') - th = TopicHierarchy(topic) - - else: - LOGGER.debug('Searching data mappings for exact topic match') - if th.dotpath in data_mappings: - - LOGGER.debug(f'Matched topic {th.path} on {th.dotpath}') - found = True - plugins = data_mappings[th.dotpath]['plugins'] - - if not found: - msg = f'No plugins for {th.path} in data mappings. Did not match any of the following: ' # noqa - msg += ', '.join(data_mappings.keys()) - raise ValueError(msg) - - if file_type is None: - LOGGER.warning('File type missing') - file_type = next(iter(plugins)) - LOGGER.debug(f'File type set to first type: {file_type}') - - if file_type not in plugins: - msg = f'Unknown file type ({file_type}) for topic {th.dotpath}. 
Did not match any of the following:' # noqa - msg += ', '.join(plugins) - raise ValueError(msg) - - LOGGER.debug(f'Adding plugin definition for {file_type}') - - def data_defs(plugin): - return { - 'topic_hierarchy': th.dotpath, - 'codepath': plugin['plugin'], - 'pattern': plugin['file-pattern'], - 'template': plugin.get('template'), - 'buckets': plugin.get('buckets', ()), - 'notify': plugin.get('notify', False), - 'format': file_type - } - - plugins_ = [load_plugin('data', data_defs(p), data_mappings) - for p in plugins[file_type]] - return th, plugins_