Skip to content

Commit

Permalink
non-archival relay (#827)
Browse files Browse the repository at this point in the history
real jank right now, but doesnt keep all the repo data, just does basic
event validation
  • Loading branch information
whyrusleeping authored Dec 16, 2024
2 parents 749cc07 + e495daf commit 86719b0
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 44 deletions.
3 changes: 3 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,7 @@ func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *a
}).Error; err != nil {
return err
}
u.SetTombstoned(true)

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand Down Expand Up @@ -1416,6 +1417,8 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
if err := s.db.Create(&u).Error; err != nil {
return nil, fmt.Errorf("failed to create user after handle conflict: %w", err)
}

s.userCache.Remove(did)
} else {
return nil, fmt.Errorf("failed to create other pds user: %w", err)
}
Expand Down
54 changes: 49 additions & 5 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
}

type userView struct {
cs *FileCarStore
cs CarStore
user models.Uid

cache map[cid.Cid]blockformat.Block
Expand All @@ -111,13 +111,24 @@ func (uv *userView) HashOnRead(hor bool) {
}

func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
return uv.cs.meta.HasUidCid(ctx, uv.user, k)
_, have := uv.cache[k]
if have {
return have, nil
}

fcd, ok := uv.cs.(*FileCarStore)
if !ok {
return false, nil
}

return fcd.meta.HasUidCid(ctx, uv.user, k)
}

var CacheHits int64
var CacheMiss int64

func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) {

if !k.Defined() {
return nil, fmt.Errorf("attempted to 'get' undefined cid")
}
Expand All @@ -132,7 +143,12 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro
}
atomic.AddInt64(&CacheMiss, 1)

path, offset, user, err := uv.cs.meta.LookupBlockRef(ctx, k)
fcd, ok := uv.cs.(*FileCarStore)
if !ok {
return nil, ipld.ErrNotFound{Cid: k}
}

path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +288,7 @@ type DeltaSession struct {
baseCid cid.Cid
seq int
readonly bool
cs *FileCarStore
cs CarStore
lastRev string
}

Expand Down Expand Up @@ -587,7 +603,18 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str
return nil, fmt.Errorf("cannot write to readonly deltaSession")
}

return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
switch ocs := ds.cs.(type) {
case *FileCarStore:
return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
case *NonArchivalCarstore:
slice, err := blocksToCar(ctx, root, rev, ds.blks)
if err != nil {
return nil, err
}
return slice, ocs.updateLastCommit(ctx, ds.user, rev, root)
default:
return nil, fmt.Errorf("unsupported carstore type")
}
}

func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
Expand All @@ -608,6 +635,23 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
return hnw, nil
}

func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) {
buf := new(bytes.Buffer)
_, err := WriteCarHeader(buf, root)
if err != nil {
return nil, fmt.Errorf("failed to write car header: %w", err)
}

for k, blk := range blks {
_, err := LdWrite(buf, k.Bytes(), blk.RawData())
if err != nil {
return nil, fmt.Errorf("failed to write block: %w", err)
}
}

return buf.Bytes(), nil
}

func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {

buf := new(bytes.Buffer)
Expand Down
254 changes: 254 additions & 0 deletions carstore/nonarchive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package carstore

import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"sync"

"github.com/bluesky-social/indigo/models"
blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
car "github.com/ipld/go-car"
"go.opentelemetry.io/otel"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type NonArchivalCarstore struct {
db *gorm.DB

lk sync.Mutex
lastCommitCache map[models.Uid]*commitRefInfo

log *slog.Logger
}

func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) {
if err := db.AutoMigrate(&commitRefInfo{}); err != nil {
return nil, err
}

return &NonArchivalCarstore{
db: db,
lastCommitCache: make(map[models.Uid]*commitRefInfo),
log: slog.Default().With("system", "carstorena"),
}, nil
}

type commitRefInfo struct {
ID uint `gorm:"primarykey"`
Uid models.Uid `gorm:"uniqueIndex"`
Rev string
Root models.DbCID
}

func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo {
cs.lk.Lock()
defer cs.lk.Unlock()

ls, ok := cs.lastCommitCache[user]
if ok {
return ls
}

return nil
}

func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) {
cs.lk.Lock()
defer cs.lk.Unlock()

delete(cs.lastCommitCache, user)
}

func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.lastCommitCache[ls.Uid] = ls
}

func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
var out commitRefInfo
if err := cs.db.Find(&out, "uid = ?", user).Error; err != nil {
return nil, err
}

return &out, nil
}

func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo")
defer span.End()

maybeLs := cs.checkLastShardCache(user)
if maybeLs != nil {
return maybeLs, nil
}

lastShard, err := cs.loadCommitRefInfo(ctx, user)
if err != nil {
return nil, err
}

cs.putLastShardCache(lastShard)
return lastShard, nil
}

func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error {
cri := &commitRefInfo{
Uid: uid,
Rev: rev,
Root: models.DbCID{CID: cid},
}

if err := cs.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "uid"}},
UpdateAll: true,
}).Create(cri).Error; err != nil {
return fmt.Errorf("update or set last commit info: %w", err)
}

cs.putLastShardCache(cri)

return nil
}

func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
defer span.End()

// TODO: ensure that we don't write updates on top of the wrong head
// this needs to be a compare and swap type operation
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return nil, err
}

if since != nil && *since != lastShard.Rev {
cs.log.Warn("revision mismatch: %s != %s: %s", *since, lastShard.Rev, ErrRepoBaseMismatch)
}

return &DeltaSession{
fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()),
blks: make(map[cid.Cid]blockformat.Block),
base: &userView{
user: user,
cs: cs,
prefetch: true,
cache: make(map[cid.Cid]blockformat.Block),
},
user: user,
baseCid: lastShard.Root.CID,
cs: cs,
seq: 0,
lastRev: lastShard.Rev,
}, nil
}

func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
return &DeltaSession{
base: &userView{
user: user,
cs: cs,
prefetch: false,
cache: make(map[cid.Cid]blockformat.Block),
},
readonly: true,
user: user,
cs: cs,
}, nil
}

// TODO: incremental is only ever called true, remove the param
func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
return fmt.Errorf("not supported in non-archival mode")
}

func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
defer span.End()

carr, err := car.NewCarReader(bytes.NewReader(carslice))
if err != nil {
return cid.Undef, nil, err
}

if len(carr.Header.Roots) != 1 {
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
}

ds, err := cs.NewDeltaSession(ctx, uid, since)
if err != nil {
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
}

var cids []cid.Cid
for {
blk, err := carr.Next()
if err != nil {
if err == io.EOF {
break
}
return cid.Undef, nil, err
}

cids = append(cids, blk.Cid())

if err := ds.Put(ctx, blk); err != nil {
return cid.Undef, nil, err
}
}

return carr.Header.Roots[0], ds, nil
}

func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return cid.Undef, err
}
if lastShard.ID == 0 {
return cid.Undef, nil
}

return lastShard.Root.CID, nil
}

func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return "", err
}
if lastShard.ID == 0 {
return "", nil
}

return lastShard.Rev, nil
}

func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
return nil, nil
}

func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error {
if err := cs.db.Raw("DELETE from commit_ref_infos WHERE uid = ?", user).Error; err != nil {
return err
}

cs.removeLastShardCache(user)
return nil
}

func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
return nil, fmt.Errorf("compaction not supported on non-archival")
}

func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
return nil, fmt.Errorf("compaction not supported in non-archival")
}
Loading

0 comments on commit 86719b0

Please sign in to comment.