kafka-connect-jdbc-sink-for-db2 is a Kafka Connect sink connector for copying data from Apache Kafka into a Db2 database. It is based on the generic connector for JDBC databases.
The main difference from the generic JDBC connector is that this connector lets the user exploit the new MEMORY_TABLE table function to batch rows into Db2 and achieve significantly higher ingest rates than the generic insert mechanism. This can be enabled as part of the connection configuration using the new insert.function.value=memory_table property.
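As a minimal sketch, enabling the batched insert path amounts to adding this one line to the jdbc-sink.properties file covered in the setup steps below:
insert.function.value=memory_table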
The connector is supplied as source code, which you can easily build into a JAR file.
- Clone the repository with the following command:
git clone git@github.com:ibm-messaging/kafka-connect-jdbc-sink.git
- Change directory to the kafka-connect-jdbc-sink directory:
cd kafka-connect-jdbc-sink
- Build the connector using Maven:
mvn clean package
- Set up a local ZooKeeper service running on port 2181 (default)
- Set up a local Kafka service running on port 9092 (default)
- Copy the compiled JAR file into the /usr/local/share/java/ directory:
cp target/kafka-connect-jdbc-sink-1.0.0-SNAPSHOT-jar-with-dependencies.jar /usr/local/share/java/
- Copy the connect-standalone.properties and jdbc-sink.properties files into the /usr/local/etc/kafka/ directory:
cp config/* /usr/local/etc/kafka/
Note that for Db2 LUW 11.5.9 or later, you can use insert.function.value=memory_table in jdbc-sink.properties to insert rows using the SYSPROC.MEMORY_TABLE function instead of an INSERT with the input batch of rows. Also, setting value.converter.schemas.enable=true allows valueSchema in the writer code to obtain the schema needed for the Db2 insert and create table statements. Additionally, if you want to change the connector batch size, set the Kafka Connect worker property connector.client.config.override.policy=All and configure the connector with the batch.size and consumer.override.max.poll.records settings.
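As an illustrative sketch of that batch-size override (the values shown are placeholders, not recommendations), the worker configuration (connect-standalone.properties or connect-distributed.properties) would carry:
connector.client.config.override.policy=All
and the connector configuration (jdbc-sink.properties) would carry:
batch.size=500
consumer.override.max.poll.records=500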
- Go to the Kafka configuration directory /usr/local/etc/kafka/:
cd /usr/local/etc/kafka/
- Set the CLASSPATH value to /usr/local/share/java/ as follows:
export CLASSPATH=/usr/local/share/java/
- Create a target Kafka topic named kafka_test:
kafka-topics --create --topic kafka_test --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
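Note that the --zookeeper flag is only accepted by older Kafka releases; on newer versions the same topic can be created against the broker directly, for example:
kafka-topics --create --topic kafka_test --partitions 3 --replication-factor 1 --bootstrap-server 127.0.0.1:9092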
- Set up a JDBC database with an accessible URL and port number as well as a user with read/write privileges.
Setting up this database involves creating the database and creating a user with a password and the proper access privileges.
Below are some of the commands involved in setting up a database using PostgreSQL:
create user {username} with password '{password}';
create database {dbname};
grant all privileges on database {dbname} to {username};
\l - list databases
\du - verify user roles
\c - select a database
Below are some of the commands involved in setting up a database using Db2 with a Docker image:
1. docker network create DB2net
2. mkdir db2
3. cd db2
4. docker run -it -d --name mydb2 --privileged=true -p 50000:50000 -e LICENSE=accept -e DB2INST1_PASSWORD=<Access Password> -e DBNAME=db2 -v "$PWD":/database --network DB2net ibmcom/db2
5. docker logs -f mydb2
# make sure all 4 tasks are completed and
# (*) All databases are now active
6. docker exec -it mydb2 bash -c "su - db2inst1"
7. db2
8. create db kafka_test
9. connect to kafka_test
10. list tables
11. select * from company
Download the db2jcc.jar file from the following URL: https://www.ibm.com/support/pages/db2-jdbc-driver-versions-and-downloads and place it into the JAR classpath /usr/local/share/java/.
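Assuming the Docker setup above, the Db2 connection settings in jdbc-sink.properties would look something like the following sketch; the port, database name, user, and password come from the docker run and create db steps, so adjust them to your environment:
connection.url=jdbc:db2://localhost:50000/kafka_test
connection.user=db2inst1
connection.password=<Access Password>
table.name.format=company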
- Open up the config/jdbc-connector.json file using the command below:
vi config/jdbc-connector.json
- Set the following values in the config/jdbc-connector.json file:
"connection.url": <CONNECTION_URL_OF_YOUR_DATABASE> (e.g. "jdbc:postgresql://127.0.0.1:5432/postgres")
"connection.user": <CONNECTION_USER> (e.g. "newuser")
"connection.password": <CONNECTION_PASSWORD> (e.g. "test")
"table.name.format": <DATABASE_TABLE_NAME> (e.g. "company")
- Run the following command to start the sink connector service in standalone mode:
connect-standalone connect-standalone.properties jdbc-sink.properties
- In order to run the connector in distributed mode, you must first register the connector with the Kafka Connect service by creating a JSON file in the format below:
{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "com.ibm.eventstreams.connect.jdbcsink.JDBCSinkConnector",
"tasks.max": "1",
"topics": "kafka_test",
"connection.url": "jdbc:postgresql://127.0.0.1:5432/postgres",
"connection.user": "newuser",
"connection.password": "test",
"connection.ds.pool.size": 5,
"insert.mode.databaselevel": true,
"table.name.format": "company"
}
}
A version of this file, config/jdbc-connector.json, is located in the config directory. To register the connector, do the following:
- Run the following command to start the Kafka Connect service in distributed mode:
connect-distributed connect-distributed.properties
- Run the following command to register the connector with the Kafka Connect service:
curl -s -X POST -H 'Content-Type: application/json' --data @config/jdbc-connector.json http://localhost:8083/connectors
You can verify that your connector was properly registered by going to http://localhost:8083/connectors, which should return a full list of available connectors. This JSON connector profile will be propagated to all workers across the distributed system. After following these steps, your connector will now run in distributed mode.
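You can also check the state of this particular connector and its tasks through the Kafka Connect REST API, for example:
curl -s http://localhost:8083/connectors/jdbc-sink-connector/status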
- Run a Kafka producer using the following value schema by entering the following command:
kafka-console-producer --broker-list localhost:9092 --topic kafka_test
The console producer will now wait for input.
- Copy the following record into the producer terminal:
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "Roy Jones","company": "General Motors"}}
- Open up the command-line client of your JDBC database and verify that a record has been added to the target database table. If the database table did not exist prior to this, it will have been created by this process.
Be sure to target the proper database by using \c <database_name> or USE <database_name>;.
select * from company;
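For the Db2 Docker setup above, an equivalent check from inside the container might look like this, assuming the container name and instance user from the earlier steps:
docker exec -it mydb2 bash -c "su - db2inst1"
db2 connect to kafka_test
db2 "select * from company"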
- You should be able to see your newly created record added to this database table as follows:
id | timestamp     | name      | company
---+---------------+-----------+----------------
1  | 1587969949600 | Roy Jones | General Motors
Commercial support for this connector is available for customers with a support entitlement for IBM Event Automation or IBM Cloud Pak for Integration.
For issues relating specifically to this connector, please use the GitHub issue tracker. If you do want to submit a Pull Request related to this connector, please read the contributing guide first to understand how to sign your commits.
Copyright 2020, 2023 IBM Corporation
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. The project is licensed under the Apache 2 license.