Skip to content

Feature/codes #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 30 additions & 37 deletions lib/logstash/outputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base

config_name "redis"

default :codec, 'json'

# Name is used for logging in case there are multiple instances.
# TODO: delete
config :name, :validate => :string, :default => 'default',
Expand Down Expand Up @@ -137,48 +139,39 @@ def register
@host_idx = 0

@congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }
end # def register

def receive(event)
return unless output?(event)

if @batch and @data_type == 'list' # Don't use batched method for pubsub.
# Stud::Buffer
buffer_receive(event.to_json, event.sprintf(@key))
return
end

key = event.sprintf(@key)
# TODO(sissel): We really should not drop an event, but historically
# we have dropped events that fail to be converted to json.
# TODO(sissel): Find a way to continue passing events through even
# if they fail to convert properly.
begin
payload = event.to_json
rescue Encoding::UndefinedConversionError, ArgumentError
puts "FAILUREENCODING"
@logger.error("Failed to convert event to JSON. Invalid UTF-8, maybe?",
:event => event.inspect)
return
end
@codec.on_event do |event, data|

begin
@redis ||= connect
if @data_type == 'list'
congestion_check(key)
@redis.rpush(key, payload)
if @batch and @data_type == 'list' # Don't use batched method for pubsub.
# Stud::Buffer
buffer_receive(data, event.sprintf(@key))
else
@redis.publish(key, payload)
key = event.sprintf(@key)

begin
@redis ||= connect
if @data_type == 'list'
congestion_check(key)
@redis.rpush(key, data)
else
@redis.publish(key, data)
end
rescue => e
@logger.warn("Failed to send event to Redis", :event => event,
:identity => identity, :exception => e,
:backtrace => e.backtrace)
sleep @reconnect_interval
@redis = nil
retry
end
end
rescue => e
@logger.warn("Failed to send event to Redis", :event => event,
:identity => identity, :exception => e,
:backtrace => e.backtrace)
sleep @reconnect_interval
@redis = nil
retry
end
end # def receive
end # def register

def receive(event)
return unless output?(event)
@codec.encode(event)
end

def congestion_check(key)
return if @congestion_threshold == 0
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-redis.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-redis'
s.version = '1.0.0'
s.version = '1.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "This output will send events to a Redis queue using RPUSH"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand Down