Skip to content

Commit

Permalink
update universal pipeline and center-id,add discovery-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
alimand committed Jan 15, 2024
1 parent 69397bb commit 569af06
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 16 deletions.
2 changes: 1 addition & 1 deletion tests/data/data-mappings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ data:
buckets:
- ${WIS2BOX_STORAGE_PUBLIC}
file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.bufr4$'
chn.babj.data.core.weather.prediction.forecast.shortrange.probabilistic.global.CMA_GEPS:
cn-cma-babj.data.core.weather.prediction.forecast.short-range.probabilistic.global:
plugins:
grib2:
- plugin: wis2box.data.universal.UniversalData
Expand Down
66 changes: 66 additions & 0 deletions tests/data/metadata/discovery/GRAPES-GEPS-GLB.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
wis2box:
retention: P30D
topic_hierarchy: cn-cma-babj.data.core.weather.prediction.forecast.short-range.probabilistic.global
country: chn
centre_id: cn-cma-babj

mcf:
version: 1.0

metadata:
identifier: urn:x-wmo:md:cn-cma-babj:data.core.weather.prediction.forecast.short-range.probabilistic.global
hierarchylevel: dataset

identification:
title: CMA GRAPES GEPS v1.3
abstract: GRAPES GEPS is the main technical means to solve the uncertainty of CMA-GFS medium-term forecast and the difficulties of extreme weather forecast.
dates:
creation: 2023-04-23
keywords:
default:
keywords:
- mean sea level Pressure
- 2 m above ground Temperature
- 10 m above ground U-Component of Wind
- 10 m above ground V-Component of Wind
- Total Precipitation
- Geopotential Height
- Temperature
- U-Component of Wind
- V-Component of Wind
wmo:
keywords:
- weatherObservations
keywords_type: theme
vocabulary:
name: WMO Category Code
url: https://github.com/wmo-im/wcmp-codelists/blob/main/codelists/WMO_CategoryCode.csv
extents:
spatial:
- bbox: [73.66000, 4.00000, 135.08000, 53.52000]
crs: 259200
temporal:
- begin: 2021-11-29
end: null
resolution: P6H
url: http://gisc.wis.cma.cn/wis/portal.pub?M_PID=urn:x-wmo:md:int.wmo.wis::CMA_GEPS
wmo_data_policy: core

contact:
pointOfContact: &contact_poc
organization: China Meteorological Administration (CMA)
url: https://www.cma.gov.cn/
individualname: National Meteorological Information Center (NMIC)
positionname: National Meteorological Information Center (NMIC)
phone: 86-10-68409329
fax: null
address: 46 Zhongguancun Nandajie
city: Beijing
administrativearea: Beijing
postalcode: 100 081
country: China
email: [email protected]
hoursofservice: 0000h - 0900h UTC
contactinstructions: email

distributor: *contact_poc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Guidance for grib2 data pipeline plugin

1. Related files
/wis2box/wis2box-management/wis2box/data/universal.py
/wis2box/tests/data/data-mappings.yml

2. Source Code

Create the class `UniversalData`, which inherits from `wis2box.data.base.BaseAbstractData`.

Implement the `transform` method so that it fills in the `output_data` property and returns `True`.

/wis2box/wis2box-management/wis2box/data/universal.py

```py
###############################################################################
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
###############################################################################
from datetime import datetime
import logging
from pathlib import Path
import re
from typing import Union

from dateutil.parser import parse

from wis2box.data.base import BaseAbstractData

LOGGER = logging.getLogger(__name__)


class UniversalData(BaseAbstractData):
    """Universal data

    Pass-through pipeline plugin: the incoming payload is not decoded or
    converted. It is stored unchanged in ``output_data`` together with
    metadata derived from the file name.
    """

    def __init__(self, defs: dict) -> None:
        """
        Initializer

        :param defs: `dict` of plugin definitions, forwarded unchanged to
                     the base class initializer

        :returns: `None`
        """

        super().__init__(defs)

    def transform(self, input_data: Union[Path, bytes],
                  filename: str = '') -> bool:
        """
        Register the incoming data, unchanged, as an output item

        The date/time of the data is taken from the first capture group
        of the configured file name pattern and parsed with
        `dateutil.parser.parse`.

        The output item is stored in ``self.output_data`` under the file
        name stem, with the payload keyed by the file extension (without
        the leading dot) and a ``_meta`` dict holding ``identifier``,
        ``relative_filepath`` and ``data_date``.

        :param input_data: path to the data, or the data itself as bytes
        :param filename: name of the file the data originates from

        :raises ValueError: if the file name does not match the configured
                            pattern, if the pattern yields no date/time
                            group, or if the captured text cannot be
                            parsed as a date/time

        :returns: `bool`, always `True` when no exception is raised
        """

        filename = Path(filename)
        LOGGER.debug('Procesing data')
        input_bytes = self.as_bytes(input_data)

        LOGGER.debug('Deriving datetime')
        # NOTE(review): validate_filename_pattern is provided by the base
        # class; presumably it matches against self.file_filter - confirm
        match = self.validate_filename_pattern(filename.name)

        if match is None:
            msg = f'Invalid filename format: {filename} ({self.file_filter})'
            LOGGER.error(msg)
            raise ValueError(msg)

        try:
            date_time = match.group(1)
        except IndexError:
            # the configured pattern defines no capture group at all
            date_time = None

        # group(1) is also None when the pattern has an optional group
        # that did not take part in the match; without this check None
        # would reach parse() and surface as an unlogged TypeError
        if date_time is None:
            msg = 'Missing date/time in filename pattern'
            LOGGER.error(msg)
            raise ValueError(msg)

        try:
            date_time = parse(date_time)
        except (ValueError, OverflowError):
            # log for consistency with the other failure paths, then
            # re-raise the original exception so its type is unchanged
            msg = f'Unable to parse date/time from filename: {date_time}'
            LOGGER.error(msg)
            raise

        rmk = filename.stem
        suffix = filename.suffix.replace('.', '')

        self.output_data[rmk] = {
            suffix: input_bytes,
            '_meta': {
                'identifier': rmk,
                'relative_filepath': self.get_local_filepath(date_time),
                'data_date': date_time
            }
        }

        return True

    def get_local_filepath(self, date_: datetime) -> Path:
        """
        Build the relative storage path for data of a given date

        :param date_: date/time of the data

        :returns: `Path` of the form
                  ``<YYYY-MM-DD>/wis/<topic hierarchy dirpath>``
        """

        yyyymmdd = date_.strftime('%Y-%m-%d')
        return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath
```

3. Configure the topic hierarchy for the numerical prediction data in data-mappings.yml (CMA as an example)
/wis2box/tests/data/data-mappings.yml

```yml
data:
cn-cma-babj.data.core.weather.prediction.forecast.short-range.probabilistic.global:
plugins:
grib2:
# call the grib2 data pipeline plugin to process CMA_GEPS grib2 data
- plugin: wis2box.data.universal.UniversalData
notify: true
buckets:
- ${WIS2BOX_STORAGE_INCOMING}
file-pattern: '^.*_(\d{8})\d{2}.*\.grib2$'
```
4. Test data list
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-024.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-036.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-048.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-060.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-072.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-084.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-096.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-108.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-120.grib2
Z_NAFP_C_BABJ_20231207000000_P_CMA-GEPS-GLB-132.grib2
50 changes: 35 additions & 15 deletions wis2box-management/wis2box/data/universal.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
from datetime import datetime
###############################################################################
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
###############################################################################
import logging
from pathlib import Path
import re
from typing import Union

from dateutil.parser import parse

from wis2box.data.base import BaseAbstractData

LOGGER = logging.getLogger(__name__)
Expand All @@ -20,24 +36,28 @@ def __init__(self, defs: dict) -> None:
def transform(self, input_data: Union[Path, bytes],
filename: str = '') -> bool:

filename2 = Path(filename)
filename = Path(filename)
LOGGER.debug('Procesing data')
input_bytes = self.as_bytes(input_data)

LOGGER.debug('Deriving datetime')
match = self.validate_filename_pattern(filename.name)

match = re.search(self.file_filter, filename2.name)
if match:
if match is None:
msg = f'Invalid filename format: {filename} ({self.file_filter})'
LOGGER.error(msg)
raise ValueError(msg)
try:
date_time = match.group(1)
else:
LOGGER.debug('Could not derive date/time: using today\'s date')
date_time = datetime.now()
except IndexError:
msg = 'Missing date/time in filename pattern'
LOGGER.error(msg)
raise ValueError(msg)

if date_time:
date_time = parse(date_time)
date_time = parse(date_time)

rmk = filename2.stem
suffix = filename2.suffix.replace('.', '')
rmk = filename.stem
suffix = filename.suffix.replace('.', '')

self.output_data[rmk] = {
suffix: input_bytes,
Expand All @@ -52,4 +72,4 @@ def transform(self, input_data: Union[Path, bytes],

def get_local_filepath(self, date_):
yyyymmdd = date_.strftime('%Y-%m-%d')
return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath
return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath

0 comments on commit 569af06

Please sign in to comment.