Skip to content

Commit

Permalink
feat: add telemetry/alerts via sns (#1411)
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta authored Mar 4, 2024
1 parent 39c6e7e commit 67fd5d3
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 19 deletions.
3 changes: 3 additions & 0 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)

return nil
}
Expand All @@ -49,6 +50,8 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down
43 changes: 39 additions & 4 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
)

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
catalogPool *pgxpool.Pool
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
Expand Down Expand Up @@ -50,13 +52,25 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
if catalogPool == nil {
panic("catalog pool is nil for Alerter")
}

snsTopic := peerdbenv.PeerDBTelemetryAWSSNSTopicArn()
var snsMessageSender telemetry.Sender
if snsTopic != "" {
var err error
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{
Topic: snsTopic,
})
logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender")
if err != nil {
panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err))
}
}
return &Alerter{
catalogPool: catalogPool,
catalogPool: catalogPool,
telemetrySender: snsMessageSender,
}
}

Expand Down Expand Up @@ -193,6 +207,22 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string
return false
}

func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) {
if a.telemetrySender != nil {
details := fmt.Sprintf("[%s] %s", flowName, more)
_, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: []string{flowName},
Type: flowName,
})
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err))
return
}
}
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
Expand All @@ -202,6 +232,11 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
return
}
a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR)
}

func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerting.NewAlerter(conn),
Alerter: alerting.NewAlerter(context.Background(), conn),
})

return c, w, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {

w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]connectors.CDCPullConnector),
})

Expand Down
17 changes: 12 additions & 5 deletions flow/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peer-flow

go 1.22
go 1.22.0

require (
cloud.google.com/go v0.112.0
Expand All @@ -10,10 +10,12 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
github.com/ClickHouse/clickhouse-go/v2 v2.18.0
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1
github.com/cockroachdb/pebble v1.1.0
github.com/google/uuid v1.6.0
github.com/grafana/pyroscope-go v1.1.1
Expand Down Expand Up @@ -52,6 +54,11 @@ require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
Expand Down Expand Up @@ -103,14 +110,14 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect
github.com/aws/smithy-go v1.20.0 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
Expand Down
18 changes: 10 additions & 8 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8=
github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0=
Expand All @@ -76,10 +76,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE=
Expand All @@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M=
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM=
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc=
github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ=
github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,8 @@ func PeerDBEnableWALHeartbeat() bool {
func PeerDBEnableParallelSyncNormalize() bool {
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
}

// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
func PeerDBTelemetryAWSSNSTopicArn() string {
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
}
25 changes: 25 additions & 0 deletions flow/shared/telemetry/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package telemetry

import (
"context"
)

type Sender interface {
SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error)
}

type Attributes struct {
Level Level
DeploymentUID string
Tags []string
Type string
}

type Level string

const (
INFO Level = "INFO"
WARN Level = "WARN"
ERROR Level = "ERROR"
CRITICAL Level = "CRITICAL"
)
110 changes: 110 additions & 0 deletions flow/shared/telemetry/sns_message_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package telemetry

import (
"context"
"crypto/sha256"
"encoding/hex"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
"go.temporal.io/sdk/activity"
)

type SNSMessageSender interface {
Sender
}

type SNSMessageSenderImpl struct {
client *sns.Client
topic string
}

type SNSMessageSenderConfig struct {
Topic string `json:"topic"`
}

func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) {
activityInfo := activity.GetInfo(ctx)
deduplicationString := strings.Join([]string{
"deployID", attributes.DeploymentUID,
"subject", subject,
"runID", activityInfo.WorkflowExecution.RunID,
"activityName", activityInfo.ActivityType.Name,
}, " || ")
h := sha256.New()
h.Write([]byte(deduplicationString))
deduplicationHash := hex.EncodeToString(h.Sum(nil))

publish, err := s.client.Publish(ctx, &sns.PublishInput{
Message: aws.String(body),
MessageAttributes: map[string]types.MessageAttributeValue{
"level": {
DataType: aws.String("String"),
StringValue: aws.String(string(attributes.Level)),
},
"tags": {
DataType: aws.String("String"),
StringValue: aws.String(strings.Join(attributes.Tags, ",")),
},
"deploymentUUID": {
DataType: aws.String("String"),
StringValue: aws.String(attributes.DeploymentUID),
},
"entity": {
DataType: aws.String("String"),
StringValue: aws.String(attributes.DeploymentUID),
},
"type": {
DataType: aws.String("String"),
StringValue: aws.String(attributes.Type),
},
"alias": { // This will act as a de-duplication ID
DataType: aws.String("String"),
StringValue: aws.String(deduplicationHash),
},
},
Subject: aws.String(subject[:100]),
TopicArn: aws.String(s.topic),
})
if err != nil {
return nil, err
}
return publish.MessageId, nil
}

func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) {
// Topic Region must match client region
region := strings.Split(strings.TrimPrefix(config.Topic, "arn:aws:sns:"), ":")[0]
client, err := newSnsClient(ctx, &region)
if err != nil {
return nil, err
}
return &SNSMessageSenderImpl{
client: client,
topic: config.Topic,
}, nil
}

func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender {
return &SNSMessageSenderImpl{
client: client,
topic: config.Topic,
}
}

func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) {
sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error {
if region != nil {
options.Region = *region
}
return nil
})
if err != nil {
return nil, err
}
snsClient := sns.NewFromConfig(sdkConfig)
return snsClient, nil
}

0 comments on commit 67fd5d3

Please sign in to comment.