Skip to content

Commit

Permalink
ClickHouse: Normalize one batch at a time (#2219)
Browse files Browse the repository at this point in the history
This helps with:
- Memory utilisation on ClickHouse side
- Observability
- Debuggability

---------

Co-authored-by: Kevin Biju <[email protected]>
Co-authored-by: Kevin Biju <[email protected]>
  • Loading branch information
3 people authored Nov 12, 2024
1 parent e3168da commit cea742d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 76 deletions.
152 changes: 77 additions & 75 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package connclickhouse
import (
"cmp"
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -266,22 +265,78 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
ctx,
req.FlowJobName,
req.SyncBatchID,
normBatchID,
)
rawTbl := c.getRawTableName(req.FlowJobName)
distinctTableNamesBatchMapping, err := c.getDistinctTableNamesInBatchRange(
ctx, req.FlowJobName, req.SyncBatchID, normBatchID)
if err != nil {
c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err)
return nil, err
return nil, fmt.Errorf("failed to get distinct table names in batch range: %w", err)
}

rawTbl := c.getRawTableName(req.FlowJobName)
for batchID := normBatchID + 1; batchID <= req.SyncBatchID; batchID++ {
if err := c.syncTablesInThisBatch(ctx, req, rawTbl, batchID, distinctTableNamesBatchMapping[batchID]); err != nil {
c.logger.Error("[clickhouse] error while syncing tables in this batch", slog.Any("error", err),
slog.Int64("batchID", batchID))
return nil, err
}

if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID); err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id",
slog.Any("error", err),
slog.Int64("batchID", batchID))
return nil, err
}
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

func (c *ClickHouseConnector) getDistinctTableNamesInBatchRange(
ctx context.Context,
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
) (map[int64][]string, error) {
rawTbl := c.getRawTableName(flowJobName)

q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_batch_id,groupArray(DISTINCT _peerdb_destination_table_name)
FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d GROUP BY _peerdb_batch_id`,
rawTbl, normalizeBatchID, syncBatchID)

rows, err := c.query(ctx, q)
if err != nil {
return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err)
}
defer rows.Close()
distinctTableNamesBatchMapping := make(map[int64][]string)
for rows.Next() {
var batchID int32
var tableNames []string
if err := rows.Scan(&batchID, &tableNames); err != nil {
return nil, fmt.Errorf("error while scanning rows: %w", err)
}
distinctTableNamesBatchMapping[int64(batchID)] = tableNames
}
if rows.Err() != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}

return distinctTableNamesBatchMapping, nil
}

func (c *ClickHouseConnector) syncTablesInThisBatch(
ctx context.Context,
req *model.NormalizeRecordsRequest,
rawTableName string,
batchID int64,
destinationTableNames []string,
) error {
// model the raw table data as inserts.
for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
selectQuery.WriteString("SELECT ")

Expand All @@ -300,7 +355,7 @@ func (c *ClickHouseConnector) NormalizeRecords(

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
return err
}

projection := strings.Builder{}
Expand Down Expand Up @@ -337,7 +392,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
return fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
}
if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
Expand Down Expand Up @@ -396,11 +451,9 @@ func (c *ClickHouseConnector) NormalizeRecords(

selectQuery.WriteString(projection.String())
selectQuery.WriteString(" FROM ")
selectQuery.WriteString(rawTbl)
selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
selectQuery.WriteString(strconv.FormatInt(normBatchID, 10))
selectQuery.WriteString(" AND _peerdb_batch_id <= ")
selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10))
selectQuery.WriteString(rawTableName)
selectQuery.WriteString(" WHERE _peerdb_batch_id = ")
selectQuery.WriteString(strconv.FormatInt(batchID, 10))
selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
selectQuery.WriteString(tbl)
selectQuery.WriteString("'")
Expand All @@ -415,11 +468,9 @@ func (c *ClickHouseConnector) NormalizeRecords(
selectQuery.WriteString("UNION ALL SELECT ")
selectQuery.WriteString(projectionUpdate.String())
selectQuery.WriteString(" FROM ")
selectQuery.WriteString(rawTbl)
selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
selectQuery.WriteString(strconv.FormatInt(normBatchID, 10))
selectQuery.WriteString(" AND _peerdb_batch_id <= ")
selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10))
selectQuery.WriteString(rawTableName)
selectQuery.WriteString(" WHERE _peerdb_batch_id = ")
selectQuery.WriteString(strconv.FormatInt(batchID, 10))
selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
selectQuery.WriteString(tbl)
selectQuery.WriteString("' AND _peerdb_record_type = 1")
Expand All @@ -435,60 +486,11 @@ func (c *ClickHouseConnector) NormalizeRecords(
q := insertIntoSelectQuery.String()

if err := c.execWithLogging(ctx, q); err != nil {
return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
return fmt.Errorf("error while inserting into normalized table: %w", err)
}
}

err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id", "error", err)
return nil, err
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

func (c *ClickHouseConnector) getDistinctTableNamesInBatch(
ctx context.Context,
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
) ([]string, error) {
rawTbl := c.getRawTableName(flowJobName)

q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`,
rawTbl, normalizeBatchID, syncBatchID)

rows, err := c.query(ctx, q)
if err != nil {
return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err)
}
defer rows.Close()
var tableNames []string
for rows.Next() {
var tableName sql.NullString
err = rows.Scan(&tableName)
if err != nil {
return nil, fmt.Errorf("error while scanning table name: %w", err)
}

if !tableName.Valid {
return nil, errors.New("table name is not valid")
}

tableNames = append(tableNames, tableName.String)
}

if rows.Err() != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}

return tableNames, nil
return nil
}

func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName s
`UPDATE `+lastSyncStateTableName+
` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
if err != nil {
p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
p.logger.Error("failed to update normalize batch id",
slog.Any("error", err),
slog.Int64("batchID", batchID))
return err
}

Expand Down

0 comments on commit cea742d

Please sign in to comment.