diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..e3fb5ab --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,37 @@ +linters-settings: + golint: + # minimal confidence for issues, default is 0.8 + min-confidence: 0.8 + gocyclo: + min-complexity: 15 + + govet: + # report about shadowed variables + check-shadowing: false + + # settings per analyzer + settings: + printf: # analyzer name, run `go tool vet help` to see all analyzers + funcs: # run `go tool vet help printf` to see available settings for `printf` analyzer + - (github.com/golangci/golangci-lint/pkg/logutils.Log).Infof + - (github.com/golangci/golangci-lint/pkg/logutils.Log).Warnf + - (github.com/golangci/golangci-lint/pkg/logutils.Log).Errorf + - (github.com/golangci/golangci-lint/pkg/logutils.Log).Fatalf +linters: + disable-all: true + enable: + - deadcode + # - errcheck + - gosimple + - govet + - ineffassign + # - staticcheck + # - typecheck + - unused + # - varcheck + # - deadcode + # - dupl + # - gocritic + - gocyclo + - golint + - misspell \ No newline at end of file diff --git a/gbus/metrics/handler_metrics.go b/gbus/metrics/handler_metrics.go index e60c751..c84b208 100644 --- a/gbus/metrics/handler_metrics.go +++ b/gbus/metrics/handler_metrics.go @@ -2,11 +2,12 @@ package metrics import ( "fmt" - "github.com/prometheus/client_golang/prometheus/promauto" "sync" + "github.com/prometheus/client_golang/prometheus/promauto" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_model/go" "github.com/sirupsen/logrus" ) @@ -37,7 +38,8 @@ const ( handler = "handler" ) -type handlerMetrics struct { +//HandlerMetrics holds the metrics results for a handler +type HandlerMetrics struct { result *prometheus.CounterVec latency prometheus.Summary } @@ -84,17 +86,17 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName, messageType s } //GetHandlerMetrics gets the metrics handler associated with the handlerName -func GetHandlerMetrics(handlerName string) *handlerMetrics { +func GetHandlerMetrics(handlerName string) *HandlerMetrics { entry, ok := handlerMetricsByHandlerName.Load(handlerName) if ok { - return entry.(*handlerMetrics) + return entry.(*HandlerMetrics) } return nil } -func newHandlerMetrics(handlerName string) *handlerMetrics { - return &handlerMetrics{ +func newHandlerMetrics(handlerName string) *HandlerMetrics { + return &HandlerMetrics{ result: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: grabbitPrefix, @@ -150,17 +152,17 @@ func GetLatencySampleCountByMessageTypeAndHandlerName(messageType, handlerName s } //GetSuccessCount gets the value of the handlers success value -func (hm *handlerMetrics) GetSuccessCount() (float64, error) { +func (hm *HandlerMetrics) GetSuccessCount() (float64, error) { return getCounterValue(hm.result.WithLabelValues(success)) } //GetFailureCount gets the value of the handlers failure value -func (hm *handlerMetrics) GetFailureCount() (float64, error) { +func (hm *HandlerMetrics) GetFailureCount() (float64, error) { return getCounterValue(hm.result.WithLabelValues(failure)) } //GetLatencySampleCount gets the value of the handlers latency value -func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) { +func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) { return getSummarySampleCount(hm.latency) } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 7a43612..c01bb88 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -101,6 +101,39 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def { return defs } +func (imsm *Glue) handleNewSaga(def *Def, invocation gbus.Invocation, message *gbus.BusMessage) error { + newInstance := def.newInstance() + newInstance.StartedBy = invocation.InvokingSvc() + newInstance.StartedBySaga = message.SagaCorrelationID + newInstance.StartedByRPCID = message.RPCID + newInstance.StartedByMessageID = message.ID + + imsm.Log(). + WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). + Info("created new saga") + if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil { + imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") + return invkErr + } + + if !newInstance.isComplete() { + imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga") + + if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil { + imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed") + return e + } + + if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { + imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") + if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil { + return tme + } + } + } + return nil +} + //SagaHandler is the generic handler invoking saga instances func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error { @@ -121,37 +154,8 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa */ startNew := def.shouldStartNewSaga(message) if startNew { + return imsm.handleNewSaga(def, invocation, message) - newInstance := def.newInstance() - newInstance.StartedBy = invocation.InvokingSvc() - newInstance.StartedBySaga = message.SagaCorrelationID - newInstance.StartedByRPCID = message.RPCID - newInstance.StartedByMessageID = message.ID - - imsm.Log(). - WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). - Info("created new saga") - if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil { - imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") - return invkErr - } - - if !newInstance.isComplete() { - imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga") - - if e := imsm.sagaStore.SaveNewSaga(invocation.Tx(), def.sagaType, newInstance); e != nil { - imsm.Log().WithError(e).WithField("saga_id", newInstance.ID).Error("saving new saga failed") - return e - } - - if requestsTimeout, duration := newInstance.requestsTimeout(); requestsTimeout { - imsm.Log().WithFields(logrus.Fields{"saga_id": newInstance.ID, "timeout_duration": duration}).Info("new saga requested timeout") - if tme := imsm.timeoutManager.RegisterTimeout(invocation.Tx(), newInstance.ID, duration); tme != nil { - return tme - } - } - } - return nil } else if message.SagaCorrelationID != "" { instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID) diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 9e5cad1..157b93a 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -63,6 +63,7 @@ func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocati }).Info("invoking method on saga") span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), methodName) + // replace the original context with the conext built around the span so we ca // trace the saga handler that is invoked invocation.ctx = sctx diff --git a/gbus/tx/mysql/migrations.go b/gbus/tx/mysql/migrations.go index 67e527b..8f0d4fa 100644 --- a/gbus/tx/mysql/migrations.go +++ b/gbus/tx/mysql/migrations.go @@ -47,7 +47,7 @@ func sagaStoreAddSagaCreatorDetails(svcName string) *migrator.Migration { } } -func sagaStoreAdd_RPCID_Details(svcName string) *migrator.Migration { +func sagaStoreAddRPCIDDetails(svcName string) *migrator.Migration { tblName := tx.GrabbitTableNameTemplate(svcName, "sagas") addCreatorDetailsSQL := `ALTER TABLE ` + tblName + ` ADD COLUMN started_by_msg_id VARCHAR(50) AFTER started_by_request_of_svc, ADD COLUMN started_by_rpcid VARCHAR(50) AFTER started_by_msg_id` @@ -157,7 +157,7 @@ func EnsureSchema(db *sql.DB, svcName string) { legacyMigrationsTable(svcName), outboxChangeColumnLength(svcName), sagaStoreAddSagaCreatorDetails(svcName), - sagaStoreAdd_RPCID_Details(svcName), + sagaStoreAddRPCIDDetails(svcName), )) if err != nil { panic(err) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 5ea3e9b..ba99af0 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -294,12 +294,8 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, } } - for recid := range failedDeliveries { - _, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET delivery_attempts=delivery_attempts+1 WHERE rec_id=?", recid) - if updateErr != nil { - outbox.log().WithError(updateErr).WithField("record_id", recid).Warn("failed to update transactional outbox with failed deivery attempt for record") - } - } + outbox.updateFailedDeliveries(tx, failedDeliveries) + if cmtErr := tx.Commit(); cmtErr != nil { outbox.log().WithError(cmtErr).Error("Error committing outbox transaction") } else { @@ -314,6 +310,15 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, return nil } +func (outbox *TxOutbox) updateFailedDeliveries(tx *sql.Tx, failedDeliveries []int) { + for recid := range failedDeliveries { + _, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET delivery_attempts=delivery_attempts+1 WHERE rec_id=?", recid) + if updateErr != nil { + outbox.log().WithError(updateErr).WithField("record_id", recid).Warn("failed to update transactional outbox with failed deivery attempt for record") + } + } +} + func getOutboxName(svcName string) string { return tx.GrabbitTableNameTemplate(svcName, "outbox") diff --git a/tests/bus_test.go b/tests/bus_test.go index 240c993..1aae892 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -319,7 +319,7 @@ func TestRawMessageHandling(t *testing.T) { func TestReturnDeadToQueue(t *testing.T) { var visited bool - proceed := make(chan bool, 0) + proceed := make(chan bool) poison := gbus.NewBusMessage(Command1{}) service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true, diff --git a/tests/testMessages.go b/tests/testMessages.go index ea0f585..5718bee 100644 --- a/tests/testMessages.go +++ b/tests/testMessages.go @@ -64,7 +64,6 @@ type Reply2 struct { Data string } - //SchemaName implementing gbus.Message func (Reply2) SchemaName() string { return "grabbit.tests.Reply2" @@ -80,8 +79,6 @@ func (Reply3) SchemaName() string { return "grabbit.tests.Reply3" } - - //Event1 for testing type Event1 struct { Data string