Skip to content

Commit

Permalink
add ability to specify offsets for consumer (#10)
Browse files Browse the repository at this point in the history
Add -offset (-o) flag: start consuming from given offset
  • Loading branch information
vadiminshakov authored Aug 28, 2023
1 parent 5f1d902 commit 5dbdefb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ $ protokaf consume HelloRequest -G mygroup -t test
$ protokaf consume HelloRequest -G mygroup -t test -c 10
```

**Read from offset `5` messages from `test` topic**
```sh
$ protokaf consume HelloRequest -G mygroup -t test -o 5
```

## Testing

### Prepare test environment
Expand Down
56 changes: 53 additions & 3 deletions cmd/cmd_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ package cmd
import (
"context"
"errors"

"fmt"
"github.com/SberMarket-Tech/protokaf/internal/kafka"
"github.com/SberMarket-Tech/protokaf/internal/utils/dump"
"github.com/Shopify/sarama"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"strconv"
"sync"
)

var (
ErrInvalidOffset = errors.New("invalid offset format")
ErrOffsetNotSet = errors.New("offset not set")
)

func NewConsumeCmd() *cobra.Command {
Expand All @@ -19,6 +26,7 @@ func NewConsumeCmd() *cobra.Command {
topicsFlag []string
countFlag int
noCommit bool
offset string
)

cmd := &cobra.Command{
Expand All @@ -41,6 +49,7 @@ func NewConsumeCmd() *cobra.Command {
if noCommit {
kafkaConfig.Consumer.Offsets.AutoCommit.Enable = false
}

// consumer
consumer, err := kafka.NewConsumerGroup(viper.GetStringSlice("broker"), groupFlag, kafkaConfig)
if err != nil {
Expand All @@ -58,9 +67,18 @@ func NewConsumeCmd() *cobra.Command {
handler := &protoHandler{
MaxCount: countFlag,
desc: md,
topic: topicsFlag[0],
}

// set offset
offsetsArg, err := parseOffsetFlag(offset)
if err != nil && !errors.Is(err, ErrOffsetNotSet) {
log.Errorf("Failed to parse offset: %s", err)
return
}
handler.offset = offsetsArg

err := consumer.Consume(context.Background(), topicsFlag, handler)
err = consumer.Consume(context.Background(), topicsFlag, handler)

if handler.maximumReached() {
log.Debugf("Message consuming limit reached: %d", countFlag)
Expand Down Expand Up @@ -91,19 +109,51 @@ func NewConsumeCmd() *cobra.Command {
flags.StringSliceVarP(&topicsFlag, "topic", "t", []string{}, "Topic to consume from")
flags.IntVarP(&countFlag, "count", "c", 0, "Exit after consuming this number of messages")
flags.BoolVar(&noCommit, "no-commit", false, "Consume messages without commiting offset")
flags.StringVarP(&offset, "offset", "o", "", "Start consuming from this offset (default: newest)")

_ = cmd.MarkFlagRequired("group")
_ = cmd.MarkFlagRequired("topic")

return cmd
}

func parseOffsetFlag(offsetsFlag string) (offset int64, err error) {
if offsetsFlag == "" {
return -1, ErrOffsetNotSet
}
intOffst, err := strconv.ParseInt(offsetsFlag, 10, 64)
if err != nil {
return -1, fmt.Errorf("error with offset '%s': %w", offsetsFlag, ErrInvalidOffset)
}
if intOffst < 0 {
return -1, fmt.Errorf("error negative offset '%s': %w", offsetsFlag, ErrInvalidOffset)
}
return intOffst, nil
}

type protoHandler struct {
desc *desc.MessageDescriptor
MaxCount, counter int
topic string
partition int32
offset int64
}

var once sync.Once

func (p protoHandler) Setup(sess sarama.ConsumerGroupSession) error {
once.Do(func() {
if partFromFlags := flags.Partition; partFromFlags > 0 {
p.partition = partFromFlags
}

if p.offset >= 0 {
sess.ResetOffset(p.topic, p.partition, p.offset, "")
}
})
return nil
}

func (protoHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (protoHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ErrMaximumReached error if limit reached
Expand Down
32 changes: 29 additions & 3 deletions cmd/cmd_consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,40 @@ package cmd
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_NewConsumeCmd_NoTopicFlags(t *testing.T) {
cmd := NewProduceCmd()
cmd := NewConsumeCmd()
cmd.SetArgs([]string{"HelloRequest"})

_, _, err := getCommandOut(t, cmd)

assert.Contains(t, err.Error(), `required flag(s) "topic" not set`)
require.Contains(t, err.Error(), `required flag(s) "group", "topic" not set`)
}

func Test_parseOffsetsFlag(t *testing.T) {
t.Run("offset happy case", func(t *testing.T) {
offset, err := parseOffsetFlag("1")
require.Nil(t, err)
require.EqualValues(t, 1, offset)
})

t.Run("error when offset is not a number", func(t *testing.T) {
v, err := parseOffsetFlag("fdgdfg")
require.ErrorIs(t, err, ErrInvalidOffset)
require.EqualValues(t, -1, v)
})

t.Run("error empty offset", func(t *testing.T) {
v, err := parseOffsetFlag("")
require.ErrorIs(t, err, ErrOffsetNotSet)
require.EqualValues(t, -1, v)
})

t.Run("error negative offset", func(t *testing.T) {
v, err := parseOffsetFlag("-1")
require.ErrorIs(t, err, ErrInvalidOffset)
require.EqualValues(t, -1, v)
})
}

0 comments on commit 5dbdefb

Please sign in to comment.