diff --git a/README.md b/README.md index 0fe4d96..da5f4bd 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,11 @@ $ protokaf consume HelloRequest -G mygroup -t test $ protokaf consume HelloRequest -G mygroup -t test -c 10 ``` +**Read from offset `5` messages from `test` topic** +```sh +$ protokaf consume HelloRequest -G mygroup -t test -o 5 +``` + ## Testing ### Prepare test environment diff --git a/cmd/cmd_consume.go b/cmd/cmd_consume.go index d80c845..ff5e0db 100644 --- a/cmd/cmd_consume.go +++ b/cmd/cmd_consume.go @@ -3,7 +3,7 @@ package cmd import ( "context" "errors" - + "fmt" "github.com/SberMarket-Tech/protokaf/internal/kafka" "github.com/SberMarket-Tech/protokaf/internal/utils/dump" "github.com/Shopify/sarama" @@ -11,6 +11,13 @@ import ( "github.com/jhump/protoreflect/dynamic" "github.com/spf13/cobra" "github.com/spf13/viper" + "strconv" + "sync" +) + +var ( + ErrInvalidOffset = errors.New("invalid offset format") + ErrOffsetNotSet = errors.New("offset not set") ) func NewConsumeCmd() *cobra.Command { @@ -19,6 +26,7 @@ func NewConsumeCmd() *cobra.Command { topicsFlag []string countFlag int noCommit bool + offset string ) cmd := &cobra.Command{ @@ -41,6 +49,7 @@ func NewConsumeCmd() *cobra.Command { if noCommit { kafkaConfig.Consumer.Offsets.AutoCommit.Enable = false } + // consumer consumer, err := kafka.NewConsumerGroup(viper.GetStringSlice("broker"), groupFlag, kafkaConfig) if err != nil { @@ -58,9 +67,18 @@ func NewConsumeCmd() *cobra.Command { handler := &protoHandler{ MaxCount: countFlag, desc: md, + topic: topicsFlag[0], + } + + // set offset + offsetsArg, err := parseOffsetFlag(offset) + if err != nil && !errors.Is(err, ErrOffsetNotSet) { + log.Errorf("Failed to parse offset: %s", err) + return } + handler.offset = offsetsArg - err := consumer.Consume(context.Background(), topicsFlag, handler) + err = consumer.Consume(context.Background(), topicsFlag, handler) if handler.maximumReached() { log.Debugf("Message consuming limit reached: %d", countFlag) @@ -91,6 +109,7 @@ func NewConsumeCmd() *cobra.Command { flags.StringSliceVarP(&topicsFlag, "topic", "t", []string{}, "Topic to consume from") flags.IntVarP(&countFlag, "count", "c", 0, "Exit after consuming this number of messages") flags.BoolVar(&noCommit, "no-commit", false, "Consume messages without commiting offset") + flags.StringVarP(&offset, "offset", "o", "", "Start consuming from this offset (default: newest)") _ = cmd.MarkFlagRequired("group") _ = cmd.MarkFlagRequired("topic") @@ -98,12 +117,43 @@ func NewConsumeCmd() *cobra.Command { return cmd } +func parseOffsetFlag(offsetsFlag string) (offset int64, err error) { + if offsetsFlag == "" { + return -1, ErrOffsetNotSet + } + intOffst, err := strconv.ParseInt(offsetsFlag, 10, 64) + if err != nil { + return -1, fmt.Errorf("error with offset '%s': %w", offsetsFlag, ErrInvalidOffset) + } + if intOffst < 0 { + return -1, fmt.Errorf("error negative offset '%s': %w", offsetsFlag, ErrInvalidOffset) + } + return intOffst, nil +} + type protoHandler struct { desc *desc.MessageDescriptor MaxCount, counter int + topic string + partition int32 + offset int64 +} + +var once sync.Once + +func (p protoHandler) Setup(sess sarama.ConsumerGroupSession) error { + once.Do(func() { + if partFromFlags := flags.Partition; partFromFlags > 0 { + p.partition = partFromFlags + } + + if p.offset >= 0 { + sess.ResetOffset(p.topic, p.partition, p.offset, "") + } + }) + return nil } -func (protoHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (protoHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } // ErrMaximumReached error if limit reached diff --git a/cmd/cmd_consume_test.go b/cmd/cmd_consume_test.go index 2a6455a..7358c2f 100644 --- a/cmd/cmd_consume_test.go +++ b/cmd/cmd_consume_test.go @@ -3,14 +3,40 @@ package cmd import ( "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_NewConsumeCmd_NoTopicFlags(t *testing.T) { - cmd := NewProduceCmd() + cmd := NewConsumeCmd() cmd.SetArgs([]string{"HelloRequest"}) _, _, err := getCommandOut(t, cmd) - assert.Contains(t, err.Error(), `required flag(s) "topic" not set`) + require.Contains(t, err.Error(), `required flag(s) "group", "topic" not set`) +} + +func Test_parseOffsetsFlag(t *testing.T) { + t.Run("offset happy case", func(t *testing.T) { + offset, err := parseOffsetFlag("1") + require.Nil(t, err) + require.EqualValues(t, 1, offset) + }) + + t.Run("error when offset is not a number", func(t *testing.T) { + v, err := parseOffsetFlag("fdgdfg") + require.ErrorIs(t, err, ErrInvalidOffset) + require.EqualValues(t, -1, v) + }) + + t.Run("error empty offset", func(t *testing.T) { + v, err := parseOffsetFlag("") + require.ErrorIs(t, err, ErrOffsetNotSet) + require.EqualValues(t, -1, v) + }) + + t.Run("error negative offset", func(t *testing.T) { + v, err := parseOffsetFlag("-1") + require.ErrorIs(t, err, ErrInvalidOffset) + require.EqualValues(t, -1, v) + }) }