From 04deaf0c6da736dd379087d7395da20e85f8d2a9 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj <65964360+Amogh-Bharadwaj@users.noreply.github.com> Date: Wed, 27 Nov 2024 04:34:52 +0530 Subject: [PATCH] Clickhouse: add select consistency (#2300) ### Overview [A recent effort](https://github.com/PeerDB-io/peerdb/pull/2256/files) was made to improve PeerDB's ingestion performance into ClickHouse by parallelizing the INSERT INTO SELECT queries which took data from the raw table and put it into the final tables. Just before this step, PeerDB moves data from S3 to the raw table. ### Problem In this implementation, every insert runs on a new ClickHouse session - which means the inserts could be running on different nodes/replicas. In this case, there exists a non-zero chance (albeit very unlikely) that the INSERT INTO SELECT does not read the rows in the raw table inserted by the first step. This is documented here: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency ### Solution This PR adds the setting `select_sequential_consistency = 1` in the `Connect` function which we use for connecting to ClickHouse. --- flow/connectors/clickhouse/clickhouse.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 63ccea693..f024a767e 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -228,11 +228,12 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou tlsSetting.RootCAs = caPool } - var settings clickhouse.Settings + // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency + settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) } else if 
maxInsertThreads != 0 { - settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads} + settings["max_insert_threads"] = maxInsertThreads } conn, err := clickhouse.Open(&clickhouse.Options{