diff --git a/logstream/logprocessor.go b/logstream/logprocessor.go index f2819d8c2..b5f929f48 100644 --- a/logstream/logprocessor.go +++ b/logstream/logprocessor.go @@ -29,12 +29,14 @@ type LogProcessorFn[ReturnType any] func(content LogContent, returnValue *Return func (l *LogProcessor[ReturnType]) ProcessContainerLogs(containerName string, processFn func(content LogContent, returnValue *ReturnType) error) (*ReturnType, error) { containerName = strings.Replace(containerName, "/", "", 1) var consumer *ContainerLogConsumer + l.logStream.consumerMutex.RLock() for _, c := range l.logStream.consumers { if c.name == containerName { consumer = c break } } + l.logStream.consumerMutex.RUnlock() if consumer == nil { return new(ReturnType), fmt.Errorf("no consumer found for container %s", containerName) diff --git a/logstream/logstream.go b/logstream/logstream.go index 8579a0313..b0d1aaf84 100644 --- a/logstream/logstream.go +++ b/logstream/logstream.go @@ -55,6 +55,7 @@ type LogStream struct { loki *wasp.LokiClient containers []LogProducingContainer consumers map[string]*ContainerLogConsumer + consumerMutex sync.RWMutex logTargetHandlers map[LogTarget]HandleLogTarget enabledLogTargets []LogTarget acceptMutex sync.Mutex @@ -199,6 +200,8 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing Str("Name", name). Str("Timeout", m.loggingConfig.LogStream.LogProducerTimeout.String()). Msg("Connecting container logs") + m.consumerMutex.Lock() + defer m.consumerMutex.Unlock() m.consumers[name] = cons m.containers = append(m.containers, container) container.FollowOutput(cons) @@ -301,6 +304,9 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing // GetConsumers returns all consumers func (m *LogStream) GetConsumers() map[string]*ContainerLogConsumer { + m.consumerMutex.RLock() + defer m.consumerMutex.RUnlock() + return m.consumers } @@ -312,48 +318,49 @@ func wrapError(existingErr, newErr error) error { return fmt.Errorf("%w: %w", existingErr, newErr) } +var noOpPostDisconnectFn = func(m *LogStream) error { return nil } + // Shutdown disconnects all containers and stops all consumers func (m *LogStream) Shutdown(context context.Context) error { - var err error - for _, c := range m.consumers { - discErr := m.DisconnectContainer(c.container) - if discErr != nil { - m.log.Error(). - Err(discErr). - Str("Name", c.name). - Msg("Failed to disconnect container") + return m.shutdownWithFunction(context, noOpPostDisconnectFn) +} - err = wrapError(err, discErr) - } - } +// shutdownWithFunction disconnects all containers and stops all consumers and executes postDisconnectFn after all +// containers are disconnected, but before Loki is shutdown +func (m *LogStream) shutdownWithFunction(context context.Context, postDisconnectFn func(m *LogStream) error) error { + var wrappedErr error - if m.loki != nil { - m.loki.StopNow() + var containers []LogProducingContainer + m.consumerMutex.RLock() + for _, c := range m.consumers { + containers = append(containers, c.container) } + m.consumerMutex.RUnlock() - return err -} + // first disconnect all containers, so that no new logs are accepted + for _, container := range containers { + name, err := container.Name(context) + if err != nil { + m.log.Error(). + Err(err). + Str("Name", name). + Msg("Failed to get container name") + wrappedErr = wrapError(wrappedErr, err) -// FlushAndShutdown flushes all logs to their targets and shuts down the log stream in a default sequence -func (m *LogStream) FlushAndShutdown() error { - var wrappedErr error + continue + } - // first disconnect all containers, so that no new logs are accepted - for _, c := range m.consumers { - if err := m.DisconnectContainer(c.container); err != nil { + if err := m.DisconnectContainer(container); err != nil { m.log.Error(). Err(err). - Str("Name", c.name). + Str("Name", name). Msg("Failed to disconnect container") wrappedErr = wrapError(wrappedErr, err) } } - if err := m.FlushLogsToTargets(); err != nil { - m.log.Error(). - Err(err). - Msg("Failed to flush logs to targets") + if err := postDisconnectFn(m); err != nil { wrappedErr = wrapError(wrappedErr, err) } @@ -364,6 +371,23 @@ func (m *LogStream) FlushAndShutdown() error { return wrappedErr } +// FlushAndShutdown flushes all logs to their targets and shuts down the log stream in a default sequence +func (m *LogStream) FlushAndShutdown() error { + var logFlushFn = func(m *LogStream) error { + if err := m.FlushLogsToTargets(); err != nil { + m.log.Error(). + Err(err). + Msg("Failed to flush logs to targets") + + return err + } + + return nil + } + + return m.shutdownWithFunction(context.Background(), logFlushFn) +} + type LogWriter = func(testName string, name string, location interface{}) error // PrintLogTargetsLocations prints all log targets locations to stdout @@ -396,7 +420,9 @@ func (m *LogStream) GetLogLocation() string { func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) { for _, handler := range m.logTargetHandlers { name := string(handler.GetTarget()) + m.consumerMutex.RLock() location, err := handler.GetLogLocation(m.consumers) + m.consumerMutex.RUnlock() if err != nil { if strings.Contains(err.Error(), ShorteningFailedErr) { m.log.Warn().Str("Handler", name).Err(err).Msg("Failed to shorten Grafana URL, will use original one") @@ -435,6 +461,7 @@ func (m *LogStream) DisconnectContainer(container LogProducingContainer) error { } consumerFound := false + m.consumerMutex.RLock() for _, consumer := range m.consumers { if consumer.container.GetContainerID() == container.GetContainerID() { consumerFound = true @@ -448,6 +475,7 @@ func (m *LogStream) DisconnectContainer(container LogProducingContainer) error { break } } + m.consumerMutex.RUnlock() if !consumerFound { m.log.Warn(). @@ -500,6 +528,8 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu } } + m.consumerMutex.RLock() + defer m.consumerMutex.RUnlock() for _, consumer := range m.consumers { // nothing to do if no log targets are configured if len(consumer.logTargets) == 0 {