Merge branch 'main' into redpanda
serprex authored Mar 14, 2024
2 parents 5466d06 + 578b1ed commit 4df0f80
Showing 35 changed files with 977 additions and 714 deletions.
15 changes: 12 additions & 3 deletions flow/activities/flowable.go
@@ -287,7 +287,6 @@ func (a *FlowableActivity) SyncFlow(
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
@@ -315,6 +314,7 @@ func (a *FlowableActivity) SyncFlow(
if err != nil {
return nil, err
}
logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
@@ -346,7 +346,11 @@ func (a *FlowableActivity) SyncFlow(
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed in pull records when: %w", err)
if temporal.IsApplicationError(err) {
return nil, err
} else {
return nil, fmt.Errorf("failed in pull records when: %w", err)
}
}
logger.Info("no records to push")

@@ -402,7 +406,11 @@ func (a *FlowableActivity) SyncFlow(
err = errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to pull records: %w", err)
if temporal.IsApplicationError(err) {
return nil, err
} else {
return nil, fmt.Errorf("failed to pull records: %w", err)
}
}

numRecords := res.NumRecordsSynced
@@ -411,6 +419,7 @@
logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint := recordBatch.GetLastCheckpoint()
srcConn.UpdateReplStateLastOffset(lastCheckpoint)

err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
161 changes: 102 additions & 59 deletions flow/alerting/alerting.go
@@ -3,8 +3,10 @@ package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/jackc/pgx/v5"
@@ -23,32 +25,67 @@ type Alerter struct {
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
type AlertSenderConfig struct {
Id int64
Sender AlertSender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
"SELECT id,service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

var slackAlertSenders []*slackAlertSender
var serviceType, serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
var alertSenderConfigs []AlertSenderConfig
var serviceType ServiceType
var serviceConfig string
var id int64
_, err = pgx.ForEachRow(rows, []any{&id, &serviceType, &serviceConfig}, func() error {
switch serviceType {
case "slack":
case SLACK:
var slackServiceConfig slackAlertConfig
err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal Slack service config: %w", err)
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}

alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)})
case EMAIL:
var replyToAddresses []string
if replyToEnvString := strings.TrimSpace(
peerdbenv.PeerDBAlertingEmailSenderReplyToAddresses()); replyToEnvString != "" {
replyToAddresses = strings.Split(replyToEnvString, ",")
}
emailServiceConfig := EmailAlertSenderConfig{
sourceEmail: peerdbenv.PeerDBAlertingEmailSenderSourceEmail(),
configurationSetName: peerdbenv.PeerDBAlertingEmailSenderConfigurationSet(),
replyToAddresses: replyToAddresses,
}
if emailServiceConfig.sourceEmail == "" {
return errors.New("missing sourceEmail for Email alerting service")
}
err = json.Unmarshal([]byte(serviceConfig), &emailServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}
var region *string
if envRegion := peerdbenv.PeerDBAlertingEmailSenderRegion(); envRegion != "" {
region = &envRegion
}

slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig))
alertSender, alertSenderErr := NewEmailAlertSenderWithNewClient(ctx, region, &emailServiceConfig)
if alertSenderErr != nil {
return fmt.Errorf("failed to initialize email alerter: %w", alertSenderErr)
}
alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: alertSender})
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return slackAlertSenders, nil
return alertSenderConfigs, nil
}

// doesn't take care of closing pool, needs to be done externally.
@@ -75,9 +112,9 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err))
return
}

@@ -89,29 +126,30 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, slackAlertSender.slotLagMBAlertThreshold)
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getSlotLagMBAlertThreshold() > 0 {
lowestSlotLagMBAlertThreshold = min(lowestSlotLagMBAlertThreshold, alertSender.Sender.getSlotLagMBAlertThreshold())
}
}

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertKey := fmt.Sprintf("%s Slot Lag Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!
cc: <!channel>`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 {
if slotInfo.LagInMb > float32(alertSenderConfig.Sender.getSlotLagMBAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getSlotLagMBAlertThreshold()))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
}
}
@@ -121,52 +159,53 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
deploymentUIDPrefix = fmt.Sprintf("[%s] - ", peerdbenv.PeerDBDeploymentUID())
}

// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, slackAlertSender.openConnectionsAlertThreshold)
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.Sender.getOpenConnectionsAlertThreshold())
}
}

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertKey := fmt.Sprintf("%s Max Open Connections Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!
cc: <!channel>`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
` has exceeded threshold size of %%d connections, currently at %d connections!`,
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
if alertSenderConfig.Sender.getOpenConnectionsAlertThreshold() > 0 {
if openConnections.CurrentOpenConnections > int64(alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, alertSenderConfig.Sender.getOpenConnectionsAlertThreshold()))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToProvider(ctx, alertSenderConfig, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}
}

func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSenderConfig, alertKey string, alertMessage string) {
err := alertSenderConfig.Sender.sendAlert(ctx, alertKey, alertMessage)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
@@ -176,17 +215,17 @@ func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlert
// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) bool {
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
return false
}

row := a.catalogPool.QueryRow(ctx,
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 AND alert_config_id=$2
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
alertKey, alertConfigId)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
@@ -196,14 +235,18 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string

if time.Since(createdTimestamp) >= dur {
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message,alert_config_id) VALUES($1,$2,$3)",
alertKey, alertMessage, alertConfigId)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err))
return false
}
return true
}

logger.LoggerFromCtx(ctx).Info(
fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was >=%s ago",
createdTimestamp.String(), dur.String()))
return false
}
