Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support spring boot 2 #89

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
859e013
Adding a spring boot 2 test. At the moment, it only tests successfull…
BGehrels Jul 9, 2018
49d61b1
Upgrading to Spring Boot 2, which will probably be a new major release
BGehrels Jul 9, 2018
f4a22b0
Merge branch 'master' into spring-boot-2-test
BGehrels Jul 9, 2018
6fb98e5
Merge remote-tracking branch 'origin/master' into spring-boot-2-test-new
BGehrels Jul 11, 2018
66040d4
Switching to Version 20.0.0 since it matches the 2.0 of Spring boot s…
BGehrels Jul 11, 2018
e74994e
better naming of a test, removed a dead import
BGehrels Jul 12, 2018
405ca3a
LBJ-982 Fixed actuator path in the readme
BGehrels Jul 17, 2018
2159c00
LBJ-982 make the wording of the docs more consistent regarding snapsh…
BGehrels Jul 18, 2018
dc6733d
LBJ-982 minor code beauty in the Flyway Migrator
BGehrels Jul 18, 2018
9a85b62
LBJ-982 added default implementations to all NakadiProducerFlywayCall…
BGehrels Jul 18, 2018
3a2cdd3
LBJ-982 verify that flyway config is set *before* the callbacks are c…
BGehrels Jul 18, 2018
09c486a
LBJ-982 verify that flyway config is set *before* the callbacks are c…
BGehrels Jul 18, 2018
7da346b
LBJ-982 add a test for filter specification in the request body
BGehrels Jul 18, 2018
501bc6a
LBJ-982 more precise docs
BGehrels Jul 18, 2018
e92b705
LBJ-982 use lambda instead of inner class
BGehrels Jul 18, 2018
9b2bb37
moved mok k8s token config to src/test
BGehrels Jul 18, 2018
e6ac451
fixed broken IT
BGehrels Jul 18, 2018
9f0993f
documented why i choose some weird variable name
BGehrels Jul 18, 2018
21a6fd6
removed unused imports
BGehrels Jul 18, 2018
d5f3c58
Some typo fixes in the README
ePaul Jul 18, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The goal of this Spring Boot starter is to simplify the reliable integration bet

There are already [multiple clients for the Nakadi REST API](https://zalando.github.io/nakadi/manual.html#using_clients), but none of them solves the mentioned issues.

We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchonously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded.
We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchronously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded.

The Transmitter generates a strictly monotonically increasing event id that can be used for ordering the events during retrieval. It is not guaranteed, that events will be sent to Nakadi in the order they have been produced. If an event could not be sent to Nakadi, the library will periodically retry the transmission.

Expand All @@ -40,7 +40,8 @@ You may of course always setup a fresh system with the newest version.

## Prerequisites

This library was tested with Spring Boot 1.5.3.RELEASE and relies on an existing configured PostgreSQL DataSource.
This library was tested with Spring Boot 2.0.3.RELEASE and relies on an existing configured PostgreSQL DataSource.
**If you are still using Spring Boot 1.x, please use versions < 20.0.0, they are still actively maintained ([Documentation](https://github.com/zalando-nakadi/nakadi-producer-spring-boot-starter/tree/spring-boot-1)).**

This library also uses:

Expand All @@ -50,14 +51,14 @@ This library also uses:
* jackson >= 2.7.0
* (optional) Zalando's [tracer-spring-boot-starter](https://github.com/zalando/tracer)
* (optional) Zalando's [tokens library](https://github.com/zalando/tokens) >= 0.10.0
* Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though.
* Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though. To be used in zalando's k8s environment, you must at least use 0.11.0.


## Usage

### Setup

If you are using maven, include the library in your `pom.xml`:
If you are using Maven, include the library in your `pom.xml`:

```xml
<dependency>
Expand All @@ -81,7 +82,7 @@ public class Application {
}
```

The library uses flyway migrations to set up its own database schema `nakadi_events`.
The library uses Flyway migrations to set up its own database schema `nakadi_events`.

### Nakadi communication configuration

Expand Down Expand Up @@ -211,30 +212,43 @@ process step the event is reporting.
### Event snapshots (optional)

A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
It does not represent a change of the state of a resource, but a current snapshot of the state of the resource.
It does not represent a change of the state of a resource, but a current snapshot of its state. It can be useful to
bootstrap a new consumer or to recover from inconsistencies between sender and consumer after an incident.

You can create snapshot events programmatically (using EventLogWriter.fireSnapshotEvent), but usually snapshot event
creation is a irregular, manually triggered maintenance task.

This library provides a Spring Boot Actuator endpoint named `snapshot_event_creation` that can be used to trigger a Snapshot for a given event type. Assuming your management port is set to `7979`,

GET localhost:7979/snapshot_event_creation
GET localhost:7979/actuator/snapshot-event-creation

will return a list of all event types available for snapshot creation and

POST localhost:7979/snapshot_event_creation/my.event-type
POST localhost:7979/actuator/snapshot-event-creation/my.event-type

will trigger a snapshot for the event type `my.event-type`. The (optional) request body is a "filter specifier".
will trigger a snapshot for the event type `my.event-type`. You can change the port, the authentication scheme and the
path prefix as part of your Spring Boot Actuator configuration.

This will only work if your application has configured spring-boot-actuator
You can provide an optional filter specifier that will be passed to your application to implement any application
specific event/entity filtering logic. It can be provided either as a query parameter called `filter`, or as a
request body

{"filter":"myFilter"}

This endpoint will only work if your application includes spring-boot-actuator,
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
```
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface. Otherwise (or if the generator is not for the event type you requested), the library will respond with an error message when you request a snapshot creation.
The request body (the "filter specifier") of the trigger request will be passed as a string parameter to the SnapshotEventGenerator's `generateSnapshots` method.
your `application.properties` includes
```
management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...`
```
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface.
The optional filter specifier of the trigger request will be passed as a string parameter to the
SnapshotEventGenerator's `generateSnapshots` method and may be null, if none is given.

We provide a `SimpleSnapshotEventGenerator` to ease bean creation using a more functional style:
```java
Expand Down Expand Up @@ -269,13 +283,13 @@ tracer:
By default, the library will pick up your flyway data source (or the primary data source if no flyway data source is
configured), create its own schema and start setting up its tables in there. You can customize this process in two ways:

If you want to use a different data source for schema maintainence (for example to use a different username) and
configuring the Spring flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate
If you want to use a different data source for schema maintenance (for example to use a different username) and
configuring the Spring Flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate
it with `@NakadiProducerDataSource`.

You may also define a spring bean of type `FlywayCallback` and annotate it with `@NakadiProducerFlywayCallback`. The
interface provide several hook into the schema management lifecycle that may, for example, be used to
`SET ROLE migrator` before and `RESET ROLE` after each migration.
You may also define a spring bean of type `NakadiProducerFlywayCallback`. The interface provides several hooks into the
schema management lifecycle that may, for example, be used to `SET ROLE migrator` before and `RESET ROLE` after each
migration.

### Test support

Expand Down
10 changes: 9 additions & 1 deletion nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>4.2.0</version>
<version>20.0.0-SNAPSHOT</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down Expand Up @@ -102,6 +102,14 @@
<artifactId>spring-boot-starter-web</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator-autoconfigure</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationInfo;
import org.flywaydb.core.api.callback.BaseFlywayCallback;
import org.flywaydb.core.api.callback.FlywayCallback;
import org.flywaydb.core.api.configuration.ConfigurationAware;
import org.flywaydb.core.api.configuration.FlywayConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayDataSource;
import org.springframework.boot.autoconfigure.flyway.FlywayProperties;

import java.sql.Connection;
import java.util.List;

public class FlywayMigrator {
@Autowired(required = false)
@NakadiProducerFlywayDataSource
Expand All @@ -22,10 +29,9 @@ public class FlywayMigrator {
private DataSource dataSource;

@Autowired(required = false)
@NakadiProducerFlywayCallback
private FlywayCallback callback;
private List<NakadiProducerFlywayCallback> callbacks;

@Autowired
@Autowired(required = false)
private FlywayProperties flywayProperties;

@PostConstruct
Expand All @@ -34,7 +40,7 @@ public void migrateFlyway() {

if (this.nakadiProducerFlywayDataSource != null) {
flyway.setDataSource(nakadiProducerFlywayDataSource);
} else if (this.flywayProperties.isCreateDataSource()) {
} else if (this.flywayProperties != null && this.flywayProperties.isCreateDataSource()) {
flyway.setDataSource(this.flywayProperties.getUrl(), this.flywayProperties.getUser(),
this.flywayProperties.getPassword(),
this.flywayProperties.getInitSqls().toArray(new String[0]));
Expand All @@ -46,11 +52,118 @@ public void migrateFlyway() {

flyway.setLocations("classpath:db_nakadiproducer/migrations");
flyway.setSchemas("nakadi_events");
if (callback != null) {
flyway.setCallbacks(callback);
if (callbacks != null) {
flyway.setCallbacks(callbacks.stream().map(FlywayCallbackAdapter::new).toArray(FlywayCallback[]::new));
}

flyway.setBaselineOnMigrate(true);
flyway.setBaselineVersionAsString("2133546886.1.0");
flyway.migrate();
}

private static class FlywayCallbackAdapter extends BaseFlywayCallback {

private NakadiProducerFlywayCallback callback;

private FlywayCallbackAdapter(NakadiProducerFlywayCallback callback) {
this.callback = callback;
}

@Override
public void setFlywayConfiguration(FlywayConfiguration flywayConfiguration) {
if (callback instanceof ConfigurationAware) {
((ConfigurationAware) callback).setFlywayConfiguration(flywayConfiguration);
}
}

@Override
public void beforeClean(Connection connection) {
callback.beforeClean(connection);
}

@Override
public void afterClean(Connection connection) {
callback.afterClean(connection);
}

@Override
public void beforeMigrate(Connection connection) {
callback.beforeMigrate(connection);
}

@Override
public void afterMigrate(Connection connection) {
callback.afterMigrate(connection);
}

@Override
public void beforeEachMigrate(Connection connection, MigrationInfo info) {
callback.beforeEachMigrate(connection, info);
}

@Override
public void afterEachMigrate(Connection connection, MigrationInfo info) {
callback.afterEachMigrate(connection, info);
}

@Override
public void beforeUndo(Connection connection) {
callback.beforeUndo(connection);
}

@Override
public void beforeEachUndo(Connection connection, MigrationInfo info) {
callback.beforeEachUndo(connection, info);
}

@Override
public void afterEachUndo(Connection connection, MigrationInfo info) {
callback.afterEachUndo(connection, info);
}

@Override
public void afterUndo(Connection connection) {
callback.afterUndo(connection);
}

@Override
public void beforeValidate(Connection connection) {
callback.beforeValidate(connection);
}

@Override
public void afterValidate(Connection connection) {
callback.afterValidate(connection);
}

@Override
public void beforeBaseline(Connection connection) {
callback.beforeBaseline(connection);
}

@Override
public void afterBaseline(Connection connection) {
callback.afterBaseline(connection);
}

@Override
public void beforeRepair(Connection connection) {
callback.beforeRepair(connection);
}

@Override
public void afterRepair(Connection connection) {
callback.afterRepair(connection);
}

@Override
public void beforeInfo(Connection connection) {
callback.beforeInfo(connection);
}

@Override
public void afterInfo(Connection connection) {
callback.afterInfo(connection);
}
}
}
Loading