From d5d2803e76b44c4316417aad3fb78e99272e4bf5 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Tue, 20 Feb 2024 20:56:29 +0800 Subject: [PATCH] feat: optimize server code (#1931) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * utils.Wrap -> errs.Wrap * utils.Wrap -> errs.Wrap --------- Co-authored-by: withchao --- go.mod | 2 +- go.sum | 4 +- internal/msggateway/client.go | 15 ++--- internal/msggateway/compressor.go | 21 ++++--- internal/msggateway/encoder.go | 5 +- internal/push/offlinepush/getui/push.go | 4 +- internal/rpc/auth/auth.go | 6 +- internal/rpc/group/group.go | 2 +- pkg/common/db/cache/meta_cache.go | 4 +- pkg/common/db/cache/msg.go | 77 +++++++++++++++---------- pkg/common/db/controller/auth.go | 5 +- pkg/common/db/controller/msg.go | 2 +- pkg/common/db/unrelation/msg.go | 18 +++--- pkg/common/db/unrelation/user.go | 3 +- pkg/common/http/http_client.go | 20 +++---- pkg/common/kafka/producer.go | 11 ++-- 16 files changed, 101 insertions(+), 98 deletions(-) diff --git a/go.mod b/go.mod index be835c0a41..ac02683b42 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible github.com/OpenIMSDK/protocol v0.0.55 - github.com/OpenIMSDK/tools v0.0.35 + github.com/OpenIMSDK/tools v0.0.36 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index c4b661f1c2..38596d45e3 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw= github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE= github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA= github.com/OpenIMSDK/protocol v0.0.55/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.35 h1:YH8UYoaErXqfNrwpUvQxe8nhL++gFH6qCisQPyzk0w8= -github.com/OpenIMSDK/tools v0.0.35/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= +github.com/OpenIMSDK/tools v0.0.36 h1:BT0q64l4f3QJDW16Rc0uJYt1gQFkiPoUQYQ33vo0EcE= +github.com/OpenIMSDK/tools v0.0.36/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 43047fd735..dfd2b03d3e 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/tools/errs" "runtime/debug" "sync" "sync/atomic" @@ -173,7 +174,7 @@ func (c *Client) handleMessage(message []byte) error { var err error message, err = c.longConnServer.DecompressWithPool(message) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } } @@ -182,15 +183,15 @@ func (c *Client) handleMessage(message []byte) error { err := c.longConnServer.Decode(message, binaryReq) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } if err := c.longConnServer.Validate(binaryReq); err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } if binaryReq.SendID != c.UserID { - return utils.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String()) + return errs.Wrap(errors.New("exception conn userID not same to req userID"), binaryReq.String()) } ctx := mcontext.WithMustInfoCtx( @@ -313,7 +314,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error { encodedBuf, err := c.longConnServer.Encode(resp) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } c.w.Lock() @@ -323,7 +324,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error { if c.IsCompress { resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf) if compressErr != nil { - return utils.Wrap(compressErr, "") + return errs.Wrap(compressErr) } return c.conn.WriteMessage(MessageBinary, resultBuf) } @@ -341,7 +342,7 @@ func (c *Client) writePongMsg() error { err := c.conn.SetWriteDeadline(writeWait) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } return c.conn.WriteMessage(PongMessage, nil) diff --git a/internal/msggateway/compressor.go b/internal/msggateway/compressor.go index ae5e9cdd04..9bbec1ec9e 100644 --- a/internal/msggateway/compressor.go +++ b/internal/msggateway/compressor.go @@ -18,10 +18,9 @@ import ( "bytes" "compress/gzip" "errors" + "github.com/OpenIMSDK/tools/errs" "io" "sync" - - "github.com/OpenIMSDK/tools/utils" ) var ( @@ -47,10 +46,10 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) { gzipBuffer := bytes.Buffer{} gz := gzip.NewWriter(&gzipBuffer) if _, err := gz.Write(rawData); err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } if err := gz.Close(); err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } return gzipBuffer.Bytes(), nil } @@ -63,10 +62,10 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) { gz.Reset(&gzipBuffer) if _, err := gz.Write(rawData); err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } if err := gz.Close(); err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } return gzipBuffer.Bytes(), nil } @@ -75,11 +74,11 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) { buff := bytes.NewBuffer(compressedData) reader, err := gzip.NewReader(buff) if err != nil { - return nil, utils.Wrap(err, "NewReader failed") + return nil, errs.Wrap(err, "NewReader failed") } compressedData, err = io.ReadAll(reader) if err != nil { - return nil, utils.Wrap(err, "ReadAll failed") + return nil, errs.Wrap(err, "ReadAll failed") } _ = reader.Close() return compressedData, nil @@ -88,18 +87,18 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) { func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) { reader := gzipReaderPool.Get().(*gzip.Reader) if reader == nil { - return nil, errors.New("NewReader failed") + return nil, errs.Wrap(errors.New("NewReader failed")) } defer gzipReaderPool.Put(reader) err := reader.Reset(bytes.NewReader(compressedData)) if err != nil { - return nil, utils.Wrap(err, "NewReader failed") + return nil, errs.Wrap(err, "NewReader failed") } compressedData, err = io.ReadAll(reader) if err != nil { - return nil, utils.Wrap(err, "ReadAll failed") + return nil, errs.Wrap(err, "ReadAll failed") } _ = reader.Close() return compressedData, nil diff --git a/internal/msggateway/encoder.go b/internal/msggateway/encoder.go index c5f1d00a82..2c46a774bf 100644 --- a/internal/msggateway/encoder.go +++ b/internal/msggateway/encoder.go @@ -17,8 +17,7 @@ package msggateway import ( "bytes" "encoding/gob" - - "github.com/OpenIMSDK/tools/utils" + "github.com/OpenIMSDK/tools/errs" ) type Encoder interface { @@ -47,7 +46,7 @@ func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { dec := gob.NewDecoder(buff) err := dec.Decode(decodeData) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } return nil } diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index b657c9c23c..8115e4efb9 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -34,8 +34,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http" - - "github.com/OpenIMSDK/tools/utils" ) var ( @@ -137,7 +135,7 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) ( pushReq.Settings = &Settings{TTL: &ttl} err := g.request(ctx, taskURL, pushReq, token, &respTask) if err != nil { - return "", utils.Wrap(err, "") + return "", errs.Wrap(err) } return respTask.TaskID, nil } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index eaf63f868d..1b10ba06a7 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -29,8 +29,6 @@ import ( "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/tokenverify" - "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" @@ -105,7 +103,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret()) if err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } m, err := s.authDatabase.GetTokensWithoutError(ctx, claims.UserID, claims.PlatformID) if err != nil { @@ -121,7 +119,7 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim case constant.KickedToken: return nil, errs.ErrTokenKicked.Wrap() default: - return nil, utils.Wrap(errs.ErrTokenUnknown, "") + return nil, errs.Wrap(errs.ErrTokenUnknown) } } return nil, errs.ErrTokenNotExist.Wrap() diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 1d068b1b2c..95f82266f9 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -956,7 +956,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf return nil, err } if group.Status == constant.GroupStatusDismissed { - return nil, utils.Wrap(errs.ErrDismissedAlready, "") + return nil, errs.Wrap(errs.ErrDismissedAlready) } resp := &pbgroup.SetGroupInfoResp{} count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 4bc2a046aa..7eb486c9ab 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -134,7 +134,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin } bs, err := json.Marshal(t) if err != nil { - return "", utils.Wrap(err, "") + return "", errs.Wrap(err) } write = true @@ -153,7 +153,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin if err != nil { log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) - return t, utils.Wrap(err, "") + return t, errs.Wrap(err) } return t, nil diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 5cd3cb22c5..8a54e1a8b2 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -149,11 +149,15 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string } func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { - return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) + return errs.Wrap(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) } func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey func(conversationID string) string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, getkey(conversationID)).Int64()) + val, err := c.rdb.Get(ctx, getkey(conversationID)).Int64() + if err != nil { + return 0, errs.Wrap(err) + } + return val, nil } func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { @@ -214,7 +218,11 @@ func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) s } func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) + val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() + if err != nil { + return 0, errs.Wrap(err) + } + return val, nil } func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (m map[string]int64, err error) { @@ -224,7 +232,7 @@ func (c *msgCache) GetConversationUserMinSeqs(ctx context.Context, conversationI } func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { - return utils.Wrap1(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) + return errs.Wrap(c.rdb.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()) } func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { @@ -240,7 +248,7 @@ func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID strin } func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) + return errs.Wrap(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err()) } func (c *msgCache) SetHasReadSeqs(ctx context.Context, conversationID string, hasReadSeqs map[string]int64) error { @@ -262,12 +270,15 @@ func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversati } func (c *msgCache) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64()) + val, err := c.rdb.Get(ctx, c.getHasReadSeqKey(conversationID, userID)).Int64() + if err != nil { + return 0, err + } + return val, nil } func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) } @@ -694,7 +705,11 @@ func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime i } func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result()) + val, err := c.rdb.Get(ctx, getuiToken).Result() + if err != nil { + return "", errs.Wrap(err) + } + return val, nil } func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { @@ -702,7 +717,11 @@ func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime } func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result()) + val, err := c.rdb.Get(ctx, getuiTaskID).Result() + if err != nil { + return "", errs.Wrap(err) + } + return val, nil } func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { @@ -720,7 +739,11 @@ func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID i } func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result()) + val, err := c.rdb.Get(ctx, FCM_TOKEN+account+":"+strconv.Itoa(platformID)).Result() + if err != nil { + return "", errs.Wrap(err) + } + return val, nil } func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { @@ -738,7 +761,8 @@ func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string } func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) + val, err := c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int() + return val, errs.Wrap(err) } func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { @@ -771,42 +795,31 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() if err != nil { - return false, utils.Wrap(err, "") + return false, errs.Wrap(err) } return n > 0, nil } -func (c *msgCache) SetMessageTypeKeyValue( - ctx context.Context, - clientMsgID string, - sessionType int32, - typeKey, value string, -) error { +func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) } func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) + val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result() + return val, errs.Wrap(err) } func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) + val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result() + return val, errs.Wrap(err) } -func (c *msgCache) GetOneMessageAllReactionList( - ctx context.Context, - clientMsgID string, - sessionType int32, -) (map[string]string, error) { - return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) +func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { + val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() + return val, errs.Wrap(err) } -func (c *msgCache) DeleteOneMessageKey( - ctx context.Context, - clientMsgID string, - sessionType int32, - subKey string, -) error { +func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index 17b4a440d4..fe15198678 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -16,6 +16,7 @@ package controller import ( "context" + "github.com/OpenIMSDK/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -23,8 +24,6 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/tokenverify" - "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" ) @@ -78,7 +77,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) tokenString, err := token.SignedString([]byte(a.accessSecret)) if err != nil { - return "", utils.Wrap(err, "") + return "", errs.Wrap(err) } return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index b841a7d310..d427cc3a16 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -408,7 +408,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() } - return lastMaxSeq, isNew, utils.Wrap(err, "") + return lastMaxSeq, isNew, errs.Wrap(err) } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 9b461dd1f0..0aa9fa58d0 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -35,8 +35,6 @@ import ( "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/utils" - table "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" ) @@ -79,7 +77,7 @@ func (m *MsgMongoDriver) UpdateMsg( update := bson.M{"$set": bson.M{field: value}} res, err := m.MsgCollection.UpdateOne(ctx, filter, update) if err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } return res, nil } @@ -106,7 +104,7 @@ func (m *MsgMongoDriver) PushUnique( } res, err := m.MsgCollection.UpdateOne(ctx, filter, update) if err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } return res, nil } @@ -118,7 +116,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}}, ) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } return nil } @@ -133,7 +131,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc( msg.Status = status bytes, err := proto.Marshal(msg) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } _, err = m.MsgCollection.UpdateOne( ctx, @@ -141,7 +139,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc( bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}}, ) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } return nil } @@ -167,12 +165,12 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex( findOpts, ) if err != nil { - return nil, utils.Wrap(err, "") + return nil, errs.Wrap(err) } var msgs []table.MsgDocModel err = cursor.All(ctx, &msgs) if err != nil { - return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) + return nil, errs.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) } if len(msgs) > 0 { return &msgs[0], nil @@ -223,7 +221,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st } _, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates) if err != nil { - return utils.Wrap(err, "") + return errs.Wrap(err) } return nil } diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/unrelation/user.go index 4b4a78c795..f5595c4ebb 100644 --- a/pkg/common/db/unrelation/user.go +++ b/pkg/common/db/unrelation/user.go @@ -18,7 +18,6 @@ import ( "context" "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/utils" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -119,7 +118,7 @@ func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string opts, ) if err != nil { - return utils.Wrap(err, "transaction failed") + return errs.Wrap(err, "transaction failed") } } return nil diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index a80d1c9a41..7fc456a1d0 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -106,31 +106,31 @@ func PostReturn(ctx context.Context, url string, header map[string]string, input } func callBackPostReturn(ctx context.Context, url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { - defer log.ZDebug(ctx, "callback", "url", url, "command", command, "input", input, "output", output, "callbackConfig", callbackConfig) - // - //v := urllib.Values{} - //v.Set(constant.CallbackCommand, command) - //url = url + "/" + v.Encode() url = url + "/" + command + log.ZInfo(ctx, "callback", "url", url, "input", input, "config", callbackConfig) b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { - log.ZWarn(ctx, "callback failed but continue", err, "url", url) + log.ZInfo(ctx, "callback failed but continue", err, "url", url) return nil } + log.ZWarn(ctx, "callback network failed", err, "url", url, "input", input) return errs.ErrNetwork.Wrap(err.Error()) } - defer log.ZDebug(ctx, "callback", "data", string(b)) - if err = json.Unmarshal(b, output); err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { log.ZWarn(ctx, "callback failed but continue", err, "url", url) return nil } + log.ZWarn(ctx, "callback json unmarshal failed", err, "url", url, "input", input, "response", string(b)) return errs.ErrData.WithDetail(err.Error() + "response format error") } - - return output.Parse() + if err := output.Parse(); err != nil { + log.ZWarn(ctx, "callback parse failed", err, "url", url, "input", input, "response", string(b)) + return err + } + log.ZInfo(ctx, "callback success", "url", url, "input", input, "response", string(b)) + return nil } func CallBackPostReturn(ctx context.Context, url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 417aadb542..c2e0f33dc2 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -27,7 +27,6 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" - "github.com/OpenIMSDK/tools/utils" "google.golang.org/protobuf/proto" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -153,10 +152,10 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag // Marshal the protobuf message bMsg, err := proto.Marshal(msg) if err != nil { - return 0, 0, utils.Wrap(err, "kafka proto Marshal err") + return 0, 0, errs.Wrap(err, "kafka proto Marshal err") } if len(bMsg) == 0 { - return 0, 0, utils.Wrap(errEmptyMsg, "") + return 0, 0, errs.Wrap(errEmptyMsg, "") } // Prepare Kafka message @@ -168,13 +167,13 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag // Validate message key and value if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { - return 0, 0, utils.Wrap(errEmptyMsg, "") + return 0, 0, errs.Wrap(errEmptyMsg) } // Attach context metadata as headers header, err := GetMQHeaderWithContext(ctx) if err != nil { - return 0, 0, utils.Wrap(err, "") + return 0, 0, errs.Wrap(err) } kMsg.Headers = header @@ -182,7 +181,7 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag partition, offset, err := p.producer.SendMessage(kMsg) if err != nil { log.ZWarn(ctx, "p.producer.SendMessage error", err) - return 0, 0, utils.Wrap(err, "") + return 0, 0, errs.Wrap(err) } log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())