-
Notifications
You must be signed in to change notification settings - Fork 0
/
iterable-mapper.ts
116 lines (94 loc) · 3.68 KB
/
iterable-mapper.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/* eslint-disable no-console */
import { IterableMapper } from '@shutterstock/p-map-iterable';
import { promisify } from 'util';
// Promise-returning wrapper around `setTimeout`, so callers can write
// `await sleep(ms)` to pause an async function for roughly `ms` milliseconds.
const sleep = promisify(setTimeout);
/**
 * Async iterable that simulates slowly listing remote files.
 *
 * Each yielded item describes one simulated file: a zero-padded `fileName`
 * and a random `fileSizeGB`. Before producing each item the iterator sleeps
 * for `33 * (index % 10)` milliseconds to imitate a slow upstream source.
 *
 * The simulated file list is sized from `max`, so exactly `max` items are
 * produced. `max` is expected to be a finite, non-negative integer. The
 * iterable keeps no per-iteration state, so it can be iterated more than once
 * and yields the same files each time.
 */
class SleepIterator implements AsyncIterable<{ fileName: string; fileSizeGB: number }> {
  private readonly _max: number;
  private readonly _files: number[];

  /**
   * @param max Number of simulated files to generate and yield.
   */
  constructor(max: number) {
    this._max = max;

    // Create the list of remote files to download.
    // Each file has a random whole-number size from 10 to 99 GB (inclusive).
    this._files = Array.from({ length: max }, () => Math.floor(Math.random() * 90) + 10);
  }

  /**
   * Yields one `{ fileName, fileSizeGB }` record per simulated file, in index
   * order, pausing before each one.
   */
  async *[Symbol.asyncIterator](): AsyncIterator<{ fileName: string; fileSizeGB: number }> {
    for (let i = 0; i < this._max; i++) {
      const nextFile = this._files[i];
      if (nextFile === undefined) {
        // Defensive stop (and type narrowing) in case the list is shorter than `max`.
        break;
      }

      // Take some time to generate the next item
      await sleep(33 * (i % 10));

      yield { fileName: `${i.toString().padStart(5, '0')}.json`, fileSizeGB: nextFile };
    }
  }
}
/**
 * Demonstrates prefetching with `IterableMapper`.
 *
 * A `SleepIterator` supplies simulated remote files, a mapper "downloads" each
 * one with a random delay, and the `for await` loop below consumes the mapped
 * results while logging how many GB have been fetched but not yet processed.
 */
async function main() {
  type RemoteFile = { fileName: string; fileSizeGB: number };
  type FetchedFile = { fileName: string; fileSizeGB: number; status: number };

  const fileCount = 100;
  const source = new SleepIterator(fileCount);

  // Running total of GB fetched by the mapper but not yet processed by the loop.
  let unreadBacklogGB = 0;
  // Number of mapper invocations started so far; used to label log lines.
  let mapperCalls = 0;

  // Small formatting helpers shared by the log statements.
  const pad3 = (n: number): string => n.toString().padStart(3, '0');
  const describeFile = (file: RemoteFile): string =>
    `FileName: ${file.fileName}, FileSizeGB: ${file.fileSizeGB}`;

  // Mapper: simulates fetching one file with a varied delay
  // (for example, fetching from AWS S3).
  //
  // Batch variant: wrap the source with `batch` from `npm i it-batch`
  // (e.g. `batch(source, 50)`) and give the mapper a batch signature such as
  // `async (values: number[]): Promise<number> => { ... }` when making
  // batch requests to a remote service.
  const fetchFile = async (value: RemoteFile): Promise<FetchedFile> => {
    const callId = mapperCalls;
    mapperCalls += 1;

    console.log(`Mapper Call Start ${callId}, ${describeFile(value)}`);

    await sleep(Math.random() * 500);

    console.log(`Mapper Call Done ${callId}, ${describeFile(value)}`);

    unreadBacklogGB += value.fileSizeGB;
    return { ...value, status: 200 };
  };

  // Create the item prefetcher: up to 2 mappers run at once and at most
  // 10 mapped results may sit unread.
  const prefetcher = new IterableMapper(source, fetchFile, { concurrency: 2, maxUnread: 10 });

  // Consume the prefetched items (or batches).
  // - Waits for a prefetch to complete if nothing is available yet.
  // - Starts processing immediately when a prefetched item is already available.
  // - If the `maxUnread` count was hit, no prefetches are in progress until an
  //   item is returned here; each returned item lets another mapper start,
  //   until `concurrency` mappers are running again.
  let resultNumber = 0;
  for await (const item of prefetcher) {
    resultNumber += 1;
    const label = pad3(resultNumber);

    console.log(
      `Result Start ${label}, ${describeFile(item)}, QueuedButUnreadFileSizeGB: ${pad3(
        unreadBacklogGB,
      )}`,
    );

    // Simulate taking some time to process this big file;
    // this stands in for the work that would keep the JS thread busy.
    await sleep(Math.random() * 500);

    unreadBacklogGB -= item.fileSizeGB;

    console.log(
      `Result Done ${label}, ${describeFile(item)}, QueuedButUnreadFileSizeGB: ${pad3(
        unreadBacklogGB,
      )}`,
    );
  }

  console.log(`QueuedButUnreadFileSizeGB: ${unreadBacklogGB}`);
}
// Kick off the example. `void` marks the returned promise as intentionally
// not awaited at the top level.
void main();