Skip to content

Commit

Permalink
Merge pull request #154 from liftbridge-io/update_protocol
Browse files Browse the repository at this point in the history
Update protocol
  • Loading branch information
tylertreat authored Feb 27, 2020
2 parents 37038c2 + a45e004 commit 47a8525
Show file tree
Hide file tree
Showing 12 changed files with 685 additions and 154 deletions.
44 changes: 22 additions & 22 deletions documentation/client_implementation.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ func Publish(ctx context.Context, stream string, value []byte, opts ...MessageOp
attach to normal NATS subjects, it's also possible to [publish messages
directly to NATS](https://github.com/liftbridge-io/go-liftbridge#publishing-directly-with-nats)
using a [NATS client](https://nats.io/download/). Liftbridge works fine with
plain, opaque NATS messages, but it also extends NATS with a [protobuf-based
envelope protocol](https://github.com/liftbridge-io/liftbridge-api). This
allows publishers to add metadata to messages like a key, headers, and acking
information. Liftbridge client libraries may provide helper methods to make it
easy to create envelopes and deal with acks yourself using a NATS client
directly ([described below](#low-level-publish-helpers)). However, the
`Publish` API is intended to abstract this work away from you.
plain, opaque NATS messages, but it also extends NATS with a protobuf-based
[envelope protocol](./envelope_protocol.md). This allows publishers to add
metadata to messages like a key, headers, and acking information. Liftbridge
client libraries may provide helper methods to make it easy to create envelopes
and deal with acks yourself using a NATS client directly ([described
below](#low-level-publish-helpers)). However, the `Publish` API is intended to
abstract this work away from you.

`Publish` is a synchronous operation, meaning when it returns, the message has
been successfully published. `Publish` can also be configured to block until a
Expand Down Expand Up @@ -307,13 +307,13 @@ that match the subject). Since Liftbridge streams attach to normal NATS
subjects, it's also possible to [publish messages directly to
NATS](https://github.com/liftbridge-io/go-liftbridge#publishing-directly-with-nats)
using a [NATS client](https://nats.io/download/). Liftbridge works fine with
plain, opaque NATS messages, but it also extends NATS with a [protobuf-based
envelope protocol](https://github.com/liftbridge-io/liftbridge-api). This
allows publishers to add metadata to messages like a key, headers, and acking
information. Liftbridge client libraries may provide helper methods to make it
easy to create envelopes and deal with acks yourself using a NATS client
directly ([described below](#low-level-publish-helpers)). However, the
`PublishToSubject` API is intended to abstract this work away from you.
plain, opaque NATS messages, but it also extends NATS with a protobuf-based
[envelope protocol](./envelope_protocol.md). This allows publishers to add
metadata to messages like a key, headers, and acking information. Liftbridge
client libraries may provide helper methods to make it easy to create envelopes
and deal with acks yourself using a NATS client directly ([described
below](#low-level-publish-helpers)). However, the `PublishToSubject` API is
intended to abstract this work away from you.

`PublishToSubject` is a synchronous operation, meaning when it returns, the
message has been successfully published. `PublishToSubject` can also be
Expand Down Expand Up @@ -376,11 +376,11 @@ func main() {
```

However, these low-level publishes lose out on some of the additional
capabilities of Liftbridge provided by message envelopes, such as message
headers, keys, etc. As a result, client libraries may provide helper methods to
facilitate publishing message envelopes directly to NATS as well as handling
acks. These include `NewMessage`, `UnmarshalAck`, and `UnmarshalMessage`
described below.
capabilities of Liftbridge provided by message
[envelopes](./envelope_protocol.md), such as message headers, keys, etc. As a
result, client libraries may provide helper methods to facilitate publishing
message envelopes directly to NATS as well as handling acks. These include
`NewMessage`, `UnmarshalAck`, and `UnmarshalMessage` described below.

##### NewMessage

Expand All @@ -391,9 +391,9 @@ func NewMessage(value []byte, options ...MessageOption) []byte

`NewMessage` creates a Liftbridge message envelope serialized to bytes ready
for publishing to NATS. This consists of an [envelope
header](envelope_protocol.md) followed by the serialized message protobuf. It
takes the same arguments as `Publish` (see above) with the exception of the
context and subject.
header](envelope_protocol.md#liftbridge-envelope-header) followed by the
serialized message protobuf. It takes the same arguments as `Publish` (see
above) with the exception of the context and subject.

Note that the envelope protocol does not need to be implemented in the
`Publish` API since the envelope serialization is handled by the server.
Expand Down
34 changes: 29 additions & 5 deletions documentation/envelope_protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ the Liftbridge API.

## Liftbridge Envelope Header

All Liftbridge messages and RPCs sent over NATS are prefixed with an envelope
header. This includes client-facing messages, such as publishes and acks, as
well as internal RPCs like replication.

```plaintext
0 8 16 24 32
├───────────────┴───────────────┴───────────────┴───────────────┤
│ Magic Number │
├───────────────┬───────────────┬───────────────┬───────────────┤
│ Version │ HeaderLen │ Flags │ Reserved
│ Version │ HeaderLen │ Flags │ MsgType
├───────────────┴───────────────┴───────────────┴───────────────┤
│ CRC-32C (optional) │
└───────────────────────────────────────────────────────────────┘
Expand All @@ -43,6 +47,8 @@ bumped if the envelope format changes or if the message encoding changes in a
non-backwards-compatible way. Adding fields to the messages should not require
a version bump.

Currently, the only supported protocol version is v0, i.e. `0x00`.

### HeaderLen [1 byte]

The header length is the offset of the payload. This is included primarily for
Expand All @@ -53,12 +59,30 @@ safety.
The flag bits are defined as follows:

| Bit | Description |
| --- | --------------- |
| :-- | :-------------- |
| 0 | CRC-32C enabled |

### Reserved [1 byte]

Reserved for future use.
### MsgType [1 byte]

This is the Liftbridge-specific message type the envelope contains:

| MsgType | Name | Description | Internal |
| :------ | :------------------------ | :----------------------------------------------------- | :------- |
| 0 | Publish | Client-published message | no |
| 1 | Ack | Server-published ack | no |
| 2 | ReplicationRequest | Request to replicate partition data | yes |
| 3 | ReplicationResponse | Response to ReplicationRequest | yes |
| 4 | RaftJoinRequest | Request to join Raft cluster | yes |
| 5 | RaftJoinResponse | Response to RaftJoinRequest | yes |
| 6 | LeaderEpochOffsetRequest | Request for partition leader's latest offset for epoch | yes |
| 7 | LeaderEpochOffsetResponse | Response to LeaderEpochOffsetRequest | yes |
| 8 | PropagatedRequest | Request forwarded to metadata leader | yes |
| 9 | PropagatedResponse | Response to PropagatedRequest | yes |
| 10 | ServerInfoRequest | Request for cluster information | yes |
| 11 | ServerInfoResponse | Response to ServerInfoRequest | yes |
| 12 | PartitionStatusRequest | Request to get partition status | yes |
| 13 | PartitionStatusResponse | Response to PartitionStatusRequest | yes |
| 14 | PartitionNotification | Signal new data is available for partition | yes |

### CRC-32C [4 bytes, optional]

Expand Down
10 changes: 6 additions & 4 deletions documentation/replication_protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ by the maintainers of Kafka.

Replication RPCs are made over internal NATS subjects. Replication requests for
a partition are sent to `<namespace>.<stream>.<partition>.replicate` which is a
subject the partition leader subscribes to. The request data is a
[protobuf](https://github.com/liftbridge-io/liftbridge/blob/8bee0478da97711dc2a8e1fdae8b2d2e3086c756/server/proto/internal.proto#L87-L90)
subject the partition leader subscribes to. Request and response payloads are
prefixed with the [Liftbridge envelope header](./envelope_protocol.md). The
request data is a [protobuf](https://github.com/liftbridge-io/liftbridge/blob/8bee0478da97711dc2a8e1fdae8b2d2e3086c756/server/proto/internal.proto#L87-L90)
containing the ID of the follower and the offset they want to begin fetching
from. The NATS message also includes a random [reply
inbox](https://nats-io.github.io/docs/developer/sending/replyto.html) the
leader uses to send the response to.

The leader response uses a binary format consisting of the following:

```
```plaintext
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+...
| LeaderEpoch | HW |...
Expand All @@ -128,7 +129,8 @@ The leader response uses a binary format consisting of the following:

The remainder of the response consists of message data. If the follower is
caught up, there won't be any message data. The `LeaderEpoch` and `HW` are
always present, so there are 16 bytes guaranteed in the response.
always present, so there are 16 bytes guaranteed in the response after the
envelope header.

The `LeaderEpoch` offset requests also use internal NATS subjects similar to
replication RPCs. These requests are sent to
Expand Down
6 changes: 3 additions & 3 deletions server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) (
AckPolicy: req.AckPolicy,
}

buf, err := proto.MarshalEnvelope(msg)
buf, err := proto.MarshalPublish(msg)
if err != nil {
a.logger.Errorf("api: Failed to marshal message: %v", err.Error())
return nil, err
Expand Down Expand Up @@ -237,8 +237,8 @@ func (a *apiServer) publishSync(ctx context.Context, subject,
return nil, err
}

ack := new(client.Ack)
if err := proto.UnmarshalEnvelope(ackMsg.Data, ack); err != nil {
ack, err := proto.UnmarshalAck(ackMsg.Data)
if err != nil {
a.logger.Errorf("api: Invalid ack for publish: %v", err)
return nil, err
}
Expand Down
23 changes: 12 additions & 11 deletions server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/status"

client "github.com/liftbridge-io/liftbridge-api/go"

"github.com/liftbridge-io/liftbridge/server/proto"
)

Expand Down Expand Up @@ -196,9 +197,9 @@ func (m *metadataAPI) fetchBrokerInfo(ctx context.Context, numPeers int) ([]*cli
defer sub.Unsubscribe()

// Survey the cluster.
queryReq, err := (&proto.ServerInfoRequest{
queryReq, err := proto.MarshalServerInfoRequest(&proto.ServerInfoRequest{
Id: m.config.Clustering.ServerID,
}).Marshal()
})
if err != nil {
panic(err)
}
Expand All @@ -210,8 +211,8 @@ func (m *metadataAPI) fetchBrokerInfo(ctx context.Context, numPeers int) ([]*cli
if err != nil {
break
}
queryResp := &proto.ServerInfoResponse{}
if err := queryResp.Unmarshal(msg.Data); err != nil {
queryResp, err := proto.UnmarshalServerInfoResponse(msg.Data)
if err != nil {
m.logger.Warnf("Received invalid server info response: %v", err)
continue
}
Expand Down Expand Up @@ -688,7 +689,7 @@ func (m *metadataAPI) propagateRequest(ctx context.Context, req *proto.Propagate
return status.New(codes.Internal, "No known metadata leader")
}

data, err := req.Marshal()
data, err := proto.MarshalPropagatedRequest(req)
if err != nil {
panic(err)
}
Expand All @@ -704,8 +705,8 @@ func (m *metadataAPI) propagateRequest(ctx context.Context, req *proto.Propagate
return status.New(codes.Internal, err.Error())
}

r := &proto.PropagatedResponse{}
if err := r.Unmarshal(resp.Data); err != nil {
r, err := proto.UnmarshalPropagatedResponse(resp.Data)
if err != nil {
m.logger.Errorf("metadata: Invalid response for propagated request: %v", err)
return status.New(codes.Internal, "invalid response")
}
Expand All @@ -728,10 +729,10 @@ func (m *metadataAPI) waitForPartitionLeader(ctx context.Context, stream, leader
return
}

req, err := (&proto.PartitionStatusRequest{
req, err := proto.MarshalPartitionStatusRequest(&proto.PartitionStatusRequest{
Stream: stream,
Partition: partition,
}).Marshal()
})
if err != nil {
panic(err)
}
Expand All @@ -745,8 +746,8 @@ func (m *metadataAPI) waitForPartitionLeader(ctx context.Context, stream, leader
time.Sleep(100 * time.Millisecond)
continue
}
statusResp := &proto.PartitionStatusResponse{}
if err := statusResp.Unmarshal(resp.Data); err != nil {
statusResp, err := proto.UnmarshalPartitionStatusResponse(resp.Data)
if err != nil {
m.logger.Warnf(
"Invalid status response for partition [stream=%s, partition=%d] from leader %s: %v",
stream, partition, leader, err)
Expand Down
44 changes: 20 additions & 24 deletions server/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,14 @@ func (p *partition) stopFollowing() error {
// epoch or the log end offset if the leader's current epoch is equal to the
// one requested.
func (p *partition) handleLeaderOffsetRequest(msg *nats.Msg) {
req := &proto.LeaderEpochOffsetRequest{}
if err := req.Unmarshal(msg.Data); err != nil {
req, err := proto.UnmarshalLeaderEpochOffsetRequest(msg.Data)
if err != nil {
p.srv.logger.Errorf("Invalid leader epoch offset request for partition %s: %v", p, err)
return
}
resp, err := (&proto.LeaderEpochOffsetResponse{
resp, err := proto.MarshalLeaderEpochOffsetResponse(&proto.LeaderEpochOffsetResponse{
EndOffset: p.log.LastOffsetForLeaderEpoch(req.LeaderEpoch),
}).Marshal()
})
if err != nil {
panic(err)
}
Expand All @@ -439,8 +439,8 @@ func (p *partition) handleLeaderOffsetRequest(msg *nats.Msg) {
// NATS subject specified on the request.
func (p *partition) handleReplicationRequest(msg *nats.Msg) {
received := time.Now()
req := &proto.ReplicationRequest{}
if err := req.Unmarshal(msg.Data); err != nil {
req, err := proto.UnmarshalReplicationRequest(msg.Data)
if err != nil {
p.srv.logger.Errorf("Invalid replication request for partition %s: %v", p, err)
return
}
Expand All @@ -467,15 +467,12 @@ func (p *partition) handleReplicationRequest(msg *nats.Msg) {
// receives a replication response from the leader. This response will contain
// the leader epoch, leader HW, and (optionally) messages to replicate.
func (p *partition) handleReplicationResponse(msg *nats.Msg) int {
// We should have at least 16 bytes, 8 for leader epoch and 8 for HW, i.e.
// replicationOverhead.
if len(msg.Data) < replicationOverhead {
p.srv.logger.Warnf("Invalid replication response for partition %s", p)
leaderEpoch, hw, data, err := proto.UnmarshalReplicationResponse(msg.Data)
if err != nil {
p.srv.logger.Warnf("Invalid replication response for partition %s: %s", p, err)
return 0
}

leaderEpoch := proto.Encoding.Uint64(msg.Data[:8])

p.mu.RLock()
if !p.isFollowing {
p.mu.RUnlock()
Expand All @@ -489,10 +486,8 @@ func (p *partition) handleReplicationResponse(msg *nats.Msg) int {
p.mu.RUnlock()

// Update HW from leader's HW.
hw := int64(proto.Encoding.Uint64(msg.Data[8:]))
p.log.SetHighWatermark(hw)

data := msg.Data[replicationOverhead:]
if len(data) == 0 {
return 0
}
Expand Down Expand Up @@ -728,7 +723,7 @@ func (p *partition) sendAck(ack *client.Ack) {
if ack.AckInbox == "" {
return
}
data, err := proto.MarshalEnvelope(ack)
data, err := proto.MarshalAck(ack)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -814,10 +809,10 @@ func (p *partition) computeReplicaFetchSleep() time.Duration {
// messages that were replicated. Zero (without an error) indicates the
// follower is caught up with the leader.
func (p *partition) sendReplicationRequest() (int, error) {
data, err := (&proto.ReplicationRequest{
data, err := proto.MarshalReplicationRequest(&proto.ReplicationRequest{
ReplicaID: p.srv.config.Clustering.ServerID,
Offset: p.log.NewestOffset(),
}).Marshal()
})
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -870,7 +865,8 @@ func (p *partition) truncateUncommitted() error {
// sendLeaderOffsetRequest sends a request to the leader for the last offset
// for the current leader epoch.
func (p *partition) sendLeaderOffsetRequest(leaderEpoch uint64) (int64, error) {
data, err := (&proto.LeaderEpochOffsetRequest{LeaderEpoch: leaderEpoch}).Marshal()
data, err := proto.MarshalLeaderEpochOffsetRequest(
&proto.LeaderEpochOffsetRequest{LeaderEpoch: leaderEpoch})
if err != nil {
panic(err)
}
Expand All @@ -882,8 +878,8 @@ func (p *partition) sendLeaderOffsetRequest(leaderEpoch uint64) (int64, error) {
if err != nil {
return 0, err
}
offsetResp := &proto.LeaderEpochOffsetResponse{}
if err := offsetResp.Unmarshal(resp.Data); err != nil {
offsetResp, err := proto.UnmarshalLeaderEpochOffsetResponse(resp.Data)
if err != nil {
return 0, err
}
return offsetResp.EndOffset, nil
Expand Down Expand Up @@ -1062,10 +1058,10 @@ func (p *partition) updateISRLatestOffset(replica string, offset int64) {
// sendPartitionNotification sends a message to the given partition replica to
// indicate new data is available in the log.
func (p *partition) sendPartitionNotification(replica string) {
req, err := (&proto.PartitionNotification{
req, err := proto.MarshalPartitionNotification(&proto.PartitionNotification{
Stream: p.Stream,
Partition: p.Id,
}).Marshal()
})
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -1094,8 +1090,8 @@ func (p *partition) getSubject() string {
// This is indicated by the presence of the envelope magic number. If it is
// not, nil is returned.
func getMessage(data []byte) *client.Message {
msg := new(client.Message)
if err := proto.UnmarshalEnvelope(data, msg); err != nil {
msg, err := proto.UnmarshalPublish(data)
if err != nil {
return nil
}
return msg
Expand Down
Loading

0 comments on commit 47a8525

Please sign in to comment.