diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index a0a9fb9a71..b015ef47e4 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -47,10 +47,15 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a if err != nil { return err } + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')", s.config.DestinationTableIdentifier, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) @@ -137,10 +142,15 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector = append(selector, "`"+colName+"`") } selectorStr := strings.Join(selector, ",") + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')", config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) if err != nil {