Skip to content

Commit

Permalink
Merge pull request #1 from mailgun/add-ttl
Browse files Browse the repository at this point in the history
PIP-405: Sinks can now accept an expire time
  • Loading branch information
thrawn01 authored Apr 10, 2019
2 parents 5b532d6 + e72846e commit b9520c6
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 64 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*~
.idea/
7 changes: 7 additions & 0 deletions byteview.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"io"
"strings"
"time"
)

// A ByteView holds an immutable view of bytes.
Expand All @@ -33,6 +34,12 @@ type ByteView struct {
// If b is non-nil, b is used, else s is used.
b []byte
s string
e time.Time
}

// Returns the expire time associated with this view
func (v ByteView) Expire() time.Time {
return v.e
}

// Len returns the view's length.
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/golang/groupcache

require github.com/golang/protobuf v1.3.1
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
14 changes: 12 additions & 2 deletions groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"

pb "github.com/golang/groupcache/groupcachepb"
"github.com/golang/groupcache/lru"
Expand Down Expand Up @@ -310,7 +311,16 @@ func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}

var expire time.Time
if res.Expire != nil && *res.Expire != 0 {
expire = time.Unix(*res.Expire/int64(time.Second), *res.Expire%int64(time.Second))
if time.Now().After(expire) {
return ByteView{}, errors.New("peer returned expired value")
}
}

value := ByteView{b: res.Value, e: expire}
// TODO(bradfitz): use res.MinuteQps or something smart to
// conditionally populate hotCache. For now just do it some
// percentage of the time.
Expand Down Expand Up @@ -418,7 +428,7 @@ func (c *cache) add(key string, value ByteView) {
},
}
}
c.lru.Add(key, value)
c.lru.Add(key, value, value.Expire())
c.nbytes += int64(len(key)) + int64(value.Len())
}

Expand Down
38 changes: 31 additions & 7 deletions groupcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
)

var (
once sync.Once
stringGroup, protoGroup Getter
once sync.Once
stringGroup, protoGroup, expireGroup Getter

stringc = make(chan string)

Expand All @@ -52,6 +52,7 @@ var (
const (
stringGroupName = "string-group"
protoGroupName = "proto-group"
expireGroupName = "expire-group"
testMessageType = "google3/net/groupcache/go/test_proto.TestMessage"
fromChan = "from-chan"
cacheSize = 1 << 20
Expand All @@ -63,7 +64,7 @@ func testSetup() {
key = <-stringc
}
cacheFills.Add(1)
return dest.SetString("ECHO:" + key)
return dest.SetString("ECHO:"+key, time.Time{})
}))

protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
Expand All @@ -74,7 +75,12 @@ func testSetup() {
return dest.SetProto(&testpb.TestMessage{
Name: proto.String("ECHO:" + key),
City: proto.String("SOME-CITY"),
})
}, time.Time{})
}))

expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
cacheFills.Add(1)
return dest.SetString("ECHO:"+key, time.Now().Add(time.Millisecond*100))
}))
}

Expand Down Expand Up @@ -185,6 +191,24 @@ func TestCaching(t *testing.T) {
}
}

func TestCachingExpire(t *testing.T) {
once.Do(testSetup)
fills := countFills(func() {
for i := 0; i < 3; i++ {
var s string
if err := expireGroup.Get(dummyCtx, "TestCachingExpire-key", StringSink(&s)); err != nil {
t.Fatal(err)
}
if i == 1 {
time.Sleep(time.Millisecond * 150)
}
}
})
if fills != 2 {
t.Errorf("expected 2 cache fill; got %d", fills)
}
}

func TestCacheEviction(t *testing.T) {
once.Do(testSetup)
testKey := "TestCacheEviction-key"
Expand Down Expand Up @@ -261,7 +285,7 @@ func TestPeers(t *testing.T) {
localHits := 0
getter := func(_ Context, key string, dest Sink) error {
localHits++
return dest.SetString("got:" + key)
return dest.SetString("got:"+key, time.Time{})
}
testGroup := newGroup("TestPeers-group", cacheSize, GetterFunc(getter), peerList)
run := func(name string, n int, wantSummary string) {
Expand Down Expand Up @@ -345,7 +369,7 @@ func TestAllocatingByteSliceTarget(t *testing.T) {
sink := AllocatingByteSliceSink(&dst)

inBytes := []byte("some bytes")
sink.SetBytes(inBytes)
sink.SetBytes(inBytes, time.Time{})
if want := "some bytes"; string(dst) != want {
t.Errorf("SetBytes resulted in %q; want %q", dst, want)
}
Expand Down Expand Up @@ -388,7 +412,7 @@ func TestNoDedup(t *testing.T) {
const testkey = "testkey"
const testval = "testval"
g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error {
return dest.SetString(testval)
return dest.SetString(testval, time.Time{})
}), nil)

orderedGroup := &orderedFlightGroup{
Expand Down
70 changes: 58 additions & 12 deletions groupcachepb/groupcache.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions groupcachepb/groupcache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ message GetRequest {
message GetResponse {
optional bytes value = 1;
optional double minute_qps = 2;
optional int64 expire = 3;
}

service GroupCache {
Expand Down
18 changes: 15 additions & 3 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,27 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

group.Stats.ServerRequests.Add(1)
var value []byte
err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
var b []byte

value := AllocatingByteSliceSink(&b)
err := group.Get(ctx, key, value)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

view, err := value.view()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var expireNano int64
if !view.e.IsZero() {
expireNano = view.Expire().UnixNano()
}

// Write the value to the response body as a proto message.
body, err := proto.Marshal(&pb.GetResponse{Value: value})
body, err := proto.Marshal(&pb.GetResponse{Value: b, Expire: &expireNano})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func beChildForTestHTTPPool() {
p.Set(addrToURL(addrs)...)

getter := GetterFunc(func(ctx Context, key string, dest Sink) error {
dest.SetString(strconv.Itoa(*peerIndex) + ":" + key)
dest.SetString(strconv.Itoa(*peerIndex)+":"+key, time.Time{})
return nil
})
NewGroup("httpPoolTest", 1<<20, getter)
Expand Down
23 changes: 17 additions & 6 deletions lru/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
// Package lru implements an LRU cache.
package lru

import "container/list"
import (
"container/list"
"time"
)

// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
Expand All @@ -37,8 +40,9 @@ type Cache struct {
type Key interface{}

type entry struct {
key Key
value interface{}
key Key
value interface{}
expire time.Time
}

// New creates a new Cache.
Expand All @@ -53,7 +57,7 @@ func New(maxEntries int) *Cache {
}

// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
func (c *Cache) Add(key Key, value interface{}, expire time.Time) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
Expand All @@ -63,7 +67,7 @@ func (c *Cache) Add(key Key, value interface{}) {
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
ele := c.ll.PushFront(&entry{key, value, expire})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
Expand All @@ -76,8 +80,15 @@ func (c *Cache) Get(key Key) (value interface{}, ok bool) {
return
}
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(*entry)
// If the entry has expired, remove it from the cache
if !entry.expire.IsZero() && entry.expire.Before(time.Now()) {
c.removeElement(ele)
return nil, false
}

c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
return entry.value, true
}
return
}
Expand Down
Loading

0 comments on commit b9520c6

Please sign in to comment.