Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sessions to trustless gateways #459

Merged
merged 17 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import cors from 'cors'
import polka from 'polka'

/** @type {import('aegir').PartialOptions} */
const options = {
test: {
async before (options) {
const server = polka({
port: 0,
host: '127.0.0.1'
})
server.use(cors())
server.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
res.writeHead(200, {
'content-type': 'application/octet-stream'
})
res.end(Uint8Array.from([0, 1, 2, 0]))
})

await server.listen()
const { port } = server.server.address()

return {
server,
env: {
TRUSTLESS_GATEWAY: `http://127.0.0.1:${port}`
}
}
},
async after (options, before) {
await before.server.server.close()
}
}
}

export default options
9 changes: 9 additions & 0 deletions packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@
"dependencies": {
"@helia/interface": "^4.1.0",
"@libp2p/interface": "^1.1.4",
"@libp2p/utils": "^5.2.6",
"@multiformats/multiaddr-matcher": "^1.2.0",
"@multiformats/multiaddr-to-uri": "^10.0.1",
"interface-blockstore": "^5.2.10",
"ipfs-bitswap": "^20.0.2",
"multiformats": "^13.1.0",
"p-defer": "^4.0.0",
"progress-events": "^1.0.0"
},
"devDependencies": {
"@libp2p/logger": "^4.0.7",
"@libp2p/peer-id-factory": "^4.0.7",
"@multiformats/multiaddr": "^12.1.14",
"@multiformats/uri-to-multiaddr": "^8.0.0",
"@types/sinon": "^17.0.3",
"aegir": "^42.2.5",
"cors": "^2.8.5",
"polka": "^0.5.2",
"sinon": "^17.0.1",
"sinon-ts": "^2.0.0"
}
Expand Down
150 changes: 145 additions & 5 deletions packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
import { DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY, DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT } from '@helia/interface'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import pDefer from 'p-defer'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks'
import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptions<TrustlessGatewayGetBlockProgressEvents> {
/**
* Specify the cache control header to send to the remote. 'only-if-cached'
* will prevent the gateway from fetching the content if they don't have it.
*
* @default only-if-cached
*/
cacheControl?: string
Comment on lines +19 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about defaulting to asking gateways to only return content they have. will read more in the PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to stop them doing what gateways do, e.g. fetch content on your behalf. Otherwise it defeats the purpose of having a session.


/**
* By default we will only connect to peers with HTTPS addresses, pass true
* to also connect to HTTP addresses.
*
* @default false
*/
allowInsecure?: boolean

/**
* By default we will only connect to peers with public or DNS addresses, pass
* true to also connect to private addresses.
*
* @default false
*/
allowLocal?: boolean
}

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
private readonly components: TrustlessGatewayComponents
private readonly gateways: TrustlessGateway[]
private readonly routing: Routing
private readonly log: Logger

constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) {
this.components = components
this.log = components.logger.forComponent('helia:trustless-gateway-block-broker')
this.routing = components.routing
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl, components.logger)
})
}

addGateway (gatewayOrUrl: string): void {
this.gateways.push(new TrustlessGateway(gatewayOrUrl, this.components.logger))
}

async retrieve (cid: CID, options: BlockRetrievalOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
Expand All @@ -38,7 +78,7 @@
this.log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
throw new Error(`Block for CID ${cid} from gateway ${gateway.url} failed validation`)
}

return block
Expand All @@ -47,7 +87,7 @@
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
aggregateErrors.push(new Error(`Unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))

Check warning on line 90 in packages/block-brokers/src/trustless-gateway/broker.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/broker.ts#L90

Added line #L90 was not covered by tests
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
Expand All @@ -57,6 +97,106 @@
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
if (aggregateErrors.length > 0) {
throw new AggregateError(aggregateErrors, `Unable to fetch raw block for CID ${cid} from any gateway`)
} else {
throw new Error(`Unable to fetch raw block for CID ${cid} from any gateway`)
}

Check warning on line 104 in packages/block-brokers/src/trustless-gateway/broker.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/broker.ts#L103-L104

Added lines #L103 - L104 were not covered by tests
}

async createSession (root: CID, options: CreateTrustlessGatewaySessionOptions = {}): Promise<BlockBroker<TrustlessGatewayGetBlockProgressEvents>> {
const gateways: string[] = []
const minProviders = options.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS
const maxProviders = options.minProviders ?? DEFAULT_SESSION_MAX_PROVIDERS
const deferred = pDefer<BlockBroker<TrustlessGatewayGetBlockProgressEvents>>()
const broker = new TrustlessGatewayBlockBroker(this.components, {
gateways
})

this.log('finding transport-ipfs-gateway-http providers for cid %c', root)

const queue = new PeerQueue({
concurrency: options.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY
})

Promise.resolve().then(async () => {
for await (const provider of this.routing.findProviders(root, options)) {
const httpAddresses = provider.multiaddrs.filter(ma => {
if (HTTPS.matches(ma) || (options.allowInsecure === true && HTTP.matches(ma))) {
if (options.allowLocal === true) {
return true
}

if (DNS.matches(ma)) {
return true
}

return isPrivateIp(ma.toOptions().host) === false

Check warning on line 134 in packages/block-brokers/src/trustless-gateway/broker.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/broker.ts#L129-L134

Added lines #L129 - L134 were not covered by tests
}

return false
})

if (httpAddresses.length === 0) {
continue
}

this.log('found transport-ipfs-gateway-http provider %p for cid %c', provider.id, root)

void queue.add(async () => {
for (const ma of httpAddresses) {
let uri: string | undefined

try {
// /ip4/x.x.x.x/tcp/31337/http
// /ip4/x.x.x.x/tcp/31337/https
// etc
uri = multiaddrToUri(ma)

const resource = `${uri}/ipfs/${root.toString()}?format=raw`

// make sure the peer is available - HEAD support doesn't seem to
// be very widely implemented so as long as the remote responds
// we are happy they are valid
// https://specs.ipfs.tech/http-gateways/trustless-gateway/#head-ipfs-cid-path-params

// in the future we should be able to request `${uri}/.well-known/libp2p-http
// and discover an IPFS gateway from $.protocols['/ipfs/gateway'].path
// in the response
// https://github.com/libp2p/specs/pull/508/files
const response = await fetch(resource, {
method: 'HEAD',
headers: {
Accept: 'application/vnd.ipld.raw',
'Cache-Control': options.cacheControl ?? 'only-if-cached'
},
signal: AbortSignal.timeout(options.providerQueryTimeout ?? DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT)
})

this.log('HEAD %s %d', resource, response.status)
gateways.push(uri)
broker.addGateway(uri)

this.log('found %d transport-ipfs-gateway-http providers for cid %c', gateways.length, root)

if (gateways.length === minProviders) {
deferred.resolve(broker)
}

if (gateways.length === maxProviders) {
queue.clear()
}
} catch (err: any) {
this.log.error('could not fetch %c from %a', root, uri ?? ma, err)
}

Check warning on line 191 in packages/block-brokers/src/trustless-gateway/broker.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/broker.ts#L190-L191

Added lines #L190 - L191 were not covered by tests
}
})
}
})
.catch(err => {
this.log.error('error creating session for %c', root, err)

Check warning on line 197 in packages/block-brokers/src/trustless-gateway/broker.ts

View check run for this annotation

Codecov / codecov/patch

packages/block-brokers/src/trustless-gateway/broker.ts#L197

Added line #L197 was not covered by tests
})

return deferred.promise
}
}
3 changes: 2 additions & 1 deletion packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockBroker } from '@helia/interface/src/blocks.js'
import type { Routing, BlockBroker } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'

Expand All @@ -22,6 +22,7 @@ export interface TrustlessGatewayBlockBrokerInit {
}

export interface TrustlessGatewayComponents {
routing: Routing
logger: ComponentLogger
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

/**
Expand Down Expand Up @@ -36,8 +37,11 @@ export class TrustlessGateway {
*/
#successes = 0

constructor (url: URL | string) {
private readonly log: Logger

constructor (url: URL | string, logger: ComponentLogger) {
this.url = url instanceof URL ? url : new URL(url)
this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`)
2color marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -67,6 +71,9 @@ export class TrustlessGateway {
},
cache: 'force-cache'
})

this.log('GET %s %d', gwUrl, res.status)

if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
Expand Down
55 changes: 48 additions & 7 deletions packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
/* eslint-env mocha */

import { defaultLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr } from '@multiformats/multiaddr'
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { expect } from 'aegir/chai'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubConstructor } from 'sinon-ts'
import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts'
import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js'
import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js'
import { createBlock } from './fixtures/create-block.js'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Routing } from '@helia/interface'
import type { CID } from 'multiformats/cid'

describe('trustless-gateway-block-broker', () => {
let blocks: Array<{ cid: CID, block: Uint8Array }>
let gatewayBlockBroker: BlockBroker
let gatewayBlockBroker: TrustlessGatewayBlockBroker
let gateways: Array<StubbedInstance<TrustlessGateway>>
let routing: StubbedInstance<Routing>

// take a Record<gatewayIndex, (gateway: StubbedInstance<TrustlessGateway>) => void> and stub the gateways
// Record.default is the default handler
Expand All @@ -29,19 +33,21 @@ describe('trustless-gateway-block-broker', () => {
}

beforeEach(async () => {
routing = stubInterface<Routing>()
blocks = []

for (let i = 0; i < 10; i++) {
blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i])))
}

gateways = [
stubConstructor(TrustlessGateway, 'http://localhost:8080'),
stubConstructor(TrustlessGateway, 'http://localhost:8081'),
stubConstructor(TrustlessGateway, 'http://localhost:8082'),
stubConstructor(TrustlessGateway, 'http://localhost:8083')
stubConstructor(TrustlessGateway, 'http://localhost:8080', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8081', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8082', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8083', defaultLogger())
]
gatewayBlockBroker = new TrustlessGatewayBlockBroker({
routing,
logger: defaultLogger()
})
// must copy the array because the broker calls .sort which mutates in-place
Expand Down Expand Up @@ -150,4 +156,39 @@ describe('trustless-gateway-block-broker', () => {
expect(gateways[1].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false()
expect(gateways[2].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false()
})

it('creates a session', async () => {
routing.findProviders.returns(async function * () {
// non-http provider
yield {
id: await createEd25519PeerId(),
multiaddrs: [
multiaddr('/ip4/132.32.25.6/tcp/1234')
]
}
// expired peer info
yield {
id: await createEd25519PeerId(),
multiaddrs: []
}
// http gateway
yield {
id: await createEd25519PeerId(),
multiaddrs: [
uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '')
]
}
}())

const sessionBlockstore = await gatewayBlockBroker.createSession?.(blocks[0].cid, {
minProviders: 1,
providerQueryConcurrency: 1,
allowInsecure: true,
allowLocal: true
})

expect(sessionBlockstore).to.be.ok()

await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block)
})
})
Loading