Skip to content

Commit

Permalink
Merge pull request #202 from XFFFCCCC/dev
Browse files Browse the repository at this point in the history
add mq to chat
  • Loading branch information
liaosunny123 authored Aug 30, 2023
2 parents 1a1ea3c + 91e7f00 commit fd7e500
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/constant/strings/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ const (
VideoGetEvent = "video.get.action"
VideoCommentEvent = "video.comment.action"
VideoPublishEvent = "video.publish.action"

MessageActionEvent = "message_send"
)
76 changes: 72 additions & 4 deletions src/services/message/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ import (
grpc2 "GuGoTik/src/utils/grpc"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/ptr"
"GuGoTik/src/utils/rabbitmq"
"context"
"encoding/json"
"fmt"

"time"

"github.com/go-redis/redis_rate/v10"
"github.com/robfig/cron/v3"
"github.com/streadway/amqp"
"gorm.io/gorm"
"time"

"github.com/sirupsen/logrus"
)
Expand All @@ -35,8 +40,40 @@ type MessageServiceImpl struct {
chat.ChatServiceServer
}

// 连接
var conn *amqp.Connection
var channel *amqp.Channel

//输出

func failOnError(err error, msg string) {
//打日志
logging.Logger.Errorf("err %s", msg)

}

func (c MessageServiceImpl) New() {

var err error
conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr())
if err != nil {
failOnError(err, "Fialed to conenct to RabbitMQ")
}
channel, err = conn.Channel()
if err != nil {
failOnError(err, "Failed to open a channel")
}
_, err = channel.QueueDeclare(
strings.MessageActionEvent,
false, false, false, false,
nil,
)
if err != nil {
failOnError(err, "Failed to define queue")
}

userRpcConn := grpc2.Connect(config.UserRpcServerName)

userClient = user.NewUserServiceClient(userRpcConn)

recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName)
Expand All @@ -52,8 +89,9 @@ func (c MessageServiceImpl) New() {
chatClient = chat.NewChatServiceClient(chatRpcConn)

cronRunner := cron.New(cron.WithSeconds())

//_, err := cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00
_, err := cronRunner.AddFunc("@every 2m", sendMagicMessage) // execute every minute [for test]
_, err = cronRunner.AddFunc("@every 2m", sendMagicMessage) // execute every minute [for test]

if err != nil {
logging.Logger.WithFields(logrus.Fields{
Expand All @@ -62,8 +100,20 @@ func (c MessageServiceImpl) New() {
}

cronRunner.Start()

}

func CloseMQConn() {
if err := channel.Close(); err != nil {
failOnError(err, "close channel error")
}
if err := conn.Close(); err != nil {
failOnError(err, "close conn error")
}
}

//发送消息

var chatActionLimitKeyPrefix = config.EnvCfg.RedisPrefix + "chat_freq_limit"

const chatActionMaxQPS = 3
Expand Down Expand Up @@ -266,10 +316,28 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context
}

//TO_DO 后面写mq?
result := database.Client.WithContext(ctx).Create(&message)

if result.Error != nil {
body, err := json.Marshal(message)

if err != nil {
resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageError,
}
return
}
headers := rabbitmq.InjectAMQPHeaders(ctx)
err = channel.Publish("", strings.MessageActionEvent, false, false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
Headers: headers,
})

// result := database.Client.WithContext(ctx).Create(&message)

if err != nil {
resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageError,
Expand Down
103 changes: 100 additions & 3 deletions src/services/msgconsumer/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,107 @@
package main

import (
"fmt"
"time"
"GuGoTik/src/constant/strings"
"GuGoTik/src/models"
"GuGoTik/src/storage/database"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/rabbitmq"
"context"
"encoding/json"

"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
//打日志
logging.Logger.Errorf("err %s", msg)

}

func main() {
fmt.Println(time.Now().UTC().Format(time.RFC3339))
conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr())
if err != nil {
failOnError(err, "Fialed to conenct to RabbitMQ")
}
defer func(conn *amqp.Connection) {
err := conn.Close()
failOnError(err, "Fialed to close conn")
}(conn)
channel, err := conn.Channel()
if err != nil {
failOnError(err, "Failed to open a channel")
}

_, err = channel.QueueDeclare(
strings.MessageActionEvent,
false, false, false, false,
nil,
)
if err != nil {
failOnError(err, "Failed to define queue")
}

msg, err := channel.Consume(
strings.MessageActionEvent,
"",
true, false, false, false,
nil,
)
if err != nil {
failOnError(err, "Failed to define queue")
}

var foreever chan struct{}

logger := logging.LogService("msgConsumer")
logger.Infof(strings.MessageActionEvent + " is running now")
go func() {
var message models.Message
for body := range msg {
if err := json.Unmarshal(body.Body, &message); err != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
"to_id": message.ToUserId,
"err": err,
}).Errorf("Error when unmarshaling the prepare json body.")
return
}

/* ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers)
ctx, span := tracing.Tracer.Start(ctx, "message_send Service")
logger := logging.LogService("message_send").WithContext(ctx)
if err := json.Unmarshal(body.Body, &message); err != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
"to_id": message.ToUserId,
"err": err,
}).Errorf("Error when unmarshaling the prepare json body.")
logging.SetSpanError(span, err)
return
} */

pmessage := models.Message{
ToUserId: message.ToUserId,
FromUserId: message.FromUserId,
ConversationId: message.ConversationId,
Content: message.Content,
}
logger.Info(pmessage)
result := database.Client.WithContext(context.Background()).Create(&pmessage)
if result.Error != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
"to_id": message.ToUserId,
"err": result.Error,
}).Errorf("Error when insert message to database.")
// logging.SetSpanError(span, err)
return
}
}
}()

<-foreever

}

0 comments on commit fd7e500

Please sign in to comment.