Commit ccbfe9b

add generation of normalize schema (#1097)

iskakaushik authored and pankaj-peerdb committed Jan 21, 2024
1 parent 1b6e473 commit ccbfe9b
Showing 3 changed files with 231 additions and 142 deletions.
145 changes: 3 additions & 142 deletions flow/connectors/clickhouse/cdc.go
@@ -6,13 +6,14 @@ import (
"log/slog"
"regexp"
"strings"
"sync/atomic"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pgx/v5/pgtype"
)

const (
@@ -364,38 +365,6 @@ func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error {
return nil
}

func (c *ClickhouseConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
normalizedSchemaTable, err := utils.ParseSchemaTable(tableIdentifier)
if err != nil {
return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
}
tableAlreadyExists, err := c.checkIfTableExists(normalizedSchemaTable.Schema, normalizedSchemaTable.Table)
if err != nil {
return nil, fmt.Errorf("error occurred while checking if normalized table exists: %w", err)
}
if tableAlreadyExists {
tableExistsMapping[tableIdentifier] = true
continue
}

normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable(
normalizedSchemaTable, tableSchema, req.SoftDeleteColName, req.SyncedAtColName)
_, err = c.database.ExecContext(c.ctx, normalizedTableCreateSQL)
if err != nil {
return nil, fmt.Errorf("[sf] error while creating normalized table: %w", err)
}
tableExistsMapping[tableIdentifier] = false
}

return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: tableExistsMapping,
}, nil
}

// ReplayTableSchemaDeltas changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
@@ -450,111 +419,3 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,

return nil
}

func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, err
}
// normalize has caught up with sync, chill until more records are loaded.
if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
}, nil
}

jobMetadataExists, err := c.jobMetadataExists(req.FlowJobName)
if err != nil {
return nil, err
}
// sync hasn't created job metadata yet, chill.
if !jobMetadataExists {
return &model.NormalizeResponse{
Done: false,
}, nil
}
destinationTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
)
if err != nil {
return nil, err
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
if err != nil {
return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err)
}

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(c.ctx)
g.SetLimit(8) // limit parallel merges to 8

for _, destinationTableName := range destinationTableNames {
tableName := destinationTableName // local variable for the closure

g.Go(func() error {
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(req.FlowJobName),
dstTableName: tableName,
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDelete: req.SoftDelete,
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
},
}
mergeStatement, err := mergeGen.generateMergeStmt()
if err != nil {
return err
}

startTime := time.Now()
c.logger.Info("[merge] merging records...", slog.String("destTable", tableName))

result, err := c.database.ExecContext(gCtx, mergeStatement, tableName)
if err != nil {
return fmt.Errorf("failed to merge records into %s (statement: %s): %w",
tableName, mergeStatement, err)
}

endTime := time.Now()
c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
tableName, endTime.Sub(startTime)/time.Second))
if err != nil {
c.logger.Error("[merge] error while normalizing records", slog.Any("error", err))
return err
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err)
}

atomic.AddInt64(&totalRowsAffected, rowsAffected)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, fmt.Errorf("error while normalizing records: %w", err)
}

// updating metadata with new normalizeBatchID
err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID)
if err != nil {
return nil, err
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: batchIDs.NormalizeBatchID + 1,
EndBatchID: batchIDs.SyncBatchID,
}, nil
}
219 changes: 219 additions & 0 deletions flow/connectors/clickhouse/normalize.go
@@ -0,0 +1,219 @@
package connclickhouse

import (
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"golang.org/x/sync/errgroup"
)

const (
signColName = "_peerdb_sign"
signColType = "Int8"
versionColName = "_peerdb_version"
versionColType = "Int8"
)
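// Note: ReplacingMergeTree(`_peerdb_version`) deduplicates rows sharing the
// same ORDER BY key during background merges, keeping the row with the highest
// _peerdb_version; _peerdb_sign is created alongside it, presumably for future
// soft-delete/collapse handling (see the soft-delete TODO below).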

func (c *ClickhouseConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
normalizedSchemaTable, err := utils.ParseSchemaTable(tableIdentifier)
if err != nil {
return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
}
tableAlreadyExists, err := c.checkIfTableExists(normalizedSchemaTable.Schema, normalizedSchemaTable.Table)
if err != nil {
return nil, fmt.Errorf("error occurred while checking if normalized table exists: %w", err)
}
if tableAlreadyExists {
tableExistsMapping[tableIdentifier] = true
continue
}

normalizedTableCreateSQL, err := generateCreateTableSQLForNormalizedTable(
normalizedSchemaTable,
tableSchema,
req.SoftDeleteColName,
req.SyncedAtColName,
)
if err != nil {
return nil, fmt.Errorf("error while generating create table sql for normalized table: %w", err)
}

_, err = c.database.ExecContext(c.ctx, normalizedTableCreateSQL)
if err != nil {
return nil, fmt.Errorf("[sf] error while creating normalized table: %w", err)
}
tableExistsMapping[tableIdentifier] = false
}

return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: tableExistsMapping,
}, nil
}

func generateCreateTableSQLForNormalizedTable(
normalizedSchemaTable *utils.SchemaTable,
tableSchema *protos.TableSchema,
softDeleteColName string,
syncedAtColName string,
) (string, error) {
var stmtBuilder strings.Builder
stmtBuilder.WriteString(fmt.Sprintf("CREATE TABLE `%s`.`%s` (", normalizedSchemaTable.Schema, normalizedSchemaTable.Table))

nc := len(tableSchema.ColumnNames)
for i := 0; i < nc; i++ {
colName := tableSchema.ColumnNames[i]
colType := qvalue.QValueKind(tableSchema.ColumnTypes[i])
clickhouseType, err := qValueKindToClickhouseType(colType)
if err != nil {
return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))
}

// TODO support soft delete

// synced at column will be added to all normalized tables
if syncedAtColName != "" {
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", syncedAtColName, "DateTime64(9)"))
}

// add sign and version columns
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", signColName, signColType))
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s", versionColName, versionColType))

stmtBuilder.WriteString(fmt.Sprintf(") ENGINE = ReplacingMergeTree(`%s`) ", versionColName))

pkeys := tableSchema.PrimaryKeyColumns
if len(pkeys) > 0 {
pkeyStr := strings.Join(pkeys, ",")

stmtBuilder.WriteString("PRIMARY KEY (")
stmtBuilder.WriteString(pkeyStr)
stmtBuilder.WriteString(") ")

stmtBuilder.WriteString("ORDER BY (")
stmtBuilder.WriteString(pkeyStr)
stmtBuilder.WriteString(")")
}

return stmtBuilder.String(), nil
}
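
For illustration, the statement this generator emits for a hypothetical table `public`.`users` with primary key `id` and one extra column (the table/column names, the `_peerdb_synced_at` column name, and the Int64/String type mapping are assumptions, not part of this commit) would be:

CREATE TABLE `public`.`users` (`id` Int64, `name` String,
`_peerdb_synced_at` DateTime64(9), `_peerdb_sign` Int8,
`_peerdb_version` Int8) ENGINE = ReplacingMergeTree(`_peerdb_version`)
PRIMARY KEY (id) ORDER BY (id)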

func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName)
if err != nil {
return nil, err
}
// normalize has caught up with sync, chill until more records are loaded.
if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID {
return &model.NormalizeResponse{
Done: false,
StartBatchID: batchIDs.NormalizeBatchID,
EndBatchID: batchIDs.SyncBatchID,
}, nil
}

jobMetadataExists, err := c.jobMetadataExists(req.FlowJobName)
if err != nil {
return nil, err
}
// sync hasn't created job metadata yet, chill.
if !jobMetadataExists {
return &model.NormalizeResponse{
Done: false,
}, nil
}
destinationTableNames, err := c.getDistinctTableNamesInBatch(
req.FlowJobName,
batchIDs.SyncBatchID,
batchIDs.NormalizeBatchID,
)
if err != nil {
return nil, err
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)
if err != nil {
return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err)
}

var totalRowsAffected int64 = 0
g, gCtx := errgroup.WithContext(c.ctx)
g.SetLimit(8) // limit parallel merges to 8

for _, destinationTableName := range destinationTableNames {
tableName := destinationTableName // local variable for the closure

g.Go(func() error {
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(req.FlowJobName),
dstTableName: tableName,
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDelete: req.SoftDelete,
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
},
}
mergeStatement, err := mergeGen.generateMergeStmt()
if err != nil {
return err
}

startTime := time.Now()
c.logger.Info("[merge] merging records...", slog.String("destTable", tableName))

result, err := c.database.ExecContext(gCtx, mergeStatement, tableName)
if err != nil {
return fmt.Errorf("failed to merge records into %s (statement: %s): %w",
tableName, mergeStatement, err)
}

endTime := time.Now()
c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
tableName, endTime.Sub(startTime)/time.Second))

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err)
}

atomic.AddInt64(&totalRowsAffected, rowsAffected)
return nil
})
}

if err := g.Wait(); err != nil {
return nil, fmt.Errorf("error while normalizing records: %w", err)
}

// updating metadata with new normalizeBatchID
err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID)
if err != nil {
return nil, err
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: batchIDs.NormalizeBatchID + 1,
EndBatchID: batchIDs.SyncBatchID,
}, nil
}
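
The fan-out above is the standard errgroup pattern: bounded concurrency, the first error cancels the group's shared context, and an atomic counter aggregates per-table results. A minimal, self-contained sketch of that pattern, with placeholder table names and a stand-in for the real merge execution:

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

func main() {
	tables := []string{"t1", "t2", "t3"} // placeholder destination tables
	var totalRowsAffected int64

	g, gCtx := errgroup.WithContext(context.Background())
	g.SetLimit(8) // at most 8 merges in flight, as in NormalizeRecords

	for _, destinationTableName := range tables {
		tableName := destinationTableName // local variable for the closure
		g.Go(func() error {
			// A failed sibling goroutine cancels gCtx; bail out early.
			if err := gCtx.Err(); err != nil {
				return err
			}
			// Stand-in for ExecContext(gCtx, mergeStatement, tableName).
			rowsAffected := int64(1)
			_ = tableName
			atomic.AddInt64(&totalRowsAffected, rowsAffected)
			return nil
		})
	}

	if err := g.Wait(); err != nil { // returns the first non-nil error
		fmt.Println("error while normalizing records:", err)
		return
	}
	fmt.Println("total rows affected:", atomic.LoadInt64(&totalRowsAffected))
}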