From 14477321fd23e8b5992a9ab944a266fc5fa79173 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 3 Dec 2024 14:12:47 +0800 Subject: [PATCH] fix: update set seq implement. (#2911) * update set seq implement. * revert seq logic. * Update func logic. * add log print. --- go.mod | 2 +- go.sum | 4 +-- internal/push/push_handler.go | 1 + internal/rpc/conversation/conversation.go | 7 ++-- internal/rpc/group/group.go | 1 + internal/rpc/msg/clear.go | 35 ++++++++++++------- internal/tools/cron_task.go | 35 ++++++++++--------- pkg/common/storage/controller/conversation.go | 2 +- pkg/common/storage/controller/msg.go | 31 ++++++++++------ pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 3 +- .../storage/database/mgo/group_member.go | 2 +- pkg/rpcclient/conversation.go | 4 +-- pkg/rpcclient/msg.go | 4 +-- 14 files changed, 79 insertions(+), 53 deletions(-) diff --git a/go.mod b/go.mod index 375c0406ca..8eae1edb24 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.57 + github.com/openimsdk/protocol v0.0.72-alpha.59 github.com/openimsdk/tools v0.0.50-alpha.38 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 74ee1c2c61..e572f0b470 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.57 h1:oAVg0SJkDK15L8yDrL0KPG32f3iB/vjEpfpX577p5n4= -github.com/openimsdk/protocol v0.0.72-alpha.57/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/protocol v0.0.72-alpha.59 h1:+ycb2+68mLKPIo7VrxF0id/GXP6OqZ2/nBM1YZQr7qY= +github.com/openimsdk/protocol v0.0.72-alpha.59/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.38 h1:AU6/cvDfN4ciIOwAj8IWEwze3DeEp2cHYPgW3y0OlbU= github.com/openimsdk/tools v0.0.50-alpha.38/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index b5bbed3ed5..c1d1ac2f94 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -414,6 +414,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, if err != nil { return err } + return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) } diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index ce720e6401..0b6b656a4f 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -441,6 +441,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc map[string]any{"max_seq": req.MaxSeq}); err != nil { return nil, err } + return &pbconversation.SetConversationMaxSeqResp{}, nil } @@ -670,7 +671,7 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco }, nil } -func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { +func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) { num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { log.ZError(ctx, "GetAllConversationIDsNumber failed", err) @@ -694,7 +695,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) if err != nil { - // log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue } @@ -717,7 +718,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex } } - return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil + return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil } func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 9e610df0fe..ba0c8a42de 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -964,6 +964,7 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro if err != nil { return err } + return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) } diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index c5bd36b445..ff732136a9 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -12,13 +12,14 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" "golang.org/x/sync/errgroup" ) // hard delete in Database. -func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { +func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err } @@ -26,18 +27,19 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error") } var ( - docNum int - msgNum int - start = time.Now() + docNum int + msgNum int + start = time.Now() + getLimit = 5000 ) - clearMsg := func(ctx context.Context) (bool, error) { + destructMsg := func(ctx context.Context) (bool, error) { docIDs, err := m.MsgDatabase.GetDocIDs(ctx) if err != nil { return false, err } - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000) + msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit) if err != nil { return false, err } @@ -61,7 +63,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return true, nil } - _, err = clearMsg(ctx) + _, err = destructMsg(ctx) if err != nil { log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) return nil, err @@ -69,11 +71,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - return &msg.ClearMsgResp{}, nil + return &msg.DestructMsgsResp{}, nil } -// soft delete for self -func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { +// soft delete for user self +func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { temp := convert.ConversationsPb2DB(req.Conversations) batchNum := 100 @@ -93,22 +95,31 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime) - seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) if err != nil { log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } if len(seqs) > 0 { + minseq := datautil.Max(seqs...) + + // update if err := m.Conversation.UpdateConversation(handleCtx, &pbconversation.UpdateConversationReq{ UserIDs: []string{conversation.OwnerUserID}, ConversationID: conversation.ConversationID, - LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil { + LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), + MinSeq: wrapperspb.Int64(minseq), + }); err != nil { log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } + if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil { + return err + } + // if you need Notify SDK client userseq is update. // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 484786880b..4f87036c7f 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -76,42 +76,43 @@ func Start(ctx context.Context, config *CronTaskConfig) error { crontab := cron.New() // scheduled hard delete outdated Msgs in specific time. - clearMsgFunc := func() { + destructMsgsFunc := func() { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) - log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) + log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { - log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) + if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil { + log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return } - log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) + log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil { return errs.Wrap(err) } // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. - msgDestructFunc := func() { + clearMsgFunc := func() { now := time.Now() ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) - log.ZDebug(ctx, "msg destruct cron start", "now", now) + log.ZDebug(ctx, "clear msg cron start", "now", now) - conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) + conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) if err != nil { log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) return - } else { - _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations}) - if err != nil { - log.ZError(ctx, "Destruct Msgs failed.", err) - return - } } - log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now)) + + _, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations}) + if err != nil { + log.ZError(ctx, "Clear Msg failed.", err) + return + } + + log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { return errs.Wrap(err) } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index f0b7d70db6..bf41cce957 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -179,7 +179,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) } - if conversation.IsPinned == true { + if conversation.IsPinned { pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) } } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 3cd7064585..85b797c401 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -57,8 +57,8 @@ type CommonMsgDatabase interface { // DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis // cache). DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error - // UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages. - UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) + // ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. + ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. @@ -92,7 +92,7 @@ type CommonMsgDatabase interface { RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) - // clear msg + // get Msg when destruct msg before GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) @@ -528,10 +528,10 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } -func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { +func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { var index int64 for { - // from oldest 2 newest + // from oldest 2 newest, ASC msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { @@ -544,15 +544,19 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion break } + index++ - // && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() + + // && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli() if len(msgDocModel.Msg) > 0 { i := 0 var over bool for _, msg := range msgDocModel.Msg { i++ - if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { - if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { + // over clear time, need to clear + if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() { + // if msg is not in del list, add to del list + if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { seqs = append(seqs, msg.Msg.Seq) } } else { @@ -567,13 +571,18 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string } } - log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) + log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs) + + // have msg need to destruct if len(seqs) > 0 { - userMinSeq := seqs[len(seqs)-1] + 1 - currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) + // update min seq to clear after + userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after + currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before if err != nil { return nil, err } + + // if before < after, update min seq if currentUserMinSeq < userMinSeq { if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 5a9b19035d..30ca01ee71 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -16,6 +16,7 @@ package database import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/pagination" ) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index f7ced1c2cf..10e223c893 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -16,9 +16,10 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 2fdf2003b5..510a049d05 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -60,7 +60,7 @@ type GroupMemberMgo struct { } func (g *GroupMemberMgo) memberSort() any { - return bson.D{{"role_level", -1}, {"create_time", 1}} + return bson.D{{Key: "role_level", Value: -1}, {Key: "create_time", Value: 1}} } func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) { diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index ccca856194..c69d355d68 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -152,8 +152,8 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont return resp.UserIDs, nil } -func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) { - resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) +func (c *ConversationRpcClient) GetConversationsNeedClearMsg(ctx context.Context) ([]*pbconversation.Conversation, error) { + resp, err := c.Client.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) if err != nil { return nil, err } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 9b26a7abda..8313937cdb 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -244,8 +244,8 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati return resp.MaxSeq, nil } -func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error { - _, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts}) +func (m *MessageRpcClient) DestructMsgs(ctx context.Context, ts int64) error { + _, err := m.Client.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: ts}) return err }