Skip to content

Commit

Permalink
server: cleanup state routines
Browse files Browse the repository at this point in the history
Signed-off-by: He Xian <[email protected]>
  • Loading branch information
hexian000 committed Jan 15, 2025
1 parent 3547687 commit 4972092
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 39 deletions.
82 changes: 53 additions & 29 deletions src/api_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@

/* never rollback */
enum api_client_state {
STATE_CLIENT_INIT,
STATE_CLIENT_CONNECT,
STATE_CLIENT_REQUEST,
STATE_CLIENT_RESPONSE,
STATE_CLIENT_RULESET,
};

struct api_client_ctx {
Expand All @@ -42,10 +44,10 @@ struct api_client_ctx {
struct api_client_cb cb;
struct ev_watcher w_start;
struct ev_timer w_timeout;
struct ev_io w_socket;
struct ev_idle w_ruleset;
struct dialreq *dialreq;
struct dialer dialer;
struct ev_io w_socket;
struct http_parser parser;
struct {
const char *errmsg;
Expand All @@ -55,34 +57,52 @@ struct api_client_ctx {
};
ASSERT_SUPER(struct session, struct api_client_ctx, ss);

static void api_client_close(
struct ev_loop *restrict loop, struct api_client_ctx *restrict ctx)
static void
api_client_stop(struct ev_loop *loop, struct api_client_ctx *restrict ctx)
{
ev_clear_pending(loop, &ctx->w_start);
ev_timer_stop(loop, &ctx->w_timeout);
ev_idle_stop(loop, &ctx->w_ruleset);
if (ctx->state == STATE_CLIENT_CONNECT) {

switch (ctx->state) {
case STATE_CLIENT_INIT:
ev_clear_pending(loop, &ctx->w_start);
break;
case STATE_CLIENT_CONNECT:
dialer_cancel(&ctx->dialer, loop);
}
if (ctx->dialreq != NULL) {
dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
}
if (ctx->state >= STATE_CLIENT_REQUEST) {
break;
case STATE_CLIENT_REQUEST:
ev_io_stop(loop, &ctx->w_socket);
break;
case STATE_CLIENT_RESPONSE:
ev_io_stop(loop, &ctx->w_socket);
break;
case STATE_CLIENT_RULESET:
ev_idle_stop(loop, &ctx->w_ruleset);
break;
default:
FAIL();
}
}

static void
api_client_close(struct ev_loop *loop, struct api_client_ctx *restrict ctx)
{
api_client_stop(loop, ctx);

dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
if (ctx->w_socket.fd != -1) {
CLOSE_FD(ctx->w_socket.fd);
ev_io_set(&ctx->w_socket, -1, EV_NONE);
}
if (ctx->state == STATE_CLIENT_RESPONSE) {
if (ctx->result.stream != NULL) {
stream_close(ctx->result.stream);
ctx->result.stream = NULL;
}
if (ctx->result.stream != NULL) {
stream_close(ctx->result.stream);
ctx->result.stream = NULL;
}
ctx->parser.cbuf = VBUF_FREE(ctx->parser.cbuf);
if (ctx->ss.close != NULL) {
/* managed by session */
session_del(&ctx->ss);
}
ctx->parser.cbuf = VBUF_FREE(ctx->parser.cbuf);
free(ctx);
}

Expand Down Expand Up @@ -118,10 +138,9 @@ static void api_client_finish(
ctx->result.errmsg = errmsg;
ctx->result.errlen = errlen;
ctx->result.stream = stream;
/* ignore further io events */
if (ctx->state >= STATE_CLIENT_REQUEST) {
ev_io_stop(loop, &ctx->w_socket);
}

api_client_stop(loop, ctx);
ctx->state = STATE_CLIENT_RULESET;
ev_idle_start(loop, &ctx->w_ruleset);
}

Expand Down Expand Up @@ -220,8 +239,8 @@ static void send_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)

ctx->state = STATE_CLIENT_RESPONSE;
p->fd = fd;
ev_io_set(watcher, fd, EV_READ);
ev_set_cb(watcher, recv_cb);
ev_io_set(watcher, fd, EV_READ);
ev_io_start(loop, watcher);
}

Expand All @@ -242,10 +261,11 @@ static void dialer_cb(struct ev_loop *loop, void *data)
ctx->dialreq = NULL;

ctx->state = STATE_CLIENT_REQUEST;
struct ev_io *restrict w_write = &ctx->w_socket;
ev_io_init(w_write, send_cb, fd, EV_WRITE);
w_write->data = ctx;
ev_io_start(loop, w_write);
struct ev_io *restrict w_send = &ctx->w_socket;
ev_set_cb(w_send, send_cb);
w_send->data = ctx;
ev_io_set(w_send, fd, EV_WRITE);
ev_io_start(loop, w_send);
}

static void
Expand Down Expand Up @@ -327,7 +347,8 @@ start_cb(struct ev_loop *loop, struct ev_watcher *watcher, int revents)
{
CHECK_REVENTS(revents, EV_CUSTOM);
struct api_client_ctx *restrict ctx = watcher->data;
ev_timer_start(loop, &ctx->w_timeout);
ASSERT(ctx->state == STATE_CLIENT_INIT);
ctx->state = STATE_CLIENT_CONNECT;
dialer_do(&ctx->dialer, loop, ctx->dialreq);
}

Expand All @@ -344,7 +365,7 @@ static bool api_client_do(
dialreq_free(req);
return false;
}
ctx->state = STATE_CLIENT_CONNECT;
ctx->state = STATE_CLIENT_INIT;
ctx->dialreq = req;
const struct http_parsehdr_cb on_header = { parse_header, ctx };
http_parser_init(&ctx->parser, -1, STATE_PARSE_RESPONSE, on_header);
Expand All @@ -360,6 +381,8 @@ static bool api_client_do(
ctx->w_timeout.data = ctx;
ev_idle_init(&ctx->w_ruleset, idle_cb);
ctx->w_ruleset.data = ctx;
ev_io_init(&ctx->w_socket, NULL, -1, EV_NONE);
ctx->w_socket.data = ctx;
ctx->result.errmsg = NULL;
ctx->result.errlen = 0;
ctx->result.stream = NULL;
Expand All @@ -377,6 +400,7 @@ static bool api_client_do(
session_add(&ctx->ss);
}

ev_timer_start(loop, &ctx->w_timeout);
ev_feed_event(loop, &ctx->w_start, EV_CUSTOM);
if (pctx != NULL) {
*pctx = ctx;
Expand Down
7 changes: 4 additions & 3 deletions src/forward.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ forward_ctx_stop(struct ev_loop *loop, struct forward_ctx *restrict ctx)
/* fallthrough */
case STATE_CONNECT:
dialer_cancel(&ctx->dialer, loop);
dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
stats->num_halfopen--;
return;
case STATE_CONNECTED:
Expand All @@ -118,6 +116,8 @@ forward_ctx_close(struct ev_loop *loop, struct forward_ctx *restrict ctx)
FW_CTX_LOG_F(VERBOSE, ctx, "close, state=%d", ctx->state);
forward_ctx_stop(loop, ctx);

dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
if (ctx->accepted_fd != -1) {
CLOSE_FD(ctx->accepted_fd);
ctx->accepted_fd = -1;
Expand Down Expand Up @@ -212,6 +212,7 @@ static void dialer_cb(struct ev_loop *loop, void *data)
FW_CTX_LOG_F(DEBUG, ctx, "connected, fd=%d", fd);
/* cleanup before state change */
dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;

if (G.conf->proto_timeout) {
ctx->state = STATE_CONNECTED;
Expand Down Expand Up @@ -336,11 +337,11 @@ forward_ctx_new(struct server *restrict s, const int accepted_fd)
ctx->ruleset_state = NULL;
#endif

ctx->dialreq = NULL;
const struct event_cb cb = {
.func = dialer_cb,
.data = ctx,
};
ctx->dialreq = NULL;
dialer_init(&ctx->dialer, &cb);
ctx->ss.close = forward_ss_close;
session_add(&ctx->ss);
Expand Down
10 changes: 5 additions & 5 deletions src/http_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ static void http_ctx_stop(struct ev_loop *loop, struct http_ctx *restrict ctx)
ev_io_stop(loop, &ctx->w_recv);
ev_io_stop(loop, &ctx->w_send);
dialer_cancel(&ctx->dialer, loop);
dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
stats->num_halfopen--;
return;
case STATE_CONNECTED:
Expand All @@ -146,6 +144,8 @@ static void http_ctx_close(struct ev_loop *loop, struct http_ctx *restrict ctx)
VERBOSE, ctx, "close state=%d", ctx->accepted_fd, ctx->state);
http_ctx_stop(loop, ctx);

dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
if (ctx->accepted_fd != -1) {
CLOSE_FD(ctx->accepted_fd);
ctx->accepted_fd = -1;
Expand Down Expand Up @@ -311,7 +311,6 @@ static void http_proxy_pass(struct ev_loop *loop, struct http_ctx *restrict ctx)
static void
http_proxy_handle(struct ev_loop *loop, struct http_ctx *restrict ctx)
{
ctx->dialreq = NULL;
const struct http_message *restrict msg = &ctx->parser.msg;
if (strcmp(msg->req.method, "CONNECT") != 0) {
http_proxy_pass(loop, ctx);
Expand Down Expand Up @@ -547,13 +546,14 @@ static struct http_ctx *http_ctx_new(struct server *restrict s, const int fd)
}
ctx->ruleset_state = NULL;
#endif
const struct http_parsehdr_cb on_header = { parse_header, ctx };
http_parser_init(&ctx->parser, fd, STATE_PARSE_REQUEST, on_header);
ctx->dialreq = NULL;
const struct event_cb cb = {
.func = dialer_cb,
.data = ctx,
};
dialer_init(&ctx->dialer, &cb);
const struct http_parsehdr_cb on_header = { parse_header, ctx };
http_parser_init(&ctx->parser, fd, STATE_PARSE_REQUEST, on_header);

ctx->ss.close = http_ss_close;
session_add(&ctx->ss);
Expand Down
5 changes: 3 additions & 2 deletions src/socks.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ socks_ctx_stop(struct ev_loop *restrict loop, struct socks_ctx *restrict ctx)
return;
case STATE_CONNECT:
dialer_cancel(&ctx->dialer, loop);
dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
stats->num_halfopen--;
return;
case STATE_CONNECTED:
Expand All @@ -166,6 +164,8 @@ socks_ctx_close(struct ev_loop *restrict loop, struct socks_ctx *restrict ctx)
SOCKS_CTX_LOG_F(VERBOSE, ctx, "close, state=%d", ctx->state);
socks_ctx_stop(loop, ctx);

dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;
if (ctx->accepted_fd != -1) {
CLOSE_FD(ctx->accepted_fd);
ctx->accepted_fd = -1;
Expand Down Expand Up @@ -401,6 +401,7 @@ static void dialer_cb(struct ev_loop *loop, void *data)
/* cleanup before state change */
ev_io_stop(loop, &ctx->w_socket);
dialreq_free(ctx->dialreq);
ctx->dialreq = NULL;

if (G.conf->proto_timeout) {
ctx->state = STATE_CONNECTED;
Expand Down

0 comments on commit 4972092

Please sign in to comment.