Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
cleaning the implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
leoporoli committed Oct 4, 2024
1 parent 17cab3b commit 032b753
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 77 deletions.
59 changes: 6 additions & 53 deletions kontrol-service/engine/flow/dev_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ func applyPatch(
}
externalServices = lo.Uniq(externalServices)

// TODO SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID
// SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID
// handle external service plugins on this service
logrus.Infof("Checking if this service has any external services...")
for _, plugin := range targetService.StatefulPlugins {

// TODO this is adding both kind of plugins stateful and external
alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName]
if ok {
pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService.ServiceID)
Expand All @@ -214,11 +213,6 @@ func applyPatch(
return fmt.Errorf("external service specified by plugin '%v' was not found in base topology", plugin.ServiceName)
}

//err = applyExternalServicePlugin(pluginRunner, targetService, externalService, plugin, pluginIdx, flowID)
//if err != nil {
// return stacktrace.Propagate(err, "An error occurred creating external servie plugin for external service '%v' depended on by '%v'", externalService.ServiceID, targetService.ServiceID)
//}

err = topologyRef.MoveServiceToVersion(externalService, flowID)
if err != nil {
return err
Expand All @@ -234,7 +228,7 @@ func applyPatch(
return err
}

// TODO SECTION 3 - handle stateful services
// SECTION 3 - handle stateful services
for serviceIdx, service := range topologyRef.Services {
if lo.Contains(statefulServices, service) {
logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID)
Expand All @@ -256,22 +250,13 @@ func applyPatch(
continue
}

// TODO this is adding both kind of plugins stateful and external
alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName]
if ok {
pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, modifiedService.ServiceID)
} else {
pluginServices[plugin.ServiceName] = []string{modifiedService.ServiceID}
}
pluginServicesMap[plugin.ServiceName] = plugin

//logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID)
//pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx)
//spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, *resultSpec, pluginId, plugin.Args)
//if err != nil {
// return fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err)
//}
//resultSpec = &spec
}

// Update service with final deployment spec
Expand All @@ -298,7 +283,7 @@ func applyPatch(
}
}

// TODO SECTION 3 - handle external services that are not target service dependencies
// SECTION 4 - handle external services that are not target service dependencies
// if the service is an external service of the target service, it was already handled above
if lo.Contains(externalServices, service) && !lo.Contains(alreadyHandledExternalServices, service.ServiceID) {
// assume there's only one parent service for now but eventually we'll likely need to account for multiple parents to external service
Expand Down Expand Up @@ -326,21 +311,13 @@ func applyPatch(
return stacktrace.NewError("parent service '%v' does not have a workload spec", targetService.ServiceID)
}

// TODO this is adding both kind of plugins stateful and external
alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName]
if ok {
pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, parentService.ServiceID)
} else {
pluginServices[plugin.ServiceName] = []string{parentService.ServiceID}
}
pluginServicesMap[plugin.ServiceName] = plugin

//if plugin.ServiceName == service.ServiceID {
//err := applyExternalServicePlugin(pluginRunner, parentService, service, plugin, pluginIdx, flowID)
//if err != nil {
//return stacktrace.Propagate(err, "error creating flow for external service '%s'", service.ServiceID)
//}
//}
}

// add a flow version of the external service to the plugin
Expand All @@ -358,7 +335,7 @@ func applyPatch(
}
}

// Execute plugins and update the services deployment specs with the plugin's modifications
// SECTION 5 - Execute plugins and update the services deployment specs with the plugin's modifications
for pluginServiceName, serviceIds := range pluginServices {
var servicesServiceSpecs []corev1.ServiceSpec
var servicesWorkloadSpecs []*kardinal.WorkloadSpec
Expand All @@ -383,7 +360,7 @@ func applyPatch(
servicesToUpdate = append(servicesToUpdate, service)
}

pluginId := plugins.GetPluginId3(plugin.ServiceName, flowID)
pluginId := plugins.GetPluginId(plugin.ServiceName, flowID)
logrus.Infof("Calling plugin '%v'...", pluginId)

servicesModifiedWorkloadSpecs, _, err := pluginRunner.CreateFlow(plugin.Name, servicesServiceSpecs, servicesWorkloadSpecs, pluginId, plugin.Args)
Expand All @@ -409,30 +386,6 @@ func applyPatch(
return nil
}

// TODO: have this handle stateful service plugins
//func applyExternalServicePlugin(
//pluginRunner *plugins.PluginRunner,
//dependentService *resolved.Service,
//externalService *resolved.Service,
//externalServicePlugin *resolved.StatefulPlugin,
//pluginIdx int,
//flowId string,
//) error {
//if externalServicePlugin.Type != "external" {
//return nil
//}

//logrus.Infof("Calling external service '%v' plugin with parent service '%v'...", externalService.ServiceID, dependentService.ServiceID)
//pluginId := plugins.GetPluginId(flowId, dependentService.ServiceID, pluginIdx)
//spec, _, err := pluginRunner.CreateFlow(externalServicePlugin.Name, *dependentService.ServiceSpec, *dependentService.DeploymentSpec, pluginId, externalServicePlugin.Args)
//if err != nil {
//return stacktrace.Propagate(err, "error creating flow for external service '%s'", externalService.ServiceID)
//}

//dependentService.DeploymentSpec = &spec
//return nil
//}

func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTopology, flowId string) error {
pluginsToDeleteFromThisFlow := map[string]string{}

Expand All @@ -442,7 +395,7 @@ func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTop
continue
}
for _, plugin := range service.StatefulPlugins {
pluginId := plugins.GetPluginId3(plugin.ServiceName, flowId)
pluginId := plugins.GetPluginId(plugin.ServiceName, flowId)
pluginsToDeleteFromThisFlow[pluginId] = plugin.Name
}
}
Expand Down
1 change: 0 additions & 1 deletion kontrol-service/plugins/git_plugin_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func (gpp *GitPluginProviderImpl) PullGitHubPlugin(repoPath, repoUrl string) err
return fmt.Errorf("git clone failed: %v\nOutput: %s", err, output)
}
} else {
return nil // TODO remove this line, it's only for testing purpose
// If the repository already exists, pull the latest changes
cmd := exec.Command("git", "-C", repoPath, "pull")
if output, err := cmd.CombinedOutput(); err != nil {
Expand Down
27 changes: 4 additions & 23 deletions kontrol-service/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@ import (
)

const (
// <flow id>-<service id>-<plugin idx>
pluginIdFmtStr = "%s-%s-%d"
// TODO use this last one and remove the previous one
// <plugin.service_name>-<flow id>-<service1 id>,<service2 id>,<service3 id>
pluginIdFmtStr2 = "%s-%s-%s"
// TODO use this last one and remove the previous one
// <plugin.service_name>-<flow id>
pluginIdFmtStr3 = "%s-%s"
pluginIdFmtStr = "%s-%s"
)

type PluginRunner struct {
Expand Down Expand Up @@ -63,7 +57,6 @@ func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpecs []corev1.Servi
}
serviceSpecsJSONStr := base64.StdEncoding.EncodeToString(serviceSpecsJSON)


podSpecsJSON, err := json.Marshal(podSpecs)
if err != nil {
return nil, "", fmt.Errorf("failed to marshal pod specs: %v", err)
Expand Down Expand Up @@ -95,7 +88,7 @@ func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpecs []corev1.Servi
if numWorkloadSpecs != numNewPodSpecs {
return nil, "", fmt.Errorf("expected to receive '%d' modified pod specs from plugin '%s' execution result but '%d' were received instead, this is a bug in Kardinal", numWorkloadSpecs, flowUuid, numNewPodSpecs)
}
for newPodSpecIdx, newPodSpec := range newPodSpecs{
for newPodSpecIdx, newPodSpec := range newPodSpecs {
workloadSpecs[newPodSpecIdx].UpdateTemplateSpec(newPodSpec)
}
}
Expand Down Expand Up @@ -145,20 +138,8 @@ func (pr *PluginRunner) DeleteFlow(pluginUrl, flowUuid string) error {
return nil
}

// TODO remove this after the DeleteDevFlow refactor
func GetPluginId(flowId, serviceId string, pluginIdx int) string {
return fmt.Sprintf(pluginIdFmtStr, flowId, serviceId, pluginIdx)
}

// TODO rename it to the original name
func GetPluginId2(pluginServiceName string, flowId string, serviceIds []string) string {
serviceIdsStr := strings.Join(serviceIds, ",")
return fmt.Sprintf(pluginIdFmtStr2, pluginServiceName, flowId, serviceIdsStr)
}

// TODO rename it to the original name
func GetPluginId3(pluginServiceName string, flowId string) string {
return fmt.Sprintf(pluginIdFmtStr3, pluginServiceName, flowId)
func GetPluginId(pluginServiceName string, flowId string) string {
return fmt.Sprintf(pluginIdFmtStr, pluginServiceName, flowId)
}

func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) {
Expand Down

0 comments on commit 032b753

Please sign in to comment.