Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for millisecond precision on vt and delay #116

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion _src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,18 @@ class RedisSMQ extends EventEmitter {
this.redis.time( (err, resp) => {
if (err) { this._handleError(cb, err); return; }

// Make sure to always have correct 6digit millionth seconds from redis
const ms: any = this._formatZeroPad(Number(resp[1]), 6);
// Create the epoch time in ms from the redis timestamp
const ts = Number(resp[0] + ms.toString(10).slice(0, 3));

// Get basic attributes and counter
// Get total number of messages
// Get total number of messages in flight (not visible yet)
const mc = [
["hmget", `${key}:Q`, "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified"],
["zcard", key],
["zcount", key, resp[0] + "000", "+inf"]
["zcount", key, ts + 1, "+inf"]
];
this.redis.multi(mc).exec( (err, resp) => {
if (err) {
Expand Down Expand Up @@ -646,7 +651,12 @@ class RedisSMQ extends EventEmitter {
break;
case "vt":
case "delay":
// Allow float values and increase precision to milliseconds
o[item] = parseFloat(o[item]) * 1000;
// Remove any excessive decimal information
o[item] = parseInt(o[item],10);
// Reduce value back to seconds with correct decimal information
o[item] = o[item] / 1000;
if (_.isNaN(o[item]) || !_.isNumber(o[item]) || o[item] < 0 || o[item] > 9999999) {
this._handleError(cb, "invalidValue", {item: item, min: 0, max: 9999999});
return false;
Expand Down
6 changes: 5 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ class RedisSMQ extends EventEmitter {
this._handleError(cb, err);
return;
}
const ms = this._formatZeroPad(Number(resp[1]), 6);
const ts = Number(resp[0] + ms.toString(10).slice(0, 3));
const mc = [
["hmget", `${key}:Q`, "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified"],
["zcard", key],
["zcount", key, resp[0] + "000", "+inf"]
["zcount", key, ts + 1, "+inf"]
];
this.redis.multi(mc).exec((err, resp) => {
if (err) {
Expand Down Expand Up @@ -455,7 +457,9 @@ class RedisSMQ extends EventEmitter {
break;
case "vt":
case "delay":
o[item] = parseFloat(o[item]) * 1000;
o[item] = parseInt(o[item], 10);
o[item] = o[item] / 1000;
if (_.isNaN(o[item]) || !_.isNumber(o[item]) || o[item] < 0 || o[item] > 9999999) {
this._handleError(cb, "invalidValue", { item: item, min: 0, max: 9999999 });
return false;
Expand Down
62 changes: 61 additions & 1 deletion test/test.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ describe 'Redis-Simple-Message-Queue Test', ->
name: "test3promises"
m1: "Hello"
m2: "World"
queue4 =
name: "test4precision"

q1m1 = null
q1m2 = null
Expand All @@ -48,6 +50,9 @@ describe 'Redis-Simple-Message-Queue Test', ->

rsmq.deleteQueue {qname: queue2.name}, (err) ->
return

rsmq.deleteQueue {qname: queue4.name}, (err) ->
return
@timeout(100)
console.log("Disconnecting Redis")
rsmq.quit()
Expand Down Expand Up @@ -77,6 +82,9 @@ describe 'Redis-Simple-Message-Queue Test', ->
rsmq.deleteQueue {qname: queue3.name}, (err) ->
return

rsmq.deleteQueue {qname: queue4.name}, (err) ->
return

setTimeout(done, 100)
return

Expand Down Expand Up @@ -241,7 +249,6 @@ describe 'Redis-Simple-Message-Queue Test', ->
return
return


it 'ListQueues: Should return array with two elements', (done) ->
rsmq.listQueues (err, resp) ->
should.not.exist(err)
Expand All @@ -252,6 +259,25 @@ describe 'Redis-Simple-Message-Queue Test', ->
return
return

it 'Create a new queue: queue4', (done) ->
rsmq.createQueue {qname: queue4.name, vt: 0.2}, (err, resp) ->
should.not.exist(err)
resp.should.equal(1)
done()
return
return

it 'ListQueues: Should return array with three elements', (done) ->
rsmq.listQueues (err, resp) ->
should.not.exist(err)
resp.length.should.equal(3)
resp.should.containEql(queue1.name)
resp.should.containEql(queue2.name)
resp.should.containEql(queue4.name)
done()
return
return

it 'Should succeed: GetQueueAttributes of queue 1', (done) ->
rsmq.getQueueAttributes {qname: queue1.name}, (err, resp) ->
should.not.exist(err)
Expand Down Expand Up @@ -647,6 +673,40 @@ describe 'Redis-Simple-Message-Queue Test', ->
return
return
return

it 'Should send a message to queue4', (done) ->
rsmq.sendMessage {qname: queue4.name , delay: 0, message: 'test'}, (err, resp) ->
should.not.exist(err)
done()
return
return

it 'Should receive a message to queue4 with vt 200ms', (done) ->
rsmq.receiveMessage {qname: queue4.name , vt: 0.2}, (err, resp) ->
should.not.exist(err)
resp.message.should.equal('test')
done()
return
return

it 'getQueueAttributes: Should return the queue4 with 1 hiddenmsgs', (done) ->
rsmq.getQueueAttributes {qname: queue4.name}, (err, resp) ->
resp.hiddenmsgs.should.equal(1)
resp.msgs.should.equal(1)
done()
return
return

it 'wait 200ms', (done) -> setTimeout(done, 200)

it 'getQueueAttributes: Should return the queue4 with 0 hiddenmsgs', (done) ->
rsmq.getQueueAttributes {qname: queue4.name}, (err, resp) ->
resp.hiddenmsgs.should.equal(0)
resp.msgs.should.equal(1)
done()
return
return

return

describe 'Realtime Pub/Sub notifications', ->
Expand Down
Loading