From 211f219d0a3085fd916ac6d9961c8ba97fbddfef Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 22 Oct 2019 20:14:51 +0300 Subject: [PATCH] fix wrongly set correlation ids when sending a command (#220) * adding the SagaCorrelationID header only for replies Fixing a bug where grabbit would set the message.CorrelationID and the message.SagaCorrelationID header even if the message was a command and not a reply. * fixing minor documentation issue --- gbus/abstractions.go | 2 ++ gbus/saga/invocation.go | 24 ++++++++++-------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 7b7e43b..aa3ebe6 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -18,6 +18,8 @@ const ( CMD Semantics = "cmd" //EVT represenst a messge with event semantics in grabbit EVT Semantics = "evt" + //REPLY represenst a messge with reply semantics in grabbit + REPLY Semantics = "reply" ) //Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index 5e4f218..fb304c9 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -34,25 +34,21 @@ type sagaInvocation struct { startedByRPCID string } -func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, isEvent bool, targetService string) { +func (si *sagaInvocation) setCorrelationIDs(message *gbus.BusMessage, targetService string, semantics gbus.Semantics) { - message.CorrelationID = si.inboundMsg.ID message.SagaID = si.sagaID - if !isEvent { - //support saga-to-saga communication - if si.inboundMsg.SagaID != "" { - message.SagaCorrelationID = si.inboundMsg.SagaID - } + if semantics == gbus.REPLY { + message.CorrelationID = si.inboundMsg.ID + message.SagaCorrelationID = si.inboundMsg.SagaID + + } else if semantics == gbus.CMD { //if the saga is potentially invoking itself then set the SagaCorrelationID to reflect that //https://github.com/wework/grabbit/issues/64 - if targetService == si.hostingSvc { message.SagaCorrelationID = message.SagaID } - } - } func (si *sagaInvocation) HostingSvc() string { return si.hostingSvc @@ -64,13 +60,13 @@ func (si *sagaInvocation) InvokingSvc() string { func (si *sagaInvocation) Reply(ctx context.Context, message *gbus.BusMessage) error { _, targetService := si.decoratedInvocation.Routing() - si.setCorrelationIDs(message, false, targetService) + si.setCorrelationIDs(message, targetService, gbus.REPLY) return si.decoratedInvocation.Reply(ctx, message) } func (si *sagaInvocation) ReplyToInitiator(ctx context.Context, message *gbus.BusMessage) error { - si.setCorrelationIDs(message, false, si.startedBy) + si.setCorrelationIDs(message, si.startedBy, gbus.REPLY) //override the correlation ids to those of the message creating the saga message.SagaCorrelationID = si.startedBySaga @@ -93,13 +89,13 @@ func (si *sagaInvocation) Ctx() context.Context { func (si *sagaInvocation) Send(ctx context.Context, toService string, command *gbus.BusMessage, policies ...gbus.MessagePolicy) error { - si.setCorrelationIDs(command, false, toService) + si.setCorrelationIDs(command, toService, gbus.CMD) return si.decoratedBus.Send(ctx, toService, command, policies...) } func (si *sagaInvocation) Publish(ctx context.Context, exchange, topic string, event *gbus.BusMessage, policies ...gbus.MessagePolicy) error { - si.setCorrelationIDs(event, true, "") + si.setCorrelationIDs(event, "", gbus.EVT) return si.decoratedBus.Publish(ctx, exchange, topic, event, policies...) }