Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove closeChannelWhenEmpty busy loop #2594

Merged
merged 13 commits into from
Nov 10, 2024
6 changes: 2 additions & 4 deletions cli/cli/commands/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/kurtosis-tech/kurtosis/cli/cli/command_framework/lowlevel/flags"
"github.com/kurtosis-tech/kurtosis/cli/cli/command_str_consts"
"github.com/kurtosis-tech/kurtosis/cli/cli/helpers/multi_os_command_executor"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/user_support_constants"
"github.com/kurtosis-tech/kurtosis/contexts-config-store/store"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
)

var WebCmd = &lowlevel.LowlevelKurtosisCommand{
Expand All @@ -35,9 +35,7 @@ func run(_ context.Context, _ *flags.ParsedFlags, _ *args.ParsedArgs) error {
return stacktrace.Propagate(err, "tried fetching the current Kurtosis context but failed, we can't switch clusters without this information. This is a bug in Kurtosis")
}
if store.IsRemote(currentKurtosisContext) {
if err := multi_os_command_executor.OpenFile(user_support_constants.KurtosisCloudLink); err != nil {
return stacktrace.Propagate(err, "An error occurred while opening the Kurtosis Cloud Web UI")
}
logrus.Warn("Kurtosis Cloud has been deprecated. Switch to a local kurtosis context to use the local Kurtosis web UI instead.")
}

if err := multi_os_command_executor.OpenFile(webUILink); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const (
FeedbackEmail = "feedback@" + OldDomain
FeedbackEmailLink = "mailto:" + FeedbackEmail
KurtosisTechTwitterProfileLink = "https://twitter.com/KurtosisTech"
KurtosisCloudLink = "https://cloud." + Domain

// If you add new URLs above, make sure to add them to the urlsToValidateInTest below!!!
// WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING
Expand All @@ -56,6 +55,5 @@ var urlsToValidateInTest = []string{
KurtosisDiscordUrl,
KurtosisOnBoardCalendlyUrl,
HowImportWorksLink,
KurtosisCloudLink,
KurtosisTechTwitterProfileLink,
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,13 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(

// this go routine handles the stream cancellation
go func() {
//wait for stream go routine to end
// wait for stream go routine to end
wgSenders.Wait()

// send all buffered log lines
// flush should send remainder of logs in the buffer to the channel to be read
logLineSender.Flush()

// wait until the channel has been fully read/empty before closing it
closeChannelWhenEmpty(logsByKurtosisUserServiceUuidChan)
close(logsByKurtosisUserServiceUuidChan)
close(streamErrChan)

//then cancel the context
Expand Down Expand Up @@ -174,12 +173,3 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
shouldReturnAllLogs,
numLogLines)
}

func closeChannelWhenEmpty(logsChan chan map[service.ServiceUUID][]logline.LogLine) {
for {
if len(logsChan) == 0 {
close(logsChan)
return
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,13 @@ func executeStreamCallAndGetReceivedServiceLogLines(
case <-time.Tick(testTimeOut):
return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut)
case streamErr, isChanOpen := <-errChan:
if !isChanOpen {
if !isChanOpen && len(userServiceLogsByUuidChan) == 0 {
shouldReceiveStream = false
break
}
return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.")
if isChanOpen && streamErr != nil {
return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.")
}
case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan:
if !isChanOpen {
shouldReceiveStream = false
Expand Down
29 changes: 16 additions & 13 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@ package main
import (
"context"
"fmt"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout"
"io/fs"
"math"
"net"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"time"

"github.com/kurtosis-tech/kurtosis/api/golang/core/kurtosis_core_rpc_api_bindings"
"github.com/kurtosis-tech/kurtosis/api/golang/engine/kurtosis_engine_rpc_api_bindings/kurtosis_engine_rpc_api_bindingsconnect"
enclaveApi "github.com/kurtosis-tech/kurtosis/api/golang/http_rest/server/core_rest_api"
Expand All @@ -39,6 +27,7 @@ import (
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/kurtosis_backend"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy"
Expand All @@ -57,6 +46,17 @@ import (
echomiddleware "github.com/labstack/echo/v4/middleware"
"github.com/rs/cors"
"github.com/sirupsen/logrus"
"io/fs"
"math"
"net"
"net/http"
_ "net/http/pprof"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"time"
)

const (
Expand All @@ -73,6 +73,7 @@ const (
functionPathSeparator = "."
emptyFunctionName = ""
webappPortAddr = ":9711"
pprofPath = "/debug/pprof/"

remoteBackendConfigFilename = "remote_backend_config.json"
pathToStaticFolder = "/run/webapp"
Expand Down Expand Up @@ -213,8 +214,9 @@ func runMain() error {
}
logrus.Debugf("Created environment js file with content: \n%s", envJsFileContent)

handler := http.NewServeMux()
fileServer := http.FileServer(http.Dir(pathToStaticFolder))
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
path, err := filepath.Abs(r.URL.Path)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -236,6 +238,7 @@ func runMain() error {
w.Header().Add("Cache-Control", "no-store")
fileServer.ServeHTTP(w, r)
})
handler.Handle(pprofPath, http.HandlerFunc(http.DefaultServeMux.ServeHTTP))

err := http.ListenAndServe(webappPortAddr, handler)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions engine/server/engine/server/engine_connect_server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,10 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
logrus.Debug("Exiting the stream because an error from the logs database client was received through the error chan.")
return stacktrace.Propagate(err, "An error occurred streaming user service logs.")
}
logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan")
return nil
if len(serviceLogsByServiceUuidChan) == 0 {
logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan")
return nil
}
}
}
}
Expand Down
Loading