Skip to content

Commit

Permalink
Pass port from VNet to local proxy (#49453)
Browse files Browse the repository at this point in the history
* Prepare app specs in tests for specifying TCP ports

* Refactor logging in lib/vnet/app_resolver.go

Use libutils.NewPackageLogger and call the field log instead of slog, which
makes it harder to accidentally use the imported default slog logger instead
of the one stored on the struct.

Move creation of logger within TCPAppResolver.resolveTCPHandlerForCluster

* Pass port from VNet to local proxy

* Don't create another package logger

* Don't pass logger to newTCPAppHandler

* Fix typo in comment

* Explicitly pass port to dialHost

* Convert multi-line definitions of simple appSpecs to single-line

* Add TODO comment about validating local port

* Empty commit to trigger CI
  • Loading branch information
ravicious committed Dec 3, 2024
1 parent 511d7dd commit 29c4a88
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 67 deletions.
116 changes: 80 additions & 36 deletions lib/vnet/app_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"log/slog"
"net"
"strings"
"sync"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -94,7 +95,7 @@ type DialOptions struct {
type TCPAppResolver struct {
appProvider AppProvider
clusterConfigCache *ClusterConfigCache
slog *slog.Logger
log *slog.Logger
clock clockwork.Clock
}

Expand All @@ -109,7 +110,7 @@ type TCPAppResolver struct {
func NewTCPAppResolver(appProvider AppProvider, opts ...tcpAppResolverOption) (*TCPAppResolver, error) {
r := &TCPAppResolver{
appProvider: appProvider,
slog: slog.With(teleport.ComponentKey, "VNet.AppResolver"),
log: log.With(teleport.ComponentKey, "VNet.AppResolver"),
}
for _, opt := range opts {
opt(r)
Expand Down Expand Up @@ -159,7 +160,7 @@ func (r *TCPAppResolver) ResolveTCPHandler(ctx context.Context, fqdn string) (*T
// the error but don't return it so that DNS resolution will be forwarded upstream instead of
// failing, to avoid breaking e.g. web app access (we don't know if this is a web or TCP app yet
// because we can't log in).
slog.ErrorContext(ctx, "Failed to get teleport client.", "error", err)
r.log.ErrorContext(ctx, "Failed to get teleport client.", "error", err)
continue
}

Expand All @@ -168,8 +169,7 @@ func (r *TCPAppResolver) ResolveTCPHandler(ctx context.Context, fqdn string) (*T
leafClusterName = clusterClient.ClusterName()
}

slog := r.slog.With("profile", profileName, "fqdn", fqdn, "leaf_cluster", leafClusterName)
return r.resolveTCPHandlerForCluster(ctx, slog, clusterClient, profileName, leafClusterName, fqdn)
return r.resolveTCPHandlerForCluster(ctx, clusterClient, profileName, leafClusterName, fqdn)
}
// fqdn did not match any profile, forward the request upstream.
return nil, ErrNoTCPHandler
Expand All @@ -180,7 +180,7 @@ var errNoMatch = errors.New("cluster does not match queried FQDN")
func (r *TCPAppResolver) clusterClientForAppFQDN(ctx context.Context, profileName, fqdn string) (ClusterClient, error) {
rootClient, err := r.appProvider.GetCachedClient(ctx, profileName, "")
if err != nil {
r.slog.ErrorContext(ctx, "Failed to get root cluster client, apps in this cluster will not be resolved.", "profile", profileName, "error", err)
r.log.ErrorContext(ctx, "Failed to get root cluster client, apps in this cluster will not be resolved.", "profile", profileName, "error", err)
return nil, errNoMatch
}

Expand All @@ -192,7 +192,7 @@ func (r *TCPAppResolver) clusterClientForAppFQDN(ctx context.Context, profileNam
leafClusters, err := getLeafClusters(ctx, rootClient)
if err != nil {
// Good chance we're here because the user is not logged in to the profile.
r.slog.ErrorContext(ctx, "Failed to list leaf clusters, apps in this cluster will not be resolved.", "profile", profileName, "error", err)
r.log.ErrorContext(ctx, "Failed to list leaf clusters, apps in this cluster will not be resolved.", "profile", profileName, "error", err)
return nil, errNoMatch
}

Expand All @@ -201,13 +201,13 @@ func (r *TCPAppResolver) clusterClientForAppFQDN(ctx context.Context, profileNam
for _, leafClusterName := range allClusters {
clusterClient, err := r.appProvider.GetCachedClient(ctx, profileName, leafClusterName)
if err != nil {
r.slog.ErrorContext(ctx, "Failed to get cluster client, apps in this cluster will not be resolved.", "profile", profileName, "leaf_cluster", leafClusterName, "error", err)
r.log.ErrorContext(ctx, "Failed to get cluster client, apps in this cluster will not be resolved.", "profile", profileName, "leaf_cluster", leafClusterName, "error", err)
continue
}

clusterConfig, err := r.clusterConfigCache.GetClusterConfig(ctx, clusterClient)
if err != nil {
r.slog.ErrorContext(ctx, "Failed to get VnetConfig, apps in the cluster will not be resolved.", "profile", profileName, "leaf_cluster", leafClusterName, "error", err)
r.log.ErrorContext(ctx, "Failed to get VnetConfig, apps in the cluster will not be resolved.", "profile", profileName, "leaf_cluster", leafClusterName, "error", err)
continue
}
for _, zone := range clusterConfig.DNSZones {
Expand Down Expand Up @@ -242,10 +242,10 @@ func getLeafClusters(ctx context.Context, rootClient ClusterClient) ([]string, e
// query.
func (r *TCPAppResolver) resolveTCPHandlerForCluster(
ctx context.Context,
slog *slog.Logger,
clusterClient ClusterClient,
profileName, leafClusterName, fqdn string,
) (*TCPHandlerSpec, error) {
log := r.log.With("profile", profileName, "leaf_cluster", leafClusterName, "fqdn", fqdn)
// An app public_addr could technically be full-qualified or not, match either way.
expr := fmt.Sprintf(`(resource.spec.public_addr == "%s" || resource.spec.public_addr == "%s") && hasPrefix(resource.spec.uri, "tcp://")`,
strings.TrimSuffix(fqdn, "."), fqdn)
Expand All @@ -257,7 +257,7 @@ func (r *TCPAppResolver) resolveTCPHandlerForCluster(
if err != nil {
// Don't return an unexpected error so we can try to find the app in different clusters or forward the
// request upstream.
slog.InfoContext(ctx, "Failed to list application servers.", "error", err)
log.InfoContext(ctx, "Failed to list application servers.", "error", err)
return nil, ErrNoTCPHandler
}
if len(resp.Resources) == 0 {
Expand All @@ -282,9 +282,15 @@ func (r *TCPAppResolver) resolveTCPHandlerForCluster(
}

type tcpAppHandler struct {
profileName string
leafClusterName string
lp *alpnproxy.LocalProxy
log *slog.Logger
appProvider AppProvider
clock clockwork.Clock
profileName string
leafClusterName string
app types.Application
portToLocalProxy map[uint16]*alpnproxy.LocalProxy
// mu guards access to portToLocalProxy.
mu sync.Mutex
}

func (r *TCPAppResolver) newTCPAppHandler(
Expand All @@ -293,38 +299,73 @@ func (r *TCPAppResolver) newTCPAppHandler(
leafClusterName string,
app types.Application,
) (*tcpAppHandler, error) {
dialOpts, err := r.appProvider.GetDialOptions(ctx, profileName)
return &tcpAppHandler{
appProvider: r.appProvider,
clock: r.clock,
profileName: profileName,
leafClusterName: leafClusterName,
app: app,
portToLocalProxy: make(map[uint16]*alpnproxy.LocalProxy),
log: r.log.With(teleport.ComponentKey, "VNet.AppHandler",
"profile", profileName, "leaf_cluster", leafClusterName, "fqdn", app.GetPublicAddr()),
}, nil
}

// getOrInitializeLocalProxy returns a separate local proxy for each port for multi-port apps. For
// single-port apps, it returns the same local proxy no matter the port.
func (h *tcpAppHandler) getOrInitializeLocalProxy(ctx context.Context, localPort uint16) (*alpnproxy.LocalProxy, error) {
h.mu.Lock()
defer h.mu.Unlock()

// Connections to single-port apps need to go through a local proxy that has a cert with TargetPort
// set to 0. This ensures that the old behavior is kept for such apps, where the client can dial
// the public address of an app on any port and be routed to the port from the URI.
//
// https://github.com/gravitational/teleport/blob/master/rfd/0182-multi-port-tcp-app-access.md#vnet-with-single-port-apps
if len(h.app.GetTCPPorts()) == 0 {
localPort = 0
}
// TODO(ravicious): For multi-port apps, check if localPort is valid and surface the error in UI.
// https://github.com/gravitational/teleport/blob/master/rfd/0182-multi-port-tcp-app-access.md#incorrect-port

lp, ok := h.portToLocalProxy[localPort]
if ok {
return lp, nil
}

dialOpts, err := h.appProvider.GetDialOptions(ctx, h.profileName)
if err != nil {
return nil, trace.Wrap(err, "getting dial options for profile %q", profileName)
return nil, trace.Wrap(err, "getting dial options for profile %q", h.profileName)
}
clusterClient, err := r.appProvider.GetCachedClient(ctx, profileName, leafClusterName)
clusterClient, err := h.appProvider.GetCachedClient(ctx, h.profileName, h.leafClusterName)
if err != nil {
return nil, trace.Wrap(err)
}

routeToApp := proto.RouteToApp{
Name: app.GetName(),
PublicAddr: app.GetPublicAddr(),
Name: h.app.GetName(),
PublicAddr: h.app.GetPublicAddr(),
// ClusterName must not be set to "" when targeting an app from a root cluster. Otherwise the
// connection routed through a local proxy will just get lost somewhere in the cluster (with no
// clear error being reported) and hang forever.
ClusterName: clusterClient.ClusterName(),
URI: app.GetURI(),
URI: h.app.GetURI(),
TargetPort: uint32(localPort),
}

appCertIssuer := &appCertIssuer{
appProvider: r.appProvider,
profileName: profileName,
leafClusterName: leafClusterName,
appProvider: h.appProvider,
profileName: h.profileName,
leafClusterName: h.leafClusterName,
routeToApp: routeToApp,
}
certChecker := client.NewCertChecker(appCertIssuer, r.clock)
certChecker := client.NewCertChecker(appCertIssuer, h.clock)
middleware := &localProxyMiddleware{
certChecker: certChecker,
appProvider: r.appProvider,
appProvider: h.appProvider,
routeToApp: routeToApp,
profileName: profileName,
leafClusterName: leafClusterName,
profileName: h.profileName,
leafClusterName: h.leafClusterName,
}

localProxyConfig := alpnproxy.LocalProxyConfig{
Expand All @@ -336,25 +377,28 @@ func (r *TCPAppResolver) newTCPAppHandler(
ALPNConnUpgradeRequired: dialOpts.ALPNConnUpgradeRequired,
Middleware: middleware,
InsecureSkipVerify: dialOpts.InsecureSkipVerify,
Clock: r.clock,
Clock: h.clock,
}

lp, err := alpnproxy.NewLocalProxy(localProxyConfig)
h.log.DebugContext(ctx, "Creating local proxy", "target_port", localPort)
newLP, err := alpnproxy.NewLocalProxy(localProxyConfig)
if err != nil {
return nil, trace.Wrap(err, "creating local proxy")
}

return &tcpAppHandler{
profileName: profileName,
leafClusterName: leafClusterName,
lp: lp,
}, nil
h.portToLocalProxy[localPort] = newLP

return newLP, nil
}

// HandleTCPConnector handles an incoming TCP connection from VNet by passing it to the local alpn proxy,
// which is set up with middleware to automatically handle certificate renewal and re-logins.
func (h *tcpAppHandler) HandleTCPConnector(ctx context.Context, connector func() (net.Conn, error)) error {
return trace.Wrap(h.lp.HandleTCPConnector(ctx, connector), "handling TCP connector")
func (h *tcpAppHandler) HandleTCPConnector(ctx context.Context, localPort uint16, connector func() (net.Conn, error)) error {
lp, err := h.getOrInitializeLocalProxy(ctx, localPort)
if err != nil {
return trace.Wrap(err)
}
return trace.Wrap(lp.HandleTCPConnector(ctx, connector), "handling TCP connector")
}

// appCertIssuer implements [client.CertIssuer].
Expand Down
4 changes: 2 additions & 2 deletions lib/vnet/vnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type TCPHandlerSpec struct {
// [connector] to complete the TCP handshake and get the TCP conn. This is so that clients will see that the
// TCP connection was refused, instead of seeing a successful TCP dial that is immediately closed.
type TCPHandler interface {
HandleTCPConnector(ctx context.Context, connector func() (net.Conn, error)) error
HandleTCPConnector(ctx context.Context, localPort uint16, connector func() (net.Conn, error)) error
}

// UDPHandler defines the behavior for handling UDP connections from VNet.
Expand Down Expand Up @@ -423,7 +423,7 @@ func (ns *NetworkStack) handleTCP(req *tcp.ForwarderRequest) {
return conn, nil
}

if err := handler.HandleTCPConnector(ctx, connector); err != nil {
if err := handler.HandleTCPConnector(ctx, id.LocalPort, connector); err != nil {
if errors.Is(err, context.Canceled) {
slog.DebugContext(ctx, "TCP connection handler returned early due to canceled context.")
} else {
Expand Down
Loading

0 comments on commit 29c4a88

Please sign in to comment.