Skip to content

Commit

Permalink
Merge branch 'GuGoOrg:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Maple-pro authored Aug 21, 2023
2 parents e55bc37 + c1c8e76 commit 1eeaf58
Show file tree
Hide file tree
Showing 13 changed files with 553 additions and 2 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/qodana.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,13 @@ jobs:
fetch-depth: 0
- name: 'Qodana Scan'
uses: JetBrains/[email protected]
with:
pr-mode: false
args: --apply-fixes
push-fixes: pull-request
upload-result: true
env:
QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }}
QODANA_TOKEN: ${{ secrets.QODANA_TOKEN }}
- uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: ${{ runner.temp }}/qodana/results/qodana.sarif.json
3 changes: 3 additions & 0 deletions src/constant/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ const PublishRpcServerPort = ":37005"
const FavoriteRpcServerName = "GuGoTik-FavoriteService"
const FavoriteRpcServerPort = ":37006"

const MessageRpcServerName = "GuGoTik-MessageService"
const MessageRpcServerPort = ":37007"

const VideoPicker = "GuGoTik-VideoPicker"
5 changes: 5 additions & 0 deletions src/constant/strings/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ const (
ActorIDNotMatchError = "用户不匹配"
UnableToDeleteCommentErrorCode = 50008
UnableToDeleteCommentError = "无法删除视频评论"

UnableToAddMessageErrorCode = 50009
UnableToAddMessageRrror = "发送消息出错"
UnableToQueryMessageErrorCode = 50010
UnableToQueryMessageError = "查消息出错"
)

// Expected Error
Expand Down
25 changes: 25 additions & 0 deletions src/models/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package models

import (
"GuGoTik/src/storage/database"

"gorm.io/gorm"
)

type Message struct {
ID uint32 `gorm:"not null;primarykey;autoIncrement"`
ToUserId uint32 `gorm:"not null" `
FromUserId uint32 `gorm:"not null"`
ConversationId string `gorm:"not null" index:"conversationid"`
Content string `gorm:"not null"`

// Create_time time.Time `gorm:"not null"`
//Updatetime deleteTime
gorm.Model
}

func init() {
if err := database.Client.AutoMigrate(&Message{}); err != nil {
panic(err)
}
}
1 change: 1 addition & 0 deletions src/services/comment/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (c CommentServiceImpl) ListComment(ctx context.Context, request *comment.Li
userMap[pComment.UserId] = &user.User{}
}
getUserInfoError := false

wg := sync.WaitGroup{}
wg.Add(len(userMap))
for userId := range userMap {
Expand Down
166 changes: 166 additions & 0 deletions src/services/message/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main

import (
"GuGoTik/src/constant/config"
"GuGoTik/src/constant/strings"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/rpc/chat"
"GuGoTik/src/rpc/user"
"GuGoTik/src/storage/database"
grpc2 "GuGoTik/src/utils/grpc"
"GuGoTik/src/utils/logging"
"context"
"fmt"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)

var UserClient user.UserServiceClient

type MessageServiceImpl struct {
chat.ChatServiceServer
}

func init() {
userRpcConn := grpc2.Connect(config.UserRpcServerName)
UserClient = user.NewUserServiceClient(userRpcConn)
}

func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.ActionRequest) (res *chat.ActionResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "ChatActionService")
defer span.End()
logger := logging.LogService("ChatService.ActionMessage").WithContext(ctx)

logger.WithFields(logrus.Fields{
"ActorId": request.ActorId,
"user_id": request.UserId,
"action_type": request.ActionType,
"content_text": request.Content,
}).Debugf("Process start")

userResponse, err := UserClient.GetUserInfo(ctx, &user.UserRequest{
ActorId: request.ActorId,
UserId: request.UserId,
})

if err != nil || userResponse.StatusCode != strings.ServiceOKCode {
logger.WithFields(logrus.Fields{
"err": err,
"cctor_id": request.ActorId,
}).Errorf("User service error")
logging.SetSpanError(span, err)

return &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageRrror,
}, err
}

res, err = addMessage(ctx, logger, span, request.ActorId, request.UserId, request.Content)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
"ActorId": request.ActorId,
}).Errorf("User service error")
logging.SetSpanError(span, err)
return res, err
}

logger.WithFields(logrus.Fields{
"response": res,
}).Debugf("Process done.")

return res, err
}

// Chat(context.Context, *ChatRequest) (*ChatResponse, error)
func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) (resp *chat.ChatResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "ChatService")
defer span.End()
logger := logging.LogService("ChatService.chat").WithContext(ctx)
logger.WithFields(logrus.Fields{
"user_id": request.UserId,
"ActorId": request.ActorId,
}).Debugf("Process start")
toUserId := request.UserId
fromUserId := request.ActorId

conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId)

if toUserId > fromUserId {
conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId)
}
//这个地方应该取出多少条消息?
//TO DO 看怎么需要一下

var rMessageList []*chat.Message
result := database.Client.WithContext(ctx).Where("conversation_id=?", conversationId).
Order("created_at desc").Find(&rMessageList)

if result.Error != nil {
logger.WithFields(logrus.Fields{
"err": result.Error,
}).Errorf("ChatServiceImpl list chat failed to response when listing message")
logging.SetSpanError(span, err)

resp = &chat.ChatResponse{
StatusCode: strings.UnableToQueryMessageErrorCode,
StatusMsg: strings.UnableToQueryMessageError,
}
return
}

resp = &chat.ChatResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
MessageList: rMessageList,
}

logger.WithFields(logrus.Fields{
"response": resp,
}).Debugf("Process done.")

return
}

func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, fromUserId uint32, toUserId uint32, Context string) (resp *chat.ActionResponse, err error) {
conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId)

if toUserId > fromUserId {
conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId)
}
message := models.Message{
ToUserId: toUserId,
FromUserId: fromUserId,
Content: Context,
ConversationId: conversationId,
}

//TO_DO 后面写mq?
result := database.Client.WithContext(ctx).Create(&message)

if result.Error != nil {
//TO_DO 错误 替换
logger.WithFields(logrus.Fields{
"err": result.Error,
"id": message.ID,
"ActorId": message.FromUserId,
"to_id": message.ToUserId,
}).Errorf("send message failed when insert to database")

resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageRrror,
}
return
}

resp = &chat.ActionResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
}
return

}
65 changes: 65 additions & 0 deletions src/services/message/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"GuGoTik/src/constant/config"
"GuGoTik/src/extra/profiling"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/rpc/chat"
"GuGoTik/src/rpc/health"
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"context"
"net"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)

func main() {
tp, err := tracing.SetTraceProvider(config.MessageRpcServerName)

if err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Panicf("Error to set the trace")
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error to set the trace")
}
}()

// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.ChatService")

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.MessageRpcServerName)

lis, err := net.Listen("tcp", config.MessageRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.MessageRpcServerName, err)
}

var srv MessageServiceImpl
var probe healthImpl.ProbeImpl

chat.RegisterChatServiceServer(s, srv)

health.RegisterHealthServer(s, &probe)

if err := consul.RegisterConsul(config.MessageRpcServerName, config.MessageRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.MessageRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err)
}
}
2 changes: 2 additions & 0 deletions src/services/user/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"GuGoTik/src/storage/cached"
"GuGoTik/src/utils/logging"
"context"

"github.com/sirupsen/logrus"
)

Expand All @@ -25,6 +26,7 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ
ok, err := cached.ScanGet(ctx, "UserInfo", &userModel)

if err != nil {

resp = &user.UserResponse{
StatusCode: strings.AuthServiceInnerErrorCode,
StatusMsg: strings.AuthServiceInnerError,
Expand Down
12 changes: 11 additions & 1 deletion src/web/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"GuGoTik/src/web/auth"
comment2 "GuGoTik/src/web/comment"
feed2 "GuGoTik/src/web/feed"
message2 "GuGoTik/src/web/message"
"GuGoTik/src/web/middleware"
"context"

"time"

"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
ginprometheus "github.com/zsais/go-gin-prometheus"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"time"
)

func main() {
Expand Down Expand Up @@ -70,6 +73,13 @@ func main() {
comment.GET("/list", comment2.ListCommentHandler)
comment.GET("/count", comment2.CountCommentHandler)
}
//todo
message := rootPath.Group("/message")
{
message.GET("/chat", message2.ListMessageHandler)
message.POST("/action", message2.ActionMessageHandler)
}

// Run Server
if err := g.Run(config.WebServiceAddr); err != nil {
panic("Can not run GuGoTik Gateway, binding port: " + config.WebServiceAddr)
Expand Down
Loading

0 comments on commit 1eeaf58

Please sign in to comment.