diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 8a35caa8a7..49916a6aee 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -66,127 +66,6 @@ func (c *ClickhouseConnector) getMirrorRowByJobNAme(jobName string) (*MirrorJobR return &result, nil } -func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool { - result, err := c.checkIfTableExists(c.config.Database, mirrorJobsTableIdentifier) - if err != nil { - return true - } - return !result -} - -func (c *ClickhouseConnector) SetupMetadataTables() error { - - createMirrorJobsTableSQL := `CREATE TABLE IF NOT EXISTS %s ( - MIRROR_JOB_NAME String NOT NULL, - OFFSET Int32 NOT NULL, - SYNC_BATCH_ID Int32 NOT NULL, - NORMALIZE_BATCH_ID Int32 NOT NULL - ) ENGINE = MergeTree() - ORDER BY MIRROR_JOB_NAME;` - - // NOTE that Clickhouse does not support transactional DDL - //createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil) - // if err != nil { - // return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err) - // } - // in case we return after error, ensure transaction is rolled back - // defer func() { - // deferErr := createMetadataTablesTx.Rollback() - // if deferErr != sql.ErrTxDone && deferErr != nil { - // c.logger.Error("error while rolling back transaction for creating metadata tables", - // slog.Any("error", deferErr)) - // } - // }() - - // Not needed as we dont have schema - // err = c.createPeerDBInternalSchema(createMetadataTablesTx) - // if err != nil { - // return err - // } - _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL, mirrorJobsTableIdentifier)) - if err != nil { - return fmt.Errorf("error while setting up mirror jobs table: %w", err) - } - // err = createMetadataTablesTx.Commit() - // if err != nil { - // return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err) - // } - - return nil -} - -func (c *ClickhouseConnector) 
GetLastOffset(jobName string) (int64, error) { - getLastOffsetSQL := "SELECT OFFSET FROM %s WHERE MIRROR_JOB_NAME=?" - - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, - mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Clickhouse peer for last syncedID: %w", err) - } - defer func() { - err = rows.Close() - if err != nil { - c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err)) - } - }() - - if !rows.Next() { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - var result pgtype.Int8 - err = rows.Scan(&result) - if err != nil { - return 0, fmt.Errorf("error while reading result row: %w", err) - } - if result.Int64 == 0 { - c.logger.Warn("Assuming zero offset means no sync has happened") - return 0, nil - } - return result.Int64, nil -} - -func (c *ClickhouseConnector) SetLastOffset(jobName string, lastOffset int64) error { - currentRow, err := c.getMirrorRowByJobNAme(jobName) - - if err != nil { - return err - } - - //setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" - setLastOffsetSQL := `INSERT INTO %s - (mirror_job_name, offset, sync_batch_id, normalize_batch_id) - VALUES (?, ?, ?, ?);` - _, err = c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL, - mirrorJobsTableIdentifier), currentRow.MirrorJobName, lastOffset, currentRow.SyncBatchID, currentRow.NormalizeBatchID) - if err != nil { - return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) - } - return nil -} - -func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) { - getLastSyncBatchID_SQL := "SELECT SYNC_BATCH_ID FROM %s WHERE MIRROR_JOB_NAME=?" 
- - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, - mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Clickhouse peer for last syncBatchId: %w", err) - } - defer rows.Close() - - var result pgtype.Int8 - if !rows.Next() { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - err = rows.Scan(&result) - if err != nil { - return 0, fmt.Errorf("error while reading result row: %w", err) - } - return result.Int64, nil -} - func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { rawTableName := c.getRawTableName(req.FlowJobName) @@ -245,7 +124,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s", rawTableIdentifier)), } - avroSyncer := NewSnowflakeAvroSyncMethod(qrepConfig, c) + avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c) destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier) if err != nil { return nil, err @@ -419,3 +298,37 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string, return nil } + +// external +func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool { + return c.pgMetadata.NeedsSetupMetadata() +} + +func (c *ClickhouseConnector) SetupMetadataTables() error { + err := c.pgMetadata.SetupMetadata() + if err != nil { + c.logger.Error("failed to setup metadata tables", slog.Any("error", err)) + return err + } + + return nil +} + +func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) { + return c.pgMetadata.GetLastBatchID(jobName) +} + +func (c *ClickhouseConnector) GetLastOffset(jobName string) (int64, error) { + return c.pgMetadata.FetchLastOffset(jobName) +} + +// update offset for a job +func (c *ClickhouseConnector) SetLastOffset(jobName string, offset int64) error { + err := c.pgMetadata.UpdateLastOffset(jobName, offset) + if err != nil { + 
c.logger.Error("failed to update last offset", slog.Any("error", err)) + return err + } + + return nil +} diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 60c2ba4840..84cc179b1a 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -8,6 +8,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) @@ -15,6 +16,7 @@ import ( type ClickhouseConnector struct { ctx context.Context database *sql.DB + pgMetadata *metadataStore.PostgresMetadataStore tableSchemaMapping map[string]*protos.TableSchema logger slog.Logger config *protos.ClickhouseConfig @@ -28,10 +30,18 @@ func NewClickhouseConnector(ctx context.Context, return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, + config.GetMetadataDb(), metadataSchemaName) + if err != nil { + slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) + return nil, err + } + flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &ClickhouseConnector{ ctx: ctx, database: database, + pgMetadata: pgMetadata, tableSchemaMapping: nil, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), config: clickhouseProtoConfig,