Skip to content

Commit

Permalink
#332 Add Topic Name Strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
AdrianLagartera committed Mar 8, 2023
1 parent af0dfe3 commit a129cd7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.model.HeaderMapping;
import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool;
import com.sngular.kloadgen.util.NamingStrategyKeyHelper;
import com.sngular.kloadgen.util.ProducerKeysHelper;
import com.sngular.kloadgen.util.PropsKeysHelper;
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
Expand Down Expand Up @@ -123,6 +124,7 @@ public static Properties setupCommonProperties(final JavaSamplerContext context)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG));
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM));
props.put(ProducerKeysHelper.KAFKA_TOPIC_CONFIG, context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG));

if (Objects.nonNull(context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG,
Expand All @@ -135,6 +137,8 @@ public static Properties setupCommonProperties(final JavaSamplerContext context)
}
});

// Todo: Add kafk topic name

verifySecurity(context, props);

return props;
Expand Down Expand Up @@ -343,10 +347,16 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.putAll(originals);

try {
generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
if (props.getProperty(ProducerKeysHelper.VALUE_NAME_STRATEGY).equals(NamingStrategyKeyHelper.TOPIC_NAME_STRATEGY)) {
generator.setUpGenerator(originals, props.getProperty(ProducerKeysHelper.KAFKA_TOPIC_CONFIG),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}

} catch (final KLoadGenException exc) {
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
generator.setUpGenerator(
Expand All @@ -358,8 +368,8 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
}
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}

return generator;
Expand Down Expand Up @@ -427,10 +437,10 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {

public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeaders, final ProducerRecord<Object, Object> producerRecord) {
final List<String> headersSB = new ArrayList<>();
for (HeaderMapping kafkaHeader : kafkaHeaders) {
for (final HeaderMapping kafkaHeader : kafkaHeaders) {
final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(),
10,
Collections.emptyList()).toString();
10,
Collections.emptyList()).toString();
headersSB.add(kafkaHeader.getHeaderName().concat(":").concat(headerValue));
producerRecord.headers().add(kafkaHeader.getHeaderName(), headerValue.getBytes(StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.sngular.kloadgen.util;

public class NamingStrategyKeyHelper {

public static final String TOPIC_NAME_STRATEGY = "io.confluent.kafka.serializers.subject.TopicNameStrategy";

public static final String TOPIC_RECORD_NAME_STRATEGY = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy";

public static final String RECORD_NAME_STRATEGY = "io.confluent.kafka.serializers.subject.RecordNameStrategy";
}

0 comments on commit a129cd7

Please sign in to comment.