Skip to content

Commit d799945

Browse files
committed
introducing libhostpool - a library of methods to make it easy to deal with endpoint selection, failure, backoff, and recovery
1 parent e503be5 commit d799945

File tree

4 files changed

+316
-1
lines changed

4 files changed

+316
-1
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ simplehttp
55
simple and straightforward to write.
66

77
The following libraries and daemons are included:
8-
8+
9+
* `host_pool` - a library for dealing with endpoint selection, pooling, failure, recovery, and backoff
910
* `ps_to_http` - a daemon built on top of pubsubclient to write messages from a source pubsub to destination simplequeue or pubsub server
1011
* `ps_to_file` - a daemon built on top of pubsubclient to write messages from a source pubsub to time rolled output files
1112
* `pubsub` - a daemon that receives data via HTTP POST events and writes to all subscribed long-lived HTTP connections

host_pool/Makefile

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
TARGET ?= /usr/local
2+
LIBEVENT ?= /usr/local
3+
LIBSIMPLEHTTP ?= /usr/local
4+
5+
CFLAGS = -I. -I$(LIBSIMPLEHTTP)/include -I.. -I../simplehttp -I$(LIBEVENT)/include -g -Wall -O2
6+
AR = ar
7+
AR_FLAGS = rc
8+
RANLIB = ranlib
9+
10+
libhost_pool.a: host_pool.o host_pool.h
11+
/bin/rm -f $@
12+
$(AR) $(AR_FLAGS) $@ $^
13+
$(RANLIB) $@
14+
15+
all: libhost_pool.a
16+
17+
install:
18+
/usr/bin/install -d $(TARGET)/lib/
19+
/usr/bin/install -d $(TARGET)/bin/
20+
/usr/bin/install -d $(TARGET)/include/host_pool
21+
/usr/bin/install libhost_pool.a $(TARGET)/lib/
22+
/usr/bin/install host_pool.h $(TARGET)/include/host_pool
23+
24+
clean:
25+
/bin/rm -f *.a *.o

host_pool/host_pool.c

+238
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <assert.h>
4+
#include <time.h>
5+
#include <uthash.h>
6+
#include <json/json.h>
7+
#include <simplehttp/simplehttp.h>
8+
#include "host_pool.h"
9+
10+
#ifdef DEBUG
11+
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
12+
#else
13+
#define _DEBUG(...) do {;} while (0)
14+
#endif
15+
16+
/*
17+
* retry_failed_hosts - the number of times to retry a failed host. set to -1 for indefinite retries
18+
* retry_interval - seconds between retries. set to -1 for exponential backoff (ie: 1, 2, 4, 8, ...)
19+
* max_retry_interval - the maximum seconds to wait between retries
20+
* reset_on_all_failed - reset all hosts to alive when all are marked as failed.
21+
*/
22+
struct HostPool *new_host_pool(int retry_failed_hosts, int retry_interval,
23+
int max_retry_interval, int reset_on_all_failed)
24+
{
25+
struct HostPool *host_pool;
26+
27+
host_pool = malloc(sizeof(struct HostPool));
28+
host_pool->count = 0;
29+
host_pool->retry_failed_hosts = retry_failed_hosts;
30+
host_pool->retry_interval = retry_interval;
31+
host_pool->max_retry_interval = max_retry_interval;
32+
host_pool->reset_on_all_failed = reset_on_all_failed;
33+
host_pool->endpoints = NULL;
34+
host_pool->current_endpoint = NULL;
35+
host_pool->checkpoint = -1;
36+
37+
return host_pool;
38+
}
39+
40+
void free_host_pool(struct HostPool *host_pool)
41+
{
42+
struct HostPoolEndpoint *endpoint, *tmp;
43+
44+
if (host_pool) {
45+
HASH_ITER(hh, host_pool->endpoints, endpoint, tmp) {
46+
HASH_DELETE(hh, host_pool->endpoints, endpoint);
47+
free_host_pool_endpoint(endpoint);
48+
}
49+
50+
free(host_pool);
51+
}
52+
}
53+
54+
struct HostPoolEndpoint *new_host_pool_endpoint(struct HostPool *host_pool,
55+
char *address, int port, char *path)
56+
{
57+
struct HostPoolEndpoint *host_pool_endpoint;
58+
59+
host_pool_endpoint = malloc(sizeof(struct HostPoolEndpoint));
60+
host_pool_endpoint->address = strdup(address);
61+
host_pool_endpoint->port = port;
62+
host_pool_endpoint->path = strdup(path);
63+
host_pool_endpoint->id = host_pool->count++;
64+
host_pool_endpoint->alive = 1;
65+
host_pool_endpoint->retry_count = 0;
66+
host_pool_endpoint->retry_delay = 0;
67+
host_pool_endpoint->next_retry = 0;
68+
69+
HASH_ADD_INT(host_pool->endpoints, id, host_pool_endpoint);
70+
71+
return host_pool_endpoint;
72+
}
73+
74+
void free_host_pool_endpoint(struct HostPoolEndpoint *host_pool_endpoint)
75+
{
76+
if (host_pool_endpoint) {
77+
free(host_pool_endpoint->address);
78+
free(host_pool_endpoint->path);
79+
free(host_pool_endpoint);
80+
}
81+
}
82+
83+
void host_pool_from_json(struct HostPool *host_pool, json_object *host_pool_endpoint_list)
84+
{
85+
char *endpoint_url;
86+
char *address;
87+
char *path;
88+
int port;
89+
int i;
90+
91+
for (i = 0; i < json_object_array_length(host_pool_endpoint_list); i++) {
92+
endpoint_url = (char *)json_object_get_string(json_object_array_get_idx(host_pool_endpoint_list, i));
93+
if (!simplehttp_parse_url(endpoint_url, strlen(endpoint_url), &address, &port, &path)) {
94+
fprintf(stderr, "ERROR: failed to parse host pool endpoint (%s)\n", endpoint_url);
95+
exit(1);
96+
}
97+
new_host_pool_endpoint(host_pool, address, port, path);
98+
free(address);
99+
free(path);
100+
}
101+
}
102+
103+
struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
104+
enum HostPoolEndpointSelectionMode mode, int64_t state)
105+
{
106+
struct HostPoolEndpoint *endpoint;
107+
int c;
108+
time_t now;
109+
110+
c = host_pool->count;
111+
while (c--) {
112+
endpoint = host_pool_next_endpoint(host_pool, mode, state);
113+
114+
_DEBUG("HOST_POOL: trying #%d (%s:%d%s)\n", endpoint->id, endpoint->address, endpoint->port, endpoint->path);
115+
116+
if (endpoint->alive) {
117+
return endpoint;
118+
}
119+
120+
if ((host_pool->retry_failed_hosts == -1) ||
121+
(endpoint->retry_count <= host_pool->retry_failed_hosts)) {
122+
time(&now);
123+
if (endpoint->next_retry < now) {
124+
endpoint->retry_count++;
125+
if (host_pool->retry_interval == -1) {
126+
endpoint->retry_delay = endpoint->retry_delay * 2;
127+
if (endpoint->retry_delay > host_pool->max_retry_interval) {
128+
endpoint->retry_delay = host_pool->max_retry_interval;
129+
}
130+
} else {
131+
endpoint->retry_delay = host_pool->retry_interval;
132+
}
133+
endpoint->next_retry = now + endpoint->retry_delay;
134+
return endpoint;
135+
}
136+
}
137+
138+
// if any of the modes fail default to round robin in order
139+
// to find a suitable endpoint.
140+
//
141+
// this ensures we always try each endpoint in the host pool
142+
//
143+
// however, if we were asked to find a random endpoint, randomize once
144+
// more so that the endpoint following the failed endpoint won't get a
145+
// disproportionate number of additional requests
146+
if (mode == HOST_POOL_RANDOM) {
147+
host_pool_next_endpoint(host_pool, mode, state);
148+
}
149+
mode = HOST_POOL_ROUND_ROBIN;
150+
}
151+
152+
if (host_pool->reset_on_all_failed) {
153+
host_pool_reset(host_pool);
154+
return host_pool_next_endpoint(host_pool, mode, 0);
155+
}
156+
157+
return NULL;
158+
}
159+
160+
struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
161+
enum HostPoolEndpointSelectionMode mode, int64_t state)
162+
{
163+
int index;
164+
165+
switch (mode) {
166+
default:
167+
case HOST_POOL_RANDOM:
168+
// choose HOST_POOL_RANDOMly
169+
index = rand() % host_pool->count;
170+
HASH_FIND_INT(host_pool->endpoints, &index, host_pool->current_endpoint);
171+
break;
172+
case HOST_POOL_ROUND_ROBIN:
173+
// round-robin through the endpoints for each request
174+
host_pool->current_endpoint = host_pool->current_endpoint ?
175+
(host_pool->current_endpoint->hh.next ? host_pool->current_endpoint->hh.next :
176+
host_pool->endpoints) : host_pool->endpoints;
177+
break;
178+
case HOST_POOL_SINGLE:
179+
// choose the same endpoint for all requests for this message
180+
if (state != host_pool->checkpoint) {
181+
host_pool->checkpoint = state;
182+
host_pool->current_endpoint = host_pool->current_endpoint ?
183+
(host_pool->current_endpoint->hh.next ? host_pool->current_endpoint->hh.next :
184+
host_pool->endpoints) : host_pool->endpoints;
185+
}
186+
break;
187+
}
188+
189+
assert(host_pool->current_endpoint != NULL);
190+
191+
return host_pool->current_endpoint;
192+
}
193+
194+
void host_pool_mark_success(struct HostPool *host_pool, int id)
195+
{
196+
struct HostPoolEndpoint *endpoint;
197+
198+
HASH_FIND_INT(host_pool->endpoints, &id, endpoint);
199+
assert(endpoint != NULL);
200+
201+
_DEBUG("HOST_POOL: marking endpoint #%d (%s:%d%s) as SUCCESS\n",
202+
endpoint->id, endpoint->address, endpoint->port, endpoint->path);
203+
204+
endpoint->alive = 1;
205+
}
206+
207+
void host_pool_mark_failed(struct HostPool *host_pool, int id)
208+
{
209+
struct HostPoolEndpoint *endpoint;
210+
time_t now;
211+
212+
HASH_FIND_INT(host_pool->endpoints, &id, endpoint);
213+
assert(endpoint != NULL);
214+
215+
_DEBUG("HOST_POOL: marking endpoint #%d (%s:%d%s) as FAILED\n",
216+
endpoint->id, endpoint->address, endpoint->port, endpoint->path);
217+
218+
if (endpoint->alive) {
219+
endpoint->alive = 0;
220+
endpoint->retry_count = 0;
221+
if (host_pool->retry_interval == -1) {
222+
endpoint->retry_delay = 1;
223+
} else {
224+
endpoint->retry_delay = 0;
225+
}
226+
time(&now);
227+
endpoint->next_retry = now + endpoint->retry_delay;
228+
}
229+
}
230+
231+
void host_pool_reset(struct HostPool *host_pool)
232+
{
233+
struct HostPoolEndpoint *endpoint, *tmp;
234+
235+
HASH_ITER(hh, host_pool->endpoints, endpoint, tmp) {
236+
host_pool_mark_success(host_pool, endpoint->id);
237+
}
238+
}

host_pool/host_pool.h

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#ifndef __host_pool_h
2+
#define __host_pool_h
3+
4+
#include <uthash.h>
5+
#include <time.h>
6+
7+
struct HostPoolEndpoint {
8+
int id;
9+
int alive;
10+
int retry_count;
11+
int retry_delay;
12+
time_t next_retry;
13+
char *address;
14+
int port;
15+
char *path;
16+
UT_hash_handle hh;
17+
};
18+
19+
struct HostPool {
20+
int count;
21+
int retry_failed_hosts;
22+
int retry_interval;
23+
time_t max_retry_interval;
24+
int reset_on_all_failed;
25+
struct HostPoolEndpoint *endpoints;
26+
struct HostPoolEndpoint *current_endpoint;
27+
int64_t checkpoint;
28+
};
29+
30+
enum HostPoolEndpointSelectionMode {
31+
HOST_POOL_RANDOM,
32+
HOST_POOL_ROUND_ROBIN,
33+
HOST_POOL_SINGLE
34+
};
35+
36+
struct HostPool *new_host_pool(int retry_failed_hosts, int retry_interval,
37+
int max_retry_interval, int reset_on_all_failed);
38+
void free_host_pool(struct HostPool *host_pool);
39+
struct HostPoolEndpoint *new_host_pool_endpoint(struct HostPool *host_pool,
40+
char *address, int port, char *path);
41+
void free_host_pool_endpoint(struct HostPoolEndpoint *host_pool_endpoint);
42+
void host_pool_from_json(struct HostPool *host_pool, json_object *host_pool_endpoint_list);
43+
struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
44+
enum HostPoolEndpointSelectionMode mode, int64_t state);
45+
struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
46+
enum HostPoolEndpointSelectionMode mode, int64_t state);
47+
void host_pool_mark_success(struct HostPool *host_pool, int id);
48+
void host_pool_mark_failed(struct HostPool *host_pool, int id);
49+
void host_pool_reset(struct HostPool *host_pool);
50+
51+
#endif

0 commit comments

Comments
 (0)