diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 44b981e88fd29..c14c0491c2c41 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -104,7 +104,12 @@ std::unique_ptr CreateHashFunction( } std::unique_ptr CreateHashValidator( + google::storage::v2::ReadObjectRequest const& request, Options const& options) { + auto const is_ranged_read = + request.read_limit() != 0 || request.read_offset() != 0; + if (is_ranged_read) return storage::internal::CreateNullHashValidator(); + auto const enable_crc32c = options.get(); auto const enable_md5 = @@ -192,7 +197,7 @@ AsyncConnectionImpl::ReadObject(ReadObjectParams p) { auto hash_function = std::make_shared( CreateHashFunction(*current)); - auto hash_validator = CreateHashValidator(*current); + auto hash_validator = CreateHashValidator(p.request, *current); auto connection_factory = MakeReaderConnectionFactory( std::move(current), std::move(p.request), hash_function); diff --git a/google/cloud/storage/internal/async/connection_impl_read_hash_test.cc b/google/cloud/storage/internal/async/connection_impl_read_hash_test.cc index 2bb56f8583edf..d0816c50fbff4 100644 --- a/google/cloud/storage/internal/async/connection_impl_read_hash_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_read_hash_test.cc @@ -40,6 +40,7 @@ namespace { using ::google::cloud::storage::testing::MockAsyncObjectMediaStream; using ::google::cloud::testing_util::AsyncSequencer; +using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::_; @@ -315,6 +316,176 @@ TEST_P(AsyncConnectionImplReadHashTest, ValidateFullChecksums) { VariantWith(StatusIs(param.expected_status_code))); } +TEST_P(AsyncConnectionImplReadHashTest, ReadFromOffsetNoValidate) { + auto const& param = GetParam(); + + AsyncSequencer sequencer; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncReadObject).WillOnce([&] { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + EXPECT_CALL(*stream, Read) + .WillOnce([&] { + return sequencer.PushBack("Read").then([&](auto) { + auto response = google::storage::v2::ReadObjectResponse{}; + auto constexpr kObject = R"pb( + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + generation: 123456 + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString( + kObject, response.mutable_metadata())); + SetMutableContent(*response.mutable_checksummed_data(), + ContentType(kQuickFox)); + response.mutable_checksummed_data()->set_crc32c( + kQuickFoxCrc32cChecksum); + *response.mutable_object_checksums() = + GeneratedObjectChecksums(param); + return absl::make_optional(response); + }); + }) + .WillOnce([&] { + return sequencer.PushBack("Read").then([](auto) { + return absl::optional{}; + }); + }); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); + }); + return std::unique_ptr(std::move(stream)); + }); + + auto options = + DefaultOptionsAsync(param.options) + .set(1) + .set(std::chrono::seconds(0)); + internal::AutomaticallyCreatedBackgroundThreads pool(1); + auto connection = + MakeAsyncConnection(pool.cq(), std::move(mock), std::move(options)); + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_read_offset(1); + request.set_read_limit(0); + auto pending = + connection->ReadObject({std::move(request), connection->options()}); + + ASSERT_TRUE(pending.is_ready()); + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto reader = *std::move(r); + auto data = reader->Read(); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + // We expect the first `Read()` to return data, and the second to indicate the + // end of the stream. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read"); + next.first.set_value(true); + EXPECT_THAT(data.get(), VariantWith(_)); + + // The last Read() triggers the end of stream message, including a call to + // `Finish()`. It should detect the invalid checksum. + data = reader->Read(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read"); + next.first.set_value(true); + // The stream Finish() function should be called in the background. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); + + EXPECT_THAT(data.get(), VariantWith(IsOk())); +} + +TEST_P(AsyncConnectionImplReadHashTest, PartialReadNoValidate) { + auto const& param = GetParam(); + + AsyncSequencer sequencer; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncReadObject).WillOnce([&] { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + EXPECT_CALL(*stream, Read) + .WillOnce([&] { + return sequencer.PushBack("Read").then([&](auto) { + auto response = google::storage::v2::ReadObjectResponse{}; + auto constexpr kObject = R"pb( + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + generation: 123456 + )pb"; + EXPECT_TRUE(TextFormat::ParseFromString( + kObject, response.mutable_metadata())); + SetMutableContent(*response.mutable_checksummed_data(), + ContentType(kQuickFox)); + response.mutable_checksummed_data()->set_crc32c( + kQuickFoxCrc32cChecksum); + *response.mutable_object_checksums() = + GeneratedObjectChecksums(param); + return absl::make_optional(response); + }); + }) + .WillOnce([&] { + return sequencer.PushBack("Read").then([](auto) { + return absl::optional{}; + }); + }); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); + }); + return std::unique_ptr(std::move(stream)); + }); + + auto options = + DefaultOptionsAsync(param.options) + .set(1) + .set(std::chrono::seconds(0)); + internal::AutomaticallyCreatedBackgroundThreads pool(1); + auto connection = + MakeAsyncConnection(pool.cq(), std::move(mock), std::move(options)); + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_read_offset(0); + request.set_read_limit(1); + auto pending = + connection->ReadObject({std::move(request), connection->options()}); + + ASSERT_TRUE(pending.is_ready()); + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto reader = *std::move(r); + auto data = reader->Read(); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + // We expect the first `Read()` to return data, and the second to indicate the + // end of the stream. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read"); + next.first.set_value(true); + EXPECT_THAT(data.get(), VariantWith(_)); + + // The last Read() triggers the end of stream message, including a call to + // `Finish()`. It should detect the invalid checksum. + data = reader->Read(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read"); + next.first.set_value(true); + // The stream Finish() function should be called in the background. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); + + EXPECT_THAT(data.get(), VariantWith(IsOk())); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 379a482dd8393..2d473837b2485 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,7 +17,9 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" +#include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" +#include "google/cloud/grpc_options.h" #include "google/cloud/internal/getenv.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/status_matchers.h" @@ -68,15 +70,21 @@ class AsyncClientIntegrationTest std::string bucket_name_; }; +auto TestOptions() { + // Disable metrics in the test, they just make the logs harder to grok. + return Options{} + .set(false) + .set(1); +} + auto AlwaysRetry() { - return Options{}.set( + return TestOptions().set( MakeAlwaysRetryIdempotencyPolicy); } TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { - auto client = MakeIntegrationTestClient(); + auto async = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); - auto async = AsyncClient(); auto insert = async .InsertObject(BucketName(bucket_name()), object_name, @@ -85,12 +93,16 @@ TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { ASSERT_STATUS_OK(insert); ScheduleForDelete(*insert); - auto pending0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 0, LoremIpsum().size()); - auto pending1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 0, LoremIpsum().size()); + auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); - for (auto* p : {&pending1, &pending0}) { + for (auto* p : {&full1, &full0}) { auto response = p->get(); ASSERT_STATUS_OK(response); auto contents = response->contents(); @@ -101,22 +113,34 @@ TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { }); EXPECT_EQ(full, LoremIpsum()); } + for (auto* p : {&partial1, &partial0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const partial = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(partial, LoremIpsum().substr(2)); + } + auto status = async .DeleteObject(BucketName(bucket_name()), object_name, insert->generation()) .get(); EXPECT_STATUS_OK(status); + auto client = MakeIntegrationTestClient(); auto get = client.GetObjectMetadata(bucket_name(), object_name); EXPECT_THAT(get, StatusIs(StatusCode::kNotFound)); } TEST_F(AsyncClientIntegrationTest, ComposeObject) { - auto client = MakeIntegrationTestClient(); + auto async = AsyncClient(TestOptions()); auto o1 = MakeRandomObjectName(); auto o2 = MakeRandomObjectName(); auto destination = MakeRandomObjectName(); - auto async = AsyncClient(); auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), AlwaysRetry()); @@ -159,6 +183,7 @@ TEST_F(AsyncClientIntegrationTest, ComposeObject) { } TEST_F(AsyncClientIntegrationTest, StreamingRead) { + auto async = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); // Create a relatively large object so the streaming read makes sense. We // aim for something around 5MiB, enough for 3 `Read()` calls. @@ -173,10 +198,10 @@ TEST_F(AsyncClientIntegrationTest, StreamingRead) { insert_data.begin(), insert_data.end(), static_cast(0), [](auto a, auto const& b) { return a + b.size(); }); - auto async = AsyncClient(); - auto insert = - async.InsertObject(BucketName(bucket_name()), object_name, insert_data) - .get(); + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + insert_data, AlwaysRetry()) + .get(); ASSERT_STATUS_OK(insert); ScheduleForDelete(*insert); @@ -206,10 +231,56 @@ TEST_F(AsyncClientIntegrationTest, StreamingRead) { EXPECT_EQ(view, absl::string_view{}); } +TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto constexpr kReadOffset = kLineCount * kLineSize / 2; + auto const block = MakeRandomData(kLineSize - 1) + "\n"; + std::string contents; + for (int i = 0; i != kLineCount; ++i) contents += block; + auto const expected_insert_size = contents.size(); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + contents, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + ASSERT_EQ(insert->size(), expected_insert_size); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + request.set_read_offset(kReadOffset); + auto r = async.ReadObject(request).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); + } + + EXPECT_EQ(absl::string_view(actual), + absl::string_view(contents).substr(kReadOffset)); +} + TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); - auto client = AsyncClient(); auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) .get(); ASSERT_STATUS_OK(w); @@ -227,13 +298,13 @@ TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { } TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); // Create a small block to send over and over. auto constexpr kBlockSize = 256 * 1024; auto constexpr kBlockCount = 16; auto const block = MakeRandomData(kBlockSize); - auto client = AsyncClient(); auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) .get(); ASSERT_STATUS_OK(w); @@ -256,6 +327,7 @@ TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { } TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { + auto client = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); // Create a small block to send over and over. auto constexpr kBlockSize = 256 * 1024; @@ -264,7 +336,6 @@ TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; auto const block = MakeRandomData(kBlockSize); - auto client = AsyncClient(); auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) .get(); ASSERT_STATUS_OK(w); @@ -318,12 +389,12 @@ TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { } TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { + auto client = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); // Create a small block to send over and over. auto constexpr kBlockSize = static_cast(256 * 1024); auto const block = MakeRandomData(kBlockSize); - auto client = AsyncClient(); auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) .get(); ASSERT_STATUS_OK(w); @@ -349,9 +420,9 @@ TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { } TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); - auto client = AsyncClient(); auto w = client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); ASSERT_STATUS_OK(w); @@ -369,13 +440,13 @@ TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { } TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); auto object_name = MakeRandomObjectName(); // Create a small block to send over and over. auto constexpr kBlockSize = 256 * 1024; auto constexpr kBlockCount = 16; auto const block = MakeRandomData(kBlockSize); - auto client = AsyncClient(); auto w = client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); ASSERT_STATUS_OK(w); @@ -398,10 +469,10 @@ TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { } TEST_F(AsyncClientIntegrationTest, RewriteObject) { + auto async = AsyncClient(TestOptions()); auto o1 = MakeRandomObjectName(); auto o2 = MakeRandomObjectName(); - auto async = AsyncClient(); auto constexpr kBlockSize = 4 * 1024 * 1024; auto insert = async .InsertObject(BucketName(bucket_name()), o1, @@ -439,11 +510,11 @@ TEST_F(AsyncClientIntegrationTest, RewriteObject) { } TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { + auto async = AsyncClient(TestOptions()); auto destination = GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); if (!destination || destination->empty()) GTEST_SKIP(); - auto async = AsyncClient(); auto constexpr kBlockSize = 4 * 1024 * 1024; auto source = async