diff --git a/debugd/internal/debugd/deploy/service.go b/debugd/internal/debugd/deploy/service.go index 806e8abbc2..edea5bf070 100644 --- a/debugd/internal/debugd/deploy/service.go +++ b/debugd/internal/debugd/deploy/service.go @@ -61,6 +61,7 @@ type SystemdUnit struct { type ServiceManager struct { log *slog.Logger dbus dbusClient + journal journalReader fs afero.Fs systemdUnitFilewriteLock sync.Mutex } @@ -71,6 +72,7 @@ func NewServiceManager(log *slog.Logger) *ServiceManager { return &ServiceManager{ log: log, dbus: &dbusWrapper{}, + journal: &journalctlWrapper{}, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, } @@ -99,6 +101,11 @@ type dbusConn interface { Close() } +type journalReader interface { + // ReadJournal reads the journal for a specific unit. + readJournal(unit string) string +} + // SystemdAction will perform a systemd action on a service unit (start, stop, restart, reload). func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManagerRequest) error { log := s.log.With(slog.String("unit", request.Unit), slog.String("action", request.Action.String())) @@ -139,7 +146,8 @@ func (s *ServiceManager) SystemdAction(ctx context.Context, request ServiceManag return nil default: - return fmt.Errorf("performing action %q on systemd unit %q failed: expected %q but received %q", request.Action.String(), request.Unit, "done", result) + serviceJournal := s.journal.readJournal(request.Unit) + return fmt.Errorf("performing action %q on systemd unit %q failed: expected %q but received %q. systemd unit journal entries: %s", request.Action.String(), request.Unit, "done", result, serviceJournal) } } diff --git a/debugd/internal/debugd/deploy/service_test.go b/debugd/internal/debugd/deploy/service_test.go index c0c98f93e7..394960c0a6 100644 --- a/debugd/internal/debugd/deploy/service_test.go +++ b/debugd/internal/debugd/deploy/service_test.go @@ -104,6 +104,7 @@ func TestSystemdAction(t *testing.T) { manager := ServiceManager{ log: logger.NewTest(t), dbus: &tc.dbus, + journal: &stubJournalReader{}, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, } @@ -183,6 +184,7 @@ func TestWriteSystemdUnitFile(t *testing.T) { manager := ServiceManager{ log: logger.NewTest(t), dbus: &tc.dbus, + journal: &stubJournalReader{}, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, } @@ -296,6 +298,7 @@ func TestOverrideServiceUnitExecStart(t *testing.T) { manager := ServiceManager{ log: logger.NewTest(t), dbus: &tc.dbus, + journal: &stubJournalReader{}, fs: fs, systemdUnitFilewriteLock: sync.Mutex{}, } @@ -367,3 +370,9 @@ func (c *fakeDbusConn) ReloadContext(_ context.Context) error { } func (c *fakeDbusConn) Close() {} + +type stubJournalReader struct{} + +func (s *stubJournalReader) readJournal(_ string) string { + return "" +} diff --git a/debugd/internal/debugd/deploy/wrappers.go b/debugd/internal/debugd/deploy/wrappers.go index 9ec9f0b019..f072308d9f 100644 --- a/debugd/internal/debugd/deploy/wrappers.go +++ b/debugd/internal/debugd/deploy/wrappers.go @@ -8,6 +8,7 @@ package deploy import ( "context" + "os/exec" "github.com/coreos/go-systemd/v22/dbus" ) @@ -48,3 +49,85 @@ func (c *dbusConnWrapper) ReloadContext(ctx context.Context) error { func (c *dbusConnWrapper) Close() { c.conn.Close() } + +type journalctlWrapper struct{} + +func (j *journalctlWrapper) readJournal(unit string) string { + out, _ := exec.CommandContext(context.Background(), "journalctl", "-u", unit, "--no-pager").CombinedOutput() + return string(out) +} + +/* +// Preferably, we would use the systemd journal API directly. +// However, this requires linking against systemd libraries, so we go with the easier journalctl command for now. + +type sdJournalWrapper struct{} + +// readJournal reads the journal for a specific unit. +func (s *sdJournalWrapper) readJournal(unit string) string { + journal, err := sdjournal.NewJournal() + if err != nil { + log.Printf("opening journal: %s", err) + return "" + } + defer journal.Close() + + // Filter the journal for the specified unit + filters := []string{ + fmt.Sprintf("_SYSTEMD_UNIT=%s", unit), + fmt.Sprintf("UNIT=%s", unit), + fmt.Sprintf("OBJECT_SYSTEMD_UNIT=%s", unit), + fmt.Sprintf("_SYSTEMD_SLICE=%s", unit), + fmt.Sprintf("_SYSTEMD_USER_UNIT=%s", unit), + fmt.Sprintf("USER_UNIT=%s", unit), + fmt.Sprintf("COREDUMP_USER_UNIT=%s", unit), + fmt.Sprintf("OBJECT_SYSTEMD_USER_UNIT=%s", unit), + fmt.Sprintf("_SYSTEMD_USER_SLICE=%s", unit), + } + for _, filter := range filters { + if err := journal.AddMatch(filter); err != nil { + log.Printf("applying filter %q: %s", filter, err) + return "" + } + if err := journal.AddDisjunction(); err != nil { + log.Printf("adding disjunction to journal filter: %s", err) + return "" + } + } + + // Seek to the beginning of the journal + if err := journal.SeekHead(); err != nil { + log.Printf("seeking journal tail: %s", err) + return "" + } + + // Iterate over the journal entries + var previousCursor string + journalLog := &strings.Builder{} + for { + if _, err := journal.Next(); err != nil { + log.Printf("getting next entry in journal: %s", err) + return "" + } + + entry, err := journal.GetEntry() + if err != nil { + log.Printf("getting journal entry: %s", err) + return "" + } + + // Abort if we reached the end of the journal, i.e. the cursor didn't change + if entry.Cursor == previousCursor { + break + } + previousCursor = entry.Cursor + + if _, err := journalLog.WriteString(entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE] + "\n"); err != nil { + log.Printf("copying journal entry to buffer: %s", err) + return "" + } + } + + return strings.TrimSpace(journalLog.String()) +} +*/ diff --git a/debugd/internal/debugd/logcollector/logcollector.go b/debugd/internal/debugd/logcollector/logcollector.go index 9723d102fd..59964287d8 100644 --- a/debugd/internal/debugd/logcollector/logcollector.go +++ b/debugd/internal/debugd/logcollector/logcollector.go @@ -200,6 +200,10 @@ func startPod(ctx context.Context, logger *slog.Logger) error { if err := runLogstashCmd.Start(); err != nil { return fmt.Errorf("failed to start logstash: %w", err) } + if out, err := exec.CommandContext(ctx, "podman", "wait", "logstash", "--condition=running", "--interval=15s").CombinedOutput(); err != nil { + logger.Error("Logstash container failed to reach healthy status", "err", err, "output", out) + return fmt.Errorf("waiting for logstash container to reach healthy status: %w; output: %s", err, out) + } // start filebeat container filebeatLog := newCmdLogger(logger.WithGroup("filebeat")) @@ -225,6 +229,10 @@ func startPod(ctx context.Context, logger *slog.Logger) error { if err := runFilebeatCmd.Start(); err != nil { return fmt.Errorf("failed to run filebeat: %w", err) } + if out, err := exec.CommandContext(ctx, "podman", "wait", "filebeat", "--condition=running", "--interval=15s").CombinedOutput(); err != nil { + logger.Error("Filebeat container failed to reach healthy status", "err", err, "output", out) + return fmt.Errorf("waiting for filebeat container to reach healthy status: %w; output: %s", err, out) + } return nil }