Skip to content

Commit

Permalink
Allow consumers and streams to be found with a certain leader
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Aug 5, 2024
1 parent 363a303 commit efe46cb
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
21 changes: 21 additions & 0 deletions consumer_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type consumerQuery struct {
waiting int
ackPending int
pending uint64
leader string
ageLimit time.Duration
lastDeliveryLimit time.Duration
}
Expand All @@ -54,6 +55,14 @@ func ConsumerQueryExpression(e string) ConsumerQueryOpt {
}
}

// ConsumerQueryLeaderServer finds clustered consumers where a certain node is the leader
func ConsumerQueryLeaderServer(server string) ConsumerQueryOpt {
return func(q *consumerQuery) error {
q.leader = server
return nil
}
}

// ConsumerQueryIsPull finds only Pull consumers
func ConsumerQueryIsPull() ConsumerQueryOpt {
return func(q *consumerQuery) error {
Expand Down Expand Up @@ -167,6 +176,7 @@ func (s *Stream) QueryConsumers(opts ...ConsumerQueryOpt) ([]*Consumer, error) {
q.matchAge,
q.matchDelivery,
q.matchReplicas,
q.matchLeaderServer,
}

var consumers []*Consumer
Expand Down Expand Up @@ -210,6 +220,17 @@ func (q *consumerQuery) cbMatcher(consumers []*Consumer, onlyIf bool, cb func(*C
return matched, nil
}

func (q *consumerQuery) matchLeaderServer(consumers []*Consumer) ([]*Consumer, error) {
return q.cbMatcher(consumers, q.replicas > 0, func(consumer *Consumer) bool {
nfo, _ := consumer.LatestState()
if nfo.Cluster == nil {
return false
}

return (!q.invert && nfo.Cluster.Leader == q.leader) || (q.invert && nfo.Cluster.Leader != q.leader)
})
}

func (q *consumerQuery) matchReplicas(consumers []*Consumer) ([]*Consumer, error) {
return q.cbMatcher(consumers, q.replicas > 0, func(consumer *Consumer) bool {
return (!q.invert && consumer.Replicas() <= q.replicas) || (q.invert && consumer.Replicas() >= q.replicas)
Expand Down
34 changes: 34 additions & 0 deletions stream_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type streamQuery struct {
sourced bool
sourcedIsSet bool
expression string
leader string
matchers []streamMatcher
}

Expand Down Expand Up @@ -163,6 +164,14 @@ func StreamQueryInvert() StreamQueryOpt {
}
}

// StreamQueryLeaderServer finds clustered streams where a certain node is the leader
func StreamQueryLeaderServer(server string) StreamQueryOpt {
return func(q *streamQuery) error {
q.leader = server
return nil
}
}

// QueryStreams filters the streams found in JetStream using various filter options
func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error) {
q := &streamQuery{}
Expand All @@ -185,6 +194,7 @@ func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error) {
q.matchReplicas,
q.matchSourced,
q.matchMirrored,
q.matchLeaderServer,
}

streams, _, err := m.Streams(nil)
Expand Down Expand Up @@ -259,6 +269,30 @@ func (q *streamQuery) matchExpression(streams []*Stream) ([]*Stream, error) {
return matched, nil
}

func (q *streamQuery) matchLeaderServer(streams []*Stream) ([]*Stream, error) {
if q.leader == "" {
return streams, nil
}

var matched []*Stream
for _, stream := range streams {
nfo, err := stream.LatestInformation()
if err != nil {
return nil, err
}

if nfo.Cluster == nil {
continue
}

if (!q.invert && nfo.Cluster.Leader == q.leader) || (q.invert && nfo.Cluster.Leader != q.leader) {
matched = append(matched, stream)
}
}

return matched, nil
}

func (q *streamQuery) matchMirrored(streams []*Stream) ([]*Stream, error) {
if !q.mirroredIsSet {
return streams, nil
Expand Down

0 comments on commit efe46cb

Please sign in to comment.