feat: receive blob filters from archive devices (#969)
This is a squashed commit of:

- #940
- #957
- #956

Co-authored-by: Gregor MacLennan <[email protected]>
EvanHahn and gmaclennan authored Nov 20, 2024
1 parent 83c0b27 commit 3d1c94b
Showing 29 changed files with 1,307 additions and 1,146 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
@@ -166,6 +166,7 @@
"@mapeo/crypto": "1.0.0-alpha.10",
"@mapeo/sqlite-indexer": "1.0.0-alpha.9",
"@sinclair/typebox": "^0.29.6",
"@sindresorhus/merge-streams": "^4.0.0",
"b4a": "^1.6.3",
"bcp-47": "^2.1.0",
"better-sqlite3": "^8.7.0",
@@ -203,6 +204,7 @@
"tiny-typed-emitter": "^2.1.0",
"type-fest": "^4.5.0",
"undici": "^6.13.0",
"unix-path-resolve": "^1.0.2",
"varint": "^6.0.0",
"ws": "^8.18.0",
"yauzl-promise": "^4.0.0"
130 changes: 130 additions & 0 deletions src/blob-store/downloader.js
@@ -0,0 +1,130 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { createEntriesStream } from './entries-stream.js'
import { filePathMatchesFilter } from './utils.js'

/** @import { BlobFilter } from '../types.js' */
/** @import { THyperdriveIndex } from './hyperdrive-index.js' */

/**
* Like hyperdrive.download() but 'live', and for multiple drives.
*
* Will emit an 'error' event for any unexpected errors. A consumer must attach
* an error listener to avoid uncaught errors. Sources of errors include:
*
* - If the entries stream emits an error
* - If a drive referenced in an entry is not found
* - If core.has() throws (e.g. if hypercore is closed)
* - If core.download().done() throws, which should not happen according to
* current hypercore code.
* - If the entries stream ends unexpectedly (it should be live and not end)
*
* NB: unlike hyperdrive.download(), this will also download deleted and
* previous versions of blobs. We don't currently support editing or deleting
* blobs, so this should not be an issue, and if we add that support in the
* future, downloading deleted and previous versions may be desirable anyway.
*
* @extends {TypedEmitter<{ error: (error: Error) => void }>}
*/
export class Downloader extends TypedEmitter {
/** @type {THyperdriveIndex} */
#driveIndex
/** @type {Set<{ done(): Promise<void>, destroy(): void }>} */
#queuedDownloads = new Set()
#entriesStream
#processEntriesPromise
#ac = new AbortController()
#shouldDownloadFile

/**
* @param {THyperdriveIndex} driveIndex
* @param {object} [options]
* @param {BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download
*/
constructor(driveIndex, { filter } = {}) {
super()
this.#driveIndex = driveIndex

this.#shouldDownloadFile = filter
? filePathMatchesFilter.bind(null, filter)
: () => true

this.#entriesStream = createEntriesStream(driveIndex, { live: true })
this.#entriesStream.once('error', this.#handleError)

this.#ac.signal.addEventListener('abort', this.#handleAbort, { once: true })

this.#processEntriesPromise = this.#processEntries()
this.#processEntriesPromise.catch(this.#handleError)
}

/**
* Start processing entries from the entries stream. If an entry matches the
* filter and we don't already have it, queue it for download. If the
* Downloader is live, this method will never resolve; otherwise it resolves
* when all the entries have been processed and downloaded.
*/
async #processEntries() {
for await (const entry of this.#entriesStream) {
this.#ac.signal.throwIfAborted()
const {
driveId,
key: filePath,
value: { blob },
} = entry
if (!this.#shouldDownloadFile(filePath)) continue
const drive = this.#driveIndex.get(driveId)
// ERROR HANDLING: this is unexpected and should not happen
if (!drive) throw new Error('Drive not found: ' + driveId)
const blobs = await drive.getBlobs()
this.#ac.signal.throwIfAborted()
await this.#processEntry(blobs.core, blob)
this.#ac.signal.throwIfAborted()
}
throw new Error('Entries stream ended unexpectedly')
}

/**
* Update state and queue missing entries for download
*
* @param {import('hypercore')} blobsCore
* @param {{ blockOffset: number, blockLength: number, byteLength: number }} blob
*/
async #processEntry(blobsCore, { blockOffset: start, blockLength: length }) {
const end = start + length
const have = await blobsCore.has(start, end)
this.#ac.signal.throwIfAborted()
if (have) return
const download = blobsCore.download({ start, end })
this.#queuedDownloads.add(download)
download
.done()
// According to current hypercore code, this should never throw.
.catch(this.#handleError)
.finally(() => {
this.#queuedDownloads.delete(download)
})
}

/**
* Cancel the downloads and clean up resources.
*/
destroy() {
this.#ac.abort()
}

/** @param {Error} error */
#handleError = (error) => {
if (this.#ac.signal.aborted) return
this.emit('error', error)
this.#ac.abort(error)
}

#handleAbort = () => {
for (const download of this.#queuedDownloads) download.destroy()
this.#ac.signal.removeEventListener('abort', this.#handleAbort)
this.#entriesStream.removeListener('error', this.#handleError)
// queuedDownloads is likely to be empty here anyway, but clear just in case.
this.#queuedDownloads.clear()
this.#entriesStream.destroy()
}
}
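
For context (not part of this commit), here is a minimal sketch of how the new Downloader might be wired up, assuming an already-constructed HyperdriveIndexImpl named `driveIndex`; the filter shape shown is illustrative, not the exact BlobFilter schema:

import { Downloader } from './downloader.js'

// Hypothetical filter: only fetch photo previews and thumbnails.
const filter = { photo: ['preview', 'thumbnail'] }

const downloader = new Downloader(driveIndex, { filter })

// An error listener is required: the Downloader emits 'error' for any
// unexpected failure and has no other way to surface it.
downloader.on('error', (err) => {
  console.error('blob download failed', err)
})

// Later, e.g. when the blob store closes, cancel any queued downloads.
downloader.destroy()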
81 changes: 81 additions & 0 deletions src/blob-store/entries-stream.js
@@ -0,0 +1,81 @@
import SubEncoder from 'sub-encoder'
import mergeStreams from '@sindresorhus/merge-streams'
import { Transform, pipeline } from 'node:stream'
import { noop } from '../utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobStoreEntriesStream } from '../types.js' */
/** @import { THyperdriveIndex } from './hyperdrive-index.js' */

const keyEncoding = new SubEncoder('files', 'utf-8')

/**
*
* @param {THyperdriveIndex} driveIndex
* @param {object} opts
* @param {boolean} [opts.live=false]
* @returns {BlobStoreEntriesStream}
*/
export function createEntriesStream(driveIndex, { live = false } = {}) {
const mergedEntriesStreams = mergeStreams(
[...driveIndex].map((drive) => getHistoryStream(drive.db, { live }))
)
driveIndex.on('add-drive', addDrive)
// Close is always emitted, so we can use it to remove the listener
mergedEntriesStreams.once('close', () =>
driveIndex.off('add-drive', addDrive)
)
return mergedEntriesStreams

/** @param {Hyperdrive} drive */
function addDrive(drive) {
mergedEntriesStreams.add(getHistoryStream(drive.db, { live }))
}
}

/**
*
* @param {import('hyperbee')} bee
* @param {object} opts
* @param {boolean} opts.live
*/
function getHistoryStream(bee, { live }) {
// This will also include old versions of files, but it is the only way to
// get a live stream from a Hyperbee. We currently do not support edits of
// blobs, so this should not be an issue; the consequence is that old versions
// are downloaded too, which is acceptable.
const historyStream = bee.createHistoryStream({
live,
// `keyEncoding` is necessary because hyperdrive stores file index data
// under the `files` sub-encoding key
keyEncoding,
})
return pipeline(historyStream, new AddDriveIds(bee.core), noop)
}

class AddDriveIds extends Transform {
#core
#cachedDriveId

/** @param {import('hypercore')} core */
constructor(core) {
super({ objectMode: true })
this.#core = core
this.#cachedDriveId = core.discoveryKey?.toString('hex')
}

/** @type {Transform['_transform']} */
_transform(entry, _, callback) {
// Minimal performance optimization to only call toString() once.
// core.discoveryKey will always be defined by the time it starts
// streaming, but could be null when the instance is first created.
let driveId
if (this.#cachedDriveId) {
driveId = this.#cachedDriveId
} else {
driveId = this.#core.discoveryKey?.toString('hex')
this.#cachedDriveId = driveId
}
callback(null, { ...entry, driveId })
}
}
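
For illustration only (not part of the diff), a sketch of consuming the entries stream directly; `driveIndex` is assumed to be a constructed HyperdriveIndexImpl, and the entry fields match what the AddDriveIds transform above adds:

import { createEntriesStream } from './entries-stream.js'

// Non-live: the stream ends once the existing history has been read.
const entriesStream = createEntriesStream(driveIndex, { live: false })

for await (const entry of entriesStream) {
  // Each entry is a hyperbee history entry plus the added `driveId`.
  const { driveId, key: filePath, value } = entry
  console.log(driveId, filePath, value?.blob?.byteLength)
}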
122 changes: 122 additions & 0 deletions src/blob-store/hyperdrive-index.js
@@ -0,0 +1,122 @@
import b4a from 'b4a'
import { discoveryKey } from 'hypercore-crypto'
import Hyperdrive from 'hyperdrive'
import util from 'node:util'
import { TypedEmitter } from 'tiny-typed-emitter'

/** @typedef {HyperdriveIndexImpl} THyperdriveIndex */

/**
* @extends {TypedEmitter<{ 'add-drive': (drive: Hyperdrive) => void }>}
*/
export class HyperdriveIndexImpl extends TypedEmitter {
/** @type {Map<string, Hyperdrive>} */
#hyperdrives = new Map()
#writer
#writerKey
/** @param {import('../core-manager/index.js').CoreManager} coreManager */
constructor(coreManager) {
super()
/** @type {undefined | Hyperdrive} */
let writer
const corestore = new PretendCorestore({ coreManager })
const blobIndexCores = coreManager.getCores('blobIndex')
const writerCoreRecord = coreManager.getWriterCore('blobIndex')
this.#writerKey = writerCoreRecord.key
for (const { key } of blobIndexCores) {
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
// We use the discovery key to derive the id for a drive
this.#hyperdrives.set(getDiscoveryId(key), drive)
if (key.equals(this.#writerKey)) {
writer = drive
}
}
if (!writer) {
throw new Error('Could not find a writer for the blobIndex namespace')
}
this.#writer = writer

coreManager.on('add-core', ({ key, namespace }) => {
if (namespace !== 'blobIndex') return
// We use the discovery key to derive the id for a drive
const driveId = getDiscoveryId(key)
if (this.#hyperdrives.has(driveId)) return
// @ts-ignore - we know pretendCorestore is not actually a Corestore
const drive = new Hyperdrive(corestore, key)
this.#hyperdrives.set(driveId, drive)
this.emit('add-drive', drive)
})
}
get writer() {
return this.#writer
}
get writerKey() {
return this.#writerKey
}
[Symbol.iterator]() {
return this.#hyperdrives.values()
}
/** @param {string} driveId */
get(driveId) {
return this.#hyperdrives.get(driveId)
}
}

/**
* Implements the `get()` method as used by hyperdrive-next. It returns the
* relevant cores from the Mapeo CoreManager.
*/
class PretendCorestore {
#coreManager
/**
* @param {object} options
* @param {import('../core-manager/index.js').CoreManager} options.coreManager
*/
constructor({ coreManager }) {
this.#coreManager = coreManager
}

/**
* @param {Buffer | { publicKey: Buffer } | { name: string }} opts
* @returns {import('hypercore')<"binary", Buffer> | undefined}
*/
get(opts) {
if (b4a.isBuffer(opts)) {
opts = { publicKey: opts }
}
if ('key' in opts) {
// @ts-ignore
opts.publicKey = opts.key
}
if ('publicKey' in opts) {
// NB! We should always add blobIndex (Hyperbee) cores to the core manager
// before we use them here. We would only reach the addCore path if the
// blob core is read from the hyperbee header (before it is added to the
// core manager)
return (
this.#coreManager.getCoreByKey(opts.publicKey) ||
this.#coreManager.addCore(opts.publicKey, 'blob').core
)
} else if (opts.name === 'db') {
return this.#coreManager.getWriterCore('blobIndex').core
} else if (opts.name.includes('blobs')) {
return this.#coreManager.getWriterCore('blob').core
} else {
throw new Error(
'Unsupported corestore.get() with opts ' + util.inspect(opts)
)
}
}

/** no-op */
close() {}
}

/**
* @param {Buffer} key Public key of hypercore
* @returns {string} Hex-encoded string of derived discovery key
*/
function getDiscoveryId(key) {
return discoveryKey(key).toString('hex')
}
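
To round out the picture (illustrative, not part of this commit): constructing the index from a CoreManager and iterating its drives, assuming an existing `coreManager` instance:

import { HyperdriveIndexImpl } from './hyperdrive-index.js'

// Wraps every 'blobIndex' core known to the core manager in a Hyperdrive;
// cores added later surface via the 'add-drive' event.
const driveIndex = new HyperdriveIndexImpl(coreManager)

driveIndex.on('add-drive', (drive) => {
  console.log('new blob drive', drive.key?.toString('hex'))
})

// The index is iterable over its drives (see [Symbol.iterator] above).
for (const drive of driveIndex) {
  await drive.ready()
}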