While writing a sink connector with confluentinc-kafka-connect-s3-10.5.17, I get the following stack trace when toConnectData() is called with an Avro schema that contains a logical decimal field corresponding to a Java BigDecimal field:
[Worker-055a71def9708e242] [2024-11-29 16:01:15,823] ERROR [events-archive-s3-sink-connector-1-0-0|task-1] WorkerSinkTask{id=events-archive-s3-sink-connector-1-0-0-1} Error converting message value in topic 'staging.pixel.event.online' partition 6 at offset 158975 and timestamp 1732896075632: Invalid class for bytes type, expecting byte[] or ByteBuffer but found class java.math.BigDecimal (org.apache.kafka.connect.runtime.WorkerSinkTask:548)
2024-11-29T16:01:15.000Z
[Worker-055a71def9708e242] org.apache.kafka.connect.errors.DataException: Invalid class for bytes type, expecting byte[] or ByteBuffer but found class java.math.BigDecimal
2024-11-29T16:01:15.000Z
[Worker-055a71def9708e242] at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1394)
2024-11-29T16:01:15.000Z
I added a test for BigDecimal that reproduces the problem and made a fix which you can review. Let me know if I should submit a PR.
Code that fixes the problem (AvroData.java, line 1385). I added the "} else if (value instanceof BigDecimal) {" branch:
case BYTES:
  if (value instanceof byte[]) {
    converted = ByteBuffer.wrap((byte[]) value);
  } else if (value instanceof ByteBuffer) {
    converted = value;
  } else if (value instanceof GenericFixed) {
    converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
  } else if (value instanceof BigDecimal) {
    // Avro's ByteBuffer encoding of decimal is a byte array of the unscaled value
    // of the BigDecimal, two's-complement big-endian, in an Avro fixed of the
    // appropriate size.
    BigDecimal decimal = (BigDecimal) value;
    int scale = schema.parameters() != null
        ? Integer.parseInt(schema.parameters().get(Decimal.SCALE_FIELD))
        : 0;
    int precision = schema.parameters() != null
        ? Integer.parseInt(schema.parameters().get(CONNECT_AVRO_DECIMAL_PRECISION_PROP))
        : CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT;
    byte[] unscaledValue = decimal.unscaledValue().toByteArray();
    byte[] fixedValue = new byte[precision];
    int offset = fixedValue.length - unscaledValue.length;
    if (decimal.signum() < 0) {
      // If negative, fill the unscaled value with 0xFF
      Arrays.fill(fixedValue, 0, offset, (byte) 0xFF);
    }
    System.arraycopy(unscaledValue, 0, fixedValue, offset, unscaledValue.length);
    converted = ByteBuffer.wrap(fixedValue);
  } else {
    throw new DataException("Invalid class for bytes type, expecting byte[] or ByteBuffer "
        + "but found " + value.getClass());
  }
  break;
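To make the byte handling in the new branch easier to review, here is a minimal standalone sketch of the same idea: take the two's-complement unscaled value of a BigDecimal, sign-extend it to a fixed width, and check that it decodes back to the original value. The class name DecimalBytesSketch, its method names, and the width parameter are hypothetical and not part of AvroData. The sketch also adds a length guard that the snippet above does not have. Note that in the Avro decimal logical type, precision counts decimal digits rather than bytes, so the sketch treats the byte width as a separate input.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of AvroData.
final class DecimalBytesSketch {

    // Sign-extends the big-endian two's-complement unscaled value to `width` bytes.
    static byte[] toFixedWidth(BigDecimal decimal, int width) {
        byte[] unscaled = decimal.unscaledValue().toByteArray();
        if (unscaled.length > width) {
            // Without this guard the offset below would be negative.
            throw new IllegalArgumentException(
                "unscaled value needs " + unscaled.length + " bytes, width is " + width);
        }
        byte[] padded = new byte[width];
        int offset = width - unscaled.length;
        if (decimal.signum() < 0) {
            // Negative values are padded with 0xFF so the sign is preserved.
            Arrays.fill(padded, 0, offset, (byte) 0xFF);
        }
        System.arraycopy(unscaled, 0, padded, offset, unscaled.length);
        return padded;
    }

    // Decodes big-endian two's-complement bytes back into a BigDecimal.
    static BigDecimal fromBytes(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        BigDecimal original = new BigDecimal("-1.23");
        byte[] minimal = original.unscaledValue().toByteArray();
        byte[] padded = toFixedWidth(original, 4);
        // Both encodings should decode to the original value.
        System.out.println(fromBytes(minimal, original.scale()).equals(original));
        System.out.println(fromBytes(padded, original.scale()).equals(original));
    }
}
```

Because sign extension does not change the decoded value, the minimal array from unscaledValue().toByteArray() and the padded array represent the same decimal. Padding matters only when a consumer expects a fixed-size field.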
A test that fails with the master version of AvroData.java and passes with the above code added:
@Test
public void testToConnectBigDecimalAvro() {
  org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder().bytesType();
  avroSchema.addProp(AvroData.AVRO_LOGICAL_TYPE_PROP, AvroData.AVRO_LOGICAL_DECIMAL);
  avroSchema.addProp("precision", 50);
  avroSchema.addProp("scale", 2);
  final SchemaAndValue expected = new SchemaAndValue(
      Decimal.builder(2).parameter(AvroData.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "50").build(),
      TEST_DECIMAL
  );
  final SchemaAndValue actual = avroData.toConnectData(avroSchema, TEST_DECIMAL);
  assertThat("schema.parameters() does not match.",
      actual.schema().parameters(),
      IsEqual.equalTo(expected.schema().parameters())
  );
  assertEquals("schema does not match.", expected.schema(), actual.schema());
  assertEquals("value does not match.", expected.value(), actual.value());
}
I am using
Thank you,
Ruairi