diff --git a/docs/index.asciidoc b/docs/index.asciidoc index cc8340e..7f80126 100755 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -84,12 +84,14 @@ Redis allows for the renaming or disabling of commands in its protocol, see: ht ===== `data_type` * This is a required setting. - * Value can be any of: `list`, `channel`, `pattern_channel` + * Value can be any of: `list`, `pattern_list`, `channel`, `pattern_channel` * There is no default value for this setting. Specify either list or channel. If `data_type` is `list`, then we will BLPOP the -key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. -If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. +key. If `data_type` is `pattern_list`, then we will spawn a number of worker +threads that will LPOP from keys matching that pattern. If `data_type` is +`channel`, then we will SUBSCRIBE to the key. If `data_type` is +`pattern_channel`, then we will PSUBSCRIBE to the key. [id="plugins-{type}s-{plugin}-db"] ===== `db` @@ -125,6 +127,7 @@ The unix socket path of your Redis server. The name of a Redis list or channel. + [id="plugins-{type}s-{plugin}-password"] ===== `password` @@ -133,6 +136,37 @@ The name of a Redis list or channel. Password to authenticate with. There is no authentication by default. + +[id="plugins-{type}s-{plugin}-pattern_list_max_items"] +===== `pattern_list_max_items` + + * Value type is <> + * Default value is `1000` + +Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`. +After the list is empty or this number of items have been processed, the thread will exit and a +new one will be started if there are non-empty lists matching the pattern without a consumer. + + +[id="plugins-{type}s-{plugin}-pattern_list_threadpool_sleep"] +===== `pattern_list_threadpool_sleep` + + * Value type is <> + * Default value is `0.2` + +Time to sleep in main loop after checking if more threads can/need to be spawned. +Applies to `data_type` is `pattern_list` + + +[id="plugins-{type}s-{plugin}-pattern_list_threads"] +===== `pattern_list_threads` + + * Value type is <> + * Default value is `20` + +Maximum number of worker threads to spawn when using `data_type` `pattern_list`. + + [id="plugins-{type}s-{plugin}-port"] ===== `port` @@ -141,8 +175,9 @@ Password to authenticate with. There is no authentication by default. The port to connect on. + [id="plugins-{type}s-{plugin}-ssl"] -===== `ssl` +===== `ssl` * Value type is <> * Default value is `false` @@ -157,7 +192,6 @@ Enable SSL support. * Default value is `1` - [id="plugins-{type}s-{plugin}-timeout"] ===== `timeout` @@ -166,7 +200,9 @@ Enable SSL support. Initial connection timeout in seconds. + + [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] -:default_codec!: \ No newline at end of file +:default_codec!: diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb index e8a2667..e8b24a7 100755 --- a/lib/logstash/inputs/redis.rb +++ b/lib/logstash/inputs/redis.rb @@ -3,6 +3,8 @@ require "logstash/inputs/base" require "logstash/inputs/threadable" require 'redis' +require 'concurrent' +require 'concurrent/executors' require "stud/interval" # This input will read events from a Redis instance; it supports both Redis channels and lists. @@ -50,9 +52,11 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable config :key, :validate => :string, :required => true # Specify either list or channel. If `data_type` is `list`, then we will BLPOP the - # key. If `data_type` is `channel`, then we will SUBSCRIBE to the key. - # If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. - config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true + # key. If `data_type` is `pattern_list`, then we will spawn a number of worker + # threads that will LPOP from keys matching that pattern. If `data_type` is + # `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`, + # then we will PSUBSCRIBE to the key. + config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 125 @@ -60,8 +64,29 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable # Redefined Redis commands to be passed to the Redis client. config :command_map, :validate => :hash, :default => {} + # Maximum number of worker threads to spawn when using `data_type` `pattern_list`. + config :pattern_list_threads, :validate => :number, :default => 20 + + # Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`. + # After the list is empty or this number of items have been processed, the thread will exit and a + # new one will be started if there are non-empty lists matching the pattern without a consumer. + config :pattern_list_max_items, :validate => :number, :default => 1000 + + # Time to sleep in main loop after checking if more threads can/need to be spawned. + # Applies to `data_type` is `pattern_list` + config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2 + public + def init_threadpool + @threadpool ||= Concurrent::ThreadPoolExecutor.new( + min_threads: @pattern_list_threads, + max_threads: @pattern_list_threads, + max_queue: 2 * @pattern_list_threads + ) + @current_workers ||= Concurrent::Set.new + end + def register @redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}" @@ -69,6 +94,9 @@ def register if @data_type == 'list' || @data_type == 'dummy' @run_method = method(:list_runner) @stop_method = method(:list_stop) + elsif @data_type == 'pattern_list' + @run_method = method(:pattern_list_runner) + @stop_method = method(:pattern_list_stop) elsif @data_type == 'channel' @run_method = method(:channel_runner) @stop_method = method(:subscribe_stop) @@ -77,8 +105,6 @@ def register @stop_method = method(:subscribe_stop) end - @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) - @identity = "#{@redis_url} #{@data_type}:#{@key}" @logger.info("Registering Redis", :identity => @identity) end # def register @@ -102,7 +128,7 @@ def batched? # private def is_list_type? - @data_type == 'list' + @data_type == 'list' || @data_type == 'pattern_list' end # private @@ -169,7 +195,7 @@ def queue_event(msg, output_queue, channel=nil) end # private - def list_stop + def reset_redis redis = @redis # might change during method invocation return if redis.nil? || !redis.connected? @@ -179,8 +205,14 @@ def list_stop @redis = nil end + # private + def list_stop + reset_redis + end + # private def list_runner(output_queue) + @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener) while !stop? begin @redis ||= connect @@ -192,16 +224,113 @@ def list_runner(output_queue) end end - def list_batch_listener(redis, output_queue) + #private + def reset_threadpool + return if @threadpool.nil? + @threadpool.shutdown + @threadpool.wait_for_termination + @threadpool = nil + end + + # private + def pattern_list_stop + reset_redis + reset_threadpool + end + + # private + def pattern_list_process_item(redis, output_queue, key) + if stop? + @logger.debug("Breaking from thread #{key} as it was requested to stop") + return false + end + value = redis.lpop(key) + return false if value.nil? + queue_event(value, output_queue) + true + end + + # private + def pattern_list_single_processor(redis, output_queue, key) + (0...@pattern_list_max_items).each do + break unless pattern_list_process_item(redis, output_queue, key) + end + end + + # private + def pattern_list_batch_processor(redis, output_queue, key) + items_left = @pattern_list_max_items + while items_left > 0 + limit = [items_left, @batch_count].min + processed = process_batch(redis, output_queue, key, limit, 0) + if processed.zero? || processed < limit + return + end + items_left -= processed + end + end + + # private + def pattern_list_worker_consume(output_queue, key) begin - results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]) - results.each do |item| - queue_event(item, output_queue) + redis ||= connect + @pattern_list_processor.call(redis, output_queue, key) + rescue ::Redis::BaseError => e + @logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e) + sleep 1 + return + ensure + redis.quit rescue nil + end + end + + # private + def threadpool_capacity? + @threadpool.remaining_capacity > 0 + end + + # private + def pattern_list_launch_worker(output_queue, key) + @current_workers.add(key) + @threadpool.post do + begin + pattern_list_worker_consume(output_queue, key) + ensure + @current_workers.delete(key) end + end + end - if results.size.zero? - sleep BATCH_EMPTY_SLEEP + # private + def pattern_list_ensure_workers(output_queue) + return unless threadpool_capacity? + redis_runner do + @redis.keys(@key).shuffle.each do |key| + next if @current_workers.include?(key) + pattern_list_launch_worker(output_queue, key) + break unless threadpool_capacity? end + end + end + + # private + def pattern_list_runner(output_queue) + @pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor) + while !stop? + init_threadpool if @threadpool.nil? + pattern_list_ensure_workers(output_queue) + sleep(@pattern_list_threadpool_sleep) + end + end + + def process_batch(redis, output_queue, key, batch_size, sleep_time) + begin + results = redis.evalsha(@redis_script_sha, [key], [batch_size-1]) + results.each do |item| + queue_event(item, output_queue) + end + sleep sleep_time if results.size.zero? && sleep_time > 0 + results.size # Below is a commented-out implementation of 'batch fetch' # using pipelined LPOP calls. This in practice has been observed to @@ -230,6 +359,10 @@ def list_batch_listener(redis, output_queue) end end + def list_batch_listener(redis, output_queue) + process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP) + end + def list_single_listener(redis, output_queue) item = redis.blpop(@key, 0, :timeout => 1) return unless item # from timeout or other conditions diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb index d070f13..78c3213 100755 --- a/spec/inputs/redis_spec.rb +++ b/spec/inputs/redis_spec.rb @@ -16,11 +16,17 @@ def populate(key, event_count) end end -def process(conf, event_count) +def wait_events(conf, event_count) events = input(conf) do |_, queue| sleep 0.1 until queue.size >= event_count queue.size.times.map { queue.pop } end + expect(events.size).to eq event_count + events +end + +def process(conf, event_count) + events = wait_events(conf, event_count) # due multiple workers we get events out-of-order in the output events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') } expect(events[0].get('sequence')).to eq(0) @@ -66,14 +72,57 @@ def process(conf, event_count) populate(key, event_count) process(conf, event_count) end + + it "should read events from a list pattern" do + key_base = SecureRandom.hex + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key_base}.*" + data_type => "pattern_list" + batch_count => 1 + } + } + CONFIG + total_event_count = 0 + (0..10).each do |idx| + event_count = 100 + rand(50) + total_event_count += event_count + populate("#{key_base}.#{idx}", event_count) + end + wait_events(conf, total_event_count) + end + + it "should read events from a list pattern using batch_count (default 125)" do + key_base = SecureRandom.hex + conf = <<-CONFIG + input { + redis { + type => "blah" + key => "#{key_base}.*" + data_type => "pattern_list" + batch_count => 125 + } + } + CONFIG + total_event_count = 0 + (0..10).each do |idx| + event_count = 100 + rand(50) + total_event_count += event_count + populate("#{key_base}.#{idx}", event_count) + end + wait_events(conf, total_event_count) + end end describe LogStash::Inputs::Redis do let(:queue) { Queue.new } let(:data_type) { 'list' } + let(:redis_key) { 'foo' } let(:batch_count) { 1 } - let(:config) { {'key' => 'foo', 'data_type' => data_type, 'batch_count' => batch_count} } + let(:config) { {'key' => redis_key, 'data_type' => data_type, 'batch_count' => batch_count} } let(:quit_calls) { [:quit] } subject do @@ -310,6 +359,65 @@ def process(conf, event_count) end end + context 'runtime for pattern_list data_type' do + let(:data_type) { 'pattern_list' } + let(:redis_key) { 'foo.*' } + + before do + subject.register + allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true + allow_any_instance_of( Redis::Client ).to receive(:disconnect) + allow_any_instance_of( Redis ).to receive(:quit) + subject.init_threadpool + end + + after do + subject.stop + end + + context 'close when redis is unset' do + let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] } + + it 'does not attempt to quit' do + allow_any_instance_of( Redis::Client ).to receive(:nil?).and_return(true) + quit_calls.each do |call| + expect_any_instance_of( Redis::Client ).not_to receive(call) + end + expect {subject.do_stop}.not_to raise_error + end + end + + it 'calling the run method, adds events to the queue' do + expect_any_instance_of( Redis ).to receive(:keys).at_least(:once).with(redis_key).and_return ['foo.bar'] + expect_any_instance_of( Redis ).to receive(:lpop).at_least(:once).with('foo.bar').and_return 'l1' + + tt = Thread.new do + end_by = Time.now + 3 + sleep 0.1 until queue.size > 0 or Time.now > end_by + subject.do_stop + end + + subject.run(queue) + + tt.join + + expect(queue.size).to be > 0 + end + + it 'multiple close calls, calls to redis once' do + allow_any_instance_of( Redis ).to receive(:keys).with(redis_key).and_return(['foo.bar']) + allow_any_instance_of( Redis ).to receive(:lpop).with('foo.bar').and_return('l1') + + quit_calls.each do |call| + allow_any_instance_of( Redis ).to receive(call).at_most(:once) + end + + subject.do_stop + expect {subject.do_stop}.not_to raise_error + subject.do_stop + end + end + context 'for the subscribe data_types' do before { subject.register } @@ -432,7 +540,7 @@ def close_thread(inst, rt) context "when using data type" do - ["list", "channel", "pattern_channel"].each do |data_type| + ["list", "channel", "pattern_channel", "pattern_list"].each do |data_type| context data_type do # TODO pending # redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout