diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 1fbbf1c6e4..b61e07c319 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -4,9 +4,8 @@ import ( "context" "database/sql" "fmt" - "net/url" - _ "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.temporal.io/sdk/log" @@ -79,16 +78,17 @@ func NewClickhouseConnector( } func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) { - dsn := fmt.Sprintf("clickhouse://%s:%d/%s?username=%s&password=%s", - config.Host, config.Port, - url.PathEscape(config.Database), url.QueryEscape(config.User), url.QueryEscape(config.Password)) - - conn, err := sql.Open("clickhouse", dsn) - if err != nil { - return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) - } + conn := clickhouse.OpenDB(&clickhouse.Options{ + Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, + Auth: clickhouse.Auth{ + Database: config.Database, + Username: config.User, + Password: config.Password, + }, + }) if err := conn.PingContext(ctx); err != nil { + conn.Close() return nil, fmt.Errorf("failed to ping to Clickhouse peer: %w", err) }