Skip to content

Commit

Permalink
update reconnect subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Apr 24, 2024
1 parent 7d1e3b3 commit 013c55f
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 237 deletions.
10 changes: 5 additions & 5 deletions toyqueue/drainfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "io"

// Records (a batch of) as a very universal primitive, especially
// for database/network op/packet processing. Batching allows
// for writev() and other performance optimizations. ALso, if
// for writev() and other performance optimizations. Also, if
// you have cryptography, blobs are way handier than structs.
// Records converts easily to net.Buffers.
type Records [][]byte
Expand Down Expand Up @@ -37,14 +37,14 @@ type Drainer interface {
Drain(recs Records) error
}

type DrainSeeker interface {
type DrainCloser interface {
Drainer
io.Seeker
io.Closer
}

type DrainCloser interface {
type DrainSeeker interface {
Drainer
io.Closer
io.Seeker
}

type DrainSeekCloser interface {
Expand Down
48 changes: 24 additions & 24 deletions toyqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,29 @@ func (recs Records) TotalLen() (total int64) {

type RecordQueue struct {
recs Records
lock sync.Mutex
cond sync.Cond
mu sync.Mutex
co sync.Cond
Limit int
}

var ErrWouldBlock = errors.New("the queue is over capacity")
var ErrClosed = errors.New("queue is closed")

func (q *RecordQueue) Drain(recs Records) error {
q.lock.Lock()
q.mu.Lock()
was0 := len(q.recs) == 0
if len(q.recs)+len(recs) > q.Limit {
q.lock.Unlock()
q.mu.Unlock()
if q.Limit == 0 {
return ErrClosed
}
return ErrWouldBlock
}
q.recs = append(q.recs, recs...)
if was0 && q.cond.L != nil {
q.cond.Broadcast()
if was0 && q.co.L != nil {
q.co.Broadcast()
}
q.lock.Unlock()
q.mu.Unlock()
return nil
}

Expand All @@ -72,28 +72,28 @@ func (q *RecordQueue) Close() error {
}

func (q *RecordQueue) Feed() (recs Records, err error) {
q.lock.Lock()
q.mu.Lock()
if len(q.recs) == 0 {
err = ErrWouldBlock
if q.Limit == 0 {
err = ErrClosed
}
q.lock.Unlock()
q.mu.Unlock()
return
}
wasfull := len(q.recs) >= q.Limit
recs = q.recs
q.recs = q.recs[len(q.recs):]
if wasfull && q.cond.L != nil {
q.cond.Broadcast()
if wasfull && q.co.L != nil {
q.co.Broadcast()
}
q.lock.Unlock()
q.mu.Unlock()
return
}

func (q *RecordQueue) Blocking() FeedDrainCloser {
if q.cond.L == nil {
q.cond.L = &q.lock
if q.co.L == nil {
q.co.L = &q.mu
}
return &blockingRecordQueue{q}
}
Expand All @@ -108,15 +108,15 @@ func (bq *blockingRecordQueue) Close() error {

func (bq *blockingRecordQueue) Drain(recs Records) error {
q := bq.queue
q.lock.Lock()
q.mu.Lock()
for len(recs) > 0 {
was0 := len(q.recs) == 0
for q.Limit <= len(q.recs) {
if q.Limit == 0 {
q.lock.Unlock()
q.mu.Unlock()
return ErrClosed
}
q.cond.Wait()
q.co.Wait()
}
qcap := q.Limit - len(q.recs)
if qcap > len(recs) {
Expand All @@ -125,29 +125,29 @@ func (bq *blockingRecordQueue) Drain(recs Records) error {
q.recs = append(q.recs, recs[:qcap]...)
recs = recs[qcap:]
if was0 {
q.cond.Broadcast()
q.co.Broadcast()
}
}
q.lock.Unlock()
q.mu.Unlock()
return nil
}

func (bq *blockingRecordQueue) Feed() (recs Records, err error) {
q := bq.queue
q.lock.Lock()
q.mu.Lock()
wasfull := len(q.recs) >= q.Limit
for len(q.recs) == 0 {
if q.Limit == 0 {
q.lock.Unlock()
q.mu.Unlock()
return nil, ErrClosed
}
q.cond.Wait()
q.co.Wait()
}
recs = q.recs
q.recs = q.recs[len(q.recs):]
if wasfull {
q.cond.Broadcast()
q.co.Broadcast()
}
q.lock.Unlock()
q.mu.Unlock()
return
}
134 changes: 45 additions & 89 deletions toytlv/peer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package toytlv

import (
"context"
"errors"
"io"
"log/slog"
"net"
"sync"
"sync/atomic"
"time"

Expand All @@ -30,134 +30,90 @@ const (
type Jack func(conn net.Conn) toyqueue.FeedDrainCloser

type Peer struct {
conn atomic.Pointer[net.Conn]
inout toyqueue.FeedDrainCloser
reconnect func() (net.Conn, error)

mu sync.Mutex
co sync.Cond

Protocol ConnType
KeepAlive bool
}

func (tcp *Peer) doRead() {
err := tcp.read()

if err != nil && err != ErrDisconnected {
_ = tcp.Close()

// TODO: error handling
slog.Error("couldn't read from conn", "err", err)
}
conn atomic.Pointer[net.Conn]
inout toyqueue.FeedDrainCloser
}

func (tcp *Peer) read() error {
var buf []byte
func (p *Peer) KeepRead(ctx context.Context) error {
var err error
var buf []byte
var recs toyqueue.Records

for {
conn := tcp.conn.Load()
select {
case <-ctx.Done():
break
default:
// continue
}

conn := p.conn.Load()
if conn == nil {
break
}

if buf, err = appendRead(buf, *conn, TYPICAL_MTU); err != nil {
if errors.Is(err, io.EOF) {
time.Sleep(time.Millisecond)
continue
}

return err
}

var recs toyqueue.Records
if recs, buf, err = Split(buf); err != nil {
recs, buf, err = Split(buf)
if err != nil {
return err
} else if len(recs) == 0 {
time.Sleep(time.Millisecond)
continue
}

if err = tcp.inout.Drain(recs); err != nil {
if err = p.inout.Drain(recs); err != nil {
return err
}
}

return nil
}

func (tcp *Peer) doWrite() {
var err error
var recs toyqueue.Records
for err == nil {
conn := tcp.conn.Load()
if conn == nil {
func (p *Peer) KeepWrite(ctx context.Context) error {
for {
select {
case <-ctx.Done():
break
default:
// continue
}

recs, err = tcp.inout.Feed()
b := net.Buffers(recs)
for len(b) > 0 && err == nil {
_, err = b.WriteTo(*conn)

conn := p.conn.Load()
if conn == nil {
break
}
}
if err != nil {
tcp.Close() // TODO err
}
}

func (tcp *Peer) Drain(recs toyqueue.Records) error {
return tcp.inout.Drain(recs)
}

func (tcp *Peer) Feed() (toyqueue.Records, error) {
return tcp.inout.Feed()
}

func (tcp *Peer) keepTalking() {
go tcp.doWrite()
go tcp.doRead()

talkBackoff, connBackoff := MIN_RETRY_PERIOD, MIN_RETRY_PERIOD

for tcp.reconnect == nil {
conntime := time.Now()

atLeast5min := conntime.Add(time.Minute * 5)
if atLeast5min.After(time.Now()) {
talkBackoff *= 2 // connected, tried to talk, failed => wait more
if talkBackoff > MAX_RETRY_PERIOD {
talkBackoff = MAX_RETRY_PERIOD
}
recs, err := p.inout.Feed()
if err != nil {
return err
}

for {
if conn := tcp.conn.Load(); conn == nil {
break
}

time.Sleep(connBackoff + talkBackoff)
conn, err := tcp.reconnect()
if err != nil {
connBackoff = connBackoff * 2
if connBackoff > MAX_RETRY_PERIOD/2 {
connBackoff = MAX_RETRY_PERIOD
}
} else {
tcp.conn.Store(&conn)
connBackoff = MIN_RETRY_PERIOD
b := net.Buffers(recs)
for len(b) > 0 && err == nil {
// TODO: consider the number of bytes written
if _, err = b.WriteTo(*conn); err != nil {
return err
}
}
}
}

func (tcp *Peer) Close() error {
tcp.mu.Lock()
defer tcp.mu.Unlock()
return nil
}

func (p *Peer) Close() error {
// TODO writer closes on complete | 1 sec expired
if conn := tcp.conn.Swap(nil); conn != nil {
if conn := p.conn.Swap(nil); conn != nil {
if err := (*conn).Close(); err != nil {
return err
}

tcp.co.Broadcast()
}

return nil
Expand Down
Loading

0 comments on commit 013c55f

Please sign in to comment.