Skip to content

Commit

Permalink
PMM-12743 test with recover
Browse files Browse the repository at this point in the history
  • Loading branch information
BupycHuk committed Dec 4, 2023
1 parent bef98bf commit 84384ad
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 56 deletions.
2 changes: 1 addition & 1 deletion agent/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ VERSION_FLAGS = -X 'github.com/percona/pmm/version.ProjectName=pmm-agent' \
-X 'github.com/percona/pmm/version.Branch=$(PMM_RELEASE_BRANCH)'

release: ## Build static pmm-agent release binary (Linux only)
env CGO_ENABLED=1 go build -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent
env CGO_ENABLED=1 go build -race -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent
go build -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent-entrypoint ./cmd/pmm-agent-entrypoint
$(PMM_RELEASE_PATH)/pmm-agent --version
ldd $(PMM_RELEASE_PATH)/pmm-agent 2>&1 | grep -Fq 'not a dynamic executable'
Expand Down
2 changes: 1 addition & 1 deletion agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ func (c *Client) Connect(ctx context.Context) error {
}()
c.supervisor.ClearChangesChannel()
c.SendActualStatuses()
c.cache.SetSender(dialResult.channel)

c.rw.Lock()
c.md = dialResult.md
c.channel = dialResult.channel
c.cache.SetSender(dialResult.channel)
c.rw.Unlock()

c.processChannelRequests(ctx)
Expand Down
115 changes: 67 additions & 48 deletions agent/utils/buffer-ring/bigqueue/bigqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Ring struct {
wg sync.WaitGroup

sendLock sync.RWMutex
recvLock sync.RWMutex
recvLock sync.Mutex
totalSize int64 // represent the limit after which old data will be overwritten

sender atomic.Pointer[models.Sender]
Expand Down Expand Up @@ -123,20 +123,20 @@ func (r *Ring) Send(resp *models.AgentResponse) error {

r.recvLock.Lock()
defer r.recvLock.Unlock()
if r.isEmpty() && s != nil {
err = (*s).Send(resp)
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
} else {
return err
if !r.isEmpty() || s == nil {
r.push(msg)
return nil
}
err = (*s).Send(resp)
if errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
r.push(msg)
}
return nil
}

r.push(msg)
return nil
return err
}

// SendAndWaitResponse stores AgentMessageRequestPayload on nil channel.
Expand All @@ -149,20 +149,23 @@ func (r *Ring) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agentpb

r.recvLock.Lock()
defer r.recvLock.Unlock()
if r.isEmpty() && s != nil {
resp, err = (*s).SendAndWaitResponse(payload)
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
} else {
return resp, err

if !r.isEmpty() || s == nil {
r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()})
return nil, nil
}

resp, err = (*s).SendAndWaitResponse(payload)
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()})
return nil, nil
}

r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()})
return &agentpb.StateChangedResponse{}, nil
return resp, err
}

// SetSender check and set sender and notify sender loop.
Expand All @@ -179,6 +182,8 @@ func (r *Ring) Close() {
default:
close(r.done)
r.wg.Wait()
r.recvLock.Lock()
defer r.recvLock.Unlock()
if err := r.fq.Close(); err != nil {
r.l.Errorf("closing cache: %+v", err)
}
Expand All @@ -190,6 +195,7 @@ func (r *Ring) isEmpty() bool {
return r.fq.IsEmpty()
}

// push inserts message to cache.
func (r *Ring) push(msg *agentpb.AgentMessage) {
b, err := proto.Marshal(msg)
if err != nil {
Expand Down Expand Up @@ -261,6 +267,7 @@ func (r *Ring) sendRunner() {
}()
}

// sendInLoop sends messages from cache to sender.
func (r *Ring) sendInLoop() {
var s *models.Sender
for {
Expand All @@ -284,35 +291,47 @@ func (r *Ring) sendInLoop() {
return
default:
}
r.recvLock.Lock()
_, b, err := r.fq.Peek()
r.recvLock.Unlock()
if err != nil {
r.l.Errorf("reading entry from cache: %+v", err)
}
if b == nil {
break
}
var m agentpb.AgentMessage
if err := proto.Unmarshal(b, &m); err != nil {
r.l.Errorf("unmarshal entry from cache: %+v", err)
} else if err = r.send(*s, &m); err != nil {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
break
}
r.recvLock.Lock()
r.fq.Skip(1) //nolint:errcheck
r.recvLock.Unlock()
count++
count += r.peekAndSend(s)
}
if count > 0 {
asyncNotify(r.gcCh)
}
}

func (r *Ring) peekAndSend(s *models.Sender) int {
defer func() {
if rc := recover(); rc != nil {
r.l.Errorf("panic in peekAndSend: %v", rc)
}
}()
r.recvLock.Lock()
r.l.Debugf("status: %v", r.fq.Status())
idx, b, err := r.fq.Peek()
r.recvLock.Unlock()
if err != nil {
r.l.Errorf("reading entry from cache: %+v", err)
}
if len(b) == 0 {
return 0
}
r.l.Debugf("resending %d: %s", idx, string(b))
var m agentpb.AgentMessage
err = proto.Unmarshal(b, &m)
if err != nil {
r.l.Errorf("unmarshal entry from cache: %+v", err)
} else if err = r.send(*s, &m); err != nil {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
return 0
}
r.recvLock.Lock()
r.fq.Skip(1) //nolint:errcheck
r.recvLock.Unlock()
return 1
}

// initPaths creates all paths for queue to use. Original repo creates directories with perm error.
func initPaths(dir string) error {
for _, path := range []string{
Expand Down Expand Up @@ -405,7 +424,7 @@ func (r *Ring) send(s models.Sender, m *agentpb.AgentMessage) error {
case *agentpb.AgentMessage_StateChanged:
_, err = s.SendAndWaitResponse(p.StateChanged)
default:
r.l.Errorf("unknown message: %T", m)
r.l.Warnf("unknown message in cache: %T", m)
return nil
}
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,10 @@
method: GET
retries: 120
delay: 1

- name: Disable maintenance mode
file:
state: absent
path: /usr/share/pmm-server/maintenance/maintenance.html
# We use current_version_file['failed'] because we don't want to run this on creating container
when: docker_upgrade


- name: Disable maintenance mode
file:
state: absent
path: /usr/share/pmm-server/maintenance/maintenance.html

0 comments on commit 84384ad

Please sign in to comment.