From 9c4eac43c1f2ac62f0273619ee918f87ddbc97f8 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Tue, 29 Aug 2023 17:26:32 +0800 Subject: [PATCH 1/2] add mq chat --- src/constant/strings/service.go | 2 + src/services/message/handler.go | 143 ++++++++++++++++++++++--------- src/services/msgconsumer/main.go | 88 ++++++++++++++++++- 3 files changed, 192 insertions(+), 41 deletions(-) diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 79b8b36..1fc8054 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -11,4 +11,6 @@ const ( VideoGetEvent = "video.get.action" VideoCommentEvent = "video.comment.action" VideoPublishEvent = "video.publish.action" + + MessageActionEvent = "message_send" ) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 23c2ac1..b3fbfdc 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -11,9 +11,13 @@ import ( "GuGoTik/src/storage/redis" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" "context" + "encoding/json" "fmt" + "github.com/go-redis/redis_rate/v10" + "github.com/streadway/amqp" "github.com/sirupsen/logrus" ) @@ -24,11 +28,52 @@ type MessageServiceImpl struct { chat.ChatServiceServer } +// 连接 +var conn *amqp.Connection +var channel *amqp.Channel + +//输出 + +func failOnError(err error, msg string) { + //打日志 + logging.Logger.Errorf("err %s", msg) + +} + func (c MessageServiceImpl) New() { userRpcConn := grpc2.Connect(config.UserRpcServerName) UserClient = user.NewUserServiceClient(userRpcConn) + var err error + conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) + if err != nil { + failOnError(err, "Fialed to conenct to RabbitMQ") + } + channel, err = conn.Channel() + if err != nil { + failOnError(err, "Failed to open a channel") + } + _, err = channel.QueueDeclare( + strings.MessageActionEvent, + true, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue") + } + } +func CloseMQConn() { + if err := channel.Close(); err != nil { + failOnError(err, "close channel error") + } + if err := conn.Close(); err != nil { + failOnError(err, "close conn error") + } +} + +//发送消息 + var chatActionLimitKeyPrefix = config.EnvCfg.RedisPrefix + "chat_freq_limit" const chatActionMaxQPS = 3 @@ -82,25 +127,25 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action return } - userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - "action_type": request.ActionType, - "content_text": request.Content, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - return &chat.ActionResponse{ - StatusCode: strings.UnableToAddMessageErrorCode, - StatusMsg: strings.UnableToAddMessageError, - }, err - } + /* userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ + UserId: request.UserId, + }) + + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "content_text": request.Content, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + return &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageError, + }, err + } */ res, err = addMessage(ctx, request.ActorId, request.UserId, request.Content) if err != nil { @@ -132,25 +177,25 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) "pre_msg_time": request.PreMsgTime, }).Debugf("Process start") - userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - resp = &chat.ChatResponse{ - StatusCode: strings.UnableToQueryMessageErrorCode, - StatusMsg: strings.UnableToQueryMessageError, - } - return - } - + /* userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ + UserId: request.UserId, + }) + + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + "user_id": request.UserId, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + resp = &chat.ChatResponse{ + StatusCode: strings.UnableToQueryMessageErrorCode, + StatusMsg: strings.UnableToQueryMessageError, + } + return + } + */ toUserId := request.UserId fromUserId := request.ActorId @@ -222,10 +267,28 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context } //TO_DO 后面写mq? - result := database.Client.WithContext(ctx).Create(&message) - if result.Error != nil { + body, err := json.Marshal(message) + if err != nil { + resp = &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageError, + } + return + } + headers := rabbitmq.InjectAMQPHeaders(ctx) + err = channel.Publish("", "strings.MessageActionEvent", false, false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/json", + Body: body, + Headers: headers, + }) + + // result := database.Client.WithContext(ctx).Create(&message) + + if err != nil { resp = &chat.ActionResponse{ StatusCode: strings.UnableToAddMessageErrorCode, StatusMsg: strings.UnableToAddMessageError, diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 1f46d05..812efe8 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -1,5 +1,91 @@ package main +import ( + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/models" + "GuGoTik/src/storage/database" + "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/rabbitmq" + "context" + "encoding/json" + + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" +) + +func failOnError(err error, msg string) { + //打日志 + logging.Logger.Errorf("err %s", msg) + +} + func main() { - select {} + conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr()) + if err != nil { + failOnError(err, "Fialed to conenct to RabbitMQ") + } + + defer func(conn *amqp.Connection) { + err := conn.Close() + failOnError(err, "Fialed to close conn") + }(conn) + channel, err := conn.Channel() + if err != nil { + failOnError(err, "Failed to open a channel") + } + + _, err = channel.QueueDeclare( + strings.MessageActionEvent, + true, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue") + } + + msg, err := channel.Consume( + strings.MessageActionEvent, + "", + false, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue") + } + + var foreever chan struct{} + + logger := logging.LogService("VideoPicker") + logger.Infof(strings.VideoPicker + " is running now") + go func() { + var message models.Message + for body := range msg { + ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + ctx, span := tracing.Tracer.Start(ctx, "message_send Service") + logger := logging.LogService("message_send").WithContext(ctx) + if err := json.Unmarshal(body.Body, &message); err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + logging.SetSpanError(span, err) + return + } + + result := database.Client.WithContext(ctx).Create(&message) + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": result.Error, + }).Errorf("Error when insert message to database.") + logging.SetSpanError(span, err) + return + } + } + }() + <-foreever + } From 91e7f00e7a2ea51f53bea851554528aceaea0fd2 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 30 Aug 2023 00:19:52 +0800 Subject: [PATCH 2/2] add mq chat --- src/services/message/handler.go | 49 +++++--------------------------- src/services/msgconsumer/main.go | 37 +++++++++++++++++------- 2 files changed, 34 insertions(+), 52 deletions(-) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 991d345..ff6ba64 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -53,7 +53,6 @@ func failOnError(err error, msg string) { } func (c MessageServiceImpl) New() { - userRpcConn := grpc2.Connect(config.UserRpcServerName) var err error conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) @@ -66,13 +65,15 @@ func (c MessageServiceImpl) New() { } _, err = channel.QueueDeclare( strings.MessageActionEvent, - true, false, false, false, + false, false, false, false, nil, ) if err != nil { failOnError(err, "Failed to define queue") } + userRpcConn := grpc2.Connect(config.UserRpcServerName) + userClient = user.NewUserServiceClient(userRpcConn) recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) @@ -88,6 +89,7 @@ func (c MessageServiceImpl) New() { chatClient = chat.NewChatServiceClient(chatRpcConn) cronRunner := cron.New(cron.WithSeconds()) + //_, err := cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00 _, err = cronRunner.AddFunc("@every 2m", sendMagicMessage) // execute every minute [for test] @@ -98,6 +100,7 @@ func (c MessageServiceImpl) New() { } cronRunner.Start() + } func CloseMQConn() { @@ -164,25 +167,6 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action return } - /* userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - "action_type": request.ActionType, - "content_text": request.Content, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - return &chat.ActionResponse{ - StatusCode: strings.UnableToAddMessageErrorCode, - StatusMsg: strings.UnableToAddMessageError, - }, err - } */ userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ UserId: request.UserId, }) @@ -233,25 +217,6 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) "pre_msg_time": request.PreMsgTime, }).Debugf("Process start") - /* userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - resp = &chat.ChatResponse{ - StatusCode: strings.UnableToQueryMessageErrorCode, - StatusMsg: strings.UnableToQueryMessageError, - } - return - } - */ userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ UserId: request.UserId, }) @@ -362,10 +327,10 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context return } headers := rabbitmq.InjectAMQPHeaders(ctx) - err = channel.Publish("", "strings.MessageActionEvent", false, false, + err = channel.Publish("", strings.MessageActionEvent, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, - ContentType: "text/json", + ContentType: "text/plain", Body: body, Headers: headers, }) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index fcceca9..5f7af8c 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -2,13 +2,13 @@ package main import ( "GuGoTik/src/constant/strings" - "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/storage/database" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" + "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) @@ -24,7 +24,6 @@ func main() { if err != nil { failOnError(err, "Fialed to conenct to RabbitMQ") } - defer func(conn *amqp.Connection) { err := conn.Close() failOnError(err, "Fialed to close conn") @@ -36,7 +35,7 @@ func main() { _, err = channel.QueueDeclare( strings.MessageActionEvent, - true, false, false, false, + false, false, false, false, nil, ) if err != nil { @@ -46,7 +45,7 @@ func main() { msg, err := channel.Consume( strings.MessageActionEvent, "", - false, false, false, false, + true, false, false, false, nil, ) if err != nil { @@ -55,14 +54,24 @@ func main() { var foreever chan struct{} - logger := logging.LogService("VideoPicker") - logger.Infof(strings.VideoPicker + " is running now") + logger := logging.LogService("msgConsumer") + logger.Infof(strings.MessageActionEvent + " is running now") go func() { var message models.Message for body := range msg { - ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + if err := json.Unmarshal(body.Body, &message); err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + return + } + + /* ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) ctx, span := tracing.Tracer.Start(ctx, "message_send Service") logger := logging.LogService("message_send").WithContext(ctx) + if err := json.Unmarshal(body.Body, &message); err != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, @@ -71,20 +80,28 @@ func main() { }).Errorf("Error when unmarshaling the prepare json body.") logging.SetSpanError(span, err) return - } + } */ - result := database.Client.WithContext(ctx).Create(&message) + pmessage := models.Message{ + ToUserId: message.ToUserId, + FromUserId: message.FromUserId, + ConversationId: message.ConversationId, + Content: message.Content, + } + logger.Info(pmessage) + result := database.Client.WithContext(context.Background()).Create(&pmessage) if result.Error != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, "to_id": message.ToUserId, "err": result.Error, }).Errorf("Error when insert message to database.") - logging.SetSpanError(span, err) + // logging.SetSpanError(span, err) return } } }() + <-foreever }