diff --git a/README.md b/README.md index 879839c2..914a9cc1 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ The goal of this Spring Boot starter is to simplify the reliable integration bet There are already [multiple clients for the Nakadi REST API](https://zalando.github.io/nakadi/manual.html#using_clients), but none of them solves the mentioned issues. -We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchonously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded. +We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchronously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded. The Transmitter generates a strictly monotonically increasing event id that can be used for ordering the events during retrieval. It is not guaranteed, that events will be sent to Nakadi in the order they have been produced. If an event could not be sent to Nakadi, the library will periodically retry the transmission. @@ -40,7 +40,8 @@ You may of course always setup a fresh system with the newest version. ## Prerequisites -This library was tested with Spring Boot 1.5.3.RELEASE and relies on an existing configured PostgreSQL DataSource. +This library was tested with Spring Boot 2.0.3.RELEASE and relies on an existing configured PostgreSQL DataSource. If +you are still using Spring Boot 1.x, please use version 4.2.0 ([Release Notes](https://github.com/zalando-nakadi/nakadi-producer-spring-boot-starter/releases/tag/4.2.0), [Documentation](https://github.com/zalando-nakadi/nakadi-producer-spring-boot-starter/blob/4.2.0/README.md)) versions of this library. This library also uses: @@ -50,14 +51,14 @@ This library also uses: * jackson >= 2.7.0 * (optional) Zalando's [tracer-spring-boot-starter](https://github.com/zalando/tracer) * (optional) Zalando's [tokens library](https://github.com/zalando/tokens) >= 0.10.0 - * Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though. + * Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though. To be used in zalando's k8s environment, you must at least use 0.11.0. ## Usage ### Setup -If you are using maven, include the library in your `pom.xml`: +If you are using Maven, include the library in your `pom.xml`: ```xml @@ -81,7 +82,7 @@ public class Application { } ``` -The library uses flyway migrations to set up its own database schema `nakadi_events`. +The library uses Flyway migrations to set up its own database schema `nakadi_events`. ### Nakadi communication configuration @@ -211,30 +212,43 @@ process step the event is reporting. ### Event snapshots (optional) A Snapshot event is a special type of data change event (data operation) defined by Nakadi. -It does not represent a change of the state of a resource, but a current snapshot of the state of the resource. +It does not represent a change of the state of a resource, but a current snapshot of its state. It can be useful to +bootstrap a new consumer or to recover from inconsistencies between sender and consumer after an incident. You can create snapshot events programmatically (using EventLogWriter.fireSnapshotEvent), but usually snapshot event creation is a irregular, manually triggered maintenance task. This library provides a Spring Boot Actuator endpoint named `snapshot_event_creation` that can be used to trigger a Snapshot for a given event type. Assuming your management port is set to `7979`, - GET localhost:7979/snapshot_event_creation + GET localhost:7979/actuator/snapshot-event-creation will return a list of all event types available for snapshot creation and - POST localhost:7979/snapshot_event_creation/my.event-type + POST localhost:7979/actuator/snapshot-event-creation/my.event-type -will trigger a snapshot for the event type `my.event-type`. The (optional) request body is a "filter specifier". +will trigger a snapshot for the event type `my.event-type`. You can change the port, the authentication scheme and the +path prefix as part of your Spring Boot Actuator configuration. -This will only work if your application has configured spring-boot-actuator +You can provide an optional filter specifier that will be passed to your application to implement any application +specific event/entity filtering logic. It can be provided either as a query parameter called `filter`, or as a +request body + + {"filter":"myFilter"} + +This endpoint will only work if your application includes spring-boot-actuator, ```xml org.springframework.boot spring-boot-starter-actuator ``` -and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface. Otherwise (or if the generator is not for the event type you requested), the library will respond with an error message when you request a snapshot creation. -The request body (the "filter specifier") of the trigger request will be passed as a string parameter to the SnapshotEventGenerator's `generateSnapshots` method. +your `application.properties` includes +``` +management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...` +``` +and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface. +The optional filter specifier of the trigger request will be passed as a string parameter to the +SnapshotEventGenerator's `generateSnapshots` method and may be null, if none is given. We provide a `SimpleSnapshotEventGenerator` to ease bean creation using a more functional style: ```java @@ -269,13 +283,13 @@ tracer: By default, the library will pick up your flyway data source (or the primary data source if no flyway data source is configured), create its own schema and start setting up its tables in there. You can customize this process in two ways: -If you want to use a different data source for schema maintainence (for example to use a different username) and -configuring the Spring flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate +If you want to use a different data source for schema maintenance (for example to use a different username) and +configuring the Spring Flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate it with `@NakadiProducerDataSource`. -You may also define a spring bean of type `FlywayCallback` and annotate it with `@NakadiProducerFlywayCallback`. The -interface provide several hook into the schema management lifecycle that may, for example, be used to - `SET ROLE migrator` before and `RESET ROLE` after each migration. +You may also define a spring bean of type `NakadiProducerFlywayCallback`. The interface provides several hooks into the +schema management lifecycle that may, for example, be used to `SET ROLE migrator` before and `RESET ROLE` after each +migration. ### Test support diff --git a/nakadi-producer-spring-boot-starter/pom.xml b/nakadi-producer-spring-boot-starter/pom.xml index 7dfb86f5..57679bc2 100644 --- a/nakadi-producer-spring-boot-starter/pom.xml +++ b/nakadi-producer-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 4.2.0 + 5.0.0-SNAPSHOT nakadi-producer-spring-boot-starter @@ -102,6 +102,14 @@ spring-boot-starter-web test + + org.springframework.boot + spring-boot-autoconfigure + + + org.springframework.boot + spring-boot-actuator-autoconfigure + diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/FlywayMigrator.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/FlywayMigrator.java index fa09a32d..bf3570f4 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/FlywayMigrator.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/FlywayMigrator.java @@ -4,11 +4,18 @@ import javax.sql.DataSource; import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.MigrationInfo; +import org.flywaydb.core.api.callback.BaseFlywayCallback; import org.flywaydb.core.api.callback.FlywayCallback; +import org.flywaydb.core.api.configuration.ConfigurationAware; +import org.flywaydb.core.api.configuration.FlywayConfiguration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.flyway.FlywayDataSource; import org.springframework.boot.autoconfigure.flyway.FlywayProperties; +import java.sql.Connection; +import java.util.List; + public class FlywayMigrator { @Autowired(required = false) @NakadiProducerFlywayDataSource @@ -22,10 +29,9 @@ public class FlywayMigrator { private DataSource dataSource; @Autowired(required = false) - @NakadiProducerFlywayCallback - private FlywayCallback callback; + private List callbacks; - @Autowired + @Autowired(required = false) private FlywayProperties flywayProperties; @PostConstruct @@ -34,7 +40,7 @@ public void migrateFlyway() { if (this.nakadiProducerFlywayDataSource != null) { flyway.setDataSource(nakadiProducerFlywayDataSource); - } else if (this.flywayProperties.isCreateDataSource()) { + } else if (this.flywayProperties != null && this.flywayProperties.isCreateDataSource()) { flyway.setDataSource(this.flywayProperties.getUrl(), this.flywayProperties.getUser(), this.flywayProperties.getPassword(), this.flywayProperties.getInitSqls().toArray(new String[0])); @@ -46,11 +52,118 @@ public void migrateFlyway() { flyway.setLocations("classpath:db_nakadiproducer/migrations"); flyway.setSchemas("nakadi_events"); - if (callback != null) { - flyway.setCallbacks(callback); + if (callbacks != null) { + flyway.setCallbacks(callbacks.stream().map(FlywayCallbackAdapter::new).toArray(FlywayCallback[]::new)); } + flyway.setBaselineOnMigrate(true); flyway.setBaselineVersionAsString("2133546886.1.0"); flyway.migrate(); } + + private static class FlywayCallbackAdapter extends BaseFlywayCallback { + + private NakadiProducerFlywayCallback callback; + + private FlywayCallbackAdapter(NakadiProducerFlywayCallback callback) { + this.callback = callback; + } + + @Override + public void setFlywayConfiguration(FlywayConfiguration flywayConfiguration) { + if (callback instanceof ConfigurationAware) { + ((ConfigurationAware) callback).setFlywayConfiguration(flywayConfiguration); + } + } + + @Override + public void beforeClean(Connection connection) { + callback.beforeClean(connection); + } + + @Override + public void afterClean(Connection connection) { + callback.afterClean(connection); + } + + @Override + public void beforeMigrate(Connection connection) { + callback.beforeMigrate(connection); + } + + @Override + public void afterMigrate(Connection connection) { + callback.afterMigrate(connection); + } + + @Override + public void beforeEachMigrate(Connection connection, MigrationInfo info) { + callback.beforeEachMigrate(connection, info); + } + + @Override + public void afterEachMigrate(Connection connection, MigrationInfo info) { + callback.afterEachMigrate(connection, info); + } + + @Override + public void beforeUndo(Connection connection) { + callback.beforeUndo(connection); + } + + @Override + public void beforeEachUndo(Connection connection, MigrationInfo info) { + callback.beforeEachUndo(connection, info); + } + + @Override + public void afterEachUndo(Connection connection, MigrationInfo info) { + callback.afterEachUndo(connection, info); + } + + @Override + public void afterUndo(Connection connection) { + callback.afterUndo(connection); + } + + @Override + public void beforeValidate(Connection connection) { + callback.beforeValidate(connection); + } + + @Override + public void afterValidate(Connection connection) { + callback.afterValidate(connection); + } + + @Override + public void beforeBaseline(Connection connection) { + callback.beforeBaseline(connection); + } + + @Override + public void afterBaseline(Connection connection) { + callback.afterBaseline(connection); + } + + @Override + public void beforeRepair(Connection connection) { + callback.beforeRepair(connection); + } + + @Override + public void afterRepair(Connection connection) { + callback.afterRepair(connection); + } + + @Override + public void beforeInfo(Connection connection) { + callback.beforeInfo(connection); + } + + @Override + public void afterInfo(Connection connection) { + callback.afterInfo(connection); + } + } } diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java index 29816e52..0ad70345 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java @@ -1,17 +1,13 @@ package org.zalando.nakadiproducer; -import static java.util.stream.Collectors.toList; - import java.net.URI; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.stream.Stream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.actuate.autoconfigure.ManagementContextConfiguration; -import org.springframework.boot.actuate.condition.ConditionalOnEnabledEndpoint; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -30,13 +26,9 @@ import org.zalando.nakadiproducer.flowid.FlowIdComponent; import org.zalando.nakadiproducer.flowid.NoopFlowIdComponent; import org.zalando.nakadiproducer.flowid.TracerFlowIdComponent; -import org.zalando.nakadiproducer.snapshots.SimpleSnapshotEventGenerator; -import org.zalando.nakadiproducer.snapshots.Snapshot; import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator; -import org.zalando.nakadiproducer.snapshots.SnapshotEventProvider; import org.zalando.nakadiproducer.snapshots.impl.SnapshotCreationService; import org.zalando.nakadiproducer.snapshots.impl.SnapshotEventCreationEndpoint; -import org.zalando.nakadiproducer.snapshots.impl.SnapshotEventCreationMvcEndpoint; import org.zalando.nakadiproducer.transmission.NakadiPublishingClient; import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService; import org.zalando.nakadiproducer.transmission.impl.EventTransmitter; @@ -111,70 +103,23 @@ public FlowIdComponent flowIdComponent() { } } - @ManagementContextConfiguration - static class ManagementEndpointConfiguration { - @Bean - @ConditionalOnMissingBean - public SnapshotEventCreationEndpoint snapshotEventCreationEndpoint( - SnapshotCreationService snapshotCreationService) { - return new SnapshotEventCreationEndpoint(snapshotCreationService); - } - - @Bean - @ConditionalOnBean(SnapshotEventCreationEndpoint.class) - @ConditionalOnEnabledEndpoint("snapshot_event_creation") - public SnapshotEventCreationMvcEndpoint snapshotEventCreationMvcEndpoint( - SnapshotEventCreationEndpoint snapshotEventCreationEndpoint) { - return new SnapshotEventCreationMvcEndpoint(snapshotEventCreationEndpoint); - } + @Bean + @ConditionalOnMissingBean + public SnapshotEventCreationEndpoint snapshotEventCreationEndpoint( + SnapshotCreationService snapshotCreationService) { + return new SnapshotEventCreationEndpoint(snapshotCreationService); } @Bean public SnapshotCreationService snapshotCreationService( Optional> snapshotEventGenerators, - Optional snapshotEventProvider, EventLogWriter eventLogWriter) { - final Stream legacyGenerators = - snapshotEventProvider.map(this::wrapInSnapshotEventGenerators) - .orElseGet(Stream::empty); - final Stream nonLegacyGenerators = - snapshotEventGenerators.map(List::stream) - .orElseGet(Stream::empty); - final List allGenerators = - Stream.concat(legacyGenerators, nonLegacyGenerators) - .collect(toList()); - return new SnapshotCreationService(allGenerators, eventLogWriter); - } - - /** - * This method (and the following three) support the legacy {@link SnapshotEventProvider} interface, - * mapping it to the new logic (several {@link SnapshotEventGenerator}s). - * - * It will be removed when we don't support that interface anymore. - */ - private Stream wrapInSnapshotEventGenerators(SnapshotEventProvider p) { - return p.getSupportedEventTypes() - .stream() - .map(t -> wrapInSnapshotEventGenerator(p, t)); - } - - private SnapshotEventGenerator wrapInSnapshotEventGenerator(SnapshotEventProvider provider, String eventType) { - return new SimpleSnapshotEventGenerator( - eventType, - (cursor) -> createNonLegacySnapshots(provider, eventType, cursor) + EventLogWriter eventLogWriter) { + return new SnapshotCreationService( + snapshotEventGenerators.orElse(Collections.emptyList()), + eventLogWriter ); } - private List createNonLegacySnapshots(SnapshotEventProvider provider, String eventType, Object cursor) { - return provider.getSnapshot(eventType, cursor) - .stream() - .map(this::mapLegacyToNewSnapshot) - .collect(toList()); - } - - private Snapshot mapLegacyToNewSnapshot(SnapshotEventProvider.Snapshot snapshot) { - return new Snapshot(snapshot.getId(), snapshot.getDataType(), snapshot.getData()); - } - @Bean public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper, FlowIdComponent flowIdComponent) { diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java index e5552e12..bf7e4ff9 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallback.java @@ -1,18 +1,148 @@ package org.zalando.nakadiproducer; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import org.flywaydb.core.api.MigrationInfo; +import org.flywaydb.core.api.configuration.ConfigurationAware; -import org.springframework.beans.factory.annotation.Qualifier; +import java.sql.Connection; /** - * Qualifier annotation for a FlywayCallback to be injected in to nakadi-producer's Flyway instance. + * This is the main callback interface that should be implemented to get access to flyway lifecycle notifications. + * Simply add code to the callback method you are interested in having. + * + *

If a callback also implements the {@link ConfigurationAware} interface, + * a {@link org.flywaydb.core.api.configuration.FlywayConfiguration} object will automatically be injected before + * calling any methods, giving the callback access to the core flyway configuration.

+ * + *

Each callback method will run within its own transaction.

*/ -@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, - ElementType.ANNOTATION_TYPE }) -@Retention(RetentionPolicy.RUNTIME) -@Qualifier -public @interface NakadiProducerFlywayCallback { +public interface NakadiProducerFlywayCallback { + /** + * Runs before the clean task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeClean(Connection connection) {} + + /** + * Runs after the clean task executes. + * + * @param connection A valid connection to the database. + */ + default void afterClean(Connection connection) {}; + + /** + * Runs before the migrate task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeMigrate(Connection connection) {}; + + /** + * Runs after the migrate task executes. + * + * @param connection A valid connection to the database. + */ + default void afterMigrate(Connection connection) {} + + /** + * Runs before the undo task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeUndo(Connection connection) {} + + /** + * Runs before each migration script is undone. + * + * @param connection A valid connection to the database. + * @param info The current MigrationInfo for the migration to be undone. + */ + default void beforeEachUndo(Connection connection, MigrationInfo info) {} + + /** + * Runs after each migration script is undone. + * + * @param connection A valid connection to the database. + * @param info The current MigrationInfo for the migration just undone. + */ + default void afterEachUndo(Connection connection, MigrationInfo info) {} + + /** + * Runs after the undo task executes. + * + * @param connection A valid connection to the database. + */ + default void afterUndo(Connection connection) {} + + /** + * Runs before each migration script is executed. + * + * @param connection A valid connection to the database. + * @param info The current MigrationInfo for this migration. + */ + default void beforeEachMigrate(Connection connection, MigrationInfo info) {} + + /** + * Runs after each migration script is executed. + * + * @param connection A valid connection to the database. + * @param info The current MigrationInfo for this migration. + */ + default void afterEachMigrate(Connection connection, MigrationInfo info) {} + + /** + * Runs before the validate task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeValidate(Connection connection) {} + + /** + * Runs after the validate task executes. + * + * @param connection A valid connection to the database. + */ + default void afterValidate(Connection connection) {} + + /** + * Runs before the baseline task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeBaseline(Connection connection) {} + + /** + * Runs after the baseline task executes. + * + * @param connection A valid connection to the database. + */ + default void afterBaseline(Connection connection) {} + + /** + * Runs before the repair task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeRepair(Connection connection) {} + + /** + * Runs after the repair task executes. + * + * @param connection A valid connection to the database. + */ + default void afterRepair(Connection connection) {} + + /** + * Runs before the info task executes. + * + * @param connection A valid connection to the database. + */ + default void beforeInfo(Connection connection) {} + + /** + * Runs after the info task executes. + * + * @param connection A valid connection to the database. + */ + default void afterInfo(Connection connection) {} } diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java index 3545fbf9..ef423054 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationEndpoint.java @@ -2,28 +2,36 @@ import java.util.Set; -import org.springframework.boot.actuate.endpoint.AbstractEndpoint; -import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.actuate.endpoint.annotation.Endpoint; +import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; +import org.springframework.boot.actuate.endpoint.annotation.Selector; +import org.springframework.boot.actuate.endpoint.annotation.WriteOperation; import lombok.AllArgsConstructor; import lombok.Getter; +import org.springframework.lang.Nullable; -@ConfigurationProperties("endpoints.snapshot-event-creation") -public class SnapshotEventCreationEndpoint extends AbstractEndpoint { +@Endpoint(id = "snapshot-event-creation") +public class SnapshotEventCreationEndpoint { private final SnapshotCreationService snapshotCreationService; public SnapshotEventCreationEndpoint(SnapshotCreationService snapshotCreationService) { - super("snapshot_event_creation", true, true); this.snapshotCreationService = snapshotCreationService; } - @Override - public SnapshotReport invoke() { + @ReadOperation + public SnapshotReport getSupportedEventTypes() { return new SnapshotReport(snapshotCreationService.getSupportedEventTypes()); } - public void invoke(String eventType, String filter) { - snapshotCreationService.createSnapshotEvents(eventType, filter); + @WriteOperation + public void createFilteredSnapshotEvents( + // this is the event type. Could have a better name, but since Spring Boot relies on the -parameters + // compiler flag being set to resolve path parameter names, it would then get trickier to reliably run this + // Test in the IDE. So let's stick with arg0 for now. + @Selector String arg0, + @Nullable String filter) { + snapshotCreationService.createSnapshotEvents(arg0, filter); } diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java deleted file mode 100644 index fffc577f..00000000 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotEventCreationMvcEndpoint.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.zalando.nakadiproducer.snapshots.impl; - -import org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter; -import org.springframework.boot.actuate.endpoint.mvc.HypermediaDisabled; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.ResponseBody; - -public class SnapshotEventCreationMvcEndpoint extends EndpointMvcAdapter { - private final SnapshotEventCreationEndpoint delegate; - - public SnapshotEventCreationMvcEndpoint(SnapshotEventCreationEndpoint delegate) { - super(delegate); - this.delegate = delegate; - } - - @RequestMapping(value = "/{eventType:.*}", method = RequestMethod.POST) - @ResponseBody - @HypermediaDisabled - public ResponseEntity createSnapshot(@PathVariable String eventType, - @RequestBody(required = false) String filter) { - if (!this.delegate.isEnabled()) { - // Shouldn't happen - MVC endpoint shouldn't be registered when delegate's - // disabled - return getDisabledResponse(); - } - - delegate.invoke(eventType, filter); - return ResponseEntity.ok().build(); - } - - -} diff --git a/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories b/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 7fac8de6..00000000 --- a/nakadi-producer-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,2 +0,0 @@ -# org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.zalando.nakadiproducer.NakadiProducerAutoConfiguration -org.springframework.boot.actuate.autoconfigure.ManagementContextConfiguration=org.zalando.nakadiproducer.NakadiProducerAutoConfiguration.ManagementEndpointConfiguration \ No newline at end of file diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java index 638a0c82..5d2e6275 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NakadiProducerFlywayCallbackIT.java @@ -1,23 +1,42 @@ package org.zalando.nakadiproducer; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.sql.Connection; -import org.flywaydb.core.api.callback.FlywayCallback; +import org.flywaydb.core.api.configuration.ConfigurationAware; +import org.flywaydb.core.api.configuration.FlywayConfiguration; import org.junit.Test; +import org.mockito.InOrder; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.annotation.DirtiesContext; public class NakadiProducerFlywayCallbackIT extends BaseMockedExternalCommunicationIT { @MockBean - @NakadiProducerFlywayCallback - private FlywayCallback flywayCallback; + private NakadiProducerFlywayCallback flywayCallback; + + @MockBean + private ConfigurationAwareNakadiProducerFlywayCallback configurationAwareNakadiProducerFlywayCallback; @Test + @DirtiesContext // Needed to make sure that flyway gets executed for each of the tests and Callbacks are called again public void flywayCallbackIsCalledIfAnnotatedWithQualifierAnnotation() { - verify(flywayCallback, times(1)).beforeValidate(any(Connection.class)); + verify(flywayCallback, times(1)).beforeMigrate(any(Connection.class)); + } + + @Test + @DirtiesContext // Needed to make sure that flyway gets executed for each of the tests and Callbacks are called again + public void flywayConfigurationIsSetIfCallbackIsConfigurationAware() { + InOrder inOrder = inOrder(configurationAwareNakadiProducerFlywayCallback); + inOrder.verify(configurationAwareNakadiProducerFlywayCallback).setFlywayConfiguration(any(FlywayConfiguration.class)); + inOrder.verify(configurationAwareNakadiProducerFlywayCallback, times(1)).beforeMigrate(any(Connection.class)); + + } + + public interface ConfigurationAwareNakadiProducerFlywayCallback extends NakadiProducerFlywayCallback, ConfigurationAware { } } diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java index 59330475..e6f19af7 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/NonNakadiProducerFlywayCallbackIT.java @@ -8,15 +8,34 @@ import org.flywaydb.core.api.callback.FlywayCallback; import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; +import org.zalando.nakadiproducer.config.EmbeddedDataSourceConfig; -public class NonNakadiProducerFlywayCallbackIT extends BaseMockedExternalCommunicationIT { +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.MOCK, + properties = { "zalando.team.id:alpha-local-testing", "nakadi-producer.scheduled-transmission-enabled:false", "spring.flyway.enabled:false"}, + classes = { TestApplication.class, EmbeddedDataSourceConfig.class } +) +public class NonNakadiProducerFlywayCallbackIT { @MockBean private FlywayCallback flywayCallback; @Test - public void flywayCallbackIsNotUsedIfNotAnnotatedWithQualifierAnnotation() { + public void flywayCallbacksFromOurHostApplicationAreNotUsedByUs() { verify(flywayCallback, never()).beforeValidate(any(Connection.class)); } + + @Test + public void ourOwnFlywayConfigurationStillWorksFineWhenSpringsFlywayAutoconfigIsDisabled() { + // Yes, this is redundant to the other test in here. + // We consider it important to document the requirement, so it is here nonetheless. + // The test setup done by the class annotations does just enough to test it + } } diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java index 5569eb21..c61f4c80 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGenerationWebEndpointIT.java @@ -2,13 +2,15 @@ import static com.jayway.restassured.RestAssured.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.autoconfigure.LocalManagementPort; +import org.springframework.boot.actuate.autoconfigure.web.server.LocalManagementPort; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -21,13 +23,18 @@ @RunWith(SpringRunner.class) @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, - properties = { "management.security.enabled=false", "zalando.team.id:alpha-local-testing", "nakadi-producer.scheduled-transmission-enabled:false" }, + properties = { + "management.security.enabled=false", + "zalando.team.id:alpha-local-testing", + "nakadi-producer.scheduled-transmission-enabled:false", + "management.endpoints.web.exposure.include:snapshot-event-creation" + }, classes = { TestApplication.class, EmbeddedDataSourceConfig.class, SnapshotEventGenerationWebEndpointIT.Config.class } ) public class SnapshotEventGenerationWebEndpointIT { private static final String MY_EVENT_TYPE = "my.event-type"; - private static final String MY_REQUEST_BODY = "my request body"; + private static final String FILTER = "myRequestBody"; @LocalManagementPort private int managementPort; @@ -35,21 +42,38 @@ public class SnapshotEventGenerationWebEndpointIT { @Autowired private SnapshotEventGenerator snapshotEventGenerator; + @Before + public void resetMocks() { + reset(snapshotEventGenerator); + } + + @Test + public void passesFilterIfPresentInUrl() { + given().baseUri("http://localhost:" + managementPort) + .contentType("application/json") + .when().post("/actuator/snapshot-event-creation/" + MY_EVENT_TYPE + "?filter=" + FILTER) + .then().statusCode(204); + + verify(snapshotEventGenerator).generateSnapshots(null, FILTER); + } + @Test - public void passesRequestBodyIfPresent() { + public void passesFilterIfPresentInBody() { given().baseUri("http://localhost:" + managementPort) - .body(MY_REQUEST_BODY) - .when().post("/snapshot_event_creation/" + MY_EVENT_TYPE) - .then().statusCode(200); + .contentType("application/json") + .body("{\"filter\":\"" + FILTER + "\"}") + .when().post("/actuator/snapshot-event-creation/" + MY_EVENT_TYPE) + .then().statusCode(204); - verify(snapshotEventGenerator).generateSnapshots(null, MY_REQUEST_BODY); + verify(snapshotEventGenerator).generateSnapshots(null, FILTER); } @Test - public void passesNullIfNoRequestBodyPresent() { + public void passesNullIfNoFilterIsPresent() { given().baseUri("http://localhost:" + managementPort) - .when().post("/snapshot_event_creation/" + MY_EVENT_TYPE) - .then().statusCode(200); + .contentType("application/json") + .when().post("/actuator/snapshot-event-creation/" + MY_EVENT_TYPE) + .then().statusCode(204); verify(snapshotEventGenerator).generateSnapshots(null, null); } diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java index 9b52eb24..37c00718 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/snapshots/SnapshotEventGeneratorAutoconfigurationIT.java @@ -3,9 +3,6 @@ import static org.junit.Assert.fail; import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -29,12 +26,6 @@ public void picksUpDefinedSnapshotEventProviders() { // expect no exceptions snapshotCreationService.createSnapshotEvents("B", ""); - // expect no exceptions - snapshotCreationService.createSnapshotEvents("C", ""); - - // expect no exceptions - snapshotCreationService.createSnapshotEvents("D", ""); - try { snapshotCreationService.createSnapshotEvents("not defined", ""); } catch (final UnknownEventTypeException e) { @@ -56,26 +47,5 @@ public SnapshotEventGenerator snapshotEventProviderA() { public SnapshotEventGenerator snapshotEventProviderB() { return new SimpleSnapshotEventGenerator("B", (x) -> Collections.emptyList()); } - - @Bean - public SnapshotEventProvider snapshotEventProviderCandD() { - return new SnapshotEventProvider() { - @Override - public List getSnapshot(String eventType, Object withIdGreaterThan) { - if (!getSupportedEventTypes().contains(eventType)) { - throw new UnknownEventTypeException(eventType); - } - return Collections.emptyList(); - } - - @Override - public Set getSupportedEventTypes() { - final HashSet types = new HashSet<>(); - types.add("C"); - types.add("D"); - return types; - } - }; - } } } diff --git a/nakadi-producer-starter-spring-boot-2-test/pom.xml b/nakadi-producer-starter-spring-boot-2-test/pom.xml index 1ad73e53..4e867ab2 100644 --- a/nakadi-producer-starter-spring-boot-2-test/pom.xml +++ b/nakadi-producer-starter-spring-boot-2-test/pom.xml @@ -8,16 +8,16 @@ - org.springframework.boot - spring-boot-starter-parent - 2.0.3.RELEASE + org.zalando + nakadi-producer-reactor + 5.0.0-SNAPSHOT org.zalando nakadi-producer-spring-boot-starter - 4.1.3 + ${project.version} org.springframework.boot @@ -44,6 +44,10 @@ + + org.springframework.boot + spring-boot-starter-actuator + org.springframework.boot spring-boot-starter-test @@ -55,6 +59,11 @@ 1.18.0 test + + io.rest-assured + rest-assured + 3.1.0 + diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java b/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java index c7b2cd1f..30929521 100644 --- a/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java +++ b/nakadi-producer-starter-spring-boot-2-test/src/main/java/org/zalando/nakadiproducer/tests/Application.java @@ -6,10 +6,15 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.zalando.nakadiproducer.EnableNakadiProducer; +import org.zalando.nakadiproducer.snapshots.SimpleSnapshotEventGenerator; +import org.zalando.nakadiproducer.snapshots.Snapshot; +import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator; import javax.sql.DataSource; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; @EnableAutoConfiguration @EnableNakadiProducer @@ -29,4 +34,19 @@ public DataSource dataSource() throws IOException { public EmbeddedPostgres embeddedPostgres() throws IOException { return EmbeddedPostgres.start(); } + + @Bean + public SnapshotEventGenerator snapshotEventGenerator() { + return new SimpleSnapshotEventGenerator("eventtype", (withIdGreaterThan, filter) -> { + if (withIdGreaterThan == null) { + return Collections.singletonList(new Snapshot("1", "foo", (Object) filter)); + } else if (withIdGreaterThan.equals("1")) { + return Collections.singletonList(new Snapshot("2", "foo", (Object) filter)); + } else { + return new ArrayList<>(); + } + }); + + // Todo: Test that some events arrive at a local nakadi mock + } } diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.properties b/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.properties deleted file mode 100644 index 83fa688c..00000000 --- a/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.properties +++ /dev/null @@ -1,2 +0,0 @@ -nakadi-producer.access-token-uri: http://localhost:1234 -nakadi-producer.nakadi-base-uri: https://nakadi.example.org:5432 \ No newline at end of file diff --git a/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.yml b/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.yml new file mode 100644 index 00000000..47ed5c12 --- /dev/null +++ b/nakadi-producer-starter-spring-boot-2-test/src/main/resources/application.yml @@ -0,0 +1,4 @@ +nakadi-producer: + access-token-uri: http://localhost:1234 + nakadi-base-uri: https://nakadi.example.org:5432 +management.endpoints.web.exposure.include: snapshot-event-creation \ No newline at end of file diff --git a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java index e827afef..cce10fd9 100644 --- a/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java +++ b/nakadi-producer-starter-spring-boot-2-test/src/test/java/org/zalando/nakadiproducer/tests/ApplicationIT.java @@ -5,17 +5,22 @@ import org.junit.Test; import org.junit.contrib.java.lang.system.EnvironmentVariables; import org.junit.runner.RunWith; +import org.springframework.boot.actuate.autoconfigure.web.server.LocalManagementPort; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.io.File; +import static io.restassured.RestAssured.given; + @RunWith(SpringRunner.class) @SpringBootTest( classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT ) public class ApplicationIT { + @LocalManagementPort + private int localManagementPort; @ClassRule public static final EnvironmentVariables environmentVariables @@ -27,7 +32,10 @@ public static void fakeCredentialsDir() { } @Test - public void foo() { + public void shouldSuccessfullyStartAndSnapshotCanBeTriggered() { + given().baseUri("http://localhost:" + localManagementPort).contentType("application/json") + .when().post("/actuator/snapshot-event-creation/eventtype") + .then().statusCode(204); } diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml index 9b53550c..e91aa8a6 100644 --- a/nakadi-producer/pom.xml +++ b/nakadi-producer/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 4.2.0 + 5.0.0-SNAPSHOT nakadi-producer diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java deleted file mode 100644 index 47ed4dd2..00000000 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/SnapshotEventProvider.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.zalando.nakadiproducer.snapshots; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -import java.util.List; -import java.util.Set; - -/** - * The {@code SnapshotEventProvider} interface should be implemented by any - * Event Producer that wants to support snapshot events feature. The - * class must define a method {@code getSnapshot}. - * - * @deprecated Please use one or more {@link SnapshotEventGenerator} instances instead - */ -@Deprecated -public interface SnapshotEventProvider { - - /** - * Returns a batch of snapshots of given type (event type is an event channel topic name). The implementation may - * return an arbitrary amount of results, but it must return at least one element if there are entities - * matching the parameters. - * - * The library will call your implementation like this: - * Request: getSnapshot(eventType, null) Response: 1,2,3 - * Request: getSnapshot(eventType, 3) Response: 4,5 - * Request: getSnapshot(eventType, 5) Response: emptyList - * It is your responsibility to make sure that the returned events are ordered by their id asc and that, given you - * return a list of events for entities with ids {id1, ..., idN}, there exists no entity with an id between id1 and idN, that - * is not part of the result. - * - * - * @param eventType event type to make a snapshot of - * @param withIdGreaterThan if not null, only events for entities with an id greater than the given one must be returned - * @return list of elements ordered by their id - * @throws UnknownEventTypeException if {@code eventType} is unknown - */ - @Deprecated - List getSnapshot(String eventType, Object withIdGreaterThan); - - @Deprecated - Set getSupportedEventTypes(); - - @AllArgsConstructor - @Getter - @Deprecated - class Snapshot { - private Object id; - private String eventType; - private String dataType; - private Object data; - } - -} diff --git a/pom.xml b/pom.xml index 67fa7bee..a2fe8c86 100644 --- a/pom.xml +++ b/pom.xml @@ -10,12 +10,12 @@ org.springframework.boot spring-boot-starter-parent - 1.5.3.RELEASE + 2.0.3.RELEASE nakadi-producer-reactor org.zalando - 4.2.0 + 5.0.0-SNAPSHOT pom Nakadi Event Producer Reactor