Skip to content

Commit

Permalink
Initial tracing support for lucius (#475)
Browse files Browse the repository at this point in the history
* Initial tracing support for lucius

* fix return
  • Loading branch information
DanG100 authored Sep 12, 2024
1 parent 70d6b3d commit 6627c8d
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 72 deletions.
2 changes: 2 additions & 0 deletions dataplane/saiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ go_library(
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@com_github_openconfig_gnmi//errlist",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel_trace//:trace",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
2 changes: 1 addition & 1 deletion dataplane/saiserver/attrmgr/attrmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (mgr *AttrMgr) PopulateAttributes(req, resp proto.Message) error {
enumVal := reqList.Get(i).Enum()
val, ok := mgr.attrs[id][int32(enumVal)]
if !ok {
return fmt.Errorf("requested attribute not set: %v in OID: %v", reqList.Get(i), id)
return fmt.Errorf("requested attribute not set: %v in OID: %v", attrTypeFd.Enum().Values().ByNumber(reqList.Get(i).Enum()).Name(), id)
}
// Empty lists exist so they are not errors, but are not settable.
if val != nil {
Expand Down
96 changes: 89 additions & 7 deletions dataplane/saiserver/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"net"
"strconv"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -87,6 +89,83 @@ type switchDataplaneAPI interface {
FlowCounterQuery(_ context.Context, request *fwdpb.FlowCounterQueryRequest) (*fwdpb.FlowCounterQueryReply, error)
}

// luciusTrace decorates a switchDataplaneAPI so that the calls it overrides
// are each executed inside a tracing span.
type luciusTrace struct {
// Embedded implementation that every wrapper method delegates to; any
// interface method without a wrapper is promoted from it unchanged.
switchDataplaneAPI
// tracer starts the span for each wrapped call.
tracer trace.Tracer
}

// TableCreate forwards the request to the embedded dataplane inside a span
// named "TableCreate".
func (l *luciusTrace) TableCreate(ctx context.Context, req *fwdpb.TableCreateRequest) (*fwdpb.TableCreateReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "TableCreate")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.TableCreate(spanCtx, req)
	return reply, err
}

// TableEntryAdd forwards the request to the embedded dataplane inside a span
// named "TableEntryAdd".
func (l *luciusTrace) TableEntryAdd(ctx context.Context, req *fwdpb.TableEntryAddRequest) (*fwdpb.TableEntryAddReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "TableEntryAdd")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.TableEntryAdd(spanCtx, req)
	return reply, err
}

// TableEntryRemove forwards the request to the embedded dataplane inside a
// span named "TableEntryRemove".
func (l *luciusTrace) TableEntryRemove(ctx context.Context, req *fwdpb.TableEntryRemoveRequest) (*fwdpb.TableEntryRemoveReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "TableEntryRemove")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.TableEntryRemove(spanCtx, req)
	return reply, err
}

// PortState forwards the request to the embedded dataplane inside a span
// named "PortState".
func (l *luciusTrace) PortState(ctx context.Context, req *fwdpb.PortStateRequest) (*fwdpb.PortStateReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "PortState")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.PortState(spanCtx, req)
	return reply, err
}

// ObjectCounters forwards the request to the embedded dataplane inside a span
// named "ObjectCounters".
func (l *luciusTrace) ObjectCounters(ctx context.Context, req *fwdpb.ObjectCountersRequest) (*fwdpb.ObjectCountersReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "ObjectCounters")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.ObjectCounters(spanCtx, req)
	return reply, err
}

// PortCreate forwards the request to the embedded dataplane inside a span
// named "PortCreate".
func (l *luciusTrace) PortCreate(ctx context.Context, req *fwdpb.PortCreateRequest) (*fwdpb.PortCreateReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "PortCreate")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.PortCreate(spanCtx, req)
	return reply, err
}

// PortUpdate forwards the request to the embedded dataplane inside a span
// named "PortUpdate".
func (l *luciusTrace) PortUpdate(ctx context.Context, req *fwdpb.PortUpdateRequest) (*fwdpb.PortUpdateReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "PortUpdate")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.PortUpdate(spanCtx, req)
	return reply, err
}

// AttributeUpdate forwards the request to the embedded dataplane inside a
// span named "AttributeUpdate".
func (l *luciusTrace) AttributeUpdate(ctx context.Context, req *fwdpb.AttributeUpdateRequest) (*fwdpb.AttributeUpdateReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "AttributeUpdate")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.AttributeUpdate(spanCtx, req)
	return reply, err
}

// ObjectNID forwards the request to the embedded dataplane inside a span
// named "ObjectNID".
func (l *luciusTrace) ObjectNID(ctx context.Context, req *fwdpb.ObjectNIDRequest) (*fwdpb.ObjectNIDReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "ObjectNID")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.ObjectNID(spanCtx, req)
	return reply, err
}

// ObjectDelete forwards the request to the embedded dataplane inside a span
// named "ObjectDelete".
func (l *luciusTrace) ObjectDelete(ctx context.Context, req *fwdpb.ObjectDeleteRequest) (*fwdpb.ObjectDeleteReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "ObjectDelete")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.ObjectDelete(spanCtx, req)
	return reply, err
}

// FlowCounterCreate forwards the request to the embedded dataplane inside a
// span named "FlowCounterCreate".
func (l *luciusTrace) FlowCounterCreate(ctx context.Context, req *fwdpb.FlowCounterCreateRequest) (*fwdpb.FlowCounterCreateReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "FlowCounterCreate")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.FlowCounterCreate(spanCtx, req)
	return reply, err
}

// FlowCounterQuery forwards the request to the embedded dataplane inside a
// span named "FlowCounterQuery".
func (l *luciusTrace) FlowCounterQuery(ctx context.Context, req *fwdpb.FlowCounterQueryRequest) (*fwdpb.FlowCounterQueryReply, error) {
	spanCtx, sp := l.tracer.Start(ctx, "FlowCounterQuery")
	// Deferred so the span is ended on every exit path of the wrapped call.
	defer sp.End()

	reply, err := l.switchDataplaneAPI.FlowCounterQuery(spanCtx, req)
	return reply, err
}

const (
inputIfaceTable = "input-iface"
outputIfaceTable = "output-iface"
Expand Down Expand Up @@ -118,17 +197,20 @@ const (
)

func newSwitch(mgr *attrmgr.AttrMgr, engine switchDataplaneAPI, s *grpc.Server, opts *dplaneopts.Options) (*saiSwitch, error) {
vlan := newVlan(mgr, engine, s)
q := newQueue(mgr, engine, s)
sg := newSchedulerGroup(mgr, engine, s)
port, err := newPort(mgr, engine, s, vlan, q, sg, opts)
dplane := &luciusTrace{switchDataplaneAPI: engine, tracer: otel.Tracer("lucius")}

vlan := newVlan(mgr, dplane, s)
q := newQueue(mgr, dplane, s)
sg := newSchedulerGroup(mgr, dplane, s)
port, err := newPort(mgr, dplane, s, vlan, q, sg, opts)
if err != nil {
return nil, err
}

sw := &saiSwitch{
dataplane: engine,
acl: newACL(mgr, engine, s),
policer: newPolicer(mgr, engine, s),
dataplane: dplane,
acl: newACL(mgr, dplane, s),
policer: newPolicer(mgr, dplane, s),
port: port,
vlan: vlan,
stp: &stp{},
Expand Down
11 changes: 10 additions & 1 deletion dataplane/standalone/lucius/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@ go_library(
"//dataplane/saiserver",
"//dataplane/saiserver/attrmgr",
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@com_github_grpc_ecosystem_go_grpc_middleware_v2//interceptors/logging",
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel//propagation",
"@io_opentelemetry_go_otel_exporters_stdout_stdoutlog//:stdoutlog",
"@io_opentelemetry_go_otel_log//global",
"@io_opentelemetry_go_otel_sdk//resource",
"@io_opentelemetry_go_otel_sdk//trace",
"@io_opentelemetry_go_otel_sdk_log//:log",
"@io_opentelemetry_go_otel_sdk_metric//:metric",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//reflection",
],
)
Expand Down
119 changes: 109 additions & 10 deletions dataplane/standalone/lucius/lucius.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,32 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"log/slog"
"net"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"

"github.com/openconfig/lemming/dataplane/dplaneopts"
"github.com/openconfig/lemming/dataplane/saiserver"
"github.com/openconfig/lemming/dataplane/saiserver/attrmgr"

log "github.com/golang/glog"

fwdpb "github.com/openconfig/lemming/proto/forwarding"
)

Expand All @@ -46,26 +57,43 @@ var (

// main parses the command-line flags, installs the OpenTelemetry SDK and then
// runs the server on the configured port.
func main() {
	flag.Parse()

	shutdown, err := setupOTelSDK(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	// Report a failed telemetry shutdown instead of silently discarding it.
	// The deferred call only runs if start returns; a fatal log inside start
	// terminates the process without running deferred functions.
	defer func() {
		if err := shutdown(context.Background()); err != nil {
			slog.Error("OpenTelemetry shutdown failed", "err", err)
		}
	}()

	start(*port)
}

func getLogger() logging.Logger {
return logging.LoggerFunc(func(_ context.Context, level logging.Level, msg string, fields ...any) {
return logging.LoggerFunc(func(ctx context.Context, level logging.Level, msg string, fields ...any) {
switch level {
case logging.LevelDebug:
log.V(1).Info(msg, fields)
slog.DebugContext(ctx, msg, fields)

Check failure on line 72 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.DebugContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
case logging.LevelInfo:
log.Info(msg, fields)
slog.InfoContext(ctx, msg, fields)

Check failure on line 74 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.InfoContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
case logging.LevelWarn:
log.Warning(msg, fields)
slog.WarnContext(ctx, msg, fields)

Check failure on line 76 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.WarnContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
case logging.LevelError:
log.Error(msg, fields)
slog.ErrorContext(ctx, msg, fields)

Check failure on line 78 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.ErrorContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
}
})
}

// tracer is the package-level tracer that traceHandler uses to start one span
// per unary RPC. It is requested from otel with an empty name.
var tracer = otel.Tracer("")

// traceHandler is a gRPC unary server interceptor. It runs the wrapped
// handler inside a span named after the full RPC method and then attaches the
// span's trace ID to the response trailer under the "traceparent" key.
//
// The handler's response and error are returned unchanged. A failure to set
// the trailer is logged and never alters the RPC result.
func traceHandler(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	ctx, span := tracer.Start(ctx, info.FullMethod)
	defer span.End()

	resp, err := handler(ctx, req)

	// NOTE(review): the value is the bare trace ID rather than a complete W3C
	// traceparent header; confirm this is what clients expect.
	trailer := metadata.Pairs("traceparent", span.SpanContext().TraceID().String())
	if trailerErr := grpc.SetTrailer(ctx, trailer); trailerErr != nil {
		slog.WarnContext(ctx, "failed to set trace trailer", "method", info.FullMethod, "err", trailerErr)
	}

	return resp, err
}

func start(port int) {
log.Info("lucius initialized")
slog.Info("lucius initialized")

lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
Expand All @@ -74,8 +102,10 @@ func start(port int) {

mgr := attrmgr.New()
srv := grpc.NewServer(grpc.Creds(insecure.NewCredentials()),
grpc.ChainUnaryInterceptor(logging.UnaryServerInterceptor(getLogger()), mgr.Interceptor),
grpc.ChainStreamInterceptor(logging.StreamServerInterceptor(getLogger())))
grpc.ChainUnaryInterceptor(logging.UnaryServerInterceptor(getLogger()), mgr.Interceptor, traceHandler),
grpc.ChainStreamInterceptor(logging.StreamServerInterceptor(getLogger())),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

reflection.Register(srv)

Expand All @@ -92,3 +122,72 @@ func start(port int) {
log.Fatalf("failed to serve forwarding server: %v", err)
}
}

// setupOTelSDK installs the process-wide OpenTelemetry propagator, tracer
// provider, meter provider and logger provider.
//
// On success it returns a function that shuts down every installed provider
// and joins their errors; callers should invoke it for proper cleanup. On
// failure the providers registered so far are shut down and the combined
// error is returned with a nil function.
func setupOTelSDK(ctx context.Context) (func(context.Context) error, error) {
	var cleanups []func(context.Context) error

	// shutdown runs each registered cleanup in registration order and then
	// clears the list, so a second call has nothing left to run.
	shutdown := func(ctx context.Context) error {
		var joined error
		for i := range cleanups {
			joined = errors.Join(joined, cleanups[i](ctx))
		}
		cleanups = nil
		return joined
	}

	res := resource.Default()

	// Propagator.
	otel.SetTextMapPropagator(newPropagator())

	// NOTE(review): both the batch span processor and the periodic metric
	// reader are built with a nil exporter; confirm the SDK tolerates this.
	spanProcessor := sdktrace.NewBatchSpanProcessor(nil)
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
		sdktrace.WithResource(res),
		sdktrace.WithSpanProcessor(spanProcessor),
	)

	metricReader := sdkmetric.NewPeriodicReader(nil)
	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(metricReader),
		sdkmetric.WithResource(res),
	)

	// Tracer provider.
	cleanups = append(cleanups, tp.Shutdown)
	otel.SetTracerProvider(tp)

	// Meter provider.
	cleanups = append(cleanups, mp.Shutdown)
	otel.SetMeterProvider(mp)

	// Logger provider.
	lp, err := newLoggerProvider(res)
	if err != nil {
		return nil, errors.Join(err, shutdown(ctx))
	}
	cleanups = append(cleanups, lp.Shutdown)
	global.SetLoggerProvider(lp)

	return shutdown, nil
}

// newPropagator returns a composite text-map propagator made of the
// TraceContext and Baggage propagators, in that order.
func newPropagator() propagation.TextMapPropagator {
	var (
		traceCtx propagation.TraceContext
		baggage  propagation.Baggage
	)
	return propagation.NewCompositeTextMapPropagator(traceCtx, baggage)
}

// newLoggerProvider builds a logger provider for the given resource whose
// records go through a batch processor to an exporter created by
// stdoutlog.New. It returns the exporter's construction error, if any.
func newLoggerProvider(res *resource.Resource) (*sdklog.LoggerProvider, error) {
	exporter, err := stdoutlog.New()
	if err != nil {
		return nil, err
	}

	batcher := sdklog.NewBatchProcessor(exporter)
	return sdklog.NewLoggerProvider(
		sdklog.WithProcessor(batcher),
		sdklog.WithResource(res),
	), nil
}
Loading

0 comments on commit 6627c8d

Please sign in to comment.