title: Replicate data from TiDB to Apache Kafka
summary: Learn how to replicate data from TiDB to Apache Kafka.
This document describes how to replicate data from TiDB to Apache Kafka by using TiCDC. The process includes the following steps:
- Deploy a TiCDC cluster and a Kafka cluster.
- Create a changefeed with Kafka as the sink.
- Write data to the TiDB cluster by using go-tpc. Then use the Kafka console consumer to check that the data is replicated to the specified Kafka topic.
These steps are performed in a lab environment. You can also deploy a cluster for a production environment by referring to these steps.
Deploy a TiCDC cluster.
You can quickly deploy a TiCDC cluster by running the `tiup playground` command.
{{< copyable "shell-regular" >}}
tiup playground --host --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# View cluster status
tiup status
In a production environment, you can deploy TiCDC as instructed in Deploy TiCDC.
Deploy a Kafka cluster.
- To quickly deploy a Kafka cluster, refer to Apache Kafka Quickstart.
- To deploy a Kafka cluster in production environments, refer to Running Kafka in Production.
Use tiup ctl to create a changefeed with Kafka as the downstream node.
{{< copyable "shell-regular" >}}
tiup ctl cdc changefeed create --pd="" --sink-uri="kafka://" --changefeed-id="kafka-changefeed"
If the command is executed successfully, information about the changefeed is displayed, such as the changefeed ID and the sink URI.
{{< copyable "shell-regular" >}}
Create changefeed successfully!
ID: kafka-changefeed
Info: {"sink-uri":"kafka://","opts":{},"create-time":"2022-04-06T14:45:10.824475+08:00","start-ts":432335096583028737,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json","column-selectors":null},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.1.0-master"}
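The output above can also be checked programmatically. The following is a minimal Python sketch, assuming the output keeps the `ID:` and `Info:` line layout shown above. The function names are hypothetical, and the sample text is an abbreviated copy of that output. The sketch also decodes `start-ts`, relying on the TiDB TSO layout in which the low 18 bits are a logical counter and the remaining high bits are a physical timestamp in milliseconds.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Abbreviated copy of the CLI output shown above (most Info fields omitted).
SAMPLE_OUTPUT = (
    "Create changefeed successfully!\n"
    "ID: kafka-changefeed\n"
    'Info: {"sink-uri":"kafka://","start-ts":432335096583028737,'
    '"state":"normal","config":{"sink":{"protocol":"canal-json"}}}'
)


def parse_changefeed_output(text: str) -> dict:
    """Extract the changefeed ID and the Info JSON from the CLI output."""
    result = {}
    for line in text.splitlines():
        if line.startswith("ID:"):
            result["id"] = line[len("ID:"):].strip()
        elif line.startswith("Info:"):
            result["info"] = json.loads(line[len("Info:"):].strip())
    return result


def decode_tso(ts: int) -> Tuple[datetime, int]:
    """Split a TSO into its physical time (UTC) and logical counter."""
    physical_ms = ts >> 18            # high bits: milliseconds since epoch
    logical = ts & ((1 << 18) - 1)    # low 18 bits: logical counter
    seconds, millis = divmod(physical_ms, 1000)
    when = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return when + timedelta(milliseconds=millis), logical


if __name__ == "__main__":
    parsed = parse_changefeed_output(SAMPLE_OUTPUT)
    info = parsed["info"]
    print(parsed["id"], info["state"], info["config"]["sink"]["protocol"])
    print(decode_tso(info["start-ts"]))
```

A check like this confirms that the changefeed state is `normal` and that `start-ts` falls close to `create-time`.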
If the command does not return any information, you should check network connectivity from the server where the command is executed to the target Kafka cluster.
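One way to check basic connectivity is to attempt a TCP connection to each broker from the server where the command is executed. The following Python sketch uses only the standard library. The function name `can_reach` and the address in the usage section are placeholders, not values from this document. A successful TCP connection only shows that the port is reachable; it does not prove that the Kafka broker is healthy.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


if __name__ == "__main__":
    # Placeholder broker address; replace it with your own broker list.
    brokers = [("127.0.0.1", 9092)]
    for host, port in brokers:
        status = "reachable" if can_reach(host, port) else "NOT reachable"
        print(f"{host}:{port} is {status}")
```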
In production environments, a Kafka cluster has multiple broker nodes. Therefore, you can add the addresses of multiple brokers to the sink URI. This makes access to the Kafka cluster more stable: if one broker is faulty, the changefeed still works. Suppose that a Kafka cluster has three broker nodes. You can create a changefeed with the following sink URI, which lists the three broker addresses separated by commas.
{{< copyable "shell-regular" >}}
tiup ctl cdc changefeed create --pd="" --sink-uri="kafka://,,"
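If you generate the sink URI from a broker list, for example in a deployment script, a small helper avoids typos in the comma-separated address list. The following Python sketch assumes the `kafka://<broker>,<broker>/<topic>?<parameters>` layout of a TiCDC Kafka sink URI. The helper name, the broker addresses, and the topic name are hypothetical placeholders and are not taken from this document.

```python
from typing import Dict, List, Optional
from urllib.parse import urlencode


def build_kafka_sink_uri(
    brokers: List[str],
    topic: str,
    params: Optional[Dict[str, str]] = None,
) -> str:
    """Join broker addresses into a single Kafka sink URI."""
    if not brokers:
        raise ValueError("at least one broker address is required")
    uri = "kafka://" + ",".join(brokers) + "/" + topic
    if params:
        # Query parameters such as protocol=canal-json are appended here.
        uri += "?" + urlencode(params)
    return uri


if __name__ == "__main__":
    # Placeholder addresses and topic; replace them with your own values.
    print(
        build_kafka_sink_uri(
            ["broker-1.example:9092", "broker-2.example:9092", "broker-3.example:9092"],
            "demo-topic",
            {"protocol": "canal-json"},
        )
    )
```

The resulting string can be passed to the `--sink-uri` option of `tiup ctl cdc changefeed create`.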
After executing the preceding command, run the following command to check the status of the changefeed.
{{< copyable "shell-regular" >}}
tiup ctl cdc changefeed list --pd=""
You can manage the status of a changefeed as instructed in Manage replication tasks (changefeed).
After a changefeed is created, any change event in the TiDB cluster, such as an `INSERT` operation, generates a data change in TiCDC. TiCDC then replicates the data change to the sink specified in the changefeed. In this document, the sink is Kafka, so the data change is written to the specified Kafka topic.
Simulate service workload.
In the lab environment, you can use go-tpc to write data to the TiDB cluster, which is used as the source of the changefeed. Specifically, run the following commands to create a database `tpcc` in the upstream TiDB cluster, and then use TiUP bench to write data to this new database.
{{< copyable "shell-regular" >}}
create database tpcc;
tiup bench tpcc -H -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H -P 4000 -D tpcc --warehouses 4 run --time 300s
For more details about go-tpc, refer to How to Run TPC-C Test on TiDB.
Consume data change from Kafka.
When a changefeed works normally, it writes data to the Kafka topic. You can run `kafka-console-consumer.sh` to view the written data.
{{< copyable "shell-regular" >}}
./bin/kafka-console-consumer.sh --bootstrap-server --from-beginning --topic `${topic-name}`
In production environments, you need to develop a Kafka consumer to consume the data in the Kafka topic.
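As a starting point for such a consumer, the following Python sketch decodes one message value. It assumes the changefeed uses the `canal-json` protocol, as the changefeed information above indicates, and that each message value is a JSON object with the Canal-JSON fields `database`, `table`, `type`, `isDdl`, and `data`. The function name and the sample message are hypothetical. Fetching messages from Kafka requires a Kafka client library and is not shown here.

```python
import json


def summarize_canal_json(raw: bytes) -> dict:
    """Decode one Canal-JSON message value into a small summary dict."""
    msg = json.loads(raw)
    return {
        "database": msg.get("database"),
        "table": msg.get("table"),
        "type": msg.get("type"),              # for example INSERT, UPDATE, DELETE
        "is_ddl": bool(msg.get("isDdl", False)),
        "rows": msg.get("data") or [],        # changed rows, if any
    }


if __name__ == "__main__":
    # Hypothetical message value for illustration only.
    sample = json.dumps(
        {
            "database": "tpcc",
            "table": "warehouse",
            "type": "INSERT",
            "isDdl": False,
            "data": [{"w_id": "1", "w_name": "demo"}],
            "old": None,
        }
    ).encode("utf-8")
    print(summarize_canal_json(sample))
```

A production consumer would typically also handle DDL events and track consumed offsets.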