Skip to content

Commit

Permalink
clickhouse: fix connection string (#1257)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Feb 12, 2024
1 parent 79915cf commit 9c17dff
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package connclickhouse

import (
"context"
"crypto/tls"
"database/sql"
"fmt"

_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.temporal.io/sdk/log"

Expand Down Expand Up @@ -78,24 +79,30 @@ func NewClickhouseConnector(
}

func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", // TODO &database=%s"
config.Host, config.Port, config.User, config.Password) // TODO , config.Database

conn, err := sql.Open("clickhouse", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
Auth: clickhouse.Auth{
Database: config.Database,
Username: config.User,
Password: config.Password,
},
TLS: &tls.Config{MinVersion: tls.VersionTLS13},
Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4},
ClientInfo: clickhouse.ClientInfo{
Products: []struct {
Name string
Version string
}{
{Name: "peerdb"},
},
},
})

if err := conn.PingContext(ctx); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to ping to Clickhouse peer: %w", err)
}

// Execute USE database command to select a specific database
_, err = conn.Exec(fmt.Sprintf("USE %s", config.Database))
if err != nil {
return nil, fmt.Errorf("failed in selecting db in Clickhouse peer: %w", err)
}

return conn, nil
}

Expand Down

0 comments on commit 9c17dff

Please sign in to comment.