Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #797 from zalando/ARUHA-1330
Browse files Browse the repository at this point in the history
ARUHA-1330 Fix nakadi-cursors comparison and usage of finished timelines
  • Loading branch information
antban authored Dec 14, 2017
2 parents c1a3f55 + 2677749 commit 0ca1b8f
Show file tree
Hide file tree
Showing 23 changed files with 401 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InvalidStreamIdException;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
Expand Down Expand Up @@ -66,7 +65,6 @@ public class CursorsServiceAT extends BaseAT {

private CursorConverter cursorConverter;
private CursorsService cursorsService;
private EventTypeRepository eventTypeRepository;
private UUIDGenerator uuidGenerator;

private String etName;
Expand Down Expand Up @@ -99,9 +97,6 @@ public void before() throws Exception {
final EventType eventType = mock(EventType.class);
when(eventType.getName()).thenReturn(etName);

eventTypeRepository = mock(EventTypeRepository.class);
when(eventTypeRepository.findByName(etName)).thenReturn(eventType);

final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class);
when(zkHolder.get()).thenReturn(CURATOR);

Expand All @@ -120,7 +115,7 @@ public void before() throws Exception {
final SubscriptionClientFactory zkSubscriptionFactory = new SubscriptionClientFactory(zkHolder, MAPPER);
uuidGenerator = mock(UUIDGenerator.class);
when(uuidGenerator.isUUID(any())).thenReturn(true);
cursorsService = new CursorsService(timelineService, subscriptionRepo, eventTypeRepository,
cursorsService = new CursorsService(timelineService, subscriptionRepo, null,
mock(NakadiSettings.class), zkSubscriptionFactory, cursorConverter, uuidGenerator);

// Register cursors in converter
Expand Down
21 changes: 6 additions & 15 deletions src/main/java/org/zalando/nakadi/domain/NakadiCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import java.util.Objects;

public class NakadiCursor implements Comparable<NakadiCursor> {
public class NakadiCursor {
public static final int VERSION_LENGTH = 3;

/**
* - ZERO is reserved for old offset format, e.g. those previous to timelines: "000000000000000010"
* - ONE is reserved for the first version of timeline offsets: "001-0001-0000000000000001"
Expand Down Expand Up @@ -80,15 +81,6 @@ public boolean equals(final Object o) {
&& Objects.equals(this.offset, that.offset);
}

@Override
public int compareTo(final NakadiCursor other) {
final int orderDiffers = Integer.compare(this.getTimeline().getOrder(), other.getTimeline().getOrder());
if (0 != orderDiffers) {
return orderDiffers;
}
return this.getOffset().compareTo(other.getOffset());
}

@Override
public int hashCode() {
int result = timeline.hashCode();
Expand All @@ -99,11 +91,10 @@ public int hashCode() {

@Override
public String toString() {
return "NakadiCursor{" +
"partition='" + partition + '\'' +
", offset='" + offset + '\'' +
", timeline='" + timeline + '\'' +
'}';
//Ok, it's time to compact the message.
return "T(" + Timeline.debugString(timeline) + ")-" +
"P(" + partition + ")-" +
"O(" + offset + ")";
}

}
8 changes: 8 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ public String toString() {
private Type type;
private Object configuration;

public Storage() {
}

public Storage(final String id, final Type type) {
this.id = id;
this.type = type;
}

public String getId() {
return id;
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/Timeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.zalando.nakadi.util.UUIDGenerator;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
Expand All @@ -23,6 +24,7 @@ public class Timeline {
private Date cleanedUpAt;
private StoragePosition latestPosition;
private boolean deleted;

public Timeline(
final String eventType,
final int order,
Expand Down Expand Up @@ -193,6 +195,7 @@ public interface StoragePosition {

NakadiCursor toNakadiCursor(Timeline timeline, String partition);

String toDebugString();
}

public static class KafkaStoragePosition implements StoragePosition {
Expand Down Expand Up @@ -230,6 +233,11 @@ public NakadiCursor toNakadiCursor(final Timeline timeline, final String partiti
return cursor.toNakadiCursor(timeline);
}

@Override
public String toDebugString() {
return Arrays.toString(offsets.toArray());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -250,4 +258,12 @@ public int hashCode() {
}
}

public static String debugString(final Timeline timeline) {
if (null == timeline) {
return "NULL";
}
final String latestOffsetDescr = null == timeline.getLatestPosition() ? "NULL" :
timeline.getLatestPosition().toDebugString();
return timeline.getEventType() + ":" + timeline.getOrder() + ":" + latestOffsetDescr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -60,15 +61,18 @@ public class MultiTimelineEventConsumer implements EventConsumer.ReassignableEve
private final TimelineService timelineService;
private final TimelineSync timelineSync;
private final AtomicBoolean timelinesChanged = new AtomicBoolean(false);
private final Comparator<NakadiCursor> comparator;
private static final Logger LOG = LoggerFactory.getLogger(MultiTimelineEventConsumer.class);

public MultiTimelineEventConsumer(
final String clientId,
final TimelineService timelineService,
final TimelineSync timelineSync) {
final TimelineSync timelineSync,
final Comparator<NakadiCursor> comparator) {
this.clientId = clientId;
this.timelineService = timelineService;
this.timelineSync = timelineSync;
this.comparator = comparator;
}

@Override
Expand Down Expand Up @@ -136,13 +140,13 @@ private TopicRepository selectCorrectTopicRepo(
final NakadiCursor latest = toCheck.calculateNakadiLatestPosition(cursor.getPartition());
if (latest == null) {
electedTimeline = toCheck;
} else if (latest.compareTo(cursor) > 0) {
} else if (comparator.compare(latest, cursor) > 0) {
// There is a border case - latest is equal to begin (that means that there are no available events
// there), and one should position on timeline that have something inside.
final NakadiCursor firstItem = timelineService.getTopicRepository(toCheck)
.loadPartitionStatistics(toCheck, cursor.getPartition())
.get().getFirst();
if (latest.compareTo(firstItem) >= 0) {
if (comparator.compare(latest, firstItem) >= 0) {
electedTimeline = toCheck;
} else {
LOG.info("Timeline {} is empty, skipping", toCheck);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public EventConsumer.LowLevelConsumer createEventConsumer(
@Nullable final String clientId, final List<NakadiCursor> cursors)
throws ServiceUnavailableException, InvalidCursorException {

final Map<NakadiCursor, KafkaCursor> cursorMapping = this.convertToKafkaCursors(cursors);
final Map<NakadiCursor, KafkaCursor> cursorMapping = convertToKafkaCursors(cursors);
final Map<TopicPartition, Timeline> timelineMap = cursorMapping.entrySet().stream()
.collect(Collectors.toMap(
entry -> new TopicPartition(entry.getValue().getTopic(), entry.getValue().getPartition()),
Expand Down Expand Up @@ -475,6 +475,12 @@ public void validateReadCursors(final List<NakadiCursor> cursors)

private Map<NakadiCursor, KafkaCursor> convertToKafkaCursors(final List<NakadiCursor> cursors)
throws ServiceUnavailableException, InvalidCursorException {
// Validate, that topic for this cursor is available
for (final NakadiCursor c: cursors) {
if (c.getTimeline().isDeleted()) {
throw new InvalidCursorException(UNAVAILABLE, c);
}
}
final List<Timeline> timelines = cursors.stream().map(NakadiCursor::getTimeline).distinct().collect(toList());
final List<PartitionStatistics> statistics = loadTopicStatistics(timelines);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs

for (int order = startOrder; order < Math.max(initialOrder, finalOrder); ++order) {
final Timeline timeline = getTimeline(initialCursor.getEventType(), order);
final long eventsTotal = totalEventsInPartition(timeline, initialCursor.getPartition());
final long eventsTotal = StaticStorageWorkerFactory.get(timeline.getStorage())
.totalEventsInPartition(timeline, initialCursor.getPartition());
result += (finalOrder > initialOrder) ? eventsTotal : -eventsTotal;
}
return result;
}


public List<NakadiCursorLag> cursorsLag(final String eventTypeName, final List<NakadiCursor> cursors)
throws InvalidCursorOperation {
try {
Expand Down Expand Up @@ -222,16 +222,4 @@ private TopicRepository getTopicRepository(final Timeline timeline) {
}


// Method can work only with finished timeline (e.g. it will break for active timeline)
private long totalEventsInPartition(final Timeline timeline, final String partitionString)
throws InvalidCursorOperation {
final Timeline.StoragePosition positions = timeline.getLatestPosition();

try {
return 1 + ((Timeline.KafkaStoragePosition) positions).getLastOffsetForPartition(
KafkaCursor.toKafkaPartition(partitionString));
} catch (final IllegalArgumentException ex) {
throw new InvalidCursorOperation(InvalidCursorOperation.Reason.PARTITION_NOT_FOUND);
}
}
}
17 changes: 11 additions & 6 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.EventTypeCache;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
Expand All @@ -45,7 +45,7 @@ public class CursorsService {

private final TimelineService timelineService;
private final SubscriptionDbRepository subscriptionRepository;
private final EventTypeRepository eventTypeRepository;
private final EventTypeCache eventTypeCache;
private final NakadiSettings nakadiSettings;
private final SubscriptionClientFactory zkSubscriptionFactory;
private final CursorConverter cursorConverter;
Expand All @@ -54,14 +54,14 @@ public class CursorsService {
@Autowired
public CursorsService(final TimelineService timelineService,
final SubscriptionDbRepository subscriptionRepository,
final EventTypeRepository eventTypeRepository,
final EventTypeCache eventTypeCache,
final NakadiSettings nakadiSettings,
final SubscriptionClientFactory zkSubscriptionFactory,
final CursorConverter cursorConverter,
final UUIDGenerator uuidGenerator) {
this.timelineService = timelineService;
this.subscriptionRepository = subscriptionRepository;
this.eventTypeRepository = eventTypeRepository;
this.eventTypeCache = eventTypeCache;
this.nakadiSettings = nakadiSettings;
this.zkSubscriptionFactory = zkSubscriptionFactory;
this.cursorConverter = cursorConverter;
Expand Down Expand Up @@ -94,7 +94,7 @@ public List<Boolean> commitCursors(final String streamId, final String subscript
TimeLogger.addMeasure("writeToZK");
return zkClient.commitOffsets(
cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()),
new SubscriptionCursorComparator());
new SubscriptionCursorComparator(new NakadiCursorComparator(eventTypeCache)));
}

private void validateStreamId(final List<NakadiCursor> cursors, final String streamId,
Expand Down Expand Up @@ -210,10 +210,15 @@ private void validateCursorsBelongToSubscription(final Subscription subscription

private class SubscriptionCursorComparator implements Comparator<SubscriptionCursorWithoutToken> {
private final Map<SubscriptionCursorWithoutToken, NakadiCursor> cached = new HashMap<>();
private final Comparator<NakadiCursor> comparator;

private SubscriptionCursorComparator(final Comparator<NakadiCursor> comparator) {
this.comparator = comparator;
}

@Override
public int compare(final SubscriptionCursorWithoutToken c1, final SubscriptionCursorWithoutToken c2) {
return convert(c1).compareTo(convert(c2));
return comparator.compare(convert(c1), convert(c2));
}

private NakadiCursor convert(final SubscriptionCursorWithoutToken value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.zalando.nakadi.service;

import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InternalNakadiException;
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.repository.db.EventTypeCache;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

public class NakadiCursorComparator implements Comparator<NakadiCursor> {
private final EventTypeCache eventTypeCache;

public NakadiCursorComparator(final EventTypeCache eventTypeCache) {
this.eventTypeCache = eventTypeCache;
}

public int compare(final NakadiCursor c1, final NakadiCursor c2) {
if (!Objects.equals(c1.getEventType(), c2.getEventType())) {
throw new IllegalArgumentException("Cursors from different event types are not comparable");
}
if (c1.getTimeline().getOrder() == c2.getTimeline().getOrder()) {
return c1.getOffset().compareTo(c2.getOffset());
}
if (c1.getTimeline().getOrder() > c2.getTimeline().getOrder()) {
return -compareOrdered(c2, c1);
} else {
return compareOrdered(c1, c2);
}
}

private int compareOrdered(final NakadiCursor c1, final NakadiCursor c2) {
// Disclaimer: The reason of this method complexity is to avoid objects creation.

// If c2 moved from -1, than it is definitely greater.
if (!StaticStorageWorkerFactory.get(c2.getTimeline().getStorage()).isInitialOffset(c2.getOffset())) {
return -1;
}

Iterator<Timeline> timelineIterator = null;

NakadiCursor first = c1;
// Handle obsolete timeline information
if (first.getTimeline().getLatestPosition() == null) {
timelineIterator = createTimelinesIterator(first.getEventType(), first.getTimeline().getOrder());
first = new NakadiCursor(timelineIterator.next(), first.getPartition(), first.getOffset());
}

while (first.getTimeline().getOrder() != c2.getTimeline().getOrder()) {
final boolean isFirstAtEndOfTimeline = StaticStorageWorkerFactory.get(first.getTimeline().getStorage())
.isLastOffsetForPartition(first.getTimeline(), first.getPartition(), first.getOffset());
if (!isFirstAtEndOfTimeline) {
return -1;
}
if (null == timelineIterator) {
timelineIterator = createTimelinesIterator(first.getEventType(), first.getTimeline().getOrder() + 1);
}
final Timeline nextTimeline = timelineIterator.next();
final String initialOffset = StaticStorageWorkerFactory.get(nextTimeline.getStorage())
.getFirstOffsetInTimeline(first.getPartition());
first = new NakadiCursor(
nextTimeline,
first.getPartition(),
initialOffset);
}
return first.getOffset().compareTo(c2.getOffset());
}


private Iterator<Timeline> createTimelinesIterator(final String eventType, final int order) {
try {
final List<Timeline> timelines = eventTypeCache.getTimelinesOrdered(eventType);
final Iterator<Timeline> result = timelines.iterator();
if (timelines.get(0).getOrder() == order) {
return result;
}
// I do not want to handle hasNext
while (result.next().getOrder() != (order - 1)) {
}
return result;
} catch (final NoSuchEventTypeException | InternalNakadiException ex) {
// The reason for runtime exception is that cursors are constructed before, and probably should be working.
// Otherwise it makes no sense for this exception.
throw new IllegalStateException(ex);
}
}
}
Loading

0 comments on commit 0ca1b8f

Please sign in to comment.