diff --git a/daemons/user-copy/cmd/main.go b/daemons/user-copy/cmd/main.go index a63b9d22..0416988d 100644 --- a/daemons/user-copy/cmd/main.go +++ b/daemons/user-copy/cmd/main.go @@ -43,7 +43,7 @@ import ( dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2" "github.com/NearNodeFlash/nnf-dm/daemons/user-copy/pkg/driver" userHttp "github.com/NearNodeFlash/nnf-dm/daemons/user-copy/pkg/server" - nnfv1alpha3 "github.com/NearNodeFlash/nnf-sos/api/v1alpha3" + nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4" ) var ( @@ -53,7 +53,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(nnfv1alpha3.AddToScheme(scheme)) + utilruntime.Must(nnfv1alpha4.AddToScheme(scheme)) utilruntime.Must(dwsv1alpha2.AddToScheme(scheme)) } diff --git a/daemons/user-copy/pkg/driver/driver.go b/daemons/user-copy/pkg/driver/driver.go index 941806d7..bd822bde 100644 --- a/daemons/user-copy/pkg/driver/driver.go +++ b/daemons/user-copy/pkg/driver/driver.go @@ -44,7 +44,7 @@ import ( dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2" lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1" "github.com/NearNodeFlash/nnf-dm/internal/controller/helpers" - nnfv1alpha3 "github.com/NearNodeFlash/nnf-sos/api/v1alpha3" + nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4" ) var ( @@ -70,7 +70,7 @@ type Driver struct { // and cancel data movement operations in progress. // These objects are stored in the Driver.contexts map. type SrvrDataMovementRecord struct { - dmreq *nnfv1alpha3.NnfDataMovement + dmreq *nnfv1alpha4.NnfDataMovement cancelContext helpers.DataMovementCancelContext } @@ -81,7 +81,7 @@ type DriverRequest struct { Drvr *Driver // Fresh copy of the chosen NnfDataMovementProfile. - dmProfile *nnfv1alpha3.NnfDataMovementProfile + dmProfile *nnfv1alpha4.NnfDataMovementProfile // Storage nodes to use. nodes []string // Worker nodes to use. @@ -90,7 +90,7 @@ type DriverRequest struct { mpiHostfile string } -func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alpha3.NnfDataMovement, error) { +func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovement, error) { drvr := r.Drvr crLog := drvr.Log.WithValues("workflow", dmreq.WorkflowName) @@ -114,7 +114,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph } crLog = crLog.WithValues("type", computeMountInfo.Type) - var dm *nnfv1alpha3.NnfDataMovement + var dm *nnfv1alpha4.NnfDataMovement switch computeMountInfo.Type { case "lustre": dm, err = r.createNnfDataMovement(ctx, dmreq, computeMountInfo, computeClientMount) @@ -139,7 +139,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph return nil, err } dm.Spec.ProfileReference = corev1.ObjectReference{ - Kind: reflect.TypeOf(nnfv1alpha3.NnfDataMovementProfile{}).Name(), + Kind: reflect.TypeOf(nnfv1alpha4.NnfDataMovementProfile{}).Name(), Name: r.dmProfile.Name, Namespace: r.dmProfile.Namespace, } @@ -154,7 +154,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph // Label the NnfDataMovement with a teardown state of "post_run" so the NNF workflow // controller can identify compute initiated data movements. - nnfv1alpha3.AddDataMovementTeardownStateLabel(dm, dwsv1alpha2.StatePostRun) + nnfv1alpha4.AddDataMovementTeardownStateLabel(dm, dwsv1alpha2.StatePostRun) // Allow the user to override/supplement certain settings setUserConfig(dmreq, dm) @@ -167,7 +167,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph return dm, nil } -func (r *DriverRequest) Drive(ctx context.Context, dmreq DMRequest, dm *nnfv1alpha3.NnfDataMovement) error { +func (r *DriverRequest) Drive(ctx context.Context, dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) error { var err error drvr := r.Drvr @@ -251,7 +251,7 @@ func (r *DriverRequest) ListRequests(ctx context.Context) ([]string, error) { return items, nil } -func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.Context, dm *nnfv1alpha3.NnfDataMovement, crLog logr.Logger) error { +func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.Context, dm *nnfv1alpha4.NnfDataMovement, crLog logr.Logger) error { drvr := r.Drvr // Prepare Destination Directory @@ -273,8 +273,8 @@ func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context. // Record the start of the data movement operation now := metav1.NowMicro() dm.Status.StartTime = &now - dm.Status.State = nnfv1alpha3.DataMovementConditionTypeRunning - cmdStatus := nnfv1alpha3.NnfDataMovementCommandStatus{} + dm.Status.State = nnfv1alpha4.DataMovementConditionTypeRunning + cmdStatus := nnfv1alpha4.NnfDataMovementCommandStatus{} cmdStatus.Command = cmd.String() dm.Status.CommandStatus = &cmdStatus crLog.Info("Running Command", "cmd", cmdStatus.Command) @@ -285,7 +285,7 @@ func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context. return nil } -func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdStatus *nnfv1alpha3.NnfDataMovementCommandStatus, profile *nnfv1alpha3.NnfDataMovementProfile, mpiHostfile string, log logr.Logger) { +func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdStatus *nnfv1alpha4.NnfDataMovementCommandStatus, profile *nnfv1alpha4.NnfDataMovementProfile, mpiHostfile string, log logr.Logger) { // Execute the go routine to perform the data movement go func() { @@ -348,13 +348,13 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt //// Update the CommandStatus in the DM resource after we parsed all the lines //err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - // dm := &nnfv1alpha3.NnfDataMovement{} + // dm := &nnfv1alpha4.NnfDataMovement{} // if err := r.Get(ctx, req.NamespacedName, dm); err != nil { // return client.IgnoreNotFound(err) // } // // if dm.Status.CommandStatus == nil { - // dm.Status.CommandStatus = &nnfv1alpha3.NnfDataMovementCommandStatus{} + // dm.Status.CommandStatus = &nnfv1alpha4.NnfDataMovementCommandStatus{} // } // cmdStatus.DeepCopyInto(dm.Status.CommandStatus) // @@ -395,18 +395,18 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt // Command is finished, update status //now := metav1.NowMicro() //dm.Status.EndTime = &now - //dm.Status.State = nnfv1alpha3.DataMovementConditionTypeFinished - //dm.Status.Status = nnfv1alpha3.DataMovementConditionReasonSuccess + //dm.Status.State = nnfv1alpha4.DataMovementConditionTypeFinished + //dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonSuccess // On cancellation or failure, log the output. On failure, also store the output in the // Status.Message. When successful, check the profile/UserConfig config options to log // and/or store the output. if errors.Is(ctxCancel.Err(), context.Canceled) { log.Error(err, "Data movement operation cancelled", "output", combinedOutBuf.String()) - //dm.Status.Status = nnfv1alpha3.DataMovementConditionReasonCancelled + //dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonCancelled } else if err != nil { log.Error(err, "Data movement operation failed", "output", combinedOutBuf.String()) - //dm.Status.Status = nnfv1alpha3.DataMovementConditionReasonFailed + //dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonFailed //dm.Status.Message = fmt.Sprintf("%s: %s", err.Error(), combinedOutBuf.String()) //resourceErr := dwsv1alpha2.NewResourceError("").WithError(err).WithUserMessage("data movement operation failed: %s", combinedOutBuf.String()).WithFatal() //dm.Status.SetResourceErrorAndLog(resourceErr, log) @@ -430,7 +430,7 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt //status := dm.Status.DeepCopy() //err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - // dm := &nnfv1alpha3.NnfDataMovement{} + // dm := &nnfv1alpha4.NnfDataMovement{} // if err := r.Get(ctx, req.NamespacedName, dm); err != nil { // return client.IgnoreNotFound(err) // } @@ -452,8 +452,8 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt } // Set the DM's UserConfig options based on the incoming requests's options -func setUserConfig(dmreq DMRequest, dm *nnfv1alpha3.NnfDataMovement) { - dm.Spec.UserConfig = &nnfv1alpha3.NnfDataMovementConfig{} +func setUserConfig(dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) { + dm.Spec.UserConfig = &nnfv1alpha4.NnfDataMovementConfig{} dm.Spec.UserConfig.Dryrun = dmreq.Dryrun dm.Spec.UserConfig.MpirunOptions = dmreq.MpirunOptions dm.Spec.UserConfig.DcpOptions = dmreq.DcpOptions @@ -468,7 +468,7 @@ func setUserConfig(dmreq DMRequest, dm *nnfv1alpha3.NnfDataMovement) { } } -func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo, computeClientMount *dwsv1alpha2.ClientMount) (*nnfv1alpha3.NnfDataMovement, error) { +func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo, computeClientMount *dwsv1alpha2.ClientMount) (*nnfv1alpha4.NnfDataMovement, error) { // Find the ClientMount for the rabbit. source, err := r.findRabbitRelativeSource(ctx, dmreq, computeMountInfo) @@ -488,25 +488,25 @@ func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMReque return nil, err } - dm := &nnfv1alpha3.NnfDataMovement{ + dm := &nnfv1alpha4.NnfDataMovement{ ObjectMeta: metav1.ObjectMeta{ // Be careful about how much you put into GenerateName. // The MPI operator will use the resulting name as a // prefix for its own names. GenerateName: nameBase, // Use the data movement namespace. - Namespace: nnfv1alpha3.DataMovementNamespace, + Namespace: nnfv1alpha4.DataMovementNamespace, Labels: map[string]string{ - nnfv1alpha3.DataMovementInitiatorLabel: dmreq.ComputeName, - nnfv1alpha3.DirectiveIndexLabel: dwIndex, + nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName, + nnfv1alpha4.DirectiveIndexLabel: dwIndex, }, }, - Spec: nnfv1alpha3.NnfDataMovementSpec{ - Source: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{ + Spec: nnfv1alpha4.NnfDataMovementSpec{ + Source: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ Path: source, StorageReference: computeMountInfo.Device.DeviceReference.ObjectReference, }, - Destination: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{ + Destination: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ Path: dmreq.DestinationPath, StorageReference: corev1.ObjectReference{ Kind: reflect.TypeOf(*lustrefs).Name(), @@ -527,7 +527,7 @@ func getDirectiveIndexFromClientMount(object *dwsv1alpha2.ClientMount) (string, return "", fmt.Errorf("unable to find labels on compute ClientMount, namespaces=%s, name=%s", object.Namespace, object.Name) } - dwIndex, found := labels[nnfv1alpha3.DirectiveIndexLabel] + dwIndex, found := labels[nnfv1alpha4.DirectiveIndexLabel] if !found { return "", fmt.Errorf("unable to find directive index label on compute ClientMount, namespace=%s name=%s", object.Namespace, object.Name) } @@ -535,7 +535,7 @@ func getDirectiveIndexFromClientMount(object *dwsv1alpha2.ClientMount) (string, return dwIndex, nil } -func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo) (*nnfv1alpha3.NnfDataMovement, error) { +func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo) (*nnfv1alpha4.NnfDataMovement, error) { drvr := r.Drvr // Find the ClientMount for the rabbit. @@ -544,20 +544,20 @@ func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMR return nil, err } - dm := &nnfv1alpha3.NnfDataMovement{ + dm := &nnfv1alpha4.NnfDataMovement{ ObjectMeta: metav1.ObjectMeta{ GenerateName: nodeNameBase, Namespace: drvr.RabbitName, // Use the rabbit Labels: map[string]string{ - nnfv1alpha3.DataMovementInitiatorLabel: dmreq.ComputeName, + nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName, }, }, - Spec: nnfv1alpha3.NnfDataMovementSpec{ - Source: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{ + Spec: nnfv1alpha4.NnfDataMovementSpec{ + Source: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ Path: source, StorageReference: computeMountInfo.Device.DeviceReference.ObjectReference, }, - Destination: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{ + Destination: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{ Path: dmreq.DestinationPath, }, }, @@ -670,7 +670,7 @@ func (r *DriverRequest) findDestinationLustreFilesystem(ctx context.Context, des return nil, fmt.Errorf("unable to find a LustreFileSystem resource matching %s", origDest) } -func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nnfv1alpha3.NnfDataMovementProfile, error) { +func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovementProfile, error) { drvr := r.Drvr profileName := dmreq.DMProfile ns := "nnf-system" @@ -678,7 +678,7 @@ func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nn // If a profile is named then verify that it exists. Otherwise, verify that a default profile // can be found. if len(profileName) == 0 { - NnfDataMovementProfiles := &nnfv1alpha3.NnfDataMovementProfileList{} + NnfDataMovementProfiles := &nnfv1alpha4.NnfDataMovementProfileList{} if err := drvr.Client.List(ctx, NnfDataMovementProfiles, &client.ListOptions{Namespace: ns}); err != nil { return nil, err } @@ -698,7 +698,7 @@ func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nn profileName = profilesFound[0] } - profile := &nnfv1alpha3.NnfDataMovementProfile{ + profile := &nnfv1alpha4.NnfDataMovementProfile{ ObjectMeta: metav1.ObjectMeta{ Name: profileName, Namespace: ns,