Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sourav Maji committed Mar 1, 2025
1 parent 2e327a5 commit 04e8824
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED = MINUTES.toMillis(1); // 1 min

static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5;

static final int MAX_OFFSET_FETCH_ATTEMPTS = 10;

private static final int CONSUMER_ACTION_QUEUE_INIT_CAPACITY = 11;
protected static final long KILL_WAIT_TIME_MS = 5000L;
private static final int MAX_KILL_CHECKING_ATTEMPTS = 10;
Expand Down Expand Up @@ -2407,12 +2410,18 @@ protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTop
throw new VeniceException("Latest offset is unknown. Check if the topic: " + topicPartition + " exists.");
}
return offset;
}, 10, Duration.ofMillis(10), Duration.ofMillis(500), Duration.ofSeconds(60), RETRY_FAILURE_TYPES);
},
MAX_OFFSET_FETCH_ATTEMPTS,
Duration.ofMillis(10),
Duration.ofMillis(500),
Duration.ofSeconds(60),
RETRY_FAILURE_TYPES);
} catch (Exception e) {
LOGGER.error(
"Failed to get end offset for topic-partition: {} with kafka url {} even after 10 retries",
"Failed to get end offset for topic-partition: {} with kafka url {} even after {} retries",
topicPartition,
kafkaUrl,
MAX_OFFSET_FETCH_ATTEMPTS,
e);
return StatsErrorCode.LAG_MEASUREMENT_FAILURE.code;
}
Expand Down

0 comments on commit 04e8824

Please sign in to comment.