Skip to content

Commit

Permalink
add stats for average write batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Dec 12, 2024
1 parent ff96e25 commit 1bafc79
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 10 deletions.
18 changes: 12 additions & 6 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,25 +512,31 @@ func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body
}

type NetCollector struct {
net *protocol.Net
d *prometheus.Desc
net *protocol.Net
read_buffers_size *prometheus.Desc
write_batch_size *prometheus.Desc
}

func NewNetCollector(net *protocol.Net) *NetCollector {
return &NetCollector{
net: net,
d: prometheus.NewDesc("chotki_net_read_buffer_size", "", []string{"peer"}, prometheus.Labels{}),
net: net,
read_buffers_size: prometheus.NewDesc("chotki_net_read_buffer_size", "", []string{"peer"}, prometheus.Labels{}),
write_batch_size: prometheus.NewDesc("chotki_net_write_batch_size", "", []string{"peer"}, prometheus.Labels{}),
}
}

func (n *NetCollector) Describe(d chan<- *prometheus.Desc) {
d <- n.d
d <- n.read_buffers_size
d <- n.write_batch_size
}

func (n *NetCollector) Collect(m chan<- prometheus.Metric) {
stats := n.net.GetStats()
for name, v := range stats.ReadBuffers {
m <- prometheus.MustNewConstMetric(n.d, prometheus.GaugeValue, float64(v), name)
m <- prometheus.MustNewConstMetric(n.read_buffers_size, prometheus.GaugeValue, float64(v), name)
}
for name, v := range stats.WriteBatches {
m <- prometheus.MustNewConstMetric(n.write_batch_size, prometheus.GaugeValue, float64(v), name)
}
}

Expand Down
8 changes: 6 additions & 2 deletions protocol/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,19 @@ func NewNet(log utils.Logger, install InstallCallback, destroy DestroyCallback,
}

type NetStats struct {
ReadBuffers map[string]int32
ReadBuffers map[string]int32
WriteBatches map[string]int32
}

func (n *Net) GetStats() NetStats {
stats := NetStats{
ReadBuffers: make(map[string]int32),
ReadBuffers: make(map[string]int32),
WriteBatches: make(map[string]int32),
}
n.conns.Range(func(name string, peer *Peer) bool {
if peer != nil {
stats.ReadBuffers[name] = peer.GetIncomingPacketBufferSize()
stats.WriteBatches[name] = int32(peer.writeBatchSize.Val())
}
return true
})
Expand Down Expand Up @@ -315,6 +318,7 @@ func (n *Net) keepPeer(name string, conn net.Conn) {
readAccumtTimeLimit: n.readAccumTimeLimit,
bufferMaxSize: n.bufferMaxSize,
bufferMinToProcess: n.bufferMinToProcess,
writeBatchSize: &utils.AvgVal{},
}
n.conns.Store(name, peer)

Expand Down
12 changes: 10 additions & 2 deletions protocol/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/drpcorg/chotki/utils"
)

type Peer struct {
closed atomic.Bool
wg sync.WaitGroup
closed atomic.Bool
wg sync.WaitGroup
writeBatchSize *utils.AvgVal

conn net.Conn
inout FeedDrainCloserTraced
Expand Down Expand Up @@ -138,6 +141,11 @@ func (p *Peer) keepWrite(ctx context.Context) error {
if err != nil {
return err
}
batchSize := 0
for _, r := range recs {
batchSize += len(r)
}
p.writeBatchSize.Add(float64(batchSize))

b := net.Buffers(recs)
for len(b) > 0 && err == nil {
Expand Down
29 changes: 29 additions & 0 deletions utils/avg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package utils

import "sync"

type AvgVal struct {
v float64
count int
lock sync.Mutex
}

func NewAvgVal(val float64) *AvgVal {
return &AvgVal{
v: val,
count: 1,
}
}

func (a *AvgVal) Add(val float64) {
a.lock.Lock()
defer a.lock.Unlock()
a.v = (float64(a.count)*a.v + val) / float64(a.count+1)
a.count++
}

func (a *AvgVal) Val() float64 {
a.lock.Lock()
defer a.lock.Unlock()
return a.v
}

0 comments on commit 1bafc79

Please sign in to comment.