diff --git a/client.go b/client.go new file mode 100644 index 0000000..b2ad383 --- /dev/null +++ b/client.go @@ -0,0 +1,134 @@ +package qrpc + +import ( + "context" + "net" + "sync" +) + +// Client defines a qrpc client +type Client struct { + mu sync.Mutex + conf ConnectionConfig + connections map[string]*sync.Pool +} + +// Connection defines a qrpc connection +// it is not thread safe, because writer is not +type Connection struct { + net.Conn + reader *defaultFrameReader + writer FrameWriter + p *sync.Pool + conf ConnectionConfig + writeFrameCh chan writeFrameRequest // written by FrameWriter + subscriber SubFunc // there can be only one subscriber because of streamed frames +} + +// NewClient creates a Client instance +func NewClient(conf ConnectionConfig) *Client { + cli := &Client{conf: conf, connections: make(map[string]*sync.Pool)} + return cli +} + +// NewConnection creates a connection without Client +func NewConnection(addr string, conf ConnectionConfig, f func(*Frame)) (*Connection, error) { + return newConnectionWithPool(addr, conf, nil, SubFunc(f)) +} + +func newConnectionWithPool(addr string, conf ConnectionConfig, p *sync.Pool, f SubFunc) (*Connection, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + + if conf.Ctx == nil { + conf.Ctx = context.Background() + } + + c := &Connection{Conn: conn, conf: conf, writeFrameCh: make(chan writeFrameRequest), subscriber: f} + c.reader = newFrameReader(conf.Ctx, conn, conf.ReadTimeout) + c.writer = NewFrameWriter(conf.Ctx, c.writeFrameCh) + + go c.readFrames() + go c.writeFrames() + + return c, nil +} + +// GetConn get a connection from Client +func (cli *Client) GetConn(addr string, f func(*Frame)) *Connection { + + cli.mu.Lock() + + p, ok := cli.connections[addr] + if !ok { + p = &sync.Pool{} + newFunc := func() interface{} { + + conn, err := newConnectionWithPool(addr, cli.conf, p, SubFunc(f)) + if err != nil { + return nil + } + + return conn + } + p.New = newFunc + cli.connections[addr] = p + } + cli.mu.Unlock() + + return p.Get().(*Connection) +} + +// Request send a request frame and returns response frame +func (conn *Connection) Request(cmd Cmd, flags PacketFlag, payload []byte) (*Frame, error) { + + return nil, nil +} + +// Close internally returns the connection to pool +func (conn *Connection) Close() error { + if conn.p != nil { + conn.p.Put(conn) + return nil + } + + return conn.Conn.Close() +} + +var requestID uint64 + +func (conn *Connection) readFrames() { + defer func() { + conn.Close() + }() + for { + frame, err := conn.reader.ReadFrame() + if err != nil { + return + } + + if frame.Flags&PushFlag != 0 { + // pushed frame + if conn.subscriber != nil { + conn.subscriber(frame) + } + + return + } + + // deal with pulled frames + } +} + +// Subscribe register f as callback for pushed message +func (conn *Connection) Subscribe(f func(*Frame)) { + if conn.subscriber != nil { + panic("only one subscriber allowed") + } + conn.subscriber = f +} +func (conn *Connection) writeFrames() { + +} diff --git a/conf.go b/conf.go new file mode 100644 index 0000000..2d819e0 --- /dev/null +++ b/conf.go @@ -0,0 +1,23 @@ +package qrpc + +import ( + "context" +) + +// ServerBinding contains binding infos +type ServerBinding struct { + Addr string + Handler Handler // handler to invoke + DefaultReadTimeout int + DefaultWriteTimeout int +} + +// SubFunc for subscribe callback +type SubFunc func(*Frame) + +// ConnectionConfig is conf for Connection +type ConnectionConfig struct { + Ctx context.Context + WriteTimeout int + ReadTimeout int +} diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..ca3dc6f --- /dev/null +++ b/conn.go @@ -0,0 +1,160 @@ +package qrpc + +import ( + "context" + "errors" + "net" + "runtime" +) + +// A conn represents the server side of an qrpc connection. +type conn struct { + // server is the server on which the connection arrived. + // Immutable; never nil. + server *Server + + // cancelCtx cancels the connection-level context. + cancelCtx context.CancelFunc + + // rwc is the underlying network connection. + // This is never wrapped by other types and is the value given out + // to CloseNotifier callers. It is usually of type *net.TCPConn + rwc net.Conn + + reader *defaultFrameReader // used in conn.readFrames + writer FrameWriter // used by handlers + readFrameCh chan readFrameResult // written by conn.readFrames + writeFrameCh chan writeFrameRequest // written by FrameWriter + +} + +// ConnectionInfoKey is context key for ConnectionInfo +// used to store custom information +var ConnectionInfoKey = &contextKey{"qrpc-connection"} + +// Serve a new connection. +func (c *conn) serve(ctx context.Context, idx int) { + + defer func() { + if err := recover(); err != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + c.server.logf("http: panic serving %v: %v\n%s", c.rwc.RemoteAddr().String(), err, buf) + } + c.close() + }() + + ctx, cancelCtx := context.WithCancel(ctx) + c.cancelCtx = cancelCtx + defer cancelCtx() + + c.reader = newFrameReader(ctx, c.rwc, c.server.bindings[idx].DefaultReadTimeout) + c.writer = NewFrameWriter(ctx, c.writeFrameCh) + + go c.readFrames(ctx) + go c.writeFrames(ctx, c.server.bindings[idx].DefaultWriteTimeout) + + handler := c.server.bindings[idx].Handler + + for { + select { + case <-ctx.Done(): + return + case res := <-c.readFrameCh: + if res.f.ctx != nil { + panic("res.f.ctx is not nil") + } + res.f.ctx = ctx + if res.f.Flags&NBFlag == 0 { + handler.ServeQRPC(c.writer, res.f) + res.readMore() + } else { + res.readMore() + go handler.ServeQRPC(c.writer, res.f) + } + } + + } + +} + +// ErrInvalidPacket when packet invalid +var ErrInvalidPacket = errors.New("invalid packet") + +type readFrameResult struct { + f *Frame // valid until readMore is called + + // readMore should be called once the consumer no longer needs or + // retains f. After readMore, f is invalid and more frames can be + // read. + readMore func() +} + +type writeFrameRequest struct { + frame []byte + result chan error +} + +// A gate lets two goroutines coordinate their activities. +type gate chan struct{} + +func (g gate) Done() { g <- struct{}{} } + +func (c *conn) readFrames(ctx context.Context) (err error) { + + defer func() { + if err != nil { + c.close() + } + }() + gate := make(gate) + gateDone := gate.Done + + for { + req, err := c.reader.ReadFrame() + if err != nil { + return err + } + select { + case c.readFrameCh <- readFrameResult{f: req, readMore: gateDone}: + case <-ctx.Done(): + return nil + } + + select { + case <-gate: + case <-ctx.Done(): + return nil + } + } + +} + +func (c *conn) writeFrames(ctx context.Context, timeout int) (err error) { + + writer := NewWriterWithTimeout(c.rwc, timeout) + for { + select { + case res := <-c.writeFrameCh: + _, err := writer.Write(res.frame) + if err != nil { + c.close() + } + res.result <- err + case <-ctx.Done(): + return nil + } + } +} + +// Close the connection. +func (c *conn) close() { + c.finalFlush() + c.rwc.Close() + c.cancelCtx() +} + +// return to pool +func (c *conn) finalFlush() { +} diff --git a/frame.go b/frame.go new file mode 100644 index 0000000..bb0a4ed --- /dev/null +++ b/frame.go @@ -0,0 +1,50 @@ +package qrpc + +import "context" + +// Frame models a qrpc frame +// all fields are readly only +type Frame struct { + RequestID uint64 + Flags PacketFlag + Cmd Cmd + Payload []byte + FrameCh <-chan *Frame // non nil for the first frame in stream + + // ctx is either the client or server context. It should only + // be modified via copying the whole Request using WithContext. + // It is unexported to prevent people from using Context wrong + // and mutating the contexts held by callers of the same request. + ctx context.Context //fyi: https://www.reddit.com/r/golang/comments/69j71a/why_does_httprequestwithcontext_do_a_shallow_copy/ +} + +// Context returns the request's context. To change the context, use +// WithContext. +// +// The returned context is always non-nil; it defaults to the +// background context. +// +// For outgoing client requests, the context controls cancelation. +// +// For incoming server requests, the context is canceled when the +// client's connection closes, the request is canceled (with HTTP/2), +// or when the ServeHTTP method returns. +func (r *Frame) Context() context.Context { + if r.ctx != nil { + return r.ctx + } + return context.Background() +} + +// WithContext returns a shallow copy of r with its context changed +// to ctx. The provided ctx must be non-nil. +func (r *Frame) WithContext(ctx context.Context) *Frame { + if ctx == nil { + panic("nil context") + } + r2 := new(Frame) + *r2 = *r + r2.ctx = ctx + + return r2 +} diff --git a/framereader.go b/framereader.go new file mode 100644 index 0000000..7c9240c --- /dev/null +++ b/framereader.go @@ -0,0 +1,118 @@ +package qrpc + +import ( + "context" + "encoding/binary" + "errors" + "net" +) + +// defaultFrameReader is responsible for read frames +// should create one instance per connection +type defaultFrameReader struct { + *Reader + rbuf [16]byte // for header + streamFrameCh map[uint64]chan<- *Frame + ctx context.Context +} + +// newFrameReader creates a FrameWriter instance to read frames +func newFrameReader(ctx context.Context, rwc net.Conn, timeout int) *defaultFrameReader { + return &defaultFrameReader{Reader: NewReaderWithTimeout(rwc, timeout), ctx: ctx} +} + +var ( + // ErrInvalidFrameSize when invalid size + ErrInvalidFrameSize = errors.New("invalid frame size") + // ErrStreamFrameMustNB when stream frame not non block + ErrStreamFrameMustNB = errors.New("streaming frame must be non block") +) + +func (dfr *defaultFrameReader) ReadFrame() (*Frame, error) { + + f, err := dfr.readFrame() + if err != nil { + return f, err + } + + requestID := f.RequestID + flags := f.Flags + + // done for non streamed frame + if flags&StreamFlag == 0 { + return f, nil + } + + // deal with streamed frames + + if flags&NBFlag == 0 { + return nil, ErrStreamFrameMustNB + } + + if flags&StreamEndFlag == 0 { + if dfr.streamFrameCh == nil { + // the first stream for the connection + dfr.streamFrameCh = make(map[uint64]chan<- *Frame) + } + ch, ok := dfr.streamFrameCh[requestID] + if !ok { + // the first frame for the stream + ch = make(chan<- *Frame) + dfr.streamFrameCh[requestID] = ch + return f, nil + } + + // continuation frame for the stream + select { + case ch <- f: + return dfr.ReadFrame() + case <-dfr.ctx.Done(): + return nil, dfr.ctx.Err() + } + } else { + // the ending frame of the stream + if dfr.streamFrameCh == nil { + // ending frame with no prior stream frames + return f, nil + } + ch, ok := dfr.streamFrameCh[requestID] + if !ok { + // ending frame with no prior stream frames + return f, nil + } + // ending frame for the stream + select { + case ch <- f: + delete(dfr.streamFrameCh, requestID) + return dfr.ReadFrame() + case <-dfr.ctx.Done(): + return nil, dfr.ctx.Err() + } + } +} + +func (dfr *defaultFrameReader) readFrame() (*Frame, error) { + + header := dfr.rbuf[:] + err := dfr.ReadBytes(header) + if err != nil { + return nil, err + } + + size := binary.BigEndian.Uint32(header) + requestID := binary.BigEndian.Uint64(header[4:]) + cmdAndFlags := binary.BigEndian.Uint32(header[12:]) + cmd := Cmd(cmdAndFlags & 0xffffff) + flags := PacketFlag(cmdAndFlags >> 24) + if size < 12 { + return nil, ErrInvalidFrameSize + } + + payload := make([]byte, size-12) + err = dfr.ReadBytes(payload) + if err != nil { + return nil, err + } + + return &Frame{RequestID: requestID, Cmd: cmd, Flags: flags, Payload: payload}, nil +} diff --git a/framewriter.go b/framewriter.go new file mode 100644 index 0000000..43a7ffc --- /dev/null +++ b/framewriter.go @@ -0,0 +1,88 @@ +package qrpc + +import ( + "context" +) + +// defaultFrameWriter is responsible for write frames +// should create one instance per goroutine +type defaultFrameWriter struct { + writeCh chan<- writeFrameRequest + wbuf []byte + ctx context.Context +} + +// NewFrameWriter creates a FrameWriter instance to write frames +func NewFrameWriter(ctx context.Context, writeCh chan<- writeFrameRequest) FrameWriter { + return &defaultFrameWriter{writeCh: writeCh, ctx: ctx} +} + +// StartWrite Write the FrameHeader. +func (dfw *defaultFrameWriter) StartWrite(requestID uint64, cmd Cmd, flags PacketFlag) { + + // Write the FrameHeader. + dfw.wbuf = append(dfw.wbuf[:0], + 0, // 4 bytes of length, filled in in endWrite + 0, + 0, + 0, + byte(requestID>>56), + byte(requestID>>48), + byte(requestID>>40), + byte(requestID>>32), + byte(requestID>>24), + byte(requestID>>16), + byte(requestID>>8), + byte(requestID), + byte(flags), + byte(cmd>>16), + byte(cmd>>8), + byte(cmd)) +} + +// EndWrite finishes write frame +func (dfw *defaultFrameWriter) EndWrite() error { + length := len(dfw.wbuf) - 4 + _ = append(dfw.wbuf[:0], + byte(length>>24), + byte(length>>16), + byte(length>>8), + byte(length)) + + wfr := writeFrameRequest{frame: dfw.wbuf, result: make(chan error)} + select { + case dfw.writeCh <- wfr: + case <-dfw.ctx.Done(): + return dfw.ctx.Err() + } + + select { + case err := <-wfr.result: + return err + case <-dfw.ctx.Done(): + return dfw.ctx.Err() + } +} + +// WriteUint64 write uint64 to wbuf +func (dfw *defaultFrameWriter) WriteUint64(v uint64) { + dfw.wbuf = append(dfw.wbuf, byte(v>>56), byte(v>>48), byte(v>>40), byte(v>>32), byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) +} + +// WriteUint32 write uint32 to wbuf +func (dfw *defaultFrameWriter) WriteUint32(v uint32) { + dfw.wbuf = append(dfw.wbuf, byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) +} + +// WriteUint16 write uint16 to wbuf +func (dfw *defaultFrameWriter) WriteUint16(v uint16) { + dfw.wbuf = append(dfw.wbuf, byte(v>>8), byte(v)) +} + +// WriteUint8 write uint8 to wbuf +func (dfw *defaultFrameWriter) WriteUint8(v uint8) { + dfw.wbuf = append(dfw.wbuf, byte(v)) +} + +// WriteBytes write multiple bytes +func (dfw *defaultFrameWriter) WriteBytes(v []byte) { dfw.wbuf = append(dfw.wbuf, v...) } diff --git a/qrpc.go b/qrpc.go new file mode 100644 index 0000000..b2affcb --- /dev/null +++ b/qrpc.go @@ -0,0 +1,28 @@ +package qrpc + +// Cmd is for multiplexer +type Cmd uint32 + +// PacketFlag defines flags for qrpc +type PacketFlag uint8 + +const ( + // StreamFlag means packet is streamed + StreamFlag PacketFlag = 1 << iota + // StreamEndFlag denotes the end of a stream + StreamEndFlag + // NBFlag means it should be handled nonblockly + NBFlag + // CompressFlag indicate packet is compressed (TODO) + CompressFlag + // PushFlag mean the frame is pushed from server + PushFlag +) + +// contextKey is a value for use with context.WithValue. It's used as +// a pointer so it fits in an interface{} without allocation. +type contextKey struct { + name string +} + +func (k *contextKey) String() string { return "qrpc context value " + k.name } diff --git a/reader.go b/reader.go new file mode 100644 index 0000000..fa59e88 --- /dev/null +++ b/reader.go @@ -0,0 +1,54 @@ +package qrpc + +import ( + "encoding/binary" + "io" + "net" + "time" +) + +// Reader read data from socket +type Reader struct { + conn net.Conn + timeout int +} + +const ( + // ReadNoTimeout will never timeout + ReadNoTimeout = -1 +) + +// NewReader creates a StreamReader instance +func NewReader(conn net.Conn) *Reader { + return NewReaderWithTimeout(conn, ReadNoTimeout) +} + +// NewReaderWithTimeout allows specify timeout +func NewReaderWithTimeout(conn net.Conn, timeout int) *Reader { + return &Reader{conn: conn, timeout: timeout} +} + +// ReadUint32 read uint32 from socket +func (r *Reader) ReadUint32() (uint32, error) { + bytes := make([]byte, 4) + err := r.ReadBytes(bytes) + if err != nil { + return 0, err + } + + return binary.BigEndian.Uint32(bytes), nil +} + +// ReadBytes read bytes +func (r *Reader) ReadBytes(bytes []byte) error { + timeout := r.timeout + if timeout > 0 { + r.conn.SetReadDeadline(time.Now().Add(time.Duration(timeout) * time.Second)) + } + _, err := io.ReadFull(r.conn, bytes) + if err != nil { + return err + } + + return nil +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..c10ec7c --- /dev/null +++ b/server.go @@ -0,0 +1,252 @@ +package qrpc + +import ( + "context" + "errors" + "net" + "sync" + "time" +) + +// FrameWriter looks like writes a qrpc resp +// but it internally needs be scheduled, thus maintains a simple yet powerful interface +type FrameWriter interface { + StartWrite(requestID uint64, cmd Cmd, flags PacketFlag) + WriteBytes(v []byte) // v is copied in WriteBytes + EndWrite() error // block until scheduled +} + +// A Handler responds to an qrpc request. +type Handler interface { + ServeQRPC(FrameWriter, *Frame) +} + +// The HandlerFunc type is an adapter to allow the use of +// ordinary functions as qrpc handlers. If f is a function +// with the appropriate signature, HandlerFunc(f) is a +// Handler that calls f. +type HandlerFunc func(FrameWriter, *Frame) + +// ServeQRPC calls f(w, r). +func (f HandlerFunc) ServeQRPC(w FrameWriter, r *Frame) { + f(w, r) +} + +// ServeMux is qrpc request multiplexer. +type ServeMux struct { + mu sync.RWMutex + m map[Cmd]Handler +} + +// NewServeMux allocates and returns a new ServeMux. +func NewServeMux() *ServeMux { return new(ServeMux) } + +// HandleFunc registers the handler function for the given pattern. +func (mux *ServeMux) HandleFunc(cmd Cmd, handler func(FrameWriter, *Frame)) { + mux.Handle(cmd, HandlerFunc(handler)) +} + +// Handle registers the handler for the given pattern. +// If a handler already exists for pattern, Handle panics. +func (mux *ServeMux) Handle(cmd Cmd, handler Handler) { + mux.mu.Lock() + defer mux.mu.Unlock() + + if handler == nil { + panic("http: nil handler") + } + if _, exist := mux.m[cmd]; exist { + panic("http: multiple registrations for " + string(cmd)) + } + + if mux.m == nil { + mux.m = make(map[Cmd]Handler) + } + mux.m[cmd] = handler +} + +// ServeQRPC dispatches the request to the handler whose +// cmd matches the request. +func (mux *ServeMux) ServeQRPC(w FrameWriter, r *Frame) { + mux.mu.RLock() + h, ok := mux.m[r.Cmd] + if !ok { + return + } + mux.mu.RUnlock() + h.ServeQRPC(w, r) +} + +// Server defines parameters for running an qrpc server. +type Server struct { + // one handler for each listening address + bindings []ServerBinding + + // manages below two + mu sync.Mutex + listeners map[net.Listener]struct{} + doneChan chan struct{} + + activeConn []sync.Map // for better iterate when write, map[conn]*ConnectionInfo +} + +// NewServer creates a server +func NewServer(bindings []ServerBinding) *Server { + return &Server{ + bindings: bindings, + listeners: make(map[net.Listener]struct{}), + doneChan: make(chan struct{}), + activeConn: make([]sync.Map, len(bindings))} +} + +// ListenAndServe starts listening on all bindings +func (srv *Server) ListenAndServe() error { + + for idx, binding := range srv.bindings { + ln, err := net.Listen("tcp", binding.Addr) + if err != nil { + srv.Shutdown(nil) + return err + } + + go srv.serve(ln, idx) + + } + + return nil +} + +// ErrServerClosed is returned by the Server's Serve, ListenAndServe, +// methods after a call to Shutdown or Close. +var ErrServerClosed = errors.New("qrpc: Server closed") + +var defaultAcceptTimeout = 5 * time.Second + +// serve accepts incoming connections on the Listener l, creating a +// new service goroutine for each. The service goroutines read requests and +// then call srv.Handler to reply to them. +// +// serve always returns a non-nil error. After Shutdown or Close, the +// returned error is ErrServerClosed. +func (srv *Server) serve(l net.Listener, idx int) error { + + defer l.Close() + var tempDelay time.Duration // how long to sleep on accept failure + + srv.trackListener(l, true) + defer srv.trackListener(l, false) + + serveCtx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + for { + l.(*net.TCPListener).SetDeadline(time.Now().Add(defaultAcceptTimeout)) + rw, e := l.Accept() + if e != nil { + select { + case <-srv.doneChan: + return ErrServerClosed + default: + } + if opError, ok := e.(*net.OpError); ok && opError.Timeout() { + // don't log the scheduled timeout + continue + } + if ne, ok := e.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay) + time.Sleep(tempDelay) + continue + } + return e + } + tempDelay = 0 + c := srv.newConn(rw) + + go c.serve(serveCtx, idx) + } +} + +func (srv *Server) trackListener(ln net.Listener, add bool) { + srv.mu.Lock() + defer srv.mu.Unlock() + if add { + srv.listeners[ln] = struct{}{} + } else { + delete(srv.listeners, ln) + } +} + +// Create new connection from rwc. +func (srv *Server) newConn(rwc net.Conn) *conn { + c := &conn{ + server: srv, + rwc: rwc, + readFrameCh: make(chan readFrameResult), + writeFrameCh: make(chan writeFrameRequest)} + return c +} + +func (srv *Server) logf(format string, args ...interface{}) { + +} + +var shutdownPollInterval = 500 * time.Millisecond + +// Shutdown gracefully shutdown the server +func (srv *Server) Shutdown(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } + + srv.mu.Lock() + lnerr := srv.closeListenersLocked() + if lnerr != nil { + return lnerr + } + srv.mu.Unlock() + + close(srv.doneChan) + + ticker := time.NewTicker(shutdownPollInterval) + defer ticker.Stop() + for { + if srv.waitConnDone() { + return lnerr + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +func (srv *Server) waitConnDone() bool { + done := true + for idx := 0; done && idx < len(srv.bindings); idx++ { + srv.activeConn[idx].Range(func(key, value interface{}) bool { + done = false + return false + }) + } + + return done +} + +func (srv *Server) closeListenersLocked() error { + var err error + for ln := range srv.listeners { + if cerr := ln.Close(); cerr != nil && err == nil { + err = cerr + } + delete(srv.listeners, ln) + } + return err +} diff --git a/test/qrpc_test.go b/test/qrpc_test.go new file mode 100644 index 0000000..c362e75 --- /dev/null +++ b/test/qrpc_test.go @@ -0,0 +1,31 @@ +package test + +import ( + "qrpc" + "testing" + "time" +) + +const ( + addr = "0.0.0.0:8080" +) + +// TestConnection tests connection +func TestConnection(t *testing.T) { + + go startServer() + time.Sleep(time.Second * 2) + + conf := qrpc.ConnectionConfig{} + cli := qrpc.NewClient(conf) + + cli.GetConn("ab") +} + +func startServer() { + handler := qrpc.NewServeMux() + bindings := []qrpc.ServerBinding{ + qrpc.ServerBinding{Addr: addr, Handler: handler}} + server := qrpc.NewServer(bindings) + server.ListenAndServe() +} diff --git a/writer.go b/writer.go new file mode 100644 index 0000000..044bc01 --- /dev/null +++ b/writer.go @@ -0,0 +1,48 @@ +package qrpc + +import ( + "net" + "time" +) + +// Writer writes data to connection +type Writer struct { + conn net.Conn + timeout int +} + +const ( + // WriteNoTimeout will never timeout + WriteNoTimeout = -1 +) + +// NewWriter new instance +func NewWriter(conn net.Conn) *Writer { + return &Writer{conn: conn, timeout: WriteNoTimeout} +} + +// NewWriterWithTimeout new instance with timeout +func NewWriterWithTimeout(conn net.Conn, timeout int) *Writer { + return &Writer{conn: conn, timeout: timeout} +} + +// Write writes bytes +func (w *Writer) Write(bytes []byte) (int, error) { + size := len(bytes) + + offset := 0 + + if w.timeout > 0 { + w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.timeout) * time.Second)) + } + for { + n, err := w.conn.Write(bytes[offset:]) + offset += n + if err != nil { + return offset, err + } + if offset >= size { + return offset, nil + } + } +}