added metadata_schema option for SF and PG peers (#560)

All metadata tables and Snowflake stages for CDC and QRep mirrors will
be created in the provided schema; the schema is auto-created if it does
not already exist. The default is `_peerdb_internal`.

fixes: #439
heavycrystal authored Nov 6, 2023
1 parent efc9f45 commit 368fb9d
Showing 14 changed files with 439 additions and 255 deletions.
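
For context on how the new option is consumed (see the `NewPostgresConnector` diff below): `MetadataSchema` is an optional field on the peer config, and the connector falls back to `_peerdb_internal` when it is unset. A minimal sketch of building a Postgres peer config that opts into a custom schema; the connection values and schema name are placeholders, and the import paths and package alias are assumptions based on this repo's layout:

package main

import (
	"context"
	"log"

	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

func main() {
	// Placeholder schema name; per this commit it is auto-created if absent.
	metadataSchema := "team_peerdb_meta"
	pgConfig := &protos.PostgresConfig{
		Host:     "localhost", // placeholder connection details
		Port:     5432,
		User:     "postgres",
		Password: "postgres",
		Database: "postgres",
		// New in this commit: metadata tables and raw tables live here
		// instead of the hardcoded _peerdb_internal.
		MetadataSchema: &metadataSchema,
	}
	conn, err := connpostgres.NewPostgresConnector(context.Background(), pgConfig)
	if err != nil {
		log.Fatal(err)
	}
	_ = conn
}

Because the field is a pointer, omitting it keeps existing peers on the old default without any migration.
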
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
-.vscode
+.vscode/
 .env
 tmp/
 .envrc
+.idea/
31 changes: 15 additions & 16 deletions flow/connectors/postgres/client.go
@@ -17,13 +17,12 @@ import (
 
 //nolint:stylecheck
 const (
-	internalSchema            = "_peerdb_internal"
 	mirrorJobsTableIdentifier = "peerdb_mirror_jobs"
 	createMirrorJobsTableSQL  = `CREATE TABLE IF NOT EXISTS %s.%s(mirror_job_name TEXT PRIMARY KEY,
 	lsn_offset BIGINT NOT NULL,sync_batch_id BIGINT NOT NULL,normalize_batch_id BIGINT NOT NULL)`
-	rawTablePrefix          = "_peerdb_raw"
-	createInternalSchemaSQL = "CREATE SCHEMA IF NOT EXISTS %s"
-	createRawTableSQL       = `CREATE TABLE IF NOT EXISTS %s.%s(_peerdb_uid TEXT NOT NULL,
+	rawTablePrefix    = "_peerdb_raw"
+	createSchemaSQL   = "CREATE SCHEMA IF NOT EXISTS %s"
+	createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_peerdb_uid TEXT NOT NULL,
 	_peerdb_timestamp BIGINT NOT NULL,_peerdb_destination_table_name TEXT NOT NULL,_peerdb_data JSONB NOT NULL,
 	_peerdb_record_type INTEGER NOT NULL, _peerdb_match_data JSONB,_peerdb_batch_id INTEGER,
 	_peerdb_unchanged_toast_columns TEXT)`
@@ -292,8 +291,8 @@ func (c *PostgresConnector) createSlotAndPublication(
 	return nil
 }
 
-func (c *PostgresConnector) createInternalSchema(createSchemaTx pgx.Tx) error {
-	_, err := createSchemaTx.Exec(c.ctx, fmt.Sprintf(createInternalSchemaSQL, internalSchema))
+func (c *PostgresConnector) createMetadataSchema(createSchemaTx pgx.Tx) error {
+	_, err := createSchemaTx.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
 	if err != nil {
 		return fmt.Errorf("error while creating internal schema: %w", err)
 	}
@@ -329,7 +328,7 @@ func generateCreateTableSQLForNormalizedTable(sourceTableIdentifier string,
 func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {
 	rows, err := c.pool.Query(c.ctx, fmt.Sprintf(
 		getLastSyncBatchID_SQL,
-		internalSchema,
+		c.metadataSchema,
 		mirrorJobsTableIdentifier,
 	), jobName)
 	if err != nil {
@@ -350,7 +349,7 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {
 }
 
 func (c *PostgresConnector) getLastNormalizeBatchID(jobName string) (int64, error) {
-	rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, internalSchema,
+	rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema,
 		mirrorJobsTableIdentifier), jobName)
 	if err != nil {
 		return 0, fmt.Errorf("error querying Postgres peer for last normalizeBatchId: %w", err)
@@ -371,7 +370,7 @@ func (c *PostgresConnector) getLastNormalizeBatchID(jobName string) (int64, erro
 
 func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
 	rows, err := c.pool.Query(c.ctx,
-		fmt.Sprintf(checkIfJobMetadataExistsSQL, internalSchema, mirrorJobsTableIdentifier), jobName)
+		fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
 	if err != nil {
 		return false, fmt.Errorf("failed to check if job exists: %w", err)
 	}
@@ -405,14 +404,14 @@ func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64,
 
 	if !jobMetadataExists {
 		_, err := syncRecordsTx.Exec(c.ctx,
-			fmt.Sprintf(insertJobMetadataSQL, internalSchema, mirrorJobsTableIdentifier),
+			fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier),
 			flowJobName, lastCP, syncBatchID, 0)
 		if err != nil {
 			return fmt.Errorf("failed to insert flow job status: %w", err)
 		}
 	} else {
 		_, err := syncRecordsTx.Exec(c.ctx,
-			fmt.Sprintf(updateMetadataForSyncRecordsSQL, internalSchema, mirrorJobsTableIdentifier),
+			fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier),
 			lastCP, syncBatchID, flowJobName)
 		if err != nil {
 			return fmt.Errorf("failed to update flow job status: %w", err)
@@ -433,7 +432,7 @@ func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normaliz
 	}
 
 	_, err = normalizeRecordsTx.Exec(c.ctx,
-		fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, internalSchema, mirrorJobsTableIdentifier),
+		fmt.Sprintf(updateMetadataForNormalizeRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier),
 		normalizeBatchID, flowJobName)
 	if err != nil {
 		return fmt.Errorf("failed to update metadata for NormalizeTables: %w", err)
@@ -446,7 +445,7 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync
 	normalizeBatchID int64) (map[string][]string, error) {
 	rawTableIdentifier := getRawTableIdentifier(flowJobName)
 
-	rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getTableNameToUnchangedToastColsSQL, internalSchema,
+	rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getTableNameToUnchangedToastColsSQL, c.metadataSchema,
 		rawTableIdentifier), normalizeBatchID, syncBatchID)
 	if err != nil {
 		return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err)
@@ -512,11 +511,11 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
 	deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ")
 
 	fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
-		strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), internalSchema,
+		strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema,
 		rawTableIdentifier, destinationTableIdentifier, insertColumnsSQL, flattenedCastsSQL,
 		strings.Join(normalizedTableSchema.PrimaryKeyColumns, ","), updateColumnsSQL)
 	fallbackDeleteStatement := fmt.Sprintf(fallbackDeleteStatementSQL,
-		strings.Join(maps.Values(primaryKeyColumnCasts), ","), internalSchema,
+		strings.Join(maps.Values(primaryKeyColumnCasts), ","), c.metadataSchema,
 		rawTableIdentifier, destinationTableIdentifier, deleteWhereClauseSQL)
 
 	return []string{fallbackUpsertStatement, fallbackDeleteStatement}
@@ -560,7 +559,7 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
 	updateStatements := c.generateUpdateStatement(columnNames, unchangedToastColumns)
 
 	return fmt.Sprintf(mergeStatementSQL, strings.Join(maps.Values(primaryKeyColumnCasts), ","),
-		internalSchema, rawTableIdentifier, destinationTableIdentifier, flattenedCastsSQL,
+		c.metadataSchema, rawTableIdentifier, destinationTableIdentifier, flattenedCastsSQL,
 		strings.Join(primaryKeySelectSQLArray, " AND "), insertColumnsSQL, insertValuesSQL, updateStatements)
 }
 
32 changes: 20 additions & 12 deletions flow/connectors/postgres/postgres.go
@@ -29,6 +29,7 @@ type PostgresConnector struct {
 	replPool           *pgxpool.Pool
 	tableSchemaMapping map[string]*protos.TableSchema
 	customTypesMapping map[uint32]string
+	metadataSchema     string
 }
 
 // NewPostgresConnector creates a new instance of PostgresConnector.
@@ -67,18 +68,26 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
 	replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
 	replConnConfig.MaxConns = 1
 
+	// TODO: replPool not initializing might be intentional, if we only want to use QRep mirrors
+	// and the user doesn't have the REPLICATION permission
 	replPool, err := pgxpool.NewWithConfig(ctx, replConnConfig)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create connection pool: %w", err)
 	}
 
+	metadataSchema := "_peerdb_internal"
+	if pgConfig.MetadataSchema != nil {
+		metadataSchema = *pgConfig.MetadataSchema
+	}
+
 	return &PostgresConnector{
 		connStr:            connectionString,
 		ctx:                ctx,
 		config:             pgConfig,
 		pool:               pool,
 		replPool:           replPool,
 		customTypesMapping: customTypeMap,
+		metadataSchema:     metadataSchema,
 	}, nil
 }
 
@@ -106,7 +115,7 @@ func (c *PostgresConnector) ConnectionActive() bool {
 // NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
 func (c *PostgresConnector) NeedsSetupMetadataTables() bool {
 	result, err := c.tableExists(&utils.SchemaTable{
-		Schema: internalSchema,
+		Schema: c.metadataSchema,
 		Table:  mirrorJobsTableIdentifier,
 	})
 	if err != nil {
@@ -128,12 +137,12 @@ func (c *PostgresConnector) SetupMetadataTables() error {
 		}
 	}()
 
-	err = c.createInternalSchema(createMetadataTablesTx)
+	err = c.createMetadataSchema(createMetadataTablesTx)
 	if err != nil {
 		return err
 	}
 	_, err = createMetadataTablesTx.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL,
-		internalSchema, mirrorJobsTableIdentifier))
+		c.metadataSchema, mirrorJobsTableIdentifier))
 	if err != nil {
 		return fmt.Errorf("error creating table %s: %w", mirrorJobsTableIdentifier, err)
 	}
@@ -148,7 +157,7 @@ func (c *PostgresConnector) SetupMetadataTables() error {
 // GetLastOffset returns the last synced offset for a job.
 func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) {
 	rows, err := c.pool.
-		Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, internalSchema, mirrorJobsTableIdentifier), jobName)
+		Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
 	if err != nil {
 		return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
 	}
@@ -354,7 +363,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		}
 	}()
 
-	syncedRecordsCount, err := syncRecordsTx.CopyFrom(c.ctx, pgx.Identifier{internalSchema, rawTableIdentifier},
+	syncedRecordsCount, err := syncRecordsTx.CopyFrom(c.ctx, pgx.Identifier{c.metadataSchema, rawTableIdentifier},
 		[]string{"_peerdb_uid", "_peerdb_timestamp", "_peerdb_destination_table_name", "_peerdb_data",
 			"_peerdb_record_type", "_peerdb_match_data", "_peerdb_batch_id", "_peerdb_unchanged_toast_columns"},
 		pgx.CopyFromRows(records))
@@ -507,21 +516,21 @@ func (c *PostgresConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
 		}
 	}()
 
-	err = c.createInternalSchema(createRawTableTx)
+	err = c.createMetadataSchema(createRawTableTx)
 	if err != nil {
 		return nil, fmt.Errorf("error creating internal schema: %w", err)
 	}
-	_, err = createRawTableTx.Exec(c.ctx, fmt.Sprintf(createRawTableSQL, internalSchema, rawTableIdentifier))
+	_, err = createRawTableTx.Exec(c.ctx, fmt.Sprintf(createRawTableSQL, c.metadataSchema, rawTableIdentifier))
 	if err != nil {
 		return nil, fmt.Errorf("error creating raw table: %w", err)
 	}
 	_, err = createRawTableTx.Exec(c.ctx, fmt.Sprintf(createRawTableBatchIDIndexSQL, rawTableIdentifier,
-		internalSchema, rawTableIdentifier))
+		c.metadataSchema, rawTableIdentifier))
 	if err != nil {
 		return nil, fmt.Errorf("error creating batch ID index on raw table: %w", err)
 	}
 	_, err = createRawTableTx.Exec(c.ctx, fmt.Sprintf(createRawTableDstTableIndexSQL, rawTableIdentifier,
-		internalSchema, rawTableIdentifier))
+		c.metadataSchema, rawTableIdentifier))
 	if err != nil {
 		return nil, fmt.Errorf("error creating destion table index on raw table: %w", err)
 	}
@@ -723,7 +732,6 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
 // EnsurePullability ensures that a table is pullable, implementing the Connector interface.
 func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatchInput,
 ) (*protos.EnsurePullabilityBatchOutput, error) {
-
 	tableIdentifierMapping := make(map[string]*protos.TableIdentifier)
 	for _, tableName := range req.SourceTableIdentifiers {
 		schemaTable, err := utils.ParseSchemaTable(tableName)
@@ -838,13 +846,13 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error {
 		}
 	}()
 
-	_, err = syncFlowCleanupTx.Exec(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, internalSchema,
+	_, err = syncFlowCleanupTx.Exec(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema,
 		getRawTableIdentifier(jobName)))
 	if err != nil {
 		return fmt.Errorf("unable to drop raw table: %w", err)
 	}
 	_, err = syncFlowCleanupTx.Exec(c.ctx,
-		fmt.Sprintf(deleteJobMetadataSQL, internalSchema, mirrorJobsTableIdentifier), jobName)
+		fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
 	if err != nil {
 		return fmt.Errorf("unable to delete job metadata: %w", err)
 	}
48 changes: 35 additions & 13 deletions flow/connectors/postgres/qrep.go
@@ -553,19 +553,34 @@ func (c *PostgresConnector) SyncQRepRecords(
 
 // SetupQRepMetadataTables function for postgres connector
 func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
-	qRepMetadataSchema := `CREATE TABLE IF NOT EXISTS %s (
+	createQRepMetadataTableTx, err := c.pool.Begin(c.ctx)
+	if err != nil {
+		return fmt.Errorf("error starting transaction for creating qrep metadata table: %w", err)
+	}
+	defer func() {
+		deferErr := createQRepMetadataTableTx.Rollback(c.ctx)
+		if deferErr != pgx.ErrTxClosed && deferErr != nil {
+			log.WithFields(log.Fields{
+				"flowName": config.FlowJobName,
+			}).Errorf("unexpected error rolling back transaction for creating qrep metadata table: %v", err)
+		}
+	}()
+
+	err = c.createMetadataSchema(createQRepMetadataTableTx)
+	if err != nil {
+		return fmt.Errorf("error creating metadata schema: %w", err)
+	}
+
+	metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName}
+	createQRepMetadataTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s(
 		flowJobName TEXT,
 		partitionID TEXT,
 		syncPartition JSONB,
 		syncStartTime TIMESTAMP,
 		syncFinishTime TIMESTAMP DEFAULT NOW()
-	)`
-
-	// replace table name in schema
-	qRepMetadataSchema = fmt.Sprintf(qRepMetadataSchema, qRepMetadataTableName)
-
+	)`, metadataTableIdentifier.Sanitize())
 	// execute create table query
-	_, err := c.pool.Exec(c.ctx, qRepMetadataSchema)
+	_, err = createQRepMetadataTableTx.Exec(c.ctx, createQRepMetadataTableSQL)
 	if err != nil {
 		return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err)
 	}
@@ -575,12 +590,18 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
 
 	if config.WriteMode != nil &&
 		config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
-		_, err = c.pool.Exec(c.ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
+		_, err = createQRepMetadataTableTx.Exec(c.ctx,
+			fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
 		if err != nil {
 			return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
 		}
 	}
 
+	err = createQRepMetadataTableTx.Commit(c.ctx)
+	if err != nil {
+		return fmt.Errorf("error committing transaction for creating qrep metadata table: %w", err)
+	}
+
 	return nil
 }
 
@@ -612,17 +633,18 @@ func BuildQuery(query string, flowJobName string) (string, error) {
 // isPartitionSynced checks whether a specific partition is synced
 func (c *PostgresConnector) isPartitionSynced(partitionID string) (bool, error) {
 	// setup the query string
+	metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName}
 	queryString := fmt.Sprintf(
-		"SELECT COUNT(*) FROM %s WHERE partitionID = $1;",
-		qRepMetadataTableName,
+		"SELECT COUNT(*)>0 FROM %s WHERE partitionID = $1;",
+		metadataTableIdentifier.Sanitize(),
 	)
 
 	// prepare and execute the query
-	var count int
-	err := c.pool.QueryRow(c.ctx, queryString, partitionID).Scan(&count)
+	var result bool
+	err := c.pool.QueryRow(c.ctx, queryString, partitionID).Scan(&result)
 	if err != nil {
 		return false, fmt.Errorf("failed to execute query: %w", err)
 	}
 
-	return count > 0, nil
+	return result, nil
 }
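
One detail worth calling out from the qrep changes above: queries now build a two-part pgx.Identifier and call Sanitize() instead of splicing a bare table name into the SQL. Sanitize double-quotes each part and joins them with a dot, so a user-provided metadata schema name cannot break the statement. A standalone sketch (pgx v5 shown; the Identifier API is the same in the v4 line, and the table name here is illustrative):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	// Schema + table, as the connector now does for the qrep metadata table.
	ident := pgx.Identifier{"_peerdb_internal", "peerdb_query_replication_metadata"}
	// Prints: "_peerdb_internal"."peerdb_query_replication_metadata"
	fmt.Println(ident.Sanitize())
}
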
7 changes: 4 additions & 3 deletions flow/connectors/postgres/qrep_sync_method.go
@@ -86,7 +86,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	} else {
 		// Step 2.1: Create a temp staging table
 		stagingTableName := fmt.Sprintf("_peerdb_staging_%s", util.RandomString(8))
-		stagingTableIdentifier := pgx.Identifier{dstTableName.Schema, stagingTableName}
+		stagingTableIdentifier := pgx.Identifier{s.connector.metadataSchema, stagingTableName}
 		dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
 
 		createStagingTableStmt := fmt.Sprintf(
@@ -109,7 +109,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 			schema.GetColumnNames(),
 			copySource,
 		)
-		if err != nil {
+		if err != nil || numRowsSynced != int64(copySource.NumRecords()) {
 			return -1, fmt.Errorf("failed to copy records into staging table: %v", err)
 		}
 
@@ -173,9 +173,10 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 			return -1, fmt.Errorf("failed to marshal partition to json: %v", err)
 		}
 
+		metadataTableIdentifier := pgx.Identifier{s.connector.metadataSchema, qRepMetadataTableName}
 		insertMetadataStmt := fmt.Sprintf(
 			"INSERT INTO %s VALUES ($1, $2, $3, $4, $5);",
-			qRepMetadataTableName,
+			metadataTableIdentifier.Sanitize(),
 		)
 		log.WithFields(log.Fields{
 			"flowName": flowJobName,