-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding native support of AWS Cloudtrail input format #24479
Open
zhaner08
wants to merge
6
commits into
trinodb:master
Choose a base branch
from
zhaner08:support_aws_cloudtrail_input_format
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+358
−3
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
c5c53d4
Initial implementation for supporting AWS Cloudtrail input format
zhaner08 c4c910a
Merge branch 'trinodb:master' into support_aws_cloudtrail_input_format
zhaner08 2215e8a
Cleanup and adding tests for the format
zhaner08 a5776d0
Update class name
zhaner08 12867a1
Remove non related changes
zhaner08 0888235
Fix unit tests
zhaner08 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
158 changes: 158 additions & 0 deletions
158
...ive-formats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.hive.formats.line.cloudtrail; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import com.google.common.collect.ImmutableSet; | ||
import io.airlift.log.Logger; | ||
import io.trino.hive.formats.line.LineBuffer; | ||
import io.trino.hive.formats.line.LineReader; | ||
|
||
import java.io.IOException; | ||
import java.util.Iterator; | ||
import java.util.Set; | ||
import java.util.function.Supplier; | ||
|
||
import static java.nio.charset.StandardCharsets.UTF_8; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
/* | ||
Cloudtrail reader should be used together with a JSON serde | ||
Records from cloudtrail are json objects wrapped in an object with key named "Records" | ||
This reader reads the line, and extract and serve the record through linebuffer one at a time | ||
*/ | ||
public class CloudTrailLineReader | ||
implements LineReader | ||
{ | ||
private static final Logger log = Logger.get(CloudTrailLineReader.class); | ||
private static final String DOCUMENT_LIST_KEY = "Records"; | ||
private static final String AS_STRING_SUFFIX = "_as_string"; | ||
// Nested fields that are returned as String regardless of their nested schemas | ||
private static final Set<String> NESTED_FIELDS_WITHOUT_SCHEMA = ImmutableSet.of( | ||
"requestParameters", | ||
"responseElements", | ||
"additionalEventData", | ||
"serviceEventDetails"); | ||
// Fields that we will append an additional String field with a suffix | ||
private static final Set<String> AS_STRING_FIELDS = ImmutableSet.of( | ||
"userIdentity"); | ||
private static final ObjectMapper MAPPER = new ObjectMapper(); | ||
|
||
private final LineReader delegate; | ||
private Iterator<JsonNode> records; | ||
private LineBuffer fileLineBuffer; | ||
|
||
public CloudTrailLineReader(LineReader delegate, Supplier<LineBuffer> lineBufferSupplier) | ||
throws IOException | ||
{ | ||
this.delegate = requireNonNull(delegate, "delegate is null"); | ||
fileLineBuffer = lineBufferSupplier.get(); | ||
} | ||
|
||
@Override | ||
public boolean isClosed() | ||
{ | ||
return delegate.isClosed(); | ||
} | ||
|
||
@Override | ||
public long getRetainedSize() | ||
{ | ||
return delegate.getRetainedSize(); | ||
} | ||
|
||
@Override | ||
public long getBytesRead() | ||
{ | ||
return delegate.getBytesRead(); | ||
} | ||
|
||
@Override | ||
public long getReadTimeNanos() | ||
{ | ||
return delegate.getReadTimeNanos(); | ||
} | ||
|
||
@Override | ||
public boolean readLine(LineBuffer lineBuffer) | ||
throws IOException | ||
{ | ||
lineBuffer.reset(); | ||
|
||
if (records != null && records.hasNext()) { | ||
lineBuffer.write(transformJsonNode(records.next())); | ||
return true; | ||
} | ||
|
||
while (delegate.readLine(fileLineBuffer)) { | ||
try { | ||
JsonNode node = MAPPER.readValue(fileLineBuffer.getBuffer(), JsonNode.class); | ||
records = node.get(DOCUMENT_LIST_KEY).elements(); | ||
} | ||
catch (Exception e) { | ||
// We do not throw error in order to fully read cloudtrail records and ignored backward incompatible records | ||
log.error(e, "Encountered an exception while parsing CloudTrail records"); | ||
continue; | ||
} | ||
|
||
if (records.hasNext()) { | ||
lineBuffer.write(transformJsonNode(records.next())); | ||
return true; | ||
} | ||
} | ||
|
||
// If there is no record left and not line left from the split, cleanup the resources | ||
close(); | ||
return false; | ||
} | ||
|
||
@Override | ||
public void close() | ||
throws IOException | ||
{ | ||
fileLineBuffer = null; | ||
delegate.close(); | ||
} | ||
|
||
private byte[] transformJsonNode(JsonNode node) | ||
{ | ||
ObjectNode objectNode = (ObjectNode) node; | ||
for (String field : NESTED_FIELDS_WITHOUT_SCHEMA) { | ||
if (node.has(field)) { | ||
JsonNode fieldNode = node.get(field); | ||
if (fieldNode.isNull()) { | ||
objectNode.remove(field); | ||
} | ||
else { | ||
objectNode.put(field, fieldNode.toString()); | ||
} | ||
} | ||
} | ||
|
||
for (String field : AS_STRING_FIELDS) { | ||
if (node.has(field)) { | ||
JsonNode fieldNode = node.get(field); | ||
if (fieldNode.isNull()) { | ||
objectNode.remove(field); | ||
} | ||
else { | ||
objectNode.put(field + AS_STRING_SUFFIX, fieldNode.toString()); | ||
} | ||
} | ||
} | ||
return objectNode.toString().getBytes(UTF_8); | ||
} | ||
} |
52 changes: 52 additions & 0 deletions
52
...mats/src/main/java/io/trino/hive/formats/line/cloudtrail/CloudTrailLineReaderFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.hive.formats.line.cloudtrail; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
import io.trino.filesystem.TrinoInputFile; | ||
import io.trino.hive.formats.line.LineReader; | ||
import io.trino.hive.formats.line.text.TextLineReaderFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.Set; | ||
|
||
import static io.trino.hive.formats.HiveClassNames.CLOUDTRAIL_INPUT_FORMAT; | ||
|
||
public class CloudTrailLineReaderFactory | ||
extends TextLineReaderFactory | ||
{ | ||
public CloudTrailLineReaderFactory(int fileBufferSize, int initialLineBufferSize, int maxLineLength) | ||
{ | ||
super(fileBufferSize, initialLineBufferSize, maxLineLength); | ||
} | ||
|
||
@Override | ||
public Set<String> getHiveInputFormatClassNames() | ||
{ | ||
return ImmutableSet.of(CLOUDTRAIL_INPUT_FORMAT); | ||
} | ||
|
||
@Override | ||
public LineReader createLineReader( | ||
TrinoInputFile inputFile, | ||
long start, | ||
long length, | ||
int headerCount, | ||
int footerCount) | ||
throws IOException | ||
{ | ||
LineReader delegate = super.createLineReader(inputFile, start, length, headerCount, footerCount); | ||
return new CloudTrailLineReader(delegate, this::createLineBuffer); | ||
} | ||
} |
99 changes: 99 additions & 0 deletions
99
...formats/src/test/java/io/trino/hive/formats/line/cloudtrail/TestCloudTrailLineReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.hive.formats.line.cloudtrail; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.trino.filesystem.Location; | ||
import io.trino.filesystem.TrinoInputFile; | ||
import io.trino.filesystem.memory.MemoryInputFile; | ||
import io.trino.hive.formats.line.LineBuffer; | ||
import io.trino.hive.formats.line.LineReader; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
|
||
import static io.airlift.slice.Slices.utf8Slice; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public class TestCloudTrailLineReader | ||
{ | ||
@Test | ||
public void testBasicRowParsing() | ||
throws IOException | ||
{ | ||
String data = "{\"Records\":[" + | ||
// try different spacing, should all work | ||
"{ \"data\":\"1\" }," + | ||
"{\"data\":\"2\" }," + | ||
"{\"data\": \"3\"}" + | ||
"]}"; | ||
|
||
testAllValues(data, ImmutableList.of("{\"data\":\"1\"}", "{\"data\":\"2\"}", "{\"data\":\"3\"}")); | ||
} | ||
|
||
@Test | ||
public void testSkipMalformedDocument() | ||
throws IOException | ||
{ | ||
String data = "{randomcontent}"; | ||
|
||
testAllValues(data, ImmutableList.of()); | ||
} | ||
|
||
@Test | ||
public void testCloudTrailTransform() | ||
throws IOException | ||
{ | ||
String data = "{\"Records\":[" + | ||
// try different spacing, should all work | ||
"{ \"requestParameters\": {\"data\": \"1\"} }," + | ||
"{ \"responseElements\": {\"data\": \"2\"} }," + | ||
"{ \"additionalEventData\": {\"data\": \"3\"} }," + | ||
"{ \"requestParameters\": {\"data\": \"4\"}, \"additionalEventData\": {\"data\": \"5\"} }" + | ||
"]}"; | ||
|
||
testAllValues(data, ImmutableList.of("{\"requestParameters\":\"{\\\"data\\\":\\\"1\\\"}\"}", | ||
"{\"responseElements\":\"{\\\"data\\\":\\\"2\\\"}\"}", | ||
"{\"additionalEventData\":\"{\\\"data\\\":\\\"3\\\"}\"}", | ||
"{\"requestParameters\":\"{\\\"data\\\":\\\"4\\\"}\",\"additionalEventData\":\"{\\\"data\\\":\\\"5\\\"}\"}")); | ||
} | ||
|
||
@Test | ||
public void testCloudTrailAsStringColumn() | ||
throws IOException | ||
{ | ||
String data = "{\"Records\":[" + | ||
// try different spacing, should all work | ||
"{ \"userIdentity\": {\"data\": \"1\"} }" + | ||
"]}"; | ||
|
||
testAllValues(data, ImmutableList.of("{\"userIdentity\":{\"data\":\"1\"},\"userIdentity_as_string\":\"{\\\"data\\\":\\\"1\\\"}\"}")); | ||
} | ||
|
||
private static void testAllValues(String data, List<String> expectedValues) | ||
throws IOException | ||
{ | ||
CloudTrailLineReaderFactory readerFactory = new CloudTrailLineReaderFactory(1024, 1024, 8096); | ||
TrinoInputFile file = new MemoryInputFile(Location.of("memory:///test"), utf8Slice(data)); | ||
LineReader lineReader = readerFactory.createLineReader(file, 0, 1, 0, 0); | ||
LineBuffer lineBuffer = new LineBuffer(1, 1024); | ||
for (String expectedValue : expectedValues) { | ||
lineReader.readLine(lineBuffer); | ||
assertThat(new String(lineBuffer.getBuffer(), 0, lineBuffer.getLength(), StandardCharsets.UTF_8)).isEqualTo(expectedValue); | ||
} | ||
assertThat(lineReader.readLine(lineBuffer)).isFalse(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
...n/trino-hive/src/main/java/io/trino/plugin/hive/line/CloudTrailJsonPageSourceFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.plugin.hive.line; | ||
|
||
import com.google.inject.Inject; | ||
import io.trino.filesystem.TrinoFileSystemFactory; | ||
import io.trino.hive.formats.line.cloudtrail.CloudTrailLineReaderFactory; | ||
import io.trino.hive.formats.line.json.JsonDeserializerFactory; | ||
import io.trino.plugin.hive.HiveConfig; | ||
|
||
import static java.lang.Math.toIntExact; | ||
|
||
public class CloudTrailJsonPageSourceFactory | ||
extends LinePageSourceFactory | ||
{ | ||
@Inject | ||
public CloudTrailJsonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig config) | ||
{ | ||
super(trinoFileSystemFactory, | ||
new JsonDeserializerFactory(), | ||
new CloudTrailLineReaderFactory(1024, 1024, toIntExact(config.getTextMaxLineLength().toBytes()))); | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's document it in
hive.md
so that the Trino users get to know about it.