Skip to content

Commit

Permalink
builder improvements
Browse files Browse the repository at this point in the history
Expands the test coverage and uses the real aws builder class in the tests. In a
way this is trading one evil for another, since we're now using java reflection
to access private variables on the builder, but it captures and tests the intent
of the code better than it did before.
  • Loading branch information
codekitchen committed Sep 28, 2015
1 parent f9f8fee commit 78fcde5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 66 deletions.
36 changes: 17 additions & 19 deletions lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
# The library can optionally also send worker statistics to CloudWatch.
class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker
KCL_PROCESSOR_FACTORY_CLASS = "com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory"
KCL_PROCESSOR_FACTORY_CLASS = com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
require "logstash/inputs/kinesis/worker"

config_name 'kinesis'
milestone 1

attr_reader(
:kcl_config,
:kcl_builder,
:kcl_worker,
)

Expand All @@ -51,15 +50,7 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# to enable the cloudwatch integration in the Kinesis Client Library.
config :metrics, :validate => [nil, "cloudwatch"], :default => nil

#what a nasty hack to use the overloaded method :_(
java_import KCL::Worker::Builder
class Builder
#I don't know how to make it work using the class directly
java_alias :v2RecordProcessorFactory, :recordProcessorFactory, [Java::JavaClass.for_name(KCL_PROCESSOR_FACTORY_CLASS)]
end

def initialize(params = {}, kcl_builder = Builder.new)
@kcl_builder = kcl_builder
def initialize(params = {})
super(params)
end

Expand All @@ -80,22 +71,29 @@ def register
end

def run(output_queue)
worker_factory = proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) }
@kcl_builder.v2RecordProcessorFactory(worker_factory)
@kcl_builder.config(@kcl_config)
@kcl_worker = kcl_builder(output_queue).build
@kcl_worker.run
end

if metrics_factory
@kcl_builder.metricsFactory(metrics_factory)
end
def kcl_builder(output_queue)
KCL::Worker::Builder.new.tap do |builder|
builder.java_send(:recordProcessorFactory, [KCL_PROCESSOR_FACTORY_CLASS.java_class], worker_factory(output_queue))
builder.config(@kcl_config)

@kcl_worker = @kcl_builder.build
@kcl_worker.run
if metrics_factory
builder.metricsFactory(metrics_factory)
end
end
end

def teardown
@kcl_worker.shutdown if @kcl_worker
end

def worker_factory(output_queue)
proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) }
end

protected

def metrics_factory
Expand Down
84 changes: 37 additions & 47 deletions spec/inputs/kinesis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,6 @@
RSpec.describe "inputs/kinesis" do
KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker

# Since kinesis.rb is using now a builder, subclassing the kinesis Worker is no longer needed.
# This test only need to check if run method is called on the instance returned
class TestKCLBuilder < KCL::Worker::Builder
attr_reader :kcl_worker_factory, :kcl_metrics_factory

def initialize(worker_double)
super()
@worker = worker_double
end

# mimics the actual interface
def metricsFactory(factory)
@kcl_metrics_factory = factory
end

# mimics aliased method interface
def v2RecordProcessorFactory(factory)
@kcl_worker_factory = factory
end

def build
return @worker
end
end

let(:config) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
Expand All @@ -40,11 +15,12 @@ def build
"region" => "ap-southeast-1",
}}

subject!(:kinesis) { LogStash::Inputs::Kinesis.new(config, kcl_builder) }
let(:kcl_worker) { double('kcl_worker', run: nil) }
let(:kcl_builder) { TestKCLBuilder.new(kcl_worker) }
subject!(:kinesis) { LogStash::Inputs::Kinesis.new(config) }
let(:kcl_worker) { double('kcl_worker') }
let(:stub_builder) { double('kcl_builder', build: kcl_worker) }
let(:metrics) { nil }
let(:codec) { LogStash::Codecs::JSON.new() }
let(:queue) { Queue.new }

it "registers without error" do
input = LogStash::Plugin.lookup("input", "kinesis").new("kinesis_stream_name" => "specs", "codec" => codec)
Expand All @@ -60,49 +36,63 @@ def build
end

context "#run" do
let(:queue) { Queue.new }

before do
kinesis.register
end

it "runs the KCL worker" do
expect(kinesis).to receive(:kcl_builder).with(queue).and_return(stub_builder)
expect(kcl_worker).to receive(:run).with(no_args)
kinesis.run(queue)
builder = kinesis.run(queue)
end
end

context "#worker_factory" do
it "clones the codec for each worker" do
expect(codec).to receive(:clone).once
kinesis.run(queue)

worker = kinesis.kcl_builder.kcl_worker_factory.call()
worker = kinesis.worker_factory(queue).call()
expect(worker).to be_kind_of(LogStash::Inputs::Kinesis::Worker)
end

it "generates a valid worker via the factory proc" do
kinesis.run(queue)
worker = kinesis.kcl_builder.kcl_worker_factory.call()

it "generates a valid worker" do
worker = kinesis.worker_factory(queue).call()

expect(worker.codec).to be_kind_of(codec.class)
expect(worker.checkpoint_interval).to eq(120)
expect(worker.output_queue).to eq(queue)
expect(worker.decorator).to eq(kinesis.method(:decorate))
expect(worker.logger).to eq(kinesis.logger)
end
end

# these tests are heavily dependent on the current Worker::Builder
# implementation because its state is all private
context "#kcl_builder" do
let(:builder) { kinesis.kcl_builder(queue) }

it "sets the worker factory" do
expect(field(builder, "recordProcessorFactory")).to_not eq(nil)
end

it "sets the config" do
kinesis.register
config = field(builder, "config")
expect(config).to eq(kinesis.kcl_config)
end

it "disables metric tracking by default" do
kinesis.run(queue)
expect(kinesis.kcl_builder.kcl_metrics_factory).to be_kind_of(com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory)
expect(field(builder, "metricsFactory")).to be_kind_of(com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory)
end

context "cloudwatch" do
let(:metrics) { "cloudwatch" }
it "uses cloudwatch metrics if specified" do
kinesis.run(queue)
# since the behaviour is enclosed on private methods it is not testable. So here
# since the behaviour is enclosed on private methods it is not testable. So here
# the expected value can be tested, not the result associated to set this value
expect(kinesis.kcl_builder.kcl_metrics_factory).to eq(nil)
expect(field(builder, "metricsFactory")).to eq(nil)
end
end
end

def field(obj, name)
field = obj.java_class.declared_field(name)
field.accessible = true
field.value(obj)
end
end

0 comments on commit 78fcde5

Please sign in to comment.