From 1bafc79e921ee3503e811648757f7caeab25e764 Mon Sep 17 00:00:00 2001 From: Termina1 Date: Thu, 12 Dec 2024 18:41:06 +0200 Subject: [PATCH] add stats for average write batch size --- chotki.go | 18 ++++++++++++------ protocol/net.go | 8 ++++++-- protocol/peer.go | 12 ++++++++++-- utils/avg.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 10 deletions(-) create mode 100644 utils/avg.go diff --git a/chotki.go b/chotki.go index f85b40d..0746146 100644 --- a/chotki.go +++ b/chotki.go @@ -512,25 +512,31 @@ func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body } type NetCollector struct { - net *protocol.Net - d *prometheus.Desc + net *protocol.Net + read_buffers_size *prometheus.Desc + write_batch_size *prometheus.Desc } func NewNetCollector(net *protocol.Net) *NetCollector { return &NetCollector{ - net: net, - d: prometheus.NewDesc("chotki_net_read_buffer_size", "", []string{"peer"}, prometheus.Labels{}), + net: net, + read_buffers_size: prometheus.NewDesc("chotki_net_read_buffer_size", "", []string{"peer"}, prometheus.Labels{}), + write_batch_size: prometheus.NewDesc("chotki_net_write_batch_size", "", []string{"peer"}, prometheus.Labels{}), } } func (n *NetCollector) Describe(d chan<- *prometheus.Desc) { - d <- n.d + d <- n.read_buffers_size + d <- n.write_batch_size } func (n *NetCollector) Collect(m chan<- prometheus.Metric) { stats := n.net.GetStats() for name, v := range stats.ReadBuffers { - m <- prometheus.MustNewConstMetric(n.d, prometheus.GaugeValue, float64(v), name) + m <- prometheus.MustNewConstMetric(n.read_buffers_size, prometheus.GaugeValue, float64(v), name) + } + for name, v := range stats.WriteBatches { + m <- prometheus.MustNewConstMetric(n.write_batch_size, prometheus.GaugeValue, float64(v), name) } } diff --git a/protocol/net.go b/protocol/net.go index 22bbe53..f6837f2 100644 --- a/protocol/net.go +++ b/protocol/net.go @@ -113,16 +113,19 @@ func NewNet(log utils.Logger, install InstallCallback, destroy DestroyCallback, } type NetStats struct { - ReadBuffers map[string]int32 + ReadBuffers map[string]int32 + WriteBatches map[string]int32 } func (n *Net) GetStats() NetStats { stats := NetStats{ - ReadBuffers: make(map[string]int32), + ReadBuffers: make(map[string]int32), + WriteBatches: make(map[string]int32), } n.conns.Range(func(name string, peer *Peer) bool { if peer != nil { stats.ReadBuffers[name] = peer.GetIncomingPacketBufferSize() + stats.WriteBatches[name] = int32(peer.writeBatchSize.Val()) } return true }) @@ -315,6 +318,7 @@ func (n *Net) keepPeer(name string, conn net.Conn) { readAccumtTimeLimit: n.readAccumTimeLimit, bufferMaxSize: n.bufferMaxSize, bufferMinToProcess: n.bufferMinToProcess, + writeBatchSize: &utils.AvgVal{}, } n.conns.Store(name, peer) diff --git a/protocol/peer.go b/protocol/peer.go index cdbc971..a35c66b 100644 --- a/protocol/peer.go +++ b/protocol/peer.go @@ -11,11 +11,14 @@ import ( "sync" "sync/atomic" "time" + + "github.com/drpcorg/chotki/utils" ) type Peer struct { - closed atomic.Bool - wg sync.WaitGroup + closed atomic.Bool + wg sync.WaitGroup + writeBatchSize *utils.AvgVal conn net.Conn inout FeedDrainCloserTraced @@ -138,6 +141,11 @@ func (p *Peer) keepWrite(ctx context.Context) error { if err != nil { return err } + batchSize := 0 + for _, r := range recs { + batchSize += len(r) + } + p.writeBatchSize.Add(float64(batchSize)) b := net.Buffers(recs) for len(b) > 0 && err == nil { diff --git a/utils/avg.go b/utils/avg.go new file mode 100644 index 0000000..a5aa188 --- /dev/null +++ b/utils/avg.go @@ -0,0 +1,29 @@ +package utils + +import "sync" + +type AvgVal struct { + v float64 + count int + lock sync.Mutex +} + +func NewAvgVal(val float64) *AvgVal { + return &AvgVal{ + v: val, + count: 1, + } +} + +func (a *AvgVal) Add(val float64) { + a.lock.Lock() + defer a.lock.Unlock() + a.v = (float64(a.count)*a.v + val) / float64(a.count+1) + a.count++ +} + +func (a *AvgVal) Val() float64 { + a.lock.Lock() + defer a.lock.Unlock() + return a.v +}