Merge pull request #31 from terraref/syntax-updates
EnvLog metadata updates
max-zilla authored Sep 12, 2019
2 parents c292525 + 60238b5 commit 2ac42cf
Showing 5 changed files with 123 additions and 195 deletions.
7 changes: 2 additions & 5 deletions envlog2netcdf/Dockerfile
@@ -1,11 +1,8 @@
FROM terraref/terrautils
FROM terraref/terrautils:1.4
MAINTAINER Max Burnette <[email protected]>

# Install any programs needed
RUN useradd -u 49044 extractor \
&& mkdir -p /home/extractor/sites/ua-mac/raw_data \
&& mkdir -p /home/extractor/sites/ua-mac/Level_1/EnvironmentLogger \
&& chown -R extractor /home/extractor
RUN useradd -u 49044 extractor

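# nco supplies the ncrcat utility that terra_envlog2netcdf.py shells out to
# when appending hourly records into the full-day netCDF file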
RUN apt-get -y update && \
apt-get -y install nco && \
6 changes: 3 additions & 3 deletions envlog2netcdf/extractor_info.json
@@ -1,15 +1,15 @@
{
"@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
"name": "terra.environmental.envlog2netcdf",
"version": "1.0",
"version": "1.1",
"description": "EnvironmentLogger to NetCDF extractor",
"author": "Max Burnette <[email protected]>",
"contributors": [],
"contexts": [],
"repository": {"repType": "git", "repUrl": "https://github.com/terraref/extractors-environmental.git"},
"process": {
"file": [
"application/json"
"dataset": [
"file.added"
]
},
"external_services": [],
281 changes: 103 additions & 178 deletions envlog2netcdf/terra_envlog2netcdf.py
@@ -2,33 +2,34 @@

import datetime
import os
import logging
import shutil
import subprocess
import time
from netCDF4 import Dataset
from collections import OrderedDict
import json
from netCDF4 import Dataset

from pyclowder.utils import CheckMessage
from pyclowder.datasets import get_info, get_file_list
from pyclowder.files import upload_to_dataset, upload_metadata, download_metadata, download as download_file
from terrautils.extractors import TerrarefExtractor, build_dataset_hierarchy, build_metadata
from terrautils.geostreams import get_sensor_by_name, create_datapoints, create_stream, \
create_sensor, get_stream_by_name
from pyclowder.datasets import get_info, get_file_list, download_metadata, upload_metadata
from pyclowder.files import upload_to_dataset, submit_extraction
from terrautils.extractors import TerrarefExtractor, build_dataset_hierarchy_crawl, build_metadata, \
is_latest_file, file_exists, contains_required_files
from terrautils.metadata import get_extractor_metadata

import environmental_logger_json2netcdf as ela


def add_local_arguments(parser):
# add any additional arguments to parser
parser.add_argument('--batchsize', type=int, default=3000,
help="max number of datapoints to submit at a time")

def get_hour(filename):
timestamp = filename.split('_')[1]
hour = timestamp.split('-')[0]
return int(hour)
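# e.g. get_hour("2017-06-20_05-55-38_environmentlogger.json") returns 5,
# assuming the usual <date>_<HH-MM-SS>_environmentlogger.json naming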
def _produce_attr_dict(netCDF_variable_obj):
'''
Produce a list of dictionaries with attributes and values (each dictionary is one datapoint)
'''
attributes = [attr for attr in dir(netCDF_variable_obj) if isinstance(attr, unicode)]
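# dir() mixes the variable's own netCDF attributes (unicode under Python 2)
# with built-in method names (str), so the isinstance check presumably keeps
# only the netCDF attributes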
result = {name:getattr(netCDF_variable_obj, name) for name in attributes}

return [dict(result.items()+ {"value":str(data)}.items()) for data in netCDF_variable_obj[...]]

class EnvironmentLoggerJSON2NetCDF(TerrarefExtractor):
def __init__(self):
@@ -43,194 +44,118 @@ def __init__(self):
self.batchsize = self.args.batchsize

def check_message(self, connector, host, secret_key, resource, parameters):
# Only trigger extraction if the newly added file is a relevant JSON file
if not resource['name'].endswith("_environmentlogger.json"):
return CheckMessage.ignore
if "rulechecked" in parameters and parameters["rulechecked"]:
return CheckMessage.download

file_md = download_metadata(connector, host, secret_key, resource['id'], self.extractor_info['name'])
if len(file_md) > 0:
# This file was already processed
if not is_latest_file(resource):
self.log_skip(resource, "not latest file")
return CheckMessage.ignore

return CheckMessage.download
if len(resource['files']) >= 23:
md = download_metadata(connector, host, secret_key, resource['id'])
if get_extractor_metadata(md, self.extractor_info['name'], self.extractor_info['version']):
timestamp = resource['name'].split(" - ")[1]
out_fullday_netcdf = self.sensors.create_sensor_path(timestamp)
out_fullday_csv = out_fullday_netcdf.replace(".nc", "_geo.csv")
if file_exists(out_fullday_netcdf) and file_exists(out_fullday_csv):
self.log_skip(resource, "metadata v%s and outputs already exist" % self.extractor_info['version'])
return CheckMessage.ignore
return CheckMessage.download
else:
self.log_skip(resource, "found less than 23 files")
return CheckMessage.ignore

def process_message(self, connector, host, secret_key, resource, parameters):
self.start_message()
self.start_message(resource)

# Build list of JSON files
json_files = []
for f in resource['files']:
if f['filename'].endswith("_environmentlogger.json"):
if f['filepath'].startswith("/home/clowder"):
json_files.append(f['filepath'].replace("/home/clowder", "/home/extractor"))
else:
json_files.append(f['filepath'])
json_files.sort()

# path to input JSON file
in_envlog = resource['local_paths'][0]
ds_info = get_info(connector, host, secret_key, resource['parent']['id'])
timestamp = ds_info['name'].split(" - ")[1]
out_temp = "temp_output.nc"
# Determine full output path
timestamp = resource['name'].split(" - ")[1]
out_fullday_netcdf = self.sensors.create_sensor_path(timestamp)
lockfile = out_fullday_netcdf.replace(".nc", ".lock")

logging.info("converting JSON to: %s" % out_temp)
ela.mainProgramTrigger(in_envlog, out_temp)

full_file = 'full_file.json'

self.created += 1
self.bytes += os.path.getsize(out_temp)

# Merge this chunk into full day
if not os.path.exists(out_fullday_netcdf):
shutil.move(out_temp, out_fullday_netcdf)

# Push to geostreams
prepareDatapoint(connector, host, secret_key, resource, out_fullday_netcdf)
else:
# Create lockfile to make sure we don't step on each other's toes
total_wait = 0
max_wait_mins = 10
while os.path.exists(lockfile):
time.sleep(1)
total_wait += 1
if total_wait > max_wait_mins*60:
logging.error("wait time for %s exceeded %s minutes, unlocking" % (lockfile, max_wait_mins))
os.remove(lockfile)

open(lockfile, 'w').close()
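# claim the lock by touching the file; the finally block below releases it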
try:
cmd = "ncrcat --record_append %s %s" % (out_temp, out_fullday_netcdf)
temp_out_full = os.path.join(os.path.dirname(out_fullday_netcdf), "temp_full.nc")
temp_out_single = temp_out_full.replace("_full.nc", "_single.nc")
geo_csv = out_fullday_netcdf.replace(".nc", "_geo.csv")

if not file_exists(temp_out_full):
for json_file in json_files:
self.log_info(resource, "converting %s to netCDF & appending" % os.path.basename(json_file))
ela.mainProgramTrigger(json_file, temp_out_single)
cmd = "ncrcat --record_append %s %s" % (temp_out_single, temp_out_full)
subprocess.call([cmd], shell=True)
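# ncrcat comes from the NCO package installed in the Dockerfile; with
# --record_append it appends the records of the first file onto the second
# in place, so each hourly chunk accumulates into one full-day file, e.g.:
#   ncrcat --record_append temp_single.nc temp_full.nc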
finally:
os.remove(lockfile)

# Push to geostreams
prepareDatapoint(connector, host, secret_key, resource, out_temp, self.batchsize)
os.remove(out_temp)
os.remove(temp_out_single)

shutil.move(temp_out_full, out_fullday_netcdf)
self.created += 1
self.bytes += os.path.getsize(out_fullday_netcdf)

# Write out geostreams.csv
if not file_exists(geo_csv):
self.log_info(resource, "writing geostreams CSV")
geo_file = open(geo_csv, 'w')
geo_file.write(','.join(['site', 'trait', 'lat', 'lon', 'dp_time', 'source', 'value', 'timestamp']) + '\n')
with Dataset(out_fullday_netcdf, "r") as ncdf:
streams = set([sensor_info.name for sensor_info in ncdf.variables.values() if sensor_info.name.startswith('sensor')])
for stream in streams:
if stream != "sensor_spectrum":
try:
memberlist = ncdf.get_variables_by_attributes(sensor=stream)
for members in memberlist:
data_points = _produce_attr_dict(members)
for index in range(len(data_points)):
dp_obj = data_points[index]
if dp_obj["sensor"] == stream:
time_format = "%Y-%m-%dT%H:%M:%S-07:00"
time_point = (datetime.datetime(year=1970, month=1, day=1) + \
datetime.timedelta(days=ncdf.variables["time"][index])).strftime(time_format)

geo_file.write(','.join(["Full Field - Environmental Logger",
"(EL) %s" % stream,
str(33.075576),
str(-111.974304),
time_point,
host + ("" if host.endswith("/") else "/") + "datasets/" + resource['id'],
'"%s"' % json.dumps(dp_obj).replace('"', '""'),
timestamp]) + '\n')

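# An illustrative row under these assumptions (stream name and value
# hypothetical; the site name and lat/lon are the constants used above):
#   Full Field - Environmental Logger,(EL) sensor_co2,33.075576,-111.974304,
#   2017-06-20T05:00:00-07:00,https://<host>/datasets/<id>,"{""value"": ""410""}",2017-06-20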
except:
self.log_error(resource, "NetCDF attribute not found: %s" % stream)

# Fetch dataset ID by dataset name if not provided
target_dsid = build_dataset_hierarchy(host, secret_key, self.clowder_user, self.clowder_pass, self.clowderspace,
self.sensors.get_display_name(), timestamp[:4], timestamp[5:7],
leaf_ds_name=self.sensors.get_display_name()+' - '+timestamp)
target_dsid = build_dataset_hierarchy_crawl(host, secret_key, self.clowder_user, self.clowder_pass, self.clowderspace,
None, None, self.sensors.get_display_name(),
timestamp[:4], timestamp[5:7], timestamp[8:10],
leaf_ds_name=self.sensors.get_display_name() + ' - ' + timestamp)
ds_files = get_file_list(connector, host, secret_key, target_dsid)
found_full = False

found_csv = False
for f in ds_files:
if f['filename'] == os.path.basename(out_fullday_netcdf):
found_full = True
if f['filename'] == os.path.basename(geo_csv):
found_csv = True
if not found_full:
upload_to_dataset(connector, host, secret_key, target_dsid, out_fullday_netcdf)
if not found_csv:
geoid = upload_to_dataset(connector, host, secret_key, target_dsid, geo_csv)
self.log_info(resource, "triggering geostreams extractor on %s" % geoid)
submit_extraction(connector, host, secret_key, geoid, "terra.geostreams")

# Tell Clowder this is completed so subsequent file updates don't daisy-chain
ext_meta = build_metadata(host, self.extractor_info, resource['id'], {
"output_dataset": target_dsid
}, 'file')
}, 'dataset')
upload_metadata(connector, host, secret_key, resource['id'], ext_meta)

# ADDED BY TODD

if len(ds_files) == 23 and not os.path.isfile(full_file):
hourly_json_files = []
json_for_files = dict()
file_names = []
for ds_file in ds_files:
current_file_name = ds_file['filename']
file_names.append(current_file_name)
file_as_json = download_file(connector, host, secret_key, ds_file['id'], ext='.json')
with open(file_as_json, 'r') as f:
content = f.read()
json_for_files[current_file_name] = json.loads(content)
# SORT the dictionary
sorted_json = OrderedDict(sorted(json_for_files.items(), key=lambda t: get_hour(t[0])))
keys = sorted_json.keys()
for key in keys:
value = sorted_json[key]
hourly_json_files.append(value)
with open(full_file, 'w') as f:
f.write(json.dumps(hourly_json_files))
f.close()

if len(ds_files) == 24:
json_for_files = dict()
file_names = []
for ds_file in ds_files:
current_file_name = ds_file['filename']
file_names.append(current_file_name)
file_as_json = download_file(connector, host, secret_key, ds_file['id'], ext='.json')
with open(file_as_json, 'r') as f:
content = f.read()
json_for_files[current_file_name] = json.loads(content)
# SORT the dictionary
sorted_json = OrderedDict(sorted(json_for_files.items(), key=lambda t: get_hour(t[0])))
keys = sorted_json.keys()
for key in keys:
value = sorted_json[key]
hourly_json_files.append(value)
with open(full_file, 'w') as f:
f.write(json.dumps(hourly_json_files))
f.close()

self.end_message()


def _produce_attr_dict(netCDF_variable_obj):
'''
Produce a list of dictionaries with attributes and values (each dictionary is one datapoint)
'''
attributes = [attr for attr in dir(netCDF_variable_obj) if isinstance(attr, unicode)]
result = {name:getattr(netCDF_variable_obj, name) for name in attributes}

return [dict(result.items()+ {"value":str(data)}.items()) for data in netCDF_variable_obj[...]]

def prepareDatapoint(connector, host, secret_key, resource, ncdf, batchsize):
# TODO: Get this from Clowder
coords = [-111.974304, 33.075576, 0]
geom = {
"type": "Point",
"coordinates": coords
}

with Dataset(ncdf, "r") as netCDF_handle:
sensor_name = "Full Field - Environmental Logger"
sensor_data = get_sensor_by_name(connector, host, secret_key, sensor_name)
if not sensor_data:
sensor_id = create_sensor(connector, host, secret_key, sensor_name, geom,
{"id": "MAC Field Scanner", "title": "MAC Field Scanner", "sensorType": 4},
"Maricopa")
else:
sensor_id = sensor_data['id']

stream_list = set([sensor_info.name for sensor_info in netCDF_handle.variables.values() if sensor_info.name.startswith('sensor')])
for stream in stream_list:
if stream != "sensor_spectrum":
stream_name = "(EL) %s" % stream
stream_data = get_stream_by_name(connector, host, secret_key, stream_name)
if not stream_data:
stream_id = create_stream(connector, host, secret_key, stream_name, sensor_id, geom)
else:
stream_id = stream_data['id']

try:
memberlist = netCDF_handle.get_variables_by_attributes(sensor=stream)
for members in memberlist:
data_points = _produce_attr_dict(members)
data_point_list = []
for index in range(len(data_points)):
dp_obj = data_points[index]
if dp_obj["sensor"] == stream:
time_format = "%Y-%m-%dT%H:%M:%S-07:00"
time_point = (datetime.datetime(year=1970, month=1, day=1) + \
datetime.timedelta(days=netCDF_handle.variables["time"][index])).strftime(time_format)

data_point_list.append({
"start_time": time_point,
"end_time": time_point,
"type": "Point",
"geometry": geom,
"properties": dp_obj
})

if len(data_point_list) > batchsize:
create_datapoints(connector, host, secret_key, stream_id, data_point_list)
data_point_list = []

if len(data_point_list) > 0:
create_datapoints(connector, host, secret_key, stream_id, data_point_list)
except:
logging.error("NetCDF attribute not found: %s" % stream)
self.end_message(resource)

if __name__ == "__main__":
extractor = EnvironmentLoggerJSON2NetCDF()
2 changes: 1 addition & 1 deletion weather_datparser/Dockerfile
@@ -1,4 +1,4 @@
FROM terraref/terrautils
FROM terraref/terrautils:1.5
MAINTAINER Max Burnette <[email protected]>

# Install any programs needed