Skip to content

Commit

Permalink
v1.1.5 rollup to master (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron authored Sep 29, 2019
1 parent 900f24f commit eb32401
Show file tree
Hide file tree
Showing 21 changed files with 963 additions and 23 deletions.
39 changes: 16 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,30 @@

A lightweight transactional message bus on top of RabbitMQ supporting:

1) Supported messaging semantics
* One Way
* Duplex
* Publish/Subscribe
* Request/Reply (RPC)
2) Long running processes via the [Saga](https://github.com/wework/grabbit/blob/master/docs/SAGA.md) pattern
3) Retry and backoffs
4) Publisher confirms
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
6) Deadlettering
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
10) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with
default support for gob, protobuf and avro

1) Supported [Messaging Styles](https://github.com/wework/grabbit/blob/master/docs/MESSAGING.md)
- One Way (Fire and forget)
- Publish/Subscribe
- Aync Command/Reply
- Blocking Command/Reply (RPC)
2) [Transactional](https://github.com/wework/grabbit/blob/master/docs/TX.md) message processing
3) Message Orchestration via the [Saga](https://github.com/wework/grabbit/blob/master/docs/SAGA.md) pattern
4) At least once reliable messaging via [Transaction Outbox](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and [Publisher Confirms](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md)
5) [Retry and backoffs](https://github.com/wework/grabbit/blob/master/docs/RETRY.md)
6) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
7) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
8) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
9) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with default support for gob, protobuf and avro

## Stable release
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.

## Supported transactional resources
1) MySql > 8.0 (InnoDB)
## Supported serializers
1) gob
2) Avro
3) Protobuf

## Instrumentation

1) Opentracing
## Basic Usage

## Usage
- For a complete sample application see the vacation booking [sample app](https://github.com/wework/grabbit/blob/master/examples/vacation_app) in the examples directory

The following outlines the basic usage of grabbit.
For a complete view of how you would use grabbit including how to write saga's and handle deadlettering refer to grabbit/tests package
Expand Down
37 changes: 37 additions & 0 deletions docs/MESSAGING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Messaging

grabbit is all about asynchronous messaging and supports different kinds of interaction patterns.
essentially to work with grabbit you will be registering on a grabbit bus instance handlers for specific types of messages or topics, allowing you to perform your business logic once a message is consumed by grabbit.
Once invoked handlers can reply to incoming messages send or publish messages to other services via the bus.

At its core grabbit distinguishes between messages that target a specific service (commands) and
messages that may target many services (events).

See [README.md](https://github.com/wework/grabbit/blob/master/README.md) or have a look at grabbit's [test suite](https://github.com/wework/grabbit/blob/master/tests/bus_test.go) to learn basic usage

### One Way

All messages in grabbit are essentially one-way messages sent to a RabbitMQ queue or exchange.
More sophisticated complex interactions are all built on one way messaging.
see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L29) as an example.

### Async Command/Reply

When a handlers replies to a command grabbit automatically adds the id of the inbound message to the outbound reply so services can correlate outbound messages with incoming ones.
see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L91) as an example.

### Publish/Subscriber

grabbit allows publishing events to RabbitMQ exchange and topic hierarchies and handlers to subscribe to those events.
Events do not get correlated and can not be replied to.

see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L112) as an example.

### Blocking Command/Reply (RPC)

it is sometimes beneficial to simulate blocking semantics over an async command/reply message exchange.
In particular, it might come in handly when front end web applications need to call backend queued services.
grabbit allows this scenario by providing the RPC API over the bus interface.

see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L215) as an example.

42 changes: 42 additions & 0 deletions docs/RETRY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Retry and Backoffs

When the invocation of a grabbit handler fails grabbit will retry the handler and perform a jittered [binary exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff#Binary_exponential_backoff_algorithm).

You can configure the number of retries and control the backoff time factor by passing in a gbus.BusConfiguration instance to the builder interface.

MaxRetryCount configures the maximum number of retries grabbit will try executing the handler before rejecting the message.

BaseRetryDuration is the base duration in milliseconds inputted into the backoff algorithm.
With a binary exponential backoff algorithm, the time between each retry attempt is calculated as 2^[retry attempt] * BaseRetryDuration + [random jitter (a few nanoseconds) ]

the default MaxRetryCount is 3 and BaseRetryDuration is 10ms
Given the above configuration, grabbit will try retrying according to the following

first retry after ~20ms
second retry after ~40ms
third retry after ~80ms

```go

package main

import (
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
)

bus := builder.
New().
Bus("rabbitmq connection string").
WithLogger(logger).
WorkerNum(3, 1).
WithConfirms().
WithConfiguration(gbus.BusConfiguration{
MaxRetryCount: 5,
BaseRetryDuration: 15,
}).
Txnl("mysql", "database connection string").
Build("your service name")

```

31 changes: 31 additions & 0 deletions docs/TX.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Transactional Support

grabbit executes handlers within the scope of an active transaction.
When handlers use the passed in transaction instance to persist their business objects
grabbit guarantees local transactivity by bounding business objects persistence, outgoing command, reply and event messages the handler issues in a single transaction and routing messages to the [Transaction Outbox](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md).

The following demonstrates how to access the active transaction from within a handler

In this example, the updating of the orders table, publishing of the OrderCanceledEvent event and sending the OrderCanceledReply reply message are all bound to the same transaction.



```go

func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
cancelOrderCommand := message.Payload.(CancelOrderCommand)
if e := invocation.Tx().Exec("UPDATE orders SET status = 'Canceled' WHERE order_id=?", cancelOrderCommand.orderID); e != nil{
return e
}
orderCanceledEvent := gbus.NewBusMessage(OrderCanceledEvent{
OrderID: cancelOrderCommand.orderID,
})
if e := invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "order.canceled", orderCanceledEvent); e := nil{
return e
}

orderCanceledReply := gbus.NewBusMessage(OrderCanceledReply{})
return invocation.Reply(invocation.Ctx(), orderCanceledReply)
}

```
1 change: 1 addition & 0 deletions examples/vacation_app/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vacation_app
1 change: 1 addition & 0 deletions examples/vacation_app/.gitingnore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vacation_app
35 changes: 35 additions & 0 deletions examples/vacation_app/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## grabbit example vacation app

The following example simulates a vacation booking app and demonstrates the use of grabbit
to manage the booking process.

The app is made up out of the following components

- The client, a console app that you enter your vacation destination and calls the booking service to
book the vacation.

- Booking Service manages the booking saga calling the hotels and flights services to book hotels and flights for the requested destination. it handles as well as applying flight cancelation in case the hotel service can not book a hotel.
The booking saga sends back a message to the client with the result of the booking request once the saga completes.

- Flights Service, books flights to the requested destination and replies back the response.
The flight service.

- Hotels Service, books hotels for the requested destination and replies back the response.
In this example requesting to book a vacation in Oslo results in a reply message with a Failed status
Triggering the booking saga to send a command to the flight service to cancel the booked flights.

## building and running the example

- go build
- docker-compose up
- run the booking servce: vacation_app booking
- run the hotels servce: vacation_app hotels
- run the flights servce: vacation_app flights
- run the client: vacation_app client

Once the services are running you can enter a vacation destination in the client app which will invoke the booking saga which will orchestrate the flights and hotels services.


You can see a trace of the calls by browsing to the Jeager client at http://localhost:16686


41 changes: 41 additions & 0 deletions examples/vacation_app/cmd/booking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cmd

import (
"bufio"
"fmt"
"os"
"vacation_app/trace"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"vacation_app/saga"
)

func init() {
rootCmd.AddCommand(runBookingServiceeCmd)
}

var runBookingServiceeCmd = &cobra.Command{
Use: "booking",
Short: "Run the booking service",
Run: func(cmd *cobra.Command, args []string) {
svcName := "booking-service"
closer, err := trace.CreatetJeagerTracer(svcName)

if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return
}

defer closer.Close()
gb := createBus(svcName)

gb.RegisterSaga(&saga.BookingSaga{})
gb.Start()
defer gb.Shutdown()

fmt.Print("Booking service running...press any key to exit...\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadString('\n')
},
}
66 changes: 66 additions & 0 deletions examples/vacation_app/cmd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cmd

import (
"bufio"
"context"
"fmt"
"os"
"strings"
"vacation_app/messages"
"vacation_app/trace"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/wework/grabbit/gbus"
)


var runClientCmd = &cobra.Command{
Use: "client",
Short: "Run the client app",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("\033]0;Title goes here\007")
log.SetFormatter(&log.TextFormatter{ForceColors: true})
log.SetLevel(log.ErrorLevel)

svcName := "client"
closer, err := trace.CreatetJeagerTracer(svcName)

if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return
}
defer closer.Close()
gb := createBus(svcName)

gb.HandleMessage(messages.BookingComplete{}, HandleBookingComplete)
gb.Start()
defer gb.Shutdown()

for {
fmt.Print("Enter destination ...\n")
reader := bufio.NewReader(os.Stdin)
dest, _ := reader.ReadString('\n')
dest = strings.TrimSpace(dest)
bookVacationCmd := gbus.NewBusMessage(messages.BookVacationCmd{
Destination: dest,
})

gb.Send(context.Background(), "booking-service", bookVacationCmd)

fmt.Printf("booking vacation for destination %s\n", dest)

}
},
}

func HandleBookingComplete(invocation gbus.Invocation, message *gbus.BusMessage) error {
bookingComplete := message.Payload.(*messages.BookingComplete)
if bookingComplete.Success {
fmt.Printf("booking completed succesfully\n")

} else {
fmt.Printf("failed to book vacation\n")
}
return nil
}
64 changes: 64 additions & 0 deletions examples/vacation_app/cmd/flights.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cmd

import (
"bufio"
"fmt"
"os"
"vacation_app/trace"
"vacation_app/messages"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/wework/grabbit/gbus"
)



var runFlightsgServiceCmd = &cobra.Command{
Use: "flights",
Short: "Run the flights service",
Run: func(cmd *cobra.Command, args []string) {
svcName := "flights-service"
closer, err := trace.CreatetJeagerTracer(svcName)

if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return
}

defer closer.Close()
gb := createBus(svcName)

gb.HandleMessage(messages.BookFlightsCmd{}, HandleBookFlightCommand)
gb.HandleMessage(messages.CancelFlightsCmd{}, HandleCancelFlightCommand)

gb.Start()
defer gb.Shutdown()

fmt.Print("Flights service running...press any key to exit...\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadString('\n')
},
}

func HandleBookFlightCommand(invocation gbus.Invocation, message *gbus.BusMessage) error {
cmd := message.Payload.(*messages.BookFlightsCmd)
invocation.Log().Infof("booking flight to %s", cmd.Destination)
reply := gbus.NewBusMessage(messages.BookFlightsRsp{
Success: true,
})
invocation.Reply(invocation.Ctx(), reply)

return nil
}

func HandleCancelFlightCommand(invocation gbus.Invocation, message *gbus.BusMessage) error {
cmd := message.Payload.(*messages.CancelFlightsCmd)
invocation.Log().Infof("canceling flight to %s", cmd.Destination)
reply := gbus.NewBusMessage(messages.CancelFlightsRsp{
Success: true,
})
invocation.Reply(invocation.Ctx(), reply)

return nil
}
17 changes: 17 additions & 0 deletions examples/vacation_app/cmd/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package cmd

import (
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
)

func createBus(serviceName string) gbus.Bus {
return builder.
New().
Bus("amqp://rabbitmq:rabbitmq@localhost").
WorkerNum(3, 1).
WithConfirms().
PurgeOnStartUp().
Txnl("mysql", "rhinof:rhinof@/rhinof").
Build(serviceName)
}
Loading

0 comments on commit eb32401

Please sign in to comment.