Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay api key #805

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,114 @@ func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {

return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
}

// ApiKey is a single API key together with the priority granted to requests
// that present it.
type ApiKey struct {
// ApiKey is the key string; clients send it in the header: `Authorization: Bearer {}`
ApiKey string `json:"key"`
// Priority is the level granted to this key; more is more.
// Requests without a recognized key are treated as priority 0.
Priority int32 `json:"prio"`
}

// ApiKeySet is the JSON form of a list of API keys. It is the request body
// bound by handleAdminSetApiKeys and the response written by
// handleAdminGetApiKeys.
type ApiKeySet struct {
// Keys is the list of keys, serialized under "keys".
Keys []ApiKey `json:"keys"`
}

// ApiKeys is the in-memory version of the API key set, indexed for lookup by
// key string.
type ApiKeys struct {
// Keys maps each key string (ApiKey.ApiKey) to its full entry.
Keys map[string]ApiKey `json:"keys"`
}

// NewApiKeys returns an empty key set whose Keys map is already allocated,
// so callers can insert into it immediately.
func NewApiKeys() *ApiKeys {
	return &ApiKeys{
		Keys: make(map[string]ApiKey),
	}
}

// Clone returns an independent copy of the key set; modifying the copy's map
// does not affect the receiver. A nil receiver yields a fresh empty set.
func (ak *ApiKeys) Clone() *ApiKeys {
	if ak == nil {
		return NewApiKeys()
	}
	dup := &ApiKeys{
		Keys: make(map[string]ApiKey, len(ak.Keys)),
	}
	for key, entry := range ak.Keys {
		dup.Keys[key] = entry
	}
	return dup
}

// handleAdminSetApiKeys handles POST /admin/api/keys
//
// The JSON body is an ApiKeySet. By default the uploaded keys are merged into
// the current set (an uploaded key replaces an existing entry with the same
// key string). With ?reset=t the current set is discarded and replaced by
// exactly the uploaded keys. Any other value of reset is rejected.
//
// Returns 400 for an unparseable body, an unknown reset value, or an entry
// whose key string is empty; otherwise responds with {"success": true}.
func (bgs *BGS) handleAdminSetApiKeys(e echo.Context) error {
	var body ApiKeySet
	if err := e.Bind(&body); err != nil {
		return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err))
	}

	var replaceAll bool
	switch e.QueryParam("reset") {
	case "t":
		replaceAll = true
	case "":
		// merge into the existing set
	default:
		return echo.NewHTTPError(http.StatusBadRequest, "nonsense reset")
	}

	// An empty key string can never be presented by checkApiKey (it only looks
	// up non-empty bearer values), but filterConsumers looks up each consumer's
	// AuthKey, which is "" for unauthenticated consumers. Storing an entry under
	// "" would therefore hand its priority to every unauthenticated consumer.
	for _, nak := range body.Keys {
		if nak.ApiKey == "" {
			return echo.NewHTTPError(http.StatusBadRequest, "empty api key")
		}
	}

	if replaceAll {
		// start with empty api key set, use only what is uploaded on this POST
		ak := NewApiKeys()
		for _, nak := range body.Keys {
			ak.Keys[nak.ApiKey] = nak
		}
		// don't compare-and-set, just clobber
		bgs.apiKeys.Store(ak)
	} else {
		// Copy-modify-swap: rebuild from the latest published set and retry if
		// another writer published a different set in the meantime.
		for {
			prevAk := bgs.apiKeys.Load()
			ak := prevAk.Clone() // Clone returns an empty set for a nil receiver
			for _, nak := range body.Keys {
				ak.Keys[nak.ApiKey] = nak
			}
			if bgs.apiKeys.CompareAndSwap(prevAk, ak) {
				break
			}
		}
	}

	return e.JSON(http.StatusOK, map[string]any{
		"success": true,
	})
}

// handleAdminGetApiKeys handles GET /admin/api/keys
//
// Responds with the currently published key set as an ApiKeySet. The order of
// entries follows map iteration and is therefore unspecified. When no set has
// ever been stored, Keys is left nil.
func (bgs *BGS) handleAdminGetApiKeys(e echo.Context) error {
	var resp ApiKeySet
	if current := bgs.apiKeys.Load(); current != nil {
		resp.Keys = make([]ApiKey, len(current.Keys))
		i := 0
		for _, entry := range current.Keys {
			resp.Keys[i] = entry
			i++
		}
	}
	return e.JSON(200, resp)
}

// handleAdminSetApiPriority sets the minimum API key priority from the
// required ?level= query parameter (a base-10 integer that fits in 32 bits).
//
// The new level is stored in minApiKeyLevel and then offered to
// minLevelEvents without blocking; if that channel is full the event is
// dropped and a warning is logged.
func (bgs *BGS) handleAdminSetApiPriority(e echo.Context) error {
	raw := e.QueryParam("level")
	if raw == "" {
		return echo.NewHTTPError(http.StatusBadRequest, "must pass level")
	}

	parsed, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return echo.NewHTTPError(http.StatusBadRequest, "invalid level")
	}
	newLevel := int32(parsed)

	bgs.minApiKeyLevel.Store(newLevel)

	// Non-blocking send: never stall the admin request on a full channel.
	select {
	case bgs.minLevelEvents <- newLevel:
	default:
		log.Warn("minLevelEvents is full")
	}

	resp := map[string]any{
		"success": true,
	}
	return e.JSON(200, resp)
}
127 changes: 114 additions & 13 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"contrib.go.opencensus.io/exporter/prometheus"
Expand Down Expand Up @@ -87,6 +88,10 @@ type BGS struct {

// Management of Compaction
compactor *Compactor

apiKeys atomic.Pointer[ApiKeys]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we use this instead of a lock?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use these atomic semantics anywhere else and I'm a bit wary of throwing them in here now over using a standard mutex pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally atomics are faster; there are a few large-system NUMA cases where they perform worse than usual, but I'm not sure they're ever slower than sync.Mutex. I think in the optimistic case where there is no contention they're much faster.
They don't provide fairness of unlock to multiple lockers like sync.Mutex does, but that doesn't matter here.

minApiKeyLevel atomic.Int32
minLevelEvents chan int32
}

type PDSResync struct {
Expand All @@ -102,8 +107,10 @@ type PDSResync struct {
// SocketConsumer holds the per-connection state for one entry in the BGS
// consumers map.
type SocketConsumer struct {
// UserAgent is the User-Agent of the consumer's request.
UserAgent string
// RemoteAddr is the consumer's address as reported by echo's RealIP.
RemoteAddr string
// AuthKey is the value returned by getBearerAuthorization for the request
// ("" when no Authorization header was sent); filterConsumers uses it to
// look up the consumer's priority.
AuthKey string
// ConnectedAt is the time the consumer record was created.
ConnectedAt time.Time
// EventsSent is the consumer's sent-events counter; its value is logged when
// the consumer is cleaned up.
EventsSent promclient.Counter
// Cancel is called by filterConsumers to drop this consumer when its
// priority is below the new minimum level.
Cancel func()
}

type BGSConfig struct {
Expand Down Expand Up @@ -149,8 +156,14 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
consumers: make(map[uint64]*SocketConsumer),

pdsResyncs: make(map[uint]*PDSResync),

minLevelEvents: make(chan int32, 100),
}

bgs.minApiKeyLevel.Store(0)

go bgs.priorityChangeThread()

ix.CreateExternalUser = bgs.createExternalUser
slOpts := DefaultSlurperOptions()
slOpts.SSL = config.SSL
Expand Down Expand Up @@ -338,15 +351,18 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {

// TODO: this API is temporary until we formalize what we want here

e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler)
e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord)
e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo)
e.GET("/xrpc/com.atproto.sync.getBlocks", bgs.HandleComAtprotoSyncGetBlocks)
e.GET("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
e.POST("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
e.GET("/xrpc/com.atproto.sync.listRepos", bgs.HandleComAtprotoSyncListRepos)
e.GET("/xrpc/com.atproto.sync.getLatestCommit", bgs.HandleComAtprotoSyncGetLatestCommit)
e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", bgs.HandleComAtprotoSyncNotifyOfUpdate)
// These apis check for API key (mostly in case of traffic shedding)
e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.getBlocks", bgs.HandleComAtprotoSyncGetBlocks, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl, bgs.checkApiKey)
e.POST("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.listRepos", bgs.HandleComAtprotoSyncListRepos, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.getLatestCommit", bgs.HandleComAtprotoSyncGetLatestCommit, bgs.checkApiKey)
e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", bgs.HandleComAtprotoSyncNotifyOfUpdate, bgs.checkApiKey)

// These don't need to check any API key
e.GET("/xrpc/_health", bgs.HandleHealthCheck)
e.GET("/_health", bgs.HandleHealthCheck)
e.GET("/", bgs.HandleHomeMessage)
Expand Down Expand Up @@ -387,6 +403,10 @@ func (bgs *BGS) StartWithListener(listen net.Listener) error {
// Consumer-related Admin API
admin.GET("/consumers/list", bgs.handleAdminListConsumers)

admin.POST("/api/keys", bgs.handleAdminSetApiKeys)
admin.GET("/api/keys", bgs.handleAdminGetApiKeys)
admin.POST("/api/priority", bgs.handleAdminSetApiPriority)

// In order to support booting on random ports in tests, we need to tell the
// Echo instance it's already got a port, and then use its StartServer
// method to re-use that listener.
Expand Down Expand Up @@ -471,6 +491,42 @@ func (bgs *BGS) CreateAdminToken(tok string) error {
}).Error
}

// getBearerAuthorization returns the credential from the request's
// Authorization header: the text after a leading "Bearer " when that prefix
// is present, otherwise the header value unchanged. It returns "" when the
// header is absent or empty.
func getBearerAuthorization(c echo.Context) string {
	header := c.Request().Header.Get("Authorization")
	// TrimPrefix leaves the string untouched when the prefix is missing, which
	// covers both the empty header and the non-Bearer case.
	return strings.TrimPrefix(header, "Bearer ")
}

// checkApiKey is middleware that checks an incoming request's API key against
// the current minimum API key priority level.
//
// When no API keys are configured at all, every request is allowed, matching
// filterConsumers. Otherwise the request's priority is the Priority of the key
// presented in the Authorization header, or 0 when no key is presented or the
// key is unknown. Requests whose priority is below minApiKeyLevel are answered
// with 401 and an empty JSON object; all others are passed to next.
func (bgs *BGS) checkApiKey(next echo.HandlerFunc) echo.HandlerFunc {
	return func(c echo.Context) error {
		known := bgs.apiKeys.Load()
		if known == nil || len(known.Keys) == 0 {
			// no auth configured, allow — regardless of whether the request
			// happens to carry an Authorization header
			return next(c)
		}

		keyLevel := int32(0)
		if auth := getBearerAuthorization(c); auth != "" {
			if ak, found := known.Keys[auth]; found {
				keyLevel = ak.Priority
			}
		}

		if bgs.minApiKeyLevel.Load() > keyLevel {
			// TODO: nicer message about capacity?
			return c.JSONBlob(http.StatusUnauthorized, []byte("{}"))
		}
		return next(c)
	}
}

// middleware to check /admin access key
func (bgs *BGS) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc {
return func(e echo.Context) error {
ctx, span := tracer.Start(e.Request().Context(), "checkAdminAuth")
Expand Down Expand Up @@ -539,23 +595,66 @@ func (bgs *BGS) cleanupConsumer(id uint64) {
bgs.consumersLk.Lock()
defer bgs.consumersLk.Unlock()

c := bgs.consumers[id]
sc, found := bgs.consumers[id]
if found {
bgs.cleanupConsumerInner(id, sc)
}
}

// assumes bgs.consumersLk is held
func (bgs *BGS) cleanupConsumerInner(id uint64, sc *SocketConsumer) {
var m = &dto.Metric{}
if err := c.EventsSent.Write(m); err != nil {
if err := sc.EventsSent.Write(m); err != nil {
log.Errorf("failed to get sent counter: %s", err)
}

log.Infow("consumer disconnected",
"consumer_id", id,
"remote_addr", c.RemoteAddr,
"user_agent", c.UserAgent,
"remote_addr", sc.RemoteAddr,
"user_agent", sc.UserAgent,
"events_sent", m.Counter.GetValue())

delete(bgs.consumers, id)
}

// priorityChangeThread receives new minimum priority levels from
// minLevelEvents and applies each one to the connected consumers via
// filterConsumers. It returns only when the channel is closed.
func (bgs *BGS) priorityChangeThread() {
	// TODO: add a context.Context into BGS ? listen for .Done here?
	for {
		level, open := <-bgs.minLevelEvents
		if !open {
			return
		}
		bgs.filterConsumers(level)
	}
}
// filterConsumers disconnects every connected consumer whose API key priority
// is lower than priority. A consumer whose AuthKey is not in the current key
// set counts as priority 0. When no API keys are configured nothing is
// disconnected.
//
// It takes bgs.consumersLk for the duration of the call.
func (bgs *BGS) filterConsumers(priority int32) {
	keys := bgs.apiKeys.Load()
	if keys == nil || len(keys.Keys) == 0 {
		// allow everything
		return
	}

	bgs.consumersLk.Lock()
	defer bgs.consumersLk.Unlock()

	// First pass: decide who has to go.
	var evict []uint64
	for id, consumer := range bgs.consumers {
		var level int32
		if entry, ok := keys.Keys[consumer.AuthKey]; ok {
			level = entry.Priority
		}
		if level >= priority {
			continue
		}
		evict = append(evict, id)
	}

	// Second pass: cancel each stream and drop its bookkeeping.
	for _, id := range evict {
		consumer := bgs.consumers[id]
		consumer.Cancel()
		bgs.cleanupConsumerInner(id, consumer)
	}
}

func (bgs *BGS) EventsHandler(c echo.Context) error {
// get auth key to put into ongoing feed handler in case we need to cancel it due to traffic shedding priority change
auth := getBearerAuthorization(c)

var since *int64
if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
sval, err := strconv.ParseInt(sinceVal, 10, 64)
Expand Down Expand Up @@ -643,6 +742,8 @@ func (bgs *BGS) EventsHandler(c echo.Context) error {
RemoteAddr: c.RealIP(),
UserAgent: c.Request().UserAgent(),
ConnectedAt: time.Now(),
AuthKey: auth,
Cancel: cancel,
}
sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
consumer.EventsSent = sentCounter
Expand Down
Loading