diff --git a/publisher.go b/publisher.go index 8166a10..a2c6d5d 100644 --- a/publisher.go +++ b/publisher.go @@ -17,6 +17,7 @@ type PublisherOpt func(*Publisher) type publishMaybeErr struct { pub chan amqp.Publishing err chan error + key string } // Publisher hold definition for AMQP publishing @@ -43,14 +44,15 @@ func (p *Publisher) Write(b []byte) (int, error) { return len(b), p.Publish(pub) } -// Publish used to publish custom amqp.Publishing +// PublishWithRoutingKey used to publish custom amqp.Publishing and routing key // // WARNING: this is blocking call, it will not return until connection is // available. The only way to stop it is to use Cancel() method. -func (p *Publisher) Publish(pub amqp.Publishing) error { +func (p *Publisher) PublishWithRoutingKey(pub amqp.Publishing, key string) error { reqRepl := publishMaybeErr{ pub: make(chan amqp.Publishing, 2), err: make(chan error, 2), + key: key, } reqRepl.pub <- pub @@ -66,6 +68,14 @@ func (p *Publisher) Publish(pub amqp.Publishing) error { return err } +// Publish used to publish custom amqp.Publishing +// +// WARNING: this is blocking call, it will not return until connection is +// available. The only way to stop it is to use Cancel() method. +func (p *Publisher) Publish(pub amqp.Publishing) error { + return p.PublishWithRoutingKey(pub, p.key) +} + // Cancel this publisher func (p *Publisher) Cancel() { p.m.Lock() @@ -93,11 +103,11 @@ func (p *Publisher) serve(client mqDeleter, ch mqChannel) { msg := <-envelop.pub close(envelop.pub) if err := ch.Publish( - p.exchange, // exchange - p.key, // key - false, // mandatory - false, // immediate - msg, // msg amqp.Publishing + p.exchange, // exchange + envelop.key, // key + false, // mandatory + false, // immediate + msg, // msg amqp.Publishing ); err != nil { envelop.err <- err } diff --git a/publisher_test.go b/publisher_test.go index 8029132..06e800f 100644 --- a/publisher_test.go +++ b/publisher_test.go @@ -66,11 +66,13 @@ func TestPublisher_Cancel_willNotBlock(t *testing.T) { func TestPublisher_serve(t *testing.T) { var ( - runSync = make(chan bool) - deleted bool - closed bool - notifyClose bool - testMsg *amqp.Publishing + runSync = make(chan bool) + deleted bool + closed bool + notifyClose bool + exchangeName string + routingKey string + testMsg *amqp.Publishing ) p := newTestPublisher() @@ -90,6 +92,8 @@ func TestPublisher_serve(t *testing.T) { return errChan }, _Publish: func(ex string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { + exchangeName = ex + routingKey = key testMsg = &msg return nil }, @@ -118,6 +122,85 @@ func TestPublisher_serve(t *testing.T) { t.Error("should close channel") } + if exchangeName != "exchange.name" { + t.Error("should set correct routing key") + } + + if routingKey != "routing.key" { + t.Error("should set correct routing key") + } + + if bytes.Compare(testMsg.Body, []byte("test1")) != 0 { + t.Error("should publish correct messaged") + } +} + +func TestPublisher_serve_customRoutingKey(t *testing.T) { + var ( + runSync = make(chan bool) + deleted bool + closed bool + notifyClose bool + exchangeName string + routingKey string + testMsg *amqp.Publishing + ) + + p := newTestPublisher() + cli := &mqDeleterTest{ + _deletePublisher: func(*Publisher) { + deleted = true + }, + } + + ch1 := &mqChannelTest{ + _Close: func() error { + closed = true + return nil + }, + _NotifyClose: func(errChan chan *amqp.Error) chan *amqp.Error { + notifyClose = true + return errChan + }, + _Publish: func(ex string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { + exchangeName = ex + routingKey = key + testMsg = &msg + return nil + }, + } + + go func() { + <-runSync + p.serve(cli, ch1) + runSync <- true + }() + + runSync <- true + p.PublishWithRoutingKey(amqp.Publishing{Body: []byte("test1")}, "my.routing.key") + p.Cancel() + <-runSync + + if !notifyClose { + t.Error("should register notifyClose") + } + + if !deleted { + t.Error("should delete publisher") + } + + if !closed { + t.Error("should close channel") + } + + if exchangeName != "exchange.name" { + t.Error("should set correct routing key") + } + + if routingKey != "my.routing.key" { + t.Error("should set correct routing key") + } + if bytes.Compare(testMsg.Body, []byte("test1")) != 0 { t.Error("should publish correct messaged") } @@ -270,5 +353,5 @@ func TestPublishingTemplate(t *testing.T) { } func newTestPublisher(opts ...PublisherOpt) *Publisher { - return NewPublisher("", "", opts...) + return NewPublisher("exchange.name", "routing.key", opts...) }