From 67fd5d3d5dda941e8e138217ea5152ff3bd3009d Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Tue, 5 Mar 2024 00:24:31 +0530 Subject: [PATCH] feat: add telemetry/alerts via sns (#1411) --- flow/activities/snapshot_activity.go | 3 + flow/alerting/alerting.go | 43 +++++++- flow/cmd/snapshot_worker.go | 2 +- flow/cmd/worker.go | 2 +- flow/go.mod | 17 ++- flow/go.sum | 18 ++-- flow/peerdbenv/config.go | 5 + flow/shared/telemetry/interface.go | 25 +++++ flow/shared/telemetry/sns_message_sender.go | 110 ++++++++++++++++++++ 9 files changed, 206 insertions(+), 19 deletions(-) create mode 100644 flow/shared/telemetry/interface.go create mode 100644 flow/shared/telemetry/sns_message_sender.go diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 6ee2bb5a8b..262d3d0dbd 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } + a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName) return nil } @@ -49,6 +50,8 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName) + conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index a837f58889..6fc903aa79 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -14,11 +14,13 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared/telemetry" ) // alerting service, no cool name :( type Alerter struct { - catalogPool *pgxpool.Pool + catalogPool *pgxpool.Pool + telemetrySender telemetry.Sender } func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { @@ -50,13 +52,25 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen } // doesn't take care of closing pool, needs to be done externally. -func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { +func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { if catalogPool == nil { panic("catalog pool is nil for Alerter") } - + snsTopic := peerdbenv.PeerDBTelemetryAWSSNSTopicArn() + var snsMessageSender telemetry.Sender + if snsTopic != "" { + var err error + snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{ + Topic: snsTopic, + }) + logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender") + if err != nil { + panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err)) + } + } return &Alerter{ - catalogPool: catalogPool, + catalogPool: catalogPool, + telemetrySender: snsMessageSender, } } @@ -193,6 +207,22 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string return false } +func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) { + if a.telemetrySender != nil { + details := fmt.Sprintf("[%s] %s", flowName, more) + _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ + Level: level, + DeploymentUID: peerdbenv.PeerDBDeploymentUID(), + Tags: []string{flowName}, + Type: flowName, + }) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err)) + return + } + } +} + func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, @@ -202,6 +232,11 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err)) return } + a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR) +} + +func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { + a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) { diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index eb9021de1a..d5b9d4b51f 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -68,7 +68,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) w.RegisterActivity(&activities.SnapshotActivity{ SnapshotConnections: make(map[string]activities.SlotSnapshotSignal), - Alerter: alerting.NewAlerter(conn), + Alerter: alerting.NewAlerter(context.Background(), conn), }) return c, w, nil diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 4014d47596..8977108be7 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -120,7 +120,7 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) { w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, - Alerter: alerting.NewAlerter(conn), + Alerter: alerting.NewAlerter(context.Background(), conn), CdcCache: make(map[string]connectors.CDCPullConnector), }) diff --git a/flow/go.mod b/flow/go.mod index a6ad81e4f7..da03d355b3 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.22 +go 1.22.0 require ( cloud.google.com/go v0.112.0 @@ -10,10 +10,12 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 github.com/ClickHouse/clickhouse-go/v2 v2.18.0 - github.com/aws/aws-sdk-go-v2 v1.25.0 + github.com/aws/aws-sdk-go-v2 v1.25.2 + github.com/aws/aws-sdk-go-v2/config v1.27.0 github.com/aws/aws-sdk-go-v2/credentials v1.17.0 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 + github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 github.com/cockroachdb/pebble v1.1.0 github.com/google/uuid v1.6.0 github.com/grafana/pyroscope-go v1.1.1 @@ -52,6 +54,11 @@ require ( github.com/DataDog/zstd v1.5.5 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/apache/arrow/go/v14 v14.0.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/errors v1.11.1 // indirect @@ -103,14 +110,14 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect - github.com/aws/smithy-go v1.20.0 // indirect + github.com/aws/smithy-go v1.20.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/djherbis/buffer v1.2.0 github.com/djherbis/nio/v3 v3.0.1 diff --git a/flow/go.sum b/flow/go.sum index 47d6c5edf6..fc03edf962 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= -github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ= -github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA= +github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w= +github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8= github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0= @@ -76,10 +76,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE= @@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU= +github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M= github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM= github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc= -github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ= -github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc= +github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw= +github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index f6128e56a7..9a59bad5ef 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -90,3 +90,8 @@ func PeerDBEnableWALHeartbeat() bool { func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) } + +// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN +func PeerDBTelemetryAWSSNSTopicArn() string { + return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") +} diff --git a/flow/shared/telemetry/interface.go b/flow/shared/telemetry/interface.go new file mode 100644 index 0000000000..6ee7d6f391 --- /dev/null +++ b/flow/shared/telemetry/interface.go @@ -0,0 +1,25 @@ +package telemetry + +import ( + "context" +) + +type Sender interface { + SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) +} + +type Attributes struct { + Level Level + DeploymentUID string + Tags []string + Type string +} + +type Level string + +const ( + INFO Level = "INFO" + WARN Level = "WARN" + ERROR Level = "ERROR" + CRITICAL Level = "CRITICAL" +) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go new file mode 100644 index 0000000000..42bdd026a7 --- /dev/null +++ b/flow/shared/telemetry/sns_message_sender.go @@ -0,0 +1,110 @@ +package telemetry + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" + "go.temporal.io/sdk/activity" +) + +type SNSMessageSender interface { + Sender +} + +type SNSMessageSenderImpl struct { + client *sns.Client + topic string +} + +type SNSMessageSenderConfig struct { + Topic string `json:"topic"` +} + +func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { + activityInfo := activity.GetInfo(ctx) + deduplicationString := strings.Join([]string{ + "deployID", attributes.DeploymentUID, + "subject", subject, + "runID", activityInfo.WorkflowExecution.RunID, + "activityName", activityInfo.ActivityType.Name, + }, " || ") + h := sha256.New() + h.Write([]byte(deduplicationString)) + deduplicationHash := hex.EncodeToString(h.Sum(nil)) + + publish, err := s.client.Publish(ctx, &sns.PublishInput{ + Message: aws.String(body), + MessageAttributes: map[string]types.MessageAttributeValue{ + "level": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.Level)), + }, + "tags": { + DataType: aws.String("String"), + StringValue: aws.String(strings.Join(attributes.Tags, ",")), + }, + "deploymentUUID": { + DataType: aws.String("String"), + StringValue: aws.String(attributes.DeploymentUID), + }, + "entity": { + DataType: aws.String("String"), + StringValue: aws.String(attributes.DeploymentUID), + }, + "type": { + DataType: aws.String("String"), + StringValue: aws.String(attributes.Type), + }, + "alias": { // This will act as a de-duplication ID + DataType: aws.String("String"), + StringValue: aws.String(deduplicationHash), + }, + }, + Subject: aws.String(subject[:100]), + TopicArn: aws.String(s.topic), + }) + if err != nil { + return nil, err + } + return publish.MessageId, nil +} + +func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) { + // Topic Region must match client region + region := strings.Split(strings.TrimPrefix(config.Topic, "arn:aws:sns:"), ":")[0] + client, err := newSnsClient(ctx, ®ion) + if err != nil { + return nil, err + } + return &SNSMessageSenderImpl{ + client: client, + topic: config.Topic, + }, nil +} + +func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender { + return &SNSMessageSenderImpl{ + client: client, + topic: config.Topic, + } +} + +func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) { + sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error { + if region != nil { + options.Region = *region + } + return nil + }) + if err != nil { + return nil, err + } + snsClient := sns.NewFromConfig(sdkConfig) + return snsClient, nil +}