-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdirectly.js
123 lines (104 loc) · 2.72 KB
/
directly.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
'use strict';
/**
 * Builds a callback that removes one occurrence of `target` from `arr`.
 *
 * The array is mutated in place when the returned callback is invoked.
 * If `target` is not (or is no longer) in the array the callback does nothing.
 *
 * @param {Array} arr - array to remove from (mutated by the returned callback)
 * @param {*} target - value to look up with `indexOf` (strict equality)
 * @returns {Function} zero-argument remover
 */
function getRemover (arr, target) {
	return () => {
		const index = arr.indexOf(target);
		// indexOf yields -1 for a missing value, and splice(-1, 1) would delete
		// the LAST element instead of nothing, so bail out explicitly.
		if (index === -1) {
			return;
		}
		arr.splice(index, 1);
	};
}
/**
 * Runs promise-returning functions while capping how many are in flight.
 *
 * `funcs` is either an array of functions (a finite, "terminating" run whose
 * promise resolves with every result) or a queue-like object exposing
 * `attachDirectlyInstance(instance)`, `shift()` and `length` (an open-ended
 * run that does not resolve on its own when the queue drains).
 */
class Directly {
	/**
	 * @param {number} concurrence - maximum number of functions in flight
	 * @param {Function[]|Object} funcs - array of functions, or a queue-like object
	 */
	constructor (concurrence, funcs) {
		this.results = [];
		this.concurrence = concurrence;
		this.funcs = funcs;
		this.terminates = Array.isArray(this.funcs);
		this.cancelled = false;
		this.running = false;
		if (!this.terminates) {
			// hand the queue a reference to this instance
			this.funcs.attachDirectlyInstance(this);
		}
		this.competitors = [];
	}

	/**
	 * Starts (or, for a queue, tops up) execution.
	 *
	 * @returns {Promise|undefined} the promise for this run; `undefined` when an
	 *   instance promise has already been created by an earlier call
	 * @throws {TypeError} when an array's first entry is not a function
	 * @throws {RangeError} when work has to be throttled but `concurrence` is
	 *   not greater than 0
	 */
	run () {
		if (this.terminates) {
			if (!this.funcs.length) {
				return Promise.resolve([]);
			}
			if (typeof this.funcs[0] !== 'function') {
				throw new TypeError('directly expects a list functions that return a Promise, not a list of Promises');
			}
			// Few enough functions that no throttling is needed.
			if (this.funcs.length <= this.concurrence) {
				return Promise.all(this.funcs.map(func => func()));
			}
			this.fillCompetitors();
			this.startRace();
		} else if (!this.running) {
			// never take the Promise.all shortcut as even if the initial list is short, it
			// could easily grow to exceed the concurrence limit.
			this.fillCompetitors();
			this.startRace();
		}
		// NOTE(review): the original had the statement `this.running === true;`
		// here, a comparison with no effect. It is intentionally NOT turned into
		// an assignment: with `running` stuck at true, a later run() call would
		// no longer start work that was added while fewer than `concurrence`
		// functions are in flight (including a queue that started out empty).
		// As written, `running` is only ever set to false, so the queue branch
		// above is taken on every call.
		if (!this.resolve) {
			return this.instancePromise();
		}
	}

	/**
	 * Creates the promise handed to the caller and stores its settle functions
	 * on the instance so the race callbacks can settle it later.
	 *
	 * @returns {Promise}
	 */
	instancePromise () {
		return new Promise((resolve, reject) => {
			this.resolve = resolve;
			this.reject = reject;
		});
	}

	/**
	 * Launches waiting functions until `concurrence` of them are in flight or
	 * none are left.
	 *
	 * A `<` comparison is used rather than "difference is non-zero" so that a
	 * fractional limit stops at the next whole number instead of skipping past
	 * zero and draining the whole list.
	 *
	 * @throws {RangeError} when `concurrence` is not greater than 0 (0, a
	 *   negative number or NaN), since nothing could ever be started
	 */
	fillCompetitors () {
		if (!(this.concurrence > 0)) {
			throw new RangeError('directly expects concurrence to be a number greater than 0');
		}
		while (this.funcs.length && this.competitors.length < this.concurrence) {
			this.executeOne();
		}
	}

	/**
	 * Takes the next function, calls it, and tracks the promise it returns
	 * both as a result and as an in-flight competitor.
	 */
	executeOne () {
		const promise = this.funcs.shift()();
		this.results.push(promise);
		this.competitors.push(promise);
		// Drop it from the in-flight list once it settles either way.
		const remove = getRemover(this.competitors, promise);
		promise.then(remove, remove);
	}

	/**
	 * Races the in-flight promises. When the race fulfils, more work is
	 * started via rejoinRace. When it rejects, a terminating run rejects the
	 * instance promise; a queue run rejects the current instance promise with
	 * `{ error, nextError, terminate }` and keeps going.
	 */
	startRace () {
		const race = this.race = Promise.race(this.competitors);
		race
			.then(() => {
				this.rejoinRace(race);
			}, err => {
				if (this.terminates) {
					this.reject(err);
				} else {
					// give the ability to handle future errors;
					// grab the current reject before instancePromise() replaces it
					const reject = this.reject;
					const nextPromise = this.instancePromise();
					reject({
						error: err,
						nextError: nextPromise,
						terminate: this.terminate.bind(this)
					});
					this.rejoinRace(race);
				}
			});
	}

	/**
	 * Follow-up after a race settles. Does nothing when `race` has been
	 * superseded by a newer one.
	 *
	 * @param {Promise} race - the race whose callback is running
	 */
	rejoinRace (race) {
		if (this.race === race) {
			if (!this.funcs.length) {
				if (this.terminates) {
					// nothing left to start: resolve with every collected result
					return this.resolve(Promise.all(this.results));
				} else {
					this.running = false;
				}
			} else if (!this.cancelled) {
				this.executeOne();
				this.startRace();
			}
		}
	}

	/**
	 * Resolves the current instance promise (with no value) and marks the
	 * instance cancelled so rejoinRace stops launching further functions.
	 */
	terminate () {
		this.resolve();
		this.cancelled = true;
	}
}
module.exports = function SmartConstructor (concurrence, funcs) {
const directly = new Directly(concurrence, funcs)
return (this instanceof SmartConstructor) ? directly : directly.run();
};
module.exports.Queue = require('./lib/queue');