Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into cdc-parallel-sync-nor…
Browse files Browse the repository at this point in the history
…malize
  • Loading branch information
serprex committed Dec 12, 2023
2 parents bf2effe + bb49389 commit a1e8d05
Show file tree
Hide file tree
Showing 55 changed files with 1,708 additions and 693 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubuntu-latest]
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
matrix:
runner: [ubicloud-standard-8-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 40
timeout-minutes: 30
services:
pg_cdc:
image: imresamu/postgis:15-3.4-alpine
Expand Down Expand Up @@ -80,13 +80,14 @@ jobs:
name: "gcs_creds.json"
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
- name: create hstore extension, increase logical replication limits, and setup catalog database
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
(cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) &&
docker restart pg_cdc
working-directory: ./flow
env:
Expand Down
84 changes: 45 additions & 39 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
Expand All @@ -40,7 +40,7 @@ type SlotSnapshotSignal struct {
}

type FlowableActivity struct {
CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor
CatalogPool *pgxpool.Pool
}

// CheckConnection implements CheckConnection.
Expand Down Expand Up @@ -114,7 +114,7 @@ func (a *FlowableActivity) CreateRawTable(
ctx context.Context,
config *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -125,7 +125,7 @@ func (a *FlowableActivity) CreateRawTable(
if err != nil {
return nil, err
}
err = a.CatalogMirrorMonitor.InitializeCDCFlow(ctx, config.FlowJobName)
err = monitoring.InitializeCDCFlow(ctx, a.CatalogPool, config.FlowJobName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (a *FlowableActivity) handleSlotInfo(
}

if len(slotInfo) != 0 {
return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
return nil
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down Expand Up @@ -275,13 +275,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("the current sync flow has records: %v", hasRecords)

if a.CatalogMirrorMonitor.IsActive() && hasRecords {
if a.CatalogPool != nil && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
Expand Down Expand Up @@ -346,8 +346,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch(
err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
Expand All @@ -357,13 +358,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, err
}

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint))
err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(lastCheckpoint),
)
if err != nil {
return nil, err
}
if res.TableNameRowsMapping != nil {
err = a.CatalogMirrorMonitor.AddCDCBatchTablesForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID, res.TableNameRowsMapping)
if err != nil {
return nil, err
Expand Down Expand Up @@ -396,7 +401,7 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
syncBatchID)
return nil, err
} else if err != nil {
Expand Down Expand Up @@ -430,8 +435,12 @@ func (a *FlowableActivity) StartNormalize(

// normalize flow did not run due to no records, no need to update end time.
if res.Done {
err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
err = monitoring.UpdateEndTimeForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -496,8 +505,9 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
err = a.CatalogMirrorMonitor.InitializeQRepRun(
err = monitoring.InitializeQRepRun(
ctx,
a.CatalogPool,
config,
runUUID,
partitions,
Expand All @@ -518,7 +528,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
}
Expand All @@ -545,12 +555,13 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now())
err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
pullCtx, pullCancel := context.WithCancel(ctx)
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
Expand Down Expand Up @@ -583,7 +594,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}).Errorf("failed to pull records: %v", err)
goroutineErr = err
} else {
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
log.Errorf("%v", err)
goroutineErr = err
Expand All @@ -603,7 +614,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
"flowName": config.FlowJobName,
}).Infof("pulled %d records\n", len(recordBatch.Records))

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
return err
}
Expand All @@ -628,6 +639,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
pullCancel()
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("no records to push for partition %s\n", partition.PartitionId)
Expand All @@ -637,7 +649,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return goroutineErr
}

err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return err
}
Expand All @@ -647,7 +659,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}).Infof("pushed %d records\n", rowsSynced)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return err
}
Expand All @@ -659,7 +671,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
runUUID string) error {
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
} else if err != nil {
return err
}
Expand All @@ -677,7 +689,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
return err
}

return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
}

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
Expand Down Expand Up @@ -715,14 +727,8 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
return nil
}

func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx, `
func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
optionRows, err := a.CatalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
Expand Down Expand Up @@ -764,7 +770,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
log.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
pgPeers, err := getPostgresPeerConfigs(ctx)
pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." +
"Skipping walheartbeat send. error encountered: " + err.Error())
Expand Down Expand Up @@ -946,17 +952,17 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}},
}
}
updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime)
err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords))
err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
log.Errorf("%v", err)
return err
Expand Down Expand Up @@ -988,7 +994,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return 0, err
}

err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}
Expand All @@ -998,7 +1004,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}).Infof("pushed %d records\n", rowsSynced)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return 0, err
}
Expand Down
1 change: 0 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func APIMain(args *APIServerParams) error {
}

flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)
defer flowHandler.Close()

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
Expand Down
Loading

0 comments on commit a1e8d05

Please sign in to comment.