Skip to content

Commit

Permalink
Remove Session Param on Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
tung.tq committed Feb 15, 2023
1 parent 5d02ee6 commit 5cf7b77
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 67 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
run: make benchmark
- name: Benchmark Stat
run: benchstat benchmark_new.txt benchmark_new.txt
- name: Benchmark Memory Allocation
run: make membench
- name: Convert coverage.out to coverage.lcov
uses: jandelgado/[email protected]
- name: Coveralls
Expand Down
8 changes: 3 additions & 5 deletions examples/failover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,18 @@ func main() {
}
defer closeFun()

sessProvider := memproxy.NewSessionProvider()
userSeq := 0

for {
doGetFromCache(mc, sessProvider, &userSeq)
doGetFromCache(mc, &userSeq)
time.Sleep(1 * time.Second)
}
}

func doGetFromCache(
mc memproxy.Memcache, sessProvider memproxy.SessionProvider,
mc memproxy.Memcache,
userSeq *int,
) {
pipe := mc.Pipeline(context.Background(), sessProvider.New())
pipe := mc.Pipeline(context.Background())
defer pipe.Finish()

*userSeq++
Expand Down
22 changes: 10 additions & 12 deletions item/item_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,20 @@ func clearMemcache(c *memcache.Client) {
}
}

func newMemcache(b *testing.B) (memproxy.Memcache, memproxy.SessionProvider) {
func newMemcache(b *testing.B) memproxy.Memcache {
client, err := memcache.New("localhost:11211", 1)
if err != nil {
panic(err)
}
clearMemcache(client)

mc := memproxy.NewPlainMemcache(client, 3)
mc := memproxy.NewPlainMemcache(client)
b.Cleanup(func() { _ = mc.Close() })

sess := memproxy.NewSessionProvider()
return mc, sess
return mc
}

func newMemcacheWithProxy(b *testing.B) (memproxy.Memcache, memproxy.SessionProvider) {
func newMemcacheWithProxy(b *testing.B) memproxy.Memcache {
clearClient, err := memcache.New("localhost:11211", 1)
if err != nil {
panic(err)
Expand All @@ -105,19 +104,18 @@ func newMemcacheWithProxy(b *testing.B) (memproxy.Memcache, memproxy.SessionProv
panic(err)
}

sess := memproxy.NewSessionProvider()
return mc, sess
return mc
}

func BenchmarkItemGetSingle(b *testing.B) {
mc, sess := newMemcache(b)
mc := newMemcache(b)

b.ResetTimer()

value := int64(112)

for n := 0; n < b.N; n++ {
pipe := mc.Pipeline(context.Background(), sess.New())
pipe := mc.Pipeline(context.Background())

var filler Filler[benchValue, benchKey] = func(ctx context.Context, key benchKey) func() (benchValue, error) {
return func() (benchValue, error) {
Expand Down Expand Up @@ -171,17 +169,17 @@ func writeMemProfile() {

func benchmarkWithBatch(
b *testing.B,
newFunc func(b *testing.B) (memproxy.Memcache, memproxy.SessionProvider),
newFunc func(b *testing.B) memproxy.Memcache,
batchSize int,
) {
mc, sess := newFunc(b)
mc := newFunc(b)

b.ResetTimer()

value := int64(112)

for n := 0; n < b.N; n++ {
pipe := mc.Pipeline(context.Background(), sess.New())
pipe := mc.Pipeline(context.Background())

var filler Filler[benchValue, benchKey] = func(ctx context.Context, key benchKey) func() (benchValue, error) {
return func() (benchValue, error) {
Expand Down
14 changes: 4 additions & 10 deletions item/item_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ import (

type itemPropertyTest struct {
client *memcache.Client

mc memproxy.Memcache
sessProvider memproxy.SessionProvider
mc memproxy.Memcache

mut sync.Mutex
currentAge int64
}

func (p *itemPropertyTest) newItem() (*Item[userValue, userKey], func()) {
pipe := p.mc.Pipeline(newContext(), p.sessProvider.New())
pipe := p.mc.Pipeline(newContext())
return New[userValue, userKey](
pipe, unmarshalUser,
NewMultiGetFiller[userValue, userKey](func(ctx context.Context, keys []userKey) ([]userValue, error) {
Expand Down Expand Up @@ -53,7 +51,7 @@ func (p *itemPropertyTest) updateAge(key userKey) {
p.currentAge++
p.mut.Unlock()

pipe := p.mc.Pipeline(newContext(), p.sessProvider.New())
pipe := p.mc.Pipeline(newContext())
pipe.Delete(key.String(), memproxy.DeleteOptions{})
pipe.Finish()
}
Expand All @@ -76,9 +74,7 @@ func newItemPropertyTest(t *testing.T) *itemPropertyTest {
t.Cleanup(func() { _ = client.Close() })
p.client = client

p.mc = memproxy.NewPlainMemcache(client, 3)
p.sessProvider = memproxy.NewSessionProvider()

p.mc = memproxy.NewPlainMemcache(client)
return p
}

Expand Down Expand Up @@ -108,8 +104,6 @@ func newItemPropertyTestWithProxy(t *testing.T) *itemPropertyTest {
t.Cleanup(closeFunc)
p.mc = mc

p.sessProvider = memproxy.NewSessionProvider()

return p
}

Expand Down
38 changes: 35 additions & 3 deletions memproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// Memcache represents a generic Memcache interface
// implementations of this interface must be thread safe
type Memcache interface {
Pipeline(ctx context.Context, sess Session, options ...PipelineOption) Pipeline
Pipeline(ctx context.Context, options ...PipelineOption) Pipeline

// Close ...
Close() error
Expand Down Expand Up @@ -98,8 +98,40 @@ type DeleteOptions struct {
type DeleteResponse struct {
}

type pipelineOptions struct {
//==============================================
// Pipeline Options
//==============================================

// PipelineConfig ...
type PipelineConfig struct {
existingSess Session
}

// GetSession ...
func (c *PipelineConfig) GetSession(provider SessionProvider) Session {
if c.existingSess != nil {
return c.existingSess
}
return provider.New()
}

// ComputePipelineConfig ...
func ComputePipelineConfig(options []PipelineOption) *PipelineConfig {
conf := &PipelineConfig{
existingSess: nil,
}
for _, fn := range options {
fn(conf)
}
return conf
}

// PipelineOption ...
type PipelineOption func(opts *pipelineOptions)
type PipelineOption func(conf *PipelineConfig)

// WithPipelineExistingSession ...
func WithPipelineExistingSession(sess Session) PipelineOption {
return func(conf *PipelineConfig) {
conf.existingSess = sess
}
}
14 changes: 4 additions & 10 deletions mocks/memproxy_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 45 additions & 3 deletions plain_memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

type plainMemcacheImpl struct {
client *memcache.Client
sessProvider SessionProvider
leaseDuration uint32
}

Expand All @@ -18,18 +19,59 @@ type plainPipelineImpl struct {
leaseDuration uint32
}

type plainMemcacheConfig struct {
leaseDurationSeconds uint32
sessProvider SessionProvider
}

func computePlainMemcacheConfig(options ...PlainMemcacheOption) *plainMemcacheConfig {
conf := &plainMemcacheConfig{
leaseDurationSeconds: 3,
sessProvider: NewSessionProvider(),
}
for _, fn := range options {
fn(conf)
}
return conf
}

// PlainMemcacheOption ...
type PlainMemcacheOption func(opts *plainMemcacheConfig)

// WithPlainMemcacheLeaseDuration ...
func WithPlainMemcacheLeaseDuration(leaseDurationSeconds uint32) PlainMemcacheOption {
return func(opts *plainMemcacheConfig) {
opts.leaseDurationSeconds = leaseDurationSeconds
}
}

// WithPlainMemcacheSessionProvider ...
func WithPlainMemcacheSessionProvider(sessProvider SessionProvider) PlainMemcacheOption {
return func(opts *plainMemcacheConfig) {
opts.sessProvider = sessProvider
}
}

var _ Pipeline = &plainPipelineImpl{}

// NewPlainMemcache a light wrapper around memcached client
func NewPlainMemcache(client *memcache.Client, leaseDurationSeconds uint32) Memcache {
func NewPlainMemcache(
client *memcache.Client,
options ...PlainMemcacheOption,
) Memcache {
conf := computePlainMemcacheConfig(options...)
return &plainMemcacheImpl{
client: client,
leaseDuration: leaseDurationSeconds,
sessProvider: conf.sessProvider,
leaseDuration: conf.leaseDurationSeconds,
}
}

// Pipeline ...
func (m *plainMemcacheImpl) Pipeline(_ context.Context, sess Session, _ ...PipelineOption) Pipeline {
func (m *plainMemcacheImpl) Pipeline(_ context.Context, options ...PipelineOption) Pipeline {
conf := ComputePipelineConfig(options)
sess := conf.GetSession(m.sessProvider)

return &plainPipelineImpl{
sess: sess,
pipeline: m.client.Pipeline(),
Expand Down
4 changes: 2 additions & 2 deletions plain_memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func newPlainMemcacheTest(t *testing.T) *plainMemcacheTest {
panic(err)
}

cache := NewPlainMemcache(client, 7)
cache := NewPlainMemcache(client, WithPlainMemcacheLeaseDuration(7))

return &plainMemcacheTest{
pipe: cache.Pipeline(context.Background(), nil),
pipe: cache.Pipeline(context.Background()),
}
}

Expand Down
Loading

0 comments on commit 5cf7b77

Please sign in to comment.