diff --git a/bgs/admin.go b/bgs/admin.go
index 44b07cd8f..e6a18525d 100644
--- a/bgs/admin.go
+++ b/bgs/admin.go
@@ -353,13 +353,35 @@ func (bgs *BGS) handleAdminUnbanDomain(c echo.Context) error {
 	})
 }
 
+type PDSRates struct {
+	PerSecond int64 `json:"per_second,omitempty"`
+	PerHour   int64 `json:"per_hour,omitempty"`
+	PerDay    int64 `json:"per_day,omitempty"`
+	CrawlRate int64 `json:"crawl_rate,omitempty"`
+	RepoLimit int64 `json:"repo_limit,omitempty"`
+}
+
+func (pr *PDSRates) FromSlurper(s *Slurper) {
+	if pr.PerSecond == 0 {
+		pr.PerSecond = s.DefaultPerSecondLimit
+	}
+	if pr.PerHour == 0 {
+		pr.PerHour = s.DefaultPerHourLimit
+	}
+	if pr.PerDay == 0 {
+		pr.PerDay = s.DefaultPerDayLimit
+	}
+	if pr.CrawlRate == 0 {
+		pr.CrawlRate = int64(s.DefaultCrawlLimit)
+	}
+	if pr.RepoLimit == 0 {
+		pr.RepoLimit = s.DefaultRepoLimit
+	}
+}
+
 type RateLimitChangeRequest struct {
-	Host      string `json:"host"`
-	PerSecond int64  `json:"per_second"`
-	PerHour   int64  `json:"per_hour"`
-	PerDay    int64  `json:"per_day"`
-	CrawlRate int64  `json:"crawl_rate"`
-	RepoLimit int64  `json:"repo_limit"`
+	Host string `json:"host"`
+	PDSRates
 }
 
 func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
@@ -592,6 +614,9 @@ func (bgs *BGS) handleAdminAddTrustedDomain(e echo.Context) error {
 
 type AdminRequestCrawlRequest struct {
 	Hostname string `json:"hostname"`
+
+	// optional:
+	PDSRates
 }
 
 func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
@@ -644,6 +669,8 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
 	}
 
 	// Skip checking if the server is online for now
+	rateOverrides := body.PDSRates
+	rateOverrides.FromSlurper(bgs.slurper)
 
-	return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
+	return bgs.slurper.SubscribeToPds(ctx, host, true, true, &rateOverrides) // Override Trusted Domain Check
 }
diff --git a/bgs/fedmgr.go b/bgs/fedmgr.go
index c68759d91..c711ccdd6 100644
--- a/bgs/fedmgr.go
+++ b/bgs/fedmgr.go
@@ -363,7 +363,7 @@ func (s *Slurper) canSlurpHost(host string) bool {
 	return !s.newSubsDisabled
 }
 
-func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error {
+func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool, rateOverrides *PDSRates) error {
 	// TODO: for performance, lock on the hostname instead of global
 	s.lk.Lock()
 	defer s.lk.Unlock()
@@ -397,6 +397,13 @@ func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adm
 		CrawlRateLimit:   float64(s.DefaultCrawlLimit),
 		RepoLimit:        s.DefaultRepoLimit,
 	}
+	if rateOverrides != nil {
+		npds.RateLimit = float64(rateOverrides.PerSecond)
+		npds.HourlyEventLimit = rateOverrides.PerHour
+		npds.DailyEventLimit = rateOverrides.PerDay
+		npds.CrawlRateLimit = float64(rateOverrides.CrawlRate)
+		npds.RepoLimit = rateOverrides.RepoLimit
+	}
 	if err := s.db.Create(&npds).Error; err != nil {
 		return err
 	}
diff --git a/bgs/handlers.go b/bgs/handlers.go
index 0e46f0043..ef5284bd7 100644
--- a/bgs/handlers.go
+++ b/bgs/handlers.go
@@ -212,7 +212,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
 		}
 	}
 
-	return s.slurper.SubscribeToPds(ctx, host, true, false)
+	return s.slurper.SubscribeToPds(ctx, host, true, false, nil)
 }
 
 func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
diff --git a/carstore/README.md b/carstore/README.md
new file mode 100644
index 000000000..90880defb
--- /dev/null
+++ 
b/carstore/README.md @@ -0,0 +1,41 @@ +# Carstore + +Store a zillion users of PDS-like repo, with more limited operations (mainly: firehose in, firehose out). + +## [ScyllaStore](scylla.go) + +Blocks stored in ScyllaDB. +User and PDS metadata stored in gorm (PostgreSQL or sqlite3). + +## [FileCarStore](bs.go) + +Store 'car slices' from PDS source subscribeRepo firehose streams to filesystem. +Store metadata to gorm postgresql (or sqlite3). +Periodic compaction of car slices into fewer larger car slices. +User and PDS metadata stored in gorm (PostgreSQL or sqlite3). +FileCarStore was the first production carstore and used through at least 2024-11. + +## [SQLiteStore](sqlite_store.go) + +Experimental/demo. +Blocks stored in trivial local sqlite3 schema. +Minimal reference implementation from which fancy scalable/performant implementations may be derived. + +```sql +CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid)) +CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC) + +INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block + +SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1 + +SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC + +DELETE FROM blocks WHERE uid = ? + +SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1 + +SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1 + +SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1 +``` diff --git a/carstore/bs.go b/carstore/bs.go index dbce4f098..aacbc1c80 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "sort" - "sync" "sync/atomic" "time" @@ -20,7 +19,6 @@ import ( blockformat "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" @@ -47,8 +45,11 @@ const MaxSliceLength = 2 << 20 const BigShardThreshold = 2 << 20 type CarStore interface { + // TODO: not really part of general interface CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) + // TODO: not really part of general interface GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) + GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) @@ -63,8 +64,7 @@ type FileCarStore struct { meta *CarStoreGormMeta rootDirs []string - lscLk sync.Mutex - lastShardCache map[models.Uid]*CarShard + lastShardCache lastShardCache log *slog.Logger } @@ -88,16 +88,29 @@ func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) { return nil, err } - return &FileCarStore{ - meta: &CarStoreGormMeta{meta: meta}, - rootDirs: roots, - lastShardCache: make(map[models.Uid]*CarShard), - log: slog.Default().With("system", "carstore"), - }, nil + gormMeta := &CarStoreGormMeta{meta: meta} + out := &FileCarStore{ + meta: gormMeta, + rootDirs: roots, + lastShardCache: lastShardCache{ + source: gormMeta, + }, + log: slog.Default().With("system", "carstore"), + } + out.lastShardCache.Init() + return out, nil } +// userView needs these things to get into the underlying block 
store +// implemented by CarStoreGormMeta +type userViewSource interface { + HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) + LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) +} + +// wrapper into a block store that keeps track of which user we are working on behalf of type userView struct { - cs CarStore + cs userViewSource user models.Uid cache map[cid.Cid]blockformat.Block @@ -115,13 +128,7 @@ func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) { if have { return have, nil } - - fcd, ok := uv.cs.(*FileCarStore) - if !ok { - return false, nil - } - - return fcd.meta.HasUidCid(ctx, uv.user, k) + return uv.cs.HasUidCid(ctx, uv.user, k) } var CacheHits int64 @@ -143,12 +150,7 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro } atomic.AddInt64(&CacheMiss, 1) - fcd, ok := uv.cs.(*FileCarStore) - if !ok { - return nil, ipld.ErrNotFound{Cid: k} - } - - path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k) + path, offset, user, err := uv.cs.LookupBlockRef(ctx, k) if err != nil { return nil, err } @@ -279,61 +281,39 @@ func (uv *userView) GetSize(ctx context.Context, k cid.Cid) (int, error) { return len(blk.RawData()), nil } +// subset of blockstore.Blockstore that we actually use here +type minBlockstore interface { + Get(ctx context.Context, bcid cid.Cid) (blockformat.Block, error) + Has(ctx context.Context, bcid cid.Cid) (bool, error) + GetSize(ctx context.Context, bcid cid.Cid) (int, error) +} + type DeltaSession struct { - fresh blockstore.Blockstore blks map[cid.Cid]blockformat.Block rmcids map[cid.Cid]bool - base blockstore.Blockstore + base minBlockstore user models.Uid baseCid cid.Cid seq int readonly bool - cs CarStore + cs shardWriter lastRev string } func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard { - cs.lscLk.Lock() - defer cs.lscLk.Unlock() - - ls, ok := cs.lastShardCache[user] - if ok { - return ls - } - - return nil + return cs.lastShardCache.check(user) } func (cs *FileCarStore) removeLastShardCache(user models.Uid) { - cs.lscLk.Lock() - defer cs.lscLk.Unlock() - - delete(cs.lastShardCache, user) + cs.lastShardCache.remove(user) } func (cs *FileCarStore) putLastShardCache(ls *CarShard) { - cs.lscLk.Lock() - defer cs.lscLk.Unlock() - - cs.lastShardCache[ls.Usr] = ls + cs.lastShardCache.put(ls) } func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { - ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") - defer span.End() - - maybeLs := cs.checkLastShardCache(user) - if maybeLs != nil { - return maybeLs, nil - } - - lastShard, err := cs.meta.GetLastShard(ctx, user) - if err != nil { - return nil, err - } - - cs.putLastShardCache(lastShard) - return lastShard, nil + return cs.lastShardCache.get(ctx, user) } var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head") @@ -354,11 +334,10 @@ func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, si } return &DeltaSession{ - fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()), - blks: make(map[cid.Cid]blockformat.Block), + blks: make(map[cid.Cid]blockformat.Block), base: &userView{ user: user, - cs: cs, + cs: cs.meta, prefetch: true, cache: make(map[cid.Cid]blockformat.Block), }, @@ -374,7 +353,7 @@ func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) return &DeltaSession{ base: &userView{ user: user, - cs: cs, + cs: cs.meta, 
prefetch: false, cache: make(map[cid.Cid]blockformat.Block), }, @@ -385,7 +364,7 @@ func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) } // TODO: incremental is only ever called true, remove the param -func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { +func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") defer span.End() @@ -398,7 +377,6 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR } } - // TODO: Why does ReadUserCar want shards seq DESC but CompactUserShards wants seq ASC ? shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq) if err != nil { return err @@ -418,12 +396,12 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR if err := car.WriteHeader(&car.CarHeader{ Roots: []cid.Cid{shards[0].Root.CID}, Version: 1, - }, w); err != nil { + }, shardOut); err != nil { return err } for _, sh := range shards { - if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { + if err := cs.writeShardBlocks(ctx, &sh, shardOut); err != nil { return err } } @@ -433,7 +411,7 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR // inner loop part of ReadUserCar // copy shard blocks from disk to Writer -func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { +func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error { ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks") defer span.End() @@ -448,7 +426,7 @@ func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io return err } - _, err = io.Copy(w, fi) + _, err = io.Copy(shardOut, fi) if err != nil { return err } @@ -603,18 +581,7 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str return nil, fmt.Errorf("cannot write to readonly deltaSession") } - switch ocs := ds.cs.(type) { - case *FileCarStore: - return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) - case *NonArchivalCarstore: - slice, err := blocksToCar(ctx, root, rev, ds.blks) - if err != nil { - return nil, err - } - return slice, ocs.updateLastCommit(ctx, ds.user, rev, root) - default: - return nil, fmt.Errorf("unsupported carstore type") - } + return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) } func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { @@ -635,6 +602,12 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { return hnw, nil } +// shardWriter.writeNewShard called from inside DeltaSession.CloseWithRoot +type shardWriter interface { + // writeNewShard stores blocks in `blks` arg and creates a new shard to propagate out to our firehose + writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) +} + func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) { buf := new(bytes.Buffer) _, err := WriteCarHeader(buf, root) diff --git a/carstore/last_shard_cache.go b/carstore/last_shard_cache.go new file mode 100644 index 000000000..8371b8883 --- /dev/null +++ b/carstore/last_shard_cache.go @@ -0,0 +1,70 @@ +package carstore + +import ( + "context" + 
"github.com/bluesky-social/indigo/models" + "go.opentelemetry.io/otel" + "sync" +) + +type LastShardSource interface { + GetLastShard(context.Context, models.Uid) (*CarShard, error) +} + +type lastShardCache struct { + source LastShardSource + + lscLk sync.Mutex + lastShardCache map[models.Uid]*CarShard +} + +func (lsc *lastShardCache) Init() { + lsc.lastShardCache = make(map[models.Uid]*CarShard) +} + +func (lsc *lastShardCache) check(user models.Uid) *CarShard { + lsc.lscLk.Lock() + defer lsc.lscLk.Unlock() + + ls, ok := lsc.lastShardCache[user] + if ok { + return ls + } + + return nil +} + +func (lsc *lastShardCache) remove(user models.Uid) { + lsc.lscLk.Lock() + defer lsc.lscLk.Unlock() + + delete(lsc.lastShardCache, user) +} + +func (lsc *lastShardCache) put(ls *CarShard) { + if ls == nil { + return + } + lsc.lscLk.Lock() + defer lsc.lscLk.Unlock() + + lsc.lastShardCache[ls.Usr] = ls +} + +func (lsc *lastShardCache) get(ctx context.Context, user models.Uid) (*CarShard, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") + defer span.End() + + maybeLs := lsc.check(user) + if maybeLs != nil { + return maybeLs, nil + } + + lastShard, err := lsc.source.GetLastShard(ctx, user) + if err != nil { + return nil, err + } + + lsc.put(lastShard) + return lastShard, nil +} diff --git a/carstore/nonarchive.go b/carstore/nonarchive.go index 5197b1c6e..56b325848 100644 --- a/carstore/nonarchive.go +++ b/carstore/nonarchive.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + ipld "github.com/ipfs/go-ipld-format" "io" "log/slog" "sync" @@ -11,8 +12,6 @@ import ( "github.com/bluesky-social/indigo/models" blockformat "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - blockstore "github.com/ipfs/go-ipfs-blockstore" car "github.com/ipld/go-car" "go.opentelemetry.io/otel" "gorm.io/gorm" @@ -135,8 +134,7 @@ func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models. 
} return &DeltaSession{ - fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()), - blks: make(map[cid.Cid]blockformat.Block), + blks: make(map[cid.Cid]blockformat.Block), base: &userView{ user: user, cs: cs, @@ -252,3 +250,19 @@ func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCo func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { return nil, fmt.Errorf("compaction not supported in non-archival") } + +func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) { + return false, nil +} + +func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) { + return "", 0, 0, ipld.ErrNotFound{Cid: k} +} + +func (cs *NonArchivalCarstore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { + slice, err := blocksToCar(ctx, root, rev, blks) + if err != nil { + return nil, err + } + return slice, cs.updateLastCommit(ctx, user, rev, root) +} diff --git a/carstore/repo_test.go b/carstore/repo_test.go index 8366cab95..c0c1c7414 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "log/slog" "os" "path/filepath" "testing" @@ -24,7 +25,7 @@ import ( "gorm.io/gorm" ) -func testCarStore() (CarStore, func(), error) { +func testCarStore(t testing.TB) (CarStore, func(), error) { tempdir, err := os.MkdirTemp("", "msttest-") if err != nil { return nil, nil, err @@ -60,6 +61,23 @@ func testCarStore() (CarStore, func(), error) { }, nil } +func testSqliteCarStore(t testing.TB) (CarStore, func(), error) { + sqs := &SQLiteStore{} + sqs.log = slogForTest(t) + err := sqs.Open(":memory:") + if err != nil { + return nil, nil, err + } + return sqs, func() {}, nil +} + +type testFactory func(t testing.TB) (CarStore, func(), error) + +var backends = map[string]testFactory{ + "cartore": testCarStore, + "sqlite": testSqliteCarStore, +} + func testFlatfsBs() (blockstore.Blockstore, func(), error) { tempdir, err := os.MkdirTemp("", "msttest-") if err != nil { @@ -78,91 +96,96 @@ func testFlatfsBs() (blockstore.Blockstore, func(), error) { }, nil } -func TestBasicOperation(t *testing.T) { +func TestBasicOperation(ot *testing.T) { ctx := context.TODO() - cs, cleanup, err := testCarStore() - if err != nil { - t.Fatal(err) - } - defer cleanup() + for fname, tf := range backends { + ot.Run(fname, func(t *testing.T) { - ds, err := cs.NewDeltaSession(ctx, 1, nil) - if err != nil { - t.Fatal(err) - } + cs, cleanup, err := tf(t) + if err != nil { + t.Fatal(err) + } + defer cleanup() - ncid, rev, err := setupRepo(ctx, ds, false) - if err != nil { - t.Fatal(err) - } + ds, err := cs.NewDeltaSession(ctx, 1, nil) + if err != nil { + t.Fatal(err) + } - if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { - t.Fatal(err) - } + ncid, rev, err := setupRepo(ctx, ds, false) + if err != nil { + t.Fatal(err) + } - var recs []cid.Cid - head := ncid - for i := 0; i < 10; i++ { - ds, err := cs.NewDeltaSession(ctx, 1, &rev) - if err != nil { - t.Fatal(err) - } + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { + t.Fatal(err) + } - rr, err := repo.OpenRepo(ctx, ds, head) - if err != nil { - t.Fatal(err) - } + var recs []cid.Cid + head := ncid + for i := 0; i < 10; i++ { + ds, err := cs.NewDeltaSession(ctx, 1, &rev) + if err != nil { + 
t.Fatal(err) + } - rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), - }) - if err != nil { - t.Fatal(err) - } + rr, err := repo.OpenRepo(ctx, ds, head) + if err != nil { + t.Fatal(err) + } - recs = append(recs, rc) + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), + }) + if err != nil { + t.Fatal(err) + } - kmgr := &util.FakeKeyManager{} - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) - if err != nil { - t.Fatal(err) - } + recs = append(recs, rc) - rev = nrev + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } - if err := ds.CalcDiff(ctx, nil); err != nil { - t.Fatal(err) - } + rev = nrev - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { - t.Fatal(err) - } + if err := ds.CalcDiff(ctx, nil); err != nil { + t.Fatal(err) + } - head = nroot - } + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } - buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { - t.Fatal(err) - } - checkRepo(t, cs, buf, recs) + head = nroot + } - if _, err := cs.CompactUserShards(ctx, 1, false); err != nil { - t.Fatal(err) - } + buf := new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, cs, buf, recs) - buf = new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { - t.Fatal(err) + if _, err := cs.CompactUserShards(ctx, 1, false); err != nil { + t.Fatal(err) + } + + buf = new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, cs, buf, recs) + }) } - checkRepo(t, cs, buf, recs) } func TestRepeatedCompactions(t *testing.T) { ctx := context.TODO() - cs, cleanup, err := testCarStore() + cs, cleanup, err := testCarStore(t) if err != nil { t.Fatal(err) } @@ -323,7 +346,16 @@ func setupRepo(ctx context.Context, bs blockstore.Blockstore, mkprofile bool) (c func BenchmarkRepoWritesCarstore(b *testing.B) { ctx := context.TODO() - cs, cleanup, err := testCarStore() + cs, cleanup, err := testCarStore(b) + innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err) +} +func BenchmarkRepoWritesSqliteCarstore(b *testing.B) { + ctx := context.TODO() + + cs, cleanup, err := testSqliteCarStore(b) + innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err) +} +func innerBenchmarkRepoWritesCarstore(b *testing.B, ctx context.Context, cs CarStore, cleanup func(), err error) { if err != nil { b.Fatal(err) } @@ -458,131 +490,152 @@ func BenchmarkRepoWritesSqlite(b *testing.B) { } } -func TestDuplicateBlockAcrossShards(t *testing.T) { +func TestDuplicateBlockAcrossShards(ot *testing.T) { ctx := context.TODO() - cs, cleanup, err := testCarStore() - if err != nil { - t.Fatal(err) - } - defer cleanup() + for fname, tf := range backends { + ot.Run(fname, func(t *testing.T) { - ds1, err := cs.NewDeltaSession(ctx, 1, nil) - if err != nil { - t.Fatal(err) - } + cs, cleanup, err := tf(t) + if err != nil { + t.Fatal(err) + } + defer cleanup() - ds2, err := cs.NewDeltaSession(ctx, 2, nil) - if err != nil { - t.Fatal(err) - } + ds1, err := cs.NewDeltaSession(ctx, 1, nil) + if err != nil { + t.Fatal(err) + } - ds3, err := cs.NewDeltaSession(ctx, 3, nil) - if err != nil { - t.Fatal(err) - } + ds2, err := cs.NewDeltaSession(ctx, 2, nil) + if err != nil { + 
t.Fatal(err) + } - var cids []cid.Cid - var revs []string - for _, ds := range []*DeltaSession{ds1, ds2, ds3} { - ncid, rev, err := setupRepo(ctx, ds, true) - if err != nil { - t.Fatal(err) - } + ds3, err := cs.NewDeltaSession(ctx, 3, nil) + if err != nil { + t.Fatal(err) + } - if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { - t.Fatal(err) - } - cids = append(cids, ncid) - revs = append(revs, rev) - } + var cids []cid.Cid + var revs []string + for _, ds := range []*DeltaSession{ds1, ds2, ds3} { + ncid, rev, err := setupRepo(ctx, ds, true) + if err != nil { + t.Fatal(err) + } - var recs []cid.Cid - head := cids[1] - rev := revs[1] - for i := 0; i < 10; i++ { - ds, err := cs.NewDeltaSession(ctx, 2, &rev) - if err != nil { - t.Fatal(err) - } + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { + t.Fatal(err) + } + cids = append(cids, ncid) + revs = append(revs, rev) + } - rr, err := repo.OpenRepo(ctx, ds, head) - if err != nil { - t.Fatal(err) - } + var recs []cid.Cid + head := cids[1] + rev := revs[1] + for i := 0; i < 10; i++ { + ds, err := cs.NewDeltaSession(ctx, 2, &rev) + if err != nil { + t.Fatal(err) + } - rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), - }) - if err != nil { - t.Fatal(err) - } + rr, err := repo.OpenRepo(ctx, ds, head) + if err != nil { + t.Fatal(err) + } - recs = append(recs, rc) + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), + }) + if err != nil { + t.Fatal(err) + } - kmgr := &util.FakeKeyManager{} - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) - if err != nil { - t.Fatal(err) - } + recs = append(recs, rc) - rev = nrev + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } - if err := ds.CalcDiff(ctx, nil); err != nil { - t.Fatal(err) - } + rev = nrev - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { - t.Fatal(err) - } + if err := ds.CalcDiff(ctx, nil); err != nil { + t.Fatal(err) + } - head = nroot - } + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } - // explicitly update the profile object - { - ds, err := cs.NewDeltaSession(ctx, 2, &rev) - if err != nil { - t.Fatal(err) - } + head = nroot + } - rr, err := repo.OpenRepo(ctx, ds, head) - if err != nil { - t.Fatal(err) - } + // explicitly update the profile object + { + ds, err := cs.NewDeltaSession(ctx, 2, &rev) + if err != nil { + t.Fatal(err) + } - desc := "this is so unique" - rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{ - Description: &desc, - }) - if err != nil { - t.Fatal(err) - } + rr, err := repo.OpenRepo(ctx, ds, head) + if err != nil { + t.Fatal(err) + } - recs = append(recs, rc) + desc := "this is so unique" + rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{ + Description: &desc, + }) + if err != nil { + t.Fatal(err) + } - kmgr := &util.FakeKeyManager{} - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) - if err != nil { - t.Fatal(err) - } + recs = append(recs, rc) - rev = nrev + kmgr := &util.FakeKeyManager{} + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) + if err != nil { + t.Fatal(err) + } - if err := ds.CalcDiff(ctx, nil); err != nil { - t.Fatal(err) - } + rev = nrev - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { - t.Fatal(err) - } + if err := 
ds.CalcDiff(ctx, nil); err != nil { + t.Fatal(err) + } - head = nroot + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { + t.Fatal(err) + } + + head = nroot + } + + buf := new(bytes.Buffer) + if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { + t.Fatal(err) + } + checkRepo(t, cs, buf, recs) + }) } +} - buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { - t.Fatal(err) +type testWriter struct { + t testing.TB +} + +func (tw testWriter) Write(p []byte) (n int, err error) { + tw.t.Log(string(p)) + return len(p), nil +} + +func slogForTest(t testing.TB) *slog.Logger { + hopts := slog.HandlerOptions{ + Level: slog.LevelDebug, } - checkRepo(t, cs, buf, recs) + return slog.New(slog.NewTextHandler(&testWriter{t}, &hopts)) } diff --git a/carstore/scylla.go b/carstore/scylla.go new file mode 100644 index 000000000..8f872c02a --- /dev/null +++ b/carstore/scylla.go @@ -0,0 +1,636 @@ +package carstore + +import ( + "bytes" + "context" + "errors" + "fmt" + "github.com/bluesky-social/indigo/models" + "github.com/gocql/gocql" + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipld/go-car" + _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "io" + "log/slog" + "math" + "math/rand/v2" + "time" +) + +type ScyllaStore struct { + WriteSession *gocql.Session + ReadSession *gocql.Session + + // scylla servers + scyllaAddrs []string + // scylla namespace where we find our table + keyspace string + + log *slog.Logger + + lastShardCache lastShardCache +} + +func NewScyllaStore(addrs []string, keyspace string) (*ScyllaStore, error) { + out := new(ScyllaStore) + out.scyllaAddrs = addrs + out.keyspace = keyspace + err := out.Open() + if err != nil { + return nil, err + } + return out, nil +} + +func (sqs *ScyllaStore) Open() error { + if sqs.log == nil { + sqs.log = slog.Default() + } + sqs.log.Debug("scylla connect", "addrs", sqs.scyllaAddrs) + var err error + + // + // Write session + // + var writeSession *gocql.Session + for retry := 0; ; retry++ { + writeCluster := gocql.NewCluster(sqs.scyllaAddrs...) + writeCluster.Keyspace = sqs.keyspace + // Default port, the client should automatically upgrade to shard-aware port + writeCluster.Port = 9042 + writeCluster.Consistency = gocql.Quorum + writeCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 10, Min: 100 * time.Millisecond, Max: 10 * time.Second} + writeCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + writeSession, err = writeCluster.CreateSession() + if err != nil { + if retry > 200 { + return fmt.Errorf("failed to connect read session too many times: %w", err) + } + sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err) + time.Sleep(delayForAttempt(retry)) + continue + } + break + } + + // + // Read session + // + var readSession *gocql.Session + for retry := 0; ; retry++ { + readCluster := gocql.NewCluster(sqs.scyllaAddrs...) 
+ readCluster.Keyspace = sqs.keyspace + // Default port, the client should automatically upgrade to shard-aware port + readCluster.Port = 9042 + readCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 5, Min: 10 * time.Millisecond, Max: 1 * time.Second} + readCluster.Consistency = gocql.One + readCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + readSession, err = readCluster.CreateSession() + if err != nil { + if retry > 200 { + return fmt.Errorf("failed to connect read session too many times: %w", err) + } + sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err) + time.Sleep(delayForAttempt(retry)) + continue + } + break + } + + sqs.WriteSession = writeSession + sqs.ReadSession = readSession + + err = sqs.createTables() + if err != nil { + return fmt.Errorf("scylla could not create tables, %w", err) + } + sqs.lastShardCache.source = sqs + sqs.lastShardCache.Init() + return nil +} + +var createTableTexts = []string{ + `CREATE TABLE IF NOT EXISTS blocks (uid bigint, cid blob, rev varchar, root blob, block blob, PRIMARY KEY((uid,cid)))`, + // This is the INDEX I wish we could use, but scylla can't do it so we MATERIALIZED VIEW instead + //`CREATE INDEX IF NOT EXISTS block_by_rev ON blocks (uid, rev)`, + `CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_by_uidrev +AS SELECT uid, rev, cid, root +FROM blocks +WHERE uid IS NOT NULL AND rev IS NOT NULL AND cid IS NOT NULL +PRIMARY KEY ((uid), rev, cid) WITH CLUSTERING ORDER BY (rev DESC)`, +} + +func (sqs *ScyllaStore) createTables() error { + for i, text := range createTableTexts { + err := sqs.WriteSession.Query(text).Exec() + if err != nil { + return fmt.Errorf("scylla create table statement [%d] %v: %w", i, text, err) + } + } + return nil +} + +// writeNewShard needed for DeltaSession.CloseWithRoot +func (sqs *ScyllaStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { + scWriteNewShard.Inc() + sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) + start := time.Now() + ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") + defer span.End() + buf := new(bytes.Buffer) + hnw, err := WriteCarHeader(buf, root) + if err != nil { + return nil, fmt.Errorf("failed to write car header: %w", err) + } + offset := hnw + + dbroot := root.Bytes() + + span.SetAttributes(attribute.Int("blocks", len(blks))) + + for bcid, block := range blks { + // build shard for output firehose + nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) + if err != nil { + return nil, fmt.Errorf("failed to write block: %w", err) + } + offset += nw + + // TODO: scylla BATCH doesn't apply if the batch crosses partition keys; BUT, we may be able to send many blocks concurrently? 
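
The TODO above asks whether the per-block INSERTs could be issued concurrently, since a CQL BATCH cannot span partitions when the partition key is (uid, cid). A minimal sketch of that idea, assuming golang.org/x/sync/errgroup for bounded concurrency; the helper name, signature, and the string-keyed block map are hypothetical and not part of this change:

```go
package carstore

import (
	"context"

	"github.com/gocql/gocql"
	"golang.org/x/sync/errgroup"
)

// writeBlocksConcurrently is an illustrative helper, not code from this diff.
// blks is keyed by raw CID bytes (as a string) purely to keep the sketch
// dependency-free.
func writeBlocksConcurrently(ctx context.Context, sess *gocql.Session, uid int64, rev string, root []byte, blks map[string][]byte) error {
	eg, ctx := errgroup.WithContext(ctx)
	eg.SetLimit(16) // bound on in-flight INSERTs; real tuning would be needed

	for cidBytes, raw := range blks {
		cidBytes, raw := cidBytes, raw // capture loop variables for the closure
		eg.Go(func() error {
			// Same statement shape as writeNewShard above, one row per block.
			return sess.Query(
				`INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`,
				uid, []byte(cidBytes), rev, root, raw,
			).WithContext(ctx).Idempotent(true).Exec()
		})
	}
	return eg.Wait()
}
```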
+ dbcid := bcid.Bytes() + blockbytes := block.RawData() + // we're relying on cql auto-prepare, no 'PreparedStatement' + err = sqs.WriteSession.Query( + `INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`, + user, dbcid, rev, dbroot, blockbytes, + ).Idempotent(true).Exec() + if err != nil { + return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) + } + sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) + } + + shard := CarShard{ + Root: models.DbCID{CID: root}, + DataStart: hnw, + Seq: seq, + Usr: user, + Rev: rev, + } + + sqs.lastShardCache.put(&shard) + + dt := time.Since(start).Seconds() + scWriteTimes.Observe(dt) + return buf.Bytes(), nil +} + +// GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache +// What we actually seem to need from this: last {Rev, Root.CID} +func (sqs *ScyllaStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { + scGetLastShard.Inc() + var rev string + var rootb []byte + err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks_by_uidrev WHERE uid = ? ORDER BY rev DESC LIMIT 1`, uid).Scan(&rev, &rootb) + if errors.Is(err, gocql.ErrNotFound) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("last shard err, %w", err) + } + xcid, cidErr := cid.Cast(rootb) + if cidErr != nil { + return nil, fmt.Errorf("last shard bad cid, %w", cidErr) + } + return &CarShard{ + Root: models.DbCID{CID: xcid}, + Rev: rev, + }, nil +} + +func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { + sqs.log.Warn("TODO: don't call compaction") + return nil, nil +} + +func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + sqs.log.Warn("TODO: don't call compaction targets") + return nil, nil +} + +func (sqs *ScyllaStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return cid.Undef, err + } + if lastShard == nil { + return cid.Undef, nil + } + if lastShard.ID == 0 { + return cid.Undef, nil + } + + return lastShard.Root.CID, nil +} + +func (sqs *ScyllaStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return "", err + } + if lastShard == nil { + return "", nil + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + +func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { + // TODO: same as FileCarStore, re-unify + ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") + defer span.End() + + carr, err := car.NewCarReader(bytes.NewReader(carslice)) + if err != nil { + return cid.Undef, nil, err + } + + if len(carr.Header.Roots) != 1 { + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) + } + + ds, err := sqs.NewDeltaSession(ctx, uid, since) + if err != nil { + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) + } + + var cids []cid.Cid + for { + blk, err := carr.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, nil, err + } + + cids = append(cids, blk.Cid()) + + if err := ds.Put(ctx, blk); err != nil { + return cid.Undef, nil, err + } 
+ } + + return carr.Header.Roots[0], ds, nil +} + +func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") + defer span.End() + + // TODO: ensure that we don't write updates on top of the wrong head + // this needs to be a compare and swap type operation + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) + } + + if lastShard == nil { + lastShard = &zeroShard + } + + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + } + + return &DeltaSession{ + blks: make(map[cid.Cid]blockformat.Block), + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + user: user, + baseCid: lastShard.Root.CID, + cs: sqs, + seq: lastShard.Seq + 1, + lastRev: lastShard.Rev, + }, nil +} + +func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { + return &DeltaSession{ + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + readonly: true, + user: user, + cs: sqs, + }, nil +} + +// ReadUserCar +// incremental is only ever called true +func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { + scGetCar.Inc() + ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") + defer span.End() + start := time.Now() + + cidchan := make(chan cid.Cid, 100) + + go func() { + defer close(cidchan) + cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ? AND rev > ? ORDER BY rev DESC`, user, sinceRev).Iter() + defer cids.Close() + for { + var cidb []byte + ok := cids.Scan(&cidb) + if !ok { + break + } + xcid, cidErr := cid.Cast(cidb) + if cidErr != nil { + sqs.log.Warn("ReadUserCar bad cid", "err", cidErr) + continue + } + cidchan <- xcid + } + }() + nblocks := 0 + first := true + for xcid := range cidchan { + var xrev string + var xroot []byte + var xblock []byte + err := sqs.ReadSession.Query("SELECT rev, root, block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, xcid.Bytes()).Scan(&xrev, &xroot, &xblock) + if err != nil { + return fmt.Errorf("rcar bad read, %w", err) + } + if first { + rootCid, cidErr := cid.Cast(xroot) + if cidErr != nil { + return fmt.Errorf("rcar bad rootcid, %w", err) + } + if err := car.WriteHeader(&car.CarHeader{ + Roots: []cid.Cid{rootCid}, + Version: 1, + }, shardOut); err != nil { + return fmt.Errorf("rcar bad header, %w", err) + } + first = false + } + nblocks++ + _, err = LdWrite(shardOut, xcid.Bytes(), xblock) + if err != nil { + return fmt.Errorf("rcar bad write, %w", err) + } + } + span.SetAttributes(attribute.Int("blocks", nblocks)) + sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) + scReadCarTimes.Observe(time.Since(start).Seconds()) + return nil +} + +// Stat is only used in a debugging admin handler +// don't bother implementing it (for now?) +func (sqs *ScyllaStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { + sqs.log.Warn("Stat debugging method not implemented for sqlite store") + return nil, nil +} + +func (sqs *ScyllaStore) WipeUserData(ctx context.Context, user models.Uid) error { + ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") + defer span.End() + + // LOL, can't do this if primary key is (uid,cid) because that's hashed with no scan! 
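
The comment above is why WipeUserData has to enumerate (uid, cid) pairs through the materialized view: with a composite partition key there is no single-partition delete for a user. A hypothetical alternative layout, not what this change uses, would make uid the sole partition key with cid as a clustering column, allowing a one-statement wipe at the cost of one potentially huge partition per user:

```go
// Hypothetical alternative schema, for illustration only.
const altBlocksByUserTable = `
CREATE TABLE IF NOT EXISTS blocks_by_user (
    uid bigint, cid blob, rev varchar, root blob, block blob,
    PRIMARY KEY ((uid), cid))`

// With uid alone as the partition key, this becomes a single partition delete.
const altWipeUserQuery = `DELETE FROM blocks_by_user WHERE uid = ?`
```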
+ //err := sqs.WriteSession.Query("DELETE FROM blocks WHERE uid = ?", user).Exec() + + cidchan := make(chan cid.Cid, 100) + + go func() { + defer close(cidchan) + cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ?`, user).Iter() + defer cids.Close() + for { + var cidb []byte + ok := cids.Scan(&cidb) + if !ok { + break + } + xcid, cidErr := cid.Cast(cidb) + if cidErr != nil { + sqs.log.Warn("ReadUserCar bad cid", "err", cidErr) + continue + } + cidchan <- xcid + } + }() + nblocks := 0 + errcount := 0 + for xcid := range cidchan { + err := sqs.ReadSession.Query("DELETE FROM blocks WHERE uid = ? AND cid = ?", user, xcid.Bytes()).Exec() + if err != nil { + sqs.log.Warn("ReadUserCar bad delete, %w", err) + errcount++ + if errcount > 10 { + return err + } + } + nblocks++ + } + scUsersWiped.Inc() + scBlocksDeleted.Add(float64(nblocks)) + return nil +} + +// HasUidCid needed for NewDeltaSession userView +func (sqs *ScyllaStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + scHas.Inc() + var rev string + var rootb []byte + err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1`, user, bcid.Bytes()).Scan(&rev, &rootb) + if err != nil { + return false, fmt.Errorf("hasUC bad scan, %w", err) + } + return true, nil +} + +func (sqs *ScyllaStore) CarStore() CarStore { + return sqs +} + +func (sqs *ScyllaStore) Close() error { + sqs.WriteSession.Close() + sqs.ReadSession.Close() + return nil +} + +func (sqs *ScyllaStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + scGetBlock.Inc() + start := time.Now() + var blockb []byte + err := sqs.ReadSession.Query("SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, bcid.Bytes()).Scan(&blockb) + if err != nil { + return nil, fmt.Errorf("getb err, %w", err) + } + dt := time.Since(start) + scGetTimes.Observe(dt.Seconds()) + return blocks.NewBlock(blockb), nil +} + +func (sqs *ScyllaStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + scGetBlockSize.Inc() + var out int64 + err := sqs.ReadSession.Query("SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? 
LIMIT 1", user, bcid.Bytes()).Scan(&out) + if err != nil { + return 0, fmt.Errorf("getbs err, %w", err) + } + return out, nil +} + +var scUsersWiped = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_users_wiped", + Help: "User rows deleted in scylla backend", +}) + +var scBlocksDeleted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_blocks_deleted", + Help: "User blocks deleted in scylla backend", +}) + +var scGetBlock = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_block", + Help: "get block scylla backend", +}) + +var scGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_block_size", + Help: "get block size scylla backend", +}) + +var scGetCar = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_car", + Help: "get block scylla backend", +}) + +var scHas = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_has", + Help: "check block presence scylla backend", +}) + +var scGetLastShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_get_last_shard", + Help: "get last shard scylla backend", +}) + +var scWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sc_write_shard", + Help: "write shard blocks scylla backend", +}) + +var timeBuckets []float64 +var scWriteTimes prometheus.Histogram +var scGetTimes prometheus.Histogram +var scReadCarTimes prometheus.Histogram + +func init() { + timeBuckets = make([]float64, 1, 20) + timeBuckets[0] = 0.000_0100 + i := 0 + for timeBuckets[i] < 1 && len(timeBuckets) < 20 { + timeBuckets = append(timeBuckets, timeBuckets[i]*2) + i++ + } + scWriteTimes = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "bgs_sc_write_times", + Buckets: timeBuckets, + }) + scGetTimes = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "bgs_sc_get_times", + Buckets: timeBuckets, + }) + scReadCarTimes = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "bgs_sc_readcar_times", + Buckets: timeBuckets, + }) +} + +// TODO: copied from tango, re-unify? 
+// ExponentialBackoffRetryPolicy sleeps between attempts +type ExponentialBackoffRetryPolicy struct { + NumRetries int + Min, Max time.Duration +} + +func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration { + return getExponentialTime(e.Min, e.Max, attempts) +} + +func (e *ExponentialBackoffRetryPolicy) Attempt(q gocql.RetryableQuery) bool { + if q.Attempts() > e.NumRetries { + return false + } + time.Sleep(e.napTime(q.Attempts())) + return true +} + +// used to calculate exponentially growing time +func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration { + if min <= 0 { + min = 100 * time.Millisecond + } + if max <= 0 { + max = 10 * time.Second + } + minFloat := float64(min) + napDuration := minFloat * math.Pow(2, float64(attempts-1)) + // add some jitter + napDuration += rand.Float64()*minFloat - (minFloat / 2) + if napDuration > float64(max) { + return time.Duration(max) + } + return time.Duration(napDuration) +} + +// GetRetryType returns the retry type for the given error +func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) gocql.RetryType { + // Retry timeouts and/or contention errors on the same host + if errors.Is(err, gocql.ErrTimeoutNoResponse) || + errors.Is(err, gocql.ErrNoStreams) || + errors.Is(err, gocql.ErrTooManyTimeouts) { + return gocql.Retry + } + + // Retry next host on unavailable errors + if errors.Is(err, gocql.ErrUnavailable) || + errors.Is(err, gocql.ErrConnectionClosed) || + errors.Is(err, gocql.ErrSessionClosed) { + return gocql.RetryNextHost + } + + // Otherwise don't retry + return gocql.Rethrow +} + +func delayForAttempt(attempt int) time.Duration { + if attempt < 50 { + return time.Millisecond * 5 + } + + return time.Second +} diff --git a/carstore/sqlite_store.go b/carstore/sqlite_store.go new file mode 100644 index 000000000..18a8467db --- /dev/null +++ b/carstore/sqlite_store.go @@ -0,0 +1,576 @@ +package carstore + +import ( + "bytes" + "context" + "database/sql" + "errors" + "fmt" + "go.opentelemetry.io/otel/attribute" + "io" + "log/slog" + "os" + "path/filepath" + + "github.com/bluesky-social/indigo/models" + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipld/go-car" + _ "github.com/mattn/go-sqlite3" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel" +) + +// var log = logging.Logger("sqstore") + +type SQLiteStore struct { + dbPath string + db *sql.DB + + log *slog.Logger + + lastShardCache lastShardCache +} + +func ensureDir(path string) error { + fi, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return os.MkdirAll(path, 0755) + } + return err + } + if fi.IsDir() { + return nil + } + return fmt.Errorf("%s exists but is not a directory", path) +} + +func NewSqliteStore(csdir string) (*SQLiteStore, error) { + if err := ensureDir(csdir); err != nil { + return nil, err + } + dbpath := filepath.Join(csdir, "db.sqlite3") + out := new(SQLiteStore) + err := out.Open(dbpath) + if err != nil { + return nil, err + } + return out, nil +} + +func (sqs *SQLiteStore) Open(path string) error { + if sqs.log == nil { + sqs.log = slog.Default() + } + sqs.log.Debug("open db", "path", path) + db, err := sql.Open("sqlite3", path) + if err != nil { + return fmt.Errorf("%s: sqlite could not open, %w", path, err) + } + sqs.db = db + sqs.dbPath = path + err = sqs.createTables() + if err != nil { + return fmt.Errorf("%s: 
sqlite could not create tables, %w", path, err) + } + sqs.lastShardCache.source = sqs + sqs.lastShardCache.Init() + return nil +} + +func (sqs *SQLiteStore) createTables() error { + tx, err := sqs.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));") + if err != nil { + return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err) + } + _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)") + if err != nil { + return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err) + } + return tx.Commit() +} + +// writeNewShard needed for DeltaSession.CloseWithRoot +func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { + sqWriteNewShard.Inc() + sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) + ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") + defer span.End() + // this is "write many blocks", "write one block" is above in putBlock(). keep them in sync. + buf := new(bytes.Buffer) + hnw, err := WriteCarHeader(buf, root) + if err != nil { + return nil, fmt.Errorf("failed to write car header: %w", err) + } + offset := hnw + + tx, err := sqs.db.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("bad block insert tx, %w", err) + } + defer tx.Rollback() + insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block") + if err != nil { + return nil, fmt.Errorf("bad block insert sql, %w", err) + } + defer insertStatement.Close() + + dbroot := models.DbCID{CID: root} + + span.SetAttributes(attribute.Int("blocks", len(blks))) + + for bcid, block := range blks { + // build shard for output firehose + nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) + if err != nil { + return nil, fmt.Errorf("failed to write block: %w", err) + } + offset += nw + + // TODO: better databases have an insert-many option for a prepared statement + dbcid := models.DbCID{CID: bcid} + blockbytes := block.RawData() + _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes) + if err != nil { + return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) + } + sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) + } + err = tx.Commit() + if err != nil { + return nil, fmt.Errorf("bad block insert commit, %w", err) + } + + shard := CarShard{ + Root: models.DbCID{CID: root}, + DataStart: hnw, + Seq: seq, + Usr: user, + Rev: rev, + } + + sqs.lastShardCache.put(&shard) + + return buf.Bytes(), nil +} + +var ErrNothingThere = errors.New("nothing to read)") + +// GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache +// What we actually seem to need from this: last {Rev, Root.CID} +func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { + sqGetLastShard.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return nil, fmt.Errorf("bad last shard tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? 
ORDER BY rev DESC LIMIT 1") + if err != nil { + return nil, fmt.Errorf("bad last shard sql, %w", err) + } + rows, err := qstmt.QueryContext(ctx, uid) + if err != nil { + return nil, fmt.Errorf("last shard err, %w", err) + } + if rows.Next() { + var rev string + var rootb models.DbCID + err = rows.Scan(&rev, &rootb) + if err != nil { + return nil, fmt.Errorf("last shard bad scan, %w", err) + } + return &CarShard{ + Root: rootb, + Rev: rev, + }, nil + } + return nil, nil +} + +func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { + sqs.log.Warn("TODO: don't call compaction") + return nil, nil +} + +func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + sqs.log.Warn("TODO: don't call compaction targets") + return nil, nil +} + +func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return cid.Undef, err + } + if lastShard == nil { + return cid.Undef, nil + } + if lastShard.ID == 0 { + return cid.Undef, nil + } + + return lastShard.Root.CID, nil +} + +func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + // TODO: same as FileCarStore; re-unify + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return "", err + } + if lastShard == nil { + return "", nil + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + +func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { + // TODO: same as FileCarStore, re-unify + ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") + defer span.End() + + carr, err := car.NewCarReader(bytes.NewReader(carslice)) + if err != nil { + return cid.Undef, nil, err + } + + if len(carr.Header.Roots) != 1 { + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) + } + + ds, err := sqs.NewDeltaSession(ctx, uid, since) + if err != nil { + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) + } + + var cids []cid.Cid + for { + blk, err := carr.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, nil, err + } + + cids = append(cids, blk.Cid()) + + if err := ds.Put(ctx, blk); err != nil { + return cid.Undef, nil, err + } + } + + return carr.Header.Roots[0], ds, nil +} + +var zeroShard CarShard + +func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") + defer span.End() + + // TODO: ensure that we don't write updates on top of the wrong head + // this needs to be a compare and swap type operation + lastShard, err := sqs.lastShardCache.get(ctx, user) + if err != nil { + return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) + } + + if lastShard == nil { + lastShard = &zeroShard + } + + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + } + + return &DeltaSession{ + blks: make(map[cid.Cid]blockformat.Block), + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + user: user, + baseCid: lastShard.Root.CID, + cs: sqs, + seq: lastShard.Seq + 1, + lastRev: lastShard.Rev, + }, nil +} + 
+func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { + return &DeltaSession{ + base: &sqliteUserView{ + uid: user, + sqs: sqs, + }, + readonly: true, + user: user, + cs: sqs, + }, nil +} + +type cartmp struct { + xcid cid.Cid + rev string + root string + block []byte +} + +// ReadUserCar +// incremental is only ever called true +func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { + sqGetCar.Inc() + ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") + defer span.End() + + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return fmt.Errorf("rcar tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC") + if err != nil { + return fmt.Errorf("rcar sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, sinceRev) + if err != nil { + return fmt.Errorf("rcar err, %w", err) + } + nblocks := 0 + first := true + for rows.Next() { + var xcid models.DbCID + var xrev string + var xroot models.DbCID + var xblock []byte + err = rows.Scan(&xcid, &xrev, &xroot, &xblock) + if err != nil { + return fmt.Errorf("rcar bad scan, %w", err) + } + if first { + if err := car.WriteHeader(&car.CarHeader{ + Roots: []cid.Cid{xroot.CID}, + Version: 1, + }, shardOut); err != nil { + return fmt.Errorf("rcar bad header, %w", err) + } + first = false + } + nblocks++ + _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock) + if err != nil { + return fmt.Errorf("rcar bad write, %w", err) + } + } + sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) + return nil +} + +// Stat is only used in a debugging admin handler +// don't bother implementing it (for now?) +func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { + sqs.log.Warn("Stat debugging method not implemented for sqlite store") + return nil, nil +} + +func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error { + ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") + defer span.End() + tx, err := sqs.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("wipe tx, %w", err) + } + defer tx.Rollback() + deleteResult, err := tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user) + nrows, ierr := deleteResult.RowsAffected() + if ierr == nil { + sqRowsDeleted.Add(float64(nrows)) + } + if err == nil { + err = ierr + } + if err == nil { + err = tx.Commit() + } + return err +} + +var txReadOnly = sql.TxOptions{ReadOnly: true} + +// HasUidCid needed for NewDeltaSession userView +func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + sqHas.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return false, fmt.Errorf("hasUC tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? 
LIMIT 1") + if err != nil { + return false, fmt.Errorf("hasUC sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) + if err != nil { + return false, fmt.Errorf("hasUC err, %w", err) + } + if rows.Next() { + var rev string + var rootb models.DbCID + err = rows.Scan(&rev, &rootb) + if err != nil { + return false, fmt.Errorf("hasUC bad scan, %w", err) + } + return true, nil + } + return false, nil +} + +func (sqs *SQLiteStore) CarStore() CarStore { + return sqs +} + +func (sqs *SQLiteStore) Close() error { + return sqs.db.Close() +} + +func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + sqGetBlock.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return nil, fmt.Errorf("getb tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") + if err != nil { + return nil, fmt.Errorf("getb sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) + if err != nil { + return nil, fmt.Errorf("getb err, %w", err) + } + if rows.Next() { + //var rev string + //var rootb models.DbCID + var blockb []byte + err = rows.Scan(&blockb) + if err != nil { + return nil, fmt.Errorf("getb bad scan, %w", err) + } + return blocks.NewBlock(blockb), nil + } + return nil, ErrNothingThere +} + +func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { + // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData + sqGetBlockSize.Inc() + tx, err := sqs.db.BeginTx(ctx, &txReadOnly) + if err != nil { + return 0, fmt.Errorf("getbs tx, %w", err) + } + defer tx.Rollback() + qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") + if err != nil { + return 0, fmt.Errorf("getbs sql, %w", err) + } + defer qstmt.Close() + rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) + if err != nil { + return 0, fmt.Errorf("getbs err, %w", err) + } + if rows.Next() { + var out int64 + err = rows.Scan(&out) + if err != nil { + return 0, fmt.Errorf("getbs bad scan, %w", err) + } + return out, nil + } + return 0, nil +} + +type sqliteUserViewInner interface { + HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) + getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) + getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) +} + +// TODO: rename, used by both sqlite and scylla +type sqliteUserView struct { + sqs sqliteUserViewInner + uid models.Uid +} + +func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) { + // TODO: cache block metadata? + return s.sqs.HasUidCid(ctx, s.uid, c) +} + +func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { + // TODO: cache blocks? + return s.sqs.getBlock(ctx, s.uid, c) +} + +func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) { + // TODO: cache block metadata? 
+ bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c) + return int(bigsize), err +} + +// ensure we implement the interface +var _ minBlockstore = (*sqliteUserView)(nil) + +var sqRowsDeleted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_rows_deleted", + Help: "User rows deleted in sqlite backend", +}) + +var sqGetBlock = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_block", + Help: "get block sqlite backend", +}) + +var sqGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_block_size", + Help: "get block size sqlite backend", +}) + +var sqGetCar = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_car", + Help: "get car sqlite backend", +}) + +var sqHas = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_has", + Help: "check block presence sqlite backend", +}) + +var sqGetLastShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_get_last_shard", + Help: "get last shard sqlite backend", +}) + +var sqWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{ + Name: "bgs_sq_write_shard", + Help: "write shard blocks sqlite backend", +}) diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 33c798508..08d638c3c 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -216,6 +216,17 @@ func run(args []string) error { Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", EnvVars: []string{"RELAY_NEXT_CRAWLER"}, }, + &cli.BoolFlag{ + Name: "ex-sqlite-carstore", + Usage: "enable experimental sqlite carstore", + Value: false, + }, + &cli.StringSliceFlag{ + Name: "scylla-carstore", + Usage: "scylla server addresses for storage backend, comma separated", + Value: &cli.StringSlice{}, + EnvVars: []string{"RELAY_SCYLLA_NODES"}, + }, &cli.BoolFlag{ Name: "non-archival", EnvVars: []string{"RELAY_NON_ARCHIVAL"}, @@ -316,56 +327,72 @@ func runBigsky(cctx *cli.Context) error { return err } - slog.Info("setting up main database") dburl := cctx.String("db-url") + slog.Info("setting up main database", "url", dburl) db, err := cliutil.SetupDatabase(dburl, cctx.Int("max-metadb-connections")) if err != nil { return err } - - slog.Info("setting up carstore database") - csdburl := cctx.String("carstore-db-url") - csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections")) - if err != nil { - return err - } - if cctx.Bool("db-tracing") { if err := db.Use(tracing.NewPlugin()); err != nil { return err } - if err := csdb.Use(tracing.NewPlugin()); err != nil { - return err - } - } - - csdirs := []string{csdir} - if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 { - csdirs = paramDirs } - for _, csd := range csdirs { - if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil { + var cstore carstore.CarStore + scyllaAddrs := cctx.StringSlice("scylla-carstore") + sqliteStore := cctx.Bool("ex-sqlite-carstore") + if len(scyllaAddrs) != 0 { + slog.Info("starting scylla carstore", "addrs", scyllaAddrs) + cstore, err = carstore.NewScyllaStore(scyllaAddrs, "cs") + } else if sqliteStore { + slog.Info("starting sqlite carstore", "dir", csdir) + cstore, err = carstore.NewSqliteStore(csdir) + } else if cctx.Bool("non-archival") { + csdburl := cctx.String("carstore-db-url") + slog.Info("setting up non-archival carstore database", "url", csdburl) + csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections")) + if err != nil { return err } - } - - var cstore carstore.CarStore - - if 
cctx.Bool("non-archival") { + if cctx.Bool("db-tracing") { + if err := csdb.Use(tracing.NewPlugin()); err != nil { + return err + } + } cs, err := carstore.NewNonArchivalCarstore(csdb) if err != nil { return err } - cstore = cs } else { - cs, err := carstore.NewCarStore(csdb, csdirs) + // make standard FileCarStore + csdburl := cctx.String("carstore-db-url") + slog.Info("setting up carstore database", "url", csdburl) + csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections")) if err != nil { return err } + if cctx.Bool("db-tracing") { + if err := csdb.Use(tracing.NewPlugin()); err != nil { + return err + } + } + csdirs := []string{csdir} + if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 { + csdirs = paramDirs + } - cstore = cs + for _, csd := range csdirs { + if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil { + return err + } + } + cstore, err = carstore.NewCarStore(csdb, csdirs) + } + + if err != nil { + return err } // DID RESOLUTION diff --git a/cmd/gosky/debug.go b/cmd/gosky/debug.go index 2037c328c..a51895244 100644 --- a/cmd/gosky/debug.go +++ b/cmd/gosky/debug.go @@ -885,7 +885,7 @@ var debugCompareReposCmd = &cli.Command{ rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes)) if err != nil { - logger.Error("reading repo", "err", err) + logger.Error("reading repo", "err", err, "bytes", len(repo1bytes)) os.Exit(1) return } @@ -904,7 +904,7 @@ var debugCompareReposCmd = &cli.Command{ rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes)) if err != nil { - logger.Error("reading repo", "err", err) + logger.Error("reading repo", "err", err, "bytes", len(repo2bytes)) os.Exit(1) return } diff --git a/go.mod b/go.mod index ff1c8ee77..e7bc94fe7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/flosch/pongo2/v6 v6.0.0 github.com/go-redis/cache/v9 v9.0.0 github.com/goccy/go-json v0.10.2 + github.com/gocql/gocql v0.0.0-00010101000000-000000000000 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/gorilla/websocket v1.5.1 github.com/hashicorp/go-retryablehttp v0.7.5 @@ -87,6 +88,7 @@ require ( github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect @@ -105,6 +107,7 @@ require ( github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect + gopkg.in/inf.v0 v0.9.1 // indirect ) require ( @@ -151,7 +154,7 @@ require ( github.com/lestrrat-go/option v1.0.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/mattn/go-sqlite3 v1.14.22 github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect @@ -187,3 +190,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) + +replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4 diff --git a/go.sum b/go.sum index b72fdce5b..5187c101a 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,10 @@ github.com/beorn7/perks 
v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= github.com/brianvoe/gofakeit/v6 v6.25.0 h1:ZpFjktOpLZUeF8q223o0rUuXtA+m5qW5srjvVi+JkXk= @@ -211,6 +215,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -255,6 +260,8 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= @@ -592,6 +599,8 @@ github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/samber/slog-echo v1.8.0 h1:DQQRtAliSvQw+ScEdu5gv3jbHu9cCTzvHuTD8GDv7zI= github.com/samber/slog-echo v1.8.0/go.mod h1:0ab2AwcciQXNAXEcjkHwD9okOh9vEHEYn8xP97ocuhM= +github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA= +github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -806,6 
+815,7 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= @@ -1090,6 +1100,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -1124,3 +1136,5 @@ lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/indexer/crawler.go b/indexer/crawler.go index 526da9bb6..7e2656dd9 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -14,19 +14,22 @@ import ( ) type CrawlDispatcher struct { + // from Crawl() ingest chan *models.ActorInfo - repoSync chan *crawlWork - + // from AddToCatchupQueue() catchup chan *crawlWork + // from main loop to fetchWorker() + repoSync chan *crawlWork + complete chan models.Uid maplk sync.Mutex todo map[models.Uid]*crawlWork inProgress map[models.Uid]*crawlWork - doRepoCrawl func(context.Context, *crawlWork) error + repoFetcher CrawlRepoFetcher concurrency int @@ -35,7 +38,12 @@ type CrawlDispatcher struct { done chan struct{} } -func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) { +// this is what we need of RepoFetcher +type CrawlRepoFetcher interface { + FetchAndIndexRepo(ctx context.Context, job *crawlWork) error +} + +func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) { if concurrency < 1 { return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") } @@ -45,7 +53,7 @@ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurre repoSync: make(chan *crawlWork), complete: make(chan models.Uid), catchup: make(chan *crawlWork), - doRepoCrawl: repoFn, + repoFetcher: repoFetcher, concurrency: 
concurrency, todo: make(map[models.Uid]*crawlWork), inProgress: make(map[models.Uid]*crawlWork), @@ -221,7 +229,7 @@ func (c *CrawlDispatcher) fetchWorker() { for { select { case job := <-c.repoSync: - if err := c.doRepoCrawl(context.TODO(), job); err != nil { + if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil { c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err) } diff --git a/indexer/indexer.go b/indexer/indexer.go index e6a324e9e..6920c7fb6 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -69,7 +69,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events } if crawl { - c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency, ix.log) + c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log) if err != nil { return nil, err } diff --git a/indexer/repofetch.go b/indexer/repofetch.go index 8ce68bb5f..1e93612d8 100644 --- a/indexer/repofetch.go +++ b/indexer/repofetch.go @@ -141,8 +141,10 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er } } + revp := &rev if rev == "" { span.SetAttributes(attribute.Bool("full", true)) + revp = nil } c := models.ClientForPds(&pds) @@ -153,7 +155,7 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er return err } - if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil { + if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), revp); err != nil { span.RecordError(err) if ipld.IsNotFound(err) || errors.Is(err, io.EOF) || errors.Is(err, fs.ErrNotExist) { diff --git a/models/dbcid.go b/models/dbcid.go index 366a0e829..d64ae0bc6 100644 --- a/models/dbcid.go +++ b/models/dbcid.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "github.com/gocql/gocql" "github.com/ipfs/go-cid" ) @@ -62,3 +63,15 @@ func (dbc *DbCID) UnmarshalJSON(b []byte) error { func (dbc *DbCID) GormDataType() string { return "bytes" } + +func (dbc *DbCID) MarshalCQL(info gocql.TypeInfo) ([]byte, error) { + return dbc.CID.Bytes(), nil +} +func (dbc *DbCID) UnmarshalCQL(info gocql.TypeInfo, data []byte) error { + xcid, err := cid.Cast(data) + if err != nil { + return err + } + dbc.CID = xcid + return nil +} diff --git a/repo/repo.go b/repo/repo.go index acdafcce6..f4e683f4c 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -80,7 +80,7 @@ func IngestRepo(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (cid br, err := car.NewBlockReader(r) if err != nil { - return cid.Undef, err + return cid.Undef, fmt.Errorf("IngestRepo:NewBlockReader: %w", err) } for { @@ -89,11 +89,11 @@ func IngestRepo(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (cid if err == io.EOF { break } - return cid.Undef, err + return cid.Undef, fmt.Errorf("IngestRepo:Next: %w", err) } if err := bs.Put(ctx, blk); err != nil { - return cid.Undef, err + return cid.Undef, fmt.Errorf("IngestRepo:Put: %w", err) } } @@ -104,7 +104,7 @@ func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) { bs := blockstore.NewBlockstore(datastore.NewMapDatastore()) root, err := IngestRepo(ctx, bs, r) if err != nil { - return nil, err + return nil, fmt.Errorf("ReadRepoFromCar:IngestRepo: %w", err) } return OpenRepo(ctx, bs, root) diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index d2c3766f3..6c349a1c9 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -912,6 +912,9 @@ func (rm *RepoManager) ImportNewRepo(ctx context.Context, 
user models.Uid, repoD return err } + if rev != nil && *rev == "" { + rev = nil + } if rev == nil { // if 'rev' is nil, this implies a fresh sync. // in this case, ignore any existing blocks we have and treat this like a clean import. diff --git a/testing/utils.go b/testing/utils.go index d2847d026..d7875bbd4 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -210,12 +210,14 @@ func (tp *TestPDS) BumpLimits(t *testing.T, b *TestRelay) { } limReqBody := bgs.RateLimitChangeRequest{ - Host: u.Host, - PerSecond: 5_000, - PerHour: 100_000, - PerDay: 1_000_000, - RepoLimit: 500_000, - CrawlRate: 50_000, + Host: u.Host, + PDSRates: bgs.PDSRates{ + PerSecond: 5_000, + PerHour: 100_000, + PerDay: 1_000_000, + RepoLimit: 500_000, + CrawlRate: 50_000, + }, } // JSON encode the request body
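Because PDSRates is embedded in the request structs without a json tag, encoding/json promotes its fields, so the admin endpoints keep accepting and producing the same flat JSON body as before; only the Go struct shape changed. A small illustrative sketch (the host value is made up) of the wire format produced by a request like the one built above:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/bluesky-social/indigo/bgs"
)

func main() {
	body := bgs.RateLimitChangeRequest{
		Host: "pds.example.com", // hypothetical host
		PDSRates: bgs.PDSRates{
			PerSecond: 5_000,
			PerHour:   100_000,
			PerDay:    1_000_000,
			CrawlRate: 50_000,
			RepoLimit: 500_000,
		},
	}
	out, _ := json.Marshal(body)
	fmt.Println(string(out))
	// Output (flat, same shape as the pre-refactor struct):
	// {"host":"pds.example.com","per_second":5000,"per_hour":100000,"per_day":1000000,"crawl_rate":50000,"repo_limit":500000}
}
```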