Skip to content

Commit 7c6208b

Browse files
committed
Reconcile secondary network OVS ports after Agent restart
Signed-off-by: KMAnju-2021 <[email protected]>
1 parent 61f55bb commit 7c6208b

24 files changed

+541
-170
lines changed

cmd/antrea-agent/agent.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,19 @@ func run(o *Options) error {
647647
}
648648
}
649649

650+
// Secondary network controller should be created before CNIServer.Run() to make sure no Pod CNI updates will be missed.
651+
var secondaryNetworkController *secondarynetwork.Controller
652+
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
653+
secondaryNetworkController, err = secondarynetwork.NewController(
654+
o.config.ClientConnection, o.config.KubeAPIServerOverride,
655+
k8sClient, localPodInformer.Get(),
656+
podUpdateChannel, ifaceStore,
657+
&o.config.SecondaryNetwork, ovsdbConnection)
658+
if err != nil {
659+
return fmt.Errorf("failed to create secondary network controller: %w", err)
660+
}
661+
}
662+
650663
var traceflowController *traceflow.Controller
651664
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
652665
traceflowController = traceflow.NewTraceflowController(
@@ -761,18 +774,6 @@ func run(o *Options) error {
761774
go ipamController.Run(stopCh)
762775
}
763776

764-
var secondaryNetworkController *secondarynetwork.Controller
765-
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
766-
secondaryNetworkController, err = secondarynetwork.NewController(
767-
o.config.ClientConnection, o.config.KubeAPIServerOverride,
768-
k8sClient, localPodInformer.Get(),
769-
podUpdateChannel,
770-
&o.config.SecondaryNetwork, ovsdbConnection)
771-
if err != nil {
772-
return fmt.Errorf("failed to create secondary network controller: %w", err)
773-
}
774-
}
775-
776777
var bgpController *bgp.Controller
777778
if features.DefaultFeatureGate.Enabled(features.BGPPolicy) {
778779
bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies()

pkg/agent/secondarynetwork/init.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
componentbaseconfig "k8s.io/component-base/config"
2525
"k8s.io/klog/v2"
2626

27+
"antrea.io/antrea/pkg/agent/interfacestore"
2728
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
2829
agentconfig "antrea.io/antrea/pkg/config/agent"
2930
"antrea.io/antrea/pkg/ovs/ovsconfig"
@@ -47,6 +48,7 @@ func NewController(
4748
k8sClient clientset.Interface,
4849
podInformer cache.SharedIndexInformer,
4950
podUpdateSubscriber channel.Subscriber,
51+
primaryInterfaceStore interfacestore.InterfaceStore,
5052
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
5153
) (*Controller, error) {
5254
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
@@ -65,7 +67,7 @@ func NewController(
6567
// k8s.v1.cni.cncf.io/networks Annotation defined.
6668
podWatchController, err := podwatch.NewPodController(
6769
k8sClient, netAttachDefClient, podInformer,
68-
podUpdateSubscriber, ovsBridgeClient)
70+
podUpdateSubscriber, primaryInterfaceStore, ovsBridgeClient)
6971
if err != nil {
7072
return nil, err
7173
}

pkg/agent/secondarynetwork/podwatch/controller.go

+81
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func NewPodController(
103103
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
104104
podInformer cache.SharedIndexInformer,
105105
podUpdateSubscriber channel.Subscriber,
106+
primaryInterfaceStore interfacestore.InterfaceStore,
106107
ovsBridgeClient ovsconfig.OVSBridgeClient,
107108
) (*PodController, error) {
108109
ifaceStore := interfacestore.NewInterfaceStore()
@@ -134,6 +135,15 @@ func NewPodController(
134135
},
135136
resyncPeriod,
136137
)
138+
139+
if err := pc.initializeSecondaryInterfaceStore(); err != nil {
140+
return nil, fmt.Errorf("failed to initialize secondary interface store: %w", err)
141+
}
142+
143+
if err := pc.reconcileSecondaryInterfaces(primaryInterfaceStore); err != nil {
144+
return nil, fmt.Errorf("failed to restore CNI cache and reconcile secondary interfaces: %w", err)
145+
}
146+
137147
// podUpdateSubscriber can be nil with test code.
138148
if podUpdateSubscriber != nil {
139149
// Subscribe Pod CNI add/del events.
@@ -521,3 +531,74 @@ func checkForPodSecondaryNetworkAttachement(pod *corev1.Pod) (string, bool) {
521531
return netObj, false
522532
}
523533
}
534+
535+
// initializeSecondaryInterfaceStore restores secondary interfaceStore when agent restarts.
536+
func (pc *PodController) initializeSecondaryInterfaceStore() error {
537+
ovsPorts, err := pc.ovsBridgeClient.GetPortList()
538+
if err != nil {
539+
return fmt.Errorf("failed to list OVS ports for the secondary bridge: %w", err)
540+
}
541+
542+
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
543+
for index := range ovsPorts {
544+
port := &ovsPorts[index]
545+
ovsPort := &interfacestore.OVSPortConfig{
546+
PortUUID: port.UUID,
547+
OFPort: port.OFPort,
548+
}
549+
550+
interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
551+
if !ok {
552+
klog.InfoS("Interface type is not set for the secondary bridge", "interfaceName", port.Name)
553+
continue
554+
}
555+
556+
var intf *interfacestore.InterfaceConfig
557+
switch interfaceType {
558+
case interfacestore.AntreaContainer:
559+
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort)
560+
default:
561+
klog.InfoS("Unknown Antrea interface type for the secondary bridge", "type", interfaceType)
562+
continue
563+
}
564+
565+
ifaceList = append(ifaceList, intf)
566+
}
567+
568+
pc.interfaceStore.Initialize(ifaceList)
569+
klog.InfoS("Successfully initialized the secondary bridge interface store")
570+
571+
return nil
572+
}
573+
574+
// reconcileSecondaryInterfaces restores cniCache when agent restarts using primary interfaceStore.
575+
func (pc *PodController) reconcileSecondaryInterfaces(primaryInterfaceStore interfacestore.InterfaceStore) error {
576+
knownInterfaces := primaryInterfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
577+
for _, containerConfig := range knownInterfaces {
578+
config := containerConfig.ContainerInterfaceConfig
579+
podKey := podKeyGet(config.PodName, config.PodNamespace)
580+
pc.cniCache.Store(podKey, &podCNIInfo{
581+
containerID: config.ContainerID,
582+
})
583+
}
584+
585+
var staleInterfaces []*interfacestore.InterfaceConfig
586+
// secondaryInterfaces is the list of interfaces currently in the secondary local cache.
587+
secondaryInterfaces := pc.interfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
588+
for _, containerConfig := range secondaryInterfaces {
589+
_, exists := primaryInterfaceStore.GetContainerInterface(containerConfig.ContainerID)
590+
if !exists || containerConfig.OFPort == -1 {
591+
// Deletes ports not in the CNI cache.
592+
staleInterfaces = append(staleInterfaces, containerConfig)
593+
}
594+
}
595+
596+
// If there are any stale interfaces, pass them to removeInterfaces()
597+
if len(staleInterfaces) > 0 {
598+
if err := pc.removeInterfaces(staleInterfaces); err != nil {
599+
klog.ErrorS(err, "Failed to remove stale secondary interfaces", "staleInterfaces", staleInterfaces)
600+
}
601+
}
602+
603+
return nil
604+
}

pkg/agent/secondarynetwork/podwatch/controller_test.go

+132-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"time"
3333

3434
current "github.com/containernetworking/cni/pkg/types/100"
35+
"github.com/google/uuid"
3536
netdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
3637
netdefclientfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
3738
"github.com/stretchr/testify/assert"
@@ -43,12 +44,15 @@ import (
4344
"k8s.io/client-go/kubernetes/fake"
4445
"k8s.io/client-go/util/workqueue"
4546

47+
"antrea.io/antrea/pkg/agent/cniserver"
4648
"antrea.io/antrea/pkg/agent/cniserver/ipam"
4749
cnitypes "antrea.io/antrea/pkg/agent/cniserver/types"
4850
"antrea.io/antrea/pkg/agent/interfacestore"
4951
podwatchtesting "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch/testing"
5052
"antrea.io/antrea/pkg/agent/types"
5153
crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
54+
"antrea.io/antrea/pkg/ovs/ovsconfig"
55+
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
5256
)
5357

5458
const (
@@ -213,14 +217,17 @@ func TestPodControllerRun(t *testing.T) {
213217
ctrl := gomock.NewController(t)
214218
client := fake.NewSimpleClientset()
215219
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
220+
mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl)
221+
mockOVSBridgeClient.EXPECT().GetPortList().Return(nil, nil).AnyTimes()
222+
primaryInterfaceStore := interfacestore.NewInterfaceStore()
216223
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
217224
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
218225
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)
219226
podController, _ := NewPodController(
220227
client,
221228
netdefclient,
222229
informerFactory.Core().V1().Pods().Informer(),
223-
nil, nil)
230+
nil, primaryInterfaceStore, mockOVSBridgeClient)
224231
podController.interfaceConfigurator = interfaceConfigurator
225232
podController.ipamAllocator = mockIPAM
226233
cniCache := &podController.cniCache
@@ -968,7 +975,7 @@ func TestPodControllerAddPod(t *testing.T) {
968975

969976
t.Run("updating deviceID cache per Pod", func(t *testing.T) {
970977
ctrl := gomock.NewController(t)
971-
podController, _, _ := testPodController(ctrl)
978+
podController, _, _, _ := testPodController(ctrl)
972979
_, err := podController.assignUnusedSriovVFDeviceID(podName, testNamespace, sriovResourceName1, interfaceName)
973980
_, exists := podController.vfDeviceIDUsageMap.Load(podKey)
974981
assert.True(t, exists)
@@ -984,16 +991,18 @@ func TestPodControllerAddPod(t *testing.T) {
984991

985992
func testPodController(ctrl *gomock.Controller) (
986993
*PodController, *podwatchtesting.MockIPAMAllocator,
987-
*podwatchtesting.MockInterfaceConfigurator) {
994+
*podwatchtesting.MockInterfaceConfigurator, *ovsconfigtest.MockOVSBridgeClient) {
988995
client := fake.NewSimpleClientset()
989996
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
990997
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
991998
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
992999
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)
1000+
mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl)
9931001

9941002
// PodController without event handlers.
9951003
return &PodController{
9961004
kubeClient: client,
1005+
ovsBridgeClient: mockOVSBridgeClient,
9971006
netAttachDefClient: netdefclient,
9981007
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
9991008
workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay),
@@ -1005,18 +1014,136 @@ func testPodController(ctrl *gomock.Controller) (
10051014
interfaceConfigurator: interfaceConfigurator,
10061015
ipamAllocator: mockIPAM,
10071016
interfaceStore: interfacestore.NewInterfaceStore(),
1008-
}, mockIPAM, interfaceConfigurator
1017+
}, mockIPAM, interfaceConfigurator, mockOVSBridgeClient
10091018
}
10101019

10111020
// Create a test PodController and start informerFactory.
10121021
func testPodControllerStart(ctrl *gomock.Controller) (
10131022
*PodController, *podwatchtesting.MockIPAMAllocator,
10141023
*podwatchtesting.MockInterfaceConfigurator) {
1015-
podController, mockIPAM, interfaceConfigurator := testPodController(ctrl)
1024+
podController, mockIPAM, interfaceConfigurator, _ := testPodController(ctrl)
10161025
informerFactory := informers.NewSharedInformerFactory(podController.kubeClient, resyncPeriod)
10171026
podController.podInformer = informerFactory.Core().V1().Pods().Informer()
10181027
stopCh := make(chan struct{})
10191028
informerFactory.Start(stopCh)
10201029
informerFactory.WaitForCacheSync(stopCh)
10211030
return podController, mockIPAM, interfaceConfigurator
10221031
}
1032+
1033+
// convertExternalIDMap copies in into a freshly allocated map[string]string.
// Every value must hold a string; the single-value type assertion panics on
// anything else. A nil or empty input yields an empty, non-nil map.
func convertExternalIDMap(in map[string]any) map[string]string {
	converted := make(map[string]string, len(in))
	for key := range in {
		converted[key] = in[key].(string)
	}
	return converted
}
1040+
1041+
func createTestInterfaces() (map[string]string, []ovsconfig.OVSPortData) {
1042+
uuid1 := uuid.New().String()
1043+
uuid2 := uuid.New().String()
1044+
uuid3 := uuid.New().String()
1045+
uuid4 := uuid.New().String()
1046+
1047+
p1MAC, p1IP := "11:22:33:44:55:66", "192.168.1.10"
1048+
p2MAC, p2IP := "11:22:33:44:55:77", "192.168.1.11"
1049+
1050+
p1NetMAC, _ := net.ParseMAC(p1MAC)
1051+
p1NetIP := net.ParseIP(p1IP)
1052+
p2NetMAC, _ := net.ParseMAC(p2MAC)
1053+
p2NetIP := net.ParseIP(p2IP)
1054+
1055+
// Create InterfaceConfig objects directly
1056+
iface1 := interfacestore.NewContainerInterface("p1", uuid1, "Pod1", "nsA", "eth0", p1NetMAC, []net.IP{p1NetIP}, 100)
1057+
iface2 := interfacestore.NewContainerInterface("p2", uuid2, "Pod2", "nsA", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)
1058+
iface3 := interfacestore.NewContainerInterface("p3", uuid3, "Pod3", "nsA", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)
1059+
1060+
ovsPort1 := ovsconfig.OVSPortData{
1061+
UUID: uuid1, Name: "p1", OFPort: 11,
1062+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1063+
iface1))}
1064+
1065+
ovsPort2 := ovsconfig.OVSPortData{
1066+
UUID: uuid2, Name: "p2", OFPort: 12,
1067+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1068+
iface2))}
1069+
1070+
ovsPort3 := ovsconfig.OVSPortData{
1071+
UUID: uuid3, Name: "p3", OFPort: -1,
1072+
ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
1073+
iface3))}
1074+
1075+
ovsPort4 := ovsconfig.OVSPortData{
1076+
UUID: uuid4,
1077+
Name: "unknownIface",
1078+
OFPort: 20,
1079+
ExternalIDs: map[string]string{
1080+
"unknownKey": "unknownValue"}}
1081+
1082+
return map[string]string{"uuid1": uuid1, "uuid2": uuid2, "uuid3": uuid3, "uuid4": uuid4}, []ovsconfig.OVSPortData{ovsPort1, ovsPort2, ovsPort3, ovsPort4}
1083+
}
1084+
1085+
func TestInitializeSecondaryInterfaceStore(t *testing.T) {
1086+
ctrl := gomock.NewController(t)
1087+
pc, _, _, mockOVSBridgeClient := testPodController(ctrl)
1088+
uuids, ovsPorts := createTestInterfaces()
1089+
mockOVSBridgeClient.EXPECT().GetPortList().Return(ovsPorts, nil)
1090+
1091+
err := pc.initializeSecondaryInterfaceStore()
1092+
require.NoError(t, err, "OVS ports list successfully")
1093+
1094+
// Validate stored interfaces
1095+
require.Equal(t, 3, pc.interfaceStore.Len(), "Only valid interfaces should be stored")
1096+
1097+
_, found1 := pc.interfaceStore.GetContainerInterface(uuids["uuid1"])
1098+
assert.True(t, found1, "Interface 1 should be stored")
1099+
1100+
_, found2 := pc.interfaceStore.GetContainerInterface(uuids["uuid2"])
1101+
assert.True(t, found2, "Interface 2 should be stored")
1102+
1103+
_, found3 := pc.interfaceStore.GetContainerInterface(uuids["uuid3"])
1104+
assert.True(t, found3, "Interface 3 should be stored")
1105+
1106+
_, found4 := pc.interfaceStore.GetContainerInterface(uuids["uuid4"])
1107+
assert.False(t, found4, "Unknown interface type should not be stored")
1108+
}
1109+
1110+
func TestReconcileSecondaryInterfaces(t *testing.T) {
1111+
ctrl := gomock.NewController(t)
1112+
pc, mockIPAM, interfaceConfigurator, _ := testPodController(ctrl)
1113+
primaryStore := interfacestore.NewInterfaceStore()
1114+
1115+
_, ovsPorts := createTestInterfaces()
1116+
1117+
infaceConfig1 := cniserver.ParseOVSPortInterfaceConfig(&ovsPorts[0], &interfacestore.OVSPortConfig{PortUUID: ovsPorts[0].UUID, OFPort: ovsPorts[0].OFPort})
1118+
infaceConfig2 := cniserver.ParseOVSPortInterfaceConfig(&ovsPorts[1], &interfacestore.OVSPortConfig{PortUUID: ovsPorts[1].UUID, OFPort: ovsPorts[1].OFPort})
1119+
staleInfaceConfig3 := cniserver.ParseOVSPortInterfaceConfig(&ovsPorts[2], &interfacestore.OVSPortConfig{PortUUID: ovsPorts[2].UUID, OFPort: ovsPorts[2].OFPort})
1120+
1121+
// Add interfaces to primary store
1122+
primaryStore.AddInterface(infaceConfig1)
1123+
primaryStore.AddInterface(infaceConfig2)
1124+
1125+
// Add interfaces to controller secondaryInterfaceStore
1126+
pc.interfaceStore.AddInterface(infaceConfig1)
1127+
pc.interfaceStore.AddInterface(infaceConfig2)
1128+
1129+
// Case when OFPort == -1
1130+
pc.interfaceStore.AddInterface(staleInfaceConfig3)
1131+
1132+
interfaceConfigurator.EXPECT().DeleteVLANSecondaryInterface(gomock.Any()).Return(nil).Times(1)
1133+
mockIPAM.EXPECT().SecondaryNetworkRelease(gomock.Any()).Return(nil).Times(1)
1134+
1135+
err := pc.reconcileSecondaryInterfaces(primaryStore)
1136+
1137+
require.NoError(t, err)
1138+
1139+
// Check CNI Cache
1140+
_, foundPod1 := pc.cniCache.Load("nsA/Pod1")
1141+
assert.True(t, foundPod1, "CNI Cache should contain ns1/pod1")
1142+
1143+
_, foundPod2 := pc.cniCache.Load("nsA/Pod2")
1144+
assert.True(t, foundPod2, "CNI Cache should contain ns2/pod2")
1145+
1146+
// Ensure stale interfaces are removed
1147+
_, foundPod3 := pc.cniCache.Load("nsA/Pod3")
1148+
assert.False(t, foundPod3, "Stale interface should have been removed")
1149+
}

0 commit comments

Comments
 (0)