diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index 8a02ccd..d595c6e 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -27,7 +27,7 @@ const ( UnableToDeleteCommentErrorCode = 50008 UnableToDeleteCommentError = "无法删除视频评论" UnableToAddMessageErrorCode = 50009 - UnableToAddMessageRrror = "发送消息出错" + UnableToAddMessageError = "发送消息出错" UnableToQueryMessageErrorCode = 50010 UnableToQueryMessageError = "查消息出错" PublishServiceInnerErrorCode = 50011 @@ -48,30 +48,34 @@ const ( StringToIntError = "字符串转数字失败" RelationServiceIntErrorCode = 50019 RelationServiceIntError = "关系服务出现内部错误" - FavorivateServiceErrorCode = 50020 - FavorivateServiceError = "点赞服务内部出错" + FavoriteServiceErrorCode = 50020 + FavoriteServiceError = "点赞服务内部出错" UserServiceInnerErrorCode = 50021 UserServiceInnerError = "登录服务出现内部错误,请稍后重试!" ) // Expected Error const ( - AuthUserExistedCode = 10001 - AuthUserExisted = "用户已存在,请更换用户名或尝试登录!" - UserNotExistedCode = 10002 - UserNotExisted = "用户不存在,请先注册或检查你的用户名是否正确!" - AuthUserLoginFailedCode = 10003 - AuthUserLoginFailed = "用户信息错误,请检查账号密码是否正确" - AuthUserNeededCode = 10004 - AuthUserNeeded = "用户无权限操作,请登陆后重试!" - ActionCommentTypeInvalidCode = 10005 - ActionCommentTypeInvalid = "不合法的评论类型" - ActionCommentLimitedCode = 10006 - ActionCommentLimited = "评论频繁,请稍后再试!" - InvalidContentTypeCode = 10007 - InvalidContentType = "不合法的内容类型" - FavorivateServiceDuplicateCode = 10008 - FavorivateServiceDuplicateError = "不能重复点赞" - FavorivateServiceCancelCode = 10009 - FavorivateServiceCancelError = "没有点赞,不能取消点赞" + AuthUserExistedCode = 10001 + AuthUserExisted = "用户已存在,请更换用户名或尝试登录!" + UserNotExistedCode = 10002 + UserNotExisted = "用户不存在,请先注册或检查你的用户名是否正确!" + AuthUserLoginFailedCode = 10003 + AuthUserLoginFailed = "用户信息错误,请检查账号密码是否正确" + AuthUserNeededCode = 10004 + AuthUserNeeded = "用户无权限操作,请登陆后重试!" + ActionCommentTypeInvalidCode = 10005 + ActionCommentTypeInvalid = "不合法的评论类型" + ActionCommentLimitedCode = 10006 + ActionCommentLimited = "评论频繁,请稍后再试!" + InvalidContentTypeCode = 10007 + InvalidContentType = "不合法的内容类型" + FavoriteServiceDuplicateCode = 10008 + FavoriteServiceDuplicateError = "不能重复点赞" + FavoriteServiceCancelCode = 10009 + FavoriteServiceCancelError = "没有点赞,不能取消点赞" + PublishVideoLimitedCode = 10010 + PublishVideoLimited = "视频发布频繁,请稍后再试!" + ChatActionLimitedCode = 10011 + ChatActionLimitedError = "发送消息频繁,请稍后再试!" ) diff --git a/src/services/favorite/handler.go b/src/services/favorite/handler.go index c51d962..8f89cbb 100644 --- a/src/services/favorite/handler.go +++ b/src/services/favorite/handler.go @@ -58,8 +58,8 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo logging.SetSpanError(span, err) return &favorite.FavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } @@ -88,8 +88,8 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo //重复点赞 if value > 0 { resp = &favorite.FavoriteResponse{ - StatusCode: strings.FavorivateServiceDuplicateCode, - StatusMsg: strings.FavorivateServiceDuplicateError, + StatusCode: strings.FavoriteServiceDuplicateCode, + StatusMsg: strings.FavoriteServiceDuplicateError, } logger.WithFields(logrus.Fields{ "ActorId": req.ActorId, @@ -114,8 +114,8 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo //没有的点过赞 if value == 0 { resp = &favorite.FavoriteResponse{ - StatusCode: strings.FavorivateServiceCancelCode, - StatusMsg: strings.FavorivateServiceCancelError, + StatusCode: strings.FavoriteServiceCancelCode, + StatusMsg: strings.FavoriteServiceCancelError, } logger.WithFields(logrus.Fields{ @@ -150,8 +150,8 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo logging.SetSpanError(span, err) return &favorite.FavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } resp = &favorite.FavoriteResponse{ @@ -190,8 +190,8 @@ func (c FavoriteServiceServerImpl) FavoriteList(ctx context.Context, req *favori logging.SetSpanError(span, err) return &favorite.FavoriteListResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } @@ -205,8 +205,8 @@ func (c FavoriteServiceServerImpl) FavoriteList(ctx context.Context, req *favori logging.SetSpanError(span, err) return &favorite.FavoriteListResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } if len(arr) == 0 { @@ -237,8 +237,8 @@ func (c FavoriteServiceServerImpl) FavoriteList(ctx context.Context, req *favori }).Errorf("feed Service error") logging.SetSpanError(span, err) return &favorite.FavoriteListResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } @@ -273,8 +273,8 @@ func (c FavoriteServiceServerImpl) IsFavorite(ctx context.Context, req *favorite }).Errorf("feed Service error") logging.SetSpanError(span, err) return &favorite.IsFavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } @@ -294,8 +294,8 @@ func (c FavoriteServiceServerImpl) IsFavorite(ctx context.Context, req *favorite logging.SetSpanError(span, err) return &favorite.IsFavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } @@ -340,8 +340,8 @@ func (c FavoriteServiceServerImpl) CountFavorite(ctx context.Context, req *favor }).Errorf("feed Service error") logging.SetSpanError(span, err) return &favorite.CountFavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } videoId := fmt.Sprintf("%svideo_like_%d", config.EnvCfg.RedisPrefix, req.VideoId) @@ -357,8 +357,8 @@ func (c FavoriteServiceServerImpl) CountFavorite(ctx context.Context, req *favor logging.SetSpanError(span, err) return &favorite.CountFavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } else { num, _ = strconv.Atoi(value) @@ -398,8 +398,8 @@ func (c FavoriteServiceServerImpl) CountUserFavorite(ctx context.Context, req *f logging.SetSpanError(span, err) return &favorite.CountUserFavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } user_like_id := fmt.Sprintf("%suser_like_%d", config.EnvCfg.RedisPrefix, req.UserId) @@ -416,8 +416,8 @@ func (c FavoriteServiceServerImpl) CountUserFavorite(ctx context.Context, req *f logging.SetSpanError(span, err) return &favorite.CountUserFavoriteResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } else { num = value @@ -459,8 +459,8 @@ func (c FavoriteServiceServerImpl) CountUserTotalFavorited(ctx context.Context, logging.SetSpanError(span, err) return &favorite.CountUserTotalFavoritedResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } user_liked_id := fmt.Sprintf("%suser_liked_%d", config.EnvCfg.RedisPrefix, req.UserId) @@ -479,8 +479,8 @@ func (c FavoriteServiceServerImpl) CountUserTotalFavorited(ctx context.Context, logging.SetSpanError(span, err) return &favorite.CountUserTotalFavoritedResponse{ - StatusCode: strings.FavorivateServiceErrorCode, - StatusMsg: strings.FavorivateServiceError, + StatusCode: strings.FavoriteServiceErrorCode, + StatusMsg: strings.FavoriteServiceError, }, err } else { num, _ = strconv.Atoi(value) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 2d07fb7..23c2ac1 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -8,10 +8,12 @@ import ( "GuGoTik/src/rpc/chat" "GuGoTik/src/rpc/user" "GuGoTik/src/storage/database" + "GuGoTik/src/storage/redis" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "context" "fmt" + "github.com/go-redis/redis_rate/v10" "github.com/sirupsen/logrus" ) @@ -27,6 +29,14 @@ func (c MessageServiceImpl) New() { UserClient = user.NewUserServiceClient(userRpcConn) } +var chatActionLimitKeyPrefix = config.EnvCfg.RedisPrefix + "chat_freq_limit" + +const chatActionMaxQPS = 3 + +func chatActionLimitKey(userId uint32) string { + return fmt.Sprintf("%s-%d", chatActionLimitKeyPrefix, userId) +} + func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.ActionRequest) (res *chat.ActionResponse, err error) { ctx, span := tracing.Tracer.Start(ctx, "ChatActionService") defer span.End() @@ -39,6 +49,39 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action "content_text": request.Content, }).Debugf("Process start") + // Rate limiting + limiter := redis_rate.NewLimiter(redis.Client) + limiterKey := chatActionLimitKey(request.ActorId) + limiterRes, err := limiter.Allow(ctx, limiterKey, redis_rate.PerSecond(chatActionMaxQPS)) + if err != nil { + logger.WithFields(logrus.Fields{ + "ActorId": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "content_text": request.Content, + }).Errorf("ChatAction limiter error") + + res = &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageError, + } + return + } + if limiterRes.Allowed == 0 { + logger.WithFields(logrus.Fields{ + "ActorId": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "content_text": request.Content, + }).Errorf("Chat action query too frequently by user %d", request.ActorId) + + res = &chat.ActionResponse{ + StatusCode: strings.ChatActionLimitedCode, + StatusMsg: strings.ChatActionLimitedError, + } + return + } + userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ UserId: request.UserId, }) @@ -55,7 +98,7 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action return &chat.ActionResponse{ StatusCode: strings.UnableToAddMessageErrorCode, - StatusMsg: strings.UnableToAddMessageRrror, + StatusMsg: strings.UnableToAddMessageError, }, err } @@ -185,7 +228,7 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context resp = &chat.ActionResponse{ StatusCode: strings.UnableToAddMessageErrorCode, - StatusMsg: strings.UnableToAddMessageRrror, + StatusMsg: strings.UnableToAddMessageError, } return } diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index 6e73975..20ede42 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -9,6 +9,7 @@ import ( "GuGoTik/src/rpc/publish" "GuGoTik/src/storage/database" "GuGoTik/src/storage/file" + "GuGoTik/src/storage/redis" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/pathgen" @@ -16,6 +17,8 @@ import ( "bytes" "context" "encoding/json" + "fmt" + "github.com/go-redis/redis_rate/v10" "github.com/sirupsen/logrus" "github.com/streadway/amqp" "math/rand" @@ -39,6 +42,25 @@ func exitOnError(err error) { } } +func CloseMQConn() { + if err := conn.Close(); err != nil { + panic(err) + } + + if err := channel.Close(); err != nil { + panic(err) + } +} + +var createVideoLimitKeyPrefix = config.EnvCfg.RedisPrefix + "publish_freq_limit" + +const createVideoMaxQPS = 3 + +// Return redis key to record the amount of CreateVideo query of an actor, e.g., publish_freq_limit-1-1669524458 +func createVideoLimitKey(userId uint32) string { + return fmt.Sprintf("%s-%d", createVideoLimitKeyPrefix, userId) +} + func (a PublishServiceImpl) New() { FeedRpcConn := grpc2.Connect(config.FeedRpcServerName) FeedClient = feed.NewFeedServiceClient(FeedRpcConn) @@ -183,16 +205,6 @@ func (a PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVi return } -func CloseMQConn() { - if err := conn.Close(); err != nil { - panic(err) - } - - if err := channel.Close(); err != nil { - panic(err) - } -} - func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.CreateVideoRequest) (resp *publish.CreateVideoResponse, err error) { ctx, span := tracing.Tracer.Start(ctx, "CreateVideoService") defer span.End() @@ -202,6 +214,36 @@ func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.Cr "ActorId": request.ActorId, "Title": request.Title, }).Infof("Create video requested.") + + // Rate limiting + limiter := redis_rate.NewLimiter(redis.Client) + limiterKey := createVideoLimitKey(request.ActorId) + limiterRes, err := limiter.Allow(ctx, limiterKey, redis_rate.PerSecond(createVideoMaxQPS)) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + }).Errorf("CreateVideo limiter error") + + resp = &publish.CreateVideoResponse{ + StatusCode: strings.VideoServiceInnerErrorCode, + StatusMsg: strings.VideoServiceInnerError, + } + return + } + if limiterRes.Allowed == 0 { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + }).Errorf("Create video query too frequently by user %d", request.ActorId) + + resp = &publish.CreateVideoResponse{ + StatusCode: strings.PublishVideoLimitedCode, + StatusMsg: strings.PublishVideoLimited, + } + return + } + // 检测视频格式 detectedContentType := http.DetectContentType(request.Data) if detectedContentType != "video/mp4" { diff --git a/src/services/videoprocessor/summary.go b/src/services/videoprocessor/summary.go index 306d30c..265b64f 100644 --- a/src/services/videoprocessor/summary.go +++ b/src/services/videoprocessor/summary.go @@ -133,6 +133,24 @@ func SummaryConsume(channel *amqp.Channel) { span.End() continue } + + // Save audio_file_name to db + video := &models.Video{ + ID: raw.VideoId, + AudioFileName: audioFileName, + } + result := database.Client.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{"audio_file_name"}), + }).Create(&video) + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "Err": result.Error, + "ID": raw.VideoId, + "AudioFileName": audioFileName, + }).Errorf("Error when updating audio file name to database") + logging.SetSpanError(span, result.Error) + } } else { logger.WithFields(logrus.Fields{ "VideoId": raw.VideoId, diff --git a/test/rpc/publishrpc_test.go b/test/rpc/publishrpc_test.go index 4a5069f..bc6da99 100644 --- a/test/rpc/publishrpc_test.go +++ b/test/rpc/publishrpc_test.go @@ -10,11 +10,13 @@ import ( "google.golang.org/grpc/credentials/insecure" "io" "os" + "sync" "testing" ) +var publishClient publish.PublishServiceClient + func TestListVideo(t *testing.T) { - var Client publish.PublishServiceClient req := publish.ListVideoRequest{ UserId: 123, ActorId: 123, @@ -23,15 +25,14 @@ func TestListVideo(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) assert.Empty(t, err) - Client = publish.NewPublishServiceClient(conn) + publishClient = publish.NewPublishServiceClient(conn) //调用服务端方法 - res, err := Client.ListVideo(context.Background(), &req) + res, err := publishClient.ListVideo(context.Background(), &req) assert.Empty(t, err) assert.Equal(t, int32(0), res.StatusCode) } func TestCountVideo(t *testing.T) { - var Client publish.PublishServiceClient req := publish.CountVideoRequest{ UserId: 1, } @@ -39,14 +40,13 @@ func TestCountVideo(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) assert.Empty(t, err) - Client = publish.NewPublishServiceClient(conn) - res, err := Client.CountVideo(context.Background(), &req) + publishClient = publish.NewPublishServiceClient(conn) + res, err := publishClient.CountVideo(context.Background(), &req) assert.Empty(t, err) assert.Equal(t, int32(0), res.StatusCode) } func TestPublishVideo(t *testing.T) { - var Client publish.PublishServiceClient reader, err := os.Open("/home/yangfeng/Repos/youthcamp/videos/upload_video_2_1080p.mp4") assert.Empty(t, err) bytes, err := io.ReadAll(reader) @@ -60,8 +60,35 @@ func TestPublishVideo(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) assert.Empty(t, err) - Client = publish.NewPublishServiceClient(conn) - res, err := Client.CreateVideo(context.Background(), &req) + publishClient = publish.NewPublishServiceClient(conn) + res, err := publishClient.CreateVideo(context.Background(), &req) assert.Empty(t, err) assert.Equal(t, int32(0), res.StatusCode) } + +func TestPublishVideo_Limiter(t *testing.T) { + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.PublishRpcServerPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) + assert.Empty(t, err) + publishClient = publish.NewPublishServiceClient(conn) + + reader, err := os.Open("/home/yangfeng/Repos/youthcamp/videos/upload_video_4.mp4") + assert.Empty(t, err) + bytes, err := io.ReadAll(reader) + assert.Empty(t, err) + req := publish.CreateVideoRequest{ + ActorId: 1, + Data: bytes, + Title: "原神,启动!", + } + + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, _ = publishClient.CreateVideo(context.Background(), &req) + }() + } +}