Skip to content

Commit

Permalink
cli: inject service mesh
Browse files Browse the repository at this point in the history
  • Loading branch information
davidweisse committed Jun 6, 2024
1 parent c8458fc commit 747e41d
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 110 deletions.
27 changes: 22 additions & 5 deletions cli/cmd/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func NewGenerateCmd() *cobra.Command {
Short: "generate policies and inject into Kubernetes resources",
Long: `Generate policies and inject into the given Kubernetes resources.
This will add the Contrast Initializer as an init container to all workloads
with a contrast-cc runtime and then download the referenced container images to
calculate the dm-verity hashes of the image layers. In addition, the Rego policy
will be used as base and updated with the given settings file. For each
container workload, the policy is added as an annotation to the Kubernetes YAML.
This will add the Contrast Initializer and Contrast Service Mesh as init containers
to your workloads and then download the referenced container images to calculate the
dm-verity hashes of the image layers. In addition, the Rego policy will be used as
base and updated with the given settings file. For each container workload, the
policy is added as an annotation to the Kubernetes YAML.
The hashes of the policies are added to the manifest.
Expand Down Expand Up @@ -280,6 +280,9 @@ func patchTargets(paths []string, imageReplacementsFile string, skipInitializer
return fmt.Errorf("injecting Initializer: %w", err)
}
}
if err := injectServiceMesh(kubeObjs); err != nil {
return fmt.Errorf("injecting Service Mesh: %w", err)
}

kubeObjs = kuberesource.PatchImages(kubeObjs, replacements)

Expand Down Expand Up @@ -314,6 +317,20 @@ func injectInitializer(resources []any) error {
return nil
}

func injectServiceMesh(resources []any) error {
for _, resource := range resources {
deploy, ok := resource.(*applyappsv1.StatefulSetApplyConfiguration)
if ok && deploy.Spec.Template.Annotations[contrastRoleAnnotationKey] == "coordinator" {
continue
}
_, err := kuberesource.AddServiceMesh(resource, kuberesource.ServiceMeshProxy())
if err != nil {
return err
}
}
return nil
}

func runtimeClassNamePatcher() func(*applycorev1.PodSpecApplyConfiguration) *applycorev1.PodSpecApplyConfiguration {
return func(spec *applycorev1.PodSpecApplyConfiguration) *applycorev1.PodSpecApplyConfiguration {
if spec.RuntimeClassName == nil || *spec.RuntimeClassName == runtimeHandler {
Expand Down
143 changes: 90 additions & 53 deletions internal/kuberesource/mutators.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kuberesource
import (
"fmt"
"slices"
"strconv"
"strings"

applyappsv1 "k8s.io/client-go/applyconfigurations/apps/v1"
Expand All @@ -15,9 +16,12 @@ import (
)

const (
exposeServiceAnnotation = "contrast.edgeless.systems/expose-service"
contrastRoleAnnotationKey = "contrast.edgeless.systems/pod-role"
skipInitializerAnnotationKey = "contrast.edgeless.systems/skip-initializer"
exposeServiceAnnotation = "contrast.edgeless.systems/expose-service"
contrastRoleAnnotationKey = "contrast.edgeless.systems/pod-role"
skipInitializerAnnotationKey = "contrast.edgeless.systems/skip-initializer"
smIngressConfigAnnotationKey = "contrast.edgeless.systems/servicemesh-ingress"
smEgressConfigAnnotationKey = "contrast.edgeless.systems/servicemesh-egress"
smAdminInterfaceAnnotationKey = "contrast.edgeless.systems/servicemesh-admin-interface-port"
)

// AddInitializer adds an initializer and its shared volume to the resource.
Expand Down Expand Up @@ -49,23 +53,9 @@ func AddInitializer(
return nil
}

// Existing volume with unique name has to be of type EmptyDir.
var volumeExists bool
for _, volume := range spec.Volumes {
if *volume.Name == *initializer.VolumeMounts[0].Name {
volumeExists = true
if volume.EmptyDir == nil {
retErr = fmt.Errorf("volume %s has to be of type EmptyDir", *volume.Name)
return nil
}
}
}
// Create the volume written by the initializer if it not already exists.
if !volumeExists {
spec.WithVolumes(Volume().
WithName(*initializer.VolumeMounts[0].Name).
WithEmptyDir(EmptyDirVolumeSource().Inner()),
)
retErr = ensureVolumeExists(spec, *initializer.VolumeMounts[0].Name)
if retErr != nil {
return nil
}

// Remove already existing volume mounts on the worker containers with unique volume mount name.
Expand Down Expand Up @@ -95,17 +85,84 @@ const (
ServiceMeshDisabled serviceMeshMode = "service-mesh-disabled"
)

// AddServiceMesh adds a service mesh proxy to the resource.
// AddServiceMesh adds a service mesh proxy to the resource with the proxy
// configuration given in the object annotations.
//
// If the resource does not contain a PodSpec, this function does nothing.
// This function is not idempotent.
// This function is idempotent.
func AddServiceMesh(
resource any,
serviceMeshProxy *applycorev1.ContainerApplyConfiguration,
) any {
return MapPodSpec(resource, func(spec *applycorev1.PodSpecApplyConfiguration) *applycorev1.PodSpecApplyConfiguration {
) (res any, retErr error) {
res = MapPodSpecWithMeta(resource, func(meta *applymetav1.ObjectMetaApplyConfiguration, spec *applycorev1.PodSpecApplyConfiguration) *applycorev1.PodSpecApplyConfiguration {
if spec.RuntimeClassName == nil || !strings.HasPrefix(*spec.RuntimeClassName, "contrast-cc") {
return spec
}

ingressConfig, ingressOk := meta.Annotations[smIngressConfigAnnotationKey]
egressConfig, egressOk := meta.Annotations[smEgressConfigAnnotationKey]
portAnnotation, portOk := meta.Annotations[smAdminInterfaceAnnotationKey]

// Don't change anything if automatic service mesh injection isn't enabled.
if !ingressOk && !egressOk && !portOk {
return spec
}

// Remove already existing init containers with unique service mesh name.
spec.InitContainers = slices.DeleteFunc(spec.InitContainers, func(c applycorev1.ContainerApplyConfiguration) bool {
return *c.Name == *serviceMeshProxy.Name
})

retErr = ensureVolumeExists(spec, *serviceMeshProxy.VolumeMounts[0].Name)
if retErr != nil {
return nil
}

if portAnnotation != "" {
port, err := strconv.Atoi(portAnnotation)
if retErr != nil {
retErr = fmt.Errorf("parsing service mesh admin interface port: %w", err)
return nil
}

ingressConfig += fmt.Sprintf("##admin#%s#true", portAnnotation)
serviceMeshProxy.
WithEnv(NewEnvVar("EDG_ADMIN_PORT", portAnnotation)).
WithPorts(
ContainerPort().
WithName("admin-interface").
WithContainerPort(int32(port)),
)
}

if ingressConfig != "" {
serviceMeshProxy.WithEnv(NewEnvVar("EDG_INGRESS_PROXY_CONFIG", ingressConfig))
}
if egressConfig != "" {
serviceMeshProxy.WithEnv(NewEnvVar("EDG_EGRESS_PROXY_CONFIG", egressConfig))
}

return spec.WithInitContainers(serviceMeshProxy)
})
return res, retErr
}

func ensureVolumeExists(spec *applycorev1.PodSpecApplyConfiguration, volumeName string) error {
// Existing volume with unique name has to be of type EmptyDir.
for _, volume := range spec.Volumes {
if *volume.Name == volumeName {
if volume.EmptyDir == nil {
return fmt.Errorf("volume %s has to be of type EmptyDir", *volume.Name)
}
return nil
}
}
// Create the volume written if it not already exists.
spec.WithVolumes(Volume().
WithName(volumeName).
WithEmptyDir(EmptyDirVolumeSource().Inner()),
)
return nil
}

// AddPortForwarders adds a port-forwarder for each Service resource.
Expand Down Expand Up @@ -201,38 +258,18 @@ func PatchNamespaces(resources []any, namespace string) []any {
// PatchServiceMeshAdminInterface activates the admin interface on the
// specified port for all Service Mesh components in a set of resources.
func PatchServiceMeshAdminInterface(resources []any, port int32) []any {
var out []any
for _, resource := range resources {
switch r := resource.(type) {
case *applyappsv1.DeploymentApplyConfiguration:
for i := 0; i < len(r.Spec.Template.Spec.InitContainers); i++ {
// TODO(davidweisse): find service mesh containers by unique name as specified in RFC 005.
if strings.Contains(*r.Spec.Template.Spec.InitContainers[i].Image, "service-mesh-proxy") {
r.Spec.Template.Spec.InitContainers[i] = *r.Spec.Template.Spec.InitContainers[i].
WithEnv(NewEnvVar("EDG_ADMIN_PORT", fmt.Sprint(port))).
WithPorts(
ContainerPort().
WithName("admin-interface").
WithContainerPort(port),
)
ingressProxyConfig := false
for j, env := range r.Spec.Template.Spec.InitContainers[i].Env {
if *env.Name == "EDG_INGRESS_PROXY_CONFIG" {
ingressProxyConfig = true
env.WithValue(fmt.Sprintf("%s##admin#%d#true", *env.Value, port))
r.Spec.Template.Spec.InitContainers[i].Env[j] = env
break
}
}
if !ingressProxyConfig {
r.Spec.Template.Spec.InitContainers[i].WithEnv(
NewEnvVar("EDG_INGRESS_PROXY_CONFIG", fmt.Sprintf("admin#%d#true", port)),
)
}
}
out = append(out, MapPodSpecWithMeta(resource, func(meta *applymetav1.ObjectMetaApplyConfiguration, spec *applycorev1.PodSpecApplyConfiguration) *applycorev1.PodSpecApplyConfiguration {
_, ingressOk := meta.Annotations[smIngressConfigAnnotationKey]
_, egressOk := meta.Annotations[smEgressConfigAnnotationKey]
if ingressOk || egressOk {
meta.WithAnnotations(map[string]string{smAdminInterfaceAnnotationKey: fmt.Sprint(port)})
}
}
return spec
}))
}
return resources
return out
}

// PatchCoordinatorMetrics enables Coordinator metrics on the specified port.
Expand Down
125 changes: 113 additions & 12 deletions internal/kuberesource/mutators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,117 @@ func TestAddInitializer(t *testing.T) {
}

func TestAddServiceMesh(t *testing.T) {
require := require.New(t)
d := applyappsv1.Deployment("test", "default").
WithSpec(applyappsv1.DeploymentSpec().
WithTemplate(applycorev1.PodTemplateSpec().
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container()))))

smProxy := ServiceMeshProxy()
AddServiceMesh(d, smProxy)

require.NotEmpty(d.Spec.Template.Spec.InitContainers)
require.Equal(d.Spec.Template.Spec.InitContainers[0], *smProxy)
for _, tc := range []struct {
name string
d *applyappsv1.DeploymentApplyConfiguration
skipServiceMesh bool
wantError bool
}{
{
name: "default",
d: applyappsv1.Deployment("test", "default").
WithAnnotations(map[string]string{smIngressConfigAnnotationKey: ""}).
WithSpec(applyappsv1.DeploymentSpec().
WithTemplate(applycorev1.PodTemplateSpec().
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container()).
WithRuntimeClassName("contrast-cc"),
))),
wantError: false,
},
{
name: "no service mesh",
d: applyappsv1.Deployment("test", "default").
WithSpec(applyappsv1.DeploymentSpec().
WithTemplate(applycorev1.PodTemplateSpec().
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container()).
WithRuntimeClassName("contrast-cc"),
))),
skipServiceMesh: true,
wantError: false,
},
{
name: "service mesh replaced",
d: applyappsv1.Deployment("test", "default").
WithAnnotations(map[string]string{smIngressConfigAnnotationKey: ""}).
WithSpec(applyappsv1.DeploymentSpec().
WithTemplate(applycorev1.PodTemplateSpec().
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container()).
WithInitContainers(ServiceMeshProxy()).
WithRuntimeClassName("contrast-cc"),
))),
wantError: false,
},
{
name: "volume reused",
d: applyappsv1.Deployment("test", "default").
WithAnnotations(map[string]string{smIngressConfigAnnotationKey: ""}).
WithSpec(applyappsv1.DeploymentSpec().
WithTemplate(applycorev1.PodTemplateSpec().
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container()).
WithRuntimeClassName("contrast-cc").
WithVolumes(Volume().
WithName(*ServiceMeshProxy().VolumeMounts[0].Name).
WithEmptyDir(EmptyDirVolumeSource().Inner()),
),
))),
wantError: false,
},
{
name: "volume is not an EmptyDir",
d: applyappsv1.Deployment("test", "default").
WithAnnotations(map[string]string{smIngressConfigAnnotationKey: ""}).
WithSpec(applyappsv1.DeploymentSpec().
WithTemplate(applycorev1.PodTemplateSpec().
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container()).
WithRuntimeClassName("contrast-cc").
WithVolumes(Volume().
WithName(*ServiceMeshProxy().VolumeMounts[0].Name).
WithConfigMap(Volume().ConfigMap),
),
))),
wantError: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
require := require.New(t)

_, err := AddServiceMesh(tc.d, ServiceMeshProxy())
if tc.wantError {
require.Error(err)
return
}
require.NoError(err)

if tc.skipServiceMesh {
require.Empty(tc.d.Spec.Template.Spec.InitContainers)
return
}
require.NotEmpty(tc.d.Spec.Template.Spec.InitContainers)
require.Equal(*tc.d.Spec.Template.Spec.InitContainers[0].Name, *ServiceMeshProxy().Name)
require.NotEmpty(tc.d.Spec.Template.Spec.InitContainers[0].VolumeMounts)
require.Equal(*tc.d.Spec.Template.Spec.InitContainers[0].VolumeMounts[0].Name, *ServiceMeshProxy().VolumeMounts[0].Name)

serviceMeshCount := 0
for _, c := range tc.d.Spec.Template.Spec.InitContainers {
if *c.Name == *ServiceMeshProxy().Name {
serviceMeshCount++
}
}
require.Equal(1, serviceMeshCount)

require.NotEmpty(tc.d.Spec.Template.Spec.Volumes)
serviceMeshVolumeCount := 0
for _, v := range tc.d.Spec.Template.Spec.Volumes {
if *v.Name == *ServiceMeshProxy().VolumeMounts[0].Name {
serviceMeshVolumeCount++
}
}
require.Equal(1, serviceMeshVolumeCount)
})
}
}
2 changes: 1 addition & 1 deletion internal/kuberesource/parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func Initializer() *applycorev1.ContainerApplyConfiguration {
// ServiceMeshProxy creates a new service mesh proxy sidecar container.
func ServiceMeshProxy() *applycorev1.ContainerApplyConfiguration {
return applycorev1.Container().
WithName("sidecar").
WithName("contrast-service-mesh").
WithImage("ghcr.io/edgelesssys/contrast/service-mesh-proxy:latest").
WithRestartPolicy(corev1.ContainerRestartPolicyAlways).
WithVolumeMounts(VolumeMount().
Expand Down
Loading

0 comments on commit 747e41d

Please sign in to comment.