Skip to content

Commit

Permalink
feat: add sessions to trustless gateways (#459)
Browse files Browse the repository at this point in the history
Implements blockstore sessions for trustless gateways.

- Queries the Helia routing for block providers
- Creates a set of trustless gateways from routing results
- Uses only these gateways to fetch session blocks

---------

Co-authored-by: Russell Dempsey <[email protected]>
  • Loading branch information
achingbrain and SgtPooki authored Apr 4, 2024
1 parent 5cf216b commit 6ddefb0
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 14 deletions.
36 changes: 36 additions & 0 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import cors from 'cors'
import polka from 'polka'

/** @type {import('aegir').PartialOptions} */
const options = {
test: {
async before (options) {
const server = polka({
port: 0,
host: '127.0.0.1'
})
server.use(cors())
server.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
res.writeHead(200, {
'content-type': 'application/octet-stream'
})
res.end(Uint8Array.from([0, 1, 2, 0]))
})

await server.listen()
const { port } = server.server.address()

return {
server,
env: {
TRUSTLESS_GATEWAY: `http://127.0.0.1:${port}`
}
}
},
async after (options, before) {
await before.server.server.close()
}
}
}

export default options
9 changes: 9 additions & 0 deletions packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@
"dependencies": {
"@helia/interface": "^4.1.0",
"@libp2p/interface": "^1.1.4",
"@libp2p/utils": "^5.2.6",
"@multiformats/multiaddr-matcher": "^1.2.0",
"@multiformats/multiaddr-to-uri": "^10.0.1",
"interface-blockstore": "^5.2.10",
"ipfs-bitswap": "^20.0.2",
"multiformats": "^13.1.0",
"p-defer": "^4.0.0",
"progress-events": "^1.0.0"
},
"devDependencies": {
"@libp2p/logger": "^4.0.7",
"@libp2p/peer-id-factory": "^4.0.7",
"@multiformats/multiaddr": "^12.1.14",
"@multiformats/uri-to-multiaddr": "^8.0.0",
"@types/sinon": "^17.0.3",
"aegir": "^42.2.5",
"cors": "^2.8.5",
"polka": "^0.5.2",
"sinon": "^17.0.1",
"sinon-ts": "^2.0.0"
}
Expand Down
150 changes: 145 additions & 5 deletions packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
import { DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY, DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT } from '@helia/interface'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import pDefer from 'p-defer'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks'
import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptions<TrustlessGatewayGetBlockProgressEvents> {
/**
* Specify the cache control header to send to the remote. 'only-if-cached'
* will prevent the gateway from fetching the content if they don't have it.
*
* @default only-if-cached
*/
cacheControl?: string

/**
* By default we will only connect to peers with HTTPS addresses, pass true
* to also connect to HTTP addresses.
*
* @default false
*/
allowInsecure?: boolean

/**
* By default we will only connect to peers with public or DNS addresses, pass
* true to also connect to private addresses.
*
* @default false
*/
allowLocal?: boolean
}

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
private readonly components: TrustlessGatewayComponents
private readonly gateways: TrustlessGateway[]
private readonly routing: Routing
private readonly log: Logger

constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) {
this.components = components
this.log = components.logger.forComponent('helia:trustless-gateway-block-broker')
this.routing = components.routing
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl, components.logger)
})
}

addGateway (gatewayOrUrl: string): void {
this.gateways.push(new TrustlessGateway(gatewayOrUrl, this.components.logger))
}

async retrieve (cid: CID, options: BlockRetrievalOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
Expand All @@ -38,7 +78,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
this.log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
throw new Error(`Block for CID ${cid} from gateway ${gateway.url} failed validation`)
}

return block
Expand All @@ -47,7 +87,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
aggregateErrors.push(new Error(`Unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
Expand All @@ -57,6 +97,106 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
if (aggregateErrors.length > 0) {
throw new AggregateError(aggregateErrors, `Unable to fetch raw block for CID ${cid} from any gateway`)
} else {
throw new Error(`Unable to fetch raw block for CID ${cid} from any gateway`)
}
}

async createSession (root: CID, options: CreateTrustlessGatewaySessionOptions = {}): Promise<BlockBroker<TrustlessGatewayGetBlockProgressEvents>> {
const gateways: string[] = []
const minProviders = options.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS
const maxProviders = options.minProviders ?? DEFAULT_SESSION_MAX_PROVIDERS
const deferred = pDefer<BlockBroker<TrustlessGatewayGetBlockProgressEvents>>()
const broker = new TrustlessGatewayBlockBroker(this.components, {
gateways
})

this.log('finding transport-ipfs-gateway-http providers for cid %c', root)

const queue = new PeerQueue({
concurrency: options.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY
})

Promise.resolve().then(async () => {
for await (const provider of this.routing.findProviders(root, options)) {
const httpAddresses = provider.multiaddrs.filter(ma => {
if (HTTPS.matches(ma) || (options.allowInsecure === true && HTTP.matches(ma))) {
if (options.allowLocal === true) {
return true
}

if (DNS.matches(ma)) {
return true
}

return isPrivateIp(ma.toOptions().host) === false
}

return false
})

if (httpAddresses.length === 0) {
continue
}

this.log('found transport-ipfs-gateway-http provider %p for cid %c', provider.id, root)

void queue.add(async () => {
for (const ma of httpAddresses) {
let uri: string | undefined

try {
// /ip4/x.x.x.x/tcp/31337/http
// /ip4/x.x.x.x/tcp/31337/https
// etc
uri = multiaddrToUri(ma)

const resource = `${uri}/ipfs/${root.toString()}?format=raw`

// make sure the peer is available - HEAD support doesn't seem to
// be very widely implemented so as long as the remote responds
// we are happy they are valid
// https://specs.ipfs.tech/http-gateways/trustless-gateway/#head-ipfs-cid-path-params

// in the future we should be able to request `${uri}/.well-known/libp2p-http
// and discover an IPFS gateway from $.protocols['/ipfs/gateway'].path
// in the response
// https://github.com/libp2p/specs/pull/508/files
const response = await fetch(resource, {
method: 'HEAD',
headers: {
Accept: 'application/vnd.ipld.raw',
'Cache-Control': options.cacheControl ?? 'only-if-cached'
},
signal: AbortSignal.timeout(options.providerQueryTimeout ?? DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT)
})

this.log('HEAD %s %d', resource, response.status)
gateways.push(uri)
broker.addGateway(uri)

this.log('found %d transport-ipfs-gateway-http providers for cid %c', gateways.length, root)

if (gateways.length === minProviders) {
deferred.resolve(broker)
}

if (gateways.length === maxProviders) {
queue.clear()
}
} catch (err: any) {
this.log.error('could not fetch %c from %a', root, uri ?? ma, err)
}
}
})
}
})
.catch(err => {
this.log.error('error creating session for %c', root, err)
})

return deferred.promise
}
}
3 changes: 2 additions & 1 deletion packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockBroker } from '@helia/interface/src/blocks.js'
import type { Routing, BlockBroker } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'

Expand All @@ -22,6 +22,7 @@ export interface TrustlessGatewayBlockBrokerInit {
}

export interface TrustlessGatewayComponents {
routing: Routing
logger: ComponentLogger
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

/**
Expand Down Expand Up @@ -36,8 +37,11 @@ export class TrustlessGateway {
*/
#successes = 0

constructor (url: URL | string) {
private readonly log: Logger

constructor (url: URL | string, logger: ComponentLogger) {
this.url = url instanceof URL ? url : new URL(url)
this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`)
}

/**
Expand Down Expand Up @@ -67,6 +71,9 @@ export class TrustlessGateway {
},
cache: 'force-cache'
})

this.log('GET %s %d', gwUrl, res.status)

if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
Expand Down
55 changes: 48 additions & 7 deletions packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
/* eslint-env mocha */

import { defaultLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr } from '@multiformats/multiaddr'
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { expect } from 'aegir/chai'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubConstructor } from 'sinon-ts'
import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts'
import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js'
import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js'
import { createBlock } from './fixtures/create-block.js'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Routing } from '@helia/interface'
import type { CID } from 'multiformats/cid'

describe('trustless-gateway-block-broker', () => {
let blocks: Array<{ cid: CID, block: Uint8Array }>
let gatewayBlockBroker: BlockBroker
let gatewayBlockBroker: TrustlessGatewayBlockBroker
let gateways: Array<StubbedInstance<TrustlessGateway>>
let routing: StubbedInstance<Routing>

// take a Record<gatewayIndex, (gateway: StubbedInstance<TrustlessGateway>) => void> and stub the gateways
// Record.default is the default handler
Expand All @@ -29,19 +33,21 @@ describe('trustless-gateway-block-broker', () => {
}

beforeEach(async () => {
routing = stubInterface<Routing>()
blocks = []

for (let i = 0; i < 10; i++) {
blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i])))
}

gateways = [
stubConstructor(TrustlessGateway, 'http://localhost:8080'),
stubConstructor(TrustlessGateway, 'http://localhost:8081'),
stubConstructor(TrustlessGateway, 'http://localhost:8082'),
stubConstructor(TrustlessGateway, 'http://localhost:8083')
stubConstructor(TrustlessGateway, 'http://localhost:8080', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8081', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8082', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8083', defaultLogger())
]
gatewayBlockBroker = new TrustlessGatewayBlockBroker({
routing,
logger: defaultLogger()
})
// must copy the array because the broker calls .sort which mutates in-place
Expand Down Expand Up @@ -150,4 +156,39 @@ describe('trustless-gateway-block-broker', () => {
expect(gateways[1].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false()
expect(gateways[2].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false()
})

it('creates a session', async () => {
routing.findProviders.returns(async function * () {
// non-http provider
yield {
id: await createEd25519PeerId(),
multiaddrs: [
multiaddr('/ip4/132.32.25.6/tcp/1234')
]
}
// expired peer info
yield {
id: await createEd25519PeerId(),
multiaddrs: []
}
// http gateway
yield {
id: await createEd25519PeerId(),
multiaddrs: [
uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '')
]
}
}())

const sessionBlockstore = await gatewayBlockBroker.createSession?.(blocks[0].cid, {
minProviders: 1,
providerQueryConcurrency: 1,
allowInsecure: true,
allowLocal: true
})

expect(sessionBlockstore).to.be.ok()

await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block)
})
})

0 comments on commit 6ddefb0

Please sign in to comment.