Skip to content

Commit

Permalink
Add validation that the resolver and splitter are synced with Consul (#6
Browse files Browse the repository at this point in the history
)

Signed-off-by: Ashwin Venkatesh <[email protected]>
  • Loading branch information
thisisnotashwin authored Feb 5, 2024
1 parent edd7645 commit 6c6dcc8
Show file tree
Hide file tree
Showing 2 changed files with 393 additions and 30 deletions.
89 changes: 60 additions & 29 deletions pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
rolloutsPlugin "github.com/argoproj/argo-rollouts/rollout/trafficrouting/plugin/rpc"
Expand Down Expand Up @@ -98,33 +99,41 @@ func (r *RpcPlugin) SetWeight(rollout *v1alpha1.Rollout, desiredWeight int32, _
return pluginTypes.RpcError{ErrorString: err.Error()}
}

if err := validateResolverSyncStatus(serviceResolver); err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}

// If the rollout is successful (not aborted) then modify the resolver
if rolloutAborted(rollout) {
r.LogCtx.Debug("Updating ServiceResolver for aborted rollout", "canarySubsetName", canarySubsetName, "serviceResolver", serviceResolver)
serviceResolver, err = r.updateResolverForAbortedRollout(canarySubsetName, *serviceResolver)
serviceResolver, err = r.updateResolverForAbortedRollout(canarySubsetName, serviceResolver)
if err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}
} else {
// Check if the pods have completely rolled over, and we are finished, now set the resolver to the stable version
if rolloutComplete(rollout) {
r.LogCtx.Debug("Updating ServiceResolver for completion", "stableSubsetName", stableSubsetName, "canarySubsetName", canarySubsetName, "serviceMetaVersion", serviceMetaVersion, "serviceResolver", serviceResolver)
serviceResolver, err = r.updateResolverAfterCompletion(stableSubsetName, canarySubsetName, serviceMetaVersion, suffix, *serviceResolver)
r.LogCtx.Debug("Updating ServiceResolver after completion", "stableSubsetName", stableSubsetName, "canarySubsetName", canarySubsetName, "serviceMetaVersion", serviceMetaVersion, "serviceResolver", serviceResolver)
serviceResolver, err = r.updateResolverAfterCompletion(stableSubsetName, canarySubsetName, serviceMetaVersion, suffix, serviceResolver)
if err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}
} else {
// Update the resolver so that canary subset points to the desired version
r.LogCtx.Debug("Updating ServiceResolver for rollout", "canarySubsetName", canarySubsetName, "serviceMetaVersion", serviceMetaVersion, "serviceResolver", serviceResolver)
serviceResolver, err = r.updateResolverForRollouts(canarySubsetName, serviceMetaVersion, suffix, *serviceResolver)
serviceResolver, err = r.updateResolverForInProgressRollouts(canarySubsetName, serviceMetaVersion, suffix, serviceResolver)
if err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}
}
}

serviceSplitter := consulv1aplha1.ServiceSplitter{}
if err := r.K8SClient.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: rollout.GetNamespace()}, &serviceSplitter, &client.GetOptions{}); err != nil {
serviceSplitter := &consulv1aplha1.ServiceSplitter{}
if err := r.K8SClient.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: rollout.GetNamespace()}, serviceSplitter, &client.GetOptions{}); err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}

if err := validateSplitterSyncStatus(serviceSplitter); err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}

Expand Down Expand Up @@ -156,7 +165,7 @@ func (r *RpcPlugin) SetWeight(rollout *v1alpha1.Rollout, desiredWeight int32, _

// Persist changes to the ServiceSplitter
r.LogCtx.Debug("Updating ServiceSplitter", "serviceSplitter", serviceSplitter)
if err := r.K8SClient.Update(ctx, &serviceSplitter, &client.UpdateOptions{}); err != nil {
if err := r.K8SClient.Update(ctx, serviceSplitter, &client.UpdateOptions{}); err != nil {
return pluginTypes.RpcError{ErrorString: err.Error()}
}
return pluginTypes.RpcError{}
Expand All @@ -182,43 +191,39 @@ func (r *RpcPlugin) RemoveManagedRoutes(_ *v1alpha1.Rollout) pluginTypes.RpcErro
return pluginTypes.RpcError{}
}

func (r *RpcPlugin) updateResolverAfterCompletion(stableSubsetName, canarySubsetName, serviceMetaVersion, suffix string, sr consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
func (r *RpcPlugin) updateResolverAfterCompletion(stableSubsetName, canarySubsetName, serviceMetaVersion, suffix string, sr *consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
var err error
serviceResolver, err := r.updateCanaryResolverForRollouts(canarySubsetName, "", sr)
sr, err = r.updateResolverSubsetForRollouts(canarySubsetName, "", sr)
if err != nil {
return nil, err
}

// Update the resolver so that stable subset points to the former canary version
if _, ok := serviceResolver.Spec.Subsets[stableSubsetName]; !ok {
return nil, errors.New(fmt.Sprintf("spec.subsets.%s.filter was not found in consul service resolver: %v", canarySubsetName, sr))
sr, err = r.updateResolverSubsetForRollouts(stableSubsetName, fmt.Sprintf(filterServiceMetaVersionTemplate, suffix, serviceMetaVersion), sr)
if err != nil {
return nil, err
}
stableSubset := serviceResolver.Spec.Subsets[stableSubsetName]
stableSubset.Filter = fmt.Sprintf(filterServiceMetaVersionTemplate, suffix, serviceMetaVersion)
serviceResolver.Spec.Subsets[stableSubsetName] = stableSubset

return serviceResolver, nil
return sr, nil
}

// updateCanaryResolverForRollouts sets the canary filter to the serviceMetaVersion passed in
func (r *RpcPlugin) updateResolverForRollouts(canarySubsetName, serviceMetaVersion, suffix string, sr consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
return r.updateCanaryResolverForRollouts(canarySubsetName, fmt.Sprintf(filterServiceMetaVersionTemplate, suffix, serviceMetaVersion), sr)
// updateResolverForInProgressRollouts sets the canary filter to the serviceMetaVersion passed in
func (r *RpcPlugin) updateResolverForInProgressRollouts(canarySubsetName, serviceMetaVersion, suffix string, sr *consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
return r.updateResolverSubsetForRollouts(canarySubsetName, fmt.Sprintf(filterServiceMetaVersionTemplate, suffix, serviceMetaVersion), sr)
}

// updateResolverForAbortedRollout sets the canary filter to empty if we've aborted the rollout
func (r *RpcPlugin) updateResolverForAbortedRollout(canarySubsetName string, sr consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
return r.updateCanaryResolverForRollouts(canarySubsetName, "", sr)
func (r *RpcPlugin) updateResolverForAbortedRollout(canarySubsetName string, sr *consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
return r.updateResolverSubsetForRollouts(canarySubsetName, "", sr)
}

func (r *RpcPlugin) updateCanaryResolverForRollouts(canarySubsetName, filterValue string, sr consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
if _, ok := sr.Spec.Subsets[canarySubsetName]; !ok {
return nil, errors.New(fmt.Sprintf("spec.subsets.%s.filter was not found in consul service resolver: %v", canarySubsetName, sr))
func (r *RpcPlugin) updateResolverSubsetForRollouts(subsetName, filterValue string, sr *consulv1aplha1.ServiceResolver) (*consulv1aplha1.ServiceResolver, error) {
if _, ok := sr.Spec.Subsets[subsetName]; !ok {
return nil, errors.New(fmt.Sprintf("spec.subsets.%s.filter was not found in consul service resolver: %v", subsetName, sr))
}
canarySubset := sr.Spec.Subsets[canarySubsetName]
canarySubset.Filter = filterValue
sr.Spec.Subsets[canarySubsetName] = canarySubset
subset := sr.Spec.Subsets[subsetName]
subset.Filter = filterValue
sr.Spec.Subsets[subsetName] = subset

return &sr, nil
return sr, nil
}

func rolloutComplete(rollout *v1alpha1.Rollout) bool {
Expand Down Expand Up @@ -260,3 +265,29 @@ func validateConfig(cfg ConsulTrafficRouting) error {
}
return nil
}

func validateResolverSyncStatus(resolver *consulv1aplha1.ServiceResolver) error {
for _, condition := range resolver.Status.Conditions {
if condition.Type == consulv1aplha1.ConditionSynced {
if condition.Status != corev1.ConditionTrue || overTwoSeconds(condition.LastTransitionTime.Time, resolver.LastSyncedTime.Time) {
return errors.New("service resolver has not synced with Consul. The service resolver needs to be up to date before rollout can continue")
}
}
}
return nil
}

func validateSplitterSyncStatus(splitter *consulv1aplha1.ServiceSplitter) error {
for _, condition := range splitter.Status.Conditions {
if condition.Type == consulv1aplha1.ConditionSynced {
if condition.Status != corev1.ConditionTrue || overTwoSeconds(condition.LastTransitionTime.Time, splitter.LastSyncedTime.Time) {
return errors.New("service splitter has not synced with Consul. The service splitter needs to be up to date before rollout can continue")
}
}
}
return nil
}

func overTwoSeconds(t1, t2 time.Time) bool {
return t1.Sub(t2).Abs() > 2*time.Second
}
Loading

0 comments on commit 6c6dcc8

Please sign in to comment.