From 08861aaa54f9e5ee1f22849f2bbf72f518120522 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Thu, 21 Dec 2023 23:52:19 +0530 Subject: [PATCH] basic alerting, refactored to use slack-go instead (#866) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For now, an alerting function logs an entry in an alerts table in catalog, and also dispatches notifications to one or more slack channels. Events are staggered to be atleast 15 minutes apart by default, as long as the same "alert key" is used. This delay can be configured via the `PEERDB_ALERTING_GAP_MINUTES` environment variable, and alerts can be disabled by setting it to 0. Logs alerts in the following 2 cases for now: 1) When computed slot size of a peer exceeds a threshold, which is `5000MB` by default. This limit is configurable via the environment variable `PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD`. Setting it to 0 should disable this type of alerts entirely. 2) When the number of connections from the configured user for a peer exceed a threshold, which is 5 by default. This limit is configurable by the environment variable `PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD`. Setting it to 0 should disable this type of alerts entirely. To configure alerting, insert a row into the `peerdb_stats.alerting_config` table like this: ``` INSERT INTO peerdb_stats.alerting_config(service_type,service_config) VALUES('slack','{"auth_token": "SLACK_AUTH_TOKEN", "channel_ids": ["SLACK_CHANNEL","IDS_IN_ARRAY"]}'); ``` --------- Co-authored-by: Philip Dubé Co-authored-by: Kaushik Iska --- docker-compose-dev.yml | 2 +- docker-compose.yml | 2 +- flow/activities/flowable.go | 40 +++- flow/cmd/worker.go | 2 + flow/connectors/core.go | 3 + flow/connectors/postgres/client.go | 5 +- flow/connectors/postgres/postgres.go | 18 ++ flow/e2e/test_utils.go | 6 +- flow/generated/protos/flow.pb.go | 211 ++++++++++++------ flow/go.mod | 2 + flow/go.sum | 8 + flow/peerdbenv/config.go | 16 ++ flow/shared/alerting/alerting.go | 101 +++++++++ flow/shared/alerting/slack_alert_sender.go | 38 ++++ .../migrations/V16__alerting_config_init.sql | 13 ++ nexus/pt/src/peerdb_flow.rs | 8 + nexus/pt/src/peerdb_flow.serde.rs | 116 ++++++++++ protos/flow.proto | 5 + ui/grpc_generated/flow.ts | 81 +++++++ 19 files changed, 602 insertions(+), 75 deletions(-) create mode 100644 flow/shared/alerting/alerting.go create mode 100644 flow/shared/alerting/slack_alert_sender.go create mode 100644 nexus/catalog/migrations/V16__alerting_config_init.sql diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index f5d8942ba1..158483a7eb 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -133,7 +133,7 @@ services: dockerfile: stacks/flow.Dockerfile target: flow-snapshot-worker environment: - <<: [*flow-worker-env] + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/docker-compose.yml b/docker-compose.yml index 5645f2b1e4..add4bb2db1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -112,7 +112,7 @@ services: container_name: flow-snapshot-worker image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev environment: - <<: [*flow-worker-env] + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 11a6d37895..b5cf8e8973 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -19,6 +19,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/shared/alerting" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" @@ -42,6 +43,7 @@ type SlotSnapshotSignal struct { type FlowableActivity struct { CatalogPool *pgxpool.Pool + Alerter *alerting.Alerter } // CheckConnection implements CheckConnection. @@ -174,10 +176,38 @@ func (a *FlowableActivity) handleSlotInfo( ) error { slotInfo, err := srcConn.GetSlotInfo(slotName) if err != nil { - slog.Warn("warning: failed to get slot info", slog.Any("error", err)) + slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err)) return err } + deploymentUIDPrefix := "" + if peerdbenv.GetPeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID()) + } + + slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold() + if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { + a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), + fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! +cc: `, + deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) + } + + // Also handles alerts for PeerDB user connections exceeding a given limit here + maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold() + res, err := srcConn.GetOpenConnectionsForUser() + if err != nil { + slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) + return err + } + if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { + a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), + fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ + ` has exceeded threshold size of %d connections, currently at %d connections! +cc: `, + deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) + } + if len(slotInfo) != 0 { return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) } @@ -190,7 +220,13 @@ func (a *FlowableActivity) recordSlotSizePeriodically( slotName string, peerName string, ) { - timeout := 10 * time.Minute + // ensures slot info is logged at least once per SyncFlow + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } + + timeout := 5 * time.Minute ticker := time.NewTicker(timeout) defer ticker.Stop() diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index ba6e0d0e18..a42ac76b47 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/grafana/pyroscope-go" @@ -133,6 +134,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, + Alerter: alerting.NewAlerter(conn), }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index c3fb138398..e5efec63ca 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -48,6 +48,9 @@ type CDCPullConnector interface { // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) + + // GetOpenConnectionsForUser returns the number of open connections for the user configured in the peer. + GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) } type CDCSyncConnector interface { diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 9f516a49c6..dc604d5631 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -75,8 +75,9 @@ const ( ) %s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2` - dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" - deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1" + dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" + deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE mirror_job_name=$1" + getNumConnectionsForUser = "SELECT COUNT(*) FROM pg_stat_activity WHERE usename=$1 AND client_addr IS NOT NULL" ) type ReplicaIdentityType rune diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index b848c5a5b5..a46005fe01 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -895,3 +895,21 @@ func (c *PostgresConnector) SendWALHeartbeat() error { return nil } + +// GetLastOffset returns the last synced offset for a job. +func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) { + row := c.pool. + QueryRow(c.ctx, getNumConnectionsForUser, c.config.User) + + // COUNT() returns BIGINT + var result pgtype.Int8 + err := row.Scan(&result) + if err != nil { + return nil, fmt.Errorf("error while reading result row: %w", err) + } + + return &protos.GetOpenConnectionsForUserResult{ + UserName: c.config.User, + CurrentOpenConnections: result.Int64, + }, nil +} diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 0aa1c12242..13ca8044e5 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -17,6 +17,7 @@ import ( "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" @@ -59,7 +60,10 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t env.RegisterWorkflow(peerflow.QRepFlowWorkflow) env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) - env.RegisterActivity(&activities.FlowableActivity{CatalogPool: conn}) + env.RegisterActivity(&activities.FlowableActivity{ + CatalogPool: conn, + Alerter: alerting.NewAlerter(conn), + }) env.RegisterActivity(&activities.SnapshotActivity{}) } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 3cb146df01..27d2bb5c04 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -3365,6 +3365,61 @@ func (x *PeerDBColumns) GetSoftDelete() bool { return false } +type GetOpenConnectionsForUserResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UserName string `protobuf:"bytes,1,opt,name=user_name,json=userName,proto3" json:"user_name,omitempty"` + CurrentOpenConnections int64 `protobuf:"varint,2,opt,name=current_open_connections,json=currentOpenConnections,proto3" json:"current_open_connections,omitempty"` +} + +func (x *GetOpenConnectionsForUserResult) Reset() { + *x = GetOpenConnectionsForUserResult{} + if protoimpl.UnsafeEnabled { + mi := &file_flow_proto_msgTypes[49] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetOpenConnectionsForUserResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetOpenConnectionsForUserResult) ProtoMessage() {} + +func (x *GetOpenConnectionsForUserResult) ProtoReflect() protoreflect.Message { + mi := &file_flow_proto_msgTypes[49] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetOpenConnectionsForUserResult.ProtoReflect.Descriptor instead. +func (*GetOpenConnectionsForUserResult) Descriptor() ([]byte, []int) { + return file_flow_proto_rawDescGZIP(), []int{49} +} + +func (x *GetOpenConnectionsForUserResult) GetUserName() string { + if x != nil { + return x.UserName + } + return "" +} + +func (x *GetOpenConnectionsForUserResult) GetCurrentOpenConnections() int64 { + if x != nil { + return x.CurrentOpenConnections + } + return 0 +} + var File_flow_proto protoreflect.FileDescriptor var file_flow_proto_rawDesc = []byte{ @@ -4056,27 +4111,34 @@ var file_flow_proto_rawDesc = []byte{ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x41, 0x74, 0x43, 0x6f, 0x6c, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x66, 0x74, 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, - 0x6f, 0x66, 0x74, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, - 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, - 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, - 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, - 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, - 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, - 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, - 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, - 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, - 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, - 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, - 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, - 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, - 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x6f, 0x66, 0x74, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x22, 0x78, 0x0a, 0x1f, 0x47, 0x65, 0x74, + 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, + 0x6f, 0x72, 0x55, 0x73, 0x65, 0x72, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1b, 0x0a, 0x09, + 0x75, 0x73, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x75, 0x73, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x18, 0x63, 0x75, 0x72, + 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x6f, 0x70, 0x65, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x16, 0x63, 0x75, 0x72, + 0x72, 0x65, 0x6e, 0x74, 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, + 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, + 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, + 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, + 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, + 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, + 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, + 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, + 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, + 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, + 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, + 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, + 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, + 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, + 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, + 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, + 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, + 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -4092,7 +4154,7 @@ func file_flow_proto_rawDescGZIP() []byte { } var file_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 61) +var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 62) var file_flow_proto_goTypes = []interface{}{ (QRepSyncMode)(0), // 0: peerdb_flow.QRepSyncMode (QRepWriteType)(0), // 1: peerdb_flow.QRepWriteType @@ -4145,74 +4207,75 @@ var file_flow_proto_goTypes = []interface{}{ (*ReplayTableSchemaDeltaInput)(nil), // 48: peerdb_flow.ReplayTableSchemaDeltaInput (*QRepFlowState)(nil), // 49: peerdb_flow.QRepFlowState (*PeerDBColumns)(nil), // 50: peerdb_flow.PeerDBColumns - nil, // 51: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - nil, // 52: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - nil, // 53: peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry - nil, // 54: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - nil, // 55: peerdb_flow.StartFlowInput.RelationMessageMappingEntry - nil, // 56: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - nil, // 57: peerdb_flow.SetupReplicationInput.TableNameMappingEntry - nil, // 58: peerdb_flow.CreateRawTableInput.TableNameMappingEntry - nil, // 59: peerdb_flow.TableSchema.ColumnsEntry - nil, // 60: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - nil, // 61: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - nil, // 62: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - (*Peer)(nil), // 63: peerdb_peers.Peer - (*timestamppb.Timestamp)(nil), // 64: google.protobuf.Timestamp + (*GetOpenConnectionsForUserResult)(nil), // 51: peerdb_flow.GetOpenConnectionsForUserResult + nil, // 52: peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + nil, // 53: peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + nil, // 54: peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry + nil, // 55: peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + nil, // 56: peerdb_flow.StartFlowInput.RelationMessageMappingEntry + nil, // 57: peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + nil, // 58: peerdb_flow.SetupReplicationInput.TableNameMappingEntry + nil, // 59: peerdb_flow.CreateRawTableInput.TableNameMappingEntry + nil, // 60: peerdb_flow.TableSchema.ColumnsEntry + nil, // 61: peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + nil, // 62: peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + nil, // 63: peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + (*Peer)(nil), // 64: peerdb_peers.Peer + (*timestamppb.Timestamp)(nil), // 65: google.protobuf.Timestamp } var file_flow_proto_depIdxs = []int32{ 3, // 0: peerdb_flow.RelationMessage.columns:type_name -> peerdb_flow.RelationMessageColumn - 63, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer - 63, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer + 64, // 1: peerdb_flow.FlowConnectionConfigs.source:type_name -> peerdb_peers.Peer + 64, // 2: peerdb_flow.FlowConnectionConfigs.destination:type_name -> peerdb_peers.Peer 28, // 3: peerdb_flow.FlowConnectionConfigs.table_schema:type_name -> peerdb_flow.TableSchema 5, // 4: peerdb_flow.FlowConnectionConfigs.table_mappings:type_name -> peerdb_flow.TableMapping - 51, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry - 52, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry - 63, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer + 52, // 5: peerdb_flow.FlowConnectionConfigs.src_table_id_name_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.SrcTableIdNameMappingEntry + 53, // 6: peerdb_flow.FlowConnectionConfigs.table_name_schema_mapping:type_name -> peerdb_flow.FlowConnectionConfigs.TableNameSchemaMappingEntry + 64, // 7: peerdb_flow.FlowConnectionConfigs.metadata_peer:type_name -> peerdb_peers.Peer 0, // 8: peerdb_flow.FlowConnectionConfigs.snapshot_sync_mode:type_name -> peerdb_flow.QRepSyncMode 0, // 9: peerdb_flow.FlowConnectionConfigs.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode 28, // 10: peerdb_flow.RenameTableOption.table_schema:type_name -> peerdb_flow.TableSchema - 63, // 11: peerdb_flow.RenameTablesInput.peer:type_name -> peerdb_peers.Peer + 64, // 11: peerdb_flow.RenameTablesInput.peer:type_name -> peerdb_peers.Peer 7, // 12: peerdb_flow.RenameTablesInput.rename_table_options:type_name -> peerdb_flow.RenameTableOption - 63, // 13: peerdb_flow.CreateTablesFromExistingInput.peer:type_name -> peerdb_peers.Peer - 53, // 14: peerdb_flow.CreateTablesFromExistingInput.new_to_existing_table_mapping:type_name -> peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry - 54, // 15: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry - 64, // 16: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp + 64, // 13: peerdb_flow.CreateTablesFromExistingInput.peer:type_name -> peerdb_peers.Peer + 54, // 14: peerdb_flow.CreateTablesFromExistingInput.new_to_existing_table_mapping:type_name -> peerdb_flow.CreateTablesFromExistingInput.NewToExistingTableMappingEntry + 55, // 15: peerdb_flow.SyncFlowOptions.relation_message_mapping:type_name -> peerdb_flow.SyncFlowOptions.RelationMessageMappingEntry + 65, // 16: peerdb_flow.LastSyncState.last_synced_at:type_name -> google.protobuf.Timestamp 14, // 17: peerdb_flow.StartFlowInput.last_sync_state:type_name -> peerdb_flow.LastSyncState 6, // 18: peerdb_flow.StartFlowInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs 12, // 19: peerdb_flow.StartFlowInput.sync_flow_options:type_name -> peerdb_flow.SyncFlowOptions - 55, // 20: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry + 56, // 20: peerdb_flow.StartFlowInput.relation_message_mapping:type_name -> peerdb_flow.StartFlowInput.RelationMessageMappingEntry 6, // 21: peerdb_flow.StartNormalizeInput.flow_connection_configs:type_name -> peerdb_flow.FlowConnectionConfigs - 63, // 22: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer - 63, // 23: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer - 63, // 24: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 64, // 22: peerdb_flow.GetLastSyncedIDInput.peer_connection_config:type_name -> peerdb_peers.Peer + 64, // 23: peerdb_flow.EnsurePullabilityInput.peer_connection_config:type_name -> peerdb_peers.Peer + 64, // 24: peerdb_flow.EnsurePullabilityBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer 20, // 25: peerdb_flow.TableIdentifier.postgres_table_identifier:type_name -> peerdb_flow.PostgresTableIdentifier 21, // 26: peerdb_flow.EnsurePullabilityOutput.table_identifier:type_name -> peerdb_flow.TableIdentifier - 56, // 27: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry - 63, // 28: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer - 57, // 29: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry - 63, // 30: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer - 63, // 31: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer - 58, // 32: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry + 57, // 27: peerdb_flow.EnsurePullabilityBatchOutput.table_identifier_mapping:type_name -> peerdb_flow.EnsurePullabilityBatchOutput.TableIdentifierMappingEntry + 64, // 28: peerdb_flow.SetupReplicationInput.peer_connection_config:type_name -> peerdb_peers.Peer + 58, // 29: peerdb_flow.SetupReplicationInput.table_name_mapping:type_name -> peerdb_flow.SetupReplicationInput.TableNameMappingEntry + 64, // 30: peerdb_flow.SetupReplicationInput.destination_peer:type_name -> peerdb_peers.Peer + 64, // 31: peerdb_flow.CreateRawTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 59, // 32: peerdb_flow.CreateRawTableInput.table_name_mapping:type_name -> peerdb_flow.CreateRawTableInput.TableNameMappingEntry 0, // 33: peerdb_flow.CreateRawTableInput.cdc_sync_mode:type_name -> peerdb_flow.QRepSyncMode - 59, // 34: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry - 63, // 35: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 60, // 36: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry - 63, // 37: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer + 60, // 34: peerdb_flow.TableSchema.columns:type_name -> peerdb_flow.TableSchema.ColumnsEntry + 64, // 35: peerdb_flow.GetTableSchemaBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 61, // 36: peerdb_flow.GetTableSchemaBatchOutput.table_name_schema_mapping:type_name -> peerdb_flow.GetTableSchemaBatchOutput.TableNameSchemaMappingEntry + 64, // 37: peerdb_flow.SetupNormalizedTableInput.peer_connection_config:type_name -> peerdb_peers.Peer 28, // 38: peerdb_flow.SetupNormalizedTableInput.source_table_schema:type_name -> peerdb_flow.TableSchema - 63, // 39: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer - 61, // 40: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry - 62, // 41: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry - 64, // 42: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp - 64, // 43: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp + 64, // 39: peerdb_flow.SetupNormalizedTableBatchInput.peer_connection_config:type_name -> peerdb_peers.Peer + 62, // 40: peerdb_flow.SetupNormalizedTableBatchInput.table_name_schema_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchInput.TableNameSchemaMappingEntry + 63, // 41: peerdb_flow.SetupNormalizedTableBatchOutput.table_exists_mapping:type_name -> peerdb_flow.SetupNormalizedTableBatchOutput.TableExistsMappingEntry + 65, // 42: peerdb_flow.TimestampPartitionRange.start:type_name -> google.protobuf.Timestamp + 65, // 43: peerdb_flow.TimestampPartitionRange.end:type_name -> google.protobuf.Timestamp 37, // 44: peerdb_flow.TIDPartitionRange.start:type_name -> peerdb_flow.TID 37, // 45: peerdb_flow.TIDPartitionRange.end:type_name -> peerdb_flow.TID 35, // 46: peerdb_flow.PartitionRange.int_range:type_name -> peerdb_flow.IntPartitionRange 36, // 47: peerdb_flow.PartitionRange.timestamp_range:type_name -> peerdb_flow.TimestampPartitionRange 38, // 48: peerdb_flow.PartitionRange.tid_range:type_name -> peerdb_flow.TIDPartitionRange 1, // 49: peerdb_flow.QRepWriteMode.write_type:type_name -> peerdb_flow.QRepWriteType - 63, // 50: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer - 63, // 51: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer + 64, // 50: peerdb_flow.QRepConfig.source_peer:type_name -> peerdb_peers.Peer + 64, // 51: peerdb_flow.QRepConfig.destination_peer:type_name -> peerdb_peers.Peer 0, // 52: peerdb_flow.QRepConfig.sync_mode:type_name -> peerdb_flow.QRepSyncMode 40, // 53: peerdb_flow.QRepConfig.write_mode:type_name -> peerdb_flow.QRepWriteMode 39, // 54: peerdb_flow.QRepPartition.range:type_name -> peerdb_flow.PartitionRange @@ -4830,6 +4893,18 @@ func file_flow_proto_init() { return nil } } + file_flow_proto_msgTypes[49].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetOpenConnectionsForUserResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_flow_proto_msgTypes[6].OneofWrappers = []interface{}{} file_flow_proto_msgTypes[19].OneofWrappers = []interface{}{ @@ -4846,7 +4921,7 @@ func file_flow_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_flow_proto_rawDesc, NumEnums: 2, - NumMessages: 61, + NumMessages: 62, NumExtensions: 0, NumServices: 0, }, diff --git a/flow/go.mod b/flow/go.mod index 356eedb562..dd59212b71 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -24,6 +24,7 @@ require ( github.com/linkedin/goavro/v2 v2.12.0 github.com/microsoft/go-mssqldb v1.6.0 github.com/orcaman/concurrent-map/v2 v2.0.1 + github.com/slack-go/slack v0.12.3 github.com/snowflakedb/gosnowflake v1.7.1 github.com/stretchr/testify v1.8.4 github.com/twpayne/go-geos v0.14.0 @@ -53,6 +54,7 @@ require ( github.com/getsentry/sentry-go v0.25.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index 98802c7e49..e9a2d2b5eb 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -171,6 +171,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= @@ -221,6 +223,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= @@ -235,6 +238,9 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grafana/pyroscope-go v1.0.4 h1:oyQX0BOkL+iARXzHuCdIF5TQ7/sRSel1YFViMHC7Bm0= github.com/grafana/pyroscope-go v1.0.4/go.mod h1:0d7ftwSMBV/Awm7CCiYmHQEG8Y44Ma3YSjt+nWcWztY= github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo= @@ -351,6 +357,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/slack-go/slack v0.12.3 h1:92/dfFU8Q5XP6Wp5rr5/T5JHLM5c5Smtn53fhToAP88= +github.com/slack-go/slack v0.12.3/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/snowflakedb/gosnowflake v1.7.1 h1:c9JjyjjDlvxex9ud71TwKL+Wu54Vfx+39h4DAwbIdqU= github.com/snowflakedb/gosnowflake v1.7.1/go.mod h1:JI3eRZL8CpimPek6CJO0aTbDQjDGOt7Rxv9A/ti4f5c= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index cf0e51c21a..970be3455d 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -64,3 +64,19 @@ func GetPeerDBCatalogPassword() string { func GetPeerDBCatalogDatabase() string { return getEnvString("PEERDB_CATALOG_DATABASE", "") } + +// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely +func GetPeerDBSlotLagMBAlertThreshold() uint32 { + return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) +} + +// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely +func GetPeerDBAlertingGapMinutesAsDuration() time.Duration { + why := time.Duration(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)) + return why * time.Minute +} + +// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely +func GetPeerDBOpenConnectionsAlertThreshold() uint32 { + return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) +} diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go new file mode 100644 index 0000000000..7dc3cb489f --- /dev/null +++ b/flow/shared/alerting/alerting.go @@ -0,0 +1,101 @@ +package alerting + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +// alerting service, no cool name :( +type Alerter struct { + catalogPool *pgxpool.Pool + logger *slog.Logger +} + +func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { + rows, err := catalogPool.Query(context.Background(), + "SELECT service_type,service_config FROM peerdb_stats.alerting_config") + if err != nil { + return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) + } + + var slackAlertSenders []*slackAlertSender + var serviceType, serviceConfig string + _, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error { + switch serviceType { + case "slack": + var slackServiceConfig slackAlertConfig + err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig) + if err != nil { + return fmt.Errorf("failed to unmarshal Slack service config: %w", err) + } + + slackAlertSenders = append(slackAlertSenders, newSlackAlertSender(&slackServiceConfig)) + default: + return fmt.Errorf("unknown service type: %s", serviceType) + } + return nil + }) + + return slackAlertSenders, nil +} + +// doesn't take care of closing pool, needs to be done externally. +func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { + return &Alerter{ + catalogPool: catalogPool, + logger: slog.Default(), + } +} + +// Only raises an alert if another alert with the same key hasn't been raised +// in the past X minutes, where X is configurable and defaults to 15 minutes +func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { + if peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() == 0 { + a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning") + return + } + + if a.catalogPool != nil { + slackAlertSenders, err := registerSendersFromPool(a.catalogPool) + if err != nil { + a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err)) + return + } + if len(slackAlertSenders) == 0 { + a.logger.WarnContext(ctx, "no Slack senders configured, returning") + return + } + + row := a.catalogPool.QueryRow(context.Background(), + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + ORDER BY created_timestamp DESC LIMIT 1`, + alertKey) + var createdTimestamp time.Time + err = row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { + a.logger.Warn("failed to send alert: %v", err) + return + } + + if time.Since(createdTimestamp) >= peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() { + for _, slackAlertSender := range slackAlertSenders { + err = slackAlertSender.sendAlert(context.Background(), + fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) + if err != nil { + a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err)) + return + } + _, _ = a.catalogPool.Exec(context.Background(), + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + } + } + } +} diff --git a/flow/shared/alerting/slack_alert_sender.go b/flow/shared/alerting/slack_alert_sender.go new file mode 100644 index 0000000000..04c9a4ad86 --- /dev/null +++ b/flow/shared/alerting/slack_alert_sender.go @@ -0,0 +1,38 @@ +package alerting + +import ( + "context" + "fmt" + + "github.com/slack-go/slack" +) + +type slackAlertSender struct { + client *slack.Client + channelIDs []string +} + +type slackAlertConfig struct { + AuthToken string `json:"auth_token"` + ChannelIDs []string `json:"channel_ids"` +} + +func newSlackAlertSender(config *slackAlertConfig) *slackAlertSender { + return &slackAlertSender{ + client: slack.New(config.AuthToken), + channelIDs: config.ChannelIDs, + } +} + +func (s *slackAlertSender) sendAlert(ctx context.Context, alertTitle string, alertMessage string) error { + for _, channelID := range s.channelIDs { + _, _, _, err := s.client.SendMessageContext(ctx, channelID, slack.MsgOptionBlocks( + slack.NewHeaderBlock(slack.NewTextBlockObject("plain_text", alertTitle, true, false)), + slack.NewSectionBlock(slack.NewTextBlockObject("mrkdwn", alertMessage, false, false), nil, nil), + )) + if err != nil { + return fmt.Errorf("failed to send message to Slack channel %s: %w", channelID, err) + } + } + return nil +} diff --git a/nexus/catalog/migrations/V16__alerting_config_init.sql b/nexus/catalog/migrations/V16__alerting_config_init.sql new file mode 100644 index 0000000000..de16cc340a --- /dev/null +++ b/nexus/catalog/migrations/V16__alerting_config_init.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS peerdb_stats.alerting_config ( + id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + service_type TEXT NOT NULL CHECK (service_type IN ('slack')), + service_config JSONB NOT NULL +); + +CREATE TABLE IF NOT EXISTS peerdb_stats.alerts_v1 ( + id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + alert_key TEXT NOT NULL, + alert_level TEXT NOT NULL CHECK (alert_level IN ('critical')) DEFAULT 'critical', + alert_message TEXT NOT NULL, + created_timestamp TIMESTAMP DEFAULT now() +); \ No newline at end of file diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 4163aa0363..a58fcbb95d 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -555,6 +555,14 @@ pub struct PeerDbColumns { #[prost(bool, tag="3")] pub soft_delete: bool, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetOpenConnectionsForUserResult { + #[prost(string, tag="1")] + pub user_name: ::prost::alloc::string::String, + #[prost(int64, tag="2")] + pub current_open_connections: i64, +} /// protos for qrep #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index 3374e09b99..0e079f82fa 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -1806,6 +1806,122 @@ impl<'de> serde::Deserialize<'de> for GetLastSyncedIdInput { deserializer.deserialize_struct("peerdb_flow.GetLastSyncedIDInput", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for GetOpenConnectionsForUserResult { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.user_name.is_empty() { + len += 1; + } + if self.current_open_connections != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("peerdb_flow.GetOpenConnectionsForUserResult", len)?; + if !self.user_name.is_empty() { + struct_ser.serialize_field("userName", &self.user_name)?; + } + if self.current_open_connections != 0 { + struct_ser.serialize_field("currentOpenConnections", ToString::to_string(&self.current_open_connections).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for GetOpenConnectionsForUserResult { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "user_name", + "userName", + "current_open_connections", + "currentOpenConnections", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + UserName, + CurrentOpenConnections, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "userName" | "user_name" => Ok(GeneratedField::UserName), + "currentOpenConnections" | "current_open_connections" => Ok(GeneratedField::CurrentOpenConnections), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GetOpenConnectionsForUserResult; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct peerdb_flow.GetOpenConnectionsForUserResult") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut user_name__ = None; + let mut current_open_connections__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::UserName => { + if user_name__.is_some() { + return Err(serde::de::Error::duplicate_field("userName")); + } + user_name__ = Some(map.next_value()?); + } + GeneratedField::CurrentOpenConnections => { + if current_open_connections__.is_some() { + return Err(serde::de::Error::duplicate_field("currentOpenConnections")); + } + current_open_connections__ = + Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::__SkipField__ => { + let _ = map.next_value::()?; + } + } + } + Ok(GetOpenConnectionsForUserResult { + user_name: user_name__.unwrap_or_default(), + current_open_connections: current_open_connections__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("peerdb_flow.GetOpenConnectionsForUserResult", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for GetTableSchemaBatchInput { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/protos/flow.proto b/protos/flow.proto index 965d8d0fce..430968bcba 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -372,4 +372,9 @@ message PeerDBColumns { string soft_delete_col_name = 1; string synced_at_col_name = 2; bool soft_delete = 3; +} + +message GetOpenConnectionsForUserResult { + string user_name = 1; + int64 current_open_connections = 2; } \ No newline at end of file diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 1ba651b45f..ba881795ed 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -482,6 +482,11 @@ export interface PeerDBColumns { softDelete: boolean; } +export interface GetOpenConnectionsForUserResult { + userName: string; + currentOpenConnections: number; +} + function createBaseTableNameMapping(): TableNameMapping { return { sourceTableName: "", destinationTableName: "" }; } @@ -6386,6 +6391,82 @@ export const PeerDBColumns = { }, }; +function createBaseGetOpenConnectionsForUserResult(): GetOpenConnectionsForUserResult { + return { userName: "", currentOpenConnections: 0 }; +} + +export const GetOpenConnectionsForUserResult = { + encode(message: GetOpenConnectionsForUserResult, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.userName !== "") { + writer.uint32(10).string(message.userName); + } + if (message.currentOpenConnections !== 0) { + writer.uint32(16).int64(message.currentOpenConnections); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): GetOpenConnectionsForUserResult { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseGetOpenConnectionsForUserResult(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.userName = reader.string(); + continue; + case 2: + if (tag !== 16) { + break; + } + + message.currentOpenConnections = longToNumber(reader.int64() as Long); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): GetOpenConnectionsForUserResult { + return { + userName: isSet(object.userName) ? String(object.userName) : "", + currentOpenConnections: isSet(object.currentOpenConnections) ? Number(object.currentOpenConnections) : 0, + }; + }, + + toJSON(message: GetOpenConnectionsForUserResult): unknown { + const obj: any = {}; + if (message.userName !== "") { + obj.userName = message.userName; + } + if (message.currentOpenConnections !== 0) { + obj.currentOpenConnections = Math.round(message.currentOpenConnections); + } + return obj; + }, + + create, I>>(base?: I): GetOpenConnectionsForUserResult { + return GetOpenConnectionsForUserResult.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): GetOpenConnectionsForUserResult { + const message = createBaseGetOpenConnectionsForUserResult(); + message.userName = object.userName ?? ""; + message.currentOpenConnections = object.currentOpenConnections ?? 0; + return message; + }, +}; + declare const self: any | undefined; declare const window: any | undefined; declare const global: any | undefined;