Skip to content

Commit

Permalink
Merge pull request #34 from PretendoNetwork/stability
Browse files Browse the repository at this point in the history
Stability - Better concurrent map API and deferred packet handling
  • Loading branch information
jonbarrow authored Sep 26, 2023
2 parents acd5855 + aa29221 commit 9d6e91c
Show file tree
Hide file tree
Showing 13 changed files with 608 additions and 167 deletions.
41 changes: 34 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,42 @@ type Client struct {
clientConnectionSignature []byte
sessionKey []byte
sequenceIDIn *Counter
sequenceIDOut *Counter
sequenceIDOutManager *SequenceIDManager
pid uint32
stationURLs []*StationURL
connectionID uint32
pingCheckTimer *time.Timer
pingKickTimer *time.Timer
connected bool
incomingPacketManager *PacketManager
outgoingResendManager *PacketResendManager
}

// Reset resets the Client to default values
func (client *Client) Reset() error {
server := client.Server()

client.sequenceIDIn = NewCounter(0)
client.sequenceIDOut = NewCounter(0)
client.sequenceIDOutManager = NewSequenceIDManager() // TODO - Pass the server into here to get data for multiple substreams and the unreliable starting ID
client.incomingPacketManager = NewPacketManager()

if client.outgoingResendManager != nil {
// * PacketResendManager makes use of time.Ticker structs.
// * These create new channels and goroutines which won't
// * close even if the objects are deleted. To free up
// * resources, time.Ticker MUST be stopped before reassigning
client.outgoingResendManager.Clear()
}

client.outgoingResendManager = NewPacketResendManager(server.resendTimeout, server.resendTimeoutIncrement, server.resendMaxIterations)

client.UpdateAccessKey(client.Server().AccessKey())
client.UpdateAccessKey(server.AccessKey())
err := client.UpdateRC4Key([]byte("CD&ML"))
if err != nil {
return fmt.Errorf("Failed to update client RC4 key. %s", err.Error())
}

if client.Server().PRUDPVersion() == 0 {
if server.PRUDPVersion() == 0 {
client.SetServerConnectionSignature(make([]byte, 4))
client.SetClientConnectionSignature(make([]byte, 4))
} else {
Expand Down Expand Up @@ -149,9 +164,9 @@ func (client *Client) ClientConnectionSignature() []byte {
return client.clientConnectionSignature
}

// SequenceIDCounterOut returns the clients packet SequenceID counter for out-going packets
func (client *Client) SequenceIDCounterOut() *Counter {
return client.sequenceIDOut
// SequenceIDOutManager returns the clients packet SequenceID manager for out-going packets
func (client *Client) SequenceIDOutManager() *SequenceIDManager {
return client.sequenceIDOutManager
}

// SequenceIDCounterIn returns the clients packet SequenceID counter for incoming packets
Expand Down Expand Up @@ -233,6 +248,18 @@ func (client *Client) StartTimeoutTimer() {
})
}

// StopTimeoutTimer stops the packet timeout timer
func (client *Client) StopTimeoutTimer() {
//Stop the kick timer
if client.pingKickTimer != nil {
client.pingKickTimer.Stop()
}
//and the check timer
if client.pingCheckTimer != nil {
client.pingCheckTimer.Stop()
}
}

// NewClient returns a new PRUDP client
func NewClient(address *net.UDPAddr, server *Server) *Client {
client := &Client{
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
require (
github.com/PretendoNetwork/plogger-go v1.0.4
github.com/superwhiskers/crunch/v3 v3.5.7
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/mod v0.12.0
)

Expand All @@ -13,6 +14,6 @@ require (
github.com/jwalton/go-supportscolor v1.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.11.0 // indirect
)
8 changes: 5 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/PretendoNetwork/plogger-go v1.0.4 h1:PF7xHw9eDRHH+RsAP9tmAE7fG0N0p6H4
github.com/PretendoNetwork/plogger-go v1.0.4/go.mod h1:7kD6M4vPq1JL4LTuPg6kuB1OvUBOwQOtAvTaUwMbwvU=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/jwalton/go-supportscolor v1.2.0 h1:g6Ha4u7Vm3LIsQ5wmeBpS4gazu0UP1DRDE8y6bre4H8=
github.com/jwalton/go-supportscolor v1.2.0/go.mod h1:hFVUAZV2cWg+WFFC4v8pT2X/S2qUUBYMioBD9AINXGs=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand All @@ -12,14 +12,16 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/superwhiskers/crunch/v3 v3.5.7 h1:N9RLxaR65C36i26BUIpzPXGy2f6pQ7wisu2bawbKNqg=
github.com/superwhiskers/crunch/v3 v3.5.7/go.mod h1:4ub2EKgF1MAhTjoOCTU4b9uLMsAweHEa89aRrfAypXA=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
76 changes: 76 additions & 0 deletions mutex_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package nex

import "sync"

// MutexMap implements a map type with go routine safe accessors through mutex locks. Embeds sync.RWMutex
type MutexMap[K comparable, V any] struct {
*sync.RWMutex
real map[K]V
}

// Set sets a key to a given value
func (m *MutexMap[K, V]) Set(key K, value V) {
m.Lock()
defer m.Unlock()

m.real[key] = value
}

// Get returns the given key value and a bool if found
func (m *MutexMap[K, V]) Get(key K) (V, bool) {
m.RLock()
defer m.RUnlock()

value, ok := m.real[key]

return value, ok
}

// Delete removes a key from the internal map
func (m *MutexMap[K, V]) Delete(key K) {
m.Lock()
defer m.Unlock()

delete(m.real, key)
}

// Size returns the length of the internal map
func (m *MutexMap[K, V]) Size() int {
m.RLock()
defer m.RUnlock()

return len(m.real)
}

// Each runs a callback function for every item in the map
// The map should not be modified inside the callback function
func (m *MutexMap[K, V]) Each(callback func(key K, value V)) {
m.RLock()
defer m.RUnlock()

for key, value := range m.real {
callback(key, value)
}
}

// Clear removes all items from the `real` map
// Accepts an optional callback function ran for every item before it is deleted
func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) {
m.Lock()
defer m.Unlock()

for key, value := range m.real {
if callback != nil {
callback(key, value)
}
delete(m.real, key)
}
}

// NewMutexMap returns a new instance of MutexMap with the provided key/value types
func NewMutexMap[K comparable, V any]() *MutexMap[K, V] {
return &MutexMap[K, V]{
RWMutex: &sync.RWMutex{},
real: make(map[K]V),
}
}
2 changes: 2 additions & 0 deletions packet_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nex

// PacketInterface implements all Packet methods
type PacketInterface interface {
Data() []byte
Sender() *Client
SetVersion(version uint8)
Version() uint8
Expand All @@ -28,6 +29,7 @@ type PacketInterface interface {
FragmentID() uint8
SetPayload(payload []byte)
Payload() []byte
DecryptPayload() error
RMCRequest() RMCRequest
Bytes() []byte
}
43 changes: 43 additions & 0 deletions packet_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package nex

// PacketManager implements an API for pushing/popping packets in the correct order
type PacketManager struct {
currentSequenceID *Counter
packets []PacketInterface
}

// Next gets the next packet in the sequence. Returns nil if the next packet has not been sent yet
func (p *PacketManager) Next() PacketInterface {
var packet PacketInterface

for i := 0; i < len(p.packets); i++ {
if p.currentSequenceID.Value() == uint32(p.packets[i].SequenceID()) {
packet = p.packets[i]
p.RemoveByIndex(i)
p.currentSequenceID.Increment()
break
}
}

return packet
}

// Push adds a packet to the pool to choose from in Next
func (p *PacketManager) Push(packet PacketInterface) {
p.packets = append(p.packets, packet)
}

// RemoveByIndex removes a packet from the pool using it's index in the slice
func (p *PacketManager) RemoveByIndex(i int) {
// * https://stackoverflow.com/a/37335777
p.packets[i] = p.packets[len(p.packets)-1]
p.packets = p.packets[:len(p.packets)-1]
}

// NewPacketManager returns a new PacketManager
func NewPacketManager() *PacketManager {
return &PacketManager{
currentSequenceID: NewCounter(0),
packets: make([]PacketInterface, 0),
}
}
114 changes: 114 additions & 0 deletions packet_resend_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package nex

import (
"time"
)

// PendingPacket represents a packet which the server has sent but not received an ACK for
// it handles it's own retransmission on a per-packet timer
type PendingPacket struct {
ticking bool
ticker *time.Ticker
quit chan struct{}
packet PacketInterface
iterations *Counter
timeout time.Duration
timeoutInc time.Duration
maxIterations int
}

// BeginTimeoutTimer starts the pending packets timeout timer until it is either stopped or maxIterations is hit
func (p *PendingPacket) BeginTimeoutTimer() {
go func() {
for {
select {
case <-p.quit:
return
case <-p.ticker.C:
client := p.packet.Sender()
server := client.Server()

if int(p.iterations.Increment()) > p.maxIterations {
// * Max iterations hit. Assume client is dead
server.TimeoutKick(client)
p.StopTimeoutTimer()
return
} else {
if p.timeoutInc != 0 {
p.timeout += p.timeoutInc
p.ticker.Reset(p.timeout)
}

// * Resend the packet
server.SendRaw(client.Address(), p.packet.Bytes())
}
}
}
}()
}

// StopTimeoutTimer stops the packet retransmission timer
func (p *PendingPacket) StopTimeoutTimer() {
if p.ticking {
close(p.quit)
p.ticker.Stop()
p.ticking = false
}
}

// NewPendingPacket returns a new PendingPacket
func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PendingPacket {
p := &PendingPacket{
ticking: true,
ticker: time.NewTicker(timeoutTime),
quit: make(chan struct{}),
packet: packet,
iterations: NewCounter(0),
timeout: timeoutTime,
timeoutInc: timeoutIncrement,
maxIterations: maxIterations,
}

return p
}

// PacketResendManager manages all the pending packets sent the client waiting to be ACKed
type PacketResendManager struct {
pending *MutexMap[uint16, *PendingPacket]
timeoutTime time.Duration
timeoutInc time.Duration
maxIterations int
}

// Add creates a PendingPacket, adds it to the pool, and begins it's timeout timer
func (p *PacketResendManager) Add(packet PacketInterface) {
cached := NewPendingPacket(packet, p.timeoutTime, p.timeoutInc, p.maxIterations)
p.pending.Set(packet.SequenceID(), cached)

cached.BeginTimeoutTimer()
}

// Remove removes a packet from pool and stops it's timer
func (p *PacketResendManager) Remove(sequenceID uint16) {
if cached, ok := p.pending.Get(sequenceID); ok {
cached.StopTimeoutTimer()
p.pending.Delete(sequenceID)
}
}

// Clear removes all packets from pool and stops their timers
func (p *PacketResendManager) Clear() {
p.pending.Clear(func(key uint16, value *PendingPacket) {
value.StopTimeoutTimer()
})
}

// NewPacketResendManager returns a new PacketResendManager
func NewPacketResendManager(timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PacketResendManager {
return &PacketResendManager{
pending: NewMutexMap[uint16, *PendingPacket](),
timeoutTime: timeoutTime,
timeoutInc: timeoutIncrement,
maxIterations: maxIterations,
}
}
Loading

0 comments on commit 9d6e91c

Please sign in to comment.