Skip to content

Commit

Permalink
fix: switch PullSchemaResponse.deployment_key to optional
Browse files Browse the repository at this point in the history
This reflects the fact that builtin modules will not have an associated
deployment key.

Also use strongly typed model.DeploymentKey where appropriate.
  • Loading branch information
alecthomas committed Nov 26, 2024
1 parent 2471264 commit e9ddb21
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 109 deletions.
6 changes: 3 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
moduleRemoved := mostRecentDeploymentByModule[name] == deletion.String()
response = &ftlv1.PullSchemaResponse{
ModuleName: name,
DeploymentKey: deletion.String(),
DeploymentKey: proto.String(deletion.String()),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED,
ModuleRemoved: moduleRemoved,
}
Expand Down Expand Up @@ -1571,7 +1571,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
response = &ftlv1.PullSchemaResponse{
ModuleName: moduleSchema.Name,
DeploymentKey: message.Key.String(),
DeploymentKey: proto.String(message.Key.String()),
Schema: moduleSchema,
ChangeType: changeType,
ModuleRemoved: moduleRemoved,
Expand All @@ -1581,7 +1581,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
mostRecentDeploymentByModule[message.Schema.Name] = message.Key.String()
response = &ftlv1.PullSchemaResponse{
ModuleName: moduleSchema.Name,
DeploymentKey: message.Key.String(),
DeploymentKey: proto.String(message.Key.String()),
Schema: moduleSchema,
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED,
More: initialCount > 1,
Expand Down
25 changes: 15 additions & 10 deletions backend/controller/scaling/k8sscaling/deployment_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)

const thisDeploymentName = "ftl-controller"
Expand All @@ -54,7 +55,7 @@ type DeploymentProvisioner struct {
MyDeploymentName string
Namespace string
// Map of known deployments
KnownDeployments *xsync.Map
KnownDeployments *xsync.MapOf[string, bool]
FTLEndpoint string
IstioSecurity optional.Option[istioclient.Clientset]
}
Expand Down Expand Up @@ -106,22 +107,26 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
if !msg.More {
defer r.deleteMissingDeployments(ctx)
}
if msg.DeploymentKey == "" {
if msg.DeploymentKey == nil {
// Builtins don't have deployments
return nil
}
logger := log.FromContext(ctx)
logger = logger.Module(msg.ModuleName)
ctx = log.ContextWithLogger(ctx, logger)
logger.Debugf("Handling schema change for %s", msg.DeploymentKey)
deploymentKey, err := model.ParseDeploymentKey(msg.GetDeploymentKey())
if err != nil {
return fmt.Errorf("failed to parse deployment key %s: %w", msg.GetDeploymentKey(), err)
}
logger.Debugf("Handling schema change for %s", deploymentKey)
deploymentClient := r.Client.AppsV1().Deployments(r.Namespace)
deployment, err := deploymentClient.Get(ctx, msg.DeploymentKey, v1.GetOptions{})
deployment, err := deploymentClient.Get(ctx, deploymentKey.String(), v1.GetOptions{})
deploymentExists := true
if err != nil {
if errors.IsNotFound(err) {
deploymentExists = false
} else {
return fmt.Errorf("failed to get deployment %s: %w", msg.DeploymentKey, err)
return fmt.Errorf("failed to get deployment %s: %w", deploymentKey, err)
}
}

Expand All @@ -131,12 +136,12 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
// Note that a change is now currently usually and add and a delete
// As it should really be called a module changed, not a deployment changed
// This will need to be fixed as part of the support for rolling deployments
r.KnownDeployments.Store(msg.DeploymentKey, true)
r.KnownDeployments.Store(deploymentKey.String(), true)
if deploymentExists {
logger.Debugf("Updating deployment %s", msg.DeploymentKey)
logger.Debugf("Updating deployment %s", deploymentKey)
return r.handleExistingDeployment(ctx, deployment, msg.Schema)
} else {
return r.handleNewDeployment(ctx, msg.Schema, msg.DeploymentKey)
return r.handleNewDeployment(ctx, msg.Schema, deploymentKey.String())
}
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
if deploymentExists {
Expand All @@ -145,9 +150,9 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
// Nasty hack, we want all the controllers to have updated their route tables before we kill the runner
// so we add a slight delay here
time.Sleep(time.Second * 10)
r.KnownDeployments.Delete(msg.DeploymentKey)
r.KnownDeployments.Delete(deploymentKey.String())
logger.Debugf("Deleting service %s", msg.ModuleName)
err = r.Client.CoreV1().Services(r.Namespace).Delete(ctx, msg.DeploymentKey, v1.DeleteOptions{})
err = r.Client.CoreV1().Services(r.Namespace).Delete(ctx, deploymentKey.String(), v1.DeleteOptions{})
if err != nil {
if !errors.IsNotFound(err) {
logger.Errorf(err, "Failed to delete service %s", msg.ModuleName)
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (k k8sScaling) Start(ctx context.Context, controller url.URL, leaser leases
deploymentReconciler := &DeploymentProvisioner{
Client: clientset,
Namespace: namespace,
KnownDeployments: xsync.NewMap(),
KnownDeployments: xsync.NewMapOf[string, bool](),
FTLEndpoint: controller.String(),
IstioSecurity: optional.Ptr(sec),
}
Expand Down
31 changes: 18 additions & 13 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type localScaling struct {
}

type devModeRunner struct {
uri url.URL
deploymentKey string
uri url.URL
// Set to None under mysterious circumstances...
deploymentKey optional.Option[model.DeploymentKey]
debugPort int
}

Expand Down Expand Up @@ -113,7 +114,7 @@ type deploymentInfo struct {
runner optional.Option[runnerInfo]
module string
replicas int32
key string
key model.DeploymentKey
language string
exits int
}
Expand Down Expand Up @@ -149,32 +150,36 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u
}

func (l *localScaling) handleSchemaChange(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
if msg.DeploymentKey == "" {
if msg.DeploymentKey == nil {
// Builtins don't have deployments
return nil
}
deploymentKey, err := model.ParseDeploymentKey(msg.GetDeploymentKey())
if err != nil {
return fmt.Errorf("failed to parse deployment key: %w", err)
}
l.lock.Lock()
defer l.lock.Unlock()
logger := log.FromContext(ctx).Scope("localScaling").Module(msg.ModuleName)
ctx = log.ContextWithLogger(ctx, logger)
logger.Debugf("Handling schema change for %s", msg.DeploymentKey)
logger.Debugf("Handling schema change for %s", deploymentKey)
moduleDeployments := l.runners[msg.ModuleName]
if moduleDeployments == nil {
moduleDeployments = map[string]*deploymentInfo{}
l.runners[msg.ModuleName] = moduleDeployments
}
deploymentRunners := moduleDeployments[msg.DeploymentKey]
deploymentRunners := moduleDeployments[deploymentKey.String()]
if deploymentRunners == nil {
deploymentRunners = &deploymentInfo{runner: optional.None[runnerInfo](), key: msg.DeploymentKey, module: msg.ModuleName, language: msg.Schema.Runtime.Language}
moduleDeployments[msg.DeploymentKey] = deploymentRunners
deploymentRunners = &deploymentInfo{runner: optional.None[runnerInfo](), key: deploymentKey, module: msg.ModuleName, language: msg.Schema.Runtime.Language}
moduleDeployments[deploymentKey.String()] = deploymentRunners
}

switch msg.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
deploymentRunners.replicas = msg.Schema.Runtime.MinReplicas
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
deploymentRunners.replicas = 0
delete(moduleDeployments, msg.DeploymentKey)
delete(moduleDeployments, deploymentKey.String())
}
return l.reconcileRunners(ctx, deploymentRunners)
}
Expand Down Expand Up @@ -203,7 +208,7 @@ func (l *localScaling) reconcileRunners(ctx context.Context, deploymentRunners *
return nil
}

func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, info *deploymentInfo) error {
func (l *localScaling) startRunner(ctx context.Context, deploymentKey model.DeploymentKey, info *deploymentInfo) error {
select {
case <-ctx.Done():
// In some cases this gets called with an expired context, generally after the lease is released
Expand All @@ -217,11 +222,11 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
debugPort := 0
if devEndpoint != nil {
devURI = optional.Some(devEndpoint.uri)
if devEndpoint.deploymentKey == deploymentKey {
if devKey, ok := devEndpoint.deploymentKey.Get(); ok && devKey.Equal(deploymentKey) {
// Already running, don't start another
return nil
}
devEndpoint.deploymentKey = deploymentKey
devEndpoint.deploymentKey = optional.Some(deploymentKey)
debugPort = devEndpoint.debugPort
} else if ide, ok := l.ideSupport.Get(); ok {
var debug *localdebug.DebugInfo
Expand Down Expand Up @@ -282,7 +287,7 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
l.lock.Lock()
defer l.lock.Unlock()
if devEndpoint != nil {
devEndpoint.deploymentKey = ""
devEndpoint.deploymentKey = optional.None[model.DeploymentKey]()
}
// Don't count context.Canceled as an a restart error
if err != nil && !errors.Is(err, context.Canceled) {
Expand Down
2 changes: 1 addition & 1 deletion backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, resp *ft
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if !resp.ModuleRemoved {
logger.Debugf("Not removing cron jobs for %s as module is still present", resp.DeploymentKey)
logger.Debugf("Not removing cron jobs for %s as module is still present", resp.GetDeploymentKey())
return nil
}
logger.Debugf("Removing cron jobs for module %s", resp.ModuleName)
Expand Down
2 changes: 1 addition & 1 deletion backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if !resp.ModuleRemoved {
logger.Debugf("Not removing ingress for %s as it is not the current deployment", resp.DeploymentKey)
logger.Debugf("Not removing ingress for %s as it is not the current deployment", resp.GetDeploymentKey())
return nil
}
for i := range existing.Modules {
Expand Down
Loading

0 comments on commit e9ddb21

Please sign in to comment.