Skip to content

Commit

Permalink
✨ Apply queue filter on database query when fetching subjects (#3280)
Browse files Browse the repository at this point in the history
* ✨ Apply queue filter on database query when fetching subjects

* ✨ Add queue seed option

* 📝 Add changeset

* 🐛 Allow 0 value for queueIndex
  • Loading branch information
foysalit authored Dec 23, 2024
1 parent b4674a6 commit 9ea2cce
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changeset/metal-falcons-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@atproto/ozone": patch
"@atproto/api": patch
---

Apply ozone queue splitting at the database query level
12 changes: 12 additions & 0 deletions lexicons/tools/ozone/moderation/queryStatuses.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@
"parameters": {
"type": "params",
"properties": {
"queueCount": {
"type": "integer",
"description": "Number of queues being used by moderators. Subjects will be split among all queues."
},
"queueIndex": {
"type": "integer",
"description": "Index of the queue to fetch subjects from. Works only when queueCount value is specified."
},
"queueSeed": {
"type": "string",
"description": "A seeder to shuffle/balance the queue items."
},
"includeAllUserRecords": {
"type": "boolean",
"description": "All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned."
Expand Down
14 changes: 14 additions & 0 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12382,6 +12382,20 @@ export const schemaDict = {
parameters: {
type: 'params',
properties: {
queueCount: {
type: 'integer',
description:
'Number of queues being used by moderators. Subjects will be split among all queues.',
},
queueIndex: {
type: 'integer',
description:
'Index of the queue to fetch subjects from. Works only when queueCount value is specified.',
},
queueSeed: {
type: 'string',
description: 'A seeder to shuffle/balance the queue items.',
},
includeAllUserRecords: {
type: 'boolean',
description:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import { CID } from 'multiformats/cid'
import * as ToolsOzoneModerationDefs from './defs'

export interface QueryParams {
/** Number of queues being used by moderators. Subjects will be split among all queues. */
queueCount?: number
/** Index of the queue to fetch subjects from. Works only when queueCount value is specified. */
queueIndex?: number
/** A seeder to shuffle/balance the queue items. */
queueSeed?: string
/** All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. */
includeAllUserRecords?: boolean
/** The subject to get the status for. */
Expand Down
6 changes: 6 additions & 0 deletions packages/ozone/src/api/moderation/queryStatuses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ export default function (server: Server, ctx: AppContext) {
excludeTags = [],
collections = [],
subjectType,
queueCount,
queueIndex,
queueSeed,
} = params
const db = ctx.db
const modService = ctx.modService(db)
Expand Down Expand Up @@ -63,6 +66,9 @@ export default function (server: Server, ctx: AppContext) {
excludeTags,
collections,
subjectType,
queueCount,
queueIndex,
queueSeed,
})
const subjectStatuses = results.statuses.map((status) =>
modService.views.formatSubjectStatus(status),
Expand Down
14 changes: 14 additions & 0 deletions packages/ozone/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12382,6 +12382,20 @@ export const schemaDict = {
parameters: {
type: 'params',
properties: {
queueCount: {
type: 'integer',
description:
'Number of queues being used by moderators. Subjects will be split among all queues.',
},
queueIndex: {
type: 'integer',
description:
'Index of the queue to fetch subjects from. Works only when queueCount value is specified.',
},
queueSeed: {
type: 'string',
description: 'A seeder to shuffle/balance the queue items.',
},
includeAllUserRecords: {
type: 'boolean',
description:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'
import * as ToolsOzoneModerationDefs from './defs'

export interface QueryParams {
/** Number of queues being used by moderators. Subjects will be split among all queues. */
queueCount?: number
/** Index of the queue to fetch subjects from. Works only when queueCount value is specified. */
queueIndex?: number
/** A seeder to shuffle/balance the queue items. */
queueSeed?: string
/** All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. */
includeAllUserRecords?: boolean
/** The subject to get the status for. */
Expand Down
24 changes: 24 additions & 0 deletions packages/ozone/src/mod-service/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,9 @@ export class ModerationService {
}

async getSubjectStatuses({
queueCount,
queueIndex,
queueSeed = '',
includeAllUserRecords,
cursor,
limit = 50,
Expand Down Expand Up @@ -858,6 +861,9 @@ export class ModerationService {
collections,
subjectType,
}: {
queueCount?: number
queueIndex?: number
queueSeed?: string
includeAllUserRecords?: boolean
cursor?: string
limit?: number
Expand Down Expand Up @@ -909,6 +915,24 @@ export class ModerationService {
builder = builder.where('recordPath', '!=', '')
}

// Only fetch items that belongs to the specified queue when specified
if (
!subject &&
queueCount &&
queueCount > 0 &&
queueIndex !== undefined &&
queueIndex >= 0 &&
queueIndex < queueCount
) {
builder = builder.where(
queueSeed
? sql`ABS(HASHTEXT(${queueSeed} || did)) % ${queueCount}`
: sql`ABS(HASHTEXT(did)) % ${queueCount}`,
'=',
queueIndex,
)
}

// If subjectType is set to 'account' let that take priority and ignore collections filter
if (collections.length && subjectType !== 'account') {
builder = builder.where('recordPath', '!=', '').where((qb) => {
Expand Down
14 changes: 14 additions & 0 deletions packages/pds/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12382,6 +12382,20 @@ export const schemaDict = {
parameters: {
type: 'params',
properties: {
queueCount: {
type: 'integer',
description:
'Number of queues being used by moderators. Subjects will be split among all queues.',
},
queueIndex: {
type: 'integer',
description:
'Index of the queue to fetch subjects from. Works only when queueCount value is specified.',
},
queueSeed: {
type: 'string',
description: 'A seeder to shuffle/balance the queue items.',
},
includeAllUserRecords: {
type: 'boolean',
description:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import { HandlerAuth, HandlerPipeThrough } from '@atproto/xrpc-server'
import * as ToolsOzoneModerationDefs from './defs'

export interface QueryParams {
/** Number of queues being used by moderators. Subjects will be split among all queues. */
queueCount?: number
/** Index of the queue to fetch subjects from. Works only when queueCount value is specified. */
queueIndex?: number
/** A seeder to shuffle/balance the queue items. */
queueSeed?: string
/** All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned. */
includeAllUserRecords?: boolean
/** The subject to get the status for. */
Expand Down

0 comments on commit 9ea2cce

Please sign in to comment.