From 34467ea2b8ff285879451d72d24c17e5ecd478e9 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Wed, 13 Mar 2024 15:43:06 +0100 Subject: [PATCH] feat: data plane refactor for DPS (#3996) * feat: data plane refactor for DPS * pr remarks --- .../DataPlaneFrameworkExtension.java | 8 +- .../manager/DataPlaneManagerImpl.java | 108 ++++++++++++++---- .../manager/DataPlaneManagerImplTest.java | 66 ++++++++++- ...nsferProcessHttpClientIntegrationTest.java | 8 +- .../build.gradle.kts | 32 ++++++ .../client/EmbeddedDataPlaneClient.java | 7 +- .../client/EmbeddedDataPlaneClientTest.java | 28 ++++- .../data-plane-client/build.gradle.kts | 1 + .../DataPlaneControlApiController.java | 2 +- .../DataPlaneControlApiControllerTest.java | 4 +- .../http/testfixtures/TestFunctions.java | 2 + .../api/DataPlaneSignalingApiExtension.java | 5 +- .../v1/DataPlaneSignalingApiController.java | 33 ++---- .../DataPlaneSignalingApiControllerTest.java | 32 +++--- .../build.gradle.kts | 1 + .../DataPlaneSignalingClientExtension.java | 19 ++- ...DataPlaneSignalingClientExtensionTest.java | 16 ++- .../sql/data-plane-store-sql/docs/schema.sql | 3 +- .../store/sql/SqlDataPlaneStore.java | 6 +- .../schema/BaseSqlDataPlaneStatements.java | 2 + .../store/sql/schema/DataPlaneStatements.java | 4 + settings.gradle.kts | 1 + .../domain/transfer/DataFlowStartMessage.java | 3 +- .../edc/connector/dataplane/spi/DataFlow.java | 16 ++- .../spi/manager/DataPlaneManager.java | 8 +- .../store/DataPlaneStoreTestBase.java | 2 + .../EndToEndTransferParticipant.java | 1 + ...EmbeddedSignalingTransferInMemoryTest.java | 79 +++++++++++++ .../SignalingTransferInMemoryTest.java | 10 +- 29 files changed, 404 insertions(+), 103 deletions(-) create mode 100644 extensions/data-plane/data-plane-client-embedded/build.gradle.kts rename extensions/data-plane/{data-plane-client => data-plane-client-embedded}/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java (87%) rename extensions/data-plane/{data-plane-client => data-plane-client-embedded}/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java (70%) create mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java index 3a8b3b3fa56..f92008c9487 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneFrameworkExtension.java @@ -101,6 +101,8 @@ public class DataPlaneFrameworkExtension implements ServiceExtension { @Inject private PublicEndpointGeneratorService endpointGenerator; + private DataPlaneAuthorizationService authorizationService; + @Override public String name() { return NAME; @@ -132,6 +134,7 @@ public void initialize(ServiceExtensionContext context) { .transferServiceRegistry(transferServiceRegistry) .store(store) .transferProcessClient(transferProcessApiClient) + .authorizationService(authorizationService(context)) .monitor(monitor) .telemetry(telemetry) .build(); @@ -153,7 +156,10 @@ public void shutdown() { @Provider public DataPlaneAuthorizationService authorizationService(ServiceExtensionContext context) { - return new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, context.getParticipantId(), clock); + if (authorizationService == null) { + authorizationService = new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, context.getParticipantId(), clock); + } + return authorizationService; } @NotNull diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index be48d2adba9..c0f62f24b9c 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -18,6 +18,7 @@ import org.eclipse.edc.connector.core.entity.AbstractStateEntityManager; import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; @@ -26,7 +27,10 @@ import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.statemachine.Processor; import org.eclipse.edc.statemachine.ProcessorImpl; import org.eclipse.edc.statemachine.StateMachineManager; @@ -40,6 +44,7 @@ import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED; +import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.STARTED; import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.TERMINATED; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; @@ -49,6 +54,7 @@ */ public class DataPlaneManagerImpl extends AbstractStateEntityManager implements DataPlaneManager { + private DataPlaneAuthorizationService authorizationService; private TransferServiceRegistry transferServiceRegistry; private TransferProcessApiClient transferProcessClient; @@ -58,26 +64,35 @@ private DataPlaneManagerImpl() { @Override public Result validate(DataFlowStartMessage dataRequest) { - var transferService = transferServiceRegistry.resolveTransferService(dataRequest); - return transferService != null ? - transferService.validate(dataRequest) : - Result.failure(format("Cannot find a transfer Service that can handle %s source and %s destination", - dataRequest.getSourceDataAddress().getType(), dataRequest.getDestinationDataAddress().getType())); + // TODO for now no validation for pull scenario, since the transfer service registry + // is not applicable here. Probably validation only on the source part required. + if (FlowType.PULL.equals(dataRequest.getFlowType())) { + return Result.success(true); + } else { + var transferService = transferServiceRegistry.resolveTransferService(dataRequest); + return transferService != null ? + transferService.validate(dataRequest) : + Result.failure(format("Cannot find a transfer Service that can handle %s source and %s destination", + dataRequest.getSourceDataAddress().getType(), dataRequest.getDestinationDataAddress().getType())); + } } @Override - public void initiate(DataFlowStartMessage dataRequest) { - var dataFlow = DataFlow.Builder.newInstance() - .id(dataRequest.getProcessId()) - .source(dataRequest.getSourceDataAddress()) - .destination(dataRequest.getDestinationDataAddress()) - .callbackAddress(dataRequest.getCallbackAddress()) - .traceContext(telemetry.getCurrentTraceContext()) - .properties(dataRequest.getProperties()) - .state(RECEIVED.code()) + public Result start(DataFlowStartMessage startMessage) { + var dataFlowBuilder = dataFlowRequestBuilder(startMessage); + + var result = handleStart(startMessage, dataFlowBuilder); + if (result.failed()) { + return result.mapTo(); + } + + var response = DataFlowResponseMessage.Builder.newInstance() + .dataAddress(result.getContent().orElse(null)) .build(); + + update(dataFlowBuilder.build()); - update(dataFlow); + return Result.success(response); } @Override @@ -94,20 +109,24 @@ public StatusResult terminate(String dataFlowId, @Nullable String reason) } var dataFlow = result.getContent(); - var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest()); - if (transferService == null) { - return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId)); - } + if (FlowType.PUSH.equals(dataFlow.getFlowType())) { + var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest()); + + if (transferService == null) { + return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId)); + } - var terminateResult = transferService.terminate(dataFlow); - if (terminateResult.failed()) { - if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) { - monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId)); - } else { - return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail())); + var terminateResult = transferService.terminate(dataFlow); + if (terminateResult.failed()) { + if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) { + monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId)); + } else { + return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail())); + } } } + dataFlow.transitToTerminated(reason); store.save(dataFlow); return StatusResult.success(); @@ -121,6 +140,40 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM .processor(processDataFlowInState(FAILED, this::processFailed)); } + private Result> handleStart(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) { + return switch (startMessage.getFlowType()) { + case PULL -> handleStartPull(startMessage, dataFlowBuilder); + case PUSH -> handleStartPush(dataFlowBuilder); + }; + } + + private Result> handleStartPush(DataFlow.Builder dataFlowBuilder) { + dataFlowBuilder.state(RECEIVED.code()); + return Result.success(Optional.empty()); + } + + private Result> handleStartPull(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) { + var dataAddressResult = authorizationService.createEndpointDataReference(startMessage) + .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))); + + if (dataAddressResult.failed()) { + return dataAddressResult.mapTo(); + } + dataFlowBuilder.state(STARTED.code()); + return Result.success(Optional.of(dataAddressResult.getContent())); + } + + private DataFlow.Builder dataFlowRequestBuilder(DataFlowStartMessage startMessage) { + return DataFlow.Builder.newInstance() + .id(startMessage.getProcessId()) + .source(startMessage.getSourceDataAddress()) + .destination(startMessage.getDestinationDataAddress()) + .callbackAddress(startMessage.getCallbackAddress()) + .traceContext(telemetry.getCurrentTraceContext()) + .properties(startMessage.getProperties()) + .flowType(startMessage.getFlowType()); + } + private boolean processReceived(DataFlow dataFlow) { var request = dataFlow.toRequest(); var transferService = transferServiceRegistry.resolveTransferService(request); @@ -220,6 +273,11 @@ public Builder transferProcessClient(TransferProcessApiClient transferProcessCli manager.transferProcessClient = transferProcessClient; return this; } + + public Builder authorizationService(DataPlaneAuthorizationService authorizationService) { + manager.authorizationService = authorizationService; + return this; + } } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index ca4fa63e263..e3e224a812c 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -16,6 +16,7 @@ import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient; import org.eclipse.edc.connector.dataplane.spi.DataFlow; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService; import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry; @@ -26,7 +27,9 @@ import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -73,6 +76,7 @@ class DataPlaneManagerImplTest { private final DataPlaneStore store = mock(); private final DataFlowStartMessage request = createRequest(); private final TransferServiceRegistry registry = mock(); + private final DataPlaneAuthorizationService authorizationService = mock(); private DataPlaneManagerImpl manager; @BeforeEach @@ -83,6 +87,7 @@ public void setUp() { .transferServiceRegistry(registry) .store(store) .transferProcessClient(transferProcessApiClient) + .authorizationService(authorizationService) .monitor(mock()) .build(); } @@ -96,9 +101,10 @@ void initiateDataFlow() { .destinationDataAddress(DataAddress.Builder.newInstance().type("type").build()) .callbackAddress(URI.create("http://any")) .properties(Map.of("key", "value")) + .flowType(FlowType.PUSH) .build(); - manager.initiate(request); + manager.start(request); var captor = ArgumentCaptor.forClass(DataFlow.class); verify(store).save(captor.capture()); @@ -109,6 +115,63 @@ void initiateDataFlow() { assertThat(dataFlow.getCallbackAddress()).isEqualTo(URI.create("http://any")); assertThat(dataFlow.getProperties()).isEqualTo(request.getProperties()); assertThat(dataFlow.getState()).isEqualTo(RECEIVED.code()); + + verifyNoInteractions(authorizationService); + } + + @Test + void initiatePullDataFlow() { + + var dataAddress = DataAddress.Builder.newInstance().type("type").build(); + var request = DataFlowStartMessage.Builder.newInstance() + .id("1") + .processId("1") + .sourceDataAddress(DataAddress.Builder.newInstance().type("type").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("type").build()) + .callbackAddress(URI.create("http://any")) + .properties(Map.of("key", "value")) + .flowType(FlowType.PULL) + .build(); + + when(authorizationService.createEndpointDataReference(request)).thenReturn(Result.success(dataAddress)); + + var result = manager.start(request); + + assertThat(result).isSucceeded().extracting(DataFlowResponseMessage::getDataAddress).isEqualTo(dataAddress); + + var captor = ArgumentCaptor.forClass(DataFlow.class); + verify(store).save(captor.capture()); + var dataFlow = captor.getValue(); + assertThat(dataFlow.getId()).isEqualTo(request.getProcessId()); + assertThat(dataFlow.getSource()).isSameAs(request.getSourceDataAddress()); + assertThat(dataFlow.getDestination()).isSameAs(request.getDestinationDataAddress()); + assertThat(dataFlow.getCallbackAddress()).isEqualTo(URI.create("http://any")); + assertThat(dataFlow.getProperties()).isEqualTo(request.getProperties()); + assertThat(dataFlow.getState()).isEqualTo(STARTED.code()); + } + + @Test + void initiatePullDataFlow_shouldFail_whenEdrCreationFails() { + + var dataAddress = DataAddress.Builder.newInstance().type("type").build(); + var request = DataFlowStartMessage.Builder.newInstance() + .id("1") + .processId("1") + .sourceDataAddress(DataAddress.Builder.newInstance().type("type").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("type").build()) + .callbackAddress(URI.create("http://any")) + .properties(Map.of("key", "value")) + .flowType(FlowType.PULL) + .build(); + + when(authorizationService.createEndpointDataReference(request)).thenReturn(Result.failure("failure")); + + var result = manager.start(request); + + assertThat(result).isFailed().detail().contains("failure"); + + var captor = ArgumentCaptor.forClass(DataFlow.class); + verifyNoInteractions(store); } @Test @@ -366,6 +429,7 @@ private DataFlow.Builder dataFlowBuilder() { .source(DataAddress.Builder.newInstance().type("source").build()) .destination(DataAddress.Builder.newInstance().type("destination").build()) .callbackAddress(URI.create("http://any")) + .flowType(FlowType.PUSH) .properties(Map.of("key", "value")); } diff --git a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java index c754d2f44af..49de02d5385 100644 --- a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java +++ b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java @@ -40,6 +40,7 @@ import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -100,7 +101,7 @@ void shouldCallTransferProcessApiWithComplete(TransferProcessStore store, DataPl store.save(createTransferProcess(id)); var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get()); - manager.initiate(dataFlowRequest); + manager.start(dataFlowRequest); await().untilAsserted(() -> { var transferProcess = store.findById("tp-id"); @@ -116,7 +117,7 @@ void shouldCallTransferProcessApiWithFailed(TransferProcessStore store, DataPlan store.save(createTransferProcess(id)); var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get()); - manager.initiate(dataFlowRequest); + manager.start(dataFlowRequest); await().untilAsserted(() -> { var transferProcess = store.findById("tp-id"); @@ -134,7 +135,7 @@ void shouldCallTransferProcessApiWithException(TransferProcessStore store, DataP store.save(createTransferProcess(id)); var dataFlowRequest = createDataFlowRequest(id, callbackUrl.get()); - manager.initiate(dataFlowRequest); + manager.start(dataFlowRequest); await().untilAsserted(() -> { var transferProcess = store.findById("tp-id"); @@ -166,6 +167,7 @@ private DataFlowStartMessage createDataFlowRequest(String processId, URI callbac .callbackAddress(callbackAddress) .sourceDataAddress(DataAddress.Builder.newInstance().type("file").build()) .destinationDataAddress(DataAddress.Builder.newInstance().type("file").build()) + .flowType(FlowType.PUSH) .build(); } diff --git a/extensions/data-plane/data-plane-client-embedded/build.gradle.kts b/extensions/data-plane/data-plane-client-embedded/build.gradle.kts new file mode 100644 index 00000000000..1e236356a5e --- /dev/null +++ b/extensions/data-plane/data-plane-client-embedded/build.gradle.kts @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + + +plugins { + `java-library` +} + +dependencies { + api(project(":spi:data-plane:data-plane-spi")) + api(project(":spi:data-plane-selector:data-plane-selector-spi")) + + implementation(libs.opentelemetry.instrumentation.annotations) + + testImplementation(project(":core:common:junit")) + testImplementation(libs.restAssured) + testImplementation(libs.mockserver.netty) + testImplementation(libs.mockserver.client) +} + + diff --git a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java b/extensions/data-plane/data-plane-client-embedded/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java similarity index 87% rename from extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java rename to extensions/data-plane/data-plane-client-embedded/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java index 7064391e26b..d181788a323 100644 --- a/extensions/data-plane/data-plane-client/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java +++ b/extensions/data-plane/data-plane-client-embedded/src/main/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClient.java @@ -43,8 +43,11 @@ public StatusResult start(DataFlowStartMessage request) if (result.failed()) { return StatusResult.failure(ResponseStatus.FATAL_ERROR, result.getFailureDetail()); } - dataPlaneManager.initiate(request); - return StatusResult.success(DataFlowResponseMessage.Builder.newInstance().build()); + var startResult = dataPlaneManager.start(request); + if (startResult.failed()) { + return StatusResult.failure(ResponseStatus.FATAL_ERROR, startResult.getFailureDetail()); + } + return StatusResult.success(startResult.getContent()); } @Override diff --git a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java b/extensions/data-plane/data-plane-client-embedded/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java similarity index 70% rename from extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java rename to extensions/data-plane/data-plane-client-embedded/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java index c3eb3e096e0..7a2cdba3be6 100644 --- a/extensions/data-plane/data-plane-client/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java +++ b/extensions/data-plane/data-plane-client-embedded/src/test/java/org/eclipse/edc/connector/dataplane/client/EmbeddedDataPlaneClientTest.java @@ -19,13 +19,13 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -47,16 +47,17 @@ private static DataFlowStartMessage createDataFlowRequest() { @Test void transfer_shouldSucceed_whenTransferInitiatedCorrectly() { + var response = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("type").build()).build(); var request = createDataFlowRequest(); when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); - doNothing().when(dataPlaneManager).initiate(any()); + when(dataPlaneManager.start(any())).thenReturn(Result.success(response)); var result = client.start(request); verify(dataPlaneManager).validate(request); - verify(dataPlaneManager).initiate(request); + verify(dataPlaneManager).start(request); - assertThat(result).isSucceeded(); + assertThat(result).isSucceeded().isEqualTo(response); } @Test @@ -64,12 +65,27 @@ void transfer_shouldReturnFailedResult_whenValidationFailure() { var errorMsg = "error"; var request = createDataFlowRequest(); when(dataPlaneManager.validate(any())).thenReturn(Result.failure(errorMsg)); - doNothing().when(dataPlaneManager).initiate(any()); + when(dataPlaneManager.start(any())).thenReturn(Result.success(DataFlowResponseMessage.Builder.newInstance().build())); + + var result = client.start(request); + + verify(dataPlaneManager).validate(request); + verify(dataPlaneManager, never()).start(any()); + + assertThat(result).isFailed().messages().hasSize(1).allSatisfy(s -> assertThat(s).contains(errorMsg)); + } + + @Test + void transfer_shouldReturnFailedResult_whenStartFailure() { + var errorMsg = "error"; + var request = createDataFlowRequest(); + when(dataPlaneManager.validate(any())).thenReturn(Result.success(true)); + when(dataPlaneManager.start(any())).thenReturn(Result.failure(errorMsg)); var result = client.start(request); verify(dataPlaneManager).validate(request); - verify(dataPlaneManager, never()).initiate(any()); + verify(dataPlaneManager).start(request); assertThat(result).isFailed().messages().hasSize(1).allSatisfy(s -> assertThat(s).contains(errorMsg)); } diff --git a/extensions/data-plane/data-plane-client/build.gradle.kts b/extensions/data-plane/data-plane-client/build.gradle.kts index fb57504e62f..ca56c68ec64 100644 --- a/extensions/data-plane/data-plane-client/build.gradle.kts +++ b/extensions/data-plane/data-plane-client/build.gradle.kts @@ -23,6 +23,7 @@ dependencies { api(project(":spi:data-plane:data-plane-spi")) api(project(":spi:data-plane-selector:data-plane-selector-spi")) implementation(project(":core:common:util")) + implementation(project(":extensions:data-plane:data-plane-client-embedded")) implementation(libs.opentelemetry.instrumentation.annotations) diff --git a/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java b/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java index fc3686d055b..226b6d463b9 100644 --- a/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java +++ b/extensions/data-plane/data-plane-control-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiController.java @@ -51,7 +51,7 @@ public void initiateTransfer(DataFlowStartMessage request, @Suspended AsyncRespo // TODO token authentication var result = dataPlaneManager.validate(request); if (result.succeeded()) { - dataPlaneManager.initiate(request); + dataPlaneManager.start(request); response.resume(Response.ok().build()); } else { var resp = result.getFailureMessages().isEmpty() ? diff --git a/extensions/data-plane/data-plane-control-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java b/extensions/data-plane/data-plane-control-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java index fc593f0986b..54c233a4026 100644 --- a/extensions/data-plane/data-plane-control-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java +++ b/extensions/data-plane/data-plane-control-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlaneControlApiControllerTest.java @@ -63,7 +63,7 @@ void should_callDataPlaneManager_if_requestIsValid() { .then() .statusCode(Response.Status.OK.getStatusCode()); - verify(manager).initiate(isA(DataFlowStartMessage.class)); + verify(manager).start(isA(DataFlowStartMessage.class)); } @Test @@ -86,7 +86,7 @@ void should_returnBadRequest_if_requestIsInValid() { .statusCode(Response.Status.BAD_REQUEST.getStatusCode()) .body("errors", CoreMatchers.equalTo(List.of(errorMsg))); - verify(manager, never()).initiate(any()); + verify(manager, never()).start(any()); } @Test diff --git a/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java b/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java index 64d07a7ec1e..452c7d80e6c 100644 --- a/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java +++ b/extensions/data-plane/data-plane-http/src/testFixtures/java/org/eclipse/edc/connector/dataplane/http/testfixtures/TestFunctions.java @@ -23,6 +23,7 @@ import org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -52,6 +53,7 @@ public static DataFlowStartMessage.Builder createRequest(Map pro .processId(UUID.randomUUID().toString()) .properties(properties) .sourceDataAddress(source) + .flowType(FlowType.PUSH) .destinationDataAddress(destination); } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java index 80268d9cae8..2060d8642bd 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlaneSignalingApiExtension.java @@ -16,7 +16,6 @@ import org.eclipse.edc.connector.api.signaling.configuration.SignalingApiConfiguration; import org.eclipse.edc.connector.dataplane.api.controller.v1.DataPlaneSignalingApiController; -import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; @@ -38,8 +37,6 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension { @Inject private TypeTransformerRegistry transformerRegistry; @Inject - private DataPlaneAuthorizationService authorizationService; - @Inject private DataPlaneManager dataPlaneManager; @Override @@ -50,7 +47,7 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { var signalingApiTypeTransformerRegistry = transformerRegistry.forContext("signaling-api"); - var controller = new DataPlaneSignalingApiController(signalingApiTypeTransformerRegistry, authorizationService, + var controller = new DataPlaneSignalingApiController(signalingApiTypeTransformerRegistry, dataPlaneManager, context.getMonitor().withPrefix("SignalingAPI")); webService.registerResource(controlApiConfiguration.getContextAlias(), controller); } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java index ab1b8c275be..6388ccccd1a 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java @@ -23,15 +23,12 @@ import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; -import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowSuspendMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; -import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.web.spi.exception.InvalidRequestException; @@ -44,13 +41,11 @@ public class DataPlaneSignalingApiController implements DataPlaneSignalingApi { private final TypeTransformerRegistry typeTransformerRegistry; - private final DataPlaneAuthorizationService dataPlaneAuthorizationService; private final DataPlaneManager dataPlaneManager; private final Monitor monitor; - public DataPlaneSignalingApiController(TypeTransformerRegistry typeTransformerRegistry, DataPlaneAuthorizationService dataPlaneAuthorizationService, DataPlaneManager dataPlaneManager, Monitor monitor) { + public DataPlaneSignalingApiController(TypeTransformerRegistry typeTransformerRegistry, DataPlaneManager dataPlaneManager, Monitor monitor) { this.typeTransformerRegistry = typeTransformerRegistry; - this.dataPlaneAuthorizationService = dataPlaneAuthorizationService; this.dataPlaneManager = dataPlaneManager; this.monitor = monitor; } @@ -62,26 +57,16 @@ public JsonObject start(JsonObject dataFlowStartMessage) { .onFailure(f -> monitor.warning("Error transforming %s: %s".formatted(DataFlowStartMessage.class, f.getFailureDetail()))) .orElseThrow(InvalidRequestException::new); + dataPlaneManager.validate(startMsg) + .onFailure(f -> monitor.warning("Failed to validate request: %s".formatted(f.getFailureDetail()))) + .orElseThrow(f -> f.getMessages().isEmpty() ? + new InvalidRequestException("Failed to validate request: %s".formatted(startMsg.getId())) : + new InvalidRequestException(f.getMessages())); - var flowResponse = DataFlowResponseMessage.Builder.newInstance(); - if (startMsg.getFlowType().equals(FlowType.PULL)) { - monitor.debug("Create EDR"); - var dataAddress = dataPlaneAuthorizationService.createEndpointDataReference(startMsg) - .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) - .orElseThrow(InvalidRequestException::new); + var response = dataPlaneManager.start(startMsg) + .orElseThrow(f -> new InvalidRequestException(f.getFailureDetail())); - flowResponse.dataAddress(dataAddress); - } else { - dataPlaneManager.validate(startMsg) - .onFailure(f -> monitor.warning("Failed to validate request: %s".formatted(f.getFailureDetail()))) - .orElseThrow(f -> f.getMessages().isEmpty() ? - new InvalidRequestException("Failed to validate request: %s".formatted(startMsg.getId())) : - new InvalidRequestException(f.getMessages())); - } - - dataPlaneManager.initiate(startMsg); - - return typeTransformerRegistry.transform(flowResponse.build(), JsonObject.class) + return typeTransformerRegistry.transform(response, JsonObject.class) .orElseThrow(f -> new EdcException(f.getFailureDetail())); } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java index b7e4db0a9a0..bd01ef1551a 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java @@ -20,7 +20,6 @@ import jakarta.json.JsonObject; import jakarta.json.JsonString; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; -import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; @@ -56,18 +55,18 @@ class DataPlaneSignalingApiControllerTest extends RestControllerTestBase { private final TypeTransformerRegistry transformerRegistry = mock(); - private final DataPlaneAuthorizationService authService = mock(); private final DataPlaneManager dataplaneManager = mock(); @DisplayName("Expect HTTP 200 and the correct EDR when a data flow is started") @Test void start() { var flowStartMessage = createFlowStartMessage(); + var flowResponse = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("test-edr").build()).build(); when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) .thenReturn(success(flowStartMessage)); when(dataplaneManager.validate(any())).thenReturn(success(true)); - when(authService.createEndpointDataReference(any())) - .thenReturn(success(DataAddress.Builder.newInstance().type("test-edr").build())); + when(dataplaneManager.start(any())) + .thenReturn(success(flowResponse)); when(transformerRegistry.transform(isA(DataFlowResponseMessage.class), eq(JsonObject.class))) .thenReturn(success(Json.createObjectBuilder().add("foo", "bar").build())); @@ -85,7 +84,7 @@ void start() { .extract().body().as(JsonObject.class); assertThat(result).hasEntrySatisfying("foo", val -> assertThat(((JsonString) val).getString()).isEqualTo("bar")); - verify(dataplaneManager).initiate(eq(flowStartMessage)); + verify(dataplaneManager).start(eq(flowStartMessage)); } @DisplayName("Expect HTTP 400 when DataFlowStartMessage is invalid") @@ -106,7 +105,7 @@ void start_whenInvalidMessage() { verify(transformerRegistry).transform(isA(JsonObject.class), eq(DataFlowStartMessage.class)); verify(dataplaneManager).validate(any(DataFlowStartMessage.class)); - verifyNoMoreInteractions(transformerRegistry, dataplaneManager, authService); + verifyNoMoreInteractions(transformerRegistry, dataplaneManager); } @@ -124,7 +123,7 @@ void start_whenTransformationFails() { .statusCode(400); verify(transformerRegistry).transform(isA(JsonObject.class), eq(DataFlowStartMessage.class)); - verifyNoMoreInteractions(transformerRegistry, dataplaneManager, authService); + verifyNoMoreInteractions(transformerRegistry, dataplaneManager); } @DisplayName("Expect HTTP 400 when an EDR cannot be created") @@ -133,7 +132,7 @@ void start_whenCreateEdrFails() { when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) .thenReturn(success(createFlowStartMessage())); when(dataplaneManager.validate(any())).thenReturn(success(true)); - when(authService.createEndpointDataReference(any())) + when(dataplaneManager.start(any())) .thenReturn(Result.failure("test-failure")); var jsonObject = Json.createObjectBuilder().build(); @@ -145,32 +144,35 @@ void start_whenCreateEdrFails() { .statusCode(400); verify(transformerRegistry).transform(isA(JsonObject.class), eq(DataFlowStartMessage.class)); - verify(authService).createEndpointDataReference(any()); - verifyNoMoreInteractions(transformerRegistry, dataplaneManager, authService); + verify(dataplaneManager).validate(any()); + verify(dataplaneManager).start(any()); + verifyNoMoreInteractions(transformerRegistry, dataplaneManager); } @DisplayName("Expect HTTP 500 when the DataAddress cannot be serialized on egress") @Test void start_whenDataAddressTransformationFails() { var flowStartMessage = createFlowStartMessage(); + var flowResponse = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("test-edr").build()).build(); + when(transformerRegistry.transform(isA(JsonObject.class), eq(DataFlowStartMessage.class))) .thenReturn(success(flowStartMessage)); when(dataplaneManager.validate(any())).thenReturn(success(true)); - when(authService.createEndpointDataReference(any())) - .thenReturn(success(DataAddress.Builder.newInstance().type("test-edr").build())); + when(dataplaneManager.start(any())) + .thenReturn(success(flowResponse)); when(transformerRegistry.transform(isA(DataAddress.class), eq(JsonObject.class))) .thenReturn(failure("test-failure")); var jsonObject = Json.createObjectBuilder().build(); - var result = baseRequest() + baseRequest() .contentType(ContentType.JSON) .body(jsonObject) .post("/v1/dataflows") .then() .statusCode(500); - verify(dataplaneManager).initiate(eq(flowStartMessage)); + verify(dataplaneManager).start(eq(flowStartMessage)); } @DisplayName("Expect HTTP 200 and the correct response when getting the state") @@ -242,7 +244,7 @@ void suspend() { @Override protected Object controller() { - return new DataPlaneSignalingApiController(transformerRegistry, authService, dataplaneManager, mock()); + return new DataPlaneSignalingApiController(transformerRegistry, dataplaneManager, mock()); } private DataFlowStartMessage createFlowStartMessage() { diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/build.gradle.kts b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/build.gradle.kts index 5d22ded0433..88d2653c3d9 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/build.gradle.kts +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/build.gradle.kts @@ -24,6 +24,7 @@ dependencies { implementation(project(":core:common:util")) implementation(project(":core:common:transform-core")) implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform")) + implementation(project(":extensions:data-plane:data-plane-client-embedded")) implementation(libs.opentelemetry.instrumentation.annotations) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java index 4d05e651c71..051551277a2 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java @@ -16,15 +16,19 @@ import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; import org.eclipse.edc.spi.http.EdcHttpClient; import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import java.util.Objects; + import static org.eclipse.edc.spi.CoreConstants.JSON_LD; /** @@ -34,7 +38,7 @@ public class DataPlaneSignalingClientExtension implements ServiceExtension { public static final String NAME = "Data Plane Signaling Client"; - @Inject + @Inject(required = false) private EdcHttpClient httpClient; @Inject @@ -45,6 +49,8 @@ public class DataPlaneSignalingClientExtension implements ServiceExtension { @Inject private JsonLd jsonLd; + @Inject(required = false) + private DataPlaneManager dataPlaneManager; @Override public String name() { @@ -52,8 +58,17 @@ public String name() { } @Provider - public DataPlaneClientFactory dataPlaneClientFactory() { + public DataPlaneClientFactory dataPlaneClientFactory(ServiceExtensionContext context) { + + if (dataPlaneManager != null) { + // Data plane manager is embedded in the current runtime + context.getMonitor().debug(() -> "Using embedded Data Plane client."); + return instance -> new EmbeddedDataPlaneClient(dataPlaneManager); + } + var mapper = typeManager.getMapper(JSON_LD); + context.getMonitor().debug(() -> "Using remote Data Plane client."); + Objects.requireNonNull(httpClient, "To use remote Data Plane client, an EdcHttpClient instance must be registered"); var signalingApiTypeTransformerRegistry = transformerRegistry.forContext("signaling-api"); return instance -> new DataPlaneSignalingClient(httpClient, signalingApiTypeTransformerRegistry, jsonLd, mapper, instance); } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtensionTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtensionTest.java index 448123608c0..7ecaa58b92d 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtensionTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtensionTest.java @@ -15,7 +15,10 @@ package org.eclipse.edc.connector.dataplane.client; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.system.injection.ObjectFactory; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -25,13 +28,22 @@ class DataPlaneSignalingClientExtensionTest { @Test - void verifyDataPlaneClientFactory(DataPlaneSignalingClientExtension extension) { + void verifyDataPlaneClientFactory(ServiceExtensionContext context, ObjectFactory factory) { + context.registerService(DataPlaneManager.class, null); + var extension = factory.constructInstance(DataPlaneSignalingClientExtension.class); - var client = extension.dataPlaneClientFactory().createClient(createDataPlaneInstance()); + var client = extension.dataPlaneClientFactory(context).createClient(createDataPlaneInstance()); assertThat(client).isInstanceOf(DataPlaneSignalingClient.class); } + @Test + void verifyDataPlaneClientFactory_withEmbedded(ServiceExtensionContext context, DataPlaneSignalingClientExtension extension) { + var client = extension.dataPlaneClientFactory(context).createClient(createDataPlaneInstance()); + + assertThat(client).isInstanceOf(EmbeddedDataPlaneClient.class); + } + private DataPlaneInstance createDataPlaneInstance() { return DataPlaneInstance.Builder.newInstance().url("http://any").build(); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql index 0073ae461da..a7adac0d27e 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql +++ b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql @@ -30,7 +30,8 @@ CREATE TABLE IF NOT EXISTS edc_data_plane ON DELETE SET NULL, source JSON, destination JSON, - properties JSON + properties JSON, + flow_type VARCHAR ); COMMENT ON COLUMN edc_data_plane.trace_context IS 'Java Map serialized as JSON'; diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java index 8283044dda0..ebf1bc2f703 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java @@ -23,6 +23,7 @@ import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.sql.QueryExecutor; import org.eclipse.edc.sql.lease.SqlLeaseContextBuilder; import org.eclipse.edc.sql.store.AbstractSqlStore; @@ -147,7 +148,8 @@ private void insert(Connection connection, DataFlow dataFlow) { Optional.ofNullable(dataFlow.getCallbackAddress()).map(URI::toString).orElse(null), toJson(dataFlow.getSource()), toJson(dataFlow.getDestination()), - toJson(dataFlow.getProperties()) + toJson(dataFlow.getProperties()), + dataFlow.getFlowType().toString() ); } @@ -164,6 +166,7 @@ private void update(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getSource()), toJson(dataFlow.getDestination()), toJson(dataFlow.getProperties()), + dataFlow.getFlowType().toString(), dataFlow.getId()); } @@ -181,6 +184,7 @@ private DataFlow mapDataFlow(ResultSet resultSet) throws SQLException { .source(fromJson(resultSet.getString(statements.getSourceColumn()), DataAddress.class)) .destination(fromJson(resultSet.getString(statements.getDestinationColumn()), DataAddress.class)) .properties(fromJson(resultSet.getString(statements.getPropertiesColumn()), getTypeRef())) + .flowType(FlowType.valueOf(resultSet.getString(statements.getFlowTypeColumn()))) .build(); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java index 07f9c15bac5..9c310727265 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java @@ -44,6 +44,7 @@ public String getInsertTemplate() { .jsonColumn(getSourceColumn()) .jsonColumn(getDestinationColumn()) .jsonColumn(getPropertiesColumn()) + .column(getFlowTypeColumn()) .insertInto(getDataPlaneTable()); } @@ -60,6 +61,7 @@ public String getUpdateTemplate() { .jsonColumn(getSourceColumn()) .jsonColumn(getDestinationColumn()) .jsonColumn(getPropertiesColumn()) + .column(getFlowTypeColumn()) .update(getDataPlaneTable(), getIdColumn()); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java index ba7fb3026cc..51850bd3217 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java @@ -48,6 +48,10 @@ default String getPropertiesColumn() { return "properties"; } + default String getFlowTypeColumn() { + return "flow_type"; + } + String getInsertTemplate(); String getUpdateTemplate(); diff --git a/settings.gradle.kts b/settings.gradle.kts index 92419fc3ae2..68e231ecc07 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -173,6 +173,7 @@ include(":extensions:control-plane:callback:callback-static-endpoint") include(":extensions:data-plane:data-plane-client") +include(":extensions:data-plane:data-plane-client-embedded") include(":extensions:data-plane:data-plane-control-api") include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api") include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api-configuration") diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java index 1cd3ee9500c..304d81ef0ce 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java @@ -59,7 +59,7 @@ public class DataFlowStartMessage implements Polymorphic, TraceCarrier { private DataAddress sourceDataAddress; private DataAddress destinationDataAddress; - private FlowType flowType; + private FlowType flowType = FlowType.PUSH; private URI callbackAddress; private Map properties = new HashMap<>(); @@ -249,6 +249,7 @@ public DataFlowStartMessage build() { Objects.requireNonNull(request.sourceDataAddress, "sourceDataAddress"); Objects.requireNonNull(request.destinationDataAddress, "destinationDataAddress"); Objects.requireNonNull(request.traceContext, "traceContext"); + Objects.requireNonNull(request.flowType, "flowType"); return request; } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index f2551f8fcaf..550fefa9fc3 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -19,6 +19,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.jetbrains.annotations.Nullable; import java.net.URI; @@ -46,13 +47,16 @@ public class DataFlow extends StatefulEntity { private URI callbackAddress; private Map properties = new HashMap<>(); + private FlowType flowType = FlowType.PULL; + @Override public DataFlow copy() { var builder = Builder.newInstance() .source(source) .destination(destination) .callbackAddress(callbackAddress) - .properties(properties); + .properties(properties) + .flowType(flowType); return copy(builder); } @@ -78,6 +82,10 @@ public Map getProperties() { return Collections.unmodifiableMap(properties); } + public FlowType getFlowType() { + return flowType; + } + public DataFlowStartMessage toRequest() { return DataFlowStartMessage.Builder.newInstance() .id(getId()) @@ -87,6 +95,7 @@ public DataFlowStartMessage toRequest() { .callbackAddress(getCallbackAddress()) .traceContext(traceContext) .properties(getProperties()) + .flowType(getFlowType()) .build(); } @@ -161,6 +170,11 @@ public Builder callbackAddress(URI callbackAddress) { return this; } + public Builder flowType(FlowType flowType) { + entity.flowType = flowType; + return this; + } + public Builder properties(Map properties) { entity.properties = new HashMap<>(properties); return this; diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java index c5a8a005572..e24b129a77d 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/manager/DataPlaneManager.java @@ -19,6 +19,7 @@ import org.eclipse.edc.spi.entity.StateEntityManager; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.jetbrains.annotations.Nullable; @@ -34,9 +35,12 @@ public interface DataPlaneManager extends StateEntityManager { Result validate(DataFlowStartMessage dataRequest); /** - * Initiates a transfer for the data flow request. This method is non-blocking with respect to processing the request. + * Starts a transfer for the data flow request. This method is non-blocking with respect to processing the request. + * + * @param startMessage The {@link DataFlowStartMessage} + * @return success with the {@link DataFlowResponseMessage} if the request was correctly processed, failure otherwise */ - void initiate(DataFlowStartMessage dataRequest); + Result start(DataFlowStartMessage startMessage); /** * Returns the transfer state for the process. diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java index 854b97da345..d947b7e5517 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java @@ -22,6 +22,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.result.StoreFailure; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -60,6 +61,7 @@ private DataFlow createDataFlow(String id, DataFlowStates state) { .callbackAddress(URI.create("http://any")) .source(DataAddress.Builder.newInstance().type("src-type").build()) .destination(DataAddress.Builder.newInstance().type("dest-type").build()) + .flowType(FlowType.PUSH) .state(state.code()) .build(); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java index a4a08dc19a1..f3cbd1aadc6 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java @@ -292,6 +292,7 @@ public Map dataPlaneConfiguration() { put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); + put("edc.dataplane.api.public.baseurl", dataPlanePublic + "/v2/"); put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java new file mode 100644 index 00000000000..eba3008aaf3 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedSignalingTransferInMemoryTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.signaling; + +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.HashMap; +import java.util.Map; + +@EndToEndTest +class EmbeddedSignalingTransferInMemoryTest extends AbstractSignalingTransfer { + + static String[] providerPlaneModules = new String[]{ + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:control-plane:transfer:transfer-data-plane-signaling", + ":system-tests:e2e-transfer-test:data-plane", + ":extensions:data-plane:data-plane-public-api-v2" + }; + static String[] consumerPlaneModules = new String[]{ + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:control-plane:callback:callback-event-dispatcher", + ":extensions:control-plane:callback:callback-http-dispatcher" + }; + + @RegisterExtension + static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( + new EdcRuntimeExtension( + "consumer-control-plane", + CONSUMER.controlPlaneConfiguration(), + consumerPlaneModules + ), + new EdcRuntimeExtension( + ":system-tests:e2e-transfer-test:backend-service", + "consumer-backend-service", + new HashMap<>() { + { + put("web.http.port", String.valueOf(CONSUMER.backendService().getPort())); + } + } + ), + new EdcRuntimeExtension( + "provider-control-plane", + providerConfig(), + providerPlaneModules + ), + new EdcRuntimeExtension( + ":system-tests:e2e-transfer-test:backend-service", + "provider-backend-service", + new HashMap<>() { + { + put("web.http.port", String.valueOf(PROVIDER.backendService().getPort())); + } + } + ) + ); + + + private static Map providerConfig() { + var cfg = PROVIDER.dataPlaneConfiguration(); + cfg.putAll(PROVIDER.controlPlaneConfiguration()); + return cfg; + } + +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java index 07695f7d459..2377e494ff1 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java @@ -14,12 +14,9 @@ package org.eclipse.edc.test.e2e.signaling; -import org.eclipse.edc.connector.dataplane.spi.Endpoint; -import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.RegisterExtension; import java.util.HashMap; @@ -78,10 +75,5 @@ class SignalingTransferInMemoryTest extends AbstractSignalingTransfer { } ) ); - - @BeforeAll - static void setup() { - var generator = dataPlane.getContext().getService(PublicEndpointGeneratorService.class); - generator.addGeneratorFunction("HttpData", dataAddress -> Endpoint.url(PROVIDER.publicDataPlane() + "/v2/")); - } + }