Add native support for the AWS CloudTrail input format #24479

Open · wants to merge 6 commits into master
@@ -18,6 +18,7 @@ public final class HiveClassNames
public static final String AVRO_CONTAINER_INPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat";
public static final String AVRO_CONTAINER_OUTPUT_FORMAT_CLASS = "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat";
public static final String AVRO_SERDE_CLASS = "org.apache.hadoop.hive.serde2.avro.AvroSerDe";
public static final String CLOUDTRAIL_INPUT_FORMAT = "com.amazon.emr.cloudtrail.CloudTrailInputFormat";
public static final String COLUMNAR_SERDE_CLASS = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
public static final String FILE_INPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.FileInputFormat";
public static final String FILE_OUTPUT_FORMAT_CLASS = "org.apache.hadoop.mapred.FileOutputFormat";
@@ -0,0 +1,158 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hive.formats.line.cloudtrail;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.trino.hive.formats.line.LineBuffer;
import io.trino.hive.formats.line.LineReader;

import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

/*
The CloudTrail reader should be used together with a JSON serde.
Records from CloudTrail are JSON objects wrapped in an outer object under a key named "Records".
This reader reads each line, then extracts and serves the records through the line buffer one at a time.
*/
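// Illustrative sketch (assumed example, not taken from this PR): a raw CloudTrail file line
// looks roughly like
//   {"Records":[{"eventVersion":"1.08","userIdentity":{...},"requestParameters":{...}}, ...]}
// and each call to readLine() then serves one inner record from that array.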
public class CloudTrailLineReader
implements LineReader
{
private static final Logger log = Logger.get(CloudTrailLineReader.class);
private static final String DOCUMENT_LIST_KEY = "Records";
private static final String AS_STRING_SUFFIX = "_as_string";
// Nested fields that are returned as String regardless of their nested schemas
private static final Set<String> NESTED_FIELDS_WITHOUT_SCHEMA = ImmutableSet.of(
"requestParameters",
"responseElements",
"additionalEventData",
"serviceEventDetails");
// Fields for which we append an additional String field carrying the AS_STRING_SUFFIX suffix
private static final Set<String> AS_STRING_FIELDS = ImmutableSet.of(
"userIdentity");
private static final ObjectMapper MAPPER = new ObjectMapper();
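
// Illustrative example of the field handling above (assumed record shape, not taken from this PR):
//   {"requestParameters": {"key": "value"}, "userIdentity": {"type": "IAMUser"}}
// is rewritten by this reader's transformJsonNode into
//   {"requestParameters": "{\"key\":\"value\"}",
//    "userIdentity": {"type": "IAMUser"},
//    "userIdentity_as_string": "{\"type\":\"IAMUser\"}"}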

private final LineReader delegate;
private Iterator<JsonNode> records;
private LineBuffer fileLineBuffer;

public CloudTrailLineReader(LineReader delegate, Supplier<LineBuffer> lineBufferSupplier)
throws IOException
{
this.delegate = requireNonNull(delegate, "delegate is null");
fileLineBuffer = lineBufferSupplier.get();
}

@Override
public boolean isClosed()
{
return delegate.isClosed();
}

@Override
public long getRetainedSize()
{
return delegate.getRetainedSize();
}

@Override
public long getBytesRead()
{
return delegate.getBytesRead();
}

@Override
public long getReadTimeNanos()
{
return delegate.getReadTimeNanos();
}

@Override
public boolean readLine(LineBuffer lineBuffer)
throws IOException
{
lineBuffer.reset();

if (records != null && records.hasNext()) {
lineBuffer.write(transformJsonNode(records.next()));
return true;
}

while (delegate.readLine(fileLineBuffer)) {
try {
JsonNode node = MAPPER.readValue(fileLineBuffer.getBuffer(), JsonNode.class);
records = node.get(DOCUMENT_LIST_KEY).elements();
}
catch (Exception e) {
// We do not throw here, so that we can fully read CloudTrail records and ignore backward-incompatible records
log.error(e, "Encountered an exception while parsing CloudTrail records");
continue;
}

if (records.hasNext()) {
lineBuffer.write(transformJsonNode(records.next()));
return true;
}
}

// If there are no records left and no lines left in the split, clean up the resources
close();
return false;
}

@Override
public void close()
throws IOException
{
fileLineBuffer = null;
delegate.close();
}

private byte[] transformJsonNode(JsonNode node)
{
ObjectNode objectNode = (ObjectNode) node;
for (String field : NESTED_FIELDS_WITHOUT_SCHEMA) {
if (node.has(field)) {
JsonNode fieldNode = node.get(field);
if (fieldNode.isNull()) {
objectNode.remove(field);
}
else {
objectNode.put(field, fieldNode.toString());
}
}
}

for (String field : AS_STRING_FIELDS) {
if (node.has(field)) {
JsonNode fieldNode = node.get(field);
if (fieldNode.isNull()) {
objectNode.remove(field);
}
else {
objectNode.put(field + AS_STRING_SUFFIX, fieldNode.toString());
}
}
}
return objectNode.toString().getBytes(UTF_8);
}
}
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hive.formats.line.cloudtrail;

import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.TrinoInputFile;
import io.trino.hive.formats.line.LineReader;
import io.trino.hive.formats.line.text.TextLineReaderFactory;

import java.io.IOException;
import java.util.Set;

import static io.trino.hive.formats.HiveClassNames.CLOUDTRAIL_INPUT_FORMAT;

public class CloudTrailLineReaderFactory
extends TextLineReaderFactory
{
public CloudTrailLineReaderFactory(int fileBufferSize, int initialLineBufferSize, int maxLineLength)
{
super(fileBufferSize, initialLineBufferSize, maxLineLength);
}

@Override
public Set<String> getHiveInputFormatClassNames()
{
return ImmutableSet.of(CLOUDTRAIL_INPUT_FORMAT);
}

@Override
public LineReader createLineReader(
TrinoInputFile inputFile,
long start,
long length,
int headerCount,
int footerCount)
throws IOException
{
LineReader delegate = super.createLineReader(inputFile, start, length, headerCount, footerCount);
return new CloudTrailLineReader(delegate, this::createLineBuffer);
}
}
@@ -0,0 +1,99 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hive.formats.line.cloudtrail;

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.memory.MemoryInputFile;
import io.trino.hive.formats.line.LineBuffer;
import io.trino.hive.formats.line.LineReader;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import static io.airlift.slice.Slices.utf8Slice;
import static org.assertj.core.api.Assertions.assertThat;

public class TestCloudTrailLineReader
{
@Test
public void testBasicRowParsing()
throws IOException
{
String data = "{\"Records\":[" +
// try different spacing, should all work
"{ \"data\":\"1\" }," +
"{\"data\":\"2\" }," +
"{\"data\": \"3\"}" +
"]}";

testAllValues(data, ImmutableList.of("{\"data\":\"1\"}", "{\"data\":\"2\"}", "{\"data\":\"3\"}"));
}

@Test
public void testSkipMalformedDocument()
throws IOException
{
String data = "{randomcontent}";

testAllValues(data, ImmutableList.of());
}

@Test
public void testCloudTrailTransform()
throws IOException
{
String data = "{\"Records\":[" +
// try different spacing, should all work
"{ \"requestParameters\": {\"data\": \"1\"} }," +
"{ \"responseElements\": {\"data\": \"2\"} }," +
"{ \"additionalEventData\": {\"data\": \"3\"} }," +
"{ \"requestParameters\": {\"data\": \"4\"}, \"additionalEventData\": {\"data\": \"5\"} }" +
"]}";

testAllValues(data, ImmutableList.of("{\"requestParameters\":\"{\\\"data\\\":\\\"1\\\"}\"}",
"{\"responseElements\":\"{\\\"data\\\":\\\"2\\\"}\"}",
"{\"additionalEventData\":\"{\\\"data\\\":\\\"3\\\"}\"}",
"{\"requestParameters\":\"{\\\"data\\\":\\\"4\\\"}\",\"additionalEventData\":\"{\\\"data\\\":\\\"5\\\"}\"}"));
}

@Test
public void testCloudTrailAsStringColumn()
throws IOException
{
String data = "{\"Records\":[" +
// try different spacing, should all work
"{ \"userIdentity\": {\"data\": \"1\"} }" +
"]}";

testAllValues(data, ImmutableList.of("{\"userIdentity\":{\"data\":\"1\"},\"userIdentity_as_string\":\"{\\\"data\\\":\\\"1\\\"}\"}"));
}

private static void testAllValues(String data, List<String> expectedValues)
throws IOException
{
CloudTrailLineReaderFactory readerFactory = new CloudTrailLineReaderFactory(1024, 1024, 8096);
TrinoInputFile file = new MemoryInputFile(Location.of("memory:///test"), utf8Slice(data));
LineReader lineReader = readerFactory.createLineReader(file, 0, 1, 0, 0);
LineBuffer lineBuffer = new LineBuffer(1, 1024);
for (String expectedValue : expectedValues) {
lineReader.readLine(lineBuffer);
assertThat(new String(lineBuffer.getBuffer(), 0, lineBuffer.getLength(), StandardCharsets.UTF_8)).isEqualTo(expectedValue);
}
assertThat(lineReader.readLine(lineBuffer)).isFalse();
}
}
@@ -26,6 +26,7 @@
import io.trino.plugin.hive.fs.CachingDirectoryLister;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
import io.trino.plugin.hive.line.CloudTrailJsonPageSourceFactory;
import io.trino.plugin.hive.line.CsvFileWriterFactory;
import io.trino.plugin.hive.line.CsvPageSourceFactory;
import io.trino.plugin.hive.line.JsonFileWriterFactory;
@@ -121,6 +122,7 @@ public void configure(Binder binder)
pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(AvroPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(CloudTrailJsonPageSourceFactory.class).in(Scopes.SINGLETON);

Multibinder<HiveFileWriterFactory> fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class);
binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
@@ -32,6 +32,7 @@
import static io.trino.hive.formats.HiveClassNames.AVRO_CONTAINER_INPUT_FORMAT_CLASS;
import static io.trino.hive.formats.HiveClassNames.AVRO_CONTAINER_OUTPUT_FORMAT_CLASS;
import static io.trino.hive.formats.HiveClassNames.AVRO_SERDE_CLASS;
import static io.trino.hive.formats.HiveClassNames.CLOUDTRAIL_INPUT_FORMAT;
import static io.trino.hive.formats.HiveClassNames.COLUMNAR_SERDE_CLASS;
import static io.trino.hive.formats.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS;
import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS;
@@ -101,6 +102,10 @@ public enum HiveStorageFormat
REGEX(
REGEX_SERDE_CLASS,
TEXT_INPUT_FORMAT_CLASS,
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS),
CLOUDTRAIL(
JSON_SERDE_CLASS,
CLOUDTRAIL_INPUT_FORMAT,
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS);

Contributor: Let's document it in hive.md so that Trino users get to know about it.

private final String serde;
@@ -134,7 +139,7 @@ public boolean isSplittable(String path)
// Only uncompressed text input format is splittable
return switch (this) {
case ORC, PARQUET, AVRO, RCBINARY, RCTEXT, SEQUENCEFILE -> true;
case JSON, OPENX_JSON, TEXTFILE, CSV, REGEX -> CompressionKind.forFile(path).isEmpty();
case JSON, OPENX_JSON, TEXTFILE, CSV, REGEX, CLOUDTRAIL -> CompressionKind.forFile(path).isEmpty();
};
}

@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.line;

import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hive.formats.line.cloudtrail.CloudTrailLineReaderFactory;
import io.trino.hive.formats.line.json.JsonDeserializerFactory;
import io.trino.plugin.hive.HiveConfig;

import static java.lang.Math.toIntExact;

public class CloudTrailJsonPageSourceFactory
extends LinePageSourceFactory
{
@Inject
public CloudTrailJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config)
{
super(trinoFileSystemFactory,
new JsonDeserializerFactory(),
new CloudTrailLineReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes())));
}
}