Snowflake: stop storing metadata on warehouse; store in catalog #1179

Merged · 7 commits · Feb 1, 2024
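In outline: the connectors touched in this diff (BigQuery, ClickHouse, Event Hubs, S3) previously provisioned their own schemas on the external Postgres metadata store (peerdb_ch_metadata, peerdb_eventhub_metadata, peerdb_s3_metadata) and created tables at runtime; after this change that state lives in two fixed tables in the PeerDB catalog, metadata_last_sync_state and metadata_qrep_partitions, so the schema-name parameter and the runtime setup path go away. A minimal sketch of the call-site change — the import path and alias are inferred from this repo's layout, and error handling is trimmed:

package main

import (
    "context"

    // Import path and alias inferred from the call sites in the hunks below.
    metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
)

func main() {
    ctx := context.Background()

    // Before: every connector picked its own schema, e.g.
    //   metadataStore.NewPostgresMetadataStore(ctx, "peerdb_s3_metadata")
    // After: the store resolves the catalog connection from the environment
    // and always targets the fixed catalog tables.
    store, err := metadataStore.NewPostgresMetadataStore(ctx)
    if err != nil {
        panic(err)
    }
    _ = store
}

Since the catalog presumably provisions these tables through its own migrations (not part of this diff), NeedsSetupMetadataTables can now return false unconditionally in every connector below.
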
6 changes: 0 additions & 6 deletions flow/connectors/bigquery/bigquery.go
@@ -27,12 +27,6 @@ import (
 )
 
 const (
-    /*
-        Different batch Ids in code/BigQuery
-        1. batchID - identifier in raw table on target to depict which batch a row was inserted.
-        3. syncBatchID - batch id that was last synced or will be synced
-        4. normalizeBatchID - batch id that was last normalized or will be normalized.
-    */
     // MirrorJobsTable has the following schema:
     // CREATE TABLE peerdb_mirror_jobs (
     //   mirror_job_id STRING NOT NULL,
9 changes: 1 addition & 8 deletions flow/connectors/clickhouse/cdc.go
@@ -158,18 +158,11 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
     return nil
 }
 
-// external
 func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *ClickhouseConnector) SetupMetadataTables() error {
-    err := c.pgMetadata.SetupMetadata()
-    if err != nil {
-        c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
-        return err
-    }
-
     return nil
 }
 
3 changes: 1 addition & 2 deletions flow/connectors/clickhouse/clickhouse.go
@@ -31,8 +31,7 @@ func NewClickhouseConnector(ctx context.Context,
         return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
     }
 
-    metadataSchemaName := "peerdb_ch_metadata"
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
         return nil, err
4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -195,11 +195,11 @@ func (s *ClickhouseAvroSyncMethod) insertMetadata(
     if err != nil {
         s.connector.logger.Error("failed to create metadata insert statement",
             slog.Any("error", err), partitionLog)
-        return fmt.Errorf("failed to create metadata insert statement: %v", err)
+        return fmt.Errorf("failed to create metadata insert statement: %w", err)
     }
 
     if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil {
-        return fmt.Errorf("failed to execute metadata insert statement: %v", err)
+        return fmt.Errorf("failed to execute metadata insert statement: %w", err)
     }
 
     return nil
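
The %v → %w swaps in this hunk are more than style: %w keeps the wrapped error in the chain so callers can match it with errors.Is or errors.As, while %v only bakes its message into a new string. A self-contained illustration — the sentinel error is invented for the example:

package main

import (
    "errors"
    "fmt"
)

var errConnReset = errors.New("connection reset")

func main() {
    wrapped := fmt.Errorf("failed to execute metadata insert statement: %w", errConnReset)
    flattened := fmt.Errorf("failed to execute metadata insert statement: %v", errConnReset)

    fmt.Println(errors.Is(wrapped, errConnReset))   // true: %w preserves the error chain
    fmt.Println(errors.Is(flattened, errConnReset)) // false: %v kept only the text
}
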
11 changes: 2 additions & 9 deletions flow/connectors/eventhub/eventhub.go
@@ -39,8 +39,7 @@ func NewEventHubConnector(
     }
 
     hubManager := NewEventHubManager(defaultAzureCreds, config)
-    metadataSchemaName := "peerdb_eventhub_metadata"
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         slog.ErrorContext(ctx, "failed to create postgres metadata store",
             slog.Any("error", err))
@@ -73,16 +72,10 @@ func (c *EventHubConnector) ConnectionActive() error {
 }
 
 func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *EventHubConnector) SetupMetadataTables() error {
-    err := c.pgMetadata.SetupMetadata()
-    if err != nil {
-        c.logger.Error(fmt.Sprintf("failed to setup metadata tables: %v", err))
-        return err
-    }
-
     return nil
 }
 
166 changes: 76 additions & 90 deletions flow/connectors/external_metadata/store.go
@@ -4,47 +4,43 @@ import (
     "context"
     "fmt"
     "log/slog"
+    "time"
 
     "github.com/jackc/pgx/v5"
     "github.com/jackc/pgx/v5/pgtype"
     "github.com/jackc/pgx/v5/pgxpool"
+    "google.golang.org/protobuf/encoding/protojson"
 
-    connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
-    "github.com/PeerDB-io/peer-flow/connectors/utils"
     cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
+    "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/shared"
 )
 
 const (
-    lastSyncStateTableName = "last_sync_state"
+    lastSyncStateTableName = "metadata_last_sync_state"
+    qrepTableName          = "metadata_qrep_partitions"
 )
 
 type PostgresMetadataStore struct {
-    ctx        context.Context
-    pool       *pgxpool.Pool
-    schemaName string
-    logger     slog.Logger
+    ctx    context.Context
+    pool   *pgxpool.Pool
+    logger slog.Logger
 }
 
-func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) {
+func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, error) {
     pool, err := cc.GetCatalogConnectionPoolFromEnv()
     if err != nil {
         return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
     }
 
     flowName, _ := ctx.Value(shared.FlowNameKey).(string)
     return &PostgresMetadataStore{
-        ctx:        ctx,
-        pool:       pool,
-        schemaName: schemaName,
-        logger:     *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+        ctx:    ctx,
+        pool:   pool,
+        logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
     }, nil
 }
 
-func (p *PostgresMetadataStore) QualifyTable(table string) string {
-    return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
-}
-
 func (p *PostgresMetadataStore) Ping() error {
     pingErr := p.pool.Ping(p.ctx)
     if pingErr != nil {
@@ -54,57 +50,11 @@ func (p *PostgresMetadataStore) Ping() error {
     return nil
 }
 
-func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
-    // check if schema exists
-    row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
-
-    var exists pgtype.Int8
-    err := row.Scan(&exists)
-    if err != nil {
-        p.logger.Error("failed to check if schema exists", slog.Any("error", err))
-        return false
-    }
-
-    if exists.Int64 > 0 {
-        return true
-    }
-
-    return true
-}
-
-func (p *PostgresMetadataStore) SetupMetadata() error {
-    // create the schema
-    _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
-    if err != nil && !utils.IsUniqueError(err) {
-        p.logger.Error("failed to create schema", slog.Any("error", err))
-        return err
-    }
-
-    // create the last sync state table
-    _, err = p.pool.Exec(p.ctx, `
-        CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
-            job_name TEXT PRIMARY KEY NOT NULL,
-            last_offset BIGINT NOT NULL,
-            updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
-            sync_batch_id BIGINT NOT NULL,
-            normalize_batch_id BIGINT
-        )
-    `)
-    if err != nil && !utils.IsUniqueError(err) {
-        p.logger.Error("failed to create last sync state table", slog.Any("error", err))
-        return err
-    }
-
-    p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))
-    return nil
-}
-
 func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
-    row := p.pool.QueryRow(p.ctx, `
-        SELECT last_offset
-        FROM `+p.QualifyTable(lastSyncStateTableName)+`
-        WHERE job_name = $1
-    `, jobName)
+    row := p.pool.QueryRow(p.ctx,
+        `SELECT last_offset FROM `+
+            lastSyncStateTableName+
+            ` WHERE job_name = $1`, jobName)
     var offset pgtype.Int8
     err := row.Scan(&offset)
     if err != nil {
@@ -122,11 +72,10 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 }
 
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
-    row := p.pool.QueryRow(p.ctx, `
-        SELECT sync_batch_id
-        FROM `+p.QualifyTable(lastSyncStateTableName)+`
-        WHERE job_name = $1
-    `, jobName)
+    row := p.pool.QueryRow(p.ctx,
+        `SELECT sync_batch_id FROM `+
+            lastSyncStateTableName+
+            ` WHERE job_name = $1`, jobName)
 
     var syncBatchID pgtype.Int8
     err := row.Scan(&syncBatchID)
@@ -145,11 +94,10 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 }
 
 func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
-    rows := p.pool.QueryRow(p.ctx, `
-        SELECT normalize_batch_id
-        FROM `+p.schemaName+`.`+lastSyncStateTableName+`
-        WHERE job_name = $1
-    `, jobName)
+    rows := p.pool.QueryRow(p.ctx,
+        `SELECT normalize_batch_id FROM `+
+            lastSyncStateTableName+
+            ` WHERE job_name = $1`, jobName)
 
     var normalizeBatchID pgtype.Int8
     err := rows.Scan(&normalizeBatchID)
@@ -171,10 +119,10 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
     p.logger.Info("updating last offset", slog.Int64("offset", offset))
     _, err := p.pool.Exec(p.ctx, `
-        INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+        INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
         VALUES ($1, $2, $3)
         ON CONFLICT (job_name)
-        DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
+        DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
             updated_at = NOW()
     `, jobName, offset, 0)
     if err != nil {
@@ -188,12 +136,12 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
     p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset))
     _, err := p.pool.Exec(p.ctx, `
-        INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+        INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
         VALUES ($1, $2, $3)
         ON CONFLICT (job_name)
         DO UPDATE SET
-            last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
-            sync_batch_id = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.sync_batch_id, excluded.sync_batch_id),
+            last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+            sync_batch_id = GREATEST(`+lastSyncStateTableName+`.sync_batch_id, excluded.sync_batch_id),
             updated_at = NOW()
     `, jobName, offset, syncBatchID)
     if err != nil {
@@ -206,10 +154,9 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 
 func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
     p.logger.Info("updating normalize batch id for job")
-    _, err := p.pool.Exec(p.ctx, `
-        UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
-        SET normalize_batch_id=$2 WHERE job_name=$1
-    `, jobName, batchID)
+    _, err := p.pool.Exec(p.ctx,
+        `UPDATE `+lastSyncStateTableName+
+            ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
     if err != nil {
         p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
         return err
@@ -218,10 +165,49 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
     return nil
 }
 
-func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
-    _, err := p.pool.Exec(p.ctx, `
-        DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
-        WHERE job_name = $1
-    `, jobName)
+func (p *PostgresMetadataStore) FinishQrepPartition(
+    partition *protos.QRepPartition,
+    jobName string,
+    startTime time.Time,
+) error {
+    pbytes, err := protojson.Marshal(partition)
+    if err != nil {
+        return fmt.Errorf("failed to marshal partition to json: %w", err)
+    }
+    partitionJSON := string(pbytes)
+
+    _, err = p.pool.Exec(p.ctx,
+        `INSERT INTO `+qrepTableName+
+            `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
+            ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
+        jobName, partition.PartitionId, partitionJSON, startTime)
+    return err
+}
+
+func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
+    var exists bool
+    err := p.pool.QueryRow(p.ctx,
+        `SELECT EXISTS(SELECT * FROM `+qrepTableName+
+            ` WHERE job_name = $1 AND partition_id = $2)`,
+        jobName, partitionID).Scan(&exists)
+    if err != nil {
+        return false, fmt.Errorf("failed to execute query: %w", err)
+    }
+    return exists, nil
+}
+
+func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
+    _, err := p.pool.Exec(p.ctx,
+        `DELETE FROM `+lastSyncStateTableName+` WHERE job_name = $1`, jobName)
     if err != nil {
         return err
     }
 
+    _, err = p.pool.Exec(p.ctx,
+        `DELETE FROM `+qrepTableName+` WHERE job_name = $1`, jobName)
+    if err != nil {
+        return err
+    }
+
     return nil
 }
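
The FinishQrepPartition and IsQrepPartitionSynced methods added above give qrep mirrors idempotent bookkeeping in the catalog: a partition is upserted when it finishes, and later runs can check for it before redoing work. A usage sketch under stated assumptions — the metadata_qrep_partitions table is provisioned by a catalog migration outside this diff, and the flow name and partition id are invented:

package main

import (
    "context"
    "fmt"
    "time"

    metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
    "github.com/PeerDB-io/peer-flow/generated/protos"
)

func main() {
    ctx := context.Background()
    store, err := metadataStore.NewPostgresMetadataStore(ctx)
    if err != nil {
        panic(err)
    }

    partition := &protos.QRepPartition{PartitionId: "p-0001"}

    // Upsert the partition as synced; reruns refresh sync_finish_time.
    if err := store.FinishQrepPartition(partition, "my_qrep_flow", time.Now()); err != nil {
        panic(err)
    }

    // A later run consults the catalog instead of re-syncing the partition.
    synced, err := store.IsQrepPartitionSynced("my_qrep_flow", partition.PartitionId)
    if err != nil {
        panic(err)
    }
    fmt.Println("partition already synced:", synced)
}
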
20 changes: 3 additions & 17 deletions flow/connectors/s3/s3.go
@@ -65,8 +65,7 @@ func NewS3Connector(ctx context.Context,
     if err != nil {
         return nil, fmt.Errorf("failed to create S3 client: %w", err)
     }
-    metadataSchemaName := "peerdb_s3_metadata"
-    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+    pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
     if err != nil {
         slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
         return nil, err
@@ -149,16 +148,10 @@ func (c *S3Connector) ConnectionActive() error {
 }
 
 func (c *S3Connector) NeedsSetupMetadataTables() bool {
-    return c.pgMetadata.NeedsSetupMetadata()
+    return false
 }
 
 func (c *S3Connector) SetupMetadataTables() error {
-    err := c.pgMetadata.SetupMetadata()
-    if err != nil {
-        c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
-        return err
-    }
-
     return nil
 }
 
@@ -170,15 +163,8 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
     return c.pgMetadata.FetchLastOffset(jobName)
 }
 
-// update offset for a job
 func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
-    err := c.pgMetadata.UpdateLastOffset(jobName, offset)
-    if err != nil {
-        c.logger.Error("failed to update last offset: ", slog.Any("error", err))
-        return err
-    }
-
-    return nil
+    return c.pgMetadata.UpdateLastOffset(jobName, offset)
 }
 
 func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {