diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 41ab183..c449961 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -260,6 +260,7 @@ type TxProvider interface { //TxOutbox abstracts the transactional outgoing channel type type TxOutbox interface { + Logged Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error Start(amqpOut *AMQPOutbox) error Stop() error diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 60583b5..a00ae86 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -89,6 +89,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } } gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) + gb.Outbox.SetLogger(gb.Log()) timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup) default: @@ -107,6 +108,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { } } glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager) + glue.SetLogger(gb.Log()) gb.Glue = glue return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index c01bb88..eaa4708 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -229,6 +229,7 @@ func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gb })) exchange, routingKey := invocation.Routing() + instance.logger = imsm.Log() err := instance.invoke(exchange, routingKey, sginv, message) if err != nil { span.LogFields(slog.Error(err)) @@ -321,6 +322,8 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider, timeoutManager: timeoutManager, } + logged := &gbus.Glogged{} + g.Glogged = logged timeoutManager.SetTimeoutFunction(g.TimeoutSaga) return g } diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 157b93a..ef0c074 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -19,7 +19,7 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair - Log logrus.FieldLogger + logger logrus.FieldLogger /* Will hold the service name that sent the command or event that started the saga */ @@ -36,6 +36,14 @@ type Instance struct { StartedByRPCID string } +func (si *Instance) log() logrus.FieldLogger { + if si.logger == nil { + return logrus.WithField("id", si.ID) + } + + return si.logger +} + func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error { methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message) @@ -55,10 +63,10 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati params := make([]reflect.Value, 0) params = append(params, reflect.ValueOf(invocation), valueOfMessage) method := reflectedVal.MethodByName(methodName) - if invocation.Log() == nil { + if si.log() == nil { panic("here") } - invocation.Log().WithFields(logrus.Fields{ + si.log().WithFields(logrus.Fields{ "method_name": methodName, "saga_id": si.ID, }).Info("invoking method on saga") @@ -78,13 +86,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati return val.Interface().(error) } return nil - }, methodName, message.PayloadFQN, invocation.Log()) + }, methodName, message.PayloadFQN, si.log()) if err != nil { return err } - invocation.Log().WithFields(logrus.Fields{ + si.log().WithFields(logrus.Fields{ "method_name": methodName, "saga_id": si.ID, }).Info("saga instance invoked") } diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index ba99af0..31e6701 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -31,6 +31,7 @@ var ( //TxOutbox is a mysql based transactional outbox type TxOutbox struct { + *gbus.Glogged svcName string txProv gbus.TxProvider purgeOnStartup bool @@ -44,7 +45,7 @@ type TxOutbox struct { } func (outbox *TxOutbox) log() *log.Entry { - return log.WithField("tx", "mysql") + return outbox.Log().WithField("tx", "mysql") } //Start starts the transactional outbox that is used to send messages in sync with domain object change @@ -128,6 +129,7 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO ack: make(chan uint64, 1000000), nack: make(chan uint64, 1000000), exit: make(chan bool)} + txo.Glogged = &gbus.Glogged{} return txo }