Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: switch PullSchemaResponse.deployment_key to optional #3533

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
moduleRemoved := mostRecentDeploymentByModule[name] == deletion.String()
response = &ftlv1.PullSchemaResponse{
ModuleName: name,
DeploymentKey: deletion.String(),
DeploymentKey: proto.String(deletion.String()),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED,
ModuleRemoved: moduleRemoved,
}
Expand Down Expand Up @@ -1571,7 +1571,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
response = &ftlv1.PullSchemaResponse{
ModuleName: moduleSchema.Name,
DeploymentKey: message.Key.String(),
DeploymentKey: proto.String(message.Key.String()),
Schema: moduleSchema,
ChangeType: changeType,
ModuleRemoved: moduleRemoved,
Expand All @@ -1581,7 +1581,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
mostRecentDeploymentByModule[message.Schema.Name] = message.Key.String()
response = &ftlv1.PullSchemaResponse{
ModuleName: moduleSchema.Name,
DeploymentKey: message.Key.String(),
DeploymentKey: proto.String(message.Key.String()),
Schema: moduleSchema,
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED,
More: initialCount > 1,
Expand Down
25 changes: 15 additions & 10 deletions backend/controller/scaling/k8sscaling/deployment_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)

const thisDeploymentName = "ftl-controller"
Expand All @@ -54,7 +55,7 @@ type DeploymentProvisioner struct {
MyDeploymentName string
Namespace string
// Map of known deployments
KnownDeployments *xsync.Map
KnownDeployments *xsync.MapOf[string, bool]
FTLEndpoint string
IstioSecurity optional.Option[istioclient.Clientset]
}
Expand Down Expand Up @@ -106,22 +107,26 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
if !msg.More {
defer r.deleteMissingDeployments(ctx)
}
if msg.DeploymentKey == "" {
if msg.DeploymentKey == nil {
// Builtins don't have deployments
return nil
}
logger := log.FromContext(ctx)
logger = logger.Module(msg.ModuleName)
ctx = log.ContextWithLogger(ctx, logger)
logger.Debugf("Handling schema change for %s", msg.DeploymentKey)
deploymentKey, err := model.ParseDeploymentKey(msg.GetDeploymentKey())
if err != nil {
return fmt.Errorf("failed to parse deployment key %s: %w", msg.GetDeploymentKey(), err)
}
logger.Debugf("Handling schema change for %s", deploymentKey)
deploymentClient := r.Client.AppsV1().Deployments(r.Namespace)
deployment, err := deploymentClient.Get(ctx, msg.DeploymentKey, v1.GetOptions{})
deployment, err := deploymentClient.Get(ctx, deploymentKey.String(), v1.GetOptions{})
deploymentExists := true
if err != nil {
if errors.IsNotFound(err) {
deploymentExists = false
} else {
return fmt.Errorf("failed to get deployment %s: %w", msg.DeploymentKey, err)
return fmt.Errorf("failed to get deployment %s: %w", deploymentKey, err)
}
}

Expand All @@ -131,12 +136,12 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
// Note that a change is now currently usually and add and a delete
// As it should really be called a module changed, not a deployment changed
// This will need to be fixed as part of the support for rolling deployments
r.KnownDeployments.Store(msg.DeploymentKey, true)
r.KnownDeployments.Store(deploymentKey.String(), true)
if deploymentExists {
logger.Debugf("Updating deployment %s", msg.DeploymentKey)
logger.Debugf("Updating deployment %s", deploymentKey)
return r.handleExistingDeployment(ctx, deployment, msg.Schema)
} else {
return r.handleNewDeployment(ctx, msg.Schema, msg.DeploymentKey)
return r.handleNewDeployment(ctx, msg.Schema, deploymentKey.String())
}
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
if deploymentExists {
Expand All @@ -145,9 +150,9 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
// Nasty hack, we want all the controllers to have updated their route tables before we kill the runner
// so we add a slight delay here
time.Sleep(time.Second * 10)
r.KnownDeployments.Delete(msg.DeploymentKey)
r.KnownDeployments.Delete(deploymentKey.String())
logger.Debugf("Deleting service %s", msg.ModuleName)
err = r.Client.CoreV1().Services(r.Namespace).Delete(ctx, msg.DeploymentKey, v1.DeleteOptions{})
err = r.Client.CoreV1().Services(r.Namespace).Delete(ctx, deploymentKey.String(), v1.DeleteOptions{})
if err != nil {
if !errors.IsNotFound(err) {
logger.Errorf(err, "Failed to delete service %s", msg.ModuleName)
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (k k8sScaling) Start(ctx context.Context, controller url.URL, leaser leases
deploymentReconciler := &DeploymentProvisioner{
Client: clientset,
Namespace: namespace,
KnownDeployments: xsync.NewMap(),
KnownDeployments: xsync.NewMapOf[string, bool](),
FTLEndpoint: controller.String(),
IstioSecurity: optional.Ptr(sec),
}
Expand Down
31 changes: 18 additions & 13 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type localScaling struct {
}

type devModeRunner struct {
uri url.URL
deploymentKey string
uri url.URL
// Set to None under mysterious circumstances...
deploymentKey optional.Option[model.DeploymentKey]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stuartwdouglas I couldn't quite grok why this was being set to ""

debugPort int
}

Expand Down Expand Up @@ -113,7 +114,7 @@ type deploymentInfo struct {
runner optional.Option[runnerInfo]
module string
replicas int32
key string
key model.DeploymentKey
language string
exits int
}
Expand Down Expand Up @@ -149,32 +150,36 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u
}

func (l *localScaling) handleSchemaChange(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
if msg.DeploymentKey == "" {
if msg.DeploymentKey == nil {
// Builtins don't have deployments
return nil
}
deploymentKey, err := model.ParseDeploymentKey(msg.GetDeploymentKey())
if err != nil {
return fmt.Errorf("failed to parse deployment key: %w", err)
}
l.lock.Lock()
defer l.lock.Unlock()
logger := log.FromContext(ctx).Scope("localScaling").Module(msg.ModuleName)
ctx = log.ContextWithLogger(ctx, logger)
logger.Debugf("Handling schema change for %s", msg.DeploymentKey)
logger.Debugf("Handling schema change for %s", deploymentKey)
moduleDeployments := l.runners[msg.ModuleName]
if moduleDeployments == nil {
moduleDeployments = map[string]*deploymentInfo{}
l.runners[msg.ModuleName] = moduleDeployments
}
deploymentRunners := moduleDeployments[msg.DeploymentKey]
deploymentRunners := moduleDeployments[deploymentKey.String()]
if deploymentRunners == nil {
deploymentRunners = &deploymentInfo{runner: optional.None[runnerInfo](), key: msg.DeploymentKey, module: msg.ModuleName, language: msg.Schema.Runtime.Language}
moduleDeployments[msg.DeploymentKey] = deploymentRunners
deploymentRunners = &deploymentInfo{runner: optional.None[runnerInfo](), key: deploymentKey, module: msg.ModuleName, language: msg.Schema.Runtime.Language}
moduleDeployments[deploymentKey.String()] = deploymentRunners
}

switch msg.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
deploymentRunners.replicas = msg.Schema.Runtime.MinReplicas
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
deploymentRunners.replicas = 0
delete(moduleDeployments, msg.DeploymentKey)
delete(moduleDeployments, deploymentKey.String())
}
return l.reconcileRunners(ctx, deploymentRunners)
}
Expand Down Expand Up @@ -203,7 +208,7 @@ func (l *localScaling) reconcileRunners(ctx context.Context, deploymentRunners *
return nil
}

func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, info *deploymentInfo) error {
func (l *localScaling) startRunner(ctx context.Context, deploymentKey model.DeploymentKey, info *deploymentInfo) error {
select {
case <-ctx.Done():
// In some cases this gets called with an expired context, generally after the lease is released
Expand All @@ -217,11 +222,11 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
debugPort := 0
if devEndpoint != nil {
devURI = optional.Some(devEndpoint.uri)
if devEndpoint.deploymentKey == deploymentKey {
if devKey, ok := devEndpoint.deploymentKey.Get(); ok && devKey.Equal(deploymentKey) {
// Already running, don't start another
return nil
}
devEndpoint.deploymentKey = deploymentKey
devEndpoint.deploymentKey = optional.Some(deploymentKey)
debugPort = devEndpoint.debugPort
} else if ide, ok := l.ideSupport.Get(); ok {
var debug *localdebug.DebugInfo
Expand Down Expand Up @@ -282,7 +287,7 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
l.lock.Lock()
defer l.lock.Unlock()
if devEndpoint != nil {
devEndpoint.deploymentKey = ""
devEndpoint.deploymentKey = optional.None[model.DeploymentKey]()
}
// Don't count context.Canceled as an a restart error
if err != nil && !errors.Is(err, context.Canceled) {
Expand Down
2 changes: 1 addition & 1 deletion backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, resp *ft
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if !resp.ModuleRemoved {
logger.Debugf("Not removing cron jobs for %s as module is still present", resp.DeploymentKey)
logger.Debugf("Not removing cron jobs for %s as module is still present", resp.GetDeploymentKey())
return nil
}
logger.Debugf("Removing cron jobs for module %s", resp.ModuleName)
Expand Down
2 changes: 1 addition & 1 deletion backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if !resp.ModuleRemoved {
logger.Debugf("Not removing ingress for %s as it is not the current deployment", resp.DeploymentKey)
logger.Debugf("Not removing ingress for %s as it is not the current deployment", resp.GetDeploymentKey())
return nil
}
for i := range existing.Modules {
Expand Down
Loading
Loading