Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ICMP Rule with NodeLatencyMonitor #7011

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4078,7 +4078,7 @@ data:
# L7FlowExporter: false

# Enable NodeLatencyMonitor to monitor the latency between Nodes.
# NodeLatencyMonitor: false
NodeLatencyMonitor: true

# Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to
# remote BGP peers.
Expand Down
3 changes: 3 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func run(o *Options) error {
enableBridgingMode := enableAntreaIPAM && o.config.EnableBridgingMode
l7NetworkPolicyEnabled := features.DefaultFeatureGate.Enabled(features.L7NetworkPolicy)
nodeNetworkPolicyEnabled := features.DefaultFeatureGate.Enabled(features.NodeNetworkPolicy)
klog.Infof("DBUG: featuregates: %v", o.config.FeatureGates[string(features.NodeLatencyMonitor)])
nodeLatencyMonitorEnabled := o.config.FeatureGates[string(features.NodeLatencyMonitor)]
l7FlowExporterEnabled := features.DefaultFeatureGate.Enabled(features.L7FlowExporter)
enableMulticlusterGW := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.EnableGateway
_, multiclusterEncryptionMode := config.GetTrafficEncryptionModeFromStr(o.config.Multicluster.TrafficEncryptionMode)
Expand Down Expand Up @@ -241,6 +243,7 @@ func run(o *Options) error {
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
nodeNetworkPolicyEnabled,
nodeLatencyMonitorEnabled,
multicastEnabled,
o.config.SNATFullyRandomPorts,
*o.config.Egress.SNATFullyRandomPorts,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools v2.2.0+incompatible
k8s.io/api v0.31.1
k8s.io/apiextensions-apiserver v0.31.1
k8s.io/apimachinery v0.31.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,7 @@ gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
1 change: 1 addition & 0 deletions hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ IMAGE_NAME="antrea/codegen:kubernetes-1.31.1-build.2"
# to the "safe" list in the Git config when starting the container (required
# because of user mismatch).
if git_status=$(git status --porcelain --untracked=no 2>/dev/null) && [[ -n "${git_status}" ]]; then
echo ${git_status}
echoerr "!!! Dirty tree. Clean up and try again."
exit 1
fi
Expand Down
1 change: 1 addition & 0 deletions multicluster/hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ IMAGE_NAME="antrea/codegen:kubernetes-1.31.1-build.2"
# to the "safe" list in the Git config when starting the container (required
# because of user mismatch).
if git_status=$(git status --porcelain --untracked=no 2>/dev/null) && [[ -n "${git_status}" ]]; then
echo ${git_status}
echoerr "!!! Dirty tree. Clean up and try again."
exit 1
fi
Expand Down
28 changes: 28 additions & 0 deletions pkg/agent/apis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package apis

import (
"net"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/util/printers"
Expand Down Expand Up @@ -72,6 +75,31 @@ func (r AntreaAgentInfoResponse) SortRows() bool {
return true
}

type FqdnCacheResponse struct {
fqdnName string
ipAddress net.IP
expirationTime time.Time
}

func (r FqdnCacheResponse) GetTableHeader() []string {
return []string{"FQDN", "ADDRESS", "EXPIRATION TIME"}
}

func (r FqdnCacheResponse) GetTableRow(maxColumn int) []string {
klog.InfoS("DBUG: types.go GetTableRow() called")
return []string{
r.fqdnName,
r.ipAddress.String(),
r.expirationTime.String(),
}
}

func (r FqdnCacheResponse) SortRows() bool {
return false
}

// maybe some helper funcs needed here to help parse through ^^^^

type FeatureGateResponse struct {
Component string `json:"component,omitempty"`
Name string `json:"name,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"antrea.io/antrea/pkg/agent/apiserver/handlers/bgppolicy"
"antrea.io/antrea/pkg/agent/apiserver/handlers/bgproute"
"antrea.io/antrea/pkg/agent/apiserver/handlers/featuregates"
"antrea.io/antrea/pkg/agent/apiserver/handlers/fqdncache"
"antrea.io/antrea/pkg/agent/apiserver/handlers/memberlist"
"antrea.io/antrea/pkg/agent/apiserver/handlers/multicast"
"antrea.io/antrea/pkg/agent/apiserver/handlers/networkpolicy"
Expand Down Expand Up @@ -104,6 +105,7 @@ func installHandlers(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolic
s.Handler.NonGoRestfulMux.HandleFunc("/bgppolicy", bgppolicy.HandleFunc(bgpq))
s.Handler.NonGoRestfulMux.HandleFunc("/bgppeers", bgppeer.HandleFunc(bgpq))
s.Handler.NonGoRestfulMux.HandleFunc("/bgproutes", bgproute.HandleFunc(bgpq))
s.Handler.NonGoRestfulMux.HandleFunc("/fqdncache", fqdncache.HandleFunc(aq))
}

func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, v4Enabled, v6Enabled bool) error {
Expand Down
37 changes: 37 additions & 0 deletions pkg/agent/apiserver/handlers/fqdncache/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2025 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fqdncache

import (
"encoding/json"
"net/http"

"k8s.io/klog/v2"

agentquerier "antrea.io/antrea/pkg/agent/querier"
)

func HandleFunc(aq agentquerier.AgentQuerier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
dnsEntryCache := aq.GetFqdnCache()
if dnsEntryCache == nil {
return
}
if err := json.NewEncoder(w).Encode(dnsEntryCache); err != nil {
http.Error(w, "Failed to encode response: "+err.Error(), http.StatusInternalServerError)
klog.ErrorS(err, "Failed to encode response")
}
}
}
82 changes: 82 additions & 0 deletions pkg/agent/apiserver/handlers/fqdncache/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2025 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fqdncache

import (
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

queriertest "antrea.io/antrea/pkg/agent/querier/testing"
"antrea.io/antrea/pkg/agent/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestFqdnCacheQuery(t *testing.T) {
tests := []struct {
name string
expectedStatus int
expectedResponse []types.DnsCacheEntry
}{
{
name: "FQDN cache exists",
expectedStatus: http.StatusOK,
expectedResponse: []types.DnsCacheEntry{
{
FqdnName: "google.com",
IpAddress: net.ParseIP("10.0.0.1"),
ExpirationTime: time.Date(2025, 12, 25, 15, 0, 0, 0, time.UTC),
},
},
},
{
name: "FQDN cache does not exist",
expectedStatus: http.StatusOK,
expectedResponse: []types.DnsCacheEntry{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
q := queriertest.NewMockAgentQuerier(ctrl)
q.EXPECT().GetFqdnCache().Return(tt.expectedResponse)
handler := HandleFunc(q)
req, err := http.NewRequest(http.MethodGet, "", nil)
require.NoError(t, err)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
assert.Equal(t, tt.expectedStatus, recorder.Code)
if tt.expectedStatus == http.StatusOK {
var received []map[string]interface{}
err = json.Unmarshal(recorder.Body.Bytes(), &received)
require.NoError(t, err)
if len(received) > 0 {
parsedTime, err := time.Parse(time.RFC3339, received[0]["expirationTime"].(string))
require.NoError(t, err)
assert.Equal(t, tt.expectedResponse, []types.DnsCacheEntry{{
FqdnName: received[0]["fqdnName"].(string),
IpAddress: net.ParseIP(received[0]["ipAddress"].(string)),
ExpirationTime: parsedTime,
}})
}
}
})
}
}
12 changes: 12 additions & 0 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,18 @@ func NewNetworkPolicyController(antreaClientGetter client.AntreaClientProvider,
return c, nil
}

// TODO add change here for how data is interpreted and sent
func (c *Controller) GetFqdnCache() []types.DnsCacheEntry {
cacheEntryList := []types.DnsCacheEntry{}
for fqdn, dnsMeta := range c.fqdnController.dnsEntryCache {
for _, ipWithExpiration := range dnsMeta.responseIPs {
entry := types.DnsCacheEntry{FqdnName: fqdn, IpAddress: ipWithExpiration.ip, ExpirationTime: ipWithExpiration.expirationTime}
cacheEntryList = append(cacheEntryList, entry)
}
}
return cacheEntryList
}

func (c *Controller) GetNetworkPolicyNum() int {
return c.ruleCache.GetNetworkPolicyNum()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) {

// onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor.
func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) {
// TODO maybe add the change here
nlm := obj.(*v1alpha1.NodeLatencyMonitor)
klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm))

Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/proxy"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
Expand All @@ -47,6 +48,7 @@ type AgentQuerier interface {
GetMemberlistCluster() memberlist.Interface
GetNodeLister() corelisters.NodeLister
GetBGPPolicyInfoQuerier() querier.AgentBGPPolicyInfoQuerier
GetFqdnCache() []types.DnsCacheEntry
}

type agentQuerier struct {
Expand Down Expand Up @@ -97,6 +99,12 @@ func NewAgentQuerier(
}
}

// GetFQDNCache returns dnsEntryCache within fqdnController
func (aq agentQuerier) GetFqdnCache() []types.DnsCacheEntry {
klog.InfoS("DBUG: fqdn agent querier GetFqdnCache() called")
return aq.networkPolicyInfoQuerier.GetFqdnCache()
}

// GetNodeLister returns NodeLister.
func (aq agentQuerier) GetNodeLister() corelisters.NodeLister {
return aq.nodeLister
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/querier/testing/mock_querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading