Skip to content

Commit

Permalink
trying to see lock contention
Browse files (browse the repository at this point in the history)
  • Loading branch information
heavycrystal committed Dec 22, 2023
1 parent 64906a2 commit 49ef204
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

"go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -788,7 +789,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))

release, err := c.grabJobsUpdateLock()
release, err := c.grabJobsUpdateLock(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to grab lock: %v", err)
}
Expand Down Expand Up @@ -1029,7 +1030,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
release, err := c.grabJobsUpdateLock()
release, err := c.grabJobsUpdateLock(jobName)
if err != nil {
return fmt.Errorf("failed to grab lock: %w", err)
}
Expand Down Expand Up @@ -1079,18 +1080,30 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
// We grab a lock on the catalog to ensure that only one job is updating
// BigQuery tables at a time.
// Returns a function that releases the lock.
func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
func (c *BigQueryConnector) grabJobsUpdateLock(flowJobName string) (func() error, error) {
c.logger.InfoContext(c.ctx,
fmt.Sprintf("[%s] trying to acquire BigQuery metadata table update lock", flowJobName))
tx, err := c.catalogPool.Begin(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}

// grab an advisory lock based on the mirror jobs table hash
mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
_, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl)
if err != nil {
err = tx.Rollback(c.ctx)
return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
var lockStatus pgtype.Bool
for !lockStatus.Bool {
row := tx.QueryRow(c.ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1))", mjTbl)

err = row.Scan(&lockStatus)
if err != nil {
err = tx.Rollback(c.ctx)
return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
}

c.logger.InfoContext(c.ctx,
fmt.Sprintf(`[%s] failed to acquire BigQuery metadata table update lock,
trying again in 5 seconds.`, flowJobName))
time.Sleep(5 * time.Second)
}

return func() error {
Expand Down

0 comments on commit 49ef204

Please sign in to comment.