Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move global state to explicit state 'workspace' controlled by the caller. #65

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
module github.com/mailgun/groupcache/v2

go 1.19
go 1.21

require (
github.com/golang/protobuf v1.5.2
github.com/golang/protobuf v1.5.3
github.com/segmentio/fasthash v1.0.3
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/protobuf v1.28.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
golang.org/x/sys v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
40 changes: 24 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,37 +1,45 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
97 changes: 62 additions & 35 deletions groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,32 @@ func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
return f(ctx, key, dest)
}

var (
mu sync.RWMutex
groups = make(map[string]*Group)

initPeerServerOnce sync.Once
initPeerServer func()
)
// GetGroupWithWorkspace returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroupWithWorkspace(ws *workspace, name string) *Group {
ws.mu.RLock()
g := ws.groups[name]
ws.mu.RUnlock()
return g
}

// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
return GetGroupWithWorkspace(DefaultWorkspace, name)
}

// NewGroupWithWorkspace creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroupWithWorkspace(ws *workspace, name string, cacheBytes int64, getter Getter) *Group {
return newGroup(ws, name, cacheBytes, getter, nil)
}

// NewGroup creates a coordinated group-aware Getter from a Getter.
Expand All @@ -95,28 +106,34 @@ func GetGroup(name string) *Group {
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
return newGroup(name, cacheBytes, getter, nil)
return newGroup(DefaultWorkspace, name, cacheBytes, getter, nil)
}

// DeregisterGroupWithWorkspace removes group from group pool
func DeregisterGroupWithWorkspace(ws *workspace, name string) {
ws.mu.Lock()
delete(ws.groups, name)
ws.mu.Unlock()
}

// DeregisterGroup removes group from group pool
func DeregisterGroup(name string) {
mu.Lock()
delete(groups, name)
mu.Unlock()
DeregisterGroupWithWorkspace(DefaultWorkspace, name)
}

// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
func newGroup(ws *workspace, name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
initPeerServerOnce.Do(callInitPeerServer)
if _, dup := groups[name]; dup {
ws.mu.Lock()
defer ws.mu.Unlock()
ws.initPeerServerOnce.Do(func() { callInitPeerServer(ws) })
if _, dup := ws.groups[name]; dup {
panic("duplicate registration of group " + name)
}
g := &Group{
ws: ws,
name: name,
getter: getter,
peers: peers,
Expand All @@ -125,43 +142,53 @@ func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *G
setGroup: &singleflight.Group{},
removeGroup: &singleflight.Group{},
}
if fn := newGroupHook; fn != nil {
if fn := ws.newGroupHook; fn != nil {
fn(g)
}
groups[name] = g
ws.groups[name] = g
return g
}

// newGroupHook, if non-nil, is called right after a new group is created.
var newGroupHook func(*Group)
// RegisterNewGroupHookWithWorkspace registers a hook that is run each time
// a group is created.
func RegisterNewGroupHookWithWorkspace(ws *workspace, fn func(*Group)) {
if ws.newGroupHook != nil {
panic("RegisterNewGroupHook called more than once")
}
ws.newGroupHook = fn
}

// RegisterNewGroupHook registers a hook that is run each time
// a group is created.
func RegisterNewGroupHook(fn func(*Group)) {
if newGroupHook != nil {
panic("RegisterNewGroupHook called more than once")
RegisterNewGroupHookWithWorkspace(DefaultWorkspace, fn)
}

// RegisterServerStartWithWorkspace registers a hook that is run when the first
// group is created.
func RegisterServerStartWithWorkspace(ws *workspace, fn func()) {
if ws.initPeerServer != nil {
panic("RegisterServerStart called more than once")
}
newGroupHook = fn
ws.initPeerServer = fn
}

// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
if initPeerServer != nil {
panic("RegisterServerStart called more than once")
}
initPeerServer = fn
RegisterServerStartWithWorkspace(DefaultWorkspace, fn)
}

func callInitPeerServer() {
if initPeerServer != nil {
initPeerServer()
func callInitPeerServer(ws *workspace) {
if ws.initPeerServer != nil {
ws.initPeerServer()
}
}

// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
type Group struct {
ws *workspace
name string
getter Getter
peersOnce sync.Once
Expand Down Expand Up @@ -232,7 +259,7 @@ func (g *Group) Name() string {

func (g *Group) initPeers() {
if g.peers == nil {
g.peers = getPeers(g.name)
g.peers = getPeers(g.ws, g.name)
}
}

Expand Down
6 changes: 3 additions & 3 deletions groupcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestPeers(t *testing.T) {
localHits++
return dest.SetString("got:"+key, time.Time{})
}
testGroup := newGroup("TestPeers-group", cacheSize, GetterFunc(getter), peerList)
testGroup := newGroup(DefaultWorkspace, "TestPeers-group", cacheSize, GetterFunc(getter), peerList)
run := func(name string, n int, wantSummary string) {
// Reset counters
localHits = 0
Expand Down Expand Up @@ -438,7 +438,7 @@ func (g *orderedFlightGroup) Lock(fn func()) {
func TestNoDedup(t *testing.T) {
const testkey = "testkey"
const testval = "testval"
g := newGroup("testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error {
g := newGroup(DefaultWorkspace, "testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error {
return dest.SetString(testval, time.Time{})
}), nil)

Expand Down Expand Up @@ -522,7 +522,7 @@ func TestContextDeadlineOnPeer(t *testing.T) {
getter := func(_ context.Context, key string, dest Sink) error {
return dest.SetString("got:"+key, time.Time{})
}
testGroup := newGroup("TestContextDeadlineOnPeer-group", cacheSize, GetterFunc(getter), peerList)
testGroup := newGroup(DefaultWorkspace, "TestContextDeadlineOnPeer-group", cacheSize, GetterFunc(getter), peerList)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300)
defer cancel()
Expand Down
36 changes: 26 additions & 10 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const defaultReplicas = 50

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
ws *workspace

// this peer's base URL, e.g. "https://example.net:8000"
self string

Expand Down Expand Up @@ -76,28 +78,35 @@ type HTTPPoolOptions struct {
Context func(*http.Request) context.Context
}

// NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
// NewHTTPPoolWithWorkspace initializes an HTTP pool of peers, and registers itself as a PeerPicker.
// For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
// The self argument should be a valid base URL that points to the current server,
// for example "http://example.net:8000".
func NewHTTPPool(self string) *HTTPPool {
p := NewHTTPPoolOpts(self, nil)
func NewHTTPPoolWithWorkspace(ws *workspace, self string) *HTTPPool {
p := NewHTTPPoolOptsWithWorkspace(ws, self, nil)
http.Handle(p.opts.BasePath, p)
return p
}

var httpPoolMade bool
// NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
// For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
// The self argument should be a valid base URL that points to the current server,
// for example "http://example.net:8000".
func NewHTTPPool(self string) *HTTPPool {
return NewHTTPPoolWithWorkspace(DefaultWorkspace, self)
}

// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// NewHTTPPoolOptsWithWorkspace initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
if httpPoolMade {
func NewHTTPPoolOptsWithWorkspace(ws *workspace, self string, o *HTTPPoolOptions) *HTTPPool {
if ws.httpPoolMade {
panic("groupcache: NewHTTPPool must be called only once")
}
httpPoolMade = true
ws.httpPoolMade = true

p := &HTTPPool{
ws: ws,
self: self,
httpGetters: make(map[string]*httpGetter),
}
Expand All @@ -112,10 +121,17 @@ func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
}
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)

RegisterPeerPicker(func() PeerPicker { return p })
RegisterPeerPickerWithWorkspace(ws, func() PeerPicker { return p })
return p
}

// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
return NewHTTPPoolOptsWithWorkspace(DefaultWorkspace, self, o)
}

// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
Expand Down Expand Up @@ -173,7 +189,7 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := parts[1]

// Fetch the value for this group/key.
group := GetGroup(groupName)
group := GetGroupWithWorkspace(p.ws, groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
Expand Down
Loading
Loading