From 908a03a3de60f22cbc427295c04c4a1a185f2770 Mon Sep 17 00:00:00 2001 From: Florian Brons Date: Fri, 12 Nov 2021 16:57:05 +0100 Subject: [PATCH] #140 Persist multiple events in batch --- README.md | 33 +- nakadi-producer-loadtest/pom.xml | 2 +- nakadi-producer-spring-boot-starter/pom.xml | 2 +- .../pom.xml | 2 +- nakadi-producer/pom.xml | 2 +- .../eventlog/EventLogWriter.java | 73 ++++ .../eventlog/impl/EventLogRepository.java | 6 +- .../eventlog/impl/EventLogWriterImpl.java | 41 +- .../eventlog/impl/EventLogWriterTest.java | 373 ++++++++++++------ pom.xml | 2 +- 10 files changed, 404 insertions(+), 132 deletions(-) diff --git a/README.md b/README.md index 530bf112..e6d483f6 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,10 @@ If you do not use the STUPS Tokens library, you can implement token retrieval yo The typical use case for this library is to publish events like creating or updating of some objects. -In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`. +In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) +service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`. + +To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used. You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional). @@ -203,6 +206,34 @@ For business events, you have just two parameters, the **eventType** and the eve You usually should fire those also in the same transaction as you are storing the results of the process step the event is reporting. +Example of using `fireCreateEvents`: + +```java +@Service +public class SomeYourService { + + @Autowired + private EventLogWriter eventLogWriter; + + @Autowired + private WarehouseRepository repository; + + @Transactional + public void createObjects(Collections data) { + + // here we store an object in a database table + repository.saveAll(data); + + // then we group the data by dataType + Map> groupedData = Map.of("wholesale:warehouse", data); + + // and then in the same transaction we save the events about the object creation + eventLogWriter.fireCreateEvents("wholesale.warehouse-change-event", groupedData); + } +} +``` + + ### Event snapshots (optional) diff --git a/nakadi-producer-loadtest/pom.xml b/nakadi-producer-loadtest/pom.xml index a33fc775..d565455d 100644 --- a/nakadi-producer-loadtest/pom.xml +++ b/nakadi-producer-loadtest/pom.xml @@ -11,7 +11,7 @@ org.zalando nakadi-producer-reactor - 20.3.2 + 20.4.0 diff --git a/nakadi-producer-spring-boot-starter/pom.xml b/nakadi-producer-spring-boot-starter/pom.xml index a79813f0..2922cebf 100644 --- a/nakadi-producer-spring-boot-starter/pom.xml +++ b/nakadi-producer-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 20.3.2 + 20.4.0 nakadi-producer-spring-boot-starter diff --git a/nakadi-producer-starter-spring-boot-2-test/pom.xml b/nakadi-producer-starter-spring-boot-2-test/pom.xml index 9a046c98..8274d3ad 100644 --- a/nakadi-producer-starter-spring-boot-2-test/pom.xml +++ b/nakadi-producer-starter-spring-boot-2-test/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 20.3.2 + 20.4.0 diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml index fbe6acfc..c054c322 100644 --- a/nakadi-producer/pom.xml +++ b/nakadi-producer/pom.xml @@ -10,7 +10,7 @@ org.zalando nakadi-producer-reactor - 20.3.2 + 20.4.0 nakadi-producer diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java index e1b7c2f1..adbdc5f2 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java @@ -1,6 +1,7 @@ package org.zalando.nakadiproducer.eventlog; import java.util.Collection; +import java.util.Map; import javax.transaction.Transactional; import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator; @@ -43,6 +44,24 @@ public interface EventLogWriter { @Transactional void fireCreateEvent(String eventType, String dataType, Object data); + /** + * Fires data change events about the creation of some resources (objects), see + * {@link #fireCreateEvent(String, String, Object) fireCreateEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataTypeToData + * the content of the {@code data_type} field of the Nakadi + * event mapped to some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireCreateEvents(String eventType, Map> dataTypeToData); + /** * Fires a data change event about an update of some resource (object). * @@ -63,6 +82,24 @@ public interface EventLogWriter { @Transactional void fireUpdateEvent(String eventType, String dataType, Object data); + /** + * Fires data change events about the update of some resources (objects), see + * {@link #fireUpdateEvent(String, String, Object) fireUpdateEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataTypeToData + * the content of the {@code data_type} field of the Nakadi + * event mapped to some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireUpdateEvents(String eventType, Map> dataTypeToData); + /** * Fires a data change event about the deletion of some resource (object). * @@ -84,6 +121,24 @@ public interface EventLogWriter { @Transactional void fireDeleteEvent(String eventType, String dataType, Object data); + /** + * Fires data change events about the deletion of some resources (objects), see + * {@link #fireDeleteEvent(String, String, Object) fireDeleteEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataTypeToData + * the content of the {@code data_type} field of the Nakadi + * event mapped to some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireDeleteEvents(String eventType, Map> dataTypeToData); + /** * Fires a data change event with a snapshot of some resource (object). *

@@ -116,6 +171,24 @@ public interface EventLogWriter { @Transactional void fireSnapshotEvent(String eventType, String dataType, Object data); + /** + * Fires data change events, see {@link #fireSnapshotEvent(String, String, Object) + * fireSnapshotEvent} for more details. + * + * @param eventType + * the Nakadi event type of the event. This is roughly equivalent + * to an event channel or topic. + * + * @param dataTypeToData + * the content of the {@code data_type} field of the Nakadi + * event mapped to some POJOs that can be serialized into JSON (required + * parameter). This is meant to be a representation of the + * current state of the resource. It will be used as content of + * the {@code data} field of the Nakadi event. + */ + @Transactional + void fireSnapshotEvents(String eventType, Map> dataTypeToData); + /** * Fires a business event, i.e. an event communicating the fact that some * business process step happened. The payload object will be used as the diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java index aa0f0725..4ae6011e 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java @@ -12,7 +12,11 @@ public interface EventLogRepository { void persist(EventLog eventLog); - void persist(Collection eventLogs); + default void persist(Collection eventLogs) { + for (EventLog eventLog : eventLogs) { + persist(eventLog); + } + } void deleteAll(); diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java index b7c19aa8..d8ccbffc 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java @@ -6,7 +6,7 @@ import static org.zalando.nakadiproducer.eventlog.impl.EventDataOperation.UPDATE; import java.util.Collection; -import java.util.Collections; +import java.util.Map; import java.util.stream.Collectors; import org.zalando.nakadiproducer.flowid.FlowIdComponent; import org.zalando.nakadiproducer.eventlog.EventLogWriter; @@ -36,6 +36,12 @@ public void fireCreateEvent(final String eventType, final String dataType, final eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireCreateEvents(final String eventType, final Map> dataTypeToData) { + eventLogRepository.persist(createEventLogs(eventType, CREATE, dataTypeToData)); + } + @Override @Transactional public void fireUpdateEvent(final String eventType, final String dataType, final Object data) { @@ -43,6 +49,12 @@ public void fireUpdateEvent(final String eventType, final String dataType, final eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireUpdateEvents(final String eventType, final Map> dataTypeToData) { + eventLogRepository.persist(createEventLogs(eventType, UPDATE, dataTypeToData)); + } + @Override @Transactional public void fireDeleteEvent(final String eventType, final String dataType, final Object data) { @@ -50,6 +62,12 @@ public void fireDeleteEvent(final String eventType, final String dataType, final eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireDeleteEvents(final String eventType, final Map> dataTypeToData) { + eventLogRepository.persist(createEventLogs(eventType, DELETE, dataTypeToData)); + } + @Override @Transactional public void fireSnapshotEvent(final String eventType, final String dataType, final Object data) { @@ -57,6 +75,12 @@ public void fireSnapshotEvent(final String eventType, final String dataType, fin eventLogRepository.persist(eventLog); } + @Override + @Transactional + public void fireSnapshotEvents(final String eventType, final Map> dataTypeToData) { + eventLogRepository.persist(createEventLogs(eventType, SNAPSHOT, dataTypeToData)); + } + @Override @Transactional public void fireBusinessEvent(final String eventType, Object payload) { @@ -91,4 +115,19 @@ private EventLog createEventLog(final String eventType, final Object eventPayloa return eventLog; } + private Collection createEventLogs( + final String eventType, + final EventDataOperation eventDataOperation, + final Map> dataTypeToData + ) { + return dataTypeToData.entrySet().stream() + .flatMap(entry -> entry.getValue() + .stream() + .map(data -> createEventLog( + eventType, + new DataChangeEventEnvelope(eventDataOperation.toString(), entry.getKey(), data) + ))) + .collect(Collectors.toList()); + } + } diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java index 35de91a8..97391cd5 100644 --- a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java @@ -7,8 +7,15 @@ import static org.mockito.Mockito.when; import static org.zalando.nakadiproducer.util.Fixture.PUBLISHER_EVENT_TYPE; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.Before; @@ -27,144 +34,262 @@ @RunWith(MockitoJUnitRunner.class) public class EventLogWriterTest { - @Mock - private EventLogRepository eventLogRepository; + @Mock + private EventLogRepository eventLogRepository; - @Mock - private FlowIdComponent flowIdComponent; + @Mock + private FlowIdComponent flowIdComponent; - @Captor - private ArgumentCaptor eventLogCapture; + @Captor + private ArgumentCaptor eventLogCapture; - @Captor - private ArgumentCaptor> eventLogCaptures; + @Captor + private ArgumentCaptor> eventLogsCapture; - private EventLogWriterImpl eventLogWriter; + private EventLogWriterImpl eventLogWriter; - private MockPayload eventPayload; + private MockPayload eventPayload1; + private MockPayload eventPayload2; + private MockPayload eventPayload3; - private static final String TRACE_ID = "TRACE_ID"; + private static final String TRACE_ID = "TRACE_ID"; - private static final String EVENT_BODY_DATA = - ("{'id':1," - + "'code':'mockedcode'," - + "'more':{'info':'some info'}," - + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," - + "'active':true" - + "}").replace('\'', '"'); + private static final String EVENT_BODY_DATA_1 = + ("{'id':1," + + "'code':'mockedcode1'," + + "'more':{'info':'some info'}," + + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," + + "'active':true" + + "}").replace('\'', '"'); - private static final String DATA_CHANGE_BODY_DATA = ("{'data_op':'{DATA_OP}','data_type':'nakadi:some-publisher','data':" + EVENT_BODY_DATA + "}").replace('\'', '"'); - private static final String PUBLISHER_DATA_TYPE = "nakadi:some-publisher"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String EVENT_BODY_DATA_2 = + ("{'id':2," + + "'code':'mockedcode2'," + + "'more':{'info':'some info'}," + + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," + + "'active':true" + + "}").replace('\'', '"'); - @Before - public void setUp() throws Exception { - eventPayload = Fixture.mockPayload(1, "mockedcode", true, - Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + private static final String EVENT_BODY_DATA_3 = + ("{'id':3," + + "'code':'mockedcode3'," + + "'more':{'info':'some info'}," + + "'items':[{'detail':'some detail0'},{'detail':'some detail1'}]," + + "'active':true" + + "}").replace('\'', '"'); - when(flowIdComponent.getXFlowIdValue()).thenReturn(TRACE_ID); + private static final String PUBLISHER_DATA_TYPE_1 = "nakadi:some-publisher"; + private static final String PUBLISHER_DATA_TYPE_2 = "nakadi:some-publisher2"; + + private static final String DATA_CHANGE_BODY_DATA_1 = ("{'data_op':'{DATA_OP}','data_type':'" + + PUBLISHER_DATA_TYPE_1 + "','data':" + EVENT_BODY_DATA_1 + + "}").replace('\'', '"'); + + private static final String DATA_CHANGE_BODY_DATA_2 = ("{'data_op':'{DATA_OP}','data_type':'" + + PUBLISHER_DATA_TYPE_1 + "','data':" + EVENT_BODY_DATA_2 + + "}").replace('\'', '"'); + + private static final String DATA_CHANGE_BODY_DATA_3 = ("{'data_op':'{DATA_OP}','data_type':'" + + PUBLISHER_DATA_TYPE_2 + "','data':" + EVENT_BODY_DATA_3 + + "}").replace('\'', '"'); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - eventLogWriter = new EventLogWriterImpl(eventLogRepository, new ObjectMapper(), flowIdComponent); - } + @Before + public void setUp() { + eventPayload1 = Fixture.mockPayload(1, "mockedcode1", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); - @Test - public void testFireCreateEvent() throws Exception { - eventLogWriter.fireCreateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); - verify(eventLogRepository).persist(eventLogCapture.capture()); + eventPayload2 = Fixture.mockPayload(2, "mockedcode2", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + + eventPayload3 = Fixture.mockPayload(3, "mockedcode3", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + + when(flowIdComponent.getXFlowIdValue()).thenReturn(TRACE_ID); + + eventLogWriter = new EventLogWriterImpl(eventLogRepository, new ObjectMapper(), + flowIdComponent); + } + + @Test + public void testFireCreateEvent() { + eventLogWriter.fireCreateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "C"))); - assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); - assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); - assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); + verify(eventLogRepository).persist(eventLogCapture.capture()); + + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "C"))); + assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); + assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); + assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); + } - } + @Test + public void testFireCreateEvents() { + Map> groupedData = new HashMap<>(); + groupedData.put(PUBLISHER_DATA_TYPE_1, Arrays.asList(eventPayload1, eventPayload2)); + groupedData.put(PUBLISHER_DATA_TYPE_2, Collections.singletonList(eventPayload3)); - @Test - public void testFireUpdateEvent() throws Exception { - - eventLogWriter.fireUpdateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); - - verify(eventLogRepository).persist(eventLogCapture.capture()); - - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "U"))); - assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); - assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); - assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); - - } - - @Test - public void testFireDeleteEvent() throws Exception { - - eventLogWriter.fireDeleteEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); - - verify(eventLogRepository).persist(eventLogCapture.capture()); - - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "D"))); - assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); - assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); - assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); - } - - @Test - public void testFireSnapshotEvent() throws Exception { - - eventLogWriter.fireSnapshotEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE, eventPayload); - - verify(eventLogRepository).persist(eventLogCapture.capture()); - - assertThat(eventLogCapture.getValue().getEventBodyData(), is(DATA_CHANGE_BODY_DATA.replace("{DATA_OP}", "S"))); - assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); - assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); - assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); - } - - @Test - public void testFireBusinessEvent() throws Exception { - MockPayload mockPayload = Fixture.mockPayload(1, "mockedcode", true, - Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); - - eventLogWriter.fireBusinessEvent(PUBLISHER_EVENT_TYPE, mockPayload); - - verify(eventLogRepository).persist(eventLogCapture.capture()); - - assertThat(eventLogCapture.getValue().getEventBodyData(), is(EVENT_BODY_DATA)); - assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); - assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); - assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); - } - - @Test - public void testFireBusinessEvents() throws Exception { - MockPayload mockPayload1 = Fixture.mockPayload(1, "mockedcode1", true, - Fixture.mockSubClass("some info 1_0"), Fixture.mockSubList(2, "some detail 1_2")); - MockPayload mockPayload2 = Fixture.mockPayload(2, "mockedcode2", true, - Fixture.mockSubClass("some info 2_0"), Fixture.mockSubList(2, "some detail 2_1")); - - eventLogWriter.fireBusinessEvents(PUBLISHER_EVENT_TYPE, - Stream.of(mockPayload1, mockPayload2).collect(Collectors.toList())); - - verify(eventLogRepository).persist(eventLogCaptures.capture()); - - Iterator eventLogIterator = eventLogCaptures.getValue().iterator(); - EventLog eventLog1 = eventLogIterator.next(); - assertThat(eventLog1.getEventBodyData(), is(OBJECT_MAPPER.writeValueAsString(mockPayload1))); - assertThat(eventLog1.getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLog1.getFlowId(), is(TRACE_ID)); - assertThat(eventLog1.getLockedBy(), is(nullValue())); - assertThat(eventLog1.getLockedUntil(), is(nullValue())); - - EventLog eventLog2 = eventLogIterator.next(); - assertThat(eventLog2.getEventBodyData(), is(OBJECT_MAPPER.writeValueAsString(mockPayload2))); - assertThat(eventLog2.getEventType(), is(PUBLISHER_EVENT_TYPE)); - assertThat(eventLog2.getFlowId(), is(TRACE_ID)); - assertThat(eventLog2.getLockedBy(), is(nullValue())); - assertThat(eventLog2.getLockedUntil(), is(nullValue())); - } + eventLogWriter.fireCreateEvents(PUBLISHER_EVENT_TYPE, groupedData); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("C", new HashSet<>(eventLogsCapture.getValue())); + } + + @Test + public void testFireUpdateEvent() { + eventLogWriter.fireUpdateEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); + + verify(eventLogRepository).persist(eventLogCapture.capture()); + + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "U"))); + assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); + assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); + assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); + } + + @Test + public void testFireUpdateEvents() { + Map> groupedData = new HashMap<>(); + groupedData.put(PUBLISHER_DATA_TYPE_1, Arrays.asList(eventPayload1, eventPayload2)); + groupedData.put(PUBLISHER_DATA_TYPE_2, Collections.singletonList(eventPayload3)); + + eventLogWriter.fireUpdateEvents(PUBLISHER_EVENT_TYPE, groupedData); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("U", new HashSet<>(eventLogsCapture.getValue())); + } + + @Test + public void testFireDeleteEvent() { + eventLogWriter.fireDeleteEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); + + verify(eventLogRepository).persist(eventLogCapture.capture()); + + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "D"))); + assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); + assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); + assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); + } + + @Test + public void testFireDeleteEvents() { + Map> groupedData = new HashMap<>(); + groupedData.put(PUBLISHER_DATA_TYPE_1, Arrays.asList(eventPayload1, eventPayload2)); + groupedData.put(PUBLISHER_DATA_TYPE_2, Collections.singletonList(eventPayload3)); + + eventLogWriter.fireDeleteEvents(PUBLISHER_EVENT_TYPE, groupedData); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("D", new HashSet<>(eventLogsCapture.getValue())); + } + + @Test + public void testFireSnapshotEvent() throws Exception { + eventLogWriter.fireSnapshotEvent(PUBLISHER_EVENT_TYPE, PUBLISHER_DATA_TYPE_1, eventPayload1); + + verify(eventLogRepository).persist(eventLogCapture.capture()); + + assertThat(eventLogCapture.getValue().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", "S"))); + assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); + assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); + assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); + } + + @Test + public void testFireSnapshotEvents() { + Map> groupedData = new HashMap<>(); + groupedData.put(PUBLISHER_DATA_TYPE_1, Arrays.asList(eventPayload1, eventPayload2)); + groupedData.put(PUBLISHER_DATA_TYPE_2, Collections.singletonList(eventPayload3)); + + eventLogWriter.fireSnapshotEvents(PUBLISHER_EVENT_TYPE, groupedData); + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + verifyEventLogs("S", new HashSet<>(eventLogsCapture.getValue())); + } + + @Test + public void testFireBusinessEvent() throws Exception { + MockPayload mockPayload = Fixture.mockPayload(1, "mockedcode1", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + + eventLogWriter.fireBusinessEvent(PUBLISHER_EVENT_TYPE, mockPayload); + + verify(eventLogRepository).persist(eventLogCapture.capture()); + + assertThat(eventLogCapture.getValue().getEventBodyData(), is(EVENT_BODY_DATA_1)); + assertThat(eventLogCapture.getValue().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLogCapture.getValue().getFlowId(), is(TRACE_ID)); + assertThat(eventLogCapture.getValue().getLockedBy(), is(nullValue())); + assertThat(eventLogCapture.getValue().getLockedUntil(), is(nullValue())); + } + + @Test + public void testFireBusinessEvents() throws Exception { + MockPayload mockPayload1 = Fixture.mockPayload(1, "mockedcode1", true, + Fixture.mockSubClass("some info 1_0"), Fixture.mockSubList(2, "some detail 1_2")); + MockPayload mockPayload2 = Fixture.mockPayload(2, "mockedcode2", true, + Fixture.mockSubClass("some info 2_0"), Fixture.mockSubList(2, "some detail 2_1")); + + eventLogWriter.fireBusinessEvents(PUBLISHER_EVENT_TYPE, + Stream.of(mockPayload1, mockPayload2).collect(Collectors.toList())); + + verify(eventLogRepository).persist(eventLogsCapture.capture()); + + Iterator eventLogIterator = eventLogsCapture.getValue().iterator(); + EventLog eventLog1 = eventLogIterator.next(); + assertThat(eventLog1.getEventBodyData(), is(OBJECT_MAPPER.writeValueAsString(mockPayload1))); + assertThat(eventLog1.getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLog1.getFlowId(), is(TRACE_ID)); + assertThat(eventLog1.getLockedBy(), is(nullValue())); + assertThat(eventLog1.getLockedUntil(), is(nullValue())); + + EventLog eventLog2 = eventLogIterator.next(); + assertThat(eventLog2.getEventBodyData(), is(OBJECT_MAPPER.writeValueAsString(mockPayload2))); + assertThat(eventLog2.getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(eventLog2.getFlowId(), is(TRACE_ID)); + assertThat(eventLog2.getLockedBy(), is(nullValue())); + assertThat(eventLog2.getLockedUntil(), is(nullValue())); + } + + private void verifyEventLogs(String dataOp, Set eventLogs) { + Optional firstEventLog = eventLogs.stream().filter( + eventLog -> eventLog.getEventBodyData().contains("mockedcode1")).findFirst(); + assertThat(firstEventLog.isPresent(), is(true)); + assertThat(firstEventLog.get().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_1.replace("{DATA_OP}", dataOp))); + assertThat(firstEventLog.get().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(firstEventLog.get().getFlowId(), is(TRACE_ID)); + assertThat(firstEventLog.get().getLockedBy(), is(nullValue())); + assertThat(firstEventLog.get().getLockedUntil(), is(nullValue())); + + Optional secondEventLog = eventLogs.stream().filter( + eventLog -> eventLog.getEventBodyData().contains("mockedcode2")).findFirst(); + assertThat(secondEventLog.isPresent(), is(true)); + assertThat(secondEventLog.get().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_2.replace("{DATA_OP}", dataOp))); + assertThat(secondEventLog.get().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(secondEventLog.get().getFlowId(), is(TRACE_ID)); + assertThat(secondEventLog.get().getLockedBy(), is(nullValue())); + assertThat(secondEventLog.get().getLockedUntil(), is(nullValue())); + + Optional thirdEventLog = eventLogs.stream().filter( + eventLog -> eventLog.getEventBodyData().contains("mockedcode3")).findFirst(); + assertThat(thirdEventLog.isPresent(), is(true)); + assertThat(thirdEventLog.get().getEventBodyData(), is( + DATA_CHANGE_BODY_DATA_3.replace("{DATA_OP}", dataOp))); + assertThat(thirdEventLog.get().getEventType(), is(PUBLISHER_EVENT_TYPE)); + assertThat(thirdEventLog.get().getFlowId(), is(TRACE_ID)); + assertThat(thirdEventLog.get().getLockedBy(), is(nullValue())); + assertThat(thirdEventLog.get().getLockedUntil(), is(nullValue())); + } } diff --git a/pom.xml b/pom.xml index 3b55880d..38af18ee 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ nakadi-producer-reactor org.zalando - 20.3.2 + 20.4.0 pom Nakadi Event Producer Reactor