Skip to content

Commit

Permalink
Return error and reject requests
Browse files Browse the repository at this point in the history
  • Loading branch information
AlessandroPatti committed Sep 1, 2023
1 parent 8a9d9d0 commit e72ba2b
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 42 deletions.
6 changes: 2 additions & 4 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,8 @@ func (c *diskCache) commit(key string, legacy bool, tempfile string, reservedSiz
random: random,
}

if !c.lru.Add(key, newItem) {
err = fmt.Errorf("INTERNAL ERROR: failed to add: %s, size %d (on disk: %d)",
key, logicalSize, sizeOnDisk)
log.Println(err.Error())
if err := c.lru.Add(key, newItem); err != nil {
log.Println(err)
return unreserve, removeTempfile, err
}

Expand Down
3 changes: 2 additions & 1 deletion cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,10 @@ func TestCacheExistingFiles(t *testing.T) {

evicted := []Key{}
origOnEvict := testCache.lru.onEvict
testCache.lru.onEvict = func(key Key, value lruItem) {
testCache.lru.onEvict = func(key Key, value lruItem) error {
evicted = append(evicted, key.(string))
origOnEvict(key, value)
return nil
}

if testCache.lru.Len() != 4 {
Expand Down
15 changes: 7 additions & 8 deletions cache/disk/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,14 +599,15 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
// The eviction callback deletes the file from disk.
// This function is only called while the lock is held
// by the current goroutine.
onEvict := func(key Key, value lruItem) {
onEvict := func(key Key, value lruItem) error {
select {
case evictionQueue <- EvictionTask{Key: key, lruItem: value}:
c.evictionGauge.Inc()
c.evictionBytesGauge.Add(float64(value.sizeOnDisk))
return nil
default:
c.evictionCounter.WithLabelValues("full").Inc()
log.Printf("Too many enqueued evictions, could not evict %s", key)
return fmt.Errorf("Too many enqueued evictions, could not evict %s", key)
}
}

Expand All @@ -615,12 +616,10 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
c.lru = NewSizedLRU(maxSizeBytes, onEvict, len(result.item))

for i := 0; i < len(result.item); i++ {
ok := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if !ok {
err = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey))
if err != nil {
return err
}
err := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if err != nil {
_ = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey))
return err
}
}

Expand Down
43 changes: 30 additions & 13 deletions cache/disk/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
type Key interface{}

// EvictCallback is the type of callbacks that are invoked when items are evicted.
type EvictCallback func(key Key, value lruItem)
type EvictCallback func(key Key, value lruItem) error

// SizedLRU is an LRU cache that will keep its total size below maxSize by evicting
// items.
Expand Down Expand Up @@ -101,25 +101,27 @@ func (c *SizedLRU) RegisterMetrics() {
// Note that this function rounds file sizes up to the nearest
// BlockSize (4096) bytes, as an estimate of actual disk usage since
// most linux filesystems default to 4kb blocks.
func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
func (c *SizedLRU) Add(key Key, value lruItem) error {

roundedUpSizeOnDisk := roundUp4k(value.sizeOnDisk)

if roundedUpSizeOnDisk > c.maxSize {
return false
return fmt.Errorf("Unable to reserve space for blob (size: %d) larger than cache size %d", roundedUpSizeOnDisk, c.maxSize)
}

var sizeDelta, uncompressedSizeDelta int64
if ee, ok := c.cache[key]; ok {
sizeDelta = roundedUpSizeOnDisk - roundUp4k(ee.Value.(*entry).value.sizeOnDisk)
if c.reservedSize+sizeDelta > c.maxSize {
return false
return fmt.Errorf("INTERNAL ERROR: not enough space for blob with size %d (undersized cache?)", value.size)
}
uncompressedSizeDelta = roundUp4k(value.size) - roundUp4k(ee.Value.(*entry).value.size)

prevValue := ee.Value.(*entry).value
if c.onEvict != nil {
c.onEvict(key, prevValue)
if err := c.onEvict(key, prevValue); err != nil {
return err
}
}

c.ll.MoveToFront(ee)
Expand All @@ -128,7 +130,7 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
} else {
sizeDelta = roundedUpSizeOnDisk
if c.reservedSize+sizeDelta > c.maxSize {
return false
return fmt.Errorf("INTERNAL ERROR: unable to reclaim enough space for blob with size %d (undersized cache?)", value.size)
}
uncompressedSizeDelta = roundUp4k(value.size)
ele := c.ll.PushFront(&entry{key, value})
Expand All @@ -140,7 +142,11 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
for c.currentSize+sizeDelta > c.maxSize {
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
err := c.removeElement(ele)
if err != nil {
delete(c.cache, key)
return err
}
}
}

Expand All @@ -150,7 +156,7 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))

return true
return nil
}

// Get looks up a key in the cache
Expand All @@ -164,12 +170,16 @@ func (c *SizedLRU) Get(key Key) (value lruItem, ok bool) {
}

// Remove removes a (key, value) from the cache
func (c *SizedLRU) Remove(key Key) {
func (c *SizedLRU) Remove(key Key) error {
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
err := c.removeElement(ele)
if err != nil {
return err
}
c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))
}
return nil
}

// Len returns the number of items in the cache
Expand Down Expand Up @@ -234,7 +244,10 @@ func (c *SizedLRU) Reserve(size int64) (bool, error) {
for sumLargerThan(size, c.currentSize, c.maxSize) {
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
err := c.removeElement(ele)
if err != nil {
return false, err
}
} else {
return false, errReservation // This should have been caught at the start.
}
Expand Down Expand Up @@ -267,7 +280,7 @@ func (c *SizedLRU) Unreserve(size int64) error {
return nil
}

func (c *SizedLRU) removeElement(e *list.Element) {
func (c *SizedLRU) removeElement(e *list.Element) error {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
Expand All @@ -276,8 +289,12 @@ func (c *SizedLRU) removeElement(e *list.Element) {
c.counterEvictedBytes.Add(float64(kv.value.sizeOnDisk))

if c.onEvict != nil {
c.onEvict(kv.key, kv.value)
err := c.onEvict(kv.key, kv.value)
if err != nil {
return err
}
}
return nil
}

// Round n up to the nearest multiple of BlockSize (4096).
Expand Down
39 changes: 23 additions & 16 deletions cache/disk/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func TestBasics(t *testing.T) {
// Add an item
aKey := "akey"
anItem := lruItem{size: 5, sizeOnDisk: 5}
ok = lru.Add(aKey, anItem)
if !ok {
t.Fatalf("Add: failed inserting item")
err := lru.Add(aKey, anItem)
if err != nil {
t.Fatalf("Add: failed inserting item: %s", err)
}

getItem, getOk := lru.Get(aKey)
Expand All @@ -53,15 +53,19 @@ func TestBasics(t *testing.T) {
checkSizeAndNumItems(t, lru, BlockSize, 1)

// Remove the item
lru.Remove(aKey)
err = lru.Remove(aKey)
if err != nil {
t.Fatal(err)
}
checkSizeAndNumItems(t, lru, 0, 0)
}

func TestEviction(t *testing.T) {
// Keep track of evictions using the callback
var evictions []int
onEvict := func(key Key, value lruItem) {
onEvict := func(key Key, value lruItem) error {
evictions = append(evictions, key.(int))
return nil
}

lru := NewSizedLRU(10*BlockSize, onEvict, 0)
Expand All @@ -85,9 +89,9 @@ func TestEviction(t *testing.T) {

for i, thisExpected := range expectedSizesNumItems {
item := lruItem{size: int64(i) * BlockSize, sizeOnDisk: int64(i) * BlockSize}
ok := lru.Add(i, item)
if !ok {
t.Fatalf("Add: failed adding %d", i)
err := lru.Add(i, item)
if err != nil {
t.Fatalf("Add: failed adding %d: %s", i, err)
}

checkSizeAndNumItems(t, lru, thisExpected.expBlocks*BlockSize, thisExpected.expNumItems)
Expand All @@ -103,8 +107,8 @@ func TestRejectBigItem(t *testing.T) {
// Bounded caches should reject big items
lru := NewSizedLRU(10, nil, 0)

ok := lru.Add("hello", lruItem{size: 11, sizeOnDisk: 11})
if ok {
err := lru.Add("hello", lruItem{size: 11, sizeOnDisk: 11})
if err == nil {
t.Fatalf("Add succeeded, expected it to fail")
}

Expand All @@ -115,7 +119,10 @@ func TestReserveZeroAlwaysPossible(t *testing.T) {
largeItem := lruItem{size: math.MaxInt64, sizeOnDisk: math.MaxInt64}

lru := NewSizedLRU(math.MaxInt64, nil, 0)
lru.Add("foo", largeItem)
err := lru.Add("foo", largeItem)
if err != nil {
t.Fatal(err)
}
ok, err := lru.Reserve(0)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -246,8 +253,8 @@ func TestAddWithSpaceReserved(t *testing.T) {
t.Fatalf("Expected to be able to reserve 1")
}

ok = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if ok {
err = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if err == nil {
t.Fatal("Expected to not be able to add item with size 2")
}

Expand All @@ -256,8 +263,8 @@ func TestAddWithSpaceReserved(t *testing.T) {
t.Fatal("Expected to be able to unreserve 1:", err)
}

ok = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if !ok {
t.Fatal("Expected to be able to add item with size 2")
err = lru.Add("hello", lruItem{size: 2, sizeOnDisk: 2})
if err != nil {
t.Fatalf("Expected to be able to add item with size 2: %s", err)
}
}

0 comments on commit e72ba2b

Please sign in to comment.