Skip to content

Commit

Permalink
fix call vs request
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Nov 21, 2023
1 parent 1311ed9 commit efdd7ae
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 606 deletions.
10 changes: 0 additions & 10 deletions cluster/clusterproviders/zk/log.go
Original file line number Diff line number Diff line change
@@ -1,11 +1 @@
package zk

import "github.com/asynkron/protoactor-go/log"

var plog = log.New(log.InfoLevel, "[CLU/ZK]")

// SetLogLevel sets the log level for the logger
// SetLogLevel is safe to be called concurrently
func SetLogLevel(level log.Level) {
plog.SetLevel(level)
}
60 changes: 29 additions & 31 deletions cluster/clusterproviders/zk/zk_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package zk
import (
"context"
"fmt"
"log/slog"
"net"
"strconv"
"strings"
"time"

"github.com/asynkron/protoactor-go/cluster"
"github.com/asynkron/protoactor-go/log"
"github.com/go-zookeeper/zk"
)

Expand Down Expand Up @@ -71,12 +71,10 @@ func New(endpoints []string, opts ...Option) (*Provider, error) {
}
conn, err := connectZk(endpoints, zkCfg.SessionTimeout, WithEventCallback(p.onEvent))
if err != nil {
plog.Error("connect zk fail", log.Error(err))
return nil, err
}
if auth := zkCfg.Auth; !auth.isEmpty() {
if err = conn.AddAuth(auth.Scheme, []byte(auth.Credential)); err != nil {
plog.Error("auth failure.", log.String("scheme", auth.Scheme), log.String("cred", auth.Credential), log.Error(err))
return nil, err
}
}
Expand Down Expand Up @@ -113,23 +111,23 @@ func (p *Provider) init(c *cluster.Cluster) error {

func (p *Provider) StartMember(c *cluster.Cluster) error {
if err := p.init(c); err != nil {
plog.Error("init fail " + err.Error())
p.cluster.Logger().Error("init fail " + err.Error())
return err
}

p.startRoleChangedNotifyLoop()

// register self
if err := p.registerService(); err != nil {
plog.Error("register service fail " + err.Error())
p.cluster.Logger().Error("register service fail " + err.Error())
return err
}
plog.Info("StartMember register service.", log.String("node", p.self.ID), log.String("seq", p.self.Meta[metaKeySeq]))
p.cluster.Logger().Info("StartMember register service.", slog.String("node", p.self.ID), slog.String("seq", p.self.Meta[metaKeySeq]))

// fetch member list
nodes, version, err := p.fetchNodes()
if err != nil {
plog.Error("fetch nodes fail " + err.Error())
p.cluster.Logger().Error("fetch nodes fail " + err.Error())
return err
}
// initialize members
Expand Down Expand Up @@ -163,7 +161,7 @@ func (p *Provider) Shutdown(graceful bool) error {
p.updateLeadership(nil)
err := p.deregisterService()
if err != nil {
plog.Error("deregisterMember", log.Error(err))
p.cluster.Logger().Error("deregisterMember", slog.Any("error", err))
return err
}
p.deregistered = true
Expand All @@ -178,19 +176,19 @@ func (p *Provider) getID() string {
func (p *Provider) registerService() error {
data, err := p.self.Serialize()
if err != nil {
plog.Error("registerService Serialize fail.", log.Error(err))
p.cluster.Logger().Error("registerService Serialize fail.", slog.Any("error", err))
return err
}

path, err := p.createEphemeralChildNode(data)
if err != nil {
plog.Error("createEphemeralChildNode fail.", log.String("node", p.clusterKey), log.Error(err))
p.cluster.Logger().Error("createEphemeralChildNode fail.", slog.String("node", p.clusterKey), slog.Any("error", err))
return err
}
p.fullpath = path
seq, _ := parseSeq(path)
p.self.SetMeta(metaKeySeq, intToStr(seq))
plog.Info("RegisterService.", log.String("id", p.self.ID), log.Int("seq", seq))
p.cluster.Logger().Info("RegisterService.", slog.String("id", p.self.ID), slog.Int("seq", seq))

return nil
}
Expand All @@ -201,7 +199,7 @@ func (p *Provider) createClusterNode(dir string) error {
}
exist, _, err := p.conn.Exists(dir)
if err != nil {
plog.Error("check exist of node fail", log.String("dir", dir), log.Error(err))
p.cluster.Logger().Error("check exist of node fail", slog.String("dir", dir), slog.Any("error", err))
return err
}
if exist {
Expand All @@ -211,7 +209,7 @@ func (p *Provider) createClusterNode(dir string) error {
return err
}
if _, err = p.conn.Create(dir, []byte{}, 0, zk.WorldACL(zk.PermAll)); err != nil {
plog.Error("create dir node fail", log.String("dir", dir), log.Error(err))
p.cluster.Logger().Error("create dir node fail", slog.String("dir", dir), slog.Any("error", err))
return err
}
return nil
Expand All @@ -229,7 +227,7 @@ func (p *Provider) deregisterService() error {
func (p *Provider) keepWatching(ctx context.Context, registerSelf bool) error {
evtChan, err := p.addWatcher(ctx, p.clusterKey)
if err != nil {
plog.Error("list children fail", log.String("node", p.clusterKey), log.Error(err))
p.cluster.Logger().Error("list children fail", slog.String("node", p.clusterKey), slog.Any("error", err))
return err
}

Expand All @@ -239,16 +237,16 @@ func (p *Provider) keepWatching(ctx context.Context, registerSelf bool) error {
func (p *Provider) addWatcher(ctx context.Context, clusterKey string) (<-chan zk.Event, error) {
_, stat, evtChan, err := p.conn.ChildrenW(clusterKey)
if err != nil {
plog.Error("list children fail", log.String("node", clusterKey), log.Error(err))
p.cluster.Logger().Error("list children fail", slog.String("node", clusterKey), slog.Any("error", err))
return nil, err
}

plog.Info("KeepWatching cluster.", log.String("cluster", clusterKey), log.Int("children", int(stat.NumChildren)))
p.cluster.Logger().Info("KeepWatching cluster.", slog.String("cluster", clusterKey), slog.Int("children", int(stat.NumChildren)))
if !p.isChildrenChanged(ctx, stat) {
return evtChan, nil
}

plog.Info("Chilren changed, wait 1 sec and watch again", log.Int("old_cversion", int(p.revision)), log.Int("new_revison", int(stat.Cversion)))
p.cluster.Logger().Info("Chilren changed, wait 1 sec and watch again", slog.Int("old_cversion", int(p.revision)), slog.Int("new_revison", int(stat.Cversion)))
time.Sleep(1 * time.Second)
nodes, version, err := p.fetchNodes()
if err != nil {
Expand All @@ -268,16 +266,16 @@ func (p *Provider) isChildrenChanged(ctx context.Context, stat *zk.Stat) bool {
func (p *Provider) _keepWatching(registerSelf bool, stream <-chan zk.Event) error {
event := <-stream
if err := event.Err; err != nil {
plog.Error("Failure watching service.", log.Error(err))
p.cluster.Logger().Error("Failure watching service.", slog.Any("error", err))
if registerSelf && p.clusterNotContainsSelfPath() {
plog.Info("Register info lost, register self again")
p.cluster.Logger().Info("Register info lost, register self again")
p.registerService()
}
return err
}
nodes, version, err := p.fetchNodes()
if err != nil {
plog.Error("Failure fetch nodes when watching service.", log.Error(err))
p.cluster.Logger().Error("Failure fetch nodes when watching service.", slog.Any("error", err))
return err
}
if !p.containSelf(nodes) && registerSelf {
Expand All @@ -288,7 +286,7 @@ func (p *Provider) _keepWatching(registerSelf bool, stream <-chan zk.Event) erro
// reload nodes
nodes, version, err = p.fetchNodes()
if err != nil {
plog.Error("Failure fetch nodes when watching service.", log.Error(err))
p.cluster.Logger().Error("Failure fetch nodes when watching service.", slog.Any("error", err))
return err
}
}
Expand Down Expand Up @@ -334,21 +332,21 @@ func (p *Provider) updateLeadership(ns []*Node) {
role = Leader
}
if role != p.role {
plog.Info("Role changed.", log.String("from", p.role.String()), log.String("to", role.String()))
p.cluster.Logger().Info("Role changed.", slog.String("from", p.role.String()), slog.String("to", role.String()))
p.role = role
p.roleChangedChan <- role
}
}

func (p *Provider) onEvent(evt zk.Event) {
plog.Debug("Zookeeper event.", log.String("type", evt.Type.String()), log.String("state", evt.State.String()), log.String("path", evt.Path))
p.cluster.Logger().Debug("Zookeeper event.", slog.String("type", evt.Type.String()), slog.String("state", evt.State.String()), slog.String("path", evt.Path))
if evt.Type != zk.EventSession {
return
}
switch evt.State {
case zk.StateConnecting, zk.StateDisconnected, zk.StateExpired:
if p.role == Leader {
plog.Info("Role changed.", log.String("from", Leader.String()), log.String("to", Follower.String()))
p.cluster.Logger().Info("Role changed.", slog.String("from", Leader.String()), slog.String("to", Follower.String()))
p.role = Follower
p.roleChangedChan <- Follower
}
Expand Down Expand Up @@ -379,7 +377,7 @@ func (p *Provider) startWatching(registerSelf bool) {
go func() {
for !p.shutdown {
if err := p.keepWatching(ctx, registerSelf); err != nil {
plog.Error("Failed to keepWatching.", log.Error(err))
p.cluster.Logger().Error("Failed to keepWatching.", slog.Any("error", err))
p.clusterError = err
}
}
Expand All @@ -394,7 +392,7 @@ func (p *Provider) GetHealthStatus() error {
func (p *Provider) fetchNodes() ([]*Node, int32, error) {
children, stat, err := p.conn.Children(p.clusterKey)
if err != nil {
plog.Error("FetchNodes fail.", log.String("node", p.clusterKey), log.Error(err))
p.cluster.Logger().Error("FetchNodes fail.", slog.String("node", p.clusterKey), slog.Any("error", err))
return nil, 0, err
}

Expand All @@ -403,21 +401,21 @@ func (p *Provider) fetchNodes() ([]*Node, int32, error) {
long := joinPath(p.clusterKey, short)
value, _, err := p.conn.Get(long)
if err != nil {
plog.Error("FetchNodes fail.", log.String("node", long), log.Error(err))
p.cluster.Logger().Error("FetchNodes fail.", slog.String("node", long), slog.Any("error", err))
return nil, stat.Cversion, err
}
n := Node{Meta: make(map[string]string)}
if err := n.Deserialize(value); err != nil {
plog.Error("FetchNodes Deserialize fail.", log.String("node", long), log.String("val", string(value)), log.Error(err))
p.cluster.Logger().Error("FetchNodes Deserialize fail.", slog.String("node", long), slog.String("val", string(value)), slog.Any("error", err))
return nil, stat.Cversion, err
}
seq, err := parseSeq(long)
if err != nil {
plog.Error("FetchNodes parse seq fail.", log.String("node", long), log.String("val", string(value)), log.Error(err))
p.cluster.Logger().Error("FetchNodes parse seq fail.", slog.String("node", long), slog.String("val", string(value)), slog.Any("error", err))
} else {
n.SetMeta(metaKeySeq, intToStr(seq))
}
plog.Info("FetchNodes new node.", log.String("id", n.ID), log.String("path", long), log.Int("seq", seq))
p.cluster.Logger().Info("FetchNodes new node.", slog.String("id", n.ID), slog.String("path", long), slog.Int("seq", seq))
nodes = append(nodes, &n)
}
return p.uniqNodes(nodes), stat.Cversion, nil
Expand Down Expand Up @@ -469,7 +467,7 @@ func (p *Provider) createClusterTopologyEvent() []*cluster.Member {

func (p *Provider) publishClusterTopologyEvent() {
res := p.createClusterTopologyEvent()
plog.Info("Update cluster.", log.Int("members", len(res)))
p.cluster.Logger().Info("Update cluster.", slog.Int("members", len(res)))
p.cluster.MemberList.UpdateClusterTopology(res)
}

Expand Down
Loading

0 comments on commit efdd7ae

Please sign in to comment.