From 3a47be14840ab09f681372007e0440ade22ae834 Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Thu, 13 Jun 2024 11:47:34 -0400 Subject: [PATCH 1/2] Update plugin based on new mapping input --- docker/sta.env | 5 +- requirements.txt | 2 +- wis2box/api/backend/sensorthings.py | 4 + wis2box/data/csv2sta.py | 148 +++++++++++++++++++--------- wis2box/env.py | 1 + wis2box/metadata/datastream.py | 123 +++++++++++++++++++---- wis2box/metadata/thing.py | 2 +- 7 files changed, 216 insertions(+), 69 deletions(-) diff --git a/docker/sta.env b/docker/sta.env index 7c8bd9788..5c345cdba 100644 --- a/docker/sta.env +++ b/docker/sta.env @@ -22,8 +22,9 @@ http_cors_allowed_origins=* # MQTT bus_mqttBroker=tcp://${WIS2BOX_BROKER_HOST}:${WIS2BOX_BROKER_PORT} bus_busImplementationClass=de.fraunhofer.iosb.ilt.sta.messagebus.MqttMessageBus -bus_sendQueueSize=500 +bus_sendQueueSize=1000 +bus_sendWorkerPoolSize=3 # Plugins # plugins.multiDatastream.enable=true -plugins.coreModel.idType=STRING +plugins_coreModel_idType=STRING diff --git a/requirements.txt b/requirements.txt index 3a9ad79f5..4b9ae31e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ isodate minio OWSLib -paho-mqtt +paho-mqtt<2 pygeometa PyYAML requests diff --git a/wis2box/api/backend/sensorthings.py b/wis2box/api/backend/sensorthings.py index 53eaa7367..a3463ccd5 100644 --- a/wis2box/api/backend/sensorthings.py +++ b/wis2box/api/backend/sensorthings.py @@ -124,6 +124,10 @@ def delete_collection_item(self, collection_id: str, item_id: str) -> str: LOGGER.debug(f'Deleting {item_id} from {collection_id}') sta_index = self.sta_id(collection_id) + try: + item_id = int(item_id) + except ValueError: + item_id = f"'{item_id}'" try: self.http.delete(f'{sta_index}({item_id})') except Exception as err: diff --git a/wis2box/data/csv2sta.py b/wis2box/data/csv2sta.py index 1c8c3f10b..4103d7035 100644 --- a/wis2box/data/csv2sta.py +++ b/wis2box/data/csv2sta.py @@ -25,10 +25,12 @@ from io import StringIO import logging from pathlib import Path +from requests import Session from typing import Union +from wis2box.env import NLDI_URL from wis2box.data.geojson import ObservationDataGeoJSON -from wis2box.util import make_uuid +from wis2box.util import make_uuid, url_join LOGGER = logging.getLogger(__name__) @@ -45,29 +47,69 @@ def transform(self, input_data: Union[Path, bytes], fh = StringIO(input_bytes.decode()) reader = DictReader(fh) + http = Session() + for row in reader: + monitoring_location_identifier = \ + row['MonitoringLocationIdentifier'] + url = url_join(NLDI_URL, monitoring_location_identifier) + try: + result = http.get(url) + feature = result.json()['features'][0] + except KeyError: + msg = f'Could not discover {monitoring_location_identifier}' + LOGGER.info(msg) + continue + identifier = row['ResultIdentifier'] unitOfMeasurement = row['ResultMeasure/MeasureUnitCode'] or row['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa datastream = make_uuid(f"{row['CharacteristicName']}-{row['MonitoringLocationIdentifier']}-{unitOfMeasurement}") # noqa - _ = f"{row['ActivityStartDate']} {row['ActivityStartTime/Time']}" - isodate = datetime.strptime( - _, '%Y-%m-%d %H:%M:%S' - ).replace(tzinfo=timezone(row['ActivityStartTime/TimeZoneCode'])) + _ = ' '.join([row['ActivityStartDate'], row['ActivityStartTime/Time']]) # noqa + try: + isodate = datetime.strptime(_, '%Y-%m-%d %H:%M:%S') + except ValueError: + isodate = datetime.strptime(_, '%Y-%m-%d ') + try: + isodate = isodate.replace( + tzinfo=timezone(row['ActivityStartTime/TimeZoneCode'])) + except Exception: + LOGGER.info('Could not apply time zone information') + rowdate = isodate.strftime('%Y-%m-%dT%H:%M:%SZ') isodate = isodate.strftime('%Y%m%dT%H%M%S') - LongitudeMeasure = row['ActivityLocation/LongitudeMeasure'] # noqa - LatitudeMeasure = row['ActivityLocation/LatitudeMeasure'] # noqa + try: + analysisStartDate = datetime.strptime( + row['AnalysisStartDate'], '%Y-%m-%d' + ).strftime('%Y-%m-%dT%H:%M:%SZ') + except ValueError: + analysisStartDate = rowdate + + try: + LongitudeMeasure = float(row['ActivityLocation/LongitudeMeasure']) # noqa + LatitudeMeasure = float(row['ActivityLocation/LatitudeMeasure']) # noqa + geom = { + 'type': 'Point', + 'coordinates': [LongitudeMeasure, LatitudeMeasure] + } + except ValueError: + geom = feature['geometry'] + try: result = float(row['ResultMeasureValue']) except ValueError: result = row['ResultDetectionConditionText'] - if not result: - LOGGER.warning(f'No results for {identifier}') - continue - + resultQuality = { + 'detectionCondition': row['ResultDetectionConditionText'], + 'precision': row['DataQuality/PrecisionValue'], + 'accuracy': row['DataQuality/BiasValue'], + 'detectionLimit': { + 'value': row['DetectionQuantitationLimitMeasure/MeasureValue'], # noqa + 'unit': row['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa + } + } resultQuality = (row['MeasureQualifierCode'] or row['ResultStatusIdentifier']) or ' '.join([ # noqa row['ResultDetectionQuantitationLimitUrl'], row['DetectionQuantitationLimitMeasure/MeasureValue'], @@ -82,57 +124,69 @@ def transform(self, input_data: Union[Path, bytes], }, 'geojson': { 'phenomenonTime': rowdate, - 'resultTime': rowdate, + 'resultTime': analysisStartDate, 'result': result, 'resultQuality': resultQuality, 'parameters': { - 'ResultCommentText': row['ResultCommentText'], - 'HydrologicCondition': row['HydrologicCondition'], - 'HydrologicEvent': row['HydrologicEvent'] + 'hydrologicCondition': row['HydrologicCondition'], + 'hydrologicEvent': row['HydrologicEvent'], + 'modified': row['LastUpdated'], + 'status': row['ResultStatusIdentifier'], + 'publisher': row['ProviderName'], + 'valueType': row['ResultValueTypeName'], + 'comment': row['ResultCommentText'] }, 'Datastream': {'@iot.id': datastream}, 'FeatureOfInterest': { '@iot.id': datastream, 'name': row['MonitoringLocationName'], 'description': row['MonitoringLocationName'], - 'encodingType': 'application/vnd.geo+json', - 'feature': { - 'type': 'Point', - 'coordinates': [LongitudeMeasure, LatitudeMeasure] - }, + 'encodingType': 'application/geo+json', + 'feature': geom, }, } } - try: - depth = float(row['ActivityDepthHeightMeasure/MeasureValue']) - LOGGER.info('Adding samplings') + deployment_info = row['ActivityTypeCode'] in ( + 'Field Msr/Obs-Portable Data Logger', 'Field Msr/Obs') + if not deployment_info: + LOGGER.info('Adding Sampling Entity') + sampling_name = '-'.join([ + row['MonitoringLocationIdentifier'], + row['ActivityIdentifier'] + ]) + samplingProcedure_id = '-'.join([ + row['SampleCollectionMethod/MethodIdentifierContext'], + row['SampleCollectionMethod/MethodIdentifier'] + ]) featureOfInterest = self.output_data[identifier]['geojson']['FeatureOfInterest'] # noqa - featureOfInterest['Samplings'] = [{ - 'name': row['ActivityTypeCode'], - 'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'], # noqa - 'atDepth': depth, # noqa - 'depthUom': row['ActivityDepthHeightMeasure/MeasureUnitCode'], # noqa - 'encodingType': 'application/vnd.geo+json', - 'samplingLocation': { - 'type': 'Point', - 'coordinates': [LongitudeMeasure, LatitudeMeasure] - }, - 'Thing': { - '@iot.id': row['MonitoringLocationIdentifier'] - }, - 'Sampler': { - 'name': row['OrganizationFormalName'], - 'SamplingProcedure': { - 'name': row['ActivityTypeCode'] + + try: + featureOfInterest['Samplings'] = [{ + 'name': sampling_name, + 'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'], # noqa + 'depthUom': row['ResultDepthHeightMeasure/MeasureUnitCode'], # noqa + 'encodingType': 'application/geo+json', + # 'samplingLocation': geom, + 'Thing': { + '@iot.id': row['MonitoringLocationIdentifier'] + }, + 'Sampler': { + 'name': row['OrganizationFormalName'], + 'SamplingProcedure': { + '@iot.id': make_uuid(samplingProcedure_id), + 'name': row['SampleCollectionMethod/MethodName'], # noqa + 'definition': row['SampleCollectionMethod/MethodDescriptionText'], # noqa + 'description': row['SampleCollectionMethod/MethodDescriptionText'] # noqa + } } - }, - 'SamplingProcedure': { - 'name': row['ActivityTypeCode'] - } - }] - except (TypeError, ValueError): - LOGGER.info('No Sampling detected') + }] + if row['ActivityDepthHeightMeasure/MeasureValue']: + featureOfInterest['Samplings'][0]['atDepth'] = \ + row['ActivityDepthHeightMeasure/MeasureValue'] + + except (TypeError, ValueError): + LOGGER.error('No Sampling detected') def __repr__(self): return '' diff --git a/wis2box/env.py b/wis2box/env.py index 6eb3d4744..5be79613b 100644 --- a/wis2box/env.py +++ b/wis2box/env.py @@ -95,6 +95,7 @@ THINGS = 'Things' GEOCONNEX = 'https://geoconnex.us/' +NLDI_URL = 'https://labs.waterdata.usgs.gov/api/nldi/linked-data/wqp' WQP_URL = 'https://www.waterqualitydata.us' STATION_URL = url_join(WQP_URL, 'data/Station/search') RESULTS_URL = url_join(WQP_URL, 'data/Result/search') diff --git a/wis2box/metadata/datastream.py b/wis2box/metadata/datastream.py index 07230f1c6..2dacf0b2a 100644 --- a/wis2box/metadata/datastream.py +++ b/wis2box/metadata/datastream.py @@ -21,15 +21,18 @@ import click from csv import DictReader +from datetime import datetime from io import StringIO import logging +from pytz import timezone import re from requests import Session +from typing import Iterable from wis2box import cli_helpers from wis2box.api import setup_collection -from wis2box.env import RESULTS_URL, WQP_URL +from wis2box.env import RESULTS_URL, WQP_URL, NLDI_URL from wis2box.resources.code_mapping import MAPPING from wis2box.util import make_uuid, url_join @@ -87,30 +90,108 @@ def fetch_datastreams(station_id: str): return datastreams -def yield_datastreams(datasets: dict) -> list: +def yield_datastreams(station_identifier: str, + datasets: dict) -> Iterable: """ Yield datasets from USBR RISE API - :returns: `list`, of link relations for all datasets + :returns: `Iterable`, of link relations for all datasets """ + http = Session() + for id, dataset in datasets.items(): + kwargs = {} + monitoring_location_identifier = \ + dataset['MonitoringLocationIdentifier'] + url = url_join(NLDI_URL, monitoring_location_identifier) + try: + result = http.get(url) + feature = result.json()['features'][0] + mainstem = http.get(feature['properties']['mainstem']).json() + kwargs['UltimateFeatureOfInterest'] = { + '@iot.id': make_uuid(feature['properties']['mainstem']), + 'name': mainstem['properties']['name_at_outlet'], + 'description': mainstem['properties']['name_at_outlet'], + 'encodingType': 'application/geo+json', + 'feature': mainstem['geometry'], + 'properties': { + 'uri': feature['properties']['mainstem'] + } + } + except KeyError: + LOGGER.info(f'Could not discover {monitoring_location_identifier}') + continue + + sensor_kwargs = {} + sensor_name = ' '.join([ + dataset['ResultAnalyticalMethod/MethodName'], + 'applied', 'by', dataset['OrganizationFormalName']]) sensor_ResultAnalyticalMethodMethodIdentifier = dataset[ 'ResultAnalyticalMethod/MethodIdentifier'] sensor_ResultAnalyticalMethodMethodIdentifierContext = dataset[ 'ResultAnalyticalMethod/MethodIdentifierContext'] if sensor_ResultAnalyticalMethodMethodIdentifier and sensor_ResultAnalyticalMethodMethodIdentifierContext: # noqa sensor_identifier = f"{sensor_ResultAnalyticalMethodMethodIdentifierContext}-{sensor_ResultAnalyticalMethodMethodIdentifier}" # noqa - sensor_description = dataset[ - 'ResultAnalyticalMethod/MethodDescriptionText'] + sensor_description = ' '.join([ + dataset['ResultAnalyticalMethod/MethodName'], + 'applied', 'by', dataset['OrganizationFormalName'], + 'analyzed', 'by', dataset['LaboratoryName']]) else: sensor_identifier = f"{dataset['SampleCollectionMethod/MethodIdentifierContext'] or dataset['OrganizationIdentifier']}-{dataset['SampleCollectionMethod/MethodIdentifier']}" # noqa - sensor_description = dataset[ - 'SampleCollectionMethod/MethodDescriptionText'] + sensor_description = ' '.join([ + dataset['SampleCollectionMethod/MethodName'], + 'applied', 'by', dataset['OrganizationFormalName'], + 'analyzed', 'by', dataset['LaboratoryName']]) + + observed_property_name = ' '.join([ + dataset['ResultSampleFractionText'], + dataset['CharacteristicName'], + dataset['MethodSpeciationName'] + ]).strip() - observed_property_name = ' '.join([dataset['ResultSampleFractionText'], - dataset['CharacteristicName'], - dataset['MethodSpeciationName']] - ).strip() + observing_procedure_id = '-'.join([ + dataset['ResultAnalyticalMethod/MethodIdentifierContext'], + dataset['ResultAnalyticalMethod/MethodIdentifier']]) + + deployment_info = dataset['ActivityTypeCode'] in ( + 'Field Msr/Obs-Portable Data Logger', 'Field Msr/Obs') + if deployment_info: + _ = ' '.join([dataset['ActivityStartDate'], + dataset['ActivityStartTime/Time']]) + try: + isodate = datetime.strptime(_, '%Y-%m-%d %H:%M:%S') + except ValueError: + isodate = datetime.strptime(_, '%Y-%m-%d ') + try: + isodate = isodate.replace( + tzinfo=timezone(dataset['ActivityStartTime/TimeZoneCode'])) + except Exception: + LOGGER.info('Could not apply time zone information') + deployment_ActivityStartTime = isodate.strftime( + '%Y-%m-%dT%H:%M:%SZ') + deployment_description = ' '.join([ + dataset['OrganizationFormalName'], + dataset['ActivityTypeCode'], 'at', + dataset['MonitoringLocationName'], 'at', + dataset['ActivityStartDate'] + ]) + deployment_id = '-'.join([dataset['ActivityIdentifier'], + dataset['MonitoringLocationIdentifier']]) + sensor_kwargs['Deployments'] = [{ + '@iot.id': make_uuid(deployment_id), + 'name': deployment_id, + 'deploymentTime': deployment_ActivityStartTime, + 'depthUom': dataset['ActivityDepthHeightMeasure/MeasureUnitCode'], # noqa + 'description': deployment_description, + 'reason': dataset['ProjectName'], + 'Host': {'@iot.id': station_identifier}, + 'properties': { + 'orgName': dataset['OrganizationFormalName'] + } + }] + if dataset['ActivityDepthHeightMeasure/MeasureValue']: + sensor_kwargs['Deployments'][0]['atDepth'] = \ + dataset['ActivityDepthHeightMeasure/MeasureValue'] observed_property_definition = '' _url = 'https://cdxapps.epa.gov/oms-substance-registry-services/substance-details' # noqa @@ -129,7 +210,6 @@ def yield_datastreams(datasets: dict) -> list: break except KeyError: continue - unitOfMeasurement = dataset['ResultMeasure/MeasureUnitCode'] or dataset['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa yield { '@iot.id': id, @@ -152,22 +232,29 @@ def yield_datastreams(datasets: dict) -> list: 'definition': observed_property_definition, 'properties': { 'USGSPCode': dataset['USGSPCode'], - 'MethodSpeciationName': dataset['MethodSpeciationName'], + 'speciation': dataset['MethodSpeciationName'], 'iop': dataset['ResultSampleFractionText'] } }, + 'ObservingProcedure': { + '@iot.id': observing_procedure_id, + 'name': dataset['ResultAnalyticalMethod/MethodName'] + }, 'Sensor': { - 'name': dataset['SampleCollectionMethod/MethodName'], + '@iot.id': sensor_identifier, + 'name': sensor_name, 'description': sensor_description, - 'metadata': dataset['ResultAnalyticalMethod/MethodUrl'], + 'metadata': dataset['ResultAnalyticalMethod/MethodDescriptionText'], # noqa 'encodingType': 'text/html', 'properties': { 'identifier': sensor_identifier, 'EquipmentName': dataset['SampleCollectionEquipmentName'], 'ResultValueTypeName': dataset['ResultValueTypeName'], 'ResultAnalyticalMethod.MethodUrl': dataset['ResultAnalyticalMethod/MethodUrl'] # noqa - } - } + }, + **sensor_kwargs + }, + **kwargs } @@ -177,7 +264,7 @@ def load_datastreams(station_id: str): :returns: `list`, of link relations for all datasets """ - return yield_datastreams(fetch_datastreams(station_id)) + return yield_datastreams(station_id, fetch_datastreams(station_id)) @click.group() diff --git a/wis2box/metadata/thing.py b/wis2box/metadata/thing.py index f371e091e..51b1a7b63 100644 --- a/wis2box/metadata/thing.py +++ b/wis2box/metadata/thing.py @@ -146,7 +146,7 @@ def cache_stations(ctx, counties, verbosity): 'countycode': _counties, 'zip': 'yes', 'mimeType': 'csv', - 'startDateLo': '01-01-2020', + 'startDateLo': '01-01-2023', 'dataProfile': 'resultPhysChem' } r = requests.get(RESULTS_URL, params=params) From 5b3fa96e79c8b22b1a30a3c9d9499c8efa5e491b Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Thu, 13 Jun 2024 23:08:26 -0400 Subject: [PATCH 2/2] Update default.env --- docker/default.env | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker/default.env b/docker/default.env index 029e8b35c..8f302eb4d 100644 --- a/docker/default.env +++ b/docker/default.env @@ -15,6 +15,8 @@ WIS2BOX_DOCKER_API_URL=http://wis2box-api:80/oapi WIS2BOX_LOGGING_LOGLEVEL=ERROR WIS2BOX_LOGGING_LOGFILE=stdout +WIS2BOX_UI_CLUSTER=True + # PubSub WIS2BOX_BROKER_USERNAME=wis2box WIS2BOX_BROKER_PASSWORD=wis2box