PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE (#2256)
Distributes queries across multiple connections so normalization can run concurrently and potentially be distributed across nodes.
Uses a channel so that unevenly distributed changes (one table may have much more activity than others) even out across connections.
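The change follows a standard fan-out pattern: an errgroup of workers, each with its own connection, drains a shared unbuffered channel of INSERT ... SELECT statements. Below is a minimal, self-contained sketch of that pattern; `openConn` and the query strings are placeholders for illustration, not the connector's real API (the actual code in normalize.go reuses `c.database` for the first worker and opens extra connections for the rest).

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// openConn stands in for opening a dedicated ClickHouse connection per worker;
// it is a placeholder, not the connector's real API.
func openConn(ctx context.Context) (func(context.Context, string) error, error) {
	return func(ctx context.Context, q string) error {
		fmt.Println("exec:", q)
		return nil
	}, nil
}

func normalizeInParallel(ctx context.Context, parallelism int, tables []string) error {
	queries := make(chan string) // unbuffered: whichever worker is free pulls the next query
	group, errCtx := errgroup.WithContext(ctx)

	for range parallelism {
		group.Go(func() error {
			exec, err := openConn(errCtx) // one connection per worker
			if err != nil {
				return err
			}
			for q := range queries {
				if err := exec(errCtx, q); err != nil {
					return err // cancels errCtx, which stops the producer below
				}
			}
			return nil
		})
	}

	for _, tbl := range tables {
		q := "INSERT INTO " + tbl + " SELECT ... FROM raw_table" // built per table
		select {
		case queries <- q:
		case <-errCtx.Done(): // a worker failed; stop producing
			close(queries)
			return group.Wait()
		}
	}
	close(queries) // no more work; workers drain the channel and exit
	return group.Wait()
}

func main() {
	_ = normalizeInParallel(context.Background(), 2, []string{"t1", "t2", "t3"})
}
```

Because the channel is unbuffered, a table that generates far more queries than the others does not pin a single connection; its work is picked up by whichever worker becomes free next.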
serprex authored Nov 14, 2024
1 parent 79732fd commit 8c02a5e
Showing 2 changed files with 74 additions and 22 deletions.
72 changes: 56 additions & 16 deletions flow/connectors/clickhouse/normalize.go
@@ -12,6 +12,9 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"golang.org/x/sync/errgroup"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
@@ -262,8 +265,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
}, nil
}

err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID)
if err != nil {
if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

@@ -278,9 +280,48 @@ func (c *ClickHouseConnector) NormalizeRecords(
return nil, err
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
}

parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
if err != nil {
return nil, err
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
if parallelNormalize > 1 {
c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
}

queries := make(chan string)
rawTbl := c.getRawTableName(req.FlowJobName)

// model the raw table data as inserts.
group, errCtx := errgroup.WithContext(ctx)
for i := range parallelNormalize {
group.Go(func() error {
var chConn clickhouse.Conn
if i == 0 {
chConn = c.database
} else {
var err error
chConn, err = Connect(errCtx, req.Env, c.config)
if err != nil {
return err
}
defer chConn.Close()
}

for query := range queries {
c.logger.Info("normalizing batch", slog.String("query", query))
if err := chConn.Exec(errCtx, query); err != nil {
return fmt.Errorf("error while inserting into normalized table: %w", err)
}
}
return nil
})
}

for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
selectQuery := strings.Builder{}
@@ -299,11 +340,6 @@ func (c *ClickHouseConnector) NormalizeRecords(
}
}

enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
if err != nil {
return nil, err
}

projection := strings.Builder{}
projectionUpdate := strings.Builder{}

@@ -338,6 +374,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
}
@@ -433,15 +470,19 @@ func (c *ClickHouseConnector) NormalizeRecords(
insertIntoSelectQuery.WriteString(colSelector.String())
insertIntoSelectQuery.WriteString(selectQuery.String())

q := insertIntoSelectQuery.String()

if err := c.execWithLogging(ctx, q); err != nil {
return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
select {
case queries <- insertIntoSelectQuery.String():
case <-errCtx.Done():
close(queries)
return nil, ctx.Err()
}
}
close(queries)
if err := group.Wait(); err != nil {
return nil, err
}

err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
return nil, err
}
@@ -510,8 +551,7 @@ func (c *ClickHouseConnector) copyAvroStagesToDestination(
ctx context.Context, flowJobName string, normBatchID, syncBatchID int64,
) error {
for s := normBatchID + 1; s <= syncBatchID; s++ {
err := c.copyAvroStageToDestination(ctx, flowJobName, s)
if err != nil {
if err := c.copyAvroStageToDestination(ctx, flowJobName, s); err != nil {
return fmt.Errorf("failed to copy avro stage to destination: %w", err)
}
}
24 changes: 18 additions & 6 deletions flow/peerdbenv/dynamicconf.go
@@ -180,6 +180,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE",
Description: "Divide tables in batch into N insert selects. Helps distribute load to multiple nodes",
DefaultValue: "0",
ValueType: protos.DynconfValueType_INT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
@@ -256,8 +264,8 @@ func dynamicConfSigned[T constraints.Signed](ctx context.Context, env map[string
return strconv.ParseInt(value, 10, 64)
})
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.Any("error", err))
return 0, fmt.Errorf("failed to parse as int64: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.String("key", key), slog.Any("error", err))
return 0, fmt.Errorf("failed to parse %s as int64: %w", key, err)
}

return T(value), nil
@@ -268,8 +276,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st
return strconv.ParseUint(value, 10, 64)
})
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.Any("error", err))
return 0, fmt.Errorf("failed to parse as uint64: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.String("key", key), slog.Any("error", err))
return 0, fmt.Errorf("failed to parse %s as uint64: %w", key, err)
}

return T(value), nil
@@ -278,8 +286,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st
func dynamicConfBool(ctx context.Context, env map[string]string, key string) (bool, error) {
value, err := dynLookupConvert(ctx, env, key, strconv.ParseBool)
if err != nil {
shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.Any("error", err))
return false, fmt.Errorf("failed to parse bool: %w", err)
shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.String("key", key), slog.Any("error", err))
return false, fmt.Errorf("failed to parse %s as bool: %w", key, err)
}

return value, nil
@@ -374,6 +382,10 @@ func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string
return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS")
}

func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]string) (int, error) {
return dynamicConfSigned[int](ctx, env, "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE")
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM")
}
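For reference, a minimal sketch (not part of the commit) of how the new setting would be consumed, following the calls added above. The env map and table names are made-up values for illustration, and the import path assumes the module path used elsewhere in the repo; the default of "0" is clamped up to 1, preserving the previous single-connection behavior when the setting is unset.

```go
package main

import (
	"context"
	"fmt"

	"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func main() {
	ctx := context.Background()
	// Per-flow override of the dynamic setting; without it the DefaultValue "0" applies.
	env := map[string]string{"PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE": "4"}

	parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, env)
	if err != nil {
		panic(err)
	}

	destinationTableNames := []string{"public.orders", "public.events"}
	// Same clamp as NormalizeRecords: at least 1, at most one connection per table.
	parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
	fmt.Println("connections:", parallelNormalize) // prints 2 here
}
```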
