From d2ec972413a56a84f81b12b49313ce29af71ca2d Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 8 Dec 2023 00:40:08 +0530 Subject: [PATCH 1/5] basic alerting, for internal use primarily --- docker-compose-dev.yml | 2 +- docker-compose.yml | 2 +- flow/connectors/core.go | 3 - flow/connectors/postgres/postgres.go | 28 +++--- flow/go.mod | 5 +- flow/go.sum | 12 +++ flow/utils/evervigil/ever_vigil.go | 99 +++++++++++++++++++ .../migrations/V12__alerting_config_init.sql | 13 +++ 8 files changed, 142 insertions(+), 22 deletions(-) create mode 100644 flow/utils/evervigil/ever_vigil.go create mode 100644 nexus/catalog/migrations/V12__alerting_config_init.sql diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index f5d8942ba1..158483a7eb 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -133,7 +133,7 @@ services: dockerfile: stacks/flow.Dockerfile target: flow-snapshot-worker environment: - <<: [*flow-worker-env] + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/docker-compose.yml b/docker-compose.yml index 5645f2b1e4..add4bb2db1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -112,7 +112,7 @@ services: container_name: flow-snapshot-worker image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev environment: - <<: [*flow-worker-env] + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 70fc3d6bb7..4e2a801b43 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -43,9 +43,6 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error - // SendWALHeartbeat allows for activity to progress restart_lsn on postgres. - SendWALHeartbeat() error - // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. 
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 2bf7fb7d04..95cef31b26 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/utils/evervigil" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -30,6 +31,7 @@ type PostgresConnector struct { tableSchemaMapping map[string]*protos.TableSchema customTypesMapping map[uint32]string metadataSchema string + vigil *evervigil.EverVigil } // NewPostgresConnector creates a new instance of PostgresConnector. @@ -83,6 +85,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) metadataSchema = *pgConfig.MetadataSchema } + vigil, err := evervigil.NewVigil() + if err != nil { + return nil, fmt.Errorf("failed to initialize vigil: %w", err) + } + return &PostgresConnector{ connStr: connectionString, ctx: ctx, @@ -91,6 +98,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) replPool: replPool, customTypesMapping: customTypeMap, metadataSchema: metadataSchema, + vigil: vigil, }, nil } @@ -109,6 +117,10 @@ func (c *PostgresConnector) Close() error { c.replPool.Close() } + if c.vigil != nil { + c.vigil.Close() + } + return nil } @@ -892,19 +904,3 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { } return nil } - -func (c *PostgresConnector) SendWALHeartbeat() error { - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - _, err := c.pool.Exec(c.ctx, command) - if err != nil { - return fmt.Errorf("error bumping 
wal position: %w", err) - } - - return nil -} diff --git a/flow/go.mod b/flow/go.mod index 7237ef44d0..95de3a1de6 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -24,6 +24,7 @@ require ( github.com/lib/pq v1.10.9 github.com/linkedin/goavro/v2 v2.12.0 github.com/microsoft/go-mssqldb v1.6.0 + github.com/nikoksr/notify v0.41.0 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/sirupsen/logrus v1.9.3 github.com/snowflakedb/gosnowflake v1.7.1 @@ -54,6 +55,7 @@ require ( github.com/getsentry/sentry-go v0.25.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect @@ -63,6 +65,7 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/slack-go/slack v0.12.2 // indirect github.com/ysmood/gop v0.2.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect go.opentelemetry.io/otel/metric v1.21.0 // indirect @@ -146,7 +149,7 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/crypto v0.16.0 // indirect + golang.org/x/crypto v0.16.0 golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.19.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index d10a1f1081..f86dd0a340 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -165,6 +165,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= @@ -215,6 +217,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= @@ -229,6 +232,9 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/pyroscope-go v1.0.4 h1:oyQX0BOkL+iARXzHuCdIF5TQ7/sRSel1YFViMHC7Bm0= github.com/grafana/pyroscope-go v1.0.4/go.mod 
h1:0d7ftwSMBV/Awm7CCiYmHQEG8Y44Ma3YSjt+nWcWztY= github.com/grafana/pyroscope-go/godeltaprof v0.1.5 h1:gkFVqihFRL1Nro2FCC0u6mW47jclef96Zu8I/ykq+4E= @@ -271,6 +277,8 @@ github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible h1:jdpOPRN1zP63Td1hDQbZW73xKmzDvZHzVdNYxhnTMDA= +github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible/go.mod h1:1c7szIrayyPPB/987hsnvNzLushdWf4o/79s3P08L8A= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= @@ -309,6 +317,8 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8Ie github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nikoksr/notify v0.41.0 h1:4LGE41GpWdHX5M3Xo6DlWRwS2WLDbOq1Rk7IzY4vjmQ= +github.com/nikoksr/notify v0.41.0/go.mod h1:FoE0UVPeopz1Vy5nm9vQZ+JVmYjEIjQgbFstbkw+cRE= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= @@ -349,6 +359,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD 
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/slack-go/slack v0.12.2 h1:x3OppyMyGIbbiyFhsBmpf9pwkUzMhthJMRNmNlA4LaQ= +github.com/slack-go/slack v0.12.2/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/snowflakedb/gosnowflake v1.7.1 h1:c9JjyjjDlvxex9ud71TwKL+Wu54Vfx+39h4DAwbIdqU= github.com/snowflakedb/gosnowflake v1.7.1/go.mod h1:JI3eRZL8CpimPek6CJO0aTbDQjDGOt7Rxv9A/ti4f5c= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/flow/utils/evervigil/ever_vigil.go b/flow/utils/evervigil/ever_vigil.go new file mode 100644 index 0000000000..781f31cdbd --- /dev/null +++ b/flow/utils/evervigil/ever_vigil.go @@ -0,0 +1,99 @@ +package evervigil + +import ( + "context" + "encoding/json" + "fmt" + "time" + + catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/nikoksr/notify" + "github.com/nikoksr/notify/service/slack" + "github.com/sirupsen/logrus" +) + +// alerting service, cool name +type EverVigil struct { + notifier *notify.Notify + catalogPool *pgxpool.Pool +} + +type slackServiceConfig struct { + AuthToken string `json:"auth_token"` + ChannelIDs []string `json:"channel_ids"` +} + +func NewVigil() (*EverVigil, error) { + catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv() + if catalogErr != nil { + return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr) + } + + notifier := notify.New() + + rows, err := catalogPool.Query(context.Background(), + "SELECT service_type,service_config FROM peerdb_stats.alerting_config") + if err != nil { + return nil, fmt.Errorf("failed to read everVigil config from catalog: %w", err) + } + + var serviceType, serviceConfig string 
+ _, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error { + switch serviceType { + case "slack": + var slackServiceConfig slackServiceConfig + err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig) + if err != nil { + return fmt.Errorf("failed to unmarshal Slack service config: %w", err) + } + + slackService := slack.New(slackServiceConfig.AuthToken) + slackService.AddReceivers(slackServiceConfig.ChannelIDs...) + notifier.UseServices(slackService) + default: + return fmt.Errorf("unknown service type: %s", serviceType) + } + return nil + }) + + return &EverVigil{ + notifier: notifier, + catalogPool: catalogPool, + }, nil +} + +func (ev *EverVigil) Close() { + if ev.catalogPool != nil { + ev.catalogPool.Close() + } +} + +// Only raises an alert if another alert with the same key hasn't been raised +// in the past 15 minutes +func (ev *EverVigil) AlertIf(alertKey string, alertMessage string) { + row := ev.catalogPool.QueryRow(context.Background(), + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + ORDER BY created_timestamp DESC LIMIT 1`, + alertKey) + var createdTimestamp time.Time + err := row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { + logrus.Warnf("failed to send alert: %v", err) + return + } + + if time.Since(createdTimestamp) >= 15*time.Minute { + err = ev.notifier.Send(context.Background(), + fmt.Sprintf(":rotating_light: *Alert Alert* :rotating_light:: %s since %s", alertKey, + time.Now().Format("2006-01-02 15:04:05.999999")), alertMessage) + if err != nil { + logrus.Warnf("failed to send alert: %v", err) + return + } + _, _ = ev.catalogPool.Exec(context.Background(), + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + } +} diff --git a/nexus/catalog/migrations/V12__alerting_config_init.sql b/nexus/catalog/migrations/V12__alerting_config_init.sql new file mode 100644 index 0000000000..de16cc340a --- /dev/null +++ 
b/nexus/catalog/migrations/V12__alerting_config_init.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS peerdb_stats.alerting_config ( + id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + service_type TEXT NOT NULL CHECK (service_type IN ('slack')), + service_config JSONB NOT NULL +); + +CREATE TABLE IF NOT EXISTS peerdb_stats.alerts_v1 ( + id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + alert_key TEXT NOT NULL, + alert_level TEXT NOT NULL CHECK (alert_level IN ('critical')) DEFAULT 'critical', + alert_message TEXT NOT NULL, + created_timestamp TIMESTAMP DEFAULT now() +); \ No newline at end of file From 90ace56e201a3fc870171ec9ac3da299b5766a17 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 8 Dec 2023 03:22:54 +0530 Subject: [PATCH 2/5] activity alerting prototype, not done for all activities yet --- flow/activities/flowable.go | 24 ++++++++++ flow/cmd/worker.go | 11 ++++- flow/connectors/postgres/postgres.go | 12 ----- flow/utils/evervigil/ever_vigil.go | 46 +++++++++---------- ...init.sql => V13__alerting_config_init.sql} | 0 5 files changed, 55 insertions(+), 38 deletions(-) rename nexus/catalog/migrations/{V12__alerting_config_init.sql => V13__alerting_config_init.sql} (100%) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index a8cc819214..1537157945 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,6 +5,8 @@ import ( "database/sql" "errors" "fmt" + "runtime" + "strings" "sync" "time" @@ -17,6 +19,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/utils/evervigil" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" @@ -41,6 +44,25 @@ type SlotSnapshotSignal struct { type FlowableActivity struct { CatalogPool *pgxpool.Pool + Vigil *evervigil.EverVigil +} + +func currentFunction() string { + counter, _, _, ok := runtime.Caller(2) 
+ + if !ok { + return "unknown" + } + + splitStr := strings.Split(runtime.FuncForPC(counter).Name(), ".") + return splitStr[len(splitStr)-1] +} + +func (a *FlowableActivity) vigilForActivityFailures(flowJobName string, err error) { + if err != nil { + a.Vigil.AlertIf(fmt.Sprintf("%s-%s-failed", flowJobName, currentFunction()), + fmt.Sprintf("```%s```", err.Error())) + } } // CheckConnection implements CheckConnection. @@ -97,12 +119,14 @@ func (a *FlowableActivity) EnsurePullability( ) (*protos.EnsurePullabilityBatchOutput, error) { srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { + a.vigilForActivityFailures(config.FlowJobName, err) return nil, fmt.Errorf("failed to get connector: %w", err) } defer connectors.CloseConnector(srcConn) output, err := srcConn.EnsurePullability(config) if err != nil { + a.vigilForActivityFailures(config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index a9e1281619..12269bc021 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -11,6 +11,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/utils/evervigil" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/grafana/pyroscope-go" @@ -103,10 +104,17 @@ func WorkerMain(opts *WorkerOptions) error { clientOptions.ConnectionOptions = connOptions } - conn, err := utils.GetCatalogConnectionPoolFromEnv() + catalogPool, err := utils.GetCatalogConnectionPoolFromEnv() if err != nil { return fmt.Errorf("unable to create catalog connection pool: %w", err) } + catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(catalogPool) + defer catalogMirrorMonitor.Close() + vigil, err := evervigil.NewVigil(catalogPool) + if err != nil { + return fmt.Errorf("unable to create Vigil: %w", err) + } + defer 
vigil.Close() c, err := client.Dial(clientOptions) if err != nil { @@ -132,6 +140,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, + Vigil: vigil, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 95cef31b26..bdf2bea8e5 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -12,7 +12,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/PeerDB-io/peer-flow/utils/evervigil" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -31,7 +30,6 @@ type PostgresConnector struct { tableSchemaMapping map[string]*protos.TableSchema customTypesMapping map[uint32]string metadataSchema string - vigil *evervigil.EverVigil } // NewPostgresConnector creates a new instance of PostgresConnector. 
@@ -85,11 +83,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) metadataSchema = *pgConfig.MetadataSchema } - vigil, err := evervigil.NewVigil() - if err != nil { - return nil, fmt.Errorf("failed to initialize vigil: %w", err) - } - return &PostgresConnector{ connStr: connectionString, ctx: ctx, @@ -98,7 +91,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) replPool: replPool, customTypesMapping: customTypeMap, metadataSchema: metadataSchema, - vigil: vigil, }, nil } @@ -117,10 +109,6 @@ func (c *PostgresConnector) Close() error { c.replPool.Close() } - if c.vigil != nil { - c.vigil.Close() - } - return nil } diff --git a/flow/utils/evervigil/ever_vigil.go b/flow/utils/evervigil/ever_vigil.go index 781f31cdbd..f9c0ecd659 100644 --- a/flow/utils/evervigil/ever_vigil.go +++ b/flow/utils/evervigil/ever_vigil.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/nikoksr/notify" @@ -25,12 +24,7 @@ type slackServiceConfig struct { ChannelIDs []string `json:"channel_ids"` } -func NewVigil() (*EverVigil, error) { - catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv() - if catalogErr != nil { - return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr) - } - +func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) { notifier := notify.New() rows, err := catalogPool.Query(context.Background(), @@ -73,27 +67,29 @@ func (ev *EverVigil) Close() { // Only raises an alert if another alert with the same key hasn't been raised // in the past 15 minutes func (ev *EverVigil) AlertIf(alertKey string, alertMessage string) { - row := ev.catalogPool.QueryRow(context.Background(), - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + if ev.catalogPool != nil && ev.notifier != nil { + row := 
ev.catalogPool.QueryRow(context.Background(), + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) - var createdTimestamp time.Time - err := row.Scan(&createdTimestamp) - if err != nil && err != pgx.ErrNoRows { - logrus.Warnf("failed to send alert: %v", err) - return - } - - if time.Since(createdTimestamp) >= 15*time.Minute { - err = ev.notifier.Send(context.Background(), - fmt.Sprintf(":rotating_light: *Alert Alert* :rotating_light:: %s since %s", alertKey, - time.Now().Format("2006-01-02 15:04:05.999999")), alertMessage) - if err != nil { + alertKey) + var createdTimestamp time.Time + err := row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { logrus.Warnf("failed to send alert: %v", err) return } - _, _ = ev.catalogPool.Exec(context.Background(), - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) + + if time.Since(createdTimestamp) >= 15*time.Minute { + err = ev.notifier.Send(context.Background(), + fmt.Sprintf(":rotating_light: *Alert Alert* :rotating_light:: %s since %s", alertKey, + time.Now().Format("2006-01-02 15:04:05.999999")), alertMessage) + if err != nil { + logrus.Warnf("failed to send alert: %v", err) + return + } + _, _ = ev.catalogPool.Exec(context.Background(), + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + } } } diff --git a/nexus/catalog/migrations/V12__alerting_config_init.sql b/nexus/catalog/migrations/V13__alerting_config_init.sql similarity index 100% rename from nexus/catalog/migrations/V12__alerting_config_init.sql rename to nexus/catalog/migrations/V13__alerting_config_init.sql From 42a62b0de0da824516398d7446d34f79dd44c36c Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 9 Dec 2023 01:27:46 +0530 Subject: [PATCH 3/5] removed activity alerts for now, added slot size alert --- flow/activities/flowable.go | 2 -- 
flow/utils/evervigil/ever_vigil.go | 27 ++++++++++++++++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1537157945..e2db021f0d 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -119,14 +119,12 @@ func (a *FlowableActivity) EnsurePullability( ) (*protos.EnsurePullabilityBatchOutput, error) { srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { - a.vigilForActivityFailures(config.FlowJobName, err) return nil, fmt.Errorf("failed to get connector: %w", err) } defer connectors.CloseConnector(srcConn) output, err := srcConn.EnsurePullability(config) if err != nil { - a.vigilForActivityFailures(config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } diff --git a/flow/utils/evervigil/ever_vigil.go b/flow/utils/evervigil/ever_vigil.go index f9c0ecd659..0fae2c90ce 100644 --- a/flow/utils/evervigil/ever_vigil.go +++ b/flow/utils/evervigil/ever_vigil.go @@ -24,7 +24,7 @@ type slackServiceConfig struct { ChannelIDs []string `json:"channel_ids"` } -func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) { +func registerServicesForNotifier(catalogPool *pgxpool.Pool) (*notify.Notify, error) { notifier := notify.New() rows, err := catalogPool.Query(context.Background(), @@ -33,6 +33,7 @@ func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) { return nil, fmt.Errorf("failed to read everVigil config from catalog: %w", err) } + registeredAtleastOneService := false var serviceType, serviceConfig string _, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error { switch serviceType { @@ -46,12 +47,26 @@ func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) { slackService := slack.New(slackServiceConfig.AuthToken) slackService.AddReceivers(slackServiceConfig.ChannelIDs...) 
notifier.UseServices(slackService) + registeredAtleastOneService = true default: return fmt.Errorf("unknown service type: %s", serviceType) } return nil }) + // vigil is currently useless, marking it as such + if !registeredAtleastOneService { + notifier.Disabled = true + } + return notifier, nil +} + +func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) { + notifier, err := registerServicesForNotifier(catalogPool) + if err != nil { + return nil, err + } + return &EverVigil{ notifier: notifier, catalogPool: catalogPool, @@ -68,6 +83,16 @@ func (ev *EverVigil) Close() { // in the past 15 minutes func (ev *EverVigil) AlertIf(alertKey string, alertMessage string) { if ev.catalogPool != nil && ev.notifier != nil { + // try to make the vigil not useless if possible + if ev.notifier.Disabled { + var err error + ev.notifier, err = registerServicesForNotifier(ev.catalogPool) + if err != nil { + logrus.Warnf("failed to register services for vigil: %v", err) + return + } + } + row := ev.catalogPool.QueryRow(context.Background(), `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 ORDER BY created_timestamp DESC LIMIT 1`, From 2e3e6a01d592d79f0dbb761847c04d66a9b0094a Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 9 Dec 2023 02:27:27 +0530 Subject: [PATCH 4/5] slot size alert only on env set --- flow/activities/flowable.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e2db021f0d..d278a2e8cb 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,7 +5,9 @@ import ( "database/sql" "errors" "fmt" + "os" "runtime" + "strconv" "strings" "sync" "time" @@ -195,6 +197,21 @@ func (a *FlowableActivity) handleSlotInfo( return err } + slotLagInMBThresholdStr, ok := os.LookupEnv("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD") + if ok { + slotLagInMBThreshold, err := strconv.ParseInt(slotLagInMBThresholdStr, 10, 64) + if err != nil { + 
log.Warnf("failed to parse PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD as integer!") + return nil + } + + if int64(slotInfo[0].LagInMb) >= slotLagInMBThreshold { + a.Vigil.AlertIf(fmt.Sprintf("%s-slot-size-exceeded", peerName), + fmt.Sprintf("Slot %s on peer %s has exceeded threshold size of %dMB, currently at %fMB!", + slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) + } + } + if len(slotInfo) != 0 { return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) } @@ -207,6 +224,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically( slotName string, peerName string, ) { + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } timeout := 10 * time.Minute ticker := time.NewTicker(timeout) From 95bb2097048de2f73849a6d0a42ba8a9bea95e75 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 12 Dec 2023 23:07:58 +0530 Subject: [PATCH 5/5] fixing up merge issues --- flow/cmd/worker.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 12269bc021..906a981b70 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -108,8 +108,6 @@ func WorkerMain(opts *WorkerOptions) error { if err != nil { return fmt.Errorf("unable to create catalog connection pool: %w", err) } - catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(catalogPool) - defer catalogMirrorMonitor.Close() vigil, err := evervigil.NewVigil(catalogPool) if err != nil { return fmt.Errorf("unable to create Vigil: %w", err) @@ -139,7 +137,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ - CatalogPool: conn, + CatalogPool: catalogPool, Vigil: vigil, })