diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 0e75ea2e06..c08c3198dd 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -49,6 +49,8 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } + a.Alerter.LogFlowStart(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) + conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) diff --git a/flow/go.mod b/flow/go.mod index a6ad81e4f7..bb66c0f627 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -10,10 +10,12 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 github.com/ClickHouse/clickhouse-go/v2 v2.18.0 - github.com/aws/aws-sdk-go-v2 v1.25.0 + github.com/aws/aws-sdk-go-v2 v1.25.2 + github.com/aws/aws-sdk-go-v2/config v1.27.0 github.com/aws/aws-sdk-go-v2/credentials v1.17.0 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 + github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 github.com/cockroachdb/pebble v1.1.0 github.com/google/uuid v1.6.0 github.com/grafana/pyroscope-go v1.1.1 @@ -52,6 +54,11 @@ require ( github.com/DataDog/zstd v1.5.5 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/apache/arrow/go/v14 v14.0.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/errors v1.11.1 // indirect @@ -103,14 +110,14 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect - github.com/aws/smithy-go v1.20.0 // indirect + github.com/aws/smithy-go v1.20.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/djherbis/buffer v1.2.0 github.com/djherbis/nio/v3 v3.0.1 diff --git a/flow/go.sum b/flow/go.sum index 47d6c5edf6..fc03edf962 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= -github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ= -github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA= +github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w= +github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8= github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0= @@ -76,10 +76,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE= @@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA= github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU= +github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M= github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM= github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc= -github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ= -github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc= +github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw= +github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 4e95f1d5d1..818807f9ed 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "github.com/PeerDB-io/peer-flow/shared/telemetry" "log/slog" + "os" "time" "github.com/jackc/pgx/v5" @@ -19,7 +21,8 @@ import ( // alerting service, no cool name :( type Alerter struct { - catalogPool *pgxpool.Pool + catalogPool *pgxpool.Pool + telemetrySender telemetry.Sender } func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) { @@ -55,9 +58,20 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { if catalogPool == nil { return nil, errors.New("catalog pool is nil for Alerter") } - + snsTopic := os.Getenv("TELEMETRY_AWS_SNS_TOPIC_ARN") + var snsMessageSender telemetry.Sender + if snsTopic != "" { + var err error + snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(context.TODO(), &telemetry.SNSMessageSenderConfig{ + Topic: snsTopic, + }) + if err != nil { + return nil, err + } + } return &Alerter{ - catalogPool: catalogPool, + catalogPool: catalogPool, + telemetrySender: snsMessageSender, }, nil } @@ -194,6 +208,28 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string return false } +func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) { + if a.telemetrySender != nil { + deployUuidPrefix := "" + deployUuid := peerdbenv.PeerDBDeploymentUID() + if deployUuid != "" { + deployUuidPrefix = fmt.Sprintf("[%s] ", deployUuid) + } + + details := fmt.Sprintf("%s[%s] %s", deployUuidPrefix, flowName, more) + _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ + Level: level, + DeploymentUID: deployUuid, + Tags: []string{flowName}, + Type: flowName, + }) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err)) + return + } + } +} + func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { errorWithStack := fmt.Sprintf("%+v", err) _, err = a.catalogPool.Exec(ctx, @@ -203,6 +239,12 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err)) return } + a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR) +} + +func (a *Alerter) LogFlowStart(ctx context.Context, flowName string, info string) { + a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) + } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) { @@ -213,4 +255,7 @@ func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err)) return } + // TODO Maybe too much noise here + //a.sendTelemetryMessage(ctx, flowName, info, INFO) + } diff --git a/flow/shared/telemetry/interface.go b/flow/shared/telemetry/interface.go new file mode 100644 index 0000000000..419e890169 --- /dev/null +++ b/flow/shared/telemetry/interface.go @@ -0,0 +1,25 @@ +package telemetry + +import ( + "context" +) + +type Sender interface { + SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) +} + +type Attributes struct { + Level Level + DeploymentUID string + Tags []string + Type string +} + +type Level string + +const ( + INFO Level = "INFO" + WARN = "WARN" + ERROR = "ERROR" + CRITICAL = "CRITICAL" +) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go new file mode 100644 index 0000000000..9c7deb2b78 --- /dev/null +++ b/flow/shared/telemetry/sns_message_sender.go @@ -0,0 +1,84 @@ +package telemetry + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" + "strings" +) + +type SNSMessageSender interface { + Sender +} + +type SNSMessageSenderImpl struct { + client *sns.Client + topic string +} + +type SNSMessageSenderConfig struct { + Topic string `json:"topic"` +} + +func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { + publish, err := S.client.Publish(ctx, &sns.PublishInput{ + Message: aws.String(body), + MessageAttributes: map[string]types.MessageAttributeValue{ + "level": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.Level)), + }, + "tags": { + DataType: aws.String("String"), + StringValue: aws.String(strings.Join(attributes.Tags, ",")), + }, + "deploymentUUID": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.DeploymentUID)), + }, + "entity": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.DeploymentUID)), + }, + "type": { + DataType: aws.String("String"), + StringValue: aws.String(string(attributes.Type)), + }, + }, + Subject: aws.String(subject), + TopicArn: aws.String(S.topic), + }) + if err != nil { + return nil, err + } + return publish.MessageId, nil +} + +func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) { + client, err := newSnsClient(ctx) + if err != nil { + return nil, err + } + return &SNSMessageSenderImpl{ + client: client, + topic: config.Topic, + }, nil +} + +func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender { + return &SNSMessageSenderImpl{ + client: client, + topic: config.Topic, + } +} + +func newSnsClient(ctx context.Context) (*sns.Client, error) { + sdkConfig, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + snsClient := sns.NewFromConfig(sdkConfig) + return snsClient, nil +}