-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathRequester.php
227 lines (192 loc) · 6.41 KB
/
Requester.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
<?php
namespace Crackle {
use \Crackle\Requests\GETRequest;
use \Crackle\Utilities\Curl;
use \Crackle\Exceptions\RequestException;
use \SplQueue;
/**
 * Manages parallel execution of HTTP requests.
 *
 * Requests are scheduled with queue() / queueAll() and executed by fireAll(), which drives a cURL multi handle
 * and keeps at most the configured number of requests in flight at any one time.
 * @author George Brighton
 */
class Requester {

    /**
     * The maximum number of requests allowed to be executing simultaneously.
     * Defaults to 20.
     * @var int
     */
    private $parallelLimit = 20;

    /**
     * The list of requests still to execute. These will be executed in the order they were added (FIFO).
     * @var \SplQueue
     */
    private $queue;

    /**
     * A lookup array of handle ID (the integer cast of the cURL handle) => request object.
     * Used to retrieve the request object when a handle is finished.
     * @var array[\Crackle\Requests\GETRequest]
     */
    private $executing;

    /**
     * The cURL multi handle used for executing requests simultaneously.
     * @var resource
     */
    private $multiHandle;

    /**
     * Get the maximum number of requests allowed to be executing simultaneously.
     * @return int The maximum number of requests allowed to be executing simultaneously.
     */
    private function getParallelLimit() {
        return $this->parallelLimit;
    }

    /**
     * Set the maximum number of requests allowed to be executing simultaneously.
     * Values below 1 are raised to 1: a non-positive limit would stop fireAll() from ever starting a request.
     * @param int $parallelLimit The maximum number of requests allowed to be executing simultaneously.
     */
    private function setParallelLimit($parallelLimit) {
        $this->parallelLimit = max(1, (int)$parallelLimit);
    }

    /**
     * Get the queue of requests still to execute.
     * @return \SplQueue The queue of requests still to execute.
     */
    private function getQueue() {
        return $this->queue;
    }

    /**
     * Set the queue of requests still to execute.
     * @param \SplQueue $queue The queue of requests still to execute.
     */
    private function setQueue(SplQueue $queue) {
        $this->queue = $queue;
    }

    /**
     * Retrieve a request object from the executing array.
     * @param resource $handle The handle of the request to retrieve.
     * @return \Crackle\Requests\GETRequest The corresponding request.
     */
    private function getExecutingRequest($handle) {
        return $this->executing[(int)$handle];
    }

    /**
     * Set the lookup array of currently executing requests.
     * @param array[\Crackle\Requests\GETRequest] $executing The lookup array of currently executing requests.
     */
    private function setExecuting(array $executing) {
        $this->executing = $executing;
    }

    /**
     * Get the cURL multi handle used to execute requests queued to this object.
     * @return resource The cURL multi handle.
     */
    private function getMultiHandle() {
        return $this->multiHandle;
    }

    /**
     * Set the cURL multi handle used to execute requests queued to this object.
     * @param resource $multiHandle The cURL multi handle.
     */
    private function setMultiHandle($multiHandle) {
        $this->multiHandle = $multiHandle;
    }

    /**
     * Initialise a new requester instance.
     * @param int $parallelLimit The maximum number of requests to allow to run simultaneously. Default: 20.
     */
    public function __construct($parallelLimit = null) {
        if ($parallelLimit !== null) {
            $this->setParallelLimit($parallelLimit);
        }
        $this->setQueue(new SplQueue());
        $this->setExecuting(array());
        $this->setMultiHandle(curl_multi_init());
    }

    /**
     * Disposes of this requester instance by closing its cURL multi handle.
     */
    public function __destruct() {
        curl_multi_close($this->getMultiHandle());
    }

    /**
     * Schedule a request for execution.
     * @param \Crackle\Requests\GETRequest $request The request to queue.
     */
    public function queue(GETRequest $request) {
        $this->getQueue()->enqueue($request);
    }

    /**
     * Schedule multiple requests for execution.
     * @param array[\Crackle\Requests\GETRequest] $requests The requests to queue.
     */
    public final function queueAll(array $requests) {
        foreach ($requests as $request) {
            $this->queue($request);
        }
    }

    /**
     * Execute the current queue simultaneously.
     * Returns once the queue is empty and every started request has finished.
     * @throws \Crackle\Exceptions\RequestException If the multi handle returns an error at any stage.
     */
    public function fireAll() {
        // fill the handle with initial requests to run, up to the parallel limit
        $this->fill();

        // optimisation: avoids many calls to $this->getMultiHandle()
        $multiHandle = $this->getMultiHandle();

        $running = null;
        do {
            // drive the transfers; repeat immediately for as long as cURL asks to be called again
            do {
                $status = curl_multi_exec($multiHandle, $running);
            } while ($status === CURLM_CALL_MULTI_PERFORM);

            // if the multi handle is in an error state, stop
            if ($status !== CURLM_OK) {
                throw new RequestException(Curl::getStringError($status));
            }

            // deal with finished requests
            $completed = false;
            while (($done = curl_multi_info_read($multiHandle)) !== false) {
                $this->finished($done);
                $completed = true;
            }

            // A completed request may have caused new handles to be added (from the queue, or queued by its
            // callback). In that case go straight back to curl_multi_exec() so they are started, rather than
            // waiting in select first.
            if (!$completed && $running) {
                // block until there is activity on any of the connections (or the select timeout elapses)
                if (curl_multi_select($multiHandle) === -1) {
                    // select failed; back off briefly so this loop does not spin at 100% CPU
                    usleep(250);
                }
            }
        } while ($running || $completed);
    }

    /**
     * Tops the multi handle up from the queue until the parallel limit is reached or the queue is empty.
     * @throws \Crackle\Exceptions\RequestException If a handle cannot be added to the multi handle.
     */
    private function fill() {
        $free = $this->getParallelLimit() - count($this->executing);
        $this->add(max(0, min($this->getQueue()->count(), $free)));
    }

    /**
     * Moves requests from the queue to the multi handle.
     * The caller must make sure the queue holds at least $number requests.
     * @param int $number Optional: the number of requests to remove. Defaults to 1.
     * @throws \Crackle\Exceptions\RequestException If a handle cannot be added to the multi handle.
     */
    private function add($number = 1) {
        $multiHandle = $this->getMultiHandle();
        for ($i = 0; $i < $number; $i++) {
            $request = $this->getQueue()->dequeue();
            $request->finalise();
            $handle = $request->getHandle();

            // curl_multi_add_handle() reports failure through its return code rather than by throwing
            $status = curl_multi_add_handle($multiHandle, $handle);
            if ($status !== CURLM_OK) {
                throw new RequestException(Curl::getStringError($status));
            }

            // only track the request once cURL has actually accepted its handle
            $this->executing[(int)$handle] = $request;
        }
    }

    /**
     * Cleans up after a finished request: forgets it and detaches its handle from the multi handle.
     * @param \Crackle\Requests\GETRequest $request The request to dispose of.
     */
    private function remove(GETRequest $request) {
        $handle = $request->getHandle();
        unset($this->executing[(int)$handle]);
        curl_multi_remove_handle($this->getMultiHandle(), $handle);
    }

    /**
     * Deals with a finished request, then starts as many queued requests as the parallel limit allows.
     * @param array[string] $message The array returned by curl_multi_info_read().
     * @throws \Crackle\Exceptions\RequestException If a queued handle cannot be added to the multi handle.
     */
    private function finished(array $message) {
        // retrieve the original request object
        $request = $this->getExecutingRequest($message['handle']);

        // let it update itself
        $request->recover($message['result']);

        // dispose of the finished request first so that its slot counts as free
        $this->remove($request);

        // refill every free slot rather than just one, so requests queued while fewer than the limit were
        // executing still run in parallel
        $this->fill();
    }
}
}