Skip to content

Commit

Permalink
Merge request POC.
Browse files Browse the repository at this point in the history
Implemented merge requests mechanism in a separate RequestBroker class

Relates-To: OLPEDGE-1805

Signed-off-by: Mykhailo Kuchma <[email protected]>
  • Loading branch information
mykhailo-kuchma committed May 14, 2020
1 parent 4b2c2e9 commit b37bf14
Show file tree
Hide file tree
Showing 5 changed files with 416 additions and 16 deletions.
134 changes: 134 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/RequestBroker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (C) 2019-2020 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

#include "RequestBroker.h"

namespace olp {
namespace dataservice {
namespace read {

namespace {
std::string UniqueId() {
// TODO: generate a better unique id
static unsigned int id = 0;
return std::to_string(id++);
}
} // namespace

void RequestBroker::RequestContext::AddCallback(CallerId id,
Callback callback) {
callbacks[id] = std::move(callback);
}

void RequestBroker::RequestContext::PropagateResponse(DataResponse response) {
for (auto& callback : callbacks) {
callback.second(response);
}
callbacks.clear();
}

// Return true if the operation was canceled
bool RequestBroker::RequestContext::CancelRequest(CallerId id) {
// Cancel individual request
{
auto callback_it = callbacks.find(id);
if (callback_it != callbacks.end()) {
Callback callback = std::move(callback_it->second);
callback(client::ApiError(client::ErrorCode::Cancelled, "Canceled"));
callbacks.erase(callback_it);
} else {
assert(false);
}
}

const bool cancel_operation = callbacks.empty();

if (cancel_operation) {
cancelation_context.CancelOperation();
}

return cancel_operation;
}

client::CancellationContext
RequestBroker::RequestContext::CancelationContext() {
return cancelation_context;
}

RequestBroker::CreateOrAssociateResult RequestBroker::CreateOrAssociateRequest(
RequestId req_id, Callback callback) {
const CallerId caller_id = UniqueId();
GetOrCreateResult result = GetOrCreateContext(req_id);
result.ctx.AddCallback(caller_id, std::move(callback));
return {result.ctx.CancelationContext(), CancelToken(req_id, caller_id),
result.just_created};
}

DataResponseCallback RequestBroker::ResponseHandler(RequestId req_id) {
return [=](DataResponse response) {
PropagateResponse(req_id, std::move(response));
};
}

RequestBroker::GetOrCreateResult RequestBroker::GetOrCreateContext(
RequestId req_id) {
std::unique_lock<std::mutex> lock(mutex_);

auto request_ctx_it = request_map_.find(req_id);
if (request_ctx_it != request_map_.end()) {
return {request_ctx_it->second, false};
} else {
request_ctx_it =
request_map_.insert(std::make_pair(req_id, RequestContext{})).first;
return {request_ctx_it->second, true};
}
}

void RequestBroker::PropagateResponse(RequestId req_id, DataResponse response) {
std::unique_lock<std::mutex> lock(mutex_);

auto request_ctx_it = request_map_.find(req_id);
if (request_ctx_it == request_map_.end()) {
assert(!response.IsSuccessful()); // Expect cancel here
return;
}

auto ctx = std::move(request_ctx_it->second);
request_map_.erase(request_ctx_it);
ctx.PropagateResponse(std::move(response));
}

void RequestBroker::CancelRequest(RequestId req_id, CallerId id) {
std::unique_lock<std::mutex> lock(mutex_);

auto request_ctx_it = request_map_.find(req_id);
if (request_ctx_it == request_map_.end()) {
assert(false);
return;
}

RequestContext& ctx = request_ctx_it->second;
if (ctx.CancelRequest(id)) {
request_map_.erase(request_ctx_it);
}
}

} // namespace read
} // namespace dataservice
} // namespace olp
87 changes: 87 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/RequestBroker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (C) 2019-2020 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

#pragma once

#include <unordered_map>

#include <olp/core/client/CancellationContext.h>
#include <olp/dataservice/read/Types.h>

namespace olp {
namespace dataservice {
namespace read {

class RequestBroker {
public:
using Callback = DataResponseCallback;
using RequestId = std::string;

struct CreateOrAssociateResult {
client::CancellationContext context;
client::CancellationToken caller_cancelation_token;
bool just_created;
};

CreateOrAssociateResult CreateOrAssociateRequest(RequestId req_id,
Callback callback);

DataResponseCallback ResponseHandler(RequestId req_id);

private:
using CallerId = std::string;

class RequestContext {
public:
void AddCallback(CallerId id, Callback callback);
void PropagateResponse(DataResponse response);
// Return true if the operation was canceled
bool CancelRequest(CallerId id);

client::CancellationContext CancelationContext();

private:
client::CancellationContext cancelation_context;
std::unordered_map<CallerId, DataResponseCallback> callbacks;
};

using RequestMap = std::unordered_map<RequestId, RequestContext>;

inline client::CancellationToken CancelToken(RequestId req_id, CallerId id) {
return client::CancellationToken([=]() { CancelRequest(req_id, id); });
}

struct GetOrCreateResult {
RequestContext& ctx;
bool just_created;
};

GetOrCreateResult GetOrCreateContext(RequestId req_id);

void PropagateResponse(RequestId req_id, DataResponse response);

void CancelRequest(RequestId req_id, CallerId id);

std::mutex mutex_;
RequestMap request_map_;
};

} // namespace read
} // namespace dataservice
} // namespace olp
43 changes: 28 additions & 15 deletions olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,37 @@ client::CancellationToken VersionedLayerClientImpl::GetData(
auto layer_id = layer_id_;
auto settings = settings_;

auto data_task =
[=](client::CancellationContext context) mutable -> DataResponse {
if (!request.GetDataHandle()) {
auto version_response = GetVersion(request.GetBillingTag(),
request.GetFetchOption(), context);
if (!version_response.IsSuccessful()) {
return version_response.GetError();
auto request_id = request.CreateKey(layer_id);

auto result =
broker_.CreateOrAssociateRequest(request_id, std::move(callback));

if (result.just_created) {
// We just created a task, so trigger a real one and consume the
// CancellationContext provided.
auto data_task =
[=](client::CancellationContext context) mutable -> DataResponse {
if (!request.GetDataHandle()) {
auto version_response = GetVersion(request.GetBillingTag(),
request.GetFetchOption(), context);
if (!version_response.IsSuccessful()) {
return version_response.GetError();
}
request.WithVersion(version_response.GetResult().GetVersion());
}
request.WithVersion(version_response.GetResult().GetVersion());
}

return repository::DataRepository::GetVersionedData(
std::move(catalog), std::move(layer_id), std::move(request), context,
std::move(settings));
};
return repository::DataRepository::GetVersionedData(
std::move(catalog), std::move(layer_id), std::move(request),
context, std::move(settings));
};

return AddTask(settings.task_scheduler, pending_requests_,
std::move(data_task), std::move(callback));
// We don't care about result here since we are passing the context
// outside.
AddTask(settings.task_scheduler, pending_requests_, std::move(data_task),
broker_.ResponseHandler(request_id), result.context);
}

return result.caller_cancelation_token;
};

return ScheduleFetch(std::move(schedule_get_data), std::move(request),
Expand Down
4 changes: 3 additions & 1 deletion olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include <memory>

#include <boost/optional.hpp>
#include <olp/core/client/CancellationContext.h>
#include <olp/core/client/CancellationToken.h>
#include <olp/core/client/HRN.h>
Expand All @@ -33,6 +32,8 @@
#include <olp/dataservice/read/PrefetchTilesRequest.h>
#include <olp/dataservice/read/TileRequest.h>
#include <olp/dataservice/read/Types.h>
#include <boost/optional.hpp>
#include "RequestBroker.h"
#include "repositories/ExecuteOrSchedule.inl"

namespace olp {
Expand Down Expand Up @@ -93,6 +94,7 @@ class VersionedLayerClientImpl {
client::HRN catalog_;
std::string layer_id_;
client::OlpClientSettings settings_;
RequestBroker broker_;
std::shared_ptr<client::PendingRequests> pending_requests_;
std::atomic<int64_t> catalog_version_;
};
Expand Down
Loading

0 comments on commit b37bf14

Please sign in to comment.