diff --git a/.changeset/metal-falcons-fail.md b/.changeset/metal-falcons-fail.md new file mode 100644 index 00000000000..80aff90f57b --- /dev/null +++ b/.changeset/metal-falcons-fail.md @@ -0,0 +1,6 @@ +--- +"@atproto/ozone": patch +"@atproto/api": patch +--- + +Apply ozone queue splitting at the database query level diff --git a/lexicons/tools/ozone/moderation/queryStatuses.json b/lexicons/tools/ozone/moderation/queryStatuses.json index 13d2a28f067..641d01cac1b 100644 --- a/lexicons/tools/ozone/moderation/queryStatuses.json +++ b/lexicons/tools/ozone/moderation/queryStatuses.json @@ -8,6 +8,18 @@ "parameters": { "type": "params", "properties": { + "queueCount": { + "type": "integer", + "description": "Number of queues being used by moderators. Subjects will be split among all queues." + }, + "queueIndex": { + "type": "integer", + "description": "Index of the queue to fetch subjects from. Works only when queueCount value is specified." + }, + "queueSeed": { + "type": "string", + "description": "A seeder to shuffle/balance the queue items." + }, "includeAllUserRecords": { "type": "boolean", "description": "All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned." diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 1d0efff5611..c17d53bdd57 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -12382,6 +12382,20 @@ export const schemaDict = { parameters: { type: 'params', properties: { + queueCount: { + type: 'integer', + description: + 'Number of queues being used by moderators. Subjects will be split among all queues.', + }, + queueIndex: { + type: 'integer', + description: + 'Index of the queue to fetch subjects from. Works only when queueCount value is specified.', + }, + queueSeed: { + type: 'string', + description: 'A seeder to shuffle/balance the queue items.', + }, includeAllUserRecords: { type: 'boolean', description: diff --git a/packages/api/src/client/types/tools/ozone/moderation/queryStatuses.ts b/packages/api/src/client/types/tools/ozone/moderation/queryStatuses.ts index 2db3b9c0a60..2af6808ed19 100644 --- a/packages/api/src/client/types/tools/ozone/moderation/queryStatuses.ts +++ b/packages/api/src/client/types/tools/ozone/moderation/queryStatuses.ts @@ -9,6 +9,12 @@ import { CID } from 'multiformats/cid' import * as ToolsOzoneModerationDefs from './defs' export interface QueryParams { + /** Number of queues being used by moderators. Subjects will be split among all queues. */ + queueCount?: number + /** Index of the queue to fetch subjects from. Works only when queueCount value is specified. */ + queueIndex?: number + /** A seeder to shuffle/balance the queue items. */ + queueSeed?: string /** All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. */ includeAllUserRecords?: boolean /** The subject to get the status for. */ diff --git a/packages/ozone/src/api/moderation/queryStatuses.ts b/packages/ozone/src/api/moderation/queryStatuses.ts index 00929ff272b..c82fdee7091 100644 --- a/packages/ozone/src/api/moderation/queryStatuses.ts +++ b/packages/ozone/src/api/moderation/queryStatuses.ts @@ -33,6 +33,9 @@ export default function (server: Server, ctx: AppContext) { excludeTags = [], collections = [], subjectType, + queueCount, + queueIndex, + queueSeed, } = params const db = ctx.db const modService = ctx.modService(db) @@ -63,6 +66,9 @@ export default function (server: Server, ctx: AppContext) { excludeTags, collections, subjectType, + queueCount, + queueIndex, + queueSeed, }) const subjectStatuses = results.statuses.map((status) => modService.views.formatSubjectStatus(status), diff --git a/packages/ozone/src/lexicon/lexicons.ts b/packages/ozone/src/lexicon/lexicons.ts index 1d0efff5611..c17d53bdd57 100644 --- a/packages/ozone/src/lexicon/lexicons.ts +++ b/packages/ozone/src/lexicon/lexicons.ts @@ -12382,6 +12382,20 @@ export const schemaDict = { parameters: { type: 'params', properties: { + queueCount: { + type: 'integer', + description: + 'Number of queues being used by moderators. Subjects will be split among all queues.', + }, + queueIndex: { + type: 'integer', + description: + 'Index of the queue to fetch subjects from. Works only when queueCount value is specified.', + }, + queueSeed: { + type: 'string', + description: 'A seeder to shuffle/balance the queue items.', + }, includeAllUserRecords: { type: 'boolean', description: diff --git a/packages/ozone/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts b/packages/ozone/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts index f1d5bb61690..0ccd1030c0b 100644 --- a/packages/ozone/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts +++ b/packages/ozone/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts @@ -10,6 +10,12 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server' import * as ToolsOzoneModerationDefs from './defs' export interface QueryParams { + /** Number of queues being used by moderators. Subjects will be split among all queues. */ + queueCount?: number + /** Index of the queue to fetch subjects from. Works only when queueCount value is specified. */ + queueIndex?: number + /** A seeder to shuffle/balance the queue items. */ + queueSeed?: string /** All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. */ includeAllUserRecords?: boolean /** The subject to get the status for. */ diff --git a/packages/ozone/src/mod-service/index.ts b/packages/ozone/src/mod-service/index.ts index 825dc721ae7..978015f9d86 100644 --- a/packages/ozone/src/mod-service/index.ts +++ b/packages/ozone/src/mod-service/index.ts @@ -831,6 +831,9 @@ export class ModerationService { } async getSubjectStatuses({ + queueCount, + queueIndex, + queueSeed = '', includeAllUserRecords, cursor, limit = 50, @@ -858,6 +861,9 @@ export class ModerationService { collections, subjectType, }: { + queueCount?: number + queueIndex?: number + queueSeed?: string includeAllUserRecords?: boolean cursor?: string limit?: number @@ -909,6 +915,24 @@ export class ModerationService { builder = builder.where('recordPath', '!=', '') } + // Only fetch items that belongs to the specified queue when specified + if ( + !subject && + queueCount && + queueCount > 0 && + queueIndex !== undefined && + queueIndex >= 0 && + queueIndex < queueCount + ) { + builder = builder.where( + queueSeed + ? sql`ABS(HASHTEXT(${queueSeed} || did)) % ${queueCount}` + : sql`ABS(HASHTEXT(did)) % ${queueCount}`, + '=', + queueIndex, + ) + } + // If subjectType is set to 'account' let that take priority and ignore collections filter if (collections.length && subjectType !== 'account') { builder = builder.where('recordPath', '!=', '').where((qb) => { diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 1d0efff5611..c17d53bdd57 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -12382,6 +12382,20 @@ export const schemaDict = { parameters: { type: 'params', properties: { + queueCount: { + type: 'integer', + description: + 'Number of queues being used by moderators. Subjects will be split among all queues.', + }, + queueIndex: { + type: 'integer', + description: + 'Index of the queue to fetch subjects from. Works only when queueCount value is specified.', + }, + queueSeed: { + type: 'string', + description: 'A seeder to shuffle/balance the queue items.', + }, includeAllUserRecords: { type: 'boolean', description: diff --git a/packages/pds/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts b/packages/pds/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts index f1d5bb61690..0ccd1030c0b 100644 --- a/packages/pds/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts +++ b/packages/pds/src/lexicon/types/tools/ozone/moderation/queryStatuses.ts @@ -10,6 +10,12 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server' import * as ToolsOzoneModerationDefs from './defs' export interface QueryParams { + /** Number of queues being used by moderators. Subjects will be split among all queues. */ + queueCount?: number + /** Index of the queue to fetch subjects from. Works only when queueCount value is specified. */ + queueIndex?: number + /** A seeder to shuffle/balance the queue items. */ + queueSeed?: string /** All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. */ includeAllUserRecords?: boolean /** The subject to get the status for. */