Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
feat!: multiple services depending on same plugin implementation (#61)
Browse files Browse the repository at this point in the history
* refactor generateClusterTopology to improve legibility

* parsing services with kardinal.dev.service/plugin-definition annotation and seting those in the cluster topology services

* using plugin service name to identify them

* small change

* adding comments in the dev flow

* update the plugin.CreateFlow signature to support multiple deployment specs comming from different services

* refactor how to execute plugins

* handling services plugins

* handling plugins delete flow

* updating target service version in apply patch

* fix delete flow and fix plugins map used to execute plugins

* cleaning the implementation

* update plugin tests

* obd-demo file edited to support new plugins API

* edited obd-demo

* ci-e2e-tests edited

* teplate support new plugins API

* fix typo

* fix TestServiceConfigsToClusterTopology test

* TODO removed

* fix TestExternalServicesFlowOnDependentService

* fix TestHashFunc test
  • Loading branch information
leoporoli authored Oct 10, 2024
1 parent 7d6f89e commit 3fce223
Show file tree
Hide file tree
Showing 12 changed files with 593 additions and 380 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
run: |
tenant_id=${{ steps.tenant.outputs.id }}
nodes=$(curl http://localhost:8080/tenant/${tenant_id}/topology | jq -r '.nodes[].id' | tr " " "\n" | sort -g | tr "\n" " " | xargs)
if [ "${nodes}" != "cartservice free-currency-api frontend ingress postgres productcatalogservice" ]; then exit 1; fi
if [ "${nodes}" != "cartservice frontend ingress jsdelivr-api postgres productcatalogservice" ]; then exit 1; fi
- name: Create, validate and delete flow
run: |
Expand Down
91 changes: 53 additions & 38 deletions ci/obd-demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ spec:
targetPort: 8090
protocol: TCP
appProtocol: HTTP

---
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -124,15 +125,12 @@ spec:
- name: "Cookie"
value: "shop_session-id=x-liveness-probe"
env:
- name: ADDRESS
value: ":8080"
- name: FREECURRENCYAPIKEY
value: "fca_live_nFVVF8CvfxqJhzMHB4N2x1NH7ffVVPwZr9hg3iNl"
- name: JSDELIVRAPIKEY
value: "prod"
- name: CARTSERVICEHOST
value: cartservice
- name: PRODUCTCATALOGSERVICEHOST
value: productcatalogservice

---
apiVersion: v1
kind: Service
Expand All @@ -143,12 +141,7 @@ metadata:
version: v1
annotations:
kardinal.dev.service/dependencies: "productcatalogservice:http,cartservice:http"
kardinal.dev.service/plugins: |
- name: https://github.com/kurtosis-tech/free-currency-api-plugin.git
type: external
servicename: free-currency-api
args:
api_key: fca_live_VKZlykCWEiFcpBHnw74pzd4vLi04q1h9JySbVHDF
kardinal.dev.service/plugins: "jsdelivr-api"
spec:
type: ClusterIP
selector:
Expand Down Expand Up @@ -210,33 +203,7 @@ metadata:
version: v1
annotations:
kardinal.dev.service/stateful: "true"
kardinal.dev.service/plugins: |
- name: github.com/kurtosis-tech/postgres-seed-plugin
args:
seed_script: |
-- create the table
CREATE TABLE IF NOT EXISTS public.items(
id bigserial PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
deleted_at TIMESTAMP WITH TIME ZONE,
user_id TEXT,
product_id TEXT,
quantity INTEGER
);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1);
-- Set the sequence to the correct value after inserting records
SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items));
db_name: "cart"
db_user: "postgresuser"
db_password: "postgrespass"
kardinal.dev.service/plugins: "postgres-seed-plugin"
spec:
type: ClusterIP
ports:
Expand Down Expand Up @@ -332,3 +299,51 @@ spec:
name: frontend
port:
number: 80

---
apiVersion: v1
kind: Service
metadata:
name: jsdelivr-api
annotations:
kardinal.dev.service/plugin-definition: |
- name: github.com/leoporoli/jsdelivr-api-plugin
type: external
servicename: jsdelivr-api
args:
api_key: "dev"
---
apiVersion: v1
kind: Service
metadata:
name: postgres-seed-plugin
annotations:
kardinal.dev.service/plugin-definition: |
- name: github.com/leoporoli/postgres-seed-plugin
type: stateful
servicename: postgres-seed-plugin
args:
seed_script: |
-- create the table
CREATE TABLE IF NOT EXISTS public.items(
id bigserial PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
deleted_at TIMESTAMP WITH TIME ZONE,
user_id TEXT,
product_id TEXT,
quantity INTEGER
);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1);
-- Set the sequence to the correct value after inserting records
SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items));
db_name: "cart"
db_user: "postgresuser"
db_password: "postgrespass"
30 changes: 1 addition & 29 deletions ci/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,4 @@ metadata:
name: postgres
annotations:
kardinal.dev.service/shared: "true"
kardinal.dev.service/plugins: |
- name: github.com/kurtosis-tech/postgres-seed-plugin
args:
seed_script: |
-- create the table
CREATE TABLE IF NOT EXISTS public.items(
id bigserial PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
deleted_at TIMESTAMP WITH TIME ZONE,
user_id TEXT,
product_id TEXT,
quantity INTEGER
);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (1, '2024-08-02 13:02:07.656104 +00:00', '2024-08-02 13:02:07.656104 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '66VCHSJNUP', 1);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (2, '2024-08-02 13:02:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', 1);
INSERT INTO public.items (id, created_at, updated_at, deleted_at, user_id, product_id, quantity)
VALUES (3, '2024-08-02 13:03:10.891407 +00:00', '2024-08-02 13:02:10.891407 +00:00', null, '0494c5e0-dde0-48fa-a6d8-f7962f5476bf', '2ZYFJ3GM2N', ${last_insert_quantity:-1});
-- Set the sequence to the correct value after inserting records
SELECT setval('public.items_id_seq', (SELECT MAX(id) FROM public.items));
db_name: "cart"
db_user: "postgresuser"
db_password: "postgrespass"
kardinal.dev.service/plugins: "postgres-seed-plugin"
110 changes: 84 additions & 26 deletions kontrol-service/engine/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,37 @@ func processServiceConfigs(
statefulSetConfigs []apitypes.StatefulSetConfig,
version string,
) ([]*resolved.Service, []resolved.ServiceDependency, error) {
var err error
clusterTopologyServices := []*resolved.Service{}
clusterTopologyServiceDependencies := []resolved.ServiceDependency{}
externalServicesDependencies := []resolved.ServiceDependency{}
availablePlugins := map[string]*resolved.StatefulPlugin{}

type serviceWithDependenciesAnnotation struct {
service *resolved.Service
dependenciesAnnotation string
}
serviceWithDependencies := []*serviceWithDependenciesAnnotation{}
servicesWithDependencies := []*serviceWithDependenciesAnnotation{}

// First, iterate the services to get all the available plugins
for _, serviceConfig := range serviceConfigs {
// availablePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list
availablePlugins, err = addAvailablePluginsFromServiceConfig(serviceConfig, availablePlugins)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred while parsing plugin '%s'", serviceConfig.Service.GetName())
}
}

// Second, iterate the services to create the clusterTopology service with partial data (no dependencies set here)
for _, serviceConfig := range serviceConfigs {
service := serviceConfig.Service

// A plugin services (k8s services with the kardinal.dev.service/plugin-definition) are used to define a plugin Type and not a k8s service
isPlugin, _ := isPluginService(service)
if isPlugin {
continue
}

serviceAnnotations := service.GetObjectMeta().GetAnnotations()

// 1- Service
Expand All @@ -225,29 +244,30 @@ func processServiceConfigs(
return nil, nil, stacktrace.Propagate(error, "An error occurred creating new cluster topology service from service config '%s'", service.Name)
}

// 2- Service plugins
serviceStatefulPlugins, externalServices, newExternalServicesDependencies, err := newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService)
// 2- Plugins
// the servicePlugins list contains both stateful and external plugins and, externalServices is a list of Kardinal services that are also linked with a plugin inside the availablePlugins list
servicePlugins, externalServices, newExternalServicesDependencies, err := newServicePluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService, availablePlugins)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful plugins and external services from service config '%s'", service.Name)
}
clusterTopologyService.StatefulPlugins = serviceStatefulPlugins
clusterTopologyService.StatefulPlugins = servicePlugins
clusterTopologyServices = append(clusterTopologyServices, externalServices...)
externalServicesDependencies = append(externalServicesDependencies, newExternalServicesDependencies...)

// 3- Service dependencies (creates a list of services with dependencies)
dependencies, ok := serviceAnnotations["kardinal.dev.service/dependencies"]
if ok {
newServiceWithDependenciesAnnotation := &serviceWithDependenciesAnnotation{&clusterTopologyService, dependencies}
serviceWithDependencies = append(serviceWithDependencies, newServiceWithDependenciesAnnotation)
servicesWithDependencies = append(servicesWithDependencies, newServiceWithDependenciesAnnotation)
}
clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService)
}

// Set the service dependencies in the clusterTopologyService
// first iterate on the service with dependencies list
for _, svcWithDependenciesAnnotation := range serviceWithDependencies {
// Third, set the service dependencies in the clusterTopologyService
// a) iterate on the service with dependencies list
for _, serviceWithDependencies := range servicesWithDependencies {

serviceAndPorts := strings.Split(svcWithDependenciesAnnotation.dependenciesAnnotation, ",")
serviceAndPorts := strings.Split(serviceWithDependencies.dependenciesAnnotation, ",")
for _, serviceAndPort := range serviceAndPorts {
serviceAndPortParts := strings.Split(serviceAndPort, ":")
depService, depServicePort, err := getServiceAndPortFromClusterTopologyServices(serviceAndPortParts[0], serviceAndPortParts[1], clusterTopologyServices)
Expand All @@ -256,41 +276,80 @@ func processServiceConfigs(
}

serviceDependency := resolved.ServiceDependency{
Service: svcWithDependenciesAnnotation.service,
Service: serviceWithDependencies.service,
DependsOnService: depService,
DependencyPort: depServicePort,
}

clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency)
}
}
// then add the external services dependencies
// b) add the external services dependencies
clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServicesDependencies...)

return clusterTopologyServices, clusterTopologyServiceDependencies, nil
}

func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string, clusterTopologyService *resolved.Service) ([]*resolved.StatefulPlugin, []*resolved.Service, []resolved.ServiceDependency, error) {
var serviceStatefulPlugins []*resolved.StatefulPlugin
func addAvailablePluginsFromServiceConfig(serviceConfig apitypes.ServiceConfig, availablePlugins map[string]*resolved.StatefulPlugin) (map[string]*resolved.StatefulPlugin, error) {
service := serviceConfig.Service
isPlugin, pluginAnnotation := isPluginService(service)
if isPlugin {
var statefulPlugins []resolved.StatefulPlugin
err := yaml.Unmarshal([]byte(pluginAnnotation), &statefulPlugins)
if err != nil {
return nil, stacktrace.Propagate(err, "an error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName())
}

for index := range statefulPlugins {
plugin := statefulPlugins[index]
_, found := availablePlugins[plugin.ServiceName]
if found {
return nil, stacktrace.NewError("a plugin with service name '%s' already exists, the `plugin.servicename` value has to be unique", plugin.ServiceName)
}
availablePlugins[plugin.ServiceName] = &plugin
}
}

return availablePlugins, nil
}

func isPluginService(service corev1.Service) (bool, string) {
serviceAnnotations := service.GetObjectMeta().GetAnnotations()

pluginAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugin-definition"]

return ok, pluginAnnotation
}

func newServicePluginsAndExternalServicesFromServiceConfig(
serviceConfig apitypes.ServiceConfig,
version string,
clusterTopologyService *resolved.Service,
availablePlugins map[string]*resolved.StatefulPlugin,
) (
[]*resolved.StatefulPlugin,
[]*resolved.Service,
[]resolved.ServiceDependency,
error,
) {
servicePlugins := []*resolved.StatefulPlugin{}
externalServices := []*resolved.Service{}
externalServiceDependencies := []resolved.ServiceDependency{}

service := serviceConfig.Service
serviceAnnotations := service.GetObjectMeta().GetAnnotations()

sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"]
pluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"]
if ok {
var statefulPlugins []resolved.StatefulPlugin
err := yaml.Unmarshal([]byte(sPluginsAnnotation), &statefulPlugins)
if err != nil {
return nil, nil, nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName())
}
serviceStatefulPlugins = make([]*resolved.StatefulPlugin, len(statefulPlugins))

for index := range statefulPlugins {
pluginsServiceName := strings.Split(pluginsAnnotation, ",")
for _, pluginSvcName := range pluginsServiceName {
plugin, ok := availablePlugins[pluginSvcName]
if !ok {
return nil, nil, nil, stacktrace.NewError("expected to find plugin with service name %s but it is not available, make sure to add the resource for it in the manifest file", pluginSvcName)
}
servicePlugins = append(servicePlugins, plugin)
// TODO: consider giving external service plugins their own type, instead of using StatefulPlugins
// if this is an external service plugin, represent that service as a service in the cluster topology
plugin := statefulPlugins[index]
if plugin.Type == "external" {
logrus.Infof("Adding external service to topology..")
serviceName := plugin.ServiceName
Expand Down Expand Up @@ -318,11 +377,10 @@ func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apityp
}
externalServiceDependencies = append(externalServiceDependencies, externalServiceDependency)
}
serviceStatefulPlugins[index] = &plugin
}
}

return serviceStatefulPlugins, externalServices, externalServiceDependencies, nil
return servicePlugins, externalServices, externalServiceDependencies, nil
}

func newClusterTopologyServiceFromConfigs(
Expand All @@ -349,7 +407,7 @@ func newClusterTopologyServiceFromConfigs(
deploymentConfig.Deployment.GetObjectMeta().GetName(),
statefulSetConfig.StatefulSet.GetObjectMeta().GetName(),
}
logrus.Error("Service %s is associated with more than one workload: %v", serviceName, workloads)
logrus.Errorf("Service %s is associated with more than one workload: %v", serviceName, workloads)
}
if deploymentConfig != nil {
workload := kardinal.NewDeploymentWorkloadSpec(deploymentConfig.Deployment.Spec)
Expand Down
4 changes: 3 additions & 1 deletion kontrol-service/engine/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func TestServiceConfigsToClusterTopology(t *testing.T) {
t.Errorf("Error generating cluster: %s", err)
}

require.NoError(t, err)
require.NotEmpty(t, cluster)
redisProdService := cluster.Services[0]
require.Equal(t, redisProdService.ServiceID, "redis-prod")
require.Equal(t, redisProdService.IsExternal, false)
Expand All @@ -36,7 +38,7 @@ func TestServiceConfigsToClusterTopology(t *testing.T) {
require.Equal(t, votingAppUIService.ServiceID, "voting-app-ui")
require.Equal(t, votingAppUIService.IsExternal, false)
require.Equal(t, votingAppUIService.IsStateful, false)
require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[1].Service.Spec)
require.Equal(t, *votingAppUIService.ServiceSpec, testServiceConfigs[2].Service.Spec)
require.Equal(t, *votingAppUIService.WorkloadSpec.GetTemplateSpec(), testDeploymentConfigs[1].Deployment.Spec.Template.Spec)

dependency := cluster.ServiceDependencies[0]
Expand Down
Loading

0 comments on commit 3fce223

Please sign in to comment.