Skip to content

Commit

Permalink
use batching for snapshots, too (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
ePaul committed Nov 29, 2022
1 parent 7c6b572 commit adc803f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static java.util.Collections.unmodifiableSet;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.*;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -50,10 +50,12 @@ public void createSnapshotEvents(final String eventType, String filter) {
break;
}

for (final Snapshot snapshot : snapshots) {
eventLogWriter.fireSnapshotEvent(eventType, snapshot.getDataType(), snapshot.getData());
lastProcessedId = snapshot.getId();
}
snapshots.stream()
.collect(groupingBy(Snapshot::getDataType, mapping(Snapshot::getData, toList())))
.forEach((dataType, snapshotPartition) ->
eventLogWriter.fireSnapshotEvents(eventType, dataType, snapshotPartition));

lastProcessedId = snapshots.get(snapshots.size()-1).getId();
} while (true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand All @@ -13,8 +16,11 @@
import static org.zalando.nakadiproducer.util.Fixture.PUBLISHER_DATA_TYPE;
import static org.zalando.nakadiproducer.util.Fixture.PUBLISHER_EVENT_TYPE;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -41,18 +47,14 @@ public class SnapshotCreationServiceTest {

private SnapshotCreationService snapshotCreationService;

private MockPayload eventPayload;

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Captor
private ArgumentCaptor<MockPayload> listEventLogCaptor;
private ArgumentCaptor<Collection<?>> eventLogDataCaptor;

@Before
public void setUp() throws Exception {
eventPayload = Fixture.mockPayload(1, "mockedcode", true,
Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail"));
when(snapshotEventGenerator.getSupportedEventType()).thenReturn(PUBLISHER_EVENT_TYPE);
snapshotCreationService = new SnapshotCreationService(asList(snapshotEventGenerator), eventLogWriter);
}
Expand All @@ -61,32 +63,43 @@ public void setUp() throws Exception {
public void testCreateSnapshotEvents() {
final String filter = "exampleFilter";

MockPayload eventPayload = Fixture.mockPayload(1, "mockedcode", true,
Fixture.mockSubClass("some info"), Fixture.mockSubList(2, "some detail"));

when(snapshotEventGenerator.generateSnapshots(null, filter)).thenReturn(
singletonList(new Snapshot(1, PUBLISHER_DATA_TYPE, eventPayload)));

snapshotCreationService.createSnapshotEvents(PUBLISHER_EVENT_TYPE, filter);

verify(eventLogWriter).fireSnapshotEvent(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE),
listEventLogCaptor.capture());
assertThat(listEventLogCaptor.getValue(), is(eventPayload));
verify(eventLogWriter).fireSnapshotEvents(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE),
eventLogDataCaptor.capture());
assertThat(eventLogDataCaptor.getValue(), Matchers.contains(eventPayload));
}

@Test
public void testSnapshotSavedInBatches() {
final String filter = "exampleFilter2";

final List<Snapshot> eventPayloads = Fixture.mockSnapshotList(5);
final List<Snapshot> eventSnapshots = Fixture.mockSnapshotList(5);

// when snapshot returns 5 item stream
when(snapshotEventGenerator.generateSnapshots(null, filter)).thenReturn(eventPayloads.subList(0, 3));
when(snapshotEventGenerator.generateSnapshots(2, filter)).thenReturn(eventPayloads.subList(3, 5));
when(snapshotEventGenerator.generateSnapshots(null, filter)).thenReturn(eventSnapshots.subList(0, 3));
when(snapshotEventGenerator.generateSnapshots(2, filter)).thenReturn(eventSnapshots.subList(3, 5));
when(snapshotEventGenerator.generateSnapshots(4, filter)).thenReturn(emptyList());

// create a snapshot
snapshotCreationService.createSnapshotEvents(PUBLISHER_EVENT_TYPE, filter);

// verify that all returned events got written
verify(eventLogWriter, times(5)).fireSnapshotEvent(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE),
isA(MockPayload.class));
verify(eventLogWriter, times(2)).fireSnapshotEvents(eq(PUBLISHER_EVENT_TYPE), eq(PUBLISHER_DATA_TYPE),
eventLogDataCaptor.capture());

List<?> payloads = eventSnapshots.stream().map(Snapshot::getData).collect(toList());
List<?> writtenEvents = eventLogDataCaptor
.getAllValues()
.stream()
.flatMap(Collection::stream)
.collect(toList());
assertThat(writtenEvents, is(equalTo(payloads)));
}
}

0 comments on commit adc803f

Please sign in to comment.