From 0ce3cc8736b7267d9355d080401968c40fb1819c Mon Sep 17 00:00:00 2001 From: auden-woolfson Date: Tue, 27 Aug 2024 12:01:57 -0700 Subject: [PATCH] Add support for timestamps with timezone in iceberg type converter --- .../src/main/sphinx/connector/iceberg.rst | 4 + .../parquet/AbstractTestParquetReader.java | 11 +- .../presto/iceberg/ExpressionConverter.java | 6 + .../iceberg/IcebergAbstractMetadata.java | 7 - .../presto/iceberg/IcebergHiveMetadata.java | 3 - .../presto/iceberg/IcebergNativeMetadata.java | 3 - .../facebook/presto/iceberg/IcebergUtil.java | 7 - .../presto/iceberg/TypeConverter.java | 5 + .../IcebergDistributedSmokeTestBase.java | 8 +- .../presto/iceberg/TestIcebergTypes.java | 121 ++++++++++++++++++ .../iceberg/TestNestedFieldConverter.java | 14 ++ .../presto/iceberg/TestSchemaConverter.java | 20 ++- .../batchreader/decoders/Decoders.java | 28 +++- .../batchreader/decoders/ValuesDecoder.java | 5 + ...pMicrosDeltaBinaryPackedValuesDecoder.java | 10 +- ...eAndTimestampMicrosPlainValuesDecoder.java | 14 +- ...stampMicrosRLEDictionaryValuesDecoder.java | 17 ++- .../presto/parquet/writer/ParquetWriters.java | 5 + .../TimestampWithTimezoneValueWriter.java | 51 ++++++++ 19 files changed, 291 insertions(+), 48 deletions(-) create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTypes.java create mode 100644 presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/TimestampWithTimezoneValueWriter.java diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 0e156b005a2cd..8408736f92abc 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1775,6 +1775,10 @@ Map of Iceberg types to the relevant PrestoDB types: - ``TIME`` * - ``TIMESTAMP`` - ``TIMESTAMP`` + * - ``TIMESTAMP`` + - ``TIMESTAMP_WITH_TIMEZONE`` + * - ``STRING`` + - ``VARCHAR`` * - ``UUID`` - ``UUID`` * - ``LIST`` diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java index 8afaded8b96e3..e8c88af69a7df 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/parquet/AbstractTestParquetReader.java @@ -43,9 +43,10 @@ import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.parquet.format.Statistics; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; import org.joda.time.DateTimeZone; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -999,8 +1000,12 @@ public void testDecimalBackedByINT32() public void testTimestampMicrosBackedByINT64() throws Exception { - org.apache.parquet.schema.MessageType parquetSchema = - MessageTypeParser.parseMessageType("message ts_micros { optional INT64 test (TIMESTAMP_MICROS); }"); + LogicalTypeAnnotation annotation = LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS); + MessageType parquetSchema = Types.buildMessage() + .primitive(PrimitiveType.PrimitiveTypeName.INT64, OPTIONAL) + .as(annotation) + .named("test") + .named("ts_micros"); ContiguousSet longValues = longsBetween(1_000_000, 1_001_000); ImmutableList.Builder expectedValues = new ImmutableList.Builder<>(); for (Long value : longValues) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java index 3f05a48d1219a..285a9cf863b5f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ExpressionConverter.java @@ -30,6 +30,7 @@ import com.facebook.presto.common.type.RowType; import com.facebook.presto.common.type.TimeType; import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.TimestampWithTimeZoneType; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.UuidType; import com.facebook.presto.common.type.VarbinaryType; @@ -47,6 +48,7 @@ import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE; import static com.facebook.presto.common.predicate.Marker.Bound.BELOW; import static com.facebook.presto.common.predicate.Marker.Bound.EXACTLY; +import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc; import static com.facebook.presto.iceberg.IcebergColumnHandle.getPushedDownSubfield; import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield; import static com.facebook.presto.parquet.ParquetTypeUtils.columnPathFromSubfield; @@ -203,6 +205,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker) return MILLISECONDS.toMicros((Long) marker.getValue()); } + if (type instanceof TimestampWithTimeZoneType) { + return MILLISECONDS.toMicros(unpackMillisUtc((Long) marker.getValue())); + } + if (type instanceof VarcharType) { return ((Slice) marker.getValue()).toStringUtf8(); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 4fa63f09e5606..457896af447dd 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -154,7 +154,6 @@ import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties; import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema; import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode; -import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported; import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName; import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm; import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields; @@ -693,10 +692,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle Type columnType = toIcebergType(column.getType()); - if (columnType.equals(Types.TimestampType.withZone())) { - throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", columnType)); - } - IcebergTableHandle handle = (IcebergTableHandle) tableHandle; verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns added"); Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); @@ -754,8 +749,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto Table icebergTable = getIcebergTable(session, table.getSchemaTableName()); validateTableMode(session, icebergTable); - verifyTypeSupported(icebergTable.schema()); - return beginIcebergTableInsert(session, table, icebergTable); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index f50d930e17966..4ae5daeab8527 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -124,7 +124,6 @@ import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties; import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns; import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties; -import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; @@ -307,8 +306,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con Schema schema = toIcebergSchema(tableMetadata.getColumns()); - verifyTypeSupported(schema); - PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); MetastoreContext metastoreContext = getMetastoreContext(session); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index a36759cd7a136..43a812b9d3dd0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -67,7 +67,6 @@ import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable; import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView; import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties; -import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec; import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema; @@ -308,8 +307,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con Schema schema = toIcebergSchema(tableMetadata.getColumns()); - verifyTypeSupported(schema); - PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index c509a814640af..09a879702e4a0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -457,13 +457,6 @@ public static void validateTableMode(ConnectorSession session, org.apache.iceber } } - public static void verifyTypeSupported(Schema schema) - { - if (schema.columns().stream().anyMatch(column -> Types.TimestampType.withZone().equals(column.type()))) { - throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", Types.TimestampType.withZone())); - } - } - public static Map createIcebergViewProperties(ConnectorSession session, String prestoVersion) { return ImmutableMap.builder() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java index 9897897cc8042..ca8db778ae87c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TypeConverter.java @@ -61,6 +61,7 @@ import static com.facebook.presto.common.type.RealType.REAL; import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.hive.HiveType.HIVE_BINARY; @@ -118,6 +119,10 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager case TIME: return TimeType.TIME; case TIMESTAMP: + Types.TimestampType timestampType = (Types.TimestampType) type.asPrimitiveType(); + if (timestampType.shouldAdjustToUTC()) { + return TIMESTAMP_WITH_TIME_ZONE; + } return TimestampType.TIMESTAMP; case STRING: return VarcharType.createUnboundedVarcharType(); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 074a36e43ebb8..a308202869338 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -89,11 +89,11 @@ public void testTimestamp() @Test public void testTimestampWithTimeZone() { - assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)", "Iceberg column type timestamptz is not supported"); - assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Iceberg column type timestamptz is not supported"); - assertUpdate("CREATE TABLE test_timestamp_with_timezone (x timestamp)"); - assertQueryFails("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone", "Iceberg column type timestamptz is not supported"); + assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'"); + assertQuerySucceeds("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone"); dropTable(getSession(), "test_timestamp_with_timezone"); + + assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) WITH ( format = 'ORC') AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Unsupported Type: timestamp with time zone"); } @Test diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTypes.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTypes.java new file mode 100644 index 0000000000000..28265c9289c9b --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTypes.java @@ -0,0 +1,121 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.Session; +import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.TimestampWithTimeZoneType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.MaterializedRow; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.List; + +import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED; +import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class TestIcebergTypes + extends AbstractTestQueryFramework +{ + protected QueryRunner createQueryRunner() throws Exception + { + return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of()); + } + + @DataProvider(name = "testTimestampWithTimezone") + public Object[][] createTestTimestampWithTimezoneData() + { + return new Object[][] { + {Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true") + .build()}, + {Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false") + .build()} + }; + } + + @Test(dataProvider = "testTimestampWithTimezone") + public void testTimestampWithTimezone(Session session) + { + QueryRunner runner = getQueryRunner(); + String timestamptz = "TIMESTAMP '1984-12-08 00:10:00 America/Los_Angeles'"; + String timestamp = "TIMESTAMP '1984-12-08 00:10:00'"; + + dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz"); + assertQuerySucceeds(session, "CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)"); + + String row = "(" + timestamptz + ", " + timestamp + ", " + timestamptz + ")"; + for (int i = 0; i < 10; i++) { + assertUpdate(session, "INSERT INTO test_timestamptz values " + row, 1); + } + + MaterializedResult initialRows = runner.execute(session, "SELECT * FROM test_timestamptz"); + + List types = initialRows.getTypes(); + assertTrue(types.get(0) instanceof TimestampWithTimeZoneType); + assertTrue(types.get(1) instanceof TimestampType); + + List rows = initialRows.getMaterializedRows(); + for (int i = 0; i < 10; i++) { + assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString()); + } + + dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_partition"); + assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_partition(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE) " + + "WITH (PARTITIONING = ARRAY['b'])"); + assertUpdate(session, "INSERT INTO test_timestamptz_partition (a, b, c) SELECT a, b, c FROM test_timestamptz", 10); + + MaterializedResult partitionRows = runner.execute(session, "SELECT * FROM test_timestamptz"); + + List partitionTypes = partitionRows.getTypes(); + assertTrue(partitionTypes.get(0) instanceof TimestampWithTimeZoneType); + assertTrue(partitionTypes.get(1) instanceof TimestampType); + + rows = partitionRows.getMaterializedRows(); + for (int i = 0; i < 10; i++) { + assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString()); + } + + String earlyTimestamptz = "TIMESTAMP '1980-12-08 00:10:00 America/Los_Angeles'"; + dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_filter"); + assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)"); + + for (int i = 0; i < 5; i++) { + assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")", 1); + } + for (int i = 0; i < 5; i++) { + assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + timestamptz + ")", 1); + } + + MaterializedResult lateRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a > " + earlyTimestamptz); + assertEquals(lateRows.getMaterializedRows().size(), 5); + + MaterializedResult lateRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + timestamptz); + com.facebook.presto.testing.assertions.Assert.assertEquals(lateRows, lateRowsFromEquals); + + MaterializedResult earlyRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a < " + timestamptz); + assertEquals(earlyRows.getMaterializedRows().size(), 5); + + MaterializedResult earlyRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + earlyTimestamptz); + com.facebook.presto.testing.assertions.Assert.assertEquals(earlyRows, earlyRowsFromEquals); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestNestedFieldConverter.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestNestedFieldConverter.java index cf0392c07df2c..a1eed697d5830 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestNestedFieldConverter.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestNestedFieldConverter.java @@ -38,6 +38,8 @@ import static com.facebook.presto.common.type.DoubleType.DOUBLE; import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.common.type.RealType.REAL; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; import static com.facebook.presto.iceberg.NestedFieldConverter.toIcebergNestedField; @@ -176,6 +178,12 @@ protected static PrestoIcebergNestedField prestoIcebergNestedField( case "date": prestoType = DATE; break; + case "timestamp": + prestoType = TIMESTAMP; + break; + case "timestamptz": + prestoType = TIMESTAMP_WITH_TIME_ZONE; + break; case "nested": prestoType = RowType.from(ImmutableList.of( RowType.field("int", INTEGER), @@ -239,6 +247,12 @@ protected static Types.NestedField nestedField(int id, String name) case "date": icebergType = Types.DateType.get(); break; + case "timestamp": + icebergType = Types.TimestampType.withoutZone(); + break; + case "timestamptz": + icebergType = Types.TimestampType.withZone(); + break; case "nested": icebergType = nested(); break; diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSchemaConverter.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSchemaConverter.java index adbbde80977af..8d276765c7441 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSchemaConverter.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSchemaConverter.java @@ -92,7 +92,9 @@ protected static PrestoIcebergSchema prestoIcebergSchema(TypeManager typeManager prestoIcebergNestedField(9, "varchar", typeManager), prestoIcebergNestedField(10, "varbinary", typeManager), prestoIcebergNestedField(11, "row", typeManager), - prestoIcebergNestedField(12, "date", typeManager))); + prestoIcebergNestedField(12, "date", typeManager), + prestoIcebergNestedField(13, "timestamp", typeManager), + prestoIcebergNestedField(14, "timestamptz", typeManager))); Map columnNameToIdMapping = getColumnNameToIdMapping(); @@ -114,11 +116,13 @@ private static Map getColumnNameToIdMapping() columnNameToIdMapping.put("varbinary", 10); columnNameToIdMapping.put("row", 11); columnNameToIdMapping.put("date", 12); - columnNameToIdMapping.put("array.element", 13); - columnNameToIdMapping.put("map.key", 14); - columnNameToIdMapping.put("map.value", 15); - columnNameToIdMapping.put("row.int", 16); - columnNameToIdMapping.put("row.varchar", 17); + columnNameToIdMapping.put("timestamp", 13); + columnNameToIdMapping.put("timestamptz", 14); + columnNameToIdMapping.put("array.element", 15); + columnNameToIdMapping.put("map.key", 16); + columnNameToIdMapping.put("map.value", 17); + columnNameToIdMapping.put("row.int", 18); + columnNameToIdMapping.put("row.varchar", 19); return columnNameToIdMapping; } @@ -137,7 +141,9 @@ protected static Schema schema() nestedField(9, "varchar"), nestedField(10, "varbinary"), nestedField(11, "row"), - nestedField(12, "date"))); + nestedField(12, "date"), + nestedField(13, "timestamp"), + nestedField(14, "timestamptz"))); Type schemaAsStruct = Types.StructType.of(fields); AtomicInteger nextFieldId = new AtomicInteger(1); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java index e51cf0c00fdea..235a42a522c17 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java @@ -62,6 +62,7 @@ import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import java.io.ByteArrayInputStream; @@ -128,10 +129,14 @@ private static ValuesDecoder createValuesDecoder(ColumnDescriptor columnDescript case FLOAT: return new Int32PlainValuesDecoder(buffer, offset, length); case INT64: { - if (isTimeStampMicrosType(columnDescriptor) || isTimeMicrosType(columnDescriptor)) { + if (isTimeStampMicrosType(columnDescriptor)) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation typeAnnotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) columnDescriptor.getPrimitiveType().getLogicalTypeAnnotation(); + boolean withTimezone = typeAnnotation.isAdjustedToUTC(); + return new Int64TimeAndTimestampMicrosPlainValuesDecoder(buffer, offset, length, withTimezone); + } + if (isTimeMicrosType(columnDescriptor)) { return new Int64TimeAndTimestampMicrosPlainValuesDecoder(buffer, offset, length); } - if (isShortDecimalType(columnDescriptor)) { return new Int64ShortDecimalPlainValuesDecoder(buffer, offset, length); } @@ -184,8 +189,13 @@ else if (isUuidType(columnDescriptor)) { return new Int32RLEDictionaryValuesDecoder(bitWidth, inputStream, (IntegerDictionary) dictionary); } case INT64: { - if (isTimeStampMicrosType(columnDescriptor) || isTimeMicrosType(columnDescriptor)) { - return new Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary); + if (isTimeStampMicrosType(columnDescriptor)) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation typeAnnotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) columnDescriptor.getPrimitiveType().getLogicalTypeAnnotation(); + boolean withTimezone = typeAnnotation.isAdjustedToUTC(); + return new Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary, withTimezone); + } + if (isTimeMicrosType(columnDescriptor)) { + return new Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary, false); } if (isShortDecimalType(columnDescriptor)) { return new Int64RLEDictionaryValuesDecoder(bitWidth, inputStream, (LongDictionary) dictionary); @@ -227,10 +237,14 @@ else if (isUuidType(columnDescriptor)) { return new Int32DeltaBinaryPackedValuesDecoder(valueCount, inputStream); } case INT64: { - if (isTimeStampMicrosType(columnDescriptor) || isTimeMicrosType(columnDescriptor)) { - return new Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder(valueCount, inputStream); + if (isTimeStampMicrosType(columnDescriptor)) { + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation typeAnnotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) columnDescriptor.getPrimitiveType().getLogicalTypeAnnotation(); + boolean withTimezone = typeAnnotation.isAdjustedToUTC(); + return new Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder(valueCount, inputStream, withTimezone); + } + if (isTimeMicrosType(columnDescriptor)) { + return new Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder(valueCount, inputStream, false); } - if (isShortDecimalType(columnDescriptor)) { ValuesReader parquetReader = getParquetReader(encoding, columnDescriptor, valueCount, inputStream); return new Int64ShortDecimalDeltaValuesDecoder(parquetReader); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java index b3944cfe9cd76..96a83d83c9026 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/ValuesDecoder.java @@ -64,6 +64,11 @@ void skip(int length) throws IOException; } + interface PackFunction + { + long pack(long millis); + } + interface TimestampValuesDecoder extends ValuesDecoder { diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder.java index d422c8d5ebcca..a8c7b8a6442d2 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/delta/Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder.java @@ -20,6 +20,8 @@ import java.io.IOException; +import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone; +import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY; import static java.util.concurrent.TimeUnit.MICROSECONDS; /** @@ -34,11 +36,14 @@ public class Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder private final DeltaBinaryPackingValuesReader innerReader; - public Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder(int valueCount, ByteBufferInputStream bufferInputStream) + private final PackFunction packFunction; + + public Int64TimeAndTimestampMicrosDeltaBinaryPackedValuesDecoder(int valueCount, ByteBufferInputStream bufferInputStream, boolean withTimezone) throws IOException { innerReader = new DeltaBinaryPackingValuesReader(); innerReader.initFromPage(valueCount, bufferInputStream); + this.packFunction = withTimezone ? millis -> packDateTimeWithZone(millis, UTC_KEY) : millis -> millis; } @Override @@ -46,7 +51,8 @@ public void readNext(long[] values, int offset, int length) { int endOffset = offset + length; for (int i = offset; i < endOffset; i++) { - values[i] = MICROSECONDS.toMillis(innerReader.readLong()); + long curValue = MICROSECONDS.toMillis(innerReader.readLong()); + values[i] = packFunction.pack(curValue); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64TimeAndTimestampMicrosPlainValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64TimeAndTimestampMicrosPlainValuesDecoder.java index cc78f9e981d55..96b76d5f20878 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64TimeAndTimestampMicrosPlainValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/plain/Int64TimeAndTimestampMicrosPlainValuesDecoder.java @@ -17,6 +17,8 @@ import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.Int64TimeAndTimestampMicrosValuesDecoder; import org.openjdk.jol.info.ClassLayout; +import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone; +import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.sizeOf; import static java.util.concurrent.TimeUnit.MICROSECONDS; @@ -31,11 +33,19 @@ public class Int64TimeAndTimestampMicrosPlainValuesDecoder private int bufferOffset; + private final PackFunction packFunction; + public Int64TimeAndTimestampMicrosPlainValuesDecoder(byte[] byteBuffer, int bufferOffset, int length) + { + this(byteBuffer, bufferOffset, length, false); + } + + public Int64TimeAndTimestampMicrosPlainValuesDecoder(byte[] byteBuffer, int bufferOffset, int length, boolean withTimezone) { this.byteBuffer = byteBuffer; this.bufferOffset = bufferOffset; this.bufferEnd = bufferOffset + length; + this.packFunction = withTimezone ? millis -> packDateTimeWithZone(millis, UTC_KEY) : millis -> millis; } @Override @@ -49,10 +59,10 @@ public void readNext(long[] values, int offset, int length) int localBufferOffset = bufferOffset; while (offset < endOffset) { - values[offset++] = MICROSECONDS.toMillis(BytesUtils.getLong(localByteBuffer, localBufferOffset)); + long valueMillis = MICROSECONDS.toMillis(BytesUtils.getLong(localByteBuffer, localBufferOffset)); + values[offset++] = packFunction.pack(valueMillis); localBufferOffset += 8; } - bufferOffset = localBufferOffset; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java index e628a08292a9b..c94d88f896805 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; +import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone; +import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.airlift.slice.SizeOf.sizeOf; @@ -34,10 +36,18 @@ public class Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder private final LongDictionary dictionary; - public Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, LongDictionary dictionary) + private final PackFunction packFunction; + + public Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, LongDictionary dictionary, boolean withTimezone) { super(Integer.MAX_VALUE, bitWidth, inputStream); this.dictionary = dictionary; + this.packFunction = withTimezone ? millis -> packDateTimeWithZone(millis, UTC_KEY) : millis -> millis; + } + + public Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder(int bitWidth, InputStream inputStream, LongDictionary dictionary) + { + this(bitWidth, inputStream, dictionary, false); } @Override @@ -60,7 +70,7 @@ public void readNext(long[] values, int offset, int length) final int rleValue = currentValue; final long rleDictionaryValue = MICROSECONDS.toMillis(dictionary.decodeToLong(rleValue)); while (destinationIndex < endIndex) { - values[destinationIndex++] = rleDictionaryValue; + values[destinationIndex++] = packFunction.pack(rleDictionaryValue); } break; } @@ -69,7 +79,8 @@ public void readNext(long[] values, int offset, int length) final LongDictionary localDictionary = dictionary; for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++) { long dictionaryValue = localDictionary.decodeToLong(localBuffer[srcIndex]); - values[destinationIndex++] = MICROSECONDS.toMillis(dictionaryValue); + long millisValue = MICROSECONDS.toMillis(dictionaryValue); + values[destinationIndex++] = packFunction.pack(millisValue); } break; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java index a6b152381622c..0204878bf0d00 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/ParquetWriters.java @@ -29,6 +29,7 @@ import com.facebook.presto.parquet.writer.valuewriter.RealValueWriter; import com.facebook.presto.parquet.writer.valuewriter.TimeValueWriter; import com.facebook.presto.parquet.writer.valuewriter.TimestampValueWriter; +import com.facebook.presto.parquet.writer.valuewriter.TimestampWithTimezoneValueWriter; import com.facebook.presto.parquet.writer.valuewriter.UuidValuesWriter; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; @@ -57,6 +58,7 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimeType.TIME; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.UuidType.UUID; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; @@ -211,6 +213,9 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, Ty if (TIMESTAMP.equals(type)) { return new TimestampValueWriter(valuesWriter, type, parquetType); } + if (TIMESTAMP_WITH_TIME_ZONE.equals(type)) { + return new TimestampWithTimezoneValueWriter(valuesWriter, type, parquetType); + } if (TIME.equals(type)) { return new TimeValueWriter(valuesWriter, type, parquetType); } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/TimestampWithTimezoneValueWriter.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/TimestampWithTimezoneValueWriter.java new file mode 100644 index 0000000000000..358310a333ae9 --- /dev/null +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/writer/valuewriter/TimestampWithTimezoneValueWriter.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.parquet.writer.valuewriter; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.type.Type; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class TimestampWithTimezoneValueWriter + extends PrimitiveValueWriter +{ + private final Type type; + private final boolean writeMicroseconds; + + public TimestampWithTimezoneValueWriter(ValuesWriter valuesWriter, Type type, PrimitiveType parquetType) + { + super(parquetType, valuesWriter); + this.type = requireNonNull(type, "type is null"); + this.writeMicroseconds = parquetType.isPrimitive() && parquetType.getOriginalType() == OriginalType.TIMESTAMP_MICROS; + } + + @Override + public void write(Block block) + { + for (int i = 0; i < block.getPositionCount(); i++) { + if (!block.isNull(i)) { + long value = unpackMillisUtc(type.getLong(block, i)); + long scaledValue = writeMicroseconds ? MILLISECONDS.toMicros(value) : value; + getValueWriter().writeLong(scaledValue); + getStatistics().updateStats(scaledValue); + } + } + } +}