Skip to content

Commit

Permalink
TT-1856 Fix err consumer has finished but you are still trying to accept logs (#1365)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszcl authored Nov 21, 2024
1 parent f1f8047 commit af81e8a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 26 deletions.
1 change: 1 addition & 0 deletions lib/.changeset/v1.50.16.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- TT-1856 Fix err consumer has finished but you are still trying to accept logs
4 changes: 2 additions & 2 deletions lib/logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (ct *CustomT) Write(p []byte) (n int, err error) {
}
if ct.ended {
l := GetTestLogger(nil)
l.Error().Msgf("%s %s: %s", afterTestEndedMsg, ct.Name(), string(p))
l.Warn().Msgf("%s %s: %s", afterTestEndedMsg, ct.Name(), string(p))
return len(p), nil
}
ct.T.Log(strings.TrimSuffix(str, "\n"))
Expand All @@ -53,7 +53,7 @@ func (ct CustomT) Printf(format string, v ...interface{}) {
s := "%s: "
formatted := fmt.Sprintf("%s %s%s", afterTestEndedMsg, s, format)
l := GetTestLogger(nil)
l.Error().Msgf(formatted, ct.Name(), v)
l.Warn().Msgf(formatted, ct.Name(), v)
} else {
ct.L.Info().Msgf(format, v...)
}
Expand Down
69 changes: 45 additions & 24 deletions lib/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing
select {
case logErr := <-container.GetLogProductionErrorChannel():
if logErr != nil {
// Check if the container is not stopped or terminated
if !container.IsRunning() {
m.log.Info().
Str("Container name", name).
Msg("Skipping log producer retrying, because the container is not running anymore")
break
}

m.log.Error().
Err(err).
Str("Container name", name).
Expand Down Expand Up @@ -441,31 +449,48 @@ func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) {

// Stop stops the consumer and closes listening channel (it won't be accepting any logs from now on)
func (g *ContainerLogConsumer) stop() error {
if g.isDone {
return nil
}
var stopErr error

// Use sync.Once to ensure the stop logic is executed only once
g.stopOnce.Do(func() {
// Send a signal to the listening channel if possible
select {
case g.logListeningDone <- struct{}{}:
default:
// No action needed if the channel is already full
}

// Close the channel
close(g.logListeningDone)
})

g.isDone = true
g.logListeningDone <- struct{}{}
defer close(g.logListeningDone)
return stopErr
}

return nil
// IsStopped reports whether the consumer has already been stopped.
//
// It does a non-blocking receive on logListeningDone, so it never waits.
func (g *ContainerLogConsumer) IsStopped() bool {
	stopped := false

	select {
	case <-g.logListeningDone:
		// The receive went through, which is what happens once the channel
		// has been closed (or still holds a pending stop signal).
		stopped = true
	default:
		// Nothing to receive: the channel is still open and empty.
	}

	return stopped
}

// DisconnectContainer disconnects particular container
func (m *LogStream) DisconnectContainer(container LogProducingContainer) error {
var err error

if container.IsRunning() {
m.log.Trace().Str("container", container.GetContainerID()).Msg("Disconnecting container")
err = container.StopLogProducer()
}
m.log.Info().Str("Container", container.GetContainerID()).Msg("Disconnecting container")

consumerFound := false
m.consumerMutex.RLock()
for _, consumer := range m.consumers {
if consumer.container.GetContainerID() == container.GetContainerID() {
consumerFound = true
m.log.Info().Str("Container", consumer.name).Msg("Stopping consumer")
if stopErr := consumer.stop(); err != nil {
m.log.Error().
Err(stopErr).
Expand Down Expand Up @@ -555,7 +580,7 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
// set the cursor to the end of the file, when done to resume writing, unless it was closed
//revive:disable
defer func() {
if !consumer.isDone {
if !consumer.IsStopped() {
_, deferErr := consumer.tempFile.Seek(0, 2)
attachError(deferErr)
}
Expand Down Expand Up @@ -620,8 +645,10 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
// FlushLogsToTargets flushes all logs for all consumers (containers) to their targets
func (m *LogStream) FlushLogsToTargets() error {
var preExecuteFn = func(consumer *ContainerLogConsumer) error {
// do not accept any new logs
consumer.isDone = true
// Stop the consumer to ensure it doesn't accept new logs
if err := consumer.stop(); err != nil {
return fmt.Errorf("failed to stop consumer %s: %w", consumer.name, err)
}

for _, handler := range m.logTargetHandlers {
consumer.ls.log.Debug().
Expand Down Expand Up @@ -696,7 +723,7 @@ type ContainerLogConsumer struct {
ls *LogStream
tempFile *os.File
encoder *gob.Encoder
isDone bool
stopOnce sync.Once
hasErrored bool
logListeningDone chan struct{}
container LogProducingContainer
Expand All @@ -717,7 +744,6 @@ func newContainerLogConsumer(ctx context.Context, lw *LogStream, container LogPr
prefix: prefix,
logTargets: logTargets,
ls: lw,
isDone: false,
hasErrored: false,
logListeningDone: make(chan struct{}, 1),
container: container,
Expand Down Expand Up @@ -766,8 +792,7 @@ func (g *ContainerLogConsumer) ResetTempFile() error {
// MarkAsErrored marks the consumer as errored (which makes it stop accepting logs).
//
// The channel shutdown is delegated entirely to stop(); closing
// logListeningDone here as well would make the channel be closed (or sent on)
// after it is already closed, which panics.
func (g *ContainerLogConsumer) MarkAsErrored() {
	g.hasErrored = true
	// The result is discarded on purpose: this method has no error return to
	// propagate it through.
	_ = g.stop()
}

// GetContainer returns the container that this consumer is connected to
Expand All @@ -784,12 +809,8 @@ func (g *ContainerLogConsumer) Accept(l tc.Log) {
return
}

if g.isDone {
g.ls.log.Error().
Str("Test", g.ls.testName).
Str("Container", g.name).
Str("Log", string(l.Content)).
Msg("Consumer has finished, but you are still trying to accept logs. This should never happen")
if g.IsStopped() {
// if the consumer is stopped, we don't want to accept any more logs
return
}

Expand Down

0 comments on commit af81e8a

Please sign in to comment.