Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write protocol should be Apache Arrow #49

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
1,171 changes: 1,171 additions & 0 deletions data-source/src/main/scala/tech/ytsaurus/client/ArrowTableRowsSerializer.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tech.ytsaurus.client;

import tech.ytsaurus.client.request.Format;
import tech.ytsaurus.client.request.SerializationContext;
import tech.ytsaurus.rpcproxy.ERowsetFormat;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Serialization context that switches a table write to the Apache Arrow rowset format.
 * <p>
 * Carries an ordered list of (column name, getter) pairs; each getter extracts one
 * column's value from a row of type {@code T}. The Arrow serializer consumes these
 * getters to build record batches column by column.
 *
 * @param <T> row (struct) type
 * @param <L> list representation type
 * @param <D> dictionary (map) representation type
 * @param <G> concrete {@link YTGetters} binding; retained in the signature so call
 *            sites can name their specialization explicitly
 */
public class ArrowWriteSerializationContext<T, L, D, G extends YTGetters<T, L, D>> extends SerializationContext<T> {
    // NOTE: a nested type cannot be qualified by the type variable G ("G.FromStruct"
    // does not compile), so the getters are typed via the parameterized outer class.
    private final List<? extends Map.Entry<String, ? extends YTGetters<T, L, D>.FromStruct>> rowGetters;

    /**
     * @param rowGetters ordered column-name-to-getter pairs, one per output column
     */
    public ArrowWriteSerializationContext(
            List<? extends Map.Entry<String, ? extends YTGetters<T, L, D>.FromStruct>> rowGetters
    ) {
        this.rowsetFormat = ERowsetFormat.RF_FORMAT;
        this.format = new Format("arrow", new HashMap<>());
        this.rowGetters = rowGetters;
    }

    /** @return the column getters this context was constructed with, in column order */
    public List<? extends Map.Entry<String, ? extends YTGetters<T, L, D>.FromStruct>> getRowGetters() {
        return rowGetters;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package tech.ytsaurus.client;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;

/**
 * {@link YTGetters} specialization that binds the generic container types to Spark's
 * Catalyst runtime representations: rows are {@link InternalRow}, lists are
 * {@link ArrayData} and dictionaries are {@link MapData}. The class body is empty —
 * it exists so concrete getter instances (the abstract inner classes of
 * {@link YTGetters}) can be created against the Spark type bindings.
 */
public class InternalRowYTGetters extends YTGetters<InternalRow, ArrayData, MapData> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package tech.ytsaurus.client;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;

import tech.ytsaurus.client.request.WriteTable;
import tech.ytsaurus.client.rows.UnversionedRow;
import tech.ytsaurus.client.rows.UnversionedRowSerializer;
import tech.ytsaurus.client.rpc.Compression;
import tech.ytsaurus.client.rpc.RpcUtil;
import tech.ytsaurus.core.tables.TableSchema;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.rpcproxy.TWriteTableMeta;


/**
 * Base implementation of a table writer: negotiates the upload with the server,
 * picks a row serializer for the requested format, and streams serialized row
 * batches. For the "arrow" format the serializer is created lazily in
 * {@link #startUploadImpl()} (it needs the column getters from the serialization
 * context); for all other formats it is resolved eagerly in the constructor.
 */
@NonNullApi
class TableWriterBaseImpl<T> extends RawTableWriterImpl {
    /** Table schema received from the server; null until {@link #startUploadImpl()} completes. */
    protected @Nullable
    TableSchema schema;
    protected final WriteTable<T> req;
    /** Row serializer; may stay null until the upload handshake finishes (see class doc). */
    protected @Nullable
    TableRowsSerializer<T> tableRowsSerializer;
    private final SerializationResolver serializationResolver;
    @Nullable
    protected ApiServiceTransaction transaction;

    TableWriterBaseImpl(WriteTable<T> req, SerializationResolver serializationResolver) {
        super(req.getWindowSize(), req.getPacketSize());
        this.req = req;
        this.serializationResolver = serializationResolver;
        // The arrow serializer cannot be built yet — it is created in startUploadImpl()
        // from the ArrowWriteSerializationContext once the server schema is known.
        var format = this.req.getSerializationContext().getFormat();
        if (format.isEmpty() || !"arrow".equals(format.get().getType())) {
            tableRowsSerializer = TableRowsSerializer.createTableRowsSerializer(
                    this.req.getSerializationContext(), serializationResolver
            ).orElse(null);
        }
    }

    /**
     * Attaches the transaction whose lifetime covers this write.
     *
     * @throws IllegalStateException if a transaction was already set
     */
    public void setTransaction(ApiServiceTransaction transaction) {
        if (this.transaction != null) {
            throw new IllegalStateException("Write transaction already started");
        }
        this.transaction = transaction;
    }

    /**
     * Performs the upload handshake: parses the server's write metadata, captures the
     * table schema, and finishes serializer selection (arrow, unversioned-row wire, or
     * reflection-based wire serialization).
     *
     * @return a future completing with {@code this} once the writer is ready for rows
     */
    public CompletableFuture<TableWriterBaseImpl<T>> startUploadImpl() {
        TableWriterBaseImpl<T> self = this;

        return startUpload.thenApply((attachments) -> {
            // The handshake response must carry exactly one non-null attachment.
            if (attachments.size() != 1) {
                throw new IllegalArgumentException("protocol error");
            }
            byte[] head = attachments.get(0);
            if (head == null) {
                throw new IllegalArgumentException("protocol error");
            }

            TWriteTableMeta metadata = RpcUtil.parseMessageBodyWithCompression(
                    head,
                    TWriteTableMeta.parser(),
                    Compression.None
            );
            self.schema = ApiServiceUtil.deserializeTableSchema(metadata.getSchema());
            logger.debug("schema -> {}", self.schema.toYTree().toString());

            {
                // Deferred arrow serializer creation (see constructor comment).
                var format = self.req.getSerializationContext().getFormat();
                if (format.isPresent() && "arrow".equals(format.get().getType())) {
                    self.tableRowsSerializer = new ArrowTableRowsSerializer<>(
                            ((ArrowWriteSerializationContext<T, ?, ?, ?>) self.req.getSerializationContext()).getRowGetters()
                    );
                }
            }

            if (self.tableRowsSerializer == null) {
                if (self.req.getSerializationContext().getObjectClass().isEmpty()) {
                    throw new IllegalStateException("No object clazz");
                }
                Class<T> objectClazz = self.req.getSerializationContext().getObjectClass().get();
                if (UnversionedRow.class.equals(objectClazz)) {
                    // Safe: when objectClazz is UnversionedRow.class, T is UnversionedRow.
                    @SuppressWarnings("unchecked")
                    TableRowsSerializer<T> serializer =
                            (TableRowsSerializer<T>) new TableRowsWireSerializer<>(new UnversionedRowSerializer());
                    self.tableRowsSerializer = serializer;
                } else {
                    self.tableRowsSerializer = new TableRowsWireSerializer<>(
                            serializationResolver.createWireRowSerializer(
                                    serializationResolver.forClass(objectClazz, self.schema))
                    );
                }
            }

            return self;
        });
    }

    /**
     * Serializes and enqueues a batch of rows.
     *
     * @return whether the underlying window accepted the data without back-pressure
     * @throws IllegalStateException if called before {@link #startUploadImpl()} completed
     */
    public boolean write(List<T> rows, TableSchema schema) throws IOException {
        if (tableRowsSerializer == null) {
            // Previously this dereference produced a bare NPE when write() raced startUploadImpl().
            throw new IllegalStateException("Writer is not initialized: startUploadImpl() has not completed");
        }
        byte[] serializedRows = tableRowsSerializer.serializeRows(rows, schema);
        return write(serializedRows);
    }

    /**
     * Closes the stream and, if an active transaction is attached, commits it after
     * the close finishes; the close response is propagated either way.
     */
    @Override
    public CompletableFuture<?> close() {
        return super.close()
                .thenCompose(response -> {
                    if (transaction != null && transaction.isActive()) {
                        return transaction.commit()
                                .thenApply(unused -> response);
                    }
                    return CompletableFuture.completedFuture(response);
                });
    }
}
177 changes: 177 additions & 0 deletions data-source/src/main/scala/tech/ytsaurus/client/YTGetters.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package tech.ytsaurus.client;

import tech.ytsaurus.typeinfo.TiType;
import tech.ytsaurus.yson.YsonConsumer;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Map;

/**
 * Family of typed column accessors ("getters") over a generic row representation.
 * <p>
 * The type parameters fix the runtime representation: {@code Struct} is a row,
 * {@code List} a repeated value, {@code Dict} a key-value map. The inner classes are
 * deliberately non-static so they can reference these parameters. Naming scheme:
 * {@code From<Source>To<Target>} — the source is the container read from
 * ({@code Struct} by field, {@code List} by index, {@code Dict} via key/value lists),
 * the target is the primitive or composite value produced.
 * <p>
 * The private constructors of {@link Getter}, {@link FromStruct} and {@link FromList}
 * seal the hierarchy to the subclasses declared in this file.
 *
 * @param <Struct> row type
 * @param <List>   list type
 * @param <Dict>   dictionary type
 */
public class YTGetters<Struct, List, Dict> {
    /** Root of the getter hierarchy; every getter knows its YT type. */
    public abstract class Getter {
        private Getter() {
        }

        public abstract TiType getTiType();
    }

    /** Getter reading a value out of a row. */
    public abstract class FromStruct extends Getter {
        private FromStruct() {
        }

        /** Emits the value for {@code struct} into {@code ysonConsumer}. */
        public abstract void getYson(Struct struct, YsonConsumer ysonConsumer);
    }

    /** Getter reading a value out of a list element. */
    public abstract class FromList extends Getter {
        private FromList() {
        }

        public abstract int getSize(List list);

        /** Emits the value at index {@code i} of {@code list} into {@code ysonConsumer}. */
        public abstract void getYson(List list, int i, YsonConsumer ysonConsumer);
    }

    /** Marker: value is emitted as raw YSON (no further typed accessor). */
    public abstract class FromStructToYson extends FromStruct {
    }

    /** Marker: list element is emitted as raw YSON. */
    public abstract class FromListToYson extends FromList {
    }

    /** Accessor over a dictionary: exposes its keys and values as parallel lists. */
    public abstract class FromDict extends Getter {
        public abstract FromList getKeyGetter();

        public abstract FromList getValueGetter();

        public abstract int getSize(Dict dict);

        public abstract List getKeys(Dict dict);

        public abstract List getValues(Dict dict);
    }

    /** Value is always null. */
    public abstract class FromStructToNull extends FromStruct {
    }

    /** List element is always null. */
    public abstract class FromListToNull extends FromList {
    }

    /** Optional value in a row: check {@link #isEmpty} before using the inner getter. */
    public abstract class FromStructToOptional extends FromStruct {
        public abstract FromStruct getNotEmptyGetter();

        public abstract boolean isEmpty(Struct struct);
    }

    /** Optional value in a list: check {@link #isEmpty} before using the inner getter. */
    public abstract class FromListToOptional extends FromList {
        public abstract FromList getNotEmptyGetter();

        public abstract boolean isEmpty(List list, int i);
    }

    /** String/bytes value from a row. */
    public abstract class FromStructToString extends FromStruct {
        public abstract ByteBuffer getString(Struct struct);
    }

    /** String/bytes value from a list element. */
    public abstract class FromListToString extends FromList {
        public abstract ByteBuffer getString(List list, int i);
    }

    public abstract class FromStructToByte extends FromStruct {
        public abstract byte getByte(Struct struct);
    }

    public abstract class FromListToByte extends FromList {
        public abstract byte getByte(List list, int i);
    }

    public abstract class FromStructToShort extends FromStruct {
        public abstract short getShort(Struct struct);
    }

    public abstract class FromListToShort extends FromList {
        public abstract short getShort(List list, int i);
    }

    public abstract class FromStructToInt extends FromStruct {
        public abstract int getInt(Struct struct);
    }

    public abstract class FromListToInt extends FromList {
        public abstract int getInt(List list, int i);
    }

    public abstract class FromStructToLong extends FromStruct {
        public abstract long getLong(Struct struct);
    }

    public abstract class FromListToLong extends FromList {
        public abstract long getLong(List list, int i);
    }

    public abstract class FromStructToBoolean extends FromStruct {
        public abstract boolean getBoolean(Struct struct);
    }

    public abstract class FromListToBoolean extends FromList {
        public abstract boolean getBoolean(List list, int i);
    }

    public abstract class FromStructToFloat extends FromStruct {
        public abstract float getFloat(Struct struct);
    }

    public abstract class FromListToFloat extends FromList {
        public abstract float getFloat(List list, int i);
    }

    public abstract class FromStructToDouble extends FromStruct {
        public abstract double getDouble(Struct struct);
    }

    public abstract class FromListToDouble extends FromList {
        public abstract double getDouble(List list, int i);
    }

    /** Nested struct in a row: per-member getters plus access to the nested value. */
    public abstract class FromStructToStruct extends FromStruct {
        public abstract java.util.List<Map.Entry<String, FromStruct>> getMembersGetters();

        public abstract Struct getStruct(Struct struct);
    }

    /** Nested struct in a list element. */
    public abstract class FromListToStruct extends FromList {
        public abstract java.util.List<Map.Entry<String, FromStruct>> getMembersGetters();

        public abstract Struct getStruct(List list, int i);
    }

    /** Nested list in a row: element getter plus access to the nested value. */
    public abstract class FromStructToList extends FromStruct {
        public abstract FromList getElementGetter();

        public abstract List getList(Struct struct);
    }

    /** Nested list in a list element. */
    public abstract class FromListToList extends FromList {
        public abstract FromList getElementGetter();

        public abstract List getList(List list, int i);
    }

    /** Nested dictionary in a row. */
    public abstract class FromStructToDict extends FromStruct {
        public abstract FromDict getGetter();

        public abstract Dict getDict(Struct struct);
    }

    /** Nested dictionary in a list element. */
    public abstract class FromListToDict extends FromList {
        public abstract FromDict getGetter();

        public abstract Dict getDict(List list, int i);
    }

    /** Decimal value from a row. */
    public abstract class FromStructToBigDecimal extends FromStruct {
        public abstract BigDecimal getBigDecimal(Struct struct);
    }

    /** Decimal value from a list element. */
    public abstract class FromListToBigDecimal extends FromList {
        public abstract BigDecimal getBigDecimal(List list, int i);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ import org.apache.spark.executor.TaskMetricUpdater
import org.apache.spark.metrics.yt.YtMetricsRegister
import org.apache.spark.metrics.yt.YtMetricsRegister.ytMetricsSource._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import tech.ytsaurus.client.request.{TransactionalOptions, WriteTable}
import tech.ytsaurus.client.{ArrowWriteSerializationContext, CompoundClient, InternalRowYTGetters, TableWriter}
import tech.ytsaurus.core.GUID
import tech.ytsaurus.spyt.format.conf.SparkYtWriteConfiguration
import tech.ytsaurus.spyt.format.conf.YtTableSparkSettings._
import tech.ytsaurus.spyt.fs.conf._
import tech.ytsaurus.spyt.fs.path.YPathEnriched
import tech.ytsaurus.spyt.serializers.{InternalRowSerializer, WriteSchemaConverter}
import tech.ytsaurus.spyt.wrapper.LogLazy
import tech.ytsaurus.client.request.{TransactionalOptions, WriteSerializationContext, WriteTable}
import tech.ytsaurus.client.{CompoundClient, TableWriter}
import tech.ytsaurus.core.GUID
import tech.ytsaurus.spyt.format.conf.SparkYtWriteConfiguration

import java.util
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.collection.JavaConverters.seqAsJavaListConverter
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Try}

Expand Down Expand Up @@ -151,9 +153,15 @@ class YtOutputWriter(richPath: YPathEnriched,
protected def initializeWriter(): TableWriter[InternalRow] = {
val appendPath = richPath.withAttr("append", "true").toYPath
log.debugLazy(s"Initialize new write: $appendPath, transaction: $transactionGuid")
val internalRowGetters = new InternalRowYTGetters()
val request = WriteTable.builder[InternalRow]()
.setPath(appendPath)
.setSerializationContext(new WriteSerializationContext(new InternalRowSerializer(schema, WriteSchemaConverter(options))))
.setSerializationContext(new ArrowWriteSerializationContext[InternalRow, ArrayData, MapData, InternalRowYTGetters](
WriteSchemaConverter(options).ytLogicalTypeStruct(schema).fields.zipWithIndex.map {
case ((name, ytLogicalType, _), i) =>
util.Map.entry(name, ytLogicalType.ytGettersFromStruct(internalRowGetters, i))
}.asJava
))
.setTransactionalOptions(new TransactionalOptions(GUID.valueOf(transactionGuid)))
.setNeedRetries(false)
.build()
Expand Down
Loading
Loading