forked from akure/dao-dao-ui
-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.ts
108 lines (93 loc) · 2.71 KB
/
batch.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Generic batch client inspired by @cosmjs/tendermint-rpc's HttpBatchClient.
/**
 * Tuning options accepted by `BatchClient`.
 */
export type BatchClientOptions = {
/** Delay, in milliseconds, handed to `setInterval` for the periodic queue flush. */
dispatchInterval: number
/**
 * Maximum number of queued requests sent in a single POST. Reaching this many
 * queued requests triggers an immediate flush. Must be a safe integer >= 1.
 */
batchSizeLimit: number
}
/**
 * A single request to be batched. Note that this module-local type shadows the
 * global Fetch API `Request` type inside this file.
 */
export type Request = {
/** Value placed into the JSON array that forms the body of the batch POST. */
url: string
}
/**
 * Result delivered to the caller for one batched request. Note that this
 * module-local type shadows the global Fetch API `Response` type inside this file.
 *
 * NOTE(review): `BatchClient` forwards each entry of the endpoint's JSON array
 * as-is without checking that it has this shape — confirm against the server.
 */
export type Response = {
/** Presumably the HTTP-like status of the individual request — TODO confirm. */
status: number
/** Payload of the individual response; untyped. */
body: any
}
/**
 * A request waiting in the `BatchClient` queue together with the callbacks
 * that settle the promise returned to the caller.
 */
export type QueueItem = {
/** The request that was queued. */
request: Request
/** Fulfils the caller's promise with the matching batch response entry. */
resolve: (response: Response) => void
/** Rejects the caller's promise, e.g. when the batch request fails. */
reject: (reason?: any) => void
}
/**
 * Fallback values merged into the client's options for any field the caller
 * leaves out.
 */
const defaultHttpBatchClientOptions = {
  // Upper bound on requests per batch.
  batchSizeLimit: 20,
  // Timer period for flushing the queue.
  dispatchInterval: 20,
}
/**
 * Collects individual requests and sends them to one endpoint in batches.
 *
 * Every call to `execute` enqueues a request. The queue is flushed as a single
 * POST whose body is a JSON array of the queued request URLs, either when the
 * dispatch timer fires or as soon as `batchSizeLimit` requests are waiting.
 * The endpoint must answer with a JSON array containing exactly one entry per
 * request, in request order; entry `i` settles the promise of request `i`.
 */
export class BatchClient {
  protected readonly url: string
  protected readonly options: BatchClientOptions
  private timer: ReturnType<typeof setInterval> | undefined
  private readonly queue: QueueItem[] = []

  /**
   * @param endpoint URL that receives the batch POST requests.
   * @param options Optional overrides; omitted fields fall back to the defaults.
   * @throws Error if the resulting `batchSizeLimit` is not a safe integer >= 1.
   */
  constructor(endpoint: string, options: Partial<BatchClientOptions> = {}) {
    this.options = {
      batchSizeLimit:
        options.batchSizeLimit ?? defaultHttpBatchClientOptions.batchSizeLimit,
      dispatchInterval:
        options.dispatchInterval ??
        defaultHttpBatchClientOptions.dispatchInterval,
    }
    this.url = endpoint

    // Validate before starting the timer so that a rejected configuration does
    // not leave behind a running interval nobody can clear.
    this.validate()

    // Use the merged options (not the raw argument) so the default interval
    // applies when the caller did not specify one.
    this.timer = setInterval(() => {
      void this.tick()
    }, this.options.dispatchInterval)
  }

  /**
   * Stops the periodic dispatch timer. Requests that are still queued are not
   * settled by this call.
   */
  disconnect(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer)
    }
    this.timer = undefined
  }

  /**
   * Queues a request for the next batch.
   *
   * @param request The request to send.
   * @returns A promise fulfilled with this request's entry in the batch
   *   response, or rejected if the batch request fails or is malformed.
   */
  execute(request: Request): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.queue.push({ request, resolve, reject })
      if (this.queue.length >= this.options.batchSizeLimit) {
        // The batch is full; dispatch now instead of waiting for the timer.
        void this.tick()
      }
    })
  }

  /**
   * Checks the configured options.
   *
   * @throws Error if `batchSizeLimit` is not a safe integer >= 1.
   */
  validate(): void {
    const { batchSizeLimit } = this.options
    if (!Number.isSafeInteger(batchSizeLimit) || batchSizeLimit < 1) {
      throw new Error('batchSizeLimit must be a safe integer >= 1')
    }
  }

  /**
   * Sends up to `batchSizeLimit` queued requests as one POST and settles their
   * promises.
   *
   * This runs from an interval callback where a rejection could not be
   * handled, so the returned promise never rejects: every failure (network
   * error, non-2xx status, unparsable or mismatched body) is routed to the
   * promises of the requests in the affected batch instead.
   */
  async tick(): Promise<void> {
    // Remove the batch from the queue up front so overlapping ticks never send
    // the same request twice.
    const batch = this.queue.splice(0, this.options.batchSizeLimit)
    if (batch.length === 0) {
      return
    }

    try {
      const response = await fetch(this.url, {
        method: 'POST',
        body: JSON.stringify(batch.map(({ request }) => request.url)),
        headers: {
          'Content-Type': 'application/json',
        },
      })
      if (!response.ok) {
        // statusText is frequently empty (e.g. over HTTP/2); fall back to the
        // numeric status so the error is never blank.
        throw new Error(response.statusText || `HTTP ${response.status}`)
      }

      const responses: unknown = await response.json()
      if (!Array.isArray(responses) || responses.length !== batch.length) {
        throw new Error('Invalid batch response')
      }

      batch.forEach(({ resolve }, index) => resolve(responses[index]))
    } catch (error) {
      // The batch has already left the queue, so failing to reject here would
      // leave these callers waiting forever.
      batch.forEach(({ reject }) => reject(error))
    }
  }
}