diff --git a/ecal/core/src/service/asio_server.h b/ecal/core/src/service/asio_server.h index 5b83dbcc2a..8c0d5a1a75 100644 --- a/ecal/core/src/service/asio_server.h +++ b/ecal/core/src/service/asio_server.h @@ -77,29 +77,44 @@ class CAsioSession { if (request_callback_) { - // collect request - //std::cout << "CAsioSession::handle_read read bytes " << bytes_transferred << std::endl; - request_ += std::string(data_, bytes_transferred); - // are there some more data on the socket ? - if (socket_.available()) + size_t bytes_used = 0; + std::string data = std::string(data_, bytes_transferred); + + // collect header + if (data.size() < sizeof(eCAL::STcpHeader) && header_.size() < sizeof(eCAL::STcpHeader)) { - // read some more bytes - socket_.async_read_some(asio::buffer(data_, max_length), - std::bind(&CAsioSession::handle_read, this, - std::placeholders::_1, - std::placeholders::_2)); + header_ += data; } - // no more data - else + else if (data.size() >= sizeof(eCAL::STcpHeader) && header_.size() < sizeof(eCAL::STcpHeader)) + { + bytes_used += sizeof(eCAL::STcpHeader) - header_.size(); + header_ += data.substr(0, bytes_used); + } + + // decode header and get request size + if (header_.size() == sizeof(eCAL::STcpHeader) && header_request_size_ == 0) + { + eCAL::STcpHeader tcp_header; + memcpy(&tcp_header, header_.data(), sizeof(eCAL::STcpHeader)); + header_request_size_ = static_cast(ntohl(tcp_header.psize_n)); + } + + // collect request_ + if (header_request_size_ != 0 && request_.size() < header_request_size_) + { + request_ += data.substr(bytes_used, bytes_transferred - bytes_used); + } + + // execute callback + if (header_request_size_ != 0 && request_.size() == header_request_size_) { - // execute service callback - //std::cout << "CAsioSession::handle_read final request size " << request_.size() << std::endl; response_.clear(); request_callback_(request_, response_); + header_.clear(); request_.clear(); - //std::cout << "CAsioSession::handle_read server callback executed - reponse size " << response_.size() << std::endl; + header_request_size_ = 0; - // write response back + // write response packed_response_.clear(); packed_response_ = pack_write(response_); asio::async_write(socket_, @@ -108,6 +123,14 @@ class CAsioSession std::placeholders::_1, std::placeholders::_2)); } + else + { + // read some more bytes until all of request data has been received + socket_.async_read_some(asio::buffer(data_, max_length), + std::bind(&CAsioSession::handle_read, this, + std::placeholders::_1, + std::placeholders::_2)); + } } } else @@ -118,7 +141,7 @@ class CAsioSession // handle the disconnect if (event_callback_) { - event_callback_(server_event_disconnected, "CAsioSession disconnected on read"); + event_callback_(server_event_disconnected, "CAsioSession disconnected on read: " + ec.message()); } } delete this; @@ -167,6 +190,8 @@ class CAsioSession asio::ip::tcp::socket socket_; RequestCallbackT request_callback_; EventCallbackT event_callback_; + std::string header_; + size_t header_request_size_ = 0; std::string request_; std::string response_; std::vector packed_response_; diff --git a/ecal/core/src/service/ecal_tcpclient.cpp b/ecal/core/src/service/ecal_tcpclient.cpp index d0837eb926..fbee6b9fd2 100644 --- a/ecal/core/src/service/ecal_tcpclient.cpp +++ b/ecal/core/src/service/ecal_tcpclient.cpp @@ -164,6 +164,20 @@ namespace eCAL } } + std::vector CTcpClient::PackRequest(const std::string& request) + { + // create header + eCAL::STcpHeader tcp_header; + // set up package size + const size_t psize = request.size(); + tcp_header.psize_n = htonl(static_cast(psize)); + // repack + std::vector packed_request(sizeof(tcp_header) + psize); + memcpy(packed_request.data(), &tcp_header, sizeof(tcp_header)); + memcpy(packed_request.data() + sizeof(tcp_header), request.data(), psize); + return packed_request; + } + bool CTcpClient::SendRequest(const std::string& request_) { size_t written(0); @@ -177,10 +191,13 @@ namespace eCAL m_socket->read_some(asio::buffer(resp_buffer)); } + std::vector packed_request = PackRequest(request_); + std::string packed_request_str(packed_request.begin(), packed_request.end()); + // send payload to server - while (written != request_.size()) + while (written != packed_request.size()) { - auto bytes_written = m_socket->write_some(asio::buffer(request_.c_str() + written, request_.size() - written)); + auto bytes_written = m_socket->write_some(asio::buffer(packed_request_str.c_str() + written, packed_request_str.size() - written)); written += bytes_written; } } diff --git a/ecal/core/src/service/ecal_tcpclient.h b/ecal/core/src/service/ecal_tcpclient.h index 32fa6be081..fc01daab66 100644 --- a/ecal/core/src/service/ecal_tcpclient.h +++ b/ecal/core/src/service/ecal_tcpclient.h @@ -89,6 +89,7 @@ namespace eCAL std::atomic m_async_request_in_progress; private: + std::vector PackRequest(const std::string &request); bool SendRequest(const std::string &request_); size_t ReceiveResponse(std::string &response_, int timeout_); void ReceiveResponseAsync(AsyncCallbackT callback_, int timeout_);