From d967310a39ed55b3c7f09cb40984734d17be4409 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Fri, 17 Nov 2023 15:32:48 -0600 Subject: [PATCH] Async car reader (#1867) * asynchronously read in car * dont buffer car * tweak --- .../pds/src/api/com/atproto/temp/importRepo.ts | 16 ++-------------- packages/repo/src/util.ts | 13 ++++++++----- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/packages/pds/src/api/com/atproto/temp/importRepo.ts b/packages/pds/src/api/com/atproto/temp/importRepo.ts index 382bd153839..896e822ba97 100644 --- a/packages/pds/src/api/com/atproto/temp/importRepo.ts +++ b/packages/pds/src/api/com/atproto/temp/importRepo.ts @@ -1,13 +1,12 @@ import { Readable } from 'stream' import assert from 'assert' -import * as ui8 from 'uint8arrays' import PQueue from 'p-queue' import axios from 'axios' import { CID } from 'multiformats/cid' import { InvalidRequestError } from '@atproto/xrpc-server' import { AsyncBuffer, TID, wait } from '@atproto/common' import { AtUri } from '@atproto/syntax' -import { Repo, WriteOpAction, readCar, verifyDiff } from '@atproto/repo' +import { Repo, WriteOpAction, readCarStream, verifyDiff } from '@atproto/repo' import { BlobRef, LexValue } from '@atproto/lexicon' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' @@ -73,16 +72,6 @@ const processImport = async ( outBuffer.close() } -const readStreamToBytes = async ( - stream: AsyncIterable<Uint8Array>, -): Promise<Uint8Array> => { - const chunks: Uint8Array[] = [] - for await (const chunk of stream) { - chunks.push(chunk) - } - return ui8.concat(chunks) -} - const importRepo = async ( actorStore: ActorStoreTransactor, incomingCar: AsyncIterable<Uint8Array>, @@ -92,8 +81,7 @@ const importRepo = async ( const rev = TID.nextStr() const did = actorStore.repo.did - const carBytes = await readStreamToBytes(incomingCar) - const { roots, blocks } = await readCar(carBytes) + const { roots, blocks } = await readCarStream(incomingCar) if (roots.length !== 1) { 
throw new InvalidRequestError('expected one root') } diff --git a/packages/repo/src/util.ts b/packages/repo/src/util.ts index a8cdb30a439..3c7a151a844 100644 --- a/packages/repo/src/util.ts +++ b/packages/repo/src/util.ts @@ -1,6 +1,7 @@ +import { setImmediate } from 'node:timers/promises' import { CID } from 'multiformats/cid' import * as cbor from '@ipld/dag-cbor' -import { CarReader } from '@ipld/car/reader' +import { CarBlockIterator } from '@ipld/car' import { BlockWriter, CarWriter } from '@ipld/car/writer' import { streamToBuffer, @@ -89,12 +90,14 @@ export const blocksToCarFile = ( } export const carToBlocks = async ( - car: CarReader, + car: CarBlockIterator, ): Promise<{ roots: CID[]; blocks: BlockMap }> => { const roots = await car.getRoots() const blocks = new BlockMap() - for await (const block of verifyIncomingCarBlocks(car.blocks())) { + for await (const block of verifyIncomingCarBlocks(car)) { blocks.set(block.cid, block.bytes) + // break up otherwise "synchronous" work in car parsing + await setImmediate() } return { roots, @@ -105,12 +108,12 @@ export const carToBlocks = async ( export const readCar = async ( bytes: Uint8Array, ): Promise<{ roots: CID[]; blocks: BlockMap }> => { - const car = await CarReader.fromBytes(bytes) + const car = await CarBlockIterator.fromBytes(bytes) return carToBlocks(car) } export const readCarStream = async (stream: AsyncIterable<Uint8Array>) => { - const car = await CarReader.fromIterable(stream) + const car = await CarBlockIterator.fromIterable(stream) return carToBlocks(car) }