Skip to content

Commit

Permalink
wait drain state uses context and correctly canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Sep 5, 2024
1 parent 971c2de commit a508729
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion chotki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestChotki_SyncLivePingsOk(t *testing.T) {
synca := Syncer{
Host: a,
PingPeriod: 100 * time.Millisecond,
PingWait: 100 * time.Millisecond,
PingWait: 200 * time.Millisecond,
Mode: SyncRWLive, Name: "a",
Src: a.src,
log: utils.NewDefaultLogger(slog.LevelDebug),
Expand Down
21 changes: 16 additions & 5 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chotki

import (
"bytes"
"context"
"errors"
"io"
"sync"
Expand Down Expand Up @@ -165,13 +166,14 @@ func (sync *Syncer) Feed() (recs protocol.Records, err error) {
sync.SetFeedState(SendDiff)

case SendDiff:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
select {
case <-time.After(sync.PingWait):
sync.log.Error("handshake took too long", "name", sync.Name)
sync.SetFeedState(SendEOF)
return
case <-sync.WaitDrainState(SendDiff):
case <-sync.WaitDrainState(ctx, SendDiff):
}
recs, err = sync.FeedBlockDiff()
if err == io.EOF {
Expand Down Expand Up @@ -231,7 +233,7 @@ func (sync *Syncer) Feed() (recs protocol.Records, err error) {
timer := time.AfterFunc(time.Second, func() {
sync.SetDrainState(SendNone)
})
<-sync.WaitDrainState(SendNone)
<-sync.WaitDrainState(context.Background(), SendNone)
timer.Stop()
err = io.EOF
}
Expand Down Expand Up @@ -361,18 +363,27 @@ func (sync *Syncer) SetDrainState(state SyncState) {
sync.lock.Unlock()
}

func (sync *Syncer) WaitDrainState(state SyncState) chan SyncState {
func (sync *Syncer) WaitDrainState(ctx context.Context, state SyncState) chan SyncState {
res := make(chan SyncState)
go func() {
<-ctx.Done()
sync.cond.Broadcast()
}()
go func() {
defer close(res)
sync.lock.Lock()
defer sync.lock.Unlock()
if sync.cond.L == nil {
sync.cond.L = &sync.lock
}
for sync.drainState < state {
if ctx.Err() != nil {
return
}
sync.cond.Wait()
}
ds := sync.drainState
sync.lock.Unlock()

res <- ds
}()
return res
Expand Down

0 comments on commit a508729

Please sign in to comment.