From ec8f1b3bc7eb1d80d268a1e0ae812536c6881c0f Mon Sep 17 00:00:00 2001 From: Maaike Date: Wed, 27 Nov 2024 15:58:37 +0100 Subject: [PATCH 01/18] first version reorganizing the data-collection for bufr2geojson output --- wis2box-management/wis2box/api/__init__.py | 29 ++++++++- .../wis2box/api/backend/elastic.py | 62 ++++++++++++++++++- wis2box-management/wis2box/data/__init__.py | 8 +-- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index bab16b84..5db7679a 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -232,6 +232,28 @@ def delete_collections_by_retention(days: int) -> None: backend.delete_collections_by_retention(days) +def get_plugins(record: dict) -> list: + """ + Get plugins from record + + :param record: `dict` of record + + :returns: `list` of plugins + """ + + plugins = [] + + try: + dm = record['wis2box']['data_mappings'] + for filetype in dm['plugins'].keys(): + for p in dm['plugins'][filetype]: + plugins.append(p['plugin']) + except Exception as e: + LOGGER.info(f"No plugins found for record-id={record['id']} : {e}") + + return plugins + + @click.group() def api(): """API management""" @@ -258,10 +280,15 @@ def setup(ctx, verbosity): except Exception as err: click.echo(f'Issue loading discovery-metadata: {err}') return False + # loop over records and add data-collection when bufr2geojson is used for record in records['features']: metadata_id = record['id'] + plugins = get_plugins(record) + LOGGER.info(f'Plugins used by {metadata_id}: {plugins}') + if 'wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON' not in plugins: # noqa + continue if metadata_id not in api_collections: - click.echo(f'Adding collection: {metadata_id}') + click.echo(f'Adding data-collection for: {metadata_id}') from wis2box.data import gcm meta = gcm(record) setup_collection(meta=meta) diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index 81220b34..0b6ab4bf 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -103,6 +103,57 @@ } } +MAPPINGS_OBS = { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + }, + 'properties': { + 'properties': { + 'name': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'observationTime': { + 'type': 'date', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'phenomenonTime': { + 'type': 'text' + }, + 'wigos_station_identifier': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'units': { + 'type': 'text' + }, + 'value': { + 'type': 'float', + 'coerce': True + }, + 'description': { + 'type': 'text' + }, + 'reportId': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + } + } + } +} + MAPPINGS_STATIONS = { 'properties': { 'geometry': { @@ -216,8 +267,10 @@ def add_collection(self, collection_id: str) -> dict: if collection_id == 'stations': mappings = MAPPINGS_STATIONS - else: + elif collection_id in ['discovery-metadata', 'messages']: mappings = MAPPINGS + else: + mappings = MAPPINGS_OBS es_index = self.es_id(collection_id) @@ -316,8 +369,11 @@ def gendata(features): '_id': feature['id'], '_source': feature } - - helpers.bulk(self.conn, gendata(items)) + success, errors = helpers.bulk(self.conn, gendata(items), raise_on_error=False) # noqa + if errors: + for error in errors: + LOGGER.error(f"Indexing error: {error}") + raise RuntimeError(f"Upsert failed with {len(errors)} errors") def delete_collection_item(self, collection_id: str, item_id: str) -> str: """ diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index d68e82ae..1c5c8c8d 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -124,13 +124,13 @@ def gcm(mcf: Union[dict, str]) -> dict: 'id': generated['id'], 'type': 'feature', 'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501 - 'title': generated['properties']['title'], - 'description': generated['properties']['description'], + 'title': f'bufr2geojson output ({generated["id"]})', + 'description': f'Output published by bufr2geojson for dataset with id={generated["id"]}', # noqa 'keywords': generated['properties']['keywords'], 'bbox': bbox, 'links': generated['links'], 'id_field': 'id', - 'time_field': 'resultTime', + 'time_field': 'observationTime', 'title_field': 'id' } @@ -145,7 +145,7 @@ def add_collection_data(metadata: str): """ meta = gcm(metadata) - + LOGGER.info(f'Adding data-collection for {meta["id"]}') setup_collection(meta=meta) return From 4ad46e6fa7342685aeb61961d7fc668a5930224c Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 2 Dec 2024 10:12:20 +0100 Subject: [PATCH 02/18] use get_plugins to decide whether or not to add oafeat-link --- wis2box-management/wis2box/api/__init__.py | 28 ++-------- wis2box-management/wis2box/data_mappings.py | 22 ++++++++ .../wis2box/metadata/discovery.py | 53 +++++++++++-------- 3 files changed, 57 insertions(+), 46 deletions(-) diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index 5db7679a..51cf51e2 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -30,6 +30,8 @@ from wis2box import cli_helpers from wis2box.api.backend import load_backend from wis2box.api.config import load_config +from wis2box.data_mappings import get_plugins + from wis2box.env import (DOCKER_API_URL, API_URL) LOGGER = logging.getLogger(__name__) @@ -232,28 +234,6 @@ def delete_collections_by_retention(days: int) -> None: backend.delete_collections_by_retention(days) -def get_plugins(record: dict) -> list: - """ - Get plugins from record - - :param record: `dict` of record - - :returns: `list` of plugins - """ - - plugins = [] - - try: - dm = record['wis2box']['data_mappings'] - for filetype in dm['plugins'].keys(): - for p in dm['plugins'][filetype]: - plugins.append(p['plugin']) - except Exception as e: - LOGGER.info(f"No plugins found for record-id={record['id']} : {e}") - - return plugins - - @click.group() def api(): """API management""" @@ -285,7 +265,9 @@ def setup(ctx, verbosity): metadata_id = record['id'] plugins = get_plugins(record) LOGGER.info(f'Plugins used by {metadata_id}: {plugins}') - if 'wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON' not in plugins: # noqa + # check if any plugin-names contains 2geojson + has_2geojson = any('2geojson' in plugin for plugin in plugins) + if has_2geojson is False: continue if metadata_id not in api_collections: click.echo(f'Adding data-collection for: {metadata_id}') diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index b81b131a..cc5cdd22 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -31,6 +31,28 @@ LOGGER = logging.getLogger(__name__) +def get_plugins(record: dict) -> list: + """ + Get plugins from record + + :param record: `dict` of record + + :returns: `list` of plugins + """ + + plugins = [] + + try: + dm = record['wis2box']['data_mappings'] + for filetype in dm['plugins'].keys(): + for p in dm['plugins'][filetype]: + plugins.append(p['plugin']) + except Exception as e: + LOGGER.info(f"No plugins found for record-id={record['id']} : {e}") + + return plugins + + def refresh_data_mappings(): # load plugin for local broker and publish refresh request defs_local = { diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 332275c7..d32c69a3 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -35,7 +35,8 @@ from wis2box import cli_helpers from wis2box.api import (delete_collection_item, remove_collection, setup_collection, upsert_collection_item) -from wis2box.data_mappings import refresh_data_mappings +from wis2box.data_mappings import refresh_data_mappings, get_plugins + from wis2box.env import (API_URL, BROKER_PUBLIC, DOCKER_API_URL, STORAGE_PUBLIC, STORAGE_SOURCE, URL) from wis2box.metadata.base import BaseMetadata @@ -76,17 +77,7 @@ def generate(self, mcf: dict) -> str: if md['identification']['extents']['temporal'][0].get('begin', 'BEGIN_DATE') is None: # noqa today = date.today().strftime('%Y-%m-%d') md['identification']['extents']['temporal'][0]['begin'] = today - - LOGGER.debug('Adding distribution links') - oafeat_link, mqp_link, canonical_link = self.get_distribution_links( - identifier, mqtt_topic, format_='mcf') - - md['distribution'] = { - 'oafeat': oafeat_link, - 'mqtt': mqp_link, - 'canonical': canonical_link - } - + LOGGER.debug('Adding data policy') md['identification']['wmo_data_policy'] = mqtt_topic.split('/')[5] @@ -95,6 +86,13 @@ def generate(self, mcf: dict) -> str: record['properties']['wmo:topicHierarchy'] = mqtt_topic record['wis2box'] = mcf['wis2box'] + LOGGER.debug('Adding distribution links') + distribution_links = self.get_distribution_links( + record, + format_='wcmp2') + # update links, do not extend or we get duplicates + record['links'] = distribution_links + if record['properties']['contacts'][0].get('organization') is None: record['properties']['contacts'][0]['organization'] = record['properties']['contacts'][0].pop('name', "NOTSET") # noqa @@ -118,26 +116,35 @@ def generate(self, mcf: dict) -> str: return record - def get_distribution_links(self, identifier: str, topic: str, + def get_distribution_links(self, + record: dict, format_='mcf') -> list: """ Generates distribution links - :param identifier: `str` of metadata identifier - :param topic: `str` of associated topic + :param record: `dict` of discovery metadata record :param format_: `str` of format (`mcf` or `wcmp2`) :returns: `list` of distribution links """ LOGGER.debug('Adding distribution links') - oafeat_link = { - 'href': f"{API_URL}/collections/{identifier}?f=json", - 'type': 'application/json', - 'name': identifier, - 'description': identifier, - 'rel': 'collection' - } + + identifier = record['id'] + topic = record['properties']['wmo:topicHierarchy'] + + oafeat_link = None + plugins = get_plugins(record) + # check if any plugin-names contains 2geojson + has_2geojson = any('2geojson' in plugin for plugin in plugins) + if has_2geojson: + oafeat_link = { + 'href': f"{API_URL}/collections/{identifier}?f=json", + 'type': 'application/json', + 'name': identifier, + 'description': f'Observations in json format for {identifier}', + 'rel': 'collection' + } mqp_link = { 'href': get_broker_public_endpoint(), @@ -240,7 +247,7 @@ def publish_discovery_metadata(metadata: Union[dict, str]): record = metadata dm = DiscoveryMetadata() distribution_links = dm.get_distribution_links( - record['id'], record['properties']['wmo:topicHierarchy'], + record, format_='wcmp2') # update links, do not extend or we get duplicates record['links'] = distribution_links From e36acde7738c6f111499e816068f25ec93e12a66 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 2 Dec 2024 10:27:29 +0100 Subject: [PATCH 03/18] add ignore for new medium zap vulnerability --- .zap/rules.tsv | 1 + 1 file changed, 1 insertion(+) diff --git a/.zap/rules.tsv b/.zap/rules.tsv index 4a191ad7..fcdad969 100644 --- a/.zap/rules.tsv +++ b/.zap/rules.tsv @@ -21,3 +21,4 @@ 10036 IGNORE "Server Leaks Version Information via ""Server"" HTTP Response Header Field" Low 10110 IGNORE Dangerous JS Functions Low 10105 IGNORE Authentication Credentials Captured Medium +10003 IGNORE Vulnerable JS Library Medium From 9b1e9f3ad4112f9fb28be5fea50c129292461f91 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 2 Dec 2024 13:50:54 +0100 Subject: [PATCH 04/18] add distributions-links after record prepared --- .../wis2box/metadata/discovery.py | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index d32c69a3..a7e25787 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -81,18 +81,14 @@ def generate(self, mcf: dict) -> str: LOGGER.debug('Adding data policy') md['identification']['wmo_data_policy'] = mqtt_topic.split('/')[5] + # md set 'distribution' to empty object, we add links later + md['distribution'] = {} + LOGGER.debug('Generating OARec discovery metadata') record = WMOWCMP2OutputSchema().write(md, stringify=False) record['properties']['wmo:topicHierarchy'] = mqtt_topic record['wis2box'] = mcf['wis2box'] - LOGGER.debug('Adding distribution links') - distribution_links = self.get_distribution_links( - record, - format_='wcmp2') - # update links, do not extend or we get duplicates - record['links'] = distribution_links - if record['properties']['contacts'][0].get('organization') is None: record['properties']['contacts'][0]['organization'] = record['properties']['contacts'][0].pop('name', "NOTSET") # noqa @@ -128,12 +124,12 @@ def get_distribution_links(self, :returns: `list` of distribution links """ - LOGGER.debug('Adding distribution links') + LOGGER.info('Adding distribution links') identifier = record['id'] topic = record['properties']['wmo:topicHierarchy'] - oafeat_link = None + links = [] plugins = get_plugins(record) # check if any plugin-names contains 2geojson has_2geojson = any('2geojson' in plugin for plugin in plugins) @@ -145,6 +141,7 @@ def get_distribution_links(self, 'description': f'Observations in json format for {identifier}', 'rel': 'collection' } + links.append(oafeat_link) mqp_link = { 'href': get_broker_public_endpoint(), @@ -154,6 +151,7 @@ def get_distribution_links(self, 'rel': 'items', 'channel': topic } + links.append(mqp_link) canonical_link = { 'href': f"{API_URL}/collections/discovery-metadata/items/{identifier}", # noqa @@ -162,12 +160,13 @@ def get_distribution_links(self, 'description': identifier, 'rel': 'canonical' } + links.append(canonical_link) if format_ == 'mcf': - for link in [oafeat_link, mqp_link, canonical_link]: + for link in links: link['url'] = link.pop('href') - return oafeat_link, mqp_link, canonical_link + return links def publish_broker_message(record: dict, storage_path: str, @@ -246,22 +245,21 @@ def publish_discovery_metadata(metadata: Union[dict, str]): LOGGER.info('Adding WCMP2 record from dictionary') record = metadata dm = DiscoveryMetadata() - distribution_links = dm.get_distribution_links( - record, - format_='wcmp2') - # update links, do not extend or we get duplicates - record['links'] = distribution_links - for link in record['links']: - if 'description' in link: - link['title'] = link.pop('description') - if 'url' in link: - link['href'] = link.pop('url') else: - LOGGER.debug('Transforming MCF into WCMP2 record') + LOGGER.info('Transforming MCF into WCMP2 record') dm = DiscoveryMetadata() record_mcf = dm.parse_record(metadata) record = dm.generate(record_mcf) + distribution_links = dm.get_distribution_links(record, format_='wcmp2') + # update links, do not extend or we get duplicates + record['links'] = distribution_links + for link in record['links']: + if 'description' in link: + link['title'] = link.pop('description') + if 'url' in link: + link['href'] = link.pop('url') + if 'x-wmo' in record['id']: msg = 'Change x-wmo to wmo in metadata identifier' LOGGER.error(msg) From d6db2c3cabbff3572e6df4fa514bbf4466afe970 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 2 Dec 2024 14:08:29 +0100 Subject: [PATCH 05/18] resultTime -> reportTime --- .../wis2box/api/backend/elastic.py | 18 +++++++++--------- .../wis2box/data/bufr2geojson.py | 2 +- wis2box-management/wis2box/data/geojson.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index 0b6ab4bf..48e51c4c 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -116,12 +116,20 @@ 'raw': {'type': 'keyword'} } }, - 'observationTime': { + 'reportTime': { 'type': 'date', 'fields': { 'raw': {'type': 'keyword'} } }, + 'reportId': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, 'phenomenonTime': { 'type': 'text' }, @@ -141,14 +149,6 @@ 'description': { 'type': 'text' }, - 'reportId': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, } } } diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index 0858c374..676ace64 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ b/wis2box-management/wis2box/data/bufr2geojson.py @@ -70,7 +70,7 @@ def transform(self, input_data: Union[Path, bytes], for item in result['items']: id = item['id'] - data_date = item['properties']['resultTime'] + data_date = item['properties']['reportTime'] self.output_data[id] = { '_meta': { 'identifier': id, diff --git a/wis2box-management/wis2box/data/geojson.py b/wis2box-management/wis2box/data/geojson.py index a9df7b62..008635d8 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -39,7 +39,7 @@ def transform(self, input_data: Union[Path, bytes], LOGGER.debug('Procesing GeoJSON data') data_ = json.loads(input_data) identifier = data_['id'] - data_date = data_['properties']['resultTime'] + data_date = data_['properties']['reportTime'] self.output_data[identifier] = { '_meta': { 'identifier': identifier, From 458ee109dd32cfbcd1c029c8fc7c919f1b970dda Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 2 Dec 2024 14:12:11 +0100 Subject: [PATCH 06/18] consolidate titles for json collections --- wis2box-management/wis2box/data/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index 1c5c8c8d..bdcdada5 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -124,8 +124,8 @@ def gcm(mcf: Union[dict, str]) -> dict: 'id': generated['id'], 'type': 'feature', 'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501 - 'title': f'bufr2geojson output ({generated["id"]})', - 'description': f'Output published by bufr2geojson for dataset with id={generated["id"]}', # noqa + 'title': f'Observations in json format for {generated["id"]}', + 'description': f'Observations in json format for {generated["id"]}', # noqa 'keywords': generated['properties']['keywords'], 'bbox': bbox, 'links': generated['links'], From 4b75b66fb31b75f1cd56136dffbbfffe54e8a869 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 2 Dec 2024 15:57:36 +0100 Subject: [PATCH 07/18] change publish-test, change time-field --- tests/integration/test_workflow.py | 15 ++++++++++----- wis2box-management/wis2box/data/__init__.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 4250ccee..fa9d6258 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -214,14 +214,19 @@ def test_metadata_discovery_publish(): def test_data_ingest(): """Test data ingest/process publish""" - item_api_url = f'{API_URL}/collections/{ID}/items/WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500-82' # noqa + item_api_url = f'{API_URL}/collections/{ID}/items/0-454-2-AWSNAMITAMBO-202107071455-15' # noqa item_api = SESSION.get(item_api_url).json() - assert item_api['reportId'] == 'WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500' - assert item_api['properties']['resultTime'] == '2021-07-07T14:55:00Z' # noqa - item_source = f'2021-07-07/wis/{ID}/{item_api["reportId"]}.bufr4' # noqa - r = SESSION.get(f'{URL}/data/{item_source}') # noqa + assert item_api['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455' + assert item_api['properties']['reportTime'] == '2021-07-07T14:55:00Z' # noqa + assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO' # noqa + assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified' # noqa + assert item_api['properties']['value'] == 0.0 + assert item_api['properties']['unit'] == 'J m-2' + assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z' # noqa + + assert r.status_code == codes.ok diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index bdcdada5..dec5a7d3 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -130,7 +130,7 @@ def gcm(mcf: Union[dict, str]) -> dict: 'bbox': bbox, 'links': generated['links'], 'id_field': 'id', - 'time_field': 'observationTime', + 'time_field': 'reportTime', 'title_field': 'id' } From ac688f6feb451765947563ad6598dc0bc9ea6fb3 Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 3 Dec 2024 17:20:30 +0100 Subject: [PATCH 08/18] flake8 and fix data clean --- tests/integration/test_workflow.py | 3 -- .../wis2box/api/backend/elastic.py | 44 ++++++++++++------- .../wis2box/metadata/discovery.py | 6 +-- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index fa9d6258..8118208d 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -227,9 +227,6 @@ def test_data_ingest(): assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z' # noqa - assert r.status_code == codes.ok - - def test_data_api(): """Test data API collection queries""" diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index 48e51c4c..b02e77e4 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -406,30 +406,42 @@ def delete_collections_by_retention(self, days: int) -> bool: indices = self.conn.indices.get(index='*').keys() before = datetime_days_ago(days) + # also delete future data + after = datetime_days_ago(-1) - query_by_date = { + msg_query_by_date = { 'query': { 'bool': { - 'should': [{ - 'range': { - 'properties.resultTime': { - 'lte': before - } - } - }, { - 'range': { - 'properties.pubTime': { - 'lte': before - } - } - }] + 'should': [ + {'range': {'properties.pubTime': {'lte': before}}}, + {'range': {'properties.pubTime': {'gte': after}}} + ] + } + } + } + obs_query_by_date = { + 'query': { + 'bool': { + 'should': [ + {'range': {'properties.reportTime': {'lte': before}}}, + {'range': {'properties.reportTime': {'gte': after}}} + ] } } } for index in indices: - LOGGER.debug(f'deleting documents older than {days} days ({before})') # noqa - self.conn.delete_by_query(index=index, **query_by_date) + if index == 'messages': + query_by_date = msg_query_by_date + elif index.startswith('urn-wmo-md'): + query_by_date = obs_query_by_date + else: + # don't run delete-query on other indexes + LOGGER.info(f'items for index={index} will not be deleted') + continue + LOGGER.info(f'deleting documents from index={index} older than {days} days ({before}) or newer than {after}') # noqa + result = self.conn.delete_by_query(index=index, **query_by_date) + LOGGER.info(f'deleted {result["deleted"]} documents from index={index}') # noqa return diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index a7e25787..589bbf0f 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -63,8 +63,6 @@ def generate(self, mcf: dict) -> str: md = deepcopy(mcf) - identifier = md['metadata']['identifier'] - local_topic = mcf['wis2box']['topic_hierarchy'].replace('.', '/') mqtt_topic = f'origin/a/wis2/{local_topic}' @@ -77,7 +75,7 @@ def generate(self, mcf: dict) -> str: if md['identification']['extents']['temporal'][0].get('begin', 'BEGIN_DATE') is None: # noqa today = date.today().strftime('%Y-%m-%d') md['identification']['extents']['temporal'][0]['begin'] = today - + LOGGER.debug('Adding data policy') md['identification']['wmo_data_policy'] = mqtt_topic.split('/')[5] @@ -125,7 +123,7 @@ def get_distribution_links(self, """ LOGGER.info('Adding distribution links') - + identifier = record['id'] topic = record['properties']['wmo:topicHierarchy'] From d5d9645c5b7f0a7dec3e2c41aeeb7b26efa369ce Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 3 Dec 2024 19:40:37 +0100 Subject: [PATCH 09/18] fix test --- tests/integration/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 8118208d..1cf2ae1b 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -218,7 +218,7 @@ def test_data_ingest(): item_api = SESSION.get(item_api_url).json() - assert item_api['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455' + assert item_api['properties']['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455' assert item_api['properties']['reportTime'] == '2021-07-07T14:55:00Z' # noqa assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO' # noqa assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified' # noqa From 1849dad94280df1f7603e35cc62440ebc17b7848 Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 3 Dec 2024 19:53:35 +0100 Subject: [PATCH 10/18] fix data-ingest test --- tests/integration/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 1cf2ae1b..2b4d195b 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -223,7 +223,7 @@ def test_data_ingest(): assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO' # noqa assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified' # noqa assert item_api['properties']['value'] == 0.0 - assert item_api['properties']['unit'] == 'J m-2' + assert item_api['properties']['units'] == 'J m-2' assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z' # noqa From 48b88746d293eb3c2e9ef4b405e02ffe3887cd3e Mon Sep 17 00:00:00 2001 From: Maaike Date: Tue, 3 Dec 2024 20:09:46 +0100 Subject: [PATCH 11/18] flake8 --- tests/integration/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 2b4d195b..176cbd59 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -218,7 +218,7 @@ def test_data_ingest(): item_api = SESSION.get(item_api_url).json() - assert item_api['properties']['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455' + assert item_api['properties']['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455' # noqa assert item_api['properties']['reportTime'] == '2021-07-07T14:55:00Z' # noqa assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO' # noqa assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified' # noqa From 59af98a77d0d37734eb6757bc276647f503882e1 Mon Sep 17 00:00:00 2001 From: Maaike Date: Fri, 6 Dec 2024 13:34:49 +0100 Subject: [PATCH 12/18] oafeat-link should show items --- wis2box-management/wis2box/metadata/discovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 589bbf0f..57e00c54 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -133,7 +133,7 @@ def get_distribution_links(self, has_2geojson = any('2geojson' in plugin for plugin in plugins) if has_2geojson: oafeat_link = { - 'href': f"{API_URL}/collections/{identifier}?f=json", + 'href': f"{API_URL}/collections/{identifier}/items", 'type': 'application/json', 'name': identifier, 'description': f'Observations in json format for {identifier}', From d529f413c6cfe5036b0088c3ead0ca8dc17a2140 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 9 Dec 2024 10:45:24 +0100 Subject: [PATCH 13/18] add keyword to phenomenonTime --- wis2box-management/wis2box/api/backend/elastic.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index b02e77e4..e2cc55cb 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -78,7 +78,10 @@ } }, 'phenomenonTime': { - 'type': 'text' + 'type': 'text', + 'fields': { + 'raw': { 'type': 'keyword'} + } }, 'wigos_station_identifier': { 'type': 'text', From 067fb3465c0f5eae625cba86cfa60b75df329da5 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 9 Dec 2024 11:02:18 +0100 Subject: [PATCH 14/18] oafeat-link, default sort --- wis2box-management/wis2box/api/backend/elastic.py | 2 +- wis2box-management/wis2box/metadata/discovery.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index e2cc55cb..5791fc6b 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -80,7 +80,7 @@ 'phenomenonTime': { 'type': 'text', 'fields': { - 'raw': { 'type': 'keyword'} + 'raw': {'type': 'keyword'} } }, 'wigos_station_identifier': { diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 57e00c54..696f9505 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -132,8 +132,9 @@ def get_distribution_links(self, # check if any plugin-names contains 2geojson has_2geojson = any('2geojson' in plugin for plugin in plugins) if has_2geojson: + # default view is descending by reportTime oafeat_link = { - 'href': f"{API_URL}/collections/{identifier}/items", + 'href': f"{API_URL}/collections/{identifier}/items?sortby=-reportTime", # noqa 'type': 'application/json', 'name': identifier, 'description': f'Observations in json format for {identifier}', From 765016745b5f2beb0065578d51c00fc24bb193eb Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Mon, 16 Dec 2024 07:52:26 -0500 Subject: [PATCH 15/18] Update discovery.py --- wis2box-management/wis2box/metadata/discovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 696f9505..81f6aede 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -134,7 +134,7 @@ def get_distribution_links(self, if has_2geojson: # default view is descending by reportTime oafeat_link = { - 'href': f"{API_URL}/collections/{identifier}/items?sortby=-reportTime", # noqa + 'href': f'{API_URL}/collections/{identifier}/items?sortby=-reportTime', # noqa 'type': 'application/json', 'name': identifier, 'description': f'Observations in json format for {identifier}', From 7b58b32bd0bda59ae2f2d3e733db715042dbc242 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 16 Dec 2024 13:54:34 +0100 Subject: [PATCH 16/18] move import to top --- wis2box-management/wis2box/api/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index 51cf51e2..dd9f3ddc 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -30,6 +30,7 @@ from wis2box import cli_helpers from wis2box.api.backend import load_backend from wis2box.api.config import load_config +from wis2box.data import gcm from wis2box.data_mappings import get_plugins from wis2box.env import (DOCKER_API_URL, API_URL) @@ -271,7 +272,6 @@ def setup(ctx, verbosity): continue if metadata_id not in api_collections: click.echo(f'Adding data-collection for: {metadata_id}') - from wis2box.data import gcm meta = gcm(record) setup_collection(meta=meta) From 08f5a0c406a17971d140f63e2777dbbbdb2e3866 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 16 Dec 2024 14:13:31 +0100 Subject: [PATCH 17/18] remove mosquitto changes --- docker-compose.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 28573366..13a23cf1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -110,8 +110,6 @@ services: context: ./wis2box-broker env_file: - wis2box.env - volumes: - - mosquitto-config:/mosquitto/config wis2box-management: container_name: wis2box-management @@ -162,4 +160,3 @@ volumes: minio-data: auth-data: htpasswd: - mosquitto-config: From 9e81698b0857e3489f1a847e418b4b2a8be074a1 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 16 Dec 2024 14:27:46 +0100 Subject: [PATCH 18/18] avoid circular import --- wis2box-management/wis2box/api/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index dd9f3ddc..51cf51e2 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -30,7 +30,6 @@ from wis2box import cli_helpers from wis2box.api.backend import load_backend from wis2box.api.config import load_config -from wis2box.data import gcm from wis2box.data_mappings import get_plugins from wis2box.env import (DOCKER_API_URL, API_URL) @@ -272,6 +271,7 @@ def setup(ctx, verbosity): continue if metadata_id not in api_collections: click.echo(f'Adding data-collection for: {metadata_id}') + from wis2box.data import gcm meta = gcm(record) setup_collection(meta=meta)