Skip to content

Commit

Permalink
feat: add request logging interceptor for API (#2203)
Browse files Browse the repository at this point in the history
Set `PEERDB_API_REQUEST_LOGGING_ENABLED` = true to enable request
logging. Separating it from the oauth interceptor incase oauth is
disabled and we wish to enable request logging
  • Loading branch information
iamKunalGupta authored Oct 30, 2024
1 parent 159d868 commit 7d63502
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 33 deletions.
15 changes: 12 additions & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

"github.com/PeerDB-io/peer-flow/auth"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/middleware"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -213,14 +213,23 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
return fmt.Errorf("unable to create Temporal client: %w", err)
}

options, err := auth.AuthGrpcMiddleware([]string{
authGrpcMiddleware, err := middleware.AuthGrpcMiddleware([]string{
grpc_health_v1.Health_Check_FullMethodName,
grpc_health_v1.Health_Watch_FullMethodName,
})
if err != nil {
return err
}
grpcServer := grpc.NewServer(options...)

requestLoggingMiddleware := middleware.RequestLoggingMiddleWare()

// Interceptors are executed in the order they are passed to, so unauthorized requests are not logged
interceptors := grpc.ChainUnaryInterceptor(
authGrpcMiddleware,
requestLoggingMiddleware,
)

grpcServer := grpc.NewServer(interceptors)

catalogPool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions flow/middleware/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package middleware

import (
"context"
"log/slog"

"google.golang.org/grpc"

"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func RequestLoggingMiddleWare() grpc.UnaryServerInterceptor {
if !peerdbenv.PeerDBRAPIRequestLoggingEnabled() {
slog.Info("Request Logging Interceptor is disabled")
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return handler(ctx, req)
}
}
slog.Info("Setting up request logging middleware")
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
slog.Info("Received gRPC request", slog.String("method", info.FullMethod))

resp, err := handler(ctx, req)
if err != nil {
slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err))
} else {
slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod))
}
return resp, err
}
}
48 changes: 19 additions & 29 deletions flow/auth/middleware.go → flow/middleware/oauth.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package auth
package middleware

import (
"context"
Expand Down Expand Up @@ -34,7 +34,7 @@ type identityProvider struct {
issuer string
}

func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, error) {
func AuthGrpcMiddleware(unauthenticatedMethods []string) (grpc.UnaryServerInterceptor, error) {
oauthConfig := peerdbenv.GetPeerDBOAuthConfig()
oauthJwtClaims := map[string]string{}
if oauthConfig.OAuthJwtClaimKey != "" {
Expand All @@ -57,7 +57,9 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e

slog.Warn("authentication is disabled")

return nil, nil
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return handler(ctx, req)
}, nil
}

if err != nil {
Expand All @@ -68,36 +70,24 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e
for _, method := range unauthenticatedMethods {
unauthenticatedMethodsMap[method] = struct{}{}
}
return []grpc.ServerOption{
grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
slog.Info("Received gRPC request", slog.String("method", info.FullMethod))

if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized {
var authHeader string
authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization")
if len(authHeaders) == 1 {
authHeader = authHeaders[0]
} else if len(authHeaders) > 1 {
slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod))
return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected")
}
_, err := validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...)
if err != nil {
slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err))
return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error())
}
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized {
var authHeader string
authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization")
if len(authHeaders) == 1 {
authHeader = authHeaders[0]
} else if len(authHeaders) > 1 {
slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod))
return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected")
}

resp, err := handler(ctx, req)

_, err := validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...)
if err != nil {
slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err))
} else {
slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod))
slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err))
return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error())
}
}

return resp, err
}),
return handler(ctx, req)
}, nil
}

Expand Down
10 changes: 10 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -156,3 +157,12 @@ func PeerDBGetIncidentIoUrl() string {
func PeerDBGetIncidentIoToken() string {
return GetEnvString("PEERDB_INCIDENTIO_TOKEN", "")
}

func PeerDBRAPIRequestLoggingEnabled() bool {
requestLoggingEnabled, err := strconv.ParseBool(GetEnvString("PEERDB_API_REQUEST_LOGGING_ENABLED", "false"))
if err != nil {
slog.Error("failed to parse PEERDB_API_REQUEST_LOGGING_ENABLED to bool", "error", err)
return false
}
return requestLoggingEnabled
}
6 changes: 5 additions & 1 deletion flow/peerdbenv/oauth.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package peerdbenv

import "strconv"
import (
"log/slog"
"strconv"
)

type PeerDBOAuthConfig struct {
// there can be more complex use cases where domain != issuer, but we handle them later if required
Expand All @@ -18,6 +21,7 @@ func GetPeerDBOAuthConfig() PeerDBOAuthConfig {
oauthDiscoveryEnabledString := GetEnvString("PEERDB_OAUTH_DISCOVERY_ENABLED", "false")
oauthDiscoveryEnabled, err := strconv.ParseBool(oauthDiscoveryEnabledString)
if err != nil {
slog.Error("failed to parse PEERDB_OAUTH_DISCOVERY_ENABLED to bool", "error", err)
oauthDiscoveryEnabled = false
}
oauthKeysetJson := GetEnvString("PEERDB_OAUTH_KEYSET_JSON", "")
Expand Down

0 comments on commit 7d63502

Please sign in to comment.