diff --git a/lib/.changeset/v1.50.16.md b/lib/.changeset/v1.50.16.md new file mode 100644 index 000000000..f06f8c237 --- /dev/null +++ b/lib/.changeset/v1.50.16.md @@ -0,0 +1 @@ +- TT-1856 Fix err consumer has finished but you are still trying to accept logs \ No newline at end of file diff --git a/lib/logging/log.go b/lib/logging/log.go index 2ad3cb670..ad71d663c 100644 --- a/lib/logging/log.go +++ b/lib/logging/log.go @@ -40,7 +40,7 @@ func (ct *CustomT) Write(p []byte) (n int, err error) { } if ct.ended { l := GetTestLogger(nil) - l.Error().Msgf("%s %s: %s", afterTestEndedMsg, ct.Name(), string(p)) + l.Warn().Msgf("%s %s: %s", afterTestEndedMsg, ct.Name(), string(p)) return len(p), nil } ct.T.Log(strings.TrimSuffix(str, "\n")) @@ -53,7 +53,7 @@ func (ct CustomT) Printf(format string, v ...interface{}) { s := "%s: " formatted := fmt.Sprintf("%s %s%s", afterTestEndedMsg, s, format) l := GetTestLogger(nil) - l.Error().Msgf(formatted, ct.Name(), v) + l.Warn().Msgf(formatted, ct.Name(), v) } else { ct.L.Info().Msgf(format, v...) } diff --git a/lib/logstream/logstream.go b/lib/logstream/logstream.go index 8469318eb..c7b971b62 100644 --- a/lib/logstream/logstream.go +++ b/lib/logstream/logstream.go @@ -229,6 +229,14 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing select { case logErr := <-container.GetLogProductionErrorChannel(): if logErr != nil { + // Check if the container is not stopped or terminated + if !container.IsRunning() { + m.log.Info(). + Str("Container name", name). + Msg("Skipping log producer retrying, because the container is not running anymore") + break + } + m.log.Error(). Err(err). Str("Container name", name). @@ -441,31 +449,48 @@ func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) { // Stop stops the consumer and closes listening channel (it won't be accepting any logs from now on) func (g *ContainerLogConsumer) stop() error { - if g.isDone { - return nil - } + var stopErr error + + // Use sync.Once to ensure the stop logic is executed only once + g.stopOnce.Do(func() { + // Send a signal to the listening channel if possible + select { + case g.logListeningDone <- struct{}{}: + default: + // No action needed if the channel is already full + } + + // Close the channel + close(g.logListeningDone) + }) - g.isDone = true - g.logListeningDone <- struct{}{} - defer close(g.logListeningDone) + return stopErr +} - return nil +// IsStopped checks if the consumer has already been stopped +func (g *ContainerLogConsumer) IsStopped() bool { + select { + case <-g.logListeningDone: + // Channel has been closed, meaning stop() was called + return true + default: + // Channel is still open + return false + } } // DisconnectContainer disconnects particular container func (m *LogStream) DisconnectContainer(container LogProducingContainer) error { var err error - if container.IsRunning() { - m.log.Trace().Str("container", container.GetContainerID()).Msg("Disconnecting container") - err = container.StopLogProducer() - } + m.log.Info().Str("Container", container.GetContainerID()).Msg("Disconnecting container") consumerFound := false m.consumerMutex.RLock() for _, consumer := range m.consumers { if consumer.container.GetContainerID() == container.GetContainerID() { consumerFound = true + m.log.Info().Str("Container", consumer.name).Msg("Stopping consumer") if stopErr := consumer.stop(); err != nil { m.log.Error(). Err(stopErr). @@ -555,7 +580,7 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu // set the cursor to the end of the file, when done to resume writing, unless it was closed //revive:disable defer func() { - if !consumer.isDone { + if !consumer.IsStopped() { _, deferErr := consumer.tempFile.Seek(0, 2) attachError(deferErr) } @@ -620,8 +645,10 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu // FlushLogsToTargets flushes all logs for all consumers (containers) to their targets func (m *LogStream) FlushLogsToTargets() error { var preExecuteFn = func(consumer *ContainerLogConsumer) error { - // do not accept any new logs - consumer.isDone = true + // Stop the consumer to ensure it doesn't accept new logs + if err := consumer.stop(); err != nil { + return fmt.Errorf("failed to stop consumer %s: %w", consumer.name, err) + } for _, handler := range m.logTargetHandlers { consumer.ls.log.Debug(). @@ -696,7 +723,7 @@ type ContainerLogConsumer struct { ls *LogStream tempFile *os.File encoder *gob.Encoder - isDone bool + stopOnce sync.Once hasErrored bool logListeningDone chan struct{} container LogProducingContainer @@ -717,7 +744,6 @@ func newContainerLogConsumer(ctx context.Context, lw *LogStream, container LogPr prefix: prefix, logTargets: logTargets, ls: lw, - isDone: false, hasErrored: false, logListeningDone: make(chan struct{}, 1), container: container, @@ -766,8 +792,7 @@ func (g *ContainerLogConsumer) ResetTempFile() error { // MarkAsErrored marks the consumer as errored (which makes it stop accepting logs) func (g *ContainerLogConsumer) MarkAsErrored() { g.hasErrored = true - g.isDone = true - close(g.logListeningDone) + _ = g.stop() } // GetContainer returns the container that this consumer is connected to @@ -784,12 +809,8 @@ func (g *ContainerLogConsumer) Accept(l tc.Log) { return } - if g.isDone { - g.ls.log.Error(). - Str("Test", g.ls.testName). - Str("Container", g.name). - Str("Log", string(l.Content)). - Msg("Consumer has finished, but you are still trying to accept logs. This should never happen") + if g.IsStopped() { + // if the consumer is stopped, we don't want to accept any more logs return }