Skip to content

Commit

Permalink
fix issue silenceper#32, Get funciton would hang when all connections…
Browse files Browse the repository at this point in the history
… are closed
  • Loading branch information
han committed Feb 23, 2021
1 parent 58e025e commit dc73e2e
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 17 deletions.
104 changes: 88 additions & 16 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pool
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"sync"
"time"
//"reflect"
Expand Down Expand Up @@ -47,13 +46,16 @@ type channelPool struct {
maxActive int
openingConns int
connReqs []chan connReq
openerCh chan struct{}
}

type idleConn struct {
conn interface{}
t time.Time
}

var connectionRequestQueueSize = 1000000

// NewChannelPool 初始化连接
func NewChannelPool(poolConfig *Config) (Pool, error) {
if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) {
Expand All @@ -73,12 +75,15 @@ func NewChannelPool(poolConfig *Config) (Pool, error) {
idleTimeout: poolConfig.IdleTimeout,
maxActive: poolConfig.MaxCap,
openingConns: poolConfig.InitialCap,
openerCh: make(chan struct{}, connectionRequestQueueSize),
}

if poolConfig.Ping != nil {
c.ping = poolConfig.Ping
}

go c.connectionOpener()

for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
Expand All @@ -99,6 +104,36 @@ func (c *channelPool) getConns() chan *idleConn {
return conns
}

// connectionOpener separate goroutine for opening new connection
func (c *channelPool) connectionOpener() {
for {
select {
case _, ok := <-c.openerCh:
if !ok {
return
}
c.openNewConnection()
}
}
}

// openNewConnection Open one new connection
func (c *channelPool) openNewConnection() {
conn, err := c.factory()
if err != nil {
c.mu.Lock()
c.openingConns--
c.maybeOpenNewConnections()
c.mu.Unlock()

// put nil connection into pool to wake up pending channel fetch
c.Put(nil)
return
}

c.Put(conn)
}

// Get 从pool中取一个连接
func (c *channelPool) Get() (interface{}, error) {
conns := c.getConns()
Expand Down Expand Up @@ -129,7 +164,6 @@ func (c *channelPool) Get() (interface{}, error) {
return wrapConn.conn, nil
default:
c.mu.Lock()
log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
if c.openingConns >= c.maxActive {
req := make(chan connReq, 1)
c.connReqs = append(c.connReqs, req)
Expand All @@ -138,6 +172,9 @@ func (c *channelPool) Get() (interface{}, error) {
if !ok {
return nil, ErrMaxActiveConnReached
}
if ret.idleConn.conn == nil {
return nil, errors.New("failed to create a new connection")
}
if timeout := c.idleTimeout; timeout > 0 {
if ret.idleConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该连接
Expand All @@ -151,27 +188,26 @@ func (c *channelPool) Get() (interface{}, error) {
c.mu.Unlock()
return nil, ErrClosed
}

// c.factory 耗时较长,采用乐观策略,先增加,失败后再减少
c.openingConns++
c.mu.Unlock()
conn, err := c.factory()
if err != nil {
c.mu.Lock()
c.openingConns--
c.mu.Unlock()
return nil, err
}
c.openingConns++
c.mu.Unlock()
return conn, nil
}
}
}

// Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

c.mu.Lock()

if c.conns == nil {
if c.conns == nil && conn != nil {
c.mu.Unlock()
return c.Close(conn)
}
Expand All @@ -180,12 +216,17 @@ func (c *channelPool) Put(conn interface{}) error {
req := c.connReqs[0]
copy(c.connReqs, c.connReqs[1:])
c.connReqs = c.connReqs[:l-1]
req <- connReq{
idleConn: &idleConn{conn: conn, t: time.Now()},
if conn == nil {
req <- connReq{idleConn: nil}
//return errors.New("connection is nil. rejecting")
} else {
req <- connReq{
idleConn: &idleConn{conn: conn, t: time.Now()},
}
}
c.mu.Unlock()
return nil
} else {
} else if conn != nil {
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
c.mu.Unlock()
Expand All @@ -196,20 +237,46 @@ func (c *channelPool) Put(conn interface{}) error {
return c.Close(conn)
}
}

c.mu.Unlock()
return errors.New("connection is nil, rejecting")
}

// maybeOpenNewConnections 如果有请求在,并且池里的连接上限未达到时,开启新的连接
// Assumes c.mu is locked
func (c *channelPool) maybeOpenNewConnections() {
numRequest := len(c.connReqs)

if c.maxActive > 0 {
numCanOpen := c.maxActive - c.openingConns
if numRequest > numCanOpen {
numRequest = numCanOpen
}
}
for numRequest > 0 {
c.openingConns++
numRequest--
c.openerCh <- struct{}{}
}
}

// Close 关闭单条连接
func (c *channelPool) Close(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.close == nil {
return nil
}

var err error
err = c.close(conn)

c.mu.Lock()
c.openingConns--
return c.close(conn)
c.maybeOpenNewConnections()
c.mu.Unlock()
return err
}

// Ping 检查单条连接是否有效
Expand All @@ -229,13 +296,18 @@ func (c *channelPool) Release() {
c.ping = nil
closeFun := c.close
c.close = nil
openerCh := c.openerCh
c.openerCh = nil
c.mu.Unlock()

if conns == nil {
return
}

// close channels
close(conns)
close(openerCh)

for wrapConn := range conns {
//log.Printf("Type %v\n",reflect.TypeOf(wrapConn.conn))
closeFun(wrapConn.conn)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/silenceper/pool

go 1.13

require github.com/sirupsen/logrus v1.4.2
//require github.com/sirupsen/logrus v1.4.2

0 comments on commit dc73e2e

Please sign in to comment.