From 48d6c967b0e456996fed4cd2409b47d9630bea1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Thu, 17 Aug 2023 17:50:31 +0800 Subject: [PATCH 01/10] style: note off --- src/constant/strings/err.go | 2 + src/services/feed/handler.go | 94 ++++++++++++++++++------------------ src/web/main.go | 6 +++ 3 files changed, 55 insertions(+), 47 deletions(-) diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index 443dfc3..32decb9 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -26,6 +26,8 @@ const ( ActorIDNotMatchError = "用户不匹配" UnableToDeleteCommentErrorCode = 50008 UnableToDeleteCommentError = "无法删除视频评论" + PublishServiceInnerErrorCode = 50009 + PublishServiceInnerError = "发布服务出现内部错误,请稍后重试!" ) // Expected Error diff --git a/src/services/feed/handler.go b/src/services/feed/handler.go index 3a7ff10..864579e 100644 --- a/src/services/feed/handler.go +++ b/src/services/feed/handler.go @@ -6,6 +6,7 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/rpc/comment" + "GuGoTik/src/rpc/favorite" "GuGoTik/src/rpc/feed" "GuGoTik/src/rpc/user" "GuGoTik/src/storage/database" @@ -31,16 +32,15 @@ const ( var UserClient user.UserServiceClient var CommentClient comment.CommentServiceClient - -//var FavoriteClient favorite.FavoriteServiceClient +var FavoriteClient favorite.FavoriteServiceClient func init() { userRpcConn := grpc2.Connect(config.UserRpcServerName) UserClient = user.NewUserServiceClient(userRpcConn) commentRpcConn := grpc2.Connect(config.CommentRpcServerName) CommentClient = comment.NewCommentServiceClient(commentRpcConn) - //favoriteRpcConn := grpc2.Connect(config.FavoriteRpcServerName) - //FavoriteClient = favorite.NewFavoriteServiceClient(favoriteRpcConn) + favoriteRpcConn := grpc2.Connect(config.FavoriteRpcServerName) + FavoriteClient = favorite.NewFavoriteServiceClient(favoriteRpcConn) } func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedRequest) (resp *feed.ListFeedResponse, err error) { @@ -232,28 +232,28 @@ func queryDetailed( }(i, v) // fill favorite count - //go func(i int, v *models.Video) { - // defer wg.Done() - // favoriteCount, localErr := FavoriteClient.CountFavorite(ctx, &favorite.CountFavoriteRequest{ - // VideoId: v.ID, - // }) - // if localErr != nil { - // logger.WithFields(logrus.Fields{ - // "video_id": v.ID, - // "err": localErr, - // }).Warning("failed to fetch favorite count") - // logging.SetSpanError(span, localErr) - // return - // } - // respVideoList[i].FavoriteCount = favoriteCount.Count - //}(i, v) - - // mock favorite count go func(i int, v *models.Video) { defer wg.Done() - respVideoList[i].FavoriteCount = uint32(countFavorite()) + favoriteCount, localErr := FavoriteClient.CountFavorite(ctx, &favorite.CountFavoriteRequest{ + VideoId: v.ID, + }) + if localErr != nil { + logger.WithFields(logrus.Fields{ + "video_id": v.ID, + "err": localErr, + }).Warning("failed to fetch favorite count") + logging.SetSpanError(span, localErr) + return + } + respVideoList[i].FavoriteCount = favoriteCount.Count }(i, v) + // mock favorite count + //go func(i int, v *models.Video) { + // defer wg.Done() + // respVideoList[i].FavoriteCount = uint32(countFavorite()) + //}(i, v) + // fill comment count go func(i int, v *models.Video) { defer wg.Done() @@ -272,29 +272,29 @@ func queryDetailed( }(i, v) // fill is favorite - //go func(i int, v *models.Video) { - // defer wg.Done() - // isFavorite, localErr := FavoriteClient.IsFavorite(ctx, &favorite.IsFavoriteRequest{ - // ActorId: actorId, - // VideoId: v.ID, - // }) - // if localErr != nil { - // logger.WithFields(logrus.Fields{ - // "video_id": v.ID, - // "err": localErr, - // }).Warning("failed to fetch favorite status") - // logging.SetSpanError(span, localErr) - // return - // } - // respVideoList[i].IsFavorite = isFavorite.Result - //}(i, v) - - // mock isFavorite go func(i int, v *models.Video) { defer wg.Done() - respVideoList[i].IsFavorite = isFavorite() + isFavorite, localErr := FavoriteClient.IsFavorite(ctx, &favorite.IsFavoriteRequest{ + ActorId: actorId, + VideoId: v.ID, + }) + if localErr != nil { + logger.WithFields(logrus.Fields{ + "video_id": v.ID, + "err": localErr, + }).Warning("failed to fetch favorite status") + logging.SetSpanError(span, localErr) + return + } + respVideoList[i].IsFavorite = isFavorite.Result }(i, v) + // mock isFavorite + //go func(i int, v *models.Video) { + // defer wg.Done() + // respVideoList[i].IsFavorite = isFavorite() + //}(i, v) + } wg.Wait() @@ -311,9 +311,9 @@ func query(ctx context.Context, logger *logrus.Entry, actorId uint32, videoIds [ return queryDetailed(ctx, logger, actorId, videos), nil } -func countFavorite() int { - return 0 -} -func isFavorite() bool { - return true -} +//func countFavorite() int { +// return 0 +//} +//func isFavorite() bool { +// return true +//} diff --git a/src/web/main.go b/src/web/main.go index 8ba82d5..aa73fe1 100644 --- a/src/web/main.go +++ b/src/web/main.go @@ -10,6 +10,7 @@ import ( comment2 "GuGoTik/src/web/comment" feed2 "GuGoTik/src/web/feed" "GuGoTik/src/web/middleware" + publish2 "GuGoTik/src/web/publish" "context" "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" @@ -68,6 +69,11 @@ func main() { comment.GET("/list", comment2.ListCommentHandler) comment.GET("/count", comment2.CountCommentHandler) } + publish := rootPath.Group("/publish") + { + //publish.POST("/action", publish2.ActionPublishHandle) + publish.GET("/list", publish2.ListPublishHandle) + } // Run Server if err := g.Run(config.WebServiceAddr); err != nil { panic("Can not run GuGoTik Gateway, binding port: " + config.WebServiceAddr) From acfab8b58e3f13197820d0708639430a638a06b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Thu, 17 Aug 2023 17:51:10 +0800 Subject: [PATCH 02/10] feature: add publish server --- src/services/publish/handler.go | 128 +++++++++++++++----------------- src/services/publish/main.go | 7 +- src/web/models/Publish.go | 14 ++++ src/web/publish/handler.go | 53 +++++++++++++ test/rpc/publishrpc_test.go | 44 +++++++++++ test/web/publish_test.go | 36 +++++++++ 6 files changed, 208 insertions(+), 74 deletions(-) create mode 100644 src/web/models/Publish.go create mode 100644 src/web/publish/handler.go create mode 100644 test/rpc/publishrpc_test.go create mode 100644 test/web/publish_test.go diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index c1753f0..91bc13b 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -1,106 +1,94 @@ package main import ( + "GuGoTik/src/constant/config" "GuGoTik/src/constant/strings" "GuGoTik/src/extra/tracing" "GuGoTik/src/models" + "GuGoTik/src/rpc/feed" "GuGoTik/src/rpc/publish" + "GuGoTik/src/storage/database" + grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" - "GuGoTik/src/utils/pathgen" - "GuGoTik/src/utils/rabbitmq" "context" - "encoding/json" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" ) type PublishServiceImpl struct { publish.PublishServiceServer } -var conn *amqp.Connection - -var channel *amqp.Channel - -var queue amqp.Queue +var FeedClient feed.FeedServiceClient func init() { - var err error - conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) - - if err != nil { - panic(err) - } - - channel, err = conn.Channel() - - if err != nil { - panic(err) - } - queue, err = channel.QueueDeclare( - strings.VideoPicker, //视频信息采集(封面/水印) - true, - false, - false, - false, - nil, - ) - - if err != nil { - panic(err) - } + FeedRpcConn := grpc2.Connect(config.FeedRpcServerName) + FeedClient = feed.NewFeedServiceClient(FeedRpcConn) } -func CloseMQConn() { - if err := conn.Close(); err != nil { - panic(err) +func (s *PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.ListVideo") + defer span.End() + logger := logging.LogService("PublishServiceImpl.ListVideo").WithContext(ctx) + + var videos []models.Video + result := database.Client.WithContext(ctx). + Where("user_id = ?", req.UserId). + Order("created_at DESC"). + Find(&videos).Error + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to query video") + logging.SetSpanError(span, err) + resp = &publish.ListVideoResponse{ + StatusCode: strings.PublishServiceInnerErrorCode, + StatusMsg: strings.PublishServiceInnerError, + } + return } - if err := channel.Close(); err != nil { - panic(err) + videoIds := make([]uint32, 0, len(videos)) + for _, video := range videos { + videoIds = append(videoIds, video.ID) } -} -func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.CreateVideoRequest) (resp *publish.CreateVideoResponse, err error) { - ctx, span := tracing.Tracer.Start(ctx, "CreateVideoService") - defer span.End() - logger := logging.LogService("PublishService.CreateVideo").WithContext(ctx) + queryVideoResp, err := FeedClient.QueryVideos(ctx, &feed.QueryVideosRequest{ + ActorId: req.ActorId, + VideoIds: videoIds, + }) logger.WithFields(logrus.Fields{ - "ActorId": request.ActorId, - "Title": request.Title, - }).Infof("Create video requested.") - - raw := &models.RawVideo{ - ActorId: request.ActorId, - Title: request.Title, - FilePath: pathgen.GenerateRawVideoName(request.ActorId, request.Title), - } - - bytes, err := json.Marshal(raw) + "response": resp, + }).Debug("all process done, ready to launch response") + return &publish.ListVideoResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + VideoList: queryVideoResp.VideoList, + }, nil +} - if err != nil { - resp = &publish.CreateVideoResponse{ - StatusCode: strings.VideoServiceInnerErrorCode, - StatusMsg: strings.VideoServiceInnerError, +func (s *PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVideoRequest) (resp *publish.CountVideoResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") + defer span.End() + logger := logging.LogService("PublishServiceImpl.CountVideo").WithContext(ctx) + var count int64 + result := database.Client.WithContext(ctx).Where("user_id = ?", req.UserId).Count(&count).Error + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to count video") + resp = &publish.CountVideoResponse{ + StatusCode: strings.PublishServiceInnerErrorCode, + StatusMsg: strings.PublishServiceInnerError, } + logging.SetSpanError(span, err) return } - // Context 注入到 RabbitMQ 中 - headers := rabbitmq.InjectAMQPHeaders(ctx) - - err = channel.Publish("", queue.Name, false, false, - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - Body: bytes, - Headers: headers, - }) - - resp = &publish.CreateVideoResponse{ + resp = &publish.CountVideoResponse{ StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, + Count: uint32(count), } return } diff --git a/src/services/publish/main.go b/src/services/publish/main.go index afbdb50..24794b3 100644 --- a/src/services/publish/main.go +++ b/src/services/publish/main.go @@ -48,14 +48,13 @@ func main() { var srv PublishServiceImpl var probe healthImpl.ProbeImpl - defer CloseMQConn() - publish.RegisterPublishServiceServer(s, srv) + publish.RegisterPublishServiceServer(s, &srv) health.RegisterHealthServer(s, &probe) if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil { - log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err) + log.Panicf("Rpc %s register consul hanpens error for: %v", config.PublishRpcServerPort, err) } log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort) if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.PublishRpcServerName, err) + log.Panicf("Rpc %s listen hanpens error for: %v", config.PublishRpcServerName, err) } } diff --git a/src/web/models/Publish.go b/src/web/models/Publish.go new file mode 100644 index 0000000..197ba69 --- /dev/null +++ b/src/web/models/Publish.go @@ -0,0 +1,14 @@ +package models + +import "GuGoTik/src/rpc/feed" + +type ListPublishReq struct { + ActorId uint32 `form:"actor_id" binding:"required"` + UserId uint32 `form:"user_id" binding:"required"` +} + +type ListPublishRes struct { + StatusCode int `json:"status_code"` + StatusMsg string `json:"status_msg"` + VideoList []*feed.Video `json:"vide_list"` +} diff --git a/src/web/publish/handler.go b/src/web/publish/handler.go new file mode 100644 index 0000000..ddd6fa5 --- /dev/null +++ b/src/web/publish/handler.go @@ -0,0 +1,53 @@ +package publish + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/rpc/publish" + grpc2 "GuGoTik/src/utils/grpc" + "GuGoTik/src/utils/logging" + "GuGoTik/src/web/models" + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" + "net/http" +) + +var Client publish.PublishServiceClient + +func init() { + conn := grpc2.Connect(config.PublishRpcServerName) + Client = publish.NewPublishServiceClient(conn) +} + +func ListPublishHandle(c *gin.Context) { + _, span := tracing.Tracer.Start(c.Request.Context(), "Publish-ListHandle") + defer span.End() + logger := logging.LogService("GateWay.PublishList").WithContext(c.Request.Context()) + var req models.ListPublishReq + if err := c.ShouldBindQuery(&req); err != nil { + c.JSON(http.StatusOK, models.ListPublishRes{ + StatusCode: strings.GateWayParamsErrorCode, + StatusMsg: strings.GateWayParamsError, + VideoList: nil, + }) + } + + res, err := Client.ListVideo(c.Request.Context(), &publish.ListVideoRequest{ + ActorId: req.ActorId, + UserId: req.UserId, + }) + if err != nil { + logger.WithFields(logrus.Fields{ + "UserId": req.UserId, + }).Warnf("Error when trying to connect with PublishService") + c.JSON(http.StatusOK, res) + return + } + userId := req.UserId + logger.WithFields(logrus.Fields{ + "UserId": userId, + }).Infof("Publish List videos") + + c.JSON(http.StatusOK, res) +} diff --git a/test/rpc/publishrpc_test.go b/test/rpc/publishrpc_test.go new file mode 100644 index 0000000..713d8ad --- /dev/null +++ b/test/rpc/publishrpc_test.go @@ -0,0 +1,44 @@ +package rpc + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/rpc/publish" + "context" + "fmt" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "testing" +) + +func TestListVideo(t *testing.T) { + var Client publish.PublishServiceClient + req := publish.ListVideoRequest{ + UserId: 123, + ActorId: 123, + } + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.PublishRpcServerPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) + assert.Empty(t, err) + Client = publish.NewPublishServiceClient(conn) + //调用服务端方法 + res, err := Client.ListVideo(context.Background(), &req) + assert.Empty(t, err) + assert.Equal(t, int32(0), res.StatusCode) +} + +func TestCountVideo(t *testing.T) { + var Client publish.PublishServiceClient + req := publish.CountVideoRequest{ + UserId: 123, + } + conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.PublishRpcServerPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) + assert.Empty(t, err) + Client = publish.NewPublishServiceClient(conn) + res, err := Client.CountVideo(context.Background(), &req) + assert.Empty(t, err) + assert.Equal(t, int32(0), res.StatusCode) +} diff --git a/test/web/publish_test.go b/test/web/publish_test.go new file mode 100644 index 0000000..c17db76 --- /dev/null +++ b/test/web/publish_test.go @@ -0,0 +1,36 @@ +package web + +import ( + "GuGoTik/src/web/models" + "encoding/json" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +func TestListVideo(t *testing.T) { + url := baseUrl + "/publish/list" + method := "GET" + req, err := http.NewRequest(method, url, nil) + q := req.URL.Query() + q.Add("actor_id", "1") + q.Add("video_id", "0") + req.URL.RawQuery = q.Encode() + + assert.Empty(t, err) + + res, err := client.Do(req) + assert.Empty(t, err) + defer func(Body io.ReadCloser) { + err := Body.Close() + assert.Empty(t, err) + }(res.Body) + + body, err := io.ReadAll(res.Body) + assert.Empty(t, err) + ListPublishRes := &models.ListPublishRes{} + err = json.Unmarshal(body, &ListPublishRes) + assert.Empty(t, err) + assert.Equal(t, 0, ListPublishRes.StatusCode) +} From 50313bd6b95ea2387a8a3cb53754022f41b98e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Fri, 18 Aug 2023 16:36:06 +0800 Subject: [PATCH 03/10] feature: add video processing --- src/models/rawvideo.go | 8 +++ src/services/publish/handler.go | 90 ++++++++++++++++++++++++++++- src/services/publish/main.go | 7 ++- src/services/videoprocessor/main.go | 79 +++++++++++++++++++++++++ 4 files changed, 179 insertions(+), 5 deletions(-) diff --git a/src/models/rawvideo.go b/src/models/rawvideo.go index 97f88be..0c2d15f 100644 --- a/src/models/rawvideo.go +++ b/src/models/rawvideo.go @@ -1,8 +1,16 @@ package models +import "GuGoTik/src/storage/database" + type RawVideo struct { ActorId uint32 Title string FilePath string CoverPath string } + +func init() { + if err := database.Client.AutoMigrate(&RawVideo{}); err != nil { + panic(err) + } +} diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index 91bc13b..029e282 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -10,22 +10,54 @@ import ( "GuGoTik/src/storage/database" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/pathgen" + "GuGoTik/src/utils/rabbitmq" "context" + "encoding/json" "github.com/sirupsen/logrus" + "github.com/streadway/amqp" ) type PublishServiceImpl struct { publish.PublishServiceServer } +var conn *amqp.Connection + +var channel *amqp.Channel + +var queue amqp.Queue + var FeedClient feed.FeedServiceClient func init() { FeedRpcConn := grpc2.Connect(config.FeedRpcServerName) FeedClient = feed.NewFeedServiceClient(FeedRpcConn) + var err error + conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) + if err != nil { + panic(err) + } + + channel, err = conn.Channel() + if err != nil { + panic(err) + } + + queue, err = channel.QueueDeclare( + strings.VideoPicker, //视频信息采集(封面/水印) + true, + false, + false, + false, + nil, + ) + if err != nil { + panic(err) + } } -func (s *PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { +func (a PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.ListVideo") defer span.End() logger := logging.LogService("PublishServiceImpl.ListVideo").WithContext(ctx) @@ -67,7 +99,7 @@ func (s *PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVid }, nil } -func (s *PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVideoRequest) (resp *publish.CountVideoResponse, err error) { +func (a PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVideoRequest) (resp *publish.CountVideoResponse, err error) { ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") defer span.End() logger := logging.LogService("PublishServiceImpl.CountVideo").WithContext(ctx) @@ -92,3 +124,57 @@ func (s *PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountV } return } + +func CloseMQConn() { + if err := conn.Close(); err != nil { + panic(err) + } + + if err := channel.Close(); err != nil { + panic(err) + } +} + +func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.CreateVideoRequest) (resp *publish.CreateVideoResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "CreateVideoService") + defer span.End() + logger := logging.LogService("PublishService.CreateVideo").WithContext(ctx) + + logger.WithFields(logrus.Fields{ + "ActorId": request.ActorId, + "Title": request.Title, + }).Infof("Create video requested.") + + raw := &models.RawVideo{ + ActorId: request.ActorId, + Title: request.Title, + FilePath: pathgen.GenerateRawVideoName(request.ActorId, request.Title), + } + + bytes, err := json.Marshal(raw) + + if err != nil { + resp = &publish.CreateVideoResponse{ + StatusCode: strings.VideoServiceInnerErrorCode, + StatusMsg: strings.VideoServiceInnerError, + } + return + } + + // Context 注入到 RabbitMQ 中 + headers := rabbitmq.InjectAMQPHeaders(ctx) + + err = channel.Publish("", queue.Name, false, false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: bytes, + Headers: headers, + }) + + resp = &publish.CreateVideoResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + } + return +} diff --git a/src/services/publish/main.go b/src/services/publish/main.go index 24794b3..afbdb50 100644 --- a/src/services/publish/main.go +++ b/src/services/publish/main.go @@ -48,13 +48,14 @@ func main() { var srv PublishServiceImpl var probe healthImpl.ProbeImpl - publish.RegisterPublishServiceServer(s, &srv) + defer CloseMQConn() + publish.RegisterPublishServiceServer(s, srv) health.RegisterHealthServer(s, &probe) if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil { - log.Panicf("Rpc %s register consul hanpens error for: %v", config.PublishRpcServerPort, err) + log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err) } log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort) if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen hanpens error for: %v", config.PublishRpcServerName, err) + log.Panicf("Rpc %s listen happens error for: %v", config.PublishRpcServerName, err) } } diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index 861170a..4f50450 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -5,12 +5,17 @@ import ( "GuGoTik/src/constant/strings" "GuGoTik/src/extra/tracing" "GuGoTik/src/models" + "GuGoTik/src/storage/database" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/pathgen" "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" + "fmt" "github.com/sirupsen/logrus" "github.com/streadway/amqp" + "os" + "os/exec" "sync" ) @@ -89,6 +94,23 @@ func Consume(channel *amqp.Channel) { }).Errorf("Error when unmarshaling the prepare json body.") } + // 截取封面 + coverPath, err := ExtractVideoCover(ctx, raw) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "coverPath": coverPath, + }).Errorf("Error when extracting video cover.") + } + // 添加水印逻辑 + watermarkedVideo, err := addWatermarkToVideo(ctx, raw) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "watermarkedVideo": watermarkedVideo, + }).Errorf("Error when adding watermark to video.") + } + span.End() err = d.Ack(false) if err != nil { @@ -98,3 +120,60 @@ func Consume(channel *amqp.Channel) { } } } + +func ExtractVideoCover(ctx context.Context, video models.RawVideo) (string, error) { + ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") + defer span.End() + logger := logging.LogService("VideoPicker.Picker").WithContext(ctx) + logger.Debug("Extracting video cover...") + cmdArgs := []string{ + "-i", video.FilePath, + "-ss", "00:00:01", + "-vframes", "1", + video.CoverPath, + } + cmd := exec.Command("ffmpeg", cmdArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + coverPath := pathgen.GenerateRawVideoName(video.ActorId, video.Title) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to get video cover") + return coverPath, err + } + err = database.Client.Create(&video).Error + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to save processed video to database") + return coverPath, err + } + + return coverPath, nil +} + +func addWatermarkToVideo(ctx context.Context, video models.RawVideo) (models.RawVideo, error) { + ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") + defer span.End() + logger := logging.LogService("VideoPicker.Picker").WithContext(ctx) + logger.Debug("Adding watermark to video...") + + cmdArgs := []string{ + "-i", video.FilePath, + "-vf", fmt.Sprintf("drawtext=text='%s':x=(w-text_w-10):y=10:fontsize=24:fontcolor=white", video.Title), + video.FilePath, + } + cmd := exec.Command("ffmpeg", cmdArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to add video watermark") + return video, err + } + return video, nil +} From 517ede15edc8c9ba9813f47c57c0f62a698475ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Fri, 18 Aug 2023 16:41:37 +0800 Subject: [PATCH 04/10] fix: fix log err --- src/services/publish/handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index 029e282..fe415ff 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -63,11 +63,11 @@ func (a PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVide logger := logging.LogService("PublishServiceImpl.ListVideo").WithContext(ctx) var videos []models.Video - result := database.Client.WithContext(ctx). + err = database.Client.WithContext(ctx). Where("user_id = ?", req.UserId). Order("created_at DESC"). Find(&videos).Error - if result.Error != nil { + if err != nil { logger.WithFields(logrus.Fields{ "err": err, }).Warnf("failed to query video") @@ -104,8 +104,8 @@ func (a PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVi defer span.End() logger := logging.LogService("PublishServiceImpl.CountVideo").WithContext(ctx) var count int64 - result := database.Client.WithContext(ctx).Where("user_id = ?", req.UserId).Count(&count).Error - if result.Error != nil { + err = database.Client.WithContext(ctx).Where("user_id = ?", req.UserId).Count(&count).Error + if err != nil { logger.WithFields(logrus.Fields{ "err": err, }).Warnf("failed to count video") From 1ffe7e63dd8ae26e4c2cf93b2072aba52bbcbb95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Sun, 20 Aug 2023 23:35:58 +0800 Subject: [PATCH 05/10] feature: add CreateVideo --- src/constant/strings/err.go | 2 + src/models/rawvideo.go | 5 ++- src/services/publish/handler.go | 56 +++++++++++++++++++++---- src/services/videoprocessor/main.go | 64 +++++++++++++++++------------ src/utils/pathgen/video.go | 18 +++++--- 5 files changed, 104 insertions(+), 41 deletions(-) diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index 32decb9..2475790 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -44,4 +44,6 @@ const ( ActionCommentTypeInvalid = "不合法的评论类型" ActionCommentLimitedCode = 10006 ActionCommentLimited = "评论频繁,请稍后再试!" + InvalidContentTypeCode = 10007 + InvalidContentType = "不合法的内容类型" ) diff --git a/src/models/rawvideo.go b/src/models/rawvideo.go index 0c2d15f..5d82bf7 100644 --- a/src/models/rawvideo.go +++ b/src/models/rawvideo.go @@ -4,9 +4,10 @@ import "GuGoTik/src/storage/database" type RawVideo struct { ActorId uint32 + VideoId uint32 Title string - FilePath string - CoverPath string + FileName string + CoverName string } func init() { diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index fe415ff..0c781ea 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -8,14 +8,19 @@ import ( "GuGoTik/src/rpc/feed" "GuGoTik/src/rpc/publish" "GuGoTik/src/storage/database" + "GuGoTik/src/storage/file" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/pathgen" "GuGoTik/src/utils/rabbitmq" + "bytes" "context" "encoding/json" "github.com/sirupsen/logrus" "github.com/streadway/amqp" + "math/rand" + "net/http" + "time" ) type PublishServiceImpl struct { @@ -57,7 +62,7 @@ func init() { } } -func (a PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { +func (a PublishServiceImpl) ListVideoService(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.ListVideo") defer span.End() logger := logging.LogService("PublishServiceImpl.ListVideo").WithContext(ctx) @@ -144,15 +149,52 @@ func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.Cr "ActorId": request.ActorId, "Title": request.Title, }).Infof("Create video requested.") + // 检测视频格式 + detectedContentType := http.DetectContentType(request.Data) + if detectedContentType != "video/mp4" { + logger.WithFields(logrus.Fields{ + "content_type": detectedContentType, + }).Debug("invalid content type") + resp = &publish.CreateVideoResponse{ + StatusCode: strings.InvalidContentTypeCode, + StatusMsg: strings.InvalidContentType, + } + return + } + // byte[] -> reader + reader := bytes.NewReader(request.Data) + + // 创建一个新的随机数生成器 + r := rand.New(rand.NewSource(time.Now().UnixNano())) + videoId := r.Uint32() + fileName := pathgen.GenerateRawVideoName(request.ActorId, request.Title, videoId) + coverName := pathgen.GenerateCoverName(request.ActorId, request.Title, videoId) + // 上传视频 + _, err = file.Upload(ctx, fileName, reader) + if err != nil { + logger.WithFields(logrus.Fields{ + "file_name": fileName, + "err": err, + }).Debug("failed to upload video") + resp = &publish.CreateVideoResponse{ + StatusCode: strings.VideoServiceInnerErrorCode, + StatusMsg: strings.VideoServiceInnerError, + } + return + } + logger.WithFields(logrus.Fields{ + "file_name": fileName, + }).Debug("uploaded video") raw := &models.RawVideo{ - ActorId: request.ActorId, - Title: request.Title, - FilePath: pathgen.GenerateRawVideoName(request.ActorId, request.Title), + ActorId: request.ActorId, + VideoId: videoId, + Title: request.Title, + FileName: fileName, + CoverName: coverName, } - bytes, err := json.Marshal(raw) - + marshal, err := json.Marshal(raw) if err != nil { resp = &publish.CreateVideoResponse{ StatusCode: strings.VideoServiceInnerErrorCode, @@ -168,7 +210,7 @@ func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.Cr amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", - Body: bytes, + Body: marshal, Headers: headers, }) diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index 4f50450..61aa60c 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -6,6 +6,7 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/storage/database" + "GuGoTik/src/storage/file" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/pathgen" "GuGoTik/src/utils/rabbitmq" @@ -95,22 +96,34 @@ func Consume(channel *amqp.Channel) { } // 截取封面 - coverPath, err := ExtractVideoCover(ctx, raw) + err := ExtractVideoCover(ctx, &raw) if err != nil { logger.WithFields(logrus.Fields{ - "err": err, - "coverPath": coverPath, + "err": err, }).Errorf("Error when extracting video cover.") } + // 添加水印逻辑 - watermarkedVideo, err := addWatermarkToVideo(ctx, raw) + err = addWatermarkToVideo(ctx, &raw) if err != nil { logger.WithFields(logrus.Fields{ - "err": err, - "watermarkedVideo": watermarkedVideo, + "err": err, }).Errorf("Error when adding watermark to video.") } + // 保存到数据库 + err = database.Client.WithContext(ctx).Create(&raw).Error + if err != nil { + logger.WithFields(logrus.Fields{ + "file_name": raw.FileName, + "cover_name": raw.CoverName, + "err": err, + }).Debug("failed to create db entry") + } + logger.WithFields(logrus.Fields{ + "entry": raw, + }).Debug("saved db entry") + span.End() err = d.Ack(false) if err != nil { @@ -121,49 +134,47 @@ func Consume(channel *amqp.Channel) { } } -func ExtractVideoCover(ctx context.Context, video models.RawVideo) (string, error) { +func ExtractVideoCover(ctx context.Context, video *models.RawVideo) error { ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") defer span.End() logger := logging.LogService("VideoPicker.Picker").WithContext(ctx) logger.Debug("Extracting video cover...") + RawFileName := video.FileName + CoverFileName := video.CoverName + RawFilePath := file.GetLocalPath(ctx, RawFileName) + CoverFilePath := file.GetLocalPath(ctx, CoverFileName) cmdArgs := []string{ - "-i", video.FilePath, + "-i", RawFilePath, "-ss", "00:00:01", "-vframes", "1", - video.CoverPath, + CoverFilePath, } cmd := exec.Command("ffmpeg", cmdArgs...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Run() - coverPath := pathgen.GenerateRawVideoName(video.ActorId, video.Title) if err != nil { logger.WithFields(logrus.Fields{ "err": err, }).Warnf("failed to get video cover") - return coverPath, err + return err } - err = database.Client.Create(&video).Error - if err != nil { - logger.WithFields(logrus.Fields{ - "err": err, - }).Warnf("failed to save processed video to database") - return coverPath, err - } - - return coverPath, nil + return nil } -func addWatermarkToVideo(ctx context.Context, video models.RawVideo) (models.RawVideo, error) { +func addWatermarkToVideo(ctx context.Context, video *models.RawVideo) error { ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") defer span.End() logger := logging.LogService("VideoPicker.Picker").WithContext(ctx) logger.Debug("Adding watermark to video...") - + RawFileName := video.FileName + FinalFileName := pathgen.GenerateFinalVideoName(video.ActorId, video.Title, video.VideoId) + RawFilePath := file.GetLocalPath(ctx, RawFileName) + FinalFilePath := file.GetLocalPath(ctx, FinalFileName) cmdArgs := []string{ - "-i", video.FilePath, + "-i", RawFilePath, "-vf", fmt.Sprintf("drawtext=text='%s':x=(w-text_w-10):y=10:fontsize=24:fontcolor=white", video.Title), - video.FilePath, + FinalFilePath, } cmd := exec.Command("ffmpeg", cmdArgs...) cmd.Stdout = os.Stdout @@ -173,7 +184,8 @@ func addWatermarkToVideo(ctx context.Context, video models.RawVideo) (models.Raw logger.WithFields(logrus.Fields{ "err": err, }).Warnf("failed to add video watermark") - return video, err + return err } - return video, nil + video.FileName = FinalFileName + return nil } diff --git a/src/utils/pathgen/video.go b/src/utils/pathgen/video.go index 3c28dff..32e60aa 100644 --- a/src/utils/pathgen/video.go +++ b/src/utils/pathgen/video.go @@ -6,14 +6,20 @@ import ( "strconv" ) -// GenerateRawVideoName 生成初始视频链接,此链接仅用于内部使用,暴露给用户的视频地址 -func GenerateRawVideoName(actorId uint32, title string) string { - hash := sha256.Sum256([]byte("RAW" + strconv.FormatUint(uint64(actorId), 10) + title)) +// GenerateRawVideoName 生成初始视频名称,此链接仅用于内部使用,暴露给用户的视频名称 +func GenerateRawVideoName(actorId uint32, title string, videoId uint32) string { + hash := sha256.Sum256([]byte("RAW" + strconv.FormatUint(uint64(actorId), 10) + title + strconv.FormatUint(uint64(videoId), 10))) return hex.EncodeToString(hash[:]) + ".mp4" } -// GenerateFinalVideoName 最终暴露给用户的视频地址 -func GenerateFinalVideoName(actorId uint32, title string) string { - hash := sha256.Sum256([]byte(strconv.FormatUint(uint64(actorId), 10) + title)) +// GenerateFinalVideoName 最终暴露给用户的视频名称 +func GenerateFinalVideoName(actorId uint32, title string, videoId uint32) string { + hash := sha256.Sum256([]byte(strconv.FormatUint(uint64(actorId), 10) + title + strconv.FormatUint(uint64(videoId), 10))) return hex.EncodeToString(hash[:]) + ".mp4" } + +// GenerateCoverName 生成视频封面名称 +func GenerateCoverName(actorId uint32, title string, videoId uint32) string { + hash := sha256.Sum256([]byte(strconv.FormatUint(uint64(actorId), 10) + title + strconv.FormatUint(uint64(videoId), 10))) + return hex.EncodeToString(hash[:]) + ".jpg" +} From 3ce086ad21c884b1bc35abae3c946dd7fe970005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Mon, 21 Aug 2023 08:44:13 +0800 Subject: [PATCH 06/10] fix: fix part review --- src/services/publish/handler.go | 8 ++++---- src/services/videoprocessor/main.go | 13 +++++++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index 0c781ea..cdd8348 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -62,8 +62,8 @@ func init() { } } -func (a PublishServiceImpl) ListVideoService(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { - ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.ListVideo") +func (a PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVideoRequest) (resp *publish.ListVideoResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "ListVideoService") defer span.End() logger := logging.LogService("PublishServiceImpl.ListVideo").WithContext(ctx) @@ -83,7 +83,7 @@ func (a PublishServiceImpl) ListVideoService(ctx context.Context, req *publish.L } return } - + // todo: 使用协程完成,开 go func videoIds := make([]uint32, 0, len(videos)) for _, video := range videos { videoIds = append(videoIds, video.ID) @@ -105,7 +105,7 @@ func (a PublishServiceImpl) ListVideoService(ctx context.Context, req *publish.L } func (a PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVideoRequest) (resp *publish.CountVideoResponse, err error) { - ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") + ctx, span := tracing.Tracer.Start(ctx, "CountVideoService") defer span.End() logger := logging.LogService("PublishServiceImpl.CountVideo").WithContext(ctx) var count int64 diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index 61aa60c..fa052ea 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -96,11 +96,12 @@ func Consume(channel *amqp.Channel) { } // 截取封面 - err := ExtractVideoCover(ctx, &raw) + err := extractVideoCover(ctx, &raw) if err != nil { logger.WithFields(logrus.Fields{ "err": err, }).Errorf("Error when extracting video cover.") + logging.SetSpanError(span, err) } // 添加水印逻辑 @@ -109,7 +110,9 @@ func Consume(channel *amqp.Channel) { logger.WithFields(logrus.Fields{ "err": err, }).Errorf("Error when adding watermark to video.") + logging.SetSpanError(span, err) } + //todo: update封面 // 保存到数据库 err = database.Client.WithContext(ctx).Create(&raw).Error @@ -134,8 +137,8 @@ func Consume(channel *amqp.Channel) { } } -func ExtractVideoCover(ctx context.Context, video *models.RawVideo) error { - ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") +func extractVideoCover(ctx context.Context, video *models.RawVideo) error { + ctx, span := tracing.Tracer.Start(ctx, "ExtractVideoCoverService") defer span.End() logger := logging.LogService("VideoPicker.Picker").WithContext(ctx) logger.Debug("Extracting video cover...") @@ -157,13 +160,14 @@ func ExtractVideoCover(ctx context.Context, video *models.RawVideo) error { logger.WithFields(logrus.Fields{ "err": err, }).Warnf("failed to get video cover") + logging.SetSpanError(span, err) return err } return nil } func addWatermarkToVideo(ctx context.Context, video *models.RawVideo) error { - ctx, span := tracing.Tracer.Start(ctx, "PublishServiceImpl.CountVideo") + ctx, span := tracing.Tracer.Start(ctx, "AddWatermarkToVideoService") defer span.End() logger := logging.LogService("VideoPicker.Picker").WithContext(ctx) logger.Debug("Adding watermark to video...") @@ -184,6 +188,7 @@ func addWatermarkToVideo(ctx context.Context, video *models.RawVideo) error { logger.WithFields(logrus.Fields{ "err": err, }).Warnf("failed to add video watermark") + logging.SetSpanError(span, err) return err } video.FileName = FinalFileName From c8cd71df5fb743d6c4a61d62193d6959aa8751e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Mon, 21 Aug 2023 15:55:39 +0800 Subject: [PATCH 07/10] fix: fix update cover,video --- src/services/publish/handler.go | 19 ++++++++--- src/services/videoprocessor/main.go | 52 +++++++++++++++++++---------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index cdd8348..af24051 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -83,25 +83,36 @@ func (a PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVide } return } - // todo: 使用协程完成,开 go func videoIds := make([]uint32, 0, len(videos)) for _, video := range videos { videoIds = append(videoIds, video.ID) } - + //todo: go func queryVideoResp, err := FeedClient.QueryVideos(ctx, &feed.QueryVideosRequest{ ActorId: req.ActorId, VideoIds: videoIds, }) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("queryVideoResp failed to obtain") + logging.SetSpanError(span, err) + resp = &publish.ListVideoResponse{ + StatusCode: strings.FeedServiceInnerErrorCode, + StatusMsg: strings.FeedServiceInnerError, + } + return + } logger.WithFields(logrus.Fields{ "response": resp, }).Debug("all process done, ready to launch response") - return &publish.ListVideoResponse{ + resp = &publish.ListVideoResponse{ StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, VideoList: queryVideoResp.VideoList, - }, nil + } + return } func (a PublishServiceImpl) CountVideo(ctx context.Context, req *publish.CountVideoRequest) (resp *publish.CountVideoResponse, err error) { diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index fa052ea..0ceb1d0 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -10,12 +10,11 @@ import ( "GuGoTik/src/utils/logging" "GuGoTik/src/utils/pathgen" "GuGoTik/src/utils/rabbitmq" + "bytes" "context" "encoding/json" - "fmt" "github.com/sirupsen/logrus" "github.com/streadway/amqp" - "os" "os/exec" "sync" ) @@ -112,7 +111,6 @@ func Consume(channel *amqp.Channel) { }).Errorf("Error when adding watermark to video.") logging.SetSpanError(span, err) } - //todo: update封面 // 保存到数据库 err = database.Client.WithContext(ctx).Create(&raw).Error @@ -145,21 +143,28 @@ func extractVideoCover(ctx context.Context, video *models.RawVideo) error { RawFileName := video.FileName CoverFileName := video.CoverName RawFilePath := file.GetLocalPath(ctx, RawFileName) - CoverFilePath := file.GetLocalPath(ctx, CoverFileName) cmdArgs := []string{ - "-i", RawFilePath, - "-ss", "00:00:01", - "-vframes", "1", - CoverFilePath, + "-i", RawFilePath, "-vframes", "1", "-an", "-f", "image2pipe", "-", } cmd := exec.Command("ffmpeg", cmdArgs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + // Create a bytes.Buffer to capture stdout + var buf bytes.Buffer + cmd.Stdout = &buf + err := cmd.Run() if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Warnf("failed to get video cover") + }).Warnf("cmd.Run() failed with %s\n", err) + logging.SetSpanError(span, err) + return err + } + // buf.Bytes() now contains the image data. You can use it to write to a file or send it to an output stream. + _, err = file.Upload(ctx, CoverFileName, bytes.NewReader(buf.Bytes())) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to upload video cover") logging.SetSpanError(span, err) return err } @@ -174,20 +179,33 @@ func addWatermarkToVideo(ctx context.Context, video *models.RawVideo) error { RawFileName := video.FileName FinalFileName := pathgen.GenerateFinalVideoName(video.ActorId, video.Title, video.VideoId) RawFilePath := file.GetLocalPath(ctx, RawFileName) - FinalFilePath := file.GetLocalPath(ctx, FinalFileName) cmdArgs := []string{ "-i", RawFilePath, - "-vf", fmt.Sprintf("drawtext=text='%s':x=(w-text_w-10):y=10:fontsize=24:fontcolor=white", video.Title), - FinalFilePath, + "-vf", "drawtext=text=" + video.Title + ":x=(w-text_w-10):y=10:fontsize=24:fontcolor=white", + "-c:v", "copy", + "-f", "mp4", + "-", } + cmd := exec.Command("ffmpeg", cmdArgs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + var buf bytes.Buffer + cmd.Stdout = &buf + + // Execute the command err := cmd.Run() if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Warnf("failed to add video watermark") + }).Warnf("cmd.Run() failed with %s\n", err) + logging.SetSpanError(span, err) + } + + // Write the captured stdout to a file + _, err = file.Upload(ctx, FinalFileName, bytes.NewReader(buf.Bytes())) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Warnf("failed to upload video with watermark") logging.SetSpanError(span, err) return err } From b5d7fcd9caec8ef32a637ca807f6a5740c59ada1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Mon, 21 Aug 2023 18:00:00 +0800 Subject: [PATCH 08/10] refactor: comment out todo --- src/services/publish/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index af24051..6bcd8fd 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -87,7 +87,7 @@ func (a PublishServiceImpl) ListVideo(ctx context.Context, req *publish.ListVide for _, video := range videos { videoIds = append(videoIds, video.ID) } - //todo: go func + queryVideoResp, err := FeedClient.QueryVideos(ctx, &feed.QueryVideosRequest{ ActorId: req.ActorId, VideoIds: videoIds, From 81d479801abd5969fb10495b9d76b0c6ec7a8ab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Mon, 21 Aug 2023 19:38:15 +0800 Subject: [PATCH 09/10] refactor: resolve conflicts --- src/constant/strings/err.go | 13 ++++++++----- src/models/rawvideo.go | 10 +++++----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index a96df19..35984c5 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -26,11 +26,12 @@ const ( ActorIDNotMatchError = "用户不匹配" UnableToDeleteCommentErrorCode = 50008 UnableToDeleteCommentError = "无法删除视频评论" - - UnableToAddMessageErrorCode = 50009 - UnableToAddMessageRrror = "发送消息出错" - UnableToQueryMessageErrorCode = 50010 - UnableToQueryMessageError = "查消息出错" + UnableToAddMessageErrorCode = 50009 + UnableToAddMessageRrror = "发送消息出错" + UnableToQueryMessageErrorCode = 50010 + UnableToQueryMessageError = "查消息出错" + PublishServiceInnerErrorCode = 50011 + PublishServiceInnerError = "发布服务出现内部错误,请稍后重试!" ) // Expected Error @@ -47,4 +48,6 @@ const ( ActionCommentTypeInvalid = "不合法的评论类型" ActionCommentLimitedCode = 10006 ActionCommentLimited = "评论频繁,请稍后再试!" + InvalidContentTypeCode = 10007 + InvalidContentType = "不合法的内容类型" ) diff --git a/src/models/rawvideo.go b/src/models/rawvideo.go index 5d82bf7..c0fa7aa 100644 --- a/src/models/rawvideo.go +++ b/src/models/rawvideo.go @@ -3,11 +3,11 @@ package models import "GuGoTik/src/storage/database" type RawVideo struct { - ActorId uint32 - VideoId uint32 - Title string - FileName string - CoverName string + ActorId uint32 `gorm:"not null;primarykey;"` + VideoId uint32 `json:"video_id" column:"video_id" gorm:"not null;"` // 视频 ID + Title string `json:"title" gorm:"not null;"` + FileName string `json:"play_name" gorm:"not null;"` + CoverName string `json:"cover_name" gorm:"not null;"` } func init() { From 11c0cd5edd49cc37a89c2759ce06810d40328e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=87=E6=9D=B0?= <2707138687@qq.com> Date: Mon, 21 Aug 2023 20:02:21 +0800 Subject: [PATCH 10/10] refactor: resolve conflicts --- src/services/publish/handler.go | 50 ++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index 35edc1f..bbd6224 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -5,13 +5,18 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/rpc/publish" + "GuGoTik/src/storage/file" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/pathgen" "GuGoTik/src/utils/rabbitmq" + "bytes" "context" "encoding/json" "github.com/sirupsen/logrus" "github.com/streadway/amqp" + "math/rand" + "net/http" + "time" ) type PublishServiceImpl struct { @@ -106,15 +111,52 @@ func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.Cr "ActorId": request.ActorId, "Title": request.Title, }).Infof("Create video requested.") + // 检测视频格式 + detectedContentType := http.DetectContentType(request.Data) + if detectedContentType != "video/mp4" { + logger.WithFields(logrus.Fields{ + "content_type": detectedContentType, + }).Debug("invalid content type") + resp = &publish.CreateVideoResponse{ + StatusCode: strings.InvalidContentTypeCode, + StatusMsg: strings.InvalidContentType, + } + return + } + // byte[] -> reader + reader := bytes.NewReader(request.Data) + + // 创建一个新的随机数生成器 + r := rand.New(rand.NewSource(time.Now().UnixNano())) + videoId := r.Uint32() + fileName := pathgen.GenerateRawVideoName(request.ActorId, request.Title, videoId) + coverName := pathgen.GenerateCoverName(request.ActorId, request.Title, videoId) + // 上传视频 + _, err = file.Upload(ctx, fileName, reader) + if err != nil { + logger.WithFields(logrus.Fields{ + "file_name": fileName, + "err": err, + }).Debug("failed to upload video") + resp = &publish.CreateVideoResponse{ + StatusCode: strings.VideoServiceInnerErrorCode, + StatusMsg: strings.VideoServiceInnerError, + } + return + } + logger.WithFields(logrus.Fields{ + "file_name": fileName, + }).Debug("uploaded video") raw := &models.RawVideo{ - ActorId: request.ActorId, - Title: request.Title, - FileName: pathgen.GenerateRawVideoName(request.ActorId, request.Title), + ActorId: request.ActorId, + VideoId: videoId, + Title: request.Title, + FileName: fileName, + CoverName: coverName, } bytes, err := json.Marshal(raw) - if err != nil { resp = &publish.CreateVideoResponse{ StatusCode: strings.VideoServiceInnerErrorCode,