Skip to content

Commit

Permalink
Change hook logger to overcome the 64k scanner limit (#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
yalosev authored Jul 11, 2023
1 parent 55ca7a9 commit 97a2b07
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 73 deletions.
116 changes: 43 additions & 73 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package executor

import (
"bufio"
"encoding/json"
"io"
"os/exec"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -38,52 +35,21 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string) (*CmdUsage, erro

logEntry.Debugf("Executing command '%s' in '%s' dir", strings.Join(cmd.Args, " "), cmd.Dir)

var wg sync.WaitGroup

stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}

stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
if app.LogProxyHookJSON {
plo := &proxyJSONLogger{stdoutLogEntry, make([]byte, 0)}
ple := &proxyJSONLogger{stderrLogEntry, make([]byte, 0)}
cmd.Stdout = plo
cmd.Stderr = ple
} else {
cmd.Stdout = stdoutLogEntry.Writer()
cmd.Stderr = stderrLogEntry.Writer()
}

err = cmd.Start()
err := cmd.Run()
if err != nil {
return nil, err
}

wg.Add(2)
go func() {
defer wg.Done()
if app.LogProxyHookJSON {
proxyJSONLogs(stdout, stdoutLogEntry)
} else {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
stdoutLogEntry.Info(scanner.Text())
}
}
}()

go func() {
defer wg.Done()
if app.LogProxyHookJSON {
proxyJSONLogs(stderr, stderrLogEntry)
} else {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
stderrLogEntry.Info(scanner.Text())
}
}
}()

wg.Wait()

err = cmd.Wait()

var usage *CmdUsage
if cmd.ProcessState != nil {
usage = &CmdUsage{
Expand All @@ -101,39 +67,43 @@ func RunAndLogLines(cmd *exec.Cmd, logLabels map[string]string) (*CmdUsage, erro
return usage, err
}

func proxyJSONLogs(r io.ReadCloser, logEntry *log.Entry) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
var line interface{}
if err := json.Unmarshal([]byte(scanner.Text()), &line); err != nil {
logEntry.Debugf("unmarshal json log line: %v", err)
// fall back to using the logger
logEntry.Info(scanner.Text())
continue
}
logMap, ok := line.(map[string]interface{})
if !ok {
logEntry.Debugf("json log line not map[string]interface{}: %v", line)
// fall back to using the logger
logEntry.Info(scanner.Text())
continue
}
type proxyJSONLogger struct {
*log.Entry

for k, v := range logEntry.Data {
logMap[k] = v
}
logLine, err := json.Marshal(logMap)
if err != nil {
logEntry.Debugf("marshal json log line: %v", err)
// fall back to using the logger
logEntry.Info(scanner.Text())
continue
buf []byte
}

func (pj *proxyJSONLogger) Write(p []byte) (n int, err error) {
pj.buf = append(pj.buf, p...)

var line interface{}
err = json.Unmarshal(pj.buf, &line)
if err != nil {
if err.Error() == "unexpected end of JSON input" {
return len(p), nil
}
// Mark this log entry as one that is json that needs to be proxied
logEntry := logEntry.WithField(app.ProxyJsonLogKey, true)
// Log the line via the same centralized logger; the formatter should make sure it's "proxied"
logEntry.Log(log.FatalLevel, string(logLine))
return len(p), err
}

logMap, ok := line.(map[string]interface{})
if !ok {
pj.Debugf("json log line not map[string]interface{}: %v", line)
// fall back to using the logger
pj.Info(string(p))
return len(p), err
}

for k, v := range pj.Data {
logMap[k] = v
}

logLine, _ := json.Marshal(logMap)

logEntry := pj.WithField(app.ProxyJsonLogKey, true)

logEntry.Log(log.FatalLevel, string(logLine))

return len(p), nil
}

func Output(cmd *exec.Cmd) (output []byte, err error) {
Expand Down
95 changes: 95 additions & 0 deletions pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package executor

import (
"bytes"
"io"
"math/rand"
"os"
"os/exec"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flant/shell-operator/pkg/app"
)

func TestRunAndLogLines(t *testing.T) {
var buf bytes.Buffer
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(&buf)

t.Run("simple log", func(t *testing.T) {
app.LogProxyHookJSON = true
// time="2023-07-10T18:13:42+04:00" level=fatal msg="{\"a\":\"b\",\"foo\":\"baz\",\"output\":\"stdout\"}" a=b output=stdout proxyJsonLog=true
cmd := exec.Command("echo", `{"foo": "baz"}`)
_, err := RunAndLogLines(cmd, map[string]string{"a": "b"})
assert.NoError(t, err)
assert.Contains(t, buf.String(), `level=fatal msg="{\"a\":\"b\",\"foo\":\"baz\",\"output\":\"stdout\"}" a=b output=stdout proxyJsonLog=true`)

buf.Reset()
})

t.Run("not json log", func(t *testing.T) {
app.LogProxyHookJSON = false
// time="2023-07-10T18:14:25+04:00" level=info msg=foobar a=b output=stdout
cmd := exec.Command("echo", `foobar`)
_, err := RunAndLogLines(cmd, map[string]string{"a": "b"})
time.Sleep(100 * time.Millisecond)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `level=info msg=foobar a=b output=stdout`)

buf.Reset()
})

t.Run("long file", func(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "testjson-*.json")
require.NoError(t, err)
defer os.RemoveAll(f.Name())

_, _ = io.WriteString(f, `{"foo": "`+randStringRunes(1024*1024)+`"}`)

app.LogProxyHookJSON = true
cmd := exec.Command("cat", f.Name())
_, err = RunAndLogLines(cmd, map[string]string{"a": "b"})
assert.NoError(t, err)
assert.Contains(t, buf.String(), `\",\"output\":\"stdout\"}" a=b output=stdout proxyJsonLog=true`)

buf.Reset()
})

t.Run("invalid json structure", func(t *testing.T) {
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `["a","b","c"]`)
_, err := RunAndLogLines(cmd, map[string]string{"a": "b"})
assert.NoError(t, err)
assert.Contains(t, buf.String(), `level=debug msg="json log line not map[string]interface{}: [a b c]" a=b output=stdout`)

buf.Reset()
})

t.Run("multiline", func(t *testing.T) {
app.LogProxyHookJSON = true
cmd := exec.Command("echo", `
{"a":"b",
"c":"d"}
`)
_, err := RunAndLogLines(cmd, map[string]string{"foor": "baar"})
assert.NoError(t, err)
assert.Contains(t, buf.String(), `msg="{\"a\":\"b\",\"c\":\"d\",\"foor\":\"baar\",\"output\":\"stdout\"}" foor=baar output=stdout proxyJsonLog=true`)

buf.Reset()
})
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

0 comments on commit 97a2b07

Please sign in to comment.