Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Key, Partition Offset, and Headers in Kafka Source Plugin #3491

Open
wduczkowski opened this issue Jan 3, 2025 · 0 comments
Labels
good first issue Good for newcomers kind/feature New feature or request

Comments

@wduczkowski
Copy link

Currently, the Kafka source plugin does not retrieve key, partition, offset or header information from Kafka messages, even though the underlying library (github.com/segmentio/kafka-go) already supports these parameters. This omission prevents users from accessing important metadata that can be crucial for debugging, routing, or processing logic.

Proposed Solution:

Extend the existing Kafka source plugin to include the retrieval of:

  • Message Key
  • Partition
  • Offset
  • Headers
  • Topic

Update any existing data structures or interfaces within the plugin to store and return these additional metadata fields.
Ensure that backward compatibility is preserved if the plugin is already being used without this metadata.

Example Reference: Below is a sample code snippet using kafka-go that demonstrates how to read the topic, partition, offset, key, value and headers:

import (
	"context"
	"fmt"
	"log"
	"os"
	"strings"

	kafka "github.com/segmentio/kafka-go"
)

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
	brokers := strings.Split(kafkaURL, ",")
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
}

func main() {
	kafkaURL := os.Getenv("kafkaURL")
	topic := os.Getenv("topic")
	groupID := os.Getenv("groupID")

	reader := getKafkaReader(kafkaURL, topic, groupID)
	defer reader.Close()

	fmt.Println("start consuming ... !!")
	for {
		m, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("message at topic:%v partition:%v offset:%v  %s = %s  headers:%v\n",
			m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), m.Headers)
	}
}

Event on kafka topic:

key: dev000001
header : {
     "app.id": "best-app-ever"
}
value: { "temperature":32.7063, "createTime":1735896706186, "correlationId":"bb889204-4ad7-4652-b18a-bc465592302d"} 

output:

consumer-logger-1  | message at topic:connect.edge.raw.temperature partition:0 offset:31106     dev000001 = {"temperature":32.7063,"createTime":1735896706186,"correlationId":"bb889204-4ad7-4652-b18a-bc465592302d"}  headers:[{app.id best-app-ever}]

@ngjaying ngjaying added kind/feature New feature or request good first issue Good for newcomers labels Jan 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
good first issue Good for newcomers kind/feature New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants