Skip to content

Commit

Permalink
feat: add filter option to de-duplicate blocks in car files
Browse files Browse the repository at this point in the history
When calling export on an @helia/car instance the yielded export contains duplicate blocks.

A `filter` option has been added that skips yielding duplicate blocks resulting in compact CAR files.

---------

Co-authored-by: Alex Potsides <[email protected]>
  • Loading branch information
jtsmedley and achingbrain authored Jul 31, 2024
1 parent 6952f05 commit 461d219
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 3 deletions.
3 changes: 3 additions & 0 deletions packages/car/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
"@helia/interface": "^4.3.0",
"@ipld/car": "^5.3.0",
"@libp2p/interfaces": "^3.3.2",
"@libp2p/utils": "^5.4.6",
"interface-blockstore": "^5.2.10",
"it-drain": "^3.0.5",
"it-map": "^3.0.5",
Expand All @@ -151,10 +152,12 @@
"progress-events": "^1.0.0"
},
"devDependencies": {
"@helia/mfs": "^3.0.6",
"@helia/unixfs": "^3.0.6",
"@ipld/dag-pb": "^4.1.0",
"aegir": "^44.0.1",
"blockstore-core": "^4.4.0",
"datastore-core": "^9.2.9",
"ipfs-unixfs-importer": "^15.2.4",
"it-to-buffer": "^4.0.5"
},
Expand Down
20 changes: 17 additions & 3 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import type { DAGWalker } from '@helia/interface'
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Filter } from '@libp2p/utils/filters'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
import type { ProgressOptions } from 'progress-events'
Expand All @@ -76,6 +77,13 @@ export interface CarComponents {
dagWalkers: Record<number, DAGWalker>
}

interface ExportCarOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* If a filter is passed it will be used to deduplicate blocks exported in the car file
*/
blockFilter?: Filter
}

/**
* The Car interface provides operations for importing and exporting Car files
* from Helia's underlying blockstore.
Expand Down Expand Up @@ -129,7 +137,7 @@ export interface Car {
* await eventPromise
* ```
*/
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: ExportCarOptions): Promise<void>

/**
* Returns an AsyncGenerator that yields CAR file bytes.
Expand Down Expand Up @@ -170,7 +178,7 @@ class DefaultCar implements Car {
))
}

async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: ExportCarOptions): Promise<void> {
const deferred = defer<Error | undefined>()
const roots = Array.isArray(root) ? root : [root]

Expand All @@ -189,6 +197,12 @@ class DefaultCar implements Car {
for (const root of roots) {
void queue.add(async () => {
await this.#walkDag(root, queue, async (cid, bytes) => {
// if a filter has been passed, skip blocks that have already been written
if (options?.blockFilter?.has(cid.multihash.bytes) === true) {
return
}

options?.blockFilter?.add(cid.multihash.bytes)
await writer.put({ cid, bytes })
}, options)
})
Expand All @@ -203,7 +217,7 @@ class DefaultCar implements Car {
}
}

async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array, void, undefined> {
async * stream (root: CID | CID[], options?: ExportCarOptions): AsyncGenerator<Uint8Array, void, undefined> {
const { writer, out } = CarWriter.create(root)

// has to be done async so we write to `writer` and read from `out` at the
Expand Down
54 changes: 54 additions & 0 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/* eslint-env mocha */

import { mfs } from '@helia/mfs'
import { type UnixFS, unixfs } from '@helia/unixfs'
import { CarReader } from '@ipld/car'
import { createScalableCuckooFilter } from '@libp2p/utils/filters'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
Expand Down Expand Up @@ -115,4 +118,55 @@ describe('import/export car file', () => {
expect(await toBuffer(u.cat(cid2))).to.equalBytes(fileData2)
expect(await toBuffer(u.cat(cid3))).to.equalBytes(fileData3)
})

it('exports a car file without duplicates', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')

const sourceCid = await otherUnixFS.addBytes(smallFile)
await otherMFS.cp(sourceCid, '/testDups/a.smallfile')
await otherMFS.cp(sourceCid, '/testDups/sub/b.smallfile')

const rootObject = await otherMFS.stat('/testDups/')
const rootCid = rootObject.cid

const writer = memoryCarWriter(rootCid)
const blockFilter = createScalableCuckooFilter(5)
await otherCar.export(rootCid, writer, {
blockFilter
})

const carBytes = await writer.bytes()
expect(carBytes.length).to.equal(349)
})

it('exports a car file with duplicates', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')

const sourceCid = await otherUnixFS.addBytes(smallFile)
await otherMFS.cp(sourceCid, '/testDups/a.smallfile')
await otherMFS.cp(sourceCid, '/testDups/sub/b.smallfile')

const rootObject = await otherMFS.stat('/testDups/')
const rootCid = rootObject.cid

const writer = memoryCarWriter(rootCid)
await otherCar.export(rootCid, writer)

const carBytes = await writer.bytes()
expect(carBytes.length).to.equal(399)
})
})

0 comments on commit 461d219

Please sign in to comment.