diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 008b462ae4..b26aacf637 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -97,22 +97,15 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { } func (p *PostgresMetadataStore) SetupMetadata() error { - // start a transaction - tx, err := p.pool.Begin(p.ctx) - if err != nil { - p.logger.Error("failed to start transaction", slog.Any("error", err)) - return err - } - // create the schema - _, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) + _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create schema", slog.Any("error", err)) return err } // create the last sync state table - _, err = tx.Exec(p.ctx, ` + _, err = p.pool.Exec(p.ctx, ` CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` ( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, @@ -126,14 +119,6 @@ func (p *PostgresMetadataStore) SetupMetadata() error { } p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)) - - // commit the transaction - err = tx.Commit(p.ctx) - if err != nil { - p.logger.Error("failed to commit transaction", slog.Any("error", err)) - return err - } - return nil } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 52eb0d77d1..f24802aff3 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -372,8 +372,8 @@ func (c *PostgresConnector) createSlotAndPublication( return nil } -func (c *PostgresConnector) createMetadataSchema(createSchemaTx pgx.Tx) error { - _, err := createSchemaTx.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) +func (c *PostgresConnector) createMetadataSchema() error { + _, err := c.pool.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) if err != nil && !utils.IsUniqueError(err) { return fmt.Errorf("error while creating internal schema: %w", err) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 127d30d491..4f8064b0d0 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -141,31 +141,17 @@ func (c *PostgresConnector) NeedsSetupMetadataTables() bool { // SetupMetadataTables sets up the metadata tables. func (c *PostgresConnector) SetupMetadataTables() error { - createMetadataTablesTx, err := c.pool.Begin(c.ctx) - if err != nil { - return fmt.Errorf("error starting transaction for creating metadata tables: %w", err) - } - defer func() { - deferErr := createMetadataTablesTx.Rollback(c.ctx) - if deferErr != pgx.ErrTxClosed && deferErr != nil { - c.logger.Error("error rolling back transaction for creating metadata tables", slog.Any("error", err)) - } - }() - - err = c.createMetadataSchema(createMetadataTablesTx) + err := c.createMetadataSchema() if err != nil { return err } - _, err = createMetadataTablesTx.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL, + + _, err = c.pool.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL, c.metadataSchema, mirrorJobsTableIdentifier)) if err != nil && !utils.IsUniqueError(err) { return fmt.Errorf("error creating table %s: %w", mirrorJobsTableIdentifier, err) } - err = createMetadataTablesTx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("error committing transaction for creating metadata tables: %w", err) - } return nil } @@ -507,6 +493,11 @@ type SlotCheckResult struct { func (c *PostgresConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) + err := c.createMetadataSchema() + if err != nil { + return nil, fmt.Errorf("error creating internal schema: %w", err) + } + createRawTableTx, err := c.pool.Begin(c.ctx) if err != nil { return nil, fmt.Errorf("error starting transaction for creating raw table: %w", err) @@ -518,10 +509,6 @@ func (c *PostgresConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr } }() - err = c.createMetadataSchema(createRawTableTx) - if err != nil { - return nil, fmt.Errorf("error creating internal schema: %w", err) - } _, err = createRawTableTx.Exec(c.ctx, fmt.Sprintf(createRawTableSQL, c.metadataSchema, rawTableIdentifier)) if err != nil { return nil, fmt.Errorf("error creating raw table: %w", err) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index ce114b702b..2e45647280 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -477,18 +477,7 @@ func (c *PostgresConnector) SyncQRepRecords( // SetupQRepMetadataTables function for postgres connector func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { - createQRepMetadataTableTx, err := c.pool.Begin(c.ctx) - if err != nil { - return fmt.Errorf("error starting transaction for creating qrep metadata table: %w", err) - } - defer func() { - deferErr := createQRepMetadataTableTx.Rollback(c.ctx) - if deferErr != pgx.ErrTxClosed && deferErr != nil { - c.logger.Error("error rolling back transaction for creating qrep metadata table", slog.Any("error", err)) - } - }() - - err = c.createMetadataSchema(createQRepMetadataTableTx) + err := c.createMetadataSchema() if err != nil { return fmt.Errorf("error creating metadata schema: %w", err) } @@ -502,26 +491,21 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e syncFinishTime TIMESTAMP DEFAULT NOW() )`, metadataTableIdentifier.Sanitize()) // execute create table query - _, err = createQRepMetadataTableTx.Exec(c.ctx, createQRepMetadataTableSQL) - if err != nil { + _, err = c.pool.Exec(c.ctx, createQRepMetadataTableSQL) + if err != nil && !utils.IsUniqueError(err) { return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err) } c.logger.Info("Setup metadata table.") if config.WriteMode != nil && config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { - _, err = createQRepMetadataTableTx.Exec(c.ctx, + _, err = c.pool.Exec(c.ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) if err != nil { return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) } } - err = createQRepMetadataTableTx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("error committing transaction for creating qrep metadata table: %w", err) - } - return nil }