diff --git a/.env b/.env index d3ae0426a9..1e7b1e11a7 100644 --- a/.env +++ b/.env @@ -4,7 +4,7 @@ REDIS_IMAGE=redis:7.0.0 ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8 KAFKA_IMAGE=bitnami/kafka:3.5.1 MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z - +ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13 OPENIM_WEB_FRONT_IMAGE=openim/openim-web-front:release-v3.5.1 OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.7 diff --git a/config/discovery.yml b/config/discovery.yml new file mode 100644 index 0000000000..3d96ff9b66 --- /dev/null +++ b/config/discovery.yml @@ -0,0 +1,13 @@ +enable: "etcd" +etcd: + rootDirectory: openim + address: [ localhost:12379 ] + username: '' + password: '' + +zookeeper: + schema: openim + address: [ localhost:12181 ] + username: '' + password: '' + diff --git a/config/share.yml b/config/share.yml index 2abbb77a0f..fc97b6a1ff 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,5 +1,4 @@ secret: openIM123 -env: zookeeper rpcRegisterName: user: user friend: friend diff --git a/config/zookeeper.yml b/config/zookeeper.yml deleted file mode 100644 index 33f52d7ca7..0000000000 --- a/config/zookeeper.yml +++ /dev/null @@ -1,6 +0,0 @@ - -schema: openim -address: [ localhost:12181 ] -username: '' -password: '' - diff --git a/docker-compose.yml b/docker-compose.yml index aeb53a4170..d72c1a2fa4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -58,6 +58,26 @@ services: networks: - openim + etcd: + image: "${ETCD_IMAGE}" + container_name: etcd + ports: + - "12379:2379" + - "12380:2380" + environment: + - ETCD_NAME=s1 + - ETCD_DATA_DIR=/etcd-data + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://0.0.0.0:2380 + - ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380 + - ETCD_INITIAL_CLUSTER_TOKEN=tkn + - ETCD_INITIAL_CLUSTER_STATE=new + restart: always + networks: + - openim + kafka: image: "${KAFKA_IMAGE}" container_name: kafka diff --git a/go.mod b/go.mod index b522065da4..e9777eaa8b 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.2 + github.com/openimsdk/tools v0.0.49-alpha.18 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -44,7 +44,6 @@ require ( golang.org/x/sync v0.6.0 ) - require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.3 // indirect @@ -59,6 +58,8 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/clbanning/mxj v1.8.4 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -75,7 +76,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-zookeeper/zk v1.0.3 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/s2a-go v0.1.7 // indirect @@ -138,6 +139,9 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/etcd/api/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect diff --git a/go.sum b/go.sum index e2ecf7284e..3acb4709c6 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -111,6 +115,7 @@ github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= @@ -132,8 +137,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -283,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws= -github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= +github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78= +github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -378,6 +383,12 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4= +go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c= +go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg= +go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8= +go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js= +go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= diff --git a/internal/api/init.go b/internal/api/init.go index 6e784da9a9..b49a145696 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -38,20 +38,17 @@ import ( ) type Config struct { - RpcConfig config.API - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - MinioConfig config.Minio + API config.API + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { - apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) + apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index) if err != nil { return err } - prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) + prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index) if err != nil { return err } @@ -59,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error { var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } @@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error { ) router := newGinRouter(client, config) - if config.RpcConfig.Prometheus.Enable { + if config.API.Prometheus.Enable { go func() { p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) @@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error { }() } - address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort)) + address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) diff --git a/internal/api/router.go b/internal/api/router.go index bd2de99db7..1fbb33b092 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -34,7 +34,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg) conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation) authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) - thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL) + thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index af869dd850..0581a025b4 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -286,6 +286,7 @@ func (c *Client) KickOnlineMessage() error { resp := Resp{ ReqIdentifier: WSKickOnlineMsg, } + log.ZDebug(c.ctx, "KickOnlineMessage debug ") err := c.writeBinaryMsg(resp) c.close() return err diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index bfe81b6024..f9bb699ed9 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover } func (s *Server) Start(ctx context.Context, index int, conf *Config) error { - return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, + return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, conf.MsgGateway.RPC.RegisterIP, conf.MsgGateway.RPC.Ports, index, conf.Share.RpcRegisterName.MessageGateway, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 727ade0afd..ef24d1bf93 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -24,10 +24,10 @@ import ( ) type Config struct { - MsgGateway config.MsgGateway - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + MsgGateway config.MsgGateway + Share config.Share + WebhooksConfig config.Webhooks + Discovery config.Discovery } // Start run ws server. diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index cf607d4707..defec16df1 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -211,7 +211,8 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C // Online push user online message to other node for _, v := range conns { - v := v // safe closure var + v := v + log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target()) if v.Target() == ws.disCov.GetSelfConnTarget() { log.ZDebug(ctx, "Filter out this node", "node", v.Target()) continue @@ -267,7 +268,9 @@ func (ws *WsServer) registerClient(client *Client) { } wg := sync.WaitGroup{} - if ws.msgGatewayConfig.Share.Env == "zookeeper" { + log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", ws.msgGatewayConfig.Discovery.Enable) + + if ws.msgGatewayConfig.Discovery.Enable != "k8s" { wg.Add(1) go func() { defer wg.Done() diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 68d953e902..3384b84937 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -56,13 +56,13 @@ type MsgTransfer struct { } type Config struct { - MsgTransfer config.MsgTransfer - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + MsgTransfer config.MsgTransfer + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + Share config.Share + WebhooksConfig config.Webhooks + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { @@ -76,7 +76,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return err } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8691e92ab6..df2660804d 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -83,7 +83,7 @@ type OnlineHistoryRedisConsumerHandler struct { func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true) if err != nil { return nil, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 978302e768..c9c0358935 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) if err != nil { return nil, err } diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 30bdf3e2ed..a61399fb6b 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -12,11 +12,6 @@ import ( "sync" ) -const ( - KUBERNETES = "k8s" - ZOOKEEPER = "zookeeper" -) - type OnlinePusher interface { GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) @@ -42,10 +37,12 @@ func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * } func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher { - switch config.Share.Env { - case KUBERNETES: + switch config.Discovery.Enable { + case "k8s": return NewK8sStaticConsistentHash(disCov, config) - case ZOOKEEPER: + case "zookeeper": + return NewDefaultAllNode(disCov, config) + case "etcd": return NewDefaultAllNode(disCov, config) default: return newEmptyOnlinePUsher() @@ -64,7 +61,12 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway) - log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) + if len(conns) == 0 { + log.ZWarn(ctx, "get gateway conn 0 ", nil) + } else { + log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) + } + if err != nil { return nil, err } @@ -85,10 +87,12 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M // Online push message for _, conn := range conns { conn := conn // loop var safe + ctx := ctx wg.Go(func() error { msgClient := msggateway.NewMsgGatewayClient(conn) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) if err != nil { + log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ", err, "req:", input.String()) return nil } diff --git a/internal/push/push.go b/internal/push/push.go index 18012a8646..2e5c4e5265 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -24,11 +24,11 @@ type Config struct { RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 3a9a696f64..bf0ede375c 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -60,7 +60,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic}) + []string{config.KafkaConfig.ToPushTopic}, true) if err != nil { return nil, err } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index c6d236b21b..ddb655398d 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -45,10 +45,10 @@ type authServer struct { } type Config struct { - RpcConfig config.Auth - RedisConfig config.Redis - ZookeeperConfig config.ZooKeeper - Share config.Share + RpcConfig config.Auth + RedisConfig config.Redis + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 96d2a403f6..4c7828610f 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -51,10 +51,10 @@ type Config struct { RpcConfig config.Conversation RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index bffda3c04e..b49490f264 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -50,14 +50,15 @@ type friendServer struct { } type Config struct { - RpcConfig config.Friend - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper + RpcConfig config.Friend + RedisConfig config.Redis + MongodbConfig config.Mongo + //ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 13bd7f9beb..551554c236 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -68,11 +68,11 @@ type Config struct { RpcConfig config.Group RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 3f4df8d4b7..5d7c0b297b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -59,11 +59,11 @@ type ( RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } ) diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 9bf8cafa9e..a3d9085d31 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -46,11 +46,11 @@ type Config struct { RpcConfig config.Third RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share MinioConfig config.Minio LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index c453ac9f85..a28fa24e28 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -61,11 +61,11 @@ type Config struct { RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 20baeffaf0..bf037b694b 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -33,9 +33,9 @@ import ( ) type CronTaskConfig struct { - CronTask config.CronTask - ZookeeperConfig config.ZooKeeper - Share config.Share + CronTask config.CronTask + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, config *CronTaskConfig) error { @@ -43,7 +43,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 022fb10978..ecdb0dd3ad 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -33,9 +33,9 @@ func NewApiCmd() *ApiCmd { var apiConfig api.Config ret := &ApiCmd{apiConfig: &apiConfig} ret.configMap = map[string]any{ - OpenIMAPICfgFileName: &apiConfig.RpcConfig, - ZookeeperConfigFileName: &apiConfig.ZookeeperConfig, + OpenIMAPICfgFileName: &apiConfig.API, ShareFileName: &apiConfig.Share, + DiscoveryConfigFilename: &apiConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 5ed02ffd00..7d75a7da65 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -36,8 +36,8 @@ func NewAuthRpcCmd() *AuthRpcCmd { ret.configMap = map[string]any{ OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig, RedisConfigFileName: &authConfig.RedisConfig, - ZookeeperConfigFileName: &authConfig.ZookeeperConfig, ShareFileName: &authConfig.Share, + DiscoveryConfigFilename: &authConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -53,7 +53,7 @@ func (a *AuthRpcCmd) Exec() error { } func (a *AuthRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) } diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 55eb4a069f..45dbcafda2 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -26,7 +26,6 @@ var ( LocalCacheConfigFileName string KafkaConfigFileName string RedisConfigFileName string - ZookeeperConfigFileName string MongodbConfigFileName string MinioConfigFileName string LogConfigFileName string @@ -42,6 +41,7 @@ var ( OpenIMRPCMsgCfgFileName string OpenIMRPCThirdCfgFileName string OpenIMRPCUserCfgFileName string + DiscoveryConfigFilename string ) var ConfigEnvPrefixMap map[string]string @@ -54,7 +54,6 @@ func init() { LocalCacheConfigFileName = "local-cache.yml" KafkaConfigFileName = "kafka.yml" RedisConfigFileName = "redis.yml" - ZookeeperConfigFileName = "zookeeper.yml" MongodbConfigFileName = "mongodb.yml" MinioConfigFileName = "minio.yml" LogConfigFileName = "log.yml" @@ -70,16 +69,17 @@ func init() { OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" + DiscoveryConfigFilename = "discovery.yml" ConfigEnvPrefixMap = make(map[string]string) fileNames := []string{ FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName, - KafkaConfigFileName, RedisConfigFileName, ZookeeperConfigFileName, + KafkaConfigFileName, RedisConfigFileName, MongodbConfigFileName, MinioConfigFileName, LogConfigFileName, OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName, OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName, OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName, - OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, + OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename, } for _, fileName := range fileNames { diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 0a617c7296..57ffa52bc4 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -36,11 +36,11 @@ func NewConversationRpcCmd() *ConversationRpcCmd { ret.configMap = map[string]any{ OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig, RedisConfigFileName: &conversationConfig.RedisConfig, - ZookeeperConfigFileName: &conversationConfig.ZookeeperConfig, MongodbConfigFileName: &conversationConfig.MongodbConfig, ShareFileName: &conversationConfig.Share, NotificationFileName: &conversationConfig.NotificationConfig, LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, + DiscoveryConfigFilename: &conversationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -55,7 +55,7 @@ func (a *ConversationRpcCmd) Exec() error { } func (a *ConversationRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index be26f5af36..fd44475246 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -34,8 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd { ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, - ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig, ShareFileName: &cronTaskConfig.Share, + DiscoveryConfigFilename: &cronTaskConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index b8d46f77ef..8be1f77458 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -36,12 +36,12 @@ func NewFriendRpcCmd() *FriendRpcCmd { ret.configMap = map[string]any{ OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig, RedisConfigFileName: &friendConfig.RedisConfig, - ZookeeperConfigFileName: &friendConfig.ZookeeperConfig, MongodbConfigFileName: &friendConfig.MongodbConfig, ShareFileName: &friendConfig.Share, NotificationFileName: &friendConfig.NotificationConfig, WebhooksConfigFileName: &friendConfig.WebhooksConfig, LocalCacheConfigFileName: &friendConfig.LocalCacheConfig, + DiscoveryConfigFilename: &friendConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports, a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 8bf9778249..f158b8c626 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -36,12 +36,12 @@ func NewGroupRpcCmd() *GroupRpcCmd { ret.configMap = map[string]any{ OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig, RedisConfigFileName: &groupConfig.RedisConfig, - ZookeeperConfigFileName: &groupConfig.ZookeeperConfig, MongodbConfigFileName: &groupConfig.MongodbConfig, ShareFileName: &groupConfig.Share, NotificationFileName: &groupConfig.NotificationConfig, WebhooksConfigFileName: &groupConfig.WebhooksConfig, LocalCacheConfigFileName: &groupConfig.LocalCacheConfig, + DiscoveryConfigFilename: &groupConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *GroupRpcCmd) Exec() error { } func (a *GroupRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index a3b521b4b7..91f7931fbe 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -36,13 +36,13 @@ func NewMsgRpcCmd() *MsgRpcCmd { ret.configMap = map[string]any{ OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig, RedisConfigFileName: &msgConfig.RedisConfig, - ZookeeperConfigFileName: &msgConfig.ZookeeperConfig, MongodbConfigFileName: &msgConfig.MongodbConfig, KafkaConfigFileName: &msgConfig.KafkaConfig, ShareFileName: &msgConfig.Share, NotificationFileName: &msgConfig.NotificationConfig, WebhooksConfigFileName: &msgConfig.WebhooksConfig, LocalCacheConfigFileName: &msgConfig.LocalCacheConfig, + DiscoveryConfigFilename: &msgConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *MsgRpcCmd) Exec() error { } func (a *MsgRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 897fd70087..78004094c7 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -36,9 +36,9 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig} ret.configMap = map[string]any{ OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, - ZookeeperConfigFileName: &msgGatewayConfig.ZookeeperConfig, ShareFileName: &msgGatewayConfig.Share, WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, + DiscoveryConfigFilename: &msgGatewayConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 86f42dc56f..0d48281e50 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -37,9 +37,9 @@ func NewMsgTransferCmd() *MsgTransferCmd { RedisConfigFileName: &msgTransferConfig.RedisConfig, MongodbConfigFileName: &msgTransferConfig.MongodbConfig, KafkaConfigFileName: &msgTransferConfig.KafkaConfig, - ZookeeperConfigFileName: &msgTransferConfig.ZookeeperConfig, ShareFileName: &msgTransferConfig.Share, WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig, + DiscoveryConfigFilename: &msgTransferConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 0140ced237..3e7c4c2492 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -36,13 +36,13 @@ func NewPushRpcCmd() *PushRpcCmd { ret.configMap = map[string]any{ OpenIMPushCfgFileName: &pushConfig.RpcConfig, RedisConfigFileName: &pushConfig.RedisConfig, - ZookeeperConfigFileName: &pushConfig.ZookeeperConfig, MongodbConfigFileName: &pushConfig.MongodbConfig, KafkaConfigFileName: &pushConfig.KafkaConfig, ShareFileName: &pushConfig.Share, NotificationFileName: &pushConfig.NotificationConfig, WebhooksConfigFileName: &pushConfig.WebhooksConfig, LocalCacheConfigFileName: &pushConfig.LocalCacheConfig, + DiscoveryConfigFilename: &pushConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *PushRpcCmd) Exec() error { } func (a *PushRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 0dfa7d5be5..b6731f1ff4 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -36,12 +36,12 @@ func NewThirdRpcCmd() *ThirdRpcCmd { ret.configMap = map[string]any{ OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig, RedisConfigFileName: &thirdConfig.RedisConfig, - ZookeeperConfigFileName: &thirdConfig.ZookeeperConfig, MongodbConfigFileName: &thirdConfig.MongodbConfig, ShareFileName: &thirdConfig.Share, NotificationFileName: &thirdConfig.NotificationConfig, MinioConfigFileName: &thirdConfig.MinioConfig, LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig, + DiscoveryConfigFilename: &thirdConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *ThirdRpcCmd) Exec() error { } func (a *ThirdRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 315b93256c..674f9e3a6d 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -36,13 +36,13 @@ func NewUserRpcCmd() *UserRpcCmd { ret.configMap = map[string]any{ OpenIMRPCUserCfgFileName: &userConfig.RpcConfig, RedisConfigFileName: &userConfig.RedisConfig, - ZookeeperConfigFileName: &userConfig.ZookeeperConfig, MongodbConfigFileName: &userConfig.MongodbConfig, KafkaConfigFileName: &userConfig.KafkaConfig, ShareFileName: &userConfig.Share, NotificationFileName: &userConfig.NotificationConfig, WebhooksConfigFileName: &userConfig.WebhooksConfig, LocalCacheConfigFileName: &userConfig.LocalCacheConfig, + DiscoveryConfigFilename: &userConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *UserRpcCmd) Exec() error { } func (a *UserRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 24d04d8ccc..12c4f7f789 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -345,7 +345,6 @@ type AfterConfig struct { type Share struct { Secret string `mapstructure:"secret"` - Env string `mapstructure:"env"` RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` IMAdminUserID []string `mapstructure:"imAdminUserID"` } @@ -432,6 +431,19 @@ type ZooKeeper struct { Password string `mapstructure:"password"` } +type Discovery struct { + Enable string `mapstructure:"enable"` + Etcd Etcd `mapstructure:"etcd"` + ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` +} + +type Etcd struct { + RootDirectory string `mapstructure:"rootDirectory"` + Address []string `mapstructure:"address"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + func (m *Mongo) Build() *mongoutil.Config { return &mongoutil.Config{ Uri: m.URI, diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 38d7382fa6..559c937c10 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -18,36 +18,34 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" "time" ) -const ( - zookeeperConst = "zookeeper" - kubenetesConst = "k8s" - directConst = "direct" -) - // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { - switch share.Env { - case zookeeperConst: - +func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { + switch discovery.Enable { + case "zookeeper": return zookeeper.NewZkClient( - zookeeperConfig.Address, - zookeeperConfig.Schema, + discovery.ZooKeeper.Address, + discovery.ZooKeeper.Schema, zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password), + zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), ) - case kubenetesConst: + case "k8s": return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) - case directConst: - //return direct.NewConnDirect(config) + case "etcd": + return etcd.NewSvcDiscoveryRegistry( + discovery.Etcd.RootDirectory, + discovery.Etcd.Address, + etcd.WithDialTimeout(10*time.Second), + etcd.WithMaxCallSendMsgSize(20*1024*1024), + etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) default: - return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap() + return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap() } - return nil, nil } diff --git a/pkg/common/discoveryregister/etcd/doc.go b/pkg/common/discoveryregister/etcd/doc.go new file mode 100644 index 0000000000..1da7508a16 --- /dev/null +++ b/pkg/common/discoveryregister/etcd/doc.go @@ -0,0 +1,15 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go deleted file mode 100644 index 1d11414b67..0000000000 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package zookeeper - -import ( - "os" - "strings" -) - -// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. -func getEnv(key, fallback string) string { - if value, exists := os.LookupEnv(key); exists { - return value - } - return fallback -} - -// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables. -// If the environment variables are not set, it returns the fallback value. -func getZkAddrFromEnv(fallback []string) []string { - address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS") - port, portExists := os.LookupEnv("ZOOKEEPER_PORT") - - if addrExists && portExists { - addresses := strings.Split(address, ",") - for i, addr := range addresses { - addresses[i] = addr + ":" + port - } - return addresses - } - return fallback -} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index ebcd5aa7cc..a36bcfe1c8 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -44,7 +44,7 @@ import ( ) // Start rpc server. -func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP, +func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusConfig *config2.Prometheus, listenIP, registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome } defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share) + client, err := kdisc.NewDiscoveryRegister(discovery, share) if err != nil { return err } diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 7fe64d3c5b..5fa84ac36f 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" @@ -43,6 +44,14 @@ func CheckZookeeper(ctx context.Context, config *config.ZooKeeper) error { return zookeeper.Check(ctx, config.Address, config.Schema, zookeeper.WithUserNameAndPassword(config.Username, config.Password)) } +func CheckEtcd(ctx context.Context, config *config.Etcd) error { + return etcd.Check(ctx, config.Address, "/check_openim_component", + true, + etcd.WithDialTimeout(10*time.Second), + etcd.WithMaxCallSendMsgSize(20*1024*1024), + etcd.WithUsernameAndPassword(config.Username, config.Password)) +} + func CheckMongo(ctx context.Context, config *config.Mongo) error { return mongoutil.Check(ctx, config.Build()) } @@ -59,14 +68,14 @@ func CheckKafka(ctx context.Context, conf *config.Kafka) error { return kafka.Check(ctx, conf.Build(), []string{conf.ToMongoTopic, conf.ToRedisTopic, conf.ToPushTopic}) } -func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.ZooKeeper, error) { +func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.Discovery, error) { var ( - mongoConfig = &config.Mongo{} - redisConfig = &config.Redis{} - kafkaConfig = &config.Kafka{} - minioConfig = &config.Minio{} - zookeeperConfig = &config.ZooKeeper{} - thirdConfig = &config.Third{} + mongoConfig = &config.Mongo{} + redisConfig = &config.Redis{} + kafkaConfig = &config.Kafka{} + minioConfig = &config.Minio{} + discovery = &config.Discovery{} + thirdConfig = &config.Third{} ) err := config.LoadConfig(filepath.Join(configDir, cmd.MongodbConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], mongoConfig) if err != nil { @@ -96,11 +105,11 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, } else { minioConfig = nil } - err = config.LoadConfig(filepath.Join(configDir, cmd.ZookeeperConfigFileName), cmd.ConfigEnvPrefixMap[cmd.ZookeeperConfigFileName], zookeeperConfig) + err = config.LoadConfig(filepath.Join(configDir, cmd.DiscoveryConfigFilename), cmd.ConfigEnvPrefixMap[cmd.DiscoveryConfigFilename], discovery) if err != nil { return nil, nil, nil, nil, nil, err } - return mongoConfig, redisConfig, kafkaConfig, minioConfig, zookeeperConfig, nil + return mongoConfig, redisConfig, kafkaConfig, minioConfig, discovery, nil } func main() { @@ -127,35 +136,40 @@ func main() { } } -func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, zookeeperConfig *config.ZooKeeper, maxRetry int) error { +func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, discovery *config.Discovery, maxRetry int) error { checksDone := make(map[string]bool) - checks := map[string]func() error{ - "Zookeeper": func() error { - return CheckZookeeper(ctx, zookeeperConfig) - }, - "Mongo": func() error { + checks := map[string]func(ctx context.Context) error{ + "Mongo": func(ctx context.Context) error { return CheckMongo(ctx, mongoConfig) }, - "Redis": func() error { + "Redis": func(ctx context.Context) error { return CheckRedis(ctx, redisConfig) }, - "Kafka": func() error { + "Kafka": func(ctx context.Context) error { return CheckKafka(ctx, kafkaConfig) }, } - if minioConfig != nil { - checks["MinIO"] = func() error { + checks["MinIO"] = func(ctx context.Context) error { return CheckMinIO(ctx, minioConfig) } } + if discovery.Enable == "etcd" { + checks["Etcd"] = func(ctx context.Context) error { + return CheckEtcd(ctx, &discovery.Etcd) + } + } else if discovery.Enable == "zookeeper" { + checks["Zookeeper"] = func(ctx context.Context) error { + return CheckZookeeper(ctx, &discovery.ZooKeeper) + } + } for i := 0; i < maxRetry; i++ { allSuccess := true for name, check := range checks { if !checksDone[name] { - if err := check(); err != nil { + if err := check(ctx); err != nil { fmt.Printf("%s check failed: %v\n", name, err) allSuccess = false } else {