From 00d36d3de94833fe8b02841461d18fe2c7bc9917 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Wed, 17 Jan 2024 17:30:38 +0200 Subject: [PATCH] fix lastmessages option with partition > 0 --- server/memphis_helper.go | 97 +++++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 5e258ad1a..2828efae9 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -738,29 +738,33 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta } var deliveryPolicy DeliverPolicy var optStartSeq uint64 + lastSeqPerPartition := map[string]uint64{} // This check for case when the last message is 0 (in case StartConsumeFromSequence > 1 the LastMessages is 0 ) if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 1 { deliveryPolicy = DeliverNew } else if consumer.LastMessages > 0 { var streamInfo *StreamInfo if len(partitionsList) == 1 { - streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1.final") + streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1") if err != nil { return err } + lastSeq := streamInfo.State.LastSeq + lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1 + if int(lastMessages) < 1 { + lastMessages = uint64(1) + } + deliveryPolicy = DeliverByStartSequence + optStartSeq = lastMessages } else { - streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+".final") - if err != nil { - return err + for _, pl := range partitionsList { + streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl)) + if err != nil { + return err + } + lastSeqPerPartition[stationName.Intern()+"$"+strconv.Itoa(pl)] = streamInfo.State.LastSeq } } - lastSeq := streamInfo.State.LastSeq - lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1 - if int(lastMessages) < 1 { - lastMessages = uint64(1) - } - deliveryPolicy = DeliverByStartSequence - optStartSeq = lastMessages } else if consumer.StartConsumeFromSeq > 1 { deliveryPolicy = DeliverByStartSequence optStartSeq = consumer.StartConsumeFromSeq @@ -787,26 +791,59 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta err = s.memphisAddConsumer(tenantName, stationName.Intern(), consumerConfig) return err } else { - for _, pl := range partitionsList { - consumerConfig := &ConsumerConfig{ - Durable: consumerName, - DeliverPolicy: deliveryPolicy, - AckPolicy: AckExplicit, - AckWait: time.Duration(maxAckTimeMs) * time.Millisecond, - MaxDeliver: MaxMsgDeliveries, - FilterSubject: stationName.Intern() + "$" + strconv.Itoa(pl) + ".final", - ReplayPolicy: ReplayInstant, - MaxAckPending: -1, - HeadersOnly: false, - // RateLimit: ,// Bits per sec - // Heartbeat: // time.Duration, - } - if deliveryPolicy == DeliverByStartSequence { - consumerConfig.OptStartSeq = optStartSeq + if consumer.LastMessages > 0 { + for k, v := range lastSeqPerPartition { + lastSeq := v + lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1 + if int(lastMessages) < 1 { + lastMessages = uint64(1) + } + deliveryPolicy = DeliverByStartSequence + optStartSeq = lastMessages + + consumerConfig := &ConsumerConfig{ + Durable: consumerName, + DeliverPolicy: deliveryPolicy, + AckPolicy: AckExplicit, + AckWait: time.Duration(maxAckTimeMs) * time.Millisecond, + MaxDeliver: MaxMsgDeliveries, + FilterSubject: k + ".final", + ReplayPolicy: ReplayInstant, + MaxAckPending: -1, + HeadersOnly: false, + // RateLimit: ,// Bits per sec + // Heartbeat: // time.Duration, + } + if deliveryPolicy == DeliverByStartSequence { + consumerConfig.OptStartSeq = optStartSeq + } + err = s.memphisAddConsumer(tenantName, k, consumerConfig) + if err != nil { + return err + } } - err = s.memphisAddConsumer(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl), consumerConfig) - if err != nil { - return err + } else { + for _, pl := range partitionsList { + consumerConfig := &ConsumerConfig{ + Durable: consumerName, + DeliverPolicy: deliveryPolicy, + AckPolicy: AckExplicit, + AckWait: time.Duration(maxAckTimeMs) * time.Millisecond, + MaxDeliver: MaxMsgDeliveries, + FilterSubject: stationName.Intern() + "$" + strconv.Itoa(pl) + ".final", + ReplayPolicy: ReplayInstant, + MaxAckPending: -1, + HeadersOnly: false, + // RateLimit: ,// Bits per sec + // Heartbeat: // time.Duration, + } + if deliveryPolicy == DeliverByStartSequence { + consumerConfig.OptStartSeq = optStartSeq + } + err = s.memphisAddConsumer(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl), consumerConfig) + if err != nil { + return err + } } } }