Skip to content

Commit

Permalink
automod: refactor identity and account event processing (#781)
Browse files Browse the repository at this point in the history
@foysalit: this is pulling out some of the engine-layer refactors from
#708
  • Loading branch information
bnewbold authored Oct 30, 2024
2 parents 3ff7405 + 5fdfd70 commit 296087d
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 53 deletions.
10 changes: 9 additions & 1 deletion automod/capture/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/automod"
Expand Down Expand Up @@ -38,12 +39,19 @@ func ProcessCaptureRules(eng *automod.Engine, capture AccountCapture) error {
ctx := context.Background()

did := capture.AccountMeta.Identity.DID
handle := capture.AccountMeta.Identity.Handle.String()
dir := identity.NewMockDirectory()
dir.Insert(*capture.AccountMeta.Identity)
eng.Directory = &dir

// initial identity rules
eng.ProcessIdentityEvent(ctx, "new", did)
identEvent := comatproto.SyncSubscribeRepos_Identity{
Did: did.String(),
Handle: &handle,
Seq: 12345,
Time: syntax.DatetimeNow().String(),
}
eng.ProcessIdentityEvent(ctx, identEvent)

// all the post rules
for _, pr := range capture.PostRecords {
Expand Down
59 changes: 10 additions & 49 deletions automod/consumer/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,54 +80,20 @@ func (fc *FirehoseConsumer) Run(ctx context.Context) error {
},
RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "identity", did); err != nil {
if err := fc.Engine.ProcessIdentityEvent(ctx, *evt); err != nil {
fc.Logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err)
}
return nil
},
RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "account", did); err != nil {
if err := fc.Engine.ProcessAccountEvent(ctx, *evt); err != nil {
fc.Logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err)
}
return nil
},
// TODO: deprecated
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "handle", did); err != nil {
fc.Logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
}
return nil
},
// TODO: deprecated
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
atomic.StoreInt64(&fc.lastSeq, evt.Seq)
did, err := syntax.ParseDID(evt.Did)
if err != nil {
fc.Logger.Error("bad DID in RepoTombstone event", "did", evt.Did, "seq", evt.Seq, "err", err)
return nil
}
if err := fc.Engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil {
fc.Logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err)
}
return nil
},
// NOTE: no longer process #handle events
// NOTE: no longer process #tombstone events
}

var scheduler events.Scheduler
Expand Down Expand Up @@ -176,13 +142,6 @@ func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatprot
return nil
}

// empty commit is a special case, temporarily, basically indicates "new account"
if len(evt.Ops) == 0 {
if err := fc.Engine.ProcessIdentityEvent(ctx, "create", did); err != nil {
fc.Logger.Error("processing handle update failed", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq, "err", err)
}
}

for _, op := range evt.Ops {
logger = logger.With("eventKind", op.Action, "path", op.Path)
collection, rkey, err := splitRepoPath(op.Path)
Expand Down Expand Up @@ -215,27 +174,29 @@ func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatprot
break
}
recCID := syntax.CID(op.Cid.String())
err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{
op := automod.RecordOp{
Action: action,
DID: did,
Collection: collection,
RecordKey: rkey,
CID: &recCID,
RecordCBOR: *recCBOR,
})
}
err = fc.Engine.ProcessRecordOp(ctx, op)
if err != nil {
logger.Error("engine failed to process record", "err", err)
continue
}
case repomgr.EvtKindDeleteRecord:
err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{
op := automod.RecordOp{
Action: automod.DeleteOp,
DID: did,
Collection: collection,
RecordKey: rkey,
CID: nil,
RecordCBOR: nil,
})
}
err = fc.Engine.ProcessRecordOp(ctx, op)
if err != nil {
logger.Error("engine failed to process record", "err", err)
continue
Expand Down
84 changes: 81 additions & 3 deletions automod/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/automod/cachestore"
Expand Down Expand Up @@ -53,21 +54,26 @@ type EngineConfig struct {
SkipAccountMeta bool
}

// Entrypoint for external code pushing arbitrary identity events in to the engine.
// Entrypoint for external code pushing #identity events in to the engine.
//
// This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel.
func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syntax.DID) error {
func (eng *Engine) ProcessIdentityEvent(ctx context.Context, evt comatproto.SyncSubscribeRepos_Identity) error {
eventProcessCount.WithLabelValues("identity").Inc()
start := time.Now()
defer func() {
duration := time.Since(start)
eventProcessDuration.WithLabelValues("identity").Observe(duration.Seconds())
}()

did, err := syntax.ParseDID(evt.Did)
if err != nil {
return fmt.Errorf("bad DID in repo #identity event (%s): %w", evt.Did, err)
}

// similar to an HTTP server, we want to recover any panics from rule execution
defer func() {
if r := recover(); r != nil {
eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", typ)
eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", "identity")
eventErrorCount.WithLabelValues("identity").Inc()
}
}()
Expand All @@ -78,6 +84,7 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn
if err := eng.PurgeAccountCaches(ctx, did); err != nil {
eng.Logger.Error("failed to purge identity cache; identity rule may not run correctly", "err", err)
}
// TODO(bnewbold): if it was a tombstone, this might fail
ident, err := eng.Directory.LookupDID(ctx, did)
if err != nil {
eventErrorCount.WithLabelValues("identity").Inc()
Expand Down Expand Up @@ -118,6 +125,77 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, typ string, did syn
return nil
}

// ProcessAccountEvent is the entrypoint for external code pushing #account events into the engine.
//
// It parses the DID carried by the event, purges cached state for that account,
// re-resolves the identity through the engine's directory, runs the account
// rules, and then persists the resulting moderation actions and counters.
//
// An error is returned when the DID is malformed, the identity cannot be
// resolved, account metadata cannot be fetched, or rule execution/persistence
// fails. Panics raised after the DID has been parsed are recovered, logged and
// counted as "account" errors instead of being propagated.
//
// This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel.
func (eng *Engine) ProcessAccountEvent(ctx context.Context, evt comatproto.SyncSubscribeRepos_Account) error {
	eventProcessCount.WithLabelValues("account").Inc()
	start := time.Now()
	defer func() {
		duration := time.Since(start)
		eventProcessDuration.WithLabelValues("account").Observe(duration.Seconds())
	}()

	did, err := syntax.ParseDID(evt.Did)
	if err != nil {
		return fmt.Errorf("bad DID in repo #account event (%s): %w", evt.Did, err)
	}

	// similar to an HTTP server, we want to recover any panics from rule execution
	defer func() {
		if r := recover(); r != nil {
			eng.Logger.Error("automod event execution exception", "err", r, "did", did, "type", "account")
			eventErrorCount.WithLabelValues("account").Inc()
		}
	}()
	// account events share the identity event timeout
	ctx, cancel := context.WithTimeout(ctx, identityEventTimeout)
	defer cancel()

	// first purge any caches; we need to re-resolve from scratch on account updates.
	// A purge failure is logged but not fatal: processing continues best-effort.
	if err := eng.PurgeAccountCaches(ctx, did); err != nil {
		eng.Logger.Error("failed to purge account cache; account rule may not run correctly", "err", err)
	}
	// TODO(bnewbold): if it was a tombstone, this might fail
	ident, err := eng.Directory.LookupDID(ctx, did)
	if err != nil {
		eventErrorCount.WithLabelValues("account").Inc()
		return fmt.Errorf("resolving identity: %w", err)
	}
	if ident == nil {
		eventErrorCount.WithLabelValues("account").Inc()
		return fmt.Errorf("identity not found for DID: %s", did.String())
	}

	var am *AccountMeta
	if !eng.Config.SkipAccountMeta {
		am, err = eng.GetAccountMeta(ctx, ident)
		if err != nil {
			// count under "account" (not "identity") so the error metric matches
			// the event type being processed, like every other path here
			eventErrorCount.WithLabelValues("account").Inc()
			return fmt.Errorf("failed to fetch account metadata: %w", err)
		}
	} else {
		// metadata lookup disabled: run rules with only the resolved identity
		am = &AccountMeta{
			Identity: ident,
			Profile:  ProfileSummary{},
		}
	}
	ac := NewAccountContext(ctx, eng, *am)
	if err := eng.Rules.CallAccountRules(&ac); err != nil {
		eventErrorCount.WithLabelValues("account").Inc()
		return fmt.Errorf("rule execution failed: %w", err)
	}
	eng.CanonicalLogLineAccount(&ac)
	if err := eng.persistAccountModActions(&ac); err != nil {
		eventErrorCount.WithLabelValues("account").Inc()
		return fmt.Errorf("failed to persist actions for account event: %w", err)
	}
	if err := eng.persistCounters(ctx, ac.effects); err != nil {
		eventErrorCount.WithLabelValues("account").Inc()
		return fmt.Errorf("failed to persist counters for account event: %w", err)
	}
	return nil
}

// Entrypoint for external code pushing repository updates. A simple repo commit results in multiple calls.
//
// This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel.
Expand Down
12 changes: 12 additions & 0 deletions automod/engine/ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type RuleSet struct {
RecordRules []RecordRuleFunc
RecordDeleteRules []RecordRuleFunc
IdentityRules []IdentityRuleFunc
AccountRules []AccountRuleFunc
BlobRules []BlobRuleFunc
NotificationRules []NotificationRuleFunc
OzoneEventRules []OzoneEventRuleFunc
Expand Down Expand Up @@ -89,6 +90,17 @@ func (r *RuleSet) CallIdentityRules(c *AccountContext) error {
return nil
}

// CallAccountRules runs each registered account rule, in order, against the
// given account context. A rule that returns an error is logged through the
// context's logger and does not prevent later rules from running; the
// returned error is always nil.
func (r *RuleSet) CallAccountRules(c *AccountContext) error {
	for i := range r.AccountRules {
		if ruleErr := r.AccountRules[i](c); ruleErr != nil {
			c.Logger.Error("account rule execution failed", "err", ruleErr)
		}
	}
	return nil
}

func (r *RuleSet) CallNotificationRules(c *NotificationContext) error {
for _, f := range r.NotificationRules {
err := f(c)
Expand Down
1 change: 1 addition & 0 deletions automod/engine/ruletypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

// IdentityRuleFunc is the signature of rules stored in RuleSet.IdentityRules;
// it receives an account-level context and reports failure via its error.
type IdentityRuleFunc = func(c *AccountContext) error

// AccountRuleFunc is the signature of rules stored in RuleSet.AccountRules and
// invoked by RuleSet.CallAccountRules; it has the same shape as IdentityRuleFunc.
type AccountRuleFunc = func(c *AccountContext) error

// RecordRuleFunc is the signature of rules stored in RuleSet.RecordRules and
// RuleSet.RecordDeleteRules; it receives a record-level context.
type RecordRuleFunc = func(c *RecordContext) error

// PostRuleFunc is a record rule that additionally receives a feed post record.
// NOTE(review): presumably the post is already decoded by the caller — confirm.
type PostRuleFunc = func(c *RecordContext, post *appbsky.FeedPost) error

// ProfileRuleFunc is a record rule that additionally receives an actor profile record.
// NOTE(review): presumably the profile is already decoded by the caller — confirm.
type ProfileRuleFunc = func(c *RecordContext, profile *appbsky.ActorProfile) error
Expand Down

0 comments on commit 296087d

Please sign in to comment.