[improve][io] KCA: flag to force optional primitive schemas (#19951)
Motivation
Kafka's schema has an "optional" flag that is used there to validate data and allow nulls.
Pulsar's schema does not carry such information, which makes conversion to a Kafka schema lossy.

Modifications
Added a config parameter that lets one force primitive schemas into optional ones.
The KeyValue schema is always treated as optional.

Default is false, to match existing behavior.
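
As background on the motivation: Kafka Connect's schema validation rejects nulls for required schemas and accepts them for optional ones, which is exactly the information a Pulsar schema cannot express. A minimal sketch against the standard org.apache.kafka.connect.data API (the demo class itself is illustrative):

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;

public class OptionalSchemaDemo {
    public static void main(String[] args) {
        // A required (non-optional) schema rejects null during validation.
        try {
            ConnectSchema.validateValue(Schema.STRING_SCHEMA, null);
        } catch (DataException e) {
            System.out.println("STRING_SCHEMA rejected null: " + e.getMessage());
        }
        // The optional counterpart accepts null without throwing.
        ConnectSchema.validateValue(Schema.OPTIONAL_STRING_SCHEMA, null);
        System.out.println("OPTIONAL_STRING_SCHEMA accepted null");
    }
}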

(cherry picked from commit 55523ac)
dlg99 committed Mar 29, 2023
1 parent b2f734f commit b86bda3
Showing 5 changed files with 203 additions and 64 deletions.
@@ -91,6 +91,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
private PulsarKafkaConnectSinkConfig kafkaSinkConfig;

protected String topicName;
protected boolean useOptionalPrimitives;

private boolean sanitizeTopicName = false;
// This is a workaround for https://github.com/apache/pulsar/issues/19922
@@ -165,6 +166,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
useOptionalPrimitives = kafkaSinkConfig.isUseOptionalPrimitives();

useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -447,8 +449,11 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
&& sourceRecord.getSchema().getSchemaInfo() != null
&& sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
// Assume Key_Value schema's key and value are always optional
keySchema = PulsarSchemaToKafkaSchema
.getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives);
valueSchema = PulsarSchemaToKafkaSchema
.getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives);

Object nativeObject = sourceRecord.getValue().getNativeObject();

@@ -465,12 +470,13 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
keySchema = Schema.BYTES_SCHEMA;
keySchema = useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
} else {
key = sourceRecord.getKey().orElse(null);
keySchema = Schema.STRING_SCHEMA;
keySchema = useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
valueSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(sourceRecord.getSchema(), useOptionalPrimitives);
value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
}

@@ -100,6 +100,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
private boolean collapsePartitionedTopics = false;

@FieldDoc(
defaultValue = "false",
help = "Pulsar schema does not contain information whether the Schema is optional, Kafka's does. \n"
+ "This provides a way to force all primitive schemas to be optional for Kafka. \n")
private boolean useOptionalPrimitives = false;

public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
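
For context, the new flag is picked up from the same YAML-backed sink config as the existing options; a minimal usage sketch, with a hypothetical config file path:

// Sketch: load the sink config and read the new flag (the path is hypothetical).
PulsarKafkaConnectSinkConfig cfg =
        PulsarKafkaConnectSinkConfig.load("/path/to/pulsar-kafka-connect-sink.yaml");
boolean useOptionalPrimitives = cfg.isUseOptionalPrimitives(); // false unless set in the YAML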
@@ -116,11 +116,15 @@ public Schema schema() {
}

private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToOptionalKafkaSchema;
private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
private static final Cache<byte[], Schema> schemaCache =
CacheBuilder.newBuilder().maximumSize(10000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
private static final Cache<Schema, Schema> optionalSchemaCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();

static {
pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
@@ -135,6 +139,17 @@ public Schema schema() {
.put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
.put(SchemaType.DATE, Date.SCHEMA)
.build();
pulsarSchemaTypeToOptionalKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
.put(SchemaType.BOOLEAN, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.put(SchemaType.INT8, Schema.OPTIONAL_INT8_SCHEMA)
.put(SchemaType.INT16, Schema.OPTIONAL_INT16_SCHEMA)
.put(SchemaType.INT32, Schema.OPTIONAL_INT32_SCHEMA)
.put(SchemaType.INT64, Schema.OPTIONAL_INT64_SCHEMA)
.put(SchemaType.FLOAT, Schema.OPTIONAL_FLOAT32_SCHEMA)
.put(SchemaType.DOUBLE, Schema.OPTIONAL_FLOAT64_SCHEMA)
.put(SchemaType.STRING, Schema.OPTIONAL_STRING_SCHEMA)
.put(SchemaType.BYTES, Schema.OPTIONAL_BYTES_SCHEMA)
.build();
kafkaLogicalSchemas = ImmutableSet.<String>builder()
.add(Timestamp.LOGICAL_NAME)
.add(Date.LOGICAL_NAME)
@@ -155,12 +170,33 @@ private static org.apache.pulsar.kafka.shade.avro.Schema parseAvroSchema(String
return parser.parse(schemaJson);
}

public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
Schema s = getKafkaConnectSchema(pulsarSchema);
return new OptionalForcingSchema(s);
public static Schema makeOptional(Schema s) {
if (s == null || s.isOptional()) {
return s;
}

String logicalSchemaName = s.name();
if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
return s;
}

try {
return optionalSchemaCache.get(s, () -> new OptionalForcingSchema(s));
} catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
String msg = "Failed to create optional schema for " + s;
log.error(msg);
throw new IllegalStateException(msg, ee);
}
}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
boolean useOptionalPrimitives) {
return makeOptional(getKafkaConnectSchema(pulsarSchema, useOptionalPrimitives));

}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
boolean useOptionalPrimitives) {
if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
}
@@ -193,6 +229,11 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
}

if (useOptionalPrimitives
&& pulsarSchemaTypeToOptionalKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
return pulsarSchemaTypeToOptionalKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}

if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}
@@ -201,8 +242,10 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
return SchemaBuilder.map(
makeOptional(getKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives)),
makeOptional(getKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives)))
.optional()
.build();
}
org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
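
For context, a short sketch of the updated entry point, assuming the signatures introduced above; with the flag enabled, a primitive Pulsar schema maps to the optional Kafka Connect primitive:

// Sketch based on the signatures above; Schema.STRING is Pulsar's built-in string schema.
org.apache.kafka.connect.data.Schema required =
        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(org.apache.pulsar.client.api.Schema.STRING, false);
org.apache.kafka.connect.data.Schema optional =
        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(org.apache.pulsar.client.api.Schema.STRING, true);
// required.isOptional() == false; optional.isOptional() == true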
@@ -162,7 +162,7 @@ public T answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}

private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
final private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";

private Path file;
private Map<String, Object> props;
@@ -797,7 +799,9 @@ public void kafkaLogicalTypesTimestampTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("12/30/1999 11:12:13");
Object connectData = KafkaConnectData
@@ -815,7 +817,9 @@ public void kafkaLogicalTypesTimeTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("01/01/1970 11:12:13");
Object connectData = KafkaConnectData
@@ -833,7 +837,9 @@ public void kafkaLogicalTypesDateTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("12/31/2022 00:00:00");
Object connectData = KafkaConnectData
@@ -854,7 +860,9 @@ public void kafkaLogicalTypesDecimalTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

Object connectData = KafkaConnectData
.getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema);
@@ -874,11 +882,11 @@ public void connectDataComplexAvroSchemaGenericRecordTest() {
getGenericRecord(value, pulsarAvroSchema));

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
.getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema), false);

Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);

org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
Assert.assertTrue(kafkaSchema.isOptional());
Assert.assertTrue(kafkaSchema.keySchema().isOptional());
Assert.assertTrue(kafkaSchema.valueSchema().isOptional());
}

@Test
@@ -990,7 +998,8 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchem
Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(pulsarAvroSchema);
.getKafkaConnectSchema(pulsarAvroSchema, false);
Assert.assertFalse(kafkaSchema.isOptional());

Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);

Expand All @@ -999,6 +1008,18 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchem
Object jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);

kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(pulsarAvroSchema, true);
Assert.assertFalse(kafkaSchema.isOptional());

connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);

org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);

jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
}

private JsonNode pojoAsJsonNode(Object pojo) {