Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge request POC. #836

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/RequestBroker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (C) 2019-2020 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

#include "RequestBroker.h"

namespace olp {
namespace dataservice {
namespace read {

namespace {
std::string UniqueId() {
// TODO: generate a better unique id
static unsigned int id = 0;
return std::to_string(id++);
}
} // namespace

void RequestBroker::RequestContext::AddCallback(CallerId id,
Callback callback) {
callbacks[id] = std::move(callback);
}

void RequestBroker::RequestContext::PropagateResponse(DataResponse response) {
for (auto& callback : callbacks) {
callback.second(response);
}
callbacks.clear();
}

// Return true if the operation was canceled
bool RequestBroker::RequestContext::CancelRequest(CallerId id) {
// Cancel individual request
{
auto callback_it = callbacks.find(id);
if (callback_it != callbacks.end()) {
Callback callback = std::move(callback_it->second);
callback(client::ApiError(client::ErrorCode::Cancelled, "Canceled"));
callbacks.erase(callback_it);
} else {
assert(false);
}
}

const bool cancel_operation = callbacks.empty();

if (cancel_operation) {
cancelation_context.CancelOperation();
}

return cancel_operation;
}

client::CancellationContext
RequestBroker::RequestContext::CancelationContext() {
return cancelation_context;
}

RequestBroker::CreateOrAssociateResult RequestBroker::CreateOrAssociateRequest(
RequestId req_id, Callback callback) {
const CallerId caller_id = UniqueId();
GetOrCreateResult result = GetOrCreateContext(req_id);
result.ctx.AddCallback(caller_id, std::move(callback));
return {result.ctx.CancelationContext(), CancelToken(req_id, caller_id),
result.just_created};
}

DataResponseCallback RequestBroker::ResponseHandler(RequestId req_id) {
return [=](DataResponse response) {
PropagateResponse(req_id, std::move(response));
};
}

RequestBroker::GetOrCreateResult RequestBroker::GetOrCreateContext(
RequestId req_id) {
std::unique_lock<std::mutex> lock(mutex_);

auto request_ctx_it = request_map_.find(req_id);
if (request_ctx_it != request_map_.end()) {
return {request_ctx_it->second, false};
} else {
request_ctx_it =
request_map_.insert(std::make_pair(req_id, RequestContext{})).first;
return {request_ctx_it->second, true};
}
}

void RequestBroker::PropagateResponse(RequestId req_id, DataResponse response) {
std::unique_lock<std::mutex> lock(mutex_);

auto request_ctx_it = request_map_.find(req_id);
if (request_ctx_it == request_map_.end()) {
assert(!response.IsSuccessful()); // Expect cancel here
return;
}

auto ctx = std::move(request_ctx_it->second);
request_map_.erase(request_ctx_it);
ctx.PropagateResponse(std::move(response));
}

void RequestBroker::CancelRequest(RequestId req_id, CallerId id) {
std::unique_lock<std::mutex> lock(mutex_);

auto request_ctx_it = request_map_.find(req_id);
if (request_ctx_it == request_map_.end()) {
assert(false);
return;
}

RequestContext& ctx = request_ctx_it->second;
if (ctx.CancelRequest(id)) {
request_map_.erase(request_ctx_it);
}
}

} // namespace read
} // namespace dataservice
} // namespace olp
87 changes: 87 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/RequestBroker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (C) 2019-2020 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

#pragma once

#include <unordered_map>

#include <olp/core/client/CancellationContext.h>
#include <olp/dataservice/read/Types.h>

namespace olp {
namespace dataservice {
namespace read {

class RequestBroker {
public:
using Callback = DataResponseCallback;
using RequestId = std::string;

struct CreateOrAssociateResult {
client::CancellationContext context;
client::CancellationToken caller_cancelation_token;
bool just_created;
};

CreateOrAssociateResult CreateOrAssociateRequest(RequestId req_id,
Callback callback);

DataResponseCallback ResponseHandler(RequestId req_id);

private:
using CallerId = std::string;

class RequestContext {
public:
void AddCallback(CallerId id, Callback callback);
void PropagateResponse(DataResponse response);
// Return true if the operation was canceled
bool CancelRequest(CallerId id);

client::CancellationContext CancelationContext();

private:
client::CancellationContext cancelation_context;
std::unordered_map<CallerId, DataResponseCallback> callbacks;
};

using RequestMap = std::unordered_map<RequestId, RequestContext>;

inline client::CancellationToken CancelToken(RequestId req_id, CallerId id) {
return client::CancellationToken([=]() { CancelRequest(req_id, id); });
}

struct GetOrCreateResult {
RequestContext& ctx;
bool just_created;
};

GetOrCreateResult GetOrCreateContext(RequestId req_id);

void PropagateResponse(RequestId req_id, DataResponse response);

void CancelRequest(RequestId req_id, CallerId id);

std::mutex mutex_;
RequestMap request_map_;
};

} // namespace read
} // namespace dataservice
} // namespace olp
43 changes: 28 additions & 15 deletions olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,37 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
auto layer_id = layer_id_;
auto settings = settings_;

auto data_task =
[=](client::CancellationContext context) mutable -> DataResponse {
if (!request.GetDataHandle()) {
auto version_response = GetVersion(request.GetBillingTag(),
request.GetFetchOption(), context);
if (!version_response.IsSuccessful()) {
return version_response.GetError();
auto request_id = request.CreateKey(layer_id);

auto result =
broker_.CreateOrAssociateRequest(request_id, std::move(callback));

if (result.just_created) {
// We just created a task, so trigger a real one and consume the
// CancellationContext provided.
auto data_task =
[=](client::CancellationContext context) mutable -> DataResponse {
if (!request.GetDataHandle()) {
auto version_response = GetVersion(request.GetBillingTag(),
request.GetFetchOption(), context);
if (!version_response.IsSuccessful()) {
return version_response.GetError();
}
request.WithVersion(version_response.GetResult().GetVersion());
}
request.WithVersion(version_response.GetResult().GetVersion());
}

return repository::DataRepository::GetVersionedData(
std::move(catalog), std::move(layer_id), std::move(request), context,
std::move(settings));
};
return repository::DataRepository::GetVersionedData(
std::move(catalog), std::move(layer_id), std::move(request),
context, std::move(settings));
};

return AddTask(settings.task_scheduler, pending_requests_,
std::move(data_task), std::move(callback));
// We don't care about result here since we are passing the context
// outside.
AddTask(settings.task_scheduler, pending_requests_, std::move(data_task),
broker_.ResponseHandler(request_id), result.context);
}

return result.caller_cancelation_token;
};

return ScheduleFetch(std::move(schedule_get_data), std::move(request),
Expand Down
4 changes: 3 additions & 1 deletion olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include <memory>

#include <boost/optional.hpp>
#include <olp/core/client/CancellationContext.h>
#include <olp/core/client/CancellationToken.h>
#include <olp/core/client/HRN.h>
Expand All @@ -33,6 +32,8 @@
#include <olp/dataservice/read/PrefetchTilesRequest.h>
#include <olp/dataservice/read/TileRequest.h>
#include <olp/dataservice/read/Types.h>
#include <boost/optional.hpp>
#include "RequestBroker.h"
#include "repositories/ExecuteOrSchedule.inl"

namespace olp {
Expand Down Expand Up @@ -93,6 +94,7 @@ class VersionedLayerClientImpl {
client::HRN catalog_;
std::string layer_id_;
client::OlpClientSettings settings_;
RequestBroker broker_;
std::shared_ptr<client::PendingRequests> pending_requests_;
std::atomic<int64_t> catalog_version_;
};
Expand Down
Loading