Fix timestamp with timezone mapping in iceberg type converter #23534

Merged
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -1775,6 +1775,10 @@ Map of Iceberg types to the relevant PrestoDB types:
- ``TIME``
* - ``TIMESTAMP``
- ``TIMESTAMP``
* - ``TIMESTAMPTZ``
- ``TIMESTAMP_WITH_TIMEZONE``
* - ``STRING``
- ``VARCHAR``
* - ``UUID``
- ``UUID``
* - ``LIST``
@@ -43,9 +43,10 @@
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -999,8 +1000,12 @@ public void testDecimalBackedByINT32()
public void testTimestampMicrosBackedByINT64()
throws Exception
{
org.apache.parquet.schema.MessageType parquetSchema =
MessageTypeParser.parseMessageType("message ts_micros { optional INT64 test (TIMESTAMP_MICROS); }");
LogicalTypeAnnotation annotation = LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS);
MessageType parquetSchema = Types.buildMessage()
.primitive(PrimitiveType.PrimitiveTypeName.INT64, OPTIONAL)
.as(annotation)
.named("test")
.named("ts_micros");
ContiguousSet<Long> longValues = longsBetween(1_000_000, 1_001_000);
ImmutableList.Builder<SqlTimestamp> expectedValues = new ImmutableList.Builder<>();
for (Long value : longValues) {
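The test schema is now built with the Types builder instead of MessageTypeParser, presumably so the logical type's isAdjustedToUTC flag is pinned to false: parquet-mr upgrades the legacy TIMESTAMP_MICROS converted type to a UTC-adjusted logical type, which this PR would now surface as TIMESTAMP WITH TIME ZONE rather than plain TIMESTAMP. A standalone sketch of the two ways of declaring the column, using only the parquet-mr schema API (illustration, not the connector's code):

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;

public class ParquetTimestampSchemaSketch
{
    public static void main(String[] args)
    {
        // Legacy string form: the converted type TIMESTAMP_MICROS carries no explicit UTC flag.
        MessageType parsed = MessageTypeParser.parseMessageType(
                "message ts_micros { optional int64 test (TIMESTAMP_MICROS); }");

        // Builder form: isAdjustedToUTC is stated explicitly (false, i.e. a plain timestamp).
        MessageType built = Types.buildMessage()
                .primitive(PrimitiveType.PrimitiveTypeName.INT64, OPTIONAL)
                .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
                .named("test")
                .named("ts_micros");

        System.out.println(parsed);
        System.out.println(built);
    }
}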
@@ -30,6 +30,7 @@
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.UuidType;
import com.facebook.presto.common.type.VarbinaryType;
@@ -47,6 +48,7 @@
import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE;
import static com.facebook.presto.common.predicate.Marker.Bound.BELOW;
import static com.facebook.presto.common.predicate.Marker.Bound.EXACTLY;
import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
import static com.facebook.presto.iceberg.IcebergColumnHandle.getPushedDownSubfield;
import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.parquet.ParquetTypeUtils.columnPathFromSubfield;
@@ -203,6 +205,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
return MILLISECONDS.toMicros((Long) marker.getValue());
}

if (type instanceof TimestampWithTimeZoneType) {
return MILLISECONDS.toMicros(unpackMillisUtc((Long) marker.getValue()));
}

if (type instanceof VarcharType) {
return ((Slice) marker.getValue()).toStringUtf8();
}
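For predicate pushdown, a Presto TIMESTAMP WITH TIME ZONE literal arrives as one long that packs UTC milliseconds together with a time zone key in its low bits, while Iceberg expects the literal as epoch microseconds in UTC — hence the unpackMillisUtc plus MILLISECONDS.toMicros chain above. A self-contained sketch of that conversion (the 12-bit layout mirrors Presto's DateTimeEncoding, but the constants and names here are assumptions for illustration):

import java.util.concurrent.TimeUnit;

public class TimestampTzLiteralSketch
{
    // Assumed layout mirroring Presto's DateTimeEncoding: the low 12 bits of the packed
    // value hold a time zone key, the remaining high bits hold UTC epoch milliseconds.
    private static final int TIME_ZONE_BITS = 12;

    static long unpackMillisUtc(long packedDateTimeWithZone)
    {
        return packedDateTimeWithZone >> TIME_ZONE_BITS; // drop the zone key, keep UTC millis
    }

    static long toIcebergMicros(long packedDateTimeWithZone)
    {
        // Iceberg timestamptz literals are epoch microseconds in UTC.
        return TimeUnit.MILLISECONDS.toMicros(unpackMillisUtc(packedDateTimeWithZone));
    }

    public static void main(String[] args)
    {
        long millisUtc = 471_341_400_000L;                 // some instant as UTC millis
        long packed = (millisUtc << TIME_ZONE_BITS) | 42L; // 42 stands in for a zone key
        System.out.println(toIcebergMicros(packed));       // prints 471341400000000
    }
}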
@@ -154,7 +154,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema;
import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm;
import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields;
@@ -693,10 +692,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle

Type columnType = toIcebergType(column.getType());

if (columnType.equals(Types.TimestampType.withZone())) {
throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", columnType));
}

IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns added");
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
@@ -754,8 +749,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
validateTableMode(session, icebergTable);

verifyTypeSupported(icebergTable.schema());

return beginIcebergTableInsert(session, table, icebergTable);
}

@@ -124,7 +124,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -307,8 +306,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

Schema schema = toIcebergSchema(tableMetadata.getColumns());

verifyTypeSupported(schema);

PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));

MetastoreContext metastoreContext = getMetastoreContext(session);
@@ -67,7 +67,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -308,8 +307,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

Schema schema = toIcebergSchema(tableMetadata.getColumns());

verifyTypeSupported(schema);

PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());

@@ -457,13 +457,6 @@ public static void validateTableMode(ConnectorSession session, org.apache.iceber
}
}

public static void verifyTypeSupported(Schema schema)
{
if (schema.columns().stream().anyMatch(column -> Types.TimestampType.withZone().equals(column.type()))) {
throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", Types.TimestampType.withZone()));
}
}

public static Map<String, String> createIcebergViewProperties(ConnectorSession session, String prestoVersion)
{
return ImmutableMap.<String, String>builder()
@@ -61,6 +61,7 @@
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.hive.HiveType.HIVE_BINARY;
@@ -118,6 +119,10 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager
case TIME:
return TimeType.TIME;
case TIMESTAMP:
Types.TimestampType timestampType = (Types.TimestampType) type.asPrimitiveType();
if (timestampType.shouldAdjustToUTC()) {
return TIMESTAMP_WITH_TIME_ZONE;
}
return TimestampType.TIMESTAMP;
case STRING:
return VarcharType.createUnboundedVarcharType();
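Iceberg models both variants with a single TIMESTAMP primitive plus a UTC-adjustment flag, which is what the new branch inspects: Types.TimestampType.withZone() (printed as timestamptz) reports shouldAdjustToUTC() == true and now maps to TIMESTAMP WITH TIME ZONE, while withoutZone() continues to map to plain TIMESTAMP. A small standalone sketch of that flag (illustration against the Iceberg API, not connector code):

import org.apache.iceberg.types.Types;

public class IcebergTimestampFlagSketch
{
    public static void main(String[] args)
    {
        Types.TimestampType tz = Types.TimestampType.withZone();        // Iceberg "timestamptz"
        Types.TimestampType plain = Types.TimestampType.withoutZone();  // Iceberg "timestamp"

        System.out.println(tz + " -> " + tz.shouldAdjustToUTC());       // timestamptz -> true
        System.out.println(plain + " -> " + plain.shouldAdjustToUTC()); // timestamp -> false
    }
}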
@@ -89,11 +89,11 @@ public void testTimestamp()
@Test
public void testTimestampWithTimeZone()
{
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)", "Iceberg column type timestamptz is not supported");
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Iceberg column type timestamptz is not supported");
assertUpdate("CREATE TABLE test_timestamp_with_timezone (x timestamp)");
assertQueryFails("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone", "Iceberg column type timestamptz is not supported");
assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'");
assertQuerySucceeds("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone");
dropTable(getSession(), "test_timestamp_with_timezone");

assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) WITH ( format = 'ORC') AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Unsupported Type: timestamp with time zone");
}

@Test
@@ -0,0 +1,121 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;

import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestIcebergTypes
extends AbstractTestQueryFramework
{
protected QueryRunner createQueryRunner() throws Exception
{
return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of());
}

@DataProvider(name = "testTimestampWithTimezone")
public Object[][] createTestTimestampWithTimezoneData()
{
return new Object[][] {
{Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true")
.build()},
{Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false")
.build()}
Review thread on this data provider:

Member: Shouldn't this be different time zone data for the test, as the name suggests?

Contributor Author: This is just to test with the batch reader enabled and disabled. We are not changing the data used.

Member: Then I think naming the DataProvider and the Object[][] method according to the properties would be better?
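If that suggestion were taken up, the provider could be named after the property it varies, along these lines (hypothetical names sketching the review suggestion, not the committed code):

@DataProvider(name = "batchReadEnabledSessions")
public Object[][] createBatchReadEnabledSessions()
{
    return new Object[][] {
            {sessionWithBatchRead(true)},
            {sessionWithBatchRead(false)},
    };
}

private Session sessionWithBatchRead(boolean enabled)
{
    // Same two sessions as above, parameterized on the Parquet batch-read session property.
    return Session.builder(getSession())
            .setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, Boolean.toString(enabled))
            .build();
}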

};
}

@Test(dataProvider = "testTimestampWithTimezone")
public void testTimestampWithTimezone(Session session)
{
QueryRunner runner = getQueryRunner();
String timestamptz = "TIMESTAMP '1984-12-08 00:10:00 America/Los_Angeles'";
String timestamp = "TIMESTAMP '1984-12-08 00:10:00'";

dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz");
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)");

String row = "(" + timestamptz + ", " + timestamp + ", " + timestamptz + ")";
for (int i = 0; i < 10; i++) {
assertUpdate(session, "INSERT INTO test_timestamptz values " + row, 1);
}

MaterializedResult initialRows = runner.execute(session, "SELECT * FROM test_timestamptz");

List<Type> types = initialRows.getTypes();
assertTrue(types.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(types.get(1) instanceof TimestampType);

List<MaterializedRow> rows = initialRows.getMaterializedRows();
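// 00:10 America/Los_Angeles on 1984-12-08 is PST (UTC-8), so the UTC-normalized timestamptz
// columns read back as 1984-12-08T08:10Z, while the plain timestamp column keeps 00:10.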
for (int i = 0; i < 10; i++) {
assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString());
}

dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_partition");
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_partition(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE) " +
"WITH (PARTITIONING = ARRAY['b'])");
assertUpdate(session, "INSERT INTO test_timestamptz_partition (a, b, c) SELECT a, b, c FROM test_timestamptz", 10);

MaterializedResult partitionRows = runner.execute(session, "SELECT * FROM test_timestamptz_partition");

List<Type> partitionTypes = partitionRows.getTypes();
assertTrue(partitionTypes.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(partitionTypes.get(1) instanceof TimestampType);

rows = partitionRows.getMaterializedRows();
for (int i = 0; i < 10; i++) {
assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString());
}

String earlyTimestamptz = "TIMESTAMP '1980-12-08 00:10:00 America/Los_Angeles'";
dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_filter");
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)");

for (int i = 0; i < 5; i++) {
assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")", 1);
}
for (int i = 0; i < 5; i++) {
assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + timestamptz + ")", 1);
}

MaterializedResult lateRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a > " + earlyTimestamptz);
assertEquals(lateRows.getMaterializedRows().size(), 5);

MaterializedResult lateRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + timestamptz);
com.facebook.presto.testing.assertions.Assert.assertEquals(lateRows, lateRowsFromEquals);

MaterializedResult earlyRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a < " + timestamptz);
assertEquals(earlyRows.getMaterializedRows().size(), 5);

MaterializedResult earlyRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + earlyTimestamptz);
com.facebook.presto.testing.assertions.Assert.assertEquals(earlyRows, earlyRowsFromEquals);
}
}
@@ -38,6 +38,8 @@
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.iceberg.NestedFieldConverter.toIcebergNestedField;
@@ -176,6 +178,12 @@ protected static PrestoIcebergNestedField prestoIcebergNestedField(
case "date":
prestoType = DATE;
break;
case "timestamp":
prestoType = TIMESTAMP;
break;
case "timestamptz":
prestoType = TIMESTAMP_WITH_TIME_ZONE;
break;
case "nested":
prestoType = RowType.from(ImmutableList.of(
RowType.field("int", INTEGER),
@@ -239,6 +247,12 @@ protected static Types.NestedField nestedField(int id, String name)
case "date":
icebergType = Types.DateType.get();
break;
case "timestamp":
icebergType = Types.TimestampType.withoutZone();
break;
case "timestamptz":
icebergType = Types.TimestampType.withZone();
break;
case "nested":
icebergType = nested();
break;
@@ -92,7 +92,9 @@ protected static PrestoIcebergSchema prestoIcebergSchema(TypeManager typeManager
prestoIcebergNestedField(9, "varchar", typeManager),
prestoIcebergNestedField(10, "varbinary", typeManager),
prestoIcebergNestedField(11, "row", typeManager),
prestoIcebergNestedField(12, "date", typeManager)));
prestoIcebergNestedField(12, "date", typeManager),
prestoIcebergNestedField(13, "timestamp", typeManager),
prestoIcebergNestedField(14, "timestamptz", typeManager)));

Map<String, Integer> columnNameToIdMapping = getColumnNameToIdMapping();

@@ -114,11 +116,13 @@ private static Map<String, Integer> getColumnNameToIdMapping()
columnNameToIdMapping.put("varbinary", 10);
columnNameToIdMapping.put("row", 11);
columnNameToIdMapping.put("date", 12);
columnNameToIdMapping.put("array.element", 13);
columnNameToIdMapping.put("map.key", 14);
columnNameToIdMapping.put("map.value", 15);
columnNameToIdMapping.put("row.int", 16);
columnNameToIdMapping.put("row.varchar", 17);
columnNameToIdMapping.put("timestamp", 13);
columnNameToIdMapping.put("timestamptz", 14);
columnNameToIdMapping.put("array.element", 15);
columnNameToIdMapping.put("map.key", 16);
columnNameToIdMapping.put("map.value", 17);
columnNameToIdMapping.put("row.int", 18);
columnNameToIdMapping.put("row.varchar", 19);

return columnNameToIdMapping;
}
@@ -137,7 +141,9 @@ protected static Schema schema()
nestedField(9, "varchar"),
nestedField(10, "varbinary"),
nestedField(11, "row"),
nestedField(12, "date")));
nestedField(12, "date"),
nestedField(13, "timestamp"),
nestedField(14, "timestamptz")));

Type schemaAsStruct = Types.StructType.of(fields);
AtomicInteger nextFieldId = new AtomicInteger(1);