Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mfreeman451 committed Feb 6, 2025
1 parent 04a9d08 commit b8dc49f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 245 deletions.
2 changes: 1 addition & 1 deletion pkg/cloud/alerts/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (w *WebhookAlerter) Alert(ctx context.Context, alert *WebhookAlert) error {

w.Mu.RUnlock()

//If we got here, it is a valid down alert.
// If we got here, it is a valid down alert.
w.Mu.Lock()
w.NodeDownStates[alert.NodeID] = true
w.Mu.Unlock()
Expand Down
252 changes: 8 additions & 244 deletions pkg/cloud/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s *Server) monitorNodes(ctx context.Context) {
time.Sleep(nodeDiscoveryTimeout)

// Initial checks
s.checkInitialStates(ctx)
s.checkInitialStates()

time.Sleep(nodeNeverReportedTimeout)
s.checkNeverReportedPollers(ctx)
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s *Server) SetAPIServer(apiServer api.Service) {
})
}

func (s *Server) checkInitialStates(ctx context.Context) {
func (s *Server) checkInitialStates() {
log.Printf("Checking initial states of all nodes")

likeConditions := make([]string, 0, len(s.pollerPatterns))
Expand Down Expand Up @@ -385,7 +385,9 @@ func (s *Server) checkInitialStates(ctx context.Context) {

for rows.Next() {
var nodeID string

var isHealthy bool

var lastSeen time.Time

if err := rows.Scan(&nodeID, &isHealthy, &lastSeen); err != nil {
Expand All @@ -397,7 +399,6 @@ func (s *Server) checkInitialStates(ctx context.Context) {
if duration > s.alertThreshold {
log.Printf("Node %s found offline during initial check (last seen: %v ago)",
nodeID, duration.Round(time.Second))
// ONLY LOG. DO NOT MARK DOWN.
}
}
}
Expand Down Expand Up @@ -578,128 +579,6 @@ func (s *Server) updateNodeState(ctx context.Context, pollerID string, apiStatus
return nil
}

// sendNodeDownAlert sends an alert when a node goes down.
func (s *Server) sendNodeDownAlert(ctx context.Context, nodeID string, lastSeen time.Time) {
alert := &alerts.WebhookAlert{
Level: alerts.Error,
Title: "Node Offline",
Message: fmt.Sprintf("Node '%s' is offline", nodeID),
NodeID: nodeID,
Timestamp: lastSeen.UTC().Format(time.RFC3339),
Details: map[string]any{
"hostname": getHostname(),
"duration": time.Since(lastSeen).String(),
},
}

err := s.sendAlert(ctx, alert)
if err != nil {
log.Printf("Error sending alert: %v", err)
return
}
}

// updateAPINodeStatus updates the node status in the API server.
func (s *Server) updateAPINodeStatus(nodeID string, isHealthy bool, timestamp time.Time) {
if s.apiServer != nil {
status := &api.NodeStatus{
NodeID: nodeID,
IsHealthy: isHealthy,
LastUpdate: timestamp,
}
s.apiServer.UpdateNodeStatus(nodeID, status)
}
}

// markNodeDown handles marking a node as down and sending alerts.
func (s *Server) markNodeDown(ctx context.Context, nodeID string, lastSeen time.Time) error {
if err := s.updateNodeDownStatus(nodeID, lastSeen); err != nil {
return err
}

s.sendNodeDownAlert(ctx, nodeID, lastSeen)
s.updateAPINodeStatus(nodeID, false, lastSeen)

return nil
}

func (s *Server) updateNodeDownStatus(nodeID string, lastSeen time.Time) error {
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func(tx db.Transaction) {
err = tx.Rollback()
if err != nil {
log.Printf("Error rolling back transaction: %v", err)
}
}(tx)

sqlTx, err := db.ToTx(tx)
if err != nil {
return fmt.Errorf("invalid transaction: %w", err)
}

if err := s.performNodeUpdate(sqlTx, nodeID, lastSeen); err != nil {
return err
}

return tx.Commit()
}

// checkNodeExists verifies if a node exists in the database.
func (*Server) checkNodeExists(tx *sql.Tx, nodeID string) (bool, error) {
var exists bool

err := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM nodes WHERE node_id = ?)", nodeID).Scan(&exists)
if err != nil {
return false, fmt.Errorf("failed to check node existence: %w", err)
}

return exists, nil
}

// insertNewNode adds a new node to the database.
func (*Server) insertNewNode(tx *sql.Tx, nodeID string, lastSeen time.Time) error {
_, err := tx.Exec(`
INSERT INTO nodes (node_id, last_seen, is_healthy)
VALUES (?, ?, FALSE)`,
nodeID, lastSeen)
if err != nil {
return fmt.Errorf("failed to insert new node: %w", err)
}

return nil
}

// updateExistingNode updates an existing node's status.
func (*Server) updateExistingNode(tx *sql.Tx, nodeID string, lastSeen time.Time) error {
_, err := tx.Exec(`
UPDATE nodes
SET is_healthy = FALSE,
last_seen = ?
WHERE node_id = ?`,
lastSeen, nodeID)
if err != nil {
return fmt.Errorf("failed to update existing node: %w", err)
}

return nil
}

func (s *Server) performNodeUpdate(tx *sql.Tx, nodeID string, lastSeen time.Time) error {
exists, err := s.checkNodeExists(tx, nodeID)
if err != nil {
return err
}

if !exists {
return s.insertNewNode(tx, nodeID, lastSeen)
}

return s.updateExistingNode(tx, nodeID, lastSeen)
}

// periodicCleanup runs regular maintenance tasks on the database.
func (s *Server) periodicCleanup(_ context.Context) {
ticker := time.NewTicker(1 * time.Hour)
Expand Down Expand Up @@ -973,11 +852,14 @@ func (s *Server) checkNodeStates(ctx context.Context) error {

for rows.Next() {
var nodeID string

var lastSeen time.Time

var isHealthy bool

if err := rows.Scan(&nodeID, &lastSeen, &isHealthy); err != nil {
log.Printf("Error scanning node row: %v", err)

continue
}

Expand All @@ -1000,6 +882,7 @@ func (s *Server) evaluateNodeHealth(
if isHealthy && lastSeen.Before(threshold) {
duration := time.Since(lastSeen).Round(time.Second)
log.Printf("Node %s appears to be offline (last seen: %v ago)", nodeID, duration)

return s.handleNodeDown(ctx, nodeID, lastSeen)
}

Expand Down Expand Up @@ -1027,126 +910,7 @@ func (s *Server) evaluateNodeHealth(
return nil
}

// checkServiceStatus handles service status alerts with proper cooldown.
func (s *Server) checkServiceStatus(ctx context.Context, nodeID string, lastSeen time.Time) error {
currentServices, err := s.db.GetNodeServices(nodeID)
if err != nil {
return fmt.Errorf("failed to get current services: %w", err)
}

// TODO: shouldnt we be getting the previous services from the db?
previousServices, err := s.db.GetServiceHistory(nodeID, "", 1)
if err != nil {
log.Printf("Error getting previous service states: %v", err)
// treat an error getting history as if there's no history.
previousServices = []db.ServiceStatus{}
}

changedService := s.findChangedService(currentServices, previousServices)

if changedService != "" && s.shouldSendServiceAlert(nodeID, currentServices) {
if err := s.sendServiceFailureAlert(ctx, nodeID, lastSeen, currentServices, changedService); err != nil {
// already logged inside sendServiceFailureAlert
return err
}
}

return nil
}

// findChangedService finds the first service that changed state from available to unavailable.
func (*Server) findChangedService(current, previous []db.ServiceStatus) string {
previousStates := make(map[string]bool)
for _, svc := range previous {
previousStates[svc.ServiceName] = svc.Available
}

for _, svc := range current {
if prevAvailable, ok := previousStates[svc.ServiceName]; ok {
if prevAvailable && !svc.Available { // Changed from available to unavailable
return svc.ServiceName
}
} else if !svc.Available { // New and unavailable service
return svc.ServiceName
}
}

return "" // No changed service found
}

// shouldSendServiceAlert determines if a service alert should be sent.
func (s *Server) shouldSendServiceAlert(nodeID string, currentServices []db.ServiceStatus) bool {
// Only send an alert if we haven't already alerted for this service failure
for _, webhook := range s.webhooks {
if alerter, ok := webhook.(*alerts.WebhookAlerter); ok {
alerter.Mu.RLock()
inAlert := alerter.ServiceAlertStates[nodeID]
alerter.Mu.RUnlock()
if inAlert {
return false
}
}
}

// Check if services are degraded - avoid the large numbers issue
total := len(currentServices)
available := 0
for _, svc := range currentServices {
if svc.Available {
available++
}
}

// Only alert if there's actual service degradation
return available < total && available > 0
}

// sendServiceFailureAlert constructs and sends the service failure alert.
func (s *Server) sendServiceFailureAlert(
ctx context.Context,
nodeID string,
lastSeen time.Time,
currentServices []db.ServiceStatus,
changedServiceName string) error {
total := len(currentServices)
available := 0

for _, svc := range currentServices {
if svc.Available {
available++
}
}

alert := &alerts.WebhookAlert{
Level: alerts.Warning,
Title: "Service Failure",
Message: fmt.Sprintf("Node '%s' has %d/%d services available", nodeID, available, total),
NodeID: nodeID,
ServiceName: changedServiceName, // Set the service name here
Timestamp: time.Now().UTC().Format(time.RFC3339),
Details: map[string]any{
"hostname": getHostname(),
"available_services": available,
"total_services": total,
"last_seen": lastSeen.Format(time.RFC3339),
},
}

if err := s.sendAlert(ctx, alert); err != nil && !errors.Is(err, alerts.ErrWebhookCooldown) {
log.Printf("Failed to send service failure alert: %v", err)

return fmt.Errorf("failed to send service failure alert: %w", err) // now we return this
}

return nil
}

func (s *Server) handlePotentialRecovery(ctx context.Context, nodeID string, lastSeen time.Time) error {
// Get the most up-to-date node status from the database

// We create a basic apiStatus here because it's required by handleNodeRecovery,
// but we don't need the service details, those are already being used where
// they need to be.
apiStatus := &api.NodeStatus{
NodeID: nodeID,
LastUpdate: lastSeen,
Expand Down

0 comments on commit b8dc49f

Please sign in to comment.