Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: API RPC call statistics #2398

Closed
wants to merge 151 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
151 commits
Select commit Hold shift + click to select a range
4c3c455
fix: GroupApplicationAcceptedNotification
withchao Jan 11, 2024
f5f2cf4
Merge remote-tracking branch 'origin/main'
withchao Jan 11, 2024
ad979d4
Merge branch 'openimsdk:main' into main
withchao Jan 11, 2024
a00d77a
fix: GroupApplicationAcceptedNotification
withchao Jan 11, 2024
48ff03f
fix: NotificationUserInfoUpdate
withchao Jan 12, 2024
45859d9
Merge branch 'openimsdk:main' into main
withchao Jan 16, 2024
e3d78de
Merge branch 'openimsdk:main' into main
withchao Jan 18, 2024
c81e199
Merge branch 'openimsdk:main' into main
withchao Jan 22, 2024
4266fed
cicd: robot automated Change
withchao Jan 22, 2024
1a14191
Merge remote-tracking branch 'upstream/main'
withchao Jan 26, 2024
86d3a66
Merge branch 'openimsdk:main' into main
withchao Jan 28, 2024
1964a4b
Merge branch 'openimsdk:main' into main
withchao Jan 31, 2024
d91cde9
Merge branch 'openimsdk:main' into main
withchao Feb 5, 2024
6a05e64
Merge branch 'openimsdk:main' into main
withchao Feb 20, 2024
39a6141
Merge branch 'openimsdk:main' into main
withchao Mar 6, 2024
c2009ce
Merge branch 'openimsdk:main' into main
withchao Mar 7, 2024
34c7f22
Merge branch 'openimsdk:main' into main
withchao Mar 8, 2024
ea9c75b
Merge branch 'openimsdk:main' into main
withchao Mar 13, 2024
7b5279a
fix: component
withchao Mar 13, 2024
34daf13
fix: getConversationInfo
withchao Mar 13, 2024
65d58ae
Merge branch 'openimsdk:main' into main
withchao Mar 13, 2024
e601b5f
Merge remote-tracking branch 'upstream/main'
withchao Mar 20, 2024
5f72b1e
Merge remote-tracking branch 'origin/main'
withchao Mar 20, 2024
20dfafd
Merge branch 'openimsdk:main' into main
withchao Apr 24, 2024
dc57d38
feat: cron task
withchao Apr 25, 2024
2f13149
feat: cron task
withchao Apr 25, 2024
2e43950
feat: cron task
withchao Apr 25, 2024
7a29a85
feat: cron task
withchao Apr 26, 2024
3920c06
feat: cron task
withchao Apr 26, 2024
bed7a60
Merge branch 'openimsdk:main' into main
withchao Apr 26, 2024
f2fbf19
Merge remote-tracking branch 'refs/remotes/origin/main' into corn37
withchao Apr 26, 2024
01851df
Merge remote-tracking branch 'upstream/main'
withchao Apr 28, 2024
480ccc7
fix: minio config url recognition error
withchao Apr 28, 2024
1046323
Merge branch 'openimsdk:main' into main
withchao May 8, 2024
76c6fe8
Merge branch 'openimsdk:main' into main
withchao May 10, 2024
6b119bd
Merge branch 'openimsdk:main' into main
withchao May 17, 2024
5528e8e
new mongo
withchao May 22, 2024
c6942f0
new mongo
withchao May 22, 2024
82c6b00
new mongo
withchao May 23, 2024
a2bfc90
new mongo
withchao May 23, 2024
05cec1b
new mongo
withchao May 24, 2024
1184002
new mongo
withchao May 24, 2024
e99eaf9
new mongo
withchao May 24, 2024
2f97933
new mongo
withchao May 24, 2024
1f02bdc
friend incr sync
withchao May 27, 2024
8e37a41
friend incr sync
withchao May 27, 2024
f6131c4
friend incr sync
withchao May 29, 2024
0d57f28
friend incr sync
withchao May 29, 2024
9ba22f3
friend incr sync
withchao May 30, 2024
61740d4
Merge branch 'openimsdk:main' into main
withchao May 30, 2024
10315e9
Merge branch 'main' into list
withchao May 30, 2024
eb362da
mage
withchao May 30, 2024
0aaf8b9
optimization version log
withchao May 30, 2024
0f72de8
optimization version log
withchao May 30, 2024
cfc01bb
sync
withchao May 31, 2024
e9f4627
sync
withchao Jun 3, 2024
6363358
sync
withchao Jun 3, 2024
1b5621b
group sync
withchao Jun 3, 2024
6939352
sync option
withchao Jun 4, 2024
7e13faa
sync option
withchao Jun 4, 2024
1796c3a
refactor: replace `friend` package with `realtion`.
mo3et Jun 5, 2024
cfd07b1
Merge branch 'openimsdk:main' into list
mo3et Jun 5, 2024
7821f5f
Merge branch 'list' into list
mo3et Jun 5, 2024
e11807c
refactor: update lastest commit to relation.
mo3et Jun 5, 2024
9362169
Merge pull request #1 from mo3et/list
FGadvancer Jun 5, 2024
6285b68
sync option
withchao Jun 6, 2024
8e1d0c5
Merge remote-tracking branch 'origin/list' into list
withchao Jun 6, 2024
c5f565f
sync option
withchao Jun 6, 2024
a1523f4
sync option
withchao Jun 6, 2024
58c4c13
sync
withchao Jun 7, 2024
caebdf3
sync
withchao Jun 11, 2024
17dad5c
Merge branch 'openimsdk:main' into main
withchao Jun 12, 2024
1b10271
go.mod
withchao Jun 13, 2024
ef71d0c
seq
withchao Jun 14, 2024
904842b
update: go mod
icey-yu Jun 14, 2024
c5fa596
refactor: change incremental to full
icey-yu Jun 14, 2024
f19f6f9
feat: get full friend user ids
icey-yu Jun 14, 2024
cee1a49
feat: api and config
icey-yu Jun 14, 2024
fe4842b
seq
withchao Jun 14, 2024
1aa610c
Merge pull request #2 from icey-yu/friend-increamental
withchao Jun 14, 2024
a41c8c6
group version
withchao Jun 17, 2024
5156795
Merge branch 'openimsdk:main' into main
withchao Jun 17, 2024
c161330
Merge remote-tracking branch 'origin/list' into list
withchao Jun 17, 2024
bd4fb8a
merge
withchao Jun 17, 2024
ec40d82
Merge remote-tracking branch 'origin/main' into list
withchao Jun 17, 2024
d55d416
seq
withchao Jun 17, 2024
a2a28b4
seq
withchao Jun 18, 2024
cce382d
seq
withchao Jun 18, 2024
aff8322
fix: sort by id avoid unstable sort friends.
FGadvancer Jun 18, 2024
08cb4a9
group
withchao Jun 19, 2024
f613c6d
group
withchao Jun 19, 2024
7182298
Merge remote-tracking branch 'origin/list' into seq38
withchao Jun 19, 2024
1ee33f3
group
withchao Jun 19, 2024
924888d
fix: sort by id avoid unstable sort friends.
FGadvancer Jun 19, 2024
5006af5
fix: sort by id avoid unstable sort friends.
FGadvancer Jun 19, 2024
4f359b7
fix: sort by id avoid unstable sort friends.
FGadvancer Jun 19, 2024
4074f81
Merge branch 'refs/heads/list' into seq38
withchao Jun 19, 2024
03d4564
user version
withchao Jun 20, 2024
88c1510
Merge remote-tracking branch 'origin/list' into list
withchao Jun 20, 2024
ac9f4f6
Merge remote-tracking branch 'origin/list' into seq38
withchao Jun 20, 2024
425008a
Merge remote-tracking branch 'origin/seq38' into seq38
withchao Jun 20, 2024
2d2941a
seq
withchao Jun 20, 2024
8e3890e
seq
withchao Jun 21, 2024
9b043fe
seq user
withchao Jun 24, 2024
8f0403e
user online
withchao Jun 26, 2024
2b2a75f
implement minio expire delete.
mo3et Jun 28, 2024
8f86049
user online
withchao Jun 28, 2024
3960d28
config
withchao Jun 28, 2024
affa909
fix
withchao Jun 28, 2024
3ef62d4
fix
withchao Jun 28, 2024
dac8fba
implement minio expire delete logic.
mo3et Jun 28, 2024
3167780
online cache
withchao Jul 1, 2024
d06c323
online cache
withchao Jul 1, 2024
bf2cf42
Merge pull request #6 from mo3et/minio-cron-del
withchao Jul 1, 2024
4781cf4
Merge branch 'openimsdk:main' into main
withchao Jul 1, 2024
45b07bc
online cache
withchao Jul 1, 2024
32c5f65
online cache
withchao Jul 2, 2024
9388cb6
online cache
withchao Jul 2, 2024
28c8b78
online cache
withchao Jul 2, 2024
6e2659c
online cache
withchao Jul 2, 2024
f87ee44
online cache
withchao Jul 2, 2024
1bfaf3e
online cache
withchao Jul 2, 2024
dcd8749
online cache
withchao Jul 2, 2024
14aba3b
online cache
withchao Jul 2, 2024
3df39a8
online cache
withchao Jul 2, 2024
3306117
feat: implement scheduled delete outdated object in minio.
mo3et Jul 2, 2024
bfbfb78
Merge branch 'openimsdk:main' into main
withchao Jul 3, 2024
8d4737c
update gomake version
withchao Jul 3, 2024
0b4c802
update gomake version
withchao Jul 3, 2024
7897044
implement FindExpires pagination.
mo3et Jul 3, 2024
686ccae
remove unnesseary incr.
mo3et Jul 3, 2024
a421bd1
fix uncorrect args call.
mo3et Jul 3, 2024
c117ef8
Merge pull request #8 from mo3et/minio-cron-del
withchao Jul 3, 2024
e791cbd
online push
withchao Jul 3, 2024
006766c
online push
withchao Jul 3, 2024
fcda73f
online push
withchao Jul 4, 2024
a464750
Merge branch 'openimsdk:main' into main
withchao Jul 4, 2024
73a9265
Merge remote-tracking branch 'origin/main' into seq38
withchao Jul 4, 2024
c5cf078
resolving conflicts
withchao Jul 4, 2024
f315177
Merge branch 'seq38' into online
withchao Jul 4, 2024
a9ab9ba
resolving conflicts
withchao Jul 4, 2024
97636c4
test
withchao Jul 4, 2024
1336b83
api prommetrics
withchao Jul 4, 2024
6713986
api prommetrics
withchao Jul 4, 2024
9151b56
api prommetrics
withchao Jul 4, 2024
c1d66fa
api prommetrics
withchao Jul 4, 2024
e3ee24d
api prommetrics
withchao Jul 4, 2024
0a9b53b
rpc prommetrics
withchao Jul 5, 2024
ab84d77
Merge branch 'openimsdk:main' into main
withchao Jul 5, 2024
181107d
Merge branch 'main' into count
withchao Jul 5, 2024
2052e1c
rpc prommetrics
withchao Jul 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/openim-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,4 @@ func main() {
if err := cmd.NewApiCmd().Exec(); err != nil {
program.ExitWithError(err)
}

}
3 changes: 2 additions & 1 deletion config/openim-crontask.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
chatRecordsClearTime: "0 2 * * *"
cronExecuteTime: "0 2 * * *"
retainChatRecords: 365
fileExpireTime: 90
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.17
github.com/openimsdk/protocol v0.0.69-alpha.24
github.com/openimsdk/tools v0.0.49-alpha.45
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down Expand Up @@ -175,5 +175,3 @@ require (
golang.org/x/crypto v0.21.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M=
github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc=
github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0=
github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
Expand Down
1 change: 0 additions & 1 deletion internal/api/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func (o *FriendApi) GetFriendList(c *gin.Context) {

func (o *FriendApi) GetDesignatedFriends(c *gin.Context) {
a2r.Call(relation.FriendClient.GetDesignatedFriends, o.Client, c)
//a2r.Call(relation.FriendClient.GetDesignatedFriends, o.Client, c, a2r.NewNilReplaceOption(relation.FriendClient.GetDesignatedFriends))
}

func (o *FriendApi) SetFriendRemark(c *gin.Context) {
Expand Down
9 changes: 4 additions & 5 deletions internal/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"time"

kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
Expand Down Expand Up @@ -72,10 +71,10 @@ func Start(ctx context.Context, index int, config *Config) error {
netDone <- struct{}{}
return
}
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort))
if err = p.Use(router); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("prometheus start err: %d", prometheusPort))
srv := http.NewServeMux()
srv.Handle(prommetrics.ApiPath, prommetrics.ApiHandler())
if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv); err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort))
netDone <- struct{}{}
}
}()
Expand Down
23 changes: 19 additions & 4 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package api

import (
"fmt"
"net/http"
"strings"

"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
Expand All @@ -17,8 +15,25 @@ import (
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net/http"
"strings"
)

func prommetricsGin() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
path := c.FullPath()
if c.Writer.Status() == http.StatusNotFound {
prommetrics.HttpCall("<404>", c.Request.Method, c.Writer.Status())
} else {
prommetrics.HttpCall(path, c.Request.Method, c.Writer.Status())
}
if resp := apiresp.GetGinApiResponse(c); resp != nil {
prommetrics.APICall(path, c.Request.Method, resp.ErrCode)
}
}
}

func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
Expand All @@ -37,7 +52,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL)

r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
userRouterGroup := r.Group("/user")
Expand Down
2 changes: 2 additions & 0 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Client struct {
closed atomic.Bool
closedErr error
token string
//subLock sync.Mutex
//subUserIDs map[string]struct{}
}

// ResetClient updates the client's state with new connection and context information.
Expand Down
7 changes: 3 additions & 4 deletions internal/msggateway/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ func Start(ctx context.Context, index int, conf *Config) error {
if err != nil {
return err
}
longServer, err := NewWsServer(
longServer := NewWsServer(
conf,
WithPort(wsPort),
WithMaxConnNum(int64(conf.MsgGateway.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(conf.MsgGateway.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen),
)
if err != nil {
return err
}

go longServer.ChangeOnlineStatus(4)

hubServer := NewServer(rpcPort, longServer, conf)
netDone := make(chan error)
Expand Down
68 changes: 35 additions & 33 deletions internal/msggateway/n_ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ type LongConnServer interface {
}

type WsServer struct {
msgGatewayConfig *Config
port int
wsMaxConnNum int64
registerChan chan *Client
unregisterChan chan *Client
kickHandlerChan chan *kickHandler
clients *UserMap
msgGatewayConfig *Config
port int
wsMaxConnNum int64
registerChan chan *Client
unregisterChan chan *Client
kickHandlerChan chan *kickHandler
clients UserMap
//subscription *Subscription
clientPool sync.Pool
onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64
Expand Down Expand Up @@ -90,18 +91,18 @@ func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry,
ws.disCov = disCov
}

func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
err := ws.userClient.SetUserStatus(ctx, client.UserID, status, client.PlatformID)
if err != nil {
log.ZWarn(ctx, "SetUserStatus err", err)
}
switch status {
case constant.Online:
ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
case constant.Offline:
ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, client.UserID, client.PlatformID, client.ctx.GetConnID())
}
}
//func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
// err := ws.userClient.SetUserStatus(ctx, client.UserID, status, client.PlatformID)
// if err != nil {
// log.ZWarn(ctx, "SetUserStatus err", err)
// }
// switch status {
// case constant.Online:
// ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
// case constant.Offline:
// ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, client.UserID, client.PlatformID, client.ctx.GetConnID())
// }
//}

func (ws *WsServer) UnRegister(c *Client) {
ws.unregisterChan <- c
Expand All @@ -119,7 +120,7 @@ func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client,
return ws.clients.Get(userID, platform)
}

func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) {
func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
var config configs
for _, o := range opts {
o(&config)
Expand All @@ -141,10 +142,11 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) {
kickHandlerChan: make(chan *kickHandler, 1000),
validate: v,
clients: newUserMap(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
}, nil
//subscription: newSubscription(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
}
}

func (ws *WsServer) Run(done chan error) error {
Expand Down Expand Up @@ -278,11 +280,11 @@ func (ws *WsServer) registerClient(client *Client) {
}()
}

wg.Add(1)
go func() {
defer wg.Done()
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
}()
//wg.Add(1)
//go func() {
// defer wg.Done()
// ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
//}()

wg.Wait()

Expand All @@ -309,7 +311,7 @@ func getRemoteAdders(client []*Client) string {
}

func (ws *WsServer) KickUserConn(client *Client) error {
ws.clients.deleteClients(client.UserID, []*Client{client})
ws.clients.DeleteClients(client.UserID, []*Client{client})
return client.KickOnlineMessage()
}

Expand All @@ -325,7 +327,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
if !clientOK {
return
}
ws.clients.deleteClients(newClient.UserID, oldClients)
ws.clients.DeleteClients(newClient.UserID, oldClients)
for _, c := range oldClients {
err := c.KickOnlineMessage()
if err != nil {
Expand All @@ -345,13 +347,13 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien

func (ws *WsServer) unregisterClient(client *Client) {
defer ws.clientPool.Put(client)
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
isDeleteUser := ws.clients.DeleteClients(client.UserID, []*Client{client})
if isDeleteUser {
ws.onlineUserNum.Add(-1)
prommetrics.OnlineUserGauge.Dec()
}
ws.onlineUserConnNum.Add(-1)
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
ws.onlineUserNum.Load(), "online user conn Num",
ws.onlineUserConnNum.Load(),
Expand Down
112 changes: 112 additions & 0 deletions internal/msggateway/online.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package msggateway

import (
"context"
"crypto/md5"
"encoding/binary"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"math/rand"
"strconv"
"time"
)

func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
if concurrent < 1 {
concurrent = 1
}
const renewalTime = cachekey.OnlineExpire / 3
//const renewalTime = time.Second * 10
renewalTicker := time.NewTicker(renewalTime)

requestChs := make([]chan *pbuser.SetUserOnlineStatusReq, concurrent)
changeStatus := make([][]UserState, concurrent)

for i := 0; i < concurrent; i++ {
requestChs[i] = make(chan *pbuser.SetUserOnlineStatusReq, 64)
changeStatus[i] = make([]UserState, 0, 100)
}

mergeTicker := time.NewTicker(time.Second)

local2pb := func(u UserState) *pbuser.UserOnlineStatus {
return &pbuser.UserOnlineStatus{
UserID: u.UserID,
Online: u.Online,
Offline: u.Offline,
}
}

rNum := rand.Uint64()
pushUserState := func(us ...UserState) {
for _, u := range us {
sum := md5.Sum([]byte(u.UserID))
i := (binary.BigEndian.Uint64(sum[:]) + rNum) % uint64(concurrent)
changeStatus[i] = append(changeStatus[i], u)
status := changeStatus[i]
if len(status) == cap(status) {
req := &pbuser.SetUserOnlineStatusReq{
Status: datautil.Slice(status, local2pb),
}
changeStatus[i] = status[:0]
select {
case requestChs[i] <- req:
default:
log.ZError(context.Background(), "user online processing is too slow", nil)
}
}
}
}

pushAllUserState := func() {
for i, status := range changeStatus {
if len(status) == 0 {
continue
}
req := &pbuser.SetUserOnlineStatusReq{
Status: datautil.Slice(status, local2pb),
}
changeStatus[i] = status[:0]
select {
case requestChs[i] <- req:
default:
log.ZError(context.Background(), "user online processing is too slow", nil)
}
}
}

opIdCtx := mcontext.SetOperationID(context.Background(), "r"+strconv.FormatUint(rNum, 10))
doRequest := func(req *pbuser.SetUserOnlineStatusReq) {
ctx, cancel := context.WithTimeout(opIdCtx, time.Second*5)
defer cancel()
if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil {
log.ZError(ctx, "update user online status", err)
}
}

for i := 0; i < concurrent; i++ {
go func(ch <-chan *pbuser.SetUserOnlineStatusReq) {
for req := range ch {
doRequest(req)
}
}(requestChs[i])
}

for {
select {
case <-mergeTicker.C:
pushAllUserState()
case now := <-renewalTicker.C:
deadline := now.Add(-cachekey.OnlineExpire / 3)
users := ws.clients.GetAllUserStatus(deadline, now)
log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "num", len(users))
pushUserState(users...)
case state := <-ws.clients.UserState():
log.ZDebug(context.Background(), "OnlineCache user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline)
pushUserState(state)
}
}
}
Loading
Loading