Skip to content

Commit

Permalink
feat: add end of snaphost event
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Mar 1, 2024
1 parent bed7bc3 commit 3dce54c
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 17 deletions.
3 changes: 2 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, fmt.Sprintf("Ended Snapshot Flow Job - %s", flowJobName))

return nil
}
Expand All @@ -49,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowStart(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName))
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, fmt.Sprintf("Started Snapshot Flow Job - %s", config.FlowJobName))

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}

alerter, err := alerting.NewAlerter(conn)
alerter, err := alerting.NewAlerter(context.Background(), conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func WorkerMain(opts *WorkerOptions) error {
})
peerflow.RegisterFlowWorkerWorkflows(w)

alerter, err := alerting.NewAlerter(conn)
alerter, err := alerting.NewAlerter(context.Background(), conn)
if err != nil {
return fmt.Errorf("unable to create alerter: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv
peerflow.RegisterFlowWorkerWorkflows(env)
env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)

alerter, err := alerting.NewAlerter(conn)
alerter, err := alerting.NewAlerter(context.Background(), conn)
if err != nil {
t.Fatalf("unable to create alerter: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peer-flow

go 1.22
go 1.22.0

require (
cloud.google.com/go v0.112.0
Expand Down
18 changes: 6 additions & 12 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
}

// doesn't take care of closing pool, needs to be done externally.
func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) {
func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) (*Alerter, error) {
if catalogPool == nil {
return nil, errors.New("catalog pool is nil for Alerter")
}
snsTopic := os.Getenv("TELEMETRY_AWS_SNS_TOPIC_ARN")
var snsMessageSender telemetry.Sender
if snsTopic != "" {
var err error
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(context.TODO(), &telemetry.SNSMessageSenderConfig{
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{
Topic: snsTopic,
})
logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,16 +211,10 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string

func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more any, level telemetry.Level) {
if a.telemetrySender != nil {
deployUuidPrefix := ""
deployUuid := peerdbenv.PeerDBDeploymentUID()
if deployUuid != "" {
deployUuidPrefix = fmt.Sprintf("[%s] ", deployUuid)
}

details := fmt.Sprintf("%s[%s] %s", deployUuidPrefix, flowName, more)
details := fmt.Sprintf("[%s] %s", flowName, more)
_, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{
Level: level,
DeploymentUID: deployUuid,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: []string{flowName},
Type: flowName,
})
Expand All @@ -242,9 +237,8 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
a.sendTelemetryMessage(ctx, flowName, err, telemetry.ERROR)
}

func (a *Alerter) LogFlowStart(ctx context.Context, flowName string, info string) {
func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)

}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
Expand Down

0 comments on commit 3dce54c

Please sign in to comment.