Skip to content

Commit

Permalink
Add time logs
Browse files Browse the repository at this point in the history
  • Loading branch information
krhitesh7 committed Jan 17, 2025
1 parent ba2f3d3 commit d342c03
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
33 changes: 30 additions & 3 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("BatchUpsertResources took %s for %d nodes\n", elapsed, size)
if elapsed > 50*time.Millisecond {
fmt.Printf("BatchUpsertResources took %s for %d nodes\n", elapsed, size)
}
return nil
}

Expand Down Expand Up @@ -594,15 +596,25 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
for _, k := range info.orderedDeltaWatches {
wg.Add(1)
watch := info.deltaWatches[k.ID]
// One goroutine for each client request awaiting response
go func(k key, w DeltaResponseWatch) {
defer wg.Done()
ctxWithDeadline, cancel := context.WithDeadline(context.TODO(), time.Now().Add(10*time.Millisecond))
defer cancel()
start := time.Now()
// Max 10ms execution time to respond to one client request
res, err := cache.respondDelta(
ctx,
ctxWithDeadline,
snapshot,
w.Request,
w.Response,
w.StreamState,
)
elapsed := time.Since(start)
// Log if it takes more than 5ms
if elapsed > 5*time.Millisecond {
fmt.Printf("respondDelta took %s\n", elapsed)
}
if err != nil {
return
}
Expand All @@ -618,7 +630,9 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
delete(info.deltaWatches, id)
}
elapsed := time.Since(start)
fmt.Printf("respondDeltaWatches took %s for %d watches and node %s\n", elapsed, size, info.node.Id)
if elapsed > 50*time.Millisecond {
fmt.Printf("respondDeltaWatches took %s for %d watches and node %s\n", elapsed, size, info.node.Id)
}
} else {
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
Expand Down Expand Up @@ -897,11 +911,24 @@ func GetEnvoyNodeStr(node *core.Node) string {

// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) {
// Use snapshot.Mu.RLock() to ensure that the snapshot is not modified while we are reading it.
// Previously, we created copy of resources which was less efficient.
start := time.Now()
snapshot.(*Snapshot).Mu.RLock()
elapsedLock := time.Since(start)
if elapsedLock > 1*time.Millisecond {
fmt.Printf("respondDelta took %s to lock\n", elapsedLock)
}
resp := createDeltaResponse(ctx, request, state, resourceContainer{
resourceMap: snapshot.GetResourcesAndTTL(request.GetTypeUrl()),
versionMap: snapshot.GetVersionMap(request.GetTypeUrl()),
systemVersion: snapshot.GetVersion(request.GetTypeUrl()),
})
snapshot.(*Snapshot).Mu.RUnlock()
elapsed := time.Since(start)
if elapsed > 3*time.Millisecond {
fmt.Printf("createDeltaResponse took %s\n", elapsed)
}

// Only send a response if there were changes
// We want to respond immediately for the first wildcard request in a stream, even if the response is empty
Expand Down
20 changes: 2 additions & 18 deletions pkg/cache/v3/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,7 @@ func (s *Snapshot) GetResourcesAndTTL(typeURL resource.Type) map[string]VTMarsha
return nil
}

s.Mu.RLock()
defer s.Mu.RUnlock()

// create a copy of the items
items := make(map[string]VTMarshaledResource, len(s.Resources[typ].Items))
for k, v := range s.Resources[typ].Items {
items[k] = v
}
return items
return s.Resources[typ].Items
}

// GetVersion returns the version for a resource type.
Expand All @@ -163,15 +155,7 @@ func (s *Snapshot) GetVersion(typeURL resource.Type) string {

// GetVersionMap will return the internal version map of the currently applied snapshot.
func (s *Snapshot) GetVersionMap(typeURL string) map[string]string {
s.Mu.RLock()
defer s.Mu.RUnlock()

// create a copy of the version map
versionMap := make(map[string]string, len(s.VersionMap[typeURL]))
for k, v := range s.VersionMap[typeURL] {
versionMap[k] = v
}
return versionMap
return s.VersionMap[typeURL]
}

// ConstructVersionMap will construct a version map based on the current state of a snapshot
Expand Down

0 comments on commit d342c03

Please sign in to comment.