Skip to content

Commit

Permalink
fix: support logs db for k8s (#1864)
Browse files Browse the repository at this point in the history
## Description:
<!-- Describe this change, how it works, and the motivation behind it.
-->

## Is this change user facing?
YES/NO
<!-- If yes, please add the "user facing" label to the PR -->
<!-- If yes, don't forget to include docs changes where relevant -->

## References (if applicable):
<!-- Add relevant Github Issues, Discord threads, or other helpful
information. -->
  • Loading branch information
tedim52 authored Nov 29, 2023
1 parent e83f266 commit 8afa9c7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 68 deletions.
34 changes: 21 additions & 13 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
em_api "github.com/kurtosis-tech/kurtosis/enclave-manager/server"
"github.com/kurtosis-tech/kurtosis/engine/launcher/args"
"github.com/kurtosis-tech/kurtosis/engine/launcher/args/kurtosis_backend_config"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/kurtosis_backend"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock"
Expand Down Expand Up @@ -140,19 +142,11 @@ func runMain() error {
return stacktrace.Propagate(err, "An error occurred getting the Kurtosis backend for backend type '%v' and config '%+v'", serverArgs.KurtosisBackendType, backendConfig)
}

logsDatabaseClient := getLogsDatabaseClient(serverArgs.KurtosisBackendType, kurtosisBackend)

// TODO: Move log file management into LogsDatabaseClient
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()

// TODO: remove once users are fully migrated to log retention/new log schema
// pulls logs per enclave/per service id
perFileStreamStrategy := stream_logs_strategy.NewPerFileStreamLogsStrategy()
perFileLogsDatabaseClient := persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perFileStreamStrategy)

// pulls logs /per week/per enclave/per service
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime)
perWeekLogsDatabaseClient := persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamStrategy)

// TODO: Move logFileManager into LogsDatabaseClient
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime)
logFileManager.StartLogFileManagement(ctx)

Expand Down Expand Up @@ -232,8 +226,7 @@ func runMain() error {
enclaveManager,
serverArgs.MetricsUserID,
serverArgs.DidUserAcceptSendingMetrics,
perWeekLogsDatabaseClient,
perFileLogsDatabaseClient,
logsDatabaseClient,
logFileManager,
metricsClient)
apiPath, handler := kurtosis_engine_rpc_api_bindingsconnect.NewEngineServiceHandler(engineConnectServer)
Expand Down Expand Up @@ -329,6 +322,21 @@ func getKurtosisBackend(ctx context.Context, kurtosisBackendType args.KurtosisBa
return kurtosisBackend, nil
}

// if cluster is docker, return logs client for centralized logging, otherwise use logs db of kurtosis backend which uses k8s logs under the hood
func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosisBackend backend_interface.KurtosisBackend) centralized_logs.LogsDatabaseClient {
var logsDatabaseClient centralized_logs.LogsDatabaseClient
switch kurtosisBackendType {
case args.KurtosisBackendType_Docker:
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()
perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime)
logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamLogsStrategy)
case args.KurtosisBackendType_Kubernetes:
logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend)
}
return logsDatabaseClient
}

func formatFilenameFunctionForLogs(filename string, functionName string) string {
var output strings.Builder
output.WriteString("[")
Expand Down
62 changes: 7 additions & 55 deletions engine/server/engine/server/engine_connect_server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@ import (
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
)

const (
subnetworkDisableBecauseItIsDeprecated = false
)

var (
logRetentionFeatureReleaseTime = time.Date(2023, 9, 7, 13, 0, 0, 0, time.UTC)
)

type EngineConnectServerService struct {
// The version tag of the engine server image, so it can report its own version
imageVersionTag string
Expand All @@ -38,14 +33,8 @@ type EngineConnectServerService struct {
// User consent to send metrics
didUserAcceptSendingMetrics bool

// The clients for consuming container logs from the logs' database server

// per week pulls logs from enclaves created post log retention feature
perWeekLogsDatabaseClient centralized_logs.LogsDatabaseClient

// per file pulls logs from enclaves created pre log retention feature
// TODO: remove once users are fully migrated to log retention/new log schema
perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient
// The client for consuming container logs from the logs database
logsDatabaseClient centralized_logs.LogsDatabaseClient

logFileManager *log_file_manager.LogFileManager

Expand All @@ -57,8 +46,7 @@ func NewEngineConnectServerService(
enclaveManager *enclave_manager.EnclaveManager,
metricsUserId string,
didUserAcceptSendingMetrics bool,
perWeekLogsDatabaseClient centralized_logs.LogsDatabaseClient,
perFileLogsDatabaseClient centralized_logs.LogsDatabaseClient,
logsDatabaseClient centralized_logs.LogsDatabaseClient,
logFileManager *log_file_manager.LogFileManager,
metricsClient metrics_client.MetricsClient,
) *EngineConnectServerService {
Expand All @@ -67,8 +55,7 @@ func NewEngineConnectServerService(
enclaveManager: enclaveManager,
metricsUserID: metricsUserId,
didUserAcceptSendingMetrics: didUserAcceptSendingMetrics,
perWeekLogsDatabaseClient: perWeekLogsDatabaseClient,
perFileLogsDatabaseClient: perFileLogsDatabaseClient,
logsDatabaseClient: logsDatabaseClient,
logFileManager: logFileManager,
metricsClient: metricsClient,
}
Expand Down Expand Up @@ -205,7 +192,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
requestedServiceUuids[serviceUuid] = true
}

if service.perWeekLogsDatabaseClient == nil || service.perFileLogsDatabaseClient == nil {
if service.logsDatabaseClient == nil {
return stacktrace.NewError("It's not possible to return service logs because there is no logs database client; this is bug in Kurtosis")
}

Expand All @@ -225,14 +212,7 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c
return stacktrace.Propagate(err, "An error occurred creating the conjunctive log line filters from the GRPC's conjunctive log line filters '%+v'", args.GetConjunctiveFilters())
}

// get enclave creation time to determine strategy to pull logs
enclaveCreationTime, err := service.getEnclaveCreationTime(ctx, enclaveUuid)
if err != nil {
return stacktrace.Propagate(err, "An error occurred while trying to get the enclave creation time to determine how to pull logs.")
}
logsDatabaseClient := service.getLogsDatabaseClient(enclaveCreationTime)

serviceLogsByServiceUuidChan, errChan, cancelCtxFunc, err = logsDatabaseClient.StreamUserServiceLogs(
serviceLogsByServiceUuidChan, errChan, cancelCtxFunc, err = service.logsDatabaseClient.StreamUserServiceLogs(
contextWithCancel,
enclaveUuid,
requestedServiceUuids,
Expand Down Expand Up @@ -300,8 +280,7 @@ func (service *EngineConnectServerService) reportAnyMissingUuidsAndGetNotFoundUu
requestedServiceUuids map[user_service.ServiceUUID]bool,
stream *connect.ServerStream[kurtosis_engine_rpc_api_bindings.GetServiceLogsResponse],
) (map[string]bool, error) {
// doesn't matter which logs client is used here
existingServiceUuids, err := service.perWeekLogsDatabaseClient.FilterExistingServiceUuids(ctx, enclaveUuid, requestedServiceUuids)
existingServiceUuids, err := service.logsDatabaseClient.FilterExistingServiceUuids(ctx, enclaveUuid, requestedServiceUuids)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred retrieving the exhaustive list of service UUIDs from the log client for enclave '%v' and for the requested UUIDs '%+v'", enclaveUuid, requestedServiceUuids)
}
Expand Down Expand Up @@ -422,30 +401,3 @@ func newConjunctiveLogLineFiltersFromGRPCLogLineFilters(

return conjunctiveLogLineFilters, nil
}

// If the enclave was created prior to log retention, return the per file logs client
func (service *EngineConnectServerService) getLogsDatabaseClient(enclaveCreationTime time.Time) centralized_logs.LogsDatabaseClient {
if enclaveCreationTime.After(logRetentionFeatureReleaseTime) {
return service.perWeekLogsDatabaseClient
} else {
return service.perFileLogsDatabaseClient
}
}

func (service *EngineConnectServerService) getEnclaveCreationTime(ctx context.Context, enclaveUuid enclave.EnclaveUUID) (time.Time, error) {
enclaves, err := service.enclaveManager.GetEnclaves(ctx)
if err != nil {
return time.Time{}, err
}

enclaveObj, found := enclaves[string(enclaveUuid)]
if !found {
return time.Time{}, stacktrace.NewError("Engine could not find enclave '%v'", enclaveUuid)
}

timestamp := enclaveObj.GetCreationTime()
if timestamp == nil {
return time.Time{}, stacktrace.NewError("An error occurred getting the creation time for enclave '%v'. This is a bug in Kurtosis", enclaveUuid)
}
return timestamp.AsTime(), nil
}

0 comments on commit 8afa9c7

Please sign in to comment.