Skip to content

Commit

Permalink
some error handling, less frequent checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
codekitchen committed Mar 3, 2015
1 parent 077e91c commit 59e2600
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
40 changes: 23 additions & 17 deletions lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# The kinesis stream name.
config :kinesis_stream_name, :validate => :string, :required => true

# How many seconds between worker checkpoints to dynamodb.
config :checkpoint_interval_seconds, :validate => :number, :default => 60

def register
# the INFO log level is extremely noisy in KCL
org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis").
Expand All @@ -35,9 +38,8 @@ def register
end

def run(output_queue)
worker_factory = WorkerFactory.new(@codec, output_queue, method(:decorate))
@worker = KCL::Worker.new(
worker_factory,
proc { Worker.new(@codec, output_queue, method(:decorate), @checkpoint_interval_seconds) },
@config,
com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory.new)
@worker.run()
Expand All @@ -47,26 +49,14 @@ def teardown
@worker.shutdown if @worker
end

class WorkerFactory
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessorFactory
def initialize(codec, output_queue, decorator)
@codec = codec
@output_queue = output_queue
@decorator = decorator
end

def createProcessor
Worker.new(@codec.clone, @output_queue, @decorator)
end
end

class Worker
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor

def initialize(*args)
# nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
if !@constructed
@codec, @output_queue, @decorator = args
@codec, @output_queue, @decorator, @checkpoint_interval = args
@next_checkpoint = Time.now - 600
@constructed = true
else
_shard_id, _ = args
Expand All @@ -76,10 +66,24 @@ def initialize(*args)

def processRecords(records, checkpointer)
records.each { |record| process_record(record) }
checkpointer.checkpoint()
if Time.now >= @next_checkpoint
checkpoint(checkpointer)
@next_checkpoint = Time.now + @checkpoint_interval
end
end

def shutdown(checkpointer, reason)
if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
checkpoint(checkpointer)
end
end

protected

def checkpoint(checkpointer)
checkpointer.checkpoint()
rescue => error
@logger.error("Kinesis worker failed checkpointing: #{error}")
end

def process_record(record)
Expand All @@ -88,6 +92,8 @@ def process_record(record)
@decorator.call(event)
@output_queue << event
end
rescue => error
@logger.error("Error processing record: #{error}")
end
end
end
2 changes: 1 addition & 1 deletion lib/logstash/inputs/kinesis/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Logstash
module Input
module Kinesis
VERSION = "1.0.0"
VERSION = "1.1.0"
end
end
end

0 comments on commit 59e2600

Please sign in to comment.