diff --git a/README.md b/README.md new file mode 100644 index 0000000..6ac39bd --- /dev/null +++ b/README.md @@ -0,0 +1,245 @@ +# qrpc, tiny but powerful rpc framework + +**qrpc** makes it tremendously easy to to perform rpc by offering 3 core features: + +> * `blocking` or `nonblocking` +> * `streaming` or `nonstreaming` +> * `server push` + +By default each frame is `blocking` and `nonstreaming`, this allows traditional block-in-header sequencial behaviour like `http/1.1`, but you can make it behave tremendously different by attach flags to your frames! + +------ + +# Enough talk, let's demo! + +## blocking mode + +### `server.go`: +``` +package main +import "github.com/zhiqiangxu/qrpc" + +const ( + HelloCmd qrpc.Cmd = iota + HelloRespCmd +) +func main() { + handler := qrpc.NewServeMux() + handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { + writer.StartWrite(request.RequestID, HelloRespCmd, 0) + + writer.WriteBytes(append([]byte("hello world "), request.Payload...)) + writer.EndWrite() + }) + bindings := []qrpc.ServerBinding{ + qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} + server := qrpc.NewServer(bindings) + server.ListenAndServe() +} + +``` + +### `client.go`: + +``` +package main +import ( + "fmt" + "github.com/zhiqiangxu/qrpc" +) + +const ( + HelloCmd qrpc.Cmd = iota +) +func main() { + conf := qrpc.ConnectionConfig{} + cli := qrpc.NewClient(conf) + + conn := cli.GetConn("0.0.0.0:8080", nil) + + _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu")) + frame := resp.GetFrame() + fmt.Println("resp is", string(frame.Payload)) +} +``` + +In the above example, server will process each client frames **in sequence order**. + +## Nonblocking mode + +To use this mode we only need to change 1 line in `client.go`: +```diff +- _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu")) ++ _, resp, _ := conn.Request(HelloCmd, qrpc.NBFlag, []byte("xu")) +``` +In this mode `request` frames will be processed concurrently! + +## stream mode + +`stream` is like `chunked transfer` in `http`, besides, it's **bidirectional**, we can make either `request` or `response` in `stream`, or we can make both! + +### Make `request` in `stream` mode: + +### `streamclient.go`: + +``` +package main +import ( + "fmt" + "github.com/zhiqiangxu/qrpc" +) + +const ( + HelloCmd qrpc.Cmd = iota +) +func main() { + conf := qrpc.ConnectionConfig{} + cli := qrpc.NewClient(conf) + + conn := cli.GetConn("0.0.0.0:8080", nil) + + writer, resp, _ := conn.StreamRequest(HelloCmd, 0, []byte("first frame")) + writer.StartWrite(HelloCmd) + writer.WriteBytes([]byte("last frame")) + writer.EndWrite(true) // will attach StreamEndFlag + frame := resp.GetFrame() + fmt.Println("resp is", string(frame.Payload)) +} + +``` + +### `streamserver.go`: +``` +package main +import ( + "github.com/zhiqiangxu/qrpc" + "fmt" +) + +const ( + HelloCmd qrpc.Cmd = iota + HelloRespCmd +) +func main() { + handler := qrpc.NewServeMux() + handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { + writer.StartWrite(request.RequestID, HelloRespCmd, 0) + + writer.WriteBytes(append([]byte("first frame "), request.Payload...)) + + for { + continueFrames := <-request.FrameCh() + if continueFrames == nil { + break + } + writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...)) + } + writer.EndWrite() + }) + bindings := []qrpc.ServerBinding{ + qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} + server := qrpc.NewServer(bindings) + err := server.ListenAndServe() + if err != nil { + panic(err) + } +} +``` + +In a similar fasion we can also make `response` in `stream` mode: + +``` +package main +import ( + "github.com/zhiqiangxu/qrpc" + "fmt" +) + +const ( + HelloCmd qrpc.Cmd = iota + HelloRespCmd +) +func main() { + handler := qrpc.NewServeMux() + handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { + writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag) + writer.WriteBytes(append([]byte("first frame "), request.Payload...)) + writer.EndWrite() + + for { + continueFrames := <-request.FrameCh() + if continueFrames == nil { + break + } + + fmt.Printf("%s\n", continueFrames.Payload) + writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag) + writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...)) + writer.EndWrite() + } + }) + bindings := []qrpc.ServerBinding{ + qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} + server := qrpc.NewServer(bindings) + err := server.ListenAndServe() + if err != nil { + panic(err) + } +} +``` + +The key is `StreamFlag`! + +## push mode +``` +package main +import ( + "github.com/zhiqiangxu/qrpc" + "sync" +) + +const ( + HelloCmd qrpc.Cmd = iota + HelloRespCmd +) +func main() { + handler := qrpc.NewServeMux() + handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { + var ( + wg sync.WaitGroup + ) + qserver := request.ConnectionInfo().SC.Server() + pushID := qserver.GetPushID() + qserver.WalkConn(0, func(writer qrpc.FrameWriter, ci *qrpc.ConnectionInfo) bool { + qrpc.GoFunc(&wg, func() { + writer.StartWrite(pushID, HelloCmd, qrpc.PushFlag) + writer.WriteBytes([]byte("pushed msg")) + }) + return true + }) + wg.Wait() + + writer.StartWrite(request.RequestID, HelloRespCmd, 0) + writer.WriteBytes(append([]byte("push done"), request.Payload...)) + writer.EndWrite() + }) + bindings := []qrpc.ServerBinding{ + qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} + server := qrpc.NewServer(bindings) + err := server.ListenAndServe() + if err != nil { + panic(err) + } +} +``` + +In the above example, server will `push` a message to all connections ! + +To handle `pushed` message, the relevant change at `client` side is: + +```diff +- conn := cli.GetConn("0.0.0.0:8080", nil) ++ conn := cli.GetConn("0.0.0.0:8080", func(conn *qrpc.Connection, pushedFrame *qrpc.Frame) { ++ fmt.Println(pushedFrame) ++ }) +``` \ No newline at end of file diff --git a/clientconn.go b/clientconn.go index 731d6ea..2805200 100644 --- a/clientconn.go +++ b/clientconn.go @@ -169,14 +169,14 @@ func (dsw *defaultStreamWriter) EndWrite(end bool) error { } // StreamRequest is for streamed request -func (conn *Connection) StreamRequest(cmd Cmd, flags FrameFlag, payload []byte) (Response, StreamWriter, error) { +func (conn *Connection) StreamRequest(cmd Cmd, flags FrameFlag, payload []byte) (StreamWriter, Response, error) { flags = flags.ToStream() requestID, resp, writer, err := conn.writeFirstFrame(cmd, flags, payload) if err != nil { return nil, nil, err } - return resp, newStreamWriter(writer, requestID, flags), nil + return newStreamWriter(writer, requestID, flags), resp, nil } // Request send a nonstreamed request frame and returns response frame diff --git a/framewriter.go b/framewriter.go index d5363a1..c3c9c93 100644 --- a/framewriter.go +++ b/framewriter.go @@ -67,6 +67,7 @@ func (dfw *defaultFrameWriter) EndWrite() error { byte(length>>16), byte(length>>8), byte(length)) + _ = append(dfw.wbuf[:12], byte(dfw.flags)) // flags may be changed by StreamWriter wfr := writeFrameRequest{dfw: dfw, result: make(chan error)} select { diff --git a/qrpc.go b/qrpc.go index 8d88f8b..a327b37 100644 --- a/qrpc.go +++ b/qrpc.go @@ -28,7 +28,7 @@ func (flg FrameFlag) ToNonStream() FrameFlag { // ToStream convert flg to streamed flag func (flg FrameFlag) ToStream() FrameFlag { - return flg & StreamFlag + return flg | StreamFlag } // ToEndStream set StreamEndFlag on