diff --git a/src/constant/strings/service.go b/src/constant/strings/service.go index 1fc8054..f2a055f 100644 --- a/src/constant/strings/service.go +++ b/src/constant/strings/service.go @@ -1,8 +1,9 @@ package strings const ( - VideoExchange = "video_exchange" - EventExchange = "event" + VideoExchange = "video_exchange" + EventExchange = "event" + MessageExchange = "message_exchange" VideoPicker = "video_picker" VideoSummary = "video_summary" @@ -12,5 +13,5 @@ const ( VideoCommentEvent = "video.comment.action" VideoPublishEvent = "video.publish.action" - MessageActionEvent = "message_send" + MessageActionEvent = "message.send" ) diff --git a/src/services/message/handler.go b/src/services/message/handler.go index d6f4cd6..70090ad 100644 --- a/src/services/message/handler.go +++ b/src/services/message/handler.go @@ -59,15 +59,30 @@ func (c MessageServiceImpl) New() { if err != nil { failOnError(err, "Fialed to conenct to RabbitMQ") } + channel, err = conn.Channel() if err != nil { failOnError(err, "Failed to open a channel") } + + err = channel.ExchangeDeclare( + strings.MessageExchange, + "x-delayed-message", + true, false, false, false, + amqp.Table{ + "x-delayed-type": "direct", + }, + ) + if err != nil { + failOnError(err, "Failed to get exchange") + } + _, err = channel.QueueDeclare( strings.MessageActionEvent, false, false, false, false, nil, ) + if err != nil { failOnError(err, "Failed to define queue") } @@ -281,14 +296,29 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) } rMessageList := make([]*chat.Message, 0, len(pMessageList)) - for _, pMessage := range pMessageList { - rMessageList = append(rMessageList, &chat.Message{ - Id: pMessage.ID, - Content: pMessage.Content, - CreateTime: uint64(pMessage.CreatedAt.UnixMilli()), - FromUserId: ptr.Ptr(pMessage.FromUserId), - ToUserId: ptr.Ptr(pMessage.ToUserId), - }) + if request.PreMsgTime == 0 { + for _, pMessage := range pMessageList { + + rMessageList = append(rMessageList, &chat.Message{ + Id: pMessage.ID, + Content: pMessage.Content, + CreateTime: uint64(pMessage.CreatedAt.UnixMilli()), + FromUserId: ptr.Ptr(pMessage.FromUserId), + ToUserId: ptr.Ptr(pMessage.ToUserId), + }) + } + } else { + for _, pMessage := range pMessageList { + if pMessage.ToUserId == request.ActorId { + rMessageList = append(rMessageList, &chat.Message{ + Id: pMessage.ID, + Content: pMessage.Content, + CreateTime: uint64(pMessage.CreatedAt.UnixMilli()), + FromUserId: ptr.Ptr(pMessage.FromUserId), + ToUserId: ptr.Ptr(pMessage.ToUserId), + }) + } + } } resp = &chat.ChatResponse{ diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index 5f7af8c..184ed14 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -99,6 +99,12 @@ func main() { // logging.SetSpanError(span, err) return } + /* err = body.Ack(true) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the ,essage...") + } */ } }()