Skip to content

Commit

Permalink
#23 Create a utils class that will help users manage their properties (
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 authored Oct 24, 2024
1 parent ccd7acd commit 901e4d5
Show file tree
Hide file tree
Showing 17 changed files with 143 additions and 69 deletions.
18 changes: 18 additions & 0 deletions examples/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
topicName = "MyStandardTopic"

writer {
bootstrap.servers = "my.lab.kafka.cluster.com:9092"
key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
}

reader {
bootstrap.servers = "my.lab.kafka.cluster.com:9092"
key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
client.id = "DebugConsumer_007"
group.id = "DebugGroup_007"
auto.offset.reset = "earliest"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import java.util.Properties

object ReaderUsingsResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
println("Scala 3 feature")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import java.util.Properties

object WriterUsingsResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
println("Scala 3 feature")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package za.co.absa.kafkacase.examples.reader
import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties
import scala.util.Using

object ReaderUsingsResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
Using(new ReaderImpl[T](readerProps, topicName, neverEnding = false)) { reader =>
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
Using(new ReaderImpl[T](readerConf, topicName, neverEnding = false)) { reader =>
for (item <- reader)
println(item)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package za.co.absa.kafkacase.examples.writer
import io.circe.Encoder
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties
import scala.util.Using

object WriterUsingsResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
Using(new WriterImpl[T](writerProps, topicName)) { writer =>
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
Using(new WriterImpl[T](writerConf, topicName)) { writer =>
writer.write("sampleMessageKey1", sampleMessageToWrite)
writer.write("sampleMessageKey2", sampleMessageToWrite)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package za.co.absa.kafkacase.examples

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import com.typesafe.config.ConfigFactory
import za.co.absa.kafkacase.examples.reader.{ReaderCustomResourceHandling, ReaderManualResourceHandling, ReaderReadOnce, ReaderUsingsResourceHandling}
import za.co.absa.kafkacase.examples.writer.{WriterCustomResourceHandling, WriterManualResourceHandling, WriterUsingsResourceHandling, WriterWriteOnce}
import za.co.absa.kafkacase.models.topics.EdlaChange

import java.util.{Properties, UUID}

object KafkaCase {
private val config = ConfigFactory.load()

// This goes from your application logic
private val sampleMessageToWrite = EdlaChange(
app_id_snow = "N/A",
Expand All @@ -39,33 +38,25 @@ object KafkaCase {
timestamp_event = 12345
)

// This goes from your config / domain knowledge
private val topicName = "KillMePleaseTopic"

// This goes from your writer configs
private val writerProps = new Properties()
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

// This goes from your reader configs
private val readerProps = new Properties()
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")


def main(args: Array[String]): Unit = {
WriterManualResourceHandling(writerProps, topicName, sampleMessageToWrite)
WriterCustomResourceHandling(writerProps, topicName, sampleMessageToWrite)
WriterUsingsResourceHandling(writerProps, topicName, sampleMessageToWrite)
WriterWriteOnce(writerProps, topicName, sampleMessageToWrite)
ReaderManualResourceHandling[EdlaChange](readerProps, topicName)
ReaderCustomResourceHandling[EdlaChange](readerProps, topicName)
ReaderUsingsResourceHandling[EdlaChange](readerProps, topicName)
ReaderReadOnce[EdlaChange](readerProps, topicName)
val writerConfig = config.getConfig("writer")
val readerConfig = config.getConfig("reader")
val topicName = config.getString("topicName")
println("Sample how to use writer with manual resource handling")
WriterManualResourceHandling(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use writer with custom resource handling")
WriterCustomResourceHandling(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use writer with Usings in scala 3")
WriterUsingsResourceHandling(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use writer write-once fashion")
WriterWriteOnce(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use reader with manual resource handling")
ReaderManualResourceHandling[EdlaChange](readerConfig, topicName)
println("Sample how to use reader with custom resource handling")
ReaderCustomResourceHandling[EdlaChange](readerConfig, topicName)
println("Sample how to use reader with Usings in scala 3")
ReaderUsingsResourceHandling[EdlaChange](readerConfig, topicName)
println("Sample how to use reader read-once fashion")
ReaderReadOnce[EdlaChange](readerConfig, topicName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties

object ReaderCustomResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
withResource(new ReaderImpl[T](readerProps, topicName, neverEnding = false))(reader => {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
withResource(ReaderImpl[T](readerConf, topicName))(reader => {
for (item <- reader)
println(item)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl

import java.util.Properties

object ReaderManualResourceHandling {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false)
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
val reader = ReaderImpl[T](readerConf, topicName)
try {
for (item <- reader)
println(item)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.reader.Reader

import java.util.Properties

object ReaderReadOnce {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit =
Reader.readOnce[T](readerProps, topicName, println)
def apply[T: Decoder](readerConf: Config, topicName: String): Unit =
Reader.readOnce[T](readerConf, topicName, (item: (String, Either[String, T])) => println(item))
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties

object WriterCustomResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
withResource(new WriterImpl[T](writerProps, topicName))(writer => {
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
withResource(new WriterImpl[T](writerConf, topicName))(writer => {
writer.write("sampleMessageKey1", sampleMessageToWrite)
writer.write("sampleMessageKey2", sampleMessageToWrite)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import za.co.absa.kafkacase.writer.WriterImpl

import java.util.Properties

object WriterManualResourceHandling {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerProps, topicName)
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerConf, topicName)
try {
writer.write("sampleMessageKey1", sampleMessageToWrite)
writer.write("sampleMessageKey2", sampleMessageToWrite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import za.co.absa.kafkacase.writer.Writer

import java.util.Properties

object WriterWriteOnce {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit =
Writer.writeOnce(writerProps, topicName, "sampleKey", sampleMessageToWrite)
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit =
Writer.writeOnce(writerConf, topicName, "sampleKey", sampleMessageToWrite)
}
6 changes: 4 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ object Dependencies {

def readerDependencies: Seq[ModuleID] = Seq(
"io.circe" %% "circe-parser" % "0.14.9",
"org.apache.kafka" % "kafka-clients" % "3.8.0"
"org.apache.kafka" % "kafka-clients" % "3.8.0",
"com.typesafe" % "config" % "1.4.3"
)

def writerDependencies: Seq[ModuleID] = Seq(
"io.circe" %% "circe-parser" % "0.14.9",
"org.apache.kafka" % "kafka-clients" % "3.8.0"
"org.apache.kafka" % "kafka-clients" % "3.8.0",
"com.typesafe" % "config" % "1.4.3"
)

def examplesDependencies: Seq[ModuleID] = Seq.empty[ModuleID]
Expand Down
13 changes: 12 additions & 1 deletion reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.kafkacase.reader

import com.typesafe.config.Config
import io.circe.Decoder

import java.util.Properties
Expand All @@ -24,7 +25,17 @@ trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoC

object Reader {
def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false)
val reader = ReaderImpl[T](readerProps, topicName)
try {
for (item <- reader)
work(item)
} finally {
reader.close()
}
}

def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = ReaderImpl[T](readerConf, topicName)
try {
for (item <- reader)
work(item)
Expand Down
41 changes: 39 additions & 2 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@

package za.co.absa.kafkacase.reader

import com.typesafe.config.Config
import io.circe.Decoder
import io.circe.jawn.decode
import org.slf4j.LoggerFactory
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import za.co.absa.kafkacase.reader.ReaderImpl.{DEFAULT_TIMEOUT, log}
import za.co.absa.kafkacase.reader.ReaderImpl.{DEFAULT_TIMEOUT, convertConfigToProperties, log}

import java.time.Duration
import java.util
import java.util.Properties

class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = true) extends Reader[TType] {
class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration, neverEnding: Boolean) extends Reader[TType] {
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
private var singlePollIterator = fetchNextBatch()
Expand Down Expand Up @@ -60,5 +61,41 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura

object ReaderImpl {
private val DEFAULT_TIMEOUT: Duration = Duration.ofSeconds(3)
private val DEFAULT_NEVER_ENDING: Boolean = true
private val log = LoggerFactory.getLogger(this.getClass)

// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods
// Primary method that contains default arguments
def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): ReaderImpl[TType] = {
new ReaderImpl[TType](props, topic, timeout, neverEnding)
}

// Overloaded method with Config and all optional arguments
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): ReaderImpl[TType] = {
val props = convertConfigToProperties(config)
apply[TType](props, topic, timeout, neverEnding)
}

// Overloaded method with Config and neverEnding optional argument
def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): ReaderImpl[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding)
}

// Overloaded method with Config and timeout optional argument
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): ReaderImpl[TType] = {
apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING)
}

// Overloaded method with Config and none of optional arguments
def apply[TType: Decoder](config: Config, topic: String): ReaderImpl[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING)
}

private def convertConfigToProperties(config: Config): Properties = {
val properties = new Properties()
config.entrySet().forEach { entry =>
properties.put(entry.getKey, config.getString(entry.getKey))
}
properties
}
}
10 changes: 10 additions & 0 deletions writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.kafkacase.writer

import com.typesafe.config.Config
import io.circe.Encoder

import java.util.Properties
Expand All @@ -39,4 +40,13 @@ object Writer {
writer.close()
}
}

def writeOnce[T: Encoder](writerConf: Config, topicName: String, messageKey: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerConf, topicName)
try {
writer.write(messageKey, sampleMessageToWrite)
} finally {
writer.close()
}
}
}
Loading

0 comments on commit 901e4d5

Please sign in to comment.