Skip to content

Commit

Permalink
Successfully connect to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
swhsiang committed Sep 13, 2017
1 parent ff72016 commit ccc2614
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
46 changes: 46 additions & 0 deletions database/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package database

import (
"log"

kafka "github.com/Shopify/sarama"
// "github.com/hyperpilotio/ingestor/config"
// "github.com/hyperpilotio/ingestor/log"
)

var producer kafka.SyncProducer

func init() {
// FIXME pass configuration into this function
producer = newKafkaProducer()
}

// func newKafkaProducer(config config.Provider) {}
func newKafkaProducer() kafka.SyncProducer {
// FIXME Change the fixed value
producer, err := kafka.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
return producer
}

func Producer() *kafka.SyncProducer {
return &producer
}

func CloseProducer() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}

func Send(topic, value string) {
msg := &kafka.ProducerMessage{Topic: topic, Value: kafka.StringEncoder(value)}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("FAILED to send message: %s\n", err)
} else {
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
}
20 changes: 19 additions & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ccc2614

Please sign in to comment.