From be3663e525e7cba2c9b74e1325b415c27e69e2ea Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 16 Aug 2023 03:33:48 -0700 Subject: [PATCH 1/7] chat --- src/services/message/handler.go | 140 ++++++++++++++++++++++++++++++++ src/services/message/main.go | 65 +++++++++++++++ src/web/main.go | 11 +++ src/web/message/handler.go | 106 ++++++++++++++++++++++++ 4 files changed, 322 insertions(+) create mode 100644 src/services/message/handler.go create mode 100644 src/services/message/main.go create mode 100644 src/web/message/handler.go diff --git a/src/services/message/handler.go b/src/services/message/handler.go new file mode 100644 index 0000000..c172393 --- /dev/null +++ b/src/services/message/handler.go @@ -0,0 +1,140 @@ +package main + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/models" + "GuGoTik/src/rpc/chat" + "GuGoTik/src/rpc/user" + "GuGoTik/src/storage/database" + grpc2 "GuGoTik/src/utils/grpc" + "GuGoTik/src/utils/logging" + "context" + + "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/trace" +) + +var UserClient user.UserServiceClient + +type MessageServiceImpl struct { + chat.ChatServiceServer +} + +func init() { + userRpcConn := grpc2.Connect(config.UserRpcServerName) + UserClient = user.NewUserServiceClient(userRpcConn) +} + +func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.ActionRequest) (res *chat.ActionResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "MessageActionService") + defer span.End() + logger := logging.LogService("MessageService.ActionMessage").WithContext(ctx) + + logger.WithFields(logrus.Fields{ + "actor_id": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "Content_text": request.Content, + }) + logger.Debugf("Process start") + + userResponse, err := UserClient.GetUserInfo(ctx, &user.UserRequest{ + UserId: request.ActorId, + ActorId: request.ActorId, + }) + + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + return &chat.ActionResponse{ + StatusCode: strings.UnableToQueryUserErrorCode, + StatusMsg: strings.UnableToQueryUserError, + }, nil + } + + pUser := userResponse.User + + res, err = addMessage(ctx, logger, span, pUser, request.UserId, request.Content) + + if err != nil { + return res, err + } + + logger.WithFields(logrus.Fields{ + "response": res, + }).Debugf("Process done.") + + return res, err +} + +// Chat(context.Context, *ChatRequest) (*ChatResponse, error) +func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) (resp *chat.ChatResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "MessageService") + defer span.End() + logger := logging.LogService("MessageService.chat").WithContext(ctx) + logger.WithFields(logrus.Fields{ + "user_id": request.UserId, + "from_id": request.ActorId, + }) + logger.Debugf("Process start") + + var rMessageList []*chat.Message + result := database.Client.WithContext(ctx).Where("to_user_id=?", request.UserId).Find(&rMessageList) + + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": result.Error, + }).Errorf("MessageServiceImpl list comment failed to response when listing message") + logging.SetSpanError(span, err) + + resp = &chat.ChatResponse{ + StatusCode: strings.UnableToQueryCommentErrorCode, + StatusMsg: strings.UnableToQueryCommentError, + } + return + } + + resp = &chat.ChatResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + MessageList: rMessageList, + } + + logger.WithFields(logrus.Fields{ + "response": resp, + }).Debugf("Process done.") + + return +} + +func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, pUser *user.User, to_user_id uint32, Context string) (resp *chat.ActionResponse, err error) { + message := models.Message{ + To_user_id: to_user_id, + From_user_id: pUser.Id, + Content: Context, + } + + result := database.Client.WithContext(ctx).Create(&message) + + if result.Error != nil { + //TO_DO 错误 替换 + resp = &chat.ActionResponse{ + StatusCode: 400, + StatusMsg: "发生错误了", + } + return + } + + resp = &chat.ActionResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + } + return + +} diff --git a/src/services/message/main.go b/src/services/message/main.go new file mode 100644 index 0000000..d3ccf62 --- /dev/null +++ b/src/services/message/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/extra/profiling" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/rpc/chat" + "GuGoTik/src/rpc/health" + healthImpl "GuGoTik/src/services/health" + "GuGoTik/src/utils/consul" + "GuGoTik/src/utils/logging" + "context" + "net" + + "github.com/sirupsen/logrus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" +) + +func main() { + tp, err := tracing.SetTraceProvider(config.MessageRpcServerName) + + if err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Panicf("Error to set the trace") + } + defer func() { + if err := tp.Shutdown(context.Background()); err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error to set the trace") + } + }() + + // Configure Pyroscope + profiling.InitPyroscope("GuGoTik.MessageService") + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + ) + + log := logging.LogService(config.MessageRpcServerName) + + lis, err := net.Listen("tcp", config.MessageRpcServerPort) + + if err != nil { + log.Panicf("Rpc %s listen happens error: %v", config.MessageRpcServerName, err) + } + + var srv MessageServiceImpl + var probe healthImpl.ProbeImpl + + chat.RegisterChatServiceServer(s, srv) + + health.RegisterHealthServer(s, &probe) + + if err := consul.RegisterConsul(config.MessageRpcServerName, config.MessageRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.MessageRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort) + if err := s.Serve(lis); err != nil { + log.Panicf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err) + } +} diff --git a/src/web/main.go b/src/web/main.go index 8ba82d5..85f18fe 100644 --- a/src/web/main.go +++ b/src/web/main.go @@ -9,8 +9,11 @@ import ( "GuGoTik/src/web/auth" comment2 "GuGoTik/src/web/comment" feed2 "GuGoTik/src/web/feed" + message2 "GuGoTik/src/web/message" "GuGoTik/src/web/middleware" "context" + "fmt" + "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -68,6 +71,14 @@ func main() { comment.GET("/list", comment2.ListCommentHandler) comment.GET("/count", comment2.CountCommentHandler) } + //todo + message := rootPath.Group("/message") + { + message.GET("/chat", message2.ListMessageHandler) + message.POST("/action", message2.ActionMessageHandler) + fmt.Println(message) + } + // Run Server if err := g.Run(config.WebServiceAddr); err != nil { panic("Can not run GuGoTik Gateway, binding port: " + config.WebServiceAddr) diff --git a/src/web/message/handler.go b/src/web/message/handler.go new file mode 100644 index 0000000..d6454f5 --- /dev/null +++ b/src/web/message/handler.go @@ -0,0 +1,106 @@ +package message + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/rpc/chat" + grpc2 "GuGoTik/src/utils/grpc" + "GuGoTik/src/utils/logging" + "GuGoTik/src/web/models" + + "net/http" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" +) + +var Client chat.ChatServiceClient + +func init() { + conn := grpc2.Connect(config.MessageRpcServerName) + Client = chat.NewChatServiceClient(conn) +} + +func ActionMessageHandler(c *gin.Context) { + var req models.SMessageReq + _, span := tracing.Tracer.Start(c.Request.Context(), "ActionMessageHandler") + defer span.End() + logger := logging.LogService("GateWay.ActionMessage").WithContext(c.Request.Context()) + + if err := c.ShouldBindQuery(&req); err != nil { + logger.WithFields(logrus.Fields{ + //"CreateTime": req.Create_time, + "err": err, + }).Warnf("Error when trying to bind query") + c.JSON(http.StatusOK, models.ActionCommentRes{ + StatusCode: strings.GateWayParamsErrorCode, + StatusMsg: strings.GateWayParamsError, + }) + return + } + + var res *chat.ActionResponse + var err error + + res, err = Client.ChatAction(c.Request.Context(), &chat.ActionRequest{ + ActorId: uint32(req.ActorId), + UserId: uint32(req.User_id), + ActionType: uint32(req.Action_type), + Content: req.Content, + }) + + if err != nil { + logger.WithFields(logrus.Fields{ + "actor_id": req.ActorId, + "content": req.Content, + }).Warnf("Error when trying to connect with ActionMessageHandler") + + //这个位置返回状态是不是有问题? + c.JSON(http.StatusOK, res) + return + } + logger.WithFields(logrus.Fields{ + "actor_id": req.ActorId, + "content": req.Content, + }).Infof("Action send message success") + + c.JSON(http.StatusOK, res) +} + +func ListMessageHandler(c *gin.Context) { + var req models.ListMessageReq + _, span := tracing.Tracer.Start(c.Request.Context(), "ListMessageHandler") + defer span.End() + logger := logging.LogService("GateWay.ListMessage").WithContext(c.Request.Context()) + + if err := c.ShouldBindQuery(&req); err != nil { + c.JSON(http.StatusOK, models.ListCommentRes{ + StatusCode: strings.GateWayParamsErrorCode, + StatusMsg: strings.GateWayParamsError, + }) + return + } + + res, err := Client.Chat(c.Request.Context(), &chat.ChatRequest{ + ActorId: req.ActorId, + UserId: req.UserId, + PreMsgTime: req.PreMsgTime, + }) + + if err != nil { + logger.WithFields(logrus.Fields{ + "actor_id": req.ActorId, + "user_id": req.UserId, + }).Warnf("Error when trying to connect with ListMessageHandler") + c.JSON(http.StatusOK, res) + return + } + + logger.WithFields(logrus.Fields{ + "actor_id": req.ActorId, + "user_id": req.UserId, + }).Infof("List comment success") + + c.JSON(http.StatusOK, res) +} From abab808e16c16a4f606275f0143ac7e87054ca16 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 16 Aug 2023 03:35:14 -0700 Subject: [PATCH 2/7] chat --- src/constant/config/service.go | 3 +++ src/models/message.go | 22 ++++++++++++++++++++ src/web/models/Message.go | 37 ++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 src/models/message.go create mode 100644 src/web/models/Message.go diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 4391f94..da71199 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -20,3 +20,6 @@ const PublishRpcServerPort = ":37005" const FavoriteRpcServerName = "GuGoTik-FavoriteService" const FavoriteRpcServerPort = ":37006" + +const MessageRpcServerName = "GuGoTik-MessageService" +const MessageRpcServerPort = ":37010" diff --git a/src/models/message.go b/src/models/message.go new file mode 100644 index 0000000..7e8cd8d --- /dev/null +++ b/src/models/message.go @@ -0,0 +1,22 @@ +package models + +import ( + "GuGoTik/src/storage/database" + + "gorm.io/gorm" +) + +type Message struct { + ID uint32 `gorm:"not null;primarykey;autoIncrement"` + To_user_id uint32 `gorm:"not null"` + From_user_id uint32 `gorm:"not null"` + Content string `gorm:"not null"` + // Create_time time.Time `gorm:"not null"` + gorm.Model +} + +func init() { + if err := database.Client.AutoMigrate(&Message{}); err != nil { + panic(err) + } +} diff --git a/src/web/models/Message.go b/src/web/models/Message.go new file mode 100644 index 0000000..9a27c50 --- /dev/null +++ b/src/web/models/Message.go @@ -0,0 +1,37 @@ +package models + +import ( + "GuGoTik/src/rpc/chat" +) + +// 这个是发数据的数据结构 +type SMessageReq struct { + Token string `form:"token" binding:"required"` + ActorId int `form:"actor_id"` + User_id int `form:"user_id"` + Content string `form:"content"` + Action_type int `form:"action_type"` // send message + //Create_time string //time maybe have some question +} + +// 收的状态 +// statuc code 状态码 0- 成功 其他值 -失败 +// status_msg 返回状态描述 +type SMessageRes struct { + Status_code int `json:"status_code"` + Status_msg string `json:"status_msg"` +} + +type ListMessageReq struct { + Token string `form:"token" binding:"required"` + ActorId uint32 `form:"actor_id"` + UserId uint32 `from:"user_id"` + + PreMsgTime uint32 `from:"preMsgTime"` +} + +type ListMessageRes struct { + StatusCode int `json:"status_code"` + StatusMsg string `json:"status_msg"` + MessageList []*chat.Message `json:"message_list"` +} From 4a7f8527bd582e71516156664b5479e6bb104335 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 16 Aug 2023 03:35:44 -0700 Subject: [PATCH 3/7] chat message --- test/rpc/messagerpc_test.go | 49 +++++++++++++++++++++++++ test/web/message_test.go | 71 +++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 test/rpc/messagerpc_test.go create mode 100644 test/web/message_test.go diff --git a/test/rpc/messagerpc_test.go b/test/rpc/messagerpc_test.go new file mode 100644 index 0000000..f6b8280 --- /dev/null +++ b/test/rpc/messagerpc_test.go @@ -0,0 +1,49 @@ +package rpc + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/rpc/chat" + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var chatClient chat.ChatServiceClient + +func setups() { + conn, _ := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.MessageRpcServerPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) + chatClient = chat.NewChatServiceClient(conn) +} + +func TestActionMessage_Add(t *testing.T) { + setups() + res, err := chatClient.ChatAction(context.Background(), &chat.ActionRequest{ + ActorId: 2, + UserId: 2, + ActionType: 1, + Content: "Test message13241234", + }) + + assert.Empty(t, err) + assert.Equal(t, int32(0), res.StatusCode) + +} + +func TestChat(t *testing.T) { + setups() + res, err := chatClient.Chat(context.Background(), &chat.ChatRequest{ + ActorId: 1, + UserId: 1, + PreMsgTime: 0, + }) + + assert.Empty(t, err) + assert.Equal(t, int32(0), res.StatusCode) + +} diff --git a/test/web/message_test.go b/test/web/message_test.go new file mode 100644 index 0000000..f58ce15 --- /dev/null +++ b/test/web/message_test.go @@ -0,0 +1,71 @@ +package web + +import ( + "GuGoTik/src/web/models" + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestActionMessage_Add(t *testing.T) { + + var client = &http.Client{} + var baseUrl = "http://127.0.0.1:37000/douyin/message" + url := baseUrl + "/action" + method := "POST" + req, err := http.NewRequest(method, url, nil) + q := req.URL.Query() + q.Add("token", "2f54104f-96d1-4ba3-94af-ee2e6b72dc5f") + q.Add("actor_id", "1") + q.Add("user_id", "1") + q.Add("action_type", "1") + q.Add("content", "test comment") + req.URL.RawQuery = q.Encode() + + assert.Empty(t, err) + + res, err := client.Do(req) + assert.Empty(t, err) + defer func(Body io.ReadCloser) { + err := Body.Close() + assert.Empty(t, err) + }(res.Body) + + body, err := io.ReadAll(res.Body) + assert.Empty(t, err) + message := &models.ListMessageRes{} + err = json.Unmarshal(body, &message) + assert.Empty(t, err) + assert.Equal(t, 0, message.StatusCode) +} + +func TestChat(t *testing.T) { + var client = &http.Client{} + var baseUrl = "http://127.0.0.1:37000/douyin/message" + url := baseUrl + "/chat" + method := "GET" + req, err := http.NewRequest(method, url, nil) + q := req.URL.Query() + q.Add("token", "2f54104f-96d1-4ba3-94af-ee2e6b72dc5f") + q.Add("actor_id", "1") + q.Add("user_id", "1") + q.Add("perMsgTime", "0") + req.URL.RawQuery = q.Encode() + + res, err := client.Do(req) + assert.Empty(t, err) + defer func(Body io.ReadCloser) { + err := Body.Close() + assert.Empty(t, err) + }(res.Body) + + body, err := io.ReadAll(res.Body) + assert.Empty(t, err) + listMessage := &models.ListMessageRes{} + err = json.Unmarshal(body, &listMessage) + assert.Empty(t, err) + assert.Equal(t, 0, listMessage.StatusCode) +} From f2ca1757e98a517b3c8e680987419eeb7f83e3e2 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Wed, 16 Aug 2023 04:33:35 -0700 Subject: [PATCH 4/7] test --- test/web/message_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/web/message_test.go b/test/web/message_test.go index f58ce15..f682d46 100644 --- a/test/web/message_test.go +++ b/test/web/message_test.go @@ -3,6 +3,7 @@ package web import ( "GuGoTik/src/web/models" "encoding/json" + "fmt" "io" "net/http" "testing" @@ -48,12 +49,14 @@ func TestChat(t *testing.T) { url := baseUrl + "/chat" method := "GET" req, err := http.NewRequest(method, url, nil) + q := req.URL.Query() - q.Add("token", "2f54104f-96d1-4ba3-94af-ee2e6b72dc5f") + q.Add("token", "1206dfe1-5f2a-44fa-a121-ef27b0fe5f8d") q.Add("actor_id", "1") q.Add("user_id", "1") q.Add("perMsgTime", "0") req.URL.RawQuery = q.Encode() + assert.Empty(t, err) res, err := client.Do(req) assert.Empty(t, err) @@ -65,6 +68,7 @@ func TestChat(t *testing.T) { body, err := io.ReadAll(res.Body) assert.Empty(t, err) listMessage := &models.ListMessageRes{} + fmt.Println(listMessage) err = json.Unmarshal(body, &listMessage) assert.Empty(t, err) assert.Equal(t, 0, listMessage.StatusCode) From 22d27af658a32d985b240142a0c3e329bbb1d02a Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Sun, 20 Aug 2023 20:03:25 -0700 Subject: [PATCH 5/7] pull merge --- src/services/comment/handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/comment/handler.go b/src/services/comment/handler.go index 7c37f71..1967971 100644 --- a/src/services/comment/handler.go +++ b/src/services/comment/handler.go @@ -142,6 +142,7 @@ func (c CommentServiceImpl) ListComment(ctx context.Context, request *comment.Li "pComment": pComment, }).Errorf("Unable to get user info") logging.SetSpanError(span, err) + } rCommentList = append(rCommentList, &comment.Comment{ From 0750287c709244b0d14802e0e50d5da99f30e6ba Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Mon, 21 Aug 2023 01:33:12 -0700 Subject: [PATCH 6/7] =?UTF-8?q?=E9=83=A8=E5=88=86chat=20=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/constant/config/.env.example | 2 +- src/constant/config/service.go | 6 +-- src/constant/strings/err.go | 5 ++ src/models/message.go | 11 +++-- src/services/comment/handler.go | 15 +----- src/services/message/handler.go | 84 +++++++++++++++++++++----------- src/services/message/main.go | 2 +- src/services/user/handler.go | 2 + src/web/main.go | 5 +- src/web/message/handler.go | 18 ++++--- src/web/models/Message.go | 13 ++--- test/rpc/messagerpc_test.go | 10 ++-- test/web/message_test.go | 4 +- 13 files changed, 98 insertions(+), 79 deletions(-) diff --git a/src/constant/config/.env.example b/src/constant/config/.env.example index 12fad85..5549ff1 100644 --- a/src/constant/config/.env.example +++ b/src/constant/config/.env.example @@ -1,7 +1,7 @@ # Configure Consul address, the default address is `localhost:8500` # TIPS: If you provide `CONSUL_ANONYMITY_NAME`, all services will register with `CONSUL_ANONYMITY_NAME` as prefix CONSUL_ADDR=localhost:8500 -CONSUL_ANONYMITY_NAME=paraparty. +CONSUL_ANONYMITY_NAME= # Configure logger level, support: DEBUG, INFO, WARN (WARNING), ERROR, FATAL LOGGER_LEVEL=INFO # Cofigure logger integrated with otel, support: enable, disable diff --git a/src/constant/config/service.go b/src/constant/config/service.go index fa0458d..ea178b1 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -21,9 +21,7 @@ const PublishRpcServerPort = ":37005" const FavoriteRpcServerName = "GuGoTik-FavoriteService" const FavoriteRpcServerPort = ":37006" -<<<<<<< HEAD const MessageRpcServerName = "GuGoTik-MessageService" -const MessageRpcServerPort = ":37010" -======= +const MessageRpcServerPort = ":37007" + const VideoPicker = "GuGoTik-VideoPicker" ->>>>>>> tmp diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index 443dfc3..a96df19 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -26,6 +26,11 @@ const ( ActorIDNotMatchError = "用户不匹配" UnableToDeleteCommentErrorCode = 50008 UnableToDeleteCommentError = "无法删除视频评论" + + UnableToAddMessageErrorCode = 50009 + UnableToAddMessageRrror = "发送消息出错" + UnableToQueryMessageErrorCode = 50010 + UnableToQueryMessageError = "查消息出错" ) // Expected Error diff --git a/src/models/message.go b/src/models/message.go index 7e8cd8d..2add457 100644 --- a/src/models/message.go +++ b/src/models/message.go @@ -7,11 +7,14 @@ import ( ) type Message struct { - ID uint32 `gorm:"not null;primarykey;autoIncrement"` - To_user_id uint32 `gorm:"not null"` - From_user_id uint32 `gorm:"not null"` - Content string `gorm:"not null"` + ID uint32 `gorm:"not null;primarykey;autoIncrement"` + ToUserId uint32 `gorm:"not null" ` + FromUserId uint32 `gorm:"not null"` + ConversationId string `gorm:"not null" index:"conversationid"` + Content string `gorm:"not null"` + // Create_time time.Time `gorm:"not null"` + //Updatetime deleteTime gorm.Model } diff --git a/src/services/comment/handler.go b/src/services/comment/handler.go index 004721a..4c7461e 100644 --- a/src/services/comment/handler.go +++ b/src/services/comment/handler.go @@ -187,22 +187,10 @@ func (c CommentServiceImpl) ListComment(ctx context.Context, request *comment.Li rCommentList := make([]*comment.Comment, 0, result.RowsAffected) userMap := make(map[uint32]*user.User) for _, pComment := range pCommentList { -<<<<<<< HEAD - userResponse, err := UserClient.GetUserInfo(ctx, &user.UserRequest{ - UserId: pComment.UserId, - ActorId: request.ActorId, - }) - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "pComment": pComment, - }).Errorf("Unable to get user info") - logging.SetSpanError(span, err) - -======= userMap[pComment.UserId] = &user.User{} } getUserInfoError := false + wg := sync.WaitGroup{} wg.Add(len(userMap)) for userId := range userMap { @@ -230,7 +218,6 @@ func (c CommentServiceImpl) ListComment(ctx context.Context, request *comment.Li resp = &comment.ListCommentResponse{ StatusCode: strings.UnableToQueryUserErrorCode, StatusMsg: strings.UnableToQueryUserError, ->>>>>>> tmp } return } diff --git a/src/services/message/handler.go b/src/services/message/handler.go index c172393..e892f93 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -11,6 +11,7 @@ import ( grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "context" + "fmt" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/trace" @@ -28,41 +29,42 @@ func init() { } func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.ActionRequest) (res *chat.ActionResponse, err error) { - ctx, span := tracing.Tracer.Start(ctx, "MessageActionService") + ctx, span := tracing.Tracer.Start(ctx, "ChatActionService") defer span.End() - logger := logging.LogService("MessageService.ActionMessage").WithContext(ctx) + logger := logging.LogService("ChatService.ActionMessage").WithContext(ctx) logger.WithFields(logrus.Fields{ "actor_id": request.ActorId, "user_id": request.UserId, "action_type": request.ActionType, - "Content_text": request.Content, - }) - logger.Debugf("Process start") + "content_text": request.Content, + }).Debugf("Process start") userResponse, err := UserClient.GetUserInfo(ctx, &user.UserRequest{ - UserId: request.ActorId, ActorId: request.ActorId, + UserId: request.UserId, }) if err != nil || userResponse.StatusCode != strings.ServiceOKCode { logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, + "err": err, + "cctor_id": request.ActorId, }).Errorf("User service error") logging.SetSpanError(span, err) return &chat.ActionResponse{ - StatusCode: strings.UnableToQueryUserErrorCode, - StatusMsg: strings.UnableToQueryUserError, - }, nil + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageRrror, + }, err } - pUser := userResponse.User - - res, err = addMessage(ctx, logger, span, pUser, request.UserId, request.Content) - + res, err = addMessage(ctx, logger, span, request.ActorId, request.UserId, request.Content) if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "Actor_id": request.ActorId, + }).Errorf("User service error") + logging.SetSpanError(span, err) return res, err } @@ -75,27 +77,37 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action // Chat(context.Context, *ChatRequest) (*ChatResponse, error) func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) (resp *chat.ChatResponse, err error) { - ctx, span := tracing.Tracer.Start(ctx, "MessageService") + ctx, span := tracing.Tracer.Start(ctx, "ChatService") defer span.End() - logger := logging.LogService("MessageService.chat").WithContext(ctx) + logger := logging.LogService("ChatService.chat").WithContext(ctx) logger.WithFields(logrus.Fields{ "user_id": request.UserId, "from_id": request.ActorId, - }) - logger.Debugf("Process start") + }).Debugf("Process start") + toUserId := request.UserId + fromUserId := request.ActorId + + conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId) + + if toUserId > fromUserId { + conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId) + } + //这个地方应该取出多少条消息? + //TO DO 看怎么需要一下 var rMessageList []*chat.Message - result := database.Client.WithContext(ctx).Where("to_user_id=?", request.UserId).Find(&rMessageList) + result := database.Client.WithContext(ctx).Where("conversation_id=?", conversationId). + Order("created_at desc").Find(&rMessageList) if result.Error != nil { logger.WithFields(logrus.Fields{ "err": result.Error, - }).Errorf("MessageServiceImpl list comment failed to response when listing message") + }).Errorf("ChatServiceImpl list chat failed to response when listing message") logging.SetSpanError(span, err) resp = &chat.ChatResponse{ - StatusCode: strings.UnableToQueryCommentErrorCode, - StatusMsg: strings.UnableToQueryCommentError, + StatusCode: strings.UnableToQueryMessageErrorCode, + StatusMsg: strings.UnableToQueryMessageError, } return } @@ -113,20 +125,34 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) return } -func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, pUser *user.User, to_user_id uint32, Context string) (resp *chat.ActionResponse, err error) { +func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, fromUserId uint32, toUserId uint32, Context string) (resp *chat.ActionResponse, err error) { + conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId) + + if toUserId > fromUserId { + conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId) + } message := models.Message{ - To_user_id: to_user_id, - From_user_id: pUser.Id, - Content: Context, + ToUserId: toUserId, + FromUserId: fromUserId, + Content: Context, + ConversationId: conversationId, } + //TO_DO 后面写mq? result := database.Client.WithContext(ctx).Create(&message) if result.Error != nil { //TO_DO 错误 替换 + logger.WithFields(logrus.Fields{ + "err": result.Error, + "id": message.ID, + "from_id": message.FromUserId, + "to_id": message.ToUserId, + }).Errorf("send message failed when insert to database") + resp = &chat.ActionResponse{ - StatusCode: 400, - StatusMsg: "发生错误了", + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageRrror, } return } diff --git a/src/services/message/main.go b/src/services/message/main.go index d3ccf62..df06c4d 100644 --- a/src/services/message/main.go +++ b/src/services/message/main.go @@ -34,7 +34,7 @@ func main() { }() // Configure Pyroscope - profiling.InitPyroscope("GuGoTik.MessageService") + profiling.InitPyroscope("GuGoTik.ChatService") s := grpc.NewServer( grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), diff --git a/src/services/user/handler.go b/src/services/user/handler.go index 20d18a6..afcba97 100644 --- a/src/services/user/handler.go +++ b/src/services/user/handler.go @@ -8,6 +8,7 @@ import ( "GuGoTik/src/storage/cached" "GuGoTik/src/utils/logging" "context" + "github.com/sirupsen/logrus" ) @@ -25,6 +26,7 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ ok, err := cached.ScanGet(ctx, "UserInfo", &userModel) if err != nil { + resp = &user.UserResponse{ StatusCode: strings.AuthServiceInnerErrorCode, StatusMsg: strings.AuthServiceInnerError, diff --git a/src/web/main.go b/src/web/main.go index 803b7dd..10cfb7e 100644 --- a/src/web/main.go +++ b/src/web/main.go @@ -12,14 +12,14 @@ import ( message2 "GuGoTik/src/web/message" "GuGoTik/src/web/middleware" "context" - "fmt" + + "time" "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ginprometheus "github.com/zsais/go-gin-prometheus" "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" - "time" ) func main() { @@ -78,7 +78,6 @@ func main() { { message.GET("/chat", message2.ListMessageHandler) message.POST("/action", message2.ActionMessageHandler) - fmt.Println(message) } // Run Server diff --git a/src/web/message/handler.go b/src/web/message/handler.go index d6454f5..17dcffc 100644 --- a/src/web/message/handler.go +++ b/src/web/message/handler.go @@ -26,13 +26,16 @@ func ActionMessageHandler(c *gin.Context) { var req models.SMessageReq _, span := tracing.Tracer.Start(c.Request.Context(), "ActionMessageHandler") defer span.End() - logger := logging.LogService("GateWay.ActionMessage").WithContext(c.Request.Context()) + logger := logging.LogService("GateWay.ActionChat").WithContext(c.Request.Context()) if err := c.ShouldBindQuery(&req); err != nil { logger.WithFields(logrus.Fields{ //"CreateTime": req.Create_time, - "err": err, - }).Warnf("Error when trying to bind query") + "user_id": req.ActorId, + "from_id": req.UserId, + "err": err, + }).Errorf("Error when trying to bind query") + c.JSON(http.StatusOK, models.ActionCommentRes{ StatusCode: strings.GateWayParamsErrorCode, StatusMsg: strings.GateWayParamsError, @@ -45,7 +48,7 @@ func ActionMessageHandler(c *gin.Context) { res, err = Client.ChatAction(c.Request.Context(), &chat.ActionRequest{ ActorId: uint32(req.ActorId), - UserId: uint32(req.User_id), + UserId: uint32(req.UserId), ActionType: uint32(req.Action_type), Content: req.Content, }) @@ -54,10 +57,9 @@ func ActionMessageHandler(c *gin.Context) { logger.WithFields(logrus.Fields{ "actor_id": req.ActorId, "content": req.Content, - }).Warnf("Error when trying to connect with ActionMessageHandler") + }).Error("Error when trying to connect with ActionMessageHandler") - //这个位置返回状态是不是有问题? - c.JSON(http.StatusOK, res) + c.JSON(http.StatusBadRequest, res) return } logger.WithFields(logrus.Fields{ @@ -92,7 +94,7 @@ func ListMessageHandler(c *gin.Context) { logger.WithFields(logrus.Fields{ "actor_id": req.ActorId, "user_id": req.UserId, - }).Warnf("Error when trying to connect with ListMessageHandler") + }).Error("Error when trying to connect with ListMessageHandler") c.JSON(http.StatusOK, res) return } diff --git a/src/web/models/Message.go b/src/web/models/Message.go index 9a27c50..e49f6e9 100644 --- a/src/web/models/Message.go +++ b/src/web/models/Message.go @@ -6,9 +6,8 @@ import ( // 这个是发数据的数据结构 type SMessageReq struct { - Token string `form:"token" binding:"required"` ActorId int `form:"actor_id"` - User_id int `form:"user_id"` + UserId int `form:"user_id"` Content string `form:"content"` Action_type int `form:"action_type"` // send message //Create_time string //time maybe have some question @@ -18,15 +17,13 @@ type SMessageReq struct { // statuc code 状态码 0- 成功 其他值 -失败 // status_msg 返回状态描述 type SMessageRes struct { - Status_code int `json:"status_code"` - Status_msg string `json:"status_msg"` + StatusCode int `json:"status_code"` + Status_msg string `json:"status_msg"` } type ListMessageReq struct { - Token string `form:"token" binding:"required"` - ActorId uint32 `form:"actor_id"` - UserId uint32 `from:"user_id"` - + ActorId uint32 `form:"actor_id"` + UserId uint32 `from:"user_id"` PreMsgTime uint32 `from:"preMsgTime"` } diff --git a/test/rpc/messagerpc_test.go b/test/rpc/messagerpc_test.go index f6b8280..68c98b8 100644 --- a/test/rpc/messagerpc_test.go +++ b/test/rpc/messagerpc_test.go @@ -24,10 +24,10 @@ func setups() { func TestActionMessage_Add(t *testing.T) { setups() res, err := chatClient.ChatAction(context.Background(), &chat.ActionRequest{ - ActorId: 2, - UserId: 2, + ActorId: 3, + UserId: 1, ActionType: 1, - Content: "Test message13241234", + Content: "Test message1", }) assert.Empty(t, err) @@ -39,11 +39,11 @@ func TestChat(t *testing.T) { setups() res, err := chatClient.Chat(context.Background(), &chat.ChatRequest{ ActorId: 1, - UserId: 1, + UserId: 3, PreMsgTime: 0, }) assert.Empty(t, err) assert.Equal(t, int32(0), res.StatusCode) - + assert.Equal(t, 2, len(res.MessageList)) } diff --git a/test/web/message_test.go b/test/web/message_test.go index f682d46..16629c4 100644 --- a/test/web/message_test.go +++ b/test/web/message_test.go @@ -19,7 +19,7 @@ func TestActionMessage_Add(t *testing.T) { method := "POST" req, err := http.NewRequest(method, url, nil) q := req.URL.Query() - q.Add("token", "2f54104f-96d1-4ba3-94af-ee2e6b72dc5f") + q.Add("token", "1ae83f2a-7b82-4901-9e66-50d49dba00d5") q.Add("actor_id", "1") q.Add("user_id", "1") q.Add("action_type", "1") @@ -51,7 +51,7 @@ func TestChat(t *testing.T) { req, err := http.NewRequest(method, url, nil) q := req.URL.Query() - q.Add("token", "1206dfe1-5f2a-44fa-a121-ef27b0fe5f8d") + q.Add("token", "1ae83f2a-7b82-4901-9e66-50d49dba00d5") q.Add("actor_id", "1") q.Add("user_id", "1") q.Add("perMsgTime", "0") From 7c6187a1299440b2933a9edfb44cfa043e354924 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Mon, 21 Aug 2023 02:22:36 -0700 Subject: [PATCH 7/7] env and log --- src/constant/config/.env.example | 2 +- src/services/message/handler.go | 10 +++++----- src/web/message/handler.go | 18 +++++++++--------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/constant/config/.env.example b/src/constant/config/.env.example index 5549ff1..12fad85 100644 --- a/src/constant/config/.env.example +++ b/src/constant/config/.env.example @@ -1,7 +1,7 @@ # Configure Consul address, the default address is `localhost:8500` # TIPS: If you provide `CONSUL_ANONYMITY_NAME`, all services will register with `CONSUL_ANONYMITY_NAME` as prefix CONSUL_ADDR=localhost:8500 -CONSUL_ANONYMITY_NAME= +CONSUL_ANONYMITY_NAME=paraparty. # Configure logger level, support: DEBUG, INFO, WARN (WARNING), ERROR, FATAL LOGGER_LEVEL=INFO # Cofigure logger integrated with otel, support: enable, disable diff --git a/src/services/message/handler.go b/src/services/message/handler.go index e892f93..2da92fb 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -34,7 +34,7 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action logger := logging.LogService("ChatService.ActionMessage").WithContext(ctx) logger.WithFields(logrus.Fields{ - "actor_id": request.ActorId, + "ActorId": request.ActorId, "user_id": request.UserId, "action_type": request.ActionType, "content_text": request.Content, @@ -61,8 +61,8 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action res, err = addMessage(ctx, logger, span, request.ActorId, request.UserId, request.Content) if err != nil { logger.WithFields(logrus.Fields{ - "err": err, - "Actor_id": request.ActorId, + "err": err, + "ActorId": request.ActorId, }).Errorf("User service error") logging.SetSpanError(span, err) return res, err @@ -82,7 +82,7 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) logger := logging.LogService("ChatService.chat").WithContext(ctx) logger.WithFields(logrus.Fields{ "user_id": request.UserId, - "from_id": request.ActorId, + "ActorId": request.ActorId, }).Debugf("Process start") toUserId := request.UserId fromUserId := request.ActorId @@ -146,7 +146,7 @@ func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, from logger.WithFields(logrus.Fields{ "err": result.Error, "id": message.ID, - "from_id": message.FromUserId, + "ActorId": message.FromUserId, "to_id": message.ToUserId, }).Errorf("send message failed when insert to database") diff --git a/src/web/message/handler.go b/src/web/message/handler.go index 17dcffc..a3eaabf 100644 --- a/src/web/message/handler.go +++ b/src/web/message/handler.go @@ -31,7 +31,7 @@ func ActionMessageHandler(c *gin.Context) { if err := c.ShouldBindQuery(&req); err != nil { logger.WithFields(logrus.Fields{ //"CreateTime": req.Create_time, - "user_id": req.ActorId, + "ActorId": req.ActorId, "from_id": req.UserId, "err": err, }).Errorf("Error when trying to bind query") @@ -55,16 +55,16 @@ func ActionMessageHandler(c *gin.Context) { if err != nil { logger.WithFields(logrus.Fields{ - "actor_id": req.ActorId, - "content": req.Content, + "ActorId": req.ActorId, + "content": req.Content, }).Error("Error when trying to connect with ActionMessageHandler") c.JSON(http.StatusBadRequest, res) return } logger.WithFields(logrus.Fields{ - "actor_id": req.ActorId, - "content": req.Content, + "ActorId": req.ActorId, + "content": req.Content, }).Infof("Action send message success") c.JSON(http.StatusOK, res) @@ -92,16 +92,16 @@ func ListMessageHandler(c *gin.Context) { if err != nil { logger.WithFields(logrus.Fields{ - "actor_id": req.ActorId, - "user_id": req.UserId, + "ActorId": req.ActorId, + "user_id": req.UserId, }).Error("Error when trying to connect with ListMessageHandler") c.JSON(http.StatusOK, res) return } logger.WithFields(logrus.Fields{ - "actor_id": req.ActorId, - "user_id": req.UserId, + "ActorId": req.ActorId, + "user_id": req.UserId, }).Infof("List comment success") c.JSON(http.StatusOK, res)