Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Declare passive #25

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions declaration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ type Declaration func(Declarer) error
// Declarer is implemented by *amqp.Channel
type Declarer interface {
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

Expand All @@ -31,6 +33,25 @@ func DeclareQueue(q *Queue) Declaration {
}
}

// DeclareQueuePassive is a way to declare AMQP queue
func DeclareQueuePassive(q *Queue) Declaration {
name := q.Name
return func(c Declarer) error {
q.Name = name
realQ, err := c.QueueDeclarePassive(q.Name,
q.Durable,
q.AutoDelete,
q.Exclusive,
false,
q.Args,
)
q.l.Lock()
q.Name = realQ.Name
q.l.Unlock()
return err
}
}

// DeclareExchange is a way to declare AMQP exchange
func DeclareExchange(e Exchange) Declaration {
return func(c Declarer) error {
Expand All @@ -45,6 +66,20 @@ func DeclareExchange(e Exchange) Declaration {
}
}

// DeclareExchange is a way to declare AMQP exchange
func DeclareExchangePassive(e Exchange) Declaration {
return func(c Declarer) error {
return c.ExchangeDeclarePassive(e.Name,
e.Kind,
e.Durable,
e.AutoDelete,
false,
false,
e.Args,
)
}
}

// DeclareBinding is a way to declare AMQP binding between AMQP queue and exchange
func DeclareBinding(b Binding) Declaration {
return func(c Declarer) error {
Expand Down
74 changes: 71 additions & 3 deletions declaration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,33 @@ import (
)

type testDeclarer struct {
_QueueDeclare func(string) (amqp.Queue, error)
_ExchangeDeclare func() error
_QueueBind func() error
_QueueDeclare func(string) (amqp.Queue, error)
_QueueDeclarePassive func(string) (amqp.Queue, error)
_ExchangeDeclare func() error
_ExchangeDeclarePassive func() error
_QueueBind func() error
}

func (td *testDeclarer) QueueDeclare(name string, durable, autoDelete,
exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
return td._QueueDeclare(name)
}

func (td *testDeclarer) QueueDeclarePassive(name string, durable, autoDelete,
exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
return td._QueueDeclarePassive(name)
}

func (td *testDeclarer) ExchangeDeclare(name, kind string, durable, autoDelete,
internal, noWait bool, args amqp.Table) error {
return td._ExchangeDeclare()
}

func (td *testDeclarer) ExchangeDeclarePassive(name, kind string, durable, autoDelete,
internal, noWait bool, args amqp.Table) error {
return td._ExchangeDeclarePassive()
}

func (td *testDeclarer) QueueBind(name, key, exchange string, noWait bool,
args amqp.Table) error {
return td._QueueBind()
Expand Down Expand Up @@ -64,6 +76,43 @@ func TestDeclareQueue(t *testing.T) {
}
}

func TestDeclareQueuePassive(t *testing.T) {
var (
callOK, nameOK bool
)

q := &Queue{
Name: "Q1 Passive",
}

td := &testDeclarer{
_QueueDeclarePassive: func(name string) (amqp.Queue, error) {
callOK = true
if name == "Q1 Passive" {
nameOK = true
}
return amqp.Queue{Name: "Q1_REAL_PASSIVE"}, nil
},
}

testDec := DeclareQueuePassive(q)
testDec(td)

if !callOK {
t.Error("DeclareQueuePassive() should call declarer.QueueDeclarePassive()")
}

if q.Name != "Q1_REAL_PASSIVE" {
t.Error("DeclareQueuePassive() should update queue name from AMQP reply")
}

// call it another time (like reconnect event happened)
testDec(td)
if !nameOK {
t.Error("queue name should be preserved")
}
}

func TestDeclareExchange(t *testing.T) {
var ok bool

Expand All @@ -83,6 +132,25 @@ func TestDeclareExchange(t *testing.T) {
}
}

func TestDeclareExchangePassive(t *testing.T) {
var ok bool

e := Exchange{Name: "ex1passive"}

td := &testDeclarer{
_ExchangeDeclarePassive: func() error {
ok = true
return nil
},
}

DeclareExchangePassive(e)(td)

if !ok {
t.Error("DeclareExchangePassive() should call declarer.ExchangeDeclarePassive()")
}
}

func TestDeclareBinding(t *testing.T) {
var ok bool

Expand Down