Skip to content

Commit

Permalink
UAP Ops Agent Golang Wrapper.
Browse files Browse the repository at this point in the history
  • Loading branch information
XuechunHou committed Nov 27, 2024
1 parent 19a24c5 commit a002926
Show file tree
Hide file tree
Showing 2 changed files with 279 additions and 0 deletions.
66 changes: 66 additions & 0 deletions cmd/ops_agent_uap_wrapper/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"flag"
"fmt"
"net"
"os"
"time"

pb "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"

Check failure on line 11 in cmd/ops_agent_uap_wrapper/plugin.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

module github.com/GoogleCloudPlatform/google-guest-agent@latest found (v0.0.0-20241121184505-2a5a31759404), but does not contain package github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin

Check failure on line 11 in cmd/ops_agent_uap_wrapper/plugin.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

module github.com/GoogleCloudPlatform/google-guest-agent@latest found (v0.0.0-20241121184505-2a5a31759404), but does not contain package github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin
"google.golang.org/grpc"
)

var (
// protocol is the protocol to use tcp/uds.
protocol string
// address is the address to start server listening on.
address string
// logfile is the path to the log file to capture error logs.
logfile string
)

func init() {
flag.StringVar(&protocol, "protocol", "", "protocol to use uds/tcp")
flag.StringVar(&address, "address", "", "address to start server listening on")
flag.StringVar(&logfile, "errorlogfile", "", "path to the error log file")
}

func main() {
flag.Parse()

if _, err := os.Stat(address); err == nil {
if err := os.RemoveAll(address); err != nil {
// Unix sockets must be unlinked (listener.Close()) before
// being reused again. If file already exist bind can fail.
fmt.Fprintf(os.Stderr, "Failed to remove %q: %v\n", address, err)
os.Exit(1)
}
}

listener, err := net.Listen(protocol, address)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to start listening on %q using %q: %v\n", address, protocol, err)
os.Exit(1)
}
defer listener.Close()

// This is the grpc server in communication with the Guest Agent.
server := grpc.NewServer()
defer server.GracefulStop()

ps := &OpsAgentPluginServer{server: server}
// Successfully registering the server and starting to listen on the address
// offered mean Guest Agent was successful in installing/launching the plugin
// & will manage the lifecycle (start, stop, or revision change) here onwards.
pb.RegisterGuestAgentPluginServer(server, ps)
if err := server.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "Exiting, cannot continue serving: %v\n", err)
os.Exit(1)
}
ctx := context.Background()
ps.Start(ctx, &pb.StartRequest{})
time.Sleep(3 * time.Minute)
ps.Stop(ctx, &pb.StopRequest{})
}
213 changes: 213 additions & 0 deletions cmd/ops_agent_uap_wrapper/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package main

import (
"context"

"bytes"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"syscall"
"time"

"google.golang.org/grpc"

pb "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"
)

const MaximumWaitForProcessStart = 5 * time.Second
const Prefix = "/opt/google-cloud-ops-agent" // @PREFIX@
const Sysconfdir = "/etc" // @SYSCONFDIR@
const LogsDirectory = "/var/log/google-cloud-ops-agent"
const FluentBitStateDiectory = "/var/lib/google-cloud-ops-agent/fluent-bit"
const FluentBitRuntimeDirectory = "/run/google-cloud-ops-agent-fluent-bit"
const OtelRuntimeDirectory = "/run/google-cloud-ops-agent-opentelemetry-collector"

// PluginServer implements the plugin RPC server interface.
type OpsAgentPluginServer struct {
pb.UnimplementedGuestAgentPluginServer
server *grpc.Server
// cancel is the cancel function to be called when core plugin is stopped.
cancel context.CancelFunc
}

// Apply applies the config sent or performs the work defined in the message.
// ApplyRequest is opaque to the agent and is expected to be well known contract
// between Plugin and the server itself. For e.g. service might want to update
// plugin config to enable/disable feature here plugins can react to such requests.
func (ps *OpsAgentPluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{}, nil
}

// sigHandler handles SIGTERM, SIGINT etc signals. The function provided in the
// cancel argument handles internal framework termination and the plugin
// interface notification of the "exiting" state.
func sigHandler(ctx context.Context, cancel func(sig os.Signal)) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGHUP, syscall.SIGKILL)
go func() {
select {
case sig := <-sigChan:
log.Printf("Got signal: %d, leaving...", sig)
close(sigChan)
cancel(sig)
case <-ctx.Done():
break
}
}()
}

func (ps *OpsAgentPluginServer) runAgent(ctx context.Context) {
// Register signal handler and implements its callback.
sigHandler(ctx, func(_ os.Signal) {
// We're handling some external signal here, set cleanup to [false].
// If this was Guest Agent trying to stop it would call [Stop] RPC directly
// or do a [SIGKILL] which anyways cannot be intercepted.
ps.Stop(ctx, &pb.StopRequest{Cleanup: false})
})

defer func() {
log.Print("Stopping Ops Agent plugin...")
ps.server.GracefulStop()
}()

// Starting ExecStartPre commands
execStartPreConfigValidationCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_engine",
"-in", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
)
if err := runCommand(execStartPreConfigValidationCmd); err != nil {
log.Fatalf("failed to validate Ops Agent default config.yaml: %s", err)
}

execStartPreOtelCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_engine",
"-service", "otel",
"-in", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
"-out", OtelRuntimeDirectory,
"-logs", LogsDirectory)

if err := runCommand(execStartPreOtelCmd); err != nil {
log.Fatalf("failed to generate config yaml for Otel: %s", err)
}

execStartPreFluentBitCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_engine",
"-service", "fluentbit",
"-in", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
"-out", FluentBitRuntimeDirectory,
"-logs", LogsDirectory, "-state", FluentBitStateDiectory)

if err := runCommand(execStartPreFluentBitCmd); err != nil {
log.Fatalf("failed to generate config yaml for FluentBit: %s", err)
}
// Starting Diagnostics Service
execDiagnosticsCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_diagnostics",
"-config", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
)
go restartCommand(execDiagnosticsCmd)

// Starting Otel
execOtelCmd := exec.CommandContext(ctx,
Prefix+"/subagents/opentelemetry-collector/otelopscol",
"--config", OtelRuntimeDirectory+"/otel.yaml",
)
go restartCommand(execOtelCmd)

// Starting FluentBit
execFluentBitCmd := exec.CommandContext(ctx,
Prefix+"/libexec/google_cloud_ops_agent_wrapper",
"-config_path", Sysconfdir+"/google-cloud-ops-agent/config.yaml",
"-log_path", LogsDirectory+"/subagents/logging-module.log",
Prefix+"/subagents/fluent-bit/bin/fluent-bit",
"--config", FluentBitRuntimeDirectory+"/fluent_bit_main.conf",
"--parser", FluentBitRuntimeDirectory+"/fluent_bit_parser.conf",
"--storage_path", FluentBitStateDiectory+"/buffers",
)
if err := restartCommand(execFluentBitCmd); err != nil {
log.Fatal(err)
}
}

// Start starts the plugin and initiates the plugin functionality.
// Until plugin receives Start request plugin is expected to be not functioning
// and just listening on the address handed off waiting for the request.
func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) {
pCtx, cancel := context.WithCancel(context.Background())
ps.cancel = cancel

go ps.runAgent(pCtx)
return &pb.StartResponse{}, nil
}

// Stop is the stop hook and implements any cleanup if required.
// Stop maybe called if plugin revision is being changed.
// For e.g. if plugins want to stop some task it was performing or remove some
// state before exiting it can be done on this request.
func (ps *OpsAgentPluginServer) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) {
log.Printf("Handling stop request %+v, stopping core plugin...", msg)
ps.cancel()
return &pb.StopResponse{}, nil
}

// GetStatus is the health check agent would perform to make sure plugin process
// is alive. If request fails process is considered dead and relaunched. Plugins
// can share any additional information to report it to the service. For e.g. if
// plugins detect some non-fatal errors causing it unable to offer some features
// it can reported in status which is sent back to the service by agent.
func (ps *OpsAgentPluginServer) GetStatus(ctx context.Context, msg *pb.GetStatusRequest) (*pb.Status, error) {
return &pb.Status{Code: 0, Results: []string{"Plugin is running ok"}}, nil
}

func runCommand(cmd *exec.Cmd) error {
if cmd == nil {
return nil
}
cmd.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGKILL,
Setpgid: true,
}
var outb, errb bytes.Buffer
cmd.Stderr = &errb
cmd.Stdout = &outb
log.Printf("Running command: %s, with arguments: %s", cmd.Path, cmd.Args)
if err := cmd.Run(); err != nil {
fullError := fmt.Errorf("failed to execute cmd: %s with arguments %s, \ncommand output: %s\n error: %s %s", cmd.Path, cmd.Args, outb.String(), errb.String(), err)
log.Print(fullError)
return fullError
}
return nil
}

func restartCommand(cmd *exec.Cmd) error {
if cmd == nil {
return nil
}
cmd.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGKILL,
Setpgid: true,
}
var outb, errb bytes.Buffer
cmd.Stderr = &errb
cmd.Stdout = &outb
log.Printf("Restarting command: %s, with arguments: %s", cmd.Path, cmd.Args)
err := cmd.Run()
if err != nil {
// https://pkg.go.dev/os#ProcessState.ExitCode Don't restart if the command was terminated by signals.
fullError := fmt.Errorf("failed to execute cmd: %s with arguments %s, \ncommand output: %s\n error: %s %s", cmd.Path, cmd.Args, outb.String(), errb.String(), err)

if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ProcessState.ExitCode() == -1 {
notRestartedError := fmt.Errorf("command terminated by signals, not restarting\n%s", fullError)
log.Print(notRestartedError)
return notRestartedError
}
log.Print(fullError)
} else {
log.Printf("command: %s, with arguments: %s completed successfully", cmd.Path, cmd.Args)
}
cmdToRestart := exec.Command(cmd.Path, cmd.Args...)
return restartCommand(cmdToRestart)
}

0 comments on commit a002926

Please sign in to comment.