Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netlink #101

Merged
merged 4 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions pkg/clients/exec_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,25 @@ package clients
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
)

const (
startTimeout = 5 * time.Second
deletionTimeout = 10 * time.Minute
)

type ExecContext interface {
ExecCommand([]string) (string, string, error)
ExecCommandStdIn([]string, bytes.Buffer) (string, string, error)
Expand Down Expand Up @@ -155,3 +164,211 @@ func (c *ContainerExecContext) ExecCommand(command []string) (stdout, stderr str
func (c *ContainerExecContext) ExecCommandStdIn(command []string, buffIn bytes.Buffer) (stdout, stderr string, err error) {
return c.execCommand(command, &buffIn)
}

// ContainerExecContext encapsulates the context in which a command is run; the namespace, pod, and container.
type ContainerCreationExecContext struct {
*ContainerExecContext
labels map[string]string
pod *corev1.Pod
containerSecurityContext *corev1.SecurityContext
containerImage string
command []string
volumes []*Volume
hostNetwork bool
}

type Volume struct {
VolumeSource corev1.VolumeSource
Name string
MountPath string
}

func (c *ContainerCreationExecContext) createPod() error {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: c.podName,
Namespace: c.namespace,
Labels: c.labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: c.containerName,
Image: c.containerImage,
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
HostNetwork: c.hostNetwork,
},
}
if len(c.command) > 0 {
pod.Spec.Containers[0].Command = c.command
}
if c.containerSecurityContext != nil {
pod.Spec.Containers[0].SecurityContext = c.containerSecurityContext
}
if len(c.volumes) > 0 {
volumes := make([]corev1.Volume, 0)
volumeMounts := make([]corev1.VolumeMount, 0)

for _, v := range c.volumes {
volumes = append(volumes, corev1.Volume{Name: v.Name, VolumeSource: v.VolumeSource})
pod.Spec.Volumes = volumes
volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: v.Name, MountPath: v.MountPath})
pod.Spec.Containers[0].VolumeMounts = volumeMounts
}
}

pod, err := c.clientset.K8sClient.CoreV1().Pods(pod.Namespace).Create(
context.TODO(),
pod,
metav1.CreateOptions{},
)
c.pod = pod
if err != nil {
return fmt.Errorf("failed to create pod: %w", err)
}
return nil
}

func (c *ContainerCreationExecContext) listPods(options *metav1.ListOptions) (*corev1.PodList, error) {
pods, err := c.clientset.K8sClient.CoreV1().Pods(c.pod.Namespace).List(
context.TODO(),
*options,
)
if err != nil {
return pods, fmt.Errorf("failed to find pods: %s", err.Error())
}
return pods, nil
}

func (c *ContainerCreationExecContext) refeshPod() error {
pods, err := c.listPods(&metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", c.podName).String(),
ResourceVersion: c.pod.ResourceVersion,
})
if err != nil {
return err
}
if len(pods.Items) == 0 {
return fmt.Errorf("failed to find pod: %s", c.podName)
}
c.pod = &pods.Items[0]

return nil
}

func (c *ContainerCreationExecContext) isPodRunning() (bool, error) {
err := c.refeshPod()
if err != nil {
return false, err
}
if c.pod.Status.Phase == corev1.PodRunning {
return true, nil
}
return false, nil
}

func (c *ContainerCreationExecContext) waitForPodToStart() error {
start := time.Now()
for time.Since(start) <= startTimeout {
running, err := c.isPodRunning()
if err != nil {
return err
}
if running {
return nil
}
time.Sleep(time.Microsecond)
}
return errors.New("timed out waiting for pod to start")
}

func (c *ContainerCreationExecContext) CreatePodAndWait() error {
var err error
running := false
if c.pod != nil {
running, err = c.isPodRunning()
if err != nil {
return err
}
}
if !running {
err := c.createPod()
if err != nil {
return err
}
}
return c.waitForPodToStart()
}

func (c *ContainerCreationExecContext) deletePod() error {
deletePolicy := metav1.DeletePropagationForeground
err := c.clientset.K8sClient.CoreV1().Pods(c.pod.Namespace).Delete(
context.TODO(),
c.pod.Name,
metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
return fmt.Errorf("failed to delete pod: %w", err)
}
return nil
}

func (c *ContainerCreationExecContext) waitForPodToDelete() error {
start := time.Now()
for time.Since(start) <= deletionTimeout {
pods, err := c.listPods(&metav1.ListOptions{})
if err != nil {
return err
}
found := false
for _, pod := range pods.Items { //nolint:gocritic // This isn't my object I can't use a pointer
if pod.Name == c.podName {
found = true
}
}
if !found {
return nil
}
time.Sleep(time.Microsecond)
}
return errors.New("pod has not terminated within the timeout")
}

func (c *ContainerCreationExecContext) DeletePodAndWait() error {
err := c.deletePod()
if err != nil {
return err
}
return c.waitForPodToDelete()
}

func NewContainerCreationExecContext(
crwr45 marked this conversation as resolved.
Show resolved Hide resolved
clientset *Clientset,
namespace, podName, containerName, containerImage string,
labels map[string]string,
command []string,
containerSecurityContext *corev1.SecurityContext,
hostNetwork bool,
volumes []*Volume,
) *ContainerCreationExecContext {
ctx := ContainerExecContext{
namespace: namespace,
podNamePrefix: podName,
podName: podName,
containerName: containerName,
clientset: clientset,
}

return &ContainerCreationExecContext{
ContainerExecContext: &ctx,
containerImage: containerImage,
labels: labels,
command: command,
containerSecurityContext: containerSecurityContext,
hostNetwork: hostNetwork,
volumes: volumes,
}
}
47 changes: 43 additions & 4 deletions pkg/collectors/contexts/contexts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ package contexts
import (
"fmt"

corev1 "k8s.io/api/core/v1"

"github.com/redhat-partner-solutions/vse-sync-collection-tools/pkg/clients"
)

const (
PTPNamespace = "openshift-ptp"
PTPPodNamePrefix = "linuxptp-daemon-"
PTPContainer = "linuxptp-daemon-container"
GPSContainer = "gpsd"
PTPNamespace = "openshift-ptp"
PTPPodNamePrefix = "linuxptp-daemon-"
PTPContainer = "linuxptp-daemon-container"
GPSContainer = "gpsd"
NetlinkDebugPod = "ptp-dpll-netlink-debug-pod"
NetlinkDebugContainer = "ptp-dpll-netlink-debug-container"
NetlinkDebugContainerImage = "quay.io/jnunez/tools:dpll9.2_dev"
nocturnalastro marked this conversation as resolved.
Show resolved Hide resolved
)

func GetPTPDaemonContext(clientset *clients.Clientset) (clients.ExecContext, error) {
Expand All @@ -22,3 +27,37 @@ func GetPTPDaemonContext(clientset *clients.Clientset) (clients.ExecContext, err
}
return ctx, nil
}

func GetNetlinkContext(clientset *clients.Clientset) (*clients.ContainerCreationExecContext, error) {
hpt := corev1.HostPathDirectory
ctx := clients.NewContainerCreationExecContext(
clientset,
PTPNamespace,
NetlinkDebugPod,
NetlinkDebugContainer,
NetlinkDebugContainerImage,
map[string]string{},
[]string{"sleep", "inf"},
&corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
// Requires NET_ADMIN: having (NET_RAW + NET_BIND_SERVICE + NET_BROADCAST) does not work
// Without NET_ADMIN it will not connect to the netlink interface
// Requires SYS_AMDIN: having every other permission does not work.
// Without SYS_ADMIN lspci does not include the Serial number in the comments thefore can not calculate the clockID
Add: []corev1.Capability{
"SYS_ADMIN",
"NET_ADMIN",
},
},
},
true,
[]*clients.Volume{
{
Name: "modules",
MountPath: "/lib/modules",
VolumeSource: corev1.VolumeSource{HostPath: &corev1.HostPathVolumeSource{Path: "/lib/modules", Type: &hpt}},
},
},
)
return ctx, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ const (
unitConversionFactor = 100
)

type DevDPLLInfo struct {
type DevFilesystemDPLLInfo struct {
Timestamp string `fetcherKey:"date" json:"timestamp"`
EECState string `fetcherKey:"dpll_0_state" json:"eecstate"`
PPSState string `fetcherKey:"dpll_1_state" json:"state"`
PPSOffset float64 `fetcherKey:"dpll_1_offset" json:"terror"`
}

// AnalyserJSON returns the json expected by the analysers
func (dpllInfo *DevDPLLInfo) GetAnalyserFormat() ([]*callbacks.AnalyserFormatType, error) {
func (dpllInfo *DevFilesystemDPLLInfo) GetAnalyserFormat() ([]*callbacks.AnalyserFormatType, error) {
formatted := callbacks.AnalyserFormatType{
ID: "dpll/time-error",
Data: map[string]any{
Expand All @@ -41,14 +41,14 @@ func (dpllInfo *DevDPLLInfo) GetAnalyserFormat() ([]*callbacks.AnalyserFormatTyp
}

var (
dpllFetcher map[string]*fetcher.Fetcher
dpllFSFetcher map[string]*fetcher.Fetcher
)

func init() {
dpllFetcher = make(map[string]*fetcher.Fetcher)
dpllFSFetcher = make(map[string]*fetcher.Fetcher)
}

func postProcessDPLL(result map[string]string) (map[string]any, error) {
func postProcessDPLLFilesystem(result map[string]string) (map[string]any, error) {
processedResult := make(map[string]any)
offset, err := strconv.ParseFloat(result["dpll_1_offset"], 32)
if err != nil {
Expand All @@ -58,9 +58,9 @@ func postProcessDPLL(result map[string]string) (map[string]any, error) {
return processedResult, nil
}

// BuildDPLLInfoFetcher popluates the fetcher required for
// BuildFilesystemDPLLInfoFetcher popluates the fetcher required for
// collecting the DPLLInfo
func BuildDPLLInfoFetcher(interfaceName string) error { //nolint:dupl // Further dedup risks be too abstract or fragile
func BuildFilesystemDPLLInfoFetcher(interfaceName string) error { //nolint:dupl // Further dedup risks be too abstract or fragile
fetcherInst, err := fetcher.FetcherFactory(
[]*clients.Cmd{dateCmd},
[]fetcher.AddCommandArgs{
Expand All @@ -85,21 +85,21 @@ func BuildDPLLInfoFetcher(interfaceName string) error { //nolint:dupl // Further
log.Errorf("failed to create fetcher for dpll: %s", err.Error())
return fmt.Errorf("failed to create fetcher for dpll: %w", err)
}
dpllFetcher[interfaceName] = fetcherInst
fetcherInst.SetPostProcessor(postProcessDPLL)
dpllFSFetcher[interfaceName] = fetcherInst
fetcherInst.SetPostProcessor(postProcessDPLLFilesystem)
return nil
}

// GetDevDPLLInfo returns the device DPLL info for an interface.
func GetDevDPLLInfo(ctx clients.ExecContext, interfaceName string) (DevDPLLInfo, error) {
dpllInfo := DevDPLLInfo{}
fetcherInst, fetchedInstanceOk := dpllFetcher[interfaceName]
// GetDevDPLLFilesystemInfo returns the device DPLL info for an interface.
func GetDevDPLLFilesystemInfo(ctx clients.ExecContext, interfaceName string) (DevFilesystemDPLLInfo, error) {
dpllInfo := DevFilesystemDPLLInfo{}
fetcherInst, fetchedInstanceOk := dpllFSFetcher[interfaceName]
if !fetchedInstanceOk {
err := BuildDPLLInfoFetcher(interfaceName)
err := BuildFilesystemDPLLInfoFetcher(interfaceName)
if err != nil {
return dpllInfo, err
}
fetcherInst, fetchedInstanceOk = dpllFetcher[interfaceName]
fetcherInst, fetchedInstanceOk = dpllFSFetcher[interfaceName]
if !fetchedInstanceOk {
return dpllInfo, errors.New("failed to create fetcher for DPLLInfo")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var _ = Describe("NewContainerContext", func() {

ctx, err := clients.NewContainerContext(clientset, "TestNamespace", "Test", "TestContainer")
Expect(err).NotTo(HaveOccurred())
info, err := devices.GetDevDPLLInfo(ctx, "aFakeInterface")
info, err := devices.GetDevDPLLFilesystemInfo(ctx, "aFakeInterface")
Expect(err).NotTo(HaveOccurred())
Expect(info.Timestamp).To(Equal("2023-06-16T11:49:47.0584Z"))
Expect(info.EECState).To(Equal(eecState))
Expand Down
Loading
Loading