diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f92d65eeb..288abe10c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Changed +- Optimized stream initialization + ## [2.3.3] - 2017-12-12 ### Added diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java index dc1259c0e9..6dee5d3ad6 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java @@ -1,11 +1,14 @@ package org.zalando.nakadi.service.subscription.state; +import org.zalando.nakadi.domain.PartitionEndStatistics; +import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.NakadiRuntimeException; import org.zalando.nakadi.exceptions.NoStreamingSlotsAvailable; +import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.subscription.model.Partition; @@ -15,11 +18,12 @@ import javax.ws.rs.core.Response; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; public class StartingState extends State { @Override @@ -113,19 +117,30 @@ public List calculate( final Subscription subscription, final TimelineService timelineService, final CursorConverter converter) { - final List result = new ArrayList<>(); - try { - for (final String eventType : subscription.getEventTypes()) { - final List activeTimelines = timelineService.getActiveTimelinesOrdered(eventType); - final Timeline timeline = activeTimelines.get(0); - timelineService.getTopicRepository(timeline) - .loadTopicStatistics(Collections.singletonList(timeline)) - .forEach(stat -> result.add(converter.convertToNoToken(stat.getBeforeFirst()))); - } - return result; - } catch (final Exception ex) { - throw new NakadiRuntimeException(ex); - } + return subscription.getEventTypes() + .stream() + .map(et -> { + try { + // get oldest active timeline + return timelineService.getActiveTimelinesOrdered(et).get(0); + } catch (final NakadiException e) { + throw new NakadiRuntimeException(e); + } + }) + .collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387 + .values() + .stream() + .flatMap(timelines -> { + try { + return timelineService.getTopicRepository(timelines.get(0)) + .loadTopicStatistics(timelines).stream(); + } catch (final ServiceUnavailableException e) { + throw new NakadiRuntimeException(e); + } + }) + .map(PartitionStatistics::getBeforeFirst) + .map(converter::convertToNoToken) + .collect(Collectors.toList()); } } @@ -140,19 +155,31 @@ public List calculate( final Subscription subscription, final TimelineService timelineService, final CursorConverter converter) { - final List result = new ArrayList<>(); - try { - for (final String eventType : subscription.getEventTypes()) { - final List activeTimelines = timelineService.getActiveTimelinesOrdered(eventType); - final Timeline timeline = activeTimelines.get(activeTimelines.size() - 1); - timelineService.getTopicRepository(timeline) - .loadTopicStatistics(Collections.singletonList(timeline)) - .forEach(stat -> result.add(converter.convertToNoToken(stat.getLast()))); - } - return result; - } catch (final Exception ex) { - throw new NakadiRuntimeException(ex); - } + return subscription.getEventTypes() + .stream() + .map(et -> { + try { + // get newest active timeline + final List activeTimelines = timelineService.getActiveTimelinesOrdered(et); + return activeTimelines.get(activeTimelines.size() - 1); + } catch (final NakadiException e) { + throw new NakadiRuntimeException(e); + } + }) + .collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387 + .values() + .stream() + .flatMap(timelines -> { + try { + return timelineService.getTopicRepository(timelines.get(0)) + .loadTopicEndStatistics(timelines).stream(); + } catch (final ServiceUnavailableException e) { + throw new NakadiRuntimeException(e); + } + }) + .map(PartitionEndStatistics::getLast) + .map(converter::convertToNoToken) + .collect(Collectors.toList()); } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index c3db07fdeb..b1e6c128e8 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -8,11 +8,9 @@ import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Timeline; -import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.NakadiRuntimeException; -import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.metrics.MetricUtils; import org.zalando.nakadi.repository.EventConsumer; @@ -24,7 +22,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -38,6 +35,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.stream.Collectors.groupingBy; + class StreamingState extends State { private final Map offsets = new HashMap<>(); @@ -441,14 +440,17 @@ private void reconfigureKafkaConsumer(final boolean forceSeek) { if (!currentAssignment.equals(newAssignment)) { try { - final List cursors = new ArrayList<>(); - for (final EventTypePartition pk : newAssignment) { - // Next 2 lines checks that current cursor is still available in storage - final NakadiCursor beforeFirstAvailable = getBeforeFirstCursor(pk); - offsets.get(pk).ensureDataAvailable(beforeFirstAvailable); - // Now it is safe to reposition. - cursors.add(offsets.get(pk).getSentOffset()); - } + final Map beforeFirst = getBeforeFirstCursors(newAssignment); + final List cursors = newAssignment.stream() + .map(pk -> { + final NakadiCursor beforeFirstAvailable = beforeFirst.get(pk); + + // Checks that current cursor is still available in storage + offsets.get(pk).ensureDataAvailable(beforeFirstAvailable); + return offsets.get(pk).getSentOffset(); + }) + .collect(Collectors.toList()); + eventConsumer.reassign(cursors); } catch (NakadiException | InvalidCursorException ex) { throw new NakadiRuntimeException(ex); @@ -456,16 +458,30 @@ private void reconfigureKafkaConsumer(final boolean forceSeek) { } } - private NakadiCursor getBeforeFirstCursor(final EventTypePartition pk) - throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException { - final Timeline firstTimelineForET = getContext().getTimelineService() - .getActiveTimelinesOrdered(pk.getEventType()).get(0); - - final Optional stats = getContext().getTimelineService() - .getTopicRepository(firstTimelineForET) - .loadPartitionStatistics(firstTimelineForET, pk.getPartition()); - - return stats.get().getBeforeFirst(); + private Map getBeforeFirstCursors(final Set newAssignment) { + return newAssignment.stream() + .map(EventTypePartition::getEventType) + .map(et -> { + try { + // get oldest active timeline + return getContext().getTimelineService().getActiveTimelinesOrdered(et).get(0); + } catch (final NakadiException e) { + throw new NakadiRuntimeException(e); + } + }) + .collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387 + .values() + .stream() + .flatMap(timelines -> { + try { + return getContext().getTimelineService().getTopicRepository(timelines.get(0)) + .loadTopicStatistics(timelines).stream(); + } catch (final ServiceUnavailableException e) { + throw new NakadiRuntimeException(e); + } + }) + .map(PartitionStatistics::getBeforeFirst) + .collect(Collectors.toMap(NakadiCursor::getEventTypePartition, cursor -> cursor)); } private NakadiCursor createNakadiCursor(final SubscriptionCursorWithoutToken cursor) { diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java index 834d942d8f..44bda1784f 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StartingStateTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; +import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.domain.Timeline; @@ -17,9 +18,9 @@ import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; -import java.util.Collections; import java.util.List; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -57,7 +58,6 @@ public void setUp() throws Exception { public void testGetSubscriptionOffsetsBegin() throws Exception { when(subscription.getReadFrom()).thenReturn(SubscriptionBase.InitialPosition.BEGIN); - final NakadiCursor beforeBegin0 = mock(NakadiCursor.class); final SubscriptionCursorWithoutToken beforeBegin0Converted = mock(SubscriptionCursorWithoutToken.class); when(cursorConverter.convertToNoToken(eq(beforeBegin0))).thenReturn(beforeBegin0Converted); @@ -65,24 +65,22 @@ public void testGetSubscriptionOffsetsBegin() throws Exception { final SubscriptionCursorWithoutToken beforeBegin1Converted = mock(SubscriptionCursorWithoutToken.class); when(cursorConverter.convertToNoToken(eq(beforeBegin1))).thenReturn(beforeBegin1Converted); - final TopicRepository firstTR = mock(TopicRepository.class); + final TopicRepository topicRepository = mock(TopicRepository.class); + + final PartitionStatistics resultForTopic0 = mock(PartitionStatistics.class); + when(resultForTopic0.getBeforeFirst()).thenReturn(beforeBegin0); - final List resultForTopic0 = Collections.singletonList( - mock(PartitionStatistics.class)); - when(resultForTopic0.get(0).getBeforeFirst()).thenReturn(beforeBegin0); - when(firstTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt00)))) - .thenReturn(resultForTopic0); + final PartitionStatistics resultForTopic1 = mock(PartitionStatistics.class); + when(resultForTopic1.getBeforeFirst()).thenReturn(beforeBegin1); - final TopicRepository secondTR = mock(TopicRepository.class); - final List resultForTopic1 = Collections.singletonList( - mock(PartitionStatistics.class)); - when(resultForTopic1.get(0).getBeforeFirst()).thenReturn(beforeBegin1); + when(topicRepository.loadTopicStatistics(any())) + .thenReturn(Lists.newArrayList(resultForTopic0, resultForTopic1)); - when(secondTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt10)))) - .thenReturn(resultForTopic1); + when(timelineService.getTopicRepository(eq(timelineEt00))).thenReturn(topicRepository); - when(timelineService.getTopicRepository(eq(timelineEt00))).thenReturn(firstTR); - when(timelineService.getTopicRepository(eq(timelineEt10))).thenReturn(secondTR); + final Storage storage = mock(Storage.class); + when(timelineEt00.getStorage()).thenReturn(storage); + when(timelineEt10.getStorage()).thenReturn(storage); final List cursors = StartingState.calculateStartPosition( subscription, timelineService, cursorConverter); @@ -103,18 +101,20 @@ public void testGetSubscriptionOffsetsEnd() throws Exception { final SubscriptionCursorWithoutToken end1Converted = mock(SubscriptionCursorWithoutToken.class); when(cursorConverter.convertToNoToken(eq(end1))).thenReturn(end1Converted); - final TopicRepository firstTR = mock(TopicRepository.class); - final List statsForEt0 = Collections.singletonList(mock(PartitionStatistics.class)); - when(statsForEt0.get(0).getLast()).thenReturn(end0); - when(firstTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt01)))).thenReturn(statsForEt0); + final TopicRepository topicRepository = mock(TopicRepository.class); + final PartitionStatistics statsForEt0 = mock(PartitionStatistics.class); + when(statsForEt0.getLast()).thenReturn(end0); + + final PartitionStatistics statsForTopic1 = mock(PartitionStatistics.class); + when(statsForTopic1.getLast()).thenReturn(end1); + + when(topicRepository.loadTopicEndStatistics(any())).thenReturn(Lists.newArrayList(statsForEt0, statsForTopic1)); - final TopicRepository secondTR = mock(TopicRepository.class); - final List statsForTopic1 = Collections.singletonList(mock(PartitionStatistics.class)); - when(statsForTopic1.get(0).getLast()).thenReturn(end1); - when(secondTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt11)))).thenReturn(statsForTopic1); + when(timelineService.getTopicRepository(eq(timelineEt01))).thenReturn(topicRepository); - when(timelineService.getTopicRepository(eq(timelineEt01))).thenReturn(firstTR); - when(timelineService.getTopicRepository(eq(timelineEt11))).thenReturn(secondTR); + final Storage storage = mock(Storage.class); + when(timelineEt01.getStorage()).thenReturn(storage); + when(timelineEt11.getStorage()).thenReturn(storage); final List cursors = StartingState.calculateStartPosition( subscription, timelineService, cursorConverter); diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index 10c4a20df2..9e1b3a8828 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -1,12 +1,14 @@ package org.zalando.nakadi.service.subscription.state; import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; +import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.InternalNakadiException; @@ -27,7 +29,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; -import java.util.Optional; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -123,13 +124,14 @@ public void ensureOffsetsSubscriptionsAreRefreshedAndClosed() when(timelineService.createEventConsumer(any())).thenReturn(consumer); when(subscription.getEventTypes()).thenReturn(Collections.singleton("t")); - final Timeline timeline = new Timeline("t", 0, null, "t", new Date()); + final Storage storage = mock(Storage.class); + final Timeline timeline = new Timeline("t", 0, storage, "t", new Date()); when(timelineService.getActiveTimelinesOrdered(eq("t"))).thenReturn(Collections.singletonList(timeline)); final TopicRepository topicRepository = mock(TopicRepository.class); when(timelineService.getTopicRepository(eq(timeline))).thenReturn(topicRepository); final PartitionStatistics stats = mock(PartitionStatistics.class); when(stats.getBeforeFirst()).thenReturn(new NakadiCursor(timeline, "0", "0")); - when(topicRepository.loadPartitionStatistics(eq(timeline), eq("0"))).thenReturn(Optional.of(stats)); + when(topicRepository.loadTopicStatistics(any())).thenReturn(Lists.newArrayList(stats)); state.onEnter();