Skip to content

Commit

Permalink
Merge branch 'main' into high-availability-test
Browse files Browse the repository at this point in the history
  • Loading branch information
markmandel authored Feb 6, 2024
2 parents 4fa9b50 + 5f58fce commit 66faebd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 40 deletions.
38 changes: 20 additions & 18 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"

"agones.dev/agones/pkg/apis/agones"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/client/clientset/versioned"
Expand All @@ -37,19 +51,6 @@ import (
"agones.dev/agones/pkg/util/logfields"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
)

// Operation is a synchronisation action
Expand Down Expand Up @@ -1238,13 +1239,14 @@ func (s *SDKServer) updateList(ctx context.Context) error {

// Returns a new string list with the string keys in toDeleteValues removed from valuesList.
func deleteValues(valuesList []string, toDeleteValues map[string]bool) []string {
newList := make([]string, 0, len(valuesList)-len(toDeleteValues))
for i, val := range valuesList {
if _, ok := toDeleteValues[val]; !ok {
newList = append(newList, valuesList[i])
newValuesList := []string{}
for _, value := range valuesList {
if _, ok := toDeleteValues[value]; ok {
continue
}
newValuesList = append(newValuesList, value)
}
return newList
return newValuesList
}

// sendGameServerUpdate sends a watch game server event
Expand Down
42 changes: 20 additions & 22 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,16 +1437,8 @@ func TestSDKServerAddListValue(t *testing.T) {
require.NoError(t, err)
assert.NoError(t, sc.WaitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
err = sc.Run(ctx)
assert.NoError(t, err)
wg.Done()
}()
require.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))
sc.gsWaitForSync.Done()

// check initial value comes through
require.Eventually(t, func() bool {
Expand All @@ -1469,6 +1461,14 @@ func TestSDKServerAddListValue(t *testing.T) {
assert.Equal(t, testCase.want.Values, got.Values)
assert.Equal(t, testCase.want.Capacity, got.Capacity)

// start workerqueue processing at this point, so there is no chance of processing the above updates
// earlier.
sc.gsWaitForSync.Add(1)
go func() {
err = sc.Run(ctx)
assert.NoError(t, err)
}()

// on an update, confirm that the update hits the K8s api
if testCase.updated {
select {
Expand All @@ -1487,7 +1487,6 @@ func TestSDKServerAddListValue(t *testing.T) {
assert.Equal(t, testCase.expectedUpdatesQueueLen, glu)

cancel()
wg.Wait()
})
}
}
Expand Down Expand Up @@ -1586,16 +1585,8 @@ func TestSDKServerRemoveListValue(t *testing.T) {
require.NoError(t, err)
assert.NoError(t, sc.WaitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
err = sc.Run(ctx)
assert.NoError(t, err)
wg.Done()
}()
require.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))
sc.gsWaitForSync.Done()

// check initial value comes through
require.Eventually(t, func() bool {
Expand All @@ -1618,6 +1609,14 @@ func TestSDKServerRemoveListValue(t *testing.T) {
assert.Equal(t, testCase.want.Values, got.Values)
assert.Equal(t, testCase.want.Capacity, got.Capacity)

// start workerqueue processing at this point, so there is no chance of processing the above updates
// earlier.
sc.gsWaitForSync.Add(1)
go func() {
err = sc.Run(ctx)
assert.NoError(t, err)
}()

// on an update, confirm that the update hits the K8s api
if testCase.updated {
select {
Expand All @@ -1636,7 +1635,6 @@ func TestSDKServerRemoveListValue(t *testing.T) {
assert.Equal(t, testCase.expectedUpdatesQueueLen, glu)

cancel()
wg.Wait()
})
}
}
Expand Down

0 comments on commit 66faebd

Please sign in to comment.