diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index b4af109b66..a1ef2bacef 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -19,6 +19,7 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" @@ -788,7 +789,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) - release, err := c.grabJobsUpdateLock() + release, err := c.grabJobsUpdateLock(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to grab lock: %v", err) } @@ -1029,7 +1030,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { - release, err := c.grabJobsUpdateLock() + release, err := c.grabJobsUpdateLock(jobName) if err != nil { return fmt.Errorf("failed to grab lock: %w", err) } @@ -1079,7 +1080,9 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { // we grab a lock on catalog to ensure that only one job is updating // bigquery tables at a time. // returns a function to release the lock. -func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { +func (c *BigQueryConnector) grabJobsUpdateLock(flowJobName string) (func() error, error) { + c.logger.InfoContext(c.ctx, + fmt.Sprintf("[%s] trying to acquire BigQuery metadata table update lock", flowJobName)) tx, err := c.catalogPool.Begin(c.ctx) if err != nil { return nil, fmt.Errorf("failed to begin transaction: %w", err) @@ -1087,10 +1090,20 @@ func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { // grab an advisory lock based on the mirror jobs table hash mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) - if err != nil { - err = tx.Rollback(c.ctx) - return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) + var lockStatus pgtype.Bool + for !lockStatus.Bool { + row := tx.QueryRow(c.ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1))", mjTbl) + + err = row.Scan(&lockStatus) + if err != nil { + err = tx.Rollback(c.ctx) + return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) + } + + c.logger.InfoContext(c.ctx, + fmt.Sprintf(`[%s] failed to acquire BigQuery metadata table update lock, + trying again in 5 seconds.`, flowJobName)) + time.Sleep(5 * time.Second) } return func() error {