Skip to content

Commit

Permalink
zalando-nakadi#140 Persist multiple events in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrns committed Nov 12, 2021
1 parent accc43b commit 908a03a
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 132 deletions.
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ If you do not use the STUPS Tokens library, you can implement token retrieval yo

The typical use case for this library is to publish events like creating or updating of some objects.

In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.
In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.

To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.

You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional).

Expand Down Expand Up @@ -203,6 +206,34 @@ For business events, you have just two parameters, the **eventType** and the eve
You usually should fire those also in the same transaction as you are storing the results of the
process step the event is reporting.

Example of using `fireCreateEvents`:

```java
@Service
public class SomeYourService {
@Autowired
private EventLogWriter eventLogWriter;
@Autowired
private WarehouseRepository repository;
@Transactional
public void createObjects(Collections<Warehouse> data) {
// here we store an object in a database table
repository.saveAll(data);
// then we group the data by dataType
Map<String, Collection<Object>> groupedData = Map.of("wholesale:warehouse", data);
// and then in the same transaction we save the events about the object creation
eventLogWriter.fireCreateEvents("wholesale.warehouse-change-event", groupedData);
}
}
```



### Event snapshots (optional)

Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-loadtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.2</version>
<version>20.4.0</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.2</version>
<version>20.4.0</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-starter-spring-boot-2-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.2</version>
<version>20.4.0</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.2</version>
<version>20.4.0</version>
</parent>

<artifactId>nakadi-producer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.nakadiproducer.eventlog;

import java.util.Collection;
import java.util.Map;
import javax.transaction.Transactional;

import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator;
Expand Down Expand Up @@ -43,6 +44,24 @@ public interface EventLogWriter {
@Transactional
void fireCreateEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the creation of some resources (objects), see
* {@link #fireCreateEvent(String, String, Object) fireCreateEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireCreateEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a data change event about an update of some resource (object).
*
Expand All @@ -63,6 +82,24 @@ public interface EventLogWriter {
@Transactional
void fireUpdateEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the update of some resources (objects), see
* {@link #fireUpdateEvent(String, String, Object) fireUpdateEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireUpdateEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a data change event about the deletion of some resource (object).
*
Expand All @@ -84,6 +121,24 @@ public interface EventLogWriter {
@Transactional
void fireDeleteEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the deletion of some resources (objects), see
* {@link #fireDeleteEvent(String, String, Object) fireDeleteEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireDeleteEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a data change event with a snapshot of some resource (object).
* <p>
Expand Down Expand Up @@ -116,6 +171,24 @@ public interface EventLogWriter {
@Transactional
void fireSnapshotEvent(String eventType, String dataType, Object data);

/**
* Fires data change events, see {@link #fireSnapshotEvent(String, String, Object)
* fireSnapshotEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireSnapshotEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a business event, i.e. an event communicating the fact that some
* business process step happened. The payload object will be used as the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ public interface EventLogRepository {

void persist(EventLog eventLog);

void persist(Collection<EventLog> eventLogs);
default void persist(Collection<EventLog> eventLogs) {
for (EventLog eventLog : eventLogs) {
persist(eventLog);
}
}

void deleteAll();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import static org.zalando.nakadiproducer.eventlog.impl.EventDataOperation.UPDATE;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.zalando.nakadiproducer.flowid.FlowIdComponent;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
Expand Down Expand Up @@ -36,27 +36,51 @@ public void fireCreateEvent(final String eventType, final String dataType, final
eventLogRepository.persist(eventLog);
}

@Override
@Transactional
public void fireCreateEvents(final String eventType, final Map<String, Collection<Object>> dataTypeToData) {
eventLogRepository.persist(createEventLogs(eventType, CREATE, dataTypeToData));
}

@Override
@Transactional
public void fireUpdateEvent(final String eventType, final String dataType, final Object data) {
final EventLog eventLog = createEventLog(eventType, new DataChangeEventEnvelope(UPDATE.toString(), dataType, data));
eventLogRepository.persist(eventLog);
}

@Override
@Transactional
public void fireUpdateEvents(final String eventType, final Map<String, Collection<Object>> dataTypeToData) {
eventLogRepository.persist(createEventLogs(eventType, UPDATE, dataTypeToData));
}

@Override
@Transactional
public void fireDeleteEvent(final String eventType, final String dataType, final Object data) {
final EventLog eventLog = createEventLog(eventType, new DataChangeEventEnvelope(DELETE.toString(), dataType, data));
eventLogRepository.persist(eventLog);
}

@Override
@Transactional
public void fireDeleteEvents(final String eventType, final Map<String, Collection<Object>> dataTypeToData) {
eventLogRepository.persist(createEventLogs(eventType, DELETE, dataTypeToData));
}

@Override
@Transactional
public void fireSnapshotEvent(final String eventType, final String dataType, final Object data) {
final EventLog eventLog = createEventLog(eventType, new DataChangeEventEnvelope(SNAPSHOT.toString(), dataType, data));
eventLogRepository.persist(eventLog);
}

@Override
@Transactional
public void fireSnapshotEvents(final String eventType, final Map<String, Collection<Object>> dataTypeToData) {
eventLogRepository.persist(createEventLogs(eventType, SNAPSHOT, dataTypeToData));
}

@Override
@Transactional
public void fireBusinessEvent(final String eventType, Object payload) {
Expand Down Expand Up @@ -91,4 +115,19 @@ private EventLog createEventLog(final String eventType, final Object eventPayloa
return eventLog;
}

private Collection<EventLog> createEventLogs(
final String eventType,
final EventDataOperation eventDataOperation,
final Map<String, Collection<Object>> dataTypeToData
) {
return dataTypeToData.entrySet().stream()
.flatMap(entry -> entry.getValue()
.stream()
.map(data -> createEventLog(
eventType,
new DataChangeEventEnvelope(eventDataOperation.toString(), entry.getKey(), data)
)))
.collect(Collectors.toList());
}

}
Loading

0 comments on commit 908a03a

Please sign in to comment.