Skip to content

Commit b098fcc

Browse files
committed
Reconcile secondary network OVS ports after Agent restart
Signed-off-by: KMAnju-2021 <[email protected]>
1 parent 61f55bb commit b098fcc

24 files changed

+577
-170
lines changed

cmd/antrea-agent/agent.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,19 @@ func run(o *Options) error {
647647
}
648648
}
649649

650+
// Secondary network controller should be created before CNIServer.Run() to make sure no Pod CNI updates will be missed.
651+
var secondaryNetworkController *secondarynetwork.Controller
652+
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
653+
secondaryNetworkController, err = secondarynetwork.NewController(
654+
o.config.ClientConnection, o.config.KubeAPIServerOverride,
655+
k8sClient, localPodInformer.Get(),
656+
podUpdateChannel, ifaceStore,
657+
&o.config.SecondaryNetwork, ovsdbConnection)
658+
if err != nil {
659+
return fmt.Errorf("failed to create secondary network controller: %w", err)
660+
}
661+
}
662+
650663
var traceflowController *traceflow.Controller
651664
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
652665
traceflowController = traceflow.NewTraceflowController(
@@ -761,18 +774,6 @@ func run(o *Options) error {
761774
go ipamController.Run(stopCh)
762775
}
763776

764-
var secondaryNetworkController *secondarynetwork.Controller
765-
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
766-
secondaryNetworkController, err = secondarynetwork.NewController(
767-
o.config.ClientConnection, o.config.KubeAPIServerOverride,
768-
k8sClient, localPodInformer.Get(),
769-
podUpdateChannel,
770-
&o.config.SecondaryNetwork, ovsdbConnection)
771-
if err != nil {
772-
return fmt.Errorf("failed to create secondary network controller: %w", err)
773-
}
774-
}
775-
776777
var bgpController *bgp.Controller
777778
if features.DefaultFeatureGate.Enabled(features.BGPPolicy) {
778779
bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies()

pkg/agent/secondarynetwork/init.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
componentbaseconfig "k8s.io/component-base/config"
2525
"k8s.io/klog/v2"
2626

27+
"antrea.io/antrea/pkg/agent/interfacestore"
2728
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
2829
agentconfig "antrea.io/antrea/pkg/config/agent"
2930
"antrea.io/antrea/pkg/ovs/ovsconfig"
@@ -47,6 +48,7 @@ func NewController(
4748
k8sClient clientset.Interface,
4849
podInformer cache.SharedIndexInformer,
4950
podUpdateSubscriber channel.Subscriber,
51+
primaryInterfaceStore interfacestore.InterfaceStore,
5052
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
5153
) (*Controller, error) {
5254
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
@@ -65,7 +67,7 @@ func NewController(
6567
// k8s.v1.cni.cncf.io/networks Annotation defined.
6668
podWatchController, err := podwatch.NewPodController(
6769
k8sClient, netAttachDefClient, podInformer,
68-
podUpdateSubscriber, ovsBridgeClient)
70+
podUpdateSubscriber, primaryInterfaceStore, ovsBridgeClient)
6971
if err != nil {
7072
return nil, err
7173
}

pkg/agent/secondarynetwork/podwatch/controller.go

+85
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func NewPodController(
103103
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
104104
podInformer cache.SharedIndexInformer,
105105
podUpdateSubscriber channel.Subscriber,
106+
primaryInterfaceStore interfacestore.InterfaceStore,
106107
ovsBridgeClient ovsconfig.OVSBridgeClient,
107108
) (*PodController, error) {
108109
ifaceStore := interfacestore.NewInterfaceStore()
@@ -134,6 +135,19 @@ func NewPodController(
134135
},
135136
resyncPeriod,
136137
)
138+
139+
// This is the case when secondary bridge is not configured and no VLAN interfaces at all. In this case,
140+
// we should skip both initializeSecondaryInterfaceStore and reconcileSecondaryInterfaces.
141+
if ovsBridgeClient != nil {
142+
if err := pc.initializeSecondaryInterfaceStore(); err != nil {
143+
return nil, fmt.Errorf("failed to initialize secondary interface store: %w", err)
144+
}
145+
146+
if err := pc.reconcileSecondaryInterfaces(primaryInterfaceStore); err != nil {
147+
return nil, fmt.Errorf("failed to restore CNI cache and reconcile secondary interfaces: %w", err)
148+
}
149+
}
150+
137151
// podUpdateSubscriber can be nil with test code.
138152
if podUpdateSubscriber != nil {
139153
// Subscribe Pod CNI add/del events.
@@ -521,3 +535,74 @@ func checkForPodSecondaryNetworkAttachement(pod *corev1.Pod) (string, bool) {
521535
return netObj, false
522536
}
523537
}
538+
539+
// initializeSecondaryInterfaceStore restores secondary interfaceStore when agent restarts.
540+
func (pc *PodController) initializeSecondaryInterfaceStore() error {
541+
ovsPorts, err := pc.ovsBridgeClient.GetPortList()
542+
if err != nil {
543+
return fmt.Errorf("failed to list OVS ports for the secondary bridge: %w", err)
544+
}
545+
546+
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
547+
for index := range ovsPorts {
548+
port := &ovsPorts[index]
549+
ovsPort := &interfacestore.OVSPortConfig{
550+
PortUUID: port.UUID,
551+
OFPort: port.OFPort,
552+
}
553+
554+
interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
555+
if !ok {
556+
klog.InfoS("Interface type is not set for the secondary bridge", "interfaceName", port.Name)
557+
continue
558+
}
559+
560+
var intf *interfacestore.InterfaceConfig
561+
switch interfaceType {
562+
case interfacestore.AntreaContainer:
563+
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort)
564+
default:
565+
klog.InfoS("Unknown Antrea interface type for the secondary bridge", "type", interfaceType)
566+
continue
567+
}
568+
569+
ifaceList = append(ifaceList, intf)
570+
}
571+
572+
pc.interfaceStore.Initialize(ifaceList)
573+
klog.InfoS("Successfully initialized the secondary bridge interface store")
574+
575+
return nil
576+
}
577+
578+
// reconcileSecondaryInterfaces restores cniCache when agent restarts using primary interfaceStore.
579+
func (pc *PodController) reconcileSecondaryInterfaces(primaryInterfaceStore interfacestore.InterfaceStore) error {
580+
knownInterfaces := primaryInterfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
581+
for _, containerConfig := range knownInterfaces {
582+
config := containerConfig.ContainerInterfaceConfig
583+
podKey := podKeyGet(config.PodName, config.PodNamespace)
584+
pc.cniCache.Store(podKey, &podCNIInfo{
585+
containerID: config.ContainerID,
586+
})
587+
}
588+
589+
var staleInterfaces []*interfacestore.InterfaceConfig
590+
// secondaryInterfaces is the list of interfaces currently in the secondary local cache.
591+
secondaryInterfaces := pc.interfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
592+
for _, containerConfig := range secondaryInterfaces {
593+
_, exists := primaryInterfaceStore.GetContainerInterface(containerConfig.ContainerID)
594+
if !exists || containerConfig.OFPort == -1 {
595+
// Deletes ports not in the CNI cache.
596+
staleInterfaces = append(staleInterfaces, containerConfig)
597+
}
598+
}
599+
600+
// If there are any stale interfaces, pass them to removeInterfaces()
601+
if len(staleInterfaces) > 0 {
602+
if err := pc.removeInterfaces(staleInterfaces); err != nil {
603+
klog.ErrorS(err, "Failed to remove stale secondary interfaces", "staleInterfaces", staleInterfaces)
604+
}
605+
}
606+
607+
return nil
608+
}

pkg/agent/secondarynetwork/podwatch/controller_test.go

+136-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"time"
3333

3434
current "github.com/containernetworking/cni/pkg/types/100"
35+
"github.com/google/uuid"
3536
netdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
3637
netdefclientfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
3738
"github.com/stretchr/testify/assert"
@@ -43,12 +44,15 @@ import (
4344
"k8s.io/client-go/kubernetes/fake"
4445
"k8s.io/client-go/util/workqueue"
4546

47+
"antrea.io/antrea/pkg/agent/cniserver"
4648
"antrea.io/antrea/pkg/agent/cniserver/ipam"
4749
cnitypes "antrea.io/antrea/pkg/agent/cniserver/types"
4850
"antrea.io/antrea/pkg/agent/interfacestore"
4951
podwatchtesting "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch/testing"
5052
"antrea.io/antrea/pkg/agent/types"
5153
crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
54+
"antrea.io/antrea/pkg/ovs/ovsconfig"
55+
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
5256
)
5357

5458
const (
@@ -213,14 +217,17 @@ func TestPodControllerRun(t *testing.T) {
213217
ctrl := gomock.NewController(t)
214218
client := fake.NewSimpleClientset()
215219
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
220+
mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl)
221+
mockOVSBridgeClient.EXPECT().GetPortList().Return(nil, nil).AnyTimes()
222+
primaryInterfaceStore := interfacestore.NewInterfaceStore()
216223
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
217224
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
218225
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)
219226
podController, _ := NewPodController(
220227
client,
221228
netdefclient,
222229
informerFactory.Core().V1().Pods().Informer(),
223-
nil, nil)
230+
nil, primaryInterfaceStore, mockOVSBridgeClient)
224231
podController.interfaceConfigurator = interfaceConfigurator
225232
podController.ipamAllocator = mockIPAM
226233
cniCache := &podController.cniCache
@@ -968,7 +975,7 @@ func TestPodControllerAddPod(t *testing.T) {
968975

969976
t.Run("updating deviceID cache per Pod", func(t *testing.T) {
970977
ctrl := gomock.NewController(t)
971-
podController, _, _ := testPodController(ctrl)
978+
podController, _, _, _ := testPodController(ctrl)
972979
_, err := podController.assignUnusedSriovVFDeviceID(podName, testNamespace, sriovResourceName1, interfaceName)
973980
_, exists := podController.vfDeviceIDUsageMap.Load(podKey)
974981
assert.True(t, exists)
@@ -984,16 +991,18 @@ func TestPodControllerAddPod(t *testing.T) {
984991

985992
func testPodController(ctrl *gomock.Controller) (
986993
*PodController, *podwatchtesting.MockIPAMAllocator,
987-
*podwatchtesting.MockInterfaceConfigurator) {
994+
*podwatchtesting.MockInterfaceConfigurator, *ovsconfigtest.MockOVSBridgeClient) {
988995
client := fake.NewSimpleClientset()
989996
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
990997
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
991998
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
992999
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)
1000+
mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl)
9931001

9941002
// PodController without event handlers.
9951003
return &PodController{
9961004
kubeClient: client,
1005+
ovsBridgeClient: mockOVSBridgeClient,
9971006
netAttachDefClient: netdefclient,
9981007
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
9991008
workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay),
@@ -1005,18 +1014,140 @@ func testPodController(ctrl *gomock.Controller) (
10051014
interfaceConfigurator: interfaceConfigurator,
10061015
ipamAllocator: mockIPAM,
10071016
interfaceStore: interfacestore.NewInterfaceStore(),
1008-
}, mockIPAM, interfaceConfigurator
1017+
}, mockIPAM, interfaceConfigurator, mockOVSBridgeClient
10091018
}
10101019

10111020
// Create a test PodController and start informerFactory.
10121021
func testPodControllerStart(ctrl *gomock.Controller) (
10131022
*PodController, *podwatchtesting.MockIPAMAllocator,
10141023
*podwatchtesting.MockInterfaceConfigurator) {
1015-
podController, mockIPAM, interfaceConfigurator := testPodController(ctrl)
1024+
podController, mockIPAM, interfaceConfigurator, _ := testPodController(ctrl)
10161025
informerFactory := informers.NewSharedInformerFactory(podController.kubeClient, resyncPeriod)
10171026
podController.podInformer = informerFactory.Core().V1().Pods().Informer()
10181027
stopCh := make(chan struct{})
10191028
informerFactory.Start(stopCh)
10201029
informerFactory.WaitForCacheSync(stopCh)
10211030
return podController, mockIPAM, interfaceConfigurator
10221031
}
1032+
1033+
func convertExternalIDMap(in map[string]interface{}) map[string]string {
1034+
out := make(map[string]string, len(in))
1035+
for k, v := range in {
1036+
out[k] = v.(string)
1037+
}
1038+
return out
1039+
}
1040+
1041+
func createTestInterfaces() (map[string]string, []ovsconfig.OVSPortData, []*interfacestore.InterfaceConfig) {
1042+
uuid1 := uuid.New().String()
1043+
uuid2 := uuid.New().String()
1044+
uuid3 := uuid.New().String()
1045+
uuid4 := uuid.New().String()
1046+
1047+
p1MAC, p1IP := "11:22:33:44:55:66", "192.168.1.10"
1048+
p2MAC, p2IP := "11:22:33:44:55:77", "192.168.1.11"
1049+
1050+
p1NetMAC, _ := net.ParseMAC(p1MAC)
1051+
p1NetIP := net.ParseIP(p1IP)
1052+
p2NetMAC, _ := net.ParseMAC(p2MAC)
1053+
p2NetIP := net.ParseIP(p2IP)
1054+
1055+
// Create InterfaceConfig objects directly
1056+
containerConfig1 := interfacestore.NewContainerInterface("p1", uuid1, "Pod1", "nsA", "eth0", p1NetMAC, []net.IP{p1NetIP}, 100)
1057+
containerConfig2 := interfacestore.NewContainerInterface("p2", uuid2, "Pod2", "nsA", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)
1058+
containerConfig3 := interfacestore.NewContainerInterface("p3", uuid3, "Pod3", "nsA", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)
1059+
1060+
ovsPort1 := ovsconfig.OVSPortData{
1061+
UUID: uuid1, Name: "p1", OFPort: 11,
1062+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1063+
containerConfig1))}
1064+
1065+
ovsPort2 := ovsconfig.OVSPortData{
1066+
UUID: uuid2, Name: "p2", OFPort: 12,
1067+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1068+
containerConfig2))}
1069+
1070+
ovsPort3 := ovsconfig.OVSPortData{
1071+
UUID: uuid3, Name: "p3", OFPort: -1,
1072+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1073+
containerConfig3))}
1074+
1075+
ovsPort4 := ovsconfig.OVSPortData{
1076+
UUID: uuid4,
1077+
Name: "unknownIface",
1078+
OFPort: 20,
1079+
ExternalIDs: map[string]string{
1080+
"unknownKey": "unknownValue"}}
1081+
1082+
return map[string]string{"uuid1": uuid1, "uuid2": uuid2, "uuid3": uuid3, "uuid4": uuid4}, []ovsconfig.OVSPortData{ovsPort1, ovsPort2, ovsPort3, ovsPort4}, []*interfacestore.InterfaceConfig{containerConfig1, containerConfig2, containerConfig3}
1083+
}
1084+
1085+
func TestInitializeSecondaryInterfaceStore(t *testing.T) {
1086+
ctrl := gomock.NewController(t)
1087+
pc, _, _, mockOVSBridgeClient := testPodController(ctrl)
1088+
uuids, ovsPorts, _ := createTestInterfaces()
1089+
mockOVSBridgeClient.EXPECT().GetPortList().Return(ovsPorts, nil)
1090+
1091+
err := pc.initializeSecondaryInterfaceStore()
1092+
require.NoError(t, err, "OVS ports list successfully")
1093+
1094+
// Validate stored interfaces
1095+
require.Equal(t, 3, pc.interfaceStore.Len(), "Only valid interfaces should be stored")
1096+
1097+
_, found1 := pc.interfaceStore.GetContainerInterface(uuids["uuid1"])
1098+
assert.True(t, found1, "Interface 1 should be stored")
1099+
1100+
_, found2 := pc.interfaceStore.GetContainerInterface(uuids["uuid2"])
1101+
assert.True(t, found2, "Interface 2 should be stored")
1102+
1103+
_, found3 := pc.interfaceStore.GetContainerInterface(uuids["uuid3"])
1104+
assert.True(t, found3, "Interface 3 should be stored")
1105+
1106+
_, found4 := pc.interfaceStore.GetContainerInterface(uuids["uuid4"])
1107+
assert.False(t, found4, "Unknown interface type should not be stored")
1108+
}
1109+
1110+
func TestReconcileSecondaryInterfaces(t *testing.T) {
1111+
ctrl := gomock.NewController(t)
1112+
pc, mockIPAM, interfaceConfigurator, _ := testPodController(ctrl)
1113+
primaryStore := interfacestore.NewInterfaceStore()
1114+
1115+
_, _, containerConfigs := createTestInterfaces()
1116+
1117+
// Ensure OVSPortConfig is initialized to avoid nil pointer dereference issues
1118+
// when accessing or modifying OFPort.
1119+
containerConfigs[0].OVSPortConfig = &interfacestore.OVSPortConfig{}
1120+
containerConfigs[0].OFPort = 11
1121+
containerConfigs[1].OVSPortConfig = &interfacestore.OVSPortConfig{}
1122+
containerConfigs[1].OFPort = 12
1123+
containerConfigs[2].OVSPortConfig = &interfacestore.OVSPortConfig{}
1124+
containerConfigs[2].OFPort = -1
1125+
1126+
// Add interfaces to primary store
1127+
primaryStore.AddInterface(containerConfigs[0])
1128+
primaryStore.AddInterface(containerConfigs[1])
1129+
1130+
// Add interfaces to controller secondaryInterfaceStore
1131+
pc.interfaceStore.AddInterface(containerConfigs[0])
1132+
pc.interfaceStore.AddInterface(containerConfigs[1])
1133+
// Case when OFPort == -1
1134+
pc.interfaceStore.AddInterface(containerConfigs[2])
1135+
1136+
interfaceConfigurator.EXPECT().DeleteVLANSecondaryInterface(gomock.Any()).Return(nil).Times(1)
1137+
mockIPAM.EXPECT().SecondaryNetworkRelease(gomock.Any()).Return(nil).Times(1)
1138+
1139+
err := pc.reconcileSecondaryInterfaces(primaryStore)
1140+
1141+
require.NoError(t, err)
1142+
1143+
// Check CNI Cache
1144+
_, foundPod1 := pc.cniCache.Load("nsA/Pod1")
1145+
assert.True(t, foundPod1, "CNI Cache should contain nsA/Pod1")
1146+
1147+
_, foundPod2 := pc.cniCache.Load("nsA/Pod2")
1148+
assert.True(t, foundPod2, "CNI Cache should contain nsA/Pod2")
1149+
1150+
// Ensure stale interfaces are removed
1151+
_, foundPod3 := pc.cniCache.Load("nsA/Pod3")
1152+
assert.False(t, foundPod3, "Stale interface should have been removed")
1153+
}

0 commit comments

Comments
 (0)