diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index cd0f314..330255b 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -1,18 +1,24 @@ package strings +// Exchange name const ( - // Exchange name VideoExchange = "video_exchange" EventExchange = "event" MessageExchange = "message_exchange" + AuditExchange = "audit_exchange" +) - // Queue name +// Queue name +const ( VideoPicker = "video_picker" VideoSummary = "video_summary" MessageCommon = "message_common" MessageGPT = "message_gpt" + AuditPicker = "audit_picker" +) - // Routing key +// Routing key +const ( FavoriteActionEvent = "video.favorite.action" VideoGetEvent = "video.get.action" VideoCommentEvent = "video.comment.action" @@ -20,12 +26,28 @@ const ( MessageActionEvent = "message.common" MessageGptActionEvent = "message.gpt" + AuditPublishEvent = "audit" +) - // Action Id +// Action Type +const ( FavoriteIdActionLog = 1 // 用户点赞相关操作 + FollowIdActionLog = 2 // 用户关注相关操作 +) - // Action Name +// Action Name +const ( FavoriteNameActionLog = "favorite.action" // 用户点赞操作名称 FavoriteUpActionSubLog = "up" FavoriteDownActionSubLog = "down" + + FollowNameActionLog = "follow.action" // 用户关注操作名称 + FollowUpActionSubLog = "up" + FollowDownActionSubLog = "down" +) + +// Action Service Name +const ( + FavoriteServiceName = "FavoriteService" + FollowServiceName = "FollowService" ) diff --git a/src/models/action.go b/src/models/action.go index 58c17d6..33e6d85 100644 --- a/src/models/action.go +++ b/src/models/action.go @@ -1,6 +1,9 @@ package models -import "gorm.io/gorm" +import ( + "GuGoTik/src/storage/database" + "gorm.io/gorm" +) type Action struct { Type uint // 用户操作的行为类型,如:1表示点赞相关 @@ -10,6 +13,7 @@ type Action struct { Attached string // 附带信息,当 Name - SubName 无法说明时,添加一个额外的信息 ActorId uint32 // 操作者 Id VideoId uint32 // 附属的视频 Id,没有填写为0 + AffectUserId uint32 // 操作的用户 Id,如:被关注的用户 Id AffectAction uint // 操作的类型,如:1. 自增/自减某个数据,2. 直接修改某个数据 AffectedData string // 操作的数值是什么,如果是自增,填 1,如果是修改为某个数据,那么填这个数据的值 EventId string // 如果这个操作是一个大操作的子类型,那么需要具有相同的 UUID @@ -17,3 +21,9 @@ type Action struct { SpanId string // 这个操作的 SpanId gorm.Model //数据库模型 } + +func init() { + if err := database.Client.AutoMigrate(&Action{}); err != nil { + panic(err) + } +} diff --git a/src/models/comment.go b/src/models/comment.go index b3b9262..2e929d2 100644 --- a/src/models/comment.go +++ b/src/models/comment.go @@ -6,7 +6,7 @@ import ( ) type Comment struct { - ID uint32 `gorm:"not null;primarykey;autoIncrement"` // 评论 ID + ID uint32 `gorm:"not null;primaryKey;autoIncrement"` // 评论 ID VideoId uint32 `json:"video_id" column:"video_id" gorm:"not null;index:comment_video"` // 视频 ID UserId uint32 `json:"user_id" column:"user_id" gorm:"not null"` // 用户 ID Content string `json:"content" column:"content"` // 评论内容 diff --git a/src/services/favorite/handler.go b/src/services/favorite/handler.go index 17d54f2..11e5420 100644 --- a/src/services/favorite/handler.go +++ b/src/services/favorite/handler.go @@ -9,13 +9,16 @@ import ( "GuGoTik/src/rpc/feed" "GuGoTik/src/rpc/user" redis2 "GuGoTik/src/storage/redis" + "GuGoTik/src/utils/audit" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" "fmt" + "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" + "go.opentelemetry.io/otel/trace" "strconv" "sync" "time" @@ -215,8 +218,9 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo pipe.ZAdd(ctx, user_like_Id, redis.Z{Score: float64(time.Now().Unix()), Member: req.VideoId}) return nil }) + // Publish event to event_exchange and audit_exchange wg := sync.WaitGroup{} - wg.Add(1) + wg.Add(2) go func() { defer wg.Done() produceFavorite(ctx, models.RecommendEvent{ @@ -226,6 +230,23 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo Source: config.FavoriteRpcServerName, }) }() + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FavoriteIdActionLog, + Name: strings.FavoriteNameActionLog, + SubName: strings.FavoriteUpActionSubLog, + ServiceName: strings.FavoriteServiceName, + ActorId: req.ActorId, + VideoId: req.VideoId, + AffectAction: 1, + AffectedData: "1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() wg.Wait() if err == redis.Nil { err = nil @@ -255,6 +276,29 @@ func (c FavoriteServiceServerImpl) FavoriteAction(ctx context.Context, req *favo return nil }) + + // Publish event to event_exchange and audit_exchange + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FavoriteIdActionLog, + Name: strings.FavoriteNameActionLog, + SubName: strings.FavoriteDownActionSubLog, + ServiceName: strings.FavoriteServiceName, + ActorId: req.ActorId, + VideoId: req.VideoId, + AffectAction: 1, + AffectedData: "-1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() + wg.Wait() + if err == redis.Nil { err = nil } diff --git a/src/services/favorite/main.go b/src/services/favorite/main.go index c0a516b..c054428 100644 --- a/src/services/favorite/main.go +++ b/src/services/favorite/main.go @@ -5,6 +5,7 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/favorite" + "GuGoTik/src/utils/audit" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -77,6 +78,10 @@ func main() { log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err) } srv.New() + + // Initialize the audit_exchange + audit.DeclareAuditExchange(channel) + srvMetrics.InitializeMetrics(s) g := &run.Group{} diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 50e95e4..bdd3046 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -13,6 +13,7 @@ import ( "context" "encoding/json" "errors" + "fmt" amqp "github.com/rabbitmq/amqp091-go" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" @@ -101,6 +102,14 @@ func main() { ) failOnError(err, "Failed to get exchange") + err = channel.ExchangeDeclare( + strings.AuditExchange, + "direct", + true, false, false, false, + nil, + ) + failOnError(err, fmt.Sprintf("Failed to get %s exchange", strings.AuditExchange)) + _, err = channel.QueueDeclare( strings.MessageCommon, true, false, false, false, @@ -115,6 +124,13 @@ func main() { ) failOnError(err, "Failed to define queue") + _, err = channel.QueueDeclare( + strings.AuditPicker, + true, false, false, false, + nil, + ) + failOnError(err, fmt.Sprintf("Failed to define %s queue", strings.AuditPicker)) + err = channel.QueueBind( strings.MessageCommon, "message.#", @@ -133,6 +149,15 @@ func main() { ) failOnError(err, "Failed to bind queue to exchange") + err = channel.QueueBind( + strings.AuditPicker, + strings.AuditPublishEvent, + strings.AuditExchange, + false, + nil, + ) + failOnError(err, fmt.Sprintf("Failed to bind %s queue to %s exchange", strings.AuditPicker, strings.AuditExchange)) + go saveMessage(channel) logger := logging.LogService("MessageSend") logger.Infof(strings.MessageActionEvent + " is running now") @@ -141,6 +166,10 @@ func main() { logger = logging.LogService("MessageGPTSend") logger.Infof(strings.MessageGptActionEvent + " is running now") + go saveAuditAction(channel) + logger = logging.LogService("AuditPublish") + logger.Infof(strings.AuditPublishEvent + " is running now") + defer CloseMQConn() wg := sync.WaitGroup{} @@ -327,6 +356,98 @@ func chatWithGPT(channel *amqp.Channel) { } } +func saveAuditAction(channel *amqp.Channel) { + msg, err := channel.Consume( + strings.AuditPicker, + "", + false, false, false, false, + nil, + ) + failOnError(err, "Failed to Consume") + + var action models.Action + for body := range msg { + ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + + ctx, span := tracing.Tracer.Start(ctx, "AuditPublishService") + logger := logging.LogService("AuditPublish").WithContext(ctx) + + if err := json.Unmarshal(body.Body, &action); err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + logging.SetSpanError(span, err) + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "err": err, + "Type": action.Type, + "SubName": action.SubName, + "ServiceName": action.ServiceName, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue + } + + pAction := models.Action{ + Type: action.Type, + Name: action.Name, + SubName: action.SubName, + ServiceName: action.ServiceName, + Attached: action.Attached, + ActorId: action.ActorId, + VideoId: action.VideoId, + AffectAction: action.AffectAction, + AffectedData: action.AffectedData, + EventId: action.EventId, + TraceId: action.TraceId, + SpanId: action.SpanId, + } + logger.WithFields(logrus.Fields{ + "action": pAction, + }).Debugf("Recevie action event") + + result := database.Client.WithContext(ctx).Create(&pAction) + if result.Error != nil { + logger.WithFields( + logrus.Fields{ + "err": err, + "Type": action.Type, + "SubName": action.SubName, + "ServiceName": action.ServiceName, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "err": err, + "Type": action.Type, + "SubName": action.SubName, + "ServiceName": action.ServiceName, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue + } + err = body.Ack(false) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the action...") + logging.SetSpanError(span, err) + } + } +} + func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger *logrus.Entry, span *trace.Span) { if !requeue { // Nack the message err := d.Nack(false, false) diff --git a/src/services/relation/handler.go b/src/services/relation/handler.go index 7e7918d..683620f 100644 --- a/src/services/relation/handler.go +++ b/src/services/relation/handler.go @@ -10,12 +10,16 @@ import ( "GuGoTik/src/storage/cached" "GuGoTik/src/storage/database" redis2 "GuGoTik/src/storage/redis" + "GuGoTik/src/utils/audit" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" "context" "errors" "fmt" "github.com/go-redis/redis_rate/v10" + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/trace" "gorm.io/gorm" @@ -38,9 +42,33 @@ func actionRelationLimitKey(userId uint32) string { return fmt.Sprintf("%s-%d", actionRelationLimitKeyPrefix, userId) } +func exitOnError(err error) { + if err != nil { + panic(err) + } +} + +func CloseMQConn() { + if err := conn.Close(); err != nil { + panic(err) + } + + if err := channel.Close(); err != nil { + panic(err) + } +} + func (r RelationServiceImpl) New() { userRPCConn := grpc2.Connect(config.UserRpcServerName) userClient = user.NewUserServiceClient(userRPCConn) + + var err error + + conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) + exitOnError(err) + + channel, err = conn.Channel() + exitOnError(err) } func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.RelationActionRequest) (resp *relation.RelationActionResponse, err error) { @@ -213,6 +241,30 @@ func (r RelationServiceImpl) Follow(ctx context.Context, request *relation.Relat StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, } + + // Publish event to event_exchange and audit_exchange + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FollowIdActionLog, + Name: strings.FollowNameActionLog, + SubName: strings.FollowUpActionSubLog, + ServiceName: strings.FollowServiceName, + ActorId: request.ActorId, + VideoId: 0, + AffectUserId: request.UserId, + AffectAction: 1, + AffectedData: "1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() + wg.Wait() + return } @@ -351,6 +403,30 @@ func (r RelationServiceImpl) Unfollow(ctx context.Context, request *relation.Rel StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, } + + // Publish event to event_exchange and audit_exchange + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + action := &models.Action{ + Type: strings.FollowIdActionLog, + Name: strings.FollowNameActionLog, + SubName: strings.FollowDownActionSubLog, + ServiceName: strings.FollowServiceName, + ActorId: request.ActorId, + VideoId: 0, + AffectUserId: request.UserId, + AffectAction: 1, + AffectedData: "-1", + EventId: uuid.New().String(), + TraceId: trace.SpanContextFromContext(ctx).TraceID().String(), + SpanId: trace.SpanContextFromContext(ctx).SpanID().String(), + } + audit.PublishAuditEvent(ctx, action, channel) + }() + wg.Wait() + return } diff --git a/src/services/relation/main.go b/src/services/relation/main.go index 0951e62..2fbf2d3 100644 --- a/src/services/relation/main.go +++ b/src/services/relation/main.go @@ -5,6 +5,7 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/relation" + "GuGoTik/src/utils/audit" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -12,6 +13,7 @@ import ( grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus/promhttp" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" @@ -23,6 +25,9 @@ import ( "syscall" ) +var conn = &amqp.Connection{} +var channel = &amqp.Channel{} + func main() { tp, err := tracing.SetTraceProvider(config.RelationRpcServerName) @@ -74,6 +79,11 @@ func main() { grpc_health_v1.RegisterHealthServer(s, health.NewServer()) srv.New() + + // Initialize the audit_exchange + audit.DeclareAuditExchange(channel) + defer CloseMQConn() + srvMetrics.InitializeMetrics(s) g := &run.Group{} diff --git a/src/utils/audit/publish.go b/src/utils/audit/publish.go new file mode 100644 index 0000000..2e8acd1 --- /dev/null +++ b/src/utils/audit/publish.go @@ -0,0 +1,71 @@ +package audit + +import ( + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + models2 "GuGoTik/src/models" + "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" + "context" + "encoding/json" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +func exitOnError(err error) { + if err != nil { + panic(err) + } +} + +func DeclareAuditExchange(channel *amqp.Channel) { + err := channel.ExchangeDeclare( + strings.AuditExchange, + "direct", + true, + false, + false, + false, + nil, + ) + exitOnError(err) +} + +func PublishAuditEvent(ctx context.Context, action *models2.Action, channel *amqp.Channel) { + ctx, span := tracing.Tracer.Start(ctx, "AuditEventPublisher") + defer span.End() + logging.SetSpanWithHostname(span) + logger := logging.LogService("AuditEventPublisher").WithContext(ctx) + + data, err := json.Marshal(action) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when marshal the action model") + logging.SetSpanError(span, err) + return + } + + headers := rabbitmq.InjectAMQPHeaders(ctx) + + err = channel.PublishWithContext(ctx, + strings.AuditExchange, + strings.AuditPublishEvent, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: data, + Headers: headers, + }, + ) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when publishing the action model") + logging.SetSpanError(span, err) + return + } + +}