Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into cdc-parallel-sync-nor…
Browse files Browse the repository at this point in the history
…malize
  • Loading branch information
serprex committed Dec 23, 2023
2 parents 6af635e + be6b5c2 commit 2f5253a
Show file tree
Hide file tree
Showing 46 changed files with 1,259 additions and 821 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 1500s
gotestsum --format testname -- -p 8 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
129 changes: 43 additions & 86 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
defer connectors.CloseConnector(dstConn)

if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -111,6 +112,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -165,84 +167,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
}
defer connectors.CloseConnector(conn)

return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) handleSlotInfo(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
return nil
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) {
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
for {
select {
case <-ticker.C:
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}
case <-ctx.Done():
return
}
ticker.Stop()
ticker = time.NewTicker(timeout)
}
return setupNormalizedTablesOutput, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
Expand All @@ -255,6 +186,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
Expand All @@ -267,10 +199,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
if err != nil {
Expand All @@ -286,41 +214,45 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName
errGroup.Go(func() error {
return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
return dstConn.SetLastOffset(flowName, lastOffset)
},
})
})

hasRecords := !recordBatch.WaitAndCheckEmpty()
slog.InfoContext(ctx, fmt.Sprintf("the current sync flow has records: %v", hasRecords))
if a.CatalogPool != nil && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, flowName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
}
Expand All @@ -329,6 +261,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
}
slog.InfoContext(ctx, "no records to push")
Expand Down Expand Up @@ -357,11 +290,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to push records: %w", err)
}

err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
}

Expand Down Expand Up @@ -461,6 +396,7 @@ func (a *FlowableActivity) StartNormalize(
SyncBatchID: syncBatchID,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return nil, fmt.Errorf("failed to normalized records: %w", err)
}

Expand Down Expand Up @@ -498,7 +434,13 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(
}
defer connectors.CloseConnector(dest)

return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
err = dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return fmt.Errorf("failed to replay table schema deltas: %w", err)
}

return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
Expand All @@ -509,7 +451,13 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *
}
defer connectors.CloseConnector(conn)

return conn.SetupQRepMetadataTables(config)
err = conn.SetupQRepMetadataTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

return nil
}

// GetQRepPartitions returns the partitions for a given QRepConfig.
Expand All @@ -534,6 +482,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,

partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
Expand Down Expand Up @@ -574,6 +523,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
}
Expand Down Expand Up @@ -713,6 +663,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config

err = dstConn.ConsolidateQRepPartitions(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}

Expand Down Expand Up @@ -787,6 +738,11 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
if !peerdbenv.PeerDBEnableWALHeartbeat() {
slog.InfoContext(ctx, "wal heartbeat is disabled")
return nil
}

sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()
Expand Down Expand Up @@ -1013,6 +969,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

Expand Down
89 changes: 89 additions & 0 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package activities

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// handleSlotInfo inspects the replication slot slotName through srcConn,
// raises alerts when configured thresholds are crossed, and records the
// slot's size in the catalog.
//
// Steps, in order:
//   - Fetch slot info with srcConn.GetSlotInfo. A failure is logged as a
//     warning and returned.
//   - If a positive slot-lag threshold is configured and the first slot
//     entry's lag (in MB) meets or exceeds it, send a slot-lag alert.
//   - Fetch the open connection count with srcConn.GetOpenConnectionsForUser.
//     A failure is logged as a warning and returned. If a positive threshold
//     is configured and the count meets or exceeds it, send an alert.
//   - Append the first slot entry via monitoring.AppendSlotSizeInfo and
//     return its error, if any.
//
// An empty slot info result is not treated as an error: the slot-lag alert
// and the catalog append are skipped, the open-connections check still runs,
// and nil is returned.
func (a *FlowableActivity) handleSlotInfo(
	ctx context.Context,
	srcConn connectors.CDCPullConnector,
	slotName string,
	peerName string,
) error {
	slotInfo, err := srcConn.GetSlotInfo(slotName)
	if err != nil {
		slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
		return err
	}
	// Every access to slotInfo[0] below must be guarded by this flag;
	// indexing an empty result would panic.
	hasSlotInfo := len(slotInfo) != 0

	deploymentUIDPrefix := ""
	if uid := peerdbenv.PeerDBDeploymentUID(); uid != "" {
		deploymentUIDPrefix = fmt.Sprintf("[%s] ", uid)
	}

	slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
	if hasSlotInfo && (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
		a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
			fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
				deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
	}

	// Also handles alerts for PeerDB user connections exceeding a given limit here
	maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
	res, err := srcConn.GetOpenConnectionsForUser()
	if err != nil {
		slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
		return err
	}
	if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
		a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
			fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
				` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
				deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
	}

	if !hasSlotInfo {
		return nil
	}
	return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}

// recordSlotSizePeriodically runs handleSlotInfo once immediately and then
// again every five minutes, measured from the end of the previous run.
//
// It returns when ctx is done or as soon as handleSlotInfo reports an error.
// It is started with `go` by its caller, so it has no return value.
func (a *FlowableActivity) recordSlotSizePeriodically(
	ctx context.Context,
	srcConn connectors.CDCPullConnector,
	slotName string,
	peerName string,
) {
	// ensures slot info is logged at least once per SyncFlow
	if err := a.handleSlotInfo(ctx, srcConn, slotName, peerName); err != nil {
		return
	}

	const interval = 5 * time.Minute

	// A single timer is re-armed after each run. Allocating a new ticker per
	// iteration alongside `defer ticker.Stop()` would only stop the first
	// ticker, because a deferred method call binds its receiver when the
	// defer statement executes; the most recent ticker would never be stopped.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if err := a.handleSlotInfo(ctx, srcConn, slotName, peerName); err != nil {
				return
			}
			// The timer has fired and its channel has been drained by the
			// receive above, so Reset is safe without a Stop/drain dance.
			timer.Reset(interval)
		case <-ctx.Done():
			return
		}
	}
}
4 changes: 2 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
return &protos.PeerSlotResponse{SlotData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
Expand All @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetStatInfo(
return &protos.PeerStatResponse{StatData: nil}, err
}

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false)
if err != nil {
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
Expand Down
Loading

0 comments on commit 2f5253a

Please sign in to comment.