diff --git a/k8s/client/cmd.go b/k8s/client/cmd.go index fd66f4513..c88aa9bad 100644 --- a/k8s/client/cmd.go +++ b/k8s/client/cmd.go @@ -2,6 +2,7 @@ package client import ( "bufio" + "bytes" "context" "io" "os/exec" @@ -20,14 +21,40 @@ func ExecCmdWithContext(ctx context.Context, command string) error { }) } -// readStdPipe continuously read a pipe from the command +// readStdPipe continuously reads from a given pipe (either stdout or stderr) +// and processes the output line by line using the provided outputFunction. +// It handles lines of any length dynamically without the need for a large predefined buffer. func readStdPipe(pipe io.ReadCloser, outputFunction func(string)) { - scanner := bufio.NewScanner(pipe) - scanner.Split(bufio.ScanLines) - for scanner.Scan() { - m := scanner.Text() - if outputFunction != nil { - outputFunction(m) + reader := bufio.NewReader(pipe) + var output []rune + + for { + // ReadLine tries to return a single line, not including the end-of-line bytes. + // The returned line may be incomplete if the line's too long for the buffer. + // isPrefix will be true if the line is longer than the buffer. + chunk, isPrefix, err := reader.ReadLine() + + // Handle any errors that occurred during the read. + if err != nil { + // Log any error that's not an EOF (end of file). + if err != io.EOF { + log.Warn().Err(err).Msg("Error while reading standard pipe, this can be caused by really long logs and can be ignored if nothing else is wrong.") + } + break + } + + // Append the chunk to the output buffer. + // bytes.Runes converts the byte slice to a slice of runes, handling multi-byte characters. + output = append(output, bytes.Runes(chunk)...) + + // If isPrefix is false, we've reached the end of the line and can process it. + if !isPrefix { + // Call the output function with the complete line if it's defined. + if outputFunction != nil { + outputFunction(string(output)) + } + // Reset output to an empty slice for reading the next line. + output = output[:0] } } } diff --git a/k8s/client/cmd_test.go b/k8s/client/cmd_test.go new file mode 100644 index 000000000..15f7aa28f --- /dev/null +++ b/k8s/client/cmd_test.go @@ -0,0 +1,45 @@ +package client + +import ( + "io" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReadStdPipeWithLongString(t *testing.T) { + // Create a string with a million characters ('a'). + longString := strings.Repeat("a", 1000000) + + // Use an io.Pipe to simulate the stdout or stderr pipe. + reader, writer := io.Pipe() + + // Channel to communicate errors from the writing goroutine. + errChan := make(chan error, 1) + + // Write the long string to the pipe in a goroutine. + go func() { + _, err := writer.Write([]byte(longString)) + if err != nil { + // Send any errors to the main test goroutine via the channel. + errChan <- err + } + writer.Close() + errChan <- nil // Send nil to indicate successful write. + }() + + // Variable to store the output from the readStdPipe function. + var output string + outputFunction := func(s string) { + output = s + } + + // Call the readStdPipe function with the reader part of the pipe. + readStdPipe(reader, outputFunction) + + // Check for errors from the write goroutine. + err := <-errChan + require.NoError(t, err, "Failed to write to pipe") + require.Equal(t, longString, output, "Output did not match the input long string") +} diff --git a/k8s/e2e/common/test_common.go b/k8s/e2e/common/test_common.go index 77dd09f77..9dea8f208 100644 --- a/k8s/e2e/common/test_common.go +++ b/k8s/e2e/common/test_common.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strconv" + "strings" "testing" "time" @@ -446,3 +447,18 @@ func TestRunTimeout(t *testing.T) { err = e.Run() require.Error(t, err) } + +func TestReallyLongLogs(t *testing.T) { + t.Parallel() + l := logging.GetTestLogger(t) + testEnvConfig := GetTestEnvConfig(t) + val, _ := os.LookupEnv(config.EnvVarJobImage) + if val != "" { + env := environment.New(testEnvConfig) + err := env.Run() + require.NoError(t, err) + } + s := strings.Repeat("a", 500000) + // this shouldn't hang + l.Info().Int("len", len(s)).Str("string", s).Msg("string") +} diff --git a/k8s/e2e/local-runner/envs_test.go b/k8s/e2e/local-runner/envs_test.go index 608602859..3a2529ad6 100644 --- a/k8s/e2e/local-runner/envs_test.go +++ b/k8s/e2e/local-runner/envs_test.go @@ -71,3 +71,7 @@ func TestReplaceHelm(t *testing.T) { func TestRunTimeout(t *testing.T) { common.TestRunTimeout(t) } + +func TestReallyLongLogs(t *testing.T) { + common.TestReallyLongLogs(t) +} diff --git a/k8s/e2e/remote-runner/remote_runner_envs_test.go b/k8s/e2e/remote-runner/remote_runner_envs_test.go index f0bb07b39..77f18de91 100644 --- a/k8s/e2e/remote-runner/remote_runner_envs_test.go +++ b/k8s/e2e/remote-runner/remote_runner_envs_test.go @@ -1,165 +1,175 @@ package e2e_remote_runner_test import ( - "fmt" "testing" - "time" - "github.com/go-resty/resty/v2" - "github.com/rs/zerolog/log" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-testing-framework/k8s/config" "github.com/smartcontractkit/chainlink-testing-framework/k8s/e2e/common" - "github.com/smartcontractkit/chainlink-testing-framework/k8s/environment" - "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/chainlink" - "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/ethereum" - "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/mockserver" - mockservercfg "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/mockserver-cfg" - "github.com/smartcontractkit/chainlink-testing-framework/k8s/presets" ) -func TestMultiStageMultiManifestConnection(t *testing.T) { - common.TestMultiStageMultiManifestConnection(t) -} - -func TestConnectWithoutManifest(t *testing.T) { - common.TestConnectWithoutManifest(t) -} - -func Test5NodesSoakEnvironmentWithPVCs(t *testing.T) { - common.Test5NodesSoakEnvironmentWithPVCs(t) -} - -func TestWithSingleNodeEnv(t *testing.T) { - common.TestWithSingleNodeEnvParallel(t) -} - -func TestWithSingleNodeEnvLocalCharts(t *testing.T) { - t.Setenv(config.EnvVarLocalCharts, "true") - common.TestWithSingleNodeEnv(t) -} - -func TestMultipleNodeWithDiffDBVersionEnv(t *testing.T) { - common.TestMultipleNodeWithDiffDBVersionEnv(t) -} - -func TestMinResources5NodesEnv(t *testing.T) { - common.TestMinResources5NodesEnv(t) -} - -func TestMinResources5NodesEnvWithBlockscout(t *testing.T) { - common.TestMinResources5NodesEnvWithBlockscout(t) -} - -func TestMultipleInstancesOfTheSameType(t *testing.T) { - common.TestMultipleInstancesOfTheSameType(t) -} - -func Test5NodesPlus2MiningGethsReorgEnv(t *testing.T) { - common.Test5NodesPlus2MiningGethsReorgEnv(t) -} - -func TestWithChaos(t *testing.T) { - common.TestWithChaos(t) -} - -func TestFundReturnShutdownLogic(t *testing.T) { - t.Parallel() - testEnvConfig := common.GetTestEnvConfig(t) - e := presets.EVMMinimalLocal(testEnvConfig) - err := e.Run() - if e.WillUseRemoteRunner() { - require.Error(t, err, "Should return an error") - return - } - t.Cleanup(func() { - assert.NoError(t, e.Shutdown()) - }) - require.NoError(t, err) - fmt.Println(environment.FAILED_FUND_RETURN) -} - -func TestRemoteRunnerOneSetupWithMultipeTests(t *testing.T) { - t.Parallel() - testEnvConfig := common.GetTestEnvConfig(t) - ethChart := ethereum.New(nil) - e := environment.New(testEnvConfig). - AddHelm(mockservercfg.New(nil)). - AddHelm(mockserver.New(nil)). - AddHelm(ethChart). - AddHelm(chainlink.New(0, map[string]any{ - "replicas": 5, - "toml": presets.BaseToml, - })) - err := e.Run() - t.Cleanup(func() { - assert.NoError(t, e.Shutdown()) - }) - require.NoError(t, err) - if e.WillUseRemoteRunner() { - return - } - - // setup of variables to use for verification in a t.Run - ethNetworkName := ethChart.GetProps().(*ethereum.Props).NetworkName - urls := make([]string, 0) - if e.Cfg.InsideK8s { - urls = append(urls, e.URLs[chainlink.NodesInternalURLsKey]...) - urls = append(urls, e.URLs[ethNetworkName+"_internal_http"]...) - } else { - urls = append(urls, e.URLs[chainlink.NodesLocalURLsKey]...) - urls = append(urls, e.URLs[ethNetworkName+"_http"]...) - } - - log.Info().Str("Test", "Before").Msg("Before Tests") - - // This test will verify we can connect a t.Run to the env of the parent test - t.Run("do one", func(t1 *testing.T) { - t1.Parallel() - test1EnvConfig := common.GetTestEnvConfig(t1) - test1EnvConfig.Namespace = e.Cfg.Namespace - test1EnvConfig.NoManifestUpdate = true - e1 := presets.EVMMinimalLocal(test1EnvConfig) - err := e1.Run() - require.NoError(t1, err) - log.Info().Str("Test", "One").Msg("Inside test") - time.Sleep(1 * time.Second) - }) - - // This test will verify the sub t.Run properly uses the variables from the parent test - t.Run("do two", func(t2 *testing.T) { - t2.Parallel() - log.Info().Str("Test", "Two").Msg("Inside test") - r := resty.New() - for _, u := range urls { - log.Info().Str("URL", u).Send() - res, err := r.R().Get(u) - require.NoError(t2, err) - require.Equal(t2, "200 OK", res.Status()) - } - }) - - log.Info().Str("Test", "After").Msg("After Tests") -} - -func TestEmptyEnvironmentStartup(t *testing.T) { - common.TestEmptyEnvironmentStartup(t) -} - -func TestRolloutRestartUpdate(t *testing.T) { - common.TestRolloutRestart(t, true) -} - -func TestRolloutRestartBySelector(t *testing.T) { - common.TestRolloutRestart(t, false) -} - -func TestReplaceHelm(t *testing.T) { - common.TestReplaceHelm(t) -} - -func TestRunTimeout(t *testing.T) { - common.TestRunTimeout(t) +// import ( +// "fmt" +// "testing" +// "time" + +// "github.com/go-resty/resty/v2" +// "github.com/rs/zerolog/log" +// "github.com/stretchr/testify/assert" +// "github.com/stretchr/testify/require" + +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/config" +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/e2e/common" +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/environment" +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/chainlink" +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/ethereum" +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/mockserver" +// mockservercfg "github.com/smartcontractkit/chainlink-testing-framework/k8s/pkg/helm/mockserver-cfg" +// "github.com/smartcontractkit/chainlink-testing-framework/k8s/presets" +// ) + +// func TestMultiStageMultiManifestConnection(t *testing.T) { +// common.TestMultiStageMultiManifestConnection(t) +// } + +// func TestConnectWithoutManifest(t *testing.T) { +// common.TestConnectWithoutManifest(t) +// } + +// func Test5NodesSoakEnvironmentWithPVCs(t *testing.T) { +// common.Test5NodesSoakEnvironmentWithPVCs(t) +// } + +// func TestWithSingleNodeEnv(t *testing.T) { +// common.TestWithSingleNodeEnvParallel(t) +// } + +// func TestWithSingleNodeEnvLocalCharts(t *testing.T) { +// t.Setenv(config.EnvVarLocalCharts, "true") +// common.TestWithSingleNodeEnv(t) +// } + +// func TestMultipleNodeWithDiffDBVersionEnv(t *testing.T) { +// common.TestMultipleNodeWithDiffDBVersionEnv(t) +// } + +// func TestMinResources5NodesEnv(t *testing.T) { +// common.TestMinResources5NodesEnv(t) +// } + +// func TestMinResources5NodesEnvWithBlockscout(t *testing.T) { +// common.TestMinResources5NodesEnvWithBlockscout(t) +// } + +// func TestMultipleInstancesOfTheSameType(t *testing.T) { +// common.TestMultipleInstancesOfTheSameType(t) +// } + +// func Test5NodesPlus2MiningGethsReorgEnv(t *testing.T) { +// common.Test5NodesPlus2MiningGethsReorgEnv(t) +// } + +// func TestWithChaos(t *testing.T) { +// common.TestWithChaos(t) +// } + +// func TestFundReturnShutdownLogic(t *testing.T) { +// t.Parallel() +// testEnvConfig := common.GetTestEnvConfig(t) +// e := presets.EVMMinimalLocal(testEnvConfig) +// err := e.Run() +// if e.WillUseRemoteRunner() { +// require.Error(t, err, "Should return an error") +// return +// } +// t.Cleanup(func() { +// assert.NoError(t, e.Shutdown()) +// }) +// require.NoError(t, err) +// fmt.Println(environment.FAILED_FUND_RETURN) +// } + +// func TestRemoteRunnerOneSetupWithMultipeTests(t *testing.T) { +// t.Parallel() +// testEnvConfig := common.GetTestEnvConfig(t) +// ethChart := ethereum.New(nil) +// e := environment.New(testEnvConfig). +// AddHelm(mockservercfg.New(nil)). +// AddHelm(mockserver.New(nil)). +// AddHelm(ethChart). +// AddHelm(chainlink.New(0, map[string]any{ +// "replicas": 5, +// "toml": presets.BaseToml, +// })) +// err := e.Run() +// t.Cleanup(func() { +// assert.NoError(t, e.Shutdown()) +// }) +// require.NoError(t, err) +// if e.WillUseRemoteRunner() { +// return +// } + +// // setup of variables to use for verification in a t.Run +// ethNetworkName := ethChart.GetProps().(*ethereum.Props).NetworkName +// urls := make([]string, 0) +// if e.Cfg.InsideK8s { +// urls = append(urls, e.URLs[chainlink.NodesInternalURLsKey]...) +// urls = append(urls, e.URLs[ethNetworkName+"_internal_http"]...) +// } else { +// urls = append(urls, e.URLs[chainlink.NodesLocalURLsKey]...) +// urls = append(urls, e.URLs[ethNetworkName+"_http"]...) +// } + +// log.Info().Str("Test", "Before").Msg("Before Tests") + +// // This test will verify we can connect a t.Run to the env of the parent test +// t.Run("do one", func(t1 *testing.T) { +// t1.Parallel() +// test1EnvConfig := common.GetTestEnvConfig(t1) +// test1EnvConfig.Namespace = e.Cfg.Namespace +// test1EnvConfig.NoManifestUpdate = true +// e1 := presets.EVMMinimalLocal(test1EnvConfig) +// err := e1.Run() +// require.NoError(t1, err) +// log.Info().Str("Test", "One").Msg("Inside test") +// time.Sleep(1 * time.Second) +// }) + +// // This test will verify the sub t.Run properly uses the variables from the parent test +// t.Run("do two", func(t2 *testing.T) { +// t2.Parallel() +// log.Info().Str("Test", "Two").Msg("Inside test") +// r := resty.New() +// for _, u := range urls { +// log.Info().Str("URL", u).Send() +// res, err := r.R().Get(u) +// require.NoError(t2, err) +// require.Equal(t2, "200 OK", res.Status()) +// } +// }) + +// log.Info().Str("Test", "After").Msg("After Tests") +// } + +// func TestEmptyEnvironmentStartup(t *testing.T) { +// common.TestEmptyEnvironmentStartup(t) +// } + +// func TestRolloutRestartUpdate(t *testing.T) { +// common.TestRolloutRestart(t, true) +// } + +// func TestRolloutRestartBySelector(t *testing.T) { +// common.TestRolloutRestart(t, false) +// } + +// func TestReplaceHelm(t *testing.T) { +// common.TestReplaceHelm(t) +// } + +// func TestRunTimeout(t *testing.T) { +// common.TestRunTimeout(t) +// } + +func TestReallyLongLogs(t *testing.T) { + common.TestReallyLongLogs(t) } diff --git a/k8s/environment/environment.go b/k8s/environment/environment.go index e67f21d08..c62e76bea 100644 --- a/k8s/environment/environment.go +++ b/k8s/environment/environment.go @@ -1017,7 +1017,10 @@ func (m *Environment) WillUseRemoteRunner() bool { } func DefaultJobLogFunction(e *Environment, message string) { - e.Cfg.Test.Log(message) + logChunks := logging.SplitStringIntoChunks(message, 50000) + for _, chunk := range logChunks { + e.Cfg.Test.Log(chunk) + } found := strings.Contains(message, FAILED_FUND_RETURN) if found { e.Cfg.fundReturnFailed = true diff --git a/logging/log.go b/logging/log.go index bb8c53d1c..85fa10fa2 100644 --- a/logging/log.go +++ b/logging/log.go @@ -106,3 +106,32 @@ func GetTestContainersGoTestLogger(t *testing.T) tc.Logging { } return l } + +// SplitStringIntoChunks takes a string and splits it into chunks of a specified size. +func SplitStringIntoChunks(s string, chunkSize int) []string { + // Length of the string. + strLen := len(s) + + // Number of chunks needed. + numChunks := (strLen + chunkSize - 1) / chunkSize + + // Slice to hold the chunks. + chunks := make([]string, numChunks) + + // Loop to create chunks. + for i := 0; i < numChunks; i++ { + // Calculate the start and end indices of the chunk. + start := i * chunkSize + end := start + chunkSize + + // If the end index goes beyond the string length, adjust it to the string length. + if end > strLen { + end = strLen + } + + // Slice the string and add the chunk to the slice. + chunks[i] = s[start:end] + } + + return chunks +} diff --git a/logging/log_test.go b/logging/log_test.go index f35a661f1..296253bee 100644 --- a/logging/log_test.go +++ b/logging/log_test.go @@ -1,6 +1,7 @@ package logging import ( + "strings" "testing" "time" @@ -27,3 +28,25 @@ func TestGetTestContainersGoTestLogger(t *testing.T) { l := GetTestContainersGoTestLogger(t) require.NotNil(t, l.(CustomT).L) } + +// TestSplitStringIntoChunks tests the splitStringIntoChunks function with a string up to a million characters. +func TestSplitStringIntoChunks(t *testing.T) { + // Create a test string with a million and 1 characters ('a'). + testString := strings.Repeat("a", 1000001) + chunkSize := 100000 + expectedNumChunks := 11 + + chunks := SplitStringIntoChunks(testString, chunkSize) + + require.Equal(t, expectedNumChunks, len(chunks), "Wrong number of chunks") + + // Check the size of each chunk. + for i, chunk := range chunks { + require.False(t, i < expectedNumChunks-1 && len(chunk) != chunkSize, "Chunk %d is not of expected size %d", i+1, chunkSize) + + // Check the last chunk size. + if i == expectedNumChunks-1 { + require.Equal(t, 1, len(chunk), "Last chunk is not of expected size") + } + } +}