Skip to content

Commit

Permalink
refactor: worker/task
Browse files Browse the repository at this point in the history
  • Loading branch information
BatLeDev committed Oct 30, 2024
1 parent 516ddb2 commit 77d71b1
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 49 deletions.
2 changes: 0 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ Run a development server (access it at http://localhost:8082/api/) :
npm run dev-api
```

You can find the API documentation here : [API](./docs/api.md)

## Working on @data-fair/processings/ui

The UI is a [Nuxt 3](https://nuxt.com/) project that uses [Vuetify 3](https://vuetifyjs.com/).
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion worker/index.js → worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { start, stop } from './src/worker.js'
import { start, stop } from './src/worker.ts'

start().then(() => {}, err => {
console.error('Failure while starting worker', err)
Expand Down
6 changes: 3 additions & 3 deletions worker/src/task/index.js → worker/src/task/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import mongo from '@data-fair/lib/node/mongo.js'
import mongo from '@data-fair/lib-node/mongo.js'
import nodemailer from 'nodemailer'
import { initPublisher } from '../../../shared/ws.js'
import config from '../config.js'
import { run, stop } from './task.js'
import config from '#config'
import { run, stop } from './task.ts'

let exitCode = 0

Expand Down
67 changes: 24 additions & 43 deletions worker/src/task/task.js → worker/src/task/task.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import type { LogFunctions, ProcessingContext } from '@data-fair/lib-common-types/processings.ts'
import type { Account } from '@data-fair/lib-express/index.js'
import type { Collection, Db } from 'mongodb'

import axios from 'axios'
import axiosRetry from 'axios-retry'
import util from 'node:util'
Expand All @@ -6,32 +10,26 @@ import fs from 'fs-extra'
import path from 'path'
import resolvePath from 'resolve-path'
import tmp from 'tmp-promise'
import { DataFairWsClient } from '@data-fair/lib/node/ws.js'
import { httpAgent, httpsAgent } from '@data-fair/lib/node/http-agents.js'
import { DataFairWsClient } from '@data-fair/lib-node/ws-client.js'
import { httpAgent, httpsAgent } from '@data-fair/lib-node/http-agents.js'
import { running } from '../utils/runs.js'
import { Processing, Run } from '../../../api/types/index.ts'

fs.ensureDirSync(config.dataDir)
const baseTmpDir = config.tmpDir || path.join(config.dataDir, 'tmp')
fs.ensureDirSync(baseTmpDir)

tmp.setGracefulCleanup()

/** @type {any} */
let pluginModule
/** @type {boolean} */
let _stopped
let pluginModule: { run: (context: ProcessingContext) => Promise<void>, stop?: () => Promise<void> }
let _stopped: boolean
const processingsDir = path.join(config.dataDir, 'processings')

/** @typedef {import('../../../shared/types/run/index.js').Run} Run */

/**
* Create an Axios instance.
* @param {import('../../../shared/types/processing/index.js').Processing} processing
* @returns {import('axios').AxiosInstance} Axios instance.
*/
const getAxiosInstance = (processing) => {
/** @type {any} */
const headers = {
const getAxiosInstance = (processing: Processing) => {
const headers: Record<string, string> = {
'x-apiKey': config.dataFairAPIKey
}
if (config.dataFairAdminMode) headers['x-account'] = JSON.stringify(processing.owner)
Expand Down Expand Up @@ -62,8 +60,7 @@ const getAxiosInstance = (processing) => {
isDataFairUrl &&
(config.getFromPrivateDataFairUrl || ['post', 'put', 'delete', 'patch'].includes(cfg.method || ''))
if (usePrivate) {
// @ts-ignore -> privateDataFairUrl can't be undefined here
cfg.url = cfg.url.replace(config.dataFairUrl, config.privateDataFairUrl)
cfg.url = cfg.url.replace(config.dataFairUrl, config.privateDataFairUrl!)
cfg.headers.host = new URL(config.dataFairUrl).host
}
return cfg
Expand All @@ -74,11 +71,11 @@ const getAxiosInstance = (processing) => {

// customize axios errors for shorter stack traces when a request fails
// WARNING: we used to do it in an interceptor, but it was incompatible with axios-retry
const prepareAxiosError = (/** @type {any} */error) => {
const prepareAxiosError = (error: any) => {
const response = error.response ?? error.request?.res ?? error.res
if (!response) return error
delete response.request
const headers = {}
const headers: Record<string, string> = {}
if (response.headers?.location) headers.location = response.headers.location
response.headers = headers
response.config = response.config ?? error.config
Expand All @@ -94,11 +91,8 @@ const prepareAxiosError = (/** @type {any} */error) => {

/**
* Create a WebSocket instance.
* @param {import('@data-fair/lib/processings/types.js').LogFunctions} log - Log functions.
* @param {import('@data-fair/lib/express/index.js').Account} owner - Owner account.
* @returns {DataFairWsClient} WebSocket instance.
*/
const wsInstance = (log, owner) => {
const wsInstance = (log: LogFunctions, owner: Account): DataFairWsClient => {
return new DataFairWsClient({
url: config.privateDataFairUrl || config.dataFairUrl,
apiKey: config.dataFairAPIKey,
Expand All @@ -110,15 +104,9 @@ const wsInstance = (log, owner) => {

/**
* Prepare log functions.
* @param {import('mongodb').Collection<Run>} runsCollection - Runs collection
* @param {(channel: string, data: any) => Promise<void>} wsPublish - Publish function
* @param {import('../../../shared/types/processing/index.js').Processing} processing - Processing
* @param {Run} run - run
* @returns {import('@data-fair/lib/processings/types.js').LogFunctions} Log functions
*/
const prepareLog = (runsCollection, wsPublish, processing, run) => {
/** @param {any} log - Log */
const pushLog = async (log) => {
const prepareLog = (runsCollection: Collection<Run>, wsPublish: (channel: string, data: any) => Promise<void>, processing: Processing, run: Run): LogFunctions => {
const pushLog = async (log: any) => {
log.date = new Date().toISOString()
await runsCollection.updateOne({ _id: run._id }, { $push: { log } })
await wsPublish(`processings/${processing._id}/run-log`, { _id: run._id, log })
Expand All @@ -144,16 +132,10 @@ const prepareLog = (runsCollection, wsPublish, processing, run) => {

/**
* Run a processing.
* @param {import('mongodb').Db} db - Database.
* @param {any} mailTransport - Mail transport.
* @param {(channel: string, data: any) => Promise<void>} wsPublish - Publish function.
*/
export const run = async (db, mailTransport, wsPublish) => {
/** @type {import('mongodb').Collection<Run>} */
const runsCollection = db.collection('runs')
/** @type {import('mongodb').Collection<import('../../../shared/types/processing/index.js').Processing>} */
const processingsCollection = db.collection('processings')
/** @type {[Run | null, import('../../../shared/types/processing/index.js').Processing | null]} */
export const run = async (db: Db, mailTransport: any, wsPublish: (channel: string, data: any) => Promise<void>) => {
const runsCollection = db.collection('runs') as Collection<Run>
const processingsCollection = db.collection('processings') as Collection<Processing>
const [run, processing] = await Promise.all([
runsCollection.findOne({ _id: process.argv[2] }),
processingsCollection.findOne({ _id: process.argv[3] })
Expand Down Expand Up @@ -195,8 +177,7 @@ export const run = async (db, mailTransport, wsPublish) => {
}
})

/** @type {import('@data-fair/lib/processings/types.js').ProcessingContext} */
const context = {
const context: ProcessingContext = {
pluginConfig,
processingConfig,
processingId: processing?._id,
Expand Down Expand Up @@ -224,9 +205,9 @@ export const run = async (db, mailTransport, wsPublish) => {
process.chdir(cwd)
if (_stopped) await log.error('L\'exécution a été interrompue', '')
else await log.info('L\'exécution est terminée', '')
} catch (/** @type {any} */ _err) {
} catch (e: any) {
process.chdir(cwd)
const err = prepareAxiosError(_err)
const err = prepareAxiosError(e)
const httpMessage = getHttpErrorMessage(err)

if (httpMessage) {
Expand Down Expand Up @@ -259,7 +240,7 @@ export const stop = async () => {
await new Promise(resolve => setTimeout(resolve, config.worker.gracePeriod))
}

const getHttpErrorMessage = (/** @type {any} */err) => {
const getHttpErrorMessage = (err: any) => {
let httpMessage = err.status ?? err.statusCode
if (httpMessage) {
const statusText = err.statusText ?? err.statusMessage
Expand Down

0 comments on commit 77d71b1

Please sign in to comment.