From b8af64899fe7c8df42e21014b79834fcf2dbfe63 Mon Sep 17 00:00:00 2001
From: dholms
Date: Tue, 19 Dec 2023 13:36:00 -0600
Subject: [PATCH] tidy & fix compiler errors

---
 .../com/atproto/admin/emitModerationEvent.ts  |   3 +-
 .../src/api/com/atproto/admin/searchRepos.ts  |  34 +-
 .../com/atproto/moderation/createReport.ts    |  19 +-
 .../src/api/com/atproto/moderation/util.ts    |   2 +-
 .../src/api/com/atproto/temp/fetchLabels.ts   |   3 +-
 packages/mod-service/src/api/health.ts        |   3 +-
 packages/mod-service/src/background.ts        |   6 +-
 packages/mod-service/src/context.ts           |  35 --
 packages/mod-service/src/daemon/config.ts     |  60 ---
 packages/mod-service/src/daemon/context.ts    |  27 --
 packages/mod-service/src/daemon/index.ts      |  78 ----
 packages/mod-service/src/daemon/logger.ts     |   6 -
 .../mod-service/src/daemon/notifications.ts   |  54 ---
 packages/mod-service/src/daemon/services.ts   |  22 -
 packages/mod-service/src/db/leader.ts         |   4 +-
 .../db/periodic-moderation-event-reversal.ts  |   9 +-
 packages/mod-service/src/db/util.ts           |   2 +-
 packages/mod-service/src/db/views.ts          |  50 ---
 packages/mod-service/src/did-cache.ts         |  87 ----
 packages/mod-service/src/index.ts             | 198 ++-------
 .../src/migrate-moderation-data.ts            | 414 ------------------
 packages/mod-service/src/notifications.ts     | 382 ----------------
 packages/mod-service/src/pipeline.ts          |  22 -
 packages/mod-service/src/redis.ts             | 205 ---------
 .../src/services/moderation/index.ts          |  51 +++
 .../src/services/moderation/views.ts          |  23 +-
 packages/mod-service/src/services/types.ts    |   3 +-
 .../src/services/util/notification.ts         |  70 ---
 .../mod-service/src/services/util/post.ts     |  65 ---
 .../mod-service/src/services/util/search.ts   | 172 --------
 30 files changed, 157 insertions(+), 1952 deletions(-)
 delete mode 100644 packages/mod-service/src/daemon/config.ts
 delete mode 100644 packages/mod-service/src/daemon/context.ts
 delete mode 100644 packages/mod-service/src/daemon/index.ts
 delete mode 100644 packages/mod-service/src/daemon/logger.ts
 delete mode 100644 packages/mod-service/src/daemon/notifications.ts
 delete mode 100644 packages/mod-service/src/daemon/services.ts
 delete mode 100644 packages/mod-service/src/db/views.ts
 delete mode 100644 packages/mod-service/src/did-cache.ts
 delete mode 100644 packages/mod-service/src/migrate-moderation-data.ts
 delete mode 100644 packages/mod-service/src/notifications.ts
 delete mode 100644 packages/mod-service/src/pipeline.ts
 delete mode 100644 packages/mod-service/src/redis.ts
 delete mode 100644 packages/mod-service/src/services/util/notification.ts
 delete mode 100644 packages/mod-service/src/services/util/post.ts
 delete mode 100644 packages/mod-service/src/services/util/search.ts

diff --git a/packages/mod-service/src/api/com/atproto/admin/emitModerationEvent.ts b/packages/mod-service/src/api/com/atproto/admin/emitModerationEvent.ts
index 41d32f167bc..cb149d1e5ba 100644
--- a/packages/mod-service/src/api/com/atproto/admin/emitModerationEvent.ts
+++ b/packages/mod-service/src/api/com/atproto/admin/emitModerationEvent.ts
@@ -73,7 +73,6 @@ export default function (server: Server, ctx: AppContext) {
       const { result: moderationEvent, takenDown } = await db.transaction(
         async (dbTxn) => {
           const moderationTxn = ctx.services.moderation(dbTxn)
-          const labelTxn = ctx.services.label(dbTxn)
 
           const result = await moderationTxn.logEvent({
             event,
@@ -163,7 +162,7 @@
         }
 
         if (isLabelEvent) {
-          await labelTxn.formatAndCreate(
+          await moderationTxn.formatAndCreateLabels(
             ctx.cfg.labelerDid,
             result.subjectUri ??
result.subjectDid, result.subjectCid, diff --git a/packages/mod-service/src/api/com/atproto/admin/searchRepos.ts b/packages/mod-service/src/api/com/atproto/admin/searchRepos.ts index a1680ec5fdc..d6255f3c9cb 100644 --- a/packages/mod-service/src/api/com/atproto/admin/searchRepos.ts +++ b/packages/mod-service/src/api/com/atproto/admin/searchRepos.ts @@ -1,27 +1,27 @@ import { Server } from '../../../../lexicon' import AppContext from '../../../../context' +import { InvalidRequestError } from '@atproto/xrpc-server' export default function (server: Server, ctx: AppContext) { server.com.atproto.admin.searchRepos({ auth: ctx.roleVerifier, handler: async ({ params }) => { - const db = ctx.db - const moderationService = ctx.services.moderation(db) - const { limit, cursor } = params - // prefer new 'q' query param over deprecated 'term' - const query = params.q ?? params.term - - const { results, cursor: resCursor } = await ctx.services - .actor(db) - .getSearchResults({ query, limit, cursor, includeSoftDeleted: true }) - - return { - encoding: 'application/json', - body: { - cursor: resCursor, - repos: await moderationService.views.repo(results), - }, - } + throw new InvalidRequestError('@TODO') + // const db = ctx.db + // const moderationService = ctx.services.moderation(db) + // const { limit, cursor } = params + // // prefer new 'q' query param over deprecated 'term' + // const query = params.q ?? params.term + // const { results, cursor: resCursor } = await ctx.services + // .actor(db) + // .getSearchResults({ query, limit, cursor, includeSoftDeleted: true }) + // return { + // encoding: 'application/json', + // body: { + // cursor: resCursor, + // repos: await moderationService.views.repo(results), + // }, + // } }, }) } diff --git a/packages/mod-service/src/api/com/atproto/moderation/createReport.ts b/packages/mod-service/src/api/com/atproto/moderation/createReport.ts index b247a319527..7c22cd40367 100644 --- a/packages/mod-service/src/api/com/atproto/moderation/createReport.ts +++ b/packages/mod-service/src/api/com/atproto/moderation/createReport.ts @@ -12,15 +12,16 @@ export default function (server: Server, ctx: AppContext) { const { reasonType, reason, subject } = input.body const requester = auth.credentials.did - const db = ctx.db.getPrimary() + const db = ctx.db - if (requester) { - // Don't accept reports from users that are fully taken-down - const actor = await ctx.services.actor(db).getActor(requester, true) - if (actor && softDeleted(actor)) { - throw new AuthRequiredError() - } - } + // @TODO + // if (requester) { + // // Don't accept reports from users that are fully taken-down + // const actor = await ctx.services.actor(db).getActor(requester, true) + // if (actor && softDeleted(actor)) { + // throw new AuthRequiredError() + // } + // } const report = await db.transaction(async (dbTxn) => { const moderationTxn = ctx.services.moderation(dbTxn) @@ -35,7 +36,7 @@ export default function (server: Server, ctx: AppContext) { const moderationService = ctx.services.moderation(db) return { encoding: 'application/json', - body: moderationService.views.reportPublic(report), + body: moderationService.views.formatReport(report), } }, }) diff --git a/packages/mod-service/src/api/com/atproto/moderation/util.ts b/packages/mod-service/src/api/com/atproto/moderation/util.ts index bc0ece2ff9f..d757b359787 100644 --- a/packages/mod-service/src/api/com/atproto/moderation/util.ts +++ b/packages/mod-service/src/api/com/atproto/moderation/util.ts @@ -16,7 +16,7 @@ import { REVIEWESCALATED, 
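[review note] The createReport hunk above funnels the write through a single transaction and constructs the moderation service per connection, so the report insert and any follow-on writes commit or roll back together. A minimal sketch of that pattern using raw kysely; the package's own `Database` wrapper exposes a similar `transaction` helper, and the schema and column names here are illustrative:

```ts
import { Kysely, PostgresDialect } from 'kysely'
import { Pool } from 'pg'

interface Schema {
  moderation_event: { action: string; subjectDid: string; createdAt: string }
}

const db = new Kysely<Schema>({
  dialect: new PostgresDialect({
    pool: new Pool({ connectionString: 'postgresql://localhost/postgres' }),
  }),
})

async function createReport(subjectDid: string) {
  // Everything inside the callback shares one transaction; a throw rolls it all back.
  return db.transaction().execute(async (txn) =>
    txn
      .insertInto('moderation_event')
      .values({
        action: 'com.atproto.admin.defs#modEventReport',
        subjectDid,
        createdAt: new Date().toISOString(),
      })
      .returningAll()
      .executeTakeFirstOrThrow(),
  )
}
```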
REVIEWOPEN, } from '../../../../lexicon/types/com/atproto/admin/defs' -import { ModerationEvent } from '../../../../db/tables/moderation' +import { ModerationEvent } from '../../../../db/schema/moderation_event' import { ModerationSubjectStatusRow } from '../../../../services/moderation/types' type SubjectInput = ReportInput['subject'] | ActionInput['subject'] diff --git a/packages/mod-service/src/api/com/atproto/temp/fetchLabels.ts b/packages/mod-service/src/api/com/atproto/temp/fetchLabels.ts index 8a6cacc2fbd..926cbb868d1 100644 --- a/packages/mod-service/src/api/com/atproto/temp/fetchLabels.ts +++ b/packages/mod-service/src/api/com/atproto/temp/fetchLabels.ts @@ -4,10 +4,9 @@ import AppContext from '../../../../context' export default function (server: Server, ctx: AppContext) { server.com.atproto.temp.fetchLabels(async ({ params }) => { const { limit } = params - const db = ctx.db.getReplica() const since = params.since !== undefined ? new Date(params.since).toISOString() : '' - const labelRes = await db.db + const labelRes = await ctx.db.db .selectFrom('label') .selectAll() .orderBy('label.cts', 'asc') diff --git a/packages/mod-service/src/api/health.ts b/packages/mod-service/src/api/health.ts index bdcdeefcb4b..ef14882625d 100644 --- a/packages/mod-service/src/api/health.ts +++ b/packages/mod-service/src/api/health.ts @@ -7,9 +7,8 @@ export const createRouter = (ctx: AppContext): express.Router => { router.get('/xrpc/_health', async function (req, res) { const { version } = ctx.cfg - const db = ctx.db.getPrimary() try { - await sql`select 1`.execute(db.db) + await sql`select 1`.execute(ctx.db.db) } catch (err) { req.log.error(err, 'failed health check') return res.status(503).send({ version, error: 'Service Unavailable' }) diff --git a/packages/mod-service/src/background.ts b/packages/mod-service/src/background.ts index 466bad80a51..78cd790e779 100644 --- a/packages/mod-service/src/background.ts +++ b/packages/mod-service/src/background.ts @@ -1,5 +1,5 @@ import PQueue from 'p-queue' -import { PrimaryDatabase } from './db' +import { Database } from './db' import { dbLogger } from './logger' // A simple queue for in-process, out-of-band/backgrounded work @@ -7,7 +7,7 @@ import { dbLogger } from './logger' export class BackgroundQueue { queue = new PQueue({ concurrency: 20 }) destroyed = false - constructor(public db: PrimaryDatabase) {} + constructor(public db: Database) {} add(task: Task) { if (this.destroyed) { @@ -32,4 +32,4 @@ export class BackgroundQueue { } } -type Task = (db: PrimaryDatabase) => Promise +type Task = (db: Database) => Promise diff --git a/packages/mod-service/src/context.ts b/packages/mod-service/src/context.ts index 6280a4ea339..1f285b14c6c 100644 --- a/packages/mod-service/src/context.ts +++ b/packages/mod-service/src/context.ts @@ -5,14 +5,9 @@ import { Keypair } from '@atproto/crypto' import { createServiceJwt } from '@atproto/xrpc-server' import { Database } from './db' import { ServerConfig } from './config' -import { ImageUriBuilder } from './image/uri' import { Services } from './services' import * as auth from './auth' -import DidRedisCache from './did-cache' import { BackgroundQueue } from './background' -import { MountedAlgos } from './feed-gen/types' -import { NotificationServer } from './notifications' -import { Redis } from './redis' export class AppContext { public moderationPushAgent: AtpAgent | undefined @@ -20,17 +15,11 @@ export class AppContext { private opts: { db: Database appviewAgent: AtpAgent - imgUriBuilder: ImageUriBuilder cfg: 
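[review note] The background.ts hunk above swaps `PrimaryDatabase` for the single `Database` type but keeps the queue's contract: tasks receive the db handle and run out-of-band. A usage sketch, assuming the exports shown in this patch; how the `Database` instance is constructed is left abstract:

```ts
import { BackgroundQueue } from './background'
import { Database } from './db'

declare const db: Database // obtained however the service builds its db handle

const queue = new BackgroundQueue(db)

// Fire-and-forget work that must not block the request path.
queue.add(async (db) => {
  await db.db.deleteFrom('label').where('cts', '<', '2020-01-01').execute()
})

// Tests can flush the queue deterministically before asserting.
await queue.processAll()
```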
ServerConfig services: Services signingKey: Keypair idResolver: IdResolver - didCache: DidRedisCache - redis: Redis backgroundQueue: BackgroundQueue - searchAgent?: AtpAgent - algos: MountedAlgos - notifServer: NotificationServer }, ) { if (opts.cfg.moderationPushUrl) { @@ -47,10 +36,6 @@ export class AppContext { return this.opts.db } - get imgUriBuilder(): ImageUriBuilder { - return this.opts.imgUriBuilder - } - get cfg(): ServerConfig { return this.opts.cfg } @@ -71,22 +56,6 @@ export class AppContext { return this.opts.idResolver } - get didCache(): DidRedisCache { - return this.opts.didCache - } - - get redis(): Redis { - return this.opts.redis - } - - get notifServer(): NotificationServer { - return this.opts.notifServer - } - - get searchAgent(): AtpAgent | undefined { - return this.opts.searchAgent - } - get authVerifier() { return auth.authVerifier(this.idResolver, { aud: this.cfg.serverDid }) } @@ -125,10 +94,6 @@ export class AppContext { get backgroundQueue(): BackgroundQueue { return this.opts.backgroundQueue } - - get algos(): MountedAlgos { - return this.opts.algos - } } export default AppContext diff --git a/packages/mod-service/src/daemon/config.ts b/packages/mod-service/src/daemon/config.ts deleted file mode 100644 index 3dd7d557652..00000000000 --- a/packages/mod-service/src/daemon/config.ts +++ /dev/null @@ -1,60 +0,0 @@ -import assert from 'assert' - -export interface DaemonConfigValues { - version: string - dbPostgresUrl: string - dbPostgresSchema?: string - notificationsDaemonFromDid?: string -} - -export class DaemonConfig { - constructor(private cfg: DaemonConfigValues) {} - - static readEnv(overrides?: Partial) { - const version = process.env.BSKY_VERSION || '0.0.0' - const dbPostgresUrl = - overrides?.dbPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL - const dbPostgresSchema = - overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA - const notificationsDaemonFromDid = - overrides?.notificationsDaemonFromDid || - process.env.BSKY_NOTIFS_DAEMON_FROM_DID || - undefined - assert(dbPostgresUrl) - return new DaemonConfig({ - version, - dbPostgresUrl, - dbPostgresSchema, - notificationsDaemonFromDid, - ...stripUndefineds(overrides ?? 
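[review note] The deleted `DaemonConfig.readEnv` above layers explicit overrides on top of env-derived defaults, stripping `undefined` entries first (via the `stripUndefineds` helper just below) so an absent override cannot clobber a real value. The same idea in a compact, self-contained form:

```ts
// Drop undefined entries so spreading overrides never erases a default.
function stripUndefineds<T extends Record<string, unknown>>(obj: T): Partial<T> {
  return Object.fromEntries(
    Object.entries(obj).filter(([, val]) => val !== undefined),
  ) as Partial<T>
}

const defaults = {
  dbPostgresUrl: process.env.DB_PRIMARY_POSTGRES_URL ?? 'postgresql://localhost/postgres',
}
const overrides = { dbPostgresUrl: undefined } // e.g. not supplied by the caller
const cfg = { ...defaults, ...stripUndefineds(overrides) } // keeps the default URL
```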
{}), - }) - } - - get version() { - return this.cfg.version - } - - get dbPostgresUrl() { - return this.cfg.dbPostgresUrl - } - - get dbPostgresSchema() { - return this.cfg.dbPostgresSchema - } - - get notificationsDaemonFromDid() { - return this.cfg.notificationsDaemonFromDid - } -} - -function stripUndefineds( - obj: Record, -): Record { - const result = {} - Object.entries(obj).forEach(([key, val]) => { - if (val !== undefined) { - result[key] = val - } - }) - return result -} diff --git a/packages/mod-service/src/daemon/context.ts b/packages/mod-service/src/daemon/context.ts deleted file mode 100644 index dd3d5c1114f..00000000000 --- a/packages/mod-service/src/daemon/context.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { PrimaryDatabase } from '../db' -import { DaemonConfig } from './config' -import { Services } from './services' - -export class DaemonContext { - constructor( - private opts: { - db: PrimaryDatabase - cfg: DaemonConfig - services: Services - }, - ) {} - - get db(): PrimaryDatabase { - return this.opts.db - } - - get cfg(): DaemonConfig { - return this.opts.cfg - } - - get services(): Services { - return this.opts.services - } -} - -export default DaemonContext diff --git a/packages/mod-service/src/daemon/index.ts b/packages/mod-service/src/daemon/index.ts deleted file mode 100644 index 80da01edc2f..00000000000 --- a/packages/mod-service/src/daemon/index.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { PrimaryDatabase } from '../db' -import { dbLogger } from '../logger' -import { DaemonConfig } from './config' -import { DaemonContext } from './context' -import { createServices } from './services' -import { ImageUriBuilder } from '../image/uri' -import { NotificationsDaemon } from './notifications' -import logger from './logger' - -export { DaemonConfig } from './config' -export type { DaemonConfigValues } from './config' - -export class BskyDaemon { - public ctx: DaemonContext - public notifications: NotificationsDaemon - private dbStatsInterval: NodeJS.Timer - private notifStatsInterval: NodeJS.Timer - - constructor(opts: { - ctx: DaemonContext - notifications: NotificationsDaemon - }) { - this.ctx = opts.ctx - this.notifications = opts.notifications - } - - static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon { - const { db, cfg } = opts - const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon - const services = createServices({ - imgUriBuilder, - }) - const ctx = new DaemonContext({ - db, - cfg, - services, - }) - const notifications = new NotificationsDaemon(ctx) - return new BskyDaemon({ ctx, notifications }) - } - - async start() { - const { db, cfg } = this.ctx - const pool = db.pool - this.notifications.run({ - startFromDid: cfg.notificationsDaemonFromDid, - }) - this.dbStatsInterval = setInterval(() => { - dbLogger.info( - { - idleCount: pool.idleCount, - totalCount: pool.totalCount, - waitingCount: pool.waitingCount, - }, - 'db pool stats', - ) - }, 10000) - this.notifStatsInterval = setInterval(() => { - logger.info( - { - count: this.notifications.count, - lastDid: this.notifications.lastDid, - }, - 'notifications daemon stats', - ) - }, 10000) - return this - } - - async destroy(): Promise { - await this.notifications.destroy() - await this.ctx.db.close() - clearInterval(this.dbStatsInterval) - clearInterval(this.notifStatsInterval) - } -} - -export default BskyDaemon diff --git a/packages/mod-service/src/daemon/logger.ts b/packages/mod-service/src/daemon/logger.ts deleted file mode 100644 index 
8599acc315e..00000000000 --- a/packages/mod-service/src/daemon/logger.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { subsystemLogger } from '@atproto/common' - -const logger: ReturnType = - subsystemLogger('bsky:daemon') - -export default logger diff --git a/packages/mod-service/src/daemon/notifications.ts b/packages/mod-service/src/daemon/notifications.ts deleted file mode 100644 index 96431af8c1f..00000000000 --- a/packages/mod-service/src/daemon/notifications.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { tidyNotifications } from '../services/util/notification' -import DaemonContext from './context' -import logger from './logger' - -export class NotificationsDaemon { - ac = new AbortController() - running: Promise | undefined - count = 0 - lastDid: string | null = null - - constructor(private ctx: DaemonContext) {} - - run(opts?: RunOptions) { - if (this.running) return - this.count = 0 - this.lastDid = null - this.ac = new AbortController() - this.running = this.tidyNotifications({ - ...opts, - forever: opts?.forever !== false, // run forever by default - }) - .catch((err) => { - // allow this to cause an unhandled rejection, let deployment handle the crash. - logger.error({ err }, 'notifications daemon crashed') - throw err - }) - .finally(() => (this.running = undefined)) - } - - private async tidyNotifications(opts: RunOptions) { - const actorService = this.ctx.services.actor(this.ctx.db) - for await (const { did } of actorService.all(opts)) { - if (this.ac.signal.aborted) return - try { - await tidyNotifications(this.ctx.db, did) - this.count++ - this.lastDid = did - } catch (err) { - logger.warn({ err, did }, 'failed to tidy notifications for actor') - } - } - } - - async destroy() { - this.ac.abort() - await this.running - } -} - -type RunOptions = { - forever?: boolean - batchSize?: number - startFromDid?: string -} diff --git a/packages/mod-service/src/daemon/services.ts b/packages/mod-service/src/daemon/services.ts deleted file mode 100644 index 93141d13a08..00000000000 --- a/packages/mod-service/src/daemon/services.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { PrimaryDatabase } from '../db' -import { ActorService } from '../services/actor' -import { ImageUriBuilder } from '../image/uri' -import { GraphService } from '../services/graph' -import { LabelService } from '../services/label' - -export function createServices(resources: { - imgUriBuilder: ImageUriBuilder -}): Services { - const { imgUriBuilder } = resources - const graph = GraphService.creator(imgUriBuilder) - const label = LabelService.creator(null) - return { - actor: ActorService.creator(imgUriBuilder, graph, label), - } -} - -export type Services = { - actor: FromDbPrimary -} - -type FromDbPrimary = (db: PrimaryDatabase) => T diff --git a/packages/mod-service/src/db/leader.ts b/packages/mod-service/src/db/leader.ts index ebd44bf98d6..3b76ebbe3d3 100644 --- a/packages/mod-service/src/db/leader.ts +++ b/packages/mod-service/src/db/leader.ts @@ -1,9 +1,9 @@ import { PoolClient } from 'pg' -import PrimaryDatabase from './primary' +import { Database } from './index' export class Leader { session: Session | null = null - constructor(public id: number, public db: PrimaryDatabase) {} + constructor(public id: number, public db: Database) {} async run( task: (ctx: { signal: AbortSignal }) => Promise, diff --git a/packages/mod-service/src/db/periodic-moderation-event-reversal.ts b/packages/mod-service/src/db/periodic-moderation-event-reversal.ts index 9937c113d59..1402d49a039 100644 --- 
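[review note] The deleted NotificationsDaemon above follows a run/destroy shape worth keeping in mind even after removal: `run` is idempotent while a pass is in flight, the loop checks an AbortSignal between units of work, and `destroy` aborts then awaits the in-flight promise. A stripped-down sketch of that lifecycle; the work body is illustrative:

```ts
class Daemon {
  private ac = new AbortController()
  private running: Promise<void> | undefined

  run() {
    if (this.running) return // a pass is already looping
    this.ac = new AbortController()
    this.running = this.loop(this.ac.signal)
      .catch((err) => console.error('daemon crashed', err))
      .finally(() => (this.running = undefined))
  }

  private async loop(signal: AbortSignal) {
    while (!signal.aborted) {
      // ...one unit of work here...
      await new Promise((res) => setTimeout(res, 1000))
    }
  }

  async destroy() {
    this.ac.abort()
    await this.running // let the current unit finish cleanly
  }
}
```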
a/packages/mod-service/src/db/periodic-moderation-event-reversal.ts +++ b/packages/mod-service/src/db/periodic-moderation-event-reversal.ts @@ -11,10 +11,7 @@ import { retryHttp } from '../util/retry' export const MODERATION_ACTION_REVERSAL_ID = 1011 export class PeriodicModerationEventReversal { - leader = new Leader( - MODERATION_ACTION_REVERSAL_ID, - this.appContext.db.getPrimary(), - ) + leader = new Leader(MODERATION_ACTION_REVERSAL_ID, this.appContext.db) destroyed = false pushAgent?: AtpAgent @@ -23,7 +20,7 @@ export class PeriodicModerationEventReversal { } async revertState(eventRow: ModerationSubjectStatusRow) { - await this.appContext.db.getPrimary().transaction(async (dbTxn) => { + await this.appContext.db.transaction(async (dbTxn) => { const moderationTxn = this.appContext.services.moderation(dbTxn) const originalEvent = await moderationTxn.getLastReversibleEventForSubject(eventRow) @@ -71,7 +68,7 @@ export class PeriodicModerationEventReversal { async findAndRevertDueActions() { const moderationService = this.appContext.services.moderation( - this.appContext.db.getPrimary(), + this.appContext.db, ) const subjectsDueForReversal = await moderationService.getSubjectsDueForReversal() diff --git a/packages/mod-service/src/db/util.ts b/packages/mod-service/src/db/util.ts index dfd93e66a17..d76e780a23c 100644 --- a/packages/mod-service/src/db/util.ts +++ b/packages/mod-service/src/db/util.ts @@ -8,7 +8,7 @@ import { SqliteIntrospector, SqliteQueryCompiler, } from 'kysely' -import DatabaseSchema from './database-schema' +import DatabaseSchema from './schema' export const actorWhereClause = (actor: string) => { if (actor.startsWith('did:')) { diff --git a/packages/mod-service/src/db/views.ts b/packages/mod-service/src/db/views.ts deleted file mode 100644 index d5aa9941436..00000000000 --- a/packages/mod-service/src/db/views.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { jitter, wait } from '@atproto/common' -import { Leader } from './leader' -import { dbLogger } from '../logger' -import { PrimaryDatabase } from '.' - -export const VIEW_MAINTAINER_ID = 1010 -const VIEWS = ['algo_whats_hot_view'] - -export class ViewMaintainer { - leader = new Leader(VIEW_MAINTAINER_ID, this.db) - destroyed = false - - // @NOTE the db must be authed as the owner of the materialized view, per postgres. 
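[review note] PeriodicModerationEventReversal above and the deleted ViewMaintainer below both serialize work across instances through the `Leader` helper, which is keyed by a numeric Postgres advisory-lock id (e.g. `MODERATION_ACTION_REVERSAL_ID = 1011`). The core idea, sketched directly against the pg driver; the real class also threads an AbortSignal into the task:

```ts
import { Pool } from 'pg'

// Only one process cluster-wide wins the advisory lock and runs the task.
async function runAsLeader(
  pool: Pool,
  lockId: number,
  task: () => Promise<void>,
): Promise<{ ran: boolean }> {
  const client = await pool.connect()
  try {
    const res = await client.query('SELECT pg_try_advisory_lock($1) AS acquired', [lockId])
    if (!res.rows[0].acquired) return { ran: false } // someone else is leader
    try {
      await task()
      return { ran: true }
    } finally {
      await client.query('SELECT pg_advisory_unlock($1)', [lockId])
    }
  } finally {
    client.release()
  }
}
```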
- constructor(public db: PrimaryDatabase, public intervalSec = 60) {} - - async run() { - while (!this.destroyed) { - try { - const { ran } = await this.leader.run(async ({ signal }) => { - await this.db.maintainMaterializedViews({ - signal, - views: VIEWS, - intervalSec: this.intervalSec, - }) - }) - if (ran && !this.destroyed) { - throw new Error('View maintainer completed, but should be persistent') - } - } catch (err) { - dbLogger.error( - { - err, - views: VIEWS, - intervalSec: this.intervalSec, - lockId: VIEW_MAINTAINER_ID, - }, - 'view maintainer errored', - ) - } - if (!this.destroyed) { - await wait(10000 + jitter(2000)) - } - } - } - - destroy() { - this.destroyed = true - this.leader.destroy() - } -} diff --git a/packages/mod-service/src/did-cache.ts b/packages/mod-service/src/did-cache.ts deleted file mode 100644 index 9e45d0d8b30..00000000000 --- a/packages/mod-service/src/did-cache.ts +++ /dev/null @@ -1,87 +0,0 @@ -import PQueue from 'p-queue' -import { CacheResult, DidCache, DidDocument } from '@atproto/identity' -import { cacheLogger as log } from './logger' -import { Redis } from './redis' - -type CacheOptions = { - staleTTL: number - maxTTL: number -} - -export class DidRedisCache implements DidCache { - public pQueue: PQueue | null // null during teardown - - constructor(public redis: Redis, public opts: CacheOptions) { - this.pQueue = new PQueue() - } - - async cacheDid(did: string, doc: DidDocument): Promise { - const item = JSON.stringify({ - doc, - updatedAt: Date.now(), - }) - await this.redis.set(did, item, this.opts.maxTTL) - } - - async refreshCache( - did: string, - getDoc: () => Promise, - ): Promise { - this.pQueue?.add(async () => { - try { - const doc = await getDoc() - if (doc) { - await this.cacheDid(did, doc) - } else { - await this.clearEntry(did) - } - } catch (err) { - log.error({ did, err }, 'refreshing did cache failed') - } - }) - } - - async checkCache(did: string): Promise { - let got: string | null - try { - got = await this.redis.get(did) - } catch (err) { - got = null - log.error({ did, err }, 'error fetching did from cache') - } - if (!got) return null - const { doc, updatedAt } = JSON.parse(got) as CacheResult - const now = Date.now() - const expired = now > updatedAt + this.opts.maxTTL - const stale = now > updatedAt + this.opts.staleTTL - return { - doc, - updatedAt, - did, - stale, - expired, - } - } - - async clearEntry(did: string): Promise { - await this.redis.del(did) - } - - async clear(): Promise { - throw new Error('Not implemented for redis cache') - } - - async processAll() { - await this.pQueue?.onIdle() - } - - async destroy() { - const pQueue = this.pQueue - this.pQueue = null - pQueue?.pause() - pQueue?.clear() - await pQueue?.onIdle() - } -} - -export default DidRedisCache diff --git a/packages/mod-service/src/index.ts b/packages/mod-service/src/index.ts index 2f83efb3746..d76438ed310 100644 --- a/packages/mod-service/src/index.ts +++ b/packages/mod-service/src/index.ts @@ -6,47 +6,23 @@ import { createHttpTerminator, HttpTerminator } from 'http-terminator' import cors from 'cors' import compression from 'compression' import { IdResolver } from '@atproto/identity' -import { - RateLimiter, - RateLimiterOpts, - Options as XrpcServerOptions, -} from '@atproto/xrpc-server' -import { MINUTE } from '@atproto/common' -import API, { health, wellKnown, blobResolver } from './api' -import { DatabaseCoordinator } from './db' +import API, { health, wellKnown } from './api' import * as error from './error' -import { dbLogger, 
loggerMiddleware } from './logger' +import { loggerMiddleware } from './logger' import { ServerConfig } from './config' import { createServer } from './lexicon' -import { ImageUriBuilder } from './image/uri' -import { BlobDiskCache, ImageProcessingServer } from './image/server' import { createServices } from './services' import AppContext from './context' -import DidRedisCache from './did-cache' -import { - ImageInvalidator, - ImageProcessingServerInvalidator, -} from './image/invalidator' import { BackgroundQueue } from './background' -import { MountedAlgos } from './feed-gen/types' -import { NotificationServer } from './notifications' import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' -import { Redis } from './redis' +import Database from './db' export type { ServerConfigValues } from './config' -export type { MountedAlgos } from './feed-gen/types' export { ServerConfig } from './config' -export { Database, PrimaryDatabase, DatabaseCoordinator } from './db' +export { Database } from './db' export { PeriodicModerationEventReversal } from './db/periodic-moderation-event-reversal' -export { Redis } from './redis' -export { ViewMaintainer } from './db/views' export { AppContext } from './context' -export { makeAlgos } from './feed-gen' -export * from './daemon' -export * from './indexer' -export * from './ingester' -export { MigrateModerationData } from './migrate-moderation-data' export class BskyAppView { public ctx: AppContext @@ -61,131 +37,51 @@ export class BskyAppView { } static create(opts: { - db: DatabaseCoordinator - redis: Redis + db: Database config: ServerConfig signingKey: Keypair - imgInvalidator?: ImageInvalidator - algos?: MountedAlgos }): BskyAppView { - const { db, redis, config, signingKey, algos = {} } = opts - let maybeImgInvalidator = opts.imgInvalidator + const { db, config, signingKey } = opts const app = express() app.set('trust proxy', true) app.use(cors()) app.use(loggerMiddleware) app.use(compression()) - const didCache = new DidRedisCache(redis.withNamespace('did-doc'), { - staleTTL: config.didCacheStaleTTL, - maxTTL: config.didCacheMaxTTL, - }) - const idResolver = new IdResolver({ plcUrl: config.didPlcUrl, - didCache, backupNameservers: config.handleResolveNameservers, }) - const imgUriBuilder = new ImageUriBuilder( - config.imgUriEndpoint || `${config.publicUrl}/img`, - ) - - let imgProcessingServer: ImageProcessingServer | undefined - if (!config.imgUriEndpoint) { - const imgProcessingCache = new BlobDiskCache(config.blobCacheLocation) - imgProcessingServer = new ImageProcessingServer( - config, - imgProcessingCache, - ) - maybeImgInvalidator ??= new ImageProcessingServerInvalidator( - imgProcessingCache, - ) - } - - let imgInvalidator: ImageInvalidator - if (maybeImgInvalidator) { - imgInvalidator = maybeImgInvalidator - } else { - throw new Error('Missing appview image invalidator') - } - - const backgroundQueue = new BackgroundQueue(db.getPrimary()) - - const notifServer = new NotificationServer(db.getPrimary()) - const searchAgent = config.searchEndpoint - ? 
new AtpAgent({ service: config.searchEndpoint }) - : undefined - - const services = createServices({ - imgUriBuilder, - imgInvalidator, - labelCacheOpts: { - redis: redis.withNamespace('label'), - staleTTL: config.labelCacheStaleTTL, - maxTTL: config.labelCacheMaxTTL, - }, - }) + const backgroundQueue = new BackgroundQueue(db) + + const appviewAgent = new AtpAgent({ service: '@TODO' }) + + const services = createServices(appviewAgent) const ctx = new AppContext({ db, cfg: config, + appviewAgent, services, - imgUriBuilder, signingKey, idResolver, - didCache, - redis, backgroundQueue, - searchAgent, - algos, - notifServer, }) - const xrpcOpts: XrpcServerOptions = { + let server = createServer({ validateResponse: config.debugMode, payload: { jsonLimit: 100 * 1024, // 100kb textLimit: 100 * 1024, // 100kb blobLimit: 5 * 1024 * 1024, // 5mb }, - } - if (config.rateLimitsEnabled) { - const rlCreator = (opts: RateLimiterOpts) => - RateLimiter.redis(redis.driver, { - bypassSecret: config.rateLimitBypassKey, - bypassIps: config.rateLimitBypassIps, - ...opts, - }) - xrpcOpts['rateLimits'] = { - creator: rlCreator, - global: [ - { - name: 'global-unauthed-ip', - durationMs: 5 * MINUTE, - points: 3000, - calcKey: (ctx) => (ctx.auth ? null : ctx.req.ip), - }, - { - name: 'global-authed-did', - durationMs: 5 * MINUTE, - points: 3000, - calcKey: (ctx) => ctx.auth?.credentials?.did ?? null, - }, - ], - } - } - - let server = createServer(xrpcOpts) + }) server = API(server, ctx) app.use(health.createRouter(ctx)) app.use(wellKnown.createRouter(ctx)) - app.use(blobResolver.createRouter(ctx)) - if (imgProcessingServer) { - app.use('/img', imgProcessingServer.app) - } app.use(server.xrpc.router) app.use(error.handler) @@ -193,38 +89,36 @@ export class BskyAppView { } async start(): Promise { - const { db, backgroundQueue } = this.ctx - const primary = db.getPrimary() - const replicas = db.getReplicas() - this.dbStatsInterval = setInterval(() => { - dbLogger.info( - { - idleCount: replicas.reduce( - (tot, replica) => tot + replica.pool.idleCount, - 0, - ), - totalCount: replicas.reduce( - (tot, replica) => tot + replica.pool.totalCount, - 0, - ), - waitingCount: replicas.reduce( - (tot, replica) => tot + replica.pool.waitingCount, - 0, - ), - primaryIdleCount: primary.pool.idleCount, - primaryTotalCount: primary.pool.totalCount, - primaryWaitingCount: primary.pool.waitingCount, - }, - 'db pool stats', - ) - dbLogger.info( - { - runningCount: backgroundQueue.queue.pending, - waitingCount: backgroundQueue.queue.size, - }, - 'background queue stats', - ) - }, 10000) + // const { db, backgroundQueue } = this.ctx + // this.dbStatsInterval = setInterval(() => { + // dbLogger.info( + // { + // idleCount: replicas.reduce( + // (tot, replica) => tot + replica.pool.idleCount, + // 0, + // ), + // totalCount: replicas.reduce( + // (tot, replica) => tot + replica.pool.totalCount, + // 0, + // ), + // waitingCount: replicas.reduce( + // (tot, replica) => tot + replica.pool.waitingCount, + // 0, + // ), + // primaryIdleCount: primary.pool.idleCount, + // primaryTotalCount: primary.pool.totalCount, + // primaryWaitingCount: primary.pool.waitingCount, + // }, + // 'db pool stats', + // ) + // dbLogger.info( + // { + // runningCount: backgroundQueue.queue.pending, + // waitingCount: backgroundQueue.queue.size, + // }, + // 'background queue stats', + // ) + // }, 10000) const server = this.app.listen(this.ctx.cfg.port) this.server = server server.keepAliveTimeout = 90000 @@ -235,11 +129,9 @@ export class BskyAppView { return 
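[review note] The commented-out stats block in `start()` above aggregated counters across replica pools; with a single database it collapses to one pool. A pared-down version under the assumption that the db wrapper exposes its pg Pool as `db.pool` (as the deleted daemon code did) and that a pino-style `dbLogger` is available, as elsewhere in this package:

```ts
import { Pool } from 'pg'
import PQueue from 'p-queue'
import pino from 'pino'

const dbLogger = pino({ name: 'db' })

// `pool` and `queue` would come from the Database wrapper and BackgroundQueue
// created in `create()` above.
function startStatsLogging(pool: Pool, queue: PQueue, intervalMs = 10000) {
  return setInterval(() => {
    dbLogger.info(
      {
        idleCount: pool.idleCount,
        totalCount: pool.totalCount,
        waitingCount: pool.waitingCount,
      },
      'db pool stats',
    )
    dbLogger.info(
      { runningCount: queue.pending, waitingCount: queue.size },
      'background queue stats',
    )
  }, intervalMs)
}
```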
server } - async destroy(opts?: { skipDb: boolean; skipRedis: boolean }): Promise { - await this.ctx.didCache.destroy() + async destroy(opts?: { skipDb: boolean }): Promise { await this.terminator?.terminate() await this.ctx.backgroundQueue.destroy() - if (!opts?.skipRedis) await this.ctx.redis.destroy() if (!opts?.skipDb) await this.ctx.db.close() clearInterval(this.dbStatsInterval) } diff --git a/packages/mod-service/src/migrate-moderation-data.ts b/packages/mod-service/src/migrate-moderation-data.ts deleted file mode 100644 index 6919358170a..00000000000 --- a/packages/mod-service/src/migrate-moderation-data.ts +++ /dev/null @@ -1,414 +0,0 @@ -import { sql } from 'kysely' -import { DatabaseCoordinator, PrimaryDatabase } from './index' -import { adjustModerationSubjectStatus } from './services/moderation/status' -import { ModerationEventRow } from './services/moderation/types' - -type ModerationActionRow = Omit & { - reason: string | null -} - -const getEnv = () => ({ - DB_URL: - process.env.MODERATION_MIGRATION_DB_URL || - 'postgresql://pg:password@127.0.0.1:5433/postgres', - DB_POOL_SIZE: Number(process.env.MODERATION_MIGRATION_DB_POOL_SIZE) || 10, - DB_SCHEMA: process.env.MODERATION_MIGRATION_DB_SCHEMA || 'bsky', -}) - -const countEntries = async (db: PrimaryDatabase) => { - const [allActions, allReports] = await Promise.all([ - db.db - // @ts-ignore - .selectFrom('moderation_action') - // @ts-ignore - .select((eb) => eb.fn.count('id').as('count')) - .executeTakeFirstOrThrow(), - db.db - // @ts-ignore - .selectFrom('moderation_report') - // @ts-ignore - .select((eb) => eb.fn.count('id').as('count')) - .executeTakeFirstOrThrow(), - ]) - - return { reportsCount: allReports.count, actionsCount: allActions.count } -} - -const countEvents = async (db: PrimaryDatabase) => { - const events = await db.db - .selectFrom('moderation_event') - .select((eb) => eb.fn.count('id').as('count')) - .executeTakeFirstOrThrow() - - return events.count -} - -const getLatestReportLegacyRefId = async (db: PrimaryDatabase) => { - const events = await db.db - .selectFrom('moderation_event') - .select((eb) => eb.fn.max('legacyRefId').as('latestLegacyRefId')) - .where('action', '=', 'com.atproto.admin.defs#modEventReport') - .executeTakeFirstOrThrow() - - return events.latestLegacyRefId -} - -const countStatuses = async (db: PrimaryDatabase) => { - const events = await db.db - .selectFrom('moderation_subject_status') - .select((eb) => eb.fn.count('id').as('count')) - .executeTakeFirstOrThrow() - - return events.count -} - -const processLegacyReports = async ( - db: PrimaryDatabase, - legacyIds: number[], -) => { - if (!legacyIds.length) { - console.log('No legacy reports to process') - return - } - const reports = await db.db - .selectFrom('moderation_event') - .where('action', '=', 'com.atproto.admin.defs#modEventReport') - .where('legacyRefId', 'in', legacyIds) - .orderBy('legacyRefId', 'asc') - .selectAll() - .execute() - - console.log(`Processing ${reports.length} reports from ${legacyIds.length}`) - await db.transaction(async (tx) => { - // This will be slow but we need to run this in sequence - for (const report of reports) { - await adjustModerationSubjectStatus(tx, report) - } - }) - console.log(`Completed processing ${reports.length} reports`) -} - -const getReportEventsAboveLegacyId = async ( - db: PrimaryDatabase, - aboveLegacyId: number, -) => { - return await db.db - .selectFrom('moderation_event') - .where('action', '=', 'com.atproto.admin.defs#modEventReport') - .where('legacyRefId', '>', 
aboveLegacyId) - .select(sql`"legacyRefId"`.as('legacyRefId')) - .execute() -} - -const createEvents = async ( - db: PrimaryDatabase, - opts?: { onlyReportsAboveId: number }, -) => { - const commonColumnsToSelect = [ - 'subjectDid', - 'subjectUri', - 'subjectType', - 'subjectCid', - sql`reason`.as('comment'), - 'createdAt', - ] - const commonColumnsToInsert = [ - 'subjectDid', - 'subjectUri', - 'subjectType', - 'subjectCid', - 'comment', - 'createdAt', - 'action', - 'createdBy', - ] as const - - let totalActions: number - if (!opts?.onlyReportsAboveId) { - await db.db - .insertInto('moderation_event') - .columns([ - 'id', - ...commonColumnsToInsert, - 'createLabelVals', - 'negateLabelVals', - 'durationInHours', - 'expiresAt', - ]) - .expression((eb) => - eb - // @ts-ignore - .selectFrom('moderation_action') - // @ts-ignore - .select([ - 'id', - ...commonColumnsToSelect, - sql`CONCAT('com.atproto.admin.defs#modEvent', UPPER(SUBSTRING(SPLIT_PART(action, '#', 2) FROM 1 FOR 1)), SUBSTRING(SPLIT_PART(action, '#', 2) FROM 2))`.as( - 'action', - ), - 'createdBy', - 'createLabelVals', - 'negateLabelVals', - 'durationInHours', - 'expiresAt', - ]) - .orderBy('id', 'asc'), - ) - .execute() - - totalActions = await countEvents(db) - console.log(`Created ${totalActions} events from actions`) - - await sql`SELECT setval(pg_get_serial_sequence('moderation_event', 'id'), (select max(id) from moderation_event))`.execute( - db.db, - ) - console.log('Reset the id sequence for moderation_event') - } else { - totalActions = await countEvents(db) - } - - await db.db - .insertInto('moderation_event') - .columns([...commonColumnsToInsert, 'meta', 'legacyRefId']) - .expression((eb) => { - const builder = eb - // @ts-ignore - .selectFrom('moderation_report') - // @ts-ignore - .select([ - ...commonColumnsToSelect, - sql`'com.atproto.admin.defs#modEventReport'`.as('action'), - sql`"reportedByDid"`.as('createdBy'), - sql`json_build_object('reportType', "reasonType")`.as('meta'), - sql`id`.as('legacyRefId'), - ]) - - if (opts?.onlyReportsAboveId) { - // @ts-ignore - return builder.where('id', '>', opts.onlyReportsAboveId) - } - - return builder - }) - .execute() - - const totalEvents = await countEvents(db) - console.log(`Created ${totalEvents - totalActions} events from reports`) - - return -} - -const setReportedAtTimestamp = async (db: PrimaryDatabase) => { - console.log('Initiating lastReportedAt timestamp sync') - const didUpdate = await sql` - UPDATE moderation_subject_status - SET "lastReportedAt" = reports."createdAt" - FROM ( - select "subjectDid", "subjectUri", MAX("createdAt") as "createdAt" - from moderation_report - where "subjectUri" is null - group by "subjectDid", "subjectUri" - ) as reports - WHERE reports."subjectDid" = moderation_subject_status."did" - AND "recordPath" = '' - AND ("lastReportedAt" is null OR "lastReportedAt" < reports."createdAt") - `.execute(db.db) - - console.log( - `Updated lastReportedAt for ${didUpdate.numUpdatedOrDeletedRows} did subject`, - ) - - const contentUpdate = await sql` - UPDATE moderation_subject_status - SET "lastReportedAt" = reports."createdAt" - FROM ( - select "subjectDid", "subjectUri", MAX("createdAt") as "createdAt" - from moderation_report - where "subjectUri" is not null - group by "subjectDid", "subjectUri" - ) as reports - WHERE reports."subjectDid" = moderation_subject_status."did" - AND "recordPath" is not null - AND POSITION(moderation_subject_status."recordPath" IN reports."subjectUri") > 0 - AND ("lastReportedAt" is null OR "lastReportedAt" < 
reports."createdAt") - `.execute(db.db) - - console.log( - `Updated lastReportedAt for ${contentUpdate.numUpdatedOrDeletedRows} subject with uri`, - ) -} - -const createStatusFromActions = async (db: PrimaryDatabase) => { - const allEvents = await db.db - // @ts-ignore - .selectFrom('moderation_action') - // @ts-ignore - .where('reversedAt', 'is', null) - // @ts-ignore - .select((eb) => eb.fn.count('id').as('count')) - .executeTakeFirstOrThrow() - - const chunkSize = 2500 - const totalChunks = Math.ceil(allEvents.count / chunkSize) - - console.log(`Processing ${allEvents.count} actions in ${totalChunks} chunks`) - - await db.transaction(async (tx) => { - // This is not used for pagination but only for logging purposes - let currentChunk = 1 - let lastProcessedId: undefined | number = 0 - do { - const eventsQuery = tx.db - // @ts-ignore - .selectFrom('moderation_action') - // @ts-ignore - .where('reversedAt', 'is', null) - // @ts-ignore - .where('id', '>', lastProcessedId) - .limit(chunkSize) - .selectAll() - const events = (await eventsQuery.execute()) as ModerationActionRow[] - - for (const event of events) { - // Remap action to event data type - const actionParts = event.action.split('#') - await adjustModerationSubjectStatus(tx, { - ...event, - action: `com.atproto.admin.defs#modEvent${actionParts[1] - .charAt(0) - .toUpperCase()}${actionParts[1].slice( - 1, - )}` as ModerationEventRow['action'], - comment: event.reason, - meta: null, - }) - } - - console.log(`Processed events chunk ${currentChunk} of ${totalChunks}`) - lastProcessedId = events.at(-1)?.id - currentChunk++ - } while (lastProcessedId !== undefined) - }) - - console.log(`Events migration complete!`) - - const totalStatuses = await countStatuses(db) - console.log(`Created ${totalStatuses} statuses`) -} - -const remapFlagToAcknlowedge = async (db: PrimaryDatabase) => { - console.log('Initiating flag to ack remap') - const results = await sql` - UPDATE moderation_event - SET "action" = 'com.atproto.admin.defs#modEventAcknowledge' - WHERE action = 'com.atproto.admin.defs#modEventFlag' - `.execute(db.db) - console.log(`Remapped ${results.numUpdatedOrDeletedRows} flag actions to ack`) -} - -const syncBlobCids = async (db: PrimaryDatabase) => { - console.log('Initiating blob cid sync') - const results = await sql` - UPDATE moderation_subject_status - SET "blobCids" = blob_action."cids" - FROM ( - SELECT moderation_action."subjectUri", moderation_action."subjectDid", jsonb_agg(moderation_action_subject_blob."cid") as cids - FROM moderation_action_subject_blob - JOIN moderation_action - ON moderation_action.id = moderation_action_subject_blob."actionId" - WHERE moderation_action."reversedAt" is NULL - GROUP by moderation_action."subjectUri", moderation_action."subjectDid" - ) as blob_action - WHERE did = "subjectDid" AND position("recordPath" IN "subjectUri") > 0 - `.execute(db.db) - console.log(`Updated blob cids on ${results.numUpdatedOrDeletedRows} rows`) -} - -async function updateStatusFromUnresolvedReports(db: PrimaryDatabase) { - const { ref } = db.db.dynamic - const reports = await db.db - // @ts-ignore - .selectFrom('moderation_report') - .whereNotExists((qb) => - qb - .selectFrom('moderation_report_resolution') - .selectAll() - // @ts-ignore - .whereRef('reportId', '=', ref('moderation_report.id')), - ) - .select(sql`moderation_report.id`.as('legacyId')) - .execute() - - console.log('Updating statuses based on unresolved reports') - await processLegacyReports( - db, - reports.map((report) => report.legacyId), - ) - 
console.log('Completed updating statuses based on unresolved reports') -} - -export async function MigrateModerationData() { - const env = getEnv() - const db = new DatabaseCoordinator({ - schema: env.DB_SCHEMA, - primary: { - url: env.DB_URL, - poolSize: env.DB_POOL_SIZE, - }, - replicas: [], - }) - - const primaryDb = db.getPrimary() - - const [counts, existingEventsCount] = await Promise.all([ - countEntries(primaryDb), - countEvents(primaryDb), - ]) - - // If there are existing events in the moderation_event table, we assume that the migration has already been run - // so we just bring over any new reports since last run - if (existingEventsCount) { - console.log( - `Found ${existingEventsCount} existing events. Migrating ${counts.reportsCount} reports only, ignoring actions`, - ) - const reportMigrationStartedAt = Date.now() - const latestReportLegacyRefId = await getLatestReportLegacyRefId(primaryDb) - - if (latestReportLegacyRefId) { - await createEvents(primaryDb, { - onlyReportsAboveId: latestReportLegacyRefId, - }) - const newReportEvents = await getReportEventsAboveLegacyId( - primaryDb, - latestReportLegacyRefId, - ) - await processLegacyReports( - primaryDb, - newReportEvents.map((evt) => evt.legacyRefId), - ) - await setReportedAtTimestamp(primaryDb) - } else { - console.log('No reports have been migrated into events yet, bailing.') - } - - console.log( - `Time spent: ${(Date.now() - reportMigrationStartedAt) / 1000} seconds`, - ) - console.log('Migration complete!') - return - } - - const totalEntries = counts.actionsCount + counts.reportsCount - console.log(`Migrating ${totalEntries} rows of actions and reports`) - const startedAt = Date.now() - await createEvents(primaryDb) - // Important to run this before creation statuses from actions to ensure that we are not attempting to map flag actions - await remapFlagToAcknlowedge(primaryDb) - await createStatusFromActions(primaryDb) - await updateStatusFromUnresolvedReports(primaryDb) - await setReportedAtTimestamp(primaryDb) - await syncBlobCids(primaryDb) - - console.log(`Time spent: ${(Date.now() - startedAt) / 1000 / 60} minutes`) - console.log('Migration complete!') -} diff --git a/packages/mod-service/src/notifications.ts b/packages/mod-service/src/notifications.ts deleted file mode 100644 index fdf24919d19..00000000000 --- a/packages/mod-service/src/notifications.ts +++ /dev/null @@ -1,382 +0,0 @@ -import axios from 'axios' -import { Insertable, sql } from 'kysely' -import TTLCache from '@isaacs/ttlcache' -import { AtUri } from '@atproto/api' -import { MINUTE, chunkArray } from '@atproto/common' -import Database from './db/primary' -import { Notification } from './db/tables/notification' -import { NotificationPushToken as PushToken } from './db/tables/notification-push-token' -import logger from './indexer/logger' -import { notSoftDeletedClause, valuesList } from './db/util' -import { ids } from './lexicon/lexicons' -import { retryHttp } from './util/retry' - -export type Platform = 'ios' | 'android' | 'web' - -type PushNotification = { - tokens: string[] - platform: 1 | 2 // 1 = ios, 2 = android - title: string - message: string - topic: string - data?: { - [key: string]: string - } - collapse_id?: string - collapse_key?: string -} - -type InsertableNotif = Insertable - -type NotifDisplay = { - key: string - rateLimit: boolean - title: string - body: string - notif: InsertableNotif -} - -export class NotificationServer { - private rateLimiter = new RateLimiter(1, 30 * MINUTE) - - constructor(public db: Database, public 
pushEndpoint?: string) {} - - async getTokensByDid(dids: string[]) { - if (!dids.length) return {} - const tokens = await this.db.db - .selectFrom('notification_push_token') - .where('did', 'in', dids) - .selectAll() - .execute() - return tokens.reduce((acc, token) => { - acc[token.did] ??= [] - acc[token.did].push(token) - return acc - }, {} as Record) - } - - async prepareNotifsToSend(notifications: InsertableNotif[]) { - const now = Date.now() - const notifsToSend: PushNotification[] = [] - const tokensByDid = await this.getTokensByDid( - unique(notifications.map((n) => n.did)), - ) - // views for all notifications that have tokens - const notificationViews = await this.getNotificationDisplayAttributes( - notifications.filter((n) => tokensByDid[n.did]), - ) - - for (const notifView of notificationViews) { - if (!isRecent(notifView.notif.sortAt, 10 * MINUTE)) { - continue // if the notif is from > 10 minutes ago, don't send push notif - } - const { did: userDid } = notifView.notif - const userTokens = tokensByDid[userDid] ?? [] - for (const t of userTokens) { - const { appId, platform, token } = t - if (notifView.rateLimit && !this.rateLimiter.check(token, now)) { - continue - } - if (platform === 'ios' || platform === 'android') { - notifsToSend.push({ - tokens: [token], - platform: platform === 'ios' ? 1 : 2, - title: notifView.title, - message: notifView.body, - topic: appId, - data: { - reason: notifView.notif.reason, - recordUri: notifView.notif.recordUri, - recordCid: notifView.notif.recordCid, - }, - collapse_id: notifView.key, - collapse_key: notifView.key, - }) - } else { - // @TODO: Handle web notifs - logger.warn({ did: userDid }, 'cannot send web notification to user') - } - } - } - - return notifsToSend - } - - /** - * The function `addNotificationsToQueue` adds push notifications to a queue, taking into account rate - * limiting and batching the notifications for efficient processing. - * @param {PushNotification[]} notifs - An array of PushNotification objects. Each PushNotification - * object has a "tokens" property which is an array of tokens. - * @returns void - */ - async processNotifications(notifs: PushNotification[]) { - for (const batch of chunkArray(notifs, 20)) { - try { - await this.sendPushNotifications(batch) - } catch (err) { - logger.error({ err, batch }, 'notification push batch failed') - } - } - } - - /** 1. Get the user's token (APNS or FCM for iOS and Android respectively) from the database - User token will be in the format: - did || token || platform (1 = iOS, 2 = Android, 3 = Web) - 2. Send notification to `gorush` server with token - Notification will be in the format: - "notifications": [ - { - "tokens": string[], - "platform": 1 | 2, - "message": string, - "title": string, - "priority": "normal" | "high", - "image": string, (Android only) - "expiration": number, (iOS only) - "badge": number, (iOS only) - } - ] - 3. `gorush` will send notification to APNS or FCM - 4. store response from `gorush` which contains the ID of the notification - 5. 
If notification needs to be updated or deleted, find the ID of the notification from the database and send a new notification to `gorush` with the ID (repeat step 2) - */ - private async sendPushNotifications(notifications: PushNotification[]) { - // if pushEndpoint is not defined, we are not running in the indexer service, so we can't send push notifications - if (!this.pushEndpoint) { - throw new Error('Push endpoint not defined') - } - // if no notifications, skip and return early - if (notifications.length === 0) { - return - } - const pushEndpoint = this.pushEndpoint - await retryHttp(() => - axios.post( - pushEndpoint, - { notifications }, - { - headers: { - 'Content-Type': 'application/json', - accept: 'application/json', - }, - }, - ), - ) - } - - async registerDeviceForPushNotifications( - did: string, - token: string, - platform: Platform, - appId: string, - ) { - // if token doesn't exist, insert it, on conflict do nothing - await this.db.db - .insertInto('notification_push_token') - .values({ did, token, platform, appId }) - .onConflict((oc) => oc.doNothing()) - .execute() - } - - async getNotificationDisplayAttributes( - notifs: InsertableNotif[], - ): Promise { - const { ref } = this.db.db.dynamic - const authorDids = notifs.map((n) => n.author) - const subjectUris = notifs.flatMap((n) => n.reasonSubject ?? []) - const recordUris = notifs.map((n) => n.recordUri) - const allUris = [...subjectUris, ...recordUris] - - // gather potential display data for notifications in batch - const [authors, posts, blocksAndMutes] = await Promise.all([ - this.db.db - .selectFrom('actor') - .leftJoin('profile', 'profile.creator', 'actor.did') - .leftJoin('record', 'record.uri', 'profile.uri') - .where(notSoftDeletedClause(ref('actor'))) - .where(notSoftDeletedClause(ref('record'))) - .where('profile.creator', 'in', authorDids.length ? authorDids : ['']) - .select(['actor.did as did', 'handle', 'displayName']) - .execute(), - this.db.db - .selectFrom('post') - .innerJoin('actor', 'actor.did', 'post.creator') - .innerJoin('record', 'record.uri', 'post.uri') - .where(notSoftDeletedClause(ref('actor'))) - .where(notSoftDeletedClause(ref('record'))) - .where('post.uri', 'in', allUris.length ? allUris : ['']) - .select(['post.uri as uri', 'text']) - .execute(), - this.findBlocksAndMutes(notifs), - ]) - - const authorsByDid = authors.reduce((acc, author) => { - acc[author.did] = author - return acc - }, {} as Record) - const postsByUri = posts.reduce((acc, post) => { - acc[post.uri] = post - return acc - }, {} as Record) - - const results: NotifDisplay[] = [] - - for (const notif of notifs) { - const { - author: authorDid, - reason, - reasonSubject: subjectUri, // if like/reply/quote/mention, the post which was liked/replied to/mention is in/or quoted. if custom feed liked, the feed which was liked - recordUri, - } = notif - - const author = - authorsByDid[authorDid]?.displayName || authorsByDid[authorDid]?.handle - const postRecord = postsByUri[recordUri] - const postSubject = subjectUri ? postsByUri[subjectUri] : null - - // if blocked or muted, don't send notification - const shouldFilter = blocksAndMutes.some( - (pair) => pair.author === notif.author && pair.receiver === notif.did, - ) - if (shouldFilter || !author) { - // if no display name, dont send notification - continue - } - // const author = displayName.displayName - - // 2. 
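[review note] The numbered docblock and `sendPushNotifications` above describe the handoff to the gorush push server. A self-contained sketch of that POST; the endpoint is illustrative and the retry wrapper is elided:

```ts
import axios from 'axios'

type PushNotification = {
  tokens: string[]
  platform: 1 | 2 // 1 = ios, 2 = android
  title: string
  message: string
  topic: string
}

async function sendToGorush(endpoint: string, notifications: PushNotification[]) {
  if (notifications.length === 0) return // nothing to push
  await axios.post(
    endpoint,
    { notifications },
    { headers: { 'Content-Type': 'application/json', accept: 'application/json' } },
  )
}

// e.g. await sendToGorush('https://gorush.example.internal/api/push', batch)
```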
Get post data content - // if follow, get the URI of the author's profile - // if reply, or mention, get URI of the postRecord - // if like, or custom feed like, or repost get the URI of the reasonSubject - const key = reason - let title = '' - let body = '' - let rateLimit = true - - // check follow first and mention first because they don't have subjectUri and return - // reply has subjectUri but the recordUri is the replied post - if (reason === 'follow') { - title = 'New follower!' - body = `${author} has followed you` - results.push({ key, title, body, notif, rateLimit }) - continue - } else if (reason === 'mention' || reason === 'reply') { - // use recordUri for mention and reply - title = - reason === 'mention' - ? `${author} mentioned you` - : `${author} replied to your post` - body = postRecord?.text || '' - rateLimit = false // always deliver - results.push({ key, title, body, notif, rateLimit }) - continue - } - - // if no subjectUri, don't send notification - // at this point, subjectUri should exist for all the other reasons - if (!postSubject) { - continue - } - - if (reason === 'like') { - title = `${author} liked your post` - body = postSubject?.text || '' - // custom feed like - const uri = subjectUri ? new AtUri(subjectUri) : null - if (uri?.collection === ids.AppBskyFeedGenerator) { - title = `${author} liked your custom feed` - body = uri?.rkey ?? '' - } - } else if (reason === 'quote') { - title = `${author} quoted your post` - body = postSubject?.text || '' - rateLimit = true // always deliver - } else if (reason === 'repost') { - title = `${author} reposted your post` - body = postSubject?.text || '' - } - - if (title === '' && body === '') { - logger.warn( - { notif }, - 'No notification display attributes found for this notification. 
Either profile or post data for this notification is missing.', - ) - continue - } - - results.push({ key, title, body, notif, rateLimit }) - } - - return results - } - - async findBlocksAndMutes(notifs: InsertableNotif[]) { - const pairs = notifs.map((n) => ({ author: n.author, receiver: n.did })) - const { ref } = this.db.db.dynamic - const blockQb = this.db.db - .selectFrom('actor_block') - .where((outer) => - outer - .where((qb) => - qb - .whereRef('actor_block.creator', '=', ref('author')) - .whereRef('actor_block.subjectDid', '=', ref('receiver')), - ) - .orWhere((qb) => - qb - .whereRef('actor_block.creator', '=', ref('receiver')) - .whereRef('actor_block.subjectDid', '=', ref('author')), - ), - ) - .select(['creator', 'subjectDid']) - const muteQb = this.db.db - .selectFrom('mute') - .whereRef('mute.subjectDid', '=', ref('author')) - .whereRef('mute.mutedByDid', '=', ref('receiver')) - .selectAll() - const muteListQb = this.db.db - .selectFrom('list_item') - .innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri') - .whereRef('list_mute.mutedByDid', '=', ref('receiver')) - .whereRef('list_item.subjectDid', '=', ref('author')) - .select('list_item.subjectDid') - - const values = valuesList(pairs.map((p) => sql`${p.author}, ${p.receiver}`)) - const filterPairs = await this.db.db - .selectFrom(values.as(sql`pair (author, receiver)`)) - .whereExists(muteQb) - .orWhereExists(muteListQb) - .orWhereExists(blockQb) - .selectAll() - .execute() - return filterPairs as { author: string; receiver: string }[] - } -} - -const isRecent = (isoTime: string, timeDiff: number): boolean => { - const diff = Date.now() - new Date(isoTime).getTime() - return diff < timeDiff -} - -const unique = (items: string[]) => [...new Set(items)] - -class RateLimiter { - private rateLimitCache = new TTLCache({ - max: 50000, - ttl: this.windowMs, - noUpdateTTL: true, - }) - constructor(private limit: number, private windowMs: number) {} - check(token: string, now = Date.now()) { - const key = getRateLimitKey(token, now) - const last = this.rateLimitCache.get(key) ?? 
diff --git a/packages/mod-service/src/pipeline.ts b/packages/mod-service/src/pipeline.ts
deleted file mode 100644
index 7798519bfa2..00000000000
--- a/packages/mod-service/src/pipeline.ts
+++ /dev/null
@@ -1,22 +0,0 @@
-export function createPipeline<
-  Params,
-  SkeletonState,
-  HydrationState extends SkeletonState,
-  View,
-  Context,
->(
-  skeleton: (params: Params, ctx: Context) => Promise<SkeletonState>,
-  hydration: (state: SkeletonState, ctx: Context) => Promise<HydrationState>,
-  rules: (state: HydrationState, ctx: Context) => HydrationState,
-  presentation: (state: HydrationState, ctx: Context) => View,
-) {
-  return async (params: Params, ctx: Context) => {
-    const skeletonState = await skeleton(params, ctx)
-    const hydrationState = await hydration(skeletonState, ctx)
-    return presentation(rules(hydrationState, ctx), ctx)
-  }
-}
-
-export function noRules<T>(state: T) {
-  return state
-}
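The deleted `createPipeline` helper composes the four request phases (skeleton, hydration, rules, presentation) used across read endpoints. A hypothetical usage, with invented param, state, and context shapes:

```ts
// Hypothetical usage of createPipeline; every shape here is invented.
type Params = { uri: string }
type Skeleton = { uris: string[] }
type Hydrated = Skeleton & { records: Map<string, unknown> }
type Ctx = { fetchRecords: (uris: string[]) => Promise<Map<string, unknown>> }

const getPosts = createPipeline<Params, Skeleton, Hydrated, unknown[], Ctx>(
  async (params) => ({ uris: [params.uri] }), // skeleton: cheap id lookup
  async (state, ctx) => ({
    ...state,
    records: await ctx.fetchRecords(state.uris), // hydration: fetch full data
  }),
  noRules, // rules: no filtering in this sketch
  (state) => state.uris.map((uri) => state.records.get(uri)), // presentation
)
```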
diff --git a/packages/mod-service/src/redis.ts b/packages/mod-service/src/redis.ts
deleted file mode 100644
index 3104f021e4a..00000000000
--- a/packages/mod-service/src/redis.ts
+++ /dev/null
@@ -1,205 +0,0 @@
-import assert from 'assert'
-import { Redis as RedisDriver } from 'ioredis'
-
-export class Redis {
-  driver: RedisDriver
-  namespace?: string
-  constructor(opts: RedisOptions) {
-    if ('sentinel' in opts) {
-      assert(opts.sentinel && Array.isArray(opts.hosts) && opts.hosts.length)
-      this.driver = new RedisDriver({
-        name: opts.sentinel,
-        sentinels: opts.hosts.map((h) => addressParts(h, 26379)),
-        password: opts.password,
-        db: opts.db,
-        commandTimeout: opts.commandTimeout,
-      })
-    } else if ('host' in opts) {
-      assert(opts.host)
-      this.driver = new RedisDriver({
-        ...addressParts(opts.host),
-        password: opts.password,
-        db: opts.db,
-        commandTimeout: opts.commandTimeout,
-      })
-    } else {
-      assert(opts.driver)
-      this.driver = opts.driver
-    }
-    this.namespace = opts.namespace
-  }
-
-  withNamespace(namespace: string): Redis {
-    return new Redis({ driver: this.driver, namespace })
-  }
-
-  async readStreams(
-    streams: StreamRef[],
-    opts: { count: number; blockMs?: number },
-  ) {
-    const allRead = await this.driver.xreadBuffer(
-      'COUNT',
-      opts.count, // events per stream
-      'BLOCK',
-      opts.blockMs ?? 1000, // millis
-      'STREAMS',
-      ...streams.map((s) => this.ns(s.key)),
-      ...streams.map((s) => s.cursor),
-    )
-    const results: StreamOutput[] = []
-    for (const [key, messages] of allRead ?? []) {
-      const result: StreamOutput = {
-        key: this.rmns(key.toString()),
-        messages: [],
-      }
-      results.push(result)
-      for (const [seqBuf, values] of messages) {
-        const message = {
-          cursor: seqBuf.toString(),
-          contents: {} as Record<string, Buffer>,
-        }
-        result.messages.push(message)
-        for (let i = 0; i < values.length; ++i) {
-          if (i % 2 === 0) continue // even indices are field names
-          const field = values[i - 1].toString()
-          message.contents[field] = values[i]
-        }
-      }
-    }
-    return results
-  }
-
-  async addToStream(
-    key: string,
-    id: number | string,
-    fields: [key: string, value: string | Buffer][],
-  ) {
-    await this.driver.xadd(this.ns(key), id, ...fields.flat())
-  }
-
-  async addMultiToStream(
-    evts: {
-      key: string
-      id: number | string
-      fields: [key: string, value: string | Buffer][]
-    }[],
-  ) {
-    const pipeline = this.driver.pipeline()
-    for (const { key, id, fields } of evts) {
-      pipeline.xadd(this.ns(key), id, ...fields.flat())
-    }
-    return (await pipeline.exec()) ?? []
-  }
-
-  async trimStream(key: string, cursor: number | string) {
-    await this.driver.xtrim(this.ns(key), 'MINID', cursor)
-  }
-
-  async streamLengths(keys: string[]) {
-    const pipeline = this.driver.pipeline()
-    for (const key of keys) {
-      pipeline.xlen(this.ns(key))
-    }
-    const results = await pipeline.exec()
-    return (results ?? []).map(([, len = 0]) => Number(len))
-  }
-
-  async get(key: string) {
-    return await this.driver.get(this.ns(key))
-  }
-
-  async set(key: string, val: string | number, ttlMs?: number) {
-    if (ttlMs !== undefined) {
-      await this.driver.set(this.ns(key), val, 'PX', ttlMs)
-    } else {
-      await this.driver.set(this.ns(key), val)
-    }
-  }
-
-  async getMulti(keys: string[]) {
-    const namespaced = keys.map((k) => this.ns(k))
-    const got = await this.driver.mget(...namespaced)
-    const results: Record<string, string | null> = {}
-    for (let i = 0; i < keys.length; i++) {
-      const key = keys[i]
-      results[key] = got[i]
-    }
-    return results
-  }
-
-  async setMulti(vals: Record<string, string | number>, ttlMs?: number) {
-    if (Object.keys(vals).length === 0) {
-      return
-    }
-    let builder = this.driver.multi({ pipeline: true })
-    for (const key of Object.keys(vals)) {
-      if (ttlMs !== undefined) {
-        builder = builder.set(this.ns(key), vals[key], 'PX', ttlMs)
-      } else {
-        builder = builder.set(this.ns(key), vals[key])
-      }
-    }
-    await builder.exec()
-  }
-
-  async del(key: string) {
-    return await this.driver.del(this.ns(key))
-  }
-
-  async expire(key: string, seconds: number) {
-    return await this.driver.expire(this.ns(key), seconds)
-  }
-
-  async zremrangebyscore(key: string, min: number, max: number) {
-    return await this.driver.zremrangebyscore(this.ns(key), min, max)
-  }
-
-  async zcount(key: string, min: number, max: number) {
-    return await this.driver.zcount(this.ns(key), min, max)
-  }
-
-  async zadd(key: string, score: number, member: number | string) {
-    return await this.driver.zadd(this.ns(key), score, member)
-  }
-
-  async destroy() {
-    await this.driver.quit()
-  }
-
-  // namespace redis keys
-  ns(key: string) {
-    return this.namespace ? `${this.namespace}:${key}` : key
-  }
-
-  // remove namespace from redis key
-  rmns(key: string) {
-    return this.namespace && key.startsWith(`${this.namespace}:`)
-      ? key.replace(`${this.namespace}:`, '')
-      : key
-  }
-}
-
-type StreamRef = { key: string; cursor: string | number }
-
-type StreamOutput = {
-  key: string
-  messages: { cursor: string; contents: Record<string, Buffer> }[]
-}
-
-export type RedisOptions = (
-  | { driver: RedisDriver }
-  | { host: string }
-  | { sentinel: string; hosts: string[] }
-) & {
-  password?: string
-  namespace?: string
-  db?: number
-  commandTimeout?: number
-}
-
-export function addressParts(
-  addr: string,
-  defaultPort = 6379,
-): { host: string; port: number } {
-  const [host, portStr, ...others] = addr.split(':')
-  const port = portStr ? parseInt(portStr, 10) : defaultPort
-  assert(host && !isNaN(port) && !others.length, `invalid address: ${addr}`)
-  return { host, port }
-}
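The deleted `Redis` wrapper namespaces keys and thinly wraps ioredis stream commands. A hypothetical producer/consumer round-trip (host, namespace, and stream name invented):

```ts
// Hypothetical round-trip with the deleted wrapper; values are invented.
const redis = new Redis({ host: 'localhost:6379', namespace: 'mod' })

const run = async () => {
  // Producer: append one event; the stored key becomes 'mod:evts'.
  await redis.addToStream('evts', '*', [['type', 'report_created']])

  // Consumer: read up to 10 entries past cursor '0' (i.e. from the start).
  const outputs = await redis.readStreams([{ key: 'evts', cursor: '0' }], {
    count: 10,
  })
  for (const { key, messages } of outputs) {
    for (const msg of messages) {
      console.log(key, msg.cursor, Object.keys(msg.contents)) // 'evts', <id>, ['type']
    }
  }
  await redis.destroy()
}
```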
diff --git a/packages/mod-service/src/services/moderation/index.ts b/packages/mod-service/src/services/moderation/index.ts
index 622fb41be0f..db548c7c6a2 100644
--- a/packages/mod-service/src/services/moderation/index.ts
+++ b/packages/mod-service/src/services/moderation/index.ts
@@ -30,6 +30,8 @@ import { ModerationEvent } from '../../db/schema/moderation_event'
 import { paginate } from '../../db/pagination'
 import { StatusKeyset, TimeIdKeyset } from './pagination'
 import AtpAgent from '@atproto/api'
+import { Label } from '../../lexicon/types/com/atproto/label/defs'
+import { sql } from 'kysely'
 
 export class ModerationService {
   constructor(public db: Database, public appviewAgent: AtpAgent) {}
@@ -682,6 +684,55 @@ export class ModerationService {
     return !!result?.takendown
   }
+
+  async formatAndCreateLabels(
+    src: string,
+    uri: string,
+    cid: string | null,
+    labels: { create?: string[]; negate?: string[] },
+  ): Promise<Label[]> {
+    const { create = [], negate = [] } = labels
+    const toCreate = create.map((val) => ({
+      src,
+      uri,
+      cid: cid ?? undefined,
+      val,
+      neg: false,
+      cts: new Date().toISOString(),
+    }))
+    const toNegate = negate.map((val) => ({
+      src,
+      uri,
+      cid: cid ?? undefined,
+      val,
+      neg: true,
+      cts: new Date().toISOString(),
+    }))
+    const formatted = [...toCreate, ...toNegate]
+    await this.createLabels(formatted)
+    return formatted
+  }
+
+  async createLabels(labels: Label[]) {
+    if (labels.length < 1) return
+    const dbVals = labels.map((l) => ({
+      ...l,
+      cid: l.cid ?? '',
+      neg: !!l.neg,
+    }))
+    const { ref } = this.db.db.dynamic
+    const excluded = (col: string) => ref(`excluded.${col}`)
+    await this.db.db
+      .insertInto('label')
+      .values(dbVals)
+      .onConflict((oc) =>
+        oc.columns(['src', 'uri', 'cid', 'val']).doUpdateSet({
+          neg: sql`${excluded('neg')}`,
+          cts: sql`${excluded('cts')}`,
+        }),
+      )
+      .execute()
+  }
 }
 
 export type TakedownSubjects = {
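The new `formatAndCreateLabels`/`createLabels` pair upserts on `(src, uri, cid, val)`, so re-emitting a label flips `neg` and `cts` in place rather than inserting a duplicate row. A hypothetical call site (the DID, URI, and CID values are invented):

```ts
// Hypothetical call site; `db` and `appviewAgent` are assumed to come from
// the service's context, and all subject values here are invented.
const svc = new ModerationService(db, appviewAgent)
await svc.formatAndCreateLabels(
  'did:example:labeler',
  'at://did:example:alice/app.bsky.feed.post/3kabcdefg',
  'bafyexamplecid',
  { create: ['spam'], negate: ['misleading'] },
)
// Running the same call again updates neg/cts on the existing rows via the
// onConflict clause instead of duplicating them.
```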
diff --git a/packages/mod-service/src/services/moderation/views.ts b/packages/mod-service/src/services/moderation/views.ts
index df4452b8c7f..045a2c8c0f4 100644
--- a/packages/mod-service/src/services/moderation/views.ts
+++ b/packages/mod-service/src/services/moderation/views.ts
@@ -1,6 +1,6 @@
 import { sql } from 'kysely'
 import { ArrayEl } from '@atproto/common'
-import { AtUri, INVALID_HANDLE } from '@atproto/syntax'
+import { AtUri, INVALID_HANDLE, normalizeDatetimeAlways } from '@atproto/syntax'
 import { BlobRef } from '@atproto/lexicon'
 import { Database } from '../../db'
 import {
@@ -16,12 +16,11 @@ import {
   AccountView,
 } from '../../lexicon/types/com/atproto/admin/defs'
 import { OutputSchema as ReportOutput } from '../../lexicon/types/com/atproto/moderation/createReport'
-import { Label } from '../../lexicon/types/com/atproto/label/defs'
+import { Label, isSelfLabels } from '../../lexicon/types/com/atproto/label/defs'
 import {
   ModerationEventRowWithHandle,
   ModerationSubjectStatusRowWithHandle,
 } from './types'
-import { getSelfLabels } from '../label'
 import { REASONOTHER } from '../../lexicon/types/com/atproto/moderation/defs'
 import AtpAgent from '@atproto/api'
@@ -506,3 +505,21 @@ function findBlobRefs(value: unknown, refs: BlobRef[] = []) {
   }
   return refs
 }
+
+export function getSelfLabels(details: {
+  uri: string | null
+  cid: string | null
+  record: Record<string, unknown> | null
+}): Label[] {
+  const { uri, cid, record } = details
+  if (!uri || !cid || !record) return []
+  if (!isSelfLabels(record.labels)) return []
+  const src = new AtUri(uri).host // record creator
+  const cts =
+    typeof record.createdAt === 'string'
+      ? normalizeDatetimeAlways(record.createdAt)
+      : new Date(0).toISOString()
+  return record.labels.values.map(({ val }) => {
+    return { src, uri, cid, val, cts, neg: false }
+  })
+}
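The `getSelfLabels` helper moved into views.ts lifts `com.atproto.label.defs#selfLabels` off a record, attributing each label to the record's creator. A hypothetical input/output pair (all values invented):

```ts
// Hypothetical input/output for getSelfLabels; values are invented.
const labels = getSelfLabels({
  uri: 'at://did:example:alice/app.bsky.feed.post/3kabcdefg',
  cid: 'bafyexamplecid',
  record: {
    $type: 'app.bsky.feed.post',
    createdAt: '2023-12-19T13:36:00.000Z',
    labels: {
      $type: 'com.atproto.label.defs#selfLabels',
      values: [{ val: 'nudity' }],
    },
  },
})
// => [{ src: 'did:example:alice', uri: 'at://…', cid: 'bafyexamplecid',
//       val: 'nudity', cts: '2023-12-19T13:36:00.000Z', neg: false }]
```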
diff --git a/packages/mod-service/src/services/types.ts b/packages/mod-service/src/services/types.ts
index 2039d6c07de..c275a86215c 100644
--- a/packages/mod-service/src/services/types.ts
+++ b/packages/mod-service/src/services/types.ts
@@ -1,4 +1,3 @@
-import { Database, PrimaryDatabase } from '../db'
+import { Database } from '../db'
 
 export type FromDb<T> = (db: Database) => T
-export type FromDbPrimary<T> = (db: PrimaryDatabase) => T
diff --git a/packages/mod-service/src/services/util/notification.ts b/packages/mod-service/src/services/util/notification.ts
deleted file mode 100644
index 811e6e41713..00000000000
--- a/packages/mod-service/src/services/util/notification.ts
+++ /dev/null
@@ -1,70 +0,0 @@
-import { sql } from 'kysely'
-import { countAll } from '../../db/util'
-import { PrimaryDatabase } from '../../db'
-
-// i.e. 30 days before the last time the user checked their notifs
-export const BEFORE_LAST_SEEN_DAYS = 30
-// i.e. 180 days before the latest unread notification
-export const BEFORE_LATEST_UNREAD_DAYS = 180
-// don't consider culling unreads until they hit this threshold, then enforce BEFORE_LATEST_UNREAD_DAYS
-export const UNREAD_KEPT_COUNT = 500
-
-export const tidyNotifications = async (db: PrimaryDatabase, did: string) => {
-  const stats = await db.db
-    .selectFrom('notification')
-    .select([
-      sql<0 | 1>`("sortAt" < "lastSeenNotifs")`.as('read'),
-      countAll.as('count'),
-      sql<string>`min("sortAt")`.as('earliestAt'),
-      sql<string>`max("sortAt")`.as('latestAt'),
-      sql<string>`max("lastSeenNotifs")`.as('lastSeenAt'),
-    ])
-    .leftJoin('actor_state', 'actor_state.did', 'notification.did')
-    .where('notification.did', '=', did)
-    .groupBy(sql`1`) // group by read (i.e. 1st column)
-    .execute()
-  const readStats = stats.find((stat) => stat.read)
-  const unreadStats = stats.find((stat) => !stat.read)
-  let readCutoffAt: Date | undefined
-  let unreadCutoffAt: Date | undefined
-  if (readStats) {
-    readCutoffAt = addDays(
-      new Date(readStats.lastSeenAt),
-      -BEFORE_LAST_SEEN_DAYS,
-    )
-  }
-  if (unreadStats && unreadStats.count > UNREAD_KEPT_COUNT) {
-    unreadCutoffAt = addDays(
-      new Date(unreadStats.latestAt),
-      -BEFORE_LATEST_UNREAD_DAYS,
-    )
-  }
-  // take most recent of read/unread cutoffs
-  const cutoffAt = greatest(readCutoffAt, unreadCutoffAt)
-  if (cutoffAt) {
-    // skip the delete if it won't catch any notifications
-    const earliestAt = least(readStats?.earliestAt, unreadStats?.earliestAt)
-    if (earliestAt && earliestAt < cutoffAt.toISOString()) {
-      await db.db
-        .deleteFrom('notification')
-        .where('did', '=', did)
-        .where('sortAt', '<', cutoffAt.toISOString())
-        .execute()
-    }
-  }
-}
-
-const addDays = (date: Date, days: number) => {
-  date.setDate(date.getDate() + days)
-  return date
-}
-
-const least = <T extends Ordered>(a: T | undefined, b: T | undefined) => {
-  return a !== undefined && (b === undefined || a < b) ? a : b
-}
-
-const greatest = <T extends Ordered>(a: T | undefined, b: T | undefined) => {
-  return a !== undefined && (b === undefined || a > b) ? a : b
-}
-
-type Ordered = string | number | Date
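A worked example of the two cutoffs in `tidyNotifications`, using the constants and helpers from the deleted file (the dates are invented):

```ts
// Suppose the user last checked notifs on 2023-12-01 and holds 600 unreads
// (> UNREAD_KEPT_COUNT), the newest of which arrived 2023-12-19.
const readCutoffAt = addDays(new Date('2023-12-01'), -BEFORE_LAST_SEEN_DAYS) // 2023-11-01
const unreadCutoffAt = addDays(
  new Date('2023-12-19'),
  -BEFORE_LATEST_UNREAD_DAYS,
) // 2023-06-22
// greatest() picks the more recent of the two, so rows with
// sortAt < 2023-11-01 get deleted, provided the earliest stored
// notification actually predates that cutoff.
const cutoffAt = greatest(readCutoffAt, unreadCutoffAt) // 2023-11-01
```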
diff --git a/packages/mod-service/src/services/util/post.ts b/packages/mod-service/src/services/util/post.ts
deleted file mode 100644
index 19e7fa3ee2c..00000000000
--- a/packages/mod-service/src/services/util/post.ts
+++ /dev/null
@@ -1,65 +0,0 @@
-import { sql } from 'kysely'
-import DatabaseSchema from '../../db/database-schema'
-
-export const getDescendentsQb = (
-  db: DatabaseSchema,
-  opts: {
-    uri: string
-    depth: number // required, protects against cycles
-  },
-) => {
-  const { uri, depth } = opts
-  const query = db.withRecursive('descendent(uri, depth)', (cte) => {
-    return cte
-      .selectFrom('post')
-      .select(['post.uri as uri', sql<number>`1`.as('depth')])
-      .where(sql`1`, '<=', depth)
-      .where('replyParent', '=', uri)
-      .unionAll(
-        cte
-          .selectFrom('post')
-          .innerJoin('descendent', 'descendent.uri', 'post.replyParent')
-          .where('descendent.depth', '<', depth)
-          .select([
-            'post.uri as uri',
-            sql<number>`descendent.depth + 1`.as('depth'),
-          ]),
-      )
-  })
-  return query
-}
-
-export const getAncestorsAndSelfQb = (
-  db: DatabaseSchema,
-  opts: {
-    uri: string
-    parentHeight: number // required, protects against cycles
-  },
-) => {
-  const { uri, parentHeight } = opts
-  const query = db.withRecursive(
-    'ancestor(uri, ancestorUri, height)',
-    (cte) => {
-      return cte
-        .selectFrom('post')
-        .select([
-          'post.uri as uri',
-          'post.replyParent as ancestorUri',
-          sql<number>`0`.as('height'),
-        ])
-        .where('uri', '=', uri)
-        .unionAll(
-          cte
-            .selectFrom('post')
-            .innerJoin('ancestor', 'ancestor.ancestorUri', 'post.uri')
-            .where('ancestor.height', '<', parentHeight)
-            .select([
-              'post.uri as uri',
-              'post.replyParent as ancestorUri',
-              sql<number>`ancestor.height + 1`.as('height'),
-            ]),
-        )
-    },
-  )
-  return query
-}
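The deleted recursive CTEs bound their recursion depth to guard against reply cycles. A hypothetical call that collects every reply under a post up to 10 levels deep (the URI is invented, and `db` is assumed to be this package's `Database` wrapper, so `db.db` is the Kysely instance post.ts expects):

```ts
// Hypothetical usage of getDescendentsQb; the URI is invented.
const descendents = await getDescendentsQb(db.db, {
  uri: 'at://did:example:alice/app.bsky.feed.post/3kabcdefg',
  depth: 10, // bounds the recursion, protecting against reply cycles
})
  .selectFrom('descendent')
  .innerJoin('post', 'post.uri', 'descendent.uri')
  .selectAll('post')
  .execute()
```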
diff --git a/packages/mod-service/src/services/util/search.ts b/packages/mod-service/src/services/util/search.ts
deleted file mode 100644
index 994d2f43879..00000000000
--- a/packages/mod-service/src/services/util/search.ts
+++ /dev/null
@@ -1,172 +0,0 @@
-import { sql } from 'kysely'
-import { InvalidRequestError } from '@atproto/xrpc-server'
-import { Database } from '../../db'
-import { notSoftDeletedClause, DbRef, AnyQb } from '../../db/util'
-import { GenericKeyset, paginate } from '../../db/pagination'
-
-export const getUserSearchQuery = (
-  db: Database,
-  opts: {
-    query: string
-    limit: number
-    cursor?: string
-    includeSoftDeleted?: boolean
-  },
-) => {
-  const { ref } = db.db.dynamic
-  const { query, limit, cursor, includeSoftDeleted } = opts
-  // Matching user accounts based on handle
-  const distanceAccount = distance(query, ref('handle'))
-  let accountsQb = getMatchingAccountsQb(db, { query, includeSoftDeleted })
-  accountsQb = paginate(accountsQb, {
-    limit,
-    cursor,
-    direction: 'asc',
-    keyset: new SearchKeyset(distanceAccount, ref('actor.did')),
-  })
-  // Matching profiles based on display name
-  const distanceProfile = distance(query, ref('displayName'))
-  let profilesQb = getMatchingProfilesQb(db, { query, includeSoftDeleted })
-  profilesQb = paginate(profilesQb, {
-    limit,
-    cursor,
-    direction: 'asc',
-    keyset: new SearchKeyset(distanceProfile, ref('actor.did')),
-  })
-  // Combine and paginate result set
-  return paginate(combineAccountsAndProfilesQb(db, accountsQb, profilesQb), {
-    limit,
-    cursor,
-    direction: 'asc',
-    keyset: new SearchKeyset(ref('distance'), ref('actor.did')),
-  })
-}
-
-// Takes maximal advantage of the trigram index at the expense of the ability to paginate.
-export const getUserSearchQuerySimple = (
-  db: Database,
-  opts: {
-    query: string
-    limit: number
-  },
-) => {
-  const { ref } = db.db.dynamic
-  const { query, limit } = opts
-  // Matching user accounts based on handle
-  const accountsQb = getMatchingAccountsQb(db, { query })
-    .orderBy('distance', 'asc')
-    .limit(limit)
-  // Matching profiles based on display name
-  const profilesQb = getMatchingProfilesQb(db, { query })
-    .orderBy('distance', 'asc')
-    .limit(limit)
-  // Combine result sets and apply the limit
-  return paginate(combineAccountsAndProfilesQb(db, accountsQb, profilesQb), {
-    limit,
-    direction: 'asc',
-    keyset: new SearchKeyset(ref('distance'), ref('actor.did')),
-  })
-}
-
-// Matching user accounts based on handle
-const getMatchingAccountsQb = (
-  db: Database,
-  opts: { query: string; includeSoftDeleted?: boolean },
-) => {
-  const { ref } = db.db.dynamic
-  const { query, includeSoftDeleted } = opts
-  const distanceAccount = distance(query, ref('handle'))
-  return db.db
-    .selectFrom('actor')
-    .if(!includeSoftDeleted, (qb) =>
-      qb.where(notSoftDeletedClause(ref('actor'))),
-    )
-    .where('actor.handle', 'is not', null)
-    .where(similar(query, ref('handle'))) // Coarse filter engaging trigram index
-    .select(['actor.did as did', distanceAccount.as('distance')])
-}
-
-// Matching profiles based on display name
-const getMatchingProfilesQb = (
-  db: Database,
-  opts: { query: string; includeSoftDeleted?: boolean },
-) => {
-  const { ref } = db.db.dynamic
-  const { query, includeSoftDeleted } = opts
-  const distanceProfile = distance(query, ref('displayName'))
-  return db.db
-    .selectFrom('profile')
-    .innerJoin('actor', 'actor.did', 'profile.creator')
-    .if(!includeSoftDeleted, (qb) =>
-      qb.where(notSoftDeletedClause(ref('actor'))),
-    )
-    .where('actor.handle', 'is not', null)
-    .where(similar(query, ref('displayName'))) // Coarse filter engaging trigram index
-    .select(['profile.creator as did', distanceProfile.as('distance')])
-}
-
-// Combine profile and account result sets
-const combineAccountsAndProfilesQb = (
-  db: Database,
-  accountsQb: AnyQb,
-  profilesQb: AnyQb,
-) => {
-  // Combine user account and profile results, taking best matches from each
-  const emptyQb = db.db
-    .selectFrom('actor')
-    .where(sql`1 = 0`)
-    .select([sql.literal('').as('did'), sql<number>`0`.as('distance')])
-  const resultsQb = db.db
-    .selectFrom(
-      emptyQb
-        .unionAll(sql`${accountsQb}`) // The sql`` is adding parens
-        .unionAll(sql`${profilesQb}`)
-        .as('accounts_and_profiles'),
-    )
-    .selectAll()
-    .distinctOn('did') // Per did, take whichever of account and profile distance is best
-    .orderBy('did')
-    .orderBy('distance')
-  return db.db
-    .selectFrom(resultsQb.as('results'))
-    .innerJoin('actor', 'actor.did', 'results.did')
-}
-
-// Remove leading @ in case a handle is input that way
-export const cleanQuery = (query: string) => query.trim().replace(/^@/g, '')
-
-// Uses pg_trgm strict word similarity to check similarity between a search query and a stored value
-const distance = (query: string, ref: DbRef) =>
-  sql<number>`(${query} <<-> ${ref})`
-
-// Can utilize trigram index to match on strict word similarity.
-// The word_similarity_threshold is set to .4 (i.e. distance < .6) in db/index.ts.
-const similar = (query: string, ref: DbRef) =>
-  sql<boolean>`(${query} <% ${ref})`
-
-type Result = { distance: number; did: string }
-type LabeledResult = { primary: number; secondary: string }
-
-export class SearchKeyset extends GenericKeyset<Result, LabeledResult> {
-  labelResult(result: Result) {
-    return {
-      primary: result.distance,
-      secondary: result.did,
-    }
-  }
-  labeledResultToCursor(labeled: LabeledResult) {
-    return {
-      primary: labeled.primary.toString().replace('0.', '.'),
-      secondary: labeled.secondary,
-    }
-  }
-  cursorToLabeledResult(cursor: { primary: string; secondary: string }) {
-    const distance = parseFloat(cursor.primary)
-    if (isNaN(distance)) {
-      throw new InvalidRequestError('Malformed cursor')
-    }
-    return {
-      primary: distance,
-      secondary: cursor.secondary,
-    }
-  }
-}
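The deleted search helpers lean on pg_trgm's strict-word-similarity operators: `<%` acts as the coarse, index-backed filter, while `<<->` supplies the distance used for ranking and cursoring. A hypothetical call of the simple variant (the query text is invented):

```ts
// Hypothetical usage of getUserSearchQuerySimple; query text is invented.
const matches = await getUserSearchQuerySimple(db, {
  query: cleanQuery('@ali'), // strips the leading '@' from a pasted handle
  limit: 25,
})
  .selectAll('actor')
  .execute()
// Each candidate row passed the `query <% handle` trigram filter and is
// ordered by the `query <<-> handle` strict word similarity distance.
```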