Skip to content

Commit

Permalink
add a user cache on the bgs (#816)
Browse files Browse the repository at this point in the history
should cut ~3% of all DB usage on the prod relay
  • Loading branch information
whyrusleeping authored Nov 16, 2024
2 parents 05b8751 + 4a15b38 commit a422903
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 23 deletions.
76 changes: 69 additions & 7 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/repomgr"
"github.com/bluesky-social/indigo/xrpc"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -87,6 +88,9 @@ type BGS struct {

// Management of Compaction
compactor *Compactor

// User cache
userCache *lru.Cache[string, *User]
}

type PDSResync struct {
Expand Down Expand Up @@ -136,6 +140,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
db.AutoMigrate(models.PDS{})
db.AutoMigrate(models.DomainBan{})

uc, _ := lru.New[string, *User](1_000_000)

bgs := &BGS{
Index: ix,
db: db,
Expand All @@ -151,6 +157,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
consumers: make(map[uint64]*SocketConsumer),

pdsResyncs: make(map[uint]*PDSResync),

userCache: uc,
}

ix.CreateExternalUser = bgs.createExternalUser
Expand Down Expand Up @@ -521,6 +529,44 @@ type User struct {

// UpstreamStatus is the state of the user as reported by the upstream PDS
UpstreamStatus string `gorm:"index"`

lk sync.Mutex
}

func (u *User) SetTakenDown(v bool) {
u.lk.Lock()
defer u.lk.Unlock()
u.TakenDown = v
}

func (u *User) GetTakenDown() bool {
u.lk.Lock()
defer u.lk.Unlock()
return u.TakenDown
}

func (u *User) SetTombstoned(v bool) {
u.lk.Lock()
defer u.lk.Unlock()
u.Tombstoned = v
}

func (u *User) GetTombstoned() bool {
u.lk.Lock()
defer u.lk.Unlock()
return u.Tombstoned
}

func (u *User) SetUpstreamStatus(v string) {
u.lk.Lock()
defer u.lk.Unlock()
u.UpstreamStatus = v
}

func (u *User) GetUpstreamStatus() string {
u.lk.Lock()
defer u.lk.Unlock()
return u.UpstreamStatus
}

type addTargetBody struct {
Expand Down Expand Up @@ -771,6 +817,11 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error)
ctx, span := tracer.Start(ctx, "lookupUserByDid")
defer span.End()

cu, ok := bgs.userCache.Get(did)
if ok {
return cu, nil
}

var u User
if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil {
return nil, err
Expand All @@ -780,6 +831,8 @@ func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error)
return nil, gorm.ErrRecordNotFound
}

bgs.userCache.Add(did, &u)

return &u, nil
}

Expand Down Expand Up @@ -840,20 +893,21 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
u.Did = evt.Repo
}

span.SetAttributes(attribute.String("upstream_status", u.UpstreamStatus))
ustatus := u.GetUpstreamStatus()
span.SetAttributes(attribute.String("upstream_status", ustatus))

if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown))
if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}
Expand All @@ -877,12 +931,13 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
}
}

if u.Tombstoned {
if u.GetTombstoned() {
span.SetAttributes(attribute.Bool("tombstoned", true))
// we've checked the authority of the users PDS, so reinstate the account
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
return fmt.Errorf("failed to un-tombstone a user: %w", err)
}
u.SetTombstoned(false)

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
Expand Down Expand Up @@ -1041,7 +1096,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
return fmt.Errorf("failed to look up user by did: %w", err)
}

if u.TakenDown {
if u.GetTakenDown() {
shouldBeActive = false
status = &events.AccountStatusTakendown
}
Expand Down Expand Up @@ -1370,18 +1425,22 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil {
return fmt.Errorf("failed to set user active status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusActive)
case events.AccountStatusDeactivated:
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil {
return fmt.Errorf("failed to set user deactivation status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusDeactivated)
case events.AccountStatusSuspended:
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil {
return fmt.Errorf("failed to set user suspension status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusSuspended)
case events.AccountStatusTakendown:
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil {
return fmt.Errorf("failed to set user taken down status: %w", err)
}
u.SetUpstreamStatus(events.AccountStatusTakendown)

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand All @@ -1396,6 +1455,7 @@ func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status stri
}).Error; err != nil {
return err
}
u.SetUpstreamStatus(events.AccountStatusDeleted)

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand All @@ -1422,6 +1482,7 @@ func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error {
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil {
return err
}
u.SetTakenDown(true)

if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
return err
Expand All @@ -1443,6 +1504,7 @@ func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {
if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil {
return err
}
u.SetTakenDown(false)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion bgs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStra
return state, nil
}

func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) {
ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
defer span.End()
log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
Expand Down
33 changes: 18 additions & 15 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,24 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
if u.GetTombstoned() {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
if u.GetTakenDown() {
return nil, fmt.Errorf("account was taken down by the Relay")
}

if u.UpstreamStatus == events.AccountStatusTakendown {
ustatus := u.GetUpstreamStatus()
if ustatus == events.AccountStatusTakendown {
return nil, fmt.Errorf("account was taken down by its PDS")
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
return nil, fmt.Errorf("account is temporarily deactivated")
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
return nil, fmt.Errorf("account is suspended by its PDS")
}

Expand Down Expand Up @@ -91,23 +92,24 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
if u.GetTombstoned() {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
if u.GetTakenDown() {
return nil, fmt.Errorf("account was taken down by the Relay")
}

if u.UpstreamStatus == events.AccountStatusTakendown {
ustatus := u.GetUpstreamStatus()
if ustatus == events.AccountStatusTakendown {
return nil, fmt.Errorf("account was taken down by its PDS")
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
return nil, fmt.Errorf("account is temporarily deactivated")
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
return nil, fmt.Errorf("account is suspended by its PDS")
}

Expand Down Expand Up @@ -253,23 +255,24 @@ func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did strin
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

if u.Tombstoned {
if u.GetTombstoned() {
return nil, fmt.Errorf("account was deleted")
}

if u.TakenDown {
if u.GetTakenDown() {
return nil, fmt.Errorf("account was taken down by the Relay")
}

if u.UpstreamStatus == events.AccountStatusTakendown {
ustatus := u.GetUpstreamStatus()
if ustatus == events.AccountStatusTakendown {
return nil, fmt.Errorf("account was taken down by its PDS")
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
if ustatus == events.AccountStatusDeactivated {
return nil, fmt.Errorf("account is temporarily deactivated")
}

if u.UpstreamStatus == events.AccountStatusSuspended {
if ustatus == events.AccountStatusSuspended {
return nil, fmt.Errorf("account is suspended by its PDS")
}

Expand Down

0 comments on commit a422903

Please sign in to comment.