Skip to content

Commit

Permalink
Merge branch 'main' into soft-delete-tests-bq
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 25, 2023
2 parents 49ef204 + b160753 commit 6d36913
Show file tree
Hide file tree
Showing 54 changed files with 1,375 additions and 930 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
gotestsum --format testname -- -p 8 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
29 changes: 22 additions & 7 deletions dev-peerdb.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
#!/bin/bash
set -Eeuo pipefail
#!/bin/sh
if test -z "$USE_PODMAN"
then
if ! command -v docker &> /dev/null
then
if command -v podman-compose
then
echo "docker could not be found on PATH, using podman-compose"
USE_PODMAN=1
else
echo "docker could not be found on PATH"
exit 1
fi
fi
fi

if ! command -v docker &> /dev/null
if test -z "$USE_PODMAN"
then
echo "docker could not be found on PATH"
exit 1
DOCKER="docker compose"
EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
else
DOCKER="podman-compose --podman-run-args=--replace"
EXTRA_ARGS=""
fi

export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
docker compose -f docker-compose-dev.yml up --build \
--no-attach temporal --no-attach pyroscope --no-attach temporal-ui
exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ services:
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready", "-d", "postgres", "-U", "postgres"]
test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
interval: 10s
timeout: 30s
retries: 5
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ services:
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready", "-d", "postgres", "-U", "postgres"]
test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
interval: 10s
timeout: 30s
retries: 5
Expand Down
130 changes: 43 additions & 87 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
defer connectors.CloseConnector(dstConn)

if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -111,6 +112,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -165,87 +167,15 @@ func (a *FlowableActivity) CreateNormalizedTable(
}
defer connectors.CloseConnector(conn)

return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) handleSlotInfo(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
return nil
return setupNormalizedTablesOutput, nil
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) {
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
for {
select {
case <-ticker.C:
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}
case <-ctx.Done():
return
}
ticker.Stop()
ticker = time.NewTicker(timeout)
}
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
Expand All @@ -256,6 +186,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
Expand All @@ -268,10 +199,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
if err != nil {
Expand All @@ -287,41 +214,45 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName
errGroup.Go(func() error {
return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
return dstConn.SetLastOffset(flowName, lastOffset)
},
})
})

hasRecords := !recordBatch.WaitAndCheckEmpty()
slog.InfoContext(ctx, fmt.Sprintf("the current sync flow has records: %v", hasRecords))
if a.CatalogPool != nil && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, flowName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
}
Expand All @@ -330,6 +261,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
}
slog.InfoContext(ctx, "no records to push")
Expand Down Expand Up @@ -358,11 +290,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to push records: %w", err)
}

err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
}

Expand Down Expand Up @@ -465,6 +399,7 @@ func (a *FlowableActivity) StartNormalize(
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return nil, fmt.Errorf("failed to normalized records: %w", err)
}

Expand Down Expand Up @@ -502,7 +437,13 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(
}
defer connectors.CloseConnector(dest)

return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
err = dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return fmt.Errorf("failed to replay table schema deltas: %w", err)
}

return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
Expand All @@ -513,7 +454,13 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *
}
defer connectors.CloseConnector(conn)

return conn.SetupQRepMetadataTables(config)
err = conn.SetupQRepMetadataTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

return nil
}

// GetQRepPartitions returns the partitions for a given QRepConfig.
Expand All @@ -538,6 +485,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,

partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
Expand Down Expand Up @@ -578,6 +526,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
}
Expand Down Expand Up @@ -717,6 +666,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config

err = dstConn.ConsolidateQRepPartitions(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}

Expand Down Expand Up @@ -791,6 +741,11 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
if !peerdbenv.PeerDBEnableWALHeartbeat() {
slog.InfoContext(ctx, "wal heartbeat is disabled")
return nil
}

sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()
Expand Down Expand Up @@ -1017,6 +972,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

Expand Down
Loading

0 comments on commit 6d36913

Please sign in to comment.