Skip to content

Commit

Permalink
feat: message bus improvements (#322)
Browse files Browse the repository at this point in the history
* adds onClose and onError callbacks to Message Bus connect
* custom message bus error callbacks are no longer overwritten
* only reject connect promise if never connected (later stage errors are not sent to the reject handler)
* cleanup websocket reference on socket error
* include SDK name, version in HTTP User-Agent for API requests
  • Loading branch information
zebehringer authored Jun 14, 2023
1 parent 9320bc9 commit 512eb82
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 13 deletions.
12 changes: 10 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
## [v5.3.1](http://github.com/ndustrialio/contxt-sdk-js/tree/v4.3.1) (2022-03-17)
## [v5.5.0](http://github.com/ndustrialio/contxt-sdk-js/tree/v5.5.0) (2023-06-14)

**Changed**

- adds onClose and onError callbacks to Message Bus connect
- custom message bus error callbacks are no longer overwritten
- adds app and contxt lib version as API requests User-Agent header

## [v5.4.0](http://github.com/ndustrialio/contxt-sdk-js/tree/v5.4.0) (2022-03-17)

**Changed**

- exposed `peek` operation on message bus channel for a given subscription

## [v5.3.0](http://github.com/ndustrialio/contxt-sdk-js/tree/v4.3.1) (2022-03-17)
## [v5.3.0](http://github.com/ndustrialio/contxt-sdk-js/tree/v5.3.0) (2022-03-17)

**Changed**

Expand Down
38 changes: 31 additions & 7 deletions src/bus/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,27 @@ class Bus {
* If a connection already exists for that organization id, the connection is returned, otherwise a new connection is created and returned.
*
* @param {string} organizationId UUID corresponding with an organization
* @param {function} onClose optional callback to be executed when the connection is closed
* @param {function} onError optional callback to be executed when the connection encounters an error
*
* @returns {Promise}
* @fulfill {WebSocketConnection}
* @reject {errorEvent} The error event
*
* @example
* contxtSdk.bus.connect('4f0e51c6-728b-4892-9863-6d002e61204d')
* contxtSdk.bus.connect('4f0e51c6-728b-4892-9863-6d002e61204d', (orgId, evt) => {
* console.log(`connection closed: ${evt}`);
* }, (orgId, evt) => {
* console.log(`connection error: ${evt}`);
* })
* .then((webSocket) => {
* console.log(webSocket);
* })
* .catch((errorEvent) => {
* console.log(errorEvent);
* });
*/
connect(organizationId) {
connect(organizationId, onClose, onError) {
return new Promise((resolve, reject) => {
if (this._webSockets[organizationId]) {
return resolve(this._webSockets[organizationId]);
Expand Down Expand Up @@ -124,13 +130,31 @@ class Bus {
resolve(this._webSockets[organizationId]);
};

ws.onclose = (event) => {
ws.addEventListener("close", (event) => {
this._webSockets[organizationId] = null;
};
if (onClose) {
try {
onClose(organizationId, event);
} catch (ex) {
console.log('Message Bus Error calling onClose callback: ', ex)
}
}
});

ws.onerror = (errorEvent) => {
reject(errorEvent);
};
ws.addEventListener("error", (errorEvent) => {
const connected = this._webSockets[organizationId] != null;
this._webSockets[organizationId] = null;
if (!connected) {
reject(errorEvent)
}
if (onError) {
try {
onError(organizationId, errorEvent);
} catch (ex) {
console.log('Message Bus Error calling onError callback: ', ex)
}
}
});
})
.catch((err) => {
reject(err);
Expand Down
32 changes: 29 additions & 3 deletions src/bus/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ describe('Bus', function() {
let promise;
let sdk;
let server;
let testOnClose;
let testOnClose2;

beforeEach(function() {
expectedApiToken = faker.internet.password();
Expand All @@ -160,10 +162,16 @@ describe('Bus', function() {
`${expectedHost}/organizations/${expectedOrganization.id}/stream`
);

testOnClose = sinon.stub();
testOnClose2 = sinon.stub();

bus = new Bus(sdk, baseRequest);
bus._baseWebSocketUrl = expectedHost;

promise = bus.connect(expectedOrganization.id);
promise = bus.connect(
expectedOrganization.id,
testOnClose
);
});

afterEach(function() {
Expand Down Expand Up @@ -191,10 +199,19 @@ describe('Bus', function() {
context('when the connection closes', function() {
beforeEach(function() {
return promise.then(() => {
const ws = bus._webSockets[expectedOrganization.id];
ws._webSocket.addEventListener("close", testOnClose2);
server.close();
});
});

it('calls the onClose callback', function() {
return promise.then(() => {
expect(testOnClose).to.have.been.calledOnce;
expect(testOnClose2).to.have.been.calledOnce;
});
});

it('clears out the stored copy of the websocket', function() {
return promise.then(() => {
expect(bus._webSockets[expectedOrganization.id]).to.be.null;
Expand Down Expand Up @@ -237,6 +254,7 @@ describe('Bus', function() {
let promise;
let sdk;
let server;
let testOnError;

beforeEach(function() {
expectedApiToken = faker.internet.password();
Expand All @@ -259,10 +277,16 @@ describe('Bus', function() {
}
);

testOnError = sinon.stub();

const bus = new Bus(sdk, baseRequest);
bus._baseWebSocketUrl = expectedHost;

promise = bus.connect(expectedOrganization.id);
promise = bus.connect(
expectedOrganization.id,
null,
testOnError
);
});

afterEach(function() {
Expand All @@ -281,11 +305,13 @@ describe('Bus', function() {
return expect(promise).to.be.rejected;
});

it('rejects with an error event', function() {
it('rejects with an error event AND onError is called', function() {
return promise.catch((event) => {
expect(event.type).to.equal('error');
expect(testOnError).to.have.been.calledOnce;
});
});

}
);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bus/webSocketConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class WebSocketConnection {
this._autoAck = autoAcknowledge;

if (this._webSocket) {
this._webSocket.onerror = this._onError;
this._webSocket.addEventListener("error", this._onError);
this._webSocket.onmessage = this._onMessage;
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/request.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import axios from 'axios';

function loadSdkInfo() {
try {
const { name: sdkName, version: sdkVersion } = require('../package.json');
return { sdkName, sdkVersion };
} catch (e) {
return { sdkName: "@ndustrial/contxt-sdk", sdkVersion: "5.5.0+" };
}
}

const { sdkName, sdkVersion } = loadSdkInfo();
class Request {
/**
* @param {Object} sdk An instance of the SDK so the module can communicate with other modules
Expand Down Expand Up @@ -141,6 +151,7 @@ class Request {
.getCurrentApiToken(this._audienceName)
.then((apiToken) => {
config.headers.common.Authorization = `Bearer ${apiToken}`;
config.headers.common['User-Agent'] = `${process.env.npm_package_name}/${process.env.npm_package_version} (${sdkName}/${sdkVersion})`;

return config;
});
Expand Down

0 comments on commit 512eb82

Please sign in to comment.