diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager.go index 68b5d568b1..3a17098fea 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager.go @@ -19,8 +19,6 @@ import ( ) const ( - oneWeek = 7 * 24 * time.Hour - removeLogsWaitHours = 6 * time.Hour createLogsWaitMinutes = 1 * time.Minute @@ -38,19 +36,19 @@ type LogFileManager struct { time logs_clock.LogsClock - logRetentionPeriodInWeeks int + logRetentionPeriod time.Duration baseFilePath string } -func NewLogFileManager(kurtosisBackend backend_interface.KurtosisBackend, filesystem volume_filesystem.VolumeFilesystem, fileLayout file_layout.LogFileLayout, time logs_clock.LogsClock, logRetentionPeriodInWeeks int, baseFilePath string) *LogFileManager { +func NewLogFileManager(kurtosisBackend backend_interface.KurtosisBackend, filesystem volume_filesystem.VolumeFilesystem, fileLayout file_layout.LogFileLayout, time logs_clock.LogsClock, logRetentionPeriod time.Duration, baseFilePath string) *LogFileManager { return &LogFileManager{ - kurtosisBackend: kurtosisBackend, - filesystem: filesystem, - fileLayout: fileLayout, - time: time, - logRetentionPeriodInWeeks: logRetentionPeriodInWeeks, - baseFilePath: baseFilePath, + kurtosisBackend: kurtosisBackend, + filesystem: filesystem, + fileLayout: fileLayout, + time: time, + logRetentionPeriod: logRetentionPeriod, + baseFilePath: baseFilePath, } } @@ -146,22 +144,21 @@ func (manager *LogFileManager) RemoveLogsBeyondRetentionPeriod(ctx context.Conte serviceNameStr := string(serviceRegistration.GetName()) serviceShortUuidStr := uuid_generator.ShortenedUUIDString(serviceUuidStr) - retentionPeriod := time.Duration(manager.logRetentionPeriodInWeeks) * oneWeek - oldServiceLogFilesByUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, retentionPeriod, 1, string(enclaveUuid), serviceUuidStr) + oldServiceLogFilesByUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, manager.logRetentionPeriod, 1, string(enclaveUuid), serviceUuidStr) if err != nil { logrus.Errorf("An error occurred getting log file paths for service '%v' in enclave '%v' logs beyond retention: %v", serviceUuidStr, enclaveUuid, err) } else { pathsToRemove = append(pathsToRemove, oldServiceLogFilesByUuid...) } - oldServiceLogFilesByName, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, retentionPeriod, 1, string(enclaveUuid), serviceNameStr) + oldServiceLogFilesByName, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, manager.logRetentionPeriod, 1, string(enclaveUuid), serviceNameStr) if err != nil { logrus.Errorf("An error occurred getting log file paths for service '%v' in enclave '%v' logs beyond retention: %v", serviceNameStr, enclaveUuid, err) } else { pathsToRemove = append(pathsToRemove, oldServiceLogFilesByName...) } - oldServiceLogFilesByShortUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, retentionPeriod, 1, string(enclaveUuid), serviceShortUuidStr) + oldServiceLogFilesByShortUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, manager.logRetentionPeriod, 1, string(enclaveUuid), serviceShortUuidStr) if err != nil { logrus.Errorf("An error occurred getting log file paths for service '%v' in enclave '%v' logs beyond retention: %v", serviceShortUuidStr, enclaveUuid, err) } else { diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager_test.go index c7da874544..dde02fdd78 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager/log_file_manager_test.go @@ -16,6 +16,7 @@ import ( "net" "os" "testing" + "time" ) const ( @@ -75,7 +76,7 @@ func TestRemoveEnclaveLogs(t *testing.T) { _, _ = mockFs.Create(week52filepath) _, _ = mockFs.Create(week52filepathDiffService) - logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, 5, volume_consts.LogsStorageDirpath) + logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, convertWeeksToDuration(5), volume_consts.LogsStorageDirpath) err := logFileManager.RemoveEnclaveLogs(testEnclaveUuid) // should remove only all log files for enclave one require.NoError(t, err) @@ -114,7 +115,7 @@ func TestRemoveAllLogs(t *testing.T) { _, _ = mockFs.Create(week52filepath) _, _ = mockFs.Create(week52filepathDiffService) - logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, 5, volume_consts.LogsStorageDirpath) + logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, convertWeeksToDuration(5), volume_consts.LogsStorageDirpath) err := logFileManager.RemoveAllLogs() require.NoError(t, err) @@ -149,7 +150,7 @@ func TestCreateLogFiles(t *testing.T) { expectedServiceNameFilePath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(2022, 52, 0).Now(), testEnclaveUuid, testUserService1Name) expectedServiceShortUuidFilePath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(2022, 52, 0).Now(), testEnclaveUuid, uuid_generator.ShortenedUUIDString(testUserService1Uuid)) - logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, 5, volume_consts.LogsStorageDirpath) + logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, convertWeeksToDuration(5), volume_consts.LogsStorageDirpath) err := logFileManager.CreateLogFiles(ctx) require.NoError(t, err) @@ -191,3 +192,8 @@ func getMockedKurtosisBackendWithEnclavesAndServices(ctx context.Context, t *tes Return(servicesMap, nil) return mockKurtosisBackend } + +func convertWeeksToDuration(retentionPeriodInWeeks int) time.Duration { + const hoursInWeek = 7 * 24 // 7 days * 24 hours + return time.Duration(retentionPeriodInWeeks*hoursInWeek) * time.Hour +} diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go index 36ee7c5e19..8597d18dc9 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go @@ -132,7 +132,7 @@ func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) { underlyingFs := createFilledPerWeekFilesystem(startingWeek) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( t, @@ -215,7 +215,7 @@ func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) { underlyingFs := createEmptyPerWeekFilesystem(startingWeek) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( t, @@ -320,7 +320,7 @@ func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t * require.NoError(t, err) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( t, @@ -407,7 +407,7 @@ func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) { require.NoError(t, err) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( t, @@ -474,7 +474,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) { require.NoError(t, err) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, 4, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( t, @@ -543,7 +543,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) { require.NoError(t, err) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, 4, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines( t, @@ -594,7 +594,7 @@ func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) { require.NoError(t, err) mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay) - perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr) require.NoError(t, err) diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index 7ee7192431..f681fd394e 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -25,24 +25,26 @@ const ( oneWeek = 7 * 24 * time.Hour ) -// PerWeekStreamLogsStrategy pulls logs from filesystem where there is a log file per year, per week, per enclave, per service +// StreamLogsStrategyImpl pulls logs from filesystem where there is a log file per year, per week, per enclave, per service // Weeks are denoted 01-52 // e.g. // [.../28/d3e8832d671f/61830789f03a.json] is the file containing logs from service with uuid 61830789f03a, in enclave with uuid d3e8832d671f, // in the 28th week of the current year -type PerWeekStreamLogsStrategy struct { - time logs_clock.LogsClock - logRetentionPeriodInWeeks int +type StreamLogsStrategyImpl struct { + time logs_clock.LogsClock + logRetentionPeriod time.Duration + fileLayout file_layout.LogFileLayout } -func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodInWeeks int) *PerWeekStreamLogsStrategy { - return &PerWeekStreamLogsStrategy{ - time: time, - logRetentionPeriodInWeeks: logRetentionPeriodInWeeks, +func NewStreamLogsStrategyImpl(time logs_clock.LogsClock, logRetentionPeriod time.Duration, fileLayout file_layout.LogFileLayout) *StreamLogsStrategyImpl { + return &StreamLogsStrategyImpl{ + time: time, + logRetentionPeriod: logRetentionPeriod, + fileLayout: fileLayout, } } -func (strategy *PerWeekStreamLogsStrategy) StreamLogs( +func (strategy *StreamLogsStrategyImpl) StreamLogs( ctx context.Context, fs volume_filesystem.VolumeFilesystem, logLineSender *logline.LogLineSender, @@ -54,7 +56,7 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( shouldReturnAllLogs bool, numLogLines uint32, ) { - paths, err := strategy.getLogFilePaths(fs, strategy.logRetentionPeriodInWeeks, string(enclaveUuid), string(serviceUuid)) + paths, err := strategy.fileLayout.GetLogFilePaths(fs, strategy.logRetentionPeriod, -1, string(enclaveUuid), string(serviceUuid)) if err != nil { streamErrChan <- stacktrace.Propagate(err, "An error occurred retrieving log file paths for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid) return @@ -67,12 +69,6 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( serviceUuid, enclaveUuid) return } - if len(paths) > strategy.logRetentionPeriodInWeeks { - logrus.Warnf( - `We expected to retrieve logs going back '%v' weeks, but instead retrieved logs going back '%v' weeks. - This means logs past the retention period are being returned, likely a bug in Kurtosis.`, - strategy.logRetentionPeriodInWeeks, len(paths)) - } logsReader, files, err := getLogsReader(fs, paths) if err != nil { @@ -109,19 +105,6 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( } } -// [getLogFilePaths] returns a list of log file paths containing logs for [serviceUuid] in [enclaveUuid] -// going [retentionPeriodInWeeks] back from the [currentWeek]. -// Notes: -// - File paths are of the format '/week/{{ enclave uuid }}/{{ service uuid }}.json' where 'week' is %V strftime specifier -// - The list of file paths is returned in order of oldest logs to most recent logs e.g. [ 03/80124/1234.json, /04/801234/1234.json, ...] -// - If a file path does not exist, the function exits and returns whatever file paths were found -func (strategy *PerWeekStreamLogsStrategy) getLogFilePaths(filesystem volume_filesystem.VolumeFilesystem, retentionPeriodInWeeks int, enclaveUuid, serviceUuid string) ([]string, error) { - // TODO: embed FileLayout into StreamLogsStrategy interface - perWeekFileLayout := file_layout.NewPerWeekFileLayout(strategy.time) - retentionPeriod := time.Duration(retentionPeriodInWeeks) * oneWeek - return perWeekFileLayout.GetLogFilePaths(filesystem, retentionPeriod, -1, enclaveUuid, serviceUuid) -} - // Returns a Reader over all logs in [logFilePaths] and the open file descriptors of the associated [logFilePaths] func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths []string) (*bufio.Reader, []volume_filesystem.VolumeFile, error) { var fileReaders []io.Reader @@ -143,7 +126,7 @@ func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths [ return bufio.NewReader(combinedLogsReader), files, nil } -func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( +func (strategy *StreamLogsStrategyImpl) streamAllLogs( ctx context.Context, logsReader *bufio.Reader, logLineSender *logline.LogLineSender, @@ -181,7 +164,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs( } // tail -n X -func (strategy *PerWeekStreamLogsStrategy) streamTailLogs( +func (strategy *StreamLogsStrategyImpl) streamTailLogs( ctx context.Context, logsReader *bufio.Reader, numLogLines uint32, @@ -266,7 +249,7 @@ func isValidJsonEnding(line string) bool { return endOfLine == volume_consts.EndOfJsonLine } -func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error { +func (strategy *StreamLogsStrategyImpl) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error { // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} // eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} @@ -307,14 +290,14 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conj } // Returns true if [logLine] has no timestamp -func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) { - retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek) +func (strategy *StreamLogsStrategyImpl) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) { + oldestTimeWithinRetentionPeriod := strategy.time.Now().Add(-strategy.logRetentionPeriod) timestamp := logLine.GetTimestamp() - return timestamp.After(retentionPeriod), nil + return timestamp.After(oldestTimeWithinRetentionPeriod), nil } // Continue streaming log lines as they are written to log file (tail -f [filepath]) -func (strategy *PerWeekStreamLogsStrategy) followLogs( +func (strategy *StreamLogsStrategyImpl) followLogs( ctx context.Context, filepath string, logLineSender *logline.LogLineSender, diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy_test.go index 6d52dea218..ed8cc5e642 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy_test.go @@ -57,7 +57,7 @@ func TestGetLogFilePaths(t *testing.T) { } mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, currentWeek, defaultDay) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -94,7 +94,7 @@ func TestGetLogFilePathsAcrossNewYear(t *testing.T) { } mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, currentWeek, defaultDay) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -131,7 +131,7 @@ func TestGetLogFilePathsAcrossNewYearWith53Weeks(t *testing.T) { } mockTime := logs_clock.NewMockLogsClockPerDay(2016, currentWeek, 1) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -163,7 +163,7 @@ func TestGetLogFilePathsWithDiffRetentionPeriod(t *testing.T) { } mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, currentWeek, defaultDay) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriod, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -195,7 +195,7 @@ func TestGetLogFilePathsReturnsAllAvailableWeeks(t *testing.T) { currentWeek := 2 mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, currentWeek, defaultDay) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -220,7 +220,7 @@ func TestGetLogFilePathsReturnsCorrectPathsIfWeeksMissingInBetween(t *testing.T) currentWeek := 3 mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, currentWeek, defaultDay) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -249,7 +249,7 @@ func TestGetLogFilePathsReturnsCorrectPathsIfCurrentWeekHasNoLogsYet(t *testing. week2filepath, } - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid) require.NoError(t, err) @@ -267,7 +267,7 @@ func TestIsWithinRetentionPeriod(t *testing.T) { // week 41 would put the log line outside the retention period mockTime := logs_clock.NewMockLogsClockPerDay(2023, 41, 0) - strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting) + strategy := NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime)) timestamp, err := parseTimestampFromJsonLogLine(jsonLogLine) require.NoError(t, err) diff --git a/engine/server/engine/main.go b/engine/server/engine/main.go index 058249ee1f..fe07ba0da1 100644 --- a/engine/server/engine/main.go +++ b/engine/server/engine/main.go @@ -9,8 +9,8 @@ import ( "context" "fmt" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout" + "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_consts" "io/fs" - "math" "net" "net/http" "os" @@ -410,17 +410,11 @@ func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosi case args.KurtosisBackendType_Docker: realTime := logs_clock.NewRealClock() - logRetentionPeriodInWeeks := int(math.Round(logRetentionPeriod.Hours() / float64(7*24*time.Hour))) - if logRetentionPeriodInWeeks < 1 { - logRetentionPeriodInWeeks = 1 - } - logrus.Infof("Setting log retention period to '%v' week(s).", logRetentionPeriodInWeeks) osFs := volume_filesystem.NewOsVolumeFilesystem() - perHourFileLayout := file_layout.NewPerHourFileLayout(logs_clock.NewRealClock()) - logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, perHourFileLayout, realTime, logRetentionPeriodInWeeks) - perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime, logRetentionPeriodInWeeks) - - logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, logFileManager, perWeekStreamLogsStrategy) + perHourFileLayout := file_layout.NewPerHourFileLayout(realTime) + logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, perHourFileLayout, realTime, logRetentionPeriod, volume_consts.LogsStorageDirpath) + streamLogsStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(realTime, logRetentionPeriod, perHourFileLayout) + logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, logFileManager, streamLogsStrategy) case args.KurtosisBackendType_Kubernetes: logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend) }