Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #804 from zalando/aruha-1387-faster-stream-initial…
Browse files Browse the repository at this point in the history
…ization

Aruha 1387 faster stream initialization
  • Loading branch information
antban authored Dec 14, 2017
2 parents 0ca1b8f + bdbf665 commit fc6739e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 78 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- Optimized stream initialization

## [2.3.3] - 2017-12-12

### Added
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.zalando.nakadi.service.subscription.state;

import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.NoStreamingSlotsAvailable;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.subscription.model.Partition;
Expand All @@ -15,11 +18,12 @@

import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.groupingBy;

public class StartingState extends State {
@Override
Expand Down Expand Up @@ -113,19 +117,30 @@ public List<SubscriptionCursorWithoutToken> calculate(
final Subscription subscription,
final TimelineService timelineService,
final CursorConverter converter) {
final List<SubscriptionCursorWithoutToken> result = new ArrayList<>();
try {
for (final String eventType : subscription.getEventTypes()) {
final List<Timeline> activeTimelines = timelineService.getActiveTimelinesOrdered(eventType);
final Timeline timeline = activeTimelines.get(0);
timelineService.getTopicRepository(timeline)
.loadTopicStatistics(Collections.singletonList(timeline))
.forEach(stat -> result.add(converter.convertToNoToken(stat.getBeforeFirst())));
}
return result;
} catch (final Exception ex) {
throw new NakadiRuntimeException(ex);
}
return subscription.getEventTypes()
.stream()
.map(et -> {
try {
// get oldest active timeline
return timelineService.getActiveTimelinesOrdered(et).get(0);
} catch (final NakadiException e) {
throw new NakadiRuntimeException(e);
}
})
.collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387
.values()
.stream()
.flatMap(timelines -> {
try {
return timelineService.getTopicRepository(timelines.get(0))
.loadTopicStatistics(timelines).stream();
} catch (final ServiceUnavailableException e) {
throw new NakadiRuntimeException(e);
}
})
.map(PartitionStatistics::getBeforeFirst)
.map(converter::convertToNoToken)
.collect(Collectors.toList());
}
}

Expand All @@ -140,19 +155,31 @@ public List<SubscriptionCursorWithoutToken> calculate(
final Subscription subscription,
final TimelineService timelineService,
final CursorConverter converter) {
final List<SubscriptionCursorWithoutToken> result = new ArrayList<>();
try {
for (final String eventType : subscription.getEventTypes()) {
final List<Timeline> activeTimelines = timelineService.getActiveTimelinesOrdered(eventType);
final Timeline timeline = activeTimelines.get(activeTimelines.size() - 1);
timelineService.getTopicRepository(timeline)
.loadTopicStatistics(Collections.singletonList(timeline))
.forEach(stat -> result.add(converter.convertToNoToken(stat.getLast())));
}
return result;
} catch (final Exception ex) {
throw new NakadiRuntimeException(ex);
}
return subscription.getEventTypes()
.stream()
.map(et -> {
try {
// get newest active timeline
final List<Timeline> activeTimelines = timelineService.getActiveTimelinesOrdered(et);
return activeTimelines.get(activeTimelines.size() - 1);
} catch (final NakadiException e) {
throw new NakadiRuntimeException(e);
}
})
.collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387
.values()
.stream()
.flatMap(timelines -> {
try {
return timelineService.getTopicRepository(timelines.get(0))
.loadTopicEndStatistics(timelines).stream();
} catch (final ServiceUnavailableException e) {
throw new NakadiRuntimeException(e);
}
})
.map(PartitionEndStatistics::getLast)
.map(converter::convertToNoToken)
.collect(Collectors.toList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.exceptions.InvalidCursorException;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.metrics.MetricUtils;
import org.zalando.nakadi.repository.EventConsumer;
Expand All @@ -24,7 +22,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -38,6 +35,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.groupingBy;


class StreamingState extends State {
private final Map<EventTypePartition, PartitionData> offsets = new HashMap<>();
Expand Down Expand Up @@ -441,31 +440,48 @@ private void reconfigureKafkaConsumer(final boolean forceSeek) {

if (!currentAssignment.equals(newAssignment)) {
try {
final List<NakadiCursor> cursors = new ArrayList<>();
for (final EventTypePartition pk : newAssignment) {
// Next 2 lines checks that current cursor is still available in storage
final NakadiCursor beforeFirstAvailable = getBeforeFirstCursor(pk);
offsets.get(pk).ensureDataAvailable(beforeFirstAvailable);
// Now it is safe to reposition.
cursors.add(offsets.get(pk).getSentOffset());
}
final Map<EventTypePartition, NakadiCursor> beforeFirst = getBeforeFirstCursors(newAssignment);
final List<NakadiCursor> cursors = newAssignment.stream()
.map(pk -> {
final NakadiCursor beforeFirstAvailable = beforeFirst.get(pk);

// Checks that current cursor is still available in storage
offsets.get(pk).ensureDataAvailable(beforeFirstAvailable);
return offsets.get(pk).getSentOffset();
})
.collect(Collectors.toList());

eventConsumer.reassign(cursors);
} catch (NakadiException | InvalidCursorException ex) {
throw new NakadiRuntimeException(ex);
}
}
}

private NakadiCursor getBeforeFirstCursor(final EventTypePartition pk)
throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException {
final Timeline firstTimelineForET = getContext().getTimelineService()
.getActiveTimelinesOrdered(pk.getEventType()).get(0);

final Optional<PartitionStatistics> stats = getContext().getTimelineService()
.getTopicRepository(firstTimelineForET)
.loadPartitionStatistics(firstTimelineForET, pk.getPartition());

return stats.get().getBeforeFirst();
private Map<EventTypePartition, NakadiCursor> getBeforeFirstCursors(final Set<EventTypePartition> newAssignment) {
return newAssignment.stream()
.map(EventTypePartition::getEventType)
.map(et -> {
try {
// get oldest active timeline
return getContext().getTimelineService().getActiveTimelinesOrdered(et).get(0);
} catch (final NakadiException e) {
throw new NakadiRuntimeException(e);
}
})
.collect(groupingBy(Timeline::getStorage)) // for performance reasons. See ARUHA-1387
.values()
.stream()
.flatMap(timelines -> {
try {
return getContext().getTimelineService().getTopicRepository(timelines.get(0))
.loadTopicStatistics(timelines).stream();
} catch (final ServiceUnavailableException e) {
throw new NakadiRuntimeException(e);
}
})
.map(PartitionStatistics::getBeforeFirst)
.collect(Collectors.toMap(NakadiCursor::getEventTypePartition, cursor -> cursor));
}

private NakadiCursor createNakadiCursor(final SubscriptionCursorWithoutToken cursor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.Test;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.domain.Timeline;
Expand All @@ -17,9 +18,9 @@
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.util.Collections;
import java.util.List;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -57,32 +58,29 @@ public void setUp() throws Exception {
public void testGetSubscriptionOffsetsBegin() throws Exception {
when(subscription.getReadFrom()).thenReturn(SubscriptionBase.InitialPosition.BEGIN);


final NakadiCursor beforeBegin0 = mock(NakadiCursor.class);
final SubscriptionCursorWithoutToken beforeBegin0Converted = mock(SubscriptionCursorWithoutToken.class);
when(cursorConverter.convertToNoToken(eq(beforeBegin0))).thenReturn(beforeBegin0Converted);
final NakadiCursor beforeBegin1 = mock(NakadiCursor.class);
final SubscriptionCursorWithoutToken beforeBegin1Converted = mock(SubscriptionCursorWithoutToken.class);
when(cursorConverter.convertToNoToken(eq(beforeBegin1))).thenReturn(beforeBegin1Converted);

final TopicRepository firstTR = mock(TopicRepository.class);
final TopicRepository topicRepository = mock(TopicRepository.class);

final PartitionStatistics resultForTopic0 = mock(PartitionStatistics.class);
when(resultForTopic0.getBeforeFirst()).thenReturn(beforeBegin0);

final List<PartitionStatistics> resultForTopic0 = Collections.singletonList(
mock(PartitionStatistics.class));
when(resultForTopic0.get(0).getBeforeFirst()).thenReturn(beforeBegin0);
when(firstTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt00))))
.thenReturn(resultForTopic0);
final PartitionStatistics resultForTopic1 = mock(PartitionStatistics.class);
when(resultForTopic1.getBeforeFirst()).thenReturn(beforeBegin1);

final TopicRepository secondTR = mock(TopicRepository.class);
final List<PartitionStatistics> resultForTopic1 = Collections.singletonList(
mock(PartitionStatistics.class));
when(resultForTopic1.get(0).getBeforeFirst()).thenReturn(beforeBegin1);
when(topicRepository.loadTopicStatistics(any()))
.thenReturn(Lists.newArrayList(resultForTopic0, resultForTopic1));

when(secondTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt10))))
.thenReturn(resultForTopic1);
when(timelineService.getTopicRepository(eq(timelineEt00))).thenReturn(topicRepository);

when(timelineService.getTopicRepository(eq(timelineEt00))).thenReturn(firstTR);
when(timelineService.getTopicRepository(eq(timelineEt10))).thenReturn(secondTR);
final Storage storage = mock(Storage.class);
when(timelineEt00.getStorage()).thenReturn(storage);
when(timelineEt10.getStorage()).thenReturn(storage);

final List<SubscriptionCursorWithoutToken> cursors = StartingState.calculateStartPosition(
subscription, timelineService, cursorConverter);
Expand All @@ -103,18 +101,20 @@ public void testGetSubscriptionOffsetsEnd() throws Exception {
final SubscriptionCursorWithoutToken end1Converted = mock(SubscriptionCursorWithoutToken.class);
when(cursorConverter.convertToNoToken(eq(end1))).thenReturn(end1Converted);

final TopicRepository firstTR = mock(TopicRepository.class);
final List<PartitionStatistics> statsForEt0 = Collections.singletonList(mock(PartitionStatistics.class));
when(statsForEt0.get(0).getLast()).thenReturn(end0);
when(firstTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt01)))).thenReturn(statsForEt0);
final TopicRepository topicRepository = mock(TopicRepository.class);
final PartitionStatistics statsForEt0 = mock(PartitionStatistics.class);
when(statsForEt0.getLast()).thenReturn(end0);

final PartitionStatistics statsForTopic1 = mock(PartitionStatistics.class);
when(statsForTopic1.getLast()).thenReturn(end1);

when(topicRepository.loadTopicEndStatistics(any())).thenReturn(Lists.newArrayList(statsForEt0, statsForTopic1));

final TopicRepository secondTR = mock(TopicRepository.class);
final List<PartitionStatistics> statsForTopic1 = Collections.singletonList(mock(PartitionStatistics.class));
when(statsForTopic1.get(0).getLast()).thenReturn(end1);
when(secondTR.loadTopicStatistics(eq(Collections.singletonList(timelineEt11)))).thenReturn(statsForTopic1);
when(timelineService.getTopicRepository(eq(timelineEt01))).thenReturn(topicRepository);

when(timelineService.getTopicRepository(eq(timelineEt01))).thenReturn(firstTR);
when(timelineService.getTopicRepository(eq(timelineEt11))).thenReturn(secondTR);
final Storage storage = mock(Storage.class);
when(timelineEt01.getStorage()).thenReturn(storage);
when(timelineEt11.getStorage()).thenReturn(storage);

final List<SubscriptionCursorWithoutToken> cursors = StartingState.calculateStartPosition(
subscription, timelineService, cursorConverter);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.zalando.nakadi.service.subscription.state;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InternalNakadiException;
Expand All @@ -27,7 +29,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Optional;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
Expand Down Expand Up @@ -123,13 +124,14 @@ public void ensureOffsetsSubscriptionsAreRefreshedAndClosed()
when(timelineService.createEventConsumer(any())).thenReturn(consumer);
when(subscription.getEventTypes()).thenReturn(Collections.singleton("t"));

final Timeline timeline = new Timeline("t", 0, null, "t", new Date());
final Storage storage = mock(Storage.class);
final Timeline timeline = new Timeline("t", 0, storage, "t", new Date());
when(timelineService.getActiveTimelinesOrdered(eq("t"))).thenReturn(Collections.singletonList(timeline));
final TopicRepository topicRepository = mock(TopicRepository.class);
when(timelineService.getTopicRepository(eq(timeline))).thenReturn(topicRepository);
final PartitionStatistics stats = mock(PartitionStatistics.class);
when(stats.getBeforeFirst()).thenReturn(new NakadiCursor(timeline, "0", "0"));
when(topicRepository.loadPartitionStatistics(eq(timeline), eq("0"))).thenReturn(Optional.of(stats));
when(topicRepository.loadTopicStatistics(any())).thenReturn(Lists.newArrayList(stats));

state.onEnter();

Expand Down

0 comments on commit fc6739e

Please sign in to comment.