KAT

Kafka Admin Tool (KAT) provides an interface to perform many admin operations on Kafka in a straightforward manner.

Installation

Using Homebrew

brew tap gojek/stable
brew install kat

Others

go install github.com/gojek/kat@latest

Local Dev/Testing

  • Clone the repo
  • Run make all to run lint checks, unit tests and build the project
  • Manual testing: running docker-compose up -d creates 2 local Kafka clusters. Commands can be run against these clusters for testing

Admin operations available

Command Usage

Help

  • Display the various args accepted by each command and the corresponding defaults
kat --help
kat <cmd> --help

List Topics

  • List all the topics in a cluster
kat topic list --broker-list <"broker1:9092,broker2:9092">
  • List all topics with a particular replication factor
kat topic list --broker-list <"broker1:9092,broker2:9092"> --replication-factor <replication factor>
  • List all topics with last write time before given time (unused/stale topics)
kat topic list --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory>
  • List topics with size less than or equal to the given size
kat topic list --broker-list <"broker1:9092,broker2:9092"> --size=<size in bytes>

Neither topic throughput metrics nor the last modified time are available in the topic metadata response from Kafka. Hence, this tool uses a custom implementation that SSHes into all the brokers and scans the Kafka logs directory to find the topics that have not been written to since the given time.

Describe Topics

  • Describe metadata for topics
kat topic describe --broker-list <"broker1:9092,broker2:9092"> --topics <"topic1,topic2">

Delete Topics

  • Delete the topics that match the given topic-whitelist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --topic-whitelist=<*test*>
  • Delete the topics that do not match the given topic-blacklist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --topic-blacklist=<*test*>
  • Delete the topics that have not been modified since the last-write epoch time and match the topic-whitelist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory> --topic-whitelist=<*test*>
  • Delete the topics that have not been modified since the last-write epoch time and do not match the topic-blacklist regex
kat topic delete --broker-list <"broker1:9092,broker2:9092"> --last-write=<epoch time> --data-dir=<kafka logs directory> --topic-blacklist=<*test*>

List Consumer Groups for a Topic

  • Lists all the consumer groups that are subscribed to a given topic
kat consumergroup list -b <"broker1:9092,broker2:9092"> -t <topic-name>

Increase Replication Factor

  • Increase the replication factor of topics that match given regex
kat topic increase-replication-factor --broker-list <"broker1:9092,broker2:9092"> --zookeeper <"zookeeper1,zookeeper2"> --topics <"topic1|topic2.*"> --replication-factor <r> --num-of-brokers <n> --batch <b> --timeout-per-batch <t> --poll-interval <p> --throttle <t>

Details: see "Increase Replication Factor and Partition Reassignment Details" below.

Reassign Partitions

  • Reassign partitions for topics that match given regex
kat topic reassign-partitions --broker-list <"broker1:9092,broker2:9092"> --zookeeper <"zookeeper1,zookeeper2"> --topics <"topic1|topic2.*"> --broker-ids <i,j,k> --batch <b> --timeout-per-batch <t> --poll-interval <p> --throttle <t>

Details: see "Increase Replication Factor and Partition Reassignment Details" below.

Show Topic Configs

  • Show config for topics
kat topic config show --topics <"topic1,topic2"> --broker-list <"broker1:9092,broker2:9092">

Alter Topic Configs

  • Alter config for topics
kat topic config alter --topics <"topic1,topic2"> --broker-list <"broker1:9092,broker2:9092"> --config <"retention.ms=500000000,segment.bytes=1000000000">

Mirror Topic Configs from Source to Destination Cluster

  • Mirror all configs for topics present in both source and destination cluster
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4">
  • Mirror configs for topics present in both source and destination cluster, with some configs as exception
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes">
  • Mirror configs for topics present in source cluster, but not in destination cluster
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes"> --create-topics
  • Mirror configs for topics, with increase in partition count if there is a difference
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes"> --create-topics --increase-partitions
  • Preview changes that will be applied on the destination cluster after mirroring
kat mirror --source-broker-ips=<"broker1:9092,broker2:9092"> --destination-broker-ips=<"broker3,broker4"> --exclude-configs=<"retention.ms,segment.bytes"> --create-topics --increase-partitions --dry-run

Increase Replication Factor and Partition Reassignment Details

Increasing the replication factor and reassigning partitions are not one-step processes. At a high level, the following steps need to be executed:

  1. Generating the reassignment.json file
  2. Executing kafka-reassign-partitions command
  3. Verifying the status of reassignment

Increase Replication Factor

  1. Topics are split into batches whose size is set by the batch argument.
  2. A reassignment JSON file is created for each batch.
    • This file is created using a custom round-robin mechanism that assigns leaders and ISRs per partition.
  3. The kafka-reassign-partitions command is executed for each batch.
  4. The status is polled every poll-interval until timeout-per-batch is reached. If the timeout is exceeded, the command exits. Once the replication factor has been increased for all partitions in the batch, the next batch is processed.
  5. The reassignment.json and rollback.json files for all the batches are stored in the /tmp directory. In case of any failure, running kafka-reassign-partitions with the rollback.json of the failed batch will restore the state of those partitions.

Partition Reassignment

  1. Topics are split into multi-level batches whose sizes are set by the topic-batch-size and partition-batch-size arguments (the latter is applicable only to the partition reassignment command).
    • The reassignment is first split into topic-level batches according to topic-batch-size.
    • Each batch is then divided into sub-batches according to partition-batch-size.
    • This ensures that at any point in time at most partition-batch-size partitions are being moved in the cluster.
  2. A reassignment JSON file is created for each batch.
    • This is created using the --generate flag provided by the Kafka CLI tool.
  3. The kafka-reassign-partitions command is executed for each batch.
  4. The status is polled every poll-interval until timeout-per-batch is reached. If the timeout is exceeded, the command exits. Once all partitions in the batch are migrated, the next batch is processed.
  5. The reassignment.json and rollback.json files for all the batches are stored in the /tmp directory. In case of any failure, running kafka-reassign-partitions with the rollback.json of the failed batch will restore the state of those partitions.

Graceful Shutdown and Resume

  • The partition reassignment tool also supports graceful shutdown and resume.
  • If a SIGINT (Ctrl+C) is sent to an ongoing reassignment job, it stops execution after the current batch has finished.
  • To resume the job from the previous state, add the --resume flag to the command along with all the input flags provided to the previously stopped job.
  • The resume flag assumes that the cluster state has remained the same between the previously stopped job and the resumed job. If new topics have been added during that interval, the new topics may skip reassignment.

Future Scope

  • Add support for more admin operations
  • Beautify the response of list and show config commands. Add custom features to ui pkg
  • Fetch values from a kat config file instead of passing everything as cmd args

Contributing

  • Raise an issue to clarify scope/questions, followed by PR
  • Follow Go guidelines for development
  • Ensure make succeeds

Thanks to all the contributors.

License

Licensed under the Apache License, Version 2.0
