diff --git a/agreementbot/agreementbot.go b/agreementbot/agreementbot.go index 92bde07d9..41690ca87 100644 --- a/agreementbot/agreementbot.go +++ b/agreementbot/agreementbot.go @@ -1046,6 +1046,7 @@ func (w *AgreementBotWorker) syncOnInit() error { if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil { neededBCInstances := make(map[string]map[string]map[string]bool) + bPolicyCheckingMap := make(map[string]bool) for _, ag := range agreements { @@ -1061,11 +1062,13 @@ func (w *AgreementBotWorker) syncOnInit() error { } neededBCInstances[bcOrg][bcType][bcName] = true + var pol *policy.Policy + // If the agreement has received a reply then we just need to make sure that the policy manager's agreement counts // are correct. Even for already timedout agreements, the governance process will cleanup old and outdated agreements, // so we don't need to do anything here. if ag.AgreementCreationTime != 0 { - if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil { + if pol, err = policy.DemarshalPolicy(ag.Policy); err != nil { glog.Errorf(AWlogString(fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err))) } else if existingPol := w.pm.GetPolicy(ag.Org, pol.Header.Name); existingPol == nil { glog.Errorf(AWlogString(fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, pol.Header.Name))) @@ -1119,6 +1122,49 @@ func (w *AgreementBotWorker) syncOnInit() error { glog.V(3).Infof(AWlogString(fmt.Sprintf("added agreement %v to policy agreement counter.", ag.CurrentAgreementId))) } + // After checking the policy, add it in to a map. In Each for loop which iterate the agreements, checking if current policy inside agreement has been handled or not + // re-evaluate the agreement + if pol != nil && !bPolicyCheckingMap[pol.Header.Name] { + glog.V(3).Infof(AWlogString(fmt.Sprintf("checking policy against exchange for agreement %v.", ag.CurrentAgreementId))) + if exchPols, err := exchange.GetBusinessPolicies(w, exchange.GetOrg(pol.Header.Name), exchange.GetId(pol.Header.Name)); err != nil { + glog.Errorf(AWlogString(fmt.Sprintf("error getting business policies from exchange for org: %v, policy name: %v error: %v", ag.Org, pol.Header.Name, err))) + } else if len(exchPols) == 0 { + glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId))) + // Need to cancel the agreement + policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy) + pdCmd := NewPolicyDeletedCommand(*policyDeletedMsg) + // Queue the command to the relevant protocol handler for further processing. + if w.consumerPH.Get(agp).AcceptCommand(pdCmd) { + w.consumerPH.Get(agp).HandlePolicyDeleted(pdCmd, w.consumerPH.Get(agp)) + } + } else { + for polId, exchPol := range exchPols { + bPol := exchPol.GetBusinessPolicy() + if exPolicy, err := bPol.GenPolicyFromBusinessPolicy(polId); err != nil { + glog.Errorf(AWlogString(fmt.Sprintf("error generating internal business policies for org: %v, policy name: %v from %v, error: %v", ag.Org, pol.Header.Name, exchPol.String(), err))) + } else if exPolicy == nil { + glog.Errorf(AWlogString(fmt.Sprintf("the generated internal business policies is nil for org: %v, policy name: %v from %v", ag.Org, pol.Header.Name, exchPol.String()))) + } else if exPolicyString, err := policy.MarshalPolicy(exPolicy); err != nil { + glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err)) + } else { + // If business policy has been changed during a restart, handle it + glog.V(3).Infof(AWlogString(fmt.Sprintf("re-evaluate agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name))) + + // Call HandlePolicyChanged() function directly because at this point, the agbot is not set to ready to accept messages + policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol) + pcCmd := NewPolicyChangedCommand(*policyChangedMsg) + // Queue the command to the relevant protocol handler for further processing. + if w.consumerPH.Get(agp).AcceptCommand(pcCmd) { + w.consumerPH.Get(agp).HandlePolicyChanged(pcCmd, w.consumerPH.Get(agp)) + } + } + } + } + bPolicyCheckingMap[pol.Header.Name] = true + } else { + glog.V(3).Infof(AWlogString(fmt.Sprintf("skip checking policy %v for agreement %v", pol, ag.CurrentAgreementId))) + } + // This state should never occur, but could if there was an error along the way. It means that a DB record // was created for this agreement but the record was never updated with the creation time, which is supposed to occur // immediately following creation of the record. Further, if this were to occur, then the exchange should not have been @@ -1478,7 +1524,6 @@ func (w *AgreementBotWorker) databaseHeartBeat() int { // Ask the database to check for stale partitions and move them into our partition if one is found. func (w *AgreementBotWorker) stalePartitions() int { - // Dont try to grab a stale partition if we are unable to heartbeat. now := uint64(time.Now().Unix()) if hb, err := w.db.GetHeartbeat(); err != nil { diff --git a/agreementbot/persistence/postgresql/secrets.go b/agreementbot/persistence/postgresql/secrets.go index fb0040f93..31a1ccd93 100644 --- a/agreementbot/persistence/postgresql/secrets.go +++ b/agreementbot/persistence/postgresql/secrets.go @@ -65,9 +65,9 @@ const SECRET_DELETE_PATTERN = `DELETE FROM "secrets_pattern_ WHERE secret_org = const SECRET_MOVE = `WITH moved_rows AS ( DELETE FROM "secrets_pattern_ a - RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check + RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check, a.secret_exists ) -INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING; +INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING; ` const SECRET_DROP_PARTITION_POLICY = `DROP TABLE "secrets_policy_;`