Skip to content

Commit

Permalink
Triage really long logs issues
Browse files Browse the repository at this point in the history
  • Loading branch information
tateexon committed Dec 13, 2023
1 parent e9f7824 commit 8beebb0
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 164 deletions.
41 changes: 34 additions & 7 deletions k8s/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bufio"
"bytes"
"context"
"io"
"os/exec"
Expand All @@ -20,14 +21,40 @@ func ExecCmdWithContext(ctx context.Context, command string) error {
})
}

// readStdPipe continuously read a pipe from the command
// readStdPipe continuously reads from a given pipe (either stdout or stderr)
// and processes the output line by line using the provided outputFunction.
// It handles lines of any length dynamically without the need for a large predefined buffer.
func readStdPipe(pipe io.ReadCloser, outputFunction func(string)) {
scanner := bufio.NewScanner(pipe)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
if outputFunction != nil {
outputFunction(m)
reader := bufio.NewReader(pipe)
var output []rune

for {
// ReadLine tries to return a single line, not including the end-of-line bytes.
// The returned line may be incomplete if the line's too long for the buffer.
// isPrefix will be true if the line is longer than the buffer.
chunk, isPrefix, err := reader.ReadLine()

// Handle any errors that occurred during the read.
if err != nil {
// Log any error that's not an EOF (end of file).
if err != io.EOF {
log.Warn().Err(err).Msg("Error while reading standard pipe, this can be caused by really long logs and can be ignored if nothing else is wrong.")
}
break
}

// Append the chunk to the output buffer.
// bytes.Runes converts the byte slice to a slice of runes, handling multi-byte characters.
output = append(output, bytes.Runes(chunk)...)

// If isPrefix is false, we've reached the end of the line and can process it.
if !isPrefix {
// Call the output function with the complete line if it's defined.
if outputFunction != nil {
outputFunction(string(output))
}
// Reset output to an empty slice for reading the next line.
output = output[:0]
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions k8s/client/cmd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package client

import (
"io"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

func TestReadStdPipeWithLongString(t *testing.T) {
// Create a string with a million characters ('a').
longString := strings.Repeat("a", 1000000)

// Use an io.Pipe to simulate the stdout or stderr pipe.
reader, writer := io.Pipe()

// Channel to communicate errors from the writing goroutine.
errChan := make(chan error, 1)

// Write the long string to the pipe in a goroutine.
go func() {
_, err := writer.Write([]byte(longString))
if err != nil {
// Send any errors to the main test goroutine via the channel.
errChan <- err
}
writer.Close()
errChan <- nil // Send nil to indicate successful write.
}()

// Variable to store the output from the readStdPipe function.
var output string
outputFunction := func(s string) {
output = s
}

// Call the readStdPipe function with the reader part of the pipe.
readStdPipe(reader, outputFunction)

// Check for errors from the write goroutine.
err := <-errChan
require.NoError(t, err, "Failed to write to pipe")
require.Equal(t, longString, output, "Output did not match the input long string")
}
16 changes: 16 additions & 0 deletions k8s/e2e/common/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -446,3 +447,18 @@ func TestRunTimeout(t *testing.T) {
err = e.Run()
require.Error(t, err)
}

func TestReallyLongLogs(t *testing.T) {
t.Parallel()
l := logging.GetTestLogger(t)
testEnvConfig := GetTestEnvConfig(t)
val, _ := os.LookupEnv(config.EnvVarJobImage)
if val != "" {
env := environment.New(testEnvConfig)
err := env.Run()
require.NoError(t, err)
}
s := strings.Repeat("a", 500000)
// this shouldn't hang
l.Info().Int("len", len(s)).Str("string", s).Msg("string")
}
4 changes: 4 additions & 0 deletions k8s/e2e/local-runner/envs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ func TestReplaceHelm(t *testing.T) {
func TestRunTimeout(t *testing.T) {
common.TestRunTimeout(t)
}

func TestReallyLongLogs(t *testing.T) {
common.TestReallyLongLogs(t)
}
Loading

0 comments on commit 8beebb0

Please sign in to comment.