-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathclient.hpp
129 lines (112 loc) · 4.73 KB
/
client.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef CLIENT_HPP_
#define CLIENT_HPP_
#pragma once
#include "basic.hpp"
namespace pika
{
// Client that keeps one control connection to a controller and, on request,
// opens a pair of connections (export endpoint + controller endpoint) joined
// by a `bridge`.
//
// All control traffic visible here uses fixed 8-byte frames:
//   byte 0     : selects the action (0x00, 0x01 and 0x02 are used below)
//   byte 1     : non-zero in a received frame is treated as a failure
//   bytes 2..7 : payload (bind address + port, or a 4-byte id)
//
// Instances must be owned by a std::shared_ptr: both coroutines call
// shared_from_this().
class client : public std::enable_shared_from_this<client>
{
    // Endpoint produced from the constructor's `export_host`; every bridge
    // connects its first socket here.
    lib::tcp::endpoint export_ep_;
    // Endpoint produced from run()'s `controller_host`; reused by
    // make_bridge() for its second socket.
    lib::tcp::endpoint controller_ep_;

public:
    // Builds the export endpoint from `export_host` via util::make_connectable.
    // NOTE(review): make_connectable is defined elsewhere; presumably it
    // resolves a "host:port" string — confirm.
    client(std::string_view export_host, boost::asio::io_context &io_context):
        export_ep_{util::make_connectable(export_host, io_context)} {}

    // Connects to the controller, sends a bind request for `controller_bind`,
    // then reads 8-byte frames forever, spawning make_bridge() for every
    // 0x02 frame.
    //
    // controller_host : address of the controller to connect to.
    // controller_bind : address sent to the controller in the bind request.
    //                   Its IP part is converted with to_v4(), so it is
    //                   expected to be IPv4.
    // req             : out-parameter; always assigned before the coroutine
    //                   finishes (the loop only ends through an exception).
    //
    // Any exception stops the executor's io_context. A thrown
    // error::restart_request is copied into `req`; any other std::exception
    // is logged and `req` is set to a 1 second restart request.
    lib::awaitable<void> run(std::string_view controller_host,
                             std::string_view controller_bind,
                             error::restart_request &req)
    {
        auto executor = co_await lib::this_coro::executor();
        auto token = co_await lib::this_coro::token();
        try
        {
            auto self = shared_from_this();
            self->controller_ep_ = util::make_connectable(controller_host, executor.context());
            lib::tcp::socket controller_socket{executor.context()};
            co_await controller_socket.async_connect(self->controller_ep_, token);
            auto controller_bind_ep = util::make_connectable(controller_bind, executor.context());
            { // send bind request: {0x01, 0x00, ip (4 bytes, big endian), port (2 bytes, big endian)}
                std::uint32_t ip = controller_bind_ep.address().to_v4().to_ulong();
                boost::endian::native_to_big_inplace(ip);
                std::uint16_t port = controller_bind_ep.port();
                boost::endian::native_to_big_inplace(port);
                // Named `bind_request` so it does not shadow the `req` out-parameter.
                std::array<std::uint8_t, 8> bind_request{0x01, 0x00};
                std::memcpy(&bind_request[2], &ip, sizeof ip);
                std::memcpy(&bind_request[2 + sizeof ip], &port, sizeof port);
                std::ignore = co_await boost::asio::async_write(controller_socket, boost::asio::buffer(bind_request), token);
            }
            for (;;)
            {
                std::array<std::uint8_t, 8> buf{};
                // The transferred byte count is not needed: the frame is fixed-size.
                std::ignore = co_await boost::asio::async_read(controller_socket, boost::asio::buffer(buf), token);
                if (buf[1] != 0)
                {
                    std::cout << "Error connecting to remote server\n";
                    using namespace std::chrono_literals;
                    throw error::restart_request{1s};
                }
                switch (buf[0])
                {
                    case 0x00: // do nothing
                        break;
                    case 0x02: // Is remote request
                    {
                        // The id is copied byte-for-byte and echoed back unchanged
                        // by make_bridge(); no byte-order conversion is applied.
                        std::uint32_t id = 0;
                        std::memcpy(&id, &buf[2], sizeof id);
                        // `self` is captured by value so the client stays alive
                        // for as long as the detached coroutine runs.
                        lib::co_spawn(executor,
                                      [self, id]() mutable {
                                          return self->make_bridge(id);
                                      }, lib::detached);
                        break;
                    }
                    default:
                        // response failed
                        break;
                }
            }
        }
        catch (error::restart_request const & e)
        {
            std::cerr << "client restart exception, restarting\n";
            executor.context().stop();
            req = e;
            co_return;
        }
        catch (std::exception const & e)
        {
            std::cerr << "client::run() exception: " << e.what() << std::endl;
            executor.context().stop();
        }
        // this is temporary
        using namespace std::chrono_literals;
        req = error::restart_request{1s};
    }

    // Opens one bridge for the request identified by `id`: connects the
    // bridge's first socket to export_ep_ and its second socket to
    // controller_ep_, sends {0x02, 0x00, id (4 raw bytes)} on the second
    // socket, then awaits bridge::start_transport().
    // NOTE(review): `bridge` is defined elsewhere; presumably start_transport()
    // relays data between the two sockets — confirm.
    //
    // Exceptions derived from std::exception are logged and swallowed, so a
    // failed bridge does not affect the caller.
    lib::awaitable<void> make_bridge(std::uint32_t const id)
    {
        try
        {
            auto executor = co_await lib::this_coro::executor();
            auto token = co_await lib::this_coro::token();
            auto self = shared_from_this();
            auto proxy_bridge = std::make_shared<bridge>(lib::tcp::socket{executor.context()},
                                                         lib::tcp::socket{executor.context()});
            lib::tcp::socket &export_socket{proxy_bridge->first_socket_};
            lib::tcp::socket &controller_socket{proxy_bridge->second_socket_};
            co_await export_socket.async_connect(self->export_ep_, token);
            co_await controller_socket.async_connect(self->controller_ep_, token);
            std::array<std::uint8_t, 8> bridge_request{0x02, 0x00};
            std::memcpy(&bridge_request[2], &id, sizeof id);
            std::ignore = co_await boost::asio::async_write(controller_socket, boost::asio::buffer(bridge_request), token);
            co_await proxy_bridge->start_transport();
        }
        catch (std::exception const & e)
        {
            std::cerr << "client::make_bridge() exception: " << e.what() << std::endl;
        }
    }
};
}
#endif // CLIENT_HPP_