Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor components interfaces #386

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions controllers/chyt_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"fmt"
"time"

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
Expand All @@ -17,21 +18,26 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus

chyt := apiproxy.NewChyt(resource, r.Client, r.Recorder, r.Scheme)

cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(chyt.APIProxy().Client()))
ytsaurusApi := apiproxy.NewYtsaurus(ytsaurus, r.Client, r.Recorder, r.Scheme)

component := components.NewChyt(cfgen, chyt, ytsaurus)
cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(chyt.Client()))

component := components.NewChyt(cfgen, chyt, ytsaurusApi)

err := component.Fetch(ctx)
if err != nil {
logger.Error(err, "failed to fetch CHYT status for controller")
return ctrl.Result{Requeue: true}, err
}

if chyt.GetResource().Status.ReleaseStatus == ytv1.ChytReleaseStatusFinished {
if chyt.Resource().Status.ReleaseStatus == ytv1.ChytReleaseStatusFinished {
return ctrl.Result{}, nil
}

status := component.Status(ctx)
status, err := component.Sync(ctx, true)
if err != nil {
return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", component.GetName(), err)
}
if status.SyncStatus == components.SyncStatusBlocked {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}
Expand All @@ -43,12 +49,12 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus
return ctrl.Result{Requeue: true}, err
}

if err := component.Sync(ctx); err != nil {
if _, err := component.Sync(ctx, false); err != nil {
logger.Error(err, "component sync failed", "component", "chyt")
return ctrl.Result{Requeue: true}, err
}

if err := chyt.APIProxy().UpdateStatus(ctx); err != nil {
if err := chyt.UpdateStatus(ctx); err != nil {
logger.Error(err, "update chyt status failed")
return ctrl.Result{Requeue: true}, err
}
Expand Down
37 changes: 19 additions & 18 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ func NewComponentManager(
ytsaurus *apiProxy.Ytsaurus,
) (*ComponentManager, error) {
logger := log.FromContext(ctx)
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client())
clusterDomain := getClusterDomain(ytsaurus.Client())
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
for _, hpSpec := range ytsaurus.Resource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
}
yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0])

var dnds []components.Component
nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.Resource(), clusterDomain)
if len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
for _, dndSpec := range ytsaurus.Resource().Spec.DataNodes {
dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec))
}
}
Expand All @@ -70,31 +70,31 @@ func NewComponentManager(

if len(resource.Spec.RPCProxies) > 0 {
var rps []components.Component
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
for _, rpSpec := range ytsaurus.Resource().Spec.RPCProxies {
rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec))
}
allComponents = append(allComponents, rps...)
}

if len(resource.Spec.TCPProxies) > 0 {
var tps []components.Component
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
for _, tpSpec := range ytsaurus.Resource().Spec.TCPProxies {
tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec))
}
allComponents = append(allComponents, tps...)
}

var ends []components.Component
if len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
for _, endSpec := range ytsaurus.Resource().Spec.ExecNodes {
ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec))
}
}
allComponents = append(allComponents, ends...)

var tnds []components.Component
if len(resource.Spec.TabletNodes) > 0 {
for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes {
for idx, tndSpec := range ytsaurus.Resource().Spec.TabletNodes {
tnds = append(tnds, components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0))
}
}
Expand Down Expand Up @@ -152,22 +152,23 @@ func NewComponentManager(
return nil, err
}

componentStatus, err := c.Status(ctx)
componentStatus, err := c.Sync(ctx, true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is clearer semantically and easier to read Status() method for this read-only action instead less obvious Sync(ctx, true) which is look like we are syncing but dry-running.
I think tradeoff of extra method in each component should be in favour of readability in place of use in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR in intermediate state so there might be rough pieces. I don't like this one too.
Unfortunately there is no defined clear semantics. I'm trying to make one.

if err != nil {
return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err)
}

c.SetReadyCondition(componentStatus)
ytsaurus.SetStatusCondition(components.GetReadyCondition(c, componentStatus))

if !componentStatus.IsRunning() {
status.needInit = true
}

syncStatus := componentStatus.SyncStatus

if syncStatus == components.SyncStatusNeedLocalUpdate {
status.needUpdate = append(status.needUpdate, c)
}

if !components.IsRunningStatus(syncStatus) {
status.needInit = true
}

if syncStatus != components.SyncStatusReady && syncStatus != components.SyncStatusUpdating {
status.allReadyOrUpdating = false
}
Expand Down Expand Up @@ -201,7 +202,7 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) {

hasPending := false
for _, c := range cm.allComponents {
status, err := c.Status(ctx)
status, err := c.Sync(ctx, true)
if err != nil {
return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", c.GetName(), err)
}
Expand All @@ -210,14 +211,14 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) {
status.SyncStatus == components.SyncStatusUpdating {
hasPending = true
logger.Info("component sync", "component", c.GetName())
if err := c.Sync(ctx); err != nil {
if _, err := c.Sync(ctx, false); err != nil {
logger.Error(err, "component sync failed", "component", c.GetName())
return ctrl.Result{Requeue: true}, err
}
}
}

if err := cm.ytsaurus.APIProxy().UpdateStatus(ctx); err != nil {
if err := cm.ytsaurus.UpdateStatus(ctx); err != nil {
logger.Error(err, "update Ytsaurus status failed")
return ctrl.Result{Requeue: true}, err
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/remotedatanodes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *RemoteDataNodesReconciler) Sync(
component := components.NewRemoteDataNodes(
cfgen,
resource,
apiProxy,
apiProxy.APIProxy(),
resource.Spec.DataNodesSpec,
resource.Spec.CommonSpec,
)
Expand All @@ -42,7 +42,7 @@ func (r *RemoteDataNodesReconciler) Sync(
return ctrl.Result{Requeue: true}, err
}

status, err := component.Sync(ctx)
status, err := component.Sync(ctx, false)
if err != nil {
logger.Error(err, "failed to sync remote nodes")
return ctrl.Result{Requeue: true}, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/remoteexecnodes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *RemoteExecNodesReconciler) Sync(
component := components.NewRemoteExecNodes(
cfgen,
resource,
apiProxy,
apiProxy.APIProxy(),
resource.Spec.ExecNodesSpec,
resource.Spec.CommonSpec,
)
Expand All @@ -42,7 +42,7 @@ func (r *RemoteExecNodesReconciler) Sync(
return ctrl.Result{Requeue: true}, err
}

status, err := component.Sync(ctx)
status, err := component.Sync(ctx, false)
if err != nil {
logger.Error(err, "failed to sync remote nodes")
return ctrl.Result{Requeue: true}, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/remotetabletnodes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *RemoteTabletNodesReconciler) Sync(
component := components.NewRemoteTabletNodes(
cfgen,
resource,
apiProxy,
apiProxy.APIProxy(),
resource.Spec.TabletNodesSpec,
resource.Spec.CommonSpec,
)
Expand All @@ -42,7 +42,7 @@ func (r *RemoteTabletNodesReconciler) Sync(
return ctrl.Result{Requeue: true}, err
}

status, err := component.Sync(ctx)
status, err := component.Sync(ctx, false)
if err != nil {
logger.Error(err, "failed to sync remote nodes")
return ctrl.Result{Requeue: true}, err
Expand Down
14 changes: 9 additions & 5 deletions controllers/spyt_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus

spyt := apiproxy.NewSpyt(resource, r.Client, r.Recorder, r.Scheme)

cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(spyt.APIProxy().Client()))
cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(spyt.Client()))

component := components.NewSpyt(cfgen, spyt, ytsaurus)

Expand All @@ -27,11 +27,15 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus
return ctrl.Result{Requeue: true}, err
}

if spyt.GetResource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished {
if spyt.Resource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished {
return ctrl.Result{}, nil
}

componentStatus := component.Status(ctx)
componentStatus, err := component.Sync(ctx, true)
if err != nil {
logger.Error(err, "failed to get SPYT status for controller")
return ctrl.Result{Requeue: true}, err
}

if componentStatus.SyncStatus == components.SyncStatusBlocked {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
Expand All @@ -44,12 +48,12 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus
return ctrl.Result{Requeue: true}, err
}

if err := component.Sync(ctx); err != nil {
if _, err := component.Sync(ctx, false); err != nil {
logger.Error(err, "component sync failed", "component", "spyt")
return ctrl.Result{Requeue: true}, err
}

if err := spyt.APIProxy().UpdateStatus(ctx); err != nil {
if err := spyt.UpdateStatus(ctx); err != nil {
logger.Error(err, "update spyt status failed")
return ctrl.Result{Requeue: true}, err
}
Expand Down
16 changes: 8 additions & 8 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (r *YtsaurusReconciler) handleEverything(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand All @@ -41,7 +41,7 @@ func (r *YtsaurusReconciler) handleEverything(
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -167,7 +167,7 @@ func (r *YtsaurusReconciler) handleStateless(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand Down Expand Up @@ -239,7 +239,7 @@ func (r *YtsaurusReconciler) handleMasterOnly(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand All @@ -259,7 +259,7 @@ func (r *YtsaurusReconciler) handleMasterOnly(
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -315,7 +315,7 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly(
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()
resource := ytsaurus.Resource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
Expand All @@ -335,7 +335,7 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly(
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -543,7 +543,7 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus)
needUpdateNames = append(needUpdateNames, c.GetName())
}
logger = logger.WithValues("componentsForUpdateAll", needUpdateNames)
meta, blockMsg := chooseUpdateFlow(ytsaurus.GetResource().Spec, needUpdate)
meta, blockMsg := chooseUpdateFlow(ytsaurus.Resource().Spec, needUpdate)
if blockMsg != "" {
logger.Info(blockMsg)
return ctrl.Result{Requeue: true}, nil
Expand Down
Loading
Loading