Skip to content

Commit

Permalink
Clickhouse: add select consistency (#2300)
Browse files Browse the repository at this point in the history
### Overview
[A recent effort ](https://github.com/PeerDB-io/peerdb/pull/2256/files)
was made to improve PeerDB's ingestion performance into ClickHouse by
parallelizing the INSERT INTO SELECT queries which took data from the
raw table and put it into the final tables. Just before this step,
PeerDB moves data from S3 to the raw table.

### Problem
In this implementation, every insert runs on a new ClickHouse session -
which means the inserts could be running on different nodes/replicas. In
this case, there exists a non-zero chance (albeit very unliikely) that
the INSERT INTO SELECT does not read the rows in the raw table inserted
by the first step.
This is documented here:
https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency

### Solution
This PR adds the setting `select_sequential_consistency = 1` in the
`Connect` function which we use for connecting to ClickHouse
  • Loading branch information
Amogh-Bharadwaj authored Nov 26, 2024
1 parent c4742c3 commit 04deaf0
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,12 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
tlsSetting.RootCAs = caPool
}

var settings clickhouse.Settings
// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)}
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
} else if maxInsertThreads != 0 {
settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads}
settings["max_insert_threads"] = maxInsertThreads
}

conn, err := clickhouse.Open(&clickhouse.Options{
Expand Down

0 comments on commit 04deaf0

Please sign in to comment.