Skip to content

Commit

Permalink
feat: support end-to-end idempotency for process state replication (#…
Browse files Browse the repository at this point in the history
…3777)

* feat: support end-to-end idempotency for process state replication

* PR remarks
  • Loading branch information
ndr-brt authored Jan 17, 2024
1 parent 14efd0a commit 89c0903
Show file tree
Hide file tree
Showing 36 changed files with 784 additions and 383 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.spi.entity;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.spi.types.TypeManager;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class ProtocolMessagesTest {

private final ObjectMapper mapper = new TypeManager().getMapper();

@Test
void serdes() throws JsonProcessingException {
var protocolMessages = new ProtocolMessages();
protocolMessages.addReceived("received");
protocolMessages.setLastSent("lastSent");

var json = mapper.writeValueAsString(protocolMessages);
var deserialized = mapper.readValue(json, ProtocolMessages.class);

assertThat(deserialized).usingRecursiveComparison().isEqualTo(protocolMessages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ boolean process(E entity, String description) {
if (throwable == null) {
onSuccessHandler.accept(reloadedEntity, result);
} else {
if (retriesExhausted(entity)) {
if (retriesExhausted(reloadedEntity)) {
var message = format("%s: ID %s. Attempt #%d failed to %s. Retry limit exceeded. Cause: %s",
reloadedEntity.getClass().getSimpleName(),
reloadedEntity.getId(),
Expand All @@ -65,7 +65,7 @@ boolean process(E entity, String description) {
throwable.getMessage());
monitor.severe(message, throwable);

onRetryExhausted.accept(entity, throwable);
onRetryExhausted.accept(reloadedEntity, throwable);
} else {
var message = format("%s: ID %s. Attempt #%d failed to %s. Cause: %s",
reloadedEntity.getClass().getSimpleName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.spi.types.domain.message.ProcessRemoteMessage;
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.retry.AsyncStatusResultRetryProcess;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

import static java.lang.String.format;
Expand Down Expand Up @@ -72,32 +75,39 @@ private boolean setPending(ContractNegotiation contractNegotiation) {
*/
@WithSpan
protected boolean processTerminating(ContractNegotiation negotiation) {
var terminationBuilder = ContractNegotiationTerminationMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
var messageBuilder = ContractNegotiationTerminationMessage.Builder.newInstance()
.rejectionReason(negotiation.getErrorDetail())
.policy(negotiation.getLastContractOffer().getPolicy())
.processId(negotiation.getCorrelationId());
.policy(negotiation.getLastContractOffer().getPolicy());

return dispatch(messageBuilder, negotiation)
.onSuccess((n, result) -> transitionToTerminated(n))
.onFailure((n, throwable) -> transitionToTerminating(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminated(n, format("Failed to send termination to counter party: %s", throwable.getMessage())))
.execute("[%s] send termination".formatted(type().name()));
}

protected AsyncStatusResultRetryProcess<ContractNegotiation, Object, ?> dispatch(ProcessRemoteMessage.Builder<?, ?> messageBuilder,
ContractNegotiation negotiation) {
messageBuilder.counterPartyAddress(negotiation.getCounterPartyAddress())
.protocol(negotiation.getProtocol())
.processId(Optional.ofNullable(negotiation.getCorrelationId()).orElse(negotiation.getId()));

if (type() == ContractNegotiation.Type.CONSUMER) {
terminationBuilder
.consumerPid(negotiation.getId())
.providerPid(negotiation.getCorrelationId());
messageBuilder.consumerPid(negotiation.getId()).providerPid(negotiation.getCorrelationId());
} else {
terminationBuilder
.providerPid(negotiation.getId())
.consumerPid(negotiation.getCorrelationId());
messageBuilder.providerPid(negotiation.getId()).consumerPid(negotiation.getCorrelationId());
}

var termination = terminationBuilder.build();
if (negotiation.lastSentProtocolMessage() != null) {
messageBuilder.id(negotiation.lastSentProtocolMessage());
}

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, terminationBuilder.build()))
.entityRetrieve(store::findById)
.onSuccess((n, result) -> transitionToTerminated(n))
.onFailure((n, throwable) -> transitionToTerminating(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminated(n, format("Failed to send %s to counter party: %s", termination.getClass().getSimpleName(), throwable.getMessage())))
.execute("[%s] send termination".formatted(type().name()));
var message = messageBuilder.build();

negotiation.lastSentProtocolMessage(message.getId());

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, message));
}

protected void transitionToInitial(ContractNegotiation negotiation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@

import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementVerificationMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractNegotiationEventMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestMessage;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement;
import org.eclipse.edc.statemachine.StateMachineManager;

import java.util.Optional;
import java.util.UUID;

import static java.lang.String.format;
import static org.eclipse.edc.connector.contract.spi.types.agreement.ContractNegotiationEventMessage.Type.ACCEPTED;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation.Type.CONSUMER;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.ACCEPTING;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.AGREED;
Expand Down Expand Up @@ -112,25 +111,17 @@ private boolean processInitial(ContractNegotiation negotiation) {
*/
@WithSpan
private boolean processRequesting(ContractNegotiation negotiation) {
var offer = negotiation.getLastContractOffer();
var request = ContractRequestMessage.Builder.newInstance()
.contractOffer(offer)
.counterPartyAddress(negotiation.getCounterPartyAddress())
var messageBuilder = ContractRequestMessage.Builder.newInstance()
.contractOffer(negotiation.getLastContractOffer())
.callbackAddress(protocolWebhook.url())
.protocol(negotiation.getProtocol())
.consumerPid(negotiation.getId())
.providerPid(negotiation.getCorrelationId())
.processId(Optional.ofNullable(negotiation.getCorrelationId()).orElse(negotiation.getId()))
.type(ContractRequestMessage.Type.INITIAL)
.build();
.type(ContractRequestMessage.Type.INITIAL);

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, request))
.entityRetrieve(store::findById)
return dispatch(messageBuilder, negotiation)
.onSuccess((n, result) -> transitionToRequested(n))
.onFailure((n, throwable) -> transitionToRequesting(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send %s to provider: %s", request.getClass().getSimpleName(), throwable.getMessage())))
.execute("[Consumer] Send ContractRequestMessage message");
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send request to provider: %s", throwable.getMessage())))
.execute("[Consumer] send request");
}

/**
Expand All @@ -143,33 +134,14 @@ private boolean processRequesting(ContractNegotiation negotiation) {
*/
@WithSpan
private boolean processAccepting(ContractNegotiation negotiation) {
var lastOffer = negotiation.getLastContractOffer();

var policy = lastOffer.getPolicy();
var agreement = ContractAgreement.Builder.newInstance()
.contractSigningDate(clock.instant().getEpochSecond())
.providerId(negotiation.getCounterPartyId())
.consumerId(participantId)
.policy(policy)
.assetId(lastOffer.getAssetId())
.build();
var messageBuilder = ContractNegotiationEventMessage.Builder.newInstance().type(ACCEPTED);

var request = ContractAgreementMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.contractAgreement(agreement)
.consumerPid(negotiation.getId())
.providerPid(negotiation.getCorrelationId())
.processId(negotiation.getCorrelationId())
.build();

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, request))
.entityRetrieve(store::findById)
return dispatch(messageBuilder, negotiation)
.onSuccess((n, result) -> transitionToAccepted(n))
.onFailure((n, throwable) -> transitionToAccepting(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send %s to provider: %s", request.getClass().getSimpleName(), throwable.getMessage())))
.execute("[consumer] send agreement");
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send acceptance to provider: %s", throwable.getMessage())))
.execute("[consumer] send acceptance");
}

/**
Expand All @@ -192,22 +164,15 @@ private boolean processAgreed(ContractNegotiation negotiation) {
*/
@WithSpan
private boolean processVerifying(ContractNegotiation negotiation) {
var message = ContractAgreementVerificationMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.consumerPid(negotiation.getId())
.providerPid(negotiation.getCorrelationId())
.policy(negotiation.getContractAgreement().getPolicy())
.processId(negotiation.getCorrelationId())
.build();
var messageBuilder = ContractAgreementVerificationMessage.Builder.newInstance()
.policy(negotiation.getContractAgreement().getPolicy());

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(store::findById)
return dispatch(messageBuilder, negotiation)
.onSuccess((n, result) -> transitionToVerified(n))
.onFailure((n, throwable) -> transitionToVerifying(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send %s to provider: %s", message.getClass().getSimpleName(), throwable.getMessage())))
.execute(format("[consumer] send %s", message.getClass().getSimpleName()));
.onRetryExhausted((n, throwable) -> transitionToTerminating(n, format("Failed to send verification to provider: %s", throwable.getMessage())))
.execute("[consumer] send verification");
}

/**
Expand Down
Loading

0 comments on commit 89c0903

Please sign in to comment.