Skip to content

Commit

Permalink
refact code
Browse files Browse the repository at this point in the history
  • Loading branch information
jiayouxujin committed Oct 19, 2023
1 parent d40075f commit a56969f
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 90 deletions.
2 changes: 1 addition & 1 deletion examples/cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
labels:
kvrocks/monitored-by: sentinel-1
spec:
image: apache/kvrocks:nightly # kvrocks image
image: apache/kvrocks:2.5.1 # kvrocks image
imagePullPolicy: IfNotPresent
master: 3
replicas: 2
Expand Down
14 changes: 7 additions & 7 deletions examples/standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ apiVersion: kvrocks.apache.org/v1alpha1
kind: KVRocks
metadata:
name: kvrocks-standard-1-demo
namespace: kvrocks
namespace: default
labels:
kvrocks/monitored-by: sentinel-1
spec:
image: apache/kvrocks:nightly
image: apache/kvrocks:2.4.0
imagePullPolicy: IfNotPresent
master: 1
replicas: 3
Expand Down Expand Up @@ -39,9 +39,9 @@ spec:
rocksdb.compression: "no"
rocksdb.wal_ttl_seconds: "0"
rocksdb.wal_size_limit_mb: "0"
# storage:
# size: 32Gi
# class: local-hostpath
storage:
size: 200Mi
class: standard
toleration:
- key: kvrocks
effect: NoSchedule
Expand All @@ -54,5 +54,5 @@ spec:
cpu: 2
memory: 8Gi
requests:
cpu: 1
memory: 4Gi
cpu: 200m
memory: 200Mi
1 change: 0 additions & 1 deletion pkg/controllers/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

// etcd-> controller
// TODO owner reference
func (h *KVRocksClusterHandler) ensureController() error {
etcdService := resources.NewEtcdService(h.instance)
if err := h.k8s.CreateIfNotExistsService(etcdService); err != nil {
Expand Down
23 changes: 11 additions & 12 deletions pkg/controllers/cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import (
)

type KVRocksClusterHandler struct {
instance *kvrocksv1alpha1.KVRocks
k8s *k8s.Client
kvrocks *kvrocks.Client
log logr.Logger
password string
requeue bool
stsNodes [][]*kvrocks.Node
key types.NamespacedName
version int
masters map[string]*kvrocks.Node

instance *kvrocksv1alpha1.KVRocks
k8s *k8s.Client
kvrocks *kvrocks.Client
log logr.Logger
password string
requeue bool
stsNodes [][]*kvrocks.Node
key types.NamespacedName
version int
masters map[string]*kvrocks.Node
controllerClient *controller.Client
}

Expand Down Expand Up @@ -69,7 +68,7 @@ func (h *KVRocksClusterHandler) Handle() (error, bool) {
if err != nil || h.requeue {
return err, false
}
err = h.reBalance()
err = h.ensureMigrate()
if err != nil || h.requeue {
return err, false
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/controllers/cluster/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/RocksLabs/kvrocks-operator/pkg/client/kvrocks"
)

func (h *KVRocksClusterHandler) reBalance() error {
func (h *KVRocksClusterHandler) ensureMigrate() error {
masters := make([]*kvrocks.Node, 0)
h.masters = map[string]*kvrocks.Node{}
for _, nodes := range h.stsNodes {
Expand All @@ -24,7 +24,7 @@ func (h *KVRocksClusterHandler) reBalance() error {
return h.ensureReBalanceTopo(index, master)
}
}
h.log.Info("reBalance successfully")
h.log.Info("migrate successfully")
return h.ensureStatusTopoMsg()
}

Expand All @@ -36,7 +36,6 @@ func (h *KVRocksClusterHandler) ensureReBalanceTopo(src int, node *kvrocks.Node)
retry := 0
wait := time.Millisecond * 10
moveSlots:
// TODO controller 500 "there is a migration task running"
err := h.controllerClient.MigrateSlotAndData(src, dest, slot)
if err != nil {
h.log.Error(err, "move slot error")
Expand Down
36 changes: 2 additions & 34 deletions pkg/controllers/cluster/sentinel.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package cluster

import (
"strconv"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kvrocksv1alpha1 "github.com/RocksLabs/kvrocks-operator/api/v1alpha1"
sentinel "github.com/RocksLabs/kvrocks-operator/pkg/controllers/sentinel"
"github.com/RocksLabs/kvrocks-operator/pkg/resources"
)

Expand All @@ -23,38 +21,8 @@ func (h *KVRocksClusterHandler) ensureSentinel() error {
}
// notify sentinel to update
if v, ok := h.instance.Labels[resources.MonitoredBy]; ok {
return h.updateSentinelAnnotationCount(v)
return sentinel.UpdateSentinelAnnotationCount(h.k8s, h.instance.Namespace, v)
}

return nil
}

func (h *KVRocksClusterHandler) updateSentinelAnnotationCount(sentinelName string) error {
sentinel, err := h.k8s.GetKVRocks(types.NamespacedName{
Namespace: h.instance.Namespace,
Name: sentinelName,
})
if err != nil {
return err
}
annotations := sentinel.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
count, ok := annotations["change-count"]
if !ok {
count = "0"
}
countInt, err := strconv.Atoi(count)
if err != nil {
return err
}
countInt++
annotations["change-count"] = strconv.Itoa(countInt)
sentinel.SetAnnotations(annotations)
if err := h.k8s.UpdateKVRocks(sentinel); err != nil {
return err
}
h.log.V(1).Info("sentinel monitor ready")
return nil
}
33 changes: 33 additions & 0 deletions pkg/controllers/sentinel/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package sentinel

import (
"strconv"

"github.com/RocksLabs/kvrocks-operator/pkg/client/k8s"
"github.com/RocksLabs/kvrocks-operator/pkg/resources"
"k8s.io/apimachinery/pkg/types"
)

func (h *KVRocksSentinelHandler) ensureKubernetes() error {
Expand Down Expand Up @@ -37,3 +41,32 @@ func (h *KVRocksSentinelHandler) ensureKubernetes() error {
h.log.Info("kubernetes resources ok")
return nil
}

func UpdateSentinelAnnotationCount(k8s *k8s.Client, namespace, sentinelName string) error {
sentinel, err := k8s.GetKVRocks(types.NamespacedName{
Namespace: namespace,
Name: sentinelName,
})
if err != nil {
return err
}
annotations := sentinel.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
count, ok := annotations["change-count"]
if !ok {
count = "0"
}
countInt, err := strconv.Atoi(count)
if err != nil {
return err
}
countInt++
annotations["change-count"] = strconv.Itoa(countInt)
sentinel.SetAnnotations(annotations)
if err := k8s.UpdateKVRocks(sentinel); err != nil {
return err
}
return nil
}
34 changes: 2 additions & 32 deletions pkg/controllers/standard/kvrocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package standard
import (
"errors"
"fmt"
"strconv"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kvrocksv1alpha1 "github.com/RocksLabs/kvrocks-operator/api/v1alpha1"
"github.com/RocksLabs/kvrocks-operator/pkg/client/kvrocks"
"github.com/RocksLabs/kvrocks-operator/pkg/controllers/common"
sentinel "github.com/RocksLabs/kvrocks-operator/pkg/controllers/sentinel"
"github.com/RocksLabs/kvrocks-operator/pkg/resources"
)

Expand Down Expand Up @@ -102,41 +102,11 @@ func (h *KVRocksStandardHandler) ensureKVRocksReplication() error {
}
// notify sentinel to update
if v, ok := h.instance.Labels[resources.MonitoredBy]; ok {
return h.updateSentinelAnnotationCount(v)
return sentinel.UpdateSentinelAnnotationCount(h.k8s, h.instance.Namespace, v)
}
return nil
}

func (h *KVRocksStandardHandler) updateSentinelAnnotationCount(sentinelName string) error {
sentinel, err := h.k8s.GetKVRocks(types.NamespacedName{
Namespace: h.instance.Namespace,
Name: sentinelName,
})
if err != nil {
return err
}
annotations := sentinel.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
count, ok := annotations["change-count"]
if !ok {
count = "0"
}
countInt, err := strconv.Atoi(count)
if err != nil {
return err
}
countInt++
annotations["change-count"] = strconv.Itoa(countInt)
sentinel.SetAnnotations(annotations)
if err := h.k8s.UpdateKVRocks(sentinel); err != nil {
return err
}
h.log.V(1).Info("sentinel monitor ready")
return nil
}

func (h *KVRocksStandardHandler) updateKVRocksRole(podID int, role string) error {
podName := fmt.Sprintf("%s-%d", h.instance.Name, podID)
pod, err := h.k8s.GetPod(types.NamespacedName{
Expand Down

0 comments on commit a56969f

Please sign in to comment.