Skip to content

Commit

Permalink
Etcd naming and discovery (#2300)
Browse files Browse the repository at this point in the history
* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism

* Add etcd as a service discovery mechanism
  • Loading branch information
skiffer-git authored May 14, 2024
1 parent 961fb47 commit 835ff38
Show file tree
Hide file tree
Showing 46 changed files with 224 additions and 182 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ REDIS_IMAGE=redis:7.0.0
ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8
KAFKA_IMAGE=bitnami/kafka:3.5.1
MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z

ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13

OPENIM_WEB_FRONT_IMAGE=openim/openim-web-front:release-v3.5.1
OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.7
Expand Down
13 changes: 13 additions & 0 deletions config/discovery.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
enable: "etcd"
etcd:
rootDirectory: openim
address: [ localhost:12379 ]
username: ''
password: ''

zookeeper:
schema: openim
address: [ localhost:12181 ]
username: ''
password: ''

1 change: 0 additions & 1 deletion config/share.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
secret: openIM123
env: zookeeper
rpcRegisterName:
user: user
friend: friend
Expand Down
6 changes: 0 additions & 6 deletions config/zookeeper.yml

This file was deleted.

20 changes: 20 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ services:
networks:
- openim

etcd:
image: "${ETCD_IMAGE}"
container_name: etcd
ports:
- "12379:2379"
- "12380:2380"
environment:
- ETCD_NAME=s1
- ETCD_DATA_DIR=/etcd-data
- ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
- ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379
- ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
- ETCD_INITIAL_ADVERTISE_PEER_URLS=http://0.0.0.0:2380
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
- ETCD_INITIAL_CLUSTER_TOKEN=tkn
- ETCD_INITIAL_CLUSTER_STATE=new
restart: always
networks:
- openim

kafka:
image: "${KAFKA_IMAGE}"
container_name: kafka
Expand Down
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.65
github.com/openimsdk/tools v0.0.49-alpha.2
github.com/openimsdk/tools v0.0.49-alpha.18
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -44,7 +44,6 @@ require (
golang.org/x/sync v0.6.0
)


require (
cloud.google.com/go v0.112.0 // indirect
cloud.google.com/go/compute v1.23.3 // indirect
Expand All @@ -59,6 +58,8 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand All @@ -75,7 +76,7 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-zookeeper/zk v1.0.3 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
Expand Down Expand Up @@ -138,6 +139,9 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.etcd.io/etcd/api/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
Expand Down
19 changes: 15 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -111,6 +115,7 @@ github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
Expand All @@ -132,8 +137,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -283,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws=
github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko=
github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78=
github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
Expand Down Expand Up @@ -378,6 +383,12 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4=
go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c=
go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg=
go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8=
go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js=
go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI=
go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80=
go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
Expand Down
19 changes: 8 additions & 11 deletions internal/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,25 @@ import (
)

type Config struct {
RpcConfig config.API
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
MinioConfig config.Minio
API config.API
Share config.Share
Discovery config.Discovery
}

func Start(ctx context.Context, index int, config *Config) error {
apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index)
apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index)
if err != nil {
return err
}
prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index)
prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index)
if err != nil {
return err
}

var client discovery.SvcDiscoveryRegistry

// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
Expand All @@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error {
)

router := newGinRouter(client, config)
if config.RpcConfig.Prometheus.Enable {
if config.API.Prometheus.Enable {
go func() {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
Expand All @@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error {
}()

}
address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort))
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))

server := http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
Expand Down
2 changes: 1 addition & 1 deletion internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg)
conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation)
authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL)
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL)

u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
Expand Down
1 change: 1 addition & 0 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (c *Client) KickOnlineMessage() error {
resp := Resp{
ReqIdentifier: WSKickOnlineMsg,
}
log.ZDebug(c.ctx, "KickOnlineMessage debug ")
err := c.writeBinaryMsg(resp)
c.close()
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover
}

func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP,
conf.MsgGateway.RPC.RegisterIP,
conf.MsgGateway.RPC.Ports, index,
conf.Share.RpcRegisterName.MessageGateway,
Expand Down
8 changes: 4 additions & 4 deletions internal/msggateway/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

type Config struct {
MsgGateway config.MsgGateway
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
MsgGateway config.MsgGateway
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
}

// Start run ws server.
Expand Down
7 changes: 5 additions & 2 deletions internal/msggateway/n_ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C

// Online push user online message to other node
for _, v := range conns {
v := v // safe closure var
v := v
log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target())
if v.Target() == ws.disCov.GetSelfConnTarget() {
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
continue
Expand Down Expand Up @@ -267,7 +268,9 @@ func (ws *WsServer) registerClient(client *Client) {
}

wg := sync.WaitGroup{}
if ws.msgGatewayConfig.Share.Env == "zookeeper" {
log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", ws.msgGatewayConfig.Discovery.Enable)

if ws.msgGatewayConfig.Discovery.Enable != "k8s" {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
16 changes: 8 additions & 8 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ type MsgTransfer struct {
}

type Config struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
}

func Start(ctx context.Context, index int, config *Config) error {
Expand All @@ -76,7 +76,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type OnlineHistoryRedisConsumerHandler struct {

func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic})
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct {
}

func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic})
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true)
if err != nil {
return nil, err
}
Expand Down
22 changes: 13 additions & 9 deletions internal/push/onlinepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import (
"sync"
)

const (
KUBERNETES = "k8s"
ZOOKEEPER = "zookeeper"
)

type OnlinePusher interface {
GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error)
Expand All @@ -42,10 +37,12 @@ func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
}

func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
switch config.Share.Env {
case KUBERNETES:
switch config.Discovery.Enable {
case "k8s":
return NewK8sStaticConsistentHash(disCov, config)
case ZOOKEEPER:
case "zookeeper":
return NewDefaultAllNode(disCov, config)
case "etcd":
return NewDefaultAllNode(disCov, config)
default:
return newEmptyOnlinePUsher()
Expand All @@ -64,7 +61,12 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D
func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if len(conns) == 0 {
log.ZWarn(ctx, "get gateway conn 0 ", nil)
} else {
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
}

if err != nil {
return nil, err
}
Expand All @@ -85,10 +87,12 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M
// Online push message
for _, conn := range conns {
conn := conn // loop var safe
ctx := ctx
wg.Go(func() error {
msgClient := msggateway.NewMsgGatewayClient(conn)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
if err != nil {
log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ", err, "req:", input.String())
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ type Config struct {
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}

func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
Expand Down
Loading

0 comments on commit 835ff38

Please sign in to comment.