Fix Warning & Operation metrics (#14323)
`lxd_operations_total` and `lxd_warnings_total` are currently computed
cluster-wide, instead of counting only the entities related to the node
responding to the metrics request. This goes against the overall design of
the metrics, which are meant to be per node and scraped from each node in a
cluster.
To fix this, this PR filters the queries for those entities by node.
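With this per-node filtering in place, each member reports only its own warnings and operations (nodeless warnings are attributed to the leader), so summing the per-member scrapes yields the cluster-wide total exactly once. Below is a minimal illustrative sketch of that aggregation on the scraping side, using the `github.com/prometheus/common/expfmt` text parser; the sample scrape strings and the `sumMetric` helper are hypothetical and not part of this commit.

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// sumMetric adds up the values of one metric family across the text-format
// scrapes of several cluster members.
func sumMetric(scrapes []string, name string) (float64, error) {
	var parser expfmt.TextParser
	total := 0.0

	for _, scrape := range scrapes {
		families, err := parser.TextToMetricFamilies(strings.NewReader(scrape))
		if err != nil {
			return 0, err
		}

		family, ok := families[name]
		if !ok {
			continue
		}

		for _, m := range family.GetMetric() {
			// Depending on the exposition, the value may be untyped, a gauge or a counter.
			switch {
			case m.GetUntyped() != nil:
				total += m.GetUntyped().GetValue()
			case m.GetGauge() != nil:
				total += m.GetGauge().GetValue()
			case m.GetCounter() != nil:
				total += m.GetCounter().GetValue()
			}
		}
	}

	return total, nil
}

func main() {
	// Hypothetical per-member scrapes of /1.0/metrics after this fix.
	node1 := "lxd_warnings_total 2\n"
	node2 := "lxd_warnings_total 1\n"

	total, err := sumMetric([]string{node1, node2}, "lxd_warnings_total")
	if err != nil {
		panic(err)
	}

	fmt.Println(total) // 3
}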
tomponline authored Nov 21, 2024
2 parents ec03f4c + c567720 commit f9be34e
Showing 5 changed files with 152 additions and 5 deletions.
42 changes: 37 additions & 5 deletions lxd/api_metrics.go
@@ -14,6 +14,7 @@ import (
"github.com/canonical/lxd/lxd/auth"
"github.com/canonical/lxd/lxd/db"
dbCluster "github.com/canonical/lxd/lxd/db/cluster"
"github.com/canonical/lxd/lxd/db/warningtype"
"github.com/canonical/lxd/lxd/instance"
instanceDrivers "github.com/canonical/lxd/lxd/instance/drivers"
"github.com/canonical/lxd/lxd/instance/instancetype"
@@ -129,7 +130,7 @@ func metricsGet(d *Daemon, r *http.Request) response.Response {
}

// Register internal metrics.
intMetrics = internalMetrics(ctx, s.StartTime, tx)
intMetrics = internalMetrics(ctx, s, tx)
return nil
})
if err != nil {
@@ -378,18 +379,49 @@ func getFilteredMetrics(s *state.State, r *http.Request, compress bool, metricSe
return response.SyncResponsePlain(true, compress, metricSet.String())
}

func internalMetrics(ctx context.Context, daemonStartTime time.Time, tx *db.ClusterTx) *metrics.MetricSet {
// clusterMemberWarnings returns the list of unresolved and unacknowledged warnings related to this cluster member.
// If this member is the leader, nodeless warnings are also included.
// This way they are counted exactly once instead of redundantly across cluster members.
func clusterMemberWarnings(ctx context.Context, s *state.State, tx *db.ClusterTx) ([]dbCluster.Warning, error) {
var filters []dbCluster.WarningFilter

leaderInfo, err := s.LeaderInfo()
if err != nil {
return nil, err
}

// Use local variable to get pointer.
emptyNode := ""

for status := range warningtype.Statuses {
// Skip warnings that are resolved (but not yet pruned) and warnings that were acknowledged.
if status != warningtype.StatusResolved && status != warningtype.StatusAcknowledged {
filters = append(filters, dbCluster.WarningFilter{Node: &s.ServerName, Status: &status})
if leaderInfo.Leader {
// Count the nodeless warnings as belonging to the leader node.
filters = append(filters, dbCluster.WarningFilter{Node: &emptyNode, Status: &status})
}
}
}

return dbCluster.GetWarnings(ctx, tx.Tx(), filters...)
}

func internalMetrics(ctx context.Context, s *state.State, tx *db.ClusterTx) *metrics.MetricSet {
out := metrics.NewMetricSet(nil)

warnings, err := dbCluster.GetWarnings(ctx, tx.Tx())
warnings, err := clusterMemberWarnings(ctx, s, tx)

if err != nil {
logger.Warn("Failed to get warnings", logger.Ctx{"err": err})
} else {
// Total number of warnings
out.AddSamples(metrics.WarningsTotal, metrics.Sample{Value: float64(len(warnings))})
}

operations, err := dbCluster.GetOperations(ctx, tx.Tx())
// Create local variable to get a pointer.
nodeID := tx.GetNodeID()
operations, err := dbCluster.GetOperations(ctx, tx.Tx(), dbCluster.OperationFilter{NodeID: &nodeID})
if err != nil {
logger.Warn("Failed to get operations", logger.Ctx{"err": err})
} else {
@@ -419,7 +451,7 @@ func internalMetrics(ctx context.Context, daemonStartTime time.Time, tx *db.Clus
}

// Daemon uptime
out.AddSamples(metrics.UptimeSeconds, metrics.Sample{Value: time.Since(daemonStartTime).Seconds()})
out.AddSamples(metrics.UptimeSeconds, metrics.Sample{Value: time.Since(s.StartTime).Seconds()})

// Number of goroutines
out.AddSamples(metrics.GoGoroutines, metrics.Sample{Value: float64(runtime.NumGoroutine())})
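A side note on the `emptyNode := ""` and `nodeID := tx.GetNodeID()` locals above: the mapper filter structs use pointer fields, where nil means "do not filter on this column", and Go cannot take the address of a string literal or of a function's return value, so a throwaway local is needed first. A standalone sketch of the pattern with a hypothetical Filter type (not LXD code):

package main

import "fmt"

// Filter mirrors the shape of the mapper-generated filter structs:
// optional fields are pointers and nil means "do not filter on this column".
type Filter struct {
	Node   *string
	NodeID *int64
}

func currentNodeID() int64 { return 7 }

func main() {
	// `&currentNodeID()` and `&""` do not compile, so assign to locals first.
	nodeID := currentNodeID()
	emptyNode := ""

	f := Filter{Node: &emptyNode, NodeID: &nodeID}
	fmt.Println(*f.Node == "", *f.NodeID) // true 7
}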
1 change: 1 addition & 0 deletions lxd/db/cluster/warnings.go
@@ -18,6 +18,7 @@ import (
//go:generate mapper stmt -e warning objects-by-UUID
//go:generate mapper stmt -e warning objects-by-Project
//go:generate mapper stmt -e warning objects-by-Status
//go:generate mapper stmt -e warning objects-by-Node-and-Status
//go:generate mapper stmt -e warning objects-by-Node-and-TypeCode
//go:generate mapper stmt -e warning objects-by-Node-and-TypeCode-and-Project
//go:generate mapper stmt -e warning objects-by-Node-and-TypeCode-and-Project-and-EntityType-and-EntityID
33 changes: 33 additions & 0 deletions lxd/db/cluster/warnings.mapper.go
@@ -53,6 +53,15 @@ SELECT warnings.id, coalesce(nodes.name, '') AS node, coalesce(projects.name, ''
ORDER BY warnings.uuid
`)

var warningObjectsByNodeAndStatus = RegisterStmt(`
SELECT warnings.id, coalesce(nodes.name, '') AS node, coalesce(projects.name, '') AS project, coalesce(warnings.entity_type_code, -1), coalesce(warnings.entity_id, -1), warnings.uuid, warnings.type_code, warnings.status, warnings.first_seen_date, warnings.last_seen_date, warnings.updated_date, warnings.last_message, warnings.count
FROM warnings
LEFT JOIN nodes ON warnings.node_id = nodes.id
LEFT JOIN projects ON warnings.project_id = projects.id
WHERE ( coalesce(node, '') = ? AND warnings.status = ? )
ORDER BY warnings.uuid
`)

var warningObjectsByNodeAndTypeCode = RegisterStmt(`
SELECT warnings.id, coalesce(nodes.name, '') AS node, coalesce(projects.name, '') AS project, coalesce(warnings.entity_type_code, -1), coalesce(warnings.entity_id, -1), warnings.uuid, warnings.type_code, warnings.status, warnings.first_seen_date, warnings.last_seen_date, warnings.updated_date, warnings.last_message, warnings.count
FROM warnings
@@ -238,6 +247,30 @@ func GetWarnings(ctx context.Context, tx *sql.Tx, filters ...WarningFilter) ([]W
continue
}

_, where, _ := strings.Cut(parts[0], "WHERE")
queryParts[0] += "OR" + where
} else if filter.Node != nil && filter.Status != nil && filter.ID == nil && filter.UUID == nil && filter.Project == nil && filter.TypeCode == nil && filter.EntityType == nil && filter.EntityID == nil {
args = append(args, []any{filter.Node, filter.Status}...)
if len(filters) == 1 {
sqlStmt, err = Stmt(tx, warningObjectsByNodeAndStatus)
if err != nil {
return nil, fmt.Errorf("Failed to get \"warningObjectsByNodeAndStatus\" prepared statement: %w", err)
}

break
}

query, err := StmtString(warningObjectsByNodeAndStatus)
if err != nil {
return nil, fmt.Errorf("Failed to get \"warningObjects\" prepared statement: %w", err)
}

parts := strings.SplitN(query, "ORDER BY", 2)
if i == 0 {
copy(queryParts[:], parts)
continue
}

_, where, _ := strings.Cut(parts[0], "WHERE")
queryParts[0] += "OR" + where
} else if filter.UUID != nil && filter.ID == nil && filter.Project == nil && filter.Node == nil && filter.TypeCode == nil && filter.EntityType == nil && filter.EntityID == nil && filter.Status == nil {
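For context on the generated code above: when several filters are passed, GetWarnings keeps the SELECT and WHERE clause of the first matching statement, appends each further filter's WHERE fragment with OR, and re-attaches the ORDER BY at the end. A simplified, hypothetical sketch of that composition (not the generated mapper code):

package main

import (
	"fmt"
	"strings"
)

// combine mimics the composition: take full per-filter statements of the form
// "SELECT ... WHERE ( ... ) ORDER BY ...", keep the first one's SELECT and WHERE,
// OR in the WHERE fragments of the rest, then put the ORDER BY back.
func combine(stmts []string) string {
	var queryParts [2]string

	for i, stmt := range stmts {
		parts := strings.SplitN(stmt, "ORDER BY", 2)
		if i == 0 {
			copy(queryParts[:], parts)
			continue
		}

		_, where, _ := strings.Cut(parts[0], "WHERE")
		queryParts[0] += "OR" + where
	}

	return strings.Join(queryParts[:], "ORDER BY")
}

func main() {
	stmt := "SELECT warnings.id FROM warnings WHERE ( coalesce(node, '') = ? AND warnings.status = ? ) ORDER BY warnings.uuid"

	// Two node-and-status filters produce one statement with OR'd WHERE clauses.
	fmt.Println(combine([]string{stmt, stmt}))
}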
1 change: 1 addition & 0 deletions test/main.sh
@@ -258,6 +258,7 @@ if [ "${1:-"all"}" != "standalone" ]; then
run_test test_clustering_join_api "clustering join api"
run_test test_clustering_shutdown_nodes "clustering shutdown"
run_test test_clustering_projects "clustering projects"
run_test test_clustering_metrics "clustering metrics"
run_test test_clustering_update_cert "clustering update cert"
run_test test_clustering_update_cert_reversion "clustering update cert reversion"
run_test test_clustering_update_cert_token "clustering update cert token"
80 changes: 80 additions & 0 deletions test/suites/clustering.sh
@@ -1963,6 +1963,86 @@ test_clustering_projects() {
kill_lxd "${LXD_TWO_DIR}"
}

test_clustering_metrics() {
local LXD_DIR

setup_clustering_bridge
prefix="lxd$$"
bridge="${prefix}"

setup_clustering_netns 1
LXD_ONE_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
chmod +x "${LXD_ONE_DIR}"
ns1="${prefix}1"
spawn_lxd_and_bootstrap_cluster "${ns1}" "${bridge}" "${LXD_ONE_DIR}"

# Add a newline at the end of each line. YAML has weird rules.
cert=$(sed ':a;N;$!ba;s/\n/\n\n/g' "${LXD_ONE_DIR}/cluster.crt")

# Spawn a second node
setup_clustering_netns 2
LXD_TWO_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
chmod +x "${LXD_TWO_DIR}"
ns2="${prefix}2"
spawn_lxd_and_join_cluster "${ns2}" "${bridge}" "${cert}" 2 1 "${LXD_TWO_DIR}" "${LXD_ONE_DIR}"

# Create one running container on each node and a stopped one on the leader.
LXD_DIR="${LXD_ONE_DIR}" deps/import-busybox --project default --alias testimage
LXD_DIR="${LXD_ONE_DIR}" lxc launch --target node1 testimage c1
LXD_DIR="${LXD_ONE_DIR}" lxc init --target node1 testimage stopped
LXD_DIR="${LXD_ONE_DIR}" lxc launch --target node2 testimage c2

# Check that scraping metrics on each node only includes started instances on that node.
LXD_DIR="${LXD_ONE_DIR}" lxc query "/1.0/metrics" | grep 'name="c1"'
! LXD_DIR="${LXD_ONE_DIR}" lxc query "/1.0/metrics" | grep 'name="stopped"' || false
! LXD_DIR="${LXD_ONE_DIR}" lxc query "/1.0/metrics" | grep 'name="c2"' || false
! LXD_DIR="${LXD_TWO_DIR}" lxc query "/1.0/metrics" | grep 'name="c1"' || false
LXD_DIR="${LXD_TWO_DIR}" lxc query "/1.0/metrics" | grep 'name="c2"'

# Stopped container is counted in lxd_instances.
LXD_DIR="${LXD_ONE_DIR}" lxc query /1.0/metrics | grep -xF 'lxd_instances{project="default",type="container"} 2'
LXD_DIR="${LXD_TWO_DIR}" lxc query /1.0/metrics | grep -xF 'lxd_instances{project="default",type="container"} 1'

# Remove previously existing warnings so they don't interfere with tests.
LXD_DIR="${LXD_ONE_DIR}" lxc warning delete --all

# Populate database with dummy warnings and check that each node only counts its own warnings.
LXD_DIR="${LXD_ONE_DIR}" lxc query --wait -X POST -d '{\"location\": \"node1\", \"type_code\": 0, \"message\": \"node1 is in a bad mood\"}' /internal/testing/warnings
LXD_DIR="${LXD_ONE_DIR}" lxc query --wait -X POST -d '{\"location\": \"node1\", \"type_code\": 1, \"message\": \"node1 is bored\"}' /internal/testing/warnings
LXD_DIR="${LXD_ONE_DIR}" lxc query --wait -X POST -d '{\"location\": \"node2\", \"type_code\": 0, \"message\": \"node2 is too cool for this\"}' /internal/testing/warnings

LXD_DIR="${LXD_ONE_DIR}" lxc query "/1.0/metrics" | grep -xF "lxd_warnings_total 2"
LXD_DIR="${LXD_TWO_DIR}" lxc query "/1.0/metrics" | grep -xF "lxd_warnings_total 1"

# Add a nodeless warning and check that the count incremented only on the leader node.
LXD_DIR="${LXD_ONE_DIR}" lxc query --wait -X POST -d '{\"type_code\": 0, \"message\": \"nodeless warning\"}' /internal/testing/warnings

LXD_DIR="${LXD_ONE_DIR}" lxc query "/1.0/metrics" | grep -xF "lxd_warnings_total 3"
LXD_DIR="${LXD_TWO_DIR}" lxc query "/1.0/metrics" | grep -xF "lxd_warnings_total 1"

# Acknowledge a warning and check that the count decremented on the node the warning belongs to.
uuid=$(LXD_DIR="${LXD_ONE_DIR}" lxc warning list --format json | jq -r '.[] | select(.last_message=="node1 is bored") | .uuid')
LXD_DIR="${LXD_ONE_DIR}" lxc warning ack "${uuid}"

LXD_DIR="${LXD_ONE_DIR}" lxc query "/1.0/metrics" | grep -xF "lxd_warnings_total 2"
LXD_DIR="${LXD_TWO_DIR}" lxc query "/1.0/metrics" | grep -xF "lxd_warnings_total 1"

LXD_DIR="${LXD_ONE_DIR}" lxc delete -f c1 stopped c2
LXD_DIR="${LXD_ONE_DIR}" lxc image delete testimage

LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
sleep 0.5
rm -f "${LXD_TWO_DIR}/unix.socket"
rm -f "${LXD_ONE_DIR}/unix.socket"

teardown_clustering_netns
teardown_clustering_bridge

kill_lxd "${LXD_ONE_DIR}"
kill_lxd "${LXD_TWO_DIR}"
}

test_clustering_address() {
local LXD_DIR
