diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index d65d61e9d..2c9796fa1 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -3,9 +3,8 @@ package connclickhouse import ( "cmp" "context" - "database/sql" - "errors" "fmt" + "log/slog" "slices" "strconv" "strings" @@ -266,22 +265,78 @@ func (c *ClickHouseConnector) NormalizeRecords( return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err) } - destinationTableNames, err := c.getDistinctTableNamesInBatch( - ctx, - req.FlowJobName, - req.SyncBatchID, - normBatchID, - ) + rawTbl := c.getRawTableName(req.FlowJobName) + distinctTableNamesBatchMapping, err := c.getDistinctTableNamesInBatchRange( + ctx, req.FlowJobName, req.SyncBatchID, normBatchID) if err != nil { - c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err) - return nil, err + return nil, fmt.Errorf("failed to get distinct table names in batch range: %w", err) } - rawTbl := c.getRawTableName(req.FlowJobName) + for batchID := normBatchID + 1; batchID <= req.SyncBatchID; batchID++ { + if err := c.syncTablesInThisBatch(ctx, req, rawTbl, batchID, distinctTableNamesBatchMapping[batchID]); err != nil { + c.logger.Error("[clickhouse] error while syncing tables in this batch", slog.Any("error", err), + slog.Int64("batchID", batchID)) + return nil, err + } + + if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID); err != nil { + c.logger.Error("[clickhouse] error while updating normalize batch id", + slog.Any("error", err), + slog.Int64("batchID", batchID)) + return nil, err + } + } + + return &model.NormalizeResponse{ + Done: true, + StartBatchID: normBatchID + 1, + EndBatchID: req.SyncBatchID, + }, nil +} + +func (c *ClickHouseConnector) getDistinctTableNamesInBatchRange( + ctx context.Context, + flowJobName string, + syncBatchID int64, + normalizeBatchID int64, +) (map[int64][]string, error) { + rawTbl := c.getRawTableName(flowJobName) + + q := fmt.Sprintf( + `SELECT DISTINCT _peerdb_batch_id,groupArray(DISTINCT _peerdb_destination_table_name) + FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d GROUP BY _peerdb_batch_id`, + rawTbl, normalizeBatchID, syncBatchID) + rows, err := c.query(ctx, q) + if err != nil { + return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) + } + defer rows.Close() + distinctTableNamesBatchMapping := make(map[int64][]string) + for rows.Next() { + var batchID int32 + var tableNames []string + if err := rows.Scan(&batchID, &tableNames); err != nil { + return nil, fmt.Errorf("error while scanning rows: %w", err) + } + distinctTableNamesBatchMapping[int64(batchID)] = tableNames + } + if rows.Err() != nil { + return nil, fmt.Errorf("failed to read rows: %w", err) + } + + return distinctTableNamesBatchMapping, nil +} + +func (c *ClickHouseConnector) syncTablesInThisBatch( + ctx context.Context, + req *model.NormalizeRecordsRequest, + rawTableName string, + batchID int64, + destinationTableNames []string, +) error { // model the raw table data as inserts. for _, tbl := range destinationTableNames { - // SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id selectQuery := strings.Builder{} selectQuery.WriteString("SELECT ") @@ -300,7 +355,7 @@ func (c *ClickHouseConnector) NormalizeRecords( enablePrimaryUpdate, err := peerdbenv.PeerDBEnableClickHousePrimaryUpdate(ctx, req.Env) if err != nil { - return nil, err + return err } projection := strings.Builder{} @@ -337,7 +392,7 @@ func (c *ClickHouseConnector) NormalizeRecords( var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) if err != nil { - return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) + return fmt.Errorf("error while converting column type to clickhouse type: %w", err) } } if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() { @@ -396,11 +451,9 @@ func (c *ClickHouseConnector) NormalizeRecords( selectQuery.WriteString(projection.String()) selectQuery.WriteString(" FROM ") - selectQuery.WriteString(rawTbl) - selectQuery.WriteString(" WHERE _peerdb_batch_id > ") - selectQuery.WriteString(strconv.FormatInt(normBatchID, 10)) - selectQuery.WriteString(" AND _peerdb_batch_id <= ") - selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10)) + selectQuery.WriteString(rawTableName) + selectQuery.WriteString(" WHERE _peerdb_batch_id = ") + selectQuery.WriteString(strconv.FormatInt(batchID, 10)) selectQuery.WriteString(" AND _peerdb_destination_table_name = '") selectQuery.WriteString(tbl) selectQuery.WriteString("'") @@ -415,11 +468,9 @@ func (c *ClickHouseConnector) NormalizeRecords( selectQuery.WriteString("UNION ALL SELECT ") selectQuery.WriteString(projectionUpdate.String()) selectQuery.WriteString(" FROM ") - selectQuery.WriteString(rawTbl) - selectQuery.WriteString(" WHERE _peerdb_batch_id > ") - selectQuery.WriteString(strconv.FormatInt(normBatchID, 10)) - selectQuery.WriteString(" AND _peerdb_batch_id <= ") - selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10)) + selectQuery.WriteString(rawTableName) + selectQuery.WriteString(" WHERE _peerdb_batch_id = ") + selectQuery.WriteString(strconv.FormatInt(batchID, 10)) selectQuery.WriteString(" AND _peerdb_destination_table_name = '") selectQuery.WriteString(tbl) selectQuery.WriteString("' AND _peerdb_record_type = 1") @@ -435,60 +486,11 @@ func (c *ClickHouseConnector) NormalizeRecords( q := insertIntoSelectQuery.String() if err := c.execWithLogging(ctx, q); err != nil { - return nil, fmt.Errorf("error while inserting into normalized table: %w", err) + return fmt.Errorf("error while inserting into normalized table: %w", err) } } - err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID) - if err != nil { - c.logger.Error("[clickhouse] error while updating normalize batch id", "error", err) - return nil, err - } - - return &model.NormalizeResponse{ - Done: true, - StartBatchID: normBatchID + 1, - EndBatchID: req.SyncBatchID, - }, nil -} - -func (c *ClickHouseConnector) getDistinctTableNamesInBatch( - ctx context.Context, - flowJobName string, - syncBatchID int64, - normalizeBatchID int64, -) ([]string, error) { - rawTbl := c.getRawTableName(flowJobName) - - q := fmt.Sprintf( - `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`, - rawTbl, normalizeBatchID, syncBatchID) - - rows, err := c.query(ctx, q) - if err != nil { - return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) - } - defer rows.Close() - var tableNames []string - for rows.Next() { - var tableName sql.NullString - err = rows.Scan(&tableName) - if err != nil { - return nil, fmt.Errorf("error while scanning table name: %w", err) - } - - if !tableName.Valid { - return nil, errors.New("table name is not valid") - } - - tableNames = append(tableNames, tableName.String) - } - - if rows.Err() != nil { - return nil, fmt.Errorf("failed to read rows: %w", err) - } - - return tableNames, nil + return nil } func (c *ClickHouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error { diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 515b622ee..566bf4c4c 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -176,7 +176,9 @@ func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName s `UPDATE `+lastSyncStateTableName+ ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID) if err != nil { - p.logger.Error("failed to update normalize batch id", slog.Any("error", err)) + p.logger.Error("failed to update normalize batch id", + slog.Any("error", err), + slog.Int64("batchID", batchID)) return err }