Skip to content

Commit

Permalink
fix: fix role probe failed when roleUpdateMechanism is readinessProbe…
Browse files Browse the repository at this point in the history
…EventUpdate
  • Loading branch information
derecknowayback authored Oct 11, 2023
1 parent 23bcfc2 commit c72e3cd
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 148 deletions.
45 changes: 36 additions & 9 deletions cmd/lorry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,53 @@ package main
import (
"flag"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/spf13/pflag"
"go.uber.org/automaxprocs/maxprocs"
"google.golang.org/grpc"
health "google.golang.org/grpc/health/grpc_health_v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/apecloud/kubeblocks/internal/constant"
viper "github.com/apecloud/kubeblocks/internal/viperx"
"github.com/apecloud/kubeblocks/lorry/component"
"github.com/apecloud/kubeblocks/lorry/highavailability"
"github.com/apecloud/kubeblocks/lorry/middleware/http/probe"
customgrpc "github.com/apecloud/kubeblocks/lorry/middleware/grpc"
probe2 "github.com/apecloud/kubeblocks/lorry/middleware/probe"
"github.com/apecloud/kubeblocks/lorry/util"
)

var port int
var configDir string
// Values of lorry's command-line flags; they are bound to the flag set in init.
var (
// port is the TCP port the HTTP server started in main listens on.
port int
// grpcPort is the TCP port the gRPC health server started in main listens on.
grpcPort int
// configDir is the directory handed to component.GetAllComponent to load the builtin configs.
configDir string
)

// Default values used when the corresponding command-line flag is not given.
const (
// DefaultPort is the default for the "port" flag (lorry HTTP server).
DefaultPort = 3501
// DefaultGRPCPort is the default for the "grpcport" flag (lorry gRPC server).
DefaultGRPCPort = 50001
// DefaultConfigPath is the default for the "config-path" flag (builtin component config directory).
DefaultConfigPath = "/config/lorry/components"
)

func init() {
viper.AutomaticEnv()
flag.IntVar(&port, "port", DefaultPort, "probe default port")
flag.StringVar(&configDir, "config-path", DefaultConfigPath, "probe default config directory for builtin type")
viper.SetDefault(constant.KBEnvCharacterType, "custom")
flag.IntVar(&port, "port", DefaultPort, "lorry http default port")
flag.IntVar(&grpcPort, "grpcport", DefaultGRPCPort, "lorry grpc default port")
flag.StringVar(&configDir, "config-path", DefaultConfigPath, "lorry default config directory for builtin type")
}

func main() {
// set GOMAXPROCS
_, _ = maxprocs.Set()

// setup log
opts := zap.Options{
Development: true,
}
Expand All @@ -72,17 +83,20 @@ func main() {

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

characterType := viper.GetString(constant.KBEnvCharacterType)
workloadType := viper.GetString(constant.KBEnvWorkloadType)
err = component.GetAllComponent(configDir) // find all builtin config file and read
if err != nil { // Handle errors reading the config file
panic(fmt.Errorf("fatal error config file: %v", err))
}

err = probe.RegisterBuiltin() // register all builtin component
err = probe2.RegisterBuiltin(characterType) // register all builtin component
if err != nil {
panic(fmt.Errorf("fatal error register builtin: %v", err))
}

http.HandleFunc("/", probe.SetMiddleware(probe.GetRouter()))
// start http server for lorry client
http.HandleFunc("/", probe2.SetMiddleware(probe2.GetRouter()))
go func() {
addr := fmt.Sprintf(":%d", port)
err := http.ListenAndServe(addr, nil)
Expand All @@ -91,9 +105,22 @@ func main() {
}
}()

// start grpc server for role probe
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
panic(fmt.Errorf("fatal error listen on port %d: %v", grpcPort, err))
}

healthServer := customgrpc.NewGRPCServer()
server := grpc.NewServer()
health.RegisterHealthServer(server, healthServer)

err = server.Serve(listen)
if err != nil {
panic(fmt.Errorf("fatal error grpcserver serve failed: %v", err))
}

// HA depends on dbmanager, which is initialized by rt.Run
characterType := viper.GetString(constant.KBEnvCharacterType)
workloadType := viper.GetString(constant.KBEnvWorkloadType)
logHa := ctrl.Log.WithName("HA")
ha := highavailability.NewHa(logHa)
if util.IsHAAvailable(characterType, workloadType) {
Expand Down
43 changes: 0 additions & 43 deletions docker/Dockerfile-probe

This file was deleted.

9 changes: 9 additions & 0 deletions docker/Dockerfile-tools
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ RUN --mount=type=bind,target=. \
--mount=type=cache,target=/go/pkg/mod \
CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -ldflags="${LD_FLAGS}" -tags="containers_image_openpgp" -a -o /out/kbcli cmd/cli/main.go

RUN GRPC_HEALTH_PROBE_VERSION=v0.4.13 GOOS=${TARGETOS} GOARCH=${TARGETARCH} && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-${GOOS}-${GOARCH}


# The alpine tag 20230329 corresponds to the "edge" tag (latest release to date is 3.18) as of 20230625
FROM docker.io/alpine:edge as dist
ARG APK_MIRROR
Expand All @@ -97,12 +101,17 @@ COPY --from=builder /out/config_render /bin
COPY --from=builder /out/lorry /bin
COPY --from=builder /out/lorryctl /bin
COPY --from=builder /out/kbcli /bin
COPY --from=builder /bin/grpc_health_probe /bin

# keep the old "probe" paths working so the lorry rename is not a breaking change
RUN ln -s /bin/lorry /bin/probe
RUN ln -s /config/lorry /config/probe

# enable grpc_health_probe binary
RUN chmod +x /bin/grpc_health_probe

# mkdir kbcli config dir and helm cache dir.
RUN mkdir /.kbcli && chown -R 65532:65532 /.kbcli \
&& mkdir /.cache && chown -R 65532:65532 /.cache
USER 65532:65532

4 changes: 3 additions & 1 deletion internal/controller/component/probe_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ func buildLorryServiceContainer(component *SynthesizedComponent, container *core
container.Image = viper.GetString(constant.KBToolsImage)
container.ImagePullPolicy = corev1.PullPolicy(viper.GetString(constant.KBImagePullPolicy))
container.Command = []string{"lorry",
"--port", strconv.Itoa(probeSvcHTTPPort)}
"--port", strconv.Itoa(probeSvcHTTPPort),
"--grpcport", strconv.Itoa(probeSvcGRPCPort),
}

if len(component.PodSpec.Containers) > 0 {
mainContainer := component.PodSpec.Containers[0]
Expand Down
38 changes: 28 additions & 10 deletions internal/controller/rsm/transformer_object_generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat
if probeDaemonPort == 0 {
probeDaemonPort = defaultRoleProbeDaemonPort
}
probeGRPCPort := viper.GetInt("ROLE_PROBE_GRPC_PORT")
if probeGRPCPort == 0 {
probeGRPCPort = defaultRoleProbeGRPCPort
}
env := credentialEnv
env = append(env,
corev1.EnvVar{
Expand Down Expand Up @@ -489,9 +493,11 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat

readinessProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: roleProbeURI,
Port: intstr.FromInt(probeDaemonPort),
Exec: &corev1.ExecAction{
Command: []string{
grpcHealthProbeBinaryPath,
fmt.Sprintf(grpcHealthProbeArgsFormat, probeGRPCPort),
},
},
},
InitialDelaySeconds: roleProbe.InitialDelaySeconds,
Expand All @@ -517,12 +523,16 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat
}
return nil
}

// if role probe container exists, update the readiness probe, env and serving container port
if container := tryToGetRoleProbeContainer(); container != nil {
// presume the first port is the http port.
// presume the second port is the grpc port.
// this is an easily broken contract between rsm controller and cluster controller.
// TODO(free6om): design a better way to do this after Lorry-WeSyncer separation done
readinessProbe.HTTPGet.Port = intstr.FromInt(int(container.Ports[0].ContainerPort))
readinessProbe.Exec.Command = []string{
grpcHealthProbeBinaryPath,
fmt.Sprintf(grpcHealthProbeArgsFormat, int(container.Ports[1].ContainerPort)),
}
container.ReadinessProbe = readinessProbe
for _, e := range env {
if slices.IndexFunc(container.Env, func(v corev1.EnvVar) bool {
Expand All @@ -543,13 +553,21 @@ func injectRoleProbeAgentContainer(rsm workloads.ReplicatedStateMachine, templat
AddCommands([]string{
roleProbeBinaryName,
"--port", strconv.Itoa(probeDaemonPort),
"--grpcport", strconv.Itoa(probeGRPCPort),
}...).
AddEnv(env...).
AddPorts(corev1.ContainerPort{
ContainerPort: int32(probeDaemonPort),
Name: roleProbeContainerName,
Protocol: "TCP",
}).
AddPorts(
corev1.ContainerPort{
ContainerPort: int32(probeDaemonPort),
Name: roleProbeContainerName,
Protocol: "TCP",
},
corev1.ContainerPort{
ContainerPort: int32(probeGRPCPort),
Name: roleProbeGRPCPortName,
Protocol: "TCP",
},
).
SetReadinessProbe(*readinessProbe).
GetObject()

Expand Down
5 changes: 4 additions & 1 deletion internal/controller/rsm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ const (
shell2httpServePath = "/role"
defaultRoleProbeAgentImage = "apecloud/kubeblocks-tools:latest"
defaultRoleProbeDaemonPort = 7373
roleProbeURI = "/v1.0/bindings/custom?operation=checkRole"
defaultRoleProbeGRPCPort = 50001
roleProbeGRPCPortName = "probe-grpc-port"
grpcHealthProbeBinaryPath = "/bin/grpc_health_probe"
grpcHealthProbeArgsFormat = "-addr=:%d"
defaultActionImage = "busybox:latest"
usernameCredentialVarName = "KB_RSM_USERNAME"
passwordCredentialVarName = "KB_RSM_PASSWORD"
Expand Down
2 changes: 1 addition & 1 deletion lorry/binding/custom/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type HTTPCustom struct {
BaseOperations
}

var perNodeRegx = regexp.MustCompile("[a-zA-Z0-9]+")
var perNodeRegx = regexp.MustCompile("^[^,]*$")

// NewHTTPCustom returns a new HTTPCustom.
func NewHTTPCustom() *HTTPCustom {
Expand Down
1 change: 1 addition & 0 deletions lorry/binding/mongodb/mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestGetRole(t *testing.T) {
primitive.E{Key: "_id", Value: 0},
primitive.E{Key: "state", Value: 1},
primitive.E{Key: "stateStr", Value: "PRIMARY"},
primitive.E{Key: "self", Value: true},
},
}},
})
Expand Down
8 changes: 6 additions & 2 deletions lorry/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/apecloud/kubeblocks/internal/constant"
testapps "github.com/apecloud/kubeblocks/internal/testutil/apps"
viper "github.com/apecloud/kubeblocks/internal/viperx"
. "github.com/apecloud/kubeblocks/lorry/util"
)

Expand Down Expand Up @@ -96,6 +97,8 @@ func TestGetRole(t *testing.T) {
portStr := addr[index+1:]
port, _ := strconv.Atoi(portStr)

viper.Set(constant.KBToolsImage, "lorry")

cli, closer, err := initSQLChannelClient(port, t)
if err != nil {
t.Errorf("new sql channel client error: %v", err)
Expand All @@ -114,7 +117,7 @@ func TestGetRole(t *testing.T) {
})

t.Run("ResponseTimeout", func(t *testing.T) {
cli.ReconcileTimeout = 0 * time.Millisecond
cli.ReconcileTimeout = 0
_, err := cli.GetRole()
t.Logf("err: %v", err)
if err == nil {
Expand Down Expand Up @@ -311,7 +314,8 @@ func initSQLChannelClient(httpPort int, t *testing.T) (*OperationClient, func(),
port, closer := newTCPServer(t, 50001)
podName := "pod-for-sqlchannel-test"
pod := testapps.NewPodFactory("default", podName).
AddContainer(corev1.Container{Name: testapps.DefaultNginxContainerName, Image: testapps.NginxImage}).GetObject()
AddContainer(corev1.Container{Name: viper.GetString(constant.KBToolsImage), Image: viper.GetString(constant.KBToolsImage)}).
GetObject()
pod.Spec.Containers[0].Ports = []corev1.ContainerPort{
{
ContainerPort: int32(httpPort),
Expand Down
Loading

0 comments on commit c72e3cd

Please sign in to comment.