Skip to content

Commit

Permalink
script should be able to call the callback multiple times in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
bjrmatos committed May 1, 2019
1 parent 6a5bf73 commit 4facdd6
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 11 deletions.
1 change: 1 addition & 0 deletions lib/manager-processes.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ScriptsManager.prototype.execute = function (inputs, options, cb) {

worker.send(messageHandler.serialize({
action: 'callback-response',
cid: m.cid,
params: args
}))
})
Expand Down
4 changes: 3 additions & 1 deletion lib/manager-servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

var childProcess = require('child_process')
var path = require('path')
var uuid = require('uuid').v1
var uuid = require('uuid').v4
var request = require('request')
var cluster = require('cluster')
var netCluster = require('net-cluster')
Expand Down Expand Up @@ -119,8 +119,10 @@ ScriptsManager.prototype.start = function (cb) {
if (args.length && args[0]) {
args[0] = args[0].message
}

self.workersCluster.send(messageHandler.serialize({
action: 'callback-response',
cid: m.cid,
rid: m.rid,
params: args
}))
Expand Down
14 changes: 12 additions & 2 deletions lib/worker-processes.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
var uuid = require('uuid').v4
var messageHandler = require('./messageHandler')

process.on('uncaughtException', function (err) {
Expand All @@ -9,14 +10,19 @@ process.on('uncaughtException', function (err) {
process.exit()
})

var cb
var cbs = {}

function callback () {
cb = arguments[arguments.length - 1]
var cid = uuid()

cbs[cid] = arguments[arguments.length - 1]

var args = Array.prototype.slice.call(arguments)
args.pop()

process.send(messageHandler.serialize({
action: 'callback',
cid: cid,
pid: process.pid,
params: args.sort()
}))
Expand Down Expand Up @@ -50,6 +56,10 @@ process.on('message', function (rawM) {
}
}

var cb = cbs[m.cid]

delete cbs[m.cid]

return cb.apply(this, m.params)
}

Expand Down
30 changes: 22 additions & 8 deletions lib/worker-servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
var cluster = require('cluster')
// eslint-disable-next-line
var domain = require('domain')
var uuid = require('uuid').v4
var messageHandler = require('./messageHandler')

var workers = []
Expand Down Expand Up @@ -224,19 +225,31 @@ function processRequest (req, res) {
}))

try {
var cbs = {}

var callback = function () {
var cb = arguments[arguments.length - 1]
var cid = uuid()

cbs[cid] = arguments[arguments.length - 1]

callbackRequests[req.body.options.rid] = function (m) {
if (m.params.length) {
if (m.params[0]) {
m.params[0] = new Error(m.params[0])
if (!callbackRequests[req.body.options.rid]) {
callbackRequests[req.body.options.rid] = function (m) {
if (m.params.length) {
if (m.params[0]) {
m.params[0] = new Error(m.params[0])
}
}
}

cb.apply(this, m.params)
var cb = cbs[m.cid]

delete cbs[m.cid]

delete callbackRequests[req.body.options.rid]
cb.apply(this, m.params)

if (Object.keys(cbs).length === 0) {
delete callbackRequests[req.body.options.rid]
}
}
}

var args = Array.prototype.slice.call(arguments)
Expand All @@ -245,6 +258,7 @@ function processRequest (req, res) {

process.send(messageHandler.serialize({
action: 'callback',
cid: cid,
rid: req.body.options.rid,
pid: process.pid,
params: args.sort()
Expand Down
14 changes: 14 additions & 0 deletions test/scripts/parallelCallbackCalls.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
var util = require('util')

module.exports = function (inputs, callback, done) {
var promises = []

var callbackAsync = util.promisify(callback)

promises.push(callbackAsync(`${inputs.name} Matos`))
promises.push(callbackAsync(`${inputs.name} Morillo`))

Promise.all(promises).then(function (result) {
done(null, result)
}).catch(done)
}
22 changes: 22 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,28 @@ describe('scripts manager', function () {
})
})

it('should be able to handle parallel callback calls', function (done) {
var callback = function (name, cb) {
cb(null, 'hi ' + name)
}

scriptsManager.execute({
name: 'Boris'
}, {
execModulePath: path.join(__dirname, 'scripts', 'parallelCallbackCalls.js'),
callback: callback
}, function (err, res) {
if (err) {
return done(err)
}

res[0].should.be.eql('hi Boris Matos')
res[1].should.be.eql('hi Boris Morillo')

done()
})
})

it('should be able to customize message when timeout error', function (done) {
scriptsManager.execute({ foo: 'foo' }, {
execModulePath: path.join(__dirname, 'scripts', 'timeout.js'),
Expand Down

0 comments on commit 4facdd6

Please sign in to comment.