From 9d2fd99ffc86101aa359b5fe2317dfd8dd26951c Mon Sep 17 00:00:00 2001 From: Flo Roeske Date: Fri, 16 Dec 2022 16:18:17 +1300 Subject: [PATCH] Fix tcp server/client request Pretend eCAL::STcpHeader before the request data and set the payload size on the client. Read and decode the header and read until all request data has been received before executing the callback on the server. --- ecal/core/src/service/asio_server.h | 59 +++++++++++++++++------- ecal/core/src/service/ecal_tcpclient.cpp | 21 ++++++++- ecal/core/src/service/ecal_tcpclient.h | 1 + 3 files changed, 62 insertions(+), 19 deletions(-) diff --git a/ecal/core/src/service/asio_server.h b/ecal/core/src/service/asio_server.h index 5b83dbcc2a..8c0d5a1a75 100644 --- a/ecal/core/src/service/asio_server.h +++ b/ecal/core/src/service/asio_server.h @@ -77,29 +77,44 @@ class CAsioSession { if (request_callback_) { - // collect request - //std::cout << "CAsioSession::handle_read read bytes " << bytes_transferred << std::endl; - request_ += std::string(data_, bytes_transferred); - // are there some more data on the socket ? - if (socket_.available()) + size_t bytes_used = 0; + std::string data = std::string(data_, bytes_transferred); + + // collect header + if (data.size() < sizeof(eCAL::STcpHeader) && header_.size() < sizeof(eCAL::STcpHeader)) { - // read some more bytes - socket_.async_read_some(asio::buffer(data_, max_length), - std::bind(&CAsioSession::handle_read, this, - std::placeholders::_1, - std::placeholders::_2)); + header_ += data; } - // no more data - else + else if (data.size() >= sizeof(eCAL::STcpHeader) && header_.size() < sizeof(eCAL::STcpHeader)) + { + bytes_used += sizeof(eCAL::STcpHeader) - header_.size(); + header_ += data.substr(0, bytes_used); + } + + // decode header and get request size + if (header_.size() == sizeof(eCAL::STcpHeader) && header_request_size_ == 0) + { + eCAL::STcpHeader tcp_header; + memcpy(&tcp_header, header_.data(), sizeof(eCAL::STcpHeader)); + header_request_size_ = static_cast(ntohl(tcp_header.psize_n)); + } + + // collect request_ + if (header_request_size_ != 0 && request_.size() < header_request_size_) + { + request_ += data.substr(bytes_used, bytes_transferred - bytes_used); + } + + // execute callback + if (header_request_size_ != 0 && request_.size() == header_request_size_) { - // execute service callback - //std::cout << "CAsioSession::handle_read final request size " << request_.size() << std::endl; response_.clear(); request_callback_(request_, response_); + header_.clear(); request_.clear(); - //std::cout << "CAsioSession::handle_read server callback executed - reponse size " << response_.size() << std::endl; + header_request_size_ = 0; - // write response back + // write response packed_response_.clear(); packed_response_ = pack_write(response_); asio::async_write(socket_, @@ -108,6 +123,14 @@ class CAsioSession std::placeholders::_1, std::placeholders::_2)); } + else + { + // read some more bytes until all of request data has been received + socket_.async_read_some(asio::buffer(data_, max_length), + std::bind(&CAsioSession::handle_read, this, + std::placeholders::_1, + std::placeholders::_2)); + } } } else @@ -118,7 +141,7 @@ class CAsioSession // handle the disconnect if (event_callback_) { - event_callback_(server_event_disconnected, "CAsioSession disconnected on read"); + event_callback_(server_event_disconnected, "CAsioSession disconnected on read: " + ec.message()); } } delete this; @@ -167,6 +190,8 @@ class CAsioSession asio::ip::tcp::socket socket_; RequestCallbackT request_callback_; EventCallbackT event_callback_; + std::string header_; + size_t header_request_size_ = 0; std::string request_; std::string response_; std::vector packed_response_; diff --git a/ecal/core/src/service/ecal_tcpclient.cpp b/ecal/core/src/service/ecal_tcpclient.cpp index d0837eb926..fbee6b9fd2 100644 --- a/ecal/core/src/service/ecal_tcpclient.cpp +++ b/ecal/core/src/service/ecal_tcpclient.cpp @@ -164,6 +164,20 @@ namespace eCAL } } + std::vector CTcpClient::PackRequest(const std::string& request) + { + // create header + eCAL::STcpHeader tcp_header; + // set up package size + const size_t psize = request.size(); + tcp_header.psize_n = htonl(static_cast(psize)); + // repack + std::vector packed_request(sizeof(tcp_header) + psize); + memcpy(packed_request.data(), &tcp_header, sizeof(tcp_header)); + memcpy(packed_request.data() + sizeof(tcp_header), request.data(), psize); + return packed_request; + } + bool CTcpClient::SendRequest(const std::string& request_) { size_t written(0); @@ -177,10 +191,13 @@ namespace eCAL m_socket->read_some(asio::buffer(resp_buffer)); } + std::vector packed_request = PackRequest(request_); + std::string packed_request_str(packed_request.begin(), packed_request.end()); + // send payload to server - while (written != request_.size()) + while (written != packed_request.size()) { - auto bytes_written = m_socket->write_some(asio::buffer(request_.c_str() + written, request_.size() - written)); + auto bytes_written = m_socket->write_some(asio::buffer(packed_request_str.c_str() + written, packed_request_str.size() - written)); written += bytes_written; } } diff --git a/ecal/core/src/service/ecal_tcpclient.h b/ecal/core/src/service/ecal_tcpclient.h index 32fa6be081..fc01daab66 100644 --- a/ecal/core/src/service/ecal_tcpclient.h +++ b/ecal/core/src/service/ecal_tcpclient.h @@ -89,6 +89,7 @@ namespace eCAL std::atomic m_async_request_in_progress; private: + std::vector PackRequest(const std::string &request); bool SendRequest(const std::string &request_); size_t ReceiveResponse(std::string &response_, int timeout_); void ReceiveResponseAsync(AsyncCallbackT callback_, int timeout_);