Merge pull request #1705 from bluesky-social/pds-sqlite-refactor
Pds sqlite refactor
dholms authored Nov 1, 2023
2 parents fec3ec7 + 2166add commit 8449ceb
Showing 190 changed files with 3,336 additions and 5,421 deletions.
38 changes: 33 additions & 5 deletions packages/aws/src/s3.ts
@@ -17,7 +17,7 @@ export class S3BlobStore implements BlobStore {
   private client: aws.S3
   private bucket: string
 
-  constructor(cfg: S3Config) {
+  constructor(public did: string, cfg: S3Config) {
     const { bucket, ...rest } = cfg
     this.bucket = bucket
     this.client = new aws.S3({
@@ -26,20 +26,26 @@ export class S3BlobStore implements BlobStore {
     })
   }
 
+  static creator(cfg: S3Config) {
+    return (did: string) => {
+      return new S3BlobStore(did, cfg)
+    }
+  }
+
   private genKey() {
     return randomStr(32, 'base32')
   }
 
   private getTmpPath(key: string): string {
-    return `tmp/${key}`
+    return `tmp/${this.did}/${key}`
   }
 
   private getStoredPath(cid: CID): string {
-    return `blocks/${cid.toString()}`
+    return `blocks/${this.did}/${cid.toString()}`
   }
 
   private getQuarantinedPath(cid: CID): string {
-    return `quarantine/${cid.toString()}`
+    return `quarantine/${this.did}/${cid.toString()}`
   }
 
   async putTemp(bytes: Uint8Array | stream.Readable): Promise<string> {
@@ -122,11 +128,24 @@ export class S3BlobStore implements BlobStore {
     await this.deleteKey(this.getStoredPath(cid))
   }
 
+  async deleteMany(cids: CID[]): Promise<void> {
+    const keys = cids.map((cid) => this.getStoredPath(cid))
+    await this.deleteManyKeys(keys)
+  }
+
   async hasStored(cid: CID): Promise<boolean> {
+    return this.hasKey(this.getStoredPath(cid))
+  }
+
+  async hasTemp(key: string): Promise<boolean> {
+    return this.hasKey(this.getTmpPath(key))
+  }
+
+  private async hasKey(key: string) {
     try {
       const res = await this.client.headObject({
         Bucket: this.bucket,
-        Key: this.getStoredPath(cid),
+        Key: key,
       })
       return res.$metadata.httpStatusCode === 200
     } catch (err) {
@@ -141,6 +160,15 @@ export class S3BlobStore implements BlobStore {
     })
   }
 
+  private async deleteManyKeys(keys: string[]) {
+    await this.client.deleteObjects({
+      Bucket: this.bucket,
+      Delete: {
+        Objects: keys.map((k) => ({ Key: k })),
+      },
+    })
+  }
+
   private async move(keys: { from: string; to: string }) {
     await this.client.copyObject({
       Bucket: this.bucket,
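Since blob storage is now instantiated per actor, callers hold a factory rather than a single shared store, and every S3 key is namespaced by DID. A minimal usage sketch, under the assumption that a bucket name and region are enough to satisfy S3Config (real configs likely also carry credentials):

import { CID } from 'multiformats/cid'
import { S3BlobStore } from '@atproto/aws'

// hypothetical config values
const blobstoreFor = S3BlobStore.creator({
  bucket: 'my-blobs',
  region: 'us-east-1',
})

async function blobExists(did: string, cid: CID): Promise<boolean> {
  // each actor gets its own store; stored blobs land under blocks/<did>/<cid>
  const store = blobstoreFor(did)
  return store.hasStored(cid)
}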
3 changes: 1 addition & 2 deletions packages/bsky/tests/algos/hot-classic.test.ts
@@ -31,7 +31,6 @@ describe('algo hot-classic', () => {
     alice = sc.dids.alice
     bob = sc.dids.bob
     await network.processAll()
-    await network.bsky.processAll()
   })
 
   afterAll(async () => {
@@ -59,7 +58,7 @@ describe('algo hot-classic', () => {
       await sc.like(sc.dids[name], two.ref)
       await sc.like(sc.dids[name], three.ref)
     }
-    await network.bsky.processAll()
+    await network.processAll()
 
     const res = await agent.api.app.bsky.feed.getFeed(
       { feed: feedUri },
2 changes: 1 addition & 1 deletion packages/bsky/tests/auth.test.ts
@@ -36,7 +36,7 @@ describe('auth', () => {
         { headers: { authorization: `Bearer ${jwt}` } },
       )
     }
-    const origSigningKey = network.pds.ctx.repoSigningKey
+    const origSigningKey = await network.pds.ctx.actorStore.keypair(issuer)
     const newSigningKey = await Secp256k1Keypair.create({ exportable: true })
     // confirm original signing key works
     await expect(attemptWithKey(origSigningKey)).resolves.toBeDefined()
39 changes: 19 additions & 20 deletions packages/bsky/tests/auto-moderator/labeler.test.ts
@@ -40,26 +40,25 @@ describe('labeler', () => {
     await usersSeed(sc)
     await network.processAll()
     alice = sc.dids.alice
-    const repoSvc = pdsCtx.services.repo(pdsCtx.db)
-    const storeBlob = async (bytes: Uint8Array) => {
-      const blobRef = await repoSvc.blobs.addUntetheredBlob(
-        alice,
-        'image/jpeg',
-        Readable.from([bytes], { objectMode: false }),
-      )
-      const preparedBlobRef = {
-        cid: blobRef.ref,
-        mimeType: 'image/jpeg',
-        constraints: {},
-      }
-      await repoSvc.blobs.verifyBlobAndMakePermanent(alice, preparedBlobRef)
-      await repoSvc.blobs.associateBlob(
-        preparedBlobRef,
-        postUri(),
-        TID.nextStr(),
-        alice,
-      )
-      return blobRef
+    const storeBlob = (bytes: Uint8Array) => {
+      return pdsCtx.actorStore.transact(alice, async (store) => {
+        const blobRef = await store.repo.blob.addUntetheredBlob(
+          'image/jpeg',
+          Readable.from([bytes], { objectMode: false }),
+        )
+        const preparedBlobRef = {
+          cid: blobRef.ref,
+          mimeType: 'image/jpeg',
+          constraints: {},
+        }
+        await store.repo.blob.verifyBlobAndMakePermanent(preparedBlobRef)
+        await store.repo.blob.associateBlob(
+          preparedBlobRef,
+          postUri(),
+          TID.nextStr(),
+        )
+        return blobRef
+      })
     }
     const bytes1 = new Uint8Array([1, 2, 3, 4])
     const bytes2 = new Uint8Array([5, 6, 7, 8])
6 changes: 4 additions & 2 deletions packages/bsky/tests/auto-moderator/takedowns.test.ts
@@ -93,7 +93,8 @@ describe('takedowner', () => {
       .executeTakeFirst()
     expect(record?.takedownId).toEqual(modAction.id)
 
-    const recordPds = await network.pds.ctx.db.db
+    const actorDb = await network.pds.ctx.actorStore.db(post.ref.uri.hostname)
+    const recordPds = await actorDb.db
       .selectFrom('record')
       .where('uri', '=', post.ref.uriStr)
       .select('takedownRef')
@@ -135,7 +136,8 @@ describe('takedowner', () => {
       .executeTakeFirst()
     expect(record?.takedownId).toEqual(modAction.id)
 
-    const recordPds = await network.pds.ctx.db.db
+    const actorDb = await network.pds.ctx.actorStore.db(alice)
+    const recordPds = await actorDb.db
       .selectFrom('record')
       .where('uri', '=', res.data.uri)
       .select('takedownRef')
6 changes: 4 additions & 2 deletions packages/bsky/tests/blob-resolver.test.ts
@@ -77,8 +77,10 @@ describe('blob resolver', () => {
   })
 
   it('fails on blob with bad signature check.', async () => {
-    await network.pds.ctx.blobstore.delete(fileCid)
-    await network.pds.ctx.blobstore.putPermanent(fileCid, randomBytes(100))
+    await network.pds.ctx.blobstore(fileDid).delete(fileCid)
+    await network.pds.ctx
+      .blobstore(fileDid)
+      .putPermanent(fileCid, randomBytes(100))
     const tryGetBlob = client.get(`/blob/${fileDid}/${fileCid.toString()}`)
     await expect(tryGetBlob).rejects.toThrow(
       'maxContentLength size of -1 exceeded',
2 changes: 1 addition & 1 deletion packages/bsky/tests/handle-invalidation.test.ts
@@ -103,7 +103,7 @@ describe('handle invalidation', () => {
     await backdateIndexedAt(bob)
     // update alices handle so that the pds will let bob take her old handle
     await network.pds.ctx.db.db
-      .updateTable('did_handle')
+      .updateTable('account')
       .where('did', '=', alice)
       .set({ handle: 'not-alice.test' })
       .execute()
13 changes: 11 additions & 2 deletions packages/bsky/tests/indexing.test.ts
@@ -513,9 +513,18 @@
         validate: false,
       }),
     ])
+    const writeCommit = await network.pds.ctx.actorStore.transact(
+      sc.dids.alice,
+      (store) => store.repo.processWrites(writes),
+    )
     await pdsServices
-      .repo(pdsDb)
-      .processWrites({ did: sc.dids.alice, writes }, 1)
+      .account(pdsDb)
+      .updateRepoRoot(sc.dids.alice, writeCommit.cid, writeCommit.rev)
+    await network.pds.ctx.sequencer.sequenceCommit(
+      sc.dids.alice,
+      writeCommit,
+      writes,
+    )
     // Index
     const { data: commit } =
       await pdsAgent.api.com.atproto.sync.getLatestCommit({
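With the repo service gone, this test drives the write path in three explicit steps: process the writes inside the actor's own store, persist the resulting repo root via the account service, then sequence the commit onto the firehose. A condensed sketch of that flow, with loosely typed stand-ins for the objects the test already has in scope:

// minimal sketch, not the pds's actual API surface; `ctx`, `services`, and
// `db` stand in for network.pds.ctx, pdsServices, and pdsDb above
async function applyAndSequence(
  ctx: any,
  services: any,
  db: any,
  did: string,
  writes: any[],
) {
  // 1. apply writes inside the actor's own sqlite store
  const commit = await ctx.actorStore.transact(did, (store: any) =>
    store.repo.processWrites(writes),
  )
  // 2. record the new repo root on the service-wide db
  await services.account(db).updateRepoRoot(did, commit.cid, commit.rev)
  // 3. emit the commit event for indexers to consume
  await ctx.sequencer.sequenceCommit(did, commit, writes)
  return commit
}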
3 changes: 3 additions & 0 deletions packages/bsky/tests/seeds/basic.ts
@@ -103,6 +103,8 @@ export default async (sc: SeedClient, users = true) => {
     'tests/sample-img/key-landscape-small.jpg',
     'image/jpeg',
   )
+  // must ensure ordering of replies in indexing
+  await sc.network.processAll()
   await sc.reply(
     bob,
     sc.posts[alice][1].ref,
@@ -117,6 +119,7 @@ export default async (sc: SeedClient, users = true) => {
     sc.posts[alice][1].ref,
     replies.carol[0],
   )
+  await sc.network.processAll()
   const alicesReplyToBob = await sc.reply(
     alice,
     sc.posts[alice][1].ref,
11 changes: 4 additions & 7 deletions packages/bsky/tests/subscription/repo.test.ts
@@ -1,7 +1,6 @@
 import AtpAgent from '@atproto/api'
 import { TestNetwork, SeedClient } from '@atproto/dev-env'
 import { CommitData } from '@atproto/repo'
-import { RepoService } from '@atproto/pds/src/services/repo'
 import { PreparedWrite } from '@atproto/pds/src/repo'
 import * as sequencer from '@atproto/pds/src/sequencer'
 import { cborDecode, cborEncode } from '@atproto/common'
@@ -84,9 +83,8 @@ describe('sync', () => {
 
   it('indexes actor when commit is unprocessable.', async () => {
     // mock sequencing to create an unprocessable commit event
-    const afterWriteProcessingOriginal =
-      RepoService.prototype.afterWriteProcessing
-    RepoService.prototype.afterWriteProcessing = async function (
+    const sequenceCommitOrig = network.pds.ctx.sequencer.sequenceCommit
+    network.pds.ctx.sequencer.sequenceCommit = async function (
       did: string,
       commitData: CommitData,
       writes: PreparedWrite[],
@@ -95,20 +93,19 @@
       const evt = cborDecode(seqEvt.event) as sequencer.CommitEvt
       evt.blocks = new Uint8Array() // bad blocks
       seqEvt.event = cborEncode(evt)
-      await sequencer.sequenceEvt(this.db, seqEvt)
+      await network.pds.ctx.sequencer.sequenceEvt(seqEvt)
     }
     // create account and index the initial commit event
     await sc.createAccount('jack', {
       handle: 'jack.test',
      email: 'jack@test.com',
       password: 'password',
     })
-    await network.pds.ctx.sequencerLeader?.isCaughtUp()
     await network.processAll()
     // confirm jack was indexed as an actor despite the bad event
     const actors = await dumpTable(ctx.db.getPrimary(), 'actor', ['did'])
     expect(actors.map((a) => a.handle)).toContain('jack.test')
-    RepoService.prototype.afterWriteProcessing = afterWriteProcessingOriginal
+    network.pds.ctx.sequencer.sequenceCommit = sequenceCommitOrig
   })
 
   async function updateProfile(
11 changes: 11 additions & 0 deletions packages/bsky/tests/views/threadgating.test.ts
@@ -36,6 +36,7 @@ describe('views with thread gating', () => {
       { post: post.ref.uriStr, createdAt: iso(), allow: [] },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
    await sc.reply(sc.dids.alice, post.ref, post.ref, 'empty rules reply')
     await network.processAll()
     const {
@@ -78,6 +79,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     await sc.reply(
       sc.dids.alice,
       post.ref,
@@ -125,6 +127,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     // carol only follows alice
     await sc.reply(
       sc.dids.dan,
@@ -213,6 +216,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     //
     await sc.reply(sc.dids.bob, post.ref, post.ref, 'list rule reply disallow')
     const aliceReply = await sc.reply(
@@ -277,6 +281,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     await sc.reply(
       sc.dids.alice,
       post.ref,
@@ -317,6 +322,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     // carol only follows alice, and the post mentions dan.
     await sc.reply(sc.dids.bob, post.ref, post.ref, 'multi rule reply disallow')
     const aliceReply = await sc.reply(
@@ -372,6 +378,7 @@
       { post: post.ref.uriStr, createdAt: iso() },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     const aliceReply = await sc.reply(
       sc.dids.alice,
       post.ref,
@@ -406,6 +413,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     // carol only follows alice
     const orphanedReply = await sc.reply(
       sc.dids.alice,
@@ -465,6 +473,7 @@
       { post: post.ref.uriStr, createdAt: iso(), allow: [] },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     const selfReply = await sc.reply(
       sc.dids.carol,
       post.ref,
@@ -498,6 +507,7 @@
       },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     // carol only follows alice
     const badReply = await sc.reply(
       sc.dids.dan,
@@ -541,6 +551,7 @@
       { post: postB.ref.uriStr, createdAt: iso(), allow: [] },
       sc.getHeaders(sc.dids.carol),
     )
+    await network.processAll()
     await sc.reply(sc.dids.alice, postA.ref, postA.ref, 'ungated reply')
     await sc.reply(sc.dids.alice, postB.ref, postB.ref, 'ungated reply')
     await network.processAll()
15 changes: 15 additions & 0 deletions packages/common/src/fs.ts
@@ -13,3 +13,18 @@ export const fileExists = async (location: string): Promise<boolean> => {
     throw err
   }
 }
+
+export const rmIfExists = async (
+  filepath: string,
+  recursive = false,
+): Promise<void> => {
+  try {
+    await fs.rm(filepath, { recursive })
+  } catch (err) {
+    if (isErrnoException(err) && err.code === 'ENOENT') {
+      // if blob not found, then it's already been deleted & we can just return
+      return
+    }
+    throw err
+  }
+}
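A small usage sketch, assuming @atproto/common re-exports its fs helpers and using a hypothetical per-actor directory layout:

import { rmIfExists } from '@atproto/common'

async function destroyActorData(did: string) {
  // recursive removal; a missing directory is treated as already deleted
  await rmIfExists(`/data/actors/${did}`, true)
}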
7 changes: 7 additions & 0 deletions packages/crypto/src/sha.ts
@@ -9,3 +9,10 @@ export const sha256 = async (
     typeof input === 'string' ? uint8arrays.fromString(input, 'utf8') : input
   return noble.sha256(bytes)
 }
+
+export const sha256Hex = async (
+  input: Uint8Array | string,
+): Promise<string> => {
+  const hash = await sha256(input)
+  return uint8arrays.toString(hash, 'hex')
+}
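And a quick sketch of the new helper, assuming the package index re-exports sha.ts:

import { sha256Hex } from '@atproto/crypto'

async function demo() {
  // accepts utf8 strings or raw bytes; yields a 64-char lowercase hex digest
  const digest = await sha256Hex('hello world')
  console.log(digest)
}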