From f78a72ef06f8b39eda9065c868da4d0c98a9d533 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 25 Sep 2024 12:28:52 -0300 Subject: [PATCH 01/22] refactor generateClusterTopology to improve legibility --- kontrol-service/engine/docker.go | 363 ++++++++++++++++++------------- 1 file changed, 217 insertions(+), 146 deletions(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index 8e1e07b..b3053f8 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -2,16 +2,14 @@ package engine import ( "fmt" - "strings" - + apitypes "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types" "github.com/kurtosis-tech/stacktrace" "github.com/samber/lo" "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" - - apitypes "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" net "k8s.io/api/networking/v1" + "strings" "kardinal.kontrol-service/engine/flow" "kardinal.kontrol-service/plugins" @@ -67,192 +65,265 @@ func GenerateProdDevCluster(baseClusterTopologyMaybeWithTemplateOverrides *resol func generateClusterTopology(serviceConfigs []apitypes.ServiceConfig, ingressConfigs []apitypes.IngressConfig, namespace, version string) (*resolved.ClusterTopology, error) { clusterTopology := resolved.ClusterTopology{} - - clusterTopologyServices := []*resolved.Service{} - clusterTopologyIngress := []*resolved.Ingress{} - clusterTopologyServiceDependencies := []resolved.ServiceDependency{} clusterTopology.Namespace = namespace - alreadyFoundIngress := false - for _, ingressConfig := range ingressConfigs { - ingress := ingressConfig.Ingress - ingressAnnotations := ingress.GetObjectMeta().GetAnnotations() + clusterTopologyIngress := processIngressConfigs(ingressConfigs, version) - // Ingress? - isIngress, ok := ingressAnnotations["kardinal.dev.service/ingress"] - if ok && isIngress == "true" { - ingressObj := resolved.Ingress{ - ActiveFlowIDs: []string{version}, - IngressID: ingress.ObjectMeta.Name, - IngressSpec: &ingress.Spec, - } - _, ok := ingressAnnotations["kardinal.dev.service/host"] - if ok { - logrus.Debugf("Found hostname Kardinal annotation on Ingress '%v' but using Ingress Rules provided by k8s Ingress object instead.", ingress.Name) - } + clusterTopologyIngressModified, clusterTopologyServices, clusterTopologyServiceDependencies, err := processServiceConfigs(serviceConfigs, version, clusterTopologyIngress) + if err != nil { + return nil, stacktrace.NewError("an error occurred processing the service configs") + } - // A k8s ingress object should specify the Ingress rules so use those instead of creating one manually - for _, ingressRule := range ingress.Spec.Rules { - ingressObj.IngressRules = append(ingressObj.IngressRules, &ingressRule) - } + if len(clusterTopologyIngressModified) == 0 { + return nil, stacktrace.NewError("At least one service needs to be annotated as an ingress service") + } + clusterTopology.Ingresses = clusterTopologyIngressModified - clusterTopologyIngress = append(clusterTopologyIngress, &ingressObj) - alreadyFoundIngress = true - } + if len(clusterTopologyServices) == 0 { + return nil, stacktrace.NewError("At least one service is required in addition to the ingress service(s)") } + clusterTopology.Services = clusterTopologyServices + clusterTopology.ServiceDependencies = clusterTopologyServiceDependencies + + return &clusterTopology, nil +} + +func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version string, clusterTopologyIngress []*resolved.Ingress) ([]*resolved.Ingress, []*resolved.Service, []resolved.ServiceDependency, error) { + clusterTopologyServices := []*resolved.Service{} + clusterTopologyServiceDependencies := []resolved.ServiceDependency{} + externalServicesDependencies := []resolved.ServiceDependency{} + + type serviceWithDependenciesAnnotation struct { + service *resolved.Service + dependenciesAnnotation string + } + serviceWithDependencies := []*serviceWithDependenciesAnnotation{} for _, serviceConfig := range serviceConfigs { service := serviceConfig.Service - deployment := serviceConfig.Deployment serviceAnnotations := service.GetObjectMeta().GetAnnotations() - // Ingress? - isIngress, ok := serviceAnnotations["kardinal.dev.service/ingress"] - if ok && isIngress == "true" { - if !alreadyFoundIngress { - ingress := resolved.Ingress{ - ActiveFlowIDs: []string{version}, - IngressID: service.ObjectMeta.Name, - ServiceSpec: &service.Spec, - } - host, ok := serviceAnnotations["kardinal.dev.service/host"] - if ok { - ingress.IngressRules = []*net.IngressRule{ - { - Host: host, - }, - } - } - clusterTopologyIngress = append(clusterTopologyIngress, &ingress) + // 1- Ingress + ingressNotFoundYet := len(clusterTopologyIngress) == 0 + // find ingress from a service config only if it wasn't found before from the ingress configs + if ingressNotFoundYet && isIngres(serviceAnnotations) { + ingress, err := newClusterTopologyIngresFromServiceConfig(serviceConfig, version) + if err != nil { + return nil, nil, nil, stacktrace.Propagate(err, "An error occurred generating the cluster topology ingres from the service config '%s'", service.Name) } + clusterTopologyIngress = append(clusterTopologyIngress, ingress) + } + + if isIngres(serviceAnnotations) { // TODO: why this need to be a separated service? // Don't add ingress services to the list of resolved services continue } - // Service + // 2- Service logrus.Infof("Processing service: %v", service.GetObjectMeta().GetName()) - clusterTopologyService := resolved.Service{ - ServiceID: service.GetObjectMeta().GetName(), - Version: version, - ServiceSpec: &service.Spec, - DeploymentSpec: &deployment.Spec, - } - isStateful, ok := serviceAnnotations["kardinal.dev.service/stateful"] - if ok && isStateful == "true" { - clusterTopologyService.IsStateful = true - } - isExternal, ok := serviceAnnotations["kardinal.dev.service/external"] - if ok && isExternal == "true" { - clusterTopologyService.IsExternal = true - } + clusterTopologyService := newClusterTopologyServiceFromServiceConfig(serviceConfig, version) - isShared, ok := serviceAnnotations["kardinal.dev.service/shared"] - if ok && isShared == "true" { - clusterTopologyService.IsShared = true + // 3- Service plugins + serviceStatefulPlugins, externalServices, newExternalServicesDependencies, err := newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService) + if err != nil { + return nil, nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful plugins and external services from service config '%s'", service.Name) } + clusterTopologyService.StatefulPlugins = serviceStatefulPlugins + clusterTopologyServices = append(clusterTopologyServices, externalServices...) + externalServicesDependencies = append(externalServicesDependencies, newExternalServicesDependencies...) - // Service plugin? - sPlugins, ok := serviceAnnotations["kardinal.dev.service/plugins"] + // 4- Service dependencies (creates a list of services with dependencies) + dependencies, ok := serviceAnnotations["kardinal.dev.service/dependencies"] if ok { - var statefulPlugins []resolved.StatefulPlugin - err := yaml.Unmarshal([]byte(sPlugins), &statefulPlugins) - if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) - } - serviceStatefulPlugins := make([]*resolved.StatefulPlugin, len(statefulPlugins)) - for index := range statefulPlugins { - logrus.Infof("Voting App UI Plugin: %v", statefulPlugins[index].Name) - // TODO: consider giving external service plugins their own type, instead of using StatefulPlugins - // if this is an external service plugin, represent that service as a service in the cluster topology - plugin := statefulPlugins[index] - if plugin.Type == "external" { - logrus.Infof("Adding external service to topology..") - serviceName := plugin.ServiceName - logrus.Infof("plugin service name: %v", plugin.ServiceName) - if serviceName == "" { - serviceName = fmt.Sprintf("%v:%v", clusterTopologyService.ServiceID, "external") - } - externalService := resolved.Service{ - ServiceID: serviceName, - Version: version, - ServiceSpec: nil, // leave empty for now - DeploymentSpec: nil, // leave empty for now - IsExternal: true, - // external services can definitely be stateful but for now treat external and stateful services as mutually exclusive to make plugin logic easier to handle - IsStateful: false, - } - - clusterTopologyServices = append(clusterTopologyServices, &externalService) - - externalServiceDependency := resolved.ServiceDependency{ - Service: &clusterTopologyService, - DependsOnService: &externalService, - DependencyPort: nil, - } - clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServiceDependency) - } - serviceStatefulPlugins[index] = &plugin - } - clusterTopologyService.StatefulPlugins = serviceStatefulPlugins + newServiceWithDependenciesAnnotation := &serviceWithDependenciesAnnotation{&clusterTopologyService, dependencies} + serviceWithDependencies = append(serviceWithDependencies, newServiceWithDependenciesAnnotation) } - clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService) } - if len(clusterTopologyIngress) == 0 { - return nil, stacktrace.NewError("At least one service needs to be annotated as an ingress service") - } - clusterTopology.Ingresses = clusterTopologyIngress + // Set the service dependencies in the clusterTopologyService + // first iterate on the service with dependencies list + for _, svcWithDependenciesAnnotation := range serviceWithDependencies { - if len(clusterTopologyServices) == 0 { - return nil, stacktrace.NewError("At least one service is required in addition to the ingress service(s)") - } - clusterTopology.Services = clusterTopologyServices + serviceAndPorts := strings.Split(svcWithDependenciesAnnotation.dependenciesAnnotation, ",") + for _, serviceAndPort := range serviceAndPorts { + serviceAndPortParts := strings.Split(serviceAndPort, ":") + depService, depServicePort, err := getServiceAndPortFromClusterTopologyServices(serviceAndPortParts[0], serviceAndPortParts[1], clusterTopologyServices) + if err != nil { + return nil, nil, nil, stacktrace.Propagate(err, "An error occurred finding the service dependency for service %s and port %s", serviceAndPortParts[0], serviceAndPortParts[1]) + } - for _, serviceConfig := range serviceConfigs { - service := serviceConfig.Service - serviceAnnotations := service.GetObjectMeta().GetAnnotations() + serviceDependency := resolved.ServiceDependency{ + Service: svcWithDependenciesAnnotation.service, + DependsOnService: depService, + DependencyPort: depServicePort, + } - if isServiceIngress(&clusterTopology, service) || alreadyFoundIngress { - logrus.Infof("Service %s is an ingress service, skipping dependency resolution", service.GetObjectMeta().GetName()) - continue + clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency) } + } + // then add the external services dependencies + clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServicesDependencies...) - clusterTopologyService, err := clusterTopology.GetService(service.GetObjectMeta().GetName()) + return clusterTopologyIngress, clusterTopologyServices, clusterTopologyServiceDependencies, nil +} + +func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string, clusterTopologyService *resolved.Service) ([]*resolved.StatefulPlugin, []*resolved.Service, []resolved.ServiceDependency, error) { + var serviceStatefulPlugins []*resolved.StatefulPlugin + externalServices := []*resolved.Service{} + externalServiceDependencies := []resolved.ServiceDependency{} + + service := serviceConfig.Service + serviceAnnotations := service.GetObjectMeta().GetAnnotations() + + sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] + if ok { + var statefulPlugins []resolved.StatefulPlugin + err := yaml.Unmarshal([]byte(sPluginsAnnotation), &statefulPlugins) if err != nil { - logrus.Fatalf("An error occurred finding service %s in the list of services", service.GetObjectMeta().GetName()) - return nil, stacktrace.Propagate(err, "An error occurred finding service %s in the list of services", service.GetObjectMeta().GetName()) + return nil, nil, nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) } - - // Service dependencies? - deps, ok := serviceAnnotations["kardinal.dev.service/dependencies"] - if ok { - serviceAndPorts := strings.Split(deps, ",") - for _, serviceAndPort := range serviceAndPorts { - serviceAndPortParts := strings.Split(serviceAndPort, ":") - depService, depServicePort, err := clusterTopology.GetServiceAndPort(serviceAndPortParts[0], serviceAndPortParts[1]) - if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred finding the service dependency for service %s and port %s", serviceAndPortParts[0], serviceAndPortParts[1]) + serviceStatefulPlugins = make([]*resolved.StatefulPlugin, len(statefulPlugins)) + + for index := range statefulPlugins { + // TODO: consider giving external service plugins their own type, instead of using StatefulPlugins + // if this is an external service plugin, represent that service as a service in the cluster topology + plugin := statefulPlugins[index] + if plugin.Type == "external" { + logrus.Infof("Adding external service to topology..") + serviceName := plugin.ServiceName + logrus.Infof("plugin service name: %v", plugin.ServiceName) + if serviceName == "" { + serviceID := service.GetObjectMeta().GetName() + serviceName = fmt.Sprintf("%v:%v", serviceID, "external") } + externalService := resolved.Service{ + ServiceID: serviceName, + Version: version, + ServiceSpec: nil, // leave empty for now + DeploymentSpec: nil, // leave empty for now + IsExternal: true, + // external services can definitely be stateful but for now treat external and stateful services as mutually exclusive to make plugin logic easier to handle + IsStateful: false, + } + + externalServices = append(externalServices, &externalService) - serviceDependency := resolved.ServiceDependency{ + externalServiceDependency := resolved.ServiceDependency{ Service: clusterTopologyService, - DependsOnService: depService, - DependencyPort: depServicePort, + DependsOnService: &externalService, + DependencyPort: nil, } + externalServiceDependencies = append(externalServiceDependencies, externalServiceDependency) + } + serviceStatefulPlugins[index] = &plugin + } + } + + return serviceStatefulPlugins, externalServices, externalServiceDependencies, nil +} - clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency) +func newClusterTopologyServiceFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string) resolved.Service { + service := serviceConfig.Service + deployment := serviceConfig.Deployment + serviceAnnotations := service.GetObjectMeta().GetAnnotations() + + clusterTopologyService := resolved.Service{ + ServiceID: service.GetObjectMeta().GetName(), + Version: version, + ServiceSpec: &service.Spec, + DeploymentSpec: &deployment.Spec, + } + isStateful, ok := serviceAnnotations["kardinal.dev.service/stateful"] + if ok && isStateful == "true" { + clusterTopologyService.IsStateful = true + } + isExternal, ok := serviceAnnotations["kardinal.dev.service/external"] + if ok && isExternal == "true" { + clusterTopologyService.IsExternal = true + } + + isShared, ok := serviceAnnotations["kardinal.dev.service/shared"] + if ok && isShared == "true" { + clusterTopologyService.IsShared = true + } + return clusterTopologyService +} + +func getServiceAndPortFromClusterTopologyServices(serviceName string, servicePortName string, clusterTopologyServices []*resolved.Service) (*resolved.Service, *corev1.ServicePort, error) { + for _, service := range clusterTopologyServices { + if service.ServiceID == serviceName { + for _, port := range service.ServiceSpec.Ports { + if port.Name == servicePortName { + return service, &port, nil + } } } } - clusterTopology.ServiceDependencies = clusterTopologyServiceDependencies + return nil, nil, stacktrace.NewError("Service %s and Port %s not found in the list of services", serviceName, servicePortName) +} - return &clusterTopology, nil +func newClusterTopologyIngresFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string) (*resolved.Ingress, error) { + service := serviceConfig.Service + serviceAnnotations := service.GetObjectMeta().GetAnnotations() + if !isIngres(serviceAnnotations) { + return nil, stacktrace.NewError("Service %s is not an ingress service", service.GetObjectMeta().GetName()) + } + ingress := &resolved.Ingress{ + ActiveFlowIDs: []string{version}, + IngressID: service.ObjectMeta.Name, + ServiceSpec: &service.Spec, + } + host, ok := serviceAnnotations["kardinal.dev.service/host"] + if ok { + ingress.IngressRules = []*net.IngressRule{ + { + Host: host, + }, + } + } + return ingress, nil +} + +func isIngres(serviceAnnotations map[string]string) bool { + isIngress, ok := serviceAnnotations["kardinal.dev.service/ingress"] + return ok && isIngress == "true" +} + +func processIngressConfigs(ingressConfigs []apitypes.IngressConfig, version string) []*resolved.Ingress { + clusterTopologyIngress := []*resolved.Ingress{} + // First try to get it from the ingressConfigs + for _, ingressConfig := range ingressConfigs { + ingress := ingressConfig.Ingress + ingressAnnotations := ingress.GetObjectMeta().GetAnnotations() + + // Ingress? + isIngress, ok := ingressAnnotations["kardinal.dev.service/ingress"] + if ok && isIngress == "true" { + ingressObj := resolved.Ingress{ + ActiveFlowIDs: []string{version}, + IngressID: ingress.ObjectMeta.Name, + IngressSpec: &ingress.Spec, + } + _, ok := ingressAnnotations["kardinal.dev.service/host"] + if ok { + logrus.Debugf("Found hostname Kardinal annotation on Ingress '%v' but using Ingress Rules provided by k8s Ingress object instead.", ingress.Name) + } + + // A k8s ingress object should specify the Ingress rules so use those instead of creating one manually + for _, ingressRule := range ingress.Spec.Rules { + ingressObj.IngressRules = append(ingressObj.IngressRules, &ingressRule) + } + + clusterTopologyIngress = append(clusterTopologyIngress, &ingressObj) + } + } + return clusterTopologyIngress } -func isServiceIngress(clusterTopology *resolved.ClusterTopology, service v1.Service) bool { +func isServiceIngress(clusterTopology *resolved.ClusterTopology, service corev1.Service) bool { return lo.SomeBy(clusterTopology.Ingresses, func(item *resolved.Ingress) bool { return item.IngressID == service.GetObjectMeta().GetName() }) From 787205149d2e971f37dc59eba8f4e56e83eb0f47 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 25 Sep 2024 18:15:54 -0300 Subject: [PATCH 02/22] parsing services with kardinal.dev.service/plugin-definition annotation and seting those in the cluster topology services --- kontrol-service/engine/docker.go | 79 ++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 19 deletions(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index b3053f8..c72ec51 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -89,9 +89,11 @@ func generateClusterTopology(serviceConfigs []apitypes.ServiceConfig, ingressCon } func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version string, clusterTopologyIngress []*resolved.Ingress) ([]*resolved.Ingress, []*resolved.Service, []resolved.ServiceDependency, error) { + var err error clusterTopologyServices := []*resolved.Service{} clusterTopologyServiceDependencies := []resolved.ServiceDependency{} externalServicesDependencies := []resolved.ServiceDependency{} + availablePlugins := map[string]*resolved.StatefulPlugin{} type serviceWithDependenciesAnnotation struct { service *resolved.Service @@ -99,6 +101,16 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri } serviceWithDependencies := []*serviceWithDependenciesAnnotation{} + // First, iterate the services to get all the available plugins + for _, serviceConfig := range serviceConfigs { + // availablePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list + availablePlugins, err = addAvailablePluginsFromServiceConfig(serviceConfig, availablePlugins) + if err != nil { + return nil, nil, nil, stacktrace.Propagate(err, "An error occurred while parsing plugin '%s'", serviceConfig.Service.GetName()) + } + } + + // Second, iterate the services to create the clusterTopology service with partial data (no dependencies set here) for _, serviceConfig := range serviceConfigs { service := serviceConfig.Service serviceAnnotations := service.GetObjectMeta().GetAnnotations() @@ -124,12 +136,13 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri logrus.Infof("Processing service: %v", service.GetObjectMeta().GetName()) clusterTopologyService := newClusterTopologyServiceFromServiceConfig(serviceConfig, version) - // 3- Service plugins - serviceStatefulPlugins, externalServices, newExternalServicesDependencies, err := newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService) + // 3- Plugins + // the servicePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list + servicePlugins, externalServices, newExternalServicesDependencies, err := newServicePluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService, availablePlugins) if err != nil { - return nil, nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful plugins and external services from service config '%s'", service.Name) + return nil, nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful availablePlugins and external services from service config '%s'", service.Name) } - clusterTopologyService.StatefulPlugins = serviceStatefulPlugins + clusterTopologyService.StatefulPlugins = servicePlugins clusterTopologyServices = append(clusterTopologyServices, externalServices...) externalServicesDependencies = append(externalServicesDependencies, newExternalServicesDependencies...) @@ -142,8 +155,8 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService) } - // Set the service dependencies in the clusterTopologyService - // first iterate on the service with dependencies list + // Third, set the service dependencies in the clusterTopologyService + // a) iterate on the service with dependencies list for _, svcWithDependenciesAnnotation := range serviceWithDependencies { serviceAndPorts := strings.Split(svcWithDependenciesAnnotation.dependenciesAnnotation, ",") @@ -163,33 +176,62 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency) } } - // then add the external services dependencies + // b) add the external services dependencies clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServicesDependencies...) return clusterTopologyIngress, clusterTopologyServices, clusterTopologyServiceDependencies, nil } -func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string, clusterTopologyService *resolved.Service) ([]*resolved.StatefulPlugin, []*resolved.Service, []resolved.ServiceDependency, error) { - var serviceStatefulPlugins []*resolved.StatefulPlugin - externalServices := []*resolved.Service{} - externalServiceDependencies := []resolved.ServiceDependency{} - +func addAvailablePluginsFromServiceConfig(serviceConfig apitypes.ServiceConfig, availablePlugins map[string]*resolved.StatefulPlugin) (map[string]*resolved.StatefulPlugin, error) { service := serviceConfig.Service serviceAnnotations := service.GetObjectMeta().GetAnnotations() - sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] + pluginAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugin-definition"] if ok { var statefulPlugins []resolved.StatefulPlugin - err := yaml.Unmarshal([]byte(sPluginsAnnotation), &statefulPlugins) + err := yaml.Unmarshal([]byte(pluginAnnotation), &statefulPlugins) if err != nil { - return nil, nil, nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) + return nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) } - serviceStatefulPlugins = make([]*resolved.StatefulPlugin, len(statefulPlugins)) for index := range statefulPlugins { + plugin := statefulPlugins[index] + availablePlugins[plugin.Name] = &plugin + } + } + + return availablePlugins, nil +} + +func newServicePluginsAndExternalServicesFromServiceConfig( + serviceConfig apitypes.ServiceConfig, + version string, + clusterTopologyService *resolved.Service, + availablePlugins map[string]*resolved.StatefulPlugin, +) ( + []*resolved.StatefulPlugin, + []*resolved.Service, + []resolved.ServiceDependency, + error, +) { + servicePlugins := []*resolved.StatefulPlugin{} + externalServices := []*resolved.Service{} + externalServiceDependencies := []resolved.ServiceDependency{} + + service := serviceConfig.Service + serviceAnnotations := service.GetObjectMeta().GetAnnotations() + + sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] + if ok { + svcPluginNames := strings.Split(sPluginsAnnotation, ",") + for _, svcPlugName := range svcPluginNames { + plugin, ok := availablePlugins[svcPlugName] + if !ok { + return nil, nil, nil, stacktrace.NewError("expected to find plugin with name %s but it is not available, make sure to add the resource for it in the manifest file", svcPlugName) + } + servicePlugins = append(servicePlugins, plugin) // TODO: consider giving external service plugins their own type, instead of using StatefulPlugins // if this is an external service plugin, represent that service as a service in the cluster topology - plugin := statefulPlugins[index] if plugin.Type == "external" { logrus.Infof("Adding external service to topology..") serviceName := plugin.ServiceName @@ -217,11 +259,10 @@ func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apityp } externalServiceDependencies = append(externalServiceDependencies, externalServiceDependency) } - serviceStatefulPlugins[index] = &plugin } } - return serviceStatefulPlugins, externalServices, externalServiceDependencies, nil + return servicePlugins, externalServices, externalServiceDependencies, nil } func newClusterTopologyServiceFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string) resolved.Service { From c55d162ef67539aac517f9a073bec38c26f6f395 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Thu, 26 Sep 2024 11:10:33 -0300 Subject: [PATCH 03/22] using plugin service name to identify them --- kontrol-service/engine/docker.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index c72ec51..203dcff 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -191,12 +191,16 @@ func addAvailablePluginsFromServiceConfig(serviceConfig apitypes.ServiceConfig, var statefulPlugins []resolved.StatefulPlugin err := yaml.Unmarshal([]byte(pluginAnnotation), &statefulPlugins) if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) + return nil, stacktrace.Propagate(err, "an error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) } for index := range statefulPlugins { plugin := statefulPlugins[index] - availablePlugins[plugin.Name] = &plugin + _, found := availablePlugins[plugin.ServiceName] + if found { + return nil, stacktrace.NewError("a plugin with service name '%s' already exists, the `plugin.servicename` value has to be unique", plugin.ServiceName) + } + availablePlugins[plugin.ServiceName] = &plugin } } @@ -221,13 +225,13 @@ func newServicePluginsAndExternalServicesFromServiceConfig( service := serviceConfig.Service serviceAnnotations := service.GetObjectMeta().GetAnnotations() - sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] + pluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] if ok { - svcPluginNames := strings.Split(sPluginsAnnotation, ",") - for _, svcPlugName := range svcPluginNames { - plugin, ok := availablePlugins[svcPlugName] + pluginsServiceName := strings.Split(pluginsAnnotation, ",") + for _, pluginSvcName := range pluginsServiceName { + plugin, ok := availablePlugins[pluginSvcName] if !ok { - return nil, nil, nil, stacktrace.NewError("expected to find plugin with name %s but it is not available, make sure to add the resource for it in the manifest file", svcPlugName) + return nil, nil, nil, stacktrace.NewError("expected to find plugin with service name %s but it is not available, make sure to add the resource for it in the manifest file", pluginSvcName) } servicePlugins = append(servicePlugins, plugin) // TODO: consider giving external service plugins their own type, instead of using StatefulPlugins From cd0881e0b25d5d5073e91a4191bf1be9f4c2835e Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Thu, 26 Sep 2024 16:41:55 -0300 Subject: [PATCH 04/22] small change --- kontrol-service/engine/docker.go | 10 +++++----- kontrol-service/engine/flow/dev_flow.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index d35ae61..3ddb579 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -149,7 +149,7 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri service *resolved.Service dependenciesAnnotation string } - serviceWithDependencies := []*serviceWithDependenciesAnnotation{} + servicesWithDependencies := []*serviceWithDependenciesAnnotation{} for _, serviceConfig := range serviceConfigs { service := serviceConfig.Service @@ -172,16 +172,16 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri dependencies, ok := serviceAnnotations["kardinal.dev.service/dependencies"] if ok { newServiceWithDependenciesAnnotation := &serviceWithDependenciesAnnotation{&clusterTopologyService, dependencies} - serviceWithDependencies = append(serviceWithDependencies, newServiceWithDependenciesAnnotation) + servicesWithDependencies = append(servicesWithDependencies, newServiceWithDependenciesAnnotation) } clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService) } // Set the service dependencies in the clusterTopologyService // first iterate on the service with dependencies list - for _, svcWithDependenciesAnnotation := range serviceWithDependencies { + for _, serviceWithDependencies := range servicesWithDependencies { - serviceAndPorts := strings.Split(svcWithDependenciesAnnotation.dependenciesAnnotation, ",") + serviceAndPorts := strings.Split(serviceWithDependencies.dependenciesAnnotation, ",") for _, serviceAndPort := range serviceAndPorts { serviceAndPortParts := strings.Split(serviceAndPort, ":") depService, depServicePort, err := getServiceAndPortFromClusterTopologyServices(serviceAndPortParts[0], serviceAndPortParts[1], clusterTopologyServices) @@ -190,7 +190,7 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri } serviceDependency := resolved.ServiceDependency{ - Service: svcWithDependenciesAnnotation.service, + Service: serviceWithDependencies.service, DependsOnService: depService, DependencyPort: depServicePort, } diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index a4ff802..a757cbe 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -190,7 +190,7 @@ func applyPatch( // find the existing external service and update it in the topology to get a new version externalService, err := topology.GetService(plugin.ServiceName) if err != nil { - return nil, fmt.Errorf("external service specified by plugin '%v' was not found in base topology.", plugin.ServiceName) + return nil, fmt.Errorf("external service specified by plugin '%v' was not found in base topology", plugin.ServiceName) } err = applyExternalServicePlugin(pluginRunner, targetService, externalService, plugin, pluginIdx, flowID) From e4b0a070c343d23476e98b1278edc6cac1692e6c Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Fri, 27 Sep 2024 11:20:45 -0300 Subject: [PATCH 05/22] adding comments in the dev flow --- kontrol-service/engine/flow/dev_flow.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index a757cbe..0d04588 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -181,6 +181,18 @@ func applyPatch( } externalServices = lo.Uniq(externalServices) + //pluginServices := map[*resolved.StatefulPlugin][]*resolved.Service{} + //for _, plugin := range targetService.StatefulPlugins { + // servicesWithPlugin := []*resolved.Service{targetService} + // alreadyServicesWithPlugin, ok := pluginServices[plugin] + // if ok { + // servicesWithPlugin = append(alreadyServicesWithPlugin, targetService) + // } + // pluginServices[plugin] = servicesWithPlugin + //} + + // TODO SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID + // handle external service plugins on this service logrus.Infof("Checking if this service has any external services...") for pluginIdx, plugin := range targetService.StatefulPlugins { @@ -205,6 +217,7 @@ func applyPatch( } } + // TODO SECTION 2 - Target service updates with new modifications modifiedTargetService := DeepCopyService(targetService) modifiedTargetService.DeploymentSpec = deploymentSpec modifiedTargetService.Version = flowID @@ -213,6 +226,7 @@ func applyPatch( return nil, err } + // TODO SECTION 3 - handle stateful services for serviceIdx, service := range topology.Services { if lo.Contains(statefulServices, service) { logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID) @@ -221,7 +235,7 @@ func applyPatch( modifiedService.Version = flowID if !modifiedService.IsStateful { - panic(fmt.Sprintf("Service %s is not stateful but is in stateful paths", modifiedService.ServiceID)) + return nil, fmt.Errorf("service %s is not stateful but is in stateful paths", modifiedService.ServiceID) } // Apply a chain of stateful plugins to the stateful service From 68c895a2c14bae2885962cbc5e5a851b3836e913 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Tue, 1 Oct 2024 17:24:43 -0300 Subject: [PATCH 06/22] update the plugin.CreateFlow signature to support multiple deployment specs comming from different services --- kontrol-service/plugins/mock_github.go | 89 +++++++++++++++-------- kontrol-service/plugins/plugins.go | 59 ++++++++------- kontrol-service/plugins/plugins_test.go | 95 ++++++++++++++----------- 3 files changed, 148 insertions(+), 95 deletions(-) diff --git a/kontrol-service/plugins/mock_github.go b/kontrol-service/plugins/mock_github.go index 9aa464b..d1ace3a 100644 --- a/kontrol-service/plugins/mock_github.go +++ b/kontrol-service/plugins/mock_github.go @@ -2,64 +2,83 @@ package plugins var MockGitHub = map[string]map[string]string{ // Simple Plugin - "https://github.com/h4ck3rk3y/a-test-plugin.git": { - "main.py": `REPLACED = "the-text-has-been-replaced" + "https://github.com/fake-org/kardinal-simple-plugin-example.git": { + "main.py": `import copy + +REPLACED = "the-text-has-been-replaced" + + +def create_flow(service_specs: list, deployment_specs: list, flow_uuid, text_to_replace): + modified_deployment_specs = [] + + for deployment_spec in deployment_specs: + modified_deployment_spec = copy.deepcopy(deployment_spec) + modified_deployment_spec['template']['metadata']['labels']['app'] = modified_deployment_spec['template']['metadata']['labels']['app'].replace(text_to_replace, REPLACED) + modified_deployment_spec['selector']['matchLabels']['app'] = modified_deployment_spec['selector']['matchLabels']['app'].replace(text_to_replace, REPLACED) + modified_deployment_spec['template']['spec']['containers'][0]['name'] = modified_deployment_spec['template']['spec']['containers'][0]['name'].replace(text_to_replace, REPLACED) + + modified_deployment_specs.append(modified_deployment_spec) -def create_flow(service_spec, deployment_spec, flow_uuid, text_to_replace): - deployment_spec['template']['metadata']['labels']['app'] = deployment_spec['template']['metadata']['labels']['app'].replace(text_to_replace, REPLACED) - deployment_spec['selector']['matchLabels']['app'] = deployment_spec['selector']['matchLabels']['app'].replace(text_to_replace, REPLACED) - deployment_spec['template']['spec']['containers'][0]['name'] = deployment_spec['template']['spec']['containers'][0]['name'].replace(text_to_replace, REPLACED) - config_map = { "original_text": text_to_replace } - + return { - "deployment_spec": deployment_spec, + "deployment_specs": modified_deployment_specs, "config_map": config_map } + def delete_flow(config_map, flow_uuid): print(config_map["original_text"]) `, }, // Complex Plugin - "https://github.com/h4ck3rk3y/slightly-more-complex-plugin.git": { - "main.py": `import json + "https://github.com/fake-org/kardinal-slightly-more-complex-plugin-example.git": { + "main.py": `import copy import requests -def create_flow(service_spec, deployment_spec, flow_uuid): + +def create_flow(service_specs: list, deployment_specs: list, flow_uuid): response = requests.get("https://4.ident.me") if response.status_code != 200: raise Exception("An unexpected error occurred") - + ip_address = response.text.strip() - - # Replace the IP address in the environment variable - for container in deployment_spec['template']['spec']['containers']: - for env in container['env']: - if env['name'] == 'REDIS': - env['value'] = ip_address - + + modified_deployment_specs = [] + + for deployment_spec in deployment_specs: + modified_deployment_spec = copy.deepcopy(deployment_spec) + # Replace the IP address in the environment variable + for container in modified_deployment_spec['template']['spec']['containers']: + for env in container['env']: + if env['name'] == 'REDIS': + env['value'] = ip_address + + modified_deployment_specs.append(modified_deployment_spec) + + config_map = { "original_value": "ip_addr" } - + return { - "deployment_spec": deployment_spec, + "deployment_specs": modified_deployment_specs, "config_map": config_map } + def delete_flow(config_map, flow_uuid): # In this complex plugin, we don't need to do anything for deletion return None`, "requirements.txt": "requests", }, // Identity Plugin - "https://github.com/h4ck3rk3y/identity-plugin.git": { - "main.py": `def create_flow(service_spec, deployment_spec, flow_uuid): + "https://github.com/fake-org/kardinal-identity-plugin-example.git": { + "main.py": `def create_flow(service_specs, deployment_specs, flow_uuid): return { - "deployment_spec": deployment_spec, + "deployment_specs": deployment_specs, "config_map": {} } @@ -68,16 +87,28 @@ def delete_flow(config_map, flow_uuid): `, }, // Redis sidecar plugin - "https://github.com/h4ck3rk3y/redis-sidecar-plugin.git": { - "main.py": `def create_flow(service_spec, deployment_spec, flow_uuid): - deployment_spec['template']['spec']['containers'][0]["image"] = "kurtosistech/redis-proxy-overlay:latest" + "https://github.com/fake-org/kardinal-redis-sidecar-plugin-example.git": { + "main.py": `import copy + + +def create_flow(service_specs, deployment_specs, flow_uuid): + + modified_deployment_specs = [] + + for deployment_spec in deployment_specs: + modified_deployment_spec = copy.deepcopy(deployment_spec) + modified_deployment_spec['template']['spec']['containers'][0]["image"] = "kurtosistech/redis-proxy-overlay:latest" + + modified_deployment_specs.append(modified_deployment_spec) + return { - "deployment_spec": deployment_spec, + "deployment_specs": modified_deployment_specs, "config_map": {} } def delete_flow(config_map, flow_uuid): pass + `, }, } diff --git a/kontrol-service/plugins/plugins.go b/kontrol-service/plugins/plugins.go index eb64a8b..f66813d 100644 --- a/kontrol-service/plugins/plugins.go +++ b/kontrol-service/plugins/plugins.go @@ -19,6 +19,8 @@ import ( const ( // -- pluginIdFmtStr = "%s-%s-%d" + // -,, + pluginIdFmtStr2 = "%s-%s" ) type PluginRunner struct { @@ -37,58 +39,60 @@ func NewPluginRunner(gitPluginProvider GitPluginProvider, tenantId string, db *d } } -func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpec corev1.ServiceSpec, deploymentSpec appv1.DeploymentSpec, flowUuid string, arguments map[string]string) (appv1.DeploymentSpec, string, error) { +func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpecs []corev1.ServiceSpec, deploymentSpecs []appv1.DeploymentSpec, flowUuid string, arguments map[string]string) ([]appv1.DeploymentSpec, string, error) { repoPath, err := pr.getOrCloneRepo(pluginUrl) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to get or clone repository: %v", err) + return nil, "", fmt.Errorf("failed to get or clone repository: %v", err) } - serviceSpecJSON, err := json.Marshal(serviceSpec) + serviceSpecsJSON, err := json.Marshal(serviceSpecs) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to marshal service spec: %v", err) + return nil, "", fmt.Errorf("failed to marshal service specs: %v", err) } + serviceSpecsJSONStr := base64.StdEncoding.EncodeToString(serviceSpecsJSON) - deploymentSpecJSON, err := json.Marshal(deploymentSpec) + deploymentSpecsJSON, err := json.Marshal(deploymentSpecs) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to marshal deployment spec: %v", err) + return nil, "", fmt.Errorf("failed to marshal deployment specs: %v", err) } + deploymentSpecsJSONStr := base64.StdEncoding.EncodeToString(deploymentSpecsJSON) - result, err := runPythonCreateFlow(repoPath, string(serviceSpecJSON), string(deploymentSpecJSON), flowUuid, arguments) + result, err := runPythonCreateFlow(repoPath, serviceSpecsJSONStr, deploymentSpecsJSONStr, flowUuid, arguments) if err != nil { - return appv1.DeploymentSpec{}, "", err + return nil, "", err } var resultMap map[string]json.RawMessage err = json.Unmarshal([]byte(result), &resultMap) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to parse result: %v", err) + return nil, "", fmt.Errorf("failed to parse result: %v", err) } - var newDeploymentSpec appv1.DeploymentSpec - err = json.Unmarshal(resultMap["deployment_spec"], &newDeploymentSpec) + var newDeploymentSpecs []appv1.DeploymentSpec + err = json.Unmarshal(resultMap["deployment_specs"], &newDeploymentSpecs) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to unmarshal deployment spec: %v", err) + return nil, "", fmt.Errorf("failed to unmarshal deployment spec: %v", err) } configMapJSON := resultMap["config_map"] var configMap map[string]interface{} err = json.Unmarshal(configMapJSON, &configMap) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("invalid config map: %v", err) + return nil, "", fmt.Errorf("invalid config map: %v", err) } configMapBytes, err := json.Marshal(configMap) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to re-marshal config map: %v", err) + return nil, "", fmt.Errorf("failed to re-marshal config map: %v", err) } logrus.Infof("Storing config map for plugin called with uuid '%v':\n %s\n...", flowUuid, string(configMapBytes)) _, err = pr.db.CreatePluginConfig(flowUuid, string(configMapBytes), pr.tenantId) if err != nil { - return appv1.DeploymentSpec{}, "", fmt.Errorf("failed to store the config map: %v", err) + return nil, "", fmt.Errorf("failed to store the config map: %v", err) } - return newDeploymentSpec, string(configMapBytes), nil + return newDeploymentSpecs, string(configMapBytes), nil } func (pr *PluginRunner) DeleteFlow(pluginUrl, flowUuid string, arguments map[string]string) error { @@ -119,6 +123,11 @@ func GetPluginId(flowId, serviceId string, pluginIdx int) string { return fmt.Sprintf(pluginIdFmtStr, flowId, serviceId, pluginIdx) } +func GetPluginId2(flowId string, serviceIds []string) string { + serviceIdsStr := strings.Join(serviceIds, ",") + return fmt.Sprintf(pluginIdFmtStr2, flowId, serviceIdsStr) +} + func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) { pluginConfig, err := pr.db.GetPluginConfigByFlowID(pr.tenantId, flowUuid) if err != nil { @@ -130,7 +139,7 @@ func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) { return pluginConfig.Config, nil } -func runPythonCreateFlow(repoPath, serviceSpecJSON, deploymentSpecJSON, flowUuid string, arguments map[string]string) (string, error) { +func runPythonCreateFlow(repoPath, serviceSpecsJSONStr, deploymentSpecsJSONStr, flowUuid string, arguments map[string]string) (string, error) { scriptPath := filepath.Join(repoPath, "main.py") if _, err := os.Stat(scriptPath); os.IsNotExist(err) { @@ -170,8 +179,10 @@ import base64 sys.path.append("%s") import main -service_spec = json.loads('''%s''') -deployment_spec = json.loads('''%s''') +service_specs_json = base64.b64decode('%s').decode('utf-8') +service_specs = json.loads(service_specs_json) +deployment_specs_json = base64.b64decode('%s').decode('utf-8') +deployment_specs = json.loads(deployment_specs_json) flow_uuid = %q args_json = base64.b64decode('%s').decode('utf-8') args = json.loads(args_json) @@ -182,10 +193,10 @@ sig = inspect.signature(main.create_flow) # Prepare kwargs based on the function signature kwargs = {} for param in sig.parameters.values(): - if param.name == 'service_spec': - kwargs['service_spec'] = service_spec - elif param.name == 'deployment_spec': - kwargs['deployment_spec'] = deployment_spec + if param.name == 'service_specs': + kwargs['service_specs'] = service_specs + elif param.name == 'deployment_specs': + kwargs['deployment_specs'] = deployment_specs elif param.name == 'flow_uuid': kwargs['flow_uuid'] = flow_uuid elif param.name in args: @@ -201,7 +212,7 @@ result = main.create_flow(**kwargs) # Write the result to a temporary file with open('%s', 'w') as f: json.dump(result, f) -`, repoPath, serviceSpecJSON, deploymentSpecJSON, flowUuid, argJsonStr, tempResultFile.Name()) +`, repoPath, serviceSpecsJSONStr, deploymentSpecsJSONStr, flowUuid, argJsonStr, tempResultFile.Name()) if err := executePythonScript(venvPath, repoPath, tempScript); err != nil { return "", err diff --git a/kontrol-service/plugins/plugins_test.go b/kontrol-service/plugins/plugins_test.go index 5200156..a4213a9 100644 --- a/kontrol-service/plugins/plugins_test.go +++ b/kontrol-service/plugins/plugins_test.go @@ -12,42 +12,47 @@ import ( ) const ( - simplePlugin = "https://github.com/h4ck3rk3y/a-test-plugin.git" - complexPlugin = "https://github.com/h4ck3rk3y/slightly-more-complex-plugin.git" - identityPlugin = "https://github.com/h4ck3rk3y/identity-plugin.git" - redisPlugin = "https://github.com/h4ck3rk3y/redis-sidecar-plugin.git" + simplePlugin = "https://github.com/fake-org/kardinal-simple-plugin-example.git" + complexPlugin = "https://github.com/fake-org/kardinal-slightly-more-complex-plugin-example.git" + identityPlugin = "https://github.com/fake-org/kardinal-identity-plugin-example.git" + redisPlugin = "https://github.com/fake-org/kardinal-redis-sidecar-plugin-example.git" flowUuid = "test-flow-uuid" ) -var serviceSpec = corev1.ServiceSpec{} +// TODO Add a test for checking that different env vars values between deployment specs remains the same after the plugin execution +// TODO this is for testing determinism -var deploymentSpec = appv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "helloworld", - }, - }, - Replicas: int32Ptr(1), - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ +var serviceSpecs = []corev1.ServiceSpec{} + +var deploymentSpecs = []appv1.DeploymentSpec{ + { + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ "app": "helloworld", }, }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "helloworld", - Image: "karthequian/helloworld:latest", - Ports: []corev1.ContainerPort{ - { - ContainerPort: 80, + Replicas: int32Ptr(1), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "helloworld", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "helloworld", + Image: "karthequian/helloworld:latest", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, }, - }, - Env: []corev1.EnvVar{ - { - Name: "REDIS", - Value: "ip_addr", + Env: []corev1.EnvVar{ + { + Name: "REDIS", + Value: "ip_addr", + }, }, }, }, @@ -81,13 +86,15 @@ func TestSimplePlugin(t *testing.T) { "text_to_replace": "helloworld", } - updatedDeploymentSpec, configMap, err := runner.CreateFlow(simplePlugin, serviceSpec, deploymentSpec, flowUuid, arguments) + updatedDeploymentSpecs, configMap, err := runner.CreateFlow(simplePlugin, serviceSpecs, deploymentSpecs, flowUuid, arguments) require.NoError(t, err) - // Check if the deployment spec was updated correctly - require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.Template.Labels["app"]) - require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.Selector.MatchLabels["app"]) - require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.Template.Spec.Containers[0].Name) + for _, updatedDeploymentSpec := range updatedDeploymentSpecs { + // Check if the deployment spec was updated correctly + require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.Template.Labels["app"]) + require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.Selector.MatchLabels["app"]) + require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.Template.Spec.Containers[0].Name) + } // Verify the config map var configMapData map[string]interface{} @@ -108,11 +115,11 @@ func TestIdentityPlugin(t *testing.T) { runner, cleanUpDbFunc := getPluginRunner(t) defer cleanUpDbFunc() - updatedServiceSpec, configMap, err := runner.CreateFlow(identityPlugin, serviceSpec, deploymentSpec, flowUuid, map[string]string{}) + updatedServiceSpec, configMap, err := runner.CreateFlow(identityPlugin, serviceSpecs, deploymentSpecs, flowUuid, map[string]string{}) require.NoError(t, err) // Check if the deployment spec was updated correctly - require.Equal(t, deploymentSpec, updatedServiceSpec) + require.Equal(t, deploymentSpecs, updatedServiceSpec) // Verify the config map var configMapData map[string]interface{} @@ -133,12 +140,14 @@ func TestComplexPlugin(t *testing.T) { runner, cleanUpDbFunc := getPluginRunner(t) defer cleanUpDbFunc() - updatedServiceSpec, configMap, err := runner.CreateFlow(complexPlugin, serviceSpec, deploymentSpec, flowUuid, map[string]string{}) + updatedDeploymentSpecs, configMap, err := runner.CreateFlow(complexPlugin, serviceSpecs, deploymentSpecs, flowUuid, map[string]string{}) require.NoError(t, err) - // Check if the deployment spec was updated correctly - require.NotEqual(t, "ip_addr", updatedServiceSpec.Template.Spec.Containers[0].Env[0].Value) - require.Regexp(t, `\b(?:\d{1,3}\.){3}\d{1,3}\b`, updatedServiceSpec.Template.Spec.Containers[0].Env[0].Value) + for _, updatedDeploymentSpec := range updatedDeploymentSpecs { + // Check if the deployment spec was updated correctly + require.NotEqual(t, "ip_addr", updatedDeploymentSpec.Template.Spec.Containers[0].Env[0].Value) + require.Regexp(t, `\b(?:\d{1,3}\.){3}\d{1,3}\b`, updatedDeploymentSpec.Template.Spec.Containers[0].Env[0].Value) + } // Verify the config map var configMapData map[string]interface{} @@ -159,11 +168,13 @@ func TestRedisPluginTest(t *testing.T) { runner, cleanUpDbFunc := getPluginRunner(t) defer cleanUpDbFunc() - updatedServiceSpec, configMap, err := runner.CreateFlow(redisPlugin, serviceSpec, deploymentSpec, flowUuid, map[string]string{}) + updatedDeploymentSpecs, configMap, err := runner.CreateFlow(redisPlugin, serviceSpecs, deploymentSpecs, flowUuid, map[string]string{}) require.NoError(t, err) - // Check if the deployment spec was updated correctly - require.Equal(t, "kurtosistech/redis-proxy-overlay:latest", updatedServiceSpec.Template.Spec.Containers[0].Image) + for _, updatedDeploymentSpec := range updatedDeploymentSpecs { + // Check if the deployment spec was updated correctly + require.Equal(t, "kurtosistech/redis-proxy-overlay:latest", updatedDeploymentSpec.Template.Spec.Containers[0].Image) + } // Verify the config map var configMapData map[string]interface{} From dee29ed16f2e57fec657c31d8b045111b09c6e20 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 2 Oct 2024 11:02:00 -0300 Subject: [PATCH 07/22] refactor how to execute plugins --- kontrol-service/engine/flow/dev_flow.go | 407 +++++++++++++----------- kontrol-service/plugins/plugins.go | 2 + 2 files changed, 230 insertions(+), 179 deletions(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index 0d04588..fa06eea 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -2,6 +2,8 @@ package flow import ( "fmt" + appv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "kardinal.kontrol-service/constants" @@ -14,8 +16,6 @@ import ( "kardinal.kontrol-service/plugins" "kardinal.kontrol-service/types/cluster_topology/resolved" "kardinal.kontrol-service/types/flow_spec" - - v1 "k8s.io/api/apps/v1" ) // CreateDevFlow creates a dev flow from the given topologies @@ -49,17 +49,8 @@ func CreateDevFlow( topologyRef := &topology clusterGraph := topologyToGraph(topologyRef) - for _, servicePatch := range flowPatch.ServicePatches { - serviceID := servicePatch.Service - logrus.Infof("calculating new flow for service %s", serviceID) - targetService, err := topologyRef.GetService(serviceID) - if err != nil { - return nil, err - } - _, err = applyPatch(pluginRunner, topologyRef, clusterGraph, flowID, targetService, servicePatch.DeploymentSpec) - if err != nil { - return nil, err - } + if err := applyPatch(pluginRunner, topologyRef, clusterGraph, flowID, flowPatch.ServicePatches); err != nil { + return nil, err } // the baseline topology flow ID and flow version are equal to the namespace these three should use same value @@ -143,210 +134,268 @@ func markParentsAsShared(topology *resolved.ClusterTopology, service *resolved.S func applyPatch( pluginRunner *plugins.PluginRunner, - topology *resolved.ClusterTopology, + topologyRef *resolved.ClusterTopology, clusterGraph graph.Graph[resolved.ServiceHash, *resolved.Service], flowID string, - targetService *resolved.Service, - deploymentSpec *v1.DeploymentSpec, -) (*resolved.ClusterTopology, error) { - // Find downstream stateful services - statefulPaths := findAllDownstreamStatefulPaths(targetService, clusterGraph, topology) - statefulServices := make([]*resolved.Service, 0) - for _, path := range statefulPaths { - statefulServiceHash, err := lo.Last(path) - if statefulServiceHash == "" || err != nil { - logrus.Infof("Error finding last stateful service hash in path %v: %v", path, err) - } - statefulService, err := clusterGraph.Vertex(statefulServiceHash) - if err != nil { - return nil, fmt.Errorf("an error occurred getting stateful service vertex from graph: %s", err) - } - statefulServices = append(statefulServices, statefulService) - } - statefulServices = lo.Uniq(statefulServices) - - externalPaths := findAllDownstreamExternalPaths(targetService, clusterGraph, topology) - externalServices := make([]*resolved.Service, 0) - alreadyHandledExternalServices := make([]string, 0) - for _, path := range externalPaths { - externalServiceHash, err := lo.Last(path) - if externalServiceHash == "" || err != nil { - logrus.Infof("Error finding last external service hash in path %v: %v", path, err) - } - externalService, err := clusterGraph.Vertex(externalServiceHash) + servicePatches []flow_spec.ServicePatch, +) error { + + // TODO could create a custom type for it with a Add method and a Get Method, in order to centralize the addition if someone else want o use it later in another part in the code + pluginServices := map[*resolved.StatefulPlugin][]*resolved.Service{} + + for _, servicePatch := range servicePatches { + serviceID := servicePatch.Service + logrus.Infof("calculating new flow for service %s", serviceID) + targetService, err := topologyRef.GetService(serviceID) if err != nil { - return nil, fmt.Errorf("an error occurred getting external service vertex from graph: %s", err) + return err } - externalServices = append(externalServices, externalService) - } - externalServices = lo.Uniq(externalServices) - - //pluginServices := map[*resolved.StatefulPlugin][]*resolved.Service{} - //for _, plugin := range targetService.StatefulPlugins { - // servicesWithPlugin := []*resolved.Service{targetService} - // alreadyServicesWithPlugin, ok := pluginServices[plugin] - // if ok { - // servicesWithPlugin = append(alreadyServicesWithPlugin, targetService) - // } - // pluginServices[plugin] = servicesWithPlugin - //} - - // TODO SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID - - // handle external service plugins on this service - logrus.Infof("Checking if this service has any external services...") - for pluginIdx, plugin := range targetService.StatefulPlugins { - if plugin.Type == "external" { - logrus.Infof("service %s contains an external dependency plugin: %v", targetService.ServiceID, plugin.Name) - - // find the existing external service and update it in the topology to get a new version - externalService, err := topology.GetService(plugin.ServiceName) - if err != nil { - return nil, fmt.Errorf("external service specified by plugin '%v' was not found in base topology", plugin.ServiceName) - } - err = applyExternalServicePlugin(pluginRunner, targetService, externalService, plugin, pluginIdx, flowID) + // Find downstream stateful services + statefulPaths := findAllDownstreamStatefulPaths(targetService, clusterGraph, topologyRef) + statefulServices := make([]*resolved.Service, 0) + for _, path := range statefulPaths { + statefulServiceHash, err := lo.Last(path) + if statefulServiceHash == "" || err != nil { + logrus.Infof("Error finding last stateful service hash in path %v: %v", path, err) + } + statefulService, err := clusterGraph.Vertex(statefulServiceHash) if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred creating external servie plugin for external service '%v' depended on by '%v'", externalService.ServiceID, targetService.ServiceID) + return fmt.Errorf("an error occurred getting stateful service vertex from graph: %s", err) } - - err = topology.MoveServiceToVersion(externalService, flowID) + statefulServices = append(statefulServices, statefulService) + } + statefulServices = lo.Uniq(statefulServices) + + externalPaths := findAllDownstreamExternalPaths(targetService, clusterGraph, topologyRef) + externalServices := make([]*resolved.Service, 0) + alreadyHandledExternalServices := make([]string, 0) + for _, path := range externalPaths { + externalServiceHash, err := lo.Last(path) + if externalServiceHash == "" || err != nil { + logrus.Infof("Error finding last external service hash in path %v: %v", path, err) + } + externalService, err := clusterGraph.Vertex(externalServiceHash) if err != nil { - return nil, err + return fmt.Errorf("an error occurred getting external service vertex from graph: %s", err) } + externalServices = append(externalServices, externalService) } - } + externalServices = lo.Uniq(externalServices) + + // TODO SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID + // handle external service plugins on this service + logrus.Infof("Checking if this service has any external services...") + for _, plugin := range targetService.StatefulPlugins { + + // TODO this is adding both kind of plugins stateful and external + alreadyServicesWithPlugin, ok := pluginServices[plugin] + if ok { + pluginServices[plugin] = append(alreadyServicesWithPlugin, targetService) + } else { + pluginServices[plugin] = []*resolved.Service{targetService} + } - // TODO SECTION 2 - Target service updates with new modifications - modifiedTargetService := DeepCopyService(targetService) - modifiedTargetService.DeploymentSpec = deploymentSpec - modifiedTargetService.Version = flowID - err := topology.UpdateWithService(modifiedTargetService) - if err != nil { - return nil, err - } + // Edit the external service k8s.Service resource setting it the flow ID + if plugin.Type == "external" { + logrus.Infof("service %s contains an external dependency plugin: %v", targetService.ServiceID, plugin.Name) - // TODO SECTION 3 - handle stateful services - for serviceIdx, service := range topology.Services { - if lo.Contains(statefulServices, service) { - logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID) - // Don't modify the original service - modifiedService := DeepCopyService(service) - modifiedService.Version = flowID + // find the existing external service and update it in the topology to get a new version + externalService, err := topologyRef.GetService(plugin.ServiceName) + if err != nil { + return fmt.Errorf("external service specified by plugin '%v' was not found in base topology", plugin.ServiceName) + } - if !modifiedService.IsStateful { - return nil, fmt.Errorf("service %s is not stateful but is in stateful paths", modifiedService.ServiceID) - } + //err = applyExternalServicePlugin(pluginRunner, targetService, externalService, plugin, pluginIdx, flowID) + //if err != nil { + // return stacktrace.Propagate(err, "An error occurred creating external servie plugin for external service '%v' depended on by '%v'", externalService.ServiceID, targetService.ServiceID) + //} - // Apply a chain of stateful plugins to the stateful service - resultSpec := DeepCopyDeploymentSpec(modifiedService.DeploymentSpec) - for pluginIdx, plugin := range modifiedService.StatefulPlugins { - if plugin.Type == "external" { - // we handle external plugins above - // might need to handle this if stateful services can have external dependencies - continue - } - logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID) - pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx) - spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, *resultSpec, pluginId, plugin.Args) + err = topologyRef.MoveServiceToVersion(externalService, flowID) if err != nil { - return nil, fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err) + return err } - resultSpec = &spec } + } - // Update service with final deployment spec - modifiedService.DeploymentSpec = resultSpec - - topology.Services[serviceIdx] = modifiedService - topology.UpdateDependencies(service, modifiedService) - - // create versioned parents for non http stateful services - // TODO - this should be done for all non http services and not just the stateful ones - // every child should be copied; immediate parent duplicated - // if children of non http services support http then our routing will have to be modified - // we should treat those http services as non http; a hack could be to remove the appProtocol HTTP marking - if !modifiedService.IsHTTP() { - logrus.Infof("Stateful service %s is non http; its parents shall be duplicated", modifiedService.ServiceID) - parents := topology.FindImmediateParents(service) - for _, parent := range parents { - logrus.Infof("Duplicating parent %s", parent.ServiceID) - err = topology.MoveServiceToVersion(parent, flowID) - if err != nil { - return nil, err + // TODO SECTION 2 - Target service updates with new modifications + modifiedTargetService := DeepCopyService(targetService) + modifiedTargetService.DeploymentSpec = servicePatch.DeploymentSpec + modifiedTargetService.Version = flowID + err = topologyRef.UpdateWithService(modifiedTargetService) + if err != nil { + return err + } + + // TODO SECTION 3 - handle stateful services + for serviceIdx, service := range topologyRef.Services { + if lo.Contains(statefulServices, service) { + logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID) + // Don't modify the original service + modifiedService := DeepCopyService(service) + modifiedService.Version = flowID + + if !modifiedService.IsStateful { + return fmt.Errorf("service %s is not stateful but is in stateful paths", modifiedService.ServiceID) + } + + // Apply a chain of stateful plugins to the stateful service + resultSpec := DeepCopyDeploymentSpec(modifiedService.DeploymentSpec) + + // TODO I Think we can remove this part because this plugin was already added above + //for _, plugin := range modifiedService.StatefulPlugins { + //if plugin.Type == "external" { + // we handle external plugins above + // might need to handle this if stateful services can have external dependencies + //continue + //} + + //logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID) + //pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx) + //spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, *resultSpec, pluginId, plugin.Args) + //if err != nil { + // return fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err) + //} + //resultSpec = &spec + //} + + // Update service with final deployment spec + modifiedService.DeploymentSpec = resultSpec + + topologyRef.Services[serviceIdx] = modifiedService + topologyRef.UpdateDependencies(service, modifiedService) + + // create versioned parents for non http stateful services + // TODO - this should be done for all non http services and not just the stateful ones + // every child should be copied; immediate parent duplicated + // if children of non http services support http then our routing will have to be modified + // we should treat those http services as non http; a hack could be to remove the appProtocol HTTP marking + if !modifiedService.IsHTTP() { + logrus.Infof("Stateful service %s is non http; its parents shall be duplicated", modifiedService.ServiceID) + parents := topologyRef.FindImmediateParents(service) + for _, parent := range parents { + logrus.Infof("Duplicating parent %s", parent.ServiceID) + err = topologyRef.MoveServiceToVersion(parent, flowID) + if err != nil { + return err + } } } } - } - // if the service is an external service of the target service, it was already handled above - if lo.Contains(externalServices, service) && !lo.Contains(alreadyHandledExternalServices, service.ServiceID) { - // assume there's only one parent service for now but eventually we'll likely need to account for multiple parents to external service - parentServices := topology.FindImmediateParents(service) - if len(parentServices) == 0 { - return nil, stacktrace.NewError("Expected to find a parent service to the external service '%v' but did not find one. All external services should have a parent so this is a bug in Kardinal.", service.ServiceID) - } - parentService := parentServices[0] - modifiedParentService := DeepCopyService(parentService) - - _, found := lo.Find(parentService.StatefulPlugins, func(plugin *resolved.StatefulPlugin) bool { - return plugin.ServiceName == service.ServiceID - }) - if !found { - return nil, stacktrace.NewError("Did not find plugin on parent service '%v' for the corresponding external service '%v'.This is a bug in Kardinal.", parentService.ServiceID, service.ServiceID) - } + // TODO SECTION 3 - handle external services that are not target service dependencies + // if the service is an external service of the target service, it was already handled above + if lo.Contains(externalServices, service) && !lo.Contains(alreadyHandledExternalServices, service.ServiceID) { + // assume there's only one parent service for now but eventually we'll likely need to account for multiple parents to external service + parentServices := topologyRef.FindImmediateParents(service) + if len(parentServices) == 0 { + return stacktrace.NewError("Expected to find a parent service to the external service '%v' but did not find one. All external services should have a parent so this is a bug in Kardinal.", service.ServiceID) + } + parentService := parentServices[0] + modifiedParentService := DeepCopyService(parentService) + + _, found := lo.Find(parentService.StatefulPlugins, func(plugin *resolved.StatefulPlugin) bool { + return plugin.ServiceName == service.ServiceID + }) + if !found { + return stacktrace.NewError("Did not find plugin on parent service '%v' for the corresponding external service '%v'.This is a bug in Kardinal.", parentService.ServiceID, service.ServiceID) + } + + for _, plugin := range parentService.StatefulPlugins { + // assume there's only one plugin on the parent service for this external service - for pluginIdx, plugin := range parentService.StatefulPlugins { - // assume there's only one plugin on the parent service for this external service - if plugin.ServiceName == service.ServiceID { - err := applyExternalServicePlugin(pluginRunner, parentService, service, plugin, pluginIdx, flowID) - if err != nil { - return nil, stacktrace.Propagate(err, "error creating flow for external service '%s'", service.ServiceID) + // TODO this is adding both kind of plugins stateful and external + alreadyServicesWithPlugin, ok := pluginServices[plugin] + if ok { + pluginServices[plugin] = append(alreadyServicesWithPlugin, parentService) + } else { + pluginServices[plugin] = []*resolved.Service{parentService} } + + //if plugin.ServiceName == service.ServiceID { + //err := applyExternalServicePlugin(pluginRunner, parentService, service, plugin, pluginIdx, flowID) + //if err != nil { + //return stacktrace.Propagate(err, "error creating flow for external service '%s'", service.ServiceID) + //} + //} } - } - // add a flow version of the external service to the plugin - err := topology.MoveServiceToVersion(service, flowID) - if err != nil { - return nil, err - } + // add a flow version of the external service to the plugin + err := topologyRef.MoveServiceToVersion(service, flowID) + if err != nil { + return err + } - // add the parent to the topology replacing the deployment spec with the spec returned from the flow - err = topology.MoveServiceToVersion(modifiedParentService, flowID) - if err != nil { - return nil, err + // add the parent to the topology replacing the deployment spec with the spec returned from the flow + err = topologyRef.MoveServiceToVersion(modifiedParentService, flowID) + if err != nil { + return err + } } } } - return topology, nil -} + // Execute plugins and update the services deployment specs with the plugin's modifications + for plugin, services := range pluginServices { + var servicesIds []string + var servicesServiceSpecs []corev1.ServiceSpec + var servicesDeploymentSpecs []appv1.DeploymentSpec -// TODO: have this handle stateful service plugins -func applyExternalServicePlugin( - pluginRunner *plugins.PluginRunner, - dependentService *resolved.Service, - externalService *resolved.Service, - externalServicePlugin *resolved.StatefulPlugin, - pluginIdx int, - flowId string, -) error { - if externalServicePlugin.Type != "external" { - return nil - } + for _, service := range services { + servicesIds = append(servicesIds, service.ServiceID) + servicesServiceSpecs = append(servicesServiceSpecs, *service.ServiceSpec) + servicesDeploymentSpecs = append(servicesDeploymentSpecs, *service.DeploymentSpec) + } + + pluginId := plugins.GetPluginId2(flowID, servicesIds) + logrus.Infof("Calling plugin '%v'...", pluginId) - logrus.Infof("Calling external service '%v' plugin with parent service '%v'...", externalService.ServiceID, dependentService.ServiceID) - pluginId := plugins.GetPluginId(flowId, dependentService.ServiceID, pluginIdx) - spec, _, err := pluginRunner.CreateFlow(externalServicePlugin.Name, *dependentService.ServiceSpec, *dependentService.DeploymentSpec, pluginId, externalServicePlugin.Args) - if err != nil { - return stacktrace.Propagate(err, "error creating flow for external service '%s'", externalService.ServiceID) + servicesModifiedDeploymentSpecs, _, err := pluginRunner.CreateFlow(plugin.Name, servicesServiceSpecs, servicesDeploymentSpecs, pluginId, plugin.Args) + if err != nil { + return stacktrace.Propagate(err, "error when creating plugin flow for plugin '%s'", pluginId) + } + + if len(services) != len(servicesModifiedDeploymentSpecs) { + return fmt.Errorf("an error occurred executing plugin '%s', the number of deployment specs returned by the plugin.CreateFlow function are not equal to the number of service depending on it, please check the plugin code or report a bug in the Kardinal repository", plugin.ServiceName) + } + + // updating the service.deployment_spec after the plugin execution + for serviceIndex, serviceToUpdate := range services { + modifiedDeploymentSpec := servicesModifiedDeploymentSpecs[serviceIndex] + serviceToUpdate.DeploymentSpec = &modifiedDeploymentSpec + if err := topologyRef.UpdateWithService(serviceToUpdate); err != nil { + return fmt.Errorf("an error occurred updating service '%s'", serviceToUpdate.ServiceID) + } + } } - dependentService.DeploymentSpec = &spec return nil } +// TODO: have this handle stateful service plugins +//func applyExternalServicePlugin( +//pluginRunner *plugins.PluginRunner, +//dependentService *resolved.Service, +//externalService *resolved.Service, +//externalServicePlugin *resolved.StatefulPlugin, +//pluginIdx int, +//flowId string, +//) error { +//if externalServicePlugin.Type != "external" { +//return nil +//} + +//logrus.Infof("Calling external service '%v' plugin with parent service '%v'...", externalService.ServiceID, dependentService.ServiceID) +//pluginId := plugins.GetPluginId(flowId, dependentService.ServiceID, pluginIdx) +//spec, _, err := pluginRunner.CreateFlow(externalServicePlugin.Name, *dependentService.ServiceSpec, *dependentService.DeploymentSpec, pluginId, externalServicePlugin.Args) +//if err != nil { +//return stacktrace.Propagate(err, "error creating flow for external service '%s'", externalService.ServiceID) +//} + +//dependentService.DeploymentSpec = &spec +//return nil +//} + func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTopology, flowId string) error { for _, service := range topology.Services { // don't need to delete flow for services in the topology that aren't a part of this flow diff --git a/kontrol-service/plugins/plugins.go b/kontrol-service/plugins/plugins.go index f66813d..ac0f5ed 100644 --- a/kontrol-service/plugins/plugins.go +++ b/kontrol-service/plugins/plugins.go @@ -119,10 +119,12 @@ func (pr *PluginRunner) DeleteFlow(pluginUrl, flowUuid string, arguments map[str return nil } +// TODO remove this after the DeleteDevFlow refactor func GetPluginId(flowId, serviceId string, pluginIdx int) string { return fmt.Sprintf(pluginIdFmtStr, flowId, serviceId, pluginIdx) } +// TODO rename it to the original name func GetPluginId2(flowId string, serviceIds []string) string { serviceIdsStr := strings.Join(serviceIds, ",") return fmt.Sprintf(pluginIdFmtStr2, flowId, serviceIdsStr) From 02e3a67277af921a9bc328df068c246afa592db8 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 2 Oct 2024 12:59:12 -0300 Subject: [PATCH 08/22] handling services plugins --- kontrol-service/engine/flow/dev_flow.go | 24 ++++++++++++------- .../plugins/git_plugin_provider.go | 1 + 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index 728f78a..82ecdfd 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -141,7 +141,8 @@ func applyPatch( ) error { // TODO could create a custom type for it with a Add method and a Get Method, in order to centralize the addition if someone else want o use it later in another part in the code - pluginServices := map[*resolved.StatefulPlugin][]*resolved.Service{} + pluginServices := map[string][]*resolved.Service{} + pluginServicesMap := map[string]*resolved.StatefulPlugin{} for _, servicePatch := range servicePatches { serviceID := servicePatch.Service @@ -189,12 +190,13 @@ func applyPatch( for _, plugin := range targetService.StatefulPlugins { // TODO this is adding both kind of plugins stateful and external - alreadyServicesWithPlugin, ok := pluginServices[plugin] + alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { - pluginServices[plugin] = append(alreadyServicesWithPlugin, targetService) + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService) } else { - pluginServices[plugin] = []*resolved.Service{targetService} + pluginServices[plugin.ServiceName] = []*resolved.Service{targetService} } + pluginServicesMap[plugin.ServiceName] = plugin // Edit the external service k8s.Service resource setting it the flow ID if plugin.Type == "external" { @@ -305,12 +307,13 @@ func applyPatch( // assume there's only one plugin on the parent service for this external service // TODO this is adding both kind of plugins stateful and external - alreadyServicesWithPlugin, ok := pluginServices[plugin] + alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { - pluginServices[plugin] = append(alreadyServicesWithPlugin, parentService) + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService) } else { - pluginServices[plugin] = []*resolved.Service{parentService} + pluginServices[plugin.ServiceName] = []*resolved.Service{targetService} } + pluginServicesMap[plugin.ServiceName] = plugin //if plugin.ServiceName == service.ServiceID { //err := applyExternalServicePlugin(pluginRunner, parentService, service, plugin, pluginIdx, flowID) @@ -336,11 +339,16 @@ func applyPatch( } // Execute plugins and update the services deployment specs with the plugin's modifications - for plugin, services := range pluginServices { + for pluginServiceName, services := range pluginServices { var servicesIds []string var servicesServiceSpecs []corev1.ServiceSpec var servicesDeploymentSpecs []appv1.DeploymentSpec + plugin, ok := pluginServicesMap[pluginServiceName] + if !ok { + return stacktrace.NewError("expected to find plugin with service name '%s' in the plugins service map, this is a bug in Kardinal", pluginServiceName) + } + for _, service := range services { servicesIds = append(servicesIds, service.ServiceID) servicesServiceSpecs = append(servicesServiceSpecs, *service.ServiceSpec) diff --git a/kontrol-service/plugins/git_plugin_provider.go b/kontrol-service/plugins/git_plugin_provider.go index 3ddf402..e5ebd48 100644 --- a/kontrol-service/plugins/git_plugin_provider.go +++ b/kontrol-service/plugins/git_plugin_provider.go @@ -28,6 +28,7 @@ func (gpp *GitPluginProviderImpl) PullGitHubPlugin(repoPath, repoUrl string) err return fmt.Errorf("git clone failed: %v\nOutput: %s", err, output) } } else { + return nil // TODO remove this line, it's only for testing purpose // If the repository already exists, pull the latest changes cmd := exec.Command("git", "-C", repoPath, "pull") if output, err := cmd.CombinedOutput(); err != nil { From 7fa7fb8d2bbb5e8ac59140655e409539cd3b6f51 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 2 Oct 2024 19:27:14 -0300 Subject: [PATCH 09/22] handling plugins delete flow --- kontrol-service/engine/docker.go | 21 ++++++++--- kontrol-service/engine/flow/dev_flow.go | 48 ++++++++++++++----------- kontrol-service/plugins/plugins.go | 17 ++++++--- 3 files changed, 58 insertions(+), 28 deletions(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index 82f0e57..391c510 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -165,6 +165,13 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri // Second, iterate the services to create the clusterTopology service with partial data (no dependencies set here) for _, serviceConfig := range serviceConfigs { service := serviceConfig.Service + + // A plugin services (k8s services with the kardinal.dev.service/plugin-definition) are used to define a plugin Type and not a k8s service + isPlugin, _ := isPluginService(service) + if isPlugin { + continue + } + serviceAnnotations := service.GetObjectMeta().GetAnnotations() // 1- Service @@ -219,10 +226,8 @@ func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version stri func addAvailablePluginsFromServiceConfig(serviceConfig apitypes.ServiceConfig, availablePlugins map[string]*resolved.StatefulPlugin) (map[string]*resolved.StatefulPlugin, error) { service := serviceConfig.Service - serviceAnnotations := service.GetObjectMeta().GetAnnotations() - - pluginAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugin-definition"] - if ok { + isPlugin, pluginAnnotation := isPluginService(service) + if isPlugin { var statefulPlugins []resolved.StatefulPlugin err := yaml.Unmarshal([]byte(pluginAnnotation), &statefulPlugins) if err != nil { @@ -242,6 +247,14 @@ func addAvailablePluginsFromServiceConfig(serviceConfig apitypes.ServiceConfig, return availablePlugins, nil } +func isPluginService(service corev1.Service) (bool, string) { + serviceAnnotations := service.GetObjectMeta().GetAnnotations() + + pluginAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugin-definition"] + + return ok, pluginAnnotation +} + func newServicePluginsAndExternalServicesFromServiceConfig( serviceConfig apitypes.ServiceConfig, version string, diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index 82ecdfd..9f9cd20 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -244,22 +244,30 @@ func applyPatch( // Apply a chain of stateful plugins to the stateful service resultSpec := DeepCopyDeploymentSpec(modifiedService.DeploymentSpec) - // TODO I Think we can remove this part because this plugin was already added above - //for _, plugin := range modifiedService.StatefulPlugins { - //if plugin.Type == "external" { - // we handle external plugins above - // might need to handle this if stateful services can have external dependencies - //continue - //} + for _, plugin := range modifiedService.StatefulPlugins { + if plugin.Type == "external" { + //we handle external plugins above + //might need to handle this if stateful services can have external dependencies + continue + } - //logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID) - //pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx) - //spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, *resultSpec, pluginId, plugin.Args) - //if err != nil { - // return fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err) - //} - //resultSpec = &spec - //} + // TODO this is adding both kind of plugins stateful and external + alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] + if ok { + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService) + } else { + pluginServices[plugin.ServiceName] = []*resolved.Service{targetService} + } + pluginServicesMap[plugin.ServiceName] = plugin + + //logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID) + //pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx) + //spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, *resultSpec, pluginId, plugin.Args) + //if err != nil { + // return fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err) + //} + //resultSpec = &spec + } // Update service with final deployment spec modifiedService.DeploymentSpec = resultSpec @@ -340,7 +348,7 @@ func applyPatch( // Execute plugins and update the services deployment specs with the plugin's modifications for pluginServiceName, services := range pluginServices { - var servicesIds []string + //var servicesIds []string var servicesServiceSpecs []corev1.ServiceSpec var servicesDeploymentSpecs []appv1.DeploymentSpec @@ -350,12 +358,12 @@ func applyPatch( } for _, service := range services { - servicesIds = append(servicesIds, service.ServiceID) + //servicesIds = append(servicesIds, service.ServiceID) servicesServiceSpecs = append(servicesServiceSpecs, *service.ServiceSpec) servicesDeploymentSpecs = append(servicesDeploymentSpecs, *service.DeploymentSpec) } - pluginId := plugins.GetPluginId2(flowID, servicesIds) + pluginId := plugins.GetPluginId3(plugin.ServiceName, flowID) logrus.Infof("Calling plugin '%v'...", pluginId) servicesModifiedDeploymentSpecs, _, err := pluginRunner.CreateFlow(plugin.Name, servicesServiceSpecs, servicesDeploymentSpecs, pluginId, plugin.Args) @@ -419,9 +427,9 @@ func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTop } func DeleteDevFlow(pluginRunner *plugins.PluginRunner, flowId string, service *resolved.Service) error { - for pluginIdx, plugin := range service.StatefulPlugins { + for _, plugin := range service.StatefulPlugins { logrus.Infof("Attempting to delete flow for plugin '%v' on flow '%v'", plugin.Name, flowId) - pluginId := plugins.GetPluginId(flowId, service.ServiceID, pluginIdx) + pluginId := plugins.GetPluginId3(plugin.ServiceName, flowId) err := pluginRunner.DeleteFlow(plugin.Name, pluginId) if err != nil { logrus.Errorf("Error deleting flow: %v.", err) diff --git a/kontrol-service/plugins/plugins.go b/kontrol-service/plugins/plugins.go index 68649de..7db5878 100644 --- a/kontrol-service/plugins/plugins.go +++ b/kontrol-service/plugins/plugins.go @@ -19,8 +19,12 @@ import ( const ( // -- pluginIdFmtStr = "%s-%s-%d" - // -,, - pluginIdFmtStr2 = "%s-%s" + // TODO use this last one and remove the previous one + // --,, + pluginIdFmtStr2 = "%s-%s-%s" + // TODO use this last one and remove the previous one + // - + pluginIdFmtStr3 = "%s-%s" ) type PluginRunner struct { @@ -125,9 +129,14 @@ func GetPluginId(flowId, serviceId string, pluginIdx int) string { } // TODO rename it to the original name -func GetPluginId2(flowId string, serviceIds []string) string { +func GetPluginId2(pluginServiceName string, flowId string, serviceIds []string) string { serviceIdsStr := strings.Join(serviceIds, ",") - return fmt.Sprintf(pluginIdFmtStr2, flowId, serviceIdsStr) + return fmt.Sprintf(pluginIdFmtStr2, pluginServiceName, flowId, serviceIdsStr) +} + +// TODO rename it to the original name +func GetPluginId3(pluginServiceName string, flowId string) string { + return fmt.Sprintf(pluginIdFmtStr3, pluginServiceName, flowId) } func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) { From 24d0ff3b6b743db15d22301b38d5c55e42c1e2e4 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Thu, 3 Oct 2024 16:11:44 -0300 Subject: [PATCH 10/22] updating target service version in apply patch --- kontrol-service/engine/flow/dev_flow.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index 9f9cd20..d05c353 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -48,8 +48,7 @@ func CreateDevFlow( topologyRef := &topology - clusterGraph := topologyToGraph(topologyRef) - if err := applyPatch(pluginRunner, topologyRef, clusterGraph, flowID, flowPatch.ServicePatches); err != nil { + if err := applyPatch(pluginRunner, topologyRef, flowID, flowPatch.ServicePatches); err != nil { return nil, err } @@ -135,7 +134,6 @@ func markParentsAsShared(topology *resolved.ClusterTopology, service *resolved.S func applyPatch( pluginRunner *plugins.PluginRunner, topologyRef *resolved.ClusterTopology, - clusterGraph graph.Graph[resolved.ServiceHash, *resolved.Service], flowID string, servicePatches []flow_spec.ServicePatch, ) error { @@ -144,6 +142,8 @@ func applyPatch( pluginServices := map[string][]*resolved.Service{} pluginServicesMap := map[string]*resolved.StatefulPlugin{} + clusterGraph := topologyToGraph(topologyRef) + for _, servicePatch := range servicePatches { serviceID := servicePatch.Service logrus.Infof("calculating new flow for service %s", serviceID) @@ -223,8 +223,7 @@ func applyPatch( // TODO SECTION 2 - Target service updates with new modifications modifiedTargetService := DeepCopyService(targetService) modifiedTargetService.DeploymentSpec = servicePatch.DeploymentSpec - modifiedTargetService.Version = flowID - err = topologyRef.UpdateWithService(modifiedTargetService) + err = topologyRef.MoveServiceToVersion(modifiedTargetService, flowID) if err != nil { return err } @@ -379,7 +378,7 @@ func applyPatch( for serviceIndex, serviceToUpdate := range services { modifiedDeploymentSpec := servicesModifiedDeploymentSpecs[serviceIndex] serviceToUpdate.DeploymentSpec = &modifiedDeploymentSpec - if err := topologyRef.UpdateWithService(serviceToUpdate); err != nil { + if err := topologyRef.MoveServiceToVersion(serviceToUpdate, flowID); err != nil { return fmt.Errorf("an error occurred updating service '%s'", serviceToUpdate.ServiceID) } } From 17cab3bdefc29e1df46018fa6a1cba133d500a3b Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Fri, 4 Oct 2024 16:43:17 -0300 Subject: [PATCH 11/22] fix delete flow and fix plugins map used to execute plugins --- kontrol-service/engine/flow/dev_flow.go | 60 ++++++++++++++----------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index c564d87..9cd6026 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -138,7 +138,7 @@ func applyPatch( ) error { // TODO could create a custom type for it with a Add method and a Get Method, in order to centralize the addition if someone else want o use it later in another part in the code - pluginServices := map[string][]*resolved.Service{} + pluginServices := map[string][]string{} pluginServicesMap := map[string]*resolved.StatefulPlugin{} clusterGraph := topologyToGraph(topologyRef) @@ -198,9 +198,9 @@ func applyPatch( // TODO this is adding both kind of plugins stateful and external alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { - pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService) + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService.ServiceID) } else { - pluginServices[plugin.ServiceName] = []*resolved.Service{targetService} + pluginServices[plugin.ServiceName] = []string{targetService.ServiceID} } pluginServicesMap[plugin.ServiceName] = plugin @@ -259,9 +259,9 @@ func applyPatch( // TODO this is adding both kind of plugins stateful and external alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { - pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService) + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, modifiedService.ServiceID) } else { - pluginServices[plugin.ServiceName] = []*resolved.Service{targetService} + pluginServices[plugin.ServiceName] = []string{modifiedService.ServiceID} } pluginServicesMap[plugin.ServiceName] = plugin @@ -329,9 +329,9 @@ func applyPatch( // TODO this is adding both kind of plugins stateful and external alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { - pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService) + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, parentService.ServiceID) } else { - pluginServices[plugin.ServiceName] = []*resolved.Service{targetService} + pluginServices[plugin.ServiceName] = []string{parentService.ServiceID} } pluginServicesMap[plugin.ServiceName] = plugin @@ -359,18 +359,28 @@ func applyPatch( } // Execute plugins and update the services deployment specs with the plugin's modifications - for pluginServiceName, services := range pluginServices { + for pluginServiceName, serviceIds := range pluginServices { var servicesServiceSpecs []corev1.ServiceSpec var servicesWorkloadSpecs []*kardinal.WorkloadSpec + var servicesToUpdate []*resolved.Service plugin, ok := pluginServicesMap[pluginServiceName] if !ok { return stacktrace.NewError("expected to find plugin with service name '%s' in the plugins service map, this is a bug in Kardinal", pluginServiceName) } - for _, service := range services { + if len(serviceIds) == 0 { + return stacktrace.NewError("expected to find at least one service depending on plugin '%s' but none was found, please review your manifest file", plugin.ServiceName) + } + + for _, serviceId := range serviceIds { + service, err := topologyRef.GetService(serviceId) + if err != nil { + return stacktrace.Propagate(err, "an error occurred getting service '%s' from topology", serviceId) + } servicesServiceSpecs = append(servicesServiceSpecs, *service.ServiceSpec) servicesWorkloadSpecs = append(servicesWorkloadSpecs, service.WorkloadSpec) + servicesToUpdate = append(servicesToUpdate, service) } pluginId := plugins.GetPluginId3(plugin.ServiceName, flowID) @@ -381,19 +391,19 @@ func applyPatch( return stacktrace.Propagate(err, "error when creating plugin flow for plugin '%s'", pluginId) } - if len(services) != len(servicesModifiedWorkloadSpecs) { + if len(servicesToUpdate) != len(servicesModifiedWorkloadSpecs) { return fmt.Errorf("an error occurred executing plugin '%s', the number of workload specs returned by the plugin.CreateFlow function are not equal to the number of service depending on it, please check the plugin code or report a bug in the Kardinal repository", plugin.ServiceName) } // updating the service.workload_spec after the plugin execution - for serviceIndex, serviceToUpdate := range services { + for serviceIndex := range serviceIds { + service := servicesToUpdate[serviceIndex] modifiedWorkloadSpec := servicesModifiedWorkloadSpecs[serviceIndex] - serviceToUpdate.WorkloadSpec = modifiedWorkloadSpec - if err := topologyRef.MoveServiceToVersion(serviceToUpdate, flowID); err != nil { - return fmt.Errorf("an error occurred updating service '%s'", serviceToUpdate.ServiceID) + service.WorkloadSpec = modifiedWorkloadSpec + if err = topologyRef.MoveServiceToVersion(service, flowID); err != nil { + return fmt.Errorf("an error occurred updating service '%s'", service.ServiceID) } } - } return nil @@ -424,29 +434,27 @@ func applyPatch( //} func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTopology, flowId string) error { + pluginsToDeleteFromThisFlow := map[string]string{} + for _, service := range topology.Services { // don't need to delete flow for services in the topology that aren't a part of this flow if service.Version != flowId { continue } - err := DeleteDevFlow(pluginRunner, flowId, service) - if err != nil { - return stacktrace.Propagate(err, "An error occurred deleting flow '%v' for service '%v'", flowId, service.ServiceID) + for _, plugin := range service.StatefulPlugins { + pluginId := plugins.GetPluginId3(plugin.ServiceName, flowId) + pluginsToDeleteFromThisFlow[pluginId] = plugin.Name } } - return nil -} -func DeleteDevFlow(pluginRunner *plugins.PluginRunner, flowId string, service *resolved.Service) error { - for _, plugin := range service.StatefulPlugins { - logrus.Infof("Attempting to delete flow for plugin '%v' on flow '%v'", plugin.Name, flowId) - pluginId := plugins.GetPluginId3(plugin.ServiceName, flowId) - err := pluginRunner.DeleteFlow(plugin.Name, pluginId) + for pluginId, pluginName := range pluginsToDeleteFromThisFlow { + err := pluginRunner.DeleteFlow(pluginName, pluginId) if err != nil { logrus.Errorf("Error deleting flow: %v.", err) - return stacktrace.Propagate(err, "An error occurred while trying to call delete flow of plugin '%v' on service '%v' for flow '%v'", plugin.Name, service.ServiceID, flowId) + return stacktrace.Propagate(err, "An error occurred while trying to call delete flow of plugin '%v' for flow '%v'", pluginName, flowId) } } + return nil } From 032b75322f8b603558ea6e89ccdf6cc6a4eac80d Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Fri, 4 Oct 2024 16:48:51 -0300 Subject: [PATCH 12/22] cleaning the implementation --- kontrol-service/engine/flow/dev_flow.go | 59 ++----------------- .../plugins/git_plugin_provider.go | 1 - kontrol-service/plugins/plugins.go | 27 ++------- 3 files changed, 10 insertions(+), 77 deletions(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index 9cd6026..a2bd975 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -190,12 +190,11 @@ func applyPatch( } externalServices = lo.Uniq(externalServices) - // TODO SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID + // SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID // handle external service plugins on this service logrus.Infof("Checking if this service has any external services...") for _, plugin := range targetService.StatefulPlugins { - // TODO this is adding both kind of plugins stateful and external alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService.ServiceID) @@ -214,11 +213,6 @@ func applyPatch( return fmt.Errorf("external service specified by plugin '%v' was not found in base topology", plugin.ServiceName) } - //err = applyExternalServicePlugin(pluginRunner, targetService, externalService, plugin, pluginIdx, flowID) - //if err != nil { - // return stacktrace.Propagate(err, "An error occurred creating external servie plugin for external service '%v' depended on by '%v'", externalService.ServiceID, targetService.ServiceID) - //} - err = topologyRef.MoveServiceToVersion(externalService, flowID) if err != nil { return err @@ -234,7 +228,7 @@ func applyPatch( return err } - // TODO SECTION 3 - handle stateful services + // SECTION 3 - handle stateful services for serviceIdx, service := range topologyRef.Services { if lo.Contains(statefulServices, service) { logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID) @@ -256,7 +250,6 @@ func applyPatch( continue } - // TODO this is adding both kind of plugins stateful and external alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, modifiedService.ServiceID) @@ -264,14 +257,6 @@ func applyPatch( pluginServices[plugin.ServiceName] = []string{modifiedService.ServiceID} } pluginServicesMap[plugin.ServiceName] = plugin - - //logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID) - //pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx) - //spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, *resultSpec, pluginId, plugin.Args) - //if err != nil { - // return fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err) - //} - //resultSpec = &spec } // Update service with final deployment spec @@ -298,7 +283,7 @@ func applyPatch( } } - // TODO SECTION 3 - handle external services that are not target service dependencies + // SECTION 4 - handle external services that are not target service dependencies // if the service is an external service of the target service, it was already handled above if lo.Contains(externalServices, service) && !lo.Contains(alreadyHandledExternalServices, service.ServiceID) { // assume there's only one parent service for now but eventually we'll likely need to account for multiple parents to external service @@ -326,7 +311,6 @@ func applyPatch( return stacktrace.NewError("parent service '%v' does not have a workload spec", targetService.ServiceID) } - // TODO this is adding both kind of plugins stateful and external alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] if ok { pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, parentService.ServiceID) @@ -334,13 +318,6 @@ func applyPatch( pluginServices[plugin.ServiceName] = []string{parentService.ServiceID} } pluginServicesMap[plugin.ServiceName] = plugin - - //if plugin.ServiceName == service.ServiceID { - //err := applyExternalServicePlugin(pluginRunner, parentService, service, plugin, pluginIdx, flowID) - //if err != nil { - //return stacktrace.Propagate(err, "error creating flow for external service '%s'", service.ServiceID) - //} - //} } // add a flow version of the external service to the plugin @@ -358,7 +335,7 @@ func applyPatch( } } - // Execute plugins and update the services deployment specs with the plugin's modifications + // SECTION 5 - Execute plugins and update the services deployment specs with the plugin's modifications for pluginServiceName, serviceIds := range pluginServices { var servicesServiceSpecs []corev1.ServiceSpec var servicesWorkloadSpecs []*kardinal.WorkloadSpec @@ -383,7 +360,7 @@ func applyPatch( servicesToUpdate = append(servicesToUpdate, service) } - pluginId := plugins.GetPluginId3(plugin.ServiceName, flowID) + pluginId := plugins.GetPluginId(plugin.ServiceName, flowID) logrus.Infof("Calling plugin '%v'...", pluginId) servicesModifiedWorkloadSpecs, _, err := pluginRunner.CreateFlow(plugin.Name, servicesServiceSpecs, servicesWorkloadSpecs, pluginId, plugin.Args) @@ -409,30 +386,6 @@ func applyPatch( return nil } -// TODO: have this handle stateful service plugins -//func applyExternalServicePlugin( -//pluginRunner *plugins.PluginRunner, -//dependentService *resolved.Service, -//externalService *resolved.Service, -//externalServicePlugin *resolved.StatefulPlugin, -//pluginIdx int, -//flowId string, -//) error { -//if externalServicePlugin.Type != "external" { -//return nil -//} - -//logrus.Infof("Calling external service '%v' plugin with parent service '%v'...", externalService.ServiceID, dependentService.ServiceID) -//pluginId := plugins.GetPluginId(flowId, dependentService.ServiceID, pluginIdx) -//spec, _, err := pluginRunner.CreateFlow(externalServicePlugin.Name, *dependentService.ServiceSpec, *dependentService.DeploymentSpec, pluginId, externalServicePlugin.Args) -//if err != nil { -//return stacktrace.Propagate(err, "error creating flow for external service '%s'", externalService.ServiceID) -//} - -//dependentService.DeploymentSpec = &spec -//return nil -//} - func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTopology, flowId string) error { pluginsToDeleteFromThisFlow := map[string]string{} @@ -442,7 +395,7 @@ func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTop continue } for _, plugin := range service.StatefulPlugins { - pluginId := plugins.GetPluginId3(plugin.ServiceName, flowId) + pluginId := plugins.GetPluginId(plugin.ServiceName, flowId) pluginsToDeleteFromThisFlow[pluginId] = plugin.Name } } diff --git a/kontrol-service/plugins/git_plugin_provider.go b/kontrol-service/plugins/git_plugin_provider.go index e5ebd48..3ddf402 100644 --- a/kontrol-service/plugins/git_plugin_provider.go +++ b/kontrol-service/plugins/git_plugin_provider.go @@ -28,7 +28,6 @@ func (gpp *GitPluginProviderImpl) PullGitHubPlugin(repoPath, repoUrl string) err return fmt.Errorf("git clone failed: %v\nOutput: %s", err, output) } } else { - return nil // TODO remove this line, it's only for testing purpose // If the repository already exists, pull the latest changes cmd := exec.Command("git", "-C", repoPath, "pull") if output, err := cmd.CombinedOutput(); err != nil { diff --git a/kontrol-service/plugins/plugins.go b/kontrol-service/plugins/plugins.go index 41892e6..2ac74cf 100644 --- a/kontrol-service/plugins/plugins.go +++ b/kontrol-service/plugins/plugins.go @@ -17,14 +17,8 @@ import ( ) const ( - // -- - pluginIdFmtStr = "%s-%s-%d" - // TODO use this last one and remove the previous one - // --,, - pluginIdFmtStr2 = "%s-%s-%s" - // TODO use this last one and remove the previous one // - - pluginIdFmtStr3 = "%s-%s" + pluginIdFmtStr = "%s-%s" ) type PluginRunner struct { @@ -63,7 +57,6 @@ func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpecs []corev1.Servi } serviceSpecsJSONStr := base64.StdEncoding.EncodeToString(serviceSpecsJSON) - podSpecsJSON, err := json.Marshal(podSpecs) if err != nil { return nil, "", fmt.Errorf("failed to marshal pod specs: %v", err) @@ -95,7 +88,7 @@ func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpecs []corev1.Servi if numWorkloadSpecs != numNewPodSpecs { return nil, "", fmt.Errorf("expected to receive '%d' modified pod specs from plugin '%s' execution result but '%d' were received instead, this is a bug in Kardinal", numWorkloadSpecs, flowUuid, numNewPodSpecs) } - for newPodSpecIdx, newPodSpec := range newPodSpecs{ + for newPodSpecIdx, newPodSpec := range newPodSpecs { workloadSpecs[newPodSpecIdx].UpdateTemplateSpec(newPodSpec) } } @@ -145,20 +138,8 @@ func (pr *PluginRunner) DeleteFlow(pluginUrl, flowUuid string) error { return nil } -// TODO remove this after the DeleteDevFlow refactor -func GetPluginId(flowId, serviceId string, pluginIdx int) string { - return fmt.Sprintf(pluginIdFmtStr, flowId, serviceId, pluginIdx) -} - -// TODO rename it to the original name -func GetPluginId2(pluginServiceName string, flowId string, serviceIds []string) string { - serviceIdsStr := strings.Join(serviceIds, ",") - return fmt.Sprintf(pluginIdFmtStr2, pluginServiceName, flowId, serviceIdsStr) -} - -// TODO rename it to the original name -func GetPluginId3(pluginServiceName string, flowId string) string { - return fmt.Sprintf(pluginIdFmtStr3, pluginServiceName, flowId) +func GetPluginId(pluginServiceName string, flowId string) string { + return fmt.Sprintf(pluginIdFmtStr, pluginServiceName, flowId) } func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) { From 120ec6a3da6a1d640ece284798cebb73166644df Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Fri, 4 Oct 2024 17:25:51 -0300 Subject: [PATCH 13/22] update plugin tests --- kontrol-service/plugins/plugins_test.go | 54 ++++++++++++++++++++----- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/kontrol-service/plugins/plugins_test.go b/kontrol-service/plugins/plugins_test.go index 5d44b86..5009760 100644 --- a/kontrol-service/plugins/plugins_test.go +++ b/kontrol-service/plugins/plugins_test.go @@ -20,11 +20,6 @@ const ( flowUuid = "test-flow-uuid" ) - - -// TODO Add a test for checking that different env vars values between deployment specs remains the same after the plugin execution -// TODO this is for testing determinism - var serviceSpecs = []corev1.ServiceSpec{} var deploymentSpecs = []appv1.DeploymentSpec{ @@ -62,12 +57,52 @@ var deploymentSpecs = []appv1.DeploymentSpec{ }, }, }, + { + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "helloworld-version2", + }, + }, + Replicas: int32Ptr(4), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "helloworld-version2", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "helloworld", + Image: "karthequian/helloworld:latest", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "REDIS", + Value: "ip_addr", + }, + { + Name: "FOO", // adding extra env var to compare differences + Value: "bar", + }, + }, + }, + }, + }, + }, + }, } -var workloadSpec = kardinal.NewDeploymentWorkloadSpec(deploymentSpecs[0]) +var workloadSpec1 = kardinal.NewDeploymentWorkloadSpec(deploymentSpecs[0]) +var workloadSpec2 = kardinal.NewDeploymentWorkloadSpec(deploymentSpecs[1]) var workloadSpecs = []*kardinal.WorkloadSpec{ - &workloadSpec, + &workloadSpec1, + &workloadSpec2, } func getPluginRunner(t *testing.T) (*PluginRunner, func() error) { @@ -98,9 +133,10 @@ func TestSimplePlugin(t *testing.T) { updatedDeploymentSpecs, configMap, err := runner.CreateFlow(simplePlugin, serviceSpecs, workloadSpecs, flowUuid, arguments) require.NoError(t, err) - for _, updatedDeploymentSpec := range updatedDeploymentSpecs { + for idx, updatedDeploymentSpec := range updatedDeploymentSpecs { // Check if the deployment spec was updated correctly require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.GetTemplateSpec().Containers[0].Name) + require.Equal(t, workloadSpecs[idx].GetTemplateSpec().Containers[0].Env, updatedDeploymentSpec.GetTemplateSpec().Containers[0].Env) } // Verify the config map @@ -126,7 +162,7 @@ func TestIdentityPlugin(t *testing.T) { require.NoError(t, err) // Check if the deployment spec was updated correctly - require.Equal(t, deploymentSpecs, updatedServiceSpec) + require.Equal(t, workloadSpecs, updatedServiceSpec) // Verify the config map var configMapData map[string]interface{} From 3da135111c2495f25ebd08b583b2249cbaf7cc9d Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Mon, 7 Oct 2024 10:57:09 -0300 Subject: [PATCH 14/22] obd-demo file edited to support new plugins API --- ci/obd-demo.yaml | 193 +++++++++++++++++++++++++++++++++++++---------- ci/template.yaml | 2 +- 2 files changed, 155 insertions(+), 40 deletions(-) diff --git a/ci/obd-demo.yaml b/ci/obd-demo.yaml index 0ecad13..6c9c7c2 100644 --- a/ci/obd-demo.yaml +++ b/ci/obd-demo.yaml @@ -80,6 +80,7 @@ spec: targetPort: 8090 protocol: TCP appProtocol: HTTP + --- apiVersion: apps/v1 kind: Deployment @@ -124,15 +125,12 @@ spec: - name: "Cookie" value: "shop_session-id=x-liveness-probe" env: - - name: ADDRESS - value: ":8080" - - name: FREECURRENCYAPIKEY - value: "fca_live_nFVVF8CvfxqJhzMHB4N2x1NH7ffVVPwZr9hg3iNl" + - name: JSDELIVRAPIKEY + value: "prod" - name: CARTSERVICEHOST value: cartservice - name: PRODUCTCATALOGSERVICEHOST value: productcatalogservice - --- apiVersion: v1 kind: Service @@ -143,12 +141,7 @@ metadata: version: v1 annotations: kardinal.dev.service/dependencies: "productcatalogservice:http,cartservice:http" - kardinal.dev.service/plugins: | - - name: https://github.com/kurtosis-tech/free-currency-api-plugin.git - type: external - servicename: free-currency-api - args: - api_key: fca_live_VKZlykCWEiFcpBHnw74pzd4vLi04q1h9JySbVHDF + kardinal.dev.service/plugins: "jsdelivr-api" spec: type: ClusterIP selector: @@ -210,33 +203,7 @@ metadata: version: v1 annotations: kardinal.dev.service/stateful: "true" - kardinal.dev.service/plugins: | - - name: github.com/kurtosis-tech/postgres-seed-plugin - args: - seed_script: | - -- create the table - CREATE TABLE IF NOT EXISTS public.items( - id bigserial PRIMARY KEY, - created_at TIMESTAMP WITH TIME ZONE, - updated_at TIMESTAMP WITH TIME ZONE, - deleted_at TIMESTAMP WITH TIME ZONE, - user_id TEXT, - product_id TEXT, - quantity INTEGER - ); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1); - - -- Set the sequence to the correct value after inserting records - SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items)); - db_name: "cart" - db_user: "postgresuser" - db_password: "postgrespass" - + kardinal.dev.service/plugins: "postgres-seed-plugin" spec: type: ClusterIP ports: @@ -312,6 +279,44 @@ spec: protocol: TCP appProtocol: HTTP +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: gateway + annotations: + kardinal.dev.service/gateway: "true" +spec: + gatewayClassName: istio + listeners: + - name: default + hostname: "*.app.localhost" + port: 8888 + protocol: HTTP + allowedRoutes: + namespaces: + from: All + +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: http + annotations: + kardinal.dev.service/route: "true" +spec: + parentRefs: + - name: gateway + hostnames: ["prod.app.localhost"] + rules: + - matches: + - path: + type: PathPrefix + value: / + backendRefs: + - name: frontend + port: 80 + --- apiVersion: networking.k8s.io/v1 kind: Ingress @@ -322,7 +327,7 @@ metadata: name: ingress spec: rules: - - host: baseline.app.localhost + - host: web.other.localhost http: paths: - path: / @@ -332,3 +337,113 @@ spec: name: frontend port: number: 80 + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: metrics-v1 + labels: + app: metrics + version: v1 +spec: + selector: + matchLabels: + app: metrics + version: v1 + template: + metadata: + labels: + app: metrics + version: v1 + annotations: + sidecar.istio.io/rewriteAppHTTPProbers: "true" + spec: + containers: + - name: server + image: kurtosistech/metrics:main + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8091 + readinessProbe: + initialDelaySeconds: 10 + httpGet: + path: "/_healthz" + port: 8091 + httpHeaders: + - name: "Cookie" + value: "shop_session-id=x-readiness-probe" + livenessProbe: + initialDelaySeconds: 10 + httpGet: + path: "/_healthz" + port: 8091 + httpHeaders: + - name: "Cookie" + value: "shop_session-id=x-liveness-probe" +--- +apiVersion: v1 +kind: Service +metadata: + name: metrics + labels: + app: metrics + version: v1 +spec: + type: ClusterIP + selector: + app: metrics + ports: + - name: http + port: 8091 + protocol: TCP + appProtocol: HTTP + targetPort: 8091 + +--- +apiVersion: v1 +kind: Service +metadata: + name: jsdelivr-api + annotations: + kardinal.dev.service/plugin-definition: | + - name: github.com/leoporoli/jsdelivr-api-plugin + type: external + servicename: jsdelivr-api + args: + api_key: "dev" + +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres-seed-plugin + annotations: + kardinal.dev.service/plugin-definition: | + - name: github.com/leoporoli/postgres-seed-plugin + type: stateful + servicename: postgres-seed-plugin + args: + seed_script: | + -- create the table + CREATE TABLE IF NOT EXISTS public.items( + id bigserial PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE, + deleted_at TIMESTAMP WITH TIME ZONE, + user_id TEXT, + product_id TEXT, + quantity INTEGER + ); + + INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) + VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1); + + INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) + VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1); + + -- Set the sequence to the correct value after inserting records + SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items)); + db_name: "cart" + db_user: "postgresuser" + db_password: "postgrespass" diff --git a/ci/template.yaml b/ci/template.yaml index 24bc9fa..5e9f7fe 100644 --- a/ci/template.yaml +++ b/ci/template.yaml @@ -5,7 +5,7 @@ metadata: annotations: kardinal.dev.service/shared: "true" kardinal.dev.service/plugins: | - - name: github.com/kurtosis-tech/postgres-seed-plugin + - name: github.com/leoporoli/postgres-seed-plugin args: seed_script: | -- create the table From 5834377170e4045b8f57d0e2882416636e3caaf7 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Mon, 7 Oct 2024 11:08:58 -0300 Subject: [PATCH 15/22] edited obd-demo --- ci/obd-demo.yaml | 102 +---------------------------------------------- 1 file changed, 1 insertion(+), 101 deletions(-) diff --git a/ci/obd-demo.yaml b/ci/obd-demo.yaml index 6c9c7c2..417e5f3 100644 --- a/ci/obd-demo.yaml +++ b/ci/obd-demo.yaml @@ -279,44 +279,6 @@ spec: protocol: TCP appProtocol: HTTP ---- -apiVersion: gateway.networking.k8s.io/v1 -kind: Gateway -metadata: - name: gateway - annotations: - kardinal.dev.service/gateway: "true" -spec: - gatewayClassName: istio - listeners: - - name: default - hostname: "*.app.localhost" - port: 8888 - protocol: HTTP - allowedRoutes: - namespaces: - from: All - ---- -apiVersion: gateway.networking.k8s.io/v1 -kind: HTTPRoute -metadata: - name: http - annotations: - kardinal.dev.service/route: "true" -spec: - parentRefs: - - name: gateway - hostnames: ["prod.app.localhost"] - rules: - - matches: - - path: - type: PathPrefix - value: / - backendRefs: - - name: frontend - port: 80 - --- apiVersion: networking.k8s.io/v1 kind: Ingress @@ -327,7 +289,7 @@ metadata: name: ingress spec: rules: - - host: web.other.localhost + - host: baseline.app.localhost http: paths: - path: / @@ -338,68 +300,6 @@ spec: port: number: 80 ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: metrics-v1 - labels: - app: metrics - version: v1 -spec: - selector: - matchLabels: - app: metrics - version: v1 - template: - metadata: - labels: - app: metrics - version: v1 - annotations: - sidecar.istio.io/rewriteAppHTTPProbers: "true" - spec: - containers: - - name: server - image: kurtosistech/metrics:main - imagePullPolicy: IfNotPresent - ports: - - containerPort: 8091 - readinessProbe: - initialDelaySeconds: 10 - httpGet: - path: "/_healthz" - port: 8091 - httpHeaders: - - name: "Cookie" - value: "shop_session-id=x-readiness-probe" - livenessProbe: - initialDelaySeconds: 10 - httpGet: - path: "/_healthz" - port: 8091 - httpHeaders: - - name: "Cookie" - value: "shop_session-id=x-liveness-probe" ---- -apiVersion: v1 -kind: Service -metadata: - name: metrics - labels: - app: metrics - version: v1 -spec: - type: ClusterIP - selector: - app: metrics - ports: - - name: http - port: 8091 - protocol: TCP - appProtocol: HTTP - targetPort: 8091 - --- apiVersion: v1 kind: Service From a99ecd6b03ff8be10e9fd8f05d4427c9a40ba3a6 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Mon, 7 Oct 2024 11:15:02 -0300 Subject: [PATCH 16/22] ci-e2e-tests edited --- .github/workflows/ci-e2e-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-e2e-tests.yml b/.github/workflows/ci-e2e-tests.yml index bd1fc1b..b704345 100644 --- a/.github/workflows/ci-e2e-tests.yml +++ b/.github/workflows/ci-e2e-tests.yml @@ -74,7 +74,7 @@ jobs: run: | tenant_id=${{ steps.tenant.outputs.id }} nodes=$(curl http://localhost:8080/tenant/${tenant_id}/topology | jq -r '.nodes[].id' | tr " " "\n" | sort -g | tr "\n" " " | xargs) - if [ "${nodes}" != "cartservice free-currency-api frontend ingress postgres productcatalogservice" ]; then exit 1; fi + if [ "${nodes}" != "cartservice frontend ingress jsdelivr-api postgres productcatalogservice" ]; then exit 1; fi - name: Create, validate and delete flow run: | From ace7f77a5fd7bb17d1bf2f2104820f6a9adbfa0d Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Mon, 7 Oct 2024 11:51:05 -0300 Subject: [PATCH 17/22] teplate support new plugins API --- ci/template.yaml | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/ci/template.yaml b/ci/template.yaml index 5e9f7fe..5546b23 100644 --- a/ci/template.yaml +++ b/ci/template.yaml @@ -4,32 +4,4 @@ metadata: name: postgres annotations: kardinal.dev.service/shared: "true" - kardinal.dev.service/plugins: | - - name: github.com/leoporoli/postgres-seed-plugin - args: - seed_script: | - -- create the table - CREATE TABLE IF NOT EXISTS public.items( - id bigserial PRIMARY KEY, - created_at TIMESTAMP WITH TIME ZONE, - updated_at TIMESTAMP WITH TIME ZONE, - deleted_at TIMESTAMP WITH TIME ZONE, - user_id TEXT, - product_id TEXT, - quantity INTEGER - ); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (3, '2024-08-02 13:03:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', ${last_insert_quantity:-1}); - - -- Set the sequence to the correct value after inserting records - SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items)); - db_name: "cart" - db_user: "postgresuser" - db_password: "postgrespass" \ No newline at end of file + kardinal.dev.service/plugins: "postgres-seed-plugin" From 29843bd8a6323f00a1a6c56e1fdab620d8b5be49 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Mon, 7 Oct 2024 15:43:07 -0300 Subject: [PATCH 18/22] fix typo --- kontrol-service/engine/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index 3bad3f5..39f1538 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -248,7 +248,7 @@ func processServiceConfigs( // the servicePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list servicePlugins, externalServices, newExternalServicesDependencies, err := newServicePluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService, availablePlugins) if err != nil { - return nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful availablePlugins and external services from service config '%s'", service.Name) + return nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful plugins and external services from service config '%s'", service.Name) } clusterTopologyService.StatefulPlugins = servicePlugins clusterTopologyServices = append(clusterTopologyServices, externalServices...) From 84cd1e956c8882b12824a46f69d0e8884e75590b Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 9 Oct 2024 10:24:06 -0300 Subject: [PATCH 19/22] fix TestServiceConfigsToClusterTopology test --- kontrol-service/engine/docker.go | 2 +- kontrol-service/engine/docker_test.go | 4 ++- kontrol-service/test/data.go | 38 ++++++++++++++++++++------- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index 39f1538..a0e2fa3 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -407,7 +407,7 @@ func newClusterTopologyServiceFromConfigs( deploymentConfig.Deployment.GetObjectMeta().GetName(), statefulSetConfig.StatefulSet.GetObjectMeta().GetName(), } - logrus.Error("Service %s is associated with more than one workload: %v", serviceName, workloads) + logrus.Errorf("Service %s is associated with more than one workload: %v", serviceName, workloads) } if deploymentConfig != nil { workload := kardinal.NewDeploymentWorkloadSpec(deploymentConfig.Deployment.Spec) diff --git a/kontrol-service/engine/docker_test.go b/kontrol-service/engine/docker_test.go index 3d3602d..70579bb 100644 --- a/kontrol-service/engine/docker_test.go +++ b/kontrol-service/engine/docker_test.go @@ -23,6 +23,8 @@ func TestServiceConfigsToClusterTopology(t *testing.T) { t.Errorf("Error generating cluster: %s", err) } + require.NoError(t, err) + require.NotEmpty(t, cluster) redisProdService := cluster.Services[0] require.Equal(t, redisProdService.ServiceID, "redis-prod") require.Equal(t, redisProdService.IsExternal, false) @@ -36,7 +38,7 @@ func TestServiceConfigsToClusterTopology(t *testing.T) { require.Equal(t, votingAppUIService.ServiceID, "voting-app-ui") require.Equal(t, votingAppUIService.IsExternal, false) require.Equal(t, votingAppUIService.IsStateful, false) - require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[1].Service.Spec) + require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[2].Service.Spec) require.Equal(t, *votingAppUIService.WorkloadSpec.GetTemplateSpec(), testDeploymentConfigs[1].Deployment.Spec.Template.Spec) dependency := cluster.ServiceDependencies[0] diff --git a/kontrol-service/test/data.go b/kontrol-service/test/data.go index 9478f00..ad78a4c 100644 --- a/kontrol-service/test/data.go +++ b/kontrol-service/test/data.go @@ -40,7 +40,6 @@ func GetIngressConfigs() []apitypes.IngressConfig { } func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) { - serviceConfigs := []apitypes.ServiceConfig{} deploymentConfigs := []apitypes.DeploymentConfig{} // Redis prod service @@ -50,9 +49,11 @@ func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) containerImage := "bitnami/redis:6.0.8" containerName := "redis-prod" version := "prod" + pluginServiceName := "redis-prod-plugin" port := int32(6379) portStr := fmt.Sprintf("%d", port) - serviceConfigs = append(serviceConfigs, apitypes.ServiceConfig{ + + redisProdServiceConfig := apitypes.ServiceConfig{ Service: v1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -66,12 +67,7 @@ func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) }, Annotations: map[string]string{ "kardinal.dev.service/stateful": "true", - "kardinal.dev.service/plugins": ` -- name: github.com/kardinaldev/redis-db-sidecar-plugin:36ed9a4 - type: stateful - args: - mode: "pass-through" -`, + "kardinal.dev.service/plugins": pluginServiceName, }, }, Spec: v1.ServiceSpec{ @@ -88,7 +84,31 @@ func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) }, }, }, - }) + } + + pluginServiceConfig := apitypes.ServiceConfig{ + Service: v1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pluginServiceName, + Annotations: map[string]string{ + "kardinal.dev.service/stateful": "true", + "kardinal.dev.service/plugin-definition": ` +- name: github.com/kardinaldev/redis-db-sidecar-plugin:36ed9a4 + type: stateful + servicename: redis-prod-plugin + args: + mode: "pass-through" +`, + }, + }, + }, + } + + serviceConfigs := []apitypes.ServiceConfig{redisProdServiceConfig, pluginServiceConfig} deploymentConfigs = append(deploymentConfigs, apitypes.DeploymentConfig{ Deployment: apps.Deployment{ From 3317561d72d370d8ea35c8edcec222d2f2a5ddaf Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 9 Oct 2024 10:32:30 -0300 Subject: [PATCH 20/22] TODO removed --- kontrol-service/engine/flow/dev_flow.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index a2bd975..4b2310f 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -220,7 +220,7 @@ func applyPatch( } } - // TODO SECTION 2 - Target service updates with new modifications + // SECTION 2 - Target service updates with new modifications modifiedTargetService := DeepCopyService(targetService) modifiedTargetService.WorkloadSpec = servicePatch.WorkloadSpec err = topologyRef.MoveServiceToVersion(modifiedTargetService, flowID) From 1785b15ec40509d4ba912abe37d25f6b751b3ac9 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 9 Oct 2024 10:38:07 -0300 Subject: [PATCH 21/22] fix TestExternalServicesFlowOnDependentService --- kontrol-service/engine/flow/dev_flow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kontrol-service/engine/flow/dev_flow_test.go b/kontrol-service/engine/flow/dev_flow_test.go index 1f647ff..719befa 100644 --- a/kontrol-service/engine/flow/dev_flow_test.go +++ b/kontrol-service/engine/flow/dev_flow_test.go @@ -17,7 +17,7 @@ import ( kardinal "kardinal.kontrol-service/types/kardinal" ) -const dummyPluginName = "https://github.com/h4ck3rk3y/identity-plugin.git" +const dummyPluginName = "https://github.com/fake-org/kardinal-identity-plugin-example.git" func clusterTopologyExample() resolved.ClusterTopology { dummySpec := &kardinal.WorkloadSpec{ From 19f64e6708f38483345f99e3422cc4297a921e06 Mon Sep 17 00:00:00 2001 From: Leandro Poroli Date: Wed, 9 Oct 2024 10:39:19 -0300 Subject: [PATCH 22/22] fix TestHashFunc test --- kontrol-service/types/cluster_topology/resolved/core_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kontrol-service/types/cluster_topology/resolved/core_test.go b/kontrol-service/types/cluster_topology/resolved/core_test.go index 9b5e792..37d4f55 100644 --- a/kontrol-service/types/cluster_topology/resolved/core_test.go +++ b/kontrol-service/types/cluster_topology/resolved/core_test.go @@ -9,7 +9,7 @@ import ( kardinal "kardinal.kontrol-service/types/kardinal" ) -const dummyPluginName = "https://github.com/h4ck3rk3y/identity-plugin.git" +const dummyPluginName = "https://github.com/fake-org/kardinal-identity-plugin-example.git" var ( httpProtocol = "HTTP"