From ab9ffbe0149b6abd50ee3568880065fd5130c9be Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Fri, 6 Dec 2024 14:28:08 -0600 Subject: [PATCH] Introduce a copy-offload daemon in a user container (#238) This daemon is an http server that accepts copy-offload requests and executes them directly. The daemon is in a container, on the rabbit, and was placed there by a "#DW container" directive. This commit includes a daemon that can be built locally and run outside a container, or it can be built in a container image. The necessary RBAC bits for config/ are coming in the next step. Signed-off-by: Dean Roehrich --- .vscode/launch.json | 12 + Dockerfile | 49 +- Makefile | 36 +- crd-bumper.yaml | 2 +- .../compute/server/servers/server_default.go | 4 +- daemons/copy-offload/cmd/main.go | 126 +++ daemons/copy-offload/pkg/driver/dmrequest.go | 82 ++ daemons/copy-offload/pkg/driver/driver.go | 762 ++++++++++++++++++ daemons/copy-offload/pkg/server/server.go | 136 ++++ .../copy-offload/pkg/server/server_test.go | 367 +++++++++ .../controller/datamovement_controller.go | 2 +- .../helpers/datamovement_helpers.go | 4 + 12 files changed, 1570 insertions(+), 12 deletions(-) create mode 100644 daemons/copy-offload/cmd/main.go create mode 100644 daemons/copy-offload/pkg/driver/dmrequest.go create mode 100644 daemons/copy-offload/pkg/driver/driver.go create mode 100644 daemons/copy-offload/pkg/server/server.go create mode 100644 daemons/copy-offload/pkg/server/server_test.go diff --git a/.vscode/launch.json b/.vscode/launch.json index 89a08cdd..bfb18211 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -27,6 +27,18 @@ "--socket=/tmp/nnf.sock" ] }, + { + "name": "Debug Copy-Offload server", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/daemons/copy-offload/cmd", + "env": {"NNF_NODE_NAME": "rabbit-node-1"}, + "args" :[ + "--port=8080", + "--mock", + ] + }, { "name": "Launch file", "type": "go", diff --git a/Dockerfile 
b/Dockerfile index 909c0c0b..d36ee961 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,10 +21,7 @@ ARG NNFMFU_TAG_BASE=ghcr.io/nearnodeflash/nnf-mfu ARG NNFMFU_VERSION=master # Build the manager binary -FROM golang:1.21-alpine AS builder - -ARG TARGETARCH -ARG TARGETOS +FROM golang:1.21-alpine AS builder_setup WORKDIR /workspace # Copy the Go Modules manifests @@ -40,6 +37,12 @@ COPY cmd/ cmd/ COPY api/ api/ COPY internal/ internal/ +############################################################################### +FROM builder_setup AS builder + +ARG TARGETARCH +ARG TARGETOS + # Build # the GOARCH has a default value to allow the binary be built according to the host where the command # was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO @@ -47,9 +50,26 @@ COPY internal/ internal/ # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go +############################################################################### +FROM builder_setup AS copy_offload_builder + +ARG TARGETARCH +ARG TARGETOS + +COPY daemons/copy-offload/ daemons/copy-offload/ + +# Build +# the GOARCH has a default value to allow the binary be built according to the host where the command +# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO +# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, +# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. 
+RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o nnf-copy-offload daemons/copy-offload/cmd/main.go + ############################################################################### FROM builder AS testing +COPY daemons/copy-offload/ daemons/copy-offload/ + WORKDIR /workspace COPY config/ config/ @@ -102,3 +122,24 @@ ENTRYPOINT ["/manager"] ARG NNFMFU_TAG_BASE ARG NNFMFU_VERSION LABEL nnf-mfu="$NNFMFU_TAG_BASE-debug:$NNFMFU_VERSION" + +############################################################################### +FROM $NNFMFU_TAG_BASE:$NNFMFU_VERSION AS copy_offload_production + +# The following lines are from the mpiFileUtils (nnf-mfu) Dockerfile; +# do not change them unless you know what it is you are doing +RUN sed -i "s/[ #]\(.*StrictHostKeyChecking \).*/ \1no/g" /etc/ssh/ssh_config \ + && echo " UserKnownHostsFile /dev/null" >> /etc/ssh/ssh_config + +# Copy the executable and execute +WORKDIR / +COPY --from=copy_offload_builder /workspace/nnf-copy-offload . + +ENTRYPOINT ["/nnf-copy-offload"] + +# Make it easy to figure out which nnf-mfu was used. +# docker inspect --format='{{json .Config.Labels}}' image:tag +ARG NNFMFU_TAG_BASE +ARG NNFMFU_VERSION +LABEL nnf-mfu="$NNFMFU_TAG_BASE:$NNFMFU_VERSION" + diff --git a/Makefile b/Makefile index 4558a96e..5aeb82cb 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,9 @@ BUNDLE_METADATA_OPTS ?= $(BUNDLE_CHANNELS) $(BUNDLE_DEFAULT_CHANNEL) # For example, running 'make bundle-build bundle-push catalog-build catalog-push' will build and push both # cray.hpe.com/nnf-dm-bundle:$VERSION and cray.hpe.com/nnf-dm-catalog:$VERSION. IMAGE_TAG_BASE ?= ghcr.io/nearnodeflash/nnf-dm +IMAGE_COPY_OFFLOAD_TAG_BASE = $(IMAGE_TAG_BASE)-copy-offload IMAGE_TARGET ?= production +IMAGE_COPY_OFFLOAD_TARGET = copy_offload_$(IMAGE_TARGET) # The NNF-MFU container image to use in NNFContainerProfile resources. 
NNFMFU_TAG_BASE ?= ghcr.io/nearnodeflash/nnf-mfu @@ -136,7 +138,7 @@ container-unit-test: .version ## Run tests inside a container image ${CONTAINER_TOOL} build -f Dockerfile --label $(IMAGE_TAG_BASE)-$@:$(VERSION)-$@ -t $(IMAGE_TAG_BASE)-$@:$(VERSION) --target testing $(CONTAINER_BUILDARGS) . ${CONTAINER_TOOL} run --rm -t --name $@-nnf-dm $(IMAGE_TAG_BASE)-$@:$(VERSION) -##@ Build +##@ Build the controller manager. RPM_PLATFORM ?= linux/amd64 RPM_TARGET ?= x86_64 .PHONY: build-daemon-rpm @@ -168,6 +170,35 @@ build: generate fmt vet ## Build manager binary. run: manifests generate fmt vet ## Run a controller from your host. CGO_ENABLED=0 go run cmd/main.go +##@ Build the copy-offload container's daemon outside the container. +.PHONY: build-copy-offload-local +build-copy-offload-local: GOOS = $(shell go env GOOS) +build-copy-offload-local: GOARCH = $(shell go env GOARCH) +build-copy-offload-local: build-copy-offload-with + +.PHONY: build-copy-offload-with +build-copy-offload-with: $(LOCALBIN) +build-copy-offload-with: fmt vet ## Build standalone copy-offload daemon + CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o bin/nnf-copy-offload daemons/copy-offload/cmd/main.go + +.PHONY: build-copy-offload-docker-local +build-copy-offload-docker-local: GOARCH = $(shell go env GOARCH) +build-copy-offload-docker-local: build-copy-offload-docker-with + +.PHONY: build-copy-offload-docker-amd64 +build-copy-offload-docker-amd64: GOARCH = amd64 +build-copy-offload-docker-amd64: build-copy-offload-docker-with + +.PHONY: build-copy-offload-docker-with +build-copy-offload-docker-with: VERSION ?= $(shell cat .version) +build-copy-offload-docker-with: .version ## Build docker image with the manager. + ${CONTAINER_TOOL} build --platform linux/$(GOARCH) --target $(IMAGE_COPY_OFFLOAD_TARGET) -t $(IMAGE_COPY_OFFLOAD_TAG_BASE):$(VERSION) $(CONTAINER_BUILDARGS) . 
+ +.PHONY: kind-push-copy-offload +kind-push-copy-offload: VERSION ?= $(shell cat .version) +kind-push-copy-offload: .version + kind load docker-image $(IMAGE_COPY_OFFLOAD_TAG_BASE):$(VERSION) + # If you wish to build the manager image targeting other platforms you can use the --platform flag. # (i.e. docker build --platform linux/arm64). However, you must enable docker buildKit for it. # More info: https://docs.docker.com/develop/develop-images/build_enhancements/ @@ -215,9 +246,6 @@ docker-buildx-debug: docker-buildx kind-push: VERSION ?= $(shell cat .version) kind-push: .version ## Push docker image to kind - # Nnf-dm is used on all nodes. It's on the management node for the - # nnf-dm-controller-manager deployment, and on the rabbit nodes for - # the nnf-dm-rsyncnode daemonset that is created by that deployment. kind load docker-image $(IMAGE_TAG_BASE):$(VERSION) kind-push-debug: VERSION ?= $(shell cat .version) diff --git a/crd-bumper.yaml b/crd-bumper.yaml index a0b4884d..5f3f6b4c 100644 --- a/crd-bumper.yaml +++ b/crd-bumper.yaml @@ -1,7 +1,7 @@ # A comma-separated list of directories where more Go code can be found, beyond # the usual cmd/, api/, internal/ that kubebuilder would put in place. The Go # files in these dirs will be bumped to the new hub version. 
-extra_go_dirs: daemons/compute/server/servers +extra_go_dirs: daemons/compute/server,daemons/copy-offload # A comma-separated list of directories of Kustomize config files that have # references to the API and that must be updated to the new hub version so diff --git a/daemons/compute/server/servers/server_default.go b/daemons/compute/server/servers/server_default.go index a8f86416..96ae29a8 100644 --- a/daemons/compute/server/servers/server_default.go +++ b/daemons/compute/server/servers/server_default.go @@ -866,7 +866,7 @@ func (s *defaultServer) findRabbitRelativeSource(ctx context.Context, computeMou } if len(clientMounts.Items) == 0 { - return "", fmt.Errorf("no client mounts found on node '%s'", s.namespace) + return "", fmt.Errorf("no client mounts found for node '%s'", s.namespace) } for _, clientMount := range clientMounts.Items { @@ -899,7 +899,7 @@ func (s *defaultServer) findComputeMountInfo(ctx context.Context, req *pb.DataMo } if len(clientMounts.Items) == 0 { - return nil, nil, fmt.Errorf("no client mounts found on node '%s'", s.name) + return nil, nil, fmt.Errorf("no client mounts found for node '%s'", s.name) } for _, clientMount := range clientMounts.Items { diff --git a/daemons/copy-offload/cmd/main.go b/daemons/copy-offload/cmd/main.go new file mode 100644 index 00000000..191063cf --- /dev/null +++ b/daemons/copy-offload/cmd/main.go @@ -0,0 +1,126 @@ +/* + * Copyright 2024 Hewlett Packard Enterprise Development LP + * Other additional copyright holders may be indicated within. + * + * The entirety of this work is licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. 
+ * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log/slog" + "net/http" + "os" + + "github.com/go-logr/logr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + zapcr "sigs.k8s.io/controller-runtime/pkg/log/zap" + + dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2" + "github.com/NearNodeFlash/nnf-dm/daemons/copy-offload/pkg/driver" + userHttp "github.com/NearNodeFlash/nnf-dm/daemons/copy-offload/pkg/server" + nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4" +) + +var ( + scheme = runtime.NewScheme() + rabbitName string +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(nnfv1alpha4.AddToScheme(scheme)) + utilruntime.Must(dwsv1alpha2.AddToScheme(scheme)) +} + +func setupLog() logr.Logger { + encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) + zaplogger := zapcr.New(zapcr.Encoder(encoder), zapcr.UseDevMode(true)) + ctrl.SetLogger(zaplogger) + + // controllerruntime logger. 
+ crLog := ctrl.Log.WithName("copy-offload") + return crLog +} + +func setupClient(crLog logr.Logger) client.Client { + config := ctrl.GetConfigOrDie() + + clnt, err := client.New(config, client.Options{Scheme: scheme}) + if err != nil { + crLog.Error(err, "Unable to create client") + os.Exit(1) + } + return clnt +} + +func clientSanity(crLog logr.Logger, clnt client.Client, rabbitName string) { + // Sanity check the client connection. + nnfNode := &nnfv1alpha4.NnfNode{} + if err := clnt.Get(context.TODO(), types.NamespacedName{Name: "nnf-nlc", Namespace: rabbitName}, nnfNode); err != nil { + crLog.Error(err, "Failed to retrieve my own NnfNode") + os.Exit(1) + } +} + +func main() { + port := "8080" + mock := false + + flag.StringVar(&port, "port", port, "Port for server.") + flag.BoolVar(&mock, "mock", mock, "Mock mode for tests; does not use k8s.") + flag.Parse() + + rabbitName = os.Getenv("NNF_NODE_NAME") + if rabbitName == "" { + fmt.Println("Did not find NNF_NODE_NAME") + os.Exit(1) + } + + crLog := setupLog() + // Make one of these for this server, and use it in all requests. 
+ drvr := &driver.Driver{Log: crLog, RabbitName: rabbitName, Mock: mock} + if !mock { + clnt := setupClient(crLog) + clientSanity(crLog, clnt, rabbitName) + drvr.Client = clnt + } + slog.Info("Ready", "node", rabbitName, "port", port, "mock", mock) + + httpHandler := &userHttp.UserHttp{Log: crLog, Drvr: drvr, Mock: mock} + + http.HandleFunc("/hello", httpHandler.Hello) + http.HandleFunc("/trial", httpHandler.TrialRequest) + http.HandleFunc("/cancel/", httpHandler.CancelRequest) + http.HandleFunc("/list", httpHandler.ListRequests) + + err := http.ListenAndServe(fmt.Sprintf(":%s", port), nil) + if errors.Is(err, http.ErrServerClosed) { + slog.Info("the server is closed") + } else if err != nil { + slog.Error("unable to start server", "err", err.Error()) + } +} diff --git a/daemons/copy-offload/pkg/driver/dmrequest.go b/daemons/copy-offload/pkg/driver/dmrequest.go new file mode 100644 index 00000000..c569d5da --- /dev/null +++ b/daemons/copy-offload/pkg/driver/dmrequest.go @@ -0,0 +1,82 @@ +/* + * Copyright 2024 Hewlett Packard Enterprise Development LP + * Other additional copyright holders may be indicated within. + * + * The entirety of this work is licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package driver + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" +) + +// DMRequest represents the content of one http request. This has a +// one-to-one relationship with a DriverRequest object. 
+type DMRequest struct { + ComputeName string `json:"computeName"` + + // The name and namespace of the initiating workflow + WorkflowName string `json:"workflowName"` + WorkflowNamespace string `json:"workflowNamespace"` + // Source file or directory + SourcePath string `json:"sourcePath"` + // Destination file or directory + DestinationPath string `json:"destinationPath"` + // If True, the data movement command runs `/bin/true` rather than perform actual data movement + Dryrun bool `json:"dryrun"` + // Extra options to pass to `dcp` if present in the Data Movement command. + DcpOptions string `json:"dcpOptions"` + // If true, enable server-side logging of stdout when the command is successful. Failure output is always logged. + LogStdout bool `json:"logStdout"` + // If true, store stdout in DataMovementStatusResponse.Message when the command is successful. Failure output is always contained in the message. + StoreStdout bool `json:"storeStdout"` + // The number of slots specified in the MPI hostfile. A value of 0 disables the use of slots in + // the hostfile. -1 will defer to the server side configuration. + Slots int32 `json:"slots"` + // The number of max_slots specified in the MPI hostfile. A value of 0 disables the use of + // max_slots in the hostfile. -1 will defer to the server side configuration. + MaxSlots int32 `json:"maxSlots"` + // The name of the data movement configuration profile to use. The above parameters (e.g. slots, + // logStdout) will override the settings defined by the profile. This profile must exist on the + // server otherwise the data movement operation will be invalid. Empty will default to the + // default profile. + DMProfile string `json:"dmProfile"` + // Extra options to pass to `mpirun` if present in the Data Movement command. 
+ MpirunOptions string `json:"mpirunOptions"` +} + +func (m *DMRequest) Validator() error { + + if m.ComputeName == "" { + return fmt.Errorf("compute name must be supplied") + } + if m.WorkflowName == "" { + return fmt.Errorf("workflow name must be supplied") + } + if m.SourcePath == "" { + return fmt.Errorf("source path must be supplied") + } + if m.DestinationPath == "" { + return fmt.Errorf("destination path must be supplied") + } + if m.WorkflowNamespace == "" { + m.WorkflowNamespace = corev1.NamespaceDefault + } + + return nil +} diff --git a/daemons/copy-offload/pkg/driver/driver.go b/daemons/copy-offload/pkg/driver/driver.go new file mode 100644 index 00000000..a7920ece --- /dev/null +++ b/daemons/copy-offload/pkg/driver/driver.go @@ -0,0 +1,762 @@ +/* + * Copyright 2024 Hewlett Packard Enterprise Development LP + * Other additional copyright holders may be indicated within. + * + * The entirety of this work is licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package driver + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "reflect" + "slices" + "strings" + "sync" + "time" + + "github.com/go-logr/logr" + "github.com/google/uuid" + "go.openly.dev/pointy" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2" + lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1" + "github.com/NearNodeFlash/nnf-dm/internal/controller/helpers" + nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4" +) + +var ( + // The name prefix used for generating NNF Data Movement operations. + nameBase = "nnf-copy-offload-" + nodeNameBase = "nnf-copy-offload-node-" +) + +// Driver will have only one instance per process, shared by all threads +// in the process. +type Driver struct { + Client client.Client + Log logr.Logger + RabbitName string + + Mock bool + MockCount int + + // We maintain a map of active operations which allows us to process cancel requests. + // This is a thread safe map since multiple data movement reconcilers and go routines will be executing at the same time. + // The SrvrDataMovementRecord objects are stored here. + contexts sync.Map +} + +// Keep track of the context and its cancel function so that we can track +// and cancel data movement operations in progress. +// These objects are stored in the Driver.contexts map. +type SrvrDataMovementRecord struct { + dmreq *nnfv1alpha4.NnfDataMovement + cancelContext helpers.DataMovementCancelContext +} + +// DriverRequest contains the information that is collected during the initial examination +// of a given http request. This has a one-to-one relationship with a DMRequest +// object. +type DriverRequest struct { + Drvr *Driver + + // Fresh copy of the chosen NnfDataMovementProfile. 
+ dmProfile *nnfv1alpha4.NnfDataMovementProfile + // Storage nodes to use. + nodes []string + // Worker nodes to use. + hosts []string + // MPI hosts file. + mpiHostfile string +} + +func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovement, error) { + + drvr := r.Drvr + crLog := drvr.Log.WithValues("workflow", dmreq.WorkflowName) + workflow := &dwsv1alpha2.Workflow{} + + wf := types.NamespacedName{Name: dmreq.WorkflowName, Namespace: dmreq.WorkflowNamespace} + if err := drvr.Client.Get(ctx, wf, workflow); err != nil { + crLog.Error(err, "Unable to get workflow") + return nil, err + } + if workflow.Status.State != dwsv1alpha2.StatePreRun || workflow.Status.Status != "Completed" { + err := fmt.Errorf("workflow must be in '%s' state and 'Completed' status", dwsv1alpha2.StatePreRun) + crLog.Error(err, "Invalid state") + return nil, err + } + + computeClientMount, computeMountInfo, err := r.findComputeMountInfo(ctx, dmreq) + if err != nil { + crLog.Error(err, "Failed to retrieve compute mountinfo") + return nil, err + } + + crLog = crLog.WithValues("type", computeMountInfo.Type) + var dm *nnfv1alpha4.NnfDataMovement + switch computeMountInfo.Type { + case "lustre": + dm, err = r.createNnfDataMovement(ctx, dmreq, computeMountInfo, computeClientMount) + case "gfs2": + dm, err = r.createNnfNodeDataMovement(ctx, dmreq, computeMountInfo) + default: + // xfs is not supported since it can only be mounted in one location at a time. It is + // already mounted on the compute node when copy offload occurs (PreRun/PostRun), therefore + // it cannot be mounted on the rabbit to perform data movement. + err = fmt.Errorf("filesystem not supported") + } + + if err != nil { + crLog.Error(err, "Failed to copy files") + return nil, err + } + + // Dm Profile - no pinned profiles here since copy_offload could use any profile. 
+ r.dmProfile, err = r.selectProfile(ctx, dmreq) + if err != nil { + crLog.Error(err, "Failed to get profile", "profile", dmreq.DMProfile) + return nil, err + } + dm.Spec.ProfileReference = corev1.ObjectReference{ + Kind: reflect.TypeOf(nnfv1alpha4.NnfDataMovementProfile{}).Name(), + Name: r.dmProfile.Name, + Namespace: r.dmProfile.Namespace, + } + crLog.Info("Using NnfDataMovmentProfile", "name", r.dmProfile) + + dm.Spec.UserId = workflow.Spec.UserID + dm.Spec.GroupId = workflow.Spec.GroupID + + // Add appropriate workflow labels so this is cleaned up + dwsv1alpha2.AddWorkflowLabels(dm, workflow) + dwsv1alpha2.AddOwnerLabels(dm, workflow) + + // Label the NnfDataMovement with a teardown state of "post_run" so the NNF workflow + // controller can identify compute initiated data movements. + nnfv1alpha4.AddDataMovementTeardownStateLabel(dm, dwsv1alpha2.StatePostRun) + + // Allow the user to override/supplement certain settings + setUserConfig(dmreq, dm) + + // We name the NnfDataMovement ourselves, since we're not giving it to k8s. + // We'll use this name internally. 
+ r.generateName(dm) + + return dm, nil +} + +func (r *DriverRequest) generateName(dm *nnfv1alpha4.NnfDataMovement) { + drvr := r.Drvr + + var nameSuffix string + if drvr.Mock { + nameSuffix = fmt.Sprintf("%d", drvr.MockCount) + drvr.MockCount += 1 + } else { + nameSuffix = string(uuid.NewString()[0:10]) + } + dm.Name = fmt.Sprintf("%s%s", dm.GetObjectMeta().GetGenerateName(), nameSuffix) +} + +func (r *DriverRequest) CreateMock(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovement, error) { + drvr := r.Drvr + + dm := &nnfv1alpha4.NnfDataMovement{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: nodeNameBase, + Namespace: drvr.RabbitName, // Use the rabbit + Labels: map[string]string{ + nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName, + }, + }, + } + r.generateName(dm) + return dm, nil +} + +func (r *DriverRequest) Drive(ctx context.Context, dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) error { + + var err error + drvr := r.Drvr + crLog := drvr.Log.WithValues("workflow", dmreq.WorkflowName) + + r.nodes, err = helpers.GetStorageNodeNames(drvr.Client, ctx, dm) + if err != nil { + crLog.Error(err, "could not get storage nodes for data movement") + return err + } + + r.hosts, err = helpers.GetWorkerHostnames(drvr.Client, ctx, r.nodes) + if err != nil { + crLog.Error(err, "could not get worker nodes for data movement") + return err + } + + // Create the hostfile. This is needed for preparing the destination and the data movement + // command itself. 
+ r.mpiHostfile, err = helpers.CreateMpiHostfile(r.dmProfile, r.hosts, dm) + if err != nil { + crLog.Error(err, "could not create MPI hostfile") + return err + } + crLog.Info("MPI Hostfile preview", "first line", helpers.PeekMpiHostfile(r.mpiHostfile)) + + ctxCancel := r.recordRequest(ctx, dm) + + if err := r.driveWithContext(ctx, ctxCancel, dm, crLog); err != nil { + crLog.Error(err, "failed copy") + os.RemoveAll(filepath.Dir(r.mpiHostfile)) + drvr.contexts.Delete(dm.Name) + return err + } + + return nil +} + +func (r *DriverRequest) DriveMock(ctx context.Context, dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) error { + _ = r.recordRequest(ctx, dm) + + return nil +} + +func (r *DriverRequest) recordRequest(ctx context.Context, dm *nnfv1alpha4.NnfDataMovement) context.Context { + drvr := r.Drvr + + // Expand the context with cancel and store it in the map so the cancel function can be + // found by another server thread if necessary. + ctxCancel, cancel := context.WithCancel(ctx) + drvr.contexts.Store(dm.Name, SrvrDataMovementRecord{ + dmreq: dm, + cancelContext: helpers.DataMovementCancelContext{ + Ctx: ctxCancel, + Cancel: cancel, + }, + }) + return ctxCancel +} + +func (r *DriverRequest) CancelRequest(ctx context.Context, name string) error { + drvr := r.Drvr + + storedCancelContext, loaded := drvr.contexts.LoadAndDelete(name) + if !loaded { + // Maybe it already completed and removed itself? + return nil + } + + contextRecord := storedCancelContext.(SrvrDataMovementRecord) + cancelContext := contextRecord.cancelContext + drvr.Log.Info("cancelling request", "name", name) + cancelContext.Cancel() + <-cancelContext.Ctx.Done() + + // Nothing more to do - the go routine that is executing the data movement will exit + // and the status is recorded then. 
+ + return nil +} + +func (r *DriverRequest) ListRequests(ctx context.Context) ([]string, error) { + drvr := r.Drvr + + drvr.Log.Info("Listing requests:") + items := make([]string, 0) + drvr.contexts.Range(func(key, val interface{}) bool { + skey := fmt.Sprintf("%s", key) + drvr.Log.Info(fmt.Sprintf(" %s", skey)) + items = append(items, skey) + return true + }) + slices.Sort(items) + + return items, nil +} + +func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.Context, dm *nnfv1alpha4.NnfDataMovement, crLog logr.Logger) error { + drvr := r.Drvr + + // Prepare Destination Directory + if err := helpers.PrepareDestination(drvr.Client, ctx, r.dmProfile, dm, r.mpiHostfile, crLog); err != nil { + return err + } + + // Build command + cmdArgs, err := helpers.BuildDMCommand(r.dmProfile, r.mpiHostfile, dm, crLog) + if err != nil { + crLog.Error(err, "could not create data movement command") + return err + } + cmd := exec.CommandContext(ctxCancel, "/bin/bash", "-c", strings.Join(cmdArgs, " ")) + + // XXX DEAN DEAN at some point we need a lock for the dm.Status, + // to coordinate with a 'cancel' message that comes into the server. 
+ + // Record the start of the data movement operation + now := metav1.NowMicro() + dm.Status.StartTime = &now + dm.Status.State = nnfv1alpha4.DataMovementConditionTypeRunning + cmdStatus := nnfv1alpha4.NnfDataMovementCommandStatus{} + cmdStatus.Command = cmd.String() + dm.Status.CommandStatus = &cmdStatus + crLog.Info("Running Command", "cmd", cmdStatus.Command) + + contextDelete := func() { drvr.contexts.Delete(dm.Name) } + + runit(ctxCancel, contextDelete, cmd, &cmdStatus, r.dmProfile, r.mpiHostfile, crLog) + return nil +} + +func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdStatus *nnfv1alpha4.NnfDataMovementCommandStatus, profile *nnfv1alpha4.NnfDataMovementProfile, mpiHostfile string, log logr.Logger) { + + // Execute the go routine to perform the data movement + go func() { + var err error + // Use a MultiWriter so that we can parse the output and save the full output at the end + var combinedOutBuf, parseBuf bytes.Buffer + cmd.Stdout = io.MultiWriter(&parseBuf, &combinedOutBuf) + cmd.Stderr = cmd.Stdout // Combine stderr/stdout + + // Use channels to sync progress collection and cmd.Wait(). + chCommandDone := make(chan bool, 1) + chProgressDone := make(chan bool) + + // Start the data movement command + cmd.Start() + + // While the command is running, capture and process the output. Read lines until EOF to + // ensure we have the latest output. Then use the last regex match to obtain the most recent + // progress. 
+ progressCollectInterval := time.Duration(profile.Data.ProgressIntervalSeconds) * time.Second + if helpers.ProgressCollectionEnabled(progressCollectInterval) { + go func() { + var elapsed metav1.Duration + elapsed.Duration = 0 + progressStart := metav1.NowMicro() + + // Perform the actual collection and update logic + parseAndUpdateProgress := func() { + + // Read all lines of output until EOF + for { + line, err := parseBuf.ReadString('\n') + if err == io.EOF { + break + } else if err != nil { + log.Error(err, "failed to read progress output") + } + + // If it's a progress line, grab the percentage + if err := helpers.ParseDcpProgress(line, cmdStatus); err != nil { + log.Error(err, "failed to parse progress", "line", line) + return + } + + // Collect stats only when finished + if cmdStatus.ProgressPercentage != nil && *cmdStatus.ProgressPercentage >= 100 { + if err := helpers.ParseDcpStats(line, cmdStatus); err != nil { + log.Error(err, "failed to parse stats", "line", line) + return + } + } + + // Always update LastMessage and timing + cmdStatus.LastMessage = line + progressNow := metav1.NowMicro() + elapsed.Duration = progressNow.Time.Sub(progressStart.Time) + cmdStatus.LastMessageTime = progressNow + cmdStatus.ElapsedTime = elapsed + } + + //// Update the CommandStatus in the DM resource after we parsed all the lines + //err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + // dm := &nnfv1alpha4.NnfDataMovement{} + // if err := r.Get(ctx, req.NamespacedName, dm); err != nil { + // return client.IgnoreNotFound(err) + // } + // + // if dm.Status.CommandStatus == nil { + // dm.Status.CommandStatus = &nnfv1alpha4.NnfDataMovementCommandStatus{} + // } + // cmdStatus.DeepCopyInto(dm.Status.CommandStatus) + // + // return r.Status().Update(ctx, dm) + //}) + + //if err != nil { + // log.Error(err, "failed to update CommandStatus with Progress", "cmdStatus", cmdStatus) + //} + } + + // Main Progress Collection Loop + for { + select { + // Now that we're 
done, parse whatever output is left + case <-chCommandDone: + parseAndUpdateProgress() + chProgressDone <- true + return + // Collect Progress output on every interval + case <-time.After(progressCollectInterval): + parseAndUpdateProgress() + } + } + }() + } else { + log.Info("Skipping progress collection - collection interval is less than 1s", "collectInterval", progressCollectInterval) + } + + err = cmd.Wait() + + // If enabled, wait for final progress collection + if helpers.ProgressCollectionEnabled(progressCollectInterval) { + chCommandDone <- true // tell the process goroutine to stop parsing output + <-chProgressDone // wait for process goroutine to stop parsing final output + } + + // Command is finished, update status + //now := metav1.NowMicro() + //dm.Status.EndTime = &now + //dm.Status.State = nnfv1alpha4.DataMovementConditionTypeFinished + //dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonSuccess + + // On cancellation or failure, log the output. On failure, also store the output in the + // Status.Message. When successful, check the profile/UserConfig config options to log + // and/or store the output. 
+ if errors.Is(ctxCancel.Err(), context.Canceled) { + log.Info("Data movement operation cancelled", "output", combinedOutBuf.String()) + //dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonCancelled + } else if err != nil { + log.Error(err, "Data movement operation failed", "output", combinedOutBuf.String()) + //dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonFailed + //dm.Status.Message = fmt.Sprintf("%s: %s", err.Error(), combinedOutBuf.String()) + //resourceErr := dwsv1alpha2.NewResourceError("").WithError(err).WithUserMessage("data movement operation failed: %s", combinedOutBuf.String()).WithFatal() + //dm.Status.SetResourceErrorAndLog(resourceErr, log) + } else { + log.Info("Data movement operation completed", "cmdStatus", cmdStatus) + + // Profile or DM request has enabled stdout logging + //if profile.Data.LogStdout || (dm.Spec.UserConfig != nil && dm.Spec.UserConfig.LogStdout) { + // log.Info("Data movement operation output", "output", combinedOutBuf.String()) + //} + log.Info("Data movement operation output", "output", combinedOutBuf.String()) + + //// Profile or DM request has enabled storing stdout + //if profile.Data.StoreStdout || (dm.Spec.UserConfig != nil && dm.Spec.UserConfig.StoreStdout) { + // dm.Status.Message = combinedOutBuf.String() + //} + } + + os.RemoveAll(filepath.Dir(mpiHostfile)) + + //status := dm.Status.DeepCopy() + + //err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + // dm := &nnfv1alpha4.NnfDataMovement{} + // if err := r.Get(ctx, req.NamespacedName, dm); err != nil { + // return client.IgnoreNotFound(err) + // } + // + // // Ensure we have the latest CommandStatus from the progress goroutine + // cmdStatus.DeepCopyInto(status.CommandStatus) + // status.DeepCopyInto(&dm.Status) + // + // return r.Status().Update(ctx, dm) + //}) + // + //if err != nil { + // log.Error(err, "failed to update dm status with completion") + // // TODO Add prometheus counter to track occurrences + //} + + contextDelete() + 
 }() +} + +// Set the DM's UserConfig options based on the incoming request's options +func setUserConfig(dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) { + dm.Spec.UserConfig = &nnfv1alpha4.NnfDataMovementConfig{} + dm.Spec.UserConfig.Dryrun = dmreq.Dryrun + dm.Spec.UserConfig.MpirunOptions = dmreq.MpirunOptions + dm.Spec.UserConfig.DcpOptions = dmreq.DcpOptions + dm.Spec.UserConfig.LogStdout = dmreq.LogStdout + dm.Spec.UserConfig.StoreStdout = dmreq.StoreStdout + + if dmreq.Slots >= 0 { + dm.Spec.UserConfig.Slots = pointy.Int(int(dmreq.Slots)) + } + if dmreq.MaxSlots >= 0 { + dm.Spec.UserConfig.MaxSlots = pointy.Int(int(dmreq.MaxSlots)) + } +} + +// createNnfDataMovement creates an NnfDataMovement to be used with Lustre. +func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo, computeClientMount *dwsv1alpha2.ClientMount) (*nnfv1alpha4.NnfDataMovement, error) { + + // Find the ClientMount for the rabbit. + source, err := r.findRabbitRelativeSource(ctx, dmreq, computeMountInfo) + if err != nil { + return nil, err + } + + var dwIndex string + if dw, err := getDirectiveIndexFromClientMount(computeClientMount); err != nil { + return nil, err + } else { + dwIndex = dw + } + + lustrefs, err := r.findDestinationLustreFilesystem(ctx, dmreq.DestinationPath) + if err != nil { + return nil, err + } + + dm := &nnfv1alpha4.NnfDataMovement{ + ObjectMeta: metav1.ObjectMeta{ + // Be careful about how much you put into GenerateName. + // The MPI operator will use the resulting name as a + // prefix for its own names. + GenerateName: nameBase, + // Use the data movement namespace.
+ Namespace: nnfv1alpha4.DataMovementNamespace, + Labels: map[string]string{ + nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName, + nnfv1alpha4.DirectiveIndexLabel: dwIndex, + }, + }, + Spec: nnfv1alpha4.NnfDataMovementSpec{ + Source: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ + Path: source, + StorageReference: computeMountInfo.Device.DeviceReference.ObjectReference, + }, + Destination: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ + Path: dmreq.DestinationPath, + StorageReference: corev1.ObjectReference{ + Kind: reflect.TypeOf(*lustrefs).Name(), + Namespace: lustrefs.Namespace, + Name: lustrefs.Name, + }, + }, + }, + } + + return dm, nil +} + +func getDirectiveIndexFromClientMount(object *dwsv1alpha2.ClientMount) (string, error) { + // Find the DW index for our work. + labels := object.GetLabels() + if labels == nil { + return "", fmt.Errorf("unable to find labels on compute ClientMount, namespaces=%s, name=%s", object.Namespace, object.Name) + } + + dwIndex, found := labels[nnfv1alpha4.DirectiveIndexLabel] + if !found { + return "", fmt.Errorf("unable to find directive index label on compute ClientMount, namespace=%s name=%s", object.Namespace, object.Name) + } + + return dwIndex, nil +} + +// createNnfNodeDataMovement creates an NnfDataMovement to be used with GFS2. +func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo) (*nnfv1alpha4.NnfDataMovement, error) { + drvr := r.Drvr + + // Find the ClientMount for the rabbit. 
+ source, err := r.findRabbitRelativeSource(ctx, dmreq, computeMountInfo) + if err != nil { + return nil, err + } + + dm := &nnfv1alpha4.NnfDataMovement{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: nodeNameBase, + Namespace: drvr.RabbitName, // Use the rabbit + Labels: map[string]string{ + nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName, + }, + }, + Spec: nnfv1alpha4.NnfDataMovementSpec{ + Source: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ + Path: source, + StorageReference: computeMountInfo.Device.DeviceReference.ObjectReference, + }, + Destination: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ + Path: dmreq.DestinationPath, + }, + }, + } + + return dm, nil +} + +func (r *DriverRequest) findRabbitRelativeSource(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo) (string, error) { + + drvr := r.Drvr + // Now look up the client mount on this Rabbit node and find the compute initiator. We append the relative path + // to this value resulting in the full path on the Rabbit. 
+ + listOptions := []client.ListOption{ + client.InNamespace(drvr.RabbitName), + client.MatchingLabels(map[string]string{ + dwsv1alpha2.WorkflowNameLabel: dmreq.WorkflowName, + dwsv1alpha2.WorkflowNamespaceLabel: dmreq.WorkflowNamespace, + }), + } + + clientMounts := &dwsv1alpha2.ClientMountList{} + if err := drvr.Client.List(ctx, clientMounts, listOptions...); err != nil { + return "", err + } + + if len(clientMounts.Items) == 0 { + return "", fmt.Errorf("no client mounts found for node '%s'", drvr.RabbitName) + } + + for _, clientMount := range clientMounts.Items { + for _, mount := range clientMount.Spec.Mounts { + if *computeMountInfo.Device.DeviceReference == *mount.Device.DeviceReference { + return mount.MountPath + strings.TrimPrefix(dmreq.SourcePath, computeMountInfo.MountPath), nil + } + } + } + + return "", fmt.Errorf("initiator not found in list of client mounts: %s", dmreq.ComputeName) +} + +// Look up the client mounts on this node to find the compute relative mount path. The "spec.Source" must be +// prefixed with a mount path in the list of mounts. Once we find this mount, we can strip out the prefix and +// are left with the relative path. 
+func (r *DriverRequest) findComputeMountInfo(ctx context.Context, dmreq DMRequest) (*dwsv1alpha2.ClientMount, *dwsv1alpha2.ClientMountInfo, error) { + + drvr := r.Drvr + listOptions := []client.ListOption{ + client.InNamespace(dmreq.ComputeName), + client.MatchingLabels(map[string]string{ + dwsv1alpha2.WorkflowNameLabel: dmreq.WorkflowName, + dwsv1alpha2.WorkflowNamespaceLabel: dmreq.WorkflowNamespace, + }), + } + + clientMounts := &dwsv1alpha2.ClientMountList{} + if err := drvr.Client.List(ctx, clientMounts, listOptions...); err != nil { + return nil, nil, err + } + + if len(clientMounts.Items) == 0 { + return nil, nil, fmt.Errorf("no client mounts found for node '%s'", dmreq.ComputeName) + } + + for _, clientMount := range clientMounts.Items { + for _, mount := range clientMount.Spec.Mounts { + if strings.HasPrefix(dmreq.SourcePath, mount.MountPath) { + if mount.Device.DeviceReference == nil && mount.Device.Type != "lustre" { + return nil, nil, fmt.Errorf("ClientMount %s/%s: Source path '%s' does not have device reference", clientMount.Namespace, clientMount.Name, dmreq.SourcePath) + } + + return &clientMount, &mount, nil + } + } + } + + return nil, nil, fmt.Errorf("source path not found in list of client mounts: %s", dmreq.SourcePath) +} + +func (r *DriverRequest) findDestinationLustreFilesystem(ctx context.Context, dest string) (*lusv1beta1.LustreFileSystem, error) { + + drvr := r.Drvr + if !filepath.IsAbs(dest) { + return nil, fmt.Errorf("destination must be an absolute path") + } + origDest := dest + if !strings.HasSuffix(dest, "/") { + dest += "/" + } + + lustrefsList := &lusv1beta1.LustreFileSystemList{} + if err := drvr.Client.List(ctx, lustrefsList); err != nil { + return nil, fmt.Errorf("unable to list LustreFileSystem resources: %s", err.Error()) + } + if len(lustrefsList.Items) == 0 { + return nil, fmt.Errorf("no LustreFileSystem resources found") + } + + for _, lustrefs := range lustrefsList.Items { + mroot := lustrefs.Spec.MountRoot + if 
!strings.HasSuffix(mroot, "/") { + mroot += "/" + } + if strings.HasPrefix(dest, mroot) { + return &lustrefs, nil + } + } + + return nil, fmt.Errorf("unable to find a LustreFileSystem resource matching %s", origDest) +} + +func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovementProfile, error) { + drvr := r.Drvr + profileName := dmreq.DMProfile + ns := "nnf-system" + + // If a profile is named then verify that it exists. Otherwise, verify that a default profile + // can be found. + if len(profileName) == 0 { + NnfDataMovementProfiles := &nnfv1alpha4.NnfDataMovementProfileList{} + if err := drvr.Client.List(ctx, NnfDataMovementProfiles, &client.ListOptions{Namespace: ns}); err != nil { + return nil, err + } + profilesFound := make([]string, 0, len(NnfDataMovementProfiles.Items)) + for _, profile := range NnfDataMovementProfiles.Items { + if profile.Data.Default { + objkey := client.ObjectKeyFromObject(&profile) + profilesFound = append(profilesFound, objkey.Name) + } + } + // Require that there be one and only one default. 
+ if len(profilesFound) == 0 { + return nil, fmt.Errorf("unable to find a default NnfDataMovementProfile to use") + } else if len(profilesFound) > 1 { + return nil, fmt.Errorf("more than one default NnfDataMovementProfile found; unable to pick one: %v", profilesFound) + } + profileName = profilesFound[0] + } + + profile := &nnfv1alpha4.NnfDataMovementProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: profileName, + Namespace: ns, + }, + } + + err := drvr.Client.Get(ctx, client.ObjectKeyFromObject(profile), profile) + if err != nil { + return nil, fmt.Errorf("unable to retrieve NnfDataMovementProfile: %s", profileName) + } + + return profile, nil +} diff --git a/daemons/copy-offload/pkg/server/server.go b/daemons/copy-offload/pkg/server/server.go new file mode 100644 index 00000000..a11959ae --- /dev/null +++ b/daemons/copy-offload/pkg/server/server.go @@ -0,0 +1,136 @@ +/* + * Copyright 2024 Hewlett Packard Enterprise Development LP + * Other additional copyright holders may be indicated within. + * + * The entirety of this work is licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package server + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "path/filepath" + "strings" + + "github.com/NearNodeFlash/nnf-dm/daemons/copy-offload/pkg/driver" + nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4" + "github.com/go-logr/logr" +) + +type UserHttp struct { + Log logr.Logger + Drvr *driver.Driver + InTest bool + Mock bool +} + +func (user *UserHttp) Hello(w http.ResponseWriter, req *http.Request) { + user.Log.Info("Hello") + fmt.Fprintf(w, "hello back at ya\n") +} + +func (user *UserHttp) ListRequests(w http.ResponseWriter, req *http.Request) { + + if req.Method != "GET" { + http.Error(w, "method not supported", http.StatusNotImplemented) + return + } + + drvrReq := driver.DriverRequest{Drvr: user.Drvr} + items, err := drvrReq.ListRequests(context.TODO()) + if err != nil { + http.Error(w, fmt.Sprintf("unable to list requests: %s\n", err.Error()), http.StatusInternalServerError) + return + } + if len(items) > 0 { + fmt.Fprintln(w, strings.Join(items, ",")) + } +} + +func (user *UserHttp) CancelRequest(w http.ResponseWriter, req *http.Request) { + + if req.Method != "DELETE" { + http.Error(w, "method not supported", http.StatusNotImplemented) + return + } + user.Log.Info("In DELETE", "url", req.URL) + urlParts, err := url.Parse(req.URL.String()) + if err != nil { + http.Error(w, "unable to parse URL", http.StatusBadRequest) + return + } + name := filepath.Base(urlParts.Path) + + drvrReq := driver.DriverRequest{Drvr: user.Drvr} + if err := drvrReq.CancelRequest(context.TODO(), name); err != nil { + http.Error(w, fmt.Sprintf("unable to cancel request: %s\n", err.Error()), http.StatusBadRequest) + return + } + http.Error(w, "", http.StatusNoContent) +} + +func (user *UserHttp) TrialRequest(w http.ResponseWriter, req *http.Request) { + + if req.Method != "POST" { + http.Error(w, "method not supported", http.StatusNotImplemented) + return + } + user.Log.Info("In TrialRequest", "url", req.URL) + + var dmreq 
driver.DMRequest + if err := json.NewDecoder(req.Body).Decode(&dmreq); err != nil { + http.Error(w, fmt.Sprintf("unable to decode data movement request body: %s", err.Error()), http.StatusBadRequest) + return + } + user.Log.Info(" TrialRequest", "dmreq", dmreq) + if err := dmreq.Validator(); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var dm *nnfv1alpha4.NnfDataMovement + var err error + drvrReq := driver.DriverRequest{Drvr: user.Drvr} + if user.Mock { + dm, err = drvrReq.CreateMock(context.TODO(), dmreq) + if err != nil { + http.Error(w, fmt.Sprintf("%s\n", err.Error()), http.StatusInternalServerError) + return + } + + err = drvrReq.DriveMock(context.TODO(), dmreq, dm) + if err != nil { + http.Error(w, fmt.Sprintf("%s\n", err.Error()), http.StatusInternalServerError) + return + } + } else { + dm, err = drvrReq.Create(context.TODO(), dmreq) + if err != nil { + http.Error(w, fmt.Sprintf("%s\n", err.Error()), http.StatusInternalServerError) + return + } + + err = drvrReq.Drive(context.TODO(), dmreq, dm) + if err != nil { + http.Error(w, fmt.Sprintf("%s\n", err.Error()), http.StatusInternalServerError) + return + } + } + fmt.Fprintf(w, "name=%s\n", dm.GetName()) +} diff --git a/daemons/copy-offload/pkg/server/server_test.go b/daemons/copy-offload/pkg/server/server_test.go new file mode 100644 index 00000000..6ab1e899 --- /dev/null +++ b/daemons/copy-offload/pkg/server/server_test.go @@ -0,0 +1,367 @@ +/* + * Copyright 2024 Hewlett Packard Enterprise Development LP + * Other additional copyright holders may be indicated within. + * + * The entirety of this work is licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. 
+ * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + ctrl "sigs.k8s.io/controller-runtime" + zapcr "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "github.com/NearNodeFlash/nnf-dm/daemons/copy-offload/pkg/driver" +) + +func setupLog() logr.Logger { + encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) + zaplogger := zapcr.New(zapcr.Encoder(encoder), zapcr.UseDevMode(true)) + ctrl.SetLogger(zaplogger) + + // controllerruntime logger. 
+ crLog := ctrl.Log.WithName("copy-offload-test") + return crLog +} + +func TestA_Hello(t *testing.T) { + t.Run("returns hello response", func(t *testing.T) { + request, _ := http.NewRequest(http.MethodGet, "/hello", nil) + response := httptest.NewRecorder() + + httpHandler := &UserHttp{Log: setupLog()} + + httpHandler.Hello(response, request) + + res := response.Result() + got := response.Body.String() + want := "hello back at ya\n" + statusWant := http.StatusOK + + if res.StatusCode != statusWant { + t.Errorf("got status %d, want status %d", res.StatusCode, statusWant) + } + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) +} + +func TestB_ListRequests(t *testing.T) { + testCases := []struct { + name string + method string + wantText string + wantStatus int + }{ + { + name: "returns status-no-content", + method: http.MethodGet, + wantText: "", + wantStatus: http.StatusOK, + }, + { + name: "returns status-not-implemented for POST", + method: http.MethodPost, + wantText: "method not supported\n", + wantStatus: http.StatusNotImplemented, + }, + { + name: "returns status-not-implemented for PUT", + method: http.MethodPut, + wantText: "method not supported\n", + wantStatus: http.StatusNotImplemented, + }, + } + + crLog := setupLog() + drvr := &driver.Driver{Log: crLog, Mock: true} + httpHandler := &UserHttp{Log: crLog, Drvr: drvr, Mock: true} + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + request, _ := http.NewRequest(test.method, "/list", nil) + response := httptest.NewRecorder() + + httpHandler.ListRequests(response, request) + + res := response.Result() + got := response.Body.String() + + if res.StatusCode != test.wantStatus { + t.Errorf("got status %d, want status %d", res.StatusCode, test.wantStatus) + } + if got != test.wantText { + t.Errorf("got %q, want %q", got, test.wantText) + } + }) + } +} + +func TestC_CancelRequest(t *testing.T) { + testCases := []struct { + name string + method string + wantText string + 
wantStatus int + }{ + { + name: "returns status-no-content", + method: http.MethodDelete, + wantText: "\n", + wantStatus: http.StatusNoContent, + }, + { + name: "returns status-not-implemented for GET", + method: http.MethodGet, + wantText: "method not supported\n", + wantStatus: http.StatusNotImplemented, + }, + { + name: "returns status-not-implemented for PUT", + method: http.MethodPut, + wantText: "method not supported\n", + wantStatus: http.StatusNotImplemented, + }, + } + + crLog := setupLog() + drvr := &driver.Driver{Log: crLog, Mock: true} + httpHandler := &UserHttp{Log: crLog, Drvr: drvr, Mock: true} + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + request, _ := http.NewRequest(test.method, "/cancel/nnf-copy-offload-node-9ae2a136-4", nil) + response := httptest.NewRecorder() + + httpHandler.CancelRequest(response, request) + + res := response.Result() + got := response.Body.String() + + if res.StatusCode != test.wantStatus { + t.Errorf("got status %d, want status %d", res.StatusCode, test.wantStatus) + } + if got != test.wantText { + t.Errorf("got %q, want %q", got, test.wantText) + } + }) + } +} + +func TestD_TrialRequest(t *testing.T) { + testCases := []struct { + name string + method string + body []byte + wantText string + wantStatus int + }{ + { + name: "returns status-ok", + method: http.MethodPost, + body: []byte("{\"computeName\": \"rabbit-compute-3\", \"workflowName\": \"yellow\", \"sourcePath\": \"/mnt/nnf/dc51a384-99bd-4ef1-8444-4ee3b0cdc8a8-0\", \"destinationPath\": \"/lus/global/dean/foo\", \"dryrun\": true}"), + wantText: "name=nnf-copy-offload-node-0\n", + wantStatus: http.StatusOK, + }, + { + name: "returns status-bad-request", + method: http.MethodPost, + body: []byte("{\"unknown\": 1}"), + wantText: "compute name must be supplied\n", + wantStatus: http.StatusBadRequest, + }, + { + name: "returns status-not-implemented for GET", + method: http.MethodGet, + wantText: "method not supported\n", + wantStatus: 
http.StatusNotImplemented, + }, + { + name: "returns status-not-implemented for PUT", + method: http.MethodPut, + wantText: "method not supported\n", + wantStatus: http.StatusNotImplemented, + }, + } + + crLog := setupLog() + drvr := &driver.Driver{Log: crLog, RabbitName: "rabbit-1", Mock: true} + httpHandler := &UserHttp{Log: crLog, Drvr: drvr, Mock: true} + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + var readerBody io.Reader = nil + if len(test.body) > 0 { + readerBody = bytes.NewReader(test.body) + } + request, _ := http.NewRequest(test.method, "/trial", readerBody) + + response := httptest.NewRecorder() + + httpHandler.TrialRequest(response, request) + + res := response.Result() + got := response.Body.String() + + if res.StatusCode != test.wantStatus { + t.Errorf("got status %d, want status %d", res.StatusCode, test.wantStatus) + } + if got != test.wantText { + t.Errorf("got %q, want %q", got, test.wantText) + } + }) + } +} + +func TestE_Lifecycle(t *testing.T) { + + scheduleJobs := []struct { + name string + method string + body []byte + wantText string + wantStatus int + }{ + { + name: "schedule job 1", + method: http.MethodPost, + body: []byte("{\"computeName\": \"rabbit-compute-3\", \"workflowName\": \"yellow\", \"sourcePath\": \"/mnt/nnf/dc51a384-99bd-4ef1-8444-4ee3b0cdc8a8-0\", \"destinationPath\": \"/lus/global/dean/foo\", \"dryrun\": true}"), + wantText: "name=nnf-copy-offload-node-0\n", + wantStatus: http.StatusOK, + }, + { + name: "schedule job 2", + method: http.MethodPost, + body: []byte("{\"computeName\": \"rabbit-compute-4\", \"workflowName\": \"yellow\", \"sourcePath\": \"/mnt/nnf/dc51a384-99bd-4ef1-8444-4ee3b0cdc8a8-0\", \"destinationPath\": \"/lus/global/dean/foo\", \"dryrun\": true}"), + wantText: "name=nnf-copy-offload-node-1\n", + wantStatus: http.StatusOK, + }, + { + name: "schedule job 3", + method: http.MethodPost, + body: []byte("{\"computeName\": \"rabbit-compute-5\", \"workflowName\": \"yellow\", 
\"sourcePath\": \"/mnt/nnf/dc51a384-99bd-4ef1-8444-4ee3b0cdc8a8-0\", \"destinationPath\": \"/lus/global/dean/foo\", \"dryrun\": true}"), + wantText: "name=nnf-copy-offload-node-2\n", + wantStatus: http.StatusOK, + }, + } + + crLog := setupLog() + drvr := &driver.Driver{Log: crLog, RabbitName: "rabbit-1", Mock: true} + httpHandler := &UserHttp{Log: crLog, Drvr: drvr, Mock: true} + + var listWanted []string + var jobCount int = 0 + for _, test := range scheduleJobs { + t.Run(test.name, func(t *testing.T) { + var readerBody io.Reader = nil + if len(test.body) > 0 { + readerBody = bytes.NewReader(test.body) + } + request, _ := http.NewRequest(test.method, "/trial", readerBody) + + response := httptest.NewRecorder() + + httpHandler.TrialRequest(response, request) + + res := response.Result() + got := response.Body.String() + + if res.StatusCode != test.wantStatus { + t.Errorf("got status %d, want status %d", res.StatusCode, test.wantStatus) + } + if got != test.wantText { + t.Errorf("got %q, want %q", got, test.wantText) + } else { + parts := strings.Split(got, "=") + listWanted = append(listWanted, strings.TrimRight(parts[1], "\n")) + jobCount += 1 + } + }) + } + + stringWanted := strings.Join(listWanted, ",") + t.Run("list all jobs", func(t *testing.T) { + request, _ := http.NewRequest(http.MethodGet, "/list", nil) + response := httptest.NewRecorder() + + httpHandler.ListRequests(response, request) + + res := response.Result() + got := response.Body.String() + chopGot := strings.TrimRight(got, "\n") + + if res.StatusCode != http.StatusOK { + t.Errorf("got status %d, want status %d", res.StatusCode, http.StatusOK) + } + if chopGot != stringWanted { + t.Errorf("got %q, want %q", chopGot, stringWanted) + } + }) + + t.Run("cancel job", func(t *testing.T) { + // Go bug? If I try to dynamically build the url for this request I will + // get a null pointer reference in CancelRequest(), where 'req' will + // be null. 
+ request, _ := http.NewRequest(http.MethodDelete, "/cancel/nnf-copy-offload-node-0", nil) + response := httptest.NewRecorder() + + httpHandler.CancelRequest(response, request) + + res := response.Result() + got := response.Body.String() + + if res.StatusCode != http.StatusNoContent { + t.Errorf("got status %d, want status %d", res.StatusCode, http.StatusNoContent) + } + if got != "\n" { + t.Errorf("got %q, want %q", got, "(newline)") + } + }) + + stringWanted = strings.Join(listWanted[1:], ",") + t.Run("list remaining jobs", func(t *testing.T) { + request, _ := http.NewRequest(http.MethodGet, "/list", nil) + response := httptest.NewRecorder() + + httpHandler.ListRequests(response, request) + + res := response.Result() + got := response.Body.String() + chopGot := strings.TrimRight(got, "\n") + + if res.StatusCode != http.StatusOK { + t.Errorf("got status %d, want status %d", res.StatusCode, http.StatusOK) + } + if chopGot != stringWanted { + t.Errorf("got %q, want %q", chopGot, stringWanted) + } + }) +} + +// Just touch ginkgo, so it's here to interpret any ginkgo args from +// "make test", so that doesn't fail on this test file. +var _ = BeforeSuite(func() {}) diff --git a/internal/controller/datamovement_controller.go b/internal/controller/datamovement_controller.go index a61e2160..29ecf513 100644 --- a/internal/controller/datamovement_controller.go +++ b/internal/controller/datamovement_controller.go @@ -339,7 +339,7 @@ func (r *DataMovementReconciler) Reconcile(ctx context.Context, req ctrl.Request // Status.Message. When successful, check the profile/UserConfig config options to log // and/or store the output. 
if errors.Is(ctxCancel.Err(), context.Canceled) { - log.Error(err, "Data movement operation cancelled", "output", combinedOutBuf.String()) + log.Info("Data movement operation cancelled", "output", combinedOutBuf.String()) dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonCancelled } else if err != nil { log.Error(err, "Data movement operation failed", "output", combinedOutBuf.String()) diff --git a/internal/controller/helpers/datamovement_helpers.go b/internal/controller/helpers/datamovement_helpers.go index ceb85a4b..026c8299 100644 --- a/internal/controller/helpers/datamovement_helpers.go +++ b/internal/controller/helpers/datamovement_helpers.go @@ -458,6 +458,10 @@ func mpiIsDir(profile *nnfv1alpha4.NnfDataMovementProfile, path string, uid, gid } else if strings.Contains(strings.ToLower(output), "file") { log.Info("mpiIsDir", "path", path, "directory", false) return false, nil + } else if os.Getenv("ENVIRONMENT") == "kind" && output == "symbolic link\n" { + // In KIND it will be a symlink to a directory. + log.Info("mpiIsDir using symlink in KIND", "path", path) + return true, nil } else { return false, fmt.Errorf("could not determine file type of path ('%s'): %s", path, output) }