Add clickhouse connector (#969)
pankaj-peerdb authored Jan 17, 2024
1 parent 8bb0bf1 commit a0a663f
Showing 16 changed files with 644 additions and 24 deletions.
9 changes: 9 additions & 0 deletions flow/cmd/handler.go
@@ -663,6 +663,15 @@ func (h *FlowRequestHandler) CreatePeer(
		}
		s3Config := s3ConfigObject.S3Config
		encodedConfig, encodingErr = proto.Marshal(s3Config)
	case protos.DBType_CLICKHOUSE:
		chConfigObject, ok := config.(*protos.Peer_ClickhouseConfig)

		if !ok {
			return wrongConfigResponse, nil
		}

		chConfig := chConfigObject.ClickhouseConfig
		encodedConfig, encodingErr = proto.Marshal(chConfig)
	default:
		return wrongConfigResponse, nil
	}
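For context, the bytes produced by proto.Marshal here round-trip back through proto.Unmarshal when the peer config is read. A minimal sketch; the surrounding variable names are assumed for illustration and are not part of this diff:

	var chConfig protos.ClickhouseConfig
	if err := proto.Unmarshal(encodedConfig, &chConfig); err != nil {
		return nil, fmt.Errorf("failed to decode Clickhouse config: %w", err)
	}
	// chConfig now carries the Host, Port, User, Password and Database fields.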
81 changes: 81 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
@@ -0,0 +1,81 @@
package connclickhouse

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/shared"
)

type ClickhouseConnector struct {
	ctx                context.Context
	database           *sql.DB
	tableSchemaMapping map[string]*protos.TableSchema
	logger             slog.Logger
}

func NewClickhouseConnector(ctx context.Context,
	clickhouseProtoConfig *protos.ClickhouseConfig,
) (*ClickhouseConnector, error) {
	database, err := connect(ctx, clickhouseProtoConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
	}

	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
	return &ClickhouseConnector{
		ctx:                ctx,
		database:           database,
		tableSchemaMapping: nil,
		logger:             *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
	}, nil
}

func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
	dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s"
		config.Host, config.Port, config.User, config.Password) //, config.Database

	conn, err := sql.Open("clickhouse", dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
	}

	if err := conn.PingContext(ctx); err != nil {
		return nil, fmt.Errorf("failed to ping Clickhouse peer: %w", err)
	}

	// Execute a USE statement to select the configured database.
	_, err = conn.Exec(fmt.Sprintf("USE %s", config.Database))
	if err != nil {
		return nil, fmt.Errorf("failed to select database in Clickhouse peer: %w", err)
	}

	return conn, nil
}

func (c *ClickhouseConnector) Close() error {
	if c == nil || c.database == nil {
		return nil
	}

	err := c.database.Close()
	if err != nil {
		return fmt.Errorf("error while closing connection to Clickhouse peer: %w", err)
	}
	return nil
}

func (c *ClickhouseConnector) ConnectionActive() error {
	if c == nil || c.database == nil {
		return fmt.Errorf("ClickhouseConnector is nil")
	}

	// This also checks that the configured database exists.
	return c.database.PingContext(c.ctx)
}
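For orientation, a minimal usage sketch of the connector above; the config values are illustrative assumptions, not part of this commit:

	// Hypothetical config values, for illustration only.
	config := &protos.ClickhouseConfig{
		Host:     "localhost",
		Port:     9000,
		User:     "default",
		Password: "",
		Database: "peerdb",
	}

	conn, err := NewClickhouseConnector(context.Background(), config)
	if err != nil {
		log.Fatalf("failed to create Clickhouse connector: %v", err)
	}
	defer conn.Close()

	if err := conn.ConnectionActive(); err != nil {
		log.Fatalf("Clickhouse connection is not active: %v", err)
	}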
37 changes: 37 additions & 0 deletions flow/connectors/clickhouse/client.go
@@ -0,0 +1,37 @@
package connclickhouse

import (
	"context"
	"fmt"

	peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
	"github.com/jmoiron/sqlx"
)

type ClickhouseClient struct {
	peersql.GenericSQLQueryExecutor
	// ctx is the context.
	ctx context.Context
	// Config is the Clickhouse config.
	Config *protos.ClickhouseConfig
}

func NewClickhouseClient(ctx context.Context, config *protos.ClickhouseConfig) (*ClickhouseClient, error) {
	databaseSql, err := connect(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
	}
	// Wrap the database/sql handle with sqlx, using the clickhouse driver name.
	database := sqlx.NewDb(databaseSql, "clickhouse")

	// Note: the destination type mapping reuses Snowflake's QValueKind map for now.
	genericExecutor := *peersql.NewGenericSQLQueryExecutor(
		ctx, database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap)

	return &ClickhouseClient{
		GenericSQLQueryExecutor: genericExecutor,
		ctx:                     ctx,
		Config:                  config,
	}, nil
}
197 changes: 197 additions & 0 deletions flow/connectors/clickhouse/qrep.go
@@ -0,0 +1,197 @@
package connclickhouse

import (
	"database/sql"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model"
	"github.com/PeerDB-io/peer-flow/shared"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"google.golang.org/protobuf/encoding/protojson"
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"

func (c *ClickhouseConnector) SyncQRepRecords(
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int, error) {
	// Ensure the destination table is available.
	destTable := config.DestinationTableIdentifier
	flowLog := slog.Group("sync_metadata",
		slog.String(string(shared.PartitionIDKey), partition.PartitionId),
		slog.String("destinationTable", destTable),
	)

	done, err := c.isPartitionSynced(partition.PartitionId)
	if err != nil {
		return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
	}

	if done {
		c.logger.Info("Partition has already been synced", flowLog)
		return 0, nil
	}

	tblSchema, err := c.getTableSchema(destTable)
	if err != nil {
		return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
	}
	c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

	avroSync := NewClickhouseAvroSyncMethod(config, c)

	return avroSync.SyncQRepRecords(config, partition, tblSchema, stream)
}

func (c *ClickhouseConnector) createMetadataInsertStatement(
	partition *protos.QRepPartition,
	jobName string,
	startTime time.Time,
) (string, error) {
	// Marshal the partition to JSON using protojson.
	pbytes, err := protojson.Marshal(partition)
	if err != nil {
		return "", fmt.Errorf("failed to marshal partition to json: %w", err)
	}

	// Convert the bytes to a string.
	partitionJSON := string(pbytes)

	insertMetadataStmt := fmt.Sprintf(
		`INSERT INTO %s
			(flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime)
		VALUES ('%s', '%s', '%s', '%s', NOW());`,
		qRepMetadataTableName, jobName, partition.PartitionId,
		partitionJSON, startTime.Format("2006-01-02 15:04:05.000000"))

	return insertMetadataStmt, nil
}

func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) {
	//nolint:gosec
	queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName)
	rows, err := c.database.Query(queryString)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		return nil, fmt.Errorf("failed to get column types: %w", err)
	}

	return columnTypes, nil
}

func (c *ClickhouseConnector) isPartitionSynced(partitionID string) (bool, error) {
	//nolint:gosec
	queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID)

	row := c.database.QueryRow(queryString)

	var count int
	if err := row.Scan(&count); err != nil {
		return false, fmt.Errorf("failed to execute query: %w", err)
	}
	return count > 0, nil
}

func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
	err := c.createQRepMetadataTable()
	if err != nil {
		return err
	}

	if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
		_, err = c.database.Exec(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
		if err != nil {
			return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
		}
	}

	return nil
}

func (c *ClickhouseConnector) createQRepMetadataTable() error {
	// Define the schema.
	schemaStatement := `
	CREATE TABLE IF NOT EXISTS %s (
		flowJobName String,
		partitionID String,
		syncPartition String,
		syncStartTime DateTime64,
		syncFinishTime DateTime64
	) ENGINE = MergeTree()
	ORDER BY partitionID;
	`
	queryString := fmt.Sprintf(schemaStatement, qRepMetadataTableName)
	_, err := c.database.Exec(queryString)
	if err != nil {
		c.logger.Error(fmt.Sprintf("failed to create table %s", qRepMetadataTableName),
			slog.Any("error", err))

		return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err)
	}
	c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName))
	return nil
}

// ConsolidateQRepPartitions is a no-op for the Clickhouse connector.
func (c *ClickhouseConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error {
	c.logger.Info("Consolidating partitions is a no-op for Clickhouse")
	return nil
}

// CleanupQRepFlow drops the staging data for the given QRep flow.
func (c *ClickhouseConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
	c.logger.Info("Cleaning up flow job")
	return c.dropStage(config.StagingPath, config.FlowJobName)
}

// dropStage drops the stage for the given job.
func (c *ClickhouseConnector) dropStage(stagingPath string, job string) error {
	// For an S3 stage, we need to delete the job's objects in the bucket.
	if strings.HasPrefix(stagingPath, "s3://") {
		s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
		if err != nil {
			c.logger.Error("failed to create S3 bucket and prefix", slog.Any("error", err))
			return fmt.Errorf("failed to create S3 bucket and prefix: %w", err)
		}

		c.logger.Info(fmt.Sprintf("Deleting contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))

		s3svc, err := utils.CreateS3Client(utils.S3PeerCredentials{})
		if err != nil {
			c.logger.Error("failed to create S3 client", slog.Any("error", err))
			return fmt.Errorf("failed to create S3 client: %w", err)
		}

		// List all objects under the job's prefix in the bucket.
		iter := s3manager.NewDeleteListIterator(s3svc, &s3.ListObjectsInput{
			Bucket: aws.String(s3o.Bucket),
			Prefix: aws.String(fmt.Sprintf("%s/%s", s3o.Prefix, job)),
		})

		// Iterate through the matching objects and delete them in batches.
		batchDeleter := s3manager.NewBatchDeleteWithClient(s3svc)
		if err := batchDeleter.Delete(aws.BackgroundContext(), iter); err != nil {
			c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
			return fmt.Errorf("failed to delete objects from bucket: %w", err)
		}

		c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))
	}

	c.logger.Info(fmt.Sprintf("Dropped stage %s", stagingPath))
	return nil
}
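createMetadataInsertStatement above builds its INSERT by interpolating values directly into the SQL string. For contrast, a minimal sketch of a parameterized variant, assuming the clickhouse-go database/sql interface binds positional '?' placeholders; this helper is hypothetical and not part of the commit:

	// insertMetadata is a hypothetical parameterized variant of
	// createMetadataInsertStatement; the driver handles quoting and escaping.
	func (c *ClickhouseConnector) insertMetadata(
		jobName string, partitionID string, partitionJSON string, startTime time.Time,
	) error {
		//nolint:gosec
		query := fmt.Sprintf(`INSERT INTO %s
			(flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime)
			VALUES (?, ?, ?, ?, now())`, qRepMetadataTableName)

		// Assumption: positional '?' parameters are bound by the clickhouse driver.
		_, err := c.database.Exec(query, jobName, partitionID, partitionJSON, startTime)
		if err != nil {
			return fmt.Errorf("failed to insert metadata: %w", err)
		}
		return nil
	}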