diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3c0787527e..0a220ef424 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - stmts := []string{} + stmts := make([]string, 0, len(distinctTableNames)+1) // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) - release, err := c.grabJobsUpdateLock() - if err != nil { - return nil, fmt.Errorf("failed to grab lock: %v", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - - stmts = append(stmts, "BEGIN TRANSACTION;") - for _, tableName := range distinctTableNames { mergeGen := &mergeStmtGenerator{ Dataset: c.datasetID, @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) + query := strings.Join(stmts, "\n") + _, err = c.client.Query(query).Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err) + return nil, fmt.Errorf("failed to execute statements %s: %v", query, err) } return &model.NormalizeResponse{ @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { - release, err := c.grabJobsUpdateLock() - if err != nil { - return fmt.Errorf("failed to grab lock: %w", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_staging_%s", flowJobName) } -// Bigquery doesn't allow concurrent updates to the same table. -// we grab a lock on catalog to ensure that only one job is updating -// bigquery tables at a time. -// returns a function to release the lock. -func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { - tx, err := c.catalogPool.Begin(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - - // grab an advisory lock based on the mirror jobs table hash - mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) - if err != nil { - err = tx.Rollback(c.ctx) - return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) - } - - return func() error { - err = tx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil - }, nil -} - func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d52e7c42e3..7ed87b0c06 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) - // execute the statements in a transaction - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - stmts = append(stmts, insertStmt) - stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + updateMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client datasetID := s.connector.datasetID - // Start a transaction - stmts := []string{"BEGIN TRANSACTION;"} selector := "*" if softDeleteCol != "" { // PeerDB column @@ -150,16 +148,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", datasetID, dstTableName, selector, datasetID, stagingTable) - stmts = append(stmts, insertStmt) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) if err != nil { return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) } slog.Info("Performing transaction inside QRep sync function", flowLog) - stmts = append(stmts, insertMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - // Execute the statements in a transaction + + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + insertMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)