Skip to content

Commit

Permalink
Issue #4156 - Bug: On restart, agbot doesn't update a node even thoug…
Browse files Browse the repository at this point in the history
…h a new service was added and a deployment policy update occurred

Signed-off-by: Le Zhang <[email protected]>
  • Loading branch information
LiilyZhang committed Oct 8, 2024
1 parent 79e6077 commit c1fafee
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
49 changes: 47 additions & 2 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ func (w *AgreementBotWorker) syncOnInit() error {
if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil {

neededBCInstances := make(map[string]map[string]map[string]bool)
bPolicyCheckingMap := make(map[string]bool)

for _, ag := range agreements {

Expand All @@ -1061,11 +1062,13 @@ func (w *AgreementBotWorker) syncOnInit() error {
}
neededBCInstances[bcOrg][bcType][bcName] = true

var pol *policy.Policy

// If the agreement has received a reply then we just need to make sure that the policy manager's agreement counts
// are correct. Even for already timedout agreements, the governance process will cleanup old and outdated agreements,
// so we don't need to do anything here.
if ag.AgreementCreationTime != 0 {
if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil {
if pol, err = policy.DemarshalPolicy(ag.Policy); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err)))
} else if existingPol := w.pm.GetPolicy(ag.Org, pol.Header.Name); existingPol == nil {
glog.Errorf(AWlogString(fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, pol.Header.Name)))
Expand Down Expand Up @@ -1119,6 +1122,49 @@ func (w *AgreementBotWorker) syncOnInit() error {
glog.V(3).Infof(AWlogString(fmt.Sprintf("added agreement %v to policy agreement counter.", ag.CurrentAgreementId)))
}

// After checking the policy, add it in to a map. In Each for loop which iterate the agreements, checking if current policy inside agreement has been handled or not
// re-evaluate the agreement
if pol != nil && !bPolicyCheckingMap[pol.Header.Name] {
glog.V(3).Infof(AWlogString(fmt.Sprintf("checking policy against exchange for agreement %v.", ag.CurrentAgreementId)))
if exchPols, err := exchange.GetBusinessPolicies(w, exchange.GetOrg(pol.Header.Name), exchange.GetId(pol.Header.Name)); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("error getting business policies from exchange for org: %v, policy name: %v error: %v", ag.Org, pol.Header.Name, err)))
} else if len(exchPols) == 0 {
glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId)))
// Need to cancel the agreement
policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy)
pdCmd := NewPolicyDeletedCommand(*policyDeletedMsg)
// Queue the command to the relevant protocol handler for further processing.
if w.consumerPH.Get(agp).AcceptCommand(pdCmd) {
w.consumerPH.Get(agp).HandlePolicyDeleted(pdCmd, w.consumerPH.Get(agp))
}
} else {
for polId, exchPol := range exchPols {
bPol := exchPol.GetBusinessPolicy()
if exPolicy, err := bPol.GenPolicyFromBusinessPolicy(polId); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("error generating internal business policies for org: %v, policy name: %v from %v, error: %v", ag.Org, pol.Header.Name, exchPol.String(), err)))
} else if exPolicy == nil {
glog.Errorf(AWlogString(fmt.Sprintf("the generated internal business policies is nil for org: %v, policy name: %v from %v", ag.Org, pol.Header.Name, exchPol.String())))
} else if exPolicyString, err := policy.MarshalPolicy(exPolicy); err != nil {
glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err))
} else {
// If business policy has been changed during a restart, handle it
glog.V(3).Infof(AWlogString(fmt.Sprintf("re-evaluate agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name)))

// Call HandlePolicyChanged() function directly because at this point, the agbot is not set to ready to accept messages
policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol)
pcCmd := NewPolicyChangedCommand(*policyChangedMsg)
// Queue the command to the relevant protocol handler for further processing.
if w.consumerPH.Get(agp).AcceptCommand(pcCmd) {
w.consumerPH.Get(agp).HandlePolicyChanged(pcCmd, w.consumerPH.Get(agp))
}
}
}
}
bPolicyCheckingMap[pol.Header.Name] = true
} else {
glog.V(3).Infof(AWlogString(fmt.Sprintf("skip checking policy %v for agreement %v", pol, ag.CurrentAgreementId)))
}

// This state should never occur, but could if there was an error along the way. It means that a DB record
// was created for this agreement but the record was never updated with the creation time, which is supposed to occur
// immediately following creation of the record. Further, if this were to occur, then the exchange should not have been
Expand Down Expand Up @@ -1478,7 +1524,6 @@ func (w *AgreementBotWorker) databaseHeartBeat() int {

// Ask the database to check for stale partitions and move them into our partition if one is found.
func (w *AgreementBotWorker) stalePartitions() int {

// Dont try to grab a stale partition if we are unable to heartbeat.
now := uint64(time.Now().Unix())
if hb, err := w.db.GetHeartbeat(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions agreementbot/persistence/postgresql/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ const SECRET_DELETE_PATTERN = `DELETE FROM "secrets_pattern_ WHERE secret_org =

const SECRET_MOVE = `WITH moved_rows AS (
DELETE FROM "secrets_pattern_ a
RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check
RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check, a.secret_exists
)
INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING;
INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING;
`

const SECRET_DROP_PARTITION_POLICY = `DROP TABLE "secrets_policy_;`
Expand Down

0 comments on commit c1fafee

Please sign in to comment.