Skip to content

[DO NOT MERGE] reproduce backup error #11966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions fdbclient/S3BlobStore.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

#include "fdbclient/ClientKnobs.h"
#include "fdbclient/Knobs.h"
#include "flow/Error.h"
#include "flow/IConnection.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/genericactors.actor.h"
Expand Down Expand Up @@ -1005,8 +1007,9 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
state int maxTries = std::min(bstore->knobs.request_tries, bstore->knobs.connect_tries);
state int thisTry = 1;
state int badRequestCode = 400;
state bool s3TokenError = false;
state bool s3TokenError = true;
state double nextRetryDelay = 2.0;
state bool hasDoneError = false;

loop {
state Optional<Error> err;
Expand Down Expand Up @@ -1083,8 +1086,13 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
.detail("V4", CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER)
.log();
wait(bstore->requestRate->getAllowance(1));
Future<Reference<HTTP::IncomingResponse>> dryrunResponse = HTTP::doRequest(
rconn.conn, dryrunRequest, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate);
Future<Reference<HTTP::IncomingResponse>> dryrunResponse =
HTTP::doRequest(rconn.conn,
dryrunRequest,
bstore->sendRate,
&bstore->s_stats.bytes_sent,
bstore->recvRate,
&hasDoneError);
Reference<HTTP::IncomingResponse> _dryrunR = wait(timeoutError(dryrunResponse, requestTimeout));
dryrunR = _dryrunR;
std::string s3Error = parseErrorCodeFromS3(dryrunR->data.content);
Expand Down Expand Up @@ -1114,7 +1122,8 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
}
} catch (Error& e) {
// retry with GET failed, but continue to do original request anyway
TraceEvent(SevError, "ErrorDuringRetryS3TokenIssue").errorUnsuppressed(e);
TraceEvent(SevWarn, "ErrorDuringRetryS3TokenIssue").errorUnsuppressed(e);
// continue;
}
setHeaders(bstore, req);
req->resource = getCanonicalURI(bstore, req);
Expand Down Expand Up @@ -1153,7 +1162,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
if (e.code() == error_code_actor_cancelled)
throw;
// TODO: should this also do rconn.conn.clear()? (would need to extend lifetime outside of try block)
err = e;
err = e; // ROOT CAUSE: If there is any error happens to HTTP::doRequest, we do not return the conn.
}

double end = g_network->timer();
Expand Down
10 changes: 6 additions & 4 deletions fdbrpc/HTTP.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequestActor(Reference<IConnec
Reference<OutgoingRequest> request,
Reference<IRateControl> sendRate,
int64_t* pSent,
Reference<IRateControl> recvRate) {
Reference<IRateControl> recvRate,
bool* hasDoneError = nullptr) {
state TraceEvent event(SevDebug, "HTTPRequest");

// There is no standard http request id header field, so either a global default can be set via a knob
Expand Down Expand Up @@ -650,7 +651,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequestActor(Reference<IConnec
trySend = deterministicRandom()->randomInt(1, 10);
}
wait(sendRate->getAllowance(trySend));
int len = conn->write(request->data.content->getUnsent(), trySend);
int len = conn->write(request->data.content->getUnsent(), trySend, hasDoneError);
if (pSent != nullptr)
*pSent += len;
sendRate->returnUnused(trySend - len);
Expand Down Expand Up @@ -757,8 +758,9 @@ Future<Reference<IncomingResponse>> doRequest(Reference<IConnection> conn,
Reference<OutgoingRequest> request,
Reference<IRateControl> sendRate,
int64_t* pSent,
Reference<IRateControl> recvRate) {
return doRequestActor(conn, request, sendRate, pSent, recvRate);
Reference<IRateControl> recvRate,
bool* hasDoneError) {
return doRequestActor(conn, request, sendRate, pSent, recvRate, hasDoneError);
}

ACTOR Future<Void> sendProxyConnectRequest(Reference<IConnection> conn,
Expand Down
6 changes: 3 additions & 3 deletions fdbrpc/HTTPServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doHelloWorldReq(Reference<IConne
pw.serializeBytes(hello);

Reference<HTTP::IncomingResponse> response =
wait(timeoutError(HTTP::doRequest(conn, req, sendReceiveRate, &bytes_sent, sendReceiveRate), 30.0));
wait(timeoutError(HTTP::doRequest(conn, req, sendReceiveRate, &bytes_sent, sendReceiveRate, nullptr), 30.0));

std::string expectedContent = "Hello World Response!";

Expand Down Expand Up @@ -367,7 +367,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doHelloWorldErrorReq(Reference<I
req->data.contentLen = 0;

Reference<HTTP::IncomingResponse> response =
wait(timeoutError(HTTP::doRequest(conn, req, sendReceiveRate, &bytes_sent, sendReceiveRate), 30.0));
wait(timeoutError(HTTP::doRequest(conn, req, sendReceiveRate, &bytes_sent, sendReceiveRate, nullptr), 30.0));

ASSERT(response->code == 500);

Expand All @@ -393,7 +393,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doHelloBadMD5Req(Reference<IConn
pw.serializeBytes(hello);

Reference<HTTP::IncomingResponse> response =
wait(timeoutError(HTTP::doRequest(conn, req, sendReceiveRate, &bytes_sent, sendReceiveRate), 30.0));
wait(timeoutError(HTTP::doRequest(conn, req, sendReceiveRate, &bytes_sent, sendReceiveRate, nullptr), 30.0));

// should have gotten error
ASSERT(false);
Expand Down
2 changes: 1 addition & 1 deletion fdbrpc/SimExternalConnection.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int SimExternalConnection::read(uint8_t* begin, uint8_t* end) {
return toRead;
}

int SimExternalConnection::write(SendBuffer const* buffer, int limit) {
int SimExternalConnection::write(SendBuffer const* buffer, int limit, bool* hasDoneError) {
boost::system::error_code err;
bool triggerReaders = (socket.available() == 0);
int bytesSent = socket.write_some(
Expand Down
3 changes: 2 additions & 1 deletion fdbrpc/include/fdbrpc/HTTP.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ Future<Reference<IncomingResponse>> doRequest(Reference<IConnection> conn,
Reference<OutgoingRequest> request,
Reference<IRateControl> sendRate,
int64_t* pSent,
Reference<IRateControl> recvRate);
Reference<IRateControl> recvRate,
bool* hasDoneError = nullptr);

// Connect to proxy, send CONNECT command, and connect to the remote host.
Future<Reference<IConnection>> proxyConnect(const std::string& remoteHost,
Expand Down
2 changes: 1 addition & 1 deletion fdbrpc/include/fdbrpc/SimExternalConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SimExternalConnection final : public IConnection, public ReferenceCounted<
Future<Void> onWritable() override;
Future<Void> onReadable() override;
int read(uint8_t* begin, uint8_t* end) override;
int write(SendBuffer const* buffer, int limit) override;
int write(SendBuffer const* buffer, int limit, bool* hasDoneError = nullptr) override;
NetworkAddress getPeerAddress() const override;
bool hasTrustedPeer() const override;
UID getDebugID() const override;
Expand Down
2 changes: 1 addition & 1 deletion fdbrpc/sim2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {

// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of
// bytes written (might be 0) (or may throw an error if the connection dies)
int write(SendBuffer const* buffer, int limit) override {
int write(SendBuffer const* buffer, int limit, bool* hasDoneError = nullptr) override {
rollRandomClose();
ASSERT(limit > 0);

Expand Down
16 changes: 13 additions & 3 deletions flow/Net2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,13 +489,18 @@ class Connection final : public IConnection, ReferenceCounted<Connection> {

// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of
// bytes written (might be 0)
int write(SendBuffer const* data, int limit) override {
int write(SendBuffer const* data, int limit, bool* hasDoneError = nullptr) override {
boost::system::error_code err;
++g_net2->countWrites;

size_t sent = socket.write_some(
boost::iterator_range<SendBufferIterator>(SendBufferIterator(data, limit), SendBufferIterator()), err);

if (hasDoneError != nullptr && !(*hasDoneError)) {
*hasDoneError = true;
err = boost::system::error_code(boost::system::errc::operation_canceled, boost::system::generic_category());
}

if (err) {
// Since there was an error, sent's value can't be used to infer that the buffer has data and the limit is
// positive so check explicitly.
Expand Down Expand Up @@ -1124,7 +1129,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>

// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of
// bytes written (might be 0)
int write(SendBuffer const* data, int limit) override {
int write(SendBuffer const* data, int limit, bool* hasDoneError = nullptr) override {
#ifdef __APPLE__
// For some reason, writing ssl_sock with more than 2016 bytes when socket is writeable sometimes results in a
// broken pipe error.
Expand All @@ -1136,6 +1141,11 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
size_t sent = ssl_sock.write_some(
boost::iterator_range<SendBufferIterator>(SendBufferIterator(data, limit), SendBufferIterator()), err);

if (hasDoneError != nullptr && !(*hasDoneError)) {
*hasDoneError = true;
err = boost::system::error_code(boost::system::errc::operation_canceled, boost::system::generic_category());
}

if (err) {
// Since there was an error, sent's value can't be used to infer that the buffer has data and the limit is
// positive so check explicitly.
Expand Down Expand Up @@ -2296,7 +2306,7 @@ TEST_CASE("noSim/flow/Net2/onMainThreadFIFO") {
return Void();
}

void net2_test(){
void net2_test() {
/*
g_network = newNet2(); // for promise serialization below

Expand Down
4 changes: 3 additions & 1 deletion flow/include/flow/IConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ class IConnection {
// entire byte set that the caller was attempting to write even if it is unable to write all of it immediately. Due
// to limitations of TLSConnection, callers must also avoid reallocations that reduce the amount of written data in
// the first buffer in the chain.
virtual int write(SendBuffer const* buffer, int limit = std::numeric_limits<int>::max()) = 0;
virtual int write(SendBuffer const* buffer,
int limit = std::numeric_limits<int>::max(),
bool* hasDoneError = nullptr) = 0;

// Returns the network address and port of the other end of the connection. In the case of an incoming connection,
// this may not be an address we can connect to!
Expand Down