Skip to content

Commit

Permalink
use advisory locks to prevent concurrent updates on the mirror jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 11, 2023
1 parent 54bc55b commit 8fbf1d2
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jobs:
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;"
docker restart pg_cdc
working-directory: ./flow
env:
Expand All @@ -94,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 16 ./... -timeout 2400s
gotestsum --format testname -- -p 32 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand All @@ -114,3 +115,8 @@ jobs:
SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }}
SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
PEERDB_CATALOG_HOST: localhost
PEERDB_CATALOG_PORT: 7132
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
82 changes: 80 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -64,6 +66,7 @@ type BigQueryConnector struct {
storageClient *storage.Client
tableNameSchemaMapping map[string]*protos.TableSchema
datasetID string
catalogPool *pgxpool.Pool
}

type StagingBQRecord struct {
Expand Down Expand Up @@ -177,12 +180,18 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
return nil, fmt.Errorf("failed to create Storage client: %v", err)
}

catalogPool, err := cc.GetCatalogConnectionPoolFromEnv()
if err != nil {
return nil, fmt.Errorf("failed to create catalog connection pool: %v", err)
}

return &BigQueryConnector{
ctx: ctx,
bqConfig: config,
client: client,
datasetID: datasetID,
storageClient: storageClient,
catalogPool: catalogPool,
}, nil
}

Expand Down Expand Up @@ -614,6 +623,18 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab jobs update lock: %v", err)
}

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release jobs update lock: %v", err)
}
}()

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
Expand Down Expand Up @@ -916,6 +937,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
// append all the statements to one list
log.Printf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab lock: %v", err)
}

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release lock: %v", err)
}
}()

stmts = append(stmts, "BEGIN TRANSACTION;")

for _, tableName := range distinctTableNames {
Expand Down Expand Up @@ -1134,9 +1167,21 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
release, err := c.grabJobsUpdateLock()
if err != nil {
return fmt.Errorf("failed to grab lock: %w", err)
}

defer func() {
err := release()
if err != nil {
log.Printf("failed to release lock: %v", err)
}
}()

dataset := c.client.Dataset(c.datasetID)
// deleting PeerDB specific tables
err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}
Expand Down Expand Up @@ -1185,11 +1230,44 @@ func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
return nil
}

// grabJobsUpdateLock serializes updates to this connector's BigQuery tables.
//
// BigQuery doesn't allow concurrent updates to the same table, so before
// touching the tables we take a Postgres advisory lock on the catalog. The
// lock key is hashtext("<datasetID>.<MirrorJobsTable>"), so every job working
// against the same dataset contends for the same lock. pg_advisory_lock waits
// until the lock can be granted.
//
// The lock is taken inside a transaction started on the catalog pool; that
// transaction stays open for as long as the lock is held.
//
// Returns a release function that unlocks and then commits the transaction.
// The caller must invoke it exactly once when a non-nil function is returned.
// On error no lock is held and the returned function is nil.
func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
	tx, err := c.catalogPool.Begin(c.ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}

	// grab an advisory lock based on the mirror jobs table hash
	mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
	if _, err := tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl); err != nil {
		// Keep the lock error as the reported cause; a rollback failure is
		// secondary and must not replace it.
		if rbErr := tx.Rollback(c.ctx); rbErr != nil {
			log.Errorf("failed to rollback transaction after failing to grab lock on %s: %v", mjTbl, rbErr)
		}
		return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
	}

	return func() error {
		// release the lock
		if _, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl); err != nil {
			// End the transaction even though the unlock failed, so it is not
			// left open with its connection still checked out of the pool.
			if rbErr := tx.Rollback(c.ctx); rbErr != nil {
				log.Errorf("failed to rollback transaction after failing to release lock on %s: %v", mjTbl, rbErr)
			}
			return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err)
		}

		if err := tx.Commit(c.ctx); err != nil {
			return fmt.Errorf("failed to commit transaction: %w", err)
		}
		return nil
	}, nil
}

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
dst := renameRequest.NewName

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("renaming table '%s' to '%s'...", src, dst)
Expand Down

0 comments on commit 8fbf1d2

Please sign in to comment.