Debezium Tutorial

This demo automatically deploys the topology of services as defined in the Debezium Tutorial.

Using MySQL

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-mysql.yaml up

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

# Consume messages from a Debezium topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

# Modify records in the database via MySQL client
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down
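
Registering the connector immediately after bringing the topology up can fail if Kafka Connect has not finished starting. The following is a minimal sketch of a wait helper, assuming the REST API is exposed on localhost:8083 as in the commands above. The function name `wait_for_connect`, the retry count, and the delay are illustrative and not part of the tutorial files.

```shell
# Hypothetical helper (not part of the tutorial files): poll the Kafka Connect
# REST API until it responds, so the connector is not registered too early.
wait_for_connect() {
  url="${1:-http://localhost:8083/connectors}"   # REST endpoint used above
  attempts="${2:-30}"                            # assumed retry budget
  delay="${3:-2}"                                # assumed seconds between tries
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -f makes curl return non-zero on HTTP errors, -s silences progress output
    if curl -s -f -o /dev/null "$url"; then
      echo "Kafka Connect is ready at $url"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "Kafka Connect did not respond at $url after $attempts attempts" >&2
  return 1
}
```

Running `wait_for_connect` before the `curl -i -X POST ...` registration command would then delay registration until the API answers, or fail with a non-zero exit status once the retries are used up.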

Using MySQL and the Avro message format

To use Avro messages instead of JSON, configure Avro in one of two places: the Kafka Connect worker configuration or the connector configuration. Using Avro together with the schema registry produces much more compact messages, because each message carries a reference to its schema rather than the full schema.

Kafka Connect Worker configuration

Configuring Avro at the Kafka Connect worker involves the same steps as above for MySQL, but with the docker-compose-mysql-avro-worker.yaml configuration file. The Compose file configures the Connect service to use the Avro (de-)serializers for the Connect instance and starts one additional service, the Confluent schema registry.

Debezium Connector configuration

Configuring Avro at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-avro-connector.yaml and register-mysql-avro.json configuration files. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Confluent schema registry. The connector configuration file configures the connector but explicitly sets the (de-)serializers for the connector to use Avro and specifies the location of the schema registry.
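
To make the connector-level settings more concrete, the sketch below prints the converter-related properties that such a connector configuration would typically carry. The class name is the Confluent Avro converter; the helper name `avro_converter_settings` and the default registry address are assumptions, and the actual contents of register-mysql-avro.json may differ.

```shell
# Illustrative only: print the converter-related properties that a connector
# configuration would carry when Avro is set at the connector level.
# The default registry URL is an assumption (the in-network address of the
# schema registry service); pass a different URL as the first argument.
avro_converter_settings() {
  registry_url="${1:-http://schema-registry:8081}"
  cat <<EOF
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "$registry_url",
"value.converter.schema.registry.url": "$registry_url"
EOF
}
```

These four properties sit next to the database settings inside the `config` object of the registration JSON, which is why the worker itself can keep its default converters.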

You can access the first version of the schema for customers values like so:

curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1

Or, if you have the jq utility installed, you can get a formatted output like this:

curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'

If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
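
To see which versions exist after such a change, the Confluent schema registry also offers an endpoint that lists all versions of a subject, and it accepts `latest` in place of a version number. A small sketch of a URL builder follows; the helper name `schema_versions_url` is hypothetical, and the default address is the one used in the commands above.

```shell
# Hypothetical helper: build schema registry URLs for a subject.
# With no version argument it returns the "list all versions" endpoint;
# with a version number or the word "latest" it returns that one version.
schema_versions_url() {
  subject="$1"
  version="${2:-}"
  registry="${3:-http://localhost:8081}"   # registry address used above
  if [ -n "$version" ]; then
    printf '%s/subjects/%s/versions/%s\n' "$registry" "$subject" "$version"
  else
    printf '%s/subjects/%s/versions\n' "$registry" "$subject"
  fi
}
```

For example, `curl -s "$(schema_versions_url dbserver1.inventory.customers-value)"` would return the list of version numbers, and adding `latest` as a second argument would fetch the newest schema.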

The schema registry also comes with a console consumer that can read the Avro messages:

docker-compose -f docker-compose-mysql-avro-worker.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic dbserver1.inventory.customers

Using MySQL and Apicurio Registry

Apicurio Registry is an open-source API and schema registry that, among other things, can be used to store schemas of Kafka records. It provides:

  • its own native Avro converter and Protobuf serializer
  • a JSON converter that exports its schema into the registry
  • a compatibility layer with other schema registries such as IBM's or Confluent's; it can be used with the Confluent Avro converter.

For the Apicurio examples we will use the following deployment topology:

[Figure: Apicurio example topology]

JSON format

Configuring the JSON converter with an externalized schema at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio-converter-json.json configuration files. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector but explicitly sets the (de-)serializers for the connector to use the JSON converter with externalized schemas and specifies the location of the Apicurio registry.

You can access the first version of the schema for customers values like so:

curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value

Or, if you have the jq utility installed, you can get a formatted output like this:

curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value | jq .

If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.

You can consume the JSON messages in the same way as with the standard JSON converter:

docker-compose -f docker-compose-mysql-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

When you look at a data message, you will notice that it contains only the payload and no schema part, because the schema is externalized into the registry.

Avro format using Apicurio Avro converter

Configuring Avro at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio-converter-avro.json configuration files. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector but explicitly sets the (de-)serializers for the connector to use Avro and specifies the location of the Apicurio registry.

You can access the first version of the schema for customers values like so:

curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value

Or, if you have the jq utility installed, you can get a formatted output like this:

curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value | jq .

If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.

Avro format using Confluent Avro converter

Configuring Avro at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio.json configuration files. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector but explicitly sets the (de-)serializers for the connector to use Avro and specifies the location of the Apicurio registry.

You can access the first version of the schema for customers values like so:

curl -X GET http://localhost:8080/api/ccompat/subjects/dbserver1.inventory.customers-value/versions/1

Or, if you have the jq utility installed, you can get a formatted output like this:

curl -X GET http://localhost:8080/api/ccompat/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'

If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.

To consume the Avro messages, you can use the kafkacat tool:

docker run --rm --tty \
  --network tutorial_default \
  debezium/tooling \
  kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://apicurio:8080/api/ccompat \
  -t dbserver1.inventory.customers | jq .

Using Postgres

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-postgres.yaml up

# Start Postgres connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

# Consume messages from a Debezium topic
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

# Modify records in the database via Postgres client
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'

# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down

Using MongoDB

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-mongodb.yaml up

# Initialize MongoDB replica set and insert some test data
docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c '/usr/local/bin/init-inventory.sh'

# Start MongoDB connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json

# Consume messages from a Debezium topic
docker-compose -f docker-compose-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

# Modify records in the database via MongoDB client
docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'

db.customers.insert([
    { _id : NumberLong("1005"), first_name : 'Bob', last_name : 'Hopper', email : '[email protected]', unique_id : UUID() }
]);

# Shut down the cluster
docker-compose -f docker-compose-mongodb.yaml down

Using Oracle

This assumes Oracle is running on localhost (or is reachable there, e.g. by running it within a VM or Docker container with appropriate port configurations) and is set up with the configuration, users and grants described in the Debezium Vagrant set-up. Note that the connector uses the XStream API, which requires a license for the GoldenGate product (although GoldenGate itself does not need to be installed).

You must also download the Oracle Instant Client for Linux and place it in the directory debezium-with-oracle-jdbc/oracle_instantclient.

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-oracle.yaml up --build

# Insert test data
cat debezium-with-oracle-jdbc/init/inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

Adjust the host name of the database server and the name of the XStream outbound server in register-oracle.json as per your environment. Then register the Debezium Oracle connector:

# Start Oracle connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle.json

# Create a test change record
echo "INSERT INTO customers VALUES (NULL, 'John', 'Doe', '[email protected]');" | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

# Consume messages from a Debezium topic
docker-compose -f docker-compose-oracle.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic server1.DEBEZIUM.CUSTOMERS

# Modify other records in the database via Oracle client
docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

# Shut down the cluster
docker-compose -f docker-compose-oracle.yaml down

Using SQL Server

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-sqlserver.yaml up

# Initialize database and insert test data
cat debezium-sqlserver-init/inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

# Start SQL Server connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

# Consume messages from a Debezium topic
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic server1.dbo.customers

# Modify records in the database via SQL Server client (remember to add the `GO` command to execute each statement)
docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'

# Shut down the cluster
docker-compose -f docker-compose-sqlserver.yaml down
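
Because sqlcmd only executes a statement once it sees the `GO` batch terminator, a non-interactive change is easy to get wrong. The sketch below appends the terminator automatically. The helper name `tsql_batch` is hypothetical, and the column name and id in the commented example are assumptions about the test data.

```shell
# Hypothetical helper: append the GO batch terminator that sqlcmd needs
# before it executes a statement.
tsql_batch() {
  printf '%s\nGO\n' "$1"
}

# Example use (commented out; requires the running SQL Server container and
# assumes a customers row with id 1001 and a first_name column):
# tsql_batch "UPDATE customers SET first_name = 'Anne' WHERE id = 1001;" \
#   | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'
```

Piping the batch through `docker exec -i` mirrors the way the initialization script is fed to sqlcmd above, so no interactive session is needed.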

Using Db2

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2

docker-compose -f docker-compose-db2.yaml up --build

# Start Db2 connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-db2.json

# Consume messages from a Debezium topic
docker-compose -f docker-compose-db2.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic db2server.DB2INST1.CUSTOMERS

# Modify records in the database via Db2 client
docker-compose -f docker-compose-db2.yaml exec db2server bash -c 'su - db2inst1'
db2 connect to TESTDB
db2 "INSERT INTO DB2INST1.CUSTOMERS(first_name, last_name, email) VALUES ('John', 'Doe', '[email protected]');"

# Shut down the cluster
docker-compose -f docker-compose-db2.yaml down

Using externalized secrets

Kafka Connect allows externalization of secrets into a separate configuration repository. The configuration is done at both worker and connector level.

# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.2
docker-compose -f docker-compose-mysql-ext-secrets.yml up

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-ext-secrets.json

# Check plugin configuration to see that secrets are not visible
curl -s http://localhost:8083/connectors/inventory-connector/config | jq .

# Shut down the cluster
docker-compose -f docker-compose-mysql-ext-secrets.yml down
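
At the connector level, externalized secrets are referenced through placeholders of the form `${provider:path:key}`, which Kafka Connect resolves at runtime using a config provider registered in the worker configuration. The sketch below only formats such a placeholder. The helper name `secret_ref`, the provider alias `file`, and the path in the usage example are assumptions and may not match the files in this directory.

```shell
# Hypothetical helper: format a Kafka Connect config-provider placeholder of
# the form ${provider:path:key}. Kafka Connect resolves such placeholders at
# runtime, so the connector configuration itself never contains the secret.
secret_ref() {
  path="$1"
  key="$2"
  provider="${3:-file}"   # assumes a provider registered under the alias "file"
  printf '${%s:%s:%s}\n' "$provider" "$path" "$key"
}
```

For example, `secret_ref /secrets/mysql.properties user` prints `${file:/secrets/mysql.properties:user}`, which is the kind of value you would expect to see in place of the real user name when checking the connector configuration through the REST API.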

Debugging

Should you need to establish a remote debugging session into a deployed connector, add the following to the environment section of the connect service in the Compose file:

- KAFKA_DEBUG=true
- DEBUG_SUSPEND_FLAG=n
- JAVA_DEBUG_PORT=*:5005

Also expose the debugging port 5005 under ports:

- 5005:5005

You can then establish a remote debugging session from your IDE on localhost:5005.
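
For orientation, these three variables end up as a single JDWP agent option on the Connect JVM. The sketch below approximates that composition; it is not a copy of the launcher script, and the function name `jdwp_option` is hypothetical. The `*:` prefix on the port matters on Java 9 and later, where the debug agent otherwise listens on localhost only and would not be reachable from outside the container.

```shell
# Approximation of how the three variables are combined into a JDWP agent
# option; a sketch for orientation, not a copy of the actual launcher script.
jdwp_option() {
  suspend="${DEBUG_SUSPEND_FLAG:-n}"   # "y" would pause the JVM until a debugger attaches
  port="${JAVA_DEBUG_PORT:-5005}"      # falls back to 5005 when unset
  # Only emit the option when debugging has been switched on
  if [ -n "${KAFKA_DEBUG:-}" ]; then
    printf '%s\n' "-agentlib:jdwp=transport=dt_socket,server=y,suspend=${suspend},address=${port}"
  fi
}
```

Setting `DEBUG_SUSPEND_FLAG=y` instead can help when the code of interest runs during connector startup, since the JVM then waits for the debugger before continuing.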