Skip to content

Commit

Permalink
use duration instead of int
Browse files Browse the repository at this point in the history
  • Loading branch information
tedim52 committed Oct 17, 2024
1 parent d0b9481 commit f8806c2
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
)

const (
oneWeek = 7 * 24 * time.Hour

removeLogsWaitHours = 6 * time.Hour

createLogsWaitMinutes = 1 * time.Minute
Expand All @@ -38,19 +36,19 @@ type LogFileManager struct {

time logs_clock.LogsClock

logRetentionPeriodInWeeks int
logRetentionPeriod time.Duration

baseFilePath string
}

func NewLogFileManager(kurtosisBackend backend_interface.KurtosisBackend, filesystem volume_filesystem.VolumeFilesystem, fileLayout file_layout.LogFileLayout, time logs_clock.LogsClock, logRetentionPeriodInWeeks int, baseFilePath string) *LogFileManager {
func NewLogFileManager(kurtosisBackend backend_interface.KurtosisBackend, filesystem volume_filesystem.VolumeFilesystem, fileLayout file_layout.LogFileLayout, time logs_clock.LogsClock, logRetentionPeriod time.Duration, baseFilePath string) *LogFileManager {
return &LogFileManager{
kurtosisBackend: kurtosisBackend,
filesystem: filesystem,
fileLayout: fileLayout,
time: time,
logRetentionPeriodInWeeks: logRetentionPeriodInWeeks,
baseFilePath: baseFilePath,
kurtosisBackend: kurtosisBackend,
filesystem: filesystem,
fileLayout: fileLayout,
time: time,
logRetentionPeriod: logRetentionPeriod,
baseFilePath: baseFilePath,
}
}

Expand Down Expand Up @@ -146,22 +144,21 @@ func (manager *LogFileManager) RemoveLogsBeyondRetentionPeriod(ctx context.Conte
serviceNameStr := string(serviceRegistration.GetName())
serviceShortUuidStr := uuid_generator.ShortenedUUIDString(serviceUuidStr)

retentionPeriod := time.Duration(manager.logRetentionPeriodInWeeks) * oneWeek
oldServiceLogFilesByUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, retentionPeriod, 1, string(enclaveUuid), serviceUuidStr)
oldServiceLogFilesByUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, manager.logRetentionPeriod, 1, string(enclaveUuid), serviceUuidStr)
if err != nil {
logrus.Errorf("An error occurred getting log file paths for service '%v' in enclave '%v' logs beyond retention: %v", serviceUuidStr, enclaveUuid, err)
} else {
pathsToRemove = append(pathsToRemove, oldServiceLogFilesByUuid...)
}

oldServiceLogFilesByName, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, retentionPeriod, 1, string(enclaveUuid), serviceNameStr)
oldServiceLogFilesByName, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, manager.logRetentionPeriod, 1, string(enclaveUuid), serviceNameStr)
if err != nil {
logrus.Errorf("An error occurred getting log file paths for service '%v' in enclave '%v' logs beyond retention: %v", serviceNameStr, enclaveUuid, err)
} else {
pathsToRemove = append(pathsToRemove, oldServiceLogFilesByName...)
}

oldServiceLogFilesByShortUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, retentionPeriod, 1, string(enclaveUuid), serviceShortUuidStr)
oldServiceLogFilesByShortUuid, err := manager.fileLayout.GetLogFilePaths(manager.filesystem, manager.logRetentionPeriod, 1, string(enclaveUuid), serviceShortUuidStr)
if err != nil {
logrus.Errorf("An error occurred getting log file paths for service '%v' in enclave '%v' logs beyond retention: %v", serviceShortUuidStr, enclaveUuid, err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net"
"os"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestRemoveEnclaveLogs(t *testing.T) {
_, _ = mockFs.Create(week52filepath)
_, _ = mockFs.Create(week52filepathDiffService)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, 5, volume_consts.LogsStorageDirpath)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, convertWeeksToDuration(5), volume_consts.LogsStorageDirpath)
err := logFileManager.RemoveEnclaveLogs(testEnclaveUuid) // should remove only all log files for enclave one

require.NoError(t, err)
Expand Down Expand Up @@ -114,7 +115,7 @@ func TestRemoveAllLogs(t *testing.T) {
_, _ = mockFs.Create(week52filepath)
_, _ = mockFs.Create(week52filepathDiffService)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, 5, volume_consts.LogsStorageDirpath)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, convertWeeksToDuration(5), volume_consts.LogsStorageDirpath)
err := logFileManager.RemoveAllLogs()

require.NoError(t, err)
Expand Down Expand Up @@ -149,7 +150,7 @@ func TestCreateLogFiles(t *testing.T) {
expectedServiceNameFilePath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(2022, 52, 0).Now(), testEnclaveUuid, testUserService1Name)
expectedServiceShortUuidFilePath := fileLayout.GetLogFilePath(logs_clock.NewMockLogsClockPerDay(2022, 52, 0).Now(), testEnclaveUuid, uuid_generator.ShortenedUUIDString(testUserService1Uuid))

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, 5, volume_consts.LogsStorageDirpath)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, fileLayout, mockTime, convertWeeksToDuration(5), volume_consts.LogsStorageDirpath)
err := logFileManager.CreateLogFiles(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -191,3 +192,8 @@ func getMockedKurtosisBackendWithEnclavesAndServices(ctx context.Context, t *tes
Return(servicesMap, nil)
return mockKurtosisBackend
}

func convertWeeksToDuration(retentionPeriodInWeeks int) time.Duration {
const hoursInWeek = 7 * 24 // 7 days * 24 hours
return time.Duration(retentionPeriodInWeeks*hoursInWeek) * time.Hour
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) {

underlyingFs := createFilledPerWeekFilesystem(startingWeek)
mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) {

underlyingFs := createEmptyPerWeekFilesystem(startingWeek)
mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -407,7 +407,7 @@ func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, 4, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, 4, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClockPerDay(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
perWeekStreamStrategy := stream_logs_strategy.NewStreamLogsStrategyImpl(mockTime, retentionPeriodInWeeksForTesting, file_layout.NewPerWeekFileLayout(mockTime))

expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,26 @@ const (
oneWeek = 7 * 24 * time.Hour
)

// PerWeekStreamLogsStrategy pulls logs from filesystem where there is a log file per year, per week, per enclave, per service
// StreamLogsStrategyImpl pulls logs from filesystem where there is a log file per year, per week, per enclave, per service
// Weeks are denoted 01-52
// e.g.
// [.../28/d3e8832d671f/61830789f03a.json] is the file containing logs from service with uuid 61830789f03a, in enclave with uuid d3e8832d671f,
// in the 28th week of the current year
type PerWeekStreamLogsStrategy struct {
time logs_clock.LogsClock
logRetentionPeriodInWeeks int
type StreamLogsStrategyImpl struct {
time logs_clock.LogsClock
logRetentionPeriod time.Duration
fileLayout file_layout.LogFileLayout
}

func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodInWeeks int) *PerWeekStreamLogsStrategy {
return &PerWeekStreamLogsStrategy{
time: time,
logRetentionPeriodInWeeks: logRetentionPeriodInWeeks,
func NewStreamLogsStrategyImpl(time logs_clock.LogsClock, logRetentionPeriod time.Duration, fileLayout file_layout.LogFileLayout) *StreamLogsStrategyImpl {
return &StreamLogsStrategyImpl{
time: time,
logRetentionPeriod: logRetentionPeriod,
fileLayout: fileLayout,
}
}

func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
func (strategy *StreamLogsStrategyImpl) StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logLineSender *logline.LogLineSender,
Expand All @@ -54,7 +56,7 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
shouldReturnAllLogs bool,
numLogLines uint32,
) {
paths, err := strategy.getLogFilePaths(fs, strategy.logRetentionPeriodInWeeks, string(enclaveUuid), string(serviceUuid))
paths, err := strategy.fileLayout.GetLogFilePaths(fs, strategy.logRetentionPeriod, -1, string(enclaveUuid), string(serviceUuid))
if err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred retrieving log file paths for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
return
Expand All @@ -67,12 +69,6 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
serviceUuid, enclaveUuid)
return
}
if len(paths) > strategy.logRetentionPeriodInWeeks {
logrus.Warnf(
`We expected to retrieve logs going back '%v' weeks, but instead retrieved logs going back '%v' weeks.
This means logs past the retention period are being returned, likely a bug in Kurtosis.`,
strategy.logRetentionPeriodInWeeks, len(paths))
}

logsReader, files, err := getLogsReader(fs, paths)
if err != nil {
Expand Down Expand Up @@ -109,19 +105,6 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
}
}

// [getLogFilePaths] returns a list of log file paths containing logs for [serviceUuid] in [enclaveUuid]
// going [retentionPeriodInWeeks] back from the [currentWeek].
// Notes:
// - File paths are of the format '/week/{{ enclave uuid }}/{{ service uuid }}.json' where 'week' is %V strftime specifier
// - The list of file paths is returned in order of oldest logs to most recent logs e.g. [ 03/80124/1234.json, /04/801234/1234.json, ...]
// - If a file path does not exist, the function exits and returns whatever file paths were found
func (strategy *PerWeekStreamLogsStrategy) getLogFilePaths(filesystem volume_filesystem.VolumeFilesystem, retentionPeriodInWeeks int, enclaveUuid, serviceUuid string) ([]string, error) {
// TODO: embed FileLayout into StreamLogsStrategy interface
perWeekFileLayout := file_layout.NewPerWeekFileLayout(strategy.time)
retentionPeriod := time.Duration(retentionPeriodInWeeks) * oneWeek
return perWeekFileLayout.GetLogFilePaths(filesystem, retentionPeriod, -1, enclaveUuid, serviceUuid)
}

// Returns a Reader over all logs in [logFilePaths] and the open file descriptors of the associated [logFilePaths]
func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths []string) (*bufio.Reader, []volume_filesystem.VolumeFile, error) {
var fileReaders []io.Reader
Expand All @@ -143,7 +126,7 @@ func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths [
return bufio.NewReader(combinedLogsReader), files, nil
}

func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
func (strategy *StreamLogsStrategyImpl) streamAllLogs(
ctx context.Context,
logsReader *bufio.Reader,
logLineSender *logline.LogLineSender,
Expand Down Expand Up @@ -181,7 +164,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
}

// tail -n X
func (strategy *PerWeekStreamLogsStrategy) streamTailLogs(
func (strategy *StreamLogsStrategyImpl) streamTailLogs(
ctx context.Context,
logsReader *bufio.Reader,
numLogLines uint32,
Expand Down Expand Up @@ -266,7 +249,7 @@ func isValidJsonEnding(line string) bool {
return endOfLine == volume_consts.EndOfJsonLine
}

func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error {
func (strategy *StreamLogsStrategyImpl) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error {
// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}
Expand Down Expand Up @@ -307,14 +290,14 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conj
}

// Returns true if [logLine] has no timestamp
func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek)
func (strategy *StreamLogsStrategyImpl) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
oldestTimeWithinRetentionPeriod := strategy.time.Now().Add(-strategy.logRetentionPeriod)
timestamp := logLine.GetTimestamp()
return timestamp.After(retentionPeriod), nil
return timestamp.After(oldestTimeWithinRetentionPeriod), nil
}

// Continue streaming log lines as they are written to log file (tail -f [filepath])
func (strategy *PerWeekStreamLogsStrategy) followLogs(
func (strategy *StreamLogsStrategyImpl) followLogs(
ctx context.Context,
filepath string,
logLineSender *logline.LogLineSender,
Expand Down
Loading

0 comments on commit f8806c2

Please sign in to comment.