From e20dffcf172a045c7b9c32c357da12b8bf26bb2f Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj
Date: Thu, 16 May 2024 19:08:18 +0530
Subject: [PATCH] query level settings

---
 flow/connectors/clickhouse/clickhouse.go     |  7 +++++++
 flow/connectors/clickhouse/qrep_avro_sync.go | 19 +++++++++++++++++--
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index d19b5dbedb..5337435580 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -23,6 +23,13 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
+const (
+	PeerDBClickhouseQueryMaxMemoryUsage string = "64000000000"
+	PeerDBClickhouseMaxBlockSize        string = "10240"
+	PeerDBClickhouseMaxInsertBlockSize  string = "10240"
+	PeerDBClickhouseMaxInsertThreads    string = "2"
+)
+
 type ClickhouseConnector struct {
 	*metadataStore.PostgresMetadata
 	database *sql.DB
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 0156784c86..47c8b29130 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -10,6 +10,7 @@ import (
 
 	"go.temporal.io/sdk/activity"
 
+	"github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -52,12 +53,19 @@
 	if creds.AWS.SessionToken != "" {
 		sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
 	}
+
+	insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
+		"max_memory_usage":      PeerDBClickhouseQueryMaxMemoryUsage,
+		"max_block_size":        PeerDBClickhouseMaxBlockSize,
+		"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
+		"max_insert_threads":    PeerDBClickhouseMaxInsertThreads,
+	}))
 	//nolint:gosec
 	query := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",
 		s.config.DestinationTableIdentifier, avroFileUrl,
 		creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)
 
-	_, err = s.connector.database.ExecContext(ctx, query)
+	_, err = s.connector.database.ExecContext(insertSelectQueryCtx, query)
 
 	return err
 }
@@ -147,12 +155,19 @@
 	if creds.AWS.SessionToken != "" {
 		sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
 	}
+
+	insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
+		"max_memory_usage":      PeerDBClickhouseQueryMaxMemoryUsage,
+		"max_block_size":        PeerDBClickhouseMaxBlockSize,
+		"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
+		"max_insert_threads":    PeerDBClickhouseMaxInsertThreads,
+	}))
 	//nolint:gosec
 	query := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')",
 		config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
 		creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)
 
-	_, err = s.connector.database.ExecContext(ctx, query)
+	_, err = s.connector.database.ExecContext(insertSelectQueryCtx, query)
 	if err != nil {
 		s.connector.logger.Error("Failed to insert into select for Clickhouse: ", err)
 		return 0, err