diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index e4edd44160..c1ac28252c 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -86,6 +86,7 @@ jobs:
           docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
           docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
           docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
+          docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;"
           docker restart pg_cdc
         working-directory: ./flow
         env:
@@ -94,7 +95,7 @@ jobs:

       - name: run tests
         run: |
-          gotestsum --format testname -- -p 16 ./... -timeout 2400s
+          gotestsum --format testname -- -p 32 ./... -timeout 2400s
         working-directory: ./flow
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
@@ -114,3 +115,8 @@ jobs:
           SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }}
           SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
           SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
+          PEERDB_CATALOG_HOST: localhost
+          PEERDB_CATALOG_PORT: 7132
+          PEERDB_CATALOG_USER: postgres
+          PEERDB_CATALOG_PASSWORD: postgres
+          PEERDB_CATALOG_DATABASE: postgres
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 82e16282fc..7e9089ee95 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -13,10 +13,12 @@ import (
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/storage"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
+	cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	"github.com/google/uuid"
+	"github.com/jackc/pgx/v5/pgxpool"
 	log "github.com/sirupsen/logrus"
 	"go.temporal.io/sdk/activity"
 	"google.golang.org/api/iterator"
@@ -64,6 +66,7 @@ type BigQueryConnector struct {
 	storageClient          *storage.Client
 	tableNameSchemaMapping map[string]*protos.TableSchema
 	datasetID              string
+	catalogPool            *pgxpool.Pool
 }

 type StagingBQRecord struct {
@@ -177,12 +180,18 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
 		return nil, fmt.Errorf("failed to create Storage client: %v", err)
 	}

+	catalogPool, err := cc.GetCatalogConnectionPoolFromEnv()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create catalog connection pool: %v", err)
+	}
+
 	return &BigQueryConnector{
 		ctx:           ctx,
 		bqConfig:      config,
 		client:        client,
 		datasetID:     datasetID,
 		storageClient: storageClient,
+		catalogPool:   catalogPool,
 	}, nil
 }

@@ -614,6 +623,18 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
 		return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
 	}

+	release, err := c.grabJobsUpdateLock()
+	if err != nil {
+		return nil, fmt.Errorf("failed to grab jobs update lock: %v", err)
+	}
+
+	defer func() {
+		err := release()
+		if err != nil {
+			log.Errorf("failed to release jobs update lock: %v", err)
+		}
+	}()
+
 	// we have to do the following things in a transaction
 	// 1. append the records in the staging table to the raw table.
 	// 2. execute the update metadata query to store the last committed watermark.
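The new `PEERDB_CATALOG_*` variables above feed `cc.GetCatalogConnectionPoolFromEnv()`, which the connector now calls on construction. As a rough illustration only (the real helper lives in `connectors/utils/catalog` and may differ), such a helper could assemble a pgx connection string directly from those five variables:

```go
package catalog

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
)

// GetCatalogConnectionPoolFromEnv builds a pgxpool.Pool for the PeerDB
// catalog from the PEERDB_CATALOG_* environment variables the CI workflow
// above exports. Sketch only: the real helper may add validation, TLS
// options, or pool-size tuning.
func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
	connStr := fmt.Sprintf(
		"postgres://%s:%s@%s:%s/%s",
		os.Getenv("PEERDB_CATALOG_USER"),
		os.Getenv("PEERDB_CATALOG_PASSWORD"),
		os.Getenv("PEERDB_CATALOG_HOST"),
		os.Getenv("PEERDB_CATALOG_PORT"),
		os.Getenv("PEERDB_CATALOG_DATABASE"),
	)

	pool, err := pgxpool.New(context.Background(), connStr)
	if err != nil {
		return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
	}
	return pool, nil
}
```

With the workflow values above this would resolve to `postgres://postgres:postgres@localhost:7132/postgres`, i.e. the catalog Postgres the CI tests point at.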
@@ -916,6 +937,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 	// append all the statements to one list
 	log.Printf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)

+	release, err := c.grabJobsUpdateLock()
+	if err != nil {
+		return nil, fmt.Errorf("failed to grab lock: %v", err)
+	}
+
+	defer func() {
+		err := release()
+		if err != nil {
+			log.Errorf("failed to release lock: %v", err)
+		}
+	}()
+
 	stmts = append(stmts, "BEGIN TRANSACTION;")

 	for _, tableName := range distinctTableNames {
@@ -1134,9 +1167,21 @@ func (c *BigQueryConnector) SetupNormalizedTables(
 }

 func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
+	release, err := c.grabJobsUpdateLock()
+	if err != nil {
+		return fmt.Errorf("failed to grab lock: %w", err)
+	}
+
+	defer func() {
+		err := release()
+		if err != nil {
+			log.Printf("failed to release lock: %v", err)
+		}
+	}()
+
 	dataset := c.client.Dataset(c.datasetID)
 	// deleting PeerDB specific tables
-	err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
+	err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
 	if err != nil {
 		return fmt.Errorf("failed to delete raw table: %w", err)
 	}
@@ -1185,11 +1230,44 @@ func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
 	return nil
 }

+// BigQuery doesn't allow concurrent updates to the same table.
+// We grab an advisory lock on the catalog to ensure that only one job is
+// updating BigQuery tables at a time.
+// Returns a function to release the lock.
+func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
+	tx, err := c.catalogPool.Begin(c.ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to begin transaction: %w", err)
+	}
+
+	// grab an advisory lock based on the mirror jobs table hash
+	mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
+	_, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl)

+	if err != nil {
+		// best-effort rollback; surface the original locking error, not the rollback's
+		_ = tx.Rollback(c.ctx)
+		return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
+	}
+
+	return func() error {
+		// release the lock
+		_, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl)
+		if err != nil {
+			return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err)
+		}
+
+		err = tx.Commit(c.ctx)
+		if err != nil {
+			return fmt.Errorf("failed to commit transaction: %w", err)
+		}
+		return nil
+	}, nil
+}
+
 func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
 	for _, renameRequest := range req.RenameTableOptions {
 		src := renameRequest.CurrentName
 		dst := renameRequest.NewName
-		log.WithFields(log.Fields{
-			"flowName": req.FlowJobName,
-		}).Infof("renaming table '%s' to '%s'...", src, dst)
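For reviewers who want to poke at the locking behavior outside the connector, below is a self-contained sketch of the same advisory-lock pattern `grabJobsUpdateLock` uses. The `runExclusively` helper, the lock key string, and the connection string are illustrative, not part of this PR; the property it demonstrates is that `pg_advisory_lock(hashtext($1))` blocks any second session using the same key until the first one releases it.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

// runExclusively is an illustrative helper, not part of the PR. It takes a
// session-level advisory lock keyed on `key`, runs fn while holding it, and
// then unlocks. hashtext() maps the key to a stable integer, so any two
// callers passing the same string contend on the same lock.
func runExclusively(ctx context.Context, pool *pgxpool.Pool, key string, fn func() error) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}

	// blocks until no other session holds the lock for this key
	if _, err := tx.Exec(ctx, "SELECT pg_advisory_lock(hashtext($1))", key); err != nil {
		_ = tx.Rollback(ctx)
		return fmt.Errorf("lock %s: %w", key, err)
	}

	if fnErr := fn(); fnErr != nil {
		// advisory locks are session-scoped, so unlock explicitly before bailing out
		_, _ = tx.Exec(ctx, "SELECT pg_advisory_unlock(hashtext($1))", key)
		_ = tx.Rollback(ctx)
		return fnErr
	}

	if _, err := tx.Exec(ctx, "SELECT pg_advisory_unlock(hashtext($1))", key); err != nil {
		return fmt.Errorf("unlock %s: %w", key, err)
	}
	return tx.Commit(ctx)
}

func main() {
	// matches the PEERDB_CATALOG_* values the CI workflow sets (assumed local catalog)
	pool, err := pgxpool.New(context.Background(), "postgres://postgres:postgres@localhost:7132/postgres")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// "dataset.mirror_jobs" is a placeholder key; the connector derives its
	// key from the dataset ID and MirrorJobsTable
	err = runExclusively(context.Background(), pool, "dataset.mirror_jobs", func() error {
		fmt.Println("only one job updates BigQuery tables at a time")
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

Note that Postgres advisory locks taken with `pg_advisory_lock` are held by the session, not the transaction, which is why both the PR and this sketch call `pg_advisory_unlock` explicitly before committing rather than relying on the transaction ending.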