Ask for committed offsets #168

Open
Flamma opened this issue Dec 21, 2018 · 0 comments

Flamma commented Dec 21, 2018

I want to test whether an offset has been committed or not (because I only commit in some situations). I have not found any way to test that.

I'm very new to this, so there's probably something I just don't get.

I'm using Akka Streams with Kafka. Here's an example of a test that works.

  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }
  }

publishAndRequestRetry uses publishToKafka to publish a message and then waits for the source to receive it.
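
For illustration only, the shape of such a helper can be sketched as below. This is not the actual publishAndRequestRetry: the names `publishAndRequest`, `publish` and `requestNext` are hypothetical, and the sketch assumes the message is published once and the probe is then polled up to `retries` times.

```scala
import scala.annotation.tailrec

object PublishAndRequestSketch {
  // Hypothetical stand-in for publishAndRequestRetry (assumed behaviour).
  // `publish` would wrap publishToKafka; `requestNext` would ask the test
  // probe for one element and return None if nothing arrived in time.
  def publishAndRequest[A](
      publish: () => Unit,
      requestNext: () => Option[A],
      retries: Int
  ): Option[A] = {
    @tailrec
    def loop(remaining: Int): Option[A] =
      if (remaining <= 0) None
      else
        requestNext() match {
          case found @ Some(_) => found               // the source delivered the message
          case None            => loop(remaining - 1) // nothing yet, try again
        }

    publish()     // publish once, then poll the probe
    loop(retries) // gives up with None after `retries` attempts
  }
}
```

Passing the two side effects in as functions keeps the retry logic independent of Kafka, so it can be exercised without a running broker.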

I want to add a test that checks whether the offset has been committed or not. Is this possible with EmbeddedKafka?
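
To make concrete what such a check would have to assert, here is a minimal sketch of the comparison itself. It rests on Kafka's convention that the committed offset for a partition is the offset of the next record to read, i.e. the last processed offset plus one. `Partition`, `isCommitted` and `fetchCommitted` are hypothetical names. How the lookup is implemented is left open: it might be backed by the Kafka client API (for example `KafkaConsumer.committed` or `AdminClient.listConsumerGroupOffsets`) pointed at the embedded broker with the same group id, but that is an assumption and has not been verified against EmbeddedKafka here.

```scala
object CommittedOffsetCheck {
  // Hypothetical, simplified stand-in for Kafka's TopicPartition.
  final case class Partition(topic: String, partition: Int)

  // `fetchCommitted` is an assumed lookup that returns the committed offset
  // for the consumer group, or None if the group has committed nothing yet.
  // A record at `messageOffset` counts as committed once the committed
  // offset is greater than it (committed offset = last processed + 1).
  def isCommitted(
      fetchCommitted: Partition => Option[Long],
      tp: Partition,
      messageOffset: Long
  ): Boolean =
    fetchCommitted(tp).exists(_ > messageOffset)
}
```

In a test, the "should not commit" case would then assert that `isCommitted` is false for the consumed message's offset, and the "should commit" case would assert that it becomes true, possibly after a short wait since commits are asynchronous.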
