Skip to content

Commit

Permalink
feat: data plane refactor for DPS (#3996)
Browse files Browse the repository at this point in the history
* feat: data plane refactor for DPS

* pr remarks
  • Loading branch information
wolf4ood authored Mar 13, 2024
1 parent 7c575ab commit 34467ea
Show file tree
Hide file tree
Showing 29 changed files with 404 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class DataPlaneFrameworkExtension implements ServiceExtension {
@Inject
private PublicEndpointGeneratorService endpointGenerator;

private DataPlaneAuthorizationService authorizationService;

@Override
public String name() {
return NAME;
Expand Down Expand Up @@ -132,6 +134,7 @@ public void initialize(ServiceExtensionContext context) {
.transferServiceRegistry(transferServiceRegistry)
.store(store)
.transferProcessClient(transferProcessApiClient)
.authorizationService(authorizationService(context))
.monitor(monitor)
.telemetry(telemetry)
.build();
Expand All @@ -153,7 +156,10 @@ public void shutdown() {

@Provider
public DataPlaneAuthorizationService authorizationService(ServiceExtensionContext context) {
return new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, context.getParticipantId(), clock);
if (authorizationService == null) {
authorizationService = new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, context.getParticipantId(), clock);
}
return authorizationService;
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.core.entity.AbstractStateEntityManager;
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
Expand All @@ -26,7 +27,10 @@
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.StateMachineManager;
Expand All @@ -40,6 +44,7 @@
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.STARTED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
Expand All @@ -49,6 +54,7 @@
*/
public class DataPlaneManagerImpl extends AbstractStateEntityManager<DataFlow, DataPlaneStore> implements DataPlaneManager {

private DataPlaneAuthorizationService authorizationService;
private TransferServiceRegistry transferServiceRegistry;
private TransferProcessApiClient transferProcessClient;

Expand All @@ -58,26 +64,35 @@ private DataPlaneManagerImpl() {

@Override
public Result<Boolean> validate(DataFlowStartMessage dataRequest) {
var transferService = transferServiceRegistry.resolveTransferService(dataRequest);
return transferService != null ?
transferService.validate(dataRequest) :
Result.failure(format("Cannot find a transfer Service that can handle %s source and %s destination",
dataRequest.getSourceDataAddress().getType(), dataRequest.getDestinationDataAddress().getType()));
// TODO for now no validation for pull scenario, since the transfer service registry
// is not applicable here. Probably validation only on the source part required.
if (FlowType.PULL.equals(dataRequest.getFlowType())) {
return Result.success(true);
} else {
var transferService = transferServiceRegistry.resolveTransferService(dataRequest);
return transferService != null ?
transferService.validate(dataRequest) :
Result.failure(format("Cannot find a transfer Service that can handle %s source and %s destination",
dataRequest.getSourceDataAddress().getType(), dataRequest.getDestinationDataAddress().getType()));
}
}

@Override
public void initiate(DataFlowStartMessage dataRequest) {
var dataFlow = DataFlow.Builder.newInstance()
.id(dataRequest.getProcessId())
.source(dataRequest.getSourceDataAddress())
.destination(dataRequest.getDestinationDataAddress())
.callbackAddress(dataRequest.getCallbackAddress())
.traceContext(telemetry.getCurrentTraceContext())
.properties(dataRequest.getProperties())
.state(RECEIVED.code())
public Result<DataFlowResponseMessage> start(DataFlowStartMessage startMessage) {
var dataFlowBuilder = dataFlowRequestBuilder(startMessage);

var result = handleStart(startMessage, dataFlowBuilder);
if (result.failed()) {
return result.mapTo();
}

var response = DataFlowResponseMessage.Builder.newInstance()
.dataAddress(result.getContent().orElse(null))
.build();

update(dataFlowBuilder.build());

update(dataFlow);
return Result.success(response);
}

@Override
Expand All @@ -94,20 +109,24 @@ public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason)
}

var dataFlow = result.getContent();
var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest());

if (transferService == null) {
return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId));
}
if (FlowType.PUSH.equals(dataFlow.getFlowType())) {
var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest());

if (transferService == null) {
return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId));
}

var terminateResult = transferService.terminate(dataFlow);
if (terminateResult.failed()) {
if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) {
monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId));
} else {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail()));
var terminateResult = transferService.terminate(dataFlow);
if (terminateResult.failed()) {
if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) {
monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId));
} else {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail()));
}
}
}

dataFlow.transitToTerminated(reason);
store.save(dataFlow);
return StatusResult.success();
Expand All @@ -121,6 +140,40 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM
.processor(processDataFlowInState(FAILED, this::processFailed));
}

private Result<Optional<DataAddress>> handleStart(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) {
return switch (startMessage.getFlowType()) {
case PULL -> handleStartPull(startMessage, dataFlowBuilder);
case PUSH -> handleStartPush(dataFlowBuilder);
};
}

private Result<Optional<DataAddress>> handleStartPush(DataFlow.Builder dataFlowBuilder) {
dataFlowBuilder.state(RECEIVED.code());
return Result.success(Optional.empty());
}

private Result<Optional<DataAddress>> handleStartPull(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) {
var dataAddressResult = authorizationService.createEndpointDataReference(startMessage)
.onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail())));

if (dataAddressResult.failed()) {
return dataAddressResult.mapTo();
}
dataFlowBuilder.state(STARTED.code());
return Result.success(Optional.of(dataAddressResult.getContent()));
}

private DataFlow.Builder dataFlowRequestBuilder(DataFlowStartMessage startMessage) {
return DataFlow.Builder.newInstance()
.id(startMessage.getProcessId())
.source(startMessage.getSourceDataAddress())
.destination(startMessage.getDestinationDataAddress())
.callbackAddress(startMessage.getCallbackAddress())
.traceContext(telemetry.getCurrentTraceContext())
.properties(startMessage.getProperties())
.flowType(startMessage.getFlowType());
}

private boolean processReceived(DataFlow dataFlow) {
var request = dataFlow.toRequest();
var transferService = transferServiceRegistry.resolveTransferService(request);
Expand Down Expand Up @@ -220,6 +273,11 @@ public Builder transferProcessClient(TransferProcessApiClient transferProcessCli
manager.transferProcessClient = transferProcessClient;
return this;
}

public Builder authorizationService(DataPlaneAuthorizationService authorizationService) {
manager.authorizationService = authorizationService;
return this;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
Expand All @@ -26,7 +27,9 @@
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -73,6 +76,7 @@ class DataPlaneManagerImplTest {
private final DataPlaneStore store = mock();
private final DataFlowStartMessage request = createRequest();
private final TransferServiceRegistry registry = mock();
private final DataPlaneAuthorizationService authorizationService = mock();
private DataPlaneManagerImpl manager;

@BeforeEach
Expand All @@ -83,6 +87,7 @@ public void setUp() {
.transferServiceRegistry(registry)
.store(store)
.transferProcessClient(transferProcessApiClient)
.authorizationService(authorizationService)
.monitor(mock())
.build();
}
Expand All @@ -96,9 +101,10 @@ void initiateDataFlow() {
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
.callbackAddress(URI.create("http://any"))
.properties(Map.of("key", "value"))
.flowType(FlowType.PUSH)
.build();

manager.initiate(request);
manager.start(request);

var captor = ArgumentCaptor.forClass(DataFlow.class);
verify(store).save(captor.capture());
Expand All @@ -109,6 +115,63 @@ void initiateDataFlow() {
assertThat(dataFlow.getCallbackAddress()).isEqualTo(URI.create("http://any"));
assertThat(dataFlow.getProperties()).isEqualTo(request.getProperties());
assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code());

verifyNoInteractions(authorizationService);
}

@Test
void initiatePullDataFlow() {

var dataAddress = DataAddress.Builder.newInstance().type("type").build();
var request = DataFlowStartMessage.Builder.newInstance()
.id("1")
.processId("1")
.sourceDataAddress(DataAddress.Builder.newInstance().type("type").build())
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
.callbackAddress(URI.create("http://any"))
.properties(Map.of("key", "value"))
.flowType(FlowType.PULL)
.build();

when(authorizationService.createEndpointDataReference(request)).thenReturn(Result.success(dataAddress));

var result = manager.start(request);

assertThat(result).isSucceeded().extracting(DataFlowResponseMessage::getDataAddress).isEqualTo(dataAddress);

var captor = ArgumentCaptor.forClass(DataFlow.class);
verify(store).save(captor.capture());
var dataFlow = captor.getValue();
assertThat(dataFlow.getId()).isEqualTo(request.getProcessId());
assertThat(dataFlow.getSource()).isSameAs(request.getSourceDataAddress());
assertThat(dataFlow.getDestination()).isSameAs(request.getDestinationDataAddress());
assertThat(dataFlow.getCallbackAddress()).isEqualTo(URI.create("http://any"));
assertThat(dataFlow.getProperties()).isEqualTo(request.getProperties());
assertThat(dataFlow.getState()).isEqualTo(STARTED.code());
}

@Test
void initiatePullDataFlow_shouldFail_whenEdrCreationFails() {

var dataAddress = DataAddress.Builder.newInstance().type("type").build();
var request = DataFlowStartMessage.Builder.newInstance()
.id("1")
.processId("1")
.sourceDataAddress(DataAddress.Builder.newInstance().type("type").build())
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
.callbackAddress(URI.create("http://any"))
.properties(Map.of("key", "value"))
.flowType(FlowType.PULL)
.build();

when(authorizationService.createEndpointDataReference(request)).thenReturn(Result.failure("failure"));

var result = manager.start(request);

assertThat(result).isFailed().detail().contains("failure");

var captor = ArgumentCaptor.forClass(DataFlow.class);
verifyNoInteractions(store);
}

@Test
Expand Down Expand Up @@ -366,6 +429,7 @@ private DataFlow.Builder dataFlowBuilder() {
.source(DataAddress.Builder.newInstance().type("source").build())
.destination(DataAddress.Builder.newInstance().type("destination").build())
.callbackAddress(URI.create("http://any"))
.flowType(FlowType.PUSH)
.properties(Map.of("key", "value"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -100,7 +101,7 @@ void shouldCallTransferProcessApiWithComplete(TransferProcessStore store, DataPl
store.save(createTransferProcess(id));
var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get());

manager.initiate(dataFlowRequest);
manager.start(dataFlowRequest);

await().untilAsserted(() -> {
var transferProcess = store.findById("tp-id");
Expand All @@ -116,7 +117,7 @@ void shouldCallTransferProcessApiWithFailed(TransferProcessStore store, DataPlan
store.save(createTransferProcess(id));
var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get());

manager.initiate(dataFlowRequest);
manager.start(dataFlowRequest);

await().untilAsserted(() -> {
var transferProcess = store.findById("tp-id");
Expand All @@ -134,7 +135,7 @@ void shouldCallTransferProcessApiWithException(TransferProcessStore store, DataP
store.save(createTransferProcess(id));
var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get());

manager.initiate(dataFlowRequest);
manager.start(dataFlowRequest);

await().untilAsserted(() -> {
var transferProcess = store.findById("tp-id");
Expand Down Expand Up @@ -166,6 +167,7 @@ private DataFlowStartMessage createDataFlowRequest(String processId, URI callbac
.callbackAddress(callbackAddress)
.sourceDataAddress(DataAddress.Builder.newInstance().type("file").build())
.destinationDataAddress(DataAddress.Builder.newInstance().type("file").build())
.flowType(FlowType.PUSH)
.build();
}

Expand Down
32 changes: 32 additions & 0 deletions extensions/data-plane/data-plane-client-embedded/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
api(project(":spi:data-plane:data-plane-spi"))
api(project(":spi:data-plane-selector:data-plane-selector-spi"))

implementation(libs.opentelemetry.instrumentation.annotations)

testImplementation(project(":core:common:junit"))
testImplementation(libs.restAssured)
testImplementation(libs.mockserver.netty)
testImplementation(libs.mockserver.client)
}


Loading

0 comments on commit 34467ea

Please sign in to comment.