*Forked from [balajirrao/kafka-streams-multi-runner](https://github.com/balajirrao/kafka-streams-multi-runner) and fixed for bug reproduction.*

# kafka-streams-multi-runner

The main purpose of this project is to reproduce a bug in kafka-streams that is present at least in version 3.3.1, and likely in earlier versions as well.

## Details of the bug

In a production kafka-streams application, I found that state stores returned stale data during deployments, when partitions moved from one application instance to another.

This project can be used to reproduce the bug by running multiple instances of a kafka-streams app, starting and stopping individual instances in an orderly fashion.

Using this, I have been able to reproduce the bug in the following setting:

- a Processor API based app
- one standby task (`num.standby.replicas = 1`)
- a caching-enabled key-value state store

The interaction between caching and standby tasks causes the bug: when an active task becomes a standby, restoration does not invalidate the cache, so the cached key-value store returns stale values when the task becomes active again on the same instance. The sketch below illustrates a topology with these ingredients.
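For illustration, here is a minimal sketch of the kind of setup involved: a caching-enabled persistent key-value store used from the Processor API, with one standby replica configured. This is not the worker's actual code; the store name, topic name, and application id are hypothetical.

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsConfig, Topology}
import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, ProcessorSupplier, Record}
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

object BugSetupSketch {
  // Caching-enabled persistent key-value store: ingredient #1 of the bug.
  val storeBuilder = Stores
    .keyValueStoreBuilder(
      Stores.persistentKeyValueStore("counts-store"), // hypothetical store name
      Serdes.String(),
      Serdes.Integer())
    .withCachingEnabled()

  // A trivial Processor API topology that writes every record into the store.
  val topology: Topology = new Topology()
    .addSource("source", "input") // hypothetical input topic
    .addProcessor("writer", new ProcessorSupplier[String, Integer, Void, Void] {
      def get(): Processor[String, Integer, Void, Void] =
        new Processor[String, Integer, Void, Void] {
          private var store: KeyValueStore[String, Integer] = _
          override def init(ctx: ProcessorContext[Void, Void]): Unit =
            store = ctx.getStateStore("counts-store")
          override def process(record: Record[String, Integer]): Unit =
            store.put(record.key, record.value)
        }
    }, "source")
    .addStateStore(storeBuilder, "writer")

  // One standby replica: ingredient #2 of the bug.
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bug-repro-app") // hypothetical
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1")
}
```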

## Setup

This project has two subprojects, coordinator and worker. The worker is a Processor API based kafka-streams app, and the coordinator orchestrates running multiple instances of the worker.

When the coordinator runs, it creates an input topic and produces messages to it. The worker stores some state in a state store and asserts that the state matches what it expects; the sketch below shows the shape of that check.
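The exact assertion logic lives in the worker, but judging from the `!!! BROKEN !!!` output shown further down, it amounts to something like the following sketch: compare each value seen by a range iterator over the store against a point lookup of the same key (the point lookup may be answered by the cache when caching is enabled). This is an inferred sketch, not the actual worker code.

```scala
import org.apache.kafka.streams.state.KeyValueStore

// Sketch of the worker's consistency check, inferred from the
// "!!! BROKEN !!!" log line; not the actual worker code.
def checkStore(store: KeyValueStore[String, Integer]): Unit = {
  val iter = store.all() // range scan over the store
  try {
    while (iter.hasNext) {
      val kv = iter.next()
      val direct = store.get(kv.key) // point lookup; may be served by the cache
      if (direct != kv.value)
        println(
          s"!!! BROKEN !!! Key: ${kv.key} Expected in stored(Cache or Store) " +
          s"value: $direct but KeyValueIterator value: ${kv.value}")
    }
  } finally iter.close()
}
```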

The coordinator starts and stops worker instances according to a "program": a list of integers. Each integer toggles the running state of that instance, and the coordinator waits 15 seconds between steps. For example, the program "1 2 1" results in the following sequence of execution (a sketch of these semantics follows the example):

- start instance 1 -> wait 15 seconds -> start instance 2 -> wait 15 seconds -> stop instance 1
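The following sketch captures the assumed program semantics; `start` and `stop` stand in for whatever the coordinator actually does to launch and kill a worker instance.

```scala
// Sketch of the program semantics: each integer toggles the named instance
// between running and stopped, with a 15-second pause between steps.
def runProgram(program: Seq[Int], start: Int => Unit, stop: Int => Unit): Unit = {
  val running = scala.collection.mutable.Set.empty[Int]
  for ((instance, step) <- program.zipWithIndex) {
    if (step > 0) Thread.sleep(15000) // wait 15 seconds between steps
    if (running.add(instance)) start(instance) // not running -> start it
    else { running -= instance; stop(instance) } // already running -> stop it
  }
}

// runProgram(Seq(1, 2, 1), start, stop) yields:
// start 1 -> wait 15s -> start 2 -> wait 15s -> stop 1
```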

## How to run the project

You'll need to install sbt.

- The coordinator expects the worker JAR to be present at a specific location in the project. Run `sbt "worker/assembly"` to build the worker JAR.
- The worker expects Kafka to be running on `localhost:9092` with a PLAINTEXT listener.
- Run `sbt "coordinator / run 1"` to run the program "1 2 3" in the coordinator; this particular program reproduces the bug. If no arguments are passed, the coordinator runs a random program.
- The coordinator will output the following:

  ```
  sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
  ...
  appid: 437163b7-6f91-4aa2-bdf1-c7ba7e5a40ca
  [0] starting instance +1
  forwardAll Start
  [0]!!! BROKEN !!! Key: 859930 Expected in stored(Cache or Store) value: 2 but KeyValueIterator value: 1
  ```

The number in square brackets is the "program counter". Worker logs for an instance can be found in `{app_id}.{program-counter}.log`.