diff --git a/agent/Makefile b/agent/Makefile index a2334a7e7d..a4cfc32532 100644 --- a/agent/Makefile +++ b/agent/Makefile @@ -25,7 +25,7 @@ VERSION_FLAGS = -X 'github.com/percona/pmm/version.ProjectName=pmm-agent' \ -X 'github.com/percona/pmm/version.Branch=$(PMM_RELEASE_BRANCH)' release: ## Build static pmm-agent release binary (Linux only) - env CGO_ENABLED=1 go build -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent + env CGO_ENABLED=1 go build -race -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent go build -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent-entrypoint ./cmd/pmm-agent-entrypoint $(PMM_RELEASE_PATH)/pmm-agent --version ldd $(PMM_RELEASE_PATH)/pmm-agent 2>&1 | grep -Fq 'not a dynamic executable' diff --git a/agent/client/client.go b/agent/client/client.go index f6ba5547a7..bf8888ef21 100644 --- a/agent/client/client.go +++ b/agent/client/client.go @@ -173,11 +173,11 @@ func (c *Client) Connect(ctx context.Context) error { }() c.supervisor.ClearChangesChannel() c.SendActualStatuses() - c.cache.SetSender(dialResult.channel) c.rw.Lock() c.md = dialResult.md c.channel = dialResult.channel + c.cache.SetSender(dialResult.channel) c.rw.Unlock() c.processChannelRequests(ctx) diff --git a/agent/utils/buffer-ring/bigqueue/bigqueue.go b/agent/utils/buffer-ring/bigqueue/bigqueue.go index 8bc35b8472..67ee768774 100644 --- a/agent/utils/buffer-ring/bigqueue/bigqueue.go +++ b/agent/utils/buffer-ring/bigqueue/bigqueue.go @@ -56,7 +56,7 @@ type Ring struct { wg sync.WaitGroup sendLock sync.RWMutex - recvLock sync.RWMutex + recvLock sync.Mutex totalSize int64 // represent the limit after which old data will be overwritten sender atomic.Pointer[models.Sender] @@ -123,20 +123,20 @@ func (r *Ring) Send(resp *models.AgentResponse) error { r.recvLock.Lock() defer r.recvLock.Unlock() - if r.isEmpty() && s != nil { - err = (*s).Send(resp) - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - if r.sender.CompareAndSwap(s, nil) { - asyncRelease(r.establishCh) - r.l.Debugf("sender released: %v", err) - } - } else { - return err + if !r.isEmpty() || s == nil { + r.push(msg) + return nil + } + err = (*s).Send(resp) + if errors.As(err, &agenterrors.ErrChanConn) { + if r.sender.CompareAndSwap(s, nil) { + asyncRelease(r.establishCh) + r.l.Debugf("sender released: %v", err) + r.push(msg) } + return nil } - - r.push(msg) - return nil + return err } // SendAndWaitResponse stores AgentMessageRequestPayload on nil channel. @@ -149,20 +149,23 @@ func (r *Ring) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb r.recvLock.Lock() defer r.recvLock.Unlock() - if r.isEmpty() && s != nil { - resp, err = (*s).SendAndWaitResponse(payload) - if err != nil && errors.As(err, &agenterrors.ErrChanConn) { - if r.sender.CompareAndSwap(s, nil) { - asyncRelease(r.establishCh) - r.l.Debugf("sender released: %v", err) - } - } else { - return resp, err + + if !r.isEmpty() || s == nil { + r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()}) + return nil, nil + } + + resp, err = (*s).SendAndWaitResponse(payload) + if err != nil && errors.As(err, &agenterrors.ErrChanConn) { + if r.sender.CompareAndSwap(s, nil) { + asyncRelease(r.establishCh) + r.l.Debugf("sender released: %v", err) } + r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()}) + return nil, nil } - r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()}) - return &agentpb.StateChangedResponse{}, nil + return resp, err } // SetSender check and set sender and notify sender loop. @@ -179,6 +182,8 @@ func (r *Ring) Close() { default: close(r.done) r.wg.Wait() + r.recvLock.Lock() + defer r.recvLock.Unlock() if err := r.fq.Close(); err != nil { r.l.Errorf("closing cache: %+v", err) } @@ -190,6 +195,7 @@ func (r *Ring) isEmpty() bool { return r.fq.IsEmpty() } +// push inserts message to cache. func (r *Ring) push(msg *agentpb.AgentMessage) { b, err := proto.Marshal(msg) if err != nil { @@ -261,6 +267,7 @@ func (r *Ring) sendRunner() { }() } +// sendInLoop sends messages from cache to sender. func (r *Ring) sendInLoop() { var s *models.Sender for { @@ -284,35 +291,47 @@ func (r *Ring) sendInLoop() { return default: } - r.recvLock.Lock() - _, b, err := r.fq.Peek() - r.recvLock.Unlock() - if err != nil { - r.l.Errorf("reading entry from cache: %+v", err) - } - if b == nil { - break - } - var m agentpb.AgentMessage - if err := proto.Unmarshal(b, &m); err != nil { - r.l.Errorf("unmarshal entry from cache: %+v", err) - } else if err = r.send(*s, &m); err != nil { - if r.sender.CompareAndSwap(s, nil) { - asyncRelease(r.establishCh) - r.l.Debugf("sender released: %v", err) - } - break - } - r.recvLock.Lock() - r.fq.Skip(1) //nolint:errcheck - r.recvLock.Unlock() - count++ + count += r.peekAndSend(s) } if count > 0 { asyncNotify(r.gcCh) } } +func (r *Ring) peekAndSend(s *models.Sender) int { + defer func() { + if rc := recover(); rc != nil { + r.l.Errorf("panic in peekAndSend: %v", rc) + } + }() + r.recvLock.Lock() + r.l.Debugf("status: %v", r.fq.Status()) + idx, b, err := r.fq.Peek() + r.recvLock.Unlock() + if err != nil { + r.l.Errorf("reading entry from cache: %+v", err) + } + if len(b) == 0 { + return 0 + } + r.l.Debugf("resending %d: %s", idx, string(b)) + var m agentpb.AgentMessage + err = proto.Unmarshal(b, &m) + if err != nil { + r.l.Errorf("unmarshal entry from cache: %+v", err) + } else if err = r.send(*s, &m); err != nil { + if r.sender.CompareAndSwap(s, nil) { + asyncRelease(r.establishCh) + r.l.Debugf("sender released: %v", err) + } + return 0 + } + r.recvLock.Lock() + r.fq.Skip(1) //nolint:errcheck + r.recvLock.Unlock() + return 1 +} + // initPaths creates all paths for queue to use. Original repo creates directories with perm error. func initPaths(dir string) error { for _, path := range []string{ @@ -405,7 +424,7 @@ func (r *Ring) send(s models.Sender, m *agentpb.AgentMessage) error { case *agentpb.AgentMessage_StateChanged: _, err = s.SendAndWaitResponse(p.StateChanged) default: - r.l.Errorf("unknown message: %T", m) + r.l.Warnf("unknown message in cache: %T", m) return nil } if err != nil && errors.As(err, &agenterrors.ErrChanConn) { diff --git a/update/ansible/playbook/tasks/roles/initialization/tasks/main.yml b/update/ansible/playbook/tasks/roles/initialization/tasks/main.yml index 5113096993..81c57b7ff4 100644 --- a/update/ansible/playbook/tasks/roles/initialization/tasks/main.yml +++ b/update/ansible/playbook/tasks/roles/initialization/tasks/main.yml @@ -220,12 +220,10 @@ method: GET retries: 120 delay: 1 - - - name: Disable maintenance mode - file: - state: absent - path: /usr/share/pmm-server/maintenance/maintenance.html # We use current_version_file['failed'] because we don't want to run this on creating container when: docker_upgrade - +- name: Disable maintenance mode + file: + state: absent + path: /usr/share/pmm-server/maintenance/maintenance.html