From c5c53d497e38e8f77443a016a3887ac8bc76b125 Mon Sep 17 00:00:00 2001 From: zhaner08 Date: Sun, 15 Dec 2024 17:35:34 -0500 Subject: [PATCH 1/5] Initial implementation for supporting AWS Cloudtrail input format --- .../io/trino/hive/formats/HiveClassNames.java | 1 + .../CloudTrailFileReaderFactory.java | 52 ++++++ .../line/cloudtrail/CloudtrailLineReader.java | 161 ++++++++++++++++++ .../java/io/trino/plugin/hive/HiveModule.java | 4 + .../line/CloudtrailJsonPageSourceFactory.java | 34 ++++ .../CloudtrailOpenXJsonPageSourceFactory.java | 34 ++++ .../io/trino/plugin/hive/HiveTestUtils.java | 3 + 7 files changed, 289 insertions(+) create mode 100644 lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java create mode 100644 lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java index 43a357b59bd9..3fee19366a03 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java @@ -18,6 +18,7 @@ public final class HiveClassNames public static final String AVRO_CONTAINER_INPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"; public static final String AVRO_CONTAINER_OUTPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"; public static final String AVRO_SERDE_CLASS = "org.apache.hadoop.hive.serde2.avro.AvroSerDe"; + public static final String CLOUDTRAIL_INPUT_FORMAT = "com.amazon.emr.cloudtrail.CloudTrailInputFormat"; public static final String COLUMNAR_SERDE_CLASS = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; public static final String FILE_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.FileInputFormat"; public static final String FILE_OUTPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.FileOutputFormat"; diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java new file mode 100644 index 000000000000..00c74934f133 --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.line.cloudtrail; + +import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.TrinoInputFile; +import io.trino.hive.formats.line.LineReader; +import io.trino.hive.formats.line.text.TextLineReaderFactory; + +import java.io.IOException; +import java.util.Set; + +import static io.trino.hive.formats.HiveClassNames.CLOUDTRAIL_INPUT_FORMAT; + +public class CloudTrailFileReaderFactory + extends TextLineReaderFactory +{ + + public CloudTrailFileReaderFactory(int fileBufferSize, int initialLineBufferSize, int maxLineLength) + { + super(fileBufferSize, initialLineBufferSize, maxLineLength); + } + + @Override + public Set getHiveInputFormatClassNames() { + return ImmutableSet.of(CLOUDTRAIL_INPUT_FORMAT); + } + + @Override + public LineReader createLineReader( + TrinoInputFile inputFile, + long start, + long length, + int headerCount, + int footerCount) + throws IOException + { + LineReader delegate = super.createLineReader(inputFile, start, length, headerCount, footerCount); + return new CloudtrailLineReader(delegate, this::createLineBuffer); + } +} diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java new file mode 100644 index 000000000000..17bd84c81a01 --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java @@ -0,0 +1,161 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.line.cloudtrail; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableSet; +import io.airlift.log.Logger; +import io.trino.hive.formats.line.LineBuffer; +import io.trino.hive.formats.line.LineReader; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/* + Cloudtrail reader should be used together with a JSON serde + Records from cloudtrail are json objects wrapped with an object with key named "Records" + This reader reads the line, and extract the serve the record through linebuffer one at a time + */ +public class CloudtrailLineReader + implements LineReader +{ + private static final Logger log = Logger.get(CloudtrailLineReader.class); + private static final String DOCUMENT_LIST_KEY = "Records"; + private static final Set NESTED_FIELDS_WITHOUT_SCHEMA = ImmutableSet.of( + "requestParameters", + "responseElements", + "additionalEventData", + "serviceEventDetails"); + + private static final Set AS_STRING_FIELDS = ImmutableSet.of( + "userIdentity"); + private static final String AS_STRING_SUFFIX = "_as_string"; + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final LineReader delegate; + private Iterator records; + private LineBuffer fileLineBuffer; + + public CloudtrailLineReader(LineReader delegate, Supplier lineBufferSupplier) + throws IOException + { + this.delegate = requireNonNull(delegate, "delegate is null"); + fileLineBuffer = lineBufferSupplier.get(); + } + + @Override + public boolean isClosed() + { + return delegate.isClosed(); + } + + @Override + public long getRetainedSize() + { + return delegate.getRetainedSize(); + } + + @Override + public long getBytesRead() + { + return delegate.getBytesRead(); + } + + @Override + public long getReadTimeNanos() + { + return delegate.getReadTimeNanos(); + } + + @Override + public boolean readLine(LineBuffer lineBuffer) + throws IOException { + lineBuffer.reset(); + + if (records != null && records.hasNext()) { + lineBuffer.write(transformJsonNode(records.next())); + return true; + } + + while (delegate.readLine(fileLineBuffer)) + { + try { + JsonNode node = MAPPER.readValue(fileLineBuffer.getBuffer(), JsonNode.class); + records = node.get(DOCUMENT_LIST_KEY).elements(); + } + catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } + catch (Exception e) { + // We do not throw error in order to fully read cloudtrail records and ignored backward incompatible records + log.error("Encountered an exception while parsing Cloudtrail records", e); + continue; + } + + if (records.hasNext()) { + lineBuffer.write(transformJsonNode(records.next())); + return true; + } + } + + // If there is no record left and not line left from the split, cleanup the resources + close(); + return false; + } + + @Override + public void close() + throws IOException + { + fileLineBuffer = null; + delegate.close(); + } + + private byte[] transformJsonNode(JsonNode node) + throws IOException { + ObjectNode objectNode = (ObjectNode) node; + for (String field : NESTED_FIELDS_WITHOUT_SCHEMA) { + if (node.has(field)) { + JsonNode fieldNode = node.get(field); + if (fieldNode.isNull()) { + objectNode.remove(field); + } + else { + objectNode.put(field, fieldNode.toString()); + } + } + } + + for (String field : AS_STRING_FIELDS) { + if (node.has(field)) { + JsonNode fieldNode = node.get(field); + if (fieldNode.isNull()) { + objectNode.remove(field); + } + else { + objectNode.put(field + AS_STRING_SUFFIX, fieldNode.toString()); + } + } + } + + return node.binaryValue(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 4aedf0a0786e..2191c142f526 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -27,6 +27,8 @@ import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory; +import io.trino.plugin.hive.line.CloudtrailJsonPageSourceFactory; +import io.trino.plugin.hive.line.CloudtrailOpenXJsonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -128,6 +130,8 @@ public void configure(Binder binder) pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(AvroPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(CloudtrailJsonPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(CloudtrailOpenXJsonPageSourceFactory.class).in(Scopes.SINGLETON); Multibinder fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class); binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java new file mode 100644 index 000000000000..5e235be5de65 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.line; + +import com.google.inject.Inject; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.hive.formats.line.cloudtrail.CloudTrailFileReaderFactory; +import io.trino.hive.formats.line.json.JsonDeserializerFactory; +import io.trino.plugin.hive.HiveConfig; + +import static java.lang.Math.toIntExact; + +public class CloudtrailJsonPageSourceFactory + extends LinePageSourceFactory +{ + @Inject + public CloudtrailJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config) + { + super(trinoFileSystemFactory, + new JsonDeserializerFactory(), + new CloudTrailFileReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes()))); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java new file mode 100644 index 000000000000..23b14ec8037c --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.line; + +import com.google.inject.Inject; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.hive.formats.line.cloudtrail.CloudTrailFileReaderFactory; +import io.trino.hive.formats.line.openxjson.OpenXJsonDeserializerFactory; +import io.trino.plugin.hive.HiveConfig; + +import static java.lang.Math.toIntExact; + +public class CloudtrailOpenXJsonPageSourceFactory + extends LinePageSourceFactory +{ + @Inject + public CloudtrailOpenXJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config) + { + super(trinoFileSystemFactory, + new OpenXJsonDeserializerFactory(), + new CloudTrailFileReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes()))); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java index 1d74a78334ed..6660e26a02e9 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java @@ -36,6 +36,7 @@ import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.hive.avro.AvroFileWriterFactory; import io.trino.plugin.hive.avro.AvroPageSourceFactory; +import io.trino.plugin.hive.line.CloudtrailJsonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -189,6 +190,8 @@ public static Set getDefaultHivePageSourceFactories(Trino .add(new RcFilePageSourceFactory(fileSystemFactory, hiveConfig)) .add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig)) .add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig)) + .add(new CloudtrailJsonPageSourceFactory(fileSystemFactory, hiveConfig)) + .add(new CloudtrailJsonPageSourceFactory(fileSystemFactory, hiveConfig)) .build(); } From 2215e8a485d07d825efa42bcd328e51c2f79e9a2 Mon Sep 17 00:00:00 2001 From: zhaner08 Date: Wed, 5 Feb 2025 21:01:27 -0500 Subject: [PATCH 2/5] Cleanup and adding tests for the format --- ...eReader.java => CloudTrailLineReader.java} | 33 +++---- ....java => CloudTrailLineReaderFactory.java} | 10 +- .../cloudtrail/TestCloudTrailLineReader.java | 99 +++++++++++++++++++ .../java/io/trino/plugin/hive/HiveModule.java | 2 - .../line/CloudtrailJsonPageSourceFactory.java | 4 +- .../CloudtrailOpenXJsonPageSourceFactory.java | 34 ------- .../io/trino/plugin/hive/HiveTestUtils.java | 1 - 7 files changed, 121 insertions(+), 62 deletions(-) rename lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/{CloudtrailLineReader.java => CloudTrailLineReader.java} (85%) rename lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/{CloudTrailFileReaderFactory.java => CloudTrailLineReaderFactory.java} (87%) create mode 100644 lib/trino-hive-formats/src/test/java/io/trino/hive/formats/line/cloudtrail/TestCloudTrailLineReader.java delete mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReader.java similarity index 85% rename from lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java rename to lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReader.java index 17bd84c81a01..a2637c85738b 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudtrailLineReader.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReader.java @@ -22,39 +22,40 @@ import io.trino.hive.formats.line.LineReader; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Iterator; import java.util.Set; import java.util.function.Supplier; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; /* Cloudtrail reader should be used together with a JSON serde - Records from cloudtrail are json objects wrapped with an object with key named "Records" - This reader reads the line, and extract the serve the record through linebuffer one at a time + Records from cloudtrail are json objects wrapped in an object with key named "Records" + This reader reads the line, and extract and serve the record through linebuffer one at a time */ -public class CloudtrailLineReader +public class CloudTrailLineReader implements LineReader { - private static final Logger log = Logger.get(CloudtrailLineReader.class); + private static final Logger log = Logger.get(CloudTrailLineReader.class); private static final String DOCUMENT_LIST_KEY = "Records"; + private static final String AS_STRING_SUFFIX = "_as_string"; + // Nested fields that are returned as String regardless of their nested schemas private static final Set NESTED_FIELDS_WITHOUT_SCHEMA = ImmutableSet.of( "requestParameters", "responseElements", "additionalEventData", "serviceEventDetails"); - + // Fields that we will append an additional String field with a suffix private static final Set AS_STRING_FIELDS = ImmutableSet.of( "userIdentity"); - private static final String AS_STRING_SUFFIX = "_as_string"; private static final ObjectMapper MAPPER = new ObjectMapper(); private final LineReader delegate; private Iterator records; private LineBuffer fileLineBuffer; - public CloudtrailLineReader(LineReader delegate, Supplier lineBufferSupplier) + public CloudTrailLineReader(LineReader delegate, Supplier lineBufferSupplier) throws IOException { this.delegate = requireNonNull(delegate, "delegate is null"); @@ -87,7 +88,8 @@ public long getReadTimeNanos() @Override public boolean readLine(LineBuffer lineBuffer) - throws IOException { + throws IOException + { lineBuffer.reset(); if (records != null && records.hasNext()) { @@ -95,18 +97,14 @@ public boolean readLine(LineBuffer lineBuffer) return true; } - while (delegate.readLine(fileLineBuffer)) - { + while (delegate.readLine(fileLineBuffer)) { try { JsonNode node = MAPPER.readValue(fileLineBuffer.getBuffer(), JsonNode.class); records = node.get(DOCUMENT_LIST_KEY).elements(); } - catch (IOException ioException) { - throw new UncheckedIOException(ioException); - } catch (Exception e) { // We do not throw error in order to fully read cloudtrail records and ignored backward incompatible records - log.error("Encountered an exception while parsing Cloudtrail records", e); + log.error(e, "Encountered an exception while parsing CloudTrail records"); continue; } @@ -130,7 +128,7 @@ public void close() } private byte[] transformJsonNode(JsonNode node) - throws IOException { + { ObjectNode objectNode = (ObjectNode) node; for (String field : NESTED_FIELDS_WITHOUT_SCHEMA) { if (node.has(field)) { @@ -155,7 +153,6 @@ private byte[] transformJsonNode(JsonNode node) } } } - - return node.binaryValue(); + return objectNode.toString().getBytes(UTF_8); } } diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReaderFactory.java similarity index 87% rename from lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java rename to lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReaderFactory.java index 00c74934f133..6b9edca0bab2 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailFileReaderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReaderFactory.java @@ -23,17 +23,17 @@ import static io.trino.hive.formats.HiveClassNames.CLOUDTRAIL_INPUT_FORMAT; -public class CloudTrailFileReaderFactory +public class CloudTrailLineReaderFactory extends TextLineReaderFactory { - - public CloudTrailFileReaderFactory(int fileBufferSize, int initialLineBufferSize, int maxLineLength) + public CloudTrailLineReaderFactory(int fileBufferSize, int initialLineBufferSize, int maxLineLength) { super(fileBufferSize, initialLineBufferSize, maxLineLength); } @Override - public Set getHiveInputFormatClassNames() { + public Set getHiveInputFormatClassNames() + { return ImmutableSet.of(CLOUDTRAIL_INPUT_FORMAT); } @@ -47,6 +47,6 @@ public LineReader createLineReader( throws IOException { LineReader delegate = super.createLineReader(inputFile, start, length, headerCount, footerCount); - return new CloudtrailLineReader(delegate, this::createLineBuffer); + return new CloudTrailLineReader(delegate, this::createLineBuffer); } } diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/line/cloudtrail/TestCloudTrailLineReader.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/line/cloudtrail/TestCloudTrailLineReader.java new file mode 100644 index 000000000000..b708d344e3da --- /dev/null +++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/line/cloudtrail/TestCloudTrailLineReader.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.line.cloudtrail; + +import com.google.common.collect.ImmutableList; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.memory.MemoryInputFile; +import io.trino.hive.formats.line.LineBuffer; +import io.trino.hive.formats.line.LineReader; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static io.airlift.slice.Slices.utf8Slice; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCloudTrailLineReader +{ + @Test + public void testBasicRowParsing() + throws IOException + { + String data = "{\"Records\":[" + + // try different spacing, should all work + "{ \"data\":\"1\" }," + + "{\"data\":\"2\" }," + + "{\"data\": \"3\"}" + + "]}"; + + testAllValues(data, ImmutableList.of("{\"data\":\"1\"}", "{\"data\":\"2\"}", "{\"data\":\"3\"}")); + } + + @Test + public void testSkipMalformedDocument() + throws IOException + { + String data = "{randomcontent}"; + + testAllValues(data, ImmutableList.of()); + } + + @Test + public void testCloudTrailTransform() + throws IOException + { + String data = "{\"Records\":[" + + // try different spacing, should all work + "{ \"requestParameters\": {\"data\": \"1\"} }," + + "{ \"responseElements\": {\"data\": \"2\"} }," + + "{ \"additionalEventData\": {\"data\": \"3\"} }," + + "{ \"requestParameters\": {\"data\": \"4\"}, \"additionalEventData\": {\"data\": \"5\"} }" + + "]}"; + + testAllValues(data, ImmutableList.of("{\"requestParameters\":\"{\\\"data\\\":\\\"1\\\"}\"}", + "{\"responseElements\":\"{\\\"data\\\":\\\"2\\\"}\"}", + "{\"additionalEventData\":\"{\\\"data\\\":\\\"3\\\"}\"}", + "{\"requestParameters\":\"{\\\"data\\\":\\\"4\\\"}\",\"additionalEventData\":\"{\\\"data\\\":\\\"5\\\"}\"}")); + } + + @Test + public void testCloudTrailAsStringColumn() + throws IOException + { + String data = "{\"Records\":[" + + // try different spacing, should all work + "{ \"userIdentity\": {\"data\": \"1\"} }" + + "]}"; + + testAllValues(data, ImmutableList.of("{\"userIdentity\":{\"data\":\"1\"},\"userIdentity_as_string\":\"{\\\"data\\\":\\\"1\\\"}\"}")); + } + + private static void testAllValues(String data, List expectedValues) + throws IOException + { + CloudTrailLineReaderFactory readerFactory = new CloudTrailLineReaderFactory(1024, 1024, 8096); + TrinoInputFile file = new MemoryInputFile(Location.of("memory:///test"), utf8Slice(data)); + LineReader lineReader = readerFactory.createLineReader(file, 0, 1, 0, 0); + LineBuffer lineBuffer = new LineBuffer(1, 1024); + for (String expectedValue : expectedValues) { + lineReader.readLine(lineBuffer); + assertThat(new String(lineBuffer.getBuffer(), 0, lineBuffer.getLength(), StandardCharsets.UTF_8)).isEqualTo(expectedValue); + } + assertThat(lineReader.readLine(lineBuffer)).isFalse(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 62540dbacff4..3c057646f859 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -27,7 +27,6 @@ import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory; import io.trino.plugin.hive.line.CloudtrailJsonPageSourceFactory; -import io.trino.plugin.hive.line.CloudtrailOpenXJsonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -124,7 +123,6 @@ public void configure(Binder binder) pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(AvroPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(CloudtrailJsonPageSourceFactory.class).in(Scopes.SINGLETON); - pageSourceFactoryBinder.addBinding().to(CloudtrailOpenXJsonPageSourceFactory.class).in(Scopes.SINGLETON); Multibinder fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class); binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java index 5e235be5de65..aa0ce1b86a61 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java @@ -15,7 +15,7 @@ import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.hive.formats.line.cloudtrail.CloudTrailFileReaderFactory; +import io.trino.hive.formats.line.cloudtrail.CloudTrailLineReaderFactory; import io.trino.hive.formats.line.json.JsonDeserializerFactory; import io.trino.plugin.hive.HiveConfig; @@ -29,6 +29,6 @@ public CloudtrailJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFac { super(trinoFileSystemFactory, new JsonDeserializerFactory(), - new CloudTrailFileReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes()))); + new CloudTrailLineReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes()))); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java deleted file mode 100644 index 23b14ec8037c..000000000000 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailOpenXJsonPageSourceFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.hive.line; - -import com.google.inject.Inject; -import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.hive.formats.line.cloudtrail.CloudTrailFileReaderFactory; -import io.trino.hive.formats.line.openxjson.OpenXJsonDeserializerFactory; -import io.trino.plugin.hive.HiveConfig; - -import static java.lang.Math.toIntExact; - -public class CloudtrailOpenXJsonPageSourceFactory - extends LinePageSourceFactory -{ - @Inject - public CloudtrailOpenXJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config) - { - super(trinoFileSystemFactory, - new OpenXJsonDeserializerFactory(), - new CloudTrailFileReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes()))); - } -} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java index 6660e26a02e9..03ca7472ba8f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java @@ -191,7 +191,6 @@ public static Set getDefaultHivePageSourceFactories(Trino .add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig)) .add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig)) .add(new CloudtrailJsonPageSourceFactory(fileSystemFactory, hiveConfig)) - .add(new CloudtrailJsonPageSourceFactory(fileSystemFactory, hiveConfig)) .build(); } From a5776d02066f8598c5ac81cf444d1700cd4995c4 Mon Sep 17 00:00:00 2001 From: zhaner08 Date: Wed, 5 Feb 2025 21:25:28 -0500 Subject: [PATCH 3/5] Update class name --- .../java/io/trino/plugin/hive/HiveModule.java | 16 +++++++++++++--- ...java => CloudTrailJsonPageSourceFactory.java} | 4 ++-- .../java/io/trino/plugin/hive/HiveTestUtils.java | 4 ++-- 3 files changed, 17 insertions(+), 7 deletions(-) rename plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/{CloudtrailJsonPageSourceFactory.java => CloudTrailJsonPageSourceFactory.java} (91%) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 3c057646f859..ebb2126433fb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive; +import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; @@ -26,7 +27,7 @@ import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory; -import io.trino.plugin.hive.line.CloudtrailJsonPageSourceFactory; +import io.trino.plugin.hive.line.CloudTrailJsonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -55,6 +56,8 @@ import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.function.FunctionProvider; +import io.trino.spi.function.table.ConnectorTableFunction; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -84,6 +87,8 @@ public void configure(Binder binder) binder.bind(HiveViewProperties.class).in(Scopes.SINGLETON); binder.bind(HiveColumnProperties.class).in(Scopes.SINGLETON); binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, HiveMaterializedViewPropertiesProvider.class) + .setDefault().toInstance(ImmutableList::of); binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON); newExporter(binder).export(CachingDirectoryLister.class).withGeneratedName(); @@ -96,13 +101,15 @@ public void configure(Binder binder) Multibinder systemTableProviders = newSetBinder(binder, SystemTableProvider.class); systemTableProviders.addBinding().to(PartitionsSystemTableProvider.class).in(Scopes.SINGLETON); systemTableProviders.addBinding().to(PropertiesSystemTableProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, HiveRedirectionsProvider.class) + .setDefault().to(NoneHiveRedirectionsProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, TransactionalMetadataFactory.class) .setDefault().to(HiveMetadataFactory.class).in(Scopes.SINGLETON); binder.bind(TransactionScopeCachingDirectoryListerFactory.class).in(Scopes.SINGLETON); binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON); newExporter(binder).export(ConnectorSplitManager.class).as(generator -> generator.generatedNameOf(HiveSplitManager.class)); - binder.bind(ConnectorPageSourceProvider.class).to(HivePageSourceProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, ConnectorPageSourceProvider.class).setDefault().to(HivePageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON); @@ -122,7 +129,7 @@ public void configure(Binder binder) pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(AvroPageSourceFactory.class).in(Scopes.SINGLETON); - pageSourceFactoryBinder.addBinding().to(CloudtrailJsonPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(CloudTrailJsonPageSourceFactory.class).in(Scopes.SINGLETON); Multibinder fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class); binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON); @@ -143,6 +150,9 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ParquetWriterConfig.class); fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, FunctionProvider.class).setDefault().toInstance(new NoopFunctionProvider()); + newSetBinder(binder, ConnectorTableFunction.class); + closingBinder(binder).registerExecutor(ExecutorService.class); closingBinder(binder).registerExecutor(Key.get(ScheduledExecutorService.class, ForHiveTransactionHeartbeats.class)); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudTrailJsonPageSourceFactory.java similarity index 91% rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudTrailJsonPageSourceFactory.java index aa0ce1b86a61..f50cdee425f8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudtrailJsonPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudTrailJsonPageSourceFactory.java @@ -21,11 +21,11 @@ import static java.lang.Math.toIntExact; -public class CloudtrailJsonPageSourceFactory +public class CloudTrailJsonPageSourceFactory extends LinePageSourceFactory { @Inject - public CloudtrailJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config) + public CloudTrailJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config) { super(trinoFileSystemFactory, new JsonDeserializerFactory(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java index 03ca7472ba8f..9e8d0b9fbb11 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java @@ -36,7 +36,7 @@ import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.hive.avro.AvroFileWriterFactory; import io.trino.plugin.hive.avro.AvroPageSourceFactory; -import io.trino.plugin.hive.line.CloudtrailJsonPageSourceFactory; +import io.trino.plugin.hive.line.CloudTrailJsonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -190,7 +190,7 @@ public static Set getDefaultHivePageSourceFactories(Trino .add(new RcFilePageSourceFactory(fileSystemFactory, hiveConfig)) .add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig)) .add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig)) - .add(new CloudtrailJsonPageSourceFactory(fileSystemFactory, hiveConfig)) + .add(new CloudTrailJsonPageSourceFactory(fileSystemFactory, hiveConfig)) .build(); } From 12867a1df1a3683b0466a1f82ab417fef4396a1c Mon Sep 17 00:00:00 2001 From: zhaner08 Date: Wed, 5 Feb 2025 21:31:25 -0500 Subject: [PATCH 4/5] Remove non related changes --- .../main/java/io/trino/plugin/hive/HiveModule.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index ebb2126433fb..0ad678b7861f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hive; -import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; @@ -56,8 +55,6 @@ import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSplitManager; -import io.trino.spi.function.FunctionProvider; -import io.trino.spi.function.table.ConnectorTableFunction; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -87,8 +84,6 @@ public void configure(Binder binder) binder.bind(HiveViewProperties.class).in(Scopes.SINGLETON); binder.bind(HiveColumnProperties.class).in(Scopes.SINGLETON); binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON); - newOptionalBinder(binder, HiveMaterializedViewPropertiesProvider.class) - .setDefault().toInstance(ImmutableList::of); binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON); newExporter(binder).export(CachingDirectoryLister.class).withGeneratedName(); @@ -101,15 +96,13 @@ public void configure(Binder binder) Multibinder systemTableProviders = newSetBinder(binder, SystemTableProvider.class); systemTableProviders.addBinding().to(PartitionsSystemTableProvider.class).in(Scopes.SINGLETON); systemTableProviders.addBinding().to(PropertiesSystemTableProvider.class).in(Scopes.SINGLETON); - newOptionalBinder(binder, HiveRedirectionsProvider.class) - .setDefault().to(NoneHiveRedirectionsProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, TransactionalMetadataFactory.class) .setDefault().to(HiveMetadataFactory.class).in(Scopes.SINGLETON); binder.bind(TransactionScopeCachingDirectoryListerFactory.class).in(Scopes.SINGLETON); binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON); newExporter(binder).export(ConnectorSplitManager.class).as(generator -> generator.generatedNameOf(HiveSplitManager.class)); - newOptionalBinder(binder, ConnectorPageSourceProvider.class).setDefault().to(HivePageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(ConnectorPageSourceProvider.class).to(HivePageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON); @@ -150,9 +143,6 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ParquetWriterConfig.class); fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON); - newOptionalBinder(binder, FunctionProvider.class).setDefault().toInstance(new NoopFunctionProvider()); - newSetBinder(binder, ConnectorTableFunction.class); - closingBinder(binder).registerExecutor(ExecutorService.class); closingBinder(binder).registerExecutor(Key.get(ScheduledExecutorService.class, ForHiveTransactionHeartbeats.class)); } From 088823592e430c9d6842c264b5517613c8f65643 Mon Sep 17 00:00:00 2001 From: zhaner08 Date: Thu, 6 Feb 2025 22:15:24 -0500 Subject: [PATCH 5/5] Fix unit tests --- .../main/java/io/trino/plugin/hive/HiveStorageFormat.java | 7 ++++++- .../java/io/trino/plugin/hive/BaseHiveConnectorTest.java | 6 ++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java index b9036479cff0..6c897f38a91d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java @@ -32,6 +32,7 @@ import static io.trino.hive.formats.HiveClassNames.AVRO_CONTAINER_INPUT_FORMAT_CLASS; import static io.trino.hive.formats.HiveClassNames.AVRO_CONTAINER_OUTPUT_FORMAT_CLASS; import static io.trino.hive.formats.HiveClassNames.AVRO_SERDE_CLASS; +import static io.trino.hive.formats.HiveClassNames.CLOUDTRAIL_INPUT_FORMAT; import static io.trino.hive.formats.HiveClassNames.COLUMNAR_SERDE_CLASS; import static io.trino.hive.formats.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS; import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS; @@ -101,6 +102,10 @@ public enum HiveStorageFormat REGEX( REGEX_SERDE_CLASS, TEXT_INPUT_FORMAT_CLASS, + HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS), + CLOUDTRAIL( + JSON_SERDE_CLASS, + CLOUDTRAIL_INPUT_FORMAT, HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS); private final String serde; @@ -134,7 +139,7 @@ public boolean isSplittable(String path) // Only uncompressed text input format is splittable return switch (this) { case ORC, PARQUET, AVRO, RCBINARY, RCTEXT, SEQUENCEFILE -> true; - case JSON, OPENX_JSON, TEXTFILE, CSV, REGEX -> CompressionKind.forFile(path).isEmpty(); + case JSON, OPENX_JSON, TEXTFILE, CSV, REGEX, CLOUDTRAIL -> CompressionKind.forFile(path).isEmpty(); }; } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 3b53186f5c20..c923078293c4 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -142,6 +142,7 @@ import static io.trino.plugin.hive.HiveQueryRunner.HIVE_CATALOG; import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA; import static io.trino.plugin.hive.HiveQueryRunner.createBucketedSession; +import static io.trino.plugin.hive.HiveStorageFormat.CLOUDTRAIL; import static io.trino.plugin.hive.HiveStorageFormat.ORC; import static io.trino.plugin.hive.HiveStorageFormat.PARQUET; import static io.trino.plugin.hive.HiveStorageFormat.REGEX; @@ -5702,6 +5703,7 @@ private boolean isMappingByName(HiveStorageFormat format) case TEXTFILE -> false; case CSV -> false; case REGEX -> false; + case CLOUDTRAIL -> false; }; } @@ -9502,8 +9504,8 @@ private List getAllTestingHiveStorageFormat() // CSV supports only unbounded VARCHAR type continue; } - if (hiveStorageFormat == REGEX) { - // REGEX format is read-only + if (hiveStorageFormat == REGEX || hiveStorageFormat == CLOUDTRAIL) { + // REGEX and CLOUDTRAIL format is read-only continue; }