diff --git a/declaration.go b/declaration.go index ea5784a..1b5ad60 100644 --- a/declaration.go +++ b/declaration.go @@ -1,6 +1,10 @@ package cony -import "github.com/streadway/amqp" +import ( + "errors" + + "github.com/streadway/amqp" +) // Declaration is a callback type to declare AMQP queue/exchange/binding type Declaration func(Declarer) error @@ -12,6 +16,12 @@ type Declarer interface { QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error } +// DeclarerPassive is implemented by *amqp.Channel +type DeclarerPassive interface { + QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error +} + // DeclareQueue is a way to declare AMQP queue func DeclareQueue(q *Queue) Declaration { name := q.Name @@ -31,6 +41,30 @@ func DeclareQueue(q *Queue) Declaration { } } +// DeclareQueuePassive is a way to declare AMQP queue +func DeclareQueuePassive(q *Queue) Declaration { + name := q.Name + return func(c Declarer) error { + cp, found := c.(DeclarerPassive) + if !found { + return errors.New("Type not found.") + } + + q.Name = name + realQ, err := cp.QueueDeclarePassive(q.Name, + q.Durable, + q.AutoDelete, + q.Exclusive, + false, + q.Args, + ) + q.l.Lock() + q.Name = realQ.Name + q.l.Unlock() + return err + } +} + // DeclareExchange is a way to declare AMQP exchange func DeclareExchange(e Exchange) Declaration { return func(c Declarer) error { @@ -45,6 +79,25 @@ func DeclareExchange(e Exchange) Declaration { } } +// DeclareExchange is a way to declare AMQP exchange +func DeclareExchangePassive(e Exchange) Declaration { + return func(c Declarer) error { + cp, found := c.(DeclarerPassive) + if !found { + return errors.New("Type not found.") + } + + return cp.ExchangeDeclarePassive(e.Name, + e.Kind, + e.Durable, + e.AutoDelete, + false, + false, + e.Args, + ) + } +} + // DeclareBinding is a way to declare AMQP binding between AMQP queue and exchange func DeclareBinding(b Binding) Declaration { return func(c Declarer) error { diff --git a/declaration_test.go b/declaration_test.go index 776d120..493cb47 100644 --- a/declaration_test.go +++ b/declaration_test.go @@ -7,9 +7,11 @@ import ( ) type testDeclarer struct { - _QueueDeclare func(string) (amqp.Queue, error) - _ExchangeDeclare func() error - _QueueBind func() error + _QueueDeclare func(string) (amqp.Queue, error) + _QueueDeclarePassive func(string) (amqp.Queue, error) + _ExchangeDeclare func() error + _ExchangeDeclarePassive func() error + _QueueBind func() error } func (td *testDeclarer) QueueDeclare(name string, durable, autoDelete, @@ -17,11 +19,21 @@ func (td *testDeclarer) QueueDeclare(name string, durable, autoDelete, return td._QueueDeclare(name) } +func (td *testDeclarer) QueueDeclarePassive(name string, durable, autoDelete, + exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { + return td._QueueDeclarePassive(name) +} + func (td *testDeclarer) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { return td._ExchangeDeclare() } +func (td *testDeclarer) ExchangeDeclarePassive(name, kind string, durable, autoDelete, + internal, noWait bool, args amqp.Table) error { + return td._ExchangeDeclarePassive() +} + func (td *testDeclarer) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error { return td._QueueBind() @@ -64,6 +76,43 @@ func TestDeclareQueue(t *testing.T) { } } +func TestDeclareQueuePassive(t *testing.T) { + var ( + callOK, nameOK bool + ) + + q := &Queue{ + Name: "Q1 Passive", + } + + td := &testDeclarer{ + _QueueDeclarePassive: func(name string) (amqp.Queue, error) { + callOK = true + if name == "Q1 Passive" { + nameOK = true + } + return amqp.Queue{Name: "Q1_REAL_PASSIVE"}, nil + }, + } + + testDec := DeclareQueuePassive(q) + testDec(td) + + if !callOK { + t.Error("DeclareQueuePassive() should call declarer.QueueDeclarePassive()") + } + + if q.Name != "Q1_REAL_PASSIVE" { + t.Error("DeclareQueuePassive() should update queue name from AMQP reply") + } + + // call it another time (like reconnect event happened) + testDec(td) + if !nameOK { + t.Error("queue name should be preserved") + } +} + func TestDeclareExchange(t *testing.T) { var ok bool @@ -83,6 +132,25 @@ func TestDeclareExchange(t *testing.T) { } } +func TestDeclareExchangePassive(t *testing.T) { + var ok bool + + e := Exchange{Name: "ex1passive"} + + td := &testDeclarer{ + _ExchangeDeclarePassive: func() error { + ok = true + return nil + }, + } + + DeclareExchangePassive(e)(td) + + if !ok { + t.Error("DeclareExchangePassive() should call declarer.ExchangeDeclarePassive()") + } +} + func TestDeclareBinding(t *testing.T) { var ok bool