forked from sogou/workflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
WFServer.h
244 lines (196 loc) · 6.35 KB
/
WFServer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Xie Han ([email protected])
Wu Jiaxu ([email protected])
*/
#ifndef _WFSERVER_H_
#define _WFSERVER_H_
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <openssl/ssl.h>
#include "EndpointParams.h"
#include "WFTaskFactory.h"
/* Tunable settings of a server. WFServerBase copies one of these in its
 * constructor and exposes the copy through get_params();
 * SERVER_PARAMS_DEFAULT supplies the stock values.
 * NOTE(review): the timeouts look like milliseconds (the defaults are written
 * as "N * 1000") -- confirm before relying on the unit. */
struct WFServerParams
{
/* Transport type; SERVER_PARAMS_DEFAULT selects TT_TCP. */
enum TransportType transport_type;
/* Presumably the upper bound on simultaneously open connections; it is
 * enforced outside this header -- verify against the implementation. */
size_t max_connections;
int peer_response_timeout; /* timeout of each read or write operation */
int receive_timeout; /* timeout of receiving the whole message */
/* Passed to every server task through set_keep_alive() in
 * WFServer::new_session(). */
int keep_alive_timeout;
/* Passed to every request object through set_size_limit() in
 * WFServer::new_session(). */
size_t request_size_limit;
int ssl_accept_timeout; /* if not ssl, this will be ignored */
};
/* Stock server parameters. The WFServer constructor that takes only a
 * process callback passes the address of this object to WFServerBase.
 * NOTE(review): timeout values are presumably milliseconds (10 s / 60 s) --
 * confirm the unit. */
static constexpr struct WFServerParams SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
/* -1 presumably means "no limit" -- TODO confirm. */
.receive_timeout = -1,
.keep_alive_timeout = 60 * 1000,
/* (size_t)-1 is the largest value a size_t can hold. */
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
};
/* Protocol-independent base class of the servers. It keeps a private copy of
 * the server parameters, the listening descriptor and the connection counter,
 * and declares the start()/serve()/shutdown()/wait_finish() life-cycle calls.
 * The request/response specific part is supplied by the WFServer<REQ, RESP>
 * template that derives from this class. */
class WFServerBase : protected CommService
{
public:
	/* Copies *params into the server, so the caller's structure does not
	 * have to outlive it. 'params' is dereferenced unconditionally and must
	 * not be NULL.
	 * Every data member is given a defined value here, in declaration
	 * order; in particular 'scheduler' starts as NULL instead of being left
	 * indeterminate. */
	WFServerBase(const struct WFServerParams *params) :
		params(*params),
		conn_count(0),
		listen_fd(-1),
		unbind_finish(false),
		scheduler(NULL)
	{
	}

public:
	/* To start a TCP server */

	/* Start on port with IPv4. */
	int start(unsigned short port)
	{
		return start(AF_INET, NULL, port, NULL, NULL);
	}

	/* Start with family. AF_INET or AF_INET6. */
	int start(int family, unsigned short port)
	{
		return start(family, NULL, port, NULL, NULL);
	}

	/* Start with hostname and port (IPv4). */
	int start(const char *host, unsigned short port)
	{
		return start(AF_INET, host, port, NULL, NULL);
	}

	/* Start with family, hostname and port. */
	int start(int family, const char *host, unsigned short port)
	{
		return start(family, host, port, NULL, NULL);
	}

	/* Start with binding address. */
	int start(const struct sockaddr *bind_addr, socklen_t addrlen)
	{
		return start(bind_addr, addrlen, NULL, NULL);
	}

	/* To start an SSL server. */

	/* IPv4, given port, with certificate and key files. */
	int start(unsigned short port, const char *cert_file, const char *key_file)
	{
		return start(AF_INET, NULL, port, cert_file, key_file);
	}

	/* Given family and port, with certificate and key files. */
	int start(int family, unsigned short port,
			  const char *cert_file, const char *key_file)
	{
		return start(family, NULL, port, cert_file, key_file);
	}

	/* IPv4, given hostname and port, with certificate and key files. */
	int start(const char *host, unsigned short port,
			  const char *cert_file, const char *key_file)
	{
		return start(AF_INET, host, port, cert_file, key_file);
	}

	/* Most general host/port form; all host/port overloads above forward
	 * to it. Defined out of line. */
	int start(int family, const char *host, unsigned short port,
			  const char *cert_file, const char *key_file);

	/* This is the only necessary start function. */
	int start(const struct sockaddr *bind_addr, socklen_t addrlen,
			  const char *cert_file, const char *key_file);

	/* To start with a specified fd. For graceful restart or SCTP server. */
	int serve(int listen_fd)
	{
		return serve(listen_fd, NULL, NULL);
	}

	/* Same as above, with certificate and key files. Defined out of line. */
	int serve(int listen_fd, const char *cert_file, const char *key_file);

	/* stop() is a blocking operation: shutdown() followed by wait_finish(). */
	void stop()
	{
		this->shutdown();
		this->wait_finish();
	}

	/* Terminate the server without blocking. For stopping multiple servers.
	 * Typically, call shutdown() and then wait_finish().
	 * But indeed wait_finish() can be called before shutdown(), even before
	 * start() in another thread. */
	void shutdown();
	void wait_finish();

public:
	/* Current value of the connection counter. */
	size_t get_conn_count() const { return this->conn_count; }

	/* Get the listening address. This is often used after starting
	 * server on a random port (start() with port == 0).
	 * Returns the result of getsockname() on the listening descriptor, or
	 * -1 with errno set to ENOTCONN when there is no listening descriptor
	 * (listen_fd < 0). */
	int get_listen_addr(struct sockaddr *addr, socklen_t *addrlen) const
	{
		if (this->listen_fd >= 0)
			return getsockname(this->listen_fd, addr, addrlen);

		errno = ENOTCONN;
		return -1;
	}

	/* Pointer to the server's own copy of the parameters. */
	const struct WFServerParams *get_params() const { return &this->params; }

protected:
	/* Override this function to create the initial SSL CTX of the server */
	virtual SSL_CTX *new_ssl_ctx(const char *cert_file, const char *key_file);

	/* Override this function to implement server that supports TLS SNI.
	 * "servername" will be NULL if client does not set a host name.
	 * Returning NULL to indicate that servername is not supported.
	 * The default ignores "servername" and returns get_ssl_ctx(). */
	virtual SSL_CTX *get_server_ssl_ctx(const char *servername)
	{
		return this->get_ssl_ctx();
	}

	/* This can be used by the implementation of 'new_ssl_ctx'. */
	static int ssl_ctx_callback(SSL *ssl, int *al, void *arg);

protected:
	/* Private copy of the parameters passed to the constructor. */
	WFServerParams params;

protected:
	/* Connection and listening-socket hooks; defined out of line. */
	virtual int create_listen_fd();
	virtual WFConnection *new_connection(int accept_fd);
	void delete_connection(WFConnection *conn);

private:
	int init(const struct sockaddr *bind_addr, socklen_t addrlen,
			 const char *cert_file, const char *key_file);
	virtual void handle_unbound();

protected:
	/* Connection counter reported by get_conn_count(); starts at 0. */
	std::atomic<size_t> conn_count;

private:
	/* Listening descriptor; -1 while there is none. */
	int listen_fd;
	/* Starts as false. Presumably set once unbinding has completed and
	 * waited on through 'mutex'/'cond' -- verify in the implementation. */
	bool unbind_finish;
	std::mutex mutex;
	std::condition_variable cond;
	/* Not assigned anywhere in this header. Starts as NULL so that a use
	 * before it has been set fails deterministically rather than reading
	 * an indeterminate pointer. */
	class CommScheduler *scheduler;
};
/* Server for one request/response type pair. It stores a user callback,
 * 'process', and new_session() creates a server task bound to it. */
template<class REQ, class RESP>
class WFServer : public WFServerBase
{
public:
	/* Server running with SERVER_PARAMS_DEFAULT. */
	WFServer(std::function<void (WFNetworkTask<REQ, RESP> *)> proc) :
		WFServer(&SERVER_PARAMS_DEFAULT, std::move(proc))
	{
	}

	/* Server running with caller-supplied parameters; *params is copied by
	 * the base class. */
	WFServer(const struct WFServerParams *params,
			 std::function<void (WFNetworkTask<REQ, RESP> *)> proc) :
		WFServerBase(params),
		process(std::move(proc))
	{
	}

protected:
	/* Creates the task object for a session; defined below the class. */
	virtual CommSession *new_session(long long seq, CommConnection *conn);

protected:
	/* Callback handed to every server task created by new_session(). */
	std::function<void (WFNetworkTask<REQ, RESP> *)> process;
};
/* Creates a server task bound to this server and its 'process' callback,
 * then applies the keep-alive timeout, receive timeout and request size
 * limit from the server parameters. Neither 'seq' nor 'conn' is used by
 * this implementation. */
template<class REQ, class RESP>
CommSession *WFServer<REQ, RESP>::new_session(long long seq, CommConnection *conn)
{
	const struct WFServerParams& cfg = this->params;
	WFNetworkTask<REQ, RESP> *const server_task =
		WFNetworkTaskFactory<REQ, RESP>::create_server_task(this, this->process);

	server_task->set_keep_alive(cfg.keep_alive_timeout);
	server_task->set_receive_timeout(cfg.receive_timeout);
	server_task->get_req()->set_size_limit(cfg.request_size_limit);

	return server_task;
}
#endif