From 9398cae230170990b5b048f63a22653a714e30ee Mon Sep 17 00:00:00 2001 From: Neil Date: Tue, 19 Nov 2024 08:42:27 +0000 Subject: [PATCH] Expose download/upload rate per peer (#1206) --- cmd/yggdrasilctl/main.go | 12 ++++++++++-- src/admin/admin.go | 16 +++++++++------- src/admin/getpeers.go | 4 ++++ src/core/api.go | 4 ++++ src/core/link.go | 37 ++++++++++++++++++++++++++++++++++--- 5 files changed, 61 insertions(+), 12 deletions(-) diff --git a/cmd/yggdrasilctl/main.go b/cmd/yggdrasilctl/main.go index 8a30f438f..b73708e6f 100644 --- a/cmd/yggdrasilctl/main.go +++ b/cmd/yggdrasilctl/main.go @@ -174,9 +174,9 @@ func run() int { if err := json.Unmarshal(recv.Response, &resp); err != nil { panic(err) } - table.SetHeader([]string{"URI", "State", "Dir", "IP Address", "Uptime", "RTT", "RX", "TX", "Pr", "Cost", "Last Error"}) + table.SetHeader([]string{"URI", "State", "Dir", "IP Address", "Uptime", "RTT", "RX", "TX", "Down", "Up", "Pr", "Cost", "Last Error"}) for _, peer := range resp.Peers { - state, lasterr, dir, rtt := "Up", "-", "Out", "-" + state, lasterr, dir, rtt, rxr, txr := "Up", "-", "Out", "-", "-", "-" if !peer.Up { state, lasterr = "Down", fmt.Sprintf("%s ago: %s", peer.LastErrorTime.Round(time.Second), peer.LastError) } else if rttms := float64(peer.Latency.Microseconds()) / 1000; rttms > 0 { @@ -190,6 +190,12 @@ func run() int { uri.RawQuery = "" uristring = uri.String() } + if peer.RXRate > 0 { + rxr = peer.RXRate.String() + "/s" + } + if peer.TXRate > 0 { + txr = peer.TXRate.String() + "/s" + } table.Append([]string{ uristring, state, @@ -199,6 +205,8 @@ func run() int { rtt, peer.RXBytes.String(), peer.TXBytes.String(), + rxr, + txr, fmt.Sprintf("%d", peer.Priority), fmt.Sprintf("%d", peer.Cost), lasterr, diff --git a/src/admin/admin.go b/src/admin/admin.go index 996cabd58..54c1a124f 100644 --- a/src/admin/admin.go +++ b/src/admin/admin.go @@ -356,13 +356,15 @@ type DataUnit uint64 func (d DataUnit) String() string { switch { - case d > 1024*1024*1024*1024: - return fmt.Sprintf("%2.ftb", float64(d)/1024/1024/1024/1024) - case d > 1024*1024*1024: - return fmt.Sprintf("%2.fgb", float64(d)/1024/1024/1024) - case d > 1024*1024: - return fmt.Sprintf("%2.fmb", float64(d)/1024/1024) + case d >= 1024*1024*1024*1024: + return fmt.Sprintf("%2.1fTB", float64(d)/1024/1024/1024/1024) + case d >= 1024*1024*1024: + return fmt.Sprintf("%2.1fGB", float64(d)/1024/1024/1024) + case d >= 1024*1024: + return fmt.Sprintf("%2.1fMB", float64(d)/1024/1024) + case d >= 100: + return fmt.Sprintf("%2.1fKB", float64(d)/1024) default: - return fmt.Sprintf("%2.fkb", float64(d)/1024) + return fmt.Sprintf("%dB", d) } } diff --git a/src/admin/getpeers.go b/src/admin/getpeers.go index 2c2f8d8a1..34eca2436 100644 --- a/src/admin/getpeers.go +++ b/src/admin/getpeers.go @@ -28,6 +28,8 @@ type PeerEntry struct { Cost uint64 `json:"cost"` RXBytes DataUnit `json:"bytes_recvd,omitempty"` TXBytes DataUnit `json:"bytes_sent,omitempty"` + RXRate DataUnit `json:"rate_recvd,omitempty"` + TXRate DataUnit `json:"rate_sent,omitempty"` Uptime float64 `json:"uptime,omitempty"` Latency time.Duration `json:"latency_ms,omitempty"` LastErrorTime time.Duration `json:"last_error_time,omitempty"` @@ -47,6 +49,8 @@ func (a *AdminSocket) getPeersHandler(_ *GetPeersRequest, res *GetPeersResponse) URI: p.URI, RXBytes: DataUnit(p.RXBytes), TXBytes: DataUnit(p.TXBytes), + RXRate: DataUnit(p.RXRate), + TXRate: DataUnit(p.TXRate), Uptime: p.Uptime.Seconds(), } if p.Latency > 0 { diff --git a/src/core/api.go b/src/core/api.go index 2aa1ba879..cc1bde32a 100644 --- a/src/core/api.go +++ b/src/core/api.go @@ -33,6 +33,8 @@ type PeerInfo struct { Cost uint64 RXBytes uint64 TXBytes uint64 + RXRate uint64 + TXRate uint64 Uptime time.Duration Latency time.Duration } @@ -87,6 +89,8 @@ func (c *Core) GetPeers() []PeerInfo { peerinfo.Inbound = state.linkType == linkTypeIncoming peerinfo.RXBytes = atomic.LoadUint64(&c.rx) peerinfo.TXBytes = atomic.LoadUint64(&c.tx) + peerinfo.RXRate = atomic.LoadUint64(&c.rxrate) + peerinfo.TXRate = atomic.LoadUint64(&c.txrate) peerinfo.Uptime = time.Since(c.up) } if p, ok := conns[conn]; ok { diff --git a/src/core/link.go b/src/core/link.go index d7e5b1104..c21b8db5b 100644 --- a/src/core/link.go +++ b/src/core/link.go @@ -99,9 +99,36 @@ func (l *links) init(c *Core) error { l._links = make(map[linkInfo]*link) l._listeners = make(map[*Listener]context.CancelFunc) + l.Act(nil, l._updateAverages) return nil } +func (l *links) _updateAverages() { + select { + case <-l.core.ctx.Done(): + return + default: + } + + for _, l := range l._links { + if l._conn == nil { + continue + } + rx := atomic.LoadUint64(&l._conn.rx) + tx := atomic.LoadUint64(&l._conn.tx) + lastrx := atomic.LoadUint64(&l._conn.lastrx) + lasttx := atomic.LoadUint64(&l._conn.lasttx) + atomic.StoreUint64(&l._conn.rxrate, rx-lastrx) + atomic.StoreUint64(&l._conn.txrate, tx-lasttx) + atomic.StoreUint64(&l._conn.lastrx, rx) + atomic.StoreUint64(&l._conn.lasttx, tx) + } + + time.AfterFunc(time.Second, func() { + l.Act(nil, l._updateAverages) + }) +} + func (l *links) shutdown() { phony.Block(l, func() { for listener := range l._listeners { @@ -699,9 +726,13 @@ func urlForLinkInfo(u url.URL) url.URL { type linkConn struct { // tx and rx are at the beginning of the struct to ensure 64-bit alignment // on 32-bit platforms, see https://pkg.go.dev/sync/atomic#pkg-note-BUG - rx uint64 - tx uint64 - up time.Time + rx uint64 + tx uint64 + rxrate uint64 + txrate uint64 + lastrx uint64 + lasttx uint64 + up time.Time net.Conn }