Skip to content

Commit

Permalink
feat: add telemetry via sns
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Mar 1, 2024
1 parent 84194d9 commit 227d065
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 15 deletions.
2 changes: 2 additions & 0 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowStart(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName))

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down
15 changes: 11 additions & 4 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
github.com/ClickHouse/clickhouse-go/v2 v2.18.0
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1
github.com/cockroachdb/pebble v1.1.0
github.com/google/uuid v1.6.0
github.com/grafana/pyroscope-go v1.1.1
Expand Down Expand Up @@ -52,6 +54,11 @@ require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
Expand Down Expand Up @@ -103,14 +110,14 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect
github.com/aws/smithy-go v1.20.0 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
Expand Down
18 changes: 10 additions & 8 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8=
github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0=
Expand All @@ -76,10 +76,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE=
Expand All @@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M=
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM=
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc=
github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ=
github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
51 changes: 48 additions & 3 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
"log/slog"
"os"
"time"

"github.com/jackc/pgx/v5"
Expand All @@ -19,7 +21,8 @@ import (

// alerting service, no cool name :(
type Alerter struct {
catalogPool *pgxpool.Pool
catalogPool *pgxpool.Pool
telemetrySender telemetry.Sender
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
Expand Down Expand Up @@ -55,9 +58,20 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
if catalogPool == nil {
return nil, errors.New("catalog pool is nil for Alerter")
}

snsTopic := os.Getenv("TELEMETRY_AWS_SNS_TOPIC_ARN")
var snsMessageSender telemetry.Sender
if snsTopic != "" {
var err error
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(context.TODO(), &telemetry.SNSMessageSenderConfig{
Topic: snsTopic,
})
if err != nil {
return nil, err
}
}
return &Alerter{
catalogPool: catalogPool,
catalogPool: catalogPool,
telemetrySender: snsMessageSender,
}, nil
}

Expand Down Expand Up @@ -194,6 +208,28 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string
return false
}

func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) {
if a.telemetrySender != nil {
deployUuidPrefix := ""
deployUuid := peerdbenv.PeerDBDeploymentUID()
if deployUuid != "" {
deployUuidPrefix = fmt.Sprintf("[%s] ", deployUuid)
}

details := fmt.Sprintf("%s[%s] %s", deployUuidPrefix, flowName, more)
_, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{
Level: level,
DeploymentUID: deployUuid,
Tags: []string{flowName},
Type: flowName,
})
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err))
return
}
}
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
errorWithStack := fmt.Sprintf("%+v", err)
_, err = a.catalogPool.Exec(ctx,
Expand All @@ -203,6 +239,12 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
return
}
a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR)
}

func (a *Alerter) LogFlowStart(ctx context.Context, flowName string, info string) {
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)

}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
Expand All @@ -213,4 +255,7 @@ func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string)
logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err))
return
}
// TODO Maybe too much noise here
//a.sendTelemetryMessage(ctx, flowName, info, INFO)

}
25 changes: 25 additions & 0 deletions flow/shared/telemetry/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package telemetry

import (
"context"
)

type Sender interface {
SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error)
}

type Attributes struct {
Level Level
DeploymentUID string
Tags []string
Type string
}

type Level string

const (
INFO Level = "INFO"
WARN = "WARN"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
)
84 changes: 84 additions & 0 deletions flow/shared/telemetry/sns_message_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package telemetry

import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
"strings"
)

type SNSMessageSender interface {
Sender
}

type SNSMessageSenderImpl struct {
client *sns.Client
topic string
}

type SNSMessageSenderConfig struct {
Topic string `json:"topic"`
}

func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) {
publish, err := S.client.Publish(ctx, &sns.PublishInput{
Message: aws.String(body),
MessageAttributes: map[string]types.MessageAttributeValue{
"level": {
DataType: aws.String("String"),
StringValue: aws.String(string(attributes.Level)),
},
"tags": {
DataType: aws.String("String"),
StringValue: aws.String(strings.Join(attributes.Tags, ",")),
},
"deploymentUUID": {
DataType: aws.String("String"),
StringValue: aws.String(string(attributes.DeploymentUID)),
},
"entity": {
DataType: aws.String("String"),
StringValue: aws.String(string(attributes.DeploymentUID)),
},
"type": {
DataType: aws.String("String"),
StringValue: aws.String(string(attributes.Type)),
},
},
Subject: aws.String(subject),
TopicArn: aws.String(S.topic),
})
if err != nil {
return nil, err
}
return publish.MessageId, nil
}

func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) {
client, err := newSnsClient(ctx)
if err != nil {
return nil, err
}
return &SNSMessageSenderImpl{
client: client,
topic: config.Topic,
}, nil
}

func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender {
return &SNSMessageSenderImpl{
client: client,
topic: config.Topic,
}
}

func newSnsClient(ctx context.Context) (*sns.Client, error) {
sdkConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, err
}
snsClient := sns.NewFromConfig(sdkConfig)
return snsClient, nil
}

0 comments on commit 227d065

Please sign in to comment.