Skip to content

Commit

Permalink
Merge branch 'master' into ASSETS-23937
Browse files Browse the repository at this point in the history
  • Loading branch information
tmathern authored Jan 5, 2024
2 parents dbfb305 + 7711ac1 commit b5c0fc7
Show file tree
Hide file tree
Showing 11 changed files with 2,047 additions and 1,554 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
if: "!contains(github.event.head_commit.message, '[ci skip]')"
strategy:
matrix:
node-version: [12.16, 12.19, 14.16.1, 14.17]
node-version: [14.16.1, 14.17]

steps:
- uses: actions/checkout@v2
Expand All @@ -33,6 +33,11 @@ jobs:
run: npm install -g @adobe/[email protected]
- name: Log aio details
run: aio info
- name: Docker login
run: docker login -u $REGISTRY_ID -p $REGISTRY_SECRET adobeassetcompute.azurecr.io
env:
REGISTRY_ID: ${{ secrets.AZURE_CONTAINER_REGISTRY_ID }}
REGISTRY_SECRET: ${{ secrets.AZURE_CONTAINER_REGISTRY_SECRET }}
- name: Install dependencies (all)
run: npm install
- name: Run unit tests
Expand Down
49 changes: 33 additions & 16 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ const ShellScriptWorker = require('./shell/shellscript');
const actionWrapper = require('./webaction');
const { AssetComputeWorkerPipeline } = require('./worker-pipeline');

/**
* Checks if at least rendition needs the pipeline.
* If at least one rendition needs pipeline, all will go through pipeline
* @param {*} params Params
* @returns True is pipeline is needed, false otherwise
*/
function hasAtLeastOnePipelineRendtion(params){
return params.renditions.some(rendition => {
return rendition.pipeline === true;
});
}

/**
* Worker where the renditionCallback is called for each rendition.
Expand All @@ -28,22 +39,16 @@ const { AssetComputeWorkerPipeline } = require('./worker-pipeline');
* @param {*} options optional options
*/
function worker(renditionCallback, options = {}) {

if (typeof renditionCallback !== "function") {
throw new Error("renditionCallback must be a function");
}

return actionWrapper(async function (params) {

// if any rendition needs pipeline, all will go through pipeline
const usePipeline = params.renditions.some(rendition => {
return rendition.pipeline === true;
});

const usePipeline = hasAtLeastOnePipelineRendtion(params);
if(usePipeline){
// here the pipeline only wraps the worker callback
// and other transformers potentially already available in a transformer catalog
console.log("Using pipeline (`worker#AssetComputeWorkerPipeline # WorkerCallbackTransformer`)");
console.log("Using pipeline (`worker#AssetComputeWorkerPipeline #WorkerCallbackTransformer`)");
return new AssetComputeWorkerPipeline(renditionCallback, options).compute(params);
} else {
console.log("Using worker callback (`worker#AssetComputeWorker`)");
Expand All @@ -67,15 +72,11 @@ function batchWorker(renditionsCallback, options = {}) {
}

return actionWrapper(async function (params) {
// if any rendition needs pipeline, all will go through pipeline
const usePipeline = params.renditions.some(rendition => {
return rendition.pipeline === true;
});

const usePipeline = hasAtLeastOnePipelineRendtion(params);
if(usePipeline){
// here the pipeline only wraps the worker callback
// and other transformers potentially already available in a transformer catalog
console.log("Using pipeline (`AssetComputeWorkerPipeline # BatchWorkerCallbackTransformer`)");
console.log("Using pipeline (`AssetComputeWorkerPipeline #BatchWorkerCallbackTransformer`)");
options.isBatchWorker = true;
return new AssetComputeWorkerPipeline(renditionsCallback, options).compute(params);
} else {
Expand All @@ -88,11 +89,27 @@ function batchWorker(renditionsCallback, options = {}) {

function shellScriptWorker(script = "worker.sh", options = {}) {
ShellScriptWorker.validate(script);

options.script = script;

return actionWrapper(function (params) {
return new ShellScriptWorker(params, options).compute();
const usePipeline = hasAtLeastOnePipelineRendtion(params);
if(usePipeline){
console.log("Using pipeline (`AssetComputeWorkerPipeline #ShellscriptCallbackTransformer`)");
if(!options.supportsPipeline) {
throw new Error("This shellscript worker does not support running as part of pipelines");
}

options.isShellscriptWorker = true;
options.params = params;
if(process.env.ASSET_COMPUTE_SENSEI_CATALOG) {
options.transformerCatalog = process.env.ASSET_COMPUTE_SENSEI_CATALOG;
}

return new AssetComputeWorkerPipeline(options.script, options).compute(params);
} else {
console.log("Using shellscript worker callback (`ShellScriptWorker`)");
return new ShellScriptWorker(params, options).compute();
}
});
}

Expand Down
12 changes: 4 additions & 8 deletions lib/shell/shellscript.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
'use strict';

const AssetComputeWorker = require('../worker');


const errors = require('@adobe/asset-compute-commons');

const fs = require('fs-extra');
const path = require('path');
const { spawn } = require('child_process');

const stripAnsi = require('strip-ansi'); // escaping ansi content
const contentType = require('content-type');
const { Utils, Action } = require('@adobe/asset-compute-pipeline');
Expand Down Expand Up @@ -58,15 +54,15 @@ class ShellScriptWorker {
// use of spawn() with separate arguments prevents shell/command injection
await spawnProcess("/usr/bin/env", ["bash", "-x", this.script], { env });

// process metadata, if any
// process metadata, if any
if(await fs.pathExists(preparedFiles.typeFile)){ // use the path containing the file with mime information
console.log('Reading content type information from worker generated file');

let mimeInfoContent = await fs.readFile(preparedFiles.typeFile);
mimeInfoContent = mimeInfoContent.toString();
mimeInfoContent = mimeInfoContent.trim();

try{
try {
const contenttype = contentType.parse(mimeInfoContent);
rendition.setContentType(contenttype.type, contenttype.parameters.charset);
} catch(ex){
Expand Down Expand Up @@ -103,13 +99,13 @@ async function prepareMetadata(directory) {
// Folder structure has activationid as root, so concurrent executions should not collide
const typeFile = path.resolve(errDir, "type.txt");

// rendition options file for optiosn like postProcess
// rendition options file for options like postProcess
const optionsFile = path.resolve(directory, 'options.json');

return {
errorFile: errFile,
typeFile: typeFile,
optionsFile: optionsFile
optionsFile: optionsFile,
};
}

Expand Down
39 changes: 39 additions & 0 deletions lib/shellscript-worker-transformer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

'use strict';

const { Transformer } = require("@adobe/asset-compute-pipeline");
const ShellScriptWorker = require('./shell/shellscript');

class ShellscriptCallbackTransformer extends Transformer {
/**
* Construct a transformer for shellscript workers
* @param {*} callback
* @param {*} manifest worker manifest (only one, not a list)
*/
constructor(callback, manifest, params) {
super(`shellscript-workerCallback-${manifest.name}`, manifest);
this._params = params;
this._callback = callback;
}

async compute(input, output) {
const executionOptions = {
script: this._callback
};
const shellscriptWorker = new ShellScriptWorker(this._params, executionOptions);
return shellscriptWorker.processWithScript(input, output, executionOptions);
}
}

module.exports = ShellscriptCallbackTransformer;
23 changes: 12 additions & 11 deletions lib/worker-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const debug = console.log;
const { Engine, Plan } = require('@adobe/asset-compute-pipeline');
const WorkerCallbackTransformer = require('./worker-transformer');
const BatchWorkerCallbackTransformer = require('./batch-worker-transformer');
const ShellscriptCallbackTransformer = require('./shellscript-worker-transformer');
const { AssetComputeLogUtils } = require('@adobe/asset-compute-commons');

const fs = require('fs-extra');
Expand All @@ -39,7 +40,8 @@ class AssetComputeWorkerPipeline {

// set by compatibility layer
this.options.hasBatchModeWorker = options.isBatchWorker || false;

this.options.hasShellscriptWorker = options.isShellscriptWorker || false;

this.transformers = [];
this.buildTransformersFromManifests(renditionCallback);

Expand All @@ -48,9 +50,6 @@ class AssetComputeWorkerPipeline {
this.transformers = this.transformers.concat(options.transformerCatalog);
}
debug("Built worker transformer using rendition callback 'renditionCallback'");

// set by compatibility layer
this.options.hasBatchModeWorker = options.isBatchWorker || false;
}

/**
Expand All @@ -65,17 +64,20 @@ class AssetComputeWorkerPipeline {
return this.transformers;
}

//const transformer = new WorkerCallbackTransformer(renditionCallback, this.options.manifest);
let transformer;
if (this.options.hasBatchModeWorker === true) {
if (this.options.hasShellscriptWorker === true) {
debug("Creating a shellscript worker");
// renditionCallback is the script in this case
transformer = new ShellscriptCallbackTransformer(renditionCallback, this.options.manifest, this.options.params);
} else if (this.options.hasBatchModeWorker === true) {
debug("Creating a batch worker");
transformer = new BatchWorkerCallbackTransformer(renditionCallback, this.options.manifest);
} else {
debug("Creating a normal worker");
transformer = new WorkerCallbackTransformer(renditionCallback, this.options.manifest);
}
this.transformers.push(transformer);

debug(`Adding ${transformer.name} WorkerCallbackTransformer to the pipeline`);
}

Expand All @@ -92,7 +94,7 @@ class AssetComputeWorkerPipeline {
}

normalizeInputOuput(input, output) {
// TODO: we will have special cases for beta-worker-creative
// TODO: we will have special cases for beta-worker-creative
// special case for sensei
if(output.fmt === 'machine-metadata-json') {
output.type = 'machine-metadata-json';
Expand All @@ -102,7 +104,6 @@ class AssetComputeWorkerPipeline {
output.type = mimetype && mimetype.toLowerCase();
}


// if source.mimetype does not exist, or it does not match the extension,
// try to find mimetype by mapping the extension
// this can happen if the client (for example the devtool) does not define the content-type correctly
Expand Down Expand Up @@ -138,7 +139,7 @@ class AssetComputeWorkerPipeline {

let input = params.source;

// WORKER_TEST_MODE:
// WORKER_TEST_MODE:
// for test-worker framework, input and output are mounted at /in and /out
// random access reading and writing from that mount can be problematic on Docker for Mac at least,
// so we are copying all files over into the container
Expand Down Expand Up @@ -178,7 +179,7 @@ class AssetComputeWorkerPipeline {
AssetComputeLogUtils.log(output,'Output for refinePlan:');
// TODO we will need to add support for multiple renditions
// TODO we have to integrate options into our plan and support everything we support now

debug("Preparing plan for rendition creation...");
await engine.refinePlan(plan, input, output);
debug("Refined plan to create rendition");
Expand Down
3 changes: 1 addition & 2 deletions lib/worker-transformer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ const mime = require('mime-types');
class WorkerCallbackTransformer extends Transformer {

/**
*
* Constructor
* @param {*} callback
* @param {*} manifest worker manifest (only one, not a list)
*/
constructor(callback, manifest) {
super(`workerCallback-${manifest.name}`, manifest);

this._callback = callback;
}

Expand Down
23 changes: 2 additions & 21 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class AssetComputeWorker {
new Date(this.params.times.gateway) :
new Date(this.params.times.process);


this.metrics.add({
startWorkerDuration: Utils.durationSec(this.processingStartTime, this.workerStartTime),
gatewayToProcessDuration: Utils.durationSec(this.params.times.gateway, this.params.times.process),
Expand Down Expand Up @@ -149,7 +148,6 @@ class AssetComputeWorker {
const baseDirectory = "";
const usePipeline = false;
this.directories = await Prepare.createDirectories(folderName, baseDirectory, usePipeline);

this.renditions = Rendition.forEach(this.params.renditions, this.directories.out, usePipeline);

this.timers.download.start();
Expand Down Expand Up @@ -185,7 +183,6 @@ class AssetComputeWorker {
console.log(`no rendition found after worker() callback processing at: ${rendition.path}`);
throw new GenericError(`No rendition generated for ${rendition.id()}`, `${this.actionName}_process_norendition`);
}

} catch (err) {
this.timers.processingCallback.stop();
console.log(`worker() callback processing failed with error after ${this.timers.processingCallback} seconds: ${err.message || err}`);
Expand Down Expand Up @@ -570,28 +567,12 @@ class AssetComputeWorker {

getResult(err) {
// make sure to not return urls, customer data or credentials
// reduce to requestId only

const result = {
requestId: this.params.requestId,
metrics: this.activationMetrics
requestId: this.params.requestId
};

const source = this.params.source;
if (source) {
result.source = {
name: source.name,
mimetype: source.mimetype,
size: source.size
};
}

if (this.renditions) {
result.renditions = [];
for (const rendition of this.renditions) {
result.renditions.push(rendition.instructionsForEvent());
}
}

if (this.renditionErrors.length > 0) {
result.renditionErrors = this.renditionErrors;
}
Expand Down
Loading

0 comments on commit b5c0fc7

Please sign in to comment.