Skip to content

Commit

Permalink
add pings to sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Sep 5, 2024
1 parent 3dceb66 commit e72b3bb
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.chotki_cmd_log.txt
40 changes: 27 additions & 13 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Options struct {
MaxLogLen int64
RelaxedOrder bool
Logger utils.Logger
PingPeriod time.Duration
PingWait time.Duration

TlsConfig *tls.Config
}
Expand All @@ -69,6 +71,14 @@ func (o *Options) SetDefaults() {
o.MaxLogLen = 1 << 23
}

if o.PingPeriod == 0 {
o.PingPeriod = 10 * time.Second
}

if o.PingWait == 0 {
o.PingWait = 3 * time.Second
}

o.Merger = &pebble.Merger{
Name: "CRDT",
Merge: func(key, value []byte) (pebble.ValueMerger, error) {
Expand Down Expand Up @@ -159,12 +169,11 @@ func Open(dirname string, opts Options) (*Chotki, error) {
}

cho := Chotki{
db: db,
src: opts.Src,
dir: absdir,
log: opts.Logger,
opts: opts,

db: db,
src: opts.Src,
dir: absdir,
log: opts.Logger,
opts: opts,
clock: &rdx.LocalLogicalClock{Source: opts.Src},

outq: xsync.NewMapOf[string, protocol.DrainCloser](),
Expand All @@ -186,12 +195,14 @@ func Open(dirname string, opts Options) (*Chotki, error) {
}

return &Syncer{
Src: cho.src,
Host: &cho,
Mode: SyncRWLive,
Name: name,
log: cho.log,
oqueue: queue,
Src: cho.src,
Host: &cho,
Mode: SyncRWLive,
PingPeriod: cho.opts.PingPeriod,
PingWait: cho.opts.PingWait,
Name: name,
log: cho.log,
oqueue: queue,
}
},
func(name string) { // destroy connection
Expand Down Expand Up @@ -519,7 +530,10 @@ func (cho *Chotki) Drain(recs protocol.Records) (err error) {

case 'B': // bye dear
cho.syncs.Delete(id)

case 'A':
cho.log.Info("received ping")
case 'Z':
cho.log.Info("received pong")
default:
return fmt.Errorf("unsupported packet type %c", lit)
}
Expand Down
131 changes: 131 additions & 0 deletions chotki_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package chotki

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/drpcorg/chotki/protocol"
Expand All @@ -15,6 +17,18 @@ import (
"github.com/stretchr/testify/assert"
)

type FeedCloserTest struct{}

func (t *FeedCloserTest) Feed() (recs protocol.Records, err error) {
return protocol.Records{}, nil
}

func (t *FeedCloserTest) Close() error {
return nil
}

var _ protocol.FeedCloser = (*FeedCloserTest)(nil)

func testdirs(origs ...uint64) ([]string, func()) {
dirs := make([]string, len(origs))

Expand Down Expand Up @@ -120,6 +134,123 @@ func TestChotki_SyncEdit(t *testing.T) {
_ = b.Close()
}

func TestChotki_SyncLivePingsOk(t *testing.T) {
dirs, clear := testdirs(0xa, 0xb)
defer clear()

a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A", Logger: utils.NewDefaultLogger(slog.LevelInfo)})
assert.Nil(t, err)

b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B", Logger: utils.NewDefaultLogger(slog.LevelInfo)})
assert.Nil(t, err)

synca := Syncer{
Host: a,
PingPeriod: 100 * time.Millisecond,
PingWait: 100 * time.Millisecond,
Mode: SyncRWLive, Name: "a",
Src: a.src,
log: utils.NewDefaultLogger(slog.LevelDebug),
oqueue: &FeedCloserTest{},
}
syncb := Syncer{
Host: b,
PingPeriod: 100 * time.Second,
Mode: SyncRWLive,
PingWait: 3 * time.Second,
Name: "b",
Src: b.src,
log: utils.NewDefaultLogger(slog.LevelDebug),
oqueue: &FeedCloserTest{},
}

ctx, cancel := context.WithCancel(context.Background())
go protocol.PumpCtxCallback(ctx, &synca, &syncb, func() bool {
return synca.GetFeedState() != SendPing
})
go protocol.PumpCtx(ctx, &syncb, &synca)
time.Sleep(time.Millisecond * 10)
assert.Equal(t, SendLive, synca.GetFeedState())
assert.Equal(t, SendLive, syncb.GetFeedState())
assert.Equal(t, SendDiff, synca.GetDrainState())
assert.Equal(t, SendDiff, syncb.GetDrainState())

time.Sleep(time.Millisecond * 110)
assert.Equal(t, SendPing, synca.GetFeedState())
go protocol.PumpCtx(ctx, &synca, &syncb)
time.Sleep(time.Millisecond * 90)

assert.Equal(t, SendLive, synca.GetFeedState())
assert.Equal(t, SendLive, syncb.GetFeedState())
assert.Equal(t, SendLive, synca.GetDrainState())
assert.Equal(t, SendLive, syncb.GetDrainState())
cancel()
syncb.Close()
synca.Close()
_ = a.Close()
_ = b.Close()
}

func TestChotki_SyncLivePingsFail(t *testing.T) {
dirs, clear := testdirs(0xa, 0xb)
defer clear()

a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A", Logger: utils.NewDefaultLogger(slog.LevelInfo)})
assert.Nil(t, err)

b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B", Logger: utils.NewDefaultLogger(slog.LevelInfo)})
assert.Nil(t, err)

synca := Syncer{
Host: a,
PingPeriod: 100 * time.Millisecond,
PingWait: 100 * time.Millisecond,
Mode: SyncRWLive, Name: "a",
Src: a.src,
log: utils.NewDefaultLogger(slog.LevelDebug),
oqueue: &FeedCloserTest{},
}
syncb := Syncer{
Host: b,
PingPeriod: 100 * time.Second,
Mode: SyncRWLive,
PingWait: 3 * time.Second,
Name: "b",
Src: b.src,
log: utils.NewDefaultLogger(slog.LevelDebug),
oqueue: &FeedCloserTest{},
}

ctx, cancel := context.WithCancel(context.Background())
go protocol.PumpCtxCallback(ctx, &synca, &syncb, func() bool {
return synca.GetFeedState() != SendPing
})
go protocol.PumpCtxCallback(ctx, &syncb, &synca, func() bool {
return syncb.GetFeedState() != SendPong
})
time.Sleep(time.Millisecond * 10)
assert.Equal(t, SendLive, synca.GetFeedState())
assert.Equal(t, SendLive, syncb.GetFeedState())
assert.Equal(t, SendDiff, synca.GetDrainState())
assert.Equal(t, SendDiff, syncb.GetDrainState())

time.Sleep(time.Millisecond * 110)
assert.Equal(t, SendPing, synca.GetFeedState())
go protocol.PumpCtx(ctx, &synca, &syncb)
time.Sleep(time.Millisecond * 200)

assert.Equal(t, SendNone, synca.GetFeedState())
assert.Equal(t, SendPong, syncb.GetFeedState())
assert.Equal(t, SendDiff, synca.GetDrainState())
assert.Equal(t, SendNone, syncb.GetDrainState())

cancel()
syncb.Close()
synca.Close()
_ = a.Close()
_ = b.Close()
}

func TestChotki_SyncGlobals(t *testing.T) {
dirs, clear := testdirs(0xa, 0xb)
defer clear()
Expand Down
22 changes: 21 additions & 1 deletion protocol/feeddrainer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package protocol

import "io"
import (
"context"
"io"
)

type Feeder interface {
// Feed reads and returns records.
Expand Down Expand Up @@ -49,6 +52,23 @@ func Pump(feeder Feeder, drainer Drainer) (err error) {
return
}

func PumpCtx(ctx context.Context, feeder Feeder, drainer Drainer) (err error) {
for err == nil && ctx.Err() == nil {
err = Relay(feeder, drainer)
}
return
}

func PumpCtxCallback(ctx context.Context, feeder Feeder, drainer Drainer, f func() bool) (err error) {
for err == nil && ctx.Err() == nil {
err = Relay(feeder, drainer)
if !f() {
return
}
}
return
}

func PumpN(feeder Feeder, drainer Drainer, n int) (err error) {
for err == nil && n > 0 {
err = Relay(feeder, drainer)
Expand Down
Loading

0 comments on commit e72b3bb

Please sign in to comment.