Skip to content

Commit

Permalink
fix(GCS+gRPC): no hash validation for ranged reads
Browse files Browse the repository at this point in the history
Also cleaned up the integration test a bit, as the number of tests has
grown.
  • Loading branch information
coryan committed Jul 25, 2024
1 parent 00b80da commit c6777fe
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 23 deletions.
7 changes: 6 additions & 1 deletion google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ std::unique_ptr<storage::internal::HashFunction> CreateHashFunction(
}

std::unique_ptr<storage::internal::HashValidator> CreateHashValidator(
google::storage::v2::ReadObjectRequest const& request,
Options const& options) {
auto const is_ranged_read =
request.read_limit() != 0 || request.read_offset() != 0;
if (is_ranged_read) return storage::internal::CreateNullHashValidator();

auto const enable_crc32c =
options.get<storage_experimental::EnableCrc32cValidationOption>();
auto const enable_md5 =
Expand Down Expand Up @@ -192,7 +197,7 @@ AsyncConnectionImpl::ReadObject(ReadObjectParams p) {
auto hash_function =
std::make_shared<storage::internal::Crc32cMessageHashFunction>(
CreateHashFunction(*current));
auto hash_validator = CreateHashValidator(*current);
auto hash_validator = CreateHashValidator(p.request, *current);

auto connection_factory = MakeReaderConnectionFactory(
std::move(current), std::move(p.request), hash_function);
Expand Down
171 changes: 171 additions & 0 deletions google/cloud/storage/internal/async/connection_impl_read_hash_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace {

using ::google::cloud::storage::testing::MockAsyncObjectMediaStream;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::IsOk;
using ::google::cloud::testing_util::StatusIs;
using ::google::protobuf::TextFormat;
using ::testing::_;
Expand Down Expand Up @@ -315,6 +316,176 @@ TEST_P(AsyncConnectionImplReadHashTest, ValidateFullChecksums) {
VariantWith<Status>(StatusIs(param.expected_status_code)));
}

// Verifies that a read starting at a non-zero offset completes with an OK
// status for every checksum parameterization of this suite.
//
// The mocked stream returns one response carrying data plus whatever object
// checksums `GeneratedObjectChecksums(param)` produces, then signals end of
// stream. The request sets `read_offset` to 1, and the test expects the final
// `Read()` to yield `Status` OK.
// NOTE(review): presumably some parameterizations supply object checksums that
// do not match the data, which is what makes this test meaningful -- confirm
// against the test parameters.
TEST_P(AsyncConnectionImplReadHashTest, ReadFromOffsetNoValidate) {
auto const& param = GetParam();

// Each mocked stream operation parks a named entry in `sequencer`; the test
// body below releases them one at a time to control the order of events.
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();
EXPECT_CALL(*mock, AsyncReadObject).WillOnce([&] {
auto stream = std::make_unique<MockAsyncObjectMediaStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});
EXPECT_CALL(*stream, Read)
.WillOnce([&] {
// First Read(): a response with object metadata, checksummed content, and
// the object checksums selected by the test parameter.
return sequencer.PushBack("Read").then([&](auto) {
auto response = google::storage::v2::ReadObjectResponse{};
auto constexpr kObject = R"pb(
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
generation: 123456
)pb";
EXPECT_TRUE(TextFormat::ParseFromString(
kObject, response.mutable_metadata()));
SetMutableContent(*response.mutable_checksummed_data(),
ContentType(kQuickFox));
response.mutable_checksummed_data()->set_crc32c(
kQuickFoxCrc32cChecksum);
*response.mutable_object_checksums() =
GeneratedObjectChecksums(param);
return absl::make_optional(response);
});
})
.WillOnce([&] {
// Second Read(): an empty optional, which marks the end of the stream.
return sequencer.PushBack("Read").then([](auto) {
return absl::optional<google::storage::v2::ReadObjectResponse>{};
});
});
// Finish() reports a successful stream.
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
});
return std::unique_ptr<AsyncReadObjectStream>(std::move(stream));
});

// Start from the parameter's options, then force a single channel and set the
// transfer stall timeout to zero seconds.
// NOTE(review): presumably a zero stall timeout disables the stall watchdog so
// no timers interfere with the sequencer -- confirm.
auto options =
DefaultOptionsAsync(param.options)
.set<GrpcNumChannelsOption>(1)
.set<storage::TransferStallTimeoutOption>(std::chrono::seconds(0));
internal::AutomaticallyCreatedBackgroundThreads pool(1);
auto connection =
MakeAsyncConnection(pool.cq(), std::move(mock), std::move(options));
// Non-zero offset with no limit: read from byte 1 onwards.
auto request = google::storage::v2::ReadObjectRequest{};
request.set_read_offset(1);
request.set_read_limit(0);
auto pending =
connection->ReadObject({std::move(request), connection->options()});

// The reader is expected to be available immediately, before any stream
// operation has been released by the sequencer.
ASSERT_TRUE(pending.is_ready());
auto r = pending.get();
ASSERT_STATUS_OK(r);
auto reader = *std::move(r);
// Issue the first Read(); its result is collected once the sequencer entries
// below are released.
auto data = reader->Read();

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(true);

// We expect the first `Read()` to return data, and the second to indicate the
// end of the stream.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
next.first.set_value(true);
EXPECT_THAT(data.get(), VariantWith<storage_experimental::ReadPayload>(_));

// The last Read() triggers the end of stream message, including a call to
// `Finish()`. Because the request has a non-zero offset, the test expects an
// OK status here regardless of the object checksums in the response.
data = reader->Read();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
next.first.set_value(true);
// The stream Finish() function should be called in the background.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);

EXPECT_THAT(data.get(), VariantWith<Status>(IsOk()));
}

// Verifies that a read with a non-zero `read_limit` completes with an OK
// status for every checksum parameterization of this suite.
//
// The mocked stream returns one response carrying data plus whatever object
// checksums `GeneratedObjectChecksums(param)` produces, then signals end of
// stream. The request sets `read_limit` to 1 (with `read_offset` 0), and the
// test expects the final `Read()` to yield `Status` OK.
// NOTE(review): presumably some parameterizations supply object checksums that
// do not match the data, which is what makes this test meaningful -- confirm
// against the test parameters.
TEST_P(AsyncConnectionImplReadHashTest, PartialReadNoValidate) {
auto const& param = GetParam();

// Each mocked stream operation parks a named entry in `sequencer`; the test
// body below releases them one at a time to control the order of events.
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();
EXPECT_CALL(*mock, AsyncReadObject).WillOnce([&] {
auto stream = std::make_unique<MockAsyncObjectMediaStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});
EXPECT_CALL(*stream, Read)
.WillOnce([&] {
// First Read(): a response with object metadata, checksummed content, and
// the object checksums selected by the test parameter.
return sequencer.PushBack("Read").then([&](auto) {
auto response = google::storage::v2::ReadObjectResponse{};
auto constexpr kObject = R"pb(
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
generation: 123456
)pb";
EXPECT_TRUE(TextFormat::ParseFromString(
kObject, response.mutable_metadata()));
SetMutableContent(*response.mutable_checksummed_data(),
ContentType(kQuickFox));
response.mutable_checksummed_data()->set_crc32c(
kQuickFoxCrc32cChecksum);
*response.mutable_object_checksums() =
GeneratedObjectChecksums(param);
return absl::make_optional(response);
});
})
.WillOnce([&] {
// Second Read(): an empty optional, which marks the end of the stream.
return sequencer.PushBack("Read").then([](auto) {
return absl::optional<google::storage::v2::ReadObjectResponse>{};
});
});
// Finish() reports a successful stream.
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
});
return std::unique_ptr<AsyncReadObjectStream>(std::move(stream));
});

// Start from the parameter's options, then force a single channel and set the
// transfer stall timeout to zero seconds.
// NOTE(review): presumably a zero stall timeout disables the stall watchdog so
// no timers interfere with the sequencer -- confirm.
auto options =
DefaultOptionsAsync(param.options)
.set<GrpcNumChannelsOption>(1)
.set<storage::TransferStallTimeoutOption>(std::chrono::seconds(0));
internal::AutomaticallyCreatedBackgroundThreads pool(1);
auto connection =
MakeAsyncConnection(pool.cq(), std::move(mock), std::move(options));
// Zero offset with a non-zero limit: read only the first byte.
auto request = google::storage::v2::ReadObjectRequest{};
request.set_read_offset(0);
request.set_read_limit(1);
auto pending =
connection->ReadObject({std::move(request), connection->options()});

// The reader is expected to be available immediately, before any stream
// operation has been released by the sequencer.
ASSERT_TRUE(pending.is_ready());
auto r = pending.get();
ASSERT_STATUS_OK(r);
auto reader = *std::move(r);
// Issue the first Read(); its result is collected once the sequencer entries
// below are released.
auto data = reader->Read();

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(true);

// We expect the first `Read()` to return data, and the second to indicate the
// end of the stream.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
next.first.set_value(true);
EXPECT_THAT(data.get(), VariantWith<storage_experimental::ReadPayload>(_));

// The last Read() triggers the end of stream message, including a call to
// `Finish()`. Because the request has a non-zero read limit, the test expects
// an OK status here regardless of the object checksums in the response.
data = reader->Read();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
next.first.set_value(true);
// The stream Finish() function should be called in the background.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);

EXPECT_THAT(data.get(), VariantWith<Status>(IsOk()));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Loading

0 comments on commit c6777fe

Please sign in to comment.