Skip to content

Commit

Permalink
Refactoring ahead of tg caching - pt 1
Browse files Browse the repository at this point in the history
  • Loading branch information
erikfuller committed Sep 30, 2023
1 parent 6cccaa5 commit 939c64b
Show file tree
Hide file tree
Showing 45 changed files with 1,068 additions and 1,136 deletions.
227 changes: 108 additions & 119 deletions controllers/gateway_controller.go

Large diffs are not rendered by default.

26 changes: 8 additions & 18 deletions controllers/gatewayclass_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,36 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// GatewayClassReconciler reconciles a GatewayClass object
type GatewayClassReconciler struct {
type gatewayClassReconciler struct {
log gwlog.Logger
client client.Client
scheme *runtime.Scheme
latticeControllerEnabled bool
}

func RegisterGatewayClassController(log gwlog.Logger, mgr ctrl.Manager) error {
r := &GatewayClassReconciler{
r := &gatewayClassReconciler{
log: log,
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
latticeControllerEnabled: false,
}
return ctrl.NewControllerManagedBy(mgr).
For(&gateway_api.GatewayClass{}).
For(&gwv1beta1.GatewayClass{}).
Complete(r)
}

//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the GatewayClass object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *GatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *gatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.log.Infow("reconcile", "name", req.Name)

gwClass := &gateway_api.GatewayClass{}
gwClass := &gwv1beta1.GatewayClass{}
if err := r.client.Get(ctx, req.NamespacedName, gwClass); err != nil {
r.log.Debugw("gateway not found", "name", req.Name)
return ctrl.Result{}, nil
Expand All @@ -88,8 +78,8 @@ func (r *GatewayClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
gwClass.Status.Conditions[0].LastTransitionTime = metav1.NewTime(time.Now())
gwClass.Status.Conditions[0].ObservedGeneration = gwClass.Generation
gwClass.Status.Conditions[0].Status = "True"
gwClass.Status.Conditions[0].Message = string(gateway_api.GatewayClassReasonAccepted)
gwClass.Status.Conditions[0].Reason = string(gateway_api.GatewayClassReasonAccepted)
gwClass.Status.Conditions[0].Message = string(gwv1beta1.GatewayClassReasonAccepted)
gwClass.Status.Conditions[0].Reason = string(gwv1beta1.GatewayClassReasonAccepted)

if err := r.client.Status().Patch(ctx, gwClass, client.MergeFrom(gwClassOld)); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to update gatewayclass status")
Expand Down
10 changes: 0 additions & 10 deletions controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// podReconciler reconciles a Pod object
type podReconciler struct {
log gwlog.Logger
client client.Client
Expand All @@ -49,15 +48,6 @@ func RegisterPodController(log gwlog.Logger, mgr ctrl.Manager) error {
//+kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Pod object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *podReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
pod := &corev1.Pod{}
if err := r.client.Get(ctx, req.NamespacedName, pod); err != nil {
Expand Down
131 changes: 54 additions & 77 deletions controllers/route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"
gateway_api_v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gateway_api "sigs.k8s.io/gateway-api/apis/v1beta1"
gateway_api_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"sigs.k8s.io/external-dns/endpoint"

Expand All @@ -52,7 +51,7 @@ import (
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
latticemodel "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
)

Expand All @@ -61,8 +60,7 @@ var routeTypeToFinalizer = map[core.RouteType]string{
core.GrpcRouteType: "grpcroute.k8s.aws/resources",
}

// RouteReconciler reconciles a HTTPRoute and GRPCRoute objects
type RouteReconciler struct {
type routeReconciler struct {
routeType core.RouteType
log gwlog.Logger
client client.Client
Expand Down Expand Up @@ -103,12 +101,12 @@ func RegisterAllRouteControllers(
routeType core.RouteType
gatewayApiType client.Object
}{
{core.HttpRouteType, &gateway_api_v1beta1.HTTPRoute{}},
{core.GrpcRouteType, &gateway_api_v1alpha2.GRPCRoute{}},
{core.HttpRouteType, &gwv1beta1.HTTPRoute{}},
{core.GrpcRouteType, &gwv1alpha2.GRPCRoute{}},
}

for _, routeInfo := range routeInfos {
reconciler := RouteReconciler{
reconciler := routeReconciler{
routeType: routeInfo.routeType,
log: log,
client: mgrClient,
Expand All @@ -126,9 +124,9 @@ func RegisterAllRouteControllers(

builder := ctrl.NewControllerManagedBy(mgr).
For(routeInfo.gatewayApiType).
Watches(&source.Kind{Type: &gateway_api_v1beta1.Gateway{}}, gwEventHandler).
Watches(&source.Kind{Type: &gwv1beta1.Gateway{}}, gwEventHandler).
Watches(&source.Kind{Type: &corev1.Service{}}, svcEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &mcs_api.ServiceImport{}}, svcImportEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &mcsv1alpha1.ServiceImport{}}, svcImportEventHandler.MapToRoute(routeInfo.routeType)).
Watches(&source.Kind{Type: &corev1.Endpoints{}}, svcEventHandler.MapToRoute(routeInfo.routeType))

if ok, err := k8s.IsGVKSupported(mgr, v1alpha1.GroupVersion.String(), v1alpha1.TargetGroupPolicyKind); ok {
Expand Down Expand Up @@ -162,20 +160,11 @@ func RegisterAllRouteControllers(
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes/status;httproutes/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes/finalizers;httproutes/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Route object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *routeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return lattice_runtime.HandleReconcileError(r.reconcile(ctx, req))
}

func (r *RouteReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
func (r *routeReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
route, err := r.getRoute(ctx, req)
if err != nil {
return client.IgnoreNotFound(err)
Expand All @@ -190,34 +179,29 @@ func (r *RouteReconciler) reconcile(ctx context.Context, req ctrl.Request) error
}

if !route.DeletionTimestamp().IsZero() {
r.log.Infow("reconcile, deleting", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Deleting Reconcile")
if err := r.cleanupRouteResources(ctx, route); err != nil {
return fmt.Errorf("failed to cleanup GRPCRoute %v, %v: %w", route.Name(), route.Namespace(), err)
}
err = updateRouteListenerStatus(ctx, r.client, route)
if err != nil {
return err
}
err = r.finalizerManager.RemoveFinalizers(ctx, route.K8sObject(), routeTypeToFinalizer[r.routeType])
if err != nil {
return err
}

// TODO delete metrics
return nil
return r.reconcileDelete(ctx, req, route)
} else {
r.log.Infow("reconcile, adding or updating", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Adding/Updating Reconcile")
err := r.reconcileRouteResource(ctx, route)
// TODO add/update metrics
return r.reconcileUpsert(ctx, req, route)
}
}

func (r *routeReconciler) reconcileDelete(ctx context.Context, req ctrl.Request, route core.Route) error {
r.log.Infow("reconcile, deleting", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Deleting Reconcile")

if err := r.cleanupRouteResources(ctx, route); err != nil {
return fmt.Errorf("failed to cleanup GRPCRoute %v, %v: %w", route.Name(), route.Namespace(), err)
}

if err := updateRouteListenerStatus(ctx, r.client, route); err != nil {
return err
}

return r.finalizerManager.RemoveFinalizers(ctx, route.K8sObject(), routeTypeToFinalizer[r.routeType])
}

func (r *RouteReconciler) getRoute(ctx context.Context, req ctrl.Request) (core.Route, error) {
func (r *routeReconciler) getRoute(ctx context.Context, req ctrl.Request) (core.Route, error) {
switch r.routeType {
case core.HttpRouteType:
return core.GetHTTPRoute(ctx, r.client, req.NamespacedName)
Expand All @@ -229,7 +213,7 @@ func (r *RouteReconciler) getRoute(ctx context.Context, req ctrl.Request) (core.
}

func updateRouteListenerStatus(ctx context.Context, k8sClient client.Client, route core.Route) error {
gw := &gateway_api_v1beta1.Gateway{}
gw := &gwv1beta1.Gateway{}

gwNamespace := route.Namespace()
if route.Spec().ParentRefs()[0].Namespace != nil {
Expand All @@ -248,18 +232,18 @@ func updateRouteListenerStatus(ctx context.Context, k8sClient client.Client, rou
return UpdateGWListenerStatus(ctx, k8sClient, gw)
}

func (r *RouteReconciler) cleanupRouteResources(ctx context.Context, route core.Route) error {
func (r *routeReconciler) cleanupRouteResources(ctx context.Context, route core.Route) error {
_, _, err := r.buildAndDeployModel(ctx, route)
return err
}

func (r *RouteReconciler) isRouteRelevant(ctx context.Context, route core.Route) bool {
func (r *routeReconciler) isRouteRelevant(ctx context.Context, route core.Route) bool {
if len(route.Spec().ParentRefs()) == 0 {
r.log.Infof("Ignore Route which has no ParentRefs gateway %v ", route.Name())
return false
}

gw := &gateway_api_v1beta1.Gateway{}
gw := &gwv1beta1.Gateway{}

gwNamespace := route.Namespace()
if route.Spec().ParentRefs()[0].Namespace != nil {
Expand All @@ -277,7 +261,7 @@ func (r *RouteReconciler) isRouteRelevant(ctx context.Context, route core.Route)
}

// make sure gateway is an aws-vpc-lattice
gwClass := &gateway_api_v1beta1.GatewayClass{}
gwClass := &gwv1beta1.GatewayClass{}
gwClassName := types.NamespacedName{
Namespace: defaultNameSpace,
Name: string(gw.Spec.GatewayClassName),
Expand All @@ -297,10 +281,10 @@ func (r *RouteReconciler) isRouteRelevant(ctx context.Context, route core.Route)
return false
}

func (r *RouteReconciler) buildAndDeployModel(
func (r *routeReconciler) buildAndDeployModel(
ctx context.Context,
route core.Route,
) (core.Stack, *latticemodel.Service, error) {
) (core.Stack, *model.Service, error) {
stack, latticeService, err := r.modelBuilder.Build(ctx, route)

if err != nil {
Expand Down Expand Up @@ -332,7 +316,11 @@ func (r *RouteReconciler) buildAndDeployModel(
return stack, latticeService, err
}

func (r *RouteReconciler) reconcileRouteResource(ctx context.Context, route core.Route) error {
func (r *routeReconciler) reconcileUpsert(ctx context.Context, req ctrl.Request, route core.Route) error {
r.log.Infow("reconcile, adding or updating", "name", req.Name)
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeNormal,
k8s.RouteEventReasonReconcile, "Adding/Updating Reconcile")

if err := r.finalizerManager.AddFinalizers(ctx, route.K8sObject(), routeTypeToFinalizer[r.routeType]); err != nil {
r.eventRecorder.Event(route.K8sObject(), corev1.EventTypeWarning, k8s.RouteEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
}
Expand All @@ -345,10 +333,10 @@ func (r *RouteReconciler) reconcileRouteResource(ctx context.Context, route core
route.Status().UpdateParentRefs(route.Spec().ParentRefs()[0], config.LatticeGatewayControllerName)

route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api.RouteConditionAccepted),
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionFalse,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api.RouteReasonUnsupportedValue),
Reason: string(gwv1beta1.RouteReasonUnsupportedValue),
Message: fmt.Sprintf("Dual stack Service is not supported"),
})

Expand Down Expand Up @@ -382,7 +370,7 @@ func (r *RouteReconciler) reconcileRouteResource(ctx context.Context, route core
return nil
}

func (r *RouteReconciler) updateRouteStatus(ctx context.Context, dns string, route core.Route) error {
func (r *routeReconciler) updateRouteStatus(ctx context.Context, dns string, route core.Route) error {
r.log.Debugf("Updating route %s-%s with DNS %s", route.Name(), route.Namespace(), dns)
routeOld := route.DeepCopy()

Expand All @@ -401,25 +389,25 @@ func (r *RouteReconciler) updateRouteStatus(ctx context.Context, dns string, rou
// Update listener Status
if err := updateRouteListenerStatus(ctx, r.client, route); err != nil {
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api_v1beta1.RouteConditionAccepted),
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionFalse,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api_v1beta1.RouteReasonNoMatchingParent),
Reason: string(gwv1beta1.RouteReasonNoMatchingParent),
Message: fmt.Sprintf("Could not match gateway %s: %s", route.Spec().ParentRefs()[0].Name, err),
})
} else {
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api_v1beta1.RouteConditionAccepted),
Type: string(gwv1beta1.RouteConditionAccepted),
Status: metav1.ConditionTrue,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api_v1beta1.RouteReasonAccepted),
Reason: string(gwv1beta1.RouteReasonAccepted),
Message: fmt.Sprintf("DNS Name: %s", dns),
})
route.Status().UpdateRouteCondition(metav1.Condition{
Type: string(gateway_api_v1beta1.RouteConditionResolvedRefs),
Type: string(gwv1beta1.RouteConditionResolvedRefs),
Status: metav1.ConditionTrue,
ObservedGeneration: route.K8sObject().GetGeneration(),
Reason: string(gateway_api_v1beta1.RouteReasonResolvedRefs),
Reason: string(gwv1beta1.RouteReasonResolvedRefs),
Message: fmt.Sprintf("DNS Name: %s", dns),
})
}
Expand All @@ -432,7 +420,7 @@ func (r *RouteReconciler) updateRouteStatus(ctx context.Context, dns string, rou
return nil
}

func (r *RouteReconciler) validateBackendRefsIpFamilies(ctx context.Context, route core.Route) error {
func (r *routeReconciler) validateBackendRefsIpFamilies(ctx context.Context, route core.Route) error {
rules := route.Spec().Rules()

for _, rule := range rules {
Expand All @@ -444,19 +432,8 @@ func (r *RouteReconciler) validateBackendRefsIpFamilies(ctx context.Context, rou
continue
}

svc := &corev1.Service{}

key := types.NamespacedName{
Name: string(backendRef.Name()),
}

if backendRef.Namespace() != nil {
key.Namespace = string(*backendRef.Namespace())
} else {
key.Namespace = route.Namespace()
}

if err := r.client.Get(ctx, key, svc); err != nil {
svc, err := gateway.GetServiceForBackendRef(ctx, r.client, route, backendRef)
if err != nil {
// Ignore error since Service might not be created yet
continue
}
Expand Down
Loading

0 comments on commit 939c64b

Please sign in to comment.