Skip to content

Commit

Permalink
Merge pull request #5 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
Release Candidate 2.0
  • Loading branch information
thrawn01 authored Jun 3, 2019
2 parents 9dade90 + 054f217 commit 4b81d84
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ script:
- go test ./...

go:
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- master

cache:
Expand Down
30 changes: 30 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0] - 2019-05-30
### Changes
* Now using golang standard `context.Context` instead of `groupcache.Context`.
* HTTP requests made by `httpGetter` now respect `context.Context` done.
* Moved `HTTPPool` config `Context` and `Transport` to `HTTPPoolOptions` for consist configuration.
* Now Associating the transport with peer `httpGetter` so we take advantage of
connection reuse. This lowers the impact on DNS and improves performance for
high request volume low latency applications.
* Now always populating the hotcache. A more complex algorithm is unnecessary
when the LRU cache will ensure the most used values remain in the cache. The
evict code ensures the hotcache does not overcrowd the maincache.

## [1.3.0] - 2019-05-23
### Added
* Added `Remove()` method to `Group` to purge a key from the group.

## [1.1.0] - 2019-04-10
### Added
* Sinks can now accept an expire time
* Changed import path to mailgun/groupcache

## [hash 5b532d6fd5efaf7fa130d4e859a2fde0fc3a9e1b] - 2019-01-29
### Changes
* Initial import from https://github.com/golang/groupcache
29 changes: 13 additions & 16 deletions groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ limitations under the License.
package groupcache

import (
"context"
"errors"
"math/rand"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -45,13 +45,13 @@ type Getter interface {
// uniquely describe the loaded data, without an implicit
// current time, and without relying on cache expiration
// mechanisms.
Get(ctx Context, key string, dest Sink) error
Get(ctx context.Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx Context, key string, dest Sink) error
type GetterFunc func(ctx context.Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
return f(ctx, key, dest)
}

Expand Down Expand Up @@ -210,7 +210,7 @@ func (g *Group) initPeers() {
}
}

func (g *Group) Get(ctx Context, key string, dest Sink) error {
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
if dest == nil {
Expand Down Expand Up @@ -240,7 +240,7 @@ func (g *Group) Get(ctx Context, key string, dest Sink) error {

// Remove clears the key from our cache then forwards the remove
// request to all peers.
func (g *Group) Remove(ctx Context, key string) error {
func (g *Group) Remove(ctx context.Context, key string) error {
g.peersOnce.Do(g.initPeers)

_, err := g.removeGroup.Do(key, func() (interface{}, error) {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (g *Group) Remove(ctx Context, key string) error {
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// Check the cache again because singleflight can only dedup calls
Expand Down Expand Up @@ -347,15 +347,15 @@ func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPo
return
}

func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return ByteView{}, err
}
return dest.view()
}

func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
Expand All @@ -375,16 +375,13 @@ func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView
}

value := ByteView{b: res.Value, e: expire}
// TODO(bradfitz): use res.MinuteQps or something smart to
// conditionally populate hotCache. For now just do it some
// percentage of the time.
if rand.Intn(10) == 0 {
g.populateCache(key, value, &g.hotCache)
}

// Always populate the hot cache
g.populateCache(key, value, &g.hotCache)
return value, nil
}

func (g *Group) removeFromPeer(ctx Context, peer ProtoGetter, key string) error {
func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
Expand Down
27 changes: 13 additions & 14 deletions groupcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ limitations under the License.
package groupcache

import (
"context"
"errors"
"fmt"
"hash/crc32"
"math/rand"
"reflect"
"sync"
"testing"
Expand All @@ -32,7 +32,7 @@ import (
"github.com/golang/protobuf/proto"

pb "github.com/mailgun/groupcache/groupcachepb"
testpb "github.com/mailgun/groupcache/testpb"
"github.com/mailgun/groupcache/testpb"
)

var (
Expand All @@ -41,7 +41,7 @@ var (

stringc = make(chan string)

dummyCtx Context
dummyCtx context.Context

// cacheFills is the number of times stringGroup or
// protoGroup's Getter have been called. Read using the
Expand All @@ -59,15 +59,15 @@ const (
)

func testSetup() {
stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error {
if key == fromChan {
key = <-stringc
}
cacheFills.Add(1)
return dest.SetString("ECHO:"+key, time.Time{})
}))

protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error {
if key == fromChan {
key = <-stringc
}
Expand All @@ -78,7 +78,7 @@ func testSetup() {
}, time.Time{})
}))

expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error {
cacheFills.Add(1)
return dest.SetString("ECHO:"+key, time.Now().Add(time.Millisecond*100))
}))
Expand Down Expand Up @@ -254,7 +254,7 @@ type fakePeer struct {
fail bool
}

func (p *fakePeer) Get(_ Context, in *pb.GetRequest, out *pb.GetResponse) error {
func (p *fakePeer) Get(_ context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
p.hits++
if p.fail {
return errors.New("simulated error from peer")
Expand All @@ -263,7 +263,7 @@ func (p *fakePeer) Get(_ Context, in *pb.GetRequest, out *pb.GetResponse) error
return nil
}

func (p *fakePeer) Remove(_ Context, in *pb.GetRequest) error {
func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error {
p.hits++
if p.fail {
return errors.New("simulated error from peer")
Expand All @@ -288,14 +288,13 @@ func (p fakePeers) GetAll() []ProtoGetter {
// tests that peers (virtual, in-process) are hit, and how much.
func TestPeers(t *testing.T) {
once.Do(testSetup)
rand.Seed(123)
peer0 := &fakePeer{}
peer1 := &fakePeer{}
peer2 := &fakePeer{}
peerList := fakePeers([]ProtoGetter{peer0, peer1, peer2, nil})
const cacheSize = 0 // disabled
localHits := 0
getter := func(_ Context, key string, dest Sink) error {
getter := func(_ context.Context, key string, dest Sink) error {
localHits++
return dest.SetString("got:"+key, time.Time{})
}
Expand Down Expand Up @@ -338,9 +337,9 @@ func TestPeers(t *testing.T) {
resetCacheSize(1 << 20)
run("base", 200, "localHits = 49, peers = 51 49 51")

// Verify cache was hit. All localHits are gone, and some of
// the peer hits (the ones randomly selected to be maybe hot)
run("cached_base", 200, "localHits = 0, peers = 49 47 48")
// Verify cache was hit. All localHits and peers are gone as the hotCache has
// the data we need
run("cached_base", 200, "localHits = 0, peers = 0 0 0")
resetCacheSize(0)

// With one of the peers being down.
Expand Down Expand Up @@ -427,7 +426,7 @@ func (g *orderedFlightGroup) Lock(fn func()) {
func TestNoDedup(t *testing.T) {
const testkey = "testkey"
const testval = "testval"
g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error {
g := newGroup("testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error {
return dest.SetString(testval, time.Time{})
}), nil)

Expand Down
61 changes: 38 additions & 23 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package groupcache

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -37,16 +38,6 @@ const defaultReplicas = 50

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
// Context optionally specifies a context for the server to use when it
// receives a request.
// If nil, the server uses a nil Context.
Context func(*http.Request) Context

// Transport optionally specifies an http.RoundTripper for the client
// to use when it makes a request.
// If nil, the client uses http.DefaultTransport.
Transport func(Context) http.RoundTripper

// this peer's base URL, e.g. "https://example.net:8000"
self string

Expand All @@ -71,6 +62,16 @@ type HTTPPoolOptions struct {
// HashFn specifies the hash function of the consistent hash.
// If blank, it defaults to crc32.ChecksumIEEE.
HashFn consistenthash.Hash

// Transport optionally specifies an http.RoundTripper for the client
// to use when it makes a request.
// If nil, the client uses http.DefaultTransport.
Transport func(context.Context) http.RoundTripper

// Context optionally specifies a context for the server to use when it
// receives a request.
// If nil, uses the http.Request.Context()
Context func(*http.Request) context.Context
}

// NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
Expand Down Expand Up @@ -123,7 +124,7 @@ func (p *HTTPPool) Set(peers ...string) {
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
p.httpGetters[peer] = &httpGetter{getTransport: p.opts.Transport, baseURL: peer + p.opts.BasePath}
}
}

Expand Down Expand Up @@ -172,9 +173,11 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
var ctx Context
if p.Context != nil {
ctx = p.Context(r)
var ctx context.Context
if p.opts.Context != nil {
ctx = p.opts.Context(r)
} else {
ctx = r.Context()
}

group.Stats.ServerRequests.Add(1)
Expand Down Expand Up @@ -215,15 +218,16 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

type httpGetter struct {
transport func(Context) http.RoundTripper
baseURL string
getTransport func(context.Context) http.RoundTripper
transport http.RoundTripper
baseURL string
}

var bufferPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}

func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetRequest, out *http.Response) error {
func (h *httpGetter) makeRequest(ctx context.Context, method string, in *pb.GetRequest, out *http.Response) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
Expand All @@ -234,19 +238,30 @@ func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetReque
if err != nil {
return err
}
tr := http.DefaultTransport
if h.transport != nil {
tr = h.transport(context)

// Pass along the context to the RoundTripper
req = req.WithContext(ctx)

// Associate the transport with this peer so we take advantage of connection reuse.
if h.transport == nil {
if h.getTransport != nil {
h.transport = h.getTransport(ctx)
}
// Ensure we have a copy of the default transport and not just a reference.
tr := http.DefaultTransport.(*http.Transport)
trCopy := http.Transport(*tr)
h.transport = &trCopy
}
res, err := tr.RoundTrip(req)

res, err := h.transport.RoundTrip(req)
if err != nil {
return err
}
*out = *res
return nil
}

func (h *httpGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) error {
func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodGet, in, &res); err != nil {
return err
Expand All @@ -269,7 +284,7 @@ func (h *httpGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) er
return nil
}

func (h *httpGetter) Remove(ctx Context, in *pb.GetRequest) error {
func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodDelete, in, &res); err != nil {
return err
Expand Down
Loading

0 comments on commit 4b81d84

Please sign in to comment.