diff --git a/agreementbot/agreementworker.go b/agreementbot/agreementworker.go index 5d83cfd55..ce1b28328 100644 --- a/agreementbot/agreementworker.go +++ b/agreementbot/agreementworker.go @@ -1077,11 +1077,19 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler, objPolicies := b.mmsObjMgr.GetObjectPolicies(agreement.Org, serviceNamePieces[0], serviceNamePieces[2], serviceNamePieces[1]) destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0) - if addedToList, _, err := AssignObjectToNodes(b, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, false); err != nil { + destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0) + + if err := AssignObjectToNodes(b, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, destsToDeleteMap, nil, false); err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to assign object(s) to node %v, error %v", agreement.DeviceId, err))) - } else if addedToList { + } + + if len(destsToAddMap) > 0 { AddDestinationsForObjects(b, destsToAddMap) } + if len(destsToDeleteMap) > 0 { + DeleteDestinationsForObjects(b, destsToDeleteMap) + } + } } else if b.GetCSSURL() == "" { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("unable to evaluate object placement because there is no CSS URL configured in this agbot"))) diff --git a/agreementbot/policy.go b/agreementbot/policy.go index 87033f3ac..89ae5d538 100644 --- a/agreementbot/policy.go +++ b/agreementbot/policy.go @@ -15,19 +15,33 @@ import ( "strings" ) +const DESTINATIONS_LOG_LIMIT = 50 + // This function is called when an object is ready to be deployed to a node. It will perform the policy compatibility test -// if necessary and will then update the object's destination list in the CSS. -func AssignObjectToNodes(ec exchange.ExchangeContext, objPolicies *exchange.ObjectDestinationPolicies, nodeId string, nodePolicy *policy.Policy, destsToAddMap map[string]*exchange.ObjectDestinationsToAdd, knownCompatible bool) (bool, map[string]*exchange.ObjectDestinationsToAdd, error) { +// if necessary and will then update the object's destination list (either adding or removing) in the CSS. If node is +// appropriately in or out of the destination map already, no action will be taken. +func AssignObjectToNodes(ec exchange.ExchangeContext, + objPolicies *exchange.ObjectDestinationPolicies, + nodeId string, + nodePolicy *policy.Policy, + destsToAddMap map[string]*exchange.ObjectDestinationsToAdd, + destsToDeleteMap map[string]*exchange.ObjectDestinationsToDelete, + destNodesMap map[string][]string, + knownCompatible bool) error { + if len(*objPolicies) == 0 { - return false, destsToAddMap, nil + return nil } getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec) + getObjDestHandler := exchange.GetHTTPObjectDestinationQueryHandler(ec) // For each object policy received, make sure the object is still valid, evaluate it against the node policy if necessary, // and then update the object's destination list. for _, objPol := range *objPolicies { + objectKey := getObjectKey(objPol.OrgID, objPol.ObjectType, objPol.ObjectID) + if obj, err := getObjectHandler(objPol.OrgID, objPol.ObjectID, objPol.ObjectType); err != nil { glog.Errorf(opLogstring(fmt.Sprintf("error reading object %v %v %v, %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err))) } else if obj == nil { @@ -35,6 +49,26 @@ func AssignObjectToNodes(ec exchange.ExchangeContext, objPolicies *exchange.Obje continue } + currentDestNodes := make([]string, 0) + + // When called from HandleMMSObjectPolicy, caller will pass in the destinations list to avoid retrieving it multiple times. Other callers will depend on this function to get the current list + if destNodesMap != nil { + currentDestNodes = destNodesMap[objectKey] + } else { + + currentObjDestinations := new(exchange.ObjectDestinationStatuses) + // Grab the current destinations of the object. + if dests, err := getObjDestHandler(objPol.OrgID, objPol.ObjectID, objPol.ObjectType); err != nil { + glog.Errorf(opLogstring(fmt.Sprintf("error reading object %v %v %v destinations, %v", objPol.OrgID, objPol.ObjectID, objPol.ObjectType, err))) + } else if dests != nil { + currentObjDestinations = dests + } + // Construct a list of destinations where the object currently lives + for _, destStatus := range *currentObjDestinations { + currentDestNodes = append(currentDestNodes, destStatus.DestID) + } + } + // The caller might have already done the compatibility test. if !knownCompatible { if glog.V(5) { @@ -56,33 +90,48 @@ func AssignObjectToNodes(ec exchange.ExchangeContext, objPolicies *exchange.Obje // properties plus service policy properties in the model policy properties. nodePolicy.Constraints = []string{} - // Check if node and model polices are compatible. Incompatible policies are not necessarily an error so just log a warning and return. + // Check if node and model polices are compatible. Incompatible policies are not necessarily an error so just log a warning. + // If the node is in the destination list, the return code will indicate to remove it and then return. if err := policy.Are_Compatible(nodePolicy, internalObjPol, nil); err != nil { glog.Warningf(opLogstring(fmt.Sprintf("error matching node policy %v and object policy %v, error: %v", nodePolicy, internalObjPol, err))) - return false, destsToAddMap, nil + + // If it was in the destination list, need to remove it + if cutil.SliceContains(currentDestNodes, exchange.GetId(nodeId)) { + UnassignObjectFromNodes(ec, &objPol, nodeId, destsToDeleteMap) + } + + continue } else { glog.V(3).Infof(opLogstring(fmt.Sprintf("node %v is compatible with object %v/%v with type %v", nodeId, objPol.OrgID, objPol.ObjectID, objPol.ObjectType))) } } - // Policies are compatible so add this node to destination list for the object. - dest := "openhorizon.edgenode:" + exchange.GetId(nodeId) - if glog.V(5) { - glog.Infof(opLogstring(fmt.Sprintf("adding node %v to destination list for object %v:%v:%v", dest, objPol.OrgID, objPol.ObjectType, objPol.ObjectID))) - } + // Policies are compatible so add this node to destination list for the object if it is not currently there. + found := cutil.SliceContains(currentDestNodes, exchange.GetId(nodeId)) - objKey := getObjectKey(objPol.OrgID, objPol.ObjectType, objPol.ObjectID) - if _, ok := destsToAddMap[objKey]; !ok { - destsToAdd := new(exchange.ObjectDestinationsToAdd) - (*destsToAdd) = append((*destsToAdd), dest) - destsToAddMap[objKey] = destsToAdd + if found { + if glog.V(5) { + glog.Infof(opLogstring(fmt.Sprintf("node %v already found in destination list", exchange.GetId(nodeId)))) + } } else { - destsToAdd := destsToAddMap[objKey] - (*destsToAdd) = append((*destsToAdd), dest) + + dest := "openhorizon.edgenode:" + exchange.GetId(nodeId) + if glog.V(5) { + glog.Infof(opLogstring(fmt.Sprintf("adding node %v to destination list for object %v:%v:%v", dest, objPol.OrgID, objPol.ObjectType, objPol.ObjectID))) + } + if _, ok := destsToAddMap[objectKey]; !ok { + destsToAdd := new(exchange.ObjectDestinationsToAdd) + (*destsToAdd) = append((*destsToAdd), dest) + destsToAddMap[objectKey] = destsToAdd + } else { + destsToAdd := destsToAddMap[objectKey] + (*destsToAdd) = append((*destsToAdd), dest) + } } } - return true, destsToAddMap, nil + + return nil } // This function is called to remove an object from a node. It is assumed that the caller has already done the @@ -107,14 +156,42 @@ func UnassignObjectFromNodes(ec exchange.ExchangeContext, objPol *exchange.Objec return nil } +// Send a bulk add/delete to CSS if batch limit size reached +func (w *BaseAgreementWorker) CheckDestListBulkBatchSize(destsToAddMap map[string]*exchange.ObjectDestinationsToAdd, destsToDeleteMap map[string]*exchange.ObjectDestinationsToDelete, CSSDestinationBatchSize int) (map[string]*exchange.ObjectDestinationsToAdd, map[string]*exchange.ObjectDestinationsToDelete) { + // bulk Add and Remove destinations if limit reached + addDestLimitReached := false + for _, destsToAddTmp := range destsToAddMap { + if len(*destsToAddTmp) > CSSDestinationBatchSize { + addDestLimitReached = true + } + } + if addDestLimitReached { + AddDestinationsForObjects(w, destsToAddMap) + destsToAddMap = make(map[string]*exchange.ObjectDestinationsToAdd, 0) + } + deleteDestLimitReached := false + for _, destsToDeleteTmp := range destsToDeleteMap { + if len(*destsToDeleteTmp) > CSSDestinationBatchSize { + deleteDestLimitReached = true + } + } + if deleteDestLimitReached { + DeleteDestinationsForObjects(w, destsToDeleteMap) + destsToDeleteMap = make(map[string]*exchange.ObjectDestinationsToDelete, 0) + } + + return destsToAddMap, destsToDeleteMap +} + func AddDestinationsForObjects(ec exchange.ExchangeContext, destsToAddMap map[string]*exchange.ObjectDestinationsToAdd) { glog.V(3).Infof(opLogstring(fmt.Sprintf("Start to call CSS to add destinations"))) postDestHandler := exchange.GetHTTPAddOrRemoveObjectDestinationHandler(ec) for key, destsToAdd := range destsToAddMap { objOrg, objType, objID := extractObjectKey(key) - glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations for object %v of type %v", len(*destsToAdd), objID, objType))) - if glog.V(3) && len(*destsToAdd) < 50 { - glog.Infof(opLogstring(fmt.Sprintf("Added destinations: %v", *destsToAdd))) + if len(*destsToAdd) < DESTINATIONS_LOG_LIMIT { + glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations %v for object %v of type %v", len(*destsToAdd), *destsToAdd, objID, objType))) + } else { + glog.V(3).Infof(opLogstring(fmt.Sprintf("adding %d destinations %v... for object %v of type %v", len(*destsToAdd), (*destsToAdd)[:DESTINATIONS_LOG_LIMIT], objID, objType))) } postDestRequest := exchange.PostDestsRequest{ @@ -138,9 +215,10 @@ func DeleteDestinationsForObjects(ec exchange.ExchangeContext, destsToDeleteMap getObjectHandler := exchange.GetHTTPObjectQueryHandler(ec) for key, destsToDelete := range destsToDeleteMap { objOrg, objType, objID := extractObjectKey(key) - glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations for object %v of type %v", len(*destsToDelete), objID, objType))) - if glog.V(3) && len(*destsToDelete) < 50 { - glog.Infof(opLogstring(fmt.Sprintf("Deleted destinations: %v", *destsToDelete))) + if len(*destsToDelete) < DESTINATIONS_LOG_LIMIT { + glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations %v for object %v of type %v", len(*destsToDelete), *destsToDelete, objID, objType))) + } else { + glog.V(3).Infof(opLogstring(fmt.Sprintf("deleting %d destinations %v... for object %v of type %v", len(*destsToDelete), (*destsToDelete)[:DESTINATIONS_LOG_LIMIT], objID, objType))) } postDestRequest := exchange.PostDestsRequest{ @@ -205,16 +283,28 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler, glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy NewPolicy: %v", newPolicy))) } + CSSDestinationBatchSize := w.config.AgreementBot.CSSDestinationBatchSize + // Construct a list of destinations where the object currently lives. These will be in the policy update (the new policy). - destNodes := make([]string, 0, 5) + destNodes := make([]string, 0, len(newPolicy.Destinations)) for _, dest := range newPolicy.Destinations { destNodes = append(destNodes, dest.DestID) } if glog.V(5) { - glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes: %v", destNodes))) + // Avoid building very long log messages + if len(destNodes) < DESTINATIONS_LOG_LIMIT { + glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes: %v", destNodes))) + } else { + glog.Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes has length %v", len(destNodes)))) + } } + // Key: {objOrg}:{objType}:{objID}, value: list of current destinations + destNodesMap := make(map[string][]string, 1) + mapDestNodesKey := getObjectKey(newPolicy.OrgID, newPolicy.ObjectType, newPolicy.ObjectID) + destNodesMap[mapDestNodesKey] = destNodes + inProgress := func() persistence.AFilter { return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 } } @@ -272,6 +362,7 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler, // Key: {objOrg}:{objType}:{objID}, value: list of destinations to add/delete destsToAddMap := make(map[string]*exchange.ObjectDestinationsToAdd, 0) destsToDeleteMap := make(map[string]*exchange.ObjectDestinationsToDelete, 0) + for _, agreement := range agreements { // if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy @@ -287,18 +378,15 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler, // if agreement's node's policy is compatible with new object policy // add the agreement's node to object's destination list to bulk add - addedToList, _, err := AssignObjectToNodes(w, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, false) + err := AssignObjectToNodes(w, objPolicies, agreement.DeviceId, nodePolicy, destsToAddMap, destsToDeleteMap, destNodesMap, false) if err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) - } else if !addedToList { - // else - // add the agreement's node to destination list to bulk delete - err := UnassignObjectFromNodes(w, &newPolicy, agreement.DeviceId, destsToDeleteMap) - if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) - } } } + + // bulk Add and Remove destinations if limit reached + destsToAddMap, destsToDeleteMap = w.CheckDestListBulkBatchSize(destsToAddMap, destsToDeleteMap, CSSDestinationBatchSize) + continue } @@ -332,6 +420,8 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler, if err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) } + // bulk Add and Remove destinations if limit reached + destsToAddMap, destsToDeleteMap = w.CheckDestListBulkBatchSize(destsToAddMap, destsToDeleteMap, CSSDestinationBatchSize) } else { // else // nothing to do, assume that the agbot which owns the agreement for the other services will handle this same policy change event appropriately diff --git a/config/config.go b/config/config.go index 5def53437..2bb0b807f 100644 --- a/config/config.go +++ b/config/config.go @@ -126,6 +126,7 @@ type AGConfig struct { PolicySearchOrder bool // When true, search policies from most recently changed to least recently changed. Vault VaultConfig // The hashicorp vault config to connect to and fetch secrets from. SecretsUpdateCheck int // The number of seconds between checks for updated secrets. + CSSDestinationBatchSize int // The max number of destination updates to send to CSS in a single update. } // Contains the hashicorp vault configuration used within AGConfig. @@ -370,16 +371,17 @@ func Read(file string) (*HorizonConfig, error) { K8sCRInstallTimeoutS: K8sCRInstallTimeoutS_DEFAULT, }, AgreementBot: AGConfig{ - MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT, - AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, - AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, - MessageQueueScale: AgbotMessageQueueScale_DEFAULT, - QueueHistorySize: AgbotQueueHistorySize_DEFAULT, - FullRescanS: AgbotFullRescan_DEFAULT, - MaxExchangeChanges: AgbotMaxChanges_DEFAULT, - RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, - PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT, - SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT, + MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT, + AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, + AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, + MessageQueueScale: AgbotMessageQueueScale_DEFAULT, + QueueHistorySize: AgbotQueueHistorySize_DEFAULT, + FullRescanS: AgbotFullRescan_DEFAULT, + MaxExchangeChanges: AgbotMaxChanges_DEFAULT, + RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, + PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT, + SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT, + CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT, }, } @@ -554,6 +556,7 @@ func (agc *AGConfig) String() string { ", CheckUpdatedPolicyS: %v"+ ", CSSURL: %v"+ ", CSSSSLCert: %v"+ + ", CSSDestinationBatchSize: %v"+ ", AgreementBatchSize: %v"+ ", AgreementQueueSize: %v"+ ", MessageQueueScale: %v"+ @@ -569,7 +572,7 @@ func (agc *AGConfig) String() string { agc.IgnoreContractWithAttribs, agc.ExchangeURL, agc.ExchangeHeartbeat, agc.ExchangeId, mask, agc.DVPrefix, agc.ActiveDeviceTimeoutS, agc.ExchangeMessageTTL, agc.MessageKeyPath, mask, agc.APIListen, agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey, - agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.AgreementBatchSize, + agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize, agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.MaxExchangeChanges, agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault) } diff --git a/config/constants.go b/config/constants.go index 6bc7c8414..db262e687 100644 --- a/config/constants.go +++ b/config/constants.go @@ -119,3 +119,6 @@ const K8sCRInstallTimeoutS_DEFAULT = 180 // Time between secret update checks const SecretsUpdateCheck_DEFAULT = 60 + +// Batch destination size to send to CSS +const AgbotCSSDestinationBatchSize_DEFAULT = 200 diff --git a/exchange/css.go b/exchange/css.go index fb7ef6908..e81c8a66a 100644 --- a/exchange/css.go +++ b/exchange/css.go @@ -14,8 +14,6 @@ import ( // so that we can use our types when demarhsalling them, which enables us to perform compatibility checks // using these policies. -const DestinationsSizeLogLimit = 50 - type DestinationPolicy struct { // Properties is the set of properties for a particular policy Properties externalpolicy.PropertyList `json:"properties" bson:"properties"` @@ -57,10 +55,12 @@ type ObjectDestinationPolicy struct { } func (d ObjectDestinationPolicy) String() string { - if len(d.Destinations) < DestinationsSizeLogLimit { - return fmt.Sprintf("Object Destination Policy: Org %v, Type %v, ID %v, %v, Destinations %v", d.OrgID, d.ObjectType, d.ObjectID, d.DestinationPolicy, d.Destinations) + length := len(d.Destinations) + return_str := fmt.Sprintf("Object Destination Policy: Org %v, Type %v, ID %v, %v, Destinations (length %d) ", d.OrgID, d.ObjectType, d.ObjectID, d.DestinationPolicy, length) + if length < 50 { + return return_str + fmt.Sprintf("%v", d.Destinations) } else { - return fmt.Sprintf("Object Destination Policy: Org %v, Type %v, ID %v, %v, Destinations size %v", d.OrgID, d.ObjectType, d.ObjectID, d.DestinationPolicy, len(d.Destinations)) + return return_str + fmt.Sprintf("%v ... %v", d.Destinations[:25], d.Destinations[length-25:length]) } } @@ -189,10 +189,11 @@ func AddOrRemoveDestinations(ec ExchangeContext, org string, objType string, obj } } else { if glog.V(5) { - if len(postDestsRequest.Destinations) > DestinationsSizeLogLimit { - glog.Infof(rpclogString(fmt.Sprintf("%s destinations for object %v of type %v with length of destinations %v", postDestsRequest.Action, objID, objType, len(postDestsRequest.Destinations)))) + if len(postDestsRequest.Destinations) < 50 { + glog.Infof(rpclogString(fmt.Sprintf("%s destinations for object %v of type %v with {%v}", postDestsRequest.Action, objID, objType, postDestsRequest.Destinations))) } else { - glog.Infof(rpclogString(fmt.Sprintf("%s destinations for object %v of type %v with %v", postDestsRequest.Action, objID, objType, postDestsRequest.Destinations))) + length := len(postDestsRequest.Destinations) + glog.Infof(rpclogString(fmt.Sprintf("%s destinations for object %v of type %v with %v ... %v", postDestsRequest.Action, objID, objType, postDestsRequest.Destinations[:25], postDestsRequest.Destinations[length-25:length]))) } } return nil @@ -230,10 +231,14 @@ func GetObject(ec ExchangeContext, org string, objID string, objType string) (*c } else { objMeta := resp.(*common.MetaData) if objMeta.ObjectID != "" { - glog.V(5).Infof(rpclogString(fmt.Sprintf("found object %v %v for org %v: %v", objID, objType, org, objMeta))) + if glog.V(5) { + glog.Infof(rpclogString(fmt.Sprintf("found object %v %v for org %v: %v", objID, objType, org, objMeta))) + } return objMeta, nil } else { - glog.V(5).Infof(rpclogString(fmt.Sprintf("object %v %v for org %v not found", objID, objType, org))) + if glog.V(5) { + glog.Infof(rpclogString(fmt.Sprintf("object %v %v for org %v not found", objID, objType, org))) + } return nil, nil } } @@ -271,10 +276,10 @@ func GetObjectDestinations(ec ExchangeContext, org string, objID string, objType dests := resp.(*ObjectDestinationStatuses) if len(*dests) != 0 { if glog.V(5) { - if len(*(dests)) > DestinationsSizeLogLimit { - glog.Infof(rpclogString(fmt.Sprintf("found destinations for %v %v %v: length of %v", org, objID, objType, len(*(dests))))) - } else { + if len(*dests) < 50 { glog.Infof(rpclogString(fmt.Sprintf("found destinations for %v %v %v: %v", org, objID, objType, dests))) + } else { + glog.Infof(rpclogString(fmt.Sprintf("found %d destinations for %v %v %v", len(*dests), org, objID, objType))) } } return dests, nil