From 2119f8f55edb8d3da53783715532143963c01814 Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 17 Jun 2016 12:26:03 +0100 Subject: [PATCH] peer-bridge --- examples/peer-bridge/README.md | 6 + gulpfile.js | 16 +++ package.json | 17 ++- src/{public/js/app.js => app/index.js} | 0 src/app/peer-bridge.js | 91 +++++++++++++ src/index.js | 35 +---- src/sig-server/index.js | 54 ++++++++ src/sig-server/ws-server.js | 75 +++++++++++ src/ws-server.js | 34 ----- test/browser.js | 3 + test/node.js | 3 + test/peer-bridge/test-peer-bridge.js | 13 ++ test/sig-server/test-sig-server.js | 178 +++++++++++++++++++++++++ 13 files changed, 455 insertions(+), 70 deletions(-) create mode 100644 examples/peer-bridge/README.md create mode 100644 gulpfile.js rename src/{public/js/app.js => app/index.js} (100%) create mode 100644 src/app/peer-bridge.js create mode 100644 src/sig-server/index.js create mode 100644 src/sig-server/ws-server.js delete mode 100644 src/ws-server.js create mode 100644 test/browser.js create mode 100644 test/node.js create mode 100644 test/peer-bridge/test-peer-bridge.js create mode 100644 test/sig-server/test-sig-server.js diff --git a/examples/peer-bridge/README.md b/examples/peer-bridge/README.md new file mode 100644 index 0000000..0d0cd8f --- /dev/null +++ b/examples/peer-bridge/README.md @@ -0,0 +1,6 @@ +How to use Peer Bridge +====================== + +> Peer Bridge is used by Remote Performer to connect the peers together through WebRTC DataChannels, exposing a primitive to broadcast the MIDI controls. + + diff --git a/gulpfile.js b/gulpfile.js new file mode 100644 index 0000000..78822e3 --- /dev/null +++ b/gulpfile.js @@ -0,0 +1,16 @@ +'use strict' + +var gulp = require('gulp') + +var sigServer = require('./src/sig-server') +var sigS + +gulp.task('test:browser:before', (done) => { + sigS = sigServer.start(15555, done) +}) + +gulp.task('test:browser:after', (done) => { + sigS.stop(done) +}) + +require('aegir/gulp')(gulp) diff --git a/package.json b/package.json index 1ad959b..2ae334e 100644 --- a/package.json +++ b/package.json @@ -4,16 +4,17 @@ "description": "Real-time music collaboration for the web", "main": "src/index.js", "scripts": { - "build": "browserify -t brfs src/public/js/app.js > src/public/js/bundle.js", - "start": "npm run build && node src/index.js", + "build": "browserify -t brfs src/app/index.js > src/public/js/bundle.js", + "start": "npm run build && node src/sig-server/index.js", "lint": "standard", - "test": "exit 0" + "test": "gulp test", + "test:node": "gulp test:node", + "test:browser": "gulp test:browser" }, "repository": { "type": "git", "url": "https://github.com/websound/RemotePerformer.git" }, - "author": "Nathan Ward", "license": "MIT", "bugs": { "url": "https://github.com/websound/RemotePerformer/issues" @@ -32,14 +33,22 @@ "inert": "^4.0.0", "insert-css": "^0.2.0", "jquery": "^2.1.0", + "length-prefixed-stream": "^1.5.0", "process-nextick-args": "^1.0.7", + "simple-peer": "^6.0.4", + "socket.io": "^1.4.6", + "socket.io-client": "^1.4.6", "watchify": "^3.7.0", "ws": "^1.1.0" }, "devDependencies": { + "aegir": "^3.2.0", + "chai": "^3.5.0", "eslint-config-standard": "^5.1.0", "eslint-plugin-standard": "^1.3.2", + "mocha": "^2.5.3", "pre-commit": "^1.1.3", + "run-parallel": "^1.1.6", "standard": "^7.1.2" } } diff --git a/src/public/js/app.js b/src/app/index.js similarity index 100% rename from src/public/js/app.js rename to src/app/index.js diff --git a/src/app/peer-bridge.js b/src/app/peer-bridge.js new file mode 100644 index 0000000..d0313b1 --- /dev/null +++ b/src/app/peer-bridge.js @@ -0,0 +1,91 @@ +var SimplePeer = require('simple-peer') +var EE = require('events').EventEmitter +var util = require('util') +var lpstream = require('length-prefixed-stream') +var io = require('socket.io-client') + +exports = module.exports = PeerBridge + +util.inherits(PeerBridge, EE) + +function PeerBridge () { + this.conns = {} + this.id = (~~(Math.random() * 1e9)).toString(36) + Date.now() + + this.setUp = (sigUrl, callback) => { + var sioClient = io.connect(sigUrl, { + transports: ['websocket'], + 'force new connection': true + }) + sioClient.once('connect_error', callback) + sioClient.on('connect', () => { + sioClient.emit('ss-join', this.id) + sioClient.on('ws-handshake', incommingDial) + sioClient.on('ws-peer', peerDiscovered.bind(this)) + callback() + }) + + function incommingDial (offer) { + if (offer.answer) { + return + } + + const channel = new SimplePeer({ trickle: false }) + + channel.on('connect', () => { + this.emit('peer', offer.srcId) + // this.conns[offer.srcId] = { + // channel: channel, + // lps: '' + // } + // attach LPS + // TODO decode the messages received + }) + + channel.on('signal', function (signal) { + offer.signal = signal + offer.answer = true + sioClient.emit('ss-handshake', offer) + }) + + channel.signal(offer.signal) + } + + function peerDiscovered (_id) { + var intentId = (~~(Math.random() * 1e9)).toString(36) + Date.now() + var channel = new SimplePeer({ initiator: true, trickle: false }) + + channel.on('signal', function (signal) { + sioClient.emit('ss-handshake', { + intentId: intentId, + srcId: this.id, + dstId: _id, + signal: signal + }) + }) + + sioClient.on('ws-handshake', (offer) => { + if (offer.intentId !== intentId || !offer.answer) { + return + } + + channel.on('connect', () => { + // TODO add to the conns map with lps attached + this.emit('peer', _id) + }) + channel.signal(offer.signal) + }) + } + } + + this.broadcast = (buf) => { + Object.keys(this.conns).forEach((key) => { + this.conns[key].lps.encode.write(buf) + }) + } + + this.send = (id, buf) => { + this.conns[id].lps.encode.write(buf) + } +} + diff --git a/src/index.js b/src/index.js index 1bec9f7..615d5b5 100644 --- a/src/index.js +++ b/src/index.js @@ -1,34 +1,5 @@ -var attachWSS = require('./ws-server.js') -var Hapi = require('hapi') -var path = require('path') +'use strict' -var server = new Hapi.Server({}) +exports = module.exports -server.connection({ - port: Number(process.env.PORT) || 9090 -}) - -server.register(require('inert'), function (err) { - if (err) { - throw err - } - - server.route({ - method: 'GET', - path: '/{param*}', - handler: { - directory: { - path: path.join(__dirname, '/public') - } - } - }) - - server.start(function (err) { - if (err) { - throw err - } - - console.log('Server running at:', server.info.uri) - attachWSS(server) - }) -}) +exports.PeerBridge = require('./app/peer-bridge.js') diff --git a/src/sig-server/index.js b/src/sig-server/index.js new file mode 100644 index 0000000..f93d9f3 --- /dev/null +++ b/src/sig-server/index.js @@ -0,0 +1,54 @@ +var attachWSS = require('./ws-server.js') +var Hapi = require('hapi') +var path = require('path') + +exports = module.exports + +exports.start = (port, callback) => { + if (typeof port === 'function') { + callback = port + port = undefined + } + var options = { + connections: { + routes: { + cors: true + } + } + } + + var httpListener = new Hapi.Server(options) + + httpListener.connection({ + port: port || Number(process.env.PORT) || 9090 + }) + + httpListener.register(require('inert'), function (err) { + if (err) { + throw err + } + + httpListener.route({ + method: 'GET', + path: '/{param*}', + handler: { + directory: { + path: path.join(__dirname, '../public') + } + } + }) + + httpListener.start(function (err) { + if (err) { + throw err + } + + console.log('Server running at:', httpListener.info.uri) + httpListener.peers = attachWSS(httpListener).peers + callback(null, httpListener.info) + }) + }) + + return httpListener +} + diff --git a/src/sig-server/ws-server.js b/src/sig-server/ws-server.js new file mode 100644 index 0000000..a5fe5f3 --- /dev/null +++ b/src/sig-server/ws-server.js @@ -0,0 +1,75 @@ +'use strict' + +const log = console.log +const SocketIO = require('socket.io') + +module.exports = (http) => { + const io = new SocketIO(http.listener) + + io.on('connection', handle) + + const peers = {} + + this.peers = () => { + return peers + } + + function safeEmit (addr, event, arg) { + const peer = peers[addr] + if (!peer) { + log('trying to emit %s but peer is gone', event) + return + } + + peer.emit(event, arg) + } + + function handle (socket) { + socket.on('ss-join', join.bind(socket)) + socket.on('ss-leave', leave.bind(socket)) + socket.on('disconnect', disconnect.bind(socket)) // socket.io own event + socket.on('ss-handshake', forwardHandshake) + } + + // join this signaling server network + function join (id) { + peers[id] = this // socket + Object.keys(peers).forEach((_id) => { + if (_id === id) { + return + } + // broadcast the new peer + safeEmit(_id, 'ws-peer', id) + }) + } + + function leave (multiaddr) { + if (peers[multiaddr]) { + delete peers[multiaddr] + } + } + + function disconnect () { + Object.keys(peers).forEach((mh) => { + if (peers[mh].id === this.id) { + delete peers[mh] + } + }) + } + + // forward an WebRTC offer to another peer + function forwardHandshake (offer) { + if (offer.answer) { + safeEmit(offer.srcId, 'ws-handshake', offer) + } else { + if (peers[offer.dstId]) { + safeEmit(offer.dstId, 'ws-handshake', offer) + } else { + offer.err = 'peer is not available' + safeEmit(offer.srcId, 'ws-handshake', offer) + } + } + } + + return this +} diff --git a/src/ws-server.js b/src/ws-server.js deleted file mode 100644 index f995223..0000000 --- a/src/ws-server.js +++ /dev/null @@ -1,34 +0,0 @@ -var WebSocketServer = require('ws').Server - -module.exports = function attachWSS (server) { - // Websocket server - var wss = new WebSocketServer({server: server.listener}) - - wss.broadcast = function broadcast (data, flags) { - wss.clients.forEach(function each (client) { - client.send(data, flags) - }) - } - - console.log('websocket server created') - wss.on('connection', function (ws) { - ws.broadcast = function broadcast (data, flags) { - wss.clients.forEach(function bc (client) { - if (client === ws) return - client.send(data, flags) - }) - } - - ws.on('message', function (data, flags) { - if (flags.binary) { // If received binary message, i.e. MIDI - console.log('MIDI:', data) - wss.broadcast(data, {binary: true}) // Echo MIDI message back to client - } - }) - - console.log('websocket connection open') - ws.on('close', function () { - console.log('websocket connection close') - }) - }) -} diff --git a/test/browser.js b/test/browser.js new file mode 100644 index 0000000..3ee48c0 --- /dev/null +++ b/test/browser.js @@ -0,0 +1,3 @@ +'use strict' + +require('./peer-bridge/test-peer-bridge.js') diff --git a/test/node.js b/test/node.js new file mode 100644 index 0000000..78078f7 --- /dev/null +++ b/test/node.js @@ -0,0 +1,3 @@ +'use strict' + +require('./sig-server/test-sig-server.js') diff --git a/test/peer-bridge/test-peer-bridge.js b/test/peer-bridge/test-peer-bridge.js new file mode 100644 index 0000000..deef07e --- /dev/null +++ b/test/peer-bridge/test-peer-bridge.js @@ -0,0 +1,13 @@ +/* eslint-env mocha */ +'use strict' + +var expect = require('chai').expect +// var parallel = require('run-parallel') + +var PeerBridge = require('../../src').PeerBridge + +describe('peer bridge', () => { + it('a test', (done) => { + done() + }) +}) diff --git a/test/sig-server/test-sig-server.js b/test/sig-server/test-sig-server.js new file mode 100644 index 0000000..9629bdb --- /dev/null +++ b/test/sig-server/test-sig-server.js @@ -0,0 +1,178 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const io = require('socket.io-client') +const parallel = require('run-parallel') + +const sigServer = require('../../src/sig-server') + +describe('signalling server', () => { + const sioOptions = { + transports: ['websocket'], + 'force new connection': true + } + + let sioUrl + let sigS + let c1 + let c2 + let c3 + let c4 + + let c1Id = '1' + let c2Id = '2' + let c3Id = '3' + let c4Id = '4' + + it('start and stop signalling server (default port)', (done) => { + const sigS = sigServer.start((err, info) => { + expect(err).to.not.exist + expect(info.port).to.equal(9090) + expect(info.protocol).to.equal('http') + expect(info.address).to.equal('0.0.0.0') + sigS.stop(done) + }) + }) + + it('start and stop signalling server (custom port)', (done) => { + const sigS = sigServer.start(12345, (err, info) => { + expect(err).to.not.exist + expect(info.port).to.equal(12345) + expect(info.protocol).to.equal('http') + expect(info.address).to.equal('0.0.0.0') + sigS.stop(done) + }) + }) + + it('start signalling server for client tests', (done) => { + sigS = sigServer.start(12345, (err, info) => { + expect(err).to.not.exist + expect(info.port).to.equal(12345) + expect(info.protocol).to.equal('http') + expect(info.address).to.equal('0.0.0.0') + sioUrl = info.uri + done() + }) + }) + + it('zero peers', () => { + expect(Object.keys(sigS.peers).length).to.equal(0) + }) + + it('connect one client', (done) => { + c1 = io.connect(sioUrl, sioOptions) + c1.on('connect', done) + }) + + it('connect three more clients', (done) => { + let count = 0 + + c2 = io.connect(sioUrl, sioOptions) + c3 = io.connect(sioUrl, sioOptions) + c4 = io.connect(sioUrl, sioOptions) + + c2.on('connect', connected) + c3.on('connect', connected) + c4.on('connect', connected) + + function connected () { + if (++count === 3) { done() } + } + }) + + it('ss-join first client', (done) => { + c1.emit('ss-join', c1Id) + setTimeout(() => { + expect(Object.keys(sigS.peers()).length).to.equal(1) + done() + }, 10) + }) + + it('ss-join and ss-leave second client', (done) => { + c2.emit('ss-join', c2Id) + setTimeout(() => { + expect(Object.keys(sigS.peers()).length).to.equal(2) + c2.emit('ss-leave', c2Id) + setTimeout(() => { + expect(Object.keys(sigS.peers()).length).to.equal(1) + done() + }, 10) + }, 10) + }) + + it('ss-join and disconnect third client', (done) => { + c3.emit('ss-join', c3Id) + setTimeout(() => { + expect(Object.keys(sigS.peers()).length).to.equal(2) + c3.disconnect() + setTimeout(() => { + expect(Object.keys(sigS.peers()).length).to.equal(1) + done() + }, 10) + }, 10) + }) + + it('ss-join the fourth', (done) => { + c1.on('ws-peer', (_id) => { + expect(_id).to.equal(c4Id) + expect(Object.keys(sigS.peers()).length).to.equal(2) + done() + }) + c4.emit('ss-join', c4Id) + }) + + it('c1 handshake c4', (done) => { + c4.once('ws-handshake', (offer) => { + offer.answer = true + c4.emit('ss-handshake', offer) + }) + + c1.once('ws-handshake', (offer) => { + expect(offer.err).to.not.exist + expect(offer.answer).to.equal(true) + done() + }) + + c1.emit('ss-handshake', { + srcId: c1Id, + dstId: c4Id + }) + }) + + it('c1 handshake c2 fail (does not exist anymore)', (done) => { + c1.once('ws-handshake', (offer) => { + expect(offer.err).to.exist + done() + }) + + c1.emit('ss-handshake', { + srcId: c1Id, + dstId: c2Id + }) + }) + + it('stop signalling server', (done) => { + parallel([ + (cb) => { + c1.disconnect() + cb() + }, + (cb) => { + c2.disconnect() + cb() + }, + // done in test + // (cb) => { + // c3.disconnect() + // cb() + // }, + (cb) => { + c4.disconnect() + cb() + } + ], () => { + sigS.stop(done) + }) + }) +})