From 5cf216baa6806cd82f8fcddd1f024ef6a506f667 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 4 Apr 2024 11:27:41 +0100 Subject: [PATCH] feat: add block session support to @helia/interface (#398) The usage pattern is: ```javascript // unixfs cat command export async function * cat (cid: CID, blockstore: Blocks, options: Partial = {}): AsyncIterable { // create a session for the CID if support is available const blocks = await (blockstore.createSession != null ? blockstore.createSession(cid, options) : blockstore) const opts: CatOptions = mergeOptions(defaultOptions, options) // resolve and export using the session, if created, otherwise fall back to regular blockstore access const resolved = await resolve(cid, opts.path, blocks, opts) const result = await exporter(resolved.cid, blocks, opts) if (result.type !== 'file' && result.type !== 'raw') { throw new NotAFileError() } if (result.content == null) { throw new NoContentError() } yield * result.content(opts) } ``` Alternatively the user can control session creation: ```javascript import { unixfs } from '@helia/unixfs' import { createHelia } from 'helia' // or http import { CID } from 'multiformats/cid' const node = await createHelia() const rootCid = CID.parse('Qmfoo') const sessionBlockstore = await node.blockstore.createSession(rootCid, { signal: AbortSignal.timeout(5000) }) // all operations will use the same session const fs = unixfs({ blockstore: sessionBlockstore }) for await (const entry of fs.ls(rootCid) { if (entry.type !== 'file') { continue } for await (const buf of fs.cat(entry.cid)) { // ... } } ``` Removes the `BlockAnnouncer`/`BlockRetriever` single-method interface `BlockBroker` split because we would have to add another `BlockSessionFactory` interface for this which starts getting unwieldy. Instead just have all the methods be optional and filter the brokers before use. --------- Co-authored-by: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> --- packages/block-brokers/src/bitswap.ts | 11 +- .../src/trustless-gateway/broker.ts | 9 +- .../src/trustless-gateway/index.ts | 4 +- .../test/trustless-gateway.spec.ts | 12 +- packages/interface/src/blocks.ts | 76 ++++++++-- packages/utils/package.json | 1 - packages/utils/src/index.ts | 27 ++-- packages/utils/src/routing.ts | 19 --- packages/utils/src/storage.ts | 22 ++- packages/utils/src/utils/networked-storage.ts | 137 +++++++++++------- packages/utils/test/block-broker.spec.ts | 4 +- packages/utils/test/storage.spec.ts | 11 +- .../test/utils/networked-storage.spec.ts | 4 +- 13 files changed, 210 insertions(+), 127 deletions(-) diff --git a/packages/block-brokers/src/bitswap.ts b/packages/block-brokers/src/bitswap.ts index 072594fce..dfe79ff18 100644 --- a/packages/block-brokers/src/bitswap.ts +++ b/packages/block-brokers/src/bitswap.ts @@ -1,11 +1,10 @@ import { createBitswap } from 'ipfs-bitswap' -import type { BlockAnnouncer, BlockBroker, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks' +import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions } from '@helia/interface/blocks' import type { Libp2p, Startable } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Bitswap, BitswapNotifyProgressEvents, BitswapOptions, BitswapWantBlockProgressEvents } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' -import type { ProgressOptions } from 'progress-events' interface BitswapComponents { libp2p: Libp2p @@ -17,9 +16,7 @@ export interface BitswapInit extends BitswapOptions { } -class BitswapBlockBroker implements BlockAnnouncer>, BlockRetriever< -ProgressOptions ->, Startable { +class BitswapBlockBroker implements BlockBroker, Startable { private readonly bitswap: Bitswap private started: boolean @@ -65,11 +62,11 @@ ProgressOptions this.started = false } - announce (cid: CID, block: Uint8Array, options?: ProgressOptions): void { + async announce (cid: CID, block: Uint8Array, options?: BlockAnnounceOptions): Promise { this.bitswap.notify(cid, block, options) } - async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions> = {}): Promise { + async retrieve (cid: CID, options: BlockRetrievalOptions = {}): Promise { return this.bitswap.want(cid, options) } } diff --git a/packages/block-brokers/src/trustless-gateway/broker.ts b/packages/block-brokers/src/trustless-gateway/broker.ts index 433a690d0..d19548167 100644 --- a/packages/block-brokers/src/trustless-gateway/broker.ts +++ b/packages/block-brokers/src/trustless-gateway/broker.ts @@ -1,18 +1,15 @@ import { TrustlessGateway } from './trustless-gateway.js' import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js' import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js' -import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks' +import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks' import type { Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' -import type { ProgressOptions } from 'progress-events' /** * A class that accepts a list of trustless gateways that are queried * for blocks. */ -export class TrustlessGatewayBlockBroker implements BlockRetriever< -ProgressOptions -> { +export class TrustlessGatewayBlockBroker implements BlockBroker { private readonly gateways: TrustlessGateway[] private readonly log: Logger @@ -24,7 +21,7 @@ ProgressOptions }) } - async retrieve (cid: CID, options: BlockRetrievalOptions> = {}): Promise { + async retrieve (cid: CID, options: BlockRetrievalOptions = {}): Promise { // Loop through the gateways until we get a block or run out of gateways // TODO: switch to toSorted when support is better const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability()) diff --git a/packages/block-brokers/src/trustless-gateway/index.ts b/packages/block-brokers/src/trustless-gateway/index.ts index 91dcab15c..93489ecbd 100644 --- a/packages/block-brokers/src/trustless-gateway/index.ts +++ b/packages/block-brokers/src/trustless-gateway/index.ts @@ -1,5 +1,5 @@ import { TrustlessGatewayBlockBroker } from './broker.js' -import type { BlockRetriever } from '@helia/interface/src/blocks.js' +import type { BlockBroker } from '@helia/interface/src/blocks.js' import type { ComponentLogger } from '@libp2p/interface' import type { ProgressEvent } from 'progress-events' @@ -25,6 +25,6 @@ export interface TrustlessGatewayComponents { logger: ComponentLogger } -export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockRetriever { +export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockBroker { return (components) => new TrustlessGatewayBlockBroker(components, init) } diff --git a/packages/block-brokers/test/trustless-gateway.spec.ts b/packages/block-brokers/test/trustless-gateway.spec.ts index 481ac6df8..9851e4121 100644 --- a/packages/block-brokers/test/trustless-gateway.spec.ts +++ b/packages/block-brokers/test/trustless-gateway.spec.ts @@ -8,12 +8,12 @@ import { type StubbedInstance, stubConstructor } from 'sinon-ts' import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js' import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js' import { createBlock } from './fixtures/create-block.js' -import type { BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker } from '@helia/interface/blocks' import type { CID } from 'multiformats/cid' describe('trustless-gateway-block-broker', () => { let blocks: Array<{ cid: CID, block: Uint8Array }> - let gatewayBlockBroker: BlockRetriever + let gatewayBlockBroker: BlockBroker let gateways: Array> // take a Record) => void> and stub the gateways @@ -54,7 +54,7 @@ describe('trustless-gateway-block-broker', () => { gateway.getRawBlock.rejects(new Error('failed')) } - await expect(gatewayBlockBroker.retrieve(blocks[0].cid)) + await expect(gatewayBlockBroker.retrieve?.(blocks[0].cid)) .to.eventually.be.rejected() .with.property('errors') .with.lengthOf(gateways.length) @@ -78,7 +78,7 @@ describe('trustless-gateway-block-broker', () => { } }) - await expect(gatewayBlockBroker.retrieve(blocks[1].cid)).to.eventually.be.rejected() + await expect(gatewayBlockBroker.retrieve?.(blocks[1].cid)).to.eventually.be.rejected() // all gateways were called expect(gateways[0].getRawBlock.calledWith(blocks[1].cid)).to.be.true() @@ -105,7 +105,7 @@ describe('trustless-gateway-block-broker', () => { } }) - const block = await gatewayBlockBroker.retrieve(cid1, { + const block = await gatewayBlockBroker.retrieve?.(cid1, { validateFn: async (block) => { if (block !== block1) { throw new Error('invalid block') @@ -136,7 +136,7 @@ describe('trustless-gateway-block-broker', () => { gateway.reliability.returns(0) // make sure other gateways are called last } }) - const block = await gatewayBlockBroker.retrieve(cid1, { + const block = await gatewayBlockBroker.retrieve?.(cid1, { validateFn: async (block) => { if (block !== block1) { throw new Error('invalid block') diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index ecd230126..a69d5d6db 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -44,7 +44,9 @@ export type DeleteManyBlocksProgressEvents = export interface GetOfflineOptions { /** - * If true, do not attempt to fetch any missing blocks from the network (default: false) + * If true, do not attempt to fetch any missing blocks from the network + * + * @default false */ offline?: boolean } @@ -54,10 +56,19 @@ ProgressOptions, ProgressOptions, GetOfflineOptions & ProgressOptions, ProgressOptions, ProgressOptions, ProgressOptions > { - + /** + * A session blockstore is a special blockstore that only pulls content from a + * subset of network peers which respond as having the block for the initial + * root CID. + * + * Any blocks written to the blockstore as part of the session will propagate + * to the blockstore the session was created from. + * + */ + createSession(root: CID, options?: CreateSessionOptions): Promise } -export type BlockRetrievalOptions = AbortOptions & GetProgressOptions & { +export interface BlockRetrievalOptions = ProgressEvent> extends AbortOptions, ProgressOptions { /** * A function that blockBrokers should call prior to returning a block to ensure it can maintain control * of the block request flow. e.g. TrustedGatewayBlockBroker will use this to ensure that the block @@ -67,18 +78,65 @@ export type BlockRetrievalOptions } -export interface BlockRetriever { +export interface BlockAnnounceOptions = ProgressEvent> extends AbortOptions, ProgressOptions { + +} + +export interface CreateSessionOptions = ProgressEvent> extends AbortOptions, ProgressOptions { /** - * Retrieve a block from a source + * The minimum number of providers for the root CID that are required for + * successful session creation. + * + * The session will become usable once this many providers have been + * discovered, up to `maxProviders` providers will continue to be added. + * + * @default 1 + */ + minProviders?: number + + /** + * The maximum number of providers for the root CID to be added to a session. + * + * @default 5 */ - retrieve(cid: CID, options?: BlockRetrievalOptions): Promise + maxProviders?: number + + /** + * When searching for providers of the root CID, implementations can check + * that providers are still online and have the requested block. This setting + * controls how many peers to query at the same time. + * + * @default 5 + */ + providerQueryConcurrency?: number + + /** + * How long each queried provider has to respond either that they have the + * root block or to send it to us. + * + * @default 5000 + */ + providerQueryTimeout?: number } -export interface BlockAnnouncer { +export interface BlockBroker = ProgressEvent, AnnounceProgressEvents extends ProgressEvent = ProgressEvent> { + /** + * Retrieve a block from a source + */ + retrieve?(cid: CID, options?: BlockRetrievalOptions): Promise + /** * Make a new block available to peers */ - announce(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void + announce?(cid: CID, block: Uint8Array, options?: BlockAnnounceOptions): Promise + + /** + * Create a new session + */ + createSession?(root: CID, options?: CreateSessionOptions): Promise> } -export type BlockBroker = BlockRetriever | BlockAnnouncer +export const DEFAULT_SESSION_MIN_PROVIDERS = 1 +export const DEFAULT_SESSION_MAX_PROVIDERS = 5 +export const DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY = 5 +export const DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT = 5000 diff --git a/packages/utils/package.json b/packages/utils/package.json index 81da7c659..120511365 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -59,7 +59,6 @@ "@ipld/dag-pb": "^4.1.0", "@libp2p/interface": "^1.1.4", "@libp2p/logger": "^4.0.7", - "@libp2p/peer-collections": "^5.1.7", "@libp2p/utils": "^5.2.6", "@multiformats/dns": "^1.0.1", "any-signal": "^4.1.1", diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index ef618e7b0..d4feccf74 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -119,6 +119,7 @@ interface Components { dagWalkers: Record logger: ComponentLogger blockBrokers: BlockBroker[] + routing: Routing dns: DNS } @@ -140,6 +141,7 @@ export class Helia implements HeliaInterface { this.dagWalkers = defaultDagWalkers(init.dagWalkers) this.dns = init.dns ?? dns() + // @ts-expect-error routing is not set const components: Components = { blockstore: init.blockstore, datastore: init.datastore, @@ -151,19 +153,7 @@ export class Helia implements HeliaInterface { ...(init.components ?? {}) } - components.blockBrokers = init.blockBrokers.map((fn) => { - return fn(components) - }) - - const networkedStorage = new NetworkedStorage(components) - - this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers) - - this.blockstore = new BlockStorage(networkedStorage, this.pins, { - holdGcLock: init.holdGcLock ?? true - }) - this.datastore = init.datastore - this.routing = new RoutingClass(components, { + this.routing = components.routing = new RoutingClass(components, { routers: (init.routers ?? []).flatMap((router: any) => { // if the router itself is a router const routers = [ @@ -183,6 +173,17 @@ export class Helia implements HeliaInterface { return routers }) }) + + const networkedStorage = new NetworkedStorage(components) + this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers) + this.blockstore = new BlockStorage(networkedStorage, this.pins, { + holdGcLock: init.holdGcLock ?? true + }) + this.datastore = init.datastore + + components.blockBrokers = init.blockBrokers.map((fn) => { + return fn(components) + }) } async start (): Promise { diff --git a/packages/utils/src/routing.ts b/packages/utils/src/routing.ts index c0b6e26ce..be5b75a67 100644 --- a/packages/utils/src/routing.ts +++ b/packages/utils/src/routing.ts @@ -1,5 +1,4 @@ import { CodeError, start, stop } from '@libp2p/interface' -import { PeerSet } from '@libp2p/peer-collections' import merge from 'it-merge' import type { Routing as RoutingInterface, Provider, RoutingOptions } from '@helia/interface' import type { AbortOptions, ComponentLogger, Logger, PeerId, PeerInfo, Startable } from '@libp2p/interface' @@ -38,8 +37,6 @@ export class Routing implements RoutingInterface, Startable { throw new CodeError('No content routers available', 'ERR_NO_ROUTERS_AVAILABLE') } - const seen = new PeerSet() - for await (const peer of merge( ...supports(this.routers, 'findProviders') .map(router => router.findProviders(key, options)) @@ -50,13 +47,6 @@ export class Routing implements RoutingInterface, Startable { continue } - // deduplicate peers - if (seen.has(peer.id)) { - continue - } - - seen.add(peer.id) - yield peer } } @@ -142,8 +132,6 @@ export class Routing implements RoutingInterface, Startable { throw new CodeError('No peer routers available', 'ERR_NO_ROUTERS_AVAILABLE') } - const seen = new PeerSet() - for await (const peer of merge( ...supports(this.routers, 'getClosestPeers') .map(router => router.getClosestPeers(key, options)) @@ -152,13 +140,6 @@ export class Routing implements RoutingInterface, Startable { continue } - // deduplicate peers - if (seen.has(peer.id)) { - continue - } - - seen.add(peer.id) - yield peer } } diff --git a/packages/utils/src/storage.ts b/packages/utils/src/storage.ts index 909c2e89c..f184ae22c 100644 --- a/packages/utils/src/storage.ts +++ b/packages/utils/src/storage.ts @@ -1,4 +1,4 @@ -import { start, stop } from '@libp2p/interface' +import { CodeError, start, stop } from '@libp2p/interface' import createMortice from 'mortice' import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' import type { Pins } from '@helia/interface/pins' @@ -24,14 +24,14 @@ export interface GetOptions extends AbortOptions { */ export class BlockStorage implements Blocks, Startable { public lock: Mortice - private readonly child: Blockstore + private readonly child: Blocks private readonly pins: Pins private started: boolean /** * Create a new BlockStorage */ - constructor (blockstore: Blockstore, pins: Pins, options: BlockStorageInit = {}) { + constructor (blockstore: Blocks, pins: Pins, options: BlockStorageInit = {}) { this.child = blockstore this.pins = pins this.lock = createMortice({ @@ -169,4 +169,20 @@ export class BlockStorage implements Blocks, Startable { releaseLock() } } + + async createSession (root: CID, options?: AbortOptions): Promise { + const releaseLock = await this.lock.readLock() + + try { + const blocks = await this.child.createSession(root, options) + + if (blocks == null) { + throw new CodeError('Sessions not supported', 'ERR_UNSUPPORTED') + } + + return blocks + } finally { + releaseLock() + } + } } diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index 5e8f59642..2fd5f2417 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -6,30 +6,26 @@ import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' -import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks' +import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetrievalOptions } from '@helia/interface/blocks' import type { AbortOptions, ComponentLogger, Logger, LoggerOptions, Startable } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { AwaitIterable } from 'interface-store' import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' -export interface GetOptions extends AbortOptions { - progress?(evt: Event): void -} - -function isBlockRetriever (b: any): b is BlockRetriever { - return typeof b.retrieve === 'function' +export interface NetworkedStorageInit { + root?: CID } -function isBlockAnnouncer (b: any): b is BlockAnnouncer { - return typeof b.announce === 'function' +export interface GetOptions extends AbortOptions { + progress?(evt: Event): void } export interface NetworkedStorageComponents { blockstore: Blockstore logger: ComponentLogger - blockBrokers?: BlockBroker[] - hashers?: Record + blockBrokers: BlockBroker[] + hashers: Record } /** @@ -38,23 +34,23 @@ export interface NetworkedStorageComponents { */ export class NetworkedStorage implements Blocks, Startable { private readonly child: Blockstore - private readonly blockRetrievers: BlockRetriever[] - private readonly blockAnnouncers: BlockAnnouncer[] private readonly hashers: Record private started: boolean private readonly log: Logger + private readonly logger: ComponentLogger + private readonly components: NetworkedStorageComponents /** * Create a new BlockStorage */ - constructor (components: NetworkedStorageComponents) { - this.log = components.logger.forComponent('helia:networked-storage') + constructor (components: NetworkedStorageComponents, init: NetworkedStorageInit = {}) { + this.log = components.logger.forComponent(`helia:networked-storage${init.root == null ? '' : `:${init.root}`}`) + this.logger = components.logger + this.components = components this.child = new TieredBlockstore([ new IdentityBlockstore(), components.blockstore ]) - this.blockRetrievers = (components.blockBrokers ?? []).filter(isBlockRetriever) - this.blockAnnouncers = (components.blockBrokers ?? []).filter(isBlockAnnouncer) this.hashers = components.hashers ?? {} this.started = false } @@ -64,12 +60,12 @@ export class NetworkedStorage implements Blocks, Startable { } async start (): Promise { - await start(this.child, ...new Set([...this.blockRetrievers, ...this.blockAnnouncers])) + await start(this.child, ...this.components.blockBrokers) this.started = true } async stop (): Promise { - await stop(this.child, ...new Set([...this.blockRetrievers, ...this.blockAnnouncers])) + await stop(this.child, ...this.components.blockBrokers) this.started = false } @@ -88,9 +84,9 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) - }) + await Promise.all( + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + ) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -111,11 +107,11 @@ export class NetworkedStorage implements Blocks, Startable { return !has }) - const notifyEach = forEach(missingBlocks, ({ cid, block }): void => { + const notifyEach = forEach(missingBlocks, async ({ cid, block }): Promise => { options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) - }) + await Promise.all( + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + ) }) options.onProgress?.(new CustomProgressEvent('blocks:put-many:blockstore:put-many')) @@ -129,7 +125,7 @@ export class NetworkedStorage implements Blocks, Startable { if (options.offline !== true && !(await this.child.has(cid))) { // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.components.blockBrokers, this.hashers[cid.multihash.code], { ...options, log: this.log }) @@ -138,9 +134,9 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) - }) + await Promise.all( + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + ) return block } @@ -160,7 +156,7 @@ export class NetworkedStorage implements Blocks, Startable { if (options.offline !== true && !(await this.child.has(cid))) { // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.components.blockBrokers, this.hashers[cid.multihash.code], { ...options, log: this.log }) @@ -169,9 +165,9 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) - }) + await Promise.all( + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + ) } })) } @@ -205,8 +201,30 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:get-all:blockstore:get-many')) yield * this.child.getAll(options) } + + async createSession (root: CID, options?: AbortOptions & ProgressOptions): Promise { + const blockBrokers = await Promise.all(this.components.blockBrokers.map(async broker => { + if (broker.createSession == null) { + return broker + } + + return broker.createSession(root, options) + })) + + return new NetworkedStorage({ + blockstore: this.child, + blockBrokers, + hashers: this.hashers, + logger: this.logger + }, { + root + }) + } } +function isRetrievingBlockBroker (broker: BlockBroker): broker is Required> { + return typeof broker.retrieve === 'function' +} export const getCidBlockVerifierFunction = (cid: CID, hasher: MultihashHasher): Required['validateFn'] => { if (hasher == null) { throw new CodeError(`No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG') @@ -227,38 +245,47 @@ export const getCidBlockVerifierFunction = (cid: CID, hasher: MultihashHasher): * Race block providers cancelling any pending requests once the block has been * found. */ -async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hasher: MultihashHasher, options: AbortOptions & LoggerOptions): Promise { +async function raceBlockRetrievers (cid: CID, blockBrokers: BlockBroker[], hasher: MultihashHasher, options: AbortOptions & LoggerOptions): Promise { const validateFn = getCidBlockVerifierFunction(cid, hasher) const controller = new AbortController() const signal = anySignal([controller.signal, options.signal]) + const retrievers: Array>> = [] + + for (const broker of blockBrokers) { + if (isRetrievingBlockBroker(broker)) { + retrievers.push(broker) + } + } + try { return await Promise.any( - providers.map(async provider => { - try { - let blocksWereValidated = false - const block = await provider.retrieve(cid, { - ...options, - signal, - validateFn: async (block: Uint8Array): Promise => { + retrievers + .map(async retriever => { + try { + let blocksWereValidated = false + const block = await retriever.retrieve(cid, { + ...options, + signal, + validateFn: async (block: Uint8Array): Promise => { + await validateFn(block) + blocksWereValidated = true + } + }) + + if (!blocksWereValidated) { + // the blockBroker either did not throw an error when attempting to validate the block + // or did not call the validateFn at all. We should validate the block ourselves await validateFn(block) - blocksWereValidated = true } - }) - if (!blocksWereValidated) { - // the blockBroker either did not throw an error when attempting to validate the block - // or did not call the validateFn at all. We should validate the block ourselves - await validateFn(block) + return block + } catch (err) { + options.log.error('could not retrieve verified block for %c', cid, err) + throw err } - - return block - } catch (err) { - options.log.error('could not retrieve verified block for %c', cid, err) - throw err - } - }) + }) ) } finally { signal.clear() diff --git a/packages/utils/test/block-broker.spec.ts b/packages/utils/test/block-broker.spec.ts index 1cef6c475..140b7a51a 100644 --- a/packages/utils/test/block-broker.spec.ts +++ b/packages/utils/test/block-broker.spec.ts @@ -12,7 +12,7 @@ import { type StubbedInstance, stubInterface } from 'sinon-ts' import { defaultHashers } from '../src/utils/default-hashers.js' import { NetworkedStorage } from '../src/utils/networked-storage.js' import { createBlock } from './fixtures/create-block.js' -import type { BlockBroker, BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker } from '@helia/interface/blocks' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' @@ -21,7 +21,7 @@ describe('block-broker', () => { let blockstore: Blockstore let bitswapBlockBroker: StubbedInstance> let blocks: Array<{ cid: CID, block: Uint8Array }> - let gatewayBlockBroker: StubbedInstance> + let gatewayBlockBroker: StubbedInstance> beforeEach(async () => { blocks = [] diff --git a/packages/utils/test/storage.spec.ts b/packages/utils/test/storage.spec.ts index 92d7614a2..42c5a5b49 100644 --- a/packages/utils/test/storage.spec.ts +++ b/packages/utils/test/storage.spec.ts @@ -10,13 +10,20 @@ import * as raw from 'multiformats/codecs/raw' import { PinsImpl } from '../src/pins.js' import { BlockStorage } from '../src/storage.js' import { createBlock } from './fixtures/create-block.js' +import type { Blocks } from '@helia/interface' import type { Pins } from '@helia/interface/pins' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' +class MemoryBlocks extends MemoryBlockstore implements Blocks { + async createSession (): Promise { + throw new Error('Not implemented') + } +} + describe('storage', () => { let storage: BlockStorage - let blockstore: Blockstore + let blockstore: Blocks let pins: Pins let blocks: Array<{ cid: CID, block: Uint8Array }> @@ -29,7 +36,7 @@ describe('storage', () => { const datastore = new MemoryDatastore() - blockstore = new MemoryBlockstore() + blockstore = new MemoryBlocks() pins = new PinsImpl(datastore, blockstore, []) storage = new BlockStorage(blockstore, pins, { holdGcLock: true diff --git a/packages/utils/test/utils/networked-storage.spec.ts b/packages/utils/test/utils/networked-storage.spec.ts index 7d0fd4b20..ba3eb9799 100644 --- a/packages/utils/test/utils/networked-storage.spec.ts +++ b/packages/utils/test/utils/networked-storage.spec.ts @@ -16,13 +16,13 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { defaultHashers } from '../../src/utils/default-hashers.js' import { NetworkedStorage } from '../../src/utils/networked-storage.js' import { createBlock } from '../fixtures/create-block.js' -import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker } from '@helia/interface/blocks' import type { Blockstore } from 'interface-blockstore' describe('networked-storage', () => { let storage: NetworkedStorage let blockstore: Blockstore - let bitswap: StubbedInstance> + let bitswap: StubbedInstance> let blocks: Array<{ cid: CID, block: Uint8Array }> beforeEach(async () => {