Skip to content

Commit

Permalink
fixed logging issues (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron authored Sep 24, 2019
1 parent f308b4b commit 80161c6
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
1 change: 1 addition & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ type TxProvider interface {

//TxOutbox abstracts the transactional outgoing channel type
type TxOutbox interface {
Logged
Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error
Start(amqpOut *AMQPOutbox) error
Stop() error
Expand Down
2 changes: 2 additions & 0 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
gb.Outbox.SetLogger(gb.Log())
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
Expand All @@ -107,6 +108,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
glue.SetLogger(gb.Log())
gb.Glue = glue
return gb
}
Expand Down
3 changes: 3 additions & 0 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gb
}))

exchange, routingKey := invocation.Routing()
instance.logger = imsm.Log()
err := instance.invoke(exchange, routingKey, sginv, message)
if err != nil {
span.LogFields(slog.Error(err))
Expand Down Expand Up @@ -321,6 +322,8 @@ func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider,
timeoutManager: timeoutManager,
}

logged := &gbus.Glogged{}
g.Glogged = logged
timeoutManager.SetTimeoutFunction(g.TimeoutSaga)
return g
}
18 changes: 13 additions & 5 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Instance struct {
ConcurrencyCtrl int
UnderlyingInstance gbus.Saga
MsgToMethodMap []*MsgToFuncPair
Log logrus.FieldLogger
logger logrus.FieldLogger
/*
Will hold the service name that sent the command or event that started the saga
*/
Expand All @@ -36,6 +36,14 @@ type Instance struct {
StartedByRPCID string
}

func (si *Instance) log() logrus.FieldLogger {
if si.logger == nil {
return logrus.WithField("id", si.ID)
}

return si.logger
}

func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error {

methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message)
Expand All @@ -55,10 +63,10 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati
params := make([]reflect.Value, 0)
params = append(params, reflect.ValueOf(invocation), valueOfMessage)
method := reflectedVal.MethodByName(methodName)
if invocation.Log() == nil {
if si.log() == nil {
panic("here")
}
invocation.Log().WithFields(logrus.Fields{
si.log().WithFields(logrus.Fields{
"method_name": methodName, "saga_id": si.ID,
}).Info("invoking method on saga")

Expand All @@ -78,13 +86,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati
return val.Interface().(error)
}
return nil
}, methodName, message.PayloadFQN, invocation.Log())
}, methodName, message.PayloadFQN, si.log())

if err != nil {
return err
}

invocation.Log().WithFields(logrus.Fields{
si.log().WithFields(logrus.Fields{
"method_name": methodName, "saga_id": si.ID,
}).Info("saga instance invoked")
}
Expand Down
4 changes: 3 additions & 1 deletion gbus/tx/mysql/txoutbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (

//TxOutbox is a mysql based transactional outbox
type TxOutbox struct {
*gbus.Glogged
svcName string
txProv gbus.TxProvider
purgeOnStartup bool
Expand All @@ -44,7 +45,7 @@ type TxOutbox struct {
}

func (outbox *TxOutbox) log() *log.Entry {
return log.WithField("tx", "mysql")
return outbox.Log().WithField("tx", "mysql")
}

//Start starts the transactional outbox that is used to send messages in sync with domain object change
Expand Down Expand Up @@ -128,6 +129,7 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool) *TxO
ack: make(chan uint64, 1000000),
nack: make(chan uint64, 1000000),
exit: make(chan bool)}
txo.Glogged = &gbus.Glogged{}
return txo
}

Expand Down

0 comments on commit 80161c6

Please sign in to comment.