Skip to content

Commit

Permalink
add ServerSubFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiangxu committed Jun 5, 2020
1 parent 3dae71e commit 14a0ac4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
6 changes: 5 additions & 1 deletion conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type ServerLifecycleCallbacks struct {
OnClose func(net.Conn)
}

// ServerSubFunc for server subscribe callback
type ServerSubFunc func(*ConnectionInfo, *Frame)

// ServerBinding contains binding infos
type ServerBinding struct {
Addr string
Expand All @@ -41,6 +44,7 @@ type ServerBinding struct {
ListenFunc func(network, address string) (net.Listener, error)
Codec CompressorCodec
OverlayNetwork func(net.Listener, *tls.Config) Listener
SubFunc ServerSubFunc
OnKickCB func(w FrameWriter)
LatencyMetric metrics.Histogram
CounterMetric metrics.Counter
Expand All @@ -49,7 +53,7 @@ type ServerBinding struct {
ln Listener
}

// SubFunc for subscribe callback
// SubFunc for client subscribe callback
type SubFunc func(*Connection, *Frame)

// ClientLifecycleCallbacks for Connection
Expand Down
8 changes: 8 additions & 0 deletions serveconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,14 @@ func (sc *serveconn) readFrames() (err error) {
}
return err
}
if req.Flags.IsPush() {
// pushed frame
if binding.SubFunc != nil {
binding.SubFunc(ci, req)
}

continue
}
if req.FromServer() {
ci.l.Lock()
if ci.respes != nil {
Expand Down
8 changes: 4 additions & 4 deletions test/qrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,11 @@ func startServer() {
panic(err)
}
})
handler.HandleFunc(PushCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) {
// noop
})
subFunc := func(ci *qrpc.ConnectionInfo, request *qrpc.Frame) {
fmt.Println("pushedmsg")
}
bindings := []qrpc.ServerBinding{
qrpc.ServerBinding{Addr: addr, Handler: handler, ReadFrameChSize: 10000, WriteFrameChSize: 1000, WBufSize: 2000000, RBufSize: 2000000}}
qrpc.ServerBinding{Addr: addr, Handler: handler, SubFunc: subFunc, ReadFrameChSize: 10000, WriteFrameChSize: 1000, WBufSize: 2000000, RBufSize: 2000000}}
server := qrpc.NewServer(bindings)
err := server.ListenAndServe()
if err != nil {
Expand Down

0 comments on commit 14a0ac4

Please sign in to comment.