Merge pull request #187 from mfreeman451/fix/node_recovery_issue
Fix/node recovery issue
mfreeman451 authored Feb 6, 2025
2 parents 5175cc3 + 1f003c5 commit 02b6b15
Showing 6 changed files with 280 additions and 133 deletions.
pkg/cloud/alerts/webhook.go (15 changes: 8 additions & 7 deletions)
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -15,12 +16,12 @@ import (
)

var (
errWebhookDisabled = fmt.Errorf("webhook alerter is disabled")
errWebhookCooldown = fmt.Errorf("alert is within cooldown period")
errInvalidJSON = fmt.Errorf("invalid JSON generated")
errWebhookStatus = fmt.Errorf("webhook returned non-200 status")
errTemplateParse = fmt.Errorf("template parsing failed")
errTemplateExecution = fmt.Errorf("template execution failed")
errWebhookDisabled = errors.New("webhook alerter is disabled")
ErrWebhookCooldown = errors.New("alert is within cooldown period")
errInvalidJSON = errors.New("invalid JSON generated")
errWebhookStatus = errors.New("webhook returned non-200 status")
errTemplateParse = errors.New("template parsing failed")
errTemplateExecution = errors.New("template execution failed")
)

type WebhookConfig struct {
@@ -157,7 +158,7 @@ func (w *WebhookAlerter) checkCooldown(alertTitle string) error {
lastAlertTime, exists := w.lastAlertTimes[alertTitle]
if exists && time.Since(lastAlertTime) < w.config.Cooldown {
log.Printf("Alert '%s' is within cooldown period, skipping", alertTitle)
return errWebhookCooldown
return ErrWebhookCooldown
}

w.lastAlertTimes[alertTitle] = time.Now()
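Two things change in this hunk: the sentinels are built with errors.New instead of fmt.Errorf (there is no formatting involved, so a static error value is the idiomatic choice), and the cooldown sentinel is exported as ErrWebhookCooldown so packages other than alerts can recognize it. A minimal sketch of a consumer, using only the exported pieces shown in this diff (the notifyRecovery function itself is illustrative, not part of the commit):

func notifyRecovery(ctx context.Context, alerter alerts.AlertService, alert *alerts.WebhookAlert) error {
	if err := alerter.Alert(ctx, alert); err != nil {
		if !errors.Is(err, alerts.ErrWebhookCooldown) {
			return fmt.Errorf("send alert: %w", err)
		}
		// Rate limited by the cooldown window; treat as success.
		log.Printf("alert %q suppressed by cooldown", alert.Title)
	}

	return nil
}

The new node_recovery.go below applies exactly this check inside processRecovery.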
pkg/cloud/errors.go (10 changes: 10 additions & 0 deletions)
@@ -0,0 +1,10 @@
package cloud

import "errors"

var (
	errEmptyPollerID      = errors.New("empty poller ID")
	errDatabaseError      = errors.New("database error")
	errInvalidSweepData   = errors.New("invalid sweep data")
	errFailedToSendAlerts = errors.New("failed to send alerts")
)
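The loaded portion of the diff does not show where these sentinels are returned; presumably the server code in the remaining files uses them, wrapping extra context with %w so errors.Is still matches. A sketch of that pattern, with a hypothetical *Server receiver and db field:

// Hypothetical call site; the real ones are in files not shown on this page.
func (s *Server) lookupNode(pollerID string) (*db.NodeStatus, error) {
	if pollerID == "" {
		return nil, errEmptyPollerID
	}

	status, err := s.db.GetNodeStatus(pollerID)
	if err != nil {
		// Wrap so errors.Is(err, errDatabaseError) still matches upstream.
		return nil, fmt.Errorf("%w: %v", errDatabaseError, err)
	}

	return status, nil
}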
pkg/cloud/node_recovery.go (106 changes: 106 additions & 0 deletions)
@@ -0,0 +1,106 @@
package cloud

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/mfreeman451/serviceradar/pkg/cloud/alerts"
	"github.com/mfreeman451/serviceradar/pkg/db"
)

// NodeRecoveryManager handles node recovery state transitions.
type NodeRecoveryManager struct {
	db          db.Service
	alerter     alerts.AlertService
	getHostname func() string
}

func newNodeRecoveryManager(d db.Service, alerter alerts.AlertService) *NodeRecoveryManager {
	return &NodeRecoveryManager{
		db:      d,
		alerter: alerter,
		getHostname: func() string {
			hostname, err := os.Hostname()
			if err != nil {
				return statusUnknown
			}
			return hostname
		},
	}
}

func (m *NodeRecoveryManager) processRecovery(ctx context.Context, nodeID string, lastSeen time.Time) error {
	tx, err := m.db.Begin()
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}

	var committed bool
	defer func() {
		if !committed {
			if rbErr := tx.Rollback(); rbErr != nil {
				log.Printf("Error rolling back transaction: %v", rbErr)
			}
		}
	}()

	status, err := m.db.GetNodeStatus(nodeID)
	if err != nil {
		return fmt.Errorf("get node status: %w", err)
	}

	// Early return if the node is already healthy
	if status.IsHealthy {
		return nil
	}

	// Update node status
	status.IsHealthy = true
	status.LastSeen = lastSeen

	// Update the database BEFORE trying to send the alert
	if err = m.db.UpdateNodeStatus(status); err != nil {
		return fmt.Errorf("update node status: %w", err)
	}

	// Send alert
	if err = m.sendRecoveryAlert(ctx, nodeID, lastSeen); err != nil {
		// Only treat the cooldown as non-error
		if !errors.Is(err, alerts.ErrWebhookCooldown) {
			return fmt.Errorf("send recovery alert: %w", err)
		}

		// Log the cooldown but proceed with the recovery
		log.Printf("Recovery alert for node %s rate limited, but node marked as recovered", nodeID)
	}

	// Commit the transaction
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit transaction: %w", err)
	}

	committed = true

	return nil
}

// sendRecoveryAlert handles alert creation and sending.
func (m *NodeRecoveryManager) sendRecoveryAlert(ctx context.Context, nodeID string, lastSeen time.Time) error {
	alert := &alerts.WebhookAlert{
		Level:     alerts.Info,
		Title:     "Node Recovered",
		Message:   fmt.Sprintf("Node '%s' is back online", nodeID),
		NodeID:    nodeID,
		Timestamp: lastSeen.UTC().Format(time.RFC3339),
		Details: map[string]any{
			"hostname":      m.getHostname(),
			"recovery_time": lastSeen.Format(time.RFC3339),
		},
	}

	return m.alerter.Alert(ctx, alert)
}
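The diff does not include the call site for processRecovery; presumably the poller-handling code invokes it when a node that was previously marked unhealthy reports in again. A rough sketch under that assumption (handleNodeReport and wasDown are hypothetical names):

// Hypothetical call site, not part of this commit.
func handleNodeReport(ctx context.Context, mgr *NodeRecoveryManager, nodeID string, reportedAt time.Time, wasDown bool) {
	if !wasDown {
		return // node was already healthy; nothing to recover
	}

	if err := mgr.processRecovery(ctx, nodeID, reportedAt); err != nil {
		log.Printf("Failed to process recovery for node %s: %v", nodeID, err)
	}
}

Note that processRecovery already swallows the cooldown case, so a rate-limited recovery alert does not prevent the node from being marked healthy.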
pkg/cloud/node_recovery_test.go (117 changes: 111 additions & 6 deletions)
@@ -2,6 +2,7 @@ package cloud

import (
"context"
"errors"
"testing"
"time"

@@ -11,6 +12,109 @@ import (
"go.uber.org/mock/gomock"
)

func TestNodeRecoveryManager_ProcessRecovery_WithCooldown(t *testing.T) {
tests := []struct {
name string
nodeID string
lastSeen time.Time
getCurrentNode *db.NodeStatus
dbError error
alertError error
expectCommit bool
expectError string
}{
{
name: "successful_recovery_with_cooldown",
nodeID: "test-node",
lastSeen: time.Now(),
getCurrentNode: &db.NodeStatus{
NodeID: "test-node",
IsHealthy: false,
LastSeen: time.Now().Add(-time.Hour),
},
alertError: alerts.ErrWebhookCooldown,
expectCommit: true,
},
{
name: "successful_recovery_no_cooldown",
nodeID: "test-node",
lastSeen: time.Now(),
getCurrentNode: &db.NodeStatus{
NodeID: "test-node",
IsHealthy: false,
LastSeen: time.Now().Add(-time.Hour),
},
expectCommit: true,
},
{
name: "already_healthy",
nodeID: "test-node",
lastSeen: time.Now(),
getCurrentNode: &db.NodeStatus{
NodeID: "test-node",
IsHealthy: true,
LastSeen: time.Now(),
},
},
{
name: "db_error",
nodeID: "test-node",
lastSeen: time.Now(),
dbError: db.ErrDatabaseError,
expectError: "get node status",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockDB := db.NewMockService(ctrl)
mockAlerter := alerts.NewMockAlertService(ctrl)
mockTx := db.NewMockTransaction(ctrl)

// Setup Begin() expectation
mockDB.EXPECT().Begin().Return(mockTx, nil)

// Setup GetNodeStatus expectation
mockDB.EXPECT().GetNodeStatus(tt.nodeID).Return(tt.getCurrentNode, tt.dbError)

if tt.getCurrentNode != nil && !tt.getCurrentNode.IsHealthy {
// Expect node status update
mockDB.EXPECT().UpdateNodeStatus(gomock.Any()).Return(nil)

// Always expect Rollback() due to defer
mockTx.EXPECT().Rollback().Return(nil).AnyTimes()

// Expect alert attempt
mockAlerter.EXPECT().Alert(gomock.Any(), gomock.Any()).Return(tt.alertError)

if tt.expectCommit {
mockTx.EXPECT().Commit().Return(nil)
}
} else {
// For non-recovery cases, expect Rollback()
mockTx.EXPECT().Rollback().Return(nil).AnyTimes()
}

mgr := &NodeRecoveryManager{
db: mockDB,
alerter: mockAlerter,
getHostname: func() string { return "test-host" },
}

err := mgr.processRecovery(context.Background(), tt.nodeID, tt.lastSeen)

if tt.expectError != "" {
assert.ErrorContains(t, err, tt.expectError)
} else if errors.Is(tt.alertError, alerts.ErrWebhookCooldown) {
assert.NoError(t, err)
}
})
}
}

func TestNodeRecoveryManager_ProcessRecovery(t *testing.T) {
tests := []struct {
name string
@@ -38,7 +142,6 @@ func TestNodeRecoveryManager_ProcessRecovery(t *testing.T) {
IsHealthy: true,
LastSeen: time.Now(),
},
expectAlert: false,
},
{
name: "db_error",
@@ -57,21 +160,21 @@ func TestNodeRecoveryManager_ProcessRecovery(t *testing.T) {
mockAlerter := alerts.NewMockAlertService(ctrl)
mockTx := db.NewMockTransaction(ctrl)

// Mock Begin() call
// Setup Begin() expectation
mockDB.EXPECT().Begin().Return(mockTx, nil)

// Mock Rollback() as it's in a defer
// Always expect Rollback() due to defer
mockTx.EXPECT().Rollback().Return(nil).AnyTimes()

// Mock GetNodeStatus
// Setup GetNodeStatus expectation
mockDB.EXPECT().GetNodeStatus(tt.nodeID).Return(tt.currentStatus, tt.dbError)

if tt.currentStatus != nil && !tt.currentStatus.IsHealthy {
// Expect node status update
mockDB.EXPECT().UpdateNodeStatus(gomock.Any()).Return(nil)

if tt.expectAlert {
mockAlerter.EXPECT().Alert(gomock.Any(), gomock.Any()).Return(nil)
// Mock the successful commit
mockTx.EXPECT().Commit().Return(nil)
}
}
Expand All @@ -85,7 +188,7 @@ func TestNodeRecoveryManager_ProcessRecovery(t *testing.T) {
err := mgr.processRecovery(context.Background(), tt.nodeID, time.Now())

if tt.expectedError != "" {
assert.Contains(t, err.Error(), tt.expectedError)
assert.ErrorContains(t, err, tt.expectedError)
} else {
assert.NoError(t, err)
}
@@ -103,13 +206,15 @@ func TestNodeRecoveryManager_SendRecoveryAlert(t *testing.T) {
getHostname: func() string { return "test-host" },
}

// Verify alert properties
mockAlerter.EXPECT().
Alert(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, alert *alerts.WebhookAlert) error {
assert.Equal(t, alerts.Info, alert.Level)
assert.Equal(t, "Node Recovered", alert.Title)
assert.Equal(t, "test-node", alert.NodeID)
assert.Equal(t, "test-host", alert.Details["hostname"])
assert.Contains(t, alert.Message, "test-node")

return nil
})
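These tests drive generated mocks (db.NewMockService, alerts.NewMockAlertService, db.NewMockTransaction) from go.uber.org/mock. Assuming the package layout implied by the paths above, the new cases can be run in isolation with:

	go test -run 'TestNodeRecoveryManager' ./pkg/cloud/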
[Diffs for the remaining two changed files are not shown here.]
