Skip to content

Commit

Permalink
README.md
Browse files Browse the repository at this point in the history
  • Loading branch information
徐志强 committed Jul 27, 2018
1 parent 2b9cbbc commit f707ee6
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 3 deletions.
245 changes: 245 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
# qrpc, tiny but powerful rpc framework

**qrpc** makes it tremendously easy to to perform rpc by offering 3 core features:

> * `blocking` or `nonblocking`
> * `streaming` or `nonstreaming`
> * `server push`
By default each frame is `blocking` and `nonstreaming`, this allows traditional block-in-header sequencial behaviour like `http/1.1`, but you can make it behave tremendously different by attach flags to your frames!

------

# Enough talk, let's demo!

## blocking mode

### `server.go`:
```
package main
import "github.com/zhiqiangxu/qrpc"
const (
HelloCmd qrpc.Cmd = iota
HelloRespCmd
)
func main() {
handler := qrpc.NewServeMux()
handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
writer.StartWrite(request.RequestID, HelloRespCmd, 0)
writer.WriteBytes(append([]byte("hello world "), request.Payload...))
writer.EndWrite()
})
bindings := []qrpc.ServerBinding{
qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
server := qrpc.NewServer(bindings)
server.ListenAndServe()
}
```

### `client.go`:

```
package main
import (
"fmt"
"github.com/zhiqiangxu/qrpc"
)
const (
HelloCmd qrpc.Cmd = iota
)
func main() {
conf := qrpc.ConnectionConfig{}
cli := qrpc.NewClient(conf)
conn := cli.GetConn("0.0.0.0:8080", nil)
_, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
frame := resp.GetFrame()
fmt.Println("resp is", string(frame.Payload))
}
```

In the above example, server will process each client frames **in sequence order**.

## Nonblocking mode

To use this mode we only need to change 1 line in `client.go`:
```diff
- _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))
+ _, resp, _ := conn.Request(HelloCmd, qrpc.NBFlag, []byte("xu"))
```
In this mode `request` frames will be processed concurrently!

## stream mode

`stream` is like `chunked transfer` in `http`, besides, it's **bidirectional**, we can make either `request` or `response` in `stream`, or we can make both!

### Make `request` in `stream` mode:

### `streamclient.go`:

```
package main
import (
"fmt"
"github.com/zhiqiangxu/qrpc"
)
const (
HelloCmd qrpc.Cmd = iota
)
func main() {
conf := qrpc.ConnectionConfig{}
cli := qrpc.NewClient(conf)
conn := cli.GetConn("0.0.0.0:8080", nil)
writer, resp, _ := conn.StreamRequest(HelloCmd, 0, []byte("first frame"))
writer.StartWrite(HelloCmd)
writer.WriteBytes([]byte("last frame"))
writer.EndWrite(true) // will attach StreamEndFlag
frame := resp.GetFrame()
fmt.Println("resp is", string(frame.Payload))
}
```

### `streamserver.go`:
```
package main
import (
"github.com/zhiqiangxu/qrpc"
"fmt"
)
const (
HelloCmd qrpc.Cmd = iota
HelloRespCmd
)
func main() {
handler := qrpc.NewServeMux()
handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
writer.StartWrite(request.RequestID, HelloRespCmd, 0)
writer.WriteBytes(append([]byte("first frame "), request.Payload...))
for {
continueFrames := <-request.FrameCh()
if continueFrames == nil {
break
}
writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...))
}
writer.EndWrite()
})
bindings := []qrpc.ServerBinding{
qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
server := qrpc.NewServer(bindings)
err := server.ListenAndServe()
if err != nil {
panic(err)
}
}
```

In a similar fasion we can also make `response` in `stream` mode:

```
package main
import (
"github.com/zhiqiangxu/qrpc"
"fmt"
)
const (
HelloCmd qrpc.Cmd = iota
HelloRespCmd
)
func main() {
handler := qrpc.NewServeMux()
handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag)
writer.WriteBytes(append([]byte("first frame "), request.Payload...))
writer.EndWrite()
for {
continueFrames := <-request.FrameCh()
if continueFrames == nil {
break
}
fmt.Printf("%s\n", continueFrames.Payload)
writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag)
writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...))
writer.EndWrite()
}
})
bindings := []qrpc.ServerBinding{
qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
server := qrpc.NewServer(bindings)
err := server.ListenAndServe()
if err != nil {
panic(err)
}
}
```

The key is `StreamFlag`!

## push mode
```
package main
import (
"github.com/zhiqiangxu/qrpc"
"sync"
)
const (
HelloCmd qrpc.Cmd = iota
HelloRespCmd
)
func main() {
handler := qrpc.NewServeMux()
handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
var (
wg sync.WaitGroup
)
qserver := request.ConnectionInfo().SC.Server()
pushID := qserver.GetPushID()
qserver.WalkConn(0, func(writer qrpc.FrameWriter, ci *qrpc.ConnectionInfo) bool {
qrpc.GoFunc(&wg, func() {
writer.StartWrite(pushID, HelloCmd, qrpc.PushFlag)
writer.WriteBytes([]byte("pushed msg"))
})
return true
})
wg.Wait()
writer.StartWrite(request.RequestID, HelloRespCmd, 0)
writer.WriteBytes(append([]byte("push done"), request.Payload...))
writer.EndWrite()
})
bindings := []qrpc.ServerBinding{
qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}}
server := qrpc.NewServer(bindings)
err := server.ListenAndServe()
if err != nil {
panic(err)
}
}
```

In the above example, server will `push` a message to all connections !

To handle `pushed` message, the relevant change at `client` side is:

```diff
- conn := cli.GetConn("0.0.0.0:8080", nil)
+ conn := cli.GetConn("0.0.0.0:8080", func(conn *qrpc.Connection, pushedFrame *qrpc.Frame) {
+ fmt.Println(pushedFrame)
+ })
```
4 changes: 2 additions & 2 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ func (dsw *defaultStreamWriter) EndWrite(end bool) error {
}

// StreamRequest is for streamed request
func (conn *Connection) StreamRequest(cmd Cmd, flags FrameFlag, payload []byte) (Response, StreamWriter, error) {
func (conn *Connection) StreamRequest(cmd Cmd, flags FrameFlag, payload []byte) (StreamWriter, Response, error) {

flags = flags.ToStream()
requestID, resp, writer, err := conn.writeFirstFrame(cmd, flags, payload)
if err != nil {
return nil, nil, err
}
return resp, newStreamWriter(writer, requestID, flags), nil
return newStreamWriter(writer, requestID, flags), resp, nil
}

// Request send a nonstreamed request frame and returns response frame
Expand Down
1 change: 1 addition & 0 deletions framewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (dfw *defaultFrameWriter) EndWrite() error {
byte(length>>16),
byte(length>>8),
byte(length))
_ = append(dfw.wbuf[:12], byte(dfw.flags)) // flags may be changed by StreamWriter

wfr := writeFrameRequest{dfw: dfw, result: make(chan error)}
select {
Expand Down
2 changes: 1 addition & 1 deletion qrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (flg FrameFlag) ToNonStream() FrameFlag {

// ToStream convert flg to streamed flag
func (flg FrameFlag) ToStream() FrameFlag {
return flg & StreamFlag
return flg | StreamFlag
}

// ToEndStream set StreamEndFlag on
Expand Down

0 comments on commit f707ee6

Please sign in to comment.