Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue open-horizon#4146 - Bug: Searching business policies will stop … #4153

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions agreementbot/node_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ type NodeSearch struct {
ec exchange.ExchangeContext
msgs chan events.Message // Outgoing internal event messages are placed here.
nextScanIntervalS uint64 // The interval between scans when there are changes in the system. It allows the system to process existing work before injecting new agreements.
errRecanIntervalS uint64 // The interval between scans if an error occurred during the last scan
fullRescanIntervalS uint64 // The interval between scans when there are NOT changes in the system. This is a safety net in case changes are missed.
lastSearchHasErr bool // Indicates that an error happened during the previous round of node search
lastSearchComplete bool
lastSearchTime uint64
searchThread chan bool
Expand All @@ -44,7 +46,9 @@ type NodeSearch struct {
func NewNodeSearch() *NodeSearch {
ns := &NodeSearch{
nextScanIntervalS: 0,
errRecanIntervalS: 0,
fullRescanIntervalS: 0,
lastSearchHasErr: false,
lastSearchComplete: true,
lastSearchTime: 0,
searchThread: make(chan bool, 10),
Expand All @@ -64,6 +68,7 @@ func (n *NodeSearch) Init(db persistence.AgbotDatabase, pm *policy.PolicyManager
n.msgs = msgs
n.ec = ec
n.nextScanIntervalS = cfg.AgreementBot.NewContractIntervalS
n.errRecanIntervalS = cfg.GetAgbotErrReschanInterval()
n.fullRescanIntervalS = cfg.GetAgbotFullRescan()
n.batchSize = cfg.GetAgbotAgreementBatchSize()
n.activeDeviceTimeoutS = cfg.AgreementBot.ActiveDeviceTimeoutS
Expand Down Expand Up @@ -111,6 +116,26 @@ func (n *NodeSearch) IsRescanNeeded() bool {
return n.rescanNeeded
}

// setLastSearchHasErr records that an error happened during the last node scan.
// The flag is written while holding rescanLock.
func (n *NodeSearch) setLastSearchHasErr() {
	n.rescanLock.Lock()
	n.lastSearchHasErr = true
	n.rescanLock.Unlock()
}

// UnsetLastSearchHasErr clears the flag that marks the last node scan as having
// hit an error. The flag is written while holding rescanLock.
func (n *NodeSearch) UnsetLastSearchHasErr() {
	n.rescanLock.Lock()
	n.lastSearchHasErr = false
	n.rescanLock.Unlock()
}

// LastSearchHasErr reports whether an error happened during the last node scan.
// The flag is read while holding rescanLock, so this function is thread safe.
func (n *NodeSearch) LastSearchHasErr() bool {
	n.rescanLock.Lock()
	hadErr := n.lastSearchHasErr
	n.rescanLock.Unlock()
	return hadErr
}

// This is the main driving function in this object. It will initiate a node scan if needed, using an existing search session or obtaining a new one if needed.
// The actual processing of a node scan for all policies and patterns is performed on a sub-thread. This function also handles updating
// itself if a previous scan has completed since the last time this method was called.
Expand All @@ -136,7 +161,7 @@ func (n *NodeSearch) Scan() {
go n.findAndMakeAgreements()
}

// If changes in the system have occurred such that a rescan is needed, start a scan now.
// If changes in the system have occurred such that a rescan is needed, start a scan now. nextScanIntervalS is set to 1 by default
if n.lastSearchComplete && n.IsRescanNeeded() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.nextScanIntervalS)) {
n.lastSearchTime = uint64(time.Now().Unix())
glog.V(3).Infof(AWlogString("Polling Exchange"))
Expand All @@ -145,6 +170,14 @@ func (n *NodeSearch) Scan() {
go n.findAndMakeAgreements()
}

// If error happens and rescan is needed, start a scan
if n.lastSearchComplete && n.LastSearchHasErr() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.errRecanIntervalS)) {
n.lastSearchTime = uint64(time.Now().Unix())
glog.V(3).Infof(AWlogString("Polling Exchange (recover)"))
n.lastSearchComplete = false
n.UnsetLastSearchHasErr()
go n.findAndMakeAgreements()
}
}

// Go through all the patterns and deployment policies and make agreements. This function runs on a sub-thread of the agbot
Expand All @@ -155,6 +188,10 @@ func (n *NodeSearch) findAndMakeAgreements() {
glog.Errorf(AWlogString(fmt.Sprintf("unable to dump search session records, error: %v", err)))
}

if n.LastSearchHasErr() {
n.UnsetLastSearchHasErr()
}

// Errors encountered during the search will cause the next set of searches to be performed with the same changedSince
// time and the same search session.
searchError := false
Expand Down Expand Up @@ -201,7 +238,7 @@ func (n *NodeSearch) findAndMakeAgreements() {
_, polName := cutil.SplitOrgSpecUrl(consumerPolicy.Header.Name)

// Get the hash of the business policy entry
pBE_hash := string(pBE.Hash)
pBE_hash := org + string(pBE.Hash)

//Check to see if we have already searched this policy... makes sure we check other policies before circling back and repeating re-searching ones we already did
_, ok := n.completedSearches[pBE_hash] // Use the hash since the policy may get updated which would change the hash but not the name. Do want to search changed/new policies
Expand All @@ -210,6 +247,7 @@ func (n *NodeSearch) findAndMakeAgreements() {
if lastPage, err := n.searchNodesAndMakeAgreements(&consumerPolicy, org, polName, pBE.Updated); err != nil {
// Dont move the changed since time forward since there was an error.
searchError = true
glog.Errorf(AWlogString(fmt.Sprintf("received error searching for nodes under org %v: %v", org, err)))
break
} else if !lastPage {
// The search returned a large number of results that need to be processed. Let the system work on them
Expand All @@ -229,14 +267,11 @@ func (n *NodeSearch) findAndMakeAgreements() {
}

}
if searchError {
break
}
}

// Done scanning all nodes across all policies, and no errors were encountered.
if searchError {
n.SetRescanNeeded()
n.setLastSearchHasErr()
}

if doClearSearchedMap {
Expand Down Expand Up @@ -507,7 +542,6 @@ func (n *NodeSearch) searchExchange(pol *policy.Policy, polOrg string, polName s
for {

glog.V(3).Infof(AWlogString(fmt.Sprintf("searching %v with %v", pol.Header.Name, ser)))

// Invoke the exchange and return the device list or any hard errors that occur.
resp, err := exchange.GetHTTPAgbotPolicyNodeSearchHandler(n.ec)(&ser, polOrg, polName)
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type AGConfig struct {
AgreementQueueSize uint64 // The agreement bot work queue max size.
MessageQueueScale float64 // Scaling factor applied to the AgreementQueueSize when determining how deep to keep the queues.
QueueHistorySize int // The number of statistics records to retain in the prioritized queue history.
ErrRescanS uint64 // The number of seconds between rescan if error occurs from last rescan
FullRescanS uint64 // The number of seconds between policy scans when there have been no changes reported by the exchange.
MaxExchangeChanges int // The maximum number of exchange changes to request on a given call the exchange /changes API.
RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried.
Expand Down Expand Up @@ -222,6 +223,10 @@ func (c *HorizonConfig) GetAgbotQueueHistorySize() int {
return c.AgreementBot.QueueHistorySize
}

// GetAgbotErrReschanInterval returns the agbot's ErrRescanS setting: the number of seconds
// between rescans when an error occurred during the last scan.
func (c *HorizonConfig) GetAgbotErrReschanInterval() uint64 {
return c.AgreementBot.ErrRescanS
}

// GetAgbotFullRescan returns the agbot's FullRescanS setting: the number of seconds between
// policy scans when there have been no changes reported by the exchange.
func (c *HorizonConfig) GetAgbotFullRescan() uint64 {
return c.AgreementBot.FullRescanS
}
Expand Down Expand Up @@ -397,6 +402,7 @@ func Read(file string) (*HorizonConfig, error) {
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
ErrRescanS: AgbotErrRescan_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
Expand Down Expand Up @@ -585,6 +591,7 @@ func (agc *AGConfig) String() string {
", MessageQueueScale: %v"+
", QueueHistorySize: %v"+
", FullRescanS: %v"+
", ErrRescanS: %v"+
", MaxExchangeChanges: %v"+
", RetryLookBackWindow: %v"+
", PolicySearchOrder: %v"+
Expand All @@ -596,7 +603,7 @@ func (agc *AGConfig) String() string {
mask, agc.DVPrefix, agc.ActiveDeviceTimeoutS, agc.ExchangeMessageTTL, agc.MessageKeyPath, mask, agc.APIListen,
agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey,
agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.MaxExchangeChanges,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges,
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault)
}

Expand Down
3 changes: 3 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ const AgbotMessageQueueScale_DEFAULT = 33.0
// The default number of prioritized queue history records to keep before aging out the old ones.
const AgbotQueueHistorySize_DEFAULT = 30

// The default interval, in seconds, between rescans when an error happened during the previous node search.
// Used as the default value of the agbot's ErrRescanS config setting.
const AgbotErrRescan_DEFAULT = 15

// The default full rescan interval, in seconds. Used as the default value of the agbot's
// FullRescanS config setting (the time between policy scans when the exchange reports no changes).
const AgbotFullRescan_DEFAULT = 600

Expand Down
Loading