diff --git a/docs/schemas.md b/docs/schemas.md
index 63e2c4bb..57ee747b 100644
--- a/docs/schemas.md
+++ b/docs/schemas.md
@@ -176,3 +176,23 @@ The same applies when there is more than one level. The last child in the last l
The other exception applies to JSON Schema: in order to support `null`values, the collections within objects (type : object) cannot be null. Therefore, they will be **required by default** and they will be **initialized as an empty collection** if the object than contains them is not null.
> Within JSON Schema, a maximum of **2 nesting levels** is allowed.
+
+
+### Protobuf Schema - Schema Registry
+
+If you need use the Protobuf Schema with the Schema Registry, you must put the subject with the same name as the protobuf file, as the following example:
+
+```
+{
+ "schema": "...",
+ "schemaType": "PROTOBUF",
+ "references": [
+ {
+ "name": "[the_name_you_want].proto",
+ "subject": "[the_name_you_want]",
+ "version": ...
+ }]
+}
+
+```
+> This example is based on a petition from Schema Registry
diff --git a/pom-maven-central.xml b/pom-maven-central.xml
index d2522eda..f919e8ec 100644
--- a/pom-maven-central.xml
+++ b/pom-maven-central.xml
@@ -7,7 +7,7 @@
kloadgen
- 5.4.1
+ 5.4.2
KLoadGen
Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
@@ -235,6 +235,17 @@
Europe/Madrid
+
+ alfredo9f
+ Alfredo González
+ alfredo.gonzalez@sngular.com
+ Sngular
+ https://www.sngular.com
+
+ Senior Backend Developer
+
+ Europe/Madrid
+
diff --git a/pom.xml b/pom.xml
index fb4e6046..5f54018f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
kloadgen
- 5.4.1
+ 5.4.2
KLoadGen
Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
@@ -235,6 +235,17 @@
Europe/Madrid
+
+ alfredo9f
+ Alfredo González
+ alfredo.gonzalez@sngular.com
+ Sngular
+ https://www.sngular.com
+
+ Senior Backend Developer
+
+ Europe/Madrid
+
diff --git a/schema_registry_docker/docker-compose-noauth.yml b/schema_registry_docker/docker-compose-noauth.yml
index 264ee6b5..d8976b99 100644
--- a/schema_registry_docker/docker-compose-noauth.yml
+++ b/schema_registry_docker/docker-compose-noauth.yml
@@ -37,11 +37,11 @@ services:
environment:
AUTH_ENABLED: false
kafka-manager:
- image: kafkamanager/kafka-manager
+ image: iunera/cmak
depends_on:
- kafka
- zookeeper
ports:
- 9000:9000
environment:
- ZK_HOSTS: http://zookeper:2181
+ ZK_HOSTS: zookeeper:2181
diff --git a/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java b/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java
index b652ac98..cf7b053f 100644
--- a/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java
+++ b/src/main/java/com/sngular/kloadgen/processor/objectcreatorfactory/impl/ProtobufObjectCreatorFactory.java
@@ -41,9 +41,9 @@ public class ProtobufObjectCreatorFactory implements ObjectCreatorFactory {
public ProtobufObjectCreatorFactory(final Object schema, final Object metadata) throws DescriptorValidationException, IOException {
if (schema instanceof ParsedSchema) {
- this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema());
+ this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema(), metadata);
} else if (schema instanceof ProtoFileElement) {
- this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema);
+ this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema, metadata);
} else {
throw new KLoadGenException("Unsupported schema type");
}
diff --git a/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java b/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java
index 8b04a22d..cc183f86 100644
--- a/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java
+++ b/src/main/java/com/sngular/kloadgen/processor/util/SchemaProcessorUtils.java
@@ -25,21 +25,28 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.sngular.kloadgen.model.FieldValueMapping;
+import com.sngular.kloadgen.util.JMeterHelper;
import com.sngular.kloadgen.util.ProtobufHelper;
import com.squareup.wire.schema.internal.parser.EnumElement;
import com.squareup.wire.schema.internal.parser.FieldElement;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.TypeElement;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.jmeter.threads.JMeterContextService;
public class SchemaProcessorUtils {
private static final String OPTIONAL = "optional";
+ private static final String TYPE_MAP_NUMBER = "typemapnumber";
+
private SchemaProcessorUtils() {
}
@@ -142,11 +149,11 @@ public static String[] splitAndNormalizeFullFieldName(final String fullFieldName
return Arrays.stream(fields).map(field -> field.replaceAll("\\[.*]", "")).toArray(String[]::new);
}
- public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema) throws Descriptors.DescriptorValidationException, IOException {
+ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema, final Object metadata) throws Descriptors.DescriptorValidationException, IOException {
final DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
final List imports = schema.getImports();
- for (String importedClass : imports) {
+ for (final String importedClass : imports) {
try (final InputStream resourceStream = SchemaProcessorUtils.class.getClassLoader().getResourceAsStream(importedClass)) {
if (null != resourceStream) {
final String schemaToString = new String(resourceStream.readAllBytes());
@@ -156,6 +163,13 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
schemaBuilder.addDependency(importedSchema.getFileDescriptorSet().getFile(0).getName());
schemaBuilder.addSchema(importedSchema);
}
+ } else {
+ final var importedProtobufSchema = (ProtobufSchema) JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata),
+ JMeterContextService.getContext().getProperties());
+ if (!ProtobufHelper.NOT_ACCEPTED_IMPORTS.contains(importedClass)) {
+ schemaBuilder.addDependency(importedProtobufSchema.toDescriptor().getFullName());
+ schemaBuilder.addSchema(convertDynamicSchema(importedProtobufSchema));
+ }
}
}
}
@@ -173,6 +187,24 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
return schemaBuilder.build().getMessageDescriptor(messageElement.getName());
}
+ private static String getSubjectName(final String importedClass, final Object metadata) {
+ final List references = ((SchemaMetadata) metadata).getReferences();
+ String subjectName = null;
+
+ for (final SchemaReference schemaReference : references) {
+ if (schemaReference.getName().equals(importedClass)) {
+ subjectName = schemaReference.getSubject();
+ break;
+ }
+ }
+
+ return Objects.requireNonNullElse(subjectName, importedClass);
+ }
+
+ private static DynamicSchema convertDynamicSchema(final ProtobufSchema importSchema) throws DescriptorValidationException {
+ return processImported(Arrays.asList(importSchema.rawSchema().toSchema().split("\\n")));
+ }
+
private static Predicate isValid() {
return line -> !line.contains("//") && !line.isEmpty();
}
@@ -184,7 +216,7 @@ private static DynamicSchema processImported(final List importedLines) t
String packageName = "";
final var linesIterator = importedLines.listIterator();
while (linesIterator.hasNext()) {
- final var fileLine = linesIterator.next();
+ final var fileLine = linesIterator.next().trim();
if (fileLine.startsWith("package")) {
packageName = StringUtils.chop(fileLine.substring(7).trim());
@@ -211,9 +243,11 @@ private static MessageDefinition buildMessage(final String messageName, final Li
while (messageLines.hasNext() && !exit) {
final var field = messageLines.next().trim().split("\\s");
if (ProtobufHelper.isValidType(field[0])) {
- messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfChoppable(field[3])));
+ messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfGreppable(field[3])));
} else if (ProtobufHelper.LABEL.contains(field[0])) {
- messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfChoppable(field[4])));
+ messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfGreppable(field[4])));
+ } else if ("message".equalsIgnoreCase(field[0])) {
+ messageDefinition.addMessageDefinition(buildMessage(field[1], messageLines));
} else if ("}".equalsIgnoreCase(field[0])) {
exit = true;
}
@@ -222,7 +256,7 @@ private static MessageDefinition buildMessage(final String messageName, final Li
return messageDefinition.build();
}
- private static String checkIfChoppable(final String field) {
+ private static String checkIfGreppable(final String field) {
String choppedField = field;
if (field.endsWith(";")) {
choppedField = StringUtils.chop(field);
@@ -241,7 +275,7 @@ private static MessageDefinition buildProtoMessageDefinition(
final MessageDefinition.Builder msgDef = MessageDefinition.newBuilder(fieldName);
final var element = (MessageElement) messageElement;
extracted(globalNestedTypesByLevelAndMessage, msgDef, element.getFields(), nextDeepLevel, fieldName);
- for (var optionalField : element.getOneOfs()) {
+ for (final var optionalField : element.getOneOfs()) {
extracted(globalNestedTypesByLevelAndMessage, msgDef, optionalField.getFields(), nextDeepLevel, fieldName);
}
return msgDef.build();
@@ -254,7 +288,7 @@ private static void extracted(
final HashMap nestedTypes = processLevelTypes(globalNestedTypesByLevelAndMessage, msgDef, fieldElementList, deepLevel,
messageName);
- for (var elementField : fieldElementList) {
+ for (final var elementField : fieldElementList) {
final var elementFieldType = elementField.getType();
final var dotType = checkDotType(elementFieldType);
if (nestedTypes.containsKey(elementFieldType)) {
@@ -292,10 +326,10 @@ private static void extracted(
addDefinition(msgDef, mapDotType, removed, globalNestedTypesByLevelAndMessage, deepLevel);
}
- msgDef.addField("repeated", "typemapnumber" + elementField.getName(), elementField.getName(), elementField.getTag());
+ msgDef.addField("repeated", TYPE_MAP_NUMBER + elementField.getName(), elementField.getName(), elementField.getTag());
msgDef.addMessageDefinition(
- MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
+ MessageDefinition.newBuilder(TYPE_MAP_NUMBER + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
} else if (Objects.nonNull(elementField.getLabel())) {
msgDef.addField(elementField.getLabel().toString().toLowerCase(), elementField.getType(), elementField.getName(), elementField.getTag());
} else {
@@ -346,7 +380,7 @@ private static void addDefinition(
if (typeElement instanceof EnumElement) {
final var enumElement = (EnumElement) typeElement;
final EnumDefinition.Builder builder = EnumDefinition.newBuilder(enumElement.getName());
- for (var constant : enumElement.getConstants()) {
+ for (final var constant : enumElement.getConstants()) {
builder.addValue(constant.getName(), constant.getTag());
}
msgDef.addEnumDefinition(builder.build());
diff --git a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
index 30509a12..f0a9a9b3 100644
--- a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
+++ b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
@@ -177,8 +177,7 @@ public static Arguments getCommonConsumerDefaultParameters() {
defaultParameters.addArgument(ConsumerConfig.CLIENT_ID_CONFIG, "");
defaultParameters.addArgument(ConsumerConfig.SECURITY_PROVIDERS_CONFIG, "");
defaultParameters.addArgument(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS);
- defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
- SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
+ defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE);
defaultParameters.addArgument(SslConfigs.SSL_PROVIDER_CONFIG, "");
@@ -303,8 +302,7 @@ private static void verifySecurity(final JavaSamplerContext context, final Prope
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, context.getParameter(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG));
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
- propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
- ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
+ propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
""));
props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, context.getParameter(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG));
@@ -353,23 +351,17 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.putAll(originals);
try {
- generator.setUpGenerator(
- originals,
- jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
- (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
+ generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
+ (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} catch (final KLoadGenException exc) {
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
- generator.setUpGenerator(
- jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
- (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
+ generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
throw exc;
}
}
} else {
- generator.setUpGenerator(
- jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
- (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
+ generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}
return generator;
@@ -428,14 +420,10 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
props.putAll(originals);
- generator.setUpGenerator(
- originals,
- jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
- (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
+ generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
+ (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
} else {
- generator.setUpGenerator(
- jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA),
- (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
+ generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
}
return generator;
@@ -443,10 +431,8 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
public static List populateHeaders(final List kafkaHeaders, final ProducerRecord