From 061b500dc0cffb52e6082050cd7250860c26dff4 Mon Sep 17 00:00:00 2001
From: Vladan
Date: Mon, 5 Feb 2024 16:32:29 +0100
Subject: [PATCH 1/8] feat: bmt with streams

---
 src/file-streams.ts           | 565 ++++++++++++++++++++++++++++++++++
 test/integration/file.spec.ts | 125 +++++++-
 2 files changed, 689 insertions(+), 1 deletion(-)
 create mode 100644 src/file-streams.ts

diff --git a/src/file-streams.ts b/src/file-streams.ts
new file mode 100644
index 0000000..b263fb5
--- /dev/null
+++ b/src/file-streams.ts
@@ -0,0 +1,565 @@
+import { Chunk, ChunkAddress, DEFAULT_MAX_PAYLOAD_SIZE, SEGMENT_SIZE, makeChunk } from './chunk'
+import { createIntermediateChunk, nextBmtLevel } from './file'
+import { DEFAULT_SPAN_SIZE, Span, makeSpan } from './span'
+import { Deferred, Flavor, concatBytes } from './utils'
+
+export interface GenericReadable<T> {
+  on(event: 'close', listener: () => void): this
+  on(event: 'data', listener: (chunk: T) => void): this
+  on(event: 'error', listener: (err: Error) => void): this
+  push(chunk: unknown, encoding?: BufferEncoding): boolean
+  destroy(error?: Error): this
+  emit(event: 'error', err: Error): boolean
+}
+
+export interface ChunkedFileDeferred<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+> extends Flavor<'ChunkedFile'> {
+  // zero level data chunks
+  leafChunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>
+  rootChunk: Promise<Chunk<MaxChunkPayloadLength, SpanLength>>
+  payload: GenericReadable<Uint8Array>
+  address: Promise<ChunkAddress>
+  span: () => Span<SpanLength>
+  bmt: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>
+}
+
+/**
+ * Calculates number of leaf chunks from payload length
+ * @param payloadLength Payload length
+ * @param options settings for the used chunks
+ * @returns Number of leaf chunks
+ */
+export function byteLengthToChunkLength<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payloadLength: number,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): number {
+  const maxPayloadLength = (options?.maxPayloadLength || DEFAULT_MAX_PAYLOAD_SIZE) as MaxChunkPayloadLength
+
+  return Math.ceil(payloadLength / maxPayloadLength)
+}
+
+/**
+ * Creates object for performing BMT functions on payload data using streams
+ *
+ * @param payload byte array stream of the data
+ * @param options settings for the used chunks
+ * @returns ChunkedFileDeferred object with helper methods
+ */
+export function makeChunkedFileWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  payloadLength: number,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): ChunkedFileDeferred<MaxChunkPayloadLength, SpanLength> {
+  const spanLength = (options?.spanLength || DEFAULT_SPAN_SIZE) as SpanLength
+  const chunkLength = byteLengthToChunkLength(payloadLength, options)
+
+  const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options)
+
+  const rootChunk = bmtRootChunkWithStreams(leafStream, chunkLength, chunkStreamFactory)
+
+  const address = new Promise<ChunkAddress>(async (resolve, reject) => {
+    try {
+      resolve((await rootChunk).address())
+    } catch (error) {
+      reject(error)
+    }
+  })
+
+  const bmt = bmtWithStreams(leafStream, chunkLength, chunkStreamFactory)
+
+  return {
+    payload,
+    span: () => makeSpan(payloadLength, spanLength),
+    leafChunks: leafStream,
+    address: address as Promise<ChunkAddress>,
+    rootChunk,
+    bmt,
+  }
+}
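+
+// A minimal usage sketch, not part of the original patch: it drives the factory above the same
+// way the integration tests below do. It assumes a `byteStream` (a hypothetical name) produced by
+// piping Node's fs.createReadStream into Uint8Array values, a `size` from fs.promises.stat, and an
+// object-mode Readable from 'node:stream' as the chunk stream factory.
+//
+//   const file = makeChunkedFileWithStreams(byteStream, size, () =>
+//     new Readable({ objectMode: true, read: () => {} }),
+//   )
+//   console.log(bytesToHex(await file.address, 64)) // root address as a 64-char hex string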
+
+/**
+ * Generates BMT chunks and outputs them to a readable stream.
+ * @param payload Readable stream of Uint8Array data
+ * @param payloadLength Total number of bytes in payload
+ * @param chunkStreamFactory A factory function for a readable stream
+ * @param options settings for the used chunks
+ * @returns A readable stream with all chunks from BMT. Levels are separated
+ * by empty chunks (payload.length === 0)
+ */
+export function createBmtWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  payloadLength: number,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options)
+  const leafChunkLength = byteLengthToChunkLength(payloadLength, options)
+
+  return bmtWithStreams(leafStream, leafChunkLength, chunkStreamFactory)
+}
+
+/**
+ * Calculates root chunk for leaf chunks received by a readable stream
+ * @param payload Readable stream of Uint8Array data
+ * @param payloadLength Total number of bytes in payload
+ * @param chunkStreamFactory A factory function for a readable stream
+ * @param options settings for the used chunks
+ * @returns Promise resolved with root chunk
+ */
+export async function createBmtRootChunkWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  payloadLength: number,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): Promise<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options)
+  const leafChunkLength = byteLengthToChunkLength(payloadLength, options)
+
+  return bmtRootChunkWithStreams(leafStream, leafChunkLength, chunkStreamFactory)
+}
+
+/**
+ * Returns a readable stream of leaf chunks for received bytes.
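+ *
+ * A brief usage sketch, added for illustration and not part of the original docs: the
+ * object-mode `Readable` factory matches the integration tests, and `byteStream` stands
+ * in for any Uint8Array stream.
+ * @example
+ * const leaves = createLeafChunksStream(byteStream, () => new Readable({ objectMode: true, read: () => {} }))
+ * leaves.on('data', chunk => console.log(chunk.payload.length)) // 4096 for every chunk except possibly the last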
+ * @param payload Readable stream of Uint8Array data
+ * @param chunkStreamFactory A factory function for a readable stream
+ * @param options settings for the used chunks
+ */
+export function createLeafChunksStream<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const maxPayloadLength = (options?.maxPayloadLength || DEFAULT_MAX_PAYLOAD_SIZE) as MaxChunkPayloadLength
+
+  let buffer: Uint8Array = new Uint8Array()
+  let dataLength = 0
+  const leafStream = chunkStreamFactory()
+
+  payload.on('data', chunk => {
+    buffer = concatBytes(buffer, chunk)
+    dataLength += chunk.length
+
+    for (let offset = 0; offset + maxPayloadLength <= buffer.length; offset += maxPayloadLength) {
+      leafStream.push(makeChunk(buffer.slice(offset, offset + maxPayloadLength), options))
+    }
+
+    if (buffer.length >= maxPayloadLength) {
+      buffer = buffer.slice(Math.floor(buffer.length / maxPayloadLength) * maxPayloadLength, buffer.length)
+    }
+  })
+
+  payload.on('close', () => {
+    if (dataLength === 0) {
+      leafStream.push(makeChunk(new Uint8Array(), options))
+    } else {
+      for (let offset = 0; offset < buffer.length; offset += maxPayloadLength) {
+        leafStream.push(makeChunk(buffer.slice(offset, offset + maxPayloadLength), options))
+      }
+    }
+
+    leafStream.destroy()
+  })
+
+  payload.on('error', error => {
+    leafStream.emit('error', error)
+  })
+
+  return leafStream
+}
+
+/**
+ * Generates BMT chunks and outputs them to a readable stream.
+ * @param chunks Readable stream of leaf chunks
+ * @param chunksLength Total number of leaf chunks expected
+ * @param chunkStreamFactory A factory function for a readable stream
+ * @returns A readable stream with all chunks from BMT.
Levels are separated + * by empty chunks (payload.length === 0) + */ +function bmtWithStreams< + MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, + SpanLength extends number = typeof DEFAULT_SPAN_SIZE, +>( + leafChunks: GenericReadable>, + chunksLength: number, + chunkStreamFactory: () => GenericReadable>, +): GenericReadable> { + const outputStream = chunkStreamFactory() + + if (chunksLength === 0) { + throw new Error(`given chunk array is empty`) + } + + checkShouldPopCarrierChunkWithStreams(leafChunks, chunksLength, (error, initialChunk, popCarrierChunk) => { + try { + if (error) { + throw error + } + + if (popCarrierChunk) { + chunksLength -= 1 + } + + let prevChunk = initialChunk + + leafChunks.on('data', chunk => { + outputStream.push(prevChunk) + prevChunk = chunk + }) + + leafChunks.on('close', () => { + if (!popCarrierChunk && prevChunk) { + outputStream.push(prevChunk) + } + + if (chunksLength === 1) { + outputStream.destroy() + } else { + outputStream.push(makeChunk(new Uint8Array())) + } + }) + + leafChunks.on('error', error => outputStream.emit('error', error)) + + if (chunksLength === 1) { + return + } + + const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams( + leafChunks, + chunksLength, + initialChunk as Chunk, + popCarrierChunk, + chunkStreamFactory, + ) + + let levelChunks: Chunk[] = [] + + nextLevelChunks.on('data', chunk => levelChunks.push(chunk)) + + nextLevelChunks.on('close', async () => { + let carrierChunk = await nextLevelCarrierChunk + + levelChunks.forEach(chunk => outputStream.push(chunk)) + + while (levelChunks.length !== 1 || carrierChunk) { + outputStream.push(makeChunk(new Uint8Array())) + + const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) + + nextLevelChunks.forEach(chunk => outputStream.push(chunk)) + + levelChunks = nextLevelChunks + carrierChunk = nextLevelCarrierChunk + } + + outputStream.destroy() + }) + + nextLevelChunks.on('error', error => { + outputStream.emit('error', error) + }) + } catch (error) { + outputStream.emit('error', error as Error) + } + }) + + return outputStream +} + +/** + * Calculates root chunk for leaf chunks received by a readable stream + * @param chunks Readable stream of leaf chunks + * @param chunksLength Total number of leaf chunks expected + * @param chunkStreamFactory A factory function for a readable stream + * @returns Promise resolved with root chunk + */ +async function bmtRootChunkWithStreams< + MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, + SpanLength extends number = typeof DEFAULT_SPAN_SIZE, +>( + chunks: GenericReadable>, + chunksLength: number, + chunkStreamFactory: () => GenericReadable>, +): Promise> { + const result = new Deferred>() + + try { + if (chunksLength === 0) { + result.reject(new Error(`given chunk array is empty`)) + } + + checkShouldPopCarrierChunkWithStreams(chunks, chunksLength, (error, initialChunk, popCarrierChunk) => { + try { + if (error) { + throw error + } + + if (popCarrierChunk) { + chunksLength -= 1 + } + + if (chunksLength === 1 && !popCarrierChunk) { + return result.resolve(initialChunk as Chunk) + } + + const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams( + chunks, + chunksLength, + initialChunk as Chunk, + popCarrierChunk, + chunkStreamFactory, + ) + + let levelChunks: Chunk[] = [] + + nextLevelChunks.on('data', chunk => { + levelChunks.push(chunk) + }) + + nextLevelChunks.on('close', async () => { + let 
carrierChunk = await nextLevelCarrierChunk + + while (levelChunks.length !== 1 || carrierChunk) { + const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) + levelChunks = nextLevelChunks + carrierChunk = nextLevelCarrierChunk + } + + result.resolve(levelChunks[0]) + }) + + nextLevelChunks.on('error', error => { + result.reject(error) + }) + } catch (error) { + result.reject(error) + } + }) + } catch (error) { + result.reject(error) + } + + return result.promise +} + +/** + * A helper function that generates first level of intermediate chunks using streams. + * It is expected that first chunk has already been received and calculated whether + * last chunk should be excluded. + * @param chunks Readable stream of leaf chunks + * @param chunksLength Total number of leaf chunks expected + * @param initialChunk First chunk that has already been received + * @param popCarrierChunk Whether last chunk should be excluded from current level + * @param chunkArrayStreamFactory A factory function for a readable stream + * @returns A readable stream of first level intermediate chunks, + * number of chunks in first level and a promise of carrierChunk for this level + */ +function firstBmtLevelWithStreams< + MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, + SpanLength extends number = typeof DEFAULT_SPAN_SIZE, +>( + chunks: GenericReadable>, + chunksLength: number, + initialChunk: Chunk, + popCarrierChunk: boolean, + chunkArrayStreamFactory: () => GenericReadable>, +): { + nextLevelChunks: GenericReadable> + nextCarrierChunk: Promise | null> + nextLevelChunksLength: number +} { + const nextLevelChunks: GenericReadable> = chunkArrayStreamFactory() + + if (chunksLength === 0) { + throw new Error('The given chunk array is empty') + } + + let lastChunk: Chunk + let nextPopCarrierChunk = popCarrierChunk + let nextLevelChunksBuffer: Chunk[] = [initialChunk] + const nextCarrierChunk = new Deferred | null>() + let generatedChunksCount = 0 + + const maxPayloadLength = initialChunk.maxPayloadLength + const spanLength = initialChunk.spanLength + const maxSegmentCount = maxPayloadLength / SEGMENT_SIZE + let nextLevelChunksLength = Math.ceil(chunksLength / maxSegmentCount) + const carrierChunkIncluded = nextLevelChunksLength % maxSegmentCount !== 0 + + if (popCarrierChunk) { + if (carrierChunkIncluded) { + nextLevelChunksLength += 1 + nextPopCarrierChunk = false + } + } else { + nextPopCarrierChunk = shouldPopCarrierChunk(initialChunk, nextLevelChunksLength) + if (nextPopCarrierChunk) { + nextLevelChunksLength -= 1 + } + } + + if (!nextPopCarrierChunk) { + nextCarrierChunk.resolve(null) + } + + const handleChunk = (chunk: Chunk) => { + generatedChunksCount += 1 + + if (generatedChunksCount === nextLevelChunksLength && nextPopCarrierChunk) { + nextCarrierChunk.resolve(chunk) + } else { + nextLevelChunks.push(chunk) + } + } + + chunks.on('data', chunk => { + try { + nextLevelChunksBuffer.push(chunk) + lastChunk = chunk + + for ( + let offset = 0; + offset + maxSegmentCount <= nextLevelChunksBuffer.length; + offset += maxSegmentCount + ) { + const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount) + const intermediateChunk = createIntermediateChunk(childrenChunks, spanLength, maxPayloadLength) + + handleChunk(intermediateChunk as Chunk) + } + + if (nextLevelChunksBuffer.length >= maxSegmentCount) { + nextLevelChunksBuffer = nextLevelChunksBuffer.slice( + Math.floor(nextLevelChunksBuffer.length / maxSegmentCount) * maxSegmentCount, + 
+          nextLevelChunksBuffer.length,
+        )
+      }
+    } catch (error) {
+      nextLevelChunks.emit('error', error as Error)
+      nextCarrierChunk.reject(error)
+    }
+  })
+
+  chunks.on('close', () => {
+    for (let offset = 0; offset < nextLevelChunksBuffer.length; offset += maxSegmentCount) {
+      const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount)
+      const intermediateChunk = createIntermediateChunk(childrenChunks, spanLength, maxPayloadLength)
+      handleChunk(intermediateChunk as Chunk<MaxChunkPayloadLength, SpanLength>)
+    }
+
+    if (popCarrierChunk && carrierChunkIncluded) {
+      nextLevelChunks.push(lastChunk)
+    }
+
+    nextLevelChunks.destroy()
+  })
+
+  chunks.on('error', error => {
+    nextLevelChunks.emit('error', error)
+    nextCarrierChunk.reject(error)
+  })
+
+  return {
+    nextLevelChunks,
+    nextLevelChunksLength,
+    nextCarrierChunk: nextCarrierChunk.promise,
+  }
+}
+
+/**
+ * A helper function that waits for first chunk to arrive and determines
+ * whether last chunk should be excluded from current level.
+ *
+ * @param chunks Readable chunk stream
+ * @param chunksLength Total number of chunks expected in the stream
+ * @param callback Called when first chunk is received and it is determined whether last chunk
+ * should be excluded
+ */
+function checkShouldPopCarrierChunkWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  chunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  chunksLength: number,
+  callback: (
+    error: unknown | null,
+    initialChunk: Chunk<MaxChunkPayloadLength, SpanLength> | null,
+    popCarrierChunk: boolean,
+  ) => void,
+) {
+  let firstChunk: Chunk<MaxChunkPayloadLength, SpanLength> | null = null
+  let popCarrierChunk = false
+
+  chunks.on('data', chunk => {
+    if (!firstChunk) {
+      firstChunk = chunk
+
+      popCarrierChunk = shouldPopCarrierChunk(firstChunk, chunksLength)
+
+      callback(null, firstChunk, popCarrierChunk)
+    }
+  })
+
+  chunks.on('close', () => {
+    if (!firstChunk) {
+      callback(null, firstChunk, popCarrierChunk)
+    }
+  })
+
+  chunks.on('error', error => {
+    callback(error, firstChunk, popCarrierChunk)
+  })
+}
+
+/**
+ * Returns whether last chunk should be excluded from current level.
+ * This can be calculated as soon as first chunk arrives
+ * @param firstChunk First chunk of current level
+ * @param chunkLength Number of chunks in current level
+ * @returns Whether last chunk should be excluded
+ */
+function shouldPopCarrierChunk<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(firstChunk: Chunk<MaxChunkPayloadLength, SpanLength>, chunkLength: number): boolean {
+  if (chunkLength <= 1) return false
+
+  const maxDataLength = firstChunk.maxPayloadLength
+  // max segment count in one chunk.
the segment size have to be equal to the chunk addresses + const maxSegmentCount = maxDataLength / SEGMENT_SIZE + + return chunkLength % maxSegmentCount === 1 +} diff --git a/test/integration/file.spec.ts b/test/integration/file.spec.ts index 4fe564b..120afff 100644 --- a/test/integration/file.spec.ts +++ b/test/integration/file.spec.ts @@ -1,9 +1,13 @@ +/* eslint-disable @typescript-eslint/no-empty-function */ +import { Readable, Transform } from 'node:stream' import { Bee, SPAN_SIZE } from '@ethersphere/bee-js' import FS from 'fs' +import { stat } from 'fs/promises' import path from 'path' import { makeChunkedFile } from '../../src' -import { DEFAULT_MAX_PAYLOAD_SIZE } from '../../src/chunk' +import { Chunk, DEFAULT_MAX_PAYLOAD_SIZE } from '../../src/chunk' import { bytesToHex } from '../../src/utils' +import { createBmtRootChunkWithStreams, makeChunkedFileWithStreams } from '../../src/file-streams' const beeUrl = process.env.BEE_API_URL || 'http://localhost:1633' const bee = new Bee(beeUrl) @@ -66,3 +70,122 @@ describe('file', () => { expect(bytesToHex(chunkedFile.address(), 64)).toBe(beeResult.reference) }) }) + +describe('file-streams', () => { + const transformToByteStream = (readable: FS.ReadStream) => + readable.pipe( + new Transform({ + transform(chunk, encoding, callback) { + callback(null, Uint8Array.from(chunk as Buffer)) + }, + }), + ) + + it('should produce same chunk like Bee for data < 4KB', async () => { + const filePath = path.join(__dirname, '..', 'test-files', 'text.txt') + const { size } = await stat(filePath) + + const chunkedFileFromStream = makeChunkedFileWithStreams( + transformToByteStream(FS.createReadStream(filePath)), + size, + () => + new Readable({ + objectMode: true, + read: () => {}, + }), + ) + + const fileBytes = Uint8Array.from(FS.readFileSync(filePath)) + const chunkedFile = makeChunkedFile(fileBytes) + + expect(bytesToHex((await chunkedFileFromStream.rootChunk).address(), 64)).toStrictEqual( + bytesToHex(chunkedFile.address(), 64), + ) + }) + + it('should produce same BMT tree like Bee for data > 4KB', async () => { + const filePath = path.join(__dirname, '..', 'test-files', 'The-Book-of-Swarm.pdf') + const { size } = await stat(filePath) + + const chunkedFileFromStream = makeChunkedFileWithStreams( + transformToByteStream(FS.createReadStream(filePath)), + size, + () => + new Readable({ + objectMode: true, + read: () => {}, + }), + ) + + const bmtStream = chunkedFileFromStream.bmt + const treeFromStream: Chunk<4096, 8>[][] = [[]] + + bmtStream.on('data', chunk => { + if (chunk.payload.length === 0) { + treeFromStream.push([]) + } else { + treeFromStream[treeFromStream.length - 1].push(chunk) + } + }) + + await new Promise((resolve, reject) => { + bmtStream.on('close', () => resolve()) + bmtStream.on('error', error => reject(error)) + }) + + const fileBytes = Uint8Array.from(FS.readFileSync(filePath)) + const chunkedFile = makeChunkedFile(fileBytes) + const tree = chunkedFile.bmt() + + expect(treeFromStream[0][0].payload).toStrictEqual(tree[0][0].payload) + + expect(treeFromStream[1][0].payload).toStrictEqual(tree[1][0].payload) + expect(treeFromStream[1][0].span()).toStrictEqual(tree[1][0].span()) + expect(treeFromStream[1][0].address()).toStrictEqual(tree[1][0].address()) + expect(bytesToHex(treeFromStream[2][0].address(), 64)).toStrictEqual(bytesToHex(tree[2][0].address(), 64)) + + expect(bytesToHex(await chunkedFileFromStream.address, 64)).toStrictEqual( + bytesToHex(chunkedFile.address(), 64), + ) + }) + + it('should work with edge case - carrier 
chunk', async () => { + const filePath = path.join(__dirname, '..', 'test-files', 'carrier-chunk-blob') + const { size } = await stat(filePath) + + const rootChunk = await createBmtRootChunkWithStreams( + transformToByteStream(FS.createReadStream(filePath)), + size, + () => + new Readable({ + objectMode: true, + read: () => {}, + }), + ) + + const fileBytes = Uint8Array.from(FS.readFileSync(filePath)) + const chunkedFile = makeChunkedFile(fileBytes) + + expect(bytesToHex(rootChunk.address(), 64)).toBe(bytesToHex(chunkedFile.address(), 64)) + }) + + it('should work with edge case - carrier chunk in intermediate level', async () => { + const filePath = path.join(__dirname, '..', 'test-files', 'carrier-chunk-blob-2') + const { size } = await stat(filePath) + + const rootChunk = await createBmtRootChunkWithStreams( + transformToByteStream(FS.createReadStream(filePath)), + size, + () => + new Readable({ + objectMode: true, + read: () => {}, + }), + ) + + const fileBytes = Uint8Array.from(FS.readFileSync(filePath)) + const chunkedFile = makeChunkedFile(fileBytes) + + expect(bytesToHex(rootChunk.address(), 64)).toBe(bytesToHex(chunkedFile.address(), 64)) + }) +}) From 590db659e20013c059302d12a8fa81315cf71285 Mon Sep 17 00:00:00 2001 From: Vladan Date: Mon, 5 Feb 2024 18:51:18 +0100 Subject: [PATCH 2/8] fix: bmt streams --- src/file-streams.ts | 16 +++++++++++----- src/utils.ts | 23 +++++++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/src/file-streams.ts b/src/file-streams.ts index b263fb5..b46aaa8 100644 --- a/src/file-streams.ts +++ b/src/file-streams.ts @@ -274,7 +274,7 @@ function bmtWithStreams< levelChunks.forEach(chunk => outputStream.push(chunk)) - while (levelChunks.length !== 1 || carrierChunk) { + while (levelChunks.length !== 1) { outputStream.push(makeChunk(new Uint8Array())) const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) @@ -418,6 +418,7 @@ function firstBmtLevelWithStreams< const maxSegmentCount = maxPayloadLength / SEGMENT_SIZE let nextLevelChunksLength = Math.ceil(chunksLength / maxSegmentCount) const carrierChunkIncluded = nextLevelChunksLength % maxSegmentCount !== 0 + let receivedChunks = 1 if (popCarrierChunk) { if (carrierChunkIncluded) { @@ -438,16 +439,21 @@ function firstBmtLevelWithStreams< const handleChunk = (chunk: Chunk) => { generatedChunksCount += 1 - if (generatedChunksCount === nextLevelChunksLength && nextPopCarrierChunk) { - nextCarrierChunk.resolve(chunk) - } else { + if (generatedChunksCount <= nextLevelChunksLength) { nextLevelChunks.push(chunk) + } else if (nextPopCarrierChunk) { + nextCarrierChunk.resolve(chunk) } } chunks.on('data', chunk => { try { - nextLevelChunksBuffer.push(chunk) + receivedChunks += 1 + + if (receivedChunks <= chunksLength || !popCarrierChunk) { + nextLevelChunksBuffer.push(chunk) + } + lastChunk = chunk for ( diff --git a/src/utils.ts b/src/utils.ts index 622fc9d..b88d3c0 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,8 +1,22 @@ +/* eslint-disable @typescript-eslint/no-empty-function */ import { keccak256, Message } from 'js-sha3' /** Used for FavorTypes */ export type Flavor = { __tag__?: Name } +export class Deferred { + public resolve: (value: T) => void = () => {} + public reject: (reason?: unknown) => void = () => {} + public promise: Promise + + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + } +} + /** * Nominal type to represent hex strings WITHOUT '0x' prefix. 
 * For example for 32 bytes hex representation you have to use 64 length.
@@ -111,3 +125,12 @@ export function equalBytes(a: Uint8Array, b: Uint8Array): boolean {
 }
 
 export { Message }
+
+export function concatBytes(bytes1: Uint8Array, bytes2: Uint8Array): Uint8Array {
+  const buffer = new Uint8Array(bytes1.length + bytes2.length)
+
+  buffer.set(bytes1, 0)
+  buffer.set(bytes2, bytes1.length)
+
+  return buffer
+}

From 5ef29181195bcc2b20e6548af7f36fde8f85ec87 Mon Sep 17 00:00:00 2001
From: Vladan
Date: Tue, 6 Feb 2024 10:13:47 +0100
Subject: [PATCH 3/8] chore: new stream methods update

---
 README.md           | 9 +++++++++
 src/file-streams.ts | 2 +-
 src/index.ts        | 5 +++++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index fa17ea2..277fbdf 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,12 @@ You can import the following directly from `@fairdatasociety/bmt-js`:
   * address() # gives back the calculated chunk address of file data
   * span() # serialized span value of the file
   * bmt() # gives back the Binary Merkle Tree of the file data
+* ChunkedFileDeferred
+  * payload # the passed byte stream with which the object was initialized.
+  * leafChunks # data chunks stream of the file data
+  * rootChunk # promise that resolves to the topmost chunk in the file BMT
+  * address # promise of the calculated chunk address of file data
+  * bmt # stream of the Binary Merkle Tree of the file data (levels are separated by an empty chunk)
 * ChunkInclusionProof # groups chunk inclusion proof segments and span value of a chunk
 * ChunkAddress # chunk address resulted from BMT hashing of data. It is used also for FileAddress
 * Span # span value in byte format. Indicates how much data subsumed under the Chunk/File
@@ -85,6 +91,9 @@ You can import the following directly from `@fairdatasociety/bmt-js`:
 * getBmtIndexOfSegment # get the chunk's position of a given payload segment index in the BMT tree
 * fileInclusionProofBottomUp # gives back required sister segments of a given payload segment index for inclusion proof
 * fileAddressFromInclusionProof # gives back the file address that is calculated with only the inclusion proof segments and the corresponding proved segment and its position.
+* makeChunkedFileWithStreams # makes `ChunkedFileDeferred` helper object for performing BMT actions on file data using streams
+* createBmtWithStreams # generates BMT chunks and outputs them to a readable stream.
+* createBmtRootChunkWithStreams # calculates root chunk for bytes received by a readable stream ## Other objects diff --git a/src/file-streams.ts b/src/file-streams.ts index b46aaa8..0fd526d 100644 --- a/src/file-streams.ts +++ b/src/file-streams.ts @@ -120,7 +120,7 @@ export function createBmtWithStreams< } /** - * Calculates root chunk for leaf chunks received by a readable stream + * Calculates root chunk for bytes received by a readable stream * @param payload Readable stream of Uint8Array data * @param payloadLength Total number of bytes in payload * @param chunkStreamFactory A factory function for a readable stream diff --git a/src/index.ts b/src/index.ts index 8f33394..18ec6ba 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,6 +6,11 @@ export { fileAddressFromInclusionProof, getBmtIndexOfSegment, } from './file' +export { + makeChunkedFileWithStreams, + createBmtWithStreams, + createBmtRootChunkWithStreams, +} from './file-streams' export { Chunk, ChunkAddress, makeChunk, rootHashFromInclusionProof } from './chunk' export { Span, makeSpan, getSpanValue } from './span' export * as Utils from './utils' From faa74a12ca6bf2b859354586ac77551156ea1cee Mon Sep 17 00:00:00 2001 From: Vladan Date: Tue, 6 Feb 2024 10:38:46 +0100 Subject: [PATCH 4/8] fix: missing exports --- src/file.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/file.ts b/src/file.ts index 1951c15..99cbb9a 100644 --- a/src/file.ts +++ b/src/file.ts @@ -264,7 +264,7 @@ function bmtRootChunk< return levelChunks[0] } -function nextBmtLevel< +export function nextBmtLevel< MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( @@ -308,7 +308,7 @@ function nextBmtLevel< } } -function createIntermediateChunk< +export function createIntermediateChunk< MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( From b49200aa0659609b0d80393e432e1a1c35bdd539 Mon Sep 17 00:00:00 2001 From: Vladan Date: Tue, 6 Feb 2024 11:01:28 +0100 Subject: [PATCH 5/8] ci: increase test timeout --- jest.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jest.config.ts b/jest.config.ts index c55452d..5abcd81 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -26,7 +26,7 @@ export default async (): Promise => { // An array of regexp pattern strings that are matched against all test paths, matched tests are skipped testPathIgnorePatterns: ['/node_modules/'], - testTimeout: 90000, // 1.5 min + testTimeout: 180000, // 3 min // Run tests from one or more projects projects: [ From cfffc9f29ee5e733f8b65152ef26ce80bea99e6e Mon Sep 17 00:00:00 2001 From: Vladan Date: Tue, 6 Feb 2024 14:12:34 +0100 Subject: [PATCH 6/8] ci: batch usability fix --- test/test-setup.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test-setup.ts b/test/test-setup.ts index 0542f46..9b76413 100644 --- a/test/test-setup.ts +++ b/test/test-setup.ts @@ -26,5 +26,7 @@ export default async function testsSetup(): Promise { // so we are only logging errors and not leaving them to propagate console.error(e) } + + await sleep(120 * 1000) } } From 983c866aeadc935f10abc6e72eef7689fda69b97 Mon Sep 17 00:00:00 2001 From: Vladan Date: Wed, 7 Feb 2024 13:26:08 +0100 Subject: [PATCH 7/8] ci: node versions update --- .github/workflows/test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cccbaee..a5dabd3 100644 
--- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -24,7 +24,7 @@ jobs: strategy: matrix: - node-version: [14.x, 15.x, 16.x, 17.x] + node-version: [16.x, 17.x, 18.x] steps: - name: Checkout From 82259dc0898b780ad40cc53a25813aa3d9a44cfb Mon Sep 17 00:00:00 2001 From: Vladan Date: Thu, 8 Feb 2024 18:15:33 +0100 Subject: [PATCH 8/8] feat: file streaming improvements --- src/file-streams.ts | 416 +++++++++++++++------------------- test/integration/file.spec.ts | 9 - 2 files changed, 181 insertions(+), 244 deletions(-) diff --git a/src/file-streams.ts b/src/file-streams.ts index 0fd526d..3149301 100644 --- a/src/file-streams.ts +++ b/src/file-streams.ts @@ -21,35 +21,31 @@ export interface ChunkedFileDeferred< rootChunk: Promise> payload: GenericReadable address: Promise - span: () => Span + span: Promise> bmt: GenericReadable> } /** - * Calculates number of leaf chunks from payload length - * @param payloadLength Payload length - * @param options settings for the used chunks - * @returns Number of leaf chunks + * Calculates total number of bytes received in given readable stream + * @param payload byte array stream + * @returns Total number of bytes resolved by a promise */ -export function byteLengthToChunkLength< - MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, - SpanLength extends number = typeof DEFAULT_SPAN_SIZE, ->( - payloadLength: number, - options?: { - maxPayloadLength?: MaxChunkPayloadLength - spanLength?: SpanLength - }, -): number { - const maxPayloadLength = (options?.maxPayloadLength || DEFAULT_MAX_PAYLOAD_SIZE) as MaxChunkPayloadLength +async function getByteStreamLength(payload: GenericReadable): Promise { + return new Promise((resolve, reject) => { + let dataLength = 0 + payload.on('data', chunk => (dataLength += chunk.length)) + + payload.on('close', () => resolve(dataLength)) - return Math.ceil(payloadLength / maxPayloadLength) + payload.on('error', error => reject(error)) + }) } /** * Creates object for performing BMT functions on payload data using streams * * @param payload byte array stream of the data + * @param chunkStreamFactory A factory function for a readable stream * @param options settings for the used chunks * @returns ChunkedFileDeferred object with helper methods */ @@ -58,7 +54,6 @@ export function makeChunkedFileWithStreams< SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( payload: GenericReadable, - payloadLength: number, chunkStreamFactory: () => GenericReadable>, options?: { maxPayloadLength?: MaxChunkPayloadLength @@ -66,11 +61,11 @@ export function makeChunkedFileWithStreams< }, ): ChunkedFileDeferred { const spanLength = (options?.spanLength || DEFAULT_SPAN_SIZE) as SpanLength - const chunkLength = byteLengthToChunkLength(payloadLength, options) + const payloadLengthPromise = getByteStreamLength(payload) const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options) - const rootChunk = bmtRootChunkWithStreams(leafStream, chunkLength, chunkStreamFactory) + const rootChunk = bmtRootChunkWithStreams(leafStream, chunkStreamFactory) const address = new Promise(async (resolve, reject) => { try { @@ -80,11 +75,19 @@ export function makeChunkedFileWithStreams< } }) - const bmt = bmtWithStreams(leafStream, chunkLength, chunkStreamFactory) + const span = new Promise>(async (resolve, reject) => { + try { + resolve(makeSpan(await payloadLengthPromise, spanLength)) + } catch (error) { + reject(error) + } + }) + + const bmt = bmtWithStreams(leafStream, chunkStreamFactory) return { payload, - 
span: () => makeSpan(payloadLength, spanLength), + span, leafChunks: leafStream, address: address as Promise, rootChunk, @@ -95,7 +98,6 @@ export function makeChunkedFileWithStreams< /** * Generates BMT chunks and outputs them to a readable stream. * @param payload Readable stream of Uint8Array data - * @param payloadLength Total number of bytes in payload * @param chunkStreamFactory A factory function for a readable stream * @param options settings for the used chunks * @returns A readable stream with all chunks from BMT. Levels are separated @@ -106,7 +108,6 @@ export function createBmtWithStreams< SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( payload: GenericReadable, - payloadLength: number, chunkStreamFactory: () => GenericReadable>, options?: { maxPayloadLength?: MaxChunkPayloadLength @@ -114,15 +115,13 @@ export function createBmtWithStreams< }, ): GenericReadable> { const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options) - const leafChunkLength = byteLengthToChunkLength(payloadLength, options) - return bmtWithStreams(leafStream, leafChunkLength, chunkStreamFactory) + return bmtWithStreams(leafStream, chunkStreamFactory) } /** * Calculates root chunk for bytes received by a readable stream * @param payload Readable stream of Uint8Array data - * @param payloadLength Total number of bytes in payload * @param chunkStreamFactory A factory function for a readable stream * @param options settings for the used chunks * @returns Promise resolved with root chunk @@ -132,7 +131,6 @@ export async function createBmtRootChunkWithStreams< SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( payload: GenericReadable, - payloadLength: number, chunkStreamFactory: () => GenericReadable>, options?: { maxPayloadLength?: MaxChunkPayloadLength @@ -140,9 +138,8 @@ export async function createBmtRootChunkWithStreams< }, ): Promise> { const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options) - const leafChunkLength = byteLengthToChunkLength(payloadLength, options) - return bmtRootChunkWithStreams(leafStream, leafChunkLength, chunkStreamFactory) + return bmtRootChunkWithStreams(leafStream, chunkStreamFactory) } /** @@ -203,7 +200,6 @@ export function createLeafChunksStream< /** * Generates BMT chunks and outputs them to a readable stream. * @param chunks Readable stream of leaf chunks - * @param chunksLength Total number of leaf chunks expected * @param chunkStreamFactory A factory function for a readable stream * @returns A readable stream with all chunks from BMT. 
Levels are separated * by empty chunks (payload.length === 0) @@ -213,34 +209,36 @@ function bmtWithStreams< SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( leafChunks: GenericReadable>, - chunksLength: number, chunkStreamFactory: () => GenericReadable>, ): GenericReadable> { const outputStream = chunkStreamFactory() + let chunksLength = 0 - if (chunksLength === 0) { - throw new Error(`given chunk array is empty`) - } + try { + let firstChunk: Chunk | null = null + let prevChunk: Chunk | null = null - checkShouldPopCarrierChunkWithStreams(leafChunks, chunksLength, (error, initialChunk, popCarrierChunk) => { - try { - if (error) { - throw error + leafChunks.on('data', chunk => { + chunksLength += 1 + + if (chunksLength === 1) { + firstChunk = chunk } - if (popCarrierChunk) { - chunksLength -= 1 + if (prevChunk) { + outputStream.push(prevChunk) } - let prevChunk = initialChunk + prevChunk = chunk + }) - leafChunks.on('data', chunk => { - outputStream.push(prevChunk) - prevChunk = chunk - }) + leafChunks.on('close', () => { + try { + if (chunksLength === 0) { + throw new Error(`given chunk array is empty`) + } - leafChunks.on('close', () => { - if (!popCarrierChunk && prevChunk) { + if (!shouldPopCarrierChunk(firstChunk as Chunk, chunksLength)) { outputStream.push(prevChunk) } @@ -249,52 +247,47 @@ function bmtWithStreams< } else { outputStream.push(makeChunk(new Uint8Array())) } - }) - - leafChunks.on('error', error => outputStream.emit('error', error)) - - if (chunksLength === 1) { - return + } catch (error) { + outputStream.emit('error', error as Error) } + }) - const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams( - leafChunks, - chunksLength, - initialChunk as Chunk, - popCarrierChunk, - chunkStreamFactory, - ) + leafChunks.on('error', error => outputStream.emit('error', error)) - let levelChunks: Chunk[] = [] + const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams( + leafChunks, + chunkStreamFactory, + ) - nextLevelChunks.on('data', chunk => levelChunks.push(chunk)) + let levelChunks: Chunk[] = [] - nextLevelChunks.on('close', async () => { - let carrierChunk = await nextLevelCarrierChunk + nextLevelChunks.on('data', chunk => levelChunks.push(chunk)) - levelChunks.forEach(chunk => outputStream.push(chunk)) + nextLevelChunks.on('close', async () => { + let carrierChunk = await nextLevelCarrierChunk - while (levelChunks.length !== 1) { - outputStream.push(makeChunk(new Uint8Array())) + levelChunks.forEach(chunk => outputStream.push(chunk)) - const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) + while (levelChunks.length !== 1) { + outputStream.push(makeChunk(new Uint8Array())) - nextLevelChunks.forEach(chunk => outputStream.push(chunk)) + const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) - levelChunks = nextLevelChunks - carrierChunk = nextLevelCarrierChunk - } + nextLevelChunks.forEach(chunk => outputStream.push(chunk)) - outputStream.destroy() - }) + levelChunks = nextLevelChunks + carrierChunk = nextLevelCarrierChunk + } - nextLevelChunks.on('error', error => { - outputStream.emit('error', error) - }) - } catch (error) { - outputStream.emit('error', error as Error) - } - }) + outputStream.destroy() + }) + + nextLevelChunks.on('error', error => { + outputStream.emit('error', error) + }) + } catch (error) { + outputStream.emit('error', error as Error) + } return outputStream } @@ -302,7 +295,6 @@ function bmtWithStreams< 
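
// A consumption sketch for the BMT stream above, added for illustration (it is not part of the
// patch; it mirrors the integration test): levels arrive in order and are separated by empty
// chunks, so per-level arrays can be rebuilt by starting a new array on each empty chunk.
// `bmt` here stands for the stream returned by bmtWithStreams.
//
//   const tree: Chunk<4096, 8>[][] = [[]]
//   bmt.on('data', chunk => {
//     if (chunk.payload.length === 0) tree.push([]) // an empty chunk marks the next BMT level
//     else tree[tree.length - 1].push(chunk) // otherwise the chunk belongs to the current level
//   })
//   bmt.on('close', () => console.log(`BMT levels received: ${tree.length}`)) // root level last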
/** * Calculates root chunk for leaf chunks received by a readable stream * @param chunks Readable stream of leaf chunks - * @param chunksLength Total number of leaf chunks expected * @param chunkStreamFactory A factory function for a readable stream * @returns Promise resolved with root chunk */ @@ -311,62 +303,42 @@ async function bmtRootChunkWithStreams< SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( chunks: GenericReadable>, - chunksLength: number, chunkStreamFactory: () => GenericReadable>, ): Promise> { const result = new Deferred>() + let chunksLength = 0 try { - if (chunksLength === 0) { - result.reject(new Error(`given chunk array is empty`)) - } - - checkShouldPopCarrierChunkWithStreams(chunks, chunksLength, (error, initialChunk, popCarrierChunk) => { - try { - if (error) { - throw error - } + const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams( + chunks, + chunkStreamFactory, + ) - if (popCarrierChunk) { - chunksLength -= 1 - } - - if (chunksLength === 1 && !popCarrierChunk) { - return result.resolve(initialChunk as Chunk) - } + let levelChunks: Chunk[] = [] - const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams( - chunks, - chunksLength, - initialChunk as Chunk, - popCarrierChunk, - chunkStreamFactory, - ) - - let levelChunks: Chunk[] = [] + nextLevelChunks.on('data', chunk => { + chunksLength += 1 + levelChunks.push(chunk) + }) - nextLevelChunks.on('data', chunk => { - levelChunks.push(chunk) - }) + nextLevelChunks.on('close', async () => { + if (chunksLength === 0) { + result.reject(new Error(`given chunk array is empty`)) + } - nextLevelChunks.on('close', async () => { - let carrierChunk = await nextLevelCarrierChunk + let carrierChunk = await nextLevelCarrierChunk - while (levelChunks.length !== 1 || carrierChunk) { - const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) - levelChunks = nextLevelChunks - carrierChunk = nextLevelCarrierChunk - } + while (levelChunks.length !== 1 || carrierChunk) { + const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk) + levelChunks = nextLevelChunks + carrierChunk = nextLevelCarrierChunk + } - result.resolve(levelChunks[0]) - }) + result.resolve(levelChunks[0]) + }) - nextLevelChunks.on('error', error => { - result.reject(error) - }) - } catch (error) { - result.reject(error) - } + nextLevelChunks.on('error', error => { + result.reject(error) }) } catch (error) { result.reject(error) @@ -377,97 +349,83 @@ async function bmtRootChunkWithStreams< /** * A helper function that generates first level of intermediate chunks using streams. - * It is expected that first chunk has already been received and calculated whether - * last chunk should be excluded. 
* @param chunks Readable stream of leaf chunks - * @param chunksLength Total number of leaf chunks expected - * @param initialChunk First chunk that has already been received - * @param popCarrierChunk Whether last chunk should be excluded from current level * @param chunkArrayStreamFactory A factory function for a readable stream - * @returns A readable stream of first level intermediate chunks, - * number of chunks in first level and a promise of carrierChunk for this level + * @returns A readable stream of first level intermediate chunks and a promise of + * carrierChunk for this level */ function firstBmtLevelWithStreams< MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, SpanLength extends number = typeof DEFAULT_SPAN_SIZE, >( chunks: GenericReadable>, - chunksLength: number, - initialChunk: Chunk, - popCarrierChunk: boolean, chunkArrayStreamFactory: () => GenericReadable>, ): { nextLevelChunks: GenericReadable> nextCarrierChunk: Promise | null> - nextLevelChunksLength: number } { const nextLevelChunks: GenericReadable> = chunkArrayStreamFactory() - if (chunksLength === 0) { - throw new Error('The given chunk array is empty') - } + let firstReceivedChunk: Chunk + let lastReceivedChunk: Chunk + let firstSentChunk: Chunk + + let prevIntermediateChunk: Chunk | null = null - let lastChunk: Chunk - let nextPopCarrierChunk = popCarrierChunk - let nextLevelChunksBuffer: Chunk[] = [initialChunk] const nextCarrierChunk = new Deferred | null>() + let nextLevelChunksBuffer: Chunk[] = [] let generatedChunksCount = 0 - - const maxPayloadLength = initialChunk.maxPayloadLength - const spanLength = initialChunk.spanLength - const maxSegmentCount = maxPayloadLength / SEGMENT_SIZE - let nextLevelChunksLength = Math.ceil(chunksLength / maxSegmentCount) - const carrierChunkIncluded = nextLevelChunksLength % maxSegmentCount !== 0 - let receivedChunks = 1 - - if (popCarrierChunk) { - if (carrierChunkIncluded) { - nextLevelChunksLength += 1 - nextPopCarrierChunk = false - } - } else { - nextPopCarrierChunk = shouldPopCarrierChunk(initialChunk, nextLevelChunksLength) - if (nextPopCarrierChunk) { - nextLevelChunksLength -= 1 - } - } - - if (!nextPopCarrierChunk) { - nextCarrierChunk.resolve(null) - } + let receivedChunks = 0 + let maxPayloadLength: number + let spanLength: number + let maxSegmentCount: number const handleChunk = (chunk: Chunk) => { generatedChunksCount += 1 - if (generatedChunksCount <= nextLevelChunksLength) { - nextLevelChunks.push(chunk) - } else if (nextPopCarrierChunk) { - nextCarrierChunk.resolve(chunk) + if (!firstReceivedChunk) { + firstReceivedChunk = chunk } + + if (generatedChunksCount === 1) { + firstSentChunk = chunk + } + + nextLevelChunks.push(chunk) } chunks.on('data', chunk => { try { receivedChunks += 1 - if (receivedChunks <= chunksLength || !popCarrierChunk) { - nextLevelChunksBuffer.push(chunk) - } + lastReceivedChunk = chunk - lastChunk = chunk + nextLevelChunksBuffer.push(chunk) + + if (receivedChunks === 1) { + firstReceivedChunk = chunk + maxPayloadLength = chunk.maxPayloadLength + spanLength = chunk.spanLength + maxSegmentCount = maxPayloadLength / SEGMENT_SIZE + } for ( let offset = 0; - offset + maxSegmentCount <= nextLevelChunksBuffer.length; + offset + maxSegmentCount < nextLevelChunksBuffer.length; offset += maxSegmentCount ) { + if (prevIntermediateChunk) { + handleChunk(prevIntermediateChunk) + } const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount) - const intermediateChunk = 
createIntermediateChunk(childrenChunks, spanLength, maxPayloadLength) - - handleChunk(intermediateChunk as Chunk) + prevIntermediateChunk = createIntermediateChunk( + childrenChunks, + spanLength, + maxPayloadLength, + ) as Chunk } - if (nextLevelChunksBuffer.length >= maxSegmentCount) { + if (nextLevelChunksBuffer.length > maxSegmentCount) { nextLevelChunksBuffer = nextLevelChunksBuffer.slice( Math.floor(nextLevelChunksBuffer.length / maxSegmentCount) * maxSegmentCount, nextLevelChunksBuffer.length, @@ -480,17 +438,51 @@ function firstBmtLevelWithStreams< }) chunks.on('close', () => { - for (let offset = 0; offset < nextLevelChunksBuffer.length; offset += maxSegmentCount) { - const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount) - const intermediateChunk = createIntermediateChunk(childrenChunks, spanLength, maxPayloadLength) - handleChunk(intermediateChunk as Chunk) - } + let nextCarrierChunkValue: Chunk | null = null - if (popCarrierChunk && carrierChunkIncluded) { - nextLevelChunks.push(lastChunk) - } + try { + if (receivedChunks === 0) { + throw new Error('The given chunk array is empty') + } + + const popCarrierChunk = shouldPopCarrierChunk(firstReceivedChunk, receivedChunks) + + if (popCarrierChunk) { + nextLevelChunksBuffer.pop() + } + + if (receivedChunks === 1 && !popCarrierChunk) { + return nextLevelChunks.push(firstReceivedChunk) + } + + for (let offset = 0; offset < nextLevelChunksBuffer.length; offset += maxSegmentCount) { + if (prevIntermediateChunk) { + handleChunk(prevIntermediateChunk) + } + const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount) + prevIntermediateChunk = createIntermediateChunk( + childrenChunks, + spanLength, + maxPayloadLength, + ) as Chunk + } - nextLevelChunks.destroy() + if (popCarrierChunk || !shouldPopCarrierChunk(firstSentChunk, generatedChunksCount + 1)) { + handleChunk(prevIntermediateChunk as Chunk) + } else if (shouldPopCarrierChunk(firstSentChunk, generatedChunksCount + 1)) { + nextCarrierChunkValue = prevIntermediateChunk + } + + if (popCarrierChunk && generatedChunksCount % maxSegmentCount !== 0) { + nextLevelChunks.push(lastReceivedChunk) + } + } catch (error) { + nextLevelChunks.emit('error', error as Error) + nextCarrierChunk.reject(error) + } finally { + nextCarrierChunk.resolve(nextCarrierChunkValue) + nextLevelChunks.destroy() + } }) chunks.on('error', error => { @@ -500,56 +492,10 @@ function firstBmtLevelWithStreams< return { nextLevelChunks, - nextLevelChunksLength, nextCarrierChunk: nextCarrierChunk.promise, } } -/** - * A helper function that waits for first chunk to arrive and determines - * whether last chunk should be excluded from current level. 
- * - * @param chunks Readable chunk stream - * @param chunkLength Total number of chunks expected in the stream - * @param callback Called when first chunk is received and determined wheter last chunk - * should be excluded - */ -function checkShouldPopCarrierChunkWithStreams< - MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE, - SpanLength extends number = typeof DEFAULT_SPAN_SIZE, ->( - chunks: GenericReadable>, - chunksLength: number, - callback: ( - error: unknown | null, - initialChunk: Chunk | null, - popCarrierChunk: boolean, - ) => void, -) { - let firstChunk: Chunk | null = null - let popCarrierChunk = false - - chunks.on('data', chunk => { - if (!firstChunk) { - firstChunk = chunk - - popCarrierChunk = shouldPopCarrierChunk(firstChunk, chunksLength) - - callback(null, firstChunk, popCarrierChunk) - } - }) - - chunks.on('close', () => { - if (!firstChunk) { - callback(null, firstChunk, popCarrierChunk) - } - }) - - chunks.on('error', error => { - callback(error, firstChunk, popCarrierChunk) - }) -} - /** * Returs whether last chunk should be excluded from current level. * This can be calculated as soon as first chunk arrives diff --git a/test/integration/file.spec.ts b/test/integration/file.spec.ts index 120afff..93390be 100644 --- a/test/integration/file.spec.ts +++ b/test/integration/file.spec.ts @@ -2,7 +2,6 @@ import { Readable, Transform } from 'node:stream' import { Bee, SPAN_SIZE } from '@ethersphere/bee-js' import FS from 'fs' -import { stat } from 'fs/promises' import path from 'path' import { makeChunkedFile } from '../../src' import { Chunk, DEFAULT_MAX_PAYLOAD_SIZE } from '../../src/chunk' @@ -83,11 +82,9 @@ describe('file-streams', () => { it('should produce same chunk like Bee for data < 4KB', async () => { const filePath = path.join(__dirname, '..', 'test-files', 'text.txt') - const { size } = await stat(filePath) const chunkedFileFromStream = makeChunkedFileWithStreams( transformToByteStream(FS.createReadStream(filePath)), - size, () => new Readable({ objectMode: true, @@ -105,11 +102,9 @@ describe('file-streams', () => { it('should produce same BMT tree like Bee for data > 4KB', async () => { const filePath = path.join(__dirname, '..', 'test-files', 'The-Book-of-Swarm.pdf') - const { size } = await stat(filePath) const chunkedFileFromStream = makeChunkedFileWithStreams( transformToByteStream(FS.createReadStream(filePath)), - size, () => new Readable({ objectMode: true, @@ -151,11 +146,9 @@ describe('file-streams', () => { it('should work with edge case - carrier chunk', async () => { const filePath = path.join(__dirname, '..', 'test-files', 'carrier-chunk-blob') - const { size } = await stat(filePath) const rootChunk = await createBmtRootChunkWithStreams( transformToByteStream(FS.createReadStream(filePath)), - size, () => new Readable({ objectMode: true, @@ -171,11 +164,9 @@ describe('file-streams', () => { it('should work with edge case - carrier chunk in intermediate level', async () => { const filePath = path.join(__dirname, '..', 'test-files', 'carrier-chunk-blob-2') - const { size } = await stat(filePath) const rootChunk = await createBmtRootChunkWithStreams( transformToByteStream(FS.createReadStream(filePath)), - size, () => new Readable({ objectMode: true,