From 74dcfd88c2752870bbe26f1a471a2d7959a4885c Mon Sep 17 00:00:00 2001 From: Mehmet Kamil Sulubulut Date: Thu, 19 Feb 2015 16:27:53 +0200 Subject: [PATCH] first commit --- README.md | 92 +++++++++++++++++++++++++++++++++++++++++++++ main.js | 71 ++++++++++++++++++++++++++++++++++ package.json | 36 ++++++++++++++++++ sample_server.js | 18 +++++++++ test/test.js | 42 +++++++++++++++++++++ test/test_worker.js | 4 ++ 6 files changed, 263 insertions(+) create mode 100644 README.md create mode 100644 main.js create mode 100644 package.json create mode 100644 sample_server.js create mode 100644 test/test.js create mode 100644 test/test_worker.js diff --git a/README.md b/README.md new file mode 100644 index 0000000..d900db8 --- /dev/null +++ b/README.md @@ -0,0 +1,92 @@ +# beygir +Simple worker queue implemention with combination of redis pub-sub and worker-farm. + +beygir depends on [Worker Farm](https://github.com/rvagg/node-worker-farm), a very simple useful and stable child process controller package. (Available in npm with name node-worker-farm). On start up beygir subscribes to redis pubsub channels that will enqueue tasks to workers. + +## Installation +beygir is available in npm + +```bash +$ npm install beygir +``` + +## Quick Start + +Instantiate beygir by requiring package + +```js +var beygir = require('beygir'); +``` + +Then start service by calling start with configuration object + +```js +beygir.start({ + //keys in configuration is used as Redis pubsub channel name + 'worker_queue' : { + /* worker property contains path to js file to be run in + * child process see next section for worker function */ + worker: require.resolve('path/for/worker/task/js_file'), + + callback: function some_function(error, result){ + /* this function is provided as last argument to worker + * function */ + }, + + options: { + /* options passed to worker-farm function directly */ + /* see worker-farm for details */ + } + }, + 'task_*': { + /* to subscribe to a pattern for channel name you need to + * set pattern property to true */ + pattern: true, + + worker: require.resolve('another/worker/task/js_file') + + /* callback and options properties are not required */ + } +}) +``` + +If you need to trigger a worker manually, you can call + +```js +beygir.trigger('worker_queue') +``` + +If you need to terminate a queue + +```js +beygir.end('worker_queue') +``` + +Finally to shutdown beygir, call end method without arguments. + +```js +beygir.end() +``` + +### Worker Files +Worker file should export a function to be run in child process with signature below + +```js +module.exports = function worker(input, channel, callback) { + /* + * + */ + callback(); +} +``` + +**input** argument contains the message received from Redis pubsub + +**channel** argument contains the channel name that message is received from + +**callback** is the function provided with start method + + + + + diff --git a/main.js b/main.js new file mode 100644 index 0000000..9324fb5 --- /dev/null +++ b/main.js @@ -0,0 +1,71 @@ +var redis = require('redis'), + workerFarm = require('worker-farm'), + _ = require('lodash'), + events = require('events'), + http = require('http'), + listener = redis.createClient(), + broadcaster = redis.createClient(), + initialized = false, + workers = {}, callbacks = {}; + +var beygir = { + start: function(config) { + if(initialized) { + throw new Error(); + } + _.forEach(config, function(item, name){ + if(item.pattern) { + listener.psubscribe(name); + } + else { + listener.subscribe(name); + } + workers[name] = workerFarm(item.options, item.worker); + callbacks[name] = _.isFunction(item.callback) ? item.callback : function(err){if(err) console.log(err.message);}; + }); + + listener.on('message', function(channel, message){ + if(_.isFunction(workers[channel])) { + workers[channel](JSON.parse(message), channel, callbacks[channel]); + } + }); + listener.on('pmessage', function(pattern, channel, message){ + if(_.isFunction(workers[pattern])) { + workers[pattern](JSON.parse(message), channel, callbacks[pattern]); + } + }); + + initialized = true; + return initialized; + }, + end: function(name){ + if(name && workers.hasOwnProperty(name)) { + workerFarm.end(workers[name]); + return; + } + else { + _.forOwn(workers, function(worker) { + workerFarm.end(worker); + }); + workers = {}; callbacks = {}; + listener.removeAllListeners('message'); + listener.removeAllListeners('pmessage'); + initialized = false; + } + }, + createServer: function(config) { + if(initialized === false) + beygir.start(config); + setInterval(function() {}, 3000); + }, + enqueue: function(name, message) { + broadcaster.publish(name, JSON.stringify(message)); + }, + trigger: function(name, message) { + if(name && workers.hasOwnProperty(name)) { + workers[name](message, name, callbacks[name]); + } + } +}; + +module.exports = beygir; \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..68bd27a --- /dev/null +++ b/package.json @@ -0,0 +1,36 @@ +{ + "name": "beygir", + "version": "0.2.0", + "description": "Simple worker queue implemention with combination of redis pub-sub and worker-farm", + "keywords": [ + "worker queue", + "redis worker", + "node task queue", + "nodejs worker queue" + ], + "homepage": "https://github.com/ytlabs/beygir", + "main": "main.js", + "scripts": { + "test": "mocha" + }, + "repository": { + "type": "git", + "url": "git://github.com/ytlabs/beygir.git" + }, + "bugs": { + "url": "https://github.com/ytlabs/beygir/issues" + }, + "license": "MIT", + "author": { + "name": "Kamil Sulubulut", + "url": "http://github.com/kokeksibir/" + }, + "dependencies": { + "lodash": "^2.4.1", + "redis": "^0.12.1", + "worker-farm": "^1.0.1" + }, + "devDependencies": { + "mocha": "^2.0.1" + } +} diff --git a/sample_server.js b/sample_server.js new file mode 100644 index 0000000..a6661ba --- /dev/null +++ b/sample_server.js @@ -0,0 +1,18 @@ +var beygir = require('./main'); + +beygir.start({ + 'log': { + worker: require.resolve('./test/test_worker'), + callback: function(err, res) { + if(err) throw Error(err); + console.log(res); + } + } +}) + +var counter = 10; +setInterval(function(){ + console.log('send'); + beygir.enqueue('log', {test: counter}); + counter ++; +}, 1000); diff --git a/test/test.js b/test/test.js new file mode 100644 index 0000000..f673323 --- /dev/null +++ b/test/test.js @@ -0,0 +1,42 @@ +var redisClient = require('redis').createClient(), + assert = require("assert"), + beygir = require('../'); + + +describe('Start beygir with log event and test_worker', function(){ + it('start should return true', function(done){ + assert.equal(true, + //beygir started here + beygir.start({ + 'log': { + worker: require.resolve('./test_worker'), + callback: function(err, res) { + if(err) throw Error(err); + console.log(res); + } + }, + 'plog:*': { + pattern: true, + worker: require.resolve('./test_worker'), + callback: function(err, res) { + if(err) throw Error(err); + console.log(res); + } + } + }) + ); + done(); + }); + it('Send new log event through redis', function(done){ + beygir.enqueue('log', 'TEST'); + setTimeout(done, 100); + }); + it('Send new log event through redis', function(done){ + beygir.enqueue('plog:12', 'TEST'); + setTimeout(done, 100); + }); + it('Send new log event through redis', function(done){ + beygir.enqueue('plog:15', 'TEST'); + setTimeout(done, 100); + }); +}); \ No newline at end of file diff --git a/test/test_worker.js b/test/test_worker.js new file mode 100644 index 0000000..de3d7ce --- /dev/null +++ b/test/test_worker.js @@ -0,0 +1,4 @@ +module.exports = function(input, trigger, callback) { + console.log(input); + if(callback) callback(null, 'OK for '+trigger); +} \ No newline at end of file