From 8bc713349ef959adebd19e0e51a769275927843e Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Thu, 13 Feb 2025 23:01:31 -0800 Subject: [PATCH] [changelog] Fix metadata refresh deadlock scenario (#1536) * [changelog] Fix metadata refresh deadlock scenario There is a race condition that can deadlock when a version swap happens. The version swap data change listener locks itself and then proceeds with the version swap on the consumer. However, other code paths (poll, subscribe, seekToEndOfPush and checkLiveVersion) also call refresh() on the metadata repository, which in turn triggers more callbacks on the data change listener. In some scenarios, this can cause a deadlock. This change removes the manual refreshes and instead lets the whole process be driven by the scheduled metadata refresh. It also makes the refresh interval configurable, and seems to speed up tests (yay!): previously there were frequent retries when consuming events, because we had to wait for the version swap to happen internally before we could consume the events we expected. 
--- .../consumer/ChangelogClientConfig.java | 12 +- .../VeniceAfterImageConsumerImpl.java | 23 -- .../consumer/VeniceChangelogConsumerImpl.java | 13 +- .../VeniceChangelogConsumerImplTest.java | 1 - .../consumer/TestChangelogConsumer.java | 251 +++++++++++------- 5 files changed, 175 insertions(+), 125 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java index 63345892dcc..aa76e91cc0b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java @@ -24,7 +24,7 @@ public class ChangelogClientConfig { private int controllerRequestRetryCount; private String bootstrapFileSystemPath; - private long versionSwapDetectionIntervalTimeInMs = 600000L; + private long versionSwapDetectionIntervalTimeInSeconds = 60L; /** * This will be used in BootstrappingVeniceChangelogConsumer to determine when to sync updates with the underlying @@ -166,12 +166,12 @@ public String getBootstrapFileSystemPath() { return this.bootstrapFileSystemPath; } - public long getVersionSwapDetectionIntervalTimeInMs() { - return versionSwapDetectionIntervalTimeInMs; + public long getVersionSwapDetectionIntervalTimeInSeconds() { + return versionSwapDetectionIntervalTimeInSeconds; } - public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInMs(long intervalTimeInMs) { - this.versionSwapDetectionIntervalTimeInMs = intervalTimeInMs; + public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInSeconds(long intervalTimeInSeconds) { + this.versionSwapDetectionIntervalTimeInSeconds = intervalTimeInSeconds; return this; } @@ -216,7 +216,7 @@ public static ChangelogClientConfig cloneConfig(Ch .setD2Client(config.getD2Client()) 
.setControllerRequestRetryCount(config.getControllerRequestRetryCount()) .setBootstrapFileSystemPath(config.getBootstrapFileSystemPath()) - .setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs()) + .setVersionSwapDetectionIntervalTimeInSeconds(config.getVersionSwapDetectionIntervalTimeInSeconds()) .setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes()) .setConsumerName(config.consumerName) .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval()) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java index e1df68aed26..cb9b814fda9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java @@ -1,7 +1,5 @@ package com.linkedin.davinci.consumer; -import com.linkedin.alpini.base.concurrency.Executors; -import com.linkedin.alpini.base.concurrency.ScheduledExecutorService; import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; @@ -22,7 +20,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.logging.log4j.LogManager; @@ -31,14 +28,11 @@ public class VeniceAfterImageConsumerImpl extends VeniceChangelogConsumerImpl { private static final Logger LOGGER = LogManager.getLogger(VeniceAfterImageConsumerImpl.class); - // 10 Minute default - protected long versionSwapDetectionIntervalTimeInMs; // This consumer is used 
to find EOP messages without impacting consumption by other subscriptions. It's only used // in the context of seeking to EOP in the event of the user calling that seek or a version push. // TODO: We shouldn't use this in the long run. Once the EOP position is queryable from venice and version // swap is produced to VT, then we should remove this as it's no longer needed. final private Lazy> internalSeekConsumer; - private final ScheduledExecutorService versionSwapExecutorService = Executors.newSingleThreadScheduledExecutor(); AtomicBoolean versionSwapThreadScheduled = new AtomicBoolean(false); private final VersionSwapDataChangeListener versionSwapListener; @@ -60,7 +54,6 @@ protected VeniceAfterImageConsumerImpl( Lazy> seekConsumer) { super(changelogClientConfig, consumer); internalSeekConsumer = seekConsumer; - versionSwapDetectionIntervalTimeInMs = changelogClientConfig.getVersionSwapDetectionIntervalTimeInMs(); versionSwapListener = new VersionSwapDataChangeListener( this, storeRepository, @@ -74,8 +67,6 @@ public Collection, VeniceChangeCoordinate>> poll return internalPoll(timeoutInMs, ""); } catch (UnknownTopicOrPartitionException ex) { LOGGER.error("Caught unknown Topic exception, will attempt repair and retry: ", ex); - storeRepository.refresh(); - versionSwapListener.handleStoreChanged(null); return internalPoll(timeoutInMs, ""); } } @@ -101,11 +92,6 @@ public CompletableFuture subscribe(Set partitions) { if (!versionSwapThreadScheduled.get()) { // schedule the version swap thread and set up the callback listener this.storeRepository.registerStoreDataChangedListener(versionSwapListener); - versionSwapExecutorService.scheduleAtFixedRate( - new VersionSwapDetectionThread(), - versionSwapDetectionIntervalTimeInMs, - versionSwapDetectionIntervalTimeInMs, - TimeUnit.MILLISECONDS); versionSwapThreadScheduled.set(true); } return super.subscribe(partitions); @@ -206,15 +192,6 @@ protected CompletableFuture internalSeek( return super.internalSeek(partitions, 
targetTopic, seekAction); } - private class VersionSwapDetectionThread implements Runnable { - @Override - public void run() { - // the purpose of this thread is to just keep polling just in case something goes wrong at time of the store - // repository change. - versionSwapListener.handleStoreChanged(null); - } - } - @Override public void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) { super.setStoreRepository(repository); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index 462fb43df19..c5643a856cf 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -1,5 +1,6 @@ package com.linkedin.davinci.consumer; +import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT; import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_POS; @@ -66,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -173,9 +175,13 @@ public VeniceChangelogConsumerImpl( this.startTimestamp = System.currentTimeMillis(); LOGGER.info("VeniceChangelogConsumer created at timestamp: {}", startTimestamp); + Properties properties = new Properties(); + properties.put( + CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, + String.valueOf(changelogClientConfig.getVersionSwapDetectionIntervalTimeInSeconds())); ThinClientMetaStoreBasedRepository repository = new ThinClientMetaStoreBasedRepository( 
changelogClientConfig.getInnerClientConfig(), - VeniceProperties.empty(), + new VeniceProperties(properties), null); repository.start(); this.storeRepository = new NativeMetadataRepositoryViewAdapter(repository); @@ -229,7 +235,6 @@ protected CompletableFuture internalSubscribe(Set partitions, Pub } } } - storeRepository.refresh(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -317,7 +322,6 @@ public CompletableFuture seekToBeginningOfPush() { @Override public CompletableFuture seekToEndOfPush(Set partitions) { // Get the latest change capture topic - storeRepository.refresh(); Store store = getStore(); int currentVersion = store.getCurrentVersion(); PubSubTopic topic = pubSubTopicRepository @@ -430,10 +434,9 @@ public CompletableFuture seekToCheckpoint(Set chec } void checkLiveVersion(String topicName) { - storeRepository.refresh(); Store store = storeRepository.getStore(storeName); try { - store.getVersionOrThrow(Version.parseVersionFromVersionTopicName(topicName)); + store.getVersionOrThrow(Version.parseVersionFromKafkaTopicName(topicName)); } catch (StoreVersionNotFoundException ex) { throw new VeniceCoordinateOutOfRangeException("Checkpoint is off retention! 
Version has been deprecated...", ex); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 752571c1c6c..30e46dc47cc 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -219,7 +219,6 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE changelogClientConfig, mockPubSubConsumer, Lazy.of(() -> mockInternalSeekConsumer)); - veniceChangelogConsumer.versionSwapDetectionIntervalTimeInMs = 1; NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); Store store = mock(Store.class); Version mockVersion = new VersionImpl(storeName, 1, "foo"); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java index 1d9aa9b0735..7c61ec6756e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java @@ -1,12 +1,7 @@ package com.linkedin.venice.consumer; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; -import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX; -import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; -import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; -import static com.linkedin.venice.ConfigKeys.KAFKA_LINGER_MS; -import static 
com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED; -import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME; import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.D2_SERVICE_NAME; import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; @@ -175,6 +170,109 @@ public void cleanUp() { TestView.resetCounters(); } + // This is a beefier test, so giving it a bit more time + @Test(timeOut = TEST_TIMEOUT * 3, priority = 3) + public void testVersionSwapInALoop() throws Exception { + // create a active-active enabled store and run batch push job + // batch job contains 100 records + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("store"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + props.put(KAFKA_LINGER_MS, 0); + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true) + .setHybridRewindSeconds(500) + .setHybridOffsetLagThreshold(8) + .setChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setPartitionCount(3); + MetricsRepository metricsRepository = new MetricsRepository(); + ControllerClient setupControllerClient = + createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms); + + // This is a dumb check that we're doing just to make static analysis happy + 
TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert.assertEquals(setupControllerClient.getStore(storeName).getStore().getCurrentVersion(), 0)); + + ControllerClient controllerClient = + new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString()); + + // Write Records to the store for version v1, the push job will contain 100 records. + TestWriteUtils.runPushJob("Run push job", props); + + // Write Records from nearline + Map samzaConfig = getSamzaProducerConfig(childDatacenters, 0, storeName); + VeniceSystemFactory factory = new VeniceSystemFactory(); + // Use a unique key for DELETE with RMD validation + int deleteWithRmdKeyIndex = 1000; + + ZkServerWrapper localZkServer = multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper(); + PubSubBrokerWrapper localKafka = multiRegionMultiClusterWrapper.getChildRegions().get(0).getPubSubBrokerWrapper(); + Properties consumerProperties = new Properties(); + String localKafkaUrl = localKafka.getAddress(); + consumerProperties.put(KAFKA_BOOTSTRAP_SERVERS, localKafkaUrl); + ChangelogClientConfig globalAfterImageClientConfig = + new ChangelogClientConfig().setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localZkServer.getAddress()) + .setControllerRequestRetryCount(3) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) + .setIsBeforeImageView(true); + + VeniceChangelogConsumerClientFactory veniceAfterImageConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(globalAfterImageClientConfig, metricsRepository); + + VeniceChangelogConsumer versionTopicConsumer = + veniceAfterImageConsumerClientFactory.getChangelogConsumer(storeName); + Assert.assertTrue(versionTopicConsumer instanceof VeniceAfterImageConsumerImpl); + versionTopicConsumer.subscribeAll().get(); + + Map versionTopicEvents = new HashMap<>(); + 
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 100); + }); + + versionTopicEvents.clear(); + + // in a loop, write a push job, then write some stream data, then poll data with versionTopicConsumer + for (int i = 0; i < 10; i++) { + try (VeniceSystemProducer veniceProducer = + factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { + veniceProducer.start(); + // Run Samza job to send PUT and DELETE requests. + runSamzaStreamJob(veniceProducer, storeName, null, 10, 10, 100); + // Produce a DELETE record with large timestamp + sendStreamingRecordWithLogicalTimestamp(veniceProducer, storeName, deleteWithRmdKeyIndex, 1000, true); + } + + // run push job + TestWriteUtils.runPushJob("Run push job", props); + + // Assert the push has gone through + int expectedVersion = i + 2; + TestUtils.waitForNonDeterministicAssertion( + 5, + TimeUnit.SECONDS, + () -> Assert + .assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), expectedVersion)); + + // poll data from version topic + TestUtils.waitForNonDeterministicAssertion(100, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 21); + }); + versionTopicEvents.clear(); + } + } + @Test(timeOut = TEST_TIMEOUT, priority = 3) public void testAAIngestionWithStoreView() throws Exception { // Set up the store @@ -273,6 +371,7 @@ public void testAAIngestionWithStoreView() throws Exception { .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setLocalD2ZkHosts(localZkServer.getAddress()) .setControllerRequestRetryCount(3) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) .setIsBeforeImageView(true); VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = new 
VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository); @@ -292,6 +391,7 @@ public void testAAIngestionWithStoreView() throws Exception { .setControllerD2ServiceName(D2_SERVICE_NAME) .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setLocalD2ZkHosts(localZkServer.getAddress()) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) .setControllerRequestRetryCount(3); VeniceChangelogConsumerClientFactory veniceViewChangelogConsumerClientFactory = new VeniceChangelogConsumerClientFactory(viewChangeLogClientConfig, metricsRepository); @@ -373,6 +473,15 @@ public void testAAIngestionWithStoreView() throws Exception { Assert.assertEquals(versionTopicEvents.size(), 21); }); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 21); + }); + + allChangeEvents.putAll(polledChangeEvents); + versionTopicEvents.clear(); + polledChangeEvents.clear(); + /** * Now we have store version v2. 
*/ @@ -419,6 +528,21 @@ public void testAAIngestionWithStoreView() throws Exception { } }); } + + // we shouldn't pull anything on this version if filtering is working correctly + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 0); + }); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + // poll a few times in a row to make sure version jump happens + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + Assert.assertEquals(polledChangeEvents.size(), 0); + }); + try ( VeniceSystemProducer veniceProducer = factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { veniceProducer.start(); @@ -440,25 +564,6 @@ public void testAAIngestionWithStoreView() throws Exception { }); } - // Validate changed events for version 2. - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - // As records keys from VPJ start from 1, real-time produced records' key starts from 0, the message with key as 0 - // is new message. - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { - // poll enough to get through the empty push and the topic jump to RT. 
- pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - String deleteWithRmdKey = Integer.toString(deleteWithRmdKeyIndex); - String persistWithRmdKey = Integer.toString(deleteWithRmdKeyIndex + 1); - Assert.assertNull(polledChangeEvents.get(deleteWithRmdKey)); - Assert.assertNotNull(polledChangeEvents.get(persistWithRmdKey)); - Assert.assertEquals( - polledChangeEvents.get(persistWithRmdKey).getValue().getCurrentValue().toString(), - "stream_" + persistWithRmdKey); - }); - /** * Now we have store version v3. */ @@ -474,61 +579,38 @@ public void testAAIngestionWithStoreView() throws Exception { Instant past = now.minus(1, ChronoUnit.HOURS); mockTimestampInMs.add(past.toEpochMilli()); Time mockTime = new MockCircularTime(mockTimestampInMs); - try ( - VeniceSystemProducer veniceProducer = factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { - veniceProducer.start(); - // run samza to stream put and delete - runSamzaStreamJob(veniceProducer, storeName, mockTime, 10, 10, 20); - } - // Validate changed events for version 3. - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - // Filter previous 21 messages. - Assert.assertEquals(polledChangeEvents.size(), 1); + // We should only poll 1 record as we produced 1 that would have been applied in LWW + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + Assert.assertEquals(versionTopicEvents.size(), 1); }); - // Drain the remaining events on version 3 and verify that we got everything. 
We don't verify the count - // because at this stage, the total events which will get polled will be determined by how far back the rewind - // managed to get (and test run duration might be variable) - TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + // poll a few times in a row to make sure version jump happens pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - for (int i = 20; i < 40; i++) { - String key = Integer.toString(i); - ChangeEvent changeEvent = polledChangeEvents.get(key).getValue(); - Assert.assertNotNull(changeEvent); - Assert.assertNull(changeEvent.getPreviousValue()); - if (i < 30) { - Assert.assertEquals(changeEvent.getCurrentValue().toString(), "stream_" + i); - } else { - Assert.assertNull(changeEvent.getCurrentValue()); - } - } + Assert.assertEquals(polledChangeEvents.size(), 1); }); - allChangeEvents.putAll(polledChangeEvents); - polledChangeEvents.clear(); - - // This should get everything submitted to the CC topic on this version since the timestamp is before anything got - // transmitted - veniceChangelogConsumer.seekToTimestamp(timestamp); + // Write 20 records + try ( + VeniceSystemProducer veniceProducer = factory.getClosableProducer("venice", new MapConfig(samzaConfig), null)) { + veniceProducer.start(); + // run samza to stream put and delete + runSamzaStreamJob(veniceProducer, storeName, mockTime, 10, 10, 20); + } // test pause and resume veniceChangelogConsumer.pause(); + polledChangeEvents.clear(); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); Assert.assertEquals(polledChangeEvents.size(), 0); }); 
veniceChangelogConsumer.resume(); - // This should get everything submitted to the CC topic on this version since the timestamp is before anything got - // transmitted - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 42); - }); allChangeEvents.putAll(polledChangeEvents); polledChangeEvents.clear(); @@ -572,17 +654,10 @@ public void testAAIngestionWithStoreView() throws Exception { } } - // Since nothing is produced, so no changed events generated. + polledChangeEvents.clear(); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, true, () -> { pollChangeEventsFromChangeCaptureConsumer2(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); - - // Seek to the beginning of the push - veniceChangelogConsumer.seekToBeginningOfPush().join(); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 10); + Assert.assertEquals(polledChangeEvents.size(), 20); }); // Save a checkpoint and clear the map @@ -603,27 +678,20 @@ public void testAAIngestionWithStoreView() throws Exception { veniceChangelogConsumer.seekToCheckpoint(checkpointSet).join(); allChangeEvents.putAll(polledChangeEvents); polledChangeEvents.clear(); - // Poll Change events again, verify we get everything TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); + pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); // Repush with TTL will include delete events in the topic - 
Assert.assertEquals(polledChangeEvents.size(), 5); + Assert.assertEquals(polledChangeEvents.size(), 7); }); allChangeEvents.putAll(polledChangeEvents); polledChangeEvents.clear(); - Assert.assertEquals(allChangeEvents.size(), 121); + Assert.assertEquals(allChangeEvents.size(), 120); - // Seek the consumer to the beginning of push (since the latest is version 4 with no nearline writes, shouldn't - // have any new writes) - // veniceAfterImageConsumer.seekToEndOfPush().join(); - TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { - pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 0); - }); - - // Also should be nothing on the tail - // veniceAfterImageConsumer.seekToTail().join(); + // Should be nothing on the tail + veniceChangelogConsumer.seekToTail().join(); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); Assert.assertEquals(polledChangeEvents.size(), 0); @@ -641,22 +709,23 @@ public void testAAIngestionWithStoreView() throws Exception { pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); // At this point, the consumer should have auto tracked to version 4, and since we didn't apply any nearline // writes to version 4, there should be no events to consume at this point - Assert.assertEquals(versionTopicEvents.size(), 0); + Assert.assertEquals(versionTopicEvents.size(), 42); }); + // The current topic had nothing in the push, but 42 messages got played on top versionTopicConsumer.seekToEndOfPush().get(); TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); + pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); 
pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); // Again, no events to consume here. - Assert.assertEquals(versionTopicEvents.size(), 0); + Assert.assertEquals(versionTopicEvents.size(), 42); }); versionTopicConsumer.seekToBeginningOfPush().get(); TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); - // Reconsuming the events from the version topic, which at this point should just contain the same 16 - // events we consumed with the before/after image consumer earlier. - Assert.assertEquals(versionTopicEvents.size(), 10); + Assert.assertEquals(versionTopicEvents.size(), 42); }); // Verify version swap count matches with version count - 1 (since we don't transmit from version 0 to version 1). @@ -723,6 +792,7 @@ public void testSpecificRecordBootstrappingVeniceChangelogConsumer() throws Exce .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setLocalD2ZkHosts(localZkServer.getAddress()) .setControllerRequestRetryCount(3) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) .setSpecificValue(TestChangelogValue.class) .setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath)); VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = @@ -835,6 +905,7 @@ public void testSpecificRecordVeniceChangelogConsumer() throws Exception { .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setLocalD2ZkHosts(localZkServer.getAddress()) .setControllerRequestRetryCount(3) + .setVersionSwapDetectionIntervalTimeInSeconds(3L) .setSpecificValue(TestChangelogValue.class) .setBootstrapFileSystemPath(Utils.getUniqueString(inputDirPath)); VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =