diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index cf0da791aa..f197880c95 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -5,14 +5,15 @@ import ( "crypto/tls" "database/sql" "fmt" + "log/slog" "net/url" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/aws/aws-sdk-go-v2/service/s3" "go.temporal.io/sdk/log" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" - conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" @@ -39,9 +40,18 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error return fmt.Errorf("failed to create S3 client: %w", err) } - validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil) - if validErr != nil { - return validErr + object, err := utils.NewS3BucketAndPrefix(creds.BucketPath) + if err != nil { + return fmt.Errorf("failed to create S3 bucket and prefix: %w", err) + } + + slog.Info(fmt.Sprintf("Validating S3 bucke: %s", object.Bucket)) + _, listErr := s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: &object.Bucket, + }, + ) + if listErr != nil { + return fmt.Errorf("failed to list objects: %w", listErr) } return nil