diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index ce451aa..cd103ce 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -40,14 +40,23 @@ object KafkaCase { def main(args: Array[String]): Unit = { val writerConfig = config.getConfig("writer") + val readerConfig = config.getConfig("reader") + val topicName = config.getString("topicName") + println("Sample how to use writer with manual resource handling") WriterManualResourceHandling(writerConfig, topicName, sampleMessageToWrite) + println("Sample how to use writer with custom resource handling") WriterCustomResourceHandling(writerConfig, topicName, sampleMessageToWrite) + println("Sample how to use writer with Usings in scala 3") WriterUsingsResourceHandling(writerConfig, topicName, sampleMessageToWrite) + println("Sample how to use writer write-once fashion") WriterWriteOnce(writerConfig, topicName, sampleMessageToWrite) - val readerConfig = config.getConfig("reader") + println("Sample how to use reader with manual resource handling") ReaderManualResourceHandling[EdlaChange](readerConfig, topicName) + println("Sample how to use reader with custom resource handling") ReaderCustomResourceHandling[EdlaChange](readerConfig, topicName) + println("Sample how to use reader with Usings in scala 3") ReaderUsingsResourceHandling[EdlaChange](readerConfig, topicName) + println("Sample how to use reader read-once fashion") ReaderReadOnce[EdlaChange](readerConfig, topicName) } } diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala index 5d0e0e4..16612e7 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala @@ -16,12 +16,11 @@ package za.co.absa.kafkacase.examples.reader +import com.typesafe.config.Config import io.circe.Decoder import za.co.absa.kafkacase.reader.Reader -import java.util.Properties - object ReaderReadOnce { - def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = - Reader.readOnce[T](readerProps, topicName, println) + def apply[T: Decoder](readerConf: Config, topicName: String): Unit = + Reader.readOnce[T](readerConf, topicName, println) } diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala index 985fc14..e76a5d6 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala @@ -16,12 +16,11 @@ package za.co.absa.kafkacase.examples.writer +import com.typesafe.config.Config import io.circe.Encoder import za.co.absa.kafkacase.writer.Writer -import java.util.Properties - object WriterWriteOnce { - def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = - Writer.writeOnce(writerProps, topicName, "sampleKey", sampleMessageToWrite) + def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = + Writer.writeOnce(writerConf, topicName, "sampleKey", sampleMessageToWrite) } diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala index d57793d..5b5f5fb 100644 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala @@ -16,6 +16,7 @@ package za.co.absa.kafkacase.reader +import com.typesafe.config.Config import io.circe.Decoder import java.util.Properties @@ -32,4 +33,14 @@ object Reader { reader.close() } } + + def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { + val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false) + try { + for (item <- reader) + work(item) + } finally { + reader.close() + } + } } diff --git a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala index e63248c..c3a9ee6 100644 --- a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala +++ b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala @@ -16,6 +16,7 @@ package za.co.absa.kafkacase.writer +import com.typesafe.config.Config import io.circe.Encoder import java.util.Properties @@ -39,4 +40,13 @@ object Writer { writer.close() } } + + def writeOnce[T: Encoder](writerConf: Config, topicName: String, messageKey: String, sampleMessageToWrite: T): Unit = { + val writer = new WriterImpl[T](writerConf, topicName) + try { + writer.write(messageKey, sampleMessageToWrite) + } finally { + writer.close() + } + } }