Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: muxed streams as web streams #1847

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/protocol-and-stream-muxing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ There is still one last feature, you can provide multiple protocols for the same

```JavaScript
node2.handle(['/another-protocol/1.0.0', '/another-protocol/2.0.0'], ({ stream }) => {
if (stream.stat.protocol === '/another-protocol/2.0.0') {
if (stream.protocol === '/another-protocol/2.0.0') {
// handle backwards compatibility
}

Expand Down Expand Up @@ -136,7 +136,7 @@ node2.handle(['/a', '/b'], ({ stream }) => {
stream,
async function (source) {
for await (const msg of source) {
console.log(`from: ${stream.stat.protocol}, msg: ${uint8ArrayToString(msg.subarray())}`)
console.log(`from: ${stream.protocol}, msg: ${uint8ArrayToString(msg.subarray())}`)
}
}
)
Expand Down
9 changes: 6 additions & 3 deletions packages/interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"test:electron-main": "aegir test -t electron-main"
"test:electron-main": "aegir test -t electron-main",
"generate": "protons src/stream-muxer/fixtures/pb/*.proto"
},
"dependencies": {
"@libp2p/interface": "~0.0.1",
Expand All @@ -104,9 +105,9 @@
"@libp2p/peer-collections": "^3.0.0",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/utils": "^3.0.12",
"@multiformats/multiaddr": "^12.1.3",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"delay": "^6.0.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.2",
Expand All @@ -121,13 +122,15 @@
"p-defer": "^4.0.0",
"p-limit": "^4.0.0",
"p-wait-for": "^5.0.2",
"protons-runtime": "^5.0.0",
"sinon": "^15.1.2",
"ts-sinon": "^2.0.2",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"aegir": "^39.0.10"
"aegir": "^39.0.10",
"protons": "^7.0.2"
},
"typedoc": {
"entryPoint": "./src/index.ts"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export function createMaConnPair (): [MultiaddrConnection, MultiaddrConnection]
const output: MultiaddrConnection = {
...duplex,
close: async () => {},
abort: () => {},
remoteAddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
timeline: {
open: Date.now()
Expand Down
50 changes: 24 additions & 26 deletions packages/interface-compliance-tests/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@ export default (test: TestSetup<Connection>): void => {
expect(connection.id).to.exist()
expect(connection.remotePeer).to.exist()
expect(connection.remoteAddr).to.exist()
expect(connection.stat.status).to.equal('OPEN')
expect(connection.stat.timeline.open).to.exist()
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.stat.direction).to.exist()
expect(connection.status).to.equal('OPEN')
expect(connection.timeline.open).to.exist()
expect(connection.timeline.close).to.not.exist()
expect(connection.direction).to.exist()
expect(connection.streams).to.eql([])
expect(connection.tags).to.eql([])
})

it('should get the metadata of an open connection', () => {
const stat = connection.stat

expect(stat.status).to.equal('OPEN')
expect(stat.direction).to.exist()
expect(stat.timeline.open).to.exist()
expect(stat.timeline.close).to.not.exist()
expect(connection.status).to.equal('OPEN')
expect(connection.direction).to.exist()
expect(connection.timeline.open).to.exist()
expect(connection.timeline.close).to.not.exist()
})

it('should return an empty array of streams', () => {
Expand All @@ -51,7 +49,7 @@ export default (test: TestSetup<Connection>): void => {
const protocolToUse = '/echo/0.0.1'
const stream = await connection.newStream([protocolToUse])

expect(stream).to.have.nested.property('stat.protocol', protocolToUse)
expect(stream).to.have.property('protocol', protocolToUse)

const connStreams = connection.streams

Expand Down Expand Up @@ -79,19 +77,19 @@ export default (test: TestSetup<Connection>): void => {
}, proxyHandler)

connection = await test.setup()
connection.stat.timeline = timelineProxy
connection.timeline = timelineProxy
})

afterEach(async () => {
await test.teardown()
})

it('should be able to close the connection after being created', async () => {
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
await connection.close()

expect(connection.stat.timeline.close).to.exist()
expect(connection.stat.status).to.equal('CLOSED')
expect(connection.timeline.close).to.exist()
expect(connection.status).to.equal('CLOSED')
})

it('should be able to close the connection after opening a stream', async () => {
Expand All @@ -100,21 +98,21 @@ export default (test: TestSetup<Connection>): void => {
await connection.newStream([protocol])

// Close connection
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
await connection.close()

expect(connection.stat.timeline.close).to.exist()
expect(connection.stat.status).to.equal('CLOSED')
expect(connection.timeline.close).to.exist()
expect(connection.status).to.equal('CLOSED')
})

it('should properly track streams', async () => {
// Open stream
const protocol = '/echo/0.0.1'
const stream = await connection.newStream([protocol])
expect(stream).to.have.nested.property('stat.protocol', protocol)
expect(stream).to.have.property('protocol', protocol)

// Close stream
stream.close()
await stream.close()

expect(connection.streams.filter(s => s.id === stream.id)).to.be.empty()
})
Expand All @@ -123,7 +121,7 @@ export default (test: TestSetup<Connection>): void => {
// Open stream
const protocol = '/echo/0.0.1'
const stream = await connection.newStream(protocol)
expect(stream).to.have.nested.property('stat.direction', 'outbound')
expect(stream).to.have.property('direction', 'outbound')
})

it.skip('should track inbound streams', async () => {
Expand All @@ -135,20 +133,20 @@ export default (test: TestSetup<Connection>): void => {

it('should support a proxy on the timeline', async () => {
sinon.spy(proxyHandler, 'set')
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()

await connection.close()
// @ts-expect-error - fails to infer callCount
expect(proxyHandler.set.callCount).to.equal(1)
// @ts-expect-error - fails to infer getCall
const [obj, key, value] = proxyHandler.set.getCall(0).args
expect(obj).to.eql(connection.stat.timeline)
expect(obj).to.eql(connection.timeline)
expect(key).to.equal('close')
expect(value).to.be.a('number').that.equals(connection.stat.timeline.close)
expect(value).to.be.a('number').that.equals(connection.timeline.close)
})

it('should fail to create a new stream if the connection is closing', async () => {
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
const p = connection.close()

try {
Expand All @@ -165,7 +163,7 @@ export default (test: TestSetup<Connection>): void => {
})

it('should fail to create a new stream if the connection is closed', async () => {
expect(connection.stat.timeline.close).to.not.exist()
expect(connection.timeline.close).to.not.exist()
await connection.close()

try {
Expand Down
87 changes: 39 additions & 48 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as STATUS from '@libp2p/interface/connection/status'
import * as Status from '@libp2p/interface/connection/status'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
Expand All @@ -9,13 +9,11 @@ import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions } from '@libp2p/interface'
import type { MultiaddrConnection, Connection, Stream, ConnectionStat, Direction } from '@libp2p/interface/connection'
import type { MultiaddrConnection, Connection, Stream, Direction, ByteStream, ConnectionTimeline } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:mock-connection')

Expand All @@ -38,7 +36,10 @@ class MockConnection implements Connection {
public remoteAddr: Multiaddr
public remotePeer: PeerId
public direction: Direction
public stat: ConnectionStat
public timeline: ConnectionTimeline
public multiplexer?: string
public encryption?: string
public status: keyof typeof Status
public streams: Stream[]
public tags: string[]

Expand All @@ -52,13 +53,10 @@ class MockConnection implements Connection {
this.remoteAddr = remoteAddr
this.remotePeer = remotePeer
this.direction = direction
this.stat = {
status: STATUS.OPEN,
direction,
timeline: maConn.timeline,
multiplexer: 'test-multiplexer',
encryption: 'yes-yes-very-secure'
}
this.status = Status.OPEN
this.timeline = maConn.timeline
this.multiplexer = 'test-multiplexer'
this.encryption = 'yes-yes-very-secure'
this.streams = []
this.tags = []
this.muxer = muxer
Expand All @@ -74,30 +72,20 @@ class MockConnection implements Connection {
throw new Error('protocols must have a length')
}

if (this.stat.status !== STATUS.OPEN) {
if (this.status !== Status.OPEN) {
throw new CodeError('connection must be open to create streams', 'ERR_CONNECTION_CLOSED')
}

const id = `${Math.random()}`
const stream = await this.muxer.newStream(id)
const result = await mss.select(stream, protocols, options)

const streamWithProtocol: Stream = {
...stream,
...result.stream,
stat: {
...stream.stat,
direction: 'outbound',
protocol: result.protocol
}
}
const protocolStream = await mss.select(stream, protocols, options)

this.streams.push(streamWithProtocol)
this.streams.push(protocolStream)

return streamWithProtocol
return protocolStream
}

addStream (stream: Stream): void {
addStream (stream: any): void {
this.streams.push(stream)
}

Expand All @@ -106,13 +94,23 @@ class MockConnection implements Connection {
}

async close (): Promise<void> {
this.stat.status = STATUS.CLOSING
this.status = Status.CLOSING
await Promise.all(
this.streams.map(async s => s.close())
)
await this.maConn.close()
this.status = Status.CLOSED
this.timeline.close = Date.now()
}

abort (err: Error): void {
this.status = Status.CLOSING
this.streams.forEach(s => {
s.close()
s.abort(err)
})
this.stat.status = STATUS.CLOSED
this.stat.timeline.close = Date.now()
this.maConn.abort(err)
this.status = Status.CLOSED
this.timeline.close = Date.now()
}
}

Expand All @@ -134,15 +132,13 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
onIncomingStream: (muxedStream) => {
try {
mss.handle(muxedStream, registrar.getProtocols())
.then(({ stream, protocol }) => {
log('%s: incoming stream opened on %s', direction, protocol)
muxedStream = { ...muxedStream, ...stream }
muxedStream.stat.protocol = protocol
.then(stream => {
log('%s: incoming stream opened on %s', stream.direction, stream.protocol)

connection.addStream(muxedStream)
const { handler } = registrar.getHandler(protocol)
const { handler } = registrar.getHandler(stream.protocol)

handler({ connection, stream: muxedStream })
handler({ connection, stream })
}).catch(err => {
log.error(err)
})
Expand Down Expand Up @@ -170,20 +166,15 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
return connection
}

export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>): Stream {
export function mockStream (stream: ByteStream): Stream {
return {
...stream,
close: () => {},
closeRead: () => {},
closeWrite: () => {},
close: async () => {},
abort: () => {},
reset: () => {},
stat: {
direction: 'outbound',
protocol: '/foo/1.0.0',
timeline: {
open: Date.now()
}
direction: 'outbound',
protocol: '/foo/1.0.0',
timeline: {
open: Date.now()
},
metadata: {},
id: `stream-${Date.now()}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export function mockMultiaddrConnection (source: Duplex<AsyncGenerator<Uint8Arra
async close () {

},
abort: () => {},
timeline: {
open: Date.now()
},
Expand Down Expand Up @@ -44,6 +45,10 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
close: async () => {
outbound.timeline.close = Date.now()
controller.abort()
},
abort: () => {
outbound.timeline.close = Date.now()
controller.abort()
}
}

Expand All @@ -56,6 +61,10 @@ export function mockMultiaddrConnPair (opts: MockMultiaddrConnPairOptions): { in
close: async () => {
inbound.timeline.close = Date.now()
controller.abort()
},
abort: () => {
inbound.timeline.close = Date.now()
controller.abort()
}
}

Expand Down
Loading