Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport to 5.x of fixes from more recent branches #217

Open
wants to merge 5 commits into
base: 5.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ jdk: oraclejdk8
rvm:
- jruby-1.7.25
env:
- KAFKA_VERSION=0.10.0.1
before_install: ./kafka_test_setup.sh
- KAFKA_VERSION=0.10.2.2
before_install:
- gem install bundler -v '< 2'
- ./kafka_test_setup.sh
before_script:
- bundle exec rake vendor
script: bundle exec rspec && bundle exec rspec --tag integration
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 5.1.12
- Backport of fixes from more recent branches:
- Fixed incorrect millisecond to second conversion for retry_backoff_ms [#216](https://github.com/logstash-plugins/logstash-output-kafka/pull/216)
- Fixed unnecessary sleep after exhausted retries [#166](https://github.com/logstash-plugins/logstash-output-kafka/pull/166)
- Changed Kafka send errors to log as warn [#179](https://github.com/logstash-plugins/logstash-output-kafka/pull/179)

## 5.1.11
- Bugfix: Sends are now retried until successful. Previously, failed transmissions to Kafka
could have been lost by the KafkaProducer library. Now we verify transmission explicitly.
Expand Down
2 changes: 1 addition & 1 deletion kafka_test_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -ex
if [ -n "${KAFKA_VERSION+1}" ]; then
echo "KAFKA_VERSION is $KAFKA_VERSION"
else
KAFKA_VERSION=0.10.1.0
KAFKA_VERSION=0.10.2.2
fi

echo "Downloading Kafka version $KAFKA_VERSION"
Expand Down
18 changes: 10 additions & 8 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def multi_receive(events)
end

def retrying_send(batch)
remaining = @retries;
remaining = @retries

while batch.any?
if !remaining.nil?
Expand Down Expand Up @@ -275,7 +275,7 @@ def retrying_send(batch)
result = future.get()
rescue => e
# TODO(sissel): Add metric to count failures, possibly by exception type.
logger.debug? && logger.debug("KafkaProducer.send() failed: #{e}", :exception => e);
logger.warn("KafkaProducer.send() failed: #{e}", :exception => e)
failures << batch[i]
end
end
Expand All @@ -284,13 +284,15 @@ def retrying_send(batch)
break if failures.empty?

# Otherwise, retry with any failed transmissions
batch = failures
delay = 1.0 / @retry_backoff_ms
logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
:failures => failures.size, :sleep => delay);
sleep(delay)
if remaining.nil? || remaining >= 0
delay = @retry_backoff_ms / 1000.0
logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
:failures => failures.size,
:sleep => delay)
batch = failures
sleep(delay)
end
end

end

def close
Expand Down
52 changes: 51 additions & 1 deletion spec/unit/outputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
kafka.register
kafka.multi_receive([event])
end

it 'should raise config error when truststore location is not set and ssl is enabled' do
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
Expand Down Expand Up @@ -120,6 +120,41 @@
end
end

context 'when retries is 0' do
let(:retries) { 0 }
let(:max_sends) { 1 }

it "should only send once" do
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
.once
.and_wrap_original do |m, *args|
# Always fail.
future = java.util.concurrent.FutureTask.new { raise "Failed" }
future.run
future
end
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
kafka.register
kafka.multi_receive([event])
end

it 'should not sleep' do
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
.once
.and_wrap_original do |m, *args|
# Always fail.
future = java.util.concurrent.FutureTask.new { raise "Failed" }
future.run
future
end

kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
expect(kafka).not_to receive(:sleep).with(anything)
kafka.register
kafka.multi_receive([event])
end
end

context "and when retries is set by the user" do
let(:retries) { (rand * 10).to_i }
let(:max_sends) { retries + 1 }
Expand All @@ -137,6 +172,21 @@
kafka.register
kafka.multi_receive([event])
end

it 'should only sleep retries number of times' do
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
.at_most(max_sends)
.and_wrap_original do |m, *args|
# Always fail.
future = java.util.concurrent.FutureTask.new { raise "Failed" }
future.run
future
end
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
expect(kafka).to receive(:sleep).exactly(retries).times
kafka.register
kafka.multi_receive([event])
end
end
end
end