diff --git a/README.md b/README.md index 3a2a4d6d..a030765f 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,10 @@ If you do not use the STUPS Tokens library, you can implement token retrieval yo The typical use case for this library is to publish events like creating or updating of some objects. -In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`. +In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) +service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`. + +To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used. You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional). @@ -203,7 +206,6 @@ For business events, you have just two parameters, the **eventType** and the eve You usually should fire those also in the same transaction as you are storing the results of the process step the event is reporting. - ### Event snapshots (optional) A Snapshot event is a special type of data change event (data operation) defined by Nakadi. diff --git a/nakadi-producer-loadtest/pom.xml b/nakadi-producer-loadtest/pom.xml index 8a874914..80d39fa1 100644 --- a/nakadi-producer-loadtest/pom.xml +++ b/nakadi-producer-loadtest/pom.xml @@ -11,7 +11,7 @@ org.zalando nakadi-producer-reactor - 20.4.0 + 21.0.0 diff --git a/nakadi-producer-spring-boot-starter/pom.xml b/nakadi-producer-spring-boot-starter/pom.xml index 0834e164..4ecb2bbb 100644 --- a/nakadi-producer-spring-boot-starter/pom.xml +++ b/nakadi-producer-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 20.4.0 + 21.0.0 nakadi-producer-spring-boot-starter diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java index bbb62b40..029da7ac 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java @@ -3,6 +3,7 @@ import java.sql.Timestamp; import java.time.Instant; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -10,7 +11,6 @@ import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; -import org.springframework.jdbc.support.GeneratedKeyHolder; public class EventLogRepositoryImpl implements EventLogRepository { @@ -71,27 +71,34 @@ public void delete(EventLog eventLog) { @Override public void persist(EventLog eventLog) { - Timestamp now = toSqlTimestamp(Instant.now()); - MapSqlParameterSource namedParameterMap = new MapSqlParameterSource(); - namedParameterMap.addValue("eventType", eventLog.getEventType()); - namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData()); - namedParameterMap.addValue("flowId", eventLog.getFlowId()); - namedParameterMap.addValue("created", now); - namedParameterMap.addValue("lastModified", now); - namedParameterMap.addValue("lockedBy", eventLog.getLockedBy()); - namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil()); - GeneratedKeyHolder generatedKeyHolder = new GeneratedKeyHolder(); - jdbcTemplate.update( - "INSERT INTO " + - " nakadi_events.event_log " + - " (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " + - "VALUES " + - " (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)", - namedParameterMap, - generatedKeyHolder - ); + persist(Collections.singleton(eventLog)); + } + + @Override + public void persist(Collection eventLogs) { + MapSqlParameterSource[] namedParameterMaps = eventLogs.stream() + .map(eventLog -> { + Timestamp now = toSqlTimestamp(Instant.now()); + MapSqlParameterSource namedParameterMap = new MapSqlParameterSource(); + namedParameterMap.addValue("eventType", eventLog.getEventType()); + namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData()); + namedParameterMap.addValue("flowId", eventLog.getFlowId()); + namedParameterMap.addValue("created", now); + namedParameterMap.addValue("lastModified", now); + namedParameterMap.addValue("lockedBy", eventLog.getLockedBy()); + namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil()); + return namedParameterMap; + }) + .toArray(MapSqlParameterSource[]::new); - eventLog.setId((Integer) generatedKeyHolder.getKeys().get("id")); + jdbcTemplate.batchUpdate( + "INSERT INTO " + + " nakadi_events.event_log " + + " (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " + + "VALUES " + + " (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)", + namedParameterMaps + ); } private Timestamp toSqlTimestamp(Instant now) { diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java index 845785e2..3aaa0740 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java @@ -8,6 +8,7 @@ import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT; @Transactional @@ -16,6 +17,9 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { @Autowired private EventLogRepositoryImpl eventLogRepository; + @Autowired + private JdbcTemplate jdbcTemplate; + private static final String WAREHOUSE_EVENT_BODY_DATA = ("{'self':'http://WAREHOUSE_DOMAIN'," + "'code':'WH-DE-EF'," @@ -34,21 +38,21 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { private final String WAREHOUSE_EVENT_TYPE = "wholesale.warehouse-change-event"; - private Integer id; - @Before - public void setUp() throws Exception { + public void setUp() { eventLogRepository.deleteAll(); final EventLog eventLog = EventLog.builder().eventBodyData(WAREHOUSE_EVENT_BODY_DATA) .eventType(WAREHOUSE_EVENT_TYPE) .flowId("FLOW_ID").build(); eventLogRepository.persist(eventLog); - id = eventLog.getId(); } @Test public void findEventRepositoryId() { + Integer id = jdbcTemplate.queryForObject( + "SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'", + Integer.class); final EventLog eventLog = eventLogRepository.findOne(id); compareWithPersistedEvent(eventLog); } diff --git a/nakadi-producer-starter-spring-boot-2-test/pom.xml b/nakadi-producer-starter-spring-boot-2-test/pom.xml index 8274d3ad..68d4d543 100644 --- a/nakadi-producer-starter-spring-boot-2-test/pom.xml +++ b/nakadi-producer-starter-spring-boot-2-test/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 20.4.0 + 21.0.0 diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml index c054c322..43d9668a 100644 --- a/nakadi-producer/pom.xml +++ b/nakadi-producer/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 20.4.0 + 21.0.0 nakadi-producer diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java index 23030616..d90c647c 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java @@ -1,5 +1,6 @@ package org.zalando.nakadiproducer.eventlog; +import java.util.Collection; import javax.transaction.Transactional; import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator; @@ -42,6 +43,27 @@ public interface EventLogWriter { @Transactional void fireCreateEvent(String eventType, String dataType, Object data); + /** + * Fires data change events about the creation of some resources (objects), see + * {@link #fireCreateEvent(String, String, Object) fireCreateEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataType + * the content of the {@code data_type} field of the Nakadi + * event + * + * @param data + * some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireCreateEvents(String eventType, String dataType, Collection data); + /** * Fires a data change event about an update of some resource (object). * @@ -62,6 +84,27 @@ public interface EventLogWriter { @Transactional void fireUpdateEvent(String eventType, String dataType, Object data); + /** + * Fires data change events about the update of some resources (objects), see + * {@link #fireUpdateEvent(String, String, Object) fireUpdateEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataType + * the content of the {@code data_type} field of the Nakadi + * event + * + * @param data + * some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireUpdateEvents(String eventType, String dataType, Collection data); + /** * Fires a data change event about the deletion of some resource (object). * @@ -83,6 +126,27 @@ public interface EventLogWriter { @Transactional void fireDeleteEvent(String eventType, String dataType, Object data); + /** + * Fires data change events about the deletion of some resources (objects), see + * {@link #fireDeleteEvent(String, String, Object) fireDeleteEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataType + * the content of the {@code data_type} field of the Nakadi + * event + * + * @param data + * some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireDeleteEvents(String eventType, String dataType, Collection data); + /** * Fires a data change event with a snapshot of some resource (object). *

@@ -115,6 +179,27 @@ public interface EventLogWriter { @Transactional void fireSnapshotEvent(String eventType, String dataType, Object data); + /** + * Fires data change events, see {@link #fireSnapshotEvent(String, String, Object) + * fireSnapshotEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataType + * the content of the {@code data_type} field of the Nakadi + * event + * + * @param data + * some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireSnapshotEvents(String eventType, String dataType, Collection data); + /** * Fires a business event, i.e. an event communicating the fact that some * business process step happened. The payload object will be used as the @@ -132,4 +217,19 @@ public interface EventLogWriter { */ @Transactional void fireBusinessEvent(String eventType, Object payload); + + /** + * Fires business events, see {@link #fireBusinessEvent(String, Object) fireBusinessEvent} for + * more details + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param payloads + * some POJOs that can be serialized into JSON (required + * parameter) + */ + @Transactional + void fireBusinessEvents(String eventType, Collection payloads); } diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java index 937ffdc3..1d3bcbd4 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java @@ -12,6 +12,12 @@ public interface EventLogRepository { void persist(EventLog eventLog); + default void persist(Collection eventLogs) { + for (EventLog eventLog : eventLogs) { + persist(eventLog); + } + } + void deleteAll(); EventLog findOne(Integer id); diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java index 38dbf80d..dc4519fa 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java @@ -1,10 +1,13 @@ package org.zalando.nakadiproducer.eventlog.impl; +import static java.util.stream.Collectors.toList; import static org.zalando.nakadiproducer.eventlog.impl.EventDataOperation.CREATE; import static org.zalando.nakadiproducer.eventlog.impl.EventDataOperation.DELETE; import static org.zalando.nakadiproducer.eventlog.impl.EventDataOperation.SNAPSHOT; import static org.zalando.nakadiproducer.eventlog.impl.EventDataOperation.UPDATE; +import java.util.Collection; + import org.zalando.nakadiproducer.flowid.FlowIdComponent; import org.zalando.nakadiproducer.eventlog.EventLogWriter; @@ -33,6 +36,12 @@ public void fireCreateEvent(final String eventType, final String dataType, final eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireCreateEvents(final String eventType, final String dataType, final Collection data) { + eventLogRepository.persist(createEventLogs(eventType, CREATE, dataType, data)); + } + @Override @Transactional public void fireUpdateEvent(final String eventType, final String dataType, final Object data) { @@ -40,6 +49,12 @@ public void fireUpdateEvent(final String eventType, final String dataType, final eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireUpdateEvents(final String eventType, final String dataType, final Collection data) { + eventLogRepository.persist(createEventLogs(eventType, UPDATE, dataType, data)); + } + @Override @Transactional public void fireDeleteEvent(final String eventType, final String dataType, final Object data) { @@ -47,6 +62,12 @@ public void fireDeleteEvent(final String eventType, final String dataType, final eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireDeleteEvents(final String eventType, final String dataType, final Collection data) { + eventLogRepository.persist(createEventLogs(eventType, DELETE, dataType, data)); + } + @Override @Transactional public void fireSnapshotEvent(final String eventType, final String dataType, final Object data) { @@ -54,6 +75,12 @@ public void fireSnapshotEvent(final String eventType, final String dataType, fin eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireSnapshotEvents(final String eventType, final String dataType, final Collection data) { + eventLogRepository.persist(createEventLogs(eventType, SNAPSHOT, dataType, data)); + } + @Override @Transactional public void fireBusinessEvent(final String eventType, Object payload) { @@ -61,6 +88,20 @@ public void fireBusinessEvent(final String eventType, Object payload) { eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireBusinessEvents(final String eventType, final Collection payload) { + final Collection eventLogs = createEventLogs(eventType, payload); + eventLogRepository.persist(eventLogs); + } + + private Collection createEventLogs(final String eventType, + final Collection eventPayloads) { + return eventPayloads.stream() + .map(payload -> createEventLog(eventType, payload)) + .collect(toList()); + } + private EventLog createEventLog(final String eventType, final Object eventPayload) { final EventLog eventLog = new EventLog(); eventLog.setEventType(eventType); @@ -74,4 +115,16 @@ private EventLog createEventLog(final String eventType, final Object eventPayloa return eventLog; } + private Collection createEventLogs( + final String eventType, + final EventDataOperation eventDataOperation, + final String dataType, + final Collection data + ) { + return data.stream() + .map(payload -> createEventLog(eventType, + new DataChangeEventEnvelope(eventDataOperation.toString(), dataType, payload))) + .collect(toList()); + } + } diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java index bc2a9377..c72db023 100644 --- a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java @@ -7,12 +7,21 @@ import static org.mockito.Mockito.when; import static org.zalando.nakadiproducer.util.Fixture.PUBLISHER_EVENT_TYPE; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.zalando.nakadiproducer.flowid.FlowIdComponent; import org.zalando.nakadiproducer.util.Fixture; @@ -32,69 +41,136 @@ public class EventLogWriterTest { @Captor private ArgumentCaptor eventLogCapture; + @Captor + private ArgumentCaptor> eventLogsCapture; + private EventLogWriterImpl eventLogWriter; - private MockPayload eventPayload; + private MockPayload eventPayload1; + private MockPayload eventPayload2; + private MockPayload eventPayload3; private static final String TRACE_ID = "TRACE_ID"; - private static final String EVENT_BODY_DATA = - ("{'id':1," - + "'code':'mockedcode'," - + "'more':{'info':'some info'}," - + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," - + "'active':true" - + "}").replace('\'', '"'); - - private static final String DATA_CHANGE_BODY_DATA = ("{'data_op':'{DATA_OP}','data_type':'nakadi:some-publisher','data':" + EVENT_BODY_DATA + "}").replace('\'', '"'); - private static final String PUBLISHER_DATA_TYPE = "nakadi:some-publisher"; + private static final String EVENT_BODY_DATA_1 = + ("{'id':1," + + "'code':'mockedcode1'," + + "'more':{'info':'some info'}," + + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," + + "'active':true" + + "}").replace('\'', '"'); + + private static final String EVENT_BODY_DATA_2 = + ("{'id':2," + + "'code':'mockedcode2'," + + "'more':{'info':'some info'}," + + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," + + "'active':true" + + "}").replace('\'', '"'); + + private static final String EVENT_BODY_DATA_3 = + ("{'id':3," + + "'code':'mockedcode3'," + + "'more':{'info':'some info'}," + + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," + + "'active':true" + + "}").replace('\'', '"'); + + private static final String PUBLISHER_DATA_TYPE_1 = "nakadi:some-publisher"; + + private static final String DATA_CHANGE_BODY_DATA_1 = ("{'data_op':'{DATA_OP}','data_type':'" + + PUBLISHER_DATA_TYPE_1 + "','data':" + EVENT_BODY_DATA_1 + + "}").replace('\'', '"'); + + private static final String DATA_CHANGE_BODY_DATA_2 = ("{'data_op':'{DATA_OP}','data_type':'" + + PUBLISHER_DATA_TYPE_1 + "','data':" + EVENT_BODY_DATA_2 + + "}").replace('\'', '"'); + + private static final String DATA_CHANGE_BODY_DATA_3 = ("{'data_op':'{DATA_OP}','data_type':'" + + PUBLISHER_DATA_TYPE_1 + "','data':" + EVENT_BODY_DATA_3 + + "}").replace('\'', '"'); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Before - public void setUp() throws Exception { - eventPayload = Fixture.mockPayload(1, "mockedcode", true, - Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + public void setUp() { + Mockito.reset(eventLogRepository, flowIdComponent); + + eventPayload1 = Fixture.mockPayload(1, "mockedcode1", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + + eventPayload2 = Fixture.mockPayload(2, "mockedcode2", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + + eventPayload3 = Fixture.mockPayload(3, "mockedcode3", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); when(flowIdComponent.getXFlowIdValue()).thenReturn(TRACE_ID); - eventLogWriter = new EventLogWriterImpl(eventLogRepository, new ObjectMapper(), flowIdComponent); + eventLogWriter = new EventLogWriterImpl(eventLogRepository, new ObjectMapper(), + flowIdComponent); } @Test - public void testFireCreateEvent() throws Exception { - eventLogWriter.fireCreateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); + public void testFireCreateEvent() { + eventLogWriter.fireCreateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); + verify(eventLogRepository).persist(eventLogCapture.capture()); - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "C"))); + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "C"))); assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); - } @Test - public void testFireUpdateEvent() throws Exception { + public void testFireCreateEvents() { + eventLogWriter.fireCreateEvents( + PUBLISHER_EVENT_TYPE, + PUBLISHER_DATA_TYPE_1, + Arrays.asList(eventPayload1, eventPayload2, eventPayload3) + ); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("C", new HashSet<>(eventLogsCapture.getValue())); + } - eventLogWriter.fireUpdateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); + @Test + public void testFireUpdateEvent() { + eventLogWriter.fireUpdateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); verify(eventLogRepository).persist(eventLogCapture.capture()); - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "U"))); + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "U"))); assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); - } @Test - public void testFireDeleteEvent() throws Exception { + public void testFireUpdateEvents() { + eventLogWriter.fireUpdateEvents( + PUBLISHER_EVENT_TYPE, + PUBLISHER_DATA_TYPE_1, + Arrays.asList(eventPayload1, eventPayload2, eventPayload3) + ); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("U", new HashSet<>(eventLogsCapture.getValue())); + } - eventLogWriter.fireDeleteEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); + @Test + public void testFireDeleteEvent() { + eventLogWriter.fireDeleteEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); verify(eventLogRepository).persist(eventLogCapture.capture()); - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "D"))); + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "D"))); assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); @@ -102,33 +178,118 @@ public void testFireDeleteEvent() throws Exception { } @Test - public void testFireSnapshotEvent() throws Exception { + public void testFireDeleteEvents() { + eventLogWriter.fireDeleteEvents( + PUBLISHER_EVENT_TYPE, + PUBLISHER_DATA_TYPE_1, + Arrays.asList(eventPayload1, eventPayload2, eventPayload3)); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("D", new HashSet<>(eventLogsCapture.getValue())); + } - eventLogWriter.fireSnapshotEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); + @Test + public void testFireSnapshotEvent() throws Exception { + eventLogWriter.fireSnapshotEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, + eventPayload1); verify(eventLogRepository).persist(eventLogCapture.capture()); - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "S"))); + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "S"))); assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); } + @Test + public void testFireSnapshotEvents() { + eventLogWriter.fireSnapshotEvents( + PUBLISHER_EVENT_TYPE, + PUBLISHER_DATA_TYPE_1, + Arrays.asList(eventPayload1, eventPayload2, eventPayload3)); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("S", new HashSet<>(eventLogsCapture.getValue())); + } + @Test public void testFireBusinessEvent() throws Exception { - MockPayload mockPayload = Fixture.mockPayload(1, "mockedcode", true, - Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + MockPayload mockPayload = Fixture.mockPayload(1, "mockedcode1", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); eventLogWriter.fireBusinessEvent(PUBLISHER_EVENT_TYPE, mockPayload); verify(eventLogRepository).persist(eventLogCapture.capture()); - assertThat(eventLogCapture.getValue().getEventBodyData(), is(EVENT_BODY_DATA)); + assertThat(eventLogCapture.getValue().getEventBodyData(), is(EVENT_BODY_DATA_1)); assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); } + @Test + public void testFireBusinessEvents() throws Exception { + MockPayload mockPayload1 = Fixture.mockPayload(1, "mockedcode1", true, + Fixture.mockSubClass("some info 1_0"), Fixture.mockSubList(2, "some detail 1_2")); + MockPayload mockPayload2 = Fixture.mockPayload(2, "mockedcode2", true, + Fixture.mockSubClass("some info 2_0"), Fixture.mockSubList(2, "some detail 2_1")); + + eventLogWriter.fireBusinessEvents(PUBLISHER_EVENT_TYPE, + Stream.of(mockPayload1, mockPayload2).collect(Collectors.toList())); + + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + Iterator eventLogIterator = eventLogsCapture.getValue().iterator(); + EventLog eventLog1 = eventLogIterator.next(); + assertThat(eventLog1.getEventBodyData(), + is(OBJECT_MAPPER.writeValueAsString(mockPayload1))); + assertThat(eventLog1.getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLog1.getFlowId(), is(TRACE_ID)); + assertThat(eventLog1.getLockedBy(), is(nullValue())); + assertThat(eventLog1.getLockedUntil(), is(nullValue())); + + EventLog eventLog2 = eventLogIterator.next(); + assertThat(eventLog2.getEventBodyData(), + is(OBJECT_MAPPER.writeValueAsString(mockPayload2))); + assertThat(eventLog2.getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLog2.getFlowId(), is(TRACE_ID)); + assertThat(eventLog2.getLockedBy(), is(nullValue())); + assertThat(eventLog2.getLockedUntil(), is(nullValue())); + } + + private void verifyEventLogs(String dataOp, Set eventLogs) { + Optional firstEventLog = eventLogs.stream().filter( + eventLog -> eventLog.getEventBodyData().contains("mockedcode1")).findFirst(); + assertThat(firstEventLog.isPresent(), is(true)); + assertThat(firstEventLog.get().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", dataOp))); + assertThat(firstEventLog.get().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(firstEventLog.get().getFlowId(), is(TRACE_ID)); + assertThat(firstEventLog.get().getLockedBy(), is(nullValue())); + assertThat(firstEventLog.get().getLockedUntil(), is(nullValue())); + + Optional secondEventLog = eventLogs.stream().filter( + eventLog -> eventLog.getEventBodyData().contains("mockedcode2")).findFirst(); + assertThat(secondEventLog.isPresent(), is(true)); + assertThat(secondEventLog.get().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_2.replace("{DATA_OP}", dataOp))); + assertThat(secondEventLog.get().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(secondEventLog.get().getFlowId(), is(TRACE_ID)); + assertThat(secondEventLog.get().getLockedBy(), is(nullValue())); + assertThat(secondEventLog.get().getLockedUntil(), is(nullValue())); + + Optional thirdEventLog = eventLogs.stream().filter( + eventLog -> eventLog.getEventBodyData().contains("mockedcode3")).findFirst(); + assertThat(thirdEventLog.isPresent(), is(true)); + assertThat(thirdEventLog.get().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_3.replace("{DATA_OP}", dataOp))); + assertThat(thirdEventLog.get().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(thirdEventLog.get().getFlowId(), is(TRACE_ID)); + assertThat(thirdEventLog.get().getLockedBy(), is(nullValue())); + assertThat(thirdEventLog.get().getLockedUntil(), is(nullValue())); + } + } diff --git a/pom.xml b/pom.xml index 38af18ee..9f301ec0 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ nakadi-producer-reactor org.zalando - 20.4.0 + 21.0.0 pom Nakadi Event Producer Reactor