From 6933f094c72479082eb75f134a048595585eda70 Mon Sep 17 00:00:00 2001 From: Daniele Guido Date: Mon, 6 May 2024 16:49:31 +0200 Subject: [PATCH] update celery client --- README.md | 2 + package-lock.json | 173 ++++++++++++++++++++++++++++++++++++++++++++++ package.json | 5 +- src/celery.js | 118 +++++++++++++------------------ 4 files changed, 226 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index 298e1ee1..6bf4f405 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ npm run watch # Run the app in another terminal: make run-dev +# or +VUE_APP_MIDDLELAYER_API_PATH=/ VUE_APP_MIDDLELAYER_API=http://localhost:3030 VUE_APP_MIDDLELAYER_API_SOCKET_PATH=/socket.io make run-dev ``` ## About diff --git a/package-lock.json b/package-lock.json index 656dc777..d33f9fae 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,6 +24,7 @@ "body-parser": "^1.18.3", "cache-manager": "^4.1.0", "cache-manager-redis-store": "^2.0.0", + "celery-node": "^0.5.9", "child-process-async": "^1.0.1", "chrono-node": "^2.3.2", "cli-color": "^1.4.0", @@ -1686,6 +1687,22 @@ "lodash": "^4.0.0" } }, + "node_modules/amqplib": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.8.0.tgz", + "integrity": "sha512-icU+a4kkq4Y1PS4NNi+YPDMwdlbFcZ1EZTQT2nigW3fvOb6AOgUQ9+Mk4ue0Zu5cBg/XpDzB40oH10ysrk2dmA==", + "dependencies": { + "bitsyntax": "~0.1.0", + "bluebird": "^3.7.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "safe-buffer": "~5.2.1", + "url-parse": "~1.5.1" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/ansi-colors": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.1.tgz", @@ -2034,6 +2051,37 @@ "node": ">=8" } }, + "node_modules/bitsyntax": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.1.0.tgz", + "integrity": "sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "debug": "~2.6.9", + "safe-buffer": "~5.1.2" + }, + "engines": { + "node": ">=0.8" + } + }, + "node_modules/bitsyntax/node_modules/debug": { + "version": "2.6.9", + "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", + "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", + "dependencies": { + "ms": "2.0.0" + } + }, + "node_modules/bitsyntax/node_modules/ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" + }, + "node_modules/bitsyntax/node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, "node_modules/bl": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", @@ -2088,6 +2136,11 @@ "safe-buffer": "~5.2.0" } }, + "node_modules/bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" + }, "node_modules/bn.js": { "version": "4.12.0", "resolved": "https://registry.npmjs.org/bn.js/-/bn.js-4.12.0.tgz", @@ -2177,6 +2230,11 @@ "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz", "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==" }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "node_modules/buffer/node_modules/isarray": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", @@ -2338,6 +2396,16 @@ "node": ">= 0.8.0" } }, + "node_modules/celery-node": { + "version": "0.5.9", + "resolved": "https://registry.npmjs.org/celery-node/-/celery-node-0.5.9.tgz", + "integrity": "sha512-FvLigz3WAAraynNLIZy+BoMBWnskJgRVPeeb2SSY7MOrB01ue52ffYtHYKLNayCmGGag8KBdYvNaRXpN18OW9g==", + "dependencies": { + "amqplib": "^0.8.0", + "ioredis": "^4.28.5", + "uuid": "^3.3.2" + } + }, "node_modules/chalk": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", @@ -5834,6 +5902,39 @@ "node": ">= 0.4" } }, + "node_modules/ioredis": { + "version": "4.28.5", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.28.5.tgz", + "integrity": "sha512-3GYo0GJtLqgNXj4YhrisLaNNvWSNwSS2wS4OELGfGxH8I69+XfNdnmV1AyN+ZqMh0i7eX+SWjrwFKDBDgfBC1A==", + "dependencies": { + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.1", + "denque": "^1.1.0", + "lodash.defaults": "^4.2.0", + "lodash.flatten": "^4.4.0", + "lodash.isarguments": "^3.1.0", + "p-map": "^2.1.0", + "redis-commands": "1.7.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=6" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis/node_modules/denque": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/denque/-/denque-1.5.1.tgz", + "integrity": "sha512-XwE+iZ4D6ZUB7mfYRMb5wByE8L74HCn30FBN7sWnXksWc1LO1bPDl67pBR9o/kC4z/xSNAwkMYcGgqDV3BE3Hw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -6804,11 +6905,21 @@ "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", "integrity": "sha1-4j8/nE+Pvd6HJSnBBxhXoIblzO8=" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==" + }, "node_modules/lodash.first": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/lodash.first/-/lodash.first-3.0.0.tgz", "integrity": "sha1-Xa4YDX+BjuZfxbIQsQSnu++YoWo=" }, + "node_modules/lodash.flatten": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz", + "integrity": "sha512-C5N2Z3DgnnKr0LOpv/hKCgKdb7ZZwafIrsesve6lmzvZIRZRGaZ/l6Q8+2W7NaT+ZwO3fFlSCzCzrDCFdJfZ4g==" + }, "node_modules/lodash.get": { "version": "4.4.2", "resolved": "https://registry.npmjs.org/lodash.get/-/lodash.get-4.4.2.tgz", @@ -6824,6 +6935,11 @@ "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -7927,6 +8043,14 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-map": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-2.1.0.tgz", + "integrity": "sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==", + "engines": { + "node": ">=6" + } + }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -8380,6 +8504,11 @@ "node": ">=0.4.x" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "node_modules/queue-microtask": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz", @@ -8450,6 +8579,22 @@ "rc": "cli.js" } }, + "node_modules/readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "node_modules/readable-stream/node_modules/isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" + }, "node_modules/readdirp": { "version": "3.6.0", "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-3.6.0.tgz", @@ -9265,6 +9410,11 @@ "node": "*" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", @@ -9293,6 +9443,11 @@ "bare-events": "^2.2.0" } }, + "node_modules/string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" + }, "node_modules/string-width-cjs": { "name": "string-width", "version": "4.2.3", @@ -10180,6 +10335,15 @@ "querystring": "0.2.0" } }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/url/node_modules/punycode": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.3.2.tgz", @@ -10210,6 +10374,15 @@ "node": ">= 0.4.0" } }, + "node_modules/uuid": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", + "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==", + "deprecated": "Please upgrade to version 7 or higher. Older versions may use Math.random() in certain circumstances, which is known to be problematic. See https://v8.dev/blog/math-random for details.", + "bin": { + "uuid": "bin/uuid" + } + }, "node_modules/validator": { "version": "13.11.0", "resolved": "https://registry.npmjs.org/validator/-/validator-13.11.0.tgz", diff --git a/package.json b/package.json index 58b10c77..b4751a1d 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,7 @@ "body-parser": "^1.18.3", "cache-manager": "^4.1.0", "cache-manager-redis-store": "^2.0.0", + "celery-node": "^0.5.9", "child-process-async": "^1.0.1", "chrono-node": "^2.3.2", "cli-color": "^1.4.0", @@ -107,7 +108,7 @@ "mysql2": "^2.3.3", "nanoid": "^2.0.1", "neo4j-driver": "^1.7.2", - "node-celery": "^0.2.8", + "node-celery": "0.5.9", "node-eta": "^0.9.0", "node-fetch": "2.7.0", "node-http-proxy-json": "^0.1.6", @@ -139,10 +140,10 @@ "eslint-plugin-import": "^2.26.0", "eslint-plugin-n": "^15.2.3", "eslint-plugin-promise": "^6.0.0", + "json-schema-to-typescript": "14.0.0", "mocha": "^9.1.3", "nodemon": "3.1.0", "prettier": "3.2.5", - "json-schema-to-typescript": "14.0.0", "typescript": "5.4.3", "typescript-cp": "0.1.9" } diff --git a/src/celery.js b/src/celery.js index a94651cd..1d926866 100644 --- a/src/celery.js +++ b/src/celery.js @@ -1,12 +1,13 @@ -const debug = require('debug')('impresso/celery'); -const celery = require('node-celery'); -const Job = require('./models/jobs.model'); +const debug = require('debug')('impresso/celery') +const celery = require('celery-node') +const Job = require('./models/jobs.model') +const { de } = require('chrono-node') const JOB_STATUS_TRANSLATIONS = { REA: 'A new job has been created', RUN: 'Job is doing its job ...', DON: 'Job done! Congrats.', -}; +} /** * @@ -16,76 +17,53 @@ const JOB_STATUS_TRANSLATIONS = { * CELERY_RESULT_BACKEND: 'redis://localhost/5' * } * @param {[type]} config [description] - * @return {[type]} [description] */ -const getCeleryClient = (config, app) => { - const client = celery.createClient(config); - - client.on('error', err => { - debug(`Error! ${err}`); - }); - - client.on('ready', err => { - debug(`ready! ${err}`); - }); - client.on('message', msg => { - let result = msg.result; - - if (result && typeof result === 'string') { - try { - result = JSON.parse(msg); - } catch (err) { - debug('@message, ERROR, cannot get json from this string:', result); - console.error(err); - } - } - - if (result && typeof result === 'object') { - if (result.job) { - debug(`@message related to job: ${result.job_id}, send to: ${result.user_uid}`, result); - app.service('logs').create({ - ...result, - job: new Job({ - ...result.job, - creationDate: result.job.date_created, - }), - msg: JOB_STATUS_TRANSLATIONS[result.job.status], - to: result.user_uid, - from: 'jobs', - }); - } else { - debug('@message from unknown origin, cannot propagate:', result); - } - } - }); - - client.on('connect', async () => { - debug('Celery is ready!'); - }); - - client.run = ({ task = 'echo', args = [] } = {}) => - new Promise((resolve, reject) => { - debug(`run celery task ${task}`); - client.call(task, args, res => { - debug('Celery task retrieved!', res); - if (['SUCCESS', 'INIT', 'PROGRESS', 'STOPPED'].indexOf(res.status) !== -1) { - resolve(res); - } else { - reject(res); - } - }); - }); +const getCeleryClient = config => { + const client = celery.createClient(config.CELERY_BROKER_URL, config.CELERY_RESULT_BACKEND) + debug( + 'getCeleryClient CELERY_BROKER_URL:', + config.CELERY_BROKER_URL, + ' CELERY_RESULT_BACKEND:', + config.CELERY_RESULT_BACKEND + ) + const run = ({ task = 'impresso.tasks.echo', args = ['this is a test'] } = {}) => { + debug(`run celery task ${task}`) + const celeryTask = client.createTask(task) + const result = celeryTask.applyAsync(args) + return result + .get() + .then(data => { + debug('Celery task retrieved!', data) + console.log(data) + // client.disconnect(); + }) + .catch(err => { + console.error(err) + debug(`Error! ${err}`, err) + }) + } - return client; -}; + return () => ({ + run, + }) +} module.exports = function (app) { - const config = app.get('celery'); + const config = app.get('celery') + // wait for redis to be ready if (!config?.enable) { - debug('Celery is not configured. No task management is available.'); - app.set('celeryClient', null); + debug('Celery is not configured. No task management is available.') + app.set('celeryClient', null) } else { - debug("Celery configuration found, let's see if it works..."); - app.set('celeryClient', getCeleryClient(config, app)); + debug("Celery configuration found, let's see if it works...") + try { + const client = getCeleryClient(config) + console.log('client', client) + client.run() + app.set('celeryClient', client) + } catch (err) { + console.error(err) + debug(`Error! ${err}`, err) + } } -}; +}