Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opamp-server): add ebpf callback hooks #1582

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion odiglet/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
}

odigosNs := k8senv.GetCurrentNamespace()
err = server.StartOpAmpServer(ctx, log.Logger, mgr, clientset, env.Current.NodeName, odigosNs)
err = server.StartOpAmpServer(ctx, log.Logger, mgr, clientset, env.Current.NodeName, odigosNs, nil)
if err != nil {
log.Logger.Error(err, "Failed to start opamp server")
}
Expand Down
3 changes: 3 additions & 0 deletions opampserver/pkg/connection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ type ConnectionInfo struct {
// AgentRemoteConfig is the full remote config opamp message to send to the agent when needed
AgentRemoteConfig *protobufs.AgentRemoteConfig
RemoteResourceAttributes []configresolvers.ResourceAttribute

// can be nil or a cancel function to stop the eBPF program and release resources when closing the connection
EbpfCloseFunction func() error
}
30 changes: 30 additions & 0 deletions opampserver/pkg/server/ebpf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server

import (
"context"

"go.opentelemetry.io/otel/attribute"
)

type EbpfHooks interface {

// This function is called when a new opamp client connection is established.
// If set, it is expected for this callback to load any eBPF programs needed for instrumentation
// of this client.
// The function should block until the eBPF programs are loaded, either with success or error.
// If an error is returned, the agent will be signaled not to start the and connection will be closed.
// If the function returns nil, the connection will be allowed to proceed, and the eBPF part is assumed ready.
//
// Input:
// - ctx: the context of the request
// - programmingLanguage: the programming language of the agent, as reported by the agent, conforming to otel semconv
// - pid: the process id of the agent process, which is used to inject the eBPF programs
// - serviceName: the service name to use as resource attribute for generated telemetry
// - resourceAttributes: a list of resource attributes to populate in the resource of the generated telemetry
//
// Output:
// - error: if an error occurred during the loading of the eBPF programs
// - cancelFunc: if loaded successfully, a cancel function to be called when the connection is closed to unload the eBPF programs and release resources
// at the moment, errors from cancel are logged and ignored
OnNewInstrumentedProcess(ctx context.Context, programmingLanguage string, pid int64, serviceName string, resourceAttributes []attribute.KeyValue) (func() error, error)
}
38 changes: 38 additions & 0 deletions opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ConnectionHandlers struct {
kubeClientSet *kubernetes.Clientset
scheme *runtime.Scheme // TODO: revisit this, we should not depend on controller runtime
nodeName string
ebpfcb EbpfHooks
}

type opampAgentAttributesKeys struct {
Expand Down Expand Up @@ -95,6 +96,16 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
}
c.logger.Info("new OpAMP client connected", "deviceId", deviceId, "namespace", k8sAttributes.Namespace, "podName", k8sAttributes.PodName, "instrumentedAppName", instrumentedAppName, "workloadKind", k8sAttributes.WorkloadKind, "workloadName", k8sAttributes.WorkloadName, "containerName", k8sAttributes.ContainerName, "otelServiceName", k8sAttributes.OtelServiceName)

var ebpfCloseFunction func() error
if c.ebpfcb != nil {
otelResourceAttrs := opampResourceAttributesToOtel(remoteResourceAttributes)
closeFunction, err := c.ebpfcb.OnNewInstrumentedProcess(ctx, attrs.ProgrammingLanguage, pid, k8sAttributes.OtelServiceName, otelResourceAttrs)
if err != nil {
return nil, nil, fmt.Errorf("failed to load ebpf instrumentation program: %w", err)
}
ebpfCloseFunction = closeFunction
}

connectionInfo := &connection.ConnectionInfo{
DeviceId: deviceId,
Workload: podWorkload,
Expand All @@ -105,6 +116,7 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
InstrumentedAppName: instrumentedAppName,
AgentRemoteConfig: fullRemoteConfig,
RemoteResourceAttributes: remoteResourceAttributes,
EbpfCloseFunction: ebpfCloseFunction,
}

serverToAgent := &protobufs.ServerToAgent{
Expand Down Expand Up @@ -132,10 +144,36 @@ func (c *ConnectionHandlers) OnAgentToServerMessage(ctx context.Context, request
}

func (c *ConnectionHandlers) OnConnectionClosed(ctx context.Context, connectionInfo *connection.ConnectionInfo) {

if connectionInfo == nil {
// should not happen, safegurad against nil pointer
c.logger.Error(fmt.Errorf("missing connection info"), "OnConnectionClosed called with nil connection info")
return
}
if connectionInfo.EbpfCloseFunction != nil {
// signal the eBPF program to stop and release resources. fire and forget
err := connectionInfo.EbpfCloseFunction()
if err != nil {
c.logger.Error(err, "failed to unload eBPF program")
}
}

// keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting
}

func (c *ConnectionHandlers) OnConnectionNoHeartbeat(ctx context.Context, connectionInfo *connection.ConnectionInfo) error {

if connectionInfo == nil {
return fmt.Errorf("missing connection info")
}
if connectionInfo.EbpfCloseFunction != nil {
// signal the eBPF program to stop and release resources. fire and forget
err := connectionInfo.EbpfCloseFunction()
if err != nil {
c.logger.Error(err, "failed to unload eBPF program")
}
}

healthy := false
message := fmt.Sprintf("OpAMP server did not receive heartbeat from the agent, last message time: %s", connectionInfo.LastMessageTime.Format("2006-01-02 15:04:05 MST"))
// keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting
Expand Down
5 changes: 3 additions & 2 deletions opampserver/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
)

func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet *kubernetes.Clientset, nodeName string, odigosNs string) error {
func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet *kubernetes.Clientset, nodeName string, odigosNs string, ebpfcb EbpfHooks) error {

listenEndpoint := fmt.Sprintf("0.0.0.0:%d", OpAmpServerDefaultPort)
logger.Info("Starting opamp server", "listenEndpoint", listenEndpoint)
Expand All @@ -39,6 +39,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
kubeClientSet: kubeClientSet,
scheme: mgr.GetScheme(),
nodeName: nodeName,
ebpfcb: ebpfcb,
}

http.HandleFunc("POST /v1/opamp", func(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -165,7 +166,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "Failed to shut down the http server for incoming connections")
}
logger.Info("Shutting down live connections timeout monitor")
logger.Info("Shutting down OpAMP server gracefully due to context cancellation")
return
case <-ticker.C:
// Clean up stale connections
Expand Down
11 changes: 11 additions & 0 deletions opampserver/pkg/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,20 @@ package server
import (
"strconv"

"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configresolvers"
"github.com/odigos-io/odigos/opampserver/protobufs"
"go.opentelemetry.io/otel/attribute"
)

func opampResourceAttributesToOtel(opampResourceAttributes []configresolvers.ResourceAttribute) []attribute.KeyValue {
otelAttributes := make([]attribute.KeyValue, 0, len(opampResourceAttributes))
for _, attr := range opampResourceAttributes {
// TODO: support any type, not just string
otelAttributes = append(otelAttributes, attribute.String(attr.Key, attr.Value))
}
return otelAttributes
}

func ConvertAnyValueToString(value *protobufs.AnyValue) string {
switch v := value.Value.(type) {
case *protobufs.AnyValue_StringValue:
Expand Down
Loading