Skip to content

Commit

Permalink
Add PublishWithRoutingKey func to Publisher.
Browse files Browse the repository at this point in the history
  • Loading branch information
williambailey committed Dec 21, 2016
1 parent bede840 commit 7a992f8
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 13 deletions.
24 changes: 17 additions & 7 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type PublisherOpt func(*Publisher)
type publishMaybeErr struct {
pub chan amqp.Publishing
err chan error
key string
}

// Publisher hold definition for AMQP publishing
Expand All @@ -43,14 +44,15 @@ func (p *Publisher) Write(b []byte) (int, error) {
return len(b), p.Publish(pub)
}

// Publish used to publish custom amqp.Publishing
// PublishWithRoutingKey used to publish custom amqp.Publishing and routing key
//
// WARNING: this is blocking call, it will not return until connection is
// available. The only way to stop it is to use Cancel() method.
func (p *Publisher) Publish(pub amqp.Publishing) error {
func (p *Publisher) PublishWithRoutingKey(pub amqp.Publishing, key string) error {
reqRepl := publishMaybeErr{
pub: make(chan amqp.Publishing, 2),
err: make(chan error, 2),
key: key,
}

reqRepl.pub <- pub
Expand All @@ -66,6 +68,14 @@ func (p *Publisher) Publish(pub amqp.Publishing) error {
return err
}

// Publish used to publish custom amqp.Publishing
//
// WARNING: this is blocking call, it will not return until connection is
// available. The only way to stop it is to use Cancel() method.
func (p *Publisher) Publish(pub amqp.Publishing) error {
return p.PublishWithRoutingKey(pub, p.key)
}

// Cancel this publisher
func (p *Publisher) Cancel() {
p.m.Lock()
Expand Down Expand Up @@ -93,11 +103,11 @@ func (p *Publisher) serve(client mqDeleter, ch mqChannel) {
msg := <-envelop.pub
close(envelop.pub)
if err := ch.Publish(
p.exchange, // exchange
p.key, // key
false, // mandatory
false, // immediate
msg, // msg amqp.Publishing
p.exchange, // exchange
envelop.key, // key
false, // mandatory
false, // immediate
msg, // msg amqp.Publishing
); err != nil {
envelop.err <- err
}
Expand Down
95 changes: 89 additions & 6 deletions publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ func TestPublisher_Cancel_willNotBlock(t *testing.T) {

func TestPublisher_serve(t *testing.T) {
var (
runSync = make(chan bool)
deleted bool
closed bool
notifyClose bool
testMsg *amqp.Publishing
runSync = make(chan bool)
deleted bool
closed bool
notifyClose bool
exchangeName string
routingKey string
testMsg *amqp.Publishing
)

p := newTestPublisher()
Expand All @@ -90,6 +92,8 @@ func TestPublisher_serve(t *testing.T) {
return errChan
},
_Publish: func(ex string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error {
exchangeName = ex
routingKey = key
testMsg = &msg
return nil
},
Expand Down Expand Up @@ -118,6 +122,85 @@ func TestPublisher_serve(t *testing.T) {
t.Error("should close channel")
}

if exchangeName != "exchange.name" {
t.Error("should set correct routing key")
}

if routingKey != "routing.key" {
t.Error("should set correct routing key")
}

if bytes.Compare(testMsg.Body, []byte("test1")) != 0 {
t.Error("should publish correct messaged")
}
}

func TestPublisher_serve_customRoutingKey(t *testing.T) {
var (
runSync = make(chan bool)
deleted bool
closed bool
notifyClose bool
exchangeName string
routingKey string
testMsg *amqp.Publishing
)

p := newTestPublisher()
cli := &mqDeleterTest{
_deletePublisher: func(*Publisher) {
deleted = true
},
}

ch1 := &mqChannelTest{
_Close: func() error {
closed = true
return nil
},
_NotifyClose: func(errChan chan *amqp.Error) chan *amqp.Error {
notifyClose = true
return errChan
},
_Publish: func(ex string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error {
exchangeName = ex
routingKey = key
testMsg = &msg
return nil
},
}

go func() {
<-runSync
p.serve(cli, ch1)
runSync <- true
}()

runSync <- true
p.PublishWithRoutingKey(amqp.Publishing{Body: []byte("test1")}, "my.routing.key")
p.Cancel()
<-runSync

if !notifyClose {
t.Error("should register notifyClose")
}

if !deleted {
t.Error("should delete publisher")
}

if !closed {
t.Error("should close channel")
}

if exchangeName != "exchange.name" {
t.Error("should set correct routing key")
}

if routingKey != "my.routing.key" {
t.Error("should set correct routing key")
}

if bytes.Compare(testMsg.Body, []byte("test1")) != 0 {
t.Error("should publish correct messaged")
}
Expand Down Expand Up @@ -270,5 +353,5 @@ func TestPublishingTemplate(t *testing.T) {
}

func newTestPublisher(opts ...PublisherOpt) *Publisher {
return NewPublisher("", "", opts...)
return NewPublisher("exchange.name", "routing.key", opts...)
}

0 comments on commit 7a992f8

Please sign in to comment.