Skip to content

Commit

Permalink
Merge pull request #2867 from chenBright/tcp_user_timeout
Browse files Browse the repository at this point in the history
Support tcp user timeout of client
  • Loading branch information
yanglimingcn authored Jan 14, 2025
2 parents 4c33f88 + 609676d commit 47c1228
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 194 deletions.
17 changes: 14 additions & 3 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,20 @@ DEFINE_bool(socket_keepalive, false,
"Enable keepalive of sockets if this value is true");

DEFINE_int32(socket_keepalive_idle_s, -1,
"Set idle time of sockets before keepalive if this value is positive");
"Set idle time for socket keepalive in seconds if this value is positive");

DEFINE_int32(socket_keepalive_interval_s, -1,
"Set interval of sockets between keepalives if this value is positive");
"Set interval between keepalives in seconds if this value is positive");

DEFINE_int32(socket_keepalive_count, -1,
"Set number of keepalives of sockets before close if this value is positive");
"Set number of keepalives before death if this value is positive");

// Default TCP_USER_TIMEOUT (milliseconds) copied into SocketOptions by
// InputMessenger::Create. Socket only applies the option when the value
// is positive, so the default of -1 leaves the kernel setting untouched.
DEFINE_int32(socket_tcp_user_timeout_ms, -1,
"If this value is positive, set number of milliseconds that transmitted "
"data may remain unacknowledged, or buffered data may remain untransmitted "
"(due to zero window size) before TCP will forcibly close the corresponding "
"connection and return ETIMEDOUT to the application. Only linux supports "
"TCP_USER_TIMEOUT.");

DECLARE_bool(usercode_in_pthread);
DECLARE_bool(usercode_in_coroutine);
Expand Down Expand Up @@ -501,6 +508,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
options.keepalive_options->keepalive_count
= FLAGS_socket_keepalive_count;
}
options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms;
return Socket::Create(options, id);
}

Expand Down Expand Up @@ -535,6 +543,9 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) {
= FLAGS_socket_keepalive_count;
}
}
if (options.tcp_user_timeout_ms <= 0) {
options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms;
}
return Socket::Create(options, id);
}

Expand Down
49 changes: 35 additions & 14 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ Socket::Socket(Forbidden f)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
, _tcp_user_timeout_ms(-1)
, _http_request_method(HTTP_METHOD_GET) {
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
Expand Down Expand Up @@ -597,6 +598,21 @@ int Socket::ResetFileDescriptor(int fd) {
// turn off nagling.
// OK to fail, namely unix domain socket does not support this.
butil::make_no_delay(fd);

SetSocketOptions(fd);

if (_on_edge_triggered_events) {
if (_io_event.AddConsumer(fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
}
}
return 0;
}

void Socket::SetSocketOptions(int fd) {
if (_tos > 0 &&
setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
Expand All @@ -618,27 +634,21 @@ int Socket::ResetFileDescriptor(int fd) {
}
}

EnableKeepaliveIfNeeded(fd);

if (_on_edge_triggered_events) {
if (_io_event.AddConsumer(fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
#if defined(OS_LINUX)
if (_tcp_user_timeout_ms > 0) {
if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT,
&_tcp_user_timeout_ms, sizeof(_tcp_user_timeout_ms)) != 0) {
PLOG(ERROR) << "Fail to set TCP_USER_TIMEOUT of fd=" << fd;
}
}
return 0;
}
#endif

void Socket::EnableKeepaliveIfNeeded(int fd) {
if (!_keepalive_options) {
return;
}

int keepalive = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
sizeof(keepalive)) != 0) {
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)) != 0) {
PLOG(ERROR) << "Fail to set keepalive of fd=" << fd;
return;
}
Expand Down Expand Up @@ -782,6 +792,7 @@ int Socket::OnCreated(const SocketOptions& options) {
_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
_unwritten_bytes.store(0, butil::memory_order_relaxed);
_keepalive_options = options.keepalive_options;
_tcp_user_timeout_ms = options.tcp_user_timeout_ms;
CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
_is_write_shutdown = false;
int fd = options.fd;
Expand Down Expand Up @@ -1388,7 +1399,7 @@ int Socket::CheckConnected(int sockfd) {
butil::EndPoint local_point;
CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
LOG(INFO) << "Connected to " << remote_side()
<< " via fd=" << (int)sockfd << " SocketId=" << id()
<< " via fd=" << sockfd << " SocketId=" << id()
<< " local_side=" << local_point;
}

Expand Down Expand Up @@ -2501,6 +2512,16 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
#endif
}

#if defined(OS_LINUX)
{
int tcp_user_timeout = 0;
socklen_t len = sizeof(tcp_user_timeout);
if (getsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, &len) == 0) {
os << "\ntcp_user_timeout=" << tcp_user_timeout;
}
}
#endif

#if defined(OS_MACOSX)
struct tcp_connection_info ti;
socklen_t len = sizeof(ti);
Expand Down
52 changes: 29 additions & 23 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,60 +234,57 @@ struct SocketSSLContext {
};

// Tuning values for TCP keepalive on a socket.
// Every field defaults to -1. The corresponding gflags
// (socket_keepalive_idle_s / _interval_s / _count) describe each value
// as taking effect only when it is positive.
struct SocketKeepaliveOptions {
SocketKeepaliveOptions()
: keepalive_idle_s(-1)
, keepalive_interval_s(-1)
, keepalive_count(-1)
{}
// Start keepalives after the connection has been idle for this
// period, in seconds.
int keepalive_idle_s;
int keepalive_idle_s{-1};
// Interval between keepalives, in seconds.
int keepalive_interval_s;
int keepalive_interval_s{-1};
// Number of keepalives before death.
int keepalive_count;
int keepalive_count{-1};
};

// TODO: Comment fields
// Parameters consumed when a Socket is created (passed to Socket::Create).
struct SocketOptions {
SocketOptions();

// If `fd' is non-negative, set `fd' to be non-blocking and take the
// ownership. Socket will close the fd(if needed) and call
// user->BeforeRecycle() before recycling.
int fd;
int fd{-1};
// Address of the peer that a client connection is established to
// (see `connect_on_create').
butil::EndPoint remote_side;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
// Default: false, means that a connection will be established
// on first write.
bool connect_on_create;
bool connect_on_create{false};
// Default: NULL, means no timeout.
const timespec* connect_abstime;
const timespec* connect_abstime{NULL};
SocketUser* user;
SocketUser* user{NULL};
// When *edge-triggered* events happen on the file descriptor, callback
// `on_edge_triggered_events' will be called. Inside the callback, user
// shall read fd() in non-blocking mode until all data has been read
// or EAGAIN is met, otherwise the callback will not be called again
// until new data arrives. The callback will not be called from more than
// one thread at any time.
void (*on_edge_triggered_events)(Socket*);
void (*on_edge_triggered_events)(Socket*){NULL};
int health_check_interval_s;
int health_check_interval_s{-1};
// Only accept ssl connection.
bool force_ssl;
bool force_ssl{false};
std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
bool use_rdma;
bool use_rdma{false};
bthread_keytable_pool_t* keytable_pool;
bthread_keytable_pool_t* keytable_pool{NULL};
SocketConnection* conn;
SocketConnection* conn{NULL};
std::shared_ptr<AppConnect> app_connect;
// The created socket will set parsing_context with this value.
Destroyable* initial_parsing_context;
Destroyable* initial_parsing_context{NULL};

// Socket keepalive related options.
// Refer to `SocketKeepaliveOptions' for details.
// Non-NULL means that keepalive is on.
std::shared_ptr<SocketKeepaliveOptions> keepalive_options;
// TCP_USER_TIMEOUT in milliseconds: the maximum time that transmitted
// data may remain unacknowledged, or buffered data may remain
// untransmitted, before TCP forcibly closes the connection and returns
// ETIMEDOUT. Socket only sets the option when the value is positive.
// Background:
// https://github.com/apache/brpc/issues/1154
// https://github.com/grpc/grpc/pull/16419/files
// Only linux supports TCP_USER_TIMEOUT.
int tcp_user_timeout_ms{ -1};
// Tag of this socket
bthread_tag_t bthread_tag;
bthread_tag_t bthread_tag{BTHREAD_TAG_DEFAULT};
};

// Abstractions on reading from and writing into file descriptors.
Expand Down Expand Up @@ -725,7 +722,7 @@ friend void DereferenceSocket(Socket*);

int ResetFileDescriptor(int fd);

void EnableKeepaliveIfNeeded(int fd);
void SetSocketOptions(int fd);

// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
Expand Down Expand Up @@ -973,6 +970,15 @@ friend void DereferenceSocket(Socket*);
// non-NULL means that keepalive is on.
std::shared_ptr<SocketKeepaliveOptions> _keepalive_options;

// Only linux supports TCP_USER_TIMEOUT.
// When the value is greater than 0, it specifies the maximum
// amount of time in milliseconds that transmitted data may
// remain unacknowledged, or buffered data may remain
// untransmitted (due to zero window size) before TCP will
// forcibly close the corresponding connection and return
// ETIMEDOUT to the application.
int _tcp_user_timeout_ms;

HttpMethod _http_request_method;
};

Expand Down
15 changes: 0 additions & 15 deletions src/brpc/socket_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,6 @@

namespace brpc {

// Default constructor: gives every listed member its "unset" default
// (fd -1, pointers NULL, booleans false, health check interval -1,
// bthread tag BTHREAD_TAG_DEFAULT). Members that are not listed here
// are left to their own default initialization.
inline SocketOptions::SocketOptions()
: fd(-1)
, connect_on_create(false)
, connect_abstime(NULL)
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
, force_ssl(false)
, use_rdma(false)
, keytable_pool(NULL)
, conn(NULL)
, app_connect(NULL)
, initial_parsing_context(NULL)
, bthread_tag(BTHREAD_TAG_DEFAULT) {}

inline bool Socket::MoreReadEvents(int* progress) {
// Fail to CAS means that new events arrived.
return !_nevent.compare_exchange_strong(
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/versioned_ref_with_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class VersionedRefWithId {
// Create a VersionedRefWithId, put the identifier into `id'.
// `args' will be passed to OnCreated() directly.
// Returns 0 on success, -1 otherwise.
template<typename ... Args>
template<typename... Args>
static int Create(VRefId* id, Args&&... args);

// Place the VersionedRefWithId associated with identifier `id' into
Expand Down Expand Up @@ -350,7 +350,7 @@ void DereferenceVersionedRefWithId(T* r) {
}

template <typename T>
template<typename ... Args>
template<typename... Args>
int VersionedRefWithId<T>::Create(VRefId* id, Args&&... args) {
resource_id_t slot;
T* const t = butil::get_resource(&slot, Forbidden());
Expand Down
Loading

0 comments on commit 47c1228

Please sign in to comment.