From 04e8824160664d90e025abfc76fb052dfcd14d86 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Fri, 28 Feb 2025 16:14:29 -0800 Subject: [PATCH] addressed comments --- .../davinci/kafka/consumer/StoreIngestionTask.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index ce38ef57ba..7cc69f2eb8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -184,6 +184,9 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED = MINUTES.toMillis(1); // 1 min static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5; + + static final int MAX_OFFSET_FETCH_ATTEMPTS = 10; + private static final int CONSUMER_ACTION_QUEUE_INIT_CAPACITY = 11; protected static final long KILL_WAIT_TIME_MS = 5000L; private static final int MAX_KILL_CHECKING_ATTEMPTS = 10; @@ -2407,12 +2410,18 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop throw new VeniceException("Latest offset is unknown. Check if the topic: " + topicPartition + " exists."); } return offset; - }, 10, Duration.ofMillis(10), Duration.ofMillis(500), Duration.ofSeconds(60), RETRY_FAILURE_TYPES); + }, + MAX_OFFSET_FETCH_ATTEMPTS, + Duration.ofMillis(10), + Duration.ofMillis(500), + Duration.ofSeconds(60), + RETRY_FAILURE_TYPES); } catch (Exception e) { LOGGER.error( - "Failed to get end offset for topic-partition: {} with kafka url {} even after 10 retries", + "Failed to get end offset for topic-partition: {} with kafka url {} even after {} retries", topicPartition, kafkaUrl, + MAX_OFFSET_FETCH_ATTEMPTS, e); return StatsErrorCode.LAG_MEASUREMENT_FAILURE.code; }