Skip to content

Commit

Permalink
Merge branch 'main' into pds-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Dec 4, 2023
2 parents ab063b9 + cad30a7 commit 3441489
Show file tree
Hide file tree
Showing 38 changed files with 730 additions and 20 deletions.
5 changes: 0 additions & 5 deletions .changeset/brave-swans-kiss.md

This file was deleted.

11 changes: 11 additions & 0 deletions packages/api/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @atproto/api

## 0.6.24

### Patch Changes

- [#1912](https://github.com/bluesky-social/atproto/pull/1912) [`378fc613`](https://github.com/bluesky-social/atproto/commit/378fc6132f621ca517897c9467ed5bba134b3776) Thanks [@devinivy](https://github.com/devinivy)! - Contains breaking lexicon changes: removing legacy com.atproto admin endpoints, making uri field required on app.bsky list views.

- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60)]:
- @atproto/syntax@0.1.5
- @atproto/lexicon@0.3.1
- @atproto/xrpc@0.4.1

## 0.6.23

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/api/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@atproto/api",
"version": "0.6.24-next.1",
"version": "0.6.24",
"license": "MIT",
"description": "Client library for atproto and Bluesky",
"keywords": [
Expand Down
7 changes: 7 additions & 0 deletions packages/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# @atproto/aws

## 0.1.6

### Patch Changes

- Updated dependencies []:
- @atproto/repo@0.3.6

## 0.1.5

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/aws/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@atproto/aws",
"version": "0.1.5",
"version": "0.1.6",
"license": "MIT",
"description": "Shared AWS cloud API helpers for atproto services",
"keywords": [
Expand Down
11 changes: 11 additions & 0 deletions packages/bsky/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# @atproto/bsky

## 0.0.16

### Patch Changes

- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60), [`378fc613`](https://github.com/bluesky-social/atproto/commit/378fc6132f621ca517897c9467ed5bba134b3776)]:
- @atproto/syntax@0.1.5
- @atproto/api@0.6.24
- @atproto/lexicon@0.3.1
- @atproto/repo@0.3.6
- @atproto/xrpc-server@0.4.2

## 0.0.15

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/bsky/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@atproto/bsky",
"version": "0.0.15",
"version": "0.0.16",
"license": "MIT",
"description": "Reference implementation of app.bsky App View (Bluesky API)",
"keywords": [
Expand Down
50 changes: 50 additions & 0 deletions packages/bsky/src/daemon/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import assert from 'assert'

export interface DaemonConfigValues {
version: string
dbPostgresUrl: string
dbPostgresSchema?: string
}

export class DaemonConfig {
constructor(private cfg: DaemonConfigValues) {}

static readEnv(overrides?: Partial<DaemonConfigValues>) {
const version = process.env.BSKY_VERSION || '0.0.0'
const dbPostgresUrl =
overrides?.dbPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL
const dbPostgresSchema =
overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA
assert(dbPostgresUrl)
return new DaemonConfig({
version,
dbPostgresUrl,
dbPostgresSchema,
...stripUndefineds(overrides ?? {}),
})
}

get version() {
return this.cfg.version
}

get dbPostgresUrl() {
return this.cfg.dbPostgresUrl
}

get dbPostgresSchema() {
return this.cfg.dbPostgresSchema
}
}

function stripUndefineds(
obj: Record<string, unknown>,
): Record<string, unknown> {
const result = {}
Object.entries(obj).forEach(([key, val]) => {
if (val !== undefined) {
result[key] = val
}
})
return result
}
27 changes: 27 additions & 0 deletions packages/bsky/src/daemon/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { PrimaryDatabase } from '../db'
import { DaemonConfig } from './config'
import { Services } from './services'

export class DaemonContext {
constructor(
private opts: {
db: PrimaryDatabase
cfg: DaemonConfig
services: Services
},
) {}

get db(): PrimaryDatabase {
return this.opts.db
}

get cfg(): DaemonConfig {
return this.opts.cfg
}

get services(): Services {
return this.opts.services
}
}

export default DaemonContext
79 changes: 79 additions & 0 deletions packages/bsky/src/daemon/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { PrimaryDatabase } from '../db'
import { dbLogger } from '../logger'
import { DaemonConfig } from './config'
import { DaemonContext } from './context'
import { createServices } from './services'
import { ImageUriBuilder } from '../image/uri'
import { LabelCache } from '../label-cache'
import { NotificationsDaemon } from './notifications'
import logger from './logger'

export { DaemonConfig } from './config'
export type { DaemonConfigValues } from './config'

export class BskyDaemon {
public ctx: DaemonContext
public notifications: NotificationsDaemon
private dbStatsInterval: NodeJS.Timer
private notifStatsInterval: NodeJS.Timer

constructor(opts: {
ctx: DaemonContext
notifications: NotificationsDaemon
}) {
this.ctx = opts.ctx
this.notifications = opts.notifications
}

static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon {
const { db, cfg } = opts
const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon
const labelCache = new LabelCache(db)
const services = createServices({
imgUriBuilder,
labelCache,
})
const ctx = new DaemonContext({
db,
cfg,
services,
})
const notifications = new NotificationsDaemon(ctx)
return new BskyDaemon({ ctx, notifications })
}

async start() {
const { db } = this.ctx
const pool = db.pool
this.notifications.run()
this.dbStatsInterval = setInterval(() => {
dbLogger.info(
{
idleCount: pool.idleCount,
totalCount: pool.totalCount,
waitingCount: pool.waitingCount,
},
'db pool stats',
)
}, 10000)
this.notifStatsInterval = setInterval(() => {
logger.info(
{
count: this.notifications.count,
lastDid: this.notifications.lastDid,
},
'notifications daemon stats',
)
}, 10000)
return this
}

async destroy(): Promise<void> {
await this.notifications.destroy()
await this.ctx.db.close()
clearInterval(this.dbStatsInterval)
clearInterval(this.notifStatsInterval)
}
}

export default BskyDaemon
6 changes: 6 additions & 0 deletions packages/bsky/src/daemon/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { subsystemLogger } from '@atproto/common'

const logger: ReturnType<typeof subsystemLogger> =
subsystemLogger('bsky:daemon')

export default logger
50 changes: 50 additions & 0 deletions packages/bsky/src/daemon/notifications.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { tidyNotifications } from '../services/util/notification'
import DaemonContext from './context'
import logger from './logger'

export class NotificationsDaemon {
ac = new AbortController()
running: Promise<void> | undefined
count = 0
lastDid: string | null = null

constructor(private ctx: DaemonContext) {}

run(opts?: RunOptions) {
if (this.running) return
this.count = 0
this.lastDid = null
this.ac = new AbortController()
this.running = this.tidyNotifications({
...opts,
forever: opts?.forever !== false, // run forever by default
})
.catch((err) => {
// allow this to cause an unhandled rejection, let deployment handle the crash.
logger.error({ err }, 'notifications daemon crashed')
throw err
})
.finally(() => (this.running = undefined))
}

private async tidyNotifications(opts: RunOptions) {
const actorService = this.ctx.services.actor(this.ctx.db)
for await (const { did } of actorService.all(opts)) {
if (this.ac.signal.aborted) return
try {
await tidyNotifications(this.ctx.db, did)
this.count++
this.lastDid = did
} catch (err) {
logger.warn({ err, did }, 'failed to tidy notifications for actor')
}
}
}

async destroy() {
this.ac.abort()
await this.running
}
}

type RunOptions = { forever?: boolean; batchSize?: number }
20 changes: 20 additions & 0 deletions packages/bsky/src/daemon/services.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { PrimaryDatabase } from '../db'
import { ActorService } from '../services/actor'
import { ImageUriBuilder } from '../image/uri'
import { LabelCache } from '../label-cache'

export function createServices(resources: {
imgUriBuilder: ImageUriBuilder
labelCache: LabelCache
}): Services {
const { imgUriBuilder, labelCache } = resources
return {
actor: ActorService.creator(imgUriBuilder, labelCache),
}
}

export type Services = {
actor: FromDbPrimary<ActorService>
}

type FromDbPrimary<T> = (db: PrimaryDatabase) => T
1 change: 1 addition & 0 deletions packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export { Redis } from './redis'
export { ViewMaintainer } from './db/views'
export { AppContext } from './context'
export { makeAlgos } from './feed-gen'
export * from './daemon'
export * from './indexer'
export * from './ingester'
export { MigrateModerationData } from './migrate-moderation-data'
Expand Down
29 changes: 29 additions & 0 deletions packages/bsky/src/services/actor/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { sql } from 'kysely'
import { wait } from '@atproto/common'
import { Database } from '../../db'
import { notSoftDeletedClause } from '../../db/util'
import { ActorViews } from './views'
Expand Down Expand Up @@ -144,6 +145,34 @@ export class ActorService {
.executeTakeFirst()
return res?.repoRev ?? null
}

async *all(
opts: { batchSize?: number; forever?: boolean; cooldownMs?: number } = {},
) {
const { cooldownMs = 1000, batchSize = 1000, forever = false } = opts
const baseQuery = this.db.db
.selectFrom('actor')
.selectAll()
.orderBy('did')
.limit(batchSize)
while (true) {
let cursor: ActorResult | undefined
do {
const actors = cursor
? await baseQuery.where('did', '>', cursor.did).execute()
: await baseQuery.execute()
for (const actor of actors) {
yield actor
}
cursor = actors.at(-1)
} while (cursor)
if (forever) {
await wait(cooldownMs)
} else {
return
}
}
}
}

type ActorResult = Actor
Expand Down
13 changes: 13 additions & 0 deletions packages/bsky/src/services/indexing/plugins/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const insertFn = async (
obj.reply,
)
if (invalidReplyRoot || violatesThreadGate) {
Object.assign(insertedPost, { invalidReplyRoot, violatesThreadGate })
await db
.updateTable('post')
.where('uri', '=', post.uri)
Expand Down Expand Up @@ -241,6 +242,13 @@ const notifsForInsert = (obj: IndexedPost) => {
}
}

if (obj.post.violatesThreadGate) {
// don't generate reply notifications when post violates threadgate
return notifs
}

// reply notifications

for (const ancestor of obj.ancestors ?? []) {
if (ancestor.uri === obj.post.uri) continue // no need to notify for own post
if (ancestor.height < REPLY_NOTIF_DEPTH) {
Expand Down Expand Up @@ -353,6 +361,11 @@ const updateAggregates = async (db: DatabaseSchema, postIdx: IndexedPost) => {
replyCount: db
.selectFrom('post')
.where('post.replyParent', '=', postIdx.post.replyParent)
.where((qb) =>
qb
.where('post.violatesThreadGate', 'is', null)
.orWhere('post.violatesThreadGate', '=', false),
)
.select(countAll.as('count')),
})
.onConflict((oc) =>
Expand Down
Loading

0 comments on commit 3441489

Please sign in to comment.