From a0117e5690bd397f93912ce3dd0dfa7c615dc292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 29 Jan 2024 21:13:38 +0000 Subject: [PATCH 1/7] Snowflake: stop storing metadata on warehouse; store in catalog --- flow/connectors/bigquery/bigquery.go | 6 - flow/connectors/clickhouse/qrep_avro_sync.go | 4 +- flow/connectors/external_metadata/store.go | 117 ++++++-- flow/connectors/s3/s3.go | 17 +- flow/connectors/snowflake/qrep.go | 99 +------ flow/connectors/snowflake/qrep_avro_sync.go | 12 +- flow/connectors/snowflake/snowflake.go | 293 ++++--------------- 7 files changed, 150 insertions(+), 398 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 963eb60a96..edb3cbdcf3 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -27,12 +27,6 @@ import ( ) const ( - /* - Different batch Ids in code/BigQuery - 1. batchID - identifier in raw table on target to depict which batch a row was inserted. - 3. syncBatchID - batch id that was last synced or will be synced - 4. normalizeBatchID - batch id that was last normalized or will be normalized. - */ // MirrorJobsTable has the following schema: // CREATE TABLE peerdb_mirror_jobs ( // mirror_job_id STRING NOT NULL, diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 68129a98d5..3d2f179bca 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -195,11 +195,11 @@ func (s *ClickhouseAvroSyncMethod) insertMetadata( if err != nil { s.connector.logger.Error("failed to create metadata insert statement", slog.Any("error", err), partitionLog) - return fmt.Errorf("failed to create metadata insert statement: %v", err) + return fmt.Errorf("failed to create metadata insert statement: %w", err) } if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil { - return fmt.Errorf("failed to execute metadata insert statement: %v", err) + return fmt.Errorf("failed to execute metadata insert statement: %w", err) } return nil diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index ab5224b2ee..3be58019b8 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" + "google.golang.org/protobuf/encoding/protojson" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -18,6 +20,7 @@ import ( const ( lastSyncStateTableName = "last_sync_state" + qrepTableName = "qrep_metadata" ) type Querier interface { @@ -118,29 +121,49 @@ func (p *PostgresMetadataStore) SetupMetadata() error { // create the last sync state table _, err = p.conn.Exec(p.ctx, ` - CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` ( + CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+`( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), sync_batch_id BIGINT NOT NULL, normalize_batch_id BIGINT - ) - `) + )`) if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create last sync state table", slog.Any("error", err)) return err } + _, err = p.conn.Exec(p.ctx, ` + CREATE TABLE IF NOT EXISTS 
`+p.QualifyTable(qrepTableName)+`( + job_name TEXT NOT NULL, + partition_id TEXT NOT NULL, + sync_partition JSON NOT NULL, + sync_start_time TIMESTAMPTZ NOT NULL, + sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`) + if err != nil && !utils.IsUniqueError(err) { + p.logger.Error("failed to create qrep metadata table", slog.Any("error", err)) + return err + } + + _, err = p.conn.Exec(p.ctx, + `CREATE INDEX IF NOT EXISTS ix_qrep_metadata_partition_id ON `+ + p.QualifyTable(qrepTableName)+ + ` USING hash (partition_id)`) + if err != nil && !utils.IsUniqueError(err) { + p.logger.Error("failed to create qrep metadata index", slog.Any("error", err)) + return err + } + p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)) return nil } func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { - row := p.conn.QueryRow(p.ctx, ` - SELECT last_offset - FROM `+p.QualifyTable(lastSyncStateTableName)+` - WHERE job_name = $1 - `, jobName) + row := p.conn.QueryRow(p.ctx, + `SELECT last_offset FROM `+ + p.QualifyTable(lastSyncStateTableName)+ + ` WHERE job_name = $1`, jobName) var offset pgtype.Int8 err := row.Scan(&offset) if err != nil { @@ -158,11 +181,10 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { } func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { - row := p.conn.QueryRow(p.ctx, ` - SELECT sync_batch_id - FROM `+p.QualifyTable(lastSyncStateTableName)+` - WHERE job_name = $1 - `, jobName) + row := p.conn.QueryRow(p.ctx, + `SELECT sync_batch_id FROM `+ + p.QualifyTable(lastSyncStateTableName)+ + ` WHERE job_name = $1`, jobName) var syncBatchID pgtype.Int8 err := row.Scan(&syncBatchID) @@ -181,11 +203,10 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { } func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows := p.conn.QueryRow(p.ctx, ` - SELECT normalize_batch_id - FROM `+p.schemaName+`.`+lastSyncStateTableName+` - WHERE job_name = $1 - `, jobName) + rows := p.conn.QueryRow(p.ctx, + `SELECT normalize_batch_id FROM `+ + p.QualifyTable(lastSyncStateTableName)+ + ` WHERE job_name = $1`, jobName) var normalizeBatchID pgtype.Int8 err := rows.Scan(&normalizeBatchID) @@ -242,10 +263,9 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error { p.logger.Info("updating normalize batch id for job") - _, err := p.conn.Exec(p.ctx, ` - UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` - SET normalize_batch_id=$2 WHERE job_name=$1 - `, jobName, batchID) + _, err := p.conn.Exec(p.ctx, + `UPDATE `+p.QualifyTable(lastSyncStateTableName)+ + ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID) if err != nil { p.logger.Error("failed to update normalize batch id", slog.Any("error", err)) return err @@ -254,10 +274,51 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID i return nil } -func (p *PostgresMetadataStore) DropMetadata(jobName string) error { - _, err := p.conn.Exec(p.ctx, ` - DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+` - WHERE job_name = $1 - `, jobName) +func (p *PostgresMetadataStore) FinishQrepPartition( + partition *protos.QRepPartition, + jobName string, + startTime time.Time, +) error { + pbytes, err := protojson.Marshal(partition) + if err != nil { + return fmt.Errorf("failed to marshal partition to json: %w", err) + 
} + partitionJSON := string(pbytes) + + _, err = p.conn.Exec(p.ctx, + `INSERT INTO `+p.QualifyTable(qrepTableName)+ + `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)`, + jobName, partition.PartitionId, partitionJSON, startTime) return err } + +func (p *PostgresMetadataStore) IsQrepPartitionSynced(partitionID string) (bool, error) { + var count int64 + err := p.conn.QueryRow(p.ctx, + `SELECT COUNT(*) FROM `+ + p.QualifyTable(qrepTableName)+ + ` WHERE partition_id = $1`, + partitionID).Scan(&count) + if err != nil { + return false, fmt.Errorf("failed to execute query: %w", err) + } + return count > 0, nil +} + +func (p *PostgresMetadataStore) DropMetadata(jobName string) error { + _, err := p.conn.Exec(p.ctx, + `DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+ + ` WHERE job_name = $1`, jobName) + if err != nil { + return err + } + + _, err = p.conn.Exec(p.ctx, + `DELETE FROM `+p.QualifyTable(qrepTableName)+ + ` WHERE job_name = $1`, jobName) + if err != nil { + return err + } + + return nil +} diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index df031c9b3b..ec5c35e28c 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -155,13 +155,7 @@ func (c *S3Connector) NeedsSetupMetadataTables() bool { } func (c *S3Connector) SetupMetadataTables() error { - err := c.pgMetadata.SetupMetadata() - if err != nil { - c.logger.Error("failed to setup metadata tables", slog.Any("error", err)) - return err - } - - return nil + return c.pgMetadata.SetupMetadata() } func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { @@ -172,15 +166,8 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) { return c.pgMetadata.FetchLastOffset(jobName) } -// update offset for a job func (c *S3Connector) SetLastOffset(jobName string, offset int64) error { - err := c.pgMetadata.UpdateLastOffset(jobName, offset) - if err != nil { - c.logger.Error("failed to update last offset: ", slog.Any("error", err)) - return err - } - - return nil + return c.pgMetadata.UpdateLastOffset(jobName, offset) } func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 4361b17881..e3037d298e 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -5,12 +5,10 @@ import ( "fmt" "log/slog" "strings" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/jackc/pgx/v5/pgtype" - "google.golang.org/protobuf/encoding/protojson" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -18,8 +16,6 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -const qRepMetadataTableName = "_peerdb_query_replication_metadata" - func (c *SnowflakeConnector) SyncQRepRecords( config *protos.QRepConfig, partition *protos.QRepPartition, @@ -37,7 +33,7 @@ func (c *SnowflakeConnector) SyncQRepRecords( } c.logger.Info("Called QRep sync function and obtained table schema", flowLog) - done, err := c.isPartitionSynced(partition.PartitionId) + done, err := c.pgMetadata.IsQrepPartitionSynced(partition.PartitionId) if err != nil { return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) } @@ -51,30 +47,6 @@ func (c *SnowflakeConnector) SyncQRepRecords( return avroSync.SyncQRepRecords(config, partition, tblSchema, stream) } -func (c *SnowflakeConnector) createMetadataInsertStatement( - 
partition *protos.QRepPartition, - jobName string, - startTime time.Time, -) (string, error) { - // marshal the partition to json using protojson - pbytes, err := protojson.Marshal(partition) - if err != nil { - return "", fmt.Errorf("failed to marshal partition to json: %v", err) - } - - // convert the bytes to string - partitionJSON := string(pbytes) - - insertMetadataStmt := fmt.Sprintf( - `INSERT INTO %s.%s - (flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime) - VALUES ('%s', '%s', '%s', '%s'::timestamp, CURRENT_TIMESTAMP);`, - c.metadataSchema, qRepMetadataTableName, jobName, partition.PartitionId, - partitionJSON, startTime.Format(time.RFC3339)) - - return insertMetadataStmt, nil -} - func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) { schematable, err := utils.ParseSchemaTable(tableName) if err != nil { @@ -99,49 +71,13 @@ func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType return columnTypes, nil } -func (c *SnowflakeConnector) isPartitionSynced(partitionID string) (bool, error) { - //nolint:gosec - queryString := fmt.Sprintf(` - SELECT COUNT(*) - FROM %s.%s - WHERE partitionID = '%s' - `, c.metadataSchema, qRepMetadataTableName, partitionID) - - row := c.database.QueryRow(queryString) - - var count pgtype.Int8 - if err := row.Scan(&count); err != nil { - return false, fmt.Errorf("failed to execute query: %w", err) - } - - return count.Int64 > 0, nil -} - func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { - // NOTE that Snowflake does not support transactional DDL - createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err) - } - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := createMetadataTablesTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - c.logger.Error("error while rolling back transaction for creating metadata tables", - slog.Any("error", deferErr)) - } - }() - err = c.createPeerDBInternalSchema(createMetadataTablesTx) - if err != nil { - return err - } - err = c.createQRepMetadataTable(createMetadataTablesTx) + _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema)) if err != nil { return err } stageName := c.getStageNameForJob(config.FlowJobName) - err = c.createStage(stageName, config) if err != nil { return err @@ -154,35 +90,6 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) } } - err = createMetadataTablesTx.Commit() - if err != nil { - return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err) - } - - return nil -} - -func (c *SnowflakeConnector) createQRepMetadataTable(createMetadataTableTx *sql.Tx) error { - // Define the schema - schemaStatement := ` - CREATE TABLE IF NOT EXISTS %s.%s ( - flowJobName STRING, - partitionID STRING, - syncPartition STRING, - syncStartTime TIMESTAMP_LTZ, - syncFinishTime TIMESTAMP_LTZ - ); - ` - queryString := fmt.Sprintf(schemaStatement, c.metadataSchema, qRepMetadataTableName) - - _, err := createMetadataTableTx.Exec(queryString) - if err != nil { - c.logger.Error(fmt.Sprintf("failed to create table %s.%s", c.metadataSchema, qRepMetadataTableName), - slog.Any("error", err)) - return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err) - } - - c.logger.Info(fmt.Sprintf("Created table %s", 
qRepMetadataTableName)) return nil } @@ -371,5 +278,5 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error { } func (c *SnowflakeConnector) getStageNameForJob(job string) string { - return fmt.Sprintf("%s.peerdb_stage_%s", c.metadataSchema, job) + return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job) } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 3c330d636c..e72b0bc434 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -290,17 +290,9 @@ func (s *SnowflakeAvroSyncHandler) insertMetadata( startTime time.Time, ) error { partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) + err := s.connector.pgMetadata.FinishQrepPartition(partition, flowJobName, startTime) if err != nil { - s.connector.logger.Error("failed to create metadata insert statement", - slog.Any("error", err), partitionLog) - return fmt.Errorf("failed to create metadata insert statement: %v", err) - } - - if _, err := s.connector.database.ExecContext(s.connector.ctx, insertMetadataStmt); err != nil { - s.connector.logger.Error("failed to execute metadata insert statement "+insertMetadataStmt, - slog.Any("error", err), partitionLog) - return fmt.Errorf("failed to execute metadata insert statement: %v", err) + return fmt.Errorf("failed to execute metadata insert statement: %w", err) } s.connector.logger.Info("inserted metadata for partition", partitionLog) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index dd479b6640..7f9b3c33fe 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -17,6 +17,7 @@ import ( "go.temporal.io/sdk/activity" "golang.org/x/sync/errgroup" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" @@ -26,11 +27,9 @@ import ( const ( mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS" - createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(MIRROR_JOB_NAME STRING NOT NULL,OFFSET INT NOT NULL, - SYNC_BATCH_ID INT NOT NULL,NORMALIZE_BATCH_ID INT NOT NULL)` - rawTablePrefix = "_PEERDB_RAW" - createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" - createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, + rawTablePrefix = "_PEERDB_RAW" + createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" + createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, _PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL, _PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT, _PEERDB_UNCHANGED_TOAST_COLUMNS STRING)` @@ -55,19 +54,13 @@ const ( WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN %s` getDistinctDestinationTableNames = `SELECT DISTINCT _PEERDB_DESTINATION_TABLE_NAME FROM %s.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d` - getTableNametoUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME, + getTableNameToUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME, ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2 GROUP BY 
_PEERDB_DESTINATION_TABLE_NAME` getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE UPPER(TABLE_SCHEMA)=? AND UPPER(TABLE_NAME)=? ORDER BY ORDINAL_POSITION` - insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)" - - updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=? - WHERE MIRROR_JOB_NAME=?` - updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" - checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? and TABLE_NAME=?` checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" @@ -78,14 +71,14 @@ const ( dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" dropSchemaIfExistsSQL = "DROP SCHEMA IF EXISTS %s" - checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" ) type SnowflakeConnector struct { - ctx context.Context - database *sql.DB - metadataSchema string - logger slog.Logger + ctx context.Context + database *sql.DB + pgMetadata *metadataStore.PostgresMetadataStore + rawSchema string + logger slog.Logger } // creating this to capture array results from snowflake. @@ -206,17 +199,23 @@ func NewSnowflakeConnector(ctx context.Context, return nil, fmt.Errorf("could not validate snowflake peer: %w", err) } - metadataSchema := "_PEERDB_INTERNAL" + rawSchema := "_PEERDB_INTERNAL" if snowflakeProtoConfig.MetadataSchema != nil { - metadataSchema = *snowflakeProtoConfig.MetadataSchema + rawSchema = *snowflakeProtoConfig.MetadataSchema + } + + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, nil, "peerdb_sf_metadata") + if err != nil { + return nil, fmt.Errorf("could not connect to metadata store: %w", err) } flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &SnowflakeConnector{ - ctx: ctx, - database: database, - metadataSchema: metadataSchema, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + ctx: ctx, + database: database, + pgMetadata: pgMetadata, + rawSchema: rawSchema, + logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), }, nil } @@ -243,43 +242,11 @@ func (c *SnowflakeConnector) ConnectionActive() error { } func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool { - result, err := c.checkIfTableExists(c.metadataSchema, mirrorJobsTableIdentifier) - if err != nil { - return true - } - return !result + return c.pgMetadata.NeedsSetupMetadata() } func (c *SnowflakeConnector) SetupMetadataTables() error { - // NOTE that Snowflake does not support transactional DDL - createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err) - } - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := createMetadataTablesTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - c.logger.Error("error while rolling back transaction for creating metadata tables", - slog.Any("error", deferErr)) - } - }() - - err = c.createPeerDBInternalSchema(createMetadataTablesTx) - if err != nil { - return err - } - _, err = createMetadataTablesTx.ExecContext(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL, - c.metadataSchema, mirrorJobsTableIdentifier)) - if err != nil { - return fmt.Errorf("error while setting up mirror jobs table: %w", err) - } - err = 
createMetadataTablesTx.Commit() - if err != nil { - return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err) - } - - return nil + return c.pgMetadata.SetupMetadata() } // only used for testing atm. doesn't return info about pkey or ReplicaIdentity [which is PG specific anyway]. @@ -324,58 +291,19 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T } func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { - var result pgtype.Int8 - err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, - c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) - if err != nil { - if err == sql.ErrNoRows { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - return 0, fmt.Errorf("error while reading result row: %w", err) - } - if result.Int64 == 0 { - c.logger.Warn("Assuming zero offset means no sync has happened") - return 0, nil - } - return result.Int64, nil + return c.pgMetadata.FetchLastOffset(jobName) } -func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error { - _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL, - c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName) - if err != nil { - return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) - } - return nil +func (c *SnowflakeConnector) SetLastOffset(jobName string, offset int64) error { + return c.pgMetadata.UpdateLastOffset(jobName, offset) } func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { - var result pgtype.Int8 - err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema, - mirrorJobsTableIdentifier), jobName).Scan(&result) - if err != nil { - if err == sql.ErrNoRows { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - return 0, fmt.Errorf("error while reading result row: %w", err) - } - return result.Int64, nil + return c.pgMetadata.GetLastBatchID(jobName) } func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - var normBatchID pgtype.Int8 - err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, - mirrorJobsTableIdentifier), jobName).Scan(&normBatchID) - if err != nil { - if err == sql.ErrNoRows { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - return 0, fmt.Errorf("error while reading result row: %w", err) - } - return normBatchID.Int64, nil + return c.pgMetadata.GetLastNormalizeBatchID(jobName) } func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -383,7 +311,7 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy ) ([]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getDistinctDestinationTableNames, c.metadataSchema, + rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getDistinctDestinationTableNames, c.rawSchema, rawTableIdentifier, normalizeBatchID, syncBatchID)) if err != nil { return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err) @@ -407,12 +335,12 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy return destinationTableNames, nil } -func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64, +func (c *SnowflakeConnector) getTableNameToUnchangedCols(flowJobName string, syncBatchID int64, 
normalizeBatchID int64, ) (map[string][]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getTableNametoUnchangedColsSQL, c.metadataSchema, + rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getTableNameToUnchangedColsSQL, c.rawSchema, rawTableIdentifier, normalizeBatchID, syncBatchID)) if err != nil { return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err) @@ -533,27 +461,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. return nil, err } - // transaction for SyncRecords - syncRecordsTx, err := c.database.BeginTx(c.ctx, nil) - if err != nil { - return nil, err - } - // in case we return after error, ensure transaction is rolled back - defer func() { - deferErr := syncRecordsTx.Rollback() - if deferErr != sql.ErrTxDone && deferErr != nil { - c.logger.Error("error while rolling back transaction for SyncRecords: %v", - slog.Any("error", deferErr), slog.Int64("syncBatchID", req.SyncBatchID)) - } - }() - - // updating metadata with new offset and syncBatchID - err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckpointID, req.SyncBatchID, syncRecordsTx) - if err != nil { - return nil, err - } - // transaction commits - err = syncRecordsTx.Commit() + err = c.pgMetadata.FinishBatch(req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID) if err != nil { return nil, err } @@ -576,7 +484,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( qrepConfig := &protos.QRepConfig{ StagingPath: "", FlowJobName: req.FlowJobName, - DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.metadataSchema, + DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.rawSchema, rawTableIdentifier)), } avroSyncer := NewSnowflakeAvroSyncHandler(qrepConfig, c) @@ -625,12 +533,12 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest }, nil } - jobMetadataExists, err := c.jobMetadataExists(req.FlowJobName) + rawSchemaExists, err := c.rawSchemaExists(req.FlowJobName) if err != nil { return nil, err } // sync hasn't created job metadata yet, chill. 
- if !jobMetadataExists { + if !rawSchemaExists { return &model.NormalizeResponse{ Done: false, }, nil @@ -644,7 +552,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest return nil, err } - tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, req.SyncBatchID, normBatchID) + tableNameToUnchangedToastCols, err := c.getTableNameToUnchangedCols(req.FlowJobName, req.SyncBatchID, normBatchID) if err != nil { return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err) } @@ -663,7 +571,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest syncBatchID: req.SyncBatchID, normalizeBatchID: normBatchID, normalizedTableSchema: req.TableNameSchemaMapping[tableName], - unchangedToastColumns: tableNametoUnchangedToastCols[tableName], + unchangedToastColumns: tableNameToUnchangedToastCols[tableName], peerdbCols: &protos.PeerDBColumns{ SoftDelete: req.SoftDelete, SoftDeleteColName: req.SoftDeleteColName, @@ -706,8 +614,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest return nil, fmt.Errorf("error while normalizing records: %w", err) } - // updating metadata with new normalizeBatchID - err = c.updateNormalizeMetadata(req.FlowJobName, req.SyncBatchID) + err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, req.SyncBatchID) if err != nil { return nil, err } @@ -720,20 +627,20 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest } func (c *SnowflakeConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { - rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) + _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema)) + if err != nil { + return nil, err + } createRawTableTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { return nil, fmt.Errorf("unable to begin transaction for creation of raw table: %w", err) } - err = c.createPeerDBInternalSchema(createRawTableTx) - if err != nil { - return nil, err - } // there is no easy way to check if a table has the same schema in Snowflake, // so just executing the CREATE TABLE IF NOT EXISTS blindly. 
+ rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) _, err = createRawTableTx.ExecContext(c.ctx, - fmt.Sprintf(createRawTableSQL, c.metadataSchema, rawTableIdentifier)) + fmt.Sprintf(createRawTableSQL, c.rawSchema, rawTableIdentifier)) if err != nil { return nil, fmt.Errorf("unable to create raw table: %w", err) } @@ -754,6 +661,11 @@ func (c *SnowflakeConnector) CreateRawTable(req *protos.CreateRawTableInput) (*p } func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return fmt.Errorf("unable to clear metadata for sync flow cleanup: %w", err) + } + syncFlowCleanupTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { return fmt.Errorf("unable to begin transaction for sync flow cleanup: %w", err) @@ -765,31 +677,6 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error { } }() - row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema) - var schemaExists pgtype.Bool - err = row.Scan(&schemaExists) - if err != nil { - return fmt.Errorf("unable to check if internal schema exists: %w", err) - } - - if schemaExists.Bool { - _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema, - getRawTableIdentifier(jobName))) - if err != nil { - return fmt.Errorf("unable to drop raw table: %w", err) - } - _, err = syncFlowCleanupTx.ExecContext(c.ctx, - fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) - if err != nil { - return fmt.Errorf("unable to delete job metadata: %w", err) - } - } - - err = syncFlowCleanupTx.Commit() - if err != nil { - return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err) - } - err = c.dropStage("", jobName) if err != nil { return err @@ -861,92 +748,16 @@ func getRawTableIdentifier(jobName string) string { return fmt.Sprintf("%s_%s", rawTablePrefix, jobName) } -func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) { +func (c *SnowflakeConnector) rawSchemaExists(jobName string) (bool, error) { var result pgtype.Bool err := c.database.QueryRowContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) - if err != nil { - return false, fmt.Errorf("error reading result row: %w", err) - } - return result.Bool, nil -} - -func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { - var result pgtype.Bool - err := tx.QueryRowContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.rawSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } return result.Bool, nil } -func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64, - syncBatchID int64, syncRecordsTx *sql.Tx, -) error { - jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) - if err != nil { - return fmt.Errorf("failed to get sync status for flow job: %w", err) - } - - if !jobMetadataExists { - _, err := syncRecordsTx.ExecContext(c.ctx, - fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), - flowJobName, lastCP, syncBatchID, 0) - if err != nil { - return fmt.Errorf("failed to insert flow job status: %w", err) - } - } else { - _, err := syncRecordsTx.ExecContext(c.ctx, - 
fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier), - lastCP, syncBatchID, flowJobName) - if err != nil { - return fmt.Errorf("failed to update flow job status: %w", err) - } - } - - return nil -} - -func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) - if err != nil { - return fmt.Errorf("failed to get sync status for flow job: %w", err) - } - if !jobMetadataExists { - return fmt.Errorf("job metadata does not exist, unable to update") - } - - stmt := fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier) - _, err = c.database.ExecContext(c.ctx, stmt, normalizeBatchID, flowJobName) - if err != nil { - return fmt.Errorf("failed to update metadata for NormalizeTables: %w", err) - } - - return nil -} - -func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error { - // check if the internal schema exists - row := createSchemaTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema) - var schemaExists pgtype.Bool - err := row.Scan(&schemaExists) - if err != nil { - return fmt.Errorf("error while reading result row: %w", err) - } - - if schemaExists.Bool { - c.logger.Info(fmt.Sprintf("internal schema %s already exists", c.metadataSchema)) - return nil - } - - _, err = createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) - if err != nil { - return fmt.Errorf("error while creating internal schema for PeerDB: %w", err) - } - return nil -} - func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { renameTablesTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { From 6121541743f9d73fd9533f3f0f6932299c3fc829 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jan 2024 16:49:46 +0000 Subject: [PATCH 2/7] checkJSONValue: error on empty results --- flow/e2e/bigquery/peer_flow_bq_test.go | 4 ++++ flow/e2e/snowflake/qrep_flow_sf_test.go | 8 +++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 17cd3ecbea..e180e215eb 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -81,6 +81,10 @@ func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, va return fmt.Errorf("json value check failed: %v", err) } + if len(res.Records) == 0 { + return fmt.Errorf("bad json: empty result set from %s", tableName) + } + jsonVal := res.Records[0][0].Value if jsonVal != value { return fmt.Errorf("bad json value in field %s of column %s: %v. 
expected: %v", fieldName, colName, jsonVal, value) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index ad47232b1e..9541d61c86 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -19,13 +19,15 @@ func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) } func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, value string) error { - res, err := s.sfHelper.ExecuteAndProcessQuery(fmt.Sprintf( - "SELECT %s:%s FROM %s;", - colName, fieldName, tableName)) + res, err := s.sfHelper.ExecuteAndProcessQuery(fmt.Sprintf("SELECT %s:%s FROM %s", colName, fieldName, tableName)) if err != nil { return fmt.Errorf("json value check failed: %v", err) } + if len(res.Records) == 0 { + return fmt.Errorf("bad json: empty result set from %s", tableName) + } + jsonVal := res.Records[0][0].Value if jsonVal != value { return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value) From 329d77c6ce343a79e1861be5c53c7206d71f07eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jan 2024 17:10:05 +0000 Subject: [PATCH 3/7] remove raw schema exists check with mirror jobs table --- flow/connectors/snowflake/snowflake.go | 28 +++----------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7f9b3c33fe..6c50aecfe8 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -26,10 +26,9 @@ import ( ) const ( - mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS" - rawTablePrefix = "_PEERDB_RAW" - createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" - createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, + rawTablePrefix = "_PEERDB_RAW" + createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" + createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, _PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL, _PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT, _PEERDB_UNCHANGED_TOAST_COLUMNS STRING)` @@ -63,7 +62,6 @@ const ( checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? and TABLE_NAME=?` - checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" @@ -533,16 +531,6 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest }, nil } - rawSchemaExists, err := c.rawSchemaExists(req.FlowJobName) - if err != nil { - return nil, err - } - // sync hasn't created job metadata yet, chill. 
- if !rawSchemaExists { - return &model.NormalizeResponse{ - Done: false, - }, nil - } destinationTableNames, err := c.getDistinctTableNamesInBatch( req.FlowJobName, req.SyncBatchID, @@ -748,16 +736,6 @@ func getRawTableIdentifier(jobName string) string { return fmt.Sprintf("%s_%s", rawTablePrefix, jobName) } -func (c *SnowflakeConnector) rawSchemaExists(jobName string) (bool, error) { - var result pgtype.Bool - err := c.database.QueryRowContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.rawSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) - if err != nil { - return false, fmt.Errorf("error reading result row: %w", err) - } - return result.Bool, nil -} - func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { renameTablesTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { From 079df00cc362d7391b0e20b78bbe8cfcb0621b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jan 2024 17:48:59 +0000 Subject: [PATCH 4/7] qrep metadata: set primary key on (job_name, partition_id), ON CONFLICT DO UPDATE --- flow/connectors/external_metadata/store.go | 27 ++++++++-------------- flow/connectors/snowflake/qrep.go | 2 +- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 3be58019b8..93649ffcda 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -139,22 +139,14 @@ func (p *PostgresMetadataStore) SetupMetadata() error { partition_id TEXT NOT NULL, sync_partition JSON NOT NULL, sync_start_time TIMESTAMPTZ NOT NULL, - sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW() + sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY(job_name, partition_id) )`) if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create qrep metadata table", slog.Any("error", err)) return err } - _, err = p.conn.Exec(p.ctx, - `CREATE INDEX IF NOT EXISTS ix_qrep_metadata_partition_id ON `+ - p.QualifyTable(qrepTableName)+ - ` USING hash (partition_id)`) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create qrep metadata index", slog.Any("error", err)) - return err - } - p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)) return nil } @@ -287,22 +279,23 @@ func (p *PostgresMetadataStore) FinishQrepPartition( _, err = p.conn.Exec(p.ctx, `INSERT INTO `+p.QualifyTable(qrepTableName)+ - `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)`, + `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4) + ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`, jobName, partition.PartitionId, partitionJSON, startTime) return err } -func (p *PostgresMetadataStore) IsQrepPartitionSynced(partitionID string) (bool, error) { - var count int64 +func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) { + var exists bool err := p.conn.QueryRow(p.ctx, - `SELECT COUNT(*) FROM `+ + `SELECT EXISTS(SELECT * FROM `+ p.QualifyTable(qrepTableName)+ - ` WHERE partition_id = $1`, - partitionID).Scan(&count) + ` WHERE job_name = $1 AND partition_id = $2)`, + jobName, partitionID).Scan(&exists) if err != nil { return false, fmt.Errorf("failed to execute query: %w", err) } - return count > 0, nil + return exists, nil } func (p 
*PostgresMetadataStore) DropMetadata(jobName string) error { diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index e3037d298e..a5bd38fe02 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -33,7 +33,7 @@ func (c *SnowflakeConnector) SyncQRepRecords( } c.logger.Info("Called QRep sync function and obtained table schema", flowLog) - done, err := c.pgMetadata.IsQrepPartitionSynced(partition.PartitionId) + done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId) if err != nil { return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) } From 272ec5c75ad3b2ed7d4ba9e9f5f1dd8bcb14b4f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 1 Feb 2024 00:16:58 +0000 Subject: [PATCH 5/7] move metadata schema creation into refinery migration --- flow/connectors/clickhouse/cdc.go | 9 +- flow/connectors/clickhouse/clickhouse.go | 3 +- flow/connectors/eventhub/eventhub.go | 11 +- flow/connectors/external_metadata/store.go | 113 ++++----------------- flow/connectors/s3/s3.go | 7 +- flow/connectors/snowflake/snowflake.go | 6 +- nexus/catalog/migrations/V19__metadata.sql | 16 +++ 7 files changed, 48 insertions(+), 117 deletions(-) create mode 100644 nexus/catalog/migrations/V19__metadata.sql diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 870a77f21d..422502e7b5 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -158,18 +158,11 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string, return nil } -// external func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool { - return c.pgMetadata.NeedsSetupMetadata() + return false } func (c *ClickhouseConnector) SetupMetadataTables() error { - err := c.pgMetadata.SetupMetadata() - if err != nil { - c.logger.Error("failed to setup metadata tables", slog.Any("error", err)) - return err - } - return nil } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index a66e396ded..43b8d028d3 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -31,8 +31,7 @@ func NewClickhouseConnector(ctx context.Context, return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } - metadataSchemaName := "peerdb_ch_metadata" - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName) + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 3707dcb2a9..5386eb11a4 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -39,8 +39,7 @@ func NewEventHubConnector( } hubManager := NewEventHubManager(defaultAzureCreds, config) - metadataSchemaName := "peerdb_eventhub_metadata" - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName) + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) @@ -73,16 +72,10 @@ func (c *EventHubConnector) ConnectionActive() error { } func (c *EventHubConnector) NeedsSetupMetadataTables() bool { - return c.pgMetadata.NeedsSetupMetadata() + return false } func (c 
*EventHubConnector) SetupMetadataTables() error { - err := c.pgMetadata.SetupMetadata() - if err != nil { - c.logger.Error(fmt.Sprintf("failed to setup metadata tables: %v", err)) - return err - } - return nil } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 4e6613e902..d70a851e44 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -11,26 +11,23 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "google.golang.org/protobuf/encoding/protojson" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" - "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) const ( - lastSyncStateTableName = "last_sync_state" - qrepTableName = "qrep_metadata" + lastSyncStateTableName = "metadata_last_sync_state" + qrepTableName = "metadata_qrep_partitions" ) type PostgresMetadataStore struct { - ctx context.Context - pool *pgxpool.Pool - schemaName string - logger slog.Logger + ctx context.Context + pool *pgxpool.Pool + logger slog.Logger } -func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) { +func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, error) { pool, err := cc.GetCatalogConnectionPoolFromEnv() if err != nil { return nil, fmt.Errorf("failed to create catalog connection pool: %w", err) @@ -38,17 +35,12 @@ func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*Postgres flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &PostgresMetadataStore{ - ctx: ctx, - pool: pool, - schemaName: schemaName, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + ctx: ctx, + pool: pool, + logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), }, nil } -func (p *PostgresMetadataStore) QualifyTable(table string) string { - return connpostgres.QuoteIdentifier(p.schemaName) + "." 
+ connpostgres.QuoteIdentifier(table) -} - func (p *PostgresMetadataStore) Ping() error { pingErr := p.pool.Ping(p.ctx) if pingErr != nil { @@ -58,68 +50,10 @@ func (p *PostgresMetadataStore) Ping() error { return nil } -func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { - // check if schema exists - row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) - - var exists pgtype.Int8 - err := row.Scan(&exists) - if err != nil { - p.logger.Error("failed to check if schema exists", slog.Any("error", err)) - return false - } - - if exists.Int64 > 0 { - return true - } - - return true -} - -func (p *PostgresMetadataStore) SetupMetadata() error { - // create the schema - _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create schema", slog.Any("error", err)) - return err - } - - // create the last sync state table - _, err = p.pool.Exec(p.ctx, ` - CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+`( - job_name TEXT PRIMARY KEY NOT NULL, - last_offset BIGINT NOT NULL, - updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - sync_batch_id BIGINT NOT NULL, - normalize_batch_id BIGINT - )`) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create last sync state table", slog.Any("error", err)) - return err - } - - _, err = p.pool.Exec(p.ctx, ` - CREATE TABLE IF NOT EXISTS `+p.QualifyTable(qrepTableName)+`( - job_name TEXT NOT NULL, - partition_id TEXT NOT NULL, - sync_partition JSON NOT NULL, - sync_start_time TIMESTAMPTZ NOT NULL, - sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), - PRIMARY KEY(job_name, partition_id) - )`) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create qrep metadata table", slog.Any("error", err)) - return err - } - - p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)) - return nil -} - func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { row := p.pool.QueryRow(p.ctx, `SELECT last_offset FROM `+ - p.QualifyTable(lastSyncStateTableName)+ + lastSyncStateTableName+ ` WHERE job_name = $1`, jobName) var offset pgtype.Int8 err := row.Scan(&offset) @@ -140,7 +74,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { row := p.pool.QueryRow(p.ctx, `SELECT sync_batch_id FROM `+ - p.QualifyTable(lastSyncStateTableName)+ + lastSyncStateTableName+ ` WHERE job_name = $1`, jobName) var syncBatchID pgtype.Int8 @@ -162,7 +96,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) { rows := p.pool.QueryRow(p.ctx, `SELECT normalize_batch_id FROM `+ - p.QualifyTable(lastSyncStateTableName)+ + lastSyncStateTableName+ ` WHERE job_name = $1`, jobName) var normalizeBatchID pgtype.Int8 @@ -185,10 +119,10 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error { p.logger.Info("updating last offset", slog.Int64("offset", offset)) _, err := p.pool.Exec(p.ctx, ` - INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id) + INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON 
CONFLICT (job_name) - DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset), + DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset), updated_at = NOW() `, jobName, offset, 0) if err != nil { @@ -202,12 +136,12 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error { p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset)) _, err := p.pool.Exec(p.ctx, ` - INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id) + INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) DO UPDATE SET - last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset), - sync_batch_id = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.sync_batch_id, excluded.sync_batch_id), + last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset), + sync_batch_id = GREATEST(`+lastSyncStateTableName+`.sync_batch_id, excluded.sync_batch_id), updated_at = NOW() `, jobName, offset, syncBatchID) if err != nil { @@ -221,7 +155,7 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error { p.logger.Info("updating normalize batch id for job") _, err := p.pool.Exec(p.ctx, - `UPDATE `+p.QualifyTable(lastSyncStateTableName)+ + `UPDATE `+lastSyncStateTableName+ ` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID) if err != nil { p.logger.Error("failed to update normalize batch id", slog.Any("error", err)) @@ -243,7 +177,7 @@ func (p *PostgresMetadataStore) FinishQrepPartition( partitionJSON := string(pbytes) _, err = p.pool.Exec(p.ctx, - `INSERT INTO `+p.QualifyTable(qrepTableName)+ + `INSERT INTO `+qrepTableName+ `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4) ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`, jobName, partition.PartitionId, partitionJSON, startTime) @@ -253,8 +187,7 @@ func (p *PostgresMetadataStore) FinishQrepPartition( func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) { var exists bool err := p.pool.QueryRow(p.ctx, - `SELECT EXISTS(SELECT * FROM `+ - p.QualifyTable(qrepTableName)+ + `SELECT EXISTS(SELECT * FROM `+qrepTableName+ ` WHERE job_name = $1 AND partition_id = $2)`, jobName, partitionID).Scan(&exists) if err != nil { @@ -265,15 +198,13 @@ func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionI func (p *PostgresMetadataStore) DropMetadata(jobName string) error { _, err := p.pool.Exec(p.ctx, - `DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+ - ` WHERE job_name = $1`, jobName) + `DELETE FROM `+lastSyncStateTableName+` WHERE job_name = $1`, jobName) if err != nil { return err } _, err = p.pool.Exec(p.ctx, - `DELETE FROM `+p.QualifyTable(qrepTableName)+ - ` WHERE job_name = $1`, jobName) + `DELETE FROM `+qrepTableName+` WHERE job_name = $1`, jobName) if err != nil { return err } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 7d258ad567..95213c6123 100644 --- a/flow/connectors/s3/s3.go +++ 
b/flow/connectors/s3/s3.go @@ -65,8 +65,7 @@ func NewS3Connector(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to create S3 client: %w", err) } - metadataSchemaName := "peerdb_s3_metadata" - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName) + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) return nil, err @@ -149,11 +148,11 @@ func (c *S3Connector) ConnectionActive() error { } func (c *S3Connector) NeedsSetupMetadataTables() bool { - return c.pgMetadata.NeedsSetupMetadata() + return false } func (c *S3Connector) SetupMetadataTables() error { - return c.pgMetadata.SetupMetadata() + return nil } func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index b325e96f7e..7394110b86 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -202,7 +202,7 @@ func NewSnowflakeConnector(ctx context.Context, rawSchema = *snowflakeProtoConfig.MetadataSchema } - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, "peerdb_sf_metadata") + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { return nil, fmt.Errorf("could not connect to metadata store: %w", err) } @@ -240,11 +240,11 @@ func (c *SnowflakeConnector) ConnectionActive() error { } func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool { - return c.pgMetadata.NeedsSetupMetadata() + return false } func (c *SnowflakeConnector) SetupMetadataTables() error { - return c.pgMetadata.SetupMetadata() + return nil } // only used for testing atm. doesn't return info about pkey or ReplicaIdentity [which is PG specific anyway]. 
diff --git a/nexus/catalog/migrations/V19__metadata.sql b/nexus/catalog/migrations/V19__metadata.sql new file mode 100644 index 0000000000..886dc66eea --- /dev/null +++ b/nexus/catalog/migrations/V19__metadata.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS metadata_last_sync_state ( + job_name TEXT PRIMARY KEY NOT NULL, + last_offset BIGINT NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + sync_batch_id BIGINT NOT NULL, + normalize_batch_id BIGINT +); + +CREATE TABLE IF NOT EXISTS metadata_qrep_partitions ( + job_name TEXT NOT NULL, + partition_id TEXT NOT NULL, + sync_partition JSON NOT NULL, + sync_start_time TIMESTAMPTZ NOT NULL, + sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (job_name, partition_id) +); From 8bf400623ef3078c954fcf345c1bb9a08b6a46de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 1 Feb 2024 02:38:46 +0000 Subject: [PATCH 6/7] cleanup --- flow/connectors/snowflake/qrep_avro_sync.go | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index e72b0bc434..8ca90b01c8 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -131,7 +131,7 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords( } s.connector.logger.Info("Put file to stage in Avro sync for snowflake", partitionLog) - err = s.insertMetadata(partition, config.FlowJobName, startTime) + err = s.connector.pgMetadata.FinishQrepPartition(partition, config.FlowJobName, startTime) if err != nil { return -1, err } @@ -283,18 +283,3 @@ func (s *SnowflakeAvroSyncHandler) putFileToStage(avroFile *avro.AvroFile, stage s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage)) return nil } - -func (s *SnowflakeAvroSyncHandler) insertMetadata( - partition *protos.QRepPartition, - flowJobName string, - startTime time.Time, -) error { - partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) - err := s.connector.pgMetadata.FinishQrepPartition(partition, flowJobName, startTime) - if err != nil { - return fmt.Errorf("failed to execute metadata insert statement: %w", err) - } - - s.connector.logger.Info("inserted metadata for partition", partitionLog) - return nil -} From cbba83a09fbf9db6ded470fe0a05a89051f50e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 1 Feb 2024 02:38:28 +0000 Subject: [PATCH 7/7] bq: use external metadata store --- flow/connectors/bigquery/bigquery.go | 226 ++------------------- flow/connectors/bigquery/qrep.go | 100 +-------- flow/connectors/bigquery/qrep_avro_sync.go | 46 ++--- flow/connectors/external_metadata/store.go | 9 + 4 files changed, 50 insertions(+), 331 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index edb3cbdcf3..876e437a11 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -18,6 +18,7 @@ import ( "google.golang.org/api/iterator" "google.golang.org/api/option" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -27,14 +28,6 @@ import ( ) const ( - // MirrorJobsTable has the following schema: - // CREATE TABLE peerdb_mirror_jobs ( - // mirror_job_id STRING NOT NULL, - // offset INTEGER NOT NULL, - 
// sync_batch_id INTEGER NOT NULL - // normalize_batch_id INTEGER - // ) - MirrorJobsTable = "peerdb_mirror_jobs" SyncRecordsBatchSize = 1024 ) @@ -57,6 +50,7 @@ type BigQueryConnector struct { bqConfig *protos.BigqueryConfig client *bigquery.Client storageClient *storage.Client + pgMetadata *metadataStore.PostgresMetadataStore datasetID string projectID string catalogPool *pgxpool.Pool @@ -236,6 +230,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* client: client, datasetID: datasetID, projectID: projectID, + pgMetadata: metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool), storageClient: storageClient, catalogPool: catalogPool, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), @@ -263,10 +258,8 @@ func (c *BigQueryConnector) ConnectionActive() error { return nil } -// NeedsSetupMetadataTables returns true if the metadata tables need to be set up. func (c *BigQueryConnector) NeedsSetupMetadataTables() bool { - _, err := c.client.DatasetInProject(c.projectID, c.datasetID).Table(MirrorJobsTable).Metadata(c.ctx) - return err != nil + return false } func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error { @@ -324,135 +317,24 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, return nil } -// SetupMetadataTables sets up the metadata tables. func (c *BigQueryConnector) SetupMetadataTables() error { - // check if the dataset exists - dataset := c.client.DatasetInProject(c.projectID, c.datasetID) - if _, err := dataset.Metadata(c.ctx); err != nil { - // create the dataset as it doesn't exist - if err := dataset.Create(c.ctx, nil); err != nil { - return fmt.Errorf("failed to create dataset %s: %w", c.datasetID, err) - } - } - - // Create the mirror jobs table, NeedsSetupMetadataTables ensures it doesn't exist. 
- mirrorJobsTable := dataset.Table(MirrorJobsTable) - mirrorJobsTableMetadata := &bigquery.TableMetadata{ - Schema: bigquery.Schema{ - {Name: "mirror_job_name", Type: bigquery.StringFieldType}, - {Name: "offset", Type: bigquery.IntegerFieldType}, - {Name: "sync_batch_id", Type: bigquery.IntegerFieldType}, - {Name: "normalize_batch_id", Type: bigquery.IntegerFieldType}, - }, - } - if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil { - // if the table already exists, ignore the error - if !strings.Contains(err.Error(), "Already Exists") { - return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) - } else { - c.logger.Info(fmt.Sprintf("table %s already exists", MirrorJobsTable)) - } - } - return nil } func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT offset FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName) - q := c.client.Query(query) - q.DefaultProjectID = c.projectID - q.DefaultDatasetID = c.datasetID - it, err := q.Read(c.ctx) - if err != nil { - err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return 0, err - } - - var row []bigquery.Value - err = it.Next(&row) - if err != nil { - c.logger.Info("no row found, returning nil") - return 0, nil - } - - if row[0] == nil { - c.logger.Info("no offset found, returning nil") - return 0, nil - } else { - return row[0].(int64), nil - } + return c.pgMetadata.FetchLastOffset(jobName) } -func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error { - query := fmt.Sprintf( - "UPDATE %s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'", - MirrorJobsTable, - lastOffset, - jobName, - ) - q := c.client.Query(query) - q.DefaultProjectID = c.projectID - q.DefaultDatasetID = c.datasetID - _, err := q.Read(c.ctx) - if err != nil { - return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - } - - return nil +func (c *BigQueryConnector) SetLastOffset(jobName string, offset int64) error { + return c.pgMetadata.UpdateLastOffset(jobName, offset) } func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT sync_batch_id FROM %s WHERE mirror_job_name = '%s'", - MirrorJobsTable, jobName) - q := c.client.Query(query) - q.DisableQueryCache = true - q.DefaultProjectID = c.projectID - q.DefaultDatasetID = c.datasetID - it, err := q.Read(c.ctx) - if err != nil { - err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return -1, err - } - - var row []bigquery.Value - err = it.Next(&row) - if err != nil { - c.logger.Info("no row found") - return 0, nil - } - - if row[0] == nil { - c.logger.Info("no sync_batch_id found, returning 0") - return 0, nil - } else { - return row[0].(int64), nil - } + return c.pgMetadata.GetLastBatchID(jobName) } func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT normalize_batch_id FROM %s WHERE mirror_job_name = '%s'", - MirrorJobsTable, jobName) - q := c.client.Query(query) - q.DisableQueryCache = true - q.DefaultProjectID = c.projectID - q.DefaultDatasetID = c.datasetID - it, err := q.Read(c.ctx) - if err != nil { - err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return 0, err - } - - var row []bigquery.Value - err = it.Next(&row) - if err != nil { - c.logger.Info("no row found for job") - return 0, nil - } - - if row[0] != nil { - return row[0].(int64), nil - } - return 0, nil + return 
c.pgMetadata.GetLastNormalizeBatchID(jobName) } func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -591,20 +473,15 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } - hasJob, err := c.metadataHasJob(req.FlowJobName) - if err != nil { - return nil, fmt.Errorf("failed to check if job exists: %w", err) - } - // if job is not yet found in the peerdb_mirror_jobs_table - // OR sync is lagging end normalize - if !hasJob || normBatchID >= req.SyncBatchID { - c.logger.Info("waiting for sync to catch up, so finishing") + // normalize has caught up with sync, chill until more records are loaded. + if normBatchID >= req.SyncBatchID { return &model.NormalizeResponse{ Done: false, StartBatchID: normBatchID, EndBatchID: req.SyncBatchID, }, nil } + distinctTableNames, err := c.getDistinctTableNamesInBatch( req.FlowJobName, req.SyncBatchID, @@ -662,17 +539,10 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } } } - // update metadata to make the last normalized batch id to the recent last sync batch id. - updateMetadataStmt := fmt.Sprintf( - "UPDATE %s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", - MirrorJobsTable, req.SyncBatchID, req.FlowJobName) - - query := c.client.Query(updateMetadataStmt) - query.DefaultProjectID = c.projectID - query.DefaultDatasetID = c.datasetID - _, err = query.Read(c.ctx) + + err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, req.SyncBatchID) if err != nil { - return nil, fmt.Errorf("failed to execute update metadata statements %s: %v", updateMetadataStmt, err) + return nil, err } return &model.NormalizeResponse{ @@ -754,56 +624,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr }, nil } -// getUpdateMetadataStmt updates the metadata tables for a given job. -func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64, - batchID int64, -) (string, error) { - hasJob, err := c.metadataHasJob(jobName) - if err != nil { - return "", fmt.Errorf("failed to check if job exists: %w", err) - } - - // create the job in the metadata table - jobStatement := fmt.Sprintf( - "INSERT INTO %s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);", - MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID) - if hasJob { - jobStatement = fmt.Sprintf( - "UPDATE %s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';", - MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName) - } - - return jobStatement, nil -} - -// metadataHasJob checks if the metadata table has the given job. -func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) { - checkStmt := fmt.Sprintf( - "SELECT COUNT(*) FROM %s WHERE mirror_job_name = '%s'", - MirrorJobsTable, jobName) - - q := c.client.Query(checkStmt) - q.DefaultProjectID = c.projectID - q.DefaultDatasetID = c.datasetID - it, err := q.Read(c.ctx) - if err != nil { - return false, fmt.Errorf("failed to check if job exists: %w", err) - } - - var row []bigquery.Value - err = it.Next(&row) - if err != nil { - return false, fmt.Errorf("failed read row: %w", err) - } - - count, ok := row[0].(int64) - if !ok { - return false, fmt.Errorf("failed to convert count to int64") - } - - return count > 0, nil -} - // SetupNormalizedTables sets up normalized tables, implementing the Connector interface. 
// This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided. func (c *BigQueryConnector) SetupNormalizedTables( @@ -911,22 +731,18 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { + err := c.pgMetadata.DropMetadata(jobName) + if err != nil { + return fmt.Errorf("unable to clear metadata for sync flow cleanup: %w", err) + } + dataset := c.client.DatasetInProject(c.projectID, c.datasetID) // deleting PeerDB specific tables - err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } - // deleting job from metadata table - query := fmt.Sprintf("DELETE FROM %s WHERE mirror_job_name = '%s'", MirrorJobsTable, jobName) - queryHandler := c.client.Query(query) - queryHandler.DefaultProjectID = c.projectID - queryHandler.DefaultDatasetID = c.datasetID - _, err = queryHandler.Read(c.ctx) - if err != nil { - return fmt.Errorf("failed to delete job from metadata table: %w", err) - } return nil } diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index c0aafe045f..8978f82a11 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -3,13 +3,9 @@ package connbigquery import ( "fmt" "log/slog" - "reflect" "strings" - "time" "cloud.google.com/go/bigquery" - "google.golang.org/api/iterator" - "google.golang.org/protobuf/encoding/protojson" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" @@ -32,7 +28,7 @@ func (c *BigQueryConnector) SyncQRepRecords( return 0, err } - done, err := c.isPartitionSynced(partition.PartitionId) + done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId) if err != nil { return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) } @@ -97,69 +93,12 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfi return dstTableMetadata, nil } -func (c *BigQueryConnector) createMetadataInsertStatement( - partition *protos.QRepPartition, - jobName string, - startTime time.Time, -) (string, error) { - // marshal the partition to json using protojson - pbytes, err := protojson.Marshal(partition) - if err != nil { - return "", fmt.Errorf("failed to marshal partition to json: %v", err) - } - - // convert the bytes to string - partitionJSON := string(pbytes) - - insertMetadataStmt := fmt.Sprintf( - "INSERT INTO _peerdb_query_replication_metadata"+ - "(flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime) "+ - "VALUES ('%s', '%s', JSON '%s', TIMESTAMP('%s'), CURRENT_TIMESTAMP());", - jobName, partition.PartitionId, - partitionJSON, startTime.Format(time.RFC3339)) - - return insertMetadataStmt, nil -} - func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { - qRepMetadataTableName := "_peerdb_query_replication_metadata" - - // define the schema - qRepMetadataSchema := bigquery.Schema{ - {Name: "flowJobName", Type: bigquery.StringFieldType}, - {Name: "partitionID", Type: bigquery.StringFieldType}, - {Name: "syncPartition", Type: bigquery.JSONFieldType}, - {Name: "syncStartTime", Type: bigquery.TimestampFieldType}, - {Name: "syncFinishTime", Type: bigquery.TimestampFieldType}, - } - - // reference the table - table := c.client.DatasetInProject(c.projectID, c.datasetID).Table(qRepMetadataTableName) - - // check 
if the table exists - meta, err := table.Metadata(c.ctx) - if err == nil { - // table exists, check if the schema matches - if !reflect.DeepEqual(meta.Schema, qRepMetadataSchema) { - return fmt.Errorf("table %s.%s already exists with different schema", c.datasetID, qRepMetadataTableName) - } else { - return nil - } - } - - // table does not exist, create it - err = table.Create(c.ctx, &bigquery.TableMetadata{ - Schema: qRepMetadataSchema, - }) - if err != nil { - return fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, qRepMetadataTableName, err) - } - if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { query := c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) query.DefaultDatasetID = c.datasetID query.DefaultProjectID = c.projectID - _, err = query.Read(c.ctx) + _, err := query.Read(c.ctx) if err != nil { return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) } @@ -167,38 +106,3 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e return nil } - -func (c *BigQueryConnector) isPartitionSynced(partitionID string) (bool, error) { - queryString := fmt.Sprintf( - "SELECT COUNT(*) FROM _peerdb_query_replication_metadata WHERE partitionID = '%s';", - partitionID, - ) - - query := c.client.Query(queryString) - query.DefaultDatasetID = c.datasetID - query.DefaultProjectID = c.projectID - it, err := query.Read(c.ctx) - if err != nil { - return false, fmt.Errorf("failed to execute query: %w", err) - } - - var values []bigquery.Value - err = it.Next(&values) - if err == iterator.Done { - return false, nil - } - if err != nil { - return false, fmt.Errorf("failed to iterate query results: %w", err) - } - - if len(values) != 1 { - return false, fmt.Errorf("expected 1 value, got %d", len(values)) - } - - count, ok := values[0].(int64) - if !ok { - return false, fmt.Errorf("failed to convert %v to int64", reflect.TypeOf(values[0])) - } - - return count > 0, nil -} diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index c8b182706f..894475c6b7 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -63,7 +63,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( table: stagingTable, }, stream, req.FlowJobName) if err != nil { - return nil, fmt.Errorf("failed to push to avro stage: %v", err) + return nil, fmt.Errorf("failed to push to avro stage: %w", err) } bqClient := s.connector.client @@ -73,11 +73,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( lastCP, err := req.Records.GetLastCheckpoint() if err != nil { - return nil, fmt.Errorf("failed to get last checkpoint: %v", err) - } - updateMetadataStmt, err := s.connector.getUpdateMetadataStmt(req.FlowJobName, lastCP, syncBatchID) - if err != nil { - return nil, fmt.Errorf("failed to update metadata: %v", err) + return nil, fmt.Errorf("failed to get last checkpoint: %w", err) } activity.RecordHeartbeat(s.connector.ctx, @@ -91,18 +87,17 @@ func (s *QRepAvroSyncMethod) SyncRecords( return nil, fmt.Errorf("failed to sync schema changes: %w", err) } - stmts := []string{ - "BEGIN TRANSACTION;", - insertStmt, - updateMetadataStmt, - "COMMIT TRANSACTION;", - } - query := bqClient.Query(strings.Join(stmts, "\n")) + query := bqClient.Query(insertStmt) query.DefaultDatasetID = s.connector.datasetID query.DefaultProjectID = s.connector.projectID _, err = query.Read(s.connector.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute 
statements in a transaction: %v", err) + return nil, fmt.Errorf("failed to execute statements in a transaction: %w", err) + } + + err = s.connector.pgMetadata.FinishBatch(req.FlowJobName, syncBatchID, lastCP) + if err != nil { + return nil, fmt.Errorf("failed to update metadata: %w", err) } // drop the staging table @@ -180,7 +175,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingDatasetTable, stream, flowJobName) if err != nil { - return -1, fmt.Errorf("failed to push to avro stage: %v", err) + return -1, fmt.Errorf("failed to push to avro stage: %w", err) } activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( "Flow job %s: running insert-into-select transaction for"+ @@ -202,24 +197,19 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;", dstTableName, selector, stagingDatasetTable.string()) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) - if err != nil { - return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) - } slog.Info("Performing transaction inside QRep sync function", flowLog) - stmts := []string{ - "BEGIN TRANSACTION;", - insertStmt, - insertMetadataStmt, - "COMMIT TRANSACTION;", - } - query := bqClient.Query(strings.Join(stmts, "\n")) + query := bqClient.Query(insertStmt) query.DefaultDatasetID = s.connector.datasetID query.DefaultProjectID = s.connector.projectID _, err = query.Read(s.connector.ctx) if err != nil { - return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) + return -1, fmt.Errorf("failed to execute statements in a transaction: %w", err) + } + + err = s.connector.pgMetadata.FinishQrepPartition(partition, flowJobName, startTime) + if err != nil { + return -1, err } // drop the staging table @@ -283,7 +273,7 @@ func DefineAvroSchema(dstTableName string, avroSchemaJSON, err := json.Marshal(avroSchema) if err != nil { - return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err) + return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %w", err) } return &model.QRecordAvroSchemaDefinition{ diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index d70a851e44..099bd97b51 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -41,6 +41,15 @@ func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, erro }, nil } +func NewPostgresMetadataStoreFromCatalog(ctx context.Context, pool *pgxpool.Pool) *PostgresMetadataStore { + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + return &PostgresMetadataStore{ + ctx: ctx, + pool: pool, + logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + } +} + func (p *PostgresMetadataStore) Ping() error { pingErr := p.pool.Ping(p.ctx) if pingErr != nil {
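For illustration, a minimal usage sketch of the catalog-backed store added above, assuming the caller already holds the catalog *pgxpool.Pool; the syncBatch helper and its parameters are hypothetical, while NewPostgresMetadataStoreFromCatalog, FinishBatch, and FinishQrepPartition are the calls introduced in this patch.

package example

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

// syncBatch sketches how a connector records sync progress once it holds a
// catalog-backed store; jobName, partition, and pool are supplied by the caller.
func syncBatch(ctx context.Context, pool *pgxpool.Pool, jobName string,
	syncBatchID int64, lastOffset int64, partition *protos.QRepPartition,
) error {
	store := metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, pool)

	// Record the completed CDC batch and its checkpoint, as the BigQuery
	// connector now does after its insert-into-select succeeds.
	if err := store.FinishBatch(jobName, syncBatchID, lastOffset); err != nil {
		return err
	}

	// Mark the qrep partition as synced so IsQrepPartitionSynced can skip re-runs.
	return store.FinishQrepPartition(partition, jobName, time.Now())
}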