Kafka base realisation #83

Open
kirillyu opened this issue Mar 24, 2022 · 5 comments
Labels
enhancement New feature or request

Comments

@kirillyu
Contributor

I reviewed the plugins that implement this and concluded that they are more complex and feature-heavy than the basic use case needs. In my mind there are only two basic needs: configure Kafka and produce messages to Kafka.
I wrote this code in Kotlin to cover them. Maybe it will be helpful for somebody, or maybe it could be implemented in the DSL:

`import java.util.Properties
import kotlin.collections.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import us.abstracta.jmeter.javadsl.JmeterDsl
import us.abstracta.jmeter.javadsl.java.DslJsr223Sampler

object Kafka {
private val kafkaSets = HashMap<String, KafkaProducer<String, String>>()

var propsTemplate = Properties()

init {
    // propsTemplate["bootstrap.servers"] = "host1:9092,host2:9092,host3:9092" - example
    propsTemplate["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    propsTemplate["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    propsTemplate["compression.type"] = "none"
    propsTemplate["batch.size"] = "16384"
    propsTemplate["linger.ms"] = "0"
    propsTemplate["buffer.memory"] = "33554432"
    propsTemplate["acks"] = "all"
    propsTemplate["send.buffer.bytes"] = "131072"
    propsTemplate["receive.buffer.bytes"] = "32768"
    propsTemplate["security.protocol"] = "PLAINTEXT"
    propsTemplate["sasl.kerberos.service.name"] = "kafka"
    propsTemplate["sasl.mechanism"] = "GSSAPI"
    propsTemplate["message.key.placeholder"] = "KEY"
    propsTemplate["message.value.placeholder"] = "MESSAGE"
    propsTemplate["kerberos.auth.enabled"] = "NO"
    propsTemplate["java.security.auth.login.config"] = "null"
    propsTemplate["java.security.krb5.conf"] = "null"
    propsTemplate["ssl.enabled"] = "NO"
    propsTemplate["ssl.key.password"] = "null"
    propsTemplate["ssl.keystore.location"] = "null"
    propsTemplate["ssl.keystore.password"] = "null"
    propsTemplate["ssl.keystore.type"] = "JKS"
    propsTemplate["ssl.truststore.location"] = "null"
    propsTemplate["ssl.truststore.password"] = "null"
    propsTemplate["ssl.truststore.type"] = "JKS"
}

fun kafkaConfig(configName: String, propSet: Properties): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaConfig") {
        if (!kafkaSets.containsKey(configName)) {
            for (prop in propsTemplate) {
                if (!propSet.containsKey(prop.key)) {
                    propSet.setProperty(prop.key as String?, prop.value as String?)
                }
            }
            if (!propSet.containsKey("bootstrap.servers"))
                throw Exception("Couldn't create config without property \"bootstrap.servers\"")
            val kp = KafkaProducer<String, String>(propSet)
            kafkaSets[configName] = kp
        }
    }
}

fun kafkaConfig(configName: String, bootstraps: String): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaConfig") {
        if (!kafkaSets.containsKey(configName)) {
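            // Copy the template so this config's bootstrap.servers does not leak into the shared defaults.
            val propSet = Properties()
            propSet.putAll(propsTemplate)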
            propSet.setProperty("bootstrap.servers", bootstraps)
            val kp = KafkaProducer<String, String>(propSet)
            kafkaSets[configName] = kp
        }
    }
}

fun kafkaProduce(configName: String, topic: String, bodyMessage: String): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaProducer") {
        if (!kafkaSets.containsKey(configName)) {
            throw Exception("Need to create kafkaConfig before sending message")
        } else {
            kafkaSets[configName]?.send(ProducerRecord(topic, "key", bodyMessage)) // key needed?
        }
    }
}

fun kafkaProduce(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaProducer") {
        if (!kafkaSets.containsKey(configName)) {
            throw Exception("Need to create kafkaConfig before sending message")
        } else {
            kafkaSets[configName]?.send(ProducerRecord(topic, partition, "key", bodyMessage))
        }
    }
}

fun kafkaProduceWithTimestamp(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaProducer") {
        if (!kafkaSets.containsKey(configName)) {
            throw Exception("Need to create kafkaConfig before sending message")
        } else {
            kafkaSets[configName]?.send(ProducerRecord(topic, partition, System.currentTimeMillis(), "key", bodyMessage))
        }
    }
}

fun kafkaProduce(configName: String, topic: String, createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String):
    DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaProducer") {
        if (!kafkaSets.containsKey(configName)) {
            throw Exception("Need to create kafkaConfig before sending message")
        } else {
            kafkaSets[configName]?.send(ProducerRecord(topic, "key", createBodyMessage.invoke(it))) // key needed?
        }
    }
}

fun kafkaProduce(
    configName: String,
    topic: String,
    partition: Int,
    createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaProducer") {
        if (!kafkaSets.containsKey(configName)) {
            throw Exception("Need to create kafkaConfig before sending message")
        } else {
            kafkaSets[configName]?.send(
                ProducerRecord(
                    topic, partition, "key",
                    createBodyMessage.invoke(it)
                )
            )
        }
    }
}

fun kafkaProduceWithTimestamp(
    configName: String,
    topic: String,
    partition: Int,
    createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaProducer") {
        if (!kafkaSets.containsKey(configName)) {
            throw Exception("Need to create kafkaConfig before sending message")
        } else {
            kafkaSets[configName]?.send(
                ProducerRecord(
                    topic, partition, System.currentTimeMillis(),
                    "key",
                    createBodyMessage.invoke(it)
                )
            )
        }
    }
}

}`
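
The "defaults plus overrides" step in `kafkaConfig` can also be isolated in a small helper that leaves both the shared template and the caller's `Properties` untouched. This is only a minimal sketch using `java.util.Properties`; the name `mergedProps` is hypothetical and not part of the code above or of jmeter-java-dsl.

```kotlin
import java.util.Properties

// Hypothetical helper (an assumption for illustration): builds a new Properties
// object with the defaults first and the overrides applied on top.
// Neither argument is modified.
fun mergedProps(defaults: Properties, overrides: Properties): Properties {
    val merged = Properties()
    merged.putAll(defaults)
    merged.putAll(overrides)
    return merged
}

fun main() {
    val template = Properties()
    template["acks"] = "all"
    template["linger.ms"] = "0"

    val custom = Properties()
    custom["bootstrap.servers"] = "host1:9092"
    custom["linger.ms"] = "5"

    val merged = mergedProps(template, custom)
    println(merged.getProperty("linger.ms"))            // prints "5"
    println(template.containsKey("bootstrap.servers"))  // prints "false"
}
```

With a helper like this, each named config gets its own property set, so a `bootstrap.servers` value given for one config cannot end up in another.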

@rabelenda added the enhancement (New feature or request) label Mar 24, 2022
@Sonal94539

We also need Kafka support in JMeter Java DSL for async API load testing. We are using Kafka Support and
Kafka Backend Listener from the JMeter Plugins Manager and hope to have similar features in the DSL so that we can start using it.
Thanks!

@kirillyu
Contributor Author

Can you tell me what is missing in the example I provided?

@Sonal94539

I am still learning about the DSL and how we can use it to replace a conventional JMeter-based performance test.
My requirement is to create new performance/load tests for our Kafka-based async APIs. I read this thread, which talks about Kafka configuration and producing messages, but per the description of this issue it seems this is not yet part of the DSL and is therefore considered a future enhancement?

@kirillyu
Contributor Author

This is based on the JSR223 sampler, which is part of the DSL. The DSL gives you the ability to use any Java code inside it. So that's it.
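
As a rough illustration of how those samplers could be placed in a test plan, here is a minimal sketch. It assumes the `Kafka` object from the first comment is compiled in the same package; the config name `main`, the broker address `localhost:9092`, and the topic `test-topic` are placeholders, not values from this thread.

```kotlin
import us.abstracta.jmeter.javadsl.JmeterDsl

fun main() {
    // 1 thread, 10 iterations; every value below is a placeholder.
    JmeterDsl.testPlan(
        JmeterDsl.threadGroup(
            1, 10,
            // Creates the producer on the first iteration only; later iterations find it by name.
            Kafka.kafkaConfig("main", "localhost:9092"),
            // Fixed message body.
            Kafka.kafkaProduce("main", "test-topic", "hello from jmeter-java-dsl"),
            // Message body computed on every iteration.
            Kafka.kafkaProduce("main", "test-topic") { "message-" + System.nanoTime() }
        )
    ).run()
}
```

Note that `KafkaProducer.send` is asynchronous, so a sampler that returns without error only means the record was handed to the producer, not that the broker acknowledged it.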

@Sonal94539

Thank you for your response. I am new to both the DSL and JMeter, so I am trying to understand how I could use these methods to build a performance test with the DSL.
Would it be possible for you to provide similar methods for creating a Kafka consumer?
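
For reference, a consumer counterpart could follow the same JSR223 sampler pattern as the producer code earlier in the thread. The sketch below is an assumption about what such a helper might look like, not code from the issue author or from jmeter-java-dsl; `consumerProps` and `kafkaConsume` are hypothetical names, and the sampler only checks that at least one record arrives before the timeout.

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import us.abstracta.jmeter.javadsl.JmeterDsl
import us.abstracta.jmeter.javadsl.java.DslJsr223Sampler

// Hypothetical helper: minimal consumer settings for String keys and values.
fun consumerProps(bootstraps: String, groupId: String): Properties {
    val props = Properties()
    props["bootstrap.servers"] = bootstraps
    props["group.id"] = groupId
    props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
    props["auto.offset.reset"] = "earliest"
    return props
}

// Hypothetical sampler: fails if no record is received from the topic within the timeout.
fun kafkaConsume(bootstraps: String, groupId: String, topic: String, timeout: Duration): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaConsumer") {
        // KafkaConsumer is not thread-safe, so each invocation creates and closes its own instance.
        KafkaConsumer<String, String>(consumerProps(bootstraps, groupId)).use { consumer ->
            consumer.subscribe(listOf(topic))
            val deadline = System.nanoTime() + timeout.toNanos()
            var received = 0
            // The first polls may return nothing while the consumer joins its group.
            while (received == 0 && System.nanoTime() < deadline) {
                received = consumer.poll(Duration.ofMillis(500)).count()
            }
            if (received == 0) {
                throw Exception("No records received from \"$topic\" within $timeout")
            }
        }
    }
}
```

Creating a consumer per sample keeps the sketch simple but adds group-join latency to every measurement; a real test would more likely keep one consumer per JMeter thread.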

3 participants