Skip to content

Commit

Permalink
Merge pull request #87 from zalando-nakadi/spring-boot-2-test
Browse files Browse the repository at this point in the history
Support Spring Boot 2
  • Loading branch information
BGehrels authored Jul 18, 2018
2 parents c2ba401 + bcebbd2 commit 8064262
Show file tree
Hide file tree
Showing 22 changed files with 605 additions and 251 deletions.
48 changes: 31 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The goal of this Spring Boot starter is to simplify the reliable integration bet

There are already [multiple clients for the Nakadi REST API](https://zalando.github.io/nakadi/manual.html#using_clients), but none of them solves the mentioned issues.

We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchonously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded.
We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchronously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded.

The Transmitter generates a strictly monotonically increasing event id that can be used for ordering the events during retrieval. It is not guaranteed, that events will be sent to Nakadi in the order they have been produced. If an event could not be sent to Nakadi, the library will periodically retry the transmission.

Expand All @@ -40,7 +40,8 @@ You may of course always setup a fresh system with the newest version.

## Prerequisites

This library was tested with Spring Boot 1.5.3.RELEASE and relies on an existing configured PostgreSQL DataSource.
This library was tested with Spring Boot 2.0.3.RELEASE and relies on an existing configured PostgreSQL DataSource.
**If you are still using Spring Boot 1.x, please use versions < 20.0.0, they are still actively maintained ([Documentation](https://github.com/zalando-nakadi/nakadi-producer-spring-boot-starter/tree/spring-boot-1)).**

This library also uses:

Expand All @@ -50,14 +51,14 @@ This library also uses:
* jackson >= 2.7.0
* (optional) Zalando's [tracer-spring-boot-starter](https://github.com/zalando/tracer)
* (optional) Zalando's [tokens library](https://github.com/zalando/tokens) >= 0.10.0
* Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though.
* Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though. To be used in zalando's k8s environment, you must at least use 0.11.0.


## Usage

### Setup

If you are using maven, include the library in your `pom.xml`:
If you are using Maven, include the library in your `pom.xml`:

```xml
<dependency>
Expand All @@ -81,7 +82,7 @@ public class Application {
}
```

The library uses flyway migrations to set up its own database schema `nakadi_events`.
The library uses Flyway migrations to set up its own database schema `nakadi_events`.

### Nakadi communication configuration

Expand Down Expand Up @@ -211,30 +212,43 @@ process step the event is reporting.
### Event snapshots (optional)

A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
It does not represent a change of the state of a resource, but a current snapshot of the state of the resource.
It does not represent a change of the state of a resource, but a current snapshot of its state. It can be useful to
bootstrap a new consumer or to recover from inconsistencies between sender and consumer after an incident.

You can create snapshot events programmatically (using EventLogWriter.fireSnapshotEvent), but usually snapshot event
creation is a irregular, manually triggered maintenance task.

This library provides a Spring Boot Actuator endpoint named `snapshot_event_creation` that can be used to trigger a Snapshot for a given event type. Assuming your management port is set to `7979`,

GET localhost:7979/snapshot_event_creation
GET localhost:7979/actuator/snapshot-event-creation

will return a list of all event types available for snapshot creation and

POST localhost:7979/snapshot_event_creation/my.event-type
POST localhost:7979/actuator/snapshot-event-creation/my.event-type

will trigger a snapshot for the event type `my.event-type`. The (optional) request body is a "filter specifier".
will trigger a snapshot for the event type `my.event-type`. You can change the port, the authentication scheme and the
path prefix as part of your Spring Boot Actuator configuration.

This will only work if your application has configured spring-boot-actuator
You can provide an optional filter specifier that will be passed to your application to implement any application
specific event/entity filtering logic. It can be provided either as a query parameter called `filter`, or as a
request body

{"filter":"myFilter"}

This endpoint will only work if your application includes spring-boot-actuator,
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
```
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface. Otherwise (or if the generator is not for the event type you requested), the library will respond with an error message when you request a snapshot creation.
The request body (the "filter specifier") of the trigger request will be passed as a string parameter to the SnapshotEventGenerator's `generateSnapshots` method.
your `application.properties` includes
```
management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...`
```
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface.
The optional filter specifier of the trigger request will be passed as a string parameter to the
SnapshotEventGenerator's `generateSnapshots` method and may be null, if none is given.
We provide a `SimpleSnapshotEventGenerator` to ease bean creation using a more functional style:
```java
Expand Down Expand Up @@ -269,13 +283,13 @@ tracer:
By default, the library will pick up your flyway data source (or the primary data source if no flyway data source is
configured), create its own schema and start setting up its tables in there. You can customize this process in two ways:
If you want to use a different data source for schema maintainence (for example to use a different username) and
configuring the Spring flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate
If you want to use a different data source for schema maintenance (for example to use a different username) and
configuring the Spring Flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate
it with `@NakadiProducerDataSource`.

You may also define a spring bean of type `FlywayCallback` and annotate it with `@NakadiProducerFlywayCallback`. The
interface provide several hook into the schema management lifecycle that may, for example, be used to
`SET ROLE migrator` before and `RESET ROLE` after each migration.
You may also define a spring bean of type `NakadiProducerFlywayCallback`. The interface provides several hooks into the
schema management lifecycle that may, for example, be used to `SET ROLE migrator` before and `RESET ROLE` after each
migration.

### Test support

Expand Down
10 changes: 9 additions & 1 deletion nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>4.2.0</version>
<version>20.0.0-SNAPSHOT</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down Expand Up @@ -102,6 +102,14 @@
<artifactId>spring-boot-starter-web</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator-autoconfigure</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationInfo;
import org.flywaydb.core.api.callback.BaseFlywayCallback;
import org.flywaydb.core.api.callback.FlywayCallback;
import org.flywaydb.core.api.configuration.ConfigurationAware;
import org.flywaydb.core.api.configuration.FlywayConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayDataSource;
import org.springframework.boot.autoconfigure.flyway.FlywayProperties;

import java.sql.Connection;
import java.util.List;

public class FlywayMigrator {
@Autowired(required = false)
@NakadiProducerFlywayDataSource
Expand All @@ -22,10 +29,9 @@ public class FlywayMigrator {
private DataSource dataSource;

@Autowired(required = false)
@NakadiProducerFlywayCallback
private FlywayCallback callback;
private List<NakadiProducerFlywayCallback> callbacks;

@Autowired
@Autowired(required = false)
private FlywayProperties flywayProperties;

@PostConstruct
Expand All @@ -34,7 +40,7 @@ public void migrateFlyway() {

if (this.nakadiProducerFlywayDataSource != null) {
flyway.setDataSource(nakadiProducerFlywayDataSource);
} else if (this.flywayProperties.isCreateDataSource()) {
} else if (this.flywayProperties != null && this.flywayProperties.isCreateDataSource()) {
flyway.setDataSource(this.flywayProperties.getUrl(), this.flywayProperties.getUser(),
this.flywayProperties.getPassword(),
this.flywayProperties.getInitSqls().toArray(new String[0]));
Expand All @@ -46,11 +52,118 @@ public void migrateFlyway() {

flyway.setLocations("classpath:db_nakadiproducer/migrations");
flyway.setSchemas("nakadi_events");
if (callback != null) {
flyway.setCallbacks(callback);
if (callbacks != null) {
flyway.setCallbacks(callbacks.stream().map(FlywayCallbackAdapter::new).toArray(FlywayCallback[]::new));
}

flyway.setBaselineOnMigrate(true);
flyway.setBaselineVersionAsString("2133546886.1.0");
flyway.migrate();
}

private static class FlywayCallbackAdapter extends BaseFlywayCallback {

private NakadiProducerFlywayCallback callback;

private FlywayCallbackAdapter(NakadiProducerFlywayCallback callback) {
this.callback = callback;
}

@Override
public void setFlywayConfiguration(FlywayConfiguration flywayConfiguration) {
if (callback instanceof ConfigurationAware) {
((ConfigurationAware) callback).setFlywayConfiguration(flywayConfiguration);
}
}

@Override
public void beforeClean(Connection connection) {
callback.beforeClean(connection);
}

@Override
public void afterClean(Connection connection) {
callback.afterClean(connection);
}

@Override
public void beforeMigrate(Connection connection) {
callback.beforeMigrate(connection);
}

@Override
public void afterMigrate(Connection connection) {
callback.afterMigrate(connection);
}

@Override
public void beforeEachMigrate(Connection connection, MigrationInfo info) {
callback.beforeEachMigrate(connection, info);
}

@Override
public void afterEachMigrate(Connection connection, MigrationInfo info) {
callback.afterEachMigrate(connection, info);
}

@Override
public void beforeUndo(Connection connection) {
callback.beforeUndo(connection);
}

@Override
public void beforeEachUndo(Connection connection, MigrationInfo info) {
callback.beforeEachUndo(connection, info);
}

@Override
public void afterEachUndo(Connection connection, MigrationInfo info) {
callback.afterEachUndo(connection, info);
}

@Override
public void afterUndo(Connection connection) {
callback.afterUndo(connection);
}

@Override
public void beforeValidate(Connection connection) {
callback.beforeValidate(connection);
}

@Override
public void afterValidate(Connection connection) {
callback.afterValidate(connection);
}

@Override
public void beforeBaseline(Connection connection) {
callback.beforeBaseline(connection);
}

@Override
public void afterBaseline(Connection connection) {
callback.afterBaseline(connection);
}

@Override
public void beforeRepair(Connection connection) {
callback.beforeRepair(connection);
}

@Override
public void afterRepair(Connection connection) {
callback.afterRepair(connection);
}

@Override
public void beforeInfo(Connection connection) {
callback.beforeInfo(connection);
}

@Override
public void afterInfo(Connection connection) {
callback.afterInfo(connection);
}
}
}
Loading

0 comments on commit 8064262

Please sign in to comment.