diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index fe4f3af..b74e4ea 100644 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -53,6 +53,10 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 + config :sentinel_hosts, :validate => :array + + config :master, :validate => :string, :default => "mymaster" + public # public API # use to store a proc that can provide a redis instance or mock @@ -72,8 +76,8 @@ def new_redis_instance end def register - @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" - + @redis_url = identity + @redis_builder ||= method(:internal_redis_builder) # just switch on data_type once @@ -90,10 +94,17 @@ def register @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) - @identity = "#{@redis_url} #{@data_type}:#{@key}" + @identity = identity @logger.info("Registering Redis", :identity => @identity) end # def register + def identity + if @sentinel_hosts + return "redis-sentinel://#{@password} #{$sentinel_hosts} #{@db} #{@data_type}:#{@key}" + end + @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" + end + def run(output_queue) @run_method.call(output_queue) rescue LogStash::ShutdownSignal @@ -119,10 +130,26 @@ def is_list_type? # private def redis_params if @path.nil? - connectionParams = { - :host => @host, - :port => @port - } + if @sentinel_hosts.nil? + connectionParams = { + :host => @host, + :port => @port + } + else + hosts = [] + for sentinel_host in @sentinel_hosts + host, port = sentinel_host.split(":") + unless port + port = @sentinel_port + end + hosts.push({:host => host, :port => port}) + end + connectionParams = { + :url => 'redis://' + @master, + :sentinels => hosts, + :role => :master + } + end else @logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'") connectionParams = {