Skip to content

Commit

Permalink
Merge branch 'signup-queueing-take2' into signup-queue-notify
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Jan 31, 2024
2 parents e908d8a + c1bbd68 commit a8959b3
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-push-pds-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
- signup-queueing-take2
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
Expand Down
1 change: 1 addition & 0 deletions packages/dev-env/src/pds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class TestPds {
const secrets = pds.envToSecrets(env)

const server = await pds.PDS.create(cfg, secrets)
await server.ctx.signupActivator.destroy()

// Separate migration db on postgres in case migration changes some
// connection state that we need in the tests, e.g. "alter database ... set ..."
Expand Down
2 changes: 1 addition & 1 deletion packages/pds/src/api/com/atproto/server/createAccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export default function (server: Server, ctx: AppContext) {
points: 100,
},
handler: async ({ input, req }) => {
const hasAvailability = ctx.signupLimiter.hasAvailability()
const hasAvailability = await ctx.signupLimiter.hasAvailability()

const {
did,
Expand Down
17 changes: 8 additions & 9 deletions packages/pds/src/api/com/atproto/temp/checkSignupQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ export default function (server: Server, ctx: AppContext) {
placeInQueue = res?.count
}

const limiterStatus = ctx.signupLimiter.status
const limiter = ctx.signupLimiter
let estimatedTimeMs: number | undefined
if (
placeInQueue &&
!limiterStatus.disableSignups &&
limiterStatus.accountsInPeriod > 0
) {
estimatedTimeMs =
(placeInQueue * limiterStatus.periodMs) /
limiterStatus.accountsInPeriod
if (placeInQueue && !limiter.flags.disableSignups) {
const accountsInPeriod = await limiter.accountsInPeriod()
if (accountsInPeriod > 0) {
estimatedTimeMs = Math.ceil(
(placeInQueue * limiter.flags.periodMs) / accountsInPeriod,
)
}
}

return {
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class PDS {
await this.ctx.db.startListeningToChannels()
await this.ctx.runtimeFlags.start()
await this.ctx.signupLimiter.start()
this.ctx.signupActivator.run()
const server = this.app.listen(this.ctx.cfg.service.port)
this.server = server
this.server.keepAliveTimeout = 90000
Expand All @@ -175,6 +176,7 @@ export class PDS {
async destroy(): Promise<void> {
await this.ctx.runtimeFlags.destroy()
await this.ctx.signupLimiter.destroy()
await this.ctx.signupActivator.run()
await this.ctx.sequencerLeader?.destroy()
await this.terminator?.terminate()
await this.ctx.backgroundQueue.destroy()
Expand Down
4 changes: 3 additions & 1 deletion packages/pds/src/signup-queue/activator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ export class SignupActivator {

async activateBatch() {
const status = await getQueueStatus(this.db)
if (status.disableSignups) return
const toAdmit = status.periodAllowance - status.accountsInPeriod
log.info({ ...status, toAdmit }, 'activating accounts')

if (status.disableSignups) return
if (toAdmit < 1) return

const activatedAt = new Date().toISOString()
Expand Down
19 changes: 12 additions & 7 deletions packages/pds/src/signup-queue/limiter.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import { SECOND } from '@atproto/common'
import { limiterLogger as log } from '../logger'
import Database from '../db'
import { LimiterStatus, getQueueStatus } from './util'
import { LimiterFlags, getAccountsInPeriod, getQueueStatus } from './util'

export class SignupLimiter {
destroyed = false
promise: Promise<void> = Promise.resolve()
timer: NodeJS.Timer | undefined
status: LimiterStatus
flags: LimiterFlags

constructor(private db: Database) {}

hasAvailability(): boolean {
if (this.status.disableSignups) return false
return this.status.accountsInPeriod < this.status.periodAllowance
async hasAvailability(): Promise<boolean> {
if (this.flags.disableSignups) return false
const accountsInPeriod = await this.accountsInPeriod()
return accountsInPeriod < this.flags.periodAllowance
}

async start() {
Expand All @@ -38,9 +39,13 @@ export class SignupLimiter {
await this.promise
}

async accountsInPeriod(): Promise<number> {
return getAccountsInPeriod(this.db, this.flags.periodMs)
}

async refresh() {
this.status = await getQueueStatus(this.db)
this.flags = await getQueueStatus(this.db)

log.info({ ...this.status }, 'limiter refresh')
log.info({ ...this.flags }, 'limiter refresh')
}
}

0 comments on commit a8959b3

Please sign in to comment.