From 272ec5c75ad3b2ed7d4ba9e9f5f1dd8bcb14b4f8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Thu, 1 Feb 2024 00:16:58 +0000
Subject: [PATCH] move metadata schema creation into refinery migration

---
 flow/connectors/clickhouse/cdc.go          |   9 +-
 flow/connectors/clickhouse/clickhouse.go   |   3 +-
 flow/connectors/eventhub/eventhub.go       |  11 +-
 flow/connectors/external_metadata/store.go | 113 ++++----------------
 flow/connectors/s3/s3.go                   |   7 +-
 flow/connectors/snowflake/snowflake.go     |   6 +-
 nexus/catalog/migrations/V19__metadata.sql |  16 +++
 7 files changed, 48 insertions(+), 117 deletions(-)
 create mode 100644 nexus/catalog/migrations/V19__metadata.sql

diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 870a77f21d..422502e7b5 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -158,18 +158,11 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
 	return nil
 }
 
-// external
 func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *ClickhouseConnector) SetupMetadataTables() error {
-	err := c.pgMetadata.SetupMetadata()
-	if err != nil {
-		c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
-		return err
-	}
-
 	return nil
 }
 
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index a66e396ded..43b8d028d3 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -31,8 +31,7 @@ func NewClickhouseConnector(ctx context.Context,
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}
 
-	metadataSchemaName := "peerdb_ch_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
 		return nil, err
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 3707dcb2a9..5386eb11a4 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -39,8 +39,7 @@ func NewEventHubConnector(
 	}
 
 	hubManager := NewEventHubManager(defaultAzureCreds, config)
-	metadataSchemaName := "peerdb_eventhub_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store",
 			slog.Any("error", err))
@@ -73,16 +72,10 @@ func (c *EventHubConnector) ConnectionActive() error {
 }
 
 func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *EventHubConnector) SetupMetadataTables() error {
-	err := c.pgMetadata.SetupMetadata()
-	if err != nil {
-		c.logger.Error(fmt.Sprintf("failed to setup metadata tables: %v", err))
-		return err
-	}
-
 	return nil
 }
 
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index 4e6613e902..d70a851e44 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -11,26 +11,23 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"google.golang.org/protobuf/encoding/protojson"
 
-	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
-	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) const ( - lastSyncStateTableName = "last_sync_state" - qrepTableName = "qrep_metadata" + lastSyncStateTableName = "metadata_last_sync_state" + qrepTableName = "metadata_qrep_partitions" ) type PostgresMetadataStore struct { - ctx context.Context - pool *pgxpool.Pool - schemaName string - logger slog.Logger + ctx context.Context + pool *pgxpool.Pool + logger slog.Logger } -func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) { +func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, error) { pool, err := cc.GetCatalogConnectionPoolFromEnv() if err != nil { return nil, fmt.Errorf("failed to create catalog connection pool: %w", err) @@ -38,17 +35,12 @@ func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*Postgres flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &PostgresMetadataStore{ - ctx: ctx, - pool: pool, - schemaName: schemaName, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + ctx: ctx, + pool: pool, + logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), }, nil } -func (p *PostgresMetadataStore) QualifyTable(table string) string { - return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table) -} - func (p *PostgresMetadataStore) Ping() error { pingErr := p.pool.Ping(p.ctx) if pingErr != nil { @@ -58,68 +50,10 @@ func (p *PostgresMetadataStore) Ping() error { return nil } -func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { - // check if schema exists - row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) - - var exists pgtype.Int8 - err := row.Scan(&exists) - if err != nil { - p.logger.Error("failed to check if schema exists", slog.Any("error", err)) - return false - } - - if exists.Int64 > 0 { - return true - } - - return true -} - -func (p *PostgresMetadataStore) SetupMetadata() error { - // create the schema - _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create schema", slog.Any("error", err)) - return err - } - - // create the last sync state table - _, err = p.pool.Exec(p.ctx, ` - CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+`( - job_name TEXT PRIMARY KEY NOT NULL, - last_offset BIGINT NOT NULL, - updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - sync_batch_id BIGINT NOT NULL, - normalize_batch_id BIGINT - )`) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create last sync state table", slog.Any("error", err)) - return err - } - - _, err = p.pool.Exec(p.ctx, ` - CREATE TABLE IF NOT EXISTS `+p.QualifyTable(qrepTableName)+`( - job_name TEXT NOT NULL, - partition_id TEXT NOT NULL, - sync_partition JSON NOT NULL, - sync_start_time TIMESTAMPTZ NOT NULL, - sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), - PRIMARY KEY(job_name, partition_id) - )`) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create qrep metadata table", slog.Any("error", err)) - return err - } - - p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)) - return nil -} - func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { row := p.pool.QueryRow(p.ctx, `SELECT last_offset 
 	FROM `+
-		p.QualifyTable(lastSyncStateTableName)+
+		lastSyncStateTableName+
 		` WHERE job_name = $1`, jobName)
 	var offset pgtype.Int8
 	err := row.Scan(&offset)
@@ -140,7 +74,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 	row := p.pool.QueryRow(p.ctx,
 		`SELECT sync_batch_id FROM `+
-		p.QualifyTable(lastSyncStateTableName)+
+		lastSyncStateTableName+
 		` WHERE job_name = $1`, jobName)
 
 	var syncBatchID pgtype.Int8
@@ -162,7 +96,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
 	rows := p.pool.QueryRow(p.ctx,
 		`SELECT normalize_batch_id FROM `+
-		p.QualifyTable(lastSyncStateTableName)+
+		lastSyncStateTableName+
 		` WHERE job_name = $1`, jobName)
 
 	var normalizeBatchID pgtype.Int8
@@ -185,10 +119,10 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64,
 func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
 	p.logger.Info("updating last offset", slog.Int64("offset", offset))
 	_, err := p.pool.Exec(p.ctx, `
-		INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+		INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
 		VALUES ($1, $2, $3)
 		ON CONFLICT (job_name)
-		DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
+		DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
 			updated_at = NOW()
 	`, jobName, offset, 0)
 	if err != nil {
@@ -202,12 +136,12 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
 func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 	p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset))
 	_, err := p.pool.Exec(p.ctx, `
-		INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+		INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
 		VALUES ($1, $2, $3)
 		ON CONFLICT (job_name)
 		DO UPDATE SET
-			last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
-			sync_batch_id = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.sync_batch_id, excluded.sync_batch_id),
+			last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+			sync_batch_id = GREATEST(`+lastSyncStateTableName+`.sync_batch_id, excluded.sync_batch_id),
 			updated_at = NOW()
 	`, jobName, offset, syncBatchID)
 	if err != nil {
@@ -221,7 +155,7 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o
 func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
 	p.logger.Info("updating normalize batch id for job")
 	_, err := p.pool.Exec(p.ctx,
-		`UPDATE `+p.QualifyTable(lastSyncStateTableName)+
+		`UPDATE `+lastSyncStateTableName+
 		` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
 	if err != nil {
 		p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
@@ -243,7 +177,7 @@ func (p *PostgresMetadataStore) FinishQrepPartition(
 	partitionJSON := string(pbytes)
 	_, err = p.pool.Exec(p.ctx,
-		`INSERT INTO `+p.QualifyTable(qrepTableName)+
+		`INSERT INTO `+qrepTableName+
 			`(job_name, partition_id,
 		sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
 		ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
 		jobName, partition.PartitionId, partitionJSON, startTime)
@@ -253,8 +187,7 @@
 func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
 	var exists bool
 	err := p.pool.QueryRow(p.ctx,
-		`SELECT EXISTS(SELECT * FROM `+
-			p.QualifyTable(qrepTableName)+
+		`SELECT EXISTS(SELECT * FROM `+qrepTableName+
 		` WHERE job_name = $1 AND partition_id = $2)`,
 		jobName, partitionID).Scan(&exists)
 	if err != nil {
@@ -265,15 +198,13 @@
 
 func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
 	_, err := p.pool.Exec(p.ctx,
-		`DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+
-			` WHERE job_name = $1`, jobName)
+		`DELETE FROM `+lastSyncStateTableName+` WHERE job_name = $1`, jobName)
 	if err != nil {
 		return err
 	}
 
 	_, err = p.pool.Exec(p.ctx,
-		`DELETE FROM `+p.QualifyTable(qrepTableName)+
-			` WHERE job_name = $1`, jobName)
+		`DELETE FROM `+qrepTableName+` WHERE job_name = $1`, jobName)
 	if err != nil {
 		return err
 	}
diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go
index 7d258ad567..95213c6123 100644
--- a/flow/connectors/s3/s3.go
+++ b/flow/connectors/s3/s3.go
@@ -65,8 +65,7 @@ func NewS3Connector(ctx context.Context,
 	if err != nil {
 		return nil, fmt.Errorf("failed to create S3 client: %w", err)
 	}
-	metadataSchemaName := "peerdb_s3_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
 		return nil, err
@@ -149,11 +148,11 @@ func (c *S3Connector) ConnectionActive() error {
 }
 
 func (c *S3Connector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *S3Connector) SetupMetadataTables() error {
-	return c.pgMetadata.SetupMetadata()
+	return nil
 }
 
 func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index b325e96f7e..7394110b86 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -202,7 +202,7 @@ func NewSnowflakeConnector(ctx context.Context,
 		rawSchema = *snowflakeProtoConfig.MetadataSchema
 	}
 
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, "peerdb_sf_metadata")
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("could not connect to metadata store: %w", err)
 	}
@@ -240,11 +240,11 @@ func (c *SnowflakeConnector) ConnectionActive() error {
 }
 
 func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *SnowflakeConnector) SetupMetadataTables() error {
-	return c.pgMetadata.SetupMetadata()
+	return nil
 }
 
 // only used for testing atm. doesn't return info about pkey or ReplicaIdentity [which is PG specific anyway].
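
For reviewers: every connector touched above now wires up the shared metadata store the same way. A minimal sketch of the resulting pattern (ExampleConnector is a hypothetical name for illustration, not part of this patch):

package example

import (
	"context"
	"fmt"

	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
)

// ExampleConnector stands in for any of the connectors changed above.
type ExampleConnector struct {
	pgMetadata *metadataStore.PostgresMetadataStore
}

func NewExampleConnector(ctx context.Context) (*ExampleConnector, error) {
	// No per-connector schema name any more: all connectors share the
	// catalog tables created by V19__metadata.sql.
	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
	if err != nil {
		return nil, fmt.Errorf("could not connect to metadata store: %w", err)
	}
	return &ExampleConnector{pgMetadata: pgMetadata}, nil
}

// Both methods reduce to constants now that the refinery migration owns DDL.
func (c *ExampleConnector) NeedsSetupMetadataTables() bool { return false }

func (c *ExampleConnector) SetupMetadataTables() error { return nil }
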
diff --git a/nexus/catalog/migrations/V19__metadata.sql b/nexus/catalog/migrations/V19__metadata.sql
new file mode 100644
index 0000000000..886dc66eea
--- /dev/null
+++ b/nexus/catalog/migrations/V19__metadata.sql
@@ -0,0 +1,16 @@
+CREATE TABLE IF NOT EXISTS metadata_last_sync_state (
+  job_name TEXT PRIMARY KEY NOT NULL,
+  last_offset BIGINT NOT NULL,
+  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  sync_batch_id BIGINT NOT NULL,
+  normalize_batch_id BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS metadata_qrep_partitions (
+  job_name TEXT NOT NULL,
+  partition_id TEXT NOT NULL,
+  sync_partition JSON NOT NULL,
+  sync_start_time TIMESTAMPTZ NOT NULL,
+  sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  PRIMARY KEY (job_name, partition_id)
+);
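
For reviewers: the migration only creates the tables; write semantics stay in store.go, where the INSERT ... ON CONFLICT clauses use GREATEST so offsets and batch IDs never move backwards. A minimal usage sketch (assumes the catalog connection env vars read by GetCatalogConnectionPoolFromEnv are set; "my_mirror" is a made-up job name):

package main

import (
	"context"
	"log"

	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
)

func main() {
	store, err := metadataStore.NewPostgresMetadataStore(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// Upserts into metadata_last_sync_state; stored last_offset becomes 100.
	if err := store.UpdateLastOffset("my_mirror", 100); err != nil {
		log.Fatal(err)
	}
	// GREATEST(last_offset, excluded.last_offset) keeps the value at 100,
	// so a replayed older offset cannot rewind progress.
	if err := store.UpdateLastOffset("my_mirror", 90); err != nil {
		log.Fatal(err)
	}
}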