Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add video processing #99

Merged
merged 19 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/constant/strings/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ const (
ActorIDNotMatchError = "用户不匹配"
UnableToDeleteCommentErrorCode = 50008
UnableToDeleteCommentError = "无法删除视频评论"

UnableToAddMessageErrorCode = 50009
UnableToAddMessageRrror = "发送消息出错"
UnableToQueryMessageErrorCode = 50010
UnableToQueryMessageError = "查消息出错"
UnableToAddMessageErrorCode = 50009
UnableToAddMessageRrror = "发送消息出错"
UnableToQueryMessageErrorCode = 50010
UnableToQueryMessageError = "查消息出错"
PublishServiceInnerErrorCode = 50011
PublishServiceInnerError = "发布服务出现内部错误,请稍后重试!"
)

// Expected Error
Expand All @@ -47,4 +48,6 @@ const (
ActionCommentTypeInvalid = "不合法的评论类型"
ActionCommentLimitedCode = 10006
ActionCommentLimited = "评论频繁,请稍后再试!"
InvalidContentTypeCode = 10007
InvalidContentType = "不合法的内容类型"
)
94 changes: 47 additions & 47 deletions src/services/feed/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/rpc/comment"
"GuGoTik/src/rpc/favorite"
"GuGoTik/src/rpc/feed"
"GuGoTik/src/rpc/user"
"GuGoTik/src/storage/database"
Expand All @@ -31,16 +32,15 @@ const (

var UserClient user.UserServiceClient
var CommentClient comment.CommentServiceClient

//var FavoriteClient favorite.FavoriteServiceClient
var FavoriteClient favorite.FavoriteServiceClient

func init() {
userRpcConn := grpc2.Connect(config.UserRpcServerName)
UserClient = user.NewUserServiceClient(userRpcConn)
commentRpcConn := grpc2.Connect(config.CommentRpcServerName)
CommentClient = comment.NewCommentServiceClient(commentRpcConn)
//favoriteRpcConn := grpc2.Connect(config.FavoriteRpcServerName)
//FavoriteClient = favorite.NewFavoriteServiceClient(favoriteRpcConn)
favoriteRpcConn := grpc2.Connect(config.FavoriteRpcServerName)
FavoriteClient = favorite.NewFavoriteServiceClient(favoriteRpcConn)
}

func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedRequest) (resp *feed.ListFeedResponse, err error) {
Expand Down Expand Up @@ -232,28 +232,28 @@ func queryDetailed(
}(i, v)

// fill favorite count
//go func(i int, v *models.Video) {
// defer wg.Done()
// favoriteCount, localErr := FavoriteClient.CountFavorite(ctx, &favorite.CountFavoriteRequest{
// VideoId: v.ID,
// })
// if localErr != nil {
// logger.WithFields(logrus.Fields{
// "video_id": v.ID,
// "err": localErr,
// }).Warning("failed to fetch favorite count")
// logging.SetSpanError(span, localErr)
// return
// }
// respVideoList[i].FavoriteCount = favoriteCount.Count
//}(i, v)

// mock favorite count
go func(i int, v *models.Video) {
defer wg.Done()
respVideoList[i].FavoriteCount = uint32(countFavorite())
favoriteCount, localErr := FavoriteClient.CountFavorite(ctx, &favorite.CountFavoriteRequest{
VideoId: v.ID,
})
if localErr != nil {
logger.WithFields(logrus.Fields{
"video_id": v.ID,
"err": localErr,
}).Warning("failed to fetch favorite count")
logging.SetSpanError(span, localErr)
return
}
respVideoList[i].FavoriteCount = favoriteCount.Count
}(i, v)

// mock favorite count
//go func(i int, v *models.Video) {
// defer wg.Done()
// respVideoList[i].FavoriteCount = uint32(countFavorite())
//}(i, v)

// fill comment count
go func(i int, v *models.Video) {
defer wg.Done()
Expand All @@ -272,29 +272,29 @@ func queryDetailed(
}(i, v)

// fill is favorite
//go func(i int, v *models.Video) {
// defer wg.Done()
// isFavorite, localErr := FavoriteClient.IsFavorite(ctx, &favorite.IsFavoriteRequest{
// ActorId: actorId,
// VideoId: v.ID,
// })
// if localErr != nil {
// logger.WithFields(logrus.Fields{
// "video_id": v.ID,
// "err": localErr,
// }).Warning("failed to fetch favorite status")
// logging.SetSpanError(span, localErr)
// return
// }
// respVideoList[i].IsFavorite = isFavorite.Result
//}(i, v)

// mock isFavorite
go func(i int, v *models.Video) {
defer wg.Done()
respVideoList[i].IsFavorite = isFavorite()
isFavorite, localErr := FavoriteClient.IsFavorite(ctx, &favorite.IsFavoriteRequest{
ActorId: actorId,
VideoId: v.ID,
})
if localErr != nil {
logger.WithFields(logrus.Fields{
"video_id": v.ID,
"err": localErr,
}).Warning("failed to fetch favorite status")
logging.SetSpanError(span, localErr)
return
}
respVideoList[i].IsFavorite = isFavorite.Result
}(i, v)

// mock isFavorite
//go func(i int, v *models.Video) {
// defer wg.Done()
// respVideoList[i].IsFavorite = isFavorite()
//}(i, v)

}
wg.Wait()

Expand All @@ -311,9 +311,9 @@ func query(ctx context.Context, logger *logrus.Entry, actorId uint32, videoIds [
return queryDetailed(ctx, logger, actorId, videos), nil
}

func countFavorite() int {
return 0
}
func isFavorite() bool {
return true
}
//func countFavorite() int {
// return 0
//}
//func isFavorite() bool {
// return true
//}
50 changes: 46 additions & 4 deletions src/services/publish/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import (
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/rpc/publish"
"GuGoTik/src/storage/file"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/pathgen"
"GuGoTik/src/utils/rabbitmq"
"bytes"
"context"
"encoding/json"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"math/rand"
"net/http"
"time"
)

type PublishServiceImpl struct {
Expand Down Expand Up @@ -106,15 +111,52 @@ func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.Cr
"ActorId": request.ActorId,
"Title": request.Title,
}).Infof("Create video requested.")
// 检测视频格式
detectedContentType := http.DetectContentType(request.Data)
if detectedContentType != "video/mp4" {
logger.WithFields(logrus.Fields{
"content_type": detectedContentType,
}).Debug("invalid content type")
resp = &publish.CreateVideoResponse{
StatusCode: strings.InvalidContentTypeCode,
StatusMsg: strings.InvalidContentType,
}
return
}
// byte[] -> reader
reader := bytes.NewReader(request.Data)

// 创建一个新的随机数生成器
r := rand.New(rand.NewSource(time.Now().UnixNano()))
videoId := r.Uint32()
fileName := pathgen.GenerateRawVideoName(request.ActorId, request.Title, videoId)
coverName := pathgen.GenerateCoverName(request.ActorId, request.Title, videoId)
// 上传视频
_, err = file.Upload(ctx, fileName, reader)
if err != nil {
logger.WithFields(logrus.Fields{
"file_name": fileName,
"err": err,
}).Debug("failed to upload video")
resp = &publish.CreateVideoResponse{
StatusCode: strings.VideoServiceInnerErrorCode,
StatusMsg: strings.VideoServiceInnerError,
}
return
}
logger.WithFields(logrus.Fields{
"file_name": fileName,
}).Debug("uploaded video")

raw := &models.RawVideo{
ActorId: request.ActorId,
Title: request.Title,
FileName: pathgen.GenerateRawVideoName(request.ActorId, request.Title),
ActorId: request.ActorId,
VideoId: videoId,
Title: request.Title,
FileName: fileName,
CoverName: coverName,
}

bytes, err := json.Marshal(raw)

if err != nil {
resp = &publish.CreateVideoResponse{
StatusCode: strings.VideoServiceInnerErrorCode,
Expand Down
114 changes: 114 additions & 0 deletions src/services/videoprocessor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import (
"GuGoTik/src/constant/strings"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/storage/database"
"GuGoTik/src/storage/file"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/pathgen"
"GuGoTik/src/utils/rabbitmq"
"bytes"
"context"
"encoding/json"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"os/exec"
"sync"
)

Expand Down Expand Up @@ -135,6 +140,37 @@ func Consume(channel *amqp.Channel) {
}).Errorf("Error when unmarshaling the prepare json body.")
}

// 截取封面
err := extractVideoCover(ctx, &raw)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when extracting video cover.")
logging.SetSpanError(span, err)
}

// 添加水印逻辑
err = addWatermarkToVideo(ctx, &raw)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when adding watermark to video.")
logging.SetSpanError(span, err)
}

// 保存到数据库
err = database.Client.WithContext(ctx).Create(&raw).Error
if err != nil {
logger.WithFields(logrus.Fields{
"file_name": raw.FileName,
"cover_name": raw.CoverName,
"err": err,
}).Debug("failed to create db entry")
}
logger.WithFields(logrus.Fields{
"entry": raw,
}).Debug("saved db entry")

span.End()
err = d.Ack(false)
if err != nil {
Expand All @@ -144,3 +180,81 @@ func Consume(channel *amqp.Channel) {
}
}
}

func extractVideoCover(ctx context.Context, video *models.RawVideo) error {
ctx, span := tracing.Tracer.Start(ctx, "ExtractVideoCoverService")
defer span.End()
logger := logging.LogService("VideoPicker.Picker").WithContext(ctx)
logger.Debug("Extracting video cover...")
RawFileName := video.FileName
CoverFileName := video.CoverName
RawFilePath := file.GetLocalPath(ctx, RawFileName)
cmdArgs := []string{
"-i", RawFilePath, "-vframes", "1", "-an", "-f", "image2pipe", "-",
}
cmd := exec.Command("ffmpeg", cmdArgs...)
// Create a bytes.Buffer to capture stdout
var buf bytes.Buffer
cmd.Stdout = &buf

err := cmd.Run()
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Warnf("cmd.Run() failed with %s\n", err)
logging.SetSpanError(span, err)
return err
}
// buf.Bytes() now contains the image data. You can use it to write to a file or send it to an output stream.
_, err = file.Upload(ctx, CoverFileName, bytes.NewReader(buf.Bytes()))
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Warnf("failed to upload video cover")
logging.SetSpanError(span, err)
return err
}
return nil
}

func addWatermarkToVideo(ctx context.Context, video *models.RawVideo) error {
ctx, span := tracing.Tracer.Start(ctx, "AddWatermarkToVideoService")
defer span.End()
logger := logging.LogService("VideoPicker.Picker").WithContext(ctx)
logger.Debug("Adding watermark to video...")
RawFileName := video.FileName
FinalFileName := pathgen.GenerateFinalVideoName(video.ActorId, video.Title, video.VideoId)
RawFilePath := file.GetLocalPath(ctx, RawFileName)
cmdArgs := []string{
"-i", RawFilePath,
"-vf", "drawtext=text=" + video.Title + ":x=(w-text_w-10):y=10:fontsize=24:fontcolor=white",
"-c:v", "copy",
"-f", "mp4",
"-",
}

cmd := exec.Command("ffmpeg", cmdArgs...)
var buf bytes.Buffer
cmd.Stdout = &buf

// Execute the command
err := cmd.Run()
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Warnf("cmd.Run() failed with %s\n", err)
logging.SetSpanError(span, err)
}

// Write the captured stdout to a file
_, err = file.Upload(ctx, FinalFileName, bytes.NewReader(buf.Bytes()))
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Warnf("failed to upload video with watermark")
logging.SetSpanError(span, err)
return err
}
video.FileName = FinalFileName
return nil
}
Loading