From 0a86a30f77a4ca0cd4d884e731a7d98532bb511e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 22 Dec 2023 09:04:25 -0500 Subject: [PATCH 1/2] log common errors to catalog for user acknowledgement --- flow/activities/flowable.go | 122 ++++++------------ flow/activities/slot.go | 89 +++++++++++++ flow/shared/alerting/alerting.go | 11 ++ .../migrations/V17__alert_has_mirror_opt.sql | 5 + 4 files changed, 142 insertions(+), 85 deletions(-) create mode 100644 flow/activities/slot.go create mode 100644 nexus/catalog/migrations/V17__alert_has_mirror_opt.sql diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 982364ba58..8fe8e4be2b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -73,6 +73,7 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot defer connectors.CloseConnector(dstConn) if err := dstConn.SetupMetadataTables(); err != nil { + a.Alerter.LogFlowError(ctx, config.Name, err) return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -111,6 +112,7 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(config) if err != nil { + a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -165,84 +167,13 @@ func (a *FlowableActivity) CreateNormalizedTable( } defer connectors.CloseConnector(conn) - return conn.SetupNormalizedTables(config) -} - -func (a *FlowableActivity) handleSlotInfo( - ctx context.Context, - srcConn connectors.CDCPullConnector, - slotName string, - peerName string, -) error { - slotInfo, err := srcConn.GetSlotInfo(slotName) + setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config) if err != nil { - slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err)) - return err - } - - deploymentUIDPrefix := "" - if peerdbenv.PeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + return nil, fmt.Errorf("failed to setup normalized tables: %w", err) } - slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() - if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { - a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), - fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! -cc: `, - deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) - } - - // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() - res, err := srcConn.GetOpenConnectionsForUser() - if err != nil { - slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) - return err - } - if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { - a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), - fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ - ` has exceeded threshold size of %d connections, currently at %d connections! -cc: `, - deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) - } - - if len(slotInfo) != 0 { - return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) - } - return nil -} - -func (a *FlowableActivity) recordSlotSizePeriodically( - ctx context.Context, - srcConn connectors.CDCPullConnector, - slotName string, - peerName string, -) { - // ensures slot info is logged at least once per SyncFlow - err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) - if err != nil { - return - } - - timeout := 5 * time.Minute - ticker := time.NewTicker(timeout) - - defer ticker.Stop() - for { - select { - case <-ticker.C: - err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) - if err != nil { - return - } - case <-ctx.Done(): - return - } - ticker.Stop() - ticker = time.NewTicker(timeout) - } + return setupNormalizedTablesOutput, nil } // StartFlow implements StartFlow. @@ -256,6 +187,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed to get destination connector: %w", err) } defer connectors.CloseConnector(dstConn) + slog.InfoContext(ctx, "initializing table schema...") err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping) if err != nil { @@ -268,10 +200,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - recordBatch := model.NewCDCRecordStream() - - startTime := time.Now() - errGroup, errCtx := errgroup.WithContext(ctx) srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source) if err != nil { @@ -287,9 +215,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) // start a goroutine to pull records from the source + recordBatch := model.NewCDCRecordStream() + startTime := time.Now() + flowName := input.FlowConnectionConfigs.FlowJobName errGroup.Go(func() error { return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{ - FlowJobName: input.FlowConnectionConfigs.FlowJobName, + FlowJobName: flowName, SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, LastOffset: input.LastSyncState.Checkpoint, @@ -301,7 +232,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, RelationMessageMapping: input.RelationMessageMapping, RecordStream: recordBatch, SetLastOffset: func(lastOffset int64) error { - return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset) + return dstConn.SetLastOffset(flowName, lastOffset) }, }) }) @@ -309,12 +240,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, hasRecords := !recordBatch.WaitAndCheckEmpty() slog.InfoContext(ctx, fmt.Sprintf("the current sync flow has records: %v", hasRecords)) if a.CatalogPool != nil && hasRecords { - syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) + syncBatchID, err := dstConn.GetLastSyncBatchID(flowName) if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB { return nil, err } - err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{ BatchID: syncBatchID + 1, RowsInBatch: 0, @@ -322,6 +253,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, StartTime: startTime, }) if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } } @@ -330,6 +262,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, // wait for the pull goroutine to finish err = errGroup.Wait() if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to pull records: %w", err) } slog.InfoContext(ctx, "no records to push") @@ -358,11 +291,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }) if err != nil { slog.Warn("failed to push records", slog.Any("error", err)) + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to push records: %w", err) } err = errGroup.Wait() if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to pull records: %w", err) } @@ -465,6 +400,7 @@ func (a *FlowableActivity) StartNormalize( SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName, }) if err != nil { + a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) return nil, fmt.Errorf("failed to normalized records: %w", err) } @@ -502,7 +438,13 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas( } defer connectors.CloseConnector(dest) - return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas) + err = dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas) + if err != nil { + a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) + return fmt.Errorf("failed to replay table schema deltas: %w", err) + } + + return nil } // SetupQRepMetadataTables sets up the metadata tables for QReplication. @@ -513,7 +455,13 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config * } defer connectors.CloseConnector(conn) - return conn.SetupQRepMetadataTables(config) + err = conn.SetupQRepMetadataTables(config) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to setup metadata tables: %w", err) + } + + return nil } // GetQRepPartitions returns the partitions for a given QRepConfig. @@ -538,6 +486,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, partitions, err := srcConn.GetQRepPartitions(config, last) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to get partitions from source: %w", err) } if len(partitions) > 0 { @@ -578,6 +527,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId)) err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } } @@ -717,6 +667,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config err = dstConn.ConsolidateQRepPartitions(config) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } @@ -1017,6 +968,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } else { err := errGroup.Wait() if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return 0, err } diff --git a/flow/activities/slot.go b/flow/activities/slot.go new file mode 100644 index 0000000000..cf1375b4e4 --- /dev/null +++ b/flow/activities/slot.go @@ -0,0 +1,89 @@ +package activities + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/PeerDB-io/peer-flow/connectors" + "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func (a *FlowableActivity) handleSlotInfo( + ctx context.Context, + srcConn connectors.CDCPullConnector, + slotName string, + peerName string, +) error { + slotInfo, err := srcConn.GetSlotInfo(slotName) + if err != nil { + slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err)) + return err + } + + deploymentUIDPrefix := "" + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + } + + slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() + if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { + a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), + fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! +cc: `, + deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) + } + + // Also handles alerts for PeerDB user connections exceeding a given limit here + maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() + res, err := srcConn.GetOpenConnectionsForUser() + if err != nil { + slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) + return err + } + if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { + a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), + fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ + ` has exceeded threshold size of %d connections, currently at %d connections! +cc: `, + deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) + } + + if len(slotInfo) != 0 { + return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) + } + return nil +} + +func (a *FlowableActivity) recordSlotSizePeriodically( + ctx context.Context, + srcConn connectors.CDCPullConnector, + slotName string, + peerName string, +) { + // ensures slot info is logged at least once per SyncFlow + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } + + timeout := 5 * time.Minute + ticker := time.NewTicker(timeout) + + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } + case <-ctx.Done(): + return + } + ticker.Stop() + ticker = time.NewTicker(timeout) + } +} diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 394623825d..274a420b69 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -112,3 +112,14 @@ func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertM return } } + +func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { + errorWithStack := fmt.Sprintf("%+v", err) + _, err = a.catalogPool.Exec(ctx, + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message, flow_name) VALUES($1,$2,$3)", + "flow_error", errorWithStack, flowName) + if err != nil { + a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err)) + return + } +} diff --git a/nexus/catalog/migrations/V17__alert_has_mirror_opt.sql b/nexus/catalog/migrations/V17__alert_has_mirror_opt.sql new file mode 100644 index 0000000000..6beeb7a94c --- /dev/null +++ b/nexus/catalog/migrations/V17__alert_has_mirror_opt.sql @@ -0,0 +1,5 @@ +ALTER TABLE peerdb_state.alerts_v1 +ADD COLUMN IF NOT EXISTS flow_name TEXT +ADD COLUMN IF NOT EXISTS ack BOOLEAN DEFAULT FALSE; + +CREATE INDEX alerts_v1_flow_name_idx ON peerdb_state.alerts_v1 (flow_name); From d668ec3aa85f2e48790fc24a9f43e3d637124a2e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 22 Dec 2023 09:22:07 -0500 Subject: [PATCH 2/2] fix --- flow/shared/alerting/alerting.go | 6 +++--- nexus/catalog/migrations/V17__alert_has_mirror_opt.sql | 5 ----- nexus/catalog/migrations/V17__mirror_errors.sql | 10 ++++++++++ 3 files changed, 13 insertions(+), 8 deletions(-) delete mode 100644 nexus/catalog/migrations/V17__alert_has_mirror_opt.sql create mode 100644 nexus/catalog/migrations/V17__mirror_errors.sql diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 274a420b69..50608087d2 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -116,10 +116,10 @@ func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertM func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message, flow_name) VALUES($1,$2,$3)", - "flow_error", errorWithStack, flowName) + "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", + flowName, errorWithStack, "error") if err != nil { - a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err)) + a.logger.WarnContext(ctx, "failed to insert flow error", slog.Any("error", err)) return } } diff --git a/nexus/catalog/migrations/V17__alert_has_mirror_opt.sql b/nexus/catalog/migrations/V17__alert_has_mirror_opt.sql deleted file mode 100644 index 6beeb7a94c..0000000000 --- a/nexus/catalog/migrations/V17__alert_has_mirror_opt.sql +++ /dev/null @@ -1,5 +0,0 @@ -ALTER TABLE peerdb_state.alerts_v1 -ADD COLUMN IF NOT EXISTS flow_name TEXT -ADD COLUMN IF NOT EXISTS ack BOOLEAN DEFAULT FALSE; - -CREATE INDEX alerts_v1_flow_name_idx ON peerdb_state.alerts_v1 (flow_name); diff --git a/nexus/catalog/migrations/V17__mirror_errors.sql b/nexus/catalog/migrations/V17__mirror_errors.sql new file mode 100644 index 0000000000..06f2352ea2 --- /dev/null +++ b/nexus/catalog/migrations/V17__mirror_errors.sql @@ -0,0 +1,10 @@ +CREATE TABLE peerdb_stats.flow_errors ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + flow_name TEXT NOT NULL, + error_message TEXT NOT NULL, + error_type TEXT NOT NULL, + error_timestamp TIMESTAMP NOT NULL DEFAULT now(), + ack BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE INDEX idx_flow_errors_flow_name ON peerdb_stats.flow_errors (flow_name);