Skip to content

Commit

Permalink
Add ioRedis support
Browse files Browse the repository at this point in the history
  • Loading branch information
itaylor committed Dec 6, 2019
1 parent 8e29daa commit 504f8f1
Show file tree
Hide file tree
Showing 6 changed files with 2,436 additions and 781 deletions.
41 changes: 37 additions & 4 deletions _src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,24 @@ class RedisSMQ extends EventEmitter {
this.realtime = opts.realtime;
this.redisns = opts.ns + ":";

if (opts.client && options.client.constructor.name === "RedisClient") {
this.redis = opts.client
if (opts.client) {
if (opts.client.constructor.name === "Redis") {
this.redis = opts.client
this.isIoRedis = true
} else if (opts.client.constructor.name === "RedisClient") {
this.redis = opts.client
}
}
else {
this.redis = RedisInst.createClient(opts)
}

this.connected = this.redis.connected || false;
if (this.isIoRedis) {
this.connected = this.redis.status === 'ready';
}
else {
this.connected = this.redis.connected || false;
}

// If external client is used it might alrdy be connected. So we check here:
if (this.connected) {
Expand Down Expand Up @@ -122,14 +132,15 @@ class RedisSMQ extends EventEmitter {
["time"]
];
this.redis.multi(mc).exec( (err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp)
if (err) { this._handleError(cb, err); return; }
if (resp[0][0] === null || resp[0][1] === null || resp[0][2] === null) {
this._handleError(cb, "queueNotFound");
return;
}
// Make sure to always have correct 6digit millionth seconds from redis
const ms: any = this._formatZeroPad(Number(resp[1][1]), 6);
// Create the epoch time in ms from the redis timestamp
// Create the epoch time in ms from the redis timestamp
const ts = Number(resp[1][0] + ms.toString(10).slice(0, 3));

const q: any = {
Expand Down Expand Up @@ -207,6 +218,7 @@ class RedisSMQ extends EventEmitter {
];

this.redis.multi(mc).exec( (err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp)
if (err) {
this._handleError(cb, err);
return
Expand Down Expand Up @@ -238,6 +250,7 @@ class RedisSMQ extends EventEmitter {
];

this.redis.multi(mc).exec( (err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp)
if (err) { this._handleError(cb, err); return; }
if (resp[0] === 1 && resp[1] > 0) {
cb(null, 1)
Expand All @@ -259,6 +272,7 @@ class RedisSMQ extends EventEmitter {
];

this.redis.multi(mc).exec( (err,resp) => {
resp = this._ioRedisMultiToRedisMulti(resp)
if (err) { this._handleError(cb, err); return; }
if (resp[0] === 0) {
this._handleError(cb, "queueNotFound");
Expand All @@ -285,6 +299,7 @@ class RedisSMQ extends EventEmitter {
["zcount", key, resp[0] + "000", "+inf"]
];
this.redis.multi(mc).exec( (err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp)
if (err) {
this._handleError(cb, err);
return;
Expand Down Expand Up @@ -530,6 +545,7 @@ class RedisSMQ extends EventEmitter {
mc.push(["zcard", key]);
}
this.redis.multi(mc).exec( (err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp)
if (err) {
this._handleError(cb, err);
return;
Expand Down Expand Up @@ -586,9 +602,26 @@ class RedisSMQ extends EventEmitter {
}

// Helpers

private _ioRedisMultiToRedisMulti (multiResult) {
if (this.isIoRedis) {
return multiResult.map((r) => {
const err = r[0]
const val = r[1]
if (err) {
throw err
}
return val
})
}
return multiResult
}


private _formatZeroPad (num, count) {
return ((Math.pow(10, count) + num) + "").substr(1);
}


private _handleError = (cb, err, data = {}) => {
// try to create a error Object with humanized message
Expand Down
36 changes: 33 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class RedisSMQ extends EventEmitter {
["time"]
];
this.redis.multi(mc).exec((err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp);
if (err) {
this._handleError(cb, err);
return;
Expand Down Expand Up @@ -98,6 +99,7 @@ class RedisSMQ extends EventEmitter {
["hsetnx", key, "modified", resp[0]],
];
this.redis.multi(mc).exec((err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp);
if (err) {
this._handleError(cb, err);
return;
Expand Down Expand Up @@ -125,6 +127,7 @@ class RedisSMQ extends EventEmitter {
["hdel", `${key}:Q`, `${options.id}`, `${options.id}:rc`, `${options.id}:fr`]
];
this.redis.multi(mc).exec((err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp);
if (err) {
this._handleError(cb, err);
return;
Expand All @@ -146,6 +149,7 @@ class RedisSMQ extends EventEmitter {
["srem", `${this.redisns}QUEUES`, options.qname]
];
this.redis.multi(mc).exec((err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp);
if (err) {
this._handleError(cb, err);
return;
Expand All @@ -172,6 +176,7 @@ class RedisSMQ extends EventEmitter {
["zcount", key, resp[0] + "000", "+inf"]
];
this.redis.multi(mc).exec((err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp);
if (err) {
this._handleError(cb, err);
return;
Expand Down Expand Up @@ -362,6 +367,7 @@ class RedisSMQ extends EventEmitter {
mc.push(["zcard", key]);
}
this.redis.multi(mc).exec((err, resp) => {
resp = this._ioRedisMultiToRedisMulti(resp);
if (err) {
this._handleError(cb, err);
return;
Expand Down Expand Up @@ -513,13 +519,24 @@ class RedisSMQ extends EventEmitter {
opts.options.port = opts.port;
this.realtime = opts.realtime;
this.redisns = opts.ns + ":";
if (opts.client && options.client.constructor.name === "RedisClient") {
this.redis = opts.client;
if (opts.client) {
if (opts.client.constructor.name === "Redis") {
this.redis = opts.client;
this.isIoRedis = true;
}
else if (opts.client.constructor.name === "RedisClient") {
this.redis = opts.client;
}
}
else {
this.redis = RedisInst.createClient(opts);
}
this.connected = this.redis.connected || false;
if (this.isIoRedis) {
this.connected = this.redis.status === 'ready';
}
else {
this.connected = this.redis.connected || false;
}
if (this.connected) {
this.emit("connect");
this.initScript();
Expand All @@ -541,6 +558,19 @@ class RedisSMQ extends EventEmitter {
});
this._initErrors();
}
_ioRedisMultiToRedisMulti(multiResult) {
if (this.isIoRedis) {
return multiResult.map((r) => {
const err = r[0];
const val = r[1];
if (err) {
throw err;
}
return val;
});
}
return multiResult;
}
_formatZeroPad(num, count) {
return ((Math.pow(10, count) + num) + "").substr(1);
}
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
},
"scripts": {
"build": "tsc",
"build:tests": "coffee -c ./",
"watch": "tsc -w",
"test": "mocha ./test/test.js"
"test": "mocha ./test/*.js"
},
"dependencies": {
"@types/redis": "^2.8.0",
Expand All @@ -20,6 +21,7 @@
"devDependencies": {
"async": "^2.6.2",
"coffeescript": "^2.4.1",
"ioredis": "^4.14.1",
"mocha": "^4.0.1",
"should": "^13.1.3",
"ts-loader": "^5.4.3",
Expand Down
Loading

0 comments on commit 504f8f1

Please sign in to comment.