From eb3240198e22d6516dac056d76041d99c7bf2108 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 29 Sep 2019 14:54:54 +0300 Subject: [PATCH] v1.1.5 rollup to master (#185) --- README.md | 39 ++-- docs/MESSAGING.md | 37 ++++ docs/RETRY.md | 42 ++++ docs/TX.md | 31 +++ examples/vacation_app/.gitignore | 1 + examples/vacation_app/.gitingnore | 1 + examples/vacation_app/README.md | 35 +++ examples/vacation_app/cmd/booking.go | 41 ++++ examples/vacation_app/cmd/client.go | 66 ++++++ examples/vacation_app/cmd/flights.go | 64 ++++++ examples/vacation_app/cmd/helper.go | 17 ++ examples/vacation_app/cmd/hotels.go | 56 +++++ examples/vacation_app/cmd/root.go | 30 +++ examples/vacation_app/docker-compose.yml | 42 ++++ examples/vacation_app/go.mod | 14 ++ examples/vacation_app/go.sum | 238 +++++++++++++++++++++ examples/vacation_app/main.go | 8 + examples/vacation_app/messages/allinone.go | 84 ++++++++ examples/vacation_app/saga/booking_saga.go | 110 ++++++++++ examples/vacation_app/trace/init.go | 26 +++ tests/bus_test.go | 4 + 21 files changed, 963 insertions(+), 23 deletions(-) create mode 100644 docs/MESSAGING.md create mode 100644 docs/RETRY.md create mode 100644 docs/TX.md create mode 100644 examples/vacation_app/.gitignore create mode 100644 examples/vacation_app/.gitingnore create mode 100644 examples/vacation_app/README.md create mode 100644 examples/vacation_app/cmd/booking.go create mode 100644 examples/vacation_app/cmd/client.go create mode 100644 examples/vacation_app/cmd/flights.go create mode 100644 examples/vacation_app/cmd/helper.go create mode 100644 examples/vacation_app/cmd/hotels.go create mode 100644 examples/vacation_app/cmd/root.go create mode 100644 examples/vacation_app/docker-compose.yml create mode 100644 examples/vacation_app/go.mod create mode 100644 examples/vacation_app/go.sum create mode 100644 examples/vacation_app/main.go create mode 100644 examples/vacation_app/messages/allinone.go create mode 100644 examples/vacation_app/saga/booking_saga.go create mode 100644 examples/vacation_app/trace/init.go diff --git a/README.md b/README.md index d03c798..7043b8b 100644 --- a/README.md +++ b/README.md @@ -10,37 +10,30 @@ A lightweight transactional message bus on top of RabbitMQ supporting: -1) Supported messaging semantics - * One Way - * Duplex - * Publish/Subscribe - * Request/Reply (RPC) -2) Long running processes via the [Saga](https://github.com/wework/grabbit/blob/master/docs/SAGA.md) pattern -3) Retry and backoffs -4) Publisher confirms -5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern -6) Deadlettering -7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md) -8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus -9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing -10) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with -default support for gob, protobuf and avro + +1) Supported [Messaging Styles](https://github.com/wework/grabbit/blob/master/docs/MESSAGING.md) + - One Way (Fire and forget) + - Publish/Subscribe + - Aync Command/Reply + - Blocking Command/Reply (RPC) +2) [Transactional](https://github.com/wework/grabbit/blob/master/docs/TX.md) message processing +3) Message Orchestration via the [Saga](https://github.com/wework/grabbit/blob/master/docs/SAGA.md) pattern +4) At least once reliable messaging via [Transaction Outbox](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and [Publisher Confirms](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) +5) [Retry and backoffs](https://github.com/wework/grabbit/blob/master/docs/RETRY.md) +6) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md) +7) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus +8) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing +9) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with default support for gob, protobuf and avro ## Stable release the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates. ## Supported transactional resources 1) MySql > 8.0 (InnoDB) -## Supported serializers -1) gob -2) Avro -3) Protobuf - -## Instrumentation -1) Opentracing +## Basic Usage -## Usage +- For a complete sample application see the vacation booking [sample app](https://github.com/wework/grabbit/blob/master/examples/vacation_app) in the examples directory The following outlines the basic usage of grabbit. For a complete view of how you would use grabbit including how to write saga's and handle deadlettering refer to grabbit/tests package diff --git a/docs/MESSAGING.md b/docs/MESSAGING.md new file mode 100644 index 0000000..6801d41 --- /dev/null +++ b/docs/MESSAGING.md @@ -0,0 +1,37 @@ +# Messaging + +grabbit is all about asynchronous messaging and supports different kinds of interaction patterns. +essentially to work with grabbit you will be registering on a grabbit bus instance handlers for specific types of messages or topics, allowing you to perform your business logic once a message is consumed by grabbit. +Once invoked handlers can reply to incoming messages send or publish messages to other services via the bus. + +At its core grabbit distinguishes between messages that target a specific service (commands) and +messages that may target many services (events). + +See [README.md](https://github.com/wework/grabbit/blob/master/README.md) or have a look at grabbit's [test suite](https://github.com/wework/grabbit/blob/master/tests/bus_test.go) to learn basic usage + +### One Way + +All messages in grabbit are essentially one-way messages sent to a RabbitMQ queue or exchange. +More sophisticated complex interactions are all built on one way messaging. +see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L29) as an example. + +### Async Command/Reply + +When a handlers replies to a command grabbit automatically adds the id of the inbound message to the outbound reply so services can correlate outbound messages with incoming ones. +see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L91) as an example. + +### Publish/Subscriber + +grabbit allows publishing events to RabbitMQ exchange and topic hierarchies and handlers to subscribe to those events. +Events do not get correlated and can not be replied to. + +see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L112) as an example. + +### Blocking Command/Reply (RPC) + +it is sometimes beneficial to simulate blocking semantics over an async command/reply message exchange. +In particular, it might come in handly when front end web applications need to call backend queued services. +grabbit allows this scenario by providing the RPC API over the bus interface. + +see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L215) as an example. + diff --git a/docs/RETRY.md b/docs/RETRY.md new file mode 100644 index 0000000..1982dda --- /dev/null +++ b/docs/RETRY.md @@ -0,0 +1,42 @@ +# Retry and Backoffs + +When the invocation of a grabbit handler fails grabbit will retry the handler and perform a jittered [binary exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff#Binary_exponential_backoff_algorithm). + +You can configure the number of retries and control the backoff time factor by passing in a gbus.BusConfiguration instance to the builder interface. + +MaxRetryCount configures the maximum number of retries grabbit will try executing the handler before rejecting the message. + +BaseRetryDuration is the base duration in milliseconds inputted into the backoff algorithm. +With a binary exponential backoff algorithm, the time between each retry attempt is calculated as 2^[retry attempt] * BaseRetryDuration + [random jitter (a few nanoseconds) ] + +the default MaxRetryCount is 3 and BaseRetryDuration is 10ms +Given the above configuration, grabbit will try retrying according to the following + +first retry after ~20ms +second retry after ~40ms +third retry after ~80ms + +```go + +package main + +import ( + "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/builder" +) + +bus := builder. + New(). + Bus("rabbitmq connection string"). + WithLogger(logger). + WorkerNum(3, 1). + WithConfirms(). + WithConfiguration(gbus.BusConfiguration{ + MaxRetryCount: 5, + BaseRetryDuration: 15, + }). + Txnl("mysql", "database connection string"). + Build("your service name") + +``` + diff --git a/docs/TX.md b/docs/TX.md new file mode 100644 index 0000000..a184054 --- /dev/null +++ b/docs/TX.md @@ -0,0 +1,31 @@ +# Transactional Support + +grabbit executes handlers within the scope of an active transaction. +When handlers use the passed in transaction instance to persist their business objects +grabbit guarantees local transactivity by bounding business objects persistence, outgoing command, reply and event messages the handler issues in a single transaction and routing messages to the [Transaction Outbox](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md). + +The following demonstrates how to access the active transaction from within a handler + +In this example, the updating of the orders table, publishing of the OrderCanceledEvent event and sending the OrderCanceledReply reply message are all bound to the same transaction. + + + +```go + +func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{ + cancelOrderCommand := message.Payload.(CancelOrderCommand) + if e := invocation.Tx().Exec("UPDATE orders SET status = 'Canceled' WHERE order_id=?", cancelOrderCommand.orderID); e != nil{ + return e + } + orderCanceledEvent := gbus.NewBusMessage(OrderCanceledEvent{ + OrderID: cancelOrderCommand.orderID, + }) + if e := invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "order.canceled", orderCanceledEvent); e := nil{ + return e + } + + orderCanceledReply := gbus.NewBusMessage(OrderCanceledReply{}) + return invocation.Reply(invocation.Ctx(), orderCanceledReply) + } + +``` \ No newline at end of file diff --git a/examples/vacation_app/.gitignore b/examples/vacation_app/.gitignore new file mode 100644 index 0000000..3c3ee0b --- /dev/null +++ b/examples/vacation_app/.gitignore @@ -0,0 +1 @@ +vacation_app diff --git a/examples/vacation_app/.gitingnore b/examples/vacation_app/.gitingnore new file mode 100644 index 0000000..3c3ee0b --- /dev/null +++ b/examples/vacation_app/.gitingnore @@ -0,0 +1 @@ +vacation_app diff --git a/examples/vacation_app/README.md b/examples/vacation_app/README.md new file mode 100644 index 0000000..d5863a4 --- /dev/null +++ b/examples/vacation_app/README.md @@ -0,0 +1,35 @@ +## grabbit example vacation app + +The following example simulates a vacation booking app and demonstrates the use of grabbit +to manage the booking process. + +The app is made up out of the following components + +- The client, a console app that you enter your vacation destination and calls the booking service to + book the vacation. + +- Booking Service manages the booking saga calling the hotels and flights services to book hotels and flights for the requested destination. it handles as well as applying flight cancelation in case the hotel service can not book a hotel. + The booking saga sends back a message to the client with the result of the booking request once the saga completes. + +- Flights Service, books flights to the requested destination and replies back the response. + The flight service. + +- Hotels Service, books hotels for the requested destination and replies back the response. + In this example requesting to book a vacation in Oslo results in a reply message with a Failed status + Triggering the booking saga to send a command to the flight service to cancel the booked flights. + +## building and running the example + + - go build + - docker-compose up + - run the booking servce: vacation_app booking + - run the hotels servce: vacation_app hotels + - run the flights servce: vacation_app flights + - run the client: vacation_app client + + Once the services are running you can enter a vacation destination in the client app which will invoke the booking saga which will orchestrate the flights and hotels services. + + + You can see a trace of the calls by browsing to the Jeager client at http://localhost:16686 + + diff --git a/examples/vacation_app/cmd/booking.go b/examples/vacation_app/cmd/booking.go new file mode 100644 index 0000000..2bbe166 --- /dev/null +++ b/examples/vacation_app/cmd/booking.go @@ -0,0 +1,41 @@ +package cmd + +import ( + "bufio" + "fmt" + "os" + "vacation_app/trace" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "vacation_app/saga" +) + +func init() { + rootCmd.AddCommand(runBookingServiceeCmd) +} + +var runBookingServiceeCmd = &cobra.Command{ + Use: "booking", + Short: "Run the booking service", + Run: func(cmd *cobra.Command, args []string) { + svcName := "booking-service" + closer, err := trace.CreatetJeagerTracer(svcName) + + if err != nil { + log.Printf("Could not initialize jaeger tracer: %s", err.Error()) + return + } + + defer closer.Close() + gb := createBus(svcName) + + gb.RegisterSaga(&saga.BookingSaga{}) + gb.Start() + defer gb.Shutdown() + + fmt.Print("Booking service running...press any key to exit...\n") + reader := bufio.NewReader(os.Stdin) + reader.ReadString('\n') + }, +} diff --git a/examples/vacation_app/cmd/client.go b/examples/vacation_app/cmd/client.go new file mode 100644 index 0000000..7fd4ddd --- /dev/null +++ b/examples/vacation_app/cmd/client.go @@ -0,0 +1,66 @@ +package cmd + +import ( + "bufio" + "context" + "fmt" + "os" + "strings" + "vacation_app/messages" + "vacation_app/trace" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/wework/grabbit/gbus" +) + + +var runClientCmd = &cobra.Command{ + Use: "client", + Short: "Run the client app", + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("\033]0;Title goes here\007") + log.SetFormatter(&log.TextFormatter{ForceColors: true}) + log.SetLevel(log.ErrorLevel) + + svcName := "client" + closer, err := trace.CreatetJeagerTracer(svcName) + + if err != nil { + log.Printf("Could not initialize jaeger tracer: %s", err.Error()) + return + } + defer closer.Close() + gb := createBus(svcName) + + gb.HandleMessage(messages.BookingComplete{}, HandleBookingComplete) + gb.Start() + defer gb.Shutdown() + + for { + fmt.Print("Enter destination ...\n") + reader := bufio.NewReader(os.Stdin) + dest, _ := reader.ReadString('\n') + dest = strings.TrimSpace(dest) + bookVacationCmd := gbus.NewBusMessage(messages.BookVacationCmd{ + Destination: dest, + }) + + gb.Send(context.Background(), "booking-service", bookVacationCmd) + + fmt.Printf("booking vacation for destination %s\n", dest) + + } + }, +} + +func HandleBookingComplete(invocation gbus.Invocation, message *gbus.BusMessage) error { + bookingComplete := message.Payload.(*messages.BookingComplete) + if bookingComplete.Success { + fmt.Printf("booking completed succesfully\n") + + } else { + fmt.Printf("failed to book vacation\n") + } + return nil +} diff --git a/examples/vacation_app/cmd/flights.go b/examples/vacation_app/cmd/flights.go new file mode 100644 index 0000000..73a72e2 --- /dev/null +++ b/examples/vacation_app/cmd/flights.go @@ -0,0 +1,64 @@ +package cmd + +import ( + "bufio" + "fmt" + "os" + "vacation_app/trace" + "vacation_app/messages" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/wework/grabbit/gbus" +) + + + +var runFlightsgServiceCmd = &cobra.Command{ + Use: "flights", + Short: "Run the flights service", + Run: func(cmd *cobra.Command, args []string) { + svcName := "flights-service" + closer, err := trace.CreatetJeagerTracer(svcName) + + if err != nil { + log.Printf("Could not initialize jaeger tracer: %s", err.Error()) + return + } + + defer closer.Close() + gb := createBus(svcName) + + gb.HandleMessage(messages.BookFlightsCmd{}, HandleBookFlightCommand) + gb.HandleMessage(messages.CancelFlightsCmd{}, HandleCancelFlightCommand) + + gb.Start() + defer gb.Shutdown() + + fmt.Print("Flights service running...press any key to exit...\n") + reader := bufio.NewReader(os.Stdin) + reader.ReadString('\n') + }, +} + +func HandleBookFlightCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + cmd := message.Payload.(*messages.BookFlightsCmd) + invocation.Log().Infof("booking flight to %s", cmd.Destination) + reply := gbus.NewBusMessage(messages.BookFlightsRsp{ + Success: true, + }) + invocation.Reply(invocation.Ctx(), reply) + + return nil +} + +func HandleCancelFlightCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + cmd := message.Payload.(*messages.CancelFlightsCmd) + invocation.Log().Infof("canceling flight to %s", cmd.Destination) + reply := gbus.NewBusMessage(messages.CancelFlightsRsp{ + Success: true, + }) + invocation.Reply(invocation.Ctx(), reply) + + return nil +} diff --git a/examples/vacation_app/cmd/helper.go b/examples/vacation_app/cmd/helper.go new file mode 100644 index 0000000..abe41e2 --- /dev/null +++ b/examples/vacation_app/cmd/helper.go @@ -0,0 +1,17 @@ +package cmd + +import ( + "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/builder" +) + +func createBus(serviceName string) gbus.Bus { + return builder. + New(). + Bus("amqp://rabbitmq:rabbitmq@localhost"). + WorkerNum(3, 1). + WithConfirms(). + PurgeOnStartUp(). + Txnl("mysql", "rhinof:rhinof@/rhinof"). + Build(serviceName) +} diff --git a/examples/vacation_app/cmd/hotels.go b/examples/vacation_app/cmd/hotels.go new file mode 100644 index 0000000..ebf9c7f --- /dev/null +++ b/examples/vacation_app/cmd/hotels.go @@ -0,0 +1,56 @@ +package cmd + +import ( + "bufio" + "fmt" + "os" + "strings" + "vacation_app/messages" + "vacation_app/trace" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/wework/grabbit/gbus" +) + +var runHotelsgServiceCmd = &cobra.Command{ + Use: "hotels", + Short: "Run the hotels service", + Run: func(cmd *cobra.Command, args []string) { + svcName := "hotels-service" + closer, err := trace.CreatetJeagerTracer(svcName) + + if err != nil { + log.Printf("Could not initialize jaeger tracer: %s", err.Error()) + return + } + + defer closer.Close() + gb := createBus(svcName) + gb.HandleMessage(messages.BookHotelCmd{}, HandleBookHotelCommand) + gb.Start() + defer gb.Shutdown() + + fmt.Print("Hotels service running...press any key to exit...\n") + reader := bufio.NewReader(os.Stdin) + reader.ReadString('\n') + }, +} + +func HandleBookHotelCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + cmd := message.Payload.(*messages.BookHotelCmd) + destination := cmd.Destination + response := messages.BookHotelRsp{} + + if strings.ToLower(destination) == "oslo" { + response.Success = false + invocation.Log().Info("can't book hotels in oslo at this time") + } else { + response.Success = true + invocation.Log().Infof("booking hotel to %s", cmd.Destination) + } + + invocation.Reply(invocation.Ctx(), gbus.NewBusMessage(response)) + + return nil +} diff --git a/examples/vacation_app/cmd/root.go b/examples/vacation_app/cmd/root.go new file mode 100644 index 0000000..a4f5519 --- /dev/null +++ b/examples/vacation_app/cmd/root.go @@ -0,0 +1,30 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(runClientCmd, runBookingServiceeCmd, runFlightsgServiceCmd, runHotelsgServiceCmd) +} + +var rootCmd = &cobra.Command{ + Use: "hugo", + Short: "Hugo is a very fast static site generator", + Long: `A Fast and Flexible Static Site Generator built with + love by spf13 and friends in Go. + Complete documentation is available at http://hugo.spf13.com`, + Run: func(cmd *cobra.Command, args []string) { + // Do Stuff Here + }, +} + +func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/examples/vacation_app/docker-compose.yml b/examples/vacation_app/docker-compose.yml new file mode 100644 index 0000000..886d05b --- /dev/null +++ b/examples/vacation_app/docker-compose.yml @@ -0,0 +1,42 @@ +version: '3' + +services: + rabbitmq: + image: "rabbitmq:3-management" + hostname: "rabbit1" + environment: + RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG" + RABBITMQ_DEFAULT_USER: "rabbitmq" + RABBITMQ_DEFAULT_PASS: "rabbitmq" + RABBITMQ_DEFAULT_VHOST: "/" + ports: + - "15672:15672" + - "5672:5672" + labels: + NAME: "rabbitmq1" + + mysqldb: + image: mysql + command: --default-authentication-plugin=mysql_native_password + restart: always + environment: + MYSQL_DATABASE: rhinof + MYSQL_ROOT_PASSWORD: rhinof + MYSQL_USER: rhinof + MYSQL_PASSWORD: rhinof + ports: + - 3306:3306 + adminer: + image: adminer + ports: + - 8080:8080 + jaeger: + image: jaegertracing/all-in-one:latest + ports: + - "5775:5775/udp" + - "6831:6831/udp" + - "6832:6832/udp" + - "5778:5778" + - "16686:16686" + - "14268:14268" + - "9411:9411" \ No newline at end of file diff --git a/examples/vacation_app/go.mod b/examples/vacation_app/go.mod new file mode 100644 index 0000000..3caeb11 --- /dev/null +++ b/examples/vacation_app/go.mod @@ -0,0 +1,14 @@ +module vacation_app + +go 1.12 + +require ( + github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect + github.com/opentracing/opentracing-go v1.1.0 + github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3 + github.com/sirupsen/logrus v1.4.2 + github.com/spf13/cobra v0.0.5 + github.com/uber/jaeger-client-go v2.16.0+incompatible + github.com/uber/jaeger-lib v2.0.0+incompatible + github.com/wework/grabbit v1.1.5-0.20190929052249-0cd9890ac0f4 +) diff --git a/examples/vacation_app/go.sum b/examples/vacation_app/go.sum new file mode 100644 index 0000000..c737917 --- /dev/null +++ b/examples/vacation_app/go.sum @@ -0,0 +1,238 @@ +emperror.dev/emperror v0.21.3/go.mod h1:aeDoz3ERR3yJblyjfKojXoFFsXSd6K8Wfd4Zb1eEbZg= +emperror.dev/errors v0.4.1/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= +emperror.dev/errors v0.4.3 h1:yfhVxX1vzHgCDXh0KL+gVKfKhXlJCabmc79jS6QQuus= +emperror.dev/errors v0.4.3/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= +emperror.dev/handler/logrus v0.1.0 h1:hiBAmANpRKU4/RxARem/P5r0c8fa1Hx/BqX0mpplflg= +emperror.dev/handler/logrus v0.1.0/go.mod h1:aulFiWRpZEHT8+r2DUgd/IniD/dyNS9Quxn//jU5aYA= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo= +github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= +github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= +github.com/Shopify/sarama v1.21.0/go.mod h1:yuqtN/pe8cXRWG5zPaO7hCfNJp5MwmkoJEoLjkm5tCQ= +github.com/Shopify/sarama v1.23.0 h1:slvlbm7bxyp7sKQbUwha5BQdZTqurhRoI+zbKorVigQ= +github.com/Shopify/sarama v1.23.0/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= +github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= +github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 h1:QuKWm+/gc4/EuT8SCBAn1qcTh576rg0KoLfi7a0ArMM= +github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15/go.mod h1:NBrM4f6cInyw9KSBFONNXzpvPQ/WGige7ON42RICbWM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.4/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= +github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= +github.com/lopezator/migrator v0.2.0 h1:5t2GE77ojbyl9fZ4lHxkfFjwNZvTCzWFMDSorQq5O/c= +github.com/lopezator/migrator v0.2.0/go.mod h1:bpVAVPkWSvTw8ya2Pk7E/KiNAyDWNImgivQY79o8/8I= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 h1:OhtbNVqXz6DuVXGvwPYXnNwQy1n2rI+2mID9CQOok9U= +github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620/go.mod h1:UTAgTV5+tXpWiYqczgUb2kCslN9sqcshFQdmHSTyzlU= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= +github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= +github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 h1:eUm8ma4+yPknhXtkYlWh3tMkE6gBjXZToDned9s2gbQ= +github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3 h1:yxIAU7hTyjCJeX5B/ydcwre0LcPTaHge5DU2pqND+PA= +github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3/go.mod h1:enxXXxSAurM+2jWHMDQYCUS9Tjpjs1J2C4xbZcBo8IU= +github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/streadway/amqp v0.0.0-20190312223743-14f78b41ce6d/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQGFt7E53bPYqEgug/AoBtY= +github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-client-go v2.19.0+incompatible h1:pbwbYfHUoaase0oPQOdZ1GcaUjImYGimUXSQ/+8+Z8Q= +github.com/uber/jaeger-client-go v2.19.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw= +github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= +github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/wework/grabbit v1.1.2-0.20190909015842-61c7dc46f7e9/go.mod h1:SGYWPBZZdM9q0S9bJvR4lzKqZhYbay+jbEuEqOzjixA= +github.com/wework/grabbit v1.1.4 h1:2WPoKGAvLfHhN9FhKaVTQYj3Of8xoj3F4l+wZFGPal0= +github.com/wework/grabbit v1.1.4/go.mod h1:9KKwNCgVkVOjIf/g/89fne31lDVC7Uc+C1XsTDiFI5U= +github.com/wework/grabbit v1.1.5-0.20190929052249-0cd9890ac0f4 h1:6dPBHRiFvpVEOR19eEsrp1q11NAffMS6t1C+XjF5NVM= +github.com/wework/grabbit v1.1.5-0.20190929052249-0cd9890ac0f4/go.mod h1:9KKwNCgVkVOjIf/g/89fne31lDVC7Uc+C1XsTDiFI5U= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190322120337-addf6b3196f6/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= +golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190822191935-b1e2c8edcefd/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/gokrb5.v7 v7.3.0 h1:0709Jtq/6QXEuWRfAm260XqlpcwL1vxtO1tUE2qK8Z4= +gopkg.in/jcmturner/gokrb5.v7 v7.3.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= +gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/examples/vacation_app/main.go b/examples/vacation_app/main.go new file mode 100644 index 0000000..54380f6 --- /dev/null +++ b/examples/vacation_app/main.go @@ -0,0 +1,8 @@ +package main + +import "vacation_app/cmd" + +func main() { + + cmd.Execute() +} diff --git a/examples/vacation_app/messages/allinone.go b/examples/vacation_app/messages/allinone.go new file mode 100644 index 0000000..e754cc7 --- /dev/null +++ b/examples/vacation_app/messages/allinone.go @@ -0,0 +1,84 @@ +package messages + +/**** Booking ****/ + +type BookVacationCmd struct { + Destination string +} + +func (BookVacationCmd) SchemaName() string { + return "BookVacationCmd" +} + +type BookVacationRsp struct { + Result bool +} + +func (BookVacationRsp) SchemaName() string { + return "BookVacationRsp" +} + +/**** Flights ****/ + +type BookFlightsCmd struct { + Destination string +} + +func (BookFlightsCmd) SchemaName() string { + return "BookFlightsCmd" +} + +type BookFlightsRsp struct { + Success bool +} + +func (BookFlightsRsp) SchemaName() string { + return "BookFlightsRsp" +} + +type CancelFlightsCmd struct { + Destination string +} + +func (CancelFlightsCmd) SchemaName() string { + return "CancelFlightsCmd" +} + +type CancelFlightsRsp struct { + Success bool +} + +func (CancelFlightsRsp) SchemaName() string { + return "CancelFlightsRsp" +} + +/**** Hotels ****/ + +type BookHotelCmd struct { + Destination string +} + +func (BookHotelCmd) SchemaName() string { + return "BookHotelCmd" +} + +type BookHotelRsp struct { + Success bool +} + +func (BookHotelRsp) SchemaName() string { + return "BookHotelRsp" +} + +type BookingComplete struct { + Destination string + Success bool +} + +func (BookingComplete) SchemaName() string { + return "BookingComplete" +} + +// *** CancelFlightCmd ***// + +// *** CancelFlighRpl ***// diff --git a/examples/vacation_app/saga/booking_saga.go b/examples/vacation_app/saga/booking_saga.go new file mode 100644 index 0000000..59405f5 --- /dev/null +++ b/examples/vacation_app/saga/booking_saga.go @@ -0,0 +1,110 @@ +package saga + +import ( + "context" + "vacation_app/messages" + + "github.com/wework/grabbit/gbus" +) + +type BookingSaga struct { + FinishedWithFlights bool + FinishedWithHotels bool + CancelingFlights bool + Destination string +} + +func (bs *BookingSaga) StartedBy() []gbus.Message { + startedBy := make([]gbus.Message, 0) + startedBy = append(startedBy, messages.BookVacationCmd{}) + return startedBy +} + +func (bs *BookingSaga) RegisterAllHandlers(register gbus.HandlerRegister) { + register.HandleMessage(messages.BookVacationCmd{}, bs.HandleBookingCommand) + register.HandleMessage(messages.BookFlightsRsp{}, bs.HandleBookFlightsRsp) + register.HandleMessage(messages.BookHotelRsp{}, bs.HandleBookHotelRsp) + register.HandleMessage(messages.CancelFlightsRsp{}, bs.HandleCancelFlightsRsp) +} + +func (bs *BookingSaga) IsComplete() bool { + return bs.FinishedWithFlights && bs.FinishedWithHotels +} + +func (bs *BookingSaga) New() gbus.Saga { + return &BookingSaga{} +} + +func (bs *BookingSaga) HandleBookingCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + cmd := message.Payload.(*messages.BookVacationCmd) + invocation.Log().Infof("booking a vacation to %s", cmd.Destination) + bs.Destination = cmd.Destination + bookFlights := gbus.NewBusMessage( + messages.BookFlightsCmd{ + Destination: bs.Destination, + }) + + bookHotels := gbus.NewBusMessage( + messages.BookHotelCmd{ + Destination: bs.Destination, + }) + hErr := invocation.Bus().Send(invocation.Ctx(), "hotels-service", bookHotels) + if hErr != nil { + return hErr + } + return invocation.Bus().Send(invocation.Ctx(), "flights-service", bookFlights) +} + +func (bs *BookingSaga) HandleBookFlightsRsp(invocation gbus.Invocation, message *gbus.BusMessage) error { + response := message.Payload.(*messages.BookFlightsRsp) + invocation.Log().Infof("flights were booked ? %t", response.Success) + if bs.CancelingFlights == false { + bs.FinishedWithFlights = true + } + return bs.notifyInitiatorIfSagaComplets(true, invocation.Ctx(), invocation) +} + +func (bs *BookingSaga) HandleBookHotelRsp(invocation gbus.Invocation, message *gbus.BusMessage) error { + response := message.Payload.(*messages.BookHotelRsp) + bs.FinishedWithHotels = true + if response.Success { + invocation.Log().Infof("hotel in %s was booked", bs.Destination) + + } else { + bs.FinishedWithFlights = false + bs.CancelingFlights = true + invocation.Log().Infof("hotel in %s was not booked !!", bs.Destination) + invocation.Log().Infof("canceling flights to %s !!", bs.Destination) + cancelFlightsCmd := gbus.NewBusMessage( + messages.CancelFlightsCmd{ + Destination: bs.Destination}, + ) + return invocation.Bus().Send(invocation.Ctx(), "flights-service", cancelFlightsCmd) + } + + return bs.notifyInitiatorIfSagaComplets(true, invocation.Ctx(), invocation) + +} + +func (bs *BookingSaga) HandleCancelFlightsRsp(invocation gbus.Invocation, message *gbus.BusMessage) error { + response := message.Payload.(*messages.CancelFlightsRsp) + invocation.Log().Infof("flights were canceled ? %t", response.Success) + bs.FinishedWithFlights = true + + return bs.notifyInitiatorIfSagaComplets(false, invocation.Ctx(), invocation) +} + +func (bs *BookingSaga) notifyInitiatorIfSagaComplets(success bool, ctx context.Context, invocation gbus.Invocation) error { + + if bs.IsComplete() { + + sagaInvocation := invocation.(gbus.SagaInvocation) + reply := gbus.NewBusMessage(messages.BookingComplete{ + Destination: bs.Destination, + Success: success, + }) + return sagaInvocation.ReplyToInitiator(invocation.Ctx(), reply) + + } + return nil +} diff --git a/examples/vacation_app/trace/init.go b/examples/vacation_app/trace/init.go new file mode 100644 index 0000000..1f36b5e --- /dev/null +++ b/examples/vacation_app/trace/init.go @@ -0,0 +1,26 @@ +package trace + +import ( + "io" + + jaeger "github.com/uber/jaeger-client-go" + jaegercfg "github.com/uber/jaeger-client-go/config" +) + +func CreatetJeagerTracer(svcName string) (io.Closer, error) { + + cfg := jaegercfg.Configuration{ + Sampler: &jaegercfg.SamplerConfig{ + Type: jaeger.SamplerTypeConst, + Param: 1, + }, + Reporter: &jaegercfg.ReporterConfig{ + LogSpans: false, + }, + } + + // Initialize tracer with a logger and a metrics factory + closer, err := cfg.InitGlobalTracer(svcName) + + return closer, err +} diff --git a/tests/bus_test.go b/tests/bus_test.go index 1aae892..6545199 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -88,6 +88,10 @@ func TestReply(t *testing.T) { t.Errorf("message handler for reply message invoked with wrong message type\r\n%v", message) } + if message.CorrelationID != cmdBusMsg.ID { + t.Errorf("CorrelationID didn't match expected %s but was %s", cmdBusMsg.ID, message.CorrelationID) + } + proceed <- true return nil }