diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java index 645fef1f..be30a262 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationService.java @@ -2,7 +2,7 @@ import static java.util.Collections.unmodifiableSet; import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.*; import java.util.List; import java.util.Map; @@ -50,10 +50,12 @@ public void createSnapshotEvents(final String eventType, String filter) { break; } - for (final Snapshot snapshot : snapshots) { - eventLogWriter.fireSnapshotEvent(eventType, snapshot.getDataType(), snapshot.getData()); - lastProcessedId = snapshot.getId(); - } + snapshots.stream() + .collect(groupingBy(Snapshot::getDataType, mapping(Snapshot::getData, toList()))) + .forEach((dataType, snapshotPartition) -> + eventLogWriter.fireSnapshotEvents(eventType, dataType, snapshotPartition)); + + lastProcessedId = snapshots.get(snapshots.size()-1).getId(); } while (true); } diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java index ea96d066..efb59285 100644 --- a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/snapshots/impl/SnapshotCreationServiceTest.java @@ -3,7 +3,10 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.Is.is; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -13,8 +16,11 @@ import static org.zalando.nakadiproducer.util.Fixture.PUBLISHER_DATA_TYPE; import static org.zalando.nakadiproducer.util.Fixture.PUBLISHER_EVENT_TYPE; +import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -41,18 +47,14 @@ public class SnapshotCreationServiceTest { private SnapshotCreationService snapshotCreationService; - private MockPayload eventPayload; - @Rule public final ExpectedException expectedException = ExpectedException.none(); @Captor - private ArgumentCaptor listEventLogCaptor; + private ArgumentCaptor> eventLogDataCaptor; @Before public void setUp() throws Exception { - eventPayload = Fixture.mockPayload(1, "mockedcode", true, - Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); when(snapshotEventGenerator.getSupportedEventType()).thenReturn(PUBLISHER_EVENT_TYPE); snapshotCreationService = new SnapshotCreationService(asList(snapshotEventGenerator), eventLogWriter); } @@ -61,32 +63,43 @@ public void setUp() throws Exception { public void testCreateSnapshotEvents() { final String filter = "exampleFilter"; + MockPayload eventPayload = Fixture.mockPayload(1, "mockedcode", true, + Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail")); + when(snapshotEventGenerator.generateSnapshots(null, filter)).thenReturn( singletonList(new Snapshot(1, PUBLISHER_DATA_TYPE, eventPayload))); snapshotCreationService.createSnapshotEvents(PUBLISHER_EVENT_TYPE, filter); - verify(eventLogWriter).fireSnapshotEvent(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE), - listEventLogCaptor.capture()); - assertThat(listEventLogCaptor.getValue(), is(eventPayload)); + verify(eventLogWriter).fireSnapshotEvents(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE), + eventLogDataCaptor.capture()); + assertThat(eventLogDataCaptor.getValue(), Matchers.contains(eventPayload)); } @Test public void testSnapshotSavedInBatches() { final String filter = "exampleFilter2"; - final List eventPayloads = Fixture.mockSnapshotList(5); + final List eventSnapshots = Fixture.mockSnapshotList(5); // when snapshot returns 5 item stream - when(snapshotEventGenerator.generateSnapshots(null, filter)).thenReturn(eventPayloads.subList(0, 3)); - when(snapshotEventGenerator.generateSnapshots(2, filter)).thenReturn(eventPayloads.subList(3, 5)); + when(snapshotEventGenerator.generateSnapshots(null, filter)).thenReturn(eventSnapshots.subList(0, 3)); + when(snapshotEventGenerator.generateSnapshots(2, filter)).thenReturn(eventSnapshots.subList(3, 5)); when(snapshotEventGenerator.generateSnapshots(4, filter)).thenReturn(emptyList()); // create a snapshot snapshotCreationService.createSnapshotEvents(PUBLISHER_EVENT_TYPE, filter); // verify that all returned events got written - verify(eventLogWriter, times(5)).fireSnapshotEvent(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE), - isA(MockPayload.class)); + verify(eventLogWriter, times(2)).fireSnapshotEvents(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE), + eventLogDataCaptor.capture()); + + List payloads = eventSnapshots.stream().map(Snapshot::getData).collect(toList()); + List writtenEvents = eventLogDataCaptor + .getAllValues() + .stream() + .flatMap(Collection::stream) + .collect(toList()); + assertThat(writtenEvents, is(equalTo(payloads))); } }