Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
refactor generateClusterTopology to improve legibility (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoporoli authored Sep 26, 2024
1 parent a48e1ab commit df2a6f7
Showing 1 changed file with 174 additions and 111 deletions.
285 changes: 174 additions & 111 deletions kontrol-service/engine/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package engine

import (
"fmt"
"strings"

apitypes "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"

apitypes "github.com/kurtosis-tech/kardinal/libs/cli-kontrol-api/api/golang/types"
corev1 "k8s.io/api/core/v1"
net "k8s.io/api/networking/v1"
gateway "sigs.k8s.io/gateway-api/apis/v1"
"strings"

"kardinal.kontrol-service/engine/flow"
"kardinal.kontrol-service/plugins"
Expand Down Expand Up @@ -75,30 +75,36 @@ func generateClusterTopology(
ingressConfigs []apitypes.IngressConfig,
gatewayConfigs []apitypes.GatewayConfig,
routeConfigs []apitypes.RouteConfig,
namespace, version string,
namespace string,
version string,
) (*resolved.ClusterTopology, error) {
clusterTopology := resolved.ClusterTopology{}
clusterTopologyServices := []*resolved.Service{}
clusterTopologyServiceDependencies := []resolved.ServiceDependency{}
clusterTopology.Namespace = namespace
clusterTopologyGatewayAndRoutes := processGatewayAndRouteConfigs(gatewayConfigs, routeConfigs, version)
clusterTopologyIngress := processIngressConfigs(ingressConfigs, version)
clusterTopologyServices, clusterTopologyServiceDependencies, err := processServiceConfigs(serviceConfigs, version)
if err != nil {
return nil, stacktrace.NewError("an error occurred processing the service configs")
}

clusterTopologyIngress := resolved.Ingress{
ActiveFlowIDs: []string{version},
Ingresses: []net.Ingress{},
// some validations
if len(clusterTopologyIngress.Ingresses) == 0 && len(clusterTopologyGatewayAndRoutes.Gateways) == 0 && len(clusterTopologyGatewayAndRoutes.GatewayRoutes) == 0 {
logrus.Warnf("No ingress or gateway found in the service configs")
}
if len(clusterTopologyServices) == 0 {
return nil, stacktrace.NewError("At least one service is required in addition to the ingress service(s)")
}

for _, ingressConfig := range ingressConfigs {
ingress := ingressConfig.Ingress
ingressAnnotations := ingress.GetObjectMeta().GetAnnotations()
clusterTopology := resolved.ClusterTopology{}
clusterTopology.Namespace = namespace
clusterTopology.Ingress = clusterTopologyIngress
clusterTopology.GatewayAndRoutes = clusterTopologyGatewayAndRoutes
clusterTopology.Services = clusterTopologyServices
clusterTopology.ServiceDependencies = clusterTopologyServiceDependencies

// Ingress?
isIngress, ok := ingressAnnotations["kardinal.dev.service/ingress"]
if ok && isIngress == "true" {
clusterTopologyIngress.Ingresses = append(clusterTopologyIngress.Ingresses, ingress)
}
}
return &clusterTopology, nil
}

gatewayAndRoutes := resolved.GatewayAndRoutes{
func processGatewayAndRouteConfigs(gatewayConfigs []apitypes.GatewayConfig, routeConfigs []apitypes.RouteConfig, version string) *resolved.GatewayAndRoutes {
gatewayAndRoutes := &resolved.GatewayAndRoutes{
ActiveFlowIDs: []string{version},
Gateways: []*gateway.Gateway{},
GatewayRoutes: []*gateway.HTTPRouteSpec{},
Expand Down Expand Up @@ -131,126 +137,183 @@ func generateClusterTopology(
gatewayAndRoutes.GatewayRoutes = append(gatewayAndRoutes.GatewayRoutes, &route.Spec)
}
}
clusterTopology.GatewayAndRoutes = &gatewayAndRoutes
return gatewayAndRoutes
}

func processServiceConfigs(serviceConfigs []apitypes.ServiceConfig, version string) ([]*resolved.Service, []resolved.ServiceDependency, error) {
clusterTopologyServices := []*resolved.Service{}
clusterTopologyServiceDependencies := []resolved.ServiceDependency{}
externalServicesDependencies := []resolved.ServiceDependency{}

type serviceWithDependenciesAnnotation struct {
service *resolved.Service
dependenciesAnnotation string
}
serviceWithDependencies := []*serviceWithDependenciesAnnotation{}

for _, serviceConfig := range serviceConfigs {
service := serviceConfig.Service
deployment := serviceConfig.Deployment
serviceAnnotations := service.GetObjectMeta().GetAnnotations()

// Service
// 1- Service
logrus.Infof("Processing service: %v", service.GetObjectMeta().GetName())
clusterTopologyService := resolved.Service{
ServiceID: service.GetObjectMeta().GetName(),
Version: version,
ServiceSpec: &service.Spec,
DeploymentSpec: &deployment.Spec,
}
isStateful, ok := serviceAnnotations["kardinal.dev.service/stateful"]
if ok && isStateful == "true" {
clusterTopologyService.IsStateful = true
}
isExternal, ok := serviceAnnotations["kardinal.dev.service/external"]
if ok && isExternal == "true" {
clusterTopologyService.IsExternal = true
}
clusterTopologyService := newClusterTopologyServiceFromServiceConfig(serviceConfig, version)

isShared, ok := serviceAnnotations["kardinal.dev.service/shared"]
if ok && isShared == "true" {
clusterTopologyService.IsShared = true
// 2- Service plugins
serviceStatefulPlugins, externalServices, newExternalServicesDependencies, err := newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig, version, &clusterTopologyService)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred creating new stateful plugins and external services from service config '%s'", service.Name)
}
clusterTopologyService.StatefulPlugins = serviceStatefulPlugins
clusterTopologyServices = append(clusterTopologyServices, externalServices...)
externalServicesDependencies = append(externalServicesDependencies, newExternalServicesDependencies...)

// Service plugin?
sPlugins, ok := serviceAnnotations["kardinal.dev.service/plugins"]
// 3- Service dependencies (creates a list of services with dependencies)
dependencies, ok := serviceAnnotations["kardinal.dev.service/dependencies"]
if ok {
var statefulPlugins []resolved.StatefulPlugin
err := yaml.Unmarshal([]byte(sPlugins), &statefulPlugins)
newServiceWithDependenciesAnnotation := &serviceWithDependenciesAnnotation{&clusterTopologyService, dependencies}
serviceWithDependencies = append(serviceWithDependencies, newServiceWithDependenciesAnnotation)
}
clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService)
}

// Set the service dependencies in the clusterTopologyService
// first iterate on the service with dependencies list
for _, svcWithDependenciesAnnotation := range serviceWithDependencies {

serviceAndPorts := strings.Split(svcWithDependenciesAnnotation.dependenciesAnnotation, ",")
for _, serviceAndPort := range serviceAndPorts {
serviceAndPortParts := strings.Split(serviceAndPort, ":")
depService, depServicePort, err := getServiceAndPortFromClusterTopologyServices(serviceAndPortParts[0], serviceAndPortParts[1], clusterTopologyServices)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName())
return nil, nil, stacktrace.Propagate(err, "An error occurred finding the service dependency for service %s and port %s", serviceAndPortParts[0], serviceAndPortParts[1])
}
serviceStatefulPlugins := make([]*resolved.StatefulPlugin, len(statefulPlugins))
for index := range statefulPlugins {
logrus.Infof("Voting App UI Plugin: %v", statefulPlugins[index].Name)
// TODO: consider giving external service plugins their own type, instead of using StatefulPlugins
// if this is an external service plugin, represent that service as a service in the cluster topology
plugin := statefulPlugins[index]
if plugin.Type == "external" {
logrus.Infof("Adding external service to topology..")
serviceName := plugin.ServiceName
logrus.Infof("plugin service name: %v", plugin.ServiceName)
if serviceName == "" {
serviceName = fmt.Sprintf("%v:%v", clusterTopologyService.ServiceID, "external")
}
externalService := resolved.Service{
ServiceID: serviceName,
Version: version,
ServiceSpec: nil, // leave empty for now
DeploymentSpec: nil, // leave empty for now
IsExternal: true,
// external services can definitely be stateful but for now treat external and stateful services as mutually exclusive to make plugin logic easier to handle
IsStateful: false,
}

clusterTopologyServices = append(clusterTopologyServices, &externalService)

externalServiceDependency := resolved.ServiceDependency{
Service: &clusterTopologyService,
DependsOnService: &externalService,
DependencyPort: nil,
}
clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServiceDependency)
}
serviceStatefulPlugins[index] = &plugin
serviceDependency := resolved.ServiceDependency{
Service: svcWithDependenciesAnnotation.service,
DependsOnService: depService,
DependencyPort: depServicePort,
}
clusterTopologyService.StatefulPlugins = serviceStatefulPlugins
}

clusterTopologyServices = append(clusterTopologyServices, &clusterTopologyService)
clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency)
}
}
// then add the external services dependencies
clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, externalServicesDependencies...)

if len(clusterTopologyIngress.Ingresses) == 0 && len(gatewayAndRoutes.Gateways) == 0 && len(gatewayAndRoutes.GatewayRoutes) == 0 {
logrus.Warnf("No ingress or gateway found in the service configs")
}
clusterTopology.Ingress = &clusterTopologyIngress
return clusterTopologyServices, clusterTopologyServiceDependencies, nil
}

if len(clusterTopologyServices) == 0 {
return nil, stacktrace.NewError("At least one service is required in addition to the ingress service(s)")
}
clusterTopology.Services = clusterTopologyServices
func newStatefulPluginsAndExternalServicesFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string, clusterTopologyService *resolved.Service) ([]*resolved.StatefulPlugin, []*resolved.Service, []resolved.ServiceDependency, error) {
var serviceStatefulPlugins []*resolved.StatefulPlugin
externalServices := []*resolved.Service{}
externalServiceDependencies := []resolved.ServiceDependency{}

for _, serviceConfig := range serviceConfigs {
service := serviceConfig.Service
serviceAnnotations := service.GetObjectMeta().GetAnnotations()
service := serviceConfig.Service
serviceAnnotations := service.GetObjectMeta().GetAnnotations()

clusterTopologyService, err := clusterTopology.GetService(service.GetObjectMeta().GetName())
sPluginsAnnotation, ok := serviceAnnotations["kardinal.dev.service/plugins"]
if ok {
var statefulPlugins []resolved.StatefulPlugin
err := yaml.Unmarshal([]byte(sPluginsAnnotation), &statefulPlugins)
if err != nil {
logrus.Fatalf("An error occurred finding service %s in the list of services", service.GetObjectMeta().GetName())
return nil, stacktrace.Propagate(err, "An error occurred finding service %s in the list of services", service.GetObjectMeta().GetName())
return nil, nil, nil, stacktrace.Propagate(err, "An error occurred parsing the plugins for service %s", service.GetObjectMeta().GetName())
}

// Service dependencies?
deps, ok := serviceAnnotations["kardinal.dev.service/dependencies"]
if ok {
serviceAndPorts := strings.Split(deps, ",")
for _, serviceAndPort := range serviceAndPorts {
serviceAndPortParts := strings.Split(serviceAndPort, ":")
depService, depServicePort, err := clusterTopology.GetServiceAndPort(serviceAndPortParts[0], serviceAndPortParts[1])
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred finding the service dependency for service %s and port %s", serviceAndPortParts[0], serviceAndPortParts[1])
serviceStatefulPlugins = make([]*resolved.StatefulPlugin, len(statefulPlugins))

for index := range statefulPlugins {
// TODO: consider giving external service plugins their own type, instead of using StatefulPlugins
// if this is an external service plugin, represent that service as a service in the cluster topology
plugin := statefulPlugins[index]
if plugin.Type == "external" {
logrus.Infof("Adding external service to topology..")
serviceName := plugin.ServiceName
logrus.Infof("plugin service name: %v", plugin.ServiceName)
if serviceName == "" {
serviceID := service.GetObjectMeta().GetName()
serviceName = fmt.Sprintf("%v:%v", serviceID, "external")
}
externalService := resolved.Service{
ServiceID: serviceName,
Version: version,
ServiceSpec: nil, // leave empty for now
DeploymentSpec: nil, // leave empty for now
IsExternal: true,
// external services can definitely be stateful but for now treat external and stateful services as mutually exclusive to make plugin logic easier to handle
IsStateful: false,
}

serviceDependency := resolved.ServiceDependency{
externalServices = append(externalServices, &externalService)

externalServiceDependency := resolved.ServiceDependency{
Service: clusterTopologyService,
DependsOnService: depService,
DependencyPort: depServicePort,
DependsOnService: &externalService,
DependencyPort: nil,
}
externalServiceDependencies = append(externalServiceDependencies, externalServiceDependency)
}
serviceStatefulPlugins[index] = &plugin
}
}

return serviceStatefulPlugins, externalServices, externalServiceDependencies, nil
}

clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependency)
func newClusterTopologyServiceFromServiceConfig(serviceConfig apitypes.ServiceConfig, version string) resolved.Service {
service := serviceConfig.Service
deployment := serviceConfig.Deployment
serviceAnnotations := service.GetObjectMeta().GetAnnotations()

clusterTopologyService := resolved.Service{
ServiceID: service.GetObjectMeta().GetName(),
Version: version,
ServiceSpec: &service.Spec,
DeploymentSpec: &deployment.Spec,
}
isStateful, ok := serviceAnnotations["kardinal.dev.service/stateful"]
if ok && isStateful == "true" {
clusterTopologyService.IsStateful = true
}
isExternal, ok := serviceAnnotations["kardinal.dev.service/external"]
if ok && isExternal == "true" {
clusterTopologyService.IsExternal = true
}

isShared, ok := serviceAnnotations["kardinal.dev.service/shared"]
if ok && isShared == "true" {
clusterTopologyService.IsShared = true
}
return clusterTopologyService
}

func getServiceAndPortFromClusterTopologyServices(serviceName string, servicePortName string, clusterTopologyServices []*resolved.Service) (*resolved.Service, *corev1.ServicePort, error) {
for _, service := range clusterTopologyServices {
if service.ServiceID == serviceName {
for _, port := range service.ServiceSpec.Ports {
if port.Name == servicePortName {
return service, &port, nil
}
}
}
}

clusterTopology.ServiceDependencies = clusterTopologyServiceDependencies
return nil, nil, stacktrace.NewError("Service %s and Port %s not found in the list of services", serviceName, servicePortName)
}

return &clusterTopology, nil
func processIngressConfigs(ingressConfigs []apitypes.IngressConfig, version string) *resolved.Ingress {
clusterTopologyIngress := &resolved.Ingress{
ActiveFlowIDs: []string{version},
Ingresses: []net.Ingress{},
}
for _, ingressConfig := range ingressConfigs {
ingress := ingressConfig.Ingress
ingressAnnotations := ingress.GetObjectMeta().GetAnnotations()

// Ingress?
isIngress, ok := ingressAnnotations["kardinal.dev.service/ingress"]
if ok && isIngress == "true" {
clusterTopologyIngress.Ingresses = append(clusterTopologyIngress.Ingresses, ingress)
}
}
return clusterTopologyIngress
}

0 comments on commit df2a6f7

Please sign in to comment.