diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index b68274ae46..4beec9dcab 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } - a.Alerter.LogFlowEvent(ctx, flowJobName, fmt.Sprintf("Ended Snapshot Flow Job - %s", flowJobName)) + a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName) return nil } @@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - a.Alerter.LogFlowEvent(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName) conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 328cc29733..622cd5613a 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/PeerDB-io/peer-flow/shared/telemetry" "log/slog" "os" "time" @@ -17,6 +16,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/shared/telemetry" ) // alerting service, no cool name :( @@ -249,7 +249,4 @@ func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err)) return } - // TODO Maybe too much noise here - //a.sendTelemetryMessage(ctx, flowName, info, INFO) - } diff --git a/flow/shared/telemetry/interface.go b/flow/shared/telemetry/interface.go index 419e890169..6ee7d6f391 100644 --- a/flow/shared/telemetry/interface.go +++ b/flow/shared/telemetry/interface.go @@ -19,7 +19,7 @@ type Level string const ( INFO Level = "INFO" - WARN = "WARN" - ERROR = "ERROR" - CRITICAL = "CRITICAL" + WARN Level = "WARN" + ERROR Level = "ERROR" + CRITICAL Level = "CRITICAL" ) diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 9c7deb2b78..101677b5db 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -2,11 +2,12 @@ package telemetry import ( "context" + "strings" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" - "strings" ) type SNSMessageSender interface { @@ -22,8 +23,8 @@ type SNSMessageSenderConfig struct { Topic string `json:"topic"` } -func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { - publish, err := S.client.Publish(ctx, &sns.PublishInput{ +func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { + publish, err := s.client.Publish(ctx, &sns.PublishInput{ Message: aws.String(body), MessageAttributes: map[string]types.MessageAttributeValue{ "level": { @@ -36,19 +37,19 @@ func (S *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, }, "deploymentUUID": { DataType: aws.String("String"), - StringValue: aws.String(string(attributes.DeploymentUID)), + StringValue: aws.String(attributes.DeploymentUID), }, "entity": { DataType: aws.String("String"), - StringValue: aws.String(string(attributes.DeploymentUID)), + StringValue: aws.String(attributes.DeploymentUID), }, "type": { DataType: aws.String("String"), - StringValue: aws.String(string(attributes.Type)), + StringValue: aws.String(attributes.Type), }, }, Subject: aws.String(subject), - TopicArn: aws.String(S.topic), + TopicArn: aws.String(s.topic), }) if err != nil { return nil, err