Skip to content

Commit

Permalink
switched to Slack alerts instead of erroring
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 19, 2024
1 parent e2933b3 commit c9dd1c0
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 42 deletions.
37 changes: 21 additions & 16 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(ctx, config)
if err != nil {
if config.OnlyAlertOnConstraintsFail && errors.Is(err, connpostgres.ErrCDCNotSupportedForTable) {
a.Alerter.AlertGeneric(ctx, config.FlowJobName+"-add-tables-failed",
fmt.Sprintf("failed to add tables for mirror %s: %v", config.FlowJobName, err))
return nil, nil
}
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}
Expand Down Expand Up @@ -191,9 +196,9 @@ func (a *FlowableActivity) CreateNormalizedTable(

numTablesSetup.Add(1)
if created {
logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
logger.Info("created table " + tableIdentifier)
} else {
logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
logger.Info("table already exists " + tableIdentifier)
}
}

Expand Down Expand Up @@ -232,14 +237,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
slotNameForMetrics := "peerflow_slot_" + input.FlowConnectionConfigs.FlowJobName
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
return "transferring records for job - " + jobName
})
defer shutdown()

Expand Down Expand Up @@ -425,7 +430,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -493,7 +498,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
return "getting partitions for job - " + config.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -584,7 +589,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info(fmt.Sprintf("replicating partition %s", partition.PartitionId))
logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
Expand Down Expand Up @@ -644,7 +649,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
logger.Info(fmt.Sprintf("no records to push for partition %s", partition.PartitionId))
logger.Info("no records to push for partition " + partition.PartitionId)
pullCancel()
} else {
wg.Wait()
Expand Down Expand Up @@ -677,7 +682,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
return "consolidating partitions for job - " + config.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -880,26 +885,26 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName)
return "renaming tables for job - " + config.FlowJobName
})
defer shutdown()

if config.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
return nil, errors.New("failed to cast connector to snowflake connector")
}
return sfConn.RenameTables(ctx, config)
} else if config.Peer.Type == protos.DBType_BIGQUERY {
bqConn, ok := dstConn.(*connbigquery.BigQueryConnector)
if !ok {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to cast connector to bigquery connector")
return nil, errors.New("failed to cast connector to bigquery connector")
}
return bqConn.RenameTables(ctx, config)
}
return nil, fmt.Errorf("rename tables is only supported on snowflake and bigquery")
return nil, errors.New("rename tables is only supported on snowflake and bigquery")
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
Expand All @@ -915,18 +920,18 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
if req.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
return nil, errors.New("failed to cast connector to snowflake connector")
}
return sfConn.CreateTablesFromExisting(ctx, req)
} else if req.Peer.Type == protos.DBType_BIGQUERY {
bqConn, ok := dstConn.(*connbigquery.BigQueryConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to bigquery connector")
return nil, errors.New("failed to cast connector to bigquery connector")
}
return bqConn.CreateTablesFromExisting(ctx, req)
}
a.Alerter.LogFlowError(ctx, req.FlowJobName, err)
return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery")
return nil, errors.New("create tables from existing is only supported on snowflake and bigquery")
}

// ReplicateXminPartition replicates a XminPartition from the source to the destination.
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -169,7 +170,7 @@ func (p *PostgresCDCSource) replicationOptions() (*pglogrepl.StartReplicationOpt
pubOpt := fmt.Sprintf("publication_names '%s'", p.publication)
pluginArguments = append(pluginArguments, pubOpt)
} else {
return nil, fmt.Errorf("publication name is not set")
return nil, errors.New("publication name is not set")
}

return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil
Expand Down Expand Up @@ -259,7 +260,7 @@ func (p *PostgresCDCSource) consumeStream(

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
p.logger.Info("Sent Standby status message. " + numRowsProcessedMessage)
standByLastLogged = time.Now()
}
}
Expand Down
11 changes: 8 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connpostgres

import (
"context"
"errors"
"fmt"
"log/slog"
"regexp"
Expand Down Expand Up @@ -36,6 +37,8 @@ type PostgresConnector struct {
logger log.Logger
}

// ErrCDCNotSupportedForTable is the sentinel error for a table that has no
// primary keys and does not have REPLICA IDENTITY FULL. It is meant to be
// wrapped with %w (together with the offending table) and matched by callers
// via errors.Is.
var ErrCDCNotSupportedForTable = errors.New("table has no primary keys and does not have REPLICA IDENTITY FULL")

func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

Expand Down Expand Up @@ -114,7 +117,7 @@ func (c *PostgresConnector) Conn() *pgx.Conn {
// ConnectionActive returns nil if the connection is active.
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
if c.conn == nil {
return fmt.Errorf("connection is nil")
return errors.New("connection is nil")
}
pingErr := c.conn.Ping(ctx)
return pingErr
Expand Down Expand Up @@ -513,7 +516,9 @@ type SlotCheckResult struct {
}

// CreateRawTable creates a raw table, implementing the Connector interface.
func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
func (c *PostgresConnector) CreateRawTable(ctx context.Context,
req *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)

err := c.createMetadataSchema(ctx)
Expand Down Expand Up @@ -786,7 +791,7 @@ func (c *PostgresConnector) EnsurePullability(
// we only allow no primary key if the table has REPLICA IDENTITY FULL
// this is ok for replica identity index as we populate the primary key columns
if len(pKeyCols) == 0 && replicaIdentity != ReplicaIdentityFull {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
return nil, fmt.Errorf("%w: %s", ErrCDCNotSupportedForTable, schemaTable.String())
}

utils.RecordHeartbeat(ctx, fmt.Sprintf("ensured pullability table %s", tableName))
Expand Down
37 changes: 27 additions & 10 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!
cc: <!channel>`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)
"currently at %.2fMB!",
deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
Expand Down Expand Up @@ -130,30 +130,47 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!
cc: <!channel>`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)
" has exceeded threshold size of %%d connections, currently at %d connections!",
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}

func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
// AlertGeneric passes alertKey and alertMessage to checkAndAddAlertToCatalog
// and, only when that check returns true, sends the alert to every Slack
// sender obtained from registerSendersFromPool.
//
// If the senders cannot be registered, a warning is logged and no alert is
// sent. The method returns nothing.
func (a *Alerter) AlertGeneric(ctx context.Context, alertKey string, alertMessage string) {
	// Bail out early when the catalog check declines this alert.
	if !a.checkAndAddAlertToCatalog(ctx, alertKey, alertMessage) {
		return
	}

	senders, registerErr := a.registerSendersFromPool(ctx)
	if registerErr != nil {
		logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", registerErr))
		return
	}

	// Fan the same key/message out to each registered sender.
	for _, sender := range senders {
		a.alertToSpecificSlackSender(ctx, sender, alertKey, alertMessage)
	}
}

func (a *Alerter) alertToSpecificSlackSender(ctx context.Context, slackAlertSender *slackAlertSender,
alertKey string, alertMessage string,
) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage+`
cc: <!channel>`)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
Expand Down
27 changes: 16 additions & 11 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,28 +152,33 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
continue
}

additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables))
for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables {
additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier)
}

ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
ensurePullabilityFuture := workflow.ExecuteActivity(
ensurePullabilityCtx,
flowable.EnsurePullability,
&protos.EnsurePullabilityBatchInput{
PeerConnectionConfig: cfg.Source,
FlowJobName: cfg.FlowJobName,
SourceTableIdentifiers: func() []string {
additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables))
for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables {
additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier)
}
return additionalSourceTables
}(),
CheckConstraints: true,
PeerConnectionConfig: cfg.Source,
FlowJobName: cfg.FlowJobName,
SourceTableIdentifiers: additionalSourceTables,
CheckConstraints: true,
OnlyAlertOnConstraintsFail: true,
})
if err := ensurePullabilityFuture.Get(ctx, nil); err != nil {
var ensurePullabilityOutput *protos.EnsurePullabilityBatchOutput
if err := ensurePullabilityFuture.Get(ctx, &ensurePullabilityOutput); err != nil {
w.logger.Error("failed to ensure pullability for additional tables: ", err)
return err
}
// if err == nil and output == nil, constraints failed, so ignore batch
if ensurePullabilityOutput == nil {
continue
}

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ message EnsurePullabilityBatchInput {
string flow_job_name = 2;
repeated string source_table_identifiers = 3;
bool check_constraints = 4;
bool only_alert_on_constraints_fail = 5;
}

message PostgresTableIdentifier {
Expand Down

0 comments on commit c9dd1c0

Please sign in to comment.