Skip to content

Commit

Permalink
add mq chat
Browse files Browse the repository at this point in the history
  • Loading branch information
XFFFCCCC committed Aug 29, 2023
1 parent b15ad4c commit 91e7f00
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 52 deletions.
49 changes: 7 additions & 42 deletions src/services/message/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func failOnError(err error, msg string) {
}

func (c MessageServiceImpl) New() {
userRpcConn := grpc2.Connect(config.UserRpcServerName)

var err error
conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr())
Expand All @@ -66,13 +65,15 @@ func (c MessageServiceImpl) New() {
}
_, err = channel.QueueDeclare(
strings.MessageActionEvent,
true, false, false, false,
false, false, false, false,
nil,
)
if err != nil {
failOnError(err, "Failed to define queue")
}

userRpcConn := grpc2.Connect(config.UserRpcServerName)

userClient = user.NewUserServiceClient(userRpcConn)

recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName)
Expand All @@ -88,6 +89,7 @@ func (c MessageServiceImpl) New() {
chatClient = chat.NewChatServiceClient(chatRpcConn)

cronRunner := cron.New(cron.WithSeconds())

//_, err := cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00
_, err = cronRunner.AddFunc("@every 2m", sendMagicMessage) // execute every minute [for test]

Expand All @@ -98,6 +100,7 @@ func (c MessageServiceImpl) New() {
}

cronRunner.Start()

}

func CloseMQConn() {
Expand Down Expand Up @@ -164,25 +167,6 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action
return
}

/* userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{
UserId: request.UserId,
})
if err != nil || userResponse.StatusCode != strings.ServiceOKCode {
logger.WithFields(logrus.Fields{
"err": err,
"ActorId": request.ActorId,
"user_id": request.UserId,
"action_type": request.ActionType,
"content_text": request.Content,
}).Errorf("User service error")
logging.SetSpanError(span, err)
return &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageError,
}, err
} */
userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{
UserId: request.UserId,
})
Expand Down Expand Up @@ -233,25 +217,6 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest)
"pre_msg_time": request.PreMsgTime,
}).Debugf("Process start")

/* userResponse, err := UserClient.GetUserExistInformation(ctx, &user.UserExistRequest{
UserId: request.UserId,
})
if err != nil || userResponse.StatusCode != strings.ServiceOKCode {
logger.WithFields(logrus.Fields{
"err": err,
"ActorId": request.ActorId,
"user_id": request.UserId,
}).Errorf("User service error")
logging.SetSpanError(span, err)
resp = &chat.ChatResponse{
StatusCode: strings.UnableToQueryMessageErrorCode,
StatusMsg: strings.UnableToQueryMessageError,
}
return
}
*/
userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{
UserId: request.UserId,
})
Expand Down Expand Up @@ -362,10 +327,10 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context
return
}
headers := rabbitmq.InjectAMQPHeaders(ctx)
err = channel.Publish("", "strings.MessageActionEvent", false, false,
err = channel.Publish("", strings.MessageActionEvent, false, false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/json",
ContentType: "text/plain",
Body: body,
Headers: headers,
})
Expand Down
37 changes: 27 additions & 10 deletions src/services/msgconsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package main

import (
"GuGoTik/src/constant/strings"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/storage/database"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/rabbitmq"
"context"
"encoding/json"

"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
Expand All @@ -24,7 +24,6 @@ func main() {
if err != nil {
failOnError(err, "Fialed to conenct to RabbitMQ")
}

defer func(conn *amqp.Connection) {
err := conn.Close()
failOnError(err, "Fialed to close conn")
Expand All @@ -36,7 +35,7 @@ func main() {

_, err = channel.QueueDeclare(
strings.MessageActionEvent,
true, false, false, false,
false, false, false, false,
nil,
)
if err != nil {
Expand All @@ -46,7 +45,7 @@ func main() {
msg, err := channel.Consume(
strings.MessageActionEvent,
"",
false, false, false, false,
true, false, false, false,
nil,
)
if err != nil {
Expand All @@ -55,14 +54,24 @@ func main() {

var foreever chan struct{}

logger := logging.LogService("VideoPicker")
logger.Infof(strings.VideoPicker + " is running now")
logger := logging.LogService("msgConsumer")
logger.Infof(strings.MessageActionEvent + " is running now")
go func() {
var message models.Message
for body := range msg {
ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers)
if err := json.Unmarshal(body.Body, &message); err != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
"to_id": message.ToUserId,
"err": err,
}).Errorf("Error when unmarshaling the prepare json body.")
return
}

/* ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers)
ctx, span := tracing.Tracer.Start(ctx, "message_send Service")
logger := logging.LogService("message_send").WithContext(ctx)
if err := json.Unmarshal(body.Body, &message); err != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
Expand All @@ -71,20 +80,28 @@ func main() {
}).Errorf("Error when unmarshaling the prepare json body.")
logging.SetSpanError(span, err)
return
}
} */

result := database.Client.WithContext(ctx).Create(&message)
pmessage := models.Message{
ToUserId: message.ToUserId,
FromUserId: message.FromUserId,
ConversationId: message.ConversationId,
Content: message.Content,
}
logger.Info(pmessage)
result := database.Client.WithContext(context.Background()).Create(&pmessage)
if result.Error != nil {
logger.WithFields(logrus.Fields{
"from_id": message.FromUserId,
"to_id": message.ToUserId,
"err": result.Error,
}).Errorf("Error when insert message to database.")
logging.SetSpanError(span, err)
// logging.SetSpanError(span, err)
return
}
}
}()

<-foreever

}

0 comments on commit 91e7f00

Please sign in to comment.