make some of our snowflake and bigquery tests run in parallel (#640)
iskakaushik authored Nov 13, 2023
1 parent 173571e commit fdf7bbc
Showing 9 changed files with 563 additions and 333 deletions.
17 changes: 12 additions & 5 deletions .github/workflows/flow.yml
@@ -41,12 +41,12 @@ jobs:
- name: install gotestsum
run: |
go install gotest.tools/gotestsum@latest
- name: install lib-geos
run: |
sudo apt-get update
sudo apt-get install libgeos-dev
- name: download go modules
run: |
go mod download
@@ -65,14 +65,14 @@ jobs:
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: setup S3 credentials
id: s3-credentials
uses: jsdaniell/[email protected]
with:
name: "s3_creds.json"
json: ${{ secrets.S3_CREDS }}

- name: setup GCS credentials
id: gcs-credentials
uses: jsdaniell/[email protected]
@@ -86,6 +86,7 @@ jobs:
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;"
docker restart pg_cdc
working-directory: ./flow
env:
@@ -94,7 +95,7 @@ jobs:

- name: run tests
run: |
- gotestsum --format testname -- -p 4 ./... -timeout 2400s
+ gotestsum --format testname -- -p 32 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
@@ -114,3 +115,9 @@ jobs:
SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }}
SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }}
SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }}
PEERDB_CATALOG_HOST: localhost
PEERDB_CATALOG_PORT: 7132
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3
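
The new PEERDB_CATALOG_* variables point the test run at the catalog Postgres instance (exposed on port 7132) that the BigQuery connector changes below rely on for advisory locking. As a minimal sketch of what a helper like cc.GetCatalogConnectionPoolFromEnv might look like, assuming it simply assembles a DSN from these variables (the real implementation lives in connectors/utils/catalog and is not shown in this diff):

package catalog // hypothetical sketch of connectors/utils/catalog

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
)

// GetCatalogConnectionPoolFromEnv builds a pgx pool from the
// PEERDB_CATALOG_* environment variables set in the workflow above.
func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
	dsn := fmt.Sprintf("postgres://%s:%s@%s:%s/%s",
		os.Getenv("PEERDB_CATALOG_USER"),
		os.Getenv("PEERDB_CATALOG_PASSWORD"),
		os.Getenv("PEERDB_CATALOG_HOST"),
		os.Getenv("PEERDB_CATALOG_PORT"),
		os.Getenv("PEERDB_CATALOG_DATABASE"))
	pool, err := pgxpool.New(context.Background(), dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
	}
	return pool, nil
}

Together with the jump from -p 4 to -p 32 above, up to 32 test packages now run concurrently, which is why the connector changes below serialize their metadata updates through this catalog.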
89 changes: 86 additions & 3 deletions flow/connectors/bigquery/bigquery.go
@@ -13,10 +13,12 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"google.golang.org/api/iterator"
@@ -64,6 +66,7 @@ type BigQueryConnector struct {
storageClient *storage.Client
tableNameSchemaMapping map[string]*protos.TableSchema
datasetID string
catalogPool *pgxpool.Pool
}

type StagingBQRecord struct {
@@ -177,12 +180,18 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
return nil, fmt.Errorf("failed to create Storage client: %v", err)
}

catalogPool, err := cc.GetCatalogConnectionPoolFromEnv()
if err != nil {
return nil, fmt.Errorf("failed to create catalog connection pool: %v", err)
}

return &BigQueryConnector{
ctx: ctx,
bqConfig: config,
client: client,
datasetID: datasetID,
storageClient: storageClient,
catalogPool: catalogPool,
}, nil
}

@@ -261,7 +270,12 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
},
}
if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil {
- return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err)
+ // if the table already exists, ignore the error
+ if !strings.Contains(err.Error(), "Already Exists") {
+ 	return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err)
+ } else {
+ 	log.Infof("table %s already exists", MirrorJobsTable)
+ }
}

return nil
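
Matching on the "Already Exists" message string works, but it is brittle across API changes. A sketch of a sturdier check, assuming the BigQuery client surfaces google.golang.org/api/googleapi errors (isAlreadyExists is a hypothetical helper, not part of this commit), would inspect the HTTP status code instead:

package bigquery // hypothetical placement alongside the connector

import (
	"errors"

	"google.golang.org/api/googleapi"
)

// isAlreadyExists reports whether err is BigQuery's "already exists"
// (HTTP 409 Conflict) error, avoiding the message-string match above.
func isAlreadyExists(err error) bool {
	var apiErr *googleapi.Error
	return errors.As(err, &apiErr) && apiErr.Code == 409
}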
@@ -609,6 +623,18 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab jobs update lock: %v", err)
}

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release jobs update lock: %v", err)
}
}()

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
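
To make the two-step transaction described in the comment above concrete, here is a simplified sketch of the multi-statement script the sync path assembles (table names, columns, and values are illustrative only, not the connector's real identifiers):

package main

import (
	"fmt"
	"strings"
)

func main() {
	stmts := []string{
		"BEGIN TRANSACTION;",
		// 1. append the staged records to the raw table
		"INSERT INTO `dataset.raw_table` SELECT * FROM `dataset.staging_table` WHERE _peerdb_batch_id = 42;",
		// 2. store the last committed watermark for this mirror job
		"UPDATE `dataset.peerdb_mirror_jobs` SET `offset` = 1234 WHERE mirror_job_name = 'my_job';",
		"COMMIT TRANSACTION;",
	}
	// the real connector sends the joined script to BigQuery as one query
	fmt.Println(strings.Join(stmts, "\n"))
}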
@@ -911,6 +937,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
// append all the statements to one list
log.Printf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab lock: %v", err)
}

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release lock: %v", err)
}
}()

stmts = append(stmts, "BEGIN TRANSACTION;")

for _, tableName := range distinctTableNames {
@@ -1129,9 +1167,21 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
release, err := c.grabJobsUpdateLock()
if err != nil {
return fmt.Errorf("failed to grab lock: %w", err)
}

defer func() {
err := release()
if err != nil {
log.Printf("failed to release lock: %v", err)
}
}()

dataset := c.client.Dataset(c.datasetID)
// deleting PeerDB-specific tables
- err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
+ err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}
@@ -1180,11 +1230,44 @@ func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
return nil
}

// grabJobsUpdateLock grabs an advisory lock on the catalog to ensure that
// only one job is updating BigQuery tables at a time, since BigQuery
// doesn't allow concurrent updates to the same table. It returns a
// function that releases the lock.
func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
tx, err := c.catalogPool.Begin(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}

// grab an advisory lock based on the mirror jobs table hash
mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
_, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl)

if err != nil {
	if rollbackErr := tx.Rollback(c.ctx); rollbackErr != nil {
		log.Errorf("failed to rollback transaction: %v", rollbackErr)
	}
	return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
}

return func() error {
// release the lock
_, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl)
if err != nil {
return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err)
}

err = tx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}, nil
}
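
One design note on grabJobsUpdateLock: because pg_advisory_lock takes a session-level lock inside an open transaction, the returned closure has to unlock and then commit explicitly. A transaction-scoped variant, sketched below as a hypothetical alternative (not what this commit does), releases the lock automatically when the transaction ends:

// grabJobsUpdateXactLock is a hypothetical variant using a
// transaction-scoped advisory lock: pg_advisory_xact_lock is released
// automatically when the transaction commits or rolls back, so the
// release function only needs to commit.
func (c *BigQueryConnector) grabJobsUpdateXactLock() (func() error, error) {
	tx, err := c.catalogPool.Begin(c.ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}

	mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
	if _, err := tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl); err != nil {
		_ = tx.Rollback(c.ctx)
		return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
	}

	// committing ends the transaction and releases the advisory lock
	return func() error { return tx.Commit(c.ctx) }, nil
}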

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
dst := renameRequest.NewName

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("renaming table '%s' to '%s'...", src, dst)