Skip to content

Commit

Permalink
Snowflake: stop storing metadata on warehouse; store in catalog (#1179)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored and pankaj-peerdb committed Feb 6, 2024
1 parent 055587e commit 5f1b677
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 512 deletions.
6 changes: 0 additions & 6 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ import (
)

const (
/*
Different batch Ids in code/BigQuery
1. batchID - identifier in raw table on target to depict which batch a row was inserted.
3. syncBatchID - batch id that was last synced or will be synced
4. normalizeBatchID - batch id that was last normalized or will be normalized.
*/
// MirrorJobsTable has the following schema:
// CREATE TABLE peerdb_mirror_jobs (
// mirror_job_id STRING NOT NULL,
Expand Down
9 changes: 1 addition & 8 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,11 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
return nil
}

// external
func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
return c.pgMetadata.NeedsSetupMetadata()
return false
}

func (c *ClickhouseConnector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
return err
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func NewClickhouseConnector(ctx context.Context,
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}

metadataSchemaName := "peerdb_ch_metadata"
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
if err != nil {
slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ func (s *ClickhouseAvroSyncMethod) insertMetadata(
if err != nil {
s.connector.logger.Error("failed to create metadata insert statement",
slog.Any("error", err), partitionLog)
return fmt.Errorf("failed to create metadata insert statement: %v", err)
return fmt.Errorf("failed to create metadata insert statement: %w", err)
}

if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil {
return fmt.Errorf("failed to execute metadata insert statement: %v", err)
return fmt.Errorf("failed to execute metadata insert statement: %w", err)
}

return nil
Expand Down
11 changes: 2 additions & 9 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func NewEventHubConnector(
}

hubManager := NewEventHubManager(defaultAzureCreds, config)
metadataSchemaName := "peerdb_eventhub_metadata"
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
if err != nil {
slog.ErrorContext(ctx, "failed to create postgres metadata store",
slog.Any("error", err))
Expand Down Expand Up @@ -73,16 +72,10 @@ func (c *EventHubConnector) ConnectionActive() error {
}

func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
return c.pgMetadata.NeedsSetupMetadata()
return false
}

func (c *EventHubConnector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
c.logger.Error(fmt.Sprintf("failed to setup metadata tables: %v", err))
return err
}

return nil
}

Expand Down
166 changes: 76 additions & 90 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,43 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/encoding/protojson"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

const (
lastSyncStateTableName = "last_sync_state"
lastSyncStateTableName = "metadata_last_sync_state"
qrepTableName = "metadata_qrep_partitions"
)

type PostgresMetadataStore struct {
ctx context.Context
pool *pgxpool.Pool
schemaName string
logger slog.Logger
ctx context.Context
pool *pgxpool.Pool
logger slog.Logger
}

func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) {
func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, error) {
pool, err := cc.GetCatalogConnectionPoolFromEnv()
if err != nil {
return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &PostgresMetadataStore{
ctx: ctx,
pool: pool,
schemaName: schemaName,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
ctx: ctx,
pool: pool,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
}, nil
}

func (p *PostgresMetadataStore) QualifyTable(table string) string {
return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
}

func (p *PostgresMetadataStore) Ping() error {
pingErr := p.pool.Ping(p.ctx)
if pingErr != nil {
Expand All @@ -54,57 +50,11 @@ func (p *PostgresMetadataStore) Ping() error {
return nil
}

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)

var exists pgtype.Int8
err := row.Scan(&exists)
if err != nil {
p.logger.Error("failed to check if schema exists", slog.Any("error", err))
return false
}

if exists.Int64 > 0 {
return true
}

return true
}

func (p *PostgresMetadataStore) SetupMetadata() error {
// create the schema
_, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create schema", slog.Any("error", err))
return err
}

// create the last sync state table
_, err = p.pool.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
sync_batch_id BIGINT NOT NULL,
normalize_batch_id BIGINT
)
`)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create last sync state table", slog.Any("error", err))
return err
}

p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))
return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
row := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
row := p.pool.QueryRow(p.ctx,
`SELECT last_offset FROM `+
lastSyncStateTableName+
` WHERE job_name = $1`, jobName)
var offset pgtype.Int8
err := row.Scan(&offset)
if err != nil {
Expand All @@ -122,11 +72,10 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
}

func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
row := p.pool.QueryRow(p.ctx, `
SELECT sync_batch_id
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
row := p.pool.QueryRow(p.ctx,
`SELECT sync_batch_id FROM `+
lastSyncStateTableName+
` WHERE job_name = $1`, jobName)

var syncBatchID pgtype.Int8
err := row.Scan(&syncBatchID)
Expand All @@ -145,11 +94,10 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
}

func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT normalize_batch_id
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
WHERE job_name = $1
`, jobName)
rows := p.pool.QueryRow(p.ctx,
`SELECT normalize_batch_id FROM `+
lastSyncStateTableName+
` WHERE job_name = $1`, jobName)

var normalizeBatchID pgtype.Int8
err := rows.Scan(&normalizeBatchID)
Expand All @@ -171,10 +119,10 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64,
func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
p.logger.Info("updating last offset", slog.Int64("offset", offset))
_, err := p.pool.Exec(p.ctx, `
INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
updated_at = NOW()
`, jobName, offset, 0)
if err != nil {
Expand All @@ -188,12 +136,12 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset))
_, err := p.pool.Exec(p.ctx, `
INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET
last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
sync_batch_id = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.sync_batch_id, excluded.sync_batch_id),
last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
sync_batch_id = GREATEST(`+lastSyncStateTableName+`.sync_batch_id, excluded.sync_batch_id),
updated_at = NOW()
`, jobName, offset, syncBatchID)
if err != nil {
Expand All @@ -206,10 +154,9 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o

func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
p.logger.Info("updating normalize batch id for job")
_, err := p.pool.Exec(p.ctx, `
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
SET normalize_batch_id=$2 WHERE job_name=$1
`, jobName, batchID)
_, err := p.pool.Exec(p.ctx,
`UPDATE `+lastSyncStateTableName+
` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
if err != nil {
p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
return err
Expand All @@ -218,10 +165,49 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID i
return nil
}

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx, `
DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
func (p *PostgresMetadataStore) FinishQrepPartition(
partition *protos.QRepPartition,
jobName string,
startTime time.Time,
) error {
pbytes, err := protojson.Marshal(partition)
if err != nil {
return fmt.Errorf("failed to marshal partition to json: %w", err)
}
partitionJSON := string(pbytes)

_, err = p.pool.Exec(p.ctx,
`INSERT INTO `+qrepTableName+
`(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
jobName, partition.PartitionId, partitionJSON, startTime)
return err
}

func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
var exists bool
err := p.pool.QueryRow(p.ctx,
`SELECT EXISTS(SELECT * FROM `+qrepTableName+
` WHERE job_name = $1 AND partition_id = $2)`,
jobName, partitionID).Scan(&exists)
if err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
}
return exists, nil
}

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx,
`DELETE FROM `+lastSyncStateTableName+` WHERE job_name = $1`, jobName)
if err != nil {
return err
}

_, err = p.pool.Exec(p.ctx,
`DELETE FROM `+qrepTableName+` WHERE job_name = $1`, jobName)
if err != nil {
return err
}

return nil
}
20 changes: 3 additions & 17 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func NewS3Connector(ctx context.Context,
if err != nil {
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}
metadataSchemaName := "peerdb_s3_metadata"
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
if err != nil {
slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
return nil, err
Expand Down Expand Up @@ -149,16 +148,10 @@ func (c *S3Connector) ConnectionActive() error {
}

func (c *S3Connector) NeedsSetupMetadataTables() bool {
return c.pgMetadata.NeedsSetupMetadata()
return false
}

func (c *S3Connector) SetupMetadataTables() error {
err := c.pgMetadata.SetupMetadata()
if err != nil {
c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
return err
}

return nil
}

Expand All @@ -170,15 +163,8 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

// update offset for a job
func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error("failed to update last offset: ", slog.Any("error", err))
return err
}

return nil
return c.pgMetadata.UpdateLastOffset(jobName, offset)
}

func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
Expand Down
Loading

0 comments on commit 5f1b677

Please sign in to comment.