Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Sep 4, 2024
1 parent 4726c33 commit 2afd06e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 132 deletions.
200 changes: 104 additions & 96 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var crdDefines = map[string]struct{}{
v1alpha1.MetricKind: {},
}

func runE(ctx context.Context, flags *flagpole) error {
func runE(ctx context.Context, flags *flagpole) (err error) {
logger := log.FromContext(ctx)

if flags.Kubeconfig != "" {
Expand All @@ -137,14 +137,47 @@ func runE(ctx context.Context, flags *flagpole) error {
}
}

manages := flags.Options.Manages
var nodeSel internalversion.ManageNodeSelector
if len(manages) != 0 {
nodeSel, err = manages.NodeSelector()
if err != nil {
return err
}
} else {
switch {
case flags.Options.ManageSingleNode != "":
logger.Info("Watch single node",
"node", flags.Options.ManageSingleNode,
)
nodeSel.ManageSingleNode = flags.Options.ManageSingleNode
case flags.Options.ManageAllNodes:
logger.Info("Watch all nodes")
nodeSel.ManageAllNodes = true
case flags.Options.ManageNodesWithAnnotationSelector != "" || flags.Options.ManageNodesWithLabelSelector != "":
logger.Info("Watch nodes",
"annotation", flags.Options.ManageNodesWithAnnotationSelector,
"label", flags.Options.ManageNodesWithLabelSelector,
)
nodeSel.ManageNodesWithLabelSelector = flags.Options.ManageNodesWithLabelSelector
nodeSel.ManageNodesWithAnnotationSelector = flags.Options.ManageNodesWithAnnotationSelector
}
}

if nodeSel.IsEmpty() {
flags.Options.EnableCRDs = slices.Filter(flags.Options.EnableCRDs, func(s string) bool {
return s == v1alpha1.MetricKind || s == v1alpha1.StageKind
})
}

for _, crd := range flags.Options.EnableCRDs {
if _, ok := crdDefines[crd]; !ok {
return fmt.Errorf("invalid crd: %s", crd)
}
}

stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
err := checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.StageKind, stagesData)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.StageKind, stagesData)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,33 +253,6 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

manages := flags.Options.Manages
var nodeSel internalversion.ManageNodeSelector
if len(manages) != 0 {
nodeSel, err = manages.NodeSelector()
if err != nil {
return err
}
} else {
switch {
case flags.Options.ManageSingleNode != "":
logger.Info("Watch single node",
"node", flags.Options.ManageSingleNode,
)
nodeSel.ManageSingleNode = flags.Options.ManageSingleNode
case flags.Options.ManageAllNodes:
logger.Info("Watch all nodes")
nodeSel.ManageAllNodes = true
case flags.Options.ManageNodesWithAnnotationSelector != "" || flags.Options.ManageNodesWithLabelSelector != "":
logger.Info("Watch nodes",
"annotation", flags.Options.ManageNodesWithAnnotationSelector,
"label", flags.Options.ManageNodesWithLabelSelector,
)
nodeSel.ManageNodesWithLabelSelector = flags.Options.ManageNodesWithLabelSelector
nodeSel.ManageNodesWithAnnotationSelector = flags.Options.ManageNodesWithAnnotationSelector
}
}

id, err := controllers.Identity()
if err != nil {
return err
Expand Down Expand Up @@ -312,95 +318,97 @@ func startServer(ctx context.Context, flags *flagpole, ctr *controllers.Controll
}

if serverAddress != "" {
mangeNode := !nodeSelector.IsEmpty()
conf := server.Config{
TypedKwokClient: typedKwokClient,
NoManageNode: nodeSelector.IsEmpty(),
EnableCRDs: flags.Options.EnableCRDs,
DataSource: ctr,
NodeCacheGetter: ctr.GetNodeCache(),
PodCacheGetter: ctr.GetPodCache(),
clusterPortForwards := config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, clusterPortForwards)
if err != nil {
return err
}

if mangeNode {
conf.ClusterPortForwards = config.FilterWithTypeFromContext[*internalversion.ClusterPortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterPortForwardKind, conf.ClusterPortForwards)
if err != nil {
return err
}

conf.PortForwards = config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, conf.PortForwards)
if err != nil {
return err
}
portForwards := config.FilterWithTypeFromContext[*internalversion.PortForward](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.PortForwardKind, portForwards)
if err != nil {
return err
}

conf.ClusterExecs = config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, conf.ClusterExecs)
if err != nil {
return err
}
clusterExecs := config.FilterWithTypeFromContext[*internalversion.ClusterExec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterExecKind, clusterExecs)
if err != nil {
return err
}

conf.Execs = config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, conf.Execs)
if err != nil {
return err
}
execs := config.FilterWithTypeFromContext[*internalversion.Exec](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ExecKind, execs)
if err != nil {
return err
}

conf.ClusterLogs = config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, conf.ClusterLogs)
if err != nil {
return err
}
clusterLogs := config.FilterWithTypeFromContext[*internalversion.ClusterLogs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterLogsKind, clusterLogs)
if err != nil {
return err
}

conf.Logs = config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, conf.Logs)
if err != nil {
return err
}
logs := config.FilterWithTypeFromContext[*internalversion.Logs](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.LogsKind, logs)
if err != nil {
return err
}

conf.ClusterAttaches = config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, conf.ClusterAttaches)
if err != nil {
return err
}
clusterAttaches := config.FilterWithTypeFromContext[*internalversion.ClusterAttach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterAttachKind, clusterAttaches)
if err != nil {
return err
}

conf.Attaches = config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, conf.Attaches)
if err != nil {
return err
}
attaches := config.FilterWithTypeFromContext[*internalversion.Attach](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.AttachKind, attaches)
if err != nil {
return err
}

conf.ClusterResourceUsages = config.FilterWithTypeFromContext[*internalversion.ClusterResourceUsage](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterResourceUsageKind, conf.ClusterResourceUsages)
if err != nil {
return err
}
clusterResourceUsages := config.FilterWithTypeFromContext[*internalversion.ClusterResourceUsage](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ClusterResourceUsageKind, clusterResourceUsages)
if err != nil {
return err
}

conf.ResourceUsages = config.FilterWithTypeFromContext[*internalversion.ResourceUsage](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ResourceUsageKind, conf.ResourceUsages)
if err != nil {
return err
}
resourceUsages := config.FilterWithTypeFromContext[*internalversion.ResourceUsage](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.ResourceUsageKind, resourceUsages)
if err != nil {
return err
}

conf.Metrics = config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, conf.Metrics)
metrics := config.FilterWithTypeFromContext[*internalversion.Metric](ctx)
err = checkConfigOrCRD(flags.Options.EnableCRDs, v1alpha1.MetricKind, metrics)
if err != nil {
return err
}

conf := server.Config{
TypedKwokClient: typedKwokClient,
EnableCRDs: flags.Options.EnableCRDs,
ClusterPortForwards: clusterPortForwards,
PortForwards: portForwards,
ClusterExecs: clusterExecs,
Execs: execs,
ClusterLogs: clusterLogs,
Logs: logs,
ClusterAttaches: clusterAttaches,
Attaches: attaches,
ClusterResourceUsages: clusterResourceUsages,
ResourceUsages: resourceUsages,
Metrics: metrics,
DataSource: ctr,
NodeCacheGetter: ctr.GetNodeCache(),
PodCacheGetter: ctr.GetPodCache(),
}
svc, err := server.NewServer(conf)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
svc.InstallHealthz()

if mangeNode {
svc.InstallServiceDiscovery()
}

if mangeNode && flags.Options.EnableDebuggingHandlers {
if flags.Options.EnableDebuggingHandlers {
svc.InstallDebuggingHandlers()
svc.InstallProfilingHandler(flags.Options.EnableProfilingHandler, flags.Options.EnableContentionProfiling)
} else {
Expand Down
36 changes: 0 additions & 36 deletions pkg/kwok/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type Server struct {

enableCRDs []string

noManageNode bool

restfulCont *restful.Container

idleTimeout time.Duration
Expand Down Expand Up @@ -99,8 +97,6 @@ type Config struct {
TypedKwokClient versioned.Interface
EnableCRDs []string

NoManageNode bool

ClusterPortForwards []*internalversion.ClusterPortForward
PortForwards []*internalversion.PortForward
ClusterExecs []*internalversion.ClusterExec
Expand Down Expand Up @@ -129,8 +125,6 @@ func NewServer(conf Config) (*Server, error) {
idleTimeout: 1 * time.Hour,
streamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout,

noManageNode: conf.NoManageNode,

clusterPortForwards: resources.NewStaticGetter(conf.ClusterPortForwards),
portForwards: resources.NewStaticGetter(conf.PortForwards),
clusterExecs: resources.NewStaticGetter(conf.ClusterExecs),
Expand Down Expand Up @@ -167,9 +161,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
for _, crd := range s.enableCRDs {
switch crd {
case v1alpha1.ClusterPortForwardKind:
if s.noManageNode {
continue
}
if len(s.clusterPortForwards.Get()) != 0 {
return nil, fmt.Errorf("cluster port forwards already exists, cannot watch CRD")
}
Expand All @@ -193,9 +184,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterPortForwards)
s.clusterPortForwards = clusterPortForwards
case v1alpha1.PortForwardKind:
if s.noManageNode {
continue
}
if len(s.portForwards.Get()) != 0 {
return nil, fmt.Errorf("port forwards already exists, cannot watch CRD")
}
Expand All @@ -219,9 +207,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, portForwards)
s.portForwards = portForwards
case v1alpha1.ClusterExecKind:
if s.noManageNode {
continue
}
if len(s.clusterExecs.Get()) != 0 {
return nil, fmt.Errorf("cluster execs already exists, cannot watch CRD")
}
Expand All @@ -245,9 +230,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterExecs)
s.clusterExecs = clusterExecs
case v1alpha1.ExecKind:
if s.noManageNode {
continue
}
if len(s.execs.Get()) != 0 {
return nil, fmt.Errorf("execs already exists, cannot watch CRD")
}
Expand All @@ -271,9 +253,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, execs)
s.execs = execs
case v1alpha1.ClusterLogsKind:
if s.noManageNode {
continue
}
if len(s.clusterLogs.Get()) != 0 {
return nil, fmt.Errorf("cluster logs already exists, cannot watch CRD")
}
Expand All @@ -297,9 +276,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterLogs)
s.clusterLogs = clusterLogs
case v1alpha1.LogsKind:
if s.noManageNode {
continue
}
if len(s.logs.Get()) != 0 {
return nil, fmt.Errorf("logs already exists, cannot watch CRD")
}
Expand All @@ -323,9 +299,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, logs)
s.logs = logs
case v1alpha1.ClusterAttachKind:
if s.noManageNode {
continue
}
if len(s.clusterAttaches.Get()) != 0 {
return nil, fmt.Errorf("cluster attaches already exists, cannot watch CRD")
}
Expand All @@ -349,9 +322,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterAttaches)
s.clusterAttaches = clusterAttaches
case v1alpha1.AttachKind:
if s.noManageNode {
continue
}
if len(s.attaches.Get()) != 0 {
return nil, fmt.Errorf("attaches already exists, cannot watch CRD")
}
Expand All @@ -375,9 +345,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, attaches)
s.attaches = attaches
case v1alpha1.ClusterResourceUsageKind:
if s.noManageNode {
continue
}
if len(s.clusterResourceUsages.Get()) != 0 {
return nil, fmt.Errorf("cluster resource usage already exists, cannot watch CRD")
}
Expand All @@ -401,9 +368,6 @@ func (s *Server) initWatchCRD(ctx context.Context) ([]resources.Starter, error)
starters = append(starters, clusterResourceUsages)
s.clusterResourceUsages = clusterResourceUsages
case v1alpha1.ResourceUsageKind:
if s.noManageNode {
continue
}
if len(s.resourceUsages.Get()) != 0 {
return nil, fmt.Errorf("resource usage already exists, cannot watch CRD")
}
Expand Down

0 comments on commit 2afd06e

Please sign in to comment.