Skip to content

Commit

Permalink
协议编解码 注释
Browse files Browse the repository at this point in the history
  • Loading branch information
kingofzihua committed Feb 5, 2021
1 parent b6ac41b commit 30b14a7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 44 deletions.
66 changes: 33 additions & 33 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ type Codec interface {
Decode([]byte) ([]byte, error)
}

const FrameHeadLen = 15
const Magic = 0x11
const Version = 0
const FrameHeadLen = 15 //定义数据帧头大小
const Magic = 0x11 //定义魔数
const Version = 0 //当前版本

// FrameHeader describes the header structure of a data frame
// 数据帧头
type FrameHeader struct {
Magic uint8 // magic
Version uint8 // version
MsgType uint8 // msg type e.g. : 0x0: general req, 0x1: heartbeat
ReqType uint8 // request type e.g. : 0x0: send and receive, 0x1: send but not receive, 0x2: client stream request, 0x3: server stream request, 0x4: bidirectional streaming request
CompressType uint8 // compression or not : 0x0: not compression, 0x1: compression
StreamID uint16 // stream ID
Length uint32 // total packet length
Reserved uint32 // 4 bytes reserved
Magic uint8 // 魔数 => 硬写到代码里的整数常量
Version uint8 // 版本号 用来支持版本迭代
MsgType uint8 // 消息类型 e.g. : 0x0: 普通消息 , 0x1: 心跳消息
ReqType uint8 // 请求类型 e.g. : 0x0: 一发一收, 0x1: 只发不收, 0x2: 客户端流式请求, 0x3: 服务端流式请求, 0x4: 双向流式请求
CompressType uint8 // 是否压缩 : 0x0: 不压缩, 0x1: 压缩
StreamID uint16 // 流 id 为了支持后续流式传输的能力
Length uint32 // 消息的长度
Reserved uint32 // 4个字节的保留位
}

// GetCodec get a Codec by a codec name
Expand All @@ -45,7 +45,7 @@ var codecMap = make(map[string]Codec)
var DefaultCodec = NewCodec()

// NewCodec returns a globally unique codec
var NewCodec = func () Codec {
var NewCodec = func() Codec {
return &defaultCodec{}
}

Expand All @@ -61,61 +61,61 @@ func RegisterCodec(name string, codec Codec) {
codecMap[name] = codec
}

// 编码 => 将一个经过序列化的 request/response 二进制数据,拼接帧头形成一个完整的数据帧
func (c *defaultCodec) Encode(data []byte) ([]byte, error) {

totalLen := FrameHeadLen + len(data)
buffer := bytes.NewBuffer(make([]byte, 0, totalLen))

//封装帧头
frame := FrameHeader{
Magic : Magic,
Version : Version,
MsgType : 0x0,
ReqType : 0x0,
CompressType: 0x0,
Length: uint32(len(data)),
Magic: Magic,
Version: Version,
MsgType: 0x0,
ReqType: 0x0,
CompressType: 0x0, // 默认不压缩
Length: uint32(len(data)),
}

// binary.BigEndian 大端序 => 网络传输一般是大端序

// 拼装帧头
if err := binary.Write(buffer, binary.BigEndian, frame.Magic); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.Version); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.MsgType); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.ReqType); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.CompressType); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.StreamID); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.Length); err != nil {
return nil, err
}

if err := binary.Write(buffer, binary.BigEndian, frame.Reserved); err != nil {
return nil, err
}

// 拼装包数据成为一个完整的数据帧
if err := binary.Write(buffer, binary.BigEndian, data); err != nil {
return nil, err
}

return buffer.Bytes(), nil
}


func (c *defaultCodec) Decode(frame []byte) ([]byte,error) {
// 解码
func (c *defaultCodec) Decode(frame []byte) ([]byte, error) {
//去掉帧头,就是包头+包体
return frame[FrameHeadLen:], nil
}

Expand All @@ -129,15 +129,15 @@ func upperLimit(val int) uint32 {
}

var bufferPool = &sync.Pool{
New : func() interface {} {
return &cachedBuffer {
Buffer : proto.Buffer{},
lastMarshaledSize : 16,
New: func() interface{} {
return &cachedBuffer{
Buffer: proto.Buffer{},
lastMarshaledSize: 16,
}
},
}

type cachedBuffer struct {
proto.Buffer
lastMarshaledSize uint32
}
}
29 changes: 18 additions & 11 deletions transport/transport.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Network communication layer, responsible for the bottom layer of network communication,
// mainly including tcp && udp two protocol implementation
// 网络通信层,负责底层网络通信
// 主要包括 tcp udp 两种协议的实现
package transport

import (
Expand All @@ -15,65 +15,72 @@ import (
const DefaultPayloadLength = 1024
const MaxPayloadLength = 4 * 1024 * 1024

// ServerTransport defines the criteria that all server transport layers
// ServerTransport 定义所有 Server 传输层
// need to support
type ServerTransport interface {
// monitoring and processing of requests
// 实现请求的监听和处理,所有的 server transport 都需要实现这个方法,
// 同时设计成 interface 接口的方式,主要是为了实现可插拔,支持业务自定义
ListenAndServe(context.Context, ...ServerTransportOption) error
}

// ClientTransport defines the criteria that all client transport layers
// ClientTransport 定义所有 Client 传输层
// need to support
type ClientTransport interface {
// send requests
// 发起请求调用,传参除了上下文 context 之外,还有二进制的请求包 request,返回是一个二进制的完整数据帧
Send(context.Context, []byte, ...ClientTransportOption) ([]byte, error)
}

// Framer defines the reading of data frames from a data stream
// Framer 定义从数据流中读取数据帧
type Framer interface {
// read a full frame
// 读取数据帧的通用化定义
ReadFrame(net.Conn) ([]byte, error)
}

type framer struct {
buffer []byte
counter int // to prevent the dead loop
counter int // 防止死循环
}

// Create a Framer
// 创建一个数据帧
func NewFramer() Framer {
return &framer{
buffer: make([]byte, DefaultPayloadLength),
}
}

//重新规划大小 会扩容成原来的两倍
func (f *framer) Resize() {
f.buffer = make([]byte, len(f.buffer)*2)
}


func (f *framer) ReadFrame(conn net.Conn) ([]byte, error) {

//读取出 15 byte 的帧头
frameHeader := make([]byte, codec.FrameHeadLen)
if num, err := io.ReadFull(conn, frameHeader); num != codec.FrameHeadLen || err != nil {
return nil, err
}

// validate magic
// 验证魔数
if magic := uint8(frameHeader[0]); magic != codec.Magic {
return nil, codes.NewFrameworkError(codes.ClientMsgErrorCode, "invalid magic...")
}

//从帧头中获取包头 + 包体总长度 length ( 7~11 存储的是包的长度)
length := binary.BigEndian.Uint32(frameHeader[7:11])

if length > MaxPayloadLength {
return nil, codes.NewFrameworkError(codes.ClientMsgErrorCode, "payload too large...")
}

//当 buffer > 4M 时或者 扩容的次数 counter 大于 12 时,会跳出循环,不再 Resize
for uint32(len(f.buffer)) < length && f.counter <= 12 {
f.buffer = make([]byte, len(f.buffer)*2)
f.counter++
}

//读取 包头 + 包体
if num, err := io.ReadFull(conn, f.buffer[:length]); uint32(num) != length || err != nil {
return nil, err
}
Expand Down

0 comments on commit 30b14a7

Please sign in to comment.