Skip to content

Commit

Permalink
607 review request to integrate universal pipeline (#611)
Browse files Browse the repository at this point in the history
* grib2-pipeline-plugin-demo-from-CMA

This is grib2 pipeline plugin from CMA which named universal.py, it is recommended to put it in /wis2box/wis2box-management/wis2box/data directory.

* Update data-mappings.yml

reference grib2-pipeline-plugin in CMA demo TH in data-mapping to deal with grib2 files, which is recomended to append it in /wis2box/tests/data/data-mappings.yml

* add china-NWP-grib2 testing data

these testing dataset provide some sample to support users to test grib2-data-pipeline-plugin

* update universal pipeline and center-id,add discovery-metadata

* Update GRAPES-GEPS-GLB.yml

* add test and documentation

* uncommit local port

* flake8, remove md, fix expected numbers

* align numbers

* 5 + 11 = 16 (to validate congo msg 5, skip 11 china msg)

* Update tests-docker.yml

* Update data-pipeline-plugins.rst

* Update GRAPES-GEPS-GLB.yml

* update metadata, topic, identifier, filename

* update metadata, topic, identifier, filename

* Update cn-grapes-geps-global.yml

* Update tests-docker.yml

* Update data-mappings.yml

---------

Co-authored-by: Maaike <[email protected]>
Co-authored-by: Tom Kralidis <[email protected]>
  • Loading branch information
3 people authored Jan 22, 2024
1 parent a28d7e2 commit dafcecc
Show file tree
Hide file tree
Showing 16 changed files with 201 additions and 12 deletions.
21 changes: 16 additions & 5 deletions .github/workflows/tests-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: populate stations from CSV 📡
run: |
python3 wis2box-ctl.py execute wis2box metadata station publish-collection
- name: add Malawi data 🇲🇼
- name: add Malawi synop data (csv2bufr synop_bufr template) 🇲🇼
env:
TOPIC_HIERARCHY: mw-mw_met_centre.data.core.weather.surface-based-observations.synop
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/mw-surface-weather-observations.yml
Expand All @@ -61,7 +61,7 @@ jobs:
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA_UPDATE
- name: add Italy data 🇮🇹
- name: add Italy synop data (bufr2bufr) 🇮🇹
env:
TOPIC_HIERARCHY: it-roma_met_centre.data.core.weather.surface-based-observations.synop
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/it-surface-weather-observations.yml
Expand All @@ -73,7 +73,7 @@ jobs:
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
- name: add Algeria data 🇩🇿
- name: add Algeria synop data (bufr2bufr) 🇩🇿
env:
TOPIC_HIERARCHY: dz-alger_met_centre.data.core.weather.surface-based-observations.synop
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/dz-surface-weather-observations.yml
Expand All @@ -85,7 +85,7 @@ jobs:
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
- name: add Romania data 🇷🇴
- name: add Romania synop data (synop2bufr and csv2bufr aws-template) 🇷🇴
env:
TOPIC_HIERARCHY: ro-rnimh.data.core.weather.surface-based-observations.synop
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/ro-synoptic-weather-observations.yml
Expand All @@ -97,7 +97,7 @@ jobs:
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
- name: add Congo data 🇨🇩
- name: add Congo synop data (synop2bufr) 🇨🇩
env:
TOPIC_HIERARCHY: cd-brazza_met_centre.data.core.weather.surface-based-observations.synop
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/cd-surface-weather-observations.yml
Expand All @@ -109,6 +109,17 @@ jobs:
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
- name: add China GRIB2 data (universal pipeline) 🇨🇳
env:
TOPIC_HIERARCHY: cn-cma-babj.data.core.weather.prediction.forecast.medium-range.probabilistic.global
DISCOVERY_METADATA: /data/wis2box/metadata/discovery/cn-grapes-geps-global.yml
DISCOVERY_METADATA_ID: urn:x-wmo:md:cn-cma-babj:grapes-geps-global
TEST_DATA: /data/wis2box/observations/china
run: |
python3 wis2box-ctl.py execute wis2box metadata discovery publish $DISCOVERY_METADATA
python3 wis2box-ctl.py execute wis2box data ingest -th $TOPIC_HIERARCHY -p $TEST_DATA
curl -s http://localhost/oapi/collections/discovery-metadata/items/$DISCOVERY_METADATA_ID --output /tmp/$DISCOVERY_METADATA_ID
check-jsonschema --schemafile /tmp/wcmp2-bundled.json /tmp/$DISCOVERY_METADATA_ID
- name: sync stations 🔄
run: |
sleep 30
Expand Down
24 changes: 22 additions & 2 deletions docs/source/reference/running/data-pipeline-plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ A typical synop2bufr plugin workflow definition would be defined as follows:
notify: true # trigger GeoJSON publishing for API and UI
file-pattern: '^station_123_(\d{4})(\d{2}).*.txt$' # example: station_123_202305_112342.txt (where 2023 is the year and 05 is the month)
``wis2box.data.bufr4.ObservationDataBUFR``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This plugin takes an incoming BUFR4 data file and separates it into individual BUFR bulletins if there
Expand All @@ -99,6 +98,27 @@ A typical BUFR4 plugin workflow definition would be defined as follows:
notify: true # trigger GeoJSON publishing for API and UI
file-pattern: '^.*\.bin$'
``wis2box.data.universal.UniversalData``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This plugin can be used to publish any data, without any transformation.

The plugin takes any incoming data, copies it to the ``/data`` endpoint configured in wis2box, providing minimal information in the WIS2 Notification:

- ``properties.datatime`` in the WIS2 notification is parsed as ``match.group(1)`` of the regular expression defined in the plugin configuration. If the group cannot be parsed by ``dateutil.parser``, an error will be raised and the data will not be published
- ``geometry`` in the WIS2 Notification will be null

For example, to publish GRIB2 data matching the file-pattern ``^.*_(\d{8})\d{2}.*\.grib2$`` the following configuration could be used:

.. code-block:: yaml
grib2:
- plugin: wis2box.data.universal.UniversalData
notify: true
buckets:
- ${WIS2BOX_STORAGE_INCOMING}
file-pattern: '^.*_(\d{8})\d{2}.*\.grib2$' # example: Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-036.grib2 (where 20231207000000 will be used as the datetime)
See :ref:`data-mappings` for a full example data mapping configuration.

Expand Down
8 changes: 8 additions & 0 deletions tests/data/data-mappings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,11 @@ data:
buckets:
- ${WIS2BOX_STORAGE_PUBLIC}
file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.bufr4$'
cn-cma-babj.data.core.weather.prediction.forecast.medium-range.probabilistic.global:
plugins:
grib2:
- plugin: wis2box.data.universal.UniversalData
notify: true
buckets:
- ${WIS2BOX_STORAGE_INCOMING}
file-pattern: '^.*_(\d{8})\d{2}.*\.grib2$'
66 changes: 66 additions & 0 deletions tests/data/metadata/discovery/cn-grapes-geps-global.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
wis2box:
retention: P30D
topic_hierarchy: cn-cma-babj.data.core.weather.prediction.forecast.medium-range.probabilistic.global
country: chn
centre_id: cn-cma-babj

mcf:
version: 1.0

metadata:
identifier: urn:x-wmo:md:cn-cma-babj:grapes-geps-global
hierarchylevel: dataset

identification:
title: CMA GRAPES GEPS v1.3
abstract: GRAPES GEPS is the main technical means to solve the uncertainty of CMA-GFS medium-term forecast and the difficulties of extreme weather forecast.
dates:
creation: 2024-01-17
keywords:
default:
keywords:
- mean sea level Pressure
- 2 m above ground Temperature
- 10 m above ground U-Component of Wind
- 10 m above ground V-Component of Wind
- Total Precipitation
- Geopotential Height
- Temperature
- U-Component of Wind
- V-Component of Wind
wmo:
keywords:
- weather
keywords_type: theme
vocabulary:
name: Earth system disciplines as defined by the WMO Unified Data Policy, Resolution 1 (Cg-Ext(2021), Annex 1.
url: https://github.com/wmo-im/wis2-topic-hierarchy/blob/main/topic-hierarchy/earth-system-discipline/index.csv
extents:
spatial:
- bbox: [73.66000, 4.00000, 135.08000, 53.52000]
crs: 259200
temporal:
- begin: 2021-11-29
end: null
resolution: P12H
url: http://gisc.wis.cma.cn/wis/portal.pub?M_PID=urn:x-wmo:md:int.wmo.wis::CMA_GEPS
wmo_data_policy: core

contact:
pointOfContact: &contact_poc
organization: China Meteorological Administration (CMA)
url: https://www.cma.gov.cn/
individualname: National Meteorological Information Center (NMIC)
positionname: National Meteorological Information Center (NMIC)
phone: 86-10-68409329
fax: null
address: 46 Zhongguancun Nandajie
city: Beijing
administrativearea: Beijing
postalcode: 100 081
country: China
email: [email protected]
hoursofservice: 0000h - 0900h UTC
contactinstructions: email

distributor: *contact_poc
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
11 changes: 6 additions & 5 deletions tests/integration/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_metadata_discovery_publish():
"""Test discovery metadata publishing"""

r = SESSION.get(f'{API_URL}/collections/discovery-metadata/items').json()
assert r['numberMatched'] == 5
assert r['numberMatched'] == 6

r = SESSION.get(f'{API_URL}/collections/discovery-metadata/items/{ID}').json() # noqa

Expand Down Expand Up @@ -110,7 +110,7 @@ def test_metadata_discovery_publish():
r = SESSION.get(f'{API_URL}/collections/discovery-metadata/items',
params=params).json()

assert r['numberMatched'] == 5
assert r['numberMatched'] == 6

# test access of discovery metadata from notification message

Expand Down Expand Up @@ -225,8 +225,8 @@ def test_data_api():
def test_message_api():
"""Test message API collection queries"""

# check messages with "wigos_station_identifier"="0-454-2-AWSBALAKA"
url = f'{API_URL}/collections/messages/items?q=0-454-2-AWSBALAKA&limit=2' # noqa
# check messages with "q=AWSBALAKA" contains link with rel='update'
url = f'{API_URL}/collections/messages/items?q=AWSBALAKA&limit=2' # noqa
r = SESSION.get(url).json()
# get links from 2nd message
links = r['features'][1]['links']
Expand All @@ -236,6 +236,7 @@ def test_message_api():

# test messages per test dataset
counts = {
'cn-cma-babj': 11,
'mw_met_centre': 25,
'roma_met_centre': 33,
'alger_met_centre': 29,
Expand All @@ -253,7 +254,7 @@ def test_message_api():
# should match sum of counts above
assert r['numberMatched'] == sum(counts.values())

msg = r['features'][5]
msg = r['features'][16]
is_valid, _ = validate_message(msg)
assert is_valid

Expand Down
83 changes: 83 additions & 0 deletions wis2box-management/wis2box/data/universal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
###############################################################################
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
###############################################################################

import logging
from pathlib import Path
from typing import Union

from dateutil.parser import parse

from wis2box.data.base import BaseAbstractData

LOGGER = logging.getLogger(__name__)


class UniversalData(BaseAbstractData):
"""Universal data"""

def __init__(self, defs: dict) -> None:
super().__init__(defs)

def transform(self, input_data: Union[Path, bytes],
filename: str = '') -> bool:

filename = Path(filename)
LOGGER.debug('Procesing data')
input_bytes = self.as_bytes(input_data)

LOGGER.debug('Deriving datetime')
match = self.validate_filename_pattern(filename.name)

if match is None:
msg = f'Invalid filename format: {filename} ({self.file_filter})'
LOGGER.error(msg)
raise ValueError(msg)
try:
date_time = match.group(1)
except IndexError:
msg = 'Missing date/time in filename pattern'
LOGGER.error(msg)
raise ValueError(msg)

try:
date_time = parse(date_time)
except Exception:
msg = f'Invalid date/time format: {date_time}'
LOGGER.error(msg)
raise ValueError(msg)

rmk = filename.stem
suffix = filename.suffix.replace('.', '')

self.output_data[rmk] = {
suffix: input_bytes,
'_meta': {
'identifier': rmk,
'relative_filepath': self.get_local_filepath(date_time),
'data_date': date_time
}
}

return True

def get_local_filepath(self, date_):
yyyymmdd = date_.strftime('%Y-%m-%d')
return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath

0 comments on commit dafcecc

Please sign in to comment.