diff --git a/backend/common/bind/bind_allocator.go b/backend/common/bind/bind_allocator.go
new file mode 100644
index 0000000000..34d54d7f26
--- /dev/null
+++ b/backend/common/bind/bind_allocator.go
@@ -0,0 +1,55 @@
+package bind
+
+import (
+	"fmt"
+	"net"
+	"net/url"
+	"strconv"
+
+	"github.com/alecthomas/atomic"
+	"github.com/alecthomas/errors"
+)
+
+type BindAllocator struct {
+	baseURL *url.URL
+	port    atomic.Int32
+}
+
+func NewBindAllocator(url *url.URL) (*BindAllocator, error) {
+	_, portStr, err := net.SplitHostPort(url.Host)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	port, err := strconv.Atoi(portStr)
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	return &BindAllocator{
+		baseURL: url,
+		port:    atomic.NewInt32(int32(port) - 1), //nolint:gosec
+	}, nil
+}
+
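+// Next probes ports sequentially upwards from the base port and returns a
+// copy of the base URL with the first port that could be bound. The probe
+// listener is closed before returning, so the port is free but not reserved:
+// a concurrent process could still claim it before the caller binds.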
+func (b *BindAllocator) Next() *url.URL {
+	var l *net.TCPListener
+	var err error
+	for {
+		b.port.Add(1)
+		l, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(b.baseURL.Hostname()), Port: int(b.port.Load())})
+
+		if err != nil {
+			continue
+		}
+		_ = l.Close()
+
+		newURL := *b.baseURL
+		newURL.Host = net.JoinHostPort(b.baseURL.Hostname(), fmt.Sprintf("%d", b.port.Load()))
+		return &newURL
+	}
+}
diff --git a/backend/controller/controller.go b/backend/controller/controller.go
index 93984aa680..a92526cb46 100644
--- a/backend/controller/controller.go
+++ b/backend/controller/controller.go
@@ -36,6 +36,7 @@ import (
 	"github.com/TBD54566975/ftl/backend/common/sha256"
 	"github.com/TBD54566975/ftl/backend/common/slices"
 	"github.com/TBD54566975/ftl/backend/controller/internal/dal"
+	"github.com/TBD54566975/ftl/backend/controller/scaling"
 	"github.com/TBD54566975/ftl/backend/schema"
 	"github.com/TBD54566975/ftl/console"
 	ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
@@ -66,7 +67,7 @@ func (c *Config) SetDefaults() {
 }
 
 // Start the Controller. Blocks until the context is cancelled.
-func Start(ctx context.Context, config Config) error {
+func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error {
 	config.SetDefaults()
 
 	logger := log.FromContext(ctx)
@@ -87,7 +88,7 @@ func Start(ctx context.Context, config Config) error {
 		return errors.WithStack(err)
 	}
 
-	svc, err := New(ctx, dal, config)
+	svc, err := New(ctx, dal, config, runnerScaling)
 	if err != nil {
 		return errors.WithStack(err)
 	}
@@ -120,12 +121,13 @@ type Service struct {
 	// Map from endpoint to client.
 	clients *ttlcache.Cache[string, clients]
 
-	routesMu sync.RWMutex
-	routes   map[string][]dal.Route
-	config   Config
+	routesMu      sync.RWMutex
+	routes        map[string][]dal.Route
+	config        Config
+	runnerScaling scaling.RunnerScaling
 }
 
-func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
+func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
 	key := config.Key
 	if config.Key.ULID() == (ulid.ULID{}) {
 		key = model.NewControllerKey()
@@ -138,6 +140,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
 		clients: ttlcache.New[string, clients](ttlcache.WithTTL[string, clients](time.Minute)),
 		routes:  map[string][]dal.Route{},
 		config:  config,
+		runnerScaling: runnerScaling,
 	}
 
 	go runWithRetries(ctx, time.Second*1, time.Second*2, svc.syncRoutes)
@@ -146,6 +149,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
 	go runWithRetries(ctx, config.RunnerTimeout, time.Second*10, svc.reapStaleRunners)
 	go runWithRetries(ctx, config.DeploymentReservationTimeout, time.Second*20, svc.releaseExpiredReservations)
 	go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileDeployments)
+	go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileRunners)
 
 	return svc, nil
 }
@@ -792,6 +796,34 @@ func (s *Service) reconcileDeployments(ctx context.Context) error {
 	return nil
 }
 
+func (s *Service) reconcileRunners(ctx context.Context) error {
+	activeDeployments, err := s.dal.GetActiveDeployments(ctx)
+	if err != nil {
+		return errors.Wrap(err, "failed to get deployments needing reconciliation")
+	}
+
+	totalRunners := 0
+	for _, deployment := range activeDeployments {
+		totalRunners += deployment.MinReplicas
+	}
+
+	// It's possible that idle runners will get terminated here, but they will get recreated in the next
+	// reconciliation cycle.
+	idleRunners, err := s.dal.GetIdleRunners(ctx, 16, model.Labels{})
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	idleRunnerKeys := slices.Map(idleRunners, func(r dal.Runner) model.RunnerKey { return r.Key })
+
+	err = s.runnerScaling.SetReplicas(ctx, totalRunners, idleRunnerKeys)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	return nil
+}
+
 func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentName) (bool, error) {
 	runners, err := s.dal.GetRunnersForDeployment(ctx, key)
 	if err != nil {
diff --git a/backend/controller/scaling/k8s_scaling.go b/backend/controller/scaling/k8s_scaling.go
new file mode 100644
index 0000000000..4032dc3566
--- /dev/null
+++ b/backend/controller/scaling/k8s_scaling.go
@@ -0,0 +1,22 @@
+package scaling
+
+import (
+	"context"
+
+	"github.com/TBD54566975/ftl/backend/common/model"
+)
+
+var _ RunnerScaling = (*K8sScaling)(nil)
+
+type K8sScaling struct {
+}
+
+func NewK8sScaling() *K8sScaling {
+	return &K8sScaling{}
+}
+
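+// SetReplicas is currently a no-op stub: Kubernetes-backed runner scaling is
+// not yet implemented, so reconciliation has no effect in this mode.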
+func (k *K8sScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
+	return nil
+}
diff --git a/backend/controller/scaling/local_scaling.go b/backend/controller/scaling/local_scaling.go
new file mode 100644
index 0000000000..e9a69feb4a
--- /dev/null
+++ b/backend/controller/scaling/local_scaling.go
@@ -0,0 +1,133 @@
+package scaling
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"net/url"
+	"os"
+	"path/filepath"
+	"sync"
+
+	"github.com/alecthomas/errors"
+	"github.com/alecthomas/kong"
+
+	"github.com/TBD54566975/ftl/backend/common/bind"
+	"github.com/TBD54566975/ftl/backend/common/log"
+	"github.com/TBD54566975/ftl/backend/common/model"
+	"github.com/TBD54566975/ftl/backend/runner"
+)
+
+var _ RunnerScaling = (*LocalScaling)(nil)
+
+type LocalScaling struct {
+	lock     sync.Mutex
+	cacheDir string
+	runners  map[model.RunnerKey]context.CancelFunc
+
+	portAllocator       *bind.BindAllocator
+	controllerAddresses []*url.URL
+}
+
+func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL) (*LocalScaling, error) {
+	cacheDir, err := os.UserCacheDir()
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+	return &LocalScaling{
+		lock:                sync.Mutex{},
+		cacheDir:            cacheDir,
+		runners:             map[model.RunnerKey]context.CancelFunc{},
+		portAllocator:       portAllocator,
+		controllerAddresses: controllerAddresses,
+	}, nil
+}
+
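+// SetReplicas scales the local runner fleet up to replicas by spawning new
+// in-process runners, or scales down by cancelling the contexts of runners
+// listed in idleRunners, assigning runners to controllers round-robin.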
+func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	logger := log.FromContext(ctx)
+
+	replicasToAdd := replicas - len(l.runners)
+
+	if replicasToAdd <= 0 {
+		replicasToRemove := -replicasToAdd
+
+		for i := 0; i < replicasToRemove; i++ {
+			if len(idleRunners) == 0 {
+				return nil
+			}
+			runnerToRemove := idleRunners[len(idleRunners)-1]
+			idleRunners = idleRunners[:len(idleRunners)-1]
+
+			err := l.remove(ctx, runnerToRemove)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+		}
+
+		return nil
+	}
+
+	logger.Infof("Adding %d replicas", replicasToAdd)
+	for i := 0; i < replicasToAdd; i++ {
+		i := i
+
+		controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
+		config := runner.Config{
+			Bind:               l.portAllocator.Next(),
+			ControllerEndpoint: controllerEndpoint,
+		}
+
+		name := fmt.Sprintf("runner%d", i)
+		if err := kong.ApplyDefaults(&config, kong.Vars{
+			"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", name, "deployments"),
+			"language":      "go,kotlin",
+		}); err != nil {
+			return errors.WithStack(err)
+		}
+
+		// Create a readable ULID for the runner.
+		var ulid [16]byte
+		binary.BigEndian.PutUint32(ulid[10:], uint32(len(l.runners)+1))
+		ulidStr := fmt.Sprintf("%025X", ulid)
+		err := config.Key.Scan(ulidStr)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+
+		runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))
+
+		runnerCtx, cancel := context.WithCancel(runnerCtx)
+		l.runners[config.Key] = cancel
+
+		go func() {
+			logger.Infof("Starting runner: %s", config.Key)
+			err := runner.Start(runnerCtx, config)
+			if err != nil {
+				logger.Errorf(err, "Error starting runner")
+			}
+		}()
+	}
+
+	return nil
+}
+
+func (l *LocalScaling) remove(ctx context.Context, runner model.RunnerKey) error {
+	log := log.FromContext(ctx)
+	log.Infof("Removing runner: %s", runner)
+
+	cancel, ok := l.runners[runner]
+	if !ok {
+		return errors.Errorf("runner %s not found", runner)
+	}
+
+	cancel()
+	delete(l.runners, runner)
+
+	return nil
+}
diff --git a/backend/controller/scaling/scaling.go b/backend/controller/scaling/scaling.go
new file mode 100644
index 0000000000..97c5ea4094
--- /dev/null
+++ b/backend/controller/scaling/scaling.go
@@ -0,0 +1,14 @@
+package scaling
+
+import (
+	"context"
+
+	"github.com/TBD54566975/ftl/backend/common/model"
+)
+
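+// RunnerScaling abstracts how the controller adjusts the size of the runner
+// fleet. SetReplicas should converge on replicas total runners, treating
+// idleRunners as the preferred candidates for termination when scaling down.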
default:"0"` } const ftlContainerName = "ftl-db" @@ -41,14 +38,25 @@ func (s *serveCmd) Run(ctx context.Context) error { wg, ctx := errgroup.WithContext(ctx) + portAllocator, err := bind.NewBindAllocator(s.Bind) + if err != nil { + return errors.WithStack(err) + } controllerAddresses := make([]*url.URL, 0, s.Controllers) - nextBind := s.Bind + + for i := 0; i < s.Controllers; i++ { + controllerAddresses = append(controllerAddresses, portAllocator.Next()) + } + + runnerScaling, err := scaling.NewLocalScaling(portAllocator, controllerAddresses) + if err != nil { + return errors.WithStack(err) + } for i := 0; i < s.Controllers; i++ { i := i - controllerAddresses = append(controllerAddresses, nextBind) config := controller.Config{ - Bind: nextBind, + Bind: controllerAddresses[i], DSN: dsn, } if err := kong.ApplyDefaults(&config); err != nil { @@ -59,58 +67,15 @@ func (s *serveCmd) Run(ctx context.Context) error { controllerCtx := log.ContextWithLogger(ctx, logger.Scope(scope)) wg.Go(func() error { - return errors.Wrapf(controller.Start(controllerCtx, config), "controller%d failed", i) + return errors.Wrapf(controller.Start(controllerCtx, config, runnerScaling), "controller%d failed", i) }) - - var err error - nextBind, err = incrementPort(nextBind) - if err != nil { - return errors.WithStack(err) - } } - cacheDir, err := os.UserCacheDir() + err = runnerScaling.SetReplicas(ctx, s.Runners, nil) if err != nil { return errors.WithStack(err) } - for i := 0; i < s.Runners; i++ { - i := i - controllerEndpoint := controllerAddresses[i%len(controllerAddresses)] - config := runner.Config{ - Bind: nextBind, - ControllerEndpoint: controllerEndpoint, - } - - name := fmt.Sprintf("runner%d", i) - if err := kong.ApplyDefaults(&config, kong.Vars{ - "deploymentdir": filepath.Join(cacheDir, "ftl-runner", name, "deployments"), - "language": "go,kotlin", - }); err != nil { - return errors.WithStack(err) - } - - // Create a readable ULID for the runner. - var ulid [16]byte - binary.BigEndian.PutUint32(ulid[10:], uint32(i)) - ulidStr := fmt.Sprintf("%025X", ulid) - err := config.Key.Scan(ulidStr) - if err != nil { - return errors.WithStack(err) - } - - runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name)) - - wg.Go(func() error { - return errors.Wrapf(runner.Start(runnerCtx, config), "runner%d failed", i) - }) - - nextBind, err = incrementPort(nextBind) - if err != nil { - return errors.WithStack(err) - } - } - if err := wg.Wait(); err != nil { return errors.WithStack(err) } @@ -182,18 +147,6 @@ func setupDB(ctx context.Context) (string, error) { return dsn, nil } -func incrementPort(baseURL *url.URL) (*url.URL, error) { - newURL := *baseURL - - newPort, err := strconv.Atoi(newURL.Port()) - if err != nil { - return nil, errors.WithStack(err) - } - - newURL.Host = fmt.Sprintf("%s:%d", baseURL.Hostname(), newPort+1) - return &newURL, nil -} - func pollContainerHealth(ctx context.Context, containerName string, timeout time.Duration) error { logger := log.FromContext(ctx) logger.Infof("Waiting for %s to be healthy", containerName)