move metadata schema creation into refinery migration
serprex committed Feb 1, 2024
1 parent 244de67 commit 272ec5c
Showing 7 changed files with 48 additions and 117 deletions.
flow/connectors/clickhouse/cdc.go (1 addition, 8 deletions)
@@ -158,18 +158,11 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
     return nil
 }
 
-// external
 func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *ClickhouseConnector) SetupMetadataTables() error {
-    err := c.pgMetadata.SetupMetadata()
-    if err != nil {
-        c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
-        return err
-    }
-
     return nil
 }
 
flow/connectors/clickhouse/clickhouse.go (1 addition, 2 deletions)
@@ -31,8 +31,7 @@ func NewClickhouseConnector(ctx context.Context,
         return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
     }
 
-    metadataSchemaName := "peerdb_ch_metadata"
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
         return nil, err
flow/connectors/eventhub/eventhub.go (2 additions, 9 deletions)
@@ -39,8 +39,7 @@ func NewEventHubConnector(
     }
 
     hubManager := NewEventHubManager(defaultAzureCreds, config)
-    metadataSchemaName := "peerdb_eventhub_metadata"
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         slog.ErrorContext(ctx, "failed to create postgres metadata store",
             slog.Any("error", err))
@@ -73,16 +72,10 @@ func (c *EventHubConnector) ConnectionActive() error {
 }
 
 func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *EventHubConnector) SetupMetadataTables() error {
-    err := c.pgMetadata.SetupMetadata()
-    if err != nil {
-        c.logger.Error(fmt.Sprintf("failed to setup metadata tables: %v", err))
-        return err
-    }
-
     return nil
 }
 
flow/connectors/external_metadata/store.go (22 additions, 91 deletions)
@@ -11,44 +11,36 @@ import (
     "github.com/jackc/pgx/v5/pgxpool"
     "google.golang.org/protobuf/encoding/protojson"
 
-    connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
-    "github.com/PeerDB-io/peer-flow/connectors/utils"
     cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
     "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/shared"
 )
 
 const (
-    lastSyncStateTableName = "last_sync_state"
-    qrepTableName          = "qrep_metadata"
+    lastSyncStateTableName = "metadata_last_sync_state"
+    qrepTableName          = "metadata_qrep_partitions"
 )
 
 type PostgresMetadataStore struct {
-    ctx        context.Context
-    pool       *pgxpool.Pool
-    schemaName string
-    logger     slog.Logger
+    ctx    context.Context
+    pool   *pgxpool.Pool
+    logger slog.Logger
 }
 
-func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) {
+func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, error) {
     pool, err := cc.GetCatalogConnectionPoolFromEnv()
     if err != nil {
         return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
     }
 
     flowName, _ := ctx.Value(shared.FlowNameKey).(string)
     return &PostgresMetadataStore{
-        ctx:        ctx,
-        pool:       pool,
-        schemaName: schemaName,
-        logger:     *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+        ctx:    ctx,
+        pool:   pool,
+        logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
     }, nil
 }
 
-func (p *PostgresMetadataStore) QualifyTable(table string) string {
-    return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
-}
-
 func (p *PostgresMetadataStore) Ping() error {
     pingErr := p.pool.Ping(p.ctx)
     if pingErr != nil {
@@ -58,68 +50,10 @@ func (p *PostgresMetadataStore) Ping() error {
     return nil
 }
 
-func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
-    // check if schema exists
-    row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
-
-    var exists pgtype.Int8
-    err := row.Scan(&exists)
-    if err != nil {
-        p.logger.Error("failed to check if schema exists", slog.Any("error", err))
-        return false
-    }
-
-    if exists.Int64 > 0 {
-        return true
-    }
-
-    return true
-}
-
-func (p *PostgresMetadataStore) SetupMetadata() error {
-    // create the schema
-    _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
-    if err != nil && !utils.IsUniqueError(err) {
-        p.logger.Error("failed to create schema", slog.Any("error", err))
-        return err
-    }
-
-    // create the last sync state table
-    _, err = p.pool.Exec(p.ctx, `
-        CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+`(
-            job_name TEXT PRIMARY KEY NOT NULL,
-            last_offset BIGINT NOT NULL,
-            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-            sync_batch_id BIGINT NOT NULL,
-            normalize_batch_id BIGINT
-        )`)
-    if err != nil && !utils.IsUniqueError(err) {
-        p.logger.Error("failed to create last sync state table", slog.Any("error", err))
-        return err
-    }
-
-    _, err = p.pool.Exec(p.ctx, `
-        CREATE TABLE IF NOT EXISTS `+p.QualifyTable(qrepTableName)+`(
-            job_name TEXT NOT NULL,
-            partition_id TEXT NOT NULL,
-            sync_partition JSON NOT NULL,
-            sync_start_time TIMESTAMPTZ NOT NULL,
-            sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-            PRIMARY KEY(job_name, partition_id)
-        )`)
-    if err != nil && !utils.IsUniqueError(err) {
-        p.logger.Error("failed to create qrep metadata table", slog.Any("error", err))
-        return err
-    }
-
-    p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))
-    return nil
-}
-
 func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
     row := p.pool.QueryRow(p.ctx,
         `SELECT last_offset FROM `+
-            p.QualifyTable(lastSyncStateTableName)+
+            lastSyncStateTableName+
             ` WHERE job_name = $1`, jobName)
     var offset pgtype.Int8
     err := row.Scan(&offset)
@@ -140,7 +74,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
     row := p.pool.QueryRow(p.ctx,
         `SELECT sync_batch_id FROM `+
-            p.QualifyTable(lastSyncStateTableName)+
+            lastSyncStateTableName+
             ` WHERE job_name = $1`, jobName)
 
     var syncBatchID pgtype.Int8
@@ -162,7 +96,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
     rows := p.pool.QueryRow(p.ctx,
         `SELECT normalize_batch_id FROM `+
-            p.QualifyTable(lastSyncStateTableName)+
+            lastSyncStateTableName+
             ` WHERE job_name = $1`, jobName)
 
     var normalizeBatchID pgtype.Int8
@@ -185,10 +119,10 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64,
 func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
     p.logger.Info("updating last offset", slog.Int64("offset", offset))
     _, err := p.pool.Exec(p.ctx, `
-        INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+        INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
         VALUES ($1, $2, $3)
         ON CONFLICT (job_name)
-        DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
+        DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
         updated_at = NOW()
     `, jobName, offset, 0)
     if err != nil {
@@ -202,12 +136,12 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
 func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
     p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset))
     _, err := p.pool.Exec(p.ctx, `
-        INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+        INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
         VALUES ($1, $2, $3)
         ON CONFLICT (job_name)
         DO UPDATE SET
-            last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
-            sync_batch_id = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.sync_batch_id, excluded.sync_batch_id),
+            last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+            sync_batch_id = GREATEST(`+lastSyncStateTableName+`.sync_batch_id, excluded.sync_batch_id),
         updated_at = NOW()
     `, jobName, offset, syncBatchID)
     if err != nil {
@@ -221,7 +155,7 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
     p.logger.Info("updating normalize batch id for job")
     _, err := p.pool.Exec(p.ctx,
-        `UPDATE `+p.QualifyTable(lastSyncStateTableName)+
+        `UPDATE `+lastSyncStateTableName+
         ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
     if err != nil {
         p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
@@ -243,7 +177,7 @@ func (p *PostgresMetadataStore) FinishQrepPartition(
     partitionJSON := string(pbytes)
 
     _, err = p.pool.Exec(p.ctx,
-        `INSERT INTO `+p.QualifyTable(qrepTableName)+
+        `INSERT INTO `+qrepTableName+
         `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
         ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
         jobName, partition.PartitionId, partitionJSON, startTime)
@@ -253,8 +187,7 @@ func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
     var exists bool
     err := p.pool.QueryRow(p.ctx,
-        `SELECT EXISTS(SELECT * FROM `+
-            p.QualifyTable(qrepTableName)+
+        `SELECT EXISTS(SELECT * FROM `+qrepTableName+
         ` WHERE job_name = $1 AND partition_id = $2)`,
         jobName, partitionID).Scan(&exists)
     if err != nil {
@@ -265,15 +198,13 @@ func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
 func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
     _, err := p.pool.Exec(p.ctx,
-        `DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+
-        ` WHERE job_name = $1`, jobName)
+        `DELETE FROM `+lastSyncStateTableName+` WHERE job_name = $1`, jobName)
     if err != nil {
         return err
     }
 
     _, err = p.pool.Exec(p.ctx,
-        `DELETE FROM `+p.QualifyTable(qrepTableName)+
-        ` WHERE job_name = $1`, jobName)
+        `DELETE FROM `+qrepTableName+` WHERE job_name = $1`, jobName)
     if err != nil {
         return err
     }
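A note on the store above: UpdateLastOffset and FinishBatch wrap last_offset and sync_batch_id in GREATEST inside their ON CONFLICT clauses, so a late or retried writer can never move a job's offset backwards. A minimal sketch of that behavior against the new catalog table (job name and values are hypothetical):

    -- Suppose metadata_last_sync_state already holds ('my_mirror', 100, 3).
    -- This stale retry leaves last_offset at 100 rather than regressing to 42:
    INSERT INTO metadata_last_sync_state (job_name, last_offset, sync_batch_id)
    VALUES ('my_mirror', 42, 0)
    ON CONFLICT (job_name)
    DO UPDATE SET
        last_offset = GREATEST(metadata_last_sync_state.last_offset, excluded.last_offset),
        updated_at = NOW();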
flow/connectors/s3/s3.go (3 additions, 4 deletions)
@@ -65,8 +65,7 @@ func NewS3Connector(ctx context.Context,
     if err != nil {
         return nil, fmt.Errorf("failed to create S3 client: %w", err)
     }
-    metadataSchemaName := "peerdb_s3_metadata"
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
         return nil, err
@@ -149,11 +148,11 @@ func (c *S3Connector) ConnectionActive() error {
 }
 
 func (c *S3Connector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *S3Connector) SetupMetadataTables() error {
-    return c.pgMetadata.SetupMetadata()
+    return nil
 }
 
 func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
flow/connectors/snowflake/snowflake.go (3 additions, 3 deletions)
@@ -202,7 +202,7 @@ func NewSnowflakeConnector(ctx context.Context,
         rawSchema = *snowflakeProtoConfig.MetadataSchema
     }
 
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, "peerdb_sf_metadata")
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         return nil, fmt.Errorf("could not connect to metadata store: %w", err)
     }
@@ -240,11 +240,11 @@ func (c *SnowflakeConnector) ConnectionActive() error {
 }
 
 func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *SnowflakeConnector) SetupMetadataTables() error {
-    return c.pgMetadata.SetupMetadata()
+    return nil
 }
 
 // only used for testing atm. doesn't return info about pkey or ReplicaIdentity [which is PG specific anyway].
nexus/catalog/migrations/V19__metadata.sql (new file, 16 additions)
@@ -0,0 +1,16 @@
+CREATE TABLE IF NOT EXISTS metadata_last_sync_state (
+    job_name TEXT PRIMARY KEY NOT NULL,
+    last_offset BIGINT NOT NULL,
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    sync_batch_id BIGINT NOT NULL,
+    normalize_batch_id BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS metadata_qrep_partitions (
+    job_name TEXT NOT NULL,
+    partition_id TEXT NOT NULL,
+    sync_partition JSON NOT NULL,
+    sync_start_time TIMESTAMPTZ NOT NULL,
+    sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    PRIMARY KEY (job_name, partition_id)
+);
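This migration is the replacement for the deleted SetupMetadata path: the catalog applies V19 through refinery before any connector runs, which is why every NeedsSetupMetadataTables above now returns false and SetupMetadataTables is a no-op. A quick way to confirm the migration ran, assuming refinery's default history table name (an assumption about refinery's conventions, not something shown in this diff):

    -- refinery records each applied migration; refinery_schema_history is
    -- its default history table and may differ if configured otherwise.
    SELECT version, name, applied_on
    FROM refinery_schema_history
    WHERE version = 19;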
