Support reading tables with only Hive metadata (#23)
* Support reading tables with Hive metadata if Iceberg metadata is not available
shardulm94 authored Mar 18, 2020
1 parent 733e122 commit 1a9acf8
Showing 16 changed files with 1,566 additions and 4 deletions.
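The entry point this change adds for readers is HiveCatalogs.loadLegacyCatalog(Configuration), which returns a catalog that loads tables from Hive metastore metadata alone. A minimal caller might look like the sketch below; the metastore URI, database, and table names are placeholders, and loadTable/newScan are the standard Iceberg catalog and scan APIs rather than anything introduced here.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;

public class LegacyHiveReadExample {
  public static void main(String[] args) {
    // Placeholder configuration; in practice this points at a real metastore.
    Configuration conf = new Configuration();
    conf.set("hive.metastore.uris", "thrift://example-metastore:9083");

    // New in this commit: a catalog backed only by Hive metastore metadata.
    HiveCatalog catalog = HiveCatalogs.loadLegacyCatalog(conf);

    // Standard Iceberg APIs from here on; "db.hive_only_table" is a placeholder name.
    Table table = catalog.loadTable(TableIdentifier.of("db", "hive_only_table"));
    table.newScan().planFiles().forEach(task -> System.out.println(task.file().path()));
  }
}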
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -29,15 +29,15 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;

-class BaseFileScanTask implements FileScanTask {
+public class BaseFileScanTask implements FileScanTask {
  private final DataFile file;
  private final String schemaString;
  private final String specString;
  private final ResidualEvaluator residuals;

  private transient PartitionSpec spec = null;

-  BaseFileScanTask(DataFile file, String schemaString, String specString, ResidualEvaluator residuals) {
+  public BaseFileScanTask(DataFile file, String schemaString, String specString, ResidualEvaluator residuals) {
    this.file = file;
    this.schemaString = schemaString;
    this.specString = specString;
@@ -177,7 +177,7 @@ private Table loadMetadataTable(TableIdentifier identifier) {
    }
  }

-  private boolean isValidMetadataIdentifier(TableIdentifier identifier) {
+  protected boolean isValidMetadataIdentifier(TableIdentifier identifier) {
    return MetadataTableType.from(identifier.name()) != null &&
        isValidIdentifier(TableIdentifier.of(identifier.namespace().levels()));
  }
@@ -275,7 +275,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
        });
  }

-  private static String fullTableName(String catalogName, TableIdentifier identifier) {
+  protected static String fullTableName(String catalogName, TableIdentifier identifier) {
    StringBuilder sb = new StringBuilder();

    if (catalogName.contains("/") || catalogName.contains(":")) {
@@ -270,4 +270,13 @@ private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metada
          .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
    }
  }
+
+  protected void setCurrentMetadata(TableMetadata currentMetadata) {
+    this.currentMetadata = currentMetadata;
+  }
+
+  protected void setShouldRefresh(boolean shouldRefresh) {
+    this.shouldRefresh = shouldRefresh;
+  }
+
}
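The two protected setters above exist so that subclasses can install TableMetadata derived from the Hive metastore rather than read from an Iceberg metadata file. A hypothetical doRefresh() override in such a subclass might use them as sketched below; the helper method and the surrounding class are assumptions for illustration, not code from this commit.

  @Override
  protected void doRefresh() {
    // Assumption: a helper translates the Hive table definition (schema, partition
    // columns, storage descriptor) into Iceberg TableMetadata.
    TableMetadata metadataFromHive = buildMetadataFromHiveTable();

    // The new setters let the subclass publish metadata it built itself and mark
    // this operations instance as up to date.
    setCurrentMetadata(metadataFromHive);
    setShouldRefresh(false);
  }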
12 changes: 12 additions & 0 deletions hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
@@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.hive.legacy.LegacyHiveCatalog;

public final class HiveCatalogs {

@@ -39,6 +40,11 @@ public final class HiveCatalogs {
      .removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
      .build();

+  private static final Cache<String, HiveCatalog> LEGACY_CATALOG_CACHE = Caffeine.newBuilder()
+      .expireAfterAccess(10, TimeUnit.MINUTES)
+      .removalListener((RemovalListener<String, HiveCatalog>) (uri, catalog, cause) -> catalog.close())
+      .build();

  private HiveCatalogs() {}

  public static HiveCatalog loadCatalog(Configuration conf) {
@@ -47,6 +53,12 @@ public static HiveCatalog loadCatalog(Configuration conf) {
    return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf));
  }

+  public static HiveCatalog loadLegacyCatalog(Configuration conf) {
+    // metastore URI can be null in local mode
+    String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    return LEGACY_CATALOG_CACHE.get(metastoreUri, uri -> new LegacyHiveCatalog(conf));
+  }

  /**
   * @deprecated Use {@link #loadHiveMetadataPreservingCatalog(Configuration)} instead
   */
@@ -83,6 +83,7 @@ protected void doRefresh() {
        String errMsg = String.format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP);
        throw new IllegalArgumentException(errMsg);
      }

      if (!io().newInputFile(metadataLocation).exists()) {
        String errMsg = String.format("%s property for %s.%s points to a non-existent file %s",
            METADATA_LOCATION_PROP, database, tableName, metadataLocation);
@@ -113,6 +113,11 @@ protected void doRefresh() {
        throw new IllegalArgumentException(errMsg);
      }

+      if (!io().newInputFile(metadataLocation).exists()) {
+        String errMsg = String.format("%s property for %s.%s points to a non-existent file %s",
+            METADATA_LOCATION_PROP, database, tableName, metadataLocation);
+        throw new IllegalArgumentException(errMsg);
+      }
    } catch (NoSuchObjectException e) {
      if (currentMetadataLocation() != null) {
        throw new NoSuchTableException(String.format("No such table: %s.%s", database, tableName));
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive.legacy;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;


/**
 * Metadata for a data directory referenced by either a Hive table or a partition
 */
class DirectoryInfo {
  private final String location;
  private final FileFormat format;
  private final StructLike partitionData;

  DirectoryInfo(String location, FileFormat format, StructLike partitionData) {
    this.location = location;
    this.format = format;
    this.partitionData = partitionData;
  }

  public String location() {
    return location;
  }

  public FileFormat format() {
    return format;
  }

  public StructLike partitionData() {
    return partitionData;
  }
}
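As a concrete illustration of how this holder is shaped, the snippet below constructs one by hand; the location, file format, and null partition data are hypothetical values (the commit's real callers are not shown in this excerpt), and the caller sits in the same package because DirectoryInfo is package-private.

package org.apache.iceberg.hive.legacy;

import org.apache.iceberg.FileFormat;

class DirectoryInfoExample {
  public static void main(String[] args) {
    // Hypothetical values for an unpartitioned table directory.
    DirectoryInfo tableDir = new DirectoryInfo(
        "hdfs://namenode/warehouse/db.db/events",  // data location (placeholder)
        FileFormat.AVRO,                           // format assumed from the Hive storage descriptor
        null);                                     // no partition data for an unpartitioned table (assumption)

    System.out.println(tableDir.location() + " stored as " + tableDir.format());
  }
}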
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive.legacy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.iceberg.exceptions.RuntimeIOException;


class FileSystemUtils {

  private FileSystemUtils() {}

  /**
   * Lists all non-hidden files for the given directory
   */
  static List<FileStatus> listFiles(String directory, Configuration conf) {

    final Path directoryPath = new Path(directory);
    final FileStatus[] files;
    try {
      FileSystem fs = directoryPath.getFileSystem(conf);
      files = fs.listStatus(directoryPath, HiddenPathFilter.INSTANCE);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Error listing files for directory: " + directory);
    }
    return Arrays.asList(files);
  }

  private enum HiddenPathFilter implements PathFilter {
    INSTANCE;

    @Override
    public boolean accept(Path path) {
      return !path.getName().startsWith("_") && !path.getName().startsWith(".");
    }
  }
}
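A small usage sketch for the helper follows; because both the class and listFiles are package-private, the caller must live in the same org.apache.iceberg.hive.legacy package, and the directory path here is a placeholder.

package org.apache.iceberg.hive.legacy;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

class FileSystemUtilsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Placeholder partition directory; hidden files ("_" or "." prefixes) are already filtered out.
    List<FileStatus> files =
        FileSystemUtils.listFiles("hdfs://namenode/warehouse/db.db/events/ds=2020-03-18", conf);

    for (FileStatus file : files) {
      System.out.println(file.getPath() + " (" + file.getLen() + " bytes)");
    }
  }
}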