Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kokeksibir committed Feb 19, 2015
0 parents commit 74dcfd8
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 0 deletions.
92 changes: 92 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# beygir
Simple worker queue implemention with combination of redis pub-sub and worker-farm.

beygir depends on [Worker Farm](https://github.com/rvagg/node-worker-farm), a very simple useful and stable child process controller package. (Available in npm with name node-worker-farm). On start up beygir subscribes to redis pubsub channels that will enqueue tasks to workers.

## Installation
beygir is available in npm

```bash
$ npm install beygir
```

## Quick Start

Instantiate beygir by requiring package

```js
var beygir = require('beygir');
```

Then start service by calling start with configuration object

```js
beygir.start({
//keys in configuration is used as Redis pubsub channel name
'worker_queue' : {
/* worker property contains path to js file to be run in
* child process see next section for worker function */
worker: require.resolve('path/for/worker/task/js_file'),

callback: function some_function(error, result){
/* this function is provided as last argument to worker
* function */
},

options: {
/* options passed to worker-farm function directly */
/* see worker-farm for details */
}
},
'task_*': {
/* to subscribe to a pattern for channel name you need to
* set pattern property to true */
pattern: true,

worker: require.resolve('another/worker/task/js_file')

/* callback and options properties are not required */
}
})
```

If you need to trigger a worker manually, you can call

```js
beygir.trigger('worker_queue')
```

If you need to terminate a queue

```js
beygir.end('worker_queue')
```

Finally to shutdown beygir, call end method without arguments.

```js
beygir.end()
```

### Worker Files
Worker file should export a function to be run in child process with signature below

```js
module.exports = function worker(input, channel, callback) {
/*
*
*/
callback();
}
```

**input** argument contains the message received from Redis pubsub

**channel** argument contains the channel name that message is received from

**callback** is the function provided with start method





71 changes: 71 additions & 0 deletions main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
var redis = require('redis'),
workerFarm = require('worker-farm'),
_ = require('lodash'),
events = require('events'),
http = require('http'),
listener = redis.createClient(),
broadcaster = redis.createClient(),
initialized = false,
workers = {}, callbacks = {};

var beygir = {
start: function(config) {
if(initialized) {
throw new Error();
}
_.forEach(config, function(item, name){
if(item.pattern) {
listener.psubscribe(name);
}
else {
listener.subscribe(name);
}
workers[name] = workerFarm(item.options, item.worker);
callbacks[name] = _.isFunction(item.callback) ? item.callback : function(err){if(err) console.log(err.message);};
});

listener.on('message', function(channel, message){
if(_.isFunction(workers[channel])) {
workers[channel](JSON.parse(message), channel, callbacks[channel]);
}
});
listener.on('pmessage', function(pattern, channel, message){
if(_.isFunction(workers[pattern])) {
workers[pattern](JSON.parse(message), channel, callbacks[pattern]);
}
});

initialized = true;
return initialized;
},
end: function(name){
if(name && workers.hasOwnProperty(name)) {
workerFarm.end(workers[name]);
return;
}
else {
_.forOwn(workers, function(worker) {
workerFarm.end(worker);
});
workers = {}; callbacks = {};
listener.removeAllListeners('message');
listener.removeAllListeners('pmessage');
initialized = false;
}
},
createServer: function(config) {
if(initialized === false)
beygir.start(config);
setInterval(function() {}, 3000);
},
enqueue: function(name, message) {
broadcaster.publish(name, JSON.stringify(message));
},
trigger: function(name, message) {
if(name && workers.hasOwnProperty(name)) {
workers[name](message, name, callbacks[name]);
}
}
};

module.exports = beygir;
36 changes: 36 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "beygir",
"version": "0.2.0",
"description": "Simple worker queue implemention with combination of redis pub-sub and worker-farm",
"keywords": [
"worker queue",
"redis worker",
"node task queue",
"nodejs worker queue"
],
"homepage": "https://github.com/ytlabs/beygir",
"main": "main.js",
"scripts": {
"test": "mocha"
},
"repository": {
"type": "git",
"url": "git://github.com/ytlabs/beygir.git"
},
"bugs": {
"url": "https://github.com/ytlabs/beygir/issues"
},
"license": "MIT",
"author": {
"name": "Kamil Sulubulut",
"url": "http://github.com/kokeksibir/"
},
"dependencies": {
"lodash": "^2.4.1",
"redis": "^0.12.1",
"worker-farm": "^1.0.1"
},
"devDependencies": {
"mocha": "^2.0.1"
}
}
18 changes: 18 additions & 0 deletions sample_server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var beygir = require('./main');

beygir.start({
'log': {
worker: require.resolve('./test/test_worker'),
callback: function(err, res) {
if(err) throw Error(err);
console.log(res);
}
}
})

var counter = 10;
setInterval(function(){
console.log('send');
beygir.enqueue('log', {test: counter});
counter ++;
}, 1000);
42 changes: 42 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var redisClient = require('redis').createClient(),
assert = require("assert"),
beygir = require('../');


describe('Start beygir with log event and test_worker', function(){
it('start should return true', function(done){
assert.equal(true,
//beygir started here
beygir.start({
'log': {
worker: require.resolve('./test_worker'),
callback: function(err, res) {
if(err) throw Error(err);
console.log(res);
}
},
'plog:*': {
pattern: true,
worker: require.resolve('./test_worker'),
callback: function(err, res) {
if(err) throw Error(err);
console.log(res);
}
}
})
);
done();
});
it('Send new log event through redis', function(done){
beygir.enqueue('log', 'TEST');
setTimeout(done, 100);
});
it('Send new log event through redis', function(done){
beygir.enqueue('plog:12', 'TEST');
setTimeout(done, 100);
});
it('Send new log event through redis', function(done){
beygir.enqueue('plog:15', 'TEST');
setTimeout(done, 100);
});
});
4 changes: 4 additions & 0 deletions test/test_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = function(input, trigger, callback) {
console.log(input);
if(callback) callback(null, 'OK for '+trigger);
}

0 comments on commit 74dcfd8

Please sign in to comment.