diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 0f50c621fd..1375655ba2 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -17,13 +17,12 @@ package main import ( "context" "fmt" - ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "net" _ "net/http/pprof" "strconv" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/discoveryregistry" @@ -33,6 +32,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" ) func main() { @@ -65,7 +65,7 @@ func run(port int, proPort int) error { var client discoveryregistry.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) if err != nil { log.ZError(context.Background(), "Failed to initialize discovery register", err) @@ -86,7 +86,7 @@ func run(port int, proPort int) error { router := api.NewGinRouter(client, rdb) ////////////////////////////// if config.Config.Prometheus.Enable { - p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api")) + p := ginProm.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", proPort)) p.Use(router) } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index c58710e1bc..2466da2ebb 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -17,7 +17,6 @@ package msggateway import ( "context" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "net/http" "strconv" "sync" @@ -33,6 +32,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/redis/go-redis/v9" @@ -221,7 +221,7 @@ func (ws *WsServer) registerClient(client *Client) { if !userOK { ws.clients.Set(client.UserID, client) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) - prom_metrics.OnlineUserGauge.Add(1) + prommetrics.OnlineUserGauge.Add(1) ws.onlineUserNum.Add(1) ws.onlineUserConnNum.Add(1) } else { @@ -361,7 +361,7 @@ func (ws *WsServer) unregisterClient(client *Client) { isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr()) if isDeleteUser { ws.onlineUserNum.Add(-1) - prom_metrics.OnlineUserGauge.Dec() + prommetrics.OnlineUserGauge.Dec() } ws.onlineUserConnNum.Add(-1) ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index c18186fa8d..bebf6819a9 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -17,16 +17,15 @@ package msgtransfer import ( "errors" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" + "log" + "net/http" + "sync" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "log" - "net/http" - "sync" "github.com/OpenIMSDK/tools/mw" @@ -36,6 +35,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -65,7 +66,7 @@ func StartTransfer(prometheusPort int) error { if err := mongo.CreateMsgIndex(); err != nil { return err } - client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) /* client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, @@ -123,7 +124,7 @@ func (m *MsgTransfer) Start(prometheusPort int) error { reg.MustRegister( collectors.NewGoCollector(), ) - reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...) + reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)) } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 88fd256d14..8ef15fe72e 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -16,7 +16,6 @@ package msgtransfer import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "github.com/IBM/sarama" "google.golang.org/protobuf/proto" @@ -27,6 +26,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" ) type OnlineHistoryMongoConsumerHandler struct { @@ -75,9 +75,9 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo( "conversationID", msgFromMQ.ConversationID, ) - prom_metrics.MsgInsertMongoFailedCounter.Inc() + prommetrics.MsgInsertMongoFailedCounter.Inc() } else { - prom_metrics.MsgInsertMongoSuccessCounter.Inc() + prommetrics.MsgInsertMongoSuccessCounter.Inc() } var seqs []int64 for _, msg := range msgFromMQ.MsgData { diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 2f3156c288..44eea15400 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,7 +18,7 @@ import ( "context" "encoding/json" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" "github.com/OpenIMSDK/protocol/conversation" @@ -41,6 +41,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -288,7 +289,7 @@ func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg } err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) if err != nil { - prom_metrics.MsgOfflinePushFailedCounter.Inc() + prommetrics.MsgOfflinePushFailedCounter.Inc() return err } return nil diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index bcca591522..ee8ead194d 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -16,7 +16,6 @@ package auth import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -35,6 +34,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -74,7 +74,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (* if err != nil { return nil, err } - prom_metrics.UserLoginCounter.Inc() + prommetrics.UserLoginCounter.Inc() resp.Token = token resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index b43bc82be6..dd08292bde 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -16,8 +16,8 @@ package msg import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/OpenIMSDK/protocol/constant" @@ -59,7 +59,7 @@ func (m *msgServer) sendMsgSuperGroupChat( req *pbmsg.SendMsgReq, ) (resp *pbmsg.SendMsgResp, err error) { if err = m.messageVerification(ctx, req); err != nil { - prom_metrics.GroupChatMsgProcessFailedCounter.Inc() + prommetrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err } if err = callbackBeforeSendGroupMsg(ctx, req); err != nil { @@ -78,7 +78,7 @@ func (m *msgServer) sendMsgSuperGroupChat( if err = callbackAfterSendGroupMsg(ctx, req); err != nil { log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } - prom_metrics.GroupChatMsgProcessSuccessCounter.Inc() + prommetrics.GroupChatMsgProcessSuccessCounter.Inc() resp = &pbmsg.SendMsgResp{} resp.SendTime = req.MsgData.SendTime resp.ServerMsgID = req.MsgData.ServerMsgID @@ -161,7 +161,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq } } if !isSend { - prom_metrics.SingleChatMsgProcessFailedCounter.Inc() + prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, nil } else { if err = callbackBeforeSendSingleMsg(ctx, req); err != nil { @@ -171,7 +171,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq return nil, err } if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { - prom_metrics.SingleChatMsgProcessFailedCounter.Inc() + prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } err = callbackAfterSendSingleMsg(ctx, req) @@ -183,7 +183,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq ClientMsgID: req.MsgData.ClientMsgID, SendTime: req.MsgData.SendTime, } - prom_metrics.SingleChatMsgProcessSuccessCounter.Inc() + prommetrics.SingleChatMsgProcessSuccessCounter.Inc() return resp, nil } } diff --git a/internal/tools/msg.go b/internal/tools/msg.go index f13938eba2..ad8f5c4717 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -23,7 +23,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "math/rand" @@ -76,7 +76,7 @@ func InitMsgTool() (*MsgTool, error) { if err != nil { return nil, err } - discov, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + discov, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) /* discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 62ecf7232e..b5249b5b44 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -17,7 +17,6 @@ package controller import ( "context" "errors" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" "time" "github.com/redis/go-redis/v9" @@ -31,6 +30,7 @@ import ( unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "go.mongodb.org/mongo-driver/mongo" pbmsg "github.com/OpenIMSDK/protocol/msg" @@ -376,20 +376,20 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs) if err != nil { - prom_metrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) + prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) } else { - prom_metrics.MsgInsertRedisSuccessCounter.Inc() + prommetrics.MsgInsertRedisSuccessCounter.Inc() } err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) if err != nil { log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID) - prom_metrics.SeqSetFailedCounter.Inc() + prommetrics.SeqSetFailedCounter.Inc() } err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) - prom_metrics.SeqSetFailedCounter.Inc() + prommetrics.SeqSetFailedCounter.Inc() } return lastMaxSeq, isNew, utils.Wrap(err, "") } diff --git a/pkg/common/discovery_register/k8s_discovery_register.go b/pkg/common/discoveryregister/discoveryregister.go similarity index 98% rename from pkg/common/discovery_register/k8s_discovery_register.go rename to pkg/common/discoveryregister/discoveryregister.go index 81543a4474..c204184ff6 100644 --- a/pkg/common/discovery_register/k8s_discovery_register.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -1,4 +1,4 @@ -package discovery_register +package discoveryregister import ( "context" diff --git a/pkg/common/discoveryregister/discoveryregister_test.go b/pkg/common/discoveryregister/discoveryregister_test.go new file mode 100644 index 0000000000..8426598f9e --- /dev/null +++ b/pkg/common/discoveryregister/discoveryregister_test.go @@ -0,0 +1,407 @@ +package discoveryregister + +import ( + "context" + "reflect" + "testing" + + "github.com/OpenIMSDK/tools/discoveryregistry" + "google.golang.org/grpc" +) + +func TestNewDiscoveryRegister(t *testing.T) { + type args struct { + envType string + } + tests := []struct { + name string + args args + want discoveryregistry.SvcDiscoveryRegistry + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewDiscoveryRegister(tt.args.envType) + if (err != nil) != tt.wantErr { + t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewK8sDiscoveryRegister(t *testing.T) { + tests := []struct { + name string + want discoveryregistry.SvcDiscoveryRegistry + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewK8sDiscoveryRegister() + if (err != nil) != tt.wantErr { + t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestK8sDR_Register(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + serviceName string + host string + port int + opts []grpc.DialOption + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr { + t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestK8sDR_UnRegister(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + tests := []struct { + name string + fields fields + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + if err := cli.UnRegister(); (err != nil) != tt.wantErr { + t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestK8sDR_CreateRpcRootNodes(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + serviceNames []string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr { + t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestK8sDR_RegisterConf2Registry(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + key string + conf []byte + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr { + t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestK8sDR_GetConfFromRegistry(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + key string + } + tests := []struct { + name string + fields fields + args args + want []byte + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + got, err := cli.GetConfFromRegistry(tt.args.key) + if (err != nil) != tt.wantErr { + t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestK8sDR_GetConns(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + ctx context.Context + serviceName string + opts []grpc.DialOption + } + tests := []struct { + name string + fields fields + args args + want []*grpc.ClientConn + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestK8sDR_GetConn(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + ctx context.Context + serviceName string + opts []grpc.DialOption + } + tests := []struct { + name string + fields fields + args args + want *grpc.ClientConn + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestK8sDR_GetSelfConnTarget(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + tests := []struct { + name string + fields fields + want string + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + if got := cli.GetSelfConnTarget(); got != tt.want { + t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestK8sDR_AddOption(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + opts []grpc.DialOption + } + tests := []struct { + name string + fields fields + args args + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + cli.AddOption(tt.args.opts...) + }) + } +} + +func TestK8sDR_CloseConn(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + type args struct { + conn *grpc.ClientConn + } + tests := []struct { + name string + fields fields + args args + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + cli.CloseConn(tt.args.conn) + }) + } +} + +func TestK8sDR_GetClientLocalConns(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + tests := []struct { + name string + fields fields + want map[string][]*grpc.ClientConn + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestK8sDR_Close(t *testing.T) { + type fields struct { + options []grpc.DialOption + rpcRegisterAddr string + } + tests := []struct { + name string + fields fields + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cli := &K8sDR{ + options: tt.fields.options, + rpcRegisterAddr: tt.fields.rpcRegisterAddr, + } + cli.Close() + }) + } +} diff --git a/pkg/common/ginPrometheus/ginPrometheus.go b/pkg/common/ginprometheus/ginprometheus.go similarity index 99% rename from pkg/common/ginPrometheus/ginPrometheus.go rename to pkg/common/ginprometheus/ginprometheus.go index 3f7cd65c40..50a6c3a2c7 100644 --- a/pkg/common/ginPrometheus/ginPrometheus.go +++ b/pkg/common/ginprometheus/ginprometheus.go @@ -1,4 +1,4 @@ -package ginPrometheus +package ginprometheus import ( "bytes" diff --git a/pkg/common/locker/doc.go b/pkg/common/locker/doc.go deleted file mode 100644 index 8b9378f90c..0000000000 --- a/pkg/common/locker/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package locker // import "github.com/openimsdk/open-im-server/v3/pkg/common/locker" diff --git a/pkg/common/locker/message_locker.go b/pkg/common/locker/message_locker.go deleted file mode 100644 index 55241eb5fa..0000000000 --- a/pkg/common/locker/message_locker.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package locker - -import ( - "context" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" -) - -const GlOBALLOCK = "GLOBAL_LOCK" - -type MessageLocker interface { - LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) - UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error - LockGlobalMessage(ctx context.Context, clientMsgID string) (err error) - UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error) -} -type LockerMessage struct { - cache cache.MsgModel -} - -func NewLockerMessage(cache cache.MsgModel) *LockerMessage { - return &LockerMessage{cache: cache} -} - -func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) { - for i := 0; i < 3; i++ { - err = l.cache.LockMessageTypeKey(ctx, clientMsgID, typeKey) - if err != nil { - time.Sleep(time.Millisecond * 100) - continue - } else { - break - } - } - return err -} - -func (l *LockerMessage) LockGlobalMessage(ctx context.Context, clientMsgID string) (err error) { - for i := 0; i < 3; i++ { - err = l.cache.LockMessageTypeKey(ctx, clientMsgID, GlOBALLOCK) - if err != nil { - time.Sleep(time.Millisecond * 100) - continue - } else { - break - } - } - return err -} - -func (l *LockerMessage) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, typeKey string) error { - return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, typeKey) -} - -func (l *LockerMessage) UnLockGlobalMessage(ctx context.Context, clientMsgID string) error { - return l.cache.UnLockMessageTypeKey(ctx, clientMsgID, GlOBALLOCK) -} diff --git a/pkg/common/prom_metrics/func.go b/pkg/common/prommetrics/func.go similarity index 87% rename from pkg/common/prom_metrics/func.go rename to pkg/common/prommetrics/func.go index e451c441b9..244f96b459 100644 --- a/pkg/common/prom_metrics/func.go +++ b/pkg/common/prommetrics/func.go @@ -1,9 +1,9 @@ -package prom_metrics +package prommetrics import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" + "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" ) @@ -35,11 +35,11 @@ func GetGrpcCusMetrics(registerName string) []prometheus.Collector { } } -func GetGinCusMetrics(name string) []*ginPrometheus.Metric { +func GetGinCusMetrics(name string) []*ginprometheus.Metric { switch name { case "Api": - return []*ginPrometheus.Metric{ApiCustomCnt} + return []*ginprometheus.Metric{ApiCustomCnt} default: - return []*ginPrometheus.Metric{ApiCustomCnt} + return []*ginprometheus.Metric{ApiCustomCnt} } } diff --git a/pkg/common/prom_metrics/gin-api.go b/pkg/common/prommetrics/gin-api.go similarity index 91% rename from pkg/common/prom_metrics/gin-api.go rename to pkg/common/prommetrics/gin-api.go index 7aa3f959ee..7cd82dad2c 100644 --- a/pkg/common/prom_metrics/gin-api.go +++ b/pkg/common/prommetrics/gin-api.go @@ -1,6 +1,6 @@ -package prom_metrics +package prommetrics -import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus" +import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" /* labels := prometheus.Labels{"label_one": "any", "label_two": "value"} diff --git a/pkg/common/prom_metrics/grpc-auth.go b/pkg/common/prommetrics/grpc-auth.go similarity index 90% rename from pkg/common/prom_metrics/grpc-auth.go rename to pkg/common/prommetrics/grpc-auth.go index 7ca5f1f493..e44c146bea 100644 --- a/pkg/common/prom_metrics/grpc-auth.go +++ b/pkg/common/prommetrics/grpc-auth.go @@ -1,4 +1,4 @@ -package prom_metrics +package prommetrics import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/common/prom_metrics/grpc-msg.go b/pkg/common/prommetrics/grpc-msg.go similarity index 97% rename from pkg/common/prom_metrics/grpc-msg.go rename to pkg/common/prommetrics/grpc-msg.go index 14cb4d858d..88d4ef3ce9 100644 --- a/pkg/common/prom_metrics/grpc-msg.go +++ b/pkg/common/prommetrics/grpc-msg.go @@ -1,4 +1,4 @@ -package prom_metrics +package prommetrics import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/common/prom_metrics/grpc-msggateway.go b/pkg/common/prommetrics/grpc-msggateway.go similarity index 90% rename from pkg/common/prom_metrics/grpc-msggateway.go rename to pkg/common/prommetrics/grpc-msggateway.go index add72e391e..bb62426e19 100644 --- a/pkg/common/prom_metrics/grpc-msggateway.go +++ b/pkg/common/prommetrics/grpc-msggateway.go @@ -1,4 +1,4 @@ -package prom_metrics +package prommetrics import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/common/prom_metrics/grpc_push.go b/pkg/common/prommetrics/grpc_push.go similarity index 92% rename from pkg/common/prom_metrics/grpc_push.go rename to pkg/common/prommetrics/grpc_push.go index c05dd61805..aa5085c2c6 100644 --- a/pkg/common/prom_metrics/grpc_push.go +++ b/pkg/common/prommetrics/grpc_push.go @@ -1,4 +1,4 @@ -package prom_metrics +package prommetrics import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/common/prom_metrics/transfer.go b/pkg/common/prommetrics/transfer.go similarity index 97% rename from pkg/common/prom_metrics/transfer.go rename to pkg/common/prommetrics/transfer.go index d3fec47d98..6b03870b55 100644 --- a/pkg/common/prom_metrics/transfer.go +++ b/pkg/common/prommetrics/transfer.go @@ -1,4 +1,4 @@ -package prom_metrics +package prommetrics import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 975d212467..09f7177ae3 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,18 +16,19 @@ package startrpc import ( "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "log" "net" "net/http" "strconv" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -55,7 +56,7 @@ func Start( return err } defer listener.Close() - client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) if err != nil { return utils.Wrap1(err) } @@ -70,8 +71,8 @@ func Start( // ctx 中间件 if config.Config.Prometheus.Enable { ////////////////////////// - cusMetrics := prom_metrics.GetGrpcCusMetrics(rpcRegisterName) - reg, metric, err = prom_metrics.NewGrpcPromObj(cusMetrics) + cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName) + reg, metric, err = prommetrics.NewGrpcPromObj(cusMetrics) options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) } else {