Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Oct 3, 2023
1 parent 99862e3 commit f9e9096
Show file tree
Hide file tree
Showing 31 changed files with 433 additions and 474 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import assert from 'assert'
import path from 'path'
import {
Kysely,
SqliteDialect,
Expand All @@ -14,38 +15,38 @@ import SqliteDB from 'better-sqlite3'
import { DatabaseSchema } from './schema'
import * as migrations from './migrations'
import { CtxMigrationProvider } from './migrations/provider'
export * from './schema'

export class UserDb {
type CommitHook = () => void

export class ActorDb {
migrator: Migrator
destroyed = false
commitHooks: CommitHook[] = []

constructor(public db: Kysely<DatabaseSchema>) {
constructor(public did: string, public db: Kysely<DatabaseSchema>) {
this.migrator = new Migrator({
db,
provider: new CtxMigrationProvider(migrations),
})
}

static sqlite(location: string): UserDb {
static sqlite(location: string, did: string): ActorDb {
const db = new Kysely<DatabaseSchema>({
dialect: new SqliteDialect({
database: new SqliteDB(location),
database: new SqliteDB(path.join(location, did)),
}),
})
return new UserDb(db)
}

static memory(): UserDb {
return UserDb.sqlite(':memory:')
return new ActorDb(did, db)
}

async transaction<T>(fn: (db: UserDb) => Promise<T>): Promise<T> {
async transaction<T>(fn: (db: ActorDb) => Promise<T>): Promise<T> {
const leakyTxPlugin = new LeakyTxPlugin()
return this.db
const { hooks, txRes } = await this.db
.withPlugin(leakyTxPlugin)
.transaction()
.execute(async (txn) => {
const dbTxn = new UserDb(txn)
const dbTxn = new ActorDb(this.did, txn)
const txRes = await fn(dbTxn)
.catch(async (err) => {
leakyTxPlugin.endTx()
Expand All @@ -54,8 +55,16 @@ export class UserDb {
throw err
})
.finally(() => leakyTxPlugin.endTx())
return txRes
const hooks = dbTxn.commitHooks
return { hooks, txRes }
})
hooks.map((hook) => hook())
return txRes
}

onCommit(fn: () => void) {
this.assertTransaction()
this.commitHooks.push(fn)
}

get isTransaction() {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ import * as ipldBlock from './ipld-block'
import * as blob from './blob'
import * as repoBlob from './repo-blob'

export type { UserPref } from './user-pref'
export type { RepoRoot } from './repo-root'
export type { Record } from './record'
export type { Backlink } from './backlink'
export type { IpldBlock } from './ipld-block'
export type { Blob } from './blob'
export type { RepoBlob } from './repo-blob'

export type DatabaseSchema = userPref.PartialDB &
repoRoot.PartialDB &
record.PartialDB &
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ export interface RepoRoot {
indexedAt: string
}

export const tableName = 'repo_root'
const tableName = 'repo_root'

export type PartialDB = { [tableName]: RepoRoot }
94 changes: 94 additions & 0 deletions packages/pds/src/actor-store/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { AtpAgent } from '@atproto/api'
import * as crypto from '@atproto/crypto'
import { BlobStore } from '@atproto/repo'
import { ActorDb } from './actor-db'
import { ActorRepo } from './repo'
import { ActorRecord } from './record'
import { ActorLocal } from './local'
import { ActorPreference } from './preference'
import { BackgroundQueue } from '../background'

type ActorStoreReaderResources = {
repoSigningKey: crypto.Keypair
pdsHostname: string
appViewAgent?: AtpAgent
appViewDid?: string
appViewCdnUrlPattern?: string
}

type ActorStoreResources = ActorStoreReaderResources & {
blobstore: BlobStore
backgroundQueue: BackgroundQueue
}

export const createActorStore = (
resources: ActorStoreResources,
): ActorStore => {
return {
reader: (did: string) => {
const db = ActorDb.sqlite('', did)
return createActorReader(db, resources)
},
transact: <T>(did: string, fn: ActorStoreTransactFn<T>) => {
const db = ActorDb.sqlite('', did)
return db.transaction((dbTxn) => {
const store = createActorTransactor(dbTxn, resources)
return fn(store)
})
},
}
}

const createActorTransactor = (
db: ActorDb,
resources: ActorStoreResources,
): ActorStoreTransactor => {
const { repoSigningKey, blobstore, backgroundQueue } = resources
const reader = createActorReader(db, resources)
return {
...reader,
repo: new ActorRepo(db, repoSigningKey, blobstore, backgroundQueue),
}
}

const createActorReader = (
db: ActorDb,
resources: ActorStoreReaderResources,
): ActorStoreReader => {
const {
repoSigningKey,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
} = resources
return {
record: new ActorRecord(db),
local: new ActorLocal(
db,
repoSigningKey,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
),
pref: new ActorPreference(db),
}
}

export type ActorStore = {
reader: (did: string) => ActorStoreReader
transact: <T>(did: string, store: ActorStoreTransactFn<T>) => Promise<T>
}

export type ActorStoreTransactFn<T> = (fn: ActorStoreTransactor) => Promise<T>

export type ActorStoreTransactor = ActorStoreReader & {
repo: ActorRepo
}

export type ActorStoreReader = {
record: ActorRecord
local: ActorLocal
pref: ActorPreference
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ import {
import { AtpAgent } from '@atproto/api'
import { Keypair } from '@atproto/crypto'
import { createServiceAuthHeaders } from '@atproto/xrpc-server'
import { UserDb } from '../user-db'
import { ActorDb } from './actor-db'

type CommonSignedUris = 'avatar' | 'banner' | 'feed_thumbnail' | 'feed_fullsize'

export class LocalService {
export class ActorLocal {
constructor(
public db: UserDb,
public db: ActorDb,
public signingKey: Keypair,
public pdsHostname: string,
public appViewAgent?: AtpAgent,
Expand All @@ -52,8 +52,8 @@ export class LocalService {
appviewDid?: string,
appviewCdnUrlPattern?: string,
) {
return (db: UserDb) =>
new LocalService(
return (db: ActorDb) =>
new ActorLocal(
db,
signingKey,
pdsHostname,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { UserDb } from '../user-db'
import { ActorDb } from './actor-db'

export class PreferencesService {
constructor(public db: UserDb) {}
export class ActorPreference {
constructor(public db: ActorDb) {}

static creator() {
return (db: UserDb) => new PreferencesService(db)
return (db: ActorDb) => new ActorPreference(db)
}

async getPreferences(
Expand Down
1 change: 1 addition & 0 deletions packages/pds/src/actor-store/reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class ActorStoreReader {}
Loading

0 comments on commit f9e9096

Please sign in to comment.