Add concurrent writes reconciliation for OPTIMIZE in Delta Lake
Allow committing OPTIMIZE operations in a concurrent context
by placing these operations right after any concurrently
completed write operations.
findinpath authored and raunaqmorarka committed Jan 14, 2025
1 parent 69a2979 commit f548262
Showing 3 changed files with 291 additions and 9 deletions.
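
In outline, the reconciliation works like this: rather than writing the commit file blindly at readVersion + 1, the OPTIMIZE commit re-reads the latest transaction-log version, checks the commits that landed concurrently for conflicts, and places its own commit right after them, retrying if another writer wins the race for a version number. Below is a minimal, self-contained Java sketch of that pattern, not the connector's code; latestVersion, hasConflictingWrites, and tryWriteCommitFile are hypothetical stand-ins for reading and writing _delta_log.

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class OptimisticCommitSketch
{
    private static final int MAX_ATTEMPTS = 5;

    // Simulated transaction-log tail; stands in for listing _delta_log.
    private final AtomicLong logVersion = new AtomicLong(10);

    public long commitWithReconciliation(long readVersion)
    {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            // Re-read the latest committed version instead of trusting readVersion
            long currentVersion = latestVersion();
            // Inspect the commits in (readVersion, currentVersion] for conflicts
            // with the data this operation read or rewrote
            if (hasConflictingWrites(readVersion, currentVersion)) {
                throw new IllegalStateException("conflicting concurrent write detected");
            }
            // Slot this commit right after the concurrent winners
            long commitVersion = currentVersion + 1;
            if (tryWriteCommitFile(commitVersion)) {
                return commitVersion;
            }
            // Another writer claimed commitVersion first: back off and retry
            try {
                Thread.sleep(ThreadLocalRandom.current().nextLong(50, 200));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        throw new IllegalStateException("gave up after " + MAX_ATTEMPTS + " attempts");
    }

    private long latestVersion()
    {
        return logVersion.get();
    }

    private boolean hasConflictingWrites(long readVersion, long currentVersion)
    {
        return false; // a real check compares touched files and partitions
    }

    private boolean tryWriteCommitFile(long commitVersion)
    {
        // Emulates the atomic "create if absent" of <version>.json
        return logVersion.compareAndSet(commitVersion - 1, commitVersion);
    }

    public static void main(String[] args)
    {
        System.out.println("committed at version " + new OptimisticCommitSketch().commitWithReconciliation(8));
    }
}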
DeltaLakeMetadata.java:
@@ -2677,7 +2677,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(DeltaLak
                         tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
                         maxScannedFileSize,
                         Optional.empty(),
-                        retryMode != NO_RETRIES),
+                        retryMode != NO_RETRIES,
+                        tableHandle.getEnforcedPartitionConstraint()),
                 tableHandle.getLocation()));
     }

@@ -2731,7 +2732,10 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
         checkSupportedWriterVersion(table);

         return new BeginTableExecuteResult<>(
-                executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
+                executeHandle.withProcedureHandle(
+                        optimizeHandle
+                                .withCurrentVersion(table.getReadVersion())
+                                .withEnforcedPartitionConstraint(table.getEnforcedPartitionConstraint())),
                 table.forOptimize(true, optimizeHandle.getMaxScannedFileSize()));
     }

@@ -2769,7 +2773,10 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl

         boolean writeCommitted = false;
         try {
-            long commitVersion = commitOptimizeOperation(session, optimizeHandle, tableLocation, scannedDataFiles, dataFileInfos);
+            IsolationLevel isolationLevel = getIsolationLevel(optimizeHandle.getMetadataEntry());
+            AtomicReference<Long> readVersion = new AtomicReference<>(optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set")));
+            long commitVersion = Failsafe.with(TRANSACTION_CONFLICT_RETRY_POLICY)
+                    .get(context -> commitOptimizeOperation(session, optimizeHandle, isolationLevel, tableLocation, scannedDataFiles, dataFileInfos, readVersion, context.getAttemptCount()));
             writeCommitted = true;
             enqueueUpdateInfo(
                     session,

@@ -2799,17 +2806,29 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
     private long commitOptimizeOperation(
             ConnectorSession session,
             DeltaTableOptimizeHandle optimizeHandle,
+            IsolationLevel isolationLevel,
             String tableLocation,
             Set<DeltaLakeScannedDataFile> scannedDataFiles,
-            List<DataFileInfo> dataFileInfos)
+            List<DataFileInfo> dataFileInfos,
+            AtomicReference<Long> readVersion,
+            int attemptCount)
             throws IOException
     {
-        long readVersion = optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set"));
         TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);

         long createdTime = Instant.now().toEpochMilli();
-        long commitVersion = readVersion + 1;
-        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, OPTIMIZE_OPERATION, readVersion, false));
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
+        checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(optimizeHandle.getEnforcedPartitionConstraint()), isolationLevel, currentVersion, readVersion, tableLocation, attemptCount);
+        long commitVersion = currentVersion + 1;
+        transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
+                session,
+                isolationLevel,
+                commitVersion,
+                createdTime,
+                OPTIMIZE_OPERATION,
+                optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set")),
+                false));
         // TODO: Delta writes another field "operationMetrics" that I haven't
         // seen before. It contains delete/update metrics. Investigate/include it.
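
The Failsafe wrapper above is what drives the retry loop; TRANSACTION_CONFLICT_RETRY_POLICY itself is defined elsewhere in the connector and is not shown in this diff. As a rough sketch of how such a policy could look with the Failsafe library (the exception type, retry count, and delay below are assumptions, not the connector's actual tuning):

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;

public class RetryPolicySketch
{
    // Hypothetical stand-in for the connector's transaction conflict exception
    static class TransactionConflictException extends RuntimeException {}

    // Assumed shape of a policy like TRANSACTION_CONFLICT_RETRY_POLICY:
    // retry only on conflicts, a handful of times, with a short delay
    private static final RetryPolicy<Long> RETRY_POLICY = RetryPolicy.<Long>builder()
            .handle(TransactionConflictException.class)
            .withMaxRetries(5)
            .withDelay(Duration.ofMillis(100))
            .build();

    public static void main(String[] args)
    {
        long commitVersion = Failsafe.with(RETRY_POLICY).get(context -> {
            // getAttemptCount() is 0 on the first execution, mirroring the
            // attemptCount parameter threaded into commitOptimizeOperation
            if (context.getAttemptCount() == 0) {
                throw new TransactionConflictException();
            }
            return 42L;
        });
        System.out.println("committed version " + commitVersion);
    }
}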
DeltaTableOptimizeHandle.java:
@@ -20,6 +20,7 @@
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
 import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
+import io.trino.spi.predicate.TupleDomain;

 import java.util.List;
 import java.util.Optional;

@@ -37,6 +38,7 @@ public class DeltaTableOptimizeHandle
     private final DataSize maxScannedFileSize;
     private final Optional<Long> currentVersion;
     private final boolean retriesEnabled;
+    private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;

     @JsonCreator
     public DeltaTableOptimizeHandle(

@@ -46,7 +48,8 @@ public DeltaTableOptimizeHandle(
             List<String> originalPartitionColumns,
             DataSize maxScannedFileSize,
             Optional<Long> currentVersion,
-            boolean retriesEnabled)
+            boolean retriesEnabled,
+            TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
     {
         this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
         this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");

@@ -55,6 +58,7 @@ public DeltaTableOptimizeHandle(
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
         this.currentVersion = requireNonNull(currentVersion, "currentVersion is null");
         this.retriesEnabled = retriesEnabled;
+        this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
     }

     public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)

@@ -67,7 +71,21 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
                 originalPartitionColumns,
                 maxScannedFileSize,
                 Optional.of(currentVersion),
-                retriesEnabled);
+                retriesEnabled,
+                enforcedPartitionConstraint);
     }

+    public DeltaTableOptimizeHandle withEnforcedPartitionConstraint(TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
+    {
+        return new DeltaTableOptimizeHandle(
+                metadataEntry,
+                protocolEntry,
+                tableColumns,
+                originalPartitionColumns,
+                maxScannedFileSize,
+                currentVersion,
+                retriesEnabled,
+                requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null"));
+    }
+
     @JsonProperty

@@ -114,4 +132,10 @@ public boolean isRetriesEnabled()
     {
         return retriesEnabled;
     }
+
+    @JsonProperty
+    public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
+    {
+        return enforcedPartitionConstraint;
+    }
 }
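
Why the new field is threaded through the constructor, a with* copier, and a @JsonProperty getter: the handle is a Jackson-serializable value object (hence the @JsonCreator/@JsonProperty annotations in the diff), so the constraint captured at planning time has to survive a JSON round-trip on its way to the finish phase. A minimal illustration of that pattern, not connector code, using a plain String in place of TupleDomain:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HandleSketch
{
    public static class Handle
    {
        private final long currentVersion;
        private final String constraint; // stand-in for TupleDomain

        @JsonCreator
        public Handle(
                @JsonProperty("currentVersion") long currentVersion,
                @JsonProperty("constraint") String constraint)
        {
            this.currentVersion = currentVersion;
            this.constraint = constraint;
        }

        @JsonProperty
        public long getCurrentVersion() { return currentVersion; }

        @JsonProperty
        public String getConstraint() { return constraint; }

        // Immutable copier, mirroring withEnforcedPartitionConstraint
        public Handle withConstraint(String constraint)
        {
            return new Handle(currentVersion, constraint);
        }
    }

    public static void main(String[] args) throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new Handle(41, "p = 1").withConstraint("p = 2"));
        Handle back = mapper.readValue(json, Handle.class);
        System.out.println(json + " -> version " + back.getCurrentVersion());
    }
}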