Skip to content

Commit

Permalink
init commit
Browse files Browse the repository at this point in the history
  • Loading branch information
徐志强 committed Jul 18, 2018
0 parents commit 3a0de84
Show file tree
Hide file tree
Showing 11 changed files with 986 additions and 0 deletions.
134 changes: 134 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package qrpc

import (
"context"
"net"
"sync"
)

// Client defines a qrpc client
type Client struct {
mu sync.Mutex
conf ConnectionConfig
connections map[string]*sync.Pool
}

// Connection defines a qrpc connection
// it is not thread safe, because writer is not
type Connection struct {
net.Conn
reader *defaultFrameReader
writer FrameWriter
p *sync.Pool
conf ConnectionConfig
writeFrameCh chan writeFrameRequest // written by FrameWriter
subscriber SubFunc // there can be only one subscriber because of streamed frames
}

// NewClient creates a Client instance
func NewClient(conf ConnectionConfig) *Client {
cli := &Client{conf: conf, connections: make(map[string]*sync.Pool)}
return cli
}

// NewConnection creates a connection without Client
func NewConnection(addr string, conf ConnectionConfig, f func(*Frame)) (*Connection, error) {
return newConnectionWithPool(addr, conf, nil, SubFunc(f))
}

func newConnectionWithPool(addr string, conf ConnectionConfig, p *sync.Pool, f SubFunc) (*Connection, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}

if conf.Ctx == nil {
conf.Ctx = context.Background()
}

c := &Connection{Conn: conn, conf: conf, writeFrameCh: make(chan writeFrameRequest), subscriber: f}
c.reader = newFrameReader(conf.Ctx, conn, conf.ReadTimeout)
c.writer = NewFrameWriter(conf.Ctx, c.writeFrameCh)

go c.readFrames()
go c.writeFrames()

return c, nil
}

// GetConn get a connection from Client
func (cli *Client) GetConn(addr string, f func(*Frame)) *Connection {

cli.mu.Lock()

p, ok := cli.connections[addr]
if !ok {
p = &sync.Pool{}
newFunc := func() interface{} {

conn, err := newConnectionWithPool(addr, cli.conf, p, SubFunc(f))
if err != nil {
return nil
}

return conn
}
p.New = newFunc
cli.connections[addr] = p
}
cli.mu.Unlock()

return p.Get().(*Connection)
}

// Request send a request frame and returns response frame
func (conn *Connection) Request(cmd Cmd, flags PacketFlag, payload []byte) (*Frame, error) {

return nil, nil
}

// Close internally returns the connection to pool
func (conn *Connection) Close() error {
if conn.p != nil {
conn.p.Put(conn)
return nil
}

return conn.Conn.Close()
}

var requestID uint64

func (conn *Connection) readFrames() {
defer func() {
conn.Close()
}()
for {
frame, err := conn.reader.ReadFrame()
if err != nil {
return
}

if frame.Flags&PushFlag != 0 {
// pushed frame
if conn.subscriber != nil {
conn.subscriber(frame)
}

return
}

// deal with pulled frames
}
}

// Subscribe register f as callback for pushed message
func (conn *Connection) Subscribe(f func(*Frame)) {
if conn.subscriber != nil {
panic("only one subscriber allowed")
}
conn.subscriber = f
}
func (conn *Connection) writeFrames() {

}
23 changes: 23 additions & 0 deletions conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package qrpc

import (
"context"
)

// ServerBinding contains binding infos
type ServerBinding struct {
Addr string
Handler Handler // handler to invoke
DefaultReadTimeout int
DefaultWriteTimeout int
}

// SubFunc for subscribe callback
type SubFunc func(*Frame)

// ConnectionConfig is conf for Connection
type ConnectionConfig struct {
Ctx context.Context
WriteTimeout int
ReadTimeout int
}
160 changes: 160 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package qrpc

import (
"context"
"errors"
"net"
"runtime"
)

// A conn represents the server side of an qrpc connection.
type conn struct {
// server is the server on which the connection arrived.
// Immutable; never nil.
server *Server

// cancelCtx cancels the connection-level context.
cancelCtx context.CancelFunc

// rwc is the underlying network connection.
// This is never wrapped by other types and is the value given out
// to CloseNotifier callers. It is usually of type *net.TCPConn
rwc net.Conn

reader *defaultFrameReader // used in conn.readFrames
writer FrameWriter // used by handlers
readFrameCh chan readFrameResult // written by conn.readFrames
writeFrameCh chan writeFrameRequest // written by FrameWriter

}

// ConnectionInfoKey is context key for ConnectionInfo
// used to store custom information
var ConnectionInfoKey = &contextKey{"qrpc-connection"}

// Serve a new connection.
func (c *conn) serve(ctx context.Context, idx int) {

defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.rwc.RemoteAddr().String(), err, buf)
}
c.close()
}()

ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()

c.reader = newFrameReader(ctx, c.rwc, c.server.bindings[idx].DefaultReadTimeout)
c.writer = NewFrameWriter(ctx, c.writeFrameCh)

go c.readFrames(ctx)
go c.writeFrames(ctx, c.server.bindings[idx].DefaultWriteTimeout)

handler := c.server.bindings[idx].Handler

for {
select {
case <-ctx.Done():
return
case res := <-c.readFrameCh:
if res.f.ctx != nil {
panic("res.f.ctx is not nil")
}
res.f.ctx = ctx
if res.f.Flags&NBFlag == 0 {
handler.ServeQRPC(c.writer, res.f)
res.readMore()
} else {
res.readMore()
go handler.ServeQRPC(c.writer, res.f)
}
}

}

}

// ErrInvalidPacket when packet invalid
var ErrInvalidPacket = errors.New("invalid packet")

type readFrameResult struct {
f *Frame // valid until readMore is called

// readMore should be called once the consumer no longer needs or
// retains f. After readMore, f is invalid and more frames can be
// read.
readMore func()
}

type writeFrameRequest struct {
frame []byte
result chan error
}

// A gate lets two goroutines coordinate their activities.
type gate chan struct{}

func (g gate) Done() { g <- struct{}{} }

func (c *conn) readFrames(ctx context.Context) (err error) {

defer func() {
if err != nil {
c.close()
}
}()
gate := make(gate)
gateDone := gate.Done

for {
req, err := c.reader.ReadFrame()
if err != nil {
return err
}
select {
case c.readFrameCh <- readFrameResult{f: req, readMore: gateDone}:
case <-ctx.Done():
return nil
}

select {
case <-gate:
case <-ctx.Done():
return nil
}
}

}

func (c *conn) writeFrames(ctx context.Context, timeout int) (err error) {

writer := NewWriterWithTimeout(c.rwc, timeout)
for {
select {
case res := <-c.writeFrameCh:
_, err := writer.Write(res.frame)
if err != nil {
c.close()
}
res.result <- err
case <-ctx.Done():
return nil
}
}
}

// Close the connection.
func (c *conn) close() {
c.finalFlush()
c.rwc.Close()
c.cancelCtx()
}

// return to pool
func (c *conn) finalFlush() {
}
50 changes: 50 additions & 0 deletions frame.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package qrpc

import "context"

// Frame models a qrpc frame
// all fields are readly only
type Frame struct {
RequestID uint64
Flags PacketFlag
Cmd Cmd
Payload []byte
FrameCh <-chan *Frame // non nil for the first frame in stream

// ctx is either the client or server context. It should only
// be modified via copying the whole Request using WithContext.
// It is unexported to prevent people from using Context wrong
// and mutating the contexts held by callers of the same request.
ctx context.Context //fyi: https://www.reddit.com/r/golang/comments/69j71a/why_does_httprequestwithcontext_do_a_shallow_copy/
}

// Context returns the request's context. To change the context, use
// WithContext.
//
// The returned context is always non-nil; it defaults to the
// background context.
//
// For outgoing client requests, the context controls cancelation.
//
// For incoming server requests, the context is canceled when the
// client's connection closes, the request is canceled (with HTTP/2),
// or when the ServeHTTP method returns.
func (r *Frame) Context() context.Context {
if r.ctx != nil {
return r.ctx
}
return context.Background()
}

// WithContext returns a shallow copy of r with its context changed
// to ctx. The provided ctx must be non-nil.
func (r *Frame) WithContext(ctx context.Context) *Frame {
if ctx == nil {
panic("nil context")
}
r2 := new(Frame)
*r2 = *r
r2.ctx = ctx

return r2
}
Loading

0 comments on commit 3a0de84

Please sign in to comment.