Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK 9149] Use RDK Logger Across Interceptors #405

Merged
merged 29 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
42f47a0
manually import grpc interceptor helpers
bashar-515 Jan 7, 2025
94909f7
convert almost everything to use a zap compatible logger
bashar-515 Jan 8, 2025
f2c40d2
finish converting to zap compatible logger
bashar-515 Jan 8, 2025
8010e85
implement stream interceptor
bashar-515 Jan 8, 2025
4f87caf
extract to utility function
bashar-515 Jan 8, 2025
ec63210
remove note
bashar-515 Jan 8, 2025
9f9013f
no longer pass unnecessary logger
bashar-515 Jan 8, 2025
33c0e76
remove options
bashar-515 Jan 8, 2025
277491c
remove unused import
bashar-515 Jan 9, 2025
bf8e007
only output error level logs and up
bashar-515 Jan 9, 2025
54915dc
take wrapped logger out of stream interceptor
bashar-515 Jan 9, 2025
f636b20
remove unused code
bashar-515 Jan 10, 2025
9fe3c36
lint
bashar-515 Jan 10, 2025
2217f11
create new logger first
bashar-515 Jan 10, 2025
435eee4
explicitly add options to logger
bashar-515 Jan 10, 2025
8de8a59
include 'nolint' comments that were removed
bashar-515 Jan 13, 2025
069ef9a
fix 'nolint' comments
bashar-515 Jan 13, 2025
84b75c6
fix final 'nolint' comment
bashar-515 Jan 13, 2025
706c0ef
remove new added line
bashar-515 Jan 13, 2025
5afb862
set log level to error whenever there is an error
bashar-515 Jan 14, 2025
dccdba0
ensure timestamps field is formatted like log timestamp
bashar-515 Jan 14, 2025
9f1a7f9
don't export ISO8601 formatting constant and only set level to error …
bashar-515 Jan 14, 2025
7c0f9db
remove ineffectual assignment
bashar-515 Jan 14, 2025
49a5747
document LogFinalLine()
bashar-515 Jan 14, 2025
5eae87f
introduce server option to 'wait for handlers'
bashar-515 Jan 21, 2025
efb95b0
add comment to exported function
bashar-515 Jan 21, 2025
f550946
restore server_options.go file
bashar-515 Jan 21, 2025
2e008c3
always gracefully stop gRPC server
bashar-515 Jan 21, 2025
153d6c1
no longer add option to wait for handlers
bashar-515 Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package utils

import (
"reflect"
"time"

"github.com/edaniels/golog"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
)

// Logger is used various parts of the package for informational/debugging purposes.
Expand Down Expand Up @@ -127,3 +131,35 @@ func AddFieldsToLogger(inp ZapCompatibleLogger, args ...interface{}) (loggerRet

return loggerRet
}

// LogFinalLine is used to log the final status of a gRPC request along with its execution time, an associated error (if any), and the
// gRPC status code. If there is an error, the log level is upgraded (if necessary) to ERROR. Otherwise, it is set to DEBUG. This code is
// taken from
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
func LogFinalLine(logger ZapCompatibleLogger, startTime time.Time, err error, msg string, code codes.Code) {
level := grpc_zap.DefaultCodeToLevel(code)

// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
fields := []any{}
if err == nil {
level = zap.DebugLevel
} else {
if level < zap.ErrorLevel {
level = zap.ErrorLevel
}
fields = append(fields, "error", err)
}
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
switch level {
case zap.DebugLevel:
logger.Debugw(msg, fields...)
case zap.InfoLevel:
logger.Infow(msg, fields...)
case zap.ErrorLevel:
logger.Errorw(msg, fields...)
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
logger.Warnw(msg, fields...)
}
}
17 changes: 5 additions & 12 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (
"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -327,10 +324,6 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger: logger,
}

grpcLogger := logger.Desugar()
if !(sOpts.debug || utils.Debug) {
grpcLogger = grpcLogger.WithOptions(zap.IncreaseLevel(zap.LevelEnablerFunc(zapcore.ErrorLevel.Enabled)))
}
if sOpts.unknownStreamDesc != nil {
serverOpts = append(serverOpts, grpc.UnknownServiceHandler(sOpts.unknownStreamDesc.Handler))
}
Expand All @@ -342,10 +335,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger.Errorw("panicked while calling unary server method", "error", errors.WithStack(err))
return err
}))),
grpc_zap.UnaryServerInterceptor(grpcLogger),
grpcUnaryServerInterceptor(logger),
unaryServerCodeInterceptor(),
)
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor(grpcLogger))
unaryInterceptors = append(unaryInterceptors, UnaryServerTracingInterceptor())
unaryAuthIntPos := -1
if !sOpts.unauthenticated {
unaryInterceptors = append(unaryInterceptors, server.authUnaryInterceptor)
Expand Down Expand Up @@ -375,10 +368,10 @@ func NewServer(logger utils.ZapCompatibleLogger, opts ...ServerOption) (Server,
logger.Errorw("panicked while calling stream server method", "error", errors.WithStack(err))
return err
}))),
grpc_zap.StreamServerInterceptor(grpcLogger),
grpcStreamServerInterceptor(logger),
streamServerCodeInterceptor(),
)
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor(grpcLogger))
streamInterceptors = append(streamInterceptors, StreamServerTracingInterceptor())
streamAuthIntPos := -1
if !sOpts.unauthenticated {
streamInterceptors = append(streamInterceptors, server.authStreamInterceptor)
Expand Down Expand Up @@ -861,7 +854,7 @@ func (ss *simpleServer) Stop() error {
err = multierr.Combine(err, ss.signalingCallQueue.Close())
}
ss.logger.Debug("stopping gRPC server")
defer ss.grpcServer.Stop()
defer ss.grpcServer.GracefulStop()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was causing a goroutine leak in RDK testing.

ss.logger.Debug("canceling service servers for gateway")
for _, cancel := range ss.serviceServerCancels {
cancel()
Expand Down
53 changes: 51 additions & 2 deletions rpc/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import (
"context"
"encoding/hex"
"fmt"
"path"
"strconv"
"time"

grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.viam.com/utils"
)

// UnaryServerTracingInterceptor starts a new Span if Span metadata exists in the context.
func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerInterceptor {
func UnaryServerTracingInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if remoteSpanContext, err := remoteSpanContextFromContext(ctx); err == nil {
var span *trace.Span
Expand All @@ -38,7 +43,7 @@ func UnaryServerTracingInterceptor(logger *zap.Logger) grpc.UnaryServerIntercept
}

// StreamServerTracingInterceptor starts a new Span if Span metadata exists in the context.
func StreamServerTracingInterceptor(logger *zap.Logger) grpc.StreamServerInterceptor {
func StreamServerTracingInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if remoteSpanContext, err := remoteSpanContextFromContext(stream.Context()); err == nil {
newCtx, span := trace.StartSpanWithRemoteParent(stream.Context(), "server_root", remoteSpanContext)
Expand Down Expand Up @@ -119,3 +124,47 @@ func remoteSpanContextFromContext(ctx context.Context) (trace.SpanContext, error

return trace.SpanContext{TraceID: traceID, SpanID: spanID, TraceOptions: traceOptions, Tracestate: nil}, nil
}

func grpcUnaryServerInterceptor(logger utils.ZapCompatibleLogger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(ctx, info.FullMethod, startTime)...)

utils.LogFinalLine(loggerWithFields, startTime, err, "finished unary call with code "+code.String(), code)

return resp, err
}
}

func grpcStreamServerInterceptor(logger utils.ZapCompatibleLogger) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
err := handler(srv, stream)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(logger, serverCallFields(stream.Context(), info.FullMethod, startTime)...)

utils.LogFinalLine(loggerWithFields, startTime, err, "finished stream call with code "+code.String(), code)

return err
}
}

const iso8601 = "2006-01-02T15:04:05.000Z0700" // keep timestamp formatting constant

func serverCallFields(ctx context.Context, fullMethodString string, start time.Time) []any {
var f []any
f = append(f, "grpc.start_time", start.UTC().Format(iso8601))
if d, ok := ctx.Deadline(); ok {
f = append(f, zap.String("grpc.request.deadline", d.UTC().Format(iso8601)))
}
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
return append(f, []any{
"span.kind", "server",
"system", "grpc",
"grpc.service", service,
"grpc.method", method,
})
}
37 changes: 4 additions & 33 deletions rpc/wrtc_client_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import (
"time"

grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -110,8 +107,9 @@ func (ch *webrtcClientChannel) Invoke(
) error {
startTime := time.Now()
err := ch.invokeWithInterceptor(ctx, method, args, reply, opts...)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
logFinalClientLine(loggerWithFields, startTime, err, "finished client unary call")
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client unary call", code)
return err
}

Expand Down Expand Up @@ -197,8 +195,9 @@ func (ch *webrtcClientChannel) NewStream(
) (grpc.ClientStream, error) {
startTime := time.Now()
clientStream, err := ch.streamWithInterceptor(ctx, method)
code := grpc_logging.DefaultErrorToCode(err)
loggerWithFields := utils.AddFieldsToLogger(ch.webrtcBaseChannel.logger, newClientLoggerFields(method)...)
logFinalClientLine(loggerWithFields, startTime, err, "finished client streaming call")
utils.LogFinalLine(loggerWithFields, startTime, err, "finished client streaming call", code)
return clientStream, err
}

Expand Down Expand Up @@ -341,34 +340,6 @@ func (ch *webrtcClientChannel) writeReset(stream *webrtcpb.Stream) error {
})
}

// taken from
// https://github.com/grpc-ecosystem/go-grpc-middleware/blob/560829fc74fcf9a69b7ab01d484f8b8961dc734b/logging/zap/client_interceptors.go
func logFinalClientLine(logger utils.ZapCompatibleLogger, startTime time.Time, err error, msg string) {
code := grpc_logging.DefaultErrorToCode(err)
level := grpc_zap.DefaultCodeToLevel(code)

// this calculation is done because duration.Milliseconds() will return an integer, which is not precise enough.
duration := float32(time.Since(startTime).Nanoseconds()/1000) / 1000
fields := []any{}
if err == nil {
level = zap.DebugLevel
} else {
fields = append(fields, "error", err)
}
fields = append(fields, "grpc.code", code.String(), "grpc.time_ms", duration)
// grpc_zap.DefaultCodeToLevel will only return zap.DebugLevel, zap.InfoLevel, zap.ErrorLevel, zap.WarnLevel
switch level {
case zap.DebugLevel:
logger.Debugw(msg, fields...)
case zap.InfoLevel:
logger.Infow(msg, fields...)
case zap.ErrorLevel:
logger.Errorw(msg, fields...)
case zap.WarnLevel, zap.DPanicLevel, zap.PanicLevel, zap.FatalLevel, zapcore.InvalidLevel:
logger.Warnw(msg, fields...)
}
}

func newClientLoggerFields(fullMethodString string) []any {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
Expand Down
Loading