Skip to content

Commit

Permalink
feat(lib): add zip functions
Browse files Browse the repository at this point in the history
  • Loading branch information
TomerAberbach committed Nov 25, 2024
1 parent 6395518 commit 7ee17ea
Show file tree
Hide file tree
Showing 4 changed files with 409 additions and 3 deletions.
98 changes: 98 additions & 0 deletions src/operations/splices.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1238,3 +1238,101 @@ export const concatConcur: <Value>(
| ConcurIterable<Value>
)[]
) => ConcurIterable<Value>

/**
* Returns an iterable that pairs up same-index values from the given
* `iterables` into tuples.
*
* The `iterables` are iterated in parallel until the shortest one is done, at
* which point the returned iterable is done.
*
* @example
* ```js
* console.log(
* pipe(
* zip(
* [1, 2, 3, 4],
* [5, `sloth`, 7],
* [8, 9, 10],
* ),
* reduce(toArray()),
* ),
* )
* //=> [ [ 1, 5, 8 ], [ 2, 'sloth', 9 ], [ 3, 7, 10 ] ]
* ```
*
* @category Splices
* @since v3.8.0
*/
export const zip: <Values extends unknown[] | []>(
...iterables: Readonly<{ [Key in keyof Values]: Iterable<Values[Key]> }>
) => Iterable<Values>

/**
* Returns an async iterable that pairs up same-index values from the given
* `iterables` into tuples.
*
* The `iterables` are iterated in parallel until the shortest one is done, at
* which point the returned async iterable is done.
*
* @example
* ```js
* console.log(
* await pipe(
* zip(
* asAsync([1, 2, 3, 4]),
* [5, `sloth`, 7],
* asAsync([8, 9, 10]),
* ),
* reduceAsync(toArray()),
* ),
* )
* //=> [ [ 1, 5, 8 ], [ 2, 'sloth', 9 ], [ 3, 7, 10 ] ]
* ```
*
* @category Splices
* @since v3.8.0
*/
export const zipAsync: <Values extends unknown[] | []>(
...iterables: Readonly<{
[Key in keyof Values]: Iterable<Values[Key]> | AsyncIterable<Values[Key]>
}>
) => AsyncIterable<Values>

/**
* Returns a concur iterable that pairs up same-index values, in iteration
* order, from the given `iterables` into tuples.
*
* The `iterables` are iterated in parallel until the shortest one is done, at
* which point the returned concur iterable is done.
*
* WARNING: If one of the concur iterables yields values more quickly than
* others, then an unbounded number of its values will be buffered so that they
* can be yielded with the values of other concur iterables at the same index.
*
* @example
* ```js
* console.log(
* await pipe(
* zip(
* asAsync([1, 2, 3, 4]),
* [5, `sloth`, 7],
* asAsync([8, 9, 10]),
* ),
* reduceAsync(toArray()),
* ),
* )
* //=> [ [ 1, 5, 8 ], [ 2, 'sloth', 9 ], [ 3, 7, 10 ] ]
* ```
*
* @category Splices
* @since v3.8.0
*/
export const zipConcur: <Values extends unknown[] | []>(
...iterables: Readonly<{
[Key in keyof Values]:
| Iterable<Values[Key]>
| AsyncIterable<Values[Key]>
| ConcurIterable<Values[Key]>
}>
) => ConcurIterable<Values>
113 changes: 113 additions & 0 deletions src/operations/splices.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
pipe,
} from './core.js'
import { findLast, findLastAsync, findLastConcur } from './filters.js'
import { allConcur } from './predicates.js'
import {
flatten,
flattenAsync,
Expand Down Expand Up @@ -338,3 +339,115 @@ const createWindow = size => {
export const concat = (...iterables) => flatten(iterables)
export const concatAsync = (...iterables) => flattenAsync(asAsync(iterables))
export const concatConcur = (...iterables) => flattenConcur(asConcur(iterables))

export const zip = (...iterables) =>
createIterable(function* () {
if (iterables.length === 0) {
return
}

const iterators = iterables.map(iterable => iterable[Symbol.iterator]())
const values = iterables.map(() => undefined)
while (
iterators.every((iterator, index) => {
const result = iterator.next()
values[index] = result.value
return !result.done
})
) {
yield [...values]
}
})

export const zipAsync = (...iterables) =>
createAsyncIterable(async function* () {
if (iterables.length === 0) {
return
}

const asyncIterators = iterables.map(iterable =>
asAsync(iterable)[Symbol.asyncIterator](),
)
const values = iterables.map(() => undefined)
while (
await allConcur(async ([index, asyncIterator]) => {
const result = await asyncIterator.next()
values[index] = result.value
return !result.done
}, asConcur(asyncIterators.entries()))
) {
yield [...values]
}
})

export const zipConcur =
(...iterables) =>
apply =>
promiseWithEarlyResolve(async resolve => {
if (iterables.length === 0) {
return
}

const valuesPerIterable = iterables.map(() => [])
const remainingValues = []
let remainingBatches = Infinity
let resolved = false

await Promise.all(
map(async ([index, iterable]) => {
const values = valuesPerIterable[index]

await asConcur(iterable)(async value => {
if (resolved) {
return
}

const valueIndex = values.length
if (valueIndex >= remainingBatches) {
// There's no point in processing this value because the length of
// the shortest known iterable, so it's not going to be part of
// any yielded batch.
return
}

// Queue this value for a future batch and track the remaining
// number of values we're waiting on to be able to yield that batch.
values.push(value)
if (valueIndex >= remainingValues.length) {
remainingValues.push(iterables.length)
}
remainingValues[valueIndex]--

const canYieldBatch = valueIndex === 0 && remainingValues[0] === 0
if (!canYieldBatch) {
return
}

// Dequeue and yield the next batch.
remainingValues.shift()
const batch = valuesPerIterable.map(values => values.shift())
remainingBatches--
await apply(batch)

if (!resolved && remainingBatches === 0) {
resolved = true
resolve()
}
})

if (values.length > 0) {
// The remaining number of batches can be no more than the remaining
// number of queued values for this resolved concur iterable. Track
// that number so we can resolve the zipped concur iterable as soon
// as possible (see above).
remainingBatches = Math.min(remainingBatches, values.length)
} else {
// We can resolve early because there aren't any queued values left
// for concur iterable, so we'll never complete and yield another
// batch.
resolved = true
resolve()
}
}, iterables.entries()),
)
})
17 changes: 14 additions & 3 deletions test/helpers/fast-check/iterables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ export const getIterableArb = <Value>(
getArrayArb(arb, constraints).map(values => ({
iterable: new IterableWithPrivateFields(values),
values,
iterationOrder: values,
}))

export type GeneratedIterable<Value> = {
iterable: Iterable<Value>
values: Value[]
iterationOrder: Value[]
}

// Used to ensure we call `Symbol.iterator` with the right `this`.
Expand Down Expand Up @@ -62,11 +64,13 @@ export const getAsyncIterableArb = <Value>(
getArrayArb(arb, constraints).map(values => ({
iterable: new AsyncIterableWithPrivateFields(values),
values,
iterationOrder: values,
}))

export type GeneratedAsyncIterable<Value> = {
iterable: AsyncIterable<Value>
values: Value[]
iterationOrder: Value[]
}

// Used to ensure we call `Symbol.asyncIterator` with the right `this`.
Expand Down Expand Up @@ -102,24 +106,31 @@ export const getConcurIterableArb = <Value>(
): fc.Arbitrary<GeneratedConcurIterable<Value>> =>
getArrayArb(arb, constraints).map(values => {
const index = getIterableIndex()
const iterationOrder: Value[] = []
return {
iterable: Object.assign(
async (apply: (value: Value) => MaybePromiseLike<void>) => {
await Promise.all(
values.map(async value =>
apply(await getScheduler()!.schedule(value)),
),
values.map(async value => {
const scheduledValue = await getScheduler()!.schedule(value)
iterationOrder.push(value)
await apply(scheduledValue)
}),
)
},
{ [fc.toStringMethod]: () => `ConcurIterable$${index}` },
),
values,
get iterationOrder() {
return iterationOrder
},
}
})

export type GeneratedConcurIterable<Value> = {
iterable: ConcurIterable<Value>
values: Value[]
iterationOrder: Value[]
}

export const concurIterableArb = getConcurIterableArb(fc.anything())
Expand Down
Loading

0 comments on commit 7ee17ea

Please sign in to comment.