-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce a copy-offload daemon in a user container (#238)
This daemon is an http server that accepts copy-offload requests and executes them directly. The daemon is in a container, on the rabbit, and was placed there by a "#DW container" directive. This commit includes a daemon that can be built locally and run outside a container, or it can be built in a container image. The necessary RBAC bits for config/ are coming in the next step. Signed-off-by: Dean Roehrich <[email protected]>
- Loading branch information
1 parent
86bb0a1
commit ab9ffbe
Showing
12 changed files
with
1,570 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright 2024 Hewlett Packard Enterprise Development LP | ||
* Other additional copyright holders may be indicated within. | ||
* | ||
* The entirety of this work is licensed under the Apache License, | ||
* Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. | ||
* | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"flag" | ||
"fmt" | ||
"log/slog" | ||
"net/http" | ||
"os" | ||
|
||
"github.com/go-logr/logr" | ||
"go.uber.org/zap" | ||
"go.uber.org/zap/zapcore" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/types" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
clientgoscheme "k8s.io/client-go/kubernetes/scheme" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
zapcr "sigs.k8s.io/controller-runtime/pkg/log/zap" | ||
|
||
dwsv1alpha2 "github.com/DataWorkflowServices/dws/api/v1alpha2" | ||
"github.com/NearNodeFlash/nnf-dm/daemons/copy-offload/pkg/driver" | ||
userHttp "github.com/NearNodeFlash/nnf-dm/daemons/copy-offload/pkg/server" | ||
nnfv1alpha4 "github.com/NearNodeFlash/nnf-sos/api/v1alpha4" | ||
) | ||
|
||
var ( | ||
scheme = runtime.NewScheme() | ||
rabbitName string | ||
) | ||
|
||
func init() { | ||
utilruntime.Must(clientgoscheme.AddToScheme(scheme)) | ||
utilruntime.Must(nnfv1alpha4.AddToScheme(scheme)) | ||
utilruntime.Must(dwsv1alpha2.AddToScheme(scheme)) | ||
} | ||
|
||
func setupLog() logr.Logger { | ||
encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) | ||
zaplogger := zapcr.New(zapcr.Encoder(encoder), zapcr.UseDevMode(true)) | ||
ctrl.SetLogger(zaplogger) | ||
|
||
// controllerruntime logger. | ||
crLog := ctrl.Log.WithName("copy-offload") | ||
return crLog | ||
} | ||
|
||
func setupClient(crLog logr.Logger) client.Client { | ||
config := ctrl.GetConfigOrDie() | ||
|
||
clnt, err := client.New(config, client.Options{Scheme: scheme}) | ||
if err != nil { | ||
crLog.Error(err, "Unable to create client") | ||
os.Exit(1) | ||
} | ||
return clnt | ||
} | ||
|
||
func clientSanity(crLog logr.Logger, clnt client.Client, rabbitName string) { | ||
// Sanity check the client connection. | ||
nnfNode := &nnfv1alpha4.NnfNode{} | ||
if err := clnt.Get(context.TODO(), types.NamespacedName{Name: "nnf-nlc", Namespace: rabbitName}, nnfNode); err != nil { | ||
crLog.Error(err, "Failed to retrieve my own NnfNode") | ||
os.Exit(1) | ||
} | ||
} | ||
|
||
func main() { | ||
port := "8080" | ||
mock := false | ||
|
||
flag.StringVar(&port, "port", port, "Port for server.") | ||
flag.BoolVar(&mock, "mock", mock, "Mock mode for tests; does not use k8s.") | ||
flag.Parse() | ||
|
||
rabbitName = os.Getenv("NNF_NODE_NAME") | ||
if rabbitName == "" { | ||
fmt.Println("Did not find NNF_NODE_NAME") | ||
os.Exit(1) | ||
} | ||
|
||
crLog := setupLog() | ||
// Make one of these for this server, and use it in all requests. | ||
drvr := &driver.Driver{Log: crLog, RabbitName: rabbitName, Mock: mock} | ||
if !mock { | ||
clnt := setupClient(crLog) | ||
clientSanity(crLog, clnt, rabbitName) | ||
drvr.Client = clnt | ||
} | ||
slog.Info("Ready", "node", rabbitName, "port", port, "mock", mock) | ||
|
||
httpHandler := &userHttp.UserHttp{Log: crLog, Drvr: drvr, Mock: mock} | ||
|
||
http.HandleFunc("/hello", httpHandler.Hello) | ||
http.HandleFunc("/trial", httpHandler.TrialRequest) | ||
http.HandleFunc("/cancel/", httpHandler.CancelRequest) | ||
http.HandleFunc("/list", httpHandler.ListRequests) | ||
|
||
err := http.ListenAndServe(fmt.Sprintf(":%s", port), nil) | ||
if errors.Is(err, http.ErrServerClosed) { | ||
slog.Info("the server is closed") | ||
} else if err != nil { | ||
slog.Error("unable to start server", "err", err.Error()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright 2024 Hewlett Packard Enterprise Development LP | ||
* Other additional copyright holders may be indicated within. | ||
* | ||
* The entirety of this work is licensed under the Apache License, | ||
* Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. | ||
* | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package driver | ||
|
||
import ( | ||
"fmt" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
) | ||
|
||
// DMRequest represents the content of one http request. This has a | ||
// one-to-one relationship with a DriverRequest object. | ||
type DMRequest struct { | ||
ComputeName string `json:"computeName"` | ||
|
||
// The name and namespace of the initiating workflow | ||
WorkflowName string `json:"workflowName"` | ||
WorkflowNamespace string `json:"workflowNamespace"` | ||
// Source file or directory | ||
SourcePath string `json:"sourcePath"` | ||
// Destination file or directory | ||
DestinationPath string `json:"destinationPath"` | ||
// If True, the data movement command runs `/bin/true` rather than perform actual data movement | ||
Dryrun bool `json:"dryrun"` | ||
// Extra options to pass to `dcp` if present in the Data Movement command. | ||
DcpOptions string `json:"dcpOptions"` | ||
// If true, enable server-side logging of stdout when the command is successful. Failure output is always logged. | ||
LogStdout bool `json:"logStdout"` | ||
// If true, store stdout in DataMovementStatusResponse.Message when the command is successful. Failure output is always contained in the message. | ||
StoreStdout bool `json:"storeStdout"` | ||
// The number of slots specified in the MPI hostfile. A value of 0 disables the use of slots in | ||
// the hostfile. -1 will defer to the server side configuration. | ||
Slots int32 `json:"slots"` | ||
// The number of max_slots specified in the MPI hostfile. A value of 0 disables the use of | ||
// max_slots in the hostfile. -1 will defer to the server side configuration. | ||
MaxSlots int32 `json:"maxSlots"` | ||
// The name of the data movement configuration profile to use. The above parameters (e.g. slots, | ||
// logStdout) will override the settings defined by the profile. This profile must exist on the | ||
// server otherwise the data movement operation will be invalid. Empty will default to the | ||
// default profile. | ||
DMProfile string `json:"dmProfile"` | ||
// Extra options to pass to `mpirun` if present in the Data Movement command. | ||
MpirunOptions string `json:"mpirunOptions"` | ||
} | ||
|
||
func (m *DMRequest) Validator() error { | ||
|
||
if m.ComputeName == "" { | ||
return fmt.Errorf("compute name must be supplied") | ||
} | ||
if m.WorkflowName == "" { | ||
return fmt.Errorf("workflow name must be supplied") | ||
} | ||
if m.SourcePath == "" { | ||
return fmt.Errorf("source path must be supplied") | ||
} | ||
if m.DestinationPath == "" { | ||
return fmt.Errorf("destination path must be supplied") | ||
} | ||
if m.WorkflowNamespace == "" { | ||
m.WorkflowNamespace = corev1.NamespaceDefault | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.