Skip to content

Commit

Permalink
refactor components interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
koct9i committed Nov 5, 2024
1 parent 5d45249 commit 282418d
Show file tree
Hide file tree
Showing 29 changed files with 209 additions and 274 deletions.
3 changes: 2 additions & 1 deletion controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func NewComponentManager(
return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err)
}

c.SetReadyCondition(componentStatus)
ytsaurus.SetStatusCondition(components.GetReadyCondition(c, componentStatus))

syncStatus := componentStatus.SyncStatus

if syncStatus == components.SyncStatusNeedLocalUpdate {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type ConditionManager interface {
IsStatusConditionFalse(conditionType string) bool
}

type UpdateConditionManager interface {
SetUpdateStatusCondition(ctx context.Context, condition metav1.Condition)
IsUpdateStatusConditionTrue(condition string) bool
}

func NewAPIProxy(
object client.Object,
client client.Client,
Expand Down
148 changes: 102 additions & 46 deletions pkg/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,67 +43,95 @@ func SimpleStatus(status SyncStatus) ComponentStatus {
}

type Component interface {
GetType() consts.ComponentType
GetName() string

Status(ctx context.Context) (ComponentStatus, error)

Fetch(ctx context.Context) error
Sync(ctx context.Context) error
Status(ctx context.Context) (ComponentStatus, error)
GetName() string
GetType() consts.ComponentType
SetReadyCondition(status ComponentStatus)
}

// TODO(nadya73): refactor it
IsUpdatable() bool
type LocalComponent interface {
Component

getLabeller() *labeller.Labeller
getYtsaurus() *apiproxy.Ytsaurus
}

// Following structs are used as a base for implementing YTsaurus components objects.
// baseComponent is a base struct intended for use in the simplest components and remote components
// (the ones that don't have access to the ytsaurus resource).
type baseComponent struct {
type LocalServerComponent interface {
LocalComponent

getServer() server
}

type RemoteServerComponent interface {
Component
}

// localComponent is a base structs for components which have access to ytsaurus resource,
// but don't depend on server. Example: UI, Strawberry.
type localComponent struct {
labeller *labeller.Labeller
ytsaurus *apiproxy.Ytsaurus
}

func (c *localComponent) GetType() consts.ComponentType {
return c.labeller.ComponentType
}

// GetName returns component's name, which is used as an identifier in component management
// and for mentioning in logs.
// For example for master component name is "Master",
// For data node name looks like "DataNode<NameFromSpec>".
func (c *baseComponent) GetName() string {
func (c *localComponent) GetName() string {
return c.labeller.GetFullComponentName()
}

// localComponent is a base structs for components which have access to ytsaurus resource,
// but don't depend on server. Example: UI, Strawberry.
type localComponent struct {
baseComponent
ytsaurus *apiproxy.Ytsaurus
func (c *localComponent) getLabeller() *labeller.Labeller {
return c.labeller
}

// localServerComponent is a base structs for components which have access to ytsaurus resource,
// and use server. Almost all components are based on this struct.
type localServerComponent struct {
localComponent
server server
func (c *localComponent) getYtsaurus() *apiproxy.Ytsaurus {
return c.ytsaurus
}

func newLocalComponent(
labeller *labeller.Labeller,
ytsaurus *apiproxy.Ytsaurus,
) localComponent {
return localComponent{
baseComponent: baseComponent{labeller: labeller},
ytsaurus: ytsaurus,
labeller: labeller,
ytsaurus: ytsaurus,
}
}

func (c *localComponent) SetReadyCondition(status ComponentStatus) {
ready := metav1.ConditionFalse
if status.SyncStatus == SyncStatusReady {
ready = metav1.ConditionTrue
}
c.ytsaurus.SetStatusCondition(metav1.Condition{
Type: fmt.Sprintf("%sReady", c.labeller.GetFullComponentName()),
Status: ready,
Reason: string(status.SyncStatus),
Message: status.Message,
})
// localServerComponent is a base structs for components which have access to ytsaurus resource,
// and use server. Almost all components are based on this struct.
type localServerComponent struct {
labeller *labeller.Labeller
ytsaurus *apiproxy.Ytsaurus
server server
}

func (c *localServerComponent) GetType() consts.ComponentType {
return c.labeller.ComponentType
}

func (c *localServerComponent) GetName() string {
return c.labeller.GetFullComponentName()
}

func (c *localServerComponent) getLabeller() *labeller.Labeller {
return c.labeller
}

func (c *localServerComponent) getYtsaurus() *apiproxy.Ytsaurus {
return c.ytsaurus
}

func (c *localServerComponent) getServer() server {
return c.server
}

func newLocalServerComponent(
Expand All @@ -112,21 +140,49 @@ func newLocalServerComponent(
server server,
) localServerComponent {
return localServerComponent{
localComponent: localComponent{
baseComponent: baseComponent{
labeller: labeller,
},
ytsaurus: ytsaurus,
},
server: server,
labeller: labeller,
ytsaurus: ytsaurus,
server: server,
}
}

type remoteServerComponent struct {
labeller *labeller.Labeller
server server
}

func (c *remoteServerComponent) GetType() consts.ComponentType {
return c.labeller.ComponentType
}

func (c *remoteServerComponent) GetName() string {
return c.labeller.GetFullComponentName()
}

func newRemoteServerComponent(
labeller *labeller.Labeller,
server server,
) remoteServerComponent {
return remoteServerComponent{
labeller: labeller,
server: server,
}
}

func (c *localServerComponent) NeedSync() bool {
return LocalServerNeedSync(c.server, c.ytsaurus)
func ServerNeedSync(s server, ytsaurus *apiproxy.Ytsaurus) bool {
// FIXME(khlebnikov): Explain this logic and move to upper layer.
return (s.configNeedsReload() && ytsaurus.IsUpdating()) || s.needBuild()
}

func LocalServerNeedSync(srv server, ytsaurus *apiproxy.Ytsaurus) bool {
return (srv.configNeedsReload() && ytsaurus.IsUpdating()) ||
srv.needBuild()
func GetReadyCondition(component Component, status ComponentStatus) metav1.Condition {
ready := metav1.ConditionFalse
if status.SyncStatus == SyncStatusReady {
ready = metav1.ConditionTrue
}
return metav1.Condition{
Type: fmt.Sprintf("%sReady", component.GetName()),
Status: ready,
Reason: string(status.SyncStatus),
Message: status.Message,
}
}
10 changes: 2 additions & 8 deletions pkg/components/controller_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus,
}
}

func (ca *ControllerAgent) IsUpdatable() bool {
return true
}

func (ca *ControllerAgent) GetType() consts.ComponentType { return consts.ControllerAgentType }

func (ca *ControllerAgent) Fetch(ctx context.Context) error {
return resources.Fetch(ctx, ca.server)
}
Expand All @@ -73,7 +67,7 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
}

if ca.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, ca.ytsaurus, ca, &ca.localComponent, ca.server, dry); status != nil {
if status, err := handleUpdatingClusterState(ctx, ca, dry); status != nil {
return *status, err
}
}
Expand All @@ -86,7 +80,7 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu
return WaitingStatus(SyncStatusBlocked, ca.master.GetName()), err
}

if ca.NeedSync() {
if ServerNeedSync(ca.server, ca.ytsaurus) {
if !dry {
err = ca.server.Sync(ctx)
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/components/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ func NewDataNode(
}
}

func (n *DataNode) IsUpdatable() bool {
return true
}

func (n *DataNode) GetType() consts.ComponentType { return consts.DataNodeType }

func (n *DataNode) Fetch(ctx context.Context) error {
return resources.Fetch(ctx, n.server)
}
Expand All @@ -81,7 +75,7 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error
}

if n.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, n.ytsaurus, n, &n.localComponent, n.server, dry); status != nil {
if status, err := handleUpdatingClusterState(ctx, n, dry); status != nil {
return *status, err
}
}
Expand All @@ -94,7 +88,7 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error
return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err
}

if n.NeedSync() {
if ServerNeedSync(n.server, n.ytsaurus) {
if !dry {
err = n.server.Sync(ctx)
}
Expand Down
22 changes: 10 additions & 12 deletions pkg/components/data_node_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
)

type RemoteDataNode struct {
server server
cfgen *ytconfig.NodeGenerator
spec *ytv1.DataNodesSpec
baseComponent
remoteServerComponent
cfgen *ytconfig.NodeGenerator
spec *ytv1.DataNodesSpec
}

// var _ RemoteServerComponent = &RemoteDataNode{}

func NewRemoteDataNodes(
cfgen *ytconfig.NodeGenerator,
nodes *ytv1.RemoteDataNodes,
Expand All @@ -39,7 +40,7 @@ func NewRemoteDataNodes(
spec.InstanceSpec.MonitoringPort = ptr.To(int32(consts.DataNodeMonitoringPort))
}

srv := newServerConfigured(
server := newServerConfigured(
&l,
proxy,
commonSpec,
Expand All @@ -58,17 +59,16 @@ func NewRemoteDataNodes(
}),
)
return &RemoteDataNode{
baseComponent: baseComponent{labeller: &l},
server: srv,
cfgen: cfgen,
spec: &spec,
remoteServerComponent: newRemoteServerComponent(&l, server),
cfgen: cfgen,
spec: &spec,
}
}

func (n *RemoteDataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

if n.server.needSync() || n.server.needUpdate() {
if n.server.configNeedsReload() || n.server.needBuild() || n.server.needUpdate() {
if !dry {
err = n.server.Sync(ctx)
}
Expand All @@ -82,8 +82,6 @@ func (n *RemoteDataNode) doSync(ctx context.Context, dry bool) (ComponentStatus,
return SimpleStatus(SyncStatusReady), err
}

func (n *RemoteDataNode) GetType() consts.ComponentType { return consts.DataNodeType }

func (n *RemoteDataNode) Sync(ctx context.Context) (ComponentStatus, error) {
return n.doSync(ctx, false)
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/components/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ func NewDiscovery(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Disco
}
}

func (d *Discovery) IsUpdatable() bool {
return true
}

func (d *Discovery) GetType() consts.ComponentType { return consts.DiscoveryType }

func (d *Discovery) Fetch(ctx context.Context) error {
return resources.Fetch(ctx, d.server)
}
Expand All @@ -73,12 +67,12 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro
}

if d.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, d.ytsaurus, d, &d.localComponent, d.server, dry); status != nil {
if status, err := handleUpdatingClusterState(ctx, d, dry); status != nil {
return *status, err
}
}

if d.NeedSync() {
if ServerNeedSync(d.server, d.ytsaurus) {
if !dry {
err = d.server.Sync(ctx)
}
Expand Down
Loading

0 comments on commit 282418d

Please sign in to comment.