diff --git a/index.js b/index.js index 6bc5ee3..c079bed 100644 --- a/index.js +++ b/index.js @@ -16,7 +16,6 @@ const rclnodejs = require('rclnodejs'); const {Server} = require('ws'); -const NodeManager = require('./lib/node_manager.js'); const Bridge = require('./lib/bridge.js'); const debug = require('debug')('ros2-web-bridge:index'); @@ -27,7 +26,6 @@ function createServer(options) { return rclnodejs.init().then(() => { let node = rclnodejs.createNode('ros2_web_bridge'); - let nodeManager = new NodeManager(node); let bridgeMap = new Map(); function closeAllBridges() { @@ -37,7 +35,7 @@ function createServer(options) { } server.on('connection', (ws) => { - let bridge = new Bridge(nodeManager, ws); + let bridge = new Bridge(node, ws); bridgeMap.set(bridge.bridgeId, bridge); bridge.on('error', (error) => { diff --git a/lib/bridge.js b/lib/bridge.js index 411c367..7f4e33c 100644 --- a/lib/bridge.js +++ b/lib/bridge.js @@ -15,6 +15,7 @@ 'use strict'; const rclnodejs = require('rclnodejs'); +const ResourceProvider = require('./resource_provider.js'); const debug = require('debug')('ros2-web-bridge:Bridge'); const EventEmitter = require('events'); const uuidv4 = require('uuid/v4'); @@ -64,17 +65,15 @@ class MessageParser { } class Bridge extends EventEmitter { - constructor(nodeManager, ws) { + constructor(node, ws) { super(); - this._nodeManager = nodeManager; this._ws = ws; this._parser = new MessageParser(); this._bridgeId = this._generateRandomId(); this._servicesResponse = new Map(); this._closed = false; - + this._resourceProvider = new ResourceProvider(node, this._bridgeId); this._registerConnectionEvent(ws); - this._rebuildOpMap(); } @@ -98,7 +97,7 @@ class Bridge extends EventEmitter { close() { if (!this._closed) { - this._nodeManager.cleanResourceByBridgeId(this._bridgeId); + this._resourceProvider.clean(); this._servicesResponse.clear(); this._closed = true; } @@ -146,18 +145,18 @@ class Bridge extends EventEmitter { _rebuildOpMap() { this._registerOpMap('advertise', (command) => { debug(`advertise a topic: ${command.topic}`); - this._nodeManager.createPublisher(this._exractMessageType(command.type), command.topic, this._bridgeId); + this._resourceProvider.createPublisher(this._exractMessageType(command.type), command.topic); }); this._registerOpMap('unadvertise', (command) => { debug(`unadvertise a topic: ${command.topic}`); - this._nodeManager.destroyPublisher(command.topic, this._bridgeId); + this._resourceProvider.destroyPublisher(command.topic); }); this._registerOpMap('publish', (command) => { debug(`Publish a topic named ${command.topic} with ${JSON.stringify(command.msg)}`); - let publisher = this._nodeManager.getPublisherByTopic(command.topic, this._bridgeId); + let publisher = this._resourceProvider.getPublisherByTopicName(command.topic); if (publisher) { publisher.publish(command.msg); } @@ -166,21 +165,20 @@ class Bridge extends EventEmitter { this._registerOpMap('subscribe', (command) => { debug(`subscribe a topic named ${command.topic}`); - this._nodeManager.createSubscription(this._exractMessageType(command.type), - command.topic, - this._bridgeId, - this._sendSubscriptionResponse.bind(this)); + this._resourceProvider.createSubscription(this._exractMessageType(command.type), + command.topic, + this._sendSubscriptionResponse.bind(this)); }); this._registerOpMap('unsubscribe', (command) => { debug(`unsubscribe a topic named ${command.topic}`); - this._nodeManager.destroySubscription(command.topic, this._bridgeId); + this._resourceProvider.destroySubscription(command.topic); }); this._registerOpMap('call_service', (command) => { let serviceName = command.service; let client = - this._nodeManager.createClient(this._exractServiceType(command.args.type), serviceName, this._bridgeId); + this._resourceProvider.createClient(this._exractServiceType(command.args.type), serviceName); if (client) { client.sendRequest(command.args.request, (response) => { @@ -194,9 +192,9 @@ class Bridge extends EventEmitter { this._registerOpMap('advertise_service', (command) => { let serviceName = command.service; - let service = this._nodeManager.createService( + let service = this._resourceProvider.createService( this._exractServiceType(command.type), - serviceName, this._bridgeId, + serviceName, (request, response) => { let id = this._generateRandomId(); let serviceRequest = {op: 'call_service', service: command.service, args: request, id: id}; diff --git a/lib/node_manager.js b/lib/node_manager.js deleted file mode 100644 index 26b55da..0000000 --- a/lib/node_manager.js +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright (c) 2017 Intel Corporation. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -'use strict'; - -const rclnodejs = require('rclnodejs'); -const debug = require('debug')('ros2-web-bridge:NodeManager'); - -class HandleWithCallbacks { - constructor(handle) { - if (handle) { - this._handle = handle; - this._callbacks = new Map(); - this._count = 1; - } - } - - get handle() { - return this._handle; - } - - addCallback(id, callback) { - this._callbacks.set(id, callback); - } - - removeCallback(id) { - this._callbacks.delete(id); - } - - hasCallbackForId(id) { - return this._callbacks.has(id); - } - - get callbacks() { - return Array.from(this._callbacks.values()); - } - - release() { - if (this._count > 0) { - this._count--; - } - } - - retain() { - this._count++; - } - - get count() { - return this._count; - } -} - -class NodeManager { - constructor(node) { - this._node = node; - this._publishers = new Map(); - this._subscriptions = new Map(); - this._clients = new Map(); - this._services = new Map(); - } - - createPublisher(messageType, topicName, bridgeId) { - let map = this._publishers.get(bridgeId); - if (!map) { - map = new Map(); - this._publishers.set(bridgeId, map); - } - - let publisherHandle = map.get(topicName); - if (!publisherHandle) { - publisherHandle = new HandleWithCallbacks(this._node.createPublisher(messageType, topicName)); - map.set(topicName, publisherHandle); - debug(`Publisher has been created, and the topic is ${topicName}.`); - } else { - publisherHandle.retain(); - } - return publisherHandle.handle; - } - - createSubscription(messageType, topicName, bridgeId, callback) { - let subscriptionHandle = this._subscriptions.get(topicName); - if (!subscriptionHandle) { - let subscription = this._node.createSubscription(messageType, topicName, (message) => { - this._subscriptions.get(topicName).callbacks.forEach(callback => { - callback(topicName, message); - }); - }); - - subscriptionHandle = new HandleWithCallbacks(subscription); - subscriptionHandle.addCallback(bridgeId, callback); - this._subscriptions.set(topicName, subscriptionHandle); - debug(`Subscription has been created, and the topic is ${topicName}.`); - return subscriptionHandle.handle; - } - - if (!subscriptionHandle.hasCallbackForId(bridgeId)) { - subscriptionHandle.addCallback(bridgeId, callback); - subscriptionHandle.retain(); - return subscriptionHandle.handle; - } - } - - createClient(serviceType, serviceName, bridgeId) { - let map = this._clients.get(bridgeId); - if (!map) { - map = new Map(); - this._clients.set(bridgeId, map); - } - - let clientHandle = map.get(serviceName); - if (!clientHandle) { - clientHandle = new HandleWithCallbacks(this._node.createClient(serviceType, serviceName)); - map.set(serviceName, clientHandle); - debug(`Client has been created, and the service name is ${serviceName}.`); - } else { - clientHandle.retain(); - } - return clientHandle.handle; - } - - createService(serviceType, serviceName, bridgeId, callback) { - let map = this._services.get(bridgeId); - if (!map) { - map = new Map(); - this._services.set(bridgeId, map); - } - - if (!map.has(serviceName)) { - let service = this._node.createService(serviceType, serviceName, (request, response) => { - callback(request, response); - }); - map.set(serviceName, service); - return service; - } - } - - getPublisherByTopic(topicName, bridgeId) { - let map = this._publishers.get(bridgeId); - if (map) { - if (map.has(topicName)) { - return map.get(topicName).handle; - } - } - } - - getSubscriptionByTopic(topicName) { - if (this._subscripions.has(topicName)) { - return this._subscripions.get(topicName); - } - } - - destroyPublisher(topicName, bridgeId) { - let map = this._publishers.get(bridgeId); - if (map) { - let publisherHandle = map.get(topicName); - if (publisherHandle) { - publisherHandle.release(); - if (handle.count === 0) { - this._node.destroyPublisher(publisherHandle.handle); - map.delete(topicName); - debug(`Publisher is destroyed, and the topic name is ${topicName}.`); - } - } - } - } - - destroySubscription(topicName, bridgeId) { - let subscriptionHandle = this._subscriptions.get(topicName); - if (subscriptionHandle) { - if (subscriptionHandle.hasCallbackForId(bridgeId)) { - subscriptionHandle.removeCallback(bridgeId); - subscriptionHandle.release(); - } - - if (subscriptionHandle.count === 0) { - this._node.destroySubscription(subscriptionHandle.handle); - debug(`Subscription is destroyed, and the topic name is ${topicName}.`); - } - } - } - - destroyClient(serviceName, bridgeId) { - let map = this._clients.get(bridgeId); - if (map) { - let clientHandle = map.get(serviceName); - if (clientHandle) { - clientHandle.release(); - if (clientHandle.count === 0) { - this._node.destroyClient(clientHandle.handle); - map.delete(serviceName); - debug(`Client is destroyed, and the service name is ${serviceName}.`); - } - } - } - } - - destroyService(serviceName, bridgeId) { - let map = this._services.get(bridgeId); - if (map && map.has(serviceName)) { - this._node.destroyService(this._services.get(serviceName)); - map.delete(serviceName); - debug(`Service is destroyed, and the service name is ${serviceName}.`); - } - } - - cleanResourceByBridgeId(bridgeId) { - if (this._clients.has(bridgeId)) { - this._clients.get(bridgeId).forEach(clientHandle => { - this._node.destroyClient(clientHandle.handle); - debug(`Client is destroyed for bridge ${bridgeId}.`); - }); - this._clients.delete(bridgeId); - } - - if (this._publishers.has(bridgeId)) { - this._publishers.get(bridgeId).forEach(publisherHandle => { - this._node.destroyPublisher(publisherHandle.handle); - debug(`Publisher is destroyed for bridge ${bridgeId}.`); - }); - this._publishers.delete(bridgeId); - } - - if (this._services.has(bridgeId)) { - this._services.get(bridgeId).forEach(service => { - this._node.destroyService(service); - debug(`Service is destroyed for bridge ${bridgeId}.`); - }); - this._publishers.delete(bridgeId); - } - - this._subscriptions.forEach(subscriptionHandle => { - if (subscriptionHandle.hasCallbackForId(bridgeId)) { - subscriptionHandle.removeCallback(bridgeId); - subscriptionHandle.release(); - if (subscriptionHandle.count === 0) { - this._node.destroySubscription(subscriptionHandle.handle); - this._subscriptions.delete(subscriptionHandle.handle.topic); - debug(`Subscription is destroyed for bridge ${bridgeId}.`); - } - } - }); - - debug(`The bridge ${bridgeId} has been cleaned.`); - } - - get node() { - return this._node; - } - - set node(node) { - this._node = node; - } -} - -module.exports = NodeManager; diff --git a/lib/ref_counting_handle.js b/lib/ref_counting_handle.js new file mode 100644 index 0000000..53365dc --- /dev/null +++ b/lib/ref_counting_handle.js @@ -0,0 +1,60 @@ +// Copyright (c) 2017 Intel Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const debug = require('debug')('ros2-web-bridge:RefCountingHandle'); + +class RefCountingHandle { + constructor(object, destroyHandle) { + if (object) { + this._object = object; + this._count = 1; + this._destroyHandle = destroyHandle; + } + } + + get() { + return this._object; + } + + release() { + if (this._count > 0) { + if (--this._count === 0) { + this._destroyHandle(this._object); + this._object = undefined; + debug('Handle is destroyed.'); + } + } + } + + retain() { + this._count++; + } + + destroy() { + if (this._count > 0) { + this._destroyHandle(this._object); + this._count = 0; + this._object = undefined; + debug('Handle is destroyed.'); + } + } + + get count() { + return this._count; + } +} + +module.exports = RefCountingHandle; diff --git a/lib/resource_provider.js b/lib/resource_provider.js new file mode 100644 index 0000000..b74e167 --- /dev/null +++ b/lib/resource_provider.js @@ -0,0 +1,142 @@ +// Copyright (c) 2017 Intel Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const rclnodejs = require('rclnodejs'); +const SubscriptionManager = require('./subscription_manager.js'); +const RefCountingHandle = require('./ref_counting_handle.js'); +const debug = require('debug')('ros2-web-bridge:ResourceProvider'); + +class ResourceProvider { + constructor(node, bridgeId) { + SubscriptionManager.init(node); + this._bridgeId = bridgeId; + this._node = node; + this._publishers = new Map(); + this._clients = new Map(); + this._services = new Map(); + } + + getPublisherByTopicName(topicName) { + return this._publishers.get(topicName).get(); + } + + getClientByServiceName(serviceName) { + return this._clients.get(serviceName).get(); + } + + getServiceByServiceName(serviceName) { + return this._services.get(serviceName).get(); + } + + createPublisher(messageType, topicName) { + let handle = this._publishers.get(topicName); + if (!handle) { + handle = new RefCountingHandle(this._node.createPublisher(messageType, topicName), + this._node.destroyPublisher.bind(this._node)); + this._publishers.set(topicName, handle); + debug(`Publisher has been created, and the topic name is ${topicName}.`); + } else { + handle.retain(); + } + return handle.get(); + } + + createSubscription(messageType, topicName, callback) { + return SubscriptionManager.getInstance().createSubscription(messageType, topicName, this._bridgeId, callback); + } + + createClient(serviceType, serviceName) { + let handle = this._clients.get(serviceName); + if (!handle) { + handle = new RefCountingHandle(this._node.createClient(serviceType, serviceName), + this._node.destroyClient.bind(this._node)); + this._clients.set(serviceName, handle); + debug(`Client has been created, and the service name is ${serviceName}.`); + } else { + handle.retain(); + } + return handle.get(); + } + + createService(serviceType, serviceName, callback) { + let handle = this._services.get(serviceName); + if (!handle) { + handle = new RefCountingHandle(this._node.createService(serviceType, serviceName, (request, response) => { + callback(request, response); + }), + this._node.destroyService.bind(this._node)); + this._services.set(serviceName, handle); + debug(`Service has been created, and the service name is ${serviceName}.`); + } else { + handle.retain(); + } + return handle.get(); + } + + destroyPublisher(topicName) { + if (this._publishers.has(topicName)) { + let handle = this._publishers.get(topicName); + handle.release(); + this._removeInvalidHandle(this._publishers, handle, topicName); + } + } + + destroySubscription(topicName) { + SubscriptionManager.getInstance().destroySubscription(topicName, this._bridgeId); + } + + _destroySubscriptionForBridge() { + SubscriptionManager.getInstance().destroyForBridgeId(this._bridgeId); + } + + destroyClient(serviceName) { + if (this._clients.has(serviceName)) { + let handle = this._clients.get(serviceName); + handle.release(); + this._removeInvalidHandle(this._clients, handle, serviceName); + } + } + + destroyService(serviceName) { + if (this._services.has(serviceName)) { + let handle = this._services.get(serviceName); + handle.release(); + this._removeInvalidHandle(this._services, handle, serviceName); + } + } + + clean() { + this._cleanHandleInMap(this._publishers); + this._cleanHandleInMap(this._services); + this._cleanHandleInMap(this._clients); + this._destroySubscriptionForBridge(); + } + + _removeInvalidHandle(map, handle, name) { + if (handle.count === 0) { + map.delete(name); + } + } + + _cleanHandleInMap(map) { + map.forEach(handle => { + handle.destroy(); + }); + map.clear(); + } +} + +module.exports = ResourceProvider; diff --git a/lib/subscription_manager.js b/lib/subscription_manager.js new file mode 100644 index 0000000..7985c45 --- /dev/null +++ b/lib/subscription_manager.js @@ -0,0 +1,122 @@ +// Copyright (c) 2017 Intel Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const rclnodejs = require('rclnodejs'); +const RefCountingHandle = require('./ref_counting_handle.js'); +const debug = require('debug')('ros2-web-bridge:SubscriptionManager'); + +class HandleWithCallbacks extends RefCountingHandle { + constructor(object, destroyHandle) { + super(object, destroyHandle); + this._callbacks = new Map(); + } + + addCallback(id, callback) { + this._callbacks.set(id, callback); + } + + removeCallback(id) { + this._callbacks.delete(id); + } + + hasCallbackForId(id) { + return this._callbacks.has(id); + } + + get callbacks() { + return Array.from(this._callbacks.values()); + } +} + +class SubscriptionManager { + constructor(node) { + this._subscripions = new Map(); + this._node = node; + } + + getSubscriptionByTopicName(topicName) { + return this._subscripions.get(topicName).get(); + } + + createSubscription(messageType, topicName, bridgeId, callback) { + let handle = this._subscripions.get(topicName); + + if (!handle) { + let subscription = this._node.createSubscription(messageType, topicName, (message) => { + this._subscripions.get(topicName).callbacks.forEach(callback => { + callback(topicName, message); + }); + }); + handle = new HandleWithCallbacks(subscription, this._node.destroySubscription.bind(this._node)); + handle.addCallback(bridgeId, callback); + this._subscripions.set(topicName, handle); + debug(`Subscription has been created, and the topic name is ${topicName}.`); + + return handle.get(); + } + + handle.addCallback(bridgeId, callback); + handle.retain(); + return handle.get(); + } + + destroySubscription(topicName, bridgeId) { + if (this._subscripions.has(topicName)) { + let handle = this._subscripions.get(topicName); + if (handle.hasCallbackForId(bridgeId)) { + handle.removeCallback(bridgeId); + handle.release(); + if (handle.count === 0) { + this._subscripions.delete(topicName); + } + } + } + } + + destroyForBridgeId(bridgeId) { + this._subscripions.forEach(handle => { + if (handle.hasCallbackForId(bridgeId)) { + handle.removeCallback(bridgeId); + handle.release(); + this._removeInvalidHandle(); + } + }); + } + + _removeInvalidHandle() { + this._subscripions.forEach((handle, topicName, map) => { + if (handle.count === 0) { + map.delete(topicName); + } + }); + } +} + +let subscriptionManager = { + _instance: undefined, + + init(node) { + if (!this._instance) { + this._instance = new SubscriptionManager(node); + } + }, + + getInstance() { + return this._instance; + } +}; + +module.exports = subscriptionManager;