Skip to content

Commit

Permalink
implementation changes to use the v2 KCL API
Browse files Browse the repository at this point in the history
  • Loading branch information
sgarciamartinez committed Sep 28, 2015
1 parent e79350a commit 6243808
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
22 changes: 10 additions & 12 deletions lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base

attr_reader(
:kcl_config,
:kcl_builder,
:kcl_worker,
)

Expand All @@ -49,8 +50,8 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# to enable the cloudwatch integration in the Kinesis Client Library.
config :metrics, :validate => [nil, "cloudwatch"], :default => nil

def initialize(params = {}, kcl_class = KCL::Worker)
@kcl_class = kcl_class
def initialize(params = {}, kcl_builder = KCL::Worker::Builder.new)
@kcl_builder = kcl_builder
super(params)
end

Expand All @@ -60,7 +61,7 @@ def register
logger.setLevel(java.util.logging::Level::WARNING)

worker_id = java.util::UUID.randomUUID.to_s
creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new()
creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new
@kcl_config = KCL::KinesisClientLibConfiguration.new(
@application_name,
@kinesis_stream_name,
Expand All @@ -72,18 +73,15 @@ def register

def run(output_queue)
worker_factory = proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) }
@kcl_builder.recordProcessorFactory(worker_factory)
@kcl_builder.config(@kcl_config)

if metrics_factory
@kcl_worker = @kcl_class.new(
worker_factory,
@kcl_config,
metrics_factory)
else
@kcl_worker = @kcl_class.new(
worker_factory,
@kcl_config)
@kcl_builder.metricsFactory(metrics_factory)
end

@kcl_worker.run()
@kcl_worker = @kcl_builder.build
@kcl_worker.run
end

def teardown
Expand Down
14 changes: 7 additions & 7 deletions lib/logstash/inputs/kinesis/worker.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class LogStash::Inputs::Kinesis::Worker
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor

attr_reader(
:checkpoint_interval,
Expand All @@ -16,22 +16,22 @@ def initialize(*args)
@next_checkpoint = Time.now - 600
@constructed = true
else
_shard_id, _ = args
_shard_id = args[0].shardId
@decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder()
end
end
public :initialize

def processRecords(records, checkpointer)
records.each { |record| process_record(record) }
def processRecords(records_input)
records_input.records.each { |record| process_record(record) }
if Time.now >= @next_checkpoint
checkpoint(checkpointer)
checkpoint(records_input.checkpointer)
@next_checkpoint = Time.now + @checkpoint_interval
end
end

def shutdown(checkpointer, reason)
if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
def shutdown(shutdown_input)
if shutdown_input.reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
checkpoint(checkpointer)
end
end
Expand Down

0 comments on commit 6243808

Please sign in to comment.