Skip to content

Commit

Permalink
move to nnfv1alpha4
Browse files Browse the repository at this point in the history
Signed-off-by: Dean Roehrich <[email protected]>
  • Loading branch information
roehrich-hpe committed Nov 21, 2024
1 parent f84af9c commit 1c08d29
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
4 changes: 2 additions & 2 deletions daemons/user-copy/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2"
"github.com/NearNodeFlash/nnf-dm/daemons/user-copy/pkg/driver"
userHttp "github.com/NearNodeFlash/nnf-dm/daemons/user-copy/pkg/server"
nnfv1alpha3 "github.com/NearNodeFlash/nnf-sos/api/v1alpha3"
nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4"
)

var (
Expand All @@ -53,7 +53,7 @@ var (

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(nnfv1alpha3.AddToScheme(scheme))
utilruntime.Must(nnfv1alpha4.AddToScheme(scheme))
utilruntime.Must(dwsv1alpha2.AddToScheme(scheme))
}

Expand Down
78 changes: 39 additions & 39 deletions daemons/user-copy/pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2"
lusv1beta1 "github.com/NearNodeFlash/lustre-fs-operator/api/v1beta1"
"github.com/NearNodeFlash/nnf-dm/internal/controller/helpers"
nnfv1alpha3 "github.com/NearNodeFlash/nnf-sos/api/v1alpha3"
nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4"
)

var (
Expand All @@ -70,7 +70,7 @@ type Driver struct {
// and cancel data movement operations in progress.
// These objects are stored in the Driver.contexts map.
type SrvrDataMovementRecord struct {
dmreq *nnfv1alpha3.NnfDataMovement
dmreq *nnfv1alpha4.NnfDataMovement
cancelContext helpers.DataMovementCancelContext
}

Expand All @@ -81,7 +81,7 @@ type DriverRequest struct {
Drvr *Driver

// Fresh copy of the chosen NnfDataMovementProfile.
dmProfile *nnfv1alpha3.NnfDataMovementProfile
dmProfile *nnfv1alpha4.NnfDataMovementProfile
// Storage nodes to use.
nodes []string
// Worker nodes to use.
Expand All @@ -90,7 +90,7 @@ type DriverRequest struct {
mpiHostfile string
}

func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alpha3.NnfDataMovement, error) {
func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovement, error) {

drvr := r.Drvr
crLog := drvr.Log.WithValues("workflow", dmreq.WorkflowName)
Expand All @@ -114,7 +114,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph
}

crLog = crLog.WithValues("type", computeMountInfo.Type)
var dm *nnfv1alpha3.NnfDataMovement
var dm *nnfv1alpha4.NnfDataMovement
switch computeMountInfo.Type {
case "lustre":
dm, err = r.createNnfDataMovement(ctx, dmreq, computeMountInfo, computeClientMount)
Expand All @@ -139,7 +139,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph
return nil, err
}
dm.Spec.ProfileReference = corev1.ObjectReference{
Kind: reflect.TypeOf(nnfv1alpha3.NnfDataMovementProfile{}).Name(),
Kind: reflect.TypeOf(nnfv1alpha4.NnfDataMovementProfile{}).Name(),
Name: r.dmProfile.Name,
Namespace: r.dmProfile.Namespace,
}
Expand All @@ -154,7 +154,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph

// Label the NnfDataMovement with a teardown state of "post_run" so the NNF workflow
// controller can identify compute initiated data movements.
nnfv1alpha3.AddDataMovementTeardownStateLabel(dm, dwsv1alpha2.StatePostRun)
nnfv1alpha4.AddDataMovementTeardownStateLabel(dm, dwsv1alpha2.StatePostRun)

// Allow the user to override/supplement certain settings
setUserConfig(dmreq, dm)
Expand All @@ -167,7 +167,7 @@ func (r *DriverRequest) Create(ctx context.Context, dmreq DMRequest) (*nnfv1alph
return dm, nil
}

func (r *DriverRequest) Drive(ctx context.Context, dmreq DMRequest, dm *nnfv1alpha3.NnfDataMovement) error {
func (r *DriverRequest) Drive(ctx context.Context, dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) error {

var err error
drvr := r.Drvr
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *DriverRequest) ListRequests(ctx context.Context) ([]string, error) {
return items, nil
}

func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.Context, dm *nnfv1alpha3.NnfDataMovement, crLog logr.Logger) error {
func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.Context, dm *nnfv1alpha4.NnfDataMovement, crLog logr.Logger) error {
drvr := r.Drvr

// Prepare Destination Directory
Expand All @@ -273,8 +273,8 @@ func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.
// Record the start of the data movement operation
now := metav1.NowMicro()
dm.Status.StartTime = &now
dm.Status.State = nnfv1alpha3.DataMovementConditionTypeRunning
cmdStatus := nnfv1alpha3.NnfDataMovementCommandStatus{}
dm.Status.State = nnfv1alpha4.DataMovementConditionTypeRunning
cmdStatus := nnfv1alpha4.NnfDataMovementCommandStatus{}
cmdStatus.Command = cmd.String()
dm.Status.CommandStatus = &cmdStatus
crLog.Info("Running Command", "cmd", cmdStatus.Command)
Expand All @@ -285,7 +285,7 @@ func (r *DriverRequest) driveWithContext(ctx context.Context, ctxCancel context.
return nil
}

func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdStatus *nnfv1alpha3.NnfDataMovementCommandStatus, profile *nnfv1alpha3.NnfDataMovementProfile, mpiHostfile string, log logr.Logger) {
func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdStatus *nnfv1alpha4.NnfDataMovementCommandStatus, profile *nnfv1alpha4.NnfDataMovementProfile, mpiHostfile string, log logr.Logger) {

// Execute the go routine to perform the data movement
go func() {
Expand Down Expand Up @@ -348,13 +348,13 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt

//// Update the CommandStatus in the DM resource after we parsed all the lines
//err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
// dm := &nnfv1alpha3.NnfDataMovement{}
// dm := &nnfv1alpha4.NnfDataMovement{}
// if err := r.Get(ctx, req.NamespacedName, dm); err != nil {
// return client.IgnoreNotFound(err)
// }
//
// if dm.Status.CommandStatus == nil {
// dm.Status.CommandStatus = &nnfv1alpha3.NnfDataMovementCommandStatus{}
// dm.Status.CommandStatus = &nnfv1alpha4.NnfDataMovementCommandStatus{}
// }
// cmdStatus.DeepCopyInto(dm.Status.CommandStatus)
//
Expand Down Expand Up @@ -395,18 +395,18 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt
// Command is finished, update status
//now := metav1.NowMicro()
//dm.Status.EndTime = &now
//dm.Status.State = nnfv1alpha3.DataMovementConditionTypeFinished
//dm.Status.Status = nnfv1alpha3.DataMovementConditionReasonSuccess
//dm.Status.State = nnfv1alpha4.DataMovementConditionTypeFinished
//dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonSuccess

// On cancellation or failure, log the output. On failure, also store the output in the
// Status.Message. When successful, check the profile/UserConfig config options to log
// and/or store the output.
if errors.Is(ctxCancel.Err(), context.Canceled) {
log.Error(err, "Data movement operation cancelled", "output", combinedOutBuf.String())
//dm.Status.Status = nnfv1alpha3.DataMovementConditionReasonCancelled
//dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonCancelled
} else if err != nil {
log.Error(err, "Data movement operation failed", "output", combinedOutBuf.String())
//dm.Status.Status = nnfv1alpha3.DataMovementConditionReasonFailed
//dm.Status.Status = nnfv1alpha4.DataMovementConditionReasonFailed
//dm.Status.Message = fmt.Sprintf("%s: %s", err.Error(), combinedOutBuf.String())
//resourceErr := dwsv1alpha2.NewResourceError("").WithError(err).WithUserMessage("data movement operation failed: %s", combinedOutBuf.String()).WithFatal()
//dm.Status.SetResourceErrorAndLog(resourceErr, log)
Expand All @@ -430,7 +430,7 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt
//status := dm.Status.DeepCopy()

//err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
// dm := &nnfv1alpha3.NnfDataMovement{}
// dm := &nnfv1alpha4.NnfDataMovement{}
// if err := r.Get(ctx, req.NamespacedName, dm); err != nil {
// return client.IgnoreNotFound(err)
// }
Expand All @@ -452,8 +452,8 @@ func runit(ctxCancel context.Context, contextDelete func(), cmd *exec.Cmd, cmdSt
}

// Set the DM's UserConfig options based on the incoming request's options
func setUserConfig(dmreq DMRequest, dm *nnfv1alpha3.NnfDataMovement) {
dm.Spec.UserConfig = &nnfv1alpha3.NnfDataMovementConfig{}
func setUserConfig(dmreq DMRequest, dm *nnfv1alpha4.NnfDataMovement) {
dm.Spec.UserConfig = &nnfv1alpha4.NnfDataMovementConfig{}
dm.Spec.UserConfig.Dryrun = dmreq.Dryrun
dm.Spec.UserConfig.MpirunOptions = dmreq.MpirunOptions
dm.Spec.UserConfig.DcpOptions = dmreq.DcpOptions
Expand All @@ -468,7 +468,7 @@ func setUserConfig(dmreq DMRequest, dm *nnfv1alpha3.NnfDataMovement) {
}
}

func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo, computeClientMount *dwsv1alpha2.ClientMount) (*nnfv1alpha3.NnfDataMovement, error) {
func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo, computeClientMount *dwsv1alpha2.ClientMount) (*nnfv1alpha4.NnfDataMovement, error) {

// Find the ClientMount for the rabbit.
source, err := r.findRabbitRelativeSource(ctx, dmreq, computeMountInfo)
Expand All @@ -488,25 +488,25 @@ func (r *DriverRequest) createNnfDataMovement(ctx context.Context, dmreq DMReque
return nil, err
}

dm := &nnfv1alpha3.NnfDataMovement{
dm := &nnfv1alpha4.NnfDataMovement{
ObjectMeta: metav1.ObjectMeta{
// Be careful about how much you put into GenerateName.
// The MPI operator will use the resulting name as a
// prefix for its own names.
GenerateName: nameBase,
// Use the data movement namespace.
Namespace: nnfv1alpha3.DataMovementNamespace,
Namespace: nnfv1alpha4.DataMovementNamespace,
Labels: map[string]string{
nnfv1alpha3.DataMovementInitiatorLabel: dmreq.ComputeName,
nnfv1alpha3.DirectiveIndexLabel: dwIndex,
nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName,
nnfv1alpha4.DirectiveIndexLabel: dwIndex,
},
},
Spec: nnfv1alpha3.NnfDataMovementSpec{
Source: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{
Spec: nnfv1alpha4.NnfDataMovementSpec{
Source: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{
Path: source,
StorageReference: computeMountInfo.Device.DeviceReference.ObjectReference,
},
Destination: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{
Destination: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{
Path: dmreq.DestinationPath,
StorageReference: corev1.ObjectReference{
Kind: reflect.TypeOf(*lustrefs).Name(),
Expand All @@ -527,15 +527,15 @@ func getDirectiveIndexFromClientMount(object *dwsv1alpha2.ClientMount) (string,
return "", fmt.Errorf("unable to find labels on compute ClientMount, namespaces=%s, name=%s", object.Namespace, object.Name)
}

dwIndex, found := labels[nnfv1alpha3.DirectiveIndexLabel]
dwIndex, found := labels[nnfv1alpha4.DirectiveIndexLabel]
if !found {
return "", fmt.Errorf("unable to find directive index label on compute ClientMount, namespace=%s name=%s", object.Namespace, object.Name)
}

return dwIndex, nil
}

func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo) (*nnfv1alpha3.NnfDataMovement, error) {
func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMRequest, computeMountInfo *dwsv1alpha2.ClientMountInfo) (*nnfv1alpha4.NnfDataMovement, error) {
drvr := r.Drvr

// Find the ClientMount for the rabbit.
Expand All @@ -544,20 +544,20 @@ func (r *DriverRequest) createNnfNodeDataMovement(ctx context.Context, dmreq DMR
return nil, err
}

dm := &nnfv1alpha3.NnfDataMovement{
dm := &nnfv1alpha4.NnfDataMovement{
ObjectMeta: metav1.ObjectMeta{
GenerateName: nodeNameBase,
Namespace: drvr.RabbitName, // Use the rabbit
Labels: map[string]string{
nnfv1alpha3.DataMovementInitiatorLabel: dmreq.ComputeName,
nnfv1alpha4.DataMovementInitiatorLabel: dmreq.ComputeName,
},
},
Spec: nnfv1alpha3.NnfDataMovementSpec{
Source: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{
Spec: nnfv1alpha4.NnfDataMovementSpec{
Source: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{
Path: source,
StorageReference: computeMountInfo.Device.DeviceReference.ObjectReference,
},
Destination: &nnfv1alpha3.NnfDataMovementSpecSourceDestination{
Destination: &nnfv1alpha4.NnfDataMovementSpecSourceDestination{
Path: dmreq.DestinationPath,
},
},
Expand Down Expand Up @@ -670,15 +670,15 @@ func (r *DriverRequest) findDestinationLustreFilesystem(ctx context.Context, des
return nil, fmt.Errorf("unable to find a LustreFileSystem resource matching %s", origDest)
}

func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nnfv1alpha3.NnfDataMovementProfile, error) {
func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nnfv1alpha4.NnfDataMovementProfile, error) {
drvr := r.Drvr
profileName := dmreq.DMProfile
ns := "nnf-system"

// If a profile is named then verify that it exists. Otherwise, verify that a default profile
// can be found.
if len(profileName) == 0 {
NnfDataMovementProfiles := &nnfv1alpha3.NnfDataMovementProfileList{}
NnfDataMovementProfiles := &nnfv1alpha4.NnfDataMovementProfileList{}
if err := drvr.Client.List(ctx, NnfDataMovementProfiles, &client.ListOptions{Namespace: ns}); err != nil {
return nil, err
}
Expand All @@ -698,7 +698,7 @@ func (r *DriverRequest) selectProfile(ctx context.Context, dmreq DMRequest) (*nn
profileName = profilesFound[0]
}

profile := &nnfv1alpha3.NnfDataMovementProfile{
profile := &nnfv1alpha4.NnfDataMovementProfile{
ObjectMeta: metav1.ObjectMeta{
Name: profileName,
Namespace: ns,
Expand Down

0 comments on commit 1c08d29

Please sign in to comment.