diff --git a/docker-compose.swarm.yml b/docker-compose.swarm.yml index 4d15d75e..7035df51 100644 --- a/docker-compose.swarm.yml +++ b/docker-compose.swarm.yml @@ -15,6 +15,9 @@ services: proxynet: aliases: - streamer + dr-data-stager-net: + aliases: + - streamer secrets: - streamer-service-config.json - streamer-mailer-config.json @@ -29,6 +32,9 @@ services: proxynet: aliases: - streamer-ui + dr-data-stager-net: + aliases: + - streamer-ui secrets: - streamer-service-config.json - streamer-ui-config.json @@ -50,6 +56,8 @@ networks: default: name: streamer4user-net attachable: true + dr-data-stager-net: + external: true proxynet: external: true diff --git a/streamer-ui/packages/server/routes/stager.js b/streamer-ui/packages/server/routes/stager.js index ed9e985d..8ef0695b 100644 --- a/streamer-ui/packages/server/routes/stager.js +++ b/streamer-ui/packages/server/routes/stager.js @@ -1,7 +1,6 @@ const createError = require("http-errors"); const path = require("path"); -const { basicAuthString, fetchOnce } = require("./utils"); -const fetch = require("node-fetch"); +const { fetchOnce } = require("./utils"); // location of the streamer-service-config.json file. const fconfig = path.join(__dirname, '../config/streamer-service-config.json'); @@ -20,15 +19,13 @@ var _getDac = async function(req, res, next) { delete require.cache[require.resolve(fconfig)]; const headers = { - 'Content-Type': 'application/json', - 'Authorization': basicAuthString(config.DataStager.username, config.DataStager.password), + 'Content-Type': 'application/json' }; fetchOnce( - config.DataStager.url + "/rdm/DAC/project/" + projectId, + config.DataStager.url + "/dac/project/" + projectId, { method: 'GET', - credentials: 'include', headers, }, 1000 * 30, // timeout after 30 seconds diff --git a/streamer/Dockerfile b/streamer/Dockerfile index 10641ea7..edc3d8bb 100644 --- a/streamer/Dockerfile +++ b/streamer/Dockerfile @@ -1,4 +1,4 @@ -FROM centos:7 +FROM almalinux:8 # application metadata MAINTAINER Donders Institute diff --git a/streamer/config/default.json.template b/streamer/config/default.json.template index 5d5c0885..bd5faefb 100644 --- a/streamer/config/default.json.template +++ b/streamer/config/default.json.template @@ -9,7 +9,7 @@ "password": "bind_password" }, "DataStager": { - "url": "http://stager.dccn.nl:3000", + "url": "http://dr-data-stager-api:8080", "username": "admin", "password": "xxxxxx" }, diff --git a/streamer/lib/modalityMEG.js b/streamer/lib/modalityMEG.js index 07ae4451..2ba9e267 100644 --- a/streamer/lib/modalityMEG.js +++ b/streamer/lib/modalityMEG.js @@ -365,7 +365,7 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { var rget_args = { headers: { 'Accept': 'application/json' } }; - var myurl = sconfig.url + '/rdm/DAC/project/'; + var myurl = sconfig.url + '/dac/project/'; if ( toCatchall || p == 'unknown' ) { myurl += '_CATCHALL.MEG'; } else { @@ -389,9 +389,13 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { // here we get the collection namespace for the project var rpost_args = { - headers: { 'Accept': 'application/json', - 'Content-Type': 'application/json' }, - data: [] + headers: { + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }, + data: { + jobs: [] + } }; if ( src_list.length == 0 ) { @@ -412,32 +416,28 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { for( var i=0; i 0 ) { - c_stager.post(sconfig.url + '/job', rpost_args, function(rdata, resp) { + c_stager.post(sconfig.url + '/jobs', rpost_args, function(rdata, resp) { if ( resp.statusCode >= 400 ) { //HTTP error var errmsg = 'HTTP error: (' + resp.statusCode + ') ' + resp.statusMessage; utility.printErr(job.id + ':MEG:execStreamerJob:submitStagerJob', errmsg); return cb_async_stager(errmsg, false); } else { - rdata.forEach( function(d) { - utility.printLog(job.id + ':MEG:execStreamerJob:submitStagerJob', JSON.stringify(d)); + rdata.jobs.forEach( function(stagerJobData) { + utility.printLog(job.id + ':MEG:execStreamerJob:submitStagerJob', JSON.stringify(stagerJobData)); }); // everything is fine return cb_async_stager(null, true); diff --git a/streamer/lib/modalityMRI.js b/streamer/lib/modalityMRI.js index 77ad58da..5a2d3b0f 100644 --- a/streamer/lib/modalityMRI.js +++ b/streamer/lib/modalityMRI.js @@ -350,7 +350,7 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { // construct project and RESTful endpoint for resolving RDM collection namespace var p = (toCatchall) ? '_CATCHALL.MRI':projectNumber; - var myurl = sconfig.url + '/rdm/DAC/project/' + p; + var myurl = sconfig.url + '/dac/project/' + p; // general function to construct destination URL for stager job var _mkDst = function(_src, _collName) { @@ -402,34 +402,30 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { } var rpost_args = { - headers: { 'Accept': 'application/json', - 'Content-Type': 'application/json' }, - data: [{ - 'type': 'rdm', - 'data': { 'clientIF': 'irods', - 'stagerUser': 'root', - 'rdmUser': 'irods', - 'title': '[' + (new Date()).toISOString() + '] Streamer.MRI: ' + src, - 'timeout': 3600, - 'timeout_noprogress': 600, - 'srcURL': src, - 'dstURL': _mkDst(src, rdata.collName) }, - 'options': { 'attempts': 5, - 'backoff': { 'delay' : 60000, - 'type' : 'fixed' } } - }] + headers: { + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }, + data: { + "drUser": utility.getOuFromCollName(rdata.collName) + "-stager@ru.nl", + "dstURL": _mkDst(src, rdata.collName), + "srcURL": src, + "stagerUser": sconfig.username, + "stagerUserEmail": "", + "timeout": 3600, + "timeout_noprogress": 600, + "title": '[' + (new Date()).toISOString() + '] Streamer.MRI: ' + src + } }; // Submit stager job - c_stager.post(sconfig.url + '/job', rpost_args, function(rdata, resp) { + c_stager.post(sconfig.url + '/job', rpost_args, function(stagerJobData, resp) { if ( resp.statusCode >= 400 ) { //HTTP error var errmsg = 'HTTP error: (' + resp.statusCode + ') ' + resp.statusMessage; utility.printErr(job.id + ':MRI:execStreamerJob:submitStagerJob', errmsg); return cb_async(errmsg, src, projectNumber); } else { - rdata.forEach( function(d) { - utility.printLog(job.id + ':MRI:execStreamerJob:submitStagerJob', JSON.stringify(d)); - }); + utility.printLog(job.id + ':MRI:execStreamerJob:submitStagerJob', JSON.stringify(stagerJobData)); // job submitted!! set to job's maxProgress for the task job.progress(maxProgress, 100); return cb_async(null, src, projectNumber); diff --git a/streamer/lib/modalityUSER.js b/streamer/lib/modalityUSER.js index 4edda5aa..fb32c0ea 100644 --- a/streamer/lib/modalityUSER.js +++ b/streamer/lib/modalityUSER.js @@ -243,7 +243,7 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { password: sconfig.password }); var rget_args = { headers: { 'Accept': 'application/json' } }; - var myurl = sconfig.url + '/rdm/DAC/project/'; + var myurl = sconfig.url + '/dac/project/'; if ( toCatchall || p == 'unknown' ) { // NOTE: it requires the stager to provide endpoint to get the USER catchall collection. myurl += '_CATCHALL.USER'; @@ -271,9 +271,13 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { // here we get the collection namespace for the project var rpost_args = { - headers: { 'Accept': 'application/json', - 'Content-Type': 'application/json' }, - data: [] + headers: { + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }, + data: { + jobs: [] + } }; // construct destination collection @@ -298,30 +302,26 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) { } // compose POST data for submitting stager jobs - rpost_args.data.push({ - 'type': 'rdm', - 'data': { 'clientIF': 'irods', - 'stagerUser': 'root', - 'rdmUser': 'irods', - 'title': '[' + (new Date()).toISOString() + '] Streamer.USER: ' + src, - 'timeout': 3600, - 'timeout_noprogress': 600, - 'srcURL': src, - 'dstURL': dst }, - 'options': { 'attempts': 5, - 'backoff': { 'delay' : 60000, - 'type' : 'fixed' } } + rpost_args.data.jobs.push({ + "drUser": utility.getOuFromCollName(rdata.collName) + "-stager@ru.nl", + "dstURL": dst, + "srcURL": src, + "stagerUser": sconfig.username, + "stagerUserEmail": "", + "timeout": 3600, + "timeout_noprogress": 600, + "title": '[' + (new Date()).toISOString() + '] Streamer.USER: ' + src }); // submit jobs to stager - c_stager.post(sconfig.url + '/job', rpost_args, function(rdata, resp) { + c_stager.post(sconfig.url + '/jobs', rpost_args, function(rdata, resp) { if ( resp.statusCode >= 400 ) { //HTTP error var errmsg = 'HTTP error: (' + resp.statusCode + ') ' + resp.statusMessage; utility.printErr(job.id + ':USER:execStreamerJob:submitStagerJob', errmsg); return cb_async(errmsg, false); } else { - rdata.forEach( function(d) { - utility.printLog(job.id + ':USER:execStreamerJob:submitStagerJob', JSON.stringify(d)); + rdata.jobs.forEach( function(stagerJobData) { + utility.printLog(job.id + ':USER:execStreamerJob:submitStagerJob', JSON.stringify(stagerJobData)); }); // everything is fine job.progress(maxProgress, 100); diff --git a/streamer/lib/utility.js b/streamer/lib/utility.js index ab3d9d08..060306f0 100644 --- a/streamer/lib/utility.js +++ b/streamer/lib/utility.js @@ -1,4 +1,5 @@ const child_process = require('child_process'); +const path = require('path'); // general error handler to send response to the client var _responseOnError = function(c_type, c_data, resp) { @@ -41,7 +42,20 @@ var _diskFree = function(path) { return freespace; } +// parses the RDR iRODS collName to get the ou name in lower case. +// +// The RDR iRODS collName is structured as follows: +// +// `/{zone}/{o}/{ou}/{collection}` +// +// This function gets the value of {ou} assuming the input `collName` +// follows the structure. +var _getOuFromCollName = function(collName) { + return path.basename(path.dirname(collName)).toLowerCase() +} + module.exports.responseOnError = _responseOnError; module.exports.printLog = _printLog; module.exports.printErr = _printErr; module.exports.diskFree = _diskFree; +module.exports.getOuFromCollName = _getOuFromCollName; diff --git a/streamer/lib/utility_ad.js b/streamer/lib/utility_ad.js index 4824f5eb..df0890bc 100644 --- a/streamer/lib/utility_ad.js +++ b/streamer/lib/utility_ad.js @@ -1,7 +1,5 @@ var config = require('config'); var ActiveDirectory = require('activedirectory'); -var utility = require('./utility'); - // utility function for finding a user profile in the Active Directory. // // The argument `name` can be one of the following type of string: