Skip to content

Commit

Permalink
DB query coalescing - block other requests while refreshing the cache
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Jan 17, 2025
1 parent 9b84df2 commit a2a06b1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
37 changes: 31 additions & 6 deletions balancer/catabalancer/catalyst_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

_ "github.com/lib/pq"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/log"
"github.com/patrickmn/go-cache"
)

const (
stateCacheKey = "stateCacheKey"
stateCacheKey = "stateCacheKey"
dbQueryTimeout = 10 * time.Second
)

type CataBalancer struct {
Expand All @@ -27,6 +30,7 @@ type CataBalancer struct {
ingestStreamTimeout time.Duration
nodeStatsDB *sql.DB
nodeStatsCache *cache.Cache
cacheMutex sync.Mutex
}

type stats struct {
Expand Down Expand Up @@ -136,7 +140,7 @@ func (c *CataBalancer) UpdateMembers(ctx context.Context, members []cluster.Memb
}

func (c *CataBalancer) GetBestNode(ctx context.Context, redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string, isStudioReq bool) (string, string, error) {
s, err := c.refreshNodes()
s, err := c.refreshNodes(ctx)
if err != nil {
return "", "", fmt.Errorf("error refreshing nodes: %w", err)
}
Expand Down Expand Up @@ -291,10 +295,28 @@ func truncateReturned(scoredNodes []ScoredNode, numNodes int) []ScoredNode {
return scoredNodes[:numNodes]
}

func (c *CataBalancer) refreshNodes() (stats, error) {
func (c *CataBalancer) getCachedStats() (stats, bool) {
cachedState, found := c.nodeStatsCache.Get(stateCacheKey)
if found {
return *cachedState.(*stats), nil
return *cachedState.(*stats), true
}
return stats{}, false
}

func (c *CataBalancer) refreshNodes(ctx context.Context) (stats, error) {
cachedState, found := c.getCachedStats()
if found {
return cachedState, nil
}

c.cacheMutex.Lock()
defer c.cacheMutex.Unlock()

// check cache again since multiple requests can get an initial cache miss, the first one will populate
// the cache while the requests waiting behind it (with the cacheMutex) can use the new cached data
cachedState, found = c.getCachedStats()
if found {
return cachedState, nil
}

s := stats{
Expand All @@ -307,8 +329,11 @@ func (c *CataBalancer) refreshNodes() (stats, error) {
return s, fmt.Errorf("node stats DB was nil")
}

queryContext, cancel := context.WithTimeout(ctx, dbQueryTimeout)
defer cancel()

query := "SELECT stats FROM node_stats"
rows, err := c.nodeStatsDB.Query(query)
rows, err := c.nodeStatsDB.QueryContext(queryContext, query)
if err != nil {
return s, fmt.Errorf("failed to query node stats: %w", err)
}
Expand Down Expand Up @@ -366,7 +391,7 @@ func getPlaybackID(streamID string) string {
}

func (c *CataBalancer) MistUtilLoadSource(ctx context.Context, streamID, lat, lon string) (string, error) {
s, err := c.refreshNodes()
s, err := c.refreshNodes(ctx)
if err != nil {
return "", fmt.Errorf("error refreshing nodes: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion balancer/catabalancer/catalyst_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func TestStreamTimeout(t *testing.T) {
nodeStats := NodeUpdateEvent{NodeID: "node", NodeMetrics: NodeMetrics{Timestamp: time.Now()}}
nodeStats.SetStreams([]string{"video+stream"}, []string{"video+ingest"})
setNodeMetrics(t, mock, []NodeUpdateEvent{nodeStats})
s, err := c.refreshNodes()
s, err := c.refreshNodes(context.Background())
require.NoError(t, err)
setNodeMetrics(t, mock, []NodeUpdateEvent{nodeStats})

Expand Down

0 comments on commit a2a06b1

Please sign in to comment.