Skip to content

Commit

Permalink
feat(audit): add favorite and follow action to action table by mq
Browse files Browse the repository at this point in the history
  • Loading branch information
Maple-pro committed Sep 3, 2023
1 parent af7bdbe commit 1179a05
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 8 deletions.
32 changes: 27 additions & 5 deletions src/constant/strings/service.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,53 @@
package strings

// Exchange name
const (
// Exchange name
VideoExchange = "video_exchange"
EventExchange = "event"
MessageExchange = "message_exchange"
AuditExchange = "audit_exchange"
)

// Queue name
// Queue name
const (
VideoPicker = "video_picker"
VideoSummary = "video_summary"
MessageCommon = "message_common"
MessageGPT = "message_gpt"
AuditPicker = "audit_picker"
)

// Routing key
// Routing key
const (
FavoriteActionEvent = "video.favorite.action"
VideoGetEvent = "video.get.action"
VideoCommentEvent = "video.comment.action"
VideoPublishEvent = "video.publish.action"

MessageActionEvent = "message.common"
MessageGptActionEvent = "message.gpt"
AuditPublishEvent = "audit"
)

// Action Id
// Action Type
const (
FavoriteIdActionLog = 1 // 用户点赞相关操作
FollowIdActionLog = 2 // 用户关注相关操作
)

// Action Name
// Action Name
const (
FavoriteNameActionLog = "favorite.action" // 用户点赞操作名称
FavoriteUpActionSubLog = "up"
FavoriteDownActionSubLog = "down"

FollowNameActionLog = "follow.action" // 用户关注操作名称
FollowUpActionSubLog = "up"
FollowDownActionSubLog = "down"
)

// Action Service Name
const (
FavoriteServiceName = "FavoriteService"
FollowServiceName = "FollowService"
)
12 changes: 11 additions & 1 deletion src/models/action.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package models

import "gorm.io/gorm"
import (
"GuGoTik/src/storage/database"
"gorm.io/gorm"
)

type Action struct {
Type uint // 用户操作的行为类型,如:1表示点赞相关
Expand All @@ -10,10 +13,17 @@ type Action struct {
Attached string // 附带信息,当 Name - SubName 无法说明时,添加一个额外的信息
ActorId uint32 // 操作者 Id
VideoId uint32 // 附属的视频 Id,没有填写为0
AffectUserId uint32 // 操作的用户 Id,如:被关注的用户 Id
AffectAction uint // 操作的类型,如:1. 自增/自减某个数据,2. 直接修改某个数据
AffectedData string // 操作的数值是什么,如果是自增,填 1,如果是修改为某个数据,那么填这个数据的值
EventId string // 如果这个操作是一个大操作的子类型,那么需要具有相同的 UUID
TraceId string // 这个操作的 TraceId
SpanId string // 这个操作的 SpanId
gorm.Model //数据库模型
}

func init() {
if err := database.Client.AutoMigrate(&Action{}); err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion src/models/comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Comment struct {
ID uint32 `gorm:"not null;primarykey;autoIncrement"` // 评论 ID
ID uint32 `gorm:"not null;primaryKey;autoIncrement"` // 评论 ID
VideoId uint32 `json:"video_id" column:"video_id" gorm:"not null;index:comment_video"` // 视频 ID
UserId uint32 `json:"user_id" column:"user_id" gorm:"not null"` // 用户 ID
Content string `json:"content" column:"content"` // 评论内容
Expand Down
46 changes: 45 additions & 1 deletion src/services/favorite/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"GuGoTik/src/rpc/feed"
"GuGoTik/src/rpc/user"
redis2 "GuGoTik/src/storage/redis"
"GuGoTik/src/utils/audit"
grpc2 "GuGoTik/src/utils/grpc"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/rabbitmq"
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel/trace"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -215,8 +218,9 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo
pipe.ZAdd(ctx, user_like_Id, redis.Z{Score: float64(time.Now().Unix()), Member: req.VideoId})
return nil
})
// Publish event to event_exchange and audit_exchange
wg := sync.WaitGroup{}
wg.Add(1)
wg.Add(2)
go func() {
defer wg.Done()
produceFavorite(ctx, models.RecommendEvent{
Expand All @@ -226,6 +230,23 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo
Source: config.FavoriteRpcServerName,
})
}()
go func() {
defer wg.Done()
action := &models.Action{
Type: strings.FavoriteIdActionLog,
Name: strings.FavoriteNameActionLog,
SubName: strings.FavoriteUpActionSubLog,
ServiceName: strings.FavoriteServiceName,
ActorId: req.ActorId,
VideoId: req.VideoId,
AffectAction: 1,
AffectedData: "1",
EventId: uuid.New().String(),
TraceId: trace.SpanContextFromContext(ctx).TraceID().String(),
SpanId: trace.SpanContextFromContext(ctx).SpanID().String(),
}
audit.PublishAuditEvent(ctx, action, channel)
}()
wg.Wait()
if err == redis.Nil {
err = nil
Expand Down Expand Up @@ -255,6 +276,29 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo

return nil
})

// Publish event to event_exchange and audit_exchange
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
action := &models.Action{
Type: strings.FavoriteIdActionLog,
Name: strings.FavoriteNameActionLog,
SubName: strings.FavoriteDownActionSubLog,
ServiceName: strings.FavoriteServiceName,
ActorId: req.ActorId,
VideoId: req.VideoId,
AffectAction: 1,
AffectedData: "-1",
EventId: uuid.New().String(),
TraceId: trace.SpanContextFromContext(ctx).TraceID().String(),
SpanId: trace.SpanContextFromContext(ctx).SpanID().String(),
}
audit.PublishAuditEvent(ctx, action, channel)
}()
wg.Wait()

if err == redis.Nil {
err = nil
}
Expand Down
5 changes: 5 additions & 0 deletions src/services/favorite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"GuGoTik/src/extra/profiling"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/rpc/favorite"
"GuGoTik/src/utils/audit"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
Expand Down Expand Up @@ -77,6 +78,10 @@ func main() {
log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err)
}
srv.New()

// Initialize the audit_exchange
audit.DeclareAuditExchange(channel)

srvMetrics.InitializeMetrics(s)

g := &run.Group{}
Expand Down
121 changes: 121 additions & 0 deletions src/services/msgconsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sashabaranov/go-openai"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -101,6 +102,14 @@ func main() {
)
failOnError(err, "Failed to get exchange")

err = channel.ExchangeDeclare(
strings.AuditExchange,
"direct",
true, false, false, false,
nil,
)
failOnError(err, fmt.Sprintf("Failed to get %s exchange", strings.AuditExchange))

_, err = channel.QueueDeclare(
strings.MessageCommon,
true, false, false, false,
Expand All @@ -115,6 +124,13 @@ func main() {
)
failOnError(err, "Failed to define queue")

_, err = channel.QueueDeclare(
strings.AuditPicker,
true, false, false, false,
nil,
)
failOnError(err, fmt.Sprintf("Failed to define %s queue", strings.AuditPicker))

err = channel.QueueBind(
strings.MessageCommon,
"message.#",
Expand All @@ -133,6 +149,15 @@ func main() {
)
failOnError(err, "Failed to bind queue to exchange")

err = channel.QueueBind(
strings.AuditPicker,
strings.AuditPublishEvent,
strings.AuditExchange,
false,
nil,
)
failOnError(err, fmt.Sprintf("Failed to bind %s queue to %s exchange", strings.AuditPicker, strings.AuditExchange))

go saveMessage(channel)
logger := logging.LogService("MessageSend")
logger.Infof(strings.MessageActionEvent + " is running now")
Expand All @@ -141,6 +166,10 @@ func main() {
logger = logging.LogService("MessageGPTSend")
logger.Infof(strings.MessageGptActionEvent + " is running now")

go saveAuditAction(channel)
logger = logging.LogService("AuditPublish")
logger.Infof(strings.AuditPublishEvent + " is running now")

defer CloseMQConn()

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -327,6 +356,98 @@ func chatWithGPT(channel *amqp.Channel) {
}
}

func saveAuditAction(channel *amqp.Channel) {
msg, err := channel.Consume(
strings.AuditPicker,
"",
false, false, false, false,
nil,
)
failOnError(err, "Failed to Consume")

var action models.Action
for body := range msg {
ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers)

ctx, span := tracing.Tracer.Start(ctx, "AuditPublishService")
logger := logging.LogService("AuditPublish").WithContext(ctx)

if err := json.Unmarshal(body.Body, &action); err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when unmarshaling the prepare json body.")
logging.SetSpanError(span, err)
err = body.Nack(false, true)
if err != nil {
logger.WithFields(
logrus.Fields{
"err": err,
"Type": action.Type,
"SubName": action.SubName,
"ServiceName": action.ServiceName,
},
).Errorf("Error when nack the message")
logging.SetSpanError(span, err)
}
span.End()
continue
}

pAction := models.Action{
Type: action.Type,
Name: action.Name,
SubName: action.SubName,
ServiceName: action.ServiceName,
Attached: action.Attached,
ActorId: action.ActorId,
VideoId: action.VideoId,
AffectAction: action.AffectAction,
AffectedData: action.AffectedData,
EventId: action.EventId,
TraceId: action.TraceId,
SpanId: action.SpanId,
}
logger.WithFields(logrus.Fields{
"action": pAction,
}).Debugf("Recevie action event")

result := database.Client.WithContext(ctx).Create(&pAction)
if result.Error != nil {
logger.WithFields(
logrus.Fields{
"err": err,
"Type": action.Type,
"SubName": action.SubName,
"ServiceName": action.ServiceName,
},
).Errorf("Error when nack the message")
logging.SetSpanError(span, err)
err = body.Nack(false, true)
if err != nil {
logger.WithFields(
logrus.Fields{
"err": err,
"Type": action.Type,
"SubName": action.SubName,
"ServiceName": action.ServiceName,
},
).Errorf("Error when nack the message")
logging.SetSpanError(span, err)
}
span.End()
continue
}
err = body.Ack(false)

if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when dealing with the action...")
logging.SetSpanError(span, err)
}
}
}

func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger *logrus.Entry, span *trace.Span) {
if !requeue { // Nack the message
err := d.Nack(false, false)
Expand Down
Loading

0 comments on commit 1179a05

Please sign in to comment.