Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: e2e pull transfer tests #4733

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObject;
import okhttp3.Request;
Expand All @@ -33,6 +34,7 @@
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -115,26 +117,13 @@ void httpPull_dataTransfer_withCallbacks() {
var callbacks = Json.createArrayBuilder()
.add(createCallback(callbackUrl, true, Set.of("transfer.process.started")))
.build();

var request = request().withPath("/hooks")
.withMethod(HttpMethod.POST.name());

var events = new ConcurrentHashMap<String, TransferProcessStarted>();

callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events));

var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.withCallbacks(callbacks)
.execute();

CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull());

var event = events.get(transferProcessId);
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));
StartedTransferContext startedTransferContext = startTransferProcess(assetId, callbacks);
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId, events);

providerDataSource.verify(request("/source").withMethod("GET"));
stopQuietly(callbacksEndpoint);
Expand All @@ -143,62 +132,29 @@ void httpPull_dataTransfer_withCallbacks() {
@Test
void httpPull_dataTransfer_withEdrCache() {
var assetId = UUID.randomUUID().toString();
var sourceDataAddress = httpSourceDataAddress();
createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), sourceDataAddress);

var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

// Do the transfer
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

// checks that the EDR is gone once the contract expires
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)));

// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), httpSourceDataAddress());
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext);
providerDataSource.verify(request("/source").withMethod("GET"));
}

@Test
void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() {
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

CONSUMER.suspendTransfer(transferProcessId, "supension");
CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId, "suspension");
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext);

CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

CONSUMER.resumeTransfer(transferProcessId);

// check that transfer is available again
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));
CONSUMER.resumeTransfer(startedTransferContext.consumerTransferProcessId);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

providerDataSource.verify(request("/source").withMethod("GET"));
}
Expand All @@ -207,38 +163,18 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() {
void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() {
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);

var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId, "suspension");
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED);
assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext);

PROVIDER.suspendTransfer(providerTransferProcessId, "supension");

PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

PROVIDER.resumeTransfer(providerTransferProcessId);

// check that transfer is available again
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));
PROVIDER.resumeTransfer(startedTransferContext.providerTransferProcessId);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

providerDataSource.verify(request("/source").withMethod("GET"));
}
Expand All @@ -256,26 +192,15 @@ void pullFromHttp_httpProvision() {
"proxyQueryParams", "true"
));

var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
CONSUMER.pullData(edr, Map.of("message", "some information"), body -> assertThat(body).isEqualTo("data"));
});
StartedTransferContext startedTransferContext = startTransferProcess(assetId);
assertDataIsAccessible(startedTransferContext.consumerTransferProcessId);

provisionServer.verify(request("/provision"));
provisionServer.clear(request("provision"));

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();

PROVIDER.terminateTransfer(providerTransferProcessId);
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED);
PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId);
PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED);
CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED);

provisionServer.verify(request("/provision"));

Expand Down Expand Up @@ -340,6 +265,55 @@ private HttpResponse cacheEdr(HttpRequest request, Map<String, TransferProcessSt
}
}

private StartedTransferContext startTransferProcess(String assetId) {
return startTransferProcess(assetId, null);
}

private StartedTransferContext startTransferProcess(String assetId, JsonArray callbacks) {
var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.withCallbacks(callbacks)
.execute();
CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED);

return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId);
}

private EdrMessageContext assertDataIsAccessible(String consumerTransferProcessId) {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

return new EdrMessageContext(edr, msg);
}

private EdrMessageContext assertDataIsAccessible(String consumerTransferProcessId, ConcurrentHashMap<String, TransferProcessStarted> events) {
await().atMost(timeout).untilAsserted(() -> assertThat(events.get(consumerTransferProcessId)).isNotNull());

var event = events.get(consumerTransferProcessId);
var msg = UUID.randomUUID().toString();
var edr = event.getDataAddress();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

return new EdrMessageContext(edr, msg);
}

private void assertDataIsNotAccessible(String consumerTransferProcessId, EdrMessageContext edrMessageContext) {
// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edrMessageContext.edr, Map.of("message", edrMessageContext.message), body -> assertThat(body).isEqualTo("data"))));
}

private record StartedTransferContext(String consumerTransferProcessId, String providerTransferProcessId) { }

private record EdrMessageContext(DataAddress edr, String message) { }

/**
* Mocked http provisioner
*/
Expand Down
Loading