Skip to content

Commit

Permalink
Don't create metadata schema in a transaction
Browse files Browse the repository at this point in the history
When a command errors during a transaction the transaction can no longer progress,
it must be rolled back. This is the case when IF NOT EXISTS errors over unique violation

We do not need schema/table creation to be transactional, so remove transaction
  • Loading branch information
serprex committed Dec 30, 2023
1 parent a182a23 commit ad9953f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 60 deletions.
19 changes: 2 additions & 17 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,15 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
}

func (p *PostgresMetadataStore) SetupMetadata() error {
// start a transaction
tx, err := p.pool.Begin(p.ctx)
if err != nil {
p.logger.Error("failed to start transaction", slog.Any("error", err))
return err
}

// create the schema
_, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
_, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create schema", slog.Any("error", err))
return err
}

// create the last sync state table
_, err = tx.Exec(p.ctx, `
_, err = p.pool.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` (
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
Expand All @@ -126,14 +119,6 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
}

p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))

// commit the transaction
err = tx.Commit(p.ctx)
if err != nil {
p.logger.Error("failed to commit transaction", slog.Any("error", err))
return err
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ func (c *PostgresConnector) createSlotAndPublication(
return nil
}

func (c *PostgresConnector) createMetadataSchema(createSchemaTx pgx.Tx) error {
_, err := createSchemaTx.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
func (c *PostgresConnector) createMetadataSchema() error {
_, err := c.pool.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
if err != nil && !utils.IsUniqueError(err) {
return fmt.Errorf("error while creating internal schema: %w", err)
}
Expand Down
29 changes: 8 additions & 21 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,31 +141,17 @@ func (c *PostgresConnector) NeedsSetupMetadataTables() bool {

// SetupMetadataTables sets up the metadata tables.
func (c *PostgresConnector) SetupMetadataTables() error {
createMetadataTablesTx, err := c.pool.Begin(c.ctx)
if err != nil {
return fmt.Errorf("error starting transaction for creating metadata tables: %w", err)
}
defer func() {
deferErr := createMetadataTablesTx.Rollback(c.ctx)
if deferErr != pgx.ErrTxClosed && deferErr != nil {
c.logger.Error("error rolling back transaction for creating metadata tables", slog.Any("error", err))
}
}()

err = c.createMetadataSchema(createMetadataTablesTx)
err := c.createMetadataSchema()
if err != nil {
return err
}
_, err = createMetadataTablesTx.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL,

_, err = c.pool.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL,
c.metadataSchema, mirrorJobsTableIdentifier))
if err != nil && !utils.IsUniqueError(err) {
return fmt.Errorf("error creating table %s: %w", mirrorJobsTableIdentifier, err)
}

err = createMetadataTablesTx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("error committing transaction for creating metadata tables: %w", err)
}
return nil
}

Expand Down Expand Up @@ -507,6 +493,11 @@ type SlotCheckResult struct {
func (c *PostgresConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)

err := c.createMetadataSchema()
if err != nil {
return nil, fmt.Errorf("error creating internal schema: %w", err)
}

createRawTableTx, err := c.pool.Begin(c.ctx)
if err != nil {
return nil, fmt.Errorf("error starting transaction for creating raw table: %w", err)
Expand All @@ -518,10 +509,6 @@ func (c *PostgresConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
}
}()

err = c.createMetadataSchema(createRawTableTx)
if err != nil {
return nil, fmt.Errorf("error creating internal schema: %w", err)
}
_, err = createRawTableTx.Exec(c.ctx, fmt.Sprintf(createRawTableSQL, c.metadataSchema, rawTableIdentifier))
if err != nil {
return nil, fmt.Errorf("error creating raw table: %w", err)
Expand Down
24 changes: 4 additions & 20 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,18 +477,7 @@ func (c *PostgresConnector) SyncQRepRecords(

// SetupQRepMetadataTables function for postgres connector
func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
createQRepMetadataTableTx, err := c.pool.Begin(c.ctx)
if err != nil {
return fmt.Errorf("error starting transaction for creating qrep metadata table: %w", err)
}
defer func() {
deferErr := createQRepMetadataTableTx.Rollback(c.ctx)
if deferErr != pgx.ErrTxClosed && deferErr != nil {
c.logger.Error("error rolling back transaction for creating qrep metadata table", slog.Any("error", err))
}
}()

err = c.createMetadataSchema(createQRepMetadataTableTx)
err := c.createMetadataSchema()
if err != nil {
return fmt.Errorf("error creating metadata schema: %w", err)
}
Expand All @@ -502,26 +491,21 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
syncFinishTime TIMESTAMP DEFAULT NOW()
)`, metadataTableIdentifier.Sanitize())
// execute create table query
_, err = createQRepMetadataTableTx.Exec(c.ctx, createQRepMetadataTableSQL)
if err != nil {
_, err = c.pool.Exec(c.ctx, createQRepMetadataTableSQL)
if err != nil && !utils.IsUniqueError(err) {
return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err)
}
c.logger.Info("Setup metadata table.")

if config.WriteMode != nil &&
config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = createQRepMetadataTableTx.Exec(c.ctx,
_, err = c.pool.Exec(c.ctx,
fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

err = createQRepMetadataTableTx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("error committing transaction for creating qrep metadata table: %w", err)
}

return nil
}

Expand Down

0 comments on commit ad9953f

Please sign in to comment.