From 7d4efc573ec9fc8c2f9177f16f87ddd5c1638125 Mon Sep 17 00:00:00 2001 From: yangbing Date: Thu, 3 Sep 2020 09:22:52 +0800 Subject: [PATCH] rabbit option --- adapter/rabbitmq.go | 30 +++++++++++++++++++++++++++--- client/rabbitmq/client.go | 1 + client/rabbitmq/init.go | 1 + object.go | 4 ++-- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/adapter/rabbitmq.go b/adapter/rabbitmq.go index 09b0b67..a7d41ff 100644 --- a/adapter/rabbitmq.go +++ b/adapter/rabbitmq.go @@ -7,6 +7,7 @@ import ( "github.com/pinguo/pgo2/client/rabbitmq" "github.com/pinguo/pgo2/iface" "github.com/pinguo/pgo2/util" + "github.com/pkg/errors" "github.com/streadway/amqp" ) @@ -110,14 +111,14 @@ func (r *RabbitMq) Publish(opCode string, data interface{}, dftOpUid ...string) opUid = dftOpUid[0] } - res, err := r.client.Publish(&rabbitmq.PublishData{ OpCode: opCode, Data: data, OpUid: opUid}, r.Context().LogId()) + res, err := r.client.Publish(&rabbitmq.PublishData{ OpCode: opCode, Data: data, OpUid: opUid, DeliveryMode:amqp.Persistent}, r.Context().LogId()) panicErr(err) return res } func (r *RabbitMq) PublishExchange(serviceName, exchangeName, exchangeType, opCode string, data interface{}, dftOpUid ...string) bool { - profile := "rabbit.Publish" + profile := "rabbit.PublishExchange" r.Context().ProfileStart(profile) defer r.Context().ProfileStop(profile) defer r.handlePanic() @@ -127,7 +128,30 @@ func (r *RabbitMq) PublishExchange(serviceName, exchangeName, exchangeType, opCo opUid = dftOpUid[0] } - res, err := r.client.Publish(&rabbitmq.PublishData{ServiceName: serviceName, + res, err := r.client.Publish(&rabbitmq.PublishData{ServiceName: serviceName,DeliveryMode:amqp.Persistent, + ExChange: &rabbitmq.ExchangeData{Name: exchangeName, Type: exchangeType, Durable:true}, + OpCode: opCode, Data: data, OpUid: opUid}, r.Context().LogId()) + panicErr(err) + + return res +} + +func (r *RabbitMq) PublishExchangeDelivery(serviceName, exchangeName, exchangeType, opCode string, data interface{},deliveryMode uint8, dftOpUid ...string) bool { + profile := "rabbit.PublishExchangeDelivery" + r.Context().ProfileStart(profile) + defer r.Context().ProfileStop(profile) + defer r.handlePanic() + + opUid := "" + if len(dftOpUid) > 0 { + opUid = dftOpUid[0] + } + + if deliveryMode > amqp.Persistent || deliveryMode < 0 { + panicErr(errors.New("Invalid deliveryMode")) + } + + res, err := r.client.Publish(&rabbitmq.PublishData{ServiceName: serviceName,DeliveryMode:deliveryMode, ExChange: &rabbitmq.ExchangeData{Name: exchangeName, Type: exchangeType, Durable:true}, OpCode: opCode, Data: data, OpUid: opUid}, r.Context().LogId()) panicErr(err) diff --git a/client/rabbitmq/client.go b/client/rabbitmq/client.go index c6d2f4d..2a2453d 100644 --- a/client/rabbitmq/client.go +++ b/client/rabbitmq/client.go @@ -144,6 +144,7 @@ StartPublish: Body: goBytes.Bytes(), Headers: amqp.Table{"logId": logId, "service": c.ServiceName(parameter.ServiceName), "opUid": parameter.OpUid}, Timestamp: time.Now(), + DeliveryMode:parameter.DeliveryMode, }) if err == nil { diff --git a/client/rabbitmq/init.go b/client/rabbitmq/init.go index a8764f1..b308f7c 100644 --- a/client/rabbitmq/init.go +++ b/client/rabbitmq/init.go @@ -41,6 +41,7 @@ type ExchangeData struct { // rabbit 发布结构 type PublishData struct { + DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) ServiceName string // 服务名 ExChange *ExchangeData OpCode string // 操作code 和queue绑定相关 diff --git a/object.go b/object.go index f07ed40..10de848 100644 --- a/object.go +++ b/object.go @@ -25,7 +25,7 @@ func (o *Object) GetObj(obj iface.IObject) iface.IObject { } // GetObjPool Get Object from pool -// Deprecated: Use GetObjBox instead. +// Recommended: Use GetObjBox instead. func (o *Object) GetObjPool(className string, funcName iface.IObjPoolFunc, params ...interface{}) iface.IObject { return o.GetObjPoolCtx(o.Context(), className, funcName, params...) } @@ -49,7 +49,7 @@ func (o *Object) GetObjCtx(ctx iface.IContext, obj iface.IObject) iface.IObject } // GetObjPoolCtx Get Object from pool and new Context -// Deprecated: Use GetObjBoxCtx instead. +// Recommended: Use GetObjBoxCtx instead. func (o *Object) GetObjPoolCtx(ctx iface.IContext, className string, funcName iface.IObjPoolFunc, params ...interface{}) iface.IObject { var obj iface.IObject if funcName != nil {