diff --git a/.github/workflows/qodana.yml b/.github/workflows/qodana.yml index acb25be..41b3945 100644 --- a/.github/workflows/qodana.yml +++ b/.github/workflows/qodana.yml @@ -14,5 +14,13 @@ jobs: fetch-depth: 0 - name: 'Qodana Scan' uses: JetBrains/qodana-action@v2023.2 + with: + pr-mode: false + args: --apply-fixes + push-fixes: pull-request + upload-result: true env: - QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }} \ No newline at end of file + QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }} + - uses: github/codeql-action/upload-sarif@v2 + with: + sarif_file: ${{ runner.temp }}/qodana/results/qodana.sarif.json diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 0180fc0..ea178b1 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -21,4 +21,7 @@ const PublishRpcServerPort = ":37005" const FavoriteRpcServerName = "GuGoTik-FavoriteService" const FavoriteRpcServerPort = ":37006" +const MessageRpcServerName = "GuGoTik-MessageService" +const MessageRpcServerPort = ":37007" + const VideoPicker = "GuGoTik-VideoPicker" diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index 443dfc3..a96df19 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -26,6 +26,11 @@ const ( ActorIDNotMatchError = "用户不匹配" UnableToDeleteCommentErrorCode = 50008 UnableToDeleteCommentError = "无法删除视频评论" + + UnableToAddMessageErrorCode = 50009 + UnableToAddMessageRrror = "发送消息出错" + UnableToQueryMessageErrorCode = 50010 + UnableToQueryMessageError = "查消息出错" ) // Expected Error diff --git a/src/models/message.go b/src/models/message.go new file mode 100644 index 0000000..2add457 --- /dev/null +++ b/src/models/message.go @@ -0,0 +1,25 @@ +package models + +import ( + "GuGoTik/src/storage/database" + + "gorm.io/gorm" +) + +type Message struct { + ID uint32 `gorm:"not null;primarykey;autoIncrement"` + ToUserId uint32 `gorm:"not null" ` + FromUserId uint32 `gorm:"not null"` + ConversationId string `gorm:"not null" index:"conversationid"` + Content string `gorm:"not null"` + + // Create_time time.Time `gorm:"not null"` + //Updatetime deleteTime + gorm.Model +} + +func init() { + if err := database.Client.AutoMigrate(&Message{}); err != nil { + panic(err) + } +} diff --git a/src/services/comment/handler.go b/src/services/comment/handler.go index 342c67f..4c7461e 100644 --- a/src/services/comment/handler.go +++ b/src/services/comment/handler.go @@ -190,6 +190,7 @@ func (c CommentServiceImpl) ListComment(ctx context.Context, request *comment.Li userMap[pComment.UserId] = &user.User{} } getUserInfoError := false + wg := sync.WaitGroup{} wg.Add(len(userMap)) for userId := range userMap { diff --git a/src/services/message/handler.go b/src/services/message/handler.go new file mode 100644 index 0000000..2da92fb --- /dev/null +++ b/src/services/message/handler.go @@ -0,0 +1,166 @@ +package main + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/models" + "GuGoTik/src/rpc/chat" + "GuGoTik/src/rpc/user" + "GuGoTik/src/storage/database" + grpc2 "GuGoTik/src/utils/grpc" + "GuGoTik/src/utils/logging" + "context" + "fmt" + + "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/trace" +) + +var UserClient user.UserServiceClient + +type MessageServiceImpl struct { + chat.ChatServiceServer +} + +func init() { + userRpcConn := grpc2.Connect(config.UserRpcServerName) + UserClient = user.NewUserServiceClient(userRpcConn) +} + +func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.ActionRequest) (res *chat.ActionResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "ChatActionService") + defer span.End() + logger := logging.LogService("ChatService.ActionMessage").WithContext(ctx) + + logger.WithFields(logrus.Fields{ + "ActorId": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "content_text": request.Content, + }).Debugf("Process start") + + userResponse, err := UserClient.GetUserInfo(ctx, &user.UserRequest{ + ActorId: request.ActorId, + UserId: request.UserId, + }) + + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "cctor_id": request.ActorId, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + return &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageRrror, + }, err + } + + res, err = addMessage(ctx, logger, span, request.ActorId, request.UserId, request.Content) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + }).Errorf("User service error") + logging.SetSpanError(span, err) + return res, err + } + + logger.WithFields(logrus.Fields{ + "response": res, + }).Debugf("Process done.") + + return res, err +} + +// Chat(context.Context, *ChatRequest) (*ChatResponse, error) +func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) (resp *chat.ChatResponse, err error) { + ctx, span := tracing.Tracer.Start(ctx, "ChatService") + defer span.End() + logger := logging.LogService("ChatService.chat").WithContext(ctx) + logger.WithFields(logrus.Fields{ + "user_id": request.UserId, + "ActorId": request.ActorId, + }).Debugf("Process start") + toUserId := request.UserId + fromUserId := request.ActorId + + conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId) + + if toUserId > fromUserId { + conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId) + } + //这个地方应该取出多少条消息? + //TO DO 看怎么需要一下 + + var rMessageList []*chat.Message + result := database.Client.WithContext(ctx).Where("conversation_id=?", conversationId). + Order("created_at desc").Find(&rMessageList) + + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "err": result.Error, + }).Errorf("ChatServiceImpl list chat failed to response when listing message") + logging.SetSpanError(span, err) + + resp = &chat.ChatResponse{ + StatusCode: strings.UnableToQueryMessageErrorCode, + StatusMsg: strings.UnableToQueryMessageError, + } + return + } + + resp = &chat.ChatResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + MessageList: rMessageList, + } + + logger.WithFields(logrus.Fields{ + "response": resp, + }).Debugf("Process done.") + + return +} + +func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, fromUserId uint32, toUserId uint32, Context string) (resp *chat.ActionResponse, err error) { + conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId) + + if toUserId > fromUserId { + conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId) + } + message := models.Message{ + ToUserId: toUserId, + FromUserId: fromUserId, + Content: Context, + ConversationId: conversationId, + } + + //TO_DO 后面写mq? + result := database.Client.WithContext(ctx).Create(&message) + + if result.Error != nil { + //TO_DO 错误 替换 + logger.WithFields(logrus.Fields{ + "err": result.Error, + "id": message.ID, + "ActorId": message.FromUserId, + "to_id": message.ToUserId, + }).Errorf("send message failed when insert to database") + + resp = &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageRrror, + } + return + } + + resp = &chat.ActionResponse{ + StatusCode: strings.ServiceOKCode, + StatusMsg: strings.ServiceOK, + } + return + +} diff --git a/src/services/message/main.go b/src/services/message/main.go new file mode 100644 index 0000000..df06c4d --- /dev/null +++ b/src/services/message/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/extra/profiling" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/rpc/chat" + "GuGoTik/src/rpc/health" + healthImpl "GuGoTik/src/services/health" + "GuGoTik/src/utils/consul" + "GuGoTik/src/utils/logging" + "context" + "net" + + "github.com/sirupsen/logrus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" +) + +func main() { + tp, err := tracing.SetTraceProvider(config.MessageRpcServerName) + + if err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Panicf("Error to set the trace") + } + defer func() { + if err := tp.Shutdown(context.Background()); err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error to set the trace") + } + }() + + // Configure Pyroscope + profiling.InitPyroscope("GuGoTik.ChatService") + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + ) + + log := logging.LogService(config.MessageRpcServerName) + + lis, err := net.Listen("tcp", config.MessageRpcServerPort) + + if err != nil { + log.Panicf("Rpc %s listen happens error: %v", config.MessageRpcServerName, err) + } + + var srv MessageServiceImpl + var probe healthImpl.ProbeImpl + + chat.RegisterChatServiceServer(s, srv) + + health.RegisterHealthServer(s, &probe) + + if err := consul.RegisterConsul(config.MessageRpcServerName, config.MessageRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.MessageRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort) + if err := s.Serve(lis); err != nil { + log.Panicf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err) + } +} diff --git a/src/services/user/handler.go b/src/services/user/handler.go index 20d18a6..afcba97 100644 --- a/src/services/user/handler.go +++ b/src/services/user/handler.go @@ -8,6 +8,7 @@ import ( "GuGoTik/src/storage/cached" "GuGoTik/src/utils/logging" "context" + "github.com/sirupsen/logrus" ) @@ -25,6 +26,7 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ ok, err := cached.ScanGet(ctx, "UserInfo", &userModel) if err != nil { + resp = &user.UserResponse{ StatusCode: strings.AuthServiceInnerErrorCode, StatusMsg: strings.AuthServiceInnerError, diff --git a/src/web/main.go b/src/web/main.go index 4849da7..10cfb7e 100644 --- a/src/web/main.go +++ b/src/web/main.go @@ -9,14 +9,17 @@ import ( "GuGoTik/src/web/auth" comment2 "GuGoTik/src/web/comment" feed2 "GuGoTik/src/web/feed" + message2 "GuGoTik/src/web/message" "GuGoTik/src/web/middleware" "context" + + "time" + "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ginprometheus "github.com/zsais/go-gin-prometheus" "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" - "time" ) func main() { @@ -70,6 +73,13 @@ func main() { comment.GET("/list", comment2.ListCommentHandler) comment.GET("/count", comment2.CountCommentHandler) } + //todo + message := rootPath.Group("/message") + { + message.GET("/chat", message2.ListMessageHandler) + message.POST("/action", message2.ActionMessageHandler) + } + // Run Server if err := g.Run(config.WebServiceAddr); err != nil { panic("Can not run GuGoTik Gateway, binding port: " + config.WebServiceAddr) diff --git a/src/web/message/handler.go b/src/web/message/handler.go new file mode 100644 index 0000000..a3eaabf --- /dev/null +++ b/src/web/message/handler.go @@ -0,0 +1,108 @@ +package message + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" + "GuGoTik/src/rpc/chat" + grpc2 "GuGoTik/src/utils/grpc" + "GuGoTik/src/utils/logging" + "GuGoTik/src/web/models" + + "net/http" + + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" +) + +var Client chat.ChatServiceClient + +func init() { + conn := grpc2.Connect(config.MessageRpcServerName) + Client = chat.NewChatServiceClient(conn) +} + +func ActionMessageHandler(c *gin.Context) { + var req models.SMessageReq + _, span := tracing.Tracer.Start(c.Request.Context(), "ActionMessageHandler") + defer span.End() + logger := logging.LogService("GateWay.ActionChat").WithContext(c.Request.Context()) + + if err := c.ShouldBindQuery(&req); err != nil { + logger.WithFields(logrus.Fields{ + //"CreateTime": req.Create_time, + "ActorId": req.ActorId, + "from_id": req.UserId, + "err": err, + }).Errorf("Error when trying to bind query") + + c.JSON(http.StatusOK, models.ActionCommentRes{ + StatusCode: strings.GateWayParamsErrorCode, + StatusMsg: strings.GateWayParamsError, + }) + return + } + + var res *chat.ActionResponse + var err error + + res, err = Client.ChatAction(c.Request.Context(), &chat.ActionRequest{ + ActorId: uint32(req.ActorId), + UserId: uint32(req.UserId), + ActionType: uint32(req.Action_type), + Content: req.Content, + }) + + if err != nil { + logger.WithFields(logrus.Fields{ + "ActorId": req.ActorId, + "content": req.Content, + }).Error("Error when trying to connect with ActionMessageHandler") + + c.JSON(http.StatusBadRequest, res) + return + } + logger.WithFields(logrus.Fields{ + "ActorId": req.ActorId, + "content": req.Content, + }).Infof("Action send message success") + + c.JSON(http.StatusOK, res) +} + +func ListMessageHandler(c *gin.Context) { + var req models.ListMessageReq + _, span := tracing.Tracer.Start(c.Request.Context(), "ListMessageHandler") + defer span.End() + logger := logging.LogService("GateWay.ListMessage").WithContext(c.Request.Context()) + + if err := c.ShouldBindQuery(&req); err != nil { + c.JSON(http.StatusOK, models.ListCommentRes{ + StatusCode: strings.GateWayParamsErrorCode, + StatusMsg: strings.GateWayParamsError, + }) + return + } + + res, err := Client.Chat(c.Request.Context(), &chat.ChatRequest{ + ActorId: req.ActorId, + UserId: req.UserId, + PreMsgTime: req.PreMsgTime, + }) + + if err != nil { + logger.WithFields(logrus.Fields{ + "ActorId": req.ActorId, + "user_id": req.UserId, + }).Error("Error when trying to connect with ListMessageHandler") + c.JSON(http.StatusOK, res) + return + } + + logger.WithFields(logrus.Fields{ + "ActorId": req.ActorId, + "user_id": req.UserId, + }).Infof("List comment success") + + c.JSON(http.StatusOK, res) +} diff --git a/src/web/models/Message.go b/src/web/models/Message.go new file mode 100644 index 0000000..e49f6e9 --- /dev/null +++ b/src/web/models/Message.go @@ -0,0 +1,34 @@ +package models + +import ( + "GuGoTik/src/rpc/chat" +) + +// 这个是发数据的数据结构 +type SMessageReq struct { + ActorId int `form:"actor_id"` + UserId int `form:"user_id"` + Content string `form:"content"` + Action_type int `form:"action_type"` // send message + //Create_time string //time maybe have some question +} + +// 收的状态 +// statuc code 状态码 0- 成功 其他值 -失败 +// status_msg 返回状态描述 +type SMessageRes struct { + StatusCode int `json:"status_code"` + Status_msg string `json:"status_msg"` +} + +type ListMessageReq struct { + ActorId uint32 `form:"actor_id"` + UserId uint32 `from:"user_id"` + PreMsgTime uint32 `from:"preMsgTime"` +} + +type ListMessageRes struct { + StatusCode int `json:"status_code"` + StatusMsg string `json:"status_msg"` + MessageList []*chat.Message `json:"message_list"` +} diff --git a/test/rpc/messagerpc_test.go b/test/rpc/messagerpc_test.go new file mode 100644 index 0000000..68c98b8 --- /dev/null +++ b/test/rpc/messagerpc_test.go @@ -0,0 +1,49 @@ +package rpc + +import ( + "GuGoTik/src/constant/config" + "GuGoTik/src/rpc/chat" + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var chatClient chat.ChatServiceClient + +func setups() { + conn, _ := grpc.Dial(fmt.Sprintf("127.0.0.1%s", config.MessageRpcServerPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)) + chatClient = chat.NewChatServiceClient(conn) +} + +func TestActionMessage_Add(t *testing.T) { + setups() + res, err := chatClient.ChatAction(context.Background(), &chat.ActionRequest{ + ActorId: 3, + UserId: 1, + ActionType: 1, + Content: "Test message1", + }) + + assert.Empty(t, err) + assert.Equal(t, int32(0), res.StatusCode) + +} + +func TestChat(t *testing.T) { + setups() + res, err := chatClient.Chat(context.Background(), &chat.ChatRequest{ + ActorId: 1, + UserId: 3, + PreMsgTime: 0, + }) + + assert.Empty(t, err) + assert.Equal(t, int32(0), res.StatusCode) + assert.Equal(t, 2, len(res.MessageList)) +} diff --git a/test/web/message_test.go b/test/web/message_test.go new file mode 100644 index 0000000..16629c4 --- /dev/null +++ b/test/web/message_test.go @@ -0,0 +1,75 @@ +package web + +import ( + "GuGoTik/src/web/models" + "encoding/json" + "fmt" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestActionMessage_Add(t *testing.T) { + + var client = &http.Client{} + var baseUrl = "http://127.0.0.1:37000/douyin/message" + url := baseUrl + "/action" + method := "POST" + req, err := http.NewRequest(method, url, nil) + q := req.URL.Query() + q.Add("token", "1ae83f2a-7b82-4901-9e66-50d49dba00d5") + q.Add("actor_id", "1") + q.Add("user_id", "1") + q.Add("action_type", "1") + q.Add("content", "test comment") + req.URL.RawQuery = q.Encode() + + assert.Empty(t, err) + + res, err := client.Do(req) + assert.Empty(t, err) + defer func(Body io.ReadCloser) { + err := Body.Close() + assert.Empty(t, err) + }(res.Body) + + body, err := io.ReadAll(res.Body) + assert.Empty(t, err) + message := &models.ListMessageRes{} + err = json.Unmarshal(body, &message) + assert.Empty(t, err) + assert.Equal(t, 0, message.StatusCode) +} + +func TestChat(t *testing.T) { + var client = &http.Client{} + var baseUrl = "http://127.0.0.1:37000/douyin/message" + url := baseUrl + "/chat" + method := "GET" + req, err := http.NewRequest(method, url, nil) + + q := req.URL.Query() + q.Add("token", "1ae83f2a-7b82-4901-9e66-50d49dba00d5") + q.Add("actor_id", "1") + q.Add("user_id", "1") + q.Add("perMsgTime", "0") + req.URL.RawQuery = q.Encode() + assert.Empty(t, err) + + res, err := client.Do(req) + assert.Empty(t, err) + defer func(Body io.ReadCloser) { + err := Body.Close() + assert.Empty(t, err) + }(res.Body) + + body, err := io.ReadAll(res.Body) + assert.Empty(t, err) + listMessage := &models.ListMessageRes{} + fmt.Println(listMessage) + err = json.Unmarshal(body, &listMessage) + assert.Empty(t, err) + assert.Equal(t, 0, listMessage.StatusCode) +}