Skip to content

Commit

Permalink
rework readers & transactors
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Oct 3, 2023
1 parent f9e9096 commit 82fd36a
Show file tree
Hide file tree
Showing 25 changed files with 342 additions and 581 deletions.
61 changes: 61 additions & 0 deletions packages/pds/src/actor-store/blob/reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import stream from 'stream'
import { CID } from 'multiformats/cid'
import { BlobNotFoundError, BlobStore } from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { ActorDb } from '../actor-db'
import { notSoftDeletedClause } from '../../db/util'

export class BlobReader {
constructor(public db: ActorDb, public blobstore: BlobStore) {}

async getBlob(
cid: CID,
): Promise<{ size: number; mimeType?: string; stream: stream.Readable }> {
const { ref } = this.db.db.dynamic
const found = await this.db.db
.selectFrom('blob')
.selectAll()
.innerJoin('repo_blob', 'repo_blob.cid', 'blob.cid')
.where('blob.cid', '=', cid.toString())
.where(notSoftDeletedClause(ref('repo_blob')))
.executeTakeFirst()
if (!found) {
throw new InvalidRequestError('Blob not found')
}
let blobStream
try {
blobStream = await this.blobstore.getStream(cid)
} catch (err) {
if (err instanceof BlobNotFoundError) {
throw new InvalidRequestError('Blob not found')
}
throw err
}
return {
size: found.size,
mimeType: found.mimeType,
stream: blobStream,
}
}

async listBlobs(opts: {
since?: string
cursor?: string
limit: number
}): Promise<string[]> {
const { since, cursor, limit } = opts
let builder = this.db.db
.selectFrom('repo_blob')
.select('cid')
.orderBy('cid', 'asc')
.limit(limit)
if (since) {
builder = builder.where('repoRev', '>', since)
}
if (cursor) {
builder = builder.where('cid', '>', cursor)
}
const res = await builder.execute()
return res.map((row) => row.cid)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import {
} from '../../repo/types'
import * as img from '../../image'
import { BackgroundQueue } from '../../background'
import { BlobReader } from './reader'

export class ActorBlob {
export class BlobTransactor extends BlobReader {
constructor(
public db: ActorDb,
public blobstore: BlobStore,
public backgroundQueue: BackgroundQueue,
) {}
) {
super(db, blobstore)
}

async addUntetheredBlob(
userSuggestedMime: string,
Expand Down
75 changes: 52 additions & 23 deletions packages/pds/src/actor-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,31 @@ import { AtpAgent } from '@atproto/api'
import * as crypto from '@atproto/crypto'
import { BlobStore } from '@atproto/repo'
import { ActorDb } from './actor-db'
import { ActorRepo } from './repo'
import { ActorRecord } from './record'
import { ActorLocal } from './local'
import { ActorPreference } from './preference'
import { BackgroundQueue } from '../background'
import { RecordReader } from './record/reader'
import { LocalReader } from './local/reader'
import { PreferenceReader } from './preference/reader'
import { RepoReader } from './repo/reader'
import { RepoTransactor } from './repo/transactor'
import { PreferenceTransactor } from './preference/preference'

type ActorStoreReaderResources = {
type ActorStoreResources = {
repoSigningKey: crypto.Keypair
blobstore: BlobStore
backgroundQueue: BackgroundQueue
pdsHostname: string
appViewAgent?: AtpAgent
appViewDid?: string
appViewCdnUrlPattern?: string
}

type ActorStoreResources = ActorStoreReaderResources & {
blobstore: BlobStore
backgroundQueue: BackgroundQueue
}

export const createActorStore = (
resources: ActorStoreResources,
): ActorStore => {
return {
db: (did: string) => {
return ActorDb.sqlite('', did)
},
reader: (did: string) => {
const db = ActorDb.sqlite('', did)
return createActorReader(db, resources)
Expand All @@ -43,52 +45,79 @@ const createActorTransactor = (
db: ActorDb,
resources: ActorStoreResources,
): ActorStoreTransactor => {
const { repoSigningKey, blobstore, backgroundQueue } = resources
const reader = createActorReader(db, resources)
const {
repoSigningKey,
blobstore,
backgroundQueue,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
} = resources
return {
...reader,
repo: new ActorRepo(db, repoSigningKey, blobstore, backgroundQueue),
db,
repo: new RepoTransactor(db, repoSigningKey, blobstore, backgroundQueue),
record: new RecordReader(db),
local: new LocalReader(
db,
repoSigningKey,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
),
pref: new PreferenceTransactor(db),
}
}

const createActorReader = (
db: ActorDb,
resources: ActorStoreReaderResources,
resources: ActorStoreResources,
): ActorStoreReader => {
const {
repoSigningKey,
blobstore,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
} = resources
return {
record: new ActorRecord(db),
local: new ActorLocal(
db,
repo: new RepoReader(db, blobstore),
record: new RecordReader(db),
local: new LocalReader(
db,
repoSigningKey,
pdsHostname,
appViewAgent,
appViewDid,
appViewCdnUrlPattern,
),
pref: new ActorPreference(db),
pref: new PreferenceReader(db),
}
}

export type ActorStore = {
db: (did: string) => ActorDb
reader: (did: string) => ActorStoreReader
transact: <T>(did: string, store: ActorStoreTransactFn<T>) => Promise<T>
}

export type ActorStoreTransactFn<T> = (fn: ActorStoreTransactor) => Promise<T>

export type ActorStoreTransactor = ActorStoreReader & {
repo: ActorRepo
export type ActorStoreTransactor = {
db: ActorDb
repo: RepoTransactor
record: RecordReader
local: LocalReader
pref: PreferenceTransactor
}

export type ActorStoreReader = {
record: ActorRecord
local: ActorLocal
pref: ActorPreference
db: ActorDb
repo: RepoReader
record: RecordReader
local: LocalReader
pref: PreferenceReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,40 @@ import util from 'util'
import { CID } from 'multiformats/cid'
import { AtUri } from '@atproto/syntax'
import { cborToLexRecord } from '@atproto/repo'
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post'
import { Record as ProfileRecord } from '../lexicon/types/app/bsky/actor/profile'
import { ids } from '../lexicon/lexicons'
import { AtpAgent } from '@atproto/api'
import { Keypair } from '@atproto/crypto'
import { createServiceAuthHeaders } from '@atproto/xrpc-server'
import { Record as PostRecord } from '../../lexicon/types/app/bsky/feed/post'
import { Record as ProfileRecord } from '../../lexicon/types/app/bsky/actor/profile'
import { ids } from '../../lexicon/lexicons'
import {
ProfileViewBasic,
ProfileView,
ProfileViewDetailed,
} from '../lexicon/types/app/bsky/actor/defs'
import { FeedViewPost, PostView } from '../lexicon/types/app/bsky/feed/defs'
} from '../../lexicon/types/app/bsky/actor/defs'
import { FeedViewPost, PostView } from '../../lexicon/types/app/bsky/feed/defs'
import {
Main as EmbedImages,
isMain as isEmbedImages,
} from '../lexicon/types/app/bsky/embed/images'
} from '../../lexicon/types/app/bsky/embed/images'
import {
Main as EmbedExternal,
isMain as isEmbedExternal,
} from '../lexicon/types/app/bsky/embed/external'
} from '../../lexicon/types/app/bsky/embed/external'
import {
Main as EmbedRecord,
isMain as isEmbedRecord,
View as EmbedRecordView,
} from '../lexicon/types/app/bsky/embed/record'
} from '../../lexicon/types/app/bsky/embed/record'
import {
Main as EmbedRecordWithMedia,
isMain as isEmbedRecordWithMedia,
} from '../lexicon/types/app/bsky/embed/recordWithMedia'
import { AtpAgent } from '@atproto/api'
import { Keypair } from '@atproto/crypto'
import { createServiceAuthHeaders } from '@atproto/xrpc-server'
import { ActorDb } from './actor-db'
} from '../../lexicon/types/app/bsky/embed/recordWithMedia'
import { ActorDb } from '../actor-db'

type CommonSignedUris = 'avatar' | 'banner' | 'feed_thumbnail' | 'feed_fullsize'

export class ActorLocal {
export class LocalReader {
constructor(
public db: ActorDb,
public signingKey: Keypair,
Expand All @@ -45,24 +45,6 @@ export class ActorLocal {
public appviewCdnUrlPattern?: string,
) {}

static creator(
signingKey: Keypair,
pdsHostname: string,
appViewAgent?: AtpAgent,
appviewDid?: string,
appviewCdnUrlPattern?: string,
) {
return (db: ActorDb) =>
new ActorLocal(
db,
signingKey,
pdsHostname,
appViewAgent,
appviewDid,
appviewCdnUrlPattern,
)
}

getImageUrl(pattern: CommonSignedUris, did: string, cid: string) {
if (!this.appviewCdnUrlPattern) {
return `https://${this.pdsHostname}/xrpc/${ids.ComAtprotoSyncGetBlob}?did=${did}&cid=${cid}`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,13 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { ActorDb } from './actor-db'

export class ActorPreference {
constructor(public db: ActorDb) {}

static creator() {
return (db: ActorDb) => new ActorPreference(db)
}

async getPreferences(
did: string,
namespace?: string,
): Promise<UserPreference[]> {
const prefsRes = await this.db.db
.selectFrom('user_pref')
.orderBy('id')
.selectAll()
.execute()
return prefsRes
.filter((pref) => !namespace || matchNamespace(namespace, pref.name))
.map((pref) => JSON.parse(pref.valueJson))
}
import { PreferenceReader, UserPreference, prefMatchNamespace } from './reader'

export class PreferenceTransactor extends PreferenceReader {
async putPreferences(
did: string,
values: UserPreference[],
namespace: string,
): Promise<void> {
this.db.assertTransaction()
if (!values.every((value) => matchNamespace(namespace, value.$type))) {
if (!values.every((value) => prefMatchNamespace(namespace, value.$type))) {
throw new InvalidRequestError(
`Some preferences are not in the ${namespace} namespace`,
)
Expand All @@ -40,13 +19,12 @@ export class ActorPreference {
.execute()
const putPrefs = values.map((value) => {
return {
did,
name: value.$type,
valueJson: JSON.stringify(value),
}
})
const allPrefIdsInNamespace = allPrefs
.filter((pref) => matchNamespace(namespace, pref.name))
.filter((pref) => prefMatchNamespace(namespace, pref.name))
.map((pref) => pref.id)
// replace all prefs in given namespace
if (allPrefIdsInNamespace.length) {
Expand All @@ -60,9 +38,3 @@ export class ActorPreference {
}
}
}

export type UserPreference = Record<string, unknown> & { $type: string }

const matchNamespace = (namespace: string, fullname: string) => {
return fullname === namespace || fullname.startsWith(`${namespace}.`)
}
22 changes: 22 additions & 0 deletions packages/pds/src/actor-store/preference/reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { ActorDb } from '../actor-db'

export class PreferenceReader {
constructor(public db: ActorDb) {}

async getPreferences(namespace?: string): Promise<UserPreference[]> {
const prefsRes = await this.db.db
.selectFrom('user_pref')
.orderBy('id')
.selectAll()
.execute()
return prefsRes
.filter((pref) => !namespace || prefMatchNamespace(namespace, pref.name))
.map((pref) => JSON.parse(pref.valueJson))
}
}

export type UserPreference = Record<string, unknown> & { $type: string }

export const prefMatchNamespace = (namespace: string, fullname: string) => {
return fullname === namespace || fullname.startsWith(`${namespace}.`)
}
1 change: 0 additions & 1 deletion packages/pds/src/actor-store/reader.ts

This file was deleted.

Loading

0 comments on commit 82fd36a

Please sign in to comment.