Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new relay store #802

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
40afdd5
wip
brianolson Oct 18, 2024
450b771
wip: new storage skeleton
brianolson Oct 25, 2024
5ada956
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Oct 31, 2024
2e408c1
wip: starting to fail at unit testing new carstore
brianolson Nov 1, 2024
79252d7
passes unit tests
brianolson Nov 1, 2024
4ad3bea
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Nov 1, 2024
29cb514
more sqlite txn
brianolson Nov 1, 2024
0b6612a
index (uid,rev DESC); add readme
brianolson Nov 2, 2024
9bc8bf7
add some trace spans
brianolson Nov 3, 2024
8bf9dcd
flag to enable experimental sqlite carstore
brianolson Nov 3, 2024
e5f8403
mkdirs
brianolson Nov 3, 2024
a631350
nil fix
brianolson Nov 3, 2024
e8a57d8
nil fix
brianolson Nov 3, 2024
da9022a
nil fix
brianolson Nov 3, 2024
06e76d5
note blocks written in otel span
brianolson Nov 7, 2024
1e283b7
wip: scylla backend compiles
brianolson Nov 8, 2024
c5e9107
connect scylla to main
brianolson Nov 8, 2024
d0e0a27
allow posting pds crawl with limits
brianolson Nov 8, 2024
190b304
scylla appeasment, index -> materialized view
brianolson Nov 8, 2024
367255d
not found is nil return, not err
brianolson Nov 11, 2024
2967953
cql marshal
brianolson Nov 11, 2024
43a68b2
explicit Cid <-> []byte for scylla
brianolson Nov 11, 2024
27741bc
use secondary index to delet blocks by uid
brianolson Nov 11, 2024
e48e72f
add time histograms on major scylla queries
brianolson Nov 11, 2024
b20aac6
fix
brianolson Nov 13, 2024
bac8361
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Nov 13, 2024
2fe848e
more info on bad repo compare
brianolson Nov 13, 2024
6afe8e4
cleanup
brianolson Nov 13, 2024
8072318
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Nov 15, 2024
2bb6244
fix arg name
brianolson Nov 15, 2024
a8a5b7b
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Nov 18, 2024
198c156
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Nov 22, 2024
8743ff0
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Dec 6, 2024
6026f75
env RELAY_SCYLLA_NODES
brianolson Dec 11, 2024
377c10a
Merge remote-tracking branch 'origin/main' into bolson/new-relay-store
brianolson Dec 17, 2024
805ae5b
fix usage of ImportNewRepo
brianolson Dec 17, 2024
d709ae9
func ptr -> interface
brianolson Dec 18, 2024
16c36b8
comment
brianolson Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,35 @@ func (bgs *BGS) handleAdminUnbanDomain(c echo.Context) error {
})
}

type PDSRates struct {
PerSecond int64 `json:"per_second,omitempty"`
PerHour int64 `json:"per_hour,omitempty"`
PerDay int64 `json:"per_day,omitempty"`
CrawlRate int64 `json:"crawl_rate,omitempty"`
RepoLimit int64 `json:"repo_limit,omitempty"`
}

func (pr *PDSRates) FromSlurper(s *Slurper) {
if pr.PerSecond == 0 {
pr.PerHour = s.DefaultPerSecondLimit
}
if pr.PerHour == 0 {
pr.PerHour = s.DefaultPerHourLimit
}
if pr.PerDay == 0 {
pr.PerDay = s.DefaultPerDayLimit
}
if pr.CrawlRate == 0 {
pr.CrawlRate = int64(s.DefaultCrawlLimit)
}
if pr.RepoLimit == 0 {
pr.RepoLimit = s.DefaultRepoLimit
}
}

type RateLimitChangeRequest struct {
Host string `json:"host"`
PerSecond int64 `json:"per_second"`
PerHour int64 `json:"per_hour"`
PerDay int64 `json:"per_day"`
CrawlRate int64 `json:"crawl_rate"`
RepoLimit int64 `json:"repo_limit"`
Host string `json:"host"`
PDSRates
}

func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
Expand Down Expand Up @@ -592,6 +614,9 @@ func (bgs *BGS) handleAdminAddTrustedDomain(e echo.Context) error {

type AdminRequestCrawlRequest struct {
Hostname string `json:"hostname"`

// optional:
PDSRates
}

func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
Expand Down Expand Up @@ -644,6 +669,8 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
}

// Skip checking if the server is online for now
rateOverrides := body.PDSRates
rateOverrides.FromSlurper(bgs.slurper)

return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
return bgs.slurper.SubscribeToPds(ctx, host, true, true, &rateOverrides) // Override Trusted Domain Check
}
9 changes: 8 additions & 1 deletion bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (s *Slurper) canSlurpHost(host string) bool {
return !s.newSubsDisabled
}

func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error {
func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool, rateOverrides *PDSRates) error {
// TODO: for performance, lock on the hostname instead of global
s.lk.Lock()
defer s.lk.Unlock()
Expand Down Expand Up @@ -397,6 +397,13 @@ func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adm
CrawlRateLimit: float64(s.DefaultCrawlLimit),
RepoLimit: s.DefaultRepoLimit,
}
if rateOverrides != nil {
npds.RateLimit = float64(rateOverrides.PerSecond)
npds.HourlyEventLimit = rateOverrides.PerHour
npds.DailyEventLimit = rateOverrides.PerDay
npds.CrawlRateLimit = float64(rateOverrides.CrawlRate)
npds.RepoLimit = rateOverrides.RepoLimit
}
if err := s.db.Create(&npds).Error; err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
}
}

return s.slurper.SubscribeToPds(ctx, host, true, false)
return s.slurper.SubscribeToPds(ctx, host, true, false, nil)
}

func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
Expand Down
41 changes: 41 additions & 0 deletions carstore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Carstore

Store a zillion users of PDS-like repo, with more limited operations (mainly: firehose in, firehose out).

## [ScyllaStore](scylla.go)

Blocks stored in ScyllaDB.
User and PDS metadata stored in gorm (PostgreSQL or sqlite3).

## [FileCarStore](bs.go)

Store 'car slices' from PDS source subscribeRepo firehose streams to filesystem.
Store metadata to gorm postgresql (or sqlite3).
Periodic compaction of car slices into fewer larger car slices.
User and PDS metadata stored in gorm (PostgreSQL or sqlite3).
FileCarStore was the first production carstore and used through at least 2024-11.

## [SQLiteStore](sqlite_store.go)

Experimental/demo.
Blocks stored in trivial local sqlite3 schema.
Minimal reference implementation from which fancy scalable/performant implementations may be derived.

```sql
CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid))
CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)

INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block

SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1

SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC

DELETE FROM blocks WHERE uid = ?

SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1

SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1

SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
```
Loading
Loading