Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reorganizing the data-collection for new bufr2geojson oaproc output #811

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions .zap/rules.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
10036 IGNORE "Server Leaks Version Information via ""Server"" HTTP Response Header Field" Low
10110 IGNORE Dangerous JS Functions Low
10105 IGNORE Authentication Credentials Captured Medium
10003 IGNORE Vulnerable JS Library Medium
14 changes: 8 additions & 6 deletions tests/integration/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,17 @@ def test_metadata_discovery_publish():
def test_data_ingest():
    """Test data ingest/process publish"""

    # item id follows the new bufr2geojson output convention:
    # <wigos-station-id>-<YYYYMMDDHHMM>-<observation-index>
    item_api_url = f'{API_URL}/collections/{ID}/items/0-454-2-AWSNAMITAMBO-202107071455-15'  # noqa

    item_api = SESSION.get(item_api_url).json()

    # report-level metadata moved under 'properties' with the new output
    assert item_api['properties']['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455'  # noqa
    assert item_api['properties']['reportTime'] == '2021-07-07T14:55:00Z'  # noqa
    assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO'  # noqa
    # per-observation payload: one variable per item
    assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified'  # noqa
    assert item_api['properties']['value'] == 0.0
    assert item_api['properties']['units'] == 'J m-2'
    # phenomenonTime is an ISO 8601 interval for period-integrated values
    assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z'  # noqa


def test_data_api():
Expand Down
11 changes: 10 additions & 1 deletion wis2box-management/wis2box/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from wis2box import cli_helpers
from wis2box.api.backend import load_backend
from wis2box.api.config import load_config
from wis2box.data_mappings import get_plugins

from wis2box.env import (DOCKER_API_URL, API_URL)

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -258,10 +260,17 @@ def setup(ctx, verbosity):
except Exception as err:
click.echo(f'Issue loading discovery-metadata: {err}')
return False
# loop over records and add data-collection when bufr2geojson is used
for record in records['features']:
metadata_id = record['id']
plugins = get_plugins(record)
LOGGER.info(f'Plugins used by {metadata_id}: {plugins}')
# check if any plugin-names contains 2geojson
has_2geojson = any('2geojson' in plugin for plugin in plugins)
if has_2geojson is False:
continue
if metadata_id not in api_collections:
click.echo(f'Adding collection: {metadata_id}')
click.echo(f'Adding data-collection for: {metadata_id}')
from wis2box.data import gcm
meta = gcm(record)
setup_collection(meta=meta)
Expand Down
111 changes: 91 additions & 20 deletions wis2box-management/wis2box/api/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@
}
},
'phenomenonTime': {
'type': 'text'
'type': 'text',
'fields': {
'raw': {'type': 'keyword'}
}
},
'wigos_station_identifier': {
'type': 'text',
Expand All @@ -103,6 +106,57 @@
}
}

# Elasticsearch index mappings for per-observation GeoJSON documents
# produced by bufr2geojson; the nested properties.properties object mirrors
# the GeoJSON feature's 'properties' member
MAPPINGS_OBS = {
    'properties': {
        # feature geometry — presumably the station location; confirm
        # against bufr2geojson output
        'geometry': {
            'type': 'geo_shape'
        },
        'properties': {
            'properties': {
                # observed variable name
                'name': {
                    'type': 'text',
                    'fields': {
                        # keyword sub-field for exact match / aggregations
                        'raw': {'type': 'keyword'}
                    }
                },
                # report issue time (date-typed, unlike phenomenonTime)
                'reportTime': {
                    'type': 'date',
                    'fields': {
                        'raw': {'type': 'keyword'}
                    }
                },
                # identifier of the originating report
                'reportId': {
                    'type': 'text',
                    'fields': {
                        'raw': {
                            'type': 'keyword'
                        }
                    }
                },
                # text, not date: may hold an ISO 8601 interval
                # (start/end), which the date type cannot represent
                'phenomenonTime': {
                    'type': 'text'
                },
                # WIGOS station identifier
                'wigos_station_identifier': {
                    'type': 'text',
                    'fields': {
                        'raw': {'type': 'keyword'}
                    }
                },
                'units': {
                    'type': 'text'
                },
                # observed value; coerce lets ES accept numeric strings
                'value': {
                    'type': 'float',
                    'coerce': True
                },
                'description': {
                    'type': 'text'
                },
            }
        }
    }
}

MAPPINGS_STATIONS = {
'properties': {
'geometry': {
Expand Down Expand Up @@ -216,8 +270,10 @@ def add_collection(self, collection_id: str) -> dict:

if collection_id == 'stations':
mappings = MAPPINGS_STATIONS
else:
elif collection_id in ['discovery-metadata', 'messages']:
mappings = MAPPINGS
else:
mappings = MAPPINGS_OBS

es_index = self.es_id(collection_id)

Expand Down Expand Up @@ -316,8 +372,11 @@ def gendata(features):
'_id': feature['id'],
'_source': feature
}

helpers.bulk(self.conn, gendata(items))
success, errors = helpers.bulk(self.conn, gendata(items), raise_on_error=False) # noqa
if errors:
for error in errors:
LOGGER.error(f"Indexing error: {error}")
raise RuntimeError(f"Upsert failed with {len(errors)} errors")

def delete_collection_item(self, collection_id: str, item_id: str) -> str:
"""
Expand Down Expand Up @@ -350,30 +409,42 @@ def delete_collections_by_retention(self, days: int) -> bool:
indices = self.conn.indices.get(index='*').keys()

before = datetime_days_ago(days)
# also delete future data
after = datetime_days_ago(-1)

query_by_date = {
msg_query_by_date = {
'query': {
'bool': {
'should': [{
'range': {
'properties.resultTime': {
'lte': before
}
}
}, {
'range': {
'properties.pubTime': {
'lte': before
}
}
}]
'should': [
{'range': {'properties.pubTime': {'lte': before}}},
{'range': {'properties.pubTime': {'gte': after}}}
]
}
}
}
obs_query_by_date = {
'query': {
'bool': {
'should': [
{'range': {'properties.reportTime': {'lte': before}}},
{'range': {'properties.reportTime': {'gte': after}}}
]
}
}
}

for index in indices:
LOGGER.debug(f'deleting documents older than {days} days ({before})') # noqa
self.conn.delete_by_query(index=index, **query_by_date)
if index == 'messages':
query_by_date = msg_query_by_date
elif index.startswith('urn-wmo-md'):
query_by_date = obs_query_by_date
else:
# don't run delete-query on other indexes
LOGGER.info(f'items for index={index} will not be deleted')
continue
LOGGER.info(f'deleting documents from index={index} older than {days} days ({before}) or newer than {after}') # noqa
result = self.conn.delete_by_query(index=index, **query_by_date)
LOGGER.info(f'deleted {result["deleted"]} documents from index={index}') # noqa

return

Expand Down
8 changes: 4 additions & 4 deletions wis2box-management/wis2box/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ def gcm(mcf: Union[dict, str]) -> dict:
'id': generated['id'],
'type': 'feature',
'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501
'title': generated['properties']['title'],
'description': generated['properties']['description'],
'title': f'Observations in json format for {generated["id"]}',
'description': f'Observations in json format for {generated["id"]}', # noqa
'keywords': generated['properties']['keywords'],
'bbox': bbox,
'links': generated['links'],
'id_field': 'id',
'time_field': 'resultTime',
'time_field': 'reportTime',
'title_field': 'id'
}

Expand All @@ -145,7 +145,7 @@ def add_collection_data(metadata: str):
"""

meta = gcm(metadata)

LOGGER.info(f'Adding data-collection for {meta["id"]}')
setup_collection(meta=meta)

return
Expand Down
2 changes: 1 addition & 1 deletion wis2box-management/wis2box/data/bufr2geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def transform(self, input_data: Union[Path, bytes],
for item in result['items']:
id = item['id']

data_date = item['properties']['resultTime']
data_date = item['properties']['reportTime']
self.output_data[id] = {
'_meta': {
'identifier': id,
Expand Down
2 changes: 1 addition & 1 deletion wis2box-management/wis2box/data/geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def transform(self, input_data: Union[Path, bytes],
LOGGER.debug('Procesing GeoJSON data')
data_ = json.loads(input_data)
identifier = data_['id']
data_date = data_['properties']['resultTime']
data_date = data_['properties']['reportTime']
self.output_data[identifier] = {
'_meta': {
'identifier': identifier,
Expand Down
22 changes: 22 additions & 0 deletions wis2box-management/wis2box/data_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@
LOGGER = logging.getLogger(__name__)


def get_plugins(record: dict) -> list:
    """
    Get plugins from record

    :param record: `dict` of discovery-metadata record

    :returns: `list` of plugin names; empty if the record has no
              (or malformed) ``wis2box.data_mappings`` section
    """

    plugins = []

    try:
        # data mappings group plugin definitions by input file type:
        # {'plugins': {'<filetype>': [{'plugin': <name>, ...}, ...]}}
        plugin_defs = record['wis2box']['data_mappings']['plugins']
        for defs in plugin_defs.values():
            plugins.extend(p['plugin'] for p in defs)
    except (KeyError, TypeError) as e:
        # best-effort: a record without data mappings yields no plugins
        LOGGER.info(f"No plugins found for record-id={record['id']} : {e}")

    return plugins


def refresh_data_mappings():
# load plugin for local broker and publish refresh request
defs_local = {
Expand Down
Loading
Loading