Skip to content

Commit

Permalink
Fix tcp server/client request
Browse files Browse the repository at this point in the history
Pretend eCAL::STcpHeader before the request data and set the payload
size on the client. Read and decode the header and read until all
request data has been received before executing the callback on the server.
  • Loading branch information
floroeske committed Dec 16, 2022
1 parent 9eb54f3 commit 03432f0
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 19 deletions.
59 changes: 42 additions & 17 deletions ecal/core/src/service/asio_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,44 @@ class CAsioSession
{
if (request_callback_)
{
// collect request
//std::cout << "CAsioSession::handle_read read bytes " << bytes_transferred << std::endl;
request_ += std::string(data_, bytes_transferred);
// are there some more data on the socket ?
if (socket_.available())
size_t bytes_used = 0;
std::string data = std::string(data_, bytes_transferred);

// collect header
if (data.size() < sizeof(eCAL::STcpHeader) && header_.size() < sizeof(eCAL::STcpHeader))
{
// read some more bytes
socket_.async_read_some(asio::buffer(data_, max_length),
std::bind(&CAsioSession::handle_read, this,
std::placeholders::_1,
std::placeholders::_2));
header_ += data;
}
// no more data
else
else if (data.size() >= sizeof(eCAL::STcpHeader) && header_.size() < sizeof(eCAL::STcpHeader))
{
bytes_used += sizeof(eCAL::STcpHeader) - header_.size();
header_ += data.substr(0, bytes_used);
}

// decode header and get request size
if (header_.size() == sizeof(eCAL::STcpHeader) && header_request_size_ == 0)
{
eCAL::STcpHeader tcp_header;
memcpy(&tcp_header, header_.data(), sizeof(eCAL::STcpHeader));
header_request_size_ = static_cast<size_t>(ntohl(tcp_header.psize_n));
}

// collect request_
if (header_request_size_ != 0 && request_.size() < header_request_size_)
{
request_ += data.substr(bytes_used, bytes_transferred - bytes_used);
}

// execute callback
if (header_request_size_ != 0 && request_.size() == header_request_size_)
{
// execute service callback
//std::cout << "CAsioSession::handle_read final request size " << request_.size() << std::endl;
response_.clear();
request_callback_(request_, response_);
header_.clear();
request_.clear();
//std::cout << "CAsioSession::handle_read server callback executed - reponse size " << response_.size() << std::endl;
header_request_size_ = 0;

// write response back
// write response
packed_response_.clear();
packed_response_ = pack_write(response_);
asio::async_write(socket_,
Expand All @@ -108,6 +123,14 @@ class CAsioSession
std::placeholders::_1,
std::placeholders::_2));
}
else
{
// read some more bytes until all of request data has been received
socket_.async_read_some(asio::buffer(data_, max_length),
std::bind(&CAsioSession::handle_read, this,
std::placeholders::_1,
std::placeholders::_2));
}
}
}
else
Expand All @@ -118,7 +141,7 @@ class CAsioSession
// handle the disconnect
if (event_callback_)
{
event_callback_(server_event_disconnected, "CAsioSession disconnected on read");
event_callback_(server_event_disconnected, "CAsioSession disconnected on read: " + ec.message());
}
}
delete this;
Expand Down Expand Up @@ -167,6 +190,8 @@ class CAsioSession
asio::ip::tcp::socket socket_;
RequestCallbackT request_callback_;
EventCallbackT event_callback_;
std::string header_;
size_t header_request_size_ = 0;
std::string request_;
std::string response_;
std::vector<char> packed_response_;
Expand Down
21 changes: 19 additions & 2 deletions ecal/core/src/service/ecal_tcpclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ namespace eCAL
}
}

std::vector<char> CTcpClient::PackRequest(const std::string& request)
{
// create header
eCAL::STcpHeader tcp_header;
// set up package size
const size_t psize = request.size();
tcp_header.psize_n = htonl(static_cast<uint32_t>(psize));
// repack
std::vector<char> packed_request(sizeof(tcp_header) + psize);
memcpy(packed_request.data(), &tcp_header, sizeof(tcp_header));
memcpy(packed_request.data() + sizeof(tcp_header), request.data(), psize);
return packed_request;
}

bool CTcpClient::SendRequest(const std::string& request_)
{
size_t written(0);
Expand All @@ -177,10 +191,13 @@ namespace eCAL
m_socket->read_some(asio::buffer(resp_buffer));
}

std::vector<char> packed_request = PackRequest(request_);
std::string packed_request_str(packed_request.begin(), packed_request.end());

// send payload to server
while (written != request_.size())
while (written != packed_request.size())
{
auto bytes_written = m_socket->write_some(asio::buffer(request_.c_str() + written, request_.size() - written));
auto bytes_written = m_socket->write_some(asio::buffer(packed_request_str.c_str() + written, packed_request_str.size() - written));
written += bytes_written;
}
}
Expand Down
1 change: 1 addition & 0 deletions ecal/core/src/service/ecal_tcpclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ namespace eCAL
std::atomic<bool> m_async_request_in_progress;

private:
std::vector<char> PackRequest(const std::string &request);
bool SendRequest(const std::string &request_);
size_t ReceiveResponse(std::string &response_, int timeout_);
void ReceiveResponseAsync(AsyncCallbackT callback_, int timeout_);
Expand Down

0 comments on commit 03432f0

Please sign in to comment.