From 169eecc658eec1243c027c79273acc5401dda9a0 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Wed, 20 Mar 2024 15:37:12 +0100 Subject: [PATCH] refactor: inline `DataRequest` value object into `TransferProcess` (#4009) * refactor: inline data request * simplify sql schema --- .../TransferProcessProtocolServiceImpl.java | 18 +- ...ransferProcessProtocolServiceImplTest.java | 15 +- .../TransferProcessServiceImplTest.java | 3 +- .../InMemoryTransferProcessStore.java | 2 +- .../transfer/TransferCoreExtension.java | 2 - .../process/TransferProcessManagerImpl.java | 26 +-- .../flow/DataFlowManagerImplTest.java | 22 +-- ...sferProcessManagerImplIntegrationTest.java | 15 +- .../TransferProcessManagerImplTest.java | 44 ++--- .../DeprovisionResponsesHandlerTest.java | 19 +- .../ProvisionResponsesHandlerTest.java | 15 +- .../ResourceManifestGeneratorImplTest.java | 16 +- .../DspTransferProcessApiControllerTest.java | 1 - ...ectFromTransferProcessTransformerTest.java | 17 +- ...nsferProcessHttpClientIntegrationTest.java | 11 +- ...ectFromTransferProcessTransformerTest.java | 11 +- ...oviderResourceDefinitionGeneratorTest.java | 16 +- .../sql/transfer-process-store-sql/README.md | 26 +++ .../docs/schema.sql | 34 +--- .../store/SqlTransferProcessStore.java | 80 +++----- .../schema/BaseSqlDialectStatements.java | 38 ++-- .../TransferProcessStoreStatements.java | 24 +-- .../schema/postgres/DataRequestMapping.java | 48 ----- .../postgres/TransferProcessMapping.java | 15 +- .../PostgresTransferProcessStoreTest.java | 1 - .../DataPlaneSignalingFlowControllerTest.java | 72 +++---- ...onsumerPullTransferDataFlowController.java | 14 +- .../ConsumerPullDataPlaneProxyResolver.java | 10 +- ...merPullTransferDataFlowControllerTest.java | 41 ++-- ...derPushTransferDataFlowControllerTest.java | 54 ++---- ...onsumerPullDataPlaneProxyResolverTest.java | 40 ++-- .../receiver/http/dynamic/TestFunctions.java | 5 +- .../transfer/spi/types/DataRequest.java | 176 ------------------ .../transfer/spi/types/TransferProcess.java | 80 +++++--- .../transfer/spi/types/DataRequestTest.java | 35 ---- .../spi/testfixtures/store/TestFunctions.java | 36 +--- .../store/TransferProcessStoreTestBase.java | 62 ++---- .../TransferProcessApiEndToEndTest.java | 20 +- .../protocol/DspTransferApiEndToEndTest.java | 7 +- 39 files changed, 348 insertions(+), 823 deletions(-) delete mode 100644 extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/DataRequestMapping.java delete mode 100644 spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/types/DataRequest.java delete mode 100644 spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/types/DataRequestTest.java diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java index 7cabd1ca48b..2d8c57606c6 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java @@ -24,7 +24,6 @@ import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; @@ -154,22 +153,19 @@ public ServiceResult findById(String id, TokenRepresentation to private ServiceResult requestedAction(TransferRequestMessage message, String assetId) { var destination = message.getDataDestination() != null ? message.getDataDestination() : DataAddress.Builder.newInstance().type(HTTP_PROXY).build(); - var dataRequest = DataRequest.Builder.newInstance() - .id(message.getConsumerPid()) - .protocol(message.getProtocol()) - .connectorAddress(message.getCallbackAddress()) - .dataDestination(destination) - .assetId(assetId) - .contractId(message.getContractId()) - .build(); - var existingTransferProcess = transferProcessStore.findForCorrelationId(dataRequest.getId()); + var existingTransferProcess = transferProcessStore.findForCorrelationId(message.getConsumerPid()); if (existingTransferProcess != null) { return ServiceResult.success(existingTransferProcess); } var process = TransferProcess.Builder.newInstance() .id(randomUUID().toString()) - .dataRequest(dataRequest) + .protocol(message.getProtocol()) + .correlationId(message.getConsumerPid()) + .counterPartyAddress(message.getCallbackAddress()) + .dataDestination(destination) + .assetId(assetId) + .contractId(message.getContractId()) .transferType(message.getTransferType()) .type(PROVIDER) .clock(clock) diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java index a7f8d6cf3ef..a2a82afa536 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java @@ -24,7 +24,6 @@ import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage; @@ -139,7 +138,7 @@ void notifyRequested_validAgreement_shouldInitiateTransfer() { assertThat(result).isSucceeded().satisfies(tp -> { assertThat(tp.getCorrelationId()).isEqualTo("consumerPid"); - assertThat(tp.getConnectorAddress()).isEqualTo("http://any"); + assertThat(tp.getCounterPartyAddress()).isEqualTo("http://any"); assertThat(tp.getAssetId()).isEqualTo("assetId"); }); verify(listener).preCreated(any()); @@ -241,7 +240,7 @@ void notifyRequested_missingDestination_shouldInitiateTransfer() { assertThat(result).isSucceeded().satisfies(transferProcess -> { assertThat(transferProcess.getCorrelationId()).isEqualTo("consumerPid"); - assertThat(transferProcess.getConnectorAddress()).isEqualTo("http://any"); + assertThat(transferProcess.getCounterPartyAddress()).isEqualTo("http://any"); assertThat(transferProcess.getAssetId()).isEqualTo("assetId"); assertThat(transferProcess.getDataDestination().getType()).isEqualTo(HTTP_PROXY); }); @@ -782,7 +781,8 @@ private TransferProcess transferProcess(TransferProcessStates state, String id) private TransferProcess.Builder transferProcessBuilder() { return TransferProcess.Builder.newInstance() - .dataRequest(dataRequest()); + .contractId("contractId") + .dataDestination(DataAddress.Builder.newInstance().type("type").build()); } private ParticipantAgent participantAgent() { @@ -805,13 +805,6 @@ private ContractAgreement contractAgreement() { .build(); } - private DataRequest dataRequest() { - return DataRequest.Builder.newInstance() - .contractId("contractId") - .destinationType("type") - .build(); - } - @FunctionalInterface private interface MethodCall { ServiceResult call(TransferProcessProtocolService service, M message, TokenRepresentation token); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java index 3c6c17a785d..20f64a603c9 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessServiceImplTest.java @@ -19,7 +19,6 @@ import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; import org.eclipse.edc.connector.transfer.spi.TransferProcessManager; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.connector.transfer.spi.types.TransferRequest; @@ -262,7 +261,7 @@ private TransferProcess transferProcess(TransferProcessStates state, String id) return TransferProcess.Builder.newInstance() .state(state.code()) .id(id) - .dataRequest(DataRequest.Builder.newInstance().dataDestination(DataAddress.Builder.newInstance().type("any").build()).build()) + .dataDestination(DataAddress.Builder.newInstance().type("any").build()) .build(); } diff --git a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java index 326cb5e24e1..4c9abfc6a2a 100644 --- a/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java +++ b/core/control-plane/control-plane-core/src/main/java/org/eclipse/edc/connector/defaults/storage/transferprocess/InMemoryTransferProcessStore.java @@ -44,7 +44,7 @@ public InMemoryTransferProcessStore(String leaserId, Clock clock, CriterionOpera @Override public @Nullable TransferProcess findForCorrelationId(String correlationId) { - var querySpec = QuerySpec.Builder.newInstance().filter(criterion("dataRequest.id", "=", correlationId)).build(); + var querySpec = QuerySpec.Builder.newInstance().filter(criterion("correlationId", "=", correlationId)).build(); return super.findAll(querySpec).findFirst().orElse(null); } diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferCoreExtension.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferCoreExtension.java index 421d8c5a857..d7e33d47def 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferCoreExtension.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/TransferCoreExtension.java @@ -34,7 +34,6 @@ import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator; import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedContentResource; import org.eclipse.edc.runtime.metamodel.annotation.CoreExtension; @@ -219,7 +218,6 @@ private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration(Servi } private void registerTypes(TypeManager typeManager) { - typeManager.registerTypes(DataRequest.class); typeManager.registerTypes(ProvisionedContentResource.class); typeManager.registerTypes(DeprovisionedResource.class); } diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java index 528161bf197..c4f491eac46 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java @@ -32,7 +32,6 @@ import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; @@ -133,20 +132,15 @@ public StatusResult initiateConsumerRequest(TransferRequest tra if (existingTransferProcess != null) { return StatusResult.success(existingTransferProcess); } - var dataRequest = DataRequest.Builder.newInstance() + + var process = TransferProcess.Builder.newInstance() .id(id) .assetId(transferRequest.getAssetId()) .dataDestination(transferRequest.getDataDestination()) - .connectorAddress(transferRequest.getCounterPartyAddress()) + .counterPartyAddress(transferRequest.getCounterPartyAddress()) .contractId(transferRequest.getContractId()) - .destinationType(transferRequest.getDataDestination().getType()) .protocol(transferRequest.getProtocol()) .dataDestination(transferRequest.getDataDestination()) - .build(); - - var process = TransferProcess.Builder.newInstance() - .id(id) - .dataRequest(dataRequest) .type(CONSUMER) .clock(clock) .transferType(transferRequest.getTransferType()) @@ -297,7 +291,7 @@ private boolean processRequesting(TransferProcess process) { .onRetryExhausted(this::transitionToTerminated) .onFailure((t, throwable) -> transitionToRequesting(t)) .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) - .execute("send transfer request to " + process.getConnectorAddress()); + .execute("send transfer request to " + process.getCounterPartyAddress()); } /** @@ -338,7 +332,7 @@ private boolean processCompleting(TransferProcess process) { .onFailure((t, throwable) -> transitionToCompleting(t)) .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer completion to " + process.getConnectorAddress()); + .execute("send transfer completion to " + process.getCounterPartyAddress()); } /** @@ -412,7 +406,7 @@ private void sendTransferStartMessage(TransferProcess process, DataFlowResponse .onFailure((t, throwable) -> transitionToStarting(t)) .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer start to " + process.getConnectorAddress()); + .execute("send transfer start to " + process.getCounterPartyAddress()); } @NotNull @@ -442,7 +436,7 @@ private boolean sendTransferSuspensionMessage(TransferProcess process) { .onFailure((t, throwable) -> transitionToSuspending(t, throwable.getMessage())) .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) .onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) - .execute("send transfer suspension to " + process.getConnectorAddress()); + .execute("send transfer suspension to " + process.getCounterPartyAddress()); } private boolean sendTransferTerminationMessage(TransferProcess process) { @@ -459,15 +453,15 @@ private boolean sendTransferTerminationMessage(TransferProcess process) { .onFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) .onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail())) .onRetryExhausted(this::transitionToTerminated) - .execute("send transfer termination to " + process.getConnectorAddress()); + .execute("send transfer termination to " + process.getCounterPartyAddress()); } private > AsyncStatusResultRetryProcess dispatch(B messageBuilder, TransferProcess process, Policy policy, Class responseType) { messageBuilder.protocol(process.getProtocol()) - .counterPartyAddress(process.getConnectorAddress()) - .processId(process.getCorrelationId()) + .counterPartyAddress(process.getCounterPartyAddress()) + .processId(Optional.ofNullable(process.getCorrelationId()).orElse(process.getId())) .policy(policy); if (process.lastSentProtocolMessage() != null) { diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java index 4f9f7e8e44c..9dfe72e7484 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java @@ -16,7 +16,6 @@ import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.EdcException; @@ -46,10 +45,11 @@ class Initiate { @Test void shouldInitiateFlowOnCorrectController() { var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); var policy = Policy.Builder.newInstance().build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + var transferProcess = TransferProcess.Builder.newInstance() + .dataDestination(DataAddress.Builder.newInstance().type("test-dest-type").build()) + .contentDataAddress(dataAddress).build(); when(controller.canHandle(any())).thenReturn(true); when(controller.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); @@ -63,10 +63,10 @@ void shouldInitiateFlowOnCorrectController() { @Test void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() { var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); var policy = Policy.Builder.newInstance().build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build(); when(controller.canHandle(any())).thenReturn(false); manager.register(controller); @@ -80,10 +80,10 @@ void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() { @Test void shouldCatchExceptionsAndReturnFatalError() { var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); var policy = Policy.Builder.newInstance().build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build(); var errorMsg = "Test Error Message"; when(controller.canHandle(any())).thenReturn(true); @@ -123,9 +123,9 @@ class Suspend { @Test void shouldChooseControllerAndSuspend() { var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build(); when(controller.canHandle(any())).thenReturn(true); when(controller.suspend(any())).thenReturn(StatusResult.success()); @@ -143,9 +143,9 @@ class Terminate { @Test void shouldChooseControllerAndTerminate() { var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build(); var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build(); when(controller.canHandle(any())).thenReturn(true); when(controller.terminate(any())).thenReturn(StatusResult.success()); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java index b4393c43e7f..d4a47583338 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplIntegrationTest.java @@ -25,7 +25,6 @@ import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResourceSet; import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest; @@ -45,6 +44,7 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.retry.ExponentialWaitStrategy; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.callback.CallbackAddress; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -173,18 +173,13 @@ private ProvisionedResourceSet provisionedResourceSet() { } private TransferProcess.Builder transferProcessBuilder() { - var processId = UUID.randomUUID().toString(); - var dataRequest = DataRequest.Builder.newInstance() - .id(processId) - .destinationType("test-type") - .contractId(UUID.randomUUID().toString()) - .build(); - return TransferProcess.Builder.newInstance() .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) .type(CONSUMER) - .id("test-process-" + processId) - .dataRequest(dataRequest); + .id("test-process-" + UUID.randomUUID()) + .correlationId(UUID.randomUUID().toString()) + .dataDestination(DataAddress.Builder.newInstance().type("test-type").build()) + .contractId(UUID.randomUUID().toString()); } @Nested diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java index 41831df787a..9e278908433 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java @@ -30,7 +30,6 @@ import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource; import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedDataDestinationResource; @@ -194,7 +193,8 @@ void initiateConsumerRequest() { verify(transferProcessStore, times(RETRY_LIMIT)).save(captor.capture()); var transferProcess = captor.getValue(); - assertThat(transferProcess.getId()).isEqualTo("1").isEqualTo(transferProcess.getCorrelationId()); + assertThat(transferProcess.getId()).isEqualTo("1"); + assertThat(transferProcess.getCorrelationId()).isNull(); assertThat(transferProcess.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(callback); verify(listener).initiated(any()); } @@ -288,8 +288,7 @@ void initial_provider_shouldStoreSecret_whenItIsFoundInTheDataAddress() { .type("type") .property(EDC_DATA_ADDRESS_SECRET, "secret") .build(); - var dataRequest = createDataRequestBuilder().dataDestination(destinationDataAddress).build(); - var transferProcess = createTransferProcessBuilder(INITIAL).type(PROVIDER).dataRequest(dataRequest).build(); + var transferProcess = createTransferProcessBuilder(INITIAL).type(PROVIDER).dataDestination(destinationDataAddress).build(); when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build()); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(INITIAL.code()))).thenReturn(List.of(transferProcess)).thenReturn(emptyList()); when(addressResolver.resolveForAsset(any())).thenReturn(DataAddress.Builder.newInstance().type("type").build()); @@ -463,7 +462,7 @@ void requesting_shouldSendMessageAndTransitionToRequested() { var captor = ArgumentCaptor.forClass(TransferRequestMessage.class); verify(dispatcherRegistry).dispatch(eq(TransferProcessAck.class), captor.capture()); var message = captor.getValue(); - assertThat(message.getProcessId()).isEqualTo(null); + assertThat(message.getProcessId()).isEqualTo(process.getId()); assertThat(message.getConsumerPid()).isEqualTo(process.getId()); assertThat(message.getProviderPid()).isEqualTo(null); assertThat(message.getCallbackAddress()).isEqualTo(protocolWebhookUrl); @@ -474,7 +473,7 @@ void requesting_shouldSendMessageAndTransitionToRequested() { @Test void requesting_shouldAddSecretToDataAddress_whenItExists() { var destination = DataAddress.Builder.newInstance().type("any").keyName("keyName").build(); - var process = createTransferProcessBuilder(REQUESTING).dataRequest(createDataRequestBuilder().dataDestination(destination).build()).build(); + var process = createTransferProcessBuilder(REQUESTING).dataDestination(destination).build(); var ack = TransferProcessAck.Builder.newInstance().providerPid("providerPid").build(); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); when(transferProcessStore.nextNotLeased(anyInt(), consumerStateIs(REQUESTING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); @@ -564,7 +563,7 @@ void starting_onFailureAndRetriesExhausted_transitToTerminating() { @Test void completing_provider_shouldTransitionToDeprovisioning_whenSendingMessageSucceed() { - var process = createTransferProcessBuilder(COMPLETING).type(PROVIDER).dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); + var process = createTransferProcessBuilder(COMPLETING).type(PROVIDER).correlationId("correlationId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -585,7 +584,7 @@ void completing_provider_shouldTransitionToDeprovisioning_whenSendingMessageSucc @Test void completing_consumer_shouldTransitionToCompleted_whenSendingMessageSucceed() { - var process = createTransferProcessBuilder(COMPLETING).type(CONSUMER).dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); + var process = createTransferProcessBuilder(COMPLETING).type(CONSUMER).correlationId("correlationId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -606,8 +605,7 @@ void completing_consumer_shouldTransitionToCompleted_whenSendingMessageSucceed() @Test void terminating_provider_shouldTransitionToDeprovisioning_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER) - .dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); + var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER).correlationId("correlationId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(TERMINATING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferTerminationMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -630,8 +628,7 @@ void terminating_provider_shouldTransitionToDeprovisioning_whenMessageSentCorrec @Test void terminating_consumer_shouldTransitionToTerminated_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(TERMINATING).type(CONSUMER) - .dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); + var process = createTransferProcessBuilder(TERMINATING).type(CONSUMER).correlationId("correlationId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(TERMINATING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferTerminationMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -652,8 +649,7 @@ void terminating_consumer_shouldTransitionToTerminated_whenMessageSentCorrectly( @Test void terminating_shouldNotTerminateDataTransfer_whenIsConsumer() { - var process = createTransferProcessBuilder(TERMINATING).type(CONSUMER) - .dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); + var process = createTransferProcessBuilder(TERMINATING).type(CONSUMER).correlationId("correlationId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(TERMINATING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferTerminationMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -716,8 +712,7 @@ class Suspending { @Test void provider_shouldSuspendDataFlowAndTransitionToSuspended_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(SUSPENDING).type(PROVIDER) - .dataRequest(createDataRequestBuilder().id("counterPartyId").build()).build(); + var process = createTransferProcessBuilder(SUSPENDING).type(PROVIDER).correlationId("counterPartyId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING.code()).build()); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); @@ -740,8 +735,7 @@ void provider_shouldSuspendDataFlowAndTransitionToSuspended_whenMessageSentCorre @Test void consumer_shouldTransitionToSuspended_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(SUSPENDING).type(CONSUMER) - .dataRequest(createDataRequestBuilder().id("counterPartyId").build()).build(); + var process = createTransferProcessBuilder(SUSPENDING).type(CONSUMER).correlationId("counterPartyId").build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING.code()).build()); when(dispatcherRegistry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); @@ -890,25 +884,17 @@ private TransferProcess createTransferProcess(TransferProcessStates inState) { private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStates state) { var processId = UUID.randomUUID().toString(); - var dataRequest = createDataRequestBuilder() - .processId(processId) - .connectorAddress("http://an/address") - .build(); return TransferProcess.Builder.newInstance() .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) .type(CONSUMER) .id("test-process-" + processId) .state(state.code()) - .dataRequest(dataRequest); - } - - private DataRequest.Builder createDataRequestBuilder() { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) + .correlationId(UUID.randomUUID().toString()) + .counterPartyAddress("http://an/address") .contractId(UUID.randomUUID().toString()) .assetId(UUID.randomUUID().toString()) - .destinationType(DESTINATION_TYPE) + .dataDestination(DataAddress.Builder.newInstance().type(DESTINATION_TYPE).build()) .protocol("protocol"); } diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/DeprovisionResponsesHandlerTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/DeprovisionResponsesHandlerTest.java index 6854b053349..2bfb1fffe1c 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/DeprovisionResponsesHandlerTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/DeprovisionResponsesHandlerTest.java @@ -18,7 +18,6 @@ import org.eclipse.edc.connector.transfer.TokenTestProvisionResource; import org.eclipse.edc.connector.transfer.observe.TransferProcessObservableImpl; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResourceSet; import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest; @@ -27,7 +26,6 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.security.Vault; -import org.eclipse.edc.spi.types.domain.DataAddress; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -170,27 +168,14 @@ void shouldNotChangeStateOnErrorRetry() { private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStates inState) { var processId = UUID.randomUUID().toString(); - var dataRequest = createDataRequestBuilder() - .processId(processId) - .protocol("protocol") - .connectorAddress("http://an/address") - .build(); return TransferProcess.Builder.newInstance() .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) .type(CONSUMER) .id("test-process-" + processId) .state(inState.code()) - .dataRequest(dataRequest); - } - - private DataRequest.Builder createDataRequestBuilder() { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .contractId(UUID.randomUUID().toString()) - .assetId(UUID.randomUUID().toString()) - .dataDestination(DataAddress.Builder.newInstance().type("type") - .build()); + .protocol("protocol") + .counterPartyAddress("http://an/address"); } } diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ProvisionResponsesHandlerTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ProvisionResponsesHandlerTest.java index 168531c2c5a..8f89b208e76 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ProvisionResponsesHandlerTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ProvisionResponsesHandlerTest.java @@ -20,7 +20,6 @@ import org.eclipse.edc.connector.transfer.TestToken; import org.eclipse.edc.connector.transfer.observe.TransferProcessObservableImpl; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedDataDestinationResource; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResourceSet; @@ -206,23 +205,15 @@ void shouldNotChangeStatus_whenErrorRetry() { private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStates inState) { var processId = UUID.randomUUID().toString(); - var dataRequest = createDataRequestBuilder() - .processId(processId) - .protocol("protocol") - .connectorAddress("http://an/address") - .build(); return TransferProcess.Builder.newInstance() .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) .type(CONSUMER) .id("test-process-" + processId) .state(inState.code()) - .dataRequest(dataRequest); - } - - private DataRequest.Builder createDataRequestBuilder() { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) + .protocol("protocol") + .counterPartyAddress("http://an/address") + .correlationId(UUID.randomUUID().toString()) .contractId(UUID.randomUUID().toString()) .assetId(UUID.randomUUID().toString()) .dataDestination(DataAddress.Builder.newInstance().type("type") diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ResourceManifestGeneratorImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ResourceManifestGeneratorImplTest.java index 442f2d5b29a..770058c3e57 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ResourceManifestGeneratorImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/provision/ResourceManifestGeneratorImplTest.java @@ -17,7 +17,6 @@ import org.eclipse.edc.connector.transfer.TestResourceDefinition; import org.eclipse.edc.connector.transfer.spi.provision.ConsumerResourceDefinitionGenerator; import org.eclipse.edc.connector.transfer.spi.provision.ProviderResourceDefinitionGenerator; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.engine.spi.PolicyContext; import org.eclipse.edc.policy.engine.spi.PolicyEngine; @@ -53,7 +52,7 @@ void setUp() { @Test void shouldGenerateResourceManifestForConsumerManagedTransferProcess() { - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(createDataRequest()).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination()).build(); var resourceDefinition = TestResourceDefinition.Builder.newInstance().id(UUID.randomUUID().toString()).build(); when(consumerGenerator.canGenerate(any(), any())).thenReturn(true); when(consumerGenerator.generate(any(), any())).thenReturn(resourceDefinition); @@ -68,7 +67,7 @@ void shouldGenerateResourceManifestForConsumerManagedTransferProcess() { @Test void shouldGenerateEmptyResourceManifestForNotGeneratedFilter() { - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(createDataRequest()).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination()).build(); when(consumerGenerator.canGenerate(any(), any())).thenReturn(false); when(policyEngine.evaluate(any(), any(), isA(PolicyContext.class))).thenReturn(Result.success()); @@ -80,7 +79,7 @@ void shouldGenerateEmptyResourceManifestForNotGeneratedFilter() { @Test void shouldReturnFailedResultForConsumerWhenPolicyEvaluationFailed() { - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(createDataRequest()).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination()).build(); var resourceDefinition = TestResourceDefinition.Builder.newInstance().id(UUID.randomUUID().toString()).build(); when(consumerGenerator.generate(any(), any())).thenReturn(resourceDefinition); when(policyEngine.evaluate(any(), any(), isA(PolicyContext.class))).thenReturn(Result.failure("error")); @@ -92,7 +91,7 @@ void shouldReturnFailedResultForConsumerWhenPolicyEvaluationFailed() { @Test void shouldGenerateResourceManifestForProviderTransferProcess() { - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(createDataRequest()).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination()).build(); var resourceDefinition = TestResourceDefinition.Builder.newInstance().id(UUID.randomUUID().toString()).build(); when(providerGenerator.canGenerate(any(), any(), any())).thenReturn(true); when(providerGenerator.generate(any(), any(), any())).thenReturn(resourceDefinition); @@ -105,7 +104,7 @@ void shouldGenerateResourceManifestForProviderTransferProcess() { @Test void shouldGenerateEmptyResourceManifestForProviderTransferProcess() { - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(createDataRequest()).build(); + var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination()).build(); when(providerGenerator.canGenerate(any(), any(), any())).thenReturn(false); var resourceManifest = generator.generateProviderResourceManifest(transferProcess, dataAddress, policy); @@ -114,8 +113,7 @@ void shouldGenerateEmptyResourceManifestForProviderTransferProcess() { verifyNoInteractions(consumerGenerator); } - private DataRequest createDataRequest() { - var destination = DataAddress.Builder.newInstance().type("any").build(); - return DataRequest.Builder.newInstance().dataDestination(destination).build(); + private DataAddress dataDestination() { + return DataAddress.Builder.newInstance().type("any").build(); } } diff --git a/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-api/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/api/controller/DspTransferProcessApiControllerTest.java b/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-api/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/api/controller/DspTransferProcessApiControllerTest.java index 3ee03a49a64..64df89e264a 100644 --- a/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-api/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/api/controller/DspTransferProcessApiControllerTest.java +++ b/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-api/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/api/controller/DspTransferProcessApiControllerTest.java @@ -138,7 +138,6 @@ void callEndpoint_shouldUpdateResource(String path, Class messageClass, Strin var captor = ArgumentCaptor.forClass(PostDspRequest.class); verify(dspRequestHandler).updateResource(captor.capture()); var request = captor.getValue(); - assertThat(request.getExpectedMessageType()); assertThat(request.getToken()).isEqualTo("auth"); assertThat(request.getProcessId()).isEqualTo(PROCESS_ID); assertThat(request.getMessage()).isNotNull(); diff --git a/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-transform/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transformer/from/JsonObjectFromTransferProcessTransformerTest.java b/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-transform/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transformer/from/JsonObjectFromTransferProcessTransformerTest.java index e0792c9555c..e8fc97a3921 100644 --- a/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-transform/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transformer/from/JsonObjectFromTransferProcessTransformerTest.java +++ b/data-protocols/dsp/dsp-transfer-process/dsp-transfer-process-transform/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transformer/from/JsonObjectFromTransferProcessTransformerTest.java @@ -16,7 +16,6 @@ import jakarta.json.Json; import jakarta.json.JsonBuilderFactory; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.protocol.dsp.transferprocess.transformer.type.from.JsonObjectFromTransferProcessTransformer; import org.eclipse.edc.spi.types.domain.DataAddress; @@ -54,15 +53,11 @@ void transformTransferProcessProvider() { .property("type", "TestValueProperty") .build(); - var dataRequest = DataRequest.Builder.newInstance() - .id("consumerPid") - .dataDestination(dataAddress) - .build(); - var transferProcess = TransferProcess.Builder.newInstance() .id("providerPid") .callbackAddresses(new ArrayList<>()) - .dataRequest(dataRequest) + .correlationId("consumerPid") + .dataDestination(dataAddress) .type(TransferProcess.Type.PROVIDER) .contentDataAddress(dataAddress) .build(); @@ -86,15 +81,11 @@ void transformTransferProcessConsumer() { .property("type", "TestValueProperty") .build(); - var dataRequest = DataRequest.Builder.newInstance() - .id("providerPid") - .dataDestination(dataAddress) - .build(); - var transferProcess = TransferProcess.Builder.newInstance() .id("consumerPid") .callbackAddresses(new ArrayList<>()) - .dataRequest(dataRequest) + .correlationId("providerPid") + .dataDestination(dataAddress) .type(TransferProcess.Type.CONSUMER) .contentDataAddress(dataAddress) .build(); diff --git a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java index 49de02d5385..29c302a956a 100644 --- a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java +++ b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java @@ -23,7 +23,6 @@ import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.ComponentTest; @@ -151,12 +150,10 @@ private TransferProcess createTransferProcess(String id) { .id(id) .state(TransferProcessStates.STARTED.code()) .type(TransferProcess.Type.PROVIDER) - .dataRequest(DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .destinationType("file") - .protocol("any") - .connectorAddress("http://an/address") - .build()) + .correlationId(UUID.randomUUID().toString()) + .dataDestination(DataAddress.Builder.newInstance().type("file").build()) + .protocol("any") + .counterPartyAddress("http://an/address") .build(); } diff --git a/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/transform/JsonObjectFromTransferProcessTransformerTest.java b/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/transform/JsonObjectFromTransferProcessTransformerTest.java index b60a776bf90..846ec799a78 100644 --- a/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/transform/JsonObjectFromTransferProcessTransformerTest.java +++ b/extensions/control-plane/api/management-api/transfer-process-api/src/test/java/org/eclipse/edc/connector/api/management/transferprocess/transform/JsonObjectFromTransferProcessTransformerTest.java @@ -16,7 +16,6 @@ import jakarta.json.Json; import jakarta.json.JsonObject; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.callback.CallbackAddress; @@ -73,12 +72,10 @@ void transform() { .privateProperties(Map.of("foo", "bar")) .transferType("transferType") .type(CONSUMER) - .dataRequest(DataRequest.Builder.newInstance() - .id("correlationId") - .assetId("assetId") - .contractId("contractId") - .dataDestination(DataAddress.Builder.newInstance().type("any").properties(Map.of("bar", "foo")).build()) - .build()) + .correlationId("correlationId") + .assetId("assetId") + .contractId("contractId") + .dataDestination(DataAddress.Builder.newInstance().type("any").properties(Map.of("bar", "foo")).build()) .callbackAddresses(List.of(CallbackAddress.Builder.newInstance().uri("http://any").events(emptySet()).build())) .errorDetail("an error") .build(); diff --git a/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProviderResourceDefinitionGeneratorTest.java b/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProviderResourceDefinitionGeneratorTest.java index d7a30aa842c..845d7cd05c8 100644 --- a/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProviderResourceDefinitionGeneratorTest.java +++ b/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProviderResourceDefinitionGeneratorTest.java @@ -17,7 +17,6 @@ package org.eclipse.edc.connector.provision.http.impl; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.types.domain.DataAddress; @@ -33,8 +32,9 @@ class HttpProviderResourceDefinitionGeneratorTest { @Test void generate() { - var dataRequest = DataRequest.Builder.newInstance().destinationType("destination").assetId("asset-id").processId("process-id").build(); - var transferProcess = TransferProcess.Builder.newInstance().id("process-id").dataRequest(dataRequest).build(); + var transferProcess = TransferProcess.Builder.newInstance().id("process-id").assetId("asset-id") + .dataDestination(DataAddress.Builder.newInstance().type("destination").build()) + .build(); var policy = Policy.Builder.newInstance().build(); var assetAddress1 = DataAddress.Builder.newInstance().type(DATA_ADDRESS_TYPE).build(); @@ -49,8 +49,9 @@ void generate() { @Test void canGenerate() { - var dataRequest = DataRequest.Builder.newInstance().destinationType("destination").assetId("asset-id").processId("process-id").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).build(); + var transferProcess = TransferProcess.Builder.newInstance().id("process-id").assetId("asset-id") + .dataDestination(DataAddress.Builder.newInstance().type("destination").build()) + .build(); var policy = Policy.Builder.newInstance().build(); var assetAddress1 = DataAddress.Builder.newInstance().type(DATA_ADDRESS_TYPE).build(); @@ -61,8 +62,9 @@ void canGenerate() { @Test void canGenerate_dataAddressTypeDifferentThanAssetAddressType() { - var dataRequest = DataRequest.Builder.newInstance().destinationType("destination").assetId("asset-id").processId("process-id").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).build(); + var transferProcess = TransferProcess.Builder.newInstance().id("process-id").assetId("asset-id") + .dataDestination(DataAddress.Builder.newInstance().type("destination").build()) + .build(); var policy = Policy.Builder.newInstance().build(); var assetAddress1 = DataAddress.Builder.newInstance().type("a-different-addressType").build(); diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/README.md b/extensions/control-plane/store/sql/transfer-process-store-sql/README.md index 90064dd5536..050700bfdac 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/README.md +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/README.md @@ -23,3 +23,29 @@ implemented for CosmosDB, and create an equivalent set of clauses for SQL. Thus, That way, dialect-dependent variants can be implemented should the need arise, because the actual SQL statement is encoded in those clauses, offering a fluent Java API. + +## Migrate from 0.5.1 to 0.6.0 + +The schema has changed, the columns contained in `edc_data_request` have been moved to `edc_transfer_process` with this +mapping: +- `datarequest_id` -> `correlation_id` +- `connector_address` -> `counter_party_address` +- `protocol` -> `protocol` +- `asset_id` -> `asset_id` +- `contract_id` -> `contract_id` +- `data_destination` -> `data_destination` + +These columns need to be added to `edc_transfer_process`: +```sql +correlation_id VARCHAR, +counter_party_address VARCHAR, +protocol VARCHAR, +asset_id VARCHAR, +contract_id VARCHAR, +data_destination JSON, +``` + +then they should be filled with the corresponding value in the `edc_data_request` table, they can be joined by: +```edc_data_request.transfer_process_id = edc_transfer_process.transferprocess_id``` + +after that and after upgrading all the connector instance, the `edc_data_request` can be deleted. diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql b/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql index 2a91b187d3f..1e629968a21 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql @@ -37,10 +37,16 @@ CREATE TABLE IF NOT EXISTS edc_transfer_process transfer_type VARCHAR, protocol_messages JSON, data_plane_id VARCHAR, + correlation_id VARCHAR, + counter_party_address VARCHAR, + protocol VARCHAR, + asset_id VARCHAR, + contract_id VARCHAR, + data_destination JSON, lease_id VARCHAR - CONSTRAINT transfer_process_lease_lease_id_fk - REFERENCES edc_lease - ON DELETE SET NULL + CONSTRAINT transfer_process_lease_lease_id_fk + REFERENCES edc_lease + ON DELETE SET NULL ); COMMENT ON COLUMN edc_transfer_process.trace_context IS 'Java Map serialized as JSON'; @@ -57,27 +63,5 @@ COMMENT ON COLUMN edc_transfer_process.deprovisioned_resources IS 'List of depro CREATE UNIQUE INDEX IF NOT EXISTS transfer_process_id_uindex ON edc_transfer_process (transferprocess_id); -CREATE TABLE IF NOT EXISTS edc_data_request -( - datarequest_id VARCHAR NOT NULL - CONSTRAINT data_request_pk - PRIMARY KEY, - process_id VARCHAR NOT NULL, - connector_address VARCHAR NOT NULL, - protocol VARCHAR NOT NULL, - asset_id VARCHAR NOT NULL, - contract_id VARCHAR NOT NULL, - data_destination JSON NOT NULL, - transfer_process_id VARCHAR NOT NULL - CONSTRAINT data_request_transfer_process_id_fk - REFERENCES edc_transfer_process - ON UPDATE RESTRICT ON DELETE CASCADE -); - -COMMENT ON COLUMN edc_data_request.data_destination IS 'DataAddress serialized as JSON'; - -CREATE UNIQUE INDEX IF NOT EXISTS data_request_id_uindex - ON edc_data_request (datarequest_id); - CREATE UNIQUE INDEX IF NOT EXISTS lease_lease_id_uindex ON edc_lease (lease_id); diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java index 492c5068413..dac7efb7de0 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/SqlTransferProcessStore.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.edc.connector.store.sql.transferprocess.store.schema.TransferProcessStoreStatements; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResourceSet; import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; @@ -138,15 +137,12 @@ public StoreResult findByCorrelationIdAndLease(String correlati @Override public void save(TransferProcess entity) { Objects.requireNonNull(entity.getId(), "TransferProcesses must have an ID!"); - if (entity.getDataRequest() == null) { - throw new IllegalArgumentException("Cannot store TransferProcess without a DataRequest"); - } transactionContext.execute(() -> { try (var conn = getConnection()) { var existing = findByIdInternal(conn, entity.getId()); if (existing != null) { leaseContext.by(leaseHolderName).withConnection(conn).breakLease(entity.getId()); - update(conn, entity, existing.getDataRequest().getId()); + update(conn, entity); } else { insert(conn, entity); } @@ -211,20 +207,8 @@ public Stream findAll(QuerySpec querySpec) { }); } - private DataRequest mapDataRequest(ResultSet resultSet) throws SQLException { - return DataRequest.Builder.newInstance() - .id(resultSet.getString("edc_data_request_id")) - .assetId(resultSet.getString(statements.getAssetIdColumn())) - .protocol(resultSet.getString(statements.getProtocolColumn())) - .dataDestination(fromJson(resultSet.getString(statements.getDataDestinationColumn()), DataAddress.class)) - .connectorAddress(resultSet.getString(statements.getConnectorAddressColumn())) - .contractId(resultSet.getString(statements.getContractIdColumn())) - .processId(resultSet.getString(statements.getProcessIdColumn())) - .build(); - } - private QuerySpec correlationIdQuerySpec(String correlationId) { - var criterion = criterion("dataRequest.id", "=", correlationId); + var criterion = criterion("correlationId", "=", correlationId); return QuerySpec.Builder.newInstance().filter(criterion).build(); } @@ -240,7 +224,7 @@ private Stream executeQuery(Connection connection, QuerySpec qu return queryExecutor.query(connection, true, this::mapTransferProcess, statement.getQueryAsString(), statement.getParameters()); } - private void update(Connection conn, TransferProcess process, String existingDataRequestId) { + private void update(Connection conn, TransferProcess process) { var updateStmt = statements.getUpdateTransferProcessTemplate(); queryExecutor.execute(conn, updateStmt, process.getState(), @@ -258,24 +242,13 @@ private void update(Connection conn, TransferProcess process, String existingDat process.getTransferType(), toJson(process.getProtocolMessages()), process.getDataPlaneId(), + process.getCorrelationId(), + process.getCounterPartyAddress(), + process.getProtocol(), + process.getAssetId(), + process.getContractId(), + toJson(process.getDataDestination()), process.getId()); - - var newDr = process.getDataRequest(); - updateDataRequest(conn, newDr, existingDataRequestId); - } - - private void updateDataRequest(Connection conn, DataRequest dataRequest, String existingDataRequestId) { - var updateDrStmt = statements.getUpdateDataRequestTemplate(); - - queryExecutor.execute(conn, updateDrStmt, - dataRequest.getId(), - dataRequest.getProcessId(), - dataRequest.getConnectorAddress(), - dataRequest.getProtocol(), - dataRequest.getAssetId(), - dataRequest.getContractId(), - toJson(dataRequest.getDataDestination()), - existingDataRequestId); } /** @@ -295,7 +268,6 @@ private String getMultiplicityError(int expectedSize, int actualSize) { } private void insert(Connection conn, TransferProcess process) { - // insert TransferProcess var insertTpStatement = statements.getInsertStatement(); queryExecutor.execute(conn, insertTpStatement, process.getId(), process.getState(), @@ -315,26 +287,13 @@ private void insert(Connection conn, TransferProcess process) { process.isPending(), process.getTransferType(), toJson(process.getProtocolMessages()), - process.getDataPlaneId()); - - //insert DataRequest - var dr = process.getDataRequest(); - if (dr != null) { - insertDataRequest(process.getId(), dr, conn); - } - } - - private void insertDataRequest(String processId, DataRequest dr, Connection conn) { - var insertDrStmt = statements.getInsertDataRequestTemplate(); - queryExecutor.execute(conn, insertDrStmt, - dr.getId(), - dr.getProcessId(), - dr.getConnectorAddress(), - dr.getAssetId(), - dr.getContractId(), - toJson(dr.getDataDestination()), - processId, - dr.getProtocol()); + process.getDataPlaneId(), + process.getCorrelationId(), + process.getCounterPartyAddress(), + process.getProtocol(), + process.getAssetId(), + process.getContractId(), + toJson(process.getDataDestination())); } private TransferProcess mapTransferProcess(ResultSet resultSet) throws SQLException { @@ -350,7 +309,12 @@ private TransferProcess mapTransferProcess(ResultSet resultSet) throws SQLExcept .resourceManifest(fromJson(resultSet.getString(statements.getResourceManifestColumn()), ResourceManifest.class)) .provisionedResourceSet(fromJson(resultSet.getString(statements.getProvisionedResourceSetColumn()), ProvisionedResourceSet.class)) .errorDetail(resultSet.getString(statements.getErrorDetailColumn())) - .dataRequest(mapDataRequest(resultSet)) + .correlationId(resultSet.getString(statements.getCorrelationIdColumn())) + .assetId(resultSet.getString(statements.getAssetIdColumn())) + .protocol(resultSet.getString(statements.getProtocolColumn())) + .dataDestination(fromJson(resultSet.getString(statements.getDataDestinationColumn()), DataAddress.class)) + .counterPartyAddress(resultSet.getString(statements.getCounterPartyAddressColumn())) + .contractId(resultSet.getString(statements.getContractIdColumn())) .contentDataAddress(fromJson(resultSet.getString(statements.getContentDataAddressColumn()), DataAddress.class)) .deprovisionedResources(fromJson(resultSet.getString(statements.getDeprovisionedResourcesColumn()), new TypeReference<>() { })) diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java index 5557a68b4fe..1a0b09511b1 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/BaseSqlDialectStatements.java @@ -83,6 +83,12 @@ public String getInsertStatement() { .column(getTransferTypeColumn()) .jsonColumn(getProtocolMessagesColumn()) .column(getDataPlaneIdColumn()) + .column(getCorrelationIdColumn()) + .column(getCounterPartyAddressColumn()) + .column(getProtocolColumn()) + .column(getAssetIdColumn()) + .column(getContractIdColumn()) + .jsonColumn(getDataDestinationColumn()) .insertInto(getTransferProcessTableName()); } @@ -109,40 +115,18 @@ public String getUpdateTransferProcessTemplate() { .column(getTransferTypeColumn()) .jsonColumn(getProtocolMessagesColumn()) .column(getDataPlaneIdColumn()) - .update(getTransferProcessTableName(), getIdColumn()); - } - - @Override - public String getInsertDataRequestTemplate() { - return executeStatement() - .column(getDataRequestIdColumn()) - .column(getProcessIdColumn()) - .column(getConnectorAddressColumn()) + .column(getCorrelationIdColumn()) + .column(getCounterPartyAddressColumn()) + .column(getProtocolColumn()) .column(getAssetIdColumn()) .column(getContractIdColumn()) .jsonColumn(getDataDestinationColumn()) - .column(getTransferProcessIdFkColumn()) - .column(getProtocolColumn()) - .insertInto(getDataRequestTable()); + .update(getTransferProcessTableName(), getIdColumn()); } @Override public String getSelectTemplate() { - return format("SELECT *, edr.%s as edc_data_request_id FROM %s LEFT OUTER JOIN %s edr on %s.%s = edr.%s", getDataRequestIdColumn(), - getTransferProcessTableName(), getDataRequestTable(), getTransferProcessTableName(), getIdColumn(), getProcessIdColumn()); - } - - @Override - public String getUpdateDataRequestTemplate() { - return executeStatement() - .column(getDataRequestIdColumn()) - .column(getProcessIdColumn()) - .column(getConnectorAddressColumn()) - .column(getProtocolColumn()) - .column(getAssetIdColumn()) - .column(getContractIdColumn()) - .jsonColumn(getDataDestinationColumn()) - .update(getDataRequestTable(), getDataRequestIdColumn()); + return "SELECT * FROM %s".formatted(getTransferProcessTableName()); } @Override diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java index 1a734eaa969..0afcdc7ed0d 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/TransferProcessStoreStatements.java @@ -33,12 +33,8 @@ public interface TransferProcessStoreStatements extends StatefulEntityStatements String getUpdateTransferProcessTemplate(); - String getInsertDataRequestTemplate(); - String getSelectTemplate(); - String getUpdateDataRequestTemplate(); - default String getTransferProcessTableName() { return "edc_transfer_process"; } @@ -71,8 +67,8 @@ default String getProtocolColumn() { return "protocol"; } - default String getConnectorAddressColumn() { - return "connector_address"; + default String getCounterPartyAddressColumn() { + return "counter_party_address"; } default String getTransferTypeColumn() { @@ -87,28 +83,16 @@ default String getContractIdColumn() { return "contract_id"; } - default String getProcessIdColumn() { - return "process_id"; - } - default String getPrivatePropertiesColumn() { return "private_properties"; } - default String getDataRequestTable() { - return "edc_data_request"; - } - - default String getTransferProcessIdFkColumn() { - return "transfer_process_id"; - } - default String getDataDestinationColumn() { return "data_destination"; } - default String getDataRequestIdColumn() { - return "datarequest_id"; + default String getCorrelationIdColumn() { + return "correlation_id"; } default String getDeprovisionedResourcesColumn() { diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/DataRequestMapping.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/DataRequestMapping.java deleted file mode 100644 index 76aca03f087..00000000000 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/DataRequestMapping.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2022 Microsoft Corporation - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Microsoft Corporation - initial API and implementation - * Mercedes-Benz Tech Innovation GmbH - connector id removal - * - */ - -package org.eclipse.edc.connector.store.sql.transferprocess.store.schema.postgres; - -import org.eclipse.edc.connector.store.sql.transferprocess.store.schema.TransferProcessStoreStatements; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; -import org.eclipse.edc.sql.translation.JsonFieldTranslator; -import org.eclipse.edc.sql.translation.TranslationMapping; - -/** - * Maps fields of a {@link DataRequest} onto the corresponding - * SQL schema (= column names) enabling access through Postgres JSON operators - */ -class DataRequestMapping extends TranslationMapping { - - private static final String FIELD_ID = "id"; - private static final String FIELD_PROCESS_ID = "processId"; - private static final String FIELD_CONNECTOR_ADDRESS = "connectorAddress"; - private static final String FIELD_PROTOCOL = "protocol"; - private static final String FIELD_ASSET_ID = "assetId"; - private static final String FIELD_CONTRACT_ID = "contractId"; - private static final String FIELD_DATA_DESTINATION = "dataDestination"; - private static final String FIELD_TRANSFER_PROCESS_ID = "transferProcessId"; - - DataRequestMapping(TransferProcessStoreStatements statements) { - add(FIELD_ID, statements.getDataRequestIdColumn()); - add(FIELD_PROCESS_ID, statements.getProcessIdColumn()); - add(FIELD_CONNECTOR_ADDRESS, statements.getConnectorAddressColumn()); - add(FIELD_PROTOCOL, statements.getProtocolColumn()); - add(FIELD_ASSET_ID, statements.getAssetIdColumn()); - add(FIELD_CONTRACT_ID, statements.getContractIdColumn()); - add(FIELD_DATA_DESTINATION, new JsonFieldTranslator(statements.getDataDestinationColumn())); - add(FIELD_TRANSFER_PROCESS_ID, statements.getTransferProcessIdFkColumn()); - } -} diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java index 1c58071630f..bb2a00b50cf 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/store/sql/transferprocess/store/schema/postgres/TransferProcessMapping.java @@ -27,9 +27,7 @@ public class TransferProcessMapping extends StatefulEntityMapping { private static final String FIELD_TYPE = "type"; private static final String FIELD_CREATED_TIMESTAMP = "createdAt"; - private static final String FIELD_DATAREQUEST = "dataRequest"; private static final String FIELD_DATAADDRESS = "dataAddress"; - // this actually an alias for "dataAddress": private static final String FIELD_CONTENTDATAADDRESS = "contentDataAddress"; private static final String FIELD_RESOURCE_MANIFEST = "resourceManifest"; private static final String FIELD_PROVISIONED_RESOURCE_SET = "provisionedResourceSet"; @@ -40,12 +38,23 @@ public class TransferProcessMapping extends StatefulEntityMapping { private static final String FIELD_TRANSFER_TYPE = "transferType"; private static final String FIELD_DATA_PLANE_ID = "dataPlaneId"; + private static final String FIELD_CORRELATION_ID = "correlationId"; + private static final String FIELD_COUNTER_PARTY_ADDRESS = "counterPartyAddress"; + private static final String FIELD_PROTOCOL = "protocol"; + private static final String FIELD_ASSET_ID = "assetId"; + private static final String FIELD_CONTRACT_ID = "contractId"; + private static final String FIELD_DATA_DESTINATION = "dataDestination"; public TransferProcessMapping(TransferProcessStoreStatements statements) { super(statements); add(FIELD_TYPE, statements.getTypeColumn()); add(FIELD_CREATED_TIMESTAMP, statements.getCreatedAtColumn()); - add(FIELD_DATAREQUEST, new DataRequestMapping(statements)); + add(FIELD_CORRELATION_ID, statements.getCorrelationIdColumn()); + add(FIELD_COUNTER_PARTY_ADDRESS, statements.getCounterPartyAddressColumn()); + add(FIELD_PROTOCOL, statements.getProtocolColumn()); + add(FIELD_ASSET_ID, statements.getAssetIdColumn()); + add(FIELD_CONTRACT_ID, statements.getContractIdColumn()); + add(FIELD_DATA_DESTINATION, new JsonFieldTranslator(statements.getDataDestinationColumn())); add(FIELD_DATAADDRESS, new JsonFieldTranslator(statements.getContentDataAddressColumn())); add(FIELD_CONTENTDATAADDRESS, new JsonFieldTranslator(statements.getContentDataAddressColumn())); add(FIELD_RESOURCE_MANIFEST, new ResourceManifestMapping()); diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/test/java/org/eclipse/edc/connector/store/sql/transferprocess/PostgresTransferProcessStoreTest.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/test/java/org/eclipse/edc/connector/store/sql/transferprocess/PostgresTransferProcessStoreTest.java index 0ae3eee9476..c24f1cc1c6b 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/test/java/org/eclipse/edc/connector/store/sql/transferprocess/PostgresTransferProcessStoreTest.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/test/java/org/eclipse/edc/connector/store/sql/transferprocess/PostgresTransferProcessStoreTest.java @@ -59,7 +59,6 @@ void setUp(PostgresqlStoreSetupExtension extension, QueryExecutor queryExecutor) @AfterEach void tearDown(PostgresqlStoreSetupExtension extension) { extension.runQuery("DROP TABLE " + statements.getTransferProcessTableName() + " CASCADE"); - extension.runQuery("DROP TABLE " + statements.getDataRequestTable() + " CASCADE"); extension.runQuery("DROP TABLE " + statements.getLeaseTableName() + " CASCADE"); } diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java index 0d251840e8e..0816efe5ca4 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java @@ -19,7 +19,6 @@ import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.ResponseStatus; @@ -57,11 +56,6 @@ public class DataPlaneSignalingFlowControllerTest { private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, dataPlaneClientFactory, "random"); - @NotNull - private static DataPlaneInstance.Builder dataPlaneInstanceBuilder() { - return DataPlaneInstance.Builder.newInstance().url("http://any"); - } - @Test void canHandle() { var transferProcess = transferProcess("HttpData", HTTP_DATA_PULL); @@ -79,11 +73,9 @@ void canHandle() { CUSTOM_PUSH, }) void initiateFlow_transferSuccess(String transferType) { - var request = createDataRequest(); var source = testDataAddress(); var policy = Policy.Builder.newInstance().assignee("participantId").build(); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder() .transferType(transferType) .contentDataAddress(testDataAddress()) .build(); @@ -101,7 +93,7 @@ void initiateFlow_transferSuccess(String transferType) { var captured = captor.getValue(); assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); - assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(transferProcess.getDataDestination()); assertThat(captured.getParticipantId()).isEqualTo(policy.getAssignee()); assertThat(captured.getAgreementId()).isEqualTo(transferProcess.getContractId()); assertThat(captured.getAssetId()).isEqualTo(transferProcess.getAssetId()); @@ -113,8 +105,7 @@ void initiateFlow_transferSuccess(String transferType) { @Test void initiateFlow_transferSuccess_withReturnedDataAddress() { var policy = Policy.Builder.newInstance().assignee("participantId").build(); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder() .transferType(HTTP_DATA_PULL) .contentDataAddress(testDataAddress()) .build(); @@ -137,10 +128,8 @@ void initiateFlow_transferSuccess_withReturnedDataAddress() { @Test void initiateFlow_transferSuccess_withoutDataPlane() { - var request = createDataRequest(); var source = testDataAddress(); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType(HTTP_DATA_PULL) .build(); @@ -157,7 +146,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() { var captured = captor.getValue(); assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); - assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(transferProcess.getDataDestination()); assertThat(captured.getProperties()).isEmpty(); assertThat(captured.getCallbackAddress()).isNotNull(); } @@ -170,8 +159,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() { "", }) void initiateFlow_invalidTransferType(String transferType) { - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType(transferType) .build(); @@ -186,8 +174,7 @@ void initiateFlow_invalidTransferType(String transferType) { @Test void initiateFlow_returnFailedResultIfTransferFails() { var errorMsg = "error"; - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType(HTTP_DATA_PULL) .build(); @@ -212,7 +199,6 @@ class Suspend { void shouldCallTerminate() { var transferProcess = TransferProcess.Builder.newInstance() .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .build(); when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success()); @@ -232,7 +218,6 @@ void shouldCallTerminateOnTheRightDataPlane() { var mockedDataPlane = mock(DataPlaneInstance.class); var transferProcess = TransferProcess.Builder.newInstance() .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .dataPlaneId(dataPlaneInstance.getId()) .build(); @@ -253,7 +238,6 @@ void shouldFail_withInvalidDataPlaneId() { var dataPlaneInstance = createDataPlaneInstance(); var transferProcess = TransferProcess.Builder.newInstance() .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .dataPlaneId("invalid") .build(); @@ -269,9 +253,8 @@ void shouldFail_withInvalidDataPlaneId() { @Test void terminate_shouldCallTerminate() { - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder() .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .build(); when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); @@ -289,9 +272,8 @@ void terminate_shouldCallTerminate() { void terminate_shouldCallTerminateOnTheRightDataPlane() { var dataPlaneInstance = createDataPlaneInstance(); var mockedDataPlane = mock(DataPlaneInstance.class); - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder() .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .dataPlaneId(dataPlaneInstance.getId()) .build(); @@ -310,9 +292,8 @@ void terminate_shouldCallTerminateOnTheRightDataPlane() { @Test void terminate_shouldFail_withInvalidDataPlaneId() { var dataPlaneInstance = createDataPlaneInstance(); - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder() .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .dataPlaneId("invalid") .build(); @@ -339,6 +320,12 @@ void transferTypes_shouldReturnTypesForSpecifiedAsset() { assertThat(transferTypes).containsExactly("Custom-PUSH", "Custom-PULL"); } + @NotNull + private DataPlaneInstance.Builder dataPlaneInstanceBuilder() { + return DataPlaneInstance.Builder.newInstance().url("http://any"); + } + + private DataPlaneInstance createDataPlaneInstance() { return dataPlaneInstanceBuilder().build(); } @@ -347,27 +334,20 @@ private DataAddress testDataAddress() { return DataAddress.Builder.newInstance().type("test-type").build(); } - private DataRequest createDataRequest() { - return createDataRequest("test"); + private TransferProcess transferProcess(String destinationType, String transferType) { + return TransferProcess.Builder.newInstance() + .transferType(transferType) + .dataDestination(DataAddress.Builder.newInstance().type(destinationType).build()) + .build(); } - private DataRequest createDataRequest(String destinationType) { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) + private TransferProcess.Builder transferProcessBuilder() { + return TransferProcess.Builder.newInstance() + .correlationId(UUID.randomUUID().toString()) .protocol("test-protocol") .contractId(UUID.randomUUID().toString()) .assetId(UUID.randomUUID().toString()) - .connectorAddress("test.connector.address") - .processId(UUID.randomUUID().toString()) - .destinationType(destinationType) - .build(); - } - - - private TransferProcess transferProcess(String destinationType, String transferType) { - return TransferProcess.Builder.newInstance() - .transferType(transferType) - .dataRequest(DataRequest.Builder.newInstance().destinationType(destinationType).build()) - .build(); + .counterPartyAddress("test.connector.address") + .dataDestination(DataAddress.Builder.newInstance().type("test").build()); } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java index 31d2579fa04..a717068de73 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java @@ -18,7 +18,6 @@ import org.eclipse.edc.connector.transfer.dataplane.proxy.ConsumerPullDataPlaneProxyResolver; import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.StatusResult; @@ -58,10 +57,9 @@ public boolean canHandle(TransferProcess transferProcess) { @Override public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { var contentAddress = transferProcess.getContentDataAddress(); - var dataRequest = transferProcess.getDataRequest(); - return Optional.ofNullable(selectorService.select(contentAddress, destinationAddress(dataRequest))) - .map(instance -> resolver.toDataAddress(dataRequest, contentAddress, instance) + return Optional.ofNullable(selectorService.select(contentAddress, destinationAddress(transferProcess))) + .map(instance -> resolver.toDataAddress(transferProcess, contentAddress, instance) .map(this::toResponse) .map(StatusResult::success) .orElse(failure -> failure(FATAL_ERROR, "Failed to generate proxy: " + failure.getFailureDetail()))) @@ -84,13 +82,13 @@ public Set transferTypesFor(Asset asset) { } // Shim translation from "Http-PULL" to HttpProxy dataAddress - private DataAddress destinationAddress(DataRequest dataRequest) { - if (transferTypes.contains(dataRequest.getDestinationType())) { + private DataAddress destinationAddress(TransferProcess transferProcess) { + if (transferTypes.contains(transferProcess.getDestinationType())) { var dadBuilder = DataAddress.Builder.newInstance(); - dataRequest.getDataDestination().getProperties().forEach(dadBuilder::property); + transferProcess.getDataDestination().getProperties().forEach(dadBuilder::property); return dadBuilder.type(HTTP_PROXY).build(); } else { - return dataRequest.getDataDestination(); + return transferProcess.getDataDestination(); } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolver.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolver.java index 7c59875d52c..b46fd1af719 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolver.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolver.java @@ -18,7 +18,7 @@ import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.dataplane.spi.security.DataEncrypter; import org.eclipse.edc.connector.transfer.dataplane.spi.token.ConsumerPullTokenExpirationDateFunction; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.TypeManager; @@ -61,13 +61,13 @@ private static Object getPublicApiUrl(DataPlaneInstance instance) { .orElseGet(() -> instance.getProperties().get(PUBLIC_API_URL_PROPERTY_DEPRECATED)); } - public Result toDataAddress(DataRequest request, DataAddress address, DataPlaneInstance instance) { + public Result toDataAddress(TransferProcess transferProcess, DataAddress address, DataPlaneInstance instance) { return resolveProxyUrl(instance) - .compose(proxyUrl -> generateAccessToken(address, request.getContractId()) + .compose(proxyUrl -> generateAccessToken(address, transferProcess.getContractId()) .map(token -> DataAddress.Builder.newInstance() .type(EndpointDataReference.EDR_SIMPLE_TYPE) - .property(EndpointDataReference.ID, request.getId()) - .property(EndpointDataReference.CONTRACT_ID, request.getContractId()) + .property(EndpointDataReference.ID, transferProcess.getCorrelationId()) + .property(EndpointDataReference.CONTRACT_ID, transferProcess.getContractId()) .property(EndpointDataReference.ENDPOINT, proxyUrl) .property(EndpointDataReference.AUTH_KEY, HttpHeaders.AUTHORIZATION) .property(EndpointDataReference.AUTH_CODE, token) diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java index 81fba72a3ed..89b86598057 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java @@ -17,7 +17,6 @@ import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.dataplane.proxy.ConsumerPullDataPlaneProxyResolver; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.result.Failure; @@ -57,8 +56,7 @@ void verifyCanHandle() { void initiateFlow_success() { var proxyAddress = dataAddress(); var instance = mock(DataPlaneInstance.class); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(dataRequest()) + var transferProcess = transferProcessBuilder(HTTP_PROXY) .contentDataAddress(dataAddress()) .build(); @@ -76,9 +74,8 @@ void initiateFlow_success() { void initiateFlow_success_withTransferType() { var proxyAddress = dataAddress(); var instance = mock(DataPlaneInstance.class); - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder(HTTP_PROXY) .transferType(HTTP_DATA_PULL) - .dataRequest(dataRequest()) .contentDataAddress(dataAddress()) .build(); @@ -94,8 +91,7 @@ void initiateFlow_success_withTransferType() { @Test void initiateFlow_returnsFailureIfNoDataPlaneInstance() { - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(dataRequest()) + var transferProcess = transferProcessBuilder(HTTP_PROXY) .contentDataAddress(dataAddress()) .build(); @@ -110,8 +106,7 @@ void initiateFlow_returnsFailureIfNoDataPlaneInstance() { void initiateFlow_returnsFailureIfAddressResolutionFails() { var errorMsg = "Test Error Message"; var instance = mock(DataPlaneInstance.class); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(dataRequest()) + var transferProcess = transferProcessBuilder(HTTP_PROXY) .contentDataAddress(dataAddress()) .build(); @@ -125,8 +120,7 @@ void initiateFlow_returnsFailureIfAddressResolutionFails() { @Test void terminate_shouldAlwaysReturnSuccess() { - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(dataRequest()) + var transferProcess = transferProcessBuilder(HTTP_PROXY) .contentDataAddress(dataAddress()) .build(); @@ -148,10 +142,20 @@ private TransferProcess transferProcess(String destinationType) { return transferProcess(destinationType, null); } - private TransferProcess transferProcess(String destinationType, String transferType) { + private TransferProcess.Builder transferProcessBuilder(String destinationType) { return TransferProcess.Builder.newInstance() + .correlationId(UUID.randomUUID().toString()) + .protocol("protocol") + .contractId(UUID.randomUUID().toString()) + .assetId(UUID.randomUUID().toString()) + .counterPartyAddress("test.connector.address") + .dataDestination(DataAddress.Builder.newInstance().type(destinationType).build()); + } + + private TransferProcess transferProcess(String destinationType, String transferType) { + return transferProcessBuilder(destinationType) .transferType(transferType) - .dataRequest(DataRequest.Builder.newInstance().destinationType(destinationType).build()) + .dataDestination(DataAddress.Builder.newInstance().type(destinationType).build()) .build(); } @@ -159,15 +163,4 @@ private DataAddress dataAddress() { return DataAddress.Builder.newInstance().type(UUID.randomUUID().toString()).build(); } - private DataRequest dataRequest() { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .protocol("protocol") - .contractId(UUID.randomUUID().toString()) - .assetId(UUID.randomUUID().toString()) - .connectorAddress("test.connector.address") - .processId(UUID.randomUUID().toString()) - .destinationType(HTTP_PROXY) - .build(); - } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java index 45dfb5ea9b2..2dd51ee16e8 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java @@ -19,7 +19,6 @@ import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.ResponseStatus; @@ -68,10 +67,8 @@ void canHandle() { @Test void initiateFlow_transferSuccess() { - var request = createDataRequest(); var source = testDataAddress(); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder("test") .contentDataAddress(testDataAddress()) .build(); @@ -88,17 +85,15 @@ void initiateFlow_transferSuccess() { var captured = captor.getValue(); assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); - assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(transferProcess.getDataDestination()); assertThat(captured.getProperties()).isEmpty(); assertThat(captured.getCallbackAddress()).isNotNull(); } @Test void initiateFlow_transferSuccess_withoutDataPlane() { - var request = createDataRequest(); var source = testDataAddress(); - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder("test") .contentDataAddress(testDataAddress()) .build(); @@ -114,7 +109,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() { var captured = captor.getValue(); assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); - assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(transferProcess.getDataDestination()); assertThat(captured.getProperties()).isEmpty(); assertThat(captured.getCallbackAddress()).isNotNull(); } @@ -122,8 +117,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() { @Test void initiateFlow_returnFailedResultIfTransferFails() { var errorMsg = "error"; - var transferProcess = TransferProcess.Builder.newInstance() - .dataRequest(createDataRequest()) + var transferProcess = transferProcessBuilder("test") .contentDataAddress(testDataAddress()) .build(); @@ -142,9 +136,8 @@ void initiateFlow_returnFailedResultIfTransferFails() { @Test void terminate_shouldCallTerminate() { - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder("test") .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .build(); when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); @@ -162,9 +155,8 @@ void terminate_shouldCallTerminate() { void terminate_shouldCallTerminateOnTheRightDataPlane() { var dataPlaneInstance = createDataPlaneInstance(); var mockedDataPlane = mock(DataPlaneInstance.class); - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder("test") .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .dataPlaneId(dataPlaneInstance.getId()) .build(); @@ -183,9 +175,8 @@ void terminate_shouldCallTerminateOnTheRightDataPlane() { @Test void terminate_shouldFail_withInvalidDataPlaneId() { var dataPlaneInstance = createDataPlaneInstance(); - var transferProcess = TransferProcess.Builder.newInstance() + var transferProcess = transferProcessBuilder("test") .id("transferProcessId") - .dataRequest(createDataRequest()) .contentDataAddress(testDataAddress()) .dataPlaneId("invalid") .build(); @@ -220,30 +211,23 @@ private DataAddress testDataAddress() { return DataAddress.Builder.newInstance().type("test-type").build(); } - private DataRequest createDataRequest() { - return createDataRequest("test"); - } - - private DataRequest createDataRequest(String destinationType) { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .protocol("test-protocol") - .contractId(UUID.randomUUID().toString()) - .assetId(UUID.randomUUID().toString()) - .connectorAddress("test.connector.address") - .processId(UUID.randomUUID().toString()) - .destinationType(destinationType) - .build(); - } - private TransferProcess transferProcess(String destinationType) { return transferProcess(destinationType, null); } private TransferProcess transferProcess(String destinationType, String transferType) { - return TransferProcess.Builder.newInstance() + return transferProcessBuilder(destinationType) .transferType(transferType) - .dataRequest(DataRequest.Builder.newInstance().destinationType(destinationType).build()) .build(); } + + private TransferProcess.Builder transferProcessBuilder(String destinationType) { + return TransferProcess.Builder.newInstance() + .correlationId(UUID.randomUUID().toString()) + .protocol("test-protocol") + .contractId(UUID.randomUUID().toString()) + .assetId(UUID.randomUUID().toString()) + .counterPartyAddress("test.connector.address") + .dataDestination(DataAddress.Builder.newInstance().type(destinationType).build()); + } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolverTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolverTest.java index e8ad354f79e..a58baf6b81b 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolverTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/proxy/ConsumerPullDataPlaneProxyResolverTest.java @@ -18,7 +18,7 @@ import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.dataplane.spi.security.DataEncrypter; import org.eclipse.edc.connector.transfer.dataplane.spi.token.ConsumerPullTokenExpirationDateFunction; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.iam.TokenParameters; import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.Result; @@ -64,27 +64,27 @@ void verifyToDataAddressSuccess() { var expiration = Date.from(Instant.now().plusSeconds(100)); var proxyUrl = "test.proxy.url"; var token = "token-test"; - var request = dataRequest(); var instance = DataPlaneInstance.Builder.newInstance() .id(UUID.randomUUID().toString()) .url("http://some.test.url") .property("publicApiUrl", proxyUrl) .build(); + var transferProcess = transferProcessBuilder().build(); var captor = ArgumentCaptor.forClass(TokenDecorator[].class); when(dataEncrypter.encrypt(TYPE_MANAGER.writeValueAsString(address))).thenReturn(encryptedAddress); - when(tokenExpirationDateFunction.expiresAt(address, request.getContractId())).thenReturn(Result.success(expiration)); + when(tokenExpirationDateFunction.expiresAt(address, transferProcess.getContractId())).thenReturn(Result.success(expiration)); when(tokenGenerationService.generate(any(), captor.capture())) .thenReturn(Result.success(TokenRepresentation.Builder.newInstance().token(token).build())); - var result = resolver.toDataAddress(request, address, instance); + var result = resolver.toDataAddress(transferProcess, address, instance); assertThat(result.succeeded()).isTrue(); var proxyAddress = result.getContent(); assertThat(proxyAddress.getType()).isEqualTo(EndpointDataReference.EDR_SIMPLE_TYPE); assertThat(proxyAddress.getProperties()) - .containsEntry(EndpointDataReference.ID, request.getId()) - .containsEntry(EndpointDataReference.CONTRACT_ID, request.getContractId()) + .containsEntry(EndpointDataReference.ID, transferProcess.getCorrelationId()) + .containsEntry(EndpointDataReference.CONTRACT_ID, transferProcess.getContractId()) .containsEntry(EndpointDataReference.ENDPOINT, proxyUrl) .containsEntry(EndpointDataReference.AUTH_KEY, HttpHeaders.AUTHORIZATION) .containsEntry(EndpointDataReference.AUTH_CODE, token); @@ -114,8 +114,9 @@ void verifyToDataAddressReturnsFailureIfMissingPublicApiUrl() { .id(UUID.randomUUID().toString()) .url("http://some.test.url") .build(); + var transferProcess = transferProcessBuilder().build(); - var result = resolver.toDataAddress(dataRequest(), dataAddress(), instance); + var result = resolver.toDataAddress(transferProcess, dataAddress(), instance); assertThat(result.failed()).isTrue(); assertThat(result.getFailureDetail()).isEqualTo("Missing property `https://w3id.org/edc/v0.0.1/ns/publicApiUrl` (deprecated: `publicApiUrl`) in DataPlaneInstance"); @@ -130,11 +131,12 @@ void verifyToDataAddressReturnsFailureIfTokenExpirationDateFunctionFails() { .url("http://some.test.url") .property("publicApiUrl", "test.proxy.url") .build(); + var transferProcess = transferProcessBuilder().build(); when(dataEncrypter.encrypt(TYPE_MANAGER.writeValueAsString(address))).thenReturn("encryptedAddress"); when(tokenExpirationDateFunction.expiresAt(any(), any())).thenReturn(Result.failure(errorMsg)); - var result = resolver.toDataAddress(dataRequest(), address, instance); + var result = resolver.toDataAddress(transferProcess, address, instance); assertThat(result.failed()).isTrue(); assertThat(result.getFailureDetail()).contains(errorMsg); @@ -144,7 +146,7 @@ void verifyToDataAddressReturnsFailureIfTokenExpirationDateFunctionFails() { void verifyToDataAddressReturnsFailureIfTokenGenerationFails() { var address = dataAddress(); var errorMsg = "error test"; - var request = dataRequest(); + var transferProcess = transferProcessBuilder().build(); var expiration = Date.from(Instant.now().plusSeconds(100)); var instance = DataPlaneInstance.Builder.newInstance() .id(UUID.randomUUID().toString()) @@ -153,24 +155,22 @@ void verifyToDataAddressReturnsFailureIfTokenGenerationFails() { .build(); when(dataEncrypter.encrypt(TYPE_MANAGER.writeValueAsString(address))).thenReturn("encryptedAddress"); - when(tokenExpirationDateFunction.expiresAt(address, request.getContractId())).thenReturn(Result.success(expiration)); + when(tokenExpirationDateFunction.expiresAt(address, transferProcess.getContractId())).thenReturn(Result.success(expiration)); when(tokenGenerationService.generate(any(), any(TokenDecorator[].class))).thenReturn(Result.failure(errorMsg)); - var result = resolver.toDataAddress(request, address, instance); + var result = resolver.toDataAddress(transferProcess, address, instance); assertThat(result.failed()).isTrue(); assertThat(result.getFailureDetail()).contains(errorMsg); } - private DataRequest dataRequest() { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .protocol("protocol") + private TransferProcess.Builder transferProcessBuilder() { + return TransferProcess.Builder.newInstance() + .correlationId(UUID.randomUUID().toString()) + .protocol("test-protocol") .contractId(UUID.randomUUID().toString()) .assetId(UUID.randomUUID().toString()) - .connectorAddress("test.connector.address") - .processId(UUID.randomUUID().toString()) - .destinationType(HTTP_PROXY) - .build(); + .counterPartyAddress("test.connector.address") + .dataDestination(DataAddress.Builder.newInstance().type(HTTP_PROXY).build()); } -} \ No newline at end of file +} diff --git a/extensions/control-plane/transfer/transfer-pull-http-dynamic-receiver/src/test/java/org/eclipse/edc/connector/receiver/http/dynamic/TestFunctions.java b/extensions/control-plane/transfer/transfer-pull-http-dynamic-receiver/src/test/java/org/eclipse/edc/connector/receiver/http/dynamic/TestFunctions.java index 723a5a1c887..3efd32e1bed 100644 --- a/extensions/control-plane/transfer/transfer-pull-http-dynamic-receiver/src/test/java/org/eclipse/edc/connector/receiver/http/dynamic/TestFunctions.java +++ b/extensions/control-plane/transfer/transfer-pull-http-dynamic-receiver/src/test/java/org/eclipse/edc/connector/receiver/http/dynamic/TestFunctions.java @@ -14,7 +14,6 @@ package org.eclipse.edc.connector.receiver.http.dynamic; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.spi.types.domain.DataAddress; @@ -37,8 +36,8 @@ public static TransferProcess createTransferProcess(String id, Map { public static final String TRANSFER_PROCESS_DATA_DESTINATION = EDC_NAMESPACE + "dataDestination"; public static final String TRANSFER_PROCESS_CALLBACK_ADDRESSES = EDC_NAMESPACE + "callbackAddresses"; private Type type = CONSUMER; - private DataRequest dataRequest; + private String protocol; + private String correlationId; + private String counterPartyAddress; + private DataAddress dataDestination; + private String assetId; + private String contractId; private DataAddress contentDataAddress; private ResourceManifest resourceManifest; private ProvisionedResourceSet provisionedResourceSet = ProvisionedResourceSet.Builder.newInstance().build(); @@ -131,7 +136,6 @@ public class TransferProcess extends StatefulEntity { private ProtocolMessages protocolMessages = new ProtocolMessages(); private String transferType; - private String dataPlaneId; private TransferProcess() { @@ -145,10 +149,6 @@ public Type getType() { return type; } - public DataRequest getDataRequest() { - return dataRequest; - } - public ResourceManifest getResourceManifest() { return resourceManifest; } @@ -385,7 +385,7 @@ public boolean currentStateIsOneOf(TransferProcessStates... states) { @JsonIgnore public String getCorrelationId() { - return dataRequest.getId(); + return correlationId; } /** @@ -395,7 +395,7 @@ public String getCorrelationId() { * @param correlationId the correlation id. */ public void setCorrelationId(String correlationId) { - dataRequest.setId(correlationId); + this.correlationId = correlationId; } @JsonIgnore @@ -404,8 +404,8 @@ public List getProvisionedResources() { } @JsonIgnore - public String getConnectorAddress() { - return dataRequest.getConnectorAddress(); + public String getCounterPartyAddress() { + return counterPartyAddress; } /** @@ -418,32 +418,32 @@ public String getTransferType() { @JsonIgnore public String getAssetId() { - return dataRequest.getAssetId(); + return assetId; } @JsonIgnore public String getContractId() { - return dataRequest.getContractId(); + return contractId; } @JsonIgnore public DataAddress getDataDestination() { - return dataRequest.getDataDestination(); + return dataDestination; } @JsonIgnore public String getProtocol() { - return dataRequest.getProtocol(); + return protocol; } @JsonIgnore public void updateDestination(DataAddress dataAddress) { - dataRequest.updateDestination(dataAddress); + this.dataDestination = dataAddress; } @JsonIgnore public String getDestinationType() { - return dataRequest.getDestinationType(); + return dataDestination.getType(); } public String getDataPlaneId() { @@ -454,7 +454,12 @@ public String getDataPlaneId() { public TransferProcess copy() { var builder = Builder.newInstance() .resourceManifest(resourceManifest) - .dataRequest(dataRequest) + .protocol(protocol) + .correlationId(correlationId) + .counterPartyAddress(counterPartyAddress) + .dataDestination(dataDestination) + .assetId(assetId) + .contractId(contractId) .provisionedResourceSet(provisionedResourceSet) .contentDataAddress(contentDataAddress) .deprovisionedResources(deprovisionedResources) @@ -547,11 +552,6 @@ public Builder type(Type type) { return this; } - public Builder dataRequest(DataRequest request) { - entity.dataRequest = request; - return this; - } - public Builder resourceManifest(ResourceManifest manifest) { entity.resourceManifest = manifest; return this; @@ -597,6 +597,36 @@ public Builder dataPlaneId(String dataPlaneId) { return this; } + public Builder protocol(String protocol) { + entity.protocol = protocol; + return this; + } + + public Builder correlationId(String correlationId) { + entity.correlationId = correlationId; + return this; + } + + public Builder counterPartyAddress(String counterPartyAddress) { + entity.counterPartyAddress = counterPartyAddress; + return this; + } + + public Builder dataDestination(DataAddress dataDestination) { + entity.dataDestination = dataDestination; + return this; + } + + public Builder assetId(String assetId) { + entity.assetId = assetId; + return this; + } + + public Builder contractId(String contractId) { + entity.contractId = contractId; + return this; + } + @Override public Builder self() { return this; @@ -606,12 +636,6 @@ public Builder self() { public TransferProcess build() { super.build(); - if (entity.dataRequest == null) { - entity.dataRequest = DataRequest.Builder.newInstance().destinationType("type").build(); - } - - entity.dataRequest.associateWithProcessId(entity.id); - if (entity.resourceManifest != null) { entity.resourceManifest.setTransferProcessId(entity.id); } diff --git a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/types/DataRequestTest.java b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/types/DataRequestTest.java deleted file mode 100644 index 0ac53b35607..00000000000 --- a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/transfer/spi/types/DataRequestTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 Microsoft Corporation - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Microsoft Corporation - initial API and implementation - * - */ - -package org.eclipse.edc.connector.transfer.spi.types; - -import org.eclipse.edc.spi.types.domain.asset.Asset; -import org.junit.jupiter.api.Test; - -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.assertThrows; - - -class DataRequestTest { - - @Test - void verifyNoDestination() { - var id = UUID.randomUUID().toString(); - var asset = Asset.Builder.newInstance().build(); - - assertThrows(IllegalArgumentException.class, () -> DataRequest.Builder.newInstance().id(id).assetId(asset.getId()).build()); - } - -} diff --git a/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TestFunctions.java b/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TestFunctions.java index 155001fcd44..f3d0f96455b 100644 --- a/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TestFunctions.java +++ b/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TestFunctions.java @@ -19,14 +19,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResource; import org.eclipse.edc.connector.transfer.spi.types.ResourceDefinition; import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.spi.types.domain.callback.CallbackAddress; import org.jetbrains.annotations.NotNull; @@ -41,28 +39,6 @@ public static ResourceManifest createManifest() { .build(); } - public static DataRequest createDataRequest() { - return createDataRequest("test-process-id"); - } - - public static DataRequest.Builder createDataRequestBuilder() { - return DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .dataDestination(createDataAddressBuilder("Test Address Type") - .keyName("Test Key Name") - .build()) - .connectorAddress("http://some-connector.com") - .protocol("protocol") - .contractId("some-contract") - .assetId(Asset.Builder.newInstance().id("asset-id").build().getId()) - .processId("test-process-id"); - } - - public static DataRequest createDataRequest(String transferProcessId) { - return createDataRequestBuilder().processId(transferProcessId) - .build(); - } - public static TransferProcess createTransferProcess() { return createTransferProcess("test-process"); } @@ -71,10 +47,6 @@ public static TransferProcess createTransferProcess(String processId, TransferPr return createTransferProcessBuilder(processId).state(state.code()).build(); } - public static TransferProcess createTransferProcess(String processId, DataRequest dataRequest) { - return createTransferProcessBuilder(processId).dataRequest(dataRequest).build(); - } - public static TransferProcess createTransferProcess(String processId) { return createTransferProcessBuilder(processId) .state(TransferProcessStates.INITIAL.code()) @@ -87,7 +59,6 @@ public static TransferProcess.Builder createTransferProcessBuilder(String proces .createdAt(Clock.systemUTC().millis()) .state(TransferProcessStates.INITIAL.code()) .type(TransferProcess.Type.CONSUMER) - .dataRequest(createDataRequest()) .contentDataAddress(createDataAddressBuilder("any").build()) .callbackAddresses(List.of(CallbackAddress.Builder.newInstance().uri("local://test").build())) .resourceManifest(createManifest()); @@ -100,12 +71,7 @@ public static DataAddress.Builder createDataAddressBuilder(String type) { @NotNull public static TransferProcess initialTransferProcess() { - return initialTransferProcess(UUID.randomUUID().toString(), "clientid"); - } - - @NotNull - public static TransferProcess initialTransferProcess(String processId, String dataRequestId) { - return createTransferProcess(processId, createDataRequestBuilder().id(dataRequestId).build()); + return createTransferProcessBuilder(UUID.randomUUID().toString()).correlationId("clientid").build(); } @JsonTypeName("dataspaceconnector:testresourcedef") diff --git a/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java b/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java index 801bdb64bd5..57d21410c84 100644 --- a/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java +++ b/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java @@ -43,8 +43,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.eclipse.edc.connector.transfer.spi.testfixtures.store.TestFunctions.createDataAddressBuilder; -import static org.eclipse.edc.connector.transfer.spi.testfixtures.store.TestFunctions.createDataRequest; -import static org.eclipse.edc.connector.transfer.spi.testfixtures.store.TestFunctions.createDataRequestBuilder; import static org.eclipse.edc.connector.transfer.spi.testfixtures.store.TestFunctions.createTransferProcess; import static org.eclipse.edc.connector.transfer.spi.testfixtures.store.TestFunctions.createTransferProcessBuilder; import static org.eclipse.edc.connector.transfer.spi.testfixtures.store.TestFunctions.initialTransferProcess; @@ -69,7 +67,7 @@ class Create { @Test void shouldCreateTheEntity() { var transferProcess = createTransferProcessBuilder("test-id") - .dataRequest(createDataRequestBuilder().id("data-request-id").build()) + .correlationId("data-request-id") .privateProperties(Map.of("key", "value")).build(); getTransferProcessStore().save(transferProcess); @@ -95,7 +93,7 @@ void verifyCallbacks() { @Test void verifyTransferType() { - var t = createTransferProcessBuilder("test-id").transferType("transferType").dataRequest(createDataRequestBuilder().build()).build(); + var t = createTransferProcessBuilder("test-id").transferType("transferType").build(); getTransferProcessStore().save(t); var all = getTransferProcessStore().findAll(QuerySpec.none()).collect(Collectors.toList()); @@ -106,7 +104,7 @@ void verifyTransferType() { @Test void verifyDataPlaneId() { - var t = createTransferProcessBuilder("test-id").dataPlaneId("dataPlaneId").dataRequest(createDataRequestBuilder().build()).build(); + var t = createTransferProcessBuilder("test-id").dataPlaneId("dataPlaneId").build(); getTransferProcessStore().save(t); var all = getTransferProcessStore().findAll(QuerySpec.none()).collect(Collectors.toList()); @@ -321,8 +319,7 @@ void notExist() { class FindForCorrelationId { @Test void shouldFindEntityByCorrelationId() { - var dataRequest = createDataRequestBuilder().id("correlationId").build(); - var transferProcess = createTransferProcessBuilder("id1").dataRequest(dataRequest).build(); + var transferProcess = createTransferProcessBuilder("id1").correlationId("correlationId").build(); getTransferProcessStore().save(transferProcess); var res = getTransferProcessStore().findForCorrelationId("correlationId"); @@ -390,22 +387,19 @@ void leasedByOther_shouldThrowException() { @Test void shouldReplaceDataRequest_whenItGetsTheIdUpdated() { var builder = createTransferProcessBuilder("id1").state(STARTED.code()); - var newDataRequest = createDataRequestBuilder() - .id("new-dr-id") + getTransferProcessStore().save(builder.build()); + var newTransferProcess = builder.correlationId("new-dr-id") .assetId("new-asset") .contractId("new-contract") - .protocol("test-protocol") - .build(); - getTransferProcessStore().save(builder.build()); - getTransferProcessStore().save(builder.dataRequest(newDataRequest).build()); + .protocol("test-protocol").build(); + getTransferProcessStore().save(newTransferProcess); var result = getTransferProcessStore().findAll(QuerySpec.none()); assertThat(result) .hasSize(1) .usingRecursiveFieldByFieldElementComparator() - .map(TransferProcess::getDataRequest) - .containsExactly(newDataRequest); + .containsExactly(newTransferProcess); } } @@ -607,34 +601,15 @@ void queryByDataAddress_invalidKey_valueNotExist() { } @Test - void queryByDataRequestProperty_processId() { - var da = createDataRequest(); + void queryByCorrelationId() { var tp = createTransferProcessBuilder("testprocess1") - .dataRequest(da) + .correlationId("counterPartyId") .build(); getTransferProcessStore().save(tp); getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() - .filter(List.of(new Criterion("dataRequest.processId", "=", "testprocess1"))) - .build(); - - var result = getTransferProcessStore().findAll(query); - - assertThat(result).usingRecursiveFieldByFieldElementComparatorIgnoringFields("deprovisionedResources").containsOnly(tp); - } - - @Test - void queryByDataRequestProperty_id() { - var da = createDataRequest(); - var tp = createTransferProcessBuilder("testprocess1") - .dataRequest(da) - .build(); - getTransferProcessStore().save(tp); - getTransferProcessStore().save(createTransferProcess("testprocess2")); - - var query = QuerySpec.Builder.newInstance() - .filter(List.of(new Criterion("dataRequest.id", "=", da.getId()))) + .filter(List.of(new Criterion("correlationId", "=", "counterPartyId"))) .build(); var result = getTransferProcessStore().findAll(query); @@ -644,15 +619,14 @@ void queryByDataRequestProperty_id() { @Test void queryByDataRequestProperty_protocol() { - var da = createDataRequestBuilder().protocol("%/protocol").build(); var tp = createTransferProcessBuilder("testprocess1") - .dataRequest(da) + .protocol("test-protocol") .build(); getTransferProcessStore().save(tp); getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() - .filter(List.of(new Criterion("dataRequest.protocol", "like", "%/protocol"))) + .filter(List.of(new Criterion("protocol", "like", "test-protocol"))) .build(); var result = getTransferProcessStore().findAll(query); @@ -662,9 +636,7 @@ void queryByDataRequestProperty_protocol() { @Test void queryByDataRequest_valueNotExist() { - var da = createDataRequest(); var tp = createTransferProcessBuilder("testprocess1") - .dataRequest(da) .build(); getTransferProcessStore().save(tp); getTransferProcessStore().save(createTransferProcess("testprocess2")); @@ -981,8 +953,7 @@ class FindByCorrelationIdAndLease { void shouldReturnTheEntityAndLeaseIt() { var id = UUID.randomUUID().toString(); var correlationId = UUID.randomUUID().toString(); - var dataRequest = createDataRequestBuilder().id(correlationId).build(); - getTransferProcessStore().save(createTransferProcessBuilder(id).dataRequest(dataRequest).build()); + getTransferProcessStore().save(createTransferProcessBuilder(id).correlationId(correlationId).build()); var result = getTransferProcessStore().findByCorrelationIdAndLease(correlationId); @@ -1001,8 +972,7 @@ void shouldReturnNotFound_whenEntityDoesNotExist() { void shouldReturnAlreadyLeased_whenEntityIsAlreadyLeased() { var id = UUID.randomUUID().toString(); var correlationId = UUID.randomUUID().toString(); - var dataRequest = createDataRequestBuilder().id(correlationId).build(); - getTransferProcessStore().save(createTransferProcessBuilder(id).dataRequest(dataRequest).build()); + getTransferProcessStore().save(createTransferProcessBuilder(id).correlationId(correlationId).build()); leaseEntity(id, "other owner"); var result = getTransferProcessStore().findByCorrelationIdAndLease(correlationId); diff --git a/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/TransferProcessApiEndToEndTest.java b/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/TransferProcessApiEndToEndTest.java index ffa5b4e4199..cbdadf3fdef 100644 --- a/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/TransferProcessApiEndToEndTest.java +++ b/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/TransferProcessApiEndToEndTest.java @@ -20,7 +20,6 @@ import jakarta.json.JsonArrayBuilder; import jakarta.json.JsonObject; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.jsonld.util.JacksonJsonLd; import org.eclipse.edc.junit.annotations.EndToEndTest; @@ -249,17 +248,14 @@ private TransferProcess.Builder createTransferProcessBuilder(String id) { return TransferProcess.Builder.newInstance() .id(id) .callbackAddresses(List.of(CallbackAddress.Builder.newInstance().uri("http://any").events(emptySet()).build())) - .dataRequest(DataRequest.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .dataDestination(DataAddress.Builder.newInstance() - .type("type") - .build()) - .protocol("dataspace-protocol-http") - .assetId("asset-id") - .contractId("contractId") - .connectorAddress("http://connector/address") - .processId(id) - .build()); + .correlationId(UUID.randomUUID().toString()) + .dataDestination(DataAddress.Builder.newInstance() + .type("type") + .build()) + .protocol("dataspace-protocol-http") + .assetId("asset-id") + .contractId("contractId") + .counterPartyAddress("http://connector/address"); } private JsonArrayBuilder createCallbackAddress() { diff --git a/system-tests/protocol-test/src/test/java/org/eclipse/edc/test/e2e/protocol/DspTransferApiEndToEndTest.java b/system-tests/protocol-test/src/test/java/org/eclipse/edc/test/e2e/protocol/DspTransferApiEndToEndTest.java index bbd40bf2543..38a6787d2e2 100644 --- a/system-tests/protocol-test/src/test/java/org/eclipse/edc/test/e2e/protocol/DspTransferApiEndToEndTest.java +++ b/system-tests/protocol-test/src/test/java/org/eclipse/edc/test/e2e/protocol/DspTransferApiEndToEndTest.java @@ -19,12 +19,12 @@ import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates; import org.eclipse.edc.connector.spi.protocol.ProtocolVersionRegistry; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.junit.testfixtures.TestUtils; import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement; import org.eclipse.edc.spi.types.domain.offer.ContractOffer; import org.junit.jupiter.api.Test; @@ -66,7 +66,10 @@ void shouldExposeVersion2024_1() { var id = UUID.randomUUID().toString(); var contractId = UUID.randomUUID().toString(); var transfer = TransferProcess.Builder.newInstance() - .id(id).dataRequest(DataRequest.Builder.newInstance().id("any").destinationType("any").contractId(contractId).build()).state(REQUESTED.code()) + .id(id) + .contractId(contractId) + .dataDestination(DataAddress.Builder.newInstance().type("any").build()) + .state(REQUESTED.code()) .build(); runtime.getService(TransferProcessStore.class).save(transfer); runtime.getService(ContractNegotiationStore.class).save(createNegotiationWithAgreement(contractId));