createNonLegacySnapshots(SnapshotEventProvider provider, String eventType, Object cursor) {
- return provider.getSnapshot(eventType, cursor)
- .stream()
- .map(this::mapLegacyToNewSnapshot)
- .collect(toList());
- }
-
- private Snapshot mapLegacyToNewSnapshot(SnapshotEventProvider.Snapshot snapshot) {
- return new Snapshot(snapshot.getId(), snapshot.getDataType(), snapshot.getData());
- }
-
@Bean
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper,
FlowIdComponent flowIdComponent) {
diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java
index e5552e12..bf7e4ff9 100644
--- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java
+++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java
@@ -1,18 +1,148 @@
package org.zalando.nakadiproducer;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.flywaydb.core.api.MigrationInfo;
+import org.flywaydb.core.api.configuration.ConfigurationAware;
-import org.springframework.beans.factory.annotation.Qualifier;
+import java.sql.Connection;
/**
- * Qualifier annotation for a FlywayCallback to be injected in to nakadi-producer's Flyway instance.
+ * This is the main callback interface that should be implemented to get access to flyway lifecycle notifications.
+ * Simply add code to the callback method you are interested in having.
+ *
+ * If a callback also implements the {@link ConfigurationAware} interface,
+ * a {@link org.flywaydb.core.api.configuration.FlywayConfiguration} object will automatically be injected before
+ * calling any methods, giving the callback access to the core flyway configuration.
+ *
+ * Each callback method will run within its own transaction.
*/
-@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE,
- ElementType.ANNOTATION_TYPE })
-@Retention(RetentionPolicy.RUNTIME)
-@Qualifier
-public @interface NakadiProducerFlywayCallback {
+public interface NakadiProducerFlywayCallback {
+ /**
+ * Runs before the clean task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeClean(Connection connection) {}
+
+ /**
+ * Runs after the clean task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterClean(Connection connection) {};
+
+ /**
+ * Runs before the migrate task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeMigrate(Connection connection) {};
+
+ /**
+ * Runs after the migrate task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterMigrate(Connection connection) {}
+
+ /**
+ * Runs before the undo task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeUndo(Connection connection) {}
+
+ /**
+ * Runs before each migration script is undone.
+ *
+ * @param connection A valid connection to the database.
+ * @param info The current MigrationInfo for the migration to be undone.
+ */
+ default void beforeEachUndo(Connection connection, MigrationInfo info) {}
+
+ /**
+ * Runs after each migration script is undone.
+ *
+ * @param connection A valid connection to the database.
+ * @param info The current MigrationInfo for the migration just undone.
+ */
+ default void afterEachUndo(Connection connection, MigrationInfo info) {}
+
+ /**
+ * Runs after the undo task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterUndo(Connection connection) {}
+
+ /**
+ * Runs before each migration script is executed.
+ *
+ * @param connection A valid connection to the database.
+ * @param info The current MigrationInfo for this migration.
+ */
+ default void beforeEachMigrate(Connection connection, MigrationInfo info) {}
+
+ /**
+ * Runs after each migration script is executed.
+ *
+ * @param connection A valid connection to the database.
+ * @param info The current MigrationInfo for this migration.
+ */
+ default void afterEachMigrate(Connection connection, MigrationInfo info) {}
+
+ /**
+ * Runs before the validate task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeValidate(Connection connection) {}
+
+ /**
+ * Runs after the validate task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterValidate(Connection connection) {}
+
+ /**
+ * Runs before the baseline task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeBaseline(Connection connection) {}
+
+ /**
+ * Runs after the baseline task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterBaseline(Connection connection) {}
+
+ /**
+ * Runs before the repair task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeRepair(Connection connection) {}
+
+ /**
+ * Runs after the repair task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterRepair(Connection connection) {}
+
+ /**
+ * Runs before the info task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void beforeInfo(Connection connection) {}
+
+ /**
+ * Runs after the info task executes.
+ *
+ * @param connection A valid connection to the database.
+ */
+ default void afterInfo(Connection connection) {}
}
diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java
index 3545fbf9..ef423054 100644
--- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java
+++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java
@@ -2,28 +2,36 @@
import java.util.Set;
-import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
-import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
+import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
+import org.springframework.boot.actuate.endpoint.annotation.Selector;
+import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import org.springframework.lang.Nullable;
-@ConfigurationProperties("endpoints.snapshot-event-creation")
-public class SnapshotEventCreationEndpoint extends AbstractEndpoint {
+@Endpoint(id = "snapshot-event-creation")
+public class SnapshotEventCreationEndpoint {
private final SnapshotCreationService snapshotCreationService;
public SnapshotEventCreationEndpoint(SnapshotCreationService snapshotCreationService) {
- super("snapshot_event_creation", true, true);
this.snapshotCreationService = snapshotCreationService;
}
- @Override
- public SnapshotReport invoke() {
+ @ReadOperation
+ public SnapshotReport getSupportedEventTypes() {
return new SnapshotReport(snapshotCreationService.getSupportedEventTypes());
}
- public void invoke(String eventType, String filter) {
- snapshotCreationService.createSnapshotEvents(eventType, filter);
+ @WriteOperation
+ public void createFilteredSnapshotEvents(
+ // this is the event type. Could have a better name, but since Spring Boot relies on the -parameters
+ // compiler flag being set to resolve path parameter names, it would then get trickier to reliably run this
+ // Test in the IDE. So let's stick with arg0 for now.
+ @Selector String arg0,
+ @Nullable String filter) {
+ snapshotCreationService.createSnapshotEvents(arg0, filter);
}
diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java
deleted file mode 100644
index fffc577f..00000000
--- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.zalando.nakadiproducer.snapshots.impl;
-
-import org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter;
-import org.springframework.boot.actuate.endpoint.mvc.HypermediaDisabled;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.ResponseBody;
-
-public class SnapshotEventCreationMvcEndpoint extends EndpointMvcAdapter {
- private final SnapshotEventCreationEndpoint delegate;
-
- public SnapshotEventCreationMvcEndpoint(SnapshotEventCreationEndpoint delegate) {
- super(delegate);
- this.delegate = delegate;
- }
-
- @RequestMapping(value = "/{eventType:.*}", method = RequestMethod.POST)
- @ResponseBody
- @HypermediaDisabled
- public ResponseEntity> createSnapshot(@PathVariable String eventType,
- @RequestBody(required = false) String filter) {
- if (!this.delegate.isEnabled()) {
- // Shouldn't happen - MVC endpoint shouldn't be registered when delegate's
- // disabled
- return getDisabledResponse();
- }
-
- delegate.invoke(eventType, filter);
- return ResponseEntity.ok().build();
- }
-
-
-}
diff --git a/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories b/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 7fac8de6..00000000
--- a/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,2 +0,0 @@
-# org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.zalando.nakadiproducer.NakadiProducerAutoConfiguration
-org.springframework.boot.actuate.autoconfigure.ManagementContextConfiguration=org.zalando.nakadiproducer.NakadiProducerAutoConfiguration.ManagementEndpointConfiguration
\ No newline at end of file
diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java
index 638a0c82..5d2e6275 100644
--- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java
+++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java
@@ -1,23 +1,42 @@
package org.zalando.nakadiproducer;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.sql.Connection;
-import org.flywaydb.core.api.callback.FlywayCallback;
+import org.flywaydb.core.api.configuration.ConfigurationAware;
+import org.flywaydb.core.api.configuration.FlywayConfiguration;
import org.junit.Test;
+import org.mockito.InOrder;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.annotation.DirtiesContext;
public class NakadiProducerFlywayCallbackIT extends BaseMockedExternalCommunicationIT {
@MockBean
- @NakadiProducerFlywayCallback
- private FlywayCallback flywayCallback;
+ private NakadiProducerFlywayCallback flywayCallback;
+
+ @MockBean
+ private ConfigurationAwareNakadiProducerFlywayCallback configurationAwareNakadiProducerFlywayCallback;
@Test
+ @DirtiesContext // Needed to make sure that flyway gets executed for each of the tests and Callbacks are called again
public void flywayCallbackIsCalledIfAnnotatedWithQualifierAnnotation() {
- verify(flywayCallback, times(1)).beforeValidate(any(Connection.class));
+ verify(flywayCallback, times(1)).beforeMigrate(any(Connection.class));
+ }
+
+ @Test
+ @DirtiesContext // Needed to make sure that flyway gets executed for each of the tests and Callbacks are called again
+ public void flywayConfigurationIsSetIfCallbackIsConfigurationAware() {
+ InOrder inOrder = inOrder(configurationAwareNakadiProducerFlywayCallback);
+ inOrder.verify(configurationAwareNakadiProducerFlywayCallback).setFlywayConfiguration(any(FlywayConfiguration.class));
+ inOrder.verify(configurationAwareNakadiProducerFlywayCallback, times(1)).beforeMigrate(any(Connection.class));
+
+ }
+
+ public interface ConfigurationAwareNakadiProducerFlywayCallback extends NakadiProducerFlywayCallback, ConfigurationAware {
}
}
diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java
index 59330475..e6f19af7 100644
--- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java
+++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java
@@ -8,15 +8,34 @@
import org.flywaydb.core.api.callback.FlywayCallback;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.zalando.nakadiproducer.config.EmbeddedDataSourceConfig;
-public class NonNakadiProducerFlywayCallbackIT extends BaseMockedExternalCommunicationIT {
+@ActiveProfiles("test")
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.MOCK,
+ properties = { "zalando.team.id:alpha-local-testing", "nakadi-producer.scheduled-transmission-enabled:false", "spring.flyway.enabled:false"},
+ classes = { TestApplication.class, EmbeddedDataSourceConfig.class }
+)
+public class NonNakadiProducerFlywayCallbackIT {
@MockBean
private FlywayCallback flywayCallback;
@Test
- public void flywayCallbackIsNotUsedIfNotAnnotatedWithQualifierAnnotation() {
+ public void flywayCallbacksFromOurHostApplicationAreNotUsedByUs() {
verify(flywayCallback, never()).beforeValidate(any(Connection.class));
}
+
+ @Test
+ public void ourOwnFlywayConfigurationStillWorksFineWhenSpringsFlywayAutoconfigIsDisabled() {
+ // Yes, this is redundant to the other test in here.
+ // We consider it important to document the requirement, so it is here nonetheless.
+ // The test setup done by the class annotations does just enough to test it
+ }
}
diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java
index 5569eb21..c61f4c80 100644
--- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java
+++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java
@@ -2,13 +2,15 @@
import static com.jayway.restassured.RestAssured.given;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.actuate.autoconfigure.LocalManagementPort;
+import org.springframework.boot.actuate.autoconfigure.web.server.LocalManagementPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -21,13 +23,18 @@
@RunWith(SpringRunner.class)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = { "management.security.enabled=false", "zalando.team.id:alpha-local-testing", "nakadi-producer.scheduled-transmission-enabled:false" },
+ properties = {
+ "management.security.enabled=false",
+ "zalando.team.id:alpha-local-testing",
+ "nakadi-producer.scheduled-transmission-enabled:false",
+ "management.endpoints.web.exposure.include:snapshot-event-creation"
+ },
classes = { TestApplication.class, EmbeddedDataSourceConfig.class, SnapshotEventGenerationWebEndpointIT.Config.class }
)
public class SnapshotEventGenerationWebEndpointIT {
private static final String MY_EVENT_TYPE = "my.event-type";
- private static final String MY_REQUEST_BODY = "my request body";
+ private static final String FILTER = "myRequestBody";
@LocalManagementPort
private int managementPort;
@@ -35,21 +42,38 @@ public class SnapshotEventGenerationWebEndpointIT {
@Autowired
private SnapshotEventGenerator snapshotEventGenerator;
+ @Before
+ public void resetMocks() {
+ reset(snapshotEventGenerator);
+ }
+
+ @Test
+ public void passesFilterIfPresentInUrl() {
+ given().baseUri("http://localhost:" + managementPort)
+ .contentType("application/json")
+ .when().post("/actuator/snapshot-event-creation/" + MY_EVENT_TYPE + "?filter=" + FILTER)
+ .then().statusCode(204);
+
+ verify(snapshotEventGenerator).generateSnapshots(null, FILTER);
+ }
+
@Test
- public void passesRequestBodyIfPresent() {
+ public void passesFilterIfPresentInBody() {
given().baseUri("http://localhost:" + managementPort)
- .body(MY_REQUEST_BODY)
- .when().post("/snapshot_event_creation/" + MY_EVENT_TYPE)
- .then().statusCode(200);
+ .contentType("application/json")
+ .body("{\"filter\":\"" + FILTER + "\"}")
+ .when().post("/actuator/snapshot-event-creation/" + MY_EVENT_TYPE)
+ .then().statusCode(204);
- verify(snapshotEventGenerator).generateSnapshots(null, MY_REQUEST_BODY);
+ verify(snapshotEventGenerator).generateSnapshots(null, FILTER);
}
@Test
- public void passesNullIfNoRequestBodyPresent() {
+ public void passesNullIfNoFilterIsPresent() {
given().baseUri("http://localhost:" + managementPort)
- .when().post("/snapshot_event_creation/" + MY_EVENT_TYPE)
- .then().statusCode(200);
+ .contentType("application/json")
+ .when().post("/actuator/snapshot-event-creation/" + MY_EVENT_TYPE)
+ .then().statusCode(204);
verify(snapshotEventGenerator).generateSnapshots(null, null);
}
diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java
index 9b52eb24..37c00718 100644
--- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java
+++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java
@@ -3,9 +3,6 @@
import static org.junit.Assert.fail;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,12 +26,6 @@ public void picksUpDefinedSnapshotEventProviders() {
// expect no exceptions
snapshotCreationService.createSnapshotEvents("B", "");
- // expect no exceptions
- snapshotCreationService.createSnapshotEvents("C", "");
-
- // expect no exceptions
- snapshotCreationService.createSnapshotEvents("D", "");
-
try {
snapshotCreationService.createSnapshotEvents("not defined", "");
} catch (final UnknownEventTypeException e) {
@@ -56,26 +47,5 @@ public SnapshotEventGenerator snapshotEventProviderA() {
public SnapshotEventGenerator snapshotEventProviderB() {
return new SimpleSnapshotEventGenerator("B", (x) -> Collections.emptyList());
}
-
- @Bean
- public SnapshotEventProvider snapshotEventProviderCandD() {
- return new SnapshotEventProvider() {
- @Override
- public List getSnapshot(String eventType, Object withIdGreaterThan) {
- if (!getSupportedEventTypes().contains(eventType)) {
- throw new UnknownEventTypeException(eventType);
- }
- return Collections.emptyList();
- }
-
- @Override
- public Set getSupportedEventTypes() {
- final HashSet types = new HashSet<>();
- types.add("C");
- types.add("D");
- return types;
- }
- };
- }
}
}
diff --git a/nakadi-producer-starter-spring-boot-2-test/pom.xml b/nakadi-producer-starter-spring-boot-2-test/pom.xml
new file mode 100644
index 00000000..d2f6c912
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/pom.xml
@@ -0,0 +1,95 @@
+
+
+ 4.0.0
+
+ nakadi-producer-starter-spring-boot-2-test
+ org.zalando
+
+
+
+ org.zalando
+ nakadi-producer-reactor
+ 20.0.0-SNAPSHOT
+
+
+
+
+ org.zalando
+ nakadi-producer-spring-boot-starter
+ ${project.version}
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.zalando.stups
+ tokens
+ 0.12.0-beta-2
+
+
+ org.springframework.boot
+ spring-boot-starter-jdbc
+ 2.0.3.RELEASE
+
+
+ com.opentable.components
+ otj-pg-embedded
+ 0.12.0
+
+
+ postgresql
+ postgresql
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ com.github.stefanbirkner
+ system-rules
+ 1.18.0
+ test
+
+
+ io.rest-assured
+ rest-assured
+ 3.1.0
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 2.22.0
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java b/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java
new file mode 100644
index 00000000..30929521
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java
@@ -0,0 +1,52 @@
+package org.zalando.nakadiproducer.tests;
+
+import com.opentable.db.postgres.embedded.EmbeddedPostgres;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.zalando.nakadiproducer.EnableNakadiProducer;
+import org.zalando.nakadiproducer.snapshots.SimpleSnapshotEventGenerator;
+import org.zalando.nakadiproducer.snapshots.Snapshot;
+import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator;
+
+import javax.sql.DataSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+@EnableAutoConfiguration
+@EnableNakadiProducer
+public class Application {
+
+ public static void main(String[] args) throws Exception {
+ SpringApplication.run(Application.class, args);
+ }
+
+ @Bean
+ @Primary
+ public DataSource dataSource() throws IOException {
+ return embeddedPostgres().getPostgresDatabase();
+ }
+
+ @Bean
+ public EmbeddedPostgres embeddedPostgres() throws IOException {
+ return EmbeddedPostgres.start();
+ }
+
+ @Bean
+ public SnapshotEventGenerator snapshotEventGenerator() {
+ return new SimpleSnapshotEventGenerator("eventtype", (withIdGreaterThan, filter) -> {
+ if (withIdGreaterThan == null) {
+ return Collections.singletonList(new Snapshot("1", "foo", (Object) filter));
+ } else if (withIdGreaterThan.equals("1")) {
+ return Collections.singletonList(new Snapshot("2", "foo", (Object) filter));
+ } else {
+ return new ArrayList<>();
+ }
+ });
+
+ // Todo: Test that some events arrive at a local nakadi mock
+ }
+}
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.yml b/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.yml
new file mode 100644
index 00000000..47ed5c12
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.yml
@@ -0,0 +1,4 @@
+nakadi-producer:
+ access-token-uri: http://localhost:1234
+ nakadi-base-uri: https://nakadi.example.org:5432
+management.endpoints.web.exposure.include: snapshot-event-creation
\ No newline at end of file
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/resources/db/migration/V1.1__noop.sql b/nakadi-producer-starter-spring-boot-2-test/src/main/resources/db/migration/V1.1__noop.sql
new file mode 100644
index 00000000..e69de29b
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java
new file mode 100644
index 00000000..cce10fd9
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java
@@ -0,0 +1,42 @@
+package org.zalando.nakadiproducer.tests;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import org.junit.runner.RunWith;
+import org.springframework.boot.actuate.autoconfigure.web.server.LocalManagementPort;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.File;
+
+import static io.restassured.RestAssured.given;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+ classes = Application.class,
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
+)
+public class ApplicationIT {
+ @LocalManagementPort
+ private int localManagementPort;
+
+ @ClassRule
+ public static final EnvironmentVariables environmentVariables
+ = new EnvironmentVariables();
+
+ @BeforeClass
+ public static void fakeCredentialsDir() {
+ environmentVariables.set("CREDENTIALS_DIR", new File("src/main/test/tokens").getAbsolutePath());
+ }
+
+ @Test
+ public void shouldSuccessfullyStartAndSnapshotCanBeTriggered() {
+ given().baseUri("http://localhost:" + localManagementPort).contentType("application/json")
+ .when().post("/actuator/snapshot-event-creation/eventtype")
+ .then().statusCode(204);
+ }
+
+
+}
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/resources/tokens/nakadi-token-secret b/nakadi-producer-starter-spring-boot-2-test/src/test/resources/tokens/nakadi-token-secret
new file mode 100644
index 00000000..98592b28
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/resources/tokens/nakadi-token-secret
@@ -0,0 +1 @@
+my fake token
\ No newline at end of file
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/resources/tokens/nakadi-token-type b/nakadi-producer-starter-spring-boot-2-test/src/test/resources/tokens/nakadi-token-type
new file mode 100644
index 00000000..83f31cba
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/resources/tokens/nakadi-token-type
@@ -0,0 +1 @@
+Bearer
\ No newline at end of file
diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml
index 9b53550c..cd1ab088 100644
--- a/nakadi-producer/pom.xml
+++ b/nakadi-producer/pom.xml
@@ -10,7 +10,7 @@
org.zalando
nakadi-producer-reactor
- 4.2.0
+ 20.0.0-SNAPSHOT
nakadi-producer
diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java
deleted file mode 100644
index 47ed4dd2..00000000
--- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.zalando.nakadiproducer.snapshots;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * The {@code SnapshotEventProvider} interface should be implemented by any
- * Event Producer that wants to support snapshot events feature. The
- * class must define a method {@code getSnapshot}.
- *
- * @deprecated Please use one or more {@link SnapshotEventGenerator} instances instead
- */
-@Deprecated
-public interface SnapshotEventProvider {
-
- /**
- * Returns a batch of snapshots of given type (event type is an event channel topic name). The implementation may
- * return an arbitrary amount of results, but it must return at least one element if there are entities
- * matching the parameters.
- *
- * The library will call your implementation like this:
- * Request: getSnapshot(eventType, null) Response: 1,2,3
- * Request: getSnapshot(eventType, 3) Response: 4,5
- * Request: getSnapshot(eventType, 5) Response: emptyList
- * It is your responsibility to make sure that the returned events are ordered by their id asc and that, given you
- * return a list of events for entities with ids {id1, ..., idN}, there exists no entity with an id between id1 and idN, that
- * is not part of the result.
- *
- *
- * @param eventType event type to make a snapshot of
- * @param withIdGreaterThan if not null, only events for entities with an id greater than the given one must be returned
- * @return list of elements ordered by their id
- * @throws UnknownEventTypeException if {@code eventType} is unknown
- */
- @Deprecated
- List getSnapshot(String eventType, Object withIdGreaterThan);
-
- @Deprecated
- Set getSupportedEventTypes();
-
- @AllArgsConstructor
- @Getter
- @Deprecated
- class Snapshot {
- private Object id;
- private String eventType;
- private String dataType;
- private Object data;
- }
-
-}
diff --git a/pom.xml b/pom.xml
index e70d99e9..83d16fc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,18 +10,19 @@
org.springframework.boot
spring-boot-starter-parent
- 1.5.3.RELEASE
+ 2.0.3.RELEASE
nakadi-producer-reactor
org.zalando
- 4.2.0
+ 20.0.0-SNAPSHOT
pom
Nakadi Event Producer Reactor
nakadi-producer
nakadi-producer-spring-boot-starter
+ nakadi-producer-starter-spring-boot-2-test