Skip to content

Commit

Permalink
Add auto retry on client mode
Browse files Browse the repository at this point in the history
  • Loading branch information
hare1039 committed Dec 11, 2018
1 parent c24b237 commit e3f24a0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 82 deletions.
21 changes: 21 additions & 0 deletions basic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <boost/endian/conversion.hpp>
#include <boost/asio.hpp>
#include <boost/scope_exit.hpp>
#include <chrono>

namespace pika
{
Expand Down Expand Up @@ -57,6 +58,26 @@ lib::tcp::endpoint make_connectable(std::string_view host, boost::asio::io_conte

}// namespace util

namespace error
{
using namespace std::chrono_literals;
class restart_request : public std::exception
{
constexpr static std::chrono::seconds max_waittime_ {600s};
std::chrono::seconds waittime_ {600s};
bool empty_ {true};
public:
restart_request() = default;
restart_request(std::chrono::seconds time):
waittime_{(time > max_waittime_? max_waittime_ : time)},
empty_{false} {}

virtual char const * what() const noexcept override { return "Restart Requested\n"; }
void sleep() const {std::this_thread::sleep_for(waittime_);}
operator bool() {return not empty_;};
};
} // namespace error

}// namespace pika

#endif // BASIC_HPP_
18 changes: 10 additions & 8 deletions bridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class bridge : public std::enable_shared_from_this<bridge>
auto self = shared_from_this();
auto executor = co_await lib::this_coro::executor();

lib::co_spawn(executor, [self]() mutable {
return self->redir(self->first_socket_,
self->second_socket_);
}, lib::detached);
lib::co_spawn(executor, [self]() mutable {
return self->redir(self->second_socket_,
self->first_socket_);
}, lib::detached);
lib::co_spawn(executor,
[self]() mutable {
return self->redir(self->first_socket_,
self->second_socket_);
}, lib::detached);
lib::co_spawn(executor,
[self]() mutable {
return self->redir(self->second_socket_,
self->first_socket_);
}, lib::detached);
}

private:
Expand Down
63 changes: 43 additions & 20 deletions client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ class client : public std::enable_shared_from_this<client>
client(std::string_view export_host, boost::asio::io_context &io_context):
export_ep_{util::make_connectable(export_host, io_context)} {}

lib::awaitable<void> run(std::string_view controller_host, std::string_view controller_bind)
lib::awaitable<void> run(std::string_view controller_host,
std::string_view controller_bind,
error::restart_request &req)
{
auto executor = co_await lib::this_coro::executor();
auto token = co_await lib::this_coro::token();

try
{
auto executor = co_await lib::this_coro::executor();
auto token = co_await lib::this_coro::token();
auto self = shared_from_this();

self->controller_ep_ = util::make_connectable(controller_host, executor.context());
Expand All @@ -47,31 +50,51 @@ class client : public std::enable_shared_from_this<client>
std::array<std::uint8_t, 8> buf{};
std::size_t length = co_await boost::asio::async_read(controller_socket, boost::asio::buffer(buf), token);

switch(buf.at(0))
if (buf.at(1) != 0)
{
case 0x00: // do nothing
break;
case 0x03: // Is remote request
std::cout << "Error connecting to remote server\n";
using namespace std::chrono_literals;
throw error::restart_request{1s};
}
else
{
switch(buf.at(0))
{
std::uint32_t id = 0;
std::memcpy(&id, &buf[2], 4);
lib::co_spawn(executor,
[self, id]() mutable {
return self->make_bridge(id);
}, lib::detached);
break;
}
default:
// response failed
break;
case 0x00: // do nothing
break;
case 0x02: // Is remote request
{
std::uint32_t id = 0;
std::memcpy(&id, &buf[2], 4);
lib::co_spawn(executor,
[self, id]() mutable {
return self->make_bridge(id);
}, lib::detached);
break;
}
default:
// response failed
break;
}
}
}

}
catch (error::restart_request const & e)
{
std::cerr << "client restart exception, restarting\n";
executor.context().stop();
req = e;
co_return;
}
catch (std::exception const & e)
{
std::cerr << "session::start() exception: " << e.what() << std::endl;
std::cerr << "client::start() exception: " << e.what() << std::endl;
executor.context().stop();
}

// this is temparary
using namespace std::chrono_literals;
req = error::restart_request{1s};
}

lib::awaitable<void> make_bridge(std::uint32_t const id)
Expand Down
8 changes: 7 additions & 1 deletion controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ class controller
break;
}
default:
{
// response failed
std::array<std::uint8_t, 8> response{0x00 /* CONNECT */, 0x01 /* FAILED */};
std::ignore = co_await boost::asio::async_write(socket, boost::asio::buffer(response), token);
break;
}
}
}

Expand Down Expand Up @@ -107,7 +111,7 @@ class controller
std::uint32_t address = util::hash(socket.remote_endpoint());
clients.insert({address, std::move(socket)});

std::array<std::uint8_t, 8> response{0x03};
std::array<std::uint8_t, 8> response{0x02};
boost::endian::native_to_big_inplace(address);
std::memcpy(&response[2], &address, sizeof address);

Expand All @@ -116,6 +120,8 @@ class controller
}
catch (std::exception const & e)
{
std::array<std::uint8_t, 8> response{0x02 /* CONNECT */, 0x01 /* FAILED */};
std::ignore = co_await boost::asio::async_write(remote_socket, boost::asio::buffer(response), token);
std::cerr << "controller::start_reverse_tunnel exception: " << e.what() << std::endl;
}
}
Expand Down
116 changes: 63 additions & 53 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,65 +70,75 @@ int main(int argc, char *argv[])
boost::asio::signal_set signals{io_context, SIGINT, SIGTERM};
signals.async_wait([&](auto, auto){ io_context.stop(); });

switch (run_mode)
bool restart{true};
while (restart)
{
case mode::socks5:
{
std::cout << "[socks5 mode] ";
pika::socks5::server server{socks5_listen_host, io_context};
pika::lib::co_spawn(io_context,
[&server] {
return server.run();
}, pika::lib::detached);
io_context.run();
break;
}
case mode::srv:
{
pika::controller server{srv_listen_host, io_context};
pika::lib::co_spawn(io_context,
[&server] {
return server.run();
}, pika::lib::detached);
io_context.run();
break;
}
case mode::exp:
pika::error::restart_request req;
restart = false;

switch (run_mode)
{
// if socks5_server_endpoint_socket not null, start socks5 server
if (socks5_server_endpoint_socket)
case mode::socks5:
{
std::cout << "[socks5 mode] ";
pika::socks5::server server{socks5_listen_host, io_context};
pika::lib::co_spawn(io_context,
[&server] {
return server.run();
}, pika::lib::detached);
io_context.run();
break;
}
case mode::srv:
{
pika::controller server{srv_listen_host, io_context};
pika::lib::co_spawn(io_context,
[&server] {
return server.run();
}, pika::lib::detached);
io_context.run();
break;
}
case mode::exp:
{
std::thread t(
[socket = std::move(socks5_server_endpoint_socket), &export_host]() mutable
{
// retrive the random port from socket, release it, than bind it again
using namespace std::literals;
std::cout << "Starting socks5 server at " << export_host << "\n";
socket.reset();
boost::asio::io_context io;
pika::socks5::server server {export_host, io};
pika::lib::co_spawn(io,
[&server] {
return server.run();
}, pika::lib::detached);
io.run();
});
t.detach();
// if socks5_server_endpoint_socket not null, start socks5 server
if (socks5_server_endpoint_socket)
{
std::thread t(
[socket = std::move(socks5_server_endpoint_socket), &export_host]() mutable
{
// retrive the random port from socket, release it, than bind it again
using namespace std::literals;
std::cout << "Starting socks5 server at " << export_host << "\n";
socket.reset();
boost::asio::io_context io;
pika::socks5::server server {export_host, io};
pika::lib::co_spawn(io,
[&server] {
return server.run();
}, pika::lib::detached);
io.run();
});
t.detach();
}
auto c = std::make_shared<pika::client>(export_host, io_context);
pika::lib::co_spawn(io_context,
[&c, &connect_host, &bind_host, &req] {
return c->run(connect_host, bind_host, req);
}, pika::lib::detached);
io_context.run();
break;
}
auto c = std::make_shared<pika::client>(export_host, io_context);
pika::lib::co_spawn(io_context,
[&c, &connect_host, &bind_host] {
return c->run(connect_host, bind_host);
}, pika::lib::detached);
io_context.run();
break;
}

if (req)
{
io_context.restart();
std::cout << "Retrying ... first sleep.\n";
req.sleep();
restart = true;
}
}
// std::vector<std::thread> p(std::thread::hardware_concurrency());
// for (std::thread & t : p)
// t = std::thread ([&io_context]{ io_context.run(); });
// for (std::thread & t : p)
// t.join();
}
catch (const std::exception& e)
{
Expand Down

0 comments on commit e3f24a0

Please sign in to comment.