From 04a5c97e3103fe01bd6bc5b2658e2c0fcd2e9ae8 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Fri, 28 Jun 2024 16:14:06 +0800 Subject: [PATCH] feat:optimize GetConversationsHasReadAndMaxSeq --- pkg/common/storage/cache/redis/seq.go | 21 ++++++++--- pkg/rpccache/conversation.go | 50 +++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go index 76dd921a50..c15eb6647e 100644 --- a/pkg/common/storage/cache/redis/seq.go +++ b/pkg/common/storage/cache/redis/seq.go @@ -16,6 +16,7 @@ package redis import ( "context" + "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" @@ -61,17 +62,27 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { m = make(map[string]int64, len(items)) + keys := make([]string, len(items)) for i, v := range items { - res, err := c.rdb.Get(ctx, getkey(v)).Result() - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) + keys[i] = getkey(v) + } + + res, err := c.rdb.MGet(ctx, keys...).Result() + if err != nil && !errors.Is(err, redis.Nil) { + return nil, errs.Wrap(err) + } + + // len(res) == len(items) + for i := range res { + strRes, ok := res[i].(string) + if !ok { + continue } - val := stringutil.StringToInt64(res) + val := stringutil.StringToInt64(strRes) if val != 0 { m[items[i]] = val } } - return m, nil } diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 4c00dd1f7e..1d40a649c9 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -16,15 +16,21 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/memamq" "github.com/redis/go-redis/v9" + "sync" +) + +const ( + notificationWorkerCount = 2 + notificationBufferSize = 200 ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { @@ -39,6 +45,7 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach localcache.WithLocalSuccessTTL(lc.Success()), localcache.WithLocalFailedTTL(lc.Failed()), ), + queue: memamq.NewMemoryQueue(notificationWorkerCount, notificationBufferSize), } if lc.Enable() { go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) @@ -49,6 +56,8 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach type ConversationLocalCache struct { client rpcclient.ConversationRpcClient local localcache.Cache[any] + + queue *memamq.MemoryQueue } func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { @@ -91,16 +100,43 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + + errChan := make(chan error, len(conversationIDs)) + conversationsChan := make(chan *pbconversation.Conversation, len(conversationIDs)) + var wg sync.WaitGroup + wg.Add(len(conversationIDs)) + for _, conversationID := range conversationIDs { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - if errs.ErrRecordNotFound.Is(err) { - continue + err := c.queue.Push(func() { + defer wg.Done() + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + return + } + errChan <- err + return } - return nil, err + conversationsChan <- conversation + }) + if err != nil { + // push err + return nil, errs.Wrap(err) } + } + wg.Wait() + close(errChan) + close(conversationsChan) + + err, ok := <-errChan + if ok { + return nil, err + } + + for conversation := range conversationsChan { conversations = append(conversations, conversation) } + return conversations, nil }