diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 504c6d354e..6efa58b4d8 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -133,7 +133,7 @@ services: dockerfile: stacks/flow.Dockerfile target: flow-snapshot-worker environment: - <<: [*flow-worker-env] + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/docker-compose.yml b/docker-compose.yml index a7778d77d2..31d77a9aa0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -112,7 +112,7 @@ services: container_name: flow-snapshot-worker image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev environment: - <<: [*flow-worker-env] + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 6c7fa6c9e8..3b830a2443 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -42,9 +42,6 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error - - // SendWALHeartbeat allows for activity to progress restart_lsn on postgres. 
- SendWALHeartbeat() error } type CDCSyncConnector interface { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 469775e49e..3077b2462e 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/utils/evervigil" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -29,6 +30,7 @@ type PostgresConnector struct { tableSchemaMapping map[string]*protos.TableSchema customTypesMapping map[uint32]string metadataSchema string + vigil *evervigil.EverVigil } // NewPostgresConnector creates a new instance of PostgresConnector. @@ -79,6 +81,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) metadataSchema = *pgConfig.MetadataSchema } + vigil, err := evervigil.NewVigil() + if err != nil { + return nil, fmt.Errorf("failed to initialize vigil: %w", err) + } + return &PostgresConnector{ connStr: connectionString, ctx: ctx, @@ -87,6 +94,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) replPool: replPool, customTypesMapping: customTypeMap, metadataSchema: metadataSchema, + vigil: vigil, }, nil } @@ -100,6 +108,10 @@ func (c *PostgresConnector) Close() error { c.replPool.Close() } + if c.vigil != nil { + c.vigil.Close() + } + return nil } @@ -883,19 +895,3 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { } return nil } - -func (c *PostgresConnector) SendWALHeartbeat() error { - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - _, err := c.pool.Exec(c.ctx, command) - if err != nil { - return fmt.Errorf("error 
bumping wal position: %w", err) - } - - return nil -} diff --git a/flow/go.mod b/flow/go.mod index c606d7573d..a2190a6668 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -51,15 +51,18 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/getsentry/sentry-go v0.18.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect + github.com/nikoksr/notify v0.41.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.12.0 // indirect github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/slack-go/slack v0.12.2 // indirect ) require ( diff --git a/flow/go.sum b/flow/go.sum index 1186bc4a22..93e0fd2827 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -205,6 +205,7 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= @@ -273,6 +274,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ 
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -300,6 +302,9 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/pyroscope-go v1.0.4 h1:oyQX0BOkL+iARXzHuCdIF5TQ7/sRSel1YFViMHC7Bm0= github.com/grafana/pyroscope-go v1.0.4/go.mod h1:0d7ftwSMBV/Awm7CCiYmHQEG8Y44Ma3YSjt+nWcWztY= github.com/grafana/pyroscope-go/godeltaprof v0.1.5 h1:gkFVqihFRL1Nro2FCC0u6mW47jclef96Zu8I/ykq+4E= @@ -402,6 +407,8 @@ github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ib github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= 
+github.com/nikoksr/notify v0.41.0 h1:4LGE41GpWdHX5M3Xo6DlWRwS2WLDbOq1Rk7IzY4vjmQ=
+github.com/nikoksr/notify v0.41.0/go.mod h1:FoE0UVPeopz1Vy5nm9vQZ+JVmYjEIjQgbFstbkw+cRE=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
 github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
@@ -457,6 +464,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
 github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/slack-go/slack v0.12.2 h1:x3OppyMyGIbbiyFhsBmpf9pwkUzMhthJMRNmNlA4LaQ=
+github.com/slack-go/slack v0.12.2/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
 github.com/snowflakedb/gosnowflake v1.6.25 h1:o5zUmxTOo0Eo9AdkEj8blCeiMuILrQJ+rjUMAeZhcRE=
 github.com/snowflakedb/gosnowflake v1.6.25/go.mod h1:KfO4F7bk+aXPUIvBqYxvPhxLlu2/w4TtSC8Rw/yr5Mg=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/flow/utils/evervigil/ever_vigil.go b/flow/utils/evervigil/ever_vigil.go
new file mode 100644
index 0000000000..781f31cdbd
--- /dev/null
+++ b/flow/utils/evervigil/ever_vigil.go
@@ -0,0 +1,129 @@
+package evervigil
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"time"
+
+	catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/nikoksr/notify"
+	"github.com/nikoksr/notify/service/slack"
+	"github.com/sirupsen/logrus"
+)
+
+// alertCooldown is the minimum gap between two notifications for the same
+// alert key.
+const alertCooldown = 15 * time.Minute
+
+// EverVigil is the alerting service: it sends notifications through the
+// services configured in peerdb_stats.alerting_config and records each alert
+// it raises in peerdb_stats.alerts_v1.
+type EverVigil struct {
+	notifier    *notify.Notify
+	catalogPool *pgxpool.Pool
+}
+
+// slackServiceConfig is the JSON shape of service_config for "slack" rows.
+type slackServiceConfig struct {
+	AuthToken  string   `json:"auth_token"`
+	ChannelIDs []string `json:"channel_ids"`
+}
+
+// NewVigil connects to the catalog and registers one notification service per
+// row of peerdb_stats.alerting_config. The returned EverVigil keeps the
+// catalog pool; release it with Close. If the config cannot be read or holds an
+// invalid or unknown service, the pool is closed and an error is returned.
+func NewVigil() (*EverVigil, error) {
+	catalogPool, err := catalog.GetCatalogConnectionPoolFromEnv()
+	if err != nil {
+		return nil, fmt.Errorf("error getting catalog connection pool: %w", err)
+	}
+
+	notifier := notify.New()
+
+	rows, err := catalogPool.Query(context.Background(),
+		"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
+	if err != nil {
+		catalogPool.Close()
+		return nil, fmt.Errorf("failed to read everVigil config from catalog: %w", err)
+	}
+
+	var serviceType, serviceConfig string
+	_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
+		switch serviceType {
+		case "slack":
+			var cfg slackServiceConfig
+			if unmarshalErr := json.Unmarshal([]byte(serviceConfig), &cfg); unmarshalErr != nil {
+				return fmt.Errorf("failed to unmarshal Slack service config: %w", unmarshalErr)
+			}
+
+			slackService := slack.New(cfg.AuthToken)
+			slackService.AddReceivers(cfg.ChannelIDs...)
+			notifier.UseServices(slackService)
+		default:
+			return fmt.Errorf("unknown service type: %s", serviceType)
+		}
+		return nil
+	})
+	// ForEachRow returns both row-iteration failures and errors from the
+	// callback; dropping it would hand back a vigil with services missing.
+	if err != nil {
+		catalogPool.Close()
+		return nil, fmt.Errorf("failed to load everVigil config from catalog: %w", err)
+	}
+
+	return &EverVigil{
+		notifier:    notifier,
+		catalogPool: catalogPool,
+	}, nil
+}
+
+// Close releases the catalog connection pool, if one is held.
+func (ev *EverVigil) Close() {
+	if ev.catalogPool != nil {
+		ev.catalogPool.Close()
+	}
+}
+
+// AlertIf sends alertMessage to the configured services unless an alert with
+// the same alertKey was recorded in peerdb_stats.alerts_v1 within the last
+// alertCooldown. Failures are logged rather than returned.
+func (ev *EverVigil) AlertIf(alertKey string, alertMessage string) {
+	row := ev.catalogPool.QueryRow(context.Background(),
+		`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
+		 ORDER BY created_timestamp DESC LIMIT 1`,
+		alertKey)
+	// With no earlier alert, Scan leaves the zero time in place; that is older
+	// than any cooldown, so the first alert for a key is always sent.
+	var createdTimestamp time.Time
+	err := row.Scan(&createdTimestamp)
+	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
+		logrus.Warnf("failed to look up previous alert: %v", err)
+		return
+	}
+
+	if time.Since(createdTimestamp) < alertCooldown {
+		return
+	}
+
+	err = ev.notifier.Send(context.Background(),
+		fmt.Sprintf(":rotating_light: *Alert Alert* :rotating_light:: %s since %s", alertKey,
+			time.Now().Format("2006-01-02 15:04:05.999999")), alertMessage)
+	if err != nil {
+		logrus.Warnf("failed to send alert: %v", err)
+		return
+	}
+
+	// The cooldown check reads this row; if the insert fails the same alert is
+	// re-sent on the next call, so surface the failure instead of discarding it.
+	_, err = ev.catalogPool.Exec(context.Background(),
+		"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
+		alertKey, alertMessage)
+	if err != nil {
+		logrus.Warnf("failed to record alert: %v", err)
+	}
+}
diff --git a/nexus/catalog/migrations/V12__alerting_config_init.sql b/nexus/catalog/migrations/V12__alerting_config_init.sql
new file mode 100644
index 0000000000..de16cc340a
--- /dev/null
+++ b/nexus/catalog/migrations/V12__alerting_config_init.sql
@@ -0,0 +1,13 @@
+CREATE TABLE IF NOT EXISTS peerdb_stats.alerting_config (
+    id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+    service_type TEXT NOT NULL CHECK (service_type IN ('slack')),
+    service_config JSONB NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS peerdb_stats.alerts_v1 (
+    id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+    alert_key TEXT NOT NULL,
+    alert_level TEXT NOT NULL CHECK (alert_level IN ('critical')) DEFAULT 'critical',
+    alert_message TEXT NOT NULL,
+    created_timestamp TIMESTAMP DEFAULT now()
+);
\ No newline at end of file