Skip to content

Commit

Permalink
Merge branch 'multi-pds-auth' of github.com:bluesky-social/atproto in…
Browse files Browse the repository at this point in the history
…to multi-pds-auth
  • Loading branch information
devinivy committed Oct 31, 2023
2 parents 04c1d6b + f09a8d4 commit 6545960
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 46 deletions.
2 changes: 1 addition & 1 deletion packages/pds/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@atproto/pds",
"version": "0.3.0-entryway.0",
"version": "0.3.0-entryway.1",
"license": "MIT",
"description": "Reference implementation of atproto Personal Data Server (PDS)",
"keywords": [
Expand Down
31 changes: 30 additions & 1 deletion packages/pds/src/api/com/atproto/admin/updateAccountHandle.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import {
AuthRequiredError,
InvalidRequestError,
UpstreamFailureError,
} from '@atproto/xrpc-server'
import { normalizeAndValidateHandle } from '../../../../handle'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
Expand All @@ -7,6 +11,7 @@ import {
UserAlreadyExistsError,
} from '../../../../services/account'
import { httpLogger } from '../../../../logger'
import { isThisPds } from '../../../proxy'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.admin.updateAccountHandle({
Expand Down Expand Up @@ -47,6 +52,30 @@ export default function (server: Server, ctx: AppContext) {
})
}

const { pdsDid } = existingAccnt
if (ctx.cfg.service.isEntryway && !isThisPds(ctx, pdsDid)) {
const pds =
pdsDid &&
(await ctx.services.account(ctx.db).getPds(pdsDid, { cached: true }))
if (!pds) {
throw new UpstreamFailureError('unknown pds')
}
// the pds emits the handle event on the firehose, but the entryway is responsible for updating the did doc.
// the long flow is: pds(identity.updateHandle) -> entryway(identity.updateHandle) -> pds(admin.updateAccountHandle)
const agent = ctx.pdsAgents.get(pds.host)
await agent.com.atproto.admin.updateAccountHandle(
{
did,
handle: input.body.handle,
},
{
encoding: 'application/json',
headers: ctx.authVerifier.createAdminRoleHeaders(),
},
)
return // do not sequence handle event on the entryway
}

try {
await ctx.db.transaction(async (dbTxn) => {
await ctx.services.account(dbTxn).sequenceHandle(seqHandleTok)
Expand Down
29 changes: 27 additions & 2 deletions packages/pds/src/api/com/atproto/identity/updateHandle.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { InvalidRequestError, UpstreamFailureError } from '@atproto/xrpc-server'
import { DAY, MINUTE } from '@atproto/common'
import { normalizeAndValidateHandle } from '../../../../handle'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
Expand All @@ -7,7 +8,7 @@ import {
UserAlreadyExistsError,
} from '../../../../services/account'
import { httpLogger } from '../../../../logger'
import { DAY, MINUTE } from '@atproto/common'
import { isThisPds } from '../../../proxy'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.identity.updateHandle({
Expand All @@ -26,6 +27,7 @@ export default function (server: Server, ctx: AppContext) {
],
handler: async ({ auth, input }) => {
const requester = auth.credentials.did
const pdsDid = auth.credentials.pdsDid
const handle = await normalizeAndValidateHandle({
ctx,
handle: input.body.handle,
Expand Down Expand Up @@ -63,6 +65,29 @@ export default function (server: Server, ctx: AppContext) {
})
}

if (ctx.cfg.service.isEntryway && !isThisPds(ctx, pdsDid)) {
const pds =
pdsDid &&
(await ctx.services.account(ctx.db).getPds(pdsDid, { cached: true }))
if (!pds) {
throw new UpstreamFailureError('unknown pds')
}
// the pds emits the handle event on the firehose, but the entryway is responsible for updating the did doc.
// the long flow is: pds(identity.updateHandle) -> entryway(identity.updateHandle) -> pds(admin.updateAccountHandle)
const agent = ctx.pdsAgents.get(pds.host)
await agent.com.atproto.admin.updateAccountHandle(
{
did: requester,
handle: input.body.handle,
},
{
encoding: 'application/json',
headers: ctx.authVerifier.createAdminRoleHeaders(),
},
)
return // do not sequence handle event on the entryway
}

try {
await ctx.db.transaction(async (dbTxn) => {
await ctx.services.account(dbTxn).sequenceHandle(seqHandleTok)
Expand Down
12 changes: 6 additions & 6 deletions packages/pds/src/api/com/atproto/server/createAccount.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { MINUTE, cborDecode, cborEncode, check } from '@atproto/common'
import { AtprotoData, ensureAtpDocument } from '@atproto/identity'
import { InvalidRequestError } from '@atproto/xrpc-server'
import AtpAgent from '@atproto/api'
import * as plc from '@did-plc/lib'
import disposable from 'disposable-email'
import { normalizeAndValidateHandle } from '../../../../handle'
Expand All @@ -12,8 +11,9 @@ import { countAll } from '../../../../db/util'
import { UserAlreadyExistsError } from '../../../../services/account'
import AppContext from '../../../../context'
import Database from '../../../../db'
import { getPdsEndpoint, isThisPds } from '../../../proxy'
import { isThisPds } from '../../../proxy'
import { didDocForSession } from './util'
import { getPdsEndpoint } from '../../../../pds-agents'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.server.createAccount({
Expand Down Expand Up @@ -132,7 +132,7 @@ export default function (server: Server, ctx: AppContext) {
// Setup repo root
await repoTxn.createRepo(did, [], now)
} else {
const agent = new AtpAgent({ service: getPdsEndpoint(pds.host) })
const agent = ctx.pdsAgents.get(pds.host)
await agent.com.atproto.server.createAccount({
...input.body,
did,
Expand Down Expand Up @@ -215,7 +215,7 @@ const getDidAndPlcOp = async (
}> => {
const pdsEndpoint = pds ? getPdsEndpoint(pds.host) : ctx.cfg.service.publicUrl
const pdsSigningKey = pds
? await reserveSigningKey(pds.host)
? await reserveSigningKey(ctx, pds.host)
: ctx.repoSigningKey.did()

// if the user brings their own PLC op then we validate it then submit it to PLC on their behalf
Expand Down Expand Up @@ -334,8 +334,8 @@ const assignPds = async (ctx: AppContext) => {
return pdses.at(idx)
}

const reserveSigningKey = async (host: string) => {
const agent = new AtpAgent({ service: getPdsEndpoint(host) })
const reserveSigningKey = async (ctx: AppContext, host: string) => {
const agent = ctx.pdsAgents.get(host)
const result = await agent.com.atproto.server.reserveSigningKey()
return result.data.signingKey
}
Expand Down
13 changes: 10 additions & 3 deletions packages/pds/src/api/com/atproto/server/util.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { getPdsEndpoint } from '@atproto/common'
import * as crypto from '@atproto/crypto'
import { DidDocument } from '@atproto/identity'
import { ServerConfig } from '../../../../config'
Expand Down Expand Up @@ -26,16 +27,22 @@ export const getRandomToken = () => {
return token.slice(0, 5) + '-' + token.slice(5, 10)
}

// @TODO once supporting multiple pdses, validate pds in did doc based on allow-list.
export const didDocForSession = async (
ctx: AppContext,
did: string,
forceRefresh?: boolean,
): Promise<DidDocument | undefined> => {
if (!ctx.cfg.identity.enableDidDocWithSession) return
try {
const didDoc = await ctx.idResolver.did.resolve(did, forceRefresh)
return didDoc ?? undefined
const [didDoc, pdses] = await Promise.all([
ctx.idResolver.did.resolve(did, forceRefresh),
ctx.services.account(ctx.db).getPdses({ cached: true }),
])
if (!didDoc) return
const pdsEndpoint = getPdsEndpoint(didDoc)
const pdsHost = pdsEndpoint && new URL(pdsEndpoint).host
if (!pdses.some((pds) => pds.host === pdsHost)) return
return didDoc
} catch (err) {
dbLogger.warn({ err, did }, 'failed to resolve did doc')
}
Expand Down
14 changes: 2 additions & 12 deletions packages/pds/src/api/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ export const proxy = async <T>(
return null // skip proxying
}
const accountService = ctx.services.account(ctx.db)
const pds = pdsDid && (await accountService.getPds(pdsDid))
const pds = pdsDid && (await accountService.getPds(pdsDid, { cached: true }))
if (!pds) {
throw new UpstreamFailureError('unknown pds')
}
// @TODO reuse agents
const agent = new AtpAgent({ service: getPdsEndpoint(pds.host) })
try {
return await fn(agent)
return await fn(ctx.pdsAgents.get(pds.host))
} catch (err) {
// @TODO may need to pass through special lexicon errors
if (
Expand All @@ -42,14 +40,6 @@ export const proxy = async <T>(
}
}

export const getPdsEndpoint = (host: string) => {
const service = new URL(`https://${host}`)
if (service.hostname === 'localhost') {
service.protocol = 'http:'
}
return service.origin
}

export const isThisPds = (
ctx: AppContext,
pdsDid: string | null | undefined,
Expand Down
11 changes: 11 additions & 0 deletions packages/pds/src/auth-verifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@ export class AuthVerifier {
return { status: Invalid, admin: false, moderator: false, triage: false }
}

createAdminRoleHeaders = () => {
return {
authorization:
'Basic ' +
ui8.toString(
ui8.fromString(`admin:${this._adminPass}`, 'utf8'),
'base64pad',
),
}
}

isUserOrAdmin(
auth: AccessOutput | RoleOutput | NullOutput,
did: string,
Expand Down
7 changes: 7 additions & 0 deletions packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Crawlers } from './crawlers'
import { DiskBlobStore } from './storage'
import { getRedisClient } from './redis'
import { RuntimeFlags } from './runtime-flags'
import { PdsAgents } from './pds-agents'

export type AppContextOptions = {
db: Database
Expand All @@ -38,6 +39,7 @@ export type AppContextOptions = {
crawlers: Crawlers
appViewAgent: AtpAgent
authVerifier: AuthVerifier
pdsAgents: PdsAgents
repoSigningKey: crypto.Keypair
plcRotationKey: crypto.Keypair
cfg: ServerConfig
Expand All @@ -60,6 +62,7 @@ export class AppContext {
public crawlers: Crawlers
public appViewAgent: AtpAgent
public authVerifier: AuthVerifier
public pdsAgents: PdsAgents
public repoSigningKey: crypto.Keypair
public plcRotationKey: crypto.Keypair
public cfg: ServerConfig
Expand All @@ -81,6 +84,7 @@ export class AppContext {
this.crawlers = opts.crawlers
this.appViewAgent = opts.appViewAgent
this.authVerifier = opts.authVerifier
this.pdsAgents = opts.pdsAgents
this.repoSigningKey = opts.repoSigningKey
this.plcRotationKey = opts.plcRotationKey
this.cfg = opts.cfg
Expand Down Expand Up @@ -191,6 +195,8 @@ export class AppContext {
crawlers,
})

const pdsAgents = new PdsAgents()

return new AppContext({
db,
blobstore,
Expand All @@ -210,6 +216,7 @@ export class AppContext {
authVerifier,
repoSigningKey,
plcRotationKey,
pdsAgents,
cfg,
...(overrides ?? {}),
})
Expand Down
22 changes: 22 additions & 0 deletions packages/pds/src/pds-agents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import AtpAgent from '@atproto/api'

export class PdsAgents {
// @NOTE only use with entries in the pds table, not for e.g. arbitrary entries found in did documents.
private cache = new Map<string, AtpAgent>()
get(host: string) {
const agent =
this.cache.get(host) ?? new AtpAgent({ service: getPdsEndpoint(host) })
if (!this.cache.has(host)) {
this.cache.set(host, agent)
}
return agent
}
}

export const getPdsEndpoint = (host: string) => {
const service = new URL(`https://${host}`)
if (service.hostname === 'localhost') {
service.protocol = 'http:'
}
return service.origin
}
Loading

0 comments on commit 6545960

Please sign in to comment.