Skip to content

Commit

Permalink
feat: add block session support to @helia/interface (#398)
Browse files Browse the repository at this point in the history
The usage pattern is:

```javascript
// unixfs cat command
export async function * cat (cid: CID, blockstore: Blocks, options: Partial<CatOptions> = {}): AsyncIterable<Uint8Array> {
  // create a session for the CID if support is available
  const blocks = await (blockstore.createSession != null ? blockstore.createSession(cid, options) : blockstore)
  const opts: CatOptions = mergeOptions(defaultOptions, options)

  // resolve and export using the session, if created, otherwise fall back to regular blockstore access
  const resolved = await resolve(cid, opts.path, blocks, opts)
  const result = await exporter(resolved.cid, blocks, opts)

  if (result.type !== 'file' && result.type !== 'raw') {
    throw new NotAFileError()
  }

  if (result.content == null) {
    throw new NoContentError()
  }

  yield * result.content(opts)
}
```

Alternatively the user can control session creation:

```javascript
import { unixfs } from '@helia/unixfs'
import { createHelia } from 'helia' // or http
import { CID } from 'multiformats/cid'

const node = await createHelia()
const rootCid = CID.parse('Qmfoo')
const sessionBlockstore = await node.blockstore.createSession(rootCid, {
  signal: AbortSignal.timeout(5000)
})

// all operations will use the same session
const fs = unixfs({ blockstore: sessionBlockstore })

for await (const entry of fs.ls(rootCid) {
  if (entry.type !== 'file') {
    continue
  }

  for await (const buf of fs.cat(entry.cid)) {
    // ...
  }
}
```

Removes the `BlockAnnouncer`/`BlockRetriever` single-method interface `BlockBroker` split because we would have to add another `BlockSessionFactory` interface for this which starts getting unwieldy.  Instead just have all the methods be optional and filter the brokers before use.

---------

Co-authored-by: Russell Dempsey <[email protected]>
  • Loading branch information
achingbrain and SgtPooki authored Apr 4, 2024
1 parent 1c2c4b0 commit 5cf216b
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 127 deletions.
11 changes: 4 additions & 7 deletions packages/block-brokers/src/bitswap.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockAnnouncer, BlockBroker, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions } from '@helia/interface/blocks'
import type { Libp2p, Startable } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapOptions, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

interface BitswapComponents {
libp2p: Libp2p
Expand All @@ -17,9 +16,7 @@ export interface BitswapInit extends BitswapOptions {

}

class BitswapBlockBroker implements BlockAnnouncer<ProgressOptions<BitswapNotifyProgressEvents>>, BlockRetriever<
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
class BitswapBlockBroker implements BlockBroker<BitswapWantBlockProgressEvents, BitswapNotifyProgressEvents>, Startable {
private readonly bitswap: Bitswap
private started: boolean

Expand Down Expand Up @@ -65,11 +62,11 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.started = false
}

announce (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>): void {
async announce (cid: CID, block: Uint8Array, options?: BlockAnnounceOptions<BitswapNotifyProgressEvents>): Promise<void> {
this.bitswap.notify(cid, block, options)
}

async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions<ProgressOptions<BitswapWantBlockProgressEvents>> = {}): Promise<Uint8Array> {
async retrieve (cid: CID, options: BlockRetrievalOptions<BitswapWantBlockProgressEvents> = {}): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
Expand Down
9 changes: 3 additions & 6 deletions packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks'
import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks'
import type { Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockRetriever<
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
private readonly gateways: TrustlessGateway[]
private readonly log: Logger

Expand All @@ -24,7 +21,7 @@ ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
})
}

async retrieve (cid: CID, options: BlockRetrievalOptions<ProgressOptions<TrustlessGatewayGetBlockProgressEvents>> = {}): Promise<Uint8Array> {
async retrieve (cid: CID, options: BlockRetrievalOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
Expand Down
4 changes: 2 additions & 2 deletions packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockRetriever } from '@helia/interface/src/blocks.js'
import type { BlockBroker } from '@helia/interface/src/blocks.js'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'

Expand All @@ -25,6 +25,6 @@ export interface TrustlessGatewayComponents {
logger: ComponentLogger
}

export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockRetriever {
export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
return (components) => new TrustlessGatewayBlockBroker(components, init)
}
12 changes: 6 additions & 6 deletions packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import { type StubbedInstance, stubConstructor } from 'sinon-ts'
import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js'
import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js'
import { createBlock } from './fixtures/create-block.js'
import type { BlockRetriever } from '@helia/interface/blocks'
import type { BlockBroker } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'

describe('trustless-gateway-block-broker', () => {
let blocks: Array<{ cid: CID, block: Uint8Array }>
let gatewayBlockBroker: BlockRetriever
let gatewayBlockBroker: BlockBroker
let gateways: Array<StubbedInstance<TrustlessGateway>>

// take a Record<gatewayIndex, (gateway: StubbedInstance<TrustlessGateway>) => void> and stub the gateways
Expand Down Expand Up @@ -54,7 +54,7 @@ describe('trustless-gateway-block-broker', () => {
gateway.getRawBlock.rejects(new Error('failed'))
}

await expect(gatewayBlockBroker.retrieve(blocks[0].cid))
await expect(gatewayBlockBroker.retrieve?.(blocks[0].cid))
.to.eventually.be.rejected()
.with.property('errors')
.with.lengthOf(gateways.length)
Expand All @@ -78,7 +78,7 @@ describe('trustless-gateway-block-broker', () => {
}
})

await expect(gatewayBlockBroker.retrieve(blocks[1].cid)).to.eventually.be.rejected()
await expect(gatewayBlockBroker.retrieve?.(blocks[1].cid)).to.eventually.be.rejected()

// all gateways were called
expect(gateways[0].getRawBlock.calledWith(blocks[1].cid)).to.be.true()
Expand All @@ -105,7 +105,7 @@ describe('trustless-gateway-block-broker', () => {
}
})

const block = await gatewayBlockBroker.retrieve(cid1, {
const block = await gatewayBlockBroker.retrieve?.(cid1, {
validateFn: async (block) => {
if (block !== block1) {
throw new Error('invalid block')
Expand Down Expand Up @@ -136,7 +136,7 @@ describe('trustless-gateway-block-broker', () => {
gateway.reliability.returns(0) // make sure other gateways are called last
}
})
const block = await gatewayBlockBroker.retrieve(cid1, {
const block = await gatewayBlockBroker.retrieve?.(cid1, {
validateFn: async (block) => {
if (block !== block1) {
throw new Error('invalid block')
Expand Down
76 changes: 67 additions & 9 deletions packages/interface/src/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export type DeleteManyBlocksProgressEvents =

export interface GetOfflineOptions {
/**
* If true, do not attempt to fetch any missing blocks from the network (default: false)
* If true, do not attempt to fetch any missing blocks from the network
*
* @default false
*/
offline?: boolean
}
Expand All @@ -54,10 +56,19 @@ ProgressOptions<PutBlockProgressEvents>, ProgressOptions<PutManyBlocksProgressEv
GetOfflineOptions & ProgressOptions<GetBlockProgressEvents>, GetOfflineOptions & ProgressOptions<GetManyBlocksProgressEvents>, ProgressOptions<GetAllBlocksProgressEvents>,
ProgressOptions<DeleteBlockProgressEvents>, ProgressOptions<DeleteManyBlocksProgressEvents>
> {

/**
* A session blockstore is a special blockstore that only pulls content from a
* subset of network peers which respond as having the block for the initial
* root CID.
*
* Any blocks written to the blockstore as part of the session will propagate
* to the blockstore the session was created from.
*
*/
createSession(root: CID, options?: CreateSessionOptions<GetBlockProgressEvents>): Promise<Blockstore>
}

export type BlockRetrievalOptions<GetProgressOptions extends ProgressOptions = ProgressOptions> = AbortOptions & GetProgressOptions & {
export interface BlockRetrievalOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents> {
/**
* A function that blockBrokers should call prior to returning a block to ensure it can maintain control
* of the block request flow. e.g. TrustedGatewayBlockBroker will use this to ensure that the block
Expand All @@ -67,18 +78,65 @@ export type BlockRetrievalOptions<GetProgressOptions extends ProgressOptions = P
validateFn?(block: Uint8Array): Promise<void>
}

export interface BlockRetriever<GetProgressOptions extends ProgressOptions = ProgressOptions> {
export interface BlockAnnounceOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents> {

}

export interface CreateSessionOptions <ProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> extends AbortOptions, ProgressOptions<ProgressEvents> {
/**
* Retrieve a block from a source
* The minimum number of providers for the root CID that are required for
* successful session creation.
*
* The session will become usable once this many providers have been
* discovered, up to `maxProviders` providers will continue to be added.
*
* @default 1
*/
minProviders?: number

/**
* The maximum number of providers for the root CID to be added to a session.
*
* @default 5
*/
retrieve(cid: CID, options?: BlockRetrievalOptions<GetProgressOptions>): Promise<Uint8Array>
maxProviders?: number

/**
* When searching for providers of the root CID, implementations can check
* that providers are still online and have the requested block. This setting
* controls how many peers to query at the same time.
*
* @default 5
*/
providerQueryConcurrency?: number

/**
* How long each queried provider has to respond either that they have the
* root block or to send it to us.
*
* @default 5000
*/
providerQueryTimeout?: number
}

export interface BlockAnnouncer<NotifyProgressOptions extends ProgressOptions = ProgressOptions> {
export interface BlockBroker<RetrieveProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>, AnnounceProgressEvents extends ProgressEvent<any, any> = ProgressEvent<any, any>> {
/**
* Retrieve a block from a source
*/
retrieve?(cid: CID, options?: BlockRetrievalOptions<RetrieveProgressEvents>): Promise<Uint8Array>

/**
* Make a new block available to peers
*/
announce(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void
announce?(cid: CID, block: Uint8Array, options?: BlockAnnounceOptions<AnnounceProgressEvents>): Promise<void>

/**
* Create a new session
*/
createSession?(root: CID, options?: CreateSessionOptions<RetrieveProgressEvents>): Promise<BlockBroker<RetrieveProgressEvents, AnnounceProgressEvents>>
}

export type BlockBroker = BlockRetriever | BlockAnnouncer
export const DEFAULT_SESSION_MIN_PROVIDERS = 1
export const DEFAULT_SESSION_MAX_PROVIDERS = 5
export const DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY = 5
export const DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT = 5000
1 change: 0 additions & 1 deletion packages/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
"@ipld/dag-pb": "^4.1.0",
"@libp2p/interface": "^1.1.4",
"@libp2p/logger": "^4.0.7",
"@libp2p/peer-collections": "^5.1.7",
"@libp2p/utils": "^5.2.6",
"@multiformats/dns": "^1.0.1",
"any-signal": "^4.1.1",
Expand Down
27 changes: 14 additions & 13 deletions packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ interface Components {
dagWalkers: Record<number, DAGWalker>
logger: ComponentLogger
blockBrokers: BlockBroker[]
routing: Routing
dns: DNS
}

Expand All @@ -140,6 +141,7 @@ export class Helia implements HeliaInterface {
this.dagWalkers = defaultDagWalkers(init.dagWalkers)
this.dns = init.dns ?? dns()

// @ts-expect-error routing is not set
const components: Components = {
blockstore: init.blockstore,
datastore: init.datastore,
Expand All @@ -151,19 +153,7 @@ export class Helia implements HeliaInterface {
...(init.components ?? {})
}

components.blockBrokers = init.blockBrokers.map((fn) => {
return fn(components)
})

const networkedStorage = new NetworkedStorage(components)

this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers)

this.blockstore = new BlockStorage(networkedStorage, this.pins, {
holdGcLock: init.holdGcLock ?? true
})
this.datastore = init.datastore
this.routing = new RoutingClass(components, {
this.routing = components.routing = new RoutingClass(components, {
routers: (init.routers ?? []).flatMap((router: any) => {
// if the router itself is a router
const routers = [
Expand All @@ -183,6 +173,17 @@ export class Helia implements HeliaInterface {
return routers
})
})

const networkedStorage = new NetworkedStorage(components)
this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers)
this.blockstore = new BlockStorage(networkedStorage, this.pins, {
holdGcLock: init.holdGcLock ?? true
})
this.datastore = init.datastore

components.blockBrokers = init.blockBrokers.map((fn) => {
return fn(components)
})
}

async start (): Promise<void> {
Expand Down
19 changes: 0 additions & 19 deletions packages/utils/src/routing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { CodeError, start, stop } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import merge from 'it-merge'
import type { Routing as RoutingInterface, Provider, RoutingOptions } from '@helia/interface'
import type { AbortOptions, ComponentLogger, Logger, PeerId, PeerInfo, Startable } from '@libp2p/interface'
Expand Down Expand Up @@ -38,8 +37,6 @@ export class Routing implements RoutingInterface, Startable {
throw new CodeError('No content routers available', 'ERR_NO_ROUTERS_AVAILABLE')
}

const seen = new PeerSet()

for await (const peer of merge(
...supports(this.routers, 'findProviders')
.map(router => router.findProviders(key, options))
Expand All @@ -50,13 +47,6 @@ export class Routing implements RoutingInterface, Startable {
continue
}

// deduplicate peers
if (seen.has(peer.id)) {
continue
}

seen.add(peer.id)

yield peer
}
}
Expand Down Expand Up @@ -142,8 +132,6 @@ export class Routing implements RoutingInterface, Startable {
throw new CodeError('No peer routers available', 'ERR_NO_ROUTERS_AVAILABLE')
}

const seen = new PeerSet()

for await (const peer of merge(
...supports(this.routers, 'getClosestPeers')
.map(router => router.getClosestPeers(key, options))
Expand All @@ -152,13 +140,6 @@ export class Routing implements RoutingInterface, Startable {
continue
}

// deduplicate peers
if (seen.has(peer.id)) {
continue
}

seen.add(peer.id)

yield peer
}
}
Expand Down
22 changes: 19 additions & 3 deletions packages/utils/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { start, stop } from '@libp2p/interface'
import { CodeError, start, stop } from '@libp2p/interface'
import createMortice from 'mortice'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks'
import type { Pins } from '@helia/interface/pins'
Expand All @@ -24,14 +24,14 @@ export interface GetOptions extends AbortOptions {
*/
export class BlockStorage implements Blocks, Startable {
public lock: Mortice
private readonly child: Blockstore
private readonly child: Blocks
private readonly pins: Pins
private started: boolean

/**
* Create a new BlockStorage
*/
constructor (blockstore: Blockstore, pins: Pins, options: BlockStorageInit = {}) {
constructor (blockstore: Blocks, pins: Pins, options: BlockStorageInit = {}) {
this.child = blockstore
this.pins = pins
this.lock = createMortice({
Expand Down Expand Up @@ -169,4 +169,20 @@ export class BlockStorage implements Blocks, Startable {
releaseLock()
}
}

async createSession (root: CID, options?: AbortOptions): Promise<Blockstore> {
const releaseLock = await this.lock.readLock()

try {
const blocks = await this.child.createSession(root, options)

if (blocks == null) {
throw new CodeError('Sessions not supported', 'ERR_UNSUPPORTED')
}

return blocks
} finally {
releaseLock()
}
}
}
Loading

0 comments on commit 5cf216b

Please sign in to comment.