Skip to content

Commit

Permalink
Adds a buffer that has a tolerance.
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimbres committed Aug 7, 2023
1 parent 81927de commit f506e1b
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 9 deletions.
99 changes: 99 additions & 0 deletions include/boost/redis/detail/buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#ifndef BOOST_REDIS_BUFFER_HPP
#define BOOST_REDIS_BUFFER_HPP

#include <limits>
#include <string>
#include <algorithm>

#include <boost/asio/buffer.hpp>
#include <boost/throw_exception.hpp>

namespace boost::redis::detail
{

template <class Elem, class Traits, class Allocator>
class dynamic_string_buffer {
public:
using const_buffers_type = asio::const_buffer;
using mutable_buffers_type = asio::mutable_buffer;

explicit
dynamic_string_buffer(
std::basic_string<Elem, Traits, Allocator>& s,
std::size_t maximum_size) noexcept
: string_(s)
, max_size_(maximum_size)
{
}

void clear()
{
consumed_ = 0;
}

std::size_t size() const noexcept
{
return (std::min)(string_.size() - consumed_, max_size_);
}

bool empty() const noexcept
{
return size() == 0;
}

std::size_t max_size() const noexcept
{
return max_size_;
}

char const& front() const noexcept
{
return string_.at(consumed_);
}

std::size_t capacity() const noexcept
{
return (std::min)(string_.capacity(), max_size());
}

mutable_buffers_type data(std::size_t pos, std::size_t n) noexcept
{
return mutable_buffers_type(asio::buffer(asio::buffer(string_, max_size_) + pos + consumed_, n));
}

const_buffers_type data(std::size_t pos, std::size_t n) const noexcept
{
return const_buffers_type(asio::buffer(asio::buffer(string_, max_size_) + pos + consumed_, n));
}

void grow(std::size_t n)
{
if (string_.size() > max_size_ || (string_.size() + n) > max_size_)
BOOST_THROW_EXCEPTION(std::length_error{"dynamic_string_buffer too long"});

string_.resize(string_.size() + n);
}

void shrink(std::size_t n)
{
string_.resize(n > (string_.size() - consumed_) ? consumed_ : string_.size() - n);
}

void consume(std::size_t n, std::size_t tolerance = 100'000)
{
consumed_ += n;
if (consumed_ > tolerance) {
string_.erase(0, consumed_);
consumed_ = 0;
}
}

private:
std::basic_string<Elem, Traits, Allocator>& string_;
std::size_t consumed_ = 0;
std::size_t const max_size_;
};

}

#endif // BOOST_REDIS_BUFFER_HPP
24 changes: 16 additions & 8 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/detail/buffer.hpp>

#include <boost/system.hpp>
#include <boost/asio/basic_stream_socket.hpp>
Expand Down Expand Up @@ -111,7 +112,7 @@ class read_next_op {
// If we detect a push in the middle of a request we have
// to hand it to the push consumer. To do that we need
// some data in the read bufer.
if (conn_->read_buffer_.empty()) {
if (std::empty(conn_->dbuf_)) {

if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
Expand All @@ -130,7 +131,7 @@ class read_next_op {

// If the next request is a push we have to handle it to
// the receive_op wait for it to be done and continue.
if (resp3::to_type(conn_->read_buffer_.front()) == resp3::type::push) {
if (resp3::to_type(conn_->dbuf_.front()) == resp3::type::push) {
BOOST_ASIO_CORO_YIELD
conn_->async_wait_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
Expand All @@ -154,7 +155,7 @@ class read_next_op {
return;
}

conn_->dbuf_.consume(n);
conn_->consume(n);
read_size_ += n;

BOOST_ASSERT(cmds_ != 0);
Expand Down Expand Up @@ -204,7 +205,7 @@ struct receive_op {
return;
}

conn_->dbuf_.consume(n);
conn_->consume(n);

if (!conn_->is_next_push()) {
conn_->read_op_timer_.cancel();
Expand Down Expand Up @@ -329,6 +330,7 @@ struct run_op {
{
conn->write_buffer_.clear();
conn->read_buffer_.clear();
conn->dbuf_.clear();

BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
Expand Down Expand Up @@ -408,7 +410,7 @@ struct reader_op {
bool as_push() const
{
return
(resp3::to_type(conn->read_buffer_.front()) == resp3::type::push)
(resp3::to_type(conn->dbuf_.front()) == resp3::type::push)
|| conn->reqs_.empty()
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)
|| !conn->is_waiting_response(); // Added to deal with MONITOR.
Expand Down Expand Up @@ -457,7 +459,7 @@ struct reader_op {
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
//
BOOST_ASSERT(!conn->read_buffer_.empty());
BOOST_ASSERT(!conn->dbuf_.empty());
if (as_push()) {
BOOST_ASIO_CORO_YIELD
conn->async_wait_receive(std::move(self));
Expand Down Expand Up @@ -890,12 +892,17 @@ class connection_base {

bool is_next_push() const noexcept
{
return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
return !dbuf_.empty() && (resp3::to_type(dbuf_.front()) == resp3::type::push);
}

auto is_open() const noexcept { return stream_->next_layer().is_open(); }
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }

void consume(std::size_t n)
{
dbuf_.consume(n, 1'000'000);
}

asio::ssl::context ctx_;
std::unique_ptr<next_layer_type> stream_;

Expand All @@ -907,8 +914,9 @@ class connection_base {
timer_type read_op_timer_;
runner_type runner_;

using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
using dyn_buffer_type = redis::detail::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;

std::size_t const read_buffer_consume_tolerance_ = 1'000'000; // 1Mb
std::string read_buffer_;
dyn_buffer_type dbuf_;
std::string write_buffer_;
Expand Down
2 changes: 1 addition & 1 deletion tests/test_conn_echo_stress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ auto async_echo_stress() -> net::awaitable<void>
int const sessions = 500;

// The number of pings that will be sent by each session.
int const msgs = 1000;
int const msgs = 1'000;

// The number of publishes that will be sent by each session with
// each message.
Expand Down

0 comments on commit f506e1b

Please sign in to comment.