From 1534de94ab6c931a7d4577c1b1dbd0bb9926651a Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Sun, 3 Sep 2023 16:34:07 -0400 Subject: [PATCH 01/34] update: added Remaining method to StreamIn --- stream_in.go | 87 +++++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/stream_in.go b/stream_in.go index 046bdafe..78569383 100644 --- a/stream_in.go +++ b/stream_in.go @@ -15,9 +15,14 @@ type StreamIn struct { Server *Server } +// Remaining reads a bool +func (stream *StreamIn) Remaining() int { + return len(stream.Bytes()[stream.ByteOffset():]) +} + // ReadBool reads a bool func (stream *StreamIn) ReadBool() (bool, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 1 { + if stream.Remaining() < 1 { return false, errors.New("Not enough data to read bool") } @@ -26,7 +31,7 @@ func (stream *StreamIn) ReadBool() (bool, error) { // ReadUInt8 reads a uint8 func (stream *StreamIn) ReadUInt8() (uint8, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 1 { + if stream.Remaining() < 1 { return 0, errors.New("Not enough data to read uint8") } @@ -35,7 +40,7 @@ func (stream *StreamIn) ReadUInt8() (uint8, error) { // ReadInt8 reads a uint8 func (stream *StreamIn) ReadInt8() (int8, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 1 { + if stream.Remaining() < 1 { return 0, errors.New("Not enough data to read int8") } @@ -44,7 +49,7 @@ func (stream *StreamIn) ReadInt8() (int8, error) { // ReadUInt16LE reads a Little-Endian encoded uint16 func (stream *StreamIn) ReadUInt16LE() (uint16, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 2 { + if stream.Remaining() < 2 { return 0, errors.New("Not enough data to read uint16") } @@ -53,7 +58,7 @@ func (stream *StreamIn) ReadUInt16LE() (uint16, error) { // ReadUInt16BE reads a Big-Endian encoded uint16 func (stream *StreamIn) ReadUInt16BE() (uint16, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 2 { + if stream.Remaining() < 2 { return 0, errors.New("Not enough data to read uint16") } @@ -62,7 +67,7 @@ func (stream *StreamIn) ReadUInt16BE() (uint16, error) { // ReadInt16LE reads a Little-Endian encoded int16 func (stream *StreamIn) ReadInt16LE() (int16, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 2 { + if stream.Remaining() < 2 { return 0, errors.New("Not enough data to read int16") } @@ -71,7 +76,7 @@ func (stream *StreamIn) ReadInt16LE() (int16, error) { // ReadInt16BE reads a Big-Endian encoded int16 func (stream *StreamIn) ReadInt16BE() (int16, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 2 { + if stream.Remaining() < 2 { return 0, errors.New("Not enough data to read int16") } @@ -80,7 +85,7 @@ func (stream *StreamIn) ReadInt16BE() (int16, error) { // ReadUInt32LE reads a Little-Endian encoded uint32 func (stream *StreamIn) ReadUInt32LE() (uint32, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 4 { + if stream.Remaining() < 4 { return 0, errors.New("Not enough data to read uint32") } @@ -89,7 +94,7 @@ func (stream *StreamIn) ReadUInt32LE() (uint32, error) { // ReadUInt32BE reads a Big-Endian encoded uint32 func (stream *StreamIn) ReadUInt32BE() (uint32, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 4 { + if stream.Remaining() < 4 { return 0, errors.New("Not enough data to read uint32") } @@ -98,7 +103,7 @@ func (stream *StreamIn) ReadUInt32BE() (uint32, error) { // ReadInt32LE reads a Little-Endian encoded int32 func (stream *StreamIn) ReadInt32LE() (int32, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 4 { + if stream.Remaining() < 4 { return 0, errors.New("Not enough data to read int32") } @@ -107,7 +112,7 @@ func (stream *StreamIn) ReadInt32LE() (int32, error) { // ReadInt32BE reads a Big-Endian encoded int32 func (stream *StreamIn) ReadInt32BE() (int32, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 4 { + if stream.Remaining() < 4 { return 0, errors.New("Not enough data to read int32") } @@ -116,7 +121,7 @@ func (stream *StreamIn) ReadInt32BE() (int32, error) { // ReadUInt64LE reads a Little-Endian encoded uint64 func (stream *StreamIn) ReadUInt64LE() (uint64, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 8 { + if stream.Remaining() < 8 { return 0, errors.New("Not enough data to read uint64") } @@ -125,7 +130,7 @@ func (stream *StreamIn) ReadUInt64LE() (uint64, error) { // ReadUInt64BE reads a Big-Endian encoded uint64 func (stream *StreamIn) ReadUInt64BE() (uint64, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 8 { + if stream.Remaining() < 8 { return 0, errors.New("Not enough data to read uint64") } @@ -134,7 +139,7 @@ func (stream *StreamIn) ReadUInt64BE() (uint64, error) { // ReadInt64LE reads a Little-Endian encoded int64 func (stream *StreamIn) ReadInt64LE() (int64, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 8 { + if stream.Remaining() < 8 { return 0, errors.New("Not enough data to read int64") } @@ -143,7 +148,7 @@ func (stream *StreamIn) ReadInt64LE() (int64, error) { // ReadInt64BE reads a Big-Endian encoded int64 func (stream *StreamIn) ReadInt64BE() (int64, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 8 { + if stream.Remaining() < 8 { return 0, errors.New("Not enough data to read int64") } @@ -152,7 +157,7 @@ func (stream *StreamIn) ReadInt64BE() (int64, error) { // ReadFloat32LE reads a Little-Endian encoded float32 func (stream *StreamIn) ReadFloat32LE() (float32, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 4 { + if stream.Remaining() < 4 { return 0, errors.New("Not enough data to read float32") } @@ -161,7 +166,7 @@ func (stream *StreamIn) ReadFloat32LE() (float32, error) { // ReadFloat32BE reads a Big-Endian encoded float32 func (stream *StreamIn) ReadFloat32BE() (float32, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 4 { + if stream.Remaining() < 4 { return 0, errors.New("Not enough data to read float32") } @@ -170,7 +175,7 @@ func (stream *StreamIn) ReadFloat32BE() (float32, error) { // ReadFloat64LE reads a Little-Endian encoded float64 func (stream *StreamIn) ReadFloat64LE() (float64, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 8 { + if stream.Remaining() < 8 { return 0, errors.New("Not enough data to read float64") } @@ -179,7 +184,7 @@ func (stream *StreamIn) ReadFloat64LE() (float64, error) { // ReadFloat64BE reads a Big-Endian encoded float64 func (stream *StreamIn) ReadFloat64BE() (float64, error) { - if len(stream.Bytes()[stream.ByteOffset():]) < 8 { + if stream.Remaining() < 8 { return 0, errors.New("Not enough data to read float64") } @@ -193,7 +198,7 @@ func (stream *StreamIn) ReadString() (string, error) { return "", fmt.Errorf("Failed to read NEX string length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length) { + if stream.Remaining() < int(length) { return "", errors.New("NEX string length longer than data size") } @@ -210,7 +215,7 @@ func (stream *StreamIn) ReadBuffer() ([]byte, error) { return []byte{}, fmt.Errorf("Failed to read NEX buffer length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length) { + if stream.Remaining() < int(length) { return []byte{}, errors.New("NEX buffer length longer than data size") } @@ -226,7 +231,7 @@ func (stream *StreamIn) ReadQBuffer() ([]byte, error) { return []byte{}, fmt.Errorf("Failed to read NEX qBuffer length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length) { + if stream.Remaining() < int(length) { return []byte{}, errors.New("NEX qBuffer length longer than data size") } @@ -257,7 +262,7 @@ func (stream *StreamIn) ReadStructure(structure StructureInterface) (StructureIn return nil, fmt.Errorf("Failed to read NEX Structure content length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(structureLength) { + if stream.Remaining() < int(structureLength) { return nil, errors.New("NEX Structure content length longer than data size") } @@ -373,7 +378,7 @@ func (stream *StreamIn) ReadListUInt8() ([]uint8, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length) { + if stream.Remaining() < int(length) { return nil, errors.New("NEX List length longer than data size") } @@ -398,7 +403,7 @@ func (stream *StreamIn) ReadListInt8() ([]int8, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length) { + if stream.Remaining() < int(length) { return nil, errors.New("NEX List length longer than data size") } @@ -423,7 +428,7 @@ func (stream *StreamIn) ReadListUInt16LE() ([]uint16, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*2) { + if stream.Remaining() < int(length*2) { return nil, errors.New("NEX List length longer than data size") } @@ -448,7 +453,7 @@ func (stream *StreamIn) ReadListUInt16BE() ([]uint16, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*2) { + if stream.Remaining() < int(length*2) { return nil, errors.New("NEX List length longer than data size") } @@ -473,7 +478,7 @@ func (stream *StreamIn) ReadListInt16LE() ([]int16, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*2) { + if stream.Remaining() < int(length*2) { return nil, errors.New("NEX List length longer than data size") } @@ -498,7 +503,7 @@ func (stream *StreamIn) ReadListInt16BE() ([]int16, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*2) { + if stream.Remaining() < int(length*2) { return nil, errors.New("NEX List length longer than data size") } @@ -523,7 +528,7 @@ func (stream *StreamIn) ReadListUInt32LE() ([]uint32, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -548,7 +553,7 @@ func (stream *StreamIn) ReadListUInt32BE() ([]uint32, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -573,7 +578,7 @@ func (stream *StreamIn) ReadListInt32LE() ([]int32, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -598,7 +603,7 @@ func (stream *StreamIn) ReadListInt32BE() ([]int32, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -623,7 +628,7 @@ func (stream *StreamIn) ReadListUInt64LE() ([]uint64, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*8) { + if stream.Remaining() < int(length*8) { return nil, errors.New("NEX List length longer than data size") } @@ -648,7 +653,7 @@ func (stream *StreamIn) ReadListUInt64BE() ([]uint64, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*8) { + if stream.Remaining() < int(length*8) { return nil, errors.New("NEX List length longer than data size") } @@ -673,7 +678,7 @@ func (stream *StreamIn) ReadListInt64LE() ([]int64, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*8) { + if stream.Remaining() < int(length*8) { return nil, errors.New("NEX List length longer than data size") } @@ -698,7 +703,7 @@ func (stream *StreamIn) ReadListInt64BE() ([]int64, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*8) { + if stream.Remaining() < int(length*8) { return nil, errors.New("NEX List length longer than data size") } @@ -723,7 +728,7 @@ func (stream *StreamIn) ReadListFloat32LE() ([]float32, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -748,7 +753,7 @@ func (stream *StreamIn) ReadListFloat32BE() ([]float32, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -773,7 +778,7 @@ func (stream *StreamIn) ReadListFloat64LE() ([]float64, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } @@ -798,7 +803,7 @@ func (stream *StreamIn) ReadListFloat64BE() ([]float64, error) { return nil, fmt.Errorf("Failed to read List length. %s", err.Error()) } - if len(stream.Bytes()[stream.ByteOffset():]) < int(length*4) { + if stream.Remaining() < int(length*4) { return nil, errors.New("NEX List length longer than data size") } From 9a8eb5c83f65bb2cd3907ad2be53924cdbafa3a4 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Sun, 3 Sep 2023 16:35:39 -0400 Subject: [PATCH 02/34] update: added ReadRemaining method to StreamIn --- stream_in.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/stream_in.go b/stream_in.go index 78569383..5ce718e9 100644 --- a/stream_in.go +++ b/stream_in.go @@ -20,6 +20,12 @@ func (stream *StreamIn) Remaining() int { return len(stream.Bytes()[stream.ByteOffset():]) } +// ReadRemaining reads a bool +func (stream *StreamIn) ReadRemaining() []byte { + // TODO - Should we do a bounds check here? Or just allow empty slices? + return stream.ReadBytesNext(int64(stream.Remaining())) +} + // ReadBool reads a bool func (stream *StreamIn) ReadBool() (bool, error) { if stream.Remaining() < 1 { From 23f3683781a6c7a305af5c1da43d32b7bc964e2d Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Sun, 3 Sep 2023 17:00:19 -0400 Subject: [PATCH 03/34] update: added experimental MutexMap type for safe map access --- mutex_map.go | 51 ++++++++++++++++++++++++++++++++++++++++ server.go | 66 +++++++++++++++++++++------------------------------- 2 files changed, 78 insertions(+), 39 deletions(-) create mode 100644 mutex_map.go diff --git a/mutex_map.go b/mutex_map.go new file mode 100644 index 00000000..8a36866a --- /dev/null +++ b/mutex_map.go @@ -0,0 +1,51 @@ +package nex + +import "sync" + +// MutexMap implements a map type with go routine safe accessors through mutex locks. Embeds sync.RWMutex +type MutexMap[K comparable, V any] struct { + *sync.RWMutex + real map[K]V +} + +// Set sets a key to a given value +func (m *MutexMap[K, V]) Set(key K, value V) { + m.Lock() + defer m.Unlock() + + m.real[key] = value +} + +// Get returns the given key value and a bool if found +func (m *MutexMap[K, V]) Get(key K) (V, bool) { + m.RLock() + defer m.RUnlock() + + value, ok := m.real[key] + + return value, ok +} + +// Delete removes a key from the internal map +func (m *MutexMap[K, V]) Delete(key K) { + m.Lock() + defer m.Unlock() + + delete(m.real, key) +} + +// Size returns the length of the internal map +func (m *MutexMap[K, V]) Size() int { + m.RLock() + defer m.RUnlock() + + return len(m.real) +} + +// NewMutexMap returns a new instance of MutexMap with the provided key/value types +func NewMutexMap[K comparable, V any]() *MutexMap[K, V] { + return &MutexMap[K, V]{ + RWMutex: &sync.RWMutex{}, + real: make(map[K]V), + } +} diff --git a/server.go b/server.go index 5a635b45..1060688e 100644 --- a/server.go +++ b/server.go @@ -15,15 +15,13 @@ import ( "net/http" "runtime" "strconv" - "sync" "time" ) // Server represents a PRUDP server type Server struct { socket *net.UDPConn - clients map[string]*Client - clientMutex *sync.RWMutex + clients *MutexMap[string, *Client] genericEventHandles map[string][]func(PacketInterface) prudpV0EventHandles map[string][]func(*PacketV0) prudpV1EventHandles map[string][]func(*PacketV1) @@ -105,16 +103,12 @@ func (server *Server) handleSocketMessage() error { discriminator := addr.String() - server.clientMutex.RLock() - client, ok := server.clients[discriminator] - server.clientMutex.RUnlock() + client, ok := server.clients.Get(discriminator) if !ok { client = NewClient(addr, server) - server.clientMutex.Lock() - server.clients[discriminator] = client - server.clientMutex.Unlock() + server.clients.Set(discriminator, client) } data := buffer[0:length] @@ -364,9 +358,7 @@ func (server *Server) Emit(event string, packet interface{}) { func (server *Server) ClientConnected(client *Client) bool { discriminator := client.Address().String() - server.clientMutex.RLock() - _, connected := server.clients[discriminator] - server.clientMutex.RUnlock() + _, connected := server.clients.Get(discriminator) return connected } @@ -400,9 +392,7 @@ func (server *Server) TimeoutKick(client *Client) { client.SetConnected(false) discriminator := client.Address().String() - server.clientMutex.Lock() - delete(server.clients, discriminator) - server.clientMutex.Unlock() + server.clients.Delete(discriminator) } // GracefulKick removes an active client from the server @@ -436,18 +426,17 @@ func (server *Server) GracefulKick(client *Client) { client.SetConnected(false) discriminator := client.Address().String() - server.clientMutex.Lock() - delete(server.clients, discriminator) - server.clientMutex.Unlock() + server.clients.Delete(discriminator) } // GracefulKickAll removes all clients from the server func (server *Server) GracefulKickAll() { // * https://stackoverflow.com/a/40456170 - server.clientMutex.RLock() - defer server.clientMutex.RUnlock() - for _, client := range server.clients { - server.clientMutex.RUnlock() + server.clients.RLock() + defer server.clients.RUnlock() + // TODO - MAKE A BETTER API FOR RANGING OVER THIS DATA INSIDE MutexMap! + for _, client := range server.clients.real { + server.clients.RUnlock() var packet PacketInterface var err error @@ -462,7 +451,7 @@ func (server *Server) GracefulKickAll() { if err != nil { // TODO - Should this return the error too? logger.Error(err.Error()) - server.clientMutex.RLock() + server.clients.RLock() continue } @@ -478,11 +467,9 @@ func (server *Server) GracefulKickAll() { client.SetConnected(false) discriminator := client.Address().String() - server.clientMutex.Lock() - delete(server.clients, discriminator) - server.clientMutex.Unlock() + server.clients.Delete(discriminator) - server.clientMutex.RLock() + server.clients.RLock() } } @@ -793,16 +780,17 @@ func (server *Server) ConnectionIDCounter() *Counter { // FindClientFromPID finds a client by their PID func (server *Server) FindClientFromPID(pid uint32) *Client { // * https://stackoverflow.com/a/40456170 - server.clientMutex.RLock() - for _, client := range server.clients { - server.clientMutex.RUnlock() + // TODO - MAKE A BETTER API FOR RANGING OVER THIS DATA INSIDE MutexMap! + server.clients.RLock() + for _, client := range server.clients.real { + server.clients.RUnlock() if client.pid == pid { return client } - server.clientMutex.RLock() + server.clients.RLock() } - server.clientMutex.RUnlock() + server.clients.RUnlock() return nil } @@ -810,16 +798,17 @@ func (server *Server) FindClientFromPID(pid uint32) *Client { // FindClientFromConnectionID finds a client by their Connection ID func (server *Server) FindClientFromConnectionID(rvcid uint32) *Client { // * https://stackoverflow.com/a/40456170 - server.clientMutex.RLock() - for _, client := range server.clients { - server.clientMutex.RUnlock() + // TODO - MAKE A BETTER API FOR RANGING OVER THIS DATA INSIDE MutexMap! + server.clients.RLock() + for _, client := range server.clients.real { + server.clients.RUnlock() if client.connectionID == rvcid { return client } - server.clientMutex.RLock() + server.clients.RLock() } - server.clientMutex.RUnlock() + server.clients.RUnlock() return nil } @@ -894,8 +883,7 @@ func NewServer() *Server { prudpV1EventHandles: make(map[string][]func(*PacketV1)), hppEventHandles: make(map[string][]func(*HPPPacket)), hppClientResponses: make(map[*Client](chan []byte)), - clients: make(map[string]*Client), - clientMutex: &sync.RWMutex{}, + clients: NewMutexMap[string, *Client](), prudpVersion: 1, fragmentSize: 1300, resendTimeout: 1.5, From c98e4817f117a21ab3d6d74e3111344f5b4c63c7 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Sun, 3 Sep 2023 18:27:14 -0400 Subject: [PATCH 04/34] feat: beginnings of deferred packet handling --- client.go | 3 +++ packet_interface.go | 1 + packet_manager.go | 43 +++++++++++++++++++++++++++++++++++++++++++ packet_v0.go | 32 +++++++++++++++++++------------- packet_v1.go | 33 +++++++++++++++++++-------------- rmc.go | 2 ++ server.go | 31 +++++++++++++++++++++++++++++++ 7 files changed, 118 insertions(+), 27 deletions(-) create mode 100644 packet_manager.go diff --git a/client.go b/client.go index 1bcfec31..e4126ed4 100644 --- a/client.go +++ b/client.go @@ -28,12 +28,14 @@ type Client struct { pingCheckTimer *time.Timer pingKickTimer *time.Timer connected bool + incomingPacketManager *PacketManager } // Reset resets the Client to default values func (client *Client) Reset() error { client.sequenceIDIn = NewCounter(0) client.sequenceIDOut = NewCounter(0) + client.incomingPacketManager = NewPacketManager() client.UpdateAccessKey(client.Server().AccessKey()) err := client.UpdateRC4Key([]byte("CD&ML")) @@ -155,6 +157,7 @@ func (client *Client) SequenceIDCounterOut() *Counter { } // SequenceIDCounterIn returns the clients packet SequenceID counter for incoming packets +// TODO - Rename this? This name kinda sucks now that it's being used for deferred packet handling func (client *Client) SequenceIDCounterIn() *Counter { return client.sequenceIDIn } diff --git a/packet_interface.go b/packet_interface.go index 1f522fe6..0d02f2fa 100644 --- a/packet_interface.go +++ b/packet_interface.go @@ -28,6 +28,7 @@ type PacketInterface interface { FragmentID() uint8 SetPayload(payload []byte) Payload() []byte + DecryptPayload() error RMCRequest() RMCRequest Bytes() []byte } diff --git a/packet_manager.go b/packet_manager.go new file mode 100644 index 00000000..ab49806c --- /dev/null +++ b/packet_manager.go @@ -0,0 +1,43 @@ +package nex + +// PacketManager implements an API for pushing/popping packets in the correct order +type PacketManager struct { + currentSequenceID *Counter + packets []PacketInterface +} + +// Next gets the next packet in the sequence. Returns nil if the next packet has not been sent yet +func (p *PacketManager) Next() PacketInterface { + var packet PacketInterface + + for i := 0; i < len(p.packets); i++ { + if p.currentSequenceID.Value() == uint32(p.packets[i].SequenceID()) { + packet = p.packets[i] + p.RemoveByIndex(i) + p.currentSequenceID.Increment() + break + } + } + + return packet +} + +// Push adds a packet to the pool to choose from in Next +func (p *PacketManager) Push(packet PacketInterface) { + p.packets = append(p.packets, packet) +} + +// RemoveByIndex removes a packet from the pool using it's index in the slice +func (p *PacketManager) RemoveByIndex(i int) { + // * https://stackoverflow.com/a/37335777 + p.packets[i] = p.packets[len(p.packets)-1] + p.packets = p.packets[:len(p.packets)-1] +} + +// NewPacketManager returns a new PacketManager +func NewPacketManager() *PacketManager { + return &PacketManager{ + currentSequenceID: NewCounter(0), + packets: make([]PacketInterface, 0), + } +} diff --git a/packet_v0.go b/packet_v0.go index 351b7fde..6a906746 100644 --- a/packet_v0.go +++ b/packet_v0.go @@ -117,19 +117,6 @@ func (packet *PacketV0) Decode() error { payloadCrypted := stream.ReadBytesNext(int64(payloadSize)) packet.SetPayload(payloadCrypted) - - if packet.Type() == DataPacket { - ciphered := make([]byte, payloadSize) - packet.Sender().Decipher().XORKeyStream(ciphered, payloadCrypted) - - request := NewRMCRequest() - err := request.FromBytes(ciphered) - if err != nil { - return errors.New("[PRUDPv0] Error parsing RMC request: " + err.Error()) - } - - packet.rmcRequest = request - } } if len(packet.Data()[stream.ByteOffset():]) < int(checksumSize) { @@ -154,6 +141,25 @@ func (packet *PacketV0) Decode() error { return nil } +// DecryptPayload decrypts the packets payload and sets the RMC request data +func (packet *PacketV0) DecryptPayload() error { + if packet.Type() == DataPacket && !packet.HasFlag(FlagAck) { + ciphered := make([]byte, len(packet.Payload())) + + packet.Sender().Decipher().XORKeyStream(ciphered, packet.Payload()) + + request := NewRMCRequest() + err := request.FromBytes(ciphered) + if err != nil { + return fmt.Errorf("Failed to read PRUDPv0 RMC request. %s", err.Error()) + } + + packet.rmcRequest = request + } + + return nil +} + // Bytes encodes the packet and returns a byte array func (packet *PacketV0) Bytes() []byte { if packet.Type() == DataPacket { diff --git a/packet_v1.go b/packet_v1.go index df099962..2de4843a 100644 --- a/packet_v1.go +++ b/packet_v1.go @@ -199,20 +199,6 @@ func (packet *PacketV1) Decode() error { payloadCrypted := stream.ReadBytesNext(int64(payloadSize)) packet.SetPayload(payloadCrypted) - - if packet.Type() == DataPacket && !packet.HasFlag(FlagMultiAck) { - ciphered := make([]byte, payloadSize) - - packet.Sender().Decipher().XORKeyStream(ciphered, payloadCrypted) - - request := NewRMCRequest() - err := request.FromBytes(ciphered) - if err != nil { - return fmt.Errorf("Failed to read PRUDPv1 RMC request. %s", err.Error()) - } - - packet.rmcRequest = request - } } calculatedSignature := packet.calculateSignature(packet.Data()[2:14], packet.Sender().ServerConnectionSignature(), options, packet.Payload()) @@ -224,6 +210,25 @@ func (packet *PacketV1) Decode() error { return nil } +// DecryptPayload decrypts the packets payload and sets the RMC request data +func (packet *PacketV1) DecryptPayload() error { + if packet.Type() == DataPacket && !packet.HasFlag(FlagMultiAck) { + ciphered := make([]byte, len(packet.Payload())) + + packet.Sender().Decipher().XORKeyStream(ciphered, packet.Payload()) + + request := NewRMCRequest() + err := request.FromBytes(ciphered) + if err != nil { + return fmt.Errorf("Failed to read PRUDPv1 RMC request. %s", err.Error()) + } + + packet.rmcRequest = request + } + + return nil +} + // Bytes encodes the packet and returns a byte array func (packet *PacketV1) Bytes() []byte { if packet.Type() == DataPacket { diff --git a/rmc.go b/rmc.go index 2d0ce7cd..fb752517 100644 --- a/rmc.go +++ b/rmc.go @@ -5,6 +5,8 @@ import ( "fmt" ) +// TODO - We should probably combine RMCRequest and RMCResponse in a single RMCMessage for simpler packet payload setting/reading that supports both request and response payloads + // RMCRequest represets a RMC request type RMCRequest struct { protocolID uint8 diff --git a/server.go b/server.go index 1060688e..2cca2152 100644 --- a/server.go +++ b/server.go @@ -129,6 +129,33 @@ func (server *Server) handleSocketMessage() error { client.IncreasePingTimeoutTime(server.PingTimeout()) + if packet.HasFlag(FlagAck) || packet.HasFlag(FlagMultiAck) { + // * Bail early + // TODO - Track this in a server->client packet manager, and do retransmission on our end + return nil + } + + // TODO - Make a better API in client to access incomingPacketManager? + client.incomingPacketManager.Push(packet) + + // TODO - Make this API smarter. Only track missing packets? + // TODO - Only loop to process out of order packets when correct next packet is found? + if next := client.incomingPacketManager.Next(); next != nil { + // TODO - Should we explicitly check for errors here and log instead of passing the error up? + return server.processPacket(next) + } + + return nil +} + +func (server *Server) processPacket(packet PacketInterface) error { + err := packet.DecryptPayload() + if err != nil { + return err + } + + client := packet.Sender() + if packet.HasFlag(FlagAck) || packet.HasFlag(FlagMultiAck) { return nil } @@ -160,6 +187,10 @@ func (server *Server) handleSocketMessage() error { client.SetConnected(true) client.StartTimeoutTimer() + // TODO - Don't make this part suck ass? + // * Manually incrementing because the original manager gets destroyed in the reset + // * but we need to still track the SYN packet was sent + client.incomingPacketManager.currentSequenceID.Increment() server.Emit("Syn", packet) case ConnectPacket: packet.Sender().SetClientConnectionSignature(packet.ConnectionSignature()) From 58359dce0c5105b7892f5e4074e1657f2a1d75b2 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Sun, 3 Sep 2023 18:51:30 -0400 Subject: [PATCH 05/34] chore: remove old TODO in client.go --- client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/client.go b/client.go index e4126ed4..afe258f8 100644 --- a/client.go +++ b/client.go @@ -157,7 +157,6 @@ func (client *Client) SequenceIDCounterOut() *Counter { } // SequenceIDCounterIn returns the clients packet SequenceID counter for incoming packets -// TODO - Rename this? This name kinda sucks now that it's being used for deferred packet handling func (client *Client) SequenceIDCounterIn() *Counter { return client.sequenceIDIn } From aac9380b1862ae9c56453489be5acee2a9be337e Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Sun, 3 Sep 2023 19:02:14 -0400 Subject: [PATCH 06/34] refactor: keep checking for packets in PacketManager pool using Next loop --- server.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server.go b/server.go index 2cca2152..99703844 100644 --- a/server.go +++ b/server.go @@ -138,11 +138,19 @@ func (server *Server) handleSocketMessage() error { // TODO - Make a better API in client to access incomingPacketManager? client.incomingPacketManager.Push(packet) - // TODO - Make this API smarter. Only track missing packets? - // TODO - Only loop to process out of order packets when correct next packet is found? - if next := client.incomingPacketManager.Next(); next != nil { - // TODO - Should we explicitly check for errors here and log instead of passing the error up? - return server.processPacket(next) + // TODO - Make this API smarter. Only track missing packets and not all packets? + // * Keep processing packets so long as the next one is in the pool, + // * this way if several packets came in out of order they all get + // * processed at once the moment the correct next packet comes in + for next := client.incomingPacketManager.Next(); next != nil; { + err := server.processPacket(next) + if err != nil { + // TODO - Should this return the error too? + logger.Error(err.Error()) + return nil + } + + next = client.incomingPacketManager.Next() } return nil From 810ab36c03d21e7729a33e93ffe65b12a1b13bc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20L=C3=B3pez=20Guimaraes?= Date: Tue, 5 Sep 2023 20:51:10 +0100 Subject: [PATCH 07/34] Stop timeout when receiving disconnect packet Don't send the timeout packet when the client disconnects reliably. --- client.go | 12 ++++++++++++ server.go | 1 + 2 files changed, 13 insertions(+) diff --git a/client.go b/client.go index afe258f8..79fd7147 100644 --- a/client.go +++ b/client.go @@ -235,6 +235,18 @@ func (client *Client) StartTimeoutTimer() { }) } +// StopTimeoutTimer stops the packet timeout timer +func (client *Client) StopTimeoutTimer() { + //Stop the kick timer + if client.pingKickTimer != nil { + client.pingKickTimer.Stop() + } + //and the check timer + if client.pingCheckTimer != nil { + client.pingCheckTimer.Stop() + } +} + // NewClient returns a new PRUDP client func NewClient(address *net.UDPAddr, server *Server) *Client { client := &Client{ diff --git a/server.go b/server.go index 99703844..25475b15 100644 --- a/server.go +++ b/server.go @@ -208,6 +208,7 @@ func (server *Server) processPacket(packet PacketInterface) error { server.Emit("Data", packet) case DisconnectPacket: server.Emit("Disconnect", packet) + client.StopTimeoutTimer() server.GracefulKick(client) case PingPacket: //server.SendPing(client) From cb09b0045f7890edd02186bd3ac334de7d7195d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20L=C3=B3pez=20Guimaraes?= Date: Tue, 5 Sep 2023 20:56:57 +0100 Subject: [PATCH 08/34] Stop timeout on GracefulKick --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 25475b15..42dd964a 100644 --- a/server.go +++ b/server.go @@ -208,7 +208,6 @@ func (server *Server) processPacket(packet PacketInterface) error { server.Emit("Data", packet) case DisconnectPacket: server.Emit("Disconnect", packet) - client.StopTimeoutTimer() server.GracefulKick(client) case PingPacket: //server.SendPing(client) @@ -464,6 +463,7 @@ func (server *Server) GracefulKick(client *Client) { server.Emit("Kick", packet) client.SetConnected(false) + client.StopTimeoutTimer() discriminator := client.Address().String() server.clients.Delete(discriminator) From 8e627d08019a533643cbe7edff0b1e80ba6084a8 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Mon, 11 Sep 2023 13:35:41 -0400 Subject: [PATCH 09/34] Added SequenceIDManager and separate counters for reliable and PING packets --- client.go | 10 +++++----- sequence_id_manager.go | 29 +++++++++++++++++++++++++++++ server.go | 2 +- 3 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 sequence_id_manager.go diff --git a/client.go b/client.go index 79fd7147..48cee9e6 100644 --- a/client.go +++ b/client.go @@ -21,7 +21,7 @@ type Client struct { clientConnectionSignature []byte sessionKey []byte sequenceIDIn *Counter - sequenceIDOut *Counter + sequenceIDOutManager *SequenceIDManager pid uint32 stationURLs []*StationURL connectionID uint32 @@ -34,7 +34,7 @@ type Client struct { // Reset resets the Client to default values func (client *Client) Reset() error { client.sequenceIDIn = NewCounter(0) - client.sequenceIDOut = NewCounter(0) + client.sequenceIDOutManager = NewSequenceIDManager() // TODO - Pass the server into here to get data for multiple substreams and the unreliable starting ID client.incomingPacketManager = NewPacketManager() client.UpdateAccessKey(client.Server().AccessKey()) @@ -151,9 +151,9 @@ func (client *Client) ClientConnectionSignature() []byte { return client.clientConnectionSignature } -// SequenceIDCounterOut returns the clients packet SequenceID counter for out-going packets -func (client *Client) SequenceIDCounterOut() *Counter { - return client.sequenceIDOut +// SequenceIDOutManager returns the clients packet SequenceID manager for out-going packets +func (client *Client) SequenceIDOutManager() *SequenceIDManager { + return client.sequenceIDOutManager } // SequenceIDCounterIn returns the clients packet SequenceID counter for incoming packets diff --git a/sequence_id_manager.go b/sequence_id_manager.go new file mode 100644 index 00000000..ca80bde6 --- /dev/null +++ b/sequence_id_manager.go @@ -0,0 +1,29 @@ +package nex + +// SequenceIDManager implements an API for managing the sequence IDs of different packet streams on a client +type SequenceIDManager struct { + reliableCounter *Counter // TODO - NEX only uses one reliable stream, but Rendezvous supports many. This needs to be a slice! + pingCounter *Counter + // TODO - Unreliable packets for Rendezvous +} + +// Next gets the next sequence ID for the packet. Returns 0 for an unsupported packet +func (s *SequenceIDManager) Next(packet PacketInterface) uint32 { + if packet.HasFlag(FlagReliable) { + return s.reliableCounter.Increment() + } + + if packet.Type() == PingPacket { + return s.pingCounter.Increment() + } + + return 0 +} + +// NewSequenceIDManager returns a new SequenceIDManager +func NewSequenceIDManager() *SequenceIDManager { + return &SequenceIDManager{ + reliableCounter: NewCounter(0), + pingCounter: NewCounter(0), + } +} diff --git a/server.go b/server.go index 42dd964a..aeba9197 100644 --- a/server.go +++ b/server.go @@ -899,7 +899,7 @@ func (server *Server) SendFragment(packet PacketInterface, fragmentID uint8) { packet.SetFragmentID(fragmentID) packet.SetPayload(data) - packet.SetSequenceID(uint16(client.SequenceIDCounterOut().Increment())) + packet.SetSequenceID(uint16(client.SequenceIDOutManager().Next(packet))) encodedPacket := packet.Bytes() From 340a52d97ea74b623056a9ff9af7757929234c27 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Tue, 12 Sep 2023 14:17:47 +0200 Subject: [PATCH 10/34] Add emulated packet drop option --- server.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/server.go b/server.go index 5a635b45..1be807b1 100644 --- a/server.go +++ b/server.go @@ -11,6 +11,7 @@ package nex import ( "crypto/rand" "fmt" + mrand "math/rand" "net" "net/http" "runtime" @@ -50,6 +51,8 @@ type Server struct { messagingProtocolVersion *NEXVersion utilityProtocolVersion *NEXVersion natTraversalProtocolVersion *NEXVersion + emuSendPacketDropPercent int + emuRecvPacketDropPercent int } // Listen starts a NEX server on a given address @@ -103,6 +106,11 @@ func (server *Server) handleSocketMessage() error { return err } + if server.shouldDropPacket(true) { + // Emulate packet drop for debugging + return nil + } + discriminator := addr.String() server.clientMutex.RLock() @@ -879,6 +887,10 @@ func (server *Server) SendFragment(packet PacketInterface, fragmentID uint8) { // SendRaw writes raw packet data to the client socket func (server *Server) SendRaw(conn *net.UDPAddr, data []byte) { + if server.shouldDropPacket(false) { + // Emulate packet drop for debugging + return + } _, err := server.Socket().WriteToUDP(data, conn) if err != nil { // TODO - Should this return the error too? @@ -886,6 +898,23 @@ func (server *Server) SendRaw(conn *net.UDPAddr, data []byte) { } } +func (server *Server) shouldDropPacket(isRecv bool) bool { + if isRecv { + return server.emuRecvPacketDropPercent != 0 && mrand.Intn(100) < server.emuRecvPacketDropPercent + } else { + return server.emuSendPacketDropPercent != 0 && mrand.Intn(100) < server.emuSendPacketDropPercent + } +} + +// SetEmulatedPacketDropPercent sets the percentage of emulated sent and received dropped packets, set to 0 to disable. +func (server *Server) SetEmulatedPacketDropPercent(forRecv bool, percent int) { + if forRecv { + server.emuRecvPacketDropPercent = percent + } else { + server.emuSendPacketDropPercent = percent + } +} + // NewServer returns a new NEX server func NewServer() *Server { server := &Server{ @@ -903,6 +932,8 @@ func NewServer() *Server { kerberosKeySize: 32, kerberosKeyDerivation: 0, connectionIDCounter: NewCounter(10), + emuSendPacketDropPercent: 0, + emuRecvPacketDropPercent: 0, } server.SetDefaultNEXVersion(NewNEXVersion(0, 0, 0)) From 760eedd9149a953df7211ce65531a383edaf7e41 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Tue, 12 Sep 2023 17:37:41 +0200 Subject: [PATCH 11/34] Update server.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniel López Guimaraes <112760654+DaniElectra@users.noreply.github.com> --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 1be807b1..7ea1edac 100644 --- a/server.go +++ b/server.go @@ -906,7 +906,7 @@ func (server *Server) shouldDropPacket(isRecv bool) bool { } } -// SetEmulatedPacketDropPercent sets the percentage of emulated sent and received dropped packets, set to 0 to disable. +// SetEmulatedPacketDropPercent sets the percentage of emulated sent and received dropped packets func (server *Server) SetEmulatedPacketDropPercent(forRecv bool, percent int) { if forRecv { server.emuRecvPacketDropPercent = percent From 787a9dab071a588dae841339b7b20eb645fee42f Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Tue, 12 Sep 2023 17:37:58 +0200 Subject: [PATCH 12/34] Update server.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniel López Guimaraes <112760654+DaniElectra@users.noreply.github.com> --- server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server.go b/server.go index 7ea1edac..10b7244c 100644 --- a/server.go +++ b/server.go @@ -891,6 +891,7 @@ func (server *Server) SendRaw(conn *net.UDPAddr, data []byte) { // Emulate packet drop for debugging return } + _, err := server.Socket().WriteToUDP(data, conn) if err != nil { // TODO - Should this return the error too? From 6be87c5a98169e454ef6ed2dcd1ac14a97d4b9f0 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 15:47:39 -0400 Subject: [PATCH 13/34] Added server->client packet retransmission --- client.go | 17 ++++- go.mod | 3 +- go.sum | 8 ++- mutex_map.go | 11 ++++ packet_interface.go | 1 + packet_resend_manager.go | 99 ++++++++++++++++++++++++++++ packet_v0.go | 42 ++++++------ packet_v1.go | 36 +++++------ server.go | 136 +++++++++++++++++++++++++++++++++------ 9 files changed, 290 insertions(+), 63 deletions(-) create mode 100644 packet_resend_manager.go diff --git a/client.go b/client.go index 48cee9e6..4590b9db 100644 --- a/client.go +++ b/client.go @@ -29,21 +29,34 @@ type Client struct { pingKickTimer *time.Timer connected bool incomingPacketManager *PacketManager + outgoingResendManager *PacketResendManager } // Reset resets the Client to default values func (client *Client) Reset() error { + server := client.Server() + client.sequenceIDIn = NewCounter(0) client.sequenceIDOutManager = NewSequenceIDManager() // TODO - Pass the server into here to get data for multiple substreams and the unreliable starting ID client.incomingPacketManager = NewPacketManager() - client.UpdateAccessKey(client.Server().AccessKey()) + if client.outgoingResendManager != nil { + // * PacketResendManager makes use of time.Ticker structs. + // * These create new channels and goroutines which won't + // * close even if the objects are deleted. To free up + // * resources, time.Ticker MUST be stopped before reassigning + client.outgoingResendManager.Clear() + } + + client.outgoingResendManager = NewPacketResendManager(server.resendTimeout, server.resendMaxIterations) + + client.UpdateAccessKey(server.AccessKey()) err := client.UpdateRC4Key([]byte("CD&ML")) if err != nil { return fmt.Errorf("Failed to update client RC4 key. %s", err.Error()) } - if client.Server().PRUDPVersion() == 0 { + if server.PRUDPVersion() == 0 { client.SetServerConnectionSignature(make([]byte, 4)) client.SetClientConnectionSignature(make([]byte, 4)) } else { diff --git a/go.mod b/go.mod index 46b78fb2..911601da 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( github.com/PretendoNetwork/plogger-go v1.0.4 github.com/superwhiskers/crunch/v3 v3.5.7 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/mod v0.12.0 ) @@ -13,6 +14,6 @@ require ( github.com/jwalton/go-supportscolor v1.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect - golang.org/x/sys v0.11.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/term v0.11.0 // indirect ) diff --git a/go.sum b/go.sum index 2c9138aa..ef5e34c6 100644 --- a/go.sum +++ b/go.sum @@ -2,7 +2,7 @@ github.com/PretendoNetwork/plogger-go v1.0.4 h1:PF7xHw9eDRHH+RsAP9tmAE7fG0N0p6H4 github.com/PretendoNetwork/plogger-go v1.0.4/go.mod h1:7kD6M4vPq1JL4LTuPg6kuB1OvUBOwQOtAvTaUwMbwvU= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/jwalton/go-supportscolor v1.2.0 h1:g6Ha4u7Vm3LIsQ5wmeBpS4gazu0UP1DRDE8y6bre4H8= github.com/jwalton/go-supportscolor v1.2.0/go.mod h1:hFVUAZV2cWg+WFFC4v8pT2X/S2qUUBYMioBD9AINXGs= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -12,14 +12,16 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/superwhiskers/crunch/v3 v3.5.7 h1:N9RLxaR65C36i26BUIpzPXGy2f6pQ7wisu2bawbKNqg= github.com/superwhiskers/crunch/v3 v3.5.7/go.mod h1:4ub2EKgF1MAhTjoOCTU4b9uLMsAweHEa89aRrfAypXA= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= diff --git a/mutex_map.go b/mutex_map.go index 8a36866a..5568c8e2 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -42,6 +42,17 @@ func (m *MutexMap[K, V]) Size() int { return len(m.real) } +// Each runs a function for every item in the map +// the function takes in the items key and value +func (m *MutexMap[K, V]) Each(callback func(key K, value V)) { + m.RLock() + defer m.RUnlock() + + for key, value := range m.real { + callback(key, value) + } +} + // NewMutexMap returns a new instance of MutexMap with the provided key/value types func NewMutexMap[K comparable, V any]() *MutexMap[K, V] { return &MutexMap[K, V]{ diff --git a/packet_interface.go b/packet_interface.go index 0d02f2fa..71aaf5b9 100644 --- a/packet_interface.go +++ b/packet_interface.go @@ -2,6 +2,7 @@ package nex // PacketInterface implements all Packet methods type PacketInterface interface { + Data() []byte Sender() *Client SetVersion(version uint8) Version() uint8 diff --git a/packet_resend_manager.go b/packet_resend_manager.go new file mode 100644 index 00000000..b4d3d407 --- /dev/null +++ b/packet_resend_manager.go @@ -0,0 +1,99 @@ +package nex + +import ( + "time" +) + +// PendingPacket represents a packet which the server has sent but not received an ACK for +// it handles it's own retransmission on a per-packet timer +type PendingPacket struct { + ticker *time.Ticker + quit chan struct{} + packet PacketInterface + iterations *Counter + maxIterations int +} + +// BeginTimeoutTimer starts the pending packets timeout timer until it is either stopped or maxIterations is hit +func (p *PendingPacket) BeginTimeoutTimer() { + go func() { + for { + select { + case <-p.quit: + //fmt.Println("Stopped") + return + case <-p.ticker.C: + client := p.packet.Sender() + server := client.Server() + + if int(p.iterations.Increment()) > p.maxIterations { + // * Max iterations hit. Assume client is dead + server.TimeoutKick(client) + p.StopTimeoutTimer() + return + } else { + // * Resend the packet + server.SendRaw(client.Address(), p.packet.Bytes()) + } + } + } + }() +} + +// StopTimeoutTimer stops the packet retransmission timer +func (p *PendingPacket) StopTimeoutTimer() { + close(p.quit) + p.ticker.Stop() +} + +// NewPendingPacket returns a new PendingPacket +func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, maxIterations int) *PendingPacket { + p := &PendingPacket{ + ticker: time.NewTicker(timeoutTime), + quit: make(chan struct{}), + packet: packet, + iterations: NewCounter(0), + maxIterations: maxIterations, + } + + return p +} + +// PacketResendManager manages all the pending packets sent the client waiting to be ACKed +type PacketResendManager struct { + pending *MutexMap[uint16, *PendingPacket] + timeoutTime time.Duration + maxIterations int +} + +// Add creates a PendingPacket, adds it to the pool, and begins it's timeout timer +func (p *PacketResendManager) Add(packet PacketInterface) { + cached := NewPendingPacket(packet, p.timeoutTime, p.maxIterations) + p.pending.Set(packet.SequenceID(), cached) + + cached.BeginTimeoutTimer() +} + +// Remove removes a packet from pool and stops it's timer +func (p *PacketResendManager) Remove(sequenceID uint16) { + if cached, ok := p.pending.Get(sequenceID); ok { + cached.StopTimeoutTimer() + p.pending.Delete(sequenceID) + } +} + +// Clear removes all packets from pool and stops their timers +func (p *PacketResendManager) Clear() { + p.pending.Each(func(key uint16, value *PendingPacket) { + p.Remove(value.packet.SequenceID()) + }) +} + +// NewPacketResendManager returns a new PacketResendManager +func NewPacketResendManager(timeoutTime time.Duration, maxIterations int) *PacketResendManager { + return &PacketResendManager{ + pending: NewMutexMap[uint16, *PendingPacket](), + timeoutTime: timeoutTime, + maxIterations: maxIterations, + } +} diff --git a/packet_v0.go b/packet_v0.go index 6a906746..b19eff37 100644 --- a/packet_v0.go +++ b/packet_v0.go @@ -162,27 +162,27 @@ func (packet *PacketV0) DecryptPayload() error { // Bytes encodes the packet and returns a byte array func (packet *PacketV0) Bytes() []byte { - if packet.Type() == DataPacket { - - if packet.HasFlag(FlagAck) { - packet.SetPayload([]byte{}) - } else { - payload := packet.Payload() - - if payload != nil || len(payload) > 0 { - payloadSize := len(payload) - - encrypted := make([]byte, payloadSize) - packet.Sender().Cipher().XORKeyStream(encrypted, payload) - - packet.SetPayload(encrypted) - } - } - - if !packet.HasFlag(FlagHasSize) { - packet.AddFlag(FlagHasSize) - } - } + //if packet.Type() == DataPacket { + // + // if packet.HasFlag(FlagAck) { + // packet.SetPayload([]byte{}) + // } else { + // payload := packet.Payload() + // + // if payload != nil || len(payload) > 0 { + // payloadSize := len(payload) + // + // encrypted := make([]byte, payloadSize) + // packet.Sender().Cipher().XORKeyStream(encrypted, payload) + // + // packet.SetPayload(encrypted) + // } + // } + // + // if !packet.HasFlag(FlagHasSize) { + // packet.AddFlag(FlagHasSize) + // } + //} var typeFlags uint16 = packet.Type() | packet.Flags()<<4 diff --git a/packet_v1.go b/packet_v1.go index 2de4843a..77308284 100644 --- a/packet_v1.go +++ b/packet_v1.go @@ -231,24 +231,24 @@ func (packet *PacketV1) DecryptPayload() error { // Bytes encodes the packet and returns a byte array func (packet *PacketV1) Bytes() []byte { - if packet.Type() == DataPacket { - if !packet.HasFlag(FlagMultiAck) { - payload := packet.Payload() - - if payload != nil || len(payload) > 0 { - payloadSize := len(payload) - - encrypted := make([]byte, payloadSize) - packet.Sender().Cipher().XORKeyStream(encrypted, payload) - - packet.SetPayload(encrypted) - } - } - - if !packet.HasFlag(FlagHasSize) { - packet.AddFlag(FlagHasSize) - } - } + //if packet.Type() == DataPacket { + // if !packet.HasFlag(FlagMultiAck) { + // payload := packet.Payload() + // + // if payload != nil || len(payload) > 0 { + // payloadSize := len(payload) + // + // encrypted := make([]byte, payloadSize) + // packet.Sender().Cipher().XORKeyStream(encrypted, payload) + // + // packet.SetPayload(encrypted) + // } + // } + // + // if !packet.HasFlag(FlagHasSize) { + // packet.AddFlag(FlagHasSize) + // } + //} var typeFlags uint16 = packet.Type() | packet.Flags()<<4 diff --git a/server.go b/server.go index e313ec4d..2cad2984 100644 --- a/server.go +++ b/server.go @@ -17,6 +17,8 @@ import ( "runtime" "strconv" "time" + + "golang.org/x/exp/slices" ) // Server represents a PRUDP server @@ -29,12 +31,14 @@ type Server struct { hppEventHandles map[string][]func(*HPPPacket) hppClientResponses map[*Client](chan []byte) passwordFromPIDHandler func(pid uint32) (string, uint32) + useNewMultiACK bool accessKey string prudpVersion int prudpProtocolMinorVersion int supportedFunctions int fragmentSize int16 - resendTimeout float32 + resendTimeout time.Duration + resendMaxIterations int pingTimeout int kerberosPassword string kerberosKeySize int @@ -138,8 +142,8 @@ func (server *Server) handleSocketMessage() error { client.IncreasePingTimeoutTime(server.PingTimeout()) if packet.HasFlag(FlagAck) || packet.HasFlag(FlagMultiAck) { - // * Bail early - // TODO - Track this in a server->client packet manager, and do retransmission on our end + // TODO - Should this return an error? + server.handleAcknowledgement(packet) return nil } @@ -227,6 +231,51 @@ func (server *Server) processPacket(packet PacketInterface) error { return nil } +func (server *Server) handleAcknowledgement(packet PacketInterface) { + if packet.Version() == 0 { + // * PRUDPv0 does not have aggregate acknowledgement + packet.Sender().outgoingResendManager.Remove(packet.SequenceID()) + } else { + // TODO - Validate the aggregate packet is valid and can be processed + sequenceIDs := make([]uint16, 0) + stream := NewStreamIn(packet.Payload(), server) + var baseSequenceID uint16 + + // TODO - We should probably handle these errors lol + if server.useNewMultiACK { + _, _ = stream.ReadUInt8() // * Substream ID. NEX always uses 0 + additionalIDsCount, _ := stream.ReadUInt8() + baseSequenceID, _ = stream.ReadUInt16LE() + + for i := 0; i < int(additionalIDsCount); i++ { + additionalID, _ := stream.ReadUInt16LE() + sequenceIDs = append(sequenceIDs, additionalID) + } + } else { + baseSequenceID = packet.SequenceID() + + for remaining := stream.Remaining(); remaining != 0; { + additionalID, _ := stream.ReadUInt16LE() + sequenceIDs = append(sequenceIDs, additionalID) + remaining = stream.Remaining() + } + } + + // * MutexMap.Each locks the mutex, can't remove while reading + // * Have to just loop again + packet.Sender().outgoingResendManager.pending.Each(func(sequenceID uint16, pending *PendingPacket) { + if sequenceID <= baseSequenceID && !slices.Contains(sequenceIDs, sequenceID) { + sequenceIDs = append(sequenceIDs, sequenceID) + } + }) + + // * Actually remove the packets from the pool + for _, sequenceID := range sequenceIDs { + packet.Sender().outgoingResendManager.Remove(sequenceID) + } + } +} + // HPPListen starts a NEX HPP server on a given address func (server *Server) HPPListen(address string) { hppHandler := func(w http.ResponseWriter, req *http.Request) { @@ -439,6 +488,7 @@ func (server *Server) TimeoutKick(client *Client) { client.SetConnected(false) discriminator := client.Address().String() + client.outgoingResendManager.Clear() server.clients.Delete(discriminator) } @@ -474,6 +524,7 @@ func (server *Server) GracefulKick(client *Client) { client.StopTimeoutTimer() discriminator := client.Address().String() + client.outgoingResendManager.Clear() server.clients.Delete(discriminator) } @@ -515,6 +566,7 @@ func (server *Server) GracefulKickAll() { client.SetConnected(false) discriminator := client.Address().String() + client.outgoingResendManager.Clear() server.clients.Delete(discriminator) server.clients.RLock() @@ -648,6 +700,16 @@ func (server *Server) SetSocket(socket *net.UDPConn) { server.socket = socket } +// UseNewMultiACK checks if the server uses the new FLAG_MULTI_ACK encoding +func (server *Server) UseNewMultiACK() bool { + return server.useNewMultiACK +} + +// SetUseNewMultiACK sets whether the server uses the new FLAG_MULTI_ACK encoding +func (server *Server) SetUseNewMultiACK(useNewMultiACK bool) { + server.useNewMultiACK = useNewMultiACK +} + // PRUDPVersion returns the server PRUDP version func (server *Server) PRUDPVersion() int { return server.prudpVersion @@ -820,6 +882,16 @@ func (server *Server) SetFragmentSize(fragmentSize int16) { server.fragmentSize = fragmentSize } +// SetResendTimeout sets the time that a packet should wait before resending to the client +func (server *Server) SetResendTimeout(resendTimeout time.Duration) { + server.resendTimeout = resendTimeout +} + +// SetFragmentSize sets the max number of times a packet can try to resend before assuming the client is dead +func (server *Server) SetResendMaxIterations(resendMaxIterations int) { + server.resendMaxIterations = resendMaxIterations +} + // ConnectionIDCounter gets the server connection ID counter func (server *Server) ConnectionIDCounter() *Counter { return server.connectionIDCounter @@ -902,16 +974,42 @@ func (server *Server) Send(packet PacketInterface) { // SendFragment sends a packet fragment to the client func (server *Server) SendFragment(packet PacketInterface, fragmentID uint8) { - data := packet.Payload() client := packet.Sender() + payload := packet.Payload() + + if packet.Type() == DataPacket { + if packet.Version() == 0 && packet.HasFlag(FlagAck) { + // * v0 ACK payloads empty, ensure this + payload = []byte{} + } else if !packet.HasFlag(FlagMultiAck) { + if payload != nil || len(payload) > 0 { + payloadSize := len(payload) + + encrypted := make([]byte, payloadSize) + packet.Sender().Cipher().XORKeyStream(encrypted, payload) + + payload = encrypted + } + } + + // * Only add the HAS_SIZE flag if the payload exists + if !packet.HasFlag(FlagHasSize) && len(payload) > 0 { + packet.AddFlag(FlagHasSize) + } + } packet.SetFragmentID(fragmentID) - packet.SetPayload(data) + + packet.SetPayload(payload) packet.SetSequenceID(uint16(client.SequenceIDOutManager().Next(packet))) encodedPacket := packet.Bytes() server.SendRaw(client.Address(), encodedPacket) + + if (packet.HasFlag(FlagReliable) || packet.Type() == SynPacket) && packet.HasFlag(FlagNeedsAck) { + packet.Sender().outgoingResendManager.Add(packet) + } } // SendRaw writes raw packet data to the client socket @@ -948,19 +1046,21 @@ func (server *Server) SetEmulatedPacketDropPercent(forRecv bool, percent int) { // NewServer returns a new NEX server func NewServer() *Server { server := &Server{ - genericEventHandles: make(map[string][]func(PacketInterface)), - prudpV0EventHandles: make(map[string][]func(*PacketV0)), - prudpV1EventHandles: make(map[string][]func(*PacketV1)), - hppEventHandles: make(map[string][]func(*HPPPacket)), - hppClientResponses: make(map[*Client](chan []byte)), - clients: NewMutexMap[string, *Client](), - prudpVersion: 1, - fragmentSize: 1300, - resendTimeout: 1.5, - pingTimeout: 5, - kerberosKeySize: 32, - kerberosKeyDerivation: 0, - connectionIDCounter: NewCounter(10), + genericEventHandles: make(map[string][]func(PacketInterface)), + prudpV0EventHandles: make(map[string][]func(*PacketV0)), + prudpV1EventHandles: make(map[string][]func(*PacketV1)), + hppEventHandles: make(map[string][]func(*HPPPacket)), + hppClientResponses: make(map[*Client](chan []byte)), + clients: NewMutexMap[string, *Client](), + useNewMultiACK: false, + prudpVersion: 1, + fragmentSize: 1300, + resendTimeout: time.Second * 2, + resendMaxIterations: 5, + pingTimeout: 5, + kerberosKeySize: 32, + kerberosKeyDerivation: 0, + connectionIDCounter: NewCounter(10), emuSendPacketDropPercent: 0, emuRecvPacketDropPercent: 0, } From d345357f79e5abe1d47a2b48795359e03b9739aa Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 15:49:58 -0400 Subject: [PATCH 14/34] Removed comments from packet Bytes methods --- packet_v0.go | 22 ---------------------- packet_v1.go | 19 ------------------- 2 files changed, 41 deletions(-) diff --git a/packet_v0.go b/packet_v0.go index b19eff37..3161d459 100644 --- a/packet_v0.go +++ b/packet_v0.go @@ -162,28 +162,6 @@ func (packet *PacketV0) DecryptPayload() error { // Bytes encodes the packet and returns a byte array func (packet *PacketV0) Bytes() []byte { - //if packet.Type() == DataPacket { - // - // if packet.HasFlag(FlagAck) { - // packet.SetPayload([]byte{}) - // } else { - // payload := packet.Payload() - // - // if payload != nil || len(payload) > 0 { - // payloadSize := len(payload) - // - // encrypted := make([]byte, payloadSize) - // packet.Sender().Cipher().XORKeyStream(encrypted, payload) - // - // packet.SetPayload(encrypted) - // } - // } - // - // if !packet.HasFlag(FlagHasSize) { - // packet.AddFlag(FlagHasSize) - // } - //} - var typeFlags uint16 = packet.Type() | packet.Flags()<<4 stream := NewStreamOut(packet.Sender().Server()) diff --git a/packet_v1.go b/packet_v1.go index 77308284..4fd505b6 100644 --- a/packet_v1.go +++ b/packet_v1.go @@ -231,25 +231,6 @@ func (packet *PacketV1) DecryptPayload() error { // Bytes encodes the packet and returns a byte array func (packet *PacketV1) Bytes() []byte { - //if packet.Type() == DataPacket { - // if !packet.HasFlag(FlagMultiAck) { - // payload := packet.Payload() - // - // if payload != nil || len(payload) > 0 { - // payloadSize := len(payload) - // - // encrypted := make([]byte, payloadSize) - // packet.Sender().Cipher().XORKeyStream(encrypted, payload) - // - // packet.SetPayload(encrypted) - // } - // } - // - // if !packet.HasFlag(FlagHasSize) { - // packet.AddFlag(FlagHasSize) - // } - //} - var typeFlags uint16 = packet.Type() | packet.Flags()<<4 stream := NewStreamOut(packet.Sender().Server()) From 3797b40f54dc218efea46bb105b563ec70b2eed7 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 15:56:12 -0400 Subject: [PATCH 15/34] Updated SetResendMaxIterations Godoc comment --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 2cad2984..5c998c08 100644 --- a/server.go +++ b/server.go @@ -887,7 +887,7 @@ func (server *Server) SetResendTimeout(resendTimeout time.Duration) { server.resendTimeout = resendTimeout } -// SetFragmentSize sets the max number of times a packet can try to resend before assuming the client is dead +// SetResendMaxIterations sets the max number of times a packet can try to resend before assuming the client is dead func (server *Server) SetResendMaxIterations(resendMaxIterations int) { server.resendMaxIterations = resendMaxIterations } From a4757b43db65af7109c5e1b97c0a55dc18cdc5c2 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 16:38:17 -0400 Subject: [PATCH 16/34] Added debug logs to PacketResendManager --- packet_resend_manager.go | 4 ++++ server.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/packet_resend_manager.go b/packet_resend_manager.go index b4d3d407..56a17ba0 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -1,6 +1,7 @@ package nex import ( + "fmt" "time" ) @@ -28,11 +29,13 @@ func (p *PendingPacket) BeginTimeoutTimer() { if int(p.iterations.Increment()) > p.maxIterations { // * Max iterations hit. Assume client is dead + fmt.Println("Max iterations hit. Assume client is dead") server.TimeoutKick(client) p.StopTimeoutTimer() return } else { // * Resend the packet + fmt.Printf("Time out on port %d, resend packet sequenceID %d. Iteration %d\n", p.packet.Sender().Server().port, p.packet.SequenceID(), p.iterations.Value()) server.SendRaw(client.Address(), p.packet.Bytes()) } } @@ -42,6 +45,7 @@ func (p *PendingPacket) BeginTimeoutTimer() { // StopTimeoutTimer stops the packet retransmission timer func (p *PendingPacket) StopTimeoutTimer() { + fmt.Printf("Stopping timer, packet %d was ACKED on port %d\n", p.packet.SequenceID(), p.packet.Sender().Server().port) close(p.quit) p.ticker.Stop() } diff --git a/server.go b/server.go index 5c998c08..fb0b8e6f 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,7 @@ type Server struct { natTraversalProtocolVersion *NEXVersion emuSendPacketDropPercent int emuRecvPacketDropPercent int + port int } // Listen starts a NEX server on a given address @@ -66,6 +67,9 @@ func (server *Server) Listen(address string) { panic(err) } + // * For debugging the resend manager + server.port = udpAddress.Port + socket, err := net.ListenUDP(protocol, udpAddress) if err != nil { panic(err) From 1fc69bc8e30c07ec32c72786216055df571086af Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 17:03:22 -0400 Subject: [PATCH 17/34] Removed multi-ack version set. Check PRUDP version --- server.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/server.go b/server.go index fb0b8e6f..43af4979 100644 --- a/server.go +++ b/server.go @@ -31,7 +31,6 @@ type Server struct { hppEventHandles map[string][]func(*HPPPacket) hppClientResponses map[*Client](chan []byte) passwordFromPIDHandler func(pid uint32) (string, uint32) - useNewMultiACK bool accessKey string prudpVersion int prudpProtocolMinorVersion int @@ -237,7 +236,6 @@ func (server *Server) processPacket(packet PacketInterface) error { func (server *Server) handleAcknowledgement(packet PacketInterface) { if packet.Version() == 0 { - // * PRUDPv0 does not have aggregate acknowledgement packet.Sender().outgoingResendManager.Remove(packet.SequenceID()) } else { // TODO - Validate the aggregate packet is valid and can be processed @@ -246,7 +244,7 @@ func (server *Server) handleAcknowledgement(packet PacketInterface) { var baseSequenceID uint16 // TODO - We should probably handle these errors lol - if server.useNewMultiACK { + if server.PRUDPProtocolMinorVersion() >= 2 { _, _ = stream.ReadUInt8() // * Substream ID. NEX always uses 0 additionalIDsCount, _ := stream.ReadUInt8() baseSequenceID, _ = stream.ReadUInt16LE() @@ -704,16 +702,6 @@ func (server *Server) SetSocket(socket *net.UDPConn) { server.socket = socket } -// UseNewMultiACK checks if the server uses the new FLAG_MULTI_ACK encoding -func (server *Server) UseNewMultiACK() bool { - return server.useNewMultiACK -} - -// SetUseNewMultiACK sets whether the server uses the new FLAG_MULTI_ACK encoding -func (server *Server) SetUseNewMultiACK(useNewMultiACK bool) { - server.useNewMultiACK = useNewMultiACK -} - // PRUDPVersion returns the server PRUDP version func (server *Server) PRUDPVersion() int { return server.prudpVersion @@ -1056,7 +1044,6 @@ func NewServer() *Server { hppEventHandles: make(map[string][]func(*HPPPacket)), hppClientResponses: make(map[*Client](chan []byte)), clients: NewMutexMap[string, *Client](), - useNewMultiACK: false, prudpVersion: 1, fragmentSize: 1300, resendTimeout: time.Second * 2, From 5cadb763595abdf24f6817d176efd01f3281d0a1 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 17:04:05 -0400 Subject: [PATCH 18/34] Only do multi-ack handling if packet is multi-ack --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 43af4979..1c3c44ad 100644 --- a/server.go +++ b/server.go @@ -235,7 +235,7 @@ func (server *Server) processPacket(packet PacketInterface) error { } func (server *Server) handleAcknowledgement(packet PacketInterface) { - if packet.Version() == 0 { + if packet.Version() == 0 || (packet.HasFlag(FlagAck) && !packet.HasFlag(FlagMultiAck)) { packet.Sender().outgoingResendManager.Remove(packet.SequenceID()) } else { // TODO - Validate the aggregate packet is valid and can be processed From 15fae560cfeb6fe3d1f44301fce87bfc4ed99bfe Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 12 Sep 2023 17:23:03 -0400 Subject: [PATCH 19/34] Lower resend time to one second --- server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 1c3c44ad..3be13f08 100644 --- a/server.go +++ b/server.go @@ -222,6 +222,7 @@ func (server *Server) processPacket(packet PacketInterface) error { case DataPacket: server.Emit("Data", packet) case DisconnectPacket: + fmt.Println("Got a DisconnectPacket") server.Emit("Disconnect", packet) server.GracefulKick(client) case PingPacket: @@ -1046,7 +1047,7 @@ func NewServer() *Server { clients: NewMutexMap[string, *Client](), prudpVersion: 1, fragmentSize: 1300, - resendTimeout: time.Second * 2, + resendTimeout: time.Second, resendMaxIterations: 5, pingTimeout: 5, kerberosKeySize: 32, From c816953698447475a751c87ffefe58cee68e0b63 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Thu, 14 Sep 2023 19:13:47 +0200 Subject: [PATCH 20/34] Always ACK incoming packets & separate pings from packet manager --- packet_manager.go | 4 ++++ server.go | 61 +++++++++++++++++++++++++++++------------------ 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/packet_manager.go b/packet_manager.go index ab49806c..0c31a36f 100644 --- a/packet_manager.go +++ b/packet_manager.go @@ -27,6 +27,10 @@ func (p *PacketManager) Push(packet PacketInterface) { p.packets = append(p.packets, packet) } +func (p *PacketManager) Increment() { + p.currentSequenceID.Increment() +} + // RemoveByIndex removes a packet from the pool using it's index in the slice func (p *PacketManager) RemoveByIndex(i int) { // * https://stackoverflow.com/a/37335777 diff --git a/server.go b/server.go index 3be13f08..77947b8e 100644 --- a/server.go +++ b/server.go @@ -150,22 +150,48 @@ func (server *Server) handleSocketMessage() error { return nil } - // TODO - Make a better API in client to access incomingPacketManager? - client.incomingPacketManager.Push(packet) - - // TODO - Make this API smarter. Only track missing packets and not all packets? - // * Keep processing packets so long as the next one is in the pool, - // * this way if several packets came in out of order they all get - // * processed at once the moment the correct next packet comes in - for next := client.incomingPacketManager.Next(); next != nil; { - err := server.processPacket(next) + if packet.HasFlag(FlagNeedsAck) { + if packet.Type() != ConnectPacket || (packet.Type() == ConnectPacket && len(packet.Payload()) <= 0) { + go server.AcknowledgePacket(packet, nil) + } + + if packet.Type() == DisconnectPacket { + go server.AcknowledgePacket(packet, nil) + go server.AcknowledgePacket(packet, nil) + } + } + + switch packet.Type() { + case DataPacket: + + // TODO - Make a better API in client to access incomingPacketManager? + client.incomingPacketManager.Push(packet) + + // TODO - Make this API smarter. Only track missing packets and not all packets? + // * Keep processing packets so long as the next one is in the pool, + // * this way if several packets came in out of order they all get + // * processed at once the moment the correct next packet comes in + for next := client.incomingPacketManager.Next(); next != nil; { + err := server.processPacket(next) + if err != nil { + // TODO - Should this return the error too? + logger.Error(err.Error()) + return nil + } + + next = client.incomingPacketManager.Next() + } + default: + if packet.Type() != PingPacket { + client.incomingPacketManager.Increment() + } + + err := server.processPacket(packet) if err != nil { // TODO - Should this return the error too? logger.Error(err.Error()) return nil } - - next = client.incomingPacketManager.Next() } return nil @@ -183,17 +209,6 @@ func (server *Server) processPacket(packet PacketInterface) error { return nil } - if packet.HasFlag(FlagNeedsAck) { - if packet.Type() != ConnectPacket || (packet.Type() == ConnectPacket && len(packet.Payload()) <= 0) { - go server.AcknowledgePacket(packet, nil) - } - - if packet.Type() == DisconnectPacket { - go server.AcknowledgePacket(packet, nil) - go server.AcknowledgePacket(packet, nil) - } - } - switch packet.Type() { case SynPacket: // * PID should always be 0 when a fresh connection is made @@ -1048,7 +1063,7 @@ func NewServer() *Server { prudpVersion: 1, fragmentSize: 1300, resendTimeout: time.Second, - resendMaxIterations: 5, + resendMaxIterations: 15, pingTimeout: 5, kerberosKeySize: 32, kerberosKeyDerivation: 0, From 91324b9c09ee2ce1462a0399f064559ed43cb320 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Thu, 14 Sep 2023 19:15:40 +0200 Subject: [PATCH 21/34] Remove debug logs --- packet_resend_manager.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packet_resend_manager.go b/packet_resend_manager.go index 56a17ba0..b4d3d407 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -1,7 +1,6 @@ package nex import ( - "fmt" "time" ) @@ -29,13 +28,11 @@ func (p *PendingPacket) BeginTimeoutTimer() { if int(p.iterations.Increment()) > p.maxIterations { // * Max iterations hit. Assume client is dead - fmt.Println("Max iterations hit. Assume client is dead") server.TimeoutKick(client) p.StopTimeoutTimer() return } else { // * Resend the packet - fmt.Printf("Time out on port %d, resend packet sequenceID %d. Iteration %d\n", p.packet.Sender().Server().port, p.packet.SequenceID(), p.iterations.Value()) server.SendRaw(client.Address(), p.packet.Bytes()) } } @@ -45,7 +42,6 @@ func (p *PendingPacket) BeginTimeoutTimer() { // StopTimeoutTimer stops the packet retransmission timer func (p *PendingPacket) StopTimeoutTimer() { - fmt.Printf("Stopping timer, packet %d was ACKED on port %d\n", p.packet.SequenceID(), p.packet.Sender().Server().port) close(p.quit) p.ticker.Stop() } From 0ad66556c9bc2e429937105841058486c33c569f Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Thu, 14 Sep 2023 22:32:48 +0200 Subject: [PATCH 22/34] Apply fixes --- packet_manager.go | 4 ---- server.go | 23 +++++++++-------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/packet_manager.go b/packet_manager.go index 0c31a36f..ab49806c 100644 --- a/packet_manager.go +++ b/packet_manager.go @@ -27,10 +27,6 @@ func (p *PacketManager) Push(packet PacketInterface) { p.packets = append(p.packets, packet) } -func (p *PacketManager) Increment() { - p.currentSequenceID.Increment() -} - // RemoveByIndex removes a packet from the pool using it's index in the slice func (p *PacketManager) RemoveByIndex(i int) { // * https://stackoverflow.com/a/37335777 diff --git a/server.go b/server.go index 77947b8e..228c6f2b 100644 --- a/server.go +++ b/server.go @@ -162,8 +162,14 @@ func (server *Server) handleSocketMessage() error { } switch packet.Type() { - case DataPacket: - + case PingPacket: + err := server.processPacket(packet) + if err != nil { + // TODO - Should this return the error too? + logger.Error(err.Error()) + return nil + } + default: // TODO - Make a better API in client to access incomingPacketManager? client.incomingPacketManager.Push(packet) @@ -181,17 +187,6 @@ func (server *Server) handleSocketMessage() error { next = client.incomingPacketManager.Next() } - default: - if packet.Type() != PingPacket { - client.incomingPacketManager.Increment() - } - - err := server.processPacket(packet) - if err != nil { - // TODO - Should this return the error too? - logger.Error(err.Error()) - return nil - } } return nil @@ -1063,7 +1058,7 @@ func NewServer() *Server { prudpVersion: 1, fragmentSize: 1300, resendTimeout: time.Second, - resendMaxIterations: 15, + resendMaxIterations: 5, pingTimeout: 5, kerberosKeySize: 32, kerberosKeyDerivation: 0, From 597496de27f91a021607e11034645872f84e0fa6 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Fri, 15 Sep 2023 15:07:21 +0200 Subject: [PATCH 23/34] Use RLock/RUnlock in mutex map set and delete --- mutex_map.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index 5568c8e2..0c328d2c 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -10,8 +10,8 @@ type MutexMap[K comparable, V any] struct { // Set sets a key to a given value func (m *MutexMap[K, V]) Set(key K, value V) { - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() m.real[key] = value } @@ -28,8 +28,8 @@ func (m *MutexMap[K, V]) Get(key K) (V, bool) { // Delete removes a key from the internal map func (m *MutexMap[K, V]) Delete(key K) { - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() delete(m.real, key) } From a5810b2e854cabb419aefd63f465b6f05cce3603 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Fri, 15 Sep 2023 15:35:35 +0200 Subject: [PATCH 24/34] Actually fix the deadlock --- mutex_map.go | 20 ++++++++++++++++---- packet_resend_manager.go | 4 ++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index 0c328d2c..e05b38e9 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -10,8 +10,8 @@ type MutexMap[K comparable, V any] struct { // Set sets a key to a given value func (m *MutexMap[K, V]) Set(key K, value V) { - m.RLock() - defer m.RUnlock() + m.Lock() + defer m.Unlock() m.real[key] = value } @@ -28,8 +28,8 @@ func (m *MutexMap[K, V]) Get(key K) (V, bool) { // Delete removes a key from the internal map func (m *MutexMap[K, V]) Delete(key K) { - m.RLock() - defer m.RUnlock() + m.Lock() + defer m.Unlock() delete(m.real, key) } @@ -53,6 +53,18 @@ func (m *MutexMap[K, V]) Each(callback func(key K, value V)) { } } +// Clear clears the map and runs a function for every item removed +// the function takes in the items key and value +func (m *MutexMap[K, V]) Clear(callback func (key K, value V)) { + m.Lock() + defer m.Unlock() + + for key, value := range m.real { + callback(key, value) + delete(m.real, key) + } +} + // NewMutexMap returns a new instance of MutexMap with the provided key/value types func NewMutexMap[K comparable, V any]() *MutexMap[K, V] { return &MutexMap[K, V]{ diff --git a/packet_resend_manager.go b/packet_resend_manager.go index b4d3d407..5280df55 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -84,8 +84,8 @@ func (p *PacketResendManager) Remove(sequenceID uint16) { // Clear removes all packets from pool and stops their timers func (p *PacketResendManager) Clear() { - p.pending.Each(func(key uint16, value *PendingPacket) { - p.Remove(value.packet.SequenceID()) + p.pending.Clear(func(key uint16, value *PendingPacket) { + value.StopTimeoutTimer() }) } From d08058f95fa466137047499fa987452ee7891539 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Fri, 15 Sep 2023 17:28:03 +0200 Subject: [PATCH 25/34] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jonathan Barrow Co-authored-by: Daniel López Guimaraes <112760654+DaniElectra@users.noreply.github.com> --- mutex_map.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index e05b38e9..0170498c 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -53,14 +53,16 @@ func (m *MutexMap[K, V]) Each(callback func(key K, value V)) { } } -// Clear clears the map and runs a function for every item removed -// the function takes in the items key and value -func (m *MutexMap[K, V]) Clear(callback func (key K, value V)) { +// Clear removes all items from the `real` map +// Accepts an optional callback function ran for every item before it is deleted +func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) { m.Lock() defer m.Unlock() for key, value := range m.real { - callback(key, value) + if callback != nil { + callback(key, value) + } delete(m.real, key) } } From 1e5d8f9fd7b00a8b3f1116a4f20962a8e1337b5d Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Fri, 15 Sep 2023 17:30:17 +0200 Subject: [PATCH 26/34] Update Each godoc --- mutex_map.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index 0170498c..cd8e29e6 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -42,8 +42,9 @@ func (m *MutexMap[K, V]) Size() int { return len(m.real) } -// Each runs a function for every item in the map -// the function takes in the items key and value +// Each runs a callback function for every item in the map +// the callback function takes in the items key and value +// the map should not be modified inside the callback function func (m *MutexMap[K, V]) Each(callback func(key K, value V)) { m.RLock() defer m.RUnlock() From 0fbcb7d1bedce48c584737b5160edfdb1b4a1afd Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Fri, 15 Sep 2023 17:36:06 +0200 Subject: [PATCH 27/34] Update mutex_map.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniel López Guimaraes <112760654+DaniElectra@users.noreply.github.com> --- mutex_map.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index cd8e29e6..4a91d345 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -43,8 +43,7 @@ func (m *MutexMap[K, V]) Size() int { } // Each runs a callback function for every item in the map -// the callback function takes in the items key and value -// the map should not be modified inside the callback function +// The map should not be modified inside the callback function func (m *MutexMap[K, V]) Each(callback func(key K, value V)) { m.RLock() defer m.RUnlock() From 46857e0637cfafb5cc39a3f1816fc4149e2ddc47 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Fri, 15 Sep 2023 18:04:42 +0200 Subject: [PATCH 28/34] Prevent double StopTimeoutTimer --- packet_resend_manager.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packet_resend_manager.go b/packet_resend_manager.go index 5280df55..cf620f8e 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -7,6 +7,7 @@ import ( // PendingPacket represents a packet which the server has sent but not received an ACK for // it handles it's own retransmission on a per-packet timer type PendingPacket struct { + ticking bool ticker *time.Ticker quit chan struct{} packet PacketInterface @@ -42,13 +43,17 @@ func (p *PendingPacket) BeginTimeoutTimer() { // StopTimeoutTimer stops the packet retransmission timer func (p *PendingPacket) StopTimeoutTimer() { - close(p.quit) - p.ticker.Stop() + if p.ticking { + close(p.quit) + p.ticker.Stop() + p.ticking = false + } } // NewPendingPacket returns a new PendingPacket func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, maxIterations int) *PendingPacket { p := &PendingPacket{ + ticking: true, ticker: time.NewTicker(timeoutTime), quit: make(chan struct{}), packet: packet, From e6cad4188663e1ee9ce76a4713a84b11225549b4 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Mon, 18 Sep 2023 19:05:40 +0200 Subject: [PATCH 29/34] Add timeout increment to packet resend manager --- client.go | 2 +- packet_resend_manager.go | 16 +++++++++++++--- server.go | 7 +++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 4590b9db..9818db6a 100644 --- a/client.go +++ b/client.go @@ -48,7 +48,7 @@ func (client *Client) Reset() error { client.outgoingResendManager.Clear() } - client.outgoingResendManager = NewPacketResendManager(server.resendTimeout, server.resendMaxIterations) + client.outgoingResendManager = NewPacketResendManager(server.resendTimeout, server.resendTimeoutIncrement, server.resendMaxIterations) client.UpdateAccessKey(server.AccessKey()) err := client.UpdateRC4Key([]byte("CD&ML")) diff --git a/packet_resend_manager.go b/packet_resend_manager.go index cf620f8e..f40799c7 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -12,6 +12,8 @@ type PendingPacket struct { quit chan struct{} packet PacketInterface iterations *Counter + timeout time.Duration + timeoutInc time.Duration maxIterations int } @@ -33,6 +35,10 @@ func (p *PendingPacket) BeginTimeoutTimer() { p.StopTimeoutTimer() return } else { + if p.timeoutInc != 0 { + p.timeout += p.timeoutInc + p.ticker = time.NewTicker(p.timeout) + } // * Resend the packet server.SendRaw(client.Address(), p.packet.Bytes()) } @@ -51,13 +57,15 @@ func (p *PendingPacket) StopTimeoutTimer() { } // NewPendingPacket returns a new PendingPacket -func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, maxIterations int) *PendingPacket { +func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PendingPacket { p := &PendingPacket{ ticking: true, ticker: time.NewTicker(timeoutTime), quit: make(chan struct{}), packet: packet, iterations: NewCounter(0), + timeout: timeoutTime, + timeoutInc: timeoutIncrement, maxIterations: maxIterations, } @@ -68,12 +76,13 @@ func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, maxIter type PacketResendManager struct { pending *MutexMap[uint16, *PendingPacket] timeoutTime time.Duration + timeoutInc time.Duration maxIterations int } // Add creates a PendingPacket, adds it to the pool, and begins it's timeout timer func (p *PacketResendManager) Add(packet PacketInterface) { - cached := NewPendingPacket(packet, p.timeoutTime, p.maxIterations) + cached := NewPendingPacket(packet, p.timeoutTime, p.timeoutInc, p.maxIterations) p.pending.Set(packet.SequenceID(), cached) cached.BeginTimeoutTimer() @@ -95,10 +104,11 @@ func (p *PacketResendManager) Clear() { } // NewPacketResendManager returns a new PacketResendManager -func NewPacketResendManager(timeoutTime time.Duration, maxIterations int) *PacketResendManager { +func NewPacketResendManager(timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PacketResendManager { return &PacketResendManager{ pending: NewMutexMap[uint16, *PendingPacket](), timeoutTime: timeoutTime, + timeoutInc: timeoutIncrement, maxIterations: maxIterations, } } diff --git a/server.go b/server.go index 228c6f2b..7a43f8e3 100644 --- a/server.go +++ b/server.go @@ -37,6 +37,7 @@ type Server struct { supportedFunctions int fragmentSize int16 resendTimeout time.Duration + resendTimeoutIncrement time.Duration resendMaxIterations int pingTimeout int kerberosPassword string @@ -890,6 +891,11 @@ func (server *Server) SetResendTimeout(resendTimeout time.Duration) { server.resendTimeout = resendTimeout } +// SetResendTimeoutIncrement sets how much to increment the resendTimeout every time a packet is resent to the client +func (server *Server) SetResendTimeoutIncrement(resendTimeoutIncrement time.Duration) { + server.resendTimeoutIncrement = resendTimeoutIncrement +} + // SetResendMaxIterations sets the max number of times a packet can try to resend before assuming the client is dead func (server *Server) SetResendMaxIterations(resendMaxIterations int) { server.resendMaxIterations = resendMaxIterations @@ -1058,6 +1064,7 @@ func NewServer() *Server { prudpVersion: 1, fragmentSize: 1300, resendTimeout: time.Second, + resendTimeoutIncrement: 0, resendMaxIterations: 5, pingTimeout: 5, kerberosKeySize: 32, From ea3bb1aef74ab8cc5fff64fc60ece1d79725de4b Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Mon, 18 Sep 2023 19:32:28 +0200 Subject: [PATCH 30/34] Fix tab --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 7a43f8e3..4e3b598d 100644 --- a/server.go +++ b/server.go @@ -37,7 +37,7 @@ type Server struct { supportedFunctions int fragmentSize int16 resendTimeout time.Duration - resendTimeoutIncrement time.Duration + resendTimeoutIncrement time.Duration resendMaxIterations int pingTimeout int kerberosPassword string From a5eaf92bb2cc573f1b7beabf82faa1ebe40df0dd Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Mon, 18 Sep 2023 22:40:58 +0200 Subject: [PATCH 31/34] Apply suggestions --- packet_resend_manager.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packet_resend_manager.go b/packet_resend_manager.go index f40799c7..d47fe45e 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -37,8 +37,9 @@ func (p *PendingPacket) BeginTimeoutTimer() { } else { if p.timeoutInc != 0 { p.timeout += p.timeoutInc - p.ticker = time.NewTicker(p.timeout) + p.ticker.Reset(p.timeout) } + // * Resend the packet server.SendRaw(client.Address(), p.packet.Bytes()) } @@ -59,7 +60,7 @@ func (p *PendingPacket) StopTimeoutTimer() { // NewPendingPacket returns a new PendingPacket func NewPendingPacket(packet PacketInterface, timeoutTime time.Duration, timeoutIncrement time.Duration, maxIterations int) *PendingPacket { p := &PendingPacket{ - ticking: true, + ticking: true, ticker: time.NewTicker(timeoutTime), quit: make(chan struct{}), packet: packet, From 8566ab2e89fd6b101942c2fc7090c8eff8b25305 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20L=C3=B3pez=20Guimaraes?= Date: Mon, 25 Sep 2023 16:17:59 +0100 Subject: [PATCH 32/34] Remove disconnect packet log --- server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server.go b/server.go index 4e3b598d..a84f96b5 100644 --- a/server.go +++ b/server.go @@ -233,7 +233,6 @@ func (server *Server) processPacket(packet PacketInterface) error { case DataPacket: server.Emit("Data", packet) case DisconnectPacket: - fmt.Println("Got a DisconnectPacket") server.Emit("Disconnect", packet) server.GracefulKick(client) case PingPacket: From 38a6143269ebb639b95f37c98bb92383120afc50 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 26 Sep 2023 14:48:13 -0400 Subject: [PATCH 33/34] Removed unused debug stuff --- packet_resend_manager.go | 1 - server.go | 6 +----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packet_resend_manager.go b/packet_resend_manager.go index d47fe45e..47b7028c 100644 --- a/packet_resend_manager.go +++ b/packet_resend_manager.go @@ -23,7 +23,6 @@ func (p *PendingPacket) BeginTimeoutTimer() { for { select { case <-p.quit: - //fmt.Println("Stopped") return case <-p.ticker.C: client := p.packet.Sender() diff --git a/server.go b/server.go index a84f96b5..3d3e138e 100644 --- a/server.go +++ b/server.go @@ -55,7 +55,6 @@ type Server struct { natTraversalProtocolVersion *NEXVersion emuSendPacketDropPercent int emuRecvPacketDropPercent int - port int } // Listen starts a NEX server on a given address @@ -67,9 +66,6 @@ func (server *Server) Listen(address string) { panic(err) } - // * For debugging the resend manager - server.port = udpAddress.Port - socket, err := net.ListenUDP(protocol, udpAddress) if err != nil { panic(err) @@ -890,7 +886,7 @@ func (server *Server) SetResendTimeout(resendTimeout time.Duration) { server.resendTimeout = resendTimeout } -// SetResendTimeoutIncrement sets how much to increment the resendTimeout every time a packet is resent to the client +// SetResendTimeoutIncrement sets how much to increment the resendTimeout every time a packet is resent to the client func (server *Server) SetResendTimeoutIncrement(resendTimeoutIncrement time.Duration) { server.resendTimeoutIncrement = resendTimeoutIncrement } From aa2922193a7fb0d14f183ffa8d2d92e6005d80f7 Mon Sep 17 00:00:00 2001 From: Jonathan Barrow Date: Tue, 26 Sep 2023 15:15:00 -0400 Subject: [PATCH 34/34] fixed comments for Remaining and ReadRemaining --- stream_in.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream_in.go b/stream_in.go index 5ce718e9..795d0e87 100644 --- a/stream_in.go +++ b/stream_in.go @@ -15,12 +15,12 @@ type StreamIn struct { Server *Server } -// Remaining reads a bool +// Remaining returns the amount of data left to be read in the buffer func (stream *StreamIn) Remaining() int { return len(stream.Bytes()[stream.ByteOffset():]) } -// ReadRemaining reads a bool +// ReadRemaining reads all the data left to be read in the buffer func (stream *StreamIn) ReadRemaining() []byte { // TODO - Should we do a bounds check here? Or just allow empty slices? return stream.ReadBytesNext(int64(stream.Remaining()))