Skip to content

Commit

Permalink
rewrite named_pipe using async_initiate
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Oct 15, 2023
1 parent 7a01ebe commit 92af950
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 131 deletions.
89 changes: 45 additions & 44 deletions include/boost/winasio/named_pipe/named_pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,50 +75,51 @@ class named_pipe : public boost::asio::windows::basic_stream_handle<Executor> {
parent_type::assign(hPipe);
}

template <BOOST_ASIO_COMPLETION_TOKEN_FOR(void(boost::system::error_code,
std::size_t)) ConnectHandler
BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(ConnectHandler,
void(boost::system::error_code,
std::size_t))
async_server_connect(
BOOST_ASIO_MOVE_ARG(ConnectHandler)
handler BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) {
boost::system::error_code ec;
boost::asio::windows::overlapped_ptr optr(this->get_executor(),
std::move(handler));
bool fConnected = false;
fConnected = ConnectNamedPipe(this->native_handle(), optr.get());
// Overlapped ConnectNamedPipe should return zero.
if (fConnected) {
// printf("ConnectNamedPipe failed with %d.\n", GetLastError());
ec = boost::system::error_code(::GetLastError(),
boost::system::system_category());
optr.complete(ec, 0);
return;
}

switch (GetLastError()) {
// The overlapped connection in progress.
case ERROR_IO_PENDING:
optr.release();
break;
// Client is already connected, so signal an event.
case ERROR_PIPE_CONNECTED: {
// In the win32 example here we need to reset the overlapp event when pipe
// already is connected. But this case overlapped_ptr cannot trigger this
// because iocp does not register this pipe instance.
optr.complete(ec, 0);
break;
}
// If an error occurs during the connect operation...
default: {
// printf("Some named pipe op failed with %d.\n", GetLastError());
ec = boost::system::error_code(::GetLastError(),
boost::asio::error::get_system_category());
optr.complete(ec, 0);
}
}
template<typename Token>
auto async_server_connect(Token && token){
return boost::asio::async_initiate<decltype(token),
void(boost::system::error_code)>(
[this](auto handler) {
// init optr to pass through the user handler.
boost::asio::windows::overlapped_ptr optr(this->get_executor(),
[h=std::move(handler)](boost::system::error_code ec, std::size_t) mutable {
std::move(h)(ec);
});
boost::system::error_code ec;
bool fConnected = false;
fConnected = ConnectNamedPipe(this->native_handle(), optr.get());
// Overlapped ConnectNamedPipe should return zero.
if (fConnected) {
// printf("ConnectNamedPipe failed with %d.\n", GetLastError());
ec = boost::system::error_code(::GetLastError(),
boost::system::system_category());
optr.complete(ec, 0);
return;
}

switch (GetLastError()) {
// The overlapped connection in progress.
case ERROR_IO_PENDING:
optr.release();
break;
// Client is already connected, so signal an event.
case ERROR_PIPE_CONNECTED: {
// In the win32 example here we need to reset the overlapp event when pipe
// already is connected. But this case overlapped_ptr cannot trigger this
// because iocp does not register this pipe instance.
optr.complete(ec, 0);
break;
}
// If an error occurs during the connect operation...
default: {
// printf("Some named pipe op failed with %d.\n", GetLastError());
ec = boost::system::error_code(::GetLastError(),
boost::asio::error::get_system_category());
optr.complete(ec, 0);
}
}
},
token);
}

// used for client to connect
Expand Down
42 changes: 29 additions & 13 deletions include/boost/winasio/named_pipe/named_pipe_acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,20 @@ template <typename Executor> class named_pipe_acceptor {
async_accept(named_pipe<executor_type> &pipe,
BOOST_ASIO_MOVE_ARG(AcceptToken)
token BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) {

return boost::asio::async_compose<AcceptToken,
void(boost::system::error_code)>(
details::async_accept_op<executor_type>(&pipe, this->endpoint_), token,
pipe);
return boost::asio::async_initiate<decltype(token),
void(boost::system::error_code)>(
[this, &pipe](auto handler) {
boost::system::error_code ec;
pipe.server_create(ec, endpoint_);
if (ec) {
std::move(handler)(ec);
return;
}
pipe.async_server_connect([h=std::move(handler), this](boost::system::error_code ec) mutable {
std::move(h)(ec);
});
},
token);
}

// TODO: fix move and rebind executor
Expand All @@ -73,14 +82,21 @@ template <typename Executor> class named_pipe_acceptor {
named_pipe<executor_type>))
async_accept(BOOST_ASIO_MOVE_ARG(MoveAcceptToken)
token BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) {
// std::cout << "async_accept func" << std::endl;

return boost::asio::async_compose<MoveAcceptToken,
void(boost::system::error_code,
named_pipe<executor_type>)>(
details::async_move_accept_op<executor_type>(&this->pipe_,
this->endpoint_),
token, this->pipe_);
return boost::asio::async_initiate<decltype(token),
void(boost::system::error_code,
named_pipe<executor_type>)>(
[this](auto handler) {
boost::system::error_code ec;
pipe_.server_create(ec, endpoint_);
if (ec) {
std::move(handler)(ec, std::move(pipe_));
return;
}
pipe_.async_server_connect([h=std::move(handler), this](boost::system::error_code ec) mutable {
std::move(h)(ec, std::move(pipe_));
});
},
token);
}

const endpoint_type endpoint_;
Expand Down
74 changes: 0 additions & 74 deletions include/boost/winasio/named_pipe/named_pipe_server_details.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,80 +20,6 @@ namespace boost {
namespace winasio {
namespace details {

// handler of signature void(error_code, pipe)
template <typename Executor>
class async_move_accept_op : boost::asio::coroutine {
public:
typedef std::string endpoint_type;

async_move_accept_op(named_pipe<Executor> *pipe, endpoint_type endpoint)
: pipe_(pipe), endpoint_(endpoint) {}

template <typename Self>
void operator()(Self &self, boost::system::error_code ec = {}) {
// std::cout << "async_move_accept_op" << std::endl;
if (ec) {
// std::cout << "async_move_accept_op has error" << std::endl;
self.complete(ec, std::move(*pipe_));
return;
}
// create named pipe
pipe_->server_create(ec, endpoint_);
if (ec) {
self.complete(ec, std::move(*pipe_));
return;
}
// connect to namedpipe
pipe_->async_server_connect(
[self = std::move(self), p = pipe_](boost::system::error_code ec,
std::size_t) mutable {
self.complete(ec, std::move(*p));
});
}

private:
// pipe for movable case is the pipe holder in acceptor, which needs to moved
// to handler function, so that to free acceptor pipe holder to handle the
// next connection.
named_pipe<Executor> *pipe_;
endpoint_type const endpoint_;
};

// handler of signature void(error_code)
template <typename Executor> class async_accept_op : boost::asio::coroutine {
public:
typedef std::string endpoint_type;

async_accept_op(named_pipe<Executor> *pipe, endpoint_type endpoint)
: pipe_(pipe), endpoint_(endpoint) {}

template <typename Self>
void operator()(Self &self, boost::system::error_code ec = {}) {
// std::cout << "async_move_accept_op" << std::endl;
if (ec) {
// std::cout << "async_move_accept_op has error" << std::endl;
self.complete(ec);
return;
}
// create named pipe
pipe_->server_create(ec, endpoint_);
if (ec) {
self.complete(ec);
return;
}
// connect to namedpipe
pipe_->async_server_connect(
[self = std::move(self)](boost::system::error_code ec,
std::size_t) mutable { self.complete(ec); });
}

private:
// pipe for movable case is the pipe holder in acceptor, which needs to moved
// to handler function, so that to free acceptor pipe holder to handle the
// next connection.
named_pipe<Executor> *pipe_;
endpoint_type const endpoint_;
};

} // namespace details
} // namespace winasio
Expand Down

0 comments on commit 92af950

Please sign in to comment.