
Commit

checkpoint - WIP
mfreeman451 committed Feb 6, 2025
1 parent 0a6c7a1 commit 04a9d08
Showing 2 changed files with 98 additions and 62 deletions.
53 changes: 46 additions & 7 deletions pkg/cloud/alerts/webhook.go
@@ -63,11 +63,13 @@ type AlertKey struct {
}

type WebhookAlerter struct {
config WebhookConfig
client *http.Client
LastAlertTimes map[AlertKey]time.Time
mu sync.RWMutex
bufferPool *sync.Pool
config WebhookConfig
client *http.Client
LastAlertTimes map[AlertKey]time.Time
NodeDownStates map[string]bool
ServiceAlertStates map[string]bool
Mu sync.RWMutex
bufferPool *sync.Pool
}
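
The struct now exports both the state maps and the mutex (`Mu`), which is what lets `server.go` lock the alerter directly in `shouldSendServiceAlert` further down this diff. If that coupling is unwanted, a small accessor could keep the locking discipline inside the alerts package; a sketch assuming the same fields (`IsServiceInAlert` is hypothetical, not in the commit):

```go
// IsServiceInAlert reports whether a service-failure alert is already active
// for the node, hiding the mutex from callers outside the package.
func (w *WebhookAlerter) IsServiceInAlert(nodeID string) bool {
	w.Mu.RLock()
	defer w.Mu.RUnlock()

	return w.ServiceAlertStates[nodeID]
}
```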

func (w *WebhookConfig) UnmarshalJSON(data []byte) error {
@@ -104,6 +106,7 @@ func NewWebhookAlerter(config WebhookConfig) *WebhookAlerter {
Timeout: 10 * time.Second,
},
LastAlertTimes: make(map[AlertKey]time.Time),
NodeDownStates: make(map[string]bool),
bufferPool: &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
@@ -112,6 +115,13 @@ func NewWebhookAlerter(config WebhookConfig) *WebhookAlerter {
}
}

func (w *WebhookAlerter) MarkServiceAsRecovered(nodeID string) {
w.Mu.Lock()
defer w.Mu.Unlock()

w.ServiceAlertStates[nodeID] = false
}
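
The constructor hunk above only adds a `NodeDownStates` initialization to `NewWebhookAlerter`; nothing in this diff initializes `ServiceAlertStates`, and assigning into a nil map panics in Go. If the map is not created elsewhere in the file, a defensive variant could lazily allocate it. A minimal sketch, assuming the same field names and that no other initialization exists:

```go
// Sketch only, not part of the commit: lazily allocate the map so the
// assignment cannot panic if the constructor never created it.
func (w *WebhookAlerter) MarkServiceAsRecovered(nodeID string) {
	w.Mu.Lock()
	defer w.Mu.Unlock()

	if w.ServiceAlertStates == nil {
		w.ServiceAlertStates = make(map[string]bool)
	}

	w.ServiceAlertStates[nodeID] = false
}
```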

func (w *WebhookAlerter) IsEnabled() bool {
return w.config.Enabled
}
@@ -137,9 +147,29 @@ func (w *WebhookAlerter) getTemplateFuncs() template.FuncMap {
func (w *WebhookAlerter) Alert(ctx context.Context, alert *WebhookAlert) error {
if !w.IsEnabled() {
log.Printf("Webhook alerter disabled, skipping alert: %s", alert.Title)

return errWebhookDisabled
}

// Only check NodeDownStates for "Node Offline" alerts.
if alert.Title == "Node Offline" {
w.Mu.RLock()
if w.NodeDownStates[alert.NodeID] {
w.Mu.RUnlock()
log.Printf("Skipping duplicate 'Node Offline' alert for node: %s", alert.NodeID)

return nil // Or return a specific error if you want to track this
}

w.Mu.RUnlock()

//If we got here, it is a valid down alert.

GitHub Actions / lint (check failure on line 166 in pkg/cloud/alerts/webhook.go): commentFormatting: put a space between `//` and comment text (gocritic)
w.Mu.Lock()
w.NodeDownStates[alert.NodeID] = true
w.Mu.Unlock()
}

// Always check cooldown (using the correct AlertKey, with ServiceName).
if err := w.CheckCooldown(alert.NodeID, alert.Title, alert.ServiceName); err != nil {
return err
}
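
The duplicate-suppression block above reads `NodeDownStates` under an RLock, releases it, and then takes the write lock to set the flag. Two concurrent `Alert` calls for the same node can both pass the read check in that window and both send the "Node Offline" alert. A minimal sketch of an atomic check-and-set, assuming the same field names (`markNodeDownOnce` is a hypothetical helper, not in this commit):

```go
// markNodeDownOnce reports whether this call is the first to observe the node
// as down; the check and the write happen under one lock, so concurrent
// callers cannot both return true for the same node.
func (w *WebhookAlerter) markNodeDownOnce(nodeID string) bool {
	w.Mu.Lock()
	defer w.Mu.Unlock()

	if w.NodeDownStates[nodeID] {
		return false // already alerted; caller should skip the duplicate
	}

	w.NodeDownStates[nodeID] = true

	return true
}
```

The caller in `Alert` would then skip the webhook whenever `markNodeDownOnce` returns false.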
@@ -156,14 +186,23 @@ func (w *WebhookAlerter) Alert(ctx context.Context, alert *WebhookAlert) error {
return w.sendRequest(ctx, payload)
}

func (w *WebhookAlerter) MarkNodeAsRecovered(nodeID string) {
w.Mu.Lock()
defer w.Mu.Unlock()

w.NodeDownStates[nodeID] = false

log.Printf("Marked Node: %v as recovered in the webhook alerter", nodeID)
}

// CheckCooldown checks if an alert is within its cooldown period.
func (w *WebhookAlerter) CheckCooldown(nodeID, alertTitle, serviceName string) error {
if w.config.Cooldown <= 0 {
return nil
}

w.mu.Lock()
defer w.mu.Unlock()
w.Mu.Lock()
defer w.Mu.Unlock()

key := AlertKey{NodeID: nodeID, Title: alertTitle, ServiceName: serviceName}

107 changes: 52 additions & 55 deletions pkg/cloud/server.go
@@ -20,7 +20,6 @@ import (
)

const (
downtimeValue = "unknown"
shutdownTimeout = 10 * time.Second
oneDay = 24 * time.Hour
oneWeek = 7 * oneDay
@@ -386,25 +385,19 @@ func (s *Server) checkInitialStates(ctx context.Context) {

for rows.Next() {
var nodeID string

var isHealthy bool

var lastSeen time.Time

if err := rows.Scan(&nodeID, &isHealthy, &lastSeen); err != nil {
log.Printf("Error scanning node row: %v", err)

continue
}

duration := time.Since(lastSeen)
if duration > s.alertThreshold {
log.Printf("Node %s found offline during initial check (last seen: %v ago)",
nodeID, duration.Round(time.Second))

if err := s.markNodeDown(ctx, nodeID, time.Now()); err != nil {
log.Printf("Error marking node down: %v", err)
}
// ONLY LOG. DO NOT MARK DOWN.
}
}
}
@@ -459,6 +452,8 @@ func (*Server) createNodeStatus(req *proto.PollerStatusRequest, now time.Time) *
}

func (s *Server) processServices(pollerID string, apiStatus *api.NodeStatus, services []*proto.ServiceStatus, now time.Time) {
allServicesAvailable := true

for _, svc := range services {
apiService := api.ServiceStatus{
Name: svc.ServiceName,
@@ -468,7 +463,7 @@ func (s *Server) processServices(pollerID string, apiStatus *api.NodeStatus, ser
}

if !svc.Available {
apiStatus.IsHealthy = false
allServicesAvailable = false // If ANY service is unavailable, set to false
}

// Process JSON details if available
@@ -485,6 +480,9 @@ func (s *Server) processServices(pollerID string, apiStatus *api.NodeStatus, ser

apiStatus.Services = append(apiStatus.Services, apiService)
}

// Only set IsHealthy based on ALL services.
apiStatus.IsHealthy = allServicesAvailable
}
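
The change above moves the health calculation out of the per-service branch: instead of flipping `IsHealthy` to false the moment one service is down, the loop tracks `allServicesAvailable` and assigns it once after every service has been processed. The rule it implements can be stated as a tiny helper; a sketch, assuming the same `proto.ServiceStatus` shape used in the hunk (`allAvailable` is hypothetical, not in the commit):

```go
// allAvailable reports whether every service in the poll is available; the
// node is considered healthy only when this holds for the full list.
func allAvailable(services []*proto.ServiceStatus) bool {
	for _, svc := range services {
		if !svc.Available {
			return false
		}
	}

	return true
}
```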

func (s *Server) handleService(pollerID string, svc *api.ServiceStatus, now time.Time) error {
@@ -975,49 +973,55 @@ func (s *Server) checkNodeStates(ctx context.Context) error {

for rows.Next() {
var nodeID string

var lastSeen time.Time

var isHealthy bool

if err := rows.Scan(&nodeID, &lastSeen, &isHealthy); err != nil {
log.Printf("Error scanning node row: %v", err)

continue
}

if err := s.evaluateNodeHealth(ctx, nodeID, lastSeen, isHealthy, threshold); err != nil {
err := s.evaluateNodeHealth(ctx, nodeID, lastSeen, isHealthy, threshold)
if err != nil {
// Only log errors, don't propagate service-related issues
log.Printf("Error evaluating node %s health: %v", nodeID, err)
}
}

return rows.Err()
}

func (s *Server) evaluateNodeHealth(ctx context.Context, nodeID string, lastSeen time.Time, isHealthy bool, threshold time.Time) error {
func (s *Server) evaluateNodeHealth(
ctx context.Context, nodeID string, lastSeen time.Time, isHealthy bool, threshold time.Time) error {
log.Printf("Evaluating node health: id=%s lastSeen=%v isHealthy=%v threshold=%v",
nodeID, lastSeen.Format(time.RFC3339), isHealthy, threshold.Format(time.RFC3339))

// Case 1: Node was healthy but hasn't been seen recently (went down)
if isHealthy && lastSeen.Before(threshold) {
duration := time.Since(lastSeen).Round(time.Second)
log.Printf("Node %s appears to be offline (last seen: %v ago)", nodeID, duration)

return s.handleNodeDown(ctx, nodeID, lastSeen)
}

// Case 2: Node is healthy and reporting within threshold
// Case 2: Node is healthy and reporting within threshold - DO NOTHING
if isHealthy && !lastSeen.Before(threshold) {
return nil
}

// Case 3: Node is unhealthy (service failures) but still reporting
if !isHealthy && !lastSeen.Before(threshold) {
if err := s.checkServiceStatus(ctx, nodeID, lastSeen); err != nil {
log.Printf("Error checking service status: %v", err)
// Case 3: Node is reporting but its status might have changed
if !lastSeen.Before(threshold) {
// Get the current health status
currentHealth, err := s.getNodeHealthState(nodeID)
if err != nil {
log.Printf("Error getting current health state for node %s: %v", nodeID, err)

return fmt.Errorf("failed to get current health state: %w", err)
}

return s.handlePotentialRecovery(ctx, nodeID, lastSeen) // Corrected call
// ONLY handle potential recovery - do not send service alerts here
if !isHealthy && currentHealth {
return s.handlePotentialRecovery(ctx, nodeID, lastSeen)
}
}

return nil
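
The new branching in `evaluateNodeHealth` boils down to three outcomes: mark the node down, do nothing, or attempt a recovery. A condensed sketch of that decision, with the alerting and database work left to the caller (`decideNodeAction` is hypothetical; the `(bool, error)` shape of `getNodeHealthState` is taken from the call above):

```go
type nodeAction int

const (
	actionNone     nodeAction = iota // case 2 and everything else
	actionMarkDown                   // case 1: healthy node stopped reporting
	actionRecover                    // case 3: unhealthy node reporting again, DB now healthy
)

// decideNodeAction mirrors the branching above: wasHealthy is the scanned
// is_healthy value, dbHealthy is what getNodeHealthState returns, and
// seenRecently is !lastSeen.Before(threshold).
func decideNodeAction(wasHealthy, dbHealthy, seenRecently bool) nodeAction {
	switch {
	case wasHealthy && !seenRecently:
		return actionMarkDown
	case seenRecently && !wasHealthy && dbHealthy:
		return actionRecover
	default:
		return actionNone
	}
}
```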
@@ -1030,6 +1034,7 @@ func (s *Server) checkServiceStatus(ctx context.Context, nodeID string, lastSeen
return fmt.Errorf("failed to get current services: %w", err)
}

// TODO: shouldn't we be getting the previous services from the db?
previousServices, err := s.db.GetServiceHistory(nodeID, "", 1)
if err != nil {
log.Printf("Error getting previous service states: %v", err)
@@ -1039,7 +1044,7 @@ func (s *Server) checkServiceStatus(ctx context.Context, nodeID string, lastSeen

changedService := s.findChangedService(currentServices, previousServices)

if changedService != "" && s.shouldSendServiceAlert(currentServices) {
if changedService != "" && s.shouldSendServiceAlert(nodeID, currentServices) {
if err := s.sendServiceFailureAlert(ctx, nodeID, lastSeen, currentServices, changedService); err != nil {
// already logged inside sendServiceFailureAlert
return err
@@ -1070,17 +1075,29 @@ func (*Server) findChangedService(current, previous []db.ServiceStatus) string {
}

// shouldSendServiceAlert determines if a service alert should be sent.
func (*Server) shouldSendServiceAlert(currentServices []db.ServiceStatus) bool {
func (s *Server) shouldSendServiceAlert(nodeID string, currentServices []db.ServiceStatus) bool {
// Only send an alert if we haven't already alerted for this service failure
for _, webhook := range s.webhooks {
if alerter, ok := webhook.(*alerts.WebhookAlerter); ok {
alerter.Mu.RLock()
inAlert := alerter.ServiceAlertStates[nodeID]
alerter.Mu.RUnlock()
if inAlert {
return false
}
}
}

// Check if services are degraded - avoid the large numbers issue
total := len(currentServices)
available := 0

for _, svc := range currentServices {
if svc.Available {
available++
}
}

// Only send if state changed and SOME services are down.
// Only alert if there's actual service degradation
return available < total && available > 0
}
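
Note that the returned condition `available < total && available > 0` suppresses the alert when every service is down (`available == 0`), presumably because a total outage is expected to surface through the "Node Offline" path instead. A quick sketch of the rule and its edge cases (`degraded` is a hypothetical distillation, not in the commit):

```go
// degraded mirrors the final condition above.
func degraded(available, total int) bool {
	return available < total && available > 0
}

// degraded(2, 3) == true  -> partial outage, alert
// degraded(0, 3) == false -> total outage, presumably left to the node-offline path
// degraded(3, 3) == false -> all healthy, no alert
```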

@@ -1126,14 +1143,12 @@ func (s *Server) sendServiceFailureAlert(

func (s *Server) handlePotentialRecovery(ctx context.Context, nodeID string, lastSeen time.Time) error {
// Get the most up-to-date node status from the database
status, err := s.db.GetNodeStatus(nodeID)
if err != nil {
return fmt.Errorf("failed to get node status: %w", err)
}

// We create a basic apiStatus here because it's required by handleNodeRecovery,
// but we don't need the service details, those are already being used where
// they need to be.
apiStatus := &api.NodeStatus{
NodeID: nodeID,
IsHealthy: status.IsHealthy, // Use the *actual* health status
LastUpdate: lastSeen,
Services: make([]api.ServiceStatus, 0),
}
@@ -1237,11 +1252,12 @@ func (*Server) updateNodeInTx(tx *sql.Tx, nodeID string, isHealthy bool, timesta
}

func (s *Server) handleNodeRecovery(ctx context.Context, nodeID string, apiStatus *api.NodeStatus, timestamp time.Time) {
lastDownTime := s.getLastDowntime(nodeID)
downtime := downtimeValue

if !lastDownTime.IsZero() {
downtime = timestamp.Sub(lastDownTime).String()
// Reset the "down" state in the alerter *before* sending the alert.
for _, webhook := range s.webhooks {
if alerter, ok := webhook.(*alerts.WebhookAlerter); ok {
alerter.MarkNodeAsRecovered(nodeID)
alerter.MarkServiceAsRecovered(nodeID)
}
}

alert := &alerts.WebhookAlert{
@@ -1253,9 +1269,8 @@ func (s *Server) handleNodeRecovery(ctx context.Context, nodeID string, apiStatu
ServiceName: "", // Ensure ServiceName is empty for node-level alerts
Details: map[string]any{
"hostname": getHostname(),
"downtime": downtime,
"recovery_time": timestamp.Format(time.RFC3339),
"services": len(apiStatus.Services),
"services": len(apiStatus.Services), // This might be 0, which is fine.
},
}

@@ -1350,24 +1365,6 @@ func (s *Server) ReportStatus(ctx context.Context, req *proto.PollerStatusReques
return &proto.PollerStatusResponse{Received: true}, nil
}

func (s *Server) getLastDowntime(nodeID string) time.Time {
var downtime time.Time
err := s.db.QueryRow(`
SELECT timestamp
FROM node_history
WHERE node_id = ? AND is_healthy = FALSE
ORDER BY timestamp DESC
LIMIT 1
`, nodeID).Scan(&downtime)

if err != nil {
log.Printf("Error getting last downtime for node %s: %v", nodeID, err)
return time.Time{} // Return zero time if error
}

return downtime
}

func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
