Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "PMM-8655 Send unsent messages after connection problems (#1970)" #2676

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions agent/agents/mysql/perfschema/perfschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"io"
"math"
"sync"
"time"

"github.com/AlekSi/pointer" // register SQL driver
Expand All @@ -46,18 +45,6 @@ type (
summaryMap map[string]*eventsStatementsSummaryByDigest
)

// mySQLVersion contains.
type mySQLVersion struct {
version float64
vendor string
}

// versionsCache provides cached access to MySQL version.
type versionsCache struct {
rw sync.RWMutex
items map[string]*mySQLVersion
}

const (
retainHistory = 5 * time.Minute
refreshHistory = 5 * time.Second
Expand Down
108 changes: 0 additions & 108 deletions agent/client/cache/cache.go

This file was deleted.

64 changes: 0 additions & 64 deletions agent/client/cache/dummy.go

This file was deleted.

37 changes: 18 additions & 19 deletions agent/client/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"github.com/percona/pmm/agent/models"
agenterrors "github.com/percona/pmm/agent/utils/errors"
"github.com/percona/pmm/api/agentpb"
)

Expand All @@ -48,6 +46,15 @@ type ServerRequest struct {
Payload agentpb.ServerRequestPayload
}

// AgentResponse represents agent's response.
// It is similar to agentpb.AgentMessage except it can contain only responses,
// and the payload is already unwrapped (XXX instead of AgentMessage_XXX).
type AgentResponse struct {
ID uint32
Status *grpcstatus.Status
Payload agentpb.AgentResponsePayload
}

// Response is a type used to pass response from pmm-server to the subscriber.
type Response struct {
Payload agentpb.ServerResponsePayload
Expand Down Expand Up @@ -111,7 +118,7 @@ func New(stream agentpb.Agent_ConnectClient) *Channel {
func (c *Channel) close(err error) {
c.closeOnce.Do(func() {
c.l.Debugf("Closing with error: %+v", err)
c.closeErr = agenterrors.NewChannelClosedError(err)
c.closeErr = err

c.m.Lock()
for _, ch := range c.responses { // unblock all subscribers
Expand Down Expand Up @@ -141,7 +148,7 @@ func (c *Channel) Requests() <-chan *ServerRequest {
}

// Send sends message to pmm-managed. It is no-op once channel is closed (see Wait).
func (c *Channel) Send(resp *models.AgentResponse) error {
func (c *Channel) Send(resp *AgentResponse) {
msg := &agentpb.AgentMessage{
Id: resp.ID,
}
Expand All @@ -151,7 +158,7 @@ func (c *Channel) Send(resp *models.AgentResponse) error {
if resp.Status != nil {
msg.Status = resp.Status.Proto()
}
return c.send(msg)
c.send(msg)
}

// SendAndWaitResponse sends request to pmm-managed, blocks until response is available.
Expand All @@ -162,24 +169,21 @@ func (c *Channel) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agen
id := atomic.AddUint32(&c.lastSentRequestID, 1)
ch := c.subscribe(id)

err := c.send(&agentpb.AgentMessage{
c.send(&agentpb.AgentMessage{
Id: id,
Payload: payload.AgentMessageRequestPayload(),
})
if err != nil {
return nil, err
}

resp := <-ch
return resp.Payload, resp.Error
}

func (c *Channel) send(msg *agentpb.AgentMessage) error {
func (c *Channel) send(msg *agentpb.AgentMessage) {
c.sendM.Lock()
select {
case <-c.closeWait:
c.sendM.Unlock()
return c.Wait()
return
default:
}

Expand All @@ -197,12 +201,10 @@ func (c *Channel) send(msg *agentpb.AgentMessage) error {
err := c.s.Send(msg)
c.sendM.Unlock()
if err != nil {
err = errors.Wrap(err, "failed to send message")
c.close(err)
return agenterrors.NewChannelClosedError(err)
c.close(errors.Wrap(err, "failed to send message"))
return
}
c.mSend.Inc()
return nil
}

// runReader receives messages from server.
Expand Down Expand Up @@ -312,13 +314,10 @@ func (c *Channel) runReceiver() {
c.l.Warnf("pmm-managed was not able to process message with id: %d, handling of that payload type is unimplemented", msg.Id)
continue
}
err := c.Send(&models.AgentResponse{
c.Send(&AgentResponse{
ID: msg.Id,
Status: grpcstatus.New(codes.Unimplemented, "can't handle message type sent, it is not implemented"),
})
if err != nil {
c.l.Error(err)
}
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions agent/client/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/percona/pmm/agent/models"
"github.com/percona/pmm/agent/utils/truncate"
"github.com/percona/pmm/api/agentpb"
)
Expand Down Expand Up @@ -150,6 +149,7 @@ func TestAgentRequestWithTruncatedInvalidUTF8(t *testing.T) {
Mysql: &agentpb.MetricsBucket_MySQL{},
}}
resp, err = channel.SendAndWaitResponse(&request)
require.NoError(t, err)
assert.Nil(t, resp)
}

Expand Down Expand Up @@ -248,13 +248,12 @@ func TestServerRequest(t *testing.T) {
for req := range channel.Requests() {
assert.IsType(t, &agentpb.Ping{}, req.Payload)

err := channel.Send(&models.AgentResponse{
channel.Send(&AgentResponse{
ID: req.ID,
Payload: &agentpb.Pong{
CurrentTime: timestamppb.Now(),
},
})
assert.NoError(t, err)
}
}

Expand Down Expand Up @@ -417,11 +416,10 @@ func TestUnexpectedResponsePayloadFromServer(t *testing.T) {
channel, _, teardown := setup(t, connect, io.EOF)
defer teardown()
req := <-channel.Requests()
err := channel.Send(&models.AgentResponse{
channel.Send(&AgentResponse{
ID: req.ID,
Payload: &agentpb.Pong{
CurrentTime: timestamppb.Now(),
},
})
assert.NoError(t, err)
}
Loading