From b6aa80a7d7fc258c62c1dfe1309d8ef81279be20 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Fri, 1 Mar 2024 22:03:28 +0530 Subject: [PATCH] feat: add end of snaphost event --- flow/activities/snapshot_activity.go | 3 ++- flow/cmd/snapshot_worker.go | 2 +- flow/cmd/worker.go | 2 +- flow/e2e/test_utils.go | 2 +- flow/go.mod | 2 +- flow/shared/alerting/alerting.go | 18 ++++++------------ 6 files changed, 12 insertions(+), 17 deletions(-) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index c08c3198dd..b68274ae46 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } + a.Alerter.LogFlowEvent(ctx, flowJobName, fmt.Sprintf("Ended Snapshot Flow Job - %s", flowJobName)) return nil } @@ -49,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - a.Alerter.LogFlowStart(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName)) conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index bc53785382..f0ecbaadd5 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -67,7 +67,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { return fmt.Errorf("unable to create catalog connection pool: %w", err) } - alerter, err := alerting.NewAlerter(conn) + alerter, err := alerting.NewAlerter(context.Background(), conn) if err != nil { return fmt.Errorf("unable to create alerter: %w", err) } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index ee9218a9da..d2d0308c78 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -133,7 +133,7 @@ func WorkerMain(opts *WorkerOptions) error { }) peerflow.RegisterFlowWorkerWorkflows(w) - alerter, err := alerting.NewAlerter(conn) + alerter, err := alerting.NewAlerter(context.Background(), conn) if err != nil { return fmt.Errorf("unable to create alerter: %w", err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 598297106b..7bcff99bde 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -63,7 +63,7 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv peerflow.RegisterFlowWorkerWorkflows(env) env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) - alerter, err := alerting.NewAlerter(conn) + alerter, err := alerting.NewAlerter(context.Background(), conn) if err != nil { t.Fatalf("unable to create alerter: %v", err) } diff --git a/flow/go.mod b/flow/go.mod index bb66c0f627..da03d355b3 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.22 +go 1.22.0 require ( cloud.google.com/go v0.112.0 diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 818807f9ed..328cc29733 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -54,7 +54,7 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen } // doesn't take care of closing pool, needs to be done externally. -func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { +func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) (*Alerter, error) { if catalogPool == nil { return nil, errors.New("catalog pool is nil for Alerter") } @@ -62,9 +62,10 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { var snsMessageSender telemetry.Sender if snsTopic != "" { var err error - snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(context.TODO(), &telemetry.SNSMessageSenderConfig{ + snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{ Topic: snsTopic, }) + logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender") if err != nil { return nil, err } @@ -210,16 +211,10 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) { if a.telemetrySender != nil { - deployUuidPrefix := "" - deployUuid := peerdbenv.PeerDBDeploymentUID() - if deployUuid != "" { - deployUuidPrefix = fmt.Sprintf("[%s] ", deployUuid) - } - - details := fmt.Sprintf("%s[%s] %s", deployUuidPrefix, flowName, more) + details := fmt.Sprintf("[%s] %s", flowName, more) _, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{ Level: level, - DeploymentUID: deployUuid, + DeploymentUID: peerdbenv.PeerDBDeploymentUID(), Tags: []string{flowName}, Type: flowName, }) @@ -242,9 +237,8 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR) } -func (a *Alerter) LogFlowStart(ctx context.Context, flowName string, info string) { +func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) - } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {