Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial tracing support for lucius #475

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dataplane/saiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ go_library(
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@com_github_openconfig_gnmi//errlist",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel_trace//:trace",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
2 changes: 1 addition & 1 deletion dataplane/saiserver/attrmgr/attrmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (mgr *AttrMgr) PopulateAttributes(req, resp proto.Message) error {
enumVal := reqList.Get(i).Enum()
val, ok := mgr.attrs[id][int32(enumVal)]
if !ok {
return fmt.Errorf("requested attribute not set: %v in OID: %v", reqList.Get(i), id)
return fmt.Errorf("requested attribute not set: %v in OID: %v", attrTypeFd.Enum().Values().ByNumber(reqList.Get(i).Enum()).Name(), id)
}
// Empty lists exist so they are not errors, but are not settable.
if val != nil {
Expand Down
96 changes: 89 additions & 7 deletions dataplane/saiserver/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"net"
"strconv"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -87,6 +89,83 @@ type switchDataplaneAPI interface {
FlowCounterQuery(_ context.Context, request *fwdpb.FlowCounterQueryRequest) (*fwdpb.FlowCounterQueryReply, error)
}

type luciusTrace struct {
switchDataplaneAPI
tracer trace.Tracer
}

func (l *luciusTrace) TableCreate(ctx context.Context, req *fwdpb.TableCreateRequest) (*fwdpb.TableCreateReply, error) {
ctx, span := l.tracer.Start(ctx, "TableCreate")
defer span.End()
return l.switchDataplaneAPI.TableCreate(ctx, req)
}

func (l *luciusTrace) TableEntryAdd(ctx context.Context, req *fwdpb.TableEntryAddRequest) (*fwdpb.TableEntryAddReply, error) {
ctx, span := l.tracer.Start(ctx, "TableEntryAdd")
defer span.End()
return l.switchDataplaneAPI.TableEntryAdd(ctx, req)
}

func (l *luciusTrace) TableEntryRemove(ctx context.Context, req *fwdpb.TableEntryRemoveRequest) (*fwdpb.TableEntryRemoveReply, error) {
ctx, span := l.tracer.Start(ctx, "TableEntryRemove")
defer span.End()
return l.switchDataplaneAPI.TableEntryRemove(ctx, req)
}

func (l *luciusTrace) PortState(ctx context.Context, req *fwdpb.PortStateRequest) (*fwdpb.PortStateReply, error) {
ctx, span := l.tracer.Start(ctx, "PortState")
defer span.End()
return l.switchDataplaneAPI.PortState(ctx, req)
}

func (l *luciusTrace) ObjectCounters(ctx context.Context, req *fwdpb.ObjectCountersRequest) (*fwdpb.ObjectCountersReply, error) {
ctx, span := l.tracer.Start(ctx, "ObjectCounters")
defer span.End()
return l.switchDataplaneAPI.ObjectCounters(ctx, req)
}

func (l *luciusTrace) PortCreate(ctx context.Context, req *fwdpb.PortCreateRequest) (*fwdpb.PortCreateReply, error) {
ctx, span := l.tracer.Start(ctx, "PortCreate")
defer span.End()
return l.switchDataplaneAPI.PortCreate(ctx, req)
}

func (l *luciusTrace) PortUpdate(ctx context.Context, req *fwdpb.PortUpdateRequest) (*fwdpb.PortUpdateReply, error) {
ctx, span := l.tracer.Start(ctx, "PortUpdate")
defer span.End()
return l.switchDataplaneAPI.PortUpdate(ctx, req)
}

func (l *luciusTrace) AttributeUpdate(ctx context.Context, req *fwdpb.AttributeUpdateRequest) (*fwdpb.AttributeUpdateReply, error) {
ctx, span := l.tracer.Start(ctx, "AttributeUpdate")
defer span.End()
return l.switchDataplaneAPI.AttributeUpdate(ctx, req)
}

func (l *luciusTrace) ObjectNID(ctx context.Context, req *fwdpb.ObjectNIDRequest) (*fwdpb.ObjectNIDReply, error) {
ctx, span := l.tracer.Start(ctx, "ObjectNID")
defer span.End()
return l.switchDataplaneAPI.ObjectNID(ctx, req)
}

func (l *luciusTrace) ObjectDelete(ctx context.Context, req *fwdpb.ObjectDeleteRequest) (*fwdpb.ObjectDeleteReply, error) {
ctx, span := l.tracer.Start(ctx, "ObjectDelete")
defer span.End()
return l.switchDataplaneAPI.ObjectDelete(ctx, req)
}

func (l *luciusTrace) FlowCounterCreate(ctx context.Context, req *fwdpb.FlowCounterCreateRequest) (*fwdpb.FlowCounterCreateReply, error) {
ctx, span := l.tracer.Start(ctx, "FlowCounterCreate")
defer span.End()
return l.switchDataplaneAPI.FlowCounterCreate(ctx, req)
}

func (l *luciusTrace) FlowCounterQuery(ctx context.Context, req *fwdpb.FlowCounterQueryRequest) (*fwdpb.FlowCounterQueryReply, error) {
ctx, span := l.tracer.Start(ctx, "FlowCounterQuery")
defer span.End()
return l.switchDataplaneAPI.FlowCounterQuery(ctx, req)
}

const (
inputIfaceTable = "input-iface"
outputIfaceTable = "output-iface"
Expand Down Expand Up @@ -118,17 +197,20 @@ const (
)

func newSwitch(mgr *attrmgr.AttrMgr, engine switchDataplaneAPI, s *grpc.Server, opts *dplaneopts.Options) (*saiSwitch, error) {
vlan := newVlan(mgr, engine, s)
q := newQueue(mgr, engine, s)
sg := newSchedulerGroup(mgr, engine, s)
port, err := newPort(mgr, engine, s, vlan, q, sg, opts)
dplane := &luciusTrace{switchDataplaneAPI: engine, tracer: otel.Tracer("lucius")}

vlan := newVlan(mgr, dplane, s)
q := newQueue(mgr, dplane, s)
sg := newSchedulerGroup(mgr, dplane, s)
port, err := newPort(mgr, dplane, s, vlan, q, sg, opts)
if err != nil {
return nil, err
}

sw := &saiSwitch{
dataplane: engine,
acl: newACL(mgr, engine, s),
policer: newPolicer(mgr, engine, s),
dataplane: dplane,
acl: newACL(mgr, dplane, s),
policer: newPolicer(mgr, dplane, s),
port: port,
vlan: vlan,
stp: &stp{},
Expand Down
11 changes: 10 additions & 1 deletion dataplane/standalone/lucius/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@ go_library(
"//dataplane/saiserver",
"//dataplane/saiserver/attrmgr",
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@com_github_grpc_ecosystem_go_grpc_middleware_v2//interceptors/logging",
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc",
"@io_opentelemetry_go_otel//:otel",
"@io_opentelemetry_go_otel//propagation",
"@io_opentelemetry_go_otel_exporters_stdout_stdoutlog//:stdoutlog",
"@io_opentelemetry_go_otel_log//global",
"@io_opentelemetry_go_otel_sdk//resource",
"@io_opentelemetry_go_otel_sdk//trace",
"@io_opentelemetry_go_otel_sdk_log//:log",
"@io_opentelemetry_go_otel_sdk_metric//:metric",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//reflection",
],
)
Expand Down
125 changes: 115 additions & 10 deletions dataplane/standalone/lucius/lucius.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,32 @@

import (
"context"
"errors"
"flag"
"fmt"
"log"
"log/slog"
"net"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"

"github.com/openconfig/lemming/dataplane/dplaneopts"
"github.com/openconfig/lemming/dataplane/saiserver"
"github.com/openconfig/lemming/dataplane/saiserver/attrmgr"

log "github.com/golang/glog"

fwdpb "github.com/openconfig/lemming/proto/forwarding"
)

Expand All @@ -46,26 +57,43 @@

func main() {
flag.Parse()
cancel, err := setupOTelSDK(context.Background())
if err != nil {
log.Fatal(err)
}
defer cancel(context.Background())
start(*port)
}

func getLogger() logging.Logger {
return logging.LoggerFunc(func(_ context.Context, level logging.Level, msg string, fields ...any) {
return logging.LoggerFunc(func(ctx context.Context, level logging.Level, msg string, fields ...any) {
switch level {
case logging.LevelDebug:
log.V(1).Info(msg, fields)
slog.DebugContext(ctx, msg, fields)

Check failure on line 72 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.DebugContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
case logging.LevelInfo:
log.Info(msg, fields)
slog.InfoContext(ctx, msg, fields)

Check failure on line 74 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.InfoContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
case logging.LevelWarn:
log.Warning(msg, fields)
slog.WarnContext(ctx, msg, fields)

Check failure on line 76 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.WarnContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
case logging.LevelError:
log.Error(msg, fields)
slog.ErrorContext(ctx, msg, fields)

Check failure on line 78 in dataplane/standalone/lucius/lucius.go

View workflow job for this annotation

GitHub Actions / lint

slog: slog.ErrorContext arg "fields" should be a string or a slog.Attr (possible missing key or value) (govet)
}
})
}

var tracer = otel.Tracer("")

func traceHandler(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
ctx, span := tracer.Start(ctx, info.FullMethod)
defer span.End()

resp, err := handler(ctx, req)
grpc.SetTrailer(ctx, metadata.Pairs("traceparent", span.SpanContext().TraceID().String()))

return resp, err
}

func start(port int) {
log.Info("lucius initialized")
slog.Info("lucius initialized")

lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
Expand All @@ -74,8 +102,10 @@

mgr := attrmgr.New()
srv := grpc.NewServer(grpc.Creds(insecure.NewCredentials()),
grpc.ChainUnaryInterceptor(logging.UnaryServerInterceptor(getLogger()), mgr.Interceptor),
grpc.ChainStreamInterceptor(logging.StreamServerInterceptor(getLogger())))
grpc.ChainUnaryInterceptor(logging.UnaryServerInterceptor(getLogger()), mgr.Interceptor, traceHandler),
grpc.ChainStreamInterceptor(logging.StreamServerInterceptor(getLogger())),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

reflection.Register(srv)

Expand All @@ -92,3 +122,78 @@
log.Fatalf("failed to serve forwarding server: %v", err)
}
}

// setupOTelSDK bootstraps the OpenTelemetry pipeline.
// If it does not return an error, make sure to call shutdown for proper cleanup.
func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error

shutdown = func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}

// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
handleErr := func(inErr error) {
err = errors.Join(inErr, shutdown(ctx))
}

res := resource.Default()

// Set up propagator.
prop := newPropagator()
otel.SetTextMapPropagator(prop)

bsp := sdktrace.NewBatchSpanProcessor(nil)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)

meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(nil)),
sdkmetric.WithResource(res),
)

shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
otel.SetTracerProvider(tracerProvider)

shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
otel.SetMeterProvider(meterProvider)

// Set up logger provider.
loggerProvider, err := newLoggerProvider(res)
if err != nil {
handleErr(err)
return
DanG100 marked this conversation as resolved.
Show resolved Hide resolved
}
shutdownFuncs = append(shutdownFuncs, loggerProvider.Shutdown)
global.SetLoggerProvider(loggerProvider)

return
}

func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}

func newLoggerProvider(res *resource.Resource) (*sdklog.LoggerProvider, error) {
logExporter, err := stdoutlog.New()
if err != nil {
return nil, err
}

loggerProvider := sdklog.NewLoggerProvider(
sdklog.WithProcessor(sdklog.NewBatchProcessor(logExporter)),
sdklog.WithResource(res),
)
return loggerProvider, nil
}
Loading
Loading