From 6e41cb51654281b23b250e63c457206d315c65d1 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 04:30:59 +0530 Subject: [PATCH 01/13] feat: add telemetry via sns --- flow/activities/snapshot_activity.go | 2 + flow/alerting/alerting.go | 51 ++++++++++++- flow/go.mod | 15 +++- flow/go.sum | 18 +++-- flow/shared/telemetry/interface.go | 25 ++++++ flow/shared/telemetry/sns_message_sender.go | 84 +++++++++++++++++++++ 6 files changed, 180 insertions(+), 15 deletions(-) create mode 100644 flow/shared/telemetry/interface.go create mode 100644 flow/shared/telemetry/sns_message_sender.go diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 6ee2bb5a8b..8067d1546c 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -49,6 +49,8 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } + a.Alerter.LogFlowStart(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) + conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index a837f58889..e2c4d88f8c 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "fmt" + "github.com/PeerDB-io/peer-flow/shared/telemetry" "log/slog" + "os" "time" "github.com/jackc/pgx/v5" @@ -18,7 +20,8 @@ import ( // alerting service, no cool name :( type Alerter struct { - catalogPool *pgxpool.Pool + catalogPool *pgxpool.Pool + telemetrySender telemetry.Sender } func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { @@ -54,9 +57,20 @@ func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { if catalogPool == nil { panic("catalog pool is nil for Alerter") } - + snsTopic := 
os.Getenv("TELEMETRY_AWS_SNS_TOPIC_ARN") + var snsMessageSender telemetry.Sender + if snsTopic != "" { + var err error + snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(context.TODO(), &telemetry.SNSMessageSenderConfig{ + Topic: snsTopic, + }) + if err != nil { + panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err)) + } + } return &Alerter{ - catalogPool: catalogPool, + catalogPool: catalogPool, + telemetrySender: snsMessageSender, } } @@ -193,6 +207,28 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string return false } +func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) { + if a.telemetrySender != nil { + deployUuidPrefix := "" + deployUuid := peerdbenv.PeerDBDeploymentUID() + if deployUuid != "" { + deployUuidPrefix = fmt.Sprintf("[%s] ", deployUuid) + } + + details := fmt.Sprintf("%s[%s] %s", deployUuidPrefix, flowName, more) + _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ + Level: level, + DeploymentUID: deployUuid, + Tags: []string{flowName}, + Type: flowName, + }) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err)) + return + } + } +} + func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, @@ -202,6 +238,12 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err)) return } + a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR) +} + +func (a *Alerter) LogFlowStart(ctx context.Context, flowName string, info string) { + a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) + } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) { @@ -212,4 +254,7 @@ func (a *Alerter) 
LogFlowInfo(ctx context.Context, flowName string, info string) logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err)) return } + // TODO Maybe too much noise here + //a.sendTelemetryMessage(ctx, flowName, info, INFO) + } diff --git a/flow/go.mod b/flow/go.mod index a6ad81e4f7..bb66c0f627 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -10,10 +10,12 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 github.com/ClickHouse/clickhouse-go/v2 v2.18.0 - github.com/aws/aws-sdk-go-v2 v1.25.0 + github.com/aws/aws-sdk-go-v2 v1.25.2 + github.com/aws/aws-sdk-go-v2/config v1.27.0 github.com/aws/aws-sdk-go-v2/credentials v1.17.0 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 + github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 github.com/cockroachdb/pebble v1.1.0 github.com/google/uuid v1.6.0 github.com/grafana/pyroscope-go v1.1.1 @@ -52,6 +54,11 @@ require ( github.com/DataDog/zstd v1.5.5 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/apache/arrow/go/v14 v14.0.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/errors v1.11.1 // indirect @@ -103,14 +110,14 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect - 
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect - github.com/aws/smithy-go v1.20.0 // indirect + github.com/aws/smithy-go v1.20.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/djherbis/buffer v1.2.0 github.com/djherbis/nio/v3 v3.0.1 diff --git a/flow/go.sum b/flow/go.sum index 47d6c5edf6..fc03edf962 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= -github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ= -github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA= +github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w= +github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8= github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0= @@ -76,10 +76,10 @@ 
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE= @@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod 
h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU= +github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M= github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM= github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc= -github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ= -github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc= +github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw= +github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/flow/shared/telemetry/interface.go b/flow/shared/telemetry/interface.go new file mode 100644 index 0000000000..419e890169 --- /dev/null +++ b/flow/shared/telemetry/interface.go @@ -0,0 +1,25 @@ +package telemetry + +import ( + "context" +) + +type Sender interface { + SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) +} + +type Attributes struct { + Level Level + DeploymentUID string + Tags []string + Type string +} + +type Level string + +const ( + INFO Level = "INFO" 
+ WARN = "WARN" + ERROR = "ERROR" + CRITICAL = "CRITICAL" +) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go new file mode 100644 index 0000000000..9c7deb2b78 --- /dev/null +++ b/flow/shared/telemetry/sns_message_sender.go @@ -0,0 +1,84 @@ +package telemetry + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" + "strings" +) + +type SNSMessageSender interface { + Sender +} + +type SNSMessageSenderImpl struct { + client *sns.Client + topic string +} + +type SNSMessageSenderConfig struct { + Topic string `json:"topic"` +} + +func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { + publish, err := S.client.Publish(ctx, &sns.PublishInput{ + Message: aws.String(body), + MessageAttributes: map[string]types.MessageAttributeValue{ + "level": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.Level)), + }, + "tags": { + DataType: aws.String("String"), + StringValue: aws.String(strings.Join(attributes.Tags, ",")), + }, + "deploymentUUID": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.DeploymentUID)), + }, + "entity": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.DeploymentUID)), + }, + "type": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.Type)), + }, + }, + Subject: aws.String(subject), + TopicArn: aws.String(S.topic), + }) + if err != nil { + return nil, err + } + return publish.MessageId, nil +} + +func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) { + client, err := newSnsClient(ctx) + if err != nil { + return nil, err + } + return &SNSMessageSenderImpl{ + client: client, + topic: config.Topic, + }, nil 
+} + +func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender { + return &SNSMessageSenderImpl{ + client: client, + topic: config.Topic, + } +} + +func newSnsClient(ctx context.Context) (*sns.Client, error) { + sdkConfig, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + snsClient := sns.NewFromConfig(sdkConfig) + return snsClient, nil +} From 4ec0babf3ba4c290201b0e2ac4f7da97aa56affd Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 22:03:28 +0530 Subject: [PATCH 02/13] feat: add end of snapshot event --- flow/activities/snapshot_activity.go | 3 ++- flow/alerting/alerting.go | 18 ++++++------------ flow/cmd/snapshot_worker.go | 2 +- flow/cmd/worker.go | 2 +- flow/go.mod | 2 +- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 8067d1546c..30ca43ee45 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } + a.Alerter.LogFlowEvent(ctx, flowJobName, fmt.Sprintf("Ended Snapshot Flow Job - %s", flowJobName)) return nil } @@ -49,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - a.Alerter.LogFlowStart(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index e2c4d88f8c..4ae2c773bf 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -53,7 +53,7 @@ func (a *Alerter)
registerSendersFromPool(ctx context.Context) ([]*slackAlertSen } // doesn't take care of closing pool, needs to be done externally. -func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { +func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { if catalogPool == nil { panic("catalog pool is nil for Alerter") } @@ -61,9 +61,10 @@ func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { var snsMessageSender telemetry.Sender if snsTopic != "" { var err error - snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(context.TODO(), &telemetry.SNSMessageSenderConfig{ + snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{ Topic: snsTopic, }) + logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender") if err != nil { panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err)) } @@ -209,16 +210,10 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) { if a.telemetrySender != nil { - deployUuidPrefix := "" - deployUuid := peerdbenv.PeerDBDeploymentUID() - if deployUuid != "" { - deployUuidPrefix = fmt.Sprintf("[%s] ", deployUuid) - } - - details := fmt.Sprintf("%s[%s] %s", deployUuidPrefix, flowName, more) + details := fmt.Sprintf("[%s] %s", flowName, more) _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ Level: level, - DeploymentUID: deployUuid, + DeploymentUID: peerdbenv.PeerDBDeploymentUID(), Tags: []string{flowName}, Type: flowName, }) @@ -241,9 +236,8 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR) } -func (a *Alerter) LogFlowStart(ctx context.Context, flowName string, info string) { +func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { 
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) - } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) { diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index eb9021de1a..d5b9d4b51f 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -68,7 +68,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) w.RegisterActivity(&activities.SnapshotActivity{ SnapshotConnections: make(map[string]activities.SlotSnapshotSignal), - Alerter: alerting.NewAlerter(conn), + Alerter: alerting.NewAlerter(context.Background(), conn), }) return c, w, nil diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 4014d47596..8977108be7 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -120,7 +120,7 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) { w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, - Alerter: alerting.NewAlerter(conn), + Alerter: alerting.NewAlerter(context.Background(), conn), CdcCache: make(map[string]connectors.CDCPullConnector), }) diff --git a/flow/go.mod b/flow/go.mod index bb66c0f627..da03d355b3 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.22 +go 1.22.0 require ( cloud.google.com/go v0.112.0 From 16fd46657086c584e50c9fa39500d96ecb8bba8b Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 22:11:35 +0530 Subject: [PATCH 03/13] chore: linter fixes --- flow/activities/snapshot_activity.go | 4 ++-- flow/alerting/alerting.go | 5 +---- flow/shared/telemetry/interface.go | 6 +++--- flow/shared/telemetry/sns_message_sender.go | 15 ++++++++------- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 30ca43ee45..262d3d0dbd 100644 --- 
a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } - a.Alerter.LogFlowEvent(ctx, flowJobName, fmt.Sprintf("Ended Snapshot Flow Job - %s", flowJobName)) + a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName) return nil } @@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - a.Alerter.LogFlowEvent(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName) conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 4ae2c773bf..9df98d7c11 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/PeerDB-io/peer-flow/shared/telemetry" "log/slog" "os" "time" @@ -16,6 +15,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared/telemetry" ) // alerting service, no cool name :( @@ -248,7 +248,4 @@ func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err)) return } - // TODO Maybe too much noise here - //a.sendTelemetryMessage(ctx, flowName, info, INFO) - } diff --git a/flow/shared/telemetry/interface.go b/flow/shared/telemetry/interface.go index 419e890169..6ee7d6f391 100644 --- a/flow/shared/telemetry/interface.go +++ b/flow/shared/telemetry/interface.go @@ -19,7 +19,7 @@ type Level string const ( INFO Level = "INFO" - WARN = "WARN" - ERROR = 
"ERROR" - CRITICAL = "CRITICAL" + WARN Level = "WARN" + ERROR Level = "ERROR" + CRITICAL Level = "CRITICAL" ) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 9c7deb2b78..101677b5db 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -2,11 +2,12 @@ package telemetry import ( "context" + "strings" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" - "strings" ) type SNSMessageSender interface { @@ -22,8 +23,8 @@ type SNSMessageSenderConfig struct { Topic string `json:"topic"` } -func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { - publish, err := S.client.Publish(ctx, &sns.PublishInput{ +func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { + publish, err := s.client.Publish(ctx, &sns.PublishInput{ Message: aws.String(body), MessageAttributes: map[string]types.MessageAttributeValue{ "level": { @@ -36,19 +37,19 @@ func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, }, "deploymentUUID": { DataType: aws.String("String"), - StringValue: aws.String(string(attributes.DeploymentUID)), + StringValue: aws.String(attributes.DeploymentUID), }, "entity": { DataType: aws.String("String"), - StringValue: aws.String(string(attributes.DeploymentUID)), + StringValue: aws.String(attributes.DeploymentUID), }, "type": { DataType: aws.String("String"), - StringValue: aws.String(string(attributes.Type)), + StringValue: aws.String(attributes.Type), }, }, Subject: aws.String(subject), - TopicArn: aws.String(S.topic), + TopicArn: aws.String(s.topic), }) if err != nil { return nil, err From a49ea06224125e895fc34cd7852c09aed077b222 Mon Sep 17 00:00:00 2001 From: Kunal Gupta 
<39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 23:29:01 +0530 Subject: [PATCH 04/13] fix: topic region must match client region --- flow/shared/telemetry/sns_message_sender.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 101677b5db..06cdf3ba90 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -58,7 +58,9 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, } func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) { - client, err := newSnsClient(ctx) + // Topic Region must match client region + region := strings.Split(strings.TrimPrefix(config.Topic, "arn:aws:sns:"), ":")[0] + client, err := newSnsClient(ctx, &region) if err != nil { return nil, err } @@ -75,8 +77,12 @@ func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNS } } -func newSnsClient(ctx context.Context) (*sns.Client, error) { - sdkConfig, err := config.LoadDefaultConfig(ctx) +func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) { + sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error { + if region != nil { + options.Region = *region + } + }) if err != nil { return nil, err } From 83e990bae9521557a6c4bcedc160dcf5d31c1467 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 23:29:25 +0530 Subject: [PATCH 05/13] fix: add return --- flow/shared/telemetry/sns_message_sender.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 06cdf3ba90..27388a2cf7 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -82,6 +82,7 @@
func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) { if region != nil { options.Region = *region } + return nil }) if err != nil { return nil, err From acc7b28221416e287b4856f3ef5b4fdd050026cc Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 23:50:09 +0530 Subject: [PATCH 06/13] fix: error not propagating properly - also attempt to add information from context --- flow/alerting/alerting.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 9df98d7c11..e69c028a29 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -208,10 +208,10 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string return false } -func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) { +func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) { if a.telemetrySender != nil { details := fmt.Sprintf("[%s] %s", flowName, more) - _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ + _, err := a.telemetrySender.SendMessage(ctx, details, fmt.Sprintf("%s\n%+v", details, ctx), telemetry.Attributes{ Level: level, DeploymentUID: peerdbenv.PeerDBDeploymentUID(), Tags: []string{flowName}, @@ -233,7 +233,7 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err)) return } - a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR) + a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR) } func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { From df706b71925b84154567d452677e2b63f494ad85 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 
2024 23:57:36 +0530 Subject: [PATCH 07/13] fix: sns subject limit --- flow/shared/telemetry/sns_message_sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 27388a2cf7..641bb3cb5e 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -48,7 +48,7 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, StringValue: aws.String(attributes.Type), }, }, - Subject: aws.String(subject), + Subject: aws.String(subject[:100]), TopicArn: aws.String(s.topic), }) if err != nil { From eb95ea81811552d5e827001eecf9f549b2231e9f Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Sat, 2 Mar 2024 00:16:06 +0530 Subject: [PATCH 08/13] feat: add alias as a de-duplication ID --- flow/shared/telemetry/sns_message_sender.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 641bb3cb5e..c3bc2a6b71 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -2,7 +2,9 @@ package telemetry import ( "context" + "fmt" "strings" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -47,6 +49,10 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, DataType: aws.String("String"), StringValue: aws.String(attributes.Type), }, + "alias": { // This will act as a de-duplication ID + DataType: aws.String("String"), + StringValue: aws.String(fmt.Sprintf("[%s] - [%s] - [Window: %s]", attributes.DeploymentUID, subject, time.Now().Truncate(30*time.Minute))), + }, }, Subject: aws.String(subject[:100]), TopicArn: aws.String(s.topic), From 44e92cf2ac62b3cbb72cf67d623b07ddf8a56cd7 Mon Sep 17 00:00:00 2001 From: Kunal Gupta 
<39487888+iamKunalGupta@users.noreply.github.com> Date: Sat, 2 Mar 2024 01:01:35 +0530 Subject: [PATCH 09/13] chore: use sha as dedup id instead --- flow/shared/telemetry/sns_message_sender.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index c3bc2a6b71..29ba69cef6 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -2,6 +2,8 @@ package telemetry import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "strings" "time" @@ -26,6 +28,11 @@ type SNSMessageSenderConfig struct { } func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { + deduplicationString := fmt.Sprintf("[%s] - [%s] - [Window: %s]", attributes.DeploymentUID, subject, time.Now().Truncate(30*time.Minute)) + h := sha256.New() + h.Write([]byte(deduplicationString)) + deduplicationHash := hex.EncodeToString(h.Sum(nil)) + publish, err := s.client.Publish(ctx, &sns.PublishInput{ Message: aws.String(body), MessageAttributes: map[string]types.MessageAttributeValue{ @@ -51,7 +58,7 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, }, "alias": { // This will act as a de-duplication ID DataType: aws.String("String"), - StringValue: aws.String(fmt.Sprintf("[%s] - [%s] - [Window: %s]", attributes.DeploymentUID, subject, time.Now().Truncate(30*time.Minute))), + StringValue: aws.String(deduplicationHash), }, }, Subject: aws.String(subject[:100]), From 0214642bc0ccb8c87d9df058b077f05cc3a2d5d0 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Sat, 2 Mar 2024 01:43:19 +0530 Subject: [PATCH 10/13] feat: attach activity info with body --- flow/alerting/alerting.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go 
index e69c028a29..463758d8c4 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "go.temporal.io/sdk/activity" "log/slog" "os" "time" @@ -211,7 +212,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) { if a.telemetrySender != nil { details := fmt.Sprintf("[%s] %s", flowName, more) - _, err := a.telemetrySender.SendMessage(ctx, details, fmt.Sprintf("%s\n%+v", details, ctx), telemetry.Attributes{ + _, err := a.telemetrySender.SendMessage(ctx, details, fmt.Sprintf("%s\n%+v", details, activity.GetInfo(ctx)), telemetry.Attributes{ Level: level, DeploymentUID: peerdbenv.PeerDBDeploymentUID(), Tags: []string{flowName}, From 12866351c63b4df9399b9a0c55b4ba2aea108098 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 4 Mar 2024 22:49:42 +0530 Subject: [PATCH 11/13] refactor: use different env and dedup strategy --- flow/alerting/alerting.go | 5 ++--- flow/peerdbenv/config.go | 5 +++++ flow/shared/telemetry/sns_message_sender.go | 11 ++++++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 463758d8c4..794d7e6039 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -4,13 +4,12 @@ import ( "context" "encoding/json" "fmt" - "go.temporal.io/sdk/activity" "log/slog" - "os" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/activity" "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -58,7 +57,7 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { if catalogPool == nil { panic("catalog pool is nil for Alerter") } - snsTopic := os.Getenv("TELEMETRY_AWS_SNS_TOPIC_ARN") + snsTopic := 
peerdbenv.PeerDBTelemetryAWSSNSTopicArn() var snsMessageSender telemetry.Sender if snsTopic != "" { var err error diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index f6128e56a7..9a59bad5ef 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -90,3 +90,8 @@ func PeerDBEnableWALHeartbeat() bool { func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) } + +// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN +func PeerDBTelemetryAWSSNSTopicArn() string { + return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") +} diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 29ba69cef6..42bdd026a7 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -4,14 +4,13 @@ import ( "context" "crypto/sha256" "encoding/hex" - "fmt" "strings" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" + "go.temporal.io/sdk/activity" ) type SNSMessageSender interface { @@ -28,7 +27,13 @@ type SNSMessageSenderConfig struct { } func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { - deduplicationString := fmt.Sprintf("[%s] - [%s] - [Window: %s]", attributes.DeploymentUID, subject, time.Now().Truncate(30*time.Minute)) + activityInfo := activity.GetInfo(ctx) + deduplicationString := strings.Join([]string{ + "deployID", attributes.DeploymentUID, + "subject", subject, + "runID", activityInfo.WorkflowExecution.RunID, + "activityName", activityInfo.ActivityType.Name, + }, " || ") h := sha256.New() h.Write([]byte(deduplicationString)) deduplicationHash := hex.EncodeToString(h.Sum(nil)) From a616dc5f11c35dcfef1222d85c9ec61c018ee8f3 Mon Sep 17 00:00:00 2001 From: Kunal Gupta 
<39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 4 Mar 2024 23:00:40 +0530 Subject: [PATCH 12/13] chore: remove extra activity info from body --- flow/alerting/alerting.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 794d7e6039..1c086ec179 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -7,15 +7,13 @@ import ( "log/slog" "time" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "go.temporal.io/sdk/activity" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared/telemetry" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) // alerting service, no cool name :( @@ -211,7 +209,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) { if a.telemetrySender != nil { details := fmt.Sprintf("[%s] %s", flowName, more) - _, err := a.telemetrySender.SendMessage(ctx, details, fmt.Sprintf("%s\n%+v", details, activity.GetInfo(ctx)), telemetry.Attributes{ + _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ Level: level, DeploymentUID: peerdbenv.PeerDBDeploymentUID(), Tags: []string{flowName}, From 67414b74c7998674ed500d796b55c364ea7b6565 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Tue, 5 Mar 2024 00:18:18 +0530 Subject: [PATCH 13/13] chore: linter fixes --- flow/alerting/alerting.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 1c086ec179..6fc903aa79 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -7,13 +7,14 @@ import ( 
"log/slog" "time" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared/telemetry" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" ) // alerting service, no cool name :(