Skip to content

Commit

Permalink
Redirected wrapper logs into a file.
Browse files Browse the repository at this point in the history
  • Loading branch information
XuechunHou committed Nov 29, 2024
1 parent 364a9bb commit 7200f96
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 31 deletions.
7 changes: 4 additions & 3 deletions cmd/ops_agent_uap_wrapper/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"flag"
"log"
"time"

pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_wrapper/google_guest_agent/plugin"
"github.com/GoogleCloudPlatform/ops-agent/internal/logs"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -47,7 +49,7 @@ func main() {
server := grpc.NewServer()
defer server.GracefulStop()

ps := &OpsAgentPluginServer{server: server}
ps := &OpsAgentPluginServer{server: server, logger: logs.Default()}
// Successfully registering the server and starting to listen on the address
// offered mean Guest Agent was successful in installing/launching the plugin
// & will manage the lifecycle (start, stop, or revision change) here onwards.
Expand All @@ -60,9 +62,8 @@ func main() {
ctx := context.Background()
ps.Start(ctx, &pb.StartRequest{})
log.Print(ps.GetStatus(ctx, &pb.GetStatusRequest{}))
ps.Start(ctx, &pb.StartRequest{})
time.Sleep(1 * time.Minute)
log.Print(ps.GetStatus(ctx, &pb.GetStatusRequest{}))
ps.Stop(ctx, &pb.StopRequest{})
log.Print(ps.GetStatus(ctx, &pb.GetStatusRequest{}))
ps.Stop(ctx, &pb.StopRequest{})
}
109 changes: 81 additions & 28 deletions cmd/ops_agent_uap_wrapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"path/filepath"
"sync"

"bytes"
"fmt"
Expand All @@ -15,6 +17,7 @@ import (
"google.golang.org/grpc"

pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_wrapper/google_guest_agent/plugin"
"github.com/GoogleCloudPlatform/ops-agent/internal/logs"
)

const MaximumWaitForProcessStart = 5 * time.Second
Expand All @@ -24,6 +27,7 @@ const LogsDirectory = "/var/log/google-cloud-ops-agent"
const FluentBitStateDiectory = "/var/lib/google-cloud-ops-agent/fluent-bit"
const FluentBitRuntimeDirectory = "/run/google-cloud-ops-agent-fluent-bit"
const OtelRuntimeDirectory = "/run/google-cloud-ops-agent-opentelemetry-collector"
const OpsAgentUapPluginLog = "ops-agent-uap-plugin.log"

// PluginServer implements the plugin RPC server interface.
type OpsAgentPluginServer struct {
Expand All @@ -32,6 +36,7 @@ type OpsAgentPluginServer struct {
// cancel is the cancel function to be called when core plugin is stopped.
cancel context.CancelFunc
startContext context.Context
logger logs.StructuredLogger
}

// Apply applies the config sent or performs the work defined in the message.
Expand All @@ -41,6 +46,11 @@ type OpsAgentPluginServer struct {
func (ps *OpsAgentPluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{}, nil
}
func (ps *OpsAgentPluginServer) Cancel() {
if ps.cancel != nil {
ps.cancel()
}
}

// sigHandler handles SIGTERM, SIGINT etc signals. The function provided in the
// cancel argument handles internal framework termination and the plugin
Expand Down Expand Up @@ -70,17 +80,18 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) {
})

defer func() {
log.Print("Stopping Ops Agent plugin...")
ps.server.GracefulStop()
ps.logger.Infof("Stopping Ops Agent plugin...")
ps.cancel()
}()

// Starting ExecStartPre commands
execStartPreConfigValidationCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_engine",
"-in", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
)
if err := runCommand(execStartPreConfigValidationCmd); err != nil {
log.Fatalf("failed to validate Ops Agent default config.yaml: %s", err)
if err := runCommand(execStartPreConfigValidationCmd, ps.logger); err != nil {
ps.logger.Errorf("failed to validate Ops Agent default config.yaml: %s", err)
return
}

execStartPreOtelCmd := exec.CommandContext(ctx,
Expand All @@ -90,8 +101,9 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) {
"-out", OtelRuntimeDirectory,
"-logs", LogsDirectory)

if err := runCommand(execStartPreOtelCmd); err != nil {
log.Fatalf("failed to generate config yaml for Otel: %s", err)
if err := runCommand(execStartPreOtelCmd, ps.logger); err != nil {
ps.logger.Errorf("failed to generate config yaml for Otel: %s", err)
return // context is cancelled on Return, and Start() can be triggerred again to start up ops Agent plugin again.
}

execStartPreFluentBitCmd := exec.CommandContext(ctx,
Expand All @@ -101,22 +113,28 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) {
"-out", FluentBitRuntimeDirectory,
"-logs", LogsDirectory, "-state", FluentBitStateDiectory)

if err := runCommand(execStartPreFluentBitCmd); err != nil {
log.Fatalf("failed to generate config yaml for FluentBit: %s", err)
if err := runCommand(execStartPreFluentBitCmd, ps.logger); err != nil {
ps.logger.Errorf("failed to generate config yaml for FluentBit: %s", err)
return
}

var wg sync.WaitGroup

// Starting Diagnostics Service
execDiagnosticsCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_diagnostics",
"-config", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
)
go restartCommand(execDiagnosticsCmd)
wg.Add(1)
go restartCommand(ctx, &wg, ps.logger, execDiagnosticsCmd)

// Starting Otel
execOtelCmd := exec.CommandContext(ctx,
Prefix+"/subagents/opentelemetry-collector/otelopscol",
"--config", OtelRuntimeDirectory+"/otel.yaml",
)
go restartCommand(execOtelCmd)
wg.Add(1)
go restartCommand(ctx, &wg, ps.logger, execOtelCmd)

// Starting FluentBit
execFluentBitCmd := exec.CommandContext(ctx,
Expand All @@ -128,17 +146,20 @@ func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) {
"--parser", FluentBitRuntimeDirectory+"/fluent_bit_parser.conf",
"--storage_path", FluentBitStateDiectory+"/buffers",
)
if err := restartCommand(execFluentBitCmd); err != nil {
log.Fatal(err)
}
wg.Add(1)
go restartCommand(ctx, &wg, ps.logger, execFluentBitCmd)
wg.Wait()
ps.logger.Infof("wait group has exited")
}

// Start starts the plugin and initiates the plugin functionality.
// Until plugin receives Start request plugin is expected to be not functioning
// and just listening on the address handed off waiting for the request.
func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) {
logDir := msg.Config.GetStateDirectoryPath()
ps.logger = CreateOpsAgentUapPluginLogger(logDir)
if ps.startContext != nil && ps.startContext.Err() == nil {
log.Print("Ops Agent plugin is started already, skipping the current Start() request")
ps.logger.Infof("Ops Agent plugin is started already, skipping the current Start() request")
return &pb.StartResponse{}, nil
}
pCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -155,11 +176,11 @@ func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest)
// state before exiting it can be done on this request.
func (ps *OpsAgentPluginServer) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) {
if ps.cancel == nil || ps.startContext == nil || ps.startContext.Err() != nil {
log.Print("Ops Agent plugin is already stoppped, skipping the current Stop() request")
ps.logger.Warnf("Ops Agent plugin is already stoppped, skipping the current Stop() request")
return &pb.StopResponse{}, nil

}
log.Printf("Handling stop request %+v, stopping core plugin...", msg)
ps.logger.Infof("Handling stop request %+v, stopping core plugin...", msg)
ps.cancel()
return &pb.StopResponse{}, nil
}
Expand All @@ -177,7 +198,7 @@ func (ps *OpsAgentPluginServer) GetStatus(ctx context.Context, msg *pb.GetStatus
return &pb.Status{Code: 0, Results: []string{"Plugin is running ok"}}, nil
}

func runCommand(cmd *exec.Cmd) error {
func runCommand(cmd *exec.Cmd, logger logs.StructuredLogger) error {
if cmd == nil {
return nil
}
Expand All @@ -188,18 +209,23 @@ func runCommand(cmd *exec.Cmd) error {
var outb, errb bytes.Buffer
cmd.Stderr = &errb
cmd.Stdout = &outb
log.Printf("Running command: %s, with arguments: %s", cmd.Path, cmd.Args)
logger.Infof("Running command: %s, with arguments: %s", cmd.Path, cmd.Args)
if err := cmd.Run(); err != nil {
fullError := fmt.Errorf("failed to execute cmd: %s with arguments %s, \ncommand output: %s\n error: %s %s", cmd.Path, cmd.Args, outb.String(), errb.String(), err)
log.Print(fullError)
return fullError
}
return nil
}

func restartCommand(cmd *exec.Cmd) error {
func restartCommand(ctx context.Context, wg *sync.WaitGroup, logger logs.StructuredLogger, cmd *exec.Cmd) {
defer wg.Done()
if cmd == nil {
return nil
return
}
if ctx.Err() != nil {
// context has been cancelled
logger.Warnf("Context has been cancelled, exiting")
return
}
cmd.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGKILL,

Check failure on line 231 in cmd/ops_agent_uap_wrapper/service.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

unknown field Pdeathsig in struct literal of type "syscall".SysProcAttr

Check failure on line 231 in cmd/ops_agent_uap_wrapper/service.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

unknown field Pdeathsig in struct literal of type "syscall".SysProcAttr
Expand All @@ -208,21 +234,48 @@ func restartCommand(cmd *exec.Cmd) error {
var outb, errb bytes.Buffer
cmd.Stderr = &errb
cmd.Stdout = &outb
log.Printf("Restarting command: %s, with arguments: %s", cmd.Path, cmd.Args)
logger.Infof("Restarting command: %s, with arguments: %s", cmd.Path, cmd.Args)
err := cmd.Run()
if err != nil {
// https://pkg.go.dev/os#ProcessState.ExitCode Don't restart if the command was terminated by signals.
fullError := fmt.Errorf("failed to execute cmd: %s with arguments %s, \ncommand output: %s\n error: %s %s", cmd.Path, cmd.Args, outb.String(), errb.String(), err)

if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ProcessState.ExitCode() == -1 {
notRestartedError := fmt.Errorf("command terminated by signals, not restarting\n%s", fullError)
log.Print(notRestartedError)
return notRestartedError
logger.Errorf("%s", notRestartedError)
return
}
log.Print(fullError)
logger.Errorf("%s", fullError)

} else {
log.Printf("command: %s, with arguments: %s completed successfully", cmd.Path, cmd.Args)
logger.Infof("command: %s, with arguments: %s completed successfully", cmd.Path, cmd.Args)
}
cmdToRestart := exec.Command(cmd.Path, cmd.Args...)
return restartCommand(cmdToRestart)
// Sleep 10 seconds before retarting the task
time.Sleep(5 * time.Second)
cmdToRestart := exec.CommandContext(ctx, cmd.Path, cmd.Args...)
wg.Add(1)
go restartCommand(ctx, wg, logger, cmdToRestart)
}

func CreateOpsAgentUapPluginLogger(logDir string) logs.StructuredLogger {
// Check if the directory already exists
if _, err := os.Stat(logDir); os.IsNotExist(err) {
// Directory does not exist, create it
err := os.Mkdir(logDir, 0755) // 0755 sets permissions (read/write/execute for owner, read/execute for group and others)
if err != nil {
log.Printf("failed to create directory for %q: %v", logDir, err)
logDir = ""
}
}

// Create the log file under the directory
path := filepath.Join(logDir, OpsAgentUapPluginLog)
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("failed to open health checks log file %q: %v", path, err)
return logs.Default()
}
file.Close()

return logs.New(path)
}

0 comments on commit 7200f96

Please sign in to comment.