Skip to content

Commit

Permalink
refactor: e2e pull transfer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrYurk committed Jan 17, 2025
1 parent a6a339f commit c88fee7
Showing 1 changed file with 82 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObject;
import okhttp3.Request;
Expand All @@ -33,6 +34,7 @@
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -115,26 +117,13 @@ void httpPull_dataTransfer_withCallbacks() {
var callbacks = Json.createArrayBuilder()
.add(createCallback(callbackUrl, true, Set.of("transfer.process.started")))
.build();

var request = request().withPath("/hooks")
.withMethod(HttpMethod.POST.name());

var events = new ConcurrentHashMap<String, TransferProcessStarted>();

callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events));

var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.withCallbacks(callbacks)
.execute();

CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull());

var event = events.get(transferProcessId);
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));
StartedTransferContext startedTransferContext = startTransferProcess(assetId, callbacks);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId, events);

providerDataSource.verify(request("/source").withMethod("GET"));
stopQuietly(callbacksEndpoint);
Expand All @@ -143,62 +132,29 @@ void httpPull_dataTransfer_withCallbacks() {
@Test
void httpPull_dataTransfer_withEdrCache() {
var assetId = UUID.randomUUID().toString();
var sourceDataAddress = httpSourceDataAddress();
createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), sourceDataAddress);

var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

// Do the transfer
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

// checks that the EDR is gone once the contract expires
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)));

// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), httpSourceDataAddress());
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext);
providerDataSource.verify(request("/source").withMethod("GET"));
}

@Test
void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() {
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

CONSUMER.suspendTransfer(transferProcessId, "supension");
CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId, "suspension");
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext);

CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

CONSUMER.resumeTransfer(transferProcessId);

// check that transfer is available again
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));
CONSUMER.resumeTransfer(startedTransferContext.consumerTransferProcessId);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

providerDataSource.verify(request("/source").withMethod("GET"));
}
Expand All @@ -207,38 +163,18 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() {
void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() {
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);

var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId, "suspension");
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext);

PROVIDER.suspendTransfer(providerTransferProcessId, "supension");

PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

PROVIDER.resumeTransfer(providerTransferProcessId);

// check that transfer is available again
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));
PROVIDER.resumeTransfer(startedTransferContext.providerTransferProcessId);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

providerDataSource.verify(request("/source").withMethod("GET"));
}
Expand All @@ -256,26 +192,15 @@ void pullFromHttp_httpProvision() {
"proxyQueryParams", "true"
));

var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
CONSUMER.pullData(edr, Map.of("message", "some information"), body -> assertThat(body).isEqualTo("data"));
});
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

provisionServer.verify(request("/provision"));
provisionServer.clear(request("provision"));

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();

PROVIDER.terminateTransfer(providerTransferProcessId);
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED);
PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED);

provisionServer.verify(request("/provision"));

Expand Down Expand Up @@ -340,6 +265,55 @@ private HttpResponse cacheEdr(HttpRequest request, Map<String, TransferProcessSt
}
}

private StartedTransferContext startTransferProcess(String assetId) {
return startTransferProcess(assetId, null);
}

private StartedTransferContext startTransferProcess(String assetId, JsonArray callbacks) {
var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.withCallbacks(callbacks)
.execute();
CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED);

return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId);
}

private EdrMessageContext assertDataIsAccessible(String consumerTransferProcessId) {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

return new EdrMessageContext(edr, msg);
}

private EdrMessageContext assertDataIsAccessible(String consumerTransferProcessId, ConcurrentHashMap<String, TransferProcessStarted> events) {
await().atMost(timeout).untilAsserted(() -> assertThat(events.get(consumerTransferProcessId)).isNotNull());

var event = events.get(consumerTransferProcessId);
var msg = UUID.randomUUID().toString();
var edr = event.getDataAddress();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

return new EdrMessageContext(edr, msg);
}

private void assertDataIsNotAccessible(String consumerTransferProcessId, EdrMessageContext edrMessageContext) {
// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edrMessageContext.edr, Map.of("message", edrMessageContext.message), body -> assertThat(body).isEqualTo("data"))));
}

private record StartedTransferContext(String consumerTransferProcessId, String providerTransferProcessId) { }

private record EdrMessageContext(DataAddress edr, String message) { }

/**
* Mocked http provisioner
*/
Expand Down

0 comments on commit c88fee7

Please sign in to comment.