Add complex types support and tests.
Adds support for all Collections (List, Set, Map), UDTs, and Tuples.

Adds tests that attempt inserts into columns of complex types.
These tests verify that the implemented codecs kick in when needed.

Kafka Struct fields are translated into UDTs. The connector will try to infer the correct
UserType from the table column. If the table does not exist yet or is
being altered, the connector will try to find a UserType with the same
name as the struct's schema. In all cases the UserType needs to already
exist in the target keyspace, as in the sketch below.
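
A minimal sketch of the naming contract (keyspace, type, and field names are
hypothetical): the struct schema's name has to line up with a UDT created
beforehand, e.g. `CREATE TYPE demo_ks.address (street text, number int)`.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class UdtNamingSketch {
  public static void main(String[] args) {
    // The schema name "address" must match the UDT name in the target keyspace.
    Schema addressSchema = SchemaBuilder.struct()
        .name("address")
        .field("street", Schema.STRING_SCHEMA)
        .field("number", Schema.INT32_SCHEMA)
        .build();

    // A record value with this schema can then be bound to an address-typed column.
    Struct address = new Struct(addressSchema)
        .put("street", "Baker Street")
        .put("number", 221);
    System.out.println(address);
  }
}
```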

Kafka Maps are translated into Scylla maps; maps keyed by Strings can also be inserted into UDTs
(see the sketch below).
Kafka Arrays can be inserted into Lists, Sets, and Tuples.
Kafka Structs can be inserted into UDTs and Tuples.
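
A rough sketch of the Map-to-UDT path (names hypothetical), matching the
constraints enforced by `MapUDTCodec`: keys must be Strings and must cover
every field of the UDT exactly.

```java
import java.util.HashMap;
import java.util.Map;

public class MapIntoUdtSketch {
  public static void main(String[] args) {
    // For CREATE TYPE demo_ks.address (street text, number int), a map like
    // this can be bound to an address-typed column; a missing or extra key
    // makes MapUDTCodec throw IllegalArgumentException.
    Map<String, Object> address = new HashMap<>();
    address.put("street", "Baker Street");
    address.put("number", 221);
    System.out.println(address);
  }
}
```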

Updates the README with basic information about supported types on the Scylla side.

All added types should work with tables managed by Scylla.
UDTs, Maps, Lists, and Sets should work with tables managed by the connector.

Fixes scylladb#60
Bouncheck committed May 30, 2022
1 parent c544b5d commit d2e1d93
Showing 9 changed files with 669 additions and 6 deletions.
16 changes: 16 additions & 0 deletions README.md
@@ -78,3 +78,19 @@ Reporting Kafka Metrics

Refer to the following [confluent documentation](https://docs.confluent.io/current/kafka/metrics-reporter.html)
to access Kafka-related metrics.

-----------------------
Supported Data Types
-----------------------
Currently supported native database types are: ASCII, BIGINT, BLOB, BOOLEAN, DATE, DECIMAL, DOUBLE,
DURATION, FLOAT, INET, INT, SMALLINT, TEXT, TIME, TIMESTAMP, TIMEUUID, TINYINT, UUID, VARCHAR, VARINT.

COUNTERs are not supported.

Collections, UDTs, and Tuples are supported, although UDTs require proper setup.
To ensure the connector knows how to handle User Types, they must already be defined in the
target keyspace when the connector starts, for example by pre-creating them as sketched below.
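
One way to do that is through the Java driver (a sketch; contact point, keyspace,
and type names are placeholders):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CreateUdtBeforeConnector {
  public static void main(String[] args) {
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
      // The connector discovers this type at startup and registers a codec for it.
      session.execute(
          "CREATE TYPE IF NOT EXISTS demo_ks.address (street text, number int)");
    }
  }
}
```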

The best way to ensure conformity with a specific table schema is to create the table beforehand and use the
connector with `TABLE_MANAGE_ENABLED_CONFIG` set to `false`. The connector will try to reasonably
convert most types (e.g. an IP address provided as a text string should be insertable into an INET column).
A sketch of such a table follows.
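
A companion sketch (all names are placeholders) of pre-creating a table that mixes
the supported complex column types:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CreateTableBeforeConnector {
  public static void main(String[] args) {
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect()) {
      // frozen<> is used here for portability; assumes demo_ks.address exists.
      session.execute(
          "CREATE TABLE IF NOT EXISTS demo_ks.addresses ("
              + "id int PRIMARY KEY,"
              + "home frozen<address>,"
              + "tags set<text>,"
              + "pair frozen<tuple<text, int>>)");
    }
  }
}
```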
@@ -1,8 +1,11 @@
package io.connect.scylladb;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.CodecNotFoundException;
import com.datastax.driver.core.schemabuilder.UDTType;
import io.connect.scylladb.codec.ListTupleCodec;
import io.connect.scylladb.codec.MapUDTCodec;
import io.connect.scylladb.codec.StructTupleCodec;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

@@ -158,7 +161,25 @@ protected void setStructField(
String fieldName,
Struct value
) {
throw new UnsupportedOperationException();
DataType colType = preparedStatement.getVariables().getType(fieldName);
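// Try the registry first; when no codec maps Struct to this column type,
// register the matching codec and look it up again.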
switch (colType.getName()) {
  case TUPLE:
    TypeCodec<Struct> codec;
    try {
      codec = preparedStatement.getCodecRegistry().codecFor(colType, Struct.class);
    } catch (CodecNotFoundException e) {
      preparedStatement.getCodecRegistry()
          .register(new StructTupleCodec(preparedStatement.getCodecRegistry(), (TupleType) colType));
      codec = preparedStatement.getCodecRegistry().codecFor(colType, Struct.class);
    }
    state.statement.set(fieldName, value, codec);
    break;
  case UDT:
    state.statement.set(fieldName, value,
        preparedStatement.getCodecRegistry().codecFor(colType, value));
    break;
  default:
    throw new UnsupportedOperationException(
        "No behavior implemented for inserting Kafka Struct into " + colType.getName());
}
state.parameters++;
}

protected void setArray(
@@ -167,7 +188,21 @@ protected void setArray(
Schema schema,
List value
) {
state.statement.setList(fieldName, value);
DataType colType = preparedStatement.getVariables().getType(fieldName);
switch (colType.getName()) {
  case TUPLE:
    TypeCodec<List> codec;
    try {
      codec = preparedStatement.getCodecRegistry().codecFor(colType, List.class);
    } catch (CodecNotFoundException e) {
      preparedStatement.getCodecRegistry()
          .register(new ListTupleCodec(preparedStatement.getCodecRegistry(), (TupleType) colType));
      codec = preparedStatement.getCodecRegistry().codecFor(colType, List.class);
    }
    state.statement.set(fieldName, value, codec);
    break;
  default:
    state.statement.setList(fieldName, value);
}
state.parameters++;
}

@@ -177,7 +212,24 @@ protected void setMap(
Schema schema,
Map value
) {
state.statement.setMap(fieldName, value);
DataType colType = preparedStatement.getVariables().getType(fieldName);
switch (colType.getName()) {
  case UDT:
    TypeCodec<Map> codec;
    try {
      codec = preparedStatement.getCodecRegistry().codecFor(colType, Map.class);
    } catch (CodecNotFoundException e) {
      preparedStatement.getCodecRegistry()
          .register(new MapUDTCodec(preparedStatement.getCodecRegistry(), (UserType) colType));
      codec = preparedStatement.getCodecRegistry().codecFor(colType, Map.class);
    }
    state.statement.set(fieldName, value, codec);
    break;
  case MAP:
    state.statement.setMap(fieldName, value);
    break;
  default:
    throw new UnsupportedOperationException(
        "No behavior implemented for inserting Java Map into " + colType.getName());
}
state.parameters++;
}

8 changes: 8 additions & 0 deletions src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java
@@ -125,6 +125,14 @@ DataType dataType(Schema schema) {
case STRING:
dataType = DataType.varchar();
break;
case STRUCT:
dataType = session.keyspaceMetadata(config.keyspace).getUserType(schema.name());
if (dataType == null) {
throw new DataException(
String.format("Couldn't find user type %s in keyspace %s", schema.name(), config.keyspace)
);
}
break;
default:
throw new DataException(
String.format("Unsupported type %s", schema.type())
16 changes: 16 additions & 0 deletions src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java
@@ -2,10 +2,12 @@

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.RemoteEndpointAwareNettySSLOptions;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.extras.codecs.date.SimpleDateCodec;
import io.connect.scylladb.codec.ConvenienceCodecs;
@@ -14,6 +16,7 @@
import io.connect.scylladb.codec.StringTimeUuidCodec;
import io.connect.scylladb.codec.StringUuidCodec;
import io.connect.scylladb.codec.StringVarintCodec;
import io.connect.scylladb.codec.StructUDTCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.apache.kafka.connect.errors.ConnectException;
@@ -36,6 +39,8 @@
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Collection;
import java.util.List;

public class ScyllaDbSessionFactory {

@@ -148,6 +153,17 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) {
Cluster cluster = clusterBuilder.build();
log.info("Creating session");
final Session session = cluster.connect();
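// Register a Struct-to-UDT codec for every user type already defined in
// the configured keyspace, so Kafka Structs can be bound to UDT columns.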
KeyspaceMetadata ks = session.getCluster().getMetadata().getKeyspace(config.keyspace);
if (ks != null) {
  Collection<UserType> userTypes = ks.getUserTypes();
  userTypes.forEach(t -> CODEC_REGISTRY.register(new StructUDTCodec(CODEC_REGISTRY, t)));
} else {
  log.warn("Received null keyspace metadata. Unable to register Kafka Struct to UserType codecs, if any exist.");
}

return new ScyllaDbSessionImpl(config, cluster, session);
}

52 changes: 52 additions & 0 deletions src/main/java/io/connect/scylladb/codec/ListTupleCodec.java
@@ -0,0 +1,52 @@
package io.connect.scylladb.codec;

import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.extras.codecs.MappingCodec;

import java.util.Iterator;
import java.util.List;

public class ListTupleCodec extends MappingCodec<List, TupleValue> {

private CodecRegistry registry;
private TupleType definition;

public ListTupleCodec(CodecRegistry registry, TupleType definition) {
super(TypeCodec.tuple(definition), List.class);
this.registry = registry;
this.definition = definition;
}

@Override
protected TupleValue serialize(List list) {
if (list == null) {
return null;
}

int size = definition.getComponentTypes().size();
int listSize = list.size();
if (listSize != size) {
throw new IllegalArgumentException(
String.format("Expecting %d fields, got %d", size, listSize));
}

TupleValue value = definition.newValue();
Iterator iter = list.iterator();
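// Bind each element positionally, resolving a codec per tuple component type.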
for (int i = 0; i < size; i++) {
Object item = iter.next();
DataType elementType = definition.getComponentTypes().get(i);
value.set(i, item, registry.codecFor(elementType, item));
}
return value;
}

@Override
protected List deserialize(TupleValue value) { throw new UnsupportedOperationException(); }

}
58 changes: 58 additions & 0 deletions src/main/java/io/connect/scylladb/codec/MapUDTCodec.java
@@ -0,0 +1,58 @@
package io.connect.scylladb.codec;

import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.extras.codecs.MappingCodec;

import java.util.Map;

public class MapUDTCodec extends MappingCodec<Map, UDTValue> {
CodecRegistry registry;
UserType definition;

public MapUDTCodec(CodecRegistry registry, UserType udt) {
super(registry.codecFor(udt), Map.class);
this.registry = registry;
this.definition = udt;
}

@Override
protected UDTValue serialize(Map map) {
if (map == null || map.isEmpty()) {
return null;
}
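// Only the first key is inspected; the codec assumes a homogeneous String-keyed map.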
if (!(map.keySet().iterator().next() instanceof String)) {
throw new UnsupportedOperationException("This codec (" + this.getClass().getSimpleName()
+ ") handles only Maps that have String as their key type.");
}
int size = definition.getFieldNames().size();
int mapSize = map.size();
if (mapSize != size) {
throw new IllegalArgumentException(
String.format("Expecting %d fields, got %d", size, mapSize));
}

final UDTValue value = definition.newValue();
definition.getFieldNames().stream().forEach(fieldName -> {
if (!map.containsKey(fieldName)) {
throw new IllegalArgumentException(
String.format(
"Field %s in UDT %s not found in input map",
fieldName, definition.getName()));
}
DataType fieldType = definition.getFieldType(fieldName);
value.set(fieldName, map.get(fieldName), registry.codecFor(fieldType, map.get(fieldName)));
}
);

return value;
}

@Override
protected Map deserialize(UDTValue value) {
throw new UnsupportedOperationException("This codec (" + this.getClass().getSimpleName() + ") does not support deserialization from UDT to Map");
}
}
50 changes: 50 additions & 0 deletions src/main/java/io/connect/scylladb/codec/StructTupleCodec.java
@@ -0,0 +1,50 @@
package io.connect.scylladb.codec;

import com.datastax.driver.core.*;
import com.datastax.driver.extras.codecs.MappingCodec;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;


public class StructTupleCodec extends MappingCodec<Struct, TupleValue> {

private CodecRegistry registry;
private TupleType definition;

public StructTupleCodec(CodecRegistry registry, TupleType definition) {
super(TypeCodec.tuple(definition), Struct.class);
this.registry = registry;
this.definition = definition;
}

@Override
protected TupleValue serialize(Struct struct) {
if (struct == null) {
return null;
}

int size = definition.getComponentTypes().size();
Schema schema = struct.schema();
int structSize = schema.fields().size();
if (structSize != size) {
throw new IllegalArgumentException(
String.format("Expecting %d fields, got %d", size, structSize));
}

TupleValue value = definition.newValue();
for (int i = 0; i < size; i++) {
Object field = struct.get(schema.fields().get(i));
DataType elementType = definition.getComponentTypes().get(i);
value.set(i, field, registry.codecFor(elementType, field));
}
return value;
}

@Override
protected Struct deserialize(TupleValue value) { throw new UnsupportedOperationException(); }

}
59 changes: 59 additions & 0 deletions src/main/java/io/connect/scylladb/codec/StructUDTCodec.java
@@ -0,0 +1,59 @@
package io.connect.scylladb.codec;

import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.extras.codecs.MappingCodec;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

import java.util.Set;
import java.util.stream.Collectors;

public class StructUDTCodec extends MappingCodec<Struct, UDTValue> {

CodecRegistry registry;
UserType definition;

public StructUDTCodec(CodecRegistry registry, UserType udt) {
super(registry.codecFor(udt), Struct.class);
this.registry = registry;
this.definition = udt;
}

@Override
protected UDTValue serialize(Struct struct) {
if (struct == null) {
return null;
}

int size = definition.getFieldNames().size();
Schema schema = struct.schema();
int structSize = schema.fields().size();
Set<String> structFieldNames = schema.fields().stream().map(Field::name).collect(Collectors.toSet());
if (structSize != size) {
throw new IllegalArgumentException(
String.format("Expecting %d fields, got %d", size, structSize));
}

final UDTValue value = definition.newValue();
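// Populate every UDT field by name from the struct; a UDT field missing from the struct is an error.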
definition.getFieldNames().stream().forEach(fieldName -> {
if (!structFieldNames.contains(fieldName)) {
throw new IllegalArgumentException(
String.format(
"Field %s in UDT %s not found in input struct",
fieldName, definition.getName()));
}
DataType fieldType = definition.getFieldType(fieldName);
value.set(fieldName, struct.get(fieldName), registry.codecFor(fieldType, struct.get(fieldName)));
}
);

return value;
}

@Override
protected Struct deserialize(UDTValue value) { throw new UnsupportedOperationException(); }
}