Skip to content

Commit

Permalink
Introduce ProcessManager to simplify VNet's public API
Browse files Browse the repository at this point in the history
  • Loading branch information
ravicious committed May 17, 2024
1 parent 44debec commit 0cff6ea
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 155 deletions.
115 changes: 38 additions & 77 deletions lib/teleterm/vnet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync"

"github.com/gravitational/trace"
"golang.org/x/sync/errgroup"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
Expand All @@ -50,18 +49,10 @@ const (
type Service struct {
api.UnimplementedVnetServiceServer

cfg Config
mu sync.Mutex
status status
// stopErrC is used to pass an error from goroutine that runs VNet in the background to the
// goroutine which handles RPC for stopping VNet. stopErrC gets closed after VNet stops. Starting
// VNet creates a new channel and assigns it as stopErrC.
//
// It's a buffered channel in case VNet crashes and there's no Stop RPC reading from stopErrC at
// that moment.
stopErrC chan error
// cancel stops the VNet instance running in a separate goroutine.
cancel context.CancelFunc
cfg Config
mu sync.Mutex
status status
processManager *vnet.ProcessManager
}

// New creates an instance of Service.
Expand Down Expand Up @@ -106,62 +97,34 @@ func (s *Service) Start(ctx context.Context, req *api.StartRequest) (*api.StartR
return &api.StartResponse{}, nil
}

socket, socketPath, err := vnet.CreateSocket(ctx)
if err != nil {
return nil, trace.Wrap(err)
}

longCtx, cancelLongCtx := context.WithCancel(context.Background())
s.cancel = cancelLongCtx
defer func() {
// If by the end of this RPC the service is not running, make sure to cancel the long context.
if s.status != statusRunning {
cancelLongCtx()
}
}()

g, longCtx := errgroup.WithContext(longCtx)

g.Go(func() error {
<-longCtx.Done()

return trace.Wrap(socket.Close())
})

ipv6Prefix, err := vnet.IPv6Prefix()
if err != nil {
return nil, trace.Wrap(err)
}
dnsIPv6 := vnet.Ipv6WithSuffix(ipv6Prefix, []byte{2})

g.Go(func() error {
return trace.Wrap(vnet.ExecAdminSubcommand(longCtx, socketPath, ipv6Prefix.String(), dnsIPv6.String()))
})

appProvider := &appProvider{
daemonService: s.cfg.DaemonService,
clientStore: s.cfg.ClientStore,
insecureSkipVerify: s.cfg.InsecureSkipVerify,
}

ns, err := vnet.Setup(ctx, appProvider, socket, ipv6Prefix, dnsIPv6)
processManager, err := vnet.SetupAndRun(ctx, appProvider)
if err != nil {
return nil, trace.Wrap(err)
}

g.Go(func() error {
return trace.Wrap(ns.Run(longCtx))
})

s.stopErrC = make(chan error, 1)
defer func() {
if s.status != statusRunning {
err := processManager.Close()
if err != nil && !errors.Is(err, context.Canceled) {
log.ErrorContext(ctx, "VNet closed with an error", "error", err)
} else {
log.DebugContext(ctx, "VNet closed")
}
}
}()

go func() {
err := g.Wait()
err := processManager.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
log.ErrorContext(longCtx, "VNet closed with an error", "error", err)
s.stopErrC <- err
log.ErrorContext(ctx, "VNet closed with an error", "error", err)
} else {
log.DebugContext(ctx, "VNet closed")
}
close(s.stopErrC)

// TODO(ravicious): Notify the Electron app about change of VNet state, but only if it's
// running. If it's not running, then the Start RPC has already failed and forwarded the error
Expand All @@ -170,9 +133,12 @@ func (s *Service) Start(ctx context.Context, req *api.StartRequest) (*api.StartR
s.mu.Lock()
defer s.mu.Unlock()

s.status = statusNotRunning
if s.status == statusRunning {
s.status = statusNotRunning
}
}()

s.processManager = processManager
s.status = statusRunning
return &api.StartResponse{}, nil
}
Expand All @@ -182,23 +148,12 @@ func (s *Service) Stop(ctx context.Context, req *api.StopRequest) (*api.StopResp
s.mu.Lock()
defer s.mu.Unlock()

errC := make(chan error)

go func() {
errC <- trace.Wrap(s.stopLocked())
}()

select {
case <-ctx.Done():
return nil, trace.Wrap(ctx.Err())
case err := <-errC:
if err != nil {
return nil, trace.Wrap(err)
}

return &api.StopResponse{}, nil
err := s.stopLocked()
if err != nil {
return nil, trace.Wrap(err)
}

return &api.StopResponse{}, nil
}

func (s *Service) stopLocked() error {
Expand All @@ -210,10 +165,13 @@ func (s *Service) stopLocked() error {
return nil
}

s.cancel()
s.status = statusNotRunning
err := s.processManager.Close()
if err != nil && !errors.Is(err, context.Canceled) {
return trace.Wrap(err)
}

return trace.Wrap(<-s.stopErrC)
s.status = statusNotRunning
return nil
}

// Close stops VNet service and prevents it from being started again. Blocks until VNet stops.
Expand All @@ -223,9 +181,12 @@ func (s *Service) Close() error {
defer s.mu.Unlock()

err := s.stopLocked()
s.status = statusClosed
if err != nil {
return trace.Wrap(err)
}

return trace.Wrap(err)
s.status = statusClosed
return nil
}

type appProvider struct {
Expand Down
134 changes: 119 additions & 15 deletions lib/vnet/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,95 @@ package vnet
import (
"context"
"log/slog"
"net"
"os"
"time"

"github.com/gravitational/trace"
"golang.org/x/sync/errgroup"
"golang.zx2c4.com/wireguard/tun"
"gvisor.dev/gvisor/pkg/tcpip"

"github.com/gravitational/teleport/api/profile"
"github.com/gravitational/teleport/api/types"
)

// CreateSocket creates a socket that's going to be used to receive the TUN device created by the
// admin subcommand. The admin subcommand quits when it detects that the socket has been closed.
func CreateSocket(ctx context.Context) (*net.UnixListener, string, error) {
socket, socketPath, err := createUnixSocket()
// SetupAndRun creates a network stack for VNet and runs it in the background. To do this, it also
// needs to launch an admin subcommand in the background. It returns [ProcessManager] which controls
// the lifecycle of both background tasks.
//
// The caller is expected to call Close on the process manager to close the network stack and clean
// up any resources used by it.
//
// ctx is used to wait for setup steps that happen before SetupAndRun hands out the control to the
// process manager. If ctx gets canceled during SetupAndRun, the process manager gets closed along
// with its background tasks.
func SetupAndRun(ctx context.Context, appProvider AppProvider) (*ProcessManager, error) {
ipv6Prefix, err := IPv6Prefix()
if err != nil {
return nil, "", trace.Wrap(err)
return nil, trace.Wrap(err)
}
slog.DebugContext(ctx, "Created unix socket for admin subcommand", "socket", socketPath)
return socket, socketPath, nil
}
dnsIPv6 := Ipv6WithSuffix(ipv6Prefix, []byte{2})

// TODO: Add comment.
func Setup(ctx context.Context, appProvider AppProvider, socket *net.UnixListener, ipv6Prefix, dnsIPv6 tcpip.Address) (*NetworkStack, error) {
tun, err := receiveTUNDevice(ctx, socket)
pm := newProcessManager()
success := false
defer func() {
if !success {
// Closes the socket and background tasks.
pm.Close()
}
}()

// Create the socket that's used to receive the TUN device from the admin subcommand.
socket, socketPath, err := createUnixSocket()
if err != nil {
return nil, trace.Wrap(err)
}
slog.DebugContext(ctx, "Created unix socket for admin subcommand", "socket", socketPath)
pm.AddBackgroundTask(func(ctx context.Context) error {
<-ctx.Done()
return trace.Wrap(socket.Close())
})

// A channel to capture an error when waiting for a TUN device to be created.
//
// To create a TUN device, VNet first needs to start the admin subcommand. When the subcommand
// starts, osascript shows a password prompt. If the user closes this prompt, execAdminSubcommand
// fails and the socket ends up being closed. To make sure that the user sees the error from
// osascript about prompt being closed instead of an error from receiveTUNDevice about reading
// from a closed socket, we send the error from osascript immediately through this channel, rather
// than depending on pm.Wait.
tunOrAdminSubcommandErrC := make(chan error, 2)
var tun tun.Device

pm.AddBackgroundTask(func(ctx context.Context) error {
err := execAdminSubcommand(ctx, socketPath, ipv6Prefix.String(), dnsIPv6.String())
// Pass the osascript error immediately, without having to wait on pm to propagate the error.
tunOrAdminSubcommandErrC <- trace.Wrap(err)
return trace.Wrap(err)
})

go func() {
tunDevice, err := receiveTUNDevice(socket)
tun = tunDevice
tunOrAdminSubcommandErrC <- err
}()

select {
case <-ctx.Done():
return nil, trace.Wrap(ctx.Err())
case err := <-tunOrAdminSubcommandErrC:
if err != nil {
return nil, trace.Wrap(err)
}

if tun == nil {
// If the execution ever gets there, it's because of a bug.
return nil, trace.Errorf("no TUN device created, execAdminSubcommand must have returned early with no error")
}
}

appResolver := NewTCPAppResolver(appProvider)

ns, err := NewNetworkStack(&Config{
ns, err := newNetworkStack(&Config{
TUNDevice: tun,
IPv6Prefix: ipv6Prefix,
DNSIPv6: dnsIPv6,
Expand All @@ -61,7 +117,55 @@ func Setup(ctx context.Context, appProvider AppProvider, socket *net.UnixListene
return nil, trace.Wrap(err)
}

return ns, nil
pm.AddBackgroundTask(func(ctx context.Context) error {
return trace.Wrap(ns.Run(ctx))
})

success = true
return pm, nil
}

func newProcessManager() *ProcessManager {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

return &ProcessManager{
g: g,
ctx: ctx,
cancel: cancel,
}
}

// ProcessManager handles background tasks needed to run VNet.
// Its semantics are similar to an error group with context.
type ProcessManager struct {
g *errgroup.Group
ctx context.Context
cancel context.CancelFunc
}

// AddBackgroundTask adds a function to the error group. The context passed to bgTaskFunc is a
// background context that gets canceled when Close is called or any added bgTaskFunc returns an
// error.
func (pm *ProcessManager) AddBackgroundTask(bgTaskFunc func(ctx context.Context) error) {
pm.g.Go(func() error {
return trace.Wrap(bgTaskFunc(pm.ctx))
})
}

// Wait blocks and waits for the background tasks to finish, which typically happens when another
// goroutine calls Close on the process manager.
func (pm *ProcessManager) Wait() error {
return trace.Wrap(pm.g.Wait())
}

// Close stops any active background tasks by canceling the underlying context. It then returns the
// error from the error group.
func (pm *ProcessManager) Close() error {
go func() {
pm.cancel()
}()
return trace.Wrap(pm.g.Wait())
}

// AdminSubcommand is the tsh subcommand that should run as root that will create and setup a TUN device and
Expand Down
Loading

0 comments on commit 0cff6ea

Please sign in to comment.