Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify trafficmanager interface #1866

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 41 additions & 132 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/coreos/pkg/flagutil"
"github.com/flannel-io/flannel/pkg/ip"
"github.com/flannel-io/flannel/pkg/ipmatch"
"github.com/flannel-io/flannel/pkg/lease"
"github.com/flannel-io/flannel/pkg/subnet"
etcd "github.com/flannel-io/flannel/pkg/subnet/etcd"
"github.com/flannel-io/flannel/pkg/subnet/kube"
Expand Down Expand Up @@ -338,95 +337,56 @@ func main() {

//Create TrafficManager and instanciate it based on whether we use iptables or nftables
trafficMngr := newTrafficManager()
flannelIPv4Net := ip.IP4Net{}
flannelIpv6Net := ip.IP6Net{}
if config.EnableIPv4 {
flannelIPv4Net, err = config.GetFlannelNetwork(&bn.Lease().Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
}
if config.EnableIPv6 {
flannelIpv6Net, err = config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
}
// Set up ipMasq if needed
if opts.ipMasq {
if config.EnableIPv4 {
net, err := config.GetFlannelNetwork(&bn.Lease().Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
if err = recycleIPTables(trafficMngr, net, bn.Lease()); err != nil {
log.Errorf("Failed to recycle IPTables rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("Setting up masking rules")
trafficMngr.CreateIP4Chain("nat", "FLANNEL-POSTRTG")
getRules := func() []trafficmngr.IPTablesRule {
if config.HasNetworks() {
return trafficMngr.MasqRules(config.Networks, bn.Lease())
} else {
return trafficMngr.MasqRules([]ip.IP4Net{config.Network}, bn.Lease())
}
}
go trafficMngr.SetupAndEnsureIP4Tables(getRules, opts.iptablesResyncSeconds)

}
if config.EnableIPv6 {
ip6net, err := config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
if err = recycleIP6Tables(trafficMngr, ip6net, bn.Lease()); err != nil {
log.Errorf("Failed to recycle IP6Tables rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("Setting up masking ip6 rules")
trafficMngr.CreateIP6Chain("nat", "FLANNEL-POSTRTG")
getRules := func() []trafficmngr.IPTablesRule {
if config.HasIPv6Networks() {
return trafficMngr.MasqIP6Rules(config.IPv6Networks, bn.Lease())
} else {
return trafficMngr.MasqIP6Rules([]ip.IP6Net{config.IPv6Network}, bn.Lease())
}
}
go trafficMngr.SetupAndEnsureIP6Tables(getRules, opts.iptablesResyncSeconds)
prevNetworks := ReadCIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK")
prevSubnet := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")

prevIPv6Networks := ReadIP6CIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_NETWORK")
prevIPv6Subnet := ReadIP6CIDRFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_SUBNET")

err = trafficMngr.SetupAndEnsureMasqRules(flannelIPv4Net, prevSubnet,
prevNetworks,
flannelIpv6Net, prevIPv6Subnet,
prevIPv6Networks,
bn.Lease(),
opts.iptablesResyncSeconds)
if err != nil {
log.Errorf("Failed to setup masq rules, %v", err)
cancel()
wg.Wait()
os.Exit(1)
}
}

// Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts)
// In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
// In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
if opts.iptablesForwardRules {
if config.EnableIPv4 {
net, err := config.GetFlannelNetwork(&bn.Lease().Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("Changing default FORWARD chain policy to ACCEPT")
trafficMngr.CreateIP4Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return trafficMngr.ForwardRules(net.String())
}
go trafficMngr.SetupAndEnsureIP4Tables(getRules, opts.iptablesResyncSeconds)
}
if config.EnableIPv6 {
ip6net, err := config.GetFlannelIPv6Network(&bn.Lease().IPv6Subnet)
if err != nil {
log.Error(err)
cancel()
wg.Wait()
os.Exit(1)
}
log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT")
trafficMngr.CreateIP6Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return trafficMngr.ForwardRules(ip6net.String())
}
go trafficMngr.SetupAndEnsureIP6Tables(getRules, opts.iptablesResyncSeconds)
}
trafficMngr.SetupAndEnsureForwardRules(
flannelIPv4Net,
flannelIpv6Net,
opts.iptablesResyncSeconds)
}

if err := sm.HandleSubnetFile(opts.subnetFile, config, opts.ipMasq, bn.Lease().Subnet, bn.Lease().IPv6Subnet, bn.MTU()); err != nil {
Expand Down Expand Up @@ -465,57 +425,6 @@ func main() {
os.Exit(0)
}

func recycleIPTables(tm trafficmngr.TrafficManager, nw ip.IP4Net, myLease *lease.Lease) error {
prevNetworks := ReadCIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_NETWORK")
prevSubnet := ReadCIDRFromSubnetFile(opts.subnetFile, "FLANNEL_SUBNET")

//Find the cidr in FLANNEL_NETWORK which contains the podCIDR (i.e. FLANNEL_SUBNET) of this node
prevNetwork := ip.IP4Net{}
for _, net := range prevNetworks {
if net.ContainsCIDR(&prevSubnet) {
prevNetwork = net
break
}
}
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevNetwork != nw && prevSubnet != myLease.Subnet {
log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules", nw, myLease.Subnet, prevNetwork, prevSubnet)
newLease := &lease.Lease{
Subnet: prevSubnet,
}
if err := tm.DeleteIP4Tables(tm.MasqRules(prevNetworks, newLease)); err != nil {
return err
}
}
return nil
}

func recycleIP6Tables(tm trafficmngr.TrafficManager, nw ip.IP6Net, myLease *lease.Lease) error {
prevNetworks := ReadIP6CIDRsFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_NETWORK")
prevSubnet := ReadIP6CIDRFromSubnetFile(opts.subnetFile, "FLANNEL_IPV6_SUBNET")

//Find the cidr in FLANNEL_IPV6_NETWORK which contains the podCIDR (i.e. FLANNEL_IPV6_SUBNET) of this node
prevNetwork := ip.IP6Net{}
for _, net := range prevNetworks {
if net.ContainsCIDR(&prevSubnet) {
prevNetwork = net
break
}
}

// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevNetwork.String() != nw.String() && prevSubnet.String() != myLease.IPv6Subnet.String() {
log.Infof("Current ipv6 network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old ip6tables rules", nw, myLease.IPv6Subnet, prevNetwork, prevSubnet)
lease := &lease.Lease{
IPv6Subnet: prevSubnet,
}
if err := tm.DeleteIP6Tables(tm.MasqIP6Rules(prevNetworks, lease)); err != nil {
return err
}
}
return nil
}

func shutdownHandler(ctx context.Context, sigs chan os.Signal, cancel context.CancelFunc) {
// Wait for the context do be Done or for the signal to come in to shutdown.
select {
Expand Down
16 changes: 16 additions & 0 deletions pkg/subnet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,19 @@ func (c *Config) HasIPv6Networks() bool {
return false
}
}

func (c *Config) GetNetworks() []ip.IP4Net {
if len(c.Networks) > 0 {
return c.Networks
} else {
return []ip.IP4Net{c.Network}
}
}

func (c *Config) GeIPv6tNetworks() []ip.IP6Net {
if len(c.Networks) > 0 {
return c.IPv6Networks
} else {
return []ip.IP6Net{c.IPv6Network}
}
}
104 changes: 95 additions & 9 deletions pkg/trafficmngr/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,74 @@ type IPTablesManager struct{}

const kubeProxyMark string = "0x4000/0x4000"

func (iptm IPTablesManager) MasqRules(cluster_cidrs []ip.IP4Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
func (iptm IPTablesManager) SetupAndEnsureMasqRules(flannelIPv4Net, prevSubnet ip.IP4Net,
prevNetworks []ip.IP4Net,
flannelIPv6Net, prevIPv6Subnet ip.IP6Net,
prevIPv6Networks []ip.IP6Net,
currentlease *lease.Lease,
resyncPeriod int) error {
if flannelIPv4Net.String() != "" {
//Find the cidr in FLANNEL_NETWORK which contains the podCIDR (i.e. FLANNEL_SUBNET) of this node
prevNetwork := ip.IP4Net{}
for _, net := range prevNetworks {
if net.ContainsCIDR(&prevSubnet) {
prevNetwork = net
break
}
}
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevNetwork != flannelIPv4Net && prevSubnet != currentlease.Subnet {
log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules",
flannelIPv4Net, currentlease.Subnet, prevNetwork, prevSubnet)
newLease := &lease.Lease{
Subnet: prevSubnet,
}
if err := iptm.deleteIP4Tables(iptm.masqRules(prevNetworks, newLease)); err != nil {
return err
}
}

log.Infof("Setting up masking rules")
iptm.CreateIP4Chain("nat", "FLANNEL-POSTRTG")
//Note: doesn't work for multiple networks but we disabled MultiClusterCIDR anyway
getRules := func() []trafficmngr.IPTablesRule {
return iptm.masqRules([]ip.IP4Net{flannelIPv4Net}, currentlease)
}
go iptm.setupAndEnsureIP4Tables(getRules, resyncPeriod)
}
if flannelIPv6Net.String() != "" {
//Find the cidr in FLANNEL_IPV6_NETWORK which contains the podCIDR (i.e. FLANNEL_IPV6_SUBNET) of this node
prevIPv6Network := ip.IP6Net{}
for _, net := range prevIPv6Networks {
if net.ContainsCIDR(&prevIPv6Subnet) {
prevIPv6Network = net
break
}
}
// recycle iptables rules only when network configured or subnet leased is not equal to current one.
if prevIPv6Network != flannelIPv6Net && prevIPv6Subnet != currentlease.IPv6Subnet {
log.Infof("Current network or subnet (%v, %v) is not equal to previous one (%v, %v), trying to recycle old iptables rules",
flannelIPv6Net, currentlease.IPv6Subnet, prevIPv6Network, prevIPv6Subnet)
newLease := &lease.Lease{
IPv6Subnet: prevIPv6Subnet,
}
if err := iptm.deleteIP6Tables(iptm.masqIP6Rules(prevIPv6Networks, newLease)); err != nil {
return err
}
}

log.Infof("Setting up masking rules for IPv6")
iptm.CreateIP6Chain("nat", "FLANNEL-POSTRTG")
//Note: doesn't work for multiple networks but we disabled MultiClusterCIDR anyway
getRules := func() []trafficmngr.IPTablesRule {
return iptm.masqIP6Rules([]ip.IP6Net{flannelIPv6Net}, currentlease)
}
go iptm.setupAndEnsureIP6Tables(getRules, resyncPeriod)
}
return nil
}

func (iptm IPTablesManager) masqRules(cluster_cidrs []ip.IP4Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
pod_cidr := lease.Subnet.String()
ipt, err := iptables.New()
supports_random_fully := false
Expand Down Expand Up @@ -90,7 +157,7 @@ func (iptm IPTablesManager) MasqRules(cluster_cidrs []ip.IP4Net, lease *lease.Le
return rules
}

func (iptm IPTablesManager) MasqIP6Rules(cluster_cidrs []ip.IP6Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
func (iptm IPTablesManager) masqIP6Rules(cluster_cidrs []ip.IP6Net, lease *lease.Lease) []trafficmngr.IPTablesRule {
pod_cidr := lease.IPv6Subnet.String()
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
supports_random_fully := false
Expand Down Expand Up @@ -141,7 +208,26 @@ func (iptm IPTablesManager) MasqIP6Rules(cluster_cidrs []ip.IP6Net, lease *lease
return rules
}

func (iptm IPTablesManager) ForwardRules(flannelNetwork string) []trafficmngr.IPTablesRule {
func (iptm IPTablesManager) SetupAndEnsureForwardRules(flannelIPv4Network ip.IP4Net, flannelIPv6Network ip.IP6Net, resyncPeriod int) {
if flannelIPv4Network.String() != "" {
log.Infof("Changing default FORWARD chain policy to ACCEPT")
iptm.CreateIP4Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return iptm.forwardRules(flannelIPv4Network.String())
}
go iptm.setupAndEnsureIP4Tables(getRules, resyncPeriod)
}
if flannelIPv6Network.String() != "" {
log.Infof("IPv6: Changing default FORWARD chain policy to ACCEPT")
iptm.CreateIP6Chain("filter", "FLANNEL-FWD")
getRules := func() []trafficmngr.IPTablesRule {
return iptm.forwardRules(flannelIPv6Network.String())
}
go iptm.setupAndEnsureIP6Tables(getRules, resyncPeriod)
}
}

func (iptm IPTablesManager) forwardRules(flannelNetwork string) []trafficmngr.IPTablesRule {
return []trafficmngr.IPTablesRule{
// This rule ensure that the flannel iptables rules are executed before other rules on the node
{Table: "filter", Action: "-A", Chain: "FORWARD", Rulespec: []string{"-m", "comment", "--comment", "flanneld forward", "-j", "FLANNEL-FWD"}},
Expand Down Expand Up @@ -281,7 +367,7 @@ func ipTablesBootstrap(ipt IPTables, iptRestore IPTablesRestore, rules []traffic
return nil
}

func (iptm IPTablesManager) SetupAndEnsureIP4Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
func (iptm IPTablesManager) setupAndEnsureIP4Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
rules := getRules()
log.Infof("generated %d rules", len(rules))
ipt, err := iptables.New()
Expand Down Expand Up @@ -320,7 +406,7 @@ func (iptm IPTablesManager) SetupAndEnsureIP4Tables(getRules func() []trafficmng
}
}

func (iptm IPTablesManager) SetupAndEnsureIP6Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
func (iptm IPTablesManager) setupAndEnsureIP6Tables(getRules func() []trafficmngr.IPTablesRule, resyncPeriod int) {
rules := getRules()
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
Expand Down Expand Up @@ -358,8 +444,8 @@ func (iptm IPTablesManager) SetupAndEnsureIP6Tables(getRules func() []trafficmng
}
}

// DeleteIP4Tables delete specified iptables rules
func (iptm IPTablesManager) DeleteIP4Tables(rules []trafficmngr.IPTablesRule) error {
// deleteIP4Tables delete specified iptables rules
func (iptm IPTablesManager) deleteIP4Tables(rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.New()
if err != nil {
// if we can't find iptables, give up and return
Expand All @@ -380,8 +466,8 @@ func (iptm IPTablesManager) DeleteIP4Tables(rules []trafficmngr.IPTablesRule) er
return nil
}

// DeleteIP6Tables delete specified iptables rules
func (iptm IPTablesManager) DeleteIP6Tables(rules []trafficmngr.IPTablesRule) error {
// deleteIP6Tables delete specified iptables rules
func (iptm IPTablesManager) deleteIP6Tables(rules []trafficmngr.IPTablesRule) error {
ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv6)
if err != nil {
// if we can't find iptables, give up and return
Expand Down
2 changes: 1 addition & 1 deletion pkg/trafficmngr/iptables/iptables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestDeleteRules(t *testing.T) {
ipt := &MockIPTables{t: t}
iptr := &MockIPTablesRestore{t: t}
iptm := IPTablesManager{}
baseRules := iptm.MasqRules([]ip.IP4Net{{
baseRules := iptm.masqRules([]ip.IP4Net{{
IP: ip.MustParseIP4("10.0.1.0"),
PrefixLen: 16,
}}, testingLease())
Expand Down
Loading
Loading