Skip to content

Commit

Permalink
add more settings to 0
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 16, 2024
1 parent 5cc52c4 commit ad5ff28
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
11 changes: 7 additions & 4 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import (
)

const (
PeerDBClickhouseQueryMaxMemoryUsage string = "64000000000"
PeerDBClickhouseMaxBlockSize string = "10240"
PeerDBClickhouseMaxInsertBlockSize string = "10240"
PeerDBClickhouseMaxInsertThreads string = "2"
PeerDBClickhouseQueryMaxMemoryUsage string = "64000000000"
PeerDBClickhouseMaxBlockSize string = "10240"
PeerDBClickhouseMaxInsertBlockSize string = "10240"
PeerDBClickhouseMaxInsertThreads string = "2"
PeerDBClickhouseMaxServerMemoryUsage string = "0"
PeerDBClickhouseMemoryRamRatio string = "0"
PeerDBClickhouseMemoryOvercommitRatioDenominator string = "0"
)

type ClickhouseConnector struct {
Expand Down
24 changes: 12 additions & 12 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

var ClickhouseQuerySettings = clickhouse.Settings{
"max_memory_usage": PeerDBClickhouseQueryMaxMemoryUsage,
"max_block_size": PeerDBClickhouseMaxBlockSize,
"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
"max_insert_threads": PeerDBClickhouseMaxInsertThreads,
"max_server_memory_usage": PeerDBClickhouseMaxServerMemoryUsage,
"max_server_memory_usage_to_ram_ratio": PeerDBClickhouseMemoryRamRatio,
"memory_overcommit_ratio_denominator": PeerDBClickhouseMemoryOvercommitRatioDenominator,
}

type ClickhouseAvroSyncMethod struct {
config *protos.QRepConfig
connector *ClickhouseConnector
Expand Down Expand Up @@ -54,12 +64,7 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}

insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_memory_usage": PeerDBClickhouseQueryMaxMemoryUsage,
"max_block_size": PeerDBClickhouseMaxBlockSize,
"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
"max_insert_threads": PeerDBClickhouseMaxInsertThreads,
}))
insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(ClickhouseQuerySettings))
//nolint:gosec
query := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl,
Expand Down Expand Up @@ -156,12 +161,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}

insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_memory_usage": PeerDBClickhouseQueryMaxMemoryUsage,
"max_block_size": PeerDBClickhouseMaxBlockSize,
"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
"max_insert_threads": PeerDBClickhouseMaxInsertThreads,
}))
insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(ClickhouseQuerySettings))
//nolint:gosec
query := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
Expand Down

0 comments on commit ad5ff28

Please sign in to comment.