Skip to content

Commit

Permalink
cleanup: explicit options for LRO polling loop (#13049)
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan authored Nov 6, 2023
1 parent 6c066ee commit 9b9ab59
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 126 deletions.
4 changes: 2 additions & 2 deletions google/cloud/internal/async_long_running_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ template <typename ReturnType, typename RequestType, typename StartFunctor,
typename RetryPolicyType>
future<StatusOr<ReturnType>> AsyncLongRunningOperation(
google::cloud::CompletionQueue cq, RequestType&& request,
StartFunctor&& start, AsyncPollLongRunningOperation poll,
AsyncCancelLongRunningOperation cancel,
StartFunctor&& start, AsyncPollLongRunningOperationImplicitOptions poll,
AsyncCancelLongRunningOperationImplicitOptions cancel,
LongRunningOperationValueExtractor<ReturnType> value_extractor,
std::unique_ptr<RetryPolicyType> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotent,
Expand Down
6 changes: 4 additions & 2 deletions google/cloud/internal/async_long_running_operation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,16 @@ StartOperation MakeStart(std::shared_ptr<MockStub> const& m) {
};
}

AsyncPollLongRunningOperation MakePoll(std::shared_ptr<MockStub> const& m) {
AsyncPollLongRunningOperationImplicitOptions MakePoll(
std::shared_ptr<MockStub> const& m) {
return [m](CompletionQueue& cq, auto context,
google::longrunning::GetOperationRequest const& request) {
return m->AsyncGetOperation(cq, std::move(context), request);
};
}

AsyncCancelLongRunningOperation MakeCancel(std::shared_ptr<MockStub> const& m) {
AsyncCancelLongRunningOperationImplicitOptions MakeCancel(
std::shared_ptr<MockStub> const& m) {
return [m](CompletionQueue& cq, auto context,
google::longrunning::CancelOperationRequest const& request) {
return m->AsyncCancelOperation(cq, std::move(context), request);
Expand Down
55 changes: 43 additions & 12 deletions google/cloud/internal/async_polling_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class AsyncPollingLoopImpl
: public std::enable_shared_from_this<AsyncPollingLoopImpl> {
public:
AsyncPollingLoopImpl(google::cloud::CompletionQueue cq,
ImmutableOptions options,
AsyncPollLongRunningOperation poll,
AsyncCancelLongRunningOperation cancel,
std::unique_ptr<PollingPolicy> polling_policy,
std::string location)
: cq_(std::move(cq)),
options_(std::move(options)),
poll_(std::move(poll)),
cancel_(std::move(cancel)),
polling_policy_(std::move(polling_policy)),
Expand Down Expand Up @@ -76,10 +78,11 @@ class AsyncPollingLoopImpl
// Cancels are best effort, so we use weak pointers.
auto w = WeakFromThis();
auto context = std::make_shared<grpc::ClientContext>();
ConfigurePollContext(*context, CurrentOptions());
cancel_(cq_, std::move(context), request).then([w](future<Status> f) {
if (auto self = w.lock()) self->OnCancel(f.get());
});
ConfigurePollContext(*context, *options_);
cancel_(cq_, std::move(context), *options_, request)
.then([w](future<Status> f) {
if (auto self = w.lock()) self->OnCancel(f.get());
});
}

void OnCancel(Status const& status) {
Expand All @@ -88,7 +91,7 @@ class AsyncPollingLoopImpl

void OnStart(StatusOr<Operation> op) {
if (!op) return promise_.set_value(std::move(op));
AddSpanAttribute(CurrentOptions(), "gl-cpp.LRO_name", op->name());
AddSpanAttribute(*options_, "gl-cpp.LRO_name", op->name());
if (op->done()) return promise_.set_value(std::move(op));
GCP_LOG(DEBUG) << location_ << "() polling loop starting for "
<< op->name();
Expand All @@ -107,8 +110,9 @@ class AsyncPollingLoopImpl
GCP_LOG(DEBUG) << location_ << "() polling loop waiting "
<< duration.count() << "ms";
auto self = shared_from_this();
TracedAsyncBackoff(cq_, CurrentOptions(), duration)
.then([self](TimerResult f) { self->OnTimer(std::move(f)); });
TracedAsyncBackoff(cq_, *options_, duration).then([self](TimerResult f) {
self->OnTimer(std::move(f));
});
}

void OnTimer(TimerResult f) {
Expand All @@ -122,8 +126,8 @@ class AsyncPollingLoopImpl
}
auto self = shared_from_this();
auto context = std::make_shared<grpc::ClientContext>();
ConfigurePollContext(*context, CurrentOptions());
poll_(cq_, std::move(context), request)
ConfigurePollContext(*context, *options_);
poll_(cq_, std::move(context), *options_, request)
.then([self](future<StatusOr<Operation>> g) {
self->OnPoll(std::move(g));
});
Expand Down Expand Up @@ -158,6 +162,7 @@ class AsyncPollingLoopImpl
// `Start()`, and then only used from the `On*()` callbacks, which are
// serialized, so they need no external synchronization.
google::cloud::CompletionQueue cq_;
ImmutableOptions options_;
AsyncPollLongRunningOperation poll_;
AsyncCancelLongRunningOperation cancel_;
std::unique_ptr<PollingPolicy> polling_policy_;
Expand All @@ -172,15 +177,41 @@ class AsyncPollingLoopImpl
};

future<StatusOr<Operation>> AsyncPollingLoop(
google::cloud::CompletionQueue cq, future<StatusOr<Operation>> op,
AsyncPollLongRunningOperation poll, AsyncCancelLongRunningOperation cancel,
google::cloud::CompletionQueue cq, ImmutableOptions options,
future<StatusOr<Operation>> op, AsyncPollLongRunningOperation poll,
AsyncCancelLongRunningOperation cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location) {
auto loop = std::make_shared<AsyncPollingLoopImpl>(
std::move(cq), std::move(poll), std::move(cancel),
std::move(cq), std::move(options), std::move(poll), std::move(cancel),
std::move(polling_policy), std::move(location));
return loop->Start(std::move(op));
}

future<StatusOr<google::longrunning::Operation>> AsyncPollingLoop(
google::cloud::CompletionQueue cq, future<StatusOr<Operation>> op,
AsyncPollLongRunningOperationImplicitOptions poll,
AsyncCancelLongRunningOperationImplicitOptions cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location) {
auto poll_wrapper =
[poll = std::move(poll)](
CompletionQueue& cq, std::shared_ptr<grpc::ClientContext> context,
Options const&,
google::longrunning::GetOperationRequest const& request) {
return poll(cq, std::move(context), request);
};
auto cancel_wrapper =
[cancel = std::move(cancel)](
CompletionQueue& cq, std::shared_ptr<grpc::ClientContext> context,
Options const&,
google::longrunning::CancelOperationRequest const& request) {
return cancel(cq, std::move(context), request);
};
return AsyncPollingLoop(std::move(cq), internal::SaveCurrentOptions(),
std::move(op), std::move(poll_wrapper),
std::move(cancel_wrapper), std::move(polling_policy),
std::move(location));
}

} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace cloud
Expand Down
48 changes: 36 additions & 12 deletions google/cloud/internal/async_polling_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "google/cloud/completion_queue.h"
#include "google/cloud/future.h"
#include "google/cloud/options.h"
#include "google/cloud/polling_policy.h"
#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
Expand All @@ -32,11 +33,11 @@ namespace internal {

using AsyncPollLongRunningOperation =
std::function<future<StatusOr<google::longrunning::Operation>>(
CompletionQueue&, std::shared_ptr<grpc::ClientContext>,
CompletionQueue&, std::shared_ptr<grpc::ClientContext>, Options const&,
google::longrunning::GetOperationRequest const&)>;

using AsyncCancelLongRunningOperation = std::function<future<Status>(
CompletionQueue&, std::shared_ptr<grpc::ClientContext>,
CompletionQueue&, std::shared_ptr<grpc::ClientContext>, Options const&,
google::longrunning::CancelOperationRequest const&)>;

/**
Expand Down Expand Up @@ -80,10 +81,12 @@ using AsyncCancelLongRunningOperation = std::function<future<Status>(
* virtual future<StatusOr<google::longrunning::Operation>> AsyncGetOperation(
* google::cloud::CompletionQueue& cq,
* std::shared_ptr<grpc::ClientContext> context,
* Options const& options,
* google::longrunning::GetOperationRequest const& request) = 0;
* virtual future<Status> AsyncCancelOperation(
* google::cloud::CompletionQueue& cq,
* std::shared_ptr<grpc::ClientContext> context,
* Options const& options,
* google::longrunning::CancelOperationRequest const& request) = 0;
* };
* @endcode
Expand All @@ -93,21 +96,22 @@ using AsyncCancelLongRunningOperation = std::function<future<Status>(
* @code
* class BarConnectionImpl : public BarConnection {
* public:
* // Using C++14 for exposition purposes. The implementation supports C++11.
* future<StatusOr<FooResponse>> Foo(FooRequest const& request) override {
* using ::google::longrunning::Operation;
* auto current = google::cloud::internal::SaveCurrentOptions();
* future<StatusOr<Operation>> op = AsyncStart();
*
* return op.then([stub = stub_, cq = cq_, loc = __func__](auto f) {
* StatusOr<Operation> op = f.get();
* if (!op) return make_ready_future(op);
* return op.then([stub = stub_, cq = cq_, current, loc = __func__](auto f)
* { StatusOr<Operation> op = f.get(); if (!op) return make_ready_future(op);
* return AsyncPollingLoop(
* std::move(cq), *std::move(op),
* [stub](auto cq, auto context, auto const& r) {
* return stub->AsyncGetOperation(cq, std::move(context), r);
* std::move(cq), current, *std::move(op),
* [stub](auto cq, auto context, auto const& options, auto const& r) {
* return stub->AsyncGetOperation(
* cq, std::move(context), options, r);
* },
* [stub](auto cq, auto context, auto const& r) {
* return stub->AsyncCancelOperation(cq, std::move(context), r);
* [stub](auto cq, auto context, auto const& options, auto const& r) {
* return stub->AsyncCancelOperation(
* cq, std::move(context), options, r);
* },
* polling_policy_->clone(), loc);
* });
Expand All @@ -122,11 +126,31 @@ using AsyncCancelLongRunningOperation = std::function<future<Status>(
* [aip/151]: https://google.aip.dev/151
*/
future<StatusOr<google::longrunning::Operation>> AsyncPollingLoop(
google::cloud::CompletionQueue cq,
google::cloud::CompletionQueue cq, ImmutableOptions options,
future<StatusOr<google::longrunning::Operation>> op,
AsyncPollLongRunningOperation poll, AsyncCancelLongRunningOperation cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location);

// TODO(#12359) - remove this once it becomes unused
using AsyncPollLongRunningOperationImplicitOptions =
std::function<future<StatusOr<google::longrunning::Operation>>(
CompletionQueue&, std::shared_ptr<grpc::ClientContext>,
google::longrunning::GetOperationRequest const&)>;

// TODO(#12359) - remove this once it becomes unused
using AsyncCancelLongRunningOperationImplicitOptions =
std::function<future<Status>(
CompletionQueue&, std::shared_ptr<grpc::ClientContext>,
google::longrunning::CancelOperationRequest const&)>;

// TODO(#12359) - remove this once it becomes unused
future<StatusOr<google::longrunning::Operation>> AsyncPollingLoop(
google::cloud::CompletionQueue cq,
future<StatusOr<google::longrunning::Operation>> op,
AsyncPollLongRunningOperationImplicitOptions poll,
AsyncCancelLongRunningOperationImplicitOptions cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location);

} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace cloud
Expand Down
Loading

0 comments on commit 9b9ab59

Please sign in to comment.