Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

332 com.sngular.kloadgen.serializer.ProtobufSerializer NullPointerException #355

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/schemas.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,23 @@ The same applies when there is more than one level. The last child in the last l
The other exception applies to JSON Schema: in order to support `null` values, the collections within objects (type: object) cannot be null. Therefore, they will be **required by default** and they will be **initialized as an empty collection** if the object that contains them is not null.

> Within JSON Schema, a maximum of **2 nesting levels** is allowed.


### Protobuf Schema - Schema Registry

If you need to use the Protobuf Schema with the Schema Registry, you must give the subject the same name as the protobuf file, as in the following example:

```
{
"schema": "...",
"schemaType": "PROTOBUF",
"references": [
{
"name": "[the_name_you_want].proto",
"subject": "[the_name_you_want]",
"version": ...
}]
}

```
> This example is based on a request to the Schema Registry
13 changes: 12 additions & 1 deletion pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.4.1</version>
<version>5.4.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -235,6 +235,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>alfredo9f</id>
<name>Alfredo González</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://www.sngular.com</organizationUrl>
<roles>
<role>Senior Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.4.1</version>
<version>5.4.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -235,6 +235,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>alfredo9f</id>
<name>Alfredo González</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://www.sngular.com</organizationUrl>
<roles>
<role>Senior Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
4 changes: 2 additions & 2 deletions schema_registry_docker/docker-compose-noauth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ services:
environment:
AUTH_ENABLED: false
kafka-manager:
image: kafkamanager/kafka-manager
image: iunera/cmak
depends_on:
- kafka
- zookeeper
ports:
- 9000:9000
environment:
ZK_HOSTS: http://zookeper:2181
ZK_HOSTS: zookeeper:2181
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class ProtobufObjectCreatorFactory implements ObjectCreatorFactory {

public ProtobufObjectCreatorFactory(final Object schema, final Object metadata) throws DescriptorValidationException, IOException {
if (schema instanceof ParsedSchema) {
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema());
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema(), metadata);
} else if (schema instanceof ProtoFileElement) {
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema);
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema, metadata);
} else {
throw new KLoadGenException("Unsupported schema type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,28 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.util.JMeterHelper;
import com.sngular.kloadgen.util.ProtobufHelper;
import com.squareup.wire.schema.internal.parser.EnumElement;
import com.squareup.wire.schema.internal.parser.FieldElement;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.TypeElement;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.threads.JMeterContextService;

public class SchemaProcessorUtils {

private static final String OPTIONAL = "optional";

private static final String TYPE_MAP_NUMBER = "typemapnumber";

private SchemaProcessorUtils() {
}

Expand Down Expand Up @@ -142,11 +149,11 @@ public static String[] splitAndNormalizeFullFieldName(final String fullFieldName
return Arrays.stream(fields).map(field -> field.replaceAll("\\[.*]", "")).toArray(String[]::new);
}

public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema) throws Descriptors.DescriptorValidationException, IOException {
public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema, final Object metadata) throws Descriptors.DescriptorValidationException, IOException {

final DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
final List<String> imports = schema.getImports();
for (String importedClass : imports) {
for (final String importedClass : imports) {
try (final InputStream resourceStream = SchemaProcessorUtils.class.getClassLoader().getResourceAsStream(importedClass)) {
if (null != resourceStream) {
final String schemaToString = new String(resourceStream.readAllBytes());
Expand All @@ -156,6 +163,13 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
schemaBuilder.addDependency(importedSchema.getFileDescriptorSet().getFile(0).getName());
schemaBuilder.addSchema(importedSchema);
}
} else {
final var importedProtobufSchema = (ProtobufSchema) JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata),
JMeterContextService.getContext().getProperties());
if (!ProtobufHelper.NOT_ACCEPTED_IMPORTS.contains(importedClass)) {
schemaBuilder.addDependency(importedProtobufSchema.toDescriptor().getFullName());
schemaBuilder.addSchema(convertDynamicSchema(importedProtobufSchema));
}
}
}
}
Expand All @@ -173,6 +187,24 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
return schemaBuilder.build().getMessageDescriptor(messageElement.getName());
}

/**
 * Resolves the Schema Registry subject under which an imported proto file is registered.
 *
 * <p>Searches the schema metadata's reference list for an entry whose name matches the
 * imported file name and returns that reference's subject. If the metadata carries no
 * references, or none matches, the imported file name itself is returned as the subject
 * (the documented convention is that the subject equals the proto file name).
 *
 * @param importedClass name of the imported proto file as it appears in the import statement
 * @param metadata      schema metadata fetched from the registry; expected to be a {@link SchemaMetadata}
 * @return the matching reference's subject, or {@code importedClass} when no match exists
 */
private static String getSubjectName(final String importedClass, final Object metadata) {
  final List<SchemaReference> references = ((SchemaMetadata) metadata).getReferences();
  String subjectName = null;

  // getReferences() may be null when the registered schema declares no references;
  // guard to avoid the very NullPointerException this code path is meant to fix.
  if (null != references) {
    for (final SchemaReference schemaReference : references) {
      if (schemaReference.getName().equals(importedClass)) {
        subjectName = schemaReference.getSubject();
        break;
      }
    }
  }

  return Objects.requireNonNullElse(subjectName, importedClass);
}

/**
 * Converts a registry-fetched Protobuf schema into a {@link DynamicSchema} by rendering
 * it back to proto source text and re-parsing it line by line.
 *
 * @param importSchema schema retrieved from the Schema Registry
 * @return the equivalent dynamic schema
 * @throws DescriptorValidationException when the rendered schema cannot be rebuilt
 */
private static DynamicSchema convertDynamicSchema(final ProtobufSchema importSchema) throws DescriptorValidationException {
  final String renderedSchema = importSchema.rawSchema().toSchema();
  return processImported(Arrays.asList(renderedSchema.split("\\n")));
}

/**
 * Builds a predicate that accepts only meaningful proto source lines: it rejects
 * empty lines and lines containing an inline comment marker.
 *
 * @return predicate evaluating to {@code true} for lines worth parsing
 */
private static Predicate<String> isValid() {
  return line -> !(line.isEmpty() || line.contains("//"));
}
Expand All @@ -184,7 +216,7 @@ private static DynamicSchema processImported(final List<String> importedLines) t
String packageName = "";
final var linesIterator = importedLines.listIterator();
while (linesIterator.hasNext()) {
final var fileLine = linesIterator.next();
final var fileLine = linesIterator.next().trim();

if (fileLine.startsWith("package")) {
packageName = StringUtils.chop(fileLine.substring(7).trim());
Expand All @@ -211,9 +243,11 @@ private static MessageDefinition buildMessage(final String messageName, final Li
while (messageLines.hasNext() && !exit) {
final var field = messageLines.next().trim().split("\\s");
if (ProtobufHelper.isValidType(field[0])) {
messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfChoppable(field[3])));
messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfGreppable(field[3])));
} else if (ProtobufHelper.LABEL.contains(field[0])) {
messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfChoppable(field[4])));
messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfGreppable(field[4])));
} else if ("message".equalsIgnoreCase(field[0])) {
messageDefinition.addMessageDefinition(buildMessage(field[1], messageLines));
} else if ("}".equalsIgnoreCase(field[0])) {
exit = true;
}
Expand All @@ -222,7 +256,7 @@ private static MessageDefinition buildMessage(final String messageName, final Li
return messageDefinition.build();
}

private static String checkIfChoppable(final String field) {
private static String checkIfGreppable(final String field) {
String choppedField = field;
if (field.endsWith(";")) {
choppedField = StringUtils.chop(field);
Expand All @@ -241,7 +275,7 @@ private static MessageDefinition buildProtoMessageDefinition(
final MessageDefinition.Builder msgDef = MessageDefinition.newBuilder(fieldName);
final var element = (MessageElement) messageElement;
extracted(globalNestedTypesByLevelAndMessage, msgDef, element.getFields(), nextDeepLevel, fieldName);
for (var optionalField : element.getOneOfs()) {
for (final var optionalField : element.getOneOfs()) {
extracted(globalNestedTypesByLevelAndMessage, msgDef, optionalField.getFields(), nextDeepLevel, fieldName);
}
return msgDef.build();
Expand All @@ -254,7 +288,7 @@ private static void extracted(
final HashMap<String, TypeElement> nestedTypes = processLevelTypes(globalNestedTypesByLevelAndMessage, msgDef, fieldElementList, deepLevel,
messageName);

for (var elementField : fieldElementList) {
for (final var elementField : fieldElementList) {
final var elementFieldType = elementField.getType();
final var dotType = checkDotType(elementFieldType);
if (nestedTypes.containsKey(elementFieldType)) {
Expand Down Expand Up @@ -292,10 +326,10 @@ private static void extracted(

addDefinition(msgDef, mapDotType, removed, globalNestedTypesByLevelAndMessage, deepLevel);
}
msgDef.addField("repeated", "typemapnumber" + elementField.getName(), elementField.getName(), elementField.getTag());
msgDef.addField("repeated", TYPE_MAP_NUMBER + elementField.getName(), elementField.getName(), elementField.getTag());

msgDef.addMessageDefinition(
MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
MessageDefinition.newBuilder(TYPE_MAP_NUMBER + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
} else if (Objects.nonNull(elementField.getLabel())) {
msgDef.addField(elementField.getLabel().toString().toLowerCase(), elementField.getType(), elementField.getName(), elementField.getTag());
} else {
Expand Down Expand Up @@ -346,7 +380,7 @@ private static void addDefinition(
if (typeElement instanceof EnumElement) {
final var enumElement = (EnumElement) typeElement;
final EnumDefinition.Builder builder = EnumDefinition.newBuilder(enumElement.getName());
for (var constant : enumElement.getConstants()) {
for (final var constant : enumElement.getConstants()) {
builder.addValue(constant.getName(), constant.getTag());
}
msgDef.addEnumDefinition(builder.build());
Expand Down
36 changes: 11 additions & 25 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ public static Arguments getCommonConsumerDefaultParameters() {
defaultParameters.addArgument(ConsumerConfig.CLIENT_ID_CONFIG, "");
defaultParameters.addArgument(ConsumerConfig.SECURITY_PROVIDERS_CONFIG, "");
defaultParameters.addArgument(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS);
defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE);
defaultParameters.addArgument(SslConfigs.SSL_PROVIDER_CONFIG, "");
Expand Down Expand Up @@ -303,8 +302,7 @@ private static void verifySecurity(final JavaSamplerContext context, final Prope
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, context.getParameter(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG));

props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
""));

props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, context.getParameter(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG));
Expand Down Expand Up @@ -353,23 +351,17 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.putAll(originals);

try {
generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} catch (final KLoadGenException exc) {
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
throw exc;
}
}
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}

return generator;
Expand Down Expand Up @@ -428,25 +420,19 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {

props.putAll(originals);

generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
}

return generator;
}

public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeaders, final ProducerRecord<Object, Object> producerRecord) {
final List<String> headersSB = new ArrayList<>();
for (HeaderMapping kafkaHeader : kafkaHeaders) {
final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(),
10,
Collections.emptyList()).toString();
for (final HeaderMapping kafkaHeader : kafkaHeaders) {
final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(), 10, Collections.emptyList()).toString();
headersSB.add(kafkaHeader.getHeaderName().concat(":").concat(headerValue));
producerRecord.headers().add(kafkaHeader.getHeaderName(), headerValue.getBytes(StandardCharsets.UTF_8));
}
Expand Down
Loading