Skip to content

Commit

Permalink
Cleanup outdated notifications in appview, add daemon for similar tas…
Browse files Browse the repository at this point in the history
…ks (#1893)

* initial notification tidy logic

* helper for maintenance across all appview users

* tiny reorg

* add bsky daemon to tidy notifications

* tidy, add bsky daemon service entrypoint

* test notifs tidy daemon, add stats

* tidy

* crash failed notification daemon

* fix notification tidy constants
  • Loading branch information
devinivy authored Dec 2, 2023
1 parent 6d21cc1 commit cad30a7
Show file tree
Hide file tree
Showing 12 changed files with 567 additions and 1 deletion.
50 changes: 50 additions & 0 deletions packages/bsky/src/daemon/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import assert from 'assert'

export interface DaemonConfigValues {
version: string
dbPostgresUrl: string
dbPostgresSchema?: string
}

export class DaemonConfig {
constructor(private cfg: DaemonConfigValues) {}

static readEnv(overrides?: Partial<DaemonConfigValues>) {
const version = process.env.BSKY_VERSION || '0.0.0'
const dbPostgresUrl =
overrides?.dbPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL
const dbPostgresSchema =
overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA
assert(dbPostgresUrl)
return new DaemonConfig({
version,
dbPostgresUrl,
dbPostgresSchema,
...stripUndefineds(overrides ?? {}),
})
}

get version() {
return this.cfg.version
}

get dbPostgresUrl() {
return this.cfg.dbPostgresUrl
}

get dbPostgresSchema() {
return this.cfg.dbPostgresSchema
}
}

function stripUndefineds(
obj: Record<string, unknown>,
): Record<string, unknown> {
const result = {}
Object.entries(obj).forEach(([key, val]) => {
if (val !== undefined) {
result[key] = val
}
})
return result
}
27 changes: 27 additions & 0 deletions packages/bsky/src/daemon/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { PrimaryDatabase } from '../db'
import { DaemonConfig } from './config'
import { Services } from './services'

export class DaemonContext {
constructor(
private opts: {
db: PrimaryDatabase
cfg: DaemonConfig
services: Services
},
) {}

get db(): PrimaryDatabase {
return this.opts.db
}

get cfg(): DaemonConfig {
return this.opts.cfg
}

get services(): Services {
return this.opts.services
}
}

export default DaemonContext
79 changes: 79 additions & 0 deletions packages/bsky/src/daemon/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { PrimaryDatabase } from '../db'
import { dbLogger } from '../logger'
import { DaemonConfig } from './config'
import { DaemonContext } from './context'
import { createServices } from './services'
import { ImageUriBuilder } from '../image/uri'
import { LabelCache } from '../label-cache'
import { NotificationsDaemon } from './notifications'
import logger from './logger'

export { DaemonConfig } from './config'
export type { DaemonConfigValues } from './config'

export class BskyDaemon {
public ctx: DaemonContext
public notifications: NotificationsDaemon
private dbStatsInterval: NodeJS.Timer
private notifStatsInterval: NodeJS.Timer

constructor(opts: {
ctx: DaemonContext
notifications: NotificationsDaemon
}) {
this.ctx = opts.ctx
this.notifications = opts.notifications
}

static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon {
const { db, cfg } = opts
const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon
const labelCache = new LabelCache(db)
const services = createServices({
imgUriBuilder,
labelCache,
})
const ctx = new DaemonContext({
db,
cfg,
services,
})
const notifications = new NotificationsDaemon(ctx)
return new BskyDaemon({ ctx, notifications })
}

async start() {
const { db } = this.ctx
const pool = db.pool
this.notifications.run()
this.dbStatsInterval = setInterval(() => {
dbLogger.info(
{
idleCount: pool.idleCount,
totalCount: pool.totalCount,
waitingCount: pool.waitingCount,
},
'db pool stats',
)
}, 10000)
this.notifStatsInterval = setInterval(() => {
logger.info(
{
count: this.notifications.count,
lastDid: this.notifications.lastDid,
},
'notifications daemon stats',
)
}, 10000)
return this
}

async destroy(): Promise<void> {
await this.notifications.destroy()
await this.ctx.db.close()
clearInterval(this.dbStatsInterval)
clearInterval(this.notifStatsInterval)
}
}

export default BskyDaemon
6 changes: 6 additions & 0 deletions packages/bsky/src/daemon/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { subsystemLogger } from '@atproto/common'

const logger: ReturnType<typeof subsystemLogger> =
subsystemLogger('bsky:daemon')

export default logger
50 changes: 50 additions & 0 deletions packages/bsky/src/daemon/notifications.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { tidyNotifications } from '../services/util/notification'
import DaemonContext from './context'
import logger from './logger'

export class NotificationsDaemon {
ac = new AbortController()
running: Promise<void> | undefined
count = 0
lastDid: string | null = null

constructor(private ctx: DaemonContext) {}

run(opts?: RunOptions) {
if (this.running) return
this.count = 0
this.lastDid = null
this.ac = new AbortController()
this.running = this.tidyNotifications({
...opts,
forever: opts?.forever !== false, // run forever by default
})
.catch((err) => {
// allow this to cause an unhandled rejection, let deployment handle the crash.
logger.error({ err }, 'notifications daemon crashed')
throw err
})
.finally(() => (this.running = undefined))
}

private async tidyNotifications(opts: RunOptions) {
const actorService = this.ctx.services.actor(this.ctx.db)
for await (const { did } of actorService.all(opts)) {
if (this.ac.signal.aborted) return
try {
await tidyNotifications(this.ctx.db, did)
this.count++
this.lastDid = did
} catch (err) {
logger.warn({ err, did }, 'failed to tidy notifications for actor')
}
}
}

async destroy() {
this.ac.abort()
await this.running
}
}

type RunOptions = { forever?: boolean; batchSize?: number }
20 changes: 20 additions & 0 deletions packages/bsky/src/daemon/services.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { PrimaryDatabase } from '../db'
import { ActorService } from '../services/actor'
import { ImageUriBuilder } from '../image/uri'
import { LabelCache } from '../label-cache'

export function createServices(resources: {
imgUriBuilder: ImageUriBuilder
labelCache: LabelCache
}): Services {
const { imgUriBuilder, labelCache } = resources
return {
actor: ActorService.creator(imgUriBuilder, labelCache),
}
}

export type Services = {
actor: FromDbPrimary<ActorService>
}

type FromDbPrimary<T> = (db: PrimaryDatabase) => T
1 change: 1 addition & 0 deletions packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export { Redis } from './redis'
export { ViewMaintainer } from './db/views'
export { AppContext } from './context'
export { makeAlgos } from './feed-gen'
export * from './daemon'
export * from './indexer'
export * from './ingester'
export { MigrateModerationData } from './migrate-moderation-data'
Expand Down
29 changes: 29 additions & 0 deletions packages/bsky/src/services/actor/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { sql } from 'kysely'
import { wait } from '@atproto/common'
import { Database } from '../../db'
import { notSoftDeletedClause } from '../../db/util'
import { ActorViews } from './views'
Expand Down Expand Up @@ -144,6 +145,34 @@ export class ActorService {
.executeTakeFirst()
return res?.repoRev ?? null
}

async *all(
opts: { batchSize?: number; forever?: boolean; cooldownMs?: number } = {},
) {
const { cooldownMs = 1000, batchSize = 1000, forever = false } = opts
const baseQuery = this.db.db
.selectFrom('actor')
.selectAll()
.orderBy('did')
.limit(batchSize)
while (true) {
let cursor: ActorResult | undefined
do {
const actors = cursor
? await baseQuery.where('did', '>', cursor.did).execute()
: await baseQuery.execute()
for (const actor of actors) {
yield actor
}
cursor = actors.at(-1)
} while (cursor)
if (forever) {
await wait(cooldownMs)
} else {
return
}
}
}
}

type ActorResult = Actor
Expand Down
70 changes: 70 additions & 0 deletions packages/bsky/src/services/util/notification.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { sql } from 'kysely'
import { countAll } from '../../db/util'
import { PrimaryDatabase } from '../../db'

// i.e. 30 days before the last time the user checked their notifs
export const BEFORE_LAST_SEEN_DAYS = 30
// i.e. 180 days before the latest unread notification
export const BEFORE_LATEST_UNREAD_DAYS = 180
// don't consider culling unreads until they hit this threshold, and then enforce beforeLatestUnreadThresholdDays
export const UNREAD_KEPT_COUNT = 500

export const tidyNotifications = async (db: PrimaryDatabase, did: string) => {
const stats = await db.db
.selectFrom('notification')
.select([
sql<0 | 1>`("sortAt" < "lastSeenNotifs")`.as('read'),
countAll.as('count'),
sql<string>`min("sortAt")`.as('earliestAt'),
sql<string>`max("sortAt")`.as('latestAt'),
sql<string>`max("lastSeenNotifs")`.as('lastSeenAt'),
])
.leftJoin('actor_state', 'actor_state.did', 'notification.did')
.where('notification.did', '=', did)
.groupBy(sql`1`) // group by read (i.e. 1st column)
.execute()
const readStats = stats.find((stat) => stat.read)
const unreadStats = stats.find((stat) => !stat.read)
let readCutoffAt: Date | undefined
let unreadCutoffAt: Date | undefined
if (readStats) {
readCutoffAt = addDays(
new Date(readStats.lastSeenAt),
-BEFORE_LAST_SEEN_DAYS,
)
}
if (unreadStats && unreadStats.count > UNREAD_KEPT_COUNT) {
unreadCutoffAt = addDays(
new Date(unreadStats.latestAt),
-BEFORE_LATEST_UNREAD_DAYS,
)
}
// take most recent of read/unread cutoffs
const cutoffAt = greatest(readCutoffAt, unreadCutoffAt)
if (cutoffAt) {
// skip delete if it wont catch any notifications
const earliestAt = least(readStats?.earliestAt, unreadStats?.earliestAt)
if (earliestAt && earliestAt < cutoffAt.toISOString()) {
await db.db
.deleteFrom('notification')
.where('did', '=', did)
.where('sortAt', '<', cutoffAt.toISOString())
.execute()
}
}
}

const addDays = (date: Date, days: number) => {
date.setDate(date.getDate() + days)
return date
}

const least = <T extends Ordered>(a: T | undefined, b: T | undefined) => {
return a !== undefined && (b === undefined || a < b) ? a : b
}

const greatest = <T extends Ordered>(a: T | undefined, b: T | undefined) => {
return a !== undefined && (b === undefined || a > b) ? a : b
}

type Ordered = string | number | Date
Loading

0 comments on commit cad30a7

Please sign in to comment.