Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-1180] use RWMutex to protect LogStream's consumer map #959

Merged
merged 1 commit into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions logstream/logprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type LogProcessorFn[ReturnType any] func(content LogContent, returnValue *Return
func (l *LogProcessor[ReturnType]) ProcessContainerLogs(containerName string, processFn func(content LogContent, returnValue *ReturnType) error) (*ReturnType, error) {
containerName = strings.Replace(containerName, "/", "", 1)
var consumer *ContainerLogConsumer
l.logStream.consumerMutex.RLock()
for _, c := range l.logStream.consumers {
if c.name == containerName {
consumer = c
break
}
}
l.logStream.consumerMutex.RUnlock()

if consumer == nil {
return new(ReturnType), fmt.Errorf("no consumer found for container %s", containerName)
Expand Down
82 changes: 56 additions & 26 deletions logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type LogStream struct {
loki *wasp.LokiClient
containers []LogProducingContainer
consumers map[string]*ContainerLogConsumer
consumerMutex sync.RWMutex
logTargetHandlers map[LogTarget]HandleLogTarget
enabledLogTargets []LogTarget
acceptMutex sync.Mutex
Expand Down Expand Up @@ -199,6 +200,8 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing
Str("Name", name).
Str("Timeout", m.loggingConfig.LogStream.LogProducerTimeout.String()).
Msg("Connecting container logs")
m.consumerMutex.Lock()
defer m.consumerMutex.Unlock()
m.consumers[name] = cons
m.containers = append(m.containers, container)
container.FollowOutput(cons)
Expand Down Expand Up @@ -301,6 +304,9 @@ func (m *LogStream) ConnectContainer(ctx context.Context, container LogProducing

// GetConsumers returns a snapshot of all registered consumers, keyed the same
// way as the internal consumer map.
//
// The returned map is a shallow copy taken while holding the read lock, so the
// caller may iterate it without further locking. The *ContainerLogConsumer
// values are still shared with the LogStream. Returning the internal map
// directly would let callers read it after the lock has been released, racing
// with writers such as ConnectContainer.
func (m *LogStream) GetConsumers() map[string]*ContainerLogConsumer {
	m.consumerMutex.RLock()
	defer m.consumerMutex.RUnlock()

	// Preserve the nil-ness of the internal map for callers that check it.
	if m.consumers == nil {
		return nil
	}

	snapshot := make(map[string]*ContainerLogConsumer, len(m.consumers))
	for name, consumer := range m.consumers {
		snapshot[name] = consumer
	}

	return snapshot
}

Expand All @@ -312,48 +318,49 @@ func wrapError(existingErr, newErr error) error {
return fmt.Errorf("%w: %w", existingErr, newErr)
}

// noOpPostDisconnectFn is a post-disconnect hook that does nothing and always
// returns nil. It is passed to shutdownWithFunction when no extra step is
// needed after the containers have been disconnected.
var noOpPostDisconnectFn = func(m *LogStream) error { return nil }

// Shutdown disconnects all containers and stops all consumers
func (m *LogStream) Shutdown(context context.Context) error {
var err error
for _, c := range m.consumers {
discErr := m.DisconnectContainer(c.container)
if discErr != nil {
m.log.Error().
Err(discErr).
Str("Name", c.name).
Msg("Failed to disconnect container")
return m.shutdownWithFunction(context, noOpPostDisconnectFn)
}

err = wrapError(err, discErr)
}
}
// shutdownWithFunction disconnects all containers and stops all consumers and executes postDisconnectFn after all
// containers are disconnected, but before Loki is shutdown
func (m *LogStream) shutdownWithFunction(context context.Context, postDisconnectFn func(m *LogStream) error) error {
var wrappedErr error

if m.loki != nil {
m.loki.StopNow()
var containers []LogProducingContainer
m.consumerMutex.RLock()
for _, c := range m.consumers {
containers = append(containers, c.container)
}
m.consumerMutex.RUnlock()

return err
}
// first disconnect all containers, so that no new logs are accepted
for _, container := range containers {
name, err := container.Name(context)
if err != nil {
m.log.Error().
Err(err).
Str("Name", name).
Msg("Failed to get container name")
wrappedErr = wrapError(wrappedErr, err)

// FlushAndShutdown flushes all logs to their targets and shuts down the log stream in a default sequence
func (m *LogStream) FlushAndShutdown() error {
var wrappedErr error
continue
}

// first disconnect all containers, so that no new logs are accepted
for _, c := range m.consumers {
if err := m.DisconnectContainer(c.container); err != nil {
if err := m.DisconnectContainer(container); err != nil {
m.log.Error().
Err(err).
Str("Name", c.name).
Str("Name", name).
Msg("Failed to disconnect container")

wrappedErr = wrapError(wrappedErr, err)
}
}

if err := m.FlushLogsToTargets(); err != nil {
m.log.Error().
Err(err).
Msg("Failed to flush logs to targets")
if err := postDisconnectFn(m); err != nil {
wrappedErr = wrapError(wrappedErr, err)
}

Expand All @@ -364,6 +371,23 @@ func (m *LogStream) FlushAndShutdown() error {
return wrappedErr
}

// FlushAndShutdown runs the shared shutdown sequence with a post-disconnect
// step that flushes all logs to their targets. A flush failure is logged and
// then handed back to shutdownWithFunction as the step's error.
func (m *LogStream) FlushAndShutdown() error {
	flushLogs := func(stream *LogStream) error {
		flushErr := stream.FlushLogsToTargets()
		if flushErr == nil {
			return nil
		}

		stream.log.Error().
			Err(flushErr).
			Msg("Failed to flush logs to targets")

		return flushErr
	}

	return m.shutdownWithFunction(context.Background(), flushLogs)
}

// LogWriter is the callback signature accepted by SaveLogTargetsLocations. It
// receives a test name, a name and a location, and reports failure through the
// returned error.
// NOTE(review): name is presumably the log target/handler name and location the
// value returned by that handler's GetLogLocation — confirm against callers.
type LogWriter = func(testName string, name string, location interface{}) error

// PrintLogTargetsLocations prints all log targets locations to stdout
Expand Down Expand Up @@ -396,7 +420,9 @@ func (m *LogStream) GetLogLocation() string {
func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) {
for _, handler := range m.logTargetHandlers {
name := string(handler.GetTarget())
m.consumerMutex.RLock()
location, err := handler.GetLogLocation(m.consumers)
m.consumerMutex.RUnlock()
if err != nil {
if strings.Contains(err.Error(), ShorteningFailedErr) {
m.log.Warn().Str("Handler", name).Err(err).Msg("Failed to shorten Grafana URL, will use original one")
Expand Down Expand Up @@ -435,6 +461,7 @@ func (m *LogStream) DisconnectContainer(container LogProducingContainer) error {
}

consumerFound := false
m.consumerMutex.RLock()
for _, consumer := range m.consumers {
if consumer.container.GetContainerID() == container.GetContainerID() {
consumerFound = true
Expand All @@ -448,6 +475,7 @@ func (m *LogStream) DisconnectContainer(container LogProducingContainer) error {
break
}
}
m.consumerMutex.RUnlock()

if !consumerFound {
m.log.Warn().
Expand Down Expand Up @@ -500,6 +528,8 @@ func (m *LogStream) GetAllLogsAndConsume(preExecuteFn ConsumerConsumingFn, consu
}
}

m.consumerMutex.RLock()
defer m.consumerMutex.RUnlock()
for _, consumer := range m.consumers {
// nothing to do if no log targets are configured
if len(consumer.logTargets) == 0 {
Expand Down
Loading