Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clickhouse cdc #1096

Merged
merged 36 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e112156
initial cdc code
pankaj-peerdb Jan 18, 2024
9a0e5a0
add generation of normalize schema (#1097)
iskakaushik Jan 18, 2024
db0e0d5
use catalog for metadata
pankaj-peerdb Jan 18, 2024
4718337
more changes
pankaj-peerdb Jan 19, 2024
d9cd595
data going in raw table
pankaj-peerdb Jan 22, 2024
f4620ba
fix normalize errors
iskakaushik Jan 22, 2024
f620e37
more normalize code
iskakaushik Jan 22, 2024
0f2db07
basic sketch of normalize
iskakaushik Jan 22, 2024
f3ab034
fix schemaless ness
iskakaushik Jan 22, 2024
9c9a0ff
fix table exists check
iskakaushik Jan 22, 2024
1f8d06f
more fixes
iskakaushik Jan 22, 2024
a697232
sign and version columns
iskakaushik Jan 22, 2024
8124e58
add other col values
iskakaushik Jan 22, 2024
bb8ca22
lower case
iskakaushik Jan 22, 2024
b0a7265
hardcode staging
pankaj-peerdb Jan 22, 2024
b1b2ab1
fix stagingPath
pankaj-peerdb Jan 25, 2024
3c05b8e
cleanup
pankaj-peerdb Jan 25, 2024
eaaf63d
cleanup
pankaj-peerdb Jan 26, 2024
4abf02c
fix after cleanup
pankaj-peerdb Jan 28, 2024
3b21db0
fix after rebase
pankaj-peerdb Jan 28, 2024
2de2c4e
fix after rebase
pankaj-peerdb Jan 28, 2024
fb598ac
fix s3 url formation issue
pankaj-peerdb Jan 29, 2024
ad208ce
restore docker
pankaj-peerdb Jan 29, 2024
a51dbf3
lint
pankaj-peerdb Jan 29, 2024
80dc51b
remove unused code
pankaj-peerdb Jan 29, 2024
d97f69a
fix build issue
pankaj-peerdb Jan 29, 2024
56ebce6
fix build issue
pankaj-peerdb Jan 29, 2024
553830f
fix build issue
pankaj-peerdb Jan 29, 2024
083a445
fix flow build
pankaj-peerdb Jan 29, 2024
08f822b
fix lint error in normalize.go
pankaj-peerdb Jan 29, 2024
8173def
fix more lint errors
pankaj-peerdb Jan 29, 2024
d6f89fc
prettier on ui
pankaj-peerdb Jan 29, 2024
9b9e57e
fix build after rebase
pankaj-peerdb Jan 29, 2024
b7d63a9
add newline back
pankaj-peerdb Jan 29, 2024
7566de4
ui prettier
pankaj-peerdb Jan 29, 2024
152b8c8
ts config new line
pankaj-peerdb Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
res.RelationMessageMapping = input.RelationMessageMapping
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = input.RelationMessageMapping

err = errGroup.Wait()
if err != nil {
Expand Down
198 changes: 198 additions & 0 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package connclickhouse

import (
"database/sql"
"fmt"
"log/slog"
"regexp"
"strings"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
)

const (
	// checkIfTableExistsSQL probes ClickHouse's system.tables for a
	// (database, name) pair; yields 1 when the table exists, 0 otherwise.
	checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;`
	// mirrorJobsTableIdentifier names the mirror-jobs metadata table.
	// NOTE(review): not referenced in this file chunk — presumably used
	// elsewhere in the connector; confirm before removing.
	mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS"
)

// rawTableNameSanitizer matches runs of characters that are not safe in a
// ClickHouse table identifier. Compiled once at package init rather than on
// every call (regexp.MustCompile in a hot path is wasted work).
var rawTableNameSanitizer = regexp.MustCompile("[^a-zA-Z0-9]+")

// getRawTableName returns the raw (staging) table name for the given flow
// job name, with every run of non-alphanumeric characters replaced by "_".
func (c *ClickhouseConnector) getRawTableName(flowJobName string) string {
	return fmt.Sprintf("_peerdb_raw_%s", rawTableNameSanitizer.ReplaceAllString(flowJobName, "_"))
}

// checkIfTableExists reports whether tableIdentifier exists in databaseName,
// by querying ClickHouse's system.tables catalog.
func (c *ClickhouseConnector) checkIfTableExists(databaseName string, tableIdentifier string) (bool, error) {
	var exists sql.NullInt32
	row := c.database.QueryRowContext(c.ctx, checkIfTableExistsSQL, databaseName, tableIdentifier)
	if err := row.Scan(&exists); err != nil {
		return false, fmt.Errorf("error while reading result row: %w", err)
	}
	if !exists.Valid {
		return false, fmt.Errorf("[clickhouse] checkIfTableExists: result is not valid")
	}
	return exists.Int32 == 1, nil
}

// MirrorJobRow models one row of the mirror-jobs metadata table.
// NOTE(review): this type is not referenced in this file chunk; field
// semantics below are inferred from the names — confirm against the
// metadata store before relying on them.
type MirrorJobRow struct {
	MirrorJobName    string // flow job name the row tracks
	Offset           int    // last synced offset/checkpoint (presumed)
	SyncBatchID      int    // latest sync batch id (presumed)
	NormalizeBatchID int    // latest normalize batch id (presumed)
}

// CreateRawTable creates (if absent) the per-flow staging table that CDC
// records are written into before normalization. The table is a
// ReplacingMergeTree ordered by _peerdb_uid, so re-inserted rows with the
// same uid deduplicate on merge.
func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
	rawTableName := c.getRawTableName(req.FlowJobName)

	const rawTableDDL = `CREATE TABLE IF NOT EXISTS %s (
		_peerdb_uid String NOT NULL,
		_peerdb_timestamp Int64 NOT NULL,
		_peerdb_destination_table_name String NOT NULL,
		_peerdb_data String NOT NULL,
		_peerdb_record_type Int NOT NULL,
		_peerdb_match_data String,
		_peerdb_batch_id Int,
		_peerdb_unchanged_toast_columns String
	) ENGINE = ReplacingMergeTree ORDER BY _peerdb_uid;`

	if _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(rawTableDDL, rawTableName)); err != nil {
		return nil, fmt.Errorf("unable to create raw table: %w", err)
	}

	return &protos.CreateRawTableOutput{
		TableIdentifier: rawTableName,
	}, nil
}

// syncRecordsViaAvro converts the requested CDC records to a raw-table
// stream, pushes them into the raw table via the Avro sync method (staged
// through the configured S3 integration), replays any schema deltas, and
// returns the per-batch sync statistics.
func (c *ClickhouseConnector) syncRecordsViaAvro(
	req *model.SyncRecordsRequest,
	rawTableIdentifier string,
	syncBatchID int64,
) (*model.SyncResponse, error) {
	rowsPerTable := make(map[string]uint32)

	// Turn the record batch into a raw-table stream; rowsPerTable is filled
	// in as a side effect of streaming.
	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), rowsPerTable, syncBatchID)
	streamRes, err := utils.RecordsToRawTableStream(streamReq)
	if err != nil {
		return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
	}

	avroConfig := &protos.QRepConfig{
		StagingPath:                c.config.S3Integration,
		FlowJobName:                req.FlowJobName,
		DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
	}
	avroSyncer := NewClickhouseAvroSyncMethod(avroConfig, c)

	dstTableSchema, err := c.getTableSchema(avroConfig.DestinationTableIdentifier)
	if err != nil {
		return nil, err
	}

	syncedCount, err := avroSyncer.SyncRecords(dstTableSchema, streamRes.Stream, req.FlowJobName)
	if err != nil {
		return nil, err
	}

	if err := c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas); err != nil {
		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
	}

	lastCheckpoint, err := req.Records.GetLastCheckpoint()
	if err != nil {
		return nil, err
	}

	return &model.SyncResponse{
		LastSyncedCheckpointID: lastCheckpoint,
		NumRecordsSynced:       int64(syncedCount),
		CurrentSyncBatchID:     syncBatchID,
		TableNameRowsMapping:   rowsPerTable,
		TableSchemaDeltas:      req.Records.SchemaDeltas,
	}, nil
}

// SyncRecords pushes a batch of CDC records into the flow's raw table, then
// persists the new checkpoint and batch id in the external metadata store.
func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
	rawTableName := c.getRawTableName(req.FlowJobName)
	c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName))

	syncResponse, err := c.syncRecordsViaAvro(req, rawTableName, req.SyncBatchID)
	if err != nil {
		return nil, err
	}

	lastCheckpoint, err := req.Records.GetLastCheckpoint()
	if err != nil {
		return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
	}

	if err := c.SetLastOffset(req.FlowJobName, lastCheckpoint); err != nil {
		c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
		return nil, err
	}
	if err := c.pgMetadata.IncrementID(req.FlowJobName); err != nil {
		c.logger.Error("failed to increment id", slog.Any("error", err))
		return nil, err
	}

	return syncResponse, nil
}

// SyncFlowCleanup drops the flow job's state from the external (Postgres)
// metadata store. The error is returned directly — the
// err-check-then-return-nil dance in the original was redundant.
func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error {
	return c.pgMetadata.DropMetadata(jobName)
}

// ReplayTableSchemaDeltas changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
// NOTE: currently a no-op for Clickhouse — schema deltas are accepted and
// intentionally ignored; destination tables are never altered here.
func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
	schemaDeltas []*protos.TableSchemaDelta,
) error {
	return nil
}

// external

// NeedsSetupMetadataTables reports whether the external (Postgres) metadata
// store still needs its tables created; it simply delegates to the store.
func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
	return c.pgMetadata.NeedsSetupMetadata()
}

// SetupMetadataTables creates the connector's metadata tables in the
// external (Postgres) metadata store, logging and returning any failure.
func (c *ClickhouseConnector) SetupMetadataTables() error {
	if err := c.pgMetadata.SetupMetadata(); err != nil {
		c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
		return err
	}
	return nil
}

// GetLastSyncBatchID returns the most recent sync batch id recorded for the
// job in the external metadata store.
func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) {
	return c.pgMetadata.GetLastBatchID(jobName)
}

// GetLastOffset returns the last synced offset (checkpoint) recorded for the
// job in the external metadata store.
func (c *ClickhouseConnector) GetLastOffset(jobName string) (int64, error) {
	return c.pgMetadata.FetchLastOffset(jobName)
}

// SetLastOffset records offset as the latest synced checkpoint for jobName
// in the external metadata store, logging and returning any failure.
func (c *ClickhouseConnector) SetLastOffset(jobName string, offset int64) error {
	if err := c.pgMetadata.UpdateLastOffset(jobName, offset); err != nil {
		c.logger.Error("failed to update last offset: ", slog.Any("error", err))
		return err
	}
	return nil
}
13 changes: 13 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

// ClickhouseConnector implements CDC sync into a ClickHouse peer, keeping
// flow progress (offsets, batch ids) in an external Postgres metadata store.
type ClickhouseConnector struct {
	// NOTE(review): storing a context in a struct is discouraged in Go;
	// consider threading ctx through method parameters instead.
	ctx                context.Context
	database           *sql.DB                          // ClickHouse connection
	pgMetadata         *metadataStore.PostgresMetadataStore // external flow metadata
	tableSchemaMapping map[string]*protos.TableSchema   // set to nil at construction
	logger             slog.Logger                      // pre-tagged with the flow name
	config             *protos.ClickhouseConfig         // peer configuration (incl. S3 staging)
}

func NewClickhouseConnector(ctx context.Context,
Expand All @@ -28,12 +31,22 @@ func NewClickhouseConnector(ctx context.Context,
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}

metadataSchemaName := "peerdb_s3_metadata" // #nosec G101
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx,
clickhouseProtoConfig.GetMetadataDb(), metadataSchemaName)
if err != nil {
slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
return nil, err
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &ClickhouseConnector{
ctx: ctx,
database: database,
pgMetadata: pgMetadata,
tableSchemaMapping: nil,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
config: clickhouseProtoConfig,
}, nil
}

Expand Down
Loading
Loading