diff --git a/.travis.yml b/.travis.yml
index ce6e007..0592b1e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,8 +6,10 @@ jdk: oraclejdk8
 rvm:
 - jruby-1.7.25
 env:
-  - KAFKA_VERSION=0.10.0.1
-before_install: ./kafka_test_setup.sh
+  - KAFKA_VERSION=0.10.2.2
+before_install:
+  - gem install bundler -v '< 2'
+  - ./kafka_test_setup.sh
 before_script:
   - bundle exec rake vendor
 script: bundle exec rspec && bundle exec rspec --tag integration
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c23a609..3fcb468 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 5.1.12
+  - Backport of fixes from more recent branches:
+    - Fixed incorrect millisecond to second conversion for retry_backoff_ms [#216](https://github.com/logstash-plugins/logstash-output-kafka/pull/216)
+    - Fixed unnecessary sleep after exhausted retries [#166](https://github.com/logstash-plugins/logstash-output-kafka/pull/166)
+    - Changed Kafka send errors to log as warn [#179](https://github.com/logstash-plugins/logstash-output-kafka/pull/179)
+
 ## 5.1.11
   - Bugfix: Sends are now retried until successful. Previously, failed transmissions to Kafka
     could have been lost by the KafkaProducer library. Now we verify transmission explicitly.
diff --git a/kafka_test_setup.sh b/kafka_test_setup.sh
index 771fb17..b9abdfa 100755
--- a/kafka_test_setup.sh
+++ b/kafka_test_setup.sh
@@ -5,7 +5,7 @@ set -ex
 if [ -n "${KAFKA_VERSION+1}" ]; then
   echo "KAFKA_VERSION is $KAFKA_VERSION"
 else
-  KAFKA_VERSION=0.10.1.0
+  KAFKA_VERSION=0.10.2.2
 fi
 
 echo "Downloading Kafka version $KAFKA_VERSION"
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index e4d29f7..02dd9a8 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -234,7 +234,7 @@ def multi_receive(events)
   end
 
   def retrying_send(batch)
-    remaining = @retries;
+    remaining = @retries
 
     while batch.any?
       if !remaining.nil?
@@ -275,7 +275,7 @@ def retrying_send(batch)
           result = future.get()
         rescue => e
           # TODO(sissel): Add metric to count failures, possibly by exception type.
-          logger.debug? && logger.debug("KafkaProducer.send() failed: #{e}", :exception => e);
+          logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
           failures << batch[i]
         end
       end
@@ -284,13 +284,15 @@ def retrying_send(batch)
       break if failures.empty?
 
       # Otherwise, retry with any failed transmissions
-      batch = failures
-      delay = 1.0 / @retry_backoff_ms
-      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
-                  :failures => failures.size, :sleep => delay);
-      sleep(delay)
+      if remaining.nil? || remaining >= 0
+        delay = @retry_backoff_ms / 1000.0
+        logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
+                    :failures => failures.size,
+                    :sleep => delay)
+        batch = failures
+        sleep(delay)
+      end
     end
-
   end
 
   def close
diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb
index 7041a45..2d99a05 100644
--- a/spec/unit/outputs/kafka_spec.rb
+++ b/spec/unit/outputs/kafka_spec.rb
@@ -49,7 +49,7 @@
       kafka.register
       kafka.multi_receive([event])
     end
-    
+
     it 'should raise config error when truststore location is not set and ssl is enabled' do
       kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
       expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
@@ -120,6 +120,41 @@
       end
     end
 
+    context 'when retries is 0' do
+      let(:retries) { 0 }
+      let(:max_sends) { 1 }
+
+      it "should only send once" do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .once
+          .and_wrap_original do |m, *args|
+          # Always fail.
+          future = java.util.concurrent.FutureTask.new { raise "Failed" }
+          future.run
+          future
+        end
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        kafka.register
+        kafka.multi_receive([event])
+      end
+
+      it 'should not sleep' do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .once
+          .and_wrap_original do |m, *args|
+          # Always fail.
+          future = java.util.concurrent.FutureTask.new { raise "Failed" }
+          future.run
+          future
+        end
+
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        expect(kafka).not_to receive(:sleep).with(anything)
+        kafka.register
+        kafka.multi_receive([event])
+      end
+    end
+
     context "and when retries is set by the user" do
       let(:retries) { (rand * 10).to_i }
       let(:max_sends) { retries + 1 }
@@ -137,6 +172,21 @@
         kafka.register
         kafka.multi_receive([event])
       end
+
+      it 'should only sleep retries number of times' do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .at_most(max_sends)
+          .and_wrap_original do |m, *args|
+          # Always fail.
+          future = java.util.concurrent.FutureTask.new { raise "Failed" }
+          future.run
+          future
+        end
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        expect(kafka).to receive(:sleep).exactly(retries).times
+        kafka.register
+        kafka.multi_receive([event])
+      end
     end
   end
 end