diff --git a/.zap/rules.tsv b/.zap/rules.tsv index 4a191ad7..fcdad969 100644 --- a/.zap/rules.tsv +++ b/.zap/rules.tsv @@ -21,3 +21,4 @@ 10036 IGNORE "Server Leaks Version Information via ""Server"" HTTP Response Header Field" Low 10110 IGNORE Dangerous JS Functions Low 10105 IGNORE Authentication Credentials Captured Medium +10003 IGNORE Vulnerable JS Library Medium diff --git a/docker-compose.yml b/docker-compose.yml index 28573366..13a23cf1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -110,8 +110,6 @@ services: context: ./wis2box-broker env_file: - wis2box.env - volumes: - - mosquitto-config:/mosquitto/config wis2box-management: container_name: wis2box-management @@ -162,4 +160,3 @@ volumes: minio-data: auth-data: htpasswd: - mosquitto-config: diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 4250ccee..176cbd59 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -214,15 +214,17 @@ def test_metadata_discovery_publish(): def test_data_ingest(): """Test data ingest/process publish""" - item_api_url = f'{API_URL}/collections/{ID}/items/WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500-82' # noqa + item_api_url = f'{API_URL}/collections/{ID}/items/0-454-2-AWSNAMITAMBO-202107071455-15' # noqa item_api = SESSION.get(item_api_url).json() - assert item_api['reportId'] == 'WIGOS_0-454-2-AWSNAMITAMBO_20210707T145500' - assert item_api['properties']['resultTime'] == '2021-07-07T14:55:00Z' # noqa - item_source = f'2021-07-07/wis/{ID}/{item_api["reportId"]}.bufr4' # noqa - r = SESSION.get(f'{URL}/data/{item_source}') # noqa - assert r.status_code == codes.ok + assert item_api['properties']['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455' # noqa + assert item_api['properties']['reportTime'] == '2021-07-07T14:55:00Z' # noqa + assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO' # noqa + assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified' # noqa + assert item_api['properties']['value'] == 0.0 + assert item_api['properties']['units'] == 'J m-2' + assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z' # noqa def test_data_api(): diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index bab16b84..51cf51e2 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -30,6 +30,8 @@ from wis2box import cli_helpers from wis2box.api.backend import load_backend from wis2box.api.config import load_config +from wis2box.data_mappings import get_plugins + from wis2box.env import (DOCKER_API_URL, API_URL) LOGGER = logging.getLogger(__name__) @@ -258,10 +260,17 @@ def setup(ctx, verbosity): except Exception as err: click.echo(f'Issue loading discovery-metadata: {err}') return False + # loop over records and add data-collection when bufr2geojson is used for record in records['features']: metadata_id = record['id'] + plugins = get_plugins(record) + LOGGER.info(f'Plugins used by {metadata_id}: {plugins}') + # check if any plugin-names contains 2geojson + has_2geojson = any('2geojson' in plugin for plugin in plugins) + if has_2geojson is False: + continue if metadata_id not in api_collections: - click.echo(f'Adding collection: {metadata_id}') + click.echo(f'Adding data-collection for: {metadata_id}') from wis2box.data import gcm meta = gcm(record) setup_collection(meta=meta) diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index 81220b34..5791fc6b 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -78,7 +78,10 @@ } }, 'phenomenonTime': { - 'type': 'text' + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } }, 'wigos_station_identifier': { 'type': 'text', @@ -103,6 +106,57 @@ } } +MAPPINGS_OBS = { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + }, + 'properties': { + 'properties': { + 'name': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'reportTime': { + 'type': 'date', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'reportId': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'phenomenonTime': { + 'type': 'text' + }, + 'wigos_station_identifier': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'units': { + 'type': 'text' + }, + 'value': { + 'type': 'float', + 'coerce': True + }, + 'description': { + 'type': 'text' + }, + } + } + } +} + MAPPINGS_STATIONS = { 'properties': { 'geometry': { @@ -216,8 +270,10 @@ def add_collection(self, collection_id: str) -> dict: if collection_id == 'stations': mappings = MAPPINGS_STATIONS - else: + elif collection_id in ['discovery-metadata', 'messages']: mappings = MAPPINGS + else: + mappings = MAPPINGS_OBS es_index = self.es_id(collection_id) @@ -316,8 +372,11 @@ def gendata(features): '_id': feature['id'], '_source': feature } - - helpers.bulk(self.conn, gendata(items)) + success, errors = helpers.bulk(self.conn, gendata(items), raise_on_error=False) # noqa + if errors: + for error in errors: + LOGGER.error(f"Indexing error: {error}") + raise RuntimeError(f"Upsert failed with {len(errors)} errors") def delete_collection_item(self, collection_id: str, item_id: str) -> str: """ @@ -350,30 +409,42 @@ def delete_collections_by_retention(self, days: int) -> bool: indices = self.conn.indices.get(index='*').keys() before = datetime_days_ago(days) + # also delete future data + after = datetime_days_ago(-1) - query_by_date = { + msg_query_by_date = { 'query': { 'bool': { - 'should': [{ - 'range': { - 'properties.resultTime': { - 'lte': before - } - } - }, { - 'range': { - 'properties.pubTime': { - 'lte': before - } - } - }] + 'should': [ + {'range': {'properties.pubTime': {'lte': before}}}, + {'range': {'properties.pubTime': {'gte': after}}} + ] + } + } + } + obs_query_by_date = { + 'query': { + 'bool': { + 'should': [ + {'range': {'properties.reportTime': {'lte': before}}}, + {'range': {'properties.reportTime': {'gte': after}}} + ] } } } for index in indices: - LOGGER.debug(f'deleting documents older than {days} days ({before})') # noqa - self.conn.delete_by_query(index=index, **query_by_date) + if index == 'messages': + query_by_date = msg_query_by_date + elif index.startswith('urn-wmo-md'): + query_by_date = obs_query_by_date + else: + # don't run delete-query on other indexes + LOGGER.info(f'items for index={index} will not be deleted') + continue + LOGGER.info(f'deleting documents from index={index} older than {days} days ({before}) or newer than {after}') # noqa + result = self.conn.delete_by_query(index=index, **query_by_date) + LOGGER.info(f'deleted {result["deleted"]} documents from index={index}') # noqa return diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index d68e82ae..dec5a7d3 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -124,13 +124,13 @@ def gcm(mcf: Union[dict, str]) -> dict: 'id': generated['id'], 'type': 'feature', 'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501 - 'title': generated['properties']['title'], - 'description': generated['properties']['description'], + 'title': f'Observations in json format for {generated["id"]}', + 'description': f'Observations in json format for {generated["id"]}', # noqa 'keywords': generated['properties']['keywords'], 'bbox': bbox, 'links': generated['links'], 'id_field': 'id', - 'time_field': 'resultTime', + 'time_field': 'reportTime', 'title_field': 'id' } @@ -145,7 +145,7 @@ def add_collection_data(metadata: str): """ meta = gcm(metadata) - + LOGGER.info(f'Adding data-collection for {meta["id"]}') setup_collection(meta=meta) return diff --git a/wis2box-management/wis2box/data/bufr2geojson.py b/wis2box-management/wis2box/data/bufr2geojson.py index 0858c374..676ace64 100644 --- a/wis2box-management/wis2box/data/bufr2geojson.py +++ b/wis2box-management/wis2box/data/bufr2geojson.py @@ -70,7 +70,7 @@ def transform(self, input_data: Union[Path, bytes], for item in result['items']: id = item['id'] - data_date = item['properties']['resultTime'] + data_date = item['properties']['reportTime'] self.output_data[id] = { '_meta': { 'identifier': id, diff --git a/wis2box-management/wis2box/data/geojson.py b/wis2box-management/wis2box/data/geojson.py index a9df7b62..008635d8 100644 --- a/wis2box-management/wis2box/data/geojson.py +++ b/wis2box-management/wis2box/data/geojson.py @@ -39,7 +39,7 @@ def transform(self, input_data: Union[Path, bytes], LOGGER.debug('Procesing GeoJSON data') data_ = json.loads(input_data) identifier = data_['id'] - data_date = data_['properties']['resultTime'] + data_date = data_['properties']['reportTime'] self.output_data[identifier] = { '_meta': { 'identifier': identifier, diff --git a/wis2box-management/wis2box/data_mappings.py b/wis2box-management/wis2box/data_mappings.py index b81b131a..cc5cdd22 100644 --- a/wis2box-management/wis2box/data_mappings.py +++ b/wis2box-management/wis2box/data_mappings.py @@ -31,6 +31,28 @@ LOGGER = logging.getLogger(__name__) +def get_plugins(record: dict) -> list: + """ + Get plugins from record + + :param record: `dict` of record + + :returns: `list` of plugins + """ + + plugins = [] + + try: + dm = record['wis2box']['data_mappings'] + for filetype in dm['plugins'].keys(): + for p in dm['plugins'][filetype]: + plugins.append(p['plugin']) + except Exception as e: + LOGGER.info(f"No plugins found for record-id={record['id']} : {e}") + + return plugins + + def refresh_data_mappings(): # load plugin for local broker and publish refresh request defs_local = { diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index 332275c7..81f6aede 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -35,7 +35,8 @@ from wis2box import cli_helpers from wis2box.api import (delete_collection_item, remove_collection, setup_collection, upsert_collection_item) -from wis2box.data_mappings import refresh_data_mappings +from wis2box.data_mappings import refresh_data_mappings, get_plugins + from wis2box.env import (API_URL, BROKER_PUBLIC, DOCKER_API_URL, STORAGE_PUBLIC, STORAGE_SOURCE, URL) from wis2box.metadata.base import BaseMetadata @@ -62,8 +63,6 @@ def generate(self, mcf: dict) -> str: md = deepcopy(mcf) - identifier = md['metadata']['identifier'] - local_topic = mcf['wis2box']['topic_hierarchy'].replace('.', '/') mqtt_topic = f'origin/a/wis2/{local_topic}' @@ -77,19 +76,12 @@ def generate(self, mcf: dict) -> str: today = date.today().strftime('%Y-%m-%d') md['identification']['extents']['temporal'][0]['begin'] = today - LOGGER.debug('Adding distribution links') - oafeat_link, mqp_link, canonical_link = self.get_distribution_links( - identifier, mqtt_topic, format_='mcf') - - md['distribution'] = { - 'oafeat': oafeat_link, - 'mqtt': mqp_link, - 'canonical': canonical_link - } - LOGGER.debug('Adding data policy') md['identification']['wmo_data_policy'] = mqtt_topic.split('/')[5] + # md set 'distribution' to empty object, we add links later + md['distribution'] = {} + LOGGER.debug('Generating OARec discovery metadata') record = WMOWCMP2OutputSchema().write(md, stringify=False) record['properties']['wmo:topicHierarchy'] = mqtt_topic @@ -118,26 +110,37 @@ def generate(self, mcf: dict) -> str: return record - def get_distribution_links(self, identifier: str, topic: str, + def get_distribution_links(self, + record: dict, format_='mcf') -> list: """ Generates distribution links - :param identifier: `str` of metadata identifier - :param topic: `str` of associated topic + :param record: `dict` of discovery metadata record :param format_: `str` of format (`mcf` or `wcmp2`) :returns: `list` of distribution links """ - LOGGER.debug('Adding distribution links') - oafeat_link = { - 'href': f"{API_URL}/collections/{identifier}?f=json", - 'type': 'application/json', - 'name': identifier, - 'description': identifier, - 'rel': 'collection' - } + LOGGER.info('Adding distribution links') + + identifier = record['id'] + topic = record['properties']['wmo:topicHierarchy'] + + links = [] + plugins = get_plugins(record) + # check if any plugin-names contains 2geojson + has_2geojson = any('2geojson' in plugin for plugin in plugins) + if has_2geojson: + # default view is descending by reportTime + oafeat_link = { + 'href': f'{API_URL}/collections/{identifier}/items?sortby=-reportTime', # noqa + 'type': 'application/json', + 'name': identifier, + 'description': f'Observations in json format for {identifier}', + 'rel': 'collection' + } + links.append(oafeat_link) mqp_link = { 'href': get_broker_public_endpoint(), @@ -147,6 +150,7 @@ def get_distribution_links(self, identifier: str, topic: str, 'rel': 'items', 'channel': topic } + links.append(mqp_link) canonical_link = { 'href': f"{API_URL}/collections/discovery-metadata/items/{identifier}", # noqa @@ -155,12 +159,13 @@ def get_distribution_links(self, identifier: str, topic: str, 'description': identifier, 'rel': 'canonical' } + links.append(canonical_link) if format_ == 'mcf': - for link in [oafeat_link, mqp_link, canonical_link]: + for link in links: link['url'] = link.pop('href') - return oafeat_link, mqp_link, canonical_link + return links def publish_broker_message(record: dict, storage_path: str, @@ -239,22 +244,21 @@ def publish_discovery_metadata(metadata: Union[dict, str]): LOGGER.info('Adding WCMP2 record from dictionary') record = metadata dm = DiscoveryMetadata() - distribution_links = dm.get_distribution_links( - record['id'], record['properties']['wmo:topicHierarchy'], - format_='wcmp2') - # update links, do not extend or we get duplicates - record['links'] = distribution_links - for link in record['links']: - if 'description' in link: - link['title'] = link.pop('description') - if 'url' in link: - link['href'] = link.pop('url') else: - LOGGER.debug('Transforming MCF into WCMP2 record') + LOGGER.info('Transforming MCF into WCMP2 record') dm = DiscoveryMetadata() record_mcf = dm.parse_record(metadata) record = dm.generate(record_mcf) + distribution_links = dm.get_distribution_links(record, format_='wcmp2') + # update links, do not extend or we get duplicates + record['links'] = distribution_links + for link in record['links']: + if 'description' in link: + link['title'] = link.pop('description') + if 'url' in link: + link['href'] = link.pop('url') + if 'x-wmo' in record['id']: msg = 'Change x-wmo to wmo in metadata identifier' LOGGER.error(msg)