Skip to content

Commit

Permalink
fix: zk add close to avoid zk block. (#1284)
Browse files Browse the repository at this point in the history
* fix: zk add close to avoid zk block.

* fix: zk add close to avoid zk block.

* fix: zk add close to avoid zk block.

* fix: zk add close to avoid zk block.
  • Loading branch information
FGadvancer authored Oct 26, 2023
1 parent cb03943 commit 38ab3e0
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require github.com/google/uuid v1.3.1
require (
github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.30
github.com/OpenIMSDK/tools v0.0.14
github.com/OpenIMSDK/tools v0.0.15
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.7.1
Expand Down
11 changes: 11 additions & 0 deletions internal/rpc/msg/sync_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (m *msgServer) PullMessageBySeqs(
case sdkws.PullOrder_PullOrderDesc:
isEnd = seq.Begin <= minSeq
}
if len(msgs) == 0 {
log.ZWarn(ctx, "not have msgs", nil, "conversationID", seq.ConversationID, "seq", seq)

continue
}
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs, IsEnd: isEnd}
} else {
var seqs []int64
Expand All @@ -71,6 +76,7 @@ func (m *msgServer) PullMessageBySeqs(
minSeq, maxSeq, notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, seq.ConversationID, seqs)
if err != nil {
log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq)

continue
}
var isEnd bool
Expand All @@ -80,6 +86,11 @@ func (m *msgServer) PullMessageBySeqs(
case sdkws.PullOrder_PullOrderDesc:
isEnd = seq.Begin <= minSeq
}
if len(notificationMsgs) == 0 {
log.ZWarn(ctx, "not have notificationMsgs", nil, "conversationID", seq.ConversationID, "seq", seq)

continue
}
resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs, IsEnd: isEnd}
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/discovery_register/k8s_discovery_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,6 @@ func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
func (cli *K8sDR) Close() {
return
}
17 changes: 6 additions & 11 deletions pkg/common/startrpc/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,12 @@ func Start(
return err
}
defer listener.Close()
zkClient, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
/*
zkClient, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
zookeeper.WithFreq(time.Hour), zookeeper.WithUserNameAndPassword(
config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/if err != nil {
client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil {
return utils.Wrap1(err)
}
// defer zkClient.CloseZK()
zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP)
if err != nil {
return err
Expand All @@ -81,11 +76,11 @@ func Start(
}
srv := grpc.NewServer(options...)
defer srv.GracefulStop()
err = rpcFn(zkClient, srv)
err = rpcFn(client, srv)
if err != nil {
return utils.Wrap1(err)
}
err = zkClient.Register(
err = client.Register(
rpcRegisterName,
registerIP,
rpcPort,
Expand Down

0 comments on commit 38ab3e0

Please sign in to comment.