Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro Logical Decimal to Java BigDecimal conversion error #361

Open
ruairim opened this issue Dec 16, 2024 · 0 comments
Open

Avro Logical Decimal to Java BigDecimal conversion error #361

ruairim opened this issue Dec 16, 2024 · 0 comments

Comments

@ruairim
Copy link

ruairim commented Dec 16, 2024

When writing a sink-connector with confluentinc-kafka-connect-s3-10.5.17 I get the following stack trace when I call toConnectData() with an avro schema that contains a logical decimal field corresponding to a java BigDecimal field

[Worker-055a71def9708e242] [2024-11-29 16:01:15,823] ERROR [events-archive-s3-sink-connector-1-0-0|task-1] WorkerSinkTask{id=events-archive-s3-sink-connector-1-0-0-1} Error converting message value in topic 'staging.pixel.event.online' partition 6 at offset 158975 and timestamp 1732896075632: Invalid class for bytes type, expecting byte[] or ByteBuffer but found class java.math.BigDecimal (org.apache.kafka.connect.runtime.WorkerSinkTask:548)
2024-11-29T16:01:15.000Z
[Worker-055a71def9708e242] org.apache.kafka.connect.errors.DataException: Invalid class for bytes type, expecting byte[] or ByteBuffer but found class java.math.BigDecimal
2024-11-29T16:01:15.000Z
[Worker-055a71def9708e242] at com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData.toConnectData(AvroData.java:1394)
2024-11-29T16:01:15.000Z

I added a test for BigDecimal that reproduces the problem and made a fix which you can review. Let me know if I should submit a PR.

code that fixes the problem (AvroData.java line 1385), I added the "} else if (value instanceof BigDecimal) {"

                case BYTES:
                    if (value instanceof byte[]) {
                        converted = ByteBuffer.wrap((byte[]) value);
                    } else if (value instanceof ByteBuffer) {
                        converted = value;
                    } else if (value instanceof GenericFixed) {
                        converted = ByteBuffer.wrap(((GenericFixed) value).bytes());
                    } else if (value instanceof BigDecimal) {
                        // Avro's ByteBuffer encoding of decimal is a byte array of the unscaled value
                        // of the BigDecimal, two's-complement big-endian, in an Avro fixed of the
                        // appropriate size.
                        BigDecimal decimal = (BigDecimal) value;
                        int scale = schema.parameters() != null
                                ? Integer.parseInt(schema.parameters().get(Decimal.SCALE_FIELD))
                                : 0;
                        int precision = schema.parameters() != null
                                ? Integer.parseInt(schema.parameters().get(CONNECT_AVRO_DECIMAL_PRECISION_PROP))
                                : CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT;
                        byte[] unscaledValue = decimal.unscaledValue().toByteArray();
                        byte[] fixedValue = new byte[precision];
                        int offset = fixedValue.length - unscaledValue.length;
                        if (decimal.signum() < 0) {
                            // If negative, fill the unscaled value with 0xFF
                            Arrays.fill(fixedValue, 0, offset, (byte) 0xFF);
                        }
                        System.arraycopy(unscaledValue, 0, fixedValue, offset, unscaledValue.length);
                        converted = ByteBuffer.wrap(fixedValue);

                    } else {
                        throw new DataException("Invalid class for bytes type, expecting byte[] or ByteBuffer "
                                + "but found " + value.getClass());
                    }
                    break;

A Test which fails with the master version of AvroData.java, passes with the above code added

    @Test
    public void testToConnectBigDecimalAvro() {
        org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder().bytesType();
        avroSchema.addProp(AvroData.AVRO_LOGICAL_TYPE_PROP, AvroData.AVRO_LOGICAL_DECIMAL);
        avroSchema.addProp("precision", 50);
        avroSchema.addProp("scale", 2);

        final SchemaAndValue expected = new SchemaAndValue(
            Decimal.builder(2).parameter(AvroData.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "50").build(),
            TEST_DECIMAL
        );

        final SchemaAndValue actual = avroData.toConnectData(avroSchema, TEST_DECIMAL);
        assertThat("schema.parameters() does not match.",
                   actual.schema().parameters(),
                   IsEqual.equalTo(expected.schema().parameters())
        );
        assertEquals("schema does not match.", expected.schema(), actual.schema());
        assertEquals("value does not match.", expected.value(), actual.value());
    }


I am using

    <dependency>
      <groupId>software.amazon.glue</groupId>
      <artifactId>schema-registry-kafkaconnect-converter</artifactId>
      <version>1.1.22</version>
    </dependency>
    <dependency>
      <groupId>software.amazon.glue</groupId>
      <artifactId>schema-registry-parent</artifactId>
      <version>1.1.22</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>software.amazon.glue</groupId>
      <artifactId>schema-registry-serde</artifactId>
      <version>1.1.22</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.12.0</version>
    </dependency>

Thank you,
Ruairi

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant