diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 9d0d284..2433734 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -42,10 +42,11 @@ This plugin supports the following configuration options plus the <<plugins-{typ
 |=======================================================================
 |Setting |Input type|Required
 | <<plugins-{type}s-{plugin}-batch_count>> |<<number,number>>|No
-| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel", "pattern_channel"]`|Yes
+| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel", "pattern_channel", "sortedset"]`|Yes
 | <<plugins-{type}s-{plugin}-db>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-key>> |<<string,string>>|Yes
+| <<plugins-{type}s-{plugin}-priority_reverse>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
 | <<plugins-{type}s-{plugin}-port>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-threads>> |<<number,number>>|No
@@ -58,7 +59,7 @@ input plugins.
 &nbsp;
 
 [id="plugins-{type}s-{plugin}-batch_count"]
-===== `batch_count` 
+===== `batch_count`
 
   * Value type is <<number,number>>
   * Default value is `125`
@@ -66,18 +67,21 @@ input plugins.
 The number of events to return from Redis using EVAL.
 
 [id="plugins-{type}s-{plugin}-data_type"]
-===== `data_type` 
+===== `data_type`
 
   * This is a required setting.
-  * Value can be any of: `list`, `channel`, `pattern_channel`
+  * Value can be any of: `list`, `channel`, `pattern_channel`, `sortedset`
   * There is no default value for this setting.
 
 Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP the
-key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
+key.
+Specify either list or channel.  If `redis\_type` is `sortedset`, then we will
+ZRANGE/ZREVRANGE and ZREM/ZREMRANGEBYRANK the key.
+If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
 If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
 
 [id="plugins-{type}s-{plugin}-db"]
-===== `db` 
+===== `db`
 
   * Value type is <<number,number>>
   * Default value is `0`
@@ -85,7 +89,7 @@ If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
 The Redis database number.
 
 [id="plugins-{type}s-{plugin}-host"]
-===== `host` 
+===== `host`
 
   * Value type is <<string,string>>
   * Default value is `"127.0.0.1"`
@@ -93,16 +97,24 @@ The Redis database number.
 The hostname of your Redis server.
 
 [id="plugins-{type}s-{plugin}-key"]
-===== `key` 
+===== `key`
 
   * This is a required setting.
   * Value type is <<string,string>>
   * There is no default value for this setting.
 
-The name of a Redis list or channel.
+The name of a Redis list, sortedset or channel.
+
+[id="plugins-{type}s-{plugin}-priority_reverse"]
+===== `priority_reverse`
+
+  * Value type is <<boolean,boolean>>
+  * Default value if `false`
+
+When the data_type is `sortedset`, read the sortedset in reverse order.
 
 [id="plugins-{type}s-{plugin}-password"]
-===== `password` 
+===== `password`
 
   * Value type is <<password,password>>
   * There is no default value for this setting.
@@ -110,7 +122,7 @@ The name of a Redis list or channel.
 Password to authenticate with. There is no authentication by default.
 
 [id="plugins-{type}s-{plugin}-port"]
-===== `port` 
+===== `port`
 
   * Value type is <<number,number>>
   * Default value is `6379`
@@ -118,7 +130,7 @@ Password to authenticate with. There is no authentication by default.
 The port to connect on.
 
 [id="plugins-{type}s-{plugin}-threads"]
-===== `threads` 
+===== `threads`
 
   * Value type is <<number,number>>
   * Default value is `1`
@@ -126,7 +138,7 @@ The port to connect on.
 
 
 [id="plugins-{type}s-{plugin}-timeout"]
-===== `timeout` 
+===== `timeout`
 
   * Value type is <<number,number>>
   * Default value is `5`
diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb
index 203f854..bdde9dc 100644
--- a/lib/logstash/inputs/redis.rb
+++ b/lib/logstash/inputs/redis.rb
@@ -4,13 +4,17 @@
 require "logstash/inputs/threadable"
 require 'redis'
 
-# This input will read events from a Redis instance; it supports both Redis channels and lists.
+# This input will read events from a Redis instance; it supports both Redis channels, lists and sortedsets.
+#
 # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
 # the channel commands used by Logstash are found in Redis v1.3.8+.
 # While you may be able to make these Redis versions work, the best performance
 # and stability will be found in more recent stable versions.  Versions 2.6.0+
 # are recommended.
 #
+# The sortedset commands (ZREM, ZREMRANGEBYRANK, ZRANGE, ZREVRANGE) used by Logstash are supported
+# in Redis v2.0.0+
+#
 # For more information about Redis, see <http://redis.io/>
 #
 # `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
@@ -41,10 +45,14 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
   # The name of a Redis list or channel.
   config :key, :validate => :string, :required => true
 
-  # Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP the
-  # key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
-  # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
-  config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
+  # Pop high scores item first in sortedset. No effect for other data types
+  config :priority_reverse, :validate => :boolean, :default => false
+
+  # Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP
+  # the key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
+  # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. 
+  # If `redis\_type` is `sortedset`, then we will ZRANGE/ZREVRANGE
+  config :data_type, :validate => [ "list", "channel", "pattern_channel", "sortedset" ], :required => true
 
   # The number of events to return from Redis using EVAL.
   config :batch_count, :validate => :number, :default => 125
@@ -76,6 +84,9 @@ def register
     if @data_type == 'list' || @data_type == 'dummy'
       @run_method = method(:list_runner)
       @stop_method = method(:list_stop)
+    elsif @data_type == 'sortedset'
+      @run_method = method(:sortedset_runner)
+      @stop_method = method(:sortedset_stop)
     elsif @data_type == 'channel'
       @run_method = method(:channel_runner)
       @stop_method = method(:subscribe_stop)
@@ -84,7 +95,9 @@ def register
       @stop_method = method(:subscribe_stop)
     end
 
+    #TODO voir à terme comment fusionner ces deux méthodes
     @list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
+    @sortedset_method = batched? ? method(:sortedset_batch_listener) : method(:sortedset_single_listener) 
 
     @identity = "#{@redis_url} #{@data_type}:#{@key}"
     @logger.info("Registering Redis", :identity => @identity)
@@ -112,6 +125,11 @@ def is_list_type?
     @data_type == 'list'
   end
 
+  # private
+  def is_sortedset_type?
+    @data_type == 'sortedset'
+  end
+
   # private
   def redis_params
     {
@@ -131,12 +149,13 @@ def internal_redis_builder
   # private
   def connect
     redis = new_redis_instance
-    load_batch_script(redis) if batched? && is_list_type?
+    list_load_batch_script(redis) if batched? && is_list_type?
+    sortedset_load_batch_script(redis) if batched? && is_sortedset_type?
     redis
   end # def connect
 
   # private
-  def load_batch_script(redis)
+  def list_load_batch_script(redis)
     #A Redis Lua EVAL script to fetch a count of keys
       redis_script = <<EOF
         local batchsize = tonumber(ARGV[1])
@@ -147,6 +166,31 @@ def load_batch_script(redis)
     @redis_script_sha = redis.script(:load, redis_script)
   end
 
+  # private
+  def sortedset_load_batch_script(redis)
+    #A Redis Lua EVAL script to fetch a count of keys
+    redis_script = "local batchsize = tonumber(ARGV[1])\n"
+    redis_script << "local zcard = tonumber(redis.call('zcard', KEYS[1]))\n"
+    redis_script << "local result = redis.call('"
+    if @priority_reverse then
+      redis_script << 'zrevrange'
+    else
+      redis_script << 'zrange'
+    end
+    redis_script << "', KEYS[1], 0, batchsize)\n"
+    redis_script << "redis.call('"
+
+    redis_script << "zremrangebyrank', KEYS[1], "
+    if @priority_reverse then
+      redis_script << "zcard - batchsize - 1, zcard"
+    else
+      redis_script << "0, batchsize"
+    end    
+    redis_script << ")\nreturn result\n"
+    
+    @redis_script_sha = redis.script(:load, redis_script)
+  end
+
   # private
   def queue_event(msg, output_queue)
     begin
@@ -214,7 +258,7 @@ def list_batch_listener(redis, output_queue)
     rescue ::Redis::CommandError => e
       if e.to_s =~ /NOSCRIPT/ then
         @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
-        load_batch_script(redis)
+        list_load_batch_script(redis)
         retry
       else
         raise e
@@ -231,6 +275,75 @@ def list_single_listener(redis, output_queue)
     queue_event(item.last, output_queue)
   end
 
+  # private
+  def sortedset_stop
+    return if @redis.nil? || !@redis.connected?
+
+    @redis.quit rescue nil
+    @redis = nil
+  end
+
+  # private
+  def sortedset_runner(output_queue)
+    while !stop?
+      begin
+        @redis ||= connect
+        @sortedset_method.call(@redis, output_queue)
+      rescue ::Redis::BaseError => e
+        @logger.warn("Redis connection problem", :exception => e)
+        # Reset the redis variable to trigger reconnect
+        @redis = nil
+        # this sleep does not need to be stoppable as its
+        # in a while !stop? loop
+        sleep 1
+      end
+    end
+  end
+
+  def sortedset_batch_listener(redis, output_queue)
+    begin
+      results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
+      results.each do |item|
+        queue_event(item, output_queue)
+      end
+
+      if results.size.zero?
+        sleep BATCH_EMPTY_SLEEP
+      end
+    rescue ::Redis::CommandError => e
+      if e.to_s =~ /NOSCRIPT/ then
+        @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
+        sortedset_load_batch_script(redis)
+        retry
+      else
+        raise e
+      end
+    end
+  end
+
+  def sortedset_single_listener(redis, output_queue)
+    redis.watch(@key) do
+      if @priority_reverse then
+        item = redis.zrevrange(@key, 0, 0, :timeout => 1)
+      else
+        item = redis.zrange(@key, 0, 0, :timeout => 1)
+      end
+      
+      if item.size.zero?
+        sleep BATCH_EMPTY_SLEEP
+      end
+
+      return unless item.size > 0
+
+      redis.multi do |multi|
+        redis.zrem(@key, item)
+      end
+
+      queue_event(item.first, output_queue)
+    end
+  end
+
+
   # private
   def subscribe_stop
     return if @redis.nil? || !@redis.connected?
diff --git a/spec/inputs/redis_spec.rb b/spec/inputs/redis_spec.rb
index d4b6812..add7525 100644
--- a/spec/inputs/redis_spec.rb
+++ b/spec/inputs/redis_spec.rb
@@ -4,7 +4,7 @@
 require 'logstash/inputs/redis'
 require 'securerandom'
 
-def populate(key, event_count)
+def list_populate(key, event_count)
   require "logstash/event"
   redis = Redis.new(:host => "localhost")
   event_count.times do |value|
@@ -15,7 +15,42 @@ def populate(key, event_count)
   end
 end
 
-def process(conf, event_count)
+
+def sortedset_populate(key, event_count)
+  require "logstash/event"
+  redis = Redis.new(:host => "localhost")
+
+  
+  event_count_1 = event_count / 2
+  event_count_2 = event_count - event_count_1
+
+  # Add half events in default order
+  event_count_1.times do |value|
+    event = LogStash::Event.new("sequence" => value)
+    Stud.try(10.times) do
+      redis.zadd(key, value, event.to_json)
+    end
+  end
+
+  # Add half events in reverse order
+  event_count_2.times do |value|
+    value = event_count - value - 1
+    event = LogStash::Event.new("sequence" => value)
+    Stud.try(10.times) do
+      redis.zadd(key, value, event.to_json)
+    end
+  end
+end
+
+def list_process(conf, event_count)
+  events = input(conf) do |pipeline, queue|
+    event_count.times.map{queue.pop}
+  end
+
+  expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a)
+end
+
+def sortedset_process(conf, event_count)
   events = input(conf) do |pipeline, queue|
     event_count.times.map{queue.pop}
   end
@@ -23,6 +58,14 @@ def process(conf, event_count)
   expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a)
 end
 
+def sortedsetrev_process(conf, event_count)
+  events = input(conf) do |pipeline, queue|
+    event_count.times.map{queue.pop}
+  end
+
+  expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a.reverse)
+end
+
 # integration tests ---------------------
 
 describe "inputs/redis", :redis => true do
@@ -42,8 +85,47 @@ def process(conf, event_count)
       }
     CONFIG
 
-    populate(key, event_count)
-    process(conf, event_count)
+    list_populate(key, event_count)
+    list_process(conf, event_count)
+  end
+
+  it "should read events from a sortedset in default order" do
+    key = SecureRandom.hex
+    event_count = 1000 + rand(50)
+    # event_count = 100
+    conf = <<-CONFIG
+      input {
+        redis {
+          type => "blah"
+          key => "#{key}"
+          data_type => "sortedset"
+          batch_count => 1
+        }
+      }
+    CONFIG
+
+    sortedset_populate(key, event_count)
+    sortedset_process(conf, event_count)
+  end
+
+  it "should read events from a sortedset in reverse order" do
+    key = SecureRandom.hex
+    event_count = 1000 + rand(50)
+    # event_count = 100
+    conf = <<-CONFIG
+      input {
+        redis {
+          type => "blah"
+          key => "#{key}"
+          data_type => "sortedset"
+          batch_count => 1
+          priority_reverse => true
+        }
+      }
+    CONFIG
+
+    sortedset_populate(key, event_count)
+    sortedsetrev_process(conf, event_count)
   end
 
   it "should read events from a list using batch_count (default 125)" do
@@ -59,8 +141,45 @@ def process(conf, event_count)
       }
     CONFIG
 
-    populate(key, event_count)
-    process(conf, event_count)
+    list_populate(key, event_count)
+    list_process(conf, event_count)
+  end
+
+  it "should read events from a sortedset in default order using batch_count" do
+    key = SecureRandom.hex
+    event_count = 1000 + rand(50)
+    # event_count = 100
+    conf = <<-CONFIG
+      input {
+        redis {
+          type => "blah"
+          key => "#{key}"
+          data_type => "sortedset"
+        }
+      }
+    CONFIG
+
+    sortedset_populate(key, event_count)
+    sortedset_process(conf, event_count)
+  end
+
+  it "should read events from a sortedset in reverse order using batch_count" do
+    key = SecureRandom.hex
+    event_count = 1000 + rand(50)
+    # event_count = 100
+    conf = <<-CONFIG
+      input {
+        redis {
+          type => "blah"
+          key => "#{key}"
+          data_type => "sortedset"
+          priority_reverse => true
+        }
+      }
+    CONFIG
+
+    sortedset_populate(key, event_count)
+    sortedsetrev_process(conf, event_count)
   end
 end
 
@@ -305,7 +424,7 @@ def close_thread(inst, rt)
 
   describe LogStash::Inputs::Redis do
     context "when using data type" do
-      ["list", "channel", "pattern_channel"].each do |data_type|
+      ["list", "channel", "sortedset", "pattern_channel"].each do |data_type|
         context data_type do
           it_behaves_like "an interruptible input plugin" do
             let(:config) { {'key' => 'foo', 'data_type' => data_type } }