Skip to content

Commit

Permalink
feat(dsp): implement transfer resumption
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Mar 21, 2024
1 parent 7f9b059 commit 362a353
Show file tree
Hide file tree
Showing 24 changed files with 903 additions and 375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.PROVIDER;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.SUSPENDED;

public class TransferProcessProtocolServiceImpl implements TransferProcessProtocolService {

Expand Down Expand Up @@ -192,6 +193,11 @@ private ServiceResult<TransferProcess> startedAction(TransferStartMessage messag
.build();
observable.invokeForEach(l -> l.started(transferProcess, transferStartedData));
return ServiceResult.success(transferProcess);
} else if (transferProcess.getType() == PROVIDER && transferProcess.currentStateIsOneOf(SUSPENDED)) {
transferProcess.protocolMessageReceived(message.getId());
transferProcess.transitionStarting();
update(transferProcess);
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be started"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.edc.connector.transfer.spi.types.command.CompleteTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionCompleteCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.ResumeTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.SuspendTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
Expand Down Expand Up @@ -106,6 +107,11 @@ public ServiceResult<List<TransferProcess>> search(QuerySpec query) {
return execute(command);
}

@Override
public @NotNull ServiceResult<Void> resume(ResumeTransferCommand command) {
return execute(command);
}

@Override
public @NotNull ServiceResult<Void> deprovision(String transferProcessId) {
return execute(new DeprovisionRequest(transferProcessId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener;
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage;
Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.INITIAL;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.REQUESTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTING;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.SUSPENDED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
Expand Down Expand Up @@ -250,98 +252,166 @@ void notifyRequested_missingDestination_shouldInitiateTransfer() {
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void notifyStarted_shouldTransitionToStarted() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.dataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
var agreement = contractAgreement();
var transferProcess = transferProcess(STARTED, "transferProcessId");
@Nested
class NotifyStarted {
@Test
void shouldTransitionToStarted() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.dataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
var agreement = contractAgreement();
var transferProcess = transferProcess(STARTED, "transferProcessId");

when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyStarted(message, tokenRepresentation);
var result = service.notifyStarted(message, tokenRepresentation);

var startedDataCaptor = ArgumentCaptor.forClass(TransferProcessStartedData.class);
var transferProcessCaptor = ArgumentCaptor.forClass(TransferProcess.class);
assertThat(result).isSucceeded();
verify(listener).preStarted(any());
verify(store).save(transferProcessCaptor.capture());
verify(store).save(argThat(t -> t.getState() == STARTED.code()));
verify(listener).started(any(), startedDataCaptor.capture());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
assertThat(startedDataCaptor.getValue().getDataAddress()).usingRecursiveComparison().isEqualTo(message.getDataAddress());
}
var startedDataCaptor = ArgumentCaptor.forClass(TransferProcessStartedData.class);
var transferProcessCaptor = ArgumentCaptor.forClass(TransferProcess.class);
assertThat(result).isSucceeded();
verify(listener).preStarted(any());
verify(store).save(transferProcessCaptor.capture());
verify(store).save(argThat(t -> t.getState() == STARTED.code()));
verify(listener).started(any(), startedDataCaptor.capture());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
assertThat(startedDataCaptor.getValue().getDataAddress()).usingRecursiveComparison().isEqualTo(message.getDataAddress());
}

@Test
void notifyStarted_shouldReturnConflict_whenTransferCannotBeStarted() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString());
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.build();
var agreement = contractAgreement();
@Test
void shouldReturnConflict_whenTransferCannotBeStarted() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString());
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.build();
var agreement = contractAgreement();

when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyStarted(message, tokenRepresentation);
var result = service.notifyStarted(message, tokenRepresentation);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
// state didn't change
verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code()));
verifyNoInteractions(listener);
assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
// state didn't change
verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code()));
verifyNoInteractions(listener);
}

@Test
void shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.dataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
var agreement = contractAgreement();

var transferProcess = transferProcess(REQUESTED, "transferProcessId");
when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error"));

var result = service.notifyStarted(message, tokenRepresentation);

assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);

verify(store, times(1)).save(any());

}
}

@Test
void notifyStarted_shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.dataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
var agreement = contractAgreement();
@Nested
class NotifyStartedResumed {

var transferProcess = transferProcess(REQUESTED, "transferProcessId");
when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error"));
@Test
void shouldTransitionToStartedAndStartDataFlow_whenProvider() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.dataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
var agreement = contractAgreement();
var transferProcess = transferProcessBuilder().id("transferProcessId")
.state(SUSPENDED.code()).type(PROVIDER).build();

var result = service.notifyStarted(message, tokenRepresentation);
when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);
var result = service.notifyStarted(message, tokenRepresentation);

verify(store, times(1)).save(any());
var transferProcessCaptor = ArgumentCaptor.forClass(TransferProcess.class);
assertThat(result).isSucceeded();
verify(store).save(transferProcessCaptor.capture());
var storedTransferProcess = transferProcessCaptor.getValue();
assertThat(storedTransferProcess.getState()).isEqualTo(STARTING.code());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void shouldReturnError_whenStatusIsNotSuspendedAndTypeProvider() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferStartMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.dataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
var agreement = contractAgreement();
var transferProcess = transferProcessBuilder().id("transferProcessId")
.state(REQUESTED.code()).type(PROVIDER).build();
var dataFlowResponse = DataFlowResponse.Builder.newInstance().dataPlaneId("dataPlaneId").build();
when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse));

when(protocolTokenValidator.verify(eq(tokenRepresentation), eq(TRANSFER_PROCESS_REQUEST_SCOPE), any())).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyStarted(message, tokenRepresentation);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.DeprovisionRequest;
import org.eclipse.edc.connector.transfer.spi.types.command.ResumeTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.SuspendTransferCommand;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
Expand Down Expand Up @@ -209,6 +210,31 @@ void shouldFailWhenCommandHandlerFails() {

}

@Nested
class Resume {

@Test
void shouldExecuteCommandAndReturnResult() {
when(commandHandlerRegistry.execute(any())).thenReturn(CommandResult.success());
var command = new ResumeTransferCommand("id");

var result = service.resume(command);

assertThat(result).isSucceeded();
verify(commandHandlerRegistry).execute(command);
}

@Test
void shouldFailWhenCommandHandlerFails() {
when(commandHandlerRegistry.execute(any())).thenReturn(CommandResult.notFound("not found"));
var command = new ResumeTransferCommand("id");

var result = service.resume(command);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
}
}

@Test
void deprovision() {
when(commandHandlerRegistry.execute(any())).thenReturn(CommandResult.success());
Expand Down
Loading

0 comments on commit 362a353

Please sign in to comment.