From 27f2aa71c61d2508b002c9ff414fc4cb7ee2d1e8 Mon Sep 17 00:00:00 2001 From: dhergonsngular <107672198+dhergonsngular@users.noreply.github.com> Date: Tue, 28 Mar 2023 15:04:04 +0200 Subject: [PATCH] 332 comsngularkloadgenserializerprotobufserializer nullpointerexception (#355) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * #332 Add Files Modified To Solve Exception Added a trim statement to avoid problem when cast the schemas from classpath * #332 Add get dependencies and build proto. * #332 Add Test And Documentation Removed some unused test files * #332 Add Topic Name Strategy * #332 Change image from kafka manager to newest version Apparently the original project was renamed into cmak to avoid break the TOS of kafka having the kafka in the name of unofficial tool * Get the schema name from the schema references of imported classes * #332 Removed generator set up by TopicNameStrategy * #332 refactor * #332 updated pom version * #332 fixes PR * update pom --------- Co-authored-by: Adrian Lagartera Co-authored-by: Jose Enrique García Maciñeiras Co-authored-by: Alfredo González Fernández --- docs/schemas.md | 20 ++++ pom-maven-central.xml | 13 ++- pom.xml | 13 ++- .../docker-compose-noauth.yml | 4 +- .../impl/ProtobufObjectCreatorFactory.java | 4 +- .../processor/util/SchemaProcessorUtils.java | 56 +++++++-- .../sngular/kloadgen/sampler/SamplerUtil.java | 36 ++---- .../impl/ProtobufSRLoadGeneratorTest.java | 89 +++++++++++++++ .../mappings/schema_registry_stub.json | 108 +++++++++++++++++- 9 files changed, 298 insertions(+), 45 deletions(-) create mode 100644 src/test/java/com/sngular/kloadgen/loadgen/impl/ProtobufSRLoadGeneratorTest.java diff --git a/docs/schemas.md b/docs/schemas.md index 63e2c4bb..57ee747b 100644 --- a/docs/schemas.md +++ b/docs/schemas.md @@ -176,3 +176,23 @@ The same applies when there is more than one level. The last child in the last l The other exception applies to JSON Schema: in order to support `null`values, the collections within objects (type : object) cannot be null. Therefore, they will be **required by default** and they will be **initialized as an empty collection** if the object than contains them is not null. > Within JSON Schema, a maximum of **2 nesting levels** is allowed. + + +### Protobuf Schema - Schema Registry + +If you need use the Protobuf Schema with the Schema Registry, you must put the subject with the same name as the protobuf file, as the following example: + +``` +{ + "schema": "...", + "schemaType": "PROTOBUF", + "references": [ + { + "name": "[the_name_you_want].proto", + "subject": "[the_name_you_want]", + "version": ... + }] +} + +``` +> This example is based on a petition from Schema Registry diff --git a/pom-maven-central.xml b/pom-maven-central.xml index d2522eda..f919e8ec 100644 --- a/pom-maven-central.xml +++ b/pom-maven-central.xml @@ -7,7 +7,7 @@ kloadgen - 5.4.1 + 5.4.2 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial @@ -235,6 +235,17 @@ Europe/Madrid + + alfredo9f + Alfredo González + alfredo.gonzalez@sngular.com + Sngular + https://www.sngular.com + + Senior Backend Developer + + Europe/Madrid + diff --git a/pom.xml b/pom.xml index fb4e6046..5f54018f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ kloadgen - 5.4.1 + 5.4.2 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial @@ -235,6 +235,17 @@ Europe/Madrid + + alfredo9f + Alfredo González + alfredo.gonzalez@sngular.com + Sngular + https://www.sngular.com + + Senior Backend Developer + + Europe/Madrid + diff --git a/schema_registry_docker/docker-compose-noauth.yml b/schema_registry_docker/docker-compose-noauth.yml index 264ee6b5..d8976b99 100644 --- a/schema_registry_docker/docker-compose-noauth.yml +++ b/schema_registry_docker/docker-compose-noauth.yml @@ -37,11 +37,11 @@ services: environment: AUTH_ENABLED: false kafka-manager: - image: kafkamanager/kafka-manager + image: iunera/cmak depends_on: - kafka - zookeeper ports: - 9000:9000 environment: - ZK_HOSTS: http://zookeper:2181 + ZK_HOSTS: zookeeper:2181 diff --git a/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java b/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java index b652ac98..cf7b053f 100644 --- a/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java +++ b/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java @@ -41,9 +41,9 @@ public class ProtobufObjectCreatorFactory implements ObjectCreatorFactory { public ProtobufObjectCreatorFactory(final Object schema, final Object metadata) throws DescriptorValidationException, IOException { if (schema instanceof ParsedSchema) { - this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema()); + this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema(), metadata); } else if (schema instanceof ProtoFileElement) { - this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema); + this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema, metadata); } else { throw new KLoadGenException("Unsupported schema type"); } diff --git a/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java b/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java index 8b04a22d..cc183f86 100644 --- a/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java +++ b/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java @@ -25,21 +25,28 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.sngular.kloadgen.model.FieldValueMapping; +import com.sngular.kloadgen.util.JMeterHelper; import com.sngular.kloadgen.util.ProtobufHelper; import com.squareup.wire.schema.internal.parser.EnumElement; import com.squareup.wire.schema.internal.parser.FieldElement; import com.squareup.wire.schema.internal.parser.MessageElement; import com.squareup.wire.schema.internal.parser.ProtoFileElement; import com.squareup.wire.schema.internal.parser.TypeElement; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.jmeter.threads.JMeterContextService; public class SchemaProcessorUtils { private static final String OPTIONAL = "optional"; + private static final String TYPE_MAP_NUMBER = "typemapnumber"; + private SchemaProcessorUtils() { } @@ -142,11 +149,11 @@ public static String[] splitAndNormalizeFullFieldName(final String fullFieldName return Arrays.stream(fields).map(field -> field.replaceAll("\\[.*]", "")).toArray(String[]::new); } - public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema) throws Descriptors.DescriptorValidationException, IOException { + public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema, final Object metadata) throws Descriptors.DescriptorValidationException, IOException { final DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder(); final List imports = schema.getImports(); - for (String importedClass : imports) { + for (final String importedClass : imports) { try (final InputStream resourceStream = SchemaProcessorUtils.class.getClassLoader().getResourceAsStream(importedClass)) { if (null != resourceStream) { final String schemaToString = new String(resourceStream.readAllBytes()); @@ -156,6 +163,13 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schemaBuilder.addDependency(importedSchema.getFileDescriptorSet().getFile(0).getName()); schemaBuilder.addSchema(importedSchema); } + } else { + final var importedProtobufSchema = (ProtobufSchema) JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata), + JMeterContextService.getContext().getProperties()); + if (!ProtobufHelper.NOT_ACCEPTED_IMPORTS.contains(importedClass)) { + schemaBuilder.addDependency(importedProtobufSchema.toDescriptor().getFullName()); + schemaBuilder.addSchema(convertDynamicSchema(importedProtobufSchema)); + } } } } @@ -173,6 +187,24 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement return schemaBuilder.build().getMessageDescriptor(messageElement.getName()); } + private static String getSubjectName(final String importedClass, final Object metadata) { + final List references = ((SchemaMetadata) metadata).getReferences(); + String subjectName = null; + + for (final SchemaReference schemaReference : references) { + if (schemaReference.getName().equals(importedClass)) { + subjectName = schemaReference.getSubject(); + break; + } + } + + return Objects.requireNonNullElse(subjectName, importedClass); + } + + private static DynamicSchema convertDynamicSchema(final ProtobufSchema importSchema) throws DescriptorValidationException { + return processImported(Arrays.asList(importSchema.rawSchema().toSchema().split("\\n"))); + } + private static Predicate isValid() { return line -> !line.contains("//") && !line.isEmpty(); } @@ -184,7 +216,7 @@ private static DynamicSchema processImported(final List importedLines) t String packageName = ""; final var linesIterator = importedLines.listIterator(); while (linesIterator.hasNext()) { - final var fileLine = linesIterator.next(); + final var fileLine = linesIterator.next().trim(); if (fileLine.startsWith("package")) { packageName = StringUtils.chop(fileLine.substring(7).trim()); @@ -211,9 +243,11 @@ private static MessageDefinition buildMessage(final String messageName, final Li while (messageLines.hasNext() && !exit) { final var field = messageLines.next().trim().split("\\s"); if (ProtobufHelper.isValidType(field[0])) { - messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfChoppable(field[3]))); + messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfGreppable(field[3]))); } else if (ProtobufHelper.LABEL.contains(field[0])) { - messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfChoppable(field[4]))); + messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfGreppable(field[4]))); + } else if ("message".equalsIgnoreCase(field[0])) { + messageDefinition.addMessageDefinition(buildMessage(field[1], messageLines)); } else if ("}".equalsIgnoreCase(field[0])) { exit = true; } @@ -222,7 +256,7 @@ private static MessageDefinition buildMessage(final String messageName, final Li return messageDefinition.build(); } - private static String checkIfChoppable(final String field) { + private static String checkIfGreppable(final String field) { String choppedField = field; if (field.endsWith(";")) { choppedField = StringUtils.chop(field); @@ -241,7 +275,7 @@ private static MessageDefinition buildProtoMessageDefinition( final MessageDefinition.Builder msgDef = MessageDefinition.newBuilder(fieldName); final var element = (MessageElement) messageElement; extracted(globalNestedTypesByLevelAndMessage, msgDef, element.getFields(), nextDeepLevel, fieldName); - for (var optionalField : element.getOneOfs()) { + for (final var optionalField : element.getOneOfs()) { extracted(globalNestedTypesByLevelAndMessage, msgDef, optionalField.getFields(), nextDeepLevel, fieldName); } return msgDef.build(); @@ -254,7 +288,7 @@ private static void extracted( final HashMap nestedTypes = processLevelTypes(globalNestedTypesByLevelAndMessage, msgDef, fieldElementList, deepLevel, messageName); - for (var elementField : fieldElementList) { + for (final var elementField : fieldElementList) { final var elementFieldType = elementField.getType(); final var dotType = checkDotType(elementFieldType); if (nestedTypes.containsKey(elementFieldType)) { @@ -292,10 +326,10 @@ private static void extracted( addDefinition(msgDef, mapDotType, removed, globalNestedTypesByLevelAndMessage, deepLevel); } - msgDef.addField("repeated", "typemapnumber" + elementField.getName(), elementField.getName(), elementField.getTag()); + msgDef.addField("repeated", TYPE_MAP_NUMBER + elementField.getName(), elementField.getName(), elementField.getTag()); msgDef.addMessageDefinition( - MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build()); + MessageDefinition.newBuilder(TYPE_MAP_NUMBER + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build()); } else if (Objects.nonNull(elementField.getLabel())) { msgDef.addField(elementField.getLabel().toString().toLowerCase(), elementField.getType(), elementField.getName(), elementField.getTag()); } else { @@ -346,7 +380,7 @@ private static void addDefinition( if (typeElement instanceof EnumElement) { final var enumElement = (EnumElement) typeElement; final EnumDefinition.Builder builder = EnumDefinition.newBuilder(enumElement.getName()); - for (var constant : enumElement.getConstants()) { + for (final var constant : enumElement.getConstants()) { builder.addValue(constant.getName(), constant.getTag()); } msgDef.addEnumDefinition(builder.build()); diff --git a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java index 30509a12..f0a9a9b3 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java +++ b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java @@ -177,8 +177,7 @@ public static Arguments getCommonConsumerDefaultParameters() { defaultParameters.addArgument(ConsumerConfig.CLIENT_ID_CONFIG, ""); defaultParameters.addArgument(ConsumerConfig.SECURITY_PROVIDERS_CONFIG, ""); defaultParameters.addArgument(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS); - defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, - SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM); + defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM); defaultParameters.addArgument(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM); defaultParameters.addArgument(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE); defaultParameters.addArgument(SslConfigs.SSL_PROVIDER_CONFIG, ""); @@ -303,8 +302,7 @@ private static void verifySecurity(final JavaSamplerContext context, final Prope props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, context.getParameter(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG)); props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, - propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), - ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, + propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, "")); props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, context.getParameter(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG)); @@ -353,23 +351,17 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props) props.putAll(originals); try { - generator.setUpGenerator( - originals, - jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME), - (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); + generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME), + (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); } catch (final KLoadGenException exc) { if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) { - generator.setUpGenerator( - jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), - (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); + generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); } else { throw exc; } } } else { - generator.setUpGenerator( - jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), - (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); + generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); } return generator; @@ -428,14 +420,10 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) { props.putAll(originals); - generator.setUpGenerator( - originals, - jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME), - (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES)); + generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME), + (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES)); } else { - generator.setUpGenerator( - jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA), - (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES)); + generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES)); } return generator; @@ -443,10 +431,8 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) { public static List populateHeaders(final List kafkaHeaders, final ProducerRecord producerRecord) { final List headersSB = new ArrayList<>(); - for (HeaderMapping kafkaHeader : kafkaHeaders) { - final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(), - 10, - Collections.emptyList()).toString(); + for (final HeaderMapping kafkaHeader : kafkaHeaders) { + final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(), 10, Collections.emptyList()).toString(); headersSB.add(kafkaHeader.getHeaderName().concat(":").concat(headerValue)); producerRecord.headers().add(kafkaHeader.getHeaderName(), headerValue.getBytes(StandardCharsets.UTF_8)); } diff --git a/src/test/java/com/sngular/kloadgen/loadgen/impl/ProtobufSRLoadGeneratorTest.java b/src/test/java/com/sngular/kloadgen/loadgen/impl/ProtobufSRLoadGeneratorTest.java new file mode 100644 index 00000000..d10474d9 --- /dev/null +++ b/src/test/java/com/sngular/kloadgen/loadgen/impl/ProtobufSRLoadGeneratorTest.java @@ -0,0 +1,89 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * * License, v. 2.0. If a copy of the MPL was not distributed with this + * * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +package com.sngular.kloadgen.loadgen.impl; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import com.sngular.kloadgen.exception.KLoadGenException; +import com.sngular.kloadgen.model.FieldValueMapping; +import com.sngular.kloadgen.sampler.schemaregistry.SchemaRegistryManagerFactory; +import com.sngular.kloadgen.serializer.EnrichedRecord; +import com.sngular.kloadgen.util.ProducerKeysHelper; +import com.sngular.kloadgen.util.SchemaRegistryKeyHelper; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import org.apache.jmeter.threads.JMeterContext; +import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; +import org.apache.jmeter.util.JMeterUtils; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@WireMockTest +class ProtobufSRLoadGeneratorTest { + + @BeforeEach + public void setUp(final WireMockRuntimeInfo wmRuntimeInfo) throws IOException { + final File file = new File("src/test/resources"); + final String absolutePath = file.getAbsolutePath(); + final String kloadPath = absolutePath + "/kloadgen.properties"; + final Map properties = new HashMap<>(); + properties.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, wmRuntimeInfo.getHttpBaseUrl()); + properties.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME, "confluent"); + properties.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_AUTH_KEY, SchemaRegistryKeyHelper.SCHEMA_REGISTRY_AUTH_BASIC_TYPE); + properties.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_AUTH_FLAG, ProducerKeysHelper.FLAG_YES); + properties.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "foo"); + properties.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, "foo"); + JMeterUtils.loadJMeterProperties(kloadPath); + final JMeterContext jmcx = JMeterContextService.getContext(); + jmcx.getProperties().putAll(properties); + jmcx.setVariables(new JMeterVariables()); + JMeterUtils.setLocale(Locale.ENGLISH); + } + + @Test + void testProtobufLoadGenerator(final WireMockRuntimeInfo wmRuntimeInfo) throws KLoadGenException { + + final List fieldValueMappingList = Arrays.asList( + FieldValueMapping.builder().fieldName("propertyTest1.importedProperty.nestedProperty").fieldType("string").valueLength(0).fieldValueList("").required(true) + .isAncestorRequired(true).build(), + FieldValueMapping.builder().fieldName("propertyTest1.entityNumberTwo").fieldType("string").valueLength(0).fieldValueList("").required(true) + .isAncestorRequired(true).build(), + FieldValueMapping.builder().fieldName("propertyTest2.propertyNumberOne").fieldType("int").valueLength(0).fieldValueList("").required(true) + .isAncestorRequired(true).build(), + FieldValueMapping.builder().fieldName("propertyTest2.propertyNumberTwo").fieldType("string").valueLength(0).fieldValueList("").required(true) + .isAncestorRequired(true).build() + ); + + final var schemaRegistryManager = SchemaRegistryManagerFactory.getSchemaRegistry("confluent"); + final Map originals = new HashMap<>(); + + originals.put(schemaRegistryManager.getSchemaRegistryUrlKey(), wmRuntimeInfo.getHttpBaseUrl()); + originals.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME, "confluent"); + originals.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_USERNAME_KEY, "foo"); + originals.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_PASSWORD_KEY, "foo"); + + final var protobufLoadGenerator = new ProtobufLoadGenerator(); + protobufLoadGenerator.setUpGenerator(originals, "protobufSubjectWithImport", fieldValueMappingList); + final var message = protobufLoadGenerator.nextMessage(); + Assertions.assertThat(message).isNotNull().isInstanceOf(EnrichedRecord.class); + + Assertions.assertThat(message.getGenericRecord()).isNotNull(); + Assertions.assertThat(message.getGenericRecord().toString()).contains("propertyTest1"); + Assertions.assertThat(message.getGenericRecord().toString()).contains("entityNumberTwo"); + Assertions.assertThat(message.getGenericRecord().toString()).contains("propertyNumberOne"); + Assertions.assertThat(message.getGenericRecord().toString()).contains("propertyNumberTwo"); + } +} diff --git a/src/test/resources/mappings/schema_registry_stub.json b/src/test/resources/mappings/schema_registry_stub.json index b1754e0b..716e557a 100644 --- a/src/test/resources/mappings/schema_registry_stub.json +++ b/src/test/resources/mappings/schema_registry_stub.json @@ -108,6 +108,86 @@ } } }, + { + "priority": 10, + "request": { + "method": "GET", + "urlPattern": "/subjects/protobuf_subject/versions/latest" + }, + "response": { + "status": 200, + "jsonBody": { + "subject": "ProtobufSubject", + "version": 1, + "id": 74, + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\r\npackage sngular;\r\n\r\noption java_package = \"com.sngular.external.proto\";\r\noption java_outer_classname = \"ExternalMessage\";\r\noption java_multiple_files = true;\r\n\r\nmessage ExternalEntity{\r\n string parentPropertyOne = 1;\r\n int32 parentPropertyTwo = 2;\r\n \r\n message EntityOne {\r\n int32 entityNumberOne = 1;\r\n string entityNumberTwo = 2;\r\n \r\n message NestingEntity {\r\n string nestedProperty = 1;\r\n \r\n }\r\n \r\n }\r\n \r\n message EntityTwo {\r\n int32 propertyNumberOne = 1;\r\n string propertyNumberTwo = 2; \r\n }\r\n\r\n}" + } + } + }, + { + "priority": 10, + "request": { + "method": "GET", + "urlPattern": "/schemas/ids/74\\?fetchMaxId=false&subject=protobuf_subject" + }, + "response": { + "status": 200, + "jsonBody": { + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\r\npackage sngular;\r\n\r\noption java_package = \"com.sngular.external.proto\";\r\noption java_outer_classname = \"ExternalMessage\";\r\noption java_multiple_files = true;\r\n\r\nmessage ExternalEntity{\r\n string parentPropertyOne = 1;\r\n int32 parentPropertyTwo = 2;\r\n \r\n message EntityOne {\r\n int32 entityNumberOne = 1;\r\n string entityNumberTwo = 2;\r\n \r\n message NestingEntity {\r\n string nestedProperty = 1;\r\n \r\n }\r\n \r\n }\r\n \r\n message EntityTwo {\r\n int32 propertyNumberOne = 1;\r\n string propertyNumberTwo = 2; \r\n }\r\n\r\n}" + }, + "headers": { + "Content-Type": "application/vnd.schemaregistry.v1+json" + } + } + }, + { + "priority": 10, + "request": { + "method": "GET", + "urlPattern": "/subjects/protobufSubjectWithImport/versions/latest" + }, + "response": { + "status": 200, + "jsonBody": { + "subject": "ProtobufSubjectWithImport", + "version": 1, + "id": 75, + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\r\npackage sngular;\r\n\r\nimport \"protobuf_subject.proto\";\r\n\r\noption java_package = \"com.sngular.internal.proto\";\r\noption java_outer_classname = \"InternalMessage\";\r\noption java_multiple_files = true;\r\n\r\nmessage InternalMessage{\r\n\r\n InternalEntityOne propertyTest1 = 1;\r\n EntityTwo propertyTest2 = 2;\r\n \r\n message InternalEntityOne {\r\n ExternalEntity.EntityOne.NestingEntity importedProperty = 1;\r\n string entityNumberTwo = 2;\r\n }\r\n \r\n message EntityTwo {\r\n int32 propertyNumberOne = 1;\r\n string propertyNumberTwo = 2; \r\n }\r\n\r\n}", + "references" : [{ + "name": "protobuf_subject.proto", + "subject": "protobuf_subject", + "version": 1 + } + ] + } + } + }, + { + "priority": 10, + "request": { + "method": "GET", + "urlPattern": "/schemas/ids/75\\?fetchMaxId=false&subject=protobufSubjectWithImport" + }, + "response": { + "status": 200, + "jsonBody": { + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\r\npackage sngular;\r\n\r\nimport \"protobuf_subject.proto\";\r\n\r\noption java_package = \"com.sngular.internal.proto\";\r\noption java_outer_classname = \"InternalMessage\";\r\noption java_multiple_files = true;\r\n\r\nmessage InternalMessage{\r\n\r\n InternalEntityOne propertyTest1 = 1;\r\n EntityTwo propertyTest2 = 2;\r\n \r\n message InternalEntityOne {\r\n ExternalEntity.EntityOne.NestingEntity importedProperty = 1;\r\n string entityNumberTwo = 2;\r\n }\r\n \r\n message EntityTwo {\r\n int32 propertyNumberOne = 1;\r\n string propertyNumberTwo = 2; \r\n }\r\n\r\n}", + "references" : [{ + "name": "protobuf_subject.proto", + "subject": "protobuf_subject", + "version": 1 + } + ] + }, + "headers": { + "Content-Type": "application/vnd.schemaregistry.v1+json" + } + } + }, { "priority": 10, "request": { @@ -119,12 +199,36 @@ "jsonBody": [ "avrosubject", "users", - "arrayMap" + "arrayMap", + "protobuf_subject", + "protobufSubjectWithImport" ], "headers": { "Content-Type": "application/vnd.schemaregistry.v1+json" } } + }, + { + "priority": 10, + "request": { + "method": "GET", + "urlPattern" : "/subjects/protobuf_subject/versions/1\\?deleted=true" + }, + "response": { + "status": 200, + "jsonBody": { + "subject": "protobuf_subject", + "version": 1, + "id": 74, + "schemaType": "PROTOBUF", + "schema": "syntax = \"proto3\";\npackage sngular;\n\noption java_package = \"com.sngular.external.proto\";\noption java_outer_classname = \"ExternalMessage\";\noption java_multiple_files = true;\n\nmessage ExternalEntity{\n string parentPropertyOne = 1;\n int32 parentPropertyTwo = 2;\n \n message EntityOne {\n int32 entityNumberOne = 1;\n string entityNumberTwo = 2;\n \n message NestingEntity {\n string nestedProperty = 1;\n \n }\n \n }\n \n message EntityTwo {\n int32 propertyNumberOne = 1;\n string propertyNumberTwo = 2; \n }\n\n}" + }, + "headers": { + "Content-Type": "application/vnd.schemaregistry.v1+json" + } + } + + }, { "priority": 10, @@ -235,5 +339,3 @@ } ] } - -