diff --git a/cmd/lorry/main.go b/cmd/lorry/main.go index b3534775fcb..0bd52ae583e 100644 --- a/cmd/lorry/main.go +++ b/cmd/lorry/main.go @@ -22,6 +22,7 @@ package main import ( "flag" "fmt" + "net" "net/http" "os" "os/signal" @@ -29,6 +30,8 @@ import ( "github.com/spf13/pflag" "go.uber.org/automaxprocs/maxprocs" + "google.golang.org/grpc" + health "google.golang.org/grpc/health/grpc_health_v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -36,28 +39,36 @@ import ( viper "github.com/apecloud/kubeblocks/internal/viperx" "github.com/apecloud/kubeblocks/lorry/component" "github.com/apecloud/kubeblocks/lorry/highavailability" - "github.com/apecloud/kubeblocks/lorry/middleware/http/probe" + customgrpc "github.com/apecloud/kubeblocks/lorry/middleware/grpc" + probe2 "github.com/apecloud/kubeblocks/lorry/middleware/probe" "github.com/apecloud/kubeblocks/lorry/util" ) -var port int -var configDir string +var ( + port int + grpcPort int + configDir string +) const ( DefaultPort = 3501 + DefaultGRPCPort = 50001 DefaultConfigPath = "/config/lorry/components" ) func init() { viper.AutomaticEnv() - flag.IntVar(&port, "port", DefaultPort, "probe default port") - flag.StringVar(&configDir, "config-path", DefaultConfigPath, "probe default config directory for builtin type") + viper.SetDefault(constant.KBEnvCharacterType, "custom") + flag.IntVar(&port, "port", DefaultPort, "lorry http default port") + flag.IntVar(&grpcPort, "grpcport", DefaultGRPCPort, "lorry grpc default port") + flag.StringVar(&configDir, "config-path", DefaultConfigPath, "lorry default config directory for builtin type") } func main() { // set GOMAXPROCS _, _ = maxprocs.Set() + // setup log opts := zap.Options{ Development: true, } @@ -72,17 +83,20 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + characterType := viper.GetString(constant.KBEnvCharacterType) + workloadType := viper.GetString(constant.KBEnvWorkloadType) err = component.GetAllComponent(configDir) // find all builtin config file and read if err != nil { // Handle errors reading the config file panic(fmt.Errorf("fatal error config file: %v", err)) } - err = probe.RegisterBuiltin() // register all builtin component + err = probe2.RegisterBuiltin(characterType) // register all builtin component if err != nil { panic(fmt.Errorf("fatal error register builtin: %v", err)) } - http.HandleFunc("/", probe.SetMiddleware(probe.GetRouter())) + // start http server for lorry client + http.HandleFunc("/", probe2.SetMiddleware(probe2.GetRouter())) go func() { addr := fmt.Sprintf(":%d", port) err := http.ListenAndServe(addr, nil) @@ -91,9 +105,22 @@ func main() { } }() + // start grpc server for role probe + listen, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) + if err != nil { + panic(fmt.Errorf("fatal error listen on port %d: %v", grpcPort, err)) + } + + healthServer := customgrpc.NewGRPCServer() + server := grpc.NewServer() + health.RegisterHealthServer(server, healthServer) + + err = server.Serve(listen) + if err != nil { + panic(fmt.Errorf("fatal error grpcserver serve failed: %v", err)) + } + // ha dependent on dbmanager which is initialized by rt.Run - characterType := viper.GetString(constant.KBEnvCharacterType) - workloadType := viper.GetString(constant.KBEnvWorkloadType) logHa := ctrl.Log.WithName("HA") ha := highavailability.NewHa(logHa) if util.IsHAAvailable(characterType, workloadType) { diff --git a/docker/Dockerfile-probe b/docker/Dockerfile-probe deleted file mode 100644 index 56dffc56e35..00000000000 --- a/docker/Dockerfile-probe +++ /dev/null @@ -1,43 +0,0 @@ -# Build the kubeblocks tools binaries -# only includes role-observation. - -## docker buildx build injected build-args: -#BUILDPLATFORM — matches the current machine. (e.g. linux/amd64) -#BUILDOS — os component of BUILDPLATFORM, e.g. linux -#BUILDARCH — e.g. amd64, arm64, riscv64 -#BUILDVARIANT — used to set build ARM variant, e.g. v7 -#TARGETPLATFORM — The value set with --platform flag on build -#TARGETOS - OS component from --platform, e.g. linux -#TARGETARCH - Architecture from --platform, e.g. arm64 -#TARGETVARIANT - used to set target ARM variant, e.g. v7 - -ARG GO_VERSION=1.21 - -FROM --platform=${BUILDPLATFORM} golang:${GO_VERSION} as builder -ARG TARGETOS -ARG TARGETARCH -ARG GOPROXY -##ARG GOPROXY=https://goproxy.cn -ARG LD_FLAGS="-s -w" - -ENV GONOPROXY=github.com/apecloud -ENV GONOSUMDB=github.com/apecloud -ENV GOPRIVATE=github.com/apecloud -ENV GOPROXY=${GOPROXY} - -WORKDIR /src - -# Build -# role-observe -RUN --mount=type=bind,target=. \ - --mount=type=cache,target=/root/.cache/go-build \ - --mount=type=cache,target=/go/pkg \ - CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -ldflags="${LD_FLAGS}" -a -o /out/probe cmd/lorry/main.go - - -# Use alpine -FROM docker.io/alpine:3.18 as dist - -# copy probe and config -COPY --from=builder /out/probe /bin/probe -COPY config/lorry/components config/probe \ No newline at end of file diff --git a/docker/Dockerfile-tools b/docker/Dockerfile-tools index ea7f76feb7b..f46bb0a25e9 100644 --- a/docker/Dockerfile-tools +++ b/docker/Dockerfile-tools @@ -79,6 +79,10 @@ RUN --mount=type=bind,target=. \ --mount=type=cache,target=/go/pkg/mod \ CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -ldflags="${LD_FLAGS}" -tags="containers_image_openpgp" -a -o /out/kbcli cmd/cli/main.go +RUN GRPC_HEALTH_PROBE_VERSION=v0.4.13 GOOS=${TARGETOS} GOARCH=${TARGETARCH} && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-${GOOS}-${GOARCH} + + # Use alpine with tag 20230329 is corresponding to "edge" tag (latest release to date is 3.18) as of 20230625 FROM docker.io/alpine:edge as dist ARG APK_MIRROR @@ -97,12 +101,17 @@ COPY --from=builder /out/config_render /bin COPY --from=builder /out/lorry /bin COPY --from=builder /out/lorryctl /bin COPY --from=builder /out/kbcli /bin +COPY --from=builder /bin/grpc_health_probe /bin # make breaking change compatible RUN ln -s /bin/lorry /bin/probe RUN ln -s /config/lorry /config/probe +# enable grpc_health_probe binary +RUN chmod +x /bin/grpc_health_probe + # mkdir kbcli config dir and helm cache dir. RUN mkdir /.kbcli && chown -R 65532:65532 /.kbcli \ && mkdir /.cache && chown -R 65532:65532 /.cache USER 65532:65532 + diff --git a/internal/controller/component/probe_utils.go b/internal/controller/component/probe_utils.go index e9e6b2a2c6c..44ee55d3607 100644 --- a/internal/controller/component/probe_utils.go +++ b/internal/controller/component/probe_utils.go @@ -144,7 +144,9 @@ func buildLorryServiceContainer(component *SynthesizedComponent, container *core container.Image = viper.GetString(constant.KBToolsImage) container.ImagePullPolicy = corev1.PullPolicy(viper.GetString(constant.KBImagePullPolicy)) container.Command = []string{"lorry", - "--port", strconv.Itoa(probeSvcHTTPPort)} + "--port", strconv.Itoa(probeSvcHTTPPort), + "--grpcport", strconv.Itoa(probeSvcGRPCPort), + } if len(component.PodSpec.Containers) > 0 { mainContainer := component.PodSpec.Containers[0] diff --git a/internal/controller/rsm/transformer_object_generation.go b/internal/controller/rsm/transformer_object_generation.go index 421f2d8788e..5f4320ed5a0 100644 --- a/internal/controller/rsm/transformer_object_generation.go +++ b/internal/controller/rsm/transformer_object_generation.go @@ -402,6 +402,10 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat if probeDaemonPort == 0 { probeDaemonPort = defaultRoleProbeDaemonPort } + probeGRPCPort := viper.GetInt("ROLE_PROBE_GRPC_PORT") + if probeGRPCPort == 0 { + probeGRPCPort = defaultRoleProbeGRPCPort + } env := credentialEnv env = append(env, corev1.EnvVar{ @@ -489,9 +493,11 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat readinessProbe := &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: roleProbeURI, - Port: intstr.FromInt(probeDaemonPort), + Exec: &corev1.ExecAction{ + Command: []string{ + grpcHealthProbeBinaryPath, + fmt.Sprintf(grpcHealthProbeArgsFormat, probeGRPCPort), + }, }, }, InitialDelaySeconds: roleProbe.InitialDelaySeconds, @@ -517,12 +523,16 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat } return nil } + // if role probe container exists, update the readiness probe, env and serving container port if container := tryToGetRoleProbeContainer(); container != nil { - // presume the first port is the http port. + // presume the second port is the grpc port. // this is an easily broken contract between rsm controller and cluster controller. // TODO(free6om): design a better way to do this after Lorry-WeSyncer separation done - readinessProbe.HTTPGet.Port = intstr.FromInt(int(container.Ports[0].ContainerPort)) + readinessProbe.Exec.Command = []string{ + grpcHealthProbeBinaryPath, + fmt.Sprintf(grpcHealthProbeArgsFormat, int(container.Ports[1].ContainerPort)), + } container.ReadinessProbe = readinessProbe for _, e := range env { if slices.IndexFunc(container.Env, func(v corev1.EnvVar) bool { @@ -543,13 +553,21 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat AddCommands([]string{ roleProbeBinaryName, "--port", strconv.Itoa(probeDaemonPort), + "--grpcport", strconv.Itoa(probeGRPCPort), }...). AddEnv(env...). - AddPorts(corev1.ContainerPort{ - ContainerPort: int32(probeDaemonPort), - Name: roleProbeContainerName, - Protocol: "TCP", - }). + AddPorts( + corev1.ContainerPort{ + ContainerPort: int32(probeDaemonPort), + Name: roleProbeContainerName, + Protocol: "TCP", + }, + corev1.ContainerPort{ + ContainerPort: int32(probeGRPCPort), + Name: roleProbeGRPCPortName, + Protocol: "TCP", + }, + ). SetReadinessProbe(*readinessProbe). GetObject() diff --git a/internal/controller/rsm/types.go b/internal/controller/rsm/types.go index 9912c48143b..4da747b09ea 100644 --- a/internal/controller/rsm/types.go +++ b/internal/controller/rsm/types.go @@ -80,7 +80,10 @@ const ( shell2httpServePath = "/role" defaultRoleProbeAgentImage = "apecloud/kubeblocks-tools:latest" defaultRoleProbeDaemonPort = 7373 - roleProbeURI = "/v1.0/bindings/custom?operation=checkRole" + defaultRoleProbeGRPCPort = 50001 + roleProbeGRPCPortName = "probe-grpc-port" + grpcHealthProbeBinaryPath = "/bin/grpc_health_probe" + grpcHealthProbeArgsFormat = "-addr=:%d" defaultActionImage = "busybox:latest" usernameCredentialVarName = "KB_RSM_USERNAME" passwordCredentialVarName = "KB_RSM_PASSWORD" diff --git a/lorry/binding/custom/custom.go b/lorry/binding/custom/custom.go index 0de70843e22..8e67419769c 100644 --- a/lorry/binding/custom/custom.go +++ b/lorry/binding/custom/custom.go @@ -47,7 +47,7 @@ type HTTPCustom struct { BaseOperations } -var perNodeRegx = regexp.MustCompile("[a-zA-Z0-9]+") +var perNodeRegx = regexp.MustCompile("^[^,]*$") // NewHTTPCustom returns a new HTTPCustom. func NewHTTPCustom() *HTTPCustom { diff --git a/lorry/binding/mongodb/mongodb_test.go b/lorry/binding/mongodb/mongodb_test.go index 8f4aafe965d..c1dfbad8f5b 100644 --- a/lorry/binding/mongodb/mongodb_test.go +++ b/lorry/binding/mongodb/mongodb_test.go @@ -45,6 +45,7 @@ func TestGetRole(t *testing.T) { primitive.E{Key: "_id", Value: 0}, primitive.E{Key: "state", Value: 1}, primitive.E{Key: "stateStr", Value: "PRIMARY"}, + primitive.E{Key: "self", Value: true}, }, }}, }) diff --git a/lorry/client/client_test.go b/lorry/client/client_test.go index f0489c3c8de..d98b7a5dc07 100644 --- a/lorry/client/client_test.go +++ b/lorry/client/client_test.go @@ -37,6 +37,7 @@ import ( "github.com/apecloud/kubeblocks/internal/constant" testapps "github.com/apecloud/kubeblocks/internal/testutil/apps" + viper "github.com/apecloud/kubeblocks/internal/viperx" . "github.com/apecloud/kubeblocks/lorry/util" ) @@ -96,6 +97,8 @@ func TestGetRole(t *testing.T) { portStr := addr[index+1:] port, _ := strconv.Atoi(portStr) + viper.Set(constant.KBToolsImage, "lorry") + cli, closer, err := initSQLChannelClient(port, t) if err != nil { t.Errorf("new sql channel client error: %v", err) @@ -114,7 +117,7 @@ func TestGetRole(t *testing.T) { }) t.Run("ResponseTimeout", func(t *testing.T) { - cli.ReconcileTimeout = 0 * time.Millisecond + cli.ReconcileTimeout = 0 _, err := cli.GetRole() t.Logf("err: %v", err) if err == nil { @@ -311,7 +314,8 @@ func initSQLChannelClient(httpPort int, t *testing.T) (*OperationClient, func(), port, closer := newTCPServer(t, 50001) podName := "pod-for-sqlchannel-test" pod := testapps.NewPodFactory("default", podName). - AddContainer(corev1.Container{Name: testapps.DefaultNginxContainerName, Image: testapps.NginxImage}).GetObject() + AddContainer(corev1.Container{Name: viper.GetString(constant.KBToolsImage), Image: viper.GetString(constant.KBToolsImage)}). + GetObject() pod.Spec.Containers[0].Ports = []corev1.ContainerPort{ { ContainerPort: int32(httpPort), diff --git a/lorry/middleware/grpc/gprc_server.go b/lorry/middleware/grpc/gprc_server.go new file mode 100644 index 00000000000..6e3387f94b7 --- /dev/null +++ b/lorry/middleware/grpc/gprc_server.go @@ -0,0 +1,78 @@ +/* +Copyright (C) 2022-2023 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package grpc + +import ( + "context" + "encoding/json" + "errors" + + "github.com/go-logr/logr" + "google.golang.org/grpc/codes" + health "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/apecloud/kubeblocks/internal/constant" + viper "github.com/apecloud/kubeblocks/internal/viperx" + . "github.com/apecloud/kubeblocks/lorry/binding" + probe2 "github.com/apecloud/kubeblocks/lorry/middleware/probe" +) + +type GRPCServer struct { + character string + logger logr.Logger + router func(ctx context.Context) (*ProbeResponse, error) +} + +func (s *GRPCServer) Check(ctx context.Context, in *health.HealthCheckRequest) (*health.HealthCheckResponse, error) { + s.logger.Info("role probe request", "type", s.character) + resp, err := s.router(ctx) + + if err != nil { + s.logger.Error(err, "role probe failed") + return &health.HealthCheckResponse{Status: health.HealthCheckResponse_NOT_SERVING}, err + } + + code, ok := resp.Metadata[StatusCode] + if ok && code == OperationFailedHTTPCode { + s.logger.Info("Role changed event detected", "role", string(resp.Data)) + return &health.HealthCheckResponse{Status: health.HealthCheckResponse_NOT_SERVING}, errors.New(string(resp.Data)) + } + + meta, _ := json.Marshal(resp.Metadata) + + s.logger.Info("No event detected", "meta", string(meta), "data", string(resp.Data)) + return &health.HealthCheckResponse{Status: health.HealthCheckResponse_SERVING}, nil +} + +func (s *GRPCServer) Watch(in *health.HealthCheckRequest, _ health.Health_WatchServer) error { + // didn't implement the `watch` function + return status.Error(codes.Unimplemented, "unimplemented") +} + +func NewGRPCServer() *GRPCServer { + characterType := viper.GetString(constant.KBEnvCharacterType) + return &GRPCServer{ + character: characterType, + logger: ctrl.Log.WithName("grpc"), + router: probe2.GetGrpcRouter(characterType), + } +} diff --git a/lorry/middleware/grpc/grpc_server_test.go b/lorry/middleware/grpc/grpc_server_test.go new file mode 100644 index 00000000000..41113a5b1e4 --- /dev/null +++ b/lorry/middleware/grpc/grpc_server_test.go @@ -0,0 +1,56 @@ +package grpc + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + health "google.golang.org/grpc/health/grpc_health_v1" + + viper "github.com/apecloud/kubeblocks/internal/viperx" + "github.com/apecloud/kubeblocks/lorry/binding" + "github.com/apecloud/kubeblocks/lorry/middleware/probe" +) + +func TestNewServer(t *testing.T) { + server := NewGRPCServer() + assert.NotNil(t, server.logger) + assert.Error(t, server.Watch(nil, nil)) +} + +func TestCheck(t *testing.T) { + // set up the host + s := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + _, _ = w.Write([]byte("leader")) + }), + ) + addr := s.Listener.Addr().String() + index := strings.LastIndex(addr, ":") + portStr := addr[index+1:] + + // set up the environment + viper.Set("KB_RSM_ACTION_SVC_LIST", "["+portStr+"]") + viper.Set("KB_RSM_ROLE_UPDATE_MECHANISM", "ReadinessProbeEventUpdate") + + assert.Nil(t, probe.RegisterBuiltin("")) + server := NewGRPCServer() + check, err := server.Check(context.Background(), nil) + + assert.Error(t, err) + assert.Equal(t, health.HealthCheckResponse_NOT_SERVING, check.Status) + + // set up the expected answer + result := binding.OpsResult{} + result["event"] = "Success" + result["operation"] = "checkRole" + result["originalRole"] = "" + result["role"] = "leader" + ans, _ := json.Marshal(result) + + assert.Equal(t, string(ans), err.Error()) +} diff --git a/lorry/middleware/http/probe/checks_middleware.go b/lorry/middleware/probe/checks_middleware.go similarity index 96% rename from lorry/middleware/http/probe/checks_middleware.go rename to lorry/middleware/probe/checks_middleware.go index 614aa63031f..948096fcf46 100644 --- a/lorry/middleware/http/probe/checks_middleware.go +++ b/lorry/middleware/probe/checks_middleware.go @@ -53,7 +53,7 @@ func init() { logger = ctrl.Log.WithName("middleware") } -func GetRequestBody(operation string, args map[string][]string) []byte { +func getRequestBody(operation string, args map[string][]string) []byte { metadata := make(map[string]string) walkFunc := func(key string, value []string) { if key == operationKey { @@ -94,7 +94,7 @@ func SetMiddleware(next http.HandlerFunc) http.HandlerFunc { operation := uri.Query().Get(operationKey) if strings.HasPrefix(operation, "get") || strings.HasPrefix(operation, "check") || strings.HasPrefix(operation, "list") { - body := GetRequestBody(operation, uri.Query()) + body := getRequestBody(operation, uri.Query()) request.Body = io.NopCloser(bytes.NewReader(body)) } else { logger.Info("unknown probe operation", "operation", operation) diff --git a/lorry/middleware/http/probe/checks_middleware_test.go b/lorry/middleware/probe/checks_middleware_test.go similarity index 98% rename from lorry/middleware/http/probe/checks_middleware_test.go rename to lorry/middleware/probe/checks_middleware_test.go index 0518a36473e..40b2d8bf71a 100644 --- a/lorry/middleware/http/probe/checks_middleware_test.go +++ b/lorry/middleware/probe/checks_middleware_test.go @@ -33,7 +33,7 @@ func TestGetRequestBody(t *testing.T) { mock := make(map[string][]string) mock["sql"] = []string{"dd"} operation := "exec" - body := GetRequestBody(operation, mock) + body := getRequestBody(operation, mock) meta := RequestMeta{ Operation: operation, diff --git a/lorry/middleware/http/probe/router.go b/lorry/middleware/probe/router.go similarity index 61% rename from lorry/middleware/http/probe/router.go rename to lorry/middleware/probe/router.go index 66bb03a44e8..3870e305336 100644 --- a/lorry/middleware/http/probe/router.go +++ b/lorry/middleware/probe/router.go @@ -25,10 +25,11 @@ import ( "fmt" "io" "net/http" - "strings" + viper "github.com/apecloud/kubeblocks/internal/viperx" "github.com/go-errors/errors" + "github.com/apecloud/kubeblocks/internal/constant" . "github.com/apecloud/kubeblocks/lorry/binding" "github.com/apecloud/kubeblocks/lorry/binding/custom" "github.com/apecloud/kubeblocks/lorry/binding/etcd" @@ -43,66 +44,66 @@ import ( var builtinMap = make(map[string]BaseInternalOps) var customOp *custom.HTTPCustom -func RegisterBuiltin() error { +func RegisterBuiltin(characterType string) error { initErrFmt := "%s init err: %v" - - mysqlOp := mysql.NewMysql() - builtinMap["mysql"] = mysqlOp - properties := component.GetProperties("mysql") - err := mysqlOp.Init(properties) - if err != nil { - return errors.Errorf(initErrFmt, "mysql", err) - } - - redisOp := redis.NewRedis() - builtinMap["redis"] = redisOp - properties = component.GetProperties("redis") - err = redisOp.Init(properties) - if err != nil { - return errors.Errorf(initErrFmt, "redis", err) - } - - pgOp := postgres.NewPostgres() - builtinMap["postgres"] = pgOp - properties = component.GetProperties("postgres") - err = pgOp.Init(properties) - if err != nil { - return errors.Errorf(initErrFmt, "postgres", err) - } - - etcdOp := etcd.NewEtcd() - builtinMap["etcd"] = etcdOp - properties = component.GetProperties("etcd") - err = etcdOp.Init(properties) - if err != nil { - return errors.Errorf(initErrFmt, "etcd", err) - } - - mongoOp := mongodb.NewMongoDB() - builtinMap["mongodb"] = mongoOp - properties = component.GetProperties("mongodb") - err = mongoOp.Init(properties) - if err != nil { - return errors.Errorf(initErrFmt, "mongodb", err) - } - - customOp = custom.NewHTTPCustom() - empty := make(component.Properties) - err = customOp.Init(empty) - if err != nil { - return errors.Errorf(initErrFmt, "custom", err) + switch characterType { + case "mysql": + mysqlOp := mysql.NewMysql() + builtinMap["mysql"] = mysqlOp + properties := component.GetProperties("mysql") + err := mysqlOp.Init(properties) + if err != nil { + return errors.Errorf(initErrFmt, "mysql", err) + } + case "redis": + redisOp := redis.NewRedis() + builtinMap["redis"] = redisOp + properties := component.GetProperties("redis") + err := redisOp.Init(properties) + if err != nil { + return errors.Errorf(initErrFmt, "redis", err) + } + case "postgres": + pgOp := postgres.NewPostgres() + builtinMap["postgres"] = pgOp + properties := component.GetProperties("postgres") + err := pgOp.Init(properties) + if err != nil { + return errors.Errorf(initErrFmt, "postgres", err) + } + case "etcd": + etcdOp := etcd.NewEtcd() + builtinMap["etcd"] = etcdOp + properties := component.GetProperties("etcd") + err := etcdOp.Init(properties) + if err != nil { + return errors.Errorf(initErrFmt, "etcd", err) + } + case "mongodb": + mongoOp := mongodb.NewMongoDB() + builtinMap["mongodb"] = mongoOp + properties := component.GetProperties("mongodb") + err := mongoOp.Init(properties) + if err != nil { + return errors.Errorf(initErrFmt, "mongodb", err) + } + default: + customOp = custom.NewHTTPCustom() + empty := make(component.Properties) + err := customOp.Init(empty) + if err != nil { + return errors.Errorf(initErrFmt, "custom", err) + } } - return nil } func GetRouter() func(writer http.ResponseWriter, request *http.Request) { return func(writer http.ResponseWriter, request *http.Request) { // get the character type - character := GetCharacter(request.URL.Path) - if character == "" { - logger.Error(nil, "character type missing in path") - return + characterType := viper.GetString(constant.KBEnvCharacterType) + if len(characterType) == 0 { + characterType = "custom" } body := request.Body @@ -123,7 +124,7 @@ func GetRouter() func(writer http.ResponseWriter, request *http.Request) { probeRequest.Operation = util.OperationKind(meta.Operation) // route the request to engine - probeResp, err := route(character, request.Context(), probeRequest) + probeResp, err := route(characterType, request.Context(), probeRequest) logger.Info("request routed", "request", probeRequest, "response", probeResp) if err != nil { @@ -149,17 +150,6 @@ func GetRouter() func(writer http.ResponseWriter, request *http.Request) { } } -func GetCharacter(url string) string { - if !strings.HasPrefix(url, bindingPath) { - return "" - } - splits := strings.Split(url, "/") - if len(splits) < 4 { - return "" - } - return splits[3] -} - func route(character string, ctx context.Context, request *ProbeRequest) (*ProbeResponse, error) { ops, ok := builtinMap[character] // if there is no builtin type, use the custom @@ -169,3 +159,10 @@ func route(character string, ctx context.Context, request *ProbeRequest) (*Probe } return ops.Invoke(ctx, request) } + +func GetGrpcRouter(character string) func(ctx context.Context) (*ProbeResponse, error) { + return func(ctx context.Context) (*ProbeResponse, error) { + getRoleRequest := &ProbeRequest{Operation: util.CheckRoleOperation} + return route(character, ctx, getRoleRequest) + } +} diff --git a/lorry/middleware/http/probe/router_test.go b/lorry/middleware/probe/router_test.go similarity index 89% rename from lorry/middleware/http/probe/router_test.go rename to lorry/middleware/probe/router_test.go index 87924b3c7d2..5be43e7de1e 100644 --- a/lorry/middleware/http/probe/router_test.go +++ b/lorry/middleware/probe/router_test.go @@ -34,18 +34,6 @@ import ( "github.com/apecloud/kubeblocks/lorry/util" ) -func TestGetCharacter(t *testing.T) { - t.Run("Wrong Prefix", func(t *testing.T) { - character := GetCharacter("/v2.0/bindings/mysql") - assert.Equal(t, "", character) - }) - - t.Run("mysql", func(t *testing.T) { - character := GetCharacter("/v1.0/bindings/mysql") - assert.Equal(t, "mysql", character) - }) -} - func TestGetRouter(t *testing.T) { t.Run("Use Custom to get role", func(t *testing.T) { s := httptest.NewServer( @@ -105,5 +93,5 @@ func emptyConfig() map[string]component.Properties { func mockRegister() error { component.Name2Property = emptyConfig() - return RegisterBuiltin() + return RegisterBuiltin("custom") }