diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index d8d6236..853b672 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -18,8 +18,8 @@ package za.co.absa.kafkacase.examples import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig -import za.co.absa.kafkacase.examples.reader.{ReaderCustomResourceHandling, ReaderManualResourceHandling, ReaderUsingsResourceHandling} -import za.co.absa.kafkacase.examples.writer.{WriterCustomResourceHandling, WriterManualResourceHandling, WriterUsingsResourceHandling} +import za.co.absa.kafkacase.examples.reader.{ReaderCustomResourceHandling, ReaderManualResourceHandling, ReaderReadOnce, ReaderUsingsResourceHandling} +import za.co.absa.kafkacase.examples.writer.{WriterCustomResourceHandling, WriterManualResourceHandling, WriterUsingsResourceHandling, WriterWriteOnce} import za.co.absa.kafkacase.models.topics.EdlaChange import java.util.{Properties, UUID} @@ -62,8 +62,10 @@ object KafkaCase { WriterManualResourceHandling(writerProps, topicName, sampleMessageToWrite) WriterCustomResourceHandling(writerProps, topicName, sampleMessageToWrite) WriterUsingsResourceHandling(writerProps, topicName, sampleMessageToWrite) + WriterWriteOnce(writerProps, topicName, sampleMessageToWrite) ReaderManualResourceHandling[EdlaChange](readerProps, topicName) ReaderCustomResourceHandling[EdlaChange](readerProps, topicName) ReaderUsingsResourceHandling[EdlaChange](readerProps, topicName) + ReaderReadOnce[EdlaChange](readerProps, topicName) } } diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala new file mode 100644 index 0000000..5d0e0e4 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.reader + +import io.circe.Decoder +import za.co.absa.kafkacase.reader.Reader + +import java.util.Properties + +object ReaderReadOnce { + def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = + Reader.readOnce[T](readerProps, topicName, println) +} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala new file mode 100644 index 0000000..985fc14 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.writer + +import io.circe.Encoder +import za.co.absa.kafkacase.writer.Writer + +import java.util.Properties + +object WriterWriteOnce { + def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = + Writer.writeOnce(writerProps, topicName, "sampleKey", sampleMessageToWrite) +} diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala index a93b18c..d57793d 100644 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala @@ -16,4 +16,20 @@ package za.co.absa.kafkacase.reader +import io.circe.Decoder + +import java.util.Properties + trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoCloseable + +object Reader { + def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { + val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false) + try { + for (item <- reader) + work(item) + } finally { + reader.close() + } + } +} diff --git a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala index 85b07c1..e63248c 100644 --- a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala +++ b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala @@ -16,6 +16,10 @@ package za.co.absa.kafkacase.writer +import io.circe.Encoder + +import java.util.Properties + trait Writer[TType] extends AutoCloseable { def write(key: String, value: TType): Unit def flush(): Unit @@ -25,3 +29,14 @@ trait Writer[TType] extends AutoCloseable { flush() } } + +object Writer { + def writeOnce[T: Encoder](writerProps: Properties, topicName: String, messageKey: String, sampleMessageToWrite: T): Unit = { + val writer = new WriterImpl[T](writerProps, topicName) + try { + writer.write(messageKey, sampleMessageToWrite) + } finally { + writer.close() + } + } +}