Skip to content

Commit

Permalink
Stilgar: Add ClickHouse query-level settings (#1728)
Browse files Browse the repository at this point in the history
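Adds the following ClickHouse query-level settings to the `INSERT INTO ... SELECT ... FROM s3(...)` queries issued in `qrep_avro_sync.go`: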
```
max_memory_usage
max_block_size
max_insert_threads
max_insert_block_size
```
  • Loading branch information
Amogh-Bharadwaj committed Jun 5, 2024
1 parent a9b1e36 commit da95bdd
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
7 changes: 7 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

// Query-level ClickHouse settings attached to the INSERT ... SELECT ... FROM s3(...)
// statements run by ClickhouseAvroSyncMethod. Each value is declared as a string and
// is placed into a clickhouse.Settings map under the setting name noted below.
const (
// PeerDBClickhouseQueryMaxMemoryUsage is sent as the "max_memory_usage" setting.
// NOTE(review): presumably a byte count (i.e. 64 GB) — confirm against the ClickHouse docs.
PeerDBClickhouseQueryMaxMemoryUsage string = "64000000000"
// PeerDBClickhouseMaxBlockSize is sent as the "max_block_size" setting.
PeerDBClickhouseMaxBlockSize string = "10240"
// PeerDBClickhouseMaxInsertBlockSize is sent as the "max_insert_block_size" setting.
PeerDBClickhouseMaxInsertBlockSize string = "10240"
// PeerDBClickhouseMaxInsertThreads is sent as the "max_insert_threads" setting.
PeerDBClickhouseMaxInsertThreads string = "2"
)

type ClickhouseConnector struct {
*metadataStore.PostgresMetadata
database *sql.DB
Expand Down
19 changes: 17 additions & 2 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -50,12 +51,19 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}

insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_memory_usage": PeerDBClickhouseQueryMaxMemoryUsage,
"max_block_size": PeerDBClickhouseMaxBlockSize,
"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
"max_insert_threads": PeerDBClickhouseMaxInsertThreads,
}))
//nolint:gosec
query := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

_, err = s.connector.database.ExecContext(ctx, query)
_, err = s.connector.database.ExecContext(insertSelectQueryCtx, query)

return err
}
Expand Down Expand Up @@ -144,12 +152,19 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}

insertSelectQueryCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_memory_usage": PeerDBClickhouseQueryMaxMemoryUsage,
"max_block_size": PeerDBClickhouseMaxBlockSize,
"max_insert_block_size": PeerDBClickhouseMaxInsertBlockSize,
"max_insert_threads": PeerDBClickhouseMaxInsertThreads,
}))
//nolint:gosec
query := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

_, err = s.connector.database.ExecContext(ctx, query)
_, err = s.connector.database.ExecContext(insertSelectQueryCtx, query)
if err != nil {
s.connector.logger.Error("Failed to insert into select for Clickhouse: ", err)
return 0, err
Expand Down

0 comments on commit da95bdd

Please sign in to comment.