diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index d5357c9a1..fb221096c 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -12,6 +12,9 @@ import (
 	"strings"
 	"time"

+	"github.com/ClickHouse/clickhouse-go/v2"
+	"golang.org/x/sync/errgroup"
+
 	"github.com/PeerDB-io/peer-flow/datatypes"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -262,8 +265,7 @@
 		}, nil
 	}

-	err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID)
-	if err != nil {
+	if err := c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID); err != nil {
 		return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
 	}

@@ -278,9 +280,48 @@
 		return nil, err
 	}

+	enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
+	if err != nil {
+		return nil, err
+	}
+
+	parallelNormalize, err := peerdbenv.PeerDBClickHouseParallelNormalize(ctx, req.Env)
+	if err != nil {
+		return nil, err
+	}
+	parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
+	if parallelNormalize > 1 {
+		c.logger.Info("normalizing in parallel", slog.Int("connections", parallelNormalize))
+	}
+
+	queries := make(chan string)
 	rawTbl := c.getRawTableName(req.FlowJobName)

-	// model the raw table data as inserts.
+	group, errCtx := errgroup.WithContext(ctx)
+	for i := range parallelNormalize {
+		group.Go(func() error {
+			var chConn clickhouse.Conn
+			if i == 0 {
+				chConn = c.database
+			} else {
+				var err error
+				chConn, err = Connect(errCtx, req.Env, c.config)
+				if err != nil {
+					return err
+				}
+				defer chConn.Close()
+			}
+
+			for query := range queries {
+				c.logger.Info("normalizing batch", slog.String("query", query))
+				if err := chConn.Exec(errCtx, query); err != nil {
+					return fmt.Errorf("error while inserting into normalized table: %w", err)
+				}
+			}
+			return nil
+		})
+	}
+
 	for _, tbl := range destinationTableNames {
 		// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
 		selectQuery := strings.Builder{}
@@ -299,11 +340,6 @@
 			}
 		}

-		enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env)
-		if err != nil {
-			return nil, err
-		}
-
 		projection := strings.Builder{}
 		projectionUpdate := strings.Builder{}

@@ -338,6 +374,7 @@
 				var err error
 				clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
 				if err != nil {
+					close(queries)
 					return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
 				}
 			}
@@ -433,15 +470,19 @@
 		insertIntoSelectQuery.WriteString(colSelector.String())
 		insertIntoSelectQuery.WriteString(selectQuery.String())

-		q := insertIntoSelectQuery.String()
-
-		if err := c.execWithLogging(ctx, q); err != nil {
-			return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
+		select {
+		case queries <- insertIntoSelectQuery.String():
+		case <-errCtx.Done():
+			close(queries)
+			return nil, ctx.Err()
 		}
 	}
+	close(queries)
+	if err := group.Wait(); err != nil {
+		return nil, err
+	}

-	err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
-	if err != nil {
+	if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID); err != nil {
 		c.logger.Error("[clickhouse] error while updating normalize batch id", slog.Int64("BatchID", req.SyncBatchID), slog.Any("error", err))
 		return nil, err
 	}
@@ -510,8 +551,7 @@ func (c *ClickHouseConnector) copyAvroStagesToDestination(
 	ctx context.Context, flowJobName string, normBatchID, syncBatchID int64,
 ) error {
 	for s := normBatchID + 1; s <= syncBatchID; s++ {
-		err := c.copyAvroStageToDestination(ctx, flowJobName, s)
-		if err != nil {
+		if err := c.copyAvroStageToDestination(ctx, flowJobName, s); err != nil {
 			return fmt.Errorf("failed to copy avro stage to destination: %w", err)
 		}
 	}
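For reviewers, a minimal, self-contained sketch of the fan-out pattern the normalize.go change adopts: a fixed pool of errgroup goroutines drains a channel of SQL strings, and the shared errgroup context stops the feed as soon as any worker fails. The runParallel and execQuery names and the example queries are hypothetical stand-ins invented for illustration; in the connector the workers call chConn.Exec on the per-table INSERT ... SELECT statements built inside NormalizeRecords.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// execQuery is a hypothetical stand-in for chConn.Exec(errCtx, query).
func execQuery(ctx context.Context, query string) error {
	fmt.Println("exec:", query)
	return nil
}

// runParallel drains queries with a bounded pool of workers, stopping the
// feed as soon as any worker returns an error.
func runParallel(ctx context.Context, queries []string, parallelism int) error {
	// Same clamp as the diff: at least one worker, at most one per query.
	parallelism = min(max(parallelism, 1), len(queries))

	ch := make(chan string)
	group, errCtx := errgroup.WithContext(ctx)
	for range parallelism {
		group.Go(func() error {
			// Each worker drains the channel until it is closed or a
			// query fails; a failure cancels errCtx for everyone.
			for q := range ch {
				if err := execQuery(errCtx, q); err != nil {
					return err
				}
			}
			return nil
		})
	}

	for _, q := range queries {
		select {
		case ch <- q:
		case <-errCtx.Done():
			// A worker failed; close the feed and surface its error.
			close(ch)
			return group.Wait()
		}
	}
	close(ch)
	return group.Wait()
}

func main() {
	queries := []string{"INSERT INTO t1 SELECT 1", "INSERT INTO t2 SELECT 2"}
	if err := runParallel(context.Background(), queries, 2); err != nil {
		fmt.Println("error:", err)
	}
}

One deliberate difference: the sketch returns group.Wait() when the feed is cancelled, so the failing worker's error is surfaced directly, whereas the diff closes the channel and returns ctx.Err().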
diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go
index f3c2de097..f149bf37b 100644
--- a/flow/peerdbenv/dynamicconf.go
+++ b/flow/peerdbenv/dynamicconf.go
@@ -180,6 +180,14 @@
 		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
 		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
 	},
+	{
+		Name:             "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE",
+		Description:      "Divide tables in batch into N insert selects. Helps distribute load to multiple nodes",
+		DefaultValue:     "0",
+		ValueType:        protos.DynconfValueType_INT,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
+		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
+	},
 	{
 		Name:        "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
 		Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
@@ -256,8 +264,8 @@ func dynamicConfSigned[T constraints.Signed](ctx context.Context, env map[string]string, key string) (T, error) {
 		return strconv.ParseInt(value, 10, 64)
 	})
 	if err != nil {
-		shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.Any("error", err))
-		return 0, fmt.Errorf("failed to parse as int64: %w", err)
+		shared.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.String("key", key), slog.Any("error", err))
+		return 0, fmt.Errorf("failed to parse %s as int64: %w", key, err)
 	}

 	return T(value), nil
@@ -268,8 +276,8 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[string]string, key string) (T, error) {
 		return strconv.ParseUint(value, 10, 64)
 	})
 	if err != nil {
-		shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.Any("error", err))
-		return 0, fmt.Errorf("failed to parse as uint64: %w", err)
+		shared.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.String("key", key), slog.Any("error", err))
+		return 0, fmt.Errorf("failed to parse %s as uint64: %w", key, err)
 	}

 	return T(value), nil
@@ -278,8 +286,8 @@
 func dynamicConfBool(ctx context.Context, env map[string]string, key string) (bool, error) {
 	value, err := dynLookupConvert(ctx, env, key, strconv.ParseBool)
 	if err != nil {
-		shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.Any("error", err))
-		return false, fmt.Errorf("failed to parse bool: %w", err)
+		shared.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.String("key", key), slog.Any("error", err))
+		return false, fmt.Errorf("failed to parse %s as bool: %w", key, err)
 	}

 	return value, nil
@@ -374,6 +382,10 @@ func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error) {
 	return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS")
 }

+func PeerDBClickHouseParallelNormalize(ctx context.Context, env map[string]string) (int, error) {
+	return dynamicConfSigned[int](ctx, env, "PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE")
+}
+
 func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) {
 	return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM")
 }
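Worth noting how the new setting's default of "0" interacts with the clamp in NormalizeRecords: min(max(parallelNormalize, 1), len(destinationTableNames)) turns 0 into a single connection (preserving today's serial behavior) and caps the setting at one connection per table in the batch. A tiny sketch with hypothetical values:

package main

import "fmt"

func main() {
	// Hypothetical setting values against a batch of 8 destination tables.
	const tables = 8
	for _, setting := range []int{0, 4, 100} {
		// Same clamp as NormalizeRecords: at least 1, at most one per table.
		workers := min(max(setting, 1), tables)
		fmt.Printf("PEERDB_CLICKHOUSE_PARALLEL_NORMALIZE=%d -> %d connection(s)\n", setting, workers)
	}
}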