diff --git a/haproxy/dataplane.go b/haproxy/dataplane.go index 3e0cd26..b542a19 100644 --- a/haproxy/dataplane.go +++ b/haproxy/dataplane.go @@ -30,19 +30,29 @@ type dataplaneClient struct { type tnx struct { txID string client *dataplaneClient + + after []func() error } -func (c *dataplaneClient) Tnx() (*tnx, error) { +func (c *dataplaneClient) Tnx() *tnx { + return &tnx{ + client: c, + } +} + +func (t *tnx) ensureTnx() error { + if t.txID != "" { + return nil + } res := models.Transaction{} - err := c.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/transactions?version=%d", c.version), nil, &res) + err := t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/transactions?version=%d", t.client.version), nil, &res) if err != nil { - return nil, err + return err } - return &tnx{ - txID: res.ID, - client: c, - }, nil + t.txID = res.ID + + return nil } func (c *dataplaneClient) Info() (*models.ProcessInfo, error) { @@ -64,57 +74,113 @@ func (c *dataplaneClient) Stats() (models.NativeStats, error) { } func (t *tnx) Commit() error { - err := t.client.makeReq(http.MethodPut, fmt.Sprintf("/v1/services/haproxy/transactions/%s", t.txID), nil, nil) - if err != nil { - return err + if t.txID != "" { + err := t.client.makeReq(http.MethodPut, fmt.Sprintf("/v1/services/haproxy/transactions/%s", t.txID), nil, nil) + if err != nil { + return err + } + + t.client.version++ } - t.client.version++ + for _, f := range t.after { + err := f() + if err != nil { + return err + } + } return nil } +func (t *tnx) After(fn func() error) { + t.after = append(t.after, fn) +} + func (t *tnx) CreateFrontend(fe models.Frontend) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/frontends?transaction_id=%s", t.txID), fe, nil) } func (t *tnx) DeleteFrontend(name string) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodDelete, fmt.Sprintf("/v1/services/haproxy/configuration/frontends/%s?transaction_id=%s", name, t.txID), nil, nil) } func (t *tnx) CreateBind(feName string, bind models.Bind) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/binds?frontend=%s&transaction_id=%s", feName, t.txID), bind, nil) } func (t *tnx) DeleteBackend(name string) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodDelete, fmt.Sprintf("/v1/services/haproxy/configuration/backends/%s?transaction_id=%s", name, t.txID), nil, nil) } func (t *tnx) CreateBackend(be models.Backend) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/backends?transaction_id=%s", t.txID), be, nil) } func (t *tnx) CreateServer(beName string, srv models.Server) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/servers?backend=%s&transaction_id=%s", beName, t.txID), srv, nil) } func (t *tnx) ReplaceServer(beName string, srv models.Server) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPut, fmt.Sprintf("/v1/services/haproxy/configuration/servers/%s?backend=%s&transaction_id=%s", srv.Name, beName, t.txID), srv, nil) } +func (c *dataplaneClient) ReplaceServer(beName string, srv models.Server) error { + err := c.makeReq(http.MethodPut, fmt.Sprintf("/v1/services/haproxy/configuration/servers/%s?backend=%s&version=%d", srv.Name, beName, c.version), srv, nil) + if err != nil { + return err + } + + c.version++ + return nil +} + func (t *tnx) DeleteServer(beName string, name string) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodDelete, fmt.Sprintf("/v1/services/haproxy/configuration/servers/%s?backend=%s&transaction_id=%s", name, beName, t.txID), nil, nil) } func (t *tnx) CreateFilter(parentType, parentName string, filter models.Filter) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/filters?parent_type=%s&parent_name=%s&transaction_id=%s", parentType, parentName, t.txID), filter, nil) } func (t *tnx) CreateTCPRequestRule(parentType, parentName string, rule models.TCPRequestRule) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/tcp_request_rules?parent_type=%s&parent_name=%s&transaction_id=%s", parentType, parentName, t.txID), rule, nil) } func (t *tnx) CreateLogTargets(parentType, parentName string, rule models.LogTarget) error { + if err := t.ensureTnx(); err != nil { + return err + } return t.client.makeReq(http.MethodPost, fmt.Sprintf("/v1/services/haproxy/configuration/log_targets?parent_type=%s&parent_name=%s&transaction_id=%s", parentType, parentName, t.txID), rule, nil) } diff --git a/haproxy/haproxy.go b/haproxy/haproxy.go index 1a8ec8c..a252542 100644 --- a/haproxy/haproxy.go +++ b/haproxy/haproxy.go @@ -27,14 +27,17 @@ type HAProxy struct { cfgC chan consul.Config currentCfg *consul.Config + upstreamServerSlots map[string][]upstreamSlot + haConfig *haConfig } func New(consulClient *api.Client, cfg chan consul.Config, opts Options) *HAProxy { return &HAProxy{ - opts: opts, - consulClient: consulClient, - cfgC: cfg, + opts: opts, + consulClient: consulClient, + cfgC: cfg, + upstreamServerSlots: make(map[string][]upstreamSlot), } } @@ -107,13 +110,10 @@ func (h *HAProxy) Run(sd *lib.Shutdown) error { } func (h *HAProxy) init() error { - tx, err := h.dataplaneClient.Tnx() - if err != nil { - return err - } + tx := h.dataplaneClient.Tnx() timeout := int64(30000) - err = tx.CreateBackend(models.Backend{ + err := tx.CreateBackend(models.Backend{ Name: "spoe_back", ServerTimeout: &timeout, ConnectTimeout: &timeout, @@ -137,12 +137,9 @@ func (h *HAProxy) init() error { } func (h *HAProxy) handleChange(cfg consul.Config) error { - tx, err := h.dataplaneClient.Tnx() - if err != nil { - return err - } + tx := h.dataplaneClient.Tnx() - err = h.handleDownstream(tx, cfg.Downstream) + err := h.handleDownstream(tx, cfg.Downstream) if err != nil { return err } diff --git a/haproxy/upstream.go b/haproxy/upstream.go index 95b5bea..60e126c 100644 --- a/haproxy/upstream.go +++ b/haproxy/upstream.go @@ -2,11 +2,18 @@ package haproxy import ( "fmt" + "math" "github.com/criteo/haproxy-consul-connect/consul" "github.com/haproxytech/models" + log "github.com/sirupsen/logrus" ) +type upstreamSlot struct { + consul.UpstreamNode + Enabled bool +} + func (h *HAProxy) deleteUpstream(tx *tnx, service string) error { feName := fmt.Sprintf("front_%s", service) beName := fmt.Sprintf("back_%s", service) @@ -23,10 +30,71 @@ func (h *HAProxy) deleteUpstream(tx *tnx, service string) error { return nil } -func (h *HAProxy) handleUpstream(tx *tnx, up consul.Upstream) error { +func (h *HAProxy) createUpstream(tx *tnx, up consul.Upstream) error { feName := fmt.Sprintf("front_%s", up.Service) beName := fmt.Sprintf("back_%s", up.Service) + timeout := int64(1000) + err := tx.CreateFrontend(models.Frontend{ + Name: feName, + DefaultBackend: beName, + ClientTimeout: &timeout, + Mode: models.FrontendModeHTTP, + Httplog: true, + }) + if err != nil { + return err + } + logID := int64(0) + err = tx.CreateLogTargets("frontend", feName, models.LogTarget{ + ID: &logID, + Address: h.haConfig.LogsSock, + Facility: models.LogTargetFacilityLocal0, + Format: models.LogTargetFormatRfc5424, + }) + if err != nil { + return err + } + + port := int64(up.LocalBindPort) + err = tx.CreateBind(feName, models.Bind{ + Name: fmt.Sprintf("%s_bind", feName), + Address: up.LocalBindAddress, + Port: &port, + }) + if err != nil { + return err + } + + err = tx.CreateBackend(models.Backend{ + Name: beName, + ServerTimeout: &timeout, + ConnectTimeout: &timeout, + Balance: &models.Balance{ + Algorithm: models.BalanceAlgorithmLeastconn, + }, + Mode: models.BackendModeHTTP, + }) + if err != nil { + return err + } + logID = int64(0) + err = tx.CreateLogTargets("backend", beName, models.LogTarget{ + ID: &logID, + Address: h.haConfig.LogsSock, + Facility: models.LogTargetFacilityLocal0, + Format: models.LogTargetFormatRfc5424, + }) + if err != nil { + return err + } + + return nil +} + +func (h *HAProxy) handleUpstream(tx *tnx, up consul.Upstream) error { + beName := fmt.Sprintf("back_%s", up.Service) + var current *consul.Upstream if h.currentCfg != nil { for _, u := range h.currentCfg.Upstreams { @@ -39,79 +107,12 @@ func (h *HAProxy) handleUpstream(tx *tnx, up consul.Upstream) error { backendDeleted := false if current != nil && !current.Equal(up) { - err := tx.DeleteFrontend(feName) - if err != nil { - return err - } - err = tx.DeleteBackend(beName) - if err != nil { - return err - } + h.deleteUpstream(tx, up.Service) backendDeleted = true } if backendDeleted || current == nil { - timeout := int64(1000) - err := tx.CreateFrontend(models.Frontend{ - Name: feName, - DefaultBackend: beName, - ClientTimeout: &timeout, - Mode: models.FrontendModeHTTP, - Httplog: true, - }) - if err != nil { - return err - } - logID := int64(0) - err = tx.CreateLogTargets("frontend", feName, models.LogTarget{ - ID: &logID, - Address: h.haConfig.LogsSock, - Facility: models.LogTargetFacilityLocal0, - Format: models.LogTargetFormatRfc5424, - }) - if err != nil { - return err - } - - port := int64(up.LocalBindPort) - err = tx.CreateBind(feName, models.Bind{ - Name: fmt.Sprintf("%s_bind", feName), - Address: up.LocalBindAddress, - Port: &port, - }) - if err != nil { - return err - } - - err = tx.CreateBackend(models.Backend{ - Name: beName, - ServerTimeout: &timeout, - ConnectTimeout: &timeout, - Balance: &models.Balance{ - Algorithm: models.BalanceAlgorithmLeastconn, - }, - Mode: models.BackendModeHTTP, - }) - if err != nil { - return err - } - logID = int64(0) - err = tx.CreateLogTargets("backend", beName, models.LogTarget{ - ID: &logID, - Address: h.haConfig.LogsSock, - Facility: models.LogTargetFacilityLocal0, - Format: models.LogTargetFormatRfc5424, - }) - if err != nil { - return err - } - } - - currentServers := map[string]consul.UpstreamNode{} - if !backendDeleted && current != nil { - for _, b := range current.Nodes { - currentServers[fmt.Sprintf("%s:%d", b.Host, b.Port)] = b - } + h.createUpstream(tx, up) } certPath, caPath, err := h.haConfig.CertsPath(up.TLS) @@ -119,51 +120,105 @@ func (h *HAProxy) handleUpstream(tx *tnx, up consul.Upstream) error { return err } - newServers := map[string]consul.UpstreamNode{} - for _, srv := range up.Nodes { - id := fmt.Sprintf("%s:%d", srv.Host, srv.Port) - newServers[id] = srv + one := int64(1) + disabledServer := models.Server{ + Address: "127.0.0.1", + Port: &one, + Weight: &one, + Ssl: models.ServerSslEnabled, + SslCertificate: certPath, + SslCafile: caPath, + Maintenance: models.ServerMaintenanceEnabled, + } + + serverSlots := h.upstreamServerSlots[up.Service] + if len(serverSlots) < len(up.Nodes) { + serverCount := int(math.Pow(2, math.Ceil(math.Log(float64(len(up.Nodes)))/math.Log(2)))) + log.Infof("increasing upstreams %s server pool size to %d", up.Service, serverCount) + newServerSlots := make([]upstreamSlot, serverCount) + copy(newServerSlots, serverSlots) + + for i := len(serverSlots); i < len(newServerSlots); i++ { + srv := disabledServer + srv.Name = fmt.Sprintf("srv_%d", i) + err := tx.CreateServer(beName, srv) + if err != nil { + return err + } + } - currentSrv, currentExists := currentServers[id] + serverSlots = newServerSlots + } - changed := currentExists && currentSrv.Weight != srv.Weight - if !changed && currentExists { + for i, slot := range serverSlots { + if slot.Host == "" { continue } - f := tx.CreateServer - if changed { - f = tx.ReplaceServer + found := false + for _, n := range up.Nodes { + if slot.Enabled && n.Equal(slot.UpstreamNode) { + found = true + break + } } - - fmt.Println(id, srv.Port) - port := int64(srv.Port) - weight := int64(srv.Weight) - serverDef := models.Server{ - Name: id, - Address: srv.Host, - Port: &port, - Weight: &weight, - Ssl: models.ServerSslEnabled, - SslCertificate: certPath, - SslCafile: caPath, + if found { + continue } - err := f(beName, serverDef) - if err != nil { - return err - } + (func(i int) { + tx.After(func() error { + srv := disabledServer + srv.Name = fmt.Sprintf("srv_%d", i) + + return h.dataplaneClient.ReplaceServer(beName, srv) + }) + })(i) + serverSlots[i].Enabled = false } - for current := range currentServers { - _, ok := newServers[current] - if !ok { - err := tx.DeleteServer(beName, current) - if err != nil { - return err +Next: + for _, node := range up.Nodes { + for _, s := range serverSlots { + if s.Enabled && node.Equal(s.UpstreamNode) { + continue Next } } + + for i, slot := range serverSlots { + if slot.Host != "" { + continue + } + + (func(i int, node consul.UpstreamNode) { + port := int64(node.Port) + weight := int64(node.Weight) + tx.After(func() error { + return h.dataplaneClient.ReplaceServer(beName, models.Server{ + Name: fmt.Sprintf("srv_%d", i), + Address: node.Host, + Port: &port, + Weight: &weight, + Ssl: models.ServerSslEnabled, + SslCertificate: certPath, + SslCafile: caPath, + Maintenance: models.ServerMaintenanceDisabled, + }) + }) + })(i, node) + + serverSlots[i] = upstreamSlot{ + UpstreamNode: node, + Enabled: true, + } + break + } } + tx.After(func() error { + h.upstreamServerSlots[up.Service] = serverSlots + return nil + }) + return nil }