Skip to content

Commit

Permalink
Merge pull request #253 from newrelic/fix/warning-stops-mbeans-proces…
Browse files Browse the repository at this point in the history
…sing

fix: warning stops mbeans processing
  • Loading branch information
varas authored Jan 15, 2021
2 parents 1601955 + 22f38a2 commit a262d94
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
17 changes: 15 additions & 2 deletions jmx/jmx.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func doQuery(ctx context.Context, out chan []byte, queryErrC chan error, querySt
queryErrC <- fmt.Errorf("reading nrjmx stdout: %s", err.Error())
}
out <- b
return
}
}

Expand All @@ -341,6 +342,7 @@ func Query(objectPattern string, timeoutMillis int) (result map[string]interface

// receiveResult checks for channels to receive result from nrjmx command.
func receiveResult(lineC chan []byte, queryErrC chan error, cancelFn context.CancelFunc, objectPattern string, timeout time.Duration) (result map[string]interface{}, err error) {
defer logAvailableWarnings(cmdWarnC)
var warn string
for {
select {
Expand All @@ -361,12 +363,11 @@ func receiveResult(lineC chan []byte, queryErrC chan error, cancelFn context.Can
for k, v := range r {
result[k] = v
}

return

case warn = <-cmdWarnC:
// change on the API is required to return warnings
log.Warn(warn)
return

case err = <-cmdErrC:
return
Expand All @@ -383,3 +384,15 @@ func receiveResult(lineC chan []byte, queryErrC chan error, cancelFn context.Can
}
}
}

func logAvailableWarnings(channel chan string) {
var warn string
for {
select {
case warn = <-channel:
log.Warn(warn)
default:
return
}
}
}
20 changes: 15 additions & 5 deletions jmx/jmx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jmx

import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
Expand All @@ -10,6 +11,7 @@ import (
"testing"
"time"

"github.com/newrelic/infra-integrations-sdk/v4/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -179,17 +181,25 @@ func openWaitWithSSL(hostname, port, username, password, keyStore, keyStorePassw
}

func Test_receiveResult_warningsDoNotBreakResultReception(t *testing.T) {

var buf bytes.Buffer
log.SetOutput(&buf)

_, cancelFn := context.WithCancel(context.Background())

resultCh := make(chan []byte, 1)
queryErrCh := make(chan error)
outTimeout := time.Duration(timeoutMillis) * time.Millisecond
warningMessage := fmt.Sprint("WARNING foo bar")
cmdWarnC <- warningMessage

_, _ = receiveResult(resultCh, queryErrCh, cancelFn, "empty", outTimeout)
resultCh <- []byte("{\"foo\":1}")

cmdErrC <- fmt.Errorf("WARNING foo bar")
assert.Equal(t, <-cmdErrC, fmt.Errorf("WARNING foo bar"))
result, err := receiveResult(resultCh, queryErrCh, cancelFn, "foo", outTimeout)

resultCh <- []byte("{foo}")
assert.Equal(t, string(<-resultCh), "{foo}")
assert.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"foo": 1.,
}, result)
assert.Equal(t, fmt.Sprintf("[WARN] %s\n", warningMessage), buf.String())
}
5 changes: 5 additions & 0 deletions log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func SetupLogging(verbose bool) {
}
}

// SetOutput sets output stream
func SetOutput(w io.Writer) {
globalLogger.logger.SetOutput(w)
}

// Debug logs a formatted message at level Debug.
func Debug(format string, args ...interface{}) {
globalLogger.Debugf(format, args...)
Expand Down

0 comments on commit a262d94

Please sign in to comment.