Skip to content

Commit

Permalink
#23 Create a utils class that will help users manage their properties
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 committed Oct 23, 2024
1 parent ccd7acd commit 3bb76fa
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 60 deletions.
18 changes: 18 additions & 0 deletions examples/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
topicName = "KillMePleaseTopic"

writer {
bootstrap.servers = "ZADALNRAPP00009.corp.dsarena.com:9092"
key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
}

reader {
bootstrap.servers = "ZADALNRAPP00009.corp.dsarena.com:9092"
key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
client.id = "DebugConsumer_007"
group.id = "DebugGroup_007"
auto.offset.reset = "earliest"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import java.util.Properties

object ReaderUsingsResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
println("Scala 3 feature")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import java.util.Properties

object WriterUsingsResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
println("Scala 3 feature")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package za.co.absa.kafkacase.examples.reader
import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties
import scala.util.Using

object ReaderUsingsResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
Using(new ReaderImpl[T](readerProps, topicName, neverEnding = false)) { reader =>
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
Using(new ReaderImpl[T](readerConf, topicName, neverEnding = false)) { reader =>
for (item <- reader)
println(item)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package za.co.absa.kafkacase.examples.writer
import io.circe.Encoder
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties
import scala.util.Using

object WriterUsingsResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
Using(new WriterImpl[T](writerProps, topicName)) { writer =>
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
Using(new WriterImpl[T](writerConf, topicName)) { writer =>
writer.write("sampleMessageKey1", sampleMessageToWrite)
writer.write("sampleMessageKey2", sampleMessageToWrite)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package za.co.absa.kafkacase.examples

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import com.typesafe.config.ConfigFactory
import za.co.absa.kafkacase.examples.reader.{ReaderCustomResourceHandling, ReaderManualResourceHandling, ReaderReadOnce, ReaderUsingsResourceHandling}
import za.co.absa.kafkacase.examples.writer.{WriterCustomResourceHandling, WriterManualResourceHandling, WriterUsingsResourceHandling, WriterWriteOnce}
import za.co.absa.kafkacase.models.topics.EdlaChange

import java.util.{Properties, UUID}

object KafkaCase {
private val config = ConfigFactory.load()

// This goes from your application logic
private val sampleMessageToWrite = EdlaChange(
app_id_snow = "N/A",
Expand All @@ -39,33 +38,16 @@ object KafkaCase {
timestamp_event = 12345
)

// This goes from your config / domain knowledge
private val topicName = "KillMePleaseTopic"

// This goes from your writer configs
private val writerProps = new Properties()
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

// This goes from your reader configs
private val readerProps = new Properties()
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")


def main(args: Array[String]): Unit = {
WriterManualResourceHandling(writerProps, topicName, sampleMessageToWrite)
WriterCustomResourceHandling(writerProps, topicName, sampleMessageToWrite)
WriterUsingsResourceHandling(writerProps, topicName, sampleMessageToWrite)
WriterWriteOnce(writerProps, topicName, sampleMessageToWrite)
ReaderManualResourceHandling[EdlaChange](readerProps, topicName)
ReaderCustomResourceHandling[EdlaChange](readerProps, topicName)
ReaderUsingsResourceHandling[EdlaChange](readerProps, topicName)
ReaderReadOnce[EdlaChange](readerProps, topicName)
val writerConfig = config.getConfig("writer")
WriterManualResourceHandling(writerConfig, topicName, sampleMessageToWrite)
WriterCustomResourceHandling(writerConfig, topicName, sampleMessageToWrite)
WriterUsingsResourceHandling(writerConfig, topicName, sampleMessageToWrite)
WriterWriteOnce(writerConfig, topicName, sampleMessageToWrite)
val readerConfig = config.getConfig("reader")
ReaderManualResourceHandling[EdlaChange](readerConfig, topicName)
ReaderCustomResourceHandling[EdlaChange](readerConfig, topicName)
ReaderUsingsResourceHandling[EdlaChange](readerConfig, topicName)
ReaderReadOnce[EdlaChange](readerConfig, topicName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties

object ReaderCustomResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
withResource(new ReaderImpl[T](readerProps, topicName, neverEnding = false))(reader => {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
withResource(new ReaderImpl[T](readerConf, topicName, neverEnding = false))(reader => {
for (item <- reader)
println(item)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties

object ReaderManualResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false)
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false)
try {
for (item <- reader)
println(item)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties

object WriterCustomResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
withResource(new WriterImpl[T](writerProps, topicName))(writer => {
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
withResource(new WriterImpl[T](writerConf, topicName))(writer => {
writer.write("sampleMessageKey1", sampleMessageToWrite)
writer.write("sampleMessageKey2", sampleMessageToWrite)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties

object WriterManualResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerProps, topicName)
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerConf, topicName)
try {
writer.write("sampleMessageKey1", sampleMessageToWrite)
writer.write("sampleMessageKey2", sampleMessageToWrite)
Expand Down
6 changes: 4 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ object Dependencies {

def readerDependencies: Seq[ModuleID] = Seq(
"io.circe" %% "circe-parser" % "0.14.9",
"org.apache.kafka" % "kafka-clients" % "3.8.0"
"org.apache.kafka" % "kafka-clients" % "3.8.0",
"com.typesafe" % "config" % "1.4.3"
)

def writerDependencies: Seq[ModuleID] = Seq(
"io.circe" %% "circe-parser" % "0.14.9",
"org.apache.kafka" % "kafka-clients" % "3.8.0"
"org.apache.kafka" % "kafka-clients" % "3.8.0",
"com.typesafe" % "config" % "1.4.3"
)

def examplesDependencies: Seq[ModuleID] = Seq.empty[ModuleID]
Expand Down
22 changes: 20 additions & 2 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@

package za.co.absa.kafkacase.reader

import com.typesafe.config.Config
import io.circe.Decoder
import io.circe.jawn.decode
import org.slf4j.LoggerFactory
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import za.co.absa.kafkacase.reader.ReaderImpl.{DEFAULT_TIMEOUT, log}
import za.co.absa.kafkacase.reader.ReaderImpl.{DEFAULT_TIMEOUT, convertConfigToProperties, log}

import java.time.Duration
import java.util
import java.util.Properties

class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = true) extends Reader[TType] {
class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration, neverEnding: Boolean) extends Reader[TType] {
// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary constructors
def this(config: Config, topic: String, timeout: Duration) = this(convertConfigToProperties(config), topic, timeout, true)
def this(config: Config, topic: String, neverEnding: Boolean) = this(convertConfigToProperties(config), topic, DEFAULT_TIMEOUT, neverEnding)
def this(config: Config, topic: String) = this(convertConfigToProperties(config), topic, DEFAULT_TIMEOUT, true)
def this(config: Config, topic: String, timeout: Duration, neverEnding: Boolean) = this(convertConfigToProperties(config), topic, timeout, neverEnding)
def this(props: Properties, topic: String, timeout: Duration) = this(props, topic, timeout, true)
def this(props: Properties, topic: String, neverEnding: Boolean) = this(props, topic, DEFAULT_TIMEOUT, neverEnding)
def this(props: Properties, topic: String) = this(props, topic, DEFAULT_TIMEOUT, true)

private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
private var singlePollIterator = fetchNextBatch()
Expand Down Expand Up @@ -61,4 +71,12 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura
object ReaderImpl {
private val DEFAULT_TIMEOUT: Duration = Duration.ofSeconds(3)
private val log = LoggerFactory.getLogger(this.getClass)

private def convertConfigToProperties(config: Config): Properties = {
val properties = new Properties()
config.entrySet().forEach { entry =>
properties.put(entry.getKey, config.getString(entry.getKey))
}
properties
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package za.co.absa.kafkacase.writer

import com.typesafe.config.Config
import io.circe.Encoder
import io.circe.syntax.EncoderOps
import org.slf4j.LoggerFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import za.co.absa.kafkacase.writer.WriterImpl.log
import za.co.absa.kafkacase.writer.WriterImpl.{convertConfigToProperties, log}

import java.util.Properties

class WriterImpl[TType: Encoder](props: Properties, topic: String) extends Writer[TType] {
def this(config: Config, topic: String) = this(convertConfigToProperties(config), topic)

private val producer = new KafkaProducer[String, String](props)

def write(key: String, value: TType): Unit = {
Expand All @@ -39,4 +42,12 @@ class WriterImpl[TType: Encoder](props: Properties, topic: String) extends Write

object WriterImpl {
private val log = LoggerFactory.getLogger(this.getClass)

private def convertConfigToProperties(config: Config): Properties = {
val properties = new Properties()
config.entrySet().forEach { entry =>
properties.put(entry.getKey, config.getString(entry.getKey))
}
properties
}
}

0 comments on commit 3bb76fa

Please sign in to comment.