Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Oct 3, 2023
1 parent 7d1e3b2 commit 99862e3
Show file tree
Hide file tree
Showing 25 changed files with 596 additions and 242 deletions.
5 changes: 5 additions & 0 deletions packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import { Crawlers } from './crawlers'
import { DiskBlobStore } from './storage'
import { getRedisClient } from './redis'
import { RuntimeFlags } from './runtime-flags'
import { UserDbCoordinator } from './user-db'

export type AppContextOptions = {
db: Database
userDb: UserDbCoordinator
blobstore: BlobStore
mailer: ServerMailer
moderationMailer: ModerationMailer
Expand All @@ -46,6 +48,7 @@ export type AppContextOptions = {

export class AppContext {
public db: Database
public userDb: UserDbCoordinator
public blobstore: BlobStore
public mailer: ServerMailer
public moderationMailer: ModerationMailer
Expand Down Expand Up @@ -102,6 +105,7 @@ export class AppContext {
poolMaxUses: cfg.db.pool.maxUses,
poolIdleTimeoutMs: cfg.db.pool.idleTimeoutMs,
})
const userDb = new UserDbCoordinator('./users')
const blobstore =
cfg.blobstore.provider === 's3'
? new S3BlobStore({ bucket: cfg.blobstore.bucket })
Expand Down Expand Up @@ -190,6 +194,7 @@ export class AppContext {

return new AppContext({
db,
userDb,
blobstore,
mailer,
moderationMailer,
Expand Down
12 changes: 0 additions & 12 deletions packages/pds/src/db/database-schema.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import { Kysely } from 'kysely'
import * as userAccount from './tables/user-account'
import * as userPref from './tables/user-pref'
import * as didHandle from './tables/did-handle'
import * as repoRoot from './tables/repo-root'
import * as didCache from './tables/did-cache'
import * as refreshToken from './tables/refresh-token'
import * as appPassword from './tables/app-password'
import * as record from './tables/record'
import * as backlink from './tables/backlink'
import * as ipldBlock from './tables/ipld-block'
import * as inviteCode from './tables/invite-code'
import * as blob from './tables/blob'
import * as repoBlob from './tables/repo-blob'
import * as emailToken from './tables/email-token'
import * as moderation from './tables/moderation'
import * as repoSeq from './tables/repo-seq'
Expand All @@ -21,18 +15,12 @@ import * as runtimeFlag from './tables/runtime-flag'
export type DatabaseSchemaType = appMigration.PartialDB &
runtimeFlag.PartialDB &
userAccount.PartialDB &
userPref.PartialDB &
didHandle.PartialDB &
refreshToken.PartialDB &
appPassword.PartialDB &
repoRoot.PartialDB &
didCache.PartialDB &
record.PartialDB &
backlink.PartialDB &
ipldBlock.PartialDB &
inviteCode.PartialDB &
blob.PartialDB &
repoBlob.PartialDB &
emailToken.PartialDB &
moderation.PartialDB &
repoSeq.PartialDB
Expand Down
1 change: 1 addition & 0 deletions packages/pds/src/db/tables/user-account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Generated, Selectable } from 'kysely'

export interface UserAccount {
did: string
handle: string | null
email: string
passwordScrypt: string
createdAt: string
Expand Down
70 changes: 0 additions & 70 deletions packages/pds/src/services/account/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,74 +524,8 @@ export class AccountService {
}
return res.did
}

async getPreferences(
did: string,
namespace?: string,
): Promise<UserPreference[]> {
const prefsRes = await this.db.db
.selectFrom('user_pref')
.where('did', '=', did)
.orderBy('id')
.selectAll()
.execute()
return prefsRes
.filter((pref) => !namespace || matchNamespace(namespace, pref.name))
.map((pref) => JSON.parse(pref.valueJson))
}

async putPreferences(
did: string,
values: UserPreference[],
namespace: string,
): Promise<void> {
this.db.assertTransaction()
if (!values.every((value) => matchNamespace(namespace, value.$type))) {
throw new InvalidRequestError(
`Some preferences are not in the ${namespace} namespace`,
)
}
// short-held row lock to prevent races
if (this.db.dialect === 'pg') {
await this.db.db
.selectFrom('user_account')
.selectAll()
.forUpdate()
.where('did', '=', did)
.executeTakeFirst()
}
// get all current prefs for user and prep new pref rows
const allPrefs = await this.db.db
.selectFrom('user_pref')
.where('did', '=', did)
.select(['id', 'name'])
.execute()
const putPrefs = values.map((value) => {
return {
did,
name: value.$type,
valueJson: JSON.stringify(value),
}
})
const allPrefIdsInNamespace = allPrefs
.filter((pref) => matchNamespace(namespace, pref.name))
.map((pref) => pref.id)
// replace all prefs in given namespace
if (allPrefIdsInNamespace.length) {
await this.db.db
.deleteFrom('user_pref')
.where('did', '=', did)
.where('id', 'in', allPrefIdsInNamespace)
.execute()
}
if (putPrefs.length) {
await this.db.db.insertInto('user_pref').values(putPrefs).execute()
}
}
}

export type UserPreference = Record<string, unknown> & { $type: string }

export type CodeDetail = {
code: string
available: number
Expand All @@ -618,8 +552,4 @@ export class ListKeyset extends TimeCidKeyset<{
}
}

const matchNamespace = (namespace: string, fullname: string) => {
return fullname === namespace || fullname.startsWith(`${namespace}.`)
}

export type HandleSequenceToken = { did: string; handle: string }
35 changes: 1 addition & 34 deletions packages/pds/src/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@ import { BlobStore } from '@atproto/repo'
import Database from '../db'
import { AccountService } from './account'
import { AuthService } from './auth'
import { RecordService } from './record'
import { RepoService } from './repo'
import { ModerationService } from './moderation'
import { BackgroundQueue } from '../background'
import { Crawlers } from '../crawlers'
import { LocalService } from './local'

export function createServices(resources: {
repoSigningKey: crypto.Keypair
Expand All @@ -18,46 +13,18 @@ export function createServices(resources: {
appViewAgent?: AtpAgent
appViewDid?: string
appViewCdnUrlPattern?: string
backgroundQueue: BackgroundQueue
crawlers: Crawlers
}): Services {
const {
repoSigningKey,
blobstore,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
backgroundQueue,
crawlers,
} = resources
const { blobstore } = resources
return {
account: AccountService.creator(),
auth: AuthService.creator(),
record: RecordService.creator(),
repo: RepoService.creator(
repoSigningKey,
blobstore,
backgroundQueue,
crawlers,
),
local: LocalService.creator(
repoSigningKey,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
),
moderation: ModerationService.creator(blobstore),
}
}

export type Services = {
account: FromDb<AccountService>
auth: FromDb<AuthService>
record: FromDb<RecordService>
repo: FromDb<RepoService>
local: FromDb<LocalService>
moderation: FromDb<ModerationService>
}

Expand Down
121 changes: 121 additions & 0 deletions packages/pds/src/user-db/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import assert from 'assert'
import {
Kysely,
SqliteDialect,
Migrator,
KyselyPlugin,
PluginTransformQueryArgs,
PluginTransformResultArgs,
RootOperationNode,
QueryResult,
UnknownRow,
} from 'kysely'
import SqliteDB from 'better-sqlite3'
import { DatabaseSchema } from './schema'
import * as migrations from './migrations'
import { CtxMigrationProvider } from './migrations/provider'

export class UserDb {
migrator: Migrator
destroyed = false

constructor(public db: Kysely<DatabaseSchema>) {
this.migrator = new Migrator({
db,
provider: new CtxMigrationProvider(migrations),
})
}

static sqlite(location: string): UserDb {
const db = new Kysely<DatabaseSchema>({
dialect: new SqliteDialect({
database: new SqliteDB(location),
}),
})
return new UserDb(db)
}

static memory(): UserDb {
return UserDb.sqlite(':memory:')
}

async transaction<T>(fn: (db: UserDb) => Promise<T>): Promise<T> {
const leakyTxPlugin = new LeakyTxPlugin()
return this.db
.withPlugin(leakyTxPlugin)
.transaction()
.execute(async (txn) => {
const dbTxn = new UserDb(txn)
const txRes = await fn(dbTxn)
.catch(async (err) => {
leakyTxPlugin.endTx()
// ensure that all in-flight queries are flushed & the connection is open
await dbTxn.db.getExecutor().provideConnection(async () => {})
throw err
})
.finally(() => leakyTxPlugin.endTx())
return txRes
})
}

get isTransaction() {
return this.db.isTransaction
}

assertTransaction() {
assert(this.isTransaction, 'Transaction required')
}

assertNotTransaction() {
assert(!this.isTransaction, 'Cannot be in a transaction')
}

async close(): Promise<void> {
if (this.destroyed) return
await this.db.destroy()
this.destroyed = true
}

async migrateToOrThrow(migration: string) {
const { error, results } = await this.migrator.migrateTo(migration)
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}

async migrateToLatestOrThrow() {
const { error, results } = await this.migrator.migrateToLatest()
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}
}

class LeakyTxPlugin implements KyselyPlugin {
private txOver: boolean

endTx() {
this.txOver = true
}

transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
if (this.txOver) {
throw new Error('tx already failed')
}
return args.node
}

async transformResult(
args: PluginTransformResultArgs,
): Promise<QueryResult<UnknownRow>> {
return args.result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {}

export async function down(db: Kysely<unknown>): Promise<void> {}
5 changes: 5 additions & 0 deletions packages/pds/src/user-db/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// NOTE this file can be edited by hand, but it is also appended to by the migration:create command.
// It's important that every migration is exported from here with the proper name. We'd simplify
// this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process.

export * as _20230613T164932261Z from './20230613T164932261Z-init'
24 changes: 24 additions & 0 deletions packages/pds/src/user-db/migrations/provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Kysely, Migration, MigrationProvider } from 'kysely'

// @TODO remove/cleanup

// Passes a context argument to migrations. We use this to thread the dialect into migrations

export class CtxMigrationProvider implements MigrationProvider {
constructor(private migrations: Record<string, CtxMigration>) {}
async getMigrations(): Promise<Record<string, Migration>> {
const ctxMigrations: Record<string, Migration> = {}
Object.entries(this.migrations).forEach(([name, migration]) => {
ctxMigrations[name] = {
up: async (db) => await migration.up(db),
down: async (db) => await migration.down?.(db),
}
})
return ctxMigrations
}
}

export interface CtxMigration {
up(db: Kysely<unknown>): Promise<void>
down?(db: Kysely<unknown>): Promise<void>
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export interface Blob {
creator: string
cid: string
mimeType: string
size: number
Expand Down
Loading

0 comments on commit 99862e3

Please sign in to comment.