From 12866351c63b4df9399b9a0c55b4ba2aea108098 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Mon, 4 Mar 2024 22:49:42 +0530 Subject: [PATCH] refactor: use different env and dedup strategy --- flow/alerting/alerting.go | 5 ++--- flow/peerdbenv/config.go | 5 +++++ flow/shared/telemetry/sns_message_sender.go | 11 ++++++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 463758d8c4..794d7e6039 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -4,13 +4,12 @@ import ( "context" "encoding/json" "fmt" - "go.temporal.io/sdk/activity" "log/slog" - "os" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/activity" "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -58,7 +57,7 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter { if catalogPool == nil { panic("catalog pool is nil for Alerter") } - snsTopic := os.Getenv("TELEMETRY_AWS_SNS_TOPIC_ARN") + snsTopic := peerdbenv.PeerDBTelemetryAWSSNSTopicArn() var snsMessageSender telemetry.Sender if snsTopic != "" { var err error diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index f6128e56a7..9a59bad5ef 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -90,3 +90,8 @@ func PeerDBEnableWALHeartbeat() bool { func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) } + +// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN +func PeerDBTelemetryAWSSNSTopicArn() string { + return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") +} diff --git a/flow/shared/telemetry/sns_message_sender.go b/flow/shared/telemetry/sns_message_sender.go index 29ba69cef6..42bdd026a7 100644 --- a/flow/shared/telemetry/sns_message_sender.go +++ b/flow/shared/telemetry/sns_message_sender.go @@ -4,14 +4,13 @@ import ( "context" "crypto/sha256" "encoding/hex" - "fmt" "strings" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" + "go.temporal.io/sdk/activity" ) type SNSMessageSender interface { @@ -28,7 +27,13 @@ type SNSMessageSenderConfig struct { } func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) { - deduplicationString := fmt.Sprintf("[%s] - [%s] - [Window: %s]", attributes.DeploymentUID, subject, time.Now().Truncate(30*time.Minute)) + activityInfo := activity.GetInfo(ctx) + deduplicationString := strings.Join([]string{ + "deployID", attributes.DeploymentUID, + "subject", subject, + "runID", activityInfo.WorkflowExecution.RunID, + "activityName", activityInfo.ActivityType.Name, + }, " || ") h := sha256.New() h.Write([]byte(deduplicationString)) deduplicationHash := hex.EncodeToString(h.Sum(nil))