diff --git a/.github/workflows/build-and-push-bsky-aws.yaml b/.github/workflows/build-and-push-bsky-aws.yaml index beaf10eb655..34bba3070cd 100644 --- a/.github/workflows/build-and-push-bsky-aws.yaml +++ b/.github/workflows/build-and-push-bsky-aws.yaml @@ -3,7 +3,7 @@ on: push: branches: - main - - timeline-limit-1-opt + - bsky-node-clustering env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/services/bsky/api.js b/services/bsky/api.js index c4882335761..42737d72b56 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -13,6 +13,7 @@ require('dd-trace') // Only works with commonjs // Tracer code above must come before anything else const path = require('path') const assert = require('assert') +const cluster = require('cluster') const { BunnyInvalidator, CloudfrontInvalidator, @@ -140,12 +141,14 @@ const main = async () => { await bsky.start() // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/) - process.on('SIGTERM', async () => { + const shutdown = async () => { // Gracefully shutdown periodic-moderation-event-reversal before destroying bsky instance periodicModerationEventReversal.destroy() await periodicModerationEventReversalRunning await bsky.destroy() - }) + } + process.on('SIGTERM', shutdown) + process.on('disconnect', shutdown) // when clustering } const getEnv = () => ({ @@ -223,4 +226,31 @@ const maintainXrpcResource = (span, req) => { } } -main() +const workerCount = maybeParseInt(process.env.CLUSTER_WORKER_COUNT) + +if (workerCount) { + if (cluster.isPrimary) { + console.log(`primary ${process.pid} is running`) + const workers = new Set() + for (let i = 0; i < workerCount; ++i) { + workers.add(cluster.fork()) + } + let teardown = false + cluster.on('exit', (worker) => { + workers.delete(worker) + if (!teardown) { + workers.add(cluster.fork()) // restart on crash + } + }) + process.on('SIGTERM', () => { + teardown = true + console.log('disconnecting workers') + workers.forEach((w) => w.disconnect()) + }) + } else { + console.log(`worker ${process.pid} is running`) + main() + } +} else { + main() // non-clustering +}