diff --git a/cmd/kingshard/main.go b/cmd/kingshard/main.go index aac70e15..308d3f4e 100644 --- a/cmd/kingshard/main.go +++ b/cmd/kingshard/main.go @@ -66,12 +66,14 @@ func main() { } if len(*configFile) == 0 { fmt.Println("must use a config file") + os.Exit(1) return } cfg, err := config.ParseConfigFile(*configFile) if err != nil { fmt.Printf("parse config file error:%v\n", err.Error()) + os.Exit(1) return } @@ -81,6 +83,7 @@ func main() { sysFile, err := golog.NewRotatingFileHandler(sysFilePath, MaxLogSize, 1) if err != nil { fmt.Printf("new log file error:%v\n", err.Error()) + os.Exit(1) return } golog.GlobalSysLogger = golog.New(sysFile, golog.Lfile|golog.Ltime|golog.Llevel) @@ -89,6 +92,7 @@ func main() { sqlFile, err := golog.NewRotatingFileHandler(sqlFilePath, MaxLogSize, 1) if err != nil { fmt.Printf("new log file error:%v\n", err.Error()) + os.Exit(1) return } golog.GlobalSqlLogger = golog.New(sqlFile, golog.Lfile|golog.Ltime|golog.Llevel) @@ -108,6 +112,7 @@ func main() { golog.Error("main", "main", err.Error(), 0) golog.GlobalSysLogger.Close() golog.GlobalSqlLogger.Close() + os.Exit(1) return } apiSvr, err = web.NewApiServer(cfg, svr) @@ -116,6 +121,7 @@ func main() { golog.GlobalSysLogger.Close() golog.GlobalSqlLogger.Close() svr.Close() + os.Exit(1) return } prometheusSvr, err = monitor.NewPrometheus(cfg.PrometheusAddr, svr) @@ -124,6 +130,7 @@ func main() { golog.GlobalSysLogger.Close() golog.GlobalSqlLogger.Close() svr.Close() + os.Exit(1) return } diff --git a/etc/ks.yaml b/etc/ks.yaml index 727af5d5..ffd1bf77 100755 --- a/etc/ks.yaml +++ b/etc/ks.yaml @@ -28,7 +28,7 @@ log_level : debug # if set log_sql(on|off) off,the sql log will not output log_sql: on - + # only log the query that take more than slow_log_time ms #slow_log_time : 100 @@ -46,38 +46,38 @@ log_sql: on # node is an agenda for real remote mysql server. nodes : -- - name : node1 +- + name : node1 # default max conns for mysql server max_conns_limit : 32 # all mysql in a node must have the same user and password - user : root + user : root password : root - # master represents a real mysql master server + # master represents a real mysql master server master : 127.0.0.1:3306 - # slave represents a real mysql salve server,and the number after '@' is + # slave represents a real mysql salve server,and the number after '@' is # read load weight of this slave. #slave : 192.168.59.101:3307@2,192.168.59.101:3307@3 down_after_noalive : 32 -- - name : node2 +- + name : node2 # default max conns for mysql server max_conns_limit : 32 # all mysql in a node must have the same user and password - user : root + user : root password : root - # master represents a real mysql master server + # master represents a real mysql master server master : 127.0.0.1:3309 - # slave represents a real mysql salve server - slave : + # slave represents a real mysql salve server + slave : # down mysql after N seconds noalive # 0 will no down @@ -91,28 +91,28 @@ schema_list : default: node1 shard: - - + db : node2_db + nodes: [node2] - user: kingshard nodes: [node1,node2] - default: node1 + default: node1 shard: - - + - db : kingshard table: test_shard_hash key: id nodes: [node1, node2] type: hash locations: [4,4] - - - + - db : hidb table: test_hash key: id nodes: [node1, node2] type: hash locations: [4,4] - - + - db : kingshard table: test_shard_range key: id diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go index 1b9c55e9..6f783f73 100644 --- a/proxy/server/conn_preshard.go +++ b/proxy/server/conn_preshard.go @@ -30,6 +30,7 @@ import ( type ExecuteDB struct { ExecNode *backend.Node IsSlave bool + db string sql string } @@ -210,6 +211,17 @@ func (c *ClientConn) setExecuteNode(tokens []string, tokensLen int, executeDB *E } if executeDB.ExecNode == nil { + if c.db != "" && executeDB.db == "" { + executeDB.db = c.db + } + if executeDB.db != "" { + // get default db nodes from rules + rule := c.schema.rule.Rules[executeDB.db][""] + if rule != nil && len(rule.Nodes) > 0 { + executeDB.ExecNode = c.proxy.GetNode(rule.Nodes[0]) + return nil + } + } defaultRule := c.schema.rule.DefaultRule if len(defaultRule.Nodes) == 0 { return errors.ErrNoDefaultNode @@ -239,6 +251,7 @@ func (c *ClientConn) getSelectExecDB(sql string, tokens []string, tokensLen int) //if the token[i+1] like this:kingshard.test_shard_hash if DBName != "" { ruleDB = DBName + executeDB.db = DBName } else { ruleDB = c.db } @@ -288,6 +301,7 @@ func (c *ClientConn) getDeleteExecDB(sql string, tokens []string, tokensLen int) //if the token[i+1] like this:kingshard.test_shard_hash if DBName != "" { ruleDB = DBName + executeDB.db = DBName } else { ruleDB = c.db } @@ -326,6 +340,7 @@ func (c *ClientConn) getInsertOrReplaceExecDB(sql string, tokens []string, token //if the token[i+1] like this:kingshard.test_shard_hash if DBName != "" { ruleDB = DBName + executeDB.db = DBName } else { ruleDB = c.db } @@ -363,6 +378,7 @@ func (c *ClientConn) getUpdateExecDB(sql string, tokens []string, tokensLen int) //if the token[i+1] like this:kingshard.test_shard_hash if DBName != "" { ruleDB = DBName + executeDB.db = DBName } else { ruleDB = c.db } @@ -457,6 +473,7 @@ func (c *ClientConn) handleShowColumns(sql string, tokens []string, //get the ruleDB if i+4 < tokensLen && strings.ToLower(tokens[i+1]) == mysql.TK_STR_FROM { ruleDB = strings.Trim(tokens[i+4], "`") + executeDB.db = ruleDB } else { ruleDB = c.db } @@ -494,6 +511,7 @@ func (c *ClientConn) getTruncateExecDB(sql string, tokens []string, tokensLen in //if the token[i+1] like this:kingshard.test_shard_hash if DBName != "" { ruleDB = DBName + executeDB.db = DBName } else { ruleDB = c.db } diff --git a/proxy/server/conn_query.go b/proxy/server/conn_query.go index 22ec0217..ef5c7d8c 100644 --- a/proxy/server/conn_query.go +++ b/proxy/server/conn_query.go @@ -113,6 +113,21 @@ func (c *ClientConn) handleQuery(sql string) (err error) { return nil } +func (c *ClientConn) getExecuteNode() (*backend.Node, error) { + if c.db != "" { + rule := c.schema.rule.Rules[c.db][""] + if rule != nil && len(rule.Nodes) > 0 { + return c.proxy.GetNode(rule.Nodes[0]), nil + } + } + // fallback to default + defaultRule := c.schema.rule.DefaultRule + if len(defaultRule.Nodes) == 0 { + return nil, errors.ErrNoDefaultNode + } + return c.proxy.GetNode(defaultRule.Nodes[0]), nil +} + func (c *ClientConn) getBackendConn(n *backend.Node, fromSlave bool) (co *backend.BackendConn, err error) { if !c.isInTransaction() { if fromSlave { diff --git a/proxy/server/conn_stmt.go b/proxy/server/conn_stmt.go index a5d258a8..9735d600 100644 --- a/proxy/server/conn_stmt.go +++ b/proxy/server/conn_stmt.go @@ -21,7 +21,6 @@ import ( "strconv" "strings" - "github.com/flike/kingshard/core/errors" "github.com/flike/kingshard/core/golog" "github.com/flike/kingshard/mysql" "github.com/flike/kingshard/sqlparser" @@ -72,9 +71,10 @@ func (c *ClientConn) handleStmtPrepare(sql string) error { s.sql = sql - defaultRule := c.schema.rule.DefaultRule - - n := c.proxy.GetNode(defaultRule.Nodes[0]) + n, err := c.getExecuteNode() + if err != nil { + return err + } co, err := c.getBackendConn(n, false) defer c.closeConn(co, false) @@ -258,14 +258,13 @@ func (c *ClientConn) handleStmtExecute(data []byte) error { } func (c *ClientConn) handlePrepareSelect(stmt *sqlparser.Select, sql string, args []interface{}) error { - defaultRule := c.schema.rule.DefaultRule - if len(defaultRule.Nodes) == 0 { - return errors.ErrNoDefaultNode + n, err := c.getExecuteNode() + if err != nil { + return err } - defaultNode := c.proxy.GetNode(defaultRule.Nodes[0]) //choose connection in slave DB first - conn, err := c.getBackendConn(defaultNode, true) + conn, err := c.getBackendConn(n, true) defer c.closeConn(conn, false) if err != nil { return err @@ -295,14 +294,13 @@ func (c *ClientConn) handlePrepareSelect(stmt *sqlparser.Select, sql string, arg } func (c *ClientConn) handlePrepareExec(stmt sqlparser.Statement, sql string, args []interface{}) error { - defaultRule := c.schema.rule.DefaultRule - if len(defaultRule.Nodes) == 0 { - return errors.ErrNoDefaultNode + n, err := c.getExecuteNode() + if err != nil { + return err } - defaultNode := c.proxy.GetNode(defaultRule.Nodes[0]) //execute in Master DB - conn, err := c.getBackendConn(defaultNode, false) + conn, err := c.getBackendConn(n, false) defer c.closeConn(conn, false) if err != nil { return err