diff --git a/.github/workflows/ci-e2e-tests.yml b/.github/workflows/ci-e2e-tests.yml index bd1fc1b..b704345 100644 --- a/.github/workflows/ci-e2e-tests.yml +++ b/.github/workflows/ci-e2e-tests.yml @@ -74,7 +74,7 @@ jobs: run: | tenant_id=${{ steps.tenant.outputs.id }} nodes=$(curl http://localhost:8080/tenant/${tenant_id}/topology | jq -r '.nodes[].id' | tr " " "\n" | sort -g | tr "\n" " " | xargs) - if [ "${nodes}" != "cartservice free-currency-api frontend ingress postgres productcatalogservice" ]; then exit 1; fi + if [ "${nodes}" != "cartservice frontend ingress jsdelivr-api postgres productcatalogservice" ]; then exit 1; fi - name: Create, validate and delete flow run: | diff --git a/ci/obd-demo.yaml b/ci/obd-demo.yaml index 0ecad13..417e5f3 100644 --- a/ci/obd-demo.yaml +++ b/ci/obd-demo.yaml @@ -80,6 +80,7 @@ spec: targetPort: 8090 protocol: TCP appProtocol: HTTP + --- apiVersion: apps/v1 kind: Deployment @@ -124,15 +125,12 @@ spec: - name: "Cookie" value: "shop_session-id=x-liveness-probe" env: - - name: ADDRESS - value: ":8080" - - name: FREECURRENCYAPIKEY - value: "fca_live_nFVVF8CvfxqJhzMHB4N2x1NH7ffVVPwZr9hg3iNl" + - name: JSDELIVRAPIKEY + value: "prod" - name: CARTSERVICEHOST value: cartservice - name: PRODUCTCATALOGSERVICEHOST value: productcatalogservice - --- apiVersion: v1 kind: Service @@ -143,12 +141,7 @@ metadata: version: v1 annotations: kardinal.dev.service/dependencies: "productcatalogservice:http,cartservice:http" - kardinal.dev.service/plugins: | - - name: https://github.com/kurtosis-tech/free-currency-api-plugin.git - type: external - servicename: free-currency-api - args: - api_key: fca_live_VKZlykCWEiFcpBHnw74pzd4vLi04q1h9JySbVHDF + kardinal.dev.service/plugins: "jsdelivr-api" spec: type: ClusterIP selector: @@ -210,33 +203,7 @@ metadata: version: v1 annotations: kardinal.dev.service/stateful: "true" - kardinal.dev.service/plugins: | - - name: github.com/kurtosis-tech/postgres-seed-plugin - args: - seed_script: | - -- create the table - CREATE TABLE IF NOT EXISTS public.items( - id bigserial PRIMARY KEY, - created_at TIMESTAMP WITH TIME ZONE, - updated_at TIMESTAMP WITH TIME ZONE, - deleted_at TIMESTAMP WITH TIME ZONE, - user_id TEXT, - product_id TEXT, - quantity INTEGER - ); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1); - - -- Set the sequence to the correct value after inserting records - SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items)); - db_name: "cart" - db_user: "postgresuser" - db_password: "postgrespass" - + kardinal.dev.service/plugins: "postgres-seed-plugin" spec: type: ClusterIP ports: @@ -332,3 +299,51 @@ spec: name: frontend port: number: 80 + +--- +apiVersion: v1 +kind: Service +metadata: + name: jsdelivr-api + annotations: + kardinal.dev.service/plugin-definition: | + - name: github.com/leoporoli/jsdelivr-api-plugin + type: external + servicename: jsdelivr-api + args: + api_key: "dev" + +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres-seed-plugin + annotations: + kardinal.dev.service/plugin-definition: | + - name: github.com/leoporoli/postgres-seed-plugin + type: stateful + servicename: postgres-seed-plugin + args: + seed_script: | + -- create the table + CREATE TABLE IF NOT EXISTS public.items( + id bigserial PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE, + deleted_at TIMESTAMP WITH TIME ZONE, + user_id TEXT, + product_id TEXT, + quantity INTEGER + ); + + INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) + VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1); + + INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) + VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1); + + -- Set the sequence to the correct value after inserting records + SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items)); + db_name: "cart" + db_user: "postgresuser" + db_password: "postgrespass" diff --git a/ci/template.yaml b/ci/template.yaml index 24bc9fa..5546b23 100644 --- a/ci/template.yaml +++ b/ci/template.yaml @@ -4,32 +4,4 @@ metadata: name: postgres annotations: kardinal.dev.service/shared: "true" - kardinal.dev.service/plugins: | - - name: github.com/kurtosis-tech/postgres-seed-plugin - args: - seed_script: | - -- create the table - CREATE TABLE IF NOT EXISTS public.items( - id bigserial PRIMARY KEY, - created_at TIMESTAMP WITH TIME ZONE, - updated_at TIMESTAMP WITH TIME ZONE, - deleted_at TIMESTAMP WITH TIME ZONE, - user_id TEXT, - product_id TEXT, - quantity INTEGER - ); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1); - - INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity) - VALUES (3, '2024-08-02 13:03:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', ${last_insert_quantity:-1}); - - -- Set the sequence to the correct value after inserting records - SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items)); - db_name: "cart" - db_user: "postgresuser" - db_password: "postgrespass" \ No newline at end of file + kardinal.dev.service/plugins: "postgres-seed-plugin" diff --git a/kontrol-service/engine/docker.go b/kontrol-service/engine/docker.go index 98efd64..a0e2fa3 100644 --- a/kontrol-service/engine/docker.go +++ b/kontrol-service/engine/docker.go @@ -201,18 +201,37 @@ func processServiceConfigs( statefulSetConfigs []apitypes.StatefulSetConfig, version string, ) ([]*resolved.Service, []resolved.ServiceDependency, error) { + var err error clusterTopologyServices := []*resolved.Service{} clusterTopologyServiceDependencies := []resolved.ServiceDependency{} externalServicesDependencies := []resolved.ServiceDependency{} + availablePlugins := map[string]*resolved.StatefulPlugin{} type serviceWithDependenciesAnnotation struct { service *resolved.Service dependenciesAnnotation string } - serviceWithDependencies := []*serviceWithDependenciesAnnotation{} + servicesWithDependencies := []*serviceWithDependenciesAnnotation{} + // First, iterate the services to get all the available plugins + for _, serviceConfig := range serviceConfigs { + // availablePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list + availablePlugins, err = addAvailablePluginsFromServiceConfig(serviceConfig, availablePlugins) + if err != nil { + return nil, nil, stacktrace.Propagate(err, "An error occurred while parsing plugin '%s'", serviceConfig.Service.GetName()) + } + } + + // Second, iterate the services to create the clusterTopology service with partial data (no dependencies set here) for _, serviceConfig := range serviceConfigs { service := serviceConfig.Service + + // A plugin services (k8s services with the kardinal.dev.service/plugin-definition) are used to define a plugin Type and not a k8s service + isPlugin, _ := isPluginService(service) + if isPlugin { + continue + } + serviceAnnotations := service.GetObjectMeta().GetAnnotations() // 1- Service @@ -225,12 +244,13 @@ func processServiceConfigs( return nil, nil, stacktrace.Propagate(error, "An error occurred creating new cluster topology service from service config '%s'", service.Name) } - // 2- Service plugins - serviceStatefulPlugins, externalServices, newExternalServicesDependencies, err := newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService) + // 2- Plugins + // the servicePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list + servicePlugins, externalServices, newExternalServicesDependencies, err := newServicePluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService, availablePlugins) if err != nil { return nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful plugins and external services from service config '%s'", service.Name) } - clusterTopologyService.StatefulPlugins = serviceStatefulPlugins + clusterTopologyService.StatefulPlugins = servicePlugins clusterTopologyServices = append(clusterTopologyServices, externalServices...) externalServicesDependencies = append(externalServicesDependencies, newExternalServicesDependencies...) @@ -238,16 +258,16 @@ func processServiceConfigs( dependencies, ok := serviceAnnotations["kardinal.dev.service/dependencies"] if ok { newServiceWithDependenciesAnnotation := &serviceWithDependenciesAnnotation{&clusterTopologyService, dependencies} - serviceWithDependencies = append(serviceWithDependencies, newServiceWithDependenciesAnnotation) + servicesWithDependencies = append(servicesWithDependencies, newServiceWithDependenciesAnnotation) } clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService) } - // Set the service dependencies in the clusterTopologyService - // first iterate on the service with dependencies list - for _, svcWithDependenciesAnnotation := range serviceWithDependencies { + // Third, set the service dependencies in the clusterTopologyService + // a) iterate on the service with dependencies list + for _, serviceWithDependencies := range servicesWithDependencies { - serviceAndPorts := strings.Split(svcWithDependenciesAnnotation.dependenciesAnnotation, ",") + serviceAndPorts := strings.Split(serviceWithDependencies.dependenciesAnnotation, ",") for _, serviceAndPort := range serviceAndPorts { serviceAndPortParts := strings.Split(serviceAndPort, ":") depService, depServicePort, err := getServiceAndPortFromClusterTopologyServices(serviceAndPortParts[0], serviceAndPortParts[1], clusterTopologyServices) @@ -256,7 +276,7 @@ func processServiceConfigs( } serviceDependency := resolved.ServiceDependency{ - Service: svcWithDependenciesAnnotation.service, + Service: serviceWithDependencies.service, DependsOnService: depService, DependencyPort: depServicePort, } @@ -264,33 +284,72 @@ func processServiceConfigs( clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency) } } - // then add the external services dependencies + // b) add the external services dependencies clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServicesDependencies...) return clusterTopologyServices, clusterTopologyServiceDependencies, nil } -func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string, clusterTopologyService *resolved.Service) ([]*resolved.StatefulPlugin, []*resolved.Service, []resolved.ServiceDependency, error) { - var serviceStatefulPlugins []*resolved.StatefulPlugin +func addAvailablePluginsFromServiceConfig(serviceConfig apitypes.ServiceConfig, availablePlugins map[string]*resolved.StatefulPlugin) (map[string]*resolved.StatefulPlugin, error) { + service := serviceConfig.Service + isPlugin, pluginAnnotation := isPluginService(service) + if isPlugin { + var statefulPlugins []resolved.StatefulPlugin + err := yaml.Unmarshal([]byte(pluginAnnotation), &statefulPlugins) + if err != nil { + return nil, stacktrace.Propagate(err, "an error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) + } + + for index := range statefulPlugins { + plugin := statefulPlugins[index] + _, found := availablePlugins[plugin.ServiceName] + if found { + return nil, stacktrace.NewError("a plugin with service name '%s' already exists, the `plugin.servicename` value has to be unique", plugin.ServiceName) + } + availablePlugins[plugin.ServiceName] = &plugin + } + } + + return availablePlugins, nil +} + +func isPluginService(service corev1.Service) (bool, string) { + serviceAnnotations := service.GetObjectMeta().GetAnnotations() + + pluginAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugin-definition"] + + return ok, pluginAnnotation +} + +func newServicePluginsAndExternalServicesFromServiceConfig( + serviceConfig apitypes.ServiceConfig, + version string, + clusterTopologyService *resolved.Service, + availablePlugins map[string]*resolved.StatefulPlugin, +) ( + []*resolved.StatefulPlugin, + []*resolved.Service, + []resolved.ServiceDependency, + error, +) { + servicePlugins := []*resolved.StatefulPlugin{} externalServices := []*resolved.Service{} externalServiceDependencies := []resolved.ServiceDependency{} service := serviceConfig.Service serviceAnnotations := service.GetObjectMeta().GetAnnotations() - sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] + pluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"] if ok { - var statefulPlugins []resolved.StatefulPlugin - err := yaml.Unmarshal([]byte(sPluginsAnnotation), &statefulPlugins) - if err != nil { - return nil, nil, nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName()) - } - serviceStatefulPlugins = make([]*resolved.StatefulPlugin, len(statefulPlugins)) - - for index := range statefulPlugins { + pluginsServiceName := strings.Split(pluginsAnnotation, ",") + for _, pluginSvcName := range pluginsServiceName { + plugin, ok := availablePlugins[pluginSvcName] + if !ok { + return nil, nil, nil, stacktrace.NewError("expected to find plugin with service name %s but it is not available, make sure to add the resource for it in the manifest file", pluginSvcName) + } + servicePlugins = append(servicePlugins, plugin) // TODO: consider giving external service plugins their own type, instead of using StatefulPlugins // if this is an external service plugin, represent that service as a service in the cluster topology - plugin := statefulPlugins[index] if plugin.Type == "external" { logrus.Infof("Adding external service to topology..") serviceName := plugin.ServiceName @@ -318,11 +377,10 @@ func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apityp } externalServiceDependencies = append(externalServiceDependencies, externalServiceDependency) } - serviceStatefulPlugins[index] = &plugin } } - return serviceStatefulPlugins, externalServices, externalServiceDependencies, nil + return servicePlugins, externalServices, externalServiceDependencies, nil } func newClusterTopologyServiceFromConfigs( @@ -349,7 +407,7 @@ func newClusterTopologyServiceFromConfigs( deploymentConfig.Deployment.GetObjectMeta().GetName(), statefulSetConfig.StatefulSet.GetObjectMeta().GetName(), } - logrus.Error("Service %s is associated with more than one workload: %v", serviceName, workloads) + logrus.Errorf("Service %s is associated with more than one workload: %v", serviceName, workloads) } if deploymentConfig != nil { workload := kardinal.NewDeploymentWorkloadSpec(deploymentConfig.Deployment.Spec) diff --git a/kontrol-service/engine/docker_test.go b/kontrol-service/engine/docker_test.go index 3d3602d..70579bb 100644 --- a/kontrol-service/engine/docker_test.go +++ b/kontrol-service/engine/docker_test.go @@ -23,6 +23,8 @@ func TestServiceConfigsToClusterTopology(t *testing.T) { t.Errorf("Error generating cluster: %s", err) } + require.NoError(t, err) + require.NotEmpty(t, cluster) redisProdService := cluster.Services[0] require.Equal(t, redisProdService.ServiceID, "redis-prod") require.Equal(t, redisProdService.IsExternal, false) @@ -36,7 +38,7 @@ func TestServiceConfigsToClusterTopology(t *testing.T) { require.Equal(t, votingAppUIService.ServiceID, "voting-app-ui") require.Equal(t, votingAppUIService.IsExternal, false) require.Equal(t, votingAppUIService.IsStateful, false) - require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[1].Service.Spec) + require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[2].Service.Spec) require.Equal(t, *votingAppUIService.WorkloadSpec.GetTemplateSpec(), testDeploymentConfigs[1].Deployment.Spec.Template.Spec) dependency := cluster.ServiceDependencies[0] diff --git a/kontrol-service/engine/flow/dev_flow.go b/kontrol-service/engine/flow/dev_flow.go index a61c16b..4b2310f 100644 --- a/kontrol-service/engine/flow/dev_flow.go +++ b/kontrol-service/engine/flow/dev_flow.go @@ -2,6 +2,8 @@ package flow import ( "fmt" + corev1 "k8s.io/api/core/v1" + kardinal "kardinal.kontrol-service/types/kardinal" "kardinal.kontrol-service/constants" @@ -14,7 +16,6 @@ import ( "kardinal.kontrol-service/plugins" "kardinal.kontrol-service/types/cluster_topology/resolved" "kardinal.kontrol-service/types/flow_spec" - kardinal "kardinal.kontrol-service/types/kardinal" ) // CreateDevFlow creates a dev flow from the given topologies @@ -47,18 +48,8 @@ func CreateDevFlow( topologyRef := &topology - clusterGraph := topologyToGraph(topologyRef) - for _, servicePatch := range flowPatch.ServicePatches { - serviceID := servicePatch.Service - logrus.Infof("calculating new flow for service %s", serviceID) - targetService, err := topologyRef.GetService(serviceID) - if err != nil { - return nil, err - } - _, err = applyPatch(pluginRunner, topologyRef, clusterGraph, flowID, targetService, servicePatch.WorkloadSpec) - if err != nil { - return nil, err - } + if err := applyPatch(pluginRunner, topologyRef, flowID, flowPatch.ServicePatches); err != nil { + return nil, err } // the baseline topology flow ID and flow version are equal to the namespace these three should use same value @@ -141,228 +132,282 @@ func markParentsAsShared(topology *resolved.ClusterTopology, service *resolved.S func applyPatch( pluginRunner *plugins.PluginRunner, - topology *resolved.ClusterTopology, - clusterGraph graph.Graph[resolved.ServiceHash, *resolved.Service], + topologyRef *resolved.ClusterTopology, flowID string, - targetService *resolved.Service, - workloadSpec *kardinal.WorkloadSpec, -) (*resolved.ClusterTopology, error) { - // Find downstream stateful services - statefulPaths := findAllDownstreamStatefulPaths(targetService, clusterGraph, topology) - statefulServices := make([]*resolved.Service, 0) - for _, path := range statefulPaths { - statefulServiceHash, err := lo.Last(path) - if statefulServiceHash == "" || err != nil { - logrus.Infof("Error finding last stateful service hash in path %v: %v", path, err) - } - statefulService, err := clusterGraph.Vertex(statefulServiceHash) + servicePatches []flow_spec.ServicePatch, +) error { + + // TODO could create a custom type for it with a Add method and a Get Method, in order to centralize the addition if someone else want o use it later in another part in the code + pluginServices := map[string][]string{} + pluginServicesMap := map[string]*resolved.StatefulPlugin{} + + clusterGraph := topologyToGraph(topologyRef) + + for _, servicePatch := range servicePatches { + serviceID := servicePatch.Service + logrus.Infof("calculating new flow for service %s", serviceID) + targetService, err := topologyRef.GetService(serviceID) if err != nil { - return nil, fmt.Errorf("an error occurred getting stateful service vertex from graph: %s", err) - } - statefulServices = append(statefulServices, statefulService) - } - statefulServices = lo.Uniq(statefulServices) - - externalPaths := findAllDownstreamExternalPaths(targetService, clusterGraph, topology) - externalServices := make([]*resolved.Service, 0) - alreadyHandledExternalServices := make([]string, 0) - for _, path := range externalPaths { - externalServiceHash, err := lo.Last(path) - if externalServiceHash == "" || err != nil { - logrus.Infof("Error finding last external service hash in path %v: %v", path, err) + return err } - externalService, err := clusterGraph.Vertex(externalServiceHash) - if err != nil { - return nil, fmt.Errorf("an error occurred getting external service vertex from graph: %s", err) + if targetService.ServiceSpec == nil { + return stacktrace.NewError("service '%v' does not have a service spec", targetService.ServiceID) } - externalServices = append(externalServices, externalService) - } - externalServices = lo.Uniq(externalServices) - // handle external service plugins on this service - logrus.Infof("Checking if this service has any external services...") - for pluginIdx, plugin := range targetService.StatefulPlugins { - if plugin.Type == "external" { - logrus.Infof("service %s contains an external dependency plugin: %v", targetService.ServiceID, plugin.Name) + if targetService.WorkloadSpec == nil { + return stacktrace.NewError("service '%v' does not have a workload spec", targetService.ServiceID) + } - // find the existing external service and update it in the topology to get a new version - externalService, err := topology.GetService(plugin.ServiceName) - if err != nil { - return nil, fmt.Errorf("external service specified by plugin '%v' was not found in base topology.", plugin.ServiceName) + // Find downstream stateful services + statefulPaths := findAllDownstreamStatefulPaths(targetService, clusterGraph, topologyRef) + statefulServices := make([]*resolved.Service, 0) + for _, path := range statefulPaths { + statefulServiceHash, err := lo.Last(path) + if statefulServiceHash == "" || err != nil { + logrus.Infof("Error finding last stateful service hash in path %v: %v", path, err) } - - err = applyExternalServicePlugin(pluginRunner, targetService, externalService, plugin, pluginIdx, flowID) + statefulService, err := clusterGraph.Vertex(statefulServiceHash) if err != nil { - return nil, stacktrace.Propagate(err, "An error occurred creating external servie plugin for external service '%v' depended on by '%v'", externalService.ServiceID, targetService.ServiceID) + return fmt.Errorf("an error occurred getting stateful service vertex from graph: %s", err) } - - err = topology.MoveServiceToVersion(externalService, flowID) + statefulServices = append(statefulServices, statefulService) + } + statefulServices = lo.Uniq(statefulServices) + + externalPaths := findAllDownstreamExternalPaths(targetService, clusterGraph, topologyRef) + externalServices := make([]*resolved.Service, 0) + alreadyHandledExternalServices := make([]string, 0) + for _, path := range externalPaths { + externalServiceHash, err := lo.Last(path) + if externalServiceHash == "" || err != nil { + logrus.Infof("Error finding last external service hash in path %v: %v", path, err) + } + externalService, err := clusterGraph.Vertex(externalServiceHash) if err != nil { - return nil, err + return fmt.Errorf("an error occurred getting external service vertex from graph: %s", err) } + externalServices = append(externalServices, externalService) } - } + externalServices = lo.Uniq(externalServices) + + // SECTION 1 - Create external plugins and move the external K8s Service to a new version with the FlowID + // handle external service plugins on this service + logrus.Infof("Checking if this service has any external services...") + for _, plugin := range targetService.StatefulPlugins { + + alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] + if ok { + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, targetService.ServiceID) + } else { + pluginServices[plugin.ServiceName] = []string{targetService.ServiceID} + } + pluginServicesMap[plugin.ServiceName] = plugin - modifiedTargetService := DeepCopyService(targetService) - modifiedTargetService.WorkloadSpec = workloadSpec - modifiedTargetService.Version = flowID - err := topology.UpdateWithService(modifiedTargetService) - if err != nil { - return nil, err - } + // Edit the external service k8s.Service resource setting it the flow ID + if plugin.Type == "external" { + logrus.Infof("service %s contains an external dependency plugin: %v", targetService.ServiceID, plugin.Name) - for serviceIdx, service := range topology.Services { - if lo.Contains(statefulServices, service) { - logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID) - // Don't modify the original service - modifiedService := DeepCopyService(service) - modifiedService.Version = flowID + // find the existing external service and update it in the topology to get a new version + externalService, err := topologyRef.GetService(plugin.ServiceName) + if err != nil { + return fmt.Errorf("external service specified by plugin '%v' was not found in base topology", plugin.ServiceName) + } - if !modifiedService.IsStateful { - panic(fmt.Sprintf("Service %s is not stateful but is in stateful paths", modifiedService.ServiceID)) + err = topologyRef.MoveServiceToVersion(externalService, flowID) + if err != nil { + return err + } } + } - // Apply a chain of stateful plugins to the stateful service - resultSpec := modifiedService.WorkloadSpec.DeepCopy() - for pluginIdx, plugin := range modifiedService.StatefulPlugins { - if plugin.Type == "external" { - // we handle external plugins above - // might need to handle this if stateful services can have external dependencies - continue + // SECTION 2 - Target service updates with new modifications + modifiedTargetService := DeepCopyService(targetService) + modifiedTargetService.WorkloadSpec = servicePatch.WorkloadSpec + err = topologyRef.MoveServiceToVersion(modifiedTargetService, flowID) + if err != nil { + return err + } + + // SECTION 3 - handle stateful services + for serviceIdx, service := range topologyRef.Services { + if lo.Contains(statefulServices, service) { + logrus.Debugf("applying stateful plugins on service: %s", service.ServiceID) + // Don't modify the original service + modifiedService := DeepCopyService(service) + modifiedService.Version = flowID + + if !modifiedService.IsStateful { + return fmt.Errorf("service %s is not stateful but is in stateful paths", modifiedService.ServiceID) } - logrus.Infof("Applying plugin %s for service %s with flow id %s", plugin.Name, modifiedService.ServiceID, flowID) - pluginId := plugins.GetPluginId(flowID, modifiedService.ServiceID, pluginIdx) - spec, _, err := pluginRunner.CreateFlow(plugin.Name, *modifiedService.ServiceSpec, resultSpec, pluginId, plugin.Args) - if err != nil { - return nil, fmt.Errorf("error creating flow for service %s: %v", modifiedService.ServiceID, err) + + // Apply a chain of stateful plugins to the stateful service + resultSpec := modifiedService.WorkloadSpec.DeepCopy() + + for _, plugin := range modifiedService.StatefulPlugins { + if plugin.Type == "external" { + //we handle external plugins above + //might need to handle this if stateful services can have external dependencies + continue + } + + alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] + if ok { + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, modifiedService.ServiceID) + } else { + pluginServices[plugin.ServiceName] = []string{modifiedService.ServiceID} + } + pluginServicesMap[plugin.ServiceName] = plugin } - resultSpec = spec - } - // Update service with final deployment spec - modifiedService.WorkloadSpec = resultSpec - - topology.Services[serviceIdx] = modifiedService - topology.UpdateDependencies(service, modifiedService) - - // create versioned parents for non http stateful services - // TODO - this should be done for all non http services and not just the stateful ones - // every child should be copied; immediate parent duplicated - // if children of non http services support http then our routing will have to be modified - // we should treat those http services as non http; a hack could be to remove the appProtocol HTTP marking - if !modifiedService.IsHTTP() { - logrus.Infof("Stateful service %s is non http; its parents shall be duplicated", modifiedService.ServiceID) - parents := topology.FindImmediateParents(service) - for _, parent := range parents { - logrus.Infof("Duplicating parent %s", parent.ServiceID) - err = topology.MoveServiceToVersion(parent, flowID) - if err != nil { - return nil, err + // Update service with final deployment spec + modifiedService.WorkloadSpec = resultSpec + + topologyRef.Services[serviceIdx] = modifiedService + topologyRef.UpdateDependencies(service, modifiedService) + + // create versioned parents for non http stateful services + // TODO - this should be done for all non http services and not just the stateful ones + // every child should be copied; immediate parent duplicated + // if children of non http services support http then our routing will have to be modified + // we should treat those http services as non http; a hack could be to remove the appProtocol HTTP marking + if !modifiedService.IsHTTP() { + logrus.Infof("Stateful service %s is non http; its parents shall be duplicated", modifiedService.ServiceID) + parents := topologyRef.FindImmediateParents(service) + for _, parent := range parents { + logrus.Infof("Duplicating parent %s", parent.ServiceID) + err = topologyRef.MoveServiceToVersion(parent, flowID) + if err != nil { + return err + } } } } - } - // if the service is an external service of the target service, it was already handled above - if lo.Contains(externalServices, service) && !lo.Contains(alreadyHandledExternalServices, service.ServiceID) { - // assume there's only one parent service for now but eventually we'll likely need to account for multiple parents to external service - parentServices := topology.FindImmediateParents(service) - if len(parentServices) == 0 { - return nil, stacktrace.NewError("Expected to find a parent service to the external service '%v' but did not find one. All external services should have a parent so this is a bug in Kardinal.", service.ServiceID) - } - parentService := parentServices[0] - modifiedParentService := DeepCopyService(parentService) - - _, found := lo.Find(parentService.StatefulPlugins, func(plugin *resolved.StatefulPlugin) bool { - return plugin.ServiceName == service.ServiceID - }) - if !found { - return nil, stacktrace.NewError("Did not find plugin on parent service '%v' for the corresponding external service '%v'.This is a bug in Kardinal.", parentService.ServiceID, service.ServiceID) - } + // SECTION 4 - handle external services that are not target service dependencies + // if the service is an external service of the target service, it was already handled above + if lo.Contains(externalServices, service) && !lo.Contains(alreadyHandledExternalServices, service.ServiceID) { + // assume there's only one parent service for now but eventually we'll likely need to account for multiple parents to external service + parentServices := topologyRef.FindImmediateParents(service) + if len(parentServices) == 0 { + return stacktrace.NewError("Expected to find a parent service to the external service '%v' but did not find one. All external services should have a parent so this is a bug in Kardinal.", service.ServiceID) + } + parentService := parentServices[0] + modifiedParentService := DeepCopyService(parentService) + + _, found := lo.Find(parentService.StatefulPlugins, func(plugin *resolved.StatefulPlugin) bool { + return plugin.ServiceName == service.ServiceID + }) + if !found { + return stacktrace.NewError("Did not find plugin on parent service '%v' for the corresponding external service '%v'.This is a bug in Kardinal.", parentService.ServiceID, service.ServiceID) + } + + for _, plugin := range parentService.StatefulPlugins { + // assume there's only one plugin on the parent service for this external service + if parentService.ServiceSpec == nil { + return stacktrace.NewError("parent service '%v' does not have a service spec", parentService.ServiceID) + } - for pluginIdx, plugin := range parentService.StatefulPlugins { - // assume there's only one plugin on the parent service for this external service - if plugin.ServiceName == service.ServiceID { - err := applyExternalServicePlugin(pluginRunner, parentService, service, plugin, pluginIdx, flowID) - if err != nil { - return nil, stacktrace.Propagate(err, "error creating flow for external service '%s'", service.ServiceID) + if parentService.WorkloadSpec == nil { + return stacktrace.NewError("parent service '%v' does not have a workload spec", targetService.ServiceID) } + + alreadyServicesWithPlugin, ok := pluginServices[plugin.ServiceName] + if ok { + pluginServices[plugin.ServiceName] = append(alreadyServicesWithPlugin, parentService.ServiceID) + } else { + pluginServices[plugin.ServiceName] = []string{parentService.ServiceID} + } + pluginServicesMap[plugin.ServiceName] = plugin } - } - // add a flow version of the external service to the plugin - err := topology.MoveServiceToVersion(service, flowID) - if err != nil { - return nil, err - } + // add a flow version of the external service to the plugin + err := topologyRef.MoveServiceToVersion(service, flowID) + if err != nil { + return err + } - // add the parent to the topology replacing the deployment spec with the spec returned from the flow - err = topology.MoveServiceToVersion(modifiedParentService, flowID) - if err != nil { - return nil, err + // add the parent to the topology replacing the deployment spec with the spec returned from the flow + err = topologyRef.MoveServiceToVersion(modifiedParentService, flowID) + if err != nil { + return err + } } } } - return topology, nil -} + // SECTION 5 - Execute plugins and update the services deployment specs with the plugin's modifications + for pluginServiceName, serviceIds := range pluginServices { + var servicesServiceSpecs []corev1.ServiceSpec + var servicesWorkloadSpecs []*kardinal.WorkloadSpec + var servicesToUpdate []*resolved.Service -// TODO: have this handle stateful service plugins -func applyExternalServicePlugin( - pluginRunner *plugins.PluginRunner, - dependentService *resolved.Service, - externalService *resolved.Service, - externalServicePlugin *resolved.StatefulPlugin, - pluginIdx int, - flowId string, -) error { - if externalServicePlugin.Type != "external" { - return nil - } + plugin, ok := pluginServicesMap[pluginServiceName] + if !ok { + return stacktrace.NewError("expected to find plugin with service name '%s' in the plugins service map, this is a bug in Kardinal", pluginServiceName) + } - if dependentService.ServiceSpec == nil { - return stacktrace.NewError("Dependent service '%v' does not have a service spec", dependentService.ServiceID) - } + if len(serviceIds) == 0 { + return stacktrace.NewError("expected to find at least one service depending on plugin '%s' but none was found, please review your manifest file", plugin.ServiceName) + } - if dependentService.WorkloadSpec == nil { - return stacktrace.NewError("Dependent service '%v' does not have a workload spec", dependentService.ServiceID) - } + for _, serviceId := range serviceIds { + service, err := topologyRef.GetService(serviceId) + if err != nil { + return stacktrace.Propagate(err, "an error occurred getting service '%s' from topology", serviceId) + } + servicesServiceSpecs = append(servicesServiceSpecs, *service.ServiceSpec) + servicesWorkloadSpecs = append(servicesWorkloadSpecs, service.WorkloadSpec) + servicesToUpdate = append(servicesToUpdate, service) + } + + pluginId := plugins.GetPluginId(plugin.ServiceName, flowID) + logrus.Infof("Calling plugin '%v'...", pluginId) - logrus.Infof("Calling external service '%v' plugin with parent service '%v'...", externalService.ServiceID, dependentService.ServiceID) - pluginId := plugins.GetPluginId(flowId, dependentService.ServiceID, pluginIdx) - spec, _, err := pluginRunner.CreateFlow(externalServicePlugin.Name, *dependentService.ServiceSpec, dependentService.WorkloadSpec, pluginId, externalServicePlugin.Args) - if err != nil { - return stacktrace.Propagate(err, "error creating flow for external service '%s'", externalService.ServiceID) + servicesModifiedWorkloadSpecs, _, err := pluginRunner.CreateFlow(plugin.Name, servicesServiceSpecs, servicesWorkloadSpecs, pluginId, plugin.Args) + if err != nil { + return stacktrace.Propagate(err, "error when creating plugin flow for plugin '%s'", pluginId) + } + + if len(servicesToUpdate) != len(servicesModifiedWorkloadSpecs) { + return fmt.Errorf("an error occurred executing plugin '%s', the number of workload specs returned by the plugin.CreateFlow function are not equal to the number of service depending on it, please check the plugin code or report a bug in the Kardinal repository", plugin.ServiceName) + } + + // updating the service.workload_spec after the plugin execution + for serviceIndex := range serviceIds { + service := servicesToUpdate[serviceIndex] + modifiedWorkloadSpec := servicesModifiedWorkloadSpecs[serviceIndex] + service.WorkloadSpec = modifiedWorkloadSpec + if err = topologyRef.MoveServiceToVersion(service, flowID); err != nil { + return fmt.Errorf("an error occurred updating service '%s'", service.ServiceID) + } + } } - dependentService.WorkloadSpec = spec return nil } func DeleteFlow(pluginRunner *plugins.PluginRunner, topology resolved.ClusterTopology, flowId string) error { + pluginsToDeleteFromThisFlow := map[string]string{} + for _, service := range topology.Services { // don't need to delete flow for services in the topology that aren't a part of this flow if service.Version != flowId { continue } - err := DeleteDevFlow(pluginRunner, flowId, service) - if err != nil { - return stacktrace.Propagate(err, "An error occurred deleting flow '%v' for service '%v'", flowId, service.ServiceID) + for _, plugin := range service.StatefulPlugins { + pluginId := plugins.GetPluginId(plugin.ServiceName, flowId) + pluginsToDeleteFromThisFlow[pluginId] = plugin.Name } } - return nil -} -func DeleteDevFlow(pluginRunner *plugins.PluginRunner, flowId string, service *resolved.Service) error { - for pluginIdx, plugin := range service.StatefulPlugins { - logrus.Infof("Attempting to delete flow for plugin '%v' on flow '%v'", plugin.Name, flowId) - pluginId := plugins.GetPluginId(flowId, service.ServiceID, pluginIdx) - err := pluginRunner.DeleteFlow(plugin.Name, pluginId) + for pluginId, pluginName := range pluginsToDeleteFromThisFlow { + err := pluginRunner.DeleteFlow(pluginName, pluginId) if err != nil { logrus.Errorf("Error deleting flow: %v.", err) - return stacktrace.Propagate(err, "An error occurred while trying to call delete flow of plugin '%v' on service '%v' for flow '%v'", plugin.Name, service.ServiceID, flowId) + return stacktrace.Propagate(err, "An error occurred while trying to call delete flow of plugin '%v' for flow '%v'", pluginName, flowId) } } + return nil } diff --git a/kontrol-service/engine/flow/dev_flow_test.go b/kontrol-service/engine/flow/dev_flow_test.go index 1f647ff..719befa 100644 --- a/kontrol-service/engine/flow/dev_flow_test.go +++ b/kontrol-service/engine/flow/dev_flow_test.go @@ -17,7 +17,7 @@ import ( kardinal "kardinal.kontrol-service/types/kardinal" ) -const dummyPluginName = "https://github.com/h4ck3rk3y/identity-plugin.git" +const dummyPluginName = "https://github.com/fake-org/kardinal-identity-plugin-example.git" func clusterTopologyExample() resolved.ClusterTopology { dummySpec := &kardinal.WorkloadSpec{ diff --git a/kontrol-service/plugins/mock_github.go b/kontrol-service/plugins/mock_github.go index 51bba52..1372451 100644 --- a/kontrol-service/plugins/mock_github.go +++ b/kontrol-service/plugins/mock_github.go @@ -2,62 +2,81 @@ package plugins var MockGitHub = map[string]map[string]string{ // Simple Plugin - "https://github.com/h4ck3rk3y/a-test-plugin.git": { - "main.py": `REPLACED = "the-text-has-been-replaced" + "https://github.com/fake-org/kardinal-simple-plugin-example.git": { + "main.py": `import copy + +REPLACED = "the-text-has-been-replaced" + + +def create_flow(service_specs: list, pod_specs: list, flow_uuid, text_to_replace): + modified_pod_specs = [] + + for pod_spec in pod_specs: + modified_pod_spec = copy.deepcopy(pod_spec) + modified_pod_spec['containers'][0]['name'] = modified_pod_spec['containers'][0]['name'].replace(text_to_replace, REPLACED) + + modified_pod_specs.append(modified_pod_spec) -def create_flow(service_spec, pod_spec, flow_uuid, text_to_replace): - pod_spec['containers'][0]['name'] = pod_spec['containers'][0]['name'].replace(text_to_replace, REPLACED) - config_map = { "original_text": text_to_replace } - + return { - "pod_spec": pod_spec, + "pod_specs": modified_pod_specs, "config_map": config_map } + def delete_flow(config_map, flow_uuid): print(config_map["original_text"]) `, }, // Complex Plugin - "https://github.com/h4ck3rk3y/slightly-more-complex-plugin.git": { - "main.py": `import json + "https://github.com/fake-org/kardinal-slightly-more-complex-plugin-example.git": { + "main.py": `import copy import requests -def create_flow(service_spec, pod_spec, flow_uuid): + +def create_flow(service_specs: list, pod_specs: list, flow_uuid): response = requests.get("https://4.ident.me") if response.status_code != 200: raise Exception("An unexpected error occurred") - + ip_address = response.text.strip() - - # Replace the IP address in the environment variable - for container in pod_spec['containers']: - for env in container['env']: - if env['name'] == 'REDIS': - env['value'] = ip_address - + + modified_pod_specs = [] + + for pod_spec in pod_specs: + modified_pod_spec = copy.deepcopy(pod_spec) + # Replace the IP address in the environment variable + for container in modified_pod_spec['containers']: + for env in container['env']: + if env['name'] == 'REDIS': + env['value'] = ip_address + + modified_pod_specs.append(modified_pod_spec) + + config_map = { "original_value": "ip_addr" } - + return { - "pod_spec": pod_spec, + "pod_specs": modified_pod_specs, "config_map": config_map } + def delete_flow(config_map, flow_uuid): # In this complex plugin, we don't need to do anything for deletion return None`, "requirements.txt": "requests", }, // Identity Plugin - "https://github.com/h4ck3rk3y/identity-plugin.git": { - "main.py": `def create_flow(service_spec, pod_spec, flow_uuid): + "https://github.com/fake-org/kardinal-identity-plugin-example.git": { + "main.py": `def create_flow(service_specs, pod_specs, flow_uuid): return { - "pod_spec": pod_spec, + "pod_specs": pod_specs, "config_map": {} } @@ -66,16 +85,28 @@ def delete_flow(config_map, flow_uuid): `, }, // Redis sidecar plugin - "https://github.com/h4ck3rk3y/redis-sidecar-plugin.git": { - "main.py": `def create_flow(service_spec, pod_spec, flow_uuid): - pod_spec['containers'][0]["image"] = "kurtosistech/redis-proxy-overlay:latest" + "https://github.com/fake-org/kardinal-redis-sidecar-plugin-example.git": { + "main.py": `import copy + + +def create_flow(service_specs, pod_specs, flow_uuid): + + modified_pod_specs = [] + + for pod_spec in pod_specs: + modified_pod_spec = copy.deepcopy(pod_spec) + modified_pod_spec['containers'][0]["image"] = "kurtosistech/redis-proxy-overlay:latest" + + modified_pod_specs.append(modified_pod_spec) + return { - "pod_spec": pod_spec, + "pod_specs": modified_pod_specs, "config_map": {} } def delete_flow(config_map, flow_uuid): pass + `, }, } diff --git a/kontrol-service/plugins/plugins.go b/kontrol-service/plugins/plugins.go index 29a322e..2ac74cf 100644 --- a/kontrol-service/plugins/plugins.go +++ b/kontrol-service/plugins/plugins.go @@ -17,8 +17,8 @@ import ( ) const ( - // -- - pluginIdFmtStr = "%s-%s-%d" + // - + pluginIdFmtStr = "%s-%s" ) type PluginRunner struct { @@ -37,25 +37,33 @@ func NewPluginRunner(gitPluginProvider GitPluginProvider, tenantId string, db *d } } -func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpec corev1.ServiceSpec, originalWorkloadSpec *kardinal.WorkloadSpec, flowUuid string, arguments map[string]string) (*kardinal.WorkloadSpec, string, error) { - workloadSpec := originalWorkloadSpec.DeepCopy() +func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpecs []corev1.ServiceSpec, originalWorkloadSpecs []*kardinal.WorkloadSpec, flowUuid string, arguments map[string]string) ([]*kardinal.WorkloadSpec, string, error) { + var workloadSpecs []*kardinal.WorkloadSpec + var podSpecs []*v1.PodSpec + for _, originalWorkloadSpec := range originalWorkloadSpecs { + workloadSpec := originalWorkloadSpec.DeepCopy() + workloadSpecs = append(workloadSpecs, workloadSpec) + podSpecs = append(podSpecs, workloadSpec.GetTemplateSpec()) + } repoPath, err := pr.getOrCloneRepo(pluginUrl) if err != nil { return nil, "", fmt.Errorf("failed to get or clone repository: %v", err) } - serviceSpecJSON, err := json.Marshal(serviceSpec) + serviceSpecsJSON, err := json.Marshal(serviceSpecs) if err != nil { - return nil, "", fmt.Errorf("failed to marshal service spec: %v", err) + return nil, "", fmt.Errorf("failed to marshal service specs: %v", err) } + serviceSpecsJSONStr := base64.StdEncoding.EncodeToString(serviceSpecsJSON) - deploymentSpecJSON, err := json.Marshal(workloadSpec.GetTemplateSpec()) + podSpecsJSON, err := json.Marshal(podSpecs) if err != nil { - return nil, "", fmt.Errorf("failed to marshal deployment spec: %v", err) + return nil, "", fmt.Errorf("failed to marshal pod specs: %v", err) } + podSpecsJSONStr := base64.StdEncoding.EncodeToString(podSpecsJSON) - result, err := runPythonCreateFlow(repoPath, string(serviceSpecJSON), string(deploymentSpecJSON), flowUuid, arguments) + result, err := runPythonCreateFlow(repoPath, serviceSpecsJSONStr, podSpecsJSONStr, flowUuid, arguments) if err != nil { return nil, "", err } @@ -66,16 +74,23 @@ func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpec corev1.ServiceS return nil, "", fmt.Errorf("failed to parse result: %v", err) } - if resultMap["pod_spec"] == nil { - return nil, "", fmt.Errorf("no pod_spec found in plugin result") + if resultMap["pod_specs"] == nil { + return nil, "", fmt.Errorf("no pod_specs found in plugin result") } else { - var newDeploymentSpec v1.PodSpec - err = json.Unmarshal(resultMap["pod_spec"], &newDeploymentSpec) + var newPodSpecs []v1.PodSpec + err = json.Unmarshal(resultMap["pod_specs"], &newPodSpecs) if err != nil { - logrus.Errorf("Failed to unmarshal pod spec: %v", string(resultMap["pod_spec"])) - return nil, "", fmt.Errorf("failed to unmarshal deployment spec: %v", err) + logrus.Errorf("Failed to unmarshal pod specs: %v", string(resultMap["pod_specs"])) + return nil, "", fmt.Errorf("failed to unmarshal pod specs: %v", err) + } + numWorkloadSpecs := len(workloadSpecs) + numNewPodSpecs := len(newPodSpecs) + if numWorkloadSpecs != numNewPodSpecs { + return nil, "", fmt.Errorf("expected to receive '%d' modified pod specs from plugin '%s' execution result but '%d' were received instead, this is a bug in Kardinal", numWorkloadSpecs, flowUuid, numNewPodSpecs) + } + for newPodSpecIdx, newPodSpec := range newPodSpecs { + workloadSpecs[newPodSpecIdx].UpdateTemplateSpec(newPodSpec) } - workloadSpec.UpdateTemplateSpec(newDeploymentSpec) } configMapJSON := resultMap["config_map"] @@ -96,7 +111,7 @@ func (pr *PluginRunner) CreateFlow(pluginUrl string, serviceSpec corev1.ServiceS return nil, "", fmt.Errorf("failed to store the config map: %v", err) } - return workloadSpec, string(configMapBytes), nil + return workloadSpecs, string(configMapBytes), nil } func (pr *PluginRunner) DeleteFlow(pluginUrl, flowUuid string) error { @@ -123,8 +138,8 @@ func (pr *PluginRunner) DeleteFlow(pluginUrl, flowUuid string) error { return nil } -func GetPluginId(flowId, serviceId string, pluginIdx int) string { - return fmt.Sprintf(pluginIdFmtStr, flowId, serviceId, pluginIdx) +func GetPluginId(pluginServiceName string, flowId string) string { + return fmt.Sprintf(pluginIdFmtStr, pluginServiceName, flowId) } func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) { @@ -138,7 +153,7 @@ func (pr *PluginRunner) getConfigForFlow(flowUuid string) (string, error) { return pluginConfig.Config, nil } -func runPythonCreateFlow(repoPath, serviceSpecJSON, deploymentSpecJSON, flowUuid string, arguments map[string]string) (string, error) { +func runPythonCreateFlow(repoPath, serviceSpecsJSONStr, podSpecsJSONStr, flowUuid string, arguments map[string]string) (string, error) { scriptPath := filepath.Join(repoPath, "main.py") if _, err := os.Stat(scriptPath); os.IsNotExist(err) { @@ -178,8 +193,10 @@ import base64 sys.path.append("%s") import main -service_spec = json.loads('''%s''') -pod_spec = json.loads('''%s''') +service_specs_json = base64.b64decode('%s').decode('utf-8') +service_specs = json.loads(service_specs_json) +pod_specs_json = base64.b64decode('%s').decode('utf-8') +pod_specs = json.loads(pod_specs_json) flow_uuid = %q args_json = base64.b64decode('%s').decode('utf-8') args = json.loads(args_json) @@ -190,10 +207,10 @@ sig = inspect.signature(main.create_flow) # Prepare kwargs based on the function signature kwargs = {} for param in sig.parameters.values(): - if param.name == 'service_spec': - kwargs['service_spec'] = service_spec - elif param.name == 'pod_spec': - kwargs['pod_spec'] = pod_spec + if param.name == 'service_specs': + kwargs['service_specs'] = service_specs + elif param.name == 'pod_specs': + kwargs['pod_specs'] = pod_specs elif param.name == 'flow_uuid': kwargs['flow_uuid'] = flow_uuid elif param.name in args: @@ -209,7 +226,7 @@ result = main.create_flow(**kwargs) # Write the result to a temporary file with open('%s', 'w') as f: json.dump(result, f) -`, repoPath, serviceSpecJSON, deploymentSpecJSON, flowUuid, argJsonStr, tempResultFile.Name()) +`, repoPath, serviceSpecsJSONStr, podSpecsJSONStr, flowUuid, argJsonStr, tempResultFile.Name()) if err := executePythonScript(venvPath, repoPath, tempScript); err != nil { return "", err diff --git a/kontrol-service/plugins/plugins_test.go b/kontrol-service/plugins/plugins_test.go index 1c9bb5f..5009760 100644 --- a/kontrol-service/plugins/plugins_test.go +++ b/kontrol-service/plugins/plugins_test.go @@ -13,42 +13,82 @@ import ( ) const ( - simplePlugin = "https://github.com/h4ck3rk3y/a-test-plugin.git" - complexPlugin = "https://github.com/h4ck3rk3y/slightly-more-complex-plugin.git" - identityPlugin = "https://github.com/h4ck3rk3y/identity-plugin.git" - redisPlugin = "https://github.com/h4ck3rk3y/redis-sidecar-plugin.git" + simplePlugin = "https://github.com/fake-org/kardinal-simple-plugin-example.git" + complexPlugin = "https://github.com/fake-org/kardinal-slightly-more-complex-plugin-example.git" + identityPlugin = "https://github.com/fake-org/kardinal-identity-plugin-example.git" + redisPlugin = "https://github.com/fake-org/kardinal-redis-sidecar-plugin-example.git" flowUuid = "test-flow-uuid" ) -var serviceSpec = corev1.ServiceSpec{} +var serviceSpecs = []corev1.ServiceSpec{} -var deploymentSpec = appv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "helloworld", - }, - }, - Replicas: int32Ptr(1), - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ +var deploymentSpecs = []appv1.DeploymentSpec{ + { + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ "app": "helloworld", }, }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "helloworld", - Image: "karthequian/helloworld:latest", - Ports: []corev1.ContainerPort{ - { - ContainerPort: 80, + Replicas: int32Ptr(1), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "helloworld", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "helloworld", + Image: "karthequian/helloworld:latest", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "REDIS", + Value: "ip_addr", + }, }, }, - Env: []corev1.EnvVar{ - { - Name: "REDIS", - Value: "ip_addr", + }, + }, + }, + }, + { + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "helloworld-version2", + }, + }, + Replicas: int32Ptr(4), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "helloworld-version2", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "helloworld", + Image: "karthequian/helloworld:latest", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "REDIS", + Value: "ip_addr", + }, + { + Name: "FOO", // adding extra env var to compare differences + Value: "bar", + }, }, }, }, @@ -57,7 +97,13 @@ var deploymentSpec = appv1.DeploymentSpec{ }, } -var workloadSpec = kardinal.NewDeploymentWorkloadSpec(deploymentSpec) +var workloadSpec1 = kardinal.NewDeploymentWorkloadSpec(deploymentSpecs[0]) +var workloadSpec2 = kardinal.NewDeploymentWorkloadSpec(deploymentSpecs[1]) + +var workloadSpecs = []*kardinal.WorkloadSpec{ + &workloadSpec1, + &workloadSpec2, +} func getPluginRunner(t *testing.T) (*PluginRunner, func() error) { db, cleanUpDbFunc, err := database.NewSQLiteDB() @@ -84,11 +130,14 @@ func TestSimplePlugin(t *testing.T) { "text_to_replace": "helloworld", } - updatedDeploymentSpec, configMap, err := runner.CreateFlow(simplePlugin, serviceSpec, &workloadSpec, flowUuid, arguments) + updatedDeploymentSpecs, configMap, err := runner.CreateFlow(simplePlugin, serviceSpecs, workloadSpecs, flowUuid, arguments) require.NoError(t, err) - // Check if the deployment spec was updated correctly - require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.GetTemplateSpec().Containers[0].Name) + for idx, updatedDeploymentSpec := range updatedDeploymentSpecs { + // Check if the deployment spec was updated correctly + require.Equal(t, "the-text-has-been-replaced", updatedDeploymentSpec.GetTemplateSpec().Containers[0].Name) + require.Equal(t, workloadSpecs[idx].GetTemplateSpec().Containers[0].Env, updatedDeploymentSpec.GetTemplateSpec().Containers[0].Env) + } // Verify the config map var configMapData map[string]interface{} @@ -109,11 +158,11 @@ func TestIdentityPlugin(t *testing.T) { runner, cleanUpDbFunc := getPluginRunner(t) defer cleanUpDbFunc() - updatedServiceSpec, configMap, err := runner.CreateFlow(identityPlugin, serviceSpec, &workloadSpec, flowUuid, map[string]string{}) + updatedServiceSpec, configMap, err := runner.CreateFlow(identityPlugin, serviceSpecs, workloadSpecs, flowUuid, map[string]string{}) require.NoError(t, err) // Check if the deployment spec was updated correctly - require.Equal(t, workloadSpec, *updatedServiceSpec) + require.Equal(t, workloadSpecs, updatedServiceSpec) // Verify the config map var configMapData map[string]interface{} @@ -134,12 +183,14 @@ func TestComplexPlugin(t *testing.T) { runner, cleanUpDbFunc := getPluginRunner(t) defer cleanUpDbFunc() - updatedServiceSpec, configMap, err := runner.CreateFlow(complexPlugin, serviceSpec, &workloadSpec, flowUuid, map[string]string{}) + updatedDeploymentSpecs, configMap, err := runner.CreateFlow(complexPlugin, serviceSpecs, workloadSpecs, flowUuid, map[string]string{}) require.NoError(t, err) - // Check if the deployment spec was updated correctly - require.NotEqual(t, "ip_addr", updatedServiceSpec.GetTemplateSpec().Containers[0].Env[0].Value) - require.Regexp(t, `\b(?:\d{1,3}\.){3}\d{1,3}\b`, updatedServiceSpec.GetTemplateSpec().Containers[0].Env[0].Value) + for _, updatedDeploymentSpec := range updatedDeploymentSpecs { + // Check if the deployment spec was updated correctly + require.NotEqual(t, "ip_addr", updatedDeploymentSpec.GetTemplateSpec().Containers[0].Env[0].Value) + require.Regexp(t, `\b(?:\d{1,3}\.){3}\d{1,3}\b`, updatedDeploymentSpec.GetTemplateSpec().Containers[0].Env[0].Value) + } // Verify the config map var configMapData map[string]interface{} @@ -160,11 +211,13 @@ func TestRedisPluginTest(t *testing.T) { runner, cleanUpDbFunc := getPluginRunner(t) defer cleanUpDbFunc() - updatedServiceSpec, configMap, err := runner.CreateFlow(redisPlugin, serviceSpec, &workloadSpec, flowUuid, map[string]string{}) + updatedDeploymentSpecs, configMap, err := runner.CreateFlow(redisPlugin, serviceSpecs, workloadSpecs, flowUuid, map[string]string{}) require.NoError(t, err) - // Check if the deployment spec was updated correctly - require.Equal(t, "kurtosistech/redis-proxy-overlay:latest", updatedServiceSpec.GetTemplateSpec().Containers[0].Image) + for _, updatedDeploymentSpec := range updatedDeploymentSpecs { + // Check if the deployment spec was updated correctly + require.Equal(t, "kurtosistech/redis-proxy-overlay:latest", updatedDeploymentSpec.GetTemplateSpec().Containers[0].Image) + } // Verify the config map var configMapData map[string]interface{} diff --git a/kontrol-service/test/data.go b/kontrol-service/test/data.go index 9478f00..ad78a4c 100644 --- a/kontrol-service/test/data.go +++ b/kontrol-service/test/data.go @@ -40,7 +40,6 @@ func GetIngressConfigs() []apitypes.IngressConfig { } func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) { - serviceConfigs := []apitypes.ServiceConfig{} deploymentConfigs := []apitypes.DeploymentConfig{} // Redis prod service @@ -50,9 +49,11 @@ func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) containerImage := "bitnami/redis:6.0.8" containerName := "redis-prod" version := "prod" + pluginServiceName := "redis-prod-plugin" port := int32(6379) portStr := fmt.Sprintf("%d", port) - serviceConfigs = append(serviceConfigs, apitypes.ServiceConfig{ + + redisProdServiceConfig := apitypes.ServiceConfig{ Service: v1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -66,12 +67,7 @@ func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) }, Annotations: map[string]string{ "kardinal.dev.service/stateful": "true", - "kardinal.dev.service/plugins": ` -- name: github.com/kardinaldev/redis-db-sidecar-plugin:36ed9a4 - type: stateful - args: - mode: "pass-through" -`, + "kardinal.dev.service/plugins": pluginServiceName, }, }, Spec: v1.ServiceSpec{ @@ -88,7 +84,31 @@ func GetServiceConfigs() ([]apitypes.ServiceConfig, []apitypes.DeploymentConfig) }, }, }, - }) + } + + pluginServiceConfig := apitypes.ServiceConfig{ + Service: v1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pluginServiceName, + Annotations: map[string]string{ + "kardinal.dev.service/stateful": "true", + "kardinal.dev.service/plugin-definition": ` +- name: github.com/kardinaldev/redis-db-sidecar-plugin:36ed9a4 + type: stateful + servicename: redis-prod-plugin + args: + mode: "pass-through" +`, + }, + }, + }, + } + + serviceConfigs := []apitypes.ServiceConfig{redisProdServiceConfig, pluginServiceConfig} deploymentConfigs = append(deploymentConfigs, apitypes.DeploymentConfig{ Deployment: apps.Deployment{ diff --git a/kontrol-service/types/cluster_topology/resolved/core_test.go b/kontrol-service/types/cluster_topology/resolved/core_test.go index 9b5e792..37d4f55 100644 --- a/kontrol-service/types/cluster_topology/resolved/core_test.go +++ b/kontrol-service/types/cluster_topology/resolved/core_test.go @@ -9,7 +9,7 @@ import ( kardinal "kardinal.kontrol-service/types/kardinal" ) -const dummyPluginName = "https://github.com/h4ck3rk3y/identity-plugin.git" +const dummyPluginName = "https://github.com/fake-org/kardinal-identity-plugin-example.git" var ( httpProtocol = "HTTP"