Skip to content

Commit

Permalink
fix: remove closeChannelWhenEmpty busy loop (#2594)
Browse files Browse the repository at this point in the history
## Description
Addresses #2593 by
getting rid of the busy loop entirely. The loop was checking to make
sure all values from the channel were read before closing the channel.
When following logs, if a stream is cancelled before reading all logs in
the channel, it loops infinitely. This changes the consumer to loop
until it's read everything from the channel, even after its been closed.

 Also, adds a port for pprof to help with profiling in the future. 

## Is this change user facing?
YES 

## References 
#2593
  • Loading branch information
tedim52 authored Nov 10, 2024
1 parent 28582cc commit 7f99970
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 35 deletions.
6 changes: 2 additions & 4 deletions cli/cli/commands/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/kurtosis-tech/kurtosis/cli/cli/command_framework/lowlevel/flags"
"github.com/kurtosis-tech/kurtosis/cli/cli/command_str_consts"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/multi_os_command_executor"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/user_support_constants"
"github.com/kurtosis-tech/kurtosis/contexts-config-store/store"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
)

var WebCmd = &lowlevel.LowlevelKurtosisCommand{
Expand All @@ -35,9 +35,7 @@ func run(_ context.Context, _ *flags.ParsedFlags, _ *args.ParsedArgs) error {
return stacktrace.Propagate(err, "tried fetching the current Kurtosis context but failed, we can't switch clusters without this information. This is a bug in Kurtosis")
}
if store.IsRemote(currentKurtosisContext) {
if err := multi_os_command_executor.OpenFile(user_support_constants.KurtosisCloudLink); err != nil {
return stacktrace.Propagate(err, "An error occurred while opening the Kurtosis Cloud Web UI")
}
logrus.Warn("Kurtosis Cloud has been deprecated. Switch to a local kurtosis context to use the local Kurtosis web UI instead.")
}

if err := multi_os_command_executor.OpenFile(webUILink); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
FeedbackEmail = "feedback@" + OldDomain
FeedbackEmailLink = "mailto:" + FeedbackEmail
KurtosisTechTwitterProfileLink = "https://twitter.com/KurtosisTech"
KurtosisCloudLink = "https://cloud." + Domain

// If you add new URLs above, make sure to add them to the urlsToValidateInTest below!!!
// WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING
Expand All @@ -56,6 +55,5 @@ var urlsToValidateInTest = []string{
KurtosisDiscordUrl,
KurtosisOnBoardCalendlyUrl,
HowImportWorksLink,
KurtosisCloudLink,
KurtosisTechTwitterProfileLink,
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,13 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(

// this go routine handles the stream cancellation
go func() {
//wait for stream go routine to end
// wait for stream go routine to end
wgSenders.Wait()

// send all buffered log lines
// flush should send remainder of logs in the buffer to the channel to be read
logLineSender.Flush()

// wait until the channel has been fully read/empty before closing it
closeChannelWhenEmpty(logsByKurtosisUserServiceUuidChan)
close(logsByKurtosisUserServiceUuidChan)
close(streamErrChan)

//then cancel the context
Expand Down Expand Up @@ -174,12 +173,3 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
shouldReturnAllLogs,
numLogLines)
}

func closeChannelWhenEmpty(logsChan chan map[service.ServiceUUID][]logline.LogLine) {
for {
if len(logsChan) == 0 {
close(logsChan)
return
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,13 @@ func executeStreamCallAndGetReceivedServiceLogLines(
case <-time.Tick(testTimeOut):
return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut)
case streamErr, isChanOpen := <-errChan:
if !isChanOpen {
if !isChanOpen && len(userServiceLogsByUuidChan) == 0 {
shouldReceiveStream = false
break
}
return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.")
if isChanOpen && streamErr != nil {
return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.")
}
case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan:
if !isChanOpen {
shouldReceiveStream = false
Expand Down
24 changes: 12 additions & 12 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@ package main
import (
"context"
"fmt"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout"
"io/fs"
"math"
"net"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"time"

"github.com/kurtosis-tech/kurtosis/api/golang/core/kurtosis_core_rpc_api_bindings"
"github.com/kurtosis-tech/kurtosis/api/golang/engine/kurtosis_engine_rpc_api_bindings/kurtosis_engine_rpc_api_bindingsconnect"
enclaveApi "github.com/kurtosis-tech/kurtosis/api/golang/http_rest/server/core_rest_api"
Expand All @@ -39,6 +27,7 @@ import (
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/kurtosis_backend"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy"
Expand All @@ -57,6 +46,17 @@ import (
echomiddleware "github.com/labstack/echo/v4/middleware"
"github.com/rs/cors"
"github.com/sirupsen/logrus"
"io/fs"
"math"
"net"
"net/http"
_ "net/http/pprof"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"time"
)

const (
Expand Down
6 changes: 4 additions & 2 deletions engine/server/engine/server/engine_connect_server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,10 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
logrus.Debug("Exiting the stream because an error from the logs database client was received through the error chan.")
return stacktrace.Propagate(err, "An error occurred streaming user service logs.")
}
logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan")
return nil
if len(serviceLogsByServiceUuidChan) == 0 {
logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan")
return nil
}
}
}
}
Expand Down

0 comments on commit 7f99970

Please sign in to comment.