diff --git a/README.md b/README.md
index b26e7a3..9de4cff 100644
--- a/README.md
+++ b/README.md
@@ -78,3 +78,19 @@ Reporting Kafka Metrics
 
 Refer the following [confluent documentation](https://docs.confluent.io/current/kafka/metrics-reporter.html)
 to access kafka related metrics.
+
+-----------------------
+Supported Data Types
+-----------------------
+Currently supported native database types are: ASCII, BIGINT, BLOB, BOOLEAN, DATE, DECIMAL, DOUBLE,
+DURATION, FLOAT, INET, INT, SMALLINT, TEXT, TIME, TIMESTAMP, TIMEUUID, TINYINT, UUID, VARCHAR, VARINT.
+
+COUNTERs are not supported.
+
+Collections, UDTs, and Tuples are supported, although UDTs require proper setup.
+For the connector to know how to handle User Types, they must already be defined in the
+target keyspace when the connector starts.
+
+The best way to ensure conformity with a specific table schema is to create the table beforehand and run the
+connector with `TABLE_MANAGE_ENABLED_CONFIG` set to `false`. The connector will try to reasonably
+convert most of the types (e.g. an IP address provided as a text string should be insertable into an INET column).
\ No newline at end of file
diff --git a/src/main/java/io/connect/scylladb/RecordToBoundStatementConverter.java b/src/main/java/io/connect/scylladb/RecordToBoundStatementConverter.java
index 0a1b211..94138ec 100644
--- a/src/main/java/io/connect/scylladb/RecordToBoundStatementConverter.java
+++ b/src/main/java/io/connect/scylladb/RecordToBoundStatementConverter.java
@@ -1,8 +1,11 @@
 package io.connect.scylladb;
 
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.LocalDate;
-import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.CodecNotFoundException;
+import com.datastax.driver.core.schemabuilder.UDTType;
+import io.connect.scylladb.codec.ListTupleCodec;
+import io.connect.scylladb.codec.MapUDTCodec;
+import io.connect.scylladb.codec.StructTupleCodec;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -158,7 +161,25 @@ protected void setStructField(
       String fieldName,
       Struct value
   ) {
-    throw new UnsupportedOperationException();
+    DataType colType = preparedStatement.getVariables().getType(fieldName);
+    switch (colType.getName()){
+      case TUPLE:
+        TypeCodec codec;
+        try{
+          codec = preparedStatement.getCodecRegistry().codecFor(colType, Struct.class);
+        } catch (CodecNotFoundException e) {
+          preparedStatement.getCodecRegistry().register(new StructTupleCodec(preparedStatement.getCodecRegistry(), (TupleType) colType));
+          codec = preparedStatement.getCodecRegistry().codecFor(colType, Struct.class);
+        }
+        state.statement.set(fieldName, value, codec);
+        break;
+      case UDT:
+        state.statement.set(fieldName, value, preparedStatement.getCodecRegistry().codecFor(preparedStatement.getVariables().getType(fieldName), value));
+        break;
+      default:
+        throw new UnsupportedOperationException("No behavior implemented for inserting Kafka Struct into " + colType.getName());
+    }
+    state.parameters++;
   }
 
   protected void setArray(
@@ -167,7 +188,21 @@ protected void setArray(
       Schema schema,
       List value
   ) {
-    state.statement.setList(fieldName, value);
+    DataType colType = preparedStatement.getVariables().getType(fieldName);
+    switch (colType.getName()){
+      case TUPLE:
+        TypeCodec codec;
+        try{
+          codec = preparedStatement.getCodecRegistry().codecFor(colType, List.class);
+        } catch (CodecNotFoundException e) {
+          preparedStatement.getCodecRegistry().register(new
ListTupleCodec(preparedStatement.getCodecRegistry(), (TupleType) colType)); + codec = preparedStatement.getCodecRegistry().codecFor(colType, List.class); + } + state.statement.set(fieldName, value, codec); + break; + default: + state.statement.setList(fieldName, value); + } state.parameters++; } @@ -177,7 +212,24 @@ protected void setMap( Schema schema, Map value ) { - state.statement.setMap(fieldName, value); + DataType colType = preparedStatement.getVariables().getType(fieldName); + switch (colType.getName()){ + case UDT: + TypeCodec codec; + try{ + codec = preparedStatement.getCodecRegistry().codecFor(colType, Map.class); + } catch (CodecNotFoundException e) { + preparedStatement.getCodecRegistry().register(new MapUDTCodec(preparedStatement.getCodecRegistry(), (UserType) colType)); + codec = preparedStatement.getCodecRegistry().codecFor(colType, Map.class); + } + state.statement.set(fieldName, value, codec); + break; + case MAP: + state.statement.setMap(fieldName, value); + break; + default: + throw new UnsupportedOperationException("No behavior implemented for inserting Java Map into " + colType.getName()); + } state.parameters++; } diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java b/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java index 1d66fc0..2283c1d 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSchemaBuilder.java @@ -125,6 +125,14 @@ DataType dataType(Schema schema) { case STRING: dataType = DataType.varchar(); break; + case STRUCT: + dataType = session.keyspaceMetadata(config.keyspace).getUserType(schema.name()); + if (dataType == null){ + throw new DataException( + String.format("Couldn't find user type %s in keyspace %s", schema.name(), config.keyspace) + ); + } + break; default: throw new DataException( String.format("Unsupported type %s", schema.type()) diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java b/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java index 0f65b55..1dff860 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSessionFactory.java @@ -2,10 +2,12 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.RemoteEndpointAwareNettySSLOptions; import com.datastax.driver.core.SSLOptions; import com.datastax.driver.core.Session; +import com.datastax.driver.core.UserType; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.extras.codecs.date.SimpleDateCodec; import io.connect.scylladb.codec.ConvenienceCodecs; @@ -14,6 +16,7 @@ import io.connect.scylladb.codec.StringTimeUuidCodec; import io.connect.scylladb.codec.StringUuidCodec; import io.connect.scylladb.codec.StringVarintCodec; +import io.connect.scylladb.codec.StructUDTCodec; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import org.apache.kafka.connect.errors.ConnectException; @@ -36,6 +39,8 @@ import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; +import java.util.Collection; +import java.util.List; public class ScyllaDbSessionFactory { @@ -148,6 +153,17 @@ public ScyllaDbSession newSession(ScyllaDbSinkConnectorConfig config) { Cluster cluster = clusterBuilder.build(); 
log.info("Creating session"); final Session session = cluster.connect(); + KeyspaceMetadata ks = session.getCluster().getMetadata().getKeyspace(config.keyspace); + if(ks != null) { + Collection userTypes = ks.getUserTypes(); + userTypes.stream().forEach(t -> { + CODEC_REGISTRY.register(new StructUDTCodec(CODEC_REGISTRY, t)); + }); + } + else{ + log.warn("Received null cluster metadata. Unable to register KafkaStruct to UserType codecs if any exist."); + } + return new ScyllaDbSessionImpl(config, cluster, session); } diff --git a/src/main/java/io/connect/scylladb/codec/ListTupleCodec.java b/src/main/java/io/connect/scylladb/codec/ListTupleCodec.java new file mode 100644 index 0000000..1f76b39 --- /dev/null +++ b/src/main/java/io/connect/scylladb/codec/ListTupleCodec.java @@ -0,0 +1,52 @@ +package io.connect.scylladb.codec; + +import com.datastax.driver.core.*; +import com.datastax.driver.extras.codecs.MappingCodec; +import com.google.common.reflect.TypeToken; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class ListTupleCodec extends MappingCodec { + + private CodecRegistry registry; + private TupleType definition; + + public ListTupleCodec(CodecRegistry registry, TupleType definition) { + super(TypeCodec.tuple(definition), List.class); + this.registry = registry; + this.definition = definition; + } + + @Override + protected TupleValue serialize(List list) { + if (list == null) { + return null; + } + + int size = definition.getComponentTypes().size(); + int listSize = list.size(); + if (listSize != size) { + throw new IllegalArgumentException( + String.format("Expecting %d fields, got %d", size, listSize)); + } + + TupleValue value = definition.newValue(); + Iterator iter = list.iterator(); + for(int i = 0; i < size; i++){ + Object item = iter.next(); + DataType elementType = definition.getComponentTypes().get(i); + value.set(i, item, registry.codecFor(elementType, item)); + } + return value; + } + + @Override + protected List deserialize(TupleValue value) { throw new UnsupportedOperationException(); } + +} diff --git a/src/main/java/io/connect/scylladb/codec/MapUDTCodec.java b/src/main/java/io/connect/scylladb/codec/MapUDTCodec.java new file mode 100644 index 0000000..0483c2b --- /dev/null +++ b/src/main/java/io/connect/scylladb/codec/MapUDTCodec.java @@ -0,0 +1,58 @@ +package io.connect.scylladb.codec; + +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; +import com.datastax.driver.extras.codecs.MappingCodec; +import com.google.common.reflect.TypeToken; + +import java.util.Map; + +public class MapUDTCodec extends MappingCodec { + CodecRegistry registry; + UserType definition; + + public MapUDTCodec(CodecRegistry registry, UserType udt) { + super(registry.codecFor(udt), Map.class); + this.registry = registry; + this.definition = udt; + } + + @Override + protected UDTValue serialize(Map map) { + if (map == null || map.isEmpty()) { + return null; + } + if(!(map.keySet().iterator().next() instanceof String)){ + throw new UnsupportedOperationException("This codec (" + this.getClass().getSimpleName() + + ") handles only Maps that have String as their key type."); + } + int size = definition.getFieldNames().size(); + int mapSize = map.size(); + if (mapSize != 
size) { + throw new IllegalArgumentException( + String.format("Expecting %d fields, got %d", size, mapSize)); + } + + final UDTValue value = definition.newValue(); + definition.getFieldNames().stream().forEach(fieldName -> { + if (!map.containsKey(fieldName)) { + throw new IllegalArgumentException( + String.format( + "Field %s in UDT %s not found in input map", + fieldName, definition.getName())); + } + DataType fieldType = definition.getFieldType(fieldName); + value.set(fieldName, map.get(fieldName), registry.codecFor(fieldType, map.get(fieldName))); + } + ); + + return value; + } + + @Override + protected Map deserialize(UDTValue value) { + throw new UnsupportedOperationException("This codec (" + this.getClass().getSimpleName() + ") does not support deserialization from UDT to Map"); + } +} diff --git a/src/main/java/io/connect/scylladb/codec/StructTupleCodec.java b/src/main/java/io/connect/scylladb/codec/StructTupleCodec.java new file mode 100644 index 0000000..f2e3048 --- /dev/null +++ b/src/main/java/io/connect/scylladb/codec/StructTupleCodec.java @@ -0,0 +1,50 @@ +package io.connect.scylladb.codec; + +import com.datastax.driver.core.*; +import com.datastax.driver.extras.codecs.MappingCodec; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.Set; +import java.util.stream.Collectors; + +public class StructTupleCodec extends MappingCodec { + + private CodecRegistry registry; + private TupleType definition; + + public StructTupleCodec(CodecRegistry registry, TupleType definition) { + super(TypeCodec.tuple(definition), Struct.class); + this.registry = registry; + this.definition = definition; + } + + @Override + protected TupleValue serialize(Struct struct) { + if (struct == null) { + return null; + } + + int size = definition.getComponentTypes().size(); + Schema schema = struct.schema(); + int structSize = schema.fields().size(); + Set structFieldNames = schema.fields().stream().map(Field::name).collect(Collectors.toSet()); + if (structSize != size) { + throw new IllegalArgumentException( + String.format("Expecting %d fields, got %d", size, structSize)); + } + + TupleValue value = definition.newValue(); + for(int i = 0; i < size; i++){ + Object field = struct.get(schema.fields().get(i)); + DataType elementType = definition.getComponentTypes().get(i); + value.set(i, field, registry.codecFor(elementType, field)); + } + return value; + } + + @Override + protected Struct deserialize(TupleValue value) { throw new UnsupportedOperationException(); } + +} diff --git a/src/main/java/io/connect/scylladb/codec/StructUDTCodec.java b/src/main/java/io/connect/scylladb/codec/StructUDTCodec.java new file mode 100644 index 0000000..419890c --- /dev/null +++ b/src/main/java/io/connect/scylladb/codec/StructUDTCodec.java @@ -0,0 +1,59 @@ +package io.connect.scylladb.codec; + +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; +import com.datastax.driver.extras.codecs.MappingCodec; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.util.Set; +import java.util.stream.Collectors; + +public class StructUDTCodec extends MappingCodec { + + CodecRegistry registry; + UserType definition; + + public StructUDTCodec(CodecRegistry registry, UserType udt) { + super(registry.codecFor(udt), 
Struct.class); + this.registry = registry; + this.definition = udt; + } + + @Override + protected UDTValue serialize(Struct struct) { + if (struct == null) { + return null; + } + + int size = definition.getFieldNames().size(); + Schema schema = struct.schema(); + int structSize = schema.fields().size(); + Set structFieldNames = schema.fields().stream().map(Field::name).collect(Collectors.toSet()); + if (structSize != size) { + throw new IllegalArgumentException( + String.format("Expecting %d fields, got %d", size, structSize)); + } + + final UDTValue value = definition.newValue(); + definition.getFieldNames().stream().forEach(fieldName -> { + if (!structFieldNames.contains(fieldName)) { + throw new IllegalArgumentException( + String.format( + "Field %s in UDT %s not found in input struct", + fieldName, definition.getName())); + } + DataType fieldType = definition.getFieldType(fieldName); + value.set(fieldName, struct.get(fieldName), registry.codecFor(fieldType, struct.get(fieldName))); + } + ); + + return value; + } + + @Override + protected Struct deserialize(UDTValue value) { throw new UnsupportedOperationException(); } +} diff --git a/src/test/java/io/connect/scylladb/integration/ScyllaComplexTypesIT.java b/src/test/java/io/connect/scylladb/integration/ScyllaComplexTypesIT.java new file mode 100644 index 0000000..f128292 --- /dev/null +++ b/src/test/java/io/connect/scylladb/integration/ScyllaComplexTypesIT.java @@ -0,0 +1,352 @@ +package io.connect.scylladb.integration; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.connect.scylladb.ScyllaDbSinkConnector; +import io.connect.scylladb.ScyllaDbSinkConnectorConfig; +import io.connect.scylladb.ScyllaDbSinkTask; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; + +public class ScyllaComplexTypesIT { + private static final Logger log = LoggerFactory.getLogger(ScyllaNativeTypesIT.class); + + private static final boolean DROP_TEST_TABLES = true; // Set false to later inspect results manually. + private static String currentTestTable; + + static String SCYLLA_DB_CONTACT_POINT = "localhost"; // Default value, overwritten later by Properties. 
+ static final int SCYLLA_DB_PORT = 9042; + static final String SCYLLADB_KEYSPACE = "testkeyspace"; + private static final String SCYLLADB_OFFSET_TABLE = "kafka_connect_offsets"; + private ScyllaDbSinkConnector connector; + static final int MAX_CONNECTION_RETRIES = 10; + + static Cluster.Builder clusterBuilder() { + Cluster.Builder clusterBuilder = Cluster.builder() + .withPort(SCYLLA_DB_PORT) + .addContactPoints(SCYLLA_DB_CONTACT_POINT) + .withProtocolVersion(ProtocolVersion.NEWEST_SUPPORTED); + return clusterBuilder; + } + + private static final Map settings = new HashMap<>(ImmutableMap.builder() + .put(ScyllaDbSinkConnectorConfig.TABLE_MANAGE_ENABLED_CONFIG, "false") + .put(ScyllaDbSinkConnectorConfig.KEYSPACE_CONFIG, SCYLLADB_KEYSPACE) + .put(ScyllaDbSinkConnectorConfig.KEYSPACE_CREATE_ENABLED_CONFIG, "true") + .put(ScyllaDbSinkConnectorConfig.CONTACT_POINTS_CONFIG, SCYLLA_DB_CONTACT_POINT) + .put(ScyllaDbSinkConnectorConfig.PORT_CONFIG, String.valueOf(SCYLLA_DB_PORT)) + .put(ScyllaDbSinkConnectorConfig.KEYSPACE_REPLICATION_FACTOR_CONFIG, "1") + .build()); + + @BeforeAll + public static void setupKeyspace() throws InterruptedException { + Properties systemProperties = System.getProperties(); + SCYLLA_DB_CONTACT_POINT = systemProperties.getProperty("scylla.docker.hostname", "localhost"); + Cluster.Builder builder = clusterBuilder(); + int attempts = 0; + while (++attempts < MAX_CONNECTION_RETRIES) { + try (Cluster cluster = builder.build()) { + try (Session session = cluster.connect()) { + session.execute("SELECT cql_version FROM system.local"); + break; + } + } catch (NoHostAvailableException ex) { + if(attempts >= MAX_CONNECTION_RETRIES){ + throw ex; + } + else{ + log.debug("Exception thrown: ", ex); + log.debug("Retrying..."); + Thread.sleep(1000); + } + } + } + } + + ScyllaDbSinkTask task; + SinkTaskContext sinkTaskContext; + List validations; + + public void startConnector() { + this.task = new ScyllaDbSinkTask(); + this.sinkTaskContext = mock(SinkTaskContext.class); + this.task.initialize(this.sinkTaskContext); + this.validations = new ArrayList<>(); + + connector = new ScyllaDbSinkConnector(); + connector.start(settings); + } + public void startConnector(Map customSettings) { + this.task = new ScyllaDbSinkTask(); + this.sinkTaskContext = mock(SinkTaskContext.class); + this.task.initialize(this.sinkTaskContext); + this.validations = new ArrayList<>(); + + connector = new ScyllaDbSinkConnector(); + connector.start(customSettings); + } + + @AfterEach + public void stop() { + String query = "DROP TABLE" + " " + SCYLLADB_OFFSET_TABLE; + if (IsOffsetStorageTablePresent(SCYLLADB_OFFSET_TABLE)) { + execute(query); + } + if(DROP_TEST_TABLES) { + query = "DROP TABLE IF EXISTS" + " " + currentTestTable; + execute(query); + } + this.task.stop(); + this.connector.stop(); + } + + private void execute(String cql) { + try (Cluster cluster = clusterBuilder().build()) { + try (Session session = cluster.connect(SCYLLADB_KEYSPACE)) { + log.info("Executing: '" + cql + "'"); + session.execute(cql); + log.debug("Executed: '" + cql + "'"); + } + } + } + + private Boolean IsOffsetStorageTablePresent(String tableName) { + try (Cluster cluster = clusterBuilder().build()) { + KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(SCYLLADB_KEYSPACE); + TableMetadata table = ks.getTable(tableName); + return table != null; + } + } + + private void startTask(String topic, Map customSettings){ + final TopicPartition topicPartition = new TopicPartition(topic, 1); + currentTestTable = topic; + 
when(this.sinkTaskContext.assignment()).thenReturn(ImmutableSet.of(topicPartition)); + this.task.start(customSettings); + } + + private void startTask(String topic){ + startTask(topic, settings); + } + + @Test + public void insertMap(){ + final String name = "map"; + final String topic = name + "Test"; + final Map testSettings = new HashMap<>(settings); + testSettings.put(ScyllaDbSinkConnectorConfig.TABLE_MANAGE_ENABLED_CONFIG, "true"); + startConnector(testSettings); + startTask(topic, testSettings); + SinkRecord record; + + Map colValue1 = + ImmutableMap.builder().put("key1", 11).put("key2", 22).build(); + record = setupRecord(topic + "1", name, 1, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), colValue1); + task.put(ImmutableList.of(record)); + + Map colValue2 = + ImmutableMap.builder().put(11, 22L).put(33, 44L).build(); + record = setupRecord(topic + "2", name, 1, SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT64_SCHEMA).build(), colValue2); + task.put(ImmutableList.of(record)); + + Map colValue3 = + ImmutableMap.builder().put("key1", java.util.Date.from(Instant.EPOCH)).build(); + record = setupRecord(topic + "3", name, 1, SchemaBuilder.map(Schema.STRING_SCHEMA, Timestamp.SCHEMA).build(), colValue3); + task.put(ImmutableList.of(record)); + + checkCorrectness(3); + } + + @Test + public void insertStruct(){ + final String name = "struct"; + final String topic = name + "Test"; + + //Create user type in correct keyspace + execute("CREATE TYPE IF NOT EXISTS " + SCYLLADB_KEYSPACE + ".testudt4 (\n" + + " col1 int,\n" + + " col2 bigint,\n" + + " col3 text);"); + + createTestTable(topic, "col_" + name, "testudt4"); + startConnector(); + startTask(topic); + + Schema colSchema = SchemaBuilder.struct() + .name("testudt4") //Needs to match the name of existing User Type if the table is managed + // and created by connector. Otherwise ScyllaDbSchemaBuilder won't know what DataType to use. 
+ .field("col1", Schema.INT32_SCHEMA) + .field("col2", Schema.INT64_SCHEMA) + .field("col3", Schema.STRING_SCHEMA) + .build(); + + Struct colValue = new Struct(colSchema) + .put("col1", 123) + .put("col2", 456L) + .put("col3", "text"); + + SinkRecord record = setupRecord(topic, name, 1, colSchema, colValue); + task.put(ImmutableList.of(record)); + checkCorrectness(1); + } + + @Test + public void insertSet(){ + final String name = "set"; + final String topic = name + "Test"; + createTestTable(topic, "col_" + name, "set"); + startConnector(); + startTask(topic); + + Schema colSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); + List colValue = new ArrayList<>(ImmutableList.of("a", "b", "c", "a", "b")); + + SinkRecord record = setupRecord(topic, name, 1, colSchema, colValue); + task.put(ImmutableList.of(record)); + checkCorrectness(1); + } + + @Test + public void insertList(){ + final String name = "list"; + final String topic = name + "Test"; + createTestTable(topic, "col_" + name, "list"); + startConnector(); + startTask(topic); + + Schema colSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); + List colValue = new ArrayList<>(ImmutableList.of("a", "b", "c", "a", "b")); + + SinkRecord record = setupRecord(topic, name, 1, colSchema, colValue); + task.put(ImmutableList.of(record)); + checkCorrectness(1); + } + + @Test + public void insertTuple(){ + final String name = "tuple"; + final String topic = name + "Test"; + createTestTable(topic, "col_" + name, "tuple"); + startConnector(); + startTask(topic); + + Schema colSchema = SchemaBuilder.struct() + .name("structToTuple") + .field("col1", Schema.STRING_SCHEMA) + .field("col2", Schema.INT32_SCHEMA) + .build(); + + Struct colValue = new Struct(colSchema) + .put("col1", "text") + .put("col2", 123); + + SinkRecord record = setupRecord(topic, name, 1, colSchema, colValue); + task.put(ImmutableList.of(record)); + checkCorrectness(1); + } + @Test + public void insertListIntoTuple(){ + final String name = "tupleFromList"; + final String topic = name + "Test"; + createTestTable(topic, "col_tupleFromList", "tuple"); + + startConnector(); + startTask(topic); + + Schema colSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); + List colValue = new ArrayList<>(ImmutableList.of("a", "1")); + + SinkRecord record = setupRecord(topic, name, 1, colSchema, colValue); + task.put(ImmutableList.of(record)); + checkCorrectness(1); + } + + @Test + public void insertMapIntoUDT(){ + final String name = "mapIntoUDT"; + final String topic = name + "Test"; + SinkRecord record; + + execute("CREATE TYPE IF NOT EXISTS " + SCYLLADB_KEYSPACE + ".mapIntoUDTType (\n" + + "key1 int,\n" + + "key2 bigint);"); + + createTestTable(topic, "col_" + name, "mapIntoUDTType"); + + startConnector(); + startTask(topic); + + Map colValue1 = + ImmutableMap.builder().put("key1", 11).put("key2", 22).build(); + record = setupRecord(topic, name, 1, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), colValue1); + task.put(ImmutableList.of(record)); + + checkCorrectness(1); + } + + private SinkRecord setupRecord(String topic, String colNameSuffix, int id, Schema colSchema, Object colValue){ + Schema keySchema = SchemaBuilder.struct() + .name(topic) + .field("id", Schema.INT32_SCHEMA) + .build(); + + Schema valueSchema = SchemaBuilder.struct() + .name(topic) + .field("id", Schema.INT32_SCHEMA) + .field("col_" + colNameSuffix, colSchema) + .build(); + + Struct key = new Struct(keySchema) + .put("id", id); + + Struct value = new Struct(valueSchema) + 
.put("id", id) + .put("col_" + colNameSuffix, colValue); + + return new SinkRecord(topic, 0, keySchema, key, valueSchema, value, 1234L, 1234L, TimestampType.CREATE_TIME); + } + + private void createTestTable(String tableName, String colName, String colType){ + execute("CREATE TABLE IF NOT EXISTS " + SCYLLADB_KEYSPACE + "." + tableName + " (\n" + + "id int PRIMARY KEY,\n" + + colName + " " + colType +"\n" + + ");"); + } + + private void checkCorrectness(int numOfPuts){ + Boolean tableExists = IsOffsetStorageTablePresent(SCYLLADB_OFFSET_TABLE); + assertEquals(true, tableExists); + verify(this.sinkTaskContext, times(numOfPuts)).requestCommit(); + verify(this.sinkTaskContext, times(1)).assignment(); + } +}