Commit

Merge branch 'main' into dependabot/pip/dagster/dagster_peerdb/sqlparse-0.4.4
heavycrystal authored Sep 2, 2023
2 parents d21db6e + 3f48101 commit 74a3901
Showing 49 changed files with 3,973 additions and 3,048 deletions.
20 changes: 12 additions & 8 deletions .github/workflows/flow.yml
@@ -1,25 +1,26 @@
name: Flow build and test

on:
push:
branches: [main]
pull_request:
branches: [main]
push:
branches: [main]

jobs:
flow_test:
runs-on: ubuntu-latest
timeout-minutes: 30
services:
pg_cdc:
image: debezium/postgres:14-alpine
image: postgres:15.4-alpine
ports:
- 7132:5432
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
options: >-
--name pg_cdc
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
@@ -55,18 +56,21 @@ jobs:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: create hstore extension
- name: create hstore extension and increase logical replication limits
run: |
sudo apt-get update
sudo apt-get install -y postgresql-client
psql -h localhost -p 7132 -U postgres -c "CREATE EXTENSION hstore;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
docker restart pg_cdc
working-directory: ./flow
env:
PG_CDC:
PGPASSWORD: postgres

- name: run tests
run: |
gotestsum --format testname -- -p 1 ./... -timeout 1200s
gotestsum --format testname -- -p 4 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
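The rewritten step runs psql inside the pg_cdc container and raises the logical replication limits with ALTER SYSTEM. Because the job now uses the stock postgres:15.4-alpine image instead of debezium/postgres (which ships preconfigured for logical decoding), wal_level has to be set explicitly, and since wal_level, max_replication_slots, and max_wal_senders only take effect at server start, the step finishes with docker restart pg_cdc. The higher limits also line up with the test run moving from -p 1 to -p 4: parallel test packages each need their own replication slot. Below is a minimal sanity check, not part of this commit, that the settings survived the restart, assuming the jackc/pgx/v5 client:

```go
// Illustrative only, not part of this commit: connects to the CI Postgres
// exposed on localhost:7132 and verifies the ALTER SYSTEM settings are live
// after the container restart.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://postgres:postgres@localhost:7132/postgres")
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer conn.Close(ctx)

	for setting, want := range map[string]string{
		"wal_level":             "logical",
		"max_replication_slots": "100",
		"max_wal_senders":       "100",
	} {
		var got string
		// SHOW returns the current value of a run-time parameter.
		if err := conn.QueryRow(ctx, "SHOW "+setting).Scan(&got); err != nil {
			log.Fatalf("show %s: %v", setting, err)
		}
		if got != want {
			log.Fatalf("%s = %s, want %s", setting, got, want)
		}
		fmt.Printf("%s = %s\n", setting, got)
	}
}
```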
2 changes: 1 addition & 1 deletion .github/workflows/stable-debian.yml
@@ -8,7 +8,7 @@ on:
jobs:
release:
name: build and release
runs-on: ubuntu-latest-64-core
runs-on: ubuntu-latest
permissions:
contents: write
steps:
7 changes: 4 additions & 3 deletions docker-compose.yml
@@ -13,11 +13,11 @@ x-flow-worker-env: &flow-worker-env
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-""}
AWS_REGION: ${AWS_REGION:-""}
# enables worker profiling using Go's pprof
ENABLE_PROFILING: true
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: true
ENABLE_METRICS: "true"
# enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema.
ENABLE_STATS: true
ENABLE_STATS: "true"

services:
catalog:
@@ -83,6 +83,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
- TEMPORAL_CSRF_COOKIE_INSECURE=true
image: temporalio/ui:2.17.2
ports:
- 8085:8080
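Quoting the ENABLE_* values matters because a bare true in a YAML mapping parses as a boolean, which the Compose schema rejects for environment entries; quoting keeps the value a plain string without changing what the worker sees. A minimal sketch, assuming the flow worker reads these flags from the environment (the reading side is not shown in this diff):

```go
// Illustrative only: how a worker could read the ENABLE_* flags.
// strconv.ParseBool accepts "true", "1", "t" and friends, so the quoted
// "true" in docker-compose.yml parses exactly as the unquoted value would.
package main

import (
	"fmt"
	"os"
	"strconv"
)

func envFlag(name string) bool {
	enabled, err := strconv.ParseBool(os.Getenv(name))
	if err != nil {
		return false // unset or malformed values default to off
	}
	return enabled
}

func main() {
	fmt.Println("profiling enabled:", envFlag("ENABLE_PROFILING"))
}
```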
64 changes: 52 additions & 12 deletions flow/activities/flowable.go
@@ -176,13 +176,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}

log.Info("initializing table schema...")
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("initializing table schema...")
err = dest.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}

log.Info("pulling records...")
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("pulling records...")

startTime := time.Now()
records, err := src.PullRecords(&model.PullRecordsRequest{
@@ -201,7 +205,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
}
if a.CatalogMirrorMonitor.IsActive() && len(records.Records) > 0 {
syncBatchID, err := dest.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil {
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

@@ -220,11 +224,15 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo

// log the number of records
numRecords := len(records.Records)
log.Printf("pulled %d records", numRecords)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Printf("pulled %d records", numRecords)
activity.RecordHeartbeat(ctx, fmt.Sprintf("pulled %d records", numRecords))

if numRecords == 0 {
log.Info("no records to push")
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
return nil, nil
}

@@ -238,7 +246,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
log.Warnf("failed to push records: %v", err)
return nil, fmt.Errorf("failed to push records: %w", err)
}
log.Info("pushed records")
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pushed %d records", res.NumRecordsSynced)

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
@@ -363,8 +373,30 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}

// ReplicateQRepPartition replicates a QRepPartition from the source to the destination.
func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
config *protos.QRepConfig,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
numPartitions := len(partitions.Partitions)
log.Infof("replicating partitions for job - %s - batch %d - size: %d\n",
config.FlowJobName, partitions.BatchId, numPartitions)
for i, p := range partitions.Partitions {
log.Infof("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId)
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
return err
}
}

return nil
}

// ReplicateQRepPartition replicates a QRepPartition from the source to the destination.
func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
config *protos.QRepConfig,
idx int,
total int,
partition *protos.QRepPartition,
runUUID string,
) error {
@@ -402,7 +434,9 @@ func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords = int64(tmp)
if err != nil {
log.Errorf("failed to pull records: %v", err)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to pull records: %v", err)
goroutineErr = err
}
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
@@ -420,7 +454,9 @@ func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to pull records: %w", err)
}
numRecords = int64(recordBatch.NumRecords)
log.Printf("pulled %d records\n", len(recordBatch.Records))
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Printf("pulled %d records\n", len(recordBatch.Records))

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
if err != nil {
@@ -434,7 +470,7 @@ func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
}

shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return fmt.Sprintf("syncing partition - %s", partition.PartitionId)
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})

defer func() {
@@ -447,15 +483,19 @@ func (a *FlowableActivity) ReplicateQRepPartition(ctx context.Context,
}

if res == 0 {
log.Printf("no records to push for partition %s\n", partition.PartitionId)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Printf("no records to push for partition %s\n", partition.PartitionId)
return nil
}

wg.Wait()
if goroutineErr != nil {
return goroutineErr
}
log.Printf("pushed %d records\n", res)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Printf("pushed %d records\n", res)
err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
if err != nil {
return err
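Most of the churn in flowable.go is the same mechanical change: every log call now carries the flow job name via log.WithFields(log.Fields{"flowName": ...}), so lines from a given mirror can be filtered in aggregated logs. A hypothetical helper, not in this commit, showing that convention in one place, assuming the sirupsen/logrus package these call sites already import as log:

```go
// Hypothetical helper, not part of this commit: returns a logrus entry
// pre-tagged with the flow job name, matching the fields added throughout
// flowable.go.
package activities

import (
	log "github.com/sirupsen/logrus"
)

func flowLogger(flowJobName string) *log.Entry {
	return log.WithFields(log.Fields{"flowName": flowJobName})
}

// Equivalent to the expanded call sites in this diff:
//   flowLogger(input.FlowConnectionConfigs.FlowJobName).Info("pulling records...")
```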
12 changes: 9 additions & 3 deletions flow/activities/snapshot_activity.go
@@ -54,17 +54,23 @@ func (a *SnapshotActivity) SetupReplication(
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
log.Errorf("failed to setup replication: %v", err)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to setup replication: %v", err)
replicationErr <- err
return
}
}()

log.Info("waiting for slot to be created...")
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("waiting for slot to be created...")
var slotInfo *connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
log.Infof("slot '%s' created", slotInfo.SlotName)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("slot '%s' created", slotInfo.SlotName)
case err := <-replicationErr:
return nil, fmt.Errorf("failed to setup replication: %w", err)
}
65 changes: 30 additions & 35 deletions flow/concurrency/bound_selector.go
@@ -7,54 +7,49 @@ import (
)

type BoundSelector struct {
ctx workflow.Context
limit workflow.Channel
statusCh workflow.Channel
numFutures int
ctx workflow.Context
limit int
futures []workflow.Future
ferrors []error
}

func NewBoundSelector(limit int, total int, ctx workflow.Context) *BoundSelector {
return &BoundSelector{
ctx: ctx,
limit: workflow.NewBufferedChannel(ctx, limit),
statusCh: workflow.NewBufferedChannel(ctx, total),
numFutures: 0,
ctx: ctx,
limit: limit,
}
}

func (s *BoundSelector) SpawnChild(chCtx workflow.Context, w interface{}, args ...interface{}) {
s.numFutures++
workflow.Go(s.ctx, func(ctx workflow.Context) {
s.limit.Send(ctx, struct{}{})
future := workflow.ExecuteChildWorkflow(chCtx, w, args...)
err := future.Get(ctx, nil)
s.statusCh.Send(ctx, err)
s.limit.Receive(ctx, nil)
})
if len(s.futures) >= s.limit {
s.waitOne()
}

future := workflow.ExecuteChildWorkflow(chCtx, w, args...)
s.futures = append(s.futures, future)
}

func (s *BoundSelector) waitOne() {
if len(s.futures) == 0 {
return
}

f := s.futures[0]
s.futures = s.futures[1:]

err := f.Get(s.ctx, nil)
if err != nil {
s.ferrors = append(s.ferrors, err)
}
}

func (s *BoundSelector) Wait() error {
defer s.statusCh.Close()
defer s.limit.Close()

ferrors := make([]error, 0)
doneCount := 0

for doneCount < s.numFutures {
selector := workflow.NewSelector(s.ctx)
selector.AddReceive(s.statusCh, func(c workflow.ReceiveChannel, more bool) {
var err error
c.Receive(s.ctx, &err)
if err != nil {
ferrors = append(ferrors, err)
}
doneCount++
})
selector.Select(s.ctx)
for len(s.futures) > 0 {
s.waitOne()
}

if len(ferrors) > 0 {
return errors.Join(ferrors...)
if len(s.ferrors) > 0 {
return errors.Join(s.ferrors...)
}

return nil
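The reworked BoundSelector drops the buffered limit/status channels in favor of a plain slice of futures: SpawnChild blocks in waitOne once limit futures are outstanding, and Wait drains whatever remains, joining any child errors with errors.Join. A sketch of a caller follows; the child workflow, import paths, and empty child options are assumptions for illustration, not code from this diff:

```go
// Hypothetical driver for the reworked BoundSelector; processPartitionBatch,
// the import paths, and the empty child options are illustrative assumptions.
package peerflow

import (
	"github.com/PeerDB-io/peer-flow/concurrency"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"go.temporal.io/sdk/workflow"
)

// processPartitionBatch stands in for the real child workflow in this sketch.
func processPartitionBatch(ctx workflow.Context, batch *protos.QRepPartitionBatch) error {
	return nil
}

func syncPartitionBatches(ctx workflow.Context, batches []*protos.QRepPartitionBatch) error {
	// At most 8 child workflows run at once: once the limit is reached,
	// SpawnChild waits on the oldest outstanding future before submitting more.
	sel := concurrency.NewBoundSelector(8, len(batches), ctx)

	for _, batch := range batches {
		childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{})
		sel.SpawnChild(childCtx, processPartitionBatch, batch)
	}

	// Wait drains the remaining futures and joins any child-workflow errors.
	return sel.Wait()
}
```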
10 changes: 9 additions & 1 deletion flow/connectors/bigquery/qrep.go
@@ -44,9 +44,17 @@ func (c *BigQueryConnector) SyncQRepRecords(
}

if done {
log.Infof("Partition %s has already been synced", partition.PartitionId)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"partitionID": partition.PartitionId,
}).Infof("Partition %s has already been synced", partition.PartitionId)
return 0, nil
}
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("QRep sync function called and partition existence checked for"+
" partition %s of destination table %s",
partition.PartitionId, destTable)

syncMode := config.SyncMode
switch syncMode {