-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtcp_lb.cpp
172 lines (153 loc) · 4.36 KB
/
tcp_lb.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#include <elle/Exception.hh>
#include <elle/log.hh>
#include <elle/reactor/network/Error.hh>
#include <elle/reactor/network/TCPSocket.hh>
#include <elle/reactor/network/TCPServer.hh>
#include <elle/reactor/scheduler.hh>
ELLE_LOG_COMPONENT("LB");
struct Node {
std::string host;
std::string port;
Node(std::string host, std::string port)
: host{std::move(host)}, port{std::move(port)}
{}
auto connect() const
{
return std::make_unique<elle::reactor::network::TCPSocket>(host, port);
}
};
class Nodes {
public:
using Collection = std::vector<Node>;
using Iter = Collection::const_iterator;
public:
Nodes(std::initializer_list<Node> nodes)
: nodes{nodes}
{
assert(not this->nodes.empty());
iter = this->nodes.cend();
}
const Node& next()
{
// round robin
const auto& node = [this]()->const Node&
{
if(iter != nodes.cend()) {
return *iter;
}
// reached end, start from beginning
else {
iter = Iter{nodes.cbegin()};
return *iter;
}
}();
++iter;
return node;
}
private:
const std::vector<Node> nodes;
Iter iter;
};
struct Connection
: std::enable_shared_from_this<Connection>
{
using Socket = elle::reactor::network::TCPSocket;
using Socket_ptr = std::shared_ptr<Socket>;
using Thread = elle::reactor::Thread;
using Thread_ptr = std::unique_ptr<Thread>;
using Collection = std::unordered_set<std::shared_ptr<Connection>>;
Collection& collection;
Socket_ptr outside;
Socket_ptr inside;
Thread_ptr out_to_in;
Thread_ptr in_to_out;
Connection(Collection& col, Socket_ptr out, Socket_ptr in)
: collection{col},
outside{std::move(out)},
inside{std::move(in)}
{
// Classical trick: forward holds a reference to the Connection to maintain
// it alive while we are not done. The scheduler reset thread actions once
// they are done, thus losing those reference and deleting properly the
// threads once they are both done.
auto forward =
[this, self = std::shared_ptr<Connection>()]
(Socket& from, Socket& to) mutable
{
self = this->shared_from_this();
try
{
while (true)
{
auto payload = from.read_some(4096);
to.write(payload);
}
}
catch (elle::reactor::network::ConnectionClosed const&)
{}
from.close();
to.close();
collection.erase(self);
};
out_to_in = std::make_unique<Thread>(
elle::print("{}: out -> in", outside),
[this, forward] () mutable { forward(*outside, *inside); });
in_to_out = std::make_unique<Thread>(
elle::print("{}: in -> out", outside),
[this, forward] () mutable { forward(*inside, *outside); });
}
~Connection()
{
ELLE_LOG("lost connection from {}", this->outside->peer());
}
};
int
main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;
return 1;
}
// Create a Scheduler, the coroutines operator.
elle::reactor::Scheduler sched;
// Properly terminate the scheduler in case of SIGINT.
sched.signal_handle(SIGINT, [&sched] { sched.terminate(); });
Nodes nodes{
{"localhost", "8090"},
{"localhost", "8091"},
{"localhost", "8092"}
};
// Create a coroutine (named elle::reactor::Thread).
elle::reactor::Thread acceptor(sched, "acceptor", [&]
{
elle::reactor::network::TCPServer server;
auto port = std::atoi(argv[1]);
server.listen(port);
Connection::Collection connections;
while (true)
{
// Server::accept yields until it gets a connection.
auto outside = server.accept();
// Connect to one of our nodes
auto inside = nodes.next().connect();
ELLE_LOG("new connection from {}, forward to {}",
outside->peer(), inside->peer());
connections.emplace(
std::make_shared<Connection>(
connections, std::move(outside), std::move(inside)));
} // < while(true)
}); // < thread acceptor
// Run the Scheduler until all coroutines are over or it gets interrupted
// (by a signal or programmatically).
sched.run();
}
catch (...)
{
std::cerr << "fatal error: " << elle::exception_string() << std::endl;
return 1;
}
return 0;
}