Skip to content

Commit

Permalink
Enable caching of agent readiness success (#384)
Browse files Browse the repository at this point in the history
As described in #371, the agent has a readiness endpoint, which checks its dependencies' readiness. This endpoint is already implemented. However, depending on usage, the endpoint might be called very frequently, which would result in many redundant checks being performed (for example, if the endpoint is queried once per second). To address this, we are adding the ability to cache readiness success for a certain amount of time. If the endpoint is queried and succeeds, it will continue reporting success without making any readiness checks until the readiness cache TTL expires. The TTL is configurable through the .yaml config. Failures are not cached.
  • Loading branch information
Anton-Kalpakchiev authored Nov 25, 2024
1 parent 059a132 commit 1f6f59c
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 16 deletions.
17 changes: 16 additions & 1 deletion agent/agentserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/core"
Expand All @@ -39,7 +40,10 @@ import (
)

// Config defines Server configuration.
type Config struct{}
type Config struct {
// readinessCacheTTL is how long a successful readiness check stays valid.
// A zero value disables caching of readiness successes.
//
// NOTE(review): this field is unexported. Go YAML decoders such as
// gopkg.in/yaml.v2 presumably only populate exported struct fields, so the
// tag below may never take effect -- confirm, and consider exporting the
// field together with every place that reads it.
readinessCacheTTL time.Duration `yaml:"readiness_cache_ttl"`
}

// Server defines the agent HTTP server.
type Server struct {
Expand All @@ -50,6 +54,7 @@ type Server struct {
tags tagclient.Client
ac announceclient.Client
containerRuntime containerruntime.Factory
lastReady time.Time
}

// New creates a new Server.
Expand Down Expand Up @@ -208,6 +213,14 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
}

func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
if s.config.readinessCacheTTL != 0 {
rCacheValid := s.lastReady.Add(s.config.readinessCacheTTL).After(time.Now())
if rCacheValid {
io.WriteString(w, "OK")
return nil
}
}

var schedErr, buildIndexErr, trackerErr error
var wg sync.WaitGroup

Expand Down Expand Up @@ -236,6 +249,8 @@ func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) e
if len(errMsgs) != 0 {
return handler.Errorf("agent not ready: %v", strings.Join(errMsgs, "\n")).Status(http.StatusServiceUnavailable)
}

s.lastReady = time.Now()
io.WriteString(w, "OK")
return nil
}
Expand Down
133 changes: 118 additions & 15 deletions agent/agentserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ func newServerMocks(t *testing.T) (*serverMocks, func()) {
containerruntime, &cleanup}, cleanup.Run
}

func (m *serverMocks) startServer() string {
s := New(Config{}, tally.NoopScope, m.cads, m.sched, m.tags, m.ac, m.containerRuntime)
func (m *serverMocks) startServer(c Config) (*Server, string) {
s := New(c, tally.NoopScope, m.cads, m.sched, m.tags, m.ac, m.containerRuntime)
addr, stop := testutil.StartServer(s.Handler())
m.cleanup.Add(stop)
return addr
return s, addr
}

func TestGetTag(t *testing.T) {
Expand All @@ -95,8 +95,8 @@ func TestGetTag(t *testing.T) {
d := core.DigestFixture()

mocks.tags.EXPECT().Get(tag).Return(d, nil)

c := agentclient.New(mocks.startServer())
_, addr := mocks.startServer(Config{})
c := agentclient.New(addr)

result, err := c.GetTag(tag)
require.NoError(err)
Expand All @@ -113,7 +113,8 @@ func TestGetTagNotFound(t *testing.T) {

mocks.tags.EXPECT().Get(tag).Return(core.Digest{}, tagclient.ErrTagNotFound)

c := agentclient.New(mocks.startServer())
_, addr := mocks.startServer(Config{})
c := agentclient.New(addr)

_, err := c.GetTag(tag)
require.Error(err)
Expand All @@ -134,7 +135,7 @@ func TestDownload(t *testing.T) {
return store.RunDownload(mocks.cads, d, blob.Content)
})

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})
c := agentclient.New(addr)

r, err := c.Download(namespace, blob.Digest)
Expand All @@ -155,7 +156,7 @@ func TestDownloadNotFound(t *testing.T) {

mocks.sched.EXPECT().Download(namespace, blob.Digest).Return(scheduler.ErrTorrentNotFound)

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})
c := agentclient.New(addr)

_, err := c.Download(namespace, blob.Digest)
Expand All @@ -174,7 +175,7 @@ func TestDownloadUnknownError(t *testing.T) {

mocks.sched.EXPECT().Download(namespace, blob.Digest).Return(fmt.Errorf("test error"))

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})
c := agentclient.New(addr)

_, err := c.Download(namespace, blob.Digest)
Expand All @@ -199,7 +200,7 @@ func TestHealthHandler(t *testing.T) {

mocks.sched.EXPECT().Probe().Return(test.probeErr)

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})

_, err := httputil.Get(fmt.Sprintf("http://%s/health", addr))
if test.probeErr != nil {
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestReadinessCheckHandler(t *testing.T) {
mocks.tags.EXPECT().CheckReadiness().Return(tc.buildIndexErr)
mocks.ac.EXPECT().CheckReadiness().Return(tc.trackerErr)

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})
_, err := httputil.Get(fmt.Sprintf("http://%s/readiness", addr))
if tc.wantErr == "" {
require.Nil(err)
Expand All @@ -276,13 +277,115 @@ func TestReadinessCheckHandler(t *testing.T) {
}
}

// TestReadinessCheckHandlerCache issues two consecutive requests against the
// readiness endpoint for each case and verifies whether each one succeeds,
// given the configured cache TTL and the outcome of the first request.
func TestReadinessCheckHandlerCache(t *testing.T) {
	testErr := errors.New("test-err")

	// expectChecks registers one round of readiness checks on all three
	// dependencies, each of them returning err.
	expectChecks := func(m *serverMocks, err error) {
		m.sched.EXPECT().Probe().Return(err)
		m.tags.EXPECT().CheckReadiness().Return(err)
		m.ac.EXPECT().CheckReadiness().Return(err)
	}

	cases := []struct {
		desc                  string
		readinessCacheTTL     time.Duration
		waitInvalidation      bool
		wantFirstCallSuccess  bool
		wantSecondCallSuccess bool
		setupMocks            func(m *serverMocks)
	}{
		{
			desc:                  "call 1 succeeds, so second call succeeds without checks",
			readinessCacheTTL:     10 * time.Minute,
			waitInvalidation:      false,
			wantFirstCallSuccess:  true,
			wantSecondCallSuccess: true,
			setupMocks: func(m *serverMocks) {
				// Only a single round is expected: the second call is cached.
				expectChecks(m, nil)
			},
		},
		{
			desc:                  "call 1 fails, so second call performs checks",
			readinessCacheTTL:     10 * time.Minute,
			waitInvalidation:      false,
			wantFirstCallSuccess:  false,
			wantSecondCallSuccess: true,
			setupMocks: func(m *serverMocks) {
				expectChecks(m, testErr)
				expectChecks(m, nil)
			},
		},
		{
			desc:                  "call 1 succeeds, but cache becomes invalid, so second call performs checks",
			readinessCacheTTL:     10 * time.Minute,
			waitInvalidation:      true,
			wantFirstCallSuccess:  true,
			wantSecondCallSuccess: false,
			setupMocks: func(m *serverMocks) {
				expectChecks(m, nil)
				expectChecks(m, testErr)
			},
		},
		{
			desc:                  "call 1 succeeds, but caching is disabled, so second call performs checks",
			readinessCacheTTL:     0,
			waitInvalidation:      false,
			wantFirstCallSuccess:  true,
			wantSecondCallSuccess: false,
			setupMocks: func(m *serverMocks) {
				expectChecks(m, nil)
				expectChecks(m, testErr)
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			mocks, cleanup := newServerMocks(t)
			defer cleanup()

			tc.setupMocks(mocks)

			s, addr := mocks.startServer(Config{readinessCacheTTL: tc.readinessCacheTTL})
			endpoint := fmt.Sprintf("http://%s/readiness", addr)

			// callReadiness performs one request and asserts its outcome.
			callReadiness := func(wantSuccess bool) {
				_, err := httputil.Get(endpoint)
				if wantSuccess {
					require.NoError(t, err)
					return
				}
				require.Error(t, err)
			}

			callReadiness(tc.wantFirstCallSuccess)

			if tc.waitInvalidation {
				// Rather than sleeping for the TTL, move the server's lastReady
				// timestamp back by one TTL to simulate cache invalidation.
				s.lastReady = s.lastReady.Add(-tc.readinessCacheTTL)
			}

			callReadiness(tc.wantSecondCallSuccess)
		})
	}
}

func TestPatchSchedulerConfigHandler(t *testing.T) {
require := require.New(t)

mocks, cleanup := newServerMocks(t)
defer cleanup()

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})

config := scheduler.Config{
ConnTTI: time.Minute,
Expand Down Expand Up @@ -311,7 +414,7 @@ func TestGetBlacklistHandler(t *testing.T) {
}}
mocks.sched.EXPECT().BlacklistSnapshot().Return(blacklist, nil)

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})

resp, err := httputil.Get(fmt.Sprintf("http://%s/x/blacklist", addr))
require.NoError(err)
Expand All @@ -329,7 +432,7 @@ func TestDeleteBlobHandler(t *testing.T) {

d := core.DigestFixture()

addr := mocks.startServer()
_, addr := mocks.startServer(Config{})

mocks.sched.EXPECT().RemoveTorrent(d).Return(nil)

Expand Down Expand Up @@ -381,7 +484,7 @@ func TestPreloadHandler(t *testing.T) {
defer cleanup()

tt.setup(mocks)
addr := mocks.startServer()
_, addr := mocks.startServer(Config{})

_, err := httputil.Get(fmt.Sprintf("http://%s%s", addr, tt.url))
if tt.expectedError != "" {
Expand Down

0 comments on commit 1f6f59c

Please sign in to comment.