Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PMM-12743 test with recover #2673

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ VERSION_FLAGS = -X 'github.com/percona/pmm/version.ProjectName=pmm-agent' \
-X 'github.com/percona/pmm/version.Branch=$(PMM_RELEASE_BRANCH)'

release: ## Build static pmm-agent release binary (Linux only)
env CGO_ENABLED=1 go build -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent
env CGO_ENABLED=1 go build -race -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll revert it

go build -v -ldflags "-extldflags '-static' $(VERSION_FLAGS)" -tags 'osusergo netgo static_build' -o $(PMM_RELEASE_PATH)/pmm-agent-entrypoint ./cmd/pmm-agent-entrypoint
$(PMM_RELEASE_PATH)/pmm-agent --version
ldd $(PMM_RELEASE_PATH)/pmm-agent 2>&1 | grep -Fq 'not a dynamic executable'
Expand Down
2 changes: 1 addition & 1 deletion agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ func (c *Client) Connect(ctx context.Context) error {
}()
c.supervisor.ClearChangesChannel()
c.SendActualStatuses()
c.cache.SetSender(dialResult.channel)

c.rw.Lock()
c.md = dialResult.md
c.channel = dialResult.channel
c.cache.SetSender(dialResult.channel)
c.rw.Unlock()

c.processChannelRequests(ctx)
Expand Down
115 changes: 67 additions & 48 deletions agent/utils/buffer-ring/bigqueue/bigqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
wg sync.WaitGroup

sendLock sync.RWMutex
recvLock sync.RWMutex
recvLock sync.Mutex
totalSize int64 // represent the limit after which old data will be overwritten

sender atomic.Pointer[models.Sender]
Expand Down Expand Up @@ -123,20 +123,20 @@

r.recvLock.Lock()
defer r.recvLock.Unlock()
if r.isEmpty() && s != nil {
err = (*s).Send(resp)
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
} else {
return err
if !r.isEmpty() || s == nil {
r.push(msg)
return nil
}
err = (*s).Send(resp)
if errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
r.push(msg)
}
return nil
}

r.push(msg)
return nil
return err
}

// SendAndWaitResponse stores AgentMessageRequestPayload on nil channel.
Expand All @@ -149,20 +149,23 @@

r.recvLock.Lock()
defer r.recvLock.Unlock()
if r.isEmpty() && s != nil {
resp, err = (*s).SendAndWaitResponse(payload)
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
} else {
return resp, err

if !r.isEmpty() || s == nil {
r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()})
return nil, nil
}

resp, err = (*s).SendAndWaitResponse(payload)
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()})
return nil, nil
}

r.push(&agentpb.AgentMessage{Payload: payload.AgentMessageRequestPayload()})
return &agentpb.StateChangedResponse{}, nil
return resp, err
}

// SetSender check and set sender and notify sender loop.
Expand All @@ -179,6 +182,8 @@
default:
close(r.done)
r.wg.Wait()
r.recvLock.Lock()
defer r.recvLock.Unlock()
if err := r.fq.Close(); err != nil {
r.l.Errorf("closing cache: %+v", err)
}
Expand All @@ -190,6 +195,7 @@
return r.fq.IsEmpty()
}

// push inserts message to cache.
func (r *Ring) push(msg *agentpb.AgentMessage) {
b, err := proto.Marshal(msg)
if err != nil {
Expand Down Expand Up @@ -261,6 +267,7 @@
}()
}

// sendInLoop sends messages from cache to sender.
func (r *Ring) sendInLoop() {
var s *models.Sender
for {
Expand All @@ -284,35 +291,47 @@
return
default:
}
r.recvLock.Lock()
_, b, err := r.fq.Peek()
r.recvLock.Unlock()
if err != nil {
r.l.Errorf("reading entry from cache: %+v", err)
}
if b == nil {
break
}
var m agentpb.AgentMessage
if err := proto.Unmarshal(b, &m); err != nil {
r.l.Errorf("unmarshal entry from cache: %+v", err)
} else if err = r.send(*s, &m); err != nil {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
break
}
r.recvLock.Lock()
r.fq.Skip(1) //nolint:errcheck
r.recvLock.Unlock()
count++
count += r.peekAndSend(s)
}
if count > 0 {

Check failure on line 296 in agent/utils/buffer-ring/bigqueue/bigqueue.go

View workflow job for this annotation

GitHub Actions / Checks

unreachable: unreachable code (govet)
asyncNotify(r.gcCh)
}
}

func (r *Ring) peekAndSend(s *models.Sender) int {
defer func() {
if rc := recover(); rc != nil {
r.l.Errorf("panic in peekAndSend: %v", rc)
}
}()
r.recvLock.Lock()
r.l.Debugf("status: %v", r.fq.Status())
idx, b, err := r.fq.Peek()
r.recvLock.Unlock()
if err != nil {
r.l.Errorf("reading entry from cache: %+v", err)
}
if len(b) == 0 {
return 0
}
r.l.Debugf("resending %d: %s", idx, string(b))
var m agentpb.AgentMessage
err = proto.Unmarshal(b, &m)
if err != nil {
r.l.Errorf("unmarshal entry from cache: %+v", err)
} else if err = r.send(*s, &m); err != nil {
if r.sender.CompareAndSwap(s, nil) {
asyncRelease(r.establishCh)
r.l.Debugf("sender released: %v", err)
}
return 0
}
r.recvLock.Lock()
r.fq.Skip(1) //nolint:errcheck
r.recvLock.Unlock()
return 1
}

// initPaths creates all paths for queue to use. Original repo creates directories with perm error.
func initPaths(dir string) error {
for _, path := range []string{
Expand Down Expand Up @@ -405,7 +424,7 @@
case *agentpb.AgentMessage_StateChanged:
_, err = s.SendAndWaitResponse(p.StateChanged)
default:
r.l.Errorf("unknown message: %T", m)
r.l.Warnf("unknown message in cache: %T", m)
return nil
}
if err != nil && errors.As(err, &agenterrors.ErrChanConn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,10 @@
method: GET
retries: 120
delay: 1

- name: Disable maintenance mode
file:
state: absent
path: /usr/share/pmm-server/maintenance/maintenance.html
# We use current_version_file['failed'] because we don't want to run this on creating container
when: docker_upgrade


- name: Disable maintenance mode
file:
state: absent
path: /usr/share/pmm-server/maintenance/maintenance.html
Loading