From 07dc3767c3c1ce33218adcdba9eab42e0ab7ae45 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 19 Nov 2024 13:36:48 -0800 Subject: [PATCH 01/12] hack: non-archival relay work --- bgs/bgs.go | 3 ++ carstore/bs.go | 33 ++++++++++++--- pds/server.go | 19 +++++---- repomgr/bench_test.go | 9 +++- repomgr/ingest_test.go | 9 +++- repomgr/repomgr.go | 93 +++++++++++++++++++++++++++++++++++++++++- testing/integ_test.go | 1 + testing/utils.go | 8 +++- 8 files changed, 157 insertions(+), 18 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index d7838311e..119e63f62 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1188,6 +1188,7 @@ func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *a }).Error; err != nil { return err } + u.SetTombstoned(true) if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ "handle": nil, @@ -1416,6 +1417,8 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor if err := s.db.Create(&u).Error; err != nil { return nil, fmt.Errorf("failed to create user after handle conflict: %w", err) } + + s.userCache.Remove(did) } else { return nil, fmt.Errorf("failed to create other pds user: %w", err) } diff --git a/carstore/bs.go b/carstore/bs.go index 6486a8fb0..3acfaccc0 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -97,7 +97,7 @@ func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) { } type userView struct { - cs *FileCarStore + cs CarStore user models.Uid cache map[cid.Cid]blockformat.Block @@ -111,13 +111,24 @@ func (uv *userView) HashOnRead(hor bool) { } func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) { - return uv.cs.meta.HasUidCid(ctx, uv.user, k) + _, have := uv.cache[k] + if have { + return have, nil + } + + fcd, ok := uv.cs.(*FileCarStore) + if !ok { + return false, nil + } + + return fcd.meta.HasUidCid(ctx, uv.user, k) } var CacheHits int64 var CacheMiss int64 func (uv *userView) Get(ctx context.Context, k 
cid.Cid) (blockformat.Block, error) { + if !k.Defined() { return nil, fmt.Errorf("attempted to 'get' undefined cid") } @@ -132,7 +143,12 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro } atomic.AddInt64(&CacheMiss, 1) - path, offset, user, err := uv.cs.meta.LookupBlockRef(ctx, k) + fcd, ok := uv.cs.(*FileCarStore) + if !ok { + return nil, ipld.ErrNotFound{Cid: k} + } + + path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k) if err != nil { return nil, err } @@ -272,7 +288,7 @@ type DeltaSession struct { baseCid cid.Cid seq int readonly bool - cs *FileCarStore + cs CarStore lastRev string } @@ -587,7 +603,14 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str return nil, fmt.Errorf("cannot write to readonly deltaSession") } - return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) + switch ocs := ds.cs.(type) { + case *FileCarStore: + return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) + case *NonArchivalCarstore: + return nil, ocs.updateLastCommit(ctx, ds.user, rev, root) + default: + return nil, fmt.Errorf("unsupported carstore type") + } } func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { diff --git a/pds/server.go b/pds/server.go index da534518c..d8c87d817 100644 --- a/pds/server.go +++ b/pds/server.go @@ -15,7 +15,6 @@ import ( "github.com/bluesky-social/indigo/api/atproto" comatproto "github.com/bluesky-social/indigo/api/atproto" - bsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/carstore" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/indexer" @@ -202,14 +201,16 @@ func (s *Server) createExternalUser(ctx context.Context, did string) (*models.Ac handle = hurl.Host } - profile, err := bsky.ActorGetProfile(ctx, c, did) - if err != nil { - return nil, err - } + /* + profile, err := bsky.ActorGetProfile(ctx, c, did) + if err != nil { + return nil, err + } - if handle 
!= profile.Handle { - return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) - } + if handle != profile.Handle { + return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) + } + */ // TODO: request this users info from their server to fill out our data... u := User{ @@ -227,7 +228,7 @@ func (s *Server) createExternalUser(ctx context.Context, did string) (*models.Ac subj := &models.ActorInfo{ Uid: u.ID, Handle: sql.NullString{String: handle, Valid: true}, - DisplayName: *profile.DisplayName, + DisplayName: "missing display name", Did: did, Type: "", PDS: peering.ID, diff --git a/repomgr/bench_test.go b/repomgr/bench_test.go index c01789422..7f403ff6c 100644 --- a/repomgr/bench_test.go +++ b/repomgr/bench_test.go @@ -54,12 +54,19 @@ func BenchmarkRepoMgrCreates(b *testing.B) { b.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, []string{cspath}) + /* + cs, err := carstore.NewCarStore(cardb, []string{cspath}) + if err != nil { + b.Fatal(err) + } + */ + cs, err := carstore.NewNonArchivalCarstore(cardb) if err != nil { b.Fatal(err) } repoman := NewRepoManager(cs, &util.FakeKeyManager{}) + repoman.noArchive = true ctx := context.TODO() if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:foo:bar", "catdog", "", ""); err != nil { diff --git a/repomgr/ingest_test.go b/repomgr/ingest_test.go index 03444b9f3..955705a84 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -80,7 +80,14 @@ func testCarstore(t *testing.T, dir string) carstore.CarStore { t.Fatal(err) } - cs, err := carstore.NewCarStore(cardb, []string{cspath}) + /* + cs, err := carstore.NewCarStore(cardb, []string{cspath}) + if err != nil { + t.Fatal(err) + } + */ + + cs, err := carstore.NewNonArchivalCarstore(cardb) if err != nil { t.Fatal(err) } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 1e09dbcd9..613949341 100644 --- a/repomgr/repomgr.go +++ 
b/repomgr/repomgr.go @@ -34,11 +34,17 @@ import ( func NewRepoManager(cs carstore.CarStore, kmgr KeyManager) *RepoManager { + var noArchive bool + if _, ok := cs.(*carstore.NonArchivalCarstore); ok { + noArchive = true + } + return &RepoManager{ cs: cs, userLocks: make(map[models.Uid]*userLock), kmgr: kmgr, log: slog.Default().With("system", "repomgr"), + noArchive: noArchive, } } @@ -62,7 +68,8 @@ type RepoManager struct { events func(context.Context, *RepoEvent) hydrateRecords bool - log *slog.Logger + log *slog.Logger + noArchive bool } type ActorInfo struct { @@ -530,6 +537,90 @@ func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid st } func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { + if rm.noArchive { + return rm.handleExternalUserEventNoArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops) + } else { + return rm.handleExternalUserEventArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops) + } +} + +func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { + ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") + defer span.End() + + span.SetAttributes(attribute.Int64("uid", int64(uid))) + + log.Debugw("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) + + unlock := rm.lockUser(ctx, uid) + defer unlock() + + start := time.Now() + root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice) + if err != nil { + return fmt.Errorf("importing external carslice: %w", err) + } + + r, err := repo.OpenRepo(ctx, ds, root) + if err != nil { + return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err) + } + + if err := rm.CheckRepoSig(ctx, r, did); err != nil { 
+ return fmt.Errorf("check repo sig: %w", err) + } + openAndSigCheckDuration.Observe(time.Since(start).Seconds()) + + evtops := make([]RepoOp, 0, len(ops)) + for _, op := range ops { + parts := strings.SplitN(op.Path, "/", 2) + if len(parts) != 2 { + return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey") + } + + switch EventKind(op.Action) { + case EvtKindCreateRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindCreateRecord, + Collection: parts[0], + Rkey: parts[1], + RecCid: (*cid.Cid)(op.Cid), + }) + case EvtKindUpdateRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindUpdateRecord, + Collection: parts[0], + Rkey: parts[1], + RecCid: (*cid.Cid)(op.Cid), + }) + case EvtKindDeleteRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindDeleteRecord, + Collection: parts[0], + Rkey: parts[1], + }) + default: + return fmt.Errorf("unrecognized external user event kind: %q", op.Action) + } + } + + if rm.events != nil { + rm.events(ctx, &RepoEvent{ + User: uid, + //OldRoot: prev, + NewRoot: root, + Rev: nrev, + Since: since, + Ops: evtops, + RepoSlice: carslice, + PDS: pdsid, + }) + } + + return nil +} + +func (rm *RepoManager) handleExternalUserEventArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") defer span.End() diff --git a/testing/integ_test.go b/testing/integ_test.go index df3c3fedb..549daeeb6 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -544,6 +544,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { } e2 := evts.Next() + fmt.Println(e2.RepoCommit.Ops) assert.Equal(len(e2.RepoCommit.Ops), 0) assert.Equal(e2.RepoCommit.Repo, bob.DID()) } diff --git a/testing/utils.go b/testing/utils.go index 066f7bf51..ad64892d7 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -550,7 +550,13 @@ func SetupRelay(ctx context.Context, didr 
plc.PLCClient) (*TestRelay, error) { return nil, err } - cs, err := carstore.NewCarStore(cardb, []string{cspath}) + /* + cs, err := carstore.NewCarStore(cardb, []string{cspath}) + if err != nil { + return nil, err + } + */ + cs, err := carstore.NewNonArchivalCarstore(cardb) if err != nil { return nil, err } From 5c7f937323f77188ec6726bbb89440347a8a3092 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 19 Nov 2024 13:41:54 -0800 Subject: [PATCH 02/12] config flag --- cmd/bigsky/main.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index a2d1de1a6..33c798508 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -216,6 +216,11 @@ func run(args []string) error { Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", EnvVars: []string{"RELAY_NEXT_CRAWLER"}, }, + &cli.BoolFlag{ + Name: "non-archival", + EnvVars: []string{"RELAY_NON_ARCHIVAL"}, + Value: false, + }, } app.Action = runBigsky @@ -345,9 +350,22 @@ func runBigsky(cctx *cli.Context) error { } } - cstore, err := carstore.NewCarStore(csdb, csdirs) - if err != nil { - return err + var cstore carstore.CarStore + + if cctx.Bool("non-archival") { + cs, err := carstore.NewNonArchivalCarstore(csdb) + if err != nil { + return err + } + + cstore = cs + } else { + cs, err := carstore.NewCarStore(csdb, csdirs) + if err != nil { + return err + } + + cstore = cs } // DID RESOLUTION From cc763787f3a6ace6dfc7b2297206ceaa6cd9a69e Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 19 Nov 2024 14:33:05 -0800 Subject: [PATCH 03/12] add missing file --- carstore/nonarchive.go | 250 +++++++++++++++++++++++++++++++++++++++++ indexer/indexer.go | 1 + repomgr/repomgr.go | 55 ++++++++- testing/integ_test.go | 3 +- testing/utils.go | 4 +- 5 files changed, 307 insertions(+), 6 deletions(-) create mode 100644 carstore/nonarchive.go diff --git a/carstore/nonarchive.go 
b/carstore/nonarchive.go new file mode 100644 index 000000000..64b0a32f7 --- /dev/null +++ b/carstore/nonarchive.go @@ -0,0 +1,250 @@ +package carstore + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + + "github.com/bluesky-social/indigo/models" + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + car "github.com/ipld/go-car" + "go.opentelemetry.io/otel" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type NonArchivalCarstore struct { + db *gorm.DB + + lk sync.Mutex + lastCommitCache map[models.Uid]*commitRefInfo +} + +func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) { + if err := db.AutoMigrate(&commitRefInfo{}); err != nil { + return nil, err + } + + return &NonArchivalCarstore{ + db: db, + lastCommitCache: make(map[models.Uid]*commitRefInfo), + }, nil +} + +type commitRefInfo struct { + ID uint `gorm:"primarykey"` + Uid models.Uid `gorm:"uniqueIndex"` + Rev string + Root models.DbCID +} + +func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo { + cs.lk.Lock() + defer cs.lk.Unlock() + + ls, ok := cs.lastCommitCache[user] + if ok { + return ls + } + + return nil +} + +func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) { + cs.lk.Lock() + defer cs.lk.Unlock() + + delete(cs.lastCommitCache, user) +} + +func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) { + cs.lk.Lock() + defer cs.lk.Unlock() + + cs.lastCommitCache[ls.Uid] = ls +} + +func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { + var out commitRefInfo + if err := cs.db.Find(&out, "uid = ?", user).Error; err != nil { + return nil, err + } + + return &out, nil +} + +func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo") + defer 
span.End() + + maybeLs := cs.checkLastShardCache(user) + if maybeLs != nil { + return maybeLs, nil + } + + lastShard, err := cs.loadCommitRefInfo(ctx, user) + if err != nil { + return nil, err + } + + cs.putLastShardCache(lastShard) + return lastShard, nil +} + +func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error { + cri := &commitRefInfo{ + Uid: uid, + Rev: rev, + Root: models.DbCID{cid}, + } + + if err := cs.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "uid"}}, + UpdateAll: true, + }).Create(cri).Error; err != nil { + return fmt.Errorf("update or set last commit info: %w", err) + } + + cs.putLastShardCache(cri) + + return nil +} + +func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") + defer span.End() + + // TODO: ensure that we don't write updates on top of the wrong head + // this needs to be a compare and swap type operation + lastShard, err := cs.getCommitRefInfo(ctx, user) + if err != nil { + return nil, err + } + + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + } + + return &DeltaSession{ + fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()), + blks: make(map[cid.Cid]blockformat.Block), + base: &userView{ + user: user, + cs: cs, + prefetch: true, + cache: make(map[cid.Cid]blockformat.Block), + }, + user: user, + baseCid: lastShard.Root.CID, + cs: cs, + seq: 0, + lastRev: lastShard.Rev, + }, nil +} + +func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { + return &DeltaSession{ + base: &userView{ + user: user, + cs: cs, + prefetch: false, + cache: make(map[cid.Cid]blockformat.Block), + }, + readonly: true, + user: user, + cs: cs, + }, nil +} + +// TODO: incremental is only ever called true, remove 
the param +func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { + return fmt.Errorf("not supported in non-archival mode") +} + +func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") + defer span.End() + + carr, err := car.NewCarReader(bytes.NewReader(carslice)) + if err != nil { + return cid.Undef, nil, err + } + + if len(carr.Header.Roots) != 1 { + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) + } + + ds, err := cs.NewDeltaSession(ctx, uid, since) + if err != nil { + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) + } + + var cids []cid.Cid + for { + blk, err := carr.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, nil, err + } + + cids = append(cids, blk.Cid()) + + if err := ds.Put(ctx, blk); err != nil { + return cid.Undef, nil, err + } + } + + return carr.Header.Roots[0], ds, nil +} + +func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { + lastShard, err := cs.getCommitRefInfo(ctx, user) + if err != nil { + return cid.Undef, err + } + if lastShard.ID == 0 { + return cid.Undef, nil + } + + return lastShard.Root.CID, nil +} + +func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + lastShard, err := cs.getCommitRefInfo(ctx, user) + if err != nil { + return "", err + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + +func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { + return nil, nil +} + +func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error { + if err := cs.db.Raw("DELETE from commit_ref_infos 
WHERE uid = ?", user).Error; err != nil { + return err + } + + cs.removeLastShardCache(user) + return nil +} + +func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + return nil, fmt.Errorf("compaction not supported on non-archival") +} + +func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { + return nil, fmt.Errorf("compaction not supported in non-archival") +} diff --git a/indexer/indexer.go b/indexer/indexer.go index 2c3a2b53d..a92745e80 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -538,6 +538,7 @@ func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEven case *bsky.ActorProfile: ix.log.Debug("TODO: got actor profile record creation, need to do something with this") default: + log.Warnw("unrecognized record", "record", op.Record, "collection", op.Collection) return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection) } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 613949341..d23c1ab84 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -578,21 +578,68 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey") } + /* + switch EventKind(op.Action) { + case EvtKindCreateRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindCreateRecord, + Collection: parts[0], + Rkey: parts[1], + RecCid: (*cid.Cid)(op.Cid), + }) + case EvtKindUpdateRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindUpdateRecord, + Collection: parts[0], + Rkey: parts[1], + RecCid: (*cid.Cid)(op.Cid), + }) + case EvtKindDeleteRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindDeleteRecord, + Collection: parts[0], + Rkey: parts[1], + }) + default: + return fmt.Errorf("unrecognized external user event kind: %q", op.Action) + } + */ switch EventKind(op.Action) { 
case EvtKindCreateRecord: - evtops = append(evtops, RepoOp{ + rop := RepoOp{ Kind: EvtKindCreateRecord, Collection: parts[0], Rkey: parts[1], RecCid: (*cid.Cid)(op.Cid), - }) + } + + if rm.hydrateRecords { + _, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("reading changed record from car slice: %w", err) + } + rop.Record = rec + } + + evtops = append(evtops, rop) case EvtKindUpdateRecord: - evtops = append(evtops, RepoOp{ + rop := RepoOp{ Kind: EvtKindUpdateRecord, Collection: parts[0], Rkey: parts[1], RecCid: (*cid.Cid)(op.Cid), - }) + } + + if rm.hydrateRecords { + _, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("reading changed record from car slice: %w", err) + } + + rop.Record = rec + } + + evtops = append(evtops, rop) case EvtKindDeleteRecord: evtops = append(evtops, RepoOp{ Kind: EvtKindDeleteRecord, diff --git a/testing/integ_test.go b/testing/integ_test.go index 549daeeb6..53003505c 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -536,6 +536,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { e1 := evts.Next() assert.NotNil(e1.RepoCommit) assert.Equal(e1.RepoCommit.Repo, bob.DID()) + fmt.Println(e1.RepoCommit.Ops[0]) ctx := context.TODO() rm := p1.server.Repoman() @@ -544,7 +545,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { } e2 := evts.Next() - fmt.Println(e2.RepoCommit.Ops) + //fmt.Println(e2.RepoCommit.Ops[0]) assert.Equal(len(e2.RepoCommit.Ops), 0) assert.Equal(e2.RepoCommit.Repo, bob.DID()) } diff --git a/testing/utils.go b/testing/utils.go index ad64892d7..b834e1b77 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -555,11 +555,13 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) { if err != nil { return nil, err } - */ + //*/ + //* cs, err := carstore.NewNonArchivalCarstore(cardb) if err != nil { return nil, err } + //*/ //kmgr := indexer.NewKeyManager(didr, nil) kmgr := &bsutil.FakeKeyManager{} From 
2a077980cc0f79e5a5d45174f192b6fc4a041c7d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 19 Nov 2024 15:39:56 -0800 Subject: [PATCH 04/12] be okay missing revs --- carstore/nonarchive.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/carstore/nonarchive.go b/carstore/nonarchive.go index 64b0a32f7..ee5ee014a 100644 --- a/carstore/nonarchive.go +++ b/carstore/nonarchive.go @@ -127,7 +127,7 @@ func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models. } if since != nil && *since != lastShard.Rev { - return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + log.Warn("revision mismatch: %s != %s: %s", *since, lastShard.Rev, ErrRepoBaseMismatch) } return &DeltaSession{ From f9f6e8c14630800561658d16b790e9632d7a7a9b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Nov 2024 17:39:46 -0800 Subject: [PATCH 05/12] fixing up a few more tests --- repomgr/ingest_test.go | 23 ++++++++++++----------- testing/integ_test.go | 36 ++++++++++++++++++++++++++++-------- testing/utils.go | 24 +++++++++++++----------- 3 files changed, 53 insertions(+), 30 deletions(-) diff --git a/repomgr/ingest_test.go b/repomgr/ingest_test.go index 955705a84..602a06c38 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -69,7 +69,7 @@ func TestLoadNewRepo(t *testing.T) { } } -func testCarstore(t *testing.T, dir string) carstore.CarStore { +func testCarstore(t *testing.T, dir string, archive bool) carstore.CarStore { cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) if err != nil { t.Fatal(err) @@ -80,19 +80,20 @@ func testCarstore(t *testing.T, dir string) carstore.CarStore { t.Fatal(err) } - /* + if archive { cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { t.Fatal(err) } - */ + return cs + } else { + cs, err := carstore.NewNonArchivalCarstore(cardb) + if err != nil { + t.Fatal(err) + } - cs, err := carstore.NewNonArchivalCarstore(cardb) - 
if err != nil { - t.Fatal(err) + return cs } - - return cs } func TestIngestWithGap(t *testing.T) { @@ -113,7 +114,7 @@ func TestIngestWithGap(t *testing.T) { Uid: 1, }) - cs := testCarstore(t, dir) + cs := testCarstore(t, dir, true) repoman := NewRepoManager(cs, &util.FakeKeyManager{}) @@ -121,7 +122,7 @@ func TestIngestWithGap(t *testing.T) { if err != nil { t.Fatal(err) } - cs2 := testCarstore(t, dir2) + cs2 := testCarstore(t, dir2, true) var since *string ctx := context.TODO() @@ -205,7 +206,7 @@ func TestDuplicateRecord(t *testing.T) { Uid: 1, }) - cs := testCarstore(t, dir) + cs := testCarstore(t, dir, true) repoman := NewRepoManager(cs, &util.FakeKeyManager{}) diff --git a/testing/integ_test.go b/testing/integ_test.go index 53003505c..5b56ce4ae 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -20,6 +20,16 @@ import ( ) func TestRelayBasic(t *testing.T) { + t.Helper() + testRelayBasic(t, true) +} + +func TestRelayBasicNonArchive(t *testing.T) { + t.Helper() + testRelayBasic(t, false) +} + +func testRelayBasic(t *testing.T, archive bool) { if testing.Short() { t.Skip("skipping Relay test in 'short' test mode") } @@ -28,7 +38,7 @@ func TestRelayBasic(t *testing.T) { p1 := MustSetupPDS(t, ".tpds", didr) p1.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, archive) b1.Run(t) b1.tr.TrialHosts = []string{p1.RawHost()} @@ -111,6 +121,16 @@ func socialSim(t *testing.T, users []*TestUser, postiter, likeiter int) []*atpro } func TestRelayMultiPDS(t *testing.T) { + t.Helper() + testRelayMultiPDS(t, true) +} + +func TestRelayMultiPDSNonArchive(t *testing.T) { + t.Helper() + testRelayMultiPDS(t, false) +} + +func testRelayMultiPDS(t *testing.T, archive bool) { if testing.Short() { t.Skip("skipping Relay test in 'short' test mode") } @@ -125,7 +145,7 @@ func TestRelayMultiPDS(t *testing.T) { p2 := MustSetupPDS(t, ".pdsdos", didr) p2.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, archive) b1.Run(t) 
b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()} @@ -193,7 +213,7 @@ func TestRelayMultiGap(t *testing.T) { p2 := MustSetupPDS(t, ".pdsdos", didr) p2.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, true) b1.Run(t) b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()} @@ -251,7 +271,7 @@ func TestHandleChange(t *testing.T) { p1 := MustSetupPDS(t, ".pdsuno", didr) p1.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, true) b1.Run(t) b1.tr.TrialHosts = []string{p1.RawHost()} @@ -288,7 +308,7 @@ func TestAccountEvent(t *testing.T) { p1 := MustSetupPDS(t, ".pdsuno", didr) p1.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, true) b1.Run(t) b1.tr.TrialHosts = []string{p1.RawHost()} @@ -396,7 +416,7 @@ func TestRelayTakedown(t *testing.T) { p1 := MustSetupPDS(t, ".tpds", didr) p1.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, true) b1.Run(t) b1.tr.TrialHosts = []string{p1.RawHost()} @@ -475,7 +495,7 @@ func TestDomainBans(t *testing.T) { } didr := TestPLC(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, true) b1.Run(t) b1.BanDomain(t, "foo.com") @@ -518,7 +538,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { p1 := MustSetupPDS(t, ".tpds", didr) p1.Run(t) - b1 := MustSetupRelay(t, didr) + b1 := MustSetupRelay(t, didr, true) b1.Run(t) b1.tr.TrialHosts = []string{p1.RawHost()} diff --git a/testing/utils.go b/testing/utils.go index b834e1b77..d2847d026 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -518,10 +518,10 @@ func (t *TestRelay) Host() string { return t.listener.Addr().String() } -func MustSetupRelay(t *testing.T, didr plc.PLCClient) *TestRelay { +func MustSetupRelay(t *testing.T, didr plc.PLCClient, archive bool) *TestRelay { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - tbgs, err := SetupRelay(ctx, didr) + tbgs, err := SetupRelay(ctx, didr, archive) if err != nil { t.Fatal(err) } @@ -529,7 +529,7 
@@ func MustSetupRelay(t *testing.T, didr plc.PLCClient) *TestRelay { return tbgs } -func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) { +func SetupRelay(ctx context.Context, didr plc.PLCClient, archive bool) (*TestRelay, error) { dir, err := os.MkdirTemp("", "integtest") if err != nil { return nil, err @@ -550,18 +550,20 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) { return nil, err } - /* - cs, err := carstore.NewCarStore(cardb, []string{cspath}) + var cs carstore.CarStore + if archive { + arccs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { return nil, err } - //*/ - //* - cs, err := carstore.NewNonArchivalCarstore(cardb) - if err != nil { - return nil, err + cs = arccs + } else { + nacs, err := carstore.NewNonArchivalCarstore(cardb) + if err != nil { + return nil, err + } + cs = nacs } - //*/ //kmgr := indexer.NewKeyManager(didr, nil) kmgr := &bsutil.FakeKeyManager{} From d30a1e0e37d05ae212d487369da5330c4e46dd34 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Nov 2024 17:50:15 -0800 Subject: [PATCH 06/12] rm deadcode --- repomgr/repomgr.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index d23c1ab84..71d357021 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -578,32 +578,6 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey") } - /* - switch EventKind(op.Action) { - case EvtKindCreateRecord: - evtops = append(evtops, RepoOp{ - Kind: EvtKindCreateRecord, - Collection: parts[0], - Rkey: parts[1], - RecCid: (*cid.Cid)(op.Cid), - }) - case EvtKindUpdateRecord: - evtops = append(evtops, RepoOp{ - Kind: EvtKindUpdateRecord, - Collection: parts[0], - Rkey: parts[1], - RecCid: (*cid.Cid)(op.Cid), - }) - case EvtKindDeleteRecord: - evtops = append(evtops, RepoOp{ - Kind: 
EvtKindDeleteRecord, - Collection: parts[0], - Rkey: parts[1], - }) - default: - return fmt.Errorf("unrecognized external user event kind: %q", op.Action) - } - */ switch EventKind(op.Action) { case EvtKindCreateRecord: rop := RepoOp{ From aa676332f4b09bcef69a664949c03b04fd9a4b7b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 21 Nov 2024 22:07:50 -0800 Subject: [PATCH 07/12] run more tests on both archive/nonarchive --- testing/integ_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/testing/integ_test.go b/testing/integ_test.go index 5b56ce4ae..fd5533932 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -406,6 +406,14 @@ func TestAccountEvent(t *testing.T) { } func TestRelayTakedown(t *testing.T) { + testRelayTakedown(t, true) +} + +func TestRelayTakedownNonArchive(t *testing.T) { + testRelayTakedown(t, false) +} + +func testRelayTakedown(t *testing.T, archive bool) { if testing.Short() { t.Skip("skipping Relay test in 'short' test mode") } From 8f492f775299e3dfd7875b88dbde4690c596b403 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2024 19:13:22 -0800 Subject: [PATCH 08/12] return written blocks in closing deltasession --- carstore/bs.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/carstore/bs.go b/carstore/bs.go index 3acfaccc0..dbce4f098 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -607,7 +607,11 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str case *FileCarStore: return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) case *NonArchivalCarstore: - return nil, ocs.updateLastCommit(ctx, ds.user, rev, root) + slice, err := blocksToCar(ctx, root, rev, ds.blks) + if err != nil { + return nil, err + } + return slice, ocs.updateLastCommit(ctx, ds.user, rev, root) default: return nil, fmt.Errorf("unsupported carstore type") } @@ -631,6 +635,23 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { 
return hnw, nil } +func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) { + buf := new(bytes.Buffer) + _, err := WriteCarHeader(buf, root) + if err != nil { + return nil, fmt.Errorf("failed to write car header: %w", err) + } + + for k, blk := range blks { + _, err := LdWrite(buf, k.Bytes(), blk.RawData()) + if err != nil { + return nil, fmt.Errorf("failed to write block: %w", err) + } + } + + return buf.Bytes(), nil +} + func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { buf := new(bytes.Buffer) From 23e962887b9230f60b16f62f9d5b45f636c98578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=94=90=E9=B3=B3?= Date: Thu, 12 Dec 2024 21:54:23 -0500 Subject: [PATCH 09/12] Use keyed field in DbCID initialization. fixes "struct literal uses unkeyed fields" in lint. --- carstore/nonarchive.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/carstore/nonarchive.go b/carstore/nonarchive.go index ee5ee014a..730ff7fe4 100644 --- a/carstore/nonarchive.go +++ b/carstore/nonarchive.go @@ -100,7 +100,7 @@ func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models. 
cri := &commitRefInfo{ Uid: uid, Rev: rev, - Root: models.DbCID{cid}, + Root: models.DbCID{CID: cid}, } if err := cs.db.Clauses(clause.OnConflict{ From 51af58f526040357c5e6d16d502e0d82799f24d0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 16 Dec 2024 10:12:42 -0800 Subject: [PATCH 10/12] small review updates --- pds/server.go | 11 ----------- repomgr/bench_test.go | 1 + 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/pds/server.go b/pds/server.go index d8c87d817..c883012ed 100644 --- a/pds/server.go +++ b/pds/server.go @@ -201,17 +201,6 @@ func (s *Server) createExternalUser(ctx context.Context, did string) (*models.Ac handle = hurl.Host } - /* - profile, err := bsky.ActorGetProfile(ctx, c, did) - if err != nil { - return nil, err - } - - if handle != profile.Handle { - return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) - } - */ - // TODO: request this users info from their server to fill out our data... 
u := User{ Handle: handle, diff --git a/repomgr/bench_test.go b/repomgr/bench_test.go index 7f403ff6c..1137474f0 100644 --- a/repomgr/bench_test.go +++ b/repomgr/bench_test.go @@ -54,6 +54,7 @@ func BenchmarkRepoMgrCreates(b *testing.B) { b.Fatal(err) } + // TODO: constructor for 'either type' /* cs, err := carstore.NewCarStore(cardb, []string{cspath}) if err != nil { From 62578eb480ea5e13901f44ac4d03d50765110605 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 16 Dec 2024 10:19:30 -0800 Subject: [PATCH 11/12] fixup logging now --- carstore/nonarchive.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/carstore/nonarchive.go b/carstore/nonarchive.go index 730ff7fe4..5197b1c6e 100644 --- a/carstore/nonarchive.go +++ b/carstore/nonarchive.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "log/slog" "sync" "github.com/bluesky-social/indigo/models" @@ -23,6 +24,8 @@ type NonArchivalCarstore struct { lk sync.Mutex lastCommitCache map[models.Uid]*commitRefInfo + + log *slog.Logger } func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) { @@ -33,6 +36,7 @@ func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) { return &NonArchivalCarstore{ db: db, lastCommitCache: make(map[models.Uid]*commitRefInfo), + log: slog.Default().With("system", "carstorena"), }, nil } @@ -127,7 +131,7 @@ func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models. 
} if since != nil && *since != lastShard.Rev { - log.Warn("revision mismatch: %s != %s: %s", *since, lastShard.Rev, ErrRepoBaseMismatch) + cs.log.Warn("revision mismatch", "since", *since, "rev", lastShard.Rev, "err", ErrRepoBaseMismatch) } return &DeltaSession{ From e495daf5cd002cd143db8382b19df3f2923cce6b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 16 Dec 2024 10:26:52 -0800 Subject: [PATCH 12/12] more logging fixup --- indexer/indexer.go | 2 +- repomgr/repomgr.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index a92745e80..e6a324e9e 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -538,7 +538,7 @@ func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEven case *bsky.ActorProfile: ix.log.Debug("TODO: got actor profile record creation, need to do something with this") default: - log.Warnw("unrecognized record", "record", op.Record, "collection", op.Collection) + ix.log.Warn("unrecognized record", "record", op.Record, "collection", op.Collection) return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection) } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 71d357021..d2c3766f3 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -550,7 +550,7 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds span.SetAttributes(attribute.Int64("uid", int64(uid))) - log.Debugw("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) + rm.log.Debug("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) unlock := rm.lockUser(ctx, uid) defer unlock()