diff --git a/packages/bsky/src/daemon/config.ts b/packages/bsky/src/daemon/config.ts new file mode 100644 index 00000000000..e0e789203e4 --- /dev/null +++ b/packages/bsky/src/daemon/config.ts @@ -0,0 +1,50 @@ +import assert from 'assert' + +export interface DaemonConfigValues { + version: string + dbPostgresUrl: string + dbPostgresSchema?: string +} + +export class DaemonConfig { + constructor(private cfg: DaemonConfigValues) {} + + static readEnv(overrides?: Partial<DaemonConfigValues>) { + const version = process.env.BSKY_VERSION || '0.0.0' + const dbPostgresUrl = + overrides?.dbPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL + const dbPostgresSchema = + overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA + assert(dbPostgresUrl) + return new DaemonConfig({ + version, + dbPostgresUrl, + dbPostgresSchema, + ...stripUndefineds(overrides ?? {}), + }) + } + + get version() { + return this.cfg.version + } + + get dbPostgresUrl() { + return this.cfg.dbPostgresUrl + } + + get dbPostgresSchema() { + return this.cfg.dbPostgresSchema + } +} + +function stripUndefineds( + obj: Record<string, unknown>, +): Record<string, unknown> { + const result = {} + Object.entries(obj).forEach(([key, val]) => { + if (val !== undefined) { + result[key] = val + } + }) + return result +} diff --git a/packages/bsky/src/daemon/context.ts b/packages/bsky/src/daemon/context.ts new file mode 100644 index 00000000000..dd3d5c1114f --- /dev/null +++ b/packages/bsky/src/daemon/context.ts @@ -0,0 +1,27 @@ +import { PrimaryDatabase } from '../db' +import { DaemonConfig } from './config' +import { Services } from './services' + +export class DaemonContext { + constructor( + private opts: { + db: PrimaryDatabase + cfg: DaemonConfig + services: Services + }, + ) {} + + get db(): PrimaryDatabase { + return this.opts.db + } + + get cfg(): DaemonConfig { + return this.opts.cfg + } + + get services(): Services { + return this.opts.services + } +} + +export default DaemonContext diff --git a/packages/bsky/src/daemon/index.ts
b/packages/bsky/src/daemon/index.ts new file mode 100644 index 00000000000..61bcd8568f4 --- /dev/null +++ b/packages/bsky/src/daemon/index.ts @@ -0,0 +1,79 @@ +import { PrimaryDatabase } from '../db' +import { dbLogger } from '../logger' +import { DaemonConfig } from './config' +import { DaemonContext } from './context' +import { createServices } from './services' +import { ImageUriBuilder } from '../image/uri' +import { LabelCache } from '../label-cache' +import { NotificationsDaemon } from './notifications' +import logger from './logger' + +export { DaemonConfig } from './config' +export type { DaemonConfigValues } from './config' + +export class BskyDaemon { + public ctx: DaemonContext + public notifications: NotificationsDaemon + private dbStatsInterval: NodeJS.Timer + private notifStatsInterval: NodeJS.Timer + + constructor(opts: { + ctx: DaemonContext + notifications: NotificationsDaemon + }) { + this.ctx = opts.ctx + this.notifications = opts.notifications + } + + static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon { + const { db, cfg } = opts + const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon + const labelCache = new LabelCache(db) + const services = createServices({ + imgUriBuilder, + labelCache, + }) + const ctx = new DaemonContext({ + db, + cfg, + services, + }) + const notifications = new NotificationsDaemon(ctx) + return new BskyDaemon({ ctx, notifications }) + } + + async start() { + const { db } = this.ctx + const pool = db.pool + this.notifications.run() + this.dbStatsInterval = setInterval(() => { + dbLogger.info( + { + idleCount: pool.idleCount, + totalCount: pool.totalCount, + waitingCount: pool.waitingCount, + }, + 'db pool stats', + ) + }, 10000) + this.notifStatsInterval = setInterval(() => { + logger.info( + { + count: this.notifications.count, + lastDid: this.notifications.lastDid, + }, + 'notifications daemon stats', + ) + }, 10000) + return this + } + + async 
destroy(): Promise<void> { + await this.notifications.destroy() + await this.ctx.db.close() + clearInterval(this.dbStatsInterval) + clearInterval(this.notifStatsInterval) + } +} + +export default BskyDaemon diff --git a/packages/bsky/src/daemon/logger.ts b/packages/bsky/src/daemon/logger.ts new file mode 100644 index 00000000000..8599acc315e --- /dev/null +++ b/packages/bsky/src/daemon/logger.ts @@ -0,0 +1,6 @@ +import { subsystemLogger } from '@atproto/common' + +const logger: ReturnType<typeof subsystemLogger> = + subsystemLogger('bsky:daemon') + +export default logger diff --git a/packages/bsky/src/daemon/notifications.ts b/packages/bsky/src/daemon/notifications.ts new file mode 100644 index 00000000000..e8e884b37c2 --- /dev/null +++ b/packages/bsky/src/daemon/notifications.ts @@ -0,0 +1,50 @@ +import { tidyNotifications } from '../services/util/notification' +import DaemonContext from './context' +import logger from './logger' + +export class NotificationsDaemon { + ac = new AbortController() + running: Promise<void> | undefined + count = 0 + lastDid: string | null = null + + constructor(private ctx: DaemonContext) {} + + run(opts?: RunOptions) { + if (this.running) return + this.count = 0 + this.lastDid = null + this.ac = new AbortController() + this.running = this.tidyNotifications({ + ...opts, + forever: opts?.forever !== false, // run forever by default + }) + .catch((err) => { + // allow this to cause an unhandled rejection, let deployment handle the crash.
+ logger.error({ err }, 'notifications daemon crashed') + throw err + }) + .finally(() => (this.running = undefined)) + } + + private async tidyNotifications(opts: RunOptions) { + const actorService = this.ctx.services.actor(this.ctx.db) + for await (const { did } of actorService.all(opts)) { + if (this.ac.signal.aborted) return + try { + await tidyNotifications(this.ctx.db, did) + this.count++ + this.lastDid = did + } catch (err) { + logger.warn({ err, did }, 'failed to tidy notifications for actor') + } + } + } + + async destroy() { + this.ac.abort() + await this.running + } +} + +type RunOptions = { forever?: boolean; batchSize?: number } diff --git a/packages/bsky/src/daemon/services.ts b/packages/bsky/src/daemon/services.ts new file mode 100644 index 00000000000..a4e7935523c --- /dev/null +++ b/packages/bsky/src/daemon/services.ts @@ -0,0 +1,20 @@ +import { PrimaryDatabase } from '../db' +import { ActorService } from '../services/actor' +import { ImageUriBuilder } from '../image/uri' +import { LabelCache } from '../label-cache' + +export function createServices(resources: { + imgUriBuilder: ImageUriBuilder + labelCache: LabelCache +}): Services { + const { imgUriBuilder, labelCache } = resources + return { + actor: ActorService.creator(imgUriBuilder, labelCache), + } +} + +export type Services = { + actor: FromDbPrimary<ActorService> +} + +type FromDbPrimary<T> = (db: PrimaryDatabase) => T diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 9e0075dce37..7ceba61f990 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -37,6 +37,7 @@ export { Redis } from './redis' export { ViewMaintainer } from './db/views' export { AppContext } from './context' export { makeAlgos } from './feed-gen' +export * from './daemon' export * from './indexer' export * from './ingester' export { MigrateModerationData } from './migrate-moderation-data' diff --git a/packages/bsky/src/services/actor/index.ts b/packages/bsky/src/services/actor/index.ts index
a2f980ce71d..51be90892fc 100644 --- a/packages/bsky/src/services/actor/index.ts +++ b/packages/bsky/src/services/actor/index.ts @@ -1,4 +1,5 @@ import { sql } from 'kysely' +import { wait } from '@atproto/common' import { Database } from '../../db' import { notSoftDeletedClause } from '../../db/util' import { ActorViews } from './views' @@ -144,6 +145,34 @@ export class ActorService { .executeTakeFirst() return res?.repoRev ?? null } + + async *all( + opts: { batchSize?: number; forever?: boolean; cooldownMs?: number } = {}, + ) { + const { cooldownMs = 1000, batchSize = 1000, forever = false } = opts + const baseQuery = this.db.db + .selectFrom('actor') + .selectAll() + .orderBy('did') + .limit(batchSize) + while (true) { + let cursor: ActorResult | undefined + do { + const actors = cursor + ? await baseQuery.where('did', '>', cursor.did).execute() + : await baseQuery.execute() + for (const actor of actors) { + yield actor + } + cursor = actors.at(-1) + } while (cursor) + if (forever) { + await wait(cooldownMs) + } else { + return + } + } + } } type ActorResult = Actor diff --git a/packages/bsky/src/services/util/notification.ts b/packages/bsky/src/services/util/notification.ts new file mode 100644 index 00000000000..811e6e41713 --- /dev/null +++ b/packages/bsky/src/services/util/notification.ts @@ -0,0 +1,70 @@ +import { sql } from 'kysely' +import { countAll } from '../../db/util' +import { PrimaryDatabase } from '../../db' + +// i.e. 30 days before the last time the user checked their notifs +export const BEFORE_LAST_SEEN_DAYS = 30 +// i.e. 
180 days before the latest unread notification +export const BEFORE_LATEST_UNREAD_DAYS = 180 +// don't consider culling unreads until they hit this threshold, and then enforce beforeLatestUnreadThresholdDays +export const UNREAD_KEPT_COUNT = 500 + +export const tidyNotifications = async (db: PrimaryDatabase, did: string) => { + const stats = await db.db + .selectFrom('notification') + .select([ + sql<0 | 1>`("sortAt" < "lastSeenNotifs")`.as('read'), + countAll.as('count'), + sql<string>`min("sortAt")`.as('earliestAt'), + sql<string>`max("sortAt")`.as('latestAt'), + sql<string>`max("lastSeenNotifs")`.as('lastSeenAt'), + ]) + .leftJoin('actor_state', 'actor_state.did', 'notification.did') + .where('notification.did', '=', did) + .groupBy(sql`1`) // group by read (i.e. 1st column) + .execute() + const readStats = stats.find((stat) => stat.read) + const unreadStats = stats.find((stat) => !stat.read) + let readCutoffAt: Date | undefined + let unreadCutoffAt: Date | undefined + if (readStats) { + readCutoffAt = addDays( + new Date(readStats.lastSeenAt), + -BEFORE_LAST_SEEN_DAYS, + ) + } + if (unreadStats && unreadStats.count > UNREAD_KEPT_COUNT) { + unreadCutoffAt = addDays( + new Date(unreadStats.latestAt), + -BEFORE_LATEST_UNREAD_DAYS, + ) + } + // take most recent of read/unread cutoffs + const cutoffAt = greatest(readCutoffAt, unreadCutoffAt) + if (cutoffAt) { + // skip delete if it wont catch any notifications + const earliestAt = least(readStats?.earliestAt, unreadStats?.earliestAt) + if (earliestAt && earliestAt < cutoffAt.toISOString()) { + await db.db + .deleteFrom('notification') + .where('did', '=', did) + .where('sortAt', '<', cutoffAt.toISOString()) + .execute() + } + } +} + +const addDays = (date: Date, days: number) => { + date.setDate(date.getDate() + days) + return date +} + +const least = <T extends Ordered>(a: T | undefined, b: T | undefined) => { + return a !== undefined && (b === undefined || a < b) ?
a : b +} + +const greatest = <T extends Ordered>(a: T | undefined, b: T | undefined) => { + return a !== undefined && (b === undefined || a > b) ? a : b +} + +type Ordered = string | number | Date diff --git a/packages/bsky/tests/daemon.test.ts b/packages/bsky/tests/daemon.test.ts new file mode 100644 index 00000000000..32f0d6617ab --- /dev/null +++ b/packages/bsky/tests/daemon.test.ts @@ -0,0 +1,191 @@ +import assert from 'assert' +import { AtUri } from '@atproto/api' +import { TestNetwork } from '@atproto/dev-env' +import { BskyDaemon, DaemonConfig, PrimaryDatabase } from '../src' +import usersSeed from './seeds/users' +import { countAll, excluded } from '../src/db/util' +import { NotificationsDaemon } from '../src/daemon/notifications' +import { + BEFORE_LAST_SEEN_DAYS, + BEFORE_LATEST_UNREAD_DAYS, + UNREAD_KEPT_COUNT, +} from '../src/services/util/notification' + +describe('daemon', () => { + let network: TestNetwork + let daemon: BskyDaemon + let db: PrimaryDatabase + let actors: { did: string }[] = [] + + beforeAll(async () => { + network = await TestNetwork.create({ + dbPostgresSchema: 'bsky_daemon', + }) + db = network.bsky.ctx.db.getPrimary() + daemon = BskyDaemon.create({ + db, + cfg: new DaemonConfig({ + version: network.bsky.ctx.cfg.version, + dbPostgresUrl: network.bsky.ctx.cfg.dbPrimaryPostgresUrl, + dbPostgresSchema: network.bsky.ctx.cfg.dbPostgresSchema, + }), + }) + const sc = network.getSeedClient() + await usersSeed(sc) + await network.processAll() + actors = await db.db.selectFrom('actor').selectAll().execute() + }) + + afterAll(async () => { + await network.close() + }) + + describe('notifications daemon', () => { + it('processes all dids', async () => { + for (const { did } of actors) { + await Promise.all([ + setLastSeen(daemon.ctx.db, { did }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: 2 * BEFORE_LAST_SEEN_DAYS, + count: 1, + }), + ]) + } + await expect(countNotifications(db)).resolves.toBe(actors.length) + await
runNotifsOnce(daemon.notifications) + await expect(countNotifications(db)).resolves.toBe(0) + }) + + it('removes read notifications older than threshold.', async () => { + const { did } = actors[0] + const lastSeenDaysAgo = 10 + await Promise.all([ + setLastSeen(daemon.ctx.db, { did, daysAgo: lastSeenDaysAgo }), + // read, delete + createNotifications(daemon.ctx.db, { + did, + daysAgo: lastSeenDaysAgo + BEFORE_LAST_SEEN_DAYS + 1, + count: 2, + }), + // read, keep + createNotifications(daemon.ctx.db, { + did, + daysAgo: lastSeenDaysAgo + BEFORE_LAST_SEEN_DAYS - 1, + count: 3, + }), + // unread, keep + createNotifications(daemon.ctx.db, { + did, + daysAgo: lastSeenDaysAgo - 1, + count: 4, + }), + ]) + await expect(countNotifications(db)).resolves.toBe(9) + await runNotifsOnce(daemon.notifications) + await expect(countNotifications(db)).resolves.toBe(7) + await clearNotifications(db) + }) + + it('removes unread notifications older than threshold.', async () => { + const { did } = actors[0] + await Promise.all([ + setLastSeen(daemon.ctx.db, { + did, + daysAgo: 2 * BEFORE_LATEST_UNREAD_DAYS, // all are unread + }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: 0, + count: 1, + }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: BEFORE_LATEST_UNREAD_DAYS - 1, + count: 99, + }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: BEFORE_LATEST_UNREAD_DAYS + 1, + count: 400, + }), + ]) + await expect(countNotifications(db)).resolves.toBe(UNREAD_KEPT_COUNT) + await runNotifsOnce(daemon.notifications) + // none removed when within UNREAD_KEPT_COUNT + await expect(countNotifications(db)).resolves.toBe(UNREAD_KEPT_COUNT) + // add one more, tip over UNREAD_KEPT_COUNT + await createNotifications(daemon.ctx.db, { + did, + daysAgo: BEFORE_LATEST_UNREAD_DAYS + 1, + count: 1, + }) + await runNotifsOnce(daemon.notifications) + // removed all older than BEFORE_LATEST_UNREAD_DAYS + await expect(countNotifications(db)).resolves.toBe(100) + await 
clearNotifications(db) + }) + }) + + const runNotifsOnce = async (notifsDaemon: NotificationsDaemon) => { + assert(!notifsDaemon.running, 'notifications daemon is already running') + notifsDaemon.run({ forever: false, batchSize: 2 }) + await notifsDaemon.running + } + + const setLastSeen = async ( + db: PrimaryDatabase, + opts: { did: string; daysAgo?: number }, + ) => { + const { did, daysAgo = 0 } = opts + const lastSeenAt = new Date() + lastSeenAt.setDate(lastSeenAt.getDate() - daysAgo) + await db.db + .insertInto('actor_state') + .values({ did, lastSeenNotifs: lastSeenAt.toISOString() }) + .onConflict((oc) => + oc.column('did').doUpdateSet({ + lastSeenNotifs: excluded(db.db, 'lastSeenNotifs'), + }), + ) + .execute() + } + + const createNotifications = async ( + db: PrimaryDatabase, + opts: { + did: string + count: number + daysAgo: number + }, + ) => { + const { did, count, daysAgo } = opts + const sortAt = new Date() + sortAt.setDate(sortAt.getDate() - daysAgo) + await db.db + .insertInto('notification') + .values( + [...Array(count)].map(() => ({ + did, + author: did, + reason: 'none', + recordCid: 'bafycid', + recordUri: AtUri.make(did, 'invalid.collection', 'self').toString(), + sortAt: sortAt.toISOString(), + })), + ) + .execute() + } + + const clearNotifications = async (db: PrimaryDatabase) => { + await db.db.deleteFrom('notification').execute() + } + + const countNotifications = async (db: PrimaryDatabase) => { + const { count } = await db.db + .selectFrom('notification') + .select(countAll.as('count')) + .executeTakeFirstOrThrow() + return count + } +}) diff --git a/packages/bsky/tests/views/profile.test.ts b/packages/bsky/tests/views/profile.test.ts index d4e0c718bed..726fb990a0d 100644 --- a/packages/bsky/tests/views/profile.test.ts +++ b/packages/bsky/tests/views/profile.test.ts @@ -25,7 +25,6 @@ describe('pds profile views', () => { sc = network.getSeedClient() await basicSeed(sc) await network.processAll() - await network.bsky.processAll() alice = 
sc.dids.alice bob = sc.dids.bob dan = sc.dids.dan diff --git a/services/bsky/daemon.js b/services/bsky/daemon.js new file mode 100644 index 00000000000..bd8322ab58f --- /dev/null +++ b/services/bsky/daemon.js @@ -0,0 +1,44 @@ +'use strict' /* eslint-disable */ + +require('dd-trace/init') // Only works with commonjs + +// Tracer code above must come before anything else +const { PrimaryDatabase, DaemonConfig, BskyDaemon } = require('@atproto/bsky') + +const main = async () => { + const env = getEnv() + const db = new PrimaryDatabase({ + url: env.dbPostgresUrl, + schema: env.dbPostgresSchema, + poolSize: env.dbPoolSize, + poolMaxUses: env.dbPoolMaxUses, + poolIdleTimeoutMs: env.dbPoolIdleTimeoutMs, + }) + const cfg = DaemonConfig.readEnv({ + version: env.version, + dbPostgresUrl: env.dbPostgresUrl, + dbPostgresSchema: env.dbPostgresSchema, + }) + const daemon = BskyDaemon.create({ db, cfg }) + await daemon.start() + process.on('SIGTERM', async () => { + await daemon.destroy() + }) +} + +const getEnv = () => ({ + version: process.env.BSKY_VERSION, + dbPostgresUrl: + process.env.DB_PRIMARY_POSTGRES_URL || process.env.DB_POSTGRES_URL, + dbPostgresSchema: process.env.DB_POSTGRES_SCHEMA || undefined, + dbPoolSize: maybeParseInt(process.env.DB_POOL_SIZE), + dbPoolMaxUses: maybeParseInt(process.env.DB_POOL_MAX_USES), + dbPoolIdleTimeoutMs: maybeParseInt(process.env.DB_POOL_IDLE_TIMEOUT_MS), +}) + +const maybeParseInt = (str) => { + const parsed = parseInt(str) + return isNaN(parsed) ? undefined : parsed +} + +main()