Skip to content

Commit

Permalink
Merge pull request #428 from zmberg/1.16.x
Browse files Browse the repository at this point in the history
1.16.x mesos支持污点调度机制
  • Loading branch information
DeveloperJim authored Apr 17, 2020
2 parents 90fbf0d + 7b09857 commit 84c9755
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 31 deletions.
17 changes: 9 additions & 8 deletions bcs-common/common/types/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ const (
)

const (
Constraint_Type_UNIQUE = "UNIQUE"
Constraint_Type_CLUSTER = "CLUSTER"
Constraint_Type_GROUP_BY = "GROUPBY"
Constraint_Type_MAX_PER = "MAXPER"
Constraint_Type_LIKE = "LIKE"
Constraint_Type_UNLIKE = "UNLIKE"
Constraint_Type_EXCLUDE = "EXCLUDE"
Constraint_Type_GREATER = "GREATER"
Constraint_Type_UNIQUE = "UNIQUE"
Constraint_Type_CLUSTER = "CLUSTER"
Constraint_Type_GROUP_BY = "GROUPBY"
Constraint_Type_MAX_PER = "MAXPER"
Constraint_Type_LIKE = "LIKE"
Constraint_Type_UNLIKE = "UNLIKE"
Constraint_Type_EXCLUDE = "EXCLUDE"
Constraint_Type_GREATER = "GREATER"
Constraint_Type_TOLERATION = "TOLERATION"
)

type ConstraintData struct {
Expand Down
8 changes: 8 additions & 0 deletions bcs-common/common/types/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type BcsClusterAgentSetting struct {
Disabled bool `json:"disabled"`
AttrStrings map[string]MesosValue_Text `json:"strings"`
AttrScalars map[string]MesosValue_Scalar `json:"scalars"`
NoSchedule map[string]string `json:"noSchedule"`
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
Expand All @@ -92,6 +93,13 @@ func (in *BcsClusterAgentSetting) DeepCopyInto(out *BcsClusterAgentSetting) {
(*out)[key] = val
}
}
if in.NoSchedule != nil {
in, out := &in.NoSchedule, &out.NoSchedule
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}

return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ func (s *Scheduler) initActions() {
httpserver.NewAction("GET", "/agentsettings", nil, s.getAgentSettingListHandler),
httpserver.NewAction("DELETE", "/agentsettings", nil, s.deleteAgentSettingListHandler),
httpserver.NewAction("POST", "/agentsettings", nil, s.setAgentSettingListHandler),
httpserver.NewAction("POST", "/agentsettings/update", nil, s.updateAgentSettingListHandler),
//httpserver.NewAction("POST", "/agentsettings/update", nil, s.updateAgentSettingListHandler),
httpserver.NewAction("POST", "/agentsettings/enable", nil, s.enableAgentListHandler),
httpserver.NewAction("POST", "/agentsettings/disable", nil, s.disableAgentListHandler),
httpserver.NewAction("PUT", "/agentsettings/taint", nil, s.taintAgentsHandler),
/*================= agentsetting ====================*/

/*-------------- custom resource deprecated from 1.15.x -----------------*/
Expand Down Expand Up @@ -544,6 +545,29 @@ func (s *Scheduler) disableAgentListHandler(req *restful.Request, resp *restful.
resp.Write([]byte(reply))
}

func (s *Scheduler) taintAgentsHandler(req *restful.Request, resp *restful.Response) {
if s.GetHost() == "" {
blog.Error("no scheduler is connected by driver")
err := bhttp.InternalError(common.BcsErrCommHttpDo, common.BcsErrCommHttpDoStr+"scheduler not exist")
resp.Write([]byte(err.Error()))
return
}

body, _ := s.getRequestInfo(req)
url := s.GetHost() + "/v1/agentsettings/taint"
blog.Infof("put url(%s) body(%s)", url, string(body))

reply, err := s.client.PUT(url, nil, body)
if err != nil {
blog.Error("request to url(%s) failed! err(%s)", url, err.Error())
err = bhttp.InternalError(common.BcsErrCommHttpDo, common.BcsErrCommHttpDoStr+err.Error())
resp.Write([]byte(err.Error()))
return
}

resp.Write([]byte(reply))
}

func (s *Scheduler) GetClusterResourcesHandler(req *restful.Request, resp *restful.Response) {

blog.V(3).Infof("get cluster resources")
Expand Down
32 changes: 30 additions & 2 deletions bcs-mesos/bcs-scheduler/src/manager/sched/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,34 @@ func (r *Router) setAgentSettingList(req *restful.Request, resp *restful.Respons
return
}

func (r *Router) taintAgents(req *restful.Request, resp *restful.Response) {
if r.backend.GetRole() != scheduler.SchedulerRoleMaster {
blog.Warn("scheduler is not master, can not process cmd")
return
}

var agents []*commtypes.BcsClusterAgentSetting
decoder := json.NewDecoder(req.Request.Body)
if err := decoder.Decode(&agents); err != nil {
blog.Warn("fail to Decode body(%s) for agent, err:%s", err.Error())
data := createResponeDataV2(comm.BcsErrCommRequestDataErr, err.Error(), nil)
resp.Write([]byte(data))
return
}

err := r.backend.TaintAgents(agents)
if err != nil {
blog.Error("fail to taint agents(%v), err:%s", agents, err.Error())
data := createResponeDataV2(comm.BcsErrCommRequestDataErr, err.Error(), nil)
resp.Write([]byte(data))
return
}

data := createResponeData(nil, "success", nil)
resp.Write([]byte(data))
return
}

func (r *Router) disableAgentList(req *restful.Request, resp *restful.Response) {
if r.backend.GetRole() != scheduler.SchedulerRoleMaster {
blog.Warn("scheduler is not master, can not process cmd")
Expand Down Expand Up @@ -166,7 +194,7 @@ func (r *Router) enableAgentList(req *restful.Request, resp *restful.Response) {
return
}

func (r *Router) updateAgentSettingList(req *restful.Request, resp *restful.Response) {
/*func (r *Router) updateAgentSettingList(req *restful.Request, resp *restful.Response) {
if r.backend.GetRole() != scheduler.SchedulerRoleMaster {
blog.Warn("scheduler is not master, can not process cmd")
return
Expand Down Expand Up @@ -196,7 +224,7 @@ func (r *Router) updateAgentSettingList(req *restful.Request, resp *restful.Resp
blog.Info("update agentsetting finish")
return
}
}*/

func (r *Router) queryAgentSetting(req *restful.Request, resp *restful.Response) {
if r.backend.GetRole() != scheduler.SchedulerRoleMaster {
Expand Down
3 changes: 2 additions & 1 deletion bcs-mesos/bcs-scheduler/src/manager/sched/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ func (r *Router) initRoutes() {
r.actions = append(r.actions, httpserver.NewAction("GET", "/agentsettings", nil, r.queryAgentSettingList))
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/delete", nil, r.deleteAgentSettingList))
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings", nil, r.setAgentSettingList))
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/update", nil, r.updateAgentSettingList))
//r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/update", nil, r.updateAgentSettingList))
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/enable", nil, r.enableAgentList))
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/disable", nil, r.disableAgentList))
r.actions = append(r.actions, httpserver.NewAction("PUT", "/agentsettings/taint", nil, r.taintAgents))
/*-------------- agent setting ---------------*/

/*-------------- custom resource -----------------*/
Expand Down
56 changes: 47 additions & 9 deletions bcs-mesos/bcs-scheduler/src/manager/sched/backend/agent_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@ import (
comm "bk-bcs/bcs-common/common"
"bk-bcs/bcs-common/common/blog"
commtypes "bk-bcs/bcs-common/common/types"
"errors"
"encoding/json"
)

// DisableAgent setting agent unschedulable
func (b *backend) DisableAgent(IP string) error {

agent, err := b.store.FetchAgentSetting(IP)
if err != nil {
blog.Error("fetch agent setting(%s) from db fail:%s", IP, err.Error())
return err
}

blog.Infof("disable agent(%s)", IP)

if agent != nil {
Expand Down Expand Up @@ -66,6 +64,35 @@ func (b *backend) EnableAgent(IP string) error {
return b.store.SaveAgentSetting(&agentNew)
}

func (b *backend) TaintAgents(agents []*commtypes.BcsClusterAgentSetting) error {
for _, o := range agents {
agent, err := b.store.FetchAgentSetting(o.InnerIP)
if err != nil {
blog.Error("fetch agent setting(%s) from db fail:%s", o.InnerIP, err.Error())
return err
}
by, _ := json.Marshal(o.NoSchedule)
blog.Infof("taints agent(%s) %s", o.InnerIP, string(by))

if agent != nil {
agent.NoSchedule = o.NoSchedule
} else {
agent = &commtypes.BcsClusterAgentSetting{
InnerIP: o.InnerIP,
NoSchedule: o.NoSchedule,
}
}

err = b.store.SaveAgentSetting(agent)
if err != nil {
blog.Error("save agent(%s) in db fail:%s", o.InnerIP, err.Error())
return err
}
}

return nil
}

//QueryAgentSetting by IP address
func (b *backend) QueryAgentSetting(IP string) (*commtypes.BcsClusterAgentSetting, error) {

Expand Down Expand Up @@ -128,13 +155,24 @@ func (b *backend) DeleteAgentSettingList(IPs []string) (int, error) {
func (b *backend) SetAgentSettingList(agents []*commtypes.BcsClusterAgentSetting) (int, error) {

for _, agent := range agents {
err := b.store.SaveAgentSetting(agent)
o, err := b.store.FetchAgentSetting(agent.InnerIP)
if err != nil {
blog.Error("save agent setting(%s) [%+v] to zk err:%s", agent.InnerIP, agent, err.Error())
blog.Error("fetch agent setting(%s) from db fail:%s", agent.InnerIP, err.Error())
return comm.BcsErrCommCreateZkNodeFail, err
}
by, _ := json.Marshal(agent)
blog.Infof("set agent(%s) setting(%s)", agent.InnerIP, string(by))
if o == nil {
o = agent
} else {
o.AttrScalars = agent.AttrScalars
o.AttrStrings = agent.AttrStrings
}
err = b.store.SaveAgentSetting(o)
if err != nil {
blog.Error("save agent setting(%s) [%+v] to zk err:%s", o.InnerIP, o, err.Error())
return comm.BcsErrCommCreateZkNodeFail, err
}

blog.Infof("save agent setting(%s) [%+v] to zk", agent.InnerIP, agent)
}

return comm.BcsSuccess, nil
Expand Down Expand Up @@ -212,7 +250,7 @@ func (b *backend) EnableAgentList(IPs []string) (int, error) {
}

//UpdateAgentSettingList update agent setting by details
func (b *backend) UpdateAgentSettingList(update *commtypes.BcsClusterAgentSettingUpdate) (int, error) {
/*func (b *backend) UpdateAgentSettingList(update *commtypes.BcsClusterAgentSettingUpdate) (int, error) {
if len(update.IPs) <= 0 {
return comm.BcsErrCommRequestDataErr, errors.New("no ips to update")
Expand Down Expand Up @@ -298,4 +336,4 @@ func (b *backend) UpdateAgentSettingList(update *commtypes.BcsClusterAgentSettin
}
return comm.BcsSuccess, nil
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ type Backend interface {
EnableAgentList(IPs []string) (int, error)

//update user custom mesos slaves attributes
UpdateAgentSettingList(*commtypes.BcsClusterAgentSettingUpdate) (int, error)
//UpdateAgentSettingList(*commtypes.BcsClusterAgentSettingUpdate) (int, error)

//taints agent
TaintAgents([]*commtypes.BcsClusterAgentSetting) error

//custom resource register
RegisterCustomResource(*commtypes.Crr) error
Expand Down
24 changes: 24 additions & 0 deletions bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
typesplugin "bk-bcs/bcs-common/common/plugin"
commtype "bk-bcs/bcs-common/common/types"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/mesos"
"bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"container/list"
"encoding/json"
"fmt"
"golang.org/x/net/context"
"sync"
"time"
Expand Down Expand Up @@ -721,6 +723,28 @@ func (p *offerPool) addOfferAttributes(offer *mesos.Offer, agentSetting *commtyp
offer.Attributes = append(offer.Attributes, &attr)
}

//noSchedule, likes k8s Taints\Tolerations
name := types.MesosAttributeNoSchedule
t := mesos.Value_SET
noScheduleAttr := &mesos.Attribute{
Name: &name,
Type: &t,
Set: &mesos.Value_Set{
Item: make([]string, 0),
},
}
for k, v := range agentSetting.NoSchedule {
blog.V(3).Infof("offer(%s:%s) add noSchedule attribute(%s:%s) from agentsetting",
offer.GetId().GetValue(), offer.GetHostname(), k, v)
if k == "" || v == "" {
continue
}
noScheduleAttr.Set.Item = append(noScheduleAttr.Set.Item, fmt.Sprintf("%s=%s", k, v))
}
if len(noScheduleAttr.Set.Item) > 0 {
offer.Attributes = append(offer.Attributes, noScheduleAttr)
}

return nil
}

Expand Down
Loading

0 comments on commit 84c9755

Please sign in to comment.