Skip to content

Commit

Permalink
fix(GCS+gRPC): handle offsets in Hash*::Update() (#14567)
Browse files Browse the repository at this point in the history
When reading data from GCS we don't necessarily know the initial offset.
Reading the last N bytes is expressed as a negative offset in the
request, and the returned offset is not known until the first message.
But once the offset is known, receiving out of order data indicates a
bug in the client library, and should be detected.

The code was assuming the initial offset was always 0.
  • Loading branch information
coryan authored Jul 25, 2024
1 parent ad1d045 commit ee21b70
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 69 deletions.
21 changes: 11 additions & 10 deletions google/cloud/storage/internal/hash_function_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using ::google::cloud::storage_internal::ExtendCrc32c;
template <typename Buffer>
bool AlreadyHashed(std::int64_t offset, Buffer const& buffer,
std::int64_t minimum_offset) {
// TODO(#14566) - maybe this is more forgiving than we want to be
auto const end = offset + buffer.size();
return static_cast<std::int64_t>(end) <= minimum_offset;
}
Expand Down Expand Up @@ -96,9 +97,9 @@ HashValues CompositeFunction::Finish() {
}

Status MD5HashFunction::Update(std::int64_t offset, absl::string_view buffer) {
if (offset == minimum_offset_) {
if (offset == minimum_offset_ || minimum_offset_ == 0) {
Update(buffer);
minimum_offset_ += buffer.size();
minimum_offset_ = offset + buffer.size();
return {};
}
if (AlreadyHashed(offset, buffer, minimum_offset_)) return {};
Expand All @@ -112,11 +113,11 @@ Status MD5HashFunction::Update(std::int64_t offset, absl::string_view buffer,

Status MD5HashFunction::Update(std::int64_t offset, absl::Cord const& buffer,
std::uint32_t /*buffer_crc*/) {
if (offset == minimum_offset_) {
if (offset == minimum_offset_ || minimum_offset_ == 0) {
for (auto i = buffer.chunk_begin(); i != buffer.chunk_end(); ++i) {
Update(*i);
}
minimum_offset_ += buffer.size();
minimum_offset_ = offset + buffer.size();
return {};
}
if (AlreadyHashed(offset, buffer, minimum_offset_)) return {};
Expand All @@ -136,9 +137,9 @@ void Crc32cHashFunction::Update(absl::string_view buffer) {

Status Crc32cHashFunction::Update(std::int64_t offset,
absl::string_view buffer) {
if (offset == minimum_offset_) {
if (offset == minimum_offset_ || minimum_offset_ == 0) {
Update(buffer);
minimum_offset_ += buffer.size();
minimum_offset_ = offset + buffer.size();
return {};
}
if (AlreadyHashed(offset, buffer, minimum_offset_)) return {};
Expand All @@ -147,9 +148,9 @@ Status Crc32cHashFunction::Update(std::int64_t offset,

Status Crc32cHashFunction::Update(std::int64_t offset, absl::string_view buffer,
std::uint32_t buffer_crc) {
if (offset == minimum_offset_) {
if (offset == minimum_offset_ || minimum_offset_ == 0) {
current_ = ExtendCrc32c(current_, buffer, buffer_crc);
minimum_offset_ += buffer.size();
minimum_offset_ = offset + buffer.size();
return {};
}
if (AlreadyHashed(offset, buffer, minimum_offset_)) return {};
Expand All @@ -158,9 +159,9 @@ Status Crc32cHashFunction::Update(std::int64_t offset, absl::string_view buffer,

Status Crc32cHashFunction::Update(std::int64_t offset, absl::Cord const& buffer,
std::uint32_t buffer_crc) {
if (offset == minimum_offset_) {
if (offset == minimum_offset_ || minimum_offset_ == 0) {
current_ = ExtendCrc32c(current_, buffer, buffer_crc);
minimum_offset_ += buffer.size();
minimum_offset_ = offset + buffer.size();
return {};
}
if (AlreadyHashed(offset, buffer, minimum_offset_)) return {};
Expand Down
186 changes: 127 additions & 59 deletions google/cloud/storage/internal/hash_function_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,44 +102,88 @@ TEST(HashFunctionImplTest, Crc32cStringView) {
EXPECT_THAT(actual.md5, IsEmpty());
}

TEST(HashFunctionImplTest, Crc32cStringViewWithCrc) {
TEST(HashFunctionImplTest, Crc32cStringViewOffsetJumps) {
Crc32cHashFunction function;
auto payload = absl::string_view{kQuickFox};
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.substr(pos, 5);
auto const message_crc = storage_internal::Crc32c(message);
EXPECT_STATUS_OK(function.Update(pos, message, message_crc));
EXPECT_STATUS_OK(function.Update(pos, message, message_crc));
EXPECT_THAT(function.Update(pos, payload, message_crc),
StatusIs(StatusCode::kInvalidArgument));
}
auto actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());
EXPECT_STATUS_OK(function.Update(1024, payload));
EXPECT_STATUS_OK(function.Update(1024 + payload.size(), payload));
EXPECT_THAT(function.Update(2048, payload),
StatusIs(StatusCode::kInvalidArgument));
EXPECT_THAT(function.Update(2048, payload, storage_internal::Crc32c(payload)),
StatusIs(StatusCode::kInvalidArgument));
}

actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());
TEST(HashFunctionImplTest, Crc32cStringViewOffsetRewinds) {
Crc32cHashFunction function;
auto payload = absl::string_view{kQuickFox};
EXPECT_STATUS_OK(function.Update(2048, payload));
EXPECT_STATUS_OK(function.Update(1024, payload));
EXPECT_STATUS_OK(
function.Update(1024, payload, storage_internal::Crc32c(payload)));
}

TEST(HashFunctionImplTest, Crc32cStringViewWithCrc) {
for (auto const offset : {0, 1024, 10240}) {
SCOPED_TRACE("Testing with offset: " + std::to_string(offset));
Crc32cHashFunction function;
auto payload = absl::string_view{kQuickFox};
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.substr(pos, 5);
auto const message_crc = storage_internal::Crc32c(message);
EXPECT_STATUS_OK(function.Update(offset + pos, message, message_crc));
EXPECT_STATUS_OK(function.Update(offset + pos, message, message_crc));
EXPECT_THAT(function.Update(offset + pos, payload, message_crc),
StatusIs(StatusCode::kInvalidArgument));
}
auto actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());

actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());
}
}

TEST(HashFunctionImplTest, Crc32cCord) {
for (auto const offset : {0, 1024, 10240}) {
SCOPED_TRACE("Testing with offset: " + std::to_string(offset));
Crc32cHashFunction function;
auto payload = absl::Cord(absl::string_view{kQuickFox});
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.Subcord(pos, 5);
auto const message_crc = storage_internal::Crc32c(message);
EXPECT_STATUS_OK(function.Update(offset + pos, message, message_crc));
EXPECT_STATUS_OK(function.Update(offset + pos, message, message_crc));
EXPECT_THAT(function.Update(offset + pos, payload, message_crc),
StatusIs(StatusCode::kInvalidArgument));
}
auto actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());

actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());
}
}

TEST(HashFunctionImplTest, Crc32cCordOffsetJumps) {
Crc32cHashFunction function;
auto payload = absl::Cord(absl::string_view{kQuickFox});
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.Subcord(pos, 5);
auto const message_crc = storage_internal::Crc32c(message);
EXPECT_STATUS_OK(function.Update(pos, message, message_crc));
EXPECT_STATUS_OK(function.Update(pos, message, message_crc));
EXPECT_THAT(function.Update(pos, payload, message_crc),
StatusIs(StatusCode::kInvalidArgument));
}
auto actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());
auto const crc = storage_internal::Crc32c(payload);
EXPECT_STATUS_OK(function.Update(1024, payload, crc));
EXPECT_STATUS_OK(function.Update(1024 + payload.size(), payload, crc));
EXPECT_THAT(function.Update(2048, payload, crc),
StatusIs(StatusCode::kInvalidArgument));
}

actual = function.Finish();
EXPECT_EQ(actual.crc32c, kQuickFoxCrc32cChecksum);
EXPECT_THAT(actual.md5, IsEmpty());
TEST(HashFunctionImplTest, Crc32cCordOffsetRewinds) {
Crc32cHashFunction function;
auto payload = absl::Cord(absl::string_view{kQuickFox});
auto const crc = storage_internal::Crc32c(payload);
EXPECT_STATUS_OK(function.Update(2048, payload, crc));
EXPECT_STATUS_OK(function.Update(1024, payload, crc));
}

TEST(HashFunctionImplTest, MD5Empty) {
Expand Down Expand Up @@ -179,43 +223,67 @@ TEST(HashFunctionImplTest, MD5StringView) {
}

TEST(HashFunctionImplTest, MD5StringViewWithCrc) {
auto function = MD5HashFunction::Create();
auto payload = absl::string_view{kQuickFox};
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.substr(pos, 5);
auto const unused = std::uint32_t{0};
EXPECT_STATUS_OK(function->Update(pos, message, unused));
EXPECT_STATUS_OK(function->Update(pos, message, unused));
EXPECT_THAT(function->Update(pos, payload, unused),
StatusIs(StatusCode::kInvalidArgument));
for (auto const offset : {0, 1024, 10240}) {
SCOPED_TRACE("Testing with offset: " + std::to_string(offset));
auto function = MD5HashFunction::Create();
auto payload = absl::string_view{kQuickFox};
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.substr(pos, 5);
auto const unused = std::uint32_t{0};
EXPECT_STATUS_OK(function->Update(offset + pos, message, unused));
EXPECT_STATUS_OK(function->Update(offset + pos, message, unused));
EXPECT_THAT(function->Update(offset + pos, payload, unused),
StatusIs(StatusCode::kInvalidArgument));
}
auto actual = function->Finish();
EXPECT_THAT(actual.crc32c, IsEmpty());
EXPECT_THAT(actual.md5, kQuickFoxMD5Hash);

actual = function->Finish();
EXPECT_THAT(actual.crc32c, IsEmpty());
EXPECT_THAT(actual.md5, kQuickFoxMD5Hash);
}
auto actual = function->Finish();
EXPECT_THAT(actual.crc32c, IsEmpty());
EXPECT_THAT(actual.md5, kQuickFoxMD5Hash);

actual = function->Finish();
EXPECT_THAT(actual.crc32c, IsEmpty());
EXPECT_THAT(actual.md5, kQuickFoxMD5Hash);
}

TEST(HashFunctionImplTest, MD5Cord) {
for (auto const offset : {0, 1024, 10240}) {
SCOPED_TRACE("Testing with offset: " + std::to_string(offset));
auto function = MD5HashFunction::Create();
auto payload = absl::Cord(absl::string_view{kQuickFox});
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.Subcord(pos, 5);
auto const unused = std::uint32_t{0};
EXPECT_STATUS_OK(function->Update(offset + pos, message, unused));
EXPECT_STATUS_OK(function->Update(offset + pos, message, unused));
EXPECT_THAT(function->Update(offset + pos, payload, unused),
StatusIs(StatusCode::kInvalidArgument));
}
auto const actual = function->Finish();
EXPECT_THAT(actual.crc32c, IsEmpty());
EXPECT_THAT(actual.md5, kQuickFoxMD5Hash);

auto const a2 = function->Finish();
EXPECT_THAT(a2.crc32c, IsEmpty());
EXPECT_THAT(a2.md5, kQuickFoxMD5Hash);
}
}

TEST(HashFunctionImplTest, MD5CordViewOffsetJumps) {
auto function = MD5HashFunction::Create();
auto payload = absl::Cord(absl::string_view{kQuickFox});
for (std::size_t pos = 0; pos < payload.size(); pos += 5) {
auto message = payload.Subcord(pos, 5);
auto const unused = std::uint32_t{0};
EXPECT_STATUS_OK(function->Update(pos, message, unused));
EXPECT_STATUS_OK(function->Update(pos, message, unused));
EXPECT_THAT(function->Update(pos, payload, unused),
StatusIs(StatusCode::kInvalidArgument));
}
auto const actual = function->Finish();
EXPECT_THAT(actual.crc32c, IsEmpty());
EXPECT_THAT(actual.md5, kQuickFoxMD5Hash);
auto const crc = storage_internal::Crc32c(payload);
EXPECT_STATUS_OK(function->Update(1024, payload, crc));
EXPECT_STATUS_OK(function->Update(1024 + payload.size(), payload, crc));
EXPECT_THAT(function->Update(2048, payload, crc),
StatusIs(StatusCode::kInvalidArgument));
}

auto const a2 = function->Finish();
EXPECT_THAT(a2.crc32c, IsEmpty());
EXPECT_THAT(a2.md5, kQuickFoxMD5Hash);
TEST(HashFunctionImplTest, MD5CordOffsetRewinds) {
auto function = MD5HashFunction::Create();
auto payload = absl::Cord(absl::string_view{kQuickFox});
auto const crc = storage_internal::Crc32c(payload);
EXPECT_STATUS_OK(function->Update(2048, payload, crc));
EXPECT_STATUS_OK(function->Update(1024, payload, crc));
}

TEST(HashFunctionImplTest, CompositeEmpty) {
Expand Down

0 comments on commit ee21b70

Please sign in to comment.