Skip to content

Commit

Permalink
Merge branch 'linkedin:main' into rocksDBName
Browse files Browse the repository at this point in the history
  • Loading branch information
majisourav99 authored Feb 14, 2025
2 parents 0499ff7 + 8bc7133 commit d5264c8
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
private int controllerRequestRetryCount;

private String bootstrapFileSystemPath;
private long versionSwapDetectionIntervalTimeInMs = 600000L;
private long versionSwapDetectionIntervalTimeInSeconds = 60L;

/**
* This will be used in BootstrappingVeniceChangelogConsumer to determine when to sync updates with the underlying
Expand Down Expand Up @@ -166,12 +166,12 @@ public String getBootstrapFileSystemPath() {
return this.bootstrapFileSystemPath;
}

public long getVersionSwapDetectionIntervalTimeInMs() {
return versionSwapDetectionIntervalTimeInMs;
public long getVersionSwapDetectionIntervalTimeInSeconds() {
return versionSwapDetectionIntervalTimeInSeconds;
}

public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInMs(long intervalTimeInMs) {
this.versionSwapDetectionIntervalTimeInMs = intervalTimeInMs;
public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInSeconds(long intervalTimeInSeconds) {
this.versionSwapDetectionIntervalTimeInSeconds = intervalTimeInSeconds;
return this;
}

Expand Down Expand Up @@ -216,7 +216,7 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setD2Client(config.getD2Client())
.setControllerRequestRetryCount(config.getControllerRequestRetryCount())
.setBootstrapFileSystemPath(config.getBootstrapFileSystemPath())
.setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs())
.setVersionSwapDetectionIntervalTimeInSeconds(config.getVersionSwapDetectionIntervalTimeInSeconds())
.setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes())
.setConsumerName(config.consumerName)
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.davinci.consumer;

import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.concurrency.ScheduledExecutorService;
import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
Expand All @@ -22,7 +20,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.logging.log4j.LogManager;
Expand All @@ -31,14 +28,11 @@

public class VeniceAfterImageConsumerImpl<K, V> extends VeniceChangelogConsumerImpl<K, V> {
private static final Logger LOGGER = LogManager.getLogger(VeniceAfterImageConsumerImpl.class);
// 10 Minute default
protected long versionSwapDetectionIntervalTimeInMs;
// This consumer is used to find EOP messages without impacting consumption by other subscriptions. It's only used
// in the context of seeking to EOP in the event of the user calling that seek or a version push.
// TODO: We shouldn't use this in the long run. Once the EOP position is queryable from venice and version
// swap is produced to VT, then we should remove this as it's no longer needed.
final private Lazy<VeniceChangelogConsumerImpl<K, V>> internalSeekConsumer;
private final ScheduledExecutorService versionSwapExecutorService = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean versionSwapThreadScheduled = new AtomicBoolean(false);
private final VersionSwapDataChangeListener<K, V> versionSwapListener;

Expand All @@ -60,7 +54,6 @@ protected VeniceAfterImageConsumerImpl(
Lazy<VeniceChangelogConsumerImpl<K, V>> seekConsumer) {
super(changelogClientConfig, consumer);
internalSeekConsumer = seekConsumer;
versionSwapDetectionIntervalTimeInMs = changelogClientConfig.getVersionSwapDetectionIntervalTimeInMs();
versionSwapListener = new VersionSwapDataChangeListener<K, V>(
this,
storeRepository,
Expand All @@ -74,8 +67,6 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll
return internalPoll(timeoutInMs, "");
} catch (UnknownTopicOrPartitionException ex) {
LOGGER.error("Caught unknown Topic exception, will attempt repair and retry: ", ex);
storeRepository.refresh();
versionSwapListener.handleStoreChanged(null);
return internalPoll(timeoutInMs, "");
}
}
Expand All @@ -101,11 +92,6 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
if (!versionSwapThreadScheduled.get()) {
// schedule the version swap thread and set up the callback listener
this.storeRepository.registerStoreDataChangedListener(versionSwapListener);
versionSwapExecutorService.scheduleAtFixedRate(
new VersionSwapDetectionThread(),
versionSwapDetectionIntervalTimeInMs,
versionSwapDetectionIntervalTimeInMs,
TimeUnit.MILLISECONDS);
versionSwapThreadScheduled.set(true);
}
return super.subscribe(partitions);
Expand Down Expand Up @@ -206,15 +192,6 @@ protected CompletableFuture<Void> internalSeek(
return super.internalSeek(partitions, targetTopic, seekAction);
}

private class VersionSwapDetectionThread implements Runnable {
@Override
public void run() {
// the purpose of this thread is to just keep polling just in case something goes wrong at time of the store
// repository change.
versionSwapListener.handleStoreChanged(null);
}
}

@Override
public void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) {
super.setStoreRepository(repository);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci.consumer;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_POS;

Expand Down Expand Up @@ -66,6 +67,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -173,9 +175,13 @@ public VeniceChangelogConsumerImpl(
this.startTimestamp = System.currentTimeMillis();
LOGGER.info("VeniceChangelogConsumer created at timestamp: {}", startTimestamp);

Properties properties = new Properties();
properties.put(
CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS,
String.valueOf(changelogClientConfig.getVersionSwapDetectionIntervalTimeInSeconds()));
ThinClientMetaStoreBasedRepository repository = new ThinClientMetaStoreBasedRepository(
changelogClientConfig.getInnerClientConfig(),
VeniceProperties.empty(),
new VeniceProperties(properties),
null);
repository.start();
this.storeRepository = new NativeMetadataRepositoryViewAdapter(repository);
Expand Down Expand Up @@ -229,7 +235,6 @@ protected CompletableFuture<Void> internalSubscribe(Set<Integer> partitions, Pub
}
}
}
storeRepository.refresh();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -317,7 +322,6 @@ public CompletableFuture<Void> seekToBeginningOfPush() {
@Override
public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
// Get the latest change capture topic
storeRepository.refresh();
Store store = getStore();
int currentVersion = store.getCurrentVersion();
PubSubTopic topic = pubSubTopicRepository
Expand Down Expand Up @@ -430,10 +434,9 @@ public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> chec
}

void checkLiveVersion(String topicName) {
storeRepository.refresh();
Store store = storeRepository.getStore(storeName);
try {
store.getVersionOrThrow(Version.parseVersionFromVersionTopicName(topicName));
store.getVersionOrThrow(Version.parseVersionFromKafkaTopicName(topicName));
} catch (StoreVersionNotFoundException ex) {
throw new VeniceCoordinateOutOfRangeException("Checkpoint is off retention! Version has been deprecated...", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE
changelogClientConfig,
mockPubSubConsumer,
Lazy.of(() -> mockInternalSeekConsumer));
veniceChangelogConsumer.versionSwapDetectionIntervalTimeInMs = 1;
NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Expand Down
Loading

0 comments on commit d5264c8

Please sign in to comment.