From dbc03e62ef5f42db3f8cc34cb94df9ceef00fc5d Mon Sep 17 00:00:00 2001 From: Konstantin Khlebnikov Date: Tue, 5 Nov 2024 18:51:02 +0100 Subject: [PATCH] refactor components interfaces --- controllers/chyt_sync.go | 18 ++- controllers/component_manager.go | 37 +++--- controllers/remotedatanodes_sync.go | 4 +- controllers/remoteexecnodes_sync.go | 4 +- controllers/remotetabletnodes_sync.go | 4 +- controllers/spyt_sync.go | 14 +- controllers/sync.go | 16 +-- pkg/apiproxy/chyt.go | 31 ++--- pkg/apiproxy/proxy.go | 77 +++++++---- pkg/apiproxy/spyt.go | 24 ++-- pkg/apiproxy/ytsaurus.go | 64 +++++----- pkg/components/chyt.go | 64 ++++------ pkg/components/component.go | 162 +++++++++++++++++------- pkg/components/controller_agent.go | 32 +---- pkg/components/data_node.go | 32 +---- pkg/components/data_node_remote.go | 33 ++--- pkg/components/discovery.go | 24 +--- pkg/components/exec_node.go | 49 +++---- pkg/components/exec_node_remote.go | 24 ++-- pkg/components/helpers.go | 25 ++-- pkg/components/httpproxy.go | 32 +---- pkg/components/init_job.go | 10 +- pkg/components/init_job_test.go | 4 +- pkg/components/master.go | 36 ++---- pkg/components/master_caches.go | 28 +--- pkg/components/microservice.go | 2 +- pkg/components/pods_manager.go | 37 ++---- pkg/components/query_tracker.go | 32 +---- pkg/components/queue_agent.go | 48 ++----- pkg/components/rpcproxy.go | 32 +---- pkg/components/scheduler.go | 50 ++------ pkg/components/server.go | 5 - pkg/components/spyt.go | 33 ++--- pkg/components/strawberry_controller.go | 48 ++----- pkg/components/suite_test.go | 24 ++-- pkg/components/tablet_node.go | 34 +---- pkg/components/tablet_node_remote.go | 29 ++--- pkg/components/tablet_node_test.go | 40 +++--- pkg/components/tcpproxy.go | 32 +---- pkg/components/ui.go | 34 ++--- pkg/components/yql_agent.go | 40 ++---- pkg/components/ytsaurus_client.go | 47 +++---- pkg/labeller/labeller.go | 6 +- pkg/resources/deployment.go | 4 +- pkg/resources/job.go | 2 + 45 files changed, 557 insertions(+), 870 deletions(-) diff --git a/controllers/chyt_sync.go b/controllers/chyt_sync.go index a00b9bd1..5ff66ece 100644 --- a/controllers/chyt_sync.go +++ b/controllers/chyt_sync.go @@ -2,6 +2,7 @@ package controllers import ( "context" + "fmt" "time" ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1" @@ -17,9 +18,11 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus chyt := apiproxy.NewChyt(resource, r.Client, r.Recorder, r.Scheme) - cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(chyt.APIProxy().Client())) + ytsaurusApi := apiproxy.NewYtsaurus(ytsaurus, r.Client, r.Recorder, r.Scheme) - component := components.NewChyt(cfgen, chyt, ytsaurus) + cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(chyt.Client())) + + component := components.NewChyt(cfgen, chyt, ytsaurusApi) err := component.Fetch(ctx) if err != nil { @@ -27,11 +30,14 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus return ctrl.Result{Requeue: true}, err } - if chyt.GetResource().Status.ReleaseStatus == ytv1.ChytReleaseStatusFinished { + if chyt.Resource().Status.ReleaseStatus == ytv1.ChytReleaseStatusFinished { return ctrl.Result{}, nil } - status := component.Status(ctx) + status, err := component.Sync(ctx, true) + if err != nil { + return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", component.GetName(), err) + } if status.SyncStatus == components.SyncStatusBlocked { return ctrl.Result{RequeueAfter: time.Second * 10}, nil } @@ -43,12 +49,12 @@ func (r *ChytReconciler) Sync(ctx context.Context, resource *ytv1.Chyt, ytsaurus return ctrl.Result{Requeue: true}, err } - if err := component.Sync(ctx); err != nil { + if _, err := component.Sync(ctx, false); err != nil { logger.Error(err, "component sync failed", "component", "chyt") return ctrl.Result{Requeue: true}, err } - if err := chyt.APIProxy().UpdateStatus(ctx); err != nil { + if err := chyt.UpdateStatus(ctx); err != nil { logger.Error(err, "update chyt status failed") return ctrl.Result{Requeue: true}, err } diff --git a/controllers/component_manager.go b/controllers/component_manager.go index 61ef993e..f5a912c0 100644 --- a/controllers/component_manager.go +++ b/controllers/component_manager.go @@ -34,23 +34,23 @@ func NewComponentManager( ytsaurus *apiProxy.Ytsaurus, ) (*ComponentManager, error) { logger := log.FromContext(ctx) - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() - clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client()) + clusterDomain := getClusterDomain(ytsaurus.Client()) cfgen := ytconfig.NewGenerator(resource, clusterDomain) d := components.NewDiscovery(cfgen, ytsaurus) m := components.NewMaster(cfgen, ytsaurus) var hps []components.Component - for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies { + for _, hpSpec := range ytsaurus.Resource().Spec.HTTPProxies { hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec)) } yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0]) var dnds []components.Component - nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain) + nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.Resource(), clusterDomain) if len(resource.Spec.DataNodes) > 0 { - for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes { + for _, dndSpec := range ytsaurus.Resource().Spec.DataNodes { dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec)) } } @@ -70,7 +70,7 @@ func NewComponentManager( if len(resource.Spec.RPCProxies) > 0 { var rps []components.Component - for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies { + for _, rpSpec := range ytsaurus.Resource().Spec.RPCProxies { rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec)) } allComponents = append(allComponents, rps...) @@ -78,7 +78,7 @@ func NewComponentManager( if len(resource.Spec.TCPProxies) > 0 { var tps []components.Component - for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies { + for _, tpSpec := range ytsaurus.Resource().Spec.TCPProxies { tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec)) } allComponents = append(allComponents, tps...) @@ -86,7 +86,7 @@ func NewComponentManager( var ends []components.Component if len(resource.Spec.ExecNodes) > 0 { - for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes { + for _, endSpec := range ytsaurus.Resource().Spec.ExecNodes { ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec)) } } @@ -94,7 +94,7 @@ func NewComponentManager( var tnds []components.Component if len(resource.Spec.TabletNodes) > 0 { - for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes { + for idx, tndSpec := range ytsaurus.Resource().Spec.TabletNodes { tnds = append(tnds, components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0)) } } @@ -152,22 +152,23 @@ func NewComponentManager( return nil, err } - componentStatus, err := c.Status(ctx) + componentStatus, err := c.Sync(ctx, true) if err != nil { return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err) } - c.SetReadyCondition(componentStatus) + ytsaurus.SetStatusCondition(components.GetReadyCondition(c, componentStatus)) + + if !componentStatus.IsRunning() { + status.needInit = true + } + syncStatus := componentStatus.SyncStatus if syncStatus == components.SyncStatusNeedLocalUpdate { status.needUpdate = append(status.needUpdate, c) } - if !components.IsRunningStatus(syncStatus) { - status.needInit = true - } - if syncStatus != components.SyncStatusReady && syncStatus != components.SyncStatusUpdating { status.allReadyOrUpdating = false } @@ -201,7 +202,7 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) { hasPending := false for _, c := range cm.allComponents { - status, err := c.Status(ctx) + status, err := c.Sync(ctx, true) if err != nil { return ctrl.Result{Requeue: true}, fmt.Errorf("failed to get status for %s: %w", c.GetName(), err) } @@ -210,14 +211,14 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) { status.SyncStatus == components.SyncStatusUpdating { hasPending = true logger.Info("component sync", "component", c.GetName()) - if err := c.Sync(ctx); err != nil { + if _, err := c.Sync(ctx, false); err != nil { logger.Error(err, "component sync failed", "component", c.GetName()) return ctrl.Result{Requeue: true}, err } } } - if err := cm.ytsaurus.APIProxy().UpdateStatus(ctx); err != nil { + if err := cm.ytsaurus.UpdateStatus(ctx); err != nil { logger.Error(err, "update Ytsaurus status failed") return ctrl.Result{Requeue: true}, err } diff --git a/controllers/remotedatanodes_sync.go b/controllers/remotedatanodes_sync.go index 877a812d..7a2a97c7 100644 --- a/controllers/remotedatanodes_sync.go +++ b/controllers/remotedatanodes_sync.go @@ -32,7 +32,7 @@ func (r *RemoteDataNodesReconciler) Sync( component := components.NewRemoteDataNodes( cfgen, resource, - apiProxy, + apiProxy.APIProxy(), resource.Spec.DataNodesSpec, resource.Spec.CommonSpec, ) @@ -42,7 +42,7 @@ func (r *RemoteDataNodesReconciler) Sync( return ctrl.Result{Requeue: true}, err } - status, err := component.Sync(ctx) + status, err := component.Sync(ctx, false) if err != nil { logger.Error(err, "failed to sync remote nodes") return ctrl.Result{Requeue: true}, err diff --git a/controllers/remoteexecnodes_sync.go b/controllers/remoteexecnodes_sync.go index 41a70428..68ffa225 100644 --- a/controllers/remoteexecnodes_sync.go +++ b/controllers/remoteexecnodes_sync.go @@ -32,7 +32,7 @@ func (r *RemoteExecNodesReconciler) Sync( component := components.NewRemoteExecNodes( cfgen, resource, - apiProxy, + apiProxy.APIProxy(), resource.Spec.ExecNodesSpec, resource.Spec.CommonSpec, ) @@ -42,7 +42,7 @@ func (r *RemoteExecNodesReconciler) Sync( return ctrl.Result{Requeue: true}, err } - status, err := component.Sync(ctx) + status, err := component.Sync(ctx, false) if err != nil { logger.Error(err, "failed to sync remote nodes") return ctrl.Result{Requeue: true}, err diff --git a/controllers/remotetabletnodes_sync.go b/controllers/remotetabletnodes_sync.go index 6c5793ce..d7143732 100644 --- a/controllers/remotetabletnodes_sync.go +++ b/controllers/remotetabletnodes_sync.go @@ -32,7 +32,7 @@ func (r *RemoteTabletNodesReconciler) Sync( component := components.NewRemoteTabletNodes( cfgen, resource, - apiProxy, + apiProxy.APIProxy(), resource.Spec.TabletNodesSpec, resource.Spec.CommonSpec, ) @@ -42,7 +42,7 @@ func (r *RemoteTabletNodesReconciler) Sync( return ctrl.Result{Requeue: true}, err } - status, err := component.Sync(ctx) + status, err := component.Sync(ctx, false) if err != nil { logger.Error(err, "failed to sync remote nodes") return ctrl.Result{Requeue: true}, err diff --git a/controllers/spyt_sync.go b/controllers/spyt_sync.go index 33789920..227f832a 100644 --- a/controllers/spyt_sync.go +++ b/controllers/spyt_sync.go @@ -17,7 +17,7 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus spyt := apiproxy.NewSpyt(resource, r.Client, r.Recorder, r.Scheme) - cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(spyt.APIProxy().Client())) + cfgen := ytconfig.NewGenerator(ytsaurus, getClusterDomain(spyt.Client())) component := components.NewSpyt(cfgen, spyt, ytsaurus) @@ -27,11 +27,15 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus return ctrl.Result{Requeue: true}, err } - if spyt.GetResource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished { + if spyt.Resource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished { return ctrl.Result{}, nil } - componentStatus := component.Status(ctx) + componentStatus, err := component.Sync(ctx, true) + if err != nil { + logger.Error(err, "failed to get SPYT status for controller") + return ctrl.Result{Requeue: true}, err + } if componentStatus.SyncStatus == components.SyncStatusBlocked { return ctrl.Result{RequeueAfter: time.Second * 10}, nil @@ -44,12 +48,12 @@ func (r *SpytReconciler) Sync(ctx context.Context, resource *ytv1.Spyt, ytsaurus return ctrl.Result{Requeue: true}, err } - if err := component.Sync(ctx); err != nil { + if _, err := component.Sync(ctx, false); err != nil { logger.Error(err, "component sync failed", "component", "spyt") return ctrl.Result{Requeue: true}, err } - if err := spyt.APIProxy().UpdateStatus(ctx); err != nil { + if err := spyt.UpdateStatus(ctx); err != nil { logger.Error(err, "update spyt status failed") return ctrl.Result{Requeue: true}, err } diff --git a/controllers/sync.go b/controllers/sync.go index 346dc752..d0972f7f 100644 --- a/controllers/sync.go +++ b/controllers/sync.go @@ -21,7 +21,7 @@ func (r *YtsaurusReconciler) handleEverything( ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager, ) (*ctrl.Result, error) { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() switch resource.Status.UpdateStatus.State { case ytv1.UpdateStateNone: @@ -41,7 +41,7 @@ func (r *YtsaurusReconciler) handleEverything( } case ytv1.UpdateStateImpossibleToStart: - if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate { + if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate { ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling") err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate) return &ctrl.Result{Requeue: true}, err @@ -167,7 +167,7 @@ func (r *YtsaurusReconciler) handleStateless( ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager, ) (*ctrl.Result, error) { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() switch resource.Status.UpdateStatus.State { case ytv1.UpdateStateNone: @@ -239,7 +239,7 @@ func (r *YtsaurusReconciler) handleMasterOnly( ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager, ) (*ctrl.Result, error) { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() switch resource.Status.UpdateStatus.State { case ytv1.UpdateStateNone: @@ -259,7 +259,7 @@ func (r *YtsaurusReconciler) handleMasterOnly( } case ytv1.UpdateStateImpossibleToStart: - if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate { + if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate { ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling") err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate) return &ctrl.Result{Requeue: true}, err @@ -315,7 +315,7 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly( ytsaurus *apiProxy.Ytsaurus, componentManager *ComponentManager, ) (*ctrl.Result, error) { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() switch resource.Status.UpdateStatus.State { case ytv1.UpdateStateNone: @@ -335,7 +335,7 @@ func (r *YtsaurusReconciler) handleTabletNodesOnly( } case ytv1.UpdateStateImpossibleToStart: - if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate { + if !componentManager.needSync() || !ytsaurus.Resource().Spec.EnableFullUpdate { ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling") err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate) return &ctrl.Result{Requeue: true}, err @@ -543,7 +543,7 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) needUpdateNames = append(needUpdateNames, c.GetName()) } logger = logger.WithValues("componentsForUpdateAll", needUpdateNames) - meta, blockMsg := chooseUpdateFlow(ytsaurus.GetResource().Spec, needUpdate) + meta, blockMsg := chooseUpdateFlow(ytsaurus.Resource().Spec, needUpdate) if blockMsg != "" { logger.Info(blockMsg) return ctrl.Result{Requeue: true}, nil diff --git a/pkg/apiproxy/chyt.go b/pkg/apiproxy/chyt.go index bc50eb3e..15353991 100644 --- a/pkg/apiproxy/chyt.go +++ b/pkg/apiproxy/chyt.go @@ -13,45 +13,42 @@ import ( ) type Chyt struct { - apiProxy APIProxy - chyt *ytv1.Chyt + apiProxy[*ytv1.Chyt] } +var _ TypedAPIProxy[*ytv1.Chyt] = &Chyt{} + func NewChyt( chyt *ytv1.Chyt, client client.Client, recorder record.EventRecorder, scheme *runtime.Scheme) *Chyt { return &Chyt{ - chyt: chyt, - apiProxy: NewAPIProxy(chyt, client, recorder, scheme), + apiProxy: apiProxy[*ytv1.Chyt]{ + resource: chyt, + client: client, + recorder: recorder, + scheme: scheme, + }, } } -func (c *Chyt) GetResource() *ytv1.Chyt { - return c.chyt -} - -func (c *Chyt) APIProxy() APIProxy { - return c.apiProxy -} - func (c *Chyt) SetStatusCondition(condition metav1.Condition) { - meta.SetStatusCondition(&c.chyt.Status.Conditions, condition) + meta.SetStatusCondition(&c.resource.Status.Conditions, condition) } func (c *Chyt) IsStatusConditionTrue(conditionType string) bool { - return meta.IsStatusConditionTrue(c.chyt.Status.Conditions, conditionType) + return meta.IsStatusConditionTrue(c.resource.Status.Conditions, conditionType) } func (c *Chyt) IsStatusConditionFalse(conditionType string) bool { - return meta.IsStatusConditionFalse(c.chyt.Status.Conditions, conditionType) + return meta.IsStatusConditionFalse(c.resource.Status.Conditions, conditionType) } func (c *Chyt) SaveReleaseStatus(ctx context.Context, releaseStatus ytv1.ChytReleaseStatus) error { logger := log.FromContext(ctx) - c.GetResource().Status.ReleaseStatus = releaseStatus - if err := c.apiProxy.UpdateStatus(ctx); err != nil { + c.resource.Status.ReleaseStatus = releaseStatus + if err := c.UpdateStatus(ctx); err != nil { logger.Error(err, "unable to update Chyt release status") return err } diff --git a/pkg/apiproxy/proxy.go b/pkg/apiproxy/proxy.go index a11ac038..c9f941aa 100644 --- a/pkg/apiproxy/proxy.go +++ b/pkg/apiproxy/proxy.go @@ -15,7 +15,15 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -type APIProxy interface { +type OwningResource interface { + client.Object +} + +type TypedAPIProxy[T OwningResource] interface { + Resource() T + + APIProxy() APIProxy + Client() client.Client FetchObject(ctx context.Context, name string, obj client.Object) error ListObjects(ctx context.Context, objList client.ObjectList, opts ...client.ListOption) error @@ -27,44 +35,63 @@ type APIProxy interface { UpdateStatus(ctx context.Context) error } +type APIProxy = TypedAPIProxy[OwningResource] + type ConditionManager interface { SetStatusCondition(condition metav1.Condition) IsStatusConditionTrue(conditionType string) bool IsStatusConditionFalse(conditionType string) bool } -func NewAPIProxy( - object client.Object, +type UpdateConditionManager interface { + SetUpdateStatusCondition(ctx context.Context, condition metav1.Condition) + IsUpdateStatusConditionTrue(condition string) bool +} + +func NewAPIProxy[T OwningResource]( + resource T, client client.Client, recorder record.EventRecorder, - scheme *runtime.Scheme) APIProxy { - return &apiProxy{ - object: object, + scheme *runtime.Scheme) apiProxy[T] { + return apiProxy[T]{ + resource: resource, client: client, recorder: recorder, scheme: scheme, } } -type apiProxy struct { - object client.Object +type apiProxy[T OwningResource] struct { + resource T client client.Client recorder record.EventRecorder scheme *runtime.Scheme } -func (c *apiProxy) getObjectKey(name string) types.NamespacedName { +var _ APIProxy = &apiProxy[OwningResource]{} + +func (c *apiProxy[T]) Resource() T { + return c.resource +} + +func (c *apiProxy[T]) APIProxy() APIProxy { + var r OwningResource = c.resource + p := NewAPIProxy(r, c.client, c.recorder, c.scheme) + return &p +} + +func (c *apiProxy[T]) getObjectKey(name string) types.NamespacedName { return types.NamespacedName{ Name: name, - Namespace: c.object.GetNamespace(), + Namespace: c.resource.GetNamespace(), } } -func (c *apiProxy) Client() client.Client { +func (c *apiProxy[T]) Client() client.Client { return c.client } -func (c *apiProxy) FetchObject(ctx context.Context, name string, obj client.Object) error { +func (c *apiProxy[T]) FetchObject(ctx context.Context, name string, obj client.Object) error { err := c.client.Get(ctx, c.getObjectKey(name), obj) if err == nil || !apierrors.IsNotFound(err) { return err @@ -73,28 +100,28 @@ func (c *apiProxy) FetchObject(ctx context.Context, name string, obj client.Obje return nil } -func (c *apiProxy) ListObjects(ctx context.Context, objList client.ObjectList, opts ...client.ListOption) error { +func (c *apiProxy[T]) ListObjects(ctx context.Context, objList client.ObjectList, opts ...client.ListOption) error { err := c.client.List(ctx, objList, opts...) return err } -func (c *apiProxy) RecordWarning(reason, message string) { +func (c *apiProxy[T]) RecordWarning(reason, message string) { c.recorder.Event( - c.object, + c.resource, corev1.EventTypeWarning, reason, message) } -func (c *apiProxy) RecordNormal(reason, message string) { +func (c *apiProxy[T]) RecordNormal(reason, message string) { c.recorder.Event( - c.object, + c.resource, corev1.EventTypeNormal, reason, message) } -func (c *apiProxy) SyncObject(ctx context.Context, oldObj, newObj client.Object) error { +func (c *apiProxy[T]) SyncObject(ctx context.Context, oldObj, newObj client.Object) error { var err error if newObj.GetName() == "" { return fmt.Errorf("cannot sync uninitialized object, object type %T", oldObj) @@ -109,7 +136,7 @@ func (c *apiProxy) SyncObject(ctx context.Context, oldObj, newObj client.Object) return err } -func (c *apiProxy) DeleteObject(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { +func (c *apiProxy[T]) DeleteObject(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { logger := log.FromContext(ctx) if err := c.client.Delete(ctx, obj, opts...); err != nil { @@ -128,10 +155,10 @@ func (c *apiProxy) DeleteObject(ctx context.Context, obj client.Object, opts ... return nil } -func (c *apiProxy) updateObject(ctx context.Context, obj client.Object) error { +func (c *apiProxy[T]) updateObject(ctx context.Context, obj client.Object) error { logger := log.FromContext(ctx) - if err := ctrl.SetControllerReference(c.object, obj, c.scheme); err != nil { + if err := ctrl.SetControllerReference(c.resource, obj, c.scheme); err != nil { logger.Error(err, "unable to set controller reference", "object_name", obj.GetName()) return err } @@ -152,10 +179,10 @@ func (c *apiProxy) updateObject(ctx context.Context, obj client.Object) error { return nil } -func (c *apiProxy) createAndReferenceObject(ctx context.Context, obj client.Object) error { +func (c *apiProxy[T]) createAndReferenceObject(ctx context.Context, obj client.Object) error { logger := log.FromContext(ctx) - if err := ctrl.SetControllerReference(c.object, obj, c.scheme); err != nil { + if err := ctrl.SetControllerReference(c.resource, obj, c.scheme); err != nil { logger.Error(err, "unable to set controller reference", "object_name", obj.GetName()) return err } @@ -175,6 +202,6 @@ func (c *apiProxy) createAndReferenceObject(ctx context.Context, obj client.Obje return nil } -func (c *apiProxy) UpdateStatus(ctx context.Context) error { - return c.client.Status().Update(ctx, c.object) +func (c *apiProxy[T]) UpdateStatus(ctx context.Context) error { + return c.client.Status().Update(ctx, c.resource) } diff --git a/pkg/apiproxy/spyt.go b/pkg/apiproxy/spyt.go index 3ec5588a..e8502acd 100644 --- a/pkg/apiproxy/spyt.go +++ b/pkg/apiproxy/spyt.go @@ -13,45 +13,37 @@ import ( ) type Spyt struct { - apiProxy APIProxy - spyt *ytv1.Spyt + apiProxy[*ytv1.Spyt] } +var _ TypedAPIProxy[*ytv1.Spyt] = &Spyt{} + func NewSpyt( spyt *ytv1.Spyt, client client.Client, recorder record.EventRecorder, scheme *runtime.Scheme) *Spyt { return &Spyt{ - spyt: spyt, apiProxy: NewAPIProxy(spyt, client, recorder, scheme), } } -func (c *Spyt) GetResource() *ytv1.Spyt { - return c.spyt -} - -func (c *Spyt) APIProxy() APIProxy { - return c.apiProxy -} - func (c *Spyt) SetStatusCondition(condition metav1.Condition) { - meta.SetStatusCondition(&c.spyt.Status.Conditions, condition) + meta.SetStatusCondition(&c.resource.Status.Conditions, condition) } func (c *Spyt) IsStatusConditionTrue(conditionType string) bool { - return meta.IsStatusConditionTrue(c.spyt.Status.Conditions, conditionType) + return meta.IsStatusConditionTrue(c.resource.Status.Conditions, conditionType) } func (c *Spyt) IsStatusConditionFalse(conditionType string) bool { - return meta.IsStatusConditionFalse(c.spyt.Status.Conditions, conditionType) + return meta.IsStatusConditionFalse(c.resource.Status.Conditions, conditionType) } func (c *Spyt) SaveReleaseStatus(ctx context.Context, releaseStatus ytv1.SpytReleaseStatus) error { logger := log.FromContext(ctx) - c.GetResource().Status.ReleaseStatus = releaseStatus - if err := c.apiProxy.UpdateStatus(ctx); err != nil { + c.Resource().Status.ReleaseStatus = releaseStatus + if err := c.UpdateStatus(ctx); err != nil { logger.Error(err, "unable to update Spyt release status") return err } diff --git a/pkg/apiproxy/ytsaurus.go b/pkg/apiproxy/ytsaurus.go index 00dab182..12820108 100644 --- a/pkg/apiproxy/ytsaurus.go +++ b/pkg/apiproxy/ytsaurus.go @@ -17,35 +17,31 @@ import ( ) type Ytsaurus struct { - apiProxy APIProxy - ytsaurus *ytv1.Ytsaurus + apiProxy[*ytv1.Ytsaurus] } +var _ TypedAPIProxy[*ytv1.Ytsaurus] = &Ytsaurus{} + func NewYtsaurus( ytsaurus *ytv1.Ytsaurus, client client.Client, recorder record.EventRecorder, scheme *runtime.Scheme) *Ytsaurus { return &Ytsaurus{ - ytsaurus: ytsaurus, apiProxy: NewAPIProxy(ytsaurus, client, recorder, scheme), } } -func (c *Ytsaurus) APIProxy() APIProxy { - return c.apiProxy -} - -func (c *Ytsaurus) GetResource() *ytv1.Ytsaurus { - return c.ytsaurus +func (c *Ytsaurus) Spec() *ytv1.YtsaurusSpec { + return &c.resource.Spec } func (c *Ytsaurus) GetCommonSpec() ytv1.CommonSpec { - return c.GetResource().Spec.CommonSpec + return c.resource.Spec.CommonSpec } func (c *Ytsaurus) GetClusterState() ytv1.ClusterState { - return c.ytsaurus.Status.State + return c.resource.Status.State } func (c *Ytsaurus) IsUpdating() bool { @@ -53,34 +49,34 @@ func (c *Ytsaurus) IsUpdating() bool { } func (c *Ytsaurus) GetUpdateState() ytv1.UpdateState { - return c.ytsaurus.Status.UpdateStatus.State + return c.resource.Status.UpdateStatus.State } func (c *Ytsaurus) GetLocalUpdatingComponents() []string { - return c.ytsaurus.Status.UpdateStatus.Components + return c.resource.Status.UpdateStatus.Components } func (c *Ytsaurus) GetUpdateFlow() ytv1.UpdateFlow { - return c.ytsaurus.Status.UpdateStatus.Flow + return c.resource.Status.UpdateStatus.Flow } func (c *Ytsaurus) IsUpdateStatusConditionTrue(condition string) bool { - return meta.IsStatusConditionTrue(c.ytsaurus.Status.UpdateStatus.Conditions, condition) + return meta.IsStatusConditionTrue(c.resource.Status.UpdateStatus.Conditions, condition) } func (c *Ytsaurus) SetUpdateStatusCondition(ctx context.Context, condition metav1.Condition) { logger := log.FromContext(ctx) logger.Info("Setting update status condition", "condition", condition) - meta.SetStatusCondition(&c.ytsaurus.Status.UpdateStatus.Conditions, condition) - sortConditions(c.ytsaurus.Status.UpdateStatus.Conditions) + meta.SetStatusCondition(&c.resource.Status.UpdateStatus.Conditions, condition) + sortConditions(c.resource.Status.UpdateStatus.Conditions) } func (c *Ytsaurus) ClearUpdateStatus(ctx context.Context) error { - c.ytsaurus.Status.UpdateStatus.Conditions = make([]metav1.Condition, 0) - c.ytsaurus.Status.UpdateStatus.TabletCellBundles = make([]ytv1.TabletCellBundleInfo, 0) - c.ytsaurus.Status.UpdateStatus.MasterMonitoringPaths = make([]string, 0) - c.ytsaurus.Status.UpdateStatus.Components = nil - c.ytsaurus.Status.UpdateStatus.Flow = ytv1.UpdateFlowNone + c.resource.Status.UpdateStatus.Conditions = make([]metav1.Condition, 0) + c.resource.Status.UpdateStatus.TabletCellBundles = make([]ytv1.TabletCellBundleInfo, 0) + c.resource.Status.UpdateStatus.MasterMonitoringPaths = make([]string, 0) + c.resource.Status.UpdateStatus.Components = nil + c.resource.Status.UpdateStatus.Flow = ytv1.UpdateFlowNone return c.apiProxy.UpdateStatus(ctx) } @@ -92,11 +88,11 @@ func (c *Ytsaurus) LogUpdate(ctx context.Context, message string) { func (c *Ytsaurus) SaveUpdatingClusterState(ctx context.Context, flow ytv1.UpdateFlow, components []string) error { logger := log.FromContext(ctx) - c.ytsaurus.Status.State = ytv1.ClusterStateUpdating - c.ytsaurus.Status.UpdateStatus.Flow = flow - c.ytsaurus.Status.UpdateStatus.Components = components + c.resource.Status.State = ytv1.ClusterStateUpdating + c.resource.Status.UpdateStatus.Flow = flow + c.resource.Status.UpdateStatus.Components = components - if err := c.apiProxy.UpdateStatus(ctx); err != nil { + if err := c.UpdateStatus(ctx); err != nil { logger.Error(err, "unable to update Ytsaurus cluster status") return err } @@ -106,7 +102,7 @@ func (c *Ytsaurus) SaveUpdatingClusterState(ctx context.Context, flow ytv1.Updat func (c *Ytsaurus) SaveClusterState(ctx context.Context, clusterState ytv1.ClusterState) error { logger := log.FromContext(ctx) - c.ytsaurus.Status.State = clusterState + c.resource.Status.State = clusterState if err := c.apiProxy.UpdateStatus(ctx); err != nil { logger.Error(err, "unable to update Ytsaurus cluster status") return err @@ -118,16 +114,16 @@ func (c *Ytsaurus) SaveClusterState(ctx context.Context, clusterState ytv1.Clust // SyncObservedGeneration confirms that current generation was observed. // Returns true if generation actually has been changed and status must be saved. func (c *Ytsaurus) SyncObservedGeneration() bool { - if c.ytsaurus.Status.ObservedGeneration == c.ytsaurus.Generation { + if c.resource.Status.ObservedGeneration == c.resource.Generation { return false } - c.ytsaurus.Status.ObservedGeneration = c.ytsaurus.Generation + c.resource.Status.ObservedGeneration = c.resource.Generation return true } func (c *Ytsaurus) SaveUpdateState(ctx context.Context, updateState ytv1.UpdateState) error { logger := log.FromContext(ctx) - c.ytsaurus.Status.UpdateStatus.State = updateState + c.resource.Status.UpdateStatus.State = updateState if err := c.apiProxy.UpdateStatus(ctx); err != nil { logger.Error(err, "unable to update Ytsaurus update state") return err @@ -136,16 +132,16 @@ func (c *Ytsaurus) SaveUpdateState(ctx context.Context, updateState ytv1.UpdateS } func (c *Ytsaurus) SetStatusCondition(condition metav1.Condition) { - meta.SetStatusCondition(&c.ytsaurus.Status.Conditions, condition) - sortConditions(c.ytsaurus.Status.Conditions) + meta.SetStatusCondition(&c.resource.Status.Conditions, condition) + sortConditions(c.resource.Status.Conditions) } func (c *Ytsaurus) IsStatusConditionTrue(conditionType string) bool { - return meta.IsStatusConditionTrue(c.ytsaurus.Status.Conditions, conditionType) + return meta.IsStatusConditionTrue(c.resource.Status.Conditions, conditionType) } func (c *Ytsaurus) IsStatusConditionFalse(conditionType string) bool { - return meta.IsStatusConditionFalse(c.ytsaurus.Status.Conditions, conditionType) + return meta.IsStatusConditionFalse(c.resource.Status.Conditions, conditionType) } func sortConditions(conditions []metav1.Condition) { diff --git a/pkg/components/chyt.go b/pkg/components/chyt.go index b9aa99bd..534f83fb 100644 --- a/pkg/components/chyt.go +++ b/pkg/components/chyt.go @@ -17,10 +17,10 @@ import ( ) type Chyt struct { - labeller *labeller.Labeller - chyt *apiproxy.Chyt - cfgen *ytconfig.Generator - ytsaurus *ytv1.Ytsaurus + localComponent + + chyt *apiproxy.Chyt + cfgen *ytconfig.Generator secret *resources.StringSecret @@ -29,23 +29,25 @@ type Chyt struct { initChPublicJob *InitJob } +var _ LocalComponent = &Chyt{} + func NewChyt( cfgen *ytconfig.Generator, chyt *apiproxy.Chyt, - ytsaurus *ytv1.Ytsaurus) *Chyt { + ytsaurusApi *apiproxy.Ytsaurus) *Chyt { + ytsaurus := ytsaurusApi.Resource() + l := labeller.Labeller{ - ObjectMeta: &chyt.GetResource().ObjectMeta, - APIProxy: chyt.APIProxy(), + ObjectMeta: &chyt.Resource().ObjectMeta, ComponentType: consts.ChytType, - ComponentNamePart: chyt.GetResource().Name, + ComponentNamePart: chyt.Resource().Name, Annotations: ytsaurus.Spec.ExtraPodAnnotations, } return &Chyt{ - labeller: &l, - chyt: chyt, - cfgen: cfgen, - ytsaurus: ytsaurus, + localComponent: newLocalComponent(&l, ytsaurusApi), + chyt: chyt, + cfgen: cfgen, initUser: NewInitJob( &l, chyt.APIProxy(), @@ -65,7 +67,7 @@ func NewChyt( ytsaurus.Spec.ImagePullSecrets, "release", consts.ClientConfigFileName, - chyt.GetResource().Spec.Image, + chyt.Resource().Spec.Image, cfgen.GetNativeClientConfig, ytsaurus.Spec.Tolerations, ytsaurus.Spec.NodeSelector, @@ -77,7 +79,7 @@ func NewChyt( ytsaurus.Spec.ImagePullSecrets, "ch-public", consts.ClientConfigFileName, - chyt.GetResource().Spec.Image, + chyt.Resource().Spec.Image, cfgen.GetNativeClientConfig, ytsaurus.Spec.Tolerations, ytsaurus.Spec.NodeSelector, @@ -103,7 +105,7 @@ func (c *Chyt) createInitUserScript() string { func (c *Chyt) createInitScript() string { script := "/setup_cluster_for_chyt.sh" - if c.chyt.GetResource().Spec.MakeDefault { + if c.chyt.Resource().Spec.MakeDefault { script += " --make-default" } @@ -135,10 +137,10 @@ func (c *Chyt) prepareChPublicJob() { container.EnvFrom = []corev1.EnvFromSource{c.secret.GetEnvSource()} } -func (c *Chyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (c *Chyt) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if c.ytsaurus.Status.State != ytv1.ClusterStateRunning { + if c.ytsaurus.GetClusterState() != ytv1.ClusterStateRunning { return WaitingStatus(SyncStatusBlocked, "ytsaurus running"), err } @@ -151,7 +153,7 @@ func (c *Chyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { } err = c.secret.Sync(ctx) } - c.chyt.GetResource().Status.ReleaseStatus = ytv1.ChytReleaseStatusCreatingUserSecret + c.chyt.Resource().Status.ReleaseStatus = ytv1.ChytReleaseStatusCreatingUserSecret return WaitingStatus(SyncStatusPending, c.secret.Name()), err } @@ -161,7 +163,7 @@ func (c *Chyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { status, err := c.initUser.Sync(ctx, dry) if status.SyncStatus != SyncStatusReady { - c.chyt.GetResource().Status.ReleaseStatus = ytv1.ChytReleaseStatusCreatingUser + c.chyt.Resource().Status.ReleaseStatus = ytv1.ChytReleaseStatusCreatingUser return status, err } @@ -184,23 +186,23 @@ func (c *Chyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { status, err = c.initEnvironment.Sync(ctx, dry) if err != nil || status.SyncStatus != SyncStatusReady { - c.chyt.GetResource().Status.ReleaseStatus = ytv1.ChytReleaseStatusUploadingIntoCypress + c.chyt.Resource().Status.ReleaseStatus = ytv1.ChytReleaseStatusUploadingIntoCypress return status, err } - createPublicClique := c.chyt.GetResource().Spec.CreatePublicClique - if c.ytsaurus.Spec.StrawberryController != nil && createPublicClique != nil && *createPublicClique { + createPublicClique := c.chyt.Resource().Spec.CreatePublicClique + if c.ytsaurus.Resource().Spec.StrawberryController != nil && createPublicClique != nil && *createPublicClique { if !dry { c.prepareChPublicJob() } status, err = c.initChPublicJob.Sync(ctx, dry) if err != nil || status.SyncStatus != SyncStatusReady { - c.chyt.GetResource().Status.ReleaseStatus = ytv1.ChytReleaseStatusCreatingChPublicClique + c.chyt.Resource().Status.ReleaseStatus = ytv1.ChytReleaseStatusCreatingChPublicClique return status, err } } - c.chyt.GetResource().Status.ReleaseStatus = ytv1.ChytReleaseStatusFinished + c.chyt.Resource().Status.ReleaseStatus = ytv1.ChytReleaseStatusFinished return SimpleStatus(SyncStatusReady), err } @@ -213,17 +215,3 @@ func (c *Chyt) Fetch(ctx context.Context) error { c.secret, ) } - -func (c *Chyt) Status(ctx context.Context) ComponentStatus { - status, err := c.doSync(ctx, true) - if err != nil { - panic(err) - } - - return status -} - -func (c *Chyt) Sync(ctx context.Context) error { - _, err := c.doSync(ctx, false) - return err -} diff --git a/pkg/components/component.go b/pkg/components/component.go index 0595ebeb..2b520cdc 100644 --- a/pkg/components/component.go +++ b/pkg/components/component.go @@ -21,15 +21,15 @@ const ( SyncStatusUpdating SyncStatus = "Updating" ) -func IsRunningStatus(status SyncStatus) bool { - return status == SyncStatusReady || status == SyncStatusNeedLocalUpdate -} - type ComponentStatus struct { SyncStatus SyncStatus Message string } +func (s ComponentStatus) IsRunning() bool { + return s.SyncStatus == SyncStatusReady || s.SyncStatus == SyncStatusNeedLocalUpdate +} + func NewComponentStatus(status SyncStatus, message string) ComponentStatus { return ComponentStatus{status, message} } @@ -43,44 +43,92 @@ func SimpleStatus(status SyncStatus) ComponentStatus { } type Component interface { - Fetch(ctx context.Context) error - Sync(ctx context.Context) error - Status(ctx context.Context) (ComponentStatus, error) - GetName() string GetType() consts.ComponentType - SetReadyCondition(status ComponentStatus) - // TODO(nadya73): refactor it - IsUpdatable() bool + // GetName returns component's name, which is used as an identifier in component management + // and for mentioning in logs. For example: "Master", "DataNode". + GetName() string + + Fetch(ctx context.Context) error + Sync(ctx context.Context, dry bool) (ComponentStatus, error) + + getAPIProxy() apiproxy.APIProxy + getLabeller() *labeller.Labeller +} + +type LocalComponent interface { + Component + + getYtsaurus() *apiproxy.Ytsaurus +} + +type LocalServerComponent interface { + LocalComponent + + getServer() server +} + +type RemoteServerComponent interface { + Component } // Following structs are used as a base for implementing YTsaurus components objects. -// baseComponent is a base struct intended for use in the simplest components and remote components -// (the ones that don't have access to the ytsaurus resource). type baseComponent struct { + apiProxy apiproxy.APIProxy labeller *labeller.Labeller } -// GetName returns component's name, which is used as an identifier in component management -// and for mentioning in logs. -// For example for master component name is "Master", -// For data node name looks like "DataNode". +func (c *baseComponent) GetType() consts.ComponentType { + return c.labeller.ComponentType +} + func (c *baseComponent) GetName() string { return c.labeller.GetFullComponentName() } +func (c *baseComponent) getAPIProxy() apiproxy.APIProxy { + return c.apiProxy +} + +func (c *baseComponent) getLabeller() *labeller.Labeller { + return c.labeller +} + +func newBaseComponent( + apiProxy apiproxy.APIProxy, + labeller *labeller.Labeller, +) baseComponent { + return baseComponent{ + apiProxy: apiProxy, + labeller: labeller, + } +} + // localComponent is a base structs for components which have access to ytsaurus resource, // but don't depend on server. Example: UI, Strawberry. type localComponent struct { - baseComponent ytsaurus *apiproxy.Ytsaurus + labeller *labeller.Labeller } -// localServerComponent is a base structs for components which have access to ytsaurus resource, -// and use server. Almost all components are based on this struct. -type localServerComponent struct { - localComponent - server server +func (c *localComponent) GetType() consts.ComponentType { + return c.labeller.ComponentType +} + +func (c *localComponent) GetName() string { + return c.labeller.GetFullComponentName() +} + +func (c *localComponent) getAPIProxy() apiproxy.APIProxy { + return c.ytsaurus.APIProxy() +} + +func (c *localComponent) getLabeller() *labeller.Labeller { + return c.labeller +} + +func (c *localComponent) getYtsaurus() *apiproxy.Ytsaurus { + return c.ytsaurus } func newLocalComponent( @@ -88,22 +136,20 @@ func newLocalComponent( ytsaurus *apiproxy.Ytsaurus, ) localComponent { return localComponent{ - baseComponent: baseComponent{labeller: labeller}, - ytsaurus: ytsaurus, + ytsaurus: ytsaurus, + labeller: labeller, } } -func (c *localComponent) SetReadyCondition(status ComponentStatus) { - ready := metav1.ConditionFalse - if status.SyncStatus == SyncStatusReady { - ready = metav1.ConditionTrue - } - c.ytsaurus.SetStatusCondition(metav1.Condition{ - Type: fmt.Sprintf("%sReady", c.labeller.GetFullComponentName()), - Status: ready, - Reason: string(status.SyncStatus), - Message: status.Message, - }) +// localServerComponent is a base structs for components which have access to ytsaurus resource, +// and use server. Almost all components are based on this struct. +type localServerComponent struct { + localComponent + server server +} + +func (c *localServerComponent) getServer() server { + return c.server } func newLocalServerComponent( @@ -112,21 +158,41 @@ func newLocalServerComponent( server server, ) localServerComponent { return localServerComponent{ - localComponent: localComponent{ - baseComponent: baseComponent{ - labeller: labeller, - }, - ytsaurus: ytsaurus, - }, - server: server, + localComponent: newLocalComponent(labeller, ytsaurus), + server: server, + } +} + +type remoteServerComponent struct { + baseComponent + server server +} + +func newRemoteServerComponent( + apiProxy apiproxy.APIProxy, + labeller *labeller.Labeller, + server server, +) remoteServerComponent { + return remoteServerComponent{ + baseComponent: newBaseComponent(apiProxy, labeller), + server: server, } } -func (c *localServerComponent) NeedSync() bool { - return LocalServerNeedSync(c.server, c.ytsaurus) +func ServerNeedSync(s server, ytsaurus *apiproxy.Ytsaurus) bool { + // FIXME(khlebnikov): Explain this logic and move into upper layer. + return (s.configNeedsReload() && ytsaurus.IsUpdating()) || s.needBuild() } -func LocalServerNeedSync(srv server, ytsaurus *apiproxy.Ytsaurus) bool { - return (srv.configNeedsReload() && ytsaurus.IsUpdating()) || - srv.needBuild() +func GetReadyCondition(component Component, status ComponentStatus) metav1.Condition { + ready := metav1.ConditionFalse + if status.SyncStatus == SyncStatusReady { + ready = metav1.ConditionTrue + } + return metav1.Condition{ + Type: fmt.Sprintf("%sReady", component.GetName()), + Status: ready, + Reason: string(status.SyncStatus), + Message: status.Message, + } } diff --git a/pkg/components/controller_agent.go b/pkg/components/controller_agent.go index 20216b84..af4158ae 100644 --- a/pkg/components/controller_agent.go +++ b/pkg/components/controller_agent.go @@ -21,10 +21,9 @@ type ControllerAgent struct { } func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *ControllerAgent { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.ControllerAgentType, } @@ -55,17 +54,11 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, } } -func (ca *ControllerAgent) IsUpdatable() bool { - return true -} - -func (ca *ControllerAgent) GetType() consts.ComponentType { return consts.ControllerAgentType } - func (ca *ControllerAgent) Fetch(ctx context.Context) error { return resources.Fetch(ctx, ca.server) } -func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (ca *ControllerAgent) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(ca.ytsaurus.GetClusterState()) && ca.server.needUpdate() { @@ -73,20 +66,16 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu } if ca.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, ca, dry); status != nil { return *status, err } } - masterStatus, err := ca.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, ca.master.GetName()), err + if status, err := checkComponentDependency(ctx, ca.master); status != nil { + return *status, err } - if ca.NeedSync() { + if ServerNeedSync(ca.server, ca.ytsaurus) { if !dry { err = ca.server.Sync(ctx) } @@ -99,12 +88,3 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu return SimpleStatus(SyncStatusReady), err } - -func (ca *ControllerAgent) Status(ctx context.Context) (ComponentStatus, error) { - return ca.doSync(ctx, true) -} - -func (ca *ControllerAgent) Sync(ctx context.Context) error { - _, err := ca.doSync(ctx, false) - return err -} diff --git a/pkg/components/data_node.go b/pkg/components/data_node.go index 8eb3046f..96ace62e 100644 --- a/pkg/components/data_node.go +++ b/pkg/components/data_node.go @@ -26,10 +26,9 @@ func NewDataNode( master Component, spec ytv1.DataNodesSpec, ) *DataNode { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.DataNodeType, ComponentNamePart: spec.Name, } @@ -63,17 +62,11 @@ func NewDataNode( } } -func (n *DataNode) IsUpdatable() bool { - return true -} - -func (n *DataNode) GetType() consts.ComponentType { return consts.DataNodeType } - func (n *DataNode) Fetch(ctx context.Context) error { return resources.Fetch(ctx, n.server) } -func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (n *DataNode) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && n.server.needUpdate() { @@ -81,20 +74,16 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error } if n.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, n.ytsaurus, n, &n.localComponent, n.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, n, dry); status != nil { return *status, err } } - masterStatus, err := n.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err + if status, err := checkComponentDependency(ctx, n.master); status != nil { + return *status, err } - if n.NeedSync() { + if ServerNeedSync(n.server, n.ytsaurus) { if !dry { err = n.server.Sync(ctx) } @@ -107,12 +96,3 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error return SimpleStatus(SyncStatusReady), err } - -func (n *DataNode) Status(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, true) -} - -func (n *DataNode) Sync(ctx context.Context) error { - _, err := n.doSync(ctx, false) - return err -} diff --git a/pkg/components/data_node_remote.go b/pkg/components/data_node_remote.go index f5638df1..c6b38d5b 100644 --- a/pkg/components/data_node_remote.go +++ b/pkg/components/data_node_remote.go @@ -15,22 +15,22 @@ import ( ) type RemoteDataNode struct { - server server - cfgen *ytconfig.NodeGenerator - spec *ytv1.DataNodesSpec - baseComponent + remoteServerComponent + cfgen *ytconfig.NodeGenerator + spec *ytv1.DataNodesSpec } +var _ RemoteServerComponent = &RemoteDataNode{} + func NewRemoteDataNodes( cfgen *ytconfig.NodeGenerator, nodes *ytv1.RemoteDataNodes, - proxy apiproxy.APIProxy, + apiProxy apiproxy.APIProxy, spec ytv1.DataNodesSpec, commonSpec ytv1.CommonSpec, ) *RemoteDataNode { l := labeller.Labeller{ ObjectMeta: &nodes.ObjectMeta, - APIProxy: proxy, ComponentType: consts.DataNodeType, ComponentNamePart: spec.Name, } @@ -39,9 +39,9 @@ func NewRemoteDataNodes( spec.InstanceSpec.MonitoringPort = ptr.To(int32(consts.DataNodeMonitoringPort)) } - srv := newServerConfigured( + server := newServerConfigured( &l, - proxy, + apiProxy, commonSpec, &spec.InstanceSpec, "/usr/bin/ytserver-node", @@ -58,17 +58,16 @@ func NewRemoteDataNodes( }), ) return &RemoteDataNode{ - baseComponent: baseComponent{labeller: &l}, - server: srv, - cfgen: cfgen, - spec: &spec, + remoteServerComponent: newRemoteServerComponent(apiProxy, &l, server), + cfgen: cfgen, + spec: &spec, } } -func (n *RemoteDataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (n *RemoteDataNode) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if n.server.needSync() || n.server.needUpdate() { + if n.server.configNeedsReload() || n.server.needBuild() || n.server.needUpdate() { if !dry { err = n.server.Sync(ctx) } @@ -82,12 +81,6 @@ func (n *RemoteDataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, return SimpleStatus(SyncStatusReady), err } -func (n *RemoteDataNode) GetType() consts.ComponentType { return consts.DataNodeType } - -func (n *RemoteDataNode) Sync(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, false) -} - func (n *RemoteDataNode) Fetch(ctx context.Context) error { return resources.Fetch(ctx, n.server) } diff --git a/pkg/components/discovery.go b/pkg/components/discovery.go index 4b113105..b4e7153f 100644 --- a/pkg/components/discovery.go +++ b/pkg/components/discovery.go @@ -20,10 +20,9 @@ type Discovery struct { } func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Discovery { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.DiscoveryType, } @@ -55,17 +54,11 @@ func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Disco } } -func (d *Discovery) IsUpdatable() bool { - return true -} - -func (d *Discovery) GetType() consts.ComponentType { return consts.DiscoveryType } - func (d *Discovery) Fetch(ctx context.Context) error { return resources.Fetch(ctx, d.server) } -func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (d *Discovery) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(d.ytsaurus.GetClusterState()) && d.server.needUpdate() { @@ -73,12 +66,12 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, d, dry); status != nil { return *status, err } } - if d.NeedSync() { + if ServerNeedSync(d.server, d.ytsaurus) { if !dry { err = d.server.Sync(ctx) } @@ -91,12 +84,3 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } - -func (d *Discovery) Status(ctx context.Context) (ComponentStatus, error) { - return d.doSync(ctx, true) -} - -func (d *Discovery) Sync(ctx context.Context) error { - _, err := d.doSync(ctx, false) - return err -} diff --git a/pkg/components/exec_node.go b/pkg/components/exec_node.go index df7baf1a..cae70322 100644 --- a/pkg/components/exec_node.go +++ b/pkg/components/exec_node.go @@ -14,21 +14,22 @@ import ( ) type ExecNode struct { + localServerComponent baseExecNode - localComponent master Component } +var _ LocalServerComponent = &ExecNode{} + func NewExecNode( cfgen *ytconfig.NodeGenerator, ytsaurus *apiproxy.Ytsaurus, master Component, spec ytv1.ExecNodesSpec, ) *ExecNode { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.ExecNodeType, ComponentNamePart: spec.Name, } @@ -37,7 +38,7 @@ func NewExecNode( spec.InstanceSpec.MonitoringPort = ptr.To(int32(consts.ExecNodeMonitoringPort)) } - srv := newServer( + server := newServer( &l, ytsaurus, &spec.InstanceSpec, @@ -61,7 +62,7 @@ func NewExecNode( &l, ytsaurus.APIProxy(), l.GetSidecarConfigMapName(consts.JobsContainerName), - ytsaurus.GetResource().Spec.ConfigOverrides, + ytsaurus.Resource().Spec.ConfigOverrides, map[string]ytconfig.GeneratorDescriptor{ consts.ContainerdConfigFileName: { F: func() ([]byte, error) { @@ -73,9 +74,9 @@ func NewExecNode( } return &ExecNode{ - localComponent: newLocalComponent(&l, ytsaurus), + localServerComponent: newLocalServerComponent(&l, ytsaurus, server), baseExecNode: baseExecNode{ - server: srv, + server: server, cfgen: cfgen, spec: &spec, sidecarConfig: sidecarConfig, @@ -84,49 +85,31 @@ func NewExecNode( } } -func (n *ExecNode) IsUpdatable() bool { - return true -} - -func (n *ExecNode) GetType() consts.ComponentType { return consts.ExecNodeType } - -func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (n *ExecNode) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && n.server.needUpdate() { + if ytv1.IsReadyToUpdateClusterState(n.ytsaurus.GetClusterState()) && n.localServerComponent.server.needUpdate() { return SimpleStatus(SyncStatusNeedLocalUpdate), err } if n.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, n.ytsaurus, n, &n.localComponent, n.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, n, dry); status != nil { return *status, err } } - masterStatus, err := n.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err + if status, err := checkComponentDependency(ctx, n.master); status != nil { + return *status, err } - if LocalServerNeedSync(n.server, n.ytsaurus) { + server := n.localServerComponent.server + if ServerNeedSync(server, n.ytsaurus) { return n.doSyncBase(ctx, dry) } - if !n.server.arePodsReady(ctx) { + if !server.arePodsReady(ctx) { return WaitingStatus(SyncStatusBlocked, "pods"), err } return SimpleStatus(SyncStatusReady), err } - -func (n *ExecNode) Status(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, true) -} - -func (n *ExecNode) Sync(ctx context.Context) error { - _, err := n.doSync(ctx, false) - return err -} diff --git a/pkg/components/exec_node_remote.go b/pkg/components/exec_node_remote.go index 9611cd3e..85325bf2 100644 --- a/pkg/components/exec_node_remote.go +++ b/pkg/components/exec_node_remote.go @@ -14,10 +14,12 @@ import ( ) type RemoteExecNode struct { + remoteServerComponent baseExecNode - baseComponent } +var _ RemoteServerComponent = &RemoteExecNode{} + func NewRemoteExecNodes( cfgen *ytconfig.NodeGenerator, nodes *ytv1.RemoteExecNodes, @@ -27,7 +29,6 @@ func NewRemoteExecNodes( ) *RemoteExecNode { l := labeller.Labeller{ ObjectMeta: &nodes.ObjectMeta, - APIProxy: proxy, ComponentType: consts.ExecNodeType, ComponentNamePart: spec.Name, } @@ -36,7 +37,7 @@ func NewRemoteExecNodes( spec.InstanceSpec.MonitoringPort = ptr.To(int32(consts.ExecNodeMonitoringPort)) } - srv := newServerConfigured( + server := newServerConfigured( &l, proxy, commonSpec, @@ -73,9 +74,9 @@ func NewRemoteExecNodes( } return &RemoteExecNode{ - baseComponent: baseComponent{labeller: &l}, + remoteServerComponent: newRemoteServerComponent(proxy, &l, server), baseExecNode: baseExecNode{ - server: srv, + server: server, cfgen: cfgen, spec: &spec, sidecarConfig: sidecarConfig, @@ -83,22 +84,17 @@ func NewRemoteExecNodes( } } -func (n *RemoteExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (n *RemoteExecNode) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if n.server.needSync() || n.server.needUpdate() { + server := n.remoteServerComponent.server + if server.configNeedsReload() || server.needBuild() || server.needUpdate() { return n.doSyncBase(ctx, dry) } - if !n.server.arePodsReady(ctx) { + if !server.arePodsReady(ctx) { return WaitingStatus(SyncStatusBlocked, "pods"), err } return SimpleStatus(SyncStatusReady), err } - -func (n *RemoteExecNode) GetType() consts.ComponentType { return consts.ExecNodeType } - -func (n *RemoteExecNode) Sync(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, false) -} diff --git a/pkg/components/helpers.go b/pkg/components/helpers.go index 755dc684..26a65d9f 100644 --- a/pkg/components/helpers.go +++ b/pkg/components/helpers.go @@ -20,6 +20,7 @@ import ( ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts" ) func CreateTabletCells(ctx context.Context, ytClient yt.Client, bundle string, tabletCellCount int) error { @@ -139,23 +140,31 @@ func CreateUser(ctx context.Context, ytClient yt.Client, userName, token string, func IsUpdatingComponent(ytsaurus *apiproxy.Ytsaurus, component Component) bool { componentNames := ytsaurus.GetLocalUpdatingComponents() - return (componentNames == nil && component.IsUpdatable()) || slices.Contains(componentNames, component.GetName()) + return (componentNames == nil && component.GetType() != consts.YtsaurusClientType) || + slices.Contains(componentNames, component.GetName()) +} + +func checkComponentDependency(ctx context.Context, dep Component) (*ComponentStatus, error) { + status, err := dep.Sync(ctx, true) + if err != nil || !status.IsRunning() { + return ptr.To(WaitingStatus(SyncStatusBlocked, dep.GetName())), err + } + return nil, nil } func handleUpdatingClusterState( ctx context.Context, - ytsaurus *apiproxy.Ytsaurus, - cmp Component, - cmpBase *localComponent, - server server, + component LocalServerComponent, dry bool, ) (*ComponentStatus, error) { var err error - if IsUpdatingComponent(ytsaurus, cmp) { + labeller := component.getLabeller() + ytsaurus := component.getYtsaurus() + if IsUpdatingComponent(ytsaurus, component) { if ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsRemoval { if !dry { - err = removePods(ctx, server, cmpBase) + err = removePods(ctx, component.getServer(), ytsaurus, labeller) } return ptr.To(WaitingStatus(SyncStatusUpdating, "pods removal")), err } @@ -166,7 +175,7 @@ func handleUpdatingClusterState( } else { return ptr.To(NewComponentStatus(SyncStatusReady, "Not updating component")), err } - return nil, err + return nil, nil } func SetPathAcl(path string, acl []yt.ACE) string { diff --git a/pkg/components/httpproxy.go b/pkg/components/httpproxy.go index 8e9e1dff..0fde6607 100644 --- a/pkg/components/httpproxy.go +++ b/pkg/components/httpproxy.go @@ -32,10 +32,9 @@ func NewHTTPProxy( ytsaurus *apiproxy.Ytsaurus, masterReconciler Component, spec ytv1.HTTPProxiesSpec) *HttpProxy { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.HttpProxyType, ComponentNamePart: spec.Role, } @@ -104,12 +103,6 @@ func NewHTTPProxy( } } -func (hp *HttpProxy) IsUpdatable() bool { - return true -} - -func (hp *HttpProxy) GetType() consts.ComponentType { return consts.HttpProxyType } - func (hp *HttpProxy) Fetch(ctx context.Context) error { return resources.Fetch(ctx, hp.server, @@ -117,7 +110,7 @@ func (hp *HttpProxy) Fetch(ctx context.Context) error { ) } -func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (hp *HttpProxy) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(hp.ytsaurus.GetClusterState()) && hp.server.needUpdate() { @@ -125,20 +118,16 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err } if hp.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, hp.ytsaurus, hp, &hp.localComponent, hp.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, hp, dry); status != nil { return *status, err } } - masterStatus, err := hp.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, hp.master.GetName()), err + if status, err := checkComponentDependency(ctx, hp.master); status != nil { + return *status, err } - if hp.NeedSync() { + if ServerNeedSync(hp.server, hp.ytsaurus) { if !dry { statefulSet := hp.server.buildStatefulSet() if hp.httpsSecret != nil { @@ -165,12 +154,3 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err return SimpleStatus(SyncStatusReady), err } - -func (hp *HttpProxy) Status(ctx context.Context) (ComponentStatus, error) { - return hp.doSync(ctx, true) -} - -func (hp *HttpProxy) Sync(ctx context.Context) error { - _, err := hp.doSync(ctx, false) - return err -} diff --git a/pkg/components/init_job.go b/pkg/components/init_job.go index 39c04735..677a5194 100644 --- a/pkg/components/init_job.go +++ b/pkg/components/init_job.go @@ -36,7 +36,6 @@ func initJobWithNativeDriverPrologue() string { type InitJob struct { baseComponent - apiProxy apiproxy.APIProxy conditionsManager apiproxy.ConditionManager imagePullSecrets []corev1.LocalObjectReference @@ -52,6 +51,8 @@ type InitJob struct { builtJob *batchv1.Job } +var _ Component = &InitJob{} + func NewInitJob( labeller *labeller.Labeller, apiProxy apiproxy.APIProxy, @@ -63,10 +64,7 @@ func NewInitJob( nodeSelector map[string]string, ) *InitJob { return &InitJob{ - baseComponent: baseComponent{ - labeller: labeller, - }, - apiProxy: apiProxy, + baseComponent: newBaseComponent(apiProxy, labeller), conditionsManager: conditionsManager, imagePullSecrets: imagePullSecrets, initCompletedCondition: fmt.Sprintf("%s%sInitJobCompleted", name, labeller.GetFullComponentName()), @@ -111,7 +109,7 @@ func (j *InitJob) Build() *batchv1.Job { var defaultMode int32 = 0o500 job := j.initJob.Build() job.Spec.Template = corev1.PodTemplateSpec{ - ObjectMeta: j.baseComponent.labeller.GetInitJobObjectMeta(), + ObjectMeta: j.labeller.GetInitJobObjectMeta(), Spec: corev1.PodSpec{ ImagePullSecrets: j.imagePullSecrets, Containers: []corev1.Container{ diff --git a/pkg/components/init_job_test.go b/pkg/components/init_job_test.go index 3ec20e55..c322b328 100644 --- a/pkg/components/init_job_test.go +++ b/pkg/components/init_job_test.go @@ -51,7 +51,7 @@ func prepareTest(t *testing.T, namespace string) (*testutil.TestHelper, *apiprox fakeRecorder := record.NewFakeRecorder(100) ytsaurus := apiproxy.NewYtsaurus(&ytsaurusResource, h.GetK8sClient(), fakeRecorder, scheme) - cfgen := ytconfig.NewGenerator(ytsaurus.GetResource(), domain) + cfgen := ytconfig.NewGenerator(ytsaurus.Resource(), domain) return h, ytsaurus, cfgen } @@ -78,7 +78,7 @@ func newTestJob(ytsaurus *apiproxy.Ytsaurus) *InitJob { &labeller.Labeller{ ObjectMeta: &metav1.ObjectMeta{ Name: k8sName, - Namespace: ytsaurus.GetResource().Namespace, + Namespace: ytsaurus.Resource().Namespace, }, ComponentType: consts.MasterType, }, diff --git a/pkg/components/master.go b/pkg/components/master.go index 30d11fee..87f5bb56 100644 --- a/pkg/components/master.go +++ b/pkg/components/master.go @@ -34,10 +34,9 @@ type Master struct { } func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.MasterType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -96,17 +95,11 @@ func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master { } } -func (m *Master) GetType() consts.ComponentType { return consts.MasterType } - -func (m *Master) IsUpdatable() bool { - return true -} - func (m *Master) Fetch(ctx context.Context) error { - if m.ytsaurus.GetResource().Spec.AdminCredentials != nil { - err := m.ytsaurus.APIProxy().FetchObject( + if m.ytsaurus.Resource().Spec.AdminCredentials != nil { + err := m.ytsaurus.FetchObject( ctx, - m.ytsaurus.GetResource().Spec.AdminCredentials.Name, + m.ytsaurus.Spec().AdminCredentials.Name, &m.adminCredentials) if err != nil { return err @@ -151,7 +144,7 @@ type Medium struct { func (m *Master) getExtraMedia() []Medium { mediaMap := make(map[string]Medium) - for _, d := range m.ytsaurus.GetResource().Spec.DataNodes { + for _, d := range m.ytsaurus.Resource().Spec.DataNodes { for _, l := range d.Locations { if l.Medium == consts.DefaultMedium { continue @@ -291,7 +284,7 @@ func (m *Master) createExitReadOnlyScript() string { return strings.Join(script, "\n") } -func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (m *Master) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(m.ytsaurus.GetClusterState()) && m.server.needUpdate() { @@ -303,12 +296,12 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error) st, err := m.exitReadOnly(ctx, dry) return *st, err } - if status, err := handleUpdatingClusterState(ctx, m.ytsaurus, m, &m.localComponent, m.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, m, dry); status != nil { return *status, err } } - if m.NeedSync() { + if ServerNeedSync(m.server, m.ytsaurus) { if !dry { err = m.doServerSync(ctx) } @@ -326,19 +319,10 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error) return m.initJob.Sync(ctx, dry) } -func (m *Master) Status(ctx context.Context) (ComponentStatus, error) { - return m.doSync(ctx, true) -} - -func (m *Master) Sync(ctx context.Context) error { - _, err := m.doSync(ctx, false) - return err -} - func (m *Master) doServerSync(ctx context.Context) error { statefulSet := m.server.buildStatefulSet() podSpec := &statefulSet.Spec.Template.Spec - primaryMastersSpec := m.ytsaurus.GetResource().Spec.PrimaryMasters + primaryMastersSpec := m.ytsaurus.Resource().Spec.PrimaryMasters if err := AddSidecarsToPodSpec(primaryMastersSpec.Sidecars, podSpec); err != nil { return err @@ -351,7 +335,7 @@ func (m *Master) doServerSync(ctx context.Context) error { } func (m *Master) getHostAddressLabel() string { - primaryMastersSpec := m.ytsaurus.GetResource().Spec.PrimaryMasters + primaryMastersSpec := m.ytsaurus.Resource().Spec.PrimaryMasters if primaryMastersSpec.HostAddressLabel != "" { return primaryMastersSpec.HostAddressLabel } diff --git a/pkg/components/master_caches.go b/pkg/components/master_caches.go index 48101b25..154c2d7f 100644 --- a/pkg/components/master_caches.go +++ b/pkg/components/master_caches.go @@ -21,10 +21,9 @@ type MasterCache struct { } func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *MasterCache { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.MasterCacheType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -55,17 +54,11 @@ func NewMasterCache(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Mas } } -func (mc *MasterCache) IsUpdatable() bool { - return true -} - -func (mc *MasterCache) GetType() consts.ComponentType { return consts.MasterCacheType } - func (mc *MasterCache) Fetch(ctx context.Context) error { return resources.Fetch(ctx, mc.server) } -func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (mc *MasterCache) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(mc.ytsaurus.GetClusterState()) && mc.server.needUpdate() { @@ -73,12 +66,12 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e } if mc.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, mc.ytsaurus, mc, &mc.localComponent, mc.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, mc, dry); status != nil { return *status, err } } - if mc.NeedSync() { + if ServerNeedSync(mc.server, mc.ytsaurus) { if !dry { err = mc.doServerSync(ctx) } @@ -92,18 +85,9 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e return SimpleStatus(SyncStatusReady), err } -func (mc *MasterCache) Status(ctx context.Context) (ComponentStatus, error) { - return mc.doSync(ctx, true) -} - -func (mc *MasterCache) Sync(ctx context.Context) error { - _, err := mc.doSync(ctx, false) - return err -} - func (mc *MasterCache) doServerSync(ctx context.Context) error { statefulSet := mc.server.buildStatefulSet() - masterCachesSpec := mc.ytsaurus.GetResource().Spec.MasterCaches + masterCachesSpec := mc.ytsaurus.Resource().Spec.MasterCaches if len(masterCachesSpec.HostAddresses) != 0 { AddAffinity(statefulSet, mc.getHostAddressLabel(), masterCachesSpec.HostAddresses) } @@ -112,7 +96,7 @@ func (mc *MasterCache) doServerSync(ctx context.Context) error { } func (mc *MasterCache) getHostAddressLabel() string { - masterCachesSpec := mc.ytsaurus.GetResource().Spec.MasterCaches + masterCachesSpec := mc.ytsaurus.Resource().Spec.MasterCaches if masterCachesSpec.HostAddressLabel != "" { return masterCachesSpec.HostAddressLabel } diff --git a/pkg/components/microservice.go b/pkg/components/microservice.go index 45ac6c34..f9989513 100644 --- a/pkg/components/microservice.go +++ b/pkg/components/microservice.go @@ -79,7 +79,7 @@ func newMicroservice( labeller, ytsaurus.APIProxy(), labeller.GetMainConfigMapName(), - ytsaurus.GetResource().Spec.ConfigOverrides, + ytsaurus.Resource().Spec.ConfigOverrides, generators), } } diff --git a/pkg/components/pods_manager.go b/pkg/components/pods_manager.go index b1be835d..c0381919 100644 --- a/pkg/components/pods_manager.go +++ b/pkg/components/pods_manager.go @@ -5,6 +5,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/labeller" ) @@ -16,13 +17,18 @@ type podsManager interface { podsImageCorrespondsToSpec() bool } -func removePods(ctx context.Context, manager podsManager, c *localComponent) error { - if !isPodsRemovingStarted(c) { +func removePods(ctx context.Context, manager podsManager, conditionsManager apiproxy.UpdateConditionManager, labeller *labeller.Labeller) error { + started := labeller.GetPodsRemovingStartedCondition() + if !conditionsManager.IsUpdateStatusConditionTrue(started) { if err := manager.removePods(ctx); err != nil { return err } - - setPodsRemovingStartedCondition(ctx, c) + conditionsManager.SetUpdateStatusCondition(ctx, metav1.Condition{ + Type: started, + Status: metav1.ConditionTrue, + Reason: "Update", + Message: "Pods removing was started", + }) return nil } @@ -30,28 +36,11 @@ func removePods(ctx context.Context, manager podsManager, c *localComponent) err return nil } - setPodsRemovedCondition(ctx, c) - return nil -} - -func isPodsRemovingStarted(c *localComponent) bool { - return c.ytsaurus.IsUpdateStatusConditionTrue(c.labeller.GetPodsRemovingStartedCondition()) -} - -func setPodsRemovingStartedCondition(ctx context.Context, c *localComponent) { - c.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ - Type: c.labeller.GetPodsRemovingStartedCondition(), - Status: metav1.ConditionTrue, - Reason: "Update", - Message: "Pods removing was started", - }) -} - -func setPodsRemovedCondition(ctx context.Context, c *localComponent) { - c.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ - Type: labeller.GetPodsRemovedCondition(c.GetName()), + conditionsManager.SetUpdateStatusCondition(ctx, metav1.Condition{ + Type: labeller.GetPodsRemovedCondition(), Status: metav1.ConditionTrue, Reason: "Update", Message: "Pods removed", }) + return nil } diff --git a/pkg/components/query_tracker.go b/pkg/components/query_tracker.go index dde8f959..79c7e1d7 100644 --- a/pkg/components/query_tracker.go +++ b/pkg/components/query_tracker.go @@ -38,10 +38,9 @@ func NewQueryTracker( yc internalYtsaurusClient, tabletNodes []Component, ) *QueryTracker { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.QueryTrackerType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -91,12 +90,6 @@ func NewQueryTracker( } } -func (qt *QueryTracker) IsUpdatable() bool { - return true -} - -func (qt *QueryTracker) GetType() consts.ComponentType { return consts.QueryTrackerType } - func (qt *QueryTracker) Fetch(ctx context.Context) error { return resources.Fetch(ctx, qt.server, @@ -105,7 +98,7 @@ func (qt *QueryTracker) Fetch(ctx context.Context) error { ) } -func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (qt *QueryTracker) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(qt.ytsaurus.GetClusterState()) && qt.server.needUpdate() { @@ -116,7 +109,7 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, if IsUpdatingComponent(qt.ytsaurus, qt) { if qt.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsRemoval && IsUpdatingComponent(qt.ytsaurus, qt) { if !dry { - err = removePods(ctx, qt.server, &qt.localComponent) + err = removePods(ctx, qt.server, qt.ytsaurus, qt.labeller) } return WaitingStatus(SyncStatusUpdating, "pods removal"), err } @@ -144,7 +137,7 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, return WaitingStatus(SyncStatusPending, qt.secret.Name()), err } - if qt.NeedSync() { + if ServerNeedSync(qt.server, qt.ytsaurus) { if !dry { err = qt.server.Sync(ctx) } @@ -162,12 +155,8 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, } for _, tnd := range qt.tabletNodes { - tndStatus, err := tnd.Status(ctx) - if err != nil { - return tndStatus, err - } - if !IsRunningStatus(tndStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, "tablet nodes"), err + if status, err := checkComponentDependency(ctx, tnd); status != nil { + return *status, err } } @@ -407,15 +396,6 @@ func (qt *QueryTracker) init(ctx context.Context, ytClient yt.Client) (err error return } -func (qt *QueryTracker) Status(ctx context.Context) (ComponentStatus, error) { - return qt.doSync(ctx, true) -} - -func (qt *QueryTracker) Sync(ctx context.Context) error { - _, err := qt.doSync(ctx, false) - return err -} - func (qt *QueryTracker) prepareInitQueryTrackerState() { path := "/usr/bin/init_query_tracker_state" diff --git a/pkg/components/queue_agent.go b/pkg/components/queue_agent.go index 00cbdfd0..aa179978 100644 --- a/pkg/components/queue_agent.go +++ b/pkg/components/queue_agent.go @@ -40,10 +40,9 @@ func NewQueueAgent( master Component, tabletNodes []Component, ) *QueueAgent { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.QueueAgentType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -94,10 +93,6 @@ func NewQueueAgent( } } -func (qa *QueueAgent) IsUpdatable() bool { - return true -} - func (qa *QueueAgent) Fetch(ctx context.Context) error { return resources.Fetch(ctx, qa.server, @@ -106,9 +101,7 @@ func (qa *QueueAgent) Fetch(ctx context.Context) error { ) } -func (qa *QueueAgent) GetType() consts.ComponentType { return consts.QueueAgentType } - -func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (qa *QueueAgent) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(qa.ytsaurus.GetClusterState()) && qa.server.needUpdate() { @@ -116,17 +109,13 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er } if qa.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, qa.ytsaurus, qa, &qa.localComponent, qa.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, qa, dry); status != nil { return *status, err } } - masterStatus, err := qa.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, qa.master.GetName()), err + if status, err := checkComponentDependency(ctx, qa.master); status != nil { + return *status, err } // It makes no sense to start queue agents without tablet nodes. @@ -134,12 +123,8 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return WaitingStatus(SyncStatusBlocked, "tablet nodes"), fmt.Errorf("cannot initialize queue agent without tablet nodes") } for _, tnd := range qa.tabletNodes { - tndStatus, err := tnd.Status(ctx) - if err != nil { - return tndStatus, err - } - if !IsRunningStatus(tndStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err + if status, err := checkComponentDependency(ctx, tnd); status != nil { + return *status, err } } @@ -154,7 +139,7 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return WaitingStatus(SyncStatusPending, qa.secret.Name()), err } - if qa.NeedSync() { + if ServerNeedSync(qa.server, qa.ytsaurus) { if !dry { err = qa.server.Sync(ctx) } @@ -168,12 +153,8 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er var ytClient yt.Client if qa.ytsaurus.GetClusterState() != ytv1.ClusterStateUpdating { - ytClientStatus, err := qa.ytsaurusClient.Status(ctx) - if err != nil { - return ytClientStatus, err - } - if ytClientStatus.SyncStatus != SyncStatusReady { - return WaitingStatus(SyncStatusBlocked, qa.ytsaurusClient.GetName()), err + if status, err := checkComponentDependency(ctx, qa.ytsaurusClient); status != nil { + return *status, err } if !dry { @@ -338,12 +319,3 @@ func (qa *QueueAgent) prepareInitQueueAgentState() { container := &job.Spec.Template.Spec.Containers[0] container.EnvFrom = []corev1.EnvFromSource{qa.secret.GetEnvSource()} } - -func (qa *QueueAgent) Status(ctx context.Context) (ComponentStatus, error) { - return qa.doSync(ctx, true) -} - -func (qa *QueueAgent) Sync(ctx context.Context) error { - _, err := qa.doSync(ctx, false) - return err -} diff --git a/pkg/components/rpcproxy.go b/pkg/components/rpcproxy.go index b2b35aaf..3665af88 100644 --- a/pkg/components/rpcproxy.go +++ b/pkg/components/rpcproxy.go @@ -31,10 +31,9 @@ func NewRPCProxy( ytsaurus *apiproxy.Ytsaurus, masterReconciler Component, spec ytv1.RPCProxiesSpec) *RpcProxy { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.RpcProxyType, ComponentNamePart: spec.Role, } @@ -89,12 +88,6 @@ func NewRPCProxy( } } -func (rp *RpcProxy) IsUpdatable() bool { - return true -} - -func (rp *RpcProxy) GetType() consts.ComponentType { return consts.RpcProxyType } - func (rp *RpcProxy) Fetch(ctx context.Context) error { fetchable := []resources.Fetchable{ rp.server, @@ -105,7 +98,7 @@ func (rp *RpcProxy) Fetch(ctx context.Context) error { return resources.Fetch(ctx, fetchable...) } -func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (rp *RpcProxy) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(rp.ytsaurus.GetClusterState()) && rp.server.needUpdate() { @@ -113,20 +106,16 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } if rp.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, rp.ytsaurus, rp, &rp.localComponent, rp.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, rp, dry); status != nil { return *status, err } } - masterStatus, err := rp.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, rp.master.GetName()), err + if status, err := checkComponentDependency(ctx, rp.master); status != nil { + return *status, err } - if rp.NeedSync() { + if ServerNeedSync(rp.server, rp.ytsaurus) { if !dry { statefulSet := rp.server.buildStatefulSet() if secret := rp.tlsSecret; secret != nil { @@ -153,12 +142,3 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } - -func (rp *RpcProxy) Status(ctx context.Context) (ComponentStatus, error) { - return rp.doSync(ctx, true) -} - -func (rp *RpcProxy) Sync(ctx context.Context) error { - _, err := rp.doSync(ctx, false) - return err -} diff --git a/pkg/components/scheduler.go b/pkg/components/scheduler.go index 0296ad66..f10211bd 100644 --- a/pkg/components/scheduler.go +++ b/pkg/components/scheduler.go @@ -36,10 +36,9 @@ func NewScheduler( ytsaurus *apiproxy.Ytsaurus, master Component, execNodes, tabletNodes []Component) *Scheduler { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.SchedulerType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -103,12 +102,6 @@ func NewScheduler( } } -func (s *Scheduler) IsUpdatable() bool { - return true -} - -func (s *Scheduler) GetType() consts.ComponentType { return consts.SchedulerType } - func (s *Scheduler) Fetch(ctx context.Context) error { return resources.Fetch(ctx, s.server, @@ -118,16 +111,7 @@ func (s *Scheduler) Fetch(ctx context.Context) error { ) } -func (s *Scheduler) Status(ctx context.Context) (ComponentStatus, error) { - return s.doSync(ctx, true) -} - -func (s *Scheduler) Sync(ctx context.Context) error { - _, err := s.doSync(ctx, false) - return err -} - -func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (s *Scheduler) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(s.ytsaurus.GetClusterState()) && s.server.needUpdate() { @@ -138,7 +122,7 @@ func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, erro if IsUpdatingComponent(s.ytsaurus, s) { if s.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsRemoval { if !dry { - err = removePods(ctx, s.server, &s.localComponent) + err = removePods(ctx, s.server, s.ytsaurus, s.labeller) } return WaitingStatus(SyncStatusUpdating, "pods removal"), err } @@ -156,22 +140,13 @@ func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } } - masterStatus, err := s.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, s.master.GetName()), err + if status, err := checkComponentDependency(ctx, s.master); status != nil { + return *status, err } for _, end := range s.execNodes { - endStatus, err := end.Status(ctx) - if err != nil { - return endStatus, err - } - if !IsRunningStatus(endStatus.SyncStatus) { - // It makes no sense to start scheduler without exec nodes. - return WaitingStatus(SyncStatusBlocked, end.GetName()), err + if status, err := checkComponentDependency(ctx, end); status != nil { + return *status, err } } @@ -186,7 +161,7 @@ func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return WaitingStatus(SyncStatusPending, s.secret.Name()), err } - if s.NeedSync() { + if ServerNeedSync(s.server, s.ytsaurus) { if !dry { err = s.server.Sync(ctx) } @@ -216,13 +191,8 @@ func (s *Scheduler) initOpAchieve(ctx context.Context, dry bool) (ComponentStatu } for _, tnd := range s.tabletNodes { - tndStatus, err := tnd.Status(ctx) - if err != nil { - return tndStatus, err - } - if !IsRunningStatus(tndStatus.SyncStatus) { - // Wait for tablet nodes to proceed with operations archive init. - return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err + if status, err := checkComponentDependency(ctx, tnd); status != nil { + return *status, err } } diff --git a/pkg/components/server.go b/pkg/components/server.go index 4b789f10..2535f194 100644 --- a/pkg/components/server.go +++ b/pkg/components/server.go @@ -32,7 +32,6 @@ type server interface { needUpdate() bool configNeedsReload() bool needBuild() bool - needSync() bool buildStatefulSet() *appsv1.StatefulSet rebuildStatefulSet() *appsv1.StatefulSet } @@ -204,10 +203,6 @@ func (s *serverImpl) needBuild() bool { s.statefulSet.NeedSync(s.instanceSpec.InstanceCount) } -func (s *serverImpl) needSync() bool { - return s.configNeedsReload() || s.needBuild() -} - func (s *serverImpl) Sync(ctx context.Context) error { _ = s.configHelper.Build() _ = s.headlessService.Build() diff --git a/pkg/components/spyt.go b/pkg/components/spyt.go index 8aa8a56b..50b578a8 100644 --- a/pkg/components/spyt.go +++ b/pkg/components/spyt.go @@ -32,10 +32,9 @@ func NewSpyt( spyt *apiproxy.Spyt, ytsaurus *ytv1.Ytsaurus) *Spyt { l := labeller.Labeller{ - ObjectMeta: &spyt.GetResource().ObjectMeta, - APIProxy: spyt.APIProxy(), + ObjectMeta: &spyt.Resource().ObjectMeta, ComponentType: consts.SpytType, - ComponentNamePart: spyt.GetResource().Name, + ComponentNamePart: spyt.Resource().Name, Annotations: ytsaurus.Spec.ExtraPodAnnotations, } @@ -63,7 +62,7 @@ func NewSpyt( ytsaurus.Spec.ImagePullSecrets, "spyt-environment", consts.ClientConfigFileName, - spyt.GetResource().Spec.Image, + spyt.Resource().Spec.Image, cfgen.GetNativeClientConfig, ytsaurus.Spec.Tolerations, ytsaurus.Spec.NodeSelector, @@ -94,14 +93,14 @@ func (s *Spyt) createInitScript() string { return strings.Join(script, "\n") } -func (s *Spyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (s *Spyt) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if s.ytsaurus.Status.State != ytv1.ClusterStateRunning { return WaitingStatus(SyncStatusBlocked, s.ytsaurus.GetName()), err } - if s.spyt.GetResource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished { + if s.spyt.Resource().Status.ReleaseStatus == ytv1.SpytReleaseStatusFinished { return SimpleStatus(SyncStatusReady), err } @@ -114,7 +113,7 @@ func (s *Spyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { } err = s.secret.Sync(ctx) } - s.spyt.GetResource().Status.ReleaseStatus = ytv1.SpytReleaseStatusCreatingUserSecret + s.spyt.Resource().Status.ReleaseStatus = ytv1.SpytReleaseStatusCreatingUserSecret return WaitingStatus(SyncStatusPending, s.secret.Name()), err } @@ -123,7 +122,7 @@ func (s *Spyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { } status, err := s.initUser.Sync(ctx, dry) if status.SyncStatus != SyncStatusReady { - s.spyt.GetResource().Status.ReleaseStatus = ytv1.SpytReleaseStatusCreatingUser + s.spyt.Resource().Status.ReleaseStatus = ytv1.SpytReleaseStatusCreatingUser return status, err } @@ -150,11 +149,11 @@ func (s *Spyt) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { status, err = s.initEnvironment.Sync(ctx, dry) if status.SyncStatus != SyncStatusReady { - s.spyt.GetResource().Status.ReleaseStatus = ytv1.SpytReleaseStatusUploadingIntoCypress + s.spyt.Resource().Status.ReleaseStatus = ytv1.SpytReleaseStatusUploadingIntoCypress return status, err } - s.spyt.GetResource().Status.ReleaseStatus = ytv1.SpytReleaseStatusFinished + s.spyt.Resource().Status.ReleaseStatus = ytv1.SpytReleaseStatusFinished return SimpleStatus(SyncStatusReady), nil } @@ -166,17 +165,3 @@ func (s *Spyt) Fetch(ctx context.Context) error { s.secret, ) } - -func (s *Spyt) Status(ctx context.Context) ComponentStatus { - status, err := s.doSync(ctx, true) - if err != nil { - panic(err) - } - - return status -} - -func (s *Spyt) Sync(ctx context.Context) error { - _, err := s.doSync(ctx, false) - return err -} diff --git a/pkg/components/strawberry_controller.go b/pkg/components/strawberry_controller.go index 72391509..7d1b14be 100644 --- a/pkg/components/strawberry_controller.go +++ b/pkg/components/strawberry_controller.go @@ -41,7 +41,7 @@ func NewStrawberryController( master Component, scheduler Component, dataNodes []Component) *StrawberryController { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() // TODO: strawberry has a different image and can't be nil/fallback on CoreImage. image := resource.Spec.CoreImage @@ -51,7 +51,6 @@ func NewStrawberryController( l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.StrawberryControllerType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -81,7 +80,7 @@ func NewStrawberryController( &l, ytsaurus.APIProxy(), ytsaurus, - ytsaurus.GetResource().Spec.ImagePullSecrets, + ytsaurus.Resource().Spec.ImagePullSecrets, "user", consts.ClientConfigFileName, resource.Spec.CoreImage, @@ -112,12 +111,6 @@ func NewStrawberryController( } } -func (c *StrawberryController) IsUpdatable() bool { - return true -} - -func (c *StrawberryController) GetType() consts.ComponentType { return consts.StrawberryControllerType } - func (c *StrawberryController) Fetch(ctx context.Context) error { return resources.Fetch(ctx, c.microservice, @@ -220,7 +213,7 @@ func (c *StrawberryController) syncComponents(ctx context.Context) (err error) { return c.microservice.Sync(ctx) } -func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (c *StrawberryController) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(c.ytsaurus.GetClusterState()) && c.microservice.needUpdate() { @@ -231,7 +224,7 @@ func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentS if IsUpdatingComponent(c.ytsaurus, c) { if c.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsRemoval { if !dry { - err = removePods(ctx, c.microservice, &c.localComponent) + err = removePods(ctx, c.microservice, c.ytsaurus, c.labeller) } return WaitingStatus(SyncStatusUpdating, "pods removal"), err } @@ -244,29 +237,17 @@ func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentS } } - masterStatus, err := c.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, c.master.GetName()), err + if status, err := checkComponentDependency(ctx, c.master); status != nil { + return *status, err } - schStatus, err := c.scheduler.Status(ctx) - if err != nil { - return schStatus, err - } - if !IsRunningStatus(schStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, c.scheduler.GetName()), err + if status, err := checkComponentDependency(ctx, c.scheduler); status != nil { + return *status, err } for _, dataNode := range c.dataNodes { - dndStatus, err := dataNode.Status(ctx) - if err != nil { - return dndStatus, err - } - if !IsRunningStatus(dndStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, dataNode.GetName()), err + if status, err := checkComponentDependency(ctx, dataNode); status != nil { + return *status, err } } @@ -306,12 +287,3 @@ func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentS return SimpleStatus(SyncStatusReady), err } - -func (c *StrawberryController) Status(ctx context.Context) (ComponentStatus, error) { - return c.doSync(ctx, true) -} - -func (c *StrawberryController) Sync(ctx context.Context) error { - _, err := c.doSync(ctx, false) - return err -} diff --git a/pkg/components/suite_test.go b/pkg/components/suite_test.go index c0e6bd6b..9ae17295 100644 --- a/pkg/components/suite_test.go +++ b/pkg/components/suite_test.go @@ -13,7 +13,9 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts" + "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/labeller" mock_yt "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/mock" ) @@ -37,6 +39,8 @@ type FakeComponent struct { status ComponentStatus } +var _ Component = &FakeComponent{} + func NewFakeComponent(name string, compType consts.ComponentType) *FakeComponent { return &FakeComponent{ name: name, @@ -45,19 +49,19 @@ func NewFakeComponent(name string, compType consts.ComponentType) *FakeComponent } } -func (fc *FakeComponent) IsUpdatable() bool { - return false +func (fc *FakeComponent) getAPIProxy() apiproxy.APIProxy { + return nil } -func (fc *FakeComponent) Fetch(ctx context.Context) error { +func (fyc *FakeComponent) getLabeller() *labeller.Labeller { return nil } -func (fc *FakeComponent) Sync(ctx context.Context) error { +func (fc *FakeComponent) Fetch(ctx context.Context) error { return nil } -func (fc *FakeComponent) Status(ctx context.Context) (ComponentStatus, error) { +func (fc *FakeComponent) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { return fc.status, nil } @@ -103,10 +107,6 @@ func (fs *FakeServer) needBuild() bool { return false } -func (fs *FakeServer) needSync() bool { - return false -} - func (fs *FakeServer) arePodsRemoved(ctx context.Context) bool { return true } @@ -140,6 +140,8 @@ type FakeYtsaurusClient struct { client *mock_yt.MockClient } +var _ internalYtsaurusClient = &FakeYtsaurusClient{} + func NewFakeYtsaurusClient(client *mock_yt.MockClient) *FakeYtsaurusClient { return &FakeYtsaurusClient{ FakeComponent: *NewFakeComponent("ytsaurus_client", consts.YtsaurusClientType), @@ -154,7 +156,3 @@ func (fyc *FakeYtsaurusClient) GetYtClient() yt.Client { func (fyc *FakeYtsaurusClient) SetStatus(status ComponentStatus) { fyc.status = status } - -func (fyc *FakeYtsaurusClient) IsUpdatable() bool { - return false -} diff --git a/pkg/components/tablet_node.go b/pkg/components/tablet_node.go index 945dadd9..2ae9d7b6 100644 --- a/pkg/components/tablet_node.go +++ b/pkg/components/tablet_node.go @@ -43,10 +43,9 @@ func NewTabletNode( spec ytv1.TabletNodesSpec, doInitiailization bool, ) *TabletNode { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.TabletNodeType, ComponentNamePart: spec.Name, } @@ -83,13 +82,7 @@ func NewTabletNode( } } -func (tn *TabletNode) IsUpdatable() bool { - return true -} - -func (tn *TabletNode) GetType() consts.ComponentType { return consts.TabletNodeType } - -func (tn *TabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (tn *TabletNode) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(tn.ytsaurus.GetClusterState()) && tn.server.needUpdate() { @@ -97,12 +90,12 @@ func (tn *TabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, er } if tn.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, tn.ytsaurus, tn, &tn.localComponent, tn.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, tn, dry); status != nil { return *status, err } } - if tn.NeedSync() { + if ServerNeedSync(tn.server, tn.ytsaurus) { if !dry { err = tn.server.Sync(ctx) } @@ -118,12 +111,8 @@ func (tn *TabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, er return SimpleStatus(SyncStatusReady), err } - ytClientStatus, err := tn.ytsaurusClient.Status(ctx) - if err != nil { - return ytClientStatus, err - } - if ytClientStatus.SyncStatus != SyncStatusReady { - return WaitingStatus(SyncStatusBlocked, tn.ytsaurusClient.GetName()), err + if status, err := checkComponentDependency(ctx, tn.ytsaurusClient); status != nil { + return *status, err } if !dry && tn.doInitialization { @@ -137,7 +126,7 @@ func (tn *TabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, er } func (tn *TabletNode) getBundleBootstrap(bundle string) *ytv1.BundleBootstrapSpec { - resource := tn.ytsaurus.GetResource() + resource := tn.ytsaurus.Resource() if resource.Spec.Bootstrap == nil || resource.Spec.Bootstrap.TabletCellBundles == nil { return nil } @@ -247,15 +236,6 @@ func (tn *TabletNode) initBundles(ctx context.Context) (ComponentStatus, error) return SimpleStatus(SyncStatusReady), nil } -func (tn *TabletNode) Status(ctx context.Context) (ComponentStatus, error) { - return tn.doSync(ctx, true) -} - -func (tn *TabletNode) Sync(ctx context.Context) error { - _, err := tn.doSync(ctx, false) - return err -} - func (tn *TabletNode) Fetch(ctx context.Context) error { return resources.Fetch(ctx, tn.server) } diff --git a/pkg/components/tablet_node_remote.go b/pkg/components/tablet_node_remote.go index 422bed0c..48fd06f7 100644 --- a/pkg/components/tablet_node_remote.go +++ b/pkg/components/tablet_node_remote.go @@ -15,12 +15,13 @@ import ( ) type RemoteTabletNode struct { - server server - cfgen *ytconfig.NodeGenerator - spec *ytv1.TabletNodesSpec - baseComponent + remoteServerComponent + cfgen *ytconfig.NodeGenerator + spec *ytv1.TabletNodesSpec } +var _ RemoteServerComponent = &RemoteTabletNode{} + func NewRemoteTabletNodes( cfgen *ytconfig.NodeGenerator, nodes *ytv1.RemoteTabletNodes, @@ -30,7 +31,6 @@ func NewRemoteTabletNodes( ) *RemoteTabletNode { l := labeller.Labeller{ ObjectMeta: &nodes.ObjectMeta, - APIProxy: proxy, ComponentType: consts.TabletNodeType, ComponentNamePart: spec.Name, } @@ -39,7 +39,7 @@ func NewRemoteTabletNodes( spec.InstanceSpec.MonitoringPort = ptr.To(int32(consts.TabletNodeMonitoringPort)) } - srv := newServerConfigured( + server := newServerConfigured( &l, proxy, commonSpec, @@ -58,17 +58,16 @@ func NewRemoteTabletNodes( }), ) return &RemoteTabletNode{ - baseComponent: baseComponent{labeller: &l}, - server: srv, - cfgen: cfgen, - spec: &spec, + remoteServerComponent: newRemoteServerComponent(proxy, &l, server), + cfgen: cfgen, + spec: &spec, } } -func (n *RemoteTabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (n *RemoteTabletNode) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if n.server.needSync() || n.server.needUpdate() { + if n.server.configNeedsReload() || n.server.needBuild() || n.server.needUpdate() { if !dry { err = n.server.Sync(ctx) } @@ -82,12 +81,6 @@ func (n *RemoteTabletNode) doSync(ctx context.Context, dry bool) (ComponentStatu return SimpleStatus(SyncStatusReady), err } -func (n *RemoteTabletNode) GetType() consts.ComponentType { return consts.TabletNodeType } - -func (n *RemoteTabletNode) Sync(ctx context.Context) (ComponentStatus, error) { - return n.doSync(ctx, false) -} - func (n *RemoteTabletNode) Fetch(ctx context.Context) error { return resources.Fetch(ctx, n.server) } diff --git a/pkg/components/tablet_node_test.go b/pkg/components/tablet_node_test.go index 3169b681..43dcf8fd 100644 --- a/pkg/components/tablet_node_test.go +++ b/pkg/components/tablet_node_test.go @@ -173,13 +173,13 @@ var _ = Describe("Tablet node test", func() { tabletNode := NewTabletNode(cfgen, ytsaurus, ytsaurusClient, ytsaurusSpec.Spec.TabletNodes[0], true) tabletNode.server = NewFakeServer() - status, err := tabletNode.Status(context.Background()) + status, err := tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusBlocked)) ytsaurusClient.SetStatus(SimpleStatus(SyncStatusReady)) - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) }) @@ -194,13 +194,13 @@ var _ = Describe("Tablet node test", func() { fakeServer.podsReady = false tabletNode.server = fakeServer - status, err := tabletNode.Status(context.Background()) + status, err := tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusBlocked)) fakeServer.podsReady = true - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) }) @@ -228,9 +228,9 @@ var _ = Describe("Tablet node test", func() { gomock.Nil()). Return(false, existsNetError), ) - err := tabletNode.Sync(context.Background()) + _, err := tabletNode.Sync(context.Background(), false) Expect(err).Should(Equal(existsNetError)) - status, err := tabletNode.Status(context.Background()) + status, err := tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) @@ -249,9 +249,9 @@ var _ = Describe("Tablet node test", func() { gomock.Any()). Return(yt.NodeID(guid.New()), createBundleNetError), ) - err = tabletNode.Sync(context.Background()) + _, err = tabletNode.Sync(context.Background(), false) Expect(err).Should(Equal(createBundleNetError)) - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) @@ -291,9 +291,9 @@ var _ = Describe("Tablet node test", func() { nil). Return(getNetError), ) - err = tabletNode.Sync(context.Background()) + _, err = tabletNode.Sync(context.Background(), false) Expect(err).Should(Equal(getNetError)) - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) @@ -334,9 +334,9 @@ var _ = Describe("Tablet node test", func() { gomock.Any()). Return(yt.NodeID(guid.New()), createCellNetError), ) - err = tabletNode.Sync(context.Background()) + _, err = tabletNode.Sync(context.Background(), false) Expect(err).Should(Equal(createCellNetError)) - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) gomock.InOrder() @@ -385,9 +385,9 @@ var _ = Describe("Tablet node test", func() { nil). Return(getNetError).Times(1), ) - err = tabletNode.Sync(context.Background()) + _, err = tabletNode.Sync(context.Background(), false) Expect(err).Should(Equal(getNetError)) - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusPending)) @@ -436,9 +436,9 @@ var _ = Describe("Tablet node test", func() { gomock.Any()). Return(yt.NodeID(guid.New()), nil).Times(1), ) - err = tabletNode.Sync(context.Background()) + _, err = tabletNode.Sync(context.Background(), false) Expect(err).Should(Succeed()) - status, err = tabletNode.Status(context.Background()) + status, err = tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusReady)) }) @@ -512,10 +512,10 @@ var _ = Describe("Tablet node test", func() { nodeCfgen := ytconfig.NewLocalNodeGenerator(ytsaurusSpec, "cluster_domain") tabletNode := NewTabletNode(nodeCfgen, ytsaurus, ytsaurusClient, ytsaurusSpec.Spec.TabletNodes[0], true) tabletNode.server = NewFakeServer() - err := tabletNode.Sync(context.Background()) + _, err := tabletNode.Sync(context.Background(), false) Expect(err).Should(Succeed()) - status, err := tabletNode.Status(context.Background()) + status, err := tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusReady)) }) @@ -528,10 +528,10 @@ var _ = Describe("Tablet node test", func() { cfgen := ytconfig.NewLocalNodeGenerator(ytsaurusSpec, "cluster_domain") tabletNode := NewTabletNode(cfgen, ytsaurus, ytsaurusClient, ytsaurusSpec.Spec.TabletNodes[0], false) tabletNode.server = NewFakeServer() - err := tabletNode.Sync(context.Background()) + _, err := tabletNode.Sync(context.Background(), false) Expect(err).Should(Succeed()) - status, err := tabletNode.Status(context.Background()) + status, err := tabletNode.Sync(context.Background(), true) Expect(err).Should(Succeed()) Expect(status.SyncStatus).Should(Equal(SyncStatusReady)) }) diff --git a/pkg/components/tcpproxy.go b/pkg/components/tcpproxy.go index 32640a92..17c6c396 100644 --- a/pkg/components/tcpproxy.go +++ b/pkg/components/tcpproxy.go @@ -30,10 +30,9 @@ func NewTCPProxy( ytsaurus *apiproxy.Ytsaurus, masterReconciler Component, spec ytv1.TCPProxiesSpec) *TcpProxy { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.TcpProxyType, ComponentNamePart: spec.Role, } @@ -75,12 +74,6 @@ func NewTCPProxy( } } -func (tp *TcpProxy) IsUpdatable() bool { - return true -} - -func (tp *TcpProxy) GetType() consts.ComponentType { return consts.TcpProxyType } - func (tp *TcpProxy) Fetch(ctx context.Context) error { fetchable := []resources.Fetchable{ tp.server, @@ -91,7 +84,7 @@ func (tp *TcpProxy) Fetch(ctx context.Context) error { return resources.Fetch(ctx, fetchable...) } -func (tp *TcpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (tp *TcpProxy) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(tp.ytsaurus.GetClusterState()) && tp.server.needUpdate() { @@ -99,20 +92,16 @@ func (tp *TcpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } if tp.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, tp.ytsaurus, tp, &tp.localComponent, tp.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, tp, dry); status != nil { return *status, err } } - tpStatus, err := tp.master.Status(ctx) - if err != nil { - return tpStatus, err - } - if !IsRunningStatus(tpStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, tp.master.GetName()), err + if status, err := checkComponentDependency(ctx, tp.master); status != nil { + return *status, err } - if tp.NeedSync() { + if ServerNeedSync(tp.server, tp.ytsaurus) { if !dry { err = tp.server.Sync(ctx) } @@ -133,12 +122,3 @@ func (tp *TcpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } - -func (tp *TcpProxy) Status(ctx context.Context) (ComponentStatus, error) { - return tp.doSync(ctx, true) -} - -func (tp *TcpProxy) Sync(ctx context.Context) error { - _, err := tp.doSync(ctx, false) - return err -} diff --git a/pkg/components/ui.go b/pkg/components/ui.go index 65b866f1..1a7f38db 100644 --- a/pkg/components/ui.go +++ b/pkg/components/ui.go @@ -28,14 +28,15 @@ type UI struct { caBundle *resources.CABundle } +var _ LocalComponent = &UI{} + const UIClustersConfigFileName = "clusters-config.json" const UICustomConfigFileName = "common.js" func NewUI(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *UI { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.UIType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -97,12 +98,6 @@ func NewUI(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Compon } } -func (u *UI) IsUpdatable() bool { - return true -} - -func (u *UI) GetType() consts.ComponentType { return consts.UIType } - func (u *UI) Fetch(ctx context.Context) error { return resources.Fetch(ctx, u.microservice, @@ -127,7 +122,7 @@ func (u *UI) createInitScript() string { } func (u *UI) syncComponents(ctx context.Context) (err error) { - ytsaurusResource := u.ytsaurus.GetResource() + ytsaurusResource := u.ytsaurus.Resource() service := u.microservice.buildService() service.Spec.Type = ytsaurusResource.Spec.UI.ServiceType @@ -269,7 +264,7 @@ func (u *UI) syncComponents(ctx context.Context) (err error) { return u.microservice.Sync(ctx) } -func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (u *UI) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if u.ytsaurus.GetClusterState() == ytv1.ClusterStateRunning && u.microservice.needUpdate() { @@ -280,7 +275,7 @@ func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { if IsUpdatingComponent(u.ytsaurus, u) { if u.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsRemoval { if !dry { - err = removePods(ctx, u.microservice, &u.localComponent) + err = removePods(ctx, u.microservice, u.ytsaurus, u.labeller) } return WaitingStatus(SyncStatusUpdating, "pods removal"), err } @@ -293,12 +288,8 @@ func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { } } - masterStatus, err := u.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, u.master.GetName()), err + if status, err := checkComponentDependency(ctx, u.master); status != nil { + return *status, err } if u.secret.NeedSync(consts.TokenSecretKey, "") { @@ -335,12 +326,3 @@ func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { return SimpleStatus(SyncStatusReady), err } - -func (u *UI) Status(ctx context.Context) (ComponentStatus, error) { - return u.doSync(ctx, true) -} - -func (u *UI) Sync(ctx context.Context) error { - _, err := u.doSync(ctx, false) - return err -} diff --git a/pkg/components/yql_agent.go b/pkg/components/yql_agent.go index cef94751..d804c739 100644 --- a/pkg/components/yql_agent.go +++ b/pkg/components/yql_agent.go @@ -27,10 +27,9 @@ type YqlAgent struct { } func NewYQLAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *YqlAgent { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.YqlAgentType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -80,16 +79,6 @@ func NewYQLAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master } } -func (yqla *YqlAgent) IsUpdatable() bool { - return true -} - -func (yqla *YqlAgent) GetType() consts.ComponentType { return consts.YqlAgentType } - -func (yqla *YqlAgent) GetName() string { - return yqla.labeller.GetFullComponentName() -} - func (yqla *YqlAgent) Fetch(ctx context.Context) error { return resources.Fetch(ctx, yqla.server, @@ -127,7 +116,7 @@ func (yqla *YqlAgent) createInitScript() string { return strings.Join(script, "\n") } -func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (yqla *YqlAgent) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error if ytv1.IsReadyToUpdateClusterState(yqla.ytsaurus.GetClusterState()) && yqla.server.needUpdate() { @@ -135,17 +124,13 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er } if yqla.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if status, err := handleUpdatingClusterState(ctx, yqla.ytsaurus, yqla, &yqla.localComponent, yqla.server, dry); status != nil { + if status, err := handleUpdatingClusterState(ctx, yqla, dry); status != nil { return *status, err } } - masterStatus, err := yqla.master.Status(ctx) - if err != nil { - return masterStatus, err - } - if !IsRunningStatus(masterStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, yqla.master.GetName()), err + if status, err := checkComponentDependency(ctx, yqla.master); status != nil { + return *status, err } if yqla.secret.NeedSync(consts.TokenSecretKey, "") { @@ -159,15 +144,15 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return WaitingStatus(SyncStatusPending, yqla.secret.Name()), err } - if yqla.NeedSync() { + if ServerNeedSync(yqla.server, yqla.ytsaurus) { if !dry { ss := yqla.server.buildStatefulSet() container := &ss.Spec.Template.Spec.Containers[0] container.Command = []string{"sh", "-c", fmt.Sprintf("echo -n $YT_TOKEN > %s; %s", consts.DefaultYqlTokenPath, strings.Join(container.Command, " "))} container.EnvFrom = []corev1.EnvFromSource{yqla.secret.GetEnvSource()} - if yqla.ytsaurus.GetResource().Spec.UseIPv6 && !yqla.ytsaurus.GetResource().Spec.UseIPv4 { + if yqla.ytsaurus.Resource().Spec.UseIPv6 && !yqla.ytsaurus.Resource().Spec.UseIPv4 { container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "1"}} - } else if !yqla.ytsaurus.GetResource().Spec.UseIPv6 && yqla.ytsaurus.GetResource().Spec.UseIPv4 { + } else if !yqla.ytsaurus.Resource().Spec.UseIPv6 && yqla.ytsaurus.Resource().Spec.UseIPv4 { container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "1"}, {Name: "YT_FORCE_IPV6", Value: "0"}} } else { container.Env = []corev1.EnvVar{{Name: "YT_FORCE_IPV4", Value: "0"}, {Name: "YT_FORCE_IPV6", Value: "0"}} @@ -187,12 +172,3 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return yqla.initEnvironment.Sync(ctx, dry) } - -func (yqla *YqlAgent) Status(ctx context.Context) (ComponentStatus, error) { - return yqla.doSync(ctx, true) -} - -func (yqla *YqlAgent) Sync(ctx context.Context) error { - _, err := yqla.doSync(ctx, false) - return err -} diff --git a/pkg/components/ytsaurus_client.go b/pkg/components/ytsaurus_client.go index 2fe5c984..b62375c4 100644 --- a/pkg/components/ytsaurus_client.go +++ b/pkg/components/ytsaurus_client.go @@ -38,15 +38,16 @@ type YtsaurusClient struct { ytClient yt.Client } +var _ LocalComponent = &YtsaurusClient{} + func NewYtsaurusClient( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, httpProxy Component, ) *YtsaurusClient { - resource := ytsaurus.GetResource() + resource := ytsaurus.Resource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, - APIProxy: ytsaurus.APIProxy(), ComponentType: consts.YtsaurusClientType, Annotations: resource.Spec.ExtraPodAnnotations, } @@ -59,7 +60,7 @@ func NewYtsaurusClient( &l, ytsaurus.APIProxy(), ytsaurus, - ytsaurus.GetResource().Spec.ImagePullSecrets, + ytsaurus.Resource().Spec.ImagePullSecrets, "user", consts.ClientConfigFileName, resource.Spec.CoreImage, @@ -74,12 +75,6 @@ func NewYtsaurusClient( } } -func (yc *YtsaurusClient) IsUpdatable() bool { - return false -} - -func (yc *YtsaurusClient) GetType() consts.ComponentType { return consts.YtsaurusClientType } - func (yc *YtsaurusClient) Fetch(ctx context.Context) error { return resources.Fetch(ctx, yc.secret, @@ -135,7 +130,7 @@ func (yc *YtsaurusClient) getAllMasters(ctx context.Context) ([]MasterInfo, erro } var secondaryMasters []MasterInfo - if len(yc.ytsaurus.GetResource().Spec.SecondaryMasters) > 0 { + if len(yc.ytsaurus.Resource().Spec.SecondaryMasters) > 0 { err = yc.ytClient.GetNode(ctx, ypath.Path("//sys/@cluster_connection/secondary_masters"), &secondaryMasters, getReadOnlyGetOptions()) if err != nil { return nil, err @@ -206,7 +201,7 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta return SimpleStatus(SyncStatusUpdating), err } - yc.ytsaurus.GetResource().Status.UpdateStatus.TabletCellBundles = tabletCellBundles + yc.ytsaurus.Resource().Status.UpdateStatus.TabletCellBundles = tabletCellBundles yc.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ Type: consts.ConditionTabletCellsSaved, @@ -260,7 +255,7 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta if err != nil { return SimpleStatus(SyncStatusUpdating), err } - yc.ytsaurus.GetResource().Status.UpdateStatus.MasterMonitoringPaths = monitoringPaths + yc.ytsaurus.Resource().Status.UpdateStatus.MasterMonitoringPaths = monitoringPaths yc.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{ Type: consts.ConditionSnapshotsMonitoringInfoSaved, @@ -271,7 +266,7 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta } if !yc.ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionSnapshotsBuildingStarted) { - if err := yc.StartBuildMasterSnapshots(ctx, yc.ytsaurus.GetResource().Status.UpdateStatus.MasterMonitoringPaths); err != nil { + if err := yc.StartBuildMasterSnapshots(ctx, yc.ytsaurus.Resource().Status.UpdateStatus.MasterMonitoringPaths); err != nil { return SimpleStatus(SyncStatusUpdating), err } @@ -283,7 +278,7 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta }) } - built, err := yc.AreMasterSnapshotsBuilt(ctx, yc.ytsaurus.GetResource().Status.UpdateStatus.MasterMonitoringPaths) + built, err := yc.AreMasterSnapshotsBuilt(ctx, yc.ytsaurus.Resource().Status.UpdateStatus.MasterMonitoringPaths) if err != nil { return SimpleStatus(SyncStatusUpdating), err } @@ -303,7 +298,7 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta case ytv1.UpdateStateWaitingForTabletCellsRecovery: if !yc.ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsRecovered) { - err = yc.RecoverTableCells(ctx, yc.ytsaurus.GetResource().Status.UpdateStatus.TabletCellBundles) + err = yc.RecoverTableCells(ctx, yc.ytsaurus.Resource().Status.UpdateStatus.TabletCellBundles) if err != nil { return SimpleStatus(SyncStatusUpdating), err } @@ -337,14 +332,11 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta return SimpleStatus(SyncStatusUpdating), err } -func (yc *YtsaurusClient) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { +func (yc *YtsaurusClient) Sync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - hpStatus, err := yc.httpProxy.Status(ctx) - if err != nil { - return hpStatus, err - } - if !IsRunningStatus(hpStatus.SyncStatus) { - return WaitingStatus(SyncStatusBlocked, yc.httpProxy.GetName()), err + + if status, err := checkComponentDependency(ctx, yc.httpProxy); status != nil { + return *status, err } if yc.secret.NeedSync(consts.TokenSecretKey, "") { @@ -388,7 +380,7 @@ func (yc *YtsaurusClient) doSync(ctx context.Context, dry bool) (ComponentStatus } if yc.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating { - if yc.ytsaurus.GetResource().Status.UpdateStatus.State == ytv1.UpdateStateImpossibleToStart { + if yc.ytsaurus.Resource().Status.UpdateStatus.State == ytv1.UpdateStateImpossibleToStart { return SimpleStatus(SyncStatusReady), err } if dry { @@ -400,15 +392,6 @@ func (yc *YtsaurusClient) doSync(ctx context.Context, dry bool) (ComponentStatus return SimpleStatus(SyncStatusReady), err } -func (yc *YtsaurusClient) Status(ctx context.Context) (ComponentStatus, error) { - return yc.doSync(ctx, true) -} - -func (yc *YtsaurusClient) Sync(ctx context.Context) error { - _, err := yc.doSync(ctx, false) - return err -} - func (yc *YtsaurusClient) GetYtClient() yt.Client { return yc.ytClient } diff --git a/pkg/labeller/labeller.go b/pkg/labeller/labeller.go index 39bc74ab..4c52f9b2 100644 --- a/pkg/labeller/labeller.go +++ b/pkg/labeller/labeller.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" - "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -16,7 +15,6 @@ type FetchableObject struct { } type Labeller struct { - APIProxy apiproxy.APIProxy ObjectMeta *metav1.ObjectMeta ComponentType consts.ComponentType // An optional name identifying a group of instances of the type above. @@ -77,6 +75,10 @@ func (l *Labeller) GetPodsRemovingStartedCondition() string { return fmt.Sprintf("%sPodsRemovingStarted", l.GetFullComponentName()) } +func (l *Labeller) GetPodsRemovedCondition() string { + return GetPodsRemovedCondition(l.GetFullComponentName()) +} + func (l *Labeller) GetObjectMeta(name string) metav1.ObjectMeta { return metav1.ObjectMeta{ Name: name, diff --git a/pkg/resources/deployment.go b/pkg/resources/deployment.go index 28b1f970..109c99d6 100644 --- a/pkg/resources/deployment.go +++ b/pkg/resources/deployment.go @@ -62,10 +62,10 @@ func (d *Deployment) Build() *appsv1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: d.labeller.GetMetaLabelMap(false), - Annotations: d.ytsaurus.GetResource().Spec.ExtraPodAnnotations, + Annotations: d.ytsaurus.Resource().Spec.ExtraPodAnnotations, }, Spec: corev1.PodSpec{ - ImagePullSecrets: d.ytsaurus.GetResource().Spec.ImagePullSecrets, + ImagePullSecrets: d.ytsaurus.Resource().Spec.ImagePullSecrets, Tolerations: d.tolerations, NodeSelector: d.nodeSelector, }, diff --git a/pkg/resources/job.go b/pkg/resources/job.go index 5ebce1ed..80c9e37b 100644 --- a/pkg/resources/job.go +++ b/pkg/resources/job.go @@ -19,6 +19,8 @@ type Job struct { newObject batchv1.Job } +var _ Resource = &Job{} + func NewJob(name string, l *labeller.Labeller, apiProxy apiproxy.APIProxy) *Job { return &Job{ name: name,