Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add telemetry/alerts via sns #1411

Merged
merged 13 commits into from
Mar 4, 2024
3 changes: 3 additions & 0 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)

return nil
}
Expand All @@ -49,6 +50,8 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down
43 changes: 39 additions & 4 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
)

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
catalogPool *pgxpool.Pool
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
Expand Down Expand Up @@ -50,13 +52,25 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
if catalogPool == nil {
panic("catalog pool is nil for Alerter")
}

snsTopic := peerdbenv.PeerDBTelemetryAWSSNSTopicArn()
var snsMessageSender telemetry.Sender
if snsTopic != "" {
var err error
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{
Topic: snsTopic,
})
logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender")
if err != nil {
panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err))
}
}
return &Alerter{
catalogPool: catalogPool,
catalogPool: catalogPool,
telemetrySender: snsMessageSender,
}
}

Expand Down Expand Up @@ -193,6 +207,22 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string
return false
}

func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) {
if a.telemetrySender != nil {
details := fmt.Sprintf("[%s] %s", flowName, more)
_, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: []string{flowName},
Type: flowName,
})
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err))
return
}
}
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
Expand All @@ -202,6 +232,11 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
return
}
a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR)
}

func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerting.NewAlerter(conn),
Alerter: alerting.NewAlerter(context.Background(), conn),
})

return c, w, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]connectors.CDCPullConnector),
})

Expand Down
17 changes: 12 additions & 5 deletions flow/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peer-flow

go 1.22
go 1.22.0

require (
cloud.google.com/go v0.112.0
Expand All @@ -10,10 +10,12 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
github.com/ClickHouse/clickhouse-go/v2 v2.18.0
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1
github.com/cockroachdb/pebble v1.1.0
github.com/google/uuid v1.6.0
github.com/grafana/pyroscope-go v1.1.1
Expand Down Expand Up @@ -52,6 +54,11 @@ require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
Expand Down Expand Up @@ -103,14 +110,14 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect
github.com/aws/smithy-go v1.20.0 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
Expand Down
18 changes: 10 additions & 8 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8=
github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0=
Expand All @@ -76,10 +76,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE=
Expand All @@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M=
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM=
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc=
github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ=
github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,8 @@ func PeerDBEnableWALHeartbeat() bool {
func PeerDBEnableParallelSyncNormalize() bool {
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}

// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
func PeerDBTelemetryAWSSNSTopicArn() string {
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
}
25 changes: 25 additions & 0 deletions flow/shared/telemetry/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package telemetry

import (
"context"
)

type Sender interface {
SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error)
}

type Attributes struct {
Level Level
DeploymentUID string
Tags []string
Type string
}

type Level string

const (
INFO Level = "INFO"
WARN Level = "WARN"
ERROR Level = "ERROR"
CRITICAL Level = "CRITICAL"
)
110 changes: 110 additions & 0 deletions flow/shared/telemetry/sns_message_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package telemetry

import (
"context"
"crypto/sha256"
"encoding/hex"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
"go.temporal.io/sdk/activity"
)

type SNSMessageSender interface {
Sender
}

type SNSMessageSenderImpl struct {
client *sns.Client
topic string
}

type SNSMessageSenderConfig struct {
Topic string `json:"topic"`
}

func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) {
activityInfo := activity.GetInfo(ctx)
deduplicationString := strings.Join([]string{
"deployID", attributes.DeploymentUID,
"subject", subject,
"runID", activityInfo.WorkflowExecution.RunID,
"activityName", activityInfo.ActivityType.Name,
}, " || ")
h := sha256.New()
h.Write([]byte(deduplicationString))
deduplicationHash := hex.EncodeToString(h.Sum(nil))

publish, err := s.client.Publish(ctx, &sns.PublishInput{
Message: aws.String(body),
MessageAttributes: map[string]types.MessageAttributeValue{
"level": {
DataType: aws.String("String"),
StringValue: aws.String(string(attributes.Level)),
},
"tags": {
DataType: aws.String("String"),
StringValue: aws.String(strings.Join(attributes.Tags, ",")),
},
"deploymentUUID": {
DataType: aws.String("String"),
StringValue: aws.String(attributes.DeploymentUID),
},
"entity": {
DataType: aws.String("String"),
StringValue: aws.String(attributes.DeploymentUID),
},
"type": {
DataType: aws.String("String"),
StringValue: aws.String(attributes.Type),
},
"alias": { // This will act as a de-duplication ID
DataType: aws.String("String"),
StringValue: aws.String(deduplicationHash),
},
},
Subject: aws.String(subject[:100]),
TopicArn: aws.String(s.topic),
})
if err != nil {
return nil, err
}
return publish.MessageId, nil
}

func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) {
// Topic Region must match client region
region := strings.Split(strings.TrimPrefix(config.Topic, "arn:aws:sns:"), ":")[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use strings.Cut: https://pkg.go.dev/strings#Cut
May also want strings.CutPrefix with error if Topic is lacking prefix

client, err := newSnsClient(ctx, &region)
if err != nil {
return nil, err
}
return &SNSMessageSenderImpl{
client: client,
topic: config.Topic,
}, nil
}

func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender {
return &SNSMessageSenderImpl{
client: client,
topic: config.Topic,
}
}

func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) {
sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error {
if region != nil {
options.Region = *region
}
return nil
})
if err != nil {
return nil, err
}
snsClient := sns.NewFromConfig(sdkConfig)
return snsClient, nil
}
Loading