-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
141 lines (124 loc) · 5.33 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// All side effects are contained in Observable monads (at least I hope so)
/**
 * Wrapper used to mark impure expressions. Callers place the effectful
 * expressions in the argument list; they are evaluated by the call itself and
 * then ignored here.
 *
 * @returns {boolean} Always `true`, so a call can be chained with `&&`.
 */
const sideEffects = () => {
  return true;
};
// Unique sentinel object; it is only ever compared by identity.
const pendingMark = {};

/**
 * Checks whether a promise is still unsettled at the moment of the call.
 *
 * The promise is raced against an already-resolved sentinel promise. A promise
 * that has already settled wins the race; otherwise the sentinel wins.
 *
 * @param {Promise} promise - Promise to inspect.
 * @returns {Promise<boolean>} Resolves to `true` when the sentinel won the
 *   race (the promise was still pending), `false` when the promise was already
 *   fulfilled or rejected.
 */
const isPending = promise => {
  const marker = Promise.resolve(pendingMark);
  const winner = Promise.race([promise, marker]);
  return winner.then(
    value => value === pendingMark,
    () => false
  );
};
/**
 * Filters a list of promises down to those that are still unsettled.
 *
 * @param {Array<Promise>} promises - Promises to inspect.
 * @returns {Promise<Array<Promise>>} Resolves to the promises (same objects,
 *   same relative order) for which `isPending` reported `true`.
 */
const keepPending = async promises => {
  const checks = promises.map(promise => isPending(promise));
  const stillPending = await Promise.all(checks);
  return promises.filter((_, index) => stillPending[index]);
};
/**
 * Produces the observer's `next` and `error` methods bound to the observer,
 * ready to be spread into `promise.then(...)` or destructured.
 *
 * @param {{next: Function, error: Function}} observer - Observer to bind to.
 * @returns {Array<Function>} `[boundNext, boundError]`.
 */
const observerBindings = observer => {
  const boundNext = observer.next.bind(observer);
  const boundError = observer.error.bind(observer);
  return [boundNext, boundError];
};
/**
 * Builds the `debounce` operator for the given Observable constructor.
 *
 * Usage: `debounce(Observable)(ms)(observable)`. Each value from the source
 * cancels the previously scheduled emission and schedules itself to be emitted
 * after `ms` milliseconds. Error and completion notifications are forwarded
 * after the same delay; they do not cancel a scheduled value.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `ms => observable => Observable`.
 */
const debounce = Observable => ms => observable => new Observable(observer => {
  // Holds the handle of the currently scheduled emission, if any.
  const timer = {};
  const emitLater = value => {
    clearTimeout(timer.handle);
    // eslint-disable-next-line fp/no-mutation
    timer.handle = setTimeout(() => observer.next(value), ms);
    return true;
  };
  const failLater = err => setTimeout(() => observer.error(err), ms);
  const completeLater = () => setTimeout(() => observer.complete(), ms);
  return observable.subscribe({
    next: emitLater,
    error: failLater,
    complete: completeLater
  });
});
/**
 * Builds the `filter` operator for the given Observable constructor.
 *
 * Usage: `filter(Observable)(predicate)(observable)`. Only values for which
 * `predicate(value)` is truthy are forwarded; errors and completion pass
 * through unchanged.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `predicate => observable => Observable`.
 */
const filter = Observable => predicate => observable => new Observable(observer => {
  const forwardIfAccepted = value => {
    if (predicate(value)) {
      return observer.next(value);
    }
    return value;
  };
  return observable.subscribe({
    next: forwardIfAccepted,
    error: observer.error.bind(observer),
    complete: observer.complete.bind(observer),
  });
});
/**
 * Builds the `first` operator for the given Observable constructor.
 *
 * Usage: `first(Observable)(observable)`. Every value received from the source
 * is forwarded and immediately followed by a completion notification. This
 * operator does not unsubscribe from the source itself.
 * NOTE(review): presumably relies on the Observable implementation closing the
 * subscription on `complete` so later values are ignored — confirm.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `observable => Observable`.
 */
const first = Observable => observable => new Observable(observer => {
  const emitThenComplete = value => {
    observer.next(value);
    return observer.complete();
  };
  return observable.subscribe({
    next: emitThenComplete,
    error: observer.error.bind(observer),
    complete: observer.complete.bind(observer)
  });
});
/**
 * Builds the `forEach` consumer. The Observable constructor argument is
 * accepted for signature parity with the other operators but is not used.
 *
 * Usage: `forEach(Observable)(next)(observable)`. Subscribes to the observable
 * with `next` as the value handler.
 *
 * @returns {Function} `next => observable => Promise`; the promise resolves
 *   when the observable completes and rejects with the error the observable
 *   reports.
 */
const forEach = () => next => observable => new Promise((resolve, reject) => {
  const observer = {next, error: reject, complete: resolve};
  return observable.subscribe(observer);
});
/**
 * Builds the `last` operator for the given Observable constructor.
 *
 * Usage: `last(Observable)(observable)`. Values are remembered but not
 * forwarded; when the source completes, the most recent value (if any value
 * was received at all) is emitted, followed by completion. Errors pass through.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `observable => Observable`.
 */
const last = Observable => observable => new Observable(observer => {
  // The `value` key only exists once at least one value has been received,
  // which distinguishes "no values" from "last value was undefined".
  const latest = {};
  const remember = value => {
    // eslint-disable-next-line fp/no-mutation
    latest.value = value;
    return value;
  };
  const flush = () => {
    if (Object.prototype.hasOwnProperty.call(latest, 'value')) {
      observer.next(latest.value);
    }
    return observer.complete();
  };
  return observable.subscribe({
    next: remember,
    error: observer.error.bind(observer),
    complete: flush
  });
});
/**
 * Builds the `map` operator for the given Observable constructor.
 *
 * Usage: `map(Observable)(mapper)(observable)`. Each source value is passed
 * through `mapper` and the result is forwarded; errors and completion pass
 * through unchanged.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `mapper => observable => Observable`.
 */
const map = Observable => mapper => observable => new Observable(observer => {
  const transformAndForward = value => {
    const mapped = mapper(value);
    return observer.next(mapped);
  };
  return observable.subscribe({
    next: transformAndForward,
    error: observer.error.bind(observer),
    complete: observer.complete.bind(observer),
  });
});
/**
 * Builds the `merge` combinator for the given Observable constructor.
 *
 * Usage: `merge(Observable)(observables)`. Values and errors from every source
 * are forwarded as they arrive; the result completes once every source has
 * completed. An empty list of sources completes immediately.
 *
 * Each source is subscribed exactly once. (Subscribing a second time just to
 * observe completion would run the producer of a cold observable twice and
 * leave that second subscription without an error handler.)
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `observables => Observable`, where `observables` is an
 *   array of sources. Unsubscribing from the result unsubscribes from all of
 *   them.
 */
const merge = Observable => observables => new Observable(observer => {
  const [next, error] = observerBindings(observer);
  // Number of sources that have not completed yet. A counter (rather than
  // inspecting the subscriptions) also works when sources complete
  // synchronously, before the subscriptions array exists.
  const remaining = {count: observables.length};
  const complete = () => {
    // eslint-disable-next-line fp/no-mutation
    remaining.count -= 1;
    return remaining.count < 1 ? observer.complete() : observer;
  };
  const subscriptions = observables.map(observable => observable.subscribe({next, error, complete}));
  // With no sources nothing will ever call `complete`, so finish right away
  // instead of leaving the consumer waiting forever.
  if (observables.length < 1) {
    observer.complete();
  }
  return () => subscriptions.forEach(s => s.unsubscribe());
});
/**
 * Builds the `reduce` operator for the given Observable constructor.
 *
 * Usage: `reduce(Observable)(reducer, initial)(observable)`. On subscription
 * the `initial` accumulator is emitted first; afterwards every source value is
 * folded into the accumulator with `reducer(accumulator, value)` and the new
 * accumulator is emitted. Errors and completion pass through unchanged.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `(reducer, initial) => observable => Observable`.
 */
const reduce = Observable => (reducer, initial) => observable => new Observable(observer => {
  const state = {current: initial};
  const fold = value => {
    // eslint-disable-next-line fp/no-mutation
    state.current = reducer(state.current, value);
    return observer.next(state.current);
  };
  // The seed is emitted before subscribing to the source.
  observer.next(state.current);
  return observable.subscribe({
    next: fold,
    error: observer.error.bind(observer),
    complete: observer.complete.bind(observer)
  });
});
/**
 * Builds the `repeat` operator for the given Observable constructor.
 *
 * Usage: `repeat(Observable)(ms)(observable)`. Every source value is emitted
 * immediately and then re-emitted every `ms` milliseconds until the next
 * source value, an error, completion, or unsubscription. Only the most recent
 * value is repeated.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `ms => observable => Observable`.
 */
const repeat = Observable => ms => observable => new Observable(observer => {
  // Holds the handle of the interval repeating the latest value, if any.
  const repeater = {};
  const stop = () => sideEffects(clearInterval(repeater.interval));
  const subscription = observable.subscribe({
    next: value => stop() && sideEffects(
      // eslint-disable-next-line fp/no-mutation
      repeater.interval = setInterval(() => observer.next(value), ms),
    ) && observer.next(value),
    error: err => stop() && observer.error(err),
    complete: () => stop() && observer.complete()
  });
  // Cleanup must clear the interval as well as the upstream subscription;
  // otherwise the timer keeps firing (and keeps the event loop alive) after
  // the consumer has unsubscribed.
  return () => stop() && subscription.unsubscribe();
});
/**
 * Selects the strategy used to track promises (thenables) emitted by a source
 * observable and to forward their outcomes to a downstream observer.
 *
 * @param {*} order - Truthy selects the sequential strategy, falsy the
 *   concurrent one.
 * @returns {Function} `observer => ({add, last})`, where `add(value)` registers
 *   a thenable whose result is passed to `observer.next` (or `observer.error`
 *   on rejection) and `last()` returns a promise for the tracked work.
 */
const thenAccumulatorFactory = order => order ? observer => {
// Sequential strategy: one promise chain. Each added value is only waited on
// after the previous link has settled, so outcomes reach the observer in the
// order the values were added.
const context = {last: Promise.resolve()};
return {
// Appends a link to the chain and stores the new tail (the assignment's value
// is also what `add` returns).
// eslint-disable-next-line fp/no-mutation
add: value => context.last = context.last
.then(() => value)
.then(...observerBindings(observer)),
// The current tail of the chain.
last: () => context.last
};
} : observer => {
// Concurrent strategy: every value is forwarded as soon as it settles, and a
// list of promises that may still be unsettled is kept for `last`.
const context = {promises: []};
return {
add: async value => {
// Forward the outcome of this value to the observer independently of others.
const promise = value.then(...observerBindings(observer));
// Start pruning: resolves to the previously tracked promises still unsettled.
const pending = keepPending(context.promises);
// Until pruning finishes, track everything (old list, the pruning promise
// itself and the new promise) so a `last()` issued meanwhile waits for all.
// eslint-disable-next-line fp/no-mutation
context.promises = [...context.promises, pending, promise];
// Replace the list with the still-unsettled promises plus the new one.
// NOTE(review): when several `add` calls overlap, an earlier call resuming
// here appears to overwrite the list without the promises of later calls
// until those calls resume too; a `last()` in that window might not wait for
// them — TODO confirm.
// eslint-disable-next-line fp/no-mutation
return context.promises = [...await pending, promise];
},
// Settles once every currently tracked promise has settled successfully.
last: () => Promise.all(context.promises)
};
};
/**
 * Builds the `then` operator for the given Observable constructor.
 *
 * Usage: `then(Observable)(order)(observable)`. Thenable values emitted by the
 * source are handed to an accumulator (chosen by `order` through
 * `thenAccumulatorFactory`), which forwards their outcomes to the observer.
 * Every other value, including `null` and `undefined`, is forwarded directly
 * and immediately. Completion is forwarded once the accumulator's `last()`
 * promise resolves; errors pass through unchanged.
 *
 * @param {Function} Observable - Observable constructor used for the result.
 * @returns {Function} `order => observable => Observable`.
 */
const then = Observable => order => {
  const newAccumulator = thenAccumulatorFactory(order);
  // Guard against null/undefined before touching `.then`; reading a property
  // of either would throw a TypeError.
  const isThenable = value => value != null && typeof value.then === 'function';
  return observable => new Observable(observer => {
    const accumulator = newAccumulator(observer);
    return observable.subscribe({
      next: value => isThenable(value)
        ? accumulator.add(value)
        : observer.next(value),
      error: observer.error.bind(observer),
      complete: () => accumulator.last().then(observer.complete.bind(observer))
    });
  });
};
const exported = {debounce, filter, first, forEach, last, map, merge, reduce, repeat, then};
const exportTest = test => exported => test ? {...exported, keepPending} : exported;
module.exports = (Observable, test = false) => exportTest(test)(Object.entries(exported)
.map(([name, func]) => [name, func(Observable)])
.reduce((object, [name, func]) => ({...object, [name]: func}), {})
);