diff --git a/item/item_property_test.go b/item/item_property_test.go index eb672d0..a6efbbd 100644 --- a/item/item_property_test.go +++ b/item/item_property_test.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/QuangTung97/go-memcache/memcache" "github.com/QuangTung97/memproxy" + "github.com/QuangTung97/memproxy/proxy" "github.com/stretchr/testify/assert" "math/rand" "sync" @@ -81,6 +82,37 @@ func newItemPropertyTest(t *testing.T) *itemPropertyTest { return p } +func newItemPropertyTestWithProxy(t *testing.T) *itemPropertyTest { + p := &itemPropertyTest{} + + client, err := memcache.New("localhost:11211", 3) + if err != nil { + panic(err) + } + t.Cleanup(func() { _ = client.Close() }) + p.client = client + + servers := []proxy.SimpleServerConfig{ + { + Host: "localhost", + Port: 11211, + }, + } + mc, closeFunc, err := proxy.NewSimpleReplicatedMemcache( + servers, 3, + proxy.NewSimpleStats(servers), + ) + if err != nil { + panic(err) + } + t.Cleanup(closeFunc) + p.mc = mc + + p.sessProvider = memproxy.NewSessionProvider() + + return p +} + func (p *itemPropertyTest) testConsistency(t *testing.T) { var wg sync.WaitGroup @@ -154,4 +186,17 @@ func TestProperty_SingleKey(t *testing.T) { p.testConsistency(t) } }) + + t.Run("with-proxy", func(t *testing.T) { + seed := time.Now().UnixNano() + rand.Seed(seed) + fmt.Println("SEED:", seed) + + p := newItemPropertyTestWithProxy(t) + + for i := 0; i < 100; i++ { + p.flushAll() + p.testConsistency(t) + } + }) } diff --git a/proxy/proxy.go b/proxy/proxy.go index 9293966..8ddbb1e 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -46,7 +46,12 @@ type Pipeline struct { //revive:disable-next-line:nested-structs needExecServerSet map[ServerID]struct{} - leaseSetServers map[string]ServerID + leaseSetServers map[string]leaseSetState +} + +type leaseSetState struct { + valid bool // for preventing a special race condition + serverID ServerID } // Pipeline ... @@ -64,7 +69,7 @@ func (m *Memcache) Pipeline( pipelines: map[ServerID]memproxy.Pipeline{}, - leaseSetServers: map[string]ServerID{}, + leaseSetServers: map[string]leaseSetState{}, } } @@ -115,7 +120,20 @@ func (p *Pipeline) setKeyForLeaseSet( serverID ServerID, ) { if resp.Status == memproxy.LeaseGetStatusLeaseGranted || resp.Status == memproxy.LeaseGetStatusLeaseRejected { - p.leaseSetServers[key] = serverID + prev, ok := p.leaseSetServers[key] + if ok { + if prev.serverID != serverID { + prev.valid = false + p.leaseSetServers[key] = prev + return + } + return + } + + p.leaseSetServers[key] = leaseSetState{ + valid: true, + serverID: serverID, + } } } @@ -171,13 +189,13 @@ func (p *Pipeline) LeaseSet( key string, data []byte, cas uint64, options memproxy.LeaseSetOptions, ) func() (memproxy.LeaseSetResponse, error) { - serverID, ok := p.leaseSetServers[key] - if !ok { + setState, ok := p.leaseSetServers[key] + if !ok || !setState.valid { return func() (memproxy.LeaseSetResponse, error) { return memproxy.LeaseSetResponse{}, nil } } - pipe := p.getRoutePipeline(serverID) + pipe := p.getRoutePipeline(setState.serverID) return pipe.LeaseSet(key, data, cas, options) } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 588a9a0..0b2ab08 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -180,6 +180,23 @@ func (p *pipelineTest) stubPipeLeaseGet(pipe *mocks.PipelineMock, resp memproxy. } } +func (p *pipelineTest) stubLeaseGetMulti( + pipe *mocks.PipelineMock, + respList []memproxy.LeaseGetResponse, + errList []error, +) { + pipe.LeaseGetFunc = func( + key string, options memproxy.LeaseGetOptions, + ) func() (memproxy.LeaseGetResponse, error) { + index := len(pipe.LeaseGetCalls()) - 1 + p.appendAction(leaseGetAction(key)) + return func() (memproxy.LeaseGetResponse, error) { + p.appendAction(leaseGetFuncAction(key)) + return respList[index], errList[index] + } + } +} + func (p *pipelineTest) stubLeaseGet1(resp memproxy.LeaseGetResponse, err error) { p.stubPipeLeaseGet(p.pipe1, resp, err) } @@ -497,6 +514,176 @@ func TestPipeline__LeaseGet_Then_Set(t *testing.T) { leaseSetFuncAction("KEY01"), }, p.actions) }) + + t.Run("lease-get-granted--then-failover--then-lease-get-again--then-set", func(t *testing.T) { + p := newPipelineTest(t) + + const cas1 = 2255 + const cas2 = 2266 + + p.stubHasNextAvail(true) + + p.stubSelect(serverID1, serverID1, serverID2) + p.stubLeaseGetMulti( + p.pipe1, + []memproxy.LeaseGetResponse{ + { + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas1, + }, + {}, + }, + []error{ + nil, + errors.New("server error"), + }, + ) + p.stubLeaseGetMulti( + p.pipe2, + []memproxy.LeaseGetResponse{ + { + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas2, + }, + }, + []error{ + nil, + }, + ) + + fn1 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{}) + fn2 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{}) + + resp1, err := fn1() + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseGetResponse{ + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas1, + }, resp1) + + resp2, err := fn2() + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseGetResponse{ + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas2, + }, resp2) + + p.stubLeaseSet1(memproxy.LeaseSetResponse{}, nil) + p.stubLeaseSet2(memproxy.LeaseSetResponse{}, nil) + + setFn := p.pipe.LeaseSet("KEY01", []byte("set data 01"), cas1, memproxy.LeaseSetOptions{}) + setResp, err := setFn() + + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseSetResponse{}, setResp) + + get1Calls := p.pipe1.LeaseGetCalls() + assert.Equal(t, 2, len(get1Calls)) + + get2Calls := p.pipe2.LeaseGetCalls() + assert.Equal(t, 1, len(get2Calls)) + + setCalls1 := p.pipe1.LeaseSetCalls() + assert.Equal(t, 0, len(setCalls1)) + + setCalls2 := p.pipe2.LeaseSetCalls() + assert.Equal(t, 0, len(setCalls2)) + + assert.Equal(t, 3, len(p.selector.SelectServerCalls())) + }) + + t.Run("lease-get-failover-2-times--back-to-the-same--server--then-set", func(t *testing.T) { + p := newPipelineTest(t) + + const cas1 = 2255 + const cas2 = 2266 + const cas3 = 2277 + + p.stubHasNextAvail(true) + + p.stubSelect(serverID1, serverID1, serverID2, serverID2, serverID1) + p.stubLeaseGetMulti( + p.pipe1, + []memproxy.LeaseGetResponse{ + { + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas1, + }, + {}, + { + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas3, + }, + }, + []error{ + nil, + errors.New("server error"), + nil, + }, + ) + p.stubLeaseGetMulti( + p.pipe2, + []memproxy.LeaseGetResponse{ + { + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas2, + }, + {}, + }, + []error{ + nil, + errors.New("server error 2"), + }, + ) + + fn1 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{}) + fn2 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{}) + + resp1, err := fn1() + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseGetResponse{ + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas1, + }, resp1) + + resp2, err := fn2() + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseGetResponse{ + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas2, + }, resp2) + + fn3 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{}) + resp3, err := fn3() + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseGetResponse{ + Status: memproxy.LeaseGetStatusLeaseGranted, + CAS: cas3, + }, resp3) + + p.stubLeaseSet1(memproxy.LeaseSetResponse{}, nil) + p.stubLeaseSet2(memproxy.LeaseSetResponse{}, nil) + + setFn := p.pipe.LeaseSet("KEY01", []byte("set data 01"), cas1, memproxy.LeaseSetOptions{}) + setResp, err := setFn() + + assert.Equal(t, nil, err) + assert.Equal(t, memproxy.LeaseSetResponse{}, setResp) + + get1Calls := p.pipe1.LeaseGetCalls() + assert.Equal(t, 3, len(get1Calls)) + + get2Calls := p.pipe2.LeaseGetCalls() + assert.Equal(t, 2, len(get2Calls)) + + setCalls1 := p.pipe1.LeaseSetCalls() + assert.Equal(t, 0, len(setCalls1)) + + setCalls2 := p.pipe2.LeaseSetCalls() + assert.Equal(t, 0, len(setCalls2)) + + assert.Equal(t, 5, len(p.selector.SelectServerCalls())) + }) } func TestPipeline__LeaseGet_Multi(t *testing.T) {