diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 80e2e80f5b..1fbbf1c6e4 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "net/url" _ "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -78,8 +79,9 @@ func NewClickhouseConnector( } func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) { - dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", // TODO &database=%s" - config.Host, config.Port, config.User, config.Password) // TODO , config.Database + dsn := fmt.Sprintf("clickhouse://%s:%d/%s?username=%s&password=%s", + config.Host, config.Port, + url.PathEscape(config.Database), url.QueryEscape(config.User), url.QueryEscape(config.Password)) conn, err := sql.Open("clickhouse", dsn) if err != nil { @@ -90,12 +92,6 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err return nil, fmt.Errorf("failed to ping to Clickhouse peer: %w", err) } - // Execute USE database command to select a specific database - _, err = conn.Exec(fmt.Sprintf("USE %s", config.Database)) - if err != nil { - return nil, fmt.Errorf("failed in selecting db in Clickhouse peer: %w", err) - } - return conn, nil }