Skip to content

Commit

Permalink
feat: lorry support oceanbase and crobjobs (#6796)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuriwuyun authored Mar 25, 2024
1 parent ff20a9a commit 6bc4295
Show file tree
Hide file tree
Showing 37 changed files with 937 additions and 605 deletions.
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/opsrequest_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func NewDataScriptCondition(ops *OpsRequest) *metav1.Condition {
return newOpsCondition(ops, ConditionTypeDataScript, "DataScriptStarted", fmt.Sprintf("Start to execute data script in Cluster: %s", ops.Spec.ClusterRef))
}

func newOpsCondition(ops *OpsRequest, condType, reason, message string) *metav1.Condition {
func newOpsCondition(_ *OpsRequest, condType, reason, message string) *metav1.Condition {
return &metav1.Condition{
Type: condType,
Status: metav1.ConditionTrue,
Expand Down
6 changes: 3 additions & 3 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (r *OpsRequest) validateOps(ctx context.Context,
}

// validateExpose validates expose api when spec.type is Expose
func (r *OpsRequest) validateExpose(ctx context.Context, cluster *Cluster) error {
func (r *OpsRequest) validateExpose(_ context.Context, cluster *Cluster) error {
exposeList := r.Spec.ExposeList
if exposeList == nil {
return notEmptyError("spec.expose")
Expand Down Expand Up @@ -385,7 +385,7 @@ func compareQuantity(requestQuantity, limitQuantity *resource.Quantity) bool {
}

// validateHorizontalScaling validates api when spec.type is HorizontalScaling
func (r *OpsRequest) validateHorizontalScaling(ctx context.Context, cli client.Client, cluster *Cluster) error {
func (r *OpsRequest) validateHorizontalScaling(_ context.Context, _ client.Client, cluster *Cluster) error {
horizontalScalingList := r.Spec.HorizontalScalingList
if len(horizontalScalingList) == 0 {
return notEmptyError("spec.horizontalScaling")
Expand Down Expand Up @@ -567,7 +567,7 @@ func (r *OpsRequest) checkStorageClassAllowExpansion(ctx context.Context,
if err := cli.Get(ctx, types.NamespacedName{Name: *storageClassName}, storageClass); err != nil && !apierrors.IsNotFound(err) {
return false, err
}
if storageClass == nil || storageClass.AllowVolumeExpansion == nil {
if storageClass.AllowVolumeExpansion == nil {
return false, nil
}
return *storageClass.AllowVolumeExpansion, nil
Expand Down
12 changes: 6 additions & 6 deletions apis/apps/v1alpha1/opsrequest_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ var _ = Describe("OpsRequest webhook", func() {
Expect(k8sClient.Update(context.Background(), opsRequest).Error()).Should(ContainSubstring("forbidden to cancel the opsRequest which type not in ['VerticalScaling','HorizontalScaling']"))
}

testVerticalScaling := func(cluster *Cluster) {
testVerticalScaling := func(_ *Cluster) {
verticalScalingList := []VerticalScaling{
{
ComponentOps: ComponentOps{ComponentName: "vs-not-exist"},
Expand Down Expand Up @@ -266,7 +266,7 @@ var _ = Describe("OpsRequest webhook", func() {
}
}

testVolumeExpansion := func(cluster *Cluster) {
testVolumeExpansion := func(_ *Cluster) {
getSingleVolumeExpansionList := func(compName, vctName, storage string) []VolumeExpansion {
return []VolumeExpansion{
{
Expand Down Expand Up @@ -343,7 +343,7 @@ var _ = Describe("OpsRequest webhook", func() {
Expect(testCtx.CreateObj(ctx, opsRequest1).Error()).Should(ContainSubstring("existing other VolumeExpansion OpsRequest"))
}

testHorizontalScaling := func(clusterDef *ClusterDefinition, cluster *Cluster) {
testHorizontalScaling := func(clusterDef *ClusterDefinition, _ *Cluster) {
hScalingList := []HorizontalScaling{
{
ComponentOps: ComponentOps{ComponentName: "hs-not-exist"},
Expand Down Expand Up @@ -481,7 +481,7 @@ var _ = Describe("OpsRequest webhook", func() {
Expect(testCtx.CheckedCreateObj(ctx, opsRequest)).Should(Succeed())
}

testSwitchoverWithCompDef := func(clusterDef *ClusterDefinition, compDef *ComponentDefinition, cluster *Cluster) {
testSwitchoverWithCompDef := func(_ *ClusterDefinition, compDef *ComponentDefinition, cluster *Cluster) {
switchoverList := []Switchover{
{
ComponentOps: ComponentOps{ComponentName: "switchover-component-not-exist"},
Expand Down Expand Up @@ -591,7 +591,7 @@ var _ = Describe("OpsRequest webhook", func() {
Expect(k8sClient.Patch(ctx, opsRequest, patch)).Should(Succeed())
}

testRestart := func(cluster *Cluster) *OpsRequest {
testRestart := func(_ *Cluster) *OpsRequest {
By("By testing restart when componentNames is not correct")
opsRequest := createTestOpsRequest(clusterName, opsRequestName, RestartType)
opsRequest.Spec.RestartList = []ComponentOps{
Expand All @@ -605,7 +605,7 @@ var _ = Describe("OpsRequest webhook", func() {
return opsRequest
}

testReconfiguring := func(cluster *Cluster, clusterDef *ClusterDefinition) {
testReconfiguring := func(_ *Cluster, _ *ClusterDefinition) {
opsRequest := createTestOpsRequest(clusterName, opsRequestName+"-reconfiguring", ReconfiguringType)

createReconfigureObj := func(compName string) *Reconfigure {
Expand Down
10 changes: 9 additions & 1 deletion cmd/lorry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
kzap "sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/lorry/cronjobs"
"github.com/apecloud/kubeblocks/pkg/lorry/dcs"
"github.com/apecloud/kubeblocks/pkg/lorry/engines/register"
"github.com/apecloud/kubeblocks/pkg/lorry/grpcserver"
Expand Down Expand Up @@ -114,14 +115,21 @@ func main() {
panic(fmt.Errorf("fatal error grpcserver serve failed: %v", err))
}

// Start HTTP Server
// start HTTP Server
ops := opsregister.Operations()
httpServer := httpserver.NewServer(ops)
err = httpServer.StartNonBlocking()
if err != nil {
panic(errors.Wrap(err, "HTTP server initialize failed"))
}

// start cron jobs
jobManager, err := cronjobs.NewManager()
if err != nil {
panic(errors.Wrap(err, "Cron jobs initialize failed"))
}
jobManager.Start()

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGTERM, os.Interrupt)
<-stop
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/transformer_cluster_backup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *clusterBackupPolicyTransformer) Transform(ctx graph.TransformContext, d
// only create backup schedule for the default backup policy template
// if there are more than one backup policy templates.
if r.isDefaultTemplate != trueVal && r.tplCount > 1 {
r.V(1).Info("Skip creating backup schedule for non-default backup policy template %s", tpl.Name)
r.V(1).Info("Skip creating backup schedule for non-default backup policy template", "template", tpl.Name)
return
}

Expand Down
1 change: 1 addition & 0 deletions pkg/constant/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const (
KBEnvWorkloadType = "KB_WORKLOAD_TYPE"
KBEnvBuiltinHandler = "KB_BUILTIN_HANDLER"
KBEnvActionCommands = "KB_ACTION_COMMANDS"
KBEnvCronJobs = "KB_CRON_JOBS"
KBEnvCharacterType = "KB_SERVICE_CHARACTER_TYPE"
KBEnvServiceUser = "KB_SERVICE_USER"
KBEnvServicePassword = "KB_SERVICE_PASSWORD"
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/lorry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
// action keys
const (
RoleProbeAction = "roleProbe"
HealthyCheckAction = "healthyCheck"
MemberJoinAction = "memberJoin"
MemberLeaveAction = "memberLeave"
ReadonlyAction = "readonly"
Expand Down
24 changes: 24 additions & 0 deletions pkg/controller/component/lorry_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func buildLorryEnvs(container *corev1.Container, synthesizeComp *SynthesizedComp
if volumeProtectionEnabled(synthesizeComp) {
envs = append(envs, buildEnv4VolumeProtection(*synthesizeComp.VolumeProtection))
}
envs = append(envs, buildEnv4CronJobs(synthesizeComp)...)

container.Env = append(container.Env, envs...)
}
Expand Down Expand Up @@ -358,6 +359,29 @@ func buildEnv4VolumeProtection(spec appsv1alpha1.VolumeProtectionSpec) corev1.En
}
}

func buildEnv4CronJobs(_ *SynthesizedComponent) []corev1.EnvVar {
return nil
// if synthesizeComp.LifecycleActions == nil || synthesizeComp.LifecycleActions.HealthyCheck == nil {
// return nil
// }
// healthyCheck := synthesizeComp.LifecycleActions.HealthyCheck
// healthCheckSetting := make(map[string]string)
// healthCheckSetting["periodSeconds"] = strconv.Itoa(int(healthyCheck.PeriodSeconds))
// healthCheckSetting["timeoutSeconds"] = strconv.Itoa(int(healthyCheck.TimeoutSeconds))
// healthCheckSetting["failureThreshold"] = strconv.Itoa(int(healthyCheck.FailureThreshold))
// healthCheckSetting["successThreshold"] = strconv.Itoa(int(healthyCheck.SuccessThreshold))
// cronJobs := make(map[string]map[string]string)
// cronJobs["healthyCheck"] = healthCheckSetting

// jsonStr, _ := json.Marshal(cronJobs)
// return []corev1.EnvVar{
// {
// Name: constant.KBEnvCronJobs,
// Value: string(jsonStr),
// },
// }
}

// getBuiltinActionHandler gets the built-in handler.
// The BuiltinActionHandler within the same synthesizeComp LifecycleActions should be consistent, we can take any one of them.
func getBuiltinActionHandler(synthesizeComp *SynthesizedComponent) appsv1alpha1.BuiltinActionHandlerType {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/component/service_descriptor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func handleServiceDescriptorTypeServiceRef(reqCtx intctrlutil.RequestCtx,
serviceRefDecl appsv1alpha1.ServiceRefDeclaration,
serviceReferences map[string]*appsv1alpha1.ServiceDescriptor) error {
// verify service kind and version
verifyServiceKindAndVersion := func(serviceDescriptor appsv1alpha1.ServiceDescriptor, serviceRefDeclSpecs ...appsv1alpha1.ServiceRefDeclarationSpec) bool {
verifyServiceKindAndVersion := func(serviceDescriptor appsv1alpha1.ServiceDescriptor, _ ...appsv1alpha1.ServiceRefDeclarationSpec) bool {
for _, serviceRefDeclSpec := range serviceRefDecl.ServiceRefDeclarationSpecs {
if getWellKnownServiceKindAliasMapping(serviceRefDeclSpec.ServiceKind) != getWellKnownServiceKindAliasMapping(serviceDescriptor.Spec.ServiceKind) {
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataprotection/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (r *RestoreManager) BuildVolumePopulateJob(
populatePVC *corev1.PersistentVolumeClaim,
index int) (*batchv1.Job, error) {
prepareDataConfig := r.Restore.Spec.PrepareDataConfig
if prepareDataConfig == nil && prepareDataConfig.DataSourceRef == nil {
if prepareDataConfig == nil || prepareDataConfig.DataSourceRef == nil {
return nil, nil
}
if !backupSet.ActionSet.HasPrepareDataStage() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataprotection/restore/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("Backup Deleter Test", func() {
return backup
}

initResources := func(reqCtx intctrlutil.RequestCtx, startingIndex int, useVolumeSnapshotBackup bool, change func(f *testdp.MockRestoreFactory)) (*RestoreManager, *BackupActionSet) {
initResources := func(reqCtx intctrlutil.RequestCtx, _ int, _ bool, change func(f *testdp.MockRestoreFactory)) (*RestoreManager, *BackupActionSet) {
By("create a completed backup")
backup := mockBackupForRestore(&testCtx, actionSet.Name, testdp.BackupPVCName, true, false)

Expand Down
106 changes: 106 additions & 0 deletions pkg/lorry/cronjobs/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package cronjobs

import (
"context"
"strconv"
"time"

"github.com/apecloud/kubeblocks/pkg/lorry/operations"
)

type Job struct {
Name string
Ticker *time.Ticker
Operation operations.Operation
TimeoutSeconds int
PeriodSeconds int
SuccessThreshold int
FailureThreshold int
}

func NewJob(name string, settings map[string]string) (*Job, error) {
operations := operations.Operations()
ops, ok := operations[name]
if !ok {
logger.Info("Operation not found", "name", name)
return nil, nil
}
job := &Job{
Name: name,
Operation: ops,
TimeoutSeconds: 60,
PeriodSeconds: 60,
SuccessThreshold: 1,
FailureThreshold: 3,
}

if v, ok := settings["timeoutSeconds"]; ok {
timeoutSeconds, err := strconv.Atoi(v)
if err != nil {
logger.Info("Failed to parse timeoutSeconds", "error", err.Error(), "job", name, "value", v)
return nil, err
}
job.TimeoutSeconds = timeoutSeconds
}

if settings["periodSeconds"] != "" {
periodSeconds, err := strconv.Atoi(settings["periodSeconds"])
if err != nil {
logger.Info("Failed to parse periodSeconds", "error", err.Error(), "job", name, "value", settings["periodSeconds"])
return nil, err
}
job.PeriodSeconds = periodSeconds
}

if settings["successThreshold"] != "" {
successThreshold, err := strconv.Atoi(settings["successThreshold"])
if err != nil {
logger.Info("Failed to parse successThreshold", "error", err.Error(), "job", name, "value", settings["successThreshold"])
return nil, err
}
job.SuccessThreshold = successThreshold
}

if settings["failureThreshold"] != "" {
failureThreshold, err := strconv.Atoi(settings["failureThreshold"])
if err != nil {
logger.Info("Failed to parse failureThreshold", "error", err.Error(), "job", name, "value", settings["failureThreshold"])
return nil, err
}
job.FailureThreshold = failureThreshold
}
// operation is initialized in httpserver/apis.go
job.Operation.SetTimeout(time.Duration(job.TimeoutSeconds) * time.Second)
return job, nil
}

func (job *Job) Start() {
job.Ticker = time.NewTicker(time.Duration(job.PeriodSeconds) * time.Second)
defer job.Ticker.Stop()
for range job.Ticker.C {
_, err := job.Operation.Do(context.Background(), nil)
if err != nil {
logger.Info("Failed to run job", "name", job.Name, "error", err.Error())
// Handle error, e.g., increase failure count, stop job if failureThreshold is reached, etc.
}
}
}
68 changes: 68 additions & 0 deletions pkg/lorry/cronjobs/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package cronjobs

import (
"encoding/json"

ctrl "sigs.k8s.io/controller-runtime"

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/spf13/viper"
)

type Manager struct {
Jobs map[string]*Job
}

var logger = ctrl.Log.WithName("cronjobs")

func NewManager() (*Manager, error) {
cronSettings := make(map[string]map[string]string)
jsonStr := viper.GetString(constant.KBEnvCronJobs)
if jsonStr == "" {
logger.Info("env is not set", "env", constant.KBEnvCronJobs)
return &Manager{}, nil
}

err := json.Unmarshal([]byte(jsonStr), &cronSettings)
if err != nil {
logger.Info("Failed to unmarshal env", "name", constant.KBEnvCronJobs, "value", jsonStr, "error", err.Error())
return nil, err
}

jobs := make(map[string]*Job)
for name, setting := range cronSettings {
job, err := NewJob(name, setting)
if err != nil {
logger.Info("Failed to create job", "error", err.Error(), "name", name, "setting", setting)
}
jobs[name] = job
}
return &Manager{
Jobs: jobs,
}, nil
}

func (m *Manager) Start() {
for _, job := range m.Jobs {
go job.Start()
}
}
Loading

0 comments on commit 6bc4295

Please sign in to comment.