diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb new file mode 100644 index 00000000..de868f9d --- /dev/null +++ b/app/jobs/retry_mqtt_message_job.rb @@ -0,0 +1,10 @@ +class RetryMQTTMessageJob < ApplicationJob + queue_as :default + + def perform(topic, message) + result = MqttMessagesHandler.handle_topic(topic, message, false) + raise "Message handler returned nil, retrying" if result.nil? + end +end + + diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 86fd867c..607d32be 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -1,5 +1,5 @@ class MqttMessagesHandler - def self.handle_topic(topic, message) + def self.handle_topic(topic, message, retry_on_nil_device=true) Sentry.set_tags('mqtt-topic': topic) crumb = Sentry::Breadcrumb.new( @@ -19,7 +19,10 @@ def self.handle_topic(topic, message) end device = Device.find_by(device_token: device_token(topic)) - return if device.nil? + if device.nil? + handle_nil_device(topic, message, retry_on_nil_device) + return nil + end if topic.to_s.include?('raw') handle_readings(device, parse_raw_readings(message, device.id)) @@ -42,6 +45,16 @@ def self.handle_topic(topic, message) end end + def self.handle_nil_device(topic, message, retry_on_nil_device) + if topic.to_s.include?("info") + retry_later(topic, message) if retry_on_nil_device + end + end + + def self.retry_later(topic, message) + RetryMQTTMessageJob.perform_later(topic, message) + end + # takes a packet and stores data def self.handle_readings(device, message) data = self.data(message) diff --git a/spec/jobs/retry_mqtt_message_job_spec.rb b/spec/jobs/retry_mqtt_message_job_spec.rb new file mode 100644 index 00000000..2746c164 --- /dev/null +++ b/spec/jobs/retry_mqtt_message_job_spec.rb @@ -0,0 +1,19 @@ +require 'rails_helper' + +RSpec.describe RetryMQTTMessageJob, type: :job do + it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do + topic = "topic/1/2/3" + message = '{"foo": "bar", "test": "message"}' + expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(true) + RetryMQTTMessageJob.perform_now(topic, message) + end + + it "raises an error if the handler returns nil" do + topic = "topic/1/2/3" + message = '{"foo": "bar", "test": "message"}' + expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(nil) + expect { + RetryMQTTMessageJob.perform_now(topic, message) + }.to raise_error + end +end diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index a70ae86f..5e2fb7ea 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -243,12 +243,22 @@ expect(orphan_device.reload.device_handshake).to be true end - it 'does not handle bad topic' do + it 'defers messages with unknown device tokens if retry flag is true' do expect(device.hardware_info["id"]).to eq(47) + expect(RetryMQTTMessageJob).to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) device.reload expect(device.hardware_info["id"]).to eq(47) expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) end + + it 'does not defer messages with unknown device tokens if retry flag is false' do + expect(device.hardware_info["id"]).to eq(47) + expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) + MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload, false) + device.reload + expect(device.hardware_info["id"]).to eq(47) + expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) + end end end