Skip to content

Commit

Permalink
rabbit option
Browse files Browse the repository at this point in the history
  • Loading branch information
pinguo-yangbing committed Sep 3, 2020
1 parent 6616166 commit 7d4efc5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 5 deletions.
30 changes: 27 additions & 3 deletions adapter/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pinguo/pgo2/client/rabbitmq"
"github.com/pinguo/pgo2/iface"
"github.com/pinguo/pgo2/util"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -110,14 +111,14 @@ func (r *RabbitMq) Publish(opCode string, data interface{}, dftOpUid ...string)
opUid = dftOpUid[0]
}

res, err := r.client.Publish(&rabbitmq.PublishData{ OpCode: opCode, Data: data, OpUid: opUid}, r.Context().LogId())
res, err := r.client.Publish(&rabbitmq.PublishData{ OpCode: opCode, Data: data, OpUid: opUid, DeliveryMode:amqp.Persistent}, r.Context().LogId())
panicErr(err)

return res
}

func (r *RabbitMq) PublishExchange(serviceName, exchangeName, exchangeType, opCode string, data interface{}, dftOpUid ...string) bool {
profile := "rabbit.Publish"
profile := "rabbit.PublishExchange"
r.Context().ProfileStart(profile)
defer r.Context().ProfileStop(profile)
defer r.handlePanic()
Expand All @@ -127,7 +128,30 @@ func (r *RabbitMq) PublishExchange(serviceName, exchangeName, exchangeType, opCo
opUid = dftOpUid[0]
}

res, err := r.client.Publish(&rabbitmq.PublishData{ServiceName: serviceName,
res, err := r.client.Publish(&rabbitmq.PublishData{ServiceName: serviceName,DeliveryMode:amqp.Persistent,
ExChange: &rabbitmq.ExchangeData{Name: exchangeName, Type: exchangeType, Durable:true},
OpCode: opCode, Data: data, OpUid: opUid}, r.Context().LogId())
panicErr(err)

return res
}

func (r *RabbitMq) PublishExchangeDelivery(serviceName, exchangeName, exchangeType, opCode string, data interface{},deliveryMode uint8, dftOpUid ...string) bool {
profile := "rabbit.PublishExchangeDelivery"
r.Context().ProfileStart(profile)
defer r.Context().ProfileStop(profile)
defer r.handlePanic()

opUid := ""
if len(dftOpUid) > 0 {
opUid = dftOpUid[0]
}

if deliveryMode > amqp.Persistent || deliveryMode < 0 {
panicErr(errors.New("Invalid deliveryMode"))
}

res, err := r.client.Publish(&rabbitmq.PublishData{ServiceName: serviceName,DeliveryMode:deliveryMode,
ExChange: &rabbitmq.ExchangeData{Name: exchangeName, Type: exchangeType, Durable:true},
OpCode: opCode, Data: data, OpUid: opUid}, r.Context().LogId())
panicErr(err)
Expand Down
1 change: 1 addition & 0 deletions client/rabbitmq/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ StartPublish:
Body: goBytes.Bytes(),
Headers: amqp.Table{"logId": logId, "service": c.ServiceName(parameter.ServiceName), "opUid": parameter.OpUid},
Timestamp: time.Now(),
DeliveryMode:parameter.DeliveryMode,
})

if err == nil {
Expand Down
1 change: 1 addition & 0 deletions client/rabbitmq/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ExchangeData struct {

// rabbit 发布结构
type PublishData struct {
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
ServiceName string // 服务名
ExChange *ExchangeData
OpCode string // 操作code 和queue绑定相关
Expand Down
4 changes: 2 additions & 2 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (o *Object) GetObj(obj iface.IObject) iface.IObject {
}

// GetObjPool Get Object from pool
// Deprecated: Use GetObjBox instead.
// Recommended: Use GetObjBox instead.
func (o *Object) GetObjPool(className string, funcName iface.IObjPoolFunc, params ...interface{}) iface.IObject {
return o.GetObjPoolCtx(o.Context(), className, funcName, params...)
}
Expand All @@ -49,7 +49,7 @@ func (o *Object) GetObjCtx(ctx iface.IContext, obj iface.IObject) iface.IObject
}

// GetObjPoolCtx Get Object from pool and new Context
// Deprecated: Use GetObjBoxCtx instead.
// Recommended: Use GetObjBoxCtx instead.
func (o *Object) GetObjPoolCtx(ctx iface.IContext, className string, funcName iface.IObjPoolFunc, params ...interface{}) iface.IObject {
var obj iface.IObject
if funcName != nil {
Expand Down

0 comments on commit 7d4efc5

Please sign in to comment.