Skip to content

Commit

Permalink
Automatically preserve kubelet supported version skew on worker nodes…
Browse files Browse the repository at this point in the history
…, while control plane is being updated
  • Loading branch information
rsafonseca committed Nov 4, 2024
1 parent 27daf5a commit a454adb
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 47 deletions.
2 changes: 1 addition & 1 deletion clusterapi/bootstrap/controllers/kopsconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (r *KopsConfigReconciler) buildBootstrapData(ctx context.Context) ([]byte,
}

assets := make(map[architectures.Architecture][]*assets.MirroredAsset)
configBuilder, err := nodemodel.NewNodeUpConfigBuilder(cluster, assetBuilder, assets, encryptionConfigSecretHash)
configBuilder, err := nodemodel.NewNodeUpConfigBuilder(cluster, assetBuilder, assets, assets, encryptionConfigSecretHash)
if err != nil {
return nil, err
}
Expand Down
78 changes: 66 additions & 12 deletions cmd/kops/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"k8s.io/kops/cmd/kops/util"
"k8s.io/kops/pkg/apis/kops"
apisutil "k8s.io/kops/pkg/apis/kops/util"
"k8s.io/kops/pkg/assets"
"k8s.io/kops/pkg/commands/commandutils"
"k8s.io/kops/pkg/kubeconfig"
Expand Down Expand Up @@ -65,6 +68,9 @@ type UpdateClusterOptions struct {
SSHPublicKey string
RunTasksOptions fi.RunTasksOptions
AllowKopsDowngrade bool
// Bypasses control plane version checks, which by default prevent non-control plane instancegroups
// from being updated to a version greater than the control plane
IgnoreVersionSkew bool
// GetAssets is whether this is invoked from the CmdGetAssets.
GetAssets bool

Expand Down Expand Up @@ -93,6 +99,8 @@ func (o *UpdateClusterOptions) InitDefaults() {
o.Target = "direct"
o.SSHPublicKey = ""
o.OutDir = ""
// By default we enforce the version skew between control plane and worker nodes
o.IgnoreVersionSkew = false

// By default we export a kubecfg, but it doesn't have a static/eternal credential in it any more.
o.CreateKubecfg = true
Expand Down Expand Up @@ -142,6 +150,7 @@ func NewCmdUpdateCluster(f *util.Factory, out io.Writer) *cobra.Command {
cmd.RegisterFlagCompletionFunc("lifecycle-overrides", completeLifecycleOverrides)

cmd.Flags().BoolVar(&options.Prune, "prune", options.Prune, "Delete old revisions of cloud resources that were needed during an upgrade")
cmd.Flags().BoolVar(&options.IgnoreVersionSkew, "ignore-version-skew", options.IgnoreVersionSkew, "Setting this to true will force updating the kubernetes version on all instance groups, regardles of which control plane version is running")

return cmd
}
Expand Down Expand Up @@ -290,19 +299,29 @@ func RunUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer, c *Up
return nil, err
}

minControlPlaneRunningVersion := cluster.Spec.KubernetesVersion
if !c.IgnoreVersionSkew {
minControlPlaneRunningVersion, err = checkControlPlaneRunningVersion(ctx, cluster.ObjectMeta.Name, minControlPlaneRunningVersion)
if err != nil {
klog.Warningf("error checking control plane running verion: %v", err)
} else {
klog.Warningf("successfully checked control plane running version: %v", minControlPlaneRunningVersion)
}
}
applyCmd := &cloudup.ApplyClusterCmd{
Cloud: cloud,
Clientset: clientset,
Cluster: cluster,
DryRun: isDryrun,
AllowKopsDowngrade: c.AllowKopsDowngrade,
RunTasksOptions: &c.RunTasksOptions,
OutDir: c.OutDir,
Phase: phase,
TargetName: targetName,
LifecycleOverrides: lifecycleOverrideMap,
GetAssets: c.GetAssets,
DeletionProcessing: deletionProcessing,
Cloud: cloud,
Clientset: clientset,
Cluster: cluster,
DryRun: isDryrun,
AllowKopsDowngrade: c.AllowKopsDowngrade,
RunTasksOptions: &c.RunTasksOptions,
OutDir: c.OutDir,
Phase: phase,
TargetName: targetName,
LifecycleOverrides: lifecycleOverrideMap,
GetAssets: c.GetAssets,
DeletionProcessing: deletionProcessing,
ControlPlaneRunningVersion: minControlPlaneRunningVersion,
}

applyResults, err := applyCmd.Run(ctx)
Expand Down Expand Up @@ -518,3 +537,38 @@ func completeLifecycleOverrides(cmd *cobra.Command, args []string, toComplete st
}
return completions, cobra.ShellCompDirectiveNoFileComp
}

// checkControlPlaneRunningVersion returns the lowest of version and the
// kubelet versions reported by the nodes carrying the
// "node-role.kubernetes.io/control-plane" label in the cluster.
//
// The cluster is reached through the default kubeconfig loading rules, using
// the context named clusterName. On success the returned version has any
// leading "v" removed.
//
// On failure the input version is returned unchanged together with the error,
// so callers can log the error and continue with that value as a fallback.
func checkControlPlaneRunningVersion(ctx context.Context, clusterName string, version string) (string, error) {
	configLoadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		configLoadingRules,
		&clientcmd.ConfigOverrides{CurrentContext: clusterName}).ClientConfig()
	if err != nil {
		return version, fmt.Errorf("cannot load kubecfg settings for %q: %w", clusterName, err)
	}

	k8sClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		return version, fmt.Errorf("cannot build kubernetes api client for %q: %w", clusterName, err)
	}

	minVersion, err := apisutil.ParseKubernetesVersion(version)
	if err != nil {
		// Report the version that failed to parse, not the cluster name.
		return version, fmt.Errorf("cannot parse kubernetes version %q: %w", version, err)
	}

	nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/control-plane",
	})
	if err != nil {
		return version, fmt.Errorf("cannot list nodes in cluster %q: %w", clusterName, err)
	}

	for i := range nodeList.Items {
		node := &nodeList.Items[i]
		kubeletVersion := node.Status.NodeInfo.KubeletVersion

		// Validate the reported version up front so a node with an empty or
		// malformed kubelet version is skipped instead of leaving a nil
		// version to dereference on the next iteration.
		// NOTE(review): IsKubernetesGTE appears to panic on unparsable input — confirm.
		nodeVersion, err := apisutil.ParseKubernetesVersion(kubeletVersion)
		if err != nil {
			klog.Warningf("ignoring control plane node %q with unparsable kubelet version %q: %v", node.Name, kubeletVersion, err)
			continue
		}

		// True when the current minimum is at or above this node's version,
		// i.e. this node is running the oldest kubelet seen so far.
		if apisutil.IsKubernetesGTE(kubeletVersion, *minVersion) {
			version = kubeletVersion
			minVersion = nodeVersion
		}
	}

	return strings.TrimPrefix(version, "v"), nil
}
3 changes: 3 additions & 0 deletions pkg/assets/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type AssetBuilder struct {
// KubernetesVersion is the version of kubernetes we are installing
KubernetesVersion semver.Version

// KubeletSupportedVersion is the max version of kubelet that we are currently allowed to run on worker nodes
KubeletSupportedVersion string

// StaticManifests records manifests used by nodeup:
// * e.g. sidecar manifests for static pods run by kubelet
StaticManifests []*StaticManifest
Expand Down
2 changes: 1 addition & 1 deletion pkg/commands/toolbox_enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func (b *ConfigBuilder) GetBootstrapData(ctx context.Context) (*BootstrapData, e
return nil, err
}

configBuilder, err := nodemodel.NewNodeUpConfigBuilder(cluster, assetBuilder, fileAssets.Assets, encryptionConfigSecretHash)
configBuilder, err := nodemodel.NewNodeUpConfigBuilder(cluster, assetBuilder, fileAssets.Assets, fileAssets.AssetsSupportedKubelet, encryptionConfigSecretHash)
if err != nil {
return nil, err
}
Expand Down
76 changes: 47 additions & 29 deletions pkg/nodemodel/fileassets.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,40 @@ type FileAssets struct {
// NodeUpAssets are the assets for downloading nodeup
NodeUpAssets map[architectures.Architecture]*assets.MirroredAsset

// AssetsSupportedKubelet are the file assets built for the kubelet-supported version; they are used for instance groups without an API server while the control plane has not yet been rolled out to the version in the ClusterSpec
AssetsSupportedKubelet map[architectures.Architecture][]*assets.MirroredAsset

Cluster *kops.Cluster
}

// AddFileAssets builds the file assets for the Kubernetes version in the
// cluster spec and stores them in c.Assets. When the assetBuilder carries a
// non-empty KubeletSupportedVersion that differs from the cluster spec
// version, a second set of assets is built for that version and stored in
// c.AssetsSupportedKubelet.
func (c *FileAssets) AddFileAssets(assetBuilder *assets.AssetBuilder) error {
	clusterAssets, err := c.addFileAssets(assetBuilder, c.Cluster.Spec.KubernetesVersion)
	c.Assets = clusterAssets
	if err != nil {
		return err
	}

	supportedVersion := assetBuilder.KubeletSupportedVersion
	if len(supportedVersion) == 0 || supportedVersion == c.Cluster.Spec.KubernetesVersion {
		// No version skew to preserve: a single asset set is enough.
		return nil
	}

	supportedAssets, err := c.addFileAssets(assetBuilder, supportedVersion)
	c.AssetsSupportedKubelet = supportedAssets
	return err
}

// addFileAssets builds and returns the per-architecture file assets for the given Kubernetes version (or base URL), registering them with the assetBuilder
func (c *FileAssets) addFileAssets(assetBuilder *assets.AssetBuilder, version string) (map[architectures.Architecture][]*assets.MirroredAsset, error) {
var baseURL string
if components.IsBaseURL(c.Cluster.Spec.KubernetesVersion) {
baseURL = c.Cluster.Spec.KubernetesVersion
if components.IsBaseURL(version) {
baseURL = version
} else {
baseURL = "https://dl.k8s.io/release/v" + c.Cluster.Spec.KubernetesVersion
baseURL = "https://dl.k8s.io/release/v" + version
}

c.Assets = make(map[architectures.Architecture][]*assets.MirroredAsset)
assetsMap := make(map[architectures.Architecture][]*assets.MirroredAsset)
c.NodeUpAssets = make(map[architectures.Architecture]*assets.MirroredAsset)
for _, arch := range architectures.GetSupported() {
c.Assets[arch] = []*assets.MirroredAsset{}
assetsMap[arch] = []*assets.MirroredAsset{}

k8sAssetsNames := []string{
fmt.Sprintf("/bin/linux/%s/kubelet", arch),
Expand All @@ -71,18 +89,18 @@ func (c *FileAssets) AddFileAssets(assetBuilder *assets.AssetBuilder) error {
for _, an := range k8sAssetsNames {
k, err := url.Parse(baseURL)
if err != nil {
return err
return nil, err
}
k.Path = path.Join(k.Path, an)

asset, err := assetBuilder.RemapFile(k, nil)
if err != nil {
return err
return nil, err
}
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(asset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(asset))
}

kubernetesVersion, _ := util.ParseKubernetesVersion(c.Cluster.Spec.KubernetesVersion)
kubernetesVersion, _ := util.ParseKubernetesVersion(version)

cloudProvider := c.Cluster.GetCloudProvider()
if ok := model.UseExternalKubeletCredentialProvider(*kubernetesVersion, cloudProvider); ok {
Expand All @@ -95,7 +113,7 @@ func (c *FileAssets) AddFileAssets(assetBuilder *assets.AssetBuilder) error {
// VALID FOR 60 DAYS WE REALLY NEED TO MERGE https://github.com/kubernetes/cloud-provider-gcp/pull/601 and CUT A RELEASE
k, err := url.Parse(fmt.Sprintf("%s/linux-%s/v20231005-providersv0.27.1-65-g8fbe8d27", *binaryLocation, arch))
if err != nil {
return err
return nil, err
}

// TODO: Move these hashes to assetdata
Expand All @@ -105,14 +123,14 @@ func (c *FileAssets) AddFileAssets(assetBuilder *assets.AssetBuilder) error {
}
hash, err := hashing.FromString(hashes[arch])
if err != nil {
return fmt.Errorf("unable to parse auth-provider-gcp binary asset hash %q: %v", hashes[arch], err)
return nil, fmt.Errorf("unable to parse auth-provider-gcp binary asset hash %q: %v", hashes[arch], err)
}
asset, err := assetBuilder.RemapFile(k, hash)
if err != nil {
return err
return nil, err
}

c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(asset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(asset))
case kops.CloudProviderAWS:
binaryLocation := c.Cluster.Spec.CloudProvider.AWS.BinariesLocation
if binaryLocation == nil {
Expand All @@ -121,65 +139,65 @@ func (c *FileAssets) AddFileAssets(assetBuilder *assets.AssetBuilder) error {

u, err := url.Parse(fmt.Sprintf("%s/linux/%s/ecr-credential-provider-linux-%s", *binaryLocation, arch, arch))
if err != nil {
return err
return nil, err
}
asset, err := assetBuilder.RemapFile(u, nil)
if err != nil {
return err
return nil, err
}
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(asset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(asset))
}
}

{
cniAsset, err := wellknownassets.FindCNIAssets(c.Cluster, assetBuilder, arch)
if err != nil {
return err
return nil, err
}
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(cniAsset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(cniAsset))
}

if c.Cluster.Spec.Containerd == nil || !c.Cluster.Spec.Containerd.SkipInstall {
containerdAsset, err := wellknownassets.FindContainerdAsset(c.Cluster, assetBuilder, arch)
if err != nil {
return err
return nil, err
}
if containerdAsset != nil {
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(containerdAsset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(containerdAsset))
}

runcAsset, err := wellknownassets.FindRuncAsset(c.Cluster, assetBuilder, arch)
if err != nil {
return err
return nil, err
}
if runcAsset != nil {
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(runcAsset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(runcAsset))
}
nerdctlAsset, err := wellknownassets.FindNerdctlAsset(c.Cluster, assetBuilder, arch)
if err != nil {
return err
return nil, err
}
if nerdctlAsset != nil {
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(nerdctlAsset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(nerdctlAsset))
}
}

crictlAsset, err := wellknownassets.FindCrictlAsset(c.Cluster, assetBuilder, arch)
if err != nil {
return err
return nil, err
}
if crictlAsset != nil {
c.Assets[arch] = append(c.Assets[arch], assets.BuildMirroredAsset(crictlAsset))
assetsMap[arch] = append(assetsMap[arch], assets.BuildMirroredAsset(crictlAsset))
}

asset, err := wellknownassets.NodeUpAsset(assetBuilder, arch)
if err != nil {
return err
return nil, err
}
c.NodeUpAssets[arch] = asset
}

return nil
return assetsMap, nil
}

// needsMounterAsset checks if we need the mounter program
Expand Down
14 changes: 11 additions & 3 deletions pkg/nodemodel/nodeupconfigbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type nodeUpConfigBuilder struct {
// Formats:
// raw url: http://... or https://...
// url with hash: <hex>@http://... or <hex>@https://...
assets map[architectures.Architecture][]*assets.MirroredAsset
assets map[architectures.Architecture][]*assets.MirroredAsset
assetsSupportedKubelet map[architectures.Architecture][]*assets.MirroredAsset

assetBuilder *assets.AssetBuilder
channels []string
Expand All @@ -59,7 +60,7 @@ type nodeUpConfigBuilder struct {
encryptionConfigSecretHash string
}

func NewNodeUpConfigBuilder(cluster *kops.Cluster, assetBuilder *assets.AssetBuilder, nodeAssets map[architectures.Architecture][]*assets.MirroredAsset, encryptionConfigSecretHash string) (model.NodeUpConfigBuilder, error) {
func NewNodeUpConfigBuilder(cluster *kops.Cluster, assetBuilder *assets.AssetBuilder, nodeAssets map[architectures.Architecture][]*assets.MirroredAsset, nodeAssetsSupportedKubelet map[architectures.Architecture][]*assets.MirroredAsset, encryptionConfigSecretHash string) (model.NodeUpConfigBuilder, error) {
configBase, err := vfs.Context.BuildVfsPath(cluster.Spec.ConfigStore.Base)
if err != nil {
return nil, fmt.Errorf("error parsing configStore.base %q: %v", cluster.Spec.ConfigStore.Base, err)
Expand Down Expand Up @@ -195,6 +196,7 @@ func NewNodeUpConfigBuilder(cluster *kops.Cluster, assetBuilder *assets.AssetBui
configBuilder := nodeUpConfigBuilder{
assetBuilder: assetBuilder,
assets: nodeAssets,
assetsSupportedKubelet: nodeAssetsSupportedKubelet,
channels: channels,
configBase: configBase,
cluster: cluster,
Expand Down Expand Up @@ -230,7 +232,13 @@ func (n *nodeUpConfigBuilder) BuildConfig(ig *kops.InstanceGroup, wellKnownAddre
config.Assets = make(map[architectures.Architecture][]string)
for _, arch := range architectures.GetSupported() {
config.Assets[arch] = []string{}
for _, a := range n.assets[arch] {
var assetsToUse map[architectures.Architecture][]*assets.MirroredAsset
if !hasAPIServer && len(n.assetsSupportedKubelet) > 0 {
assetsToUse = n.assetsSupportedKubelet
} else {
assetsToUse = n.assets
}
for _, a := range assetsToUse[arch] {
config.Assets[arch] = append(config.Assets[arch], a.CompactString())
}
}
Expand Down
8 changes: 7 additions & 1 deletion upup/pkg/fi/cloudup/apply_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type ApplyClusterCmd struct {

// DeletionProcessing controls whether we process deletions.
DeletionProcessing fi.DeletionProcessingMode

// The current oldest version of control plane nodes, defaults to version defined in cluster spec if IgnoreVersionSkew was set
ControlPlaneRunningVersion string
}

// ApplyResults holds information about an ApplyClusterCmd operation.
Expand Down Expand Up @@ -235,6 +238,9 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) (*ApplyResults, error) {
}

assetBuilder := assets.NewAssetBuilder(c.Clientset.VFSContext(), c.Cluster.Spec.Assets, c.Cluster.Spec.KubernetesVersion, c.GetAssets)
if len(c.ControlPlaneRunningVersion) > 0 && c.ControlPlaneRunningVersion != c.Cluster.Spec.KubernetesVersion {
assetBuilder.KubeletSupportedVersion = c.ControlPlaneRunningVersion
}
err = c.upgradeSpecs(ctx, assetBuilder)
if err != nil {
return nil, err
Expand Down Expand Up @@ -505,7 +511,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) (*ApplyResults, error) {
cloud: cloud,
}

configBuilder, err := nodemodel.NewNodeUpConfigBuilder(cluster, assetBuilder, fileAssets.Assets, encryptionConfigSecretHash)
configBuilder, err := nodemodel.NewNodeUpConfigBuilder(cluster, assetBuilder, fileAssets.Assets, fileAssets.AssetsSupportedKubelet, encryptionConfigSecretHash)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a454adb

Please sign in to comment.