Skip to content

Commit

Permalink
332 comsngularkloadgenserializerprotobufserializer nullpointerexcepti…
Browse files Browse the repository at this point in the history
…on (#355)

* #332 Add Files Modified To Solve Exception

Added a trim statement to avoid problem when cast the schemas from classpath

* #332 Add get dependencies and build proto.

* #332 Add Test And Documentation

Removed some unused test files

* #332 Add Topic Name Strategy

* #332 Change image from kafka manager to newest version

Apparently the original project was renamed into cmak to avoid break the TOS of kafka having the kafka in the name of unofficial tool

* Get the schema name from the schema references of imported classes

* #332 Removed generator set up by TopicNameStrategy

* #332 refactor

* #332 updated pom version

* #332 fixes PR

* update pom

---------

Co-authored-by: Adrian Lagartera <[email protected]>
Co-authored-by: Jose Enrique García Maciñeiras <[email protected]>
Co-authored-by: Alfredo González Fernández <[email protected]>
  • Loading branch information
4 people authored Mar 28, 2023
1 parent cc2c29a commit 27f2aa7
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 45 deletions.
20 changes: 20 additions & 0 deletions docs/schemas.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,23 @@ The same applies when there is more than one level. The last child in the last l
The other exception applies to JSON Schema: in order to support `null`values, the collections within objects (type : object) cannot be null. Therefore, they will be **required by default** and they will be **initialized as an empty collection** if the object than contains them is not null.

> Within JSON Schema, a maximum of **2 nesting levels** is allowed.

### Protobuf Schema - Schema Registry

If you need use the Protobuf Schema with the Schema Registry, you must put the subject with the same name as the protobuf file, as the following example:

```
{
"schema": "...",
"schemaType": "PROTOBUF",
"references": [
{
"name": "[the_name_you_want].proto",
"subject": "[the_name_you_want]",
"version": ...
}]
}
```
> This example is based on a petition from Schema Registry
13 changes: 12 additions & 1 deletion pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.4.1</version>
<version>5.4.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -235,6 +235,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>alfredo9f</id>
<name>Alfredo González</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://www.sngular.com</organizationUrl>
<roles>
<role>Senior Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.4.1</version>
<version>5.4.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -235,6 +235,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>alfredo9f</id>
<name>Alfredo González</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://www.sngular.com</organizationUrl>
<roles>
<role>Senior Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
4 changes: 2 additions & 2 deletions schema_registry_docker/docker-compose-noauth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ services:
environment:
AUTH_ENABLED: false
kafka-manager:
image: kafkamanager/kafka-manager
image: iunera/cmak
depends_on:
- kafka
- zookeeper
ports:
- 9000:9000
environment:
ZK_HOSTS: http://zookeper:2181
ZK_HOSTS: zookeeper:2181
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class ProtobufObjectCreatorFactory implements ObjectCreatorFactory {

public ProtobufObjectCreatorFactory(final Object schema, final Object metadata) throws DescriptorValidationException, IOException {
if (schema instanceof ParsedSchema) {
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema());
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema(), metadata);
} else if (schema instanceof ProtoFileElement) {
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema);
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema, metadata);
} else {
throw new KLoadGenException("Unsupported schema type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,28 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.util.JMeterHelper;
import com.sngular.kloadgen.util.ProtobufHelper;
import com.squareup.wire.schema.internal.parser.EnumElement;
import com.squareup.wire.schema.internal.parser.FieldElement;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.TypeElement;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.threads.JMeterContextService;

public class SchemaProcessorUtils {

private static final String OPTIONAL = "optional";

private static final String TYPE_MAP_NUMBER = "typemapnumber";

private SchemaProcessorUtils() {
}

Expand Down Expand Up @@ -142,11 +149,11 @@ public static String[] splitAndNormalizeFullFieldName(final String fullFieldName
return Arrays.stream(fields).map(field -> field.replaceAll("\\[.*]", "")).toArray(String[]::new);
}

public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema) throws Descriptors.DescriptorValidationException, IOException {
public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema, final Object metadata) throws Descriptors.DescriptorValidationException, IOException {

final DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
final List<String> imports = schema.getImports();
for (String importedClass : imports) {
for (final String importedClass : imports) {
try (final InputStream resourceStream = SchemaProcessorUtils.class.getClassLoader().getResourceAsStream(importedClass)) {
if (null != resourceStream) {
final String schemaToString = new String(resourceStream.readAllBytes());
Expand All @@ -156,6 +163,13 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
schemaBuilder.addDependency(importedSchema.getFileDescriptorSet().getFile(0).getName());
schemaBuilder.addSchema(importedSchema);
}
} else {
final var importedProtobufSchema = (ProtobufSchema) JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata),
JMeterContextService.getContext().getProperties());
if (!ProtobufHelper.NOT_ACCEPTED_IMPORTS.contains(importedClass)) {
schemaBuilder.addDependency(importedProtobufSchema.toDescriptor().getFullName());
schemaBuilder.addSchema(convertDynamicSchema(importedProtobufSchema));
}
}
}
}
Expand All @@ -173,6 +187,24 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
return schemaBuilder.build().getMessageDescriptor(messageElement.getName());
}

private static String getSubjectName(final String importedClass, final Object metadata) {
final List<SchemaReference> references = ((SchemaMetadata) metadata).getReferences();
String subjectName = null;

for (final SchemaReference schemaReference : references) {
if (schemaReference.getName().equals(importedClass)) {
subjectName = schemaReference.getSubject();
break;
}
}

return Objects.requireNonNullElse(subjectName, importedClass);
}

private static DynamicSchema convertDynamicSchema(final ProtobufSchema importSchema) throws DescriptorValidationException {
return processImported(Arrays.asList(importSchema.rawSchema().toSchema().split("\\n")));
}

private static Predicate<String> isValid() {
return line -> !line.contains("//") && !line.isEmpty();
}
Expand All @@ -184,7 +216,7 @@ private static DynamicSchema processImported(final List<String> importedLines) t
String packageName = "";
final var linesIterator = importedLines.listIterator();
while (linesIterator.hasNext()) {
final var fileLine = linesIterator.next();
final var fileLine = linesIterator.next().trim();

if (fileLine.startsWith("package")) {
packageName = StringUtils.chop(fileLine.substring(7).trim());
Expand All @@ -211,9 +243,11 @@ private static MessageDefinition buildMessage(final String messageName, final Li
while (messageLines.hasNext() && !exit) {
final var field = messageLines.next().trim().split("\\s");
if (ProtobufHelper.isValidType(field[0])) {
messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfChoppable(field[3])));
messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfGreppable(field[3])));
} else if (ProtobufHelper.LABEL.contains(field[0])) {
messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfChoppable(field[4])));
messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfGreppable(field[4])));
} else if ("message".equalsIgnoreCase(field[0])) {
messageDefinition.addMessageDefinition(buildMessage(field[1], messageLines));
} else if ("}".equalsIgnoreCase(field[0])) {
exit = true;
}
Expand All @@ -222,7 +256,7 @@ private static MessageDefinition buildMessage(final String messageName, final Li
return messageDefinition.build();
}

private static String checkIfChoppable(final String field) {
private static String checkIfGreppable(final String field) {
String choppedField = field;
if (field.endsWith(";")) {
choppedField = StringUtils.chop(field);
Expand All @@ -241,7 +275,7 @@ private static MessageDefinition buildProtoMessageDefinition(
final MessageDefinition.Builder msgDef = MessageDefinition.newBuilder(fieldName);
final var element = (MessageElement) messageElement;
extracted(globalNestedTypesByLevelAndMessage, msgDef, element.getFields(), nextDeepLevel, fieldName);
for (var optionalField : element.getOneOfs()) {
for (final var optionalField : element.getOneOfs()) {
extracted(globalNestedTypesByLevelAndMessage, msgDef, optionalField.getFields(), nextDeepLevel, fieldName);
}
return msgDef.build();
Expand All @@ -254,7 +288,7 @@ private static void extracted(
final HashMap<String, TypeElement> nestedTypes = processLevelTypes(globalNestedTypesByLevelAndMessage, msgDef, fieldElementList, deepLevel,
messageName);

for (var elementField : fieldElementList) {
for (final var elementField : fieldElementList) {
final var elementFieldType = elementField.getType();
final var dotType = checkDotType(elementFieldType);
if (nestedTypes.containsKey(elementFieldType)) {
Expand Down Expand Up @@ -292,10 +326,10 @@ private static void extracted(

addDefinition(msgDef, mapDotType, removed, globalNestedTypesByLevelAndMessage, deepLevel);
}
msgDef.addField("repeated", "typemapnumber" + elementField.getName(), elementField.getName(), elementField.getTag());
msgDef.addField("repeated", TYPE_MAP_NUMBER + elementField.getName(), elementField.getName(), elementField.getTag());

msgDef.addMessageDefinition(
MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
MessageDefinition.newBuilder(TYPE_MAP_NUMBER + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
} else if (Objects.nonNull(elementField.getLabel())) {
msgDef.addField(elementField.getLabel().toString().toLowerCase(), elementField.getType(), elementField.getName(), elementField.getTag());
} else {
Expand Down Expand Up @@ -346,7 +380,7 @@ private static void addDefinition(
if (typeElement instanceof EnumElement) {
final var enumElement = (EnumElement) typeElement;
final EnumDefinition.Builder builder = EnumDefinition.newBuilder(enumElement.getName());
for (var constant : enumElement.getConstants()) {
for (final var constant : enumElement.getConstants()) {
builder.addValue(constant.getName(), constant.getTag());
}
msgDef.addEnumDefinition(builder.build());
Expand Down
36 changes: 11 additions & 25 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ public static Arguments getCommonConsumerDefaultParameters() {
defaultParameters.addArgument(ConsumerConfig.CLIENT_ID_CONFIG, "");
defaultParameters.addArgument(ConsumerConfig.SECURITY_PROVIDERS_CONFIG, "");
defaultParameters.addArgument(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS);
defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE);
defaultParameters.addArgument(SslConfigs.SSL_PROVIDER_CONFIG, "");
Expand Down Expand Up @@ -303,8 +302,7 @@ private static void verifySecurity(final JavaSamplerContext context, final Prope
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, context.getParameter(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG));

props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
""));

props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, context.getParameter(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG));
Expand Down Expand Up @@ -353,23 +351,17 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.putAll(originals);

try {
generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} catch (final KLoadGenException exc) {
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
throw exc;
}
}
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}

return generator;
Expand Down Expand Up @@ -428,25 +420,19 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {

props.putAll(originals);

generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
}

return generator;
}

public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeaders, final ProducerRecord<Object, Object> producerRecord) {
final List<String> headersSB = new ArrayList<>();
for (HeaderMapping kafkaHeader : kafkaHeaders) {
final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(),
10,
Collections.emptyList()).toString();
for (final HeaderMapping kafkaHeader : kafkaHeaders) {
final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(), 10, Collections.emptyList()).toString();
headersSB.add(kafkaHeader.getHeaderName().concat(":").concat(headerValue));
producerRecord.headers().add(kafkaHeader.getHeaderName(), headerValue.getBytes(StandardCharsets.UTF_8));
}
Expand Down
Loading

0 comments on commit 27f2aa7

Please sign in to comment.