diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 963eb60a96..edb3cbdcf3 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -27,12 +27,6 @@ import (
 )
 
 const (
-	/*
-	Different batch Ids in code/BigQuery
-	1. batchID - identifier in raw table on target to depict which batch a row was inserted.
-	3. syncBatchID - batch id that was last synced or will be synced
-	4. normalizeBatchID - batch id that was last normalized or will be normalized.
-	*/
 	// MirrorJobsTable has the following schema:
 	// CREATE TABLE peerdb_mirror_jobs (
 	//   mirror_job_id STRING NOT NULL,
diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 870a77f21d..422502e7b5 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -158,18 +158,11 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
 	return nil
 }
 
-// external
 func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *ClickhouseConnector) SetupMetadataTables() error {
-	err := c.pgMetadata.SetupMetadata()
-	if err != nil {
-		c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
-		return err
-	}
-
 	return nil
 }
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index a66e396ded..43b8d028d3 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -31,8 +31,7 @@ func NewClickhouseConnector(ctx context.Context,
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}
 
-	metadataSchemaName := "peerdb_ch_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
 		return nil, err
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 68129a98d5..3d2f179bca 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -195,11 +195,11 @@ func (s *ClickhouseAvroSyncMethod) insertMetadata(
 	if err != nil {
 		s.connector.logger.Error("failed to create metadata insert statement",
 			slog.Any("error", err), partitionLog)
-		return fmt.Errorf("failed to create metadata insert statement: %v", err)
+		return fmt.Errorf("failed to create metadata insert statement: %w", err)
 	}
 
 	if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil {
-		return fmt.Errorf("failed to execute metadata insert statement: %v", err)
+		return fmt.Errorf("failed to execute metadata insert statement: %w", err)
 	}
 
 	return nil
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 3707dcb2a9..5386eb11a4 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -39,8 +39,7 @@ func NewEventHubConnector(
 	}
 
 	hubManager := NewEventHubManager(defaultAzureCreds, config)
-	metadataSchemaName := "peerdb_eventhub_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store",
 			slog.Any("error", err))
@@ -73,16 +72,10 @@ func (c *EventHubConnector) ConnectionActive() error {
 }
 
 func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *EventHubConnector) SetupMetadataTables() error {
-	err := c.pgMetadata.SetupMetadata()
-	if err != nil {
-		c.logger.Error(fmt.Sprintf("failed to setup metadata tables: %v", err))
-		return err
-	}
-
 	return nil
 }
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index 767a7ed732..d70a851e44 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -4,29 +4,30 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"time"
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
+	"google.golang.org/protobuf/encoding/protojson"
 
-	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
-	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
+	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
 const (
-	lastSyncStateTableName = "last_sync_state"
+	lastSyncStateTableName = "metadata_last_sync_state"
+	qrepTableName          = "metadata_qrep_partitions"
 )
 
 type PostgresMetadataStore struct {
-	ctx        context.Context
-	pool       *pgxpool.Pool
-	schemaName string
-	logger     slog.Logger
+	ctx    context.Context
+	pool   *pgxpool.Pool
+	logger slog.Logger
 }
 
-func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) {
+func NewPostgresMetadataStore(ctx context.Context) (*PostgresMetadataStore, error) {
 	pool, err := cc.GetCatalogConnectionPoolFromEnv()
 	if err != nil {
 		return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
 	}
@@ -34,17 +35,12 @@ func NewPostgresMetadataStore(ctx context.Context) (*Postgres
 	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
 	return &PostgresMetadataStore{
-		ctx:        ctx,
-		pool:       pool,
-		schemaName: schemaName,
-		logger:     *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+		ctx:    ctx,
+		pool:   pool,
+		logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 	}, nil
 }
 
-func (p *PostgresMetadataStore) QualifyTable(table string) string {
-	return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
-}
-
 func (p *PostgresMetadataStore) Ping() error {
 	pingErr := p.pool.Ping(p.ctx)
 	if pingErr != nil {
@@ -54,57 +50,11 @@ func (p *PostgresMetadataStore) Ping() error {
 	return nil
 }
 
-func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
-	// check if schema exists
-	row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
-
-	var exists pgtype.Int8
-	err := row.Scan(&exists)
-	if err != nil {
-		p.logger.Error("failed to check if schema exists", slog.Any("error", err))
-		return false
-	}
-
-	if exists.Int64 > 0 {
-		return true
-	}
-
-	return true
-}
-
-func (p *PostgresMetadataStore) SetupMetadata() error {
-	// create the schema
-	_, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
-	if err != nil && !utils.IsUniqueError(err) {
-		p.logger.Error("failed to create schema", slog.Any("error", err))
-		return err
-	}
-
-	// create the last sync state table
-	_, err = p.pool.Exec(p.ctx, `
-		CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
-			job_name TEXT PRIMARY KEY NOT NULL,
-			last_offset BIGINT NOT NULL,
-			updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
-			sync_batch_id BIGINT NOT NULL,
-			normalize_batch_id BIGINT
-		)
-	`)
-	if err != nil && !utils.IsUniqueError(err) {
-		p.logger.Error("failed to create last sync state table", slog.Any("error", err))
-		return err
-	}
-
-	p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))
-	return nil
-}
-
 func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
-	row := p.pool.QueryRow(p.ctx, `
-		SELECT last_offset
-		FROM `+p.QualifyTable(lastSyncStateTableName)+`
-		WHERE job_name = $1
-	`, jobName)
+	row := p.pool.QueryRow(p.ctx,
+		`SELECT last_offset FROM `+
+			lastSyncStateTableName+
+			` WHERE job_name = $1`, jobName)
 	var offset pgtype.Int8
 	err := row.Scan(&offset)
 	if err != nil {
@@ -122,11 +72,10 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 }
 
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
-	row := p.pool.QueryRow(p.ctx, `
-		SELECT sync_batch_id
-		FROM `+p.QualifyTable(lastSyncStateTableName)+`
-		WHERE job_name = $1
-	`, jobName)
+	row := p.pool.QueryRow(p.ctx,
+		`SELECT sync_batch_id FROM `+
+			lastSyncStateTableName+
+			` WHERE job_name = $1`, jobName)
 
 	var syncBatchID pgtype.Int8
 	err := row.Scan(&syncBatchID)
@@ -145,11 +94,10 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 }
 
 func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
-	rows := p.pool.QueryRow(p.ctx, `
-		SELECT normalize_batch_id
-		FROM `+p.schemaName+`.`+lastSyncStateTableName+`
-		WHERE job_name = $1
-	`, jobName)
+	rows := p.pool.QueryRow(p.ctx,
+		`SELECT normalize_batch_id FROM `+
+			lastSyncStateTableName+
+			` WHERE job_name = $1`, jobName)
 
 	var normalizeBatchID pgtype.Int8
 	err := rows.Scan(&normalizeBatchID)
@@ -171,10 +119,10 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64,
 func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
 	p.logger.Info("updating last offset", slog.Int64("offset", offset))
 	_, err := p.pool.Exec(p.ctx, `
-		INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+		INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
 		VALUES ($1, $2, $3)
 		ON CONFLICT (job_name)
-		DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
+		DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
 			updated_at = NOW()
 	`, jobName, offset, 0)
 	if err != nil {
@@ -188,12 +136,12 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
 func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 	p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset))
 	_, err := p.pool.Exec(p.ctx, `
-		INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
+		INSERT INTO `+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
 		VALUES ($1, $2, $3)
 		ON CONFLICT (job_name)
 		DO UPDATE SET
-			last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
-			sync_batch_id = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.sync_batch_id, excluded.sync_batch_id),
+			last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+			sync_batch_id = GREATEST(`+lastSyncStateTableName+`.sync_batch_id, excluded.sync_batch_id),
 			updated_at = NOW()
 	`, jobName, offset, syncBatchID)
 	if err != nil {
@@ -206,10 +154,9 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o
 
 func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
 	p.logger.Info("updating normalize batch id for job")
-	_, err := p.pool.Exec(p.ctx, `
-		UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
-		SET normalize_batch_id=$2 WHERE job_name=$1
-	`, jobName, batchID)
+	_, err := p.pool.Exec(p.ctx,
+		`UPDATE `+lastSyncStateTableName+
+			` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID)
 	if err != nil {
 		p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
 		return err
@@ -218,10 +165,49 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID i
 	return nil
 }
 
-func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
-	_, err := p.pool.Exec(p.ctx, `
-		DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
-		WHERE job_name = $1
-	`, jobName)
+func (p *PostgresMetadataStore) FinishQrepPartition(
+	partition *protos.QRepPartition,
+	jobName string,
+	startTime time.Time,
+) error {
+	pbytes, err := protojson.Marshal(partition)
+	if err != nil {
+		return fmt.Errorf("failed to marshal partition to json: %w", err)
+	}
+	partitionJSON := string(pbytes)
+
+	_, err = p.pool.Exec(p.ctx,
+		`INSERT INTO `+qrepTableName+
+			`(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
+			ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
+		jobName, partition.PartitionId, partitionJSON, startTime)
 	return err
 }
+
+func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
+	var exists bool
+	err := p.pool.QueryRow(p.ctx,
+		`SELECT EXISTS(SELECT * FROM `+qrepTableName+
+			` WHERE job_name = $1 AND partition_id = $2)`,
+		jobName, partitionID).Scan(&exists)
+	if err != nil {
+		return false, fmt.Errorf("failed to execute query: %w", err)
+	}
+	return exists, nil
+}
+
+func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
+	_, err := p.pool.Exec(p.ctx,
+		`DELETE FROM `+lastSyncStateTableName+` WHERE job_name = $1`, jobName)
+	if err != nil {
+		return err
+	}
+
+	_, err = p.pool.Exec(p.ctx,
+		`DELETE FROM `+qrepTableName+` WHERE job_name = $1`, jobName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go
index a99266e3a3..95213c6123 100644
--- a/flow/connectors/s3/s3.go
+++ b/flow/connectors/s3/s3.go
@@ -65,8 +65,7 @@ func NewS3Connector(ctx context.Context,
 	if err != nil {
 		return nil, fmt.Errorf("failed to create S3 client: %w", err)
 	}
-	metadataSchemaName := "peerdb_s3_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
 		return nil, err
@@ -149,16 +148,10 @@ func (c *S3Connector) ConnectionActive() error {
 }
 
 func (c *S3Connector) NeedsSetupMetadataTables() bool {
-	return c.pgMetadata.NeedsSetupMetadata()
+	return false
 }
 
 func (c *S3Connector) SetupMetadataTables() error {
-	err := c.pgMetadata.SetupMetadata()
-	if err != nil {
-		c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
-		return err
-	}
-
 	return nil
 }
 
@@ -170,15 +163,8 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
 	return c.pgMetadata.FetchLastOffset(jobName)
 }
 
-// update offset for a job
 func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
-	err := c.pgMetadata.UpdateLastOffset(jobName, offset)
-	if err != nil {
-		c.logger.Error("failed to update last offset: ", slog.Any("error", err))
-		return err
-	}
-
-	return nil
+	return c.pgMetadata.UpdateLastOffset(jobName, offset)
 }
 
 func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go
index 4361b17881..a5bd38fe02 100644
--- a/flow/connectors/snowflake/qrep.go
+++ b/flow/connectors/snowflake/qrep.go
@@ -5,12 +5,10 @@ import (
 	"fmt"
 	"log/slog"
 	"strings"
-	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/jackc/pgx/v5/pgtype"
-	"google.golang.org/protobuf/encoding/protojson"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -18,8 +16,6 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
-const qRepMetadataTableName = "_peerdb_query_replication_metadata"
-
 func (c *SnowflakeConnector) SyncQRepRecords(
 	config *protos.QRepConfig,
 	partition *protos.QRepPartition,
@@ -37,7 +33,7 @@ func (c *SnowflakeConnector) SyncQRepRecords(
 	}
 	c.logger.Info("Called QRep sync function and obtained table schema", flowLog)
 
-	done, err := c.isPartitionSynced(partition.PartitionId)
+	done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId)
 	if err != nil {
 		return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
 	}
@@ -51,30 +47,6 @@ func (c *SnowflakeConnector) SyncQRepRecords(
 	return avroSync.SyncQRepRecords(config, partition, tblSchema, stream)
 }
 
-func (c *SnowflakeConnector) createMetadataInsertStatement(
-	partition *protos.QRepPartition,
-	jobName string,
-	startTime time.Time,
-) (string, error) {
-	// marshal the partition to json using protojson
-	pbytes, err := protojson.Marshal(partition)
-	if err != nil {
-		return "", fmt.Errorf("failed to marshal partition to json: %v", err)
-	}
-
-	// convert the bytes to string
-	partitionJSON := string(pbytes)
-
-	insertMetadataStmt := fmt.Sprintf(
-		`INSERT INTO %s.%s
-			(flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime)
-			VALUES ('%s', '%s', '%s', '%s'::timestamp, CURRENT_TIMESTAMP);`,
-		c.metadataSchema, qRepMetadataTableName, jobName, partition.PartitionId,
-		partitionJSON, startTime.Format(time.RFC3339))
-
-	return insertMetadataStmt, nil
-}
-
 func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) {
 	schematable, err := utils.ParseSchemaTable(tableName)
 	if err != nil {
@@ -99,49 +71,13 @@ func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType
 	return columnTypes, nil
 }
 
-func (c *SnowflakeConnector) isPartitionSynced(partitionID string) (bool, error) {
-	//nolint:gosec
-	queryString := fmt.Sprintf(`
-		SELECT COUNT(*)
-		FROM %s.%s
-		WHERE partitionID = '%s'
-	`, c.metadataSchema, qRepMetadataTableName, partitionID)
-
-	row := c.database.QueryRow(queryString)
-
-	var count pgtype.Int8
-	if err := row.Scan(&count); err != nil {
-		return false, fmt.Errorf("failed to execute query: %w", err)
-	}
-
-	return count.Int64 > 0, nil
-}
-
 func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
-	// NOTE that Snowflake does not support transactional DDL
-	createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil)
-	if err != nil {
-		return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
-	}
-	// in case we return after error, ensure transaction is rolled back
-	defer func() {
-		deferErr := createMetadataTablesTx.Rollback()
-		if deferErr != sql.ErrTxDone && deferErr != nil {
-			c.logger.Error("error while rolling back transaction for creating metadata tables",
-				slog.Any("error", deferErr))
-		}
-	}()
-	err = c.createPeerDBInternalSchema(createMetadataTablesTx)
-	if err != nil {
-		return err
-	}
-	err = c.createQRepMetadataTable(createMetadataTablesTx)
+	_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema))
 	if err != nil {
 		return err
 	}
 
 	stageName := c.getStageNameForJob(config.FlowJobName)
-
 	err = c.createStage(stageName, config)
 	if err != nil {
 		return err
@@ -154,35 +90,6 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
 		}
 	}
 
-	err = createMetadataTablesTx.Commit()
-	if err != nil {
-		return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err)
-	}
-
-	return nil
-}
-
-func (c *SnowflakeConnector) createQRepMetadataTable(createMetadataTableTx *sql.Tx) error {
-	// Define the schema
-	schemaStatement := `
-	CREATE TABLE IF NOT EXISTS %s.%s (
-		flowJobName STRING,
-		partitionID STRING,
-		syncPartition STRING,
-		syncStartTime TIMESTAMP_LTZ,
-		syncFinishTime TIMESTAMP_LTZ
-	);
-	`
-	queryString := fmt.Sprintf(schemaStatement, c.metadataSchema, qRepMetadataTableName)
-
-	_, err := createMetadataTableTx.Exec(queryString)
-	if err != nil {
-		c.logger.Error(fmt.Sprintf("failed to create table %s.%s", c.metadataSchema, qRepMetadataTableName),
-			slog.Any("error", err))
-		return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err)
-	}
-
-	c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName))
 	return nil
 }
 
@@ -371,5 +278,5 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
 }
 
 func (c *SnowflakeConnector) getStageNameForJob(job string) string {
-	return fmt.Sprintf("%s.peerdb_stage_%s", c.metadataSchema, job)
+	return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job)
 }
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 3c330d636c..8ca90b01c8 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -131,7 +131,7 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords(
 	}
 	s.connector.logger.Info("Put file to stage in Avro sync for snowflake", partitionLog)
 
-	err = s.insertMetadata(partition, config.FlowJobName, startTime)
+	err = s.connector.pgMetadata.FinishQrepPartition(partition, config.FlowJobName, startTime)
 	if err != nil {
 		return -1, err
 	}
@@ -283,26 +283,3 @@ func (s *SnowflakeAvroSyncHandler) putFileToStage(avroFile *avro.AvroFile, stage
 	s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
 	return nil
 }
-
-func (s *SnowflakeAvroSyncHandler) insertMetadata(
-	partition *protos.QRepPartition,
-	flowJobName string,
-	startTime time.Time,
-) error {
-	partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
-	insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime)
-	if err != nil {
-		s.connector.logger.Error("failed to create metadata insert statement",
-			slog.Any("error", err), partitionLog)
-		return fmt.Errorf("failed to create metadata insert statement: %v", err)
-	}
-
-	if _, err := s.connector.database.ExecContext(s.connector.ctx, insertMetadataStmt); err != nil {
-		s.connector.logger.Error("failed to execute metadata insert statement "+insertMetadataStmt,
-			slog.Any("error", err), partitionLog)
-		return fmt.Errorf("failed to execute metadata insert statement: %v", err)
-	}
-
-	s.connector.logger.Info("inserted metadata for partition", partitionLog)
-	return nil
-}
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index dd479b6640..7394110b86 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -17,6 +17,7 @@ import (
 	"go.temporal.io/sdk/activity"
 	"golang.org/x/sync/errgroup"
 
+	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -25,9 +26,6 @@ import (
 )
 
 const (
-	mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS"
-	createMirrorJobsTableSQL  = `CREATE TABLE IF NOT EXISTS %s.%s(MIRROR_JOB_NAME STRING NOT NULL,OFFSET INT NOT NULL,
-		SYNC_BATCH_ID INT NOT NULL,NORMALIZE_BATCH_ID INT NOT NULL)`
 	rawTablePrefix = "_PEERDB_RAW"
 	createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s"
 	createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL,
@@ -55,22 +53,15 @@ const (
 		WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN %s`
 	getDistinctDestinationTableNames = `SELECT DISTINCT _PEERDB_DESTINATION_TABLE_NAME FROM %s.%s
 		WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d`
-	getTableNametoUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME,
+	getTableNameToUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME,
 		ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s
 		WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2
 		GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
 	getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
 		WHERE UPPER(TABLE_SCHEMA)=? AND UPPER(TABLE_NAME)=? ORDER BY ORDINAL_POSITION`
-	insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)"
-
-	updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=?
-		WHERE MIRROR_JOB_NAME=?`
-	updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"
-
 	checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
 		WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
-	checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
 	getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
 	setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
 	getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
@@ -78,14 +69,14 @@ const (
 	dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
 	deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
 	dropSchemaIfExistsSQL = "DROP SCHEMA IF EXISTS %s"
-	checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
 )
 
 type SnowflakeConnector struct {
-	ctx            context.Context
-	database       *sql.DB
-	metadataSchema string
-	logger         slog.Logger
+	ctx        context.Context
+	database   *sql.DB
+	pgMetadata *metadataStore.PostgresMetadataStore
+	rawSchema  string
+	logger     slog.Logger
 }
 
 // creating this to capture array results from snowflake.
@@ -206,17 +197,23 @@ func NewSnowflakeConnector(ctx context.Context,
 		return nil, fmt.Errorf("could not validate snowflake peer: %w", err)
 	}
 
-	metadataSchema := "_PEERDB_INTERNAL"
+	rawSchema := "_PEERDB_INTERNAL"
 	if snowflakeProtoConfig.MetadataSchema != nil {
-		metadataSchema = *snowflakeProtoConfig.MetadataSchema
+		rawSchema = *snowflakeProtoConfig.MetadataSchema
+	}
+
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("could not connect to metadata store: %w", err)
 	}
 
 	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
 	return &SnowflakeConnector{
-		ctx:            ctx,
-		database:       database,
-		metadataSchema: metadataSchema,
-		logger:         *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+		ctx:        ctx,
+		database:   database,
+		pgMetadata: pgMetadata,
+		rawSchema:  rawSchema,
+		logger:     *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 	}, nil
 }
 
@@ -243,42 +240,10 @@ func (c *SnowflakeConnector) ConnectionActive() error {
 }
 
 func (c *SnowflakeConnector) NeedsSetupMetadataTables() bool {
-	result, err := c.checkIfTableExists(c.metadataSchema, mirrorJobsTableIdentifier)
-	if err != nil {
-		return true
-	}
-	return !result
+	return false
 }
 
 func (c *SnowflakeConnector) SetupMetadataTables() error {
-	// NOTE that Snowflake does not support transactional DDL
-	createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil)
-	if err != nil {
-		return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
-	}
-	// in case we return after error, ensure transaction is rolled back
-	defer func() {
-		deferErr := createMetadataTablesTx.Rollback()
-		if deferErr != sql.ErrTxDone && deferErr != nil {
-			c.logger.Error("error while rolling back transaction for creating metadata tables",
-				slog.Any("error", deferErr))
-		}
-	}()
-
-	err = c.createPeerDBInternalSchema(createMetadataTablesTx)
-	if err != nil {
-		return err
-	}
-	_, err = createMetadataTablesTx.ExecContext(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL,
-		c.metadataSchema, mirrorJobsTableIdentifier))
-	if err != nil {
-		return fmt.Errorf("error while setting up mirror jobs table: %w", err)
-	}
-	err = createMetadataTablesTx.Commit()
-	if err != nil {
-		return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err)
-	}
-
 	return nil
 }
@@ -324,58 +289,19 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
 }
 
 func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
-	var result pgtype.Int8
-	err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastOffsetSQL,
-		c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
-	if err != nil {
-		if err == sql.ErrNoRows {
-			c.logger.Warn("No row found, returning 0")
-			return 0, nil
-		}
-		return 0, fmt.Errorf("error while reading result row: %w", err)
-	}
-	if result.Int64 == 0 {
-		c.logger.Warn("Assuming zero offset means no sync has happened")
-		return 0, nil
-	}
-	return result.Int64, nil
+	return c.pgMetadata.FetchLastOffset(jobName)
 }
 
-func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error {
-	_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
-		c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
-	if err != nil {
-		return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
-	}
-	return nil
+func (c *SnowflakeConnector) SetLastOffset(jobName string, offset int64) error {
+	return c.pgMetadata.UpdateLastOffset(jobName, offset)
 }
 
 func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
-	var result pgtype.Int8
-	err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema,
-		mirrorJobsTableIdentifier), jobName).Scan(&result)
-	if err != nil {
-		if err == sql.ErrNoRows {
-			c.logger.Warn("No row found, returning 0")
-			return 0, nil
-		}
-		return 0, fmt.Errorf("error while reading result row: %w", err)
-	}
-	return result.Int64, nil
+	return c.pgMetadata.GetLastBatchID(jobName)
 }
 
 func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
-	var normBatchID pgtype.Int8
-	err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema,
-		mirrorJobsTableIdentifier), jobName).Scan(&normBatchID)
-	if err != nil {
-		if err == sql.ErrNoRows {
-			c.logger.Warn("No row found, returning 0")
-			return 0, nil
-		}
-		return 0, fmt.Errorf("error while reading result row: %w", err)
-	}
-	return normBatchID.Int64, nil
+	return c.pgMetadata.GetLastNormalizeBatchID(jobName)
 }
 
 func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64,
@@ -383,7 +309,7 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy
 ) ([]string, error) {
 	rawTableIdentifier := getRawTableIdentifier(flowJobName)
 
-	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getDistinctDestinationTableNames, c.metadataSchema,
+	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getDistinctDestinationTableNames, c.rawSchema,
 		rawTableIdentifier, normalizeBatchID, syncBatchID))
 	if err != nil {
 		return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err)
@@ -407,12 +333,12 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy
 	return destinationTableNames, nil
 }
 
-func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64,
+func (c *SnowflakeConnector) getTableNameToUnchangedCols(flowJobName string, syncBatchID int64,
 	normalizeBatchID int64,
 ) (map[string][]string, error) {
 	rawTableIdentifier := getRawTableIdentifier(flowJobName)
 
-	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getTableNametoUnchangedColsSQL, c.metadataSchema,
+	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getTableNameToUnchangedColsSQL, c.rawSchema,
 		rawTableIdentifier, normalizeBatchID, syncBatchID))
 	if err != nil {
 		return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err)
@@ -533,27 +459,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
 		return nil, err
 	}
 
-	// transaction for SyncRecords
-	syncRecordsTx, err := c.database.BeginTx(c.ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	// in case we return after error, ensure transaction is rolled back
-	defer func() {
-		deferErr := syncRecordsTx.Rollback()
-		if deferErr != sql.ErrTxDone && deferErr != nil {
-			c.logger.Error("error while rolling back transaction for SyncRecords: %v",
-				slog.Any("error", deferErr), slog.Int64("syncBatchID", req.SyncBatchID))
-		}
-	}()
-
-	// updating metadata with new offset and syncBatchID
-	err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckpointID, req.SyncBatchID, syncRecordsTx)
-	if err != nil {
-		return nil, err
-	}
-	// transaction commits
-	err = syncRecordsTx.Commit()
+	err = c.pgMetadata.FinishBatch(req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID)
 	if err != nil {
 		return nil, err
 	}
@@ -576,7 +482,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
 	qrepConfig := &protos.QRepConfig{
 		StagingPath: "",
 		FlowJobName: req.FlowJobName,
-		DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.metadataSchema,
+		DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.rawSchema,
 			rawTableIdentifier)),
 	}
 	avroSyncer := NewSnowflakeAvroSyncHandler(qrepConfig, c)
@@ -625,16 +531,6 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 		}, nil
 	}
 
-	jobMetadataExists, err := c.jobMetadataExists(req.FlowJobName)
-	if err != nil {
-		return nil, err
-	}
-	// sync hasn't created job metadata yet, chill.
-	if !jobMetadataExists {
-		return &model.NormalizeResponse{
-			Done: false,
-		}, nil
-	}
 	destinationTableNames, err := c.getDistinctTableNamesInBatch(
 		req.FlowJobName,
 		req.SyncBatchID,
@@ -644,7 +540,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 		return nil, err
 	}
 
-	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, req.SyncBatchID, normBatchID)
+	tableNameToUnchangedToastCols, err := c.getTableNameToUnchangedCols(req.FlowJobName, req.SyncBatchID, normBatchID)
 	if err != nil {
 		return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err)
 	}
@@ -663,7 +559,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 			syncBatchID:           req.SyncBatchID,
 			normalizeBatchID:      normBatchID,
 			normalizedTableSchema: req.TableNameSchemaMapping[tableName],
-			unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
+			unchangedToastColumns: tableNameToUnchangedToastCols[tableName],
 			peerdbCols: &protos.PeerDBColumns{
 				SoftDelete:        req.SoftDelete,
 				SoftDeleteColName: req.SoftDeleteColName,
@@ -706,8 +602,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 		return nil, fmt.Errorf("error while normalizing records: %w", err)
 	}
 
-	// updating metadata with new normalizeBatchID
-	err = c.updateNormalizeMetadata(req.FlowJobName, req.SyncBatchID)
+	err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, req.SyncBatchID)
 	if err != nil {
 		return nil, err
 	}
@@ -720,20 +615,20 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
 }
 
 func (c *SnowflakeConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
-	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
+	_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema))
+	if err != nil {
+		return nil, err
+	}
 
 	createRawTableTx, err := c.database.BeginTx(c.ctx, nil)
 	if err != nil {
 		return nil, fmt.Errorf("unable to begin transaction for creation of raw table: %w", err)
 	}
-	err = c.createPeerDBInternalSchema(createRawTableTx)
-	if err != nil {
-		return nil, err
-	}
 	// there is no easy way to check if a table has the same schema in Snowflake,
 	// so just executing the CREATE TABLE IF NOT EXISTS blindly.
+	rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
 	_, err = createRawTableTx.ExecContext(c.ctx,
-		fmt.Sprintf(createRawTableSQL, c.metadataSchema, rawTableIdentifier))
+		fmt.Sprintf(createRawTableSQL, c.rawSchema, rawTableIdentifier))
 	if err != nil {
 		return nil, fmt.Errorf("unable to create raw table: %w", err)
 	}
@@ -754,6 +649,11 @@ func (c *SnowflakeConnector) CreateRawTable(req *protos.CreateRawTableInput) (*p
 }
 
 func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
+	err := c.pgMetadata.DropMetadata(jobName)
+	if err != nil {
+		return fmt.Errorf("unable to clear metadata for sync flow cleanup: %w", err)
+	}
+
 	syncFlowCleanupTx, err := c.database.BeginTx(c.ctx, nil)
 	if err != nil {
 		return fmt.Errorf("unable to begin transaction for sync flow cleanup: %w", err)
@@ -765,31 +665,6 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
 		}
 	}()
 
-	row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema)
-	var schemaExists pgtype.Bool
-	err = row.Scan(&schemaExists)
-	if err != nil {
-		return fmt.Errorf("unable to check if internal schema exists: %w", err)
-	}
-
-	if schemaExists.Bool {
-		_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema,
-			getRawTableIdentifier(jobName)))
-		if err != nil {
-			return fmt.Errorf("unable to drop raw table: %w", err)
-		}
-		_, err = syncFlowCleanupTx.ExecContext(c.ctx,
-			fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
-		if err != nil {
-			return fmt.Errorf("unable to delete job metadata: %w", err)
-		}
-	}
-
-	err = syncFlowCleanupTx.Commit()
-	if err != nil {
-		return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err)
-	}
-
 	err = c.dropStage("", jobName)
 	if err != nil {
 		return err
@@ -861,92 +736,6 @@ func getRawTableIdentifier(jobName string) string {
 	return fmt.Sprintf("%s_%s", rawTablePrefix, jobName)
 }
 
-func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) {
-	var result pgtype.Bool
-	err := c.database.QueryRowContext(c.ctx,
-		fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
-	if err != nil {
-		return false, fmt.Errorf("error reading result row: %w", err)
-	}
-	return result.Bool, nil
-}
-
-func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) {
-	var result pgtype.Bool
-	err := tx.QueryRowContext(c.ctx,
-		fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
-	if err != nil {
-		return false, fmt.Errorf("error reading result row: %w", err)
-	}
-	return result.Bool, nil
-}
-
-func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64,
-	syncBatchID int64, syncRecordsTx *sql.Tx,
-) error {
-	jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName)
-	if err != nil {
-		return fmt.Errorf("failed to get sync status for flow job: %w", err)
-	}
-
-	if !jobMetadataExists {
-		_, err := syncRecordsTx.ExecContext(c.ctx,
-			fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier),
-			flowJobName, lastCP, syncBatchID, 0)
-		if err != nil {
-			return fmt.Errorf("failed to insert flow job status: %w", err)
-		}
-	} else {
-		_, err := syncRecordsTx.ExecContext(c.ctx,
-			fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier),
-			lastCP, syncBatchID, flowJobName)
-		if err != nil {
-			return fmt.Errorf("failed to update flow job status: %w", err)
-		}
-	}
-
-	return nil
-}
-
-func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64) error {
-	jobMetadataExists, err := c.jobMetadataExists(flowJobName)
-	if err != nil {
-		return fmt.Errorf("failed to get sync status for flow job: %w", err)
-	}
-	if !jobMetadataExists {
-		return fmt.Errorf("job metadata does not exist, unable to update")
-	}
-
-	stmt := fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier)
-	_, err = c.database.ExecContext(c.ctx, stmt, normalizeBatchID, flowJobName)
-	if err != nil {
-		return fmt.Errorf("failed to update metadata for NormalizeTables: %w", err)
-	}
-
-	return nil
-}
-
-func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error {
-	// check if the internal schema exists
-	row := createSchemaTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema)
-	var schemaExists pgtype.Bool
-	err := row.Scan(&schemaExists)
-	if err != nil {
-		return fmt.Errorf("error while reading result row: %w", err)
-	}
-
-	if schemaExists.Bool {
-		c.logger.Info(fmt.Sprintf("internal schema %s already exists", c.metadataSchema))
-		return nil
-	}
-
-	_, err = createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
-	if err != nil {
-		return fmt.Errorf("error while creating internal schema for PeerDB: %w", err)
-	}
-	return nil
-}
-
 func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
 	renameTablesTx, err := c.database.BeginTx(c.ctx, nil)
 	if err != nil {
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index 3005385335..3e62add82d 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -79,6 +79,10 @@ func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, va
 		return fmt.Errorf("json value check failed: %v", err)
 	}
 
+	if len(res.Records) == 0 {
+		return fmt.Errorf("bad json: empty result set from %s", tableName)
+	}
+
 	jsonVal := res.Records[0][0].Value
 	if jsonVal != value {
 		return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go
index ad47232b1e..9541d61c86 100644
--- a/flow/e2e/snowflake/qrep_flow_sf_test.go
+++ b/flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -19,13 +19,15 @@ func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int)
 }
 
 func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, value string) error {
-	res, err := s.sfHelper.ExecuteAndProcessQuery(fmt.Sprintf(
-		"SELECT %s:%s FROM %s;",
-		colName, fieldName, tableName))
+	res, err := s.sfHelper.ExecuteAndProcessQuery(fmt.Sprintf("SELECT %s:%s FROM %s", colName, fieldName, tableName))
 	if err != nil {
 		return fmt.Errorf("json value check failed: %v", err)
 	}
 
+	if len(res.Records) == 0 {
+		return fmt.Errorf("bad json: empty result set from %s", tableName)
+	}
+
 	jsonVal := res.Records[0][0].Value
 	if jsonVal != value {
 		return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
diff --git a/nexus/catalog/migrations/V19__metadata.sql b/nexus/catalog/migrations/V19__metadata.sql
new file mode 100644
index 0000000000..886dc66eea
--- /dev/null
+++ b/nexus/catalog/migrations/V19__metadata.sql
@@ -0,0 +1,16 @@
+CREATE TABLE IF NOT EXISTS metadata_last_sync_state (
+  job_name TEXT PRIMARY KEY NOT NULL,
+  last_offset BIGINT NOT NULL,
+  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  sync_batch_id BIGINT NOT NULL,
+  normalize_batch_id BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS metadata_qrep_partitions (
+  job_name TEXT NOT NULL,
+  partition_id TEXT NOT NULL,
+  sync_partition JSON NOT NULL,
+  sync_start_time TIMESTAMPTZ NOT NULL,
+  sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  PRIMARY KEY (job_name, partition_id)
+);