Skip to content

Commit

Permalink
getRecord & getRepo mostly working
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Dec 20, 2023
1 parent df4d40b commit b121af1
Show file tree
Hide file tree
Showing 18 changed files with 257 additions and 112 deletions.
42 changes: 42 additions & 0 deletions packages/bsky/src/api/com/atproto/admin/getAccountInfos.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { Actor } from '../../../../db/tables/actor'
import { mapDefined } from '@atproto/common'
import { INVALID_HANDLE } from '@atproto/syntax'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.admin.getAccountInfos({
auth: ctx.roleVerifier,
handler: async ({ params }) => {
const { dids } = params
const db = ctx.db.getPrimary()
const actorService = ctx.services.actor(db)
const [actors, profiles] = await Promise.all([
actorService.getActors(dids, true),
actorService.getProfileRecords(dids, true),
])
const actorByDid = actors.reduce((acc, cur) => {
return acc.set(cur.did, cur)
}, new Map<string, Actor>())

const infos = mapDefined(dids, (did) => {
const info = actorByDid.get(did)
if (!info) return
const profile = profiles.get(did)
return {
did,
handle: info.handle ?? INVALID_HANDLE,
relatedRecords: profile ? [profile] : undefined,
indexedAt: info.indexedAt,
}
})

return {
encoding: 'application/json',
body: {
infos,
},
}
},
})
}
2 changes: 2 additions & 0 deletions packages/bsky/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import emitModerationEvent from './com/atproto/admin/emitModerationEvent'
import searchRepos from './com/atproto/admin/searchRepos'
import adminGetRecord from './com/atproto/admin/getRecord'
import getRepo from './com/atproto/admin/getRepo'
import getAccountInfos from './com/atproto/admin/getAccountInfos'
import queryModerationStatuses from './com/atproto/admin/queryModerationStatuses'
import resolveHandle from './com/atproto/identity/resolveHandle'
import getRecord from './com/atproto/repo/getRecord'
Expand Down Expand Up @@ -106,6 +107,7 @@ export default function (server: Server, ctx: AppContext) {
searchRepos(server, ctx)
adminGetRecord(server, ctx)
getRepo(server, ctx)
getAccountInfos(server, ctx)
getModerationEvent(server, ctx)
queryModerationEvents(server, ctx)
queryModerationStatuses(server, ctx)
Expand Down
22 changes: 22 additions & 0 deletions packages/bsky/src/services/actor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { SearchKeyset, getUserSearchQuery } from '../util/search'
import { FromDb } from '../types'
import { GraphService } from '../graph'
import { LabelService } from '../label'
import { AtUri } from '@atproto/syntax'
import { ids } from '../../lexicon/lexicons'

export * from './types'

Expand Down Expand Up @@ -96,6 +98,26 @@ export class ActorService {
})
}

async getProfileRecords(dids: string[], includeSoftDeleted = false) {
if (dids.length === 0) return new Map()
const profileUris = dids.map((did) =>
AtUri.make(did, ids.AppBskyActorProfile, 'self').toString(),
)
const { ref } = this.db.db.dynamic
const res = await this.db.db
.selectFrom('record')
.innerJoin('actor', 'actor.did', 'record.did')
.if(!includeSoftDeleted, (qb) =>
qb.where(notSoftDeletedClause(ref('actor'))),
)
.where('uri', 'in', profileUris)
.select(['record.did', 'record.json'])
.execute()
return res.reduce((acc, cur) => {
return acc.set(cur.did, JSON.parse(cur.json))
}, new Map<string, JSON>())
}

async getSearchResults({
cursor,
limit = 25,
Expand Down
24 changes: 14 additions & 10 deletions packages/dev-env/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ export class TestNetwork extends TestNetworkNoAppView {

const plc = await TestPlc.create(params.plc ?? {})

let ozone: TestOzone | undefined = undefined
if (params.ozone?.enabled) {
ozone = await TestOzone.create({
plcUrl: plc.url,
dbPostgresSchema: `ozone_${dbPostgresSchema}`,
dbPrimaryPostgresUrl: dbPostgresUrl,
...params.ozone,
})
}

const bskyPort = params.bsky?.port ?? (await getPort())
const pdsPort = params.pds?.port ?? (await getPort())
const bsky = await TestBsky.create({
Expand All @@ -59,6 +49,19 @@ export class TestNetwork extends TestNetworkNoAppView {
moderationPushUrl: `http://admin:${ADMIN_PASSWORD}@localhost:${pdsPort}`,
...params.bsky,
})

let ozone: TestOzone | undefined = undefined
if (params.ozone?.enabled) {
ozone = await TestOzone.create({
plcUrl: plc.url,
dbPostgresSchema: `ozone_${dbPostgresSchema}`,
dbPrimaryPostgresUrl: dbPostgresUrl,
appviewUrl: bsky.url,
moderationPushUrl: `http://admin:${ADMIN_PASSWORD}@localhost:${pdsPort}`, // @TODO fix this
...params.ozone,
})
}

const pds = await TestPds.create({
port: pdsPort,
didPlcUrl: plc.url,
Expand Down Expand Up @@ -126,6 +129,7 @@ export class TestNetwork extends TestNetworkNoAppView {

async close() {
await Promise.all(this.feedGens.map((fg) => fg.close()))
await this.ozone?.close()
await this.bsky.close()
await this.pds.close()
await this.plc.close()
Expand Down
14 changes: 6 additions & 8 deletions packages/dev-env/src/ozone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ export class TestOzone {
labelCacheStaleTTL: 30 * SECOND,
labelCacheMaxTTL: MINUTE,
...cfg,
// Each test suite gets its own lock id for the repo subscription
adminPassword: ADMIN_PASSWORD,
moderatorPassword: MOD_PASSWORD,
triagePassword: TRIAGE_PASSWORD,
Expand All @@ -49,13 +48,6 @@ export class TestOzone {
rateLimitsEnabled: false,
})

// shared across server, ingester, and indexer in order to share pool, avoid too many pg connections.
const db = new ozone.Database({
schema: cfg.dbPostgresSchema,
url: cfg.dbPrimaryPostgresUrl,
poolSize: 10,
})

// Separate migration db in case migration changes some connection state that we need in the tests, e.g. "alter database ... set ..."
const migrationDb = new ozone.Database({
schema: cfg.dbPostgresSchema,
Expand All @@ -68,6 +60,12 @@ export class TestOzone {
}
await migrationDb.close()

const db = new ozone.Database({
schema: cfg.dbPostgresSchema,
url: cfg.dbPrimaryPostgresUrl,
poolSize: 10,
})

// api server
const server = ozone.OzoneService.create({
db,
Expand Down
1 change: 1 addition & 0 deletions packages/dev-env/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export type BskyConfig = Partial<bsky.ServerConfig> & {
export type OzoneConfig = Partial<ozone.ServerConfig> & {
enabled?: boolean
plcUrl: string
appviewUrl: string
dbPrimaryPostgresUrl: string
migration?: string
}
Expand Down
8 changes: 8 additions & 0 deletions packages/ozone/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface ServerConfigValues {
publicUrl?: string
serverDid: string
feedGenDid?: string
appviewUrl: string
dbPrimaryPostgresUrl: string
dbReplicaPostgresUrls?: string[]
dbReplicaTags?: Record<string, number[]> // E.g. { timeline: [0], thread: [1] }
Expand Down Expand Up @@ -53,6 +54,8 @@ export class ServerConfig {
const feedGenDid = process.env.FEED_GEN_DID
const envPort = parseInt(process.env.PORT || '', 10)
const port = isNaN(envPort) ? 2584 : envPort
const appviewUrl = process.env.APPVIEW_URL
assert(appviewUrl)
const redisHost =
overrides?.redisHost || process.env.REDIS_HOST || undefined
const redisSentinelName =
Expand Down Expand Up @@ -133,6 +136,7 @@ export class ServerConfig {
publicUrl,
serverDid,
feedGenDid,
appviewUrl,
dbPrimaryPostgresUrl,
dbReplicaPostgresUrls,
dbReplicaTags,
Expand Down Expand Up @@ -199,6 +203,10 @@ export class ServerConfig {
return this.cfg.feedGenDid
}

get appviewUrl() {
return this.cfg.appviewUrl
}

get dbPrimaryPostgresUrl() {
return this.cfg.dbPrimaryPostgresUrl
}
Expand Down
45 changes: 38 additions & 7 deletions packages/ozone/src/db/migrations/20231219T205730722Z-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,50 @@ export async function up(db: Kysely<unknown>): Promise<void> {
.column('uri')
.execute()

// PushEvent
// Push Events
await db.schema
.createTable('push_event')
.createTable('repo_push_event')
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('subjectDid', 'varchar', (col) => col.notNull())
.addColumn('subjectUri', 'varchar')
.addColumn('takedownId', 'integer')
.addColumn('confirmedAt', 'varchar')
.addPrimaryKeyConstraint('repo_push_event_pkey', [
'subjectDid',
'eventType',
])
.execute()

await db.schema
.createTable('record_push_event')
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('subjectDid', 'varchar', (col) => col.notNull())
.addColumn('subjectUri', 'varchar', (col) => col.notNull())
.addColumn('subjectCid', 'varchar')
.addColumn('subjectBlobCid', 'varchar')
.addColumn('takedownId', 'integer')
.addColumn('confirmedAt', 'varchar')
.addPrimaryKeyConstraint('push_event_pkey', [
.addPrimaryKeyConstraint('record_push_event_pkey', [
'subjectUri',
'eventType',
])
.execute()
await db.schema
.createIndex('record_push_event_did_type_idx')
.on('record_push_event')
.columns(['subjectDid', 'eventType'])
.execute()

await db.schema
.createTable('blob_push_event')
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('subjectDid', 'varchar', (col) => col.notNull())
.addColumn('subjectBlobCid', 'varchar', (col) => col.notNull())
.addColumn('subjectUri', 'varchar')
.addColumn('takedownId', 'integer')
.addColumn('confirmedAt', 'varchar')
.addPrimaryKeyConstraint('blob_push_event_pkey', [
'subjectDid',
'subjectUri',
'subjectBlobCid',
'eventType',
])
.execute()
}
Expand All @@ -103,5 +132,7 @@ export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('moderation_event').execute()
await db.schema.dropTable('moderation_subject_status').execute()
await db.schema.dropTable('label').execute()
await db.schema.dropTable('push_event').execute()
await db.schema.dropTable('repo_push_event').execute()
await db.schema.dropTable('record_push_event').execute()
await db.schema.dropTable('blob_push_event').execute()
}
16 changes: 16 additions & 0 deletions packages/ozone/src/db/schema/blob_push_event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const eventTableName = 'blob_push_event'

export type BlobPushEventType = 'takedown'

export interface BlobPushEvent {
eventType: BlobPushEventType
subjectDid: string
subjectBlobCid: string
subjectUri: string | null
takedownId: number | null
confirmedAt: string | null
}

export type PartialDB = {
[eventTableName]: BlobPushEvent
}
10 changes: 7 additions & 3 deletions packages/ozone/src/db/schema/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { Kysely } from 'kysely'
import * as modEvent from './moderation_event'
import * as modSubjectStatus from './moderation_subject_status'
import * as pushEvent from './push_event'
import * as repoPushEvent from './repo_push_event'
import * as recordPushEvent from './record_push_event'
import * as blobPushEvent from './blob_push_event'
import * as label from './label'

export type DatabaseSchemaType = modEvent.PartialDB &
modSubjectStatus.PartialDB &
pushEvent.PartialDB &
label.PartialDB
label.PartialDB &
repoPushEvent.PartialDB &
recordPushEvent.PartialDB &
blobPushEvent.PartialDB

export type DatabaseSchema = Kysely<DatabaseSchemaType>

Expand Down
20 changes: 0 additions & 20 deletions packages/ozone/src/db/schema/push_event.ts

This file was deleted.

16 changes: 16 additions & 0 deletions packages/ozone/src/db/schema/record_push_event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const eventTableName = 'record_push_event'

export type RecordPushEventType = 'takedown'

export interface RecordPushEvent {
eventType: RecordPushEventType
subjectDid: string
subjectUri: string
subjectCid: string | null
takedownId: number | null
confirmedAt: string | null
}

export type PartialDB = {
[eventTableName]: RecordPushEvent
}
14 changes: 14 additions & 0 deletions packages/ozone/src/db/schema/repo_push_event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export const eventTableName = 'repo_push_event'

export type RepoPushEventType = 'takedown'

export interface RepoPushEvent {
eventType: RepoPushEventType
subjectDid: string
takedownId: number | null
confirmedAt: string | null
}

export type PartialDB = {
[eventTableName]: RepoPushEvent
}
Loading

0 comments on commit b121af1

Please sign in to comment.