feat: add auto scaling to ftl serve (#533)
Fixes #498

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
wesbillman and github-actions[bot] authored Nov 2, 2023
1 parent 1fceeaa commit 38e509b
Showing 7 changed files with 272 additions and 73 deletions.
51 changes: 51 additions & 0 deletions backend/common/bind/bind_allocator.go
@@ -0,0 +1,51 @@
package bind

import (
"fmt"
"net"
"net/url"
"strconv"

"github.com/alecthomas/atomic"
"github.com/alecthomas/errors"
)

type BindAllocator struct {
baseURL *url.URL
port atomic.Int32
}

func NewBindAllocator(url *url.URL) (*BindAllocator, error) {
_, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return nil, errors.WithStack(err)
}

port, err := strconv.Atoi(portStr)
if err != nil {
return nil, errors.WithStack(err)
}

return &BindAllocator{
baseURL: url,
port: atomic.NewInt32(int32(port) - 1), //nolint:gosec
}, nil
}

func (b *BindAllocator) Next() *url.URL {
var l *net.TCPListener
var err error
for {
b.port.Add(1)
l, err = net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(b.baseURL.Hostname()), Port: int(b.port.Load())})

if err != nil {
continue
}
_ = l.Close()

newURL := *b.baseURL
newURL.Host = net.JoinHostPort(b.baseURL.Hostname(), fmt.Sprintf("%d", b.port.Load()))
return &newURL
}
}
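
Note on the allocator: Next() increments the port and probes it with a short-lived TCP listener, looping until a bind succeeds, so the returned URL was free at probe time (another process could still claim the port before the caller binds it). A minimal usage sketch, assuming only the package above; the base address is illustrative:

package main

import (
	"fmt"
	"net/url"

	"github.com/TBD54566975/ftl/backend/common/bind"
)

func main() {
	// Illustrative base address; the first Next() probes this same port,
	// because the constructor stores port-1 and Next() pre-increments.
	base, err := url.Parse("http://127.0.0.1:8893")
	if err != nil {
		panic(err)
	}
	allocator, err := bind.NewBindAllocator(base)
	if err != nil {
		panic(err)
	}
	fmt.Println(allocator.Next()) // http://127.0.0.1:8893, if free
	fmt.Println(allocator.Next()) // http://127.0.0.1:8894, if free
}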
44 changes: 38 additions & 6 deletions backend/controller/controller.go
@@ -36,6 +36,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/sha256"
"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/internal/dal"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/console"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
@@ -66,7 +67,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
-func Start(ctx context.Context, config Config) error {
+func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error {
config.SetDefaults()

logger := log.FromContext(ctx)
@@ -87,7 +88,7 @@ func Start(ctx context.Context, config Config) error {
return errors.WithStack(err)
}

-svc, err := New(ctx, dal, config)
+svc, err := New(ctx, dal, config, runnerScaling)
if err != nil {
return errors.WithStack(err)
}
@@ -120,12 +121,13 @@ type Service struct {
// Map from endpoint to client.
clients *ttlcache.Cache[string, clients]

-routesMu sync.RWMutex
-routes map[string][]dal.Route
-config Config
+routesMu sync.RWMutex
+routes map[string][]dal.Route
+config Config
+runnerScaling scaling.RunnerScaling
}

-func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
+func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.ULID() == (ulid.ULID{}) {
key = model.NewControllerKey()
@@ -138,6 +140,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
clients: ttlcache.New[string, clients](ttlcache.WithTTL[string, clients](time.Minute)),
routes: map[string][]dal.Route{},
config: config,
runnerScaling: runnerScaling,
}

go runWithRetries(ctx, time.Second*1, time.Second*2, svc.syncRoutes)
@@ -146,6 +149,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
go runWithRetries(ctx, config.RunnerTimeout, time.Second*10, svc.reapStaleRunners)
go runWithRetries(ctx, config.DeploymentReservationTimeout, time.Second*20, svc.releaseExpiredReservations)
go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileDeployments)
go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileRunners)
return svc, nil
}

@@ -792,6 +796,34 @@ func (s *Service) reconcileDeployments(ctx context.Context) error {
return nil
}

func (s *Service) reconcileRunners(ctx context.Context) error {
activeDeployments, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return errors.Wrap(err, "failed to get deployments needing reconciliation")
}

totalRunners := 0
for _, deployment := range activeDeployments {
totalRunners += deployment.MinReplicas
}

// It's possible that idle runners will get terminated here, but they will get recreated in the next
// reconciliation cycle.
idleRunners, err := s.dal.GetIdleRunners(ctx, 16, model.Labels{})
if err != nil {
return errors.WithStack(err)
}

idleRunnerKeys := slices.Map(idleRunners, func(r dal.Runner) model.RunnerKey { return r.Key })

err = s.runnerScaling.SetReplicas(ctx, totalRunners, idleRunnerKeys)
if err != nil {
return errors.WithStack(err)
}

return nil
}

func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentName) (bool, error) {
runners, err := s.dal.GetRunnersForDeployment(ctx, key)
if err != nil {
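reconcileRunners computes the desired runner count as the sum of MinReplicas across all active deployments, then hands that target and the (up to 16) idle runner keys to the configured RunnerScaling. A hypothetical test double, not part of this commit, makes the contract concrete:

package scaling_test

import (
	"context"

	"github.com/TBD54566975/ftl/backend/common/model"
	"github.com/TBD54566975/ftl/backend/controller/scaling"
)

// recordingScaling is a hypothetical test double: it records the last
// target the controller asked for so a test can assert on it.
type recordingScaling struct {
	replicas    int
	idleRunners []model.RunnerKey
}

var _ scaling.RunnerScaling = (*recordingScaling)(nil)

func (r *recordingScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
	r.replicas = replicas
	r.idleRunners = idleRunners
	return nil
}

With two active deployments at MinReplicas 2 and 3, a reconciliation cycle would call SetReplicas(ctx, 5, idleKeys).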
20 changes: 20 additions & 0 deletions backend/controller/scaling/k8s_scaling.go
@@ -0,0 +1,20 @@
package scaling

import (
"context"

"github.com/TBD54566975/ftl/backend/common/model"
)

var _ RunnerScaling = (*K8sScaling)(nil)

type K8sScaling struct {
}

func NewK8sScaling() *K8sScaling {
return &K8sScaling{}
}

func (k *K8sScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
return nil
}
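
SetReplicas is deliberately a no-op stub: the commit threads the RunnerScaling interface through the controller but leaves cluster integration for later. One possible direction, sketched under the assumption of a client-go clientset and a Deployment named "ftl-runner" (neither appears in this commit), is to drive the Deployment's scale subresource:

package scaling

import (
	"context"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// scaleRunnerDeployment is a sketch, not part of this commit: it sets the
// replica count on a hypothetical "ftl-runner" Deployment via the scale
// subresource.
func scaleRunnerDeployment(ctx context.Context, client kubernetes.Interface, namespace string, replicas int) error {
	scale := &autoscalingv1.Scale{
		ObjectMeta: metav1.ObjectMeta{Name: "ftl-runner", Namespace: namespace},
		Spec:       autoscalingv1.ScaleSpec{Replicas: int32(replicas)},
	}
	_, err := client.AppsV1().Deployments(namespace).UpdateScale(ctx, "ftl-runner", scale, metav1.UpdateOptions{})
	return err
}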
130 changes: 130 additions & 0 deletions backend/controller/scaling/local_scaling.go
@@ -0,0 +1,130 @@
package scaling

import (
"context"
"encoding/binary"
"fmt"
"net/url"
"os"
"path/filepath"
"sync"

"github.com/alecthomas/errors"
"github.com/alecthomas/kong"

"github.com/TBD54566975/ftl/backend/common/bind"
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/runner"
)

var _ RunnerScaling = (*LocalScaling)(nil)

type LocalScaling struct {
lock sync.Mutex
cacheDir string
runners map[model.RunnerKey]context.CancelFunc

portAllocator *bind.BindAllocator
controllerAddresses []*url.URL
}

func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL) (*LocalScaling, error) {
cacheDir, err := os.UserCacheDir()
if err != nil {
return nil, errors.WithStack(err)
}
return &LocalScaling{
lock: sync.Mutex{},
cacheDir: cacheDir,
runners: map[model.RunnerKey]context.CancelFunc{},
portAllocator: portAllocator,
controllerAddresses: controllerAddresses,
}, nil
}

func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
l.lock.Lock()
defer l.lock.Unlock()

logger := log.FromContext(ctx)

replicasToAdd := replicas - len(l.runners)

if replicasToAdd <= 0 {
replicasToRemove := -replicasToAdd

for i := 0; i < replicasToRemove; i++ {
if len(idleRunners) == 0 {
return nil
}
runnerToRemove := idleRunners[len(idleRunners)-1]
idleRunners = idleRunners[:len(idleRunners)-1]

err := l.remove(ctx, runnerToRemove)
if err != nil {
return errors.WithStack(err)
}
}

return nil
}

logger.Infof("Adding %d replicas", replicasToAdd)
for i := 0; i < replicasToAdd; i++ {
i := i

controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
config := runner.Config{
Bind: l.portAllocator.Next(),
ControllerEndpoint: controllerEndpoint,
}

name := fmt.Sprintf("runner%d", i)
if err := kong.ApplyDefaults(&config, kong.Vars{
"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", name, "deployments"),
"language": "go,kotlin",
}); err != nil {
return errors.WithStack(err)
}

// Create a readable ULID for the runner.
var ulid [16]byte
binary.BigEndian.PutUint32(ulid[10:], uint32(len(l.runners)+1))
ulidStr := fmt.Sprintf("%025X", ulid)
err := config.Key.Scan(ulidStr)
if err != nil {
return errors.WithStack(err)
}

runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))

runnerCtx, cancel := context.WithCancel(runnerCtx)
l.runners[config.Key] = cancel

go func() {
logger.Infof("Starting runner: %s", config.Key)
err := runner.Start(runnerCtx, config)
if err != nil {
logger.Errorf(err, "Error starting runner: %s", err)
}
}()
}

return nil
}

func (l *LocalScaling) remove(ctx context.Context, runner model.RunnerKey) error {
log := log.FromContext(ctx)
log.Infof("Removing runner: %s", runner)

cancel, ok := l.runners[runner]
if !ok {
return errors.Errorf("runner %s not found", runner)
}

cancel()
delete(l.runners, runner)

return nil
}
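
LocalScaling runs each replica as an in-process runner goroutine: scale-up assigns controller endpoints round-robin and gives each runner a fresh bind address from the BindAllocator, while scale-down only cancels runners the controller reported as idle. A wiring sketch with illustrative addresses (the actual ftl serve wiring lives in a file not rendered on this page):

package main

import (
	"context"
	"net/url"

	"github.com/TBD54566975/ftl/backend/common/bind"
	"github.com/TBD54566975/ftl/backend/controller/scaling"
)

func main() {
	ctx := context.Background()

	// Illustrative addresses; ftl serve supplies its own.
	controllerURL, err := url.Parse("http://127.0.0.1:8892")
	if err != nil {
		panic(err)
	}
	runnerBase, err := url.Parse("http://127.0.0.1:8893")
	if err != nil {
		panic(err)
	}
	allocator, err := bind.NewBindAllocator(runnerBase)
	if err != nil {
		panic(err)
	}
	local, err := scaling.NewLocalScaling(allocator, []*url.URL{controllerURL})
	if err != nil {
		panic(err)
	}
	// Ask for two local runners; on scale-down the controller passes the
	// keys of idle runners eligible for termination.
	if err := local.SetReplicas(ctx, 2, nil); err != nil {
		panic(err)
	}
}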
11 changes: 11 additions & 0 deletions backend/controller/scaling/scaling.go
@@ -0,0 +1,11 @@
package scaling

import (
"context"

"github.com/TBD54566975/ftl/backend/common/model"
)

type RunnerScaling interface {
SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error
}
4 changes: 3 additions & 1 deletion cmd/ftl-controller/main.go
@@ -13,6 +13,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/observability"
"github.com/TBD54566975/ftl/backend/controller"
"github.com/TBD54566975/ftl/backend/controller/scaling"
)

var version = "dev"
@@ -38,6 +39,7 @@ func main() {
ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, cli.LogConfig))
err = observability.Init(ctx, "ftl-controller", version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")
-err = controller.Start(ctx, cli.ControllerConfig)
+err = controller.Start(ctx, cli.ControllerConfig, scaling.NewK8sScaling())
kctx.FatalIfErrorf(err)
}
[The seventh changed file did not render on this page.]
