Skip to content

Commit

Permalink
added instruction for reading from sink
Browse files Browse the repository at this point in the history
  • Loading branch information
dzlab committed Sep 28, 2024
1 parent 59870ea commit 512d50c
Showing 1 changed file with 99 additions and 14 deletions.
113 changes: 99 additions & 14 deletions _posts/2024-06-13-debezium-elk.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ img_excerpt:
<img align="center" src="/assets/logos/debeziumio-ar21.svg" width="300" />
<br/>


You need to install the Elasticsearch sink connector separately or use the Kafka Connect images from Confluent and install Debezium into those https://www.confluent.io/hub/debezium/debezium-connector-postgresql


`Dockerfile.connect-jdbc-es`

```Dockerfile
Expand Down Expand Up @@ -179,28 +183,23 @@ $ curl -H "Accept:application/json" localhost:8083/connectors/
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory"
"schema.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
```

I validated my config by

```shell
$ curl -s -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate -d @connect-config.json | jq

{
"name": "io.debezium.connector.postgresql.PostgresConnector",
"error_count": 0,
. . .
```
Start Postgres connector

```shell
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg-source.json

{"name":"pg-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"dbserver1","schema.include.list":"inventory","name":"pg-source"},"tasks":[],"type":"source"}
{"name":"pg-source","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"dbserver1","schema.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement":"$3","name":"pg-source"},"tasks":[],"type":"source"}
```

Check that the connector is created:
Expand Down Expand Up @@ -261,7 +260,6 @@ $ curl localhost:8083/connectors/elastic-sink/status
{"name":"elastic-sink","connector":{"state":"RUNNING","worker_id":"172.17.0.19:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.19:8083"}],"type":"sink"}
```

## Postgres

Modify records in the database via Postgres client
Expand Down Expand Up @@ -293,6 +291,93 @@ VALUES (default,'Sally','Thomas','[email protected]'),
```


## Elasticsearch

```shell
curl 'http://localhost:9200/customers/_search?pretty'
```

```json
{
"took" : 836,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "[email protected]"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "[email protected]"
}
}
]
}
}
```



## Teardown

```shell
docker stop connect
docker stop kafka
docker stop zookeeper
docker stop elastic
docker stop postgres
```

```shell
# Shut down the cluster
Expand Down

0 comments on commit 512d50c

Please sign in to comment.