From b8dc49f9a0a44998ec0a5f1cd89a4dee52f0f2ef Mon Sep 17 00:00:00 2001 From: Michael Freeman Date: Thu, 6 Feb 2025 00:38:22 -0600 Subject: [PATCH] cleanup --- pkg/cloud/alerts/webhook.go | 2 +- pkg/cloud/server.go | 252 ++---------------------------------- 2 files changed, 9 insertions(+), 245 deletions(-) diff --git a/pkg/cloud/alerts/webhook.go b/pkg/cloud/alerts/webhook.go index 33e39d9..2b2a732 100644 --- a/pkg/cloud/alerts/webhook.go +++ b/pkg/cloud/alerts/webhook.go @@ -163,7 +163,7 @@ func (w *WebhookAlerter) Alert(ctx context.Context, alert *WebhookAlert) error { w.Mu.RUnlock() - //If we got here, it is a valid down alert. + // If we got here, it is a valid down alert. w.Mu.Lock() w.NodeDownStates[alert.NodeID] = true w.Mu.Unlock() diff --git a/pkg/cloud/server.go b/pkg/cloud/server.go index 9dc673e..ec9f69b 100644 --- a/pkg/cloud/server.go +++ b/pkg/cloud/server.go @@ -141,7 +141,7 @@ func (s *Server) monitorNodes(ctx context.Context) { time.Sleep(nodeDiscoveryTimeout) // Initial checks - s.checkInitialStates(ctx) + s.checkInitialStates() time.Sleep(nodeNeverReportedTimeout) s.checkNeverReportedPollers(ctx) @@ -349,7 +349,7 @@ func (s *Server) SetAPIServer(apiServer api.Service) { }) } -func (s *Server) checkInitialStates(ctx context.Context) { +func (s *Server) checkInitialStates() { log.Printf("Checking initial states of all nodes") likeConditions := make([]string, 0, len(s.pollerPatterns)) @@ -385,7 +385,9 @@ func (s *Server) checkInitialStates(ctx context.Context) { for rows.Next() { var nodeID string + var isHealthy bool + var lastSeen time.Time if err := rows.Scan(&nodeID, &isHealthy, &lastSeen); err != nil { @@ -397,7 +399,6 @@ func (s *Server) checkInitialStates(ctx context.Context) { if duration > s.alertThreshold { log.Printf("Node %s found offline during initial check (last seen: %v ago)", nodeID, duration.Round(time.Second)) - // ONLY LOG. DO NOT MARK DOWN. } } } @@ -578,128 +579,6 @@ func (s *Server) updateNodeState(ctx context.Context, pollerID string, apiStatus return nil } -// sendNodeDownAlert sends an alert when a node goes down. -func (s *Server) sendNodeDownAlert(ctx context.Context, nodeID string, lastSeen time.Time) { - alert := &alerts.WebhookAlert{ - Level: alerts.Error, - Title: "Node Offline", - Message: fmt.Sprintf("Node '%s' is offline", nodeID), - NodeID: nodeID, - Timestamp: lastSeen.UTC().Format(time.RFC3339), - Details: map[string]any{ - "hostname": getHostname(), - "duration": time.Since(lastSeen).String(), - }, - } - - err := s.sendAlert(ctx, alert) - if err != nil { - log.Printf("Error sending alert: %v", err) - return - } -} - -// updateAPINodeStatus updates the node status in the API server. -func (s *Server) updateAPINodeStatus(nodeID string, isHealthy bool, timestamp time.Time) { - if s.apiServer != nil { - status := &api.NodeStatus{ - NodeID: nodeID, - IsHealthy: isHealthy, - LastUpdate: timestamp, - } - s.apiServer.UpdateNodeStatus(nodeID, status) - } -} - -// markNodeDown handles marking a node as down and sending alerts. -func (s *Server) markNodeDown(ctx context.Context, nodeID string, lastSeen time.Time) error { - if err := s.updateNodeDownStatus(nodeID, lastSeen); err != nil { - return err - } - - s.sendNodeDownAlert(ctx, nodeID, lastSeen) - s.updateAPINodeStatus(nodeID, false, lastSeen) - - return nil -} - -func (s *Server) updateNodeDownStatus(nodeID string, lastSeen time.Time) error { - tx, err := s.db.Begin() - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func(tx db.Transaction) { - err = tx.Rollback() - if err != nil { - log.Printf("Error rolling back transaction: %v", err) - } - }(tx) - - sqlTx, err := db.ToTx(tx) - if err != nil { - return fmt.Errorf("invalid transaction: %w", err) - } - - if err := s.performNodeUpdate(sqlTx, nodeID, lastSeen); err != nil { - return err - } - - return tx.Commit() -} - -// checkNodeExists verifies if a node exists in the database. -func (*Server) checkNodeExists(tx *sql.Tx, nodeID string) (bool, error) { - var exists bool - - err := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM nodes WHERE node_id = ?)", nodeID).Scan(&exists) - if err != nil { - return false, fmt.Errorf("failed to check node existence: %w", err) - } - - return exists, nil -} - -// insertNewNode adds a new node to the database. -func (*Server) insertNewNode(tx *sql.Tx, nodeID string, lastSeen time.Time) error { - _, err := tx.Exec(` - INSERT INTO nodes (node_id, last_seen, is_healthy) - VALUES (?, ?, FALSE)`, - nodeID, lastSeen) - if err != nil { - return fmt.Errorf("failed to insert new node: %w", err) - } - - return nil -} - -// updateExistingNode updates an existing node's status. -func (*Server) updateExistingNode(tx *sql.Tx, nodeID string, lastSeen time.Time) error { - _, err := tx.Exec(` - UPDATE nodes - SET is_healthy = FALSE, - last_seen = ? - WHERE node_id = ?`, - lastSeen, nodeID) - if err != nil { - return fmt.Errorf("failed to update existing node: %w", err) - } - - return nil -} - -func (s *Server) performNodeUpdate(tx *sql.Tx, nodeID string, lastSeen time.Time) error { - exists, err := s.checkNodeExists(tx, nodeID) - if err != nil { - return err - } - - if !exists { - return s.insertNewNode(tx, nodeID, lastSeen) - } - - return s.updateExistingNode(tx, nodeID, lastSeen) -} - // periodicCleanup runs regular maintenance tasks on the database. func (s *Server) periodicCleanup(_ context.Context) { ticker := time.NewTicker(1 * time.Hour) @@ -973,11 +852,14 @@ func (s *Server) checkNodeStates(ctx context.Context) error { for rows.Next() { var nodeID string + var lastSeen time.Time + var isHealthy bool if err := rows.Scan(&nodeID, &lastSeen, &isHealthy); err != nil { log.Printf("Error scanning node row: %v", err) + continue } @@ -1000,6 +882,7 @@ func (s *Server) evaluateNodeHealth( if isHealthy && lastSeen.Before(threshold) { duration := time.Since(lastSeen).Round(time.Second) log.Printf("Node %s appears to be offline (last seen: %v ago)", nodeID, duration) + return s.handleNodeDown(ctx, nodeID, lastSeen) } @@ -1027,126 +910,7 @@ func (s *Server) evaluateNodeHealth( return nil } -// checkServiceStatus handles service status alerts with proper cooldown. -func (s *Server) checkServiceStatus(ctx context.Context, nodeID string, lastSeen time.Time) error { - currentServices, err := s.db.GetNodeServices(nodeID) - if err != nil { - return fmt.Errorf("failed to get current services: %w", err) - } - - // TODO: shouldnt we be getting the previous services from the db? - previousServices, err := s.db.GetServiceHistory(nodeID, "", 1) - if err != nil { - log.Printf("Error getting previous service states: %v", err) - // treat an error getting history as if there's no history. - previousServices = []db.ServiceStatus{} - } - - changedService := s.findChangedService(currentServices, previousServices) - - if changedService != "" && s.shouldSendServiceAlert(nodeID, currentServices) { - if err := s.sendServiceFailureAlert(ctx, nodeID, lastSeen, currentServices, changedService); err != nil { - // already logged inside sendServiceFailureAlert - return err - } - } - - return nil -} - -// findChangedService finds the first service that changed state from available to unavailable. -func (*Server) findChangedService(current, previous []db.ServiceStatus) string { - previousStates := make(map[string]bool) - for _, svc := range previous { - previousStates[svc.ServiceName] = svc.Available - } - - for _, svc := range current { - if prevAvailable, ok := previousStates[svc.ServiceName]; ok { - if prevAvailable && !svc.Available { // Changed from available to unavailable - return svc.ServiceName - } - } else if !svc.Available { // New and unavailable service - return svc.ServiceName - } - } - - return "" // No changed service found -} - -// shouldSendServiceAlert determines if a service alert should be sent. -func (s *Server) shouldSendServiceAlert(nodeID string, currentServices []db.ServiceStatus) bool { - // Only send an alert if we haven't already alerted for this service failure - for _, webhook := range s.webhooks { - if alerter, ok := webhook.(*alerts.WebhookAlerter); ok { - alerter.Mu.RLock() - inAlert := alerter.ServiceAlertStates[nodeID] - alerter.Mu.RUnlock() - if inAlert { - return false - } - } - } - - // Check if services are degraded - avoid the large numbers issue - total := len(currentServices) - available := 0 - for _, svc := range currentServices { - if svc.Available { - available++ - } - } - - // Only alert if there's actual service degradation - return available < total && available > 0 -} - -// sendServiceFailureAlert constructs and sends the service failure alert. -func (s *Server) sendServiceFailureAlert( - ctx context.Context, - nodeID string, - lastSeen time.Time, - currentServices []db.ServiceStatus, - changedServiceName string) error { - total := len(currentServices) - available := 0 - - for _, svc := range currentServices { - if svc.Available { - available++ - } - } - - alert := &alerts.WebhookAlert{ - Level: alerts.Warning, - Title: "Service Failure", - Message: fmt.Sprintf("Node '%s' has %d/%d services available", nodeID, available, total), - NodeID: nodeID, - ServiceName: changedServiceName, // Set the service name here - Timestamp: time.Now().UTC().Format(time.RFC3339), - Details: map[string]any{ - "hostname": getHostname(), - "available_services": available, - "total_services": total, - "last_seen": lastSeen.Format(time.RFC3339), - }, - } - - if err := s.sendAlert(ctx, alert); err != nil && !errors.Is(err, alerts.ErrWebhookCooldown) { - log.Printf("Failed to send service failure alert: %v", err) - - return fmt.Errorf("failed to send service failure alert: %w", err) // now we return this - } - - return nil -} - func (s *Server) handlePotentialRecovery(ctx context.Context, nodeID string, lastSeen time.Time) error { - // Get the most up-to-date node status from the database - - // We create a basic apiStatus here because it's required by handleNodeRecovery, - // but we don't need the service details, those are already being used where - // they need to be. apiStatus := &api.NodeStatus{ NodeID: nodeID, LastUpdate: lastSeen,