Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Raw MySQL packets when passing results back from VTTablet to VTGate #17080

Closed
wants to merge 11 commits into from
58 changes: 58 additions & 0 deletions go/mathstats/ewma.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mathstats

import (
"math"
"sync/atomic"
)

type SimpleEWMA struct {
// The current value of the average. After adding with Add(), this is
// updated to reflect the average of all values seen thus far.
value atomic.Uint64
}

// Add adds a value to the series and updates the moving average.
func (e *SimpleEWMA) Add(value uint64) {
const (
AVG_METRIC_AGE float64 = 30.0
DECAY float64 = 2 / (float64(AVG_METRIC_AGE) + 1)
)

fvalue := float64(value)

for {
v := e.value.Load()

var nv float64
if v == 0 {
nv = fvalue
} else {
nv = (fvalue * DECAY) + (math.Float64frombits(v) * (1 - DECAY))
}

if e.value.CompareAndSwap(v, math.Float64bits(nv)) {
return
}
}
}

// Value returns the current value of the moving average.
func (e *SimpleEWMA) Value() uint64 {
return uint64(math.Float64frombits(e.value.Load()))
}
32 changes: 32 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,38 @@ func (c *Conn) readHeaderFrom(r io.Reader) (int, error) {
return int(uint32(c.header[0]) | uint32(c.header[1])<<8 | uint32(c.header[2])<<16), nil
}

func (c *Conn) readPacketAsProto(b *queryResultBuilder) ([]byte, error) {
r := c.getReader()

length, err := c.readHeaderFrom(r)
if err != nil {
return nil, err
}

if length == 0 {
// This can be caused by the packet after a packet of
// exactly size MaxPacketSize.
return nil, nil
}

// Use the bufPool.
if length < MaxPacketSize {
buf := b.Packet(length)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, vterrors.Wrapf(err, "io.ReadFull(packet body of length %v) failed", length)
}
return buf, nil
}

// This packet came from MySQL, so how is it sending us a packet
// that's beyond the max size???

// Much slower path, revert to allocating everything from scratch.
// We're going to concatenate a lot of data anyway, can't really
// optimize this code path easily.
panic("TODO: large packets")
}

// readEphemeralPacket attempts to read a packet into buffer from sync.Pool. Do
// not use this method if the contents of the packet needs to be kept
// after the next readEphemeralPacket.
Expand Down
Loading
Loading