From e72b3bbc7328205291d117d74b9247a59278b418 Mon Sep 17 00:00:00 2001 From: Termina1 Date: Wed, 4 Sep 2024 18:16:17 +0300 Subject: [PATCH] add pings to sync --- .gitignore | 1 + chotki.go | 40 ++++++++---- chotki_test.go | 131 ++++++++++++++++++++++++++++++++++++++++ protocol/feeddrainer.go | 22 ++++++- sync.go | 106 +++++++++++++++++++++++++++++--- 5 files changed, 276 insertions(+), 24 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..314503f --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.chotki_cmd_log.txt diff --git a/chotki.go b/chotki.go index 5223de7..ab76bd4 100644 --- a/chotki.go +++ b/chotki.go @@ -60,6 +60,8 @@ type Options struct { MaxLogLen int64 RelaxedOrder bool Logger utils.Logger + PingPeriod time.Duration + PingWait time.Duration TlsConfig *tls.Config } @@ -69,6 +71,14 @@ func (o *Options) SetDefaults() { o.MaxLogLen = 1 << 23 } + if o.PingPeriod == 0 { + o.PingPeriod = 10 * time.Second + } + + if o.PingWait == 0 { + o.PingWait = 3 * time.Second + } + o.Merger = &pebble.Merger{ Name: "CRDT", Merge: func(key, value []byte) (pebble.ValueMerger, error) { @@ -159,12 +169,11 @@ func Open(dirname string, opts Options) (*Chotki, error) { } cho := Chotki{ - db: db, - src: opts.Src, - dir: absdir, - log: opts.Logger, - opts: opts, - + db: db, + src: opts.Src, + dir: absdir, + log: opts.Logger, + opts: opts, clock: &rdx.LocalLogicalClock{Source: opts.Src}, outq: xsync.NewMapOf[string, protocol.DrainCloser](), @@ -186,12 +195,14 @@ func Open(dirname string, opts Options) (*Chotki, error) { } return &Syncer{ - Src: cho.src, - Host: &cho, - Mode: SyncRWLive, - Name: name, - log: cho.log, - oqueue: queue, + Src: cho.src, + Host: &cho, + Mode: SyncRWLive, + PingPeriod: cho.opts.PingPeriod, + PingWait: cho.opts.PingWait, + Name: name, + log: cho.log, + oqueue: queue, } }, func(name string) { // destroy connection @@ -519,7 +530,10 @@ func (cho *Chotki) Drain(recs protocol.Records) (err error) { case 'B': // bye dear cho.syncs.Delete(id) - + case 'A': + cho.log.Info("received ping") + case 'Z': + cho.log.Info("received pong") default: return fmt.Errorf("unsupported packet type %c", lit) } diff --git a/chotki_test.go b/chotki_test.go index 050683a..436ab3b 100644 --- a/chotki_test.go +++ b/chotki_test.go @@ -1,12 +1,14 @@ package chotki import ( + "context" "errors" "fmt" "io" "log/slog" "os" "testing" + "time" "github.com/cockroachdb/pebble" "github.com/drpcorg/chotki/protocol" @@ -15,6 +17,18 @@ import ( "github.com/stretchr/testify/assert" ) +type FeedCloserTest struct{} + +func (t *FeedCloserTest) Feed() (recs protocol.Records, err error) { + return protocol.Records{}, nil +} + +func (t *FeedCloserTest) Close() error { + return nil +} + +var _ protocol.FeedCloser = (*FeedCloserTest)(nil) + func testdirs(origs ...uint64) ([]string, func()) { dirs := make([]string, len(origs)) @@ -120,6 +134,123 @@ func TestChotki_SyncEdit(t *testing.T) { _ = b.Close() } +func TestChotki_SyncLivePingsOk(t *testing.T) { + dirs, clear := testdirs(0xa, 0xb) + defer clear() + + a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A", Logger: utils.NewDefaultLogger(slog.LevelInfo)}) + assert.Nil(t, err) + + b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B", Logger: utils.NewDefaultLogger(slog.LevelInfo)}) + assert.Nil(t, err) + + synca := Syncer{ + Host: a, + PingPeriod: 100 * time.Millisecond, + PingWait: 100 * time.Millisecond, + Mode: SyncRWLive, Name: "a", + Src: a.src, + log: utils.NewDefaultLogger(slog.LevelDebug), + oqueue: &FeedCloserTest{}, + } + syncb := Syncer{ + Host: b, + PingPeriod: 100 * time.Second, + Mode: SyncRWLive, + PingWait: 3 * time.Second, + Name: "b", + Src: b.src, + log: utils.NewDefaultLogger(slog.LevelDebug), + oqueue: &FeedCloserTest{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + go protocol.PumpCtxCallback(ctx, &synca, &syncb, func() bool { + return synca.GetFeedState() != SendPing + }) + go protocol.PumpCtx(ctx, &syncb, &synca) + time.Sleep(time.Millisecond * 10) + assert.Equal(t, SendLive, synca.GetFeedState()) + assert.Equal(t, SendLive, syncb.GetFeedState()) + assert.Equal(t, SendDiff, synca.GetDrainState()) + assert.Equal(t, SendDiff, syncb.GetDrainState()) + + time.Sleep(time.Millisecond * 110) + assert.Equal(t, SendPing, synca.GetFeedState()) + go protocol.PumpCtx(ctx, &synca, &syncb) + time.Sleep(time.Millisecond * 90) + + assert.Equal(t, SendLive, synca.GetFeedState()) + assert.Equal(t, SendLive, syncb.GetFeedState()) + assert.Equal(t, SendLive, synca.GetDrainState()) + assert.Equal(t, SendLive, syncb.GetDrainState()) + cancel() + syncb.Close() + synca.Close() + _ = a.Close() + _ = b.Close() +} + +func TestChotki_SyncLivePingsFail(t *testing.T) { + dirs, clear := testdirs(0xa, 0xb) + defer clear() + + a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A", Logger: utils.NewDefaultLogger(slog.LevelInfo)}) + assert.Nil(t, err) + + b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B", Logger: utils.NewDefaultLogger(slog.LevelInfo)}) + assert.Nil(t, err) + + synca := Syncer{ + Host: a, + PingPeriod: 100 * time.Millisecond, + PingWait: 100 * time.Millisecond, + Mode: SyncRWLive, Name: "a", + Src: a.src, + log: utils.NewDefaultLogger(slog.LevelDebug), + oqueue: &FeedCloserTest{}, + } + syncb := Syncer{ + Host: b, + PingPeriod: 100 * time.Second, + Mode: SyncRWLive, + PingWait: 3 * time.Second, + Name: "b", + Src: b.src, + log: utils.NewDefaultLogger(slog.LevelDebug), + oqueue: &FeedCloserTest{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + go protocol.PumpCtxCallback(ctx, &synca, &syncb, func() bool { + return synca.GetFeedState() != SendPing + }) + go protocol.PumpCtxCallback(ctx, &syncb, &synca, func() bool { + return syncb.GetFeedState() != SendPong + }) + time.Sleep(time.Millisecond * 10) + assert.Equal(t, SendLive, synca.GetFeedState()) + assert.Equal(t, SendLive, syncb.GetFeedState()) + assert.Equal(t, SendDiff, synca.GetDrainState()) + assert.Equal(t, SendDiff, syncb.GetDrainState()) + + time.Sleep(time.Millisecond * 110) + assert.Equal(t, SendPing, synca.GetFeedState()) + go protocol.PumpCtx(ctx, &synca, &syncb) + time.Sleep(time.Millisecond * 200) + + assert.Equal(t, SendNone, synca.GetFeedState()) + assert.Equal(t, SendPong, syncb.GetFeedState()) + assert.Equal(t, SendDiff, synca.GetDrainState()) + assert.Equal(t, SendNone, syncb.GetDrainState()) + + cancel() + syncb.Close() + synca.Close() + _ = a.Close() + _ = b.Close() +} + func TestChotki_SyncGlobals(t *testing.T) { dirs, clear := testdirs(0xa, 0xb) defer clear() diff --git a/protocol/feeddrainer.go b/protocol/feeddrainer.go index 9eef7fe..ebaa34c 100644 --- a/protocol/feeddrainer.go +++ b/protocol/feeddrainer.go @@ -1,6 +1,9 @@ package protocol -import "io" +import ( + "context" + "io" +) type Feeder interface { // Feed reads and returns records. @@ -49,6 +52,23 @@ func Pump(feeder Feeder, drainer Drainer) (err error) { return } +func PumpCtx(ctx context.Context, feeder Feeder, drainer Drainer) (err error) { + for err == nil && ctx.Err() == nil { + err = Relay(feeder, drainer) + } + return +} + +func PumpCtxCallback(ctx context.Context, feeder Feeder, drainer Drainer, f func() bool) (err error) { + for err == nil && ctx.Err() == nil { + err = Relay(feeder, drainer) + if !f() { + return + } + } + return +} + func PumpN(feeder Feeder, drainer Drainer, n int) (err error) { for err == nil && n > 0 { err = Relay(feeder, drainer) diff --git a/sync.go b/sync.go index 7d39201..7bfd8bd 100644 --- a/sync.go +++ b/sync.go @@ -5,6 +5,7 @@ import ( "errors" "io" "sync" + "sync/atomic" "time" "github.com/cockroachdb/pebble" @@ -55,17 +56,30 @@ const ( SendLive SendEOF SendNone + SendPing + SendPong +) + +type PingState int + +const ( + Inactive PingState = iota + Ping + Pong + PingBroken ) func (s SyncState) String() string { - return []string{"SendHandshake", "SendDiff", "SendLive", "SendEOF", "SendNone"}[s] + return []string{"SendHandshake", "SendDiff", "SendLive", "SendEOF", "SendNone", "SendPing", "SendPong"}[s] } type Syncer struct { - Src uint64 - Name string - Host SyncHost - Mode SyncMode + Src uint64 + Name string + Host SyncHost + Mode SyncMode + PingPeriod time.Duration + PingWait time.Duration log utils.Logger vvit, ffit *pebble.Iterator @@ -79,8 +93,10 @@ type Syncer struct { vpack []byte reason error - lock sync.Mutex - cond sync.Cond + lock sync.Mutex + cond sync.Cond + pingTimer *time.Timer + pingStage atomic.Int32 } func (sync *Syncer) Close() error { @@ -118,9 +134,32 @@ func (sync *Syncer) Close() error { return nil } +func (sync *Syncer) GetFeedState() SyncState { + sync.lock.Lock() + defer sync.lock.Unlock() + return sync.feedState +} + +func (sync *Syncer) pingTransition() { + //nolint:exhaustive + switch PingState(sync.pingStage.Load()) { + case Ping: + sync.SetFeedState(SendPing) + case Pong: + sync.SetFeedState(SendPong) + case PingBroken: + sync.SetFeedState(SendEOF) + } +} + +func (sync *Syncer) GetDrainState() SyncState { + sync.lock.Lock() + defer sync.lock.Unlock() + return sync.drainState +} func (sync *Syncer) Feed() (recs protocol.Records, err error) { - switch sync.feedState { + switch sync.GetFeedState() { case SendHandshake: recs, err = sync.FeedHandshake() sync.SetFeedState(SendDiff) @@ -133,6 +172,7 @@ func (sync *Syncer) Feed() (recs protocol.Records, err error) { recs = append(recs, recs2...) if (sync.Mode & SyncLive) != 0 { sync.SetFeedState(SendLive) + sync.resetPingTimer() } else { sync.SetFeedState(SendEOF) } @@ -140,13 +180,30 @@ func (sync *Syncer) Feed() (recs protocol.Records, err error) { sync.snap = nil err = nil } - + case SendPing: + recs = protocol.Records{ + protocol.Record('A', rdx.Stlv("ping")), + } + sync.SetFeedState(SendLive) + sync.pingStage.Store(int32(Inactive)) + sync.pingTimer.Stop() + sync.pingTimer = time.AfterFunc(sync.PingWait, func() { + sync.pingStage.Store(int32(PingBroken)) + sync.log.Error("peer did not respond to ping", "name", sync.Name) + }) + case SendPong: + recs = protocol.Records{ + protocol.Record('Z', rdx.Stlv("pong")), + } + sync.pingStage.Store(int32(Inactive)) + sync.SetFeedState(SendLive) case SendLive: recs, err = sync.oqueue.Feed() if err == utils.ErrClosed { sync.SetFeedState(SendEOF) err = nil } + sync.pingTransition() case SendEOF: reason := []byte("closing") @@ -202,10 +259,13 @@ func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) { _, sync.vpack = protocol.OpenHeader(sync.vpack, 'V') // 5 sync.vpack = append(sync.vpack, protocol.Record('T', sync.snaplast.ZipBytes())...) + sync.lock.Lock() + mode := sync.Mode.Zip() + sync.lock.Unlock() // handshake: H(T{pro,src} M(mode) V(V{p,s}+)) hs := protocol.Record('H', protocol.TinyRecord('T', sync.snaplast.ZipBytes()), - protocol.TinyRecord('M', sync.Mode.Zip()), + protocol.TinyRecord('M', mode), protocol.Record('V', sync.vvit.Value()), ) @@ -314,6 +374,17 @@ func LastLit(recs protocol.Records) byte { return protocol.Lit(recs[len(recs)-1]) } +func (sync *Syncer) resetPingTimer() { + sync.lock.Lock() + defer sync.lock.Unlock() + if sync.pingTimer != nil { + sync.pingTimer.Stop() + } + sync.pingTimer = time.AfterFunc(sync.PingPeriod, func() { + sync.pingStage.Store(int32(Ping)) + }) +} + func (sync *Syncer) Drain(recs protocol.Records) (err error) { if len(recs) == 0 { return nil @@ -338,6 +409,8 @@ func (sync *Syncer) Drain(recs protocol.Records) (err error) { break } fallthrough + case SendPong, SendPing: + panic("chotki: unacceptable sync-state") case SendDiff: lit := LastLit(recs) @@ -347,17 +420,28 @@ func (sync *Syncer) Drain(recs protocol.Records) (err error) { } else { sync.SetDrainState(SendLive) } + if lit == 'A' { + sync.pingStage.Store(int32(Pong)) + } + } + if sync.Mode&SyncLive != 0 { + sync.resetPingTimer() } err = sync.Host.Drain(recs) if err == nil { sync.Host.Broadcast(recs, sync.Name) } + sync.pingTransition() case SendLive: + sync.resetPingTimer() lit := LastLit(recs) if lit == 'B' { sync.SetDrainState(SendNone) } + if lit == 'A' { + sync.SetFeedState(SendPong) + } err = sync.Host.Drain(recs) if err == nil { sync.Host.Broadcast(recs, sync.Name) @@ -384,6 +468,8 @@ func (sync *Syncer) DrainHandshake(recs protocol.Records) (err error) { } var mode SyncMode mode, sync.peervv, err = ParseHandshake(body) + sync.lock.Lock() sync.Mode &= mode + sync.lock.Unlock() return }