From 1a9acf8e1b694cfc83c23ff39aec5f673a2830fe Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Wed, 18 Mar 2020 13:56:24 -0700 Subject: [PATCH] Support reading tables with only Hive metadata (#23) * Support reading tables with Hive metadata if Iceberg metadata is not available --- .../org/apache/iceberg/BaseFileScanTask.java | 4 +- .../apache/iceberg/BaseMetastoreCatalog.java | 4 +- .../iceberg/BaseMetastoreTableOperations.java | 9 + .../org/apache/iceberg/hive/HiveCatalogs.java | 12 + ...HiveMetadataPreservingTableOperations.java | 1 + .../iceberg/hive/HiveTableOperations.java | 5 + .../iceberg/hive/legacy/DirectoryInfo.java | 51 +++ .../iceberg/hive/legacy/FileSystemUtils.java | 61 ++++ .../iceberg/hive/legacy/HiveExpressions.java | 307 ++++++++++++++++++ .../hive/legacy/LegacyHiveCatalog.java | 105 ++++++ .../iceberg/hive/legacy/LegacyHiveTable.java | 209 ++++++++++++ .../legacy/LegacyHiveTableOperations.java | 211 ++++++++++++ .../hive/legacy/LegacyHiveTableScan.java | 87 +++++ .../hive/legacy/LegacyHiveTableUtils.java | 143 ++++++++ .../hive/legacy/TestHiveExpressions.java | 113 +++++++ .../hive/legacy/TestLegacyHiveTableScan.java | 248 ++++++++++++++ 16 files changed, 1566 insertions(+), 4 deletions(-) create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java create mode 100644 hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java create mode 100644 hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index 988b07eb0..f9950047c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -29,7 +29,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ResidualEvaluator; -class BaseFileScanTask implements FileScanTask { +public class BaseFileScanTask implements FileScanTask { private final DataFile file; private final String schemaString; private final String specString; @@ -37,7 +37,7 @@ class BaseFileScanTask implements FileScanTask { private transient PartitionSpec spec = null; - BaseFileScanTask(DataFile file, String schemaString, String specString, ResidualEvaluator residuals) { + public BaseFileScanTask(DataFile file, String schemaString, String specString, ResidualEvaluator residuals) { this.file = file; this.schemaString = schemaString; this.specString = specString; diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index fd6a817ba..72d387da4 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -177,7 +177,7 @@ private Table 
loadMetadataTable(TableIdentifier identifier) { } } - private boolean isValidMetadataIdentifier(TableIdentifier identifier) { + protected boolean isValidMetadataIdentifier(TableIdentifier identifier) { return MetadataTableType.from(identifier.name()) != null && isValidIdentifier(TableIdentifier.of(identifier.namespace().levels())); } @@ -275,7 +275,7 @@ private static void deleteFiles(FileIO io, Set allManifests) { }); } - private static String fullTableName(String catalogName, TableIdentifier identifier) { + protected static String fullTableName(String catalogName, TableIdentifier identifier) { StringBuilder sb = new StringBuilder(); if (catalogName.contains("/") || catalogName.contains(":")) { diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 0d4432e86..d02b7e787 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -270,4 +270,13 @@ private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metada .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file())); } } + + protected void setCurrentMetadata(TableMetadata currentMetadata) { + this.currentMetadata = currentMetadata; + } + + protected void setShouldRefresh(boolean shouldRefresh) { + this.shouldRefresh = shouldRefresh; + } + } diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java index d282c9901..6026ba63f 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.hive.legacy.LegacyHiveCatalog; public final class HiveCatalogs { @@ -39,6 +40,11 @@ public final class HiveCatalogs { .removalListener((RemovalListener) (uri, catalog, cause) -> catalog.close()) .build(); + private static final Cache LEGACY_CATALOG_CACHE = Caffeine.newBuilder() + .expireAfterAccess(10, TimeUnit.MINUTES) + .removalListener((RemovalListener) (uri, catalog, cause) -> catalog.close()) + .build(); + private HiveCatalogs() {} public static HiveCatalog loadCatalog(Configuration conf) { @@ -47,6 +53,12 @@ public static HiveCatalog loadCatalog(Configuration conf) { return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf)); } + public static HiveCatalog loadLegacyCatalog(Configuration conf) { + // metastore URI can be null in local mode + String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""); + return LEGACY_CATALOG_CACHE.get(metastoreUri, uri -> new LegacyHiveCatalog(conf)); + } + /** * @deprecated Use {@link #loadHiveMetadataPreservingCatalog(Configuration)} instead */ diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java index d6d9f79ea..41c09374f 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java @@ -83,6 +83,7 @@ protected void doRefresh() { String errMsg = String.format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP); throw new 
IllegalArgumentException(errMsg); } + if (!io().newInputFile(metadataLocation).exists()) { String errMsg = String.format("%s property for %s.%s points to a non-existent file %s", METADATA_LOCATION_PROP, database, tableName, metadataLocation); diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index d25fcf9ae..80bc0b850 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -113,6 +113,11 @@ protected void doRefresh() { throw new IllegalArgumentException(errMsg); } + if (!io().newInputFile(metadataLocation).exists()) { + String errMsg = String.format("%s property for %s.%s points to a non-existent file %s", + METADATA_LOCATION_PROP, database, tableName, metadataLocation); + throw new IllegalArgumentException(errMsg); + } } catch (NoSuchObjectException e) { if (currentMetadataLocation() != null) { throw new NoSuchTableException(String.format("No such table: %s.%s", database, tableName)); diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java new file mode 100644 index 000000000..50de7957c --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.StructLike; + + +/** + * Metadata for a data directory referenced by either a Hive table or a partition + */ +class DirectoryInfo { + private final String location; + private final FileFormat format; + private final StructLike partitionData; + + DirectoryInfo(String location, FileFormat format, StructLike partitionData) { + this.location = location; + this.format = format; + this.partitionData = partitionData; + } + + public String location() { + return location; + } + + public FileFormat format() { + return format; + } + + public StructLike partitionData() { + return partitionData; + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java new file mode 100644 index 000000000..81cad5f5b --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.iceberg.exceptions.RuntimeIOException; + + +class FileSystemUtils { + + private FileSystemUtils() {} + + /** + * Lists all non-hidden files for the given directory + */ + static List listFiles(String directory, Configuration conf) { + + final Path directoryPath = new Path(directory); + final FileStatus[] files; + try { + FileSystem fs = directoryPath.getFileSystem(conf); + files = fs.listStatus(directoryPath, HiddenPathFilter.INSTANCE); + } catch (IOException e) { + throw new RuntimeIOException(e, "Error listing files for directory: " + directory); + } + return Arrays.asList(files); + } + + private enum HiddenPathFilter implements PathFilter { + INSTANCE; + + @Override + public boolean accept(Path path) { + return !path.getName().startsWith("_") && !path.getName().startsWith("."); + } + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java new file mode 100644 index 000000000..7d545f71b --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.expressions.BoundPredicate; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionVisitors; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.expressions.UnboundTerm; + + +class HiveExpressions { + + private HiveExpressions() {} + + /** + * Simplifies the {@link Expression} so that it fits the restrictions of the expression that can be passed + * to the Hive metastore. For details about the simplification, please see {@link RemoveNonPartitionPredicates} and + * {@link RewriteUnsupportedOperators} + * @param expr The {@link Expression} to be simplified + * @param partitionColumnNames The set of partition column names + * @return TRUE if the simplified expression results in an always true expression or if there are no predicates on + * partition columns in the simplified expression, + * FALSE if the simplified expression results in an always false expression, + * otherwise returns the simplified expression + */ + static Expression simplifyPartitionFilter(Expression expr, Set partitionColumnNames) { + try { + Expression partitionPredicatesOnly = ExpressionVisitors.visit(expr, + new RemoveNonPartitionPredicates(partitionColumnNames)); + return ExpressionVisitors.visit(partitionPredicatesOnly, new RewriteUnsupportedOperators()); + } catch (Exception e) { + throw new RuntimeException("Error while processing expression: " + expr, e); + } + } + + /** + * Converts an {@link Expression} into a filter string which can be passed to the Hive metastore + * + * It is expected that caller handles TRUE and FALSE expressions before calling this method. The given + * {@link Expressions} must also be passed through {@link #simplifyPartitionFilter(Expression, Set)} first to + * remove any unsupported predicates. 
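+ *
+ * <p>
+ * A minimal illustrative sketch of the two methods used together (the column name
+ * {@code datepartition} and the literal values are hypothetical, not part of this patch; it assumes
+ * {@code partitionColumnNames} contains {@code datepartition}):
+ * <pre>
+ *   Expression simplified = HiveExpressions.simplifyPartitionFilter(
+ *       Expressions.in("datepartition", "2020-03-17", "2020-03-18"), partitionColumnNames);
+ *   // IN is expanded into ORs of EQ predicates, so the resulting filter string is roughly:
+ *   // (( datepartition = '2020-03-17' ) OR ( datepartition = '2020-03-18' ))
+ *   String filter = HiveExpressions.toPartitionFilterString(simplified);
+ * </pre>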
+ * @param expr The {@link Expression} to be converted into a filter string + * @return a filter string equivalent to the given {@link Expression} which can be passed to the Hive metastore + */ + static String toPartitionFilterString(Expression expr) { + return ExpressionVisitors.visit(expr, ExpressionToPartitionFilterString.get()); + } + + /** + * Removes any predicates on non-partition columns from the given {@link Expression} + */ + private static class RemoveNonPartitionPredicates extends ExpressionVisitors.ExpressionVisitor { + + private final Set partitionColumnNamesLowerCase; + + RemoveNonPartitionPredicates(Set partitionColumnNames) { + this.partitionColumnNamesLowerCase = + partitionColumnNames.stream().map(String::toLowerCase).collect(Collectors.toSet()); + } + + @Override + public Expression alwaysTrue() { + return Expressions.alwaysTrue(); + } + + @Override + public Expression alwaysFalse() { + return Expressions.alwaysFalse(); + } + + @Override + public Expression not(Expression result) { + return Expressions.not(result); + } + + @Override + public Expression and(Expression leftResult, Expression rightResult) { + return Expressions.and(leftResult, rightResult); + } + + @Override + public Expression or(Expression leftResult, Expression rightResult) { + return Expressions.or(leftResult, rightResult); + } + + @Override + public Expression predicate(BoundPredicate pred) { + throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); + } + + @Override + public Expression predicate(UnboundPredicate pred) { + return (partitionColumnNamesLowerCase.contains(pred.ref().name().toLowerCase())) ? pred + : Expressions.alwaysTrue(); + } + } + + /** + * Rewrites the {@link Expression} so that it fits the restrictions of the expression that can be passed + * to the Hive metastore. + * + * This visitor assumes that all predicates are on partition columns. Predicates on non-partition columns should be + * removed using {@link RemoveNonPartitionPredicates} before calling this visitor. It performs the following changes: + * 1. Rewrites NOT operators by inverting binary operators, negating unary literals and using De Morgan's laws + * e.g. NOT(value > 0 AND TRUE) => value <= 0 OR FALSE + * NOT(value < 0 OR value > 10) => value >= 0 AND value <= 10 + * 2. Removes IS NULL and IS NOT NULL predicates (Replaced with FALSE and TRUE respectively as partition column values + * are always non null for Hive) + * e.g. partitionColumn IS NULL => FALSE + * partitionColumn IS NOT NULL => TRUE + * 3. Expands IN and NOT IN operators into ORs of EQUAL operations and ANDs of NOT EQUAL operations respectively + * e.g. value IN (1, 2, 3) => value = 1 OR value = 2 OR value = 3 + * value NOT IN (1, 2, 3) => value != 1 AND value != 2 AND value != 3 + * 4. Removes any children TRUE and FALSE expressions. 
The checks to remove these are happening inside + * {@link Expressions#and(Expression, Expression)} and {@link Expressions#or(Expression, Expression)} + * (Note that the rewritten expression still can be TRUE and FALSE at the root and will have to be handled + * appropriately by the caller) + * + * For examples take a look at the tests in {@code TestHiveExpressions} + */ + private static class RewriteUnsupportedOperators extends ExpressionVisitors.ExpressionVisitor { + + @Override + public Expression alwaysTrue() { + return Expressions.alwaysTrue(); + } + + @Override + public Expression alwaysFalse() { + return Expressions.alwaysFalse(); + } + + @Override + public Expression not(Expression result) { + return result.negate(); + } + + @Override + public Expression and(Expression leftResult, Expression rightResult) { + return Expressions.and(leftResult, rightResult); + } + + @Override + public Expression or(Expression leftResult, Expression rightResult) { + return Expressions.or(leftResult, rightResult); + } + + Expression in(UnboundTerm term, List> literals) { + Expression in = alwaysFalse(); + for (Literal literal : literals) { + in = Expressions.or(in, Expressions.equal(term, literal.value())); + } + return in; + } + + Expression notIn(UnboundTerm term, List> literals) { + Expression notIn = alwaysTrue(); + for (Literal literal : literals) { + notIn = Expressions.and(notIn, Expressions.notEqual(term, literal.value())); + } + return notIn; + } + + @Override + public Expression predicate(BoundPredicate pred) { + throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); + } + + @Override + public Expression predicate(UnboundPredicate pred) { + switch (pred.op()) { + case LT: + case LT_EQ: + case GT: + case GT_EQ: + case EQ: + case NOT_EQ: + return pred; + case IS_NULL: + return Expressions.alwaysFalse(); + case NOT_NULL: + return Expressions.alwaysTrue(); + case IN: + return in(pred.term(), pred.literals()); + case NOT_IN: + return notIn(pred.term(), pred.literals()); + case STARTS_WITH: + throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter " + + "expression. 
Please use a combination of greater than AND less than predicates instead."); + default: + throw new IllegalStateException("Unexpected predicate: " + pred.op()); + } + } + } + + private static class ExpressionToPartitionFilterString extends ExpressionVisitors.ExpressionVisitor { + private static final ExpressionToPartitionFilterString INSTANCE = new ExpressionToPartitionFilterString(); + + private ExpressionToPartitionFilterString() { + } + + static ExpressionToPartitionFilterString get() { + return INSTANCE; + } + + @Override + public String alwaysTrue() { + throw new IllegalStateException("TRUE literal not allowed in Hive partition filter string"); + } + + @Override + public String alwaysFalse() { + throw new IllegalStateException("FALSE literal not allowed in Hive partition filter string"); + } + + @Override + public String not(String result) { + throw new IllegalStateException("NOT operator not allowed in Hive partition filter string"); + } + + @Override + public String and(String leftResult, String rightResult) { + return String.format("((%s) AND (%s))", leftResult, rightResult); + } + + @Override + public String or(String leftResult, String rightResult) { + return String.format("((%s) OR (%s))", leftResult, rightResult); + } + + @Override + public String predicate(BoundPredicate pred) { + throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); + } + + @Override + public String predicate(UnboundPredicate pred) { + switch (pred.op()) { + case LT: + case LT_EQ: + case GT: + case GT_EQ: + case EQ: + case NOT_EQ: + return getBinaryExpressionString(pred.ref().name(), pred.op(), pred.literal()); + default: + throw new IllegalStateException("Unexpected operator in Hive partition filter string: " + pred.op()); + } + } + + private String getBinaryExpressionString(String columnName, Expression.Operation op, Literal lit) { + return String.format("( %s %s %s )", columnName, getOperationString(op), getLiteralValue(lit)); + } + + private String getOperationString(Expression.Operation op) { + switch (op) { + case LT: + return "<"; + case LT_EQ: + return "<="; + case GT: + return ">"; + case GT_EQ: + return ">="; + case EQ: + return "="; + case NOT_EQ: + return "!="; + default: + throw new IllegalStateException("Unexpected operator in Hive partition filter string: " + op); + } + } + + private String getLiteralValue(Literal lit) { + Object value = lit.value(); + if (value instanceof String) { + String escapedString = ((String) value).replace("'", "\\'"); + return String.format("'%s'", escapedString); + } else { + return String.valueOf(value); + } + } + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java new file mode 100644 index 000000000..c97d29a94 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hive.HiveCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link HiveCatalog} which uses Hive metadata to read tables. Features like time travel, snapshot isolation and + * incremental computation are not supported along with any WRITE operations to either the data or metadata. + */ +public class LegacyHiveCatalog extends HiveCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveCatalog.class); + + public LegacyHiveCatalog(Configuration conf) { + super(conf); + } + + @Override + @SuppressWarnings("CatchBlockLogException") + public Table loadTable(TableIdentifier identifier) { + if (isValidIdentifier(identifier)) { + TableOperations ops = newTableOps(identifier); + if (ops.current() == null) { + throw new NoSuchTableException("Table does not exist: %s", identifier); + } + + return new LegacyHiveTable(ops, fullTableName(name(), identifier)); + } else if (isValidMetadataIdentifier(identifier)) { + throw new UnsupportedOperationException( + "Metadata views not supported for Hive tables without Iceberg metadata. Table: " + identifier); + } else { + throw new NoSuchTableException("Invalid table identifier: %s", identifier); + } + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new LegacyHiveTableOperations(conf(), clientPool(), dbName, tableName); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + throw new UnsupportedOperationException( + "Dropping tables not supported through legacy Hive catalog. Table: " + identifier); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + throw new UnsupportedOperationException( + "Renaming tables not supported through legacy Hive catalog. From: " + from + " To: " + to); + } + + @Override + public Table createTable(TableIdentifier identifier, Schema schema, PartitionSpec spec, String location, + Map properties) { + throw new UnsupportedOperationException( + "Creating tables not supported through legacy Hive catalog. Table: " + identifier); + } + + @Override + public Transaction newCreateTableTransaction(TableIdentifier identifier, Schema schema, PartitionSpec spec, + String location, Map properties) { + throw new UnsupportedOperationException( + "Creating tables not supported through legacy Hive catalog. 
Table: " + identifier); + } + + @Override + public Transaction newReplaceTableTransaction(TableIdentifier identifier, Schema schema, PartitionSpec spec, + String location, Map properties, boolean orCreate) { + throw new UnsupportedOperationException( + "Replacing tables not supported through legacy Hive catalog. Table: " + identifier); + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java new file mode 100644 index 000000000..d481e5937 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RewriteManifests; +import org.apache.iceberg.Rollback; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateLocation; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; + + +/** + * A {@link Table} which uses Hive table/partition metadata to perform scans using {@link LegacyHiveTableScan}. + * This table does not provide any time travel, snapshot isolation, incremental computation benefits. + * It also does not allow any WRITE operations to either the data or metadata. 
+ */ +public class LegacyHiveTable implements Table, HasTableOperations { + private final TableOperations ops; + private final String name; + + LegacyHiveTable(TableOperations ops, String name) { + this.ops = ops; + this.name = name; + } + + @Override + public TableOperations operations() { + return ops; + } + + @Override + public void refresh() { + ops.refresh(); + } + + @Override + public TableScan newScan() { + return new LegacyHiveTableScan(ops, this); + } + + @Override + public Schema schema() { + return ops.current().schema(); + } + + @Override + public PartitionSpec spec() { + return ops.current().spec(); + } + + @Override + public Map specs() { + throw new UnsupportedOperationException( + "Multiple partition specs not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Map properties() { + return ops.current().properties(); + } + + @Override + public String location() { + return ops.current().location(); + } + + @Override + public Snapshot currentSnapshot() { + throw new UnsupportedOperationException("Snapshots not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Snapshot snapshot(long snapshotId) { + throw new UnsupportedOperationException("Snapshots not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Iterable snapshots() { + throw new UnsupportedOperationException("Snapshots not supported for Hive tables without Iceberg metadata"); + } + + @Override + public List history() { + throw new UnsupportedOperationException("History not available for Hive tables without Iceberg metadata"); + } + + @Override + public UpdateSchema updateSchema() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public UpdateProperties updateProperties() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public UpdateLocation updateLocation() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public AppendFiles newAppend() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public RewriteFiles newRewrite() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public RewriteManifests rewriteManifests() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public OverwriteFiles newOverwrite() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public ReplacePartitions newReplacePartitions() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public DeleteFiles newDelete() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public ExpireSnapshots expireSnapshots() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Rollback rollback() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public ManageSnapshots manageSnapshots() { + throw new 
UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Transaction newTransaction() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public FileIO io() { + return ops.io(); + } + + @Override + public EncryptionManager encryption() { + return ops.encryption(); + } + + @Override + public LocationProvider locationProvider() { + return ops.locationProvider(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java new file mode 100644 index 000000000..52b01b698 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hive.HiveClientPool; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LegacyHiveTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableOperations.class); + + private final HiveClientPool metaClients; + private final String databaseName; + private final String tableName; + private final Configuration conf; + + private FileIO fileIO; + + protected LegacyHiveTableOperations(Configuration conf, HiveClientPool metaClients, String database, String table) { + this.conf = conf; + this.metaClients = metaClients; + this.databaseName = database; + this.tableName = table; + } + + @Override + public FileIO io() { + if (fileIO == null) 
{ + fileIO = new HadoopFileIO(conf); + } + + return fileIO; + } + + @Override + protected void doRefresh() { + try { + org.apache.hadoop.hive.metastore.api.Table hiveTable = + metaClients.run(client -> client.getTable(databaseName, tableName)); + + Schema schema = LegacyHiveTableUtils.getSchema(hiveTable); + PartitionSpec spec = LegacyHiveTableUtils.getPartitionSpec(hiveTable, schema); + + TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, hiveTable.getSd().getLocation(), + LegacyHiveTableUtils.getTableProperties(hiveTable)); + setCurrentMetadata(metadata); + } catch (TException e) { + String errMsg = String.format("Failed to get table info from metastore %s.%s", databaseName, tableName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + setShouldRefresh(false); + } + + /** + * Returns an {@link Iterable} of {@link Iterable}s of {@link DataFile}s which belong to the current table and + * match the partition predicates from the given expression. + * + * Each element in the outer {@link Iterable} maps to an {@link Iterable} of {@link DataFile}s originating from the + * same directory + */ + Iterable> getFilesByFilter(Expression expression) { + Iterable matchingDirectories; + if (current().spec().fields().isEmpty()) { + matchingDirectories = ImmutableList.of(getDirectoryInfo()); + } else { + matchingDirectories = getDirectoryInfosByFilter(expression); + } + + Iterable> filesPerDirectory = Iterables.transform(matchingDirectories, directory -> { + return Iterables.transform(FileSystemUtils.listFiles(directory.location(), conf), + file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())); + }); + + // Note that we return an Iterable of Iterables here so that the TableScan can process iterables of individual + // directories in parallel hence resulting in a parallel file listing + return filesPerDirectory; + } + + private DirectoryInfo getDirectoryInfo() { + Preconditions.checkArgument(current().spec().fields().isEmpty(), + "getDirectoryInfo only allowed for unpartitioned tables"); + try { + org.apache.hadoop.hive.metastore.api.Table hiveTable = + metaClients.run(client -> client.getTable(databaseName, tableName)); + + return LegacyHiveTableUtils.toDirectoryInfo(hiveTable); + } catch (TException e) { + String errMsg = String.format("Failed to get table info for %s.%s from metastore", databaseName, tableName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getDirectoryInfo", e); + } + } + + private List getDirectoryInfosByFilter(Expression expression) { + Preconditions.checkArgument(!current().spec().fields().isEmpty(), + "getDirectoryInfosByFilter only allowed for partitioned tables"); + try { + LOG.info("Fetching partitions for {}.{} with expression: {}", databaseName, tableName, expression); + Set partitionColumnNames = current().spec() + .identitySourceIds() + .stream() + .map(id -> current().schema().findColumnName(id)) + .collect(Collectors.toSet()); + Expression simplified = HiveExpressions.simplifyPartitionFilter(expression, partitionColumnNames); + LOG.info("Simplified expression for {}.{} to {}", databaseName, tableName, simplified); + + final List partitions; + if (simplified.equals(Expressions.alwaysFalse())) { + // If simplifyPartitionFilter returns FALSE, no partitions are 
going to match the filter expression + partitions = ImmutableList.of(); + } else if (simplified.equals(Expressions.alwaysTrue())) { + // If simplifyPartitionFilter returns TRUE, all partitions are going to match the filter expression + partitions = metaClients.run(client -> client.listPartitionsByFilter( + databaseName, tableName, null, (short) -1)); + } else { + String partitionFilterString = HiveExpressions.toPartitionFilterString(simplified); + LOG.info("Listing partitions for {}.{} with filter string: {}", databaseName, tableName, partitionFilterString); + partitions = metaClients.run( + client -> client.listPartitionsByFilter(databaseName, tableName, partitionFilterString, (short) -1)); + } + + return LegacyHiveTableUtils.toDirectoryInfos(partitions); + } catch (TException e) { + String errMsg = String.format("Failed to get partition info for %s.%s + expression %s from metastore", + databaseName, tableName, expression); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getPartitionsByFilter", e); + } + } + + private static DataFile createDataFile(FileStatus fileStatus, PartitionSpec partitionSpec, StructLike partitionData, + FileFormat format) { + DataFiles.Builder builder = DataFiles.builder(partitionSpec) + .withPath(fileStatus.getPath().toString()) + .withFormat(format) + .withFileSizeInBytes(fileStatus.getLen()) + .withMetrics(new Metrics(10000L, null, null, null, null, null)); + + if (partitionSpec.fields().isEmpty()) { + return builder.build(); + } else { + return builder.withPartition(partitionData).build(); + } + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public String metadataFileLocation(String filename) { + throw new UnsupportedOperationException( + "Metadata file location not available for Hive tables without Iceberg metadata"); + } + + @Override + public LocationProvider locationProvider() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java new file mode 100644 index 000000000..7e375caec --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.DataTableScan; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.util.ParallelIterable; +import org.apache.iceberg.util.ThreadPools; + + +/** + * A {@link DataTableScan} which uses Hive table and partition metadata to read tables. + * This scan does not provide any time travel, snapshot isolation, incremental computation benefits. + */ +public class LegacyHiveTableScan extends DataTableScan { + + protected LegacyHiveTableScan(TableOperations ops, Table table) { + super(ops, table); + } + + protected LegacyHiveTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter, + boolean caseSensitive, boolean colStats, Collection selectedColumns, + ImmutableMap options) { + super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options); + } + + @Override + @SuppressWarnings("checkstyle:HiddenField") + protected TableScan newRefinedScan(TableOperations ops, Table table, Long snapshotId, Schema schema, + Expression rowFilter, boolean caseSensitive, boolean colStats, Collection selectedColumns, + ImmutableMap options) { + return new LegacyHiveTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, + options); + } + + @Override + public CloseableIterable planFiles() { + LegacyHiveTableOperations hiveOps = (LegacyHiveTableOperations) tableOps(); + PartitionSpec spec = hiveOps.current().spec(); + String schemaString = SchemaParser.toJson(spec.schema()); + String specString = PartitionSpecParser.toJson(spec); + ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter(), isCaseSensitive()); + + Iterable> tasks = Iterables.transform(hiveOps.getFilesByFilter(filter()), fileIterable -> + Iterables.transform(fileIterable, file -> new BaseFileScanTask(file, schemaString, specString, residuals))); + + return new ParallelIterable<>(tasks, ThreadPools.getWorkerPool()); + } + + @Override + public CloseableIterable planFiles(TableOperations ops, Snapshot snapshot, + Expression rowFilter, boolean caseSensitive, boolean colStats) { + throw new IllegalStateException("Control flow should never reach here"); + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java new file mode 100644 index 000000000..f2fcc31db --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +class LegacyHiveTableUtils { + + private LegacyHiveTableUtils() {} + + private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableUtils.class); + + static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) { + Map props = getTableProperties(table); + String schemaStr = props.get("avro.schema.literal"); + Schema schema; + if (schemaStr == null) { + LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. The schema will not" + + " have case sensitivity and nullability information", table.getDbName(), table.getTableName()); + // TODO: Add support for tables without avro.schema.literal + throw new UnsupportedOperationException("Reading tables without avro.schema.literal not implemented yet"); + } else { + schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr)); + } + + List partCols = table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + return addPartitionColumnsIfRequired(schema, partCols); + } + + private static Schema addPartitionColumnsIfRequired(Schema schema, List partitionColumns) { + List fields = new ArrayList<>(schema.columns()); + AtomicInteger fieldId = new AtomicInteger(10000); + partitionColumns.stream().forEachOrdered(column -> { + Types.NestedField field = schema.findField(column); + if (field == null) { + // TODO: Support partition fields with non-string types + fields.add(Types.NestedField.required(fieldId.incrementAndGet(), column, Types.StringType.get())); + } else { + Preconditions.checkArgument(field.type().equals(Types.StringType.get()), + "Tables with non-string partition columns not supported yet"); + } + }); + return new Schema(fields); + } + + static Map getTableProperties(org.apache.hadoop.hive.metastore.api.Table table) { + Map props = new HashMap<>(); + props.putAll(table.getSd().getSerdeInfo().getParameters()); + props.putAll(table.getSd().getParameters()); + props.putAll(table.getParameters()); + return props; + } + + static PartitionSpec getPartitionSpec(org.apache.hadoop.hive.metastore.api.Table table, Schema schema) { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + + table.getPartitionKeys().forEach(fieldSchema -> { + // TODO: 
Support partition fields with non-string types + Preconditions.checkArgument(fieldSchema.getType().equals("string"), + "Tables with non-string partition columns not supported yet"); + builder.identity(fieldSchema.getName()); + }); + + return builder.build(); + } + + static DirectoryInfo toDirectoryInfo(org.apache.hadoop.hive.metastore.api.Table table) { + return new DirectoryInfo(table.getSd().getLocation(), + serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib()), null); + } + + static List toDirectoryInfos(List partitions) { + return partitions.stream().map(p -> { + return new DirectoryInfo(p.getSd().getLocation(), + serdeToFileFormat(p.getSd().getSerdeInfo().getSerializationLib()), buildPartitionStructLike(p.getValues())); + }).collect(Collectors.toList()); + } + + private static StructLike buildPartitionStructLike(List partitionValues) { + return new StructLike() { + + @Override + public int size() { + return partitionValues.size(); + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(partitionValues.get(pos)); + } + + @Override + public void set(int pos, T value) { + throw new IllegalStateException("Read-only"); + } + }; + } + + private static FileFormat serdeToFileFormat(String serde) { + switch (serde) { + case "org.apache.hadoop.hive.serde2.avro.AvroSerDe": + return FileFormat.AVRO; + case "org.apache.hadoop.hive.ql.io.orc.OrcSerde": + return FileFormat.ORC; + default: + throw new IllegalArgumentException("Unrecognized serde: " + serde); + } + } +} diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java new file mode 100644 index 000000000..4489e003d --- /dev/null +++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.collect.ImmutableSet; +import org.apache.iceberg.expressions.Expression; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.expressions.Expressions.alwaysFalse; +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notEqual; +import static org.apache.iceberg.expressions.Expressions.notIn; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; +import static org.apache.iceberg.hive.legacy.HiveExpressions.simplifyPartitionFilter; +import static org.apache.iceberg.hive.legacy.HiveExpressions.toPartitionFilterString; + + +public class TestHiveExpressions { + + @Test + public void testSimplifyRemoveNonPartitionColumns() { + Expression input = and(and(equal("pCol", 1), equal("nonpCol", 2)), isNull("nonpCol")); + Expression expected = equal("pCol", 1); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveNot() { + Expression input = not(and(equal("pCol", 1), equal("pCol", 2))); + Expression expected = or(notEqual("pCol", 1), notEqual("pCol", 2)); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveIsNull() { + Expression input = isNull("pcol"); + Expression expected = alwaysFalse(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveNotNull() { + Expression input = notNull("pcol"); + Expression expected = alwaysTrue(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyExpandIn() { + Expression input = in("pcol", 1, 2, 3); + Expression expected = or(or(equal("pcol", 1), equal("pcol", 2)), equal("pcol", 3)); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyExpandNotIn() { + Expression input = notIn("pcol", 1, 2, 3); + Expression expected = and(and(notEqual("pcol", 1), notEqual("pcol", 2)), notEqual("pcol", 3)); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveAlwaysTrueChildren() { + Expression input = and(alwaysTrue(), equal("pcol", 1)); + Expression expected = equal("pcol", 1); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + + input = or(alwaysTrue(), equal("pcol", 1)); + expected = alwaysTrue(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveAlwaysFalseChildren() { + Expression input = and(alwaysFalse(), equal("pcol", 1)); + Expression expected = alwaysFalse(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, 
diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
new file mode 100644
index 000000000..277ab75f1
--- /dev/null
+++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+
+
+public class TestLegacyHiveTableScan extends HiveMetastoreTest {
+
+  private static final List<FieldSchema> DATA_COLUMNS = ImmutableList.of(
+      new FieldSchema("strCol", "string", ""),
+      new FieldSchema("intCol", "int", ""));
+  private static final List<FieldSchema> PARTITION_COLUMNS = ImmutableList.of(
+      new FieldSchema("pcol", "string", ""));
+  private static HiveCatalog legacyCatalog;
+  private static Path dbPath;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    legacyCatalog = new LegacyHiveCatalog(HiveMetastoreTest.hiveConf);
+    dbPath = Paths.get(URI.create(metastoreClient.getDatabase(DB_NAME).getLocationUri()));
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    legacyCatalog.close();
+    TestLegacyHiveTableScan.legacyCatalog = null;
+  }
+
+  @Test
+  public void testHiveScanUnpartitioned() throws Exception {
+    String tableName = "unpartitioned";
+    Table table = createTable(tableName, DATA_COLUMNS, ImmutableList.of());
+    addFiles(table, AVRO, "A", "B");
+    filesMatch(ImmutableMap.of("A", AVRO, "B", AVRO), hiveScan(table));
+  }
+
+  @Test
+  public void testHiveScanSinglePartition() throws Exception {
+    String tableName = "single_partition";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    addPartition(table, ImmutableList.of("1"), AVRO, "A", "B");
+    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=1/B", AVRO), hiveScan(table));
+  }
+
+  @Test
+  public void testHiveScanMultiPartition() throws Exception {
+    String tableName = "multi_partition";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    addPartition(table, ImmutableList.of("1"), AVRO, "A");
+    addPartition(table, ImmutableList.of("2"), AVRO, "B");
+    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", AVRO), hiveScan(table));
+  }
+
+  @Test
+  public void testHiveScanMultiPartitionWithFilter() throws Exception {
+    String tableName = "multi_partition_with_filter";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    addPartition(table, ImmutableList.of("1"), AVRO, "A");
+    addPartition(table, ImmutableList.of("2"), AVRO, "B");
+    filesMatch(ImmutableMap.of("pcol=2/B", AVRO), hiveScan(table, Expressions.equal("pcol", "2")));
+  }
+
+  @Test
+  public void testHiveScanMultiPartitionWithNonPartitionFilter() throws Exception {
+    String tableName = "multi_partition_with_non_partition_filter";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    addPartition(table, ImmutableList.of("1"), AVRO, "A");
+    addPartition(table, ImmutableList.of("2"), AVRO, "B");
+    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", AVRO), hiveScan(table, Expressions.equal("intCol", 1)));
+  }
+
+  @Test
+  public void testHiveScanHybridTable() throws Exception {
+    String tableName = "hybrid_table";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    addPartition(table, ImmutableList.of("1"), AVRO, "A");
+    addPartition(table, ImmutableList.of("2"), ORC, "B");
+    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", ORC), hiveScan(table));
+  }
+
+  private static Table createTable(String tableName, List<FieldSchema> columns, List<FieldSchema> partitionColumns)
+      throws Exception {
+    long currentTimeMillis = System.currentTimeMillis();
+    Path tableLocation = dbPath.resolve(tableName);
+    Files.createDirectories(tableLocation);
+    Table tbl = new Table(tableName,
+        DB_NAME,
+        System.getProperty("user.name"),
+        (int) (currentTimeMillis / 1000),
+        (int) (currentTimeMillis / 1000),
+        Integer.MAX_VALUE,
+        storageDescriptor(columns, tableLocation.toString(), AVRO),
+        partitionColumns,
+        new HashMap<>(),
+        null,
+        null,
+        TableType.EXTERNAL_TABLE.toString());
+    tbl.getParameters().put("EXTERNAL", "TRUE");
+    metastoreClient.createTable(tbl);
+    return tbl;
+  }
+
+  private static StorageDescriptor storageDescriptor(List<FieldSchema> columns, String location, FileFormat format) {
+    final StorageDescriptor storageDescriptor = new StorageDescriptor();
+    storageDescriptor.setCols(columns);
+    storageDescriptor.setLocation(location);
+    SerDeInfo serDeInfo = new SerDeInfo();
+    switch (format) {
+      case AVRO:
+        storageDescriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat");
+        storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat");
+        serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+        break;
+      case ORC:
+        storageDescriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
+        storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
+        serDeInfo.setSerializationLib("org.apache.hadoop.hive.ql.io.orc.OrcSerde");
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    }
+    storageDescriptor.setSerdeInfo(serDeInfo);
+    storageDescriptor.setParameters(ImmutableMap.of(
+        AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schemaLiteral(columns)));
+    return storageDescriptor;
+  }
+
+  private static String schemaLiteral(List<FieldSchema> columns) {
+    List<String> columnNames = columns.stream().map(FieldSchema::getName).collect(Collectors.toList());
+    List<TypeInfo> columnTypes = columns.stream().map(c -> TypeInfoUtils.getTypeInfoFromTypeString(c.getType()))
+        .collect(Collectors.toList());
+    return AvroSerDe.getSchemaFromCols(new Properties(), columnNames, columnTypes, null).toString();
+  }
+
+  private static Path location(Table table) {
+    return Paths.get(table.getSd().getLocation());
+  }
+
+  private static Path location(Table table, List<String> partitionValues) {
+    Path partitionLocation = location(table);
+    for (int i = 0; i < table.getPartitionKeysSize(); i++) {
+      partitionLocation = partitionLocation.resolve(
+          table.getPartitionKeys().get(i).getName() + "=" + partitionValues.get(i));
+    }
+    return partitionLocation;
+  }
+
+  private void addFiles(Table table, FileFormat format, String... fileNames) throws IOException {
+    Path tableLocation = location(table);
+    for (String fileName : fileNames) {
+      Path filePath = tableLocation.resolve(format.addExtension(fileName));
+      Files.createFile(filePath);
+    }
+  }
+
+  private void addPartition(Table table, List<String> partitionValues, FileFormat format, String... fileNames)
+      throws Exception {
+    Path partitionLocation = location(table, partitionValues);
+    Files.createDirectories(partitionLocation);
+    long currentTimeMillis = System.currentTimeMillis();
+    metastoreClient.add_partition(new Partition(
+        partitionValues,
+        table.getDbName(),
+        table.getTableName(),
+        (int) (currentTimeMillis / 1000),
+        (int) (currentTimeMillis / 1000),
+        storageDescriptor(table.getSd().getCols(), partitionLocation.toString(), format),
+        new HashMap<>()
+    ));
+    for (String fileName : fileNames) {
+      Path filePath = partitionLocation.resolve(format.addExtension(fileName));
+      Files.createFile(filePath);
+    }
+  }
+
+  private Map<String, FileFormat> hiveScan(Table table) {
+    return hiveScan(table, Expressions.alwaysTrue());
+  }
+
+  private Map<String, FileFormat> hiveScan(Table table, Expression filter) {
+    Path tableLocation = location(table);
+    CloseableIterable<FileScanTask> fileScanTasks = legacyCatalog
+        .loadTable(TableIdentifier.of(table.getDbName(), table.getTableName()))
+        .newScan().filter(filter).planFiles();
+    return StreamSupport.stream(fileScanTasks.spliterator(), false).collect(Collectors.toMap(
+        f -> tableLocation.relativize(Paths.get(URI.create(f.file().path().toString()))).toString().split("\\.")[0],
+        f -> f.file().format()));
+  }
+
+  private static void filesMatch(Map<String, FileFormat> expected, Map<String, FileFormat> actual) {
+    Assert.assertEquals(expected, actual);
+  }
+}
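TestLegacyHiveTableScan drives everything through the public catalog API, which is the same path a downstream engine would take: load the Hive-only table through LegacyHiveCatalog, then plan files with an ordinary table scan. Condensed from the hiveScan helper above, and only as a sketch (the configuration handle, database, table, and column names here are made up), the read path looks like this:

    // Sketch of the read path exercised by the tests; identifiers are hypothetical.
    HiveCatalog catalog = new LegacyHiveCatalog(hiveConf);  // hiveConf: a Configuration pointing at the metastore
    org.apache.iceberg.Table table = catalog.loadTable(TableIdentifier.of("tracking", "page_views"));
    try (CloseableIterable<FileScanTask> tasks =
        table.newScan().filter(Expressions.equal("datepartition", "2020-03-18")).planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path() + " (" + task.file().format() + ")");
      }
    }

Partition predicates prune partitions during planning, while, as testHiveScanMultiPartitionWithNonPartitionFilter shows, predicates on non-partition columns do not remove any files at planning time.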