Skip to content

Commit

Permalink
refactor: integrate ws-server & ws-emiter from data-fair/lib
Browse files Browse the repository at this point in the history
  • Loading branch information
BatLeDev committed Oct 31, 2024
1 parent 0b86615 commit 1cd73af
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 173 deletions.
12 changes: 9 additions & 3 deletions api/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { existsSync } from 'fs'
import resolvePath from 'resolve-path'
import { app } from './app.ts'
import { startWSServer, stopWSServer } from './utils/wsServer.ts'
import { session } from '@data-fair/lib-express/index.js'
import * as wsServer from '@data-fair/lib-express/ws-server.js'
import { startObserver, stopObserver } from '@data-fair/lib-node/observer.js'
import * as locks from '@data-fair/lib-node/locks.js'
import { createHttpTerminator } from 'http-terminator'
Expand Down Expand Up @@ -31,11 +31,17 @@ export const start = async () => {
await mongo.init()
await locks.init(mongo.db)

await wsServer.start(server, mongo.db, async (channel, sessionState) => {
const [ownerType, ownerId] = channel.split('/')
if (!sessionState.user) return false
if (sessionState.user.adminMode) return true
return (['admin', 'contrib'].includes(ownerType) && ownerId === sessionState.user.id)
})

server.listen(config.port)
await new Promise(resolve => server.once('listening', resolve))
const npmHttpsProxy = config.npm?.httpsProxy || process.env.HTTPS_PROXY || process.env.https_proxy
if (npmHttpsProxy) await exec('npm --workspaces=false --include-workspace-root config set https-proxy ' + npmHttpsProxy)
await startWSServer(server, mongo.db, session)

console.log(`
API server listening on port ${config.port}
Expand All @@ -48,5 +54,5 @@ export const stop = async () => {
if (config.observer.active) await stopObserver()
await locks.stop()
await mongo.client.close()
await stopWSServer()
await wsServer.stop()
}
123 changes: 0 additions & 123 deletions api/src/utils/wsServer.ts

This file was deleted.

22 changes: 0 additions & 22 deletions shared/ws.js

This file was deleted.

6 changes: 3 additions & 3 deletions worker/src/task/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import mongo from '@data-fair/lib-node/mongo.js'
import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'
import nodemailer from 'nodemailer'
import { initPublisher } from '../../../shared/ws.js'
import config from '#config'
import { run, stop } from './task.ts'

Expand All @@ -21,9 +21,9 @@ process.on('SIGTERM', function onSigterm () {

await mongo.connect(config.mongoUrl, { readPreference: 'primary', maxPoolSize: 1 })
const mailTransport = nodemailer.createTransport(config.mails.transport)
const wsPublish = await initPublisher(mongo.db)
await wsEmitter.init(mongo.db)

const err = await run(mongo.db, mailTransport, wsPublish)
const err = await run(mongo.db, mailTransport)
if (err) exitCode = 1
await mongo.client.close()
mailTransport.close()
Expand Down
15 changes: 8 additions & 7 deletions worker/src/task/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import resolvePath from 'resolve-path'
import tmp from 'tmp-promise'
import { DataFairWsClient } from '@data-fair/lib-node/ws-client.js'
import { httpAgent, httpsAgent } from '@data-fair/lib-node/http-agents.js'
import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'
import { running } from '../utils/runs.js'
import config from '#config'
import { Processing, Run } from '#types'
Expand Down Expand Up @@ -105,11 +106,11 @@ const wsInstance = (log: LogFunctions, owner: Account): DataFairWsClient => {
/**
* Prepare log functions.
*/
const prepareLog = (runsCollection: Collection<Run>, wsPublish: (channel: string, data: any) => Promise<void>, processing: Processing, run: Run): LogFunctions => {
const prepareLog = (runsCollection: Collection<Run>, processing: Processing, run: Run): LogFunctions => {
const pushLog = async (log: any) => {
log.date = new Date().toISOString()
await runsCollection.updateOne({ _id: run._id }, { $push: { log } })
await wsPublish(`processings/${processing._id}/run-log`, { _id: run._id, log })
await wsEmitter.emit(`processings/${processing._id}/run-log`, { _id: run._id, log })
}

return {
Expand All @@ -125,15 +126,15 @@ const prepareLog = (runsCollection: Collection<Run>, wsPublish: (channel: string
const progressDate = new Date().toISOString()
await runsCollection.updateOne({ _id: run._id, log: { $elemMatch: { type: 'task', msg } } },
{ $set: { 'log.$.progress': progress, 'log.$.total': total, 'log.$.progressDate': progressDate } })
await wsPublish(`processings/${processing._id}/run-log`, { _id: run._id, log: { type: 'task', msg, progressDate, progress, total } })
await wsEmitter.emit(`processings/${processing._id}/run-log`, { _id: run._id, log: { type: 'task', msg, progressDate, progress, total } })
}
}
}

/**
* Run a processing.
*/
export const run = async (db: Db, mailTransport: any, wsPublish: (channel: string, data: any) => Promise<void>) => {
export const run = async (db: Db, mailTransport: any) => {
const runsCollection = db.collection('runs') as Collection<Run>
const processingsCollection = db.collection('processings') as Collection<Processing>
const [run, processing] = await Promise.all([
Expand All @@ -143,13 +144,13 @@ export const run = async (db: Db, mailTransport: any, wsPublish: (channel: strin
if (!run) throw new Error('Run not found')
if (!processing) throw new Error('Processing not found')

const log = prepareLog(runsCollection, wsPublish, processing, run)
const log = prepareLog(runsCollection, processing, run)
// @ts-expect-error -> warn is deprecated
log.warn = log.warning // for compatibility with old plugins
if (run.status === 'running') {
await log.step('Reprise après interruption.')
}
await running(db, wsPublish, run)
await running(db, run)
console.log('<running>')
const pluginDir = path.resolve(config.dataDir, 'plugins', processing?.plugin)
let pluginConfig = {}
Expand Down Expand Up @@ -190,7 +191,7 @@ export const run = async (db: Db, mailTransport: any, wsPublish: (channel: strin
await log.debug('patch config', patch)
Object.assign(processingConfig, patch)
processingsCollection.updateOne({ _id: processing._id }, { $set: { config: processingConfig } })
await wsPublish(`processings/${processing._id}/patch-config`, { patch })
await wsEmitter.emit(`processings/${processing._id}/patch-config`, { patch })
},
async sendMail (data) {
return mailTransport.sendMail(data)
Expand Down
9 changes: 5 additions & 4 deletions worker/src/utils/runs.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
import type { Db } from 'mongodb'
import type { Run, Processing } from '#types'

import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'
import { incrementConsumption } from './limits.ts'
import { runsMetrics } from './metrics.ts'
import notifications from './notifications.ts'

export const running = async (db: Db, wsPublish: (channel: string, data: any) => Promise<void>, run: Run) => {
export const running = async (db: Db, run: Run) => {
const patch = { status: 'running' as Run['status'], startedAt: new Date().toISOString() }
const lastRun = db.collection<Run>('runs').findOneAndUpdate(
{ _id: run._id },
{ $set: patch, $unset: { finishedAt: '' } },
{ returnDocument: 'after', projection: { log: 0, processing: 0, owner: 0 } }
)
await wsPublish(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch })
await wsEmitter.emit(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch })
await db.collection<Processing>('processings')
.updateOne({ _id: run.processing._id }, { $set: { lastRun }, $unset: { nextRun: '' } })
}

/**
* Update the database when a run is finished (edit status, log, duration, etc.)
*/
export const finish = async (db: Db, wsPublish: (channel: string, data: any) => Promise<void>, run: Run, errorMessage: string | undefined = undefined, errorLogType: string = 'debug') => {
export const finish = async (db: Db, run: Run, errorMessage: string | undefined = undefined, errorLogType: string = 'debug') => {
const query: Record<string, any> = {
$set: {
status: 'finished',
Expand All @@ -44,7 +45,7 @@ export const finish = async (db: Db, wsPublish: (channel: string, data: any) =>
{ returnDocument: 'after', projection: { processing: 0, owner: 0 } }
)) as Run
}
await wsPublish(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch: query.$set })
await wsEmitter.emit(`processings/${run.processing._id}/run-patch`, { _id: run._id, patch: query.$set })
const duration = (new Date(lastRun.finishedAt!).getTime() - new Date(lastRun.startedAt!).getTime()) / 1000
runsMetrics.labels(({ status: query.$set.status, owner: run.owner.name })).observe(duration)
await incrementConsumption(db, run.owner, 'processings_seconds', Math.round(duration))
Expand Down
20 changes: 9 additions & 11 deletions worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import { existsSync } from 'fs'
import resolvePath from 'resolve-path'
import kill from 'tree-kill'

import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'
import * as locks from '@data-fair/lib-node/locks.js'
import { startObserver, stopObserver, internalError } from '@data-fair/lib-node/observer.js'
import upgradeScripts from '@data-fair/lib-node/upgrade-scripts.js'
import config from '#config'
import mongo from '#mongo'
import { createNext } from '../../shared/runs.ts'
import { initPublisher } from '../../shared/ws.js'
import limits from './utils/limits.ts'
import { initMetrics } from './utils/metrics.ts'
import { finish } from './utils/runs.ts'
Expand All @@ -26,8 +26,6 @@ let stopped = false
const promisePool: [Promise<void> | null] = [null]
const pids: Record<string, number> = {}

let wsPublish: (channel: string, data: any) => Promise<void>

// Loop promises, resolved when stopped
let mainLoopPromise: Promise<void>
let killLoopPromise: Promise<void>
Expand All @@ -41,7 +39,7 @@ export const start = async () => {
const db = mongo.db
await locks.init(db)
await upgradeScripts(db, config.upgradeRoot)
wsPublish = await initPublisher(db)
await wsEmitter.init(db)
if (config.observer.active) {
await initMetrics(db)
await startObserver(config.observer.port)
Expand Down Expand Up @@ -157,7 +155,7 @@ async function killRun (db: Db, run: Run) {
console.warn('the run should be killed, it is not locked by another worker and we have no running PID, mark it as already killed', run._id)
debug('mark as already killed', run)
run.status = 'killed'
await finish(db, wsPublish, run)
await finish(db, run)
} finally {
await locks.release(run.processing._id)
}
Expand Down Expand Up @@ -190,7 +188,7 @@ async function iter (db: Db, run: Run) {
return
}
if (!processing.active) {
await finish(db, wsPublish, run, 'le traitement a été désactivé', 'error')
await finish(db, run, 'le traitement a été désactivé', 'error')
return
}

Expand All @@ -199,7 +197,7 @@ async function iter (db: Db, run: Run) {
try {
const remaining = await limits.remaining(db, processing.owner)
if (remaining.processingsSeconds === 0) {
await finish(db, wsPublish, run, 'le temps de traitement autorisé est épuisé', 'error')
await finish(db, run, 'le temps de traitement autorisé est épuisé', 'error')
// @test:spy("processingsSecondsExceeded")
return
}
Expand Down Expand Up @@ -227,7 +225,7 @@ async function iter (db: Db, run: Run) {
})
pids[run._id] = spawnPromise.childProcess.pid || -1
await spawnPromise // wait for the task to finish
await finish(db, wsPublish, run)
await finish(db, run)
} catch (err: any) {
// Build back the original error message from the stderr of the child process
const errorMessage = []
Expand All @@ -245,11 +243,11 @@ async function iter (db: Db, run: Run) {
// case of interruption by a SIGTERM
if (err.code === 143) {
run.status = 'killed'
await finish(db, wsPublish, run)
await finish(db, run)
// @test:spy("isKilled")
} else {
console.warn(`failure ${processing.title} > ${run._id}`, errorMessage.join('\n'))
await finish(db, wsPublish, run, errorMessage.join('\n'))
await finish(db, run, errorMessage.join('\n'))
// @test:spy("isFailure")
}
} else {
Expand Down Expand Up @@ -307,7 +305,7 @@ async function acquireNext (db: Db): Promise<Run | undefined> {
if (run.status === 'running') {
try {
console.warn('we had to close a run that was stuck in running status', run)
await finish(db, wsPublish, run, 'le traitement a été interrompu suite à une opération de maintenance', 'error')
await finish(db, run, 'le traitement a été interrompu suite à une opération de maintenance', 'error')
const processing = await db.collection<Processing>('processings').findOne({ _id: run.processing._id })
await locks.release(run.processing._id)
if (processing && processing.scheduling.length) {
Expand Down

0 comments on commit 1cd73af

Please sign in to comment.