Skip to content

Commit de2a5a9

Browse files
committed
cleanup pubsubclient / introduce ps_to_sq:
* compatibility with non-multipart, chunked, newline terminated source messages * refactor async callback usage without group * rewrite/move parse_url into simplehttp * add simplehttp_encode_uri which correctly handles spaces * removed pubsub_to_pubsub (no longer used) * removed pubsub2simplequeue.sh (old, deprecated) * renamed sub.py to something meaningful * update ps_to_file for compatibility with updated pubsubclient * updated READMEs
1 parent ccc3517 commit de2a5a9

21 files changed

+472
-434
lines changed

README.md

+10-8
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ simplehttp is a library built upon libevent that makes high performance http bas
55

66
The following daemons are built on simplehttp and included
77

8-
* pubsub - a daemon that receives data via http POST events and writes that data to all currently connected long-lived http connections
9-
* pubsub_to_pubsub - a library for piping data from one pubsub stream to another pubsub server
10-
* simplequeue - an in memory queue with HTTP /get and /post endpoints to push/pop data
11-
* simpletokyo - a HTTP /get /post /del /fwmatch /incr interface in front of ttserver
12-
* sortdb - Sorted database server
13-
* simplegeo
14-
* simplememdb - an in-memory version of simpletokyo
15-
* qrencode
8+
* `pubsub` - a daemon that receives data via HTTP POST events and writes to all subscribed long-lived HTTP connections
9+
* `pubsubclient` - a library for writing clients that read from a pubsub
10+
* `ps_to_sq` - a daemon built on top of pubsubclient to write messages from a source pubsub to destination simplequeue(s)
11+
* `ps_to_file` - a daemon built on top of pubsubclient to write messages from a source pubsub to time rolled output files
12+
* `simplequeue` - an in memory queue with HTTP /put and /get endpoints to push and pop data
13+
* `simpletokyo` - a HTTP CRUD interface to front tokyo cabinet's ttserver
14+
* `sortdb` - sorted database server
15+
* `simplegeo`
16+
* `simplememdb` - an in-memory version of simpletokyo
17+
* `qrencode`
1618

1719
simplehttp Install Instructions
1820
===============================

ps_to_file/Makefile

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ TARGET ?= /usr/local
33
LIBSIMPLEHTTP ?= ../simplehttp
44
LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/..
55
LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP)
6+
LIBPUBSUBCLIENT ?= ../pubsubclient
67

78
CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -g
8-
LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBEVENT)/lib -levent -lpubsubclient -lsimplehttp -lm
9+
LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBPUBSUBCLIENT) -L$(LIBEVENT)/lib -levent -lpubsubclient -lsimplehttp -lm
910

1011
all: ps_to_file
1112

1213
ps_to_file: ps_to_file.c
13-
$(CC) $(CFLAGS) -o $@ $< $(LIBS)
14+
$(CC) $(CFLAGS) -o $@ $^ $(LIBS)
1415

1516
install:
16-
/usr/bin/install -d $(TARGET)/bin/
17-
/usr/bin/install ps_to_file $(TARGET)/bin/
17+
/usr/bin/install -D ps_to_file $(TARGET)/bin/ps_to_file
1818

1919
clean:
2020
rm -f *.o *.a ps_to_file

ps_to_file/README.md

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
ps_to_file
2+
======
3+
4+
helper application to subscribe to a pubsub and write incoming messages
5+
to time rolled files.
6+
7+
source pubsub should output non-multipart, chunked data where each
8+
message is newline terminated.
9+
10+
Commandline Options:
11+
12+
--pubsub-url=<str> source pubsub url in the form of
13+
http://domain.com:port/path
14+
--filename-format=<str> output filename format (strftime compatible)
15+
/var/log/pubsub.%%Y-%%m-%%d_%%H.log
16+
--version

ps_to_file/ps_to_file.c

+57-29
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
#include <string.h>
55
#include <unistd.h>
66
#include <time.h>
7-
#include <simplehttp/pubsubclient.h>
87
#include <simplehttp/simplehttp.h>
8+
#include <pubsubclient/pubsubclient.h>
99

10-
#define DEBUG 0
11-
#define VERSION "1.1"
10+
#ifdef DEBUG
11+
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
12+
#else
13+
#define _DEBUG(...) do {;} while (0)
14+
#endif
15+
16+
#define VERSION "1.2"
1217

1318
struct output_metadata {
1419
char *filename_format;
@@ -17,62 +22,85 @@ struct output_metadata {
1722
FILE *output_file;
1823
};
1924

20-
void
21-
process_message_cb(char *source, void *cbarg){
22-
if(DEBUG) fprintf(stdout, "processing message\n");
23-
if (source == NULL || strlen(source) < 3){return;}
24-
25-
struct output_metadata *data = (struct output_metadata *)cbarg;
25+
void process_message_cb(char *message, void *cbarg)
26+
{
27+
struct output_metadata *data;
28+
time_t timer;
29+
struct tm *time_struct;
30+
31+
_DEBUG("process_message_cb()\n");
32+
33+
if (message == NULL || strlen(message) < 3) {
34+
return;
35+
}
36+
37+
data = (struct output_metadata *)cbarg;
2638

27-
time_t timer = time(NULL);
28-
struct tm *time_struct = gmtime(&timer);
29-
if (DEBUG) fprintf(stdout, "strftime format %s\n", data->filename_format);
39+
timer = time(NULL);
40+
time_struct = gmtime(&timer);
41+
_DEBUG("strftime format %s\n", data->filename_format);
3042
strftime(data->temp_filename, 255, data->filename_format, time_struct);
31-
if (DEBUG) fprintf(stdout, "after strftime %s\n", data->temp_filename);
32-
if (strcmp(data->temp_filename, data->current_filename) != 0){
33-
if (DEBUG) fprintf(stdout, "rolling file\n");
43+
_DEBUG("after strftime %s\n", data->temp_filename);
44+
if (strcmp(data->temp_filename, data->current_filename) != 0) {
45+
_DEBUG("rolling file\n");
3446
// roll file or open file
35-
if (data->output_file){
36-
if(DEBUG) fprintf(stdout, "closing file %s\n", data->current_filename);
47+
if (data->output_file) {
48+
_DEBUG("closing file %s\n", data->current_filename);
3749
fclose(data->output_file);
3850
}
39-
if (DEBUG) fprintf(stdout, "opening file %s\n", data->temp_filename);
51+
_DEBUG("opening file %s\n", data->temp_filename);
4052
strcpy(data->current_filename, data->temp_filename);
4153
data->output_file = fopen(data->current_filename, "ab");
4254
}
4355

44-
fprintf(data->output_file,"%s\n",source);
56+
fprintf(data->output_file, "%s\n", message);
4557
}
4658

47-
int version_cb(int value) {
59+
int version_cb(int value)
60+
{
4861
fprintf(stdout, "Version: %s\n", VERSION);
4962
return 0;
5063
}
5164

52-
int
53-
main(int argc, char **argv)
65+
int main(int argc, char **argv)
5466
{
55-
char *source_address = "127.0.0.1";
56-
int source_port = 80;
67+
char *pubsub_url;
68+
char *address;
69+
int port;
70+
char *path;
5771
char *filename_format = NULL;
72+
struct output_metadata *data;
5873

5974
define_simplehttp_options();
6075
option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
61-
option_define_str("source_host", OPT_OPTIONAL, "127.0.0.1", &source_address, NULL, NULL);
62-
option_define_int("source_port", OPT_OPTIONAL, 80, &source_port, NULL, NULL);
76+
option_define_str("pubsub_url", OPT_REQUIRED, "http://127.0.0.1:80/sub?multipart=0", &pubsub_url, NULL, "url of pubsub to read from");
6377
option_define_str("filename_format", OPT_REQUIRED, NULL, &filename_format, NULL, "/var/log/pubsub.%%Y-%%m-%%d_%%H.log");
6478

6579
if (!option_parse_command_line(argc, argv)){
6680
return 1;
6781
}
6882

69-
struct output_metadata *data;
70-
data = calloc(1,sizeof(*data));
83+
data = calloc(1, sizeof(struct output_metadata));
7184
data->filename_format = filename_format;
7285
data->current_filename[0] = '\0';
7386
data->temp_filename[0] = '\0';
7487
data->output_file = NULL;
7588

76-
return pubsub_to_pubsub_main(source_address, source_port, process_message_cb, data);
89+
if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
90+
pubsub_to_pubsub_main(address, port, path, process_message_cb, NULL);
91+
92+
if (data->output_file) {
93+
fclose(data->output_file);
94+
}
95+
96+
free(address);
97+
free(path);
98+
} else {
99+
fprintf(stderr, "ERROR: failed to parse pubsub_url\n");
100+
}
101+
102+
free(data);
103+
free_options();
77104

105+
return 0;
78106
}

ps_to_sq/Makefile

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
LIBEVENT ?= /usr/local
2+
TARGET ?= /usr/local
3+
LIBSIMPLEHTTP ?= ../simplehttp
4+
LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/..
5+
LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP)
6+
LIBPUBSUBCLIENT ?= ../pubsubclient
7+
8+
CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -g -Wall -O2
9+
LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBPUBSUBCLIENT) -L$(LIBEVENT)/lib -levent -lpubsubclient -lsimplehttp -lm
10+
11+
all: ps_to_sq
12+
13+
ps_to_sq: ps_to_sq.c
14+
$(CC) $(CFLAGS) -o $@ $^ $(LIBS)
15+
16+
install:
17+
/usr/bin/install -D ps_to_sq $(TARGET)/bin/ps_to_sq
18+
19+
clean:
20+
rm -rf *.o ps_to_sq *.dSYM

ps_to_sq/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
ps_to_sq
2+
======
3+
4+
helper application to subscribe to a pubsub and write incoming messages
5+
to simplequeue(s).
6+
7+
supports multiple destination simplequeues via round robin.
8+
9+
source pubsub should output non-multipart, chunked data where each
10+
message is newline terminated.
11+
12+
Commandline Options:
13+
14+
--pubsub-url=<str> source pubsub url in the form of
15+
http://domain.com:port/path
16+
--simplequeue-url=<str> destination simplequeue url in the form of
17+
http://domain.com:port/ (multiple)
18+
--version

ps_to_sq/ps_to_sq.c

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <stdbool.h>
4+
#include <string.h>
5+
#include <time.h>
6+
#include <simplehttp/simplehttp.h>
7+
#include <pubsubclient/pubsubclient.h>
8+
#include <simplehttp/utlist.h>
9+
10+
#ifdef DEBUG
11+
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
12+
#else
13+
#define _DEBUG(...) do {;} while (0)
14+
#endif
15+
16+
#define VERSION "0.2"
17+
18+
struct simplequeue_destination {
19+
char *address;
20+
int port;
21+
char *path;
22+
struct simplequeue_destination *next;
23+
};
24+
25+
struct simplequeue_destination *sqs = NULL;
26+
struct simplequeue_destination *cur_sq_dest = NULL;
27+
28+
struct simplequeue_destination *new_simplequeue_destination(char *url)
29+
{
30+
struct simplequeue_destination *sq_dest;
31+
char *address;
32+
int port;
33+
char *path;
34+
35+
sq_dest = malloc(sizeof(struct simplequeue_destination));
36+
simplehttp_parse_url(url, strlen(url), &address, &port, &path);
37+
_DEBUG("url: %s\n", url);
38+
_DEBUG("address: %s\n", address);
39+
_DEBUG("port: %d\n", port);
40+
_DEBUG("path: %s\n", path);
41+
free(path);
42+
sq_dest->address = address;
43+
sq_dest->port = port;
44+
sq_dest->path = strdup("/put?data=");
45+
sq_dest->next = NULL;
46+
47+
return sq_dest;
48+
}
49+
50+
void free_simplequeue_destination(struct simplequeue_destination *sq_dest)
51+
{
52+
if (sq_dest) {
53+
free(sq_dest->address);
54+
free(sq_dest->path);
55+
free(sq_dest);
56+
}
57+
}
58+
59+
void finish_simplequeue_put_cb(struct evhttp_request *req, void *cb_arg)
60+
{
61+
_DEBUG("finish_simplequeue_put_cb()\n");
62+
}
63+
64+
void process_message_cb(char *message, void *cb_arg)
65+
{
66+
char *path;
67+
char *encoded_message;
68+
69+
_DEBUG("process_message_cb()\n");
70+
71+
if (message == NULL || strlen(message) < 3) {
72+
return;
73+
}
74+
75+
if (cur_sq_dest && cur_sq_dest->next) {
76+
cur_sq_dest = cur_sq_dest->next;
77+
} else {
78+
cur_sq_dest = sqs;
79+
}
80+
81+
encoded_message = simplehttp_encode_uri(message);
82+
path = malloc(10 + strlen(encoded_message) + 1); // /put?data= + encoded_message + NULL
83+
strcpy(path, "/put?data=");
84+
strcpy(path + 10, encoded_message);
85+
new_async_request(cur_sq_dest->address, cur_sq_dest->port, path, finish_simplequeue_put_cb, NULL);
86+
free(encoded_message);
87+
free(path);
88+
}
89+
90+
int version_cb(int value)
91+
{
92+
fprintf(stdout, "Version: %s\n", VERSION);
93+
return 0;
94+
}
95+
96+
int simplequeue_url_cb(char *value)
97+
{
98+
struct simplequeue_destination *sq_dest;
99+
100+
sq_dest = new_simplequeue_destination(value);
101+
LL_APPEND(sqs, sq_dest);
102+
103+
return 1;
104+
}
105+
106+
void free_simplequeue_destinations()
107+
{
108+
struct simplequeue_destination *sq_dest, *tmp;
109+
110+
LL_FOREACH_SAFE(sqs, sq_dest, tmp) {
111+
LL_DELETE(sqs, sq_dest);
112+
free_simplequeue_destination(sq_dest);
113+
}
114+
}
115+
116+
int main(int argc, char **argv)
117+
{
118+
char *pubsub_url;
119+
char *address;
120+
int port;
121+
char *path;
122+
123+
option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
124+
option_define_str("pubsub_url", OPT_REQUIRED, "http://127.0.0.1:80/sub?multipart=0", &pubsub_url, NULL, "url of pubsub to read from");
125+
option_define_str("simplequeue_url", OPT_REQUIRED, NULL, NULL, simplequeue_url_cb, "(multiple) url(s) of simplequeue(s) to write to");
126+
127+
if (!option_parse_command_line(argc, argv)) {
128+
return 1;
129+
}
130+
131+
init_async_connection_pool(1);
132+
133+
if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
134+
pubsub_to_pubsub_main(address, port, path, process_message_cb, NULL);
135+
136+
free(address);
137+
free(path);
138+
} else {
139+
fprintf(stderr, "ERROR: failed to parse pubsub_url\n");
140+
}
141+
142+
free_simplequeue_destinations();
143+
free_async_connection_pool();
144+
free_options();
145+
146+
return 0;
147+
}

pubsub/README.md

-1
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,3 @@ and turn off buffering. Note: this has been tested with Nginx 0.7 series.
5959
proxy_next_upstream off;
6060
charset utf-8;
6161
}
62-

0 commit comments

Comments
 (0)