Add Observation upload
Adds Observation entity class.
Adds version module function.
openfirmware committed May 10, 2018
1 parent 558ecc7 commit 55f205d
Showing 5 changed files with 175 additions and 4 deletions.
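The added pieces are small enough to exercise directly. A rough sketch of how the new Observation entity and version helper are meant to be used, assuming the Entity superclass (not part of this commit) handles the HTTP calls; the timestamps and Datastream URL below are placeholders:

require 'transloader'

observation = Transloader::Observation.new({
  phenomenonTime: "2018-05-10T16:00:00.000Z",
  result: "10.1",
  resultTime: "2018-05-10T16:00:00.000Z"
})

puts observation.to_json
# {"phenomenonTime":"2018-05-10T16:00:00.000Z","result":"10.1","resultTime":"2018-05-10T16:00:00.000Z"}

# POSTs a new Observation, or re-uses/PATCHes a matching one on the server.
observation.upload_to("http://example.org/v1.0/Datastreams(1)")

puts Transloader.version
# 0.1.0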
8 changes: 5 additions & 3 deletions lib/transloader.rb
@@ -1,12 +1,14 @@
require 'transloader/args'
-require 'transloader/datastream'
require 'transloader/entity'
-require 'transloader/environment_canada_station'
-require 'transloader/environment_canada_provider'
+require 'transloader/datastream'
require 'transloader/location'
+require 'transloader/observation'
require 'transloader/observed_property'
require 'transloader/sensor'
require 'transloader/thing'
+require 'transloader/environment_canada_station'
+require 'transloader/environment_canada_provider'
+require 'transloader/version'

module Transloader
end
86 changes: 86 additions & 0 deletions lib/transloader/environment_canada_station.rb
@@ -201,6 +201,92 @@ def put_metadata(server_url)
      save_metadata
    end

    # Upload station observations for `date` to the SensorThings API server at
    # `destination`. If `date` is "latest", then the most recent SWOB-ML file
    # is used.
    def put_observations(destination, date)
      puts "Uploading observations for #{date} to #{destination}"

      # Check for metadata
      if @metadata.empty?
        raise "Error: station metadata not loaded"
      end

      # Check for cached datastream URLs
      @metadata['datastreams'].each do |stream|
        if stream['Datastream@iot.navigationLink'].nil?
          raise "Error: Datastream navigation URLs not cached"
        end
      end

      # Check for cached observations at date
      if !Dir.exist?(@observations_path)
        raise "Error: observation cache directory does not exist"
      end

      if date == "latest"
        begin
          year_dir = Dir.entries(@observations_path).last
          month_dir = Dir.entries(File.join(@observations_path, year_dir)).last
          day_dir = Dir.entries(File.join(@observations_path, year_dir, month_dir)).last
          filename = Dir.entries(File.join(@observations_path, year_dir, month_dir, day_dir)).last
        rescue
          puts "Error: Could not locate latest observation cache file"
          exit 3
        end

        file_path = File.join(@observations_path, year_dir, month_dir, day_dir, filename)
      else
        locate_date = DateTime.parse(date)
        file_path = File.join(@observations_path, locate_date.strftime('%Y/%m/%d/%H%M%S%z.xml'))

        if !File.exist?(file_path)
          raise "Error: Could not locate desired observation cache file: #{file_path}"
        end
      end

      puts "Uploading observations from #{file_path}"

      xml = observation_xml
      @metadata['datastreams'].each do |datastream|
        datastream_url = datastream['Datastream@iot.navigationLink']
        datastream_name = datastream['name']

        if xml.xpath("//om:result/po:elements/po:element[@name='#{datastream_name}']", NAMESPACES).empty?
          # The result is not in this SWOB-ML document; it may not have been
          # reported during this reporting interval. In that case, no
          # Observation is created.
        else
          # Create Observation entity
          # TODO: Coerce result type based on datastream observation type
          result = xml.xpath("//om:result/po:elements/po:element[@name='#{datastream_name}']/@value", NAMESPACES).text

          # SensorThings API does not accept an empty string result; use the
          # string "null" instead.
          if result == ""
            puts "Found null for #{datastream_name}"
            result = "null"
          end

          observation = Observation.new({
            phenomenonTime: xml.xpath('//om:samplingTime/gml:TimeInstant/gml:timePosition', NAMESPACES).text,
            result: result,
            resultTime: xml.xpath('//om:resultTime/gml:TimeInstant/gml:timePosition', NAMESPACES).text
          })

          # Upload entity and parse response
          observation.upload_to(datastream_url)
        end
      end
    end

    # Save the Station metadata to the metadata cache file
    def save_metadata
      IO.write(@metadata_path, JSON.pretty_generate(@metadata))
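The result lookup in put_observations above relies on a NAMESPACES constant defined elsewhere in the station class, and the xpath-with-namespaces calls suggest Nokogiri. A self-contained sketch of the same XPath pattern, using placeholder namespace URIs and a made-up fragment rather than the real SWOB-ML schema:

require 'nokogiri'

# Placeholder namespaces standing in for the station class's NAMESPACES constant.
NS = {
  'om' => 'http://example.org/om',
  'po' => 'http://example.org/po'
}

xml = Nokogiri::XML(<<~XML)
  <report xmlns:om="http://example.org/om" xmlns:po="http://example.org/po">
    <om:result>
      <po:elements>
        <po:element name="air_temp" value="10.1"/>
      </po:elements>
    </om:result>
  </report>
XML

# Same shape as the checks in put_observations: an empty node set means the
# datastream was not reported in this document.
elements = xml.xpath("//om:result/po:elements/po:element[@name='air_temp']", NS)
puts elements.empty?
# false

value = xml.xpath("//om:result/po:elements/po:element[@name='air_temp']/@value", NS).text
puts value
# 10.1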
64 changes: 64 additions & 0 deletions lib/transloader/observation.rb
@@ -0,0 +1,64 @@
require 'json'
require 'uri'

require 'transloader/entity'

module Transloader
  # Observation entity class.
  class Observation < Entity

    attr_accessor :phenomenon_time, :result, :result_time

    def initialize(attributes)
      super(attributes)
      @phenomenon_time = attributes[:phenomenonTime]
      @result = attributes[:result]
      @result_time = attributes[:resultTime]
    end

    def to_json
      JSON.generate({
        phenomenonTime: @phenomenon_time,
        result: @result,
        resultTime: @result_time
      })
    end

    # Check if self matches entity on phenomenonTime, result, and resultTime
    # (entity may contain additional keys). Cycling through JSON puts the keys
    # in the same order and stringifies them all.
    def same_as?(entity)
      JSON.parse(self.to_json) == JSON.parse(JSON.generate({
        phenomenonTime: entity['phenomenonTime'],
        result: entity['result'],
        resultTime: entity['resultTime']
      }))
    end

    def upload_to(url)
      upload_url = self.join_uris(url, "Observations")

      # Encode the filter so spaces and quotes produce a valid URI.
      filter = URI.encode_www_form_component("phenomenonTime eq '#{@phenomenon_time}'")
      response = self.get(URI(upload_url + "?$filter=#{filter}"))
      body = JSON.parse(response.body)

      # Look for matching existing entities. If no entities match, use POST to
      # create a new entity. If one or more entities match, then the first is
      # re-used. If the matching entity has the same phenomenonTime but
      # different other attributes, then a PATCH request is used to
      # synchronize.
      if body["value"].length == 0
        self.post_to_path(upload_url)
      else
        existing_entity = body["value"].first
        @link = existing_entity['@iot.selfLink']
        @id = existing_entity['@iot.id']

        if same_as?(existing_entity)
          puts "Re-using existing Observation entity."
        else
          self.patch_to_path(URI(@link))
        end
      end
    end
  end
end
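upload_to delegates the HTTP work to Entity helpers (join_uris, get, post_to_path, patch_to_path) that are outside this commit. A rough standard-library sketch of the round trip it performs against a SensorThings API endpoint; the URL, timestamps, and result are placeholders:

require 'json'
require 'net/http'
require 'uri'

base   = "http://example.org/v1.0/Datastreams(1)/Observations"
filter = URI.encode_www_form_component("phenomenonTime eq '2018-05-10T16:00:00.000Z'")

# 1. Look for existing Observations with the same phenomenonTime.
check   = Net::HTTP.get_response(URI("#{base}?$filter=#{filter}"))
matches = JSON.parse(check.body)["value"]

payload = JSON.generate({
  phenomenonTime: "2018-05-10T16:00:00.000Z",
  result: "10.1",
  resultTime: "2018-05-10T16:00:00.000Z"
})

if matches.empty?
  # 2a. No match: POST a new Observation to the collection.
  Net::HTTP.post(URI(base), payload, "Content-Type" => "application/json")
else
  # 2b. Match found: re-use its selfLink and PATCH to synchronize attributes
  # (upload_to skips the PATCH when the attributes already match).
  uri = URI(matches.first["@iot.selfLink"])
  request = Net::HTTP::Patch.new(uri, "Content-Type" => "application/json")
  request.body = payload
  Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(request) }
end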
5 changes: 5 additions & 0 deletions lib/transloader/version.rb
@@ -0,0 +1,5 @@
module Transloader
  def self.version
    '0.1.0'
  end
end
16 changes: 15 additions & 1 deletion transload
@@ -37,6 +37,20 @@ def put_station_metadata(source, station_id, cache, destination)
  end
end

def put_station_observations(source, station_id, cache, destination, date)
  case source
  when "environment_canada"
    provider = Transloader::EnvironmentCanadaProvider.new(cache)
    station = provider.get_station(station_id)
    station.get_metadata
    station.save_metadata
    station.put_metadata(destination)
    station.put_observations(destination, date)
  else
    puts "Support for source not implemented"
  end
end

# Parse Args
args = Transloader::Args.new(ARGV)

@@ -55,6 +69,6 @@ when :put
  when :metadata
    put_station_metadata(args.source, args.station, args.cache, args.destination)
  when :observations
-    puts "Not yet implemented"
+    put_station_observations(args.source, args.station, args.cache, args.destination, args.date)
  end
end
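put_station_observations passes the date argument straight through to the station's put_observations, which (for anything other than "latest") parses it with DateTime.parse and maps it onto the observation cache layout via strftime. A small sketch, using a hypothetical cache root in place of the station's @observations_path:

require 'date'

observations_path = "cache/environment_canada/observations/CXCM"  # hypothetical

date        = "2018-05-10T16:00:00+0000"
locate_date = DateTime.parse(date)
file_path   = File.join(observations_path, locate_date.strftime('%Y/%m/%d/%H%M%S%z.xml'))

puts file_path
# cache/environment_canada/observations/CXCM/2018/05/10/160000+0000.xml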
