Skip to content

Commit

Permalink
[INLONG-11227][SDK] Add Parquet formatted data sink for Transform
Browse files Browse the repository at this point in the history
  • Loading branch information
ZKpLo committed Sep 29, 2024
1 parent e0d7f8d commit cad0eff
Show file tree
Hide file tree
Showing 10 changed files with 749 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public final class ParquetByteArrayWriter<T> implements Closeable {

private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
private final ParquetOutputByteArray outputByteArray;

public static <T> ParquetByteArrayWriter<T> buildWriter(MessageType schema, ParquetWriteRunner<T> writeRunner)
throws IOException {
return new ParquetByteArrayWriter<>(new ParquetOutputByteArray(), schema, writeRunner);
}

private ParquetByteArrayWriter(ParquetOutputByteArray outputFile, MessageType schema,
ParquetWriteRunner<T> writeRunner)
throws IOException {
this.writer = new Builder<T>(outputFile)
.withType(schema)
.withWriteRunner(writeRunner)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.build();
outputByteArray = outputFile;
}

@Override
public void close() throws IOException {
this.writer.close();
}

public void write(T record) throws IOException {
this.writer.write(record);
}

public ByteArrayOutputStream getByteArrayOutputStream() {
return outputByteArray.getByteArrayOutputStream();
}

private static final class Builder<T>
extends
org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetByteArrayWriter.Builder<T>> {

private MessageType schema;
private ParquetWriteRunner<T> writeRunner;

private Builder(OutputFile file) {
super(file);
}

public Builder<T> withType(MessageType schema) {
this.schema = schema;
return this;
}

public Builder<T> withWriteRunner(ParquetWriteRunner<T> writeRunner) {
this.writeRunner = writeRunner;
return this;
}

@Override
protected Builder<T> self() {
return this;
}

@Override
protected WriteSupport<T> getWriteSupport(Configuration conf) {
return new ParquetByteArrayWriter.SimpleWriteSupport<>(schema, writeRunner);
}
}

private static class SimpleWriteSupport<T> extends WriteSupport<T> {

private final MessageType schema;
private final ParquetWriteRunner<T> writeRunner;
private final ParquetValueWriter valueWriter;

private RecordConsumer recordConsumer;

SimpleWriteSupport(MessageType schema, ParquetWriteRunner<T> writeRunner) {
this.schema = schema;
this.writeRunner = writeRunner;
this.valueWriter = this::write;
}

public void write(String name, Object value) {
int fieldIndex = schema.getFieldIndex(name);
PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
recordConsumer.startField(name, fieldIndex);

switch (type.getPrimitiveTypeName()) {
case INT32:
recordConsumer.addInteger((int) value);
break;
case INT64:
recordConsumer.addLong((long) value);
break;
case DOUBLE:
recordConsumer.addDouble((double) value);
break;
case BOOLEAN:
recordConsumer.addBoolean((boolean) value);
break;
case FLOAT:
recordConsumer.addFloat((float) value);
break;
case BINARY:
if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
recordConsumer.addBinary(Binary.fromString((String) value));
} else {
throw new UnsupportedOperationException(
"Don't support writing " + type.getLogicalTypeAnnotation());
}
break;
default:
throw new UnsupportedOperationException("Don't support writing " + type.getPrimitiveTypeName());
}
recordConsumer.endField(name, fieldIndex);
}

@Override
public WriteContext init(Configuration configuration) {
return new WriteContext(schema, Collections.emptyMap());
}

@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}

@Override
public void write(T record) {
recordConsumer.startMessage();
writeRunner.doWrite(record, valueWriter);
recordConsumer.endMessage();
}

@Override
public String getName() {
return null;
}
}

public static void main(String[] args) throws Exception {
File parquet = new File("test.parquet");

MessageType schema = new MessageType("test",
Types.required(INT64).named("id"),
Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("email"));

ParquetWriteRunner<Object[]> writeRunner = (record, valueWriter) -> {
valueWriter.write("id", record[0]);
valueWriter.write("email", record[1]);
};

byte[] bytes = null;
ParquetByteArrayWriter<Object[]> writer = ParquetByteArrayWriter.buildWriter(schema, writeRunner);

writer.write(new Object[]{1L, "hello1"});
writer.write(new Object[]{2L, "hello2"});
writer.close();
bytes = writer.getByteArrayOutputStream().toByteArray();

try (FileOutputStream fileOutputStream = new FileOutputStream(parquet)) {
fileOutputStream.write(bytes);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ParquetOutputByteArray implements OutputFile {

private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

public ByteArrayOutputStream getByteArrayOutputStream() {
return byteArrayOutputStream;
}

@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
return createOrOverwrite(blockSizeHint);
}

@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
return new DelegatingPositionOutputStream(byteArrayOutputStream) {

@Override
public long getPos() throws IOException {
return byteArrayOutputStream.size();
}
};
}

@Override
public boolean supportsBlockSize() {
return false;
}

@Override
public long defaultBlockSize() {
return 1024L;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
import org.apache.inlong.sdk.transform.process.Context;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

/**
* ParquetSinkEncoder
*/
public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {

protected ParquetSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();

private final List<FieldInfo> fields;
private ParquetByteArrayWriter<Object[]> writer;

public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
this.sinkInfo = sinkInfo;
this.fields = sinkInfo.getFields();
ArrayList<Type> typesList = new ArrayList<>();
for (FieldInfo fieldInfo : this.fields) {
typesList.add(Types.required(BINARY)
.as(LogicalTypeAnnotation.stringType())
.named(fieldInfo.getName()));
}
MessageType schema = new MessageType("Output", typesList);
ParquetWriteRunner<Object[]> writeRunner = (record, valueWriter) -> {
for (int i = 0; i < record.length; i++) {
valueWriter.write(this.fields.get(i).getName(), record[i]);
}
};
try {
writer = ParquetByteArrayWriter.buildWriter(schema, writeRunner);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
int size = this.fields.size();
Object[] rowsInfo = new Object[size];
Arrays.fill(rowsInfo, "");
for (int i = 0; i < size; i++) {
String fieldData = sinkData.getField(this.fields.get(i).getName());
if (fieldData == null) {
continue;
}
rowsInfo[i] = fieldData;
}
try {
writer.write(rowsInfo);
} catch (Exception ignored) {

}
return writer.getByteArrayOutputStream();
}

@Override
public List<FieldInfo> getFields() {
return this.fields;
}
public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
if (list.isEmpty()) {
return null;
}
try {
this.writer.close(); // need firstly close
} catch (IOException e) {
throw new RuntimeException(e);
}
return list.get(0).toByteArray();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.encode;

public interface ParquetValueWriter {

void write(String name, Object value);
}
Loading

0 comments on commit cad0eff

Please sign in to comment.