diff --git a/_posts/2024-06-13-debezium-elk.md b/_posts/2024-06-13-debezium-elk.md
index a91e7f9..0426cd9 100644
--- a/_posts/2024-06-13-debezium-elk.md
+++ b/_posts/2024-06-13-debezium-elk.md
@@ -14,6 +14,10 @@ img_excerpt:
+
+You need to install the Elasticsearch sink connector separately or use the Kafka Connect images from Confluent and install Debezium into those https://www.confluent.io/hub/debezium/debezium-connector-postgresql
+
+
`Dockerfile.connect-jdbc-es`
```Dockerfile
@@ -179,28 +183,23 @@ $ curl -H "Accept:application/json" localhost:8083/connectors/
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "dbserver1",
- "schema.include.list": "inventory"
+ "schema.include.list": "inventory",
+ "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
+ "schema.history.internal.kafka.topic": "schema-changes.inventory",
+ "transforms": "route",
+ "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
+ "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
+ "transforms.route.replacement": "$3"
}
}
```
-I validated my config by
-
-```shell
-$ curl -s -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate -d @connect-config.json | jq
-
-{
- "name": "io.debezium.connector.postgresql.PostgresConnector",
- "error_count": 0,
-. . .
-```
-
Start Postgres connector
```shell
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg-source.json
-{"name":"pg-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"dbserver1","schema.include.list":"inventory","name":"pg-source"},"tasks":[],"type":"source"}
+{"name":"pg-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"dbserver1","schema.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement":"$3","name":"pg-source"},"tasks":[],"type":"source"}
```
Check that the connector is created:
@@ -261,7 +260,6 @@ $ curl localhost:8083/connectors/elastic-sink/status
{"name":"elastic-sink","connector":{"state":"RUNNING","worker_id":"172.17.0.19:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.19:8083"}],"type":"sink"}
```
-
## Postgres
Modify records in the database via Postgres client
@@ -293,6 +291,93 @@ VALUES (default,'Sally','Thomas','sally.thomas@acme.com'),
```
+## Elasticsearch
+
+```shell
+curl 'http://localhost:9200/customers/_search?pretty'
+```
+
+```json
+{
+ "took" : 836,
+ "timed_out" : false,
+ "_shards" : {
+ "total" : 1,
+ "successful" : 1,
+ "skipped" : 0,
+ "failed" : 0
+ },
+ "hits" : {
+ "total" : {
+ "value" : 4,
+ "relation" : "eq"
+ },
+ "max_score" : 1.0,
+ "hits" : [
+ {
+ "_index" : "customers",
+ "_type" : "customer",
+ "_id" : "1003",
+ "_score" : 1.0,
+ "_source" : {
+ "id" : 1003,
+ "first_name" : "Edward",
+ "last_name" : "Walker",
+ "email" : "ed@walker.com"
+ }
+ },
+ {
+ "_index" : "customers",
+ "_type" : "customer",
+ "_id" : "1004",
+ "_score" : 1.0,
+ "_source" : {
+ "id" : 1004,
+ "first_name" : "Anne",
+ "last_name" : "Kretchmar",
+ "email" : "annek@noanswer.org"
+ }
+ },
+ {
+ "_index" : "customers",
+ "_type" : "customer",
+ "_id" : "1002",
+ "_score" : 1.0,
+ "_source" : {
+ "id" : 1002,
+ "first_name" : "George",
+ "last_name" : "Bailey",
+ "email" : "gbailey@foobar.com"
+ }
+ },
+ {
+ "_index" : "customers",
+ "_type" : "customer",
+ "_id" : "1001",
+ "_score" : 1.0,
+ "_source" : {
+ "id" : 1001,
+ "first_name" : "Sally",
+ "last_name" : "Thomas",
+ "email" : "sally.thomas@acme.com"
+ }
+ }
+ ]
+ }
+}
+```
+
+
+
+## Teardown
+
+```shell
+docker stop connect
+docker stop kafka
+docker stop zookeeper
+docker stop elastic
+docker stop postgres
+```
```shell
# Shut down the cluster