Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mock data plane #1925

Merged
merged 21 commits into from
Dec 7, 2023
11 changes: 1 addition & 10 deletions packages/bsky/src/api/app/bsky/graph/getMutes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Server } from '../../../../lexicon'
import { paginate, TimeCidKeyset } from '../../../../db/pagination'
import { CreatedAtDidKeyset, paginate } from '../../../../db/pagination'
import AppContext from '../../../../context'
import { notSoftDeletedClause } from '../../../../db/util'

Expand Down Expand Up @@ -44,12 +44,3 @@ export default function (server: Server, ctx: AppContext) {
},
})
}

export class CreatedAtDidKeyset extends TimeCidKeyset<{
createdAt: string
did: string // dids are treated identically to cids in TimeCidKeyset
}> {
labelResult(result: { createdAt: string; did: string }) {
return { primary: result.createdAt, secondary: result.did }
}
}
6 changes: 4 additions & 2 deletions packages/bsky/src/data-plane/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ import http from 'http'
import events from 'events'
import express from 'express'
import { expressConnectMiddleware } from '@connectrpc/connect-express'
import routes from './routes'
import createRoutes from './routes'
import { Database } from '../../db'

export class DataPlaneServer {
constructor(public server: http.Server) {}

static async create(port: number) {
static async create(db: Database, port: number) {
const app = express()
const routes = createRoutes(db)
app.use(expressConnectMiddleware({ routes }))
const server = app.listen(port)
await events.once(server, 'listening')
Expand Down
12 changes: 0 additions & 12 deletions packages/bsky/src/data-plane/server/routes.ts

This file was deleted.

121 changes: 121 additions & 0 deletions packages/bsky/src/data-plane/server/routes/blocks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { ServiceImpl } from '@connectrpc/connect'
import { Service } from '../../gen/bsky_connect'
import { Database } from '../../../db'
import { TimeCidKeyset, paginate } from '../../../db/pagination'

export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
async getBidirectionalBlock(req) {
const { actorDid, targetDid } = req
const res = await db.db
.selectFrom('actor_block')
.where((qb) =>
qb
.where('actor_block.creator', '=', actorDid)
.where('actor_block.subjectDid', '=', targetDid),
)
.orWhere((qb) =>
qb
.where('actor_block.creator', '=', targetDid)
.where('actor_block.subjectDid', '=', actorDid),
)
.limit(1)
.selectAll()
.executeTakeFirst()

return {
blockUri: res?.uri,
}
},

async getBlocks(req) {
const { actorDid, cursor, limit } = req
const { ref } = db.db.dynamic

let builder = db.db
.selectFrom('actor_block')
.where('actor_block.creator', '=', actorDid)
.selectAll()

const keyset = new TimeCidKeyset(
ref('actor_block.sortAt'),
ref('actor_block.cid'),
)
builder = paginate(builder, {
limit,
cursor,
keyset,
})

const blocks = await builder.execute()
return {
blockUris: blocks.map((b) => b.uri),
cursor: keyset.packFromResult(blocks),
}
},

async getBidirectionalBlockViaList(req) {
const { actorDid, targetDid } = req
const res = await db.db
.selectFrom('list_block')
.innerJoin('list_item', 'list_item.listUri', 'list_block.subjectUri')
.where((qb) =>
qb
.where('list_block.creator', '=', actorDid)
.where('list_item.subjectDid', '=', targetDid),
)
.orWhere((qb) =>
qb
.where('list_block.creator', '=', targetDid)
.where('list_item.subjectDid', '=', actorDid),
)
.limit(1)
.selectAll('list_block')
.executeTakeFirst()

return {
listUri: res?.subjectUri,
}
},

async getBlocklistSubscription(req) {
const { actorDid, listUri } = req
const res = await db.db
.selectFrom('list_block')
.where('creator', '=', actorDid)
.where('subjectUri', '=', listUri)
.selectAll()
.limit(1)
.executeTakeFirst()
return {
subscribed: !!res,
}
},

async getBlocklistSubscriptions(req) {
const { actorDid, limit, cursor } = req
const { ref } = db.db.dynamic
let builder = db.db
.selectFrom('list')
.whereExists(
db.db
.selectFrom('list_block')
.where('list_block.creator', '=', actorDid)
.whereRef('list_block.subjectUri', '=', ref('list.uri'))
.selectAll(),
)
.selectAll('list')

const keyset = new TimeCidKeyset(ref('list.createdAt'), ref('list.cid'))
builder = paginate(builder, {
limit,
cursor,
keyset,
})
const lists = await builder.execute()

return {
listUris: lists.map((l) => l.uri),
cursor: keyset.packFromResult(lists),
}
},
})
67 changes: 67 additions & 0 deletions packages/bsky/src/data-plane/server/routes/feed-gens.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { ServiceImpl } from '@connectrpc/connect'
import { Service } from '../../gen/bsky_connect'
import * as ui8 from 'uint8arrays'
import { Database } from '../../../db'
import { keyBy } from '@atproto/common'
import { TimeCidKeyset, paginate } from '../../../db/pagination'

export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
async getFeedGenerators(req) {
if (req.uris.length === 0) {
return { records: [] }
}
const res = await db.db
.selectFrom('record')
.selectAll()
.where('uri', 'in', req.uris)
.execute()
const byUri = keyBy(res, 'uri')
const records = req.uris.map((uri) => {
const row = byUri[uri]
const json = row ? row.json : JSON.stringify(null)
return ui8.fromString(json, 'utf8')
})
return { records }
},

async getActorFeeds(req) {
const { actorDid, limit, cursor } = req

const { ref } = db.db.dynamic
let builder = db.db
.selectFrom('feed_generator')
.selectAll()
.where('feed_generator.creator', '=', actorDid)

const keyset = new TimeCidKeyset(
ref('feed_generator.createdAt'),
ref('feed_generator.cid'),
)
builder = paginate(builder, {
limit,
cursor,
keyset,
})
const feeds = await builder.execute()

return {
uris: feeds.map((f) => f.uri),
cursor: keyset.packFromResult(feeds),
}
},

async getSuggestedFeeds() {
const feeds = await db.db
.selectFrom('suggested_feed')
.orderBy('suggested_feed.order', 'asc')
.selectAll()
.execute()
return {
uris: feeds.map((f) => f.uri),
}
},

async getFeedGeneratorStatus() {
throw new Error('unimplemented')
},
})
128 changes: 128 additions & 0 deletions packages/bsky/src/data-plane/server/routes/feeds.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { ServiceImpl } from '@connectrpc/connect'
import { Service } from '../../gen/bsky_connect'
import { Database } from '../../../db'
import { TimeCidKeyset, paginate } from '../../../db/pagination'

export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
async getAuthorFeed(req) {
const { actorDid, limit, cursor, repliesOnly, mediaOnly } = req
const { ref } = db.db.dynamic

// defaults to posts, reposts, and replies
let builder = db.db
.selectFrom('feed_item')
.selectAll('feed_item')
.where('originatorDid', '=', actorDid)

if (mediaOnly) {
builder = builder
// only your own posts
.where('type', '=', 'post')
// only posts with media
.whereExists((qb) =>
qb
.selectFrom('post_embed_image')
.select('post_embed_image.postUri')
.whereRef('post_embed_image.postUri', '=', 'feed_item.postUri'),
)
} else if (repliesOnly) {
// @TODO
}

const keyset = new TimeCidKeyset(
ref('feed_item.sortAt'),
ref('feed_item.cid'),
)

builder = paginate(builder, {
limit,
cursor,
keyset,
})

const feedItems = await builder.execute()

return {
uris: feedItems.map((row) => row.uri),
cursor: keyset.packFromResult(feedItems),
}
},

async getTimeline(req) {
const { actorDid, limit, cursor } = req
const { ref } = db.db.dynamic

const keyset = new TimeCidKeyset(
ref('feed_item.sortAt'),
ref('feed_item.cid'),
)

let followQb = db.db
.selectFrom('feed_item')
.innerJoin('follow', 'follow.subjectDid', 'feed_item.originatorDid')
.where('follow.creator', '=', actorDid)
.selectAll('feed_item')

followQb = paginate(followQb, {
limit,
cursor,
keyset,
tryIndex: true,
})

let selfQb = db.db
.selectFrom('feed_item')
.where('feed_item.originatorDid', '=', actorDid)
.selectAll('feed_item')

selfQb = paginate(selfQb, {
limit: Math.min(limit, 10),
cursor,
keyset,
tryIndex: true,
})

const [followRes, selfRes] = await Promise.all([
followQb.execute(),
selfQb.execute(),
])

const feedItems = [...followRes, ...selfRes]
.sort((a, b) => {
if (a.sortAt > b.sortAt) return -1
if (a.sortAt < b.sortAt) return 1
return a.cid > b.cid ? -1 : 1
})
.slice(0, limit)

return {
uris: feedItems.map((item) => item.uri),
cursor: keyset.packFromResult(feedItems),
}
},

async getListFeed(req) {
const { listUri, cursor, limit } = req
const { ref } = db.db.dynamic

let builder = db.db
.selectFrom('post')
.selectAll()
.innerJoin('list_item', 'list_item.subjectDid', 'post.creator')
.where('list_item.listUri', '=', listUri)

const keyset = new TimeCidKeyset(ref('post.sortAt'), ref('post.cid'))
builder = paginate(builder, {
limit,
cursor,
keyset,
tryIndex: true,
})
const feedItems = await builder.execute()

return {
uris: feedItems.map((item) => item.uri),
cursor: keyset.packFromResult(feedItems),
}
},
})
Loading
Loading