diff --git a/lib/logstash/inputs/kinesis.rb b/lib/logstash/inputs/kinesis.rb index 9824658..9ba4422 100644 --- a/lib/logstash/inputs/kinesis.rb +++ b/lib/logstash/inputs/kinesis.rb @@ -20,6 +20,9 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base # The kinesis stream name. config :kinesis_stream_name, :validate => :string, :required => true + # How many seconds between worker checkpoints to dynamodb. + config :checkpoint_interval_seconds, :validate => :number, :default => 60 + def register # the INFO log level is extremely noisy in KCL org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis"). @@ -35,9 +38,8 @@ def register end def run(output_queue) - worker_factory = WorkerFactory.new(@codec, output_queue, method(:decorate)) @worker = KCL::Worker.new( - worker_factory, + proc { Worker.new(@codec, output_queue, method(:decorate), @checkpoint_interval_seconds) }, @config, com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory.new) @worker.run() @@ -47,26 +49,14 @@ def teardown @worker.shutdown if @worker end - class WorkerFactory - include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessorFactory - def initialize(codec, output_queue, decorator) - @codec = codec - @output_queue = output_queue - @decorator = decorator - end - - def createProcessor - Worker.new(@codec.clone, @output_queue, @decorator) - end - end - class Worker include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor def initialize(*args) # nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor if !@constructed - @codec, @output_queue, @decorator = args + @codec, @output_queue, @decorator, @checkpoint_interval = args + @next_checkpoint = Time.now - 600 @constructed = true else _shard_id, _ = args @@ -76,10 +66,24 @@ def initialize(*args) def processRecords(records, checkpointer) records.each { |record| process_record(record) } - checkpointer.checkpoint() + if Time.now >= @next_checkpoint + checkpoint(checkpointer) + @next_checkpoint = Time.now + @checkpoint_interval + end end def shutdown(checkpointer, reason) + if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE + checkpoint(checkpointer) + end + end + + protected + + def checkpoint(checkpointer) + checkpointer.checkpoint() + rescue => error + @logger.error("Kinesis worker failed checkpointing: #{error}") end def process_record(record) @@ -88,6 +92,8 @@ def process_record(record) @decorator.call(event) @output_queue << event end + rescue => error + @logger.error("Error processing record: #{error}") end end end diff --git a/lib/logstash/inputs/kinesis/version.rb b/lib/logstash/inputs/kinesis/version.rb index 9d237cc..bcf9c33 100644 --- a/lib/logstash/inputs/kinesis/version.rb +++ b/lib/logstash/inputs/kinesis/version.rb @@ -1,7 +1,7 @@ module Logstash module Input module Kinesis - VERSION = "1.0.0" + VERSION = "1.1.0" end end end