diff --git a/lib/logstash/inputs/kinesis.rb b/lib/logstash/inputs/kinesis.rb index 2596b5b..c0dbd10 100644 --- a/lib/logstash/inputs/kinesis.rb +++ b/lib/logstash/inputs/kinesis.rb @@ -29,6 +29,7 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base attr_reader( :kcl_config, + :kcl_builder, :kcl_worker, ) @@ -49,8 +50,8 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base # to enable the cloudwatch integration in the Kinesis Client Library. config :metrics, :validate => [nil, "cloudwatch"], :default => nil - def initialize(params = {}, kcl_class = KCL::Worker) - @kcl_class = kcl_class + def initialize(params = {}, kcl_builder = KCL::Worker::Builder.new) + @kcl_builder = kcl_builder super(params) end @@ -60,7 +61,7 @@ def register logger.setLevel(java.util.logging::Level::WARNING) worker_id = java.util::UUID.randomUUID.to_s - creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new() + creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new @kcl_config = KCL::KinesisClientLibConfiguration.new( @application_name, @kinesis_stream_name, @@ -72,18 +73,15 @@ def register def run(output_queue) worker_factory = proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) } + @kcl_builder.recordProcessorFactory(worker_factory) + @kcl_builder.config(@kcl_config) + if metrics_factory - @kcl_worker = @kcl_class.new( - worker_factory, - @kcl_config, - metrics_factory) - else - @kcl_worker = @kcl_class.new( - worker_factory, - @kcl_config) + @kcl_builder.metricsFactory(metrics_factory) end - @kcl_worker.run() + @kcl_worker = @kcl_builder.build + @kcl_worker.run end def teardown diff --git a/lib/logstash/inputs/kinesis/worker.rb b/lib/logstash/inputs/kinesis/worker.rb index a10baac..266fe80 100644 --- a/lib/logstash/inputs/kinesis/worker.rb +++ b/lib/logstash/inputs/kinesis/worker.rb @@ -1,5 +1,5 @@ class LogStash::Inputs::Kinesis::Worker - include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor + include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor attr_reader( :checkpoint_interval, @@ -16,22 +16,22 @@ def initialize(*args) @next_checkpoint = Time.now - 600 @constructed = true else - _shard_id, _ = args + _shard_id = args[0].shardId @decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder() end end public :initialize - def processRecords(records, checkpointer) - records.each { |record| process_record(record) } + def processRecords(records_input) + records_input.records.each { |record| process_record(record) } if Time.now >= @next_checkpoint - checkpoint(checkpointer) + checkpoint(records_input.checkpointer) @next_checkpoint = Time.now + @checkpoint_interval end end - def shutdown(checkpointer, reason) - if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE + def shutdown(shutdown_input) + if shutdown_input.reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE checkpoint(checkpointer) end end