Skip to content

Commit

Permalink
refactor components interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
koct9i committed Nov 6, 2024
1 parent 5d45249 commit dbc03e6
Show file tree
Hide file tree
Showing 45 changed files with 557 additions and 870 deletions.
18 changes: 12 additions & 6 deletions controllers/chyt_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"fmt"
"time"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
Expand All @@ -17,21 +18,26 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus

chyt := apiproxy.NewChyt(resource, r.Client, r.Recorder, r.Scheme)

cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(chyt.APIProxy().Client()))
ytsaurusApi := apiproxy.NewYtsaurus(ytsaurus, r.Client, r.Recorder, r.Scheme)

component := components.NewChyt(cfgen, chyt, ytsaurus)
cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(chyt.Client()))

component := components.NewChyt(cfgen, chyt, ytsaurusApi)

err := component.Fetch(ctx)
if err != nil {
logger.Error(err, "failed to fetch CHYT status for controller")
return ctrl.Result{Requeue: true}, err
}

if chyt.GetResource().Status.ReleaseStatus == ytv1.ChytReleaseStatusFinished {
if chyt.Resource().Status.ReleaseStatus == ytv1.ChytReleaseStatusFinished {
return ctrl.Result{}, nil
}

status := component.Status(ctx)
status, err := component.Sync(ctx, true)
if err != nil {
return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", component.GetName(), err)
}
if status.SyncStatus == components.SyncStatusBlocked {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}
Expand All @@ -43,12 +49,12 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus
return ctrl.Result{Requeue: true}, err
}

if err := component.Sync(ctx); err != nil {
if _, err := component.Sync(ctx, false); err != nil {
logger.Error(err, "component sync failed", "component", "chyt")
return ctrl.Result{Requeue: true}, err
}

if err := chyt.APIProxy().UpdateStatus(ctx); err != nil {
if err := chyt.UpdateStatus(ctx); err != nil {
logger.Error(err, "update chyt status failed")
return ctrl.Result{Requeue: true}, err
}
Expand Down
37 changes: 19 additions & 18 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ func NewComponentManager(
ytsaurus *apiProxy.Ytsaurus,
) (*ComponentManager, error) {
logger := log.FromContext(ctx)
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client())
clusterDomain := getClusterDomain(ytsaurus.Client())
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
for _, hpSpec := range ytsaurus.Resource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
}
yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0])

var dnds []components.Component
nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.Resource(), clusterDomain)
if len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
for _, dndSpec := range ytsaurus.Resource().Spec.DataNodes {
dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec))
}
}
Expand All @@ -70,31 +70,31 @@ func NewComponentManager(

if len(resource.Spec.RPCProxies) > 0 {
var rps []components.Component
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
for _, rpSpec := range ytsaurus.Resource().Spec.RPCProxies {
rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec))
}
allComponents = append(allComponents, rps...)
}

if len(resource.Spec.TCPProxies) > 0 {
var tps []components.Component
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
for _, tpSpec := range ytsaurus.Resource().Spec.TCPProxies {
tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec))
}
allComponents = append(allComponents, tps...)
}

var ends []components.Component
if len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
for _, endSpec := range ytsaurus.Resource().Spec.ExecNodes {
ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec))
}
}
allComponents = append(allComponents, ends...)

var tnds []components.Component
if len(resource.Spec.TabletNodes) > 0 {
for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes {
for idx, tndSpec := range ytsaurus.Resource().Spec.TabletNodes {
tnds = append(tnds, components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0))
}
}
Expand Down Expand Up @@ -152,22 +152,23 @@ func NewComponentManager(
return nil, err
}

componentStatus, err := c.Status(ctx)
componentStatus, err := c.Sync(ctx, true)
if err != nil {
return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err)
}

c.SetReadyCondition(componentStatus)
ytsaurus.SetStatusCondition(components.GetReadyCondition(c, componentStatus))

if !componentStatus.IsRunning() {
status.needInit = true
}

syncStatus := componentStatus.SyncStatus

if syncStatus == components.SyncStatusNeedLocalUpdate {
status.needUpdate = append(status.needUpdate, c)
}

if !components.IsRunningStatus(syncStatus) {
status.needInit = true
}

if syncStatus != components.SyncStatusReady && syncStatus != components.SyncStatusUpdating {
status.allReadyOrUpdating = false
}
Expand Down Expand Up @@ -201,7 +202,7 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) {

hasPending := false
for _, c := range cm.allComponents {
status, err := c.Status(ctx)
status, err := c.Sync(ctx, true)
if err != nil {
return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", c.GetName(), err)
}
Expand All @@ -210,14 +211,14 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) {
status.SyncStatus == components.SyncStatusUpdating {
hasPending = true
logger.Info("component sync", "component", c.GetName())
if err := c.Sync(ctx); err != nil {
if _, err := c.Sync(ctx, false); err != nil {
logger.Error(err, "component sync failed", "component", c.GetName())
return ctrl.Result{Requeue: true}, err
}
}
}

if err := cm.ytsaurus.APIProxy().UpdateStatus(ctx); err != nil {
if err := cm.ytsaurus.UpdateStatus(ctx); err != nil {
logger.Error(err, "update Ytsaurus status failed")
return ctrl.Result{Requeue: true}, err
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/remotedatanodes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *RemoteDataNodesReconciler) Sync(
component := components.NewRemoteDataNodes(
cfgen,
resource,
apiProxy,
apiProxy.APIProxy(),
resource.Spec.DataNodesSpec,
resource.Spec.CommonSpec,
)
Expand All @@ -42,7 +42,7 @@ func (r *RemoteDataNodesReconciler) Sync(
return ctrl.Result{Requeue: true}, err
}

status, err := component.Sync(ctx)
status, err := component.Sync(ctx, false)
if err != nil {
logger.Error(err, "failed to sync remote nodes")
return ctrl.Result{Requeue: true}, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/remoteexecnodes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *RemoteExecNodesReconciler) Sync(
component := components.NewRemoteExecNodes(
cfgen,
resource,
apiProxy,
apiProxy.APIProxy(),
resource.Spec.ExecNodesSpec,
resource.Spec.CommonSpec,
)
Expand All @@ -42,7 +42,7 @@ func (r *RemoteExecNodesReconciler) Sync(
return ctrl.Result{Requeue: true}, err
}

status, err := component.Sync(ctx)
status, err := component.Sync(ctx, false)
if err != nil {
logger.Error(err, "failed to sync remote nodes")
return ctrl.Result{Requeue: true}, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/remotetabletnodes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *RemoteTabletNodesReconciler) Sync(
component := components.NewRemoteTabletNodes(
cfgen,
resource,
apiProxy,
apiProxy.APIProxy(),
resource.Spec.TabletNodesSpec,
resource.Spec.CommonSpec,
)
Expand All @@ -42,7 +42,7 @@ func (r *RemoteTabletNodesReconciler) Sync(
return ctrl.Result{Requeue: true}, err
}

status, err := component.Sync(ctx)
status, err := component.Sync(ctx, false)
if err != nil {
logger.Error(err, "failed to sync remote nodes")
return ctrl.Result{Requeue: true}, err
Expand Down
14 changes: 9 additions & 5 deletions controllers/spyt_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus

spyt := apiproxy.NewSpyt(resource, r.Client, r.Recorder, r.Scheme)

cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(spyt.APIProxy().Client()))
cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(spyt.Client()))

component := components.NewSpyt(cfgen, spyt, ytsaurus)

Expand All @@ -27,11 +27,15 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus
return ctrl.Result{Requeue: true}, err
}

if spyt.GetResource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished {
if spyt.Resource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished {
return ctrl.Result{}, nil
}

componentStatus := component.Status(ctx)
componentStatus, err := component.Sync(ctx, true)
if err != nil {
logger.Error(err, "failed to get SPYT status for controller")
return ctrl.Result{Requeue: true}, err
}

if componentStatus.SyncStatus == components.SyncStatusBlocked {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
Expand All @@ -44,12 +48,12 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus
return ctrl.Result{Requeue: true}, err
}

if err := component.Sync(ctx); err != nil {
if _, err := component.Sync(ctx, false); err != nil {
logger.Error(err, "component sync failed", "component", "spyt")
return ctrl.Result{Requeue: true}, err
}

if err := spyt.APIProxy().UpdateStatus(ctx); err != nil {
if err := spyt.UpdateStatus(ctx); err != nil {
logger.Error(err, "update spyt status failed")
return ctrl.Result{Requeue: true}, err
}
Expand Down
16 changes: 8 additions & 8 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (r *YtsaurusReconciler) handleEverything(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand All @@ -41,7 +41,7 @@ func (r *YtsaurusReconciler) handleEverything(
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -167,7 +167,7 @@ func (r *YtsaurusReconciler) handleStateless(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand Down Expand Up @@ -239,7 +239,7 @@ func (r *YtsaurusReconciler) handleMasterOnly(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand All @@ -259,7 +259,7 @@ func (r *YtsaurusReconciler) handleMasterOnly(
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -315,7 +315,7 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand All @@ -335,7 +335,7 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly(
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -543,7 +543,7 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus)
needUpdateNames = append(needUpdateNames, c.GetName())
}
logger = logger.WithValues("componentsForUpdateAll", needUpdateNames)
meta, blockMsg := chooseUpdateFlow(ytsaurus.GetResource().Spec, needUpdate)
meta, blockMsg := chooseUpdateFlow(ytsaurus.Resource().Spec, needUpdate)
if blockMsg != "" {
logger.Info(blockMsg)
return ctrl.Result{Requeue: true}, nil
Expand Down
Loading

0 comments on commit dbc03e6

Please sign in to comment.