diff --git a/Gruntfile.coffee b/Gruntfile.coffee index 8511437..f85b7e7 100644 --- a/Gruntfile.coffee +++ b/Gruntfile.coffee @@ -32,7 +32,7 @@ module.exports = (grunt) -> pckg: options: globals: - version: "0.0.4" + version: "<%= pkg.version %>" prefix: "@@" suffix: '' diff --git a/README.md b/README.md index e9cd890..29993c5 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![Build Status](https://david-dm.org/mpneuried/rsmq-worker.png)](https://david-dm.org/mpneuried/rsmq-worker) [![NPM version](https://badge.fury.io/js/rsmq-worker.png)](http://badge.fury.io/js/rsmq-worker) -RSMQ helper to simply implement a worker around the message queue. +Helper to simply implement a worker [RSMQ ( Redis Simple Message Queue )](https://github.com/smrchy/rsmq). [![NPM](https://nodei.co/npm/rsmq-worker.png?downloads=true&stars=true)](https://nodei.co/npm/rsmq-worker/) @@ -48,7 +48,7 @@ RSMQ helper to simply implement a worker around the message queue. - **options.redis**: *( `RedisClient` optional; default = `null` )* A already existing redis client instance to use if no `rsmq` instance has been defined - **options.redisPrefix**: *( `String` optional; default = `""` )* The redis Prefix for rsmq if no `rsmq` instance has been defined - **options.host**: *( `String` optional; default = `"localhost"` )* Host to connect to redis if no `rsmq` or `redis` instance has been defined - - **options.host**: *( `Number` optional; default = `6379` )* Port to connect to redis if no `rsmq` or `redis` instance has been defined + - **options.port**: *( `Number` optional; default = `6379` )* Port to connect to redis if no `rsmq` or `redis` instance has been defined - **options.options**: *( `Object` optional; default = `{}` )* Options to connect to redis if no `rsmq` or `redis` instance has been defined @@ -72,18 +72,6 @@ If you haven't defined the config `autostart` to `true` you have to call the `.s *( Self )*: The instance itself for chaining. -### `.del( id )` - -Helper function to simply delete a message after it has been processed. - -**Arguments** - -* `id` : *( `String` required )*: The rsmq message id. - -**Return** - -*( Self )*: The instance itself for chaining. - ### `.stop()` Stop the receive interval. @@ -105,6 +93,18 @@ Helper function to simply send a message in the configured queue. *( Self )*: The instance itself for chaining. +### `.del( id )` + +Helper function to simply delete a message after it has been processed. + +**Arguments** + +* `id` : *( `String` required )*: The rsmq message id. + +**Return** + +*( Self )*: The instance itself for chaining. + ## Events ### `message` @@ -222,13 +222,14 @@ This is an advanced example showing some features in action. ## Todos/Ideas -- TEST scripts ;-) -- Code method docs +- Timeout for message processing +- MORE tests! ## Release History |Version|Date|Description| |:--:|:--:|:--| -|0.1.1|2015-1-??|Added test scripts and optimized repository file list| +|0.1.2|2015-1-20|Reorganized code, added code docs and optimized readme| +|0.1.1|2015-1-17|Added test scripts and optimized repository file list| |0.1.0|2015-1-16|First working and documented version |0.0.1|2015-1-14|Initial commit| diff --git a/_docs/README.md.html b/_docs/README.md.html index 12ba215..89cb636 100644 --- a/_docs/README.md.html +++ b/_docs/README.md.html @@ -18,9 +18,6 @@
-
- rsmq-worker -
Install
@@ -36,15 +33,15 @@
.start()
-
- .del( id ) -
.stop()
.send( msg [, delay ] )
+
+ .del( id ) +
Events
@@ -72,26 +69,24 @@
Release History
+
+ Other projects +
The MIT License (MIT)
-
-
-

- - rsmq-worker -

-
- +

RSMQ-Worker

Build Status Build Status NPM version

-

RSMQ helper to simply implement a worker around the message queue.

+

Helper to simply implement a worker RSMQ ( Redis Simple Message Queue ).

+ +

NPM

@@ -123,31 +118,13 @@

-

Config

- -
    -
  • queuename: ( String required ) The queuename to pull the messages
  • -
  • options ( Object optional ) The configuration object -
    • options.interval: ( Number[] optional; default = [ 0, 1, 5, 10 ] ) An Array of increasing wait times in seconds
    • -
    • options.maxReceiveCount: ( Number optional; default = 10 ) Receive count until a message will be exceeded
    • -
    • options.invisibletime: ( Number optional; default = 30 ) A time in seconds to hide a message after it has been received.
    • -
    • options.autostart: ( Boolean optional; default = false ) Autostart the worker on init
    • -
    • options.customExceedCheck: ( Function optional; ) A custom function, with the raw message (see message format) as argument to build a custom exceed check. If you return a true the message will not exceed. On return false the regular check for maxReceiveCount will be used.
    • -
    • options.rsmq: ( RedisSMQ optional; default = null ) A allready existing rsmq instance to use instead of creating a new client
    • -
    • options.redis: ( RedisClient optional; default = null ) A allready existing redis client instance to use if no rsmq instance has been defiend
    • -
    • options.redisPrefix: ( String optional; default = "" ) The redis Prefix for rsmq if no rsmq instance has been defiend
    • -
    • options.host: ( String optional; default = "localhost" ) Host to connect to redis if no rsmq or redis instance has been defiend
    • -
    • options.host: ( Number optional; default = 6379 ) Port to connect to redis if no rsmq or redis instance has been defiend
    • -
    • options.options: ( Object optional; default = {} ) Options to connect to redis if no rsmq or redis instance has been defiend
  • -
-

Example:

  var RSMQWorker = require( "rsmq-worker" );
   var worker = new RSMQWorker( "myqueue" );
 
-  worker.on( "message", function( msg, next, id ){
+  worker.on( "message", function( msg, next ){
       // process your message
       next()
   });
@@ -157,6 +134,24 @@ 

+

Config

+ +
    +
  • queuename: ( String required ) The queuename to pull the messages
  • +
  • options ( Object optional ) The configuration object +
    • options.interval: ( Number[] optional; default = [ 0, 1, 5, 10 ] ) An Array of increasing wait times in seconds
    • +
    • options.maxReceiveCount: ( Number optional; default = 10 ) Receive count until a message will be exceeded
    • +
    • options.invisibletime: ( Number optional; default = 30 ) A time in seconds to hide a message after it has been received.
    • +
    • options.autostart: ( Boolean optional; default = false ) Autostart the worker on init
    • +
    • options.customExceedCheck: ( Function optional; ) A custom function, with the raw message (see message format) as argument to build a custom exceed check. If you return a true the message will not exceed. On return false the regular check for maxReceiveCount will be used.
    • +
    • options.rsmq: ( RedisSMQ optional; default = null ) A already existing rsmq instance to use instead of creating a new client
    • +
    • options.redis: ( RedisClient optional; default = null ) A already existing redis client instance to use if no rsmq instance has been defined
    • +
    • options.redisPrefix: ( String optional; default = "" ) The redis Prefix for rsmq if no rsmq instance has been defined
    • +
    • options.host: ( String optional; default = "localhost" ) Host to connect to redis if no rsmq or redis instance has been defined
    • +
    • options.port: ( Number optional; default = 6379 ) Port to connect to redis if no rsmq or redis instance has been defined
    • +
    • options.options: ( Object optional; default = {} ) Options to connect to redis if no rsmq or redis instance has been defined
  • +
+

@@ -169,7 +164,7 @@

A message ( e.g. received by the event data or customExceedCheck ) contains the following keys:

    -
  • msg.message : ( String ) The queue message content. You can use complex content by using a stringfied JSON.
  • +
  • msg.message : ( String ) The queue message content. You can use complex content by using a stringified JSON.
  • msg.id : ( String ) The rsmq internal messag id
  • msg.sent : ( Number ) Timestamp of when this message was sent / created.
  • msg.fr : ( Number ) Timestamp of when this message was first received.
  • @@ -201,57 +196,57 @@

    ( Self ): The instance itself for chaining.

    -
    +

    - - .del( id ) + + .stop()

    -

    Helper function to simply delete a message after it has been processed.

    - -

    Arguments

    - -
      -
    • id : ( String required ): The rsmq message id.
    • -
    +

    Stop the receive interval.

    Return

    ( Self ): The instance itself for chaining.

    -
    +

    - - .stop() + + .send( msg [, delay ] )

    -

    Stop the receive interval.

    +

    Helper function to simply send a message in the configured queue.

    + +

    Arguments

    + +
      +
    • filename : ( String required ): The rsmq message. In best practice it's a stringified JSON with additional data.
    • +
    • delay : ( Number optional; default = 0 ): The message delay to hide this message for the next x seconds.
    • +

    Return

    ( Self ): The instance itself for chaining.

    -
    +

    - - .send( msg [, delay ] ) + + .del( id )

    -

    Helper function to simply send a message in the configured queue.

    +

    Helper function to simply delete a message after it has been processed.

    Arguments

      -
    • filename : ( String required ): The rsmq message. In best practice it's a stringified JSON with additional data.
    • -
    • delay : ( Number optional; default = 0 ): The message delay to hide this message for the next x seconds.
    • +
    • id : ( String required ): The rsmq message id.

    Return

    @@ -294,9 +289,9 @@

    • message : ( String ) The queue message content to process. You can use complex content by using a stringfied JSON.
    • -
    • next : ( Function ) A function you have to call when your message has been prcessed.
      +
    • next : ( Function ) A function you have to call when your message has been processed.
      Arguments -
      • delete: ( Boolean optional; default = true ) It's possible to prevent the worker from autodelete the message on end. This is useful if you want to pop up a message multiple times. To implement this, please check the config options.customExceedCheck
    • +
      • delete: ( Boolean optional; default = true ) It's possible to prevent the worker from auto-delete the message on end. This is useful if you want to pop up a message multiple times. To implement this, please check the config options.customExceedCheck
    • msgid : ( String ) The message id. This is useful if you want to delete a message manually.
    @@ -395,7 +390,9 @@

    // worker.on( "message", function( message, next, id ){ + console.log( "message", message ) + if( message === "createmessages" ){ next( false ) worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) ); @@ -442,7 +439,8 @@

      -
    • TEST scripts ;-)
    • +
    • Timeout for message processing
    • +
    • MORE tests!
    @@ -456,9 +454,34 @@

    |Version|Date|Description| |:--:|:--:|:--| -|0.1.0|2015-1-16|First working and documented version| +|0.1.2|2015-1-20|Reorganized code, added code docs and optimized readme| +|0.1.1|2015-1-17|Added test scripts and optimized repository file list| +|0.1.0|2015-1-16|First working and documented version |0.0.1|2015-1-14|Initial commit|

    +

    NPM

    + + +
    +

    + + Other projects +

    +
    + + +

    |Name|Description| +|:--|:--| +|rsmq|A really simple message queue based on Redis| +|node-cache|Simple and fast NodeJS internal caching. Node internal in memory cache like memcached.| +|redis-sessions|An advanced session store for NodeJS and Redis| +|connect-redis-sessions|A connect or express middleware to simply use the redis sessions. With redis sessions you can handle multiple sessions per user_id.| +|systemhealth|Node module to run simple custom checks for your machine or it's connections. It will use redis-heartbeat to send the current state to redis.| +|task-queue-worker|A powerful tool for background processing of tasks that are run by making standard http requests.| +|soyer|Soyer is small lib for serverside use of Google Closure Templates with node.js.| +|grunt-soy-compile|Compile Goggle Closure Templates ( SOY ) templates inclding the handling of XLIFF language files.| +|backlunr|A solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js|

    +

    diff --git a/_docs/_src/example/example.coffee.html b/_docs/_src/example/example.coffee.html new file mode 100644 index 0000000..0d5e89b --- /dev/null +++ b/_docs/_src/example/example.coffee.html @@ -0,0 +1,56 @@ + + + + example.coffee + + + + + + + + + +
    + + + + + + + + + + + +
    +

    example.coffee

    +
    +
    + +
    + +
    1  RSMQWorker = require( "../." )
    +2  worker = new RSMQWorker( "myqueue", { interval: [ 0, 1, 2, 3 ] } )
    +3  
    +4  worker.on "message", ( msg, next, id )=>
    +5    console.log( "RECEIVED", msg )
    +6    next()
    +7    return
    +8  
    +9  worker.start()
    +10  
    +
    +
    + + diff --git a/_docs/_src/example/sender.coffee.html b/_docs/_src/example/sender.coffee.html new file mode 100644 index 0000000..d648832 --- /dev/null +++ b/_docs/_src/example/sender.coffee.html @@ -0,0 +1,55 @@ + + + + sender.coffee + + + + + + + + + +
    + + + + + + + + + + + +
    +

    sender.coffee

    +
    +
    + +
    + +
    1  RSMQWorker = require( "../." )
    +2  worker = new RSMQWorker( "myqueue", autostart: false )
    +3  
    +4  worker.on "ready", =>
    +5    for msg in "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split( "" )
    +6      console.log( "SEND", msg )
    +7      worker.send( msg )
    +8    return
    +9  
    +
    +
    + + diff --git a/_docs/_src/lib/rsmq-worker.coffee.html b/_docs/_src/lib/rsmq-worker.coffee.html index e874e2c..918003b 100644 --- a/_docs/_src/lib/rsmq-worker.coffee.html +++ b/_docs/_src/lib/rsmq-worker.coffee.html @@ -19,7 +19,7 @@
    extends NPM:MPBasic @@ -33,6 +33,18 @@ +
    + start +
    +
    + stop +
    +
    + send +
    +
    + del +
    @@ -43,7 +55,22 @@ _initQueue
    - start + _send +
    + +
    + receive +
    +
    + check +
    +
    + interval +
    +
    + next

    @@ -56,7 +83,7 @@

    - rsmqWorker + RSMQWorker

    @@ -110,7 +137,7 @@

    14 async = require("async") 15 RSMQ = require("rsmq") 16 -17 class rsmqWorker extends require( "mpbasic" )() +17 class RSMQWorker extends require( "mpbasic" )() 18 19 20

@@ -138,7 +165,7 @@

-

rsmqWorker.interval Number[] An Array of increasing wait times in seconds

+

RSMQWorker.interval Number[] An Array of increasing wait times in seconds

22  
 23        interval: [ 0, 1, 5, 10 ]
@@ -151,7 +178,7 @@ 

-

rsmqWorker.maxReceiveCount Number Receive count until a message will be exceeded

+

RSMQWorker.maxReceiveCount Number Receive count until a message will be exceeded

24  
 25        maxReceiveCount: 10
@@ -164,7 +191,7 @@ 

-

rsmqWorker.invisibletime Number A time in seconds to hide a message after it has been received.

+

RSMQWorker.invisibletime Number A time in seconds to hide a message after it has been received.

26  
 27        invisibletime: 30
@@ -177,7 +204,7 @@ 

-

rsmqWorker.autostart Boolean Autostart the worker on init

+

RSMQWorker.autostart Boolean Autostart the worker on init

28  
 29        autostart: false
@@ -190,7 +217,7 @@ 

-

rsmqWorker.customExceedCheck Function A custom function, with the message id and content as argument to build a custom exceed check

+

RSMQWorker.customExceedCheck Function A custom function, with the message id and content as argument to build a custom exceed check

30  
 31        customExceedCheck: null
@@ -204,7 +231,7 @@ 

-

rsmqWorker.rsmq RedisSMQ A allready existing rsmq instance to use instead of creating a new client

+

RSMQWorker.rsmq RedisSMQ A allready existing rsmq instance to use instead of creating a new client

33  
 34        rsmq: null
@@ -218,7 +245,7 @@ 

-

rsmqWorker.redis RedisClient A allready existing redis client instance to use if no rsmq instance has been defiend

+

RSMQWorker.redis RedisClient A allready existing redis client instance to use if no rsmq instance has been defiend

36  
 37        redis: null
@@ -231,7 +258,7 @@ 

-

rsmqWorker.redisPrefix String The redis Prefix for rsmq if no rsmq instance has been defiend

+

RSMQWorker.redisPrefix String The redis Prefix for rsmq if no rsmq instance has been defiend

38  
 39        redisPrefix: ""
@@ -245,7 +272,7 @@ 

-

rsmqWorker.host String Host to connect to redis if no rsmq or redis instance has been defiend

+

RSMQWorker.host String Host to connect to redis if no rsmq or redis instance has been defiend

41  
 42        host: "localhost"
@@ -258,7 +285,7 @@ 

-

rsmqWorker.host Number Port to connect to redis if no rsmq or redis instance has been defiend

+

RSMQWorker.host Number Port to connect to redis if no rsmq or redis instance has been defiend

43  
 44        port: 6379
@@ -271,7 +298,7 @@ 

-

rsmqWorker.options Object Options to connect to redis if no rsmq or redis instance has been defiend

+

RSMQWorker.options Object Options to connect to redis if no rsmq or redis instance has been defiend

45  
 46        options: {}
@@ -333,6 +360,197 @@ 

+
+

+ + start +

+
+ +
+

RSMQWorker.start()

+ +

Start the worker

+
+
+
Returns
+
+ + RedisSMQ + A rsmq instance +
+
+ + RSMQWorker + The instance itself for chaining. +
+
API
+
+ public + +
+
+
+ +
82  
+83    start: =>
+84      if @ready
+85        @interval()
+86        return
+87      @on "ready", @interval
+88      return @
+89  
+90  
+91  
+ + + +
+
+
+

+ + stop +

+
+ +
+

RSMQWorker.stop()

+ +

Stop the worker receiving messages

+
+
+
Returns
+
+ + RSMQWorker + The instance itself for chaining. +
+
API
+
+ public + +
+
+
+ +
100  
+101    stop: =>
+102      clearTimeout( @timeout ) if @timeout?
+103      return @
+104  
+105  
+106  
+ + + +
+
+
+

+ + send +

+
+ +
+

RSMQWorker.send( msg [, delay] )

+ +

Helper/Convinience method to send a new message to the queue.

+
+
+
Params
+
+ msg + String + The message content +
+
+ [delay=0] + Number + The message delay to hide this message for the next x seconds. +
+
Returns
+
+ + RSMQWorker + The instance itself for chaining. +
+
API
+
+ public + +
+
+
+ +
118  
+119    send: ( msg, delay = 0 )=>
+120      if @queue.connected
+121        @_send( msg, delay )
+122      else
+123        @debug "store message during redis offline time", msg, delay
+124        @offlineQueue.push( msg: msg, delay: delay )
+125      return @
+126  
+127  
+128  
+ + + +
+
+
+

+ + del +

+
+ +
+

RSMQWorker.del( id )

+ +

Delete a messge from queue. This is usually done automatically unless you call next(false)

+
+
+
Params
+
+ id + String + The rsmq message id +
+
Returns
+
+ + RSMQWorker + The instance itself for chaining. +
+
API
+
+ public + +
+
+
+ +
139  
+140    del: ( id )=>
+141      @queue.deleteMessage qname: @queuename, id: id, ( err, resp )=>
+142        if err
+143          @error "delete queue message", err
+144          return
+145        @debug "delete queue message", resp
+146        @emit( "deleted", id )
+147        return
+148      return @
+149  
+150  
+151  
+ + + +
+

@@ -341,7 +559,7 @@

-

rsmq-worker._initRSMQ()

+

RSMQWorker._initRSMQ()

Initialize rsmq and handle disconnects

@@ -354,62 +572,62 @@

-
78  
-79    _initRSMQ: =>
-80      @queue = @_getRsmq()
-81  
-82      @reconnectActive = false
-83  
-84  
-85  
+
158  
+159    _initRSMQ: =>
+160      @queue = @_getRsmq()
+161  
+162      @reconnectActive = false
+163  
+164  
+165  
- +

handle redis disconnect

-
84  
-85      @queue.on "disconnect", ( err )=>
-86        @warning "redis connection lost"
-87        _interval = @timeout?
-88        if not @reconnectActive
-89          @reconnectActive = true
-90          @stop() if _interval
-91  
-92  
-93  
+
164  
+165      @queue.on "disconnect", ( err )=>
+166        @warning "redis connection lost"
+167        _interval = @timeout?
+168        if not @reconnectActive
+169          @reconnectActive = true
+170          @stop() if _interval
+171  
+172  
+173  
- +

on reconnect

-
92  
-93          @queue.once "connect", =>
-94            @waitCount = 0
-95            @reconnectActive = false
-96            @queue = new @_getRsmq( true )
-97            @_runOfflineMessages()
-98            @interval() if _interval
-99            @warning "redis connection reconnected"
-100            return
-101  
-102        return
-103      if @queue.connected
-104        @_initQueue()
-105      else
-106        @queue.once "connect", @_initQueue
-107      
-108      return
-109  
-110  
-111  
+
172  
+173          @queue.once "connect", =>
+174            @waitCount = 0
+175            @reconnectActive = false
+176            @queue = new @_getRsmq( true )
+177            @_runOfflineMessages()
+178            @interval() if _interval
+179            @warning "redis connection reconnected"
+180            return
+181  
+182        return
+183      if @queue.connected
+184        @_initQueue()
+185      else
+186        @queue.once "connect", @_initQueue
+187      
+188      return
+189  
+190  
+191  
@@ -423,7 +641,7 @@

-

rsmq-worker._getRsmq( [forceInit] )

+

RSMQWorker._getRsmq( [forceInit] )

get or init the rsmq instance

@@ -448,24 +666,23 @@

-
122  
-123    _getRsmq: ( forceInit = false )=>
-124      if not forceInit and @queue?
-125        return @queue
-126  
-127      if @config.rsmq?.constructor?.name is "RedisSMQ"
-128        @debug "use given rsmq client"
-129        return @config.rsmq
-130        
-131  
-132      if @config.redis?.constructor?.name is "RedisClient"
-133        return new RSMQ( client: @config.redis, ns: @config.redisPrefix )
-134      else
-135        return new RSMQ( host: @config.host, port: @config.port, options: @config.options, ns: @config.redisPrefix )
-136  
-137  
-138  
-139  
+
202  
+203    _getRsmq: ( forceInit = false )=>
+204      if not forceInit and @queue?
+205        return @queue
+206  
+207      if @config.rsmq?.constructor?.name is "RedisSMQ"
+208        @debug "use given rsmq client"
+209        return @config.rsmq
+210        
+211  
+212      if @config.redis?.constructor?.name is "RedisClient"
+213        return new RSMQ( client: @config.redis, ns: @config.redisPrefix )
+214      else
+215        return new RSMQ( host: @config.host, port: @config.port, options: @config.options, ns: @config.redisPrefix )
+216  
+217  
+218  
@@ -479,7 +696,7 @@

-

rsmq-worker._initQueue()

+

RSMQWorker._initQueue()

check if the given queue exists

@@ -492,194 +709,339 @@

-
146  
-147    _initQueue: =>
-148      @queue.createQueue qname: @queuename, ( err, resp )=>
-149        if err?.name is "queueExists"
-150          @ready = true
-151          @emit "ready"
-152          @_runOfflineMessages()
-153          return
-154  
-155        throw err if err
-156  
-157        if resp is 1
-158          @debug "queue created"
-159        else
-160          @debug "queue allready existed"
-161  
-162        @ready = true
-163        @emit "ready"
-164  
-165  
-166  
+
225  
+226    _initQueue: =>
+227      @queue.createQueue qname: @queuename, ( err, resp )=>
+228        if err?.name is "queueExists"
+229          @ready = true
+230          @emit "ready"
+231          @_runOfflineMessages()
+232          return
+233  
+234        throw err if err
+235  
+236        if resp is 1
+237          @debug "queue created"
+238        else
+239          @debug "queue allready existed"
+240  
+241        @ready = true
+242        @emit "ready"
+243  
+244  
+245  
- +

after the ready has been fired run saved messages

-
165  
-166        @_runOfflineMessages()
-167        return
-168      return
-169  
-170  
-171  
+
244  
+245        @_runOfflineMessages()
+246        return
+247      return
+248  
+249  
+250  
-
+

- - start + + _send

-

rsmq-worker.start()

+

RSMQWorker._send( msg, delay )

-

Start the worker

+

Internal send method that directly calls rsmq.sendMessage() .

+
Params
+
+ msg + String + The message content +
+
+ delay + Number + The message delay to hide this message for the next x seconds. +
API
- public + private
-
178  
-179    start: =>
-180      if @ready
-181        @interval()
-182        return
-183      @on "ready", @interval
-184      return @
-185  
-186  
-187    send: ( msg, delay = 0 )=>
-188      if @queue.connected
-189        @_send( msg, delay )
-190      else
-191        @debug "store message during redis offline time", msg, delay
-192        @offlineQueue.push( msg: msg, delay: delay )
-193      return @
-194  
-195    _send: ( msg, delay )=>
-196      @queue.sendMessage { qname: @queuename, message: msg, delay: delay }, ( err, resp )=>
-197        if err
-198          @error "send pending queue message", err
-199          return
-200        @emit "new", resp
-201        return
-202      return
-203  
-204    _runOfflineMessages: =>
-205      if @offlineQueue.length
-206        _aq = async.queue( ( sndData, cb )=>
-207          @debug "run offline stored message", arguments
-208          @_send( sndData.msg, sndData.delay )
-209          cb()
-210          return
-211        , 3 )
-212        for sndData in @offlineQueue
-213          @debug "queue offline stored message", sndData
-214          _aq.push sndData
-215      return
-216  
-217    receive: ( _useInterval = false )=>
-218      @debug "start receive"
-219      @queue.receiveMessage { qname: @queuename, vt: @config.invisibletime }, ( err, msg )=>
-220        @debug "received", msg
-221        if err
-222          @emit( "next", true ) if _useInterval
-223          @error "receive queue message", err
-224          return
-225  
-226        if msg?.id
-227          @emit "data", msg
-228          _id = msg.id
-229          _fnNext = ( del = true )=>
-230            @del( _id ) if del
-231            @emit( "next" ) if _useInterval
-232            return
-233          @emit "message", msg.message, _fnNext, _id
-234        else
-235          @emit( "next", true ) if _useInterval
-236        return
-237      return
-238  
-239    del: ( id )=>
-240      @queue.deleteMessage qname: @queuename, id: id, ( err, resp )=>
-241        if err
-242          @error "delete queue message", err
-243          return
-244        @debug "delete queue message", resp
-245        @emit( "deleted", id )
-246        return
-247      return @
-248  
-249    check: ( msg )=>
-250      if @config.customExceedCheck?( msg )
-251        return
-252  
-253      if msg.rc >= @config.maxReceiveCount
-254        @emit "exceeded", msg
-255        @warning "message received more than #{@config.maxReceiveCount} times. So delete it", msg
-256        @del( msg.id )
-257      return
-258  
-259    interval: =>
-260      @debug "run interval"
-261      @receive( true )
-262      return
-263  
-264    next: ( wait = false )=>
-265      if not wait
-266        @waitCount = 0
-267  
-268      if _.isArray( @config.interval )
-269        _timeout = if @config.interval[ @waitCount ]? then @config.interval[ @waitCount ] else _.last( @config.interval )
-270      else
-271        if wait
-272          _timeout = @config.interval
-273        else
-274          _timeout = 0
-275      
-276      @debug "wait", @waitCount, _timeout * 1000
-277      if _timeout >= 0
-278        clearTimeout( @timeout ) if @timeout?
-279        @timeout = _.delay( @interval, _timeout * 1000 )
-280        @waitCount++
-281      else
-282        @interval()
-283      return
-284  
-285    stop: =>
-286      clearTimeout( @timeout ) if @timeout?
-287      return @
-288  
-289  
-290  
+
260  
+261    _send: ( msg, delay )=>
+262      @queue.sendMessage { qname: @queuename, message: msg, delay: delay }, ( err, resp )=>
+263        if err
+264          @error "send pending queue message", err
+265          return
+266        @emit "new", resp
+267        return
+268      return
+269  
+270  
+271  
+ + + +
+
+
+

+ + _runOfflineMessages +

+
+ +
+

RSMQWorker._runOfflineMessages()

+ +

Runn all messages collected by .send() while redis has been offline

+
+
+
API
+
+ private + +
+
+
+ +
278  
+279    _runOfflineMessages: =>
+280      if @offlineQueue.length
+281        _aq = async.queue( ( sndData, cb )=>
+282          @debug "run offline stored message", arguments
+283          @_send( sndData.msg, sndData.delay )
+284          cb()
+285          return
+286        , 3 )
+287        for sndData in @offlineQueue
+288          @debug "queue offline stored message", sndData
+289          _aq.push sndData
+290      return
+291  
+292  
+293  
+ + + +
+
+
+

+ + receive +

+
+ +
+

RSMQWorker.receive( _useInterval )

+ +

Receive a message

+
+
+
Params
+
+ _us + Boolean + Fire a next event to call e new receive on the call of next() +
+
API
+
+ private + +
+
+
+ +
302  
+303    receive: ( _useInterval = false )=>
+304      @debug "start receive"
+305      @queue.receiveMessage { qname: @queuename, vt: @config.invisibletime }, ( err, msg )=>
+306        @debug "received", msg
+307        if err
+308          @emit( "next", true ) if _useInterval
+309          @error "receive queue message", err
+310          return
+311  
+312        if msg?.id
+313          @emit "data", msg
+314          _id = msg.id
+315          _fnNext = ( del = true )=>
+316            @del( _id ) if del
+317            @emit( "next" ) if _useInterval
+318            return
+319          @emit "message", msg.message, _fnNext, _id
+320        else
+321          @emit( "next", true ) if _useInterval
+322        return
+323      return
+324  
+325  
+326  
+ + + +
+
+
+

+ + check +

+
+ +
+

RSMQWorker.check( msg )

+ +

Check if a message has been received to often and has to be deleted

+
+
+
Params
+
+ msg + Object + The raw rsmq message +
+
API
+
+ private + +
+
+
+ +
335  
+336    check: ( msg )=>
+337      if @config.customExceedCheck?( msg )
+338        return
+339  
+340      if msg.rc >= @config.maxReceiveCount
+341        @emit "exceeded", msg
+342        @warning "message received more than #{@config.maxReceiveCount} times. So delete it", msg
+343        @del( msg.id )
+344      return
+345  
+346  
+347  
+ + + +
+
+
+

+ + interval +

+
+ +
+

RSMQWorker.interval()

+ +

call receive the intervall

+
+
+
API
+
+ private + +
+
+
+ +
354  
+355    interval: =>
+356      @debug "run interval"
+357      @receive( true )
+358      return
+359  
+360  
+361  
+ + + +
+
+ + +
+

RSMQWorker.next( [wait] )

+ +

Call the next recieve or wait until the next recieve has to be called

+
+
+
Params
+
+ [wait=false] + Boolean + Tell the next call that the last receive was empty to increase the wait time +
+
API
+
+ private + +
+
+
+ +
370  
+371    next: ( wait = false )=>
+372      if not wait
+373        @waitCount = 0
+374  
+375      if _.isArray( @config.interval )
+376        _timeout = if @config.interval[ @waitCount ]? then @config.interval[ @waitCount ] else _.last( @config.interval )
+377      else
+378        if wait
+379          _timeout = @config.interval
+380        else
+381          _timeout = 0
+382      
+383      @debug "wait", @waitCount, _timeout * 1000
+384      if _timeout >= 0
+385        clearTimeout( @timeout ) if @timeout?
+386        @timeout = _.delay( @interval, _timeout * 1000 )
+387        @waitCount++
+388      else
+389        @interval()
+390      return
+391  
+392  
+393  
+394  
- +

export this class

-
289  
-290  module.exports = rsmqWorker
-291  
+
393  
+394  module.exports = RSMQWorker
+395  
diff --git a/_docs/doc-filelist.js b/_docs/doc-filelist.js index 4d02b10..b02e50c 100644 --- a/_docs/doc-filelist.js +++ b/_docs/doc-filelist.js @@ -1 +1 @@ -var tree={"files":["README.md"],"dirs":{"_src":{"files":["index.coffee"],"dirs":{"lib":{"files":["rsmq-worker.coffee"]},"test":{"files":["example.coffee","main.coffee","sender.coffee","utils.coffee"]}}}}}; \ No newline at end of file +var tree={"files":["README.md"],"dirs":{"_src":{"dirs":{"example":{"files":["example.coffee","sender.coffee"]},"lib":{"files":["rsmq-worker.coffee"]},"test":{"files":["main.coffee","utils.coffee"]}},"files":["index.coffee"]}}}; \ No newline at end of file diff --git a/_src/lib/rsmq-worker.coffee b/_src/lib/rsmq-worker.coffee index b068e21..91d622e 100644 --- a/_src/lib/rsmq-worker.coffee +++ b/_src/lib/rsmq-worker.coffee @@ -1,4 +1,4 @@ -# # rsmqWorker +# # RSMQWorker # ### extends [NPM:MPBasic](https://cdn.rawgit.com/mpneuried/mpbaisc/master/_docs/index.coffee.html) @@ -13,35 +13,35 @@ _ = require("lodash") async = require("async") RSMQ = require("rsmq") -class rsmqWorker extends require( "mpbasic" )() +class RSMQWorker extends require( "mpbasic" )() # ## defaults defaults: => return @extend super, - # **rsmqWorker.interval** *Number[]* An Array of increasing wait times in seconds + # **RSMQWorker.interval** *Number[]* An Array of increasing wait times in seconds interval: [ 0, 1, 5, 10 ] - # **rsmqWorker.maxReceiveCount** *Number* Receive count until a message will be exceeded + # **RSMQWorker.maxReceiveCount** *Number* Receive count until a message will be exceeded maxReceiveCount: 10 - # **rsmqWorker.invisibletime** *Number* A time in seconds to hide a message after it has been received. + # **RSMQWorker.invisibletime** *Number* A time in seconds to hide a message after it has been received. invisibletime: 30 - # **rsmqWorker.autostart** *Boolean* Autostart the worker on init + # **RSMQWorker.autostart** *Boolean* Autostart the worker on init autostart: false - # **rsmqWorker.customExceedCheck** *Function* A custom function, with the message id and content as argument to build a custom exceed check + # **RSMQWorker.customExceedCheck** *Function* A custom function, with the message id and content as argument to build a custom exceed check customExceedCheck: null - # **rsmqWorker.rsmq** *RedisSMQ* A allready existing rsmq instance to use instead of creating a new client + # **RSMQWorker.rsmq** *RedisSMQ* A allready existing rsmq instance to use instead of creating a new client rsmq: null - # **rsmqWorker.redis** *RedisClient* A allready existing redis client instance to use if no `rsmq` instance has been defiend + # **RSMQWorker.redis** *RedisClient* A allready existing redis client instance to use if no `rsmq` instance has been defiend redis: null - # **rsmqWorker.redisPrefix** *String* The redis Prefix for rsmq if no `rsmq` instance has been defiend + # **RSMQWorker.redisPrefix** *String* The redis Prefix for rsmq if no `rsmq` instance has been defiend redisPrefix: "" - # **rsmqWorker.host** *String* Host to connect to redis if no `rsmq` or `redis` instance has been defiend + # **RSMQWorker.host** *String* Host to connect to redis if no `rsmq` or `redis` instance has been defiend host: "localhost" - # **rsmqWorker.host** *Number* Port to connect to redis if no `rsmq` or `redis` instance has been defiend + # **RSMQWorker.host** *Number* Port to connect to redis if no `rsmq` or `redis` instance has been defiend port: 6379 - # **rsmqWorker.options** *Object* Options to connect to redis if no `rsmq` or `redis` instance has been defiend + # **RSMQWorker.options** *Object* Options to connect to redis if no `rsmq` or `redis` instance has been defiend options: {} ### @@ -66,10 +66,90 @@ class rsmqWorker extends require( "mpbasic" )() @debug "config", @config return + ### + ## start + + `RSMQWorker.start()` + + Start the worker + + @return { RedisSMQ } A rsmq instance + + @return { RSMQWorker } The instance itself for chaining. + + @api public + ### + start: => + if @ready + @interval() + return + @on "ready", @interval + return @ + + ### + ## stop + + `RSMQWorker.stop()` + + Stop the worker receiving messages + + @return { RSMQWorker } The instance itself for chaining. + + @api public + ### + stop: => + clearTimeout( @timeout ) if @timeout? + return @ + + ### + ## send + + `RSMQWorker.send( msg [, delay] )` + + Helper/Convinience method to send a new message to the queue. + + @param { String } msg The message content + @param { Number } [delay=0] The message delay to hide this message for the next `x` seconds. + + @return { RSMQWorker } The instance itself for chaining. + + @api public + ### + send: ( msg, delay = 0 )=> + if @queue.connected + @_send( msg, delay ) + else + @debug "store message during redis offline time", msg, delay + @offlineQueue.push( msg: msg, delay: delay ) + return @ + + ### + ## del + + `RSMQWorker.del( id )` + + Delete a messge from queue. This is usually done automatically unless you call `next(false)` + + @param { String } id The rsmq message id + + @return { RSMQWorker } The instance itself for chaining. + + @api public + ### + del: ( id )=> + @queue.deleteMessage qname: @queuename, id: id, ( err, resp )=> + if err + @error "delete queue message", err + return + @debug "delete queue message", resp + @emit( "deleted", id ) + return + return @ + ### ## _initRSMQ - `rsmq-worker._initRSMQ()` + `RSMQWorker._initRSMQ()` Initialize rsmq and handle disconnects @@ -109,7 +189,7 @@ class rsmqWorker extends require( "mpbasic" )() ### ## _getRsmq - `rsmq-worker._getRsmq( [forceInit] )` + `RSMQWorker._getRsmq( [forceInit] )` get or init the rsmq instance @@ -133,11 +213,10 @@ class rsmqWorker extends require( "mpbasic" )() else return new RSMQ( host: @config.host, port: @config.port, options: @config.options, ns: @config.redisPrefix ) - ### ## _initQueue - `rsmq-worker._initQueue()` + `RSMQWorker._initQueue()` check if the given queue exists @@ -167,30 +246,17 @@ class rsmqWorker extends require( "mpbasic" )() return ### - ## start + ## _send - `rsmq-worker.start()` + `RSMQWorker._send( msg, delay )` - Start the worker + Internal send method that directly calls `rsmq.sendMessage()` . - @api public + @param { String } msg The message content + @param { Number } delay The message delay to hide this message for the next `x` seconds. + + @api private ### - start: => - if @ready - @interval() - return - @on "ready", @interval - return @ - - - send: ( msg, delay = 0 )=> - if @queue.connected - @_send( msg, delay ) - else - @debug "store message during redis offline time", msg, delay - @offlineQueue.push( msg: msg, delay: delay ) - return @ - _send: ( msg, delay )=> @queue.sendMessage { qname: @queuename, message: msg, delay: delay }, ( err, resp )=> if err @@ -200,6 +266,15 @@ class rsmqWorker extends require( "mpbasic" )() return return + ### + ## _runOfflineMessages + + `RSMQWorker._runOfflineMessages()` + + Runn all messages collected by `.send()` while redis has been offline + + @api private + ### _runOfflineMessages: => if @offlineQueue.length _aq = async.queue( ( sndData, cb )=> @@ -213,6 +288,17 @@ class rsmqWorker extends require( "mpbasic" )() _aq.push sndData return + ### + ## receive + + `RSMQWorker.receive( _useInterval )` + + Receive a message + + @param { Boolean } _us Fire a `next` event to call e new receive on the call of `next()` + + @api private + ### receive: ( _useInterval = false )=> @debug "start receive" @queue.receiveMessage { qname: @queuename, vt: @config.invisibletime }, ( err, msg )=> @@ -235,16 +321,17 @@ class rsmqWorker extends require( "mpbasic" )() return return - del: ( id )=> - @queue.deleteMessage qname: @queuename, id: id, ( err, resp )=> - if err - @error "delete queue message", err - return - @debug "delete queue message", resp - @emit( "deleted", id ) - return - return @ - + ### + ## check + + `RSMQWorker.check( msg )` + + Check if a message has been received to often and has to be deleted + + @param { Object } msg The raw rsmq message + + @api private + ### check: ( msg )=> if @config.customExceedCheck?( msg ) return @@ -255,11 +342,31 @@ class rsmqWorker extends require( "mpbasic" )() @del( msg.id ) return + ### + ## interval + + `RSMQWorker.interval()` + + call receive the intervall + + @api private + ### interval: => @debug "run interval" @receive( true ) return + ### + ## next + + `RSMQWorker.next( [wait] )` + + Call the next recieve or wait until the next recieve has to be called + + @param { Boolean } [wait=false] Tell the next call that the last receive was empty to increase the wait time + + @api private + ### next: ( wait = false )=> if not wait @waitCount = 0 @@ -281,9 +388,6 @@ class rsmqWorker extends require( "mpbasic" )() @interval() return - stop: => - clearTimeout( @timeout ) if @timeout? - return @ #export this class -module.exports = rsmqWorker \ No newline at end of file +module.exports = RSMQWorker \ No newline at end of file diff --git a/index.js b/index.js index 7989efd..70cf00c 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,5 @@ (function() { - exports.version = '0.0.4'; + exports.version = '0.1.2'; module.exports = require('./lib/rsmq-worker'); diff --git a/package.json b/package.json index aa6f415..f8b491a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rsmq-worker", - "version": "0.1.0", + "version": "0.1.2", "description": "RSMQ helper to simply implement a worker around the message queue", "keywords": [], "homepage": "https://github.com/mpneuried/rsmq-worker",