From 55f205ded569b036669d6b356c28abec78263476 Mon Sep 17 00:00:00 2001
From: James Badger
Date: Wed, 9 May 2018 23:50:30 -0600
Subject: [PATCH] Add Observation upload

Adds Observation entity class.
Adds version module function.
---
 lib/transloader.rb                            |  8 +-
 lib/transloader/environment_canada_station.rb | 95 +++++++++++++++++++
 lib/transloader/observation.rb                | 74 +++++++++++++++
 lib/transloader/version.rb                    |  5 ++
 transload                                     | 16 +++-
 5 files changed, 194 insertions(+), 4 deletions(-)
 create mode 100644 lib/transloader/observation.rb
 create mode 100644 lib/transloader/version.rb

diff --git a/lib/transloader.rb b/lib/transloader.rb
index 55fc903..55bfc63 100644
--- a/lib/transloader.rb
+++ b/lib/transloader.rb
@@ -1,12 +1,14 @@
 require 'transloader/args'
-require 'transloader/datastream'
 require 'transloader/entity'
-require 'transloader/environment_canada_station'
-require 'transloader/environment_canada_provider'
+require 'transloader/datastream'
 require 'transloader/location'
+require 'transloader/observation'
 require 'transloader/observed_property'
 require 'transloader/sensor'
 require 'transloader/thing'
+require 'transloader/environment_canada_station'
+require 'transloader/environment_canada_provider'
+require 'transloader/version'
 
 module Transloader
 end
diff --git a/lib/transloader/environment_canada_station.rb b/lib/transloader/environment_canada_station.rb
index cc4976c..817289d 100644
--- a/lib/transloader/environment_canada_station.rb
+++ b/lib/transloader/environment_canada_station.rb
@@ -201,6 +201,101 @@ def put_metadata(server_url)
       save_metadata
     end
 
+    # Upload station observations for `date` to the SensorThings API server at
+    # `destination`. If `date` is "latest", then the most recent SWOB-ML file
+    # in the observation cache is used; otherwise `date` is parsed with
+    # DateTime.parse and the matching cache file is required to exist.
+    #
+    # Raises RuntimeError if the station metadata, the Datastream navigation
+    # URLs, the observation cache directory, or the requested cache file is
+    # missing. Exits with status 3 if the latest cache file cannot be found.
+    def put_observations(destination, date)
+      puts "Uploading observations for #{date} to #{destination}"
+
+      # Check for metadata
+      raise "Error: station metadata not loaded" if @metadata.empty?
+
+      # Check for cached datastream URLs
+      if @metadata['datastreams'].any? { |stream| stream['Datastream@iot.navigationLink'].nil? }
+        raise "Error: Datastream navigation URLs not cached"
+      end
+
+      # Check for cached observations at date
+      unless Dir.exist?(@observations_path)
+        raise "Error: observation cache directory does not exist"
+      end
+
+      if date == "latest"
+        # Dir.entries returns names in no guaranteed order and includes "."
+        # and "..", so drop dot entries and take the greatest name. The cache
+        # layout (YYYY/MM/DD/HHMMSS+ZZZZ.xml) sorts chronologically by name.
+        newest_in = lambda do |dir|
+          Dir.entries(dir).reject { |entry| entry.start_with?('.') }.max
+        end
+
+        begin
+          year_dir  = File.join(@observations_path, newest_in.call(@observations_path))
+          month_dir = File.join(year_dir, newest_in.call(year_dir))
+          day_dir   = File.join(month_dir, newest_in.call(month_dir))
+          file_path = File.join(day_dir, newest_in.call(day_dir))
+        rescue StandardError
+          # An empty or malformed cache level ends up here (File.join rejects
+          # the nil returned for an empty directory).
+          puts "Error: Could not locate latest observation cache file"
+          exit 3
+        end
+      else
+        locate_date = DateTime.parse(date)
+        file_path = File.join(@observations_path, locate_date.strftime('%Y/%m/%d/%H%M%S%z.xml'))
+
+        unless File.exist?(file_path)
+          raise "Error: Could not locate desired observation cache file: #{file_path}"
+        end
+      end
+
+      puts "Uploading observations from #{file_path}"
+
+      # NOTE(review): `file_path` is only reported above; the document that is
+      # parsed comes from `observation_xml`. Confirm both refer to the same
+      # SWOB-ML file.
+      xml = observation_xml
+
+      # Neither timestamp lookup depends on the datastream, so evaluate them
+      # once instead of once per datastream.
+      phenomenon_time = xml.xpath('//om:samplingTime/gml:TimeInstant/gml:timePosition', NAMESPACES).text
+      result_time = xml.xpath('//om:resultTime/gml:TimeInstant/gml:timePosition', NAMESPACES).text
+
+      @metadata['datastreams'].each do |datastream|
+        datastream_url = datastream['Datastream@iot.navigationLink']
+        datastream_name = datastream['name']
+        element_path = "//om:result/po:elements/po:element[@name='#{datastream_name}']"
+
+        # The result is not in this SWOB-ML document, perhaps not reported
+        # during this reporting interval. In that case, no Observation is
+        # created.
+        next if xml.xpath(element_path, NAMESPACES).empty?
+
+        # TODO: Coerce result type based on datastream observation type
+        result = xml.xpath("#{element_path}/@value", NAMESPACES).text
+
+        # SensorThings API does not like an empty string, instead "null" string
+        # should be used.
+        if result == ""
+          puts "Found null for #{datastream_name}"
+          result = "null"
+        end
+
+        observation = Observation.new({
+          phenomenonTime: phenomenon_time,
+          result: result,
+          resultTime: result_time
+        })
+
+        # Upload entity and parse response
+        observation.upload_to(datastream_url)
+      end
+    end
+
     # Save the Station metadata to the metadata cache file
     def save_metadata
       IO.write(@metadata_path, JSON.pretty_generate(@metadata))
diff --git a/lib/transloader/observation.rb b/lib/transloader/observation.rb
new file mode 100644
index 0000000..ec994c6
--- /dev/null
+++ b/lib/transloader/observation.rb
@@ -0,0 +1,74 @@
+require 'json'
+require 'uri'
+
+require 'transloader/entity'
+
+module Transloader
+  # Observation entity class.
+  class Observation < Entity
+
+    attr_accessor :phenomenon_time, :result, :result_time
+
+    # `attributes` is a Hash with the symbol keys :phenomenonTime, :result
+    # and :resultTime. It is also passed on to Entity#initialize.
+    def initialize(attributes)
+      super(attributes)
+      @phenomenon_time = attributes[:phenomenonTime]
+      @result = attributes[:result]
+      @result_time = attributes[:resultTime]
+    end
+
+    # Serialize the Observation to a JSON string. Accepts (and forwards) the
+    # optional generator arguments so that JSON.generate can call this method
+    # on an Observation nested inside another structure.
+    def to_json(*args)
+      JSON.generate({
+        phenomenonTime: @phenomenon_time,
+        result: @result,
+        resultTime: @result_time
+      }, *args)
+    end
+
+    # Check if the attributes of self match those of entity (a parsed JSON
+    # Hash with string keys); other keys of entity are ignored.
+    # Cycling through JSON makes the keys the same order and all stringified.
+    def same_as?(entity)
+      JSON.parse(self.to_json) == JSON.parse(JSON.generate({
+        phenomenonTime: entity['phenomenonTime'],
+        result: entity['result'],
+        resultTime: entity['resultTime']
+      }))
+    end
+
+    # Upload the Observation to the Observations collection under `url`.
+    def upload_to(url)
+      upload_url = self.join_uris(url, "Observations")
+
+      # The filter contains spaces and quotes, which URI() rejects, so it is
+      # percent-encoded first. Spaces are sent as %20 rather than "+".
+      filter = "phenomenonTime eq '#{@phenomenon_time}'"
+      query = URI.encode_www_form_component(filter).gsub('+', '%20')
+      response = self.get(URI(upload_url + "?$filter=#{query}"))
+      body = JSON.parse(response.body)
+
+      # Look for matching existing entities. If no entities match, use POST to
+      # create a new entity. If one or more entities match, then the first is
+      # re-used. If the matching entity has the same phenomenonTime but
+      # different other attributes, then a PATCH request is used to
+      # synchronize.
+      if body["value"].empty?
+        self.post_to_path(upload_url)
+      else
+        existing_entity = body["value"].first
+        @link = existing_entity['@iot.selfLink']
+        @id = existing_entity['@iot.id']
+
+        if same_as?(existing_entity)
+          puts "Re-using existing Observation entity."
+        else
+          self.patch_to_path(URI(@link))
+        end
+      end
+    end
+  end
+end
diff --git a/lib/transloader/version.rb b/lib/transloader/version.rb
new file mode 100644
index 0000000..90a6f51
--- /dev/null
+++ b/lib/transloader/version.rb
@@ -0,0 +1,5 @@
+module Transloader
+  def self.version
+    '0.1.0'
+  end
+end
diff --git a/transload b/transload
index 67ed682..c312d0d 100755
--- a/transload
+++ b/transload
@@ -37,6 +37,20 @@ def put_station_metadata(source, station_id, cache, destination)
   end
 end
 
+def put_station_observations(source, station_id, cache, destination, date)
+  case source
+  when "environment_canada"
+    provider = Transloader::EnvironmentCanadaProvider.new(cache)
+    station = provider.get_station(station_id)
+    station.get_metadata
+    station.save_metadata
+    station.put_metadata(destination)
+    station.put_observations(destination, date)
+  else
+    puts "Support for source not implemented"
+  end
+end
+
 # Parse Args
 args = Transloader::Args.new(ARGV)
 
@@ -55,6 +69,6 @@ when :put
   when :metadata
     put_station_metadata(args.source, args.station, args.cache, args.destination)
   when :observations
-    puts "Not yet implemented"
+    put_station_observations(args.source, args.station, args.cache, args.destination, args.date)
   end
 end