Skip to content

Commit

Permalink
Merge pull request #388 from roomorama/release/0.11.4
Browse files Browse the repository at this point in the history
Release/0.11.4
  • Loading branch information
keang authored Sep 26, 2016
2 parents b5653ce + a8f26b8 commit 5a919ec
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 124 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ This file summarises the most important changes that went live on each release
of Concierge. Please check the Wiki entry on the release process to understand
how this file is formatted and how the process works.

## [0.11.4] - 2016-09-26
### Removed
- Requirement for workers to finish in 12h no longer exists

## [0.11.3] - 2016-09-23
### Fixed
- Kigo: set `minimum_stay` to `nil` instead of 1 for calendar entry when coming NUMBER is zero
Expand Down
61 changes: 16 additions & 45 deletions apps/workers/processor/background_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,23 @@ def initialize(data)
# parameter given on initialization), and invokes the +<worker_type>.<supplier_name>+
# event on +Concierge::Announcer+.
def run
worker_lookup = fetch_worker(data[:background_worker_id])
return worker_lookup if not_found?(worker_lookup)
worker = fetch_worker(data[:background_worker_id])

worker = worker_lookup.value
return Result.new(true) if worker.running?
return unless worker
return if worker.running?

runner = runner_for(worker)
supplier = runner.supplier
broadcast = [worker.type, ".", supplier.name].join

running(broadcast, worker) do
timing_out(worker.type, data) do
# gotcha: +Concierge::Announcer#trigger+ returns an array of the results of each
# listener for the given event. Here, we take the first of them assuming
# it will be the supplier implementation. That works since there has been no
# need for a supplier implementation to provide more than one listener for the
# same event. That is a good practice that allows arguments to be passed
# from one run to the next.
Concierge::Announcer.trigger(broadcast, *runner.args).first
end
# gotcha: +Concierge::Announcer#trigger+ returns an array of the results of each
# listener for the given event. Here, we take the first of them assuming
# it will be the supplier implementation. That works since there has been no
# need for a supplier implementation to provide more than one listener for the
# same event. That is a good practice that allows arguments to be passed
# from one run to the next.
Concierge::Announcer.trigger(broadcast, *runner.args).first
end
end

Expand All @@ -152,18 +149,6 @@ def runner_for(worker)
end
end

# timing out the operation to be processed by the queue makes sure that
# jobs take too long are not taken by a different worker, possibly duplicating
# work and causing conflicts.
def timing_out(operation, params)
Timeout.timeout(processing_timeout) { yield }
rescue Timeout::Error
error = TimeoutError.new(operation, params)
Rollbar.error(error)

Result.error(:timeout)
end

# coordinates the +BackgroundWorker+ instance status and timestamps by changing
# the worker status to +running+, yielding the block (which is supposed to do
# the worker's job), and ensuring that the worker's status is set back to +idle+
Expand Down Expand Up @@ -222,29 +207,15 @@ def ensure_valid_result!(event, instance)
end

# tries to fetch the worker with the given +id+ from the database. If there
# is none, notfies the occurrence to Rollbar, and returns a +Result+ wrapping
# +Workers::Processor::BackgroundWorker::WORKER_NOT_FOUND+.
# is none, notfies the occurrence to Rollbar.
def fetch_worker(id)
worker = BackgroundWorkerRepository.find(id)

if worker
Result.new(worker)
else
error = UnknownWorkerError.new(id)
Rollbar.warning(error)

Result.new(WORKER_NOT_FOUND)
BackgroundWorkerRepository.find(id).tap do |worker|
unless worker
error = UnknownWorkerError.new(id)
Rollbar.warning(error)
end
end
end

def not_found?(result)
result.value == WORKER_NOT_FOUND
end

# NOTE this time out should be shorter than the +VisibilityTimeout+ configured
# on the SQS queue to be used by Concierge.
def processing_timeout
@two_hour ||= 60 * 60 * 12
end
end
end
34 changes: 5 additions & 29 deletions apps/workers/queue/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,6 @@ class Workers::Queue
# over +Aws::SQS::QueuePoller+
class Poller

# +Workers::Queue::Poller::InvalidQueueProcessingResultError+
#
# This is raised by +Workers::Queue::Poller+ when doing a +poll+ and the block
# invoked to process an incoming result does not return a valid instance
# of +Result+, as it should.
class InvalidQueueProcessingResultError < StandardError
def initialize(object)
super("Queue processing must return a Result instance, returned instead #{object.class}")
end
end

attr_reader :queue_url, :client

# queue_url - a String containing the URL to the queue to be monitored
Expand All @@ -37,18 +26,14 @@ def before
# wraps the logic behind polling an SQS queue by receiving each message
# and yielding it back to the caller.
#
# The invoked block *must* return a +Result+ instance, indicating whether
# or not the message processing was successful. In case it was, the message
# is deleted from the queue; otherwise, it remains in the queue for later
# processing.
# The message is deleted from the queue as soon as it is received. Implementations
# of message processing workers must therefore ensure that failed messages are
# re-enqueued for later processing.
def poll
poller.poll(skip_delete: true) do |message|
result = yield(message)
ensure_result!(result)
poller.delete_message(message)

if result.success?
poller.delete_message(message)
end
yield(message)
end
end

Expand All @@ -58,14 +43,5 @@ def poller
@poller ||= Aws::SQS::QueuePoller.new(queue_url, client: client)
end

# ensures that the given +object+ is a valid instance of +Result+. Necessary to
# catch early on errors where the message processing block does not return
# a valid +Result+ instance.
def ensure_result!(object)
unless object.is_a?(Result)
raise InvalidQueueProcessingResultError.new(object)
end
end

end
end
2 changes: 1 addition & 1 deletion lib/concierge/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Concierge
VERSION = "0.11.3"
VERSION = "0.11.4"
end
34 changes: 17 additions & 17 deletions spec/lib/concierge/flows/external_error_creation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,28 @@
subject.perform
}.not_to change { ExternalErrorRepository.count }
end
end

it "is a no-op if the operation is not allowed" do
parameters[:operation] = "invalid_operation"
it "is a no-op if the operation is not allowed" do
parameters[:operation] = "invalid_operation"

expect {
subject.perform
}.not_to change { ExternalErrorRepository.count }
end
expect {
subject.perform
}.not_to change { ExternalErrorRepository.count }
end

it "saves an external error to the database in case all parameters are valid" do
expect {
subject.perform
}.to change { ExternalErrorRepository.count }.by(1)
end
it "saves an external error to the database in case all parameters are valid" do
expect {
subject.perform
}.to change { ExternalErrorRepository.count }.by(1)
end

it "does not fail with a hard error in case of a database failure" do
allow(ExternalErrorRepository).to receive(:create) { raise Hanami::Model::UniqueConstraintViolationError }
it "does not fail with a hard error in case of a database failure" do
allow(ExternalErrorRepository).to receive(:create) { raise Hanami::Model::UniqueConstraintViolationError }

expect {
subject.perform
}.not_to raise_error
end
expect {
subject.perform
}.not_to raise_error
end
end
end
22 changes: 2 additions & 20 deletions spec/workers/processor/background_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@
invoked += 1
Result.new({})
}) do
result = subject.run
expect(result).to be_a Result
expect(result).to be_success
subject.run
expect(invoked).to eq 0
end
end
Expand All @@ -101,18 +99,6 @@
expect(reloaded_worker.status).to eq "idle"
end
end

it "times out and fails if the operation takes too long" do
# simulates a timeout of 0.5s and a synchronisation process that takes
# one second, thus timing out.
allow(subject).to receive(:processing_timeout) { 0.5 }
listening_to("metadata.SupplierTest", block: ->(*) { sleep 1 }) do
result = subject.run
expect(result).to be_a Result
expect(result).not_to be_success
expect(result.error.code).to eq :timeout
end
end
end

context "background workers associated with suppliers" do
Expand Down Expand Up @@ -152,11 +138,7 @@
error = nil
expect(Rollbar).to receive(:warning) { |err| error = err }

result = subject.run
expect(result).to be_a Result
expect(result).to be_success
expect(result.value).to eq :invalid_worker_id

subject.run
expect(error).to be_a Workers::Processor::BackgroundWorker::UnknownWorkerError
end

Expand Down
16 changes: 4 additions & 12 deletions spec/workers/queue/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,19 @@ def delete_message(*)
allow(subject).to receive(:poller) { poller }
end

it "ensures that the result from the message processing block is a valid Result instance" do
expect {
subject.poll do
42
end
}.to raise_error Workers::Queue::Poller::InvalidQueueProcessingResultError
end

it "does not delete the message if the message processing indicates failure" do
expect(poller).not_to receive(:delete_message).with(message)
it "deletes the message immediately after the worker receives it" do
expect(poller).to receive(:delete_message).with(message)

subject.poll do
Result.error(:something_went_wrong)
42
end
end

it "deletes the message if the message processing indicates success" do
expect(poller).to receive(:delete_message).with(message)

subject.poll do
Result.new(42)
42
end
end
end
Expand Down

0 comments on commit 5a919ec

Please sign in to comment.