diff --git a/documentation/client_implementation.md b/documentation/client_implementation.md index 9cb02623..7824a98b 100644 --- a/documentation/client_implementation.md +++ b/documentation/client_implementation.md @@ -223,13 +223,13 @@ func Publish(ctx context.Context, stream string, value []byte, opts ...MessageOp attach to normal NATS subjects, it's also possible to [publish messages directly to NATS](https://github.com/liftbridge-io/go-liftbridge#publishing-directly-with-nats) using a [NATS client](https://nats.io/download/). Liftbridge works fine with -plain, opaque NATS messages, but it also extends NATS with a [protobuf-based -envelope protocol](https://github.com/liftbridge-io/liftbridge-api). This -allows publishers to add metadata to messages like a key, headers, and acking -information. Liftbridge client libraries may provide helper methods to make it -easy to create envelopes and deal with acks yourself using a NATS client -directly ([described below](#low-level-publish-helpers)). However, the -`Publish` API is intended to abstract this work away from you. +plain, opaque NATS messages, but it also extends NATS with a protobuf-based +[envelope protocol](./envelope_protocol.md). This allows publishers to add +metadata to messages like a key, headers, and acking information. Liftbridge +client libraries may provide helper methods to make it easy to create envelopes +and deal with acks yourself using a NATS client directly ([described +below](#low-level-publish-helpers)). However, the `Publish` API is intended to +abstract this work away from you. `Publish` is a synchronous operation, meaning when it returns, the message has been successfully published. `Publish` can also be configured to block until a @@ -307,13 +307,13 @@ that match the subject). Since Liftbridge streams attach to normal NATS subjects, it's also possible to [publish messages directly to NATS](https://github.com/liftbridge-io/go-liftbridge#publishing-directly-with-nats) using a [NATS client](https://nats.io/download/). Liftbridge works fine with -plain, opaque NATS messages, but it also extends NATS with a [protobuf-based -envelope protocol](https://github.com/liftbridge-io/liftbridge-api). This -allows publishers to add metadata to messages like a key, headers, and acking -information. Liftbridge client libraries may provide helper methods to make it -easy to create envelopes and deal with acks yourself using a NATS client -directly ([described below](#low-level-publish-helpers)). However, the -`PublishToSubject` API is intended to abstract this work away from you. +plain, opaque NATS messages, but it also extends NATS with a protobuf-based +[envelope protocol](./envelope_protocol.md). This allows publishers to add +metadata to messages like a key, headers, and acking information. Liftbridge +client libraries may provide helper methods to make it easy to create envelopes +and deal with acks yourself using a NATS client directly ([described +below](#low-level-publish-helpers)). However, the `PublishToSubject` API is +intended to abstract this work away from you. `PublishToSubject` is a synchronous operation, meaning when it returns, the message has been successfully published. `PublishToSubject` can also be @@ -376,11 +376,11 @@ func main() { ``` However, these low-level publishes lose out on some of the additional -capabilities of Liftbridge provided by message envelopes, such as message -headers, keys, etc. As a result, client libraries may provide helper methods to -facilitate publishing message envelopes directly to NATS as well as handling -acks. These include `NewMessage`, `UnmarshalAck`, and `UnmarshalMessage` -described below. +capabilities of Liftbridge provided by message +[envelopes](./envelope_protocol.md), such as message headers, keys, etc. As a +result, client libraries may provide helper methods to facilitate publishing +message envelopes directly to NATS as well as handling acks. These include +`NewMessage`, `UnmarshalAck`, and `UnmarshalMessage` described below. ##### NewMessage @@ -391,9 +391,9 @@ func NewMessage(value []byte, options ...MessageOption) []byte `NewMessage` creates a Liftbridge message envelope serialized to bytes ready for publishing to NATS. This consists of an [envelope -header](envelope_protocol.md) followed by the serialized message protobuf. It -takes the same arguments as `Publish` (see above) with the exception of the -context and subject. +header](envelope_protocol.md#liftbridge-envelope-header) followed by the +serialized message protobuf. It takes the same arguments as `Publish` (see +above) with the exception of the context and subject. Note that the envelope protocol does not need to be implemented in the `Publish` API since the envelope serialization is handled by the server. diff --git a/documentation/envelope_protocol.md b/documentation/envelope_protocol.md index 178ba502..0f05d6d4 100644 --- a/documentation/envelope_protocol.md +++ b/documentation/envelope_protocol.md @@ -19,12 +19,16 @@ the Liftbridge API. ## Liftbridge Envelope Header +All Liftbridge messages and RPCs sent over NATS are prefixed with an envelope +header. This includes client-facing messages, such as publishes and acks, as +well as internal RPCs like replication. + ```plaintext 0 8 16 24 32 ├───────────────┴───────────────┴───────────────┴───────────────┤ │ Magic Number │ ├───────────────┬───────────────┬───────────────┬───────────────┤ -│ Version │ HeaderLen │ Flags │ Reserved │ +│ Version │ HeaderLen │ Flags │ MsgType │ ├───────────────┴───────────────┴───────────────┴───────────────┤ │ CRC-32C (optional) │ └───────────────────────────────────────────────────────────────┘ @@ -43,6 +47,8 @@ bumped if the envelope format changes or if the message encoding changes in a non-backwards-compatible way. Adding fields to the messages should not require a version bump. +Currently, the only supported protocol version is v0, i.e. `0x00`. + ### HeaderLen [1 byte] The header length is the offset of the payload. This is included primarily for @@ -53,12 +59,30 @@ safety. The flag bits are defined as follows: | Bit | Description | -| --- | --------------- | +| :-- | :-------------- | | 0 | CRC-32C enabled | -### Reserved [1 byte] - -Reserved for future use. +### MsgType [1 byte] + +This is the Liftbridge-specific message type the envelope contains: + +| MsgType | Name | Description | Internal | +| :------ | :------------------------ | :----------------------------------------------------- | :------- | +| 0 | Publish | Client-published message | no | +| 1 | Ack | Server-published ack | no | +| 2 | ReplicationRequest | Request to replicate partition data | yes | +| 3 | ReplicationResponse | Response to ReplicationRequest | yes | +| 4 | RaftJoinRequest | Request to join Raft cluster | yes | +| 5 | RaftJoinResponse | Response to RaftJoinRequest | yes | +| 6 | LeaderEpochOffsetRequest | Request for partition leader's latest offset for epoch | yes | +| 7 | LeaderEpochOffsetResponse | Response to LeaderEpochOffsetRequest | yes | +| 8 | PropagatedRequest | Request forwarded to metadata leader | yes | +| 9 | PropagatedResponse | Response to PropagatedRequest | yes | +| 10 | ServerInfoRequest | Request for cluster information | yes | +| 11 | ServerInfoResponse | Response to ServerInfoRequest | yes | +| 12 | PartitionStatusRequest | Request to get partition status | yes | +| 13 | PartitionStatusResponse | Response to PartitionStatusRequest | yes | +| 14 | PartitionNotification | Signal new data is available for partition | yes | ### CRC-32C [4 bytes, optional] diff --git a/documentation/replication_protocol.md b/documentation/replication_protocol.md index 04ff7c2f..b534073b 100644 --- a/documentation/replication_protocol.md +++ b/documentation/replication_protocol.md @@ -109,8 +109,9 @@ by the maintainers of Kafka. Replication RPCs are made over internal NATS subjects. Replication requests for a partition are sent to `...replicate` which is a -subject the partition leader subscribes to. The request data is a -[protobuf](https://github.com/liftbridge-io/liftbridge/blob/8bee0478da97711dc2a8e1fdae8b2d2e3086c756/server/proto/internal.proto#L87-L90) +subject the partition leader subscribes to. Request and response payloads are +prefixed with the [Liftbridge envelope header](./envelope_protocol.md). The +request data is a [protobuf](https://github.com/liftbridge-io/liftbridge/blob/8bee0478da97711dc2a8e1fdae8b2d2e3086c756/server/proto/internal.proto#L87-L90) containing the ID of the follower and the offset they want to begin fetching from. The NATS message also includes a random [reply inbox](https://nats-io.github.io/docs/developer/sending/replyto.html) the @@ -118,7 +119,7 @@ leader uses to send the response to. The leader response uses a binary format consisting of the following: -``` +```plaintext 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+... | LeaderEpoch | HW |... @@ -128,7 +129,8 @@ The leader response uses a binary format consisting of the following: The remainder of the response consists of message data. If the follower is caught up, there won't be any message data. The `LeaderEpoch` and `HW` are -always present, so there are 16 bytes guaranteed in the response. +always present, so there are 16 bytes guaranteed in the response after the +envelope header. The `LeaderEpoch` offset requests also use internal NATS subjects similar to replication RPCs. These requests are sent to diff --git a/server/api.go b/server/api.go index f9026217..559632d9 100644 --- a/server/api.go +++ b/server/api.go @@ -165,7 +165,7 @@ func (a *apiServer) Publish(ctx context.Context, req *client.PublishRequest) ( AckPolicy: req.AckPolicy, } - buf, err := proto.MarshalEnvelope(msg) + buf, err := proto.MarshalPublish(msg) if err != nil { a.logger.Errorf("api: Failed to marshal message: %v", err.Error()) return nil, err @@ -237,8 +237,8 @@ func (a *apiServer) publishSync(ctx context.Context, subject, return nil, err } - ack := new(client.Ack) - if err := proto.UnmarshalEnvelope(ackMsg.Data, ack); err != nil { + ack, err := proto.UnmarshalAck(ackMsg.Data) + if err != nil { a.logger.Errorf("api: Invalid ack for publish: %v", err) return nil, err } diff --git a/server/metadata.go b/server/metadata.go index 61ec3068..643d45ff 100644 --- a/server/metadata.go +++ b/server/metadata.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/status" client "github.com/liftbridge-io/liftbridge-api/go" + "github.com/liftbridge-io/liftbridge/server/proto" ) @@ -196,9 +197,9 @@ func (m *metadataAPI) fetchBrokerInfo(ctx context.Context, numPeers int) ([]*cli defer sub.Unsubscribe() // Survey the cluster. - queryReq, err := (&proto.ServerInfoRequest{ + queryReq, err := proto.MarshalServerInfoRequest(&proto.ServerInfoRequest{ Id: m.config.Clustering.ServerID, - }).Marshal() + }) if err != nil { panic(err) } @@ -210,8 +211,8 @@ func (m *metadataAPI) fetchBrokerInfo(ctx context.Context, numPeers int) ([]*cli if err != nil { break } - queryResp := &proto.ServerInfoResponse{} - if err := queryResp.Unmarshal(msg.Data); err != nil { + queryResp, err := proto.UnmarshalServerInfoResponse(msg.Data) + if err != nil { m.logger.Warnf("Received invalid server info response: %v", err) continue } @@ -688,7 +689,7 @@ func (m *metadataAPI) propagateRequest(ctx context.Context, req *proto.Propagate return status.New(codes.Internal, "No known metadata leader") } - data, err := req.Marshal() + data, err := proto.MarshalPropagatedRequest(req) if err != nil { panic(err) } @@ -704,8 +705,8 @@ func (m *metadataAPI) propagateRequest(ctx context.Context, req *proto.Propagate return status.New(codes.Internal, err.Error()) } - r := &proto.PropagatedResponse{} - if err := r.Unmarshal(resp.Data); err != nil { + r, err := proto.UnmarshalPropagatedResponse(resp.Data) + if err != nil { m.logger.Errorf("metadata: Invalid response for propagated request: %v", err) return status.New(codes.Internal, "invalid response") } @@ -728,10 +729,10 @@ func (m *metadataAPI) waitForPartitionLeader(ctx context.Context, stream, leader return } - req, err := (&proto.PartitionStatusRequest{ + req, err := proto.MarshalPartitionStatusRequest(&proto.PartitionStatusRequest{ Stream: stream, Partition: partition, - }).Marshal() + }) if err != nil { panic(err) } @@ -745,8 +746,8 @@ func (m *metadataAPI) waitForPartitionLeader(ctx context.Context, stream, leader time.Sleep(100 * time.Millisecond) continue } - statusResp := &proto.PartitionStatusResponse{} - if err := statusResp.Unmarshal(resp.Data); err != nil { + statusResp, err := proto.UnmarshalPartitionStatusResponse(resp.Data) + if err != nil { m.logger.Warnf( "Invalid status response for partition [stream=%s, partition=%d] from leader %s: %v", stream, partition, leader, err) diff --git a/server/partition.go b/server/partition.go index f61b7ab4..4caa2294 100644 --- a/server/partition.go +++ b/server/partition.go @@ -418,14 +418,14 @@ func (p *partition) stopFollowing() error { // epoch or the log end offset if the leader's current epoch is equal to the // one requested. func (p *partition) handleLeaderOffsetRequest(msg *nats.Msg) { - req := &proto.LeaderEpochOffsetRequest{} - if err := req.Unmarshal(msg.Data); err != nil { + req, err := proto.UnmarshalLeaderEpochOffsetRequest(msg.Data) + if err != nil { p.srv.logger.Errorf("Invalid leader epoch offset request for partition %s: %v", p, err) return } - resp, err := (&proto.LeaderEpochOffsetResponse{ + resp, err := proto.MarshalLeaderEpochOffsetResponse(&proto.LeaderEpochOffsetResponse{ EndOffset: p.log.LastOffsetForLeaderEpoch(req.LeaderEpoch), - }).Marshal() + }) if err != nil { panic(err) } @@ -439,8 +439,8 @@ func (p *partition) handleLeaderOffsetRequest(msg *nats.Msg) { // NATS subject specified on the request. func (p *partition) handleReplicationRequest(msg *nats.Msg) { received := time.Now() - req := &proto.ReplicationRequest{} - if err := req.Unmarshal(msg.Data); err != nil { + req, err := proto.UnmarshalReplicationRequest(msg.Data) + if err != nil { p.srv.logger.Errorf("Invalid replication request for partition %s: %v", p, err) return } @@ -467,15 +467,12 @@ func (p *partition) handleReplicationRequest(msg *nats.Msg) { // receives a replication response from the leader. This response will contain // the leader epoch, leader HW, and (optionally) messages to replicate. func (p *partition) handleReplicationResponse(msg *nats.Msg) int { - // We should have at least 16 bytes, 8 for leader epoch and 8 for HW, i.e. - // replicationOverhead. - if len(msg.Data) < replicationOverhead { - p.srv.logger.Warnf("Invalid replication response for partition %s", p) + leaderEpoch, hw, data, err := proto.UnmarshalReplicationResponse(msg.Data) + if err != nil { + p.srv.logger.Warnf("Invalid replication response for partition %s: %s", p, err) return 0 } - leaderEpoch := proto.Encoding.Uint64(msg.Data[:8]) - p.mu.RLock() if !p.isFollowing { p.mu.RUnlock() @@ -489,10 +486,8 @@ func (p *partition) handleReplicationResponse(msg *nats.Msg) int { p.mu.RUnlock() // Update HW from leader's HW. - hw := int64(proto.Encoding.Uint64(msg.Data[8:])) p.log.SetHighWatermark(hw) - data := msg.Data[replicationOverhead:] if len(data) == 0 { return 0 } @@ -728,7 +723,7 @@ func (p *partition) sendAck(ack *client.Ack) { if ack.AckInbox == "" { return } - data, err := proto.MarshalEnvelope(ack) + data, err := proto.MarshalAck(ack) if err != nil { panic(err) } @@ -814,10 +809,10 @@ func (p *partition) computeReplicaFetchSleep() time.Duration { // messages that were replicated. Zero (without an error) indicates the // follower is caught up with the leader. func (p *partition) sendReplicationRequest() (int, error) { - data, err := (&proto.ReplicationRequest{ + data, err := proto.MarshalReplicationRequest(&proto.ReplicationRequest{ ReplicaID: p.srv.config.Clustering.ServerID, Offset: p.log.NewestOffset(), - }).Marshal() + }) if err != nil { panic(err) } @@ -870,7 +865,8 @@ func (p *partition) truncateUncommitted() error { // sendLeaderOffsetRequest sends a request to the leader for the last offset // for the current leader epoch. func (p *partition) sendLeaderOffsetRequest(leaderEpoch uint64) (int64, error) { - data, err := (&proto.LeaderEpochOffsetRequest{LeaderEpoch: leaderEpoch}).Marshal() + data, err := proto.MarshalLeaderEpochOffsetRequest( + &proto.LeaderEpochOffsetRequest{LeaderEpoch: leaderEpoch}) if err != nil { panic(err) } @@ -882,8 +878,8 @@ func (p *partition) sendLeaderOffsetRequest(leaderEpoch uint64) (int64, error) { if err != nil { return 0, err } - offsetResp := &proto.LeaderEpochOffsetResponse{} - if err := offsetResp.Unmarshal(resp.Data); err != nil { + offsetResp, err := proto.UnmarshalLeaderEpochOffsetResponse(resp.Data) + if err != nil { return 0, err } return offsetResp.EndOffset, nil @@ -1062,10 +1058,10 @@ func (p *partition) updateISRLatestOffset(replica string, offset int64) { // sendPartitionNotification sends a message to the given partition replica to // indicate new data is available in the log. func (p *partition) sendPartitionNotification(replica string) { - req, err := (&proto.PartitionNotification{ + req, err := proto.MarshalPartitionNotification(&proto.PartitionNotification{ Stream: p.Stream, Partition: p.Id, - }).Marshal() + }) if err != nil { panic(err) } @@ -1094,8 +1090,8 @@ func (p *partition) getSubject() string { // This is indicated by the presence of the envelope magic number. If it is // not, nil is returned. func getMessage(data []byte) *client.Message { - msg := new(client.Message) - if err := proto.UnmarshalEnvelope(data, msg); err != nil { + msg, err := proto.UnmarshalPublish(data) + if err != nil { return nil } return msg diff --git a/server/proto/envelope.go b/server/proto/envelope.go index f1fb0635..acef246e 100644 --- a/server/proto/envelope.go +++ b/server/proto/envelope.go @@ -8,6 +8,35 @@ import ( "hash/crc32" pb "github.com/golang/protobuf/proto" + client "github.com/liftbridge-io/liftbridge-api/go" +) + +// msgType indicates the type of message contained by an envelope. +type msgType byte + +const ( + msgTypePublish msgType = iota + msgTypeAck + + msgTypeReplicationRequest + msgTypeReplicationResponse + + msgTypeRaftJoinRequest + msgTypeRaftJoinResponse + + msgTypeLeaderEpochOffsetRequest + msgTypeLeaderEpochOffsetResponse + + msgTypePropagatedRequest + msgTypePropagatedResponse + + msgTypeServerInfoRequest + msgTypeServerInfoResponse + + msgTypePartitionStatusRequest + msgTypePartitionStatusResponse + + msgTypePartitionNotification ) const ( @@ -33,9 +62,104 @@ var ( crc32cTable = crc32.MakeTable(crc32.Castagnoli) ) -// MarshalEnvelope serializes a protobuf message into the Liftbridge envelope +// MarshalPublish serializes a protobuf publish message into the Liftbridge +// envelope wire format. +func MarshalPublish(msg *client.Message) ([]byte, error) { + return marshalEnvelope(msg, msgTypePublish) +} + +// MarshalAck serializes a protobuf ack message into the Liftbridge envelope // wire format. -func MarshalEnvelope(msg pb.Message) ([]byte, error) { +func MarshalAck(ack *client.Ack) ([]byte, error) { + return marshalEnvelope(ack, msgTypeAck) +} + +// MarshalServerInfoRequest serializes a ServerInfoRequest protobuf into the +// Liftbridge envelope wire format. +func MarshalServerInfoRequest(req *ServerInfoRequest) ([]byte, error) { + return marshalEnvelope(req, msgTypeServerInfoRequest) +} + +// MarshalServerInfoResponse serializes a ServerInfoResponse protobuf into the +// Liftbridge envelope wire format. +func MarshalServerInfoResponse(req *ServerInfoResponse) ([]byte, error) { + return marshalEnvelope(req, msgTypeServerInfoResponse) +} + +// MarshalPropagatedRequest serializes a PropagatedRequest protobuf into the +// Liftbridge envelope wire format. +func MarshalPropagatedRequest(req *PropagatedRequest) ([]byte, error) { + return marshalEnvelope(req, msgTypePropagatedRequest) +} + +// MarshalPropagatedResponse serializes a PropagatedResponse protobuf into the +// Liftbridge envelope wire format. +func MarshalPropagatedResponse(req *PropagatedResponse) ([]byte, error) { + return marshalEnvelope(req, msgTypePropagatedResponse) +} + +// MarshalPartitionStatusRequest serializes a PartitionStatusRequest protobuf +// into the Liftbridge envelope wire format. +func MarshalPartitionStatusRequest(req *PartitionStatusRequest) ([]byte, error) { + return marshalEnvelope(req, msgTypePartitionStatusRequest) +} + +// MarshalPartitionStatusResponse serializes a PartitionStatusResponse protobuf +// into the Liftbridge envelope wire format. +func MarshalPartitionStatusResponse(resp *PartitionStatusResponse) ([]byte, error) { + return marshalEnvelope(resp, msgTypePartitionStatusResponse) +} + +// MarshalReplicationRequest serializes a ReplicationRequest protobuf into the +// Liftbridge envelope wire format. +func MarshalReplicationRequest(req *ReplicationRequest) ([]byte, error) { + return marshalEnvelope(req, msgTypeReplicationRequest) +} + +// MarshalLeaderEpochOffsetRequest serializes a LeaderEpochOffsetRequest +// protobuf into the Liftbridge envelope wire format. +func MarshalLeaderEpochOffsetRequest(req *LeaderEpochOffsetRequest) ([]byte, error) { + return marshalEnvelope(req, msgTypeLeaderEpochOffsetRequest) +} + +// MarshalLeaderEpochOffsetResponse serializes a LeaderEpochOffsetResponse +// protobuf into the Liftbridge envelope wire format. +func MarshalLeaderEpochOffsetResponse(req *LeaderEpochOffsetResponse) ([]byte, error) { + return marshalEnvelope(req, msgTypeLeaderEpochOffsetResponse) +} + +// MarshalPartitionNotification serializes a PartitionNotification protobuf +// into the Liftbridge envelope wire format. +func MarshalPartitionNotification(req *PartitionNotification) ([]byte, error) { + return marshalEnvelope(req, msgTypePartitionNotification) +} + +// MarshalRaftJoinRequest serializes a RaftJoinRequest protobuf into the +// Liftbridge envelope wire format. +func MarshalRaftJoinRequest(req *RaftJoinRequest) ([]byte, error) { + return marshalEnvelope(req, msgTypeRaftJoinRequest) +} + +// MarshalRaftJoinResponse serializes a RaftJoinResponse protobuf into the +// Liftbridge envelope wire format. +func MarshalRaftJoinResponse(req *RaftJoinResponse) ([]byte, error) { + return marshalEnvelope(req, msgTypeRaftJoinResponse) +} + +// WriteReplicationResponseHeader writes the envelope protocol header for +// replication messages to the buffer and returns the number of bytes written. +func WriteReplicationResponseHeader(buf *bytes.Buffer) int { + buf.Write(envelopeMagicNumber) + buf.WriteByte(envelopeProtoV0) + buf.WriteByte(byte(envelopeMinHeaderLen)) + buf.WriteByte(0x00) + buf.WriteByte(byte(msgTypeReplicationResponse)) + return 8 +} + +// marshalEnvelope serializes a protobuf message into the Liftbridge envelope +// wire format. +func marshalEnvelope(msg pb.Message, msgType msgType) ([]byte, error) { data, err := pb.Marshal(msg) if err != nil { return nil, err @@ -54,7 +178,7 @@ func MarshalEnvelope(msg pb.Message) ([]byte, error) { pos++ buf[pos] = 0x00 // Flags pos++ - buf[pos] = 0x00 // Reserved + buf[pos] = byte(msgType) // MsgType pos++ if pos != headerLen { panic(fmt.Sprintf("Payload position (%d) does not match expected HeaderLen (%d)", @@ -64,38 +188,211 @@ func MarshalEnvelope(msg pb.Message) ([]byte, error) { return buf, nil } -// UnmarshalEnvelope deserializes a Liftbridge envelope into a protobuf +// UnmarshalPublish deserializes a Liftbridge publish envelope into a protobuf // message. -func UnmarshalEnvelope(data []byte, msg pb.Message) error { +func UnmarshalPublish(data []byte) (*client.Message, error) { + var ( + msg = new(client.Message) + err = unmarshalEnvelope(data, msg, msgTypePublish) + ) + return msg, err +} + +// UnmarshalAck deserializes a Liftbridge ack envelope into a protobuf message. +func UnmarshalAck(data []byte) (*client.Ack, error) { + var ( + ack = new(client.Ack) + err = unmarshalEnvelope(data, ack, msgTypeAck) + ) + return ack, err +} + +// UnmarshalPropagatedRequest deserializes a Liftbridge PropagatedRequest +// envelope into a protobuf message. +func UnmarshalPropagatedRequest(data []byte) (*PropagatedRequest, error) { + var ( + req = new(PropagatedRequest) + err = unmarshalEnvelope(data, req, msgTypePropagatedRequest) + ) + return req, err +} + +// UnmarshalPropagatedResponse deserializes a Liftbridge PropagatedResponse +// envelope into a protobuf message. +func UnmarshalPropagatedResponse(data []byte) (*PropagatedResponse, error) { + var ( + resp = new(PropagatedResponse) + err = unmarshalEnvelope(data, resp, msgTypePropagatedResponse) + ) + return resp, err +} + +// UnmarshalServerInfoRequest deserializes a Liftbridge ServerInfoRequest +// envelope into a protobuf message. +func UnmarshalServerInfoRequest(data []byte) (*ServerInfoRequest, error) { + var ( + req = new(ServerInfoRequest) + err = unmarshalEnvelope(data, req, msgTypeServerInfoRequest) + ) + return req, err +} + +// UnmarshalServerInfoResponse deserializes a Liftbridge ServerInfoResponse +// envelope into a protobuf message. +func UnmarshalServerInfoResponse(data []byte) (*ServerInfoResponse, error) { + var ( + resp = new(ServerInfoResponse) + err = unmarshalEnvelope(data, resp, msgTypeServerInfoResponse) + ) + return resp, err +} + +// UnmarshalPartitionStatusRequest deserializes a Liftbridge +// PartitionStatusRequest envelope into a protobuf message. +func UnmarshalPartitionStatusRequest(data []byte) (*PartitionStatusRequest, error) { + var ( + req = new(PartitionStatusRequest) + err = unmarshalEnvelope(data, req, msgTypePartitionStatusRequest) + ) + return req, err +} + +// UnmarshalPartitionStatusResponse deserializes a Liftbridge +// PartitionStatusResponse envelope into a protobuf message. +func UnmarshalPartitionStatusResponse(data []byte) (*PartitionStatusResponse, error) { + var ( + resp = new(PartitionStatusResponse) + err = unmarshalEnvelope(data, resp, msgTypePartitionStatusResponse) + ) + return resp, err +} + +// UnmarshalRaftJoinRequest deserializes a Liftbridge RaftJoinRequest envelope +// into a protobuf message. +func UnmarshalRaftJoinRequest(data []byte) (*RaftJoinRequest, error) { + var ( + req = new(RaftJoinRequest) + err = unmarshalEnvelope(data, req, msgTypeRaftJoinRequest) + ) + return req, err +} + +// UnmarshalRaftJoinResponse deserializes a Liftbridge RaftJoinResponse +// envelope into a protobuf message. +func UnmarshalRaftJoinResponse(data []byte) (*RaftJoinResponse, error) { + var ( + resp = new(RaftJoinResponse) + err = unmarshalEnvelope(data, resp, msgTypeRaftJoinResponse) + ) + return resp, err +} + +// UnmarshalPartitionNotification deserializes a Liftbridge +// PartitionNotification envelope into a protobuf message. +func UnmarshalPartitionNotification(data []byte) (*PartitionNotification, error) { + var ( + req = new(PartitionNotification) + err = unmarshalEnvelope(data, req, msgTypePartitionNotification) + ) + return req, err +} + +// UnmarshalLeaderEpochOffsetRequest deserializes a Liftbridge +// LeaderEpochOffsetRequest envelope into a protobuf message. +func UnmarshalLeaderEpochOffsetRequest(data []byte) (*LeaderEpochOffsetRequest, error) { + var ( + req = new(LeaderEpochOffsetRequest) + err = unmarshalEnvelope(data, req, msgTypeLeaderEpochOffsetRequest) + ) + return req, err +} + +// UnmarshalLeaderEpochOffsetResponse deserializes a Liftbridge +// LeaderEpochOffsetResponse envelope into a protobuf message. +func UnmarshalLeaderEpochOffsetResponse(data []byte) (*LeaderEpochOffsetResponse, error) { + var ( + resp = new(LeaderEpochOffsetResponse) + err = unmarshalEnvelope(data, resp, msgTypeLeaderEpochOffsetResponse) + ) + return resp, err +} + +// UnmarshalReplicationRequest deserializes a Liftbridge ReplicationRequest +// envelope into a protobuf message. +func UnmarshalReplicationRequest(data []byte) (*ReplicationRequest, error) { + var ( + req = new(ReplicationRequest) + err = unmarshalEnvelope(data, req, msgTypeReplicationRequest) + ) + return req, err +} + +// UnmarshalReplicationResponse deserializes a Liftbridge replication response +// envelope and returns the leader epoch, HW, and message data. +func UnmarshalReplicationResponse(data []byte) (uint64, int64, []byte, error) { + payload, err := checkEnvelope(data, msgTypeReplicationResponse) + if err != nil { + return 0, 0, nil, err + } + + // We should have at least 16 bytes, 8 for leader epoch and 8 for HW. + if len(payload) < 16 { + return 0, 0, nil, errors.New("not enough data") + } + + var ( + leaderEpoch = Encoding.Uint64(payload[:8]) + hw = int64(Encoding.Uint64(payload[8:])) + ) + + return leaderEpoch, hw, payload[16:], nil +} + +// unmarshalEnvelope deserializes a Liftbridge envelope into a protobuf +// message. +func unmarshalEnvelope(data []byte, msg pb.Message, msgType msgType) error { + payload, err := checkEnvelope(data, msgType) + if err != nil { + return err + } + return pb.Unmarshal(payload, msg) +} + +func checkEnvelope(data []byte, expectedType msgType) ([]byte, error) { if len(data) < envelopeMinHeaderLen { - return errors.New("data missing envelope header") + return nil, errors.New("data missing envelope header") } if !bytes.Equal(data[:envelopeMagicNumberLen], envelopeMagicNumber) { - return errors.New("unexpected envelope magic number") + return nil, errors.New("unexpected envelope magic number") } if data[4] != envelopeProtoV0 { - return fmt.Errorf("unknown envelope protocol: %v", data[4]) + return nil, fmt.Errorf("unknown envelope protocol: %v", data[4]) } var ( - headerLen = int(data[5]) - flags = data[6] - payload = data[headerLen:] + headerLen = int(data[5]) + flags = data[6] + actualType = msgType(data[7]) + payload = data[headerLen:] ) + if actualType != expectedType { + return nil, fmt.Errorf("MsgType mismatch: expected %v, got %v", expectedType, actualType) + } + // Check CRC. if hasBit(flags, 0) { // Make sure there is a CRC present. if headerLen != envelopeMinHeaderLen+4 { - return errors.New("incorrect envelope header size") + return nil, errors.New("incorrect envelope header size") } crc := Encoding.Uint32(data[envelopeMinHeaderLen:headerLen]) if c := crc32.Checksum(payload, crc32cTable); c != crc { - return fmt.Errorf("crc mismatch: expected %d, got %d", crc, c) + return nil, fmt.Errorf("crc mismatch: expected %d, got %d", crc, c) } } - return pb.Unmarshal(payload, msg) + return payload, nil } // hasBit checks if the given bit position is set on the provided byte. diff --git a/server/proto/envelope_test.go b/server/proto/envelope_test.go index aae4dafd..87ecac1d 100644 --- a/server/proto/envelope_test.go +++ b/server/proto/envelope_test.go @@ -1,6 +1,8 @@ package proto import ( + "bytes" + "encoding/binary" "testing" "time" @@ -9,7 +11,7 @@ import ( ) // Ensure we can marshal a message and then unmarshal it. -func TestMarshalUnmarshalEnvelopeMessage(t *testing.T) { +func TestMarshalUnmarshalPublish(t *testing.T) { msg := &client.Message{ Offset: 42, Key: []byte("foo"), @@ -26,17 +28,17 @@ func TestMarshalUnmarshalEnvelopeMessage(t *testing.T) { CorrelationId: "123", } - envelope, err := MarshalEnvelope(msg) + envelope, err := MarshalPublish(msg) require.NoError(t, err) - unmarshaled := new(client.Message) - require.NoError(t, UnmarshalEnvelope(envelope, unmarshaled)) + unmarshaled, err := UnmarshalPublish(envelope) + require.NoError(t, err) require.Equal(t, msg, unmarshaled) } // Ensure we can marshal an ack and then unmarshal it. -func TestMarshalUnmarshalEnvelopeAck(t *testing.T) { +func TestMarshalUnmarshalAck(t *testing.T) { ack := &client.Ack{ Offset: 42, Stream: "foo", @@ -46,48 +48,257 @@ func TestMarshalUnmarshalEnvelopeAck(t *testing.T) { PartitionSubject: "foo.1", } - envelope, err := MarshalEnvelope(ack) + envelope, err := MarshalAck(ack) require.NoError(t, err) - unmarshaled := new(client.Ack) - require.NoError(t, UnmarshalEnvelope(envelope, unmarshaled)) + unmarshaled, err := UnmarshalAck(envelope) + require.NoError(t, err) require.Equal(t, ack, unmarshaled) } -// Ensure UnmarshalEnvelope returns an error if there is not enough data for an +// Ensure we can marshal a ServerInfoRequest and then unmarshal it. +func TestMarshalUnmarshalServerInfoRequest(t *testing.T) { + req := &ServerInfoRequest{ + Id: "foo", + } + envelope, err := MarshalServerInfoRequest(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalServerInfoRequest(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a ServerInfoResponse and then unmarshal it. +func TestMarshalUnmarshalServerInfoResponse(t *testing.T) { + req := &ServerInfoResponse{ + Id: "foo", + Host: "0.0.0.0", + Port: 4000, + } + envelope, err := MarshalServerInfoResponse(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalServerInfoResponse(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a PropagatedRequest and then unmarshal it. +func TestMarshalUnmarshalPropagatedRequest(t *testing.T) { + req := &PropagatedRequest{ + Op: Op_CREATE_PARTITION, + CreatePartitionOp: &CreatePartitionOp{ + Partition: &Partition{ + Subject: "foo", + Stream: "foo", + ReplicationFactor: 3, + }, + }, + } + envelope, err := MarshalPropagatedRequest(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalPropagatedRequest(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a PropagatedResponse and then unmarshal it. +func TestMarshalUnmarshalPropagatedResponse(t *testing.T) { + req := &PropagatedResponse{ + Op: Op_CREATE_PARTITION, + } + envelope, err := MarshalPropagatedResponse(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalPropagatedResponse(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a PartitionStatusRequest and then unmarshal it. +func TestMarshalUnmarshalPartitionStatusRequest(t *testing.T) { + req := &PartitionStatusRequest{ + Stream: "foo", + Partition: 1, + } + envelope, err := MarshalPartitionStatusRequest(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalPartitionStatusRequest(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a PartitionStatusResponse and then unmarshal it. +func TestMarshalUnmarshalPartitionStatusResponse(t *testing.T) { + req := &PartitionStatusResponse{ + Exists: false, + } + envelope, err := MarshalPartitionStatusResponse(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalPartitionStatusResponse(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a ReplicationRequest and then unmarshal it. +func TestMarshalUnmarshalReplicationRequest(t *testing.T) { + req := &ReplicationRequest{ + ReplicaID: "b", + Offset: 10, + } + envelope, err := MarshalReplicationRequest(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalReplicationRequest(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a ReplicationResponse and then unmarshal it. +func TestMarshalUnmarshalReplicationResponse(t *testing.T) { + buf := new(bytes.Buffer) + n := WriteReplicationResponseHeader(buf) + require.Equal(t, 8, n) + + var ( + epoch = uint64(2) + hw = int64(100) + data = []byte("blah") + ) + + // Write the leader epoch. + binary.Write(buf, Encoding, epoch) + // Write the HW. + binary.Write(buf, Encoding, hw) + // Write some fake message data. + buf.Write(data) + + unmarshaledEpoch, unmarshaledHW, unmarshaledData, err := UnmarshalReplicationResponse(buf.Bytes()) + require.NoError(t, err) + require.Equal(t, epoch, unmarshaledEpoch) + require.Equal(t, hw, unmarshaledHW) + require.Equal(t, data, unmarshaledData) +} + +// Ensure we can marshal a LeaderEpochOffsetRequest and then unmarshal it. +func TestMarshalUnmarshalLeaderEpochOffsetRequest(t *testing.T) { + req := &LeaderEpochOffsetRequest{ + LeaderEpoch: 1, + } + envelope, err := MarshalLeaderEpochOffsetRequest(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalLeaderEpochOffsetRequest(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a LeaderEpochOffsetResponse and then unmarshal it. +func TestMarshalUnmarshalLeaderEpochOffsetResponse(t *testing.T) { + req := &LeaderEpochOffsetResponse{ + EndOffset: 10, + } + envelope, err := MarshalLeaderEpochOffsetResponse(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalLeaderEpochOffsetResponse(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a PartitionNotification and then unmarshal it. +func TestMarshalUnmarshalPartitionNotification(t *testing.T) { + req := &PartitionNotification{ + Stream: "foo", + Partition: 2, + } + envelope, err := MarshalPartitionNotification(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalPartitionNotification(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a RaftJoinRequest and then unmarshal it. +func TestMarshalUnmarshalRaftJoinRequest(t *testing.T) { + req := &RaftJoinRequest{ + NodeID: "foo", + NodeAddr: "bar", + } + envelope, err := MarshalRaftJoinRequest(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalRaftJoinRequest(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure we can marshal a RaftJoinResponse and then unmarshal it. +func TestMarshalUnmarshalRaftJoinResponse(t *testing.T) { + req := &RaftJoinResponse{} + envelope, err := MarshalRaftJoinResponse(req) + require.NoError(t, err) + + unmarshaled, err := UnmarshalRaftJoinResponse(envelope) + require.NoError(t, err) + + require.Equal(t, req, unmarshaled) +} + +// Ensure unmarshalEnvelope returns an error if there is not enough data for an // envelope. func TestUnmarshalEnvelopeUnderflow(t *testing.T) { - require.Error(t, UnmarshalEnvelope([]byte{}, new(client.Ack))) + _, err := UnmarshalAck([]byte{}) + require.Error(t, err) } -// Ensure UnmarshalEnvelope returns an error if the magic number is different. +// Ensure unmarshalEnvelope returns an error if the magic number is different. func TestUnmarshalEnvelopeUnexpectedMagicNumber(t *testing.T) { - require.Error(t, UnmarshalEnvelope([]byte("foobarbaz"), new(client.Ack))) + _, err := UnmarshalAck([]byte("foobarbaz")) + require.Error(t, err) } -// Ensure UnmarshalEnvelope returns an error if the protocol version is +// Ensure unmarshalEnvelope returns an error if the protocol version is // unknown. func TestUnmarshalEnvelopeUnexpectedProtoVersion(t *testing.T) { - msg, err := MarshalEnvelope(new(client.Message)) + msg, err := MarshalPublish(new(client.Message)) require.NoError(t, err) msg[4] = 0x01 - require.Error(t, UnmarshalEnvelope(msg, new(client.Message))) + _, err = UnmarshalPublish(msg) + require.Error(t, err) } -// Ensure UnmarshalEnvelope returns an error if the CRC flag is set but no CRC +// Ensure unmarshalEnvelope returns an error if the CRC flag is set but no CRC // is present. func TestUnmarshalEnvelopeMissingCRC(t *testing.T) { - msg, err := MarshalEnvelope(new(client.Message)) + msg, err := MarshalPublish(new(client.Message)) require.NoError(t, err) msg[6] = setBit(msg[6], 0) - require.Error(t, UnmarshalEnvelope(msg, new(client.Message))) + _, err = UnmarshalPublish(msg) + require.Error(t, err) } -// Ensure UnmarshalEnvelope returns an error if the CRC flag is set but the CRC +// Ensure unmarshalEnvelope returns an error if the CRC flag is set but the CRC // doesn't match the expected CRC. func TestUnmarshalEnvelopeMismatchedCRC(t *testing.T) { - msg, err := MarshalEnvelope(new(client.Message)) + msg, err := MarshalPublish(new(client.Message)) require.NoError(t, err) msg[6] = setBit(msg[6], 0) buf := make([]byte, len(msg)+4) @@ -95,7 +306,17 @@ func TestUnmarshalEnvelopeMismatchedCRC(t *testing.T) { buf[8] = byte(32) copy(buf[12:], msg[8:]) buf[5] = byte(12) - require.Error(t, UnmarshalEnvelope(buf, new(client.Message))) + _, err = UnmarshalPublish(buf) + require.Error(t, err) +} + +// Ensure unmarshalEnvelope returns an error if the envelope's MsgType doesn't +// match the expected type. +func TestUnmarshalEnvelopeMismatchedType(t *testing.T) { + msg, err := MarshalPublish(new(client.Message)) + require.NoError(t, err) + _, err = UnmarshalAck(msg) + require.Error(t, err) } func setBit(n byte, pos uint8) byte { diff --git a/server/raft.go b/server/raft.go index 4d34c209..f9d62efe 100644 --- a/server/raft.go +++ b/server/raft.go @@ -134,17 +134,14 @@ func (s *Server) setupMetadataRaft() error { s.logger.Debug("Successfully bootstrapped metadata Raft group") } else if !existingState { // Attempt to join the cluster if we're not bootstrapping. - req, err := (&proto.RaftJoinRequest{ + req, err := proto.MarshalRaftJoinRequest(&proto.RaftJoinRequest{ NodeID: s.config.Clustering.ServerID, NodeAddr: s.config.Clustering.ServerID, // NATS transport uses ID for addr. - }).Marshal() + }) if err != nil { panic(err) } - var ( - joined = false - resp = &proto.RaftJoinResponse{} - ) + joined := false // Attempt to join for up to 30 seconds before giving up. for i := 0; i < raftJoinAttempts; i++ { s.logger.Debug("Attempting to join metadata Raft group...") @@ -154,7 +151,8 @@ func (s *Server) setupMetadataRaft() error { time.Sleep(time.Second) continue } - if err := resp.Unmarshal(r.Data); err != nil { + resp, err := proto.UnmarshalRaftJoinResponse(r.Data) + if err != nil { time.Sleep(time.Second) continue } @@ -320,8 +318,8 @@ func (s *Server) createRaftNode() (bool, error) { if node.State() != raft.Leader { return } - req := &proto.RaftJoinRequest{} - if err := req.Unmarshal(msg.Data); err != nil { + req, err := proto.UnmarshalRaftJoinRequest(msg.Data) + if err != nil { s.logger.Warn("Invalid join request for metadata Raft group") return } @@ -339,7 +337,7 @@ func (s *Server) createRaftNode() (bool, error) { } // Send the response. - r, err := resp.Marshal() + r, err := proto.MarshalRaftJoinResponse(resp) if err != nil { panic(err) } diff --git a/server/replicator.go b/server/replicator.go index 6a1ad491..c3791925 100644 --- a/server/replicator.go +++ b/server/replicator.go @@ -306,6 +306,7 @@ type protocolWriter struct { buf *bytes.Buffer log commitlog.CommitLog lastOffset int64 + dataPos int stop <-chan struct{} } @@ -334,7 +335,7 @@ func (w *protocolWriter) Write(offset int64, headers, message []byte) error { func (w *protocolWriter) Flush(write func([]byte) error) error { data := w.buf.Bytes() // Replace the HW. - proto.Encoding.PutUint64(data[8:], uint64(w.log.HighWatermark())) + proto.Encoding.PutUint64(data[w.dataPos+8:], uint64(w.log.HighWatermark())) if err := write(data); err != nil { w.Reset() @@ -353,6 +354,9 @@ func (w *protocolWriter) Reset() { w.buf.Reset() w.lastOffset = -1 + // Write envelope header. + w.dataPos = proto.WriteReplicationResponseHeader(w.buf) + // Write the leader epoch. binary.Write(w.buf, proto.Encoding, w.replicator.epoch) // Reserve space for the HW. This will be replaced with the HW at the time diff --git a/server/replicator_test.go b/server/replicator_test.go index ce7af169..f82dc352 100644 --- a/server/replicator_test.go +++ b/server/replicator_test.go @@ -839,8 +839,8 @@ func TestReplicatorNotifyNewData(t *testing.T) { follower.config.Clustering.ServerID) ) _, err = nc.Subscribe(inbox, func(msg *nats.Msg) { - req := &proto.PartitionNotification{} - if err := req.Unmarshal(msg.Data); err != nil { + req, err := proto.UnmarshalPartitionNotification(msg.Data) + if err != nil { t.Fatalf("Invalid partition notification: %v", err) } notifications <- req diff --git a/server/server.go b/server/server.go index 77d31acf..d0c884e4 100644 --- a/server/server.go +++ b/server/server.go @@ -521,10 +521,10 @@ func (s *Server) getPropagateInbox() string { // will fail when it's proposed to the Raft cluster. func (s *Server) handlePropagatedRequest(m *nats.Msg) { var ( - req = &proto.PropagatedRequest{} - resp []byte + req, err = proto.UnmarshalPropagatedRequest(m.Data) + resp *proto.PropagatedResponse ) - if err := req.Unmarshal(m.Data); err != nil { + if err != nil { s.logger.Warnf("Invalid propagated request: %v", err) return } @@ -541,65 +541,53 @@ func (s *Server) handlePropagatedRequest(m *nats.Msg) { s.logger.Warnf("Unknown propagated request operation: %s", req.Op) return } - if err := m.Respond(resp); err != nil { + data, err := proto.MarshalPropagatedResponse(resp) + if err != nil { + panic(err) + } + if err := m.Respond(data); err != nil { s.logger.Errorf("Failed to respond to propagated request: %v", err) } } -func (s *Server) handleCreatePartition(req *proto.PropagatedRequest) []byte { +func (s *Server) handleCreatePartition(req *proto.PropagatedRequest) *proto.PropagatedResponse { resp := &proto.PropagatedResponse{ Op: req.Op, } if err := s.metadata.CreatePartition(context.Background(), req.CreatePartitionOp); err != nil { resp.Error = &proto.Error{Code: uint32(err.Code()), Msg: err.Message()} } - data, err := resp.Marshal() - if err != nil { - panic(err) - } - return data + return resp } -func (s *Server) handleShrinkISR(req *proto.PropagatedRequest) []byte { +func (s *Server) handleShrinkISR(req *proto.PropagatedRequest) *proto.PropagatedResponse { resp := &proto.PropagatedResponse{ Op: req.Op, } if err := s.metadata.ShrinkISR(context.Background(), req.ShrinkISROp); err != nil { resp.Error = &proto.Error{Code: uint32(err.Code()), Msg: err.Message()} } - data, err := resp.Marshal() - if err != nil { - panic(err) - } - return data + return resp } -func (s *Server) handleExpandISR(req *proto.PropagatedRequest) []byte { +func (s *Server) handleExpandISR(req *proto.PropagatedRequest) *proto.PropagatedResponse { resp := &proto.PropagatedResponse{ Op: req.Op, } if err := s.metadata.ExpandISR(context.Background(), req.ExpandISROp); err != nil { resp.Error = &proto.Error{Code: uint32(err.Code()), Msg: err.Message()} } - data, err := resp.Marshal() - if err != nil { - panic(err) - } - return data + return resp } -func (s *Server) handleReportLeader(req *proto.PropagatedRequest) []byte { +func (s *Server) handleReportLeader(req *proto.PropagatedRequest) *proto.PropagatedResponse { resp := &proto.PropagatedResponse{ Op: req.Op, } if err := s.metadata.ReportLeader(context.Background(), req.ReportLeaderOp); err != nil { resp.Error = &proto.Error{Code: uint32(err.Code()), Msg: err.Message()} } - data, err := resp.Marshal() - if err != nil { - panic(err) - } - return data + return resp } func (s *Server) isShutdown() bool { @@ -653,8 +641,8 @@ func (s *Server) natsErrorHandler(nc *nats.Conn, sub *nats.Subscription, err err // handleServerInfoRequest is a NATS handler used to process requests for // server information used in the metadata API. func (s *Server) handleServerInfoRequest(m *nats.Msg) { - req := &proto.ServerInfoRequest{} - if err := req.Unmarshal(m.Data); err != nil { + req, err := proto.UnmarshalServerInfoRequest(m.Data) + if err != nil { s.logger.Warnf("Dropping invalid server info request: %v", err) return } @@ -665,11 +653,11 @@ func (s *Server) handleServerInfoRequest(m *nats.Msg) { } connectionAddress := s.config.GetConnectionAddress() - data, err := (&proto.ServerInfoResponse{ + data, err := proto.MarshalServerInfoResponse(&proto.ServerInfoResponse{ Id: s.config.Clustering.ServerID, Host: connectionAddress.Host, Port: int32(connectionAddress.Port), - }).Marshal() + }) if err != nil { panic(err) } @@ -683,8 +671,8 @@ func (s *Server) handleServerInfoRequest(m *nats.Msg) { // querying the status of a partition. This is used as a readiness check to // determine if a created partition has actually started. func (s *Server) handlePartitionStatusRequest(m *nats.Msg) { - req := &proto.PartitionStatusRequest{} - if err := req.Unmarshal(m.Data); err != nil { + req, err := proto.UnmarshalPartitionStatusRequest(m.Data) + if err != nil { s.logger.Warnf("Dropping invalid partition status request: %v", err) return } @@ -696,7 +684,7 @@ func (s *Server) handlePartitionStatusRequest(m *nats.Msg) { resp.IsLeader = partition.IsLeader() } - data, err := resp.Marshal() + data, err := proto.MarshalPartitionStatusResponse(resp) if err != nil { panic(err) } @@ -717,8 +705,8 @@ func (s *Server) handlePartitionStatusRequest(m *nats.Msg) { // caught up and send a notification in order to wake an idle follower back up // when new data is written to the log. func (s *Server) handlePartitionNotification(m *nats.Msg) { - req := &proto.PartitionNotification{} - if err := req.Unmarshal(m.Data); err != nil { + req, err := proto.UnmarshalPartitionNotification(m.Data) + if err != nil { s.logger.Warnf("Dropping invalid partition notification: %v", err) return }