Skip to content

Commit

Permalink
Update pod readiness condition (#607)
Browse files Browse the repository at this point in the history
* Update pod readiness condition
* Address PR comments and add E2E test
  • Loading branch information
Doyoon Kim authored Mar 1, 2024
1 parent 982ef21 commit b73f671
Show file tree
Hide file tree
Showing 15 changed files with 616 additions and 58 deletions.
1 change: 1 addition & 0 deletions .github/workflows/e2e-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v3
- run: sed -En 's/^go[[:space:]]+([[:digit:].]+)$/GO_VERSION=\1/p' go.mod >> $GITHUB_ENV
- name: Setup Go
uses: actions/setup-go@v4
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/presubmit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: sed -En 's/^go[[:space:]]+([[:digit:].]+)$/GO_VERSION=\1/p' go.mod >> $GITHUB_ENV
- uses: actions/setup-go@v4
with:
go-version: ${{ env.GO_VERSION }}
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ func TestRouteReconciler_ReconcileCreates(t *testing.T) {
// we expect a fair number of lattice calls
mockLattice.EXPECT().ListTargetsAsList(ctx, gomock.Any()).Return(
[]*vpclattice.TargetSummary{}, nil)
mockLattice.EXPECT().ListTargetsAsList(ctx, gomock.Any()).Return(
[]*vpclattice.TargetSummary{
{
Id: aws.String("192.0.2.22"),
Port: aws.Int64(8090),
},
{
Id: aws.String("192.0.2.33"),
Port: aws.Int64(8090),
},
}, nil)
mockLattice.EXPECT().RegisterTargetsWithContext(ctx, gomock.Any()).Return(
&vpclattice.RegisterTargetsOutput{
Successful: []*vpclattice.Target{
Expand Down
98 changes: 60 additions & 38 deletions pkg/deploy/lattice/targets_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/vpclattice"

pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/aws/aws-sdk-go/aws"
)

const (
Expand All @@ -23,6 +23,7 @@ const (
//go:generate mockgen -destination targets_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice TargetsManager

type TargetsManager interface {
List(ctx context.Context, modelTg *model.TargetGroup) ([]*vpclattice.TargetSummary, error)
Update(ctx context.Context, modelTargets *model.Targets, modelTg *model.TargetGroup) error
}

Expand All @@ -41,6 +42,14 @@ func NewTargetsManager(
}
}

func (s *defaultTargetsManager) List(ctx context.Context, modelTg *model.TargetGroup) ([]*vpclattice.TargetSummary, error) {
lattice := s.cloud.Lattice()
listTargetsInput := vpclattice.ListTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
}
return lattice.ListTargetsAsList(ctx, &listTargetsInput)
}

func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.Targets, modelTg *model.TargetGroup) error {
if modelTg.Status == nil || modelTg.Status.Id == "" {
return errors.New("model target group is missing id")
Expand All @@ -50,44 +59,65 @@ func (s *defaultTargetsManager) Update(ctx context.Context, modelTargets *model.
modelTg.ID(), modelTargets.Spec.StackTargetGroupId)
}

// Only take care of pods that are ready, for backwards compatibility.
// TODO: Pod readiness support.
modelTargets.Spec.TargetList = utils.SliceFilter(modelTargets.Spec.TargetList, func(t model.Target) bool {
return t.Ready
})

s.log.Debugf("Creating targets for target group %s", modelTg.Status.Id)

lattice := s.cloud.Lattice()
listTargetsInput := vpclattice.ListTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
}
listTargetsOutput, err := lattice.ListTargetsAsList(ctx, &listTargetsInput)
latticeTargets, err := s.List(ctx, modelTg)
if err != nil {
return err
}
staleTargets := s.findStaleTargets(modelTargets, latticeTargets)

err1 := s.deregisterStaleTargets(ctx, modelTargets, modelTg, listTargetsOutput)
err2 := s.registerTargets(ctx, modelTargets, modelTg)
err1 := s.deregisterTargets(ctx, modelTg, staleTargets)
err2 := s.registerTargets(ctx, modelTg, modelTargets.Spec.TargetList)
return errors.Join(err1, err2)
}

func (s *defaultTargetsManager) findStaleTargets(
modelTargets *model.Targets,
listTargetsOutput []*vpclattice.TargetSummary) []model.Target {

// Disregard readiness information, and use IP/Port as key.
modelSet := utils.NewSet[model.Target]()
for _, target := range modelTargets.Spec.TargetList {
targetIpPort := model.Target{
TargetIP: target.TargetIP,
Port: target.Port,
}
modelSet.Put(targetIpPort)
}

staleTargets := make([]model.Target, 0)
for _, target := range listTargetsOutput {
ipPort := model.Target{
TargetIP: aws.StringValue(target.Id),
Port: aws.Int64Value(target.Port),
}
if aws.StringValue(target.Status) != vpclattice.TargetStatusDraining && !modelSet.Contains(ipPort) {
staleTargets = append(staleTargets, ipPort)
}
}
return staleTargets
}

func (s *defaultTargetsManager) registerTargets(
ctx context.Context,
modelTargets *model.Targets,
modelTg *model.TargetGroup,
targets []model.Target,
) error {
latticeTargets := utils.SliceMap(modelTargets.Spec.TargetList, func(t model.Target) *vpclattice.Target {
if len(targets) == 0 {
return nil
}
latticeTargets := utils.SliceMap(targets, func(t model.Target) *vpclattice.Target {
return &vpclattice.Target{Id: &t.TargetIP, Port: &t.Port}
})
chunks := utils.Chunks(latticeTargets, maxTargetsPerLatticeTargetsApiCall)
var registerTargetsError error
for i, targets := range chunks {
registerRouteInput := vpclattice.RegisterTargetsInput{
for i, chunk := range chunks {
registerTargetsInput := vpclattice.RegisterTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
Targets: targets,
Targets: chunk,
}
resp, err := s.cloud.Lattice().RegisterTargetsWithContext(ctx, &registerRouteInput)
resp, err := s.cloud.Lattice().RegisterTargetsWithContext(ctx, &registerTargetsInput)
if err != nil {
registerTargetsError = errors.Join(registerTargetsError, fmt.Errorf("Failed to register targets from VPC Lattice Target Group %s due to %s", modelTg.Status.Id, err))
}
Expand All @@ -101,32 +131,24 @@ func (s *defaultTargetsManager) registerTargets(
return registerTargetsError
}

func (s *defaultTargetsManager) deregisterStaleTargets(
func (s *defaultTargetsManager) deregisterTargets(
ctx context.Context,
modelTargets *model.Targets,
modelTg *model.TargetGroup,
listTargetsOutput []*vpclattice.TargetSummary,
targets []model.Target,
) error {
var targetsToDeregister []*vpclattice.Target
for _, latticeTarget := range listTargetsOutput {
isStale := true
for _, t := range modelTargets.Spec.TargetList {
if (aws.StringValue(latticeTarget.Id) == t.TargetIP) && (aws.Int64Value(latticeTarget.Port) == t.Port) {
isStale = false
break
}
}
if isStale {
targetsToDeregister = append(targetsToDeregister, &vpclattice.Target{Id: latticeTarget.Id, Port: latticeTarget.Port})
}
if len(targets) == 0 {
return nil
}
latticeTargets := utils.SliceMap(targets, func(t model.Target) *vpclattice.Target {
return &vpclattice.Target{Id: &t.TargetIP, Port: &t.Port}
})

chunks := utils.Chunks(targetsToDeregister, maxTargetsPerLatticeTargetsApiCall)
chunks := utils.Chunks(latticeTargets, maxTargetsPerLatticeTargetsApiCall)
var deregisterTargetsError error
for i, targets := range chunks {
for i, chunk := range chunks {
deregisterTargetsInput := vpclattice.DeregisterTargetsInput{
TargetGroupIdentifier: &modelTg.Status.Id,
Targets: targets,
Targets: chunk,
}
resp, err := s.cloud.Lattice().DeregisterTargetsWithContext(ctx, &deregisterTargetsInput)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/deploy/lattice/targets_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b73f671

Please sign in to comment.