From c8d0954637d6a81bb568a4e915701c09a2c8a3be Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Nov 2024 14:27:38 +0530 Subject: [PATCH 1/4] Reduce metadata IO during segment allocation --- .../actions/SegmentAllocationQueue.java | 20 ++- .../druid/indexing/overlord/TaskLockbox.java | 36 ++-- .../overlord/config/TaskLockConfig.java | 9 + .../actions/SegmentAllocateActionTest.java | 19 +- .../actions/SegmentAllocationQueueTest.java | 6 + .../common/actions/TaskActionTestKit.java | 13 ++ ...TestIndexerMetadataStorageCoordinator.java | 18 +- .../IndexerMetadataStorageCoordinator.java | 18 +- .../IndexerSQLMetadataStorageCoordinator.java | 105 ++++++++++- .../metadata/SqlSegmentsMetadataQuery.java | 66 +++++++ ...exerSQLMetadataStorageCoordinatorTest.java | 163 +++++++++++++++++- 11 files changed, 432 insertions(+), 41 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 98ab50cff788..334af961b256 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -25,7 +25,6 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.java.util.common.ISE; @@ -41,6 +40,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; import org.joda.time.Interval; import java.util.ArrayList; @@ -87,6 +87,8 @@ public class SegmentAllocationQueue private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); + private final boolean skipSegmentPayloadFetchForAllocation; + @Inject public SegmentAllocationQueue( TaskLockbox taskLockbox, @@ -100,6 +102,7 @@ public SegmentAllocationQueue( this.taskLockbox = taskLockbox; this.metadataStorage = metadataStorage; this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime(); + this.skipSegmentPayloadFetchForAllocation = taskLockConfig.isBatchAllocationReduceMetadataIO(); this.executor = taskLockConfig.isBatchSegmentAllocation() ? executorFactory.create(1, "SegmentAllocQueue-%s") : null; @@ -380,13 +383,11 @@ private boolean processBatch(AllocateRequestBatch requestBatch) private Set retrieveUsedSegments(AllocateRequestKey key) { - return new HashSet<>( - metadataStorage.retrieveUsedSegmentsForInterval( - key.dataSource, - key.preferredAllocationInterval, - Segments.ONLY_VISIBLE - ) - ); + return metadataStorage.getSegmentTimelineForAllocation( + key.dataSource, + key.preferredAllocationInterval, + (key.lockGranularity == LockGranularity.TIME_CHUNK) && skipSegmentPayloadFetchForAllocation + ).findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); } private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set usedSegments) @@ -493,7 +494,8 @@ private int allocateSegmentsForInterval( requestKey.dataSource, tryInterval, requestKey.skipSegmentLineageCheck, - requestKey.lockGranularity + requestKey.lockGranularity, + skipSegmentPayloadFetchForAllocation ); int successfulRequests = 0; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index bebb52157d6f..f38666859abd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -460,12 +460,14 @@ public LockResult tryLock(final Task task, final LockRequest request) * successfully and others failed. In that case, only the failed ones should be * retried. * - * @param requests List of allocation requests - * @param dataSource Datasource for which segment is to be allocated. - * @param interval Interval for which segment is to be allocated. - * @param skipSegmentLineageCheck Whether lineage check is to be skipped - * (this is true for streaming ingestion) - * @param lockGranularity Granularity of task lock + * @param requests List of allocation requests + * @param dataSource Datasource for which segment is to be allocated. + * @param interval Interval for which segment is to be allocated. + * @param skipSegmentLineageCheck Whether lineage check is to be skipped + * (this is true for streaming ingestion) + * @param lockGranularity Granularity of task lock + * @param skipSegmentPayloadFetchForAllocation Whether to skip fetching payloads for all used + * segments and rely on their ids instead. * @return List of allocation results in the same order as the requests. */ public List allocateSegments( @@ -473,7 +475,8 @@ public List allocateSegments( String dataSource, Interval interval, boolean skipSegmentLineageCheck, - LockGranularity lockGranularity + LockGranularity lockGranularity, + boolean skipSegmentPayloadFetchForAllocation ) { log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval); @@ -487,9 +490,15 @@ public List allocateSegments( if (isTimeChunkLock) { // For time-chunk locking, segment must be allocated only after acquiring the lock holderList.getPending().forEach(holder -> acquireTaskLock(holder, true)); - allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); + allocateSegmentIds( + dataSource, + interval, + skipSegmentLineageCheck, + holderList.getPending(), + skipSegmentPayloadFetchForAllocation + ); } else { - allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); + allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending(), false); holderList.getPending().forEach(holder -> acquireTaskLock(holder, false)); } holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded); @@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request) * for the given requests. Updates the holder with the allocated segment if * the allocation succeeds, otherwise marks it as failed. */ - @VisibleForTesting - void allocateSegmentIds( + private void allocateSegmentIds( String dataSource, Interval interval, boolean skipSegmentLineageCheck, - Collection holders + Collection holders, + boolean skipSegmentPayloadFetchForAllocation ) { if (holders.isEmpty()) { @@ -724,7 +733,8 @@ void allocateSegmentIds( dataSource, interval, skipSegmentLineageCheck, - createRequests + createRequests, + skipSegmentPayloadFetchForAllocation ); for (SegmentAllocationHolder holder : holders) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java index 2634c4328fec..1f863133d65f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java @@ -36,6 +36,9 @@ public class TaskLockConfig @JsonProperty private long batchAllocationWaitTime = 0L; + @JsonProperty + private boolean batchAllocationReduceMetadataIO = false; + public boolean isForceTimeChunkLock() { return forceTimeChunkLock; @@ -50,4 +53,10 @@ public long getBatchAllocationWaitTime() { return batchAllocationWaitTime; } + + public boolean isBatchAllocationReduceMetadataIO() + { + return batchAllocationReduceMetadataIO; + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index fdb7fcd5595b..3c0b08758f76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -93,21 +93,28 @@ public class SegmentAllocateActionTest private SegmentAllocationQueue allocationQueue; - @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}") + @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}") public static Iterable constructorFeeder() { return ImmutableList.of( - new Object[]{LockGranularity.SEGMENT, true}, - new Object[]{LockGranularity.SEGMENT, false}, - new Object[]{LockGranularity.TIME_CHUNK, true}, - new Object[]{LockGranularity.TIME_CHUNK, false} + new Object[]{LockGranularity.SEGMENT, true, true}, + new Object[]{LockGranularity.SEGMENT, true, false}, + new Object[]{LockGranularity.SEGMENT, false, false}, + new Object[]{LockGranularity.TIME_CHUNK, true, true}, + new Object[]{LockGranularity.TIME_CHUNK, true, false}, + new Object[]{LockGranularity.TIME_CHUNK, false, false} ); } - public SegmentAllocateActionTest(LockGranularity lockGranularity, boolean useBatch) + public SegmentAllocateActionTest( + LockGranularity lockGranularity, + boolean useBatch, + boolean skipSegmentPayloadFetchForAllocation + ) { this.lockGranularity = lockGranularity; this.useBatch = useBatch; + this.taskActionTestKit.setSkipSegmentPayloadFetchForAllocation(skipSegmentPayloadFetchForAllocation); } @Before diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index 6a5c84082f2c..f91341ec496c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -73,6 +73,12 @@ public long getBatchAllocationWaitTime() { return 0; } + + @Override + public boolean isBatchAllocationReduceMetadataIO() + { + return true; + } }; allocationQueue = new SegmentAllocationQueue( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index fcbf37c956da..208fec01fe45 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -58,6 +58,8 @@ public class TaskActionTestKit extends ExternalResource private SegmentSchemaManager segmentSchemaManager; private SegmentSchemaCache segmentSchemaCache; + private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO(); + public TaskLockbox getTaskLockbox() { return taskLockbox; @@ -78,6 +80,11 @@ public TaskActionToolbox getTaskActionToolbox() return taskActionToolbox; } + public void setSkipSegmentPayloadFetchForAllocation(boolean skipSegmentPayloadFetchForAllocation) + { + this.skipSegmentPayloadFetchForAllocation = skipSegmentPayloadFetchForAllocation; + } + @Override public void before() { @@ -126,6 +133,12 @@ public long getBatchAllocationWaitTime() { return 10L; } + + @Override + public boolean isBatchAllocationReduceMetadataIO() + { + return skipSegmentPayloadFetchForAllocation; + } }; taskActionToolbox = new TaskActionToolbox( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index d2055d6e0c99..54e323581c47 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -168,7 +169,8 @@ public Map allocatePendingSegments String dataSource, Interval interval, boolean skipSegmentLineageCheck, - List requests + List requests, + boolean isTimeChunk ) { return Collections.emptyMap(); @@ -332,6 +334,20 @@ public Map> retrieveUpgradedToSegmentIds( return Collections.emptyMap(); } + @Override + public SegmentTimeline getSegmentTimelineForAllocation( + String dataSource, + Interval interval, + boolean skipSegmentPayloadFetchForAllocation + ) + { + return SegmentTimeline.forSegments(retrieveUsedSegmentsForIntervals( + dataSource, + Collections.singletonList(interval), + Segments.INCLUDING_OVERSHADOWED + )); + } + public Set getPublished() { return ImmutableSet.copyOf(published); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 83b4ac7e474c..867b1f618c97 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -25,6 +25,7 @@ import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -179,6 +180,8 @@ List retrieveUnusedSegmentsForInterval( * Should be set to false if replica tasks would index events in same order * @param requests Requests for which to allocate segments. All * the requests must share the same partition space. + * @param skipSegmentPayloadFetch If true, try to use the segment ids instead of fetching every segment + * payload from the metadata store * @return Map from request to allocated segment id. The map does not contain * entries for failed requests. */ @@ -186,7 +189,20 @@ Map allocatePendingSegments( String dataSource, Interval interval, boolean skipSegmentLineageCheck, - List requests + List requests, + boolean skipSegmentPayloadFetch + ); + + /** + * Return a segment timeline of all used segments including overshadowed ones for a given datasource and interval + * if skipSegmentPayloadFetchForAllocation is set to true, do not fetch all the segment payloads for allocation + * Instead fetch all the ids and numCorePartitions using exactly one segment per version per interval + * return a dummy DataSegment for each id that holds only the SegmentId and a NumberedShardSpec with numCorePartitions + */ + SegmentTimeline getSegmentTimelineForAllocation( + String dataSource, + Interval interval, + boolean skipSegmentPayloadFetchForAllocation ); /** diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index a512f7935740..bf1bfdae0830 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -719,7 +719,8 @@ public Map allocatePendingSegments String dataSource, Interval allocateInterval, boolean skipSegmentLineageCheck, - List requests + List requests, + boolean skipSegmentPayloadFetch ) { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -727,7 +728,14 @@ public Map allocatePendingSegments final Interval interval = allocateInterval.withChronology(ISOChronology.getInstanceUTC()); return connector.retryWithHandle( - handle -> allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests) + handle -> allocatePendingSegments( + handle, + dataSource, + interval, + skipSegmentLineageCheck, + requests, + skipSegmentPayloadFetch + ) ); } @@ -1003,18 +1011,39 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( return newIdentifier; } + @Override + public SegmentTimeline getSegmentTimelineForAllocation( + String dataSource, + Interval interval, + boolean skipSegmentPayloadFetchForAllocation + ) + { + return connector.retryWithHandle( + handle -> { + if (skipSegmentPayloadFetchForAllocation) { + return SegmentTimeline.forSegments(retrieveUsedSegmentsForAllocation(handle, dataSource, interval)); + } else { + return getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)); + } + } + ); + } + private Map allocatePendingSegments( final Handle handle, final String dataSource, final Interval interval, final boolean skipSegmentLineageCheck, - final List requests + final List requests, + final boolean skipSegmentPayloadFetch ) throws IOException { // Get the time chunk and associated data segments for the given interval, if any - final List> existingChunks = - getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) - .lookup(interval); + final List> existingChunks = getSegmentTimelineForAllocation( + dataSource, + interval, + skipSegmentPayloadFetch + ).lookup(interval); if (existingChunks.size() > 1) { log.warn( "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.", @@ -2900,6 +2929,70 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set retrieveUsedSegmentsForAllocation( + final Handle handle, + final String dataSource, + final Interval interval + ) + { + final Set overlappingSegmentIds = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUsedSegmentIds( + dataSource, + interval + ); + // Map from version -> interval -> segmentId with the smallest partitionNum + Map> versionIntervalToSmallestSegmentId = new HashMap<>(); + for (SegmentId segmentId : overlappingSegmentIds) { + final Map map + = versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>()); + final SegmentId value = map.get(segmentId.getInterval()); + if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) { + map.put(interval, segmentId); + } + } + + // Retrieve the segments for the ids stored in the map to get the numCorePartitions + final Set segmentIdsToRetrieve = new HashSet<>(); + for (Map itvlMap : versionIntervalToSmallestSegmentId.values()) { + segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList())); + } + final Set dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve); + final Set retrievedIds = new HashSet<>(); + final Map> versionIntervalToNumCorePartitions = new HashMap<>(); + for (DataSegment segment : dataSegments) { + versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>()) + .put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions()); + retrievedIds.add(segment.getId().toString()); + } + if (!retrievedIds.equals(segmentIdsToRetrieve)) { + throw DruidException.defensive( + "Cannot create DataSegments for segment allocations." + + "The used segments may have changed for dataSource[%s] and interval[%s].", + dataSource, interval + ); + } + + // Populate the dummy segment info + Set segmentsWithAllocationInfo = new HashSet<>(); + for (SegmentId id : overlappingSegmentIds) { + final int corePartitions = versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval()); + segmentsWithAllocationInfo.add( + new DataSegment( + id, + null, + null, + null, + new NumberedShardSpec(id.getPartitionNum(), corePartitions), + null, + null, + 1 + ) + ); + } + return segmentsWithAllocationInfo; + } + @Override public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index fc1c84a70371..bd53026a5aee 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.UnmodifiableIterator; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -35,6 +36,7 @@ import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -48,6 +50,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -242,6 +245,69 @@ public CloseableIterator retrieveUnusedSegmentsPlus( ); } + public Set retrieveUsedSegmentIds( + final String dataSource, + final Interval interval + ) + { + return retrieveSegmentIds(dataSource, Collections.singletonList(interval)); + } + + private Set retrieveSegmentIds( + final String dataSource, + final Collection intervals + ) + { + if (CollectionUtils.isNullOrEmpty(intervals)) { + return Collections.emptySet(); + } + + // Check if the intervals all support comparing as strings. If so, bake them into the SQL. + final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings); + + final StringBuilder sb = new StringBuilder(); + sb.append("SELECT id FROM %s WHERE used = :used AND dataSource = :dataSource"); + + if (compareAsString) { + sb.append( + getConditionForIntervalsAndMatchMode(intervals, IntervalMode.OVERLAPS, connector.getQuoteString()) + ); + } + + return connector.inReadOnlyTransaction( + (handle, status) -> { + final Query> sql = handle + .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable())) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("used", true) + .bind("dataSource", dataSource); + + if (compareAsString) { + bindIntervalsToQuery(sql, intervals); + } + + final Set segmentIds = new HashSet<>(); + try (final ResultIterator iterator = sql.map((index, r, ctx) -> r.getString(1)).iterator()) { + while (iterator.hasNext()) { + final String id = iterator.next(); + final SegmentId segmentId = SegmentId.tryParse(dataSource, id); + if (segmentId == null) { + throw DruidException.defensive( + "Failed to parse SegmentId for id[%s] and dataSource[%s].", + id, dataSource + ); + } + for (Interval interval : intervals) { + if (IntervalMode.OVERLAPS.apply(interval, segmentId.getInterval())) { + segmentIds.add(segmentId); + } + } + } + } + return segmentIds; + }); + } + public List retrieveSegmentsById( String datasource, Set segmentIds diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 4b592e5f40da..0377faac5fbf 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2669,6 +2669,76 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() } + + @Test + public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() + { + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final String dataSource = "ds"; + final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final String sequenceName = "seq"; + + final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request), + true + ).get(request); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); + + final SegmentCreateRequest request1 = + new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request1), + true + ).get(request1); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); + + final SegmentCreateRequest request2 = + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request2), + true + ).get(request2); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); + + final SegmentCreateRequest request3 = + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request3), + true + ).get(request3); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString()); + Assert.assertEquals(segmentId2, segmentId3); + + final SegmentCreateRequest request4 = + new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request4), + true + ).get(request4); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString()); + } + @Test public void testAllocatePendingSegments() { @@ -2682,7 +2752,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request) + Collections.singletonList(request), + false ).get(request); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); @@ -2693,7 +2764,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request1) + Collections.singletonList(request1), + false ).get(request1); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); @@ -2704,7 +2776,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request2) + Collections.singletonList(request2), + false ).get(request2); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); @@ -2715,7 +2788,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request3) + Collections.singletonList(request3), + false ).get(request3); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString()); @@ -2727,7 +2801,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request4) + Collections.singletonList(request4), + false ).get(request4); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString()); @@ -3639,6 +3714,84 @@ public void testRetrieveUpgradedToSegmentIdsInBatches() Assert.assertEquals(expected, actual); } + @Test + public void testRetrieveUsedSegmentsForSegmentAllocation() + { + final String datasource = "DS"; + DataSegment firstSegment; + Set nextSegments; + final Map loadspec = ImmutableMap.of("loadSpec", "loadSpec"); + final List dimensions = ImmutableList.of("dim1", "dim2"); + final List metrics = ImmutableList.of("metric1", "metric2"); + final int numSegmentsPerInterval = 100; + + final Interval month = Intervals.of("2024-10-01/2024-11-01"); + + final Interval year = Intervals.of("2024/2025"); + + final Interval overlappingDay = Intervals.of("2024-10-01/2024-10-02"); + final Interval nonOverlappingDay = Intervals.of("2024-01-01/2024-01-02"); + + final List intervals = ImmutableList.of(month, year, overlappingDay, nonOverlappingDay); + final List versions = ImmutableList.of("v0", "v1", "v2", "v2"); + for (int i = 0; i < 4; i++) { + nextSegments = new HashSet<>(); + firstSegment = new DataSegment( + datasource, + intervals.get(i), + versions.get(i), + loadspec, + dimensions, + metrics, + new DimensionRangeShardSpec(dimensions, null, null, 0, 1), + 0, + 100 + ); + insertUsedSegments(Collections.singleton(firstSegment), Collections.emptyMap()); + for (int j = 1; j < numSegmentsPerInterval; j++) { + nextSegments.add( + new DataSegment( + datasource, + intervals.get(i), + versions.get(i), + loadspec, + dimensions, + metrics, + // The numCorePartitions is intentionally 0 + new NumberedShardSpec(j, 0), + 0, + 100 + ) + ); + } + insertUsedSegments(nextSegments, Collections.emptyMap()); + } + + final Set expected = new HashSet<>(); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < numSegmentsPerInterval; j++) { + expected.add( + new SegmentIdWithShardSpec( + datasource, + intervals.get(i), + versions.get(i), + new NumberedShardSpec(j, 1) + ) + ); + } + } + + Assert.assertEquals(expected, + derbyConnector.retryWithHandle( + handle -> coordinator.retrieveUsedSegmentsForAllocation(handle, datasource, month) + .stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toSet()) + ) + ); + } + + private void insertUsedSegments(Set segments, Map upgradedFromSegmentIdMap) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); From 5ba2f2460aeacc166ac4099d075b7c26c7d0cb33 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Nov 2024 20:01:55 +0530 Subject: [PATCH 2/4] Address comments from #17420 --- .../actions/SegmentAllocationQueue.java | 8 ++--- .../actions/SegmentAllocationQueueTest.java | 18 ++++++++++- .../IndexerSQLMetadataStorageCoordinator.java | 13 +++----- .../metadata/SqlSegmentsMetadataQuery.java | 32 ++++++------------- 4 files changed, 36 insertions(+), 35 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 334af961b256..3c86e8e5637d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -87,7 +87,7 @@ public class SegmentAllocationQueue private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); - private final boolean skipSegmentPayloadFetchForAllocation; + private final boolean reduceMetadataIO; @Inject public SegmentAllocationQueue( @@ -102,7 +102,7 @@ public SegmentAllocationQueue( this.taskLockbox = taskLockbox; this.metadataStorage = metadataStorage; this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime(); - this.skipSegmentPayloadFetchForAllocation = taskLockConfig.isBatchAllocationReduceMetadataIO(); + this.reduceMetadataIO = taskLockConfig.isBatchAllocationReduceMetadataIO(); this.executor = taskLockConfig.isBatchSegmentAllocation() ? executorFactory.create(1, "SegmentAllocQueue-%s") : null; @@ -386,7 +386,7 @@ private Set retrieveUsedSegments(AllocateRequestKey key) return metadataStorage.getSegmentTimelineForAllocation( key.dataSource, key.preferredAllocationInterval, - (key.lockGranularity == LockGranularity.TIME_CHUNK) && skipSegmentPayloadFetchForAllocation + (key.lockGranularity == LockGranularity.TIME_CHUNK) && reduceMetadataIO ).findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); } @@ -495,7 +495,7 @@ private int allocateSegmentsForInterval( tryInterval, requestKey.skipSegmentLineageCheck, requestKey.lockGranularity, - skipSegmentPayloadFetchForAllocation + reduceMetadataIO ); int successfulRequests = 0; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index f91341ec496c..9aa9f4c9d263 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -36,6 +36,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.List; @@ -44,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +@RunWith(Parameterized.class) public class SegmentAllocationQueueTest { @Rule @@ -54,6 +57,19 @@ public class SegmentAllocationQueueTest private StubServiceEmitter emitter; private BlockingExecutorService executor; + private final boolean reduceMetadataIO; + + @Parameterized.Parameters(name = "reduceMetadataIO = {0}") + public static Object[][] getTestParameters() + { + return new Object[][]{{true}, {false}}; + } + + public SegmentAllocationQueueTest(boolean reduceMetadataIO) + { + this.reduceMetadataIO = reduceMetadataIO; + } + @Before public void setUp() { @@ -77,7 +93,7 @@ public long getBatchAllocationWaitTime() @Override public boolean isBatchAllocationReduceMetadataIO() { - return true; + return reduceMetadataIO; } }; diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index bf1bfdae0830..137218397572 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2936,11 +2936,9 @@ Set retrieveUsedSegmentsForAllocation( final Interval interval ) { - final Set overlappingSegmentIds = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUsedSegmentIds( - dataSource, - interval - ); + final Set overlappingSegmentIds + = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUsedSegmentIds(dataSource, interval); // Map from version -> interval -> segmentId with the smallest partitionNum Map> versionIntervalToSmallestSegmentId = new HashMap<>(); for (SegmentId segmentId : overlappingSegmentIds) { @@ -2967,13 +2965,12 @@ Set retrieveUsedSegmentsForAllocation( } if (!retrievedIds.equals(segmentIdsToRetrieve)) { throw DruidException.defensive( - "Cannot create DataSegments for segment allocations." - + "The used segments may have changed for dataSource[%s] and interval[%s].", + "Used segment IDs for dataSource[%s] and interval[%s] have changed in the metadata store.", dataSource, interval ); } - // Populate the dummy segment info + // Create dummy segments for each segmentId with only the shard spec populated Set segmentsWithAllocationInfo = new HashSet<>(); for (SegmentId id : overlappingSegmentIds) { final int corePartitions = versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval()); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index bd53026a5aee..501c6d46134c 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -36,7 +36,6 @@ import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -250,27 +249,18 @@ public Set retrieveUsedSegmentIds( final Interval interval ) { - return retrieveSegmentIds(dataSource, Collections.singletonList(interval)); - } - - private Set retrieveSegmentIds( - final String dataSource, - final Collection intervals - ) - { - if (CollectionUtils.isNullOrEmpty(intervals)) { - return Collections.emptySet(); - } - - // Check if the intervals all support comparing as strings. If so, bake them into the SQL. - final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings); - final StringBuilder sb = new StringBuilder(); sb.append("SELECT id FROM %s WHERE used = :used AND dataSource = :dataSource"); + // If the interval supports comparing as a string, bake it into the SQL + final boolean compareAsString = Intervals.canCompareEndpointsAsStrings(interval); if (compareAsString) { sb.append( - getConditionForIntervalsAndMatchMode(intervals, IntervalMode.OVERLAPS, connector.getQuoteString()) + getConditionForIntervalsAndMatchMode( + Collections.singletonList(interval), + IntervalMode.OVERLAPS, + connector.getQuoteString() + ) ); } @@ -283,7 +273,7 @@ private Set retrieveSegmentIds( .bind("dataSource", dataSource); if (compareAsString) { - bindIntervalsToQuery(sql, intervals); + bindIntervalsToQuery(sql, Collections.singletonList(interval)); } final Set segmentIds = new HashSet<>(); @@ -297,10 +287,8 @@ private Set retrieveSegmentIds( id, dataSource ); } - for (Interval interval : intervals) { - if (IntervalMode.OVERLAPS.apply(interval, segmentId.getInterval())) { - segmentIds.add(segmentId); - } + if (IntervalMode.OVERLAPS.apply(interval, segmentId.getInterval())) { + segmentIds.add(segmentId); } } } From 7d7c08432ea4d1ac06ee48ff1ec078d5ee9603de Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 22 Nov 2024 14:22:53 +0530 Subject: [PATCH 3/4] Rename arg to reduceMetadataIO --- .../druid/indexing/overlord/TaskLockbox.java | 24 +++++++++---------- .../IndexerMetadataStorageCoordinator.java | 4 ++-- .../IndexerSQLMetadataStorageCoordinator.java | 17 ++++++------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index f38666859abd..916f8cab75ca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -460,14 +460,14 @@ public LockResult tryLock(final Task task, final LockRequest request) * successfully and others failed. In that case, only the failed ones should be * retried. * - * @param requests List of allocation requests - * @param dataSource Datasource for which segment is to be allocated. - * @param interval Interval for which segment is to be allocated. - * @param skipSegmentLineageCheck Whether lineage check is to be skipped - * (this is true for streaming ingestion) - * @param lockGranularity Granularity of task lock - * @param skipSegmentPayloadFetchForAllocation Whether to skip fetching payloads for all used - * segments and rely on their ids instead. + * @param requests List of allocation requests + * @param dataSource Datasource for which segment is to be allocated. + * @param interval Interval for which segment is to be allocated. + * @param skipSegmentLineageCheck Whether lineage check is to be skipped + * (this is true for streaming ingestion) + * @param lockGranularity Granularity of task lock + * @param reduceMetadataIO Whether to skip fetching payloads for all used + * segments and rely on their IDs instead. * @return List of allocation results in the same order as the requests. */ public List allocateSegments( @@ -476,7 +476,7 @@ public List allocateSegments( Interval interval, boolean skipSegmentLineageCheck, LockGranularity lockGranularity, - boolean skipSegmentPayloadFetchForAllocation + boolean reduceMetadataIO ) { log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval); @@ -495,7 +495,7 @@ public List allocateSegments( interval, skipSegmentLineageCheck, holderList.getPending(), - skipSegmentPayloadFetchForAllocation + reduceMetadataIO ); } else { allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending(), false); @@ -716,7 +716,7 @@ private void allocateSegmentIds( Interval interval, boolean skipSegmentLineageCheck, Collection holders, - boolean skipSegmentPayloadFetchForAllocation + boolean reduceMetadataIO ) { if (holders.isEmpty()) { @@ -734,7 +734,7 @@ private void allocateSegmentIds( interval, skipSegmentLineageCheck, createRequests, - skipSegmentPayloadFetchForAllocation + reduceMetadataIO ); for (SegmentAllocationHolder holder : holders) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 867b1f618c97..da54d7e39986 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -180,7 +180,7 @@ List retrieveUnusedSegmentsForInterval( * Should be set to false if replica tasks would index events in same order * @param requests Requests for which to allocate segments. All * the requests must share the same partition space. - * @param skipSegmentPayloadFetch If true, try to use the segment ids instead of fetching every segment + * @param reduceMetadataIO If true, try to use the segment ids instead of fetching every segment * payload from the metadata store * @return Map from request to allocated segment id. The map does not contain * entries for failed requests. @@ -190,7 +190,7 @@ Map allocatePendingSegments( Interval interval, boolean skipSegmentLineageCheck, List requests, - boolean skipSegmentPayloadFetch + boolean reduceMetadataIO ); /** diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 137218397572..e3a3ee047088 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -720,7 +720,7 @@ public Map allocatePendingSegments Interval allocateInterval, boolean skipSegmentLineageCheck, List requests, - boolean skipSegmentPayloadFetch + boolean reduceMetadataIO ) { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -734,7 +734,7 @@ public Map allocatePendingSegments interval, skipSegmentLineageCheck, requests, - skipSegmentPayloadFetch + reduceMetadataIO ) ); } @@ -1015,12 +1015,12 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( public SegmentTimeline getSegmentTimelineForAllocation( String dataSource, Interval interval, - boolean skipSegmentPayloadFetchForAllocation + boolean reduceMetadataIO ) { return connector.retryWithHandle( handle -> { - if (skipSegmentPayloadFetchForAllocation) { + if (reduceMetadataIO) { return SegmentTimeline.forSegments(retrieveUsedSegmentsForAllocation(handle, dataSource, interval)); } else { return getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)); @@ -1035,15 +1035,12 @@ private Map allocatePendingSegment final Interval interval, final boolean skipSegmentLineageCheck, final List requests, - final boolean skipSegmentPayloadFetch + final boolean reduceMetadataIO ) throws IOException { // Get the time chunk and associated data segments for the given interval, if any - final List> existingChunks = getSegmentTimelineForAllocation( - dataSource, - interval, - skipSegmentPayloadFetch - ).lookup(interval); + final List> existingChunks + = getSegmentTimelineForAllocation(dataSource, interval, reduceMetadataIO).lookup(interval); if (existingChunks.size() > 1) { log.warn( "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.", From 85986addc41ff5857e02e5838ec0b71ec065876a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 25 Nov 2024 13:11:49 +0530 Subject: [PATCH 4/4] Set new flag to true by default --- .../apache/druid/indexing/overlord/config/TaskLockConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java index 1f863133d65f..bc3acdc597d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java @@ -37,7 +37,7 @@ public class TaskLockConfig private long batchAllocationWaitTime = 0L; @JsonProperty - private boolean batchAllocationReduceMetadataIO = false; + private boolean batchAllocationReduceMetadataIO = true; public boolean isForceTimeChunkLock() {