diff --git a/go.mod b/go.mod index b982bc7d0a..5e5a8b5bea 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.46 + github.com/openimsdk/protocol v0.0.72-alpha.47 github.com/openimsdk/tools v0.0.50-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index df9cf5194c..53109c8908 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc= -github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.47 h1:FGHnEwsA05GxT3vnz7YH3fbVkuoO3P71ZZgkQQ71MjA= +github.com/openimsdk/protocol v0.0.72-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/router.go b/internal/api/router.go index 560516d303..17c9989120 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -198,6 +198,13 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En objectGroup.POST("/initiate_form_data", t.InitiateFormData) objectGroup.POST("/complete_form_data", t.CompleteFormData) objectGroup.GET("/*name", t.ObjectRedirect) + + applicationGroup := r.Group("application") + applicationGroup.POST("/add_version", t.AddApplicationVersion) + applicationGroup.POST("/update_version", t.UpdateApplicationVersion) + applicationGroup.POST("/delete_version", t.DeleteApplicationVersion) + applicationGroup.POST("/latest_version", t.LatestApplicationVersion) + applicationGroup.POST("/page_versions", t.PageApplicationVersion) } // Message msgGroup := r.Group("/msg") @@ -290,4 +297,6 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { var Whitelist = []string{ "/auth/get_admin_token", "/auth/parse_token", + "/application/latest_version", + "/application/page_versions", } diff --git a/internal/api/third.go b/internal/api/third.go index 6baa70ee5d..56661ba89d 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -170,3 +170,23 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) { func (o *ThirdApi) GetPrometheus(c *gin.Context) { c.Redirect(http.StatusFound, o.GrafanaUrl) } + +func (o *ThirdApi) LatestApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.LatestApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) AddApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.AddApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) UpdateApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.UpdateApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) DeleteApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.DeleteApplicationVersion, o.Client, c) +} + +func (o *ThirdApi) PageApplicationVersion(c *gin.Context) { + a2r.Call(third.ThirdClient.PageApplicationVersion, o.Client, c) +} diff --git a/internal/rpc/third/application.go b/internal/rpc/third/application.go new file mode 100644 index 0000000000..a6556055c1 --- /dev/null +++ b/internal/rpc/third/application.go @@ -0,0 +1,117 @@ +package third + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/datautil" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "time" +) + +func IsNotFound(err error) bool { + switch errs.Unwrap(err) { + case redis.Nil, mongo.ErrNoDocuments: + return true + default: + return false + } +} + +func (t *thirdServer) db2pbApplication(val *model.Application) *third.ApplicationVersion { + return &third.ApplicationVersion{ + Id: val.ID.Hex(), + Platform: val.Platform, + Version: val.Version, + Url: val.Url, + Text: val.Text, + Force: val.Force, + Latest: val.Latest, + CreateTime: val.CreateTime.UnixMilli(), + } +} + +func (t *thirdServer) LatestApplicationVersion(ctx context.Context, req *third.LatestApplicationVersionReq) (*third.LatestApplicationVersionResp, error) { + res, err := t.applicationDatabase.LatestVersion(ctx, req.Platform) + if err == nil { + return &third.LatestApplicationVersionResp{Version: t.db2pbApplication(res)}, nil + } else if IsNotFound(err) { + return &third.LatestApplicationVersionResp{}, nil + } else { + return nil, err + } +} + +func (t *thirdServer) AddApplicationVersion(ctx context.Context, req *third.AddApplicationVersionReq) (*third.AddApplicationVersionResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } + val := &model.Application{ + ID: primitive.NewObjectID(), + Platform: req.Platform, + Version: req.Version, + Url: req.Url, + Text: req.Text, + Force: req.Force, + Latest: req.Latest, + CreateTime: time.Now(), + } + if err := t.applicationDatabase.AddVersion(ctx, val); err != nil { + return nil, err + } + return &third.AddApplicationVersionResp{}, nil +} + +func (t *thirdServer) UpdateApplicationVersion(ctx context.Context, req *third.UpdateApplicationVersionReq) (*third.UpdateApplicationVersionResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } + oid, err := primitive.ObjectIDFromHex(req.Id) + if err != nil { + return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) + } + update := make(map[string]any) + putUpdate(update, "platform", req.Platform) + putUpdate(update, "version", req.Version) + putUpdate(update, "url", req.Url) + putUpdate(update, "text", req.Text) + putUpdate(update, "force", req.Force) + putUpdate(update, "latest", req.Latest) + if err := t.applicationDatabase.UpdateVersion(ctx, oid, update); err != nil { + return nil, err + } + return &third.UpdateApplicationVersionResp{}, nil +} + +func (t *thirdServer) DeleteApplicationVersion(ctx context.Context, req *third.DeleteApplicationVersionReq) (*third.DeleteApplicationVersionResp, error) { + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err + } + ids := make([]primitive.ObjectID, 0, len(req.Id)) + for _, id := range req.Id { + oid, err := primitive.ObjectIDFromHex(id) + if err != nil { + return nil, errs.ErrArgs.WrapMsg("invalid id " + err.Error()) + } + ids = append(ids, oid) + } + if err := t.applicationDatabase.DeleteVersion(ctx, ids); err != nil { + return nil, err + } + return &third.DeleteApplicationVersionResp{}, nil +} + +func (t *thirdServer) PageApplicationVersion(ctx context.Context, req *third.PageApplicationVersionReq) (*third.PageApplicationVersionResp, error) { + total, res, err := t.applicationDatabase.PageVersion(ctx, req.Platform, req.Pagination) + if err != nil { + return nil, err + } + return &third.PageApplicationVersionResp{ + Total: total, + Versions: datautil.Slice(res, t.db2pbApplication), + }, nil +} diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 0eeaaa3140..c6b588d8d4 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -38,12 +38,13 @@ import ( ) type thirdServer struct { - thirdDatabase controller.ThirdDatabase - s3dataBase controller.S3Database - userRpcClient rpcclient.UserRpcClient - defaultExpire time.Duration - config *Config - minio *minio.Minio + thirdDatabase controller.ThirdDatabase + s3dataBase controller.S3Database + userRpcClient rpcclient.UserRpcClient + defaultExpire time.Duration + config *Config + minio *minio.Minio + applicationDatabase controller.ApplicationDatabase } type Config struct { @@ -74,6 +75,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + applicationMgo, err := mgo.NewApplicationMgo(mgocli.GetDB()) + if err != nil { + return err + } + // Select the oss method according to the profile policy enable := config.RpcConfig.Object.Enable var ( @@ -98,12 +104,13 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ - thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), - s3dataBase: controller.NewS3Database(rdb, o, s3db), - defaultExpire: time.Hour * 24 * 7, - config: config, - minio: minioCli, + thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), + userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), + s3dataBase: controller.NewS3Database(rdb, o, s3db), + defaultExpire: time.Hour * 24 * 7, + config: config, + minio: minioCli, + applicationDatabase: controller.NewApplicationDatabase(applicationMgo, redis.NewApplicationRedisCache(applicationMgo, rdb)), }) return nil } diff --git a/internal/rpc/third/tool.go b/internal/rpc/third/tool.go index ac4be39689..4e22ffbf97 100644 --- a/internal/rpc/third/tool.go +++ b/internal/rpc/third/tool.go @@ -82,3 +82,11 @@ func checkValidObjectName(objectName string) error { func (t *thirdServer) IsManagerUserID(opUserID string) bool { return authverify.IsManagerUserID(opUserID, t.config.Share.IMAdminUserID) } + +func putUpdate[T any](update map[string]any, name string, val interface{ GetValuePtr() *T }) { + ptrVal := val.GetValuePtr() + if ptrVal == nil { + return + } + update[name] = *ptrVal +} diff --git a/pkg/common/storage/cache/application.go b/pkg/common/storage/cache/application.go new file mode 100644 index 0000000000..588732ec8d --- /dev/null +++ b/pkg/common/storage/cache/application.go @@ -0,0 +1,11 @@ +package cache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +type ApplicationCache interface { + LatestVersion(ctx context.Context, platform string) (*model.Application, error) + DeleteCache(ctx context.Context, platforms []string) error +} diff --git a/pkg/common/storage/cache/cachekey/application.go b/pkg/common/storage/cache/cachekey/application.go new file mode 100644 index 0000000000..032adba3c1 --- /dev/null +++ b/pkg/common/storage/cache/cachekey/application.go @@ -0,0 +1,9 @@ +package cachekey + +const ( + ApplicationLatestVersion = "APPLICATION_LATEST_VERSION:" +) + +func GetApplicationLatestVersionKey(platform string) string { + return ApplicationLatestVersion + platform +} diff --git a/pkg/common/storage/cache/redis/application.go b/pkg/common/storage/cache/redis/application.go new file mode 100644 index 0000000000..4a7a4ced67 --- /dev/null +++ b/pkg/common/storage/cache/redis/application.go @@ -0,0 +1,43 @@ +package redis + +import ( + "context" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/utils/datautil" + "github.com/redis/go-redis/v9" + "time" +) + +func NewApplicationRedisCache(db database.Application, rdb redis.UniversalClient) *ApplicationRedisCache { + return &ApplicationRedisCache{ + db: db, + rcClient: rockscache.NewClient(rdb, *GetRocksCacheOptions()), + deleter: NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), nil), + expireTime: time.Hour * 24 * 7, + } +} + +type ApplicationRedisCache struct { + db database.Application + rcClient *rockscache.Client + deleter *BatchDeleterRedis + expireTime time.Duration +} + +func (a *ApplicationRedisCache) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { + return getCache(ctx, a.rcClient, cachekey.GetApplicationLatestVersionKey(platform), a.expireTime, func(ctx context.Context) (*model.Application, error) { + return a.db.LatestVersion(ctx, platform) + }) +} + +func (a *ApplicationRedisCache) DeleteCache(ctx context.Context, platforms []string) error { + if len(platforms) == 0 { + return nil + } + return a.deleter.ExecDelWithKeys(ctx, datautil.Slice(platforms, func(platform string) string { + return cachekey.GetApplicationLatestVersionKey(platform) + })) +} diff --git a/pkg/common/storage/controller/application.go b/pkg/common/storage/controller/application.go new file mode 100644 index 0000000000..72bca07efa --- /dev/null +++ b/pkg/common/storage/controller/application.go @@ -0,0 +1,69 @@ +package controller + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type ApplicationDatabase interface { + LatestVersion(ctx context.Context, platform string) (*model.Application, error) + AddVersion(ctx context.Context, val *model.Application) error + UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error + DeleteVersion(ctx context.Context, id []primitive.ObjectID) error + PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) +} + +func NewApplicationDatabase(db database.Application, cache cache.ApplicationCache) ApplicationDatabase { + return &applicationDatabase{db: db, cache: cache} +} + +type applicationDatabase struct { + db database.Application + cache cache.ApplicationCache +} + +func (a *applicationDatabase) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { + return a.cache.LatestVersion(ctx, platform) +} + +func (a *applicationDatabase) AddVersion(ctx context.Context, val *model.Application) error { + if err := a.db.AddVersion(ctx, val); err != nil { + return err + } + return a.cache.DeleteCache(ctx, []string{val.Platform}) +} + +func (a *applicationDatabase) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { + platforms, err := a.db.FindPlatform(ctx, []primitive.ObjectID{id}) + if err != nil { + return err + } + if err := a.db.UpdateVersion(ctx, id, update); err != nil { + return err + } + if p, ok := update["platform"]; ok { + if val, ok := p.(string); ok { + platforms = append(platforms, val) + } + } + return a.cache.DeleteCache(ctx, platforms) +} + +func (a *applicationDatabase) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { + platforms, err := a.db.FindPlatform(ctx, id) + if err != nil { + return err + } + if err := a.db.DeleteVersion(ctx, id); err != nil { + return err + } + return a.cache.DeleteCache(ctx, platforms) +} + +func (a *applicationDatabase) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { + return a.db.PageVersion(ctx, platforms, page) +} diff --git a/pkg/common/storage/database/application.go b/pkg/common/storage/database/application.go new file mode 100644 index 0000000000..c98ae74c82 --- /dev/null +++ b/pkg/common/storage/database/application.go @@ -0,0 +1,17 @@ +package database + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type Application interface { + LatestVersion(ctx context.Context, platform string) (*model.Application, error) + AddVersion(ctx context.Context, val *model.Application) error + UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error + DeleteVersion(ctx context.Context, id []primitive.ObjectID) error + PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) + FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) +} diff --git a/pkg/common/storage/database/mgo/application.go b/pkg/common/storage/database/mgo/application.go new file mode 100644 index 0000000000..e59c0560ab --- /dev/null +++ b/pkg/common/storage/database/mgo/application.go @@ -0,0 +1,82 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/pagination" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewApplicationMgo(db *mongo.Database) (*ApplicationMgo, error) { + coll := db.Collection("application") + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "platform", Value: 1}, + {Key: "version", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "latest", Value: -1}, + }, + }, + }) + if err != nil { + return nil, err + } + return &ApplicationMgo{coll: coll}, nil +} + +type ApplicationMgo struct { + coll *mongo.Collection +} + +func (a *ApplicationMgo) sort() any { + return bson.D{{"latest", -1}, {"_id", -1}} +} + +func (a *ApplicationMgo) LatestVersion(ctx context.Context, platform string) (*model.Application, error) { + return mongoutil.FindOne[*model.Application](ctx, a.coll, bson.M{"platform": platform}, options.FindOne().SetSort(a.sort())) +} + +func (a *ApplicationMgo) AddVersion(ctx context.Context, val *model.Application) error { + if val.ID.IsZero() { + val.ID = primitive.NewObjectID() + } + return mongoutil.InsertMany(ctx, a.coll, []*model.Application{val}) +} + +func (a *ApplicationMgo) UpdateVersion(ctx context.Context, id primitive.ObjectID, update map[string]any) error { + if len(update) == 0 { + return nil + } + return mongoutil.UpdateOne(ctx, a.coll, bson.M{"_id": id}, bson.M{"$set": update}, true) +} + +func (a *ApplicationMgo) DeleteVersion(ctx context.Context, id []primitive.ObjectID) error { + if len(id) == 0 { + return nil + } + return mongoutil.DeleteMany(ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}) +} + +func (a *ApplicationMgo) PageVersion(ctx context.Context, platforms []string, page pagination.Pagination) (int64, []*model.Application, error) { + filter := bson.M{} + if len(platforms) > 0 { + filter["platform"] = bson.M{"$in": platforms} + } + return mongoutil.FindPage[*model.Application](ctx, a.coll, filter, page, options.Find().SetSort(a.sort())) +} + +func (a *ApplicationMgo) FindPlatform(ctx context.Context, id []primitive.ObjectID) ([]string, error) { + if len(id) == 0 { + return nil, nil + } + return mongoutil.Find[string](ctx, a.coll, bson.M{"_id": bson.M{"$in": id}}, options.Find().SetProjection(bson.M{"_id": 0, "platform": 1})) +} diff --git a/pkg/common/storage/model/application.go b/pkg/common/storage/model/application.go new file mode 100644 index 0000000000..f5bae2be65 --- /dev/null +++ b/pkg/common/storage/model/application.go @@ -0,0 +1,17 @@ +package model + +import ( + "go.mongodb.org/mongo-driver/bson/primitive" + "time" +) + +type Application struct { + ID primitive.ObjectID `bson:"_id"` + Platform string `bson:"platform"` + Version string `bson:"version"` + Url string `bson:"url"` + Text string `bson:"text"` + Force bool `bson:"force"` + Latest bool `bson:"latest"` + CreateTime time.Time `bson:"create_time"` +}