Skip to content

Commit

Permalink
Merge pull request #187 from Maple-pro/dev
Browse files Browse the repository at this point in the history
feat(event): add event publisher in VideoSummaryService9
  • Loading branch information
liaosunny123 authored Aug 27, 2023
2 parents 5aac9ea + a7331f3 commit 5501894
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/constant/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ const RelationRpcServerPort = ":37008"
const RecommendRpcServiceName = "GuGoTik-Recommend"
const RecommendRpcServicePort = ":37009"

const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService"

const VideoPicker = "GuGoTik-VideoPicker"
const Event = "GuGoTik-Recommend"
2 changes: 2 additions & 0 deletions src/constant/strings/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
FavoriteServiceError = "点赞服务内部出错"
UserServiceInnerErrorCode = 50021
UserServiceInnerError = "登录服务出现内部错误,请稍后重试!"
UnableToQueryVideoErrorCode = 50022
UnableToQueryVideoError = "无法查询到该视频"
)

// Expected Error
Expand Down
4 changes: 2 additions & 2 deletions src/models/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func init() {

// Create magic user (id = 0): show video summary and keywords, and act as ChatGPT
magicUser := User{
ID: 999999,
ID: 1,
UserName: "ChatGPT",
Password: "chatgpt",
Role: 0,
Role: 2,
Avatar: "https://maples31-blog.oss-cn-beijing.aliyuncs.com/img/ChatGPT_logo.svg.png",
BackgroundImage: "https://maples31-blog.oss-cn-beijing.aliyuncs.com/img/ChatGPT.jpg",
Signature: "GuGoTik 小助手",
Expand Down
6 changes: 3 additions & 3 deletions src/services/auth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) {
logger := logging.LogService("AuthService.Register.AddMagicUserFriend").WithContext(ctx)

isMagicUserExist, err := userClient.GetUserExistInformation(ctx, &user2.UserExistRequest{
UserId: 999999,
UserId: 1,
})
if err != nil {
logger.WithFields(logrus.Fields{
Expand All @@ -426,7 +426,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) {
// User follow magic user
_, err = relationClient.Follow(ctx, &relation.RelationActionRequest{
ActorId: userId,
UserId: 999999,
UserId: 1,
})
if err != nil {
logger.WithFields(logrus.Fields{
Expand All @@ -439,7 +439,7 @@ func addMagicUserFriend(ctx context.Context, span *trace.Span, userId uint32) {

// Magic user follow user
_, err = relationClient.Follow(ctx, &relation.RelationActionRequest{
ActorId: 999999,
ActorId: 1,
UserId: userId,
})
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion src/services/comment/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,19 @@ func addComment(ctx context.Context, logger *logrus.Entry, span trace.Span, pUse
// Rate comment
go rateComment(logger, span, pCommentText, rComment.ID)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
produceComment(ctx, models.RecommendEvent{
ActorId: pUser.Id,
VideoId: []uint32{pVideoID},
Type: 2,
Source: config.CommentRpcServerName,
})
}()
wg.Wait()

resp = &comment.ActionCommentResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Expand Down Expand Up @@ -556,7 +561,7 @@ func reindexCommentList(commentList *[]models.Comment) {
var commonComments []models.Comment

for _, c := range *commentList {
if c.UserId == 999999 {
if c.UserId == 1 {
magicComments = append(magicComments, c)
} else {
commonComments = append(commonComments, c)
Expand Down
5 changes: 5 additions & 0 deletions src/services/favorite/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"github.com/streadway/amqp"
"strconv"
"sync"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -181,14 +182,18 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo
pipe.ZAdd(ctx, user_like_Id, redis.Z{Score: float64(time.Now().Unix()), Member: req.VideoId})
return nil
})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
produceFavorite(ctx, models.RecommendEvent{
ActorId: req.ActorId,
VideoId: []uint32{req.VideoId},
Type: 1,
Source: config.FavoriteRpcServerName,
})
}()
wg.Wait()
if err == redis.Nil {
err = nil
}
Expand Down
56 changes: 55 additions & 1 deletion src/services/feed/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR
}
return resp, err
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
var videoLists []uint32
for _, item := range videos {
videoLists = append(videoLists, item.Id)
Expand All @@ -194,6 +197,7 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR
Source: config.FeedRpcServerName,
})
}()
wg.Wait()
resp = &feed.ListFeedResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Expand Down Expand Up @@ -269,8 +273,58 @@ func (s FeedServiceImpl) QueryVideoExisted(ctx context.Context, req *feed.VideoE
return
}

// QueryVideoSummaryAndKeywords TODO
func (s FeedServiceImpl) QueryVideoSummaryAndKeywords(ctx context.Context, req *feed.QueryVideoSummaryAndKeywordsRequest) (resp *feed.QueryVideoSummaryAndKeywordsResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "QueryVideoSummaryAndKeywordsService")
defer span.End()
logger := logging.LogService("FeedService.QueryVideoSummaryAndKeywords").WithContext(ctx)

videoExistRes, err := s.QueryVideoExisted(ctx, &feed.VideoExistRequest{
VideoId: req.VideoId,
})

if err != nil {
logger.WithFields(logrus.Fields{
"VideoId": req.VideoId,
}).Errorf("Cannot check if the video exists")
logging.SetSpanError(span, err)

resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.VideoServiceInnerErrorCode,
StatusMsg: strings.VideoServiceInnerError,
}
return
}

if !videoExistRes.Existed {
resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.UnableToQueryVideoErrorCode,
StatusMsg: strings.UnableToQueryVideoError,
}
return
}

video := models.Video{}
result := database.Client.WithContext(ctx).Where("id = ?", req.VideoId).First(&video)
if result.Error != nil {
logger.WithFields(logrus.Fields{
"VideoId": req.VideoId,
}).Errorf("Cannot get video from database")
logging.SetSpanError(span, err)

resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.VideoServiceInnerErrorCode,
StatusMsg: strings.VideoServiceInnerError,
}
return
}

resp = &feed.QueryVideoSummaryAndKeywordsResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Summary: video.Summary,
Keywords: video.Keywords,
}

return
}

Expand Down
1 change: 1 addition & 0 deletions src/services/videoprocessor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func main() {
logger.Infof(strings.VideoSummary + " is running now")

ConnectServiceClient()
defer CloseMQConn()

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
86 changes: 84 additions & 2 deletions src/services/videoprocessor/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"gorm.io/gorm/clause"
"os/exec"
"strings"
"sync"
)

var userClient user.UserServiceClient
Expand All @@ -32,11 +33,75 @@ var openaiClient = openai.NewClient(config.EnvCfg.ChatGPTAPIKEYS)
var delayTime = int32(2 * 60 * 1000) //2 minutes
var maxRetries = int32(3)

var conn *amqp.Connection
var channel *amqp.Channel

func ConnectServiceClient() {
userRpcConn := grpc2.Connect(config.UserRpcServerName)
userClient = user.NewUserServiceClient(userRpcConn)
commentRpcConn := grpc2.Connect(config.CommentRpcServerName)
commentClient = comment.NewCommentServiceClient(commentRpcConn)

var err error

conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr())
exitOnError(err)

channel, err = conn.Channel()
exitOnError(err)

err = channel.ExchangeDeclare(
strings2.EventExchange,
"topic",
true,
false,
false,
false,
nil,
)
exitOnError(err)
}

func CloseMQConn() {
if err := conn.Close(); err != nil {
panic(err)
}

if err := channel.Close(); err != nil {
panic(err)
}
}

func produceKeywords(ctx context.Context, event models.RecommendEvent) {
ctx, span := tracing.Tracer.Start(ctx, "KeywordsEventPublisher")
defer span.End()
logger := logging.LogService("VideoSummaryService.KeywordsEventPublisher").WithContext(ctx)
data, err := json.Marshal(event)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when marshal the event model")
logging.SetSpanError(span, err)
return
}

err = channel.Publish(
strings2.EventExchange,
strings2.FavoriteActionEvent,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when publishing the event model")
logging.SetSpanError(span, err)
return
}
}

// errorHandler If `requeue` is false, it will just `Nack` it. If `requeue` is true, it will try to re-publish it.
Expand Down Expand Up @@ -270,6 +335,23 @@ func SummaryConsume(channel *amqp.Channel) {
}
}

// Publish keywords event
if !keywordsExist && keywords != "" {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
produceKeywords(ctx, models.RecommendEvent{
ActorId: raw.ActorId,
VideoId: []uint32{raw.VideoId},
Type: 3,
Source: config.VideoProcessorRpcServiceName,
})
}()
wg.Wait()
}

// Add magic comments
isMagicUserExistRes := isMagicUserExist(ctx, logger, &span)
if isMagicUserExistRes {
logger.Debug("Magic user exist")
Expand Down Expand Up @@ -534,7 +616,7 @@ func isKeywordsExist(videoId uint32) (res bool, keywords string, err error) {

func isMagicUserExist(ctx context.Context, logger *logrus.Entry, span *trace.Span) bool {
isMagicUserExistRes, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{
UserId: 999999,
UserId: 1,
})
if err != nil {
logger.WithFields(logrus.Fields{
Expand All @@ -554,7 +636,7 @@ func isMagicUserExist(ctx context.Context, logger *logrus.Entry, span *trace.Spa

func addMagicComment(videoId uint32, content string, ctx context.Context, logger *logrus.Entry, span *trace.Span) {
_, err := commentClient.ActionComment(ctx, &comment.ActionCommentRequest{
ActorId: 999999,
ActorId: 1,
VideoId: videoId,
ActionType: comment.ActionCommentType_ACTION_COMMENT_TYPE_ADD,
Action: &comment.ActionCommentRequest_CommentText{CommentText: content},
Expand Down

0 comments on commit 5501894

Please sign in to comment.