Skip to content

Commit

Permalink
Upgrade avro version (#5577)
Browse files Browse the repository at this point in the history
* Upgrade avro version

* Fix tests
  • Loading branch information
carlesarnal authored Dec 2, 2024
1 parent 61f69db commit 38f0949
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.apicurio.registry.support.HealthUtils;
import io.apicurio.registry.support.TestCmmn;
import io.apicurio.registry.types.RuleType;
import io.apicurio.registry.utils.tests.BaseHttpUtils;
import io.apicurio.registry.utils.tests.TestUtils;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
Expand Down Expand Up @@ -52,16 +53,20 @@
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.response.Response;
import jakarta.enterprise.inject.Typed;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -533,8 +538,8 @@ public void testRegisterInvalidSchemaBadType() throws Exception {

try {
new org.apache.avro.Schema.Parser().parse(badSchemaString);
fail("Parsing invalid schema string should fail with SchemaParseException");
} catch (SchemaParseException ignored) {
fail("Parsing invalid schema string should fail with AvroTypeException");
} catch (AvroTypeException ignored) {
}

try {
Expand Down Expand Up @@ -1530,6 +1535,41 @@ public void testSubjectCompatibilityAfterDeletingSubject() throws Exception {
assertEquals(ErrorCode.SUBJECT_COMPATIBILITY_NOT_CONFIGURED.value(), rce.getErrorCode(), "Compatibility Level doesn't exist");
}
assertEquals(FULL.name, confluentClient.getConfig(null).getCompatibilityLevel(), "Top Compatibility Level Exists");
}

@Test
void compatibilityGlobalRules() throws Exception {
var first = "{\"type\":\"record\",\"name\":\"myrecord1\",\"fields\":[{\"name\":\"foo\",\"type\":\"string\"}]}";
// Adding a default value to the new field to keep full compatibility
var second = "{\"type\":\"record\",\"name\":\"myrecord1\",\"fields\":[{\"name\":\"foo\",\"type\":\"string\"}, {\"name\":\"bar\",\"type\":\"string\", \"default\": \"42\"}]}";
var invalid = "{\"type\": \"bloop\"}";

confluentClient.updateCompatibility("FULL", null);

String schemeSubject = TestUtils.generateArtifactId();
confluentClient.registerSchema(first, schemeSubject);

confluentClient.registerSchema(second, schemeSubject);

testCompatibility(wrap(invalid), schemeSubject, 422);

confluentClient.deleteSubject(Collections.emptyMap(), schemeSubject);
confluentClient.deleteSubject(Collections.emptyMap(), schemeSubject, true);
}

private static String wrap(String schema) {
return "{\"schema\": \"" + schema.replace("\"", "\\\"") + "\"}";
}

public Response testCompatibility(String body, String schemaName, int returnCode) {
try {
URL url = new URL("http://localhost:" + testPort + "/apis/ccompat/v7/compatibility/subjects/" + schemaName + "/versions/latest");
return BaseHttpUtils.rulesPostRequest(SR, body, url, returnCode);
} catch (MalformedURLException e) {
throw new UncheckedIOException(e);
}
}

public static final String SR = "application/vnd.schemaregistry.v1+json";

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,19 @@ record = deserializer.deserialize(topic, bytes);

@Test
public void testAvro() throws Exception {
testAvroAutoRegisterIdInBody(RecordIdStrategy.class, () -> restClient.getArtifactMetaData("test-group-avro", "myrecord3"));
testAvroAutoRegisterIdInBody(RecordIdStrategy.class, () -> restClient.getArtifactMetaData("test_group_avro", "myrecord3"));

}

@Test
public void testAvroQualifiedRecordIdStrategy() throws Exception {
testAvroAutoRegisterIdInBody(QualifiedRecordIdStrategy.class, () -> restClient.getArtifactMetaData(null, "test-group-avro.myrecord3"));
testAvroAutoRegisterIdInBody(QualifiedRecordIdStrategy.class, () -> restClient.getArtifactMetaData(null, "test_group_avro.myrecord3"));
}

private void testAvroAutoRegisterIdInBody(Class<? extends ArtifactReferenceResolverStrategy<?, ?>> strategy, Supplier<ArtifactMetaData> artifactFinder)
throws Exception {
Schema schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"myrecord3\",\"namespace\":\"test-group-avro\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
"{\"type\":\"record\",\"name\":\"myrecord3\",\"namespace\":\"test_group_avro\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}");
try (AvroKafkaSerializer<GenericData.Record> serializer = new AvroKafkaSerializer<GenericData.Record>(restClient);
Deserializer<GenericData.Record> deserializer = new AvroKafkaDeserializer<>(restClient)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void cleanArtifacts() throws Exception {

@Test
public void testConfiguration() throws Exception {
String groupId = TestUtils.generateGroupId();
String groupId = TestUtils.generateSubject();
String topic = TestUtils.generateArtifactId();
String recordName = "myrecord4";
AvroGenericRecordSchemaFactory schemaFactory = new AvroGenericRecordSchemaFactory(groupId, recordName, List.of("bar"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ void evolveSchemaTest(boolean reuseClients) throws Exception {
String topicName = TestUtils.generateTopic();
kafkaCluster.createTopic(topicName, 1, 1);

String recordNamespace = TestUtils.generateGroupId();
String recordNamespace = TestUtils.generateSubject();
String recordName = TestUtils.generateSubject();
String schemaKey = "key1";
AvroGenericRecordSchemaFactory avroSchema = new AvroGenericRecordSchemaFactory(recordNamespace, recordName, List.of(schemaKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ void createAndUpdateArtifact() throws Exception {
@Test
void createAndDeleteMultipleArtifacts() throws Exception {
LOGGER.info("Creating some artifacts...");
String groupId = TestUtils.generateGroupId();
String groupId = TestUtils.generateSubject();

List<ArtifactMetaData> artifacts = IntStream.range(0, 10)
.mapToObj(i -> {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
<jandex.version>3.1.8</jandex.version>

<!-- Schema types -->
<avro.version>1.11.4</avro.version>
<avro.version>1.12.0</avro.version>
<json-schema-validator.version>1.5.2</json-schema-validator.version>
<vertx-json-schema.version>4.5.10</vertx-json-schema.version>
<wire-schema.version>4.9.9</wire-schema.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ protected Set<Incompatibility> isBackwardsCompatibleWith(String existing, String
}
} catch (AvroRuntimeException ex) {
throw new UnprocessableSchemaException("Could not execute compatibility rule on invalid Avro schema", ex);
} catch (NullPointerException ex) {
//I hate this, but Avro is throwing this exception for invalid schemas...
throw new UnprocessableSchemaException("Could not execute compatibility rule on invalid Avro schema", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void validateReferences(ContentHandle artifactContent, List<ArtifactRefer
} catch (Exception e) {
// This is terrible, but I don't know how else to detect if the reason for the parse failure
// is because of a missing defined type or some OTHER parse exception.
if (e.getMessage().contains("is not a defined name")) {
if (e.getMessage().contains("Undefined schema")) {
RuleViolation violation = new RuleViolation("Missing reference detected.", e.getMessage());
throw new RuleViolationException("Missing reference detected in Avro artifact.", RuleType.INTEGRITY,
IntegrityLevel.ALL_REFS_MAPPED.name(), Collections.singleton(violation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import io.apicurio.registry.resolver.ParsedSchema;
import io.apicurio.registry.utils.IoUtil;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.common.errors.SerializationException;
Expand Down Expand Up @@ -67,7 +67,7 @@ public static Schema parse(String schema, List<ParsedSchema<Schema>> references)
try {
final Schema.Parser parser = new Schema.Parser();
return parser.parse(schema);
} catch (SchemaParseException e) {
} catch (AvroTypeException e) {
//If we fail to parse the content from the main schema, then parse first the references and then the main schema
final Schema.Parser parser = new Schema.Parser();
handleReferences(parser, references);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.apicurio.registry.content.ContentHandle;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.v2.beans.ArtifactReference;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -110,7 +111,7 @@ private ParsedDirectoryWrapper<Schema> parseDirectory(File directory, File rootS
processed.put(schema.getFullName(), schema);
schemaContents.put(schema.getFullName(), schemaContent);
fileParsed = true;
} catch (SchemaParseException ex) {
} catch (SchemaParseException | AvroTypeException ex) {
log.warn("Error processing Avro schema with name {}. This usually means that the references are not ready yet to parse it", typeToAdd.getName());
}
}
Expand Down
4 changes: 4 additions & 0 deletions utils/tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-common</artifactId>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
</dependency>

</dependencies>
<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.apicurio.registry.utils.tests;

import io.restassured.response.Response;

import java.net.URL;

import static io.restassured.RestAssured.given;

public class BaseHttpUtils {

public static Response getRequest(String contentType, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.get(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response getRequest(String contentType, URL endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.get(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response postRequest(String contentType, String body, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(body)
.post(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response postRequest(String contentType, String body, URL endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(body)
.post(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response putRequest(String contentType, String body, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(body)
.put(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response putRequest(String contentType, String body, URL endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(body)
.put(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response deleteRequest(String contentType, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.delete(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response rulesPostRequest(String contentType, String rule, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(rule)
.post(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response rulesPostRequest(String contentType, String rule, URL endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(rule)
.post(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response rulesGetRequest(String contentType, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.get(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response rulesPutRequest(String contentType, String rule, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.body(rule)
.put(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response rulesDeleteRequest(String contentType, String endpoint, int returnCode) {
return given()
.when()
.contentType(contentType)
.delete(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

public static Response artifactPostRequest(String artifactId, String contentType, String body, String endpoint, int returnCode) {
return given()
.when()
.header("X-Registry-Artifactid", artifactId)
.contentType(contentType)
.body(body)
.post(endpoint)
.then()
.statusCode(returnCode)
.extract()
.response();
}

}

0 comments on commit 38f0949

Please sign in to comment.