Skip to content

Commit

Permalink
perf: reduce register client latency (#1312)
Browse files Browse the repository at this point in the history
Signed-off-by: rfyiamcool <[email protected]>
  • Loading branch information
rfyiamcool authored Nov 2, 2023
1 parent cc1f773 commit 1664579
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions internal/msggateway/n_ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type WsServer struct {
kickHandlerChan chan *kickHandler
clients *UserMap
clientPool sync.Pool
onlineUserNum int64
onlineUserConnNum int64
onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64
handshakeTimeout time.Duration
hubServer *Server
validate *validator.Validate
Expand Down Expand Up @@ -220,8 +220,8 @@ func (ws *WsServer) registerClient(client *Client) {
if !userOK {
ws.clients.Set(client.UserID, client)
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
ws.onlineUserNum.Add(1)
ws.onlineUserConnNum.Add(1)
} else {
i := &kickHandler{
clientOK: clientOK,
Expand All @@ -234,22 +234,35 @@ func (ws *WsServer) registerClient(client *Client) {
ws.clients.Set(client.UserID, client)
// 已经有同平台的连接存在
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
atomic.AddInt64(&ws.onlineUserConnNum, 1)
ws.onlineUserConnNum.Add(1)
} else {
ws.clients.Set(client.UserID, client)

atomic.AddInt64(&ws.onlineUserConnNum, 1)
ws.onlineUserConnNum.Add(1)
}
}
ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()

wg.Add(1)
go func() {
defer wg.Done()
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
}()

wg.Wait()

log.ZInfo(
client.ctx,
"user online",
"online user Num",
ws.onlineUserNum,
ws.onlineUserNum.Load(),
"online user conn Num",
ws.onlineUserConnNum,
ws.onlineUserConnNum.Load(),
)
}

Expand Down Expand Up @@ -282,7 +295,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
if clientOK {
isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients)
if isDeleteUser {
atomic.AddInt64(&ws.onlineUserNum, -1)
ws.onlineUserNum.Add(-1)
}
for _, c := range oldClients {
err := c.KickOnlineMessage()
Expand Down Expand Up @@ -350,18 +363,18 @@ func (ws *WsServer) unregisterClient(client *Client) {
defer ws.clientPool.Put(client)
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
if isDeleteUser {
atomic.AddInt64(&ws.onlineUserNum, -1)
ws.onlineUserNum.Add(-1)
}
atomic.AddInt64(&ws.onlineUserConnNum, -1)
ws.onlineUserConnNum.Add(-1)
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num",
ws.onlineUserConnNum,
ws.onlineUserConnNum.Load(),
)
}

func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
connContext := newContext(w, r)
if ws.onlineUserConnNum >= ws.wsMaxConnNum {
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
httpError(connContext, errs.ErrConnOverMaxNumLimit)
return
}
Expand Down

0 comments on commit 1664579

Please sign in to comment.