diff --git a/packages/ozone/src/api/index.ts b/packages/ozone/src/api/index.ts index 49a9d70e1fa..54e52ffe292 100644 --- a/packages/ozone/src/api/index.ts +++ b/packages/ozone/src/api/index.ts @@ -8,6 +8,8 @@ import getRepo from './admin/getRepo' import queryModerationStatuses from './admin/queryModerationStatuses' import queryModerationEvents from './admin/queryModerationEvents' import getModerationEvent from './admin/getModerationEvent' +import queryLabels from './label/queryLabels' +import subscribeLabels from './label/subscribeLabels' import fetchLabels from './temp/fetchLabels' import createCommunicationTemplate from './admin/createCommunicationTemplate' import updateCommunicationTemplate from './admin/updateCommunicationTemplate' @@ -27,6 +29,8 @@ export default function (server: Server, ctx: AppContext) { getModerationEvent(server, ctx) queryModerationEvents(server, ctx) queryModerationStatuses(server, ctx) + queryLabels(server, ctx) + subscribeLabels(server, ctx) fetchLabels(server, ctx) listCommunicationTemplates(server, ctx) createCommunicationTemplate(server, ctx) diff --git a/packages/ozone/src/api/label/queryLabels.ts b/packages/ozone/src/api/label/queryLabels.ts new file mode 100644 index 00000000000..6de6380d194 --- /dev/null +++ b/packages/ozone/src/api/label/queryLabels.ts @@ -0,0 +1,58 @@ +import { Server } from '../../lexicon' +import AppContext from '../../context' +import { InvalidRequestError } from '@atproto/xrpc-server' +import { sql } from 'kysely' +import { formatLabel } from '../../mod-service/util' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.label.queryLabels(async ({ params }) => { + const { uriPatterns, sources, limit, cursor } = params + let builder = ctx.db.db.selectFrom('label').selectAll().limit(limit) + // if includes '*', then we don't need a where clause + if (!uriPatterns.includes('*')) { + builder = builder.where((qb) => { + // starter where clause that is always false so that we can chain `orWhere`s + qb = qb.where(sql`1 = 0`) + for (const pattern of uriPatterns) { + // if no '*', then we're looking for an exact match + if (!pattern.includes('*')) { + qb = qb.orWhere('uri', '=', pattern) + } else { + if (pattern.indexOf('*') < pattern.length - 1) { + throw new InvalidRequestError(`invalid pattern: ${pattern}`) + } + const searchPattern = pattern + .slice(0, -1) + .replaceAll('%', '') // sanitize search pattern + .replaceAll('_', '\\_') // escape any underscores + qb = qb.orWhere('uri', 'like', `${searchPattern}%`) + } + } + return qb + }) + } + if (sources && sources.length > 0) { + builder = builder.where('src', 'in', sources) + } + if (cursor) { + const cursorId = parseInt(cursor, 10) + if (isNaN(cursorId)) { + throw new InvalidRequestError('invalid cursor') + } + builder = builder.where('id', '>', cursorId) + } + + const res = await builder.execute() + + const labels = res.map((l) => formatLabel(l)) + const resCursor = res.at(-1)?.id.toString(10) + + return { + encoding: 'application/json', + body: { + cursor: resCursor, + labels, + }, + } + }) +} diff --git a/packages/ozone/src/api/label/subscribeLabels.ts b/packages/ozone/src/api/label/subscribeLabels.ts new file mode 100644 index 00000000000..7efb339d488 --- /dev/null +++ b/packages/ozone/src/api/label/subscribeLabels.ts @@ -0,0 +1,25 @@ +import { Server } from '../../lexicon' +import AppContext from '../../context' +import Outbox from '../../sequencer/outbox' +import { InvalidRequestError } from '@atproto/xrpc-server' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.label.subscribeLabels(async function* ({ + params, + signal, + }) { + const { cursor } = params + const outbox = new Outbox(ctx.sequencer) + + if (cursor !== undefined) { + const curr = await ctx.sequencer.curr() + if (cursor > (curr ?? 0)) { + throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + } + } + + for await (const evt of outbox.events(cursor, signal)) { + yield evt + } + }) +} diff --git a/packages/ozone/src/api/temp/fetchLabels.ts b/packages/ozone/src/api/temp/fetchLabels.ts index f11cb2028bb..fd0331487d1 100644 --- a/packages/ozone/src/api/temp/fetchLabels.ts +++ b/packages/ozone/src/api/temp/fetchLabels.ts @@ -1,5 +1,6 @@ import { Server } from '../../lexicon' import AppContext from '../../context' +import { formatLabel } from '../../mod-service/util' import { UNSPECCED_TAKEDOWN_BLOBS_LABEL, UNSPECCED_TAKEDOWN_LABEL, @@ -28,10 +29,7 @@ export default function (server: Server, ctx: AppContext) { .limit(limit) .execute() - const labels = labelRes.map((l) => ({ - ...l, - cid: l.cid === '' ? undefined : l.cid, - })) + const labels = labelRes.map((l) => formatLabel(l)) return { encoding: 'application/json', diff --git a/packages/ozone/src/config/config.ts b/packages/ozone/src/config/config.ts index caa799b2a90..32ed8ba5cb5 100644 --- a/packages/ozone/src/config/config.ts +++ b/packages/ozone/src/config/config.ts @@ -19,6 +19,9 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => { const dbCfg: OzoneConfig['db'] = { postgresUrl: env.dbPostgresUrl, postgresSchema: env.dbPostgresSchema, + poolSize: env.dbPoolSize, + poolMaxUses: env.dbPoolMaxUses, + poolIdleTimeoutMs: env.dbPoolIdleTimeoutMs, } assert(env.appviewUrl) @@ -67,6 +70,9 @@ export type ServiceConfig = { export type DatabaseConfig = { postgresUrl: string postgresSchema?: string + poolSize?: number + poolMaxUses?: number + poolIdleTimeoutMs?: number } export type AppviewConfig = { diff --git a/packages/ozone/src/config/env.ts b/packages/ozone/src/config/env.ts index 4f96ba63d53..b0ad10074eb 100644 --- a/packages/ozone/src/config/env.ts +++ b/packages/ozone/src/config/env.ts @@ -13,6 +13,9 @@ export const readEnv = (): OzoneEnvironment => { pdsDid: envStr('OZONE_PDS_DID'), dbPostgresUrl: envStr('OZONE_DB_POSTGRES_URL'), dbPostgresSchema: envStr('OZONE_DB_POSTGRES_SCHEMA'), + dbPoolSize: envInt('OZONE_DB_POOL_SIZE'), + dbPoolMaxUses: envInt('OZONE_DB_POOL_MAX_USES'), + dbPoolIdleTimeoutMs: envInt('OZONE_DB_POOL_IDLE_TIMEOUT_MS'), didPlcUrl: envStr('OZONE_DID_PLC_URL'), adminPassword: envStr('OZONE_ADMIN_PASSWORD'), moderatorPassword: envStr('OZONE_MODERATOR_PASSWORD'), @@ -33,6 +36,9 @@ export type OzoneEnvironment = { pdsDid?: string dbPostgresUrl?: string dbPostgresSchema?: string + dbPoolSize?: number + dbPoolMaxUses?: number + dbPoolIdleTimeoutMs?: number didPlcUrl?: string adminPassword?: string moderatorPassword?: string diff --git a/packages/ozone/src/context.ts b/packages/ozone/src/context.ts index 1cb0ec1bd83..00ef4bd71ba 100644 --- a/packages/ozone/src/context.ts +++ b/packages/ozone/src/context.ts @@ -10,6 +10,7 @@ import * as auth from './auth' import { BackgroundQueue } from './background' import assert from 'assert' import { EventPusher } from './daemon' +import Sequencer from './sequencer/sequencer' import { CommunicationTemplateService, CommunicationTemplateServiceCreator, @@ -25,6 +26,7 @@ export type AppContextOptions = { signingKey: Keypair idResolver: IdResolver backgroundQueue: BackgroundQueue + sequencer: Sequencer } export class AppContext { @@ -38,6 +40,9 @@ export class AppContext { const db = new Database({ url: cfg.db.postgresUrl, schema: cfg.db.postgresSchema, + poolSize: cfg.db.poolSize, + poolMaxUses: cfg.db.poolMaxUses, + poolIdleTimeoutMs: cfg.db.poolIdleTimeoutMs, }) const signingKey = await Secp256k1Keypair.import(secrets.signingKeyHex) const appviewAgent = new AtpAgent({ service: cfg.appview.url }) @@ -74,6 +79,8 @@ export class AppContext { plcUrl: cfg.identity.plcUrl, }) + const sequencer = new Sequencer(db) + return new AppContext( { db, @@ -85,6 +92,7 @@ export class AppContext { signingKey, idResolver, backgroundQueue, + sequencer, ...(overrides ?? {}), }, secrets, @@ -139,6 +147,10 @@ export class AppContext { return this.opts.backgroundQueue } + get sequencer(): Sequencer { + return this.opts.sequencer + } + get authVerifier() { return auth.authVerifier(this.idResolver, { aud: this.cfg.service.did }) } diff --git a/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts b/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts index f636f40a3f4..e08c80686d2 100644 --- a/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts +++ b/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts @@ -68,13 +68,19 @@ export async function up(db: Kysely): Promise { // Label await db.schema .createTable('label') + .addColumn('id', 'bigserial', (col) => col.primaryKey()) .addColumn('src', 'varchar', (col) => col.notNull()) .addColumn('uri', 'varchar', (col) => col.notNull()) .addColumn('cid', 'varchar', (col) => col.notNull()) .addColumn('val', 'varchar', (col) => col.notNull()) .addColumn('neg', 'boolean', (col) => col.notNull()) .addColumn('cts', 'varchar', (col) => col.notNull()) - .addPrimaryKeyConstraint('label_pkey', ['src', 'uri', 'cid', 'val']) + .execute() + await db.schema + .createIndex('unique_label_idx') + .unique() + .on('label') + .columns(['src', 'uri', 'cid', 'val']) .execute() await db.schema .createIndex('label_uri_index') diff --git a/packages/ozone/src/db/schema/label.ts b/packages/ozone/src/db/schema/label.ts index 0c8a398a7db..f50a6119ab3 100644 --- a/packages/ozone/src/db/schema/label.ts +++ b/packages/ozone/src/db/schema/label.ts @@ -1,6 +1,9 @@ +import { Generated, Selectable } from 'kysely' + export const tableName = 'label' export interface Label { + id: Generated src: string uri: string cid: string @@ -9,4 +12,8 @@ export interface Label { cts: string } +export type LabelRow = Selectable