Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev 还没加上mq和chatgpt 以及es #95

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/constant/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ const PublishRpcServerPort = ":37005"
const FavoriteRpcServerName = "GuGoTik-FavoriteService"
const FavoriteRpcServerPort = ":37006"

const MessageRpcServerName = "GuGoTik-MessageService"
const MessageRpcServerPort = ":37007"

const VideoPicker = "GuGoTik-VideoPicker"
5 changes: 5 additions & 0 deletions src/constant/strings/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ const (
ActorIDNotMatchError = "用户不匹配"
UnableToDeleteCommentErrorCode = 50008
UnableToDeleteCommentError = "无法删除视频评论"

UnableToAddMessageErrorCode = 50009
UnableToAddMessageRrror = "发送消息出错"
UnableToQueryMessageErrorCode = 50010
UnableToQueryMessageError = "查消息出错"
)

// Expected Error
Expand Down
25 changes: 25 additions & 0 deletions src/models/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package models

import (
"GuGoTik/src/storage/database"

"gorm.io/gorm"
)

type Message struct {
ID uint32 `gorm:"not null;primarykey;autoIncrement"`
ToUserId uint32 `gorm:"not null" `
FromUserId uint32 `gorm:"not null"`
ConversationId string `gorm:"not null" index:"conversationid"`
Content string `gorm:"not null"`

// Create_time time.Time `gorm:"not null"`
//Updatetime deleteTime
gorm.Model
}

func init() {
if err := database.Client.AutoMigrate(&Message{}); err != nil {
panic(err)
}
}
1 change: 1 addition & 0 deletions src/services/comment/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (c CommentServiceImpl) ListComment(ctx context.Context, request *comment.Li
userMap[pComment.UserId] = &user.User{}
}
getUserInfoError := false

wg := sync.WaitGroup{}
wg.Add(len(userMap))
for userId := range userMap {
Expand Down
166 changes: 166 additions & 0 deletions src/services/message/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main

import (
"GuGoTik/src/constant/config"
"GuGoTik/src/constant/strings"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/rpc/chat"
"GuGoTik/src/rpc/user"
"GuGoTik/src/storage/database"
grpc2 "GuGoTik/src/utils/grpc"
"GuGoTik/src/utils/logging"
"context"
"fmt"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)

var UserClient user.UserServiceClient

type MessageServiceImpl struct {
chat.ChatServiceServer
}

func init() {
userRpcConn := grpc2.Connect(config.UserRpcServerName)
UserClient = user.NewUserServiceClient(userRpcConn)
}

func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.ActionRequest) (res *chat.ActionResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "ChatActionService")
defer span.End()
logger := logging.LogService("ChatService.ActionMessage").WithContext(ctx)

logger.WithFields(logrus.Fields{
"ActorId": request.ActorId,
"user_id": request.UserId,
"action_type": request.ActionType,
"content_text": request.Content,
}).Debugf("Process start")

userResponse, err := UserClient.GetUserInfo(ctx, &user.UserRequest{
ActorId: request.ActorId,
UserId: request.UserId,
})

if err != nil || userResponse.StatusCode != strings.ServiceOKCode {
logger.WithFields(logrus.Fields{
"err": err,
"cctor_id": request.ActorId,
}).Errorf("User service error")
logging.SetSpanError(span, err)

return &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageRrror,
}, err
}

res, err = addMessage(ctx, logger, span, request.ActorId, request.UserId, request.Content)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
"ActorId": request.ActorId,
}).Errorf("User service error")
logging.SetSpanError(span, err)
return res, err
}

logger.WithFields(logrus.Fields{
"response": res,
}).Debugf("Process done.")

return res, err
}

// Chat(context.Context, *ChatRequest) (*ChatResponse, error)
func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) (resp *chat.ChatResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "ChatService")
defer span.End()
logger := logging.LogService("ChatService.chat").WithContext(ctx)
logger.WithFields(logrus.Fields{
"user_id": request.UserId,
"ActorId": request.ActorId,
}).Debugf("Process start")
toUserId := request.UserId
fromUserId := request.ActorId

conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId)

if toUserId > fromUserId {
conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId)
}
//这个地方应该取出多少条消息?
//TO DO 看怎么需要一下

var rMessageList []*chat.Message
result := database.Client.WithContext(ctx).Where("conversation_id=?", conversationId).
Order("created_at desc").Find(&rMessageList)

if result.Error != nil {
logger.WithFields(logrus.Fields{
"err": result.Error,
}).Errorf("ChatServiceImpl list chat failed to response when listing message")
logging.SetSpanError(span, err)

resp = &chat.ChatResponse{
StatusCode: strings.UnableToQueryMessageErrorCode,
StatusMsg: strings.UnableToQueryMessageError,
}
return
}

resp = &chat.ChatResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
MessageList: rMessageList,
}

logger.WithFields(logrus.Fields{
"response": resp,
}).Debugf("Process done.")

return
}

func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, fromUserId uint32, toUserId uint32, Context string) (resp *chat.ActionResponse, err error) {
conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId)

if toUserId > fromUserId {
conversationId = fmt.Sprintf("%d_%d", fromUserId, toUserId)
}
message := models.Message{
ToUserId: toUserId,
FromUserId: fromUserId,
Content: Context,
ConversationId: conversationId,
}

//TO_DO 后面写mq?
result := database.Client.WithContext(ctx).Create(&message)

if result.Error != nil {
//TO_DO 错误 替换
logger.WithFields(logrus.Fields{
"err": result.Error,
"id": message.ID,
"ActorId": message.FromUserId,
"to_id": message.ToUserId,
}).Errorf("send message failed when insert to database")

resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageRrror,
}
return
}

resp = &chat.ActionResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
}
return

}
65 changes: 65 additions & 0 deletions src/services/message/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"GuGoTik/src/constant/config"
"GuGoTik/src/extra/profiling"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/rpc/chat"
"GuGoTik/src/rpc/health"
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"context"
"net"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)

func main() {
tp, err := tracing.SetTraceProvider(config.MessageRpcServerName)

if err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Panicf("Error to set the trace")
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error to set the trace")
}
}()

// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.ChatService")

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.MessageRpcServerName)

lis, err := net.Listen("tcp", config.MessageRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.MessageRpcServerName, err)
}

var srv MessageServiceImpl
var probe healthImpl.ProbeImpl

chat.RegisterChatServiceServer(s, srv)

health.RegisterHealthServer(s, &probe)

if err := consul.RegisterConsul(config.MessageRpcServerName, config.MessageRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.MessageRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err)
}
}
2 changes: 2 additions & 0 deletions src/services/user/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"GuGoTik/src/storage/cached"
"GuGoTik/src/utils/logging"
"context"

"github.com/sirupsen/logrus"
)

Expand All @@ -25,6 +26,7 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ
ok, err := cached.ScanGet(ctx, "UserInfo", &userModel)

if err != nil {

resp = &user.UserResponse{
StatusCode: strings.AuthServiceInnerErrorCode,
StatusMsg: strings.AuthServiceInnerError,
Expand Down
12 changes: 11 additions & 1 deletion src/web/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"GuGoTik/src/web/auth"
comment2 "GuGoTik/src/web/comment"
feed2 "GuGoTik/src/web/feed"
message2 "GuGoTik/src/web/message"
"GuGoTik/src/web/middleware"
"context"

"time"

"github.com/gin-contrib/gzip"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
ginprometheus "github.com/zsais/go-gin-prometheus"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"time"
)

func main() {
Expand Down Expand Up @@ -70,6 +73,13 @@ func main() {
comment.GET("/list", comment2.ListCommentHandler)
comment.GET("/count", comment2.CountCommentHandler)
}
//todo
message := rootPath.Group("/message")
{
message.GET("/chat", message2.ListMessageHandler)
message.POST("/action", message2.ActionMessageHandler)
}

// Run Server
if err := g.Run(config.WebServiceAddr); err != nil {
panic("Can not run GuGoTik Gateway, binding port: " + config.WebServiceAddr)
Expand Down
Loading
Loading