Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加了kingshard多个功能,详见里面的列表 #208

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
97 changes: 97 additions & 0 deletions backend/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/flike/kingshard/core/errors"
"github.com/flike/kingshard/core/golog"
"github.com/flike/kingshard/mysql"
)

Expand Down Expand Up @@ -266,6 +267,31 @@ func (db *DB) PopConn() (*Conn, error) {
return co, nil
}

func (db *DB) calcDBConnsCount(n int) int {
cacheConnsCount := len(db.cacheConns)
var calcCount int
switch {
case cacheConnsCount < 5:
calcCount = 1
case n <= cacheConnsCount/2:
calcCount = n
case n > cacheConnsCount/2:
calcCount = cacheConnsCount / 2
default:
calcCount = 1
}
golog.Info("DB", "DBConnsCount", "calcDBConnsCount", 0,
"Count", calcCount)
return calcCount
}
func (db *DB) PopConns(n int) ([]*Conn, error) {
cacheConns, idleConns := db.getConns()
if cacheConns == nil || idleConns == nil {
return nil, errors.ErrDatabaseClose
}
calcCount := db.calcDBConnsCount(n)
return db.GetDBConns(calcCount)
}
func (db *DB) GetConnFromCache(cacheConns chan *Conn) *Conn {
var co *Conn
var err error
Expand All @@ -284,6 +310,47 @@ func (db *DB) GetConnFromCache(cacheConns chan *Conn) *Conn {
}
return co
}
func (db *DB) GetDBConns(n int) ([]*Conn, error) {
var co *Conn
var err error
conns := make([]*Conn, 0)
for 0 < len(db.cacheConns) {
co = <-db.cacheConns
if co != nil && PingPeroid < time.Now().Unix()-co.pushTimestamp {
err = co.Ping()
if err != nil {
db.closeConn(co)
co = nil
continue
}
}
if co != nil {
err = db.tryReuse(co)
if err != nil {
db.closeConn(co)
co = nil
continue
}
conns = append(conns, co)
if len(conns) >= n {
break
}
}
}
// if len(conns) == 0 {
// co, err = db.GetConnFromIdle(db.cacheConns, db.idleConns)
// if err != nil {
// return conns, err
// }
// err = db.tryReuse(co)
// if err != nil {
// db.closeConn(co)
// return conns, err
// }
// conns = append(conns, co)
// }
return conns, nil
}

func (db *DB) GetConnFromIdle(cacheConns, idleConns chan *Conn) (*Conn, error) {
var co *Conn
Expand Down Expand Up @@ -350,6 +417,12 @@ func (p *BackendConn) Close() {
}
}

func (p *BackendConn) GetBackendDB() (*DB, error) {
if p == nil {
return nil, errors.ErrNoDatabase
}
return p.db, nil
}
func (db *DB) GetConn() (*BackendConn, error) {
c, err := db.PopConn()
if err != nil {
Expand All @@ -358,6 +431,30 @@ func (db *DB) GetConn() (*BackendConn, error) {
return &BackendConn{c, db}, nil
}

func (db *DB) GetConns(n int, dbname, charset string) ([]*BackendConn, error) {
backendConn := make([]*BackendConn, 0)
if n <= 0 {
return backendConn, nil
}
conns, err := db.PopConns(n)
if err != nil {
return backendConn, err
}
for _, c := range conns {
if err = c.UseDB(dbname); err != nil {
golog.Error("DB", "GetConns, UseDB", err.Error(), 0)
}
if err = c.SetCharset(charset); err != nil {
golog.Error("DB", "GetConns, SetCharset", err.Error(), 0)
}
backendConn = append(backendConn, &BackendConn{c, db})
}
// if len(backendConn) == 0 {
// return backendConn, errors.ErrConnIsNil
// }
return backendConn, nil
}

func (db *DB) SetLastPing() {
db.lastPing = time.Now().Unix()
}
Expand Down
34 changes: 34 additions & 0 deletions cmd/kingshard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ const banner string = `
/____/
`

func writePID(cfg *config.Config) {
pidpath := cfg.PidPath
if pidpath == "" {
fmt.Println("WritePid failed, please confirm that the pidfile configuration item are correctly filed")
os.Exit(-1)
}

fp, err := os.Create(pidpath)
if err != nil {
fmt.Println("WritePID failed, create pidfile failed %s", err.Error())
os.Exit(-1)
}
defer fp.Close()

pid := os.Getpid()
_, err = fp.WriteString(fmt.Sprintf("%d", pid))
if err != nil {
fmt.Println("WritePID failed, %s", err.Error())
os.Exit(-1)
}
}

func main() {
fmt.Print(banner)
runtime.GOMAXPROCS(runtime.NumCPU())
Expand Down Expand Up @@ -103,6 +125,14 @@ func main() {
return
}

var monitor_svr *server.ServerMonitor
monitor_svr, err = server.NewMonitorServer(cfg)
if err != nil {
golog.Error("main", "main", err.Error(), 0)
golog.GlobalSysLogger.Close()
golog.GlobalSqlLogger.Close()
return
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGINT,
Expand All @@ -119,12 +149,16 @@ func main() {
golog.GlobalSysLogger.Close()
golog.GlobalSqlLogger.Close()
svr.Close()
monitor_svr.Close()
return
} else if sig == syscall.SIGPIPE {
golog.Info("main", "main", "Ignore broken pipe signal", 0)
}
}
}()

writePID(cfg)
go monitor_svr.Run()
svr.Run()
}

Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (

//整个config文件对应的结构
type Config struct {
PidPath string `yaml:"pidfile"`
Addr string `yaml:"addr"`
MonitorAddr string `yaml:"monitor_addr"`
User string `yaml:"user"`
Password string `yaml:"password"`
LogPath string `yaml:"log_path"`
Expand Down Expand Up @@ -66,6 +68,7 @@ type ShardConfig struct {
Nodes []string `yaml:"nodes"`
Locations []int `yaml:"locations"`
Type string `yaml:"type"`
TableRowBase int `yaml:"table_row_base"`
TableRowLimit int `yaml:"table_row_limit"`
DateRange []string `yaml:"date_range"`
}
Expand Down
3 changes: 3 additions & 0 deletions etc/ks.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pidfile : /uc/etc/kingshard.pid
# server listen addr
addr : 0.0.0.0:9696
monitor_addr : 0.0.0.0:19696

# server user and password
user : kingshard
Expand Down Expand Up @@ -87,6 +89,7 @@ schema :
type: range
nodes: [node1, node2]
locations: [4,4]
table_row_base: 2000
table_row_limit: 10000
-
table: test_shard_time
Expand Down
6 changes: 3 additions & 3 deletions proxy/router/numkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (kr NumKeyRange) String() string {
return fmt.Sprintf("{Start: %d, End: %d}", kr.Start, kr.End)
}

func ParseNumSharding(Locations []int, TableRowLimit int) ([]NumKeyRange, error) {
func ParseNumSharding(Locations []int, TableRowLimit int, TableRowBase int) ([]NumKeyRange, error) {
tableCount := 0
length := len(Locations)

Expand All @@ -57,8 +57,8 @@ func ParseNumSharding(Locations []int, TableRowLimit int) ([]NumKeyRange, error)

ranges := make([]NumKeyRange, tableCount)
for i := 0; i < tableCount; i++ {
ranges[i].Start = int64(i * TableRowLimit)
ranges[i].End = int64((i + 1) * TableRowLimit)
ranges[i].Start = int64(TableRowBase + i*TableRowLimit)
ranges[i].End = int64(TableRowBase + (i+1)*TableRowLimit)
}
return ranges, nil
}
Expand Down
30 changes: 28 additions & 2 deletions proxy/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func parseShard(r *Rule, cfg *config.ShardConfig) error {
case HashRuleType:
r.Shard = &HashShard{ShardNum: len(r.TableToNode)}
case RangeRuleType:
rs, err := ParseNumSharding(cfg.Locations, cfg.TableRowLimit)
rs, err := ParseNumSharding(cfg.Locations, cfg.TableRowLimit, cfg.TableRowBase)
if err != nil {
return err
}
Expand Down Expand Up @@ -570,7 +570,32 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex
tableIndex,
)
}
buf.Fprintf(" %s %v", v.Join, v.RightExpr)

if ate_r, ok_r := (v.RightExpr).(*sqlparser.AliasedTableExpr); ok_r {
if len(ate_r.As) != 0 {
fmt.Fprintf(buf, " %s %s_%04d as %s",
sqlparser.AST_LEFT_JOIN,
sqlparser.String(ate_r.Expr),
tableIndex,
string(ate_r.As),
)
} else {
fmt.Fprintf(buf, " %s %s_%04d",
sqlparser.AST_LEFT_JOIN,
sqlparser.String(ate_r.Expr),
tableIndex,
)
}
} else {
fmt.Fprintf(buf, " %s %s_%04d",
sqlparser.AST_LEFT_JOIN,
sqlparser.String(v.RightExpr),
tableIndex,
)
}

//buf.Fprintf(" %s %v", v.Join, v.RightExpr)
//golog.Info("router", "sqlparser.JoinTableExpr and v.Join, v.RightExpr", buf.String(), 0)
if v.On != nil {
buf.Fprintf(" on %v", v.On)
}
Expand All @@ -591,6 +616,7 @@ func (r *Router) rewriteSelectSql(plan *Plan, node *sqlparser.Select, tableIndex
//do not change limit
newLimit = node.Limit
}

//rewrite where
oldright, err := plan.rewriteWhereIn(tableIndex)

Expand Down
Loading