diff --git a/.gitignore b/.gitignore index eba47bc..7acf899 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ svault/ !LICENSE # Ignore files -checklist.md \ No newline at end of file +checklist.md +Links.MD diff --git a/Makefile b/Makefile index 27e6ed9..f87c001 100644 --- a/Makefile +++ b/Makefile @@ -8,10 +8,10 @@ run: go build && ./rabbitmq-benchmark run_p: - go build && ./rabbitmq-benchmark -t 1 -r producer -f 1000 + go build && ./rabbitmq-benchmark -t 2 -r producer -debug -f 1000 run_c: - go build && ./rabbitmq-benchmark -t 1 -r consumer -debug + go build && ./rabbitmq-benchmark -t 5 -r consumer -debug start_mq: docker-compose up -d diff --git a/queue/consumer.go b/queue/consumer.go index f77370f..1d65706 100644 --- a/queue/consumer.go +++ b/queue/consumer.go @@ -51,12 +51,10 @@ func ConsumerMQ(cfg utils.ConfigStore) { for { select { case <-notify: - fmt.Println("Detects connection failuer, retring..") + log.Println("Detects connection failuer, retrying..") ch, q, notify, msgs = failuerRetry(cfg) case d = <-msgs: - if cfg.EnableDebug { - log.Printf("consumer message: %s\n", d.Body) - } + utils.DebugLogging(fmt.Sprintf("Consumer message:%s\n", d.Body), cfg.EnableDebug) // Manual ack for consumed messages d.Ack(false) } @@ -70,8 +68,9 @@ func failuerRetry(cfg utils.ConfigStore) (*amqp.Channel, amqp.Queue, chan *amqp. if err != nil { log.Println("Sleep 15 sec before retrying the publish") time.Sleep(15 * time.Second) + } else { - fmt.Println("Reconnection is successful.") + log.Println("Reconnection is successful.") msgs, _ := consumer(ch, q, cfg.EnableQuorum) return ch, q, notify, msgs } diff --git a/queue/producer.go b/queue/producer.go index 19058f2..79dc3ca 100644 --- a/queue/producer.go +++ b/queue/producer.go @@ -11,7 +11,7 @@ import ( ) // publisher Publish messages -func publisher(message string, ch *amqp.Channel, q amqp.Queue) error { +func publisher(message string, ch *amqp.Channel, q amqp.Queue, endebug bool) error { err := ch.Publish( "", // exchange @@ -27,21 +27,23 @@ func publisher(message string, ch *amqp.Channel, q amqp.Queue) error { return err } - fmt.Println("published message: " + message) + utils.DebugLogging(fmt.Sprintf("published message: %s\n", message), endebug) return nil } // PublishMQ worker func func PublishMQ(cfg utils.ConfigStore) { + ch, q, _, err := InitRabbitMQ(cfg.RabbitURL, cfg.QueueName, cfg.EnableQuorum) if err != nil { log.Fatalf("%s: %s", "Failed to publish a message", err) } + for { if (ch != nil && q != amqp.Queue{}) { rand.Seed(time.Now().UnixNano()) - err = publisher(utils.RandString(cfg.MsgSize), ch, q) + err = publisher(utils.RandString(cfg.MsgSize), ch, q, cfg.EnableDebug) time.Sleep(time.Duration(cfg.TimeFrequencyMS) * time.Millisecond) } diff --git a/utils/logdebug.go b/utils/logdebug.go new file mode 100644 index 0000000..0abdd91 --- /dev/null +++ b/utils/logdebug.go @@ -0,0 +1,10 @@ +package utils + +import "log" + +// DebugLogging helper func to log debug +func DebugLogging(logstring string, enableDebug bool) { + if enableDebug { + log.Printf(logstring) + } +}