Skip to content

Commit

Permalink
use catalog for metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 21, 2024
1 parent ccbfe9b commit 44a0b97
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 122 deletions.
157 changes: 35 additions & 122 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,127 +66,6 @@ func (c *ClickhouseConnector) getMirrorRowByJobNAme(jobName string) (*MirrorJobR
return &result, nil
}

// NeedsSetupMetadataTables reports whether the mirror jobs metadata table
// still has to be created in the configured database. Any error while
// probing for the table is treated conservatively as "needs setup".
func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
	exists, err := c.checkIfTableExists(c.config.Database, mirrorJobsTableIdentifier)
	if err != nil {
		return true
	}
	return !exists
}

// SetupMetadataTables creates the mirror jobs metadata table in the
// connector's database if it does not already exist.
//
// NOTE: ClickHouse does not support transactional DDL, so the CREATE is
// issued directly rather than wrapped in a transaction; there is also no
// internal schema to create beforehand.
func (c *ClickhouseConnector) SetupMetadataTables() error {
	createMirrorJobsTableSQL := `CREATE TABLE IF NOT EXISTS %s (
		MIRROR_JOB_NAME String NOT NULL,
		OFFSET Int32 NOT NULL,
		SYNC_BATCH_ID Int32 NOT NULL,
		NORMALIZE_BATCH_ID Int32 NOT NULL
	) ENGINE = MergeTree()
	ORDER BY MIRROR_JOB_NAME;`

	_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL, mirrorJobsTableIdentifier))
	if err != nil {
		return fmt.Errorf("error while setting up mirror jobs table: %w", err)
	}

	return nil
}

// GetLastOffset returns the last synced offset recorded for jobName.
// It returns 0 (with no error) when no metadata row exists yet, or when the
// stored offset is 0 — both are taken to mean no sync has happened.
func (c *ClickhouseConnector) GetLastOffset(jobName string) (int64, error) {
	getLastOffsetSQL := "SELECT OFFSET FROM %s WHERE MIRROR_JOB_NAME=?"

	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL,
		mirrorJobsTableIdentifier), jobName)
	if err != nil {
		return 0, fmt.Errorf("error querying Clickhouse peer for last syncedID: %w", err)
	}
	defer func() {
		// Use a fresh variable so closing never clobbers the function's err.
		if closeErr := rows.Close(); closeErr != nil {
			c.logger.Error("error while closing rows for reading last offset", slog.Any("error", closeErr))
		}
	}()

	if !rows.Next() {
		// Distinguish "no row" from an iteration failure instead of
		// silently returning 0 on a broken cursor.
		if rowsErr := rows.Err(); rowsErr != nil {
			return 0, fmt.Errorf("error while iterating rows for last offset: %w", rowsErr)
		}
		c.logger.Warn("No row found, returning 0")
		return 0, nil
	}
	var result pgtype.Int8
	err = rows.Scan(&result)
	if err != nil {
		return 0, fmt.Errorf("error while reading result row: %w", err)
	}
	if result.Int64 == 0 {
		c.logger.Warn("Assuming zero offset means no sync has happened")
		return 0, nil
	}
	return result.Int64, nil
}

// SetLastOffset records lastOffset for jobName by inserting a fresh metadata
// row that carries forward the job's current sync/normalize batch IDs.
//
// NOTE(review): this plain INSERT does not implement the GREATEST(OFFSET, ?)
// semantics of the commented-out UPDATE below — it can write an offset lower
// than the stored one, and it appends rather than replaces rows. Confirm the
// table engine / read queries tolerate duplicate MIRROR_JOB_NAME rows.
//
//	setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
func (c *ClickhouseConnector) SetLastOffset(jobName string, lastOffset int64) error {
	currentRow, err := c.getMirrorRowByJobNAme(jobName)
	if err != nil {
		return err
	}

	setLastOffsetSQL := `INSERT INTO %s
		(mirror_job_name, offset, sync_batch_id, normalize_batch_id)
		VALUES (?, ?, ?, ?);`
	_, err = c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
		mirrorJobsTableIdentifier), currentRow.MirrorJobName, lastOffset, currentRow.SyncBatchID, currentRow.NormalizeBatchID)
	if err != nil {
		// Original message wrongly said "querying Snowflake peer" — this is
		// an INSERT against the Clickhouse peer.
		return fmt.Errorf("error setting last offset on Clickhouse peer: %w", err)
	}
	return nil
}

// GetLastSyncBatchID returns the last sync batch ID recorded for jobName,
// or 0 (with no error) when no metadata row exists yet.
func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) {
	// Renamed from getLastSyncBatchID_SQL: Go identifiers use MixedCaps,
	// not underscores.
	getLastSyncBatchIDSQL := "SELECT SYNC_BATCH_ID FROM %s WHERE MIRROR_JOB_NAME=?"

	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchIDSQL,
		mirrorJobsTableIdentifier), jobName)
	if err != nil {
		return 0, fmt.Errorf("error querying Clickhouse peer for last syncBatchId: %w", err)
	}
	defer rows.Close()

	var result pgtype.Int8
	if !rows.Next() {
		// Distinguish "no row" from an iteration failure instead of
		// silently returning 0 on a broken cursor.
		if rowsErr := rows.Err(); rowsErr != nil {
			return 0, fmt.Errorf("error while iterating rows for last sync batch ID: %w", rowsErr)
		}
		c.logger.Warn("No row found, returning 0")
		return 0, nil
	}
	err = rows.Scan(&result)
	if err != nil {
		return 0, fmt.Errorf("error while reading result row: %w", err)
	}
	return result.Int64, nil
}

func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

Expand Down Expand Up @@ -245,7 +124,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s",
rawTableIdentifier)),
}
avroSyncer := NewSnowflakeAvroSyncMethod(qrepConfig, c)
avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier)
if err != nil {
return nil, err
Expand Down Expand Up @@ -419,3 +298,37 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,

return nil
}

// external
// NeedsSetupMetadataTables reports whether the external Postgres metadata
// store still needs its metadata provisioned for this connector.
func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
	needsSetup := c.pgMetadata.NeedsSetupMetadata()
	return needsSetup
}

// SetupMetadataTables provisions this connector's tables in the external
// Postgres metadata store, logging and returning any failure.
func (c *ClickhouseConnector) SetupMetadataTables() error {
	if err := c.pgMetadata.SetupMetadata(); err != nil {
		c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
		return err
	}
	return nil
}

// GetLastSyncBatchID fetches the last sync batch ID for jobName from the
// external Postgres metadata store.
func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) {
	batchID, err := c.pgMetadata.GetLastBatchID(jobName)
	return batchID, err
}

// GetLastOffset fetches the last synced offset for jobName from the external
// Postgres metadata store.
func (c *ClickhouseConnector) GetLastOffset(jobName string) (int64, error) {
	offset, err := c.pgMetadata.FetchLastOffset(jobName)
	return offset, err
}

// SetLastOffset stores the latest replication offset for jobName in the
// external Postgres metadata store, logging and returning any failure.
func (c *ClickhouseConnector) SetLastOffset(jobName string, offset int64) error {
	err := c.pgMetadata.UpdateLastOffset(jobName, offset)
	if err != nil {
		// Dropped the trailing ": " from the message — slog attaches the
		// error as a structured attribute, not as appended text.
		c.logger.Error("failed to update last offset", slog.Any("error", err))
		return err
	}

	return nil
}
10 changes: 10 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

type ClickhouseConnector struct {
ctx context.Context
database *sql.DB
pgMetadata *metadataStore.PostgresMetadataStore
tableSchemaMapping map[string]*protos.TableSchema
logger slog.Logger
config *protos.ClickhouseConfig
Expand All @@ -28,10 +30,18 @@ func NewClickhouseConnector(ctx context.Context,
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}

pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx,
config.GetMetadataDb(), metadataSchemaName)
if err != nil {
slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
return nil, err
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &ClickhouseConnector{
ctx: ctx,
database: database,
pgMetadata: pgMetadata,
tableSchemaMapping: nil,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
config: clickhouseProtoConfig,
Expand Down

0 comments on commit 44a0b97

Please sign in to comment.