diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index cccbaee..a5dabd3 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -24,7 +24,7 @@ jobs:
     strategy:
       matrix:
-        node-version: [14.x, 15.x, 16.x, 17.x]
+        node-version: [16.x, 17.x, 18.x]
 
     steps:
       - name: Checkout
diff --git a/README.md b/README.md
index fa17ea2..277fbdf 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,12 @@ You can import the following directly from `@fairdatasociety/bmt-js`:
   * address() # gives back the calculated chunk address of file data
   * span() # serialized span value of the file
   * bmt() # gives back the Binary Merkle Tree of the file data
+* ChunkedFileDeferred
+  * payload # the byte stream with which the object was initialized
+  * leafChunks # stream of the file's data chunks
+  * rootChunk # promise that resolves with the topmost chunk of the file BMT
+  * address # promise that resolves with the calculated chunk address of the file data
+  * bmt # stream of the Binary Merkle Tree of the file data (levels are separated by an empty chunk)
 * ChunkInclusionProof # groups chunk inclusion proof segments and span value of a chunk
 * ChunkAddress # chunk address resulted from BMT hashing of data. It is used also for FileAddress
 * Span # span value in byte format. Indicates how much data subsumed under the Chunk/File
@@ -85,6 +91,9 @@ You can import the following directly from `@fairdatasociety/bmt-js`:
 * getBmtIndexOfSegment # get the chunk's position of a given payload segment index in the BMT tree
 * fileInclusionProofBottomUp # gives back required sister segments of a given payload segment index for inclusion proof
 * fileAddressFromInclusionProof # gives back the file address that is calculated with only the inclusion proof segments and the corresponding proved segment and its position.
+* makeChunkedFileWithStreams # makes a `ChunkedFileDeferred` helper object for performing BMT actions on file data using streams
+* createBmtWithStreams # generates BMT chunks and outputs them to a readable stream
+* createBmtRootChunkWithStreams # calculates the root chunk for bytes received from a readable stream
 
 ## Other objects
 
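A minimal sketch of how these new exports fit together with Node's `fs` and `stream` modules; the file path here is hypothetical, and the byte-stream adapter mirrors the integration tests added below:

```ts
import FS from 'fs'
import { Readable, Transform } from 'stream'
import { makeChunkedFileWithStreams } from '@fairdatasociety/bmt-js'

// './my-file.bin' is a hypothetical path; any byte source works
// as long as the stream emits Uint8Array values.
const byteStream = FS.createReadStream('./my-file.bin').pipe(
  new Transform({
    transform(chunk, encoding, callback) {
      callback(null, Uint8Array.from(chunk as Buffer))
    },
  }),
)

const chunkedFile = makeChunkedFileWithStreams(
  byteStream,
  // factory for the object-mode streams that will carry Chunk objects
  () => new Readable({ objectMode: true, read: () => {} }),
)

chunkedFile.address.then(address => console.log('file address:', address))
```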
diff --git a/jest.config.ts b/jest.config.ts
index c55452d..5abcd81 100644
--- a/jest.config.ts
+++ b/jest.config.ts
@@ -26,7 +26,7 @@ export default async (): Promise<Config.InitialOptions> => {
     // An array of regexp pattern strings that are matched against all test paths, matched tests are skipped
     testPathIgnorePatterns: ['/node_modules/'],
 
-    testTimeout: 90000, // 1.5 min
+    testTimeout: 180000, // 3 min
 
     // Run tests from one or more projects
     projects: [
diff --git a/src/file-streams.ts b/src/file-streams.ts
new file mode 100644
index 0000000..3149301
--- /dev/null
+++ b/src/file-streams.ts
@@ -0,0 +1,517 @@
+import { Chunk, ChunkAddress, DEFAULT_MAX_PAYLOAD_SIZE, SEGMENT_SIZE, makeChunk } from './chunk'
+import { createIntermediateChunk, nextBmtLevel } from './file'
+import { DEFAULT_SPAN_SIZE, Span, makeSpan } from './span'
+import { Deferred, Flavor, concatBytes } from './utils'
+
+export interface GenericReadable<T> {
+  on(event: 'close', listener: () => void): this
+  on(event: 'data', listener: (chunk: T) => void): this
+  on(event: 'error', listener: (err: Error) => void): this
+  push(chunk: unknown, encoding?: BufferEncoding): boolean
+  destroy(error?: Error): this
+  emit(event: 'error', err: Error): boolean
+}
+
+export interface ChunkedFileDeferred<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+> extends Flavor<'ChunkedFile'> {
+  // zero level data chunks
+  leafChunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>
+  rootChunk: Promise<Chunk<MaxChunkPayloadLength, SpanLength>>
+  payload: GenericReadable<Uint8Array>
+  address: Promise<ChunkAddress>
+  span: Promise<Span<SpanLength>>
+  bmt: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>
+}
+
+/**
+ * Calculates the total number of bytes received from the given readable stream
+ * @param payload byte array stream
+ * @returns a promise that resolves with the total number of bytes
+ */
+async function getByteStreamLength(payload: GenericReadable<Uint8Array>): Promise<number> {
+  return new Promise((resolve, reject) => {
+    let dataLength = 0
+    payload.on('data', chunk => (dataLength += chunk.length))
+
+    payload.on('close', () => resolve(dataLength))
+
+    payload.on('error', error => reject(error))
+  })
+}
+
+/**
+ * Creates an object for performing BMT functions on payload data using streams
+ *
+ * @param payload byte array stream of the data
+ * @param chunkStreamFactory a factory function for a readable stream
+ * @param options settings for the used chunks
+ * @returns ChunkedFileDeferred object with helper methods
+ */
+export function makeChunkedFileWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): ChunkedFileDeferred<MaxChunkPayloadLength, SpanLength> {
+  const spanLength = (options?.spanLength || DEFAULT_SPAN_SIZE) as SpanLength
+  const payloadLengthPromise = getByteStreamLength(payload)
+
+  const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options)
+
+  const rootChunk = bmtRootChunkWithStreams(leafStream, chunkStreamFactory)
+
+  const address = new Promise<ChunkAddress>(async (resolve, reject) => {
+    try {
+      resolve((await rootChunk).address())
+    } catch (error) {
+      reject(error)
+    }
+  })
+
+  const span = new Promise<Span<SpanLength>>(async (resolve, reject) => {
+    try {
+      resolve(makeSpan(await payloadLengthPromise, spanLength))
+    } catch (error) {
+      reject(error)
+    }
+  })
+
+  const bmt = bmtWithStreams(leafStream, chunkStreamFactory)
+
+  return {
+    payload,
+    span,
+    leafChunks: leafStream,
+    address,
+    rootChunk,
+    bmt,
+  }
+}
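The `chunkStreamFactory` parameter above only needs to return a stream the library can `push` into and later `destroy`. With Node streams, a minimal sketch (mirroring the integration tests below) is:

```ts
import { Readable } from 'stream'

// An object-mode Readable with a no-op read() is enough: the library
// pushes chunks into the stream itself and destroys it when done.
const chunkStreamFactory = () =>
  new Readable({
    objectMode: true,
    read: () => {},
  })
```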
+
+/**
+ * Generates BMT chunks and outputs them to a readable stream.
+ * @param payload readable stream of Uint8Array data
+ * @param chunkStreamFactory a factory function for a readable stream
+ * @param options settings for the used chunks
+ * @returns a readable stream with all chunks from the BMT; levels are separated
+ * by empty chunks (payload.length === 0)
+ */
+export function createBmtWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options)
+
+  return bmtWithStreams(leafStream, chunkStreamFactory)
+}
+
+/**
+ * Calculates the root chunk for bytes received from a readable stream
+ * @param payload readable stream of Uint8Array data
+ * @param chunkStreamFactory a factory function for a readable stream
+ * @param options settings for the used chunks
+ * @returns a promise that resolves with the root chunk
+ */
+export async function createBmtRootChunkWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): Promise<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const leafStream = createLeafChunksStream(payload, chunkStreamFactory, options)
+
+  return bmtRootChunkWithStreams(leafStream, chunkStreamFactory)
+}
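As a usage sketch, a hypothetical helper that derives a file's address from disk via `createBmtRootChunkWithStreams`, under the same Node-stream assumptions as in the earlier sketch:

```ts
import FS from 'fs'
import { Readable, Transform } from 'stream'
import { createBmtRootChunkWithStreams } from '@fairdatasociety/bmt-js'

// Hypothetical helper: hashes a file from disk without loading it into memory.
async function rootAddressOf(filePath: string): Promise<Uint8Array> {
  const byteStream = FS.createReadStream(filePath).pipe(
    new Transform({
      transform(chunk, encoding, callback) {
        callback(null, Uint8Array.from(chunk as Buffer))
      },
    }),
  )

  const rootChunk = await createBmtRootChunkWithStreams(byteStream, () =>
    new Readable({ objectMode: true, read: () => {} }),
  )

  return rootChunk.address()
}
```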
+
+/**
+ * Returns a readable stream of leaf chunks for the received bytes.
+ * @param payload readable stream of Uint8Array data
+ * @param chunkStreamFactory a factory function for a readable stream
+ * @param options settings for the used chunks
+ */
+export function createLeafChunksStream<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  payload: GenericReadable<Uint8Array>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  options?: {
+    maxPayloadLength?: MaxChunkPayloadLength
+    spanLength?: SpanLength
+  },
+): GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const maxPayloadLength = (options?.maxPayloadLength || DEFAULT_MAX_PAYLOAD_SIZE) as MaxChunkPayloadLength
+
+  let buffer: Uint8Array = new Uint8Array()
+  let dataLength = 0
+  const leafStream = chunkStreamFactory()
+
+  payload.on('data', chunk => {
+    buffer = concatBytes(buffer, chunk)
+    dataLength += chunk.length
+
+    // emit every full chunk the buffer already holds
+    for (let offset = 0; offset + maxPayloadLength <= buffer.length; offset += maxPayloadLength) {
+      leafStream.push(makeChunk(buffer.slice(offset, offset + maxPayloadLength), options))
+    }
+
+    // keep only the incomplete remainder in the buffer
+    if (buffer.length >= maxPayloadLength) {
+      buffer = buffer.slice(Math.floor(buffer.length / maxPayloadLength) * maxPayloadLength, buffer.length)
+    }
+  })
+
+  payload.on('close', () => {
+    if (dataLength === 0) {
+      leafStream.push(makeChunk(new Uint8Array(), options))
+    } else {
+      for (let offset = 0; offset < buffer.length; offset += maxPayloadLength) {
+        leafStream.push(makeChunk(buffer.slice(offset, offset + maxPayloadLength), options))
+      }
+    }
+
+    leafStream.destroy()
+  })
+
+  payload.on('error', error => {
+    leafStream.emit('error', error)
+  })
+
+  return leafStream
+}
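To make the buffering arithmetic concrete: with the default 4096-byte maximum payload, a 10 000-byte stream always yields the same three leaf chunks, no matter how the bytes are split across `data` events. A standalone illustration (constants duplicated here, not library code):

```ts
// Illustration only: leaf chunk payload sizes for a 10 000-byte stream
const MAX_PAYLOAD = 4096
const totalBytes = 10_000

const sizes: number[] = []
for (let offset = 0; offset < totalBytes; offset += MAX_PAYLOAD) {
  sizes.push(Math.min(MAX_PAYLOAD, totalBytes - offset))
}

console.log(sizes) // [ 4096, 4096, 1808 ]
```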
+
+/**
+ * Generates BMT chunks and outputs them to a readable stream.
+ * @param leafChunks readable stream of leaf chunks
+ * @param chunkStreamFactory a factory function for a readable stream
+ * @returns a readable stream with all chunks from the BMT; levels are separated
+ * by empty chunks (payload.length === 0)
+ */
+function bmtWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  leafChunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+): GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const outputStream = chunkStreamFactory()
+  let chunksLength = 0
+
+  try {
+    let firstChunk: Chunk<MaxChunkPayloadLength, SpanLength> | null = null
+    let prevChunk: Chunk<MaxChunkPayloadLength, SpanLength> | null = null
+
+    leafChunks.on('data', chunk => {
+      chunksLength += 1
+
+      if (chunksLength === 1) {
+        firstChunk = chunk
+      }
+
+      if (prevChunk) {
+        outputStream.push(prevChunk)
+      }
+
+      prevChunk = chunk
+    })
+
+    leafChunks.on('close', () => {
+      try {
+        if (chunksLength === 0) {
+          throw new Error(`given chunk array is empty`)
+        }
+
+        // the last leaf chunk is only emitted if it is not a carrier chunk
+        if (!shouldPopCarrierChunk(firstChunk as Chunk<MaxChunkPayloadLength, SpanLength>, chunksLength)) {
+          outputStream.push(prevChunk)
+        }
+
+        if (chunksLength === 1) {
+          outputStream.destroy()
+        } else {
+          outputStream.push(makeChunk(new Uint8Array()))
+        }
+      } catch (error) {
+        outputStream.emit('error', error as Error)
+      }
+    })
+
+    leafChunks.on('error', error => outputStream.emit('error', error))
+
+    const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams(
+      leafChunks,
+      chunkStreamFactory,
+    )
+
+    let levelChunks: Chunk<MaxChunkPayloadLength, SpanLength>[] = []
+
+    nextLevelChunks.on('data', chunk => levelChunks.push(chunk))
+
+    nextLevelChunks.on('close', async () => {
+      let carrierChunk = await nextLevelCarrierChunk
+
+      levelChunks.forEach(chunk => outputStream.push(chunk))
+
+      while (levelChunks.length !== 1) {
+        outputStream.push(makeChunk(new Uint8Array()))
+
+        const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk)
+
+        nextLevelChunks.forEach(chunk => outputStream.push(chunk))
+
+        levelChunks = nextLevelChunks
+        carrierChunk = nextLevelCarrierChunk
+      }
+
+      outputStream.destroy()
+    })
+
+    nextLevelChunks.on('error', error => {
+      outputStream.emit('error', error)
+    })
+  } catch (error) {
+    outputStream.emit('error', error as Error)
+  }
+
+  return outputStream
+}
+
+/**
+ * Calculates the root chunk for leaf chunks received from a readable stream
+ * @param chunks readable stream of leaf chunks
+ * @param chunkStreamFactory a factory function for a readable stream
+ * @returns a promise that resolves with the root chunk
+ */
+async function bmtRootChunkWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  chunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  chunkStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+): Promise<Chunk<MaxChunkPayloadLength, SpanLength>> {
+  const result = new Deferred<Chunk<MaxChunkPayloadLength, SpanLength>>()
+  let chunksLength = 0
+
+  try {
+    const { nextCarrierChunk: nextLevelCarrierChunk, nextLevelChunks } = firstBmtLevelWithStreams(
+      chunks,
+      chunkStreamFactory,
+    )
+
+    let levelChunks: Chunk<MaxChunkPayloadLength, SpanLength>[] = []
+
+    nextLevelChunks.on('data', chunk => {
+      chunksLength += 1
+      levelChunks.push(chunk)
+    })
+
+    nextLevelChunks.on('close', async () => {
+      if (chunksLength === 0) {
+        return result.reject(new Error(`given chunk array is empty`))
+      }
+
+      let carrierChunk = await nextLevelCarrierChunk
+
+      while (levelChunks.length !== 1 || carrierChunk) {
+        const { nextLevelChunks, nextLevelCarrierChunk } = nextBmtLevel(levelChunks, carrierChunk)
+        levelChunks = nextLevelChunks
+        carrierChunk = nextLevelCarrierChunk
+      }
+
+      result.resolve(levelChunks[0])
+    })
+
+    nextLevelChunks.on('error', error => {
+      result.reject(error)
+    })
+  } catch (error) {
+    result.reject(error)
+  }
+
+  return result.promise
+}
+
+/**
+ * A helper function that generates the first level of intermediate chunks using streams.
+ * @param chunks readable stream of leaf chunks
+ * @param chunkArrayStreamFactory a factory function for a readable stream
+ * @returns a readable stream of first-level intermediate chunks and a promise of the
+ * carrier chunk for this level
+ */
+function firstBmtLevelWithStreams<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(
+  chunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+  chunkArrayStreamFactory: () => GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>,
+): {
+  nextLevelChunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>>
+  nextCarrierChunk: Promise<Chunk<MaxChunkPayloadLength, SpanLength> | null>
+} {
+  const nextLevelChunks: GenericReadable<Chunk<MaxChunkPayloadLength, SpanLength>> =
+    chunkArrayStreamFactory()
+
+  let firstReceivedChunk: Chunk<MaxChunkPayloadLength, SpanLength>
+  let lastReceivedChunk: Chunk<MaxChunkPayloadLength, SpanLength>
+  let firstSentChunk: Chunk<MaxChunkPayloadLength, SpanLength>
+
+  let prevIntermediateChunk: Chunk<MaxChunkPayloadLength, SpanLength> | null = null
+
+  const nextCarrierChunk = new Deferred<Chunk<MaxChunkPayloadLength, SpanLength> | null>()
+  let nextLevelChunksBuffer: Chunk<MaxChunkPayloadLength, SpanLength>[] = []
+  let generatedChunksCount = 0
+  let receivedChunks = 0
+  let maxPayloadLength: number
+  let spanLength: number
+  let maxSegmentCount: number
+
+  const handleChunk = (chunk: Chunk<MaxChunkPayloadLength, SpanLength>) => {
+    generatedChunksCount += 1
+
+    if (!firstReceivedChunk) {
+      firstReceivedChunk = chunk
+    }
+
+    if (generatedChunksCount === 1) {
+      firstSentChunk = chunk
+    }
+
+    nextLevelChunks.push(chunk)
+  }
+
+  chunks.on('data', chunk => {
+    try {
+      receivedChunks += 1
+
+      lastReceivedChunk = chunk
+
+      nextLevelChunksBuffer.push(chunk)
+
+      if (receivedChunks === 1) {
+        firstReceivedChunk = chunk
+        maxPayloadLength = chunk.maxPayloadLength
+        spanLength = chunk.spanLength
+        maxSegmentCount = maxPayloadLength / SEGMENT_SIZE
+      }
+
+      for (
+        let offset = 0;
+        offset + maxSegmentCount < nextLevelChunksBuffer.length;
+        offset += maxSegmentCount
+      ) {
+        if (prevIntermediateChunk) {
+          handleChunk(prevIntermediateChunk)
+        }
+        const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount)
+        prevIntermediateChunk = createIntermediateChunk(
+          childrenChunks,
+          spanLength,
+          maxPayloadLength,
+        ) as Chunk<MaxChunkPayloadLength, SpanLength>
+      }
+
+      if (nextLevelChunksBuffer.length > maxSegmentCount) {
+        nextLevelChunksBuffer = nextLevelChunksBuffer.slice(
+          Math.floor(nextLevelChunksBuffer.length / maxSegmentCount) * maxSegmentCount,
+          nextLevelChunksBuffer.length,
+        )
+      }
+    } catch (error) {
+      nextLevelChunks.emit('error', error as Error)
+      nextCarrierChunk.reject(error)
+    }
+  })
+
+  chunks.on('close', () => {
+    let nextCarrierChunkValue: Chunk<MaxChunkPayloadLength, SpanLength> | null = null
+
+    try {
+      if (receivedChunks === 0) {
+        throw new Error('The given chunk array is empty')
+      }
+
+      const popCarrierChunk = shouldPopCarrierChunk(firstReceivedChunk, receivedChunks)
+
+      if (popCarrierChunk) {
+        nextLevelChunksBuffer.pop()
+      }
+
+      if (receivedChunks === 1 && !popCarrierChunk) {
+        return nextLevelChunks.push(firstReceivedChunk)
+      }
+
+      for (let offset = 0; offset < nextLevelChunksBuffer.length; offset += maxSegmentCount) {
+        if (prevIntermediateChunk) {
+          handleChunk(prevIntermediateChunk)
+        }
+        const childrenChunks = nextLevelChunksBuffer.slice(offset, offset + maxSegmentCount)
+        prevIntermediateChunk = createIntermediateChunk(
+          childrenChunks,
+          spanLength,
+          maxPayloadLength,
+        ) as Chunk<MaxChunkPayloadLength, SpanLength>
+      }
+
+      if (popCarrierChunk || !shouldPopCarrierChunk(firstSentChunk, generatedChunksCount + 1)) {
+        handleChunk(prevIntermediateChunk as Chunk<MaxChunkPayloadLength, SpanLength>)
+      } else {
+        nextCarrierChunkValue = prevIntermediateChunk
+      }
+
+      if (popCarrierChunk && generatedChunksCount % maxSegmentCount !== 0) {
+        nextLevelChunks.push(lastReceivedChunk)
+      }
+    } catch (error) {
+      nextLevelChunks.emit('error', error as Error)
+      nextCarrierChunk.reject(error)
+    } finally {
+      nextCarrierChunk.resolve(nextCarrierChunkValue)
+      nextLevelChunks.destroy()
+    }
+  })
+
+  chunks.on('error', error => {
+    nextLevelChunks.emit('error', error)
+    nextCarrierChunk.reject(error)
+  })
+
+  return {
+    nextLevelChunks,
+    nextCarrierChunk: nextCarrierChunk.promise,
+  }
+}
+
+/**
+ * Returns whether the last chunk should be excluded from the current level.
+ * This can be calculated as soon as the first chunk arrives
+ * @param firstChunk first chunk of the current level
+ * @param chunkLength number of chunks in the current level
+ * @returns whether the last chunk should be excluded
+ */
+function shouldPopCarrierChunk<
+  MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
+  SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
+>(firstChunk: Chunk<MaxChunkPayloadLength, SpanLength>, chunkLength: number): boolean {
+  if (chunkLength <= 1) return false
+
+  const maxDataLength = firstChunk.maxPayloadLength
+  // max segment count in one chunk; the segment size has to be equal to the chunk address size
+  const maxSegmentCount = maxDataLength / SEGMENT_SIZE
+
+  return chunkLength % maxSegmentCount === 1
+}
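The modulus check in `shouldPopCarrierChunk` is easiest to see with the default sizes: each intermediate chunk references 4096 / 32 = 128 children, so a level whose length is one more than a multiple of 128 leaves a lone trailing chunk that must be carried to a higher level. A standalone sketch of the same arithmetic (constants duplicated for illustration):

```ts
const SEGMENT_SIZE = 32
const MAX_PAYLOAD = 4096
const maxSegmentCount = MAX_PAYLOAD / SEGMENT_SIZE // 128

const shouldPop = (chunkCount: number): boolean =>
  chunkCount > 1 && chunkCount % maxSegmentCount === 1

console.log(shouldPop(128)) // false: the level packs exactly into parents
console.log(shouldPop(129)) // true: the 129th chunk becomes a carrier chunk
```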
diff --git a/src/file.ts b/src/file.ts
index 1951c15..99cbb9a 100644
--- a/src/file.ts
+++ b/src/file.ts
@@ -264,7 +264,7 @@ function bmtRootChunk<
   return levelChunks[0]
 }
 
-function nextBmtLevel<
+export function nextBmtLevel<
   MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
   SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
 >(
@@ -308,7 +308,7 @@ function nextBmtLevel<
   }
 }
 
-function createIntermediateChunk<
+export function createIntermediateChunk<
   MaxChunkPayloadLength extends number = typeof DEFAULT_MAX_PAYLOAD_SIZE,
   SpanLength extends number = typeof DEFAULT_SPAN_SIZE,
 >(
diff --git a/src/index.ts b/src/index.ts
index 8f33394..18ec6ba 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -6,6 +6,11 @@ export {
   fileAddressFromInclusionProof,
   getBmtIndexOfSegment,
 } from './file'
+export {
+  makeChunkedFileWithStreams,
+  createBmtWithStreams,
+  createBmtRootChunkWithStreams,
+} from './file-streams'
 export { Chunk, ChunkAddress, makeChunk, rootHashFromInclusionProof } from './chunk'
 export { Span, makeSpan, getSpanValue } from './span'
 export * as Utils from './utils'
diff --git a/src/utils.ts b/src/utils.ts
index 622fc9d..b88d3c0 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -1,8 +1,22 @@
+/* eslint-disable @typescript-eslint/no-empty-function */
 import { keccak256, Message } from 'js-sha3'
 
 /** Used for FavorTypes */
 export type Flavor<Name> = { __tag__?: Name }
 
+export class Deferred<T> {
+  public resolve: (value: T) => void = () => {}
+  public reject: (reason?: unknown) => void = () => {}
+  public promise: Promise<T>
+
+  constructor() {
+    this.promise = new Promise<T>((resolve, reject) => {
+      this.resolve = resolve
+      this.reject = reject
+    })
+  }
+}
+
 /**
  * Nominal type to represent hex strings WITHOUT '0x' prefix.
  * For example for 32 bytes hex representation you have to use 64 length.
@@ -111,3 +125,12 @@ export function equalBytes(a: Uint8Array, b: Uint8Array): boolean {
 }
 
 export { Message }
+
+export function concatBytes(bytes1: Uint8Array, bytes2: Uint8Array): Uint8Array {
+  const buffer = new Uint8Array(bytes1.length + bytes2.length)
+
+  buffer.set(bytes1, 0)
+  buffer.set(bytes2, bytes1.length)
+
+  return buffer
+}
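A minimal usage sketch of the `Deferred` helper above (inside the library it is imported from `./utils`): it hands out a promise before the value exists, and lets any later callback settle it.

```ts
import { Deferred } from './utils'

const download = new Deferred<Uint8Array>()

// the promise can be handed out immediately
download.promise.then(bytes => console.log('received', bytes.length, 'bytes'))

// and settled later, from wherever the value becomes available
download.resolve(new Uint8Array([1, 2, 3]))
```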
diff --git a/test/integration/file.spec.ts b/test/integration/file.spec.ts
index 4fe564b..93390be 100644
--- a/test/integration/file.spec.ts
+++ b/test/integration/file.spec.ts
@@ -1,9 +1,12 @@
+/* eslint-disable @typescript-eslint/no-empty-function */
+import { Readable, Transform } from 'node:stream'
 import { Bee, SPAN_SIZE } from '@ethersphere/bee-js'
 import FS from 'fs'
 import path from 'path'
 import { makeChunkedFile } from '../../src'
-import { DEFAULT_MAX_PAYLOAD_SIZE } from '../../src/chunk'
+import { Chunk, DEFAULT_MAX_PAYLOAD_SIZE } from '../../src/chunk'
 import { bytesToHex } from '../../src/utils'
+import { createBmtRootChunkWithStreams, makeChunkedFileWithStreams } from '../../src/file-streams'
 
 const beeUrl = process.env.BEE_API_URL || 'http://localhost:1633'
 const bee = new Bee(beeUrl)
@@ -66,3 +69,114 @@ describe('file', () => {
     expect(bytesToHex(chunkedFile.address(), 64)).toBe(beeResult.reference)
   })
 })
+
+describe('file-streams', () => {
+  const transformToByteStream = (readable: FS.ReadStream) =>
+    readable.pipe(
+      new Transform({
+        transform(chunk, encoding, callback) {
+          callback(null, Uint8Array.from(chunk as Buffer))
+        },
+      }),
+    )
+
+  it('should produce the same chunk as Bee for data < 4KB', async () => {
+    const filePath = path.join(__dirname, '..', 'test-files', 'text.txt')
+
+    const chunkedFileFromStream = makeChunkedFileWithStreams(
+      transformToByteStream(FS.createReadStream(filePath)),
+      () =>
+        new Readable({
+          objectMode: true,
+          read: () => {},
+        }),
+    )
+
+    const fileBytes = Uint8Array.from(FS.readFileSync(filePath))
+    const chunkedFile = makeChunkedFile(fileBytes)
+
+    expect(bytesToHex((await chunkedFileFromStream.rootChunk).address(), 64)).toStrictEqual(
+      bytesToHex(chunkedFile.address(), 64),
+    )
+  })
+
+  it('should produce the same BMT tree as Bee for data > 4KB', async () => {
+    const filePath = path.join(__dirname, '..', 'test-files', 'The-Book-of-Swarm.pdf')
+
+    const chunkedFileFromStream = makeChunkedFileWithStreams(
+      transformToByteStream(FS.createReadStream(filePath)),
+      () =>
+        new Readable({
+          objectMode: true,
+          read: () => {},
+        }),
+    )
+
+    const bmtStream = chunkedFileFromStream.bmt
+    const treeFromStream: Chunk<4096, 8>[][] = [[]]
+
+    bmtStream.on('data', chunk => {
+      if (chunk.payload.length === 0) {
+        // empty chunks separate the BMT levels in the stream
+        treeFromStream.push([])
+      } else {
+        treeFromStream[treeFromStream.length - 1].push(chunk)
+      }
+    })
+
+    await new Promise<void>((resolve, reject) => {
+      bmtStream.on('close', () => resolve())
+      bmtStream.on('error', error => reject(error))
+    })
+
+    const fileBytes = Uint8Array.from(FS.readFileSync(filePath))
+    const chunkedFile = makeChunkedFile(fileBytes)
+    const tree = chunkedFile.bmt()
+
+    expect(treeFromStream[0][0].payload).toStrictEqual(tree[0][0].payload)
+
+    expect(treeFromStream[1][0].payload).toStrictEqual(tree[1][0].payload)
+    expect(treeFromStream[1][0].span()).toStrictEqual(tree[1][0].span())
+    expect(treeFromStream[1][0].address()).toStrictEqual(tree[1][0].address())
+    expect(bytesToHex(treeFromStream[2][0].address(), 64)).toStrictEqual(bytesToHex(tree[2][0].address(), 64))
+
+    expect(bytesToHex(await chunkedFileFromStream.address, 64)).toStrictEqual(
+      bytesToHex(chunkedFile.address(), 64),
+    )
+  })
+
+  it('should work with edge case - carrier chunk', async () => {
+    const filePath = path.join(__dirname, '..', 'test-files', 'carrier-chunk-blob')
+
+    const rootChunk = await createBmtRootChunkWithStreams(
+      transformToByteStream(FS.createReadStream(filePath)),
+      () =>
+        new Readable({
+          objectMode: true,
+          read: () => {},
+        }),
+    )
+
+    const fileBytes = Uint8Array.from(FS.readFileSync(filePath))
+    const chunkedFile = makeChunkedFile(fileBytes)
+
+    expect(bytesToHex(rootChunk.address(), 64)).toBe(bytesToHex(chunkedFile.address(), 64))
+  })
+
+  it('should work with edge case - carrier chunk in intermediate level', async () => {
+    const filePath = path.join(__dirname, '..', 'test-files', 'carrier-chunk-blob-2')
+
+    const rootChunk = await createBmtRootChunkWithStreams(
+      transformToByteStream(FS.createReadStream(filePath)),
+      () =>
+        new Readable({
+          objectMode: true,
+          read: () => {},
+        }),
+    )
+
+    const fileBytes = Uint8Array.from(FS.readFileSync(filePath))
+    const chunkedFile = makeChunkedFile(fileBytes)
+
+    expect(bytesToHex(rootChunk.address(), 64)).toBe(bytesToHex(chunkedFile.address(), 64))
+  })
+})
diff --git a/test/test-setup.ts b/test/test-setup.ts
index 0542f46..9b76413 100644
--- a/test/test-setup.ts
+++ b/test/test-setup.ts
@@ -26,5 +26,7 @@ export default async function testsSetup(): Promise<void> {
       // so we are only logging errors and not leaving them to propagate
       console.error(e)
     }
+
+    await sleep(120 * 1000)
   }
 }