Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log common errors to catalog for user acknowledgement #878

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 37 additions & 85 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
defer connectors.CloseConnector(dstConn)

if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -111,6 +112,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -165,84 +167,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
}
defer connectors.CloseConnector(conn)

return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) handleSlotInfo(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
return nil
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) {
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
for {
select {
case <-ticker.C:
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}
case <-ctx.Done():
return
}
ticker.Stop()
ticker = time.NewTicker(timeout)
}
return setupNormalizedTablesOutput, nil
}

// StartFlow implements StartFlow.
Expand All @@ -256,6 +187,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
Expand All @@ -268,10 +200,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
if err != nil {
Expand All @@ -287,9 +215,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName
errGroup.Go(func() error {
return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
Expand All @@ -301,27 +232,28 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
return dstConn.SetLastOffset(flowName, lastOffset)
},
})
})

hasRecords := !recordBatch.WaitAndCheckEmpty()
slog.InfoContext(ctx, fmt.Sprintf("the current sync flow has records: %v", hasRecords))
if a.CatalogPool != nil && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, flowName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
}
Expand All @@ -330,6 +262,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
}
slog.InfoContext(ctx, "no records to push")
Expand Down Expand Up @@ -358,11 +291,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to push records: %w", err)
}

err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
}

Expand Down Expand Up @@ -465,6 +400,7 @@ func (a *FlowableActivity) StartNormalize(
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return nil, fmt.Errorf("failed to normalized records: %w", err)
}

Expand Down Expand Up @@ -502,7 +438,13 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(
}
defer connectors.CloseConnector(dest)

return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
err = dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
return fmt.Errorf("failed to replay table schema deltas: %w", err)
}

return nil
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
Expand All @@ -513,7 +455,13 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *
}
defer connectors.CloseConnector(conn)

return conn.SetupQRepMetadataTables(config)
err = conn.SetupQRepMetadataTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

return nil
}

// GetQRepPartitions returns the partitions for a given QRepConfig.
Expand All @@ -538,6 +486,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,

partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
Expand Down Expand Up @@ -578,6 +527,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
}
Expand Down Expand Up @@ -717,6 +667,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config

err = dstConn.ConsolidateQRepPartitions(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}

Expand Down Expand Up @@ -1017,6 +968,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

Expand Down
89 changes: 89 additions & 0 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package activities

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// handleSlotInfo records replication slot metrics to the catalog and raises
// alerts when either the slot lag or the number of open connections held by
// the PeerDB user exceeds its configured threshold.
//
// Failures to read slot info or connection counts are logged as warnings and
// returned so callers (e.g. recordSlotSizePeriodically) can stop polling.
func (a *FlowableActivity) handleSlotInfo(
	ctx context.Context,
	srcConn connectors.CDCPullConnector,
	slotName string,
	peerName string,
) error {
	slotInfo, err := srcConn.GetSlotInfo(slotName)
	if err != nil {
		slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
		return err
	}

	// Prefix alert messages with the deployment UID, if one is configured.
	deploymentUIDPrefix := ""
	if peerdbenv.PeerDBDeploymentUID() != "" {
		deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
	}

	// Guard len(slotInfo) before indexing: GetSlotInfo can return an empty
	// slice (the function's own tail checks len(slotInfo) != 0), and the
	// original code indexed slotInfo[0] here unconditionally, which would
	// panic when no slot entry is returned.
	slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
	if len(slotInfo) > 0 && slotLagInMBThreshold > 0 && slotInfo[0].LagInMb >= float32(slotLagInMBThreshold) {
		a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
			fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
				deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
	}

	// Also handles alerts for PeerDB user connections exceeding a given limit here
	maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
	res, err := srcConn.GetOpenConnectionsForUser()
	if err != nil {
		slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
		return err
	}
	if maxOpenConnectionsThreshold > 0 && res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold) {
		a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
			fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
				` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
				deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
	}

	// Persist slot size info for monitoring when a slot entry exists.
	if len(slotInfo) != 0 {
		return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
	}
	return nil
}

// recordSlotSizePeriodically logs replication slot info once immediately and
// then every five minutes until the context is cancelled or a poll fails.
// It is intended to run in its own goroutine for the lifetime of a SyncFlow.
func (a *FlowableActivity) recordSlotSizePeriodically(
	ctx context.Context,
	srcConn connectors.CDCPullConnector,
	slotName string,
	peerName string,
) {
	// ensures slot info is logged at least once per SyncFlow
	if err := a.handleSlotInfo(ctx, srcConn, slotName, peerName); err != nil {
		return
	}

	// A single ticker already fires repeatedly at the interval; the original
	// code stopped and recreated the ticker on every iteration, which leaked
	// the final recreated ticker on return because `defer ticker.Stop()`
	// captured only the first ticker value.
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := a.handleSlotInfo(ctx, srcConn, slotName, peerName); err != nil {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
11 changes: 11 additions & 0 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,14 @@ func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertM
return
}
}

// LogFlowError records err (formatted with %+v to capture any stack trace)
// in the peerdb_stats.flow_errors catalog table so users can review and
// acknowledge it later. Insert failures are logged as warnings and otherwise
// ignored: error logging is best-effort and must not fail the caller.
func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
	errorWithStack := fmt.Sprintf("%+v", err)
	// Use a fresh variable for the insert error rather than clobbering the
	// err parameter, which the original did and which is easy to misread.
	_, execErr := a.catalogPool.Exec(ctx,
		"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
		flowName, errorWithStack, "error")
	if execErr != nil {
		a.logger.WarnContext(ctx, "failed to insert flow error", slog.Any("error", execErr))
	}
}
10 changes: 10 additions & 0 deletions nexus/catalog/migrations/V17__mirror_errors.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Catalog table persisting per-flow errors so users can review and later
-- acknowledge them (rows are inserted by the flow worker's alerting code).
CREATE TABLE peerdb_stats.flow_errors (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
flow_name TEXT NOT NULL,
error_message TEXT NOT NULL,
-- error_type is a free-form category tag; the worker currently writes 'error'.
error_type TEXT NOT NULL,
error_timestamp TIMESTAMP NOT NULL DEFAULT now(),
-- ack flips to TRUE once a user acknowledges the error.
ack BOOLEAN NOT NULL DEFAULT FALSE
);

-- Errors are queried per mirror, so index on flow_name.
CREATE INDEX idx_flow_errors_flow_name ON peerdb_stats.flow_errors (flow_name);
Loading