diff --git a/LICENSE b/LICENSE index e9c023b30..7b965b4a5 100644 --- a/LICENSE +++ b/LICENSE @@ -218,6 +218,7 @@ This product includes code from Apache Iceberg. * spec/iceberg-rest-catalog-open-api.yaml * spec/polaris-catalog-apis/oauth-tokens-api.yaml * integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java +* service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java Copyright: Copyright 2017-2025 The Apache Software Foundation Home page: https://iceberg.apache.org diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index f80d4077a..ae34be928 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -35,25 +35,32 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.LocationProviders; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -68,13 +75,17 @@ import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; import org.apache.iceberg.view.BaseMetastoreViewCatalog; -import org.apache.iceberg.view.BaseViewOperations; import org.apache.iceberg.view.ViewBuilder; import org.apache.iceberg.view.ViewMetadata; import org.apache.iceberg.view.ViewMetadataParser; import org.apache.iceberg.view.ViewOperations; +import org.apache.iceberg.view.ViewProperties; import org.apache.iceberg.view.ViewUtil; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.admin.model.StorageConfigInfo; @@ -1190,7 +1201,14 @@ public ViewBuilder withLocation(String newLocation) { } } - private class BasePolarisTableOperations extends BaseMetastoreTableOperations { + /** + * An implementation of {@link TableOperations} that integrates with {@link IcebergCatalog}. Much + * of this code was originally copied from {@link + * org.apache.iceberg.BaseMetastoreTableOperations}. CODE_COPIED_TO_POLARIS From Apache Iceberg + * Version: 1.8 + */ + private class BasePolarisTableOperations extends PolarisOperationsBase + implements TableOperations { private final TableIdentifier tableIdentifier; private final String fullTableName; private FileIO tableFileIO; @@ -1203,6 +1221,77 @@ private class BasePolarisTableOperations extends BaseMetastoreTableOperations { } @Override + public TableMetadata current() { + if (shouldRefresh) { + return refresh(); + } + return currentMetadata; + } + + @Override + public TableMetadata refresh() { + boolean currentMetadataWasAvailable = currentMetadata != null; + try { + doRefresh(); + } catch (NoSuchTableException e) { + if (currentMetadataWasAvailable) { + LOGGER.warn( + "Could not find the table during refresh, setting current metadata to null", e); + shouldRefresh = true; + } + + currentMetadata = null; + currentMetadataLocation = null; + version = -1; + throw e; + } + return current(); + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + // if the metadata is already out of date, reject it + if (base != current()) { + if (base != null) { + throw new CommitFailedException("Cannot commit: stale table metadata"); + } else { + // when current is non-null, the table exists. but when base is null, the commit is trying + // to create the table + throw new AlreadyExistsException("Table already exists: %s", fullTableName); + } + } + // if the metadata is not changed, return early + if (base == metadata) { + LOGGER.info("Nothing to commit."); + return; + } + + long start = System.currentTimeMillis(); + doCommit(base, metadata); + CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata); + requestRefresh(); + + LOGGER.info( + "Successfully committed to table {} in {} ms", + fullTableName, + System.currentTimeMillis() - start); + } + + @Override + public FileIO io() { + return tableFileIO; + } + + @Override + public String metadataFileLocation(String filename) { + return metadataFileLocation(current(), filename); + } + + @Override + public LocationProvider locationProvider() { + return LocationProviders.locationsFor(current().location(), current().properties()); + } + public void doRefresh() { LOGGER.debug("doRefresh for tableIdentifier {}", tableIdentifier); // While doing refresh/commit protocols, we must fetch the fresh "passthrough" resolved @@ -1250,7 +1339,6 @@ public void doRefresh() { } } - @Override public void doCommit(TableMetadata base, TableMetadata metadata) { LOGGER.debug( "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata); @@ -1376,11 +1464,11 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } if (!Objects.equal(existingLocation, oldLocation)) { if (null == base) { - throw new AlreadyExistsException("Table already exists: %s", tableName()); + throw new AlreadyExistsException("Table already exists: %s", fullTableName); } if (null == existingLocation) { - throw new NoSuchTableException("Table does not exist: %s", tableName()); + throw new NoSuchTableException("Table does not exist: %s", fullTableName); } throw new CommitFailedException( @@ -1396,45 +1484,101 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } @Override - public FileIO io() { - return tableFileIO; + public TableOperations temp(TableMetadata uncommittedMetadata) { + return new TableOperations() { + @Override + public TableMetadata current() { + return uncommittedMetadata; + } + + @Override + public TableMetadata refresh() { + throw new UnsupportedOperationException( + "Cannot call refresh on temporary table operations"); + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException( + "Cannot call commit on temporary table operations"); + } + + @Override + public String metadataFileLocation(String fileName) { + return BasePolarisTableOperations.this.metadataFileLocation( + uncommittedMetadata, fileName); + } + + @Override + public LocationProvider locationProvider() { + return LocationProviders.locationsFor( + uncommittedMetadata.location(), uncommittedMetadata.properties()); + } + + @Override + public FileIO io() { + return BasePolarisTableOperations.this.io(); + } + + @Override + public EncryptionManager encryption() { + return BasePolarisTableOperations.this.encryption(); + } + + @Override + public long newSnapshotId() { + return BasePolarisTableOperations.this.newSnapshotId(); + } + }; } - @Override - protected String tableName() { - return fullTableName; + protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) { + return newTable && metadata.metadataFileLocation() != null + ? metadata.metadataFileLocation() + : writeNewMetadata(metadata, version + 1); } - } - private void validateMetadataFileInTableDir( - TableIdentifier identifier, TableMetadata metadata, CatalogEntity catalog) { - PolarisCallContext polarisCallContext = callContext.getPolarisCallContext(); - boolean allowEscape = - polarisCallContext - .getConfigurationStore() - .getConfiguration( - polarisCallContext, FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION); - if (!allowEscape - && !polarisCallContext - .getConfigurationStore() - .getConfiguration( - polarisCallContext, FeatureConfiguration.ALLOW_EXTERNAL_METADATA_FILE_LOCATION)) { - LOGGER.debug( - "Validating base location {} for table {} in metadata file {}", - metadata.location(), - identifier, - metadata.metadataFileLocation()); - StorageLocation metadataFileLocation = StorageLocation.of(metadata.metadataFileLocation()); - StorageLocation baseLocation = StorageLocation.of(metadata.location()); - if (!metadataFileLocation.isChildOf(baseLocation)) { - throw new BadRequestException( - "Metadata location %s is not allowed outside of table location %s", - metadata.metadataFileLocation(), metadata.location()); + protected String writeNewMetadata(TableMetadata metadata, int newVersion) { + String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion); + OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath); + + // write the new metadata + // use overwrite to avoid negative caching in S3. this is safe because the metadata location + // is + // always unique because it includes a UUID. + TableMetadataParser.overwrite(metadata, newMetadataLocation); + + return newMetadataLocation.location(); + } + + private String metadataFileLocation(TableMetadata metadata, String filename) { + String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION); + + if (metadataLocation != null) { + return String.format("%s/%s", LocationUtil.stripTrailingSlash(metadataLocation), filename); + } else { + return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename); } } + + private String newTableMetadataFilePath(TableMetadata meta, int newVersion) { + String codecName = + meta.property( + TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT); + String fileExtension = TableMetadataParser.getFileExtension(codecName); + return metadataFileLocation( + meta, + String.format(Locale.ROOT, "%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension)); + } } - private class BasePolarisViewOperations extends BaseViewOperations { + /** + * An implementation of {@link ViewOperations} that integrates with {@link IcebergCatalog}. Much + * of this code was originally copied from {@link org.apache.iceberg.view.BaseViewOperations}. + * CODE_COPIED_TO_POLARIS From Apache Iceberg Version: 1.8 + */ + private class BasePolarisViewOperations extends PolarisOperationsBase + implements ViewOperations { private final TableIdentifier identifier; private final String fullViewName; private FileIO viewFileIO; @@ -1446,6 +1590,65 @@ private class BasePolarisViewOperations extends BaseViewOperations { } @Override + public ViewMetadata current() { + if (shouldRefresh) { + return refresh(); + } + + return currentMetadata; + } + + @Override + public ViewMetadata refresh() { + boolean currentMetadataWasAvailable = currentMetadata != null; + try { + doRefresh(); + } catch (NoSuchViewException e) { + if (currentMetadataWasAvailable) { + LOGGER.warn( + "Could not find the view during refresh, setting current metadata to null", e); + shouldRefresh = true; + } + + currentMetadata = null; + currentMetadataLocation = null; + version = -1; + throw e; + } + + return current(); + } + + @Override + @SuppressWarnings("ImmutablesReferenceEquality") + public void commit(ViewMetadata base, ViewMetadata metadata) { + // if the metadata is already out of date, reject it + if (base != current()) { + if (base != null) { + throw new CommitFailedException("Cannot commit: stale view metadata"); + } else { + // when current is non-null, the view exists. but when base is null, the commit is trying + // to create the view + throw new AlreadyExistsException("View already exists: %s", viewName()); + } + } + + // if the metadata is not changed, return early + if (base == metadata) { + LOGGER.info("Nothing to commit."); + return; + } + + long start = System.currentTimeMillis(); + doCommit(base, metadata); + requestRefresh(); + + LOGGER.info( + "Successfully committed to view {} in {} ms", + viewName(), + System.currentTimeMillis() - start); + } + public void doRefresh() { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getPassthroughResolvedPath( @@ -1492,7 +1695,6 @@ public void doRefresh() { } } - @Override public void doCommit(ViewMetadata base, ViewMetadata metadata) { // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict LOGGER.debug("doCommit for view {} with base {}, metadata {}", identifier, base, metadata); @@ -1548,7 +1750,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE)); String newLocation = writeNewMetadataIfRequired(metadata); - String oldLocation = base == null ? null : currentMetadataLocation(); + String oldLocation = base == null ? null : currentMetadataLocation; IcebergTableLikeEntity entity = IcebergTableLikeEntity.of( @@ -1589,17 +1791,170 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) { } } - @Override + protected String writeNewMetadataIfRequired(ViewMetadata metadata) { + return null != metadata.metadataFileLocation() + ? metadata.metadataFileLocation() + : writeNewMetadata(metadata, version + 1); + } + + private String writeNewMetadata(ViewMetadata metadata, int newVersion) { + String newMetadataFilePath = newMetadataFilePath(metadata, newVersion); + OutputFile newMetadataLocation = io().newOutputFile(newMetadataFilePath); + + // write the new metadata + // use overwrite to avoid negative caching in S3. this is safe because the metadata location + // is + // always unique because it includes a UUID. + ViewMetadataParser.overwrite(metadata, newMetadataLocation); + + return newMetadataLocation.location(); + } + + private String newMetadataFilePath(ViewMetadata metadata, int newVersion) { + String codecName = + metadata + .properties() + .getOrDefault( + ViewProperties.METADATA_COMPRESSION, ViewProperties.METADATA_COMPRESSION_DEFAULT); + String fileExtension = TableMetadataParser.getFileExtension(codecName); + return metadataFileLocation( + metadata, + String.format(Locale.ROOT, "%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension)); + } + + private String metadataFileLocation(ViewMetadata metadata, String filename) { + String metadataLocation = metadata.properties().get(ViewProperties.WRITE_METADATA_LOCATION); + if (metadataLocation != null) { + return String.format("%s/%s", LocationUtil.stripTrailingSlash(metadataLocation), filename); + } else { + return String.format( + "%s/%s/%s", + LocationUtil.stripTrailingSlash(metadata.location()), METADATA_FOLDER_NAME, filename); + } + } + public FileIO io() { return viewFileIO; } - @Override protected String viewName() { return fullViewName; } } + /** + * An ABC for {@link BasePolarisTableOperations} and {@link BasePolarisViewOperations}. Much of + * this code was originally copied from {@link org.apache.iceberg.BaseMetastoreTableOperations}. + * CODE_COPIED_TO_POLARIS From Apache Iceberg Version: 1.8 + */ + private abstract static class PolarisOperationsBase { + + protected static final String METADATA_FOLDER_NAME = "metadata"; + + protected T currentMetadata = null; + protected String currentMetadataLocation = null; + protected boolean shouldRefresh = true; + protected int version = -1; + + protected void requestRefresh() { + this.shouldRefresh = true; + } + + protected void disableRefresh() { + this.shouldRefresh = false; + } + + /** + * Parse the version from table/view metadata file name. + * + * @param metadataLocation table/view metadata file location + * @return version of the table/view metadata file in success case and -1 if the version is not + * parsable (as a sign that the metadata is not part of this catalog) + */ + protected int parseVersion(String metadataLocation) { + int versionStart = + metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 + int versionEnd = metadataLocation.indexOf('-', versionStart); + if (versionEnd < 0) { + // found filesystem object's metadata + return -1; + } + + try { + return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd)); + } catch (NumberFormatException e) { + LOGGER.warn("Unable to parse version from metadata location: {}", metadataLocation, e); + return -1; + } + } + + protected void refreshFromMetadataLocation( + String newLocation, + Predicate shouldRetry, + int numRetries, + Function metadataLoader) { + // use null-safe equality check because new tables have a null metadata location + if (!Objects.equal(currentMetadataLocation, newLocation)) { + LOGGER.info("Refreshing table metadata from new version: {}", newLocation); + + AtomicReference newMetadata = new AtomicReference<>(); + Tasks.foreach(newLocation) + .retry(numRetries) + .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .throwFailureWhenFinished() + .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null + .shouldRetryTest(shouldRetry) + .run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation))); + + if (newMetadata.get() instanceof TableMetadata tableMetadata) { + if (currentMetadata instanceof TableMetadata currentTableMetadata) { + String newUUID = tableMetadata.uuid(); + if (currentMetadata != null && currentTableMetadata.uuid() != null && newUUID != null) { + Preconditions.checkState( + newUUID.equals(currentTableMetadata.uuid()), + "Table UUID does not match: current=%s != refreshed=%s", + currentTableMetadata.uuid(), + newUUID); + } + } + } + + this.currentMetadata = newMetadata.get(); + this.currentMetadataLocation = newLocation; + this.version = parseVersion(newLocation); + } + this.shouldRefresh = false; + } + } + + private void validateMetadataFileInTableDir( + TableIdentifier identifier, TableMetadata metadata, CatalogEntity catalog) { + PolarisCallContext polarisCallContext = callContext.getPolarisCallContext(); + boolean allowEscape = + polarisCallContext + .getConfigurationStore() + .getConfiguration( + polarisCallContext, FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION); + if (!allowEscape + && !polarisCallContext + .getConfigurationStore() + .getConfiguration( + polarisCallContext, FeatureConfiguration.ALLOW_EXTERNAL_METADATA_FILE_LOCATION)) { + LOGGER.debug( + "Validating base location {} for table {} in metadata file {}", + metadata.location(), + identifier, + metadata.metadataFileLocation()); + StorageLocation metadataFileLocation = StorageLocation.of(metadata.metadataFileLocation()); + StorageLocation baseLocation = StorageLocation.of(metadata.location()); + if (!metadataFileLocation.isChildOf(baseLocation)) { + throw new BadRequestException( + "Metadata location %s is not allowed outside of table location %s", + metadata.metadataFileLocation(), metadata.location()); + } + } + } + private FileIO loadFileIOForTableLike( TableIdentifier identifier, Set readLocations,