Skip to content

Commit

Permalink
Clickhouse insert-select: account for lower version (#1707)
Browse files Browse the repository at this point in the history
This PR tweaks the insert into select queries of clickhouse qrep to
account for lower clickhouse versions where session token as an argument
is not supported for the s3() table function.
ClickHouse/ClickHouse#61230

functionally tested
  • Loading branch information
Amogh-Bharadwaj authored May 9, 2024
1 parent 9d74565 commit c6d7b28
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
if err != nil {
return err
}

sessionTokenPart := ""
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', '%s', 'Avro')",
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken)
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

_, err = s.connector.database.ExecContext(ctx, query)

Expand Down Expand Up @@ -137,10 +142,15 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
selector = append(selector, "`"+colName+"`")
}
selectorStr := strings.Join(selector, ",")

sessionTokenPart := ""
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', '%s', 'Avro')",
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken)
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

_, err = s.connector.database.ExecContext(ctx, query)
if err != nil {
Expand Down

0 comments on commit c6d7b28

Please sign in to comment.