Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparation for the QUIC proxy peering implementation #48836

Merged
merged 10 commits into from
Nov 13, 2024
8 changes: 4 additions & 4 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,10 @@ const (
// group they should attempt to be connected to.
ProxyGroupGenerationLabel = TeleportInternalLabelPrefix + "proxygroup-gen"

// ProxyPeerQUICLabel is the internal-user label for proxy heartbeats that's
// used to signal that the proxy supports receiving proxy peering
// connections over QUIC.
ProxyPeerQUICLabel = TeleportInternalLabelPrefix + "proxy-peer-quic"
// UnstableProxyPeerQUICLabel is the internal-use label for proxy heartbeats
// that's used to signal that the proxy supports receiving proxy peering
// connections over QUIC. The value should be "yes".
UnstableProxyPeerQUICLabel = TeleportInternalLabelPrefix + "proxy-peer-quic"

// OktaAppNameLabel is the individual app name label.
OktaAppNameLabel = TeleportInternalLabelPrefix + "okta-app-name"
Expand Down
5 changes: 5 additions & 0 deletions lib/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2727,6 +2727,11 @@ func Configure(clf *CommandLineFlags, cfg *servicecfg.Config, legacyAppFlags boo
cfg.DebugService.Enabled = false
}

// TODO(espadolini): allow this when the implementation is merged
if false && os.Getenv("TELEPORT_UNSTABLE_QUIC_PROXY_PEERING") == "yes" {
codingllama marked this conversation as resolved.
Show resolved Hide resolved
cfg.Proxy.QUICProxyPeering = true
}

return nil
}

Expand Down
60 changes: 0 additions & 60 deletions lib/proxy/clusterdial/dial.go

This file was deleted.

89 changes: 32 additions & 57 deletions lib/proxy/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
streamutils "github.com/gravitational/teleport/api/utils/grpc/stream"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/proxy/peer/internal"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
)
Expand Down Expand Up @@ -94,11 +95,11 @@ type ClientConfig struct {
}

// connShuffler shuffles the order of client connections.
type connShuffler func([]clientConn)
type connShuffler func([]internal.ClientConn)

// randomConnShuffler returns a conn shuffler that randomizes the order of connections.
func randomConnShuffler() connShuffler {
return func(conns []clientConn) {
return func(conns []internal.ClientConn) {
rand.Shuffle(len(conns), func(i, j int) {
conns[i], conns[j] = conns[j], conns[i]
})
Expand All @@ -107,7 +108,7 @@ func randomConnShuffler() connShuffler {

// noopConnShutffler returns a conn shuffler that keeps the original connection ordering.
func noopConnShuffler() connShuffler {
return func([]clientConn) {}
return func([]internal.ClientConn) {}
}

// checkAndSetDefaults checks and sets default values
Expand Down Expand Up @@ -163,32 +164,6 @@ func (c *ClientConfig) checkAndSetDefaults() error {
return nil
}

// clientConn manages client connections to a specific peer proxy (with a fixed
// host ID and address).
type clientConn interface {
// peerID returns the host ID of the peer proxy.
peerID() string
// peerAddr returns the address of the peer proxy.
peerAddr() string

// dial opens a connection of a given tunnel type to a node with the given
// ID through the peer proxy managed by the clientConn.
dial(
nodeID string,
src net.Addr,
dst net.Addr,
tunnelType types.TunnelType,
) (net.Conn, error)

// close closes all connections and releases any background resources
// immediately.
close() error

// shutdown waits until all connections are closed or the context is done,
// then acts like close.
shutdown(context.Context)
}

// grpcClientConn manages client connections to a specific peer proxy over gRPC.
type grpcClientConn struct {
cc *grpc.ClientConn
Expand All @@ -205,13 +180,13 @@ type grpcClientConn struct {
count int
}

var _ clientConn = (*grpcClientConn)(nil)
var _ internal.ClientConn = (*grpcClientConn)(nil)

// peerID implements [clientConn].
func (c *grpcClientConn) peerID() string { return c.id }
// PeerID implements [internal.ClientConn].
func (c *grpcClientConn) PeerID() string { return c.id }

// peerAddr implements [clientConn].
func (c *grpcClientConn) peerAddr() string { return c.addr }
// PeerAddr implements [internal.ClientConn].
func (c *grpcClientConn) PeerAddr() string { return c.addr }

// maybeAcquire returns a non-nil release func if the grpcClientConn is
// currently allowed to open connections; i.e., if it hasn't fully shut down.
Expand All @@ -234,8 +209,8 @@ func (c *grpcClientConn) maybeAcquire() (release func()) {
})
}

// shutdown implements [clientConn].
func (c *grpcClientConn) shutdown(ctx context.Context) {
// Shutdown implements [internal.ClientConn].
func (c *grpcClientConn) Shutdown(ctx context.Context) {
defer c.cc.Close()

c.mu.Lock()
Expand All @@ -255,13 +230,13 @@ func (c *grpcClientConn) shutdown(ctx context.Context) {
}
}

// close implements [clientConn].
func (c *grpcClientConn) close() error {
// Close implements [internal.ClientConn].
func (c *grpcClientConn) Close() error {
return c.cc.Close()
}

// dial implements [clientConn].
func (c *grpcClientConn) dial(
// Dial implements [internal.ClientConn].
func (c *grpcClientConn) Dial(
nodeID string,
src net.Addr,
dst net.Addr,
Expand Down Expand Up @@ -335,7 +310,7 @@ type Client struct {
cancel context.CancelFunc

config ClientConfig
conns map[string]clientConn
conns map[string]internal.ClientConn
metrics *clientMetrics
reporter *reporter
}
Expand All @@ -360,7 +335,7 @@ func NewClient(config ClientConfig) (*Client, error) {
config: config,
ctx: closeContext,
cancel: cancel,
conns: make(map[string]clientConn),
conns: make(map[string]internal.ClientConn),
metrics: metrics,
reporter: reporter,
}
Expand Down Expand Up @@ -453,7 +428,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
}

var toDelete []string
toKeep := make(map[string]clientConn)
toKeep := make(map[string]internal.ClientConn)
for id, conn := range c.conns {
proxy, ok := toDial[id]

Expand All @@ -464,7 +439,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
}

// peer address changed
if conn.peerAddr() != proxy.GetPeerAddr() {
if conn.PeerAddr() != proxy.GetPeerAddr() {
toDelete = append(toDelete, id)
continue
}
Expand All @@ -485,8 +460,8 @@ func (c *Client) updateConnections(proxies []types.Server) error {
}

// establish new connections
_, supportsQuic := proxy.GetLabel(types.ProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic)
supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes")
codingllama marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
c.metrics.reportTunnelError(errorProxyPeerTunnelDial)
c.config.Log.DebugContext(c.ctx, "error dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
Expand All @@ -503,7 +478,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {

for _, id := range toDelete {
if conn, ok := c.conns[id]; ok {
go conn.shutdown(c.ctx)
go conn.Shutdown(c.ctx)
}
}
c.conns = toKeep
Expand Down Expand Up @@ -556,9 +531,9 @@ func (c *Client) Shutdown(ctx context.Context) {
var wg sync.WaitGroup
for _, conn := range c.conns {
wg.Add(1)
go func(conn clientConn) {
go func(conn internal.ClientConn) {
defer wg.Done()
conn.shutdown(ctx)
conn.Shutdown(ctx)
}(conn)
}
wg.Wait()
Expand All @@ -572,7 +547,7 @@ func (c *Client) Stop() error {

var errs []error
for _, conn := range c.conns {
if err := conn.close(); err != nil {
if err := conn.Close(); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -627,7 +602,7 @@ func (c *Client) dial(

var errs []error
for _, clientConn := range conns {
conn, err := clientConn.dial(nodeID, src, dst, tunnelType)
conn, err := clientConn.Dial(nodeID, src, dst, tunnelType)
if err != nil {
errs = append(errs, trace.Wrap(err))
continue
Expand All @@ -643,13 +618,13 @@ func (c *Client) dial(
// otherwise.
// The boolean returned in the second argument is intended for testing purposes,
// to indicates whether the connection was cached or newly established.
func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
func (c *Client) getConnections(proxyIDs []string) ([]internal.ClientConn, bool, error) {
if len(proxyIDs) == 0 {
return nil, false, trace.BadParameter("failed to dial: no proxy ids given")
}

ids := make(map[string]struct{})
var conns []clientConn
var conns []internal.ClientConn

// look for existing matching connections.
c.RLock()
Expand Down Expand Up @@ -686,8 +661,8 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
continue
}

_, supportsQuic := proxy.GetLabel(types.ProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic)
supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes")
espadolini marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
c.metrics.reportTunnelError(errorProxyPeerTunnelDirectDial)
c.config.Log.DebugContext(c.ctx, "error direct dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
Expand All @@ -707,15 +682,15 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
defer c.Unlock()

for _, conn := range conns {
c.conns[conn.peerID()] = conn
c.conns[conn.PeerID()] = conn
}

c.config.connShuffler(conns)
return conns, false, nil
}

// connect dials a new connection to proxyAddr.
func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (clientConn, error) {
func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (internal.ClientConn, error) {
if supportsQUIC && c.config.QUICTransport != nil {
panic("QUIC proxy peering is not implemented")
}
Expand Down
3 changes: 2 additions & 1 deletion lib/proxy/peer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gravitational/teleport/api/client/proto"
clientapi "github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/proxy/peer/internal"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -208,7 +209,7 @@ func TestBackupClient(t *testing.T) {
require.True(t, dialCalled)
}

func waitForGRPCConns(t *testing.T, conns map[string]clientConn, d time.Duration) {
func waitForGRPCConns(t *testing.T, conns map[string]internal.ClientConn, d time.Duration) {
require.Eventually(t, func() bool {
for _, conn := range conns {
// panic if we hit a non-grpc client conn
Expand Down
7 changes: 3 additions & 4 deletions lib/proxy/peer/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/credentials"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/proxy/peer/internal"
"github.com/gravitational/teleport/lib/tlsca"
)

Expand Down Expand Up @@ -74,15 +75,13 @@ func (c *clientCredentials) ClientHandshake(ctx context.Context, laddr string, c
}

if err := validatePeer(c.peerID, identity); err != nil {
c.log.ErrorContext(ctx, duplicatePeerMsg, "peer_addr", c.peerAddr, "peer_id", c.peerID)
internal.LogDuplicatePeer(ctx, c.log, slog.LevelError, "peer_addr", c.peerAddr, "peer_id", c.peerID)
return nil, nil, trace.Wrap(err)
}

return conn, authInfo, nil
}

const duplicatePeerMsg = "Detected multiple Proxy Peers with the same public address when connecting to a Proxy which can lead to inconsistent state and problems establishing sessions. For best results ensure that `peer_public_addr` is unique per proxy and not a load balancer."

// getIdentity returns a [tlsca.Identity] that is created from the certificate
// presented during the TLS handshake.
func getIdentity(authInfo credentials.AuthInfo) (*tlsca.Identity, error) {
Expand Down Expand Up @@ -121,5 +120,5 @@ func validatePeer(peerID string, identity *tlsca.Identity) error {
return nil
}

return trace.AccessDenied("connected to unexpected proxy")
return trace.Wrap(internal.WrongProxyError{})
}
Loading
Loading