From 3a0de845c792461f8880b27b4c605a0efc3d97f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=BF=97=E5=BC=BA?= Date: Wed, 18 Jul 2018 14:54:22 +0800 Subject: [PATCH] init commit --- client.go | 134 ++++++++++++++++++++++++ conf.go | 23 +++++ conn.go | 160 +++++++++++++++++++++++++++++ frame.go | 50 +++++++++ framereader.go | 118 ++++++++++++++++++++++ framewriter.go | 88 ++++++++++++++++ qrpc.go | 28 ++++++ reader.go | 54 ++++++++++ server.go | 252 ++++++++++++++++++++++++++++++++++++++++++++++ test/qrpc_test.go | 31 ++++++ writer.go | 48 +++++++++ 11 files changed, 986 insertions(+) create mode 100644 client.go create mode 100644 conf.go create mode 100644 conn.go create mode 100644 frame.go create mode 100644 framereader.go create mode 100644 framewriter.go create mode 100644 qrpc.go create mode 100644 reader.go create mode 100644 server.go create mode 100644 test/qrpc_test.go create mode 100644 writer.go diff --git a/client.go b/client.go new file mode 100644 index 0000000..b2ad383 --- /dev/null +++ b/client.go @@ -0,0 +1,134 @@ +package qrpc + +import ( + "context" + "net" + "sync" +) + +// Client defines a qrpc client +type Client struct { + mu sync.Mutex + conf ConnectionConfig + connections map[string]*sync.Pool +} + +// Connection defines a qrpc connection +// it is not thread safe, because writer is not +type Connection struct { + net.Conn + reader *defaultFrameReader + writer FrameWriter + p *sync.Pool + conf ConnectionConfig + writeFrameCh chan writeFrameRequest // written by FrameWriter + subscriber SubFunc // there can be only one subscriber because of streamed frames +} + +// NewClient creates a Client instance +func NewClient(conf ConnectionConfig) *Client { + cli := &Client{conf: conf, connections: make(map[string]*sync.Pool)} + return cli +} + +// NewConnection creates a connection without Client +func NewConnection(addr string, conf ConnectionConfig, f func(*Frame)) (*Connection, error) { + return newConnectionWithPool(addr, conf, nil, SubFunc(f)) +} + +func newConnectionWithPool(addr string, conf ConnectionConfig, p *sync.Pool, f SubFunc) (*Connection, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + + if conf.Ctx == nil { + conf.Ctx = context.Background() + } + + c := &Connection{Conn: conn, conf: conf, writeFrameCh: make(chan writeFrameRequest), subscriber: f} + c.reader = newFrameReader(conf.Ctx, conn, conf.ReadTimeout) + c.writer = NewFrameWriter(conf.Ctx, c.writeFrameCh) + + go c.readFrames() + go c.writeFrames() + + return c, nil +} + +// GetConn get a connection from Client +func (cli *Client) GetConn(addr string, f func(*Frame)) *Connection { + + cli.mu.Lock() + + p, ok := cli.connections[addr] + if !ok { + p = &sync.Pool{} + newFunc := func() interface{} { + + conn, err := newConnectionWithPool(addr, cli.conf, p, SubFunc(f)) + if err != nil { + return nil + } + + return conn + } + p.New = newFunc + cli.connections[addr] = p + } + cli.mu.Unlock() + + return p.Get().(*Connection) +} + +// Request send a request frame and returns response frame +func (conn *Connection) Request(cmd Cmd, flags PacketFlag, payload []byte) (*Frame, error) { + + return nil, nil +} + +// Close internally returns the connection to pool +func (conn *Connection) Close() error { + if conn.p != nil { + conn.p.Put(conn) + return nil + } + + return conn.Conn.Close() +} + +var requestID uint64 + +func (conn *Connection) readFrames() { + defer func() { + conn.Close() + }() + for { + frame, err := conn.reader.ReadFrame() + if err != nil { + return + } + + if frame.Flags&PushFlag != 0 { + // pushed frame + if conn.subscriber != nil { + conn.subscriber(frame) + } + + return + } + + // deal with pulled frames + } +} + +// Subscribe register f as callback for pushed message +func (conn *Connection) Subscribe(f func(*Frame)) { + if conn.subscriber != nil { + panic("only one subscriber allowed") + } + conn.subscriber = f +} +func (conn *Connection) writeFrames() { + +} diff --git a/conf.go b/conf.go new file mode 100644 index 0000000..2d819e0 --- /dev/null +++ b/conf.go @@ -0,0 +1,23 @@ +package qrpc + +import ( + "context" +) + +// ServerBinding contains binding infos +type ServerBinding struct { + Addr string + Handler Handler // handler to invoke + DefaultReadTimeout int + DefaultWriteTimeout int +} + +// SubFunc for subscribe callback +type SubFunc func(*Frame) + +// ConnectionConfig is conf for Connection +type ConnectionConfig struct { + Ctx context.Context + WriteTimeout int + ReadTimeout int +} diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..ca3dc6f --- /dev/null +++ b/conn.go @@ -0,0 +1,160 @@ +package qrpc + +import ( + "context" + "errors" + "net" + "runtime" +) + +// A conn represents the server side of an qrpc connection. +type conn struct { + // server is the server on which the connection arrived. + // Immutable; never nil. + server *Server + + // cancelCtx cancels the connection-level context. + cancelCtx context.CancelFunc + + // rwc is the underlying network connection. + // This is never wrapped by other types and is the value given out + // to CloseNotifier callers. It is usually of type *net.TCPConn + rwc net.Conn + + reader *defaultFrameReader // used in conn.readFrames + writer FrameWriter // used by handlers + readFrameCh chan readFrameResult // written by conn.readFrames + writeFrameCh chan writeFrameRequest // written by FrameWriter + +} + +// ConnectionInfoKey is context key for ConnectionInfo +// used to store custom information +var ConnectionInfoKey = &contextKey{"qrpc-connection"} + +// Serve a new connection. +func (c *conn) serve(ctx context.Context, idx int) { + + defer func() { + if err := recover(); err != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + c.server.logf("http: panic serving %v: %v\n%s", c.rwc.RemoteAddr().String(), err, buf) + } + c.close() + }() + + ctx, cancelCtx := context.WithCancel(ctx) + c.cancelCtx = cancelCtx + defer cancelCtx() + + c.reader = newFrameReader(ctx, c.rwc, c.server.bindings[idx].DefaultReadTimeout) + c.writer = NewFrameWriter(ctx, c.writeFrameCh) + + go c.readFrames(ctx) + go c.writeFrames(ctx, c.server.bindings[idx].DefaultWriteTimeout) + + handler := c.server.bindings[idx].Handler + + for { + select { + case <-ctx.Done(): + return + case res := <-c.readFrameCh: + if res.f.ctx != nil { + panic("res.f.ctx is not nil") + } + res.f.ctx = ctx + if res.f.Flags&NBFlag == 0 { + handler.ServeQRPC(c.writer, res.f) + res.readMore() + } else { + res.readMore() + go handler.ServeQRPC(c.writer, res.f) + } + } + + } + +} + +// ErrInvalidPacket when packet invalid +var ErrInvalidPacket = errors.New("invalid packet") + +type readFrameResult struct { + f *Frame // valid until readMore is called + + // readMore should be called once the consumer no longer needs or + // retains f. After readMore, f is invalid and more frames can be + // read. + readMore func() +} + +type writeFrameRequest struct { + frame []byte + result chan error +} + +// A gate lets two goroutines coordinate their activities. +type gate chan struct{} + +func (g gate) Done() { g <- struct{}{} } + +func (c *conn) readFrames(ctx context.Context) (err error) { + + defer func() { + if err != nil { + c.close() + } + }() + gate := make(gate) + gateDone := gate.Done + + for { + req, err := c.reader.ReadFrame() + if err != nil { + return err + } + select { + case c.readFrameCh <- readFrameResult{f: req, readMore: gateDone}: + case <-ctx.Done(): + return nil + } + + select { + case <-gate: + case <-ctx.Done(): + return nil + } + } + +} + +func (c *conn) writeFrames(ctx context.Context, timeout int) (err error) { + + writer := NewWriterWithTimeout(c.rwc, timeout) + for { + select { + case res := <-c.writeFrameCh: + _, err := writer.Write(res.frame) + if err != nil { + c.close() + } + res.result <- err + case <-ctx.Done(): + return nil + } + } +} + +// Close the connection. +func (c *conn) close() { + c.finalFlush() + c.rwc.Close() + c.cancelCtx() +} + +// return to pool +func (c *conn) finalFlush() { +} diff --git a/frame.go b/frame.go new file mode 100644 index 0000000..bb0a4ed --- /dev/null +++ b/frame.go @@ -0,0 +1,50 @@ +package qrpc + +import "context" + +// Frame models a qrpc frame +// all fields are readly only +type Frame struct { + RequestID uint64 + Flags PacketFlag + Cmd Cmd + Payload []byte + FrameCh <-chan *Frame // non nil for the first frame in stream + + // ctx is either the client or server context. It should only + // be modified via copying the whole Request using WithContext. + // It is unexported to prevent people from using Context wrong + // and mutating the contexts held by callers of the same request. + ctx context.Context //fyi: https://www.reddit.com/r/golang/comments/69j71a/why_does_httprequestwithcontext_do_a_shallow_copy/ +} + +// Context returns the request's context. To change the context, use +// WithContext. +// +// The returned context is always non-nil; it defaults to the +// background context. +// +// For outgoing client requests, the context controls cancelation. +// +// For incoming server requests, the context is canceled when the +// client's connection closes, the request is canceled (with HTTP/2), +// or when the ServeHTTP method returns. +func (r *Frame) Context() context.Context { + if r.ctx != nil { + return r.ctx + } + return context.Background() +} + +// WithContext returns a shallow copy of r with its context changed +// to ctx. The provided ctx must be non-nil. +func (r *Frame) WithContext(ctx context.Context) *Frame { + if ctx == nil { + panic("nil context") + } + r2 := new(Frame) + *r2 = *r + r2.ctx = ctx + + return r2 +} diff --git a/framereader.go b/framereader.go new file mode 100644 index 0000000..7c9240c --- /dev/null +++ b/framereader.go @@ -0,0 +1,118 @@ +package qrpc + +import ( + "context" + "encoding/binary" + "errors" + "net" +) + +// defaultFrameReader is responsible for read frames +// should create one instance per connection +type defaultFrameReader struct { + *Reader + rbuf [16]byte // for header + streamFrameCh map[uint64]chan<- *Frame + ctx context.Context +} + +// newFrameReader creates a FrameWriter instance to read frames +func newFrameReader(ctx context.Context, rwc net.Conn, timeout int) *defaultFrameReader { + return &defaultFrameReader{Reader: NewReaderWithTimeout(rwc, timeout), ctx: ctx} +} + +var ( + // ErrInvalidFrameSize when invalid size + ErrInvalidFrameSize = errors.New("invalid frame size") + // ErrStreamFrameMustNB when stream frame not non block + ErrStreamFrameMustNB = errors.New("streaming frame must be non block") +) + +func (dfr *defaultFrameReader) ReadFrame() (*Frame, error) { + + f, err := dfr.readFrame() + if err != nil { + return f, err + } + + requestID := f.RequestID + flags := f.Flags + + // done for non streamed frame + if flags&StreamFlag == 0 { + return f, nil + } + + // deal with streamed frames + + if flags&NBFlag == 0 { + return nil, ErrStreamFrameMustNB + } + + if flags&StreamEndFlag == 0 { + if dfr.streamFrameCh == nil { + // the first stream for the connection + dfr.streamFrameCh = make(map[uint64]chan<- *Frame) + } + ch, ok := dfr.streamFrameCh[requestID] + if !ok { + // the first frame for the stream + ch = make(chan<- *Frame) + dfr.streamFrameCh[requestID] = ch + return f, nil + } + + // continuation frame for the stream + select { + case ch <- f: + return dfr.ReadFrame() + case <-dfr.ctx.Done(): + return nil, dfr.ctx.Err() + } + } else { + // the ending frame of the stream + if dfr.streamFrameCh == nil { + // ending frame with no prior stream frames + return f, nil + } + ch, ok := dfr.streamFrameCh[requestID] + if !ok { + // ending frame with no prior stream frames + return f, nil + } + // ending frame for the stream + select { + case ch <- f: + delete(dfr.streamFrameCh, requestID) + return dfr.ReadFrame() + case <-dfr.ctx.Done(): + return nil, dfr.ctx.Err() + } + } +} + +func (dfr *defaultFrameReader) readFrame() (*Frame, error) { + + header := dfr.rbuf[:] + err := dfr.ReadBytes(header) + if err != nil { + return nil, err + } + + size := binary.BigEndian.Uint32(header) + requestID := binary.BigEndian.Uint64(header[4:]) + cmdAndFlags := binary.BigEndian.Uint32(header[12:]) + cmd := Cmd(cmdAndFlags & 0xffffff) + flags := PacketFlag(cmdAndFlags >> 24) + if size < 12 { + return nil, ErrInvalidFrameSize + } + + payload := make([]byte, size-12) + err = dfr.ReadBytes(payload) + if err != nil { + return nil, err + } + + return &Frame{RequestID: requestID, Cmd: cmd, Flags: flags, Payload: payload}, nil +} diff --git a/framewriter.go b/framewriter.go new file mode 100644 index 0000000..43a7ffc --- /dev/null +++ b/framewriter.go @@ -0,0 +1,88 @@ +package qrpc + +import ( + "context" +) + +// defaultFrameWriter is responsible for write frames +// should create one instance per goroutine +type defaultFrameWriter struct { + writeCh chan<- writeFrameRequest + wbuf []byte + ctx context.Context +} + +// NewFrameWriter creates a FrameWriter instance to write frames +func NewFrameWriter(ctx context.Context, writeCh chan<- writeFrameRequest) FrameWriter { + return &defaultFrameWriter{writeCh: writeCh, ctx: ctx} +} + +// StartWrite Write the FrameHeader. +func (dfw *defaultFrameWriter) StartWrite(requestID uint64, cmd Cmd, flags PacketFlag) { + + // Write the FrameHeader. + dfw.wbuf = append(dfw.wbuf[:0], + 0, // 4 bytes of length, filled in in endWrite + 0, + 0, + 0, + byte(requestID>>56), + byte(requestID>>48), + byte(requestID>>40), + byte(requestID>>32), + byte(requestID>>24), + byte(requestID>>16), + byte(requestID>>8), + byte(requestID), + byte(flags), + byte(cmd>>16), + byte(cmd>>8), + byte(cmd)) +} + +// EndWrite finishes write frame +func (dfw *defaultFrameWriter) EndWrite() error { + length := len(dfw.wbuf) - 4 + _ = append(dfw.wbuf[:0], + byte(length>>24), + byte(length>>16), + byte(length>>8), + byte(length)) + + wfr := writeFrameRequest{frame: dfw.wbuf, result: make(chan error)} + select { + case dfw.writeCh <- wfr: + case <-dfw.ctx.Done(): + return dfw.ctx.Err() + } + + select { + case err := <-wfr.result: + return err + case <-dfw.ctx.Done(): + return dfw.ctx.Err() + } +} + +// WriteUint64 write uint64 to wbuf +func (dfw *defaultFrameWriter) WriteUint64(v uint64) { + dfw.wbuf = append(dfw.wbuf, byte(v>>56), byte(v>>48), byte(v>>40), byte(v>>32), byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) +} + +// WriteUint32 write uint32 to wbuf +func (dfw *defaultFrameWriter) WriteUint32(v uint32) { + dfw.wbuf = append(dfw.wbuf, byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) +} + +// WriteUint16 write uint16 to wbuf +func (dfw *defaultFrameWriter) WriteUint16(v uint16) { + dfw.wbuf = append(dfw.wbuf, byte(v>>8), byte(v)) +} + +// WriteUint8 write uint8 to wbuf +func (dfw *defaultFrameWriter) WriteUint8(v uint8) { + dfw.wbuf = append(dfw.wbuf, byte(v)) +} + +// WriteBytes write multiple bytes +func (dfw *defaultFrameWriter) WriteBytes(v []byte) { dfw.wbuf = append(dfw.wbuf, v...) } diff --git a/qrpc.go b/qrpc.go new file mode 100644 index 0000000..b2affcb --- /dev/null +++ b/qrpc.go @@ -0,0 +1,28 @@ +package qrpc + +// Cmd is for multiplexer +type Cmd uint32 + +// PacketFlag defines flags for qrpc +type PacketFlag uint8 + +const ( + // StreamFlag means packet is streamed + StreamFlag PacketFlag = 1 << iota + // StreamEndFlag denotes the end of a stream + StreamEndFlag + // NBFlag means it should be handled nonblockly + NBFlag + // CompressFlag indicate packet is compressed (TODO) + CompressFlag + // PushFlag mean the frame is pushed from server + PushFlag +) + +// contextKey is a value for use with context.WithValue. It's used as +// a pointer so it fits in an interface{} without allocation. +type contextKey struct { + name string +} + +func (k *contextKey) String() string { return "qrpc context value " + k.name } diff --git a/reader.go b/reader.go new file mode 100644 index 0000000..fa59e88 --- /dev/null +++ b/reader.go @@ -0,0 +1,54 @@ +package qrpc + +import ( + "encoding/binary" + "io" + "net" + "time" +) + +// Reader read data from socket +type Reader struct { + conn net.Conn + timeout int +} + +const ( + // ReadNoTimeout will never timeout + ReadNoTimeout = -1 +) + +// NewReader creates a StreamReader instance +func NewReader(conn net.Conn) *Reader { + return NewReaderWithTimeout(conn, ReadNoTimeout) +} + +// NewReaderWithTimeout allows specify timeout +func NewReaderWithTimeout(conn net.Conn, timeout int) *Reader { + return &Reader{conn: conn, timeout: timeout} +} + +// ReadUint32 read uint32 from socket +func (r *Reader) ReadUint32() (uint32, error) { + bytes := make([]byte, 4) + err := r.ReadBytes(bytes) + if err != nil { + return 0, err + } + + return binary.BigEndian.Uint32(bytes), nil +} + +// ReadBytes read bytes +func (r *Reader) ReadBytes(bytes []byte) error { + timeout := r.timeout + if timeout > 0 { + r.conn.SetReadDeadline(time.Now().Add(time.Duration(timeout) * time.Second)) + } + _, err := io.ReadFull(r.conn, bytes) + if err != nil { + return err + } + + return nil +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..c10ec7c --- /dev/null +++ b/server.go @@ -0,0 +1,252 @@ +package qrpc + +import ( + "context" + "errors" + "net" + "sync" + "time" +) + +// FrameWriter looks like writes a qrpc resp +// but it internally needs be scheduled, thus maintains a simple yet powerful interface +type FrameWriter interface { + StartWrite(requestID uint64, cmd Cmd, flags PacketFlag) + WriteBytes(v []byte) // v is copied in WriteBytes + EndWrite() error // block until scheduled +} + +// A Handler responds to an qrpc request. +type Handler interface { + ServeQRPC(FrameWriter, *Frame) +} + +// The HandlerFunc type is an adapter to allow the use of +// ordinary functions as qrpc handlers. If f is a function +// with the appropriate signature, HandlerFunc(f) is a +// Handler that calls f. +type HandlerFunc func(FrameWriter, *Frame) + +// ServeQRPC calls f(w, r). +func (f HandlerFunc) ServeQRPC(w FrameWriter, r *Frame) { + f(w, r) +} + +// ServeMux is qrpc request multiplexer. +type ServeMux struct { + mu sync.RWMutex + m map[Cmd]Handler +} + +// NewServeMux allocates and returns a new ServeMux. +func NewServeMux() *ServeMux { return new(ServeMux) } + +// HandleFunc registers the handler function for the given pattern. +func (mux *ServeMux) HandleFunc(cmd Cmd, handler func(FrameWriter, *Frame)) { + mux.Handle(cmd, HandlerFunc(handler)) +} + +// Handle registers the handler for the given pattern. +// If a handler already exists for pattern, Handle panics. +func (mux *ServeMux) Handle(cmd Cmd, handler Handler) { + mux.mu.Lock() + defer mux.mu.Unlock() + + if handler == nil { + panic("http: nil handler") + } + if _, exist := mux.m[cmd]; exist { + panic("http: multiple registrations for " + string(cmd)) + } + + if mux.m == nil { + mux.m = make(map[Cmd]Handler) + } + mux.m[cmd] = handler +} + +// ServeQRPC dispatches the request to the handler whose +// cmd matches the request. +func (mux *ServeMux) ServeQRPC(w FrameWriter, r *Frame) { + mux.mu.RLock() + h, ok := mux.m[r.Cmd] + if !ok { + return + } + mux.mu.RUnlock() + h.ServeQRPC(w, r) +} + +// Server defines parameters for running an qrpc server. +type Server struct { + // one handler for each listening address + bindings []ServerBinding + + // manages below two + mu sync.Mutex + listeners map[net.Listener]struct{} + doneChan chan struct{} + + activeConn []sync.Map // for better iterate when write, map[conn]*ConnectionInfo +} + +// NewServer creates a server +func NewServer(bindings []ServerBinding) *Server { + return &Server{ + bindings: bindings, + listeners: make(map[net.Listener]struct{}), + doneChan: make(chan struct{}), + activeConn: make([]sync.Map, len(bindings))} +} + +// ListenAndServe starts listening on all bindings +func (srv *Server) ListenAndServe() error { + + for idx, binding := range srv.bindings { + ln, err := net.Listen("tcp", binding.Addr) + if err != nil { + srv.Shutdown(nil) + return err + } + + go srv.serve(ln, idx) + + } + + return nil +} + +// ErrServerClosed is returned by the Server's Serve, ListenAndServe, +// methods after a call to Shutdown or Close. +var ErrServerClosed = errors.New("qrpc: Server closed") + +var defaultAcceptTimeout = 5 * time.Second + +// serve accepts incoming connections on the Listener l, creating a +// new service goroutine for each. The service goroutines read requests and +// then call srv.Handler to reply to them. +// +// serve always returns a non-nil error. After Shutdown or Close, the +// returned error is ErrServerClosed. +func (srv *Server) serve(l net.Listener, idx int) error { + + defer l.Close() + var tempDelay time.Duration // how long to sleep on accept failure + + srv.trackListener(l, true) + defer srv.trackListener(l, false) + + serveCtx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + for { + l.(*net.TCPListener).SetDeadline(time.Now().Add(defaultAcceptTimeout)) + rw, e := l.Accept() + if e != nil { + select { + case <-srv.doneChan: + return ErrServerClosed + default: + } + if opError, ok := e.(*net.OpError); ok && opError.Timeout() { + // don't log the scheduled timeout + continue + } + if ne, ok := e.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay) + time.Sleep(tempDelay) + continue + } + return e + } + tempDelay = 0 + c := srv.newConn(rw) + + go c.serve(serveCtx, idx) + } +} + +func (srv *Server) trackListener(ln net.Listener, add bool) { + srv.mu.Lock() + defer srv.mu.Unlock() + if add { + srv.listeners[ln] = struct{}{} + } else { + delete(srv.listeners, ln) + } +} + +// Create new connection from rwc. +func (srv *Server) newConn(rwc net.Conn) *conn { + c := &conn{ + server: srv, + rwc: rwc, + readFrameCh: make(chan readFrameResult), + writeFrameCh: make(chan writeFrameRequest)} + return c +} + +func (srv *Server) logf(format string, args ...interface{}) { + +} + +var shutdownPollInterval = 500 * time.Millisecond + +// Shutdown gracefully shutdown the server +func (srv *Server) Shutdown(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } + + srv.mu.Lock() + lnerr := srv.closeListenersLocked() + if lnerr != nil { + return lnerr + } + srv.mu.Unlock() + + close(srv.doneChan) + + ticker := time.NewTicker(shutdownPollInterval) + defer ticker.Stop() + for { + if srv.waitConnDone() { + return lnerr + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +func (srv *Server) waitConnDone() bool { + done := true + for idx := 0; done && idx < len(srv.bindings); idx++ { + srv.activeConn[idx].Range(func(key, value interface{}) bool { + done = false + return false + }) + } + + return done +} + +func (srv *Server) closeListenersLocked() error { + var err error + for ln := range srv.listeners { + if cerr := ln.Close(); cerr != nil && err == nil { + err = cerr + } + delete(srv.listeners, ln) + } + return err +} diff --git a/test/qrpc_test.go b/test/qrpc_test.go new file mode 100644 index 0000000..c362e75 --- /dev/null +++ b/test/qrpc_test.go @@ -0,0 +1,31 @@ +package test + +import ( + "qrpc" + "testing" + "time" +) + +const ( + addr = "0.0.0.0:8080" +) + +// TestConnection tests connection +func TestConnection(t *testing.T) { + + go startServer() + time.Sleep(time.Second * 2) + + conf := qrpc.ConnectionConfig{} + cli := qrpc.NewClient(conf) + + cli.GetConn("ab") +} + +func startServer() { + handler := qrpc.NewServeMux() + bindings := []qrpc.ServerBinding{ + qrpc.ServerBinding{Addr: addr, Handler: handler}} + server := qrpc.NewServer(bindings) + server.ListenAndServe() +} diff --git a/writer.go b/writer.go new file mode 100644 index 0000000..044bc01 --- /dev/null +++ b/writer.go @@ -0,0 +1,48 @@ +package qrpc + +import ( + "net" + "time" +) + +// Writer writes data to connection +type Writer struct { + conn net.Conn + timeout int +} + +const ( + // WriteNoTimeout will never timeout + WriteNoTimeout = -1 +) + +// NewWriter new instance +func NewWriter(conn net.Conn) *Writer { + return &Writer{conn: conn, timeout: WriteNoTimeout} +} + +// NewWriterWithTimeout new instance with timeout +func NewWriterWithTimeout(conn net.Conn, timeout int) *Writer { + return &Writer{conn: conn, timeout: timeout} +} + +// Write writes bytes +func (w *Writer) Write(bytes []byte) (int, error) { + size := len(bytes) + + offset := 0 + + if w.timeout > 0 { + w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.timeout) * time.Second)) + } + for { + n, err := w.conn.Write(bytes[offset:]) + offset += n + if err != nil { + return offset, err + } + if offset >= size { + return offset, nil + } + } +}