diff --git a/api/src/server.ts b/api/src/server.ts index bab90cd4..b9b69d27 100644 --- a/api/src/server.ts +++ b/api/src/server.ts @@ -1,8 +1,8 @@ import { existsSync } from 'fs' import resolvePath from 'resolve-path' import { app } from './app.ts' -import { startWSServer, stopWSServer } from './utils/wsServer.ts' import { session } from '@data-fair/lib-express/index.js' +import * as wsServer from '@data-fair/lib-express/ws-server.js' import { startObserver, stopObserver } from '@data-fair/lib-node/observer.js' import * as locks from '@data-fair/lib-node/locks.js' import { createHttpTerminator } from 'http-terminator' @@ -31,11 +31,17 @@ export const start = async () => { await mongo.init() await locks.init(mongo.db) + await wsServer.start(server, mongo.db, async (channel, sessionState) => { + const [ownerType, ownerId] = channel.split('/') + if (!sessionState.user) return false + if (sessionState.user.adminMode) return true + return (['admin', 'contrib'].includes(ownerType) && ownerId === sessionState.user.id) + }) + server.listen(config.port) await new Promise(resolve => server.once('listening', resolve)) const npmHttpsProxy = config.npm?.httpsProxy || process.env.HTTPS_PROXY || process.env.https_proxy if (npmHttpsProxy) await exec('npm --workspaces=false --include-workspace-root config set https-proxy ' + npmHttpsProxy) - await startWSServer(server, mongo.db, session) console.log(` API server listening on port ${config.port} @@ -48,5 +54,5 @@ export const stop = async () => { if (config.observer.active) await stopObserver() await locks.stop() await mongo.client.close() - await stopWSServer() + await wsServer.stop() } diff --git a/api/src/utils/wsServer.ts b/api/src/utils/wsServer.ts deleted file mode 100644 index 39e6197e..00000000 --- a/api/src/utils/wsServer.ts +++ /dev/null @@ -1,123 +0,0 @@ -// Simple subscribe mechanism to follow channels of messages from the server -// Bodies are simple JSON objects following theses conventions: - -/* -Upstream examples: -{type: 'subscribe', channel: 'my_channel'} -{type: 'unsubscribe', channel: 'my_channel'} - -Downstream examples: -{type: 'subscribe-confirm', channel: 'my_channel'} -{type: 'unsubscribe-confirm', channel: 'my_channel'} -{type: 'message', channel: 'my_channel', data: {...}} -{type: 'error', data: {...}} -*/ -import { nanoid } from 'nanoid' -import { channel } from '../../../shared/ws.js' -import { WebSocketServer } from 'ws' -import permissions from './permissions.ts' - -let cursor: any -let wss: WebSocketServer -const subscribers: Record> = {} -const clients: Record = {} - -let stopped = false - -export const startWSServer = async (server: any, db: any, session: any) => { - wss = new WebSocketServer({ server }) - wss.on('connection', async (ws, req) => { - // Associate ws connections to ids for subscriptions - const clientId = nanoid() - clients[clientId] = ws - - // Manage subscribe/unsubscribe demands - ws.on('message', async str => { - try { - if (stopped) return - const message = JSON.parse(str.toString()) - if (!message.type || ['subscribe', 'unsubscribe'].indexOf(message.type) === -1) { - return ws.send(JSON.stringify({ type: 'error', data: 'type should be "subscribe" or "unsubscribe"' })) - } - if (!message.channel) { - return ws.send(JSON.stringify({ type: 'error', data: '"channel" is required' })) - } - if (message.type === 'subscribe') { - const [type, _id] = message.channel.split('/') - const resource = await db.collection(type).findOne({ _id }) - const sessionState = await session.reqAuthenticated(req) - if (!permissions.isContrib(sessionState, resource.owner)) { - return ws.send(JSON.stringify({ type: 'error', status: 403, data: 'Permission manquante.' })) - } - - subscribers[message.channel] = subscribers[message.channel] || {} - subscribers[message.channel][clientId] = 1 - return ws.send(JSON.stringify({ type: 'subscribe-confirm', channel: message.channel })) - } - if (message.type === 'unsubscribe') { - subscribers[message.channel] = subscribers[message.channel] || {} - delete subscribers[message.channel][clientId] - return ws.send(JSON.stringify({ type: 'unsubscribe-confirm', channel: message.channel })) - } - } catch (err: any) { - return ws.send(JSON.stringify({ type: 'error', data: err.message })) - } - }) - - ws.on('close', () => { - Object.keys(subscribers).forEach(channel => { - delete subscribers[channel][clientId] - }) - delete clients[clientId] - }) - - ws.on('error', () => ws.terminate()) - - ws.isAlive = true - ws.on('pong', () => { ws.isAlive = true }) - }) - - // standard ping/pong used to detect lost connections - setInterval(function ping () { - if (stopped) return - wss.clients.forEach(ws => { - if (ws.isAlive === false) return ws.terminate() - - ws.isAlive = false - ws.ping('', false, () => {}) - }) - }, 30000) - - const mongoChannel = await channel(db) - await mongoChannel.insertOne({ type: 'init' }) - initCursor(db, mongoChannel) -} - -export const stopWSServer = async () => { - wss.close() - stopped = true - if (cursor) await cursor.close() -} - -// Listen to pubsub channel based on mongodb to support scaling on multiple processes -let startDate = new Date().toISOString() - -const initCursor = async (db: any, mongoChannel: any) => { - cursor = mongoChannel.find({}, { tailable: true, awaitData: true }) - cursor.forEach((doc: any) => { - if (stopped) return - if (doc && doc.type === 'message') { - if (doc.data.date && doc.data.date < startDate) return - const subs = subscribers[doc.channel] || {} - Object.keys(subs).forEach(sub => { - if (clients[sub]) clients[sub].send(JSON.stringify(doc)) - }) - } - }, async (err: any) => { - if (stopped) return - startDate = new Date().toISOString() - await new Promise(resolve => setTimeout(resolve, 1000)) - console.log('WS tailable cursor was interrupted, reinit it', err && err.message) - initCursor(db, mongoChannel) - }) -} diff --git a/shared/ws.js b/shared/ws.js deleted file mode 100644 index 23239a29..00000000 --- a/shared/ws.js +++ /dev/null @@ -1,22 +0,0 @@ -/** - * @param {import('mongodb').Db } db - * @returns {Promise} - */ -export const channel = async (db) => { - const collection = (await db.listCollections({ name: 'messages' }).toArray())[0] - if (!collection) await db.createCollection('messages', { capped: true, size: 100000, max: 1000 }) - return db.collection('messages') -} - -/** - * @param {import('mongodb').Db} db - * @returns {Promise<(channel: string, data: any) => Promise>} - */ -export const initPublisher = async (db) => { - // Write to pubsub channel - const mongoChannel = await channel(db) - await mongoChannel.insertOne({ type: 'init' }) - return async (channel, data) => { - await mongoChannel.insertOne({ type: 'message', channel, data }) - } -} diff --git a/worker/src/task/index.ts b/worker/src/task/index.ts index 6a27e6b7..b5fb5dd2 100644 --- a/worker/src/task/index.ts +++ b/worker/src/task/index.ts @@ -1,6 +1,6 @@ import mongo from '@data-fair/lib-node/mongo.js' +import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js' import nodemailer from 'nodemailer' -import { initPublisher } from '../../../shared/ws.js' import config from '#config' import { run, stop } from './task.ts' @@ -21,9 +21,9 @@ process.on('SIGTERM', function onSigterm () { await mongo.connect(config.mongoUrl, { readPreference: 'primary', maxPoolSize: 1 }) const mailTransport = nodemailer.createTransport(config.mails.transport) -const wsPublish = await initPublisher(mongo.db) +await wsEmitter.init(mongo.db) -const err = await run(mongo.db, mailTransport, wsPublish) +const err = await run(mongo.db, mailTransport) if (err) exitCode = 1 await mongo.client.close() mailTransport.close() diff --git a/worker/src/task/task.ts b/worker/src/task/task.ts index e560f45c..336af964 100644 --- a/worker/src/task/task.ts +++ b/worker/src/task/task.ts @@ -11,6 +11,7 @@ import resolvePath from 'resolve-path' import tmp from 'tmp-promise' import { DataFairWsClient } from '@data-fair/lib-node/ws-client.js' import { httpAgent, httpsAgent } from '@data-fair/lib-node/http-agents.js' +import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js' import { running } from '../utils/runs.js' import config from '#config' import { Processing, Run } from '#types' @@ -105,11 +106,11 @@ const wsInstance = (log: LogFunctions, owner: Account): DataFairWsClient => { /** * Prepare log functions. */ -const prepareLog = (runsCollection: Collection, wsPublish: (channel: string, data: any) => Promise, processing: Processing, run: Run): LogFunctions => { +const prepareLog = (runsCollection: Collection, processing: Processing, run: Run): LogFunctions => { const pushLog = async (log: any) => { log.date = new Date().toISOString() await runsCollection.updateOne({ _id: run._id }, { $push: { log } }) - await wsPublish(`processings/${processing._id}/run-log`, { _id: run._id, log }) + await wsEmitter.emit(`processings/${processing._id}/run-log`, { _id: run._id, log }) } return { @@ -125,7 +126,7 @@ const prepareLog = (runsCollection: Collection, wsPublish: (channel: string const progressDate = new Date().toISOString() await runsCollection.updateOne({ _id: run._id, log: { $elemMatch: { type: 'task', msg } } }, { $set: { 'log.$.progress': progress, 'log.$.total': total, 'log.$.progressDate': progressDate } }) - await wsPublish(`processings/${processing._id}/run-log`, { _id: run._id, log: { type: 'task', msg, progressDate, progress, total } }) + await wsEmitter.emit(`processings/${processing._id}/run-log`, { _id: run._id, log: { type: 'task', msg, progressDate, progress, total } }) } } } @@ -133,7 +134,7 @@ const prepareLog = (runsCollection: Collection, wsPublish: (channel: string /** * Run a processing. */ -export const run = async (db: Db, mailTransport: any, wsPublish: (channel: string, data: any) => Promise) => { +export const run = async (db: Db, mailTransport: any) => { const runsCollection = db.collection('runs') as Collection const processingsCollection = db.collection('processings') as Collection const [run, processing] = await Promise.all([ @@ -143,13 +144,13 @@ export const run = async (db: Db, mailTransport: any, wsPublish: (channel: strin if (!run) throw new Error('Run not found') if (!processing) throw new Error('Processing not found') - const log = prepareLog(runsCollection, wsPublish, processing, run) + const log = prepareLog(runsCollection, processing, run) // @ts-expect-error -> warn is deprecated log.warn = log.warning // for compatibility with old plugins if (run.status === 'running') { await log.step('Reprise après interruption.') } - await running(db, wsPublish, run) + await running(db, run) console.log('') const pluginDir = path.resolve(config.dataDir, 'plugins', processing?.plugin) let pluginConfig = {} @@ -190,7 +191,7 @@ export const run = async (db: Db, mailTransport: any, wsPublish: (channel: strin await log.debug('patch config', patch) Object.assign(processingConfig, patch) processingsCollection.updateOne({ _id: processing._id }, { $set: { config: processingConfig } }) - await wsPublish(`processings/${processing._id}/patch-config`, { patch }) + await wsEmitter.emit(`processings/${processing._id}/patch-config`, { patch }) }, async sendMail (data) { return mailTransport.sendMail(data) diff --git a/worker/src/utils/runs.ts b/worker/src/utils/runs.ts index 791466dc..8112f1d6 100644 --- a/worker/src/utils/runs.ts +++ b/worker/src/utils/runs.ts @@ -1,18 +1,19 @@ import type { Db } from 'mongodb' import type { Run, Processing } from '#types' +import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js' import { incrementConsumption } from './limits.ts' import { runsMetrics } from './metrics.ts' import notifications from './notifications.ts' -export const running = async (db: Db, wsPublish: (channel: string, data: any) => Promise, run: Run) => { +export const running = async (db: Db, run: Run) => { const patch = { status: 'running' as Run['status'], startedAt: new Date().toISOString() } const lastRun = db.collection('runs').findOneAndUpdate( { _id: run._id }, { $set: patch, $unset: { finishedAt: '' } }, { returnDocument: 'after', projection: { log: 0, processing: 0, owner: 0 } } ) - await wsPublish(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch }) + await wsEmitter.emit(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch }) await db.collection('processings') .updateOne({ _id: run.processing._id }, { $set: { lastRun }, $unset: { nextRun: '' } }) } @@ -20,7 +21,7 @@ export const running = async (db: Db, wsPublish: (channel: string, data: any) => /** * Update the database when a run is finished (edit status, log, duration, etc.) */ -export const finish = async (db: Db, wsPublish: (channel: string, data: any) => Promise, run: Run, errorMessage: string | undefined = undefined, errorLogType: string = 'debug') => { +export const finish = async (db: Db, run: Run, errorMessage: string | undefined = undefined, errorLogType: string = 'debug') => { const query: Record = { $set: { status: 'finished', @@ -44,7 +45,7 @@ export const finish = async (db: Db, wsPublish: (channel: string, data: any) => { returnDocument: 'after', projection: { processing: 0, owner: 0 } } )) as Run } - await wsPublish(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch: query.$set }) + await wsEmitter.emit(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch: query.$set }) const duration = (new Date(lastRun.finishedAt!).getTime() - new Date(lastRun.startedAt!).getTime()) / 1000 runsMetrics.labels(({ status: query.$set.status, owner: run.owner.name })).observe(duration) await incrementConsumption(db, run.owner, 'processings_seconds', Math.round(duration)) diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 07e8c51b..c87dea40 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -8,13 +8,13 @@ import { existsSync } from 'fs' import resolvePath from 'resolve-path' import kill from 'tree-kill' +import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js' import * as locks from '@data-fair/lib-node/locks.js' import { startObserver, stopObserver, internalError } from '@data-fair/lib-node/observer.js' import upgradeScripts from '@data-fair/lib-node/upgrade-scripts.js' import config from '#config' import mongo from '#mongo' import { createNext } from '../../shared/runs.ts' -import { initPublisher } from '../../shared/ws.js' import limits from './utils/limits.ts' import { initMetrics } from './utils/metrics.ts' import { finish } from './utils/runs.ts' @@ -26,8 +26,6 @@ let stopped = false const promisePool: [Promise | null] = [null] const pids: Record = {} -let wsPublish: (channel: string, data: any) => Promise - // Loop promises, resolved when stopped let mainLoopPromise: Promise let killLoopPromise: Promise @@ -41,7 +39,7 @@ export const start = async () => { const db = mongo.db await locks.init(db) await upgradeScripts(db, config.upgradeRoot) - wsPublish = await initPublisher(db) + await wsEmitter.init(db) if (config.observer.active) { await initMetrics(db) await startObserver(config.observer.port) @@ -157,7 +155,7 @@ async function killRun (db: Db, run: Run) { console.warn('the run should be killed, it is not locked by another worker and we have no running PID, mark it as already killed', run._id) debug('mark as already killed', run) run.status = 'killed' - await finish(db, wsPublish, run) + await finish(db, run) } finally { await locks.release(run.processing._id) } @@ -190,7 +188,7 @@ async function iter (db: Db, run: Run) { return } if (!processing.active) { - await finish(db, wsPublish, run, 'le traitement a été désactivé', 'error') + await finish(db, run, 'le traitement a été désactivé', 'error') return } @@ -199,7 +197,7 @@ async function iter (db: Db, run: Run) { try { const remaining = await limits.remaining(db, processing.owner) if (remaining.processingsSeconds === 0) { - await finish(db, wsPublish, run, 'le temps de traitement autorisé est épuisé', 'error') + await finish(db, run, 'le temps de traitement autorisé est épuisé', 'error') // @test:spy("processingsSecondsExceeded") return } @@ -227,7 +225,7 @@ async function iter (db: Db, run: Run) { }) pids[run._id] = spawnPromise.childProcess.pid || -1 await spawnPromise // wait for the task to finish - await finish(db, wsPublish, run) + await finish(db, run) } catch (err: any) { // Build back the original error message from the stderr of the child process const errorMessage = [] @@ -245,11 +243,11 @@ async function iter (db: Db, run: Run) { // case of interruption by a SIGTERM if (err.code === 143) { run.status = 'killed' - await finish(db, wsPublish, run) + await finish(db, run) // @test:spy("isKilled") } else { console.warn(`failure ${processing.title} > ${run._id}`, errorMessage.join('\n')) - await finish(db, wsPublish, run, errorMessage.join('\n')) + await finish(db, run, errorMessage.join('\n')) // @test:spy("isFailure") } } else { @@ -307,7 +305,7 @@ async function acquireNext (db: Db): Promise { if (run.status === 'running') { try { console.warn('we had to close a run that was stuck in running status', run) - await finish(db, wsPublish, run, 'le traitement a été interrompu suite à une opération de maintenance', 'error') + await finish(db, run, 'le traitement a été interrompu suite à une opération de maintenance', 'error') const processing = await db.collection('processings').findOne({ _id: run.processing._id }) await locks.release(run.processing._id) if (processing && processing.scheduling.length) {