Skip to content

Commit

Permalink
feat: 完成recommend-mq (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lansongxx authored Feb 9, 2024
1 parent 3869460 commit 46c1022
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 5 deletions.
4 changes: 4 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ type Config struct {
CreateNotificationsConf kq.KqConf
UpdateNotificationsConf kq.KqConf
DeleteNotificationsConf kq.KqConf
CreateItemsConf kq.KqConf
UpdateItemConf kq.KqConf
DeleteItemConf kq.KqConf
CreateFeedBacksConf kq.KqConf
}
4 changes: 4 additions & 0 deletions app/listen/kqMqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,9 @@ func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext)
kq.MustNewQueue(c.CreateNotificationsConf, mq.NewCreateNotificationsMq(ctx, svcContext)),
kq.MustNewQueue(c.DeleteNotificationsConf, mq.NewDeleteNotificationsMq(ctx, svcContext)),
kq.MustNewQueue(c.UpdateNotificationsConf, mq.NewUpdateNotificationsMq(ctx, svcContext)),
kq.MustNewQueue(c.CreateItemsConf, mq.NewCreateItemsMq(ctx, svcContext)),
kq.MustNewQueue(c.UpdateItemConf, mq.NewUpdateItemMq(ctx, svcContext)),
kq.MustNewQueue(c.DeleteItemConf, mq.NewDeleteItemMq(ctx, svcContext)),
kq.MustNewQueue(c.CreateFeedBacksConf, mq.NewCreateFeedBacksMq(ctx, svcContext)),
}
}
77 changes: 77 additions & 0 deletions app/mq/createFeedBacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package mq

import (
"context"
"github.com/CloudStriver/cloudmind-mq/app/svc"
"github.com/CloudStriver/cloudmind-mq/app/util/heap"
"github.com/CloudStriver/cloudmind-mq/app/util/message"
"github.com/CloudStriver/go-pkg/utils/pconvertor"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/content"
"github.com/bytedance/sonic"
"github.com/samber/lo"
"github.com/zeromicro/go-zero/core/logx"
)

type CreateFeedBacksMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
msgChan []chan []*message.CreateFeedBacksMessage
Heap *heap.PairHeap
}

func NewCreateFeedBacksMq(ctx context.Context, svcCtx *svc.ServiceContext) *CreateFeedBacksMq {
CreateFeedBacksMq := &CreateFeedBacksMq{
ctx: ctx,
svcCtx: svcCtx,
Heap: &heap.PairHeap{},
}
for i := 0; i < 10; i++ {
CreateFeedBacksMq.Heap.Push(heap.Pair{
First: 0,
Second: i,
})
}
CreateFeedBacksMq.msgChan = make([]chan []*message.CreateFeedBacksMessage, chanCount)
for i := 0; i < chanCount; i++ {
ch := make(chan []*message.CreateFeedBacksMessage, bufferCount)
CreateFeedBacksMq.msgChan[i] = ch
go CreateFeedBacksMq.consume(ch)
}

return CreateFeedBacksMq
}

func (l *CreateFeedBacksMq) Consume(_, value string) error {
var msg []*message.CreateFeedBacksMessage
if err := sonic.Unmarshal(pconvertor.String2Bytes(value), &msg); err != nil {
logx.Errorf("CreateFeedBacksMq->Consume Unmarshal err : %v , val : %s", err, value)
return err
}

num := l.Heap.Pop()
l.msgChan[num.Second] <- msg
num.First += len(msg)
l.Heap.Push(num)
return nil
}

func (l *CreateFeedBacksMq) consume(ch chan []*message.CreateFeedBacksMessage) {
for {
msg, ok := <-ch
if !ok {
logx.Errorf("CreateFeedBacksMq->consume err : %v", ok)
return
}

FeedBacks := lo.Map[*message.CreateFeedBacksMessage, *content.FeedBack](msg, func(m *message.CreateFeedBacksMessage, _ int) *content.FeedBack {
return m.FeedBack
})
if _, err := l.svcCtx.CloudMindContentRPC.CreateFeedBacks(l.ctx, &content.CreateFeedBacksReq{
FeedBacks: FeedBacks,
}); err != nil {
logx.Errorf("CreateFeedBacksMq->consume err : %v", err)
} else {
logx.Infof("CreateFeedBacksMq->consume message : %v", msg)
}
}
}
77 changes: 77 additions & 0 deletions app/mq/createItems.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package mq

import (
"context"
"github.com/CloudStriver/cloudmind-mq/app/svc"
"github.com/CloudStriver/cloudmind-mq/app/util/heap"
"github.com/CloudStriver/cloudmind-mq/app/util/message"
"github.com/CloudStriver/go-pkg/utils/pconvertor"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/content"
"github.com/bytedance/sonic"
"github.com/samber/lo"
"github.com/zeromicro/go-zero/core/logx"
)

type CreateItemsMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
msgChan []chan []*message.CreateItemsMessage
Heap *heap.PairHeap
}

func NewCreateItemsMq(ctx context.Context, svcCtx *svc.ServiceContext) *CreateItemsMq {
CreateItemsMq := &CreateItemsMq{
ctx: ctx,
svcCtx: svcCtx,
Heap: &heap.PairHeap{},
}
for i := 0; i < 10; i++ {
CreateItemsMq.Heap.Push(heap.Pair{
First: 0,
Second: i,
})
}
CreateItemsMq.msgChan = make([]chan []*message.CreateItemsMessage, chanCount)
for i := 0; i < chanCount; i++ {
ch := make(chan []*message.CreateItemsMessage, bufferCount)
CreateItemsMq.msgChan[i] = ch
go CreateItemsMq.consume(ch)
}

return CreateItemsMq
}

func (l *CreateItemsMq) Consume(_, value string) error {
var msg []*message.CreateItemsMessage
if err := sonic.Unmarshal(pconvertor.String2Bytes(value), &msg); err != nil {
logx.Errorf("CreateItemsMq->Consume Unmarshal err : %v , val : %s", err, value)
return err
}

num := l.Heap.Pop()
l.msgChan[num.Second] <- msg
num.First += len(msg)
l.Heap.Push(num)
return nil
}

func (l *CreateItemsMq) consume(ch chan []*message.CreateItemsMessage) {
for {
msg, ok := <-ch
if !ok {
logx.Errorf("CreateItemsMq->consume err : %v", ok)
return
}

Items := lo.Map[*message.CreateItemsMessage, *content.Item](msg, func(m *message.CreateItemsMessage, _ int) *content.Item {
return m.Item
})
if _, err := l.svcCtx.CloudMindContentRPC.CreateItems(l.ctx, &content.CreateItemsReq{
Items: Items,
}); err != nil {
logx.Errorf("CreateItemsMq->consume err : %v", err)
} else {
logx.Infof("CreateItemsMq->consume message : %v", msg)
}
}
}
74 changes: 74 additions & 0 deletions app/mq/deleteItem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package mq

import (
"github.com/CloudStriver/cloudmind-mq/app/svc"
"github.com/CloudStriver/cloudmind-mq/app/util/heap"
"github.com/CloudStriver/cloudmind-mq/app/util/message"
"github.com/CloudStriver/go-pkg/utils/pconvertor"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/content"
"github.com/bytedance/sonic"
"github.com/zeromicro/go-zero/core/logx"
"golang.org/x/net/context"
)

type DeleteItemMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
msgChan []chan *message.DeleteItemMessage
Heap *heap.PairHeap
}

func NewDeleteItemMq(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteItemMq {
DeleteItemMq := &DeleteItemMq{
ctx: ctx,
svcCtx: svcCtx,
Heap: &heap.PairHeap{},
}
for i := 0; i < 10; i++ {
DeleteItemMq.Heap.Push(heap.Pair{
First: 0,
Second: i,
})
}
DeleteItemMq.msgChan = make([]chan *message.DeleteItemMessage, chanCount)
for i := 0; i < chanCount; i++ {
ch := make(chan *message.DeleteItemMessage, bufferCount)
DeleteItemMq.msgChan[i] = ch
go DeleteItemMq.consume(ch)
}

return DeleteItemMq
}

func (l *DeleteItemMq) Consume(_, value string) error {
var msg []message.DeleteItemMessage
if err := sonic.Unmarshal(pconvertor.String2Bytes(value), &msg); err != nil {
logx.Errorf("DeleteItemMq->Consume Unmarshal err : %v , val : %s", err, value)
return err
}

for _, d := range msg {
num := l.Heap.Pop()
l.msgChan[num.Second] <- &d
num.First++
l.Heap.Push(num)
}
return nil
}

func (l *DeleteItemMq) consume(ch chan *message.DeleteItemMessage) {
for {
msg, ok := <-ch
if !ok {
logx.Errorf("DeleteItemMq->consume err : %v", ok)
return
}
if _, err := l.svcCtx.CloudMindContentRPC.DeleteItem(l.ctx, &content.DeleteItemReq{
ItemId: msg.ItemId,
}); err != nil {
logx.Errorf("DeleteItemMq->consume err : %v", err)
} else {
logx.Infof("DeleteItemMq->consume message : %v", msg)
}
}
}
78 changes: 78 additions & 0 deletions app/mq/updateItem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package mq

import (
"github.com/CloudStriver/cloudmind-mq/app/svc"
"github.com/CloudStriver/cloudmind-mq/app/util/heap"
"github.com/CloudStriver/cloudmind-mq/app/util/message"
"github.com/CloudStriver/go-pkg/utils/pconvertor"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/content"
"github.com/bytedance/sonic"
"github.com/zeromicro/go-zero/core/logx"
"golang.org/x/net/context"
)

type UpdateItemMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
msgChan []chan *message.UpdateItemMessage
Heap *heap.PairHeap
}

func NewUpdateItemMq(ctx context.Context, svcCtx *svc.ServiceContext) *UpdateItemMq {
UpdateItemMq := &UpdateItemMq{
ctx: ctx,
svcCtx: svcCtx,
Heap: &heap.PairHeap{},
}
for i := 0; i < 10; i++ {
UpdateItemMq.Heap.Push(heap.Pair{
First: 0,
Second: i,
})
}
UpdateItemMq.msgChan = make([]chan *message.UpdateItemMessage, chanCount)
for i := 0; i < chanCount; i++ {
ch := make(chan *message.UpdateItemMessage, bufferCount)
UpdateItemMq.msgChan[i] = ch
go UpdateItemMq.consume(ch)
}

return UpdateItemMq
}

func (l *UpdateItemMq) Consume(_, value string) error {
var msg []message.UpdateItemMessage
if err := sonic.Unmarshal(pconvertor.String2Bytes(value), &msg); err != nil {
logx.Errorf("UpdateItemMq->Consume Unmarshal err : %v , val : %s", err, value)
return err
}

for _, d := range msg {
num := l.Heap.Pop()
l.msgChan[num.Second] <- &d
num.First++
l.Heap.Push(num)
}
return nil
}

func (l *UpdateItemMq) consume(ch chan *message.UpdateItemMessage) {
for {
msg, ok := <-ch
if !ok {
logx.Errorf("UpdateItemMq->consume err : %v", ok)
return
}
if _, err := l.svcCtx.CloudMindContentRPC.UpdateItem(l.ctx, &content.UpdateItemReq{
ItemId: msg.ItemId,
IsHidden: msg.IsHidden,
Labels: msg.Labels,
Categories: msg.Categories,
Comment: msg.Comment,
}); err != nil {
logx.Errorf("UpdateItemMq->consume err : %v", err)
} else {
logx.Infof("UpdateItemMq->consume message : %v", msg)
}
}
}
11 changes: 7 additions & 4 deletions app/svc/serviceContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package svc
import (
"github.com/CloudStriver/cloudmind-mq/app/config"
"github.com/CloudStriver/go-pkg/utils/kitex/client"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/content/contentservice"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/system/systemservice"
)

type ServiceContext struct {
Config config.Config
CloudMindSystemRPC systemservice.Client
Config config.Config
CloudMindSystemRPC systemservice.Client
CloudMindContentRPC contentservice.Client
}

func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
CloudMindSystemRPC: client.NewClient(c.Name, "cloudmind-system", systemservice.NewClient),
Config: c,
CloudMindSystemRPC: client.NewClient(c.Name, "cloudmind-system", systemservice.NewClient),
CloudMindContentRPC: client.NewClient(c.Name, "cloudmind-content", contentservice.NewClient),
}
}
21 changes: 21 additions & 0 deletions app/util/message/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package message

import (
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/content"
"github.com/CloudStriver/service-idl-gen-go/kitex_gen/cloudmind/system"
)

Expand All @@ -21,3 +22,23 @@ type DeleteNotificationsMessage struct {
OnlyType *int64
OnlyIsRead *bool
}

type UpdateItemMessage struct {
ItemId string `protobuf:"bytes,1,opt,name=itemId,proto3" json:"itemId,omitempty"`
IsHidden *bool `protobuf:"varint,2,opt,name=isHidden,proto3,oneof" json:"isHidden,omitempty"`
Labels []string `protobuf:"bytes,3,rep,name=labels,proto3" json:"labels,omitempty"`
Categories []string `protobuf:"bytes,4,rep,name=categories,proto3" json:"categories,omitempty"`
Comment *string `protobuf:"bytes,5,opt,name=comment,proto3,oneof" json:"comment,omitempty"`
}

type CreateFeedBacksMessage struct {
FeedBack *content.FeedBack
}

type CreateItemsMessage struct {
Item *content.Item
}

type DeleteItemMessage struct {
ItemId string
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/CloudStriver/go-pkg v0.0.0-20240206060942-84060a3dd273
github.com/CloudStriver/service-idl-gen-go v0.0.0-20240207025156-072e8b2ff946
github.com/CloudStriver/service-idl-gen-go v0.0.0-20240209070137-e8c51d34a88f
github.com/bytedance/sonic v1.10.2
github.com/jinzhu/copier v0.4.0
github.com/samber/lo v1.39.0
Expand Down
Loading

0 comments on commit 46c1022

Please sign in to comment.