diff --git a/go.mod b/go.mod index ad0937d011..e7d9097d29 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,8 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/redis/go-redis/v9 v9.2.1 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 + go.uber.org/automaxprocs v1.5.3 + golang.org/x/sync v0.4.0 gopkg.in/src-d/go-git.v4 v4.13.1 gotest.tools v2.2.0+incompatible ) @@ -127,12 +129,10 @@ require ( github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.opencensus.io v0.24.0 // indirect go.uber.org/atomic v1.7.0 // indirect - go.uber.org/automaxprocs v1.5.3 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect - golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/go.sum b/go.sum index e1886ca911..b1ee37912d 100644 --- a/go.sum +++ b/go.sum @@ -280,6 +280,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/internal/api/third.go b/internal/api/third.go index fca133ea98..5191903da7 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -15,11 +15,12 @@ package api import ( - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/rand" "net/http" "strconv" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/gin-gonic/gin" "github.com/OpenIMSDK/protocol/third" diff --git a/internal/push/offlinepush/dummy/push.go b/internal/push/offlinepush/dummy/push.go index 1be234d688..2b15bc05da 100644 --- a/internal/push/offlinepush/dummy/push.go +++ b/internal/push/offlinepush/dummy/push.go @@ -2,6 +2,7 @@ package dummy import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" ) diff --git a/internal/tools/cron_task_test.go b/internal/tools/cron_task_test.go index 2fcfba01b4..1f4f1f5c1c 100644 --- a/internal/tools/cron_task_test.go +++ b/internal/tools/cron_task_test.go @@ -7,10 +7,11 @@ import ( "testing" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) func TestDisLock(t *testing.T) { diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index e9e2c3b13d..1e0ab3214f 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -22,37 +22,37 @@ import ( type SendMsg struct { // SendID uniquely identifies the sender. SendID string `json:"sendID" binding:"required"` - + // GroupID is the identifier for the group, required if SessionType is 2 or 3. GroupID string `json:"groupID" binding:"required_if=SessionType 2|required_if=SessionType 3"` - + // SenderNickname is the nickname of the sender. SenderNickname string `json:"senderNickname"` - + // SenderFaceURL is the URL to the sender's avatar. SenderFaceURL string `json:"senderFaceURL"` - + // SenderPlatformID is an integer identifier for the sender's platform. SenderPlatformID int32 `json:"senderPlatformID"` - + // Content is the actual content of the message, required and excluded from Swagger documentation. Content map[string]interface{} `json:"content" binding:"required" swaggerignore:"true"` - + // ContentType is an integer that represents the type of the content. ContentType int32 `json:"contentType" binding:"required"` - + // SessionType is an integer that represents the type of session for the message. SessionType int32 `json:"sessionType" binding:"required"` - + // IsOnlineOnly specifies if the message is only sent when the receiver is online. IsOnlineOnly bool `json:"isOnlineOnly"` - + // NotOfflinePush specifies if the message should not trigger offline push notifications. NotOfflinePush bool `json:"notOfflinePush"` - + // SendTime is a timestamp indicating when the message was sent. SendTime int64 `json:"sendTime"` - + // OfflinePushInfo contains information for offline push notifications. OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` } @@ -67,10 +67,10 @@ type SendMsgReq struct { // BatchSendMsgReq defines the structure for sending a message to multiple recipients. type BatchSendMsgReq struct { SendMsg - + // IsSendAll indicates whether the message should be sent to all users. IsSendAll bool `json:"isSendAll"` - + // RecvIDs is a slice of receiver identifiers to whom the message will be sent, required field. RecvIDs []string `json:"recvIDs" binding:"required"` } @@ -79,7 +79,7 @@ type BatchSendMsgReq struct { type BatchSendMsgResp struct { // Results is a slice of SingleReturnResult, representing the outcome of each message sent. Results []*SingleReturnResult `json:"results"` - + // FailedIDs is a slice of user IDs for whom the message send failed. FailedIDs []string `json:"failedUserIDs"` } @@ -88,13 +88,13 @@ type BatchSendMsgResp struct { type SingleReturnResult struct { // ServerMsgID is the message identifier on the server-side. ServerMsgID string `json:"serverMsgID"` - + // ClientMsgID is the message identifier on the client-side. ClientMsgID string `json:"clientMsgID"` - + // SendTime is the timestamp of when the message was sent. SendTime int64 `json:"sendTime"` - + // RecvID uniquely identifies the receiver of the message. RecvID string `json:"recvID"` } diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 903d1fb957..f99b625c70 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -16,10 +16,12 @@ package cmd import ( "fmt" + "github.com/OpenIMSDK/protocol/constant" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/spf13/cobra" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" ) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index b5249b5b44..1e7fe08f86 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -16,22 +16,27 @@ package controller import ( "context" + "encoding/json" "errors" "time" + "github.com/OpenIMSDK/protocol/constant" + + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/redis/go-redis/v9" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" + "go.mongodb.org/mongo-driver/mongo" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "go.mongodb.org/mongo-driver/mongo" pbmsg "github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/sdkws" @@ -397,7 +402,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) + msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { return nil, err } @@ -408,12 +413,94 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat return totalMsgs, nil } -func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) { +func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*unrelationtb.MsgInfoModel, userID, conversationID string, msg *unrelationtb.MsgInfoModel) { + if msg.IsRead { + msg.Msg.IsRead = true + } + if msg.Msg.ContentType != constant.Quote { + return + } + if msg.Msg.Content == "" { + return + } + type MsgData struct { + SendID string `protobuf:"bytes,1,opt,name=sendID,proto3" json:"sendID"` + RecvID string `protobuf:"bytes,2,opt,name=recvID,proto3" json:"recvID"` + GroupID string `protobuf:"bytes,3,opt,name=groupID,proto3" json:"groupID"` + ClientMsgID string `protobuf:"bytes,4,opt,name=clientMsgID,proto3" json:"clientMsgID"` + ServerMsgID string `protobuf:"bytes,5,opt,name=serverMsgID,proto3" json:"serverMsgID"` + SenderPlatformID int32 `protobuf:"varint,6,opt,name=senderPlatformID,proto3" json:"senderPlatformID"` + SenderNickname string `protobuf:"bytes,7,opt,name=senderNickname,proto3" json:"senderNickname"` + SenderFaceURL string `protobuf:"bytes,8,opt,name=senderFaceURL,proto3" json:"senderFaceURL"` + SessionType int32 `protobuf:"varint,9,opt,name=sessionType,proto3" json:"sessionType"` + MsgFrom int32 `protobuf:"varint,10,opt,name=msgFrom,proto3" json:"msgFrom"` + ContentType int32 `protobuf:"varint,11,opt,name=contentType,proto3" json:"contentType"` + Content string `protobuf:"bytes,12,opt,name=content,proto3" json:"content"` + Seq int64 `protobuf:"varint,14,opt,name=seq,proto3" json:"seq"` + SendTime int64 `protobuf:"varint,15,opt,name=sendTime,proto3" json:"sendTime"` + CreateTime int64 `protobuf:"varint,16,opt,name=createTime,proto3" json:"createTime"` + Status int32 `protobuf:"varint,17,opt,name=status,proto3" json:"status"` + IsRead bool `protobuf:"varint,18,opt,name=isRead,proto3" json:"isRead"` + Options map[string]bool `protobuf:"bytes,19,rep,name=options,proto3" json:"options" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + OfflinePushInfo *sdkws.OfflinePushInfo `protobuf:"bytes,20,opt,name=offlinePushInfo,proto3" json:"offlinePushInfo"` + AtUserIDList []string `protobuf:"bytes,21,rep,name=atUserIDList,proto3" json:"atUserIDList"` + AttachedInfo string `protobuf:"bytes,22,opt,name=attachedInfo,proto3" json:"attachedInfo"` + Ex string `protobuf:"bytes,23,opt,name=ex,proto3" json:"ex"` + } + var quoteMsg struct { + Text string `json:"text,omitempty"` + QuoteMessage *MsgData `json:"quoteMessage,omitempty"` + MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"` + } + if err := json.Unmarshal([]byte(msg.Msg.Content), "eMsg); err != nil { + log.ZError(ctx, "json.Unmarshal", err) + return + } + if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification { + return + } + var msgs []*unrelationtb.MsgInfoModel + if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok { + msgs = v + } else { + if quoteMsg.QuoteMessage.Seq > 0 { + ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msg.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) + if err != nil { + log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq) + return + } + msgs = ms + cache[quoteMsg.QuoteMessage.Seq] = ms + } + } + if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification { + return + } + quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification + if len(msgs) > 0 { + quoteMsg.QuoteMessage.Content = msgs[0].Msg.Content + } else { + quoteMsg.QuoteMessage.Content = "{}" + } + data, err := json.Marshal("eMsg) + if err != nil { + log.ZError(ctx, "json.Marshal", err) + return + } + msg.Msg.Content = string(data) + if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msg.GetDocID(conversationID, msg.Msg.Seq), db.msg.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { + log.ZError(ctx, "UpdateMsgContent", err) + } +} + +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) + if err != nil { + return nil, err + } + tempCache := make(map[int64][]*unrelationtb.MsgInfoModel) for _, msg := range msgs { - if msg.IsRead { - msg.Msg.IsRead = true - } + db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg) } return msgs, err } @@ -422,7 +509,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs) + msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { return nil, err } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 80e2db122b..ba5aecd258 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -144,9 +144,9 @@ func Test_BatchInsertChat2DB(t *testing.T) { } func GetDB() *commonMsgDatabase { - config.Config.Mongo.Address = []string{"192.168.44.128:37017"} + config.Config.Mongo.Address = []string{"203.56.175.233:37017"} // config.Config.Mongo.Timeout = 60 - config.Config.Mongo.Database = "openIM" + config.Config.Mongo.Database = "openIM_v3" // config.Config.Mongo.Source = "admin" config.Config.Mongo.Username = "root" config.Config.Mongo.Password = "openIM123" @@ -232,37 +232,17 @@ func Test_FindBySeq(t *testing.T) { // } //} -//func Test_Delete1(t *testing.T) { -// config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} -// config.Config.Mongo.DBTimeout = 60 -// config.Config.Mongo.DBDatabase = "openIM" -// config.Config.Mongo.DBSource = "admin" -// config.Config.Mongo.DBUserName = "root" -// config.Config.Mongo.DBPassword = "openIM123" -// config.Config.Mongo.DBMaxPoolSize = 100 -// config.Config.Mongo.DBRetainChatRecords = 3650 -// config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" -// -// mongo, err := unrelation.NewMongo() -// if err != nil { -// panic(err) -// } -// err = mongo.GetDatabase().Client().Ping(context.Background(), nil) -// if err != nil { -// panic(err) -// } -// -// c := mongo.GetClient().Database("openIM").Collection("msg") -// -// var o unrelationtb.MsgDocModel -// -// err = c.FindOne(context.Background(), bson.M{"doc_id": "test:0"}).Decode(&o) -// if err != nil { -// panic(err) -// } -// -// for i, model := range o.Msg { -// fmt.Println(i, model == nil) -// } -// -//} +func TestName(t *testing.T) { + db := GetDB() + var seqs []int64 + for i := int64(1); i <= 4; i++ { + seqs = append(seqs, i) + } + msgs, err := db.getMsgBySeqsRange(context.Background(), "4931176757", "si_3866692501_4931176757", seqs, seqs[0], seqs[len(seqs)-1]) + if err != nil { + t.Fatal(err) + } + + t.Log(msgs) + +} diff --git a/pkg/common/db/s3/cont/controller.go b/pkg/common/db/s3/cont/controller.go index 09025e1305..7040c73068 100644 --- a/pkg/common/db/s3/cont/controller.go +++ b/pkg/common/db/s3/cont/controller.go @@ -175,7 +175,6 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa return nil, err } if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash { - fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash) return nil, errors.New("md5 mismatching") } if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil { diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 244f96b459..26b02b16fb 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -2,10 +2,11 @@ package prommetrics import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" ) func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) { diff --git a/pkg/common/prommetrics/prommetrics_test.go b/pkg/common/prommetrics/prommetrics_test.go index 771fcac191..babc5e4100 100644 --- a/pkg/common/prommetrics/prommetrics_test.go +++ b/pkg/common/prommetrics/prommetrics_test.go @@ -3,9 +3,10 @@ package prommetrics import ( "testing" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) func TestNewGrpcPromObj(t *testing.T) { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 09f7177ae3..d5e31701e6 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -28,10 +28,11 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/network" diff --git a/test/e2e/api/user/curd.go b/test/e2e/api/user/curd.go index cdff1ea1e0..28b55b6824 100644 --- a/test/e2e/api/user/curd.go +++ b/test/e2e/api/user/curd.go @@ -41,4 +41,4 @@ func GetUsersOnlineStatus(token string, userIDs []string) error { UserIDs: userIDs, } return sendPostRequestWithToken("http://your-api-host:port/user/get_users_online_status", token, requestBody) -} \ No newline at end of file +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index d578a988b8..8fe8107895 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -20,4 +20,4 @@ func TestMain(m *testing.M) { func TestE2E(t *testing.T) { RunE2ETests(t) -} \ No newline at end of file +}