Skip to content

Commit

Permalink
Test Consistency with Proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
tung.tq committed Feb 10, 2023
1 parent c04be92 commit 5d02ee6
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 6 deletions.
45 changes: 45 additions & 0 deletions item/item_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/QuangTung97/go-memcache/memcache"
"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/proxy"
"github.com/stretchr/testify/assert"
"math/rand"
"sync"
Expand Down Expand Up @@ -81,6 +82,37 @@ func newItemPropertyTest(t *testing.T) *itemPropertyTest {
return p
}

func newItemPropertyTestWithProxy(t *testing.T) *itemPropertyTest {
p := &itemPropertyTest{}

client, err := memcache.New("localhost:11211", 3)
if err != nil {
panic(err)
}
t.Cleanup(func() { _ = client.Close() })
p.client = client

servers := []proxy.SimpleServerConfig{
{
Host: "localhost",
Port: 11211,
},
}
mc, closeFunc, err := proxy.NewSimpleReplicatedMemcache(
servers, 3,
proxy.NewSimpleStats(servers),
)
if err != nil {
panic(err)
}
t.Cleanup(closeFunc)
p.mc = mc

p.sessProvider = memproxy.NewSessionProvider()

return p
}

func (p *itemPropertyTest) testConsistency(t *testing.T) {
var wg sync.WaitGroup

Expand Down Expand Up @@ -154,4 +186,17 @@ func TestProperty_SingleKey(t *testing.T) {
p.testConsistency(t)
}
})

t.Run("with-proxy", func(t *testing.T) {
seed := time.Now().UnixNano()
rand.Seed(seed)
fmt.Println("SEED:", seed)

p := newItemPropertyTestWithProxy(t)

for i := 0; i < 100; i++ {
p.flushAll()
p.testConsistency(t)
}
})
}
30 changes: 24 additions & 6 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ type Pipeline struct {
//revive:disable-next-line:nested-structs
needExecServerSet map[ServerID]struct{}

leaseSetServers map[string]ServerID
leaseSetServers map[string]leaseSetState
}

type leaseSetState struct {
valid bool // for preventing a special race condition
serverID ServerID
}

// Pipeline ...
Expand All @@ -64,7 +69,7 @@ func (m *Memcache) Pipeline(

pipelines: map[ServerID]memproxy.Pipeline{},

leaseSetServers: map[string]ServerID{},
leaseSetServers: map[string]leaseSetState{},
}
}

Expand Down Expand Up @@ -115,7 +120,20 @@ func (p *Pipeline) setKeyForLeaseSet(
serverID ServerID,
) {
if resp.Status == memproxy.LeaseGetStatusLeaseGranted || resp.Status == memproxy.LeaseGetStatusLeaseRejected {
p.leaseSetServers[key] = serverID
prev, ok := p.leaseSetServers[key]
if ok {
if prev.serverID != serverID {
prev.valid = false
p.leaseSetServers[key] = prev
return
}
return
}

p.leaseSetServers[key] = leaseSetState{
valid: true,
serverID: serverID,
}
}
}

Expand Down Expand Up @@ -171,13 +189,13 @@ func (p *Pipeline) LeaseSet(
key string, data []byte, cas uint64,
options memproxy.LeaseSetOptions,
) func() (memproxy.LeaseSetResponse, error) {
serverID, ok := p.leaseSetServers[key]
if !ok {
setState, ok := p.leaseSetServers[key]
if !ok || !setState.valid {
return func() (memproxy.LeaseSetResponse, error) {
return memproxy.LeaseSetResponse{}, nil
}
}
pipe := p.getRoutePipeline(serverID)
pipe := p.getRoutePipeline(setState.serverID)
return pipe.LeaseSet(key, data, cas, options)
}

Expand Down
187 changes: 187 additions & 0 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,23 @@ func (p *pipelineTest) stubPipeLeaseGet(pipe *mocks.PipelineMock, resp memproxy.
}
}

func (p *pipelineTest) stubLeaseGetMulti(
pipe *mocks.PipelineMock,
respList []memproxy.LeaseGetResponse,
errList []error,
) {
pipe.LeaseGetFunc = func(
key string, options memproxy.LeaseGetOptions,
) func() (memproxy.LeaseGetResponse, error) {
index := len(pipe.LeaseGetCalls()) - 1
p.appendAction(leaseGetAction(key))
return func() (memproxy.LeaseGetResponse, error) {
p.appendAction(leaseGetFuncAction(key))
return respList[index], errList[index]
}
}
}

func (p *pipelineTest) stubLeaseGet1(resp memproxy.LeaseGetResponse, err error) {
p.stubPipeLeaseGet(p.pipe1, resp, err)
}
Expand Down Expand Up @@ -497,6 +514,176 @@ func TestPipeline__LeaseGet_Then_Set(t *testing.T) {
leaseSetFuncAction("KEY01"),
}, p.actions)
})

t.Run("lease-get-granted--then-failover--then-lease-get-again--then-set", func(t *testing.T) {
p := newPipelineTest(t)

const cas1 = 2255
const cas2 = 2266

p.stubHasNextAvail(true)

p.stubSelect(serverID1, serverID1, serverID2)
p.stubLeaseGetMulti(
p.pipe1,
[]memproxy.LeaseGetResponse{
{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas1,
},
{},
},
[]error{
nil,
errors.New("server error"),
},
)
p.stubLeaseGetMulti(
p.pipe2,
[]memproxy.LeaseGetResponse{
{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas2,
},
},
[]error{
nil,
},
)

fn1 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
fn2 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})

resp1, err := fn1()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas1,
}, resp1)

resp2, err := fn2()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas2,
}, resp2)

p.stubLeaseSet1(memproxy.LeaseSetResponse{}, nil)
p.stubLeaseSet2(memproxy.LeaseSetResponse{}, nil)

setFn := p.pipe.LeaseSet("KEY01", []byte("set data 01"), cas1, memproxy.LeaseSetOptions{})
setResp, err := setFn()

assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseSetResponse{}, setResp)

get1Calls := p.pipe1.LeaseGetCalls()
assert.Equal(t, 2, len(get1Calls))

get2Calls := p.pipe2.LeaseGetCalls()
assert.Equal(t, 1, len(get2Calls))

setCalls1 := p.pipe1.LeaseSetCalls()
assert.Equal(t, 0, len(setCalls1))

setCalls2 := p.pipe2.LeaseSetCalls()
assert.Equal(t, 0, len(setCalls2))

assert.Equal(t, 3, len(p.selector.SelectServerCalls()))
})

t.Run("lease-get-failover-2-times--back-to-the-same--server--then-set", func(t *testing.T) {
p := newPipelineTest(t)

const cas1 = 2255
const cas2 = 2266
const cas3 = 2277

p.stubHasNextAvail(true)

p.stubSelect(serverID1, serverID1, serverID2, serverID2, serverID1)
p.stubLeaseGetMulti(
p.pipe1,
[]memproxy.LeaseGetResponse{
{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas1,
},
{},
{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas3,
},
},
[]error{
nil,
errors.New("server error"),
nil,
},
)
p.stubLeaseGetMulti(
p.pipe2,
[]memproxy.LeaseGetResponse{
{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas2,
},
{},
},
[]error{
nil,
errors.New("server error 2"),
},
)

fn1 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
fn2 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})

resp1, err := fn1()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas1,
}, resp1)

resp2, err := fn2()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas2,
}, resp2)

fn3 := p.pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp3, err := fn3()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: cas3,
}, resp3)

p.stubLeaseSet1(memproxy.LeaseSetResponse{}, nil)
p.stubLeaseSet2(memproxy.LeaseSetResponse{}, nil)

setFn := p.pipe.LeaseSet("KEY01", []byte("set data 01"), cas1, memproxy.LeaseSetOptions{})
setResp, err := setFn()

assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseSetResponse{}, setResp)

get1Calls := p.pipe1.LeaseGetCalls()
assert.Equal(t, 3, len(get1Calls))

get2Calls := p.pipe2.LeaseGetCalls()
assert.Equal(t, 2, len(get2Calls))

setCalls1 := p.pipe1.LeaseSetCalls()
assert.Equal(t, 0, len(setCalls1))

setCalls2 := p.pipe2.LeaseSetCalls()
assert.Equal(t, 0, len(setCalls2))

assert.Equal(t, 5, len(p.selector.SelectServerCalls()))
})
}

func TestPipeline__LeaseGet_Multi(t *testing.T) {
Expand Down

0 comments on commit 5d02ee6

Please sign in to comment.