From 512d50cbf7d2df441ec9b9158b0fe35f1cba0ecc Mon Sep 17 00:00:00 2001 From: dzlab Date: Fri, 27 Sep 2024 19:44:40 -0700 Subject: [PATCH] added instruction for reading from sink --- _posts/2024-06-13-debezium-elk.md | 113 ++++++++++++++++++++++++++---- 1 file changed, 99 insertions(+), 14 deletions(-) diff --git a/_posts/2024-06-13-debezium-elk.md b/_posts/2024-06-13-debezium-elk.md index a91e7f9..0426cd9 100644 --- a/_posts/2024-06-13-debezium-elk.md +++ b/_posts/2024-06-13-debezium-elk.md @@ -14,6 +14,10 @@ img_excerpt:
+ +You need to install the Elasticsearch sink connector separately or use the Kafka Connect images from Confluent and install Debezium into those https://www.confluent.io/hub/debezium/debezium-connector-postgresql + + `Dockerfile.connect-jdbc-es` ```Dockerfile @@ -179,28 +183,23 @@ $ curl -H "Accept:application/json" localhost:8083/connectors/ "database.password": "postgres", "database.dbname" : "postgres", "topic.prefix": "dbserver1", - "schema.include.list": "inventory" + "schema.include.list": "inventory", + "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", + "schema.history.internal.kafka.topic": "schema-changes.inventory", + "transforms": "route", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", + "transforms.route.replacement": "$3" } } ``` -I validated my config by - -```shell -$ curl -s -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate -d @connect-config.json | jq - -{ - "name": "io.debezium.connector.postgresql.PostgresConnector", - "error_count": 0, -. . . -``` - Start Postgres connector ```shell $ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg-source.json -{"name":"pg-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"dbserver1","schema.include.list":"inventory","name":"pg-source"},"tasks":[],"type":"source"} +{"name":"pg-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"dbserver1","schema.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement":"$3","name":"pg-source"},"tasks":[],"type":"source"} ``` Check that the connector is created: @@ -261,7 +260,6 @@ $ curl localhost:8083/connectors/elastic-sink/status {"name":"elastic-sink","connector":{"state":"RUNNING","worker_id":"172.17.0.19:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.19:8083"}],"type":"sink"} ``` - ## Postgres Modify records in the database via Postgres client @@ -293,6 +291,93 @@ VALUES (default,'Sally','Thomas','sally.thomas@acme.com'), ``` +## Elasticsearch + +```shell +curl 'http://localhost:9200/customers/_search?pretty' +``` + +```json +{ + "took" : 836, + "timed_out" : false, + "_shards" : { + "total" : 1, + "successful" : 1, + "skipped" : 0, + "failed" : 0 + }, + "hits" : { + "total" : { + "value" : 4, + "relation" : "eq" + }, + "max_score" : 1.0, + "hits" : [ + { + "_index" : "customers", + "_type" : "customer", + "_id" : "1003", + "_score" : 1.0, + "_source" : { + "id" : 1003, + "first_name" : "Edward", + "last_name" : "Walker", + "email" : "ed@walker.com" + } + }, + { + "_index" : "customers", + "_type" : "customer", + "_id" : "1004", + "_score" : 1.0, + "_source" : { + "id" : 1004, + "first_name" : "Anne", + "last_name" : "Kretchmar", + "email" : "annek@noanswer.org" + } + }, + { + "_index" : "customers", + "_type" : "customer", + "_id" : "1002", + "_score" : 1.0, + "_source" : { + "id" : 1002, + "first_name" : "George", + "last_name" : "Bailey", + "email" : "gbailey@foobar.com" + } + }, + { + "_index" : "customers", + "_type" : "customer", + "_id" : "1001", + "_score" : 1.0, + "_source" : { + "id" : 1001, + "first_name" : "Sally", + "last_name" : "Thomas", + "email" : "sally.thomas@acme.com" + } + } + ] + } +} +``` + + + +## Teardown + +```shell +docker stop connect +docker stop kafka +docker stop zookeeper +docker stop elastic +docker stop postgres +``` ```shell # Shut down the cluster