diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 98ab50cff788..3c86e8e5637d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -25,7 +25,6 @@
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.java.util.common.ISE;
@@ -41,6 +40,7 @@
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
 import org.joda.time.Interval;
 
 import java.util.ArrayList;
@@ -87,6 +87,8 @@ public class SegmentAllocationQueue
   private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
   private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
 
+  private final boolean reduceMetadataIO;
+
   @Inject
   public SegmentAllocationQueue(
       TaskLockbox taskLockbox,
@@ -100,6 +102,7 @@ public SegmentAllocationQueue(
     this.taskLockbox = taskLockbox;
     this.metadataStorage = metadataStorage;
     this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
+    this.reduceMetadataIO = taskLockConfig.isBatchAllocationReduceMetadataIO();
 
     this.executor = taskLockConfig.isBatchSegmentAllocation() ?
                     executorFactory.create(1, "SegmentAllocQueue-%s") : null;
@@ -380,13 +383,11 @@ private boolean processBatch(AllocateRequestBatch requestBatch)
 
   private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
   {
-    return new HashSet<>(
-        metadataStorage.retrieveUsedSegmentsForInterval(
-            key.dataSource,
-            key.preferredAllocationInterval,
-            Segments.ONLY_VISIBLE
-        )
-    );
+    return metadataStorage.getSegmentTimelineForAllocation(
+        key.dataSource,
+        key.preferredAllocationInterval,
+        (key.lockGranularity == LockGranularity.TIME_CHUNK) && reduceMetadataIO
+    ).findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
   }
 
   private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<DataSegment> usedSegments)
@@ -493,7 +494,8 @@ private int allocateSegmentsForInterval(
         requestKey.dataSource,
         tryInterval,
         requestKey.skipSegmentLineageCheck,
-        requestKey.lockGranularity
+        requestKey.lockGranularity,
+        reduceMetadataIO
     );
 
     int successfulRequests = 0;
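
Note on the `retrieveUsedSegments()` rewrite above: visibility filtering now happens on the Overlord side, via the timeline, instead of asking the metadata store for `Segments.ONLY_VISIBLE`. A minimal sketch of that filtering step, assuming a set of used segments is already in hand (the helper name below is hypothetical; the two timeline calls are the ones used in the diff):

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;

import java.util.Set;

class TimelineVisibilitySketch
{
  // Hypothetical helper: build a timeline over all used segments (possibly the
  // lightweight "allocation" segments) and keep only complete, non-overshadowed
  // ones, mirroring what Segments.ONLY_VISIBLE previously did on the store side.
  static Set<DataSegment> visible(Iterable<DataSegment> usedSegments)
  {
    return SegmentTimeline.forSegments(usedSegments)
                          .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
  }
}
```
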
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index bebb52157d6f..916f8cab75ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -466,6 +466,8 @@ public LockResult tryLock(final Task task, final LockRequest request)
    * @param skipSegmentLineageCheck Whether lineage check is to be skipped
    *                                (this is true for streaming ingestion)
    * @param lockGranularity         Granularity of task lock
+   * @param reduceMetadataIO        Whether to skip fetching payloads for all used
+   *                                segments and rely on their IDs instead.
    * @return List of allocation results in the same order as the requests.
    */
   public List<SegmentAllocateResult> allocateSegments(
@@ -473,7 +475,8 @@ public List<SegmentAllocateResult> allocateSegments(
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
-      LockGranularity lockGranularity
+      LockGranularity lockGranularity,
+      boolean reduceMetadataIO
   )
   {
     log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval);
@@ -487,9 +490,15 @@ public List<SegmentAllocateResult> allocateSegments(
       if (isTimeChunkLock) {
         // For time-chunk locking, segment must be allocated only after acquiring the lock
         holderList.getPending().forEach(holder -> acquireTaskLock(holder, true));
-        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
+        allocateSegmentIds(
+            dataSource,
+            interval,
+            skipSegmentLineageCheck,
+            holderList.getPending(),
+            reduceMetadataIO
+        );
       } else {
-        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
+        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending(), false);
         holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
       }
       holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
@@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request)
    * for the given requests. Updates the holder with the allocated segment if
    * the allocation succeeds, otherwise marks it as failed.
    */
-  @VisibleForTesting
-  void allocateSegmentIds(
+  private void allocateSegmentIds(
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
-      Collection<SegmentAllocationHolder> holders
+      Collection<SegmentAllocationHolder> holders,
+      boolean reduceMetadataIO
   )
   {
     if (holders.isEmpty()) {
@@ -724,7 +733,8 @@ void allocateSegmentIds(
         dataSource,
         interval,
         skipSegmentLineageCheck,
-        createRequests
+        createRequests,
+        reduceMetadataIO
     );
 
     for (SegmentAllocationHolder holder : holders) {
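
For callers, the TaskLockbox change above boils down to one extra argument, honored only under time-chunk locking (the segment-lock branch hard-codes `false`). A hedged usage sketch; the package locations of SegmentAllocateRequest and SegmentAllocateResult are assumptions, and the datasource and interval are example values:

```java
import java.util.List;

import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.overlord.SegmentAllocateResult;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.java.util.common.Intervals;

class AllocateSegmentsSketch
{
  // taskLockbox and requests are assumed to be supplied by the caller.
  static List<SegmentAllocateResult> allocate(TaskLockbox taskLockbox, List<SegmentAllocateRequest> requests)
  {
    return taskLockbox.allocateSegments(
        requests,
        "wikipedia",                           // example datasource
        Intervals.of("2024-10-01/2024-10-02"), // example interval
        true,                                  // skipSegmentLineageCheck (streaming)
        LockGranularity.TIME_CHUNK,            // only granularity where the flag is honored
        true                                   // reduceMetadataIO
    );
  }
}
```
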
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
index 2634c4328fec..bc3acdc597d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
@@ -36,6 +36,9 @@ public class TaskLockConfig
   @JsonProperty
   private long batchAllocationWaitTime = 0L;
 
+  @JsonProperty
+  private boolean batchAllocationReduceMetadataIO = true;
+
   public boolean isForceTimeChunkLock()
   {
     return forceTimeChunkLock;
@@ -50,4 +53,10 @@ public long getBatchAllocationWaitTime()
   {
     return batchAllocationWaitTime;
   }
+
+  public boolean isBatchAllocationReduceMetadataIO()
+  {
+    return batchAllocationReduceMetadataIO;
+  }
+
 }
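
The new flag above is an ordinary `@JsonProperty` defaulting to `true`, so it is operator-configurable through the Overlord's task-lock config (presumably as `druid.indexer.tasklock.batchAllocationReduceMetadataIO`, though that property name is an assumption). The binding itself can be sanity-checked directly:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;

class TaskLockConfigSketch
{
  public static void main(String[] args) throws Exception
  {
    // Defaults to true; an explicit false opts out of the optimization.
    final TaskLockConfig config = new ObjectMapper().readValue(
        "{\"batchAllocationReduceMetadataIO\": false}",
        TaskLockConfig.class
    );
    System.out.println(config.isBatchAllocationReduceMetadataIO()); // prints false
  }
}
```
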
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index fdb7fcd5595b..3c0b08758f76 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -93,21 +93,28 @@ public class SegmentAllocateActionTest
 
   private SegmentAllocationQueue allocationQueue;
 
-  @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}")
+  @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.SEGMENT, true},
-        new Object[]{LockGranularity.SEGMENT, false},
-        new Object[]{LockGranularity.TIME_CHUNK, true},
-        new Object[]{LockGranularity.TIME_CHUNK, false}
+        new Object[]{LockGranularity.SEGMENT, true, true},
+        new Object[]{LockGranularity.SEGMENT, true, false},
+        new Object[]{LockGranularity.SEGMENT, false, false},
+        new Object[]{LockGranularity.TIME_CHUNK, true, true},
+        new Object[]{LockGranularity.TIME_CHUNK, true, false},
+        new Object[]{LockGranularity.TIME_CHUNK, false, false}
     );
   }
 
-  public SegmentAllocateActionTest(LockGranularity lockGranularity, boolean useBatch)
+  public SegmentAllocateActionTest(
+      LockGranularity lockGranularity,
+      boolean useBatch,
+      boolean skipSegmentPayloadFetchForAllocation
+  )
   {
     this.lockGranularity = lockGranularity;
     this.useBatch = useBatch;
+    this.taskActionTestKit.setSkipSegmentPayloadFetchForAllocation(skipSegmentPayloadFetchForAllocation);
   }
 
   @Before
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 6a5c84082f2c..9aa9f4c9d263 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -36,6 +36,8 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -44,6 +46,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+@RunWith(Parameterized.class)
 public class SegmentAllocationQueueTest
 {
   @Rule
@@ -54,6 +57,19 @@ public class SegmentAllocationQueueTest
   private StubServiceEmitter emitter;
   private BlockingExecutorService executor;
 
+  private final boolean reduceMetadataIO;
+
+  @Parameterized.Parameters(name = "reduceMetadataIO = {0}")
+  public static Object[][] getTestParameters()
+  {
+    return new Object[][]{{true}, {false}};
+  }
+
+  public SegmentAllocationQueueTest(boolean reduceMetadataIO)
+  {
+    this.reduceMetadataIO = reduceMetadataIO;
+  }
+
   @Before
   public void setUp()
   {
@@ -73,6 +89,12 @@ public long getBatchAllocationWaitTime()
       {
         return 0;
       }
+
+      @Override
+      public boolean isBatchAllocationReduceMetadataIO()
+      {
+        return reduceMetadataIO;
+      }
     };
 
     allocationQueue = new SegmentAllocationQueue(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index fcbf37c956da..208fec01fe45 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -58,6 +58,8 @@ public class TaskActionTestKit extends ExternalResource
   private SegmentSchemaManager segmentSchemaManager;
   private SegmentSchemaCache segmentSchemaCache;
 
+  private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
+
   public TaskLockbox getTaskLockbox()
   {
     return taskLockbox;
@@ -78,6 +80,11 @@ public TaskActionToolbox getTaskActionToolbox()
     return taskActionToolbox;
   }
 
+  public void setSkipSegmentPayloadFetchForAllocation(boolean skipSegmentPayloadFetchForAllocation)
+  {
+    this.skipSegmentPayloadFetchForAllocation = skipSegmentPayloadFetchForAllocation;
+  }
+
   @Override
   public void before()
   {
@@ -126,6 +133,12 @@ public long getBatchAllocationWaitTime()
       {
         return 10L;
       }
+
+      @Override
+      public boolean isBatchAllocationReduceMetadataIO()
+      {
+        return skipSegmentPayloadFetchForAllocation;
+      }
     };
 
     taskActionToolbox = new TaskActionToolbox(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d2055d6e0c99..54e323581c47 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -35,6 +35,7 @@
 import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -168,7 +169,8 @@ public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
-      List<SegmentCreateRequest> requests
+      List<SegmentCreateRequest> requests,
+      boolean isTimeChunk
   )
   {
     return Collections.emptyMap();
@@ -332,6 +334,20 @@ public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
     return Collections.emptyMap();
   }
 
+  @Override
+  public SegmentTimeline getSegmentTimelineForAllocation(
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentPayloadFetchForAllocation
+  )
+  {
+    return SegmentTimeline.forSegments(retrieveUsedSegmentsForIntervals(
+        dataSource,
+        Collections.singletonList(interval),
+        Segments.INCLUDING_OVERSHADOWED
+    ));
+  }
+
   public Set<DataSegment> getPublished()
   {
     return ImmutableSet.copyOf(published);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 83b4ac7e474c..da54d7e39986 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -25,6 +25,7 @@
 import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -179,6 +180,8 @@ List<DataSegment> retrieveUnusedSegmentsForInterval(
    *                                Should be set to false if replica tasks would index events in same order
    * @param requests                Requests for which to allocate segments. All
    *                                the requests must share the same partition space.
+   * @param reduceMetadataIO        If true, try to use the segment ids instead of fetching
+   *                                every segment payload from the metadata store.
    * @return Map from request to allocated segment id. The map does not contain
    *         entries for failed requests.
    */
@@ -186,7 +189,20 @@ Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
-      List<SegmentCreateRequest> requests
+      List<SegmentCreateRequest> requests,
+      boolean reduceMetadataIO
   );
 
+  /**
+   * Returns a timeline of all used segments, including overshadowed ones, for the given datasource and interval.
+   * If skipSegmentPayloadFetchForAllocation is true, the segment payloads are not fetched for allocation.
+   * Instead, all the ids are fetched, and numCorePartitions is determined from exactly one segment per version per interval.
+   * A dummy DataSegment is returned for each id, holding only the SegmentId and a NumberedShardSpec with numCorePartitions.
+   */
+  SegmentTimeline getSegmentTimelineForAllocation(
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentPayloadFetchForAllocation
+  );
+
   /**
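
To make the javadoc above concrete: a "dummy" allocation segment carries real information only in its id and shard spec; every payload field is blank. A hedged sketch mirroring the constructor call used by the implementation further down:

```java
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;

class DummyAllocationSegmentSketch
{
  static DataSegment dummySegment(SegmentId id, int numCorePartitions)
  {
    return new DataSegment(
        id,
        null,                                                            // no loadSpec
        null,                                                            // no dimensions
        null,                                                            // no metrics
        new NumberedShardSpec(id.getPartitionNum(), numCorePartitions),  // all that allocation needs
        null,                                                            // no compaction state
        null,                                                            // no binary version
        1                                                                // placeholder size
    );
  }
}
```
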
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index a512f7935740..e3a3ee047088 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -719,7 +719,8 @@ public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
       String dataSource,
       Interval allocateInterval,
       boolean skipSegmentLineageCheck,
-      List<SegmentCreateRequest> requests
+      List<SegmentCreateRequest> requests,
+      boolean reduceMetadataIO
   )
   {
     Preconditions.checkNotNull(dataSource, "dataSource");
@@ -727,7 +728,14 @@ public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
     final Interval interval = allocateInterval.withChronology(ISOChronology.getInstanceUTC());
 
     return connector.retryWithHandle(
-        handle -> allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests)
+        handle -> allocatePendingSegments(
+            handle,
+            dataSource,
+            interval,
+            skipSegmentLineageCheck,
+            requests,
+            reduceMetadataIO
+        )
     );
   }
@@ -1003,18 +1011,36 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(
     return newIdentifier;
   }
 
+  @Override
+  public SegmentTimeline getSegmentTimelineForAllocation(
+      String dataSource,
+      Interval interval,
+      boolean reduceMetadataIO
+  )
+  {
+    return connector.retryWithHandle(
+        handle -> {
+          if (reduceMetadataIO) {
+            return SegmentTimeline.forSegments(retrieveUsedSegmentsForAllocation(handle, dataSource, interval));
+          } else {
+            return getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval));
+          }
+        }
+    );
+  }
+
   private Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
       final Handle handle,
       final String dataSource,
       final Interval interval,
       final boolean skipSegmentLineageCheck,
-      final List<SegmentCreateRequest> requests
+      final List<SegmentCreateRequest> requests,
+      final boolean reduceMetadataIO
   ) throws IOException
   {
     // Get the time chunk and associated data segments for the given interval, if any
-    final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
-        getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
-            .lookup(interval);
+    final List<TimelineObjectHolder<String, DataSegment>> existingChunks
+        = getSegmentTimelineForAllocation(dataSource, interval, reduceMetadataIO).lookup(interval);
 
     if (existingChunks.size() > 1) {
       log.warn(
           "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.",
@@ -2900,6 +2926,67 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set
 
+  Set<DataSegment> retrieveUsedSegmentsForAllocation(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval
+  )
+  {
+    final Set<SegmentId> overlappingSegmentIds
+        = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
+                                  .retrieveUsedSegmentIds(dataSource, interval);
+
+    // Map from version -> interval -> segmentId with the smallest partitionNum
+    Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();
+    for (SegmentId segmentId : overlappingSegmentIds) {
+      final Map<Interval, SegmentId> map
+          = versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>());
+      final SegmentId value = map.get(segmentId.getInterval());
+      if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) {
+        map.put(segmentId.getInterval(), segmentId);
+      }
+    }
+
+    // Retrieve the segments for the ids stored in the map to get the numCorePartitions
+    final Set<String> segmentIdsToRetrieve = new HashSet<>();
+    for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
+      segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList()));
+    }
+    final Set<DataSegment> dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve);
+
+    final Set<String> retrievedIds = new HashSet<>();
+    final Map<String, Map<Interval, Integer>> versionIntervalToNumCorePartitions = new HashMap<>();
+    for (DataSegment segment : dataSegments) {
+      versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>())
+                                        .put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
+      retrievedIds.add(segment.getId().toString());
+    }
+    if (!retrievedIds.equals(segmentIdsToRetrieve)) {
+      throw DruidException.defensive(
+          "Used segment IDs for dataSource[%s] and interval[%s] have changed in the metadata store.",
+          dataSource, interval
+      );
+    }
+
+    // Create dummy segments for each segmentId with only the shard spec populated
+    Set<DataSegment> segmentsWithAllocationInfo = new HashSet<>();
+    for (SegmentId id : overlappingSegmentIds) {
+      final int corePartitions = versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval());
+      segmentsWithAllocationInfo.add(
+          new DataSegment(
+              id,
+              null,
+              null,
+              null,
+              new NumberedShardSpec(id.getPartitionNum(), corePartitions),
+              null,
+              null,
+              1
+          )
+      );
+    }
+    return segmentsWithAllocationInfo;
+  }
+
   @Override
   public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
   {
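
The reduction inside `retrieveUsedSegmentsForAllocation()` above is worth reading in isolation: for each (version, interval) pair, the id with the smallest partitionNum is a sufficient representative, because all segments of one version in one interval share the same numCorePartitions. A compact restatement of that step (using `Map.merge` instead of explicit get/put is my choice, not the PR's):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

class SmallestPartitionSketch
{
  // Mirrors the grouping above: version -> interval -> id with the smallest partitionNum.
  static Map<String, Map<Interval, SegmentId>> pickRepresentatives(Iterable<SegmentId> ids)
  {
    final Map<String, Map<Interval, SegmentId>> smallest = new HashMap<>();
    for (SegmentId id : ids) {
      smallest.computeIfAbsent(id.getVersion(), v -> new HashMap<>())
              .merge(
                  id.getInterval(),
                  id,
                  (a, b) -> a.getPartitionNum() <= b.getPartitionNum() ? a : b
              );
    }
    return smallest;
  }
}
```

Only these representative ids need full payload fetches; every other overlapping segment is rebuilt as a dummy with just its id and a NumberedShardSpec.
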
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index fc1c84a70371..501c6d46134c 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.UnmodifiableIterator;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -48,6 +49,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +244,58 @@ public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
     );
   }
 
+  public Set<SegmentId> retrieveUsedSegmentIds(
+      final String dataSource,
+      final Interval interval
+  )
+  {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("SELECT id FROM %s WHERE used = :used AND dataSource = :dataSource");
+
+    // If the interval supports comparing as a string, bake it into the SQL
+    final boolean compareAsString = Intervals.canCompareEndpointsAsStrings(interval);
+    if (compareAsString) {
+      sb.append(
+          getConditionForIntervalsAndMatchMode(
+              Collections.singletonList(interval),
+              IntervalMode.OVERLAPS,
+              connector.getQuoteString()
+          )
+      );
+    }
+
+    return connector.inReadOnlyTransaction(
+        (handle, status) -> {
+          final Query<Map<String, Object>> sql = handle
+              .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
+              .setFetchSize(connector.getStreamingFetchSize())
+              .bind("used", true)
+              .bind("dataSource", dataSource);
+
+          if (compareAsString) {
+            bindIntervalsToQuery(sql, Collections.singletonList(interval));
+          }
+
+          final Set<SegmentId> segmentIds = new HashSet<>();
+          try (final ResultIterator<String> iterator = sql.map((index, r, ctx) -> r.getString(1)).iterator()) {
+            while (iterator.hasNext()) {
+              final String id = iterator.next();
+              final SegmentId segmentId = SegmentId.tryParse(dataSource, id);
+              if (segmentId == null) {
+                throw DruidException.defensive(
+                    "Failed to parse SegmentId for id[%s] and dataSource[%s].",
+                    id, dataSource
+                );
+              }
+              if (IntervalMode.OVERLAPS.apply(interval, segmentId.getInterval())) {
+                segmentIds.add(segmentId);
+              }
+            }
+          }
+          return segmentIds;
+        });
+  }
+
   public List<DataSegment> retrieveSegmentsById(
       String datasource,
       Set<String> segmentIds
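
The compareAsString branch above exists because interval endpoints are only safely comparable as strings in SQL for ordinary ISO-8601 timestamps; otherwise the query fetches every used id for the datasource and re-checks overlap in Java. A small illustration (the ETERNITY result is my reading of canCompareEndpointsAsStrings, stated as an assumption):

```java
import org.apache.druid.java.util.common.Intervals;

class IntervalStringComparisonSketch
{
  public static void main(String[] args)
  {
    // Ordinary ISO-8601 endpoints sort correctly as strings, so the interval
    // condition can be pushed into the SQL WHERE clause ...
    System.out.println(Intervals.canCompareEndpointsAsStrings(Intervals.of("2024-10-01/2024-11-01"))); // true
    // ... but eternity-like endpoints fall outside the string-sortable range,
    // forcing the overlap check back into Java after the ids are fetched.
    System.out.println(Intervals.canCompareEndpointsAsStrings(Intervals.ETERNITY)); // false
  }
}
```
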
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 4b592e5f40da..0377faac5fbf 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2669,6 +2669,76 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction()
 
   }
 
+  @Test
+  public void testAllocatePendingSegmentsSkipSegmentPayloadFetch()
+  {
+    final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
+    final String dataSource = "ds";
+    final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+    final String sequenceName = "seq";
+
+    final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
+    final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments(
+        dataSource,
+        interval,
+        false,
+        Collections.singletonList(request),
+        true
+    ).get(request);
+
+    Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
+
+    final SegmentCreateRequest request1 =
+        new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null);
+    final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments(
+        dataSource,
+        interval,
+        false,
+        Collections.singletonList(request1),
+        true
+    ).get(request1);
+
+    Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
+
+    final SegmentCreateRequest request2 =
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
+    final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments(
+        dataSource,
+        interval,
+        false,
+        Collections.singletonList(request2),
+        true
+    ).get(request2);
+
+    Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
+
+    final SegmentCreateRequest request3 =
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
+    final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments(
+        dataSource,
+        interval,
+        false,
+        Collections.singletonList(request3),
+        true
+    ).get(request3);
+
+    Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString());
+    Assert.assertEquals(segmentId2, segmentId3);
+
+    final SegmentCreateRequest request4 =
+        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null);
+    final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments(
+        dataSource,
+        interval,
+        false,
+        Collections.singletonList(request4),
+        true
+    ).get(request4);
+
+    Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString());
+  }
+
   @Test
   public void testAllocatePendingSegments()
   {
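
The ids asserted above follow the `datasource_start_end_version[_partitionNum]` layout; SegmentId.tryParse, the same parser the new retrieveUsedSegmentIds() relies on, recovers the structured form (partitionNum defaults to 0 when the suffix is absent):

```java
import org.apache.druid.timeline.SegmentId;

class SegmentIdParseSketch
{
  public static void main(String[] args)
  {
    final SegmentId id = SegmentId.tryParse(
        "ds",
        "ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3"
    );
    System.out.println(id.getVersion());      // v1
    System.out.println(id.getPartitionNum()); // 3
  }
}
```
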
@@ -2682,7 +2752,8 @@ public void testAllocatePendingSegments()
         dataSource,
         interval,
         false,
-        Collections.singletonList(request)
+        Collections.singletonList(request),
+        false
     ).get(request);
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
@@ -2693,7 +2764,8 @@ public void testAllocatePendingSegments()
         dataSource,
         interval,
         false,
-        Collections.singletonList(request1)
+        Collections.singletonList(request1),
+        false
     ).get(request1);
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
@@ -2704,7 +2776,8 @@ public void testAllocatePendingSegments()
         dataSource,
         interval,
         false,
-        Collections.singletonList(request2)
+        Collections.singletonList(request2),
+        false
     ).get(request2);
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
@@ -2715,7 +2788,8 @@ public void testAllocatePendingSegments()
         dataSource,
         interval,
         false,
-        Collections.singletonList(request3)
+        Collections.singletonList(request3),
+        false
     ).get(request3);
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString());
@@ -2727,7 +2801,8 @@ public void testAllocatePendingSegments()
         dataSource,
         interval,
         false,
-        Collections.singletonList(request4)
+        Collections.singletonList(request4),
+        false
     ).get(request4);
 
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString());
@@ -3639,6 +3714,84 @@ public void testRetrieveUpgradedToSegmentIdsInBatches()
     Assert.assertEquals(expected, actual);
   }
 
+  @Test
+  public void testRetrieveUsedSegmentsForSegmentAllocation()
+  {
+    final String datasource = "DS";
+    DataSegment firstSegment;
+    Set<DataSegment> nextSegments;
+    final Map<String, Object> loadspec = ImmutableMap.of("loadSpec", "loadSpec");
+    final List<String> dimensions = ImmutableList.of("dim1", "dim2");
+    final List<String> metrics = ImmutableList.of("metric1", "metric2");
+    final int numSegmentsPerInterval = 100;
+
+    final Interval month = Intervals.of("2024-10-01/2024-11-01");
+
+    final Interval year = Intervals.of("2024/2025");
+
+    final Interval overlappingDay = Intervals.of("2024-10-01/2024-10-02");
+    final Interval nonOverlappingDay = Intervals.of("2024-01-01/2024-01-02");
+
+    final List<Interval> intervals = ImmutableList.of(month, year, overlappingDay, nonOverlappingDay);
+    final List<String> versions = ImmutableList.of("v0", "v1", "v2", "v2");
+    for (int i = 0; i < 4; i++) {
+      nextSegments = new HashSet<>();
+      firstSegment = new DataSegment(
+          datasource,
+          intervals.get(i),
+          versions.get(i),
+          loadspec,
+          dimensions,
+          metrics,
+          new DimensionRangeShardSpec(dimensions, null, null, 0, 1),
+          0,
+          100
+      );
+      insertUsedSegments(Collections.singleton(firstSegment), Collections.emptyMap());
+      for (int j = 1; j < numSegmentsPerInterval; j++) {
+        nextSegments.add(
+            new DataSegment(
+                datasource,
+                intervals.get(i),
+                versions.get(i),
+                loadspec,
+                dimensions,
+                metrics,
+                // The numCorePartitions is intentionally 0
+                new NumberedShardSpec(j, 0),
+                0,
+                100
+            )
+        );
+      }
+      insertUsedSegments(nextSegments, Collections.emptyMap());
+    }
+
+    final Set<SegmentIdWithShardSpec> expected = new HashSet<>();
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < numSegmentsPerInterval; j++) {
+        expected.add(
+            new SegmentIdWithShardSpec(
+                datasource,
+                intervals.get(i),
+                versions.get(i),
+                new NumberedShardSpec(j, 1)
+            )
+        );
+      }
+    }
+
+    Assert.assertEquals(expected,
+        derbyConnector.retryWithHandle(
+            handle -> coordinator.retrieveUsedSegmentsForAllocation(handle, datasource, month)
+                                 .stream()
+                                 .map(SegmentIdWithShardSpec::fromDataSegment)
+                                 .collect(Collectors.toSet())
+        )
+    );
+  }
+
   private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
   {
     final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();