From c6d7b282790dd460fba5ab93d1f50a57b37c17c9 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 9 May 2024 23:46:43 +0530 Subject: [PATCH] Clickhouse insert-select: account for lower version (#1707) This PR tweaks the insert into select queries of clickhouse qrep to account for lower clickhouse versions where session token as an argument is not supported for the s3() table function. https://github.com/ClickHouse/ClickHouse/issues/61230 functionally tested --- flow/connectors/clickhouse/qrep_avro_sync.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index a0a9fb9a71..b015ef47e4 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -47,10 +47,15 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a if err != nil { return err } + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')", s.config.DestinationTableIdentifier, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) @@ -137,10 +142,15 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector = append(selector, "`"+colName+"`") } selectorStr := strings.Join(selector, ",") + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')", config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) if err != nil {