diff --git a/nakadi-producer-starter-spring-boot-2-test/pom.xml b/nakadi-producer-starter-spring-boot-2-test/pom.xml
index 6f3887b9..e1ae77a9 100644
--- a/nakadi-producer-starter-spring-boot-2-test/pom.xml
+++ b/nakadi-producer-starter-spring-boot-2-test/pom.xml
@@ -19,6 +19,12 @@
nakadi-producer-spring-boot-starter
${project.version}
+
+ org.zalando
+ nakadi-mock
+ 0.0.1-SNAPSHOT
+ test
+
org.springframework.boot
spring-boot-starter-web
@@ -63,6 +69,7 @@
io.rest-assured
rest-assured
3.1.0
+ test
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java b/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java
index 10db90a9..edb78e48 100644
--- a/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java
+++ b/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java
@@ -5,7 +5,6 @@
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
-import org.zalando.nakadiproducer.EnableNakadiProducer;
import org.zalando.nakadiproducer.snapshots.SimpleSnapshotEventGenerator;
import org.zalando.nakadiproducer.snapshots.Snapshot;
import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator;
@@ -13,11 +12,9 @@
import javax.sql.DataSource;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
@EnableAutoConfiguration
-@EnableNakadiProducer
public class Application {
public static void main(String[] args) throws Exception {
@@ -39,14 +36,24 @@ public EmbeddedPostgres embeddedPostgres() throws IOException {
public SnapshotEventGenerator snapshotEventGenerator() {
return new SimpleSnapshotEventGenerator("eventtype", (withIdGreaterThan, filter) -> {
if (withIdGreaterThan == null) {
- return Collections.singletonList(new Snapshot("1", "foo", filter));
+ return Collections.singletonList(new Snapshot("1", "foo", new Data("1", filter)));
} else if (withIdGreaterThan.equals("1")) {
- return Collections.singletonList(new Snapshot("2", "foo", filter));
+ return Collections.singletonList(new Snapshot("2", "foo", new Data("2", filter)));
} else {
- return new ArrayList<>();
+ return Collections.emptyList();
}
});
// Todo: Test that some events arrive at a local nakadi mock
}
+
+ public static class Data {
+ public String id;
+ public String filter;
+ public Data(String id, String filter) {
+ super();
+ this.id = id;
+ this.filter = filter;
+ }
+ }
}
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationWithMockClientIT.java
similarity index 90%
rename from nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java
rename to nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationWithMockClientIT.java
index 2b29af03..4512ff0f 100644
--- a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationWithMockClientIT.java
@@ -22,11 +22,11 @@
// This line looks like that by intention: We want to test that the MockNakadiPublishingClient will be picked up
// by our starter *even if* it has been defined *after* the application itself. This has been a problem until
// this commit.
- classes = { Application.class, MockNakadiConfig.class },
- properties = { "nakadi-producer.transmission-polling-delay=30"},
+ classes = { Application.class, MockNakadiClientConfig.class },
+ properties = { "nakadi-producer.transmission-polling-delay=30" },
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
-public class ApplicationIT {
+public class ApplicationWithMockClientIT {
@LocalManagementPort
private int localManagementPort;
@@ -54,7 +54,9 @@ public void cleanUpMock() {
@Test
public void shouldSuccessfullyStartAndSnapshotCanBeTriggered() throws InterruptedException {
- given().baseUri("http://localhost:" + localManagementPort).contentType("application/json")
+ given().baseUri("http://localhost:" + localManagementPort)
+ .contentType("application/json")
+ .body("{'filter':'Example filter'}".replace('\'', '"'))
.when().post("/actuator/snapshot-event-creation/eventtype")
.then().statusCode(204);
@@ -64,6 +66,4 @@ public void shouldSuccessfullyStartAndSnapshotCanBeTriggered() throws Interrupte
List events = mockClient.getSentEvents("eventtype");
assertThat(events, hasSize(2));
}
-
-
}
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationWithMockServerIT.java b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationWithMockServerIT.java
new file mode 100644
index 00000000..53ed3f80
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationWithMockServerIT.java
@@ -0,0 +1,70 @@
+package org.zalando.nakadiproducer.tests;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.actuate.autoconfigure.web.server.LocalManagementPort;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.zalando.nakadi_mock.EventSubmissionCallback.CollectingCallback;
+import org.zalando.nakadi_mock.EventSubmissionCallback.DataChangeEvent;
+import org.zalando.nakadi_mock.NakadiMock;
+import org.zalando.nakadiproducer.tests.Application.Data;
+import java.io.File;
+import java.util.List;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+ classes = { Application.class },
+ properties = { "nakadi-producer.transmission-polling-delay=30"},
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
+)
+@ContextConfiguration(initializers=NakadiServerMockInitializer.class)
+public class ApplicationWithMockServerIT {
+
+ @LocalManagementPort
+ private int localManagementPort;
+
+ @ClassRule
+ public static final EnvironmentVariables environmentVariables
+ = new EnvironmentVariables();
+
+ @BeforeClass
+ public static void fakeCredentialsDir() {
+ environmentVariables.set("CREDENTIALS_DIR", new File("src/test/resources/tokens").getAbsolutePath());
+ }
+
+ @Autowired
+ NakadiMock nakadiMock;
+
+ @Test
+ public void shouldSuccessfullyStartAndSnapshotCanBeTriggered() throws InterruptedException {
+ CollectingCallback> collector = new CollectingCallback>() {};
+ nakadiMock.eventType("eventtype").setSubmissionCallback(collector);
+
+ given().baseUri("http://localhost:" + localManagementPort)
+ .contentType("application/json")
+ .body("{'filter':'Example filter'}".replace('\'', '"'))
+ .when().post("/actuator/snapshot-event-creation/eventtype")
+ .then().statusCode(204);
+
+ Thread.sleep(1200);
+
+ List> events = collector.getSubmittedEvents();
+ assertThat(events, hasSize(2));
+ assertThat(events.get(0).getDataOp(), is("S"));
+ assertThat(events.get(0).getData().id, is("1"));
+
+ assertThat(events.get(1).getDataOp(), is("S"));
+ assertThat(events.get(1).getData().id, is("2"));
+ }
+}
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/MockNakadiConfig.java b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/MockNakadiClientConfig.java
similarity index 92%
rename from nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/MockNakadiConfig.java
rename to nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/MockNakadiClientConfig.java
index 95a52610..6c466f47 100644
--- a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/MockNakadiConfig.java
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/MockNakadiClientConfig.java
@@ -6,7 +6,7 @@
import org.zalando.nakadiproducer.transmission.NakadiPublishingClient;
@Configuration
-public class MockNakadiConfig {
+public class MockNakadiClientConfig {
@Bean
public NakadiPublishingClient mockNakadiPublishingClient() {
return new MockNakadiPublishingClient();
diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/NakadiServerMockInitializer.java b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/NakadiServerMockInitializer.java
new file mode 100644
index 00000000..9929d3c1
--- /dev/null
+++ b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/NakadiServerMockInitializer.java
@@ -0,0 +1,33 @@
+package org.zalando.nakadiproducer.tests;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.zalando.nakadi_mock.NakadiMock;
+
+import java.net.URL;
+
+
+/**
+ * An application context initializer which sets up a NakadiMock bean and registers the server URL as a property.
+ */
+class NakadiServerMockInitializer implements ApplicationContextInitializer{
+
+ private static final Logger LOG = LoggerFactory.getLogger(NakadiServerMockInitializer.class);
+
+ @Override
+ public void initialize(ConfigurableApplicationContext context) {
+ // setup NakadiMock, inject URL into nakadi-producer
+
+ NakadiMock mock = NakadiMock.make();
+ context.getBeanFactory().registerSingleton("nakadiMock", mock);
+ mock.start();
+ URL url = mock.getRootUrl();
+
+ LOG.info("started mock nakadi on {}", url);
+
+ TestPropertyValues.of("nakadi-producer.nakadi-base-uri="+url).applyTo(context);
+ }
+}
\ No newline at end of file
diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java
index 81cfc224..6ff628c1 100644
--- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java
+++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java
@@ -55,7 +55,7 @@ public void sendEvent(EventLog eventLog) {
log.info("Event {} locked by {} was successfully transmitted to nakadi", eventLog.getId(), eventLog.getLockedBy());
eventLogRepository.delete(eventLog);
} catch (Exception e) {
- log.error("Event {} locked by {} could not be transmitted to nakadi: {}", eventLog.getId(), eventLog.getLockedBy(), e.getMessage());
+ log.error("Event {} locked by {} could not be transmitted to nakadi: {}", eventLog.getId(), eventLog.getLockedBy(), e.toString());
}
}