Skip to content

Commit

Permalink
Merge pull request #141 from fbrns/issues/140-persist-multiple-events…
Browse files Browse the repository at this point in the history
…-in-batch

#140 Persist multiple events in batch
  • Loading branch information
ePaul authored Nov 29, 2022
2 parents 304b72a + 4163a01 commit 7c6b572
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 64 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ If you do not use the STUPS Tokens library, you can implement token retrieval yo

The typical use case for this library is to publish events like creating or updating of some objects.

In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.
In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.

To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.

You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional).

Expand Down Expand Up @@ -203,7 +206,6 @@ For business events, you have just two parameters, the **eventType** and the eve
You usually should fire those also in the same transaction as you are storing the results of the
process step the event is reporting.


### Event snapshots (optional)

A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-loadtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.4.0</version>
<version>21.0.0</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.4.0</version>
<version>21.0.0</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;

public class EventLogRepositoryImpl implements EventLogRepository {

Expand Down Expand Up @@ -71,27 +71,34 @@ public void delete(EventLog eventLog) {

@Override
public void persist(EventLog eventLog) {
Timestamp now = toSqlTimestamp(Instant.now());
MapSqlParameterSource namedParameterMap = new MapSqlParameterSource();
namedParameterMap.addValue("eventType", eventLog.getEventType());
namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData());
namedParameterMap.addValue("flowId", eventLog.getFlowId());
namedParameterMap.addValue("created", now);
namedParameterMap.addValue("lastModified", now);
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
GeneratedKeyHolder generatedKeyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)",
namedParameterMap,
generatedKeyHolder
);
persist(Collections.singleton(eventLog));
}

@Override
public void persist(Collection<EventLog> eventLogs) {
MapSqlParameterSource[] namedParameterMaps = eventLogs.stream()
.map(eventLog -> {
Timestamp now = toSqlTimestamp(Instant.now());
MapSqlParameterSource namedParameterMap = new MapSqlParameterSource();
namedParameterMap.addValue("eventType", eventLog.getEventType());
namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData());
namedParameterMap.addValue("flowId", eventLog.getFlowId());
namedParameterMap.addValue("created", now);
namedParameterMap.addValue("lastModified", now);
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
return namedParameterMap;
})
.toArray(MapSqlParameterSource[]::new);

eventLog.setId((Integer) generatedKeyHolder.getKeys().get("id"));
jdbcTemplate.batchUpdate(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)",
namedParameterMaps
);
}

private Timestamp toSqlTimestamp(Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT;

@Transactional
Expand All @@ -16,6 +17,9 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
@Autowired
private EventLogRepositoryImpl eventLogRepository;

@Autowired
private JdbcTemplate jdbcTemplate;

private static final String WAREHOUSE_EVENT_BODY_DATA =
("{'self':'http://WAREHOUSE_DOMAIN',"
+ "'code':'WH-DE-EF',"
Expand All @@ -34,21 +38,21 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {

private final String WAREHOUSE_EVENT_TYPE = "wholesale.warehouse-change-event";

private Integer id;

@Before
public void setUp() throws Exception {
public void setUp() {
eventLogRepository.deleteAll();

final EventLog eventLog = EventLog.builder().eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.flowId("FLOW_ID").build();
eventLogRepository.persist(eventLog);
id = eventLog.getId();
}

@Test
public void findEventRepositoryId() {
Integer id = jdbcTemplate.queryForObject(
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'",
Integer.class);
final EventLog eventLog = eventLogRepository.findOne(id);
compareWithPersistedEvent(eventLog);
}
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-starter-spring-boot-2-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.4.0</version>
<version>21.0.0</version>
</parent>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.4.0</version>
<version>21.0.0</version>
</parent>

<artifactId>nakadi-producer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.nakadiproducer.eventlog;

import java.util.Collection;
import javax.transaction.Transactional;

import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator;
Expand Down Expand Up @@ -42,6 +43,27 @@ public interface EventLogWriter {
@Transactional
void fireCreateEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the creation of some resources (objects), see
* {@link #fireCreateEvent(String, String, Object) fireCreateEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataType
* the content of the {@code data_type} field of the Nakadi
* event
*
* @param data
* some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireCreateEvents(String eventType, String dataType, Collection<?> data);

/**
* Fires a data change event about an update of some resource (object).
*
Expand All @@ -62,6 +84,27 @@ public interface EventLogWriter {
@Transactional
void fireUpdateEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the update of some resources (objects), see
* {@link #fireUpdateEvent(String, String, Object) fireUpdateEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataType
* the content of the {@code data_type} field of the Nakadi
* event
*
* @param data
* some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireUpdateEvents(String eventType, String dataType, Collection<?> data);

/**
* Fires a data change event about the deletion of some resource (object).
*
Expand All @@ -83,6 +126,27 @@ public interface EventLogWriter {
@Transactional
void fireDeleteEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the deletion of some resources (objects), see
* {@link #fireDeleteEvent(String, String, Object) fireDeleteEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataType
* the content of the {@code data_type} field of the Nakadi
* event
*
* @param data
* some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireDeleteEvents(String eventType, String dataType, Collection<?> data);

/**
* Fires a data change event with a snapshot of some resource (object).
* <p>
Expand Down Expand Up @@ -115,6 +179,27 @@ public interface EventLogWriter {
@Transactional
void fireSnapshotEvent(String eventType, String dataType, Object data);

/**
* Fires data change events, see {@link #fireSnapshotEvent(String, String, Object)
* fireSnapshotEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataType
* the content of the {@code data_type} field of the Nakadi
* event
*
* @param data
* some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireSnapshotEvents(String eventType, String dataType, Collection<?> data);

/**
* Fires a business event, i.e. an event communicating the fact that some
* business process step happened. The payload object will be used as the
Expand All @@ -132,4 +217,19 @@ public interface EventLogWriter {
*/
@Transactional
void fireBusinessEvent(String eventType, Object payload);

/**
* Fires business events, see {@link #fireBusinessEvent(String, Object) fireBusinessEvent} for
* more details
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param payloads
* some POJOs that can be serialized into JSON (required
* parameter)
*/
@Transactional
void fireBusinessEvents(String eventType, Collection<Object> payloads);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ public interface EventLogRepository {

void persist(EventLog eventLog);

default void persist(Collection<EventLog> eventLogs) {
for (EventLog eventLog : eventLogs) {
persist(eventLog);
}
}

void deleteAll();

EventLog findOne(Integer id);
Expand Down
Loading

0 comments on commit 7c6b572

Please sign in to comment.