diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index fa9d6258..8118208d 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -227,9 +227,6 @@ def test_data_ingest(): assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z' # noqa - assert r.status_code == codes.ok - - def test_data_api(): """Test data API collection queries""" diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index 48e51c4c..b02e77e4 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -406,30 +406,42 @@ def delete_collections_by_retention(self, days: int) -> bool: indices = self.conn.indices.get(index='*').keys() before = datetime_days_ago(days) + # also delete future data + after = datetime_days_ago(-1) - query_by_date = { + msg_query_by_date = { 'query': { 'bool': { - 'should': [{ - 'range': { - 'properties.resultTime': { - 'lte': before - } - } - }, { - 'range': { - 'properties.pubTime': { - 'lte': before - } - } - }] + 'should': [ + {'range': {'properties.pubTime': {'lte': before}}}, + {'range': {'properties.pubTime': {'gte': after}}} + ] + } + } + } + obs_query_by_date = { + 'query': { + 'bool': { + 'should': [ + {'range': {'properties.reportTime': {'lte': before}}}, + {'range': {'properties.reportTime': {'gte': after}}} + ] } } } for index in indices: - LOGGER.debug(f'deleting documents older than {days} days ({before})') # noqa - self.conn.delete_by_query(index=index, **query_by_date) + if index == 'messages': + query_by_date = msg_query_by_date + elif index.startswith('urn-wmo-md'): + query_by_date = obs_query_by_date + else: + # don't run delete-query on other indexes + LOGGER.info(f'items for index={index} will not be deleted') + continue + LOGGER.info(f'deleting documents from index={index} older than {days} days ({before}) or newer than {after}') # noqa + result = self.conn.delete_by_query(index=index, **query_by_date) + LOGGER.info(f'deleted {result["deleted"]} documents from index={index}') # noqa return diff --git a/wis2box-management/wis2box/metadata/discovery.py b/wis2box-management/wis2box/metadata/discovery.py index a7e25787..589bbf0f 100644 --- a/wis2box-management/wis2box/metadata/discovery.py +++ b/wis2box-management/wis2box/metadata/discovery.py @@ -63,8 +63,6 @@ def generate(self, mcf: dict) -> str: md = deepcopy(mcf) - identifier = md['metadata']['identifier'] - local_topic = mcf['wis2box']['topic_hierarchy'].replace('.', '/') mqtt_topic = f'origin/a/wis2/{local_topic}' @@ -77,7 +75,7 @@ def generate(self, mcf: dict) -> str: if md['identification']['extents']['temporal'][0].get('begin', 'BEGIN_DATE') is None: # noqa today = date.today().strftime('%Y-%m-%d') md['identification']['extents']['temporal'][0]['begin'] = today - + LOGGER.debug('Adding data policy') md['identification']['wmo_data_policy'] = mqtt_topic.split('/')[5] @@ -125,7 +123,7 @@ def get_distribution_links(self, """ LOGGER.info('Adding distribution links') - + identifier = record['id'] topic = record['properties']['wmo:topicHierarchy']