diff --git a/cmd/ops_agent_uap_wrapper/plugin.go b/cmd/ops_agent_uap_wrapper/plugin.go index eb35cb7e87..f6da362586 100644 --- a/cmd/ops_agent_uap_wrapper/plugin.go +++ b/cmd/ops_agent_uap_wrapper/plugin.go @@ -4,8 +4,10 @@ import ( "context" "flag" "log" + "time" pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_wrapper/google_guest_agent/plugin" + "github.com/GoogleCloudPlatform/ops-agent/internal/logs" "google.golang.org/grpc" ) @@ -47,7 +49,7 @@ func main() { server := grpc.NewServer() defer server.GracefulStop() - ps := &OpsAgentPluginServer{server: server} + ps := &OpsAgentPluginServer{server: server, logger: logs.Default()} // Successfully registering the server and starting to listen on the address // offered mean Guest Agent was successful in installing/launching the plugin // & will manage the lifecycle (start, stop, or revision change) here onwards. @@ -60,9 +62,8 @@ func main() { ctx := context.Background() ps.Start(ctx, &pb.StartRequest{}) log.Print(ps.GetStatus(ctx, &pb.GetStatusRequest{})) - ps.Start(ctx, &pb.StartRequest{}) + time.Sleep(1 * time.Minute) log.Print(ps.GetStatus(ctx, &pb.GetStatusRequest{})) ps.Stop(ctx, &pb.StopRequest{}) log.Print(ps.GetStatus(ctx, &pb.GetStatusRequest{})) - ps.Stop(ctx, &pb.StopRequest{}) } diff --git a/cmd/ops_agent_uap_wrapper/service.go b/cmd/ops_agent_uap_wrapper/service.go index 60ee6930e4..e2836aa216 100644 --- a/cmd/ops_agent_uap_wrapper/service.go +++ b/cmd/ops_agent_uap_wrapper/service.go @@ -2,6 +2,8 @@ package main import ( "context" + "path/filepath" + "sync" "bytes" "fmt" @@ -15,6 +17,7 @@ import ( "google.golang.org/grpc" pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_wrapper/google_guest_agent/plugin" + "github.com/GoogleCloudPlatform/ops-agent/internal/logs" ) const MaximumWaitForProcessStart = 5 * time.Second @@ -24,6 +27,7 @@ const LogsDirectory = "/var/log/google-cloud-ops-agent" const FluentBitStateDiectory = "/var/lib/google-cloud-ops-agent/fluent-bit" const 
FluentBitRuntimeDirectory = "/run/google-cloud-ops-agent-fluent-bit" const OtelRuntimeDirectory = "/run/google-cloud-ops-agent-opentelemetry-collector" +const OpsAgentUapPluginLog = "ops-agent-uap-plugin.log" // PluginServer implements the plugin RPC server interface. type OpsAgentPluginServer struct { @@ -32,6 +36,7 @@ type OpsAgentPluginServer struct { // cancel is the cancel function to be called when core plugin is stopped. cancel context.CancelFunc startContext context.Context + logger logs.StructuredLogger } // Apply applies the config sent or performs the work defined in the message. @@ -41,6 +46,11 @@ type OpsAgentPluginServer struct { func (ps *OpsAgentPluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) { return &pb.ApplyResponse{}, nil } +func (ps *OpsAgentPluginServer) Cancel() { + if ps.cancel != nil { + ps.cancel() + } +} // sigHandler handles SIGTERM, SIGINT etc signals. The function provided in the // cancel argument handles internal framework termination and the plugin @@ -70,8 +80,8 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) { }) defer func() { - log.Print("Stopping Ops Agent plugin...") - ps.server.GracefulStop() + ps.logger.Infof("Stopping Ops Agent plugin...") + ps.cancel() }() // Starting ExecStartPre commands @@ -79,8 +89,9 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) { Prefix+"/libexec/google_cloud_ops_agent_engine", "-in", Sysconfdir+"/google-cloud-ops-agent/config.yaml", ) - if err := runCommand(execStartPreConfigValidationCmd); err != nil { - log.Fatalf("failed to validate Ops Agent default config.yaml: %s", err) + if err := runCommand(execStartPreConfigValidationCmd, ps.logger); err != nil { + ps.logger.Errorf("failed to validate Ops Agent default config.yaml: %s", err) + return } execStartPreOtelCmd := exec.CommandContext(ctx, @@ -90,8 +101,9 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) { "-out", OtelRuntimeDirectory, "-logs", 
LogsDirectory) - if err := runCommand(execStartPreOtelCmd); err != nil { - log.Fatalf("failed to generate config yaml for Otel: %s", err) + if err := runCommand(execStartPreOtelCmd, ps.logger); err != nil { + ps.logger.Errorf("failed to generate config yaml for Otel: %s", err) + return // the context is cancelled on return, so Start() can be triggered again to start the Ops Agent plugin. } execStartPreFluentBitCmd := exec.CommandContext(ctx, @@ -101,22 +113,28 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) { "-out", FluentBitRuntimeDirectory, "-logs", LogsDirectory, "-state", FluentBitStateDiectory) - if err := runCommand(execStartPreFluentBitCmd); err != nil { - log.Fatalf("failed to generate config yaml for FluentBit: %s", err) + if err := runCommand(execStartPreFluentBitCmd, ps.logger); err != nil { + ps.logger.Errorf("failed to generate config yaml for FluentBit: %s", err) + return } + + var wg sync.WaitGroup + // Starting Diagnostics Service execDiagnosticsCmd := exec.CommandContext(ctx, Prefix+"/libexec/google_cloud_ops_agent_diagnostics", "-config", Sysconfdir+"/google-cloud-ops-agent/config.yaml", ) - go restartCommand(execDiagnosticsCmd) + wg.Add(1) + go restartCommand(ctx, &wg, ps.logger, execDiagnosticsCmd) // Starting Otel execOtelCmd := exec.CommandContext(ctx, Prefix+"/subagents/opentelemetry-collector/otelopscol", "--config", OtelRuntimeDirectory+"/otel.yaml", ) - go restartCommand(execOtelCmd) + wg.Add(1) + go restartCommand(ctx, &wg, ps.logger, execOtelCmd) // Starting FluentBit execFluentBitCmd := exec.CommandContext(ctx, @@ -128,17 +146,20 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) { "--parser", FluentBitRuntimeDirectory+"/fluent_bit_parser.conf", "--storage_path", FluentBitStateDiectory+"/buffers", ) - if err := restartCommand(execFluentBitCmd); err != nil { - log.Fatal(err) - } + wg.Add(1) + go restartCommand(ctx, &wg, ps.logger, execFluentBitCmd) + wg.Wait() + ps.logger.Infof("wait group has exited")
} // Start starts the plugin and initiates the plugin functionality. // Until plugin receives Start request plugin is expected to be not functioning // and just listening on the address handed off waiting for the request. func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) { + logDir := msg.Config.GetStateDirectoryPath() + ps.logger = CreateOpsAgentUapPluginLogger(logDir) if ps.startContext != nil && ps.startContext.Err() == nil { - log.Print("Ops Agent plugin is started already, skipping the current Start() request") + ps.logger.Infof("Ops Agent plugin is started already, skipping the current Start() request") return &pb.StartResponse{}, nil } pCtx, cancel := context.WithCancel(context.Background()) @@ -155,11 +176,11 @@ func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest) // state before exiting it can be done on this request. func (ps *OpsAgentPluginServer) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) { if ps.cancel == nil || ps.startContext == nil || ps.startContext.Err() != nil { - log.Print("Ops Agent plugin is already stoppped, skipping the current Stop() request") + ps.logger.Warnf("Ops Agent plugin is already stoppped, skipping the current Stop() request") return &pb.StopResponse{}, nil } - log.Printf("Handling stop request %+v, stopping core plugin...", msg) + ps.logger.Infof("Handling stop request %+v, stopping core plugin...", msg) ps.cancel() return &pb.StopResponse{}, nil } @@ -177,7 +198,7 @@ func (ps *OpsAgentPluginServer) GetStatus(ctx context.Context, msg *pb.GetStatus return &pb.Status{Code: 0, Results: []string{"Plugin is running ok"}}, nil } -func runCommand(cmd *exec.Cmd) error { +func runCommand(cmd *exec.Cmd, logger logs.StructuredLogger) error { if cmd == nil { return nil } @@ -188,18 +209,23 @@ func runCommand(cmd *exec.Cmd) error { var outb, errb bytes.Buffer cmd.Stderr = &errb cmd.Stdout = &outb - log.Printf("Running command: 
%s, with arguments: %s", cmd.Path, cmd.Args) + logger.Infof("Running command: %s, with arguments: %s", cmd.Path, cmd.Args) if err := cmd.Run(); err != nil { fullError := fmt.Errorf("failed to execute cmd: %s with arguments %s, \ncommand output: %s\n error: %s %s", cmd.Path, cmd.Args, outb.String(), errb.String(), err) - log.Print(fullError) return fullError } return nil } -func restartCommand(cmd *exec.Cmd) error { +func restartCommand(ctx context.Context, wg *sync.WaitGroup, logger logs.StructuredLogger, cmd *exec.Cmd) { + defer wg.Done() if cmd == nil { - return nil + return + } + if ctx.Err() != nil { + // context has been cancelled + logger.Warnf("Context has been cancelled, exiting") + return } cmd.SysProcAttr = &syscall.SysProcAttr{ Pdeathsig: syscall.SIGKILL, @@ -208,7 +234,7 @@ func restartCommand(cmd *exec.Cmd) error { var outb, errb bytes.Buffer cmd.Stderr = &errb cmd.Stdout = &outb - log.Printf("Restarting command: %s, with arguments: %s", cmd.Path, cmd.Args) + logger.Infof("Restarting command: %s, with arguments: %s", cmd.Path, cmd.Args) err := cmd.Run() if err != nil { // https://pkg.go.dev/os#ProcessState.ExitCode Don't restart if the command was terminated by signals. @@ -216,13 +242,40 @@ func restartCommand(cmd *exec.Cmd) error { if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ProcessState.ExitCode() == -1 { notRestartedError := fmt.Errorf("command terminated by signals, not restarting\n%s", fullError) - log.Print(notRestartedError) - return notRestartedError + logger.Errorf("%s", notRestartedError) + return } - log.Print(fullError) + logger.Errorf("%s", fullError) + } else { - log.Printf("command: %s, with arguments: %s completed successfully", cmd.Path, cmd.Args) + logger.Infof("command: %s, with arguments: %s completed successfully", cmd.Path, cmd.Args) } - cmdToRestart := exec.Command(cmd.Path, cmd.Args...) 
- return restartCommand(cmdToRestart) + // Sleep 5 seconds before restarting the task + time.Sleep(5 * time.Second) + cmdToRestart := exec.CommandContext(ctx, cmd.Path, cmd.Args...) + wg.Add(1) + go restartCommand(ctx, wg, logger, cmdToRestart) +} + +func CreateOpsAgentUapPluginLogger(logDir string) logs.StructuredLogger { + // Check if the directory already exists + if _, err := os.Stat(logDir); os.IsNotExist(err) { + // Directory does not exist, create it + err := os.Mkdir(logDir, 0755) // 0755 sets permissions (read/write/execute for owner, read/execute for group and others) + if err != nil { + log.Printf("failed to create directory for %q: %v", logDir, err) + logDir = "" + } + } + + // Create the log file under the directory + path := filepath.Join(logDir, OpsAgentUapPluginLog) + file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("failed to open health checks log file %q: %v", path, err) + return logs.Default() + } + file.Close() + + return logs.New(path) }