From 5b80e91c391693a2d96b5469dfa4398933d93a30 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Thu, 31 Aug 2023 01:10:08 +0800 Subject: [PATCH 01/19] . --- src/constant/strings/service.go | 3 +- src/services/message/handler.go | 151 +++++++++++++++++++------------ src/services/msgconsumer/main.go | 50 +++++++++- 3 files changed, 142 insertions(+), 62 deletions(-) diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index f2a055f..023122b 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -13,5 +13,6 @@ const ( VideoCommentEvent = "video.comment.action" VideoPublishEvent = "video.publish.action" - MessageActionEvent = "message.send" + MessageActionEvent = "message.send" + MessageGptActionEvent = "message.gpt.send" ) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 70090ad..240c8b5 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -12,7 +12,6 @@ import ( "GuGoTik/src/rpc/user" "GuGoTik/src/storage/database" "GuGoTik/src/storage/redis" - grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/ptr" "GuGoTik/src/utils/rabbitmq" @@ -23,18 +22,17 @@ import ( "time" "github.com/go-redis/redis_rate/v10" - "github.com/robfig/cron/v3" "github.com/streadway/amqp" "gorm.io/gorm" "github.com/sirupsen/logrus" ) -var userClient user.UserServiceClient +/* var userClient user.UserServiceClient var recommendClient recommend.RecommendServiceClient var relationClient relation.RelationServiceClient var feedClient feed.FeedServiceClient -var chatClient chat.ChatServiceClient +var chatClient chat.ChatServiceClient */ type MessageServiceImpl struct { chat.ChatServiceServer @@ -87,34 +85,66 @@ func (c MessageServiceImpl) New() { failOnError(err, "Failed to define queue") } - userRpcConn := grpc2.Connect(config.UserRpcServerName) + _, err = channel.QueueDeclare( + strings.MessageGptActionEvent, + false, false, false, false, + nil, + ) + + if err != nil { + failOnError(err, "Failed to define queue") + } + + channel.QueueBind( + strings.MessageActionEvent, + strings.MessageActionEvent, + strings.MessageExchange, + false, + nil, + ) + if err != nil { + failOnError(err, "Failed to bind queue to exchange") + } - userClient = user.NewUserServiceClient(userRpcConn) + channel.QueueBind( + strings.MessageGptActionEvent, + strings.MessageGptActionEvent, + strings.MessageExchange, + false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue to exchange") + } - recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) - recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) + /* userRpcConn := grpc2.Connect(config.UserRpcServerName) - relationRpcConn := grpc2.Connect(config.RelationRpcServerName) - relationClient = relation.NewRelationServiceClient(relationRpcConn) + userClient = user.NewUserServiceClient(userRpcConn) - feedRpcConn := grpc2.Connect(config.FeedRpcServerName) - feedClient = feed.NewFeedServiceClient(feedRpcConn) + recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) + recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) - chatRpcConn := grpc2.Connect(config.MessageRpcServerName) - chatClient = chat.NewChatServiceClient(chatRpcConn) + relationRpcConn := grpc2.Connect(config.RelationRpcServerName) + relationClient = relation.NewRelationServiceClient(relationRpcConn) - cronRunner := cron.New(cron.WithSeconds()) + feedRpcConn := grpc2.Connect(config.FeedRpcServerName) + feedClient = feed.NewFeedServiceClient(feedRpcConn) - //_, err = cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00 - _, err = cronRunner.AddFunc("@every 5m", sendMagicMessage) // execute every minute [for test] + chatRpcConn := grpc2.Connect(config.MessageRpcServerName) + chatClient = chat.NewChatServiceClient(chatRpcConn) - if err != nil { - logging.Logger.WithFields(logrus.Fields{ - "err": err, - }).Errorf("Cannot start SendMagicMessage cron job") - } + cronRunner := cron.New(cron.WithSeconds()) - cronRunner.Start() + //_, err = cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00 + _, err = cronRunner.AddFunc("@every 5m", sendMagicMessage) // execute every minute [for test] + + if err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Cannot start SendMagicMessage cron job") + } + + cronRunner.Start() */ } @@ -183,25 +213,25 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action return } - userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - "action_type": request.ActionType, - "content_text": request.Content, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - return &chat.ActionResponse{ - StatusCode: strings.UnableToAddMessageErrorCode, - StatusMsg: strings.UnableToAddMessageError, - }, err - } + /* userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ + UserId: request.UserId, + }) + + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "content_text": request.Content, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + return &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageError, + }, err + } */ res, err = addMessage(ctx, request.ActorId, request.UserId, request.Content) if err != nil { @@ -233,25 +263,25 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) "ActorId": request.ActorId, "pre_msg_time": request.PreMsgTime, }).Debugf("Process start") + /* + userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ + UserId: request.UserId, + }) - userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - }).Errorf("User service error") - logging.SetSpanError(span, err) + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + "user_id": request.UserId, + }).Errorf("User service error") + logging.SetSpanError(span, err) - resp = &chat.ChatResponse{ - StatusCode: strings.UnableToQueryMessageErrorCode, - StatusMsg: strings.UnableToQueryMessageError, - } - return - } + resp = &chat.ChatResponse{ + StatusCode: strings.UnableToQueryMessageErrorCode, + StatusMsg: strings.UnableToQueryMessageError, + } + return + } */ toUserId := request.UserId fromUserId := request.ActorId @@ -385,7 +415,7 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context } -func sendMagicMessage() { +/* func sendMagicMessage() { ctx, span := tracing.Tracer.Start(context.Background(), "SendMagicMessageService") defer span.End() logging.SetSpanWithHostname(span) @@ -476,3 +506,4 @@ func sendMagicMessage() { }).Infof("Successfully send the magic message") } } +*/ diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 184ed14..27ae21d 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" "github.com/streadway/amqp" + "go-micro.dev/v4/logger" ) func failOnError(err error, msg string) { @@ -49,7 +50,7 @@ func main() { nil, ) if err != nil { - failOnError(err, "Failed to define queue") + failOnError(err, "Failed to Consume") } var foreever chan struct{} @@ -66,6 +67,7 @@ func main() { "err": err, }).Errorf("Error when unmarshaling the prepare json body.") return + //todo } /* ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) @@ -108,6 +110,52 @@ func main() { } }() + go ss(channel) + <-foreever } + +func ss(channel *amqp.Channel) { + gptmsg, err := channel.Consume( + strings.MessageGptActionEvent, + "", + false, false, false, false, + nil, + ) + if err != nil { + failOnError(err, "Failed to Consume") + } + var message models.Message + for body := range gptmsg { + if err := json.Unmarshal(body.Body, &message); err != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": err, + }).Errorf("Error when unmarshaling the prepare json body.") + continue + } + + pmessage := models.Message{ + ToUserId: message.ToUserId, + FromUserId: message.FromUserId, + ConversationId: message.ConversationId, + Content: message.Content, + } + + result := database.Client.WithContext(context.Background()).Create(&pmessage) + //发一份消息到openai api + + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": result.Error, + }).Errorf("Error when insert message to database.") + // logging.SetSpanError(span, err) + return + } + + } +} From ac0921f45b18337001e8fd1bc8b9efe93f48d836 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Thu, 31 Aug 2023 01:39:46 +0800 Subject: [PATCH 02/19] tt --- src/services/message/handler.go | 4 ---- src/services/msgconsumer/main.go | 7 +++---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 240c8b5..6aaf741 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -6,10 +6,6 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/rpc/chat" - "GuGoTik/src/rpc/feed" - "GuGoTik/src/rpc/recommend" - "GuGoTik/src/rpc/relation" - "GuGoTik/src/rpc/user" "GuGoTik/src/storage/database" "GuGoTik/src/storage/redis" "GuGoTik/src/utils/logging" diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 27ae21d..763fb33 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -11,7 +11,6 @@ import ( "github.com/sirupsen/logrus" "github.com/streadway/amqp" - "go-micro.dev/v4/logger" ) func failOnError(err error, msg string) { @@ -110,13 +109,13 @@ func main() { } }() - go ss(channel) + // go ss(channel) <-foreever } -func ss(channel *amqp.Channel) { +/* func ss(channel *amqp.Channel) { gptmsg, err := channel.Consume( strings.MessageGptActionEvent, "", @@ -158,4 +157,4 @@ func ss(channel *amqp.Channel) { } } -} +} */ From 3ed3c4c2643e85c201da4f186696cb069cb4392a Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Wed, 30 Aug 2023 20:46:41 +0800 Subject: [PATCH 03/19] fix(feed): fix latestTime error --- src/services/feed/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/feed/handler.go b/src/services/feed/handler.go index ffbf032..e616d2e 100644 --- a/src/services/feed/handler.go +++ b/src/services/feed/handler.go @@ -134,7 +134,7 @@ func (s FeedServiceImpl) ListVideosByRecommend(ctx context.Context, request *fee now := time.Now().UnixMilli() latestTime := now - if request.LatestTime != nil && *request.LatestTime != "" { + if request.LatestTime != nil && *request.LatestTime != "" && *request.LatestTime != "0" { // Check if request.LatestTime is a timestamp t, ok := isUnixMilliTimestamp(*request.LatestTime) if ok { From 83c24d8d9d95563cee40ee8c384edd4459798d1f Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Wed, 30 Aug 2023 20:46:57 +0800 Subject: [PATCH 04/19] typo(message): fix typo --- src/services/message/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 70090ad..bedfb81 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -57,7 +57,7 @@ func (c MessageServiceImpl) New() { var err error conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr()) if err != nil { - failOnError(err, "Fialed to conenct to RabbitMQ") + failOnError(err, "Failed to connect to RabbitMQ") } channel, err = conn.Channel() @@ -285,7 +285,7 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) "user_id": request.UserId, "ActorId": request.ActorId, "pre_msg_time": request.PreMsgTime, - }).Errorf("ChatServiceImpl list chat failed to response when listing message,database err") + }).Errorf("ChatServiceImpl list chat failed to response when listing message, database err") logging.SetSpanError(span, err) resp = &chat.ChatResponse{ From 18d357cf4aa5103fbfe8491e4d306d1516c98cb6 Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Thu, 31 Aug 2023 01:44:16 +0800 Subject: [PATCH 05/19] feat(auth): add bloom filter to filter out non-existent username in login service --- go.mod | 3 +++ go.sum | 6 ++++++ src/services/auth/handler.go | 20 ++++++++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/go.mod b/go.mod index 9c427e9..d6a7b1b 100644 --- a/go.mod +++ b/go.mod @@ -207,6 +207,7 @@ require ( github.com/sivchari/tenv v1.7.1 // indirect github.com/sonatard/noctx v0.0.2 // indirect github.com/sourcegraph/go-diff v0.7.0 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/cobra v1.7.0 // indirect @@ -229,6 +230,8 @@ require ( github.com/ultraware/funlen v0.1.0 // indirect github.com/ultraware/whitespace v0.0.5 // indirect github.com/uudashr/gocognit v1.0.7 // indirect + github.com/willf/bitset v1.1.11 // indirect + github.com/willf/bloom v2.0.3+incompatible // indirect github.com/xen0n/gosmopolitan v1.2.1 // indirect github.com/yagipy/maintidx v1.0.0 // indirect github.com/yeya24/promlinter v0.2.0 // indirect diff --git a/go.sum b/go.sum index eadfe67..2381297 100644 --- a/go.sum +++ b/go.sum @@ -678,6 +678,8 @@ github.com/sonatard/noctx v0.0.2 h1:L7Dz4De2zDQhW8S0t+KUjY0MAQJd6SgVwhzNIc4ok00= github.com/sonatard/noctx v0.0.2/go.mod h1:kzFz+CzWSjQ2OzIm46uJZoXuBpa2+0y3T36U18dWqIo= github.com/sourcegraph/go-diff v0.7.0 h1:9uLlrd5T46OXs5qpp8L/MTltk0zikUGi0sNNyCpA8G0= github.com/sourcegraph/go-diff v0.7.0/go.mod h1:iBszgVvyxdc8SFZ7gm69go2KDdt3ag071iBaWPF6cjs= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM= github.com/spf13/afero v1.9.5/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ= github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= @@ -748,6 +750,10 @@ github.com/ultraware/whitespace v0.0.5 h1:hh+/cpIcopyMYbZNVov9iSxvJU3OYQg78Sfaqz github.com/ultraware/whitespace v0.0.5/go.mod h1:aVMh/gQve5Maj9hQ/hg+F75lr/X5A89uZnzAmWSineA= github.com/uudashr/gocognit v1.0.7 h1:e9aFXgKgUJrQ5+bs61zBigmj7bFJ/5cC6HmMahVzuDo= github.com/uudashr/gocognit v1.0.7/go.mod h1:nAIUuVBnYU7pcninia3BHOvQkpQCeO76Uscky5BOwcY= +github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= +github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= +github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA= +github.com/willf/bloom v2.0.3+incompatible/go.mod h1:MmAltL9pDMNTrvUkxdg0k0q5I0suxmuwp3KbyrZLOZ8= github.com/xen0n/gosmopolitan v1.2.1 h1:3pttnTuFumELBRSh+KQs1zcz4fN6Zy7aB0xlnQSn1Iw= github.com/xen0n/gosmopolitan v1.2.1/go.mod h1:JsHq/Brs1o050OOdmzHeOr0N7OtlnKRAGAsElF8xBQA= github.com/yagipy/maintidx v1.0.0 h1:h5NvIsCz+nRDapQ0exNv4aJ0yXSI0420omVANTv3GJM= diff --git a/src/services/auth/handler.go b/src/services/auth/handler.go index fe7c30e..b8fa091 100644 --- a/src/services/auth/handler.go +++ b/src/services/auth/handler.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/google/uuid" "github.com/sirupsen/logrus" + "github.com/willf/bloom" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/crypto/bcrypt" @@ -35,6 +36,8 @@ var relationClient relation.RelationServiceClient var userClient user2.UserServiceClient var recommendClient recommend.RecommendServiceClient +var bloomFilter *bloom.BloomFilter + type AuthServiceImpl struct { auth.AuthServiceServer } @@ -46,6 +49,9 @@ func (a AuthServiceImpl) New() { userClient = user2.NewUserServiceClient(userRpcConn) recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) + + // Create a new Bloom filter with a target false positive rate of 0.1% + bloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users } func (a AuthServiceImpl) Authenticate(ctx context.Context, request *auth.AuthenticateRequest) (resp *auth.AuthenticateResponse, err error) { @@ -233,6 +239,7 @@ func (a AuthServiceImpl) Register(ctx context.Context, request *auth.RegisterReq resp.StatusCode = strings.ServiceOKCode resp.StatusMsg = strings.ServiceOK + bloomFilter.AddString(user.UserName) addMagicUserFriend(ctx, &span, user.ID) return @@ -247,6 +254,19 @@ func (a AuthServiceImpl) Login(ctx context.Context, request *auth.LoginRequest) "username": request.Username, }).Infof("User try to log in.") + // Check if a username might be in the filter + if !bloomFilter.TestString(request.Username) { + resp = &auth.LoginResponse{ + StatusCode: strings.UnableToQueryUserErrorCode, + StatusMsg: strings.UnableToQueryUserError, + } + + logger.WithFields(logrus.Fields{ + "username": request.Username, + }).Infof("The user is blocked by Bloom Filter") + return + } + resp = &auth.LoginResponse{} user := models.User{ UserName: request.Username, From a72d26761571f29d82e82173318b044d5b5b1d8f Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Thu, 31 Aug 2023 05:15:27 +0800 Subject: [PATCH 06/19] add chatgpt to message --- src/constant/config/service.go | 1 + src/services/message/handler.go | 154 +++++++------- src/services/message/main.go | 9 +- src/services/msgconsumer/main.go | 302 +++++++++++++++++++++++++--- src/services/videoprocessor/main.go | 1 + 5 files changed, 371 insertions(+), 96 deletions(-) diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 4a968d1..1a57999 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -35,3 +35,4 @@ const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService" const VideoPicker = "GuGoTik-VideoPicker" const Event = "GuGoTik-Recommend" +const MsgConsumer = "GuGoTik-MgsConsumer" diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 6aaf741..0c98c57 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -6,6 +6,10 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/rpc/chat" + "GuGoTik/src/rpc/feed" + "GuGoTik/src/rpc/recommend" + "GuGoTik/src/rpc/relation" + "GuGoTik/src/rpc/user" "GuGoTik/src/storage/database" "GuGoTik/src/storage/redis" "GuGoTik/src/utils/logging" @@ -15,20 +19,22 @@ import ( "encoding/json" "fmt" + grpc2 "GuGoTik/src/utils/grpc" "time" "github.com/go-redis/redis_rate/v10" + "github.com/robfig/cron/v3" "github.com/streadway/amqp" "gorm.io/gorm" "github.com/sirupsen/logrus" ) -/* var userClient user.UserServiceClient +var userClient user.UserServiceClient var recommendClient recommend.RecommendServiceClient var relationClient relation.RelationServiceClient var feedClient feed.FeedServiceClient -var chatClient chat.ChatServiceClient */ +var chatClient chat.ChatServiceClient type MessageServiceImpl struct { chat.ChatServiceServer @@ -73,7 +79,7 @@ func (c MessageServiceImpl) New() { _, err = channel.QueueDeclare( strings.MessageActionEvent, - false, false, false, false, + true, false, false, false, nil, ) @@ -83,7 +89,7 @@ func (c MessageServiceImpl) New() { _, err = channel.QueueDeclare( strings.MessageGptActionEvent, - false, false, false, false, + true, false, false, false, nil, ) @@ -113,34 +119,34 @@ func (c MessageServiceImpl) New() { failOnError(err, "Failed to define queue to exchange") } - /* userRpcConn := grpc2.Connect(config.UserRpcServerName) + userRpcConn := grpc2.Connect(config.UserRpcServerName) - userClient = user.NewUserServiceClient(userRpcConn) + userClient = user.NewUserServiceClient(userRpcConn) - recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) - recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) + recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) + recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) - relationRpcConn := grpc2.Connect(config.RelationRpcServerName) - relationClient = relation.NewRelationServiceClient(relationRpcConn) + relationRpcConn := grpc2.Connect(config.RelationRpcServerName) + relationClient = relation.NewRelationServiceClient(relationRpcConn) - feedRpcConn := grpc2.Connect(config.FeedRpcServerName) - feedClient = feed.NewFeedServiceClient(feedRpcConn) + feedRpcConn := grpc2.Connect(config.FeedRpcServerName) + feedClient = feed.NewFeedServiceClient(feedRpcConn) - chatRpcConn := grpc2.Connect(config.MessageRpcServerName) - chatClient = chat.NewChatServiceClient(chatRpcConn) + chatRpcConn := grpc2.Connect(config.MessageRpcServerName) + chatClient = chat.NewChatServiceClient(chatRpcConn) - cronRunner := cron.New(cron.WithSeconds()) + cronRunner := cron.New(cron.WithSeconds()) - //_, err = cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00 - _, err = cronRunner.AddFunc("@every 5m", sendMagicMessage) // execute every minute [for test] + //_, err = cronRunner.AddFunc("0 0 18 * * *", sendMagicMessage) // execute every 18:00 + _, err = cronRunner.AddFunc("@every 5m", sendMagicMessage) // execute every minute [for test] - if err != nil { - logging.Logger.WithFields(logrus.Fields{ - "err": err, - }).Errorf("Cannot start SendMagicMessage cron job") - } + if err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Cannot start SendMagicMessage cron job") + } - cronRunner.Start() */ + cronRunner.Start() } @@ -209,25 +215,25 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action return } - /* userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - "action_type": request.ActionType, - "content_text": request.Content, - }).Errorf("User service error") - logging.SetSpanError(span, err) - - return &chat.ActionResponse{ - StatusCode: strings.UnableToAddMessageErrorCode, - StatusMsg: strings.UnableToAddMessageError, - }, err - } */ + userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ + UserId: request.UserId, + }) + + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + "user_id": request.UserId, + "action_type": request.ActionType, + "content_text": request.Content, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + return &chat.ActionResponse{ + StatusCode: strings.UnableToAddMessageErrorCode, + StatusMsg: strings.UnableToAddMessageError, + }, err + } res, err = addMessage(ctx, request.ActorId, request.UserId, request.Content) if err != nil { @@ -259,25 +265,25 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) "ActorId": request.ActorId, "pre_msg_time": request.PreMsgTime, }).Debugf("Process start") - /* - userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ - UserId: request.UserId, - }) - if err != nil || userResponse.StatusCode != strings.ServiceOKCode { - logger.WithFields(logrus.Fields{ - "err": err, - "ActorId": request.ActorId, - "user_id": request.UserId, - }).Errorf("User service error") - logging.SetSpanError(span, err) + userResponse, err := userClient.GetUserExistInformation(ctx, &user.UserExistRequest{ + UserId: request.UserId, + }) - resp = &chat.ChatResponse{ - StatusCode: strings.UnableToQueryMessageErrorCode, - StatusMsg: strings.UnableToQueryMessageError, - } - return - } */ + if err != nil || userResponse.StatusCode != strings.ServiceOKCode { + logger.WithFields(logrus.Fields{ + "err": err, + "ActorId": request.ActorId, + "user_id": request.UserId, + }).Errorf("User service error") + logging.SetSpanError(span, err) + + resp = &chat.ChatResponse{ + StatusCode: strings.UnableToQueryMessageErrorCode, + StatusMsg: strings.UnableToQueryMessageError, + } + return + } toUserId := request.UserId fromUserId := request.ActorId @@ -385,13 +391,26 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context return } headers := rabbitmq.InjectAMQPHeaders(ctx) - err = channel.Publish("", strings.MessageActionEvent, false, false, - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - Body: body, - Headers: headers, - }) + + if message.ToUserId == config.EnvCfg.MagicUserId { + err = channel.Publish("", strings.MessageGptActionEvent, false, false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: body, + Headers: headers, + }) + + } else { + + err = channel.Publish("", strings.MessageActionEvent, false, false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: body, + Headers: headers, + }) + } // result := database.Client.WithContext(ctx).Create(&message) @@ -411,7 +430,7 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context } -/* func sendMagicMessage() { +func sendMagicMessage() { ctx, span := tracing.Tracer.Start(context.Background(), "SendMagicMessageService") defer span.End() logging.SetSpanWithHostname(span) @@ -502,4 +521,3 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context }).Infof("Successfully send the magic message") } } -*/ diff --git a/src/services/message/main.go b/src/services/message/main.go index 235631e..1337bb7 100644 --- a/src/services/message/main.go +++ b/src/services/message/main.go @@ -11,14 +11,15 @@ import ( "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" "context" - grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" - "github.com/oklog/run" - "github.com/prometheus/client_golang/prometheus/promhttp" "net" "net/http" "os" "syscall" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" @@ -74,7 +75,7 @@ func main() { var probe healthImpl.ProbeImpl chat.RegisterChatServiceServer(s, srv) health.RegisterHealthServer(s, &probe) - + defer CloseMQConn() srv.New() srvMetrics.InitializeMetrics(s) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 763fb33..d729a2c 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -1,16 +1,24 @@ package main import ( + "GuGoTik/src/constant/config" "GuGoTik/src/constant/strings" + "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/storage/database" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" + "errors" + "net/http" + url2 "net/url" + + "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" "github.com/streadway/amqp" + "go.opentelemetry.io/otel/trace" ) func failOnError(err error, msg string) { @@ -19,6 +27,25 @@ func failOnError(err error, msg string) { } +var delayTime = int32(2 * 60 * 1000) //2 minutes +var maxRetries = int32(3) + +var openaiClient *openai.Client + +func init() { + cfg := openai.DefaultConfig(config.EnvCfg.ChatGPTAPIKEYS) + url, err := url2.Parse(config.EnvCfg.ChatGptProxy) + if err != nil { + panic(err) + } + cfg.HTTPClient = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyURL(url), + }, + } + openaiClient = openai.NewClientWithConfig(cfg) +} + func main() { conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr()) if err != nil { @@ -28,11 +55,43 @@ func main() { err := conn.Close() failOnError(err, "Fialed to close conn") }(conn) + + tp, err := tracing.SetTraceProvider(config.MsgConsumer) + if err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Panicf("Error to set the trace") + } + defer func() { + if err := tp.Shutdown(context.Background()); err != nil { + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error to set the trace") + } + }() + channel, err := conn.Channel() if err != nil { failOnError(err, "Failed to open a channel") } + defer func(channel *amqp.Channel) { + err := channel.Close() + failOnError(err, "Fialed to close channel") + }(channel) + + err = channel.ExchangeDeclare( + strings.MessageExchange, + "x-delayed-message", + true, false, false, false, + amqp.Table{ + "x-delayed-type": "direct", + }, + ) + if err != nil { + failOnError(err, "Failed to get exchange") + } + _, err = channel.QueueDeclare( strings.MessageActionEvent, false, false, false, false, @@ -42,10 +101,42 @@ func main() { failOnError(err, "Failed to define queue") } + _, err = channel.QueueDeclare( + strings.MessageGptActionEvent, + false, false, false, false, + nil, + ) + + if err != nil { + failOnError(err, "Failed to define queue") + } + + channel.QueueBind( + strings.MessageActionEvent, + strings.MessageActionEvent, + strings.MessageExchange, + false, + nil, + ) + if err != nil { + failOnError(err, "Failed to bind queue to exchange") + } + + channel.QueueBind( + strings.MessageGptActionEvent, + strings.MessageGptActionEvent, + strings.MessageExchange, + false, + nil, + ) + if err != nil { + failOnError(err, "Failed to define queue to exchange") + } + msg, err := channel.Consume( strings.MessageActionEvent, "", - true, false, false, false, + false, false, false, false, nil, ) if err != nil { @@ -59,17 +150,8 @@ func main() { go func() { var message models.Message for body := range msg { - if err := json.Unmarshal(body.Body, &message); err != nil { - logger.WithFields(logrus.Fields{ - "from_id": message.FromUserId, - "to_id": message.ToUserId, - "err": err, - }).Errorf("Error when unmarshaling the prepare json body.") - return - //todo - } + ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) - /* ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) ctx, span := tracing.Tracer.Start(ctx, "message_send Service") logger := logging.LogService("message_send").WithContext(ctx) @@ -77,11 +159,25 @@ func main() { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, "to_id": message.ToUserId, + "content": message.Content, "err": err, }).Errorf("Error when unmarshaling the prepare json body.") logging.SetSpanError(span, err) - return - } */ + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "content": message.Content, + "err": err, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue + } pmessage := models.Message{ ToUserId: message.ToUserId, @@ -90,32 +186,48 @@ func main() { Content: message.Content, } logger.Info(pmessage) + //可能会重新插入数据 开启事务 晚点改 result := database.Client.WithContext(context.Background()).Create(&pmessage) if result.Error != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, "to_id": message.ToUserId, + "content": message.Content, "err": result.Error, }).Errorf("Error when insert message to database.") - // logging.SetSpanError(span, err) - return + logging.SetSpanError(span, err) + err = body.Nack(false, true) + if err != nil { + logger.WithFields( + logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "content": message.Content, + "err": err, + }).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() + continue } - /* err = body.Ack(true) + err = body.Ack(true) + if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("Error when dealing with the ,essage...") - } */ + }).Errorf("Error when dealing with the message...") + logging.SetSpanError(span, err) + } } }() - // go ss(channel) + go ss(channel) <-foreever } -/* func ss(channel *amqp.Channel) { +func ss(channel *amqp.Channel) { gptmsg, err := channel.Consume( strings.MessageGptActionEvent, "", @@ -126,13 +238,36 @@ func main() { failOnError(err, "Failed to Consume") } var message models.Message + for body := range gptmsg { + ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) + ctx, span := tracing.Tracer.Start(ctx, "message_send Service") + logger := logging.LogService("message_send").WithContext(ctx) + if err := json.Unmarshal(body.Body, &message); err != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, "to_id": message.ToUserId, + "content": message.Content, "err": err, }).Errorf("Error when unmarshaling the prepare json body.") + logging.SetSpanError(span, err) + + //重试 + errorHandler(channel, body, false, logger, &span) + + if err != nil { + logger.WithFields( + logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "content": message.Content, + "err": err, + }, + ).Errorf("Error when nack the message") + logging.SetSpanError(span, err) + } + span.End() continue } @@ -142,9 +277,56 @@ func main() { ConversationId: message.ConversationId, Content: message.Content, } - + //可能会重新插入数据 开启事务 晚点改 result := database.Client.WithContext(context.Background()).Create(&pmessage) //发一份消息到openai api + if result.Error != nil { + logger.WithFields(logrus.Fields{ + "from_id": message.FromUserId, + "to_id": message.ToUserId, + "err": result.Error, + }).Errorf("Error when insert message to database.") + logging.SetSpanError(span, err) + //重试? + continue + } + + req := openai.ChatCompletionRequest{ + Model: openai.GPT3Dot5Turbo, + Messages: []openai.ChatCompletionMessage{{ + Role: openai.ChatMessageRoleUser, + Content: message.Content, + }, + }, + } + + resp, err := openaiClient.CreateChatCompletion( + context.Background(), + req, + ) + + if err != nil { + logger.WithFields(logrus.Fields{ + "Err": err, + "from_id": message.FromUserId, + "context": message.Content, + }).Errorf("Failed to get keywords from ChatGPT") + + logging.SetSpanError(span, err) + //重试 + errorHandler(channel, body, true, logger, &span) + } + + text := resp.Choices[0].Message.Content + pmessage = models.Message{ + ToUserId: message.FromUserId, + FromUserId: message.ToUserId, + ConversationId: message.ConversationId, + Content: text, + // Content: "111", + } + + result = database.Client.WithContext(context.Background()).Create(&pmessage) if result.Error != nil { logger.WithFields(logrus.Fields{ @@ -152,9 +334,81 @@ func main() { "to_id": message.ToUserId, "err": result.Error, }).Errorf("Error when insert message to database.") - // logging.SetSpanError(span, err) - return + logging.SetSpanError(span, err) + //重试? + continue } + err = body.Ack(true) + + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the message...") + logging.SetSpanError(span, err) + } + + } +} + +func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger *logrus.Entry, span *trace.Span) { + if !requeue { // Nack the message + err := d.Nack(false, false) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when nacking the video...") + logging.SetSpanError(*span, err) + } + } else { // Re-publish the message + curRetry, ok := d.Headers["x-retry"].(int32) + if !ok { + curRetry = 0 + } + if curRetry >= maxRetries { + logger.WithFields(logrus.Fields{ + "body": d.Body, + }).Errorf("Maximum retries reached for message.") + logging.SetSpanError(*span, errors.New("maximum retries reached for message")) + err := d.Ack(false) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the video...") + } + } else { + curRetry++ + headers := d.Headers + headers["x-delay"] = delayTime + headers["x-retry"] = curRetry + + err := d.Ack(false) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the video...") + } + + logger.Debugf("Retrying %d times", curRetry) + + err = channel.Publish( + strings.MessageExchange, + strings.MessageGptActionEvent, + false, + false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: d.Body, + Headers: headers, + }, + ) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when re-publishing the video to queue...") + logging.SetSpanError(*span, err) + } + } } -} */ +} diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index 87b6115..e7f2c69 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -155,6 +155,7 @@ func Consume(channel *amqp.Channel) { "err": err, }).Errorf("Error when unmarshaling the prepare json body.") return + //这个地方直接能return吗 } // 截取封面 From cda5e1701bd994dcefaae60020585eccbcf8e0f3 Mon Sep 17 00:00:00 2001 From: XFFFCCCC Date: Thu, 31 Aug 2023 05:24:10 +0800 Subject: [PATCH 07/19] fix err --- src/services/message/handler.go | 6 +++--- src/services/msgconsumer/main.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 0c98c57..8ac1cfd 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -97,7 +97,7 @@ func (c MessageServiceImpl) New() { failOnError(err, "Failed to define queue") } - channel.QueueBind( + err = channel.QueueBind( strings.MessageActionEvent, strings.MessageActionEvent, strings.MessageExchange, @@ -108,7 +108,7 @@ func (c MessageServiceImpl) New() { failOnError(err, "Failed to bind queue to exchange") } - channel.QueueBind( + err = channel.QueueBind( strings.MessageGptActionEvent, strings.MessageGptActionEvent, strings.MessageExchange, @@ -116,7 +116,7 @@ func (c MessageServiceImpl) New() { nil, ) if err != nil { - failOnError(err, "Failed to define queue to exchange") + failOnError(err, "Failed to bind queue to exchange") } userRpcConn := grpc2.Connect(config.UserRpcServerName) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index d729a2c..0f1d3f9 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -111,7 +111,7 @@ func main() { failOnError(err, "Failed to define queue") } - channel.QueueBind( + err = channel.QueueBind( strings.MessageActionEvent, strings.MessageActionEvent, strings.MessageExchange, @@ -122,7 +122,7 @@ func main() { failOnError(err, "Failed to bind queue to exchange") } - channel.QueueBind( + err = channel.QueueBind( strings.MessageGptActionEvent, strings.MessageGptActionEvent, strings.MessageExchange, @@ -130,7 +130,7 @@ func main() { nil, ) if err != nil { - failOnError(err, "Failed to define queue to exchange") + failOnError(err, "Failed to bind queue to exchange") } msg, err := channel.Consume( From cdb5798880761736302101973976ea0438c4e34b Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Thu, 31 Aug 2023 13:44:57 +0800 Subject: [PATCH 08/19] typo(msgconsummer) --- src/services/msgconsumer/main.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 0f1d3f9..6db7778 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -49,11 +49,11 @@ func init() { func main() { conn, err := amqp.Dial(rabbitmq.BuildMQConnAddr()) if err != nil { - failOnError(err, "Fialed to conenct to RabbitMQ") + failOnError(err, "Failed to connect to RabbitMQ") } defer func(conn *amqp.Connection) { err := conn.Close() - failOnError(err, "Fialed to close conn") + failOnError(err, "Failed to close connection") }(conn) tp, err := tracing.SetTraceProvider(config.MsgConsumer) @@ -77,7 +77,7 @@ func main() { defer func(channel *amqp.Channel) { err := channel.Close() - failOnError(err, "Fialed to close channel") + failOnError(err, "Failed to close channel") }(channel) err = channel.ExchangeDeclare( @@ -143,7 +143,7 @@ func main() { failOnError(err, "Failed to Consume") } - var foreever chan struct{} + var forever chan struct{} logger := logging.LogService("msgConsumer") logger.Infof(strings.MessageActionEvent + " is running now") @@ -179,15 +179,15 @@ func main() { continue } - pmessage := models.Message{ + pMessage := models.Message{ ToUserId: message.ToUserId, FromUserId: message.FromUserId, ConversationId: message.ConversationId, Content: message.Content, } - logger.Info(pmessage) + logger.Info(pMessage) //可能会重新插入数据 开启事务 晚点改 - result := database.Client.WithContext(context.Background()).Create(&pmessage) + result := database.Client.WithContext(context.Background()).Create(&pMessage) if result.Error != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, @@ -223,12 +223,12 @@ func main() { go ss(channel) - <-foreever + <-forever } func ss(channel *amqp.Channel) { - gptmsg, err := channel.Consume( + gptMsg, err := channel.Consume( strings.MessageGptActionEvent, "", false, false, false, false, @@ -239,7 +239,7 @@ func ss(channel *amqp.Channel) { } var message models.Message - for body := range gptmsg { + for body := range gptMsg { ctx := rabbitmq.ExtractAMQPHeaders(context.Background(), body.Headers) ctx, span := tracing.Tracer.Start(ctx, "message_send Service") logger := logging.LogService("message_send").WithContext(ctx) @@ -271,14 +271,14 @@ func ss(channel *amqp.Channel) { continue } - pmessage := models.Message{ + pMessage := models.Message{ ToUserId: message.ToUserId, FromUserId: message.FromUserId, ConversationId: message.ConversationId, Content: message.Content, } //可能会重新插入数据 开启事务 晚点改 - result := database.Client.WithContext(context.Background()).Create(&pmessage) + result := database.Client.WithContext(context.Background()).Create(&pMessage) //发一份消息到openai api if result.Error != nil { logger.WithFields(logrus.Fields{ @@ -318,7 +318,7 @@ func ss(channel *amqp.Channel) { } text := resp.Choices[0].Message.Content - pmessage = models.Message{ + pMessage = models.Message{ ToUserId: message.FromUserId, FromUserId: message.ToUserId, ConversationId: message.ConversationId, @@ -326,7 +326,7 @@ func ss(channel *amqp.Channel) { // Content: "111", } - result = database.Client.WithContext(context.Background()).Create(&pmessage) + result = database.Client.WithContext(context.Background()).Create(&pMessage) if result.Error != nil { logger.WithFields(logrus.Fields{ From 50451bc99829df6c27dc86d2df7f4a55c3c0e37d Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Thu, 31 Aug 2023 18:15:37 +0800 Subject: [PATCH 09/19] feat(auth): add bloom filter in login service --- src/services/auth/handler.go | 23 ++++++++++++++------ src/services/auth/main.go | 42 ++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/services/auth/handler.go b/src/services/auth/handler.go index b8fa091..14841e4 100644 --- a/src/services/auth/handler.go +++ b/src/services/auth/handler.go @@ -11,6 +11,7 @@ import ( user2 "GuGoTik/src/rpc/user" "GuGoTik/src/storage/cached" "GuGoTik/src/storage/database" + "GuGoTik/src/storage/redis" grpc2 "GuGoTik/src/utils/grpc" "GuGoTik/src/utils/logging" "context" @@ -36,7 +37,7 @@ var relationClient relation.RelationServiceClient var userClient user2.UserServiceClient var recommendClient recommend.RecommendServiceClient -var bloomFilter *bloom.BloomFilter +var BloomFilter *bloom.BloomFilter type AuthServiceImpl struct { auth.AuthServiceServer @@ -49,9 +50,6 @@ func (a AuthServiceImpl) New() { userClient = user2.NewUserServiceClient(userRpcConn) recommendRpcConn := grpc2.Connect(config.RecommendRpcServiceName) recommendClient = recommend.NewRecommendServiceClient(recommendRpcConn) - - // Create a new Bloom filter with a target false positive rate of 0.1% - bloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users } func (a AuthServiceImpl) Authenticate(ctx context.Context, request *auth.AuthenticateRequest) (resp *auth.AuthenticateResponse, err error) { @@ -239,7 +237,20 @@ func (a AuthServiceImpl) Register(ctx context.Context, request *auth.RegisterReq resp.StatusCode = strings.ServiceOKCode resp.StatusMsg = strings.ServiceOK - bloomFilter.AddString(user.UserName) + // Publish the username to redis + BloomFilter.AddString(user.UserName) + logger.WithFields(logrus.Fields{ + "username": user.UserName, + }).Infof("Publishing user name to redis channel") + err = redis.Client.Publish(ctx, config.BloomRedisChannel, user.UserName).Err() + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + "username": user.UserName, + }).Errorf("Publishing user name to redis channel happens error") + logging.SetSpanError(span, err) + } + addMagicUserFriend(ctx, &span, user.ID) return @@ -255,7 +266,7 @@ func (a AuthServiceImpl) Login(ctx context.Context, request *auth.LoginRequest) }).Infof("User try to log in.") // Check if a username might be in the filter - if !bloomFilter.TestString(request.Username) { + if !BloomFilter.TestString(request.Username) { resp = &auth.LoginResponse{ StatusCode: strings.UnableToQueryUserErrorCode, StatusMsg: strings.UnableToQueryUserError, diff --git a/src/services/auth/main.go b/src/services/auth/main.go index d416e54..57110d7 100644 --- a/src/services/auth/main.go +++ b/src/services/auth/main.go @@ -4,9 +4,12 @@ import ( "GuGoTik/src/constant/config" "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" + "GuGoTik/src/models" "GuGoTik/src/rpc/auth" "GuGoTik/src/rpc/health" healthImpl "GuGoTik/src/services/health" + "GuGoTik/src/storage/database" + "GuGoTik/src/storage/redis" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,7 +20,9 @@ import ( _ "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" _ "github.com/prometheus/client_golang/prometheus/promhttp" + redis2 "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" + "github.com/willf/bloom" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" @@ -61,6 +66,43 @@ func main() { reg := prom.Client reg.MustRegister(srvMetrics) + // Create a new Bloom filter with a target false positive rate of 0.1% + BloomFilter = bloom.NewWithEstimates(10000000, 0.001) // assuming we have 1 million users + + // Initialize BloomFilter from database + var users []models.User + userNamesResult := database.Client.WithContext(context.Background()).Select("user_name").Find(&users) + if userNamesResult.Error != nil { + log.Panicf("Getting user names from databse happens error: %s", userNamesResult.Error) + panic(userNamesResult.Error) + } + for _, u := range users { + BloomFilter.AddString(u.UserName) + } + + // Create a go routine to receive redis message and add it to BloomFilter + go func() { + pubSub := redis.Client.Subscribe(context.Background(), config.BloomRedisChannel) + defer func(pubSub *redis2.PubSub) { + err := pubSub.Close() + if err != nil { + log.Panicf("Closing redis pubsub happend error: %s", err) + } + }(pubSub) + + _, err := pubSub.ReceiveMessage(context.Background()) + if err != nil { + log.Panicf("Reveiving message from redis happens error: %s", err) + panic(err) + } + + ch := pubSub.Channel() + for msg := range ch { + log.Infof("Add user name to BloomFilter: %s", msg.Payload) + BloomFilter.AddString(msg.Payload) + } + }() + s := grpc.NewServer( grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), From 2f70c10a32309ec11b230a74a5a0169837f864f1 Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Thu, 31 Aug 2023 18:18:05 +0800 Subject: [PATCH 10/19] feat(auth): add bloom filter in login request --- src/constant/config/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 1a57999..62859d2 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -36,3 +36,5 @@ const VideoProcessorRpcServiceName = "GuGoTik-VideoProcessorService" const VideoPicker = "GuGoTik-VideoPicker" const Event = "GuGoTik-Recommend" const MsgConsumer = "GuGoTik-MgsConsumer" + +const BloomRedisChannel = "GuGoTik-Bloom" From ecb6aa313c5ff25bb250a78aadc43acefb6befc3 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 22:08:46 +0800 Subject: [PATCH 11/19] fix: fix ack --- src/services/msgconsumer/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 6db7778..df32d17 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -210,7 +210,7 @@ func main() { span.End() continue } - err = body.Ack(true) + err = body.Ack(false) if err != nil { logger.WithFields(logrus.Fields{ @@ -339,7 +339,7 @@ func ss(channel *amqp.Channel) { continue } - err = body.Ack(true) + err = body.Ack(false) if err != nil { logger.WithFields(logrus.Fields{ From 3e1d71ce334de61cd9e69aa1b06568eda93d7935 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 11:36:48 +0800 Subject: [PATCH 12/19] feat: merge --- go.mod | 1 + go.sum | 9 ++++ src/constant/config/.env.example | 6 +++ src/constant/config/env.go | 72 ++++++++++++++------------ src/services/videoprocessor/main.go | 3 +- src/services/videoprocessor/summary.go | 12 +++-- src/storage/database/gorm.go | 35 +++++++++++-- 7 files changed, 94 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index d6a7b1b..239d698 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( google.golang.org/protobuf v1.31.0 gorm.io/driver/postgres v1.5.2 gorm.io/gorm v1.25.4 + gorm.io/plugin/dbresolver v1.4.7 gorm.io/plugin/opentelemetry v0.1.3 ) diff --git a/go.sum b/go.sum index 2381297..a555e0b 100644 --- a/go.sum +++ b/go.sum @@ -224,6 +224,8 @@ github.com/go-playground/validator/v10 v10.15.1 h1:BSe8uhN+xQ4r5guV/ywQI4gO59C2r github.com/go-playground/validator/v10 v10.15.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= github.com/go-redis/redis_rate/v10 v10.0.1 h1:calPxi7tVlxojKunJwQ72kwfozdy25RjA0bCj1h0MUo= github.com/go-redis/redis_rate/v10 v10.0.1/go.mod h1:EMiuO9+cjRkR7UvdvwMO7vbgqJkltQHtwbdIQvaBKIU= +github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-toolsmith/astcast v1.1.0 h1:+JN9xZV1A+Re+95pgnMgDboWNVnIMMQXwfBwLRPgSC8= @@ -422,6 +424,7 @@ github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjz github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af h1:KA9BjwUk7KlCh6S9EAGWBt1oExIUv9WyNCiRz5amv48= @@ -1238,11 +1241,17 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.4.3 h1:/JhWJhO2v17d8hjApTltKNADm7K7YI2ogkR7avJUL3k= +gorm.io/driver/mysql v1.4.3/go.mod h1:sSIebwZAVPiT+27jK9HIwvsqOGKx3YMPmrA3mBJR10c= gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0= gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8= gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c= +gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= +gorm.io/gorm v1.25.2/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw= gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= +gorm.io/plugin/dbresolver v1.4.7 h1:ZwtwmJQxTx9us7o6zEHFvH1q4OeEo1pooU7efmnunJA= +gorm.io/plugin/dbresolver v1.4.7/go.mod h1:l4Cn87EHLEYuqUncpEeTC2tTJQkjngPSD+lo8hIvcT0= gorm.io/plugin/opentelemetry v0.1.3 h1:z6QgEBef/+4S6D00+jUeRPreI0LAf7Idfqe3dz3TWKg= gorm.io/plugin/opentelemetry v0.1.3/go.mod h1:tndJHOdvPT0pyGhOb8E2209eXJCUxhC5UpKw7bGVWeI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/src/constant/config/.env.example b/src/constant/config/.env.example index a6699f6..529a217 100644 --- a/src/constant/config/.env.example +++ b/src/constant/config/.env.example @@ -19,6 +19,12 @@ POSTGRESQL_PASSWORD= POSTGRESQL_DATABASE= POSTGRESQL_SCHEMA= POSTGRESQL_PREFIX= +# Configure PostgreSQL replica, use `enable` or `disable`, default by `disable` +POSTGRESQL_REPLICA=disable +# Configure PostgreSQL Replica, use `,` to split different addr, like 'test1:5432,test2:5432' +POSTGRESQL_REPLICA_ADDR= +POSTGRESQL_REPLICA_USER= +POSTGRESQL_REPLICA_PASSWORD= # Configure storage mode, support: fs, s3 # fs: stoarge binary files in the local machine, use this should provide `FS_PATH` config, or will output at /tmp. Aslo, # you should provide `FS_BASEURL`, the default is `http://localhost/` diff --git a/src/constant/config/env.go b/src/constant/config/env.go index 269f0e1..ec80023 100644 --- a/src/constant/config/env.go +++ b/src/constant/config/env.go @@ -9,40 +9,44 @@ import ( var EnvCfg envConfig type envConfig struct { - ConsulAddr string `env:"CONSUL_ADDR" envDefault:"localhost:8500"` - LoggerLevel string `env:"LOGGER_LEVEL" envDefault:"INFO"` - LoggerWithTraceState string `env:"LOGGER_OUT_TRACING" envDefault:"disable"` - TiedLogging string `env:"TIED" envDefault:"NONE"` - PostgreSQLHost string `env:"POSTGRESQL_HOST"` - PostgreSQLPort string `env:"POSTGRESQL_PORT"` - PostgreSQLUser string `env:"POSTGRESQL_USER"` - PostgreSQLPassword string `env:"POSTGRESQL_PASSWORD"` - PostgreSQLDataBase string `env:"POSTGRESQL_DATABASE"` - StorageType string `env:"STORAGE_TYPE" envDefault:"fs"` - FileSystemStartPath string `env:"FS_PATH" envDefault:"/tmp"` - FileSystemBaseUrl string `env:"FS_BASEURL" envDefault:"http://localhost/"` - RedisAddr string `env:"REDIS_ADDR"` - RedisPassword string `env:"REDIS_PASSWORD" envDefault:""` - RedisDB int `env:"REDIS_DB" envDefault:"0"` - TracingEndPoint string `env:"TRACING_ENDPOINT"` - PyroscopeState string `env:"PYROSCOPE_STATE" envDefault:"false"` - PyroscopeAddr string `env:"PYROSCOPE_ADDR"` - RedisPrefix string `env:"REDIS_PREFIX" envDefault:"GUGUTIK"` - PostgreSQLSchema string `env:"POSTGRESQL_SCHEMA"` - RedisMaster string `env:"REDIS_MASTER"` - ConsulAnonymityPrefix string `env:"CONSUL_ANONYMITY_NAME" envDefault:""` - RabbitMQUsername string `env:"RABBITMQ_USERNAME" envDefault:"guest"` - RabbitMQPassword string `env:"RABBITMQ_PASSWORD" envDefault:"guest"` - RabbitMQAddr string `env:"RABBITMQ_ADDRESS" envDefault:"localhost"` - RabbitMQPort string `env:"RABBITMQ_PORT" envDefault:"5672"` - RabbitMQVhostPrefix string `env:"RABBITMQ_VHOST_PREFIX" envDefault:""` - ChatGPTAPIKEYS string `env:"CHATGPT_API_KEYS"` - PodIpAddr string `env:"POD_IP" envDefault:"localhost"` - GorseAddr string `env:"GORSE_ADDR"` - GorseApiKey string `env:"GORSE_APIKEY"` - MagicUserId uint32 `env:"MAGIC_USER_ID" envDefault:"1"` - ChatGptProxy string `env:"CHATGPT_PROXY"` - PostgreSQLPrefix string `env:"POSTGRESQL_PREFIX" envDefault:""` + ConsulAddr string `env:"CONSUL_ADDR" envDefault:"localhost:8500"` + LoggerLevel string `env:"LOGGER_LEVEL" envDefault:"INFO"` + LoggerWithTraceState string `env:"LOGGER_OUT_TRACING" envDefault:"disable"` + TiedLogging string `env:"TIED" envDefault:"NONE"` + PostgreSQLHost string `env:"POSTGRESQL_HOST"` + PostgreSQLPort string `env:"POSTGRESQL_PORT"` + PostgreSQLUser string `env:"POSTGRESQL_USER"` + PostgreSQLPassword string `env:"POSTGRESQL_PASSWORD"` + PostgreSQLDataBase string `env:"POSTGRESQL_DATABASE"` + StorageType string `env:"STORAGE_TYPE" envDefault:"fs"` + FileSystemStartPath string `env:"FS_PATH" envDefault:"/tmp"` + FileSystemBaseUrl string `env:"FS_BASEURL" envDefault:"http://localhost/"` + RedisAddr string `env:"REDIS_ADDR"` + RedisPassword string `env:"REDIS_PASSWORD" envDefault:""` + RedisDB int `env:"REDIS_DB" envDefault:"0"` + TracingEndPoint string `env:"TRACING_ENDPOINT"` + PyroscopeState string `env:"PYROSCOPE_STATE" envDefault:"false"` + PyroscopeAddr string `env:"PYROSCOPE_ADDR"` + RedisPrefix string `env:"REDIS_PREFIX" envDefault:"GUGUTIK"` + PostgreSQLSchema string `env:"POSTGRESQL_SCHEMA"` + RedisMaster string `env:"REDIS_MASTER"` + ConsulAnonymityPrefix string `env:"CONSUL_ANONYMITY_NAME" envDefault:""` + RabbitMQUsername string `env:"RABBITMQ_USERNAME" envDefault:"guest"` + RabbitMQPassword string `env:"RABBITMQ_PASSWORD" envDefault:"guest"` + RabbitMQAddr string `env:"RABBITMQ_ADDRESS" envDefault:"localhost"` + RabbitMQPort string `env:"RABBITMQ_PORT" envDefault:"5672"` + RabbitMQVhostPrefix string `env:"RABBITMQ_VHOST_PREFIX" envDefault:""` + ChatGPTAPIKEYS string `env:"CHATGPT_API_KEYS"` + PodIpAddr string `env:"POD_IP" envDefault:"localhost"` + GorseAddr string `env:"GORSE_ADDR"` + GorseApiKey string `env:"GORSE_APIKEY"` + MagicUserId uint32 `env:"MAGIC_USER_ID" envDefault:"1"` + ChatGptProxy string `env:"CHATGPT_PROXY"` + PostgreSQLPrefix string `env:"POSTGRESQL_PREFIX" envDefault:""` + PostgreSQLReplicaState string `env:"POSTGRESQL_REPLICA"` + PostgreSQLReplicaAddress string `env:"POSTGRESQL_REPLICA_ADDR"` + PostgreSQLReplicaUsername string `env:"POSTGRESQL_REPLICA_USER"` + PostgreSQLReplicaPassword string `env:"POSTGRESQL_REPLICA_PASSWORD"` } func init() { diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index e7f2c69..739de86 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -154,8 +154,7 @@ func Consume(channel *amqp.Channel) { logger.WithFields(logrus.Fields{ "err": err, }).Errorf("Error when unmarshaling the prepare json body.") - return - //这个地方直接能return吗 + continue } // 截取封面 diff --git a/src/services/videoprocessor/summary.go b/src/services/videoprocessor/summary.go index 5a0dc61..4a069c9 100644 --- a/src/services/videoprocessor/summary.go +++ b/src/services/videoprocessor/summary.go @@ -29,11 +29,13 @@ import ( "sync" ) -var userClient user.UserServiceClient -var commentClient comment.CommentServiceClient -var openaiClient *openai.Client -var delayTime = int32(2 * 60 * 1000) //2 minutes -var maxRetries = int32(3) +var ( + userClient user.UserServiceClient + commentClient comment.CommentServiceClient + openaiClient *openai.Client + delayTime = int32(2 * 60 * 1000) //2 minutes + maxRetries = int32(3) +) var conn *amqp.Connection var channel *amqp.Channel diff --git a/src/storage/database/gorm.go b/src/storage/database/gorm.go index 76e6352..4445889 100644 --- a/src/storage/database/gorm.go +++ b/src/storage/database/gorm.go @@ -7,7 +7,9 @@ import ( "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/schema" + "gorm.io/plugin/dbresolver" "gorm.io/plugin/opentelemetry/tracing" + "strings" "time" ) @@ -50,14 +52,41 @@ func init() { panic(err) } + if config.EnvCfg.PostgreSQLReplicaState == "enable" { + var replicas []gorm.Dialector + for _, addr := range strings.Split(config.EnvCfg.PostgreSQLReplicaAddress, ",") { + pair := strings.Split(addr, ":") + if len(pair) != 2 { + continue + } + + replicas = append(replicas, postgres.Open( + fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s", + pair[0], + config.EnvCfg.PostgreSQLReplicaUsername, + config.EnvCfg.PostgreSQLReplicaPassword, + config.EnvCfg.PostgreSQLDataBase, + pair[1]))) + } + + err := Client.Use(dbresolver.Register(dbresolver.Config{ + Replicas: replicas, + Policy: dbresolver.RandomPolicy{}, + })) + if err != nil { + panic(err) + } + } + sqlDB, err := Client.DB() if err != nil { panic(err) } - sqlDB.SetMaxIdleConns(10) - sqlDB.SetMaxOpenConns(100) - sqlDB.SetConnMaxLifetime(time.Hour) + sqlDB.SetMaxIdleConns(100) + sqlDB.SetMaxOpenConns(200) + sqlDB.SetConnMaxLifetime(24 * time.Hour) + sqlDB.SetConnMaxIdleTime(time.Hour) if err := Client.Use(tracing.NewPlugin()); err != nil { panic(err) From a21dd9cc4a2ffc383dcac09e7a9a4f49ea4a57f5 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 22:18:21 +0800 Subject: [PATCH 13/19] fix: fix message conflict --- .env.docker.compose | 2 +- go.mod | 18 ++++++++----- go.sum | 37 ++++++++++++++++++-------- src/extra/tracing/otel.go | 13 ++++++--- src/services/auth/main.go | 10 +++---- src/services/comment/handler.go | 4 +-- src/services/comment/main.go | 7 +++-- src/services/event/main.go | 2 +- src/services/favorite/handler.go | 4 +-- src/services/favorite/main.go | 7 +++-- src/services/feed/handler.go | 4 +-- src/services/feed/main.go | 7 +++-- src/services/health/handler.go | 18 ------------- src/services/message/handler.go | 10 ++++--- src/services/message/main.go | 12 +++++---- src/services/msgconsumer/main.go | 6 +++-- src/services/publish/handler.go | 4 +-- src/services/publish/main.go | 7 +++-- src/services/recommend/main.go | 7 +++-- src/services/relation/main.go | 7 +++-- src/services/user/main.go | 7 +++-- src/services/videoprocessor/main.go | 2 +- src/services/videoprocessor/summary.go | 6 ++--- src/utils/consul/register.go | 6 +++-- 24 files changed, 106 insertions(+), 101 deletions(-) delete mode 100644 src/services/health/handler.go diff --git a/.env.docker.compose b/.env.docker.compose index 95b9ec5..007ef91 100644 --- a/.env.docker.compose +++ b/.env.docker.compose @@ -41,7 +41,7 @@ REDIS_PASSWORD= REDIS_DB= REDIS_MASTER= # Config Tracing EndPoint, support Jaeger -TRACING_ENDPOINT=http://jaeger:14268/api/traces +TRACING_ENDPOINT=jaeger:4318 # Optional: Config Pyroscope # Decide whether to enable the service, support : enable, disable. # If you enable this service, you must provide Pyroscope server environment diff --git a/go.mod b/go.mod index 239d698..c6099df 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prometheus/client_golang v1.16.0 github.com/pyroscope-io/client v0.7.2 + github.com/rabbitmq/amqp091-go v1.8.1 github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 github.com/redis/go-redis/v9 v9.1.0 github.com/robfig/cron/v3 v3.0.0 @@ -25,14 +26,16 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/streadway/amqp v1.1.0 github.com/stretchr/testify v1.8.4 + github.com/willf/bloom v2.0.3+incompatible github.com/zsais/go-gin-prometheus v0.1.0 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.42.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 - go.opentelemetry.io/otel v1.16.0 - go.opentelemetry.io/otel/exporters/jaeger v1.16.0 - go.opentelemetry.io/otel/sdk v1.16.0 - go.opentelemetry.io/otel/trace v1.16.0 + go.opentelemetry.io/otel v1.17.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 + go.opentelemetry.io/otel/sdk v1.17.0 + go.opentelemetry.io/otel/trace v1.17.0 golang.org/x/crypto v0.12.0 google.golang.org/grpc v1.57.0 google.golang.org/protobuf v1.31.0 @@ -70,6 +73,7 @@ require ( github.com/butuzov/mirror v1.1.0 // indirect github.com/bytedance/sonic v1.10.0 // indirect github.com/ccojocar/zxcvbn-go v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.0.0-20230608123814-4bd63c2919ab // indirect @@ -126,6 +130,7 @@ require ( github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect @@ -232,13 +237,13 @@ require ( github.com/ultraware/whitespace v0.0.5 // indirect github.com/uudashr/gocognit v1.0.7 // indirect github.com/willf/bitset v1.1.11 // indirect - github.com/willf/bloom v2.0.3+incompatible // indirect github.com/xen0n/gosmopolitan v1.2.1 // indirect github.com/yagipy/maintidx v1.0.0 // indirect github.com/yeya24/promlinter v0.2.0 // indirect github.com/ykadowak/zerologlint v0.1.3 // indirect gitlab.com/bosi/decorder v0.4.0 // indirect - go.opentelemetry.io/otel/metric v1.16.0 // indirect + go.opentelemetry.io/otel/metric v1.17.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.tmz.dev/musttag v0.7.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.25.0 // indirect @@ -252,6 +257,7 @@ require ( golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index a555e0b..fc702a4 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,8 @@ github.com/caarlos0/env/v6 v6.10.1 h1:t1mPSxNpei6M5yAeu1qtRdPAK29Nbcf/n3G7x+b3/I github.com/caarlos0/env/v6 v6.10.1/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= github.com/ccojocar/zxcvbn-go v1.0.1 h1:+sxrANSCj6CdadkcMnvde/GWU1vZiiXRbqYSCalV4/4= github.com/ccojocar/zxcvbn-go v1.0.1/go.mod h1:g1qkXtUSvHP8lhHp5GrSmTz6uWALGRMQdw6Qnz/hi60= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -258,6 +260,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -362,6 +365,8 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0 h1 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0/go.mod h1:kdXbOySqcQeTxiqglW7aahTmWZy3Pgi6SYL36yvKeyA= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 h1:o95KDiV/b1xdkumY5YbLR0/n2+wBxUpgf3HgfKgTyLI= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3/go.mod h1:hTxjzRcX49ogbTGVJ1sM5mz5s+SSgiGIyL3jjPxl32E= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/consul/api v1.24.0 h1:u2XyStA2j0jnCiVUU7Qyrt8idjRn4ORhK6DlvZ3bWhA= github.com/hashicorp/consul/api v1.24.0/go.mod h1:NZJGRFYruc/80wYowkPFCp1LbGmJC9L8izrwfyVx/Wg= github.com/hashicorp/consul/sdk v0.14.1 h1:ZiwE2bKb+zro68sWzZ1SgHF3kRMBZ94TwOCFRF4ylPs= @@ -628,6 +633,8 @@ github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 h1:TCg2WBOl github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727/go.mod h1:rlzQ04UMyJXu/aOvhd8qT+hvDrFpiwqp8MRXDY9szc0= github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 h1:M8mH9eK4OUR4lu7Gd+PU1fV2/qnDNfzT635KRSObncs= github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567/go.mod h1:DWNGW8A4Y+GyBgPuaQJuWiy0XYftx4Xm/y5Jqk9I6VQ= +github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= +github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 h1:EaDatTxkdHG+U3Bk4EUr+DZ7fOGwTfezUiUJMaIcaho= github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5/go.mod h1:fyalQWdtzDBECAQFBJuQe5bzQ02jGd5Qcbgb97Flm7U= github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb27yVE+gIAfeqp8LUCc= @@ -790,20 +797,25 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= go.opentelemetry.io/contrib/propagators/b3 v1.17.0 h1:ImOVvHnku8jijXqkwCSyYKRDt2YrnGXD4BbhcpfbfJo= -go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= -go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= -go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmEfv/0W2XBOv9aHTaA= -go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM= +go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= +go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 h1:U5GYackKpVKlPrd/5gKMlrTlP2dCESAAFU682VCpieY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0/go.mod h1:aFsJfCEnLzEu9vRRAcUiB/cpRTbVsNdF3OHSPpdjxZQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 h1:kvWMtSUNVylLVrOE4WLUmBtgziYoCIYUNSpTYtMzVJI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0/go.mod h1:SExUrRYIXhDgEKG4tkiQovd2HTaELiHUsuK08s5Nqx4= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.15.1 h1:2PunuO5SbkN5MhCbuHCd3tC6qrcaj+uDAkX/qBU5BAs= -go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= -go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= -go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= -go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= -go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= -go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= +go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= +go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= +go.opentelemetry.io/otel/sdk v1.17.0 h1:FLN2X66Ke/k5Sg3V623Q7h7nt3cHXaW1FOvKKrW0IpE= +go.opentelemetry.io/otel/sdk v1.17.0/go.mod h1:U87sE0f5vQB7hwUoW98pW5Rz4ZDuCFBZFNUBlSgmDFQ= +go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= +go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.tmz.dev/musttag v0.7.2 h1:1J6S9ipDbalBSODNT5jCep8dhZyMr4ttnjQagmGYR5s= go.tmz.dev/musttag v0.7.2/go.mod h1:m6q5NiiSKMnQYokefa2xGoyoXnrswCbJ0AWYzf4Zs28= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= @@ -1183,6 +1195,9 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 h1:lv6/DhyiFFGsmzxbsUUTOkN29II+zeWHxvT8Lpdxsv0= google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/src/extra/tracing/otel.go b/src/extra/tracing/otel.go index af7c705..c2f63a9 100644 --- a/src/extra/tracing/otel.go +++ b/src/extra/tracing/otel.go @@ -3,9 +3,11 @@ package tracing import ( "GuGoTik/src/constant/config" "GuGoTik/src/utils/logging" + "context" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/trace" @@ -16,8 +18,11 @@ import ( var Tracer trace2.Tracer func SetTraceProvider(name string) (*trace.TracerProvider, error) { - url := config.EnvCfg.TracingEndPoint - jexp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + client := otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(config.EnvCfg.TracingEndPoint), + otlptracehttp.WithInsecure(), + ) + exporter, err := otlptrace.New(context.Background(), client) if err != nil { logging.Logger.WithFields(logrus.Fields{ "err": err, @@ -26,7 +31,7 @@ func SetTraceProvider(name string) (*trace.TracerProvider, error) { } tp := trace.NewTracerProvider( - trace.WithBatcher(jexp), + trace.WithBatcher(exporter), trace.WithResource( resource.NewWithAttributes( semconv.SchemaURL, diff --git a/src/services/auth/main.go b/src/services/auth/main.go index 57110d7..986cc47 100644 --- a/src/services/auth/main.go +++ b/src/services/auth/main.go @@ -6,8 +6,6 @@ import ( "GuGoTik/src/extra/tracing" "GuGoTik/src/models" "GuGoTik/src/rpc/auth" - "GuGoTik/src/rpc/health" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/storage/database" "GuGoTik/src/storage/redis" "GuGoTik/src/utils/consul" @@ -16,15 +14,14 @@ import ( "context" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/oklog/run" - _ "github.com/prometheus/client_golang/prometheus" - _ "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" - _ "github.com/prometheus/client_golang/prometheus/promhttp" redis2 "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" "github.com/willf/bloom" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -115,9 +112,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.AuthRpcServerName, config.AuthRpcServerPort) var srv AuthServiceImpl - var probe healthImpl.ProbeImpl auth.RegisterAuthServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) srv.New() srvMetrics.InitializeMetrics(s) diff --git a/src/services/comment/handler.go b/src/services/comment/handler.go index ed50f7e..5f44767 100644 --- a/src/services/comment/handler.go +++ b/src/services/comment/handler.go @@ -17,8 +17,8 @@ import ( "encoding/json" "fmt" "github.com/go-redis/redis_rate/v10" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "go.opentelemetry.io/otel/trace" "strconv" "sync" @@ -104,7 +104,7 @@ func produceComment(ctx context.Context, event models.RecommendEvent) { headers := rabbitmq.InjectAMQPHeaders(ctx) - err = channel.Publish( + err = channel.PublishWithContext(ctx, strings.EventExchange, strings.VideoCommentEvent, false, diff --git a/src/services/comment/main.go b/src/services/comment/main.go index f11ce9f..446c60e 100644 --- a/src/services/comment/main.go +++ b/src/services/comment/main.go @@ -5,8 +5,6 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/comment" - "GuGoTik/src/rpc/health" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,6 +15,8 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -69,9 +69,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.CommentRpcServerName, config.CommentRpcServerPort) var srv CommentServiceImpl - var probe healthImpl.ProbeImpl comment.RegisterCommentServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) defer CloseMQConn() if err := consul.RegisterConsul(config.CommentRpcServerName, config.CommentRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.CommentRpcServerName, err) diff --git a/src/services/event/main.go b/src/services/event/main.go index b047e16..23dd233 100644 --- a/src/services/event/main.go +++ b/src/services/event/main.go @@ -10,8 +10,8 @@ import ( "GuGoTik/src/utils/rabbitmq" "context" "encoding/json" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "strconv" "sync" "time" diff --git a/src/services/favorite/handler.go b/src/services/favorite/handler.go index 78c7adc..e094fca 100644 --- a/src/services/favorite/handler.go +++ b/src/services/favorite/handler.go @@ -15,7 +15,7 @@ import ( "context" "encoding/json" "fmt" - "github.com/streadway/amqp" + amqp "github.com/rabbitmq/amqp091-go" "strconv" "sync" "time" @@ -93,7 +93,7 @@ func produceFavorite(ctx context.Context, event models.RecommendEvent) { headers := rabbitmq.InjectAMQPHeaders(ctx) - err = channel.Publish( + err = channel.PublishWithContext(ctx, strings.EventExchange, strings.FavoriteActionEvent, false, diff --git a/src/services/favorite/main.go b/src/services/favorite/main.go index bda348d..c0a516b 100644 --- a/src/services/favorite/main.go +++ b/src/services/favorite/main.go @@ -5,8 +5,6 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/favorite" - "GuGoTik/src/rpc/health" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -14,6 +12,8 @@ import ( grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -70,9 +70,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.FavoriteRpcServerName, config.FavoriteRpcServerPort) var srv FavoriteServiceServerImpl - var probe healthImpl.ProbeImpl favorite.RegisterFavoriteServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) defer CloseMQConn() if err := consul.RegisterConsul(config.FavoriteRpcServerName, config.FavoriteRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err) diff --git a/src/services/feed/handler.go b/src/services/feed/handler.go index e616d2e..1938b75 100644 --- a/src/services/feed/handler.go +++ b/src/services/feed/handler.go @@ -18,7 +18,7 @@ import ( "context" "encoding/json" "errors" - "github.com/streadway/amqp" + amqp "github.com/rabbitmq/amqp091-go" "gorm.io/gorm" "strconv" "sync" @@ -106,7 +106,7 @@ func produceFeed(ctx context.Context, event models.RecommendEvent) { headers := rabbitmq.InjectAMQPHeaders(ctx) - err = channel.Publish( + err = channel.PublishWithContext(ctx, strings.EventExchange, strings.VideoGetEvent, false, diff --git a/src/services/feed/main.go b/src/services/feed/main.go index 6ef19cc..fbaabf5 100644 --- a/src/services/feed/main.go +++ b/src/services/feed/main.go @@ -5,8 +5,6 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/feed" - "GuGoTik/src/rpc/health" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,6 +15,8 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -69,9 +69,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.FeedRpcServerName, config.FeedRpcServerPort) var srv FeedServiceImpl - var probe healthImpl.ProbeImpl feed.RegisterFeedServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) defer CloseMQConn() if err := consul.RegisterConsul(config.FeedRpcServerName, config.FeedRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.FeedRpcServerName, err) diff --git a/src/services/health/handler.go b/src/services/health/handler.go deleted file mode 100644 index 02f791c..0000000 --- a/src/services/health/handler.go +++ /dev/null @@ -1,18 +0,0 @@ -package health - -import ( - "GuGoTik/src/rpc/health" - "context" -) - -type ProbeImpl struct { - health.HealthServer -} - -func (h ProbeImpl) Check(context.Context, *health.HealthCheckRequest) (*health.HealthCheckResponse, error) { - return &health.HealthCheckResponse{Status: health.HealthCheckResponse_SERVING}, nil -} - -func (h ProbeImpl) Watch(*health.HealthCheckRequest, health.Health_WatchServer) error { - return nil -} diff --git a/src/services/message/handler.go b/src/services/message/handler.go index 5d7601b..f1fc2a7 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -23,8 +23,8 @@ import ( "time" "github.com/go-redis/redis_rate/v10" + amqp "github.com/rabbitmq/amqp091-go" "github.com/robfig/cron/v3" - "github.com/streadway/amqp" "gorm.io/gorm" "github.com/sirupsen/logrus" @@ -48,8 +48,9 @@ var channel *amqp.Channel func failOnError(err error, msg string) { //打日志 - logging.Logger.Errorf("err %s", msg) - + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf(msg) } func (c MessageServiceImpl) New() { @@ -393,7 +394,8 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context headers := rabbitmq.InjectAMQPHeaders(ctx) if message.ToUserId == config.EnvCfg.MagicUserId { - err = channel.Publish("", strings.MessageGptActionEvent, false, false, + err = channel.PublishWithContext(ctx, + "", strings.MessageGptActionEvent, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", diff --git a/src/services/message/main.go b/src/services/message/main.go index 1337bb7..da4f3c8 100644 --- a/src/services/message/main.go +++ b/src/services/message/main.go @@ -5,12 +5,15 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/rpc/chat" - "GuGoTik/src/rpc/health" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -59,7 +62,7 @@ func main() { reg := prom.Client reg.MustRegister(srvMetrics) - + defer CloseMQConn() s := grpc.NewServer( grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), @@ -72,9 +75,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort) var srv MessageServiceImpl - var probe healthImpl.ProbeImpl chat.RegisterChatServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) defer CloseMQConn() srv.New() srvMetrics.InitializeMetrics(s) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index df32d17..bc40e25 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -15,6 +15,7 @@ import ( url2 "net/url" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" "github.com/streadway/amqp" @@ -23,8 +24,9 @@ import ( func failOnError(err error, msg string) { //打日志 - logging.Logger.Errorf("err %s", msg) - + logging.Logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf(msg) } var delayTime = int32(2 * 60 * 1000) //2 minutes diff --git a/src/services/publish/handler.go b/src/services/publish/handler.go index 80c8bbc..d2fd6b6 100644 --- a/src/services/publish/handler.go +++ b/src/services/publish/handler.go @@ -19,8 +19,8 @@ import ( "encoding/json" "fmt" "github.com/go-redis/redis_rate/v10" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "math/rand" "net/http" "time" @@ -316,7 +316,7 @@ func (a PublishServiceImpl) CreateVideo(ctx context.Context, request *publish.Cr routingKeys := []string{strings.VideoPicker, strings.VideoSummary} for _, key := range routingKeys { // Send raw video to all queues bound the exchange - err = channel.Publish(strings.VideoExchange, key, false, false, + err = channel.PublishWithContext(ctx, strings.VideoExchange, key, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", diff --git a/src/services/publish/main.go b/src/services/publish/main.go index d8a9ab9..21e98bd 100644 --- a/src/services/publish/main.go +++ b/src/services/publish/main.go @@ -4,9 +4,7 @@ import ( "GuGoTik/src/constant/config" "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" - "GuGoTik/src/rpc/health" "GuGoTik/src/rpc/publish" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,6 +15,8 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -69,9 +69,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort) var srv PublishServiceImpl - var probe healthImpl.ProbeImpl publish.RegisterPublishServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) defer CloseMQConn() if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err) diff --git a/src/services/recommend/main.go b/src/services/recommend/main.go index 1311824..5d4ed62 100644 --- a/src/services/recommend/main.go +++ b/src/services/recommend/main.go @@ -4,9 +4,7 @@ import ( "GuGoTik/src/constant/config" "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" - "GuGoTik/src/rpc/health" "GuGoTik/src/rpc/recommend" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,6 +15,8 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -70,9 +70,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.RecommendRpcServiceName, config.RecommendRpcServicePort) var srv RecommendServiceImpl - var probe healthImpl.ProbeImpl recommend.RegisterRecommendServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) srv.New() srvMetrics.InitializeMetrics(s) diff --git a/src/services/relation/main.go b/src/services/relation/main.go index b761256..0951e62 100644 --- a/src/services/relation/main.go +++ b/src/services/relation/main.go @@ -4,9 +4,7 @@ import ( "GuGoTik/src/constant/config" "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" - "GuGoTik/src/rpc/health" "GuGoTik/src/rpc/relation" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" "GuGoTik/src/utils/prom" @@ -17,6 +15,8 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "net" "net/http" "os" @@ -70,9 +70,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.RelationRpcServerName, config.RelationRpcServerPort) var srv RelationServiceImpl - var probe healthImpl.ProbeImpl relation.RegisterRelationServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) srv.New() srvMetrics.InitializeMetrics(s) diff --git a/src/services/user/main.go b/src/services/user/main.go index 0ce697e..d4548ee 100644 --- a/src/services/user/main.go +++ b/src/services/user/main.go @@ -5,9 +5,7 @@ import ( "GuGoTik/src/extra/profiling" "GuGoTik/src/extra/tracing" "GuGoTik/src/models" - "GuGoTik/src/rpc/health" "GuGoTik/src/rpc/user" - healthImpl "GuGoTik/src/services/health" "GuGoTik/src/storage/database" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" @@ -19,6 +17,8 @@ import ( "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "gorm.io/gorm/clause" "net" "net/http" @@ -73,9 +73,8 @@ func main() { log.Infof("Rpc %s is running at %s now", config.UserRpcServerName, config.UserRpcServerPort) var srv UserServiceImpl - var probe healthImpl.ProbeImpl user.RegisterUserServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) + grpc_health_v1.RegisterHealthServer(s, health.NewServer()) srv.New() createMagicUser() srvMetrics.InitializeMetrics(s) diff --git a/src/services/videoprocessor/main.go b/src/services/videoprocessor/main.go index 739de86..d271a6d 100644 --- a/src/services/videoprocessor/main.go +++ b/src/services/videoprocessor/main.go @@ -14,8 +14,8 @@ import ( "context" "encoding/json" "github.com/golang/freetype/truetype" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "gorm.io/gorm/clause" "image" "image/color" diff --git a/src/services/videoprocessor/summary.go b/src/services/videoprocessor/summary.go index 4a069c9..ccac091 100644 --- a/src/services/videoprocessor/summary.go +++ b/src/services/videoprocessor/summary.go @@ -17,9 +17,9 @@ import ( "context" "encoding/json" "errors" + amqp "github.com/rabbitmq/amqp091-go" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "go.opentelemetry.io/otel/trace" "gorm.io/gorm/clause" "net/http" @@ -105,7 +105,7 @@ func produceKeywords(ctx context.Context, event models.RecommendEvent) { return } - err = channel.Publish( + err = channel.PublishWithContext(ctx, strings2.EventExchange, strings2.VideoPublishEvent, false, @@ -165,7 +165,7 @@ func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger * logger.Debugf("Retrying %d times", curRetry) - err = channel.Publish( + err = channel.PublishWithContext(context.Background(), strings2.VideoExchange, strings2.VideoSummary, false, diff --git a/src/utils/consul/register.go b/src/utils/consul/register.go index f0eeab2..132cc77 100644 --- a/src/utils/consul/register.go +++ b/src/utils/consul/register.go @@ -4,6 +4,7 @@ import ( "GuGoTik/src/constant/config" "GuGoTik/src/utils/logging" "fmt" + "github.com/google/uuid" capi "github.com/hashicorp/consul/api" log "github.com/sirupsen/logrus" "strconv" @@ -34,14 +35,15 @@ func RegisterConsul(name string, port string) error { return err } reg := &capi.AgentServiceRegistration{ - ID: fmt.Sprintf("%s-1", name), + ID: fmt.Sprintf("%s-%s", name, uuid.New().String()[:5]), Name: name, Address: config.EnvCfg.PodIpAddr, Port: parsedPort, Check: &capi.AgentServiceCheck{ Interval: "5s", Timeout: "5s", - GRPC: fmt.Sprintf("%s:%d/Heath", config.EnvCfg.PodIpAddr, parsedPort), + GRPC: fmt.Sprintf("%s:%d", config.EnvCfg.PodIpAddr, parsedPort), + GRPCUseTLS: false, DeregisterCriticalServiceAfter: "30s", }, } From fe0c210fdafbf38afccfb07be01f04803a700e3b Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 16:29:32 +0800 Subject: [PATCH 14/19] fix: max size --- docker-compose.yaml | 6 ++++++ src/services/publish/main.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index 0893683..cdcb153 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -96,6 +96,12 @@ services: - GF_AUTH_DISABLE_LOGIN_FORM=true ports: - "3000:3000" + influxdb: + container_name: "GuGoTik-Extra-InfluxDB" + image: tutum/influxdb + ports: + - "8083:8083" + - "8011:8086" rabbitmq: container_name: "GuGoTik-Extra-RabbitMQ" image: epicmo/rabbitmq-manager:1.0 diff --git a/src/services/publish/main.go b/src/services/publish/main.go index 21e98bd..408c62b 100644 --- a/src/services/publish/main.go +++ b/src/services/publish/main.go @@ -56,8 +56,11 @@ func main() { ) reg := prom.Client reg.MustRegister(srvMetrics) + maxSize := 500 * 1024 * 1024 s := grpc.NewServer( + grpc.MaxRecvMsgSize(maxSize), + grpc.MaxSendMsgSize(maxSize), grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), From 525e02af4490143262643f9fdc004adbcc27da4c Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 17:38:11 +0800 Subject: [PATCH 15/19] fix: ut --- .env.docker.compose | 2 +- promethus.docker.compose.yml | 15 +++++++++++---- src/services/auth/handler.go | 4 ++++ test/k6/login.js | 18 ++++-------------- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/.env.docker.compose b/.env.docker.compose index 007ef91..8dfe655 100644 --- a/.env.docker.compose +++ b/.env.docker.compose @@ -47,7 +47,7 @@ TRACING_ENDPOINT=jaeger:4318 # If you enable this service, you must provide Pyroscope server environment # This profiling is ONLY designed for DEBUGGING # SO, PLEASE DO NOT ENABLE THIS SERVICE IN YOUR PRODUCTION ENVIRONMENT, OR IT MAY TAKE MUCH RUNTIME COST. -PYROSCOPE_STATE=disable +PYROSCOPE_STATE=enable PYROSCOPE_ADDR=http://pyroscope:4040/ # Configure RabbitMQ # Optional: `RABBITMQ_VHOST_PREFIX`: If you provide this config, the service will use value as the rabbit mq vhost prefix. diff --git a/promethus.docker.compose.yml b/promethus.docker.compose.yml index 756e172..54d5730 100644 --- a/promethus.docker.compose.yml +++ b/promethus.docker.compose.yml @@ -24,7 +24,14 @@ scrape_configs: # metrics_path defaults to '/metrics' # scheme defaults to 'http'. static_configs: - - targets: ["localhost:9090"] - - targets: ["gateway:37000"] - - targets: ["auth:37099"] - - targets: ["user:37099"] \ No newline at end of file + - targets: [ "localhost:9090" ] + - targets: [ "gateway:37000" ] + - targets: [ "auth:37099" ] + - targets: [ "user:37099" ] + - targets: [ "comment:37099" ] + - targets: [ "favorite:37099" ] + - targets: [ "feed:37099" ] + - targets: [ "message:37099" ] + - targets: [ "publish:37099" ] + - targets: [ "recommend:37099" ] + - targets: [ "relation:37099" ] \ No newline at end of file diff --git a/src/services/auth/handler.go b/src/services/auth/handler.go index 14841e4..90ea0ae 100644 --- a/src/services/auth/handler.go +++ b/src/services/auth/handler.go @@ -376,6 +376,10 @@ func (a AuthServiceImpl) Login(ctx context.Context, request *auth.LoginRequest) return } + logger.WithFields(logrus.Fields{ + "token": token, + "userId": user.ID, + }).Infof("User log in sucess !") resp = &auth.LoginResponse{ StatusCode: strings.ServiceOKCode, StatusMsg: strings.ServiceOK, diff --git a/test/k6/login.js b/test/k6/login.js index a48ed8c..bf22701 100644 --- a/test/k6/login.js +++ b/test/k6/login.js @@ -2,33 +2,23 @@ import { sleep } from 'k6' import http from 'k6/http' export const options = { - ext: { - loadimpact: { - distribution: { 'amazon:us:ashburn': { loadZone: 'amazon:us:ashburn', percent: 100 } }, - apm: [], - }, - }, - thresholds: {}, scenarios: { Scenario_1: { executor: 'ramping-vus', gracefulStop: '30s', stages: [ - { target: 200, duration: '1s' }, { target: 100, duration: '15s' }, { target: 200, duration: '30s' }, { target: 100, duration: '15s' }, ], gracefulRampDown: '30s', - exec: 'scenario_1', + exec: 'login', }, }, } -export function scenario_1() { - // LoginTest - http.post('http://127.0.0.1:37000/douyin/user/login?username=epicmo&password=epicmo') +export function login() { + http.post('https://gugotik.endymx.qzwxsaedc.cn/douyin/user/login?username=epicmo&password=epicmo') - // Automatically added sleep - sleep(1) + sleep(3) } From 0cb92d045a7c07f6bb01364ff18617da96831963 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 18:06:26 +0800 Subject: [PATCH 16/19] fix: add login log --- src/services/auth/handler.go | 6 ++++++ test/k6/login.js | 8 ++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/services/auth/handler.go b/src/services/auth/handler.go index 90ea0ae..9b5d601 100644 --- a/src/services/auth/handler.go +++ b/src/services/auth/handler.go @@ -406,6 +406,12 @@ func checkPasswordHash(ctx context.Context, password, hash string) bool { } func getToken(ctx context.Context, userId uint32) (string, error) { + span := trace.SpanFromContext(ctx) + logging.SetSpanWithHostname(span) + logger := logging.LogService("AuthService.Login").WithContext(ctx) + logger.WithFields(logrus.Fields{ + "userId": userId, + }).Infof("Select for user token") return cached.GetWithFunc(ctx, "U2T"+strconv.FormatUint(uint64(userId), 10), func(ctx context.Context, key string) (string, error) { span := trace.SpanFromContext(ctx) diff --git a/test/k6/login.js b/test/k6/login.js index bf22701..a72f6fe 100644 --- a/test/k6/login.js +++ b/test/k6/login.js @@ -7,9 +7,9 @@ export const options = { executor: 'ramping-vus', gracefulStop: '30s', stages: [ - { target: 100, duration: '15s' }, - { target: 200, duration: '30s' }, - { target: 100, duration: '15s' }, + { target: 1000, duration: '15s' }, + { target: 2000, duration: '30s' }, + { target: 1000, duration: '15s' }, ], gracefulRampDown: '30s', exec: 'login', @@ -18,7 +18,7 @@ export const options = { } export function login() { - http.post('https://gugotik.endymx.qzwxsaedc.cn/douyin/user/login?username=epicmo&password=epicmo') + http.post('http://127.0.0.1:37000/douyin/user/login?username=epicmo&password=epicmo') sleep(3) } From 8e01de730cd1048817013082033ded57652a7b4f Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 18:49:10 +0800 Subject: [PATCH 17/19] adjust: adjust res --- manifests-endymx/deployment-auth-service.yaml | 8 ++++---- manifests-endymx/deployment-comment-service.yaml | 8 ++++---- manifests-endymx/deployment-event-service.yaml | 8 ++++---- manifests-endymx/deployment-favorite-service.yaml | 8 ++++---- manifests-endymx/deployment-feed-service.yaml | 8 ++++---- manifests-endymx/deployment-http-service.yaml | 8 ++++---- manifests-endymx/deployment-message-service.yaml | 8 ++++---- manifests-endymx/deployment-msg-consumer-service.yaml | 8 ++++---- manifests-endymx/deployment-publish-service.yaml | 6 +++--- manifests-endymx/deployment-recommend-service.yaml | 8 ++++---- manifests-endymx/deployment-relation-service.yaml | 4 ++-- manifests-endymx/deployment-user-service.yaml | 8 ++++---- src/web/main.go | 2 +- test/k6/login.js | 2 +- 14 files changed, 47 insertions(+), 47 deletions(-) diff --git a/manifests-endymx/deployment-auth-service.yaml b/manifests-endymx/deployment-auth-service.yaml index ab1c6ad..e70f22b 100644 --- a/manifests-endymx/deployment-auth-service.yaml +++ b/manifests-endymx/deployment-auth-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-comment-service.yaml b/manifests-endymx/deployment-comment-service.yaml index bc87b16..e18485b 100644 --- a/manifests-endymx/deployment-comment-service.yaml +++ b/manifests-endymx/deployment-comment-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-event-service.yaml b/manifests-endymx/deployment-event-service.yaml index 3139809..17eae40 100644 --- a/manifests-endymx/deployment-event-service.yaml +++ b/manifests-endymx/deployment-event-service.yaml @@ -42,9 +42,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-favorite-service.yaml b/manifests-endymx/deployment-favorite-service.yaml index 23fe4c9..efdc42a 100644 --- a/manifests-endymx/deployment-favorite-service.yaml +++ b/manifests-endymx/deployment-favorite-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-feed-service.yaml b/manifests-endymx/deployment-feed-service.yaml index ec08f0c..edc7753 100644 --- a/manifests-endymx/deployment-feed-service.yaml +++ b/manifests-endymx/deployment-feed-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-http-service.yaml b/manifests-endymx/deployment-http-service.yaml index f3227a4..39d18ba 100644 --- a/manifests-endymx/deployment-http-service.yaml +++ b/manifests-endymx/deployment-http-service.yaml @@ -42,9 +42,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 256Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-message-service.yaml b/manifests-endymx/deployment-message-service.yaml index b54d8a2..c71380d 100644 --- a/manifests-endymx/deployment-message-service.yaml +++ b/manifests-endymx/deployment-message-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-msg-consumer-service.yaml b/manifests-endymx/deployment-msg-consumer-service.yaml index a4bd912..fb11098 100644 --- a/manifests-endymx/deployment-msg-consumer-service.yaml +++ b/manifests-endymx/deployment-msg-consumer-service.yaml @@ -42,9 +42,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-publish-service.yaml b/manifests-endymx/deployment-publish-service.yaml index 4677daa..ec36004 100644 --- a/manifests-endymx/deployment-publish-service.yaml +++ b/manifests-endymx/deployment-publish-service.yaml @@ -49,10 +49,10 @@ spec: protocol: TCP resources: limits: - cpu: 1000m - memory: 1Gi + cpu: 2000m + memory: 1024Mi requests: - cpu: 500m + cpu: 1000m memory: 512Mi volumeMounts: - mountPath: /data/apps/gugotik-service-bundle/data diff --git a/manifests-endymx/deployment-recommend-service.yaml b/manifests-endymx/deployment-recommend-service.yaml index 5d2230f..8b6c4e9 100644 --- a/manifests-endymx/deployment-recommend-service.yaml +++ b/manifests-endymx/deployment-recommend-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-relation-service.yaml b/manifests-endymx/deployment-relation-service.yaml index 593f822..378b29d 100644 --- a/manifests-endymx/deployment-relation-service.yaml +++ b/manifests-endymx/deployment-relation-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m + cpu: 2000m memory: 1024Mi requests: - cpu: 100m + cpu: 1000m memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-user-service.yaml b/manifests-endymx/deployment-user-service.yaml index a49aa95..6fb4c76 100644 --- a/manifests-endymx/deployment-user-service.yaml +++ b/manifests-endymx/deployment-user-service.yaml @@ -45,9 +45,9 @@ spec: protocol: TCP resources: limits: - cpu: 500m - memory: 512Mi + cpu: 2000m + memory: 1024Mi requests: - cpu: 100m - memory: 128Mi + cpu: 1000m + memory: 512Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/src/web/main.go b/src/web/main.go index d818a38..d47ca61 100644 --- a/src/web/main.go +++ b/src/web/main.go @@ -51,7 +51,7 @@ func main() { // Configure Tracing g.Use(otelgin.Middleware(config.WebServiceName)) g.Use(middleware.TokenAuthMiddleware()) - g.Use(middleware.RateLimiterMiddleWare(time.Second, 100, 100)) + g.Use(middleware.RateLimiterMiddleWare(time.Second, 10000, 10000)) // Configure Pyroscope profiling.InitPyroscope("GuGoTik.GateWay") diff --git a/test/k6/login.js b/test/k6/login.js index a72f6fe..8db3b7c 100644 --- a/test/k6/login.js +++ b/test/k6/login.js @@ -18,7 +18,7 @@ export const options = { } export function login() { - http.post('http://127.0.0.1:37000/douyin/user/login?username=epicmo&password=epicmo') + http.post('http://localhost:37000/douyin/user/login?username=epicmo&password=epicmo') sleep(3) } From 61b9d32d5c33c7b446745845ac1f476f7a61b1ce Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 18:54:11 +0800 Subject: [PATCH 18/19] adjust: adjust res --- manifests-endymx/deployment-auth-service.yaml | 4 ++-- manifests-endymx/deployment-comment-service.yaml | 4 ++-- manifests-endymx/deployment-event-service.yaml | 4 ++-- manifests-endymx/deployment-favorite-service.yaml | 4 ++-- manifests-endymx/deployment-feed-service.yaml | 4 ++-- manifests-endymx/deployment-http-service.yaml | 4 ++-- manifests-endymx/deployment-message-service.yaml | 4 ++-- manifests-endymx/deployment-msg-consumer-service.yaml | 4 ++-- manifests-endymx/deployment-publish-service.yaml | 4 ++-- manifests-endymx/deployment-recommend-service.yaml | 4 ++-- manifests-endymx/deployment-relation-service.yaml | 4 ++-- manifests-endymx/deployment-user-service.yaml | 4 ++-- manifests-endymx/deployment-video-processor-service.yaml | 4 ++-- src/web/main.go | 4 +--- 14 files changed, 27 insertions(+), 29 deletions(-) diff --git a/manifests-endymx/deployment-auth-service.yaml b/manifests-endymx/deployment-auth-service.yaml index e70f22b..060cd65 100644 --- a/manifests-endymx/deployment-auth-service.yaml +++ b/manifests-endymx/deployment-auth-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-comment-service.yaml b/manifests-endymx/deployment-comment-service.yaml index e18485b..f76e7c8 100644 --- a/manifests-endymx/deployment-comment-service.yaml +++ b/manifests-endymx/deployment-comment-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-event-service.yaml b/manifests-endymx/deployment-event-service.yaml index 17eae40..3bf4f7f 100644 --- a/manifests-endymx/deployment-event-service.yaml +++ b/manifests-endymx/deployment-event-service.yaml @@ -43,8 +43,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-favorite-service.yaml b/manifests-endymx/deployment-favorite-service.yaml index efdc42a..63cbe54 100644 --- a/manifests-endymx/deployment-favorite-service.yaml +++ b/manifests-endymx/deployment-favorite-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-feed-service.yaml b/manifests-endymx/deployment-feed-service.yaml index edc7753..497d053 100644 --- a/manifests-endymx/deployment-feed-service.yaml +++ b/manifests-endymx/deployment-feed-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-http-service.yaml b/manifests-endymx/deployment-http-service.yaml index 39d18ba..b12e237 100644 --- a/manifests-endymx/deployment-http-service.yaml +++ b/manifests-endymx/deployment-http-service.yaml @@ -43,8 +43,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-message-service.yaml b/manifests-endymx/deployment-message-service.yaml index c71380d..0e6c8f1 100644 --- a/manifests-endymx/deployment-message-service.yaml +++ b/manifests-endymx/deployment-message-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-msg-consumer-service.yaml b/manifests-endymx/deployment-msg-consumer-service.yaml index fb11098..df5e102 100644 --- a/manifests-endymx/deployment-msg-consumer-service.yaml +++ b/manifests-endymx/deployment-msg-consumer-service.yaml @@ -43,8 +43,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-publish-service.yaml b/manifests-endymx/deployment-publish-service.yaml index ec36004..8a1154a 100644 --- a/manifests-endymx/deployment-publish-service.yaml +++ b/manifests-endymx/deployment-publish-service.yaml @@ -50,10 +50,10 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi volumeMounts: - mountPath: /data/apps/gugotik-service-bundle/data name: volume diff --git a/manifests-endymx/deployment-recommend-service.yaml b/manifests-endymx/deployment-recommend-service.yaml index 8b6c4e9..c97b112 100644 --- a/manifests-endymx/deployment-recommend-service.yaml +++ b/manifests-endymx/deployment-recommend-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-relation-service.yaml b/manifests-endymx/deployment-relation-service.yaml index 378b29d..1bcd94f 100644 --- a/manifests-endymx/deployment-relation-service.yaml +++ b/manifests-endymx/deployment-relation-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-user-service.yaml b/manifests-endymx/deployment-user-service.yaml index 6fb4c76..93570d9 100644 --- a/manifests-endymx/deployment-user-service.yaml +++ b/manifests-endymx/deployment-user-service.yaml @@ -46,8 +46,8 @@ spec: resources: limits: cpu: 2000m - memory: 1024Mi + memory: 2048Mi requests: cpu: 1000m - memory: 512Mi + memory: 1024Mi terminationGracePeriodSeconds: 30 \ No newline at end of file diff --git a/manifests-endymx/deployment-video-processor-service.yaml b/manifests-endymx/deployment-video-processor-service.yaml index 1c85d63..163af7a 100644 --- a/manifests-endymx/deployment-video-processor-service.yaml +++ b/manifests-endymx/deployment-video-processor-service.yaml @@ -46,10 +46,10 @@ spec: protocol: TCP resources: limits: - cpu: 4000m + cpu: 8000m memory: 8Gi requests: - cpu: 2000m + cpu: 4000m memory: 4Gi volumeMounts: - mountPath: /data/apps/gugotik-service-bundle/data diff --git a/src/web/main.go b/src/web/main.go index d47ca61..74128d0 100644 --- a/src/web/main.go +++ b/src/web/main.go @@ -16,8 +16,6 @@ import ( relation2 "GuGoTik/src/web/relation" user2 "GuGoTik/src/web/user" "context" - "time" - "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -51,7 +49,7 @@ func main() { // Configure Tracing g.Use(otelgin.Middleware(config.WebServiceName)) g.Use(middleware.TokenAuthMiddleware()) - g.Use(middleware.RateLimiterMiddleWare(time.Second, 10000, 10000)) + //g.Use(middleware.RateLimiterMiddleWare(time.Second, 10000, 10000)) // Configure Pyroscope profiling.InitPyroscope("GuGoTik.GateWay") From 89ca60aee5a9ee1edc0d5f88ee1944a42b6b25c3 Mon Sep 17 00:00:00 2001 From: EpicMo <1982742309@qq.com> Date: Thu, 31 Aug 2023 22:21:32 +0800 Subject: [PATCH 19/19] fix: fix conflict --- go.mod | 1 - src/services/message/handler.go | 2 +- src/services/message/main.go | 4 ---- src/services/msgconsumer/main.go | 3 +-- 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index c6099df..66cec14 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/robfig/cron/v3 v3.0.0 github.com/sashabaranov/go-openai v1.14.2 github.com/sirupsen/logrus v1.9.3 - github.com/streadway/amqp v1.1.0 github.com/stretchr/testify v1.8.4 github.com/willf/bloom v2.0.3+incompatible github.com/zsais/go-gin-prometheus v0.1.0 diff --git a/src/services/message/handler.go b/src/services/message/handler.go index f1fc2a7..cd042c5 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -405,7 +405,7 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context } else { - err = channel.Publish("", strings.MessageActionEvent, false, false, + err = channel.PublishWithContext(ctx, "", strings.MessageActionEvent, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", diff --git a/src/services/message/main.go b/src/services/message/main.go index da4f3c8..ab6d5db 100644 --- a/src/services/message/main.go +++ b/src/services/message/main.go @@ -19,10 +19,6 @@ import ( "os" "syscall" - grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" - "github.com/oklog/run" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index bc40e25..785a6f9 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -18,7 +18,6 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" - "github.com/streadway/amqp" "go.opentelemetry.io/otel/trace" ) @@ -393,7 +392,7 @@ func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger * logger.Debugf("Retrying %d times", curRetry) - err = channel.Publish( + err = channel.PublishWithContext(context.Background(), strings.MessageExchange, strings.MessageGptActionEvent, false,