Clarification on snapshot.mode Functionality Its Interaction with ClickHouse Sink Connector #991

Open
Avnish986 opened this issue Feb 18, 2025 · 6 comments

Avnish986 commented Feb 18, 2025

We are implementing a Change Data Capture (CDC) pipeline using Debezium to stream data from our source database into ClickHouse. We have configured Debezium with snapshot.mode=schema_only and the table.include.list property commented out, and we observed the following behavior:

  1. Existing Data: Upon starting the connector, existing data in the source tables is not captured, as expected with the schema_only setting.
  2. New Data Changes: After the connector starts, any new data changes (INSERTs, UPDATEs, DELETEs) to existing tables are captured and streamed to ClickHouse.
  3. New Tables: If a new table is created in the source database after the connector has started, the schema of this new table is captured, and subsequent data changes to this table are also captured and streamed to ClickHouse.

We would also like to confirm that our understanding of the following snapshot modes is correct.

snapshot.mode=initial

  1. Existing Data and Schema: The connector captures both the existing data and schema of the specified tables upon startup.
  2. New Data: After the initial snapshot, subsequent data changes are captured and streamed.
  3. New Tables: Newly created tables after the connector starts are not automatically captured; a connector restart or reconfiguration is necessary to include them.

snapshot.mode=when_needed

  1. The connector performs a snapshot only if there is no prior offset information or if the existing offset is invalid.
  2. If offsets are present and valid, the connector skips the snapshot and begins streaming changes from the last known position.
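
To make the three modes easier to compare, here is a minimal Python sketch of the startup behavior as it is described in this post. It is only a model of the understanding stated above, not Debezium's implementation; `StartupPlan`, `plan_startup`, and the `"missing"` / `"valid"` / `"invalid"` offset states are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass

# Only the modes discussed in this thread; other Debezium modes are out of scope.
KNOWN_MODES = {"schema_only", "initial", "when_needed"}
OFFSET_STATES = {"missing", "valid", "invalid"}


@dataclass(frozen=True)
class StartupPlan:
    snapshot_schema: bool  # capture table structure at startup
    snapshot_data: bool    # copy existing rows at startup


def plan_startup(snapshot_mode: str, offsets: str) -> StartupPlan:
    """Hypothetical model of what the connector does on startup."""
    if snapshot_mode not in KNOWN_MODES:
        raise ValueError(f"mode not covered by this sketch: {snapshot_mode}")
    if offsets not in OFFSET_STATES:
        raise ValueError(f"unknown offset state: {offsets}")

    if offsets == "valid":
        # Stored position is usable: skip the snapshot and resume streaming.
        return StartupPlan(snapshot_schema=False, snapshot_data=False)

    if offsets == "invalid" and snapshot_mode != "when_needed":
        # The post only describes invalid offsets for when_needed.
        raise NotImplementedError("invalid offsets are not described for this mode")

    if snapshot_mode == "schema_only":
        # Structure only; existing rows are not copied.
        return StartupPlan(snapshot_schema=True, snapshot_data=False)

    # initial (first start) and when_needed (missing or invalid offsets).
    return StartupPlan(snapshot_schema=True, snapshot_data=True)


if __name__ == "__main__":
    for mode in sorted(KNOWN_MODES):
        print(mode, plan_startup(mode, "missing"))
```

In every mode, streaming of new changes follows the startup step; the sketch only covers what happens before streaming begins.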

@subkanthi We appreciate your guidance to ensure our CDC implementation aligns with best practices and operates as intended.

Avnish986 (Author) commented Feb 18, 2025

@subkanthi Could you please help us validate our current setup and ensure an optimal configuration for our CDC pipeline?

Avnish986 changed the title from "Debezium MySQL Connector Fails on sink-connector-client restart with JsonEOFException" to "Clarification on snapshot.mode Functionality Its Interaction with ClickHouse Sink Connector" on Feb 24, 2025

aadant (Collaborator) commented Feb 24, 2025

I also use snapshot.mode=schema_only. I noticed that if you add new tables (not in the initial filter) or a new schema, you need to run the connector with snapshot.mode=recovery and then restart it with snapshot.mode=schema_only.

Avnish986 (Author) commented

@aadant I am using snapshot.mode=schema_only. Once the connector is up and replicating, if I create a new table and add data to it, that table is replicated to ClickHouse without using snapshot.mode=recovery. Note that I have commented out the table.include.list property.

We need help choosing a snapshot mode for production. We currently use schema_only, and over the long term we have observed some data loss during replication.

aadant (Collaborator) commented Feb 25, 2025

@Avnish986 If you do not filter out databases or tables, it should work fine in schema_only. Just make sure to restore a consistent snapshot:

  • Start the connector to configure all tables in the altinity_sink_connector schema.
  • Stop the connector.
  • Take a consistent snapshot using the Python utility mysql_dumper.py.
  • Restore the consistent snapshot using clickhouse_loader.py.
  • Update the replication position (found in @.json) in altinity_sink_connector.replica_source_info.
  • Restart the connector.
  • Run checks using checksums / counts.
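
For the final check, a rough Python sketch of a row-count comparison might look like the following. The helper names (`mysql_count_query`, `clickhouse_count_query`, `find_count_mismatches`) are hypothetical and not part of the sink connector utilities. The ClickHouse query assumes the target tables are ReplacingMergeTree tables with an `is_deleted` column, as the connector's INSERT query elsewhere in this thread suggests. Running the queries is left to whichever MySQL and ClickHouse clients you already use.

```python
from typing import Dict, List, Optional, Tuple


def mysql_count_query(database: str, table: str) -> str:
    """Row count on the MySQL source."""
    return f"SELECT COUNT(*) FROM `{database}`.`{table}`"


def clickhouse_count_query(database: str, table: str) -> str:
    """Row count on the ClickHouse target.

    Assumption: FINAL collapses row versions and is_deleted = 0
    skips rows that were deleted on the source.
    """
    return (
        f"SELECT count() FROM `{database}`.`{table}` "
        "FINAL WHERE is_deleted = 0"
    )


def find_count_mismatches(
    source_counts: Dict[str, int],
    target_counts: Dict[str, int],
) -> List[Tuple[str, int, Optional[int]]]:
    """Return (table, source_count, target_count) for every table that differs.

    A target_count of None means the table is missing on the target.
    """
    mismatches = []
    for table, expected in sorted(source_counts.items()):
        actual = target_counts.get(table)
        if actual != expected:
            mismatches.append((table, expected, actual))
    return mismatches


if __name__ == "__main__":
    # Made-up counts, for illustration only.
    source = {"orders": 100, "users": 42}
    target = {"orders": 100, "users": 40}
    print(find_count_mismatches(source, target))
```

Counts only catch missing or extra rows. Comparing per-table checksums, as suggested above, is needed to detect rows whose contents differ.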

Avnish986 (Author) commented Feb 26, 2025

@aadant
I appreciate your guidance on setting up the connector with snapshot.mode=schema_only. However, I'm unclear about the steps following step 2. Could you please provide a brief explanation of steps 3 through 6? Specifically:

  • Consistent Snapshot: How do I use the mysql_dumper.py utility to take a consistent snapshot?
  • Restoration: What is the process for restoring this snapshot into ClickHouse using clickhouse_loader.py?
  • Replication Position: How do I update the replication position from the .json file into altinity_sink_connector.replica_source_info?

A concise walkthrough of these steps would be very helpful.

FYI: We are using sink-connector-lightweight.

Avnish986 (Author) commented Feb 26, 2025

@aadant Also, when I insert data, the connector logs go into a loop.

Create table query:
CREATE TABLE ut_hello_25 ( id INT NOT NULL AUTO_INCREMENT, column1 VARCHAR(255) NOT NULL, PRIMARY KEY (id) );

clickhouse-sink-connector-lt_1  | 2025-02-26 12:06:56.633 INFO  - *************** EXECUTED BATCH Successfully Records: 1************** task(0) Thread ID: Sink Connector thread-pool-0 Result: [I@9c26e11 Database: test Table: ut_hello_25
clickhouse-sink-connector-lt_1  | 2025-02-26 12:06:56.733 INFO  - ****** Thread: Sink Connector thread-pool-0 Batch Size: 1 ******
clickhouse-sink-connector-lt_1  | 2025-02-26 12:06:56.741 INFO  - *** INSERT QUERY for Database(test) ***: insert into `ut_hello_25`(`id`,`column1`,`_version`,`is_deleted`) select `id`,`column1`,`_version`,`is_deleted` from input('`id` Int32,`column1` String,`_version` UInt64,`is_deleted` UInt8')

I have to drop the table from ClickHouse in order to exit the loop:
clickhouse-sink-connector-lt_1 | Caused by: java.io.IOException: Code: 60. DB::Exception: Unknown table expression identifier 'ut_hello_25' in scope SELECT * FROM ut_hello_25 WHERE 0. (UNKNOWN_TABLE) (version 24.8.4.13 (official build))
clickhouse-sink-connector-lt_1 |
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.http.ApacheHttpConnectionImpl.checkResponse(ApacheHttpConnectionImpl.java:209) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.http.ApacheHttpConnectionImpl.post(ApacheHttpConnectionImpl.java:243) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:118) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:280) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.ClickHouseClientBuilder$Agent.sendOnce(ClickHouseClientBuilder.java:282) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.ClickHouseClientBuilder$Agent.send(ClickHouseClientBuilder.java:294) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.ClickHouseClientBuilder$Agent.execute(ClickHouseClientBuilder.java:349) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.ClickHouseClient.executeAndWait(ClickHouseClient.java:878) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.client.ClickHouseRequest.executeAndWait(ClickHouseRequest.java:2154) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.jdbc.internal.ClickHouseConnectionImpl.getTableColumns(ClickHouseConnectionImpl.java:264) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.jdbc.internal.ClickHouseConnectionImpl.prepareStatement(ClickHouseConnectionImpl.java:876) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.clickhouse.jdbc.ClickHouseConnection.prepareStatement(ClickHouseConnection.java:121) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.lambda$executePreparedStatement$0(PreparedStatementExecutor.java:121) ~[app.jar:?]
clickhouse-sink-connector-lt_1 | ... 12 more
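
The failing statement in the trace refers to the table without a database qualifier (`SELECT * FROM ut_hello_25 WHERE 0`). One way to gather more information, sketched below in Python, is to pull the table name out of the error line and look it up in ClickHouse's `system.tables` to see which database or databases actually contain it. This does not diagnose the loop itself; `extract_unknown_table` and `table_lookup_query` are hypothetical helper names, and the generated query is meant to be run with any ClickHouse client.

```python
import re
from typing import Optional

# Matches the UNKNOWN_TABLE message format seen in the log above.
_UNKNOWN_TABLE = re.compile(r"Unknown table expression identifier '([^']+)'")
# Conservative identifier check, since the name is placed into a SQL string.
_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z0-9_]+$")


def extract_unknown_table(log_line: str) -> Optional[str]:
    """Return the table name from an UNKNOWN_TABLE error line, if present."""
    match = _UNKNOWN_TABLE.search(log_line)
    return match.group(1) if match else None


def table_lookup_query(table: str) -> str:
    """Build a query listing every database that holds a table with this name."""
    if not _SAFE_IDENTIFIER.match(table):
        raise ValueError(f"unexpected characters in table name: {table!r}")
    return (
        "SELECT database, name, engine FROM system.tables "
        f"WHERE name = '{table}'"
    )


if __name__ == "__main__":
    line = (
        "Caused by: java.io.IOException: Code: 60. DB::Exception: "
        "Unknown table expression identifier 'ut_hello_25' in scope "
        "SELECT * FROM ut_hello_25 WHERE 0. (UNKNOWN_TABLE)"
    )
    table = extract_unknown_table(line)
    if table is not None:
        print(table_lookup_query(table))
```

If the lookup shows the table only in a database other than the one the connection uses by default, that may be worth mentioning when reporting the loop.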
