Skip to content

Commit

Permalink
ci: fix flaky testcontainers tests (#4712)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt authored Jan 13, 2025
1 parent 02dea95 commit 905b28f
Show file tree
Hide file tree
Showing 12 changed files with 369 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -44,37 +47,63 @@
public class EmbeddedRuntime extends BaseRuntime {

private final String name;
private final Map<String, String> properties;
private final LinkedHashMap<Class<?>, Object> serviceMocks = new LinkedHashMap<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final MultiSourceServiceLocator serviceLocator;
private final URL[] classPathEntries;
private Future<?> runtimeThread;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final List<Supplier<Config>> configurationProviders = new ArrayList<>();

public EmbeddedRuntime(String name, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, ClasspathReader.classpathFor(additionalModules));
}

public EmbeddedRuntime(String name, URL[] classpathEntries) {
this(new MultiSourceServiceLocator(), name, classpathEntries);
}

/**
* Deprecated, configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*
* @deprecated configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*/
@Deprecated(since = "0.11.0")
public EmbeddedRuntime(String name, Map<String, String> properties, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules));
}

/**
* Deprecated, configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*
* @deprecated configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*/
@Deprecated(since = "0.11.0")
public EmbeddedRuntime(String name, Map<String, String> properties, URL[] classpathEntries) {
this(new MultiSourceServiceLocator(), name, properties, classpathEntries);
}

private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map<String, String> properties, URL[] classPathEntries) {
private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, URL[] classPathEntries) {
super(serviceLocator);
this.serviceLocator = serviceLocator;
this.name = name;
this.properties = properties;
this.classPathEntries = classPathEntries;
}

@Deprecated(since = "0.11.0")
private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map<String, String> properties, URL[] classPathEntries) {
this(serviceLocator, name, classPathEntries);
this.configurationProviders.add(() -> ConfigFactory.fromMap(properties));
}

@Override
public void boot(boolean addShutdownHook) {
var monitor = super.createMonitor();

monitor.info("Starting runtime %s".formatted(name));

serviceLocator.registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) () -> ConfigFactory.fromMap(properties));
configurationProviders.forEach(provider -> serviceLocator
.registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) provider::get));

var runtimeThrowable = new AtomicReference<Throwable>();
var latch = new CountDownLatch(1);
Expand Down Expand Up @@ -155,4 +184,15 @@ public ServiceExtensionContext getContext() {
public boolean isRunning() {
return isRunning.get();
}

/**
* Adds a configuration provider, that will be invoked during connector startup.
*
* @param configurationProvider the configuration provider.
* @return self.
*/
public EmbeddedRuntime configurationProvider(Supplier<Config> configurationProvider) {
configurationProviders.add(configurationProvider);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.edc.sql.testfixtures;

import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.sql.DriverManagerConnectionFactory;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.SqlQueryExecutor;
Expand Down Expand Up @@ -135,6 +137,10 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
return null;
}

public Config getDatasourceConfig() {
return ConfigFactory.fromMap(getDatasourceConfiguration());
}

public Map<String, String> getDatasourceConfiguration() {
return postgres.createDefaultDatasourceConfiguration(postgreSqlContainer.getDatabaseName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.connector.controlplane.test.system.utils;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Lazy implementation of the {@link Supplier} interface.
*/
public class LazySupplier<T> implements Supplier<T> {

private final Supplier<T> dataSupplier;
private final AtomicReference<T> data = new AtomicReference<>();

public LazySupplier(Supplier<T> dataSupplier) {
this.dataSupplier = dataSupplier;
}

@Override
public T get() {
var currentValue = data.get();
if (currentValue == null) {
var newValue = dataSupplier.get();
data.compareAndExchange(null, newValue);
return newValue;
}
return currentValue;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ public abstract class AbstractDataPlaneTest {
protected static RuntimeExtension runtime =
new RuntimePerClassExtension(new EmbeddedRuntime(
"data-plane",
DATAPLANE.dataPlaneConfiguration(),
":system-tests:e2e-dataplane-tests:runtimes:data-plane"
));
).configurationProvider(DATAPLANE::dataPlaneConfig));

protected void seedVault() {
var vault = runtime.getService(Vault.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.junit.extensions.EmbeddedRuntime;
import org.eclipse.edc.junit.extensions.RuntimeExtension;
import org.eclipse.edc.junit.extensions.RuntimePerClassExtension;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
Expand Down Expand Up @@ -63,19 +64,17 @@ public class ClusteredDataPlaneEndToEndTest {
.id("urn:connector:provider")
.build();

private static final BiFunction<String, DataPlaneParticipant, EmbeddedRuntime> RUNTIME_SUPPLIER = (name, dataPlaneParticipant) -> {
var config = dataPlaneParticipant.dataPlaneConfiguration();
config.put("edc.runtime.id", name);
config.put("edc.sql.schema.autocreate", "true");
config.put("edc.core.retry.retries.max", "0");
config.putAll(POSTGRESQL.getDatasourceConfiguration());
return new EmbeddedRuntime(
name,
config,
":system-tests:e2e-dataplane-tests:runtimes:data-plane",
":dist:bom:dataplane-feature-sql-bom"
);
};
private static final BiFunction<String, DataPlaneParticipant, EmbeddedRuntime> RUNTIME_SUPPLIER =
(name, dataPlaneParticipant) -> new EmbeddedRuntime(
name,
":system-tests:e2e-dataplane-tests:runtimes:data-plane",
":dist:bom:dataplane-feature-sql-bom")
.configurationProvider(dataPlaneParticipant::dataPlaneConfig)
.configurationProvider(POSTGRESQL::getDatasourceConfig)
.configurationProvider(() -> ConfigFactory.fromMap(Map.of(
"edc.runtime.id", name,
"edc.sql.schema.autocreate", "true"
)));

private static final EmbeddedRuntime FOO_RUNTIME = RUNTIME_SUPPLIER.apply("foo", FOO_DATAPLANE);
private static final EmbeddedRuntime BAR_RUNTIME = RUNTIME_SUPPLIER.apply("bar", BAR_DATAPLANE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package org.eclipse.edc.test.e2e.participant;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.eclipse.edc.connector.controlplane.test.system.utils.LazySupplier;
import org.eclipse.edc.connector.controlplane.test.system.utils.Participant;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
Expand All @@ -28,42 +31,42 @@

public class DataPlaneParticipant extends Participant {

private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control");
private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort());
private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control");
private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public");
private final URI dataplanePublicResponse = dataPlanePublic.resolve("/public/responseChannel");
private final String componentId = UUID.randomUUID().toString();
private final LazySupplier<URI> dataPlaneControl = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/control"));
private final LazySupplier<URI> dataPlanePublic = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/public"));

private DataPlaneParticipant() {
super();
}

public Endpoint getDataPlaneControlEndpoint() {
return new Endpoint(dataPlaneControl);
return new Endpoint(dataPlaneControl.get());
}

public Endpoint getDataPlanePublicEndpoint() {
return new Endpoint(dataPlanePublic);
return new Endpoint(dataPlanePublic.get());
}

public Config dataPlaneConfig() {
return ConfigFactory.fromMap(dataPlaneConfiguration());
}

public Map<String, String> dataPlaneConfiguration() {
return new HashMap<>() {
{
put("edc.component.id", componentId);
put("web.http.port", String.valueOf(dataPlaneDefault.getPort()));
put("edc.component.id", UUID.randomUUID().toString());
put("web.http.port", String.valueOf(getFreePort()));
put("web.http.path", "/api");
put("web.http.public.port", String.valueOf(dataPlanePublic.getPort()));
put("web.http.public.port", String.valueOf(dataPlanePublic.get().getPort()));
put("web.http.public.path", "/public");
put("web.http.control.port", String.valueOf(dataPlaneControl.getPort()));
put("web.http.control.path", dataPlaneControl.getPath());
put("web.http.control.port", String.valueOf(dataPlaneControl.get().getPort()));
put("web.http.control.path", dataPlaneControl.get().getPath());
put("edc.keystore", resourceAbsolutePath("certs/cert.pfx"));
put("edc.keystore.password", "123456");
put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token");
put("edc.dataplane.http.sink.partition.size", "1");
put("edc.transfer.proxy.token.signer.privatekey.alias", "1");
put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key");
put("edc.dataplane.api.public.response.baseurl", dataplanePublicResponse.toString());
put("edc.dataplane.api.public.response.baseurl", dataPlanePublic.get().resolve("/public/responseChannel").toString());
put("edc.core.retry.retries.max", "0");
}
};
}
Expand Down Expand Up @@ -92,4 +95,5 @@ public DataPlaneParticipant build() {
return participant;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.test.e2e;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Properties;

public class KafkaExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver {

private final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.1"));

@Override
public void beforeAll(ExtensionContext context) {
kafka.start();
}

@Override
public void afterAll(ExtensionContext context) {
kafka.stop();
kafka.close();
}

@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
return type instanceof KafkaExtension;
}

@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
if (type instanceof KafkaExtension) {
return this;
}
return null;
}

public Producer<String, String> createKafkaProducer() {
var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}

public Consumer<String, String> createKafkaConsumer() {
var props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}

public String getBootstrapServers() {
return kafka.getBootstrapServers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.eclipse.edc.junit.extensions.EmbeddedRuntime;

import java.net.URL;
import java.util.Map;

/**
* Runtimes for E2E transfer test.
Expand Down Expand Up @@ -66,10 +65,11 @@ public enum Runtimes {
this.modules = modules;
}

public EmbeddedRuntime create(String name, Map<String, String> configuration) {
public EmbeddedRuntime create(String name) {
if (classpathEntries == null) {
classpathEntries = ClasspathReader.classpathFor(modules);
}
return new EmbeddedRuntime(name, configuration, classpathEntries);
return new EmbeddedRuntime(name, classpathEntries);
}

}
Loading

0 comments on commit 905b28f

Please sign in to comment.