Skip to content

Commit

Permalink
BQ merge: don't be transactional
Browse files Browse the repository at this point in the history
Discussed with Kevin BQ lock contention. He made #889 to remove temp table

Merge is idempotent, so no need to have transaction,
which removes need to have advisory lock on catalog
  • Loading branch information
serprex committed Dec 24, 2023
1 parent b160753 commit 5e9ae02
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 71 deletions.
63 changes: 5 additions & 58 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

stmts := []string{}
stmts := make([]string, 0, len(distinctTableNames)+1)
// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab lock: %v", err)
}

defer func() {
err := release()
if err != nil {
c.logger.Error("failed to release lock", slog.Any("error", err))
}
}()

stmts = append(stmts, "BEGIN TRANSACTION;")

for _, tableName := range distinctTableNames {
mergeGen := &mergeStmtGenerator{
Dataset: c.datasetID,
Expand All @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")

_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
query := strings.Join(stmts, "\n")
_, err = c.client.Query(query).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err)
return nil, fmt.Errorf("failed to execute statements %s: %v", query, err)
}

return &model.NormalizeResponse{
Expand Down Expand Up @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
release, err := c.grabJobsUpdateLock()
if err != nil {
return fmt.Errorf("failed to grab lock: %w", err)
}

defer func() {
err := release()
if err != nil {
c.logger.Error("failed to release lock", slog.Any("error", err))
}
}()

dataset := c.client.Dataset(c.datasetID)
// deleting PeerDB specific tables
err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}
Expand Down Expand Up @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
return fmt.Sprintf("_peerdb_staging_%s", flowJobName)
}

// Bigquery doesn't allow concurrent updates to the same table.
// we grab a lock on catalog to ensure that only one job is updating
// bigquery tables at a time.
// returns a function to release the lock.
func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
tx, err := c.catalogPool.Begin(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}

// grab an advisory lock based on the mirror jobs table hash
mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
_, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl)
if err != nil {
err = tx.Rollback(c.ctx)
return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
}

return func() error {
err = tx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}, nil
}

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
Expand Down
26 changes: 13 additions & 13 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)

// execute the statements in a transaction
stmts := []string{}
stmts = append(stmts, "BEGIN TRANSACTION;")
stmts = append(stmts, insertStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
stmts := []string{
"BEGIN TRANSACTION;",
insertStmt,
updateMetadataStmt,
"COMMIT TRANSACTION;",
}
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
Expand Down Expand Up @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client
datasetID := s.connector.datasetID
// Start a transaction
stmts := []string{"BEGIN TRANSACTION;"}

selector := "*"
if softDeleteCol != "" { // PeerDB column
Expand All @@ -150,16 +148,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
datasetID, dstTableName, selector, datasetID, stagingTable)

stmts = append(stmts, insertStmt)

insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime)
if err != nil {
return -1, fmt.Errorf("failed to create metadata insert statement: %v", err)
}
slog.Info("Performing transaction inside QRep sync function", flowLog)
stmts = append(stmts, insertMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
// Execute the statements in a transaction

stmts := []string{
"BEGIN TRANSACTION;",
insertStmt,
insertMetadataStmt,
"COMMIT TRANSACTION;",
}
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
Expand Down

0 comments on commit 5e9ae02

Please sign in to comment.