Hive: Arrange common part of the code for Iceberg View. #12

Closed · wants to merge 2 commits
21 changes: 1 addition & 20 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -285,26 +285,7 @@ private Map<String, String> tableOverrideProperties() {
}

protected static String fullTableName(String catalogName, TableIdentifier identifier) {
StringBuilder sb = new StringBuilder();

if (catalogName.contains("/") || catalogName.contains(":")) {
// use / for URI-like names: thrift://host:port/db.table
sb.append(catalogName);
if (!catalogName.endsWith("/")) {
sb.append("/");
}
} else {
// use . for non-URI named catalogs: prod.db.table
sb.append(catalogName).append(".");
}

for (String level : identifier.namespace().levels()) {
sb.append(level).append(".");
}

sb.append(identifier.name());

return sb.toString();
return CatalogUtil.fullTableName(catalogName, identifier);
}

protected MetricsReporter metricsReporter() {
118 changes: 118 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseMetastoreOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreOperations.class);

public enum CommitStatus {
FAILURE,
SUCCESS,
UNKNOWN
}

/**
* Attempt to load the content and see if any current or past metadata location matches the one we
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
* may indicate the commit has failed but don't have proof that this is the case. Note that all
* the previous locations must also be searched on the chance that a second committer was able to
* successfully commit on top of our commit.
*
* @param contentName full name of the content
* @param newMetadataLocation the path of the new commit file
* @param properties properties for retry
* @param newMetadataCheckSupplier checks whether the latest metadata is present, using the
*     metadata location for tables and the version id for views
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatus(
String contentName,
String newMetadataLocation,
Map<String, String> properties,
Supplier<Boolean> newMetadataCheckSupplier) {
int maxAttempts =
PropertyUtil.propertyAsInt(
properties, COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
properties, COMMIT_STATUS_CHECKS_MIN_WAIT_MS, COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
properties, COMMIT_STATUS_CHECKS_MAX_WAIT_MS, COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
properties,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", contentName, checkException))
.run(
location -> {
boolean commitSuccess = newMetadataCheckSupplier.get();

if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
contentName,
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
contentName,
newMetadataLocation);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
contentName,
maxAttempts);
}
return status.get();
}
}
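
The new helper leaves the actual "did my metadata become current?" question to the caller through newMetadataCheckSupplier. A minimal sketch of how a metastore-backed operations class might wire it up (hypothetical: ExampleTableOperations and currentMetadataLocation() are illustrative names, not part of this PR):

package org.example;

import java.util.Map;
import org.apache.iceberg.BaseMetastoreOperations;

// Hypothetical subclass: shows how the protected checkCommitStatus helper is intended to be
// called after a commit fails with an ambiguous (possibly transient) exception.
public abstract class ExampleTableOperations extends BaseMetastoreOperations {

  protected CommitStatus resolveAmbiguousCommit(
      String tableName, String newMetadataLocation, Map<String, String> properties) {
    return checkCommitStatus(
        tableName,
        newMetadataLocation,
        properties,
        // The supplier re-reads the metastore and reports whether the metadata we tried to
        // commit is now the current location.
        () -> newMetadataLocation.equals(currentMetadataLocation()));
  }

  // Illustrative hook: refreshes from the metastore and returns the latest metadata location.
  protected abstract String currentMetadataLocation();
}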
core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -18,20 +18,12 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -42,14 +34,15 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseMetastoreTableOperations implements TableOperations {
public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);

public static final String TABLE_TYPE_PROP = "table_type";
@@ -291,6 +284,8 @@ public long newSnapshotId() {
};
}

/** @deprecated Use {@link BaseMetastoreOperations.CommitStatus} */
@Deprecated
protected enum CommitStatus {
FAILURE,
SUCCESS,
@@ -309,65 +304,39 @@ protected enum CommitStatus {
* @return Commit Status of Success, Failure or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
int maxAttempts =
PropertyUtil.propertyAsInt(
config.properties(), COMMIT_NUM_STATUS_CHECKS, COMMIT_NUM_STATUS_CHECKS_DEFAULT);
long minWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MIN_WAIT_MS,
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT);
long maxWaitMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_MAX_WAIT_MS,
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT);
long totalRetryMs =
PropertyUtil.propertyAsLong(
config.properties(),
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS,
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT);

AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

Tasks.foreach(newMetadataLocation)
.retry(maxAttempts)
.suppressFailureWhenFinished()
.exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0)
.onFailure(
(location, checkException) ->
LOG.error("Cannot check if commit to {} exists.", tableName(), checkException))
.run(
location -> {
TableMetadata metadata = refresh();
String currentMetadataFileLocation = metadata.metadataFileLocation();
boolean commitSuccess =
currentMetadataFileLocation.equals(newMetadataLocation)
|| metadata.previousFiles().stream()
.anyMatch(log -> log.file().equals(newMetadataLocation));
if (commitSuccess) {
LOG.info(
"Commit status check: Commit to {} of {} succeeded",
tableName(),
newMetadataLocation);
status.set(CommitStatus.SUCCESS);
} else {
LOG.warn(
"Commit status check: Commit to {} of {} unknown, new metadata location is not current "
+ "or in history",
tableName(),
newMetadataLocation);
}
});

if (status.get() == CommitStatus.UNKNOWN) {
LOG.error(
"Cannot determine commit state to {}. Failed during checking {} times. "
+ "Treating commit state as unknown.",
tableName(),
maxAttempts);
return CommitStatus.valueOf(
checkCommitStatus(
tableName(),
newMetadataLocation,
config.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation))
.name());
}

/**
* Checks whether the new metadata location is present after refreshing the table.
*
* @param newMetadataLocation newly written metadata location
* @return true if the new metadata location matches the current metadata file or one of the previous metadata files.
*/
protected boolean checkCurrentMetadataLocation(String newMetadataLocation) {
TableMetadata metadata = refresh();
Preconditions.checkNotNull(metadata, "Unexpected null table metadata");
ImmutableList.Builder<String> builder = ImmutableList.builder();
String latestMetadataLocation = metadata.metadataFileLocation();
if (latestMetadataLocation != null) {
builder.add(latestMetadataLocation);
}
return status.get();

ImmutableList<String> allMetadataLocations =
builder
.addAll(
metadata.previousFiles().stream()
.map(TableMetadata.MetadataLogEntry::file)
.collect(Collectors.toList()))
.build();

return allMetadataLocations.contains(newMetadataLocation);
}

private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
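
The table-side supplier passed above treats a commit as successful when the location just written is either the current metadata file or appears in the metadata log. A standalone illustration of that membership test (the paths are made up):

import java.util.List;

public class MetadataHistoryCheckExample {

  // Same decision rule as checkCurrentMetadataLocation: the commit counts as successful if the
  // newly written location is the current file or any entry in the previous-files history.
  static boolean isCommitted(
      String newLocation, String currentLocation, List<String> previousFiles) {
    return newLocation.equals(currentLocation) || previousFiles.contains(newLocation);
  }

  public static void main(String[] args) {
    String newLocation = "s3://bucket/db/table/metadata/v3.metadata.json";

    // Another committer already produced v4 on top of our v3, so v3 is in the history: success.
    boolean committed =
        isCommitted(
            newLocation,
            "s3://bucket/db/table/metadata/v4.metadata.json",
            List.of(newLocation, "s3://bucket/db/table/metadata/v2.metadata.json"));

    System.out.println(committed); // true
  }
}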
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
@@ -46,6 +47,7 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.ViewMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -136,6 +138,18 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

/**
* Drops view metadata files referenced by ViewMetadata.
*
* <p>This should be called by dropView implementations
*
* @param io a FileIO to use for deletes
* @param metadata the last valid ViewMetadata instance for a dropped view.
*/
public static void dropViewMetadata(FileIO io, ViewMetadata metadata) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
@@ -473,4 +487,27 @@ public static MetricsReporter loadMetricsReporter(Map<String, String> properties

return reporter;
}

public static String fullTableName(String catalogName, TableIdentifier identifier) {
StringBuilder sb = new StringBuilder();

if (catalogName.contains("/") || catalogName.contains(":")) {
// use / for URI-like names: thrift://host:port/db.table
sb.append(catalogName);
if (!catalogName.endsWith("/")) {
sb.append("/");
}
} else {
// use . for non-URI named catalogs: prod.db.table
sb.append(catalogName).append(".");
}

for (String level : identifier.namespace().levels()) {
sb.append(level).append(".");
}

sb.append(identifier.name());

return sb.toString();
}
}
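
For reference, a small usage sketch (not part of the PR) showing the two naming styles the relocated fullTableName helper produces:

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;

public class FullTableNameExample {
  public static void main(String[] args) {
    // Non-URI catalog names are joined with dots.
    System.out.println(CatalogUtil.fullTableName("prod", TableIdentifier.of("db", "table")));
    // prod.db.table

    // URI-like catalog names (containing '/' or ':') are joined to the identifier with a slash.
    System.out.println(
        CatalogUtil.fullTableName("thrift://host:9083", TableIdentifier.of("db", "table")));
    // thrift://host:9083/db.table
  }
}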
core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
@@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.BaseMetastoreOperations;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -35,7 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseViewOperations implements ViewOperations {
public abstract class BaseViewOperations extends BaseMetastoreOperations implements ViewOperations {
private static final Logger LOG = LoggerFactory.getLogger(BaseViewOperations.class);

private static final String METADATA_FOLDER_NAME = "metadata";
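
BaseViewOperations now inherits the same retrying commit-status check. Per the javadoc on checkCommitStatus, a view implementation could supply a version-id-based check; a hypothetical sketch (ExampleViewOperations and refreshedVersionIds() are illustrative names, not from this PR):

package org.example;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.BaseMetastoreOperations;

// Hypothetical view-side counterpart: after an ambiguous failure, ask the metastore whether the
// version we tried to commit actually made it into the view's metadata.
public abstract class ExampleViewOperations extends BaseMetastoreOperations {

  protected CommitStatus resolveAmbiguousViewCommit(
      String viewName,
      String newMetadataLocation,
      long committedVersionId,
      Map<String, String> properties) {
    return checkCommitStatus(
        viewName,
        newMetadataLocation,
        properties,
        // One plausible check, per the javadoc: after refreshing, is the version id we wrote
        // present in the view's metadata?
        () -> refreshedVersionIds().contains(committedVersionId));
  }

  // Illustrative hook: refreshes the view from the metastore and returns the version ids it holds.
  protected abstract Set<Long> refreshedVersionIds();
}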