Skip to content

Commit

Permalink
basic alerting, refactored to use slack-go instead (#866)
Browse files Browse the repository at this point in the history
For now, an alerting function logs an entry in an alerts table in
catalog, and also dispatches notifications to one or more slack
channels. Events are staggered to be at least 15 minutes apart by
default, as long as the same "alert key" is used. This delay can be
configured via the `PEERDB_ALERTING_GAP_MINUTES` environment variable,
and alerts can be disabled by setting it to 0.

Logs alerts in the following 2 cases for now:
1) When computed slot size of a peer exceeds a threshold, which is
`5000MB` by default. This limit is configurable via the environment
variable `PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD`. Setting it to 0 should
disable this type of alert entirely.
2) When the number of connections from the configured user for a peer
exceeds a threshold, which is 5 by default. This limit is configurable by
the environment variable
`PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD`. Setting it to 0 should
disable this type of alert entirely.

To configure alerting, insert a row into the
`peerdb_stats.alerting_config` table like this:

```
INSERT INTO peerdb_stats.alerting_config(service_type,service_config) VALUES('slack','{"auth_token": "SLACK_AUTH_TOKEN", "channel_ids": ["SLACK_CHANNEL","IDS_IN_ARRAY"]}');
```

---------

Co-authored-by: Philip Dubé <[email protected]>
Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2023
1 parent 65c585a commit 08861aa
Show file tree
Hide file tree
Showing 19 changed files with 602 additions and 75 deletions.
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ services:
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ services:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
40 changes: 38 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand All @@ -42,6 +43,7 @@ type SlotSnapshotSignal struct {

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
}

// CheckConnection implements CheckConnection.
Expand Down Expand Up @@ -174,10 +176,38 @@ func (a *FlowableActivity) handleSlotInfo(
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
slog.Warn("warning: failed to get slot info", slog.Any("error", err))
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
Expand All @@ -190,7 +220,13 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
slotName string,
peerName string,
) {
timeout := 10 * time.Minute
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
Expand Down
2 changes: 2 additions & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"

"github.com/grafana/pyroscope-go"
Expand Down Expand Up @@ -133,6 +134,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
})

err = w.Run(worker.InterruptCh())
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CDCPullConnector interface {

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)

// GetOpenConnectionsForUser returns the number of open connections for the user configured in the peer.
GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error)
}

type CDCSyncConnector interface {
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ const (
)
%s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`

dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE mirror_job_name=$1"
getNumConnectionsForUser = "SELECT COUNT(*) FROM pg_stat_activity WHERE usename=$1 AND client_addr IS NOT NULL"
)

type ReplicaIdentityType rune
Expand Down
18 changes: 18 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,3 +895,21 @@ func (c *PostgresConnector) SendWALHeartbeat() error {

return nil
}

// GetOpenConnectionsForUser returns the number of open connections for the
// user configured in the peer.
//
// It runs the getNumConnectionsForUser query with c.config.User as its only
// argument; that query counts the pg_stat_activity rows whose usename matches
// the argument and whose client_addr is not NULL.
//
// On success the result carries the user name that was queried together with
// the count. If the row cannot be scanned, the result is nil and the
// underlying error is returned wrapped.
func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) {
	userName := c.config.User

	// COUNT() returns BIGINT, hence the 64-bit scan destination.
	var openConns pgtype.Int8
	if err := c.pool.QueryRow(c.ctx, getNumConnectionsForUser, userName).Scan(&openConns); err != nil {
		return nil, fmt.Errorf("error while reading result row: %w", err)
	}

	return &protos.GetOpenConnectionsForUserResult{
		UserName:               userName,
		CurrentOpenConnections: openConns.Int64,
	}, nil
}
6 changes: 5 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -59,7 +60,10 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t
env.RegisterWorkflow(peerflow.QRepFlowWorkflow)
env.RegisterWorkflow(peerflow.XminFlowWorkflow)
env.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
env.RegisterActivity(&activities.FlowableActivity{CatalogPool: conn})
env.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
})
env.RegisterActivity(&activities.SnapshotActivity{})
}

Expand Down
Loading

0 comments on commit 08861aa

Please sign in to comment.