Skip to content

Initial POC - streaming table metadata rather than de/serializing in memory #1395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,11 @@ protected FeatureConfiguration(
+ " requires experimentation in the specific deployment environment")
.defaultValue(100 * EntityWeigher.WEIGHT_PER_MB)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Boolean> ENABLE_STREAMING_TABLE_METADATA =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of just an enable/disable, can we have something like STREAMING_TABLE_METADATA_MIN_SIZE?

PolarisConfiguration.<Boolean>builder()
.key("ENABLE_STREAMING_TABLE_METADATA")
.description("If true, enable streaming table metadata without deserializing into memory")
.defaultValue(true)
.buildFeatureConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTResponse;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
Expand All @@ -48,6 +49,7 @@
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.view.ImmutableSQLViewRepresentation;
import org.apache.iceberg.view.ImmutableViewVersion;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
Expand Down Expand Up @@ -772,7 +774,7 @@ public void testRegisterTableAllSufficientPrivileges() {

// To get a handy metadata file we can use one from another table.
// to avoid overlapping directories, drop the original table and recreate it via registerTable
final String metadataLocation = newWrapper().loadTable(TABLE_NS1_1, "all").metadataLocation();
final String metadataLocation = getMetadataLocation(newWrapper().loadTable(TABLE_NS1_1, "all"));
newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithoutPurge(TABLE_NS1_1);

final RegisterTableRequest registerRequest =
Expand Down Expand Up @@ -802,6 +804,14 @@ public String metadataLocation() {
});
}

private static String getMetadataLocation(RESTResponse resp) {
final String metadataLocation =
resp instanceof IcebergCatalogHandler.StreamingLoadTableResponse
? ((IcebergCatalogHandler.StreamingLoadTableResponse) resp).metadataLocation()
: ((LoadTableResponse) resp).metadataLocation();
return metadataLocation;
}

@Test
public void testRegisterTableInsufficientPermissions() {
Assertions.assertThat(
Expand All @@ -810,7 +820,7 @@ public void testRegisterTableInsufficientPermissions() {
.isTrue();

// To get a handy metadata file we can use one from another table.
final String metadataLocation = newWrapper().loadTable(TABLE_NS1_1, "all").metadataLocation();
final String metadataLocation = getMetadataLocation(newWrapper().loadTable(TABLE_NS1_1, "all"));

final RegisterTableRequest registerRequest =
new RegisterTableRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -68,6 +70,8 @@
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.BaseViewOperations;
Expand Down Expand Up @@ -381,23 +385,27 @@ private String defaultNamespaceLocation(Namespace namespace) {
}

private Set<String> getLocationsAllowedToBeAccessed(TableMetadata tableMetadata) {
return getLocationsAllowedToBeAccessed(tableMetadata.location(), tableMetadata.properties());
}

private Set<String> getLocationsAllowedToBeAccessed(IcebergTableLikeEntity tableMetadata) {
return getLocationsAllowedToBeAccessed(
tableMetadata.getBaseLocation(), tableMetadata.getPropertiesAsMap());
}

private Set<String> getLocationsAllowedToBeAccessed(
String baseLocation, Map<String, String> tableProperties) {
Set<String> locations = new HashSet<>();
locations.add(tableMetadata.location());
if (tableMetadata
.properties()
.containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)) {
locations.add(baseLocation);
if (tableProperties.containsKey(
IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)) {
locations.add(
tableMetadata
.properties()
.get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY));
tableProperties.get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY));
}
if (tableMetadata
.properties()
.containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) {
if (tableProperties.containsKey(
IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)) {
locations.add(
tableMetadata
.properties()
.get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY));
tableProperties.get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY));
}
return locations;
}
Expand Down Expand Up @@ -838,6 +846,43 @@ public boolean sendNotification(
PolarisEntitySubType.ICEBERG_TABLE, identifier, notificationRequest);
}

@Override
public Credential getCredentialConfig(
TableIdentifier tableIdentifier, Set<PolarisStorageActions> storageActions) {
BiFunction<FileIO, IcebergTableLikeEntity, Credential> credentialBuilder =
(fileIO, entity) ->
ImmutableCredential.builder()
.config(getCredentialConfig(tableIdentifier, entity, storageActions))
.prefix(entity.getBaseLocation())
.build();
return ((BasePolarisTableOperations) newTableOps(tableIdentifier))
.processLatestMetadata(credentialBuilder, () -> null);
}

@Override
public Map<String, String> getCredentialConfig(
TableIdentifier tableIdentifier,
IcebergTableLikeEntity tableMetadata,
Set<PolarisStorageActions> storageActions) {
Optional<PolarisEntity> storageInfo = findStorageInfo(tableIdentifier);
if (storageInfo.isEmpty()) {
LOGGER
.atWarn()
.addKeyValue("tableIdentifier", tableIdentifier)
.log("Table entity has no storage configuration in its hierarchy");
return Map.of();
}
return FileIOUtil.refreshCredentials(
callContext,
entityManager,
getCredentialVendor(),
callContext.getPolarisCallContext().getConfigurationStore(),
tableIdentifier,
getLocationsAllowedToBeAccessed(tableMetadata),
storageActions,
storageInfo.get());
}

@Override
public Map<String, String> getCredentialConfig(
TableIdentifier tableIdentifier,
Expand Down Expand Up @@ -1190,7 +1235,14 @@ public ViewBuilder withLocation(String newLocation) {
}
}

private class BasePolarisTableOperations extends BaseMetastoreTableOperations {
  /**
   * Provides access to a table's latest metadata location without requiring callers to
   * deserialize the full table metadata into memory first.
   */
  public interface TableMetadataProvider {
    /**
     * Resolves the table's latest metadata location and invokes {@code metadataFileProcessor}
     * with a {@code FileIO} able to read it plus the freshly resolved table entity; when the
     * table has no current metadata location, invokes {@code nullLocationHandler} instead.
     *
     * @param metadataFileProcessor callback receiving the FileIO and resolved table entity
     * @param nullLocationHandler fallback invoked when no metadata location exists
     * @param <T> result type produced by whichever callback runs
     * @return the result of the callback that ran
     */
    <T> T processLatestMetadata(
        BiFunction<FileIO, IcebergTableLikeEntity, T> metadataFileProcessor,
        Supplier<T> nullLocationHandler);
  }

private class BasePolarisTableOperations extends BaseMetastoreTableOperations
implements TableMetadataProvider {
private final TableIdentifier tableIdentifier;
private final String fullTableName;
private FileIO tableFileIO;
Expand All @@ -1205,6 +1257,27 @@ private class BasePolarisTableOperations extends BaseMetastoreTableOperations {
@Override
public void doRefresh() {
LOGGER.debug("doRefresh for tableIdentifier {}", tableIdentifier);
BiFunction<FileIO, IcebergTableLikeEntity, Void> refreshConsumer =
(fileIO, entity) -> {
refreshFromMetadataLocation(
entity.getMetadataLocation(),
SHOULD_RETRY_REFRESH_PREDICATE,
getMaxMetadataRefreshRetries(),
metadataLocation -> TableMetadataParser.read(fileIO, metadataLocation));
return null;
};
processLatestMetadata(
refreshConsumer,
() -> {
this.disableRefresh();
return null;
});
}

@Override
public <T> T processLatestMetadata(
BiFunction<FileIO, IcebergTableLikeEntity, T> metadataFileProcessor,
Supplier<T> nullLocationHandler) {
// While doing refresh/commit protocols, we must fetch the fresh "passthrough" resolved
// table entity instead of the statically-resolved authz resolution set.
PolarisResolvedPathWrapper resolvedEntities =
Expand All @@ -1226,28 +1299,21 @@ public void doRefresh() {
String latestLocation = entity != null ? entity.getMetadataLocation() : null;
LOGGER.debug("Refreshing latestLocation: {}", latestLocation);
if (latestLocation == null) {
disableRefresh();
} else {
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
getMaxMetadataRefreshRetries(),
metadataLocation -> {
String latestLocationDir =
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
// TODO: Once we have the "current" table properties pulled into the resolvedEntity
// then we should use the actual current table properties for IO refresh here
// instead of the general tableDefaultProperties.
FileIO fileIO =
loadFileIOForTableLike(
tableIdentifier,
Set.of(latestLocationDir),
resolvedEntities,
new HashMap<>(tableDefaultProperties),
Set.of(PolarisStorageActions.READ));
return TableMetadataParser.read(fileIO, metadataLocation);
});
return nullLocationHandler.get();
}

String latestLocationDir = latestLocation.substring(0, latestLocation.lastIndexOf('/'));
// TODO: Once we have the "current" table properties pulled into the resolvedEntity
// then we should use the actual current table properties for IO refresh here
// instead of the general tableDefaultProperties.
FileIO fileIO =
loadFileIOForTableLike(
tableIdentifier,
Set.of(latestLocationDir),
resolvedEntities,
new HashMap<>(tableDefaultProperties),
Set.of(PolarisStorageActions.READ));
return metadataFileProcessor.apply(fileIO, entity);
}

@Override
Expand Down Expand Up @@ -1363,6 +1429,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
.setCatalogId(getCatalogId())
.setSubType(PolarisEntitySubType.ICEBERG_TABLE)
.setBaseLocation(metadata.location())
.setProperties(metadata.properties())
.setId(
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
Expand All @@ -1372,6 +1439,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
new IcebergTableLikeEntity.Builder(entity)
.setBaseLocation(metadata.location())
.setMetadataLocation(newLocation)
.setProperties(metadata.properties())
.build();
}
if (!Objects.equal(existingLocation, oldLocation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.rest.Endpoint;
import org.apache.iceberg.rest.RESTResponse;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
Expand All @@ -53,7 +54,7 @@
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
import org.apache.polaris.core.auth.PolarisAuthorizer;
Expand Down Expand Up @@ -236,15 +237,16 @@ public Response loadNamespaceMetadata(
* unable to get metadata location and logs a warning.
*/
private Response.ResponseBuilder tryInsertETagHeader(
Response.ResponseBuilder builder,
LoadTableResponse response,
String namespace,
String tableName) {
if (response.metadataLocation() != null) {
Response.ResponseBuilder builder, RESTResponse response, String namespace, String tableName) {
String metadataLocation =
response instanceof IcebergCatalogHandler.StreamingLoadTableResponse
? ((IcebergCatalogHandler.StreamingLoadTableResponse) response).metadataLocation()
: ((LoadTableResponse) response).metadataLocation();
if (metadataLocation != null) {
builder =
builder.header(
HttpHeaders.ETAG,
IcebergHttpUtil.generateETagForMetadataFileLocation(response.metadataLocation()));
IcebergHttpUtil.generateETagForMetadataFileLocation(metadataLocation));
} else {
LOGGER
.atWarn()
Expand Down Expand Up @@ -384,7 +386,7 @@ public Response loadTable(
securityContext,
prefix,
catalog -> {
LoadTableResponse response;
RESTResponse response;

if (delegationModes.isEmpty()) {
response =
Expand Down Expand Up @@ -541,13 +543,8 @@ public Response loadCredentials(
securityContext,
prefix,
catalog -> {
LoadTableResponse loadTableResponse =
catalog.loadTableWithAccessDelegation(tableIdentifier, "all");
return Response.ok(
ImmutableLoadCredentialsResponse.builder()
.credentials(loadTableResponse.credentials())
.build())
.build();
LoadCredentialsResponse response = catalog.loadAccessDelegation(tableIdentifier, "all");
return Response.ok(response).build();
});
}

Expand Down
Loading
Loading